diff options
Diffstat (limited to 'runtime/wti.c')
-rw-r--r-- | runtime/wti.c | 127 |
1 files changed, 102 insertions, 25 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index f91fb5a..3e0554a 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -44,12 +44,15 @@ #include "wti.h" #include "obj.h" #include "glbl.h" +#include "action.h" #include "atomic.h" /* static data */ DEFobjStaticHelpers DEFobjCurrIf(glbl) +pthread_key_t thrd_wti_key; + /* forward-definitions */ /* methods */ @@ -171,8 +174,9 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ batchFree(&pThis->batch); + free(pThis->actWrkrInfo); + pthread_cond_destroy(&pThis->pcondBusy); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); - free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -181,6 +185,7 @@ ENDobjDestruct(wti) */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning); + pthread_cond_init(&pThis->pcondBusy, NULL); ENDobjConstruct(wti) @@ -195,11 +200,20 @@ wtiConstructFinalize(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n", + wtiGetDbgHdr(pThis), iActionNbr); /* initialize our thread instance descriptor (no concurrency here) */ pThis->bIsRunning = RSFALSE; + /* must use calloc as we need zero-init */ + CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t))); + + if(pThis->pWtp == NULL) { + dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n"); + FINALIZE; + } + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); @@ -249,10 +263,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) if(pThis->bAlwaysRunning) { /* never shut down any started worker */ - d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); + d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ - if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { + if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) { DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } @@ -270,71 +284,100 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) */ #pragma GCC diagnostic ignored "-Wempty-body" rsRetVal -wtiWorker(wti_t *pThis) +wtiWorker(wti_t *__restrict__ const pThis) { - wtp_t *pWtp; /* our worker thread pool */ + wtp_t *__restrict__ const pWtp = pThis->pWtp; /* our worker thread pool -- shortcut */ + const action_t *__restrict__ pAction; int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; + actWrkrInfo_t *__restrict__ wrkrInfo; int iCancelStateSave; + int i, j, k; DEFiRet; - ISOBJ_TYPE_assert(pThis, wti); - pWtp = pThis->pWtp; /* shortcut */ - ISOBJ_TYPE_assert(pWtp, wtp); - dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - + DBGPRINTF("wti %p: worker starting\n", pThis); /* now we have our identity, on to real processing */ - while(1) { /* loop will be broken below - need to do mutex locks */ + + /* note: in this loop, the mutex is "never" unlocked. Of course, + * this is not true: it actually is unlocked when the actual processing + * is done, as part of pWtp->pfDoWork() processing. Note that this + * function is required to re-lock it when done. We cannot do the + * lock/unlock here ourselfs, as pfDoWork() needs to access queue + * structures itself. + * The same goes for pfRateLimiter(). While we could unlock/lock when + * we call it, in practice the function is often called without any + * ratelimiting actually done. Only the rate limiter itself knows + * that. As such, it needs to bear the burden of doing the locking + * when required. -- rgerhards, 2013-11-20 + */ + d_pthread_mutex_lock(pWtp->pmutUsr); + while(1) { /* loop will be broken below */ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); } - d_pthread_mutex_lock(pWtp->pmutUsr); - /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); if(terminateRet == RS_RET_TERMINATE_NOW) { /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); - DBGOPRINT((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", - localRet); - d_pthread_mutex_unlock(pWtp->pmutUsr); + DBGOPRINT((obj_t*) pThis, "terminating worker because of " + "TERMINATE_NOW mode, del iRet %d\n", localRet); break; } /* try to execute and process whatever we have */ - /* Note that this function releases and re-aquires the mutex. The returned - * information on idle state must be processed before releasing the mutex again. - */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) { - d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } else if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { - d_pthread_mutex_unlock(pWtp->pmutUsr); DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n", terminateRet, bInactivityTOOccured); break; /* end of loop */ } doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); - d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } - d_pthread_mutex_unlock(pWtp->pmutUsr); - bInactivityTOOccured = 0; /* reset for next run */ } + d_pthread_mutex_unlock(pWtp->pmutUsr); + + DBGPRINTF("DDDD: wti %p: worker cleanup action instances\n", pThis); + for(i = 0 ; i < iActionNbr ; ++i) { + wrkrInfo = &(pThis->actWrkrInfo[i]); + dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, wrkrInfo->actWrkrData); + if(wrkrInfo->actWrkrData != NULL) { + pAction = wrkrInfo->pAction; + pAction->pMod->mod.om.freeWrkrInstance(wrkrInfo->actWrkrData); + if(pAction->isTransactional) { + /* free iparam "cache" - we need to go through to max! */ + for(j = 0 ; j < wrkrInfo->p.tx.maxIParams ; ++j) { + for(k = 0 ; k < pAction->iNumTpls ; ++k) { + free(actParam(wrkrInfo->p.tx.iparams, + pAction->iNumTpls, j, k).param); + } + } + free(wrkrInfo->p.tx.iparams); + wrkrInfo->p.tx.iparams = NULL; + wrkrInfo->p.tx.currIParam = 0; + wrkrInfo->p.tx.maxIParams = 0; + } + wrkrInfo->actWrkrData = NULL; /* re-init for next activation */ + } + } + /* indicate termination */ pthread_cleanup_pop(0); /* remove cleanup handler */ pthread_setcancelstate(iCancelStateSave, NULL); + dbgprintf("wti %p: worker exiting\n", pThis); RETiRet; } @@ -374,6 +417,33 @@ finalize_it: } +/* This function returns (and creates if necessary) a dummy wti suitable + * for use by the rule engine. It is intended to be used for direct-mode + * main queues (folks, don't do that!). Once created, data is stored in + * thread-specific storage. + * Note: we do NOT do error checking -- if this functions fails, all the + * rest will fail as well... (also, it will only fail under OOM, so...). + * Memleak: we leak pWti's when run in direct mode. However, this is only + * a cosmetic leak, as we need them until all inputs are terminated, + * what means essentially until rsyslog itself is terminated. So we + * don't care -- it's just not nice in valgrind, but that's it. + */ +wti_t * +wtiGetDummy(void) +{ + wti_t *pWti; + + pWti = (wti_t*) pthread_getspecific(thrd_wti_key); + if(pWti == NULL) { + wtiConstruct(&pWti); + wtiConstructFinalize(pWti); + if(pthread_setspecific(thrd_wti_key, pWti) != 0) { + DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n"); + } + } + return pWti; +} + /* dummy */ rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } @@ -383,6 +453,7 @@ BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */ CODESTARTObjClassExit(nsdsel_gtls) /* release objects we no longer need */ objRelease(glbl, CORE_COMPONENT); + pthread_key_delete(thrd_wti_key); ENDObjClassExit(wti) @@ -391,8 +462,14 @@ ENDObjClassExit(wti) * rgerhards, 2008-01-09 */ BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */ + int r; /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); + r = pthread_key_create(&thrd_wti_key, NULL); + if(r != 0) { + dbgprintf("wti.c: pthread_key_create failed\n"); + iRet = RS_RET_ERR; + } ENDObjClassInit(wti) /* vi:set ai: |