diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 124 |
1 files changed, 67 insertions, 57 deletions
@@ -301,12 +301,13 @@ rsRetVal actionDestruct(action_t * const pThis) if(pThis->statsobj != NULL) statsobj.Destruct(&pThis->statsobj); - if(pThis->pMod != NULL) + if(pThis->pModData != NULL) pThis->pMod->freeInstance(pThis->pModData); pthread_mutex_destroy(&pThis->mutAction); d_free(pThis->pszName); d_free(pThis->ppTpl); + d_free(pThis->peParamPassing); finalize_it: d_free(pThis); @@ -387,14 +388,21 @@ actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst) /* cache transactional attribute */ pThis->isTransactional = pThis->pMod->mod.om.supportsTX; - if(pThis->isTransactional && pThis->eParamPassing != ACT_STRING_PASSING) { - errmsg.LogError(0, RS_RET_INVLD_OMOD, "action '%s'(%d) is transactional but " - "uses invalid paramter passing mode -- disabling " - "action. This is probably caused by a pre-v7 " - "output module that needs upgrade.", - pThis->pszName, pThis->iActionNbr); - actionDisable(pThis); - ABORT_FINALIZE(RS_RET_INVLD_OMOD); + if(pThis->isTransactional) { + int i; + for(i = 0 ; i < pThis->iNumTpls ; ++i) { + if(pThis->peParamPassing[i] != ACT_STRING_PASSING) { + errmsg.LogError(0, RS_RET_INVLD_OMOD, "action '%s'(%d) is transactional but " + "parameter %d " + "uses invalid paramter passing mode -- disabling " + "action. This is probably caused by a pre-v7 " + "output module that needs upgrade.", + pThis->pszName, pThis->iActionNbr, i); + actionDisable(pThis); + ABORT_FINALIZE(RS_RET_INVLD_OMOD); + + } + } } @@ -506,7 +514,7 @@ actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst) DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); - if(pThis->eParamPassing == ACT_MSG_PASSING && pThis->pQueue->qType != QUEUETYPE_DIRECT) { + if(pThis->bUsesMsgPassingMode && pThis->pQueue->qType != QUEUETYPE_DIRECT) { parser_warnmsg("module %s with message passing mode uses " "non-direct queue. This most probably leads to undesired " "results", (char*)modGetName(pThis->pMod)); @@ -906,7 +914,7 @@ prepareDoActionParams(action_t * __restrict__ const pAction, } } else { for(i = 0 ; i < pAction->iNumTpls ; ++i) { - switch(pAction->eParamPassing) { + switch(pAction->peParamPassing[i]) { case ACT_STRING_PASSING: CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->p.nontx.actParams[i]), @@ -923,10 +931,8 @@ prepareDoActionParams(action_t * __restrict__ const pAction, CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); pWrkrInfo->p.nontx.actParams[i].param = (void*) json; break; - default:dbgprintf("software bug/error: unknown pAction->eParamPassing " - "%d in prepareDoActionParams\n", - (int) pAction->eParamPassing); - assert(0); /* software bug if this happens! */ + default:dbgprintf("software bug/error: unknown pAction->peParamPassing[%d] %d in prepareDoActionParams\n", + i, (int) pAction->peParamPassing[i]); break; } } @@ -945,14 +951,11 @@ releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ actWrkrInfo_t *__restrict__ pWrkrInfo; uchar ***ppMsgs; - if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) - goto done; /* we need to do nothing with these types! */ - pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - ppMsgs = (uchar***) pWrkrInfo->p.nontx.actParams[0].param; - for(j = 0 ; j < pAction->iNumTpls ; ++j) { + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + switch(pAction->peParamPassing[j]) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pWrkrInfo->p.nontx.actParams[0].param; if(((uchar**)ppMsgs)[j] != NULL) { jArr = 0; while(ppMsgs[j][jArr] != NULL) { @@ -963,23 +966,20 @@ releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ free(((uchar**)ppMsgs)[j]); ((uchar**)ppMsgs)[j] = NULL; } - } - break; - case ACT_JSON_PASSING: - for(j = 0 ; j < pAction->iNumTpls ; ++j) { + break; + case ACT_JSON_PASSING: json_object_put((struct json_object*) pWrkrInfo->p.nontx.actParams[j].param); pWrkrInfo->p.nontx.actParams[j].param = NULL; + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* no need to do anything with these */ + break; } - break; - case ACT_STRING_PASSING: - /* strings are destructed when the worker terminates */ - case ACT_MSG_PASSING: - /* can never happen, just to keep compiler happy! */ - break; } -done: return; + return; } /* This is used in resume processing. We only finally know that a resume @@ -1288,7 +1288,8 @@ processMsgMain(action_t *__restrict__ const pAction, iRet = actionProcessMessage(pAction, pWti->actWrkrInfo[pAction->iActionNbr].p.nontx.actParams, pWti); - releaseDoActionParams(pAction, pWti); + if(pAction->bNeedReleaseBatch) + releaseDoActionParams(pAction, pWti); finalize_it: if(iRet == RS_RET_OK) { if(pWti->execState.bDoAutoCommit) @@ -1688,43 +1689,50 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, * the discard action, which does not require a string. -- rgerhards, 2007-07-30 */ if(pAction->iNumTpls > 0) { - /* we first need to create the template pointer array */ + /* we first need to create the template arrays */ CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *))); + CHKmalloc(pAction->peParamPassing = (paramPassing_t*)calloc(pAction->iNumTpls, sizeof(paramPassing_t))); } + pAction->bUsesMsgPassingMode = 0; + pAction->bNeedReleaseBatch = 0; for(i = 0 ; i < pAction->iNumTpls ; ++i) { CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts)); /* Ok, we got everything, so it now is time to look up the template * (Hint: templates MUST be defined before they are used!) */ - if( !(iTplOpts & OMSR_TPL_AS_MSG) - && (pAction->ppTpl[i] = - tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) { - snprintf(errMsg, sizeof(errMsg) / sizeof(char), - " Could not find template '%s' - action disabled", - pTplName); - errno = 0; - errmsg.LogError(0, RS_RET_NOT_FOUND, "%s", errMsg); - ABORT_FINALIZE(RS_RET_NOT_FOUND); - } - /* check required template options */ - if( (iTplOpts & OMSR_RQD_TPL_OPT_SQL) - && (pAction->ppTpl[i]->optFormatEscape == 0)) { - errno = 0; - errmsg.LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled. To use this action, you have to specify " - "the SQL or stdSQL option in your template!\n"); - ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING); + if(!(iTplOpts & OMSR_TPL_AS_MSG)) { + if((pAction->ppTpl[i] = + tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) { + snprintf(errMsg, sizeof(errMsg) / sizeof(char), + " Could not find template '%s' - action disabled", + pTplName); + errno = 0; + errmsg.LogError(0, RS_RET_NOT_FOUND, "%s", errMsg); + ABORT_FINALIZE(RS_RET_NOT_FOUND); + } + /* check required template options */ + if( (iTplOpts & OMSR_RQD_TPL_OPT_SQL) + && (pAction->ppTpl[i]->optFormatEscape == 0)) { + errno = 0; + errmsg.LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled. To use this action, you have to specify " + "the SQL or stdSQL option in your template!\n"); + ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING); + } } /* set parameter-passing mode */ if(iTplOpts & OMSR_TPL_AS_ARRAY) { - pAction->eParamPassing = ACT_ARRAY_PASSING; + pAction->peParamPassing[i] = ACT_ARRAY_PASSING; + pAction->bNeedReleaseBatch = 1; } else if(iTplOpts & OMSR_TPL_AS_MSG) { - pAction->eParamPassing = ACT_MSG_PASSING; + pAction->peParamPassing[i] = ACT_MSG_PASSING; + pAction->bUsesMsgPassingMode = 1; } else if(iTplOpts & OMSR_TPL_AS_JSON) { - pAction->eParamPassing = ACT_JSON_PASSING; + pAction->peParamPassing[i] = ACT_JSON_PASSING; + pAction->bNeedReleaseBatch = 1; } else { - pAction->eParamPassing = ACT_STRING_PASSING; + pAction->peParamPassing[i] = ACT_STRING_PASSING; } DBGPRINTF("template: '%s' assigned\n", pTplName); @@ -1816,8 +1824,10 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) /* check if the module is compatible with select features * (currently no such features exist) */ loadConf->actions.nbrActions++; /* one more active action! */ + *ppAction = pAction; + } else { + // TODO: cleanup } - *ppAction = pAction; finalize_it: free(cnfModName); |