diff options
author | Michael Biebl <biebl@debian.org> | 2012-10-10 06:45:13 +0200 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2012-10-10 06:45:13 +0200 |
commit | 5b0595cc98c012dfa5ac0f214dbc543a11c982cb (patch) | |
tree | 21d6b1b3cbad0c7609a3d3785332a5ffd2a8dee1 /tcpsrv.c | |
parent | e1ab13c77be9fbe3e2e5dfe3357fcd9f991b71b5 (diff) | |
download | rsyslog-5b0595cc98c012dfa5ac0f214dbc543a11c982cb.tar.gz |
Imported Upstream version 7.1.9upstream/7.1.9
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 343 |
1 files changed, 312 insertions, 31 deletions
@@ -39,8 +39,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> @@ -50,6 +50,7 @@ #include <ctype.h> #include <netinet/in.h> #include <netdb.h> +#include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #if HAVE_FCNTL_H @@ -73,6 +74,7 @@ #include "ruleset.h" #include "unicode-helper.h" + MODULE_TYPE_LIB MODULE_TYPE_NOKEEP @@ -93,24 +95,47 @@ DEFobjCurrIf(netstrm) DEFobjCurrIf(nssel) DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) +DEFobjCurrIf(statsobj) +static void startWorkerPool(void); + +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + pthread_cond_t run; + int idx; + tcpsrv_t *pSrv; /* pSrv == NULL -> idle */ + nspoll_t *pPoll; + void *pUsr; + sbool enabled; + long long unsigned numCalled; /* how often was this called */ +} wrkrInfo[4]; +static sbool bWrkrRunning; /* are the worker threads running? */ +static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; +static int wrkrMax = 4; +static int wrkrRunning; /* add new listener port to listener port list * rgerhards, 2009-05-21 */ static inline rsRetVal -addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) +addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram) { tcpLstnPortList_t *pEntry; + uchar statname[64]; DEFiRet; ISOBJ_TYPE_assert(pThis, tcpsrv); /* create entry */ CHKmalloc(pEntry = MALLOC(sizeof(tcpLstnPortList_t))); - pEntry->pszPort = pszPort; + CHKmalloc(pEntry->pszPort = ustrdup(pszPort)); pEntry->pSrv = pThis; pEntry->pRuleset = pThis->pRuleset; + pEntry->bSuppOctetFram = bSuppOctetFram; /* we need to create a property */ CHKiRet(prop.Construct(&pEntry->pInputName)); @@ -121,6 +146,16 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) pEntry->pNext = pThis->pLstnPorts; pThis->pLstnPorts = pEntry; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(pEntry->stats))); + snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(pEntry->stats, statname)); + STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit); + CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(pEntry->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(pEntry->stats)); + finalize_it: RETiRet; } @@ -131,7 +166,7 @@ finalize_it: * rgerhards, 2008-03-20 */ static rsRetVal -configureTCPListen(tcpsrv_t *pThis, uchar *pszPort) +configureTCPListen(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram) { int i; uchar *pPort = pszPort; @@ -147,7 +182,7 @@ configureTCPListen(tcpsrv_t *pThis, uchar *pszPort) } if(i >= 0 && i <= 65535) { - CHKiRet(addNewLstnPort(pThis, pszPort)); + CHKiRet(addNewLstnPort(pThis, pszPort, bSuppOctetFram)); } else { errmsg.LogError(0, NO_ERRCODE, "Invalid TCP listen port %s - ignored.\n", pszPort); } @@ -399,6 +434,10 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED); } + if(pThis->bUseKeepAlive) { + CHKiRet(netstrm.EnableKeepAlive(pNewStrm)); + } + /* we found a free spot and can construct our session object */ CHKiRet(tcps_sess.Construct(&pSess)); CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis)); @@ -543,6 +582,137 @@ finalize_it: RETiRet; } +/* process a single workset item + */ +static inline rsRetVal +processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) +{ + tcps_sess_t *pNewSess = NULL; + DEFiRet; + + DBGPRINTF("tcpsrv: processing item %d, pUsr %p, bAbortConn\n", idx, pUsr); + if(pUsr == pThis->ppLstn) { + DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); + iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); + if(iRet == RS_RET_OK) { + if(pPoll != NULL) { + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + } + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + DBGPRINTF("tcpsrv: error %d during accept\n", iRet); + } + } else { + pNewSess = (tcps_sess_t*) pUsr; + doReceive(pThis, &pNewSess, pPoll); + if(pPoll == NULL && pNewSess == NULL) { + pThis->pSessions[idx] = NULL; + } + } + +finalize_it: + RETiRet; +} + + +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + + pthread_mutex_lock(&wrkrMut); + while(1) { + while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) { + pthread_cond_wait(&me->run, &wrkrMut); + } + if(glbl.GetGlobalInputTermState() == 1) { + --wrkrRunning; + break; + } + pthread_mutex_unlock(&wrkrMut); + + ++me->numCalled; + processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr); + + pthread_mutex_lock(&wrkrMut); + me->pSrv = NULL; /* indicate we are free again */ + --wrkrRunning; + pthread_cond_signal(&wrkrIdle); + } + me->enabled = 0; /* indicate we are no longer available */ + pthread_mutex_unlock(&wrkrMut); + + return NULL; +} + + +/* Process a workset, that is handle io. We become activated + * from either select or epoll handler. We split the workload + * out to a pool of threads, but try to avoid context switches + * as much as possible. + */ +static rsRetVal +processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) +{ + int i; + int origEntries = numEntries; + DEFiRet; + + DBGPRINTF("tcpsrv: ready to process %d event entries\n", numEntries); + + while(numEntries > 0) { + if(glbl.GetGlobalInputTermState() == 1) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + if(numEntries == 1) { + /* process self, save context switch */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr); + } else { + pthread_mutex_lock(&wrkrMut); + /* check if there is a free worker */ + for(i = 0 ; (i < wrkrMax) && ((wrkrInfo[i].pSrv != NULL) || (wrkrInfo[i].enabled == 0)) ; ++i) + /*do search*/; + if(i < wrkrMax) { + /* worker free -> use it! */ + wrkrInfo[i].pSrv = pThis; + wrkrInfo[i].pPoll = pPoll; + wrkrInfo[i].idx = workset[numEntries -1].id; + wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; + /* Note: we must increment wrkrRunning HERE and not inside the worker's + * code. This is because a worker may actually never start, and thus + * increment wrkrRunning, before we finish and check the running worker + * count. We can only avoid this by incrementing it here. + */ + ++wrkrRunning; + pthread_cond_signal(&wrkrInfo[i].run); + pthread_mutex_unlock(&wrkrMut); + } else { + pthread_mutex_unlock(&wrkrMut); + /* no free worker, so we process this one ourselfs */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, + workset[numEntries-1].pUsr); + } + } + --numEntries; + } + + if(origEntries > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); + while(wrkrRunning > 0) { + pthread_cond_wait(&wrkrIdle, &wrkrMut); + } + pthread_mutex_unlock(&wrkrMut); + } + +finalize_it: + RETiRet; +} + /* This function is called to gather input. * This variant here is only used if we need to work with a netstream driver @@ -550,14 +720,14 @@ finalize_it: */ #pragma GCC diagnostic ignored "-Wempty-body" static inline rsRetVal -RunSelect(tcpsrv_t *pThis) +RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) { DEFiRet; int nfds; int i; + int iWorkset; int iTCPSess; int bIsReady; - tcps_sess_t *pNewSess; nssel_t *pSel = NULL; rsRetVal localRet; @@ -592,13 +762,21 @@ RunSelect(tcpsrv_t *pThis) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ + iWorkset = 0; for(i = 0 ; i < pThis->iLstnCurr ; ++i) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + workset[iWorkset].id = i; + workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */ + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } + //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); --nfds; /* indicate we have processed one */ } } @@ -610,11 +788,22 @@ RunSelect(tcpsrv_t *pThis) ABORT_FINALIZE(RS_RET_FORCE_TERM); localRet = nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds); if(bIsReady || localRet != RS_RET_OK) { - doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); + workset[iWorkset].id = iTCPSess; + workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess]; + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); } + + if(iWorkset > 0) + processWorkset(pThis, NULL, iWorkset, workset); + + /* we need to copy back close descriptors */ CHKiRet(nssel.Destruct(&pSel)); finalize_it: /* this is a very special case - this time only we do not exit the function, * because that would not help us either. So we simply retry it. Let's see @@ -645,13 +834,23 @@ Run(tcpsrv_t *pThis) { DEFiRet; int i; - tcps_sess_t *pNewSess; + nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */ + int numEntries; nspoll_t *pPoll = NULL; - void *pUsr; rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); + /* check if we need to start the worker pool. Once it is running, all is + * well. Shutdown is done on modExit. + */ + d_pthread_mutex_lock(&wrkrMut); + if(!bWrkrRunning) { + bWrkrRunning = 1; + startWorkerPool(); + } + d_pthread_mutex_unlock(&wrkrMut); + /* this is an endless loop - it is terminated by the framework canelling * this thread. Thus, we also need to instantiate a cancel cleanup handler * to prevent us from leaking anything. -- rgerhards, 20080-04-24 @@ -662,25 +861,26 @@ Run(tcpsrv_t *pThis) } if(localRet != RS_RET_OK) { /* fall back to select */ - dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); - iRet = RunSelect(pThis); + DBGPRINTF("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); + iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); FINALIZE; } - dbgprintf("tcpsrv uses epoll() interface, nsdpol driver found\n"); + DBGPRINTF("tcpsrv uses epoll() interface, nsdpoll driver found\n"); /* flag that we are in epoll mode */ - pThis->bUsingEPoll = TRUE; + pThis->bUsingEPoll = RSTRUE; /* Add the TCP listen sockets to the list of sockets to monitor */ for(i = 0 ; i < pThis->iLstnCurr ; ++i) { - dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn); + DBGPRINTF("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn); CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD)); - dbgprintf("Added listener %d\n", i); + DBGPRINTF("Added listener %d\n", i); } while(1) { - localRet = nspoll.Wait(pPoll, -1, &i, &pUsr); + numEntries = sizeof(workset)/sizeof(nsd_epworkset_t); + localRet = nspoll.Wait(pPoll, -1, &numEntries, workset); if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ @@ -691,17 +891,7 @@ Run(tcpsrv_t *pThis) if(localRet != RS_RET_OK) continue; - dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr); - - if(pUsr == pThis->ppLstn) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); - CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); - DBGPRINTF("New session created with NSD %p.\n", pNewSess); - } else { - pNewSess = (tcps_sess_t*) pUsr; - doReceive(pThis, &pNewSess, pPoll); - } + processWorkset(pThis, pPoll, numEntries, workset); } /* remove the tcp listen sockets from the epoll set */ @@ -723,6 +913,7 @@ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macr pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; pThis->bDisableLFDelim = 0; pThis->OnMsgReceive = NULL; + pThis->bUseFlowControl = 1; ENDobjConstruct(tcpsrv) @@ -869,6 +1060,15 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr) } static rsRetVal +SetKeepAlive(tcpsrv_t *pThis, int iVal) +{ + DEFiRet; + DBGPRINTF("tcpsrv: keep-alive set to %d\n", iVal); + pThis->bUseKeepAlive = iVal; + RETiRet; +} + +static rsRetVal SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)) { DEFiRet; @@ -998,6 +1198,18 @@ SetLstnMax(tcpsrv_t *pThis, int iMax) } +/* set if flow control shall be supported + */ +static rsRetVal +SetUseFlowControl(tcpsrv_t *pThis, int bUseFlowControl) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, tcpsrv); + pThis->bUseFlowControl = bUseFlowControl; + RETiRet; +} + + /* set max number of sessions * this must be called before ConstructFinalize, or it will have no effect! * rgerhards, 2009-04-09 @@ -1035,11 +1247,13 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->create_tcp_socket = create_tcp_socket; pIf->Run = Run; + pIf->SetKeepAlive = SetKeepAlive; pIf->SetUsrP = SetUsrP; pIf->SetInputName = SetInputName; pIf->SetAddtlFrameDelim = SetAddtlFrameDelim; pIf->SetbDisableLFDelim = SetbDisableLFDelim; pIf->SetSessMax = SetSessMax; + pIf->SetUseFlowControl = SetUseFlowControl; pIf->SetLstnMax = SetLstnMax; pIf->SetDrvrMode = SetDrvrMode; pIf->SetDrvrAuthMode = SetDrvrAuthMode; @@ -1071,6 +1285,7 @@ CODESTARTObjClassExit(tcpsrv) objRelease(tcps_sess, DONT_LOAD_LIB); objRelease(conf, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); @@ -1097,6 +1312,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE CHKiRet(objUse(conf, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); /* set our own handlers */ @@ -1105,11 +1321,62 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE ENDObjClassInit(tcpsrv) -/* --------------- here now comes the plumbing that makes as a library module --------------- */ +/* start worker threads + * Important: if we fork, this MUST be done AFTER forking + */ +static void +startWorkerPool(void) +{ + int i; + int r; + pthread_attr_t sessThrdAttr; + + wrkrRunning = 0; + pthread_cond_init(&wrkrIdle, NULL); + pthread_attr_init(&sessThrdAttr); + pthread_attr_setstacksize(&sessThrdAttr, 4096*1024); + for(i = 0 ; i < wrkrMax ; ++i) { + /* init worker info structure! */ + pthread_cond_init(&wrkrInfo[i].run, NULL); + wrkrInfo[i].pSrv = NULL; + wrkrInfo[i].numCalled = 0; + r = pthread_create(&wrkrInfo[i].tid, &sessThrdAttr, wrkr, &(wrkrInfo[i])); + if(r == 0) { + wrkrInfo[i].enabled = 1; + } else { + char errStr[1024]; + wrkrInfo[i].enabled = 0; + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, NO_ERRCODE, "tcpsrv error creating thread %d: " + "%s", i, errStr); + } + } + pthread_attr_destroy(&sessThrdAttr); +} +/* destroy worker pool structures and wait for workers to terminate + */ +static void +stopWorkerPool(void) +{ + int i; + for(i = 0 ; i < wrkrMax ; ++i) { + pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + pthread_cond_destroy(&wrkrInfo[i].run); + } + pthread_cond_destroy(&wrkrIdle); + pthread_mutex_destroy(&wrkrMut); + +} + + +/* --------------- here now comes the plumbing that makes as a library module --------------- */ BEGINmodExit CODESTARTmodExit + stopWorkerPool(); /* de-init in reverse order! */ tcpsrvClassExit(); tcps_sessClassExit(); @@ -1125,6 +1392,20 @@ ENDqueryEtryPt BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ + /* we just init the worker mutex, but do not start the workers themselves. This is deferred + * to the first call of Run(). Reasons for this: + * 1. depending on load order, tcpsrv gets loaded during rsyslog startup BEFORE + * it forks, in which case the workers would be running in the then-killed parent, + * leading to a defuncnt child (we actually had this bug). + * 2. depending on circumstances, Run() would possibly never be called, in which case + * the worker threads would be totally useless. + * Note that in order to guarantee a non-racy worker start, we need to guard the + * startup sequence by a mutex, which is why we init it here (no problem with fork() + * in this case as the mutex is a pure-memory structure). + * rgerhards, 2012-05-18 + */ + pthread_mutex_init(&wrkrMut, NULL); + bWrkrRunning = 0; /* Initialize all classes that are in our module - this includes ourselfs */ CHKiRet(tcps_sessClassInit(pModInfo)); |