summaryrefslogtreecommitdiff
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c160
1 files changed, 114 insertions, 46 deletions
diff --git a/action.c b/action.c
index 3961159..d16269d 100644
--- a/action.c
+++ b/action.c
@@ -69,7 +69,7 @@
* beast.
* rgerhards, 2011-06-15
*
- * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -114,6 +114,7 @@
#include "unicode-helper.h"
#include "atomic.h"
#include "ruleset.h"
+#include "parserif.h"
#include "statsobj.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
@@ -179,6 +180,8 @@ configSettings_t cs_save; /* our saved (scope!) config settings */
* counting. -- rgerhards, 2008-01-29
*/
static int iActionNbr = 0;
+int bActionReportSuspension = 1;
+int bActionReportSuspensionCont = 0;
/* tables for interfacing with the v6 config system */
static struct cnfparamdescr cnfparamdescr[] = {
@@ -191,6 +194,8 @@ static struct cnfparamdescr cnfparamdescr[] = {
{ "action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */
{ "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */
{ "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */
+ { "action.reportsuspension", eCmdHdlrBinary, 0 },
+ { "action.reportsuspensioncontinuation", eCmdHdlrBinary, 0 },
{ "action.resumeinterval", eCmdHdlrInt, 0 }
};
static struct cnfparamblk pblk =
@@ -253,8 +258,8 @@ actionResetQueueParams(void)
cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
cs.iActionQueueSize = 1000; /* size of the main message queue above */
cs.iActionQueueDeqBatchSize = 16; /* default batch size */
- cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
- cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
+ cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */
+ cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */
cs.iActionQDiscardMark = 980; /* begin to discard messages */
cs.iActionQDiscardSeverity = 8; /* discard warning and above */
cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
@@ -265,7 +270,7 @@ actionResetQueueParams(void)
cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
cs.iActionQtoEnq = 50; /* timeout for queue enque */
cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
- cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
+ cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */
cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
cs.iActionQueMaxDiskSpace = 0;
cs.iActionQueueDeqSlowdown = 0;
@@ -340,6 +345,9 @@ rsRetVal actionConstruct(action_t **ppThis)
pThis->iSecsExecOnceInterval = 0;
pThis->bExecWhenPrevSusp = 0;
pThis->bRepMsgHasMsg = 0;
+ pThis->bReportSuspension = -1; /* indicate "not yet set" */
+ pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */
+ pThis->bJustResumed = 0;
pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */
pthread_mutex_init(&pThis->mutActExec, NULL);
pthread_mutex_init(&pThis->mutAction, NULL);
@@ -357,7 +365,7 @@ finalize_it:
/* action construction finalizer
*/
rsRetVal
-actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
+actionConstructFinalize(action_t *pThis, struct nvlst *lst)
{
DEFiRet;
uchar pszAName[64]; /* friendly name of our action */
@@ -371,35 +379,40 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
/* generate a friendly name for us action stats */
if(pThis->pszName == NULL) {
snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr);
- } else {
- ustrncpy(pszAName, pThis->pszName, sizeof(pszAName));
- pszAName[sizeof(pszAName)-1] = '\0'; /* to be on the save side */
+ pThis->pszName = ustrdup(pszAName);
}
/* support statistics gathering */
CHKiRet(statsobj.Construct(&pThis->statsobj));
- CHKiRet(statsobj.SetName(pThis->statsobj, pszAName));
+ CHKiRet(statsobj.SetName(pThis->statsobj, pThis->pszName));
STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"),
- ctrType_IntCtr, &pThis->ctrProcessed));
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrProcessed));
STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"),
- ctrType_IntCtr, &pThis->ctrFail));
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFail));
+
+ STATSCOUNTER_INIT(pThis->ctrSuspend, pThis->mutCtrSuspend);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrSuspend));
+ STATSCOUNTER_INIT(pThis->ctrSuspendDuration, pThis->mutCtrSuspendDuration);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended.duration"),
+ ctrType_IntCtr, 0, &pThis->ctrSuspendDuration));
+
+ STATSCOUNTER_INIT(pThis->ctrResume, pThis->mutCtrResume);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("resumed"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrResume));
CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
/* create our queue */
/* generate a friendly name for the queue */
- if(pThis->pszName == NULL) {
- snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue",
- iActionNbr);
- } else {
- ustrncpy(pszAName, pThis->pszName, sizeof(pszAName));
- pszAName[63] = '\0'; /* to be on the save side */
- }
+ snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "%s queue",
+ pThis->pszName);
+
/* now check if we can run the action in "firehose mode" during stage one of
* its processing (that is before messages are enqueued into the action q).
* This is only possible if some features, which require strict sequence, are
@@ -432,7 +445,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
obj.SetName((obj_t*) pThis->pQueue, pszAName);
qqueueSetpAction(pThis->pQueue, pThis);
- if(queueParams == NULL) { /* use legacy params? */
+ if(lst == NULL) { /* use legacy params? */
/* ... set some properties ... */
# define setQPROP(func, directive, data) \
CHKiRet_Hdlr(func(pThis->pQueue, data)) { \
@@ -467,7 +480,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
} else {
/* we have v6-style config params */
qqueueSetDefaultsActionQueue(pThis->pQueue);
- qqueueApplyCnfParam(pThis->pQueue, queueParams);
+ qqueueApplyCnfParam(pThis->pQueue, lst);
}
# undef setQPROP
@@ -476,6 +489,12 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
qqueueDbgPrint(pThis->pQueue);
DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue);
+
+ if(pThis->eParamPassing == ACT_MSG_PASSING && pThis->pQueue->qType != QUEUETYPE_DIRECT) {
+ parser_warnmsg("module %s with message passing mode uses "
+ "non-direct queue. This most probably leads to undesired "
+ "results", (char*)modGetName(pThis->pMod));
+ }
/* and now reset the queue params (see comment in its function header!) */
actionResetQueueParams();
@@ -598,7 +617,7 @@ static void actionDisable(action_t *pThis)
}
-/* Suspend action, this involves changing the acton state as well
+/* Suspend action, this involves changing the action state as well
* as setting the next retry time.
* if we have more than 10 retries, we prolong the
* retry interval. If something is really stalled, it will
@@ -606,17 +625,48 @@ static void actionDisable(action_t *pThis)
* CPU time. TODO: maybe a config option for that?
* rgerhards, 2007-08-02
*/
-static inline void actionSuspend(action_t *pThis)
+static inline void
+actionSuspend(action_t * const pThis)
{
time_t ttNow;
+ int suspendDuration;
+ char timebuf[32];
+
+ /* we need to defer setting the action's own bReportSuspension state until
+ * after the full config has been processed. So the most simple case to do
+ * that is here. It's not a performance problem, as it happens infrequently.
+ * it's not a threading race problem, as always the same value will be written.
+ */
+ if(pThis->bReportSuspension == -1)
+ pThis->bReportSuspension = bActionReportSuspension;
+ if(pThis->bReportSuspensionCont == -1) {
+ pThis->bReportSuspensionCont = bActionReportSuspensionCont;
+ if(pThis->bReportSuspensionCont == -1)
+ pThis->bReportSuspension = 1;
+ }
/* note: we can NOT use a cached timestamp, as time may have evolved
* since caching, and this would break logic (and it actually did so!)
*/
datetime.GetTime(&ttNow);
- pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ suspendDuration = pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ pThis->ttResumeRtry = ttNow + suspendDuration;
actionSetState(pThis, ACT_STATE_SUSP);
- DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry);
+ pThis->ctrSuspendDuration += suspendDuration;
+ if(pThis->iNbrResRtry == 0) {
+ STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend);
+ }
+ if( pThis->bReportSuspensionCont
+ || (pThis->bReportSuspension && pThis->iNbrResRtry == 0) ) {
+ ctime_r(&pThis->ttResumeRtry, timebuf);
+ timebuf[strlen(timebuf)-1] = '\0'; /* strip LF */
+ errmsg.LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
+ "action '%s' suspended, next retry is %s",
+ pThis->pszName, timebuf);
+ }
+ DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d\n",
+ pThis->pszName, (long long) pThis->ttResumeRtry, (long long) ttNow,
+ pThis->iNbrResRtry);
}
@@ -647,9 +697,9 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
iRetries = 0;
while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
- DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries);
+ DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d\n", pThis->pszName, iRetries);
iRet = pThis->pMod->tryResume(pThis->pModData);
- DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet);
+ DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet);
if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) {
bTreatOKasSusp = 1;
pThis->iResumeOKinRow = 0;
@@ -657,16 +707,20 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
bTreatOKasSusp = 0;
}
if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) {
- DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet);
+ DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n",
+ pThis->pszName, iRet);
+ pThis->bJustResumed = 1;
actionSetState(pThis, ACT_STATE_RDY);
} else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) {
/* max retries reached? */
- DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n",
- pThis->iResumeRetryCount, iRetries);
+ DBGPRINTF("actionDoRetry: %s check for max retries, iResumeRetryCount "
+ "%d, iRetries %d\n",
+ pThis->pszName, pThis->iResumeRetryCount, iRetries);
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
actionSuspend(pThis);
+ if(pThis->iNbrResRtry < 20)
+ ++pThis->iNbrResRtry;
} else {
- ++pThis->iNbrResRtry;
++iRetries;
iSleepPeriod = pThis->iResumeInterval;
srSleep(iSleepPeriod, 0);
@@ -901,6 +955,25 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
done: RETiRet;
}
+/* This is used in resume processing. We only finally know that a resume
+ * worked when we have been able to actually process a messages. As such,
+ * we need to do some cleanup and status tracking in that case.
+ */
+static void
+actionSetActionWorked(action_t *__restrict__ const pThis)
+{
+ pThis->iResumeOKinRow = 0; /* we had a successful call! */
+
+ if(pThis->bJustResumed) {
+ /* OK, we *really* could resume, so tell user! */
+ if(pThis->bReportSuspension) {
+ errmsg.LogMsg(0, RS_RET_RESUMED, LOG_INFO, "action '%s' "
+ "resumed (module '%s')",
+ pThis->pszName, pThis->pMod->pszName);
+ }
+ pThis->bJustResumed = 0;
+ }
+}
/* call the DoAction output plugin entry point
* rgerhards, 2008-01-28
@@ -920,16 +993,16 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ actionSetActionWorked(pThis); /* we had a successful call! */
break;
case RS_RET_DEFER_COMMIT:
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ actionSetActionWorked(pThis); /* we had a successful call! */
/* we are done, action state remains the same */
break;
case RS_RET_PREVIOUS_COMMITTED:
/* action state remains the same, but we had a commit. */
pThis->bHadAutoCommit = 1;
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ actionSetActionWorked(pThis); /* we had a successful call! */
break;
case RS_RET_SUSPENDED:
actionRetry(pThis);
@@ -1770,6 +1843,10 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals)
pAction->bRepMsgHasMsg = pvals[i].val.d.n;
} else if(!strcmp(pblk.descr[i].name, "action.resumeretrycount")) {
pAction->iResumeRetryCount = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "action.reportsuspension")) {
+ pAction->bReportSuspension = (int) pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "action.reportsuspensioncontinuation")) {
+ pAction->bReportSuspensionCont = (int) pvals[i].val.d.n;
} else if(!strcmp(pblk.descr[i].name, "action.resumeinterval")) {
pAction->iResumeInterval = pvals[i].val.d.n;
} else {
@@ -1789,7 +1866,7 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals)
rsRetVal
addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
omodStringRequest_t *pOMSR, struct cnfparamvals *actParams,
- struct cnfparamvals *queueParams, int bSuspended)
+ struct nvlst *lst, int bSuspended)
{
DEFiRet;
int i;
@@ -1882,7 +1959,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
if(bSuspended)
actionSuspend(pAction);
- CHKiRet(actionConstructFinalize(pAction, queueParams));
+ CHKiRet(actionConstructFinalize(pAction, lst));
/* TODO: if we exit here, we have a memory leak... */
@@ -1941,26 +2018,19 @@ rsRetVal
actionNewInst(struct nvlst *lst, action_t **ppAction)
{
struct cnfparamvals *paramvals;
- struct cnfparamvals *queueParams;
modInfo_t *pMod;
uchar *cnfModName = NULL;
omodStringRequest_t *pOMSR;
void *pModData;
action_t *pAction;
- int typeIdx;
DEFiRet;
paramvals = nvlstGetParams(lst, &pblk, NULL);
if(paramvals == NULL) {
- ABORT_FINALIZE(RS_RET_ERR);
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
}
dbgprintf("action param blk after actionNewInst:\n");
cnfparamsPrint(&pblk, paramvals);
- typeIdx = cnfparamGetIdx(&pblk, "type");
- if(paramvals[typeIdx].bUsed == 0) {
- errmsg.LogError(0, RS_RET_CONF_RQRD_PARAM_MISSING, "action type missing");
- ABORT_FINALIZE(RS_RET_CONF_RQRD_PARAM_MISSING); // TODO: move this into rainerscript handlers
- }
cnfModName = (uchar*)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL);
if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) {
errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName);
@@ -1972,9 +2042,7 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
FINALIZE; /* iRet is already set to error state */
}
- qqueueDoCnfParams(lst, &queueParams);
-
- if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams,
+ if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst,
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* check if the module is compatible with select features
* (currently no such features exist) */