diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 1359 |
1 files changed, 602 insertions, 757 deletions
@@ -13,18 +13,15 @@ * The different modes (and calling sequence) are: * * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval - * - doSubmitToActionQComplexBatch - * - helperSubmitToActionQComplexBatch - * - doActionCallAction + * - doSubmitToActionQComplex * handles mark message reduction, but in essence calls * - actionWriteToAction * - qqueueEnqObj * (now queue engine processing) - * if(pThis->bWriteAllMarkMsgs == RSFALSE) - this is the DEFAULT - * - doSubmitToActionQNotAllMarkBatch - * - doSubmitToActionQBatch (and from here like in the else case below!) + * if(pThis->bWriteAllMarkMsgs == RSFALSE) + * - doSubmitToActionQNotAllMark + * - doSubmitToActionQ (and from here like in the else case below!) * else - * - doSubmitToActionQBatch * - doSubmitToActionQ * - qqueueEnqObj * (now queue engine processing) @@ -36,9 +33,6 @@ * * After dequeue, processing is as follows: * - processBatchMain - * - processAction - * - submitBatch - * - tryDoAction * - ... * * MORE ON PROCESSING, QUEUES and FILTERING @@ -69,7 +63,7 @@ * beast. * rgerhards, 2011-06-15 * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -98,7 +92,7 @@ #include <strings.h> #include <time.h> #include <errno.h> -#include <json/json.h> +#include <json.h> #include "dirty.h" #include "template.h" @@ -114,18 +108,18 @@ #include "unicode-helper.h" #include "atomic.h" #include "ruleset.h" +#include "parserif.h" #include "statsobj.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t * const pWti); +static rsRetVal doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, msg_t*); +static rsRetVal doSubmitToActionQComplex(action_t * const pAction, wti_t * const pWti, msg_t*); +static rsRetVal doSubmitToActionQNotAllMark(action_t * const pAction, wti_t * const pWti, msg_t*); /* object static data (once for all instances) */ -/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ DEFobjCurrIf(obj) DEFobjCurrIf(datetime) DEFobjCurrIf(module) @@ -175,10 +169,11 @@ configSettings_t cs_save; /* our saved (scope!) config settings */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice * to have during one instance of an rsyslogd run. For example, I use them to name actions when there - * is no better name available. Note that I do NOT recover previous numbers on HUP - we simply keep - * counting. -- rgerhards, 2008-01-29 + * is no better name available. */ -static int iActionNbr = 0; +int iActionNbr = 0; +int bActionReportSuspension = 1; +int bActionReportSuspensionCont = 0; /* tables for interfacing with the v6 config system */ static struct cnfparamdescr cnfparamdescr[] = { @@ -191,6 +186,8 @@ static struct cnfparamdescr cnfparamdescr[] = { { "action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */ { "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */ { "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */ + { "action.reportsuspension", eCmdHdlrBinary, 0 }, + { "action.reportsuspensioncontinuation", eCmdHdlrBinary, 0 }, { "action.resumeinterval", eCmdHdlrInt, 0 } }; static struct cnfparamblk pblk = @@ -224,7 +221,7 @@ static struct cnfparamblk pblk = * a lot. So we simply return the system time. */ static inline time_t -getActNow(action_t *pThis) +getActNow(action_t * const pThis) { assert(pThis != NULL); if(pThis->tActNow == -1) { @@ -253,8 +250,8 @@ actionResetQueueParams(void) cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ cs.iActionQueueSize = 1000; /* size of the main message queue above */ cs.iActionQueueDeqBatchSize = 16; /* default batch size */ - cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ - cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ + cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */ + cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */ cs.iActionQDiscardMark = 980; /* begin to discard messages */ cs.iActionQDiscardSeverity = 8; /* discard warning and above */ cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ @@ -265,7 +262,7 @@ actionResetQueueParams(void) cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ cs.iActionQtoEnq = 50; /* timeout for queue enque */ cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ - cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ + cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */ cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ cs.iActionQueMaxDiskSpace = 0; cs.iActionQueueDeqSlowdown = 0; @@ -284,7 +281,7 @@ actionResetQueueParams(void) /* destructs an action descriptor object * rgerhards, 2007-08-01 */ -rsRetVal actionDestruct(action_t *pThis) +rsRetVal actionDestruct(action_t * const pThis) { DEFiRet; ASSERT(pThis != NULL); @@ -304,13 +301,13 @@ rsRetVal actionDestruct(action_t *pThis) if(pThis->statsobj != NULL) statsobj.Destruct(&pThis->statsobj); - if(pThis->pMod != NULL) + if(pThis->pModData != NULL) pThis->pMod->freeInstance(pThis->pModData); pthread_mutex_destroy(&pThis->mutAction); - pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); + d_free(pThis->peParamPassing); finalize_it: d_free(pThis); @@ -318,6 +315,19 @@ finalize_it: } +/* Disable action, this means it will never again be usable + * until rsyslog is reloaded. Use only as a last resort, but + * depends on output module. + * rgerhards, 2007-08-02 + */ +static inline void +actionDisable(action_t *__restrict__ const pThis) +{ + pThis->bDisabled = 1; +} + + + /* create a new action descriptor object * rgerhards, 2007-08-01 * Note that it is vital to set proper initial values as the v6 config @@ -334,14 +344,18 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->iResumeInterval = 30; pThis->iResumeRetryCount = 0; pThis->pszName = NULL; - pThis->bWriteAllMarkMsgs = RSFALSE; + pThis->bWriteAllMarkMsgs = 1; pThis->iExecEveryNthOccur = 0; pThis->iExecEveryNthOccurTO = 0; pThis->iSecsExecOnceInterval = 0; pThis->bExecWhenPrevSusp = 0; pThis->bRepMsgHasMsg = 0; + pThis->bDisabled = 0; + pThis->isTransactional = 0; + pThis->bReportSuspension = -1; /* indicate "not yet set" */ + pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */ pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ - pthread_mutex_init(&pThis->mutActExec, NULL); + pThis->iActionNbr = iActionNbr; pthread_mutex_init(&pThis->mutAction, NULL); INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); @@ -357,13 +371,11 @@ finalize_it: /* action construction finalizer */ rsRetVal -actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) +actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst) { DEFiRet; uchar pszAName[64]; /* friendly name of our action */ - ASSERT(pThis != NULL); - if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { /* discard actions will be optimized out */ FINALIZE; @@ -371,35 +383,60 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) /* generate a friendly name for us action stats */ if(pThis->pszName == NULL) { snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr); - } else { - ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); - pszAName[sizeof(pszAName)-1] = '\0'; /* to be on the save side */ + pThis->pszName = ustrdup(pszAName); + } + + /* cache transactional attribute */ + pThis->isTransactional = pThis->pMod->mod.om.supportsTX; + if(pThis->isTransactional) { + int i; + for(i = 0 ; i < pThis->iNumTpls ; ++i) { + if(pThis->peParamPassing[i] != ACT_STRING_PASSING) { + errmsg.LogError(0, RS_RET_INVLD_OMOD, "action '%s'(%d) is transactional but " + "parameter %d " + "uses invalid paramter passing mode -- disabling " + "action. This is probably caused by a pre-v7 " + "output module that needs upgrade.", + pThis->pszName, pThis->iActionNbr, i); + actionDisable(pThis); + ABORT_FINALIZE(RS_RET_INVLD_OMOD); + + } + } } + /* support statistics gathering */ CHKiRet(statsobj.Construct(&pThis->statsobj)); - CHKiRet(statsobj.SetName(pThis->statsobj, pszAName)); + CHKiRet(statsobj.SetName(pThis->statsobj, pThis->pszName)); STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), - ctrType_IntCtr, &pThis->ctrProcessed)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrProcessed)); STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), - ctrType_IntCtr, &pThis->ctrFail)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFail)); + + STATSCOUNTER_INIT(pThis->ctrSuspend, pThis->mutCtrSuspend); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrSuspend)); + STATSCOUNTER_INIT(pThis->ctrSuspendDuration, pThis->mutCtrSuspendDuration); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended.duration"), + ctrType_IntCtr, 0, &pThis->ctrSuspendDuration)); + + STATSCOUNTER_INIT(pThis->ctrResume, pThis->mutCtrResume); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("resumed"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrResume)); CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); /* create our queue */ /* generate a friendly name for the queue */ - if(pThis->pszName == NULL) { - snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue", - iActionNbr); - } else { - ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); - pszAName[63] = '\0'; /* to be on the save side */ - } + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "%s queue", + pThis->pszName); + /* now check if we can run the action in "firehose mode" during stage one of * its processing (that is before messages are enqueued into the action q). * This is only possible if some features, which require strict sequence, are @@ -412,13 +449,13 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) DBGPRINTF("info: firehose mode disabled for action because " "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n", pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval); - pThis->submitToActQ = doSubmitToActionQComplexBatch; - } else if(pThis->bWriteAllMarkMsgs == RSFALSE) { - /* nearly full-speed submission mode, default case */ - pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch; + pThis->submitToActQ = doSubmitToActionQComplex; + } else if(pThis->bWriteAllMarkMsgs) { + /* full firehose submission mode, default case*/ + pThis->submitToActQ = doSubmitToActionQ; } else { - /* full firehose submission mode */ - pThis->submitToActQ = doSubmitToActionQBatch; + /* nearly full-speed submission mode */ + pThis->submitToActQ = doSubmitToActionQNotAllMark; } /* create queue */ @@ -428,11 +465,11 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, - (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); + processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); - if(queueParams == NULL) { /* use legacy params? */ + if(lst == NULL) { /* use legacy params? */ /* ... set some properties ... */ # define setQPROP(func, directive, data) \ CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ @@ -459,6 +496,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); + setQPROP(qqueueSetiNumWorkerThreads, "$ActionQueueWorkerThreads", cs.iActionQueueNumWorkers); setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); @@ -466,7 +504,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) } else { /* we have v6-style config params */ qqueueSetDefaultsActionQueue(pThis->pQueue); - qqueueApplyCnfParam(pThis->pQueue, queueParams); + qqueueApplyCnfParam(pThis->pQueue, lst); } # undef setQPROP @@ -475,6 +513,12 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) qqueueDbgPrint(pThis->pQueue); DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); + + if(pThis->bUsesMsgPassingMode && pThis->pQueue->qType != QUEUETYPE_DIRECT) { + parser_warnmsg("module %s with message passing mode uses " + "non-direct queue. This most probably leads to undesired " + "results", (char*)modGetName(pThis->pMod)); + } /* and now reset the queue params (see comment in its function header!) */ actionResetQueueParams(); @@ -498,9 +542,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal) * returned string must not be modified. * rgerhards, 2009-05-07 */ -static uchar *getActStateName(action_t *pThis) +static uchar *getActStateName(action_t * const pThis, wti_t * const pWti) { - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: return (uchar*) "rdy"; case ACT_STATE_ITX: @@ -509,8 +553,6 @@ static uchar *getActStateName(action_t *pThis) return (uchar*) "rtry"; case ACT_STATE_SUSP: return (uchar*) "susp"; - case ACT_STATE_DIED: - return (uchar*) "died"; case ACT_STATE_COMM: return (uchar*) "comm"; default: @@ -522,12 +564,12 @@ static uchar *getActStateName(action_t *pThis) /* returns a suitable return code based on action state * rgerhards, 2009-05-07 */ -static rsRetVal getReturnCode(action_t *pThis) +static rsRetVal getReturnCode(action_t * const pThis, wti_t * const pWti) { DEFiRet; ASSERT(pThis != NULL); - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: iRet = RS_RET_OK; break; @@ -543,12 +585,11 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_SUSPENDED; break; case ACT_STATE_SUSP: - case ACT_STATE_DIED: iRet = RS_RET_ACTION_FAILED; break; default: - DBGPRINTF("Invalid action engine state %d, program error\n", - (int) pThis->eState); + DBGPRINTF("Invalid action engine state %u, program error\n", + getActionState(pWti, pThis)); iRet = RS_RET_ERR; break; } @@ -560,44 +601,34 @@ static rsRetVal getReturnCode(action_t *pThis) /* set the action to a new state * rgerhards, 2007-08-02 */ -static inline void actionSetState(action_t *pThis, action_state_t newState) +static inline void +actionSetState(action_t * const pThis, wti_t * const pWti, uint8_t newState) { - pThis->eState = newState; - DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); + setActionState(pWti, pThis, newState); + DBGPRINTF("Action %d transitioned to state: %s\n", + pThis->iActionNbr, getActStateName(pThis, pWti)); } /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ -static void actionCommitted(action_t *pThis) +static void actionCommitted(action_t * const pThis, wti_t * const pWti) { - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } /* set action to "rtry" state. * rgerhards, 2007-08-02 */ -static void actionRetry(action_t *pThis) +static void actionRetry(action_t * const pThis, wti_t * const pWti) { - actionSetState(pThis, ACT_STATE_RTRY); - pThis->iResumeOKinRow++; + actionSetState(pThis, pWti, ACT_STATE_RTRY); + incActionResumeInRow(pWti, pThis); } - -/* Disable action, this means it will never again be usable - * until rsyslog is reloaded. Use only as a last resort, but - * depends on output module. - * rgerhards, 2007-08-02 - */ -static void actionDisable(action_t *pThis) -{ - actionSetState(pThis, ACT_STATE_DIED); -} - - -/* Suspend action, this involves changing the acton state as well +/* Suspend action, this involves changing the action state as well * as setting the next retry time. * if we have more than 10 retries, we prolong the * retry interval. If something is really stalled, it will @@ -605,17 +636,50 @@ static void actionDisable(action_t *pThis) * CPU time. TODO: maybe a config option for that? * rgerhards, 2007-08-02 */ -static inline void actionSuspend(action_t *pThis) +static inline void +actionSuspend(action_t * const pThis, wti_t * const pWti) { time_t ttNow; + int suspendDuration; + char timebuf[32]; + + /* we need to defer setting the action's own bReportSuspension state until + * after the full config has been processed. So the most simple case to do + * that is here. It's not a performance problem, as it happens infrequently. + * it's not a threading race problem, as always the same value will be written. + */ + if(pThis->bReportSuspension == -1) + pThis->bReportSuspension = bActionReportSuspension; + if(pThis->bReportSuspensionCont == -1) { + pThis->bReportSuspensionCont = bActionReportSuspensionCont; + if(pThis->bReportSuspensionCont == -1) + pThis->bReportSuspension = 1; + } /* note: we can NOT use a cached timestamp, as time may have evolved * since caching, and this would break logic (and it actually did so!) */ datetime.GetTime(&ttNow); - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); - actionSetState(pThis, ACT_STATE_SUSP); - DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); + suspendDuration = pThis->iResumeInterval * (getActionNbrResRtry(pWti, pThis) / 10 + 1); + pThis->ttResumeRtry = ttNow + suspendDuration; + actionSetState(pThis, pWti, ACT_STATE_SUSP); + pThis->ctrSuspendDuration += suspendDuration; + if(getActionNbrResRtry(pWti, pThis) == 0) { + STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend); + } + + if( pThis->bReportSuspensionCont + || (pThis->bReportSuspension && getActionNbrResRtry(pWti, pThis) == 0) ) { + ctime_r(&pThis->ttResumeRtry, timebuf); + timebuf[strlen(timebuf)-1] = '\0'; /* strip LF */ + errmsg.LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, + "action '%s' suspended, next retry is %s", + pThis->pszName, timebuf); + } + DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d, " + "duration %d\n", + pThis->pszName, (long long) pThis->ttResumeRtry, (long long) ttNow, + getActionNbrResRtry(pWti, pThis), suspendDuration); } @@ -627,15 +691,15 @@ static inline void actionSuspend(action_t *pThis) * entry point. This is invalid, but has harsh consequences: it will cause the rsyslog * engine to go into a tight loop. That obviously is not acceptable. As such, we track the * count of iterations that a tryResume returning RS_RET_OK is immediately followed by - * an unsuccessful call to doAction(). If that happens more than 1,000 times, we assume + * an unsuccessful call to doAction(). If that happens more than 10 times, we assume * the return acutally is a RS_RET_SUSPENDED. In order to go through the various - * resumption stages, we do this for every 1000 requests. This magic number 1000 may + * resumption stages, we do this for every 10 requests. This magic number 10 may * not be the most appropriate, but it should be thought of a "if nothing else helps" * kind of facility: in the first place, the module should return a proper indication * of its inability to recover. -- rgerhards, 2010-04-26. */ -static inline rsRetVal -actionDoRetry(action_t *pThis, int *pbShutdownImmediate) +static rsRetVal +actionDoRetry(action_t * const pThis, wti_t * const pWti) { int iRetries; int iSleepPeriod; @@ -645,31 +709,40 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) ASSERT(pThis != NULL); iRetries = 0; - while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { - DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); - iRet = pThis->pMod->tryResume(pThis->pModData); - DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); - if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { + while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { + DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d\n", pThis->pszName, iRetries); + iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); + DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet); + if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) { bTreatOKasSusp = 1; - pThis->iResumeOKinRow = 0; + setActionResumeInRow(pWti, pThis, 0); } else { bTreatOKasSusp = 0; } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { - DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); - actionSetState(pThis, ACT_STATE_RDY); + DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n", + pThis->pszName, iRet); + if(pThis->bReportSuspension) { + errmsg.LogMsg(0, RS_RET_OK, LOG_INFO, "action '%s' " + "resumed (module '%s')", + pThis->pszName, pThis->pMod->pszName); + } + setActionJustResumed(pWti, pThis, 1); + actionSetState(pThis, pWti, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ - DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", - pThis->iResumeRetryCount, iRetries); + DBGPRINTF("actionDoRetry: %s check for max retries, iResumeRetryCount " + "%d, iRetries %d\n", + pThis->pszName, pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { - actionSuspend(pThis); + actionSuspend(pThis, pWti); + if(getActionNbrResRtry(pWti, pThis) < 20) + incActionNbrResRtry(pWti, pThis); } else { - ++pThis->iNbrResRtry; ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); - if(*pbShutdownImmediate) { + if(*pWti->pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } } @@ -678,8 +751,8 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) } } - if(pThis->eState == ACT_STATE_RDY) { - pThis->iNbrResRtry = 0; + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + setActionNbrResRtry(pWti, pThis, 0); } finalize_it: @@ -687,17 +760,32 @@ finalize_it: } +static rsRetVal +actionCheckAndCreateWrkrInstance(action_t * const pThis, wti_t * const pWti) +{ + DEFiRet; + if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { + DBGPRINTF("wti %p: we need to create a new action worker instance for " + "action %d\n", pWti, pThis->iActionNbr); + CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), + pThis->pModData)); + pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */ + } +finalize_it: + RETiRet; +} + /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) +static rsRetVal +actionTryResume(action_t * const pThis, wti_t * const pWti) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_SUSP) { + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used * to use the action timestamp, but in this case we will never reach a @@ -707,19 +795,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) */ datetime.GetTime(&ttNow); /* cache "now" */ if(ttNow >= pThis->ttResumeRtry) { - actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ + actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */ } } - if(pThis->eState == ACT_STATE_RTRY) { + if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pWti)); } - if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) { DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n", - pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); } finalize_it: @@ -731,24 +819,25 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate) +static inline rsRetVal +actionPrepare(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { DEFiRet; - assert(pThis != NULL); - CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); + CHKiRet(actionTryResume(pThis, pWti)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ - if(pThis->eState == ACT_STATE_RDY) { - iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionSetState(pThis, ACT_STATE_ITX); + actionSetState(pThis, pWti, ACT_STATE_ITX); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -762,10 +851,11 @@ finalize_it: } +#if 0 // TODO: remove? /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ -rsRetVal actionDbgPrint(action_t *pThis) +static rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; char *sz; @@ -775,11 +865,12 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("\n"); dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - if(pThis->eState == ACT_STATE_SUSP) { +#if 0 // do we need this ??? + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { dbgprintf("\tresume next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\tState: %s\n", getActStateName(pThis)); +#endif dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { sz = "slow, but feature-rich"; @@ -795,45 +886,55 @@ rsRetVal actionDbgPrint(action_t *pThis) RETiRet; } +#endif /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ static rsRetVal -prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow) +prepareDoActionParams(action_t * __restrict__ const pAction, + wti_t * __restrict__ const pWti, + msg_t *__restrict__ const pMsg, + struct syslogTime *ttNow) { int i; - msg_t *pMsg; struct json_object *json; + actWrkrIParams_t *iparams; + actWrkrInfo_t *__restrict__ pWrkrInfo; DEFiRet; - ASSERT(pAction != NULL); - ASSERT(pElem != NULL); - - pMsg = pElem->pMsg; - /* here we must loop to process all requested strings */ - for(i = 0 ; i < pAction->iNumTpls ; ++i) { - switch(pAction->eParamPassing) { + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + if(pAction->isTransactional) { + CHKiRet(wtiNewIParam(pWti, pAction, &iparams)); + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, + &actParam(iparams, pAction->iNumTpls, 0, i), + ttNow)); + } + } else { + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + switch(pAction->peParamPassing[i]) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), - &pElem->staticLenStrings[i], ttNow)); - pElem->staticActParams[i] = pElem->staticActStrings[i]; + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, + &(pWrkrInfo->p.nontx.actParams[i]), + ttNow)); break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow)); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, + (uchar***) &(pWrkrInfo->p.nontx.actParams[i].param), ttNow)); break; case ACT_MSG_PASSING: - pElem->staticActParams[i] = (void*) pMsg; + pWrkrInfo->p.nontx.actParams[i].param = (void*) pMsg; break; case ACT_JSON_PASSING: CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); - pElem->staticActParams[i] = (void*) json; + pWrkrInfo->p.nontx.actParams[i].param = (void*) json; break; - default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", - (int) pAction->eParamPassing); - assert(0); /* software bug if this happens! */ + default:dbgprintf("software bug/error: unknown pAction->peParamPassing[%d] %d in prepareDoActionParams\n", + i, (int) pAction->peParamPassing[i]); break; + } } } @@ -842,96 +943,87 @@ finalize_it: } -/* free a batches ressources, but not string buffers (because they will - * most probably be reused). String buffers are only deleted upon final - * destruction of the batch. - * This function here must be called only when the batch is actually no - * longer used, also not for retrying actions or such. It invalidates - * buffers. - * rgerhards, 2010-12-17 - */ -static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) +static void +releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ const pWti) { int jArr; - int i, j; - batch_obj_t *pElem; + int j; + actWrkrInfo_t *__restrict__ pWrkrInfo; uchar ***ppMsgs; - DEFiRet; - - ASSERT(pAction != NULL); - if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) - goto done; /* we need to do nothing with these types! */ - - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - ppMsgs = (uchar***) pElem->staticActParams; - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - if(((uchar**)ppMsgs)[j] != NULL) { - jArr = 0; - while(ppMsgs[j][jArr] != NULL) { - d_free(ppMsgs[j][jArr]); - ppMsgs[j][jArr] = NULL; - ++jArr; - } - d_free(((uchar**)ppMsgs)[j]); - ((uchar**)ppMsgs)[j] = NULL; - } + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + switch(pAction->peParamPassing[j]) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pWrkrInfo->p.nontx.actParams[0].param; + if(((uchar**)ppMsgs)[j] != NULL) { + jArr = 0; + while(ppMsgs[j][jArr] != NULL) { + free(ppMsgs[j][jArr]); + ppMsgs[j][jArr] = NULL; + ++jArr; } - break; - case ACT_JSON_PASSING: - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - json_object_put((struct json_object*) - pElem->staticActParams[j]); - pElem->staticActParams[j] = NULL; - } - break; - case ACT_STRING_PASSING: - case ACT_MSG_PASSING: - /* can never happen, just to keep compiler happy! */ - break; + free(((uchar**)ppMsgs)[j]); + ((uchar**)ppMsgs)[j] = NULL; } + break; + case ACT_JSON_PASSING: + json_object_put((struct json_object*) + pWrkrInfo->p.nontx.actParams[j].param); + pWrkrInfo->p.nontx.actParams[j].param = NULL; + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* no need to do anything with these */ + break; } } -done: RETiRet; + return; } - -/* call the DoAction output plugin entry point - * rgerhards, 2008-01-28 +/* This is used in resume processing. We only finally know that a resume + * worked when we have been able to actually process a messages. As such, + * we need to do some cleanup and status tracking in that case. */ -rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +static void +actionSetActionWorked(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { - DEFiRet; - - ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - - DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); + setActionResumeInRow(pWti, pThis, 0); + + if(getActionJustResumed(pWti, pThis)) { + /* OK, we *really* could resume, so tell user! */ + if(pThis->bReportSuspension) { + errmsg.LogMsg(0, RS_RET_RESUMED, LOG_INFO, "action '%s' " + "resumed (module '%s')", + pThis->pszName, pThis->pMod->pszName); + } + setActionJustResumed(pWti, pThis, 0); + } +} - pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); - switch(iRet) { +static rsRetVal +handleActionExecResult(action_t *__restrict__ const pThis, + wti_t *__restrict__ const pWti, + const rsRetVal ret) +{ + DEFiRet; + switch(ret) { case RS_RET_OK: - actionCommitted(pThis); - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionCommitted(pThis, pWti); + actionSetActionWorked(pThis, pWti); /* we had a successful call! */ break; case RS_RET_DEFER_COMMIT: - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionSetActionWorked(pThis, pWti); /* we had a successful call! */ /* we are done, action state remains the same */ break; case RS_RET_PREVIOUS_COMMITTED: /* action state remains the same, but we had a commit. */ pThis->bHadAutoCommit = 1; - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionSetActionWorked(pThis, pWti); /* we had a successful call! */ break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -939,70 +1031,134 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) */ + iRet = ret; FINALIZE; } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } +/* call the DoAction output plugin entry point + * rgerhards, 2008-01-28 + */ +static rsRetVal +actionCallDoAction(action_t *__restrict__ const pThis, + actWrkrIParams_t *__restrict__ const iparams, + wti_t *__restrict__ const pWti) +{ + uchar *param[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + int i; + DEFiRet; + + DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", + getActStateName(pThis, pWti), pThis->iActionNbr); + + pThis->bHadAutoCommit = 0; + /* for this interface, we need to emulate the old style way + * of parameter passing. + */ + for(i = 0 ; i < pThis->iNumTpls ; ++i) { + param[i] = actParam(iparams, pThis->iNumTpls, 0, i).param; + } + + iRet = pThis->pMod->mod.om.doAction(param, + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); + iRet = handleActionExecResult(pThis, pWti, iRet); + RETiRet; +} + + +/* call the commitTransaction output plugin entry point */ +static rsRetVal +actionCallCommitTransaction(action_t * const pThis, + const actWrkrInfo_t *const wrkrInfo, + wti_t *const pWti) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + DBGPRINTF("entering actionCallCommitTransaction(), state: %s, actionNbr %d, " + "nMsgs %u\n", + getActStateName(pThis, pWti), pThis->iActionNbr, + wrkrInfo->p.tx.currIParam); + + iRet = pThis->pMod->mod.om.commitTransaction( + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData, + wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam); + iRet = handleActionExecResult(pThis, pWti, iRet); + RETiRet; +} + /* process a message * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static inline rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate) +rsRetVal +actionProcessMessage(action_t * const pThis, void *actParams, wti_t * const pWti) { DEFiRet; - ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - - CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) - pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); - if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, actParams, pWti)); - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } -/* finish processing a batch. Most importantly, that means we commit if we - * need to do so. - * rgerhards, 2008-01-28 - */ +/* the following functions simulates a potential future new omo callback */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch) +doTransaction(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { + actWrkrInfo_t *wrkrInfo; int i; DEFiRet; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_RDY) { - /* we just need to flag the batch as commited */ - FINALIZE; /* nothing to do */ + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + if(pThis->pMod->mod.om.commitTransaction != NULL) { + DBGPRINTF("doTransaction: have commitTransaction IF, using that, pWrkrInfo %p\n", wrkrInfo); + CHKiRet(actionCallCommitTransaction(pThis, wrkrInfo, pWti)); + } else { /* note: this branch is for compatibility with old TX modules */ + DBGPRINTF("doTransaction: action %d, currIParam %d\n", + pThis->iActionNbr, wrkrInfo->p.tx.currIParam); + for(i = 0 ; i < wrkrInfo->p.tx.currIParam ; ++i) { + /* Note: we provide the message's base iparam - actionProcessMessage() + * uses this as *base* address. + */ + iRet = actionProcessMessage(pThis, + &actParam(wrkrInfo->p.tx.iparams, pThis->iNumTpls, i, 0), pWti); + } } +finalize_it: + RETiRet; +} + + +/* Commit try committing (do not handle retry processing and such) */ +static rsRetVal +actionTryCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) +{ + DEFiRet; + + doTransaction(pThis, pWti); - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); - if(pThis->eState == ACT_STATE_ITX) { - iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + CHKiRet(actionPrepare(pThis, pWti)); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) { + iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - /* flag messages as committed */ - for(i = 0 ; i < pBatch->nElem ; ++i) { - batchSetElemState(pBatch, i, BATCH_STATE_COMM); - pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ - } + actionCommitted(pThis, pWti); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -1010,12 +1166,12 @@ finishBatch(action_t *pThis, batch_t *pBatch) case RS_RET_DEFER_COMMIT: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; case RS_RET_PREVIOUS_COMMITTED: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) @@ -1023,325 +1179,173 @@ finishBatch(action_t *pThis, batch_t *pBatch) FINALIZE; } } - iRet = getReturnCode(pThis); - -finalize_it: - RETiRet; -} - - -/* try to submit a partial batch of elements. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) -{ - int i; - int iElemProcessed; - int iCommittedUpTo; - msg_t *pMsg; - rsRetVal localRet; - DEFiRet; - - assert(pBatch != NULL); - assert(pnElem != NULL); - - i = pBatch->iDoneUpTo; /* all messages below that index are processed */ - iElemProcessed = 0; - iCommittedUpTo = i; - DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); - while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - /* NOTE: do NOT extend the filter below! Anything else must be done on the - * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 - */ - if(batchIsValidElem(pBatch, i)) { - pMsg = pBatch->pElem[i].pMsg; - localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate); - DBGPRINTF("action %p call returned %d\n", pAction, localRet); - /* Note: we directly modify the batch object state, because we know that - * wo do not overwrite BATCH_STATE_DISC indicators! - */ - if(localRet == RS_RET_OK) { - /* mark messages as committed */ - while(iCommittedUpTo <= i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { - /* mark messages as committed */ - while(iCommittedUpTo < i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DEFER_COMMIT) { - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DISCARDMSG) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", - localRet, *pnElem, iCommittedUpTo); - iRet = localRet; - FINALIZE; - } - } - ++i; - ++iElemProcessed; - } + iRet = getReturnCode(pThis, pWti); finalize_it: - if(pBatch->iDoneUpTo != iCommittedUpTo) { - pBatch->iDoneUpTo = iCommittedUpTo; - } + pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam = 0; /* reset to beginning */ RETiRet; } -/* submit a batch for actual action processing. - * The first nElem elements are processed. This function calls itself - * recursively if it needs to handle errors. - * Note: we don't need the number of the first message to be processed as a parameter, - * because this is kept track of inside the batch itself (iDoneUpTo). - * rgerhards, 2009-05-12 +/* Note: we currently need to return an iRet, as this is used in + * direct mode. TODO: However, it may be worth further investigating this, + * as it looks like there is no ultimate consumer of this code. + * rgerhards, 2013-11-06 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +actionCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { - int i; - int bDone; - rsRetVal localRet; - int wasDoneTo; + sbool bDone; DEFiRet; - assert(pBatch != NULL); + if(!pThis->isTransactional || + pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam == 0 || + getActionState(pWti, pThis) == ACT_STATE_SUSP + ) { + FINALIZE; + } - DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); - wasDoneTo = pBatch->iDoneUpTo; + /* even more TODO: + This is the place where retry processing needs to go in. If the action + permanently fails, we should - as a new feature - add the capability to + write an error file. This is already done be omelasticsearch, and IMHO + pretty useful. + For the time being, I do NOT implement all of this (not even retry!) + as I want to get the rest of the engine to SISD (non-SIMD ;)) so that + I know any potential suprises and complications that arise out of this. + When this is done, I can come back here and complete this work. Obviously, + many features do not work in the mean time (but it is not planned to release + any of these partial implementations). + rgerhards, 2013-11-04 + */ bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem); - if(localRet == RS_RET_FORCE_TERM) { + iRet = actionTryCommit(pThis, pWti); + DBGPRINTF("actionCommit, in retry loop, iRet %d\n", iRet); + if(iRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - /* try commit transaction, once done, we can simply do so as if - * that return state was returned from tryDoAction(). - */ - localRet = finishBatch(pAction, pBatch); - } - - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { + } else if(iRet == RS_RET_OK || + iRet == RS_RET_SUSPENDED || + iRet == RS_RET_ACTION_FAILED) { bDone = 1; - } else if(localRet == RS_RET_SUSPENDED) { - DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); - /* do nothing, this will retry the full batch */ - } else if(localRet == RS_RET_ACTION_FAILED) { - /* in this case, everything not yet committed is BAD */ - for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && pBatch->eltState[i] != BATCH_STATE_COMM ) { - pBatch->eltState[i] = BATCH_STATE_BAD; - pBatch->pElem[i].bPrevWasSuspended = 1; - STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); - } - } + } + if(getActionState(pWti, pThis) == ACT_STATE_RDY || + getActionState(pWti, pThis) == ACT_STATE_SUSP) { bDone = 1; - } else { - if(nElem == 1) { - batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); - bDone = 1; - } else { - /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " - "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); - bDone = 1; - } } - } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - + } while(!bDone); finalize_it: RETiRet; } - -/* copy "active" array of batch, as we need to modify it. The caller - * must make sure the new array is freed and the orginal batch - * pointer is restored (thus the caller must save it). If active - * is currently NULL, this is properly handled. - * Note: the batches active pointer is modified, so it must be - * saved BEFORE calling this function! - * rgerhards, 2012-09-12 - */ -static rsRetVal -copyActive(batch_t *pBatch) +/* Commit all active transactions in *DIRECT mode* */ +void +actionCommitAllDirect(wti_t *__restrict__ const pWti) { - sbool *active; - DEFiRet; + int i; + action_t *pAction; - CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - if(pBatch->active == NULL) - memset(active, 1, batchNumMsgs(pBatch)); - else - memcpy(active, pBatch->active, batchNumMsgs(pBatch)); - pBatch->active = active; -finalize_it: - RETiRet; + for(i = 0 ; i < iActionNbr ; ++i) { + pAction = pWti->actWrkrInfo[i].pAction; + if(pAction == NULL) + continue; + DBGPRINTF("actionCommitAll: action %d, state %u, nbr to commit %d " + "isTransactional %d\n", + i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo->p.tx.currIParam, + pAction->isTransactional); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + actionCommit(pAction, pWti); + } } -/* The following function prepares a batch for processing, that it is - * reinitializes batch states, generates strings and does everything else - * that needs to be done in order to make the batch ready for submission to - * the actual output module. Note that we look at the precomputed - * filter OK condition and process only those messages, that actually matched - * the filter. - * rgerhards, 2010-06-14 +/* process a single message. This is both called if we run from the + * cosumer side of an action queue as well as directly from the main + * queue thread if the action queue is set to "direct". */ -static inline rsRetVal -prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) +static rsRetVal +processMsgMain(action_t *__restrict__ const pAction, + wti_t *__restrict__ const pWti, + msg_t *__restrict__ const pMsg, + struct syslogTime *ttNow) { - int i; - batch_obj_t *pElem; - struct syslogTime ttNow; DEFiRet; - /* indicate we have not yet read the date */ - ttNow.year = 0; - - pBatch->iDoneUpTo = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - pBatch->eltState[i] = BATCH_STATE_RDY; - if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { - /* make sure we have our copy of "active" array */ - if(!*bMustRestoreActivePtr) { - *activeSave = pBatch->active; - copyActive(pBatch); - } - pBatch->active[i] = RSFALSE; - } - } + if(pAction->bExecWhenPrevSusp && !pWti->execState.bPrevWasSuspended) { + DBGPRINTF("action %d: NOT executing, as previous action was " + "not suspended\n", pAction->iActionNbr); + FINALIZE; } - RETiRet; -} - -/* receive a batch and process it. This includes retry handling. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch) -{ - DEFiRet; + iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); - assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); - iRet = finishBatch(pAction, pBatch); + if(pAction->isTransactional) { + pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction; + DBGPRINTF("action %d is transactional - executing in commit phase\n", pAction->iActionNbr); + actionPrepare(pAction, pWti); + iRet = getReturnCode(pAction, pWti); + FINALIZE; + } + iRet = actionProcessMessage(pAction, + pWti->actWrkrInfo[pAction->iActionNbr].p.nontx.actParams, + pWti); + if(pAction->bNeedReleaseBatch) + releaseDoActionParams(pAction, pWti); finalize_it: + if(iRet == RS_RET_OK) { + if(pWti->execState.bDoAutoCommit) + iRet = actionCommit(pAction, pWti); + } + pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED); RETiRet; } - -#pragma GCC diagnostic ignored "-Wempty-body" -/* receive an array of to-process user pointers and submit them - * for processing. - * rgerhards, 2009-04-22 +/* This entry point is called by the ACTION queue (not main queue!) */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +processBatchMain(void *__restrict__ const pVoid, + batch_t *__restrict__ const pBatch, + wti_t *__restrict__ const pWti) { - int *pbShutdownImmdtSave; - sbool *activeSave; - int bMustRestoreActivePtr = 0; - rsRetVal localRet; + action_t *__restrict__ const pAction = (action_t*__restrict__ const) pVoid; + int i; + struct syslogTime ttNow; DEFiRet; - assert(pBatch != NULL); - - if(pbShutdownImmediate != NULL) { - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; - } - CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - - iRet = processAction(pAction, pBatch); - - pthread_cleanup_pop(1); /* unlock mutex */ - - /* even if processAction failed, we need to release the batch (else we - * have a memory leak). So we do this first, and then check if we need to - * return an error code. If so, the code from processAction has priority. - * rgerhards, 2010-12-17 - */ - localRet = releaseBatch(pAction, pBatch); + wtiResetExecState(pWti, pBatch); + /* indicate we have not yet read the date */ + ttNow.year = 0; - if(iRet == RS_RET_OK) - iRet = localRet; - - if(bMustRestoreActivePtr) { - free(pBatch->active); - pBatch->active = activeSave; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) { + if(batchIsValidElem(pBatch, i)) { + iRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow); + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + } } -finalize_it: - if(pbShutdownImmediate != NULL) - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + iRet = actionCommit(pAction, pWti); RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" -/* call the HUP handler for a given action, if such a handler is defined. The - * action mutex is locked, because the HUP handler most probably needs to modify - * some internal state information. - * rgerhards, 2008-10-22 +/* call the HUP handler for a given action, if such a handler is defined. + * Note that the action must be able to service HUP requests concurrently + * to any current doAction() processing. */ -#pragma GCC diagnostic ignored "-Wempty-body" rsRetVal -actionCallHUPHdlr(action_t *pAction) +actionCallHUPHdlr(action_t * const pAction) { DEFiRet; ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); - if(pAction->pMod->doHUP == NULL) { - FINALIZE; /* no HUP handler, so we are done ;) */ + if(pAction->pMod->doHUP != NULL) { + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); } - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - CHKiRet(pAction->pMod->doHUP(pAction->pModData)); - pthread_cleanup_pop(1); /* unlock mutex */ - finalize_it: RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* set the action message queue mode @@ -1376,27 +1380,28 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. This is also utilized to submit messages in complex case once + * and thus speed. This is also utilized to submit messages in more complex cases once * the complex logic has been applied ;) * rgerhards, 2010-06-08 */ -static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +static rsRetVal +doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, msg_t *pMsg) { + struct syslogTime ttNow; // TODO: think if we can buffer this in pWti DEFiRet; - if(pAction->eState == ACT_STATE_DIED) { - DBGPRINTF("action %p died, do NOT execute\n", pAction); - FINALIZE; - } + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); - else + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { + ttNow.year = 0; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); + } else {/* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); + } -finalize_it: RETiRet; } @@ -1409,7 +1414,7 @@ finalize_it: * be filtered out before calling us (what is done currently!). */ rsRetVal -actionWriteToAction(action_t *pAction, msg_t *pMsg) +actionWriteToAction(action_t * const pAction, msg_t *pMsg, wti_t * const pWti) { DEFiRet; @@ -1464,44 +1469,46 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); finalize_it: RETiRet; } -/* helper to actonCallAction, mostly needed because of this damn - * pthread_cleanup_push() POSIX macro... +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 */ -static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) +#pragma GCC diagnostic ignored "-Wempty-body" +static rsRetVal +doSubmitToActionQComplex(action_t * const pAction, wti_t * const pWti, msg_t *pMsg) { - msg_t *pMsg; DEFiRet; - pMsg = pBatch->pElem[idxBtch].pMsg; + d_pthread_mutex_lock(&pAction->mutAction); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ + // TODO: can we optimize the "now" handling again (was batch, I guess...)? /* don't output marks to recently written outputs */ - if(pAction->bWriteAllMarkMsgs == RSFALSE + if(pAction->bWriteAllMarkMsgs == 0 && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { ABORT_FINALIZE(RS_RET_OK); } /* call the output driver */ - iRet = actionWriteToAction(pAction, pMsg); + iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: - /* we need to update the batch to handle failover processing correctly */ - if(iRet == RS_RET_OK) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 0; - } else if(iRet == RS_RET_ACTION_FAILED) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 1; - } + d_pthread_mutex_unlock(&pAction->mutAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } +#pragma GCC diagnostic warning "-Wempty-body" /* helper to activateActions, it activates a specific action. @@ -1509,7 +1516,7 @@ finalize_it: DEFFUNC_llExecFunc(doActivateActions) { rsRetVal localRet; - action_t *pThis = (action_t*) pData; + action_t * const pThis = (action_t*) pData; BEGINfunc localRet = qqueueStart(pThis->pQueue); if(localRet != RS_RET_OK) { @@ -1550,201 +1557,48 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQNotAllMark(action_t * const pAction, wti_t * const pWti, msg_t * const pMsg) { - time_t now = 0; + int doProcess = 1; time_t lastAct; - int i; - sbool *activeSave; - DEFiRet; - - activeSave = pBatch->active; - copyActive(pBatch); - - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) - continue; - if(now == 0) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - } - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if(pBatch->pElem[i].pMsg->msgFlags & MARK) { - if((now - lastAct) < MarkInterval / 2) { - pBatch->active[i] = 0; - DBGPRINTF("batch item %d: action was recently called, ignoring " - "mark message\n", i); - break; /* do not update timestamp for non-written mark messages */ - } - } - } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->active[i]) { - DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", - i, module.GetStateName(pAction->pMod)); - } - } - - iRet = doSubmitToActionQBatch(pAction, pBatch); - - free(pBatch->active); - pBatch->active = activeSave; - - RETiRet; -} - -static inline void -countStatsBatchEnq(action_t *pAction, batch_t *pBatch) -{ - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - } - } -} - - -/* enqueue a batch in direct mode. We have put this into its own function just to avoid - * cluttering the actual submit function. - * rgerhards, 2011-06-16 - */ -static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) -{ - sbool bNeedSubmit; - sbool *activeSave; - int i; DEFiRet; - activeSave = pBatch->active; - copyActive(pBatch); - - /* note: for direct mode, we need to adjust the filter property. For non-direct - * this is not necessary, because in that case we enqueue only what actually needs - * to be processed. + /* TODO: think about the whole logic. If messages come in out of order, things + * tend to become a bit unreliable. On the other hand, this only happens if we have + * very high traffic, in which this use case here is not really affected (as the + * MarkInterval is pretty corase). */ - if(pAction->bExecWhenPrevSusp) { - bNeedSubmit = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if(!pBatch->pElem[i].bPrevWasSuspended) { - DBGPRINTF("action enq stage: change active to 0 due to " - "failover case in elem %d\n", i); - pBatch->active[i] = 0; - } - if(batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - bNeedSubmit = 1; - } - DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - } - if(bNeedSubmit) { - /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); - } else { - DBGPRINTF("no need to submit batch, all invalid\n"); - } - } else { - if(GatherStats) - countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); - } - - free(pBatch->active); - pBatch->active = activeSave; - RETiRet; -} - -/* This submits the message to the action queue in case we do NOT need to handle repeat - * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. - * rgerhards, 2010-06-08 - */ -static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); - - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch); - } else {/* in this case, we do single submits to the queue. - * TODO: optimize this, we may do at least a multi-submit! - */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pMsg->msgFlags & MARK) { + if((pMsg->ttGenTime - lastAct) < MarkInterval / 2) { + doProcess = 0; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ } } - } + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pMsg->ttGenTime, &pAction->mutCAS) == 0); - RETiRet; -} - - - -/* Helper to submit a batch of actions to the engine. Note that we have rather - * complicated processing here, so we need to do this one message after another. - * rgerhards, 2010-06-23 - */ -static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action %p (complex case), logging to %s\n", - pAction, module.GetStateName(pAction->pMod)); - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i); - } + if(doProcess) { + DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", + module.GetStateName(pAction->pMod)); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); } RETiRet; } -/* Call configured action, most complex case with all features supported (and thus slow). - * rgerhards, 2010-06-08 - */ -#pragma GCC diagnostic ignored "-Wempty-body" -static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) -{ - DEFiRet; - - d_pthread_mutex_lock(&pAction->mutAction); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); - d_pthread_mutex_unlock(&pAction->mutAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - - RETiRet; -} -#pragma GCC diagnostic warning "-Wempty-body" - /* apply all params from param block to action. This supports the v6 config system. * Defaults must have been set appropriately during action construct! * rgerhards, 2011-08-01 */ static rsRetVal -actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) +actionApplyCnfParam(action_t * const pAction, struct cnfparamvals * const pvals) { int i; @@ -1769,6 +1623,10 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) pAction->bRepMsgHasMsg = pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "action.resumeretrycount")) { pAction->iResumeRetryCount = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.reportsuspension")) { + pAction->bReportSuspension = (int) pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.reportsuspensioncontinuation")) { + pAction->bReportSuspensionCont = (int) pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "action.resumeinterval")) { pAction->iResumeInterval = pvals[i].val.d.n; } else { @@ -1788,7 +1646,7 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, - struct cnfparamvals *queueParams, int bSuspended) + struct nvlst * const lst) { DEFiRet; int i; @@ -1817,7 +1675,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg; cs.iActExecEveryNthOccur = 0; /* auto-reset */ cs.iActExecEveryNthOccurTO = 0; /* auto-reset */ - cs.bActionWriteAllMarkMsgs = RSFALSE; /* auto-reset */ + cs.bActionWriteAllMarkMsgs = 1; /* auto-reset */ cs.pszActionName = NULL; /* free again! */ } else { actionApplyCnfParam(pAction, actParams); @@ -1831,43 +1689,50 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, * the discard action, which does not require a string. -- rgerhards, 2007-07-30 */ if(pAction->iNumTpls > 0) { - /* we first need to create the template pointer array */ + /* we first need to create the template arrays */ CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *))); + CHKmalloc(pAction->peParamPassing = (paramPassing_t*)calloc(pAction->iNumTpls, sizeof(paramPassing_t))); } + pAction->bUsesMsgPassingMode = 0; + pAction->bNeedReleaseBatch = 0; for(i = 0 ; i < pAction->iNumTpls ; ++i) { CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts)); /* Ok, we got everything, so it now is time to look up the template * (Hint: templates MUST be defined before they are used!) */ - if( !(iTplOpts & OMSR_TPL_AS_MSG) - && (pAction->ppTpl[i] = - tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) { - snprintf(errMsg, sizeof(errMsg) / sizeof(char), - " Could not find template '%s' - action disabled", - pTplName); - errno = 0; - errmsg.LogError(0, RS_RET_NOT_FOUND, "%s", errMsg); - ABORT_FINALIZE(RS_RET_NOT_FOUND); - } - /* check required template options */ - if( (iTplOpts & OMSR_RQD_TPL_OPT_SQL) - && (pAction->ppTpl[i]->optFormatEscape == 0)) { - errno = 0; - errmsg.LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled. To use this action, you have to specify " - "the SQL or stdSQL option in your template!\n"); - ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING); + if(!(iTplOpts & OMSR_TPL_AS_MSG)) { + if((pAction->ppTpl[i] = + tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) { + snprintf(errMsg, sizeof(errMsg) / sizeof(char), + " Could not find template '%s' - action disabled", + pTplName); + errno = 0; + errmsg.LogError(0, RS_RET_NOT_FOUND, "%s", errMsg); + ABORT_FINALIZE(RS_RET_NOT_FOUND); + } + /* check required template options */ + if( (iTplOpts & OMSR_RQD_TPL_OPT_SQL) + && (pAction->ppTpl[i]->optFormatEscape == 0)) { + errno = 0; + errmsg.LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled. To use this action, you have to specify " + "the SQL or stdSQL option in your template!\n"); + ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING); + } } /* set parameter-passing mode */ if(iTplOpts & OMSR_TPL_AS_ARRAY) { - pAction->eParamPassing = ACT_ARRAY_PASSING; + pAction->peParamPassing[i] = ACT_ARRAY_PASSING; + pAction->bNeedReleaseBatch = 1; } else if(iTplOpts & OMSR_TPL_AS_MSG) { - pAction->eParamPassing = ACT_MSG_PASSING; + pAction->peParamPassing[i] = ACT_MSG_PASSING; + pAction->bUsesMsgPassingMode = 1; } else if(iTplOpts & OMSR_TPL_AS_JSON) { - pAction->eParamPassing = ACT_JSON_PASSING; + pAction->peParamPassing[i] = ACT_JSON_PASSING; + pAction->bNeedReleaseBatch = 1; } else { - pAction->eParamPassing = ACT_STRING_PASSING; + pAction->peParamPassing[i] = ACT_STRING_PASSING; } DBGPRINTF("template: '%s' assigned\n", pTplName); @@ -1875,15 +1740,10 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; - /* check if the module is compatible with select features (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ - - if(bSuspended) - actionSuspend(pAction); - CHKiRet(actionConstructFinalize(pAction, queueParams)); + CHKiRet(actionConstructFinalize(pAction, lst)); - /* TODO: if we exit here, we have a memory leak... */ + /* TODO: if we exit here, we have a (quite acceptable...) memory leak */ *ppAction = pAction; /* finally store the action pointer */ @@ -1919,7 +1779,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus static inline void initConfigVariables(void) { - cs.bActionWriteAllMarkMsgs = RSFALSE; + cs.bActionWriteAllMarkMsgs = 1; cs.glbliActionResumeRetryCount = 0; cs.bActExecWhenPrevSusp = 0; cs.iActExecOnceInterval = 0; @@ -1940,47 +1800,34 @@ rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction) { struct cnfparamvals *paramvals; - struct cnfparamvals *queueParams; modInfo_t *pMod; uchar *cnfModName = NULL; omodStringRequest_t *pOMSR; void *pModData; action_t *pAction; - int typeIdx; DEFiRet; paramvals = nvlstGetParams(lst, &pblk, NULL); if(paramvals == NULL) { - ABORT_FINALIZE(RS_RET_ERR); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); } dbgprintf("action param blk after actionNewInst:\n"); cnfparamsPrint(&pblk, paramvals); - typeIdx = cnfparamGetIdx(&pblk, "type"); - if(paramvals[typeIdx].bUsed == 0) { - errmsg.LogError(0, RS_RET_CONF_RQRD_PARAM_MISSING, "action type missing"); - ABORT_FINALIZE(RS_RET_CONF_RQRD_PARAM_MISSING); // TODO: move this into rainerscript handlers - } cnfModName = (uchar*)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL); if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) { errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); ABORT_FINALIZE(RS_RET_MOD_UNKNOWN); } - iRet = pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR); - // TODO: check if RS_RET_SUSPENDED is still valid in v6! - if(iRet != RS_RET_OK && iRet != RS_RET_SUSPENDED) { - FINALIZE; /* iRet is already set to error state */ - } - - qqueueDoCnfParams(lst, &queueParams); + CHKiRet(pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR)); - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams, - (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ loadConf->actions.nbrActions++; /* one more active action! */ + *ppAction = pAction; + } else { + // TODO: cleanup } - *ppAction = pAction; finalize_it: free(cnfModName); @@ -1988,8 +1835,6 @@ finalize_it: RETiRet; } -/* TODO: we are not yet a real object, the ClassInit here just looks like it is.. - */ rsRetVal actionClassInit(void) { DEFiRet; |