diff options
Diffstat (limited to 'plugins/omzmq3/omzmq3.c')
-rw-r--r-- | plugins/omzmq3/omzmq3.c | 304 |
1 files changed, 168 insertions, 136 deletions
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c index ee6756b..4eb4a37 100644 --- a/plugins/omzmq3/omzmq3.c +++ b/plugins/omzmq3/omzmq3.c @@ -2,26 +2,25 @@ * Copyright 2012 Talksum, Inc * Using the czmq interface to zeromq, we output * to a zmq socket. - - -* -* This program is free software: you can redistribute it and/or -* modify it under the terms of the GNU Lesser General Public License -* as published by the Free Software Foundation, either version 3 of -* the License, or (at your option) any later version. -* -* This program is distributed in the hope that it will be useful, but -* WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -* Lesser General Public License for more details. -* -* You should have received a copy of the GNU Lesser General Public -* License along with this program. If not, see -* <http://www.gnu.org/licenses/>. -* -* Author: David Kelly -* <davidk@talksum.com> -*/ + * Copyright (C) 2014 Rainer Gerhards + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: David Kelly + * <davidk@talksum.com> + */ #include "config.h" @@ -51,6 +50,8 @@ MODULE_CNFNAME("omzmq3") DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; + /* convienent symbols to denote a socket we want to bind vs one we want to just connect to */ @@ -97,6 +98,10 @@ typedef struct _instanceData { uchar* tplName; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* ---------------------------------------------------------------------------- * Static definitions/initializations @@ -110,9 +115,10 @@ static zctx_t* s_context = NULL; static int s_workerThreads = -1; static struct socket_type types[] = { - {"PUB", ZMQ_PUB }, - {"PUSH", ZMQ_PUSH }, - {"XPUB", ZMQ_XPUB } + {"PUB", ZMQ_PUB }, + {"PUSH", ZMQ_PUSH }, + {"DEALER", ZMQ_DEALER }, + {"XPUB", ZMQ_XPUB } }; static struct socket_action actions[] = { @@ -201,17 +207,18 @@ static rsRetVal initZMQ(instanceData* pData) { /* create the context if necessary. */ if (NULL == s_context) { + zsys_handler_set(NULL); s_context = zctx_new(); if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads); } pData->socket = zsocket_new(s_context, pData->type); - - /* ALWAYS set the HWM as the zmq3 default is 1000 and we default - to 0 (infinity) */ - zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); - zsocket_set_sndhwm(pData->socket, pData->sndHWM); - + if (NULL == pData->socket) { + errmsg.LogError(0, RS_RET_NO_ERRCODE, + "omzmq3: zsocket_new failed for %s: %s", + pData->description, zmq_strerror(errno)); + ABORT_FINALIZE(RS_RET_NO_ERRCODE); + } /* use czmq defaults for these, unless set to non-default values */ if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity); if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf); @@ -228,17 +235,26 @@ static rsRetVal initZMQ(instanceData* pData) { if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax); if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only); if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity); - + if(pData->rcvHWM > -1) zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); + if(pData->sndHWM > -1) zsocket_set_sndhwm(pData->socket, pData->sndHWM); + /* bind or connect to it */ if (pData->action == ACTION_BIND) { /* bind asserts, so no need to test return val here which isn't the greatest api -- oh well */ - zsocket_bind(pData->socket, (char*)pData->description); + if(-1 == zsocket_bind(pData->socket, (char*)pData->description)) { + errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: bind failed for %s: %s", + pData->description, zmq_strerror(errno)); + ABORT_FINALIZE(RS_RET_NO_ERRCODE); + } + DBGPRINTF("omzmq3: bind to %s successful\n",pData->description); } else { - if(zsocket_connect(pData->socket, (char*)pData->description) == -1) { - errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!"); - ABORT_FINALIZE(RS_RET_SUSPENDED); + if(-1 == zsocket_connect(pData->socket, (char*)pData->description)) { + errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: connect failed for %s: %s", + pData->description, zmq_strerror(errno)); + ABORT_FINALIZE(RS_RET_NO_ERRCODE); } + DBGPRINTF("omzmq3: connect to %s successful", pData->description); } finalize_it: RETiRet; @@ -256,7 +272,7 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) { /* whine if things went wrong */ if (result == -1) { - errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result); + errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed: %s", msg, zmq_strerror(errno)); ABORT_FINALIZE(RS_RET_ERR); } finalize_it: @@ -265,13 +281,13 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) { static inline void setInstParamDefaults(instanceData* pData) { - pData->description = (uchar*)"tcp://*:7171"; + pData->description = NULL; pData->socket = NULL; pData->tplName = NULL; pData->type = ZMQ_PUB; pData->action = ACTION_BIND; - pData->sndHWM = 0; /*unlimited*/ - pData->rcvHWM = 0; /*unlimited*/ + pData->sndHWM = -1; + pData->rcvHWM = -1; pData->identity = NULL; pData->sndBuf = -1; pData->rcvBuf = -1; @@ -298,6 +314,11 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -314,136 +335,150 @@ CODESTARTfreeInstance closeZMQ(pData); free(pData->description); free(pData->tplName); + free(pData->identity); ENDfreeInstance + +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume CODESTARTtryResume - if(NULL == pData->socket) - iRet = initZMQ(pData); + pthread_mutex_lock(&mutDoAct); + if(NULL == pWrkrData->pData->socket) + iRet = initZMQ(pWrkrData->pData); + pthread_mutex_unlock(&mutDoAct); ENDtryResume BEGINdoAction + instanceData *pData = pWrkrData->pData; CODESTARTdoAction -iRet = writeZMQ(ppString[0], pData); + pthread_mutex_lock(&mutDoAct); + iRet = writeZMQ(ppString[0], pData); + pthread_mutex_unlock(&mutDoAct); ENDdoAction BEGINnewActInst - struct cnfparamvals *pvals; - int i; + struct cnfparamvals *pvals; + int i; CODESTARTnewActInst -if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { - ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); - } - -CHKiRet(createInstance(&pData)); -setInstParamDefaults(pData); - -CODE_STD_STRING_REQUESTnewActInst(1) -for(i = 0 ; i < actpblk.nParams ; ++i) { - if(!pvals[i].bUsed) - continue; - if(!strcmp(actpblk.descr[i].name, "description")) { - pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "template")) { - pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "sockType")){ - pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); - } else if(!strcmp(actpblk.descr[i].name, "action")){ - pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); - } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { - pData->sndHWM = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { - pData->rcvHWM = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "identity")){ - pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { - pData->sndBuf = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { - pData->rcvBuf = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "linger")) { - pData->linger = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "backlog")) { - pData->backlog = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { - pData->sndTimeout = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { - pData->rcvTimeout = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { - pData->maxMsgSize = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rate")) { - pData->rate = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { - pData->recoveryIVL = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { - pData->multicastHops = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { - pData->reconnectIVL = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { - pData->reconnectIVLMax = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { - pData->ipv4Only = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "affinity")) { - pData->affinity = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { - s_workerThreads = (int) pvals[i].val.d.n; + if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + CODE_STD_STRING_REQUESTnewActInst(1) + for (i = 0; i < actpblk.nParams; ++i) { + if (!pvals[i].bUsed) + continue; + if (!strcmp(actpblk.descr[i].name, "description")) { + pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "sockType")){ + pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if (!strcmp(actpblk.descr[i].name, "action")){ + pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if (!strcmp(actpblk.descr[i].name, "sndHWM")) { + pData->sndHWM = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rcvHWM")) { + pData->rcvHWM = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "identity")){ + pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "sndBuf")) { + pData->sndBuf = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rcvBuf")) { + pData->rcvBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "linger")) { + pData->linger = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "backlog")) { + pData->backlog = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "sndTimeout")) { + pData->sndTimeout = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rcvTimeout")) { + pData->rcvTimeout = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "maxMsgSize")) { + pData->maxMsgSize = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rate")) { + pData->rate = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "recoveryIVL")) { + pData->recoveryIVL = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "multicastHops")) { + pData->multicastHops = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "reconnectIVL")) { + pData->reconnectIVL = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { + pData->reconnectIVLMax = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "ipv4Only")) { + pData->ipv4Only = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "affinity")) { + pData->affinity = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { + s_workerThreads = (int) pvals[i].val.d.n; + } else { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + + if (pData->tplName == NULL) { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup("RSYSLOG_ForwardFormat"), OMSR_NO_RQD_TPL_OPTS)); } else { - errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " - "param '%s'\n", actpblk.descr[i].name); + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); + } + if (NULL == pData->description) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: you didn't enter a description"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if (pData->type == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } - } - -if(pData->tplName == NULL) { - CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); - } else { - CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); - } - -if(pData->type == -1) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } -if(pData->action == -1) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } - - -CODE_STD_FINALIZERnewActInst - cnfparamvalsDestruct(pvals, &actpblk); + if (pData->action == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst BEGINparseSelectorAct CODESTARTparseSelectorAct - -/* tell the engine we only want one template string */ -CODE_STD_STRING_REQUESTparseSelectorAct(1) + /* tell the engine we only want one template string */ + CODE_STD_STRING_REQUESTparseSelectorAct(1) if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1)) errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, "omzmq3 supports only v6 config format, use: " "action(type=\"omzmq3\" serverport=...)"); ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); -CODE_STD_FINALIZERparseSelectorAct + CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct BEGINinitConfVars /* (re)set config variables to defaults */ CODESTARTinitConfVars -s_workerThreads = -1; + s_workerThreads = -1; ENDinitConfVars BEGINmodExit CODESTARTmodExit -if(NULL != s_context) { - zctx_destroy(&s_context); - s_context=NULL; - } + if (NULL != s_context) { + zctx_destroy(&s_context); + s_context=NULL; + } ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt -CODEqueryEtryPt_STD_OMOD_QUERIES -CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES + CODEqueryEtryPt_STD_OMOD_QUERIES + CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES + CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt BEGINmodInit() @@ -454,9 +489,6 @@ CODEmodInit_QueryRegCFSLineHdlr INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION); -INITLegCnfVars -CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); + INITLegCnfVars + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); ENDmodInit - - - |