diff options
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r-- | plugins/imudp/imudp.c | 428 |
1 files changed, 331 insertions, 97 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index a5ba6a2..180c45f 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -31,6 +31,8 @@ #include <errno.h> #include <unistd.h> #include <netdb.h> +#include <sys/socket.h> +#include <pthread.h> #if HAVE_SYS_EPOLL_H # include <sys/epoll.h> #endif @@ -59,6 +61,7 @@ MODULE_TYPE_NOKEEP MODULE_CNFNAME("imudp") /* defines */ +#define MAX_WRKR_THREADS 32 /* Module static data */ DEF_IMOD_STATIC_DATA @@ -78,9 +81,11 @@ static struct lstn_s { prop_t *pInputName; statsobj_t *stats; /* listener stats */ ratelimit_t *ratelimiter; + uchar *dfltTZ; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; + static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ @@ -88,11 +93,7 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte * This shall prevent remote DoS when the "discard on disallowed sender" * message is configured to be logged on occurance of such a case. */ -static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc - * it so that we can check available memory in willRun() and request - * termination if we can not get it. -- rgerhards, 2007-12-27 - */ - +#define BATCH_SIZE_DFLT 32 /* do not overdo, has heavy toll on memory, especially with large msgs */ #define TIME_REQUERY_DFLT 2 #define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */ /* config vars for legacy config system */ @@ -110,12 +111,33 @@ struct instanceConf_s { uchar *pszBindRuleset; /* name of ruleset to bind to */ uchar *inputname; ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + uchar *dfltTZ; int ratelimitInterval; int ratelimitBurst; + int rcvbuf; /* 0 means: do not set, keep OS default */ struct instanceConf_s *next; sbool bAppendPortToInpname; }; +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + int id; + thrdInfo_t *pThrd; + statsobj_t *stats; /* worker thread stats */ + STATSCOUNTER_DEF(ctrCall_recvmmsg, mutCtrCall_recvmmsg) + STATSCOUNTER_DEF(ctrCall_recvmsg, mutCtrCall_recvmsg) + STATSCOUNTER_DEF(ctrMsgsRcvd, mutCtrMsgsRcvd) + uchar *pRcvBuf; /* receive buffer (for a single packet) */ +# ifdef HAVE_RECVMMSG + struct sockaddr_storage *frominet; + struct mmsghdr *recvmsg_mmh; + struct iovec *recvmsg_iov; +# endif +} wrkrInfo[MAX_WRKR_THREADS]; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ instanceConf_t *root, *tail; @@ -123,6 +145,8 @@ struct modConfData_s { int iSchedPolicy; /* scheduling policy as SCHED_xxx */ int iSchedPrio; /* scheduling priority */ int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */ + int batchSize; /* max nbr of input batch --> also recvmmsg() max count */ + int8_t wrkrMax; /* max nbr of worker threads */ sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ @@ -132,6 +156,8 @@ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current lo static struct cnfparamdescr modpdescr[] = { { "schedulingpolicy", eCmdHdlrGetWord, 0 }, { "schedulingpriority", eCmdHdlrInt, 0 }, + { "batchsize", eCmdHdlrInt, 0 }, + { "threads", eCmdHdlrPositiveInt, 0 }, { "timerequery", eCmdHdlrInt, 0 } }; static struct cnfparamblk modpblk = @@ -143,12 +169,14 @@ static struct cnfparamblk modpblk = /* input instance parameters */ static struct cnfparamdescr inppdescr[] = { { "port", eCmdHdlrArray, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */ + { "defaulttz", eCmdHdlrString, 0 }, { "inputname", eCmdHdlrGetWord, 0 }, { "inputname.appendport", eCmdHdlrBinary, 0 }, { "address", eCmdHdlrString, 0 }, - { "ruleset", eCmdHdlrString, 0 }, { "ratelimit.interval", eCmdHdlrInt, 0 }, - { "ratelimit.burst", eCmdHdlrInt, 0 } + { "ratelimit.burst", eCmdHdlrInt, 0 }, + { "rcvbufsize", eCmdHdlrSize, 0 }, + { "ruleset", eCmdHdlrString, 0 } }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, @@ -177,6 +205,8 @@ createInstance(instanceConf_t **pinst) inst->bAppendPortToInpname = 0; inst->ratelimitBurst = 10000; /* arbitrary high limit */ inst->ratelimitInterval = 0; /* off */ + inst->rcvbuf = 0; + inst->dfltTZ = NULL; /* node created, let's add to config */ if(loadModConf->tail == NULL) { @@ -252,7 +282,7 @@ addListner(instanceConf_t *inst) DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, inst->pszBindPort); - newSocks = net.create_udp_socket(bindAddr, port, 1); + newSocks = net.create_udp_socket(bindAddr, port, 1, inst->rcvbuf); if(newSocks != NULL) { /* we now need to add the new sockets to the existing set */ /* ready to copy */ @@ -261,14 +291,15 @@ addListner(instanceConf_t *inst) newlcnfinfo->next = NULL; newlcnfinfo->sock = newSocks[iSrc]; newlcnfinfo->pRuleset = inst->pBindRuleset; - snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port); - dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */ - CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, (char*)dispname, NULL)); + newlcnfinfo->dfltTZ = inst->dfltTZ; if(inst->inputname == NULL) { inputname = (uchar*)"imudp"; } else { inputname = inst->inputname; } + snprintf((char*)dispname, sizeof(dispname), "%s(%s:%s)", inputname, bindName, port); + dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, (char*)dispname, NULL)); if(inst->bAppendPortToInpname) { snprintf((char*)inpnameBuf, sizeof(inpnameBuf), "%s%s", inputname, port); @@ -286,7 +317,7 @@ addListner(instanceConf_t *inst) CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname)); STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit); CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), - ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(newlcnfinfo->ctrSubmit))); CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats)); /* link to list. Order must be preserved to take care for * conflicting matches. @@ -318,6 +349,155 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta } +/* This function processes received data. It provides unified handling + * in cases where recvmmsg() is available and not. + */ +static inline rsRetVal +processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, + uchar *rcvBuf, ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, + struct sockaddr_storage *frominet, socklen_t socklen, multi_submit_t *multiSub) +{ + DEFiRet; + msg_t *pMsg; + + assert(pThrd != NULL); + + if(lenRcvBuf == 0) + FINALIZE; /* this looks a bit strange, but practice shows it happens... */ + + /* if we reach this point, we had a good receive and can process the packet received */ + /* check if we have a different sender than before, if so, we need to query some new values */ + if(bDoACLCheck) { + socklen = sizeof(struct sockaddr_storage); + if(net.CmpHost(frominet, frominetPrev, socklen) != 0) { + memcpy(frominetPrev, frominet, socklen); /* update cache indicator */ + /* Here we check if a host is permitted to send us syslog messages. If it isn't, + * we do not further process the message but log a warning (if we are + * configured to do this). However, if the check would require name resolution, + * it is postponed to the main queue. See also my blog post at + * http://blog.gerhards.net/2009/11/acls-imudp-and-accepting-messages.html + * rgerhards, 2009-11-16 + */ + *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP", + (struct sockaddr *)frominet, "", 0); + + if(*pbIsPermitted == 0) { + DBGPRINTF("msg is not from an allowed sender\n"); + if(glbl.GetOption_DisallowWarning) { + time_t tt; + datetime.GetTime(&tt); + if(tt > ttLastDiscard + 60) { + ttLastDiscard = tt; + errmsg.LogError(0, NO_ERRCODE, + "UDP message from disallowed sender discarded"); + } + } + } + } + } else { + *pbIsPermitted = 1; /* no check -> everything permitted */ + } + + DBGPRINTF("recv(%d,%d),acl:%d,msg:%.128s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, rcvBuf); + + if(*pbIsPermitted != 0) { + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); + MsgSetRawMsg(pMsg, (char*)rcvBuf, lenRcvBuf); + MsgSetInputName(pMsg, lstn->pInputName); + MsgSetRuleset(pMsg, lstn->pRuleset); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + if(lstn->dfltTZ != NULL) + MsgSetDfltTZ(pMsg, (char*) lstn->dfltTZ); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; + if(*pbIsPermitted == 2) + pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ + CHKiRet(msgSetFromSockinfo(pMsg, frominet)); + CHKiRet(ratelimitAddMsg(lstn->ratelimiter, multiSub, pMsg)); + STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); + } + +finalize_it: + RETiRet; +} + + + + +/* The following "two" functions are helpers to runInput. Actually, it is + * just one function. Depending on whether or not we have recvmmsg(), + * an appropriate version is compiled (as such we need to maintain both!). + */ +#ifdef HAVE_RECVMMSG +static inline rsRetVal +processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) +{ + DEFiRet; + int iNbrTimeUsed; + time_t ttGenTime; + struct syslogTime stTime; + char errStr[1024]; + msg_t *pMsgs[CONF_NUM_MULTISUB]; + multi_submit_t multiSub; + int nelem; + int i; + + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = CONF_NUM_MULTISUB; + multiSub.nElem = 0; + iNbrTimeUsed = 0; + while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */ + if(pWrkr->pThrd->bShallStop == RSTRUE) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + memset(pWrkr->recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec)); + memset(pWrkr->recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr)); + for(i = 0 ; i < runModConf->batchSize ; ++i) { + pWrkr->recvmsg_iov[i].iov_base = pWrkr->pRcvBuf+(i*(iMaxLine+1)); + pWrkr->recvmsg_iov[i].iov_len = iMaxLine; + pWrkr->recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + pWrkr->recvmsg_mmh[i].msg_hdr.msg_name = &(pWrkr->frominet[i]); + pWrkr->recvmsg_mmh[i].msg_hdr.msg_iov = &(pWrkr->recvmsg_iov[i]); + pWrkr->recvmsg_mmh[i].msg_hdr.msg_iovlen = 1; + } + nelem = recvmmsg(lstn->sock, pWrkr->recvmsg_mmh, runModConf->batchSize, 0, NULL); + STATSCOUNTER_INC(pWrkr->ctrCall_recvmmsg, pWrkr->mutCtrCall_recvmmsg); + DBGPRINTF("imudp: recvmmsg returned %d\n", nelem); + if(nelem < 0 && errno == ENOSYS) { + /* be careful: some versions of valgrind do not support recvmmsg()! */ + DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n"); + nelem = recvmsg(lstn->sock, &(pWrkr->recvmsg_mmh[0].msg_hdr), 0); + STATSCOUNTER_INC(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg); + if(nelem >= 0) { + pWrkr->recvmsg_mmh[0].msg_len = nelem; + nelem = 1; + } + } + if(nelem < 0) { + if(errno != EINTR && errno != EAGAIN) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); + errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr); + } + ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! + } + + if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); + } + + pWrkr->ctrMsgsRcvd += nelem; + for(i = 0 ; i < nelem ; ++i) { + processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pWrkr->recvmsg_mmh[i].msg_hdr.msg_iov->iov_base, + pWrkr->recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &(pWrkr->frominet[i]), + pWrkr->recvmsg_mmh[i].msg_hdr.msg_namelen, &multiSub); + } + } + +finalize_it: + multiSubmitFlush(&multiSub); + RETiRet; +} +#else /* we do not have recvmmsg() */ /* This function is a helper to runInput. I have extracted it * from the main loop just so that we do not have that large amount of code * in a single place. This function takes a socket and pulls messages from @@ -333,108 +513,61 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta * on scheduling order. -- rgerhards, 2008-10-02 */ static inline rsRetVal -processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) +processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { int iNbrTimeUsed; time_t ttGenTime; struct syslogTime stTime; - socklen_t socklen; ssize_t lenRcvBuf; struct sockaddr_storage frominet; - msg_t *pMsg; - prop_t *propFromHost = NULL; - prop_t *propFromHostIP = NULL; multi_submit_t multiSub; msg_t *pMsgs[CONF_NUM_MULTISUB]; char errStr[1024]; + struct msghdr mh; + struct iovec iov[1]; DEFiRet; - assert(pThrd != NULL); multiSub.ppMsgs = pMsgs; multiSub.maxElem = CONF_NUM_MULTISUB; multiSub.nElem = 0; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ - if(pThrd->bShallStop == RSTRUE) + if(pWrkr->pThrd->bShallStop == RSTRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); - socklen = sizeof(struct sockaddr_storage); - lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + memset(iov, 0, sizeof(iov)); + iov[0].iov_base = pWrkr->pRcvBuf; + iov[0].iov_len = iMaxLine; + memset(&mh, 0, sizeof(mh)); + mh.msg_name = &frominet; + mh.msg_namelen = sizeof(struct sockaddr_storage); + mh.msg_iov = iov; + mh.msg_iovlen = 1; + lenRcvBuf = recvmsg(lstn->sock, &mh, 0); + STATSCOUNTER_INC(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); + errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr); } ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } - if(lenRcvBuf == 0) - continue; /* this looks a bit strange, but practice shows it happens... */ - - /* if we reach this point, we had a good receive and can process the packet received */ - /* check if we have a different sender than before, if so, we need to query some new values */ - if(bDoACLCheck) { - if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { - memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ - /* Here we check if a host is permitted to send us syslog messages. If it isn't, - * we do not further process the message but log a warning (if we are - * configured to do this). However, if the check would require name resolution, - * it is postponed to the main queue. See also my blog post at - * http://blog.gerhards.net/2009/11/acls-imudp-and-accepting-messages.html - * rgerhards, 2009-11-16 - */ - *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP", - (struct sockaddr *)&frominet, "", 0); - - if(*pbIsPermitted == 0) { - DBGPRINTF("msg is not from an allowed sender\n"); - if(glbl.GetOption_DisallowWarning) { - time_t tt; - datetime.GetTime(&tt); - if(tt > ttLastDiscard + 60) { - ttLastDiscard = tt; - errmsg.LogError(0, NO_ERRCODE, - "UDP message from disallowed sender discarded"); - } - } - } - } - } else { - *pbIsPermitted = 1; /* no check -> everything permitted */ + ++pWrkr->ctrMsgsRcvd; + if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); } - DBGPRINTF("imudp:recv(%d,%d),acl:%d,msg:%.128s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); - - if(*pbIsPermitted != 0) { - if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { - datetime.getCurrTime(&stTime, &ttGenTime); - } - /* we now create our own message object and submit it to the queue */ - CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); - MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); - MsgSetInputName(pMsg, lstn->pInputName); - MsgSetRuleset(pMsg, lstn->pRuleset); - MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; - if(*pbIsPermitted == 2) - pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ - CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); - CHKiRet(ratelimitAddMsg(lstn->ratelimiter, &multiSub, pMsg)); - STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); - } + CHKiRet(processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pWrkr->pRcvBuf, lenRcvBuf, &stTime, + ttGenTime, &frominet, mh.msg_namelen, &multiSub)); } finalize_it: multiSubmitFlush(&multiSub); - - if(propFromHost != NULL) - prop.Destruct(&propFromHost); - if(propFromHostIP != NULL) - prop.Destruct(&propFromHostIP); - RETiRet; } +#endif /* #ifdef HAVE_RECVMMSG */ /* check configured scheduling priority. @@ -565,7 +698,7 @@ finalize_it: */ #if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE) #define NUM_EPOLL_EVENTS 10 -rsRetVal rcvMainLoop(thrdInfo_t *pThrd) +rsRetVal rcvMainLoop(struct wrkrInfo_s *pWrkr) { DEFiRet; int nfds; @@ -628,11 +761,11 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1); DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds); - if(pThrd->bShallStop == RSTRUE) + if(pWrkr->pThrd->bShallStop == RSTRUE) break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { - processSocket(pThrd, currEvt[i].data.ptr, &frominetPrev, &bIsPermitted); + processSocket(pWrkr, currEvt[i].data.ptr, &frominetPrev, &bIsPermitted); } } @@ -644,7 +777,7 @@ finalize_it: } #else /* #if HAVE_EPOLL_CREATE1 */ /* this is the code for the select() interface */ -rsRetVal rcvMainLoop(thrdInfo_t *pThrd) +rsRetVal rcvMainLoop(thrdInfo_t *pWrkr) { DEFiRet; int maxfds; @@ -691,7 +824,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) { if(FD_ISSET(lstn->sock, &readfds)) { - processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted); + processSocket(pWrkr, lstn, &frominetPrev, &bIsPermitted); --nfds; /* indicate we have processed one descriptor */ } } @@ -721,6 +854,8 @@ createListner(es_str_t *port, struct cnfparamvals *pvals) inst->inputname = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "inputname.appendport")) { inst->bAppendPortToInpname = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "defaulttz")) { + inst->dfltTZ = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "address")) { inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { @@ -729,6 +864,8 @@ createListner(es_str_t *port, struct cnfparamvals *pvals) inst->ratelimitBurst = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) { inst->ratelimitInterval = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "rcvbufsize")) { + inst->rcvbuf = (int) pvals[i].val.d.n; } else { dbgprintf("imudp: program error, non-handled " "param '%s'\n", inppblk.descr[i].name); @@ -772,6 +909,8 @@ CODESTARTbeginCnfLoad pModConf->pConf = pConf; /* init our settings */ loadModConf->configSetViaV2Method = 0; + loadModConf->wrkrMax = 1; /* conservative, but least msg reordering */ + loadModConf->batchSize = BATCH_SIZE_DFLT; loadModConf->iTimeRequery = TIME_REQUERY_DFLT; loadModConf->iSchedPrio = SCHED_PRIO_UNSET; loadModConf->pszSchedPolicy = NULL; @@ -788,6 +927,7 @@ ENDbeginCnfLoad BEGINsetModCnf struct cnfparamvals *pvals = NULL; int i; + int wrkrMax; CODESTARTsetModCnf pvals = nvlstGetParams(lst, &modpblk, NULL); if(pvals == NULL) { @@ -806,10 +946,22 @@ CODESTARTsetModCnf continue; if(!strcmp(modpblk.descr[i].name, "timerequery")) { loadModConf->iTimeRequery = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "batchsize")) { + loadModConf->batchSize = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "schedulingpriority")) { loadModConf->iSchedPrio = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) { loadModConf->pszSchedPolicy = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "threads")) { + wrkrMax = (int) pvals[i].val.d.n; + if(wrkrMax > MAX_WRKR_THREADS) { + errmsg.LogError(0, RS_RET_PARAM_ERROR, "imudp: configured for %d" + "worker threads, but maximum permitted is %d", + wrkrMax, MAX_WRKR_THREADS); + loadModConf->wrkrMax = MAX_WRKR_THREADS; + } else { + loadModConf->wrkrMax = wrkrMax; + } } else { dbgprintf("imudp: program error, non-handled " "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); @@ -881,10 +1033,24 @@ ENDactivateCnfPrePrivDrop BEGINactivateCnf + int i; + int lenRcvBuf; CODESTARTactivateCnf /* caching various settings */ iMaxLine = glbl.GetMaxLine(); - CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char))); + lenRcvBuf = (iMaxLine + 1) * sizeof(char); +# ifdef HAVE_RECVMMSG + lenRcvBuf *= runModConf->batchSize; +# endif + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { +# ifdef HAVE_RECVMMSG + CHKmalloc(wrkrInfo[i].recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec))); + CHKmalloc(wrkrInfo[i].recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr))); + CHKmalloc(wrkrInfo[i].frominet = MALLOC(runModConf->batchSize * sizeof(struct sockaddr_storage))); +# endif + CHKmalloc(wrkrInfo[i].pRcvBuf = MALLOC(lenRcvBuf)); + wrkrInfo[i].id = i; + } finalize_it: ENDactivateCnf @@ -896,20 +1062,34 @@ CODESTARTfreeCnf free(inst->pszBindPort); free(inst->pszBindAddr); free(inst->inputname); + free(inst->dfltTZ); del = inst; inst = inst->next; free(del); } ENDfreeCnf -/* This function is called to gather input. - * Note that sock must be non-NULL because otherwise we would not have - * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 - */ -BEGINrunInput -CODESTARTrunInput + +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *pWrkr = (struct wrkrInfo_s*) myself; +# if HAVE_PRCTL && defined PR_SET_NAME + uchar *pszDbgHdr; +# endif + uchar thrdName[32]; + + snprintf((char*)thrdName, sizeof(thrdName), "imudp(w%d)", pWrkr->id); +# if HAVE_PRCTL && defined PR_SET_NAME + /* set thread name - we ignore if the call fails, has no harsh consequences... */ + if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", thrdName); + } +# endif + dbgOutputTID((char*)thrdName); + /* Note well: the setting of scheduling parameters will not work - * when we dropped privileges (if the user is not sufficently + * when we dropped privileges (if the user is not sufficiently * privileged, of course). Howerver, we can't change the * scheduling params in PrePrivDrop(), as at that point our thread * is not yet created. So at least as an interim solution, we do @@ -917,7 +1097,51 @@ CODESTARTrunInput * privileges within the same instance. */ setSchedParams(runModConf); - iRet = rcvMainLoop(pThrd); + + /* support statistics gathering */ + statsobj.Construct(&(pWrkr->stats)); + statsobj.SetName(pWrkr->stats, thrdName); + STATSCOUNTER_INIT(pWrkr->ctrCall_recvmmsg, pWrkr->mutCtrCall_recvmmsg); + statsobj.AddCounter(pWrkr->stats, UCHAR_CONSTANT("called.recvmmsg"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkr->ctrCall_recvmmsg)); + STATSCOUNTER_INIT(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg); + statsobj.AddCounter(pWrkr->stats, UCHAR_CONSTANT("called.recvmsg"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkr->ctrCall_recvmsg)); + STATSCOUNTER_INIT(pWrkr->ctrMsgsRcvd, pWrkr->mutCtrMsgsRcvd); + statsobj.AddCounter(pWrkr->stats, UCHAR_CONSTANT("msgs.received"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkr->ctrMsgsRcvd)); + statsobj.ConstructFinalize(pWrkr->stats); + + rcvMainLoop(pWrkr); + + /* cleanup */ + return NULL; +} + +/* This function is called to gather input. + * In essence, it just starts the pool of workers. To save resources, + * we run one of the workers on our own thread -- otherwise that thread would + * just idle around and wait for the workers to finish. + */ +BEGINrunInput + int i; + pthread_attr_t wrkrThrdAttr; +CODESTARTrunInput + pthread_attr_init(&wrkrThrdAttr); + pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024); + for(i = 0 ; i < runModConf->wrkrMax - 1 ; ++i) { + wrkrInfo[i].pThrd = pThrd; + pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); + } + pthread_attr_destroy(&wrkrThrdAttr); + + wrkrInfo[i].pThrd = pThrd; + wrkrInfo[i].id = i; + wrkr(&wrkrInfo[i]); + + for(i = 0 ; i < runModConf->wrkrMax - 1 ; ++i) { + pthread_join(wrkrInfo[i].tid, NULL); + } ENDrunInput @@ -931,6 +1155,7 @@ ENDwillRun BEGINafterRun struct lstn_s *lstn, *lstnDel; + int i; CODESTARTafterRun /* do cleanup here */ net.clearAllowedSenders((uchar*)"UDP"); @@ -944,9 +1169,13 @@ CODESTARTafterRun free(lstnDel); } lcnfRoot = lcnfLast = NULL; - if(pRcvBuf != NULL) { - free(pRcvBuf); - pRcvBuf = NULL; + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { +# ifdef HAVE_RECVMMSG + free(wrkrInfo[i].recvmsg_iov); + free(wrkrInfo[i].recvmsg_mmh); + free(wrkrInfo[i].frominet); +# endif + free(wrkrInfo[i].pRcvBuf); } ENDafterRun @@ -1007,6 +1236,11 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); + DBGPRINTF("imudp: version %s initializing\n", VERSION); +# ifdef HAVE_RECVMMSG + DBGPRINTF("imdup: support for recvmmsg() present\n"); +# endif + /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); |