diff options
Diffstat (limited to 'plugins/omhiredis/omhiredis.c')
-rw-r--r-- | plugins/omhiredis/omhiredis.c | 102 |
1 files changed, 82 insertions, 20 deletions
diff --git a/plugins/omhiredis/omhiredis.c b/plugins/omhiredis/omhiredis.c index 7c45443..051ac0b 100644 --- a/plugins/omhiredis/omhiredis.c +++ b/plugins/omhiredis/omhiredis.c @@ -20,7 +20,6 @@ */ - #include "config.h" #include <stdio.h> #include <string.h> @@ -49,11 +48,16 @@ MODULE_CNFNAME("omhiredis") DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) +/* our instance data. + * this will be accessable + * via pData */ typedef struct _instanceData { - redisContext *conn; - uchar *server; - int port; - uchar *tplName; + redisContext *conn; /* redis connection */ + uchar *server; /* redis server address */ + int port; /* redis port */ + uchar *tplName; /* template name */ + redisReply **replies; /* array to hold replies from redis */ + int count; /* count of command sent for current batch */ } instanceData; @@ -78,6 +82,7 @@ CODESTARTisCompatibleWithFeature iRet = RS_RET_OK; ENDisCompatibleWithFeature +/* called when closing */ static void closeHiredis(instanceData *pData) { if(pData->conn != NULL) { @@ -86,7 +91,8 @@ static void closeHiredis(instanceData *pData) } } - +/* Free our instance data. + * TODO: free **replies */ BEGINfreeInstance CODESTARTfreeInstance closeHiredis(pData); @@ -100,7 +106,7 @@ CODESTARTdbgPrintInstInfo /* nothing special here */ ENDdbgPrintInstInfo - +/* establish our connection to redis */ static rsRetVal initHiredis(instanceData *pData, int bSilent) { char *server; @@ -115,55 +121,104 @@ static rsRetVal initHiredis(instanceData *pData, int bSilent) if(!bSilent) errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize redis handle"); - ABORT_FINALIZE(RS_RET_SUSPENDED); + ABORT_FINALIZE(RS_RET_SUSPENDED); } - finalize_it: RETiRet; } rsRetVal writeHiredis(uchar *message, instanceData *pData) { - redisReply *reply; DEFiRet; + /* if we do not have a redis connection, call + * initHiredis and try to establish one */ if(pData->conn == NULL) CHKiRet(initHiredis(pData, 0)); - reply = redisCommand(pData->conn, (char*)message); - if (reply->type == REDIS_REPLY_ERROR) { - errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", reply->str); - dbgprintf("omhiredis: %s\n", reply->str); - freeReplyObject(reply); + /* try to append the command to the pipeline. + * REDIS_ERR reply indicates something bad + * happened, in which case abort. otherwise + * increase our current pipeline count + * by 1 and continue. */ + int rc; + rc = redisAppendCommand(pData->conn, (char*)message); + if (rc == REDIS_ERR) { + errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pData->conn->errstr); + dbgprintf("omhiredis: %s\n", pData->conn->errstr); ABORT_FINALIZE(RS_RET_ERR); } else { - freeReplyObject(reply); - } + pData->count++; + } finalize_it: RETiRet; } +/* called when resuming from suspended state. + * try to restablish our connection to redis */ BEGINtryResume CODESTARTtryResume if(pData->conn == NULL) iRet = initHiredis(pData, 0); ENDtryResume +/* begin a transaction. for now does nothing. + * if I decide to use MULTI ... EXEC in the + * fture, this block should send the + * MULTI command to redis. */ +BEGINbeginTransaction +CODESTARTbeginTransaction + dbgprintf("omhiredis: beginTransaction called\n"); +ENDbeginTransaction + +/* call writeHiredis for this log line, + * which appends it as a command to the + * current pipeline */ BEGINdoAction CODESTARTdoAction - iRet = writeHiredis(ppString[0], pData); + CHKiRet(writeHiredis(ppString[0], pData)); + iRet = RS_RET_DEFER_COMMIT; +finalize_it: ENDdoAction - +/* called when we have reached the end of a + * batch (queue.dequeuebatchsize). this + * iterates over the replies, putting them + * into the pData->replies buffer. we currently + * don't really bother to check for errors + * which should be fixed */ +BEGINendTransaction +CODESTARTendTransaction + dbgprintf("omhiredis: endTransaction called\n"); + int i; + pData->replies = malloc ( sizeof ( redisReply* ) * pData->count ); + for ( i = 0; i < pData->count; i++ ) { + redisGetReply ( pData->conn, (void *)&pData->replies[i] ); + /* TODO: add error checking here! */ + free ( pData->replies[i] ); + } + free ( pData->replies ); + pData->count = 0; +ENDendTransaction + +/* set defaults. note server is set to NULL + * and is set to a default in initHiredis if + * it is still null when it's called - I should + * probable just set the default here instead */ static inline void setInstParamDefaults(instanceData *pData) { pData->server = NULL; pData->port = 6379; pData->tplName = NULL; + pData->count = 0; } +/* here is where the work to set up a new instance + * is done. this reads the config options from + * the rsyslog conf and takes appropriate setup + * actions. */ BEGINnewActInst struct cnfparamvals *pvals; int i; @@ -222,18 +277,25 @@ BEGINmodExit CODESTARTmodExit ENDmodExit - +/* register our plugin entry points + * with the rsyslog core engine */ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* supports transaction interface */ ENDqueryEtryPt +/* note we do not support rsyslog v5 syntax */ BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + if (!bCoreSupportsBatching) { + errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort"); + ABORT_FINALIZE(RS_RET_ERR); + } DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION); ENDmodInit |