From ae82a633a4d42dfdc942d8f0f4f5e6470f13797e Mon Sep 17 00:00:00 2001 From: Michael Biebl Date: Thu, 1 Sep 2011 23:27:36 +0200 Subject: Imported Upstream version 5.8.5 --- action.c | 65 +++++++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 27 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 54a05fc..951b472 100644 --- a/action.c +++ b/action.c @@ -960,7 +960,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) i = pBatch->iDoneUpTo; /* all messages below that index are processed */ iElemProcessed = 0; iCommittedUpTo = i; -dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); +dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); while(iElemProcessed <= *pnElem && i < pBatch->nElem) { if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); @@ -980,13 +980,17 @@ dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, /* mark messages as committed */ while(iCommittedUpTo <= i) { pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); + ++iCommittedUpTo; + //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { /* mark messages as committed */ while(iCommittedUpTo < i) { pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); + ++iCommittedUpTo; + //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } pBatch->pElem[i].state = BATCH_STATE_SUB; } else if(localRet == RS_RET_DEFER_COMMIT) { @@ -1011,6 +1015,15 @@ finalize_it: RETiRet; } +/* debug aid */ +static void displayBatchState(batch_t *pBatch) +{ + int i; + for(i = 0 ; i < pBatch->nElem ; ++i) { + dbgprintf("XXXXX: displayBatchState2 %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + } +} + /* submit a batch for actual action processing. * The first nElem elements are processed. This function calls itself @@ -1468,7 +1481,6 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) time_t now = 0; time_t lastAct; int i; - int bProcessMarkMsgs = 0; int bModifiedFilter; sbool FilterSave[1024]; sbool *pFilterSave; @@ -1482,33 +1494,32 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) bModifiedFilter = 0; for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + if(!pBatch->pElem[i].bFilterOK) + continue; pFilterSave[i] = pBatch->pElem[i].bFilterOK; - if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) { - /* check if we need to write or not */ - if(now == 0) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if((now - lastAct) < MarkInterval / 2) { - DBGPRINTF("action was recently called, ignoring mark message\n"); - bProcessMarkMsgs = 0; - } else { - bProcessMarkMsgs = 1; - } - } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0); - } - if(bProcessMarkMsgs) { - pBatch->pElem[i].bFilterOK = 0; - bModifiedFilter = 1; + if(now == 0) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + } + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) { + if((now - lastAct) < MarkInterval / 2) { + pBatch->pElem[i].bFilterOK = 0; + bModifiedFilter = 1; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ + } } + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0); + if(pBatch->pElem[i].bFilterOK) { + DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", + i, module.GetStateName(pAction->pMod)); } } - - DBGPRINTF("Called action(NotAllMark), logging to %s\n", module.GetStateName(pAction->pMod)); iRet = doSubmitToActionQBatch(pAction, pBatch); -- cgit v1.2.3