diff options
author | Michael Biebl <biebl@debian.org> | 2010-09-30 14:07:18 +0200 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2010-09-30 14:07:18 +0200 |
commit | 017fb92bd811ce1083504eafda4e2080d9520a31 (patch) | |
tree | 777a2a3627f64f6a0e2bea061c0e392af7437300 /action.c | |
parent | dea652279a335b6d83050e5f65c45dd762901022 (diff) | |
download | rsyslog-017fb92bd811ce1083504eafda4e2080d9520a31.tar.gz |
Imported Upstream version 5.7.0upstream/5.7.0
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 1117 |
1 files changed, 876 insertions, 241 deletions
@@ -42,11 +42,19 @@ #include "cfsysline.h" #include "srUtils.h" #include "errmsg.h" +#include "batch.h" +#include "wti.h" #include "datetime.h" #include "unicode-helper.h" +#include "atomic.h" + +#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg); +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -55,16 +63,19 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(errmsg) +static int iActExecOnceInterval = 0; /* execute action once every nn seconds */ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */ static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */ static int glbliActionResumeInterval = 30; int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */ static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */ +static int bActionWriteAllMarkMsgs = FALSE; /* should all mark messages be unconditionally written? */ static uchar *pszActionName; /* short name for the action */ -/* main message queue and its configuration parameters */ +/* action queue and its configuration parameters */ static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ static int iActionQueueSize = 1000; /* size of the main message queue above */ +static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */ static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ static int iActionQDiscardMark = 9800; /* begin to discard messages */ @@ -122,7 +133,7 @@ getActNow(action_t *pThis) { assert(pThis != NULL); if(pThis->tActNow == -1) { - pThis->tActNow = time(NULL); /* good time call - the only one done */ + pThis->tActNow = datetime.GetTime(NULL); /* good time call - the only one done */ if(pThis->tLastExec > pThis->tActNow) { /* if we are traveling back in time, reset tLastExec */ pThis->tLastExec = (time_t) 0; @@ -146,6 +157,7 @@ actionResetQueueParams(void) ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ iActionQueueSize = 1000; /* size of the main message queue above */ + iActionQueueDeqBatchSize = 16; /* default batch size */ iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ iActionQDiscardMark = 9800; /* begin to discard messages */ @@ -179,7 +191,6 @@ actionResetQueueParams(void) */ rsRetVal actionDestruct(action_t *pThis) { - int i; DEFiRet; ASSERT(pThis != NULL); @@ -198,32 +209,6 @@ rsRetVal actionDestruct(action_t *pThis) d_free(pThis->pszName); d_free(pThis->ppTpl); - /* message ptr cleanup */ - for(i = 0 ; i < pThis->iNumTpls ; ++i) { - if(pThis->ppMsgs[i] != NULL) { - switch(pThis->eParamPassing) { - case ACT_ARRAY_PASSING: -#if 0 /* later! */ - iArr = 0; - while(((char **)pThis->ppMsgs[i])[iArr] != NULL) { - d_free(((char **)pThis->ppMsgs[i])[iArr++]); - ((char **)pThis->ppMsgs[i])[iArr++] = NULL; - } - d_free(pThis->ppMsgs[i]); - pThis->ppMsgs[i] = NULL; -#endif - break; - case ACT_STRING_PASSING: - d_free(pThis->ppMsgs[i]); - break; - default: - assert(0); - } - } - } - d_free(pThis->ppMsgs); - d_free(pThis->lenMsgs); - d_free(pThis); RETiRet; @@ -243,8 +228,9 @@ rsRetVal actionConstruct(action_t **ppThis) CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t))); pThis->iResumeInterval = glbliActionResumeInterval; pThis->iResumeRetryCount = glbliActionResumeRetryCount; - pThis->tLastOccur = time(NULL); /* done once per action on startup only */ + pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pthread_mutex_init(&pThis->mutActExec, NULL); + INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); SYNC_OBJ_TOOL_INIT(pThis); /* indicate we have a new action */ @@ -269,6 +255,32 @@ actionConstructFinalize(action_t *pThis) /* find a name for our queue */ snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); + /* now check if we can run the action in "firehose mode" during stage one of + * its processing (that is before messages are enqueued into the action q). + * This is only possible if some features, which require strict sequence, are + * not used. Thankfully, that is usually the case. The benefit of firehose + * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 + */ + if( pThis->iExecEveryNthOccur > 1 + || pThis->f_ReduceRepeated + || pThis->iSecsExecOnceInterval + ) { + DBGPRINTF("info: firehose mode disabled for action because " + "iExecEveryNthOccur=%d, " + "ReduceRepeated=%d, " + "iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, + pThis->iSecsExecOnceInterval + ); + pThis->submitToActQ = doSubmitToActionQComplexBatch; + } else if(pThis->bWriteAllMarkMsgs == FALSE) { + /* nearly full-speed submission mode, default case */ + pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch; + } else { + /* full firehose submission mode */ + pThis->submitToActQ = doSubmitToActionQBatch; + } + /* we need to make a safety check: if the queue is NOT in direct mode, a single * message object may be accessed by multiple threads. As such, we need to enable * msg object thread safety in this case (this costs a bit performance and thus @@ -283,7 +295,8 @@ actionConstructFinalize(action_t *pThis) * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); + CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, + (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -298,6 +311,7 @@ actionConstructFinalize(action_t *pThis) qqueueSetpUsr(pThis->pQueue, pThis); setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize); setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); @@ -334,87 +348,271 @@ finalize_it: } -/* set an action back to active state -- rgerhards, 2007-08-02 + +/* set the global resume interval */ -static rsRetVal actionResume(action_t *pThis) +rsRetVal actionSetGlobalResumeInterval(int iNewVal) +{ + glbliActionResumeInterval = iNewVal; + return RS_RET_OK; +} + + +/* returns the action state name in human-readable form + * returned string must not be modified. + * rgerhards, 2009-05-07 + */ +static uchar *getActStateName(action_t *pThis) +{ + switch(pThis->eState) { + case ACT_STATE_RDY: + return (uchar*) "rdy"; + case ACT_STATE_ITX: + return (uchar*) "itx"; + case ACT_STATE_RTRY: + return (uchar*) "rtry"; + case ACT_STATE_SUSP: + return (uchar*) "susp"; + case ACT_STATE_DIED: + return (uchar*) "died"; + case ACT_STATE_COMM: + return (uchar*) "comm"; + default: + return (uchar*) "ERROR/UNKNWON"; + } +} + + +/* returns a suitable return code based on action state + * rgerhards, 2009-05-07 + */ +static rsRetVal getReturnCode(action_t *pThis) { DEFiRet; ASSERT(pThis != NULL); - pThis->bSuspended = 0; + switch(pThis->eState) { + case ACT_STATE_RDY: + iRet = RS_RET_OK; + break; + case ACT_STATE_ITX: + if(pThis->bHadAutoCommit) { + pThis->bHadAutoCommit = 0; /* auto-reset */ + iRet = RS_RET_PREVIOUS_COMMITTED; + } else { + iRet = RS_RET_DEFER_COMMIT; + } + break; + case ACT_STATE_RTRY: + iRet = RS_RET_SUSPENDED; + break; + case ACT_STATE_SUSP: + case ACT_STATE_DIED: + iRet = RS_RET_ACTION_FAILED; + break; + default: + DBGPRINTF("Invalid action engine state %d, program error\n", + (int) pThis->eState); + iRet = RS_RET_ERR; + break; + } RETiRet; } -/* set the global resume interval +/* set the action to a new state + * rgerhards, 2007-08-02 */ -rsRetVal actionSetGlobalResumeInterval(int iNewVal) +static inline void actionSetState(action_t *pThis, action_state_t newState) { - glbliActionResumeInterval = iNewVal; - return RS_RET_OK; + pThis->eState = newState; + DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); +} + +/* Handles the transient commit state. So far, this is + * mostly a dummy... + * rgerhards, 2007-08-02 + */ +static void actionCommitted(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RDY); +} + + +/* set action to "rtry" state. + * rgerhards, 2007-08-02 + */ +static void actionRetry(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RTRY); + pThis->iResumeOKinRow++; +} + + +/* Disable action, this means it will never again be usable + * until rsyslog is reloaded. Use only as a last resort, but + * depends on output module. + * rgerhards, 2007-08-02 + */ +static void actionDisable(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_DIED); +} + + +/* Suspend action, this involves changing the acton state as well + * as setting the next retry time. + * if we have more than 10 retries, we prolong the + * retry interval. If something is really stalled, it will + * get re-tried only very, very seldom - but that saves + * CPU time. TODO: maybe a config option for that? + * rgerhards, 2007-08-02 + */ +static inline void actionSuspend(action_t *pThis, time_t ttNow) +{ + if(ttNow == NO_TIME_PROVIDED) + datetime.GetTime(&ttNow); + pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + actionSetState(pThis, ACT_STATE_SUSP); + DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); } -/* suspend an action -- rgerhards, 2007-08-02 +/* actually do retry processing. Note that the function receives a timestamp so + * that we do not need to call the (expensive) time() API. + * Note that we do the full retry processing here, doing the configured number of + * iterations. -- rgerhards, 2009-05-07 + * We need to guard against module which always return RS_RET_OK from their tryResume() + * entry point. This is invalid, but has harsh consequences: it will cause the rsyslog + * engine to go into a tight loop. That obviously is not acceptable. As such, we track the + * count of iterations that a tryResume returning RS_RET_OK is immediately followed by + * an unsuccessful call to doAction(). If that happens more than 1,000 times, we assume + * the return acutally is a RS_RET_SUSPENDED. In order to go through the various + * resumption stages, we do this for every 1000 requests. This magic number 1000 may + * not be the most appropriate, but it should be thought of a "if nothing else helps" + * kind of facility: in the first place, the module should return a proper indication + * of its inability to recover. -- rgerhards, 2010-04-26. */ -static rsRetVal actionSuspend(action_t *pThis, time_t tNow) +static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) { + int iRetries; + int iSleepPeriod; + int bTreatOKasSusp; DEFiRet; ASSERT(pThis != NULL); - pThis->bSuspended = 1; - pThis->ttResumeRtry = tNow + pThis->iResumeInterval; - pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */ + iRetries = 0; + while((*pThis->pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + iRet = pThis->pMod->tryResume(pThis->pModData); + if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) { + bTreatOKasSusp = 1; + pThis->iResumeOKinRow = 0; + } else { + bTreatOKasSusp = 0; + } + if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { + actionSetState(pThis, ACT_STATE_RDY); + } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { + /* max retries reached? */ + if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { + actionSuspend(pThis, ttNow); + } else { + ++pThis->iNbrResRtry; + ++iRetries; + iSleepPeriod = pThis->iResumeInterval; + ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */ + srSleep(iSleepPeriod, 0); + if(*pThis->pbShutdownImmediate) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + } + } else if(iRet == RS_RET_DISABLE_ACTION) { + actionDisable(pThis); + } + } + + if(pThis->eState == ACT_STATE_RDY) { + pThis->iNbrResRtry = 0; + } + +finalize_it: RETiRet; } /* try to resume an action -- rgerhards, 2007-08-02 - * returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the - * action is still suspended. + * changed to new action state engine -- rgerhards, 2009-05-07 */ static rsRetVal actionTryResume(action_t *pThis) { DEFiRet; - time_t ttNow; + time_t ttNow = NO_TIME_PROVIDED; ASSERT(pThis != NULL); - /* for resume handling, we must always obtain a fresh timestamp. We used - * to use the action timestamp, but in this case we will never reach a - * point where a resumption is actually tried, because the action timestamp - * is always in the past. So we can not avoid doing a fresh time() call - * here. -- rgerhards, 2009-03-18 - */ - time(&ttNow); /* cache "now" */ - - /* first check if it is time for a re-try */ - if(ttNow > pThis->ttResumeRtry) { - iRet = pThis->pMod->tryResume(pThis->pModData); - if(iRet == RS_RET_SUSPENDED) { - /* set new tryResume time */ - ++pThis->iNbrResRtry; - /* if we have more than 10 retries, we prolong the - * retry interval. If something is really stalled, it will - * get re-tried only very, very seldom - but that saves - * CPU time. TODO: maybe a config option for that? - * rgerhards, 2007-08-02 - */ - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + if(pThis->eState == ACT_STATE_SUSP) { + /* if we are suspended, we need to check if the timeout expired. + * for this handling, we must always obtain a fresh timestamp. We used + * to use the action timestamp, but in this case we will never reach a + * point where a resumption is actually tried, because the action timestamp + * is always in the past. So we can not avoid doing a fresh time() call + * here. -- rgerhards, 2009-03-18 + */ + datetime.GetTime(&ttNow); /* cache "now" */ + if(ttNow > pThis->ttResumeRtry) { + actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ } - } else { - /* it's too early, we are still suspended --> indicate this */ - iRet = RS_RET_SUSPENDED; } - if(iRet == RS_RET_OK) - actionResume(pThis); + if(pThis->eState == ACT_STATE_RTRY) { + if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ + datetime.GetTime(&ttNow); + CHKiRet(actionDoRetry(pThis, ttNow)); + } + + if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n", + getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + } + +finalize_it: + RETiRet; +} + + +/* prepare an action for performing work. This involves trying to recover it, + * depending on its current state. + * rgerhards, 2009-05-07 + */ +static inline rsRetVal actionPrepare(action_t *pThis) +{ + DEFiRet; - DBGPRINTF("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n", - iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + assert(pThis != NULL); + CHKiRet(actionTryResume(pThis)); + /* if we are now ready, we initialize the transaction and advance + * action state accordingly + */ + if(pThis->eState == ACT_STATE_RDY) { + iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionSetState(pThis, ACT_STATE_ITX); + break; + case RS_RET_SUSPENDED: + actionRetry(pThis); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + default:FINALIZE; + } + } + +finalize_it: RETiRet; } @@ -425,129 +623,461 @@ static rsRetVal actionTryResume(action_t *pThis) rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; + char *sz; dbgprintf("%s: ", module.GetStateName(pThis->pMod)); pThis->pMod->dbgPrintInstInfo(pThis->pModData); dbgprintf("\n\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - dbgprintf("\tSuspended: %d", pThis->bSuspended); - if(pThis->bSuspended) { - dbgprintf(" next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); + if(pThis->eState == ACT_STATE_SUSP) { + dbgprintf("\tresume next retry: %u, number retries: %d", + (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\n"); - dbgprintf("\tDisabled: %d\n", !pThis->bEnabled); + dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); + if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { + sz = "slow, but feature-rich"; + } else if(pThis->submitToActQ == doSubmitToActionQNotAllMarkBatch) { + sz = "fast, but supports partial mark messages"; + } else if(pThis->submitToActQ == doSubmitToActionQBatch) { + sz = "firehose (fastest)"; + } else { + sz = "unknown (need to update debug display?)"; + } + dbgprintf("\tsubmission mode: %s\n", sz); dbgprintf("\n"); RETiRet; } -/* call the DoAction output plugin entry point - * rgerhards, 2008-01-28 +/* prepare the calling parameters for doAction() + * rgerhards, 2009-05-07 */ -#pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal -actionCallDoAction(action_t *pAction, msg_t *pMsg) +static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs) { - DEFiRet; - int iRetries; int i; - int iArr; - int iSleepPeriod; - int bCallAction; - int iCancelStateSave; + DEFiRet; ASSERT(pAction != NULL); - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - pthread_setcancelstate(iCancelStateSave, NULL); - /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pAction->ppMsgs[i]), &(pAction->lenMsgs[i]))); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i])); break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pAction->ppMsgs[i]))); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(ppMsgs[i]))); + break; + case ACT_MSG_PASSING: + /* we abuse the uchar* ptr, it now actually is a void*, but we can not + * change that other than by chaning the interface, what we don't like... + */ + ppMsgs[i] = (void*) pMsg; + lenMsgs[i] = 0; /* init for *next* action */ break; default:assert(0); /* software bug if this happens! */ } } - iRetries = 0; - do { - /* on first invocation, this if should never be true. We just put it at the top - * of the loop so that processing (and code) is simplified. This code is actually - * triggered on the 2nd+ invocation. -- rgerhards, 2008-01-30 - */ - if(iRet == RS_RET_SUSPENDED) { - /* ok, this calls for our retry logic... */ - ++iRetries; - iSleepPeriod = pAction->iResumeInterval; - srSleep(iSleepPeriod, 0); - } - /* first check if we are suspended and, if so, retry */ - if(actionIsSuspended(pAction)) { - iRet = actionTryResume(pAction); - if(iRet == RS_RET_OK) - bCallAction = 1; - else - bCallAction = 0; - } else { - bCallAction = 1; - } +finalize_it: + RETiRet; +} + - if(bCallAction) { - /* call configured action */ - iRet = pAction->pMod->mod.om.doAction(pAction->ppMsgs, pMsg->msgFlags, pAction->pModData); - if(iRet == RS_RET_SUSPENDED) { - DBGPRINTF("Action requested to be suspended, done that.\n"); - actionSuspend(pAction, getActNow(pAction)); +/* cleanup doAction calling parameters + * rgerhards, 2009-05-07 + */ +static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***ppMsgs) +{ + int iArr; + int i; + DEFiRet; + + ASSERT(pAction != NULL); + + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + if(((uchar**)ppMsgs)[i] != NULL) { + iArr = 0; + while((((uchar***)ppMsgs)[i][iArr]) != NULL) { + d_free(((uchar ***)ppMsgs)[i][iArr++]); + ((uchar ***)ppMsgs)[i][iArr++] = NULL; } + d_free(((uchar**)ppMsgs)[i]); + ((uchar**)ppMsgs)[i] = NULL; } + } + + RETiRet; +} - } while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */ - if(iRet == RS_RET_DISABLE_ACTION) { - DBGPRINTF("Action requested to be disabled, done that.\n"); - pAction->bEnabled = 0; /* that's it... */ +/* call the DoAction output plugin entry point + * rgerhards, 2008-01-28 + */ +rsRetVal +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +{ + int i; + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); + + pThis->bHadAutoCommit = 0; +//d_pthread_mutex_lock(&pThis->mutActExec); +//pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec); + iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); +//pthread_cleanup_pop(1); /* unlock mutex */ + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + break; + case RS_RET_DEFER_COMMIT: + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + /* we are done, action state remains the same */ + break; + case RS_RET_PREVIOUS_COMMITTED: + /* action state remains the same, but we had a commit. */ + pThis->bHadAutoCommit = 1; + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + break; + case RS_RET_SUSPENDED: + actionRetry(pThis); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; } + iRet = getReturnCode(pThis); finalize_it: - /* cleanup */ - for(i = 0 ; i < pAction->iNumTpls ; ++i) { - if(pAction->ppMsgs[i] != NULL) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - iArr = 0; - while(((char **)pAction->ppMsgs[i])[iArr] != NULL) { - d_free(((char **)pAction->ppMsgs[i])[iArr++]); - ((char **)pAction->ppMsgs[i])[iArr++] = NULL; + + /* we need to cleanup the batches string buffers if they have been used + * in a non-standard way. -- rgerhards, 2010-06-15 + * Note that we may do this at the batch level, this would provide a bit + * more concurrency (TODO). + */ + switch(pThis->eParamPassing) { + case ACT_STRING_PASSING: + /* nothing to do in that case */ + break; + case ACT_ARRAY_PASSING: + cleanupDoActionParams(pThis, actParams); /* iRet ignored! */ + break; + case ACT_MSG_PASSING: + /* nothing to do in that case */ + for(i = 0 ; i < pThis->iNumTpls ; ++i) { + ((uchar**)actParams)[i] = NULL; + } + break; + } + + RETiRet; +} + + +/* process a message + * this readies the action and then calls doAction() + * rgerhards, 2008-01-28 + */ +static inline rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + + CHKiRet(actionPrepare(pThis)); + if(pThis->eState == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + + iRet = getReturnCode(pThis); +finalize_it: + RETiRet; +} + + +/* finish processing a batch. Most importantly, that means we commit if we + * need to do so. + * rgerhards, 2008-01-28 + */ +static rsRetVal +finishBatch(action_t *pThis, batch_t *pBatch) +{ + int i; + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->eState == ACT_STATE_RDY) { + /* we just need to flag the batch as commited */ + FINALIZE; /* nothing to do */ + } + + CHKiRet(actionPrepare(pThis)); + if(pThis->eState == ACT_STATE_ITX) { + iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + /* flag messages as committed */ + for(i = 0 ; i < pBatch->nElem ; ++i) { + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ } - d_free(pAction->ppMsgs[i]); - pAction->ppMsgs[i] = NULL; break; - case ACT_STRING_PASSING: + case RS_RET_SUSPENDED: + actionRetry(pThis); break; - default: - assert(0); + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + case RS_RET_DEFER_COMMIT: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " + "- ignored\n"); + actionCommitted(pThis); + break; + case RS_RET_PREVIOUS_COMMITTED: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " + "- ignored\n"); + actionCommitted(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; + } + } + iRet = getReturnCode(pThis); + +finalize_it: + RETiRet; +} + + +/* try to submit a partial batch of elements. + * rgerhards, 2009-05-12 + */ +static inline rsRetVal +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +{ + int i; + int iElemProcessed; + int iCommittedUpTo; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + assert(pnElem != NULL); + + i = pBatch->iDoneUpTo; /* all messages below that index are processed */ + iElemProcessed = 0; + iCommittedUpTo = i; + while(iElemProcessed <= *pnElem && i < pBatch->nElem) { + if(*(pBatch->pbShutdownImmediate)) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + if( pBatch->pElem[i].bFilterOK + && pBatch->pElem[i].state != BATCH_STATE_DISC + && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams); + DBGPRINTF("action call returned %d\n", localRet); + /* Note: we directly modify the batch object state, because we know that + * wo do not overwrite BATCH_STATE_DISC indicators! + */ + if(localRet == RS_RET_OK) { + /* mark messages as committed */ + while(iCommittedUpTo <= i) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + /* mark messages as committed */ + while(iCommittedUpTo < i) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else if(localRet == RS_RET_DEFER_COMMIT) { + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else if(localRet == RS_RET_DISCARDMSG) { + pBatch->pElem[i].state = BATCH_STATE_DISC; + } else { + dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", + localRet, *pnElem, iCommittedUpTo); + iRet = localRet; + FINALIZE; } } + ++i; + ++iElemProcessed; } +finalize_it: + if(pBatch->iDoneUpTo != iCommittedUpTo) { + pBatch->iDoneUpTo = iCommittedUpTo; + } + RETiRet; +} + + +/* submit a batch for actual action processing. + * The first nElem elements are processed. This function calls itself + * recursively if it needs to handle errors. + * Note: we don't need the number of the first message to be processed as a parameter, + * because this is kept track of inside the batch itself (iDoneUpTo). + * rgerhards, 2009-05-12 + */ +static rsRetVal +submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +{ + int i; + int bDone; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + + bDone = 0; + do { + localRet = tryDoAction(pAction, pBatch, &nElem); + if(localRet == RS_RET_FORCE_TERM) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + /* try commit transaction, once done, we can simply do so as if + * that return state was returned from tryDoAction(). + */ + localRet = finishBatch(pAction, pBatch); + } + + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + bDone = 1; + } else if(localRet == RS_RET_SUSPENDED) { + ; /* do nothing, this will retry the full batch */ + } else if(localRet == RS_RET_ACTION_FAILED) { + /* in this case, everything not yet committed is BAD */ + for(i = pBatch->iDoneUpTo ; i < nElem ; ++i) { + if( pBatch->pElem[i].state != BATCH_STATE_DISC + && pBatch->pElem[i].state != BATCH_STATE_COMM ) { + pBatch->pElem[i].state = BATCH_STATE_BAD; + pBatch->pElem[i].bPrevWasSuspended = 1; + } + } + bDone = 1; + } else { + if(nElem == 1) { + batchSetElemState(pBatch, i, BATCH_STATE_BAD); + bDone = 1; + } else { + /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ + submitBatch(pAction, pBatch, nElem / 2); + submitBatch(pAction, pBatch, nElem - (nElem / 2)); + bDone = 1; + } + } + } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ + + if(*(pBatch->pbShutdownImmediate)) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + +finalize_it: + RETiRet; +} + + + +/* The following function prepares a batch for processing, that it is + * reinitializes batch states, generates strings and does everything else + * that needs to be done in order to make the batch ready for submission to + * the actual output module. Note that we look at the precomputed + * filter OK condition and process only those messages, that actually matched + * the filter. + * rgerhards, 2010-06-14 + */ +static inline rsRetVal +prepareBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + batch_obj_t *pElem; + DEFiRet; + + pBatch->iDoneUpTo = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pElem = &(pBatch->pElem[i]); + if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + pElem->state = BATCH_STATE_RDY; + prepareDoActionParams(pAction, (msg_t*) pElem->pUsrp, + (uchar**) &(pElem->staticActParams), pElem->staticLenParams); + } + } + RETiRet; +} + + +/* receive a batch and process it. This includes retry handling. + * rgerhards, 2009-05-12 + */ +static inline rsRetVal +processAction(action_t *pAction, batch_t *pBatch) +{ + DEFiRet; + + assert(pBatch != NULL); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); + iRet = finishBatch(pAction, pBatch); + +finalize_it: + RETiRet; +} + + +#pragma GCC diagnostic ignored "-Wempty-body" +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +static rsRetVal +processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +{ + int *pbShutdownImmdtSave; + DEFiRet; + + assert(pBatch != NULL); + + pbShutdownImmdtSave = pBatch->pbShutdownImmediate; + pBatch->pbShutdownImmediate = pbShutdownImmediate; + pAction->pbShutdownImmediate = pBatch->pbShutdownImmediate; + CHKiRet(prepareBatch(pAction, pBatch)); + + /* We now must guard the output module against execution by multiple threads. The + * plugin interface specifies that output modules must not be thread-safe (except + * if they notify us they are - functionality not yet implemented...). + * rgerhards, 2008-01-30 + */ + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + + iRet = processAction(pAction, pBatch); + pthread_cleanup_pop(1); /* unlock mutex */ - msgDestruct(&pMsg); /* we are now finished with the message */ +finalize_it: + pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -563,7 +1093,6 @@ rsRetVal actionCallHUPHdlr(action_t *pAction) { DEFiRet; - int iCancelStateSave; ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); @@ -572,10 +1101,8 @@ actionCallHUPHdlr(action_t *pAction) FINALIZE; /* no HUP handler, so we are done ;) */ } - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - pthread_setcancelstate(iCancelStateSave, NULL); CHKiRet(pAction->pMod->doHUP(pAction->pModData)); pthread_cleanup_pop(1); /* unlock mutex */ @@ -615,20 +1142,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } -/* rgerhards 2004-11-09: fprintlog() is the actual driver for - * the output channel. It receives the channel description (f) as - * well as the message and outputs them according to the channel - * semantics. The message is typically already contained in the - * channel save buffer (f->f_prevline). This is not only the case - * when a message was already repeated but also when a new message - * arrived. - * rgerhards 2007-08-01: interface changed to use action_t - * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE - * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do - * not do this here. Failing to do so results in all kinds of - * "interesting" problems! - * RGERHARDS, 2008-01-29: - * This is now the action caller and has been renamed. +/* This function builds up a batch of messages to be (later) + * submitted to the action queue. */ rsRetVal actionWriteToAction(action_t *pAction) @@ -754,28 +1269,16 @@ finalize_it: /* helper to actonCallAction, mostly needed because of this damn * pthread_cleanup_push() POSIX macro... */ -static rsRetVal +static inline rsRetVal doActionCallAction(action_t *pAction, msg_t *pMsg) { DEFiRet; - /* first, we need to check if this is a disabled - * entry. If so, we must not further process it. - * rgerhards 2005-09-26 - * In the future, disabled modules may be re-probed from time - * to time. They are in a perfectly legal state, except that the - * doAction method indicated that it wanted to be disabled - but - * we do not consider this is a solution for eternity... So we - * should check from time to time if affairs have improved. - * rgerhards, 2007-07-24 - */ - if(pAction->bEnabled == 0) { - ABORT_FINALIZE(RS_RET_OK); - } pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ - if((pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { + if(pAction->bWriteAllMarkMsgs == FALSE + && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { ABORT_FINALIZE(RS_RET_OK); } @@ -821,77 +1324,167 @@ finalize_it: RETiRet; } -/* call the configured action. Does all necessary housekeeping. - * rgerhards, 2007-08-01 - * FYI: currently, this function is only called from the queue - * consumer. So we (conceptually) run detached from the input - * threads (which also means we may run much later than when the - * message was generated). +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 */ -#pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal -actionCallAction(action_t *pAction, msg_t *pMsg) +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; - int iCancelStateSave; - ISOBJ_TYPE_assert(pMsg, msg); - ASSERT(pAction != NULL); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - LockObj(pAction); - pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - pthread_setcancelstate(iCancelStateSave, NULL); - iRet = doActionCallAction(pAction, pMsg); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - UnlockObj(pAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - pthread_setcancelstate(iCancelStateSave, NULL); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" -/* add our cfsysline handlers - * rgerhards, 2008-01-28 + +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. Here, we just modify the filter condition to be false when + * a mark message must not be written. However, in this case we must save the previous + * filter as we may need it in the next action (potential future optimization: check if this is + * the last action TODO). + * rgerhards, 2010-06-08 */ -rsRetVal -actionAddCfSysLineHdrl(void) +static rsRetVal +doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) { + time_t now = 0; + time_t lastAct; + int i; + int bProcessMarkMsgs; + int bModifiedFilter; + sbool FilterSave[128]; + sbool *pFilterSave; DEFiRet; - CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); + if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) { + pFilterSave = FilterSave; + } else { + CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); + } + + bModifiedFilter = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + pFilterSave[i] = pBatch->pElem[i].bFilterOK; + if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) { + /* check if we need to write or not */ + if(now == 0) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if((now - lastAct) < MarkInterval / 2) { + DBGPRINTF("action was recently called, ignoring mark message\n"); + bProcessMarkMsgs = 0; + } else { + bProcessMarkMsgs = 1; + } + } while(ATOMIC_CAS(&pAction->f_time, lastAct, + ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0); + } + if(bProcessMarkMsgs) { + pBatch->pElem[i].bFilterOK = 0; + bModifiedFilter = 1; + } + } + } + DBGPRINTF("Called action(NotAllMark), logging to %s\n", module.GetStateName(pAction->pMod)); + + iRet = doSubmitToActionQBatch(pAction, pBatch); + + if(bModifiedFilter) { + /* in this case, we need to restore previous state */ + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + pBatch->pElem[i].bFilterOK = pFilterSave[i]; + } + } + finalize_it: + if(pFilterSave != FilterSave) + free(pFilterSave); + + RETiRet; +} + + +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + else { /* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { + doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + } + } + } + RETiRet; } + +/* Helper to submit a batch of actions to the engine. Note that we have rather + * complicated processing here, so we need to do this one message after another. + * rgerhards, 2010-06-23 + */ +static inline rsRetVal +helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { + doActionCallAction(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + } + } + + RETiRet; +} + +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 + */ +#pragma GCC diagnostic ignored "-Wempty-body" +static rsRetVal +doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +{ + DEFiRet; + + LockObj(pAction); + pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); + UnlockObj(pAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ + + RETiRet; +} +#pragma GCC diagnostic warning "-Wempty-body" + /* add an Action to the current selector * The pOMSR is freed, as it is not needed after this function. * Note: this function pulls global data that specifies action config state. @@ -917,6 +1510,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->pModData = pModData; pAction->pszName = pszActionName; pszActionName = NULL; /* free again! */ + pAction->bWriteAllMarkMsgs = bActionWriteAllMarkMsgs; + bActionWriteAllMarkMsgs = FALSE; /* reset */ pAction->bExecWhenPrevSusp = bActExecWhenPrevSusp; pAction->iSecsExecOnceInterval = iActExecOnceInterval; pAction->iExecEveryNthOccur = iActExecEveryNthOccur; @@ -935,15 +1530,12 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques if(pAction->iNumTpls > 0) { /* we first need to create the template pointer array */ CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *))); - CHKmalloc(pAction->ppMsgs = (uchar**) calloc(pAction->iNumTpls, sizeof(uchar *))); - CHKmalloc(pAction->lenMsgs = (size_t*) calloc(pAction->iNumTpls, sizeof(size_t))); } for(i = 0 ; i < pAction->iNumTpls ; ++i) { CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts)); - /* Ok, we got everything, so it now is time to look up the - * template (Hint: templates MUST be defined before they are - * used!) + /* Ok, we got everything, so it now is time to look up the template + * (Hint: templates MUST be defined before they are used!) */ if((pAction->ppTpl[i] = tplFind((char*)pTplName, strlen((char*)pTplName))) == NULL) { snprintf(errMsg, sizeof(errMsg) / sizeof(char), @@ -965,6 +1557,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques /* set parameter-passing mode */ if(iTplOpts & OMSR_TPL_AS_ARRAY) { pAction->eParamPassing = ACT_ARRAY_PASSING; + } else if(iTplOpts & OMSR_TPL_AS_MSG) { + pAction->eParamPassing = ACT_MSG_PASSING; } else { pAction->eParamPassing = ACT_STRING_PASSING; } @@ -981,10 +1575,10 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n"); pAction->f_ReduceRepeated = 0; } - pAction->bEnabled = 1; /* action is enabled */ + pAction->eState = ACT_STATE_RDY; /* action is enabled */ if(bSuspended) - actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */ + actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */ CHKiRet(actionConstructFinalize(pAction)); @@ -1006,6 +1600,17 @@ finalize_it: } +/* Reset config variables to default values. + * rgerhards, 2009-11-12 + */ +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + iActExecOnceInterval = 0; + return RS_RET_OK; +} + + /* TODO: we are not yet a real object, the ClassInit here just looks like it is.. */ rsRetVal actionClassInit(void) @@ -1017,6 +1622,36 @@ rsRetVal actionClassInit(void) CHKiRet(objUse(module, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &bActionWriteAllMarkMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &iActExecOnceInterval, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); + finalize_it: RETiRet; } |