summaryrefslogtreecommitdiff
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2013-07-23 01:01:14 +0200
committerMichael Biebl <biebl@debian.org>2013-07-23 01:01:14 +0200
commita400789c367fd3cd8d0f9588ca2af72dcac47c9c (patch)
tree8c1caa6308f9e720e1e461da321107e6dc29f256 /runtime/queue.c
parentf5638ea86a8626b7e0e471d720359cb4e3d8cb1c (diff)
downloadrsyslog-a400789c367fd3cd8d0f9588ca2af72dcac47c9c.tar.gz
Imported Upstream version 7.4.3upstream/7.4.3
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c71
1 files changed, 55 insertions, 16 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 85b1e45..abe2be0 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -877,7 +877,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
DEFiRet;
ASSERT(pThis != NULL);
-
+
+ free(pThis->pszQIFNam);
if(pThis->tVars.disk.pWrite != NULL)
strm.Destruct(&pThis->tVars.disk.pWrite);
if(pThis->tVars.disk.pReadDeq != NULL)
@@ -1022,7 +1023,7 @@ qqueueAdd(qqueue_t *pThis, msg_t *pMsg)
if(pThis->qType != QUEUETYPE_DIRECT) {
ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
- DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
+ DBGOPRINT((obj_t*) pThis, "qqueueAdd: entry added, size now log %d, phys %d entries\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
@@ -1466,7 +1467,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
*/
if(bytesDel != 0) {
pThis->tVars.disk.sizeOnDisk -= bytesDel;
- DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk "
+ DBGOPRINT((obj_t*) pThis, "doDeleteBatch: a %lld octet file has been deleted, now %lld octets disk "
"space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk);
/* awake possibly waiting enq process */
pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
@@ -1480,7 +1481,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
- DBGPRINTF("delete batch from store, new sizes: log %d, phys %d\n",
+ DBGPRINTF("doDeleteBatch: delete batch from store, new sizes: log %d, phys %d\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
@@ -1550,13 +1551,13 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
++nEnqueued;
if(localRet != RS_RET_OK) {
- DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
+ DBGPRINTF("DeleteProcessedBatch: error %d re-enqueuing unprocessed data element - discarded\n", localRet);
}
}
msgDestruct(&pMsg);
}
- DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
+ DBGPRINTF("DeleteProcessedBatch: we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
if(nEnqueued > 0)
qqueueChkPersist(pThis, nEnqueued);
@@ -1913,8 +1914,16 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
- CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
- MsgAddRef(pWti->batch.pElem[i].pMsg)));
+ iRet = qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg));
+ if(iRet != RS_RET_OK) {
+ if(iRet == RS_RET_ERR_QUEUE_EMERGENCY) {
+ /* Queue emergency error occured */
+ DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg caught RS_RET_ERR_QUEUE_EMERGENCY, aborting loop.\n");
+ FINALIZE;
+ } else {
+ DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg item (%d) returned with error state: '%d'\n", i, iRet);
+ }
+ }
pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */
}
@@ -1922,10 +1931,38 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
pthread_setcancelstate(iCancelStateSave, NULL);
finalize_it:
+ /* Check the last return state of qqueueEnqMsg. If an error was returned, we acknowledge it only.
+ * Unless the error code is RS_RET_ERR_QUEUE_EMERGENCY, we reset the return state to RS_RET_OK.
+ * Otherwise the Caller functions would run into an infinite Loop trying to enqueue the
+ * same messages over and over again.
+ *
+ * However we do NOT overwrite positive return states like
+ * RS_RET_TERMINATE_NOW,
+ * RS_RET_NO_RUN,
+ * RS_RET_IDLE,
+ * RS_RET_TERMINATE_WHEN_IDLE
+ * These return states are important for Queue handling of the upper laying functions.
+ * RGer: Note that checking for iRet < 0 is a bit bold. In theory, positive iRet
+ * values are "OK" states, and things that the caller shall deal with. However,
+ * this has not been done so consistently. Andre convinced me that the current
+ * code is an elegant solution. However, if problems with queue workers and/or
+ * shutdown come up, this code here should be looked at suspiciously. In those
+ * cases it may work out to check all status codes explicitely, just to avoid
+ * a pitfall due to unexpected states being passed on to the caller.
+ */
+ if( iRet != RS_RET_OK &&
+ iRet != RS_RET_ERR_QUEUE_EMERGENCY &&
+ iRet < 0) {
+ DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg Resetting iRet from %d back to RS_RET_OK\n", iRet);
+ iRet = RS_RET_OK;
+ } else {
+ DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg returns with iRet %d\n", iRet);
+ }
+
/* now we are done, but potentially need to re-aquire the mutex */
if(bNeedReLock)
d_pthread_mutex_lock(pThis->mut);
- DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
+
RETiRet;
}
@@ -2491,7 +2528,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
* In any case, this was the old code (if we do the TODO):
* pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut);
*/
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message "
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: FullDelay mark reached for full delayable message "
"- blocking, queue size is %d.\n", pThis->iQueueSize);
timeoutComp(&t, 1000);
err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t);
@@ -2508,7 +2545,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
}
} else if(flowCtlType == eFLOWCTL_LIGHT_DELAY && !glbl.GetGlobalInputTermState()) {
if(pThis->iQueueSize >= pThis->iLightDlyMrk) {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light "
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: LightDelay mark reached for light "
"delayable message - blocking a bit.\n");
timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t);
@@ -2531,24 +2568,26 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull);
if(pThis->toEnq == 0 || pThis->bEnqOnly) {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n");
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - configured for immediate discarding QueueSize=%d "
+ "MaxQueueSize=%d sizeOnDisk=%lld sizeOnDiskMax=%lld\n", pThis->iQueueSize, pThis->iMaxQueueSize,
+ pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax);
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq);
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - waiting %dms to drain.\n", pThis->toEnq);
if(glbl.GetGlobalInputTermState()) {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL, discard due to FORCE_TERM.\n");
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL, discard due to FORCE_TERM.\n");
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
timeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: cond timeout, dropping message!\n");
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
- dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n");
+ dbgoprint((obj_t*) pThis, "doEnqSingleObject: wait solved queue full condition, enqueing\n");
}
}