summaryrefslogtreecommitdiff
path: root/plugins/omzmq3/omzmq3.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omzmq3/omzmq3.c')
-rw-r--r--plugins/omzmq3/omzmq3.c96
1 files changed, 57 insertions, 39 deletions
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c
index c8552f1..4eb4a37 100644
--- a/plugins/omzmq3/omzmq3.c
+++ b/plugins/omzmq3/omzmq3.c
@@ -2,26 +2,25 @@
* Copyright 2012 Talksum, Inc
* Using the czmq interface to zeromq, we output
* to a zmq socket.
-
-
-*
-* This program is free software: you can redistribute it and/or
-* modify it under the terms of the GNU Lesser General Public License
-* as published by the Free Software Foundation, either version 3 of
-* the License, or (at your option) any later version.
-*
-* This program is distributed in the hope that it will be useful, but
-* WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this program. If not, see
-* <http://www.gnu.org/licenses/>.
-*
-* Author: David Kelly
-* <davidk@talksum.com>
-*/
+ * Copyright (C) 2014 Rainer Gerhards
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Author: David Kelly
+ * <davidk@talksum.com>
+ */
#include "config.h"
@@ -51,6 +50,8 @@ MODULE_CNFNAME("omzmq3")
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
+
/* convienent symbols to denote a socket we want to bind
vs one we want to just connect to
*/
@@ -97,6 +98,10 @@ typedef struct _instanceData {
uchar* tplName;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* ----------------------------------------------------------------------------
* Static definitions/initializations
@@ -309,6 +314,11 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -328,15 +338,26 @@ CODESTARTfreeInstance
free(pData->identity);
ENDfreeInstance
+
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINtryResume
CODESTARTtryResume
- if(NULL == pData->socket)
- iRet = initZMQ(pData);
+ pthread_mutex_lock(&mutDoAct);
+ if(NULL == pWrkrData->pData->socket)
+ iRet = initZMQ(pWrkrData->pData);
+ pthread_mutex_unlock(&mutDoAct);
ENDtryResume
BEGINdoAction
+ instanceData *pData = pWrkrData->pData;
CODESTARTdoAction
-iRet = writeZMQ(ppString[0], pData);
+ pthread_mutex_lock(&mutDoAct);
+ iRet = writeZMQ(ppString[0], pData);
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
@@ -348,10 +369,10 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
-CHKiRet(createInstance(&pData));
-setInstParamDefaults(pData);
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
-CODE_STD_STRING_REQUESTnewActInst(1)
+ CODE_STD_STRING_REQUESTnewActInst(1)
for (i = 0; i < actpblk.nParams; ++i) {
if (!pvals[i].bUsed)
continue;
@@ -423,26 +444,25 @@ CODE_STD_STRING_REQUESTnewActInst(1)
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
-CODE_STD_FINALIZERnewActInst
+ CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
BEGINparseSelectorAct
CODESTARTparseSelectorAct
-
-/* tell the engine we only want one template string */
-CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ /* tell the engine we only want one template string */
+ CODE_STD_STRING_REQUESTparseSelectorAct(1)
if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1))
errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
"omzmq3 supports only v6 config format, use: "
"action(type=\"omzmq3\" serverport=...)");
ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
-CODE_STD_FINALIZERparseSelectorAct
+ CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
BEGINinitConfVars /* (re)set config variables to defaults */
CODESTARTinitConfVars
-s_workerThreads = -1;
+ s_workerThreads = -1;
ENDinitConfVars
BEGINmodExit
@@ -456,8 +476,9 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
-CODEqueryEtryPt_STD_OMOD_QUERIES
-CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+ CODEqueryEtryPt_STD_OMOD_QUERIES
+ CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+ CODEqueryEtryPt_STD_OMOD8_QUERIES
ENDqueryEtryPt
BEGINmodInit()
@@ -468,9 +489,6 @@ CODEmodInit_QueryRegCFSLineHdlr
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION);
-INITLegCnfVars
-CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID));
+ INITLegCnfVars
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID));
ENDmodInit
-
-
-