summaryrefslogtreecommitdiff
path: root/action.c
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2014-04-03 03:08:50 +0200
committerMichael Biebl <biebl@debian.org>2014-04-03 03:08:50 +0200
commit9374a46543e9c43c009f80def8c3b2506b0b377e (patch)
tree8853fd40ee8d55ff24304ff8a4421640f3493c58 /action.c
parent209e193f14ec562df5aad945f04cd88b227cc602 (diff)
downloadrsyslog-9374a46543e9c43c009f80def8c3b2506b0b377e.tar.gz
Imported Upstream version 8.2.0upstream/8.2.0
Diffstat (limited to 'action.c')
-rw-r--r--action.c1168
1 files changed, 467 insertions, 701 deletions
diff --git a/action.c b/action.c
index d16269d..b676fe1 100644
--- a/action.c
+++ b/action.c
@@ -13,18 +13,15 @@
* The different modes (and calling sequence) are:
*
* if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval
- * - doSubmitToActionQComplexBatch
- * - helperSubmitToActionQComplexBatch
- * - doActionCallAction
+ * - doSubmitToActionQComplex
* handles mark message reduction, but in essence calls
* - actionWriteToAction
* - qqueueEnqObj
* (now queue engine processing)
- * if(pThis->bWriteAllMarkMsgs == RSFALSE) - this is the DEFAULT
- * - doSubmitToActionQNotAllMarkBatch
- * - doSubmitToActionQBatch (and from here like in the else case below!)
+ * if(pThis->bWriteAllMarkMsgs == RSFALSE)
+ * - doSubmitToActionQNotAllMark
+ * - doSubmitToActionQ (and from here like in the else case below!)
* else
- * - doSubmitToActionQBatch
* - doSubmitToActionQ
* - qqueueEnqObj
* (now queue engine processing)
@@ -36,9 +33,6 @@
*
* After dequeue, processing is as follows:
* - processBatchMain
- * - processAction
- * - submitBatch
- * - tryDoAction
* - ...
*
* MORE ON PROCESSING, QUEUES and FILTERING
@@ -120,13 +114,12 @@
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
-static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
-static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch);
-static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch);
-static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch);
+static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t * const pWti);
+static rsRetVal doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, msg_t*);
+static rsRetVal doSubmitToActionQComplex(action_t * const pAction, wti_t * const pWti, msg_t*);
+static rsRetVal doSubmitToActionQNotAllMark(action_t * const pAction, wti_t * const pWti, msg_t*);
/* object static data (once for all instances) */
-/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
DEFobjCurrIf(obj)
DEFobjCurrIf(datetime)
DEFobjCurrIf(module)
@@ -176,10 +169,9 @@ configSettings_t cs_save; /* our saved (scope!) config settings */
/* the counter below counts actions created. It is used to obtain unique IDs for the action. They
* should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice
* to have during one instance of an rsyslogd run. For example, I use them to name actions when there
- * is no better name available. Note that I do NOT recover previous numbers on HUP - we simply keep
- * counting. -- rgerhards, 2008-01-29
+ * is no better name available.
*/
-static int iActionNbr = 0;
+int iActionNbr = 0;
int bActionReportSuspension = 1;
int bActionReportSuspensionCont = 0;
@@ -229,7 +221,7 @@ static struct cnfparamblk pblk =
* a lot. So we simply return the system time.
*/
static inline time_t
-getActNow(action_t *pThis)
+getActNow(action_t * const pThis)
{
assert(pThis != NULL);
if(pThis->tActNow == -1) {
@@ -289,7 +281,7 @@ actionResetQueueParams(void)
/* destructs an action descriptor object
* rgerhards, 2007-08-01
*/
-rsRetVal actionDestruct(action_t *pThis)
+rsRetVal actionDestruct(action_t * const pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
@@ -313,7 +305,6 @@ rsRetVal actionDestruct(action_t *pThis)
pThis->pMod->freeInstance(pThis->pModData);
pthread_mutex_destroy(&pThis->mutAction);
- pthread_mutex_destroy(&pThis->mutActExec);
d_free(pThis->pszName);
d_free(pThis->ppTpl);
@@ -323,6 +314,19 @@ finalize_it:
}
+/* Disable action, this means it will never again be usable
+ * until rsyslog is reloaded. Use only as a last resort, but
+ * depends on output module.
+ * rgerhards, 2007-08-02
+ */
+static inline void
+actionDisable(action_t *__restrict__ const pThis)
+{
+ pThis->bDisabled = 1;
+}
+
+
+
/* create a new action descriptor object
* rgerhards, 2007-08-01
* Note that it is vital to set proper initial values as the v6 config
@@ -339,17 +343,18 @@ rsRetVal actionConstruct(action_t **ppThis)
pThis->iResumeInterval = 30;
pThis->iResumeRetryCount = 0;
pThis->pszName = NULL;
- pThis->bWriteAllMarkMsgs = RSFALSE;
+ pThis->bWriteAllMarkMsgs = 1;
pThis->iExecEveryNthOccur = 0;
pThis->iExecEveryNthOccurTO = 0;
pThis->iSecsExecOnceInterval = 0;
pThis->bExecWhenPrevSusp = 0;
pThis->bRepMsgHasMsg = 0;
+ pThis->bDisabled = 0;
+ pThis->isTransactional = 0;
pThis->bReportSuspension = -1; /* indicate "not yet set" */
pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */
- pThis->bJustResumed = 0;
pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */
- pthread_mutex_init(&pThis->mutActExec, NULL);
+ pThis->iActionNbr = iActionNbr;
pthread_mutex_init(&pThis->mutAction, NULL);
INIT_ATOMIC_HELPER_MUT(pThis->mutCAS);
@@ -365,13 +370,11 @@ finalize_it:
/* action construction finalizer
*/
rsRetVal
-actionConstructFinalize(action_t *pThis, struct nvlst *lst)
+actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst)
{
DEFiRet;
uchar pszAName[64]; /* friendly name of our action */
- ASSERT(pThis != NULL);
-
if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) {
/* discard actions will be optimized out */
FINALIZE;
@@ -382,6 +385,19 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst)
pThis->pszName = ustrdup(pszAName);
}
+ /* cache transactional attribute */
+ pThis->isTransactional = pThis->pMod->mod.om.supportsTX;
+ if(pThis->isTransactional && pThis->eParamPassing != ACT_STRING_PASSING) {
+ errmsg.LogError(0, RS_RET_INVLD_OMOD, "action '%s'(%d) is transactional but "
+ "uses invalid paramter passing mode -- disabling "
+ "action. This is probably caused by a pre-v7 "
+ "output module that needs upgrade.",
+ pThis->pszName, pThis->iActionNbr);
+ actionDisable(pThis);
+ ABORT_FINALIZE(RS_RET_INVLD_OMOD);
+ }
+
+
/* support statistics gathering */
CHKiRet(statsobj.Construct(&pThis->statsobj));
CHKiRet(statsobj.SetName(pThis->statsobj, pThis->pszName));
@@ -425,13 +441,13 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst)
DBGPRINTF("info: firehose mode disabled for action because "
"iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n",
pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval);
- pThis->submitToActQ = doSubmitToActionQComplexBatch;
- } else if(pThis->bWriteAllMarkMsgs == RSFALSE) {
- /* nearly full-speed submission mode, default case */
- pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch;
+ pThis->submitToActQ = doSubmitToActionQComplex;
+ } else if(pThis->bWriteAllMarkMsgs) {
+ /* full firehose submission mode, default case*/
+ pThis->submitToActQ = doSubmitToActionQ;
} else {
- /* full firehose submission mode */
- pThis->submitToActQ = doSubmitToActionQBatch;
+ /* nearly full-speed submission mode */
+ pThis->submitToActQ = doSubmitToActionQNotAllMark;
}
/* create queue */
@@ -441,7 +457,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst)
* spec. -- rgerhards, 2008-01-30
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize,
- (rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
+ processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszAName);
qqueueSetpAction(pThis->pQueue, pThis);
@@ -518,9 +534,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal)
* returned string must not be modified.
* rgerhards, 2009-05-07
*/
-static uchar *getActStateName(action_t *pThis)
+static uchar *getActStateName(action_t * const pThis, wti_t * const pWti)
{
- switch(pThis->eState) {
+ switch(getActionState(pWti, pThis)) {
case ACT_STATE_RDY:
return (uchar*) "rdy";
case ACT_STATE_ITX:
@@ -529,8 +545,6 @@ static uchar *getActStateName(action_t *pThis)
return (uchar*) "rtry";
case ACT_STATE_SUSP:
return (uchar*) "susp";
- case ACT_STATE_DIED:
- return (uchar*) "died";
case ACT_STATE_COMM:
return (uchar*) "comm";
default:
@@ -542,12 +556,12 @@ static uchar *getActStateName(action_t *pThis)
/* returns a suitable return code based on action state
* rgerhards, 2009-05-07
*/
-static rsRetVal getReturnCode(action_t *pThis)
+static rsRetVal getReturnCode(action_t * const pThis, wti_t * const pWti)
{
DEFiRet;
ASSERT(pThis != NULL);
- switch(pThis->eState) {
+ switch(getActionState(pWti, pThis)) {
case ACT_STATE_RDY:
iRet = RS_RET_OK;
break;
@@ -563,12 +577,11 @@ static rsRetVal getReturnCode(action_t *pThis)
iRet = RS_RET_SUSPENDED;
break;
case ACT_STATE_SUSP:
- case ACT_STATE_DIED:
iRet = RS_RET_ACTION_FAILED;
break;
default:
- DBGPRINTF("Invalid action engine state %d, program error\n",
- (int) pThis->eState);
+ DBGPRINTF("Invalid action engine state %u, program error\n",
+ getActionState(pWti, pThis));
iRet = RS_RET_ERR;
break;
}
@@ -580,43 +593,33 @@ static rsRetVal getReturnCode(action_t *pThis)
/* set the action to a new state
* rgerhards, 2007-08-02
*/
-static inline void actionSetState(action_t *pThis, action_state_t newState)
+static inline void
+actionSetState(action_t * const pThis, wti_t * const pWti, uint8_t newState)
{
- pThis->eState = newState;
- DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis));
+ setActionState(pWti, pThis, newState);
+ DBGPRINTF("Action %d transitioned to state: %s\n",
+ pThis->iActionNbr, getActStateName(pThis, pWti));
}
/* Handles the transient commit state. So far, this is
* mostly a dummy...
* rgerhards, 2007-08-02
*/
-static void actionCommitted(action_t *pThis)
+static void actionCommitted(action_t * const pThis, wti_t * const pWti)
{
- actionSetState(pThis, ACT_STATE_RDY);
+ actionSetState(pThis, pWti, ACT_STATE_RDY);
}
/* set action to "rtry" state.
* rgerhards, 2007-08-02
*/
-static void actionRetry(action_t *pThis)
+static void actionRetry(action_t * const pThis, wti_t * const pWti)
{
- actionSetState(pThis, ACT_STATE_RTRY);
- pThis->iResumeOKinRow++;
+ actionSetState(pThis, pWti, ACT_STATE_RTRY);
+ incActionResumeInRow(pWti, pThis);
}
-
-/* Disable action, this means it will never again be usable
- * until rsyslog is reloaded. Use only as a last resort, but
- * depends on output module.
- * rgerhards, 2007-08-02
- */
-static void actionDisable(action_t *pThis)
-{
- actionSetState(pThis, ACT_STATE_DIED);
-}
-
-
/* Suspend action, this involves changing the action state as well
* as setting the next retry time.
* if we have more than 10 retries, we prolong the
@@ -626,7 +629,7 @@ static void actionDisable(action_t *pThis)
* rgerhards, 2007-08-02
*/
static inline void
-actionSuspend(action_t * const pThis)
+actionSuspend(action_t * const pThis, wti_t * const pWti)
{
time_t ttNow;
int suspendDuration;
@@ -649,24 +652,26 @@ actionSuspend(action_t * const pThis)
* since caching, and this would break logic (and it actually did so!)
*/
datetime.GetTime(&ttNow);
- suspendDuration = pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ suspendDuration = pThis->iResumeInterval * (getActionNbrResRtry(pWti, pThis) / 10 + 1);
pThis->ttResumeRtry = ttNow + suspendDuration;
- actionSetState(pThis, ACT_STATE_SUSP);
+ actionSetState(pThis, pWti, ACT_STATE_SUSP);
pThis->ctrSuspendDuration += suspendDuration;
- if(pThis->iNbrResRtry == 0) {
+ if(getActionNbrResRtry(pWti, pThis) == 0) {
STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend);
}
+
if( pThis->bReportSuspensionCont
- || (pThis->bReportSuspension && pThis->iNbrResRtry == 0) ) {
+ || (pThis->bReportSuspension && getActionNbrResRtry(pWti, pThis) == 0) ) {
ctime_r(&pThis->ttResumeRtry, timebuf);
timebuf[strlen(timebuf)-1] = '\0'; /* strip LF */
errmsg.LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
"action '%s' suspended, next retry is %s",
pThis->pszName, timebuf);
}
- DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d\n",
+ DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d, "
+ "duration %d\n",
pThis->pszName, (long long) pThis->ttResumeRtry, (long long) ttNow,
- pThis->iNbrResRtry);
+ getActionNbrResRtry(pWti, pThis), suspendDuration);
}
@@ -678,15 +683,15 @@ actionSuspend(action_t * const pThis)
* entry point. This is invalid, but has harsh consequences: it will cause the rsyslog
* engine to go into a tight loop. That obviously is not acceptable. As such, we track the
* count of iterations that a tryResume returning RS_RET_OK is immediately followed by
- * an unsuccessful call to doAction(). If that happens more than 1,000 times, we assume
+ * an unsuccessful call to doAction(). If that happens more than 10 times, we assume
* the return acutally is a RS_RET_SUSPENDED. In order to go through the various
- * resumption stages, we do this for every 1000 requests. This magic number 1000 may
+ * resumption stages, we do this for every 10 requests. This magic number 10 may
* not be the most appropriate, but it should be thought of a "if nothing else helps"
* kind of facility: in the first place, the module should return a proper indication
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
-static inline rsRetVal
-actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
+static rsRetVal
+actionDoRetry(action_t * const pThis, wti_t * const pWti)
{
int iRetries;
int iSleepPeriod;
@@ -696,35 +701,40 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
ASSERT(pThis != NULL);
iRetries = 0;
- while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
+ while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d\n", pThis->pszName, iRetries);
- iRet = pThis->pMod->tryResume(pThis->pModData);
+ iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet);
- if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) {
+ if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) {
bTreatOKasSusp = 1;
- pThis->iResumeOKinRow = 0;
+ setActionResumeInRow(pWti, pThis, 0);
} else {
bTreatOKasSusp = 0;
}
if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) {
DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n",
pThis->pszName, iRet);
- pThis->bJustResumed = 1;
- actionSetState(pThis, ACT_STATE_RDY);
+ if(pThis->bReportSuspension) {
+ errmsg.LogMsg(0, RS_RET_OK, LOG_INFO, "action '%s' "
+ "resumed (module '%s')",
+ pThis->pszName, pThis->pMod->pszName);
+ }
+ setActionJustResumed(pWti, pThis, 1);
+ actionSetState(pThis, pWti, ACT_STATE_RDY);
} else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) {
/* max retries reached? */
DBGPRINTF("actionDoRetry: %s check for max retries, iResumeRetryCount "
"%d, iRetries %d\n",
pThis->pszName, pThis->iResumeRetryCount, iRetries);
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
- actionSuspend(pThis);
- if(pThis->iNbrResRtry < 20)
- ++pThis->iNbrResRtry;
+ actionSuspend(pThis, pWti);
+ if(getActionNbrResRtry(pWti, pThis) < 20)
+ incActionNbrResRtry(pWti, pThis);
} else {
++iRetries;
iSleepPeriod = pThis->iResumeInterval;
srSleep(iSleepPeriod, 0);
- if(*pbShutdownImmediate) {
+ if(*pWti->pbShutdownImmediate) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
}
@@ -733,8 +743,8 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
}
}
- if(pThis->eState == ACT_STATE_RDY) {
- pThis->iNbrResRtry = 0;
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
+ setActionNbrResRtry(pWti, pThis, 0);
}
finalize_it:
@@ -742,17 +752,32 @@ finalize_it:
}
+static rsRetVal
+actionCheckAndCreateWrkrInstance(action_t * const pThis, wti_t * const pWti)
+{
+ DEFiRet;
+ if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) {
+ DBGPRINTF("wti %p: we need to create a new action worker instance for "
+ "action %d\n", pWti, pThis->iActionNbr);
+ CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
+ pThis->pModData));
+ pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
+ setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */
+ }
+finalize_it:
+ RETiRet;
+}
+
/* try to resume an action -- rgerhards, 2007-08-02
* changed to new action state engine -- rgerhards, 2009-05-07
*/
-static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
+static rsRetVal
+actionTryResume(action_t * const pThis, wti_t * const pWti)
{
DEFiRet;
time_t ttNow = NO_TIME_PROVIDED;
- ASSERT(pThis != NULL);
-
- if(pThis->eState == ACT_STATE_SUSP) {
+ if(getActionState(pWti, pThis) == ACT_STATE_SUSP) {
/* if we are suspended, we need to check if the timeout expired.
* for this handling, we must always obtain a fresh timestamp. We used
* to use the action timestamp, but in this case we will never reach a
@@ -762,19 +787,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
*/
datetime.GetTime(&ttNow); /* cache "now" */
if(ttNow >= pThis->ttResumeRtry) {
- actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */
+ actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */
}
}
- if(pThis->eState == ACT_STATE_RTRY) {
+ if(getActionState(pWti, pThis) == ACT_STATE_RTRY) {
if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
datetime.GetTime(&ttNow);
- CHKiRet(actionDoRetry(pThis, pbShutdownImmediate));
+ CHKiRet(actionDoRetry(pThis, pWti));
}
- if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
+ if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) {
DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n",
- pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
}
finalize_it:
@@ -786,24 +811,25 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate)
+static inline rsRetVal
+actionPrepare(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti)
{
DEFiRet;
- assert(pThis != NULL);
- CHKiRet(actionTryResume(pThis, pbShutdownImmediate));
+ CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
+ CHKiRet(actionTryResume(pThis, pWti));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
*/
- if(pThis->eState == ACT_STATE_RDY) {
- iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData);
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
+ iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionSetState(pThis, ACT_STATE_ITX);
+ actionSetState(pThis, pWti, ACT_STATE_ITX);
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
actionDisable(pThis);
@@ -817,10 +843,11 @@ finalize_it:
}
+#if 0 // TODO: remove?
/* debug-print the contents of an action object
* rgerhards, 2007-08-02
*/
-rsRetVal actionDbgPrint(action_t *pThis)
+static rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
char *sz;
@@ -830,11 +857,12 @@ rsRetVal actionDbgPrint(action_t *pThis)
dbgprintf("\n");
dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
- if(pThis->eState == ACT_STATE_SUSP) {
+#if 0 // do we need this ???
+ if(getActionState(pWti, pThis) == ACT_STATE_SUSP) {
dbgprintf("\tresume next retry: %u, number retries: %d",
(unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
}
- dbgprintf("\tState: %s\n", getActStateName(pThis));
+#endif
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
if(pThis->submitToActQ == doSubmitToActionQComplexBatch) {
sz = "slow, but feature-rich";
@@ -850,45 +878,57 @@ rsRetVal actionDbgPrint(action_t *pThis)
RETiRet;
}
+#endif
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
static rsRetVal
-prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow)
+prepareDoActionParams(action_t * __restrict__ const pAction,
+ wti_t * __restrict__ const pWti,
+ msg_t *__restrict__ const pMsg,
+ struct syslogTime *ttNow)
{
int i;
- msg_t *pMsg;
struct json_object *json;
+ actWrkrIParams_t *iparams;
+ actWrkrInfo_t *__restrict__ pWrkrInfo;
DEFiRet;
- ASSERT(pAction != NULL);
- ASSERT(pElem != NULL);
-
- pMsg = pElem->pMsg;
- /* here we must loop to process all requested strings */
- for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- switch(pAction->eParamPassing) {
+ pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+ if(pAction->isTransactional) {
+ CHKiRet(wtiNewIParam(pWti, pAction, &iparams));
+ for(i = 0 ; i < pAction->iNumTpls ; ++i) {
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg,
+ &actParam(iparams, pAction->iNumTpls, 0, i),
+ ttNow));
+ }
+ } else {
+ for(i = 0 ; i < pAction->iNumTpls ; ++i) {
+ switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]),
- &pElem->staticLenStrings[i], ttNow));
- pElem->staticActParams[i] = pElem->staticActStrings[i];
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg,
+ &(pWrkrInfo->p.nontx.actParams[i]),
+ ttNow));
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg,
+ (uchar***) &(pWrkrInfo->p.nontx.actParams[i].param), ttNow));
break;
case ACT_MSG_PASSING:
- pElem->staticActParams[i] = (void*) pMsg;
+ pWrkrInfo->p.nontx.actParams[i].param = (void*) pMsg;
break;
case ACT_JSON_PASSING:
CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow));
- pElem->staticActParams[i] = (void*) json;
+ pWrkrInfo->p.nontx.actParams[i].param = (void*) json;
break;
- default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n",
+ default:dbgprintf("software bug/error: unknown pAction->eParamPassing "
+ "%d in prepareDoActionParams\n",
(int) pAction->eParamPassing);
assert(0); /* software bug if this happens! */
break;
+ }
}
}
@@ -897,62 +937,49 @@ finalize_it:
}
-/* free a batches ressources, but not string buffers (because they will
- * most probably be reused). String buffers are only deleted upon final
- * destruction of the batch.
- * This function here must be called only when the batch is actually no
- * longer used, also not for retrying actions or such. It invalidates
- * buffers.
- * rgerhards, 2010-12-17
- */
-static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
+static void
+releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ const pWti)
{
int jArr;
- int i, j;
- batch_obj_t *pElem;
+ int j;
+ actWrkrInfo_t *__restrict__ pWrkrInfo;
uchar ***ppMsgs;
- DEFiRet;
-
- ASSERT(pAction != NULL);
if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING)
goto done; /* we need to do nothing with these types! */
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pElem = &(pBatch->pElem[i]);
- if(batchIsValidElem(pBatch, i)) {
- switch(pAction->eParamPassing) {
- case ACT_ARRAY_PASSING:
- ppMsgs = (uchar***) pElem->staticActParams;
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
- if(((uchar**)ppMsgs)[j] != NULL) {
- jArr = 0;
- while(ppMsgs[j][jArr] != NULL) {
- d_free(ppMsgs[j][jArr]);
- ppMsgs[j][jArr] = NULL;
- ++jArr;
- }
- d_free(((uchar**)ppMsgs)[j]);
- ((uchar**)ppMsgs)[j] = NULL;
- }
+ pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+ switch(pAction->eParamPassing) {
+ case ACT_ARRAY_PASSING:
+ ppMsgs = (uchar***) pWrkrInfo->p.nontx.actParams[0].param;
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ if(((uchar**)ppMsgs)[j] != NULL) {
+ jArr = 0;
+ while(ppMsgs[j][jArr] != NULL) {
+ free(ppMsgs[j][jArr]);
+ ppMsgs[j][jArr] = NULL;
+ ++jArr;
}
- break;
- case ACT_JSON_PASSING:
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
- json_object_put((struct json_object*)
- pElem->staticActParams[j]);
- pElem->staticActParams[j] = NULL;
- }
- break;
- case ACT_STRING_PASSING:
- case ACT_MSG_PASSING:
- /* can never happen, just to keep compiler happy! */
- break;
+ free(((uchar**)ppMsgs)[j]);
+ ((uchar**)ppMsgs)[j] = NULL;
}
}
+ break;
+ case ACT_JSON_PASSING:
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ json_object_put((struct json_object*)
+ pWrkrInfo->p.nontx.actParams[j].param);
+ pWrkrInfo->p.nontx.actParams[j].param = NULL;
+ }
+ break;
+ case ACT_STRING_PASSING:
+ /* strings are destructed when the worker terminates */
+ case ACT_MSG_PASSING:
+ /* can never happen, just to keep compiler happy! */
+ break;
}
-done: RETiRet;
+done: return;
}
/* This is used in resume processing. We only finally know that a resume
@@ -960,52 +987,43 @@ done: RETiRet;
* we need to do some cleanup and status tracking in that case.
*/
static void
-actionSetActionWorked(action_t *__restrict__ const pThis)
+actionSetActionWorked(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti)
{
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ setActionResumeInRow(pWti, pThis, 0);
- if(pThis->bJustResumed) {
+ if(getActionJustResumed(pWti, pThis)) {
/* OK, we *really* could resume, so tell user! */
if(pThis->bReportSuspension) {
errmsg.LogMsg(0, RS_RET_RESUMED, LOG_INFO, "action '%s' "
"resumed (module '%s')",
pThis->pszName, pThis->pMod->pszName);
}
- pThis->bJustResumed = 0;
+ setActionJustResumed(pWti, pThis, 0);
}
}
-/* call the DoAction output plugin entry point
- * rgerhards, 2008-01-28
- */
-rsRetVal
-actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
+static rsRetVal
+handleActionExecResult(action_t *__restrict__ const pThis,
+ wti_t *__restrict__ const pWti,
+ const rsRetVal ret)
{
DEFiRet;
-
- ASSERT(pThis != NULL);
- ISOBJ_TYPE_assert(pMsg, msg);
-
- DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
-
- pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData);
- switch(iRet) {
+ switch(ret) {
case RS_RET_OK:
- actionCommitted(pThis);
- actionSetActionWorked(pThis); /* we had a successful call! */
+ actionCommitted(pThis, pWti);
+ actionSetActionWorked(pThis, pWti); /* we had a successful call! */
break;
case RS_RET_DEFER_COMMIT:
- actionSetActionWorked(pThis); /* we had a successful call! */
+ actionSetActionWorked(pThis, pWti); /* we had a successful call! */
/* we are done, action state remains the same */
break;
case RS_RET_PREVIOUS_COMMITTED:
/* action state remains the same, but we had a commit. */
pThis->bHadAutoCommit = 1;
- actionSetActionWorked(pThis); /* we had a successful call! */
+ actionSetActionWorked(pThis, pWti); /* we had a successful call! */
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
actionDisable(pThis);
@@ -1013,70 +1031,134 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
default:/* permanent failure of this message - no sense in retrying. This is
* not yet handled (but easy TODO)
*/
+ iRet = ret;
FINALIZE;
}
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
}
+/* call the DoAction output plugin entry point
+ * rgerhards, 2008-01-28
+ */
+static rsRetVal
+actionCallDoAction(action_t *__restrict__ const pThis,
+ actWrkrIParams_t *__restrict__ const iparams,
+ wti_t *__restrict__ const pWti)
+{
+ uchar *param[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ int i;
+ DEFiRet;
+
+ DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
+ getActStateName(pThis, pWti), pThis->iActionNbr);
+
+ pThis->bHadAutoCommit = 0;
+ /* for this interface, we need to emulate the old style way
+ * of parameter passing.
+ */
+ for(i = 0 ; i < pThis->iNumTpls ; ++i) {
+ param[i] = actParam(iparams, pThis->iNumTpls, 0, i).param;
+ }
+
+ iRet = pThis->pMod->mod.om.doAction(param,
+ pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
+ iRet = handleActionExecResult(pThis, pWti, iRet);
+ RETiRet;
+}
+
+
+/* call the commitTransaction output plugin entry point */
+static rsRetVal
+actionCallCommitTransaction(action_t * const pThis,
+ const actWrkrInfo_t *const wrkrInfo,
+ wti_t *const pWti)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ DBGPRINTF("entering actionCallCommitTransaction(), state: %s, actionNbr %d, "
+ "nMsgs %u\n",
+ getActStateName(pThis, pWti), pThis->iActionNbr,
+ wrkrInfo->p.tx.currIParam);
+
+ iRet = pThis->pMod->mod.om.commitTransaction(
+ pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData,
+ wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam);
+ iRet = handleActionExecResult(pThis, pWti, iRet);
+ RETiRet;
+}
+
/* process a message
* this readies the action and then calls doAction()
* rgerhards, 2008-01-28
*/
-static inline rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate)
+rsRetVal
+actionProcessMessage(action_t * const pThis, void *actParams, wti_t * const pWti)
{
DEFiRet;
- ASSERT(pThis != NULL);
- ISOBJ_TYPE_assert(pMsg, msg);
-
- CHKiRet(actionPrepare(pThis, pbShutdownImmediate));
+ CHKiRet(actionPrepare(pThis, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
- pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
- if(pThis->eState == ACT_STATE_ITX)
- CHKiRet(actionCallDoAction(pThis, pMsg, actParams));
+ pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate);
+ if(getActionState(pWti, pThis) == ACT_STATE_ITX)
+ CHKiRet(actionCallDoAction(pThis, actParams, pWti));
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
}
-/* finish processing a batch. Most importantly, that means we commit if we
- * need to do so.
- * rgerhards, 2008-01-28
- */
+/* the following functions simulates a potential future new omo callback */
static rsRetVal
-finishBatch(action_t *pThis, batch_t *pBatch)
+doTransaction(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti)
{
+ actWrkrInfo_t *wrkrInfo;
int i;
DEFiRet;
- ASSERT(pThis != NULL);
-
- if(pThis->eState == ACT_STATE_RDY) {
- /* we just need to flag the batch as commited */
- FINALIZE; /* nothing to do */
+ wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
+ if(pThis->pMod->mod.om.commitTransaction != NULL) {
+ DBGPRINTF("doTransaction: have commitTransaction IF, using that, pWrkrInfo %p\n", wrkrInfo);
+ CHKiRet(actionCallCommitTransaction(pThis, wrkrInfo, pWti));
+ } else { /* note: this branch is for compatibility with old TX modules */
+ DBGPRINTF("doTransaction: action %d, currIParam %d\n",
+ pThis->iActionNbr, wrkrInfo->p.tx.currIParam);
+ for(i = 0 ; i < wrkrInfo->p.tx.currIParam ; ++i) {
+ /* Note: we provide the message's base iparam - actionProcessMessage()
+ * uses this as *base* address.
+ */
+ iRet = actionProcessMessage(pThis,
+ &actParam(wrkrInfo->p.tx.iparams, pThis->iNumTpls, i, 0), pWti);
+ }
}
+finalize_it:
+ RETiRet;
+}
- CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate));
- if(pThis->eState == ACT_STATE_ITX) {
- iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData);
+
+/* Commit try committing (do not handle retry processing and such) */
+static rsRetVal
+actionTryCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti)
+{
+ DEFiRet;
+
+ doTransaction(pThis, pWti);
+
+ CHKiRet(actionPrepare(pThis, pWti));
+ if(getActionState(pWti, pThis) == ACT_STATE_ITX) {
+ iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionCommitted(pThis);
- /* flag messages as committed */
- for(i = 0 ; i < pBatch->nElem ; ++i) {
- batchSetElemState(pBatch, i, BATCH_STATE_COMM);
- pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */
- }
+ actionCommitted(pThis, pWti);
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
actionDisable(pThis);
@@ -1084,12 +1166,12 @@ finishBatch(action_t *pThis, batch_t *pBatch)
case RS_RET_DEFER_COMMIT:
DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
"- ignored\n");
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
break;
case RS_RET_PREVIOUS_COMMITTED:
DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
"- ignored\n");
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
break;
default:/* permanent failure of this message - no sense in retrying. This is
* not yet handled (but easy TODO)
@@ -1097,325 +1179,172 @@ finishBatch(action_t *pThis, batch_t *pBatch)
FINALIZE;
}
}
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
+ pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam = 0; /* reset to beginning */
RETiRet;
}
-
-/* try to submit a partial batch of elements.
- * rgerhards, 2009-05-12
- */
-static inline rsRetVal
-tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
-{
- int i;
- int iElemProcessed;
- int iCommittedUpTo;
- msg_t *pMsg;
- rsRetVal localRet;
- DEFiRet;
-
- assert(pBatch != NULL);
- assert(pnElem != NULL);
-
- i = pBatch->iDoneUpTo; /* all messages below that index are processed */
- iElemProcessed = 0;
- iCommittedUpTo = i;
- DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
- while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
- if(*(pBatch->pbShutdownImmediate))
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- /* NOTE: do NOT extend the filter below! Anything else must be done on the
- * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
- */
- if(batchIsValidElem(pBatch, i)) {
- pMsg = pBatch->pElem[i].pMsg;
- localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
- pBatch->pbShutdownImmediate);
- DBGPRINTF("action %p call returned %d\n", pAction, localRet);
- /* Note: we directly modify the batch object state, because we know that
- * wo do not overwrite BATCH_STATE_DISC indicators!
- */
- if(localRet == RS_RET_OK) {
- /* mark messages as committed */
- while(iCommittedUpTo <= i) {
- pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
- ++iCommittedUpTo;
- //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
- }
- } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
- /* mark messages as committed */
- while(iCommittedUpTo < i) {
- pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
- ++iCommittedUpTo;
- //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
- }
- pBatch->eltState[i] = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_DEFER_COMMIT) {
- pBatch->eltState[i] = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_DISCARDMSG) {
- pBatch->eltState[i] = BATCH_STATE_DISC;
- } else {
- dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n",
- localRet, *pnElem, iCommittedUpTo);
- iRet = localRet;
- FINALIZE;
- }
- }
- ++i;
- ++iElemProcessed;
- }
-
-finalize_it:
- if(pBatch->iDoneUpTo != iCommittedUpTo) {
- pBatch->iDoneUpTo = iCommittedUpTo;
- }
- RETiRet;
-}
-
-/* submit a batch for actual action processing.
- * The first nElem elements are processed. This function calls itself
- * recursively if it needs to handle errors.
- * Note: we don't need the number of the first message to be processed as a parameter,
- * because this is kept track of inside the batch itself (iDoneUpTo).
- * rgerhards, 2009-05-12
+/* Note: we currently need to return an iRet, as this is used in
+ * direct mode. TODO: However, it may be worth further investigating this,
+ * as it looks like there is no ultimate consumer of this code.
+ * rgerhards, 2013-11-06
*/
static rsRetVal
-submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
+actionCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti)
{
- int i;
- int bDone;
- rsRetVal localRet;
- int wasDoneTo;
+ sbool bDone;
DEFiRet;
- assert(pBatch != NULL);
+ if(!pThis->isTransactional ||
+ pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam == 0 ||
+ getActionState(pWti, pThis) == ACT_STATE_SUSP
+ ) {
+ FINALIZE;
+ }
- DBGPRINTF("submitBatch: enter, nElem %d\n", nElem);
- wasDoneTo = pBatch->iDoneUpTo;
+ /* even more TODO:
+ This is the place where retry processing needs to go in. If the action
+ permanently fails, we should - as a new feature - add the capability to
+ write an error file. This is already done be omelasticsearch, and IMHO
+ pretty useful.
+ For the time being, I do NOT implement all of this (not even retry!)
+ as I want to get the rest of the engine to SISD (non-SIMD ;)) so that
+ I know any potential suprises and complications that arise out of this.
+ When this is done, I can come back here and complete this work. Obviously,
+ many features do not work in the mean time (but it is not planned to release
+ any of these partial implementations).
+ rgerhards, 2013-11-04
+ */
bDone = 0;
do {
- localRet = tryDoAction(pAction, pBatch, &nElem);
- if(localRet == RS_RET_FORCE_TERM) {
+ iRet = actionTryCommit(pThis, pWti);
+ DBGPRINTF("actionCommit, in retry loop, iRet %d\n", iRet);
+ if(iRet == RS_RET_FORCE_TERM) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- }
- if( localRet == RS_RET_OK
- || localRet == RS_RET_PREVIOUS_COMMITTED
- || localRet == RS_RET_DEFER_COMMIT) {
- /* try commit transaction, once done, we can simply do so as if
- * that return state was returned from tryDoAction().
- */
- localRet = finishBatch(pAction, pBatch);
- }
-
- if( localRet == RS_RET_OK
- || localRet == RS_RET_PREVIOUS_COMMITTED
- || localRet == RS_RET_DEFER_COMMIT) {
+ } else if(iRet == RS_RET_OK ||
+ iRet == RS_RET_SUSPENDED ||
+ iRet == RS_RET_ACTION_FAILED) {
bDone = 1;
- } else if(localRet == RS_RET_SUSPENDED) {
- DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n");
- /* do nothing, this will retry the full batch */
- } else if(localRet == RS_RET_ACTION_FAILED) {
- /* in this case, everything not yet committed is BAD */
- for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && pBatch->eltState[i] != BATCH_STATE_COMM ) {
- pBatch->eltState[i] = BATCH_STATE_BAD;
- pBatch->pElem[i].bPrevWasSuspended = 1;
- STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
- }
- }
+ }
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY ||
+ getActionState(pWti, pThis) == ACT_STATE_SUSP) {
bDone = 1;
- } else {
- if(nElem == 1) {
- batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD);
- bDone = 1;
- } else {
- /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
- DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
- "for iRet %d\n", localRet);
- submitBatch(pAction, pBatch, nElem / 2);
- submitBatch(pAction, pBatch, nElem - (nElem / 2));
- bDone = 1;
- }
}
- } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */
-
- if(*(pBatch->pbShutdownImmediate))
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
-
+ } while(!bDone);
finalize_it:
RETiRet;
}
-
-/* copy "active" array of batch, as we need to modify it. The caller
- * must make sure the new array is freed and the orginal batch
- * pointer is restored (thus the caller must save it). If active
- * is currently NULL, this is properly handled.
- * Note: the batches active pointer is modified, so it must be
- * saved BEFORE calling this function!
- * rgerhards, 2012-09-12
- */
-static rsRetVal
-copyActive(batch_t *pBatch)
+/* Commit all active transactions in *DIRECT mode* */
+void
+actionCommitAllDirect(wti_t *__restrict__ const pWti)
{
- sbool *active;
- DEFiRet;
+ int i;
+ action_t *pAction;
- CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
- if(pBatch->active == NULL)
- memset(active, 1, batchNumMsgs(pBatch));
- else
- memcpy(active, pBatch->active, batchNumMsgs(pBatch));
- pBatch->active = active;
-finalize_it:
- RETiRet;
+ for(i = 0 ; i < iActionNbr ; ++i) {
+ pAction = pWti->actWrkrInfo[i].pAction;
+ if(pAction == NULL)
+ continue;
+ DBGPRINTF("actionCommitAll: action %d, state %u, nbr to commit %d "
+ "isTransactional %d\n",
+ i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo->p.tx.currIParam,
+ pAction->isTransactional);
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ actionCommit(pAction, pWti);
+ }
}
-/* The following function prepares a batch for processing, that it is
- * reinitializes batch states, generates strings and does everything else
- * that needs to be done in order to make the batch ready for submission to
- * the actual output module. Note that we look at the precomputed
- * filter OK condition and process only those messages, that actually matched
- * the filter.
- * rgerhards, 2010-06-14
+/* process a single message. This is both called if we run from the
+ * cosumer side of an action queue as well as directly from the main
+ * queue thread if the action queue is set to "direct".
*/
-static inline rsRetVal
-prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr)
+static rsRetVal
+processMsgMain(action_t *__restrict__ const pAction,
+ wti_t *__restrict__ const pWti,
+ msg_t *__restrict__ const pMsg,
+ struct syslogTime *ttNow)
{
- int i;
- batch_obj_t *pElem;
- struct syslogTime ttNow;
DEFiRet;
- /* indicate we have not yet read the date */
- ttNow.year = 0;
-
- pBatch->iDoneUpTo = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pElem = &(pBatch->pElem[i]);
- if(batchIsValidElem(pBatch, i)) {
- pBatch->eltState[i] = BATCH_STATE_RDY;
- if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) {
- /* make sure we have our copy of "active" array */
- if(!*bMustRestoreActivePtr) {
- *activeSave = pBatch->active;
- copyActive(pBatch);
- }
- pBatch->active[i] = RSFALSE;
- }
- }
+ if(pAction->bExecWhenPrevSusp && !pWti->execState.bPrevWasSuspended) {
+ DBGPRINTF("action %d: NOT executing, as previous action was "
+ "not suspended\n", pAction->iActionNbr);
+ FINALIZE;
}
- RETiRet;
-}
+ iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow);
-/* receive a batch and process it. This includes retry handling.
- * rgerhards, 2009-05-12
- */
-static inline rsRetVal
-processAction(action_t *pAction, batch_t *pBatch)
-{
- DEFiRet;
-
- assert(pBatch != NULL);
- CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem));
- iRet = finishBatch(pAction, pBatch);
+ if(pAction->isTransactional) {
+ pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction;
+ DBGPRINTF("action %d is transactional - executing in commit phase\n", pAction->iActionNbr);
+ actionPrepare(pAction, pWti);
+ iRet = getReturnCode(pAction, pWti);
+ FINALIZE;
+ }
+ iRet = actionProcessMessage(pAction,
+ pWti->actWrkrInfo[pAction->iActionNbr].p.nontx.actParams,
+ pWti);
+ releaseDoActionParams(pAction, pWti);
finalize_it:
+ if(iRet == RS_RET_OK) {
+ if(pWti->execState.bDoAutoCommit)
+ iRet = actionCommit(pAction, pWti);
+ }
+ pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED);
RETiRet;
}
-
-#pragma GCC diagnostic ignored "-Wempty-body"
-/* receive an array of to-process user pointers and submit them
- * for processing.
- * rgerhards, 2009-04-22
+/* This entry point is called by the ACTION queue (not main queue!)
*/
static rsRetVal
-processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
+processBatchMain(void *__restrict__ const pVoid,
+ batch_t *__restrict__ const pBatch,
+ wti_t *__restrict__ const pWti)
{
- int *pbShutdownImmdtSave;
- sbool *activeSave;
- int bMustRestoreActivePtr = 0;
- rsRetVal localRet;
+ action_t *__restrict__ const pAction = (action_t*__restrict__ const) pVoid;
+ int i;
+ struct syslogTime ttNow;
DEFiRet;
- assert(pBatch != NULL);
-
- if(pbShutdownImmediate != NULL) {
- pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
- pBatch->pbShutdownImmediate = pbShutdownImmediate;
- }
- CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr));
-
- /* We now must guard the output module against execution by multiple threads. The
- * plugin interface specifies that output modules must not be thread-safe (except
- * if they notify us they are - functionality not yet implemented...).
- * rgerhards, 2008-01-30
- */
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
-
- iRet = processAction(pAction, pBatch);
-
- pthread_cleanup_pop(1); /* unlock mutex */
-
- /* even if processAction failed, we need to release the batch (else we
- * have a memory leak). So we do this first, and then check if we need to
- * return an error code. If so, the code from processAction has priority.
- * rgerhards, 2010-12-17
- */
- localRet = releaseBatch(pAction, pBatch);
+ wtiResetExecState(pWti, pBatch);
+ /* indicate we have not yet read the date */
+ ttNow.year = 0;
- if(iRet == RS_RET_OK)
- iRet = localRet;
-
- if(bMustRestoreActivePtr) {
- free(pBatch->active);
- pBatch->active = activeSave;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) {
+ if(batchIsValidElem(pBatch, i)) {
+ iRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+ }
}
-finalize_it:
- if(pbShutdownImmediate != NULL)
- pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
+ iRet = actionCommit(pAction, pWti);
RETiRet;
}
-#pragma GCC diagnostic warning "-Wempty-body"
-/* call the HUP handler for a given action, if such a handler is defined. The
- * action mutex is locked, because the HUP handler most probably needs to modify
- * some internal state information.
- * rgerhards, 2008-10-22
+/* call the HUP handler for a given action, if such a handler is defined.
+ * Note that the action must be able to service HUP requests concurrently
+ * to any current doAction() processing.
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
-actionCallHUPHdlr(action_t *pAction)
+actionCallHUPHdlr(action_t * const pAction)
{
DEFiRet;
ASSERT(pAction != NULL);
DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP);
- if(pAction->pMod->doHUP == NULL) {
- FINALIZE; /* no HUP handler, so we are done ;) */
+ if(pAction->pMod->doHUP != NULL) {
+ CHKiRet(pAction->pMod->doHUP(pAction->pModData));
}
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- CHKiRet(pAction->pMod->doHUP(pAction->pModData));
- pthread_cleanup_pop(1); /* unlock mutex */
-
finalize_it:
RETiRet;
}
-#pragma GCC diagnostic warning "-Wempty-body"
/* set the action message queue mode
@@ -1450,27 +1379,28 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
- * and thus speed. This is also utilized to submit messages in complex case once
+ * and thus speed. This is also utilized to submit messages in more complex cases once
* the complex logic has been applied ;)
* rgerhards, 2010-06-08
*/
-static inline rsRetVal
-doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+static rsRetVal
+doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, msg_t *pMsg)
{
+ struct syslogTime ttNow; // TODO: think if we can buffer this in pWti
DEFiRet;
- if(pAction->eState == ACT_STATE_DIED) {
- DBGPRINTF("action %p died, do NOT execute\n", pAction);
- FINALIZE;
- }
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg));
- else
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
+ ttNow.year = 0;
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow);
+ } else {/* in this case, we do single submits to the queue.
+ * TODO: optimize this, we may do at least a multi-submit!
+ */
iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
+ }
-finalize_it:
RETiRet;
}
@@ -1483,7 +1413,7 @@ finalize_it:
* be filtered out before calling us (what is done currently!).
*/
rsRetVal
-actionWriteToAction(action_t *pAction, msg_t *pMsg)
+actionWriteToAction(action_t * const pAction, msg_t *pMsg, wti_t * const pWti)
{
DEFiRet;
@@ -1538,44 +1468,46 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pMsg);
+ iRet = doSubmitToActionQ(pAction, pWti, pMsg);
finalize_it:
RETiRet;
}
-/* helper to actonCallAction, mostly needed because of this damn
- * pthread_cleanup_push() POSIX macro...
+/* Call configured action, most complex case with all features supported (and thus slow).
+ * rgerhards, 2010-06-08
*/
-static inline rsRetVal
-doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
+#pragma GCC diagnostic ignored "-Wempty-body"
+static rsRetVal
+doSubmitToActionQComplex(action_t * const pAction, wti_t * const pWti, msg_t *pMsg)
{
- msg_t *pMsg;
DEFiRet;
- pMsg = pBatch->pElem[idxBtch].pMsg;
+ d_pthread_mutex_lock(&pAction->mutAction);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
+ DBGPRINTF("Called action %p (complex case), logging to %s\n",
+ pAction, module.GetStateName(pAction->pMod));
+
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
+ // TODO: can we optimize the "now" handling again (was batch, I guess...)?
/* don't output marks to recently written outputs */
- if(pAction->bWriteAllMarkMsgs == RSFALSE
+ if(pAction->bWriteAllMarkMsgs == 0
&& (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
ABORT_FINALIZE(RS_RET_OK);
}
/* call the output driver */
- iRet = actionWriteToAction(pAction, pMsg);
+ iRet = actionWriteToAction(pAction, pMsg, pWti);
finalize_it:
- /* we need to update the batch to handle failover processing correctly */
- if(iRet == RS_RET_OK) {
- pBatch->pElem[idxBtch].bPrevWasSuspended = 0;
- } else if(iRet == RS_RET_ACTION_FAILED) {
- pBatch->pElem[idxBtch].bPrevWasSuspended = 1;
- }
+ d_pthread_mutex_unlock(&pAction->mutAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
RETiRet;
}
+#pragma GCC diagnostic warning "-Wempty-body"
/* helper to activateActions, it activates a specific action.
@@ -1583,7 +1515,7 @@ finalize_it:
DEFFUNC_llExecFunc(doActivateActions)
{
rsRetVal localRet;
- action_t *pThis = (action_t*) pData;
+ action_t * const pThis = (action_t*) pData;
BEGINfunc
localRet = qqueueStart(pThis->pQueue);
if(localRet != RS_RET_OK) {
@@ -1624,201 +1556,48 @@ activateActions(void)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
+doSubmitToActionQNotAllMark(action_t * const pAction, wti_t * const pWti, msg_t * const pMsg)
{
- time_t now = 0;
+ int doProcess = 1;
time_t lastAct;
- int i;
- sbool *activeSave;
- DEFiRet;
-
- activeSave = pBatch->active;
- copyActive(pBatch);
-
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i])
- continue;
- if(now == 0) {
- now = datetime.GetTime(NULL); /* good time call - the only one done */
- }
- /* CAS loop, we write back a bit early, but that's OK... */
- /* we use reception time, not dequeue time - this is considered more appropriate and
- * also faster ;) -- rgerhards, 2008-09-17 */
- do {
- lastAct = pAction->f_time;
- if(pBatch->pElem[i].pMsg->msgFlags & MARK) {
- if((now - lastAct) < MarkInterval / 2) {
- pBatch->active[i] = 0;
- DBGPRINTF("batch item %d: action was recently called, ignoring "
- "mark message\n", i);
- break; /* do not update timestamp for non-written mark messages */
- }
- }
- } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
- pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0);
- if(pBatch->active[i]) {
- DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n",
- i, module.GetStateName(pAction->pMod));
- }
- }
-
- iRet = doSubmitToActionQBatch(pAction, pBatch);
-
- free(pBatch->active);
- pBatch->active = activeSave;
-
- RETiRet;
-}
-
-static inline void
-countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
-{
- int i;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( batchIsValidElem(pBatch, i)) {
- STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- }
- }
-}
-
-
-/* enqueue a batch in direct mode. We have put this into its own function just to avoid
- * cluttering the actual submit function.
- * rgerhards, 2011-06-16
- */
-static inline rsRetVal
-doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
-{
- sbool bNeedSubmit;
- sbool *activeSave;
- int i;
DEFiRet;
- activeSave = pBatch->active;
- copyActive(pBatch);
-
- /* note: for direct mode, we need to adjust the filter property. For non-direct
- * this is not necessary, because in that case we enqueue only what actually needs
- * to be processed.
+ /* TODO: think about the whole logic. If messages come in out of order, things
+ * tend to become a bit unreliable. On the other hand, this only happens if we have
+ * very high traffic, in which this use case here is not really affected (as the
+ * MarkInterval is pretty corase).
*/
- if(pAction->bExecWhenPrevSusp) {
- bNeedSubmit = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if(!pBatch->pElem[i].bPrevWasSuspended) {
- DBGPRINTF("action enq stage: change active to 0 due to "
- "failover case in elem %d\n", i);
- pBatch->active[i] = 0;
- }
- if(batchIsValidElem(pBatch, i)) {
- STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- bNeedSubmit = 1;
- }
- DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- }
- if(bNeedSubmit) {
- /* note: stats were already computed above */
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
- } else {
- DBGPRINTF("no need to submit batch, all invalid\n");
- }
- } else {
- if(GatherStats)
- countStatsBatchEnq(pAction, pBatch);
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
- }
-
- free(pBatch->active);
- pBatch->active = activeSave;
- RETiRet;
-}
-
-/* This submits the message to the action queue in case we do NOT need to handle repeat
- * message processing. That case permits us to gain lots of freedom during processing
- * and thus speed.
- * rgerhards, 2010-06-08
- */
-static rsRetVal
-doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
-{
- int i;
- DEFiRet;
-
- DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
-
- if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
- iRet = doQueueEnqObjDirectBatch(pAction, pBatch);
- } else {/* in this case, we do single submits to the queue.
- * TODO: optimize this, we may do at least a multi-submit!
- */
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( batchIsValidElem(pBatch, i)
- && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
- doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg);
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if(pMsg->msgFlags & MARK) {
+ if((pMsg->ttGenTime - lastAct) < MarkInterval / 2) {
+ doProcess = 0;
+ DBGPRINTF("action was recently called, ignoring mark message\n");
+ break; /* do not update timestamp for non-written mark messages */
}
}
- }
-
- RETiRet;
-}
+ } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
+ pMsg->ttGenTime, &pAction->mutCAS) == 0);
-
-
-/* Helper to submit a batch of actions to the engine. Note that we have rather
- * complicated processing here, so we need to do this one message after another.
- * rgerhards, 2010-06-23
- */
-static inline rsRetVal
-helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
-{
- int i;
- DEFiRet;
-
- DBGPRINTF("Called action %p (complex case), logging to %s\n",
- pAction, module.GetStateName(pAction->pMod));
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( batchIsValidElem(pBatch, i)
- && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
- doActionCallAction(pAction, pBatch, i);
- }
+ if(doProcess) {
+ DBGPRINTF("Called action(NotAllMark), processing via '%s'\n",
+ module.GetStateName(pAction->pMod));
+ iRet = doSubmitToActionQ(pAction, pWti, pMsg);
}
RETiRet;
}
-/* Call configured action, most complex case with all features supported (and thus slow).
- * rgerhards, 2010-06-08
- */
-#pragma GCC diagnostic ignored "-Wempty-body"
-static rsRetVal
-doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
-{
- DEFiRet;
-
- d_pthread_mutex_lock(&pAction->mutAction);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
- iRet = helperSubmitToActionQComplexBatch(pAction, pBatch);
- d_pthread_mutex_unlock(&pAction->mutAction);
- pthread_cleanup_pop(0); /* remove mutex cleanup handler */
-
- RETiRet;
-}
-#pragma GCC diagnostic warning "-Wempty-body"
-
/* apply all params from param block to action. This supports the v6 config system.
* Defaults must have been set appropriately during action construct!
* rgerhards, 2011-08-01
*/
static rsRetVal
-actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals)
+actionApplyCnfParam(action_t * const pAction, struct cnfparamvals * const pvals)
{
int i;
@@ -1866,7 +1645,7 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals)
rsRetVal
addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
omodStringRequest_t *pOMSR, struct cnfparamvals *actParams,
- struct nvlst *lst, int bSuspended)
+ struct nvlst * const lst)
{
DEFiRet;
int i;
@@ -1895,7 +1674,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg;
cs.iActExecEveryNthOccur = 0; /* auto-reset */
cs.iActExecEveryNthOccurTO = 0; /* auto-reset */
- cs.bActionWriteAllMarkMsgs = RSFALSE; /* auto-reset */
+ cs.bActionWriteAllMarkMsgs = 1; /* auto-reset */
cs.pszActionName = NULL; /* free again! */
} else {
actionApplyCnfParam(pAction, actParams);
@@ -1953,15 +1732,10 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
- /* check if the module is compatible with select features (currently no such features exist) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
-
- if(bSuspended)
- actionSuspend(pAction);
CHKiRet(actionConstructFinalize(pAction, lst));
- /* TODO: if we exit here, we have a memory leak... */
+ /* TODO: if we exit here, we have a (quite acceptable...) memory leak */
*ppAction = pAction; /* finally store the action pointer */
@@ -1997,7 +1771,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
static inline void
initConfigVariables(void)
{
- cs.bActionWriteAllMarkMsgs = RSFALSE;
+ cs.bActionWriteAllMarkMsgs = 1;
cs.glbliActionResumeRetryCount = 0;
cs.bActExecWhenPrevSusp = 0;
cs.iActExecOnceInterval = 0;
@@ -2036,17 +1810,11 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName);
ABORT_FINALIZE(RS_RET_MOD_UNKNOWN);
}
- iRet = pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR);
- // TODO: check if RS_RET_SUSPENDED is still valid in v6!
- if(iRet != RS_RET_OK && iRet != RS_RET_SUSPENDED) {
- FINALIZE; /* iRet is already set to error state */
- }
+ CHKiRet(pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR));
- if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst,
- (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
+ if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst)) == RS_RET_OK) {
/* check if the module is compatible with select features
* (currently no such features exist) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
loadConf->actions.nbrActions++; /* one more active action! */
}
*ppAction = pAction;
@@ -2057,8 +1825,6 @@ finalize_it:
RETiRet;
}
-/* TODO: we are not yet a real object, the ClassInit here just looks like it is..
- */
rsRetVal actionClassInit(void)
{
DEFiRet;