summaryrefslogtreecommitdiff
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c124
1 files changed, 67 insertions, 57 deletions
diff --git a/action.c b/action.c
index b676fe1..d652d0c 100644
--- a/action.c
+++ b/action.c
@@ -301,12 +301,13 @@ rsRetVal actionDestruct(action_t * const pThis)
if(pThis->statsobj != NULL)
statsobj.Destruct(&pThis->statsobj);
- if(pThis->pMod != NULL)
+ if(pThis->pModData != NULL)
pThis->pMod->freeInstance(pThis->pModData);
pthread_mutex_destroy(&pThis->mutAction);
d_free(pThis->pszName);
d_free(pThis->ppTpl);
+ d_free(pThis->peParamPassing);
finalize_it:
d_free(pThis);
@@ -387,14 +388,21 @@ actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst)
/* cache transactional attribute */
pThis->isTransactional = pThis->pMod->mod.om.supportsTX;
- if(pThis->isTransactional && pThis->eParamPassing != ACT_STRING_PASSING) {
- errmsg.LogError(0, RS_RET_INVLD_OMOD, "action '%s'(%d) is transactional but "
- "uses invalid paramter passing mode -- disabling "
- "action. This is probably caused by a pre-v7 "
- "output module that needs upgrade.",
- pThis->pszName, pThis->iActionNbr);
- actionDisable(pThis);
- ABORT_FINALIZE(RS_RET_INVLD_OMOD);
+ if(pThis->isTransactional) {
+ int i;
+ for(i = 0 ; i < pThis->iNumTpls ; ++i) {
+ if(pThis->peParamPassing[i] != ACT_STRING_PASSING) {
+ errmsg.LogError(0, RS_RET_INVLD_OMOD, "action '%s'(%d) is transactional but "
+ "parameter %d "
+ "uses invalid paramter passing mode -- disabling "
+ "action. This is probably caused by a pre-v7 "
+ "output module that needs upgrade.",
+ pThis->pszName, pThis->iActionNbr, i);
+ actionDisable(pThis);
+ ABORT_FINALIZE(RS_RET_INVLD_OMOD);
+
+ }
+ }
}
@@ -506,7 +514,7 @@ actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst)
DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue);
- if(pThis->eParamPassing == ACT_MSG_PASSING && pThis->pQueue->qType != QUEUETYPE_DIRECT) {
+ if(pThis->bUsesMsgPassingMode && pThis->pQueue->qType != QUEUETYPE_DIRECT) {
parser_warnmsg("module %s with message passing mode uses "
"non-direct queue. This most probably leads to undesired "
"results", (char*)modGetName(pThis->pMod));
@@ -906,7 +914,7 @@ prepareDoActionParams(action_t * __restrict__ const pAction,
}
} else {
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- switch(pAction->eParamPassing) {
+ switch(pAction->peParamPassing[i]) {
case ACT_STRING_PASSING:
CHKiRet(tplToString(pAction->ppTpl[i], pMsg,
&(pWrkrInfo->p.nontx.actParams[i]),
@@ -923,10 +931,8 @@ prepareDoActionParams(action_t * __restrict__ const pAction,
CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow));
pWrkrInfo->p.nontx.actParams[i].param = (void*) json;
break;
- default:dbgprintf("software bug/error: unknown pAction->eParamPassing "
- "%d in prepareDoActionParams\n",
- (int) pAction->eParamPassing);
- assert(0); /* software bug if this happens! */
+ default:dbgprintf("software bug/error: unknown pAction->peParamPassing[%d] %d in prepareDoActionParams\n",
+ i, (int) pAction->peParamPassing[i]);
break;
}
}
@@ -945,14 +951,11 @@ releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__
actWrkrInfo_t *__restrict__ pWrkrInfo;
uchar ***ppMsgs;
- if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING)
- goto done; /* we need to do nothing with these types! */
-
pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
- switch(pAction->eParamPassing) {
- case ACT_ARRAY_PASSING:
- ppMsgs = (uchar***) pWrkrInfo->p.nontx.actParams[0].param;
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ switch(pAction->peParamPassing[j]) {
+ case ACT_ARRAY_PASSING:
+ ppMsgs = (uchar***) pWrkrInfo->p.nontx.actParams[0].param;
if(((uchar**)ppMsgs)[j] != NULL) {
jArr = 0;
while(ppMsgs[j][jArr] != NULL) {
@@ -963,23 +966,20 @@ releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__
free(((uchar**)ppMsgs)[j]);
((uchar**)ppMsgs)[j] = NULL;
}
- }
- break;
- case ACT_JSON_PASSING:
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ break;
+ case ACT_JSON_PASSING:
json_object_put((struct json_object*)
pWrkrInfo->p.nontx.actParams[j].param);
pWrkrInfo->p.nontx.actParams[j].param = NULL;
+ break;
+ case ACT_STRING_PASSING:
+ case ACT_MSG_PASSING:
+ /* no need to do anything with these */
+ break;
}
- break;
- case ACT_STRING_PASSING:
- /* strings are destructed when the worker terminates */
- case ACT_MSG_PASSING:
- /* can never happen, just to keep compiler happy! */
- break;
}
-done: return;
+ return;
}
/* This is used in resume processing. We only finally know that a resume
@@ -1288,7 +1288,8 @@ processMsgMain(action_t *__restrict__ const pAction,
iRet = actionProcessMessage(pAction,
pWti->actWrkrInfo[pAction->iActionNbr].p.nontx.actParams,
pWti);
- releaseDoActionParams(pAction, pWti);
+ if(pAction->bNeedReleaseBatch)
+ releaseDoActionParams(pAction, pWti);
finalize_it:
if(iRet == RS_RET_OK) {
if(pWti->execState.bDoAutoCommit)
@@ -1688,43 +1689,50 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
* the discard action, which does not require a string. -- rgerhards, 2007-07-30
*/
if(pAction->iNumTpls > 0) {
- /* we first need to create the template pointer array */
+ /* we first need to create the template arrays */
CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *)));
+ CHKmalloc(pAction->peParamPassing = (paramPassing_t*)calloc(pAction->iNumTpls, sizeof(paramPassing_t)));
}
+ pAction->bUsesMsgPassingMode = 0;
+ pAction->bNeedReleaseBatch = 0;
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts));
/* Ok, we got everything, so it now is time to look up the template
* (Hint: templates MUST be defined before they are used!)
*/
- if( !(iTplOpts & OMSR_TPL_AS_MSG)
- && (pAction->ppTpl[i] =
- tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) {
- snprintf(errMsg, sizeof(errMsg) / sizeof(char),
- " Could not find template '%s' - action disabled",
- pTplName);
- errno = 0;
- errmsg.LogError(0, RS_RET_NOT_FOUND, "%s", errMsg);
- ABORT_FINALIZE(RS_RET_NOT_FOUND);
- }
- /* check required template options */
- if( (iTplOpts & OMSR_RQD_TPL_OPT_SQL)
- && (pAction->ppTpl[i]->optFormatEscape == 0)) {
- errno = 0;
- errmsg.LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled. To use this action, you have to specify "
- "the SQL or stdSQL option in your template!\n");
- ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING);
+ if(!(iTplOpts & OMSR_TPL_AS_MSG)) {
+ if((pAction->ppTpl[i] =
+ tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) {
+ snprintf(errMsg, sizeof(errMsg) / sizeof(char),
+ " Could not find template '%s' - action disabled",
+ pTplName);
+ errno = 0;
+ errmsg.LogError(0, RS_RET_NOT_FOUND, "%s", errMsg);
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
+ }
+ /* check required template options */
+ if( (iTplOpts & OMSR_RQD_TPL_OPT_SQL)
+ && (pAction->ppTpl[i]->optFormatEscape == 0)) {
+ errno = 0;
+ errmsg.LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled. To use this action, you have to specify "
+ "the SQL or stdSQL option in your template!\n");
+ ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING);
+ }
}
/* set parameter-passing mode */
if(iTplOpts & OMSR_TPL_AS_ARRAY) {
- pAction->eParamPassing = ACT_ARRAY_PASSING;
+ pAction->peParamPassing[i] = ACT_ARRAY_PASSING;
+ pAction->bNeedReleaseBatch = 1;
} else if(iTplOpts & OMSR_TPL_AS_MSG) {
- pAction->eParamPassing = ACT_MSG_PASSING;
+ pAction->peParamPassing[i] = ACT_MSG_PASSING;
+ pAction->bUsesMsgPassingMode = 1;
} else if(iTplOpts & OMSR_TPL_AS_JSON) {
- pAction->eParamPassing = ACT_JSON_PASSING;
+ pAction->peParamPassing[i] = ACT_JSON_PASSING;
+ pAction->bNeedReleaseBatch = 1;
} else {
- pAction->eParamPassing = ACT_STRING_PASSING;
+ pAction->peParamPassing[i] = ACT_STRING_PASSING;
}
DBGPRINTF("template: '%s' assigned\n", pTplName);
@@ -1816,8 +1824,10 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
/* check if the module is compatible with select features
* (currently no such features exist) */
loadConf->actions.nbrActions++; /* one more active action! */
+ *ppAction = pAction;
+ } else {
+ // TODO: cleanup
}
- *ppAction = pAction;
finalize_it:
free(cnfModName);