diff options
Diffstat (limited to 'plugins/omzmq3')
-rw-r--r-- | plugins/omzmq3/omzmq3.c | 96 |
1 files changed, 57 insertions, 39 deletions
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c index c8552f1..4eb4a37 100644 --- a/plugins/omzmq3/omzmq3.c +++ b/plugins/omzmq3/omzmq3.c @@ -2,26 +2,25 @@ * Copyright 2012 Talksum, Inc * Using the czmq interface to zeromq, we output * to a zmq socket. - - -* -* This program is free software: you can redistribute it and/or -* modify it under the terms of the GNU Lesser General Public License -* as published by the Free Software Foundation, either version 3 of -* the License, or (at your option) any later version. -* -* This program is distributed in the hope that it will be useful, but -* WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -* Lesser General Public License for more details. -* -* You should have received a copy of the GNU Lesser General Public -* License along with this program. If not, see -* <http://www.gnu.org/licenses/>. -* -* Author: David Kelly -* <davidk@talksum.com> -*/ + * Copyright (C) 2014 Rainer Gerhards + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: David Kelly + * <davidk@talksum.com> + */ #include "config.h" @@ -51,6 +50,8 @@ MODULE_CNFNAME("omzmq3") DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; + /* convienent symbols to denote a socket we want to bind vs one we want to just connect to */ @@ -97,6 +98,10 @@ typedef struct _instanceData { uchar* tplName; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* ---------------------------------------------------------------------------- * Static definitions/initializations @@ -309,6 +314,11 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -328,15 +338,26 @@ CODESTARTfreeInstance free(pData->identity); ENDfreeInstance + +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume CODESTARTtryResume - if(NULL == pData->socket) - iRet = initZMQ(pData); + pthread_mutex_lock(&mutDoAct); + if(NULL == pWrkrData->pData->socket) + iRet = initZMQ(pWrkrData->pData); + pthread_mutex_unlock(&mutDoAct); ENDtryResume BEGINdoAction + instanceData *pData = pWrkrData->pData; CODESTARTdoAction -iRet = writeZMQ(ppString[0], pData); + pthread_mutex_lock(&mutDoAct); + iRet = writeZMQ(ppString[0], pData); + pthread_mutex_unlock(&mutDoAct); ENDdoAction @@ -348,10 +369,10 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } -CHKiRet(createInstance(&pData)); -setInstParamDefaults(pData); + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); -CODE_STD_STRING_REQUESTnewActInst(1) + CODE_STD_STRING_REQUESTnewActInst(1) for (i = 0; i < actpblk.nParams; ++i) { if (!pvals[i].bUsed) continue; @@ -423,26 +444,25 @@ CODE_STD_STRING_REQUESTnewActInst(1) ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } -CODE_STD_FINALIZERnewActInst + CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst BEGINparseSelectorAct CODESTARTparseSelectorAct - -/* tell the engine we only want one template string */ -CODE_STD_STRING_REQUESTparseSelectorAct(1) + /* tell the engine we only want one template string */ + CODE_STD_STRING_REQUESTparseSelectorAct(1) if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1)) errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, "omzmq3 supports only v6 config format, use: " "action(type=\"omzmq3\" serverport=...)"); ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); -CODE_STD_FINALIZERparseSelectorAct + CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct BEGINinitConfVars /* (re)set config variables to defaults */ CODESTARTinitConfVars -s_workerThreads = -1; + s_workerThreads = -1; ENDinitConfVars BEGINmodExit @@ -456,8 +476,9 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt -CODEqueryEtryPt_STD_OMOD_QUERIES -CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES + CODEqueryEtryPt_STD_OMOD_QUERIES + CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES + CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt BEGINmodInit() @@ -468,9 +489,6 @@ CODEmodInit_QueryRegCFSLineHdlr INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION); -INITLegCnfVars -CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); + INITLegCnfVars + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); ENDmodInit - - - |