diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 160 |
1 files changed, 114 insertions, 46 deletions
@@ -69,7 +69,7 @@ * beast. * rgerhards, 2011-06-15 * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -114,6 +114,7 @@ #include "unicode-helper.h" #include "atomic.h" #include "ruleset.h" +#include "parserif.h" #include "statsobj.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ @@ -179,6 +180,8 @@ configSettings_t cs_save; /* our saved (scope!) config settings */ * counting. -- rgerhards, 2008-01-29 */ static int iActionNbr = 0; +int bActionReportSuspension = 1; +int bActionReportSuspensionCont = 0; /* tables for interfacing with the v6 config system */ static struct cnfparamdescr cnfparamdescr[] = { @@ -191,6 +194,8 @@ static struct cnfparamdescr cnfparamdescr[] = { { "action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */ { "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */ { "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */ + { "action.reportsuspension", eCmdHdlrBinary, 0 }, + { "action.reportsuspensioncontinuation", eCmdHdlrBinary, 0 }, { "action.resumeinterval", eCmdHdlrInt, 0 } }; static struct cnfparamblk pblk = @@ -253,8 +258,8 @@ actionResetQueueParams(void) cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ cs.iActionQueueSize = 1000; /* size of the main message queue above */ cs.iActionQueueDeqBatchSize = 16; /* default batch size */ - cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ - cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ + cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */ + cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */ cs.iActionQDiscardMark = 980; /* begin to discard messages */ cs.iActionQDiscardSeverity = 8; /* discard warning and above */ cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ @@ -265,7 +270,7 @@ actionResetQueueParams(void) cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ cs.iActionQtoEnq = 50; /* timeout for queue enque */ cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ - cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ + cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */ cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ cs.iActionQueMaxDiskSpace = 0; cs.iActionQueueDeqSlowdown = 0; @@ -340,6 +345,9 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->iSecsExecOnceInterval = 0; pThis->bExecWhenPrevSusp = 0; pThis->bRepMsgHasMsg = 0; + pThis->bReportSuspension = -1; /* indicate "not yet set" */ + pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */ + pThis->bJustResumed = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pthread_mutex_init(&pThis->mutActExec, NULL); pthread_mutex_init(&pThis->mutAction, NULL); @@ -357,7 +365,7 @@ finalize_it: /* action construction finalizer */ rsRetVal -actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) +actionConstructFinalize(action_t *pThis, struct nvlst *lst) { DEFiRet; uchar pszAName[64]; /* friendly name of our action */ @@ -371,35 +379,40 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) /* generate a friendly name for us action stats */ if(pThis->pszName == NULL) { snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr); - } else { - ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); - pszAName[sizeof(pszAName)-1] = '\0'; /* to be on the save side */ + pThis->pszName = ustrdup(pszAName); } /* support statistics gathering */ CHKiRet(statsobj.Construct(&pThis->statsobj)); - CHKiRet(statsobj.SetName(pThis->statsobj, pszAName)); + CHKiRet(statsobj.SetName(pThis->statsobj, pThis->pszName)); STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), - ctrType_IntCtr, &pThis->ctrProcessed)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrProcessed)); STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), - ctrType_IntCtr, &pThis->ctrFail)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFail)); + + STATSCOUNTER_INIT(pThis->ctrSuspend, pThis->mutCtrSuspend); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrSuspend)); + STATSCOUNTER_INIT(pThis->ctrSuspendDuration, pThis->mutCtrSuspendDuration); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended.duration"), + ctrType_IntCtr, 0, &pThis->ctrSuspendDuration)); + + STATSCOUNTER_INIT(pThis->ctrResume, pThis->mutCtrResume); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("resumed"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrResume)); CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); /* create our queue */ /* generate a friendly name for the queue */ - if(pThis->pszName == NULL) { - snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue", - iActionNbr); - } else { - ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); - pszAName[63] = '\0'; /* to be on the save side */ - } + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "%s queue", + pThis->pszName); + /* now check if we can run the action in "firehose mode" during stage one of * its processing (that is before messages are enqueued into the action q). * This is only possible if some features, which require strict sequence, are @@ -432,7 +445,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) obj.SetName((obj_t*) pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); - if(queueParams == NULL) { /* use legacy params? */ + if(lst == NULL) { /* use legacy params? */ /* ... set some properties ... */ # define setQPROP(func, directive, data) \ CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ @@ -467,7 +480,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) } else { /* we have v6-style config params */ qqueueSetDefaultsActionQueue(pThis->pQueue); - qqueueApplyCnfParam(pThis->pQueue, queueParams); + qqueueApplyCnfParam(pThis->pQueue, lst); } # undef setQPROP @@ -476,6 +489,12 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) qqueueDbgPrint(pThis->pQueue); DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); + + if(pThis->eParamPassing == ACT_MSG_PASSING && pThis->pQueue->qType != QUEUETYPE_DIRECT) { + parser_warnmsg("module %s with message passing mode uses " + "non-direct queue. This most probably leads to undesired " + "results", (char*)modGetName(pThis->pMod)); + } /* and now reset the queue params (see comment in its function header!) */ actionResetQueueParams(); @@ -598,7 +617,7 @@ static void actionDisable(action_t *pThis) } -/* Suspend action, this involves changing the acton state as well +/* Suspend action, this involves changing the action state as well * as setting the next retry time. * if we have more than 10 retries, we prolong the * retry interval. If something is really stalled, it will @@ -606,17 +625,48 @@ static void actionDisable(action_t *pThis) * CPU time. TODO: maybe a config option for that? * rgerhards, 2007-08-02 */ -static inline void actionSuspend(action_t *pThis) +static inline void +actionSuspend(action_t * const pThis) { time_t ttNow; + int suspendDuration; + char timebuf[32]; + + /* we need to defer setting the action's own bReportSuspension state until + * after the full config has been processed. So the most simple case to do + * that is here. It's not a performance problem, as it happens infrequently. + * it's not a threading race problem, as always the same value will be written. + */ + if(pThis->bReportSuspension == -1) + pThis->bReportSuspension = bActionReportSuspension; + if(pThis->bReportSuspensionCont == -1) { + pThis->bReportSuspensionCont = bActionReportSuspensionCont; + if(pThis->bReportSuspensionCont == -1) + pThis->bReportSuspension = 1; + } /* note: we can NOT use a cached timestamp, as time may have evolved * since caching, and this would break logic (and it actually did so!) */ datetime.GetTime(&ttNow); - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + suspendDuration = pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + pThis->ttResumeRtry = ttNow + suspendDuration; actionSetState(pThis, ACT_STATE_SUSP); - DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); + pThis->ctrSuspendDuration += suspendDuration; + if(pThis->iNbrResRtry == 0) { + STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend); + } + if( pThis->bReportSuspensionCont + || (pThis->bReportSuspension && pThis->iNbrResRtry == 0) ) { + ctime_r(&pThis->ttResumeRtry, timebuf); + timebuf[strlen(timebuf)-1] = '\0'; /* strip LF */ + errmsg.LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, + "action '%s' suspended, next retry is %s", + pThis->pszName, timebuf); + } + DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d\n", + pThis->pszName, (long long) pThis->ttResumeRtry, (long long) ttNow, + pThis->iNbrResRtry); } @@ -647,9 +697,9 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) iRetries = 0; while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { - DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); + DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d\n", pThis->pszName, iRetries); iRet = pThis->pMod->tryResume(pThis->pModData); - DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); + DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet); if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { bTreatOKasSusp = 1; pThis->iResumeOKinRow = 0; @@ -657,16 +707,20 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) bTreatOKasSusp = 0; } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { - DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); + DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n", + pThis->pszName, iRet); + pThis->bJustResumed = 1; actionSetState(pThis, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ - DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", - pThis->iResumeRetryCount, iRetries); + DBGPRINTF("actionDoRetry: %s check for max retries, iResumeRetryCount " + "%d, iRetries %d\n", + pThis->pszName, pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { actionSuspend(pThis); + if(pThis->iNbrResRtry < 20) + ++pThis->iNbrResRtry; } else { - ++pThis->iNbrResRtry; ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); @@ -901,6 +955,25 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) done: RETiRet; } +/* This is used in resume processing. We only finally know that a resume + * worked when we have been able to actually process a messages. As such, + * we need to do some cleanup and status tracking in that case. + */ +static void +actionSetActionWorked(action_t *__restrict__ const pThis) +{ + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + + if(pThis->bJustResumed) { + /* OK, we *really* could resume, so tell user! */ + if(pThis->bReportSuspension) { + errmsg.LogMsg(0, RS_RET_RESUMED, LOG_INFO, "action '%s' " + "resumed (module '%s')", + pThis->pszName, pThis->pMod->pszName); + } + pThis->bJustResumed = 0; + } +} /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 @@ -920,16 +993,16 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) switch(iRet) { case RS_RET_OK: actionCommitted(pThis); - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionSetActionWorked(pThis); /* we had a successful call! */ break; case RS_RET_DEFER_COMMIT: - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionSetActionWorked(pThis); /* we had a successful call! */ /* we are done, action state remains the same */ break; case RS_RET_PREVIOUS_COMMITTED: /* action state remains the same, but we had a commit. */ pThis->bHadAutoCommit = 1; - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionSetActionWorked(pThis); /* we had a successful call! */ break; case RS_RET_SUSPENDED: actionRetry(pThis); @@ -1770,6 +1843,10 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) pAction->bRepMsgHasMsg = pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "action.resumeretrycount")) { pAction->iResumeRetryCount = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.reportsuspension")) { + pAction->bReportSuspension = (int) pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.reportsuspensioncontinuation")) { + pAction->bReportSuspensionCont = (int) pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "action.resumeinterval")) { pAction->iResumeInterval = pvals[i].val.d.n; } else { @@ -1789,7 +1866,7 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, - struct cnfparamvals *queueParams, int bSuspended) + struct nvlst *lst, int bSuspended) { DEFiRet; int i; @@ -1882,7 +1959,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, if(bSuspended) actionSuspend(pAction); - CHKiRet(actionConstructFinalize(pAction, queueParams)); + CHKiRet(actionConstructFinalize(pAction, lst)); /* TODO: if we exit here, we have a memory leak... */ @@ -1941,26 +2018,19 @@ rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction) { struct cnfparamvals *paramvals; - struct cnfparamvals *queueParams; modInfo_t *pMod; uchar *cnfModName = NULL; omodStringRequest_t *pOMSR; void *pModData; action_t *pAction; - int typeIdx; DEFiRet; paramvals = nvlstGetParams(lst, &pblk, NULL); if(paramvals == NULL) { - ABORT_FINALIZE(RS_RET_ERR); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); } dbgprintf("action param blk after actionNewInst:\n"); cnfparamsPrint(&pblk, paramvals); - typeIdx = cnfparamGetIdx(&pblk, "type"); - if(paramvals[typeIdx].bUsed == 0) { - errmsg.LogError(0, RS_RET_CONF_RQRD_PARAM_MISSING, "action type missing"); - ABORT_FINALIZE(RS_RET_CONF_RQRD_PARAM_MISSING); // TODO: move this into rainerscript handlers - } cnfModName = (uchar*)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL); if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) { errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); @@ -1972,9 +2042,7 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) FINALIZE; /* iRet is already set to error state */ } - qqueueDoCnfParams(lst, &queueParams); - - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams, + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ |