diff options
author | Michael Biebl <biebl@debian.org> | 2014-04-03 03:08:50 +0200 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2014-04-03 03:08:50 +0200 |
commit | 9374a46543e9c43c009f80def8c3b2506b0b377e (patch) | |
tree | 8853fd40ee8d55ff24304ff8a4421640f3493c58 /action.c | |
parent | 209e193f14ec562df5aad945f04cd88b227cc602 (diff) | |
download | rsyslog-9374a46543e9c43c009f80def8c3b2506b0b377e.tar.gz |
Imported Upstream version 8.2.0upstream/8.2.0
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 1168 |
1 files changed, 467 insertions, 701 deletions
@@ -13,18 +13,15 @@ * The different modes (and calling sequence) are: * * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval - * - doSubmitToActionQComplexBatch - * - helperSubmitToActionQComplexBatch - * - doActionCallAction + * - doSubmitToActionQComplex * handles mark message reduction, but in essence calls * - actionWriteToAction * - qqueueEnqObj * (now queue engine processing) - * if(pThis->bWriteAllMarkMsgs == RSFALSE) - this is the DEFAULT - * - doSubmitToActionQNotAllMarkBatch - * - doSubmitToActionQBatch (and from here like in the else case below!) + * if(pThis->bWriteAllMarkMsgs == RSFALSE) + * - doSubmitToActionQNotAllMark + * - doSubmitToActionQ (and from here like in the else case below!) * else - * - doSubmitToActionQBatch * - doSubmitToActionQ * - qqueueEnqObj * (now queue engine processing) @@ -36,9 +33,6 @@ * * After dequeue, processing is as follows: * - processBatchMain - * - processAction - * - submitBatch - * - tryDoAction * - ... * * MORE ON PROCESSING, QUEUES and FILTERING @@ -120,13 +114,12 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t * const pWti); +static rsRetVal doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, msg_t*); +static rsRetVal doSubmitToActionQComplex(action_t * const pAction, wti_t * const pWti, msg_t*); +static rsRetVal doSubmitToActionQNotAllMark(action_t * const pAction, wti_t * const pWti, msg_t*); /* object static data (once for all instances) */ -/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ DEFobjCurrIf(obj) DEFobjCurrIf(datetime) DEFobjCurrIf(module) @@ -176,10 +169,9 @@ configSettings_t cs_save; /* our saved (scope!) config settings */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice * to have during one instance of an rsyslogd run. For example, I use them to name actions when there - * is no better name available. Note that I do NOT recover previous numbers on HUP - we simply keep - * counting. -- rgerhards, 2008-01-29 + * is no better name available. */ -static int iActionNbr = 0; +int iActionNbr = 0; int bActionReportSuspension = 1; int bActionReportSuspensionCont = 0; @@ -229,7 +221,7 @@ static struct cnfparamblk pblk = * a lot. So we simply return the system time. */ static inline time_t -getActNow(action_t *pThis) +getActNow(action_t * const pThis) { assert(pThis != NULL); if(pThis->tActNow == -1) { @@ -289,7 +281,7 @@ actionResetQueueParams(void) /* destructs an action descriptor object * rgerhards, 2007-08-01 */ -rsRetVal actionDestruct(action_t *pThis) +rsRetVal actionDestruct(action_t * const pThis) { DEFiRet; ASSERT(pThis != NULL); @@ -313,7 +305,6 @@ rsRetVal actionDestruct(action_t *pThis) pThis->pMod->freeInstance(pThis->pModData); pthread_mutex_destroy(&pThis->mutAction); - pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); @@ -323,6 +314,19 @@ finalize_it: } +/* Disable action, this means it will never again be usable + * until rsyslog is reloaded. Use only as a last resort, but + * depends on output module. + * rgerhards, 2007-08-02 + */ +static inline void +actionDisable(action_t *__restrict__ const pThis) +{ + pThis->bDisabled = 1; +} + + + /* create a new action descriptor object * rgerhards, 2007-08-01 * Note that it is vital to set proper initial values as the v6 config @@ -339,17 +343,18 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->iResumeInterval = 30; pThis->iResumeRetryCount = 0; pThis->pszName = NULL; - pThis->bWriteAllMarkMsgs = RSFALSE; + pThis->bWriteAllMarkMsgs = 1; pThis->iExecEveryNthOccur = 0; pThis->iExecEveryNthOccurTO = 0; pThis->iSecsExecOnceInterval = 0; pThis->bExecWhenPrevSusp = 0; pThis->bRepMsgHasMsg = 0; + pThis->bDisabled = 0; + pThis->isTransactional = 0; pThis->bReportSuspension = -1; /* indicate "not yet set" */ pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */ - pThis->bJustResumed = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ - pthread_mutex_init(&pThis->mutActExec, NULL); + pThis->iActionNbr = iActionNbr; pthread_mutex_init(&pThis->mutAction, NULL); INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); @@ -365,13 +370,11 @@ finalize_it: /* action construction finalizer */ rsRetVal -actionConstructFinalize(action_t *pThis, struct nvlst *lst) +actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst) { DEFiRet; uchar pszAName[64]; /* friendly name of our action */ - ASSERT(pThis != NULL); - if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { /* discard actions will be optimized out */ FINALIZE; @@ -382,6 +385,19 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) pThis->pszName = ustrdup(pszAName); } + /* cache transactional attribute */ + pThis->isTransactional = pThis->pMod->mod.om.supportsTX; + if(pThis->isTransactional && pThis->eParamPassing != ACT_STRING_PASSING) { + errmsg.LogError(0, RS_RET_INVLD_OMOD, "action '%s'(%d) is transactional but " + "uses invalid paramter passing mode -- disabling " + "action. This is probably caused by a pre-v7 " + "output module that needs upgrade.", + pThis->pszName, pThis->iActionNbr); + actionDisable(pThis); + ABORT_FINALIZE(RS_RET_INVLD_OMOD); + } + + /* support statistics gathering */ CHKiRet(statsobj.Construct(&pThis->statsobj)); CHKiRet(statsobj.SetName(pThis->statsobj, pThis->pszName)); @@ -425,13 +441,13 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) DBGPRINTF("info: firehose mode disabled for action because " "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n", pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval); - pThis->submitToActQ = doSubmitToActionQComplexBatch; - } else if(pThis->bWriteAllMarkMsgs == RSFALSE) { - /* nearly full-speed submission mode, default case */ - pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch; + pThis->submitToActQ = doSubmitToActionQComplex; + } else if(pThis->bWriteAllMarkMsgs) { + /* full firehose submission mode, default case*/ + pThis->submitToActQ = doSubmitToActionQ; } else { - /* full firehose submission mode */ - pThis->submitToActQ = doSubmitToActionQBatch; + /* nearly full-speed submission mode */ + pThis->submitToActQ = doSubmitToActionQNotAllMark; } /* create queue */ @@ -441,7 +457,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, - (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); + processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); @@ -518,9 +534,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal) * returned string must not be modified. * rgerhards, 2009-05-07 */ -static uchar *getActStateName(action_t *pThis) +static uchar *getActStateName(action_t * const pThis, wti_t * const pWti) { - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: return (uchar*) "rdy"; case ACT_STATE_ITX: @@ -529,8 +545,6 @@ static uchar *getActStateName(action_t *pThis) return (uchar*) "rtry"; case ACT_STATE_SUSP: return (uchar*) "susp"; - case ACT_STATE_DIED: - return (uchar*) "died"; case ACT_STATE_COMM: return (uchar*) "comm"; default: @@ -542,12 +556,12 @@ static uchar *getActStateName(action_t *pThis) /* returns a suitable return code based on action state * rgerhards, 2009-05-07 */ -static rsRetVal getReturnCode(action_t *pThis) +static rsRetVal getReturnCode(action_t * const pThis, wti_t * const pWti) { DEFiRet; ASSERT(pThis != NULL); - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: iRet = RS_RET_OK; break; @@ -563,12 +577,11 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_SUSPENDED; break; case ACT_STATE_SUSP: - case ACT_STATE_DIED: iRet = RS_RET_ACTION_FAILED; break; default: - DBGPRINTF("Invalid action engine state %d, program error\n", - (int) pThis->eState); + DBGPRINTF("Invalid action engine state %u, program error\n", + getActionState(pWti, pThis)); iRet = RS_RET_ERR; break; } @@ -580,43 +593,33 @@ static rsRetVal getReturnCode(action_t *pThis) /* set the action to a new state * rgerhards, 2007-08-02 */ -static inline void actionSetState(action_t *pThis, action_state_t newState) +static inline void +actionSetState(action_t * const pThis, wti_t * const pWti, uint8_t newState) { - pThis->eState = newState; - DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); + setActionState(pWti, pThis, newState); + DBGPRINTF("Action %d transitioned to state: %s\n", + pThis->iActionNbr, getActStateName(pThis, pWti)); } /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ -static void actionCommitted(action_t *pThis) +static void actionCommitted(action_t * const pThis, wti_t * const pWti) { - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } /* set action to "rtry" state. * rgerhards, 2007-08-02 */ -static void actionRetry(action_t *pThis) +static void actionRetry(action_t * const pThis, wti_t * const pWti) { - actionSetState(pThis, ACT_STATE_RTRY); - pThis->iResumeOKinRow++; + actionSetState(pThis, pWti, ACT_STATE_RTRY); + incActionResumeInRow(pWti, pThis); } - -/* Disable action, this means it will never again be usable - * until rsyslog is reloaded. Use only as a last resort, but - * depends on output module. - * rgerhards, 2007-08-02 - */ -static void actionDisable(action_t *pThis) -{ - actionSetState(pThis, ACT_STATE_DIED); -} - - /* Suspend action, this involves changing the action state as well * as setting the next retry time. * if we have more than 10 retries, we prolong the @@ -626,7 +629,7 @@ static void actionDisable(action_t *pThis) * rgerhards, 2007-08-02 */ static inline void -actionSuspend(action_t * const pThis) +actionSuspend(action_t * const pThis, wti_t * const pWti) { time_t ttNow; int suspendDuration; @@ -649,24 +652,26 @@ actionSuspend(action_t * const pThis) * since caching, and this would break logic (and it actually did so!) */ datetime.GetTime(&ttNow); - suspendDuration = pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + suspendDuration = pThis->iResumeInterval * (getActionNbrResRtry(pWti, pThis) / 10 + 1); pThis->ttResumeRtry = ttNow + suspendDuration; - actionSetState(pThis, ACT_STATE_SUSP); + actionSetState(pThis, pWti, ACT_STATE_SUSP); pThis->ctrSuspendDuration += suspendDuration; - if(pThis->iNbrResRtry == 0) { + if(getActionNbrResRtry(pWti, pThis) == 0) { STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend); } + if( pThis->bReportSuspensionCont - || (pThis->bReportSuspension && pThis->iNbrResRtry == 0) ) { + || (pThis->bReportSuspension && getActionNbrResRtry(pWti, pThis) == 0) ) { ctime_r(&pThis->ttResumeRtry, timebuf); timebuf[strlen(timebuf)-1] = '\0'; /* strip LF */ errmsg.LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, "action '%s' suspended, next retry is %s", pThis->pszName, timebuf); } - DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d\n", + DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d, " + "duration %d\n", pThis->pszName, (long long) pThis->ttResumeRtry, (long long) ttNow, - pThis->iNbrResRtry); + getActionNbrResRtry(pWti, pThis), suspendDuration); } @@ -678,15 +683,15 @@ actionSuspend(action_t * const pThis) * entry point. This is invalid, but has harsh consequences: it will cause the rsyslog * engine to go into a tight loop. That obviously is not acceptable. As such, we track the * count of iterations that a tryResume returning RS_RET_OK is immediately followed by - * an unsuccessful call to doAction(). If that happens more than 1,000 times, we assume + * an unsuccessful call to doAction(). If that happens more than 10 times, we assume * the return acutally is a RS_RET_SUSPENDED. In order to go through the various - * resumption stages, we do this for every 1000 requests. This magic number 1000 may + * resumption stages, we do this for every 10 requests. This magic number 10 may * not be the most appropriate, but it should be thought of a "if nothing else helps" * kind of facility: in the first place, the module should return a proper indication * of its inability to recover. -- rgerhards, 2010-04-26. */ -static inline rsRetVal -actionDoRetry(action_t *pThis, int *pbShutdownImmediate) +static rsRetVal +actionDoRetry(action_t * const pThis, wti_t * const pWti) { int iRetries; int iSleepPeriod; @@ -696,35 +701,40 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) ASSERT(pThis != NULL); iRetries = 0; - while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d\n", pThis->pszName, iRetries); - iRet = pThis->pMod->tryResume(pThis->pModData); + iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet); - if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { + if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) { bTreatOKasSusp = 1; - pThis->iResumeOKinRow = 0; + setActionResumeInRow(pWti, pThis, 0); } else { bTreatOKasSusp = 0; } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n", pThis->pszName, iRet); - pThis->bJustResumed = 1; - actionSetState(pThis, ACT_STATE_RDY); + if(pThis->bReportSuspension) { + errmsg.LogMsg(0, RS_RET_OK, LOG_INFO, "action '%s' " + "resumed (module '%s')", + pThis->pszName, pThis->pMod->pszName); + } + setActionJustResumed(pWti, pThis, 1); + actionSetState(pThis, pWti, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ DBGPRINTF("actionDoRetry: %s check for max retries, iResumeRetryCount " "%d, iRetries %d\n", pThis->pszName, pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { - actionSuspend(pThis); - if(pThis->iNbrResRtry < 20) - ++pThis->iNbrResRtry; + actionSuspend(pThis, pWti); + if(getActionNbrResRtry(pWti, pThis) < 20) + incActionNbrResRtry(pWti, pThis); } else { ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); - if(*pbShutdownImmediate) { + if(*pWti->pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } } @@ -733,8 +743,8 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) } } - if(pThis->eState == ACT_STATE_RDY) { - pThis->iNbrResRtry = 0; + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + setActionNbrResRtry(pWti, pThis, 0); } finalize_it: @@ -742,17 +752,32 @@ finalize_it: } +static rsRetVal +actionCheckAndCreateWrkrInstance(action_t * const pThis, wti_t * const pWti) +{ + DEFiRet; + if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { + DBGPRINTF("wti %p: we need to create a new action worker instance for " + "action %d\n", pWti, pThis->iActionNbr); + CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), + pThis->pModData)); + pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */ + } +finalize_it: + RETiRet; +} + /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) +static rsRetVal +actionTryResume(action_t * const pThis, wti_t * const pWti) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_SUSP) { + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used * to use the action timestamp, but in this case we will never reach a @@ -762,19 +787,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) */ datetime.GetTime(&ttNow); /* cache "now" */ if(ttNow >= pThis->ttResumeRtry) { - actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ + actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */ } } - if(pThis->eState == ACT_STATE_RTRY) { + if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pWti)); } - if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) { DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n", - pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); } finalize_it: @@ -786,24 +811,25 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate) +static inline rsRetVal +actionPrepare(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { DEFiRet; - assert(pThis != NULL); - CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); + CHKiRet(actionTryResume(pThis, pWti)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ - if(pThis->eState == ACT_STATE_RDY) { - iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionSetState(pThis, ACT_STATE_ITX); + actionSetState(pThis, pWti, ACT_STATE_ITX); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -817,10 +843,11 @@ finalize_it: } +#if 0 // TODO: remove? /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ -rsRetVal actionDbgPrint(action_t *pThis) +static rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; char *sz; @@ -830,11 +857,12 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("\n"); dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - if(pThis->eState == ACT_STATE_SUSP) { +#if 0 // do we need this ??? + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { dbgprintf("\tresume next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\tState: %s\n", getActStateName(pThis)); +#endif dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { sz = "slow, but feature-rich"; @@ -850,45 +878,57 @@ rsRetVal actionDbgPrint(action_t *pThis) RETiRet; } +#endif /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ static rsRetVal -prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow) +prepareDoActionParams(action_t * __restrict__ const pAction, + wti_t * __restrict__ const pWti, + msg_t *__restrict__ const pMsg, + struct syslogTime *ttNow) { int i; - msg_t *pMsg; struct json_object *json; + actWrkrIParams_t *iparams; + actWrkrInfo_t *__restrict__ pWrkrInfo; DEFiRet; - ASSERT(pAction != NULL); - ASSERT(pElem != NULL); - - pMsg = pElem->pMsg; - /* here we must loop to process all requested strings */ - for(i = 0 ; i < pAction->iNumTpls ; ++i) { - switch(pAction->eParamPassing) { + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + if(pAction->isTransactional) { + CHKiRet(wtiNewIParam(pWti, pAction, &iparams)); + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, + &actParam(iparams, pAction->iNumTpls, 0, i), + ttNow)); + } + } else { + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), - &pElem->staticLenStrings[i], ttNow)); - pElem->staticActParams[i] = pElem->staticActStrings[i]; + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, + &(pWrkrInfo->p.nontx.actParams[i]), + ttNow)); break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow)); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, + (uchar***) &(pWrkrInfo->p.nontx.actParams[i].param), ttNow)); break; case ACT_MSG_PASSING: - pElem->staticActParams[i] = (void*) pMsg; + pWrkrInfo->p.nontx.actParams[i].param = (void*) pMsg; break; case ACT_JSON_PASSING: CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); - pElem->staticActParams[i] = (void*) json; + pWrkrInfo->p.nontx.actParams[i].param = (void*) json; break; - default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", + default:dbgprintf("software bug/error: unknown pAction->eParamPassing " + "%d in prepareDoActionParams\n", (int) pAction->eParamPassing); assert(0); /* software bug if this happens! */ break; + } } } @@ -897,62 +937,49 @@ finalize_it: } -/* free a batches ressources, but not string buffers (because they will - * most probably be reused). String buffers are only deleted upon final - * destruction of the batch. - * This function here must be called only when the batch is actually no - * longer used, also not for retrying actions or such. It invalidates - * buffers. - * rgerhards, 2010-12-17 - */ -static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) +static void +releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ const pWti) { int jArr; - int i, j; - batch_obj_t *pElem; + int j; + actWrkrInfo_t *__restrict__ pWrkrInfo; uchar ***ppMsgs; - DEFiRet; - - ASSERT(pAction != NULL); if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) goto done; /* we need to do nothing with these types! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - ppMsgs = (uchar***) pElem->staticActParams; - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - if(((uchar**)ppMsgs)[j] != NULL) { - jArr = 0; - while(ppMsgs[j][jArr] != NULL) { - d_free(ppMsgs[j][jArr]); - ppMsgs[j][jArr] = NULL; - ++jArr; - } - d_free(((uchar**)ppMsgs)[j]); - ((uchar**)ppMsgs)[j] = NULL; - } + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pWrkrInfo->p.nontx.actParams[0].param; + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + if(((uchar**)ppMsgs)[j] != NULL) { + jArr = 0; + while(ppMsgs[j][jArr] != NULL) { + free(ppMsgs[j][jArr]); + ppMsgs[j][jArr] = NULL; + ++jArr; } - break; - case ACT_JSON_PASSING: - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - json_object_put((struct json_object*) - pElem->staticActParams[j]); - pElem->staticActParams[j] = NULL; - } - break; - case ACT_STRING_PASSING: - case ACT_MSG_PASSING: - /* can never happen, just to keep compiler happy! */ - break; + free(((uchar**)ppMsgs)[j]); + ((uchar**)ppMsgs)[j] = NULL; } } + break; + case ACT_JSON_PASSING: + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + json_object_put((struct json_object*) + pWrkrInfo->p.nontx.actParams[j].param); + pWrkrInfo->p.nontx.actParams[j].param = NULL; + } + break; + case ACT_STRING_PASSING: + /* strings are destructed when the worker terminates */ + case ACT_MSG_PASSING: + /* can never happen, just to keep compiler happy! */ + break; } -done: RETiRet; +done: return; } /* This is used in resume processing. We only finally know that a resume @@ -960,52 +987,43 @@ done: RETiRet; * we need to do some cleanup and status tracking in that case. */ static void -actionSetActionWorked(action_t *__restrict__ const pThis) +actionSetActionWorked(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + setActionResumeInRow(pWti, pThis, 0); - if(pThis->bJustResumed) { + if(getActionJustResumed(pWti, pThis)) { /* OK, we *really* could resume, so tell user! */ if(pThis->bReportSuspension) { errmsg.LogMsg(0, RS_RET_RESUMED, LOG_INFO, "action '%s' " "resumed (module '%s')", pThis->pszName, pThis->pMod->pszName); } - pThis->bJustResumed = 0; + setActionJustResumed(pWti, pThis, 0); } } -/* call the DoAction output plugin entry point - * rgerhards, 2008-01-28 - */ -rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +static rsRetVal +handleActionExecResult(action_t *__restrict__ const pThis, + wti_t *__restrict__ const pWti, + const rsRetVal ret) { DEFiRet; - - ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - - DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); - - pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); - switch(iRet) { + switch(ret) { case RS_RET_OK: - actionCommitted(pThis); - actionSetActionWorked(pThis); /* we had a successful call! */ + actionCommitted(pThis, pWti); + actionSetActionWorked(pThis, pWti); /* we had a successful call! */ break; case RS_RET_DEFER_COMMIT: - actionSetActionWorked(pThis); /* we had a successful call! */ + actionSetActionWorked(pThis, pWti); /* we had a successful call! */ /* we are done, action state remains the same */ break; case RS_RET_PREVIOUS_COMMITTED: /* action state remains the same, but we had a commit. */ pThis->bHadAutoCommit = 1; - actionSetActionWorked(pThis); /* we had a successful call! */ + actionSetActionWorked(pThis, pWti); /* we had a successful call! */ break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -1013,70 +1031,134 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) */ + iRet = ret; FINALIZE; } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } +/* call the DoAction output plugin entry point + * rgerhards, 2008-01-28 + */ +static rsRetVal +actionCallDoAction(action_t *__restrict__ const pThis, + actWrkrIParams_t *__restrict__ const iparams, + wti_t *__restrict__ const pWti) +{ + uchar *param[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + int i; + DEFiRet; + + DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", + getActStateName(pThis, pWti), pThis->iActionNbr); + + pThis->bHadAutoCommit = 0; + /* for this interface, we need to emulate the old style way + * of parameter passing. + */ + for(i = 0 ; i < pThis->iNumTpls ; ++i) { + param[i] = actParam(iparams, pThis->iNumTpls, 0, i).param; + } + + iRet = pThis->pMod->mod.om.doAction(param, + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); + iRet = handleActionExecResult(pThis, pWti, iRet); + RETiRet; +} + + +/* call the commitTransaction output plugin entry point */ +static rsRetVal +actionCallCommitTransaction(action_t * const pThis, + const actWrkrInfo_t *const wrkrInfo, + wti_t *const pWti) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + DBGPRINTF("entering actionCallCommitTransaction(), state: %s, actionNbr %d, " + "nMsgs %u\n", + getActStateName(pThis, pWti), pThis->iActionNbr, + wrkrInfo->p.tx.currIParam); + + iRet = pThis->pMod->mod.om.commitTransaction( + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData, + wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam); + iRet = handleActionExecResult(pThis, pWti, iRet); + RETiRet; +} + /* process a message * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static inline rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate) +rsRetVal +actionProcessMessage(action_t * const pThis, void *actParams, wti_t * const pWti) { DEFiRet; - ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - - CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) - pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); - if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, actParams, pWti)); - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } -/* finish processing a batch. Most importantly, that means we commit if we - * need to do so. - * rgerhards, 2008-01-28 - */ +/* the following functions simulates a potential future new omo callback */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch) +doTransaction(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { + actWrkrInfo_t *wrkrInfo; int i; DEFiRet; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_RDY) { - /* we just need to flag the batch as commited */ - FINALIZE; /* nothing to do */ + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + if(pThis->pMod->mod.om.commitTransaction != NULL) { + DBGPRINTF("doTransaction: have commitTransaction IF, using that, pWrkrInfo %p\n", wrkrInfo); + CHKiRet(actionCallCommitTransaction(pThis, wrkrInfo, pWti)); + } else { /* note: this branch is for compatibility with old TX modules */ + DBGPRINTF("doTransaction: action %d, currIParam %d\n", + pThis->iActionNbr, wrkrInfo->p.tx.currIParam); + for(i = 0 ; i < wrkrInfo->p.tx.currIParam ; ++i) { + /* Note: we provide the message's base iparam - actionProcessMessage() + * uses this as *base* address. + */ + iRet = actionProcessMessage(pThis, + &actParam(wrkrInfo->p.tx.iparams, pThis->iNumTpls, i, 0), pWti); + } } +finalize_it: + RETiRet; +} - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); - if(pThis->eState == ACT_STATE_ITX) { - iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + +/* Commit try committing (do not handle retry processing and such) */ +static rsRetVal +actionTryCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) +{ + DEFiRet; + + doTransaction(pThis, pWti); + + CHKiRet(actionPrepare(pThis, pWti)); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) { + iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - /* flag messages as committed */ - for(i = 0 ; i < pBatch->nElem ; ++i) { - batchSetElemState(pBatch, i, BATCH_STATE_COMM); - pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ - } + actionCommitted(pThis, pWti); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -1084,12 +1166,12 @@ finishBatch(action_t *pThis, batch_t *pBatch) case RS_RET_DEFER_COMMIT: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; case RS_RET_PREVIOUS_COMMITTED: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) @@ -1097,325 +1179,172 @@ finishBatch(action_t *pThis, batch_t *pBatch) FINALIZE; } } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: + pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam = 0; /* reset to beginning */ RETiRet; } - -/* try to submit a partial batch of elements. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) -{ - int i; - int iElemProcessed; - int iCommittedUpTo; - msg_t *pMsg; - rsRetVal localRet; - DEFiRet; - - assert(pBatch != NULL); - assert(pnElem != NULL); - - i = pBatch->iDoneUpTo; /* all messages below that index are processed */ - iElemProcessed = 0; - iCommittedUpTo = i; - DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); - while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - /* NOTE: do NOT extend the filter below! Anything else must be done on the - * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 - */ - if(batchIsValidElem(pBatch, i)) { - pMsg = pBatch->pElem[i].pMsg; - localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate); - DBGPRINTF("action %p call returned %d\n", pAction, localRet); - /* Note: we directly modify the batch object state, because we know that - * wo do not overwrite BATCH_STATE_DISC indicators! - */ - if(localRet == RS_RET_OK) { - /* mark messages as committed */ - while(iCommittedUpTo <= i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { - /* mark messages as committed */ - while(iCommittedUpTo < i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DEFER_COMMIT) { - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DISCARDMSG) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", - localRet, *pnElem, iCommittedUpTo); - iRet = localRet; - FINALIZE; - } - } - ++i; - ++iElemProcessed; - } - -finalize_it: - if(pBatch->iDoneUpTo != iCommittedUpTo) { - pBatch->iDoneUpTo = iCommittedUpTo; - } - RETiRet; -} - -/* submit a batch for actual action processing. - * The first nElem elements are processed. This function calls itself - * recursively if it needs to handle errors. - * Note: we don't need the number of the first message to be processed as a parameter, - * because this is kept track of inside the batch itself (iDoneUpTo). - * rgerhards, 2009-05-12 +/* Note: we currently need to return an iRet, as this is used in + * direct mode. TODO: However, it may be worth further investigating this, + * as it looks like there is no ultimate consumer of this code. + * rgerhards, 2013-11-06 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +actionCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { - int i; - int bDone; - rsRetVal localRet; - int wasDoneTo; + sbool bDone; DEFiRet; - assert(pBatch != NULL); + if(!pThis->isTransactional || + pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam == 0 || + getActionState(pWti, pThis) == ACT_STATE_SUSP + ) { + FINALIZE; + } - DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); - wasDoneTo = pBatch->iDoneUpTo; + /* even more TODO: + This is the place where retry processing needs to go in. If the action + permanently fails, we should - as a new feature - add the capability to + write an error file. This is already done be omelasticsearch, and IMHO + pretty useful. + For the time being, I do NOT implement all of this (not even retry!) + as I want to get the rest of the engine to SISD (non-SIMD ;)) so that + I know any potential suprises and complications that arise out of this. + When this is done, I can come back here and complete this work. Obviously, + many features do not work in the mean time (but it is not planned to release + any of these partial implementations). + rgerhards, 2013-11-04 + */ bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem); - if(localRet == RS_RET_FORCE_TERM) { + iRet = actionTryCommit(pThis, pWti); + DBGPRINTF("actionCommit, in retry loop, iRet %d\n", iRet); + if(iRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - /* try commit transaction, once done, we can simply do so as if - * that return state was returned from tryDoAction(). - */ - localRet = finishBatch(pAction, pBatch); - } - - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { + } else if(iRet == RS_RET_OK || + iRet == RS_RET_SUSPENDED || + iRet == RS_RET_ACTION_FAILED) { bDone = 1; - } else if(localRet == RS_RET_SUSPENDED) { - DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); - /* do nothing, this will retry the full batch */ - } else if(localRet == RS_RET_ACTION_FAILED) { - /* in this case, everything not yet committed is BAD */ - for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && pBatch->eltState[i] != BATCH_STATE_COMM ) { - pBatch->eltState[i] = BATCH_STATE_BAD; - pBatch->pElem[i].bPrevWasSuspended = 1; - STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); - } - } + } + if(getActionState(pWti, pThis) == ACT_STATE_RDY || + getActionState(pWti, pThis) == ACT_STATE_SUSP) { bDone = 1; - } else { - if(nElem == 1) { - batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); - bDone = 1; - } else { - /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " - "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); - bDone = 1; - } } - } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - + } while(!bDone); finalize_it: RETiRet; } - -/* copy "active" array of batch, as we need to modify it. The caller - * must make sure the new array is freed and the orginal batch - * pointer is restored (thus the caller must save it). If active - * is currently NULL, this is properly handled. - * Note: the batches active pointer is modified, so it must be - * saved BEFORE calling this function! - * rgerhards, 2012-09-12 - */ -static rsRetVal -copyActive(batch_t *pBatch) +/* Commit all active transactions in *DIRECT mode* */ +void +actionCommitAllDirect(wti_t *__restrict__ const pWti) { - sbool *active; - DEFiRet; + int i; + action_t *pAction; - CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - if(pBatch->active == NULL) - memset(active, 1, batchNumMsgs(pBatch)); - else - memcpy(active, pBatch->active, batchNumMsgs(pBatch)); - pBatch->active = active; -finalize_it: - RETiRet; + for(i = 0 ; i < iActionNbr ; ++i) { + pAction = pWti->actWrkrInfo[i].pAction; + if(pAction == NULL) + continue; + DBGPRINTF("actionCommitAll: action %d, state %u, nbr to commit %d " + "isTransactional %d\n", + i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo->p.tx.currIParam, + pAction->isTransactional); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + actionCommit(pAction, pWti); + } } -/* The following function prepares a batch for processing, that it is - * reinitializes batch states, generates strings and does everything else - * that needs to be done in order to make the batch ready for submission to - * the actual output module. Note that we look at the precomputed - * filter OK condition and process only those messages, that actually matched - * the filter. - * rgerhards, 2010-06-14 +/* process a single message. This is both called if we run from the + * cosumer side of an action queue as well as directly from the main + * queue thread if the action queue is set to "direct". */ -static inline rsRetVal -prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) +static rsRetVal +processMsgMain(action_t *__restrict__ const pAction, + wti_t *__restrict__ const pWti, + msg_t *__restrict__ const pMsg, + struct syslogTime *ttNow) { - int i; - batch_obj_t *pElem; - struct syslogTime ttNow; DEFiRet; - /* indicate we have not yet read the date */ - ttNow.year = 0; - - pBatch->iDoneUpTo = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - pBatch->eltState[i] = BATCH_STATE_RDY; - if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { - /* make sure we have our copy of "active" array */ - if(!*bMustRestoreActivePtr) { - *activeSave = pBatch->active; - copyActive(pBatch); - } - pBatch->active[i] = RSFALSE; - } - } + if(pAction->bExecWhenPrevSusp && !pWti->execState.bPrevWasSuspended) { + DBGPRINTF("action %d: NOT executing, as previous action was " + "not suspended\n", pAction->iActionNbr); + FINALIZE; } - RETiRet; -} + iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); -/* receive a batch and process it. This includes retry handling. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch) -{ - DEFiRet; - - assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); - iRet = finishBatch(pAction, pBatch); + if(pAction->isTransactional) { + pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction; + DBGPRINTF("action %d is transactional - executing in commit phase\n", pAction->iActionNbr); + actionPrepare(pAction, pWti); + iRet = getReturnCode(pAction, pWti); + FINALIZE; + } + iRet = actionProcessMessage(pAction, + pWti->actWrkrInfo[pAction->iActionNbr].p.nontx.actParams, + pWti); + releaseDoActionParams(pAction, pWti); finalize_it: + if(iRet == RS_RET_OK) { + if(pWti->execState.bDoAutoCommit) + iRet = actionCommit(pAction, pWti); + } + pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED); RETiRet; } - -#pragma GCC diagnostic ignored "-Wempty-body" -/* receive an array of to-process user pointers and submit them - * for processing. - * rgerhards, 2009-04-22 +/* This entry point is called by the ACTION queue (not main queue!) */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +processBatchMain(void *__restrict__ const pVoid, + batch_t *__restrict__ const pBatch, + wti_t *__restrict__ const pWti) { - int *pbShutdownImmdtSave; - sbool *activeSave; - int bMustRestoreActivePtr = 0; - rsRetVal localRet; + action_t *__restrict__ const pAction = (action_t*__restrict__ const) pVoid; + int i; + struct syslogTime ttNow; DEFiRet; - assert(pBatch != NULL); - - if(pbShutdownImmediate != NULL) { - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; - } - CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - - iRet = processAction(pAction, pBatch); - - pthread_cleanup_pop(1); /* unlock mutex */ - - /* even if processAction failed, we need to release the batch (else we - * have a memory leak). So we do this first, and then check if we need to - * return an error code. If so, the code from processAction has priority. - * rgerhards, 2010-12-17 - */ - localRet = releaseBatch(pAction, pBatch); + wtiResetExecState(pWti, pBatch); + /* indicate we have not yet read the date */ + ttNow.year = 0; - if(iRet == RS_RET_OK) - iRet = localRet; - - if(bMustRestoreActivePtr) { - free(pBatch->active); - pBatch->active = activeSave; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) { + if(batchIsValidElem(pBatch, i)) { + iRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow); + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + } } -finalize_it: - if(pbShutdownImmediate != NULL) - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + iRet = actionCommit(pAction, pWti); RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" -/* call the HUP handler for a given action, if such a handler is defined. The - * action mutex is locked, because the HUP handler most probably needs to modify - * some internal state information. - * rgerhards, 2008-10-22 +/* call the HUP handler for a given action, if such a handler is defined. + * Note that the action must be able to service HUP requests concurrently + * to any current doAction() processing. */ -#pragma GCC diagnostic ignored "-Wempty-body" rsRetVal -actionCallHUPHdlr(action_t *pAction) +actionCallHUPHdlr(action_t * const pAction) { DEFiRet; ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); - if(pAction->pMod->doHUP == NULL) { - FINALIZE; /* no HUP handler, so we are done ;) */ + if(pAction->pMod->doHUP != NULL) { + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); } - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - CHKiRet(pAction->pMod->doHUP(pAction->pModData)); - pthread_cleanup_pop(1); /* unlock mutex */ - finalize_it: RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* set the action message queue mode @@ -1450,27 +1379,28 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. This is also utilized to submit messages in complex case once + * and thus speed. This is also utilized to submit messages in more complex cases once * the complex logic has been applied ;) * rgerhards, 2010-06-08 */ -static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +static rsRetVal +doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, msg_t *pMsg) { + struct syslogTime ttNow; // TODO: think if we can buffer this in pWti DEFiRet; - if(pAction->eState == ACT_STATE_DIED) { - DBGPRINTF("action %p died, do NOT execute\n", pAction); - FINALIZE; - } + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); - else + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { + ttNow.year = 0; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); + } else {/* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); + } -finalize_it: RETiRet; } @@ -1483,7 +1413,7 @@ finalize_it: * be filtered out before calling us (what is done currently!). */ rsRetVal -actionWriteToAction(action_t *pAction, msg_t *pMsg) +actionWriteToAction(action_t * const pAction, msg_t *pMsg, wti_t * const pWti) { DEFiRet; @@ -1538,44 +1468,46 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); finalize_it: RETiRet; } -/* helper to actonCallAction, mostly needed because of this damn - * pthread_cleanup_push() POSIX macro... +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 */ -static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) +#pragma GCC diagnostic ignored "-Wempty-body" +static rsRetVal +doSubmitToActionQComplex(action_t * const pAction, wti_t * const pWti, msg_t *pMsg) { - msg_t *pMsg; DEFiRet; - pMsg = pBatch->pElem[idxBtch].pMsg; + d_pthread_mutex_lock(&pAction->mutAction); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ + // TODO: can we optimize the "now" handling again (was batch, I guess...)? /* don't output marks to recently written outputs */ - if(pAction->bWriteAllMarkMsgs == RSFALSE + if(pAction->bWriteAllMarkMsgs == 0 && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { ABORT_FINALIZE(RS_RET_OK); } /* call the output driver */ - iRet = actionWriteToAction(pAction, pMsg); + iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: - /* we need to update the batch to handle failover processing correctly */ - if(iRet == RS_RET_OK) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 0; - } else if(iRet == RS_RET_ACTION_FAILED) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 1; - } + d_pthread_mutex_unlock(&pAction->mutAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } +#pragma GCC diagnostic warning "-Wempty-body" /* helper to activateActions, it activates a specific action. @@ -1583,7 +1515,7 @@ finalize_it: DEFFUNC_llExecFunc(doActivateActions) { rsRetVal localRet; - action_t *pThis = (action_t*) pData; + action_t * const pThis = (action_t*) pData; BEGINfunc localRet = qqueueStart(pThis->pQueue); if(localRet != RS_RET_OK) { @@ -1624,201 +1556,48 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQNotAllMark(action_t * const pAction, wti_t * const pWti, msg_t * const pMsg) { - time_t now = 0; + int doProcess = 1; time_t lastAct; - int i; - sbool *activeSave; - DEFiRet; - - activeSave = pBatch->active; - copyActive(pBatch); - - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) - continue; - if(now == 0) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - } - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if(pBatch->pElem[i].pMsg->msgFlags & MARK) { - if((now - lastAct) < MarkInterval / 2) { - pBatch->active[i] = 0; - DBGPRINTF("batch item %d: action was recently called, ignoring " - "mark message\n", i); - break; /* do not update timestamp for non-written mark messages */ - } - } - } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->active[i]) { - DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", - i, module.GetStateName(pAction->pMod)); - } - } - - iRet = doSubmitToActionQBatch(pAction, pBatch); - - free(pBatch->active); - pBatch->active = activeSave; - - RETiRet; -} - -static inline void -countStatsBatchEnq(action_t *pAction, batch_t *pBatch) -{ - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - } - } -} - - -/* enqueue a batch in direct mode. We have put this into its own function just to avoid - * cluttering the actual submit function. - * rgerhards, 2011-06-16 - */ -static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) -{ - sbool bNeedSubmit; - sbool *activeSave; - int i; DEFiRet; - activeSave = pBatch->active; - copyActive(pBatch); - - /* note: for direct mode, we need to adjust the filter property. For non-direct - * this is not necessary, because in that case we enqueue only what actually needs - * to be processed. + /* TODO: think about the whole logic. If messages come in out of order, things + * tend to become a bit unreliable. On the other hand, this only happens if we have + * very high traffic, in which this use case here is not really affected (as the + * MarkInterval is pretty corase). */ - if(pAction->bExecWhenPrevSusp) { - bNeedSubmit = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if(!pBatch->pElem[i].bPrevWasSuspended) { - DBGPRINTF("action enq stage: change active to 0 due to " - "failover case in elem %d\n", i); - pBatch->active[i] = 0; - } - if(batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - bNeedSubmit = 1; - } - DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - } - if(bNeedSubmit) { - /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); - } else { - DBGPRINTF("no need to submit batch, all invalid\n"); - } - } else { - if(GatherStats) - countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); - } - - free(pBatch->active); - pBatch->active = activeSave; - RETiRet; -} - -/* This submits the message to the action queue in case we do NOT need to handle repeat - * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. - * rgerhards, 2010-06-08 - */ -static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); - - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch); - } else {/* in this case, we do single submits to the queue. - * TODO: optimize this, we may do at least a multi-submit! - */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pMsg->msgFlags & MARK) { + if((pMsg->ttGenTime - lastAct) < MarkInterval / 2) { + doProcess = 0; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ } } - } - - RETiRet; -} + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pMsg->ttGenTime, &pAction->mutCAS) == 0); - - -/* Helper to submit a batch of actions to the engine. Note that we have rather - * complicated processing here, so we need to do this one message after another. - * rgerhards, 2010-06-23 - */ -static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action %p (complex case), logging to %s\n", - pAction, module.GetStateName(pAction->pMod)); - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i); - } + if(doProcess) { + DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", + module.GetStateName(pAction->pMod)); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); } RETiRet; } -/* Call configured action, most complex case with all features supported (and thus slow). - * rgerhards, 2010-06-08 - */ -#pragma GCC diagnostic ignored "-Wempty-body" -static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) -{ - DEFiRet; - - d_pthread_mutex_lock(&pAction->mutAction); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); - d_pthread_mutex_unlock(&pAction->mutAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - - RETiRet; -} -#pragma GCC diagnostic warning "-Wempty-body" - /* apply all params from param block to action. This supports the v6 config system. * Defaults must have been set appropriately during action construct! * rgerhards, 2011-08-01 */ static rsRetVal -actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) +actionApplyCnfParam(action_t * const pAction, struct cnfparamvals * const pvals) { int i; @@ -1866,7 +1645,7 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, - struct nvlst *lst, int bSuspended) + struct nvlst * const lst) { DEFiRet; int i; @@ -1895,7 +1674,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg; cs.iActExecEveryNthOccur = 0; /* auto-reset */ cs.iActExecEveryNthOccurTO = 0; /* auto-reset */ - cs.bActionWriteAllMarkMsgs = RSFALSE; /* auto-reset */ + cs.bActionWriteAllMarkMsgs = 1; /* auto-reset */ cs.pszActionName = NULL; /* free again! */ } else { actionApplyCnfParam(pAction, actParams); @@ -1953,15 +1732,10 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; - /* check if the module is compatible with select features (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ - - if(bSuspended) - actionSuspend(pAction); CHKiRet(actionConstructFinalize(pAction, lst)); - /* TODO: if we exit here, we have a memory leak... */ + /* TODO: if we exit here, we have a (quite acceptable...) memory leak */ *ppAction = pAction; /* finally store the action pointer */ @@ -1997,7 +1771,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus static inline void initConfigVariables(void) { - cs.bActionWriteAllMarkMsgs = RSFALSE; + cs.bActionWriteAllMarkMsgs = 1; cs.glbliActionResumeRetryCount = 0; cs.bActExecWhenPrevSusp = 0; cs.iActExecOnceInterval = 0; @@ -2036,17 +1810,11 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); ABORT_FINALIZE(RS_RET_MOD_UNKNOWN); } - iRet = pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR); - // TODO: check if RS_RET_SUSPENDED is still valid in v6! - if(iRet != RS_RET_OK && iRet != RS_RET_SUSPENDED) { - FINALIZE; /* iRet is already set to error state */ - } + CHKiRet(pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR)); - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst, - (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ loadConf->actions.nbrActions++; /* one more active action! */ } *ppAction = pAction; @@ -2057,8 +1825,6 @@ finalize_it: RETiRet; } -/* TODO: we are not yet a real object, the ClassInit here just looks like it is.. - */ rsRetVal actionClassInit(void) { DEFiRet; |