From daeb0d03d4a65fa118ad25b34958fb9cacbbd6f4 Mon Sep 17 00:00:00 2001 From: Michael Biebl Date: Wed, 4 Jun 2014 22:22:46 +0200 Subject: Imported Upstream version 8.2.2 --- plugins/imdiag/imdiag.c | 10 ++- plugins/imptcp/imptcp.c | 1 + plugins/imudp/imudp.c | 1 + plugins/mmrfc5424addhmac/mmrfc5424addhmac.c | 16 +++++ plugins/mmsnmptrapd/mmsnmptrapd.c | 10 +-- plugins/omgssapi/omgssapi.c | 24 +++++++- plugins/omhdfs/omhdfs.c | 43 ++++++++++--- plugins/ommysql/Makefile.am | 2 +- plugins/ommysql/Makefile.in | 2 +- plugins/ommysql/ommysql.c | 1 - plugins/ommysql/ommysql.h | 31 ---------- plugins/omrabbitmq/omrabbitmq.c | 23 ++++++- plugins/omstdout/omstdout.c | 2 +- plugins/omzmq3/omzmq3.c | 96 +++++++++++++++++------------ 14 files changed, 163 insertions(+), 99 deletions(-) delete mode 100644 plugins/ommysql/ommysql.h (limited to 'plugins') diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 5fdc6ef..0532b5b 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -1,13 +1,11 @@ /* imdiag.c - * This is a diagnostics module, primarily meant for troubleshooting - * and information about the runtime state of rsyslog. It is implemented - * as an input plugin, because that interface best suits our needs - * and also enables us to inject test messages (something not yet - * implemented). + * This is a testbench tool. It started out with a broader scope, + * but we dropped this idea. To learn about rsyslog runtime statistics + * have a look at impstats. * * File begun on 2008-07-25 by RGerhards * - * Copyright 2008-2012 Adiscon GmbH. + * Copyright 2008-2014 Adiscon GmbH. * * This file is part of rsyslog. * diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 920d876..9dc965b 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -1720,6 +1720,7 @@ BEGINactivateCnfPrePrivDrop instanceConf_t *inst; CODESTARTactivateCnfPrePrivDrop iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */ + DBGPRINTF("imptcp: config params iMaxLine %d\n", iMaxLine); runModConf = pModConf; for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 180c45f..61dfc58 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -1042,6 +1042,7 @@ CODESTARTactivateCnf # ifdef HAVE_RECVMMSG lenRcvBuf *= runModConf->batchSize; # endif + DBGPRINTF("imudp: config params iMaxLine %d, lenRcvBuf %d\n", iMaxLine, lenRcvBuf); for(i = 0 ; i < runModConf->wrkrMax ; ++i) { # ifdef HAVE_RECVMMSG CHKmalloc(wrkrInfo[i].recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec))); diff --git a/plugins/mmrfc5424addhmac/mmrfc5424addhmac.c b/plugins/mmrfc5424addhmac/mmrfc5424addhmac.c index 959a8ba..3a58edd 100644 --- a/plugins/mmrfc5424addhmac/mmrfc5424addhmac.c +++ b/plugins/mmrfc5424addhmac/mmrfc5424addhmac.c @@ -80,6 +80,10 @@ typedef struct _instanceData { const EVP_MD *algo; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -129,6 +133,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature @@ -139,6 +148,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + static inline void setInstParamDefaults(instanceData *pData) { @@ -322,6 +336,7 @@ hashMsg(instanceData *pData, msg_t *pMsg) BEGINdoAction + instanceData *pData = pWrkrData->pData; msg_t *pMsg; CODESTARTdoAction pMsg = (msg_t*) ppString[0]; @@ -364,6 +379,7 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmsnmptrapd/mmsnmptrapd.c b/plugins/mmsnmptrapd/mmsnmptrapd.c index 3547dcf..456eb79 100644 --- a/plugins/mmsnmptrapd/mmsnmptrapd.c +++ b/plugins/mmsnmptrapd/mmsnmptrapd.c @@ -68,8 +68,8 @@ struct severMap_s { typedef struct _instanceData { uchar *pszTagName; - uchar *pszTagID; /* chaced: name plus trailing shlash (for compares) */ - int lenTagID; /* cached length of tag ID, for performance reasons */ + uchar *pszTagID; /* cached: name plus trailing shlash (for compares) */ + int lenTagID; /* cached: length of tag ID, for performance reasons */ struct severMap_s *severMap; } instanceData; @@ -202,7 +202,6 @@ getTagComponent(uchar *tag, uchar *dst, int *lenDst) ++i; } dst[i] = '\0'; -dbgprintf("XXXX: getTagComponent dst on output: '%s', len %d\n", dst, i); *lenDst = i; done: return i; @@ -241,7 +240,6 @@ BEGINdoAction CODESTARTdoAction pData = pWrkrData->pData; pMsg = (msg_t*) ppString[0]; - dbgprintf("XXXX: mmsnmptrapd called with pMsg %p\n", pMsg); getTAG(pMsg, &pszTag, &lenTAG); if(strncmp((char*)pszTag, (char*)pData->pszTagID, pData->lenTagID)) { DBGPRINTF("tag '%s' not matching, mmsnmptrapd ignoring this message\n", @@ -250,18 +248,16 @@ CODESTARTdoAction } lenSever = sizeof(pszSever); -dbgprintf("XXXX: pszTag: '%s', lenID %d\n", pszTag, pData->lenTagID); getTagComponent(pszTag+pData->lenTagID-1, pszSever, &lenSever); lenHost = sizeof(pszHost); getTagComponent(pszTag+pData->lenTagID+lenSever, pszHost, &lenHost); - dbgprintf("XXXX: mmsnmptrapd sever '%s'(%d), host '%s'(%d)\n", pszSever, lenSever, pszHost,lenHost); + DBGPRINTF("mmsnmptrapd: sever '%s'(%d), host '%s'(%d)\n", pszSever, lenSever, pszHost,lenHost); if(pszHost[lenHost-1] == ':') { pszHost[lenHost-1] = '\0'; --lenHost; } sevCode = lookupSeverityCode(pData, pszSever); -dbgprintf("XXXX: severity for message is %d\n", sevCode); /* now apply new settings */ MsgSetTAG(pMsg, pData->pszTagName, pData->lenTagID); MsgSetHOSTNAME(pMsg, pszHost, lenHost); diff --git a/plugins/omgssapi/omgssapi.c b/plugins/omgssapi/omgssapi.c index 818a7cf..25400c7 100644 --- a/plugins/omgssapi/omgssapi.c +++ b/plugins/omgssapi/omgssapi.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * Copyright 2007, 2008 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2014 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -88,6 +88,10 @@ typedef struct _instanceData { OM_uint32 gss_flags; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* config data */ typedef enum gss_mode_e { @@ -101,6 +105,7 @@ static struct configSettings_s { gss_mode_t gss_mode; } cs; +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; /* get the syslog forward port from selector_t. The passed in * struct must be one that is setup for forwarding. @@ -122,6 +127,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -163,6 +173,9 @@ CODESTARTfreeInstance free(pData->f_hname); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -379,14 +392,19 @@ static rsRetVal doTryResume(instanceData *pData) BEGINtryResume CODESTARTtryResume - iRet = doTryResume(pData); + pthread_mutex_lock(&mutDoAct); + iRet = doTryResume(pWrkrData->pData); + pthread_mutex_unlock(&mutDoAct); ENDtryResume BEGINdoAction char *psz = NULL; /* temporary buffering */ register unsigned l; int iMaxLine; + instanceData *pData; CODESTARTdoAction + pthread_mutex_lock(&mutDoAct); + pData = pWrkrData->pData; switch (pData->eDestState) { case eDestFORW_SUSP: dbgprintf("internal error in omgssapi.c, eDestFORW_SUSP in doAction()!\n"); @@ -465,6 +483,7 @@ finalize_it: free(psz); } # endif + pthread_mutex_unlock(&mutDoAct); ENDdoAction @@ -656,6 +675,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index f8a7e73..e173fb3 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * Copyright 2010 Rainer Gerhards and Adiscon GmbH. + * Copyright 2010-2014 Rainer Gerhards and Adiscon GmbH. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -36,7 +36,12 @@ #include #include #include -#include +#ifdef HAVE_HDFS_H +# include +#endif +#ifdef HAVE_HADOOP_HDFS_H +# include +#endif #include "syslogd-types.h" #include "srUtils.h" @@ -51,7 +56,7 @@ MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP -MODULE_CNFNAME("omhdfs") +/* MODULE_CNFNAME("omhdfs") we need this only when we convert the module to v2 config system */ /* internal structures */ @@ -60,6 +65,7 @@ DEFobjCurrIf(errmsg) /* global data */ static struct hashtable *files; /* holds all file objects that we know */ +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; typedef struct configSettings_s { uchar *fileName; @@ -69,11 +75,6 @@ typedef struct configSettings_s { } configSettings_t; static configSettings_t cs; - -BEGINinitConfVars /* (re)set config variables to default values */ -CODESTARTinitConfVars -ENDinitConfVars - typedef struct { uchar *name; hdfsFS fs; @@ -91,6 +92,10 @@ typedef struct _instanceData { unsigned offsBuf; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* forward definitions (down here, need data types) */ static inline rsRetVal fileClose(file_t *pFile); @@ -387,6 +392,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINfreeInstance CODESTARTfreeInstance if(pData->pFile != NULL) @@ -394,8 +404,15 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume + instanceData *pData = pWrkrData->pData; CODESTARTtryResume + pthread_mutex_lock(&mutDoAct); fileClose(pData->pFile); fileOpen(pData->pFile); if(pData->pFile->fh == NULL){ @@ -403,6 +420,7 @@ CODESTARTtryResume pData->pFile->name); iRet = RS_RET_SUSPENDED; } + pthread_mutex_unlock(&mutDoAct); ENDtryResume @@ -413,20 +431,26 @@ ENDbeginTransaction BEGINdoAction + instanceData *pData = pWrkrData->pData; CODESTARTdoAction DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name); + pthread_mutex_lock(&mutDoAct); iRet = addData(pData, ppString[0]); -dbgprintf("omhdfs: done doAction\n"); + DBGPRINTF("omhdfs: done doAction\n"); + pthread_mutex_unlock(&mutDoAct); ENDdoAction BEGINendTransaction + instanceData *pData = pWrkrData->pData; CODESTARTendTransaction dbgprintf("omhdfs: endTransaction\n"); + pthread_mutex_lock(&mutDoAct); if(pData->offsBuf != 0) { DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n"); iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf); } + pthread_mutex_unlock(&mutDoAct); ENDendTransaction @@ -526,6 +550,7 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_doHUP ENDqueryEtryPt diff --git a/plugins/ommysql/Makefile.am b/plugins/ommysql/Makefile.am index e253b9d..f621a1b 100644 --- a/plugins/ommysql/Makefile.am +++ b/plugins/ommysql/Makefile.am @@ -1,6 +1,6 @@ pkglib_LTLIBRARIES = ommysql.la -ommysql_la_SOURCES = ommysql.c ommysql.h +ommysql_la_SOURCES = ommysql.c ommysql_la_CPPFLAGS = $(RSRT_CFLAGS) $(MYSQL_CFLAGS) $(PTHREADS_CFLAGS) ommysql_la_LDFLAGS = -module -avoid-version ommysql_la_LIBADD = $(MYSQL_LIBS) diff --git a/plugins/ommysql/Makefile.in b/plugins/ommysql/Makefile.in index 794736a..86efbad 100644 --- a/plugins/ommysql/Makefile.in +++ b/plugins/ommysql/Makefile.in @@ -384,7 +384,7 @@ top_build_prefix = @top_build_prefix@ top_builddir = @top_builddir@ top_srcdir = @top_srcdir@ pkglib_LTLIBRARIES = ommysql.la -ommysql_la_SOURCES = ommysql.c ommysql.h +ommysql_la_SOURCES = ommysql.c ommysql_la_CPPFLAGS = $(RSRT_CFLAGS) $(MYSQL_CFLAGS) $(PTHREADS_CFLAGS) ommysql_la_LDFLAGS = -module -avoid-version ommysql_la_LIBADD = $(MYSQL_LIBS) diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c index c004d1c..d63297d 100644 --- a/plugins/ommysql/ommysql.c +++ b/plugins/ommysql/ommysql.c @@ -38,7 +38,6 @@ #include "syslogd-types.h" #include "srUtils.h" #include "template.h" -#include "ommysql.h" #include "module-template.h" #include "errmsg.h" #include "cfsysline.h" diff --git a/plugins/ommysql/ommysql.h b/plugins/ommysql/ommysql.h deleted file mode 100644 index d807578..0000000 --- a/plugins/ommysql/ommysql.h +++ /dev/null @@ -1,31 +0,0 @@ -/* omusrmsg.c - * These are the definitions for the build-in MySQL output module. - * - * File begun on 2007-07-13 by RGerhards (extracted from syslogd.c) - * - * Copyright 2007 Rainer Gerhards and Adiscon GmbH. - * - * This file is part of rsyslog. - * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see . - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. - */ -#ifndef OMMYSQL_H_INCLUDED -#define OMMYSQL_H_INCLUDED 1 - -#endif /* #ifndef OMMYSQL_H_INCLUDED */ -/* - * vi:set ai: - */ diff --git a/plugins/omrabbitmq/omrabbitmq.c b/plugins/omrabbitmq/omrabbitmq.c index 7ea7793..8ea7e62 100644 --- a/plugins/omrabbitmq/omrabbitmq.c +++ b/plugins/omrabbitmq/omrabbitmq.c @@ -3,6 +3,7 @@ * This output plugin enables rsyslog to send messages to the RabbitMQ. * * Copyright 2012-2013 Vaclav Tomec + * Copyright 2014 Rainer Gerhards * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -52,6 +53,7 @@ MODULE_CNFNAME("omrabbitmq") DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; typedef struct _instanceData { /* here you need to define all action-specific data. A record of type @@ -72,6 +74,10 @@ typedef struct _instanceData { uchar *tplName; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* tables for interfacing with the v6 config system */ /* action (instance) parameters */ @@ -226,6 +232,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature /* use this to specify if select features are supported by this @@ -254,6 +265,10 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* permits to spit out some debug info */ @@ -270,6 +285,7 @@ ENDdbgPrintInstInfo BEGINtryResume + instanceData *pData = pWrkrData->pData; CODESTARTtryResume /* this is called when an action has been suspended and the * rsyslog core tries to resume it. The action must then @@ -293,14 +309,17 @@ CODESTARTtryResume * not always be the case. */ + pthread_mutex_lock(&mutDoAct); if (pData->conn == NULL) { iRet = initRabbitMQ(pData); } + pthread_mutex_unlock(&mutDoAct); ENDtryResume BEGINdoAction + instanceData *pData = pWrkrData->pData; CODESTARTdoAction /* this is where you receive the message and need to carry out the * action. Data is provided in ppString[i] where 0 <= i <= num of strings @@ -315,6 +334,7 @@ CODESTARTdoAction amqp_bytes_t body_bytes; + pthread_mutex_lock(&mutDoAct); if (pData->conn == NULL) { CHKiRet(initRabbitMQ(pData)); } @@ -330,7 +350,7 @@ CODESTARTdoAction } finalize_it: - + pthread_mutex_unlock(&mutDoAct); ENDdoAction @@ -455,6 +475,7 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES + CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt diff --git a/plugins/omstdout/omstdout.c b/plugins/omstdout/omstdout.c index 210b016..5e63ed7 100644 --- a/plugins/omstdout/omstdout.c +++ b/plugins/omstdout/omstdout.c @@ -156,7 +156,7 @@ CODESTARTdoAction * needs to be more solid. -- rgerhards, 2012-11-28 */ if((r = write(1, toWrite, len)) != (int) len) { /* 1 is stdout! */ - DBGPRINTF("omstdout: error %d writing to stdout[%d]: %s\n", + DBGPRINTF("omstdout: error %d writing to stdout[%zd]: %s\n", r, len, toWrite); } if(pWrkrData->pData->bEnsureLFEnding && toWrite[len-1] != '\n') { diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c index c8552f1..4eb4a37 100644 --- a/plugins/omzmq3/omzmq3.c +++ b/plugins/omzmq3/omzmq3.c @@ -2,26 +2,25 @@ * Copyright 2012 Talksum, Inc * Using the czmq interface to zeromq, we output * to a zmq socket. - - -* -* This program is free software: you can redistribute it and/or -* modify it under the terms of the GNU Lesser General Public License -* as published by the Free Software Foundation, either version 3 of -* the License, or (at your option) any later version. -* -* This program is distributed in the hope that it will be useful, but -* WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -* Lesser General Public License for more details. -* -* You should have received a copy of the GNU Lesser General Public -* License along with this program. If not, see -* . -* -* Author: David Kelly -* -*/ + * Copyright (C) 2014 Rainer Gerhards + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * . + * + * Author: David Kelly + * + */ #include "config.h" @@ -51,6 +50,8 @@ MODULE_CNFNAME("omzmq3") DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; + /* convienent symbols to denote a socket we want to bind vs one we want to just connect to */ @@ -97,6 +98,10 @@ typedef struct _instanceData { uchar* tplName; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* ---------------------------------------------------------------------------- * Static definitions/initializations @@ -309,6 +314,11 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -328,15 +338,26 @@ CODESTARTfreeInstance free(pData->identity); ENDfreeInstance + +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume CODESTARTtryResume - if(NULL == pData->socket) - iRet = initZMQ(pData); + pthread_mutex_lock(&mutDoAct); + if(NULL == pWrkrData->pData->socket) + iRet = initZMQ(pWrkrData->pData); + pthread_mutex_unlock(&mutDoAct); ENDtryResume BEGINdoAction + instanceData *pData = pWrkrData->pData; CODESTARTdoAction -iRet = writeZMQ(ppString[0], pData); + pthread_mutex_lock(&mutDoAct); + iRet = writeZMQ(ppString[0], pData); + pthread_mutex_unlock(&mutDoAct); ENDdoAction @@ -348,10 +369,10 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } -CHKiRet(createInstance(&pData)); -setInstParamDefaults(pData); + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); -CODE_STD_STRING_REQUESTnewActInst(1) + CODE_STD_STRING_REQUESTnewActInst(1) for (i = 0; i < actpblk.nParams; ++i) { if (!pvals[i].bUsed) continue; @@ -423,26 +444,25 @@ CODE_STD_STRING_REQUESTnewActInst(1) ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } -CODE_STD_FINALIZERnewActInst + CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst BEGINparseSelectorAct CODESTARTparseSelectorAct - -/* tell the engine we only want one template string */ -CODE_STD_STRING_REQUESTparseSelectorAct(1) + /* tell the engine we only want one template string */ + CODE_STD_STRING_REQUESTparseSelectorAct(1) if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1)) errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, "omzmq3 supports only v6 config format, use: " "action(type=\"omzmq3\" serverport=...)"); ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); -CODE_STD_FINALIZERparseSelectorAct + CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct BEGINinitConfVars /* (re)set config variables to defaults */ CODESTARTinitConfVars -s_workerThreads = -1; + s_workerThreads = -1; ENDinitConfVars BEGINmodExit @@ -456,8 +476,9 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt -CODEqueryEtryPt_STD_OMOD_QUERIES -CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES + CODEqueryEtryPt_STD_OMOD_QUERIES + CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES + CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt BEGINmodInit() @@ -468,9 +489,6 @@ CODEmodInit_QueryRegCFSLineHdlr INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION); -INITLegCnfVars -CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); + INITLegCnfVars + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); ENDmodInit - - - -- cgit v1.2.3