summaryrefslogtreecommitdiff
path: root/plugins/omhiredis/omhiredis.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omhiredis/omhiredis.c')
-rw-r--r--plugins/omhiredis/omhiredis.c102
1 files changed, 82 insertions, 20 deletions
diff --git a/plugins/omhiredis/omhiredis.c b/plugins/omhiredis/omhiredis.c
index 7c45443..051ac0b 100644
--- a/plugins/omhiredis/omhiredis.c
+++ b/plugins/omhiredis/omhiredis.c
@@ -20,7 +20,6 @@
*/
-
#include "config.h"
#include <stdio.h>
#include <string.h>
@@ -49,11 +48,16 @@ MODULE_CNFNAME("omhiredis")
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
+/* our instance data.
+ * this will be accessable
+ * via pData */
typedef struct _instanceData {
- redisContext *conn;
- uchar *server;
- int port;
- uchar *tplName;
+ redisContext *conn; /* redis connection */
+ uchar *server; /* redis server address */
+ int port; /* redis port */
+ uchar *tplName; /* template name */
+ redisReply **replies; /* array to hold replies from redis */
+ int count; /* count of command sent for current batch */
} instanceData;
@@ -78,6 +82,7 @@ CODESTARTisCompatibleWithFeature
iRet = RS_RET_OK;
ENDisCompatibleWithFeature
+/* called when closing */
static void closeHiredis(instanceData *pData)
{
if(pData->conn != NULL) {
@@ -86,7 +91,8 @@ static void closeHiredis(instanceData *pData)
}
}
-
+/* Free our instance data.
+ * TODO: free **replies */
BEGINfreeInstance
CODESTARTfreeInstance
closeHiredis(pData);
@@ -100,7 +106,7 @@ CODESTARTdbgPrintInstInfo
/* nothing special here */
ENDdbgPrintInstInfo
-
+/* establish our connection to redis */
static rsRetVal initHiredis(instanceData *pData, int bSilent)
{
char *server;
@@ -115,55 +121,104 @@ static rsRetVal initHiredis(instanceData *pData, int bSilent)
if(!bSilent)
errmsg.LogError(0, RS_RET_SUSPENDED,
"can not initialize redis handle");
- ABORT_FINALIZE(RS_RET_SUSPENDED);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
-
finalize_it:
RETiRet;
}
rsRetVal writeHiredis(uchar *message, instanceData *pData)
{
- redisReply *reply;
DEFiRet;
+ /* if we do not have a redis connection, call
+ * initHiredis and try to establish one */
if(pData->conn == NULL)
CHKiRet(initHiredis(pData, 0));
- reply = redisCommand(pData->conn, (char*)message);
- if (reply->type == REDIS_REPLY_ERROR) {
- errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", reply->str);
- dbgprintf("omhiredis: %s\n", reply->str);
- freeReplyObject(reply);
+ /* try to append the command to the pipeline.
+ * REDIS_ERR reply indicates something bad
+ * happened, in which case abort. otherwise
+ * increase our current pipeline count
+ * by 1 and continue. */
+ int rc;
+ rc = redisAppendCommand(pData->conn, (char*)message);
+ if (rc == REDIS_ERR) {
+ errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pData->conn->errstr);
+ dbgprintf("omhiredis: %s\n", pData->conn->errstr);
ABORT_FINALIZE(RS_RET_ERR);
} else {
- freeReplyObject(reply);
- }
+ pData->count++;
+ }
finalize_it:
RETiRet;
}
+/* called when resuming from suspended state.
+ * try to restablish our connection to redis */
BEGINtryResume
CODESTARTtryResume
if(pData->conn == NULL)
iRet = initHiredis(pData, 0);
ENDtryResume
+/* begin a transaction. for now does nothing.
+ * if I decide to use MULTI ... EXEC in the
+ * fture, this block should send the
+ * MULTI command to redis. */
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+ dbgprintf("omhiredis: beginTransaction called\n");
+ENDbeginTransaction
+
+/* call writeHiredis for this log line,
+ * which appends it as a command to the
+ * current pipeline */
BEGINdoAction
CODESTARTdoAction
- iRet = writeHiredis(ppString[0], pData);
+ CHKiRet(writeHiredis(ppString[0], pData));
+ iRet = RS_RET_DEFER_COMMIT;
+finalize_it:
ENDdoAction
-
+/* called when we have reached the end of a
+ * batch (queue.dequeuebatchsize). this
+ * iterates over the replies, putting them
+ * into the pData->replies buffer. we currently
+ * don't really bother to check for errors
+ * which should be fixed */
+BEGINendTransaction
+CODESTARTendTransaction
+ dbgprintf("omhiredis: endTransaction called\n");
+ int i;
+ pData->replies = malloc ( sizeof ( redisReply* ) * pData->count );
+ for ( i = 0; i < pData->count; i++ ) {
+ redisGetReply ( pData->conn, (void *)&pData->replies[i] );
+ /* TODO: add error checking here! */
+ free ( pData->replies[i] );
+ }
+ free ( pData->replies );
+ pData->count = 0;
+ENDendTransaction
+
+/* set defaults. note server is set to NULL
+ * and is set to a default in initHiredis if
+ * it is still null when it's called - I should
+ * probable just set the default here instead */
static inline void
setInstParamDefaults(instanceData *pData)
{
pData->server = NULL;
pData->port = 6379;
pData->tplName = NULL;
+ pData->count = 0;
}
+/* here is where the work to set up a new instance
+ * is done. this reads the config options from
+ * the rsyslog conf and takes appropriate setup
+ * actions. */
BEGINnewActInst
struct cnfparamvals *pvals;
int i;
@@ -222,18 +277,25 @@ BEGINmodExit
CODESTARTmodExit
ENDmodExit
-
+/* register our plugin entry points
+ * with the rsyslog core engine */
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES /* supports transaction interface */
ENDqueryEtryPt
+/* note we do not support rsyslog v5 syntax */
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+ if (!bCoreSupportsBatching) {
+ errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION);
ENDmodInit