diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 124 |
1 files changed, 73 insertions, 51 deletions
@@ -655,30 +655,34 @@ rsRetVal actionDbgPrint(action_t *pThis) /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ -static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs) +static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem) { int i; + msg_t *pMsg; DEFiRet; ASSERT(pAction != NULL); + ASSERT(pElem != NULL); + pMsg = (msg_t*) pElem->pUsrp; /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i])); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), + &pElem->staticLenStrings[i])); + pElem->staticActParams[i] = pElem->staticActStrings[i]; break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(ppMsgs[i]))); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]))); break; case ACT_MSG_PASSING: - /* we abuse the uchar* ptr, it now actually is a void*, but we can not - * change that other than by chaning the interface, what we don't like... - */ - ppMsgs[i] = (void*) pMsg; - lenMsgs[i] = 0; /* init for *next* action */ + pElem->staticActParams[i] = (void*) pMsg; + break; + default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", + (int) pAction->eParamPassing); + assert(0); /* software bug if this happens! */ break; - default:assert(0); /* software bug if this happens! */ } } @@ -687,26 +691,56 @@ finalize_it: } -/* cleanup doAction calling parameters - * rgerhards, 2009-05-07 +/* free a batches ressources, but not string buffers (because they will + * most probably be reused). String buffers are only deleted upon final + * destruction of the batch. + * This function here must be called only when the batch is actually no + * longer used, also not for retrying actions or such. It invalidates + * buffers. + * rgerhards, 2010-12-17 */ -static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***ppMsgs) +static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) { int iArr; - int i; + int i, j; + batch_obj_t *pElem; + uchar ***ppMsgs; DEFiRet; ASSERT(pAction != NULL); - for(i = 0 ; i < pAction->iNumTpls ; ++i) { - if(((uchar**)ppMsgs)[i] != NULL) { - iArr = 0; - while((((uchar***)ppMsgs)[i][iArr]) != NULL) { - d_free(((uchar ***)ppMsgs)[i][iArr++]); - ((uchar ***)ppMsgs)[i][iArr++] = NULL; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pElem = &(pBatch->pElem[i]); + if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pElem->staticActParams; + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + if(((uchar**)ppMsgs)[i] != NULL) { + iArr = 0; + while(ppMsgs[i][iArr] != NULL) { + d_free(ppMsgs[i][iArr++]); + ppMsgs[i][iArr++] = NULL; + } + d_free(((uchar**)ppMsgs)[i]); + ((uchar**)ppMsgs)[i] = NULL; + } + } + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* nothing to do in that case */ + /* TODO ... and yet we do something ;) This is considered not + * really needed, but I was not bold enough to remove that while + * fixing the stable. It should be removed in a devel version + * soon (I really don't see a reason why we would need it). + * rgerhards, 2010-12-16 + */ + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + ((uchar**)pElem->staticActParams)[j] = NULL; + } + break; } - d_free(((uchar**)ppMsgs)[i]); - ((uchar**)ppMsgs)[i] = NULL; } } @@ -720,7 +754,6 @@ static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***ppMsgs) rsRetVal actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) { - int i; DEFiRet; ASSERT(pThis != NULL); @@ -729,10 +762,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); pThis->bHadAutoCommit = 0; -//d_pthread_mutex_lock(&pThis->mutActExec); -//pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec); iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); -//pthread_cleanup_pop(1); /* unlock mutex */ switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -761,27 +791,6 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) iRet = getReturnCode(pThis); finalize_it: - - /* we need to cleanup the batches string buffers if they have been used - * in a non-standard way. -- rgerhards, 2010-06-15 - * Note that we may do this at the batch level, this would provide a bit - * more concurrency (TODO). - */ - switch(pThis->eParamPassing) { - case ACT_STRING_PASSING: - /* nothing to do in that case */ - break; - case ACT_ARRAY_PASSING: - cleanupDoActionParams(pThis, actParams); /* iRet ignored! */ - break; - case ACT_MSG_PASSING: - /* nothing to do in that case */ - for(i = 0 ; i < pThis->iNumTpls ; ++i) { - ((uchar**)actParams)[i] = NULL; - } - break; - } - RETiRet; } @@ -944,10 +953,12 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) int i; int bDone; rsRetVal localRet; + int wasDoneTo; DEFiRet; assert(pBatch != NULL); + wasDoneTo = pBatch->iDoneUpTo; bDone = 0; do { localRet = tryDoAction(pAction, pBatch, &nElem); @@ -971,7 +982,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) ; /* do nothing, this will retry the full batch */ } else if(localRet == RS_RET_ACTION_FAILED) { /* in this case, everything not yet committed is BAD */ - for(i = pBatch->iDoneUpTo ; i < nElem ; ++i) { + for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { if( pBatch->pElem[i].state != BATCH_STATE_DISC && pBatch->pElem[i].state != BATCH_STATE_COMM ) { pBatch->pElem[i].state = BATCH_STATE_BAD; @@ -981,7 +992,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 1; } else { if(nElem == 1) { - batchSetElemState(pBatch, i, BATCH_STATE_BAD); + batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); bDone = 1; } else { /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ @@ -1021,8 +1032,7 @@ prepareBatch(action_t *pAction, batch_t *pBatch) pElem = &(pBatch->pElem[i]); if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { pElem->state = BATCH_STATE_RDY; - prepareDoActionParams(pAction, (msg_t*) pElem->pUsrp, - (uchar**) &(pElem->staticActParams), pElem->staticLenParams); + prepareDoActionParams(pAction, pElem); } } RETiRet; @@ -1055,6 +1065,7 @@ static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { int *pbShutdownImmdtSave; + rsRetVal localRet; DEFiRet; assert(pBatch != NULL); @@ -1076,6 +1087,16 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) pthread_cleanup_pop(1); /* unlock mutex */ + /* even if processAction failed, we need to release the batch (else we + * have a memory leak). So we do this first, and then check if we need to + * return an error code. If so, the code from processAction has priority. + * rgerhards, 2010-12-17 + */ + localRet = releaseBatch(pAction, pBatch); + + if(iRet == RS_RET_OK) + iRet = localRet; + finalize_it: pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; @@ -1360,7 +1381,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) int i; int bProcessMarkMsgs = 0; int bModifiedFilter; - sbool FilterSave[128]; + sbool FilterSave[1024]; sbool *pFilterSave; DEFiRet; @@ -1405,6 +1426,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) if(bModifiedFilter) { /* in this case, we need to restore previous state */ for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + /* note: clang static code analyzer reports a false positive below */ pBatch->pElem[i].bFilterOK = pFilterSave[i]; } } |