summaryrefslogtreecommitdiff
path: root/action.c
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2011-09-01 23:27:36 +0200
committerMichael Biebl <biebl@debian.org>2011-09-01 23:27:36 +0200
commitae82a633a4d42dfdc942d8f0f4f5e6470f13797e (patch)
treec9835c065c1dafa1beb21ffa19f4e2ba8693cce2 /action.c
parentb9d0478eeb8b4f175704e012fcc06069b10e2553 (diff)
downloadrsyslog-ae82a633a4d42dfdc942d8f0f4f5e6470f13797e.tar.gz
Imported Upstream version 5.8.5upstream/5.8.5
Diffstat (limited to 'action.c')
-rw-r--r--action.c65
1 files changed, 38 insertions, 27 deletions
diff --git a/action.c b/action.c
index 54a05fc..951b472 100644
--- a/action.c
+++ b/action.c
@@ -960,7 +960,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
-dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
+dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
@@ -980,13 +980,17 @@ dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem,
/* mark messages as committed */
while(iCommittedUpTo <= i) {
pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
+ ++iCommittedUpTo;
+ //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
/* mark messages as committed */
while(iCommittedUpTo < i) {
pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
+ ++iCommittedUpTo;
+ //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
pBatch->pElem[i].state = BATCH_STATE_SUB;
} else if(localRet == RS_RET_DEFER_COMMIT) {
@@ -1011,6 +1015,15 @@ finalize_it:
RETiRet;
}
+/* debug aid */
+static void displayBatchState(batch_t *pBatch)
+{
+ int i;
+ for(i = 0 ; i < pBatch->nElem ; ++i) {
+ dbgprintf("XXXXX: displayBatchState2 %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state);
+ }
+}
+
/* submit a batch for actual action processing.
* The first nElem elements are processed. This function calls itself
@@ -1468,7 +1481,6 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
time_t now = 0;
time_t lastAct;
int i;
- int bProcessMarkMsgs = 0;
int bModifiedFilter;
sbool FilterSave[1024];
sbool *pFilterSave;
@@ -1482,33 +1494,32 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
bModifiedFilter = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
+ if(!pBatch->pElem[i].bFilterOK)
+ continue;
pFilterSave[i] = pBatch->pElem[i].bFilterOK;
- if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) {
- /* check if we need to write or not */
- if(now == 0) {
- now = datetime.GetTime(NULL); /* good time call - the only one done */
- /* CAS loop, we write back a bit early, but that's OK... */
- /* we use reception time, not dequeue time - this is considered more appropriate and
- * also faster ;) -- rgerhards, 2008-09-17 */
- do {
- lastAct = pAction->f_time;
- if((now - lastAct) < MarkInterval / 2) {
- DBGPRINTF("action was recently called, ignoring mark message\n");
- bProcessMarkMsgs = 0;
- } else {
- bProcessMarkMsgs = 1;
- }
- } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
- ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0);
- }
- if(bProcessMarkMsgs) {
- pBatch->pElem[i].bFilterOK = 0;
- bModifiedFilter = 1;
+ if(now == 0) {
+ now = datetime.GetTime(NULL); /* good time call - the only one done */
+ }
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) {
+ if((now - lastAct) < MarkInterval / 2) {
+ pBatch->pElem[i].bFilterOK = 0;
+ bModifiedFilter = 1;
+ DBGPRINTF("action was recently called, ignoring mark message\n");
+ break; /* do not update timestamp for non-written mark messages */
+ }
}
+ } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
+ ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0);
+ if(pBatch->pElem[i].bFilterOK) {
+ DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n",
+ i, module.GetStateName(pAction->pMod));
}
}
-
- DBGPRINTF("Called action(NotAllMark), logging to %s\n", module.GetStateName(pAction->pMod));
iRet = doSubmitToActionQBatch(pAction, pBatch);