summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2014-06-04 22:22:46 +0200
committerMichael Biebl <biebl@debian.org>2014-06-04 22:22:46 +0200
commitdaeb0d03d4a65fa118ad25b34958fb9cacbbd6f4 (patch)
tree0c0d778c5d2f16d8dcc04e89b44a75e02769fa67 /plugins
parentf1040389ccb2430b9ab2ba3209aa28a62565b721 (diff)
downloadrsyslog-daeb0d03d4a65fa118ad25b34958fb9cacbbd6f4.tar.gz
Imported Upstream version 8.2.2upstream/8.2.2
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imdiag/imdiag.c10
-rw-r--r--plugins/imptcp/imptcp.c1
-rw-r--r--plugins/imudp/imudp.c1
-rw-r--r--plugins/mmrfc5424addhmac/mmrfc5424addhmac.c16
-rw-r--r--plugins/mmsnmptrapd/mmsnmptrapd.c10
-rw-r--r--plugins/omgssapi/omgssapi.c24
-rw-r--r--plugins/omhdfs/omhdfs.c43
-rw-r--r--plugins/ommysql/Makefile.am2
-rw-r--r--plugins/ommysql/Makefile.in2
-rw-r--r--plugins/ommysql/ommysql.c1
-rw-r--r--plugins/ommysql/ommysql.h31
-rw-r--r--plugins/omrabbitmq/omrabbitmq.c23
-rw-r--r--plugins/omstdout/omstdout.c2
-rw-r--r--plugins/omzmq3/omzmq3.c96
14 files changed, 163 insertions, 99 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 5fdc6ef..0532b5b 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -1,13 +1,11 @@
/* imdiag.c
- * This is a diagnostics module, primarily meant for troubleshooting
- * and information about the runtime state of rsyslog. It is implemented
- * as an input plugin, because that interface best suits our needs
- * and also enables us to inject test messages (something not yet
- * implemented).
+ * This is a testbench tool. It started out with a broader scope,
+ * but we dropped this idea. To learn about rsyslog runtime statistics
+ * have a look at impstats.
*
* File begun on 2008-07-25 by RGerhards
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2014 Adiscon GmbH.
*
* This file is part of rsyslog.
*
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 920d876..9dc965b 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -1720,6 +1720,7 @@ BEGINactivateCnfPrePrivDrop
instanceConf_t *inst;
CODESTARTactivateCnfPrePrivDrop
iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */
+ DBGPRINTF("imptcp: config params iMaxLine %d\n", iMaxLine);
runModConf = pModConf;
for(inst = runModConf->root ; inst != NULL ; inst = inst->next) {
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 180c45f..61dfc58 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -1042,6 +1042,7 @@ CODESTARTactivateCnf
# ifdef HAVE_RECVMMSG
lenRcvBuf *= runModConf->batchSize;
# endif
+ DBGPRINTF("imudp: config params iMaxLine %d, lenRcvBuf %d\n", iMaxLine, lenRcvBuf);
for(i = 0 ; i < runModConf->wrkrMax ; ++i) {
# ifdef HAVE_RECVMMSG
CHKmalloc(wrkrInfo[i].recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec)));
diff --git a/plugins/mmrfc5424addhmac/mmrfc5424addhmac.c b/plugins/mmrfc5424addhmac/mmrfc5424addhmac.c
index 959a8ba..3a58edd 100644
--- a/plugins/mmrfc5424addhmac/mmrfc5424addhmac.c
+++ b/plugins/mmrfc5424addhmac/mmrfc5424addhmac.c
@@ -80,6 +80,10 @@ typedef struct _instanceData {
const EVP_MD *algo;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -129,6 +133,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
@@ -139,6 +148,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -322,6 +336,7 @@ hashMsg(instanceData *pData, msg_t *pMsg)
BEGINdoAction
+ instanceData *pData = pWrkrData->pData;
msg_t *pMsg;
CODESTARTdoAction
pMsg = (msg_t*) ppString[0];
@@ -364,6 +379,7 @@ BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmsnmptrapd/mmsnmptrapd.c b/plugins/mmsnmptrapd/mmsnmptrapd.c
index 3547dcf..456eb79 100644
--- a/plugins/mmsnmptrapd/mmsnmptrapd.c
+++ b/plugins/mmsnmptrapd/mmsnmptrapd.c
@@ -68,8 +68,8 @@ struct severMap_s {
typedef struct _instanceData {
uchar *pszTagName;
- uchar *pszTagID; /* chaced: name plus trailing shlash (for compares) */
- int lenTagID; /* cached length of tag ID, for performance reasons */
+ uchar *pszTagID; /* cached: name plus trailing shlash (for compares) */
+ int lenTagID; /* cached: length of tag ID, for performance reasons */
struct severMap_s *severMap;
} instanceData;
@@ -202,7 +202,6 @@ getTagComponent(uchar *tag, uchar *dst, int *lenDst)
++i;
}
dst[i] = '\0';
-dbgprintf("XXXX: getTagComponent dst on output: '%s', len %d\n", dst, i);
*lenDst = i;
done:
return i;
@@ -241,7 +240,6 @@ BEGINdoAction
CODESTARTdoAction
pData = pWrkrData->pData;
pMsg = (msg_t*) ppString[0];
- dbgprintf("XXXX: mmsnmptrapd called with pMsg %p\n", pMsg);
getTAG(pMsg, &pszTag, &lenTAG);
if(strncmp((char*)pszTag, (char*)pData->pszTagID, pData->lenTagID)) {
DBGPRINTF("tag '%s' not matching, mmsnmptrapd ignoring this message\n",
@@ -250,18 +248,16 @@ CODESTARTdoAction
}
lenSever = sizeof(pszSever);
-dbgprintf("XXXX: pszTag: '%s', lenID %d\n", pszTag, pData->lenTagID);
getTagComponent(pszTag+pData->lenTagID-1, pszSever, &lenSever);
lenHost = sizeof(pszHost);
getTagComponent(pszTag+pData->lenTagID+lenSever, pszHost, &lenHost);
- dbgprintf("XXXX: mmsnmptrapd sever '%s'(%d), host '%s'(%d)\n", pszSever, lenSever, pszHost,lenHost);
+ DBGPRINTF("mmsnmptrapd: sever '%s'(%d), host '%s'(%d)\n", pszSever, lenSever, pszHost,lenHost);
if(pszHost[lenHost-1] == ':') {
pszHost[lenHost-1] = '\0';
--lenHost;
}
sevCode = lookupSeverityCode(pData, pszSever);
-dbgprintf("XXXX: severity for message is %d\n", sevCode);
/* now apply new settings */
MsgSetTAG(pMsg, pData->pszTagName, pData->lenTagID);
MsgSetHOSTNAME(pMsg, pszHost, lenHost);
diff --git a/plugins/omgssapi/omgssapi.c b/plugins/omgssapi/omgssapi.c
index 818a7cf..25400c7 100644
--- a/plugins/omgssapi/omgssapi.c
+++ b/plugins/omgssapi/omgssapi.c
@@ -4,7 +4,7 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * Copyright 2007, 2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2014 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -88,6 +88,10 @@ typedef struct _instanceData {
OM_uint32 gss_flags;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* config data */
typedef enum gss_mode_e {
@@ -101,6 +105,7 @@ static struct configSettings_s {
gss_mode_t gss_mode;
} cs;
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
/* get the syslog forward port from selector_t. The passed in
* struct must be one that is setup for forwarding.
@@ -122,6 +127,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -163,6 +173,9 @@ CODESTARTfreeInstance
free(pData->f_hname);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -379,14 +392,19 @@ static rsRetVal doTryResume(instanceData *pData)
BEGINtryResume
CODESTARTtryResume
- iRet = doTryResume(pData);
+ pthread_mutex_lock(&mutDoAct);
+ iRet = doTryResume(pWrkrData->pData);
+ pthread_mutex_unlock(&mutDoAct);
ENDtryResume
BEGINdoAction
char *psz = NULL; /* temporary buffering */
register unsigned l;
int iMaxLine;
+ instanceData *pData;
CODESTARTdoAction
+ pthread_mutex_lock(&mutDoAct);
+ pData = pWrkrData->pData;
switch (pData->eDestState) {
case eDestFORW_SUSP:
dbgprintf("internal error in omgssapi.c, eDestFORW_SUSP in doAction()!\n");
@@ -465,6 +483,7 @@ finalize_it:
free(psz);
}
# endif
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
@@ -656,6 +675,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index f8a7e73..e173fb3 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -4,7 +4,7 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2010-2014 Rainer Gerhards and Adiscon GmbH.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@@ -36,7 +36,12 @@
#include <unistd.h>
#include <sys/file.h>
#include <pthread.h>
-#include <hdfs.h>
+#ifdef HAVE_HDFS_H
+# include <hdfs.h>
+#endif
+#ifdef HAVE_HADOOP_HDFS_H
+# include <hadoop/hdfs.h>
+#endif
#include "syslogd-types.h"
#include "srUtils.h"
@@ -51,7 +56,7 @@
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
-MODULE_CNFNAME("omhdfs")
+/* MODULE_CNFNAME("omhdfs") we need this only when we convert the module to v2 config system */
/* internal structures
*/
@@ -60,6 +65,7 @@ DEFobjCurrIf(errmsg)
/* global data */
static struct hashtable *files; /* holds all file objects that we know */
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
typedef struct configSettings_s {
uchar *fileName;
@@ -69,11 +75,6 @@ typedef struct configSettings_s {
} configSettings_t;
static configSettings_t cs;
-
-BEGINinitConfVars /* (re)set config variables to default values */
-CODESTARTinitConfVars
-ENDinitConfVars
-
typedef struct {
uchar *name;
hdfsFS fs;
@@ -91,6 +92,10 @@ typedef struct _instanceData {
unsigned offsBuf;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* forward definitions (down here, need data types) */
static inline rsRetVal fileClose(file_t *pFile);
@@ -387,6 +392,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINfreeInstance
CODESTARTfreeInstance
if(pData->pFile != NULL)
@@ -394,8 +404,15 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINtryResume
+ instanceData *pData = pWrkrData->pData;
CODESTARTtryResume
+ pthread_mutex_lock(&mutDoAct);
fileClose(pData->pFile);
fileOpen(pData->pFile);
if(pData->pFile->fh == NULL){
@@ -403,6 +420,7 @@ CODESTARTtryResume
pData->pFile->name);
iRet = RS_RET_SUSPENDED;
}
+ pthread_mutex_unlock(&mutDoAct);
ENDtryResume
@@ -413,20 +431,26 @@ ENDbeginTransaction
BEGINdoAction
+ instanceData *pData = pWrkrData->pData;
CODESTARTdoAction
DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name);
+ pthread_mutex_lock(&mutDoAct);
iRet = addData(pData, ppString[0]);
-dbgprintf("omhdfs: done doAction\n");
+ DBGPRINTF("omhdfs: done doAction\n");
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
BEGINendTransaction
+ instanceData *pData = pWrkrData->pData;
CODESTARTendTransaction
dbgprintf("omhdfs: endTransaction\n");
+ pthread_mutex_lock(&mutDoAct);
if(pData->offsBuf != 0) {
DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n");
iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf);
}
+ pthread_mutex_unlock(&mutDoAct);
ENDendTransaction
@@ -526,6 +550,7 @@ BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_doHUP
ENDqueryEtryPt
diff --git a/plugins/ommysql/Makefile.am b/plugins/ommysql/Makefile.am
index e253b9d..f621a1b 100644
--- a/plugins/ommysql/Makefile.am
+++ b/plugins/ommysql/Makefile.am
@@ -1,6 +1,6 @@
pkglib_LTLIBRARIES = ommysql.la
-ommysql_la_SOURCES = ommysql.c ommysql.h
+ommysql_la_SOURCES = ommysql.c
ommysql_la_CPPFLAGS = $(RSRT_CFLAGS) $(MYSQL_CFLAGS) $(PTHREADS_CFLAGS)
ommysql_la_LDFLAGS = -module -avoid-version
ommysql_la_LIBADD = $(MYSQL_LIBS)
diff --git a/plugins/ommysql/Makefile.in b/plugins/ommysql/Makefile.in
index 794736a..86efbad 100644
--- a/plugins/ommysql/Makefile.in
+++ b/plugins/ommysql/Makefile.in
@@ -384,7 +384,7 @@ top_build_prefix = @top_build_prefix@
top_builddir = @top_builddir@
top_srcdir = @top_srcdir@
pkglib_LTLIBRARIES = ommysql.la
-ommysql_la_SOURCES = ommysql.c ommysql.h
+ommysql_la_SOURCES = ommysql.c
ommysql_la_CPPFLAGS = $(RSRT_CFLAGS) $(MYSQL_CFLAGS) $(PTHREADS_CFLAGS)
ommysql_la_LDFLAGS = -module -avoid-version
ommysql_la_LIBADD = $(MYSQL_LIBS)
diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c
index c004d1c..d63297d 100644
--- a/plugins/ommysql/ommysql.c
+++ b/plugins/ommysql/ommysql.c
@@ -38,7 +38,6 @@
#include "syslogd-types.h"
#include "srUtils.h"
#include "template.h"
-#include "ommysql.h"
#include "module-template.h"
#include "errmsg.h"
#include "cfsysline.h"
diff --git a/plugins/ommysql/ommysql.h b/plugins/ommysql/ommysql.h
deleted file mode 100644
index d807578..0000000
--- a/plugins/ommysql/ommysql.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/* omusrmsg.c
- * These are the definitions for the build-in MySQL output module.
- *
- * File begun on 2007-07-13 by RGerhards (extracted from syslogd.c)
- *
- * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
- *
- * This file is part of rsyslog.
- *
- * Rsyslog is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Rsyslog is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
- *
- * A copy of the GPL can be found in the file "COPYING" in this distribution.
- */
-#ifndef OMMYSQL_H_INCLUDED
-#define OMMYSQL_H_INCLUDED 1
-
-#endif /* #ifndef OMMYSQL_H_INCLUDED */
-/*
- * vi:set ai:
- */
diff --git a/plugins/omrabbitmq/omrabbitmq.c b/plugins/omrabbitmq/omrabbitmq.c
index 7ea7793..8ea7e62 100644
--- a/plugins/omrabbitmq/omrabbitmq.c
+++ b/plugins/omrabbitmq/omrabbitmq.c
@@ -3,6 +3,7 @@
* This output plugin enables rsyslog to send messages to the RabbitMQ.
*
* Copyright 2012-2013 Vaclav Tomec
+ * Copyright 2014 Rainer Gerhards
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -52,6 +53,7 @@ MODULE_CNFNAME("omrabbitmq")
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
typedef struct _instanceData {
/* here you need to define all action-specific data. A record of type
@@ -72,6 +74,10 @@ typedef struct _instanceData {
uchar *tplName;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
@@ -226,6 +232,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
/* use this to specify if select features are supported by this
@@ -254,6 +265,10 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
/* permits to spit out some debug info */
@@ -270,6 +285,7 @@ ENDdbgPrintInstInfo
BEGINtryResume
+ instanceData *pData = pWrkrData->pData;
CODESTARTtryResume
/* this is called when an action has been suspended and the
* rsyslog core tries to resume it. The action must then
@@ -293,14 +309,17 @@ CODESTARTtryResume
* not always be the case.
*/
+ pthread_mutex_lock(&mutDoAct);
if (pData->conn == NULL) {
iRet = initRabbitMQ(pData);
}
+ pthread_mutex_unlock(&mutDoAct);
ENDtryResume
BEGINdoAction
+ instanceData *pData = pWrkrData->pData;
CODESTARTdoAction
/* this is where you receive the message and need to carry out the
* action. Data is provided in ppString[i] where 0 <= i <= num of strings
@@ -315,6 +334,7 @@ CODESTARTdoAction
amqp_bytes_t body_bytes;
+ pthread_mutex_lock(&mutDoAct);
if (pData->conn == NULL) {
CHKiRet(initRabbitMQ(pData));
}
@@ -330,7 +350,7 @@ CODESTARTdoAction
}
finalize_it:
-
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
@@ -455,6 +475,7 @@ BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+ CODEqueryEtryPt_STD_OMOD8_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omstdout/omstdout.c b/plugins/omstdout/omstdout.c
index 210b016..5e63ed7 100644
--- a/plugins/omstdout/omstdout.c
+++ b/plugins/omstdout/omstdout.c
@@ -156,7 +156,7 @@ CODESTARTdoAction
* needs to be more solid. -- rgerhards, 2012-11-28
*/
if((r = write(1, toWrite, len)) != (int) len) { /* 1 is stdout! */
- DBGPRINTF("omstdout: error %d writing to stdout[%d]: %s\n",
+ DBGPRINTF("omstdout: error %d writing to stdout[%zd]: %s\n",
r, len, toWrite);
}
if(pWrkrData->pData->bEnsureLFEnding && toWrite[len-1] != '\n') {
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c
index c8552f1..4eb4a37 100644
--- a/plugins/omzmq3/omzmq3.c
+++ b/plugins/omzmq3/omzmq3.c
@@ -2,26 +2,25 @@
* Copyright 2012 Talksum, Inc
* Using the czmq interface to zeromq, we output
* to a zmq socket.
-
-
-*
-* This program is free software: you can redistribute it and/or
-* modify it under the terms of the GNU Lesser General Public License
-* as published by the Free Software Foundation, either version 3 of
-* the License, or (at your option) any later version.
-*
-* This program is distributed in the hope that it will be useful, but
-* WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this program. If not, see
-* <http://www.gnu.org/licenses/>.
-*
-* Author: David Kelly
-* <davidk@talksum.com>
-*/
+ * Copyright (C) 2014 Rainer Gerhards
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Author: David Kelly
+ * <davidk@talksum.com>
+ */
#include "config.h"
@@ -51,6 +50,8 @@ MODULE_CNFNAME("omzmq3")
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
+
/* convienent symbols to denote a socket we want to bind
vs one we want to just connect to
*/
@@ -97,6 +98,10 @@ typedef struct _instanceData {
uchar* tplName;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* ----------------------------------------------------------------------------
* Static definitions/initializations
@@ -309,6 +314,11 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -328,15 +338,26 @@ CODESTARTfreeInstance
free(pData->identity);
ENDfreeInstance
+
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINtryResume
CODESTARTtryResume
- if(NULL == pData->socket)
- iRet = initZMQ(pData);
+ pthread_mutex_lock(&mutDoAct);
+ if(NULL == pWrkrData->pData->socket)
+ iRet = initZMQ(pWrkrData->pData);
+ pthread_mutex_unlock(&mutDoAct);
ENDtryResume
BEGINdoAction
+ instanceData *pData = pWrkrData->pData;
CODESTARTdoAction
-iRet = writeZMQ(ppString[0], pData);
+ pthread_mutex_lock(&mutDoAct);
+ iRet = writeZMQ(ppString[0], pData);
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
@@ -348,10 +369,10 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
-CHKiRet(createInstance(&pData));
-setInstParamDefaults(pData);
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
-CODE_STD_STRING_REQUESTnewActInst(1)
+ CODE_STD_STRING_REQUESTnewActInst(1)
for (i = 0; i < actpblk.nParams; ++i) {
if (!pvals[i].bUsed)
continue;
@@ -423,26 +444,25 @@ CODE_STD_STRING_REQUESTnewActInst(1)
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
-CODE_STD_FINALIZERnewActInst
+ CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
BEGINparseSelectorAct
CODESTARTparseSelectorAct
-
-/* tell the engine we only want one template string */
-CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ /* tell the engine we only want one template string */
+ CODE_STD_STRING_REQUESTparseSelectorAct(1)
if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1))
errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
"omzmq3 supports only v6 config format, use: "
"action(type=\"omzmq3\" serverport=...)");
ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
-CODE_STD_FINALIZERparseSelectorAct
+ CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
BEGINinitConfVars /* (re)set config variables to defaults */
CODESTARTinitConfVars
-s_workerThreads = -1;
+ s_workerThreads = -1;
ENDinitConfVars
BEGINmodExit
@@ -456,8 +476,9 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
-CODEqueryEtryPt_STD_OMOD_QUERIES
-CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+ CODEqueryEtryPt_STD_OMOD_QUERIES
+ CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+ CODEqueryEtryPt_STD_OMOD8_QUERIES
ENDqueryEtryPt
BEGINmodInit()
@@ -468,9 +489,6 @@ CODEmodInit_QueryRegCFSLineHdlr
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION);
-INITLegCnfVars
-CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID));
+ INITLegCnfVars
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID));
ENDmodInit
-
-
-