cpp55Example_main.cpp

A performance example for RDM Core functions with producers and consumers that runs in parallel. This example needs a compiled schema, cpp55.sdl.

/* \file cpp55_main.c
\brief Example program for efficient insertion of 6 byte reading values
*/
#include "rdm.h"
#include "rdmstartupapi.h"
#include "cpp55_cat.h"
#include "cpp55_structs.h"
#include "thread_api.h"
#include "sw.h"
#include "rand.h"
#include <string.h>
#include <stdlib.h>
#include <sstream>
#include <string>
#include <cstdio>
#include <cstdlib>
/*
* This example shows how to utilize multiple threads to rapidly
* insert 6-byte readings into RDM databases and at the same time
* reading the data from that database.
*
* The default configuration inserts 1,000,000,000 6-byte readings
* into a series of database (1 database per thread) hosted by a
* single TFS.
*
* The databases are opened in shared mode and use a durability
* setting of consistent. The durability setting can be configured
* using the command line parameters.
*
* ## Thread Usage
*
* The test uses a number of insertion threads (producers). Each
* thread will insert a portion of the total readings for a test run
* into the database for that thread. The test also uses a number of
* lookup threads (consumers). Using more threads (and more databases)
* can increase overall throughput up until the point that there may
* be more threads that processor cores available or available I/O
* bandwith.
*
* The test defaults to inserting 1,000,000,000 readings across all of
* the databases utilized in a test run. There will be one database
* per insert thread. The default is to insert 100 readings into the
* data column of a reading row. Changing the size of the data column
* in the readings table in the cpp55.sdl file will change the number
* of readings stored per row.
*
* ## Number of Readings
*
* The test defaults to inserting 1000 rows in a single transaction
* block. A transaction commit is an expensive operation and
* performance can be greatly improved by combining multiple inserts
* into a transaction. However too many inserts in a transaction can
* reduce performance be requiring a large amount of cached rows while
* processing transactions.
*
* ## Durability ##
*
* The durability options affect when database changes are sync'd to
* disk. There are several durability options:
*
* * **durable** The highest level of transaction safeness. The user
* should never lose any data assuming the underlying media
* is durable.
*
* * **consistent** This is a medium level of transaction
* safeness. The user may lose data in the case the operating
* system crashes or shuts down without flushing its file
* system cache. Recovery will ensure a consistent state
* where a partial transaction should never be observed.
*
* * **unsafe** This is the lowest level of transaction safeness. The
* user may lose data including the entire database in the
* case the operating system crashes or shuts down without
* flushing its file system cache.
*/
using namespace RDM_CPP::CPP55;
using namespace std;
/* \brief Standard error print macro */
#define print_error(rc) print_errorEx (rc, __FILE__, __LINE__)
/* \brief Standard error print format
*
* This demonstrates the usage of the rdm_retcodeGetName() and
* rdm_retcodeGetDescription() functions.
*/
RDM_RETCODE rc, /*< [in] RDM_RETCODE to analyze */
const char *file, /*< [in] Sourcec filename */
int line /*< [in] Lineno in source file */
)
{
if (rc < sOKAY)
{
cerr << file << ":" << line << ": error: " <<
rdm_retcodeGetName (rc) << " (" << rc << "): " << rdm_retcodeGetDescription (rc) << endl;
}
}
#define MAX_THREADS 64
/* \brief Get the CPU count
*/
extern "C" RDM_EXPORT uint32_t psp_envCPUCount (void);
/* \brief context structure for a threads
A context structure that contains information and data for a
thread to do its work.
*/
typedef struct THREAD_CTX_S
{
RDM_DB db; /*< Database hande for the thrad */
uint32_t threadNum; /*< The thread number */
psp_thread_t thread; /*< The thread handle */
RDM_BOOL_T consumer; /*< Is this a consumer that only read rows */
SW *swInsertStart; /*< Stopwatch for starting a transaction for doing inserts */
SW *swInsert; /*< Stopwatch for doing inserts */
SW *swInsertEnd; /*< Stopwatch for ending a transaction for inserts */
SW *swReadStart; /*< Stopwatch for starting a transaction for doing reads */
SW *swReadMove; /*< Stopwatch for looking up a row by primary key */
SW *swRead; /*< Stopwatch for doing reads */
SW *swReadNext; /*< Stopwatch for navigating to next row */
SW *swReadEnd; /*< Stopwatch for ending a transaction for reads */
uint64_t maxTs; /*< The maximum time stamp written by this thread */
uint64_t numReadings;/*< The number of readings in the database before thread termination */
struct TEST_CTX_S *testCtx; /*< A pointer to the test context */
} THREAD_CTX;
/* \brief context structure for a test run
A context structure that contains configuration and results for a single test run.
The results will be stored in a database table so different test runs using different
configurations can be compared.
*/
typedef struct TEST_CTX_S
{
RDM_TFS tfs; /*< TFS handle common to all threads (if set) */
uint64_t num_readings; /*< The number of readings to be inserted*/
uint64_t num_rows; /*< The number of rows to be inserted */
uint32_t num_readings_per_row; /*< The number of reading per row */
uint32_t num_rows_per_trans; /*< The number of rows per transaction */
uint32_t num_threads; /*< The number of threads to distibute the work */
uint32_t num_consumers; /*< The number of consumer threads */
uint32_t num_producers; /*< The number of producer threads */
const char *durability; /*< The durability mode for databases */
RDM_OPEN_MODE openMode; /*< The open mode for databases */
THREAD_CTX threadCtxs[MAX_THREADS]; /*< An array of thread contexts*/
} TEST_CTX;
typedef uint8_t reading[6];
/*
* \brief Allocate and initialize a single TFS handle
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE allocAndInitTFS (RDM_TFS *pTFS /*< [OUT] The TFS handle for the allocated TFS */
)
{
RDM_TFS tfs;
rc = rdm_rdmAllocTFS (&tfs);
if (rc == sOKAY)
{
rc = rdm_tfsSetOption (tfs, "tfstype", "embed");
if (rc == sOKAY)
{
rc = rdm_tfsInitialize (tfs);
}
if (rc != sOKAY)
{
(void) rdm_tfsFree (tfs);
}
}
if (rc == sOKAY)
{
*pTFS = tfs;
}
return rc;
}
/*
* \brief Get the DB name to open
*
* This function calculates the database name. If this thread is a
* producer it will simply be a database name based on the thread
* number. For a consumer thread it is a union of all the databases.
*/
static void getDatabaseName (THREAD_CTX *threadCtx, /*< [IN] A pointer to a thread context */
string &dbName /*< [OUT] The returned database name */
)
{
TEST_CTX *testCtx = threadCtx->testCtx;
stringstream dbNameStream;
if (threadCtx->consumer)
{
for (int ii = 0; ii < testCtx->num_producers; ii++)
{
if (ii > 0)
{
dbNameStream << "|";
}
dbNameStream << "cpp55_" << ii;
}
}
else
{
dbNameStream << "cpp55_" << threadCtx->threadNum;
}
dbName = dbNameStream.str ();
}
/*
* \brief Open the readings database for a single thread
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE openDatabase_thread (THREAD_CTX *threadCtx /*< [IN] A pointer to a thread context */
)
{
RDM_DB db;
TEST_CTX *testCtx = threadCtx->testCtx;
string dbName;
getDatabaseName (threadCtx, dbName);
if (threadCtx->consumer == RDM_FALSE)
{
rdm_tfsDropDatabase (testCtx->tfs, dbName.c_str ());
}
rc = rdm_tfsAllocDatabase (testCtx->tfs, &db);
if (rc == sOKAY)
{
rc = rdm_dbSetOption (db, "durability", testCtx->durability);
if (rc == sOKAY)
{
rc = rdm_dbSetCatalog (db, cpp55_cat);
}
if (rc == sOKAY)
{
if (threadCtx->consumer)
{
rc = rdm_dbOpen (db, dbName.c_str (), RDM_OPEN_READONLY);
}
else
{
rc = rdm_dbOpen (db, dbName.c_str (), testCtx->openMode);
}
}
if (rc != sOKAY)
{
}
}
if (rc == sOKAY)
{
threadCtx->db = db;
}
return rc;
}
/*
* \brief Close a database handle for a single thread
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE closeDatabase_thread (THREAD_CTX *threadCtx /*< [IN] A pointer to the thread context to process */
)
{
rc = rdm_dbClose (threadCtx->db);
if (rc == sOKAY)
{
rc = rdm_dbFree (threadCtx->db);
}
else
{
rdm_dbFree (threadCtx->db);
}
return rc;
}
/* \brief Get the progress for the producer threads
*
* The progress that all the producers collectively have made is
* communicated here.
*
* Please note that Valgrind with the Helgrind tool, Intel
* Inspectory, or other tools that can find race conditions may
* report race conditions here. However, the code is safe since the
* producers only write increasing values and the consumers only
* checks whether the values have reached a certain value.
*
* @return Every time stamp less than the time stamp returned are available for any consummer
*/
static uint64_t getProgress_thread__race_condition_ok (TEST_CTX *testCtx /*< [in] The test context */
)
{
uint64_t readyTs = RDM_UINT64_MAX;
uint32_t numProducers = testCtx->num_producers;
for (uint32_t producer = 0; producer < numProducers; producer++)
{
THREAD_CTX *producerThread = &testCtx->threadCtxs[producer];
uint64_t producerMaxTs = producerThread->maxTs;
if (producerMaxTs < readyTs)
{
readyTs = producerMaxTs;
}
}
return readyTs;
}
/* \brief Set the progress for a thread
*
* Whenever a producer has committed some changes the progress it has
* made is communicated here.
*
* Please note that Valgrind with the Helgrind tool, Intel
* Inspectory, or other tools that can find race conditions may
* report race conditions here. However, the code is safe since the
* producers only write increasing values and the consumers only
* checks whether the values have reached a certain value.
*
*/
static void setProgress_thread__race_condition_ok (THREAD_CTX *threadCtx, /*< [IN] The context for this thread */
uint64_t currTs /*< [IN] The current time stamp for the thread */
)
{
threadCtx->maxTs = currTs;
}
/*
* \brief Thread function for inserting readings into the database for a single thread
*
* Loop through and insert the number of rows this thread is responsible for. Each row
* will contain several diffent readings in the data column.
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE readReadings_thread (THREAD_CTX *threadCtx /*< [IN] The context for this thread */
)
{
READINGS tableData = {0};
RDM_DB db = threadCtx->db;
TEST_CTX *testCtx = threadCtx->testCtx;
RAND rand;
SW *swStart = threadCtx->swReadStart;
SW *swMove = threadCtx->swReadMove;
SW *swRead = threadCtx->swRead;
SW *swNext = threadCtx->swReadNext;
SW *swEnd = threadCtx->swReadEnd;
uint64_t numRows = testCtx->num_rows;
uint32_t threadNum = threadCtx->threadNum;
uint32_t numProducers = testCtx->num_producers;
uint32_t numConsumers = testCtx->num_consumers;
uint32_t consumerNum = threadNum - numProducers;
uint32_t num_readings_per_row = testCtx->num_readings_per_row;
uint32_t num_rows_per_trans = testCtx->num_rows_per_trans;
RDM_CURSOR cursor = NULL;
uint64_t nextTs = num_rows_per_trans * consumerNum;
rc = rdm_dbGetRowsByKey (db, KEY_READINGS_TS, &cursor);
while (rc == sOKAY && nextTs < numRows)
{
RDM_TABLE_ID tables_to_lock = TABLE_READINGS;
uint64_t maxTs = getProgress_thread__race_condition_ok (testCtx);
/* Waiting for rows to become available. You may want to
* replace this with some other synchronization between the
* threads, especially if the producers are slow or more
* resources are allocated for the consumers than the
* producers.
*/
if (nextTs + num_rows_per_trans > numRows)
{
while (numRows > maxTs)
{
maxTs = getProgress_thread__race_condition_ok (testCtx);
}
}
else
{
while (nextTs + num_rows_per_trans > maxTs)
{
maxTs = getProgress_thread__race_condition_ok (testCtx);
}
}
swStart->start ();
rc = rdm_dbStartSnapshot (db, &tables_to_lock, 1, NULL);
swStart->stop ();
if (rc == sOKAY)
{
for (uint32_t ii = 0; rc == sOKAY && ii < num_rows_per_trans && nextTs < numRows; ii++)
{
if (ii == 0)
{
swMove->start ();
rc = rdm_cursorMoveToKey (cursor, KEY_READINGS_TS, &nextTs, sizeof(nextTs));
swMove->stop ();
}
else
{
swNext->start ();
rc = rdm_cursorMoveToNext (cursor);
swNext->stop ();
}
if (rc == sOKAY)
{
swRead->start ();
rc = rdm_cursorReadRow (cursor, &tableData, sizeof (tableData), NULL);
swRead->stop ();
}
if (tableData.ts != nextTs)
{
cout << "TS missmatch Ts: " << tableData.ts << ", nextTs: " << nextTs << endl;
}
nextTs++;
}
if (rc == sENDOFCURSOR)
{
rc = sOKAY;
}
if (rc == sOKAY)
{
swEnd->start ();
rc = rdm_dbEnd (db);
swEnd->stop ();
}
else
{
(void) rdm_dbEnd (db);
}
}
nextTs += num_rows_per_trans * (numConsumers - 1);
}
return rc;
}
/*
* \brief Thread function for inserting readings into the database for a single thread
*
* Loop through and insert the number of rows this thread is responsible for. Each row
* will contain several diffent readings in the data column.
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE insertReadings_thread (THREAD_CTX *threadCtx /*< [IN] The context for this thread */
)
{
READINGS tableData = {0};
RDM_DB db = threadCtx->db;
reading *currReadings;
TEST_CTX *testCtx = threadCtx->testCtx;
SW *swStart = threadCtx->swInsertStart;
SW *swInsert = threadCtx->swInsert;
SW *swEnd = threadCtx->swInsertEnd;
uint64_t numRows = testCtx->num_rows;
uint64_t numProducers = testCtx->num_producers;
uint32_t threadNum = threadCtx->threadNum;
uint32_t num_readings_per_row = testCtx->num_readings_per_row;
uint32_t num_rows_per_trans = testCtx->num_rows_per_trans;
uint32_t currTs = (uint32_t) (threadNum);
while (rc == sOKAY && currTs < numRows)
{
RDM_TABLE_ID tables_to_lock = TABLE_READINGS;
swStart->start ();
rc = rdm_dbStartUpdate (db, &tables_to_lock, 1, RDM_LOCK_NONE, 0, NULL);
swStart->stop ();
if (rc == sOKAY)
{
for (uint32_t ii = 0; rc == sOKAY && ii < num_rows_per_trans && currTs < numRows; ii++)
{
uint32_t currValue = currTs;
tableData.ts = currTs;
/* Loop to set the number of readings stored in a row */
uint32_t jj;
for (jj = 0, currReadings = (reading *) tableData.data;
jj < num_readings_per_row; jj++, currReadings++)
{
memcpy (currReadings, &currValue, sizeof (currValue));
currValue++;
}
swInsert->start ();
rc = rdm_dbInsertRow (db, TABLE_READINGS, &tableData, sizeof (tableData), NULL);
swInsert->stop ();
currTs += numProducers;
}
if (rc == sOKAY)
{
swEnd->start ();
rc = rdm_dbEnd (db);
swEnd->stop ();
}
else
{
(void) rdm_dbEndRollback (db);
}
setProgress_thread__race_condition_ok (threadCtx, currTs);
}
}
return rc;
}
/*
* \brief Calculate the number of readings stored in a database for a single thread
*
* Get the row count in the readings table and multiply that by the
* number of readings stored in a row.
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE getNumReadings_thread (THREAD_CTX *threadCtx /*< [IN] A pointer to a thread context */
)
{
RDM_TABLE_ID tables_to_lock = TABLE_READINGS;
RDM_RETCODE rc = rdm_dbStartRead (threadCtx->db, &tables_to_lock, 1, NULL);
TEST_CTX *testCtx = threadCtx->testCtx;
if (rc == sOKAY)
{
RDM_CURSOR cursor = NULL;
rc = rdm_dbGetRows (threadCtx->db, TABLE_READINGS, &cursor);
if (rc == sOKAY)
{
const uint32_t num_readings_per_row = testCtx->num_readings_per_row;
uint64_t count;
rc = rdm_cursorGetCount (cursor, &count);
threadCtx->numReadings = count * num_readings_per_row;
if (rc == sOKAY)
{
rc = rdm_cursorFree (cursor);
}
else
{
(void) rdm_cursorFree (cursor);
}
}
if (rc == sOKAY)
{
rc = rdm_dbEnd (threadCtx->db);
}
else
{
rdm_dbEnd (threadCtx->db);
}
}
return rc;
}
/*
* \brief Do the actual inserts assigned to a single thread
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE EXTERNAL_FCN doWork_thread (void *data /*< [IN] The context for this thread */
)
{
THREAD_CTX *threadCtx = reinterpret_cast<THREAD_CTX *> (data);
if (threadCtx->consumer == RDM_TRUE)
{
rc = readReadings_thread (threadCtx);
}
else
{
rc = insertReadings_thread (threadCtx);
if (rc == sOKAY)
{
rc = getNumReadings_thread (threadCtx);
}
}
if (rc == sOKAY)
{
rc = closeDatabase_thread (threadCtx);
}
else
{
closeDatabase_thread (threadCtx);
}
return rc;
}
/* \brief Setup stopwatches
*
* Iterate through the context for each thread and initialaze the
* stopwatches.
*/
static void setupStopwatches (TEST_CTX *testCtx /*< [IN] A pointer to the test context */
)
{
uint32_t ii;
THREAD_CTX *threadCtx;
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < testCtx->num_threads; ii++, threadCtx++)
{
stringstream descriptionInsertStart;
descriptionInsertStart << "Insert row - Start transaction for thread " << ii;
threadCtx->swInsertStart = new SW (descriptionInsertStart.str ());
stringstream descriptionInsert;
descriptionInsert << "Insert row - Insert one row for thread " << ii;
threadCtx->swInsert = new SW (descriptionInsert.str ());
stringstream descriptionInsertEnd;
descriptionInsertEnd << "Insert row - End transaction for thread " << ii;
threadCtx->swInsertEnd = new SW (descriptionInsertEnd.str ());
stringstream descriptionReadStart;
descriptionReadStart << "Read row - Start transaction for thread " << ii;
threadCtx->swReadStart = new SW (descriptionReadStart.str ());
stringstream descriptionReadMove;
descriptionReadMove << "Read row - Move to a row by primary key for thread " << ii;
threadCtx->swReadMove = new SW (descriptionReadMove.str ());
stringstream descriptionRead;
descriptionRead << "Read row - Read one row for thread " << ii;
threadCtx->swRead = new SW (descriptionRead.str ());
stringstream descriptionReadNext;
descriptionReadNext << "Read row - Navigato to next row for thread " << ii;
threadCtx->swReadNext = new SW (descriptionReadNext.str ());
stringstream descriptionReadEnd;
descriptionReadEnd << "Read row - End transaction for thread " << ii;
threadCtx->swReadEnd = new SW (descriptionReadEnd.str ());
}
}
/* \brief Report stopwatches
*
* Iterate through the context for each thread and delete the
* stopwatches and their measurements will be reported.
*/
static void reportStopwatches (TEST_CTX *testCtx /*< [IN] A pointer to the test context */
)
{
uint32_t ii;
THREAD_CTX *threadCtx;
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < testCtx->num_threads; ii++, threadCtx++)
{
delete threadCtx->swInsertStart;
}
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < testCtx->num_threads; ii++, threadCtx++)
{
delete threadCtx->swInsert;
}
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < testCtx->num_threads; ii++, threadCtx++)
{
delete threadCtx->swInsertEnd;
}
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < testCtx->num_threads; ii++, threadCtx++)
{
delete threadCtx->swReadStart;
}
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < testCtx->num_threads; ii++, threadCtx++)
{
delete threadCtx->swReadMove;
}
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < testCtx->num_threads; ii++, threadCtx++)
{
delete threadCtx->swRead;
}
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < testCtx->num_threads; ii++, threadCtx++)
{
delete threadCtx->swReadNext;
}
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < testCtx->num_threads; ii++, threadCtx++)
{
delete threadCtx->swReadEnd;
}
}
/* \brief Setup TFSs and open databases
*
* Iterate through the context for each thread and set up their TFS
* and open the database.
*/
static RDM_RETCODE setupTFSsAndOpenDatabases (TEST_CTX *testCtx /*< [IN] A pointer to the test context */
)
{
uint32_t ii;
THREAD_CTX *threadCtx;
for (ii = 0, threadCtx = testCtx->threadCtxs; rc == sOKAY && ii < testCtx->num_threads; ii++, threadCtx++)
{
rc = openDatabase_thread (threadCtx);
}
return rc;
}
/*
* \brief Do the actual work by spawingan and running a number of threads
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE doWorkByStartingANumberOfThreads (TEST_CTX *testCtx, /*< [IN] A pointer to the test context */
uint32_t *pNumThreadsStarted /*< [OUT] The number of threads that have started */
)
{
uint32_t ii;
uint32_t numThreadsStarted = 0;
THREAD_CTX *threadCtx;
for (ii = 0, threadCtx = testCtx->threadCtxs; rc == sOKAY && ii < testCtx->num_threads; ii++, threadCtx++)
{
rc = psp_threadBegin (&threadCtx->thread, doWork_thread, 0, threadCtx, 0, "");
if (rc == sOKAY)
{
numThreadsStarted++;
}
else
{
break;
}
}
*pNumThreadsStarted = numThreadsStarted;
return rc;
}
/*
* \brief Join threads
*
* Join the threads and terminate the stop watches for them to write
* out performance numbers.
*/
static RDM_RETCODE joinThreads (TEST_CTX *testCtx, /*< [IN] A pointer to the test context */
uint32_t numThreadsStarted /*< The number of threads that was started */
)
{
uint32_t ii;
THREAD_CTX *threadCtx;
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < numThreadsStarted; ii++, threadCtx++)
{
if (rc == sOKAY)
{
rc = psp_threadJoin (&threadCtx->thread);
}
else
{
psp_threadJoin (&threadCtx->thread);
}
}
return rc;
}
/*
* \brief Display the number of readings stored in all databases
*
* Determine the number of readings stored across all of the databases
* in a test run.
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE displayTotalReadings (
const TEST_CTX *testCtx /*< [IN] A pointer to the test context */
)
{
uint64_t numReadings = 0;
for (uint32_t ii = 0; ii < testCtx->num_threads; ii++)
{
numReadings += testCtx->threadCtxs[ii].numReadings;
}
cout << "Total number of readings in the database: " << numReadings << endl;
return sOKAY;
}
/*
* \brief Do the actual work by using a number of threads
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE doWorkByUsingANumberOfThreads (TEST_CTX *testCtx /*< [IN] A pointer to the test context */
)
{
{
uint32_t numInsertThreadsStarted = 0;
SW sw ("Complete run of all the threads");
sw.start();
setupStopwatches (testCtx);
rc = setupTFSsAndOpenDatabases (testCtx);
if (rc == sOKAY)
{
rc = doWorkByStartingANumberOfThreads (testCtx, &numInsertThreadsStarted);
if (rc == sOKAY)
{
rc = joinThreads (testCtx, numInsertThreadsStarted);
}
else
{
joinThreads (testCtx, numInsertThreadsStarted);
}
}
reportStopwatches (testCtx);
sw.stop();
}
if (rc == sOKAY)
{
displayTotalReadings (testCtx);
}
return rc;
}
/*
* \brief Allocate and initialize the required TFS handles for a test run
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE setupTFS (TEST_CTX *ctx /*< [IN] A pointer to the test context */
)
{
RDM_TFS tfs;
rc = allocAndInitTFS (&tfs);
if (rc == sOKAY)
{
ctx->tfs = tfs;
}
return rc;
}
/*
* \brief Cleanup the TFS handles for a test run
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE cleanupTFS (TEST_CTX *testCtx /*< [IN] A pointer to the test context */
)
{
rc = rdm_tfsFree (testCtx->tfs);
return rc;
}
/*
* \brief Display the settings for a test run to stdout
*/
static void displayTestSettings (const TEST_CTX *testCtx /*< [IN] Pointer to the test context */
)
{
cout << "Run with these parameters:" << endl;
cout << " --trans-size=" << testCtx->num_rows_per_trans << endl;
cout << " --readings=" << testCtx->num_readings << endl;
cout << " --consumers=" << testCtx->num_consumers << endl;
cout << " --producers=" << testCtx->num_producers << endl;
cout << " threads=" << testCtx->num_threads << " (calculated)" << endl;
cout << " readings_per_row=" << testCtx->num_readings_per_row << " (calculated)" << endl;
cout << " total_rows_to_insert=" << testCtx->num_rows << " (calculated)" << endl;
cout << " total_readings_to_insert=" << testCtx->num_rows * testCtx->num_readings_per_row << " (calculated)" << endl;
cout << " --durability=" << testCtx->durability << endl;
}
/* \brief Initialize the test
*
* Parse the command line and initialize the test and some aspects of
* the context for each thread.
*
* \returns A RDM_RETCODE value
*
* \retval sOKAY on success
*/
static RDM_RETCODE initializeTest (int32_t argc, /*< [IN] The number of command line arguments */
const char *const *argv, /*< [IN] The command line arguments */
TEST_CTX *testCtx /*< [OUT] The test context */
)
{
int32_t ii;
char flag;
READINGS tableData;
const char *optarg;
const RDM_CMDLINE_OPT opts[] = {
{"d", "durability", "s=mode", "Transaction durability mode"},
{"t", "trans-size", "i=#", "Number of rows to insert in each transaction"},
{"r", "readings", "i=#", "Total number of readings to insert"},
{"c", "consumers", "i=#", "The number of consumers that are reading rows"},
{"p", "producers", "i=#", "The number of producers that are inserting rows"},
{NULL, NULL, NULL, NULL}};
uint32_t cpuCount = psp_envCPUCount ();
uint32_t num_xxxxucers = cpuCount > MAX_THREADS ? MAX_THREADS : cpuCount;
/* Set the default values for a test run */
testCtx->num_producers = num_xxxxucers / 2;
testCtx->num_consumers = num_xxxxucers / 2;
testCtx->num_readings = 100000000;
testCtx->num_rows_per_trans = 1000;
testCtx->openMode = RDM_OPEN_SHARED;
testCtx->durability = "CONSISTENT";
testCtx->num_readings_per_row = sizeof (tableData.data) / sizeof (reading);
/* Process */
rc = rdm_cmdlineInit (&cmd, argc, argv, "Performance example with a number of producers "
"and consumers. There is one database assigned to each producer "
"and the consumers reads from these databases using a union.",opts);
if (rc == sOKAY)
{
char opt;
while ((opt = rdm_cmdlineNextShortOption (&cmd, &optarg)) != '\0')
{
switch (opt)
{
case 'd':
testCtx->durability = optarg;
break;
case 'r':
testCtx->num_readings = atoi (optarg);
break;
case 'c':
testCtx->num_consumers = atoi (optarg);
break;
case 'p':
testCtx->num_producers = atoi (optarg);
break;
case 't':
testCtx->num_rows_per_trans = atoi (optarg);
break;
}
}
}
if (rc == sOKAY)
{
/* Allocate a thread context for each thread */
testCtx->num_threads = testCtx->num_consumers + testCtx->num_producers;
if (testCtx->num_threads > MAX_THREADS)
{
cerr << "error: Too many threads specified. Max value is " << MAX_THREADS << endl;
rc = eRANGE;
}
}
if (rc == sOKAY)
{
THREAD_CTX *threadCtx;
/* Initialize each thread context */
testCtx->num_rows = testCtx->num_readings / testCtx->num_readings_per_row;
if (testCtx->num_readings % testCtx->num_readings_per_row)
{
testCtx->num_rows++;
}
for (ii = 0, threadCtx = testCtx->threadCtxs; ii < (int32_t) testCtx->num_threads; ii++, threadCtx++)
{
threadCtx->testCtx = testCtx;
threadCtx->threadNum = ii;
if (ii < testCtx->num_producers)
{
threadCtx->consumer = RDM_FALSE;
}
else
{
threadCtx->consumer = RDM_TRUE;
}
}
/* Display test settings */
displayTestSettings (testCtx);
}
return rc;
}
/*
* \brief The main function for the example
*
* \returns A 32-bit integer value
*
* \retval 0 on success
* \retval 1 on failure
*/
int main_cpp55 (int argc, const char *const *argv)
{
static TEST_CTX testCtx;
rc = initializeTest (argc, argv, &testCtx);
if (rc == sOKAY)
{
rc = setupTFS (&testCtx);
}
if (rc == sOKAY)
{
if (rc == sOKAY)
{
rc = doWorkByUsingANumberOfThreads (&testCtx);
}
if (rc == sOKAY)
{
rc = cleanupTFS (&testCtx);
}
else
{
(void) cleanupTFS (&testCtx);
}
}
if (rc != sOKAY)
{
cerr << "Error " << rdm_retcodeGetName (rc) << " (" << rc << ") - " << rdm_retcodeGetDescription (rc) << endl;
}
return (rc == sOKAY) ? 0 : 1;
}
Definition: cpp50Example/rand.h:29
const char * rdm_retcodeGetName(RDM_RETCODE retcode)
Get the mnemonic name for an error or status code.
RDM_RETCODE rdm_cursorMoveToNext(RDM_CURSOR cursor)
Position a cursor to the next row in the collection.
Header for the native RDM Runtime API.
@ eRANGE
Definition: rdmretcodetypes.h:142
RDM_RETCODE rdm_rdmAllocTFS(RDM_TFS *phTFS)
Allocate a TFS handle.
@ RDM_FALSE
Definition: psptypes.h:59
void start(void)
Definition: cpp50Example/sw.h:60
RDM_RETCODE rdm_dbEnd(RDM_DB db)
End a transactional operation.
struct RDM_CURSOR_S * RDM_CURSOR
Definition: rdmtypes.h:306
RDM_RETCODE rdm_dbStartRead(RDM_DB db, const RDM_TABLE_ID *tableIds, uint32_t numTableIds, RDM_TRANS *pTrans)
Get read locks.
RDM_RETCODE rdm_cursorReadRow(RDM_CURSOR cursor, void *colValues, size_t bytesIn, size_t *bytesOut)
Read all columns from a row.
void stop(void)
Definition: cpp50Example/sw.h:72
The buffer used by the command line parser to hold state information.
Definition: rdmcmdlinetypes.h:85
#define RDM_UINT64_MAX
Definition: psptypes.h:108
RDM_RETCODE rdm_cursorGetCount(RDM_CURSOR cursor, uint64_t *count)
Get the row count for a cursor.
RDM_RETCODE rdm_dbClose(RDM_DB db)
Close the database associated with a database handle.
RDM_RETCODE rdm_dbGetRowsByKey(RDM_DB db, RDM_KEY_ID keyId, RDM_CURSOR *pCursor)
Associate an RDM_CURSOR with a row set based on a key.
RDM_RETCODE rdm_dbSetCatalog(RDM_DB db, const char *catalog)
Associate a catalog with an allocated database.
uint32_t RDM_TABLE_ID
Definition: rdmtypes.h:27
RDM_RETCODE rdm_dbFree(RDM_DB db)
Free a database handle.
RDM_RETCODE rdm_dbOpen(RDM_DB db, const char *dbNameSpec, RDM_OPEN_MODE mode)
Open an existing RDM database using the specified database handle.
#define print_error(rc)
Definition: example_fcns.h:17
@ sOKAY
Definition: rdmretcodetypes.h:97
@ RDM_TRUE
Definition: psptypes.h:60
RDM_EXPORT void psp_threadSleep(uint32_t recMillisecs)
Generic usage function option record.
Definition: rdmcmdlinetypes.h:32
RDM_RETCODE rdm_cursorFree(RDM_CURSOR cursor)
Free an RDM_CURSOR.
RDM_RETCODE rdm_dbSetOption(RDM_DB db, const char *keyword, const char *strValue)
Set a single RDM option from a string.
RDM_OPEN_MODE
Enumeration for open modes.
Definition: rdmtypes.h:251
@ RDM_OPEN_READONLY
Definition: rdmtypes.h:255
RDM_RETCODE rdm_tfsAllocDatabase(RDM_TFS tfs, RDM_DB *pDb)
Allocate memory for a new RDM db.
RDM_EXPORT RDM_RETCODE psp_threadJoin(PSP_THREAD_PTR_T)
#define RDM_STARTUP_EXAMPLE(name)
Definition: rdmstartuptypes.h:73
RDM_EXPORT RDM_RETCODE psp_threadBegin(PSP_THREAD_PTR_T pThread, PSP_THREAD_FCN fcn, uint32_t stacksize, void *arg, int32_t priority, const char *name)
RDM_RETCODE rdm_dbStartSnapshot(RDM_DB db, const RDM_TABLE_ID *tableIds, uint32_t numTableIds, RDM_TRANS *pTrans)
Start a snapshot.
RDM_RETCODE rdm_tfsSetOption(RDM_TFS tfs, const char *keyword, const char *strValue)
Set a single TFS option from a string.
RDM_RETCODE rdm_dbGetRows(RDM_DB db, RDM_TABLE_ID tableId, RDM_CURSOR *pCursor)
Associate an RDM_CURSOR with rows based on a table id.
const char * rdm_retcodeGetDescription(RDM_RETCODE retcode)
Invoke RDM error handler.
RDM_RETCODE rdm_dbEndRollback(RDM_DB db)
End and rollback a transactional operation.
RDM_BOOL_T
Definition: psptypes.h:58
@ RDM_OPEN_SHARED
Definition: rdmtypes.h:253
Definition: cpp50Example/sw.h:35
RDM_RETCODE rdm_tfsDropDatabase(RDM_TFS tfs, const char *dbNameSpec)
Drop the specified database.
RDM_RETCODE rdm_dbInsertRow(RDM_DB db, RDM_TABLE_ID tableId, const void *colValues, size_t bytesIn, RDM_CURSOR *pCursor)
Insert a new row into a table at the specified rowId.
#define RDM_LOCK_NONE
Definition: rdmtypes.h:168
RDM_RETCODE rdm_cursorMoveToKey(RDM_CURSOR cursor, RDM_KEY_ID keyId, const void *keyValue, size_t bytesIn)
Position a cursor based on a key value.
struct RDM_TFS_S * RDM_TFS
RDM TFS Handle.
Definition: rdmtfstypes.h:21
RDM_RETCODE rdm_cmdlineInit(RDM_CMDLINE *cmd, int32_t argc, const char *const argv[], const char *description, const RDM_CMDLINE_OPT *opts)
Initialize an RDM_CMDLINE buffer and validate the command line.
struct RDM_DB_S * RDM_DB
Definition: rdmtypes.h:305
RDM_RETCODE rdm_tfsFree(RDM_TFS hTFS)
Terminate a TFS service.
void print_errorEx(int rc, const char *file, int line)
char rdm_cmdlineNextShortOption(RDM_CMDLINE *cmd, const char **arg)
Get next option or argument.
RDM_RETCODE rdm_dbStartUpdate(RDM_DB db, const RDM_TABLE_ID *writeTableIds, uint32_t numWriteTableIds, const RDM_TABLE_ID *readTableIds, uint32_t numReadTableIds, RDM_TRANS *pTrans)
Get write locks.
@ sENDOFCURSOR
Definition: rdmretcodetypes.h:58
Internal RDM Startup API used by startup macros.
RDM_RETCODE
RDM status and error return codes.
Definition: rdmretcodetypes.h:43
RDM_RETCODE rdm_tfsInitialize(RDM_TFS tfs)
Initialize a RDM_TFS instance.