diff options
author | Michael Biebl <biebl@debian.org> | 2013-07-23 01:01:14 +0200 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2013-07-23 01:01:14 +0200 |
commit | a400789c367fd3cd8d0f9588ca2af72dcac47c9c (patch) | |
tree | 8c1caa6308f9e720e1e461da321107e6dc29f256 /runtime/queue.c | |
parent | f5638ea86a8626b7e0e471d720359cb4e3d8cb1c (diff) | |
download | rsyslog-a400789c367fd3cd8d0f9588ca2af72dcac47c9c.tar.gz |
Imported Upstream version 7.4.3upstream/7.4.3
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 71 |
1 files changed, 55 insertions, 16 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 85b1e45..abe2be0 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -877,7 +877,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) DEFiRet; ASSERT(pThis != NULL); - + + free(pThis->pszQIFNam); if(pThis->tVars.disk.pWrite != NULL) strm.Destruct(&pThis->tVars.disk.pWrite); if(pThis->tVars.disk.pReadDeq != NULL) @@ -1022,7 +1023,7 @@ qqueueAdd(qqueue_t *pThis, msg_t *pMsg) if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); - DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n", + DBGOPRINT((obj_t*) pThis, "qqueueAdd: entry added, size now log %d, phys %d entries\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } @@ -1466,7 +1467,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) */ if(bytesDel != 0) { pThis->tVars.disk.sizeOnDisk -= bytesDel; - DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk " + DBGOPRINT((obj_t*) pThis, "doDeleteBatch: a %lld octet file has been deleted, now %lld octets disk " "space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk); /* awake possibly waiting enq process */ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ @@ -1480,7 +1481,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); - DBGPRINTF("delete batch from store, new sizes: log %d, phys %d\n", + DBGPRINTF("doDeleteBatch: delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ @@ -1550,13 +1551,13 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { - DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); + DBGPRINTF("DeleteProcessedBatch: error %d re-enqueuing unprocessed data element - discarded\n", localRet); } } msgDestruct(&pMsg); } - DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + DBGPRINTF("DeleteProcessedBatch: we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); @@ -1913,8 +1914,16 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { - CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, - MsgAddRef(pWti->batch.pElem[i].pMsg))); + iRet = qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg)); + if(iRet != RS_RET_OK) { + if(iRet == RS_RET_ERR_QUEUE_EMERGENCY) { + /* Queue emergency error occured */ + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg caught RS_RET_ERR_QUEUE_EMERGENCY, aborting loop.\n"); + FINALIZE; + } else { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg item (%d) returned with error state: '%d'\n", i, iRet); + } + } pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */ } @@ -1922,10 +1931,38 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(iCancelStateSave, NULL); finalize_it: + /* Check the last return state of qqueueEnqMsg. If an error was returned, we acknowledge it only. + * Unless the error code is RS_RET_ERR_QUEUE_EMERGENCY, we reset the return state to RS_RET_OK. + * Otherwise the Caller functions would run into an infinite Loop trying to enqueue the + * same messages over and over again. + * + * However we do NOT overwrite positive return states like + * RS_RET_TERMINATE_NOW, + * RS_RET_NO_RUN, + * RS_RET_IDLE, + * RS_RET_TERMINATE_WHEN_IDLE + * These return states are important for Queue handling of the upper laying functions. + * RGer: Note that checking for iRet < 0 is a bit bold. In theory, positive iRet + * values are "OK" states, and things that the caller shall deal with. However, + * this has not been done so consistently. Andre convinced me that the current + * code is an elegant solution. However, if problems with queue workers and/or + * shutdown come up, this code here should be looked at suspiciously. In those + * cases it may work out to check all status codes explicitely, just to avoid + * a pitfall due to unexpected states being passed on to the caller. + */ + if( iRet != RS_RET_OK && + iRet != RS_RET_ERR_QUEUE_EMERGENCY && + iRet < 0) { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg Resetting iRet from %d back to RS_RET_OK\n", iRet); + iRet = RS_RET_OK; + } else { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg returns with iRet %d\n", iRet); + } + /* now we are done, but potentially need to re-aquire the mutex */ if(bNeedReLock) d_pthread_mutex_lock(pThis->mut); - DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); + RETiRet; } @@ -2491,7 +2528,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) * In any case, this was the old code (if we do the TODO): * pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); */ - DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message " + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: FullDelay mark reached for full delayable message " "- blocking, queue size is %d.\n", pThis->iQueueSize); timeoutComp(&t, 1000); err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); @@ -2508,7 +2545,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) } } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY && !glbl.GetGlobalInputTermState()) { if(pThis->iQueueSize >= pThis->iLightDlyMrk) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light " + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: LightDelay mark reached for light " "delayable message - blocking a bit.\n"); timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); @@ -2531,24 +2568,26 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); if(pThis->toEnq == 0 || pThis->bEnqOnly) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n"); + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - configured for immediate discarding QueueSize=%d " + "MaxQueueSize=%d sizeOnDisk=%lld sizeOnDiskMax=%lld\n", pThis->iQueueSize, pThis->iMaxQueueSize, + pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq); + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - waiting %dms to drain.\n", pThis->toEnq); if(glbl.GetGlobalInputTermState()) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL, discard due to FORCE_TERM.\n"); + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL, discard due to FORCE_TERM.\n"); ABORT_FINALIZE(RS_RET_FORCE_TERM); } timeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: cond timeout, dropping message!\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } - dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); + dbgoprint((obj_t*) pThis, "doEnqSingleObject: wait solved queue full condition, enqueing\n"); } } |