summaryrefslogtreecommitdiff
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c127
1 files changed, 102 insertions, 25 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index f91fb5a..3e0554a 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -44,12 +44,15 @@
#include "wti.h"
#include "obj.h"
#include "glbl.h"
+#include "action.h"
#include "atomic.h"
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
+pthread_key_t thrd_wti_key;
+
/* forward-definitions */
/* methods */
@@ -171,8 +174,9 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(wti)
/* actual destruction */
batchFree(&pThis->batch);
+ free(pThis->actWrkrInfo);
+ pthread_cond_destroy(&pThis->pcondBusy);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
-
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -181,6 +185,7 @@ ENDobjDestruct(wti)
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
+ pthread_cond_init(&pThis->pcondBusy, NULL);
ENDobjConstruct(wti)
@@ -195,11 +200,20 @@ wtiConstructFinalize(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
- DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n",
+ wtiGetDbgHdr(pThis), iActionNbr);
/* initialize our thread instance descriptor (no concurrency here) */
pThis->bIsRunning = RSFALSE;
+ /* must use calloc as we need zero-init */
+ CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t)));
+
+ if(pThis->pWtp == NULL) {
+ dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n");
+ FINALIZE;
+ }
+
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
@@ -249,10 +263,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
- d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
- if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) {
DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
@@ -270,71 +284,100 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
*/
#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
-wtiWorker(wti_t *pThis)
+wtiWorker(wti_t *__restrict__ const pThis)
{
- wtp_t *pWtp; /* our worker thread pool */
+ wtp_t *__restrict__ const pWtp = pThis->pWtp; /* our worker thread pool -- shortcut */
+ const action_t *__restrict__ pAction;
int bInactivityTOOccured = 0;
rsRetVal localRet;
rsRetVal terminateRet;
+ actWrkrInfo_t *__restrict__ wrkrInfo;
int iCancelStateSave;
+ int i, j, k;
DEFiRet;
- ISOBJ_TYPE_assert(pThis, wti);
- pWtp = pThis->pWtp; /* shortcut */
- ISOBJ_TYPE_assert(pWtp, wtp);
-
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-
+ DBGPRINTF("wti %p: worker starting\n", pThis);
/* now we have our identity, on to real processing */
- while(1) { /* loop will be broken below - need to do mutex locks */
+
+ /* note: in this loop, the mutex is "never" unlocked. Of course,
+ * this is not true: it actually is unlocked when the actual processing
+ * is done, as part of pWtp->pfDoWork() processing. Note that this
+ * function is required to re-lock it when done. We cannot do the
+ * lock/unlock here ourselfs, as pfDoWork() needs to access queue
+ * structures itself.
+ * The same goes for pfRateLimiter(). While we could unlock/lock when
+ * we call it, in practice the function is often called without any
+ * ratelimiting actually done. Only the rate limiter itself knows
+ * that. As such, it needs to bear the burden of doing the locking
+ * when required. -- rgerhards, 2013-11-20
+ */
+ d_pthread_mutex_lock(pWtp->pmutUsr);
+ while(1) { /* loop will be broken below */
if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
- d_pthread_mutex_lock(pWtp->pmutUsr);
-
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
if(terminateRet == RS_RET_TERMINATE_NOW) {
/* we now need to free the old batch */
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
- DBGOPRINT((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
- localRet);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
+ DBGOPRINT((obj_t*) pThis, "terminating worker because of "
+ "TERMINATE_NOW mode, del iRet %d\n", localRet);
break;
}
/* try to execute and process whatever we have */
- /* Note that this function releases and re-aquires the mutex. The returned
- * information on idle state must be processed before releasing the mutex again.
- */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) {
- d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
} else if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
- d_pthread_mutex_unlock(pWtp->pmutUsr);
DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n",
terminateRet, bInactivityTOOccured);
break; /* end of loop */
}
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
continue; /* request next iteration */
}
- d_pthread_mutex_unlock(pWtp->pmutUsr);
-
bInactivityTOOccured = 0; /* reset for next run */
}
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+
+ DBGPRINTF("DDDD: wti %p: worker cleanup action instances\n", pThis);
+ for(i = 0 ; i < iActionNbr ; ++i) {
+ wrkrInfo = &(pThis->actWrkrInfo[i]);
+ dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, wrkrInfo->actWrkrData);
+ if(wrkrInfo->actWrkrData != NULL) {
+ pAction = wrkrInfo->pAction;
+ pAction->pMod->mod.om.freeWrkrInstance(wrkrInfo->actWrkrData);
+ if(pAction->isTransactional) {
+ /* free iparam "cache" - we need to go through to max! */
+ for(j = 0 ; j < wrkrInfo->p.tx.maxIParams ; ++j) {
+ for(k = 0 ; k < pAction->iNumTpls ; ++k) {
+ free(actParam(wrkrInfo->p.tx.iparams,
+ pAction->iNumTpls, j, k).param);
+ }
+ }
+ free(wrkrInfo->p.tx.iparams);
+ wrkrInfo->p.tx.iparams = NULL;
+ wrkrInfo->p.tx.currIParam = 0;
+ wrkrInfo->p.tx.maxIParams = 0;
+ }
+ wrkrInfo->actWrkrData = NULL; /* re-init for next activation */
+ }
+ }
+
/* indicate termination */
pthread_cleanup_pop(0); /* remove cleanup handler */
pthread_setcancelstate(iCancelStateSave, NULL);
+ dbgprintf("wti %p: worker exiting\n", pThis);
RETiRet;
}
@@ -374,6 +417,33 @@ finalize_it:
}
+/* This function returns (and creates if necessary) a dummy wti suitable
+ * for use by the rule engine. It is intended to be used for direct-mode
+ * main queues (folks, don't do that!). Once created, data is stored in
+ * thread-specific storage.
+ * Note: we do NOT do error checking -- if this functions fails, all the
+ * rest will fail as well... (also, it will only fail under OOM, so...).
+ * Memleak: we leak pWti's when run in direct mode. However, this is only
+ * a cosmetic leak, as we need them until all inputs are terminated,
+ * what means essentially until rsyslog itself is terminated. So we
+ * don't care -- it's just not nice in valgrind, but that's it.
+ */
+wti_t *
+wtiGetDummy(void)
+{
+ wti_t *pWti;
+
+ pWti = (wti_t*) pthread_getspecific(thrd_wti_key);
+ if(pWti == NULL) {
+ wtiConstruct(&pWti);
+ wtiConstructFinalize(pWti);
+ if(pthread_setspecific(thrd_wti_key, pWti) != 0) {
+ DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n");
+ }
+ }
+ return pWti;
+}
+
/* dummy */
rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
@@ -383,6 +453,7 @@ BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
CODESTARTObjClassExit(nsdsel_gtls)
/* release objects we no longer need */
objRelease(glbl, CORE_COMPONENT);
+ pthread_key_delete(thrd_wti_key);
ENDObjClassExit(wti)
@@ -391,8 +462,14 @@ ENDObjClassExit(wti)
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */
+ int r;
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ r = pthread_key_create(&thrd_wti_key, NULL);
+ if(r != 0) {
+ dbgprintf("wti.c: pthread_key_create failed\n");
+ iRet = RS_RET_ERR;
+ }
ENDObjClassInit(wti)
/* vi:set ai: