summaryrefslogtreecommitdiff
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c124
1 files changed, 73 insertions, 51 deletions
diff --git a/action.c b/action.c
index 0d29867..4bf8ba0 100644
--- a/action.c
+++ b/action.c
@@ -655,30 +655,34 @@ rsRetVal actionDbgPrint(action_t *pThis)
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
-static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs)
+static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem)
{
int i;
+ msg_t *pMsg;
DEFiRet;
ASSERT(pAction != NULL);
+ ASSERT(pElem != NULL);
+ pMsg = (msg_t*) pElem->pUsrp;
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i]));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]),
+ &pElem->staticLenStrings[i]));
+ pElem->staticActParams[i] = pElem->staticActStrings[i];
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(ppMsgs[i])));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i])));
break;
case ACT_MSG_PASSING:
- /* we abuse the uchar* ptr, it now actually is a void*, but we can not
- * change that other than by chaning the interface, what we don't like...
- */
- ppMsgs[i] = (void*) pMsg;
- lenMsgs[i] = 0; /* init for *next* action */
+ pElem->staticActParams[i] = (void*) pMsg;
+ break;
+ default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n",
+ (int) pAction->eParamPassing);
+ assert(0); /* software bug if this happens! */
break;
- default:assert(0); /* software bug if this happens! */
}
}
@@ -687,26 +691,56 @@ finalize_it:
}
-/* cleanup doAction calling parameters
- * rgerhards, 2009-05-07
+/* free a batches ressources, but not string buffers (because they will
+ * most probably be reused). String buffers are only deleted upon final
+ * destruction of the batch.
+ * This function here must be called only when the batch is actually no
+ * longer used, also not for retrying actions or such. It invalidates
+ * buffers.
+ * rgerhards, 2010-12-17
*/
-static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***ppMsgs)
+static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
{
int iArr;
- int i;
+ int i, j;
+ batch_obj_t *pElem;
+ uchar ***ppMsgs;
DEFiRet;
ASSERT(pAction != NULL);
- for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- if(((uchar**)ppMsgs)[i] != NULL) {
- iArr = 0;
- while((((uchar***)ppMsgs)[i][iArr]) != NULL) {
- d_free(((uchar ***)ppMsgs)[i][iArr++]);
- ((uchar ***)ppMsgs)[i][iArr++] = NULL;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ pElem = &(pBatch->pElem[i]);
+ if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
+ switch(pAction->eParamPassing) {
+ case ACT_ARRAY_PASSING:
+ ppMsgs = (uchar***) pElem->staticActParams;
+ for(i = 0 ; i < pAction->iNumTpls ; ++i) {
+ if(((uchar**)ppMsgs)[i] != NULL) {
+ iArr = 0;
+ while(ppMsgs[i][iArr] != NULL) {
+ d_free(ppMsgs[i][iArr++]);
+ ppMsgs[i][iArr++] = NULL;
+ }
+ d_free(((uchar**)ppMsgs)[i]);
+ ((uchar**)ppMsgs)[i] = NULL;
+ }
+ }
+ break;
+ case ACT_STRING_PASSING:
+ case ACT_MSG_PASSING:
+ /* nothing to do in that case */
+ /* TODO ... and yet we do something ;) This is considered not
+ * really needed, but I was not bold enough to remove that while
+ * fixing the stable. It should be removed in a devel version
+ * soon (I really don't see a reason why we would need it).
+ * rgerhards, 2010-12-16
+ */
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ ((uchar**)pElem->staticActParams)[j] = NULL;
+ }
+ break;
}
- d_free(((uchar**)ppMsgs)[i]);
- ((uchar**)ppMsgs)[i] = NULL;
}
}
@@ -720,7 +754,6 @@ static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***ppMsgs)
rsRetVal
actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
{
- int i;
DEFiRet;
ASSERT(pThis != NULL);
@@ -729,10 +762,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
pThis->bHadAutoCommit = 0;
-//d_pthread_mutex_lock(&pThis->mutActExec);
-//pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec);
iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData);
-//pthread_cleanup_pop(1); /* unlock mutex */
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -761,27 +791,6 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
iRet = getReturnCode(pThis);
finalize_it:
-
- /* we need to cleanup the batches string buffers if they have been used
- * in a non-standard way. -- rgerhards, 2010-06-15
- * Note that we may do this at the batch level, this would provide a bit
- * more concurrency (TODO).
- */
- switch(pThis->eParamPassing) {
- case ACT_STRING_PASSING:
- /* nothing to do in that case */
- break;
- case ACT_ARRAY_PASSING:
- cleanupDoActionParams(pThis, actParams); /* iRet ignored! */
- break;
- case ACT_MSG_PASSING:
- /* nothing to do in that case */
- for(i = 0 ; i < pThis->iNumTpls ; ++i) {
- ((uchar**)actParams)[i] = NULL;
- }
- break;
- }
-
RETiRet;
}
@@ -944,10 +953,12 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
int i;
int bDone;
rsRetVal localRet;
+ int wasDoneTo;
DEFiRet;
assert(pBatch != NULL);
+ wasDoneTo = pBatch->iDoneUpTo;
bDone = 0;
do {
localRet = tryDoAction(pAction, pBatch, &nElem);
@@ -971,7 +982,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
; /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
/* in this case, everything not yet committed is BAD */
- for(i = pBatch->iDoneUpTo ; i < nElem ; ++i) {
+ for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) {
if( pBatch->pElem[i].state != BATCH_STATE_DISC
&& pBatch->pElem[i].state != BATCH_STATE_COMM ) {
pBatch->pElem[i].state = BATCH_STATE_BAD;
@@ -981,7 +992,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 1;
} else {
if(nElem == 1) {
- batchSetElemState(pBatch, i, BATCH_STATE_BAD);
+ batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD);
bDone = 1;
} else {
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
@@ -1021,8 +1032,7 @@ prepareBatch(action_t *pAction, batch_t *pBatch)
pElem = &(pBatch->pElem[i]);
if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
pElem->state = BATCH_STATE_RDY;
- prepareDoActionParams(pAction, (msg_t*) pElem->pUsrp,
- (uchar**) &(pElem->staticActParams), pElem->staticLenParams);
+ prepareDoActionParams(pAction, pElem);
}
}
RETiRet;
@@ -1055,6 +1065,7 @@ static rsRetVal
processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
{
int *pbShutdownImmdtSave;
+ rsRetVal localRet;
DEFiRet;
assert(pBatch != NULL);
@@ -1076,6 +1087,16 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
pthread_cleanup_pop(1); /* unlock mutex */
+ /* even if processAction failed, we need to release the batch (else we
+ * have a memory leak). So we do this first, and then check if we need to
+ * return an error code. If so, the code from processAction has priority.
+ * rgerhards, 2010-12-17
+ */
+ localRet = releaseBatch(pAction, pBatch);
+
+ if(iRet == RS_RET_OK)
+ iRet = localRet;
+
finalize_it:
pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
RETiRet;
@@ -1360,7 +1381,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
int i;
int bProcessMarkMsgs = 0;
int bModifiedFilter;
- sbool FilterSave[128];
+ sbool FilterSave[1024];
sbool *pFilterSave;
DEFiRet;
@@ -1405,6 +1426,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
if(bModifiedFilter) {
/* in this case, we need to restore previous state */
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
+ /* note: clang static code analyzer reports a false positive below */
pBatch->pElem[i].bFilterOK = pFilterSave[i];
}
}