core38Example_main.c

Client/Server example in C language. This example needs a compiled schema, core38Example.sdl.

/*
* Demonstrates client/server with writing/reading and snapshot reads
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "rdm.h"
#include "core38Example_structs.h"
#include "core38Example_cat.h"
#include "example_fcns.h"
#include "rdmstartupapi.h"
const char* const description = "Demonstrates client/server with writing/reading and snapshot reads";
const RDM_CMDLINE_OPT opts[] = {
{"", "writer", NULL, "Start writer process with tfs_emded"},
{"", "reader", "s=*", "Start reader process and connect to given URI"},
{"", "snapshot", "s=*", "Start snapshot reader process and connect to given URI"},
{"", "timeout", "i=#", "Set writer timeout in seconds"},
{"", "end", "i=#", "Stop after NN minutes"},
{NULL, NULL, NULL, NULL} };
static uint32_t timeout = 0;
static uint32_t minutes = 5;
static RDM_RETCODE setTimeout(RDM_DB db)
{
char timeoutString[100];
printf("Timeout: %u Second\n", timeout);
sprintf(timeoutString, "%u", timeout);
return rdm_dbSetOption(db, "timeout", timeoutString);
}
static RDM_RETCODE startEmbedTFS(RDM_TFS* pTFS)
{
rc = rdm_rdmAllocTFS(pTFS);
if (rc == sOKAY)
{
rc = rdm_tfsSetOptions(*pTFS, "tfstype=embed;listen=on");
if (rc == sOKAY)
{
rc = rdm_tfsInitialize(*pTFS);
}
/* If any setup options have occurred, free the handle */
if (rc != sOKAY)
rdm_tfsFree(*pTFS);
}
return rc;
}
static RDM_RETCODE startRemoteTFS(RDM_TFS* pTFS)
{
rc = rdm_rdmAllocTFS(pTFS);
if (rc == sOKAY)
{
rc = rdm_tfsSetOptions(*pTFS, "tfstype=remote");
if (rc == sOKAY)
{
rc = rdm_tfsInitialize(*pTFS);
}
/* If any setup options have occurred, free the handle */
if (rc != sOKAY)
rdm_tfsFree(*pTFS);
}
return rc;
}
static void shutdownEmbedTFS(RDM_TFS tfs)
{
/* shutdown ability for new clients to connect */
puts("** Disable new connections.");
puts("** Drop existing connections.");
/* drop all currently connected clients */
rc = rdm_tfsKillAllRemoteConnections(tfs, "core38");
puts("** Remove database image.");
rc = rdm_tfsDropDatabase(tfs, "core38");
}
static RDM_RETCODE writerProcess(RDM_DB db)
{
READINGS recordData;
uint32_t exitTime = minutes * 60 * 1000; /* change time to millisecs */
uint32_t counter = 0;
puts("Starting Writer Process");
while (RDM_TRUE)
{
rc = rdm_dbStartUpdate(db, RDM_LOCK_ALL, 0, NULL, 0, NULL);
if (rc == eUNAVAIL)
{
printf("Timeout after inserting %u rows\n", counter);
counter = 0;
}
else if (rc == sOKAY)
{
rdm_timestampNow(0, &recordData.r_time);
recordData.r_value = rand();
strcpy(recordData.r_desc, "Sample description");
rc = rdm_dbInsertRow(db, TABLE_READINGS, &recordData, sizeof(recordData), NULL);
if (rc == sOKAY)
{
counter++;
}
else
{
}
}
else
{
break;
}
if (timeMeasureDiff(&timer) >= exitTime)
{
printf("Normal Shutdown after inserting %u rows\n", counter);
break;
}
}
return rc;
}
static RDM_RETCODE readerProcess(RDM_DB db)
{
uint32_t exitTime = minutes * 60 * 1000; /* change time to millisecs */
puts("Starting Reader Process");
while (RDM_TRUE)
{
rc = rdm_dbStartRead(db, RDM_LOCK_ALL, 0, NULL);
if (rc == eUNAVAIL)
{
puts("Lock timeout");
}
else if (rc == sOKAY)
{
RDM_CURSOR cursor = NULL;
uint32_t counter = 0;
rc = rdm_dbGetRows(db, TABLE_READINGS, &cursor);
if (rc == sOKAY)
{
for (rc = rdm_cursorMoveToFirst(cursor); rc == sOKAY; rc = rdm_cursorMoveToNext(cursor))
{
READINGS recordData;
rc = rdm_cursorReadRow(cursor, &recordData, sizeof(recordData), NULL);
/* All we'll do here is just read the row */
if (rc == sOKAY)
counter++;
}
/* sENDOFCURSOR is the expected return code */
if (rc == sENDOFCURSOR)
rc = sOKAY;
printf("Read %u rows\n", counter);
}
}
else
{
break;
}
/* See if it is time to exit this loop */
if (timeMeasureDiff(&timer) >= exitTime)
{
puts("Normal Shutdown");
break;
}
}
return rc;
}
static RDM_RETCODE snapshotProcess(RDM_DB db)
{
uint32_t exitTime = minutes * 60 * 1000; /* change time to millisecs */
puts("Starting Reader Process");
while (RDM_TRUE)
{
rc = rdm_dbStartSnapshot(db, RDM_LOCK_ALL, 0, NULL);
if (rc == eUNAVAIL)
{
puts("Lock timeout");
}
else if (rc == sOKAY)
{
RDM_CURSOR cursor = NULL;
uint32_t counter = 0;
rc = rdm_dbGetRows(db, TABLE_READINGS, &cursor);
if (rc == sOKAY)
{
for (rc = rdm_cursorMoveToFirst(cursor); rc == sOKAY; rc = rdm_cursorMoveToNext(cursor))
{
READINGS recordData;
rc = rdm_cursorReadRow(cursor, &recordData, sizeof(recordData), NULL);
/* All we'll do here is just read the row */
if (rc == sOKAY)
counter++;
}
/* sENDOFCURSOR is the expected return code */
if (rc == sENDOFCURSOR)
rc = sOKAY;
printf("Read %u rows\n", counter);
}
}
else
{
break;
}
/* See if it is time to exit this loop */
if (timeMeasureDiff(&timer) >= exitTime)
{
puts("Normal Shutdown");
break;
}
}
return rc;
}
/* Start Embed TFS and being inserting rows into the database.
*/
static RDM_RETCODE startWriterWithEmbeddedServer(void)
{
RDM_TFS tfs;
RDM_DB db;
printf("Writer with embedded TFS\n\nShutdown in %u minutes\n", minutes);
rc = startEmbedTFS(&tfs);
if (rc == sOKAY)
{
rc = exampleOpenNextDatabase(tfs, &db, "core38", core38Example_cat);
if (rc == sOKAY)
{
rc = setTimeout(db);
}
if (rc == sOKAY)
{
rc = writerProcess(db);
shutdownEmbedTFS(tfs);
}
}
return rc;
}
static RDM_RETCODE startReader(const char* uri)
{
RDM_TFS tfs;
RDM_DB db;
printf("Reader connecting to TFS at %s\n\nShutdown in %u minutes\n", uri, minutes);
rc = startRemoteTFS(&tfs);
if (rc == sOKAY)
{
char dbname[1000];
sprintf(dbname, "%s/core38", uri);
printf("Attempting to open: %s", dbname);
rc = exampleOpenNextDatabase(tfs, &db, dbname, NULL);
if (rc == sOKAY)
{
rc = setTimeout(db);
}
if (rc == sOKAY)
{
rc = readerProcess(db);
}
}
return rc;
}
static RDM_RETCODE startSnapshotReader(const char *uri)
{
RDM_TFS tfs;
RDM_DB db;
printf("Snapshot Reader connecting to TFS at %s\n\nShutdown in %u minutes\n", uri, minutes);
rc = startRemoteTFS(&tfs);
if (rc == sOKAY)
{
char dbname[1000];
sprintf(dbname, "%s/core38", uri);
printf("Attempting to open: %s", dbname);
rc = exampleOpenNextDatabase(tfs, &db, dbname, NULL);
if (rc == sOKAY)
{
rc = setTimeout(db);
}
if (rc == sOKAY)
{
rc = snapshotProcess(db);
}
}
return rc;
}
int main_core38(int argc, const char* const* argv)
{
const char* opt;
const char* optarg;
char cMode = 0;
rc = rdm_cmdlineInit(&cmd, argc, argv, description, opts);
if (rc != sCMD_USAGE)
if (rc == sOKAY)
{
while ((opt = rdm_cmdlineNextLongOption(&cmd, &optarg)) != 0)
{
switch (*opt)
{
case 'w':
if (cMode)
{
puts("TFS mode already set.");
rc = sCMD_USAGE;
}
else
{
cMode = *opt;
}
break;
case 'r':
if (cMode)
{
puts("TFS mode already set.");
rc = sCMD_USAGE;
}
else
{
cMode = *opt;
}
break;
case 's':
if (cMode)
{
puts("TFS mode already set.");
rc = sCMD_USAGE;
}
else
{
cMode = *opt;
}
break;
case 't':
timeout = atoi(optarg);
break;
case 'e':
minutes = atoi(optarg);
break;
default:
break;
}
}
if (rc == sOKAY)
{
switch (cMode)
{
case 'w':
rc = startWriterWithEmbeddedServer();
break;
case 'r':
rc = startReader(optarg);
break;
case 's':
rc = startSnapshotReader(optarg);
break;
default:
puts("No Action Defined. Exiting.");
break;
}
}
}
return (int)rc;
}
RDM_RETCODE rdm_tfsKillAllRemoteConnections(RDM_TFS tfs, const char *uri)
Kill all the TFS connections.
RDM_RETCODE rdm_cursorMoveToFirst(RDM_CURSOR cursor)
Position a cursor to the first row in the collection.
RDM_RETCODE rdm_cursorMoveToNext(RDM_CURSOR cursor)
Position a cursor to the next row in the collection.
Header for the native RDM Runtime API.
RDM_RETCODE rdm_rdmAllocTFS(RDM_TFS *phTFS)
Allocate a TFS handle.
void timeMeasureBegin(perfTimer_t *timer)
RDM_RETCODE rdm_dbEnd(RDM_DB db)
End a transactional operation.
@ sCMD_USAGE
Definition: rdmretcodetypes.h:71
RDM_RETCODE rdm_timestampNow(int16_t time_zone, RDM_PACKED_TIMESTAMP_T *ts)
Get the current local timestamp.
struct RDM_CURSOR_S * RDM_CURSOR
Definition: rdmtypes.h:306
RDM_RETCODE rdm_dbStartRead(RDM_DB db, const RDM_TABLE_ID *tableIds, uint32_t numTableIds, RDM_TRANS *pTrans)
Get read locks.
RDM_RETCODE rdm_cursorReadRow(RDM_CURSOR cursor, void *colValues, size_t bytesIn, size_t *bytesOut)
Read all columns from a row.
void timeMeasureEnd(perfTimer_t *timer)
@ eUNAVAIL
Definition: rdmretcodetypes.h:193
The buffer used by the command line parser to hold state information.
Definition: rdmcmdlinetypes.h:85
RDM_RETCODE rdm_dbClose(RDM_DB db)
Close the database associated with a database handle.
#define print_error(rc)
Definition: example_fcns.h:17
@ sOKAY
Definition: rdmretcodetypes.h:97
@ RDM_TRUE
Definition: psptypes.h:60
Generic usage function option record.
Definition: rdmcmdlinetypes.h:32
RDM_RETCODE rdm_dbSetOption(RDM_DB db, const char *keyword, const char *strValue)
Set a single RDM option from a string.
#define RDM_STARTUP_EXAMPLE(name)
Definition: rdmstartuptypes.h:73
RDM_RETCODE rdm_dbStartSnapshot(RDM_DB db, const RDM_TABLE_ID *tableIds, uint32_t numTableIds, RDM_TRANS *pTrans)
Start a snapshot.
RDM_RETCODE rdm_dbGetRows(RDM_DB db, RDM_TABLE_ID tableId, RDM_CURSOR *pCursor)
Associate an RDM_CURSOR with rows based on a table id.
#define exampleOpenNextDatabase(tfs, db, name, catalog)
Definition: example_fcns.h:26
unsigned int timeMeasureDiff(perfTimer_t *timer)
RDM_RETCODE rdm_dbEndRollback(RDM_DB db)
End and rollback a transactional operation.
RDM_RETCODE rdm_tfsDropDatabase(RDM_TFS tfs, const char *dbNameSpec)
Drop the specified database.
RDM_RETCODE rdm_dbInsertRow(RDM_DB db, RDM_TABLE_ID tableId, const void *colValues, size_t bytesIn, RDM_CURSOR *pCursor)
Insert a new row into a table at the specified rowId.
RDM_RETCODE rdm_tfsDisableListener(RDM_TFS hTFS)
Stop the listener process for the TFS.
struct RDM_TFS_S * RDM_TFS
RDM TFS Handle.
Definition: rdmtfstypes.h:21
RDM_RETCODE rdm_cmdlineInit(RDM_CMDLINE *cmd, int32_t argc, const char *const argv[], const char *description, const RDM_CMDLINE_OPT *opts)
Initialize an RDM_CMDLINE buffer and validate the command line.
struct RDM_DB_S * RDM_DB
Definition: rdmtypes.h:305
RDM_RETCODE rdm_tfsFree(RDM_TFS hTFS)
Terminate a TFS service.
#define RDM_LOCK_ALL
Definition: rdmtypes.h:170
RDM_RETCODE rdm_dbStartUpdate(RDM_DB db, const RDM_TABLE_ID *writeTableIds, uint32_t numWriteTableIds, const RDM_TABLE_ID *readTableIds, uint32_t numReadTableIds, RDM_TRANS *pTrans)
Get write locks.
const char * rdm_cmdlineNextLongOption(RDM_CMDLINE *cmd, const char **arg)
Get next option or argument.
@ sENDOFCURSOR
Definition: rdmretcodetypes.h:58
Definition: example_fcns.h:37
RDM_RETCODE rdm_tfsSetOptions(RDM_TFS tfs, const char *optString)
Set TFS options.
Internal RDM Startup API used by startup macros.
RDM_RETCODE
RDM status and error return codes.
Definition: rdmretcodetypes.h:43
RDM_RETCODE rdm_tfsInitialize(RDM_TFS tfs)
Initialize a RDM_TFS instance.