summaryrefslogtreecommitdiff
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2010-09-30 14:07:18 +0200
committerMichael Biebl <biebl@debian.org>2010-09-30 14:07:18 +0200
commit017fb92bd811ce1083504eafda4e2080d9520a31 (patch)
tree777a2a3627f64f6a0e2bea061c0e392af7437300 /runtime/queue.c
parentdea652279a335b6d83050e5f65c45dd762901022 (diff)
downloadrsyslog-017fb92bd811ce1083504eafda4e2080d9520a31.tar.gz
Imported Upstream version 5.7.0upstream/5.7.0
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c2064
1 files changed, 1025 insertions, 1039 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 9d7a905..387c15c 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -8,7 +8,11 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static
+ * function names - this makes it really hard to read and does not provide much
+ * benefit, at least I (now) think so...
+ *
+ * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -51,71 +55,177 @@
#include "wti.h"
#include "msg.h"
#include "atomic.h"
+#include "errmsg.h"
+#include "datetime.h"
+#include "unicode-helper.h"
+#include "statsobj.h"
+#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */
#ifdef OS_SOLARIS
# include <sched.h>
-# define pthread_yield() sched_yield()
#endif
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
DEFobjCurrIf(strm)
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(datetime)
+DEFobjCurrIf(statsobj)
/* forward-definitions */
-static rsRetVal qqueueChkPersist(qqueue_t *pThis);
-static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
-static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
+static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr);
+static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
+static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
-static int qqueueIsIdleDA(qqueue_t *pThis);
-static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
+static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
+static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
+static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
+static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
#define QUEUE_NO_CHECKPOINT 0
+/***********************************************************************
+ * we need a private data structure, the "to-delete" list. As C does
+ * not provide any partly private data structures, we implement this
+ * structure right here inside the module.
+ * Note that this list must always be kept sorted based on a unique
+ * dequeue ID (which is monotonically increasing).
+ * rgerhards, 2009-05-18
+ ***********************************************************************/
+
+/* generate next uniqueue dequeue ID. Note that uniqueness is only required
+ * on a per-queue basis and while this instance runs. So a stricly monotonically
+ * increasing counter is sufficient (if enough bits are used).
+ */
+static inline qDeqID getNextDeqID(qqueue_t *pQueue)
+{
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ return pQueue->deqIDAdd++;
+}
+
+
+/* return the top element of the to-delete list or NULL, if the
+ * list is empty.
+ */
+static inline toDeleteLst_t *tdlPeek(qqueue_t *pQueue)
+{
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ return pQueue->toDeleteLst;
+}
+
+
+/* remove the top element of the to-delete list. Nothing but the
+ * element itself is destroyed. Must not be called when the list
+ * is empty.
+ */
+static inline rsRetVal tdlPop(qqueue_t *pQueue)
+{
+ toDeleteLst_t *pRemove;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ assert(pQueue->toDeleteLst != NULL);
+
+ pRemove = pQueue->toDeleteLst;
+ pQueue->toDeleteLst = pQueue->toDeleteLst->pNext;
+ free(pRemove);
+
+ RETiRet;
+}
+
+
+/* Add a new to-delete list entry. The function allocates the data
+ * structure, populates it with the values provided and links the new
+ * element into the correct place inside the list.
+ */
+static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElemDeq)
+{
+ toDeleteLst_t *pNew;
+ toDeleteLst_t *pPrev;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ assert(pQueue->toDeleteLst != NULL);
+
+ CHKmalloc(pNew = MALLOC(sizeof(toDeleteLst_t)));
+ pNew->deqID = deqID;
+ pNew->nElemDeq = nElemDeq;
+
+ /* now find right spot */
+ for( pPrev = pQueue->toDeleteLst
+ ; pPrev != NULL && deqID > pPrev->deqID
+ ; pPrev = pPrev->pNext) {
+ /*JUST SEARCH*/;
+ }
+
+ if(pPrev == NULL) {
+ pNew->pNext = pQueue->toDeleteLst;
+ pQueue->toDeleteLst = pNew;
+ } else {
+ pNew->pNext = pPrev->pNext;
+ pPrev->pNext = pNew;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
/* methods */
-/* get the overall queue size, which includes ungotten objects. Must only be called
+/* get the physical queue size. Must only be called
* while mutex is locked!
* rgerhards, 2008-01-29
*/
static inline int
-qqueueGetOverallQueueSize(qqueue_t *pThis)
+getPhysicalQueueSize(qqueue_t *pThis)
{
-#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
-BEGINfunc
-dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n",
- pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs);
-ENDfunc
-#endif
- return pThis->iQueueSize + pThis->iUngottenObjs;
+ return pThis->iQueueSize;
+}
+
+
+/* get the logical queue size (that is store size minus logically dequeued elements).
+ * Must only be called while mutex is locked!
+ * rgerhards, 2009-05-19
+ */
+static inline int
+getLogicalQueueSize(qqueue_t *pThis)
+{
+ return pThis->iQueueSize - pThis->nLogDeq;
}
+
/* This function drains the queue in cases where this needs to be done. The most probable
* reason is a HUP which needs to discard data (because the queue is configured to be lossy).
* During a shutdown, this is typically not needed, as the OS frees up ressources and does
* this much quicker than when we clean up ourselvs. -- rgerhards, 2008-10-21
* This function returns void, as it makes no sense to communicate an error back, even if
* it happens.
+ * This functions works "around" the regular deque mechanism, because it is only used to
+ * clean up (in cases where message loss is acceptable).
*/
static inline void queueDrain(qqueue_t *pThis)
{
void *pUsr;
-
ASSERT(pThis != NULL);
+ BEGINfunc
+ DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
- pThis->qDel(pThis, &pUsr);
+ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
+ pThis->qDeq(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
}
+ pThis->qDel(pThis);
}
+ ENDfunc
}
@@ -126,7 +236,8 @@ static inline void queueDrain(qqueue_t *pThis)
* this point in time. The mutex must be locked when
* ths function is called. -- rgerhards, 2008-01-25
*/
-static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
+static inline rsRetVal
+qqueueAdviseMaxWorkers(qqueue_t *pThis)
{
DEFiRet;
int iMaxWorkers;
@@ -134,96 +245,17 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
if(!pThis->bEnqOnly) {
- if(pThis->bRunsDA) {
- /* if we have not yet reached the high water mark, there is no need to start a
- * worker. -- rgerhards, 2008-01-26
- */
- if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
- }
+ if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
} else {
- if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ if(getLogicalQueueSize(pThis) == 0) {
+ iMaxWorkers = 0;
+ } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
iMaxWorkers = 1;
} else {
- iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
- }
- }
-
- RETiRet;
-}
-
-
-/* wait until we have a fully initialized DA queue. Sometimes, we need to
- * sync with it, as we expect it for some function.
- * rgerhards, 2008-02-27
- */
-static rsRetVal
-qqueueWaitDAModeInitialized(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(pThis->bRunsDA);
-
- while(pThis->bRunsDA != 2) {
- d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
- }
-
- RETiRet;
-}
-
-
-/* Destruct DA queue. This is the last part of DA-to-normal-mode
- * transistion. This is called asynchronously and some time quite a
- * while after the actual transistion. The key point is that we need to
- * do it at some later time, because we need to destruct the DA queue. That,
- * however, can not be done in a thread that has been signalled
- * This is to be called when we revert back to our own queue.
- * This function must be called with the queue mutex locked (the wti
- * class ensures this).
- * rgerhards, 2008-01-15
- */
-static rsRetVal
-qqueueTurnOffDAMode(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(pThis->bRunsDA);
-
- /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
- * to wait for its startup... -- rgerhards, 2008-01-25
- */
- qqueueWaitDAModeInitialized(pThis);
-
- /* if we need to pull any data that we still need from the (child) disk queue,
- * now would be the time to do so. At present, we do not need this, but I'd like to
- * keep that comment if future need arises.
- */
-
- /* we need to check if the DA queue is empty because the DA worker may simply have
- * terminated do to no new messages arriving. That does not, however, mean that the
- * DA queue is empty. If there is still data in that queue, we do nothing and leave
- * that for a later incarnation of this function (it will be called multiple times
- * during the lifetime of DA-mode, depending on how often the DA worker receives an
- * inactivity timeout. -- rgerhards, 2008-01-25
- */
- if(pThis->pqDA->iQueueSize == 0) {
- pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
- /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
- * this will be quick.
- */
- qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
- dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
- iRet);
- /* now we need to check if the regular queue has some messages. This may be the case
- * when it is waiting that the high water mark is reached again. If so, we need to start up
- * a regular worker. -- rgerhards, 2008-01-26
- */
- if(qqueueGetOverallQueueSize(pThis) > 0) {
- qqueueAdviseMaxWorkers(pThis);
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
}
}
@@ -246,37 +278,26 @@ qqueueChkIsDA(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
- dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
+ DBGOPRINT((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
} else {
- dbgoprint((obj_t*) pThis, "is NOT disk-assisted\n");
+ DBGOPRINT((obj_t*) pThis, "is NOT disk-assisted\n");
}
RETiRet;
}
-/* Start disk-assisted queue mode. All internal settings are changed. This is supposed
- * to be called from the DA worker, which must have been started before. The most important
- * chore of this function is to create the DA queue object. If that function fails,
- * the DA worker should return with an appropriate state, which in turn should lead to
- * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this
- * function is called, else a number of races will happen.
- * Please note that this function may be called *while* we in DA mode. This is due to the
- * fact that the DA worker calls it and the DA worker may be suspended (and restarted) due
- * to inactivity timeouts.
+/* Start disk-assisted queue mode.
* rgerhards, 2008-01-15
*/
static rsRetVal
-qqueueStartDA(qqueue_t *pThis)
+StartDA(qqueue_t *pThis)
{
DEFiRet;
uchar pszDAQName[128];
ISOBJ_TYPE_assert(pThis, qqueue);
- if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
- FINALIZE; /* ... then we are already done! */
-
/* create message queue */
CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
@@ -298,39 +319,22 @@ qqueueStartDA(qqueue_t *pThis)
CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, pThis->toQShutdown));
CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
- if(pThis->toQShutdown == 0) {
- CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
- } else {
- /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
- * have an obviously large backlog, we can't finish it in any case. So there is no point
- * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
- */
- CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1));
- }
iRet = qqueueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
- if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
+ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) {
+ errno = 0; /* else an errno is shown in errmsg! */
+ errmsg.LogError(errno, iRet, "error starting up disk queue, using pure in-memory mode");
+ pThis->bIsDA = 0; /* disable memory mode */
FINALIZE; /* something is wrong */
+ }
- /* as we are right now starting DA mode because we are so busy, it is
- * extremely unlikely that any regular worker is sleeping on empty queue. HOWEVER,
- * we want to be on the safe side, and so we awake anyone that is waiting
- * on one. So even if the scheduler plays badly with us, things should be
- * quite well. -- rgerhards, 2008-01-15
- */
- wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */
-
- pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */
- pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */
- pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
-
- dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
+ DBGOPRINT((obj_t*) pThis, "DA queue initialized, disk queue 0x%lx\n",
qqueueGetID(pThis->pqDA));
finalize_it:
@@ -338,7 +342,7 @@ finalize_it:
if(pThis->pqDA != NULL) {
qqueueDestruct(&pThis->pqDA);
}
- dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
+ DBGOPRINT((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
pThis->bIsDA = 0;
}
@@ -352,8 +356,8 @@ finalize_it:
* If this function fails (should not happen), DA mode is not turned on.
* rgerhards, 2008-01-16
*/
-static inline rsRetVal
-qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
+static rsRetVal
+InitDA(qqueue_t *pThis, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -366,82 +370,30 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
* is intentional. We assume that when we need it once, we may also need it on another
* occasion. Ressources used are quite minimal when no worker is running.
* rgerhards, 2008-01-24
+ * NOTE: this is the DA worker *pool*, not the DA queue!
*/
- if(pThis->pWtpDA == NULL) {
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
- CHKiRet(wtpConstruct (&pThis->pWtpDA));
- CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
- CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
- CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
- CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
- CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
- CHKiRet(wtpConstructFinalize (pThis->pWtpDA));
- }
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis));
+ CHKiRet(wtpConstruct (&pThis->pWtpDA));
+ CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
+ CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
+ CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
+ CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
+ CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
+ CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
+ CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
+ CHKiRet(wtpConstructFinalize (pThis->pWtpDA));
/* if we reach this point, we have a "good" DA worker pool */
- /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
- pThis->bRunsDA = 1;
- pThis->bDAEnqOnly = bEnqOnly;
-
- /* now we must now adivse the wtp that we need one worker. If none is yet active,
- * that will also start one up. If we forgot that step, everything would be stalled
- * until the next enqueue request.
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */
-
-finalize_it:
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- RETiRet;
-}
-
-
-/* check if we need to start disk assisted mode and send some signals to
- * keep it running if we are already in it. It also checks if DA mode is
- * partially initialized, in which case it waits for initialization to
- * complete.
- * rgerhards, 2008-01-14
- */
-static inline rsRetVal
-qqueueChkStrtDA(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* if we do not hit the high water mark, we have nothing to do */
- if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
- ABORT_FINALIZE(RS_RET_OK);
-
- if(pThis->bRunsDA) {
- /* then we need to signal that we are at the high water mark again. If that happens
- * on our way down the queue, that doesn't matter, because then nobody is waiting
- * on the condition variable.
- * (Remember that a DA queue stops draining the queue once it has reached the low
- * water mark and restarts it when the high water mark is reached again - this is
- * what this code here is responsible for. Please note that all workers may have been
- * terminated due to the inactivity timeout, thus we need to advise the pool that
- * we need at least one).
- */
- dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
- qqueueGetOverallQueueSize(pThis));
- qqueueAdviseMaxWorkers(pThis);
- } else {
- /* this is the case when we are currently not running in DA mode. So it is time
- * to turn it back on.
- */
- dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
- qqueueGetOverallQueueSize(pThis));
- qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
+ /* now construct the actual queue (if it does not already exist) */
+ if(pThis->pqDA == NULL) {
+ CHKiRet(StartDA(pThis));
}
finalize_it:
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
RETiRet;
}
@@ -465,10 +417,11 @@ static rsRetVal qConstructFixedArray(qqueue_t *pThis)
if(pThis->iMaxQueueSize == 0)
ABORT_FINALIZE(RS_RET_QSIZE_ZERO);
- if((pThis->tVars.farray.pBuf = malloc(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) {
+ if((pThis->tVars.farray.pBuf = MALLOC(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
+ pThis->tVars.farray.deqhead = 0;
pThis->tVars.farray.head = 0;
pThis->tVars.farray.tail = 0;
@@ -486,9 +439,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis)
ASSERT(pThis != NULL);
queueDrain(pThis); /* discard any remaining queue entries */
-
- if(pThis->tVars.farray.pBuf != NULL)
- free(pThis->tVars.farray.pBuf);
+ free(pThis->tVars.farray.pBuf);
RETiRet;
}
@@ -507,76 +458,37 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
RETiRet;
}
-static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out)
+
+static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
{
DEFiRet;
ASSERT(pThis != NULL);
- *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.head];
+ *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead];
- pThis->tVars.farray.head++;
- if (pThis->tVars.farray.head == pThis->iMaxQueueSize)
- pThis->tVars.farray.head = 0;
+ pThis->tVars.farray.deqhead++;
+ if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize)
+ pThis->tVars.farray.deqhead = 0;
RETiRet;
}
-/* -------------------- linked list -------------------- */
-
-/* first some generic functions which are also used for the unget linked list */
-
-static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
+static rsRetVal qDelFixedArray(qqueue_t *pThis)
{
DEFiRet;
- qLinkedList_t *pEntry;
- ASSERT(ppRoot != NULL);
- ASSERT(ppLast != NULL);
-
- if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
-
- pEntry->pNext = NULL;
- pEntry->pUsr = pUsr;
+ ASSERT(pThis != NULL);
- if(*ppRoot == NULL) {
- *ppRoot = *ppLast = pEntry;
- } else {
- (*ppLast)->pNext = pEntry;
- *ppLast = pEntry;
- }
+ pThis->tVars.farray.head++;
+ if (pThis->tVars.farray.head == pThis->iMaxQueueSize)
+ pThis->tVars.farray.head = 0;
-finalize_it:
RETiRet;
}
-static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
-{
- DEFiRet;
- qLinkedList_t *pEntry;
-
- ASSERT(ppRoot != NULL);
- ASSERT(ppLast != NULL);
- ASSERT(ppUsr != NULL);
- ASSERT(*ppRoot != NULL);
-
- pEntry = *ppRoot;
- *ppUsr = pEntry->pUsr;
-
- if(*ppRoot == *ppLast) {
- *ppRoot = NULL;
- *ppLast = NULL;
- } else {
- *ppRoot = pEntry->pNext;
- }
- free(pEntry);
-
- RETiRet;
-}
-/* end generic functions which are also used for the unget linked list */
+/* -------------------- linked list -------------------- */
static rsRetVal qConstructLinkedList(qqueue_t *pThis)
@@ -585,8 +497,9 @@ static rsRetVal qConstructLinkedList(qqueue_t *pThis)
ASSERT(pThis != NULL);
- pThis->tVars.linklist.pRoot = 0;
- pThis->tVars.linklist.pLast = 0;
+ pThis->tVars.linklist.pDeqRoot = NULL;
+ pThis->tVars.linklist.pDelRoot = NULL;
+ pThis->tVars.linklist.pLast = NULL;
qqueueChkIsDA(pThis);
@@ -609,54 +522,59 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
{
- DEFiRet;
-
- iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
-#if 0
qLinkedList_t *pEntry;
+ DEFiRet;
- ASSERT(pThis != NULL);
- if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
pEntry->pUsr = pUsr;
- if(pThis->tVars.linklist.pRoot == NULL) {
- pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry;
+ if(pThis->tVars.linklist.pDelRoot == NULL) {
+ pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry;
} else {
pThis->tVars.linklist.pLast->pNext = pEntry;
pThis->tVars.linklist.pLast = pEntry;
}
+ if(pThis->tVars.linklist.pDeqRoot == NULL) {
+ pThis->tVars.linklist.pDeqRoot = pEntry;
+ }
+
finalize_it:
-#endif
RETiRet;
}
-static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr)
+
+static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
{
- DEFiRet;
- iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
-#if 0
qLinkedList_t *pEntry;
+ DEFiRet;
- ASSERT(pThis != NULL);
- ASSERT(pThis->tVars.linklist.pRoot != NULL);
-
- pEntry = pThis->tVars.linklist.pRoot;
+ pEntry = pThis->tVars.linklist.pDeqRoot;
+ ISOBJ_TYPE_assert(pEntry->pUsr, msg);
*ppUsr = pEntry->pUsr;
+ pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
- if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) {
- pThis->tVars.linklist.pRoot = NULL;
- pThis->tVars.linklist.pLast = NULL;
+ RETiRet;
+}
+
+
+static rsRetVal qDelLinkedList(qqueue_t *pThis)
+{
+ qLinkedList_t *pEntry;
+ DEFiRet;
+
+ pEntry = pThis->tVars.linklist.pDelRoot;
+
+ if(pThis->tVars.linklist.pDelRoot == pThis->tVars.linklist.pLast) {
+ pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = NULL;
} else {
- pThis->tVars.linklist.pRoot = pEntry->pNext;
+ pThis->tVars.linklist.pDelRoot = pEntry->pNext;
}
+
free(pEntry);
-#endif
RETiRet;
}
@@ -676,44 +594,6 @@ finalize_it:
}
-/* This method checks if we have a QIF file for the current queue (no matter of
- * queue mode). Returns RS_RET_OK if we have a QIF file or an error status otherwise.
- * rgerhards, 2008-01-15
- */
-static rsRetVal
-qqueueHaveQIF(qqueue_t *pThis)
-{
- DEFiRet;
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
- struct stat stat_buf;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pszFilePrefix == NULL)
- ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
-
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
-
- /* check if the file exists */
- if(stat((char*) pszQIFNam, &stat_buf) == -1) {
- if(errno == ENOENT) {
- dbgoprint((obj_t*) pThis, "no .qi file found\n");
- ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
- } else {
- dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- }
- }
- /* If we reach this point, we have a .qi file */
-
-finalize_it:
- RETiRet;
-}
-
-
/* The method loads the persistent queue information.
* rgerhards, 2008-01-11
*/
@@ -725,8 +605,6 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
struct stat stat_buf;
- int iUngottenObjs;
- obj_t *pUsr;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -737,10 +615,10 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
/* check if the file exists */
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
- dbgoprint((obj_t*) pThis, "clean startup, no .qi file found\n");
+ DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n");
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
} else {
- dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
+ DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
}
@@ -756,25 +634,22 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
/* first, we try to read the property bag for ourselfs */
CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF));
- /* then the ungotten object queue */
- iUngottenObjs = pThis->iUngottenObjs;
- pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
-
- while(iUngottenObjs > 0) {
- /* fill the queue from disk */
- CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
- --iUngottenObjs; /* one less */
- }
-
- /* and now the stream objects (some order as when persisted!) */
+ /* then the stream objects (same order as when persisted!) */
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF,
+ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
+ /* create a duplicate for the read "pointer".
+ */
+
+ CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
+ CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
+ CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
+
CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pWrite));
- CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pRead));
+ CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDel));
+ CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDeq));
/* OK, we could successfully read the file, so we now can request that it be
* deleted when we are done with the persisted information.
@@ -786,7 +661,7 @@ finalize_it:
strm.Destruct(&psQIF);
if(iRet != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
+ DBGOPRINT((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
iRet);
}
@@ -826,18 +701,26 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite));
- CHKiRet(strm.Construct(&pThis->tVars.disk.pRead));
- CHKiRet(strm.SetbSync(pThis->tVars.disk.pRead, pThis->bSyncQueueFiles));
- CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
- CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
- CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
- CHKiRet(strm.SetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
- CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pRead));
+ CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq));
+ CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
+ CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
+ CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
+ CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel));
+ CHKiRet(strm.SetbSync(pThis->tVars.disk.pReadDel, pThis->bSyncQueueFiles));
+ CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000));
+ CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ));
+ CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDel));
- CHKiRet(strm.SetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
- CHKiRet(strm.SetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strm.SetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strm.SetFName(pThis->tVars.disk.pReadDeq, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strm.SetFName(pThis->tVars.disk.pReadDel, pThis->pszFilePrefix, pThis->lenFilePrefix));
}
/* now we set (and overwrite in case of a persisted restart) some parameters which
@@ -846,7 +729,8 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
* ability to read existing queue files. -- rgerhards, 2008-01-12
*/
CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
- CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
+ CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pReadDeq, pThis->iMaxFileSize));
+ CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pReadDel, pThis->iMaxFileSize));
finalize_it:
RETiRet;
@@ -858,11 +742,13 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
DEFiRet;
ASSERT(pThis != NULL);
-
- if (pThis->tVars.disk.pWrite != NULL)
+
+ if(pThis->tVars.disk.pWrite != NULL)
strm.Destruct(&pThis->tVars.disk.pWrite);
- if (pThis->tVars.disk.pRead != NULL)
- strm.Destruct(&pThis->tVars.disk.pRead);
+ if(pThis->tVars.disk.pReadDeq != NULL)
+ strm.Destruct(&pThis->tVars.disk.pReadDeq);
+ if(pThis->tVars.disk.pReadDel != NULL)
+ strm.Destruct(&pThis->tVars.disk.pReadDel);
RETiRet;
}
@@ -887,23 +773,37 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
*/
objDestruct(pUsr);
- dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
- nWriteCount, pThis->tVars.disk.sizeOnDisk);
+ DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n",
+ nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly);
+
+finalize_it:
+ RETiRet;
+}
+
+
+static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
+{
+ DEFiRet;
+
+ CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL));
finalize_it:
RETiRet;
}
-static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr)
+
+static rsRetVal qDelDisk(qqueue_t *pThis)
{
+ obj_t *pDummyObj; /* we need to deserialize it... */
DEFiRet;
int64 offsIn;
int64 offsOut;
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pRead, &offsIn));
- CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pRead, NULL, NULL));
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pRead, &offsOut));
+ CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn));
+ CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL));
+ objDestruct(pDummyObj);
+ CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut));
/* This time it is a bit tricky: we free disk space only upon file deletion. So we need
* to keep track of what we have read until we get an out-offset that is lower than the
@@ -915,7 +815,7 @@ static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr)
} else {
pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead;
pThis->tVars.disk.bytesRead = offsOut;
- dbgoprint((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
+ DBGOPRINT((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
/* awake possibly waiting enq process */
pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
}
@@ -924,6 +824,7 @@ finalize_it:
RETiRet;
}
+
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -938,8 +839,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
+ batch_t singleBatch;
+ batch_obj_t batchObj;
DEFiRet;
+ //TODO: init batchObj (states _OK and new fields -- CHECK)
ASSERT(pThis != NULL);
/* calling the consumer is quite different here than it is from a worker thread */
@@ -947,70 +851,52 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
+ * We use our knowledge about the batch_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pUsr);
+ memset(&batchObj, 0, sizeof(batch_obj_t));
+ memset(&singleBatch, 0, sizeof(batch_t));
+ batchObj.state = BATCH_STATE_RDY;
+ batchObj.pUsrp = (obj_t*) pUsr;
+ batchObj.bFilterOK = 1;
+ singleBatch.nElem = 1; /* there always is only one in direct mode */
+ singleBatch.pElem = &batchObj;
+ iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
+ objDestruct(pUsr);
RETiRet;
}
-static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
-{
- return RS_RET_OK;
-}
-
-
-/* --------------- end type-specific handlers -------------------- */
-
-
-/* unget a user pointer that has been dequeued. This functionality is especially important
- * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers
- * is maintened in memory.
- * rgerhards, 2008-01-20
+/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead
+ * otherwise incured. -- rgerhards, ~2010-06-23
*/
-static rsRetVal
-qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
{
DEFiRet;
- DEFVARS_mutexProtection;
- ISOBJ_TYPE_assert(pThis, qqueue);
- ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
- The second time I noticed it the queue was in destruction with NO worker threads
- running. The pUsr ptr was totally off and provided no clue what it may be pointing
- at (except that it looked like the static data pool). Both times, the abort happend
- inside an action queue */
+ ASSERT(pThis != NULL);
- dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
- ++pThis->iUngottenObjs; /* indicate one more */
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ /* calling the consumer is quite different here than it is from a worker thread */
+ /* we need to provide the consumer's return value back to the caller because in direct
+ * mode the consumer probably has a lot to convey (which get's lost in the other modes
+ * because they are asynchronous. But direct mode is deliberately synchronous.
+ * rgerhards, 2008-02-12
+ * We use our knowledge about the batch_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
+ */
+ iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate);
RETiRet;
}
-/* dequeues a user pointer from the ungotten queue. Pointers from there should always be
- * dequeued first.
- *
- * This function must only be called when the mutex is locked!
- *
- * rgerhards, 2008-01-29
- */
-static rsRetVal
-qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(ppUsr != NULL);
+ return RS_RET_OK;
+}
- iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
- --pThis->iUngottenObjs; /* indicate one less */
- dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
- RETiRet;
-}
+/* --------------- end type-specific handlers -------------------- */
/* generic code to add a queue entry
@@ -1028,8 +914,9 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ATOMIC_INC(pThis->iQueueSize);
- dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
+ ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
+ DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
finalize_it:
@@ -1037,12 +924,10 @@ finalize_it:
}
-/* generic code to remove a queue entry
- * rgerhards, 2008-01-29: we must first see if there is any object in the
- * ungotten list and, if so, dequeue it first.
+/* generic code to dequeue a queue entry
*/
static rsRetVal
-qqueueDel(qqueue_t *pThis, void *pUsr)
+qqueueDeq(qqueue_t *pThis, void **ppUsr)
{
DEFiRet;
@@ -1053,224 +938,249 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- if(pThis->iUngottenObjs > 0) {
- iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
- } else {
- iRet = pThis->qDel(pThis, pUsr);
- ATOMIC_DEC(pThis->iQueueSize);
- }
+ iRet = pThis->qDeq(pThis, ppUsr);
+ ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq);
- dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
- iRet, pThis->iQueueSize);
+// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
+// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
-/* This function shuts down all worker threads and waits until they
- * have terminated. If they timeout, they are cancelled. Parameters have been set
- * before this function is called so that DA queues will be fully persisted to
- * disk (if configured to do so).
- * rgerhards, 2008-01-24
- * Please note that this function shuts down BOTH the parent AND the child queue
- * in DA case. This is necessary because their timeouts are tightly coupled. Most
- * importantly, the timeouts would be applied twice (or logic be extremely
- * complex) if each would have its own shutdown. The function does not self check
- * this condition - the caller must make sure it is not called with a parent.
+/* Try to shut down regular and DA queue workers, within the queue timeout
+ * period. That means processing continues as usual. This is the expected
+ * usual case, where during shutdown those messages remaining are being
+ * processed. At this point, it is acceptable that the queue can not be
+ * fully depleted, that case is handled in the next step. During this phase,
+ * we first shut down the main queue DA worker to prevent new data to arrive
+ * at the DA queue, and then we ask the regular workers of both the Regular
+ * and DA queue to try complete processing.
+ * rgerhards, 2009-10-14
*/
-static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
+static inline rsRetVal
+tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
{
- DEFiRet;
- DEFVARS_mutexProtection;
struct timespec tTimeout;
rsRetVal iRetLocal;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
+ if(pThis->bIsDA) {
+ /* We need to lock the mutex, as otherwise we may have a race that prevents
+ * us from awaking the DA worker. */
+ d_pthread_mutex_lock(pThis->mut);
- /* we reduce the low water mark in any case. This is not absolutely necessary, but
- * it is useful because we enable DA mode at several spots below and so we do not need
- * to think about the low water mark each time.
- */
- pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
- pThis->iLowWtrMrk = 0;
+ /* tell regular queue DA worker to stop shuffling messages to DA queue... */
+ DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode for DA worker\n");
+ pThis->pqDA->bEnqOnly = 1;
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE);
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
+ DBGOPRINT((obj_t*) pThis, "awoke DA worker, told it to shut down.\n");
- /* first try to shutdown the queue within the regular shutdown period */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(qqueueGetOverallQueueSize(pThis) > 0) {
- if(pThis->bRunsDA) {
- /* We may have waited on the low water mark. As it may have changed, we
- * see if we reactivate the worker.
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
- }
+ /* also tell the DA queue worker to shut down, so that it already knows... */
+ wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN);
+ wtpAdviseMaxWorkers(pThis->pqDA->pWtpReg, 1); /* awake its lone worker */
+ DBGOPRINT((obj_t*) pThis, "awoke DA queue regular worker, told it to shut down when done.\n");
+
+ d_pthread_mutex_unlock(pThis->mut);
}
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
- * out there are no active workers - that doesn't matter: the wtp knows about that and so will
- * return immediately.
- * We do not yet care about the DA worker - that will be handled down later in the process.
- * Note that we must not request shutdown right now - that may introduce a race: if the regular queue
- * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA
- * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead
- * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way
- * (from a performance point of view). So we don't do anything right now. The DA queue will continue to
- * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything
- * by not requesting shutdown now.
- * rgerhards, 2008-01-25
- */
+
/* first calculate absolute timeout - we need the absolute value here, because we need to coordinate
* shutdown of both the regular and DA queue on *the same* timeout.
*/
timeoutComp(&tTimeout, pThis->toQShutdown);
- dbgoprint((obj_t*) pThis, "trying shutdown of regular workers\n");
+ DBGOPRINT((obj_t*) pThis, "trying shutdown of regular workers\n");
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n");
+ DBGOPRINT((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n");
} else {
- /* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */
- dbgoprint((obj_t*) pThis, "regular queue workers shut down.\n");
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(pThis->bRunsDA) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
- qqueueGetID(pThis->pqDA));
- /* we use the same absolute timeout as above, so we do not use more than the configured
- * timeout interval!
- */
- dbgoprint((obj_t*) pThis, "trying shutdown of DA workers\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue (this is OK)\n");
- }
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- }
+ DBGOPRINT((obj_t*) pThis, "regular queue workers shut down.\n");
}
- /* when we reach this point, both queues are either empty or the regular queue shutdown timeout
- * has expired. Now we need to check if we are configured to not loose messages. If so, we need
- * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also
- * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer
- * is done. This is especially important as we otherwise may interfere with queue order while the
- * DA consumer is running. -- rgerhards, 2008-01-27
- * Note: there was a note that we should not wait eternally on the DA worker if we run in
- * enqueue-only note. I have reviewed the code and think there is no need for this check. Howerver,
- * I'd like to keep this note in here should we happen to run into some related trouble.
- * rgerhards, 2008-01-28
- */
- wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
-
- /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
- if(pThis->bRunsDA)
- qqueueWaitDAModeInitialized(pThis);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- /* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
- /* switch to enqueue-only mode so that no more actions happen */
- if(pThis->bRunsDA == 0) {
- qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
+ /* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */
+ if(pThis->pqDA != NULL) {
+ DBGOPRINT((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
+ qqueueGetID(pThis->pqDA));
+ /* we use the same absolute timeout as above, so we do not use more than the configured
+ * timeout interval!
+ */
+ DBGOPRINT((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n");
+ iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ DBGOPRINT((obj_t*) pThis, "shutdown timed out on DA queue worker (this is OK)\n");
} else {
- /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
- * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
- */
- qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
+ DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n");
}
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- /* make sure we do not timeout before we are done */
- dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
- timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
- /* and run the primary queue's DA worker to drain the queue */
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
- if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
- "continuing, but results are unpredictable\n", iRetLocal);
- }
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
- /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
- * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set,
- * the queue is now empty. If regular workers are still running, and try to pull the next message,
- * they will automatically terminate as there no longer is any message left to process.
- */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(qqueueGetOverallQueueSize(pThis) > 0) {
- timeoutComp(&tTimeout, pThis->toActShutdown);
- if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
- "triggers cancellation)\n");
- } else if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
- "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
- }
- /* we need to re-aquire the mutex for the next check in this case! */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ RETiRet;
+}
+
+
+/* Try to shut down regular and DA queue workers, within the action timeout
+ * period. This aborts processing, but at the end of the current action, in
+ * a well-defined manner. During this phase, we terminate all three worker
+ * pools, including the regular queue DA worker if it not yet has terminated.
+ * Not finishing processing all messages is OK (and expected) at this stage
+ * (they may be preserved later, depending * on bSaveOnShutdown setting).
+ * rgerhards, 2009-10-14
+ */
+static rsRetVal
+tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
+{
+ struct timespec tTimeout;
+ rsRetVal iRetLocal;
+ DEFiRet;
+
+RUNLOG_STR("trying to shutdown workers within Action Timeout");
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
+
+ /* instruct workers to finish ASAP, even if still work exists */
+ DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n");
+ pThis->bEnqOnly = 1;
+ pThis->bShutdownImmediate = 1;
+ /* now DA queue */
+ if(pThis->bIsDA) {
+ pThis->pqDA->bEnqOnly = 1;
+ pThis->pqDA->bShutdownImmediate = 1;
+ }
+
+// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance...
+ /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
+ timeoutComp(&tTimeout, pThis->toActShutdown);
+ DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n");
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ DBGOPRINT((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
+ "triggers cancellation)\n");
+ } else if(iRetLocal != RS_RET_OK) {
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
+ "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
+ }
+
+ if(pThis->pqDA != NULL) {
+ /* and now the same for the DA queue */
+ DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n");
+ iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ DBGOPRINT((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable "
+ "and triggers cancellation)\n");
+ } else if(iRetLocal != RS_RET_OK) {
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
+ "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
- if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
- /* and now the same for the DA queue */
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and "
- "triggers cancellation)\n");
- } else if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
- "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
- }
+
+ /* and now we need to terminate the DA worker itself. We always grant it a 100ms timeout,
+ * which should be sufficient and usually not be required (it is expected to have finished
+ * long before while we were processing the queue timeout in shutdown phase 1).
+ * rgerhards, 2009-10-14
+ */
+ timeoutComp(&tTimeout, 100);
+ DBGOPRINT((obj_t*) pThis, "trying regular shutdown of main queue DA worker pool\n");
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
+ "(this is not good, but probably OK)\n");
} else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n");
}
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
+ RETiRet;
+}
+
+
+/* This function cancels all remaining regular workers for both the main and the DA
+ * queue.
+ * rgerhards, 2009-05-29
+ */
+static rsRetVal
+cancelWorkers(qqueue_t *pThis)
+{
+ rsRetVal iRetLocal;
+ DEFiRet;
+
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
* all timeout setting. If any worker in any queue still executes, its consumer is possibly
- * long-running and cancelling is the only way to get rid of it. Note that the
- * cancellation handler will probably re-queue a user pointer, so the queue's enqueue
- * function is still needed (what is no problem as we do not yet destroy the queue - but I
- * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
+ * long-running and cancelling is the only way to get rid of it.
*/
- dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
+ DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker "
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
-
- /* TODO: think: do we really need to do this here? Can't it happen on DA queue destruction? If we
- * disable it, we get an assertion... I think this is OK, as we need to have a certain order and
- * canceling the DA workers here ensures that order. But in any instant, we may have a look at this
- * code after we have reaced the milestone. -- rgerhards, 2008-01-27
- */
/* ... and now the DA queue, if it exists (should always be after the primary one) */
if(pThis->pqDA != NULL) {
- dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n");
+ DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n");
iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); /* returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
+
+ /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
+ * restarted later to persist the queue. But we stop it, because otherwise we get into
+ * big trouble when resetting the logical dequeue pointer. This operation can only be
+ * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
+ */
+ DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n");
+ iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
}
+ RETiRet;
+}
+
+
+/* This function shuts down all worker threads and waits until they
+ * have terminated. If they timeout, they are cancelled.
+ * rgerhards, 2008-01-24
+ * Please note that this function shuts down BOTH the parent AND the child queue
+ * in DA case. This is necessary because their timeouts are tightly coupled. Most
+ * importantly, the timeouts would be applied twice (or logic be extremely
+ * complex) if each would have its own shutdown. The function does not self check
+ * this condition - the caller must make sure it is not called with a parent.
+ * rgerhards, 2009-05-26: we do NO longer persist the queue here if bSaveOnShutdown
+ * is set. This must be handled by the caller. Not doing that cleans up the queue
+ * shutdown considerably. Also, older engines had a potential hang condition when
+ * the DA queue was already started and the DA worker configured for infinite
+ * retries and the action was during retry processing. This was a design issue,
+ * which is solved as of now. Note that the shutdown now may take a little bit
+ * longer, because we no longer can persist the queue in parallel to waiting
+ * on worker timeouts.
+ */
+static rsRetVal
+ShutdownWorkers(qqueue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
+
+ DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
+
+ CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
+
+ if(getPhysicalQueueSize(pThis) > 0) {
+ CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis));
+ }
+
+ CHKiRet(cancelWorkers(pThis));
+
/* ... finally ... all worker threads have terminated :-)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
- * may still be running.
+ * may still be running. Note that the main queue's DA worker may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis));
+ DBGOPRINT((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n",
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+finalize_it:
RETiRet;
}
@@ -1282,7 +1192,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1291,9 +1201,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
ASSERT(pConsumer != NULL);
ASSERT(iWorkerThreads >= 0);
- if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc(pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t)));
/* we have an object, so let's fill the properties */
objConstructSetObjInfo(pThis);
@@ -1304,13 +1212,15 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
- pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir);
+ pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
+ pThis->nLogDeq = 0;
pThis->iMaxQueueSize = iMaxQueueSize;
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+ pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1321,19 +1231,25 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qConstruct = qConstructFixedArray;
pThis->qDestruct = qDestructFixedArray;
pThis->qAdd = qAddFixedArray;
+ pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList;
+ pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
pThis->qDestruct = qDestructDisk;
pThis->qAdd = qAddDisk;
+ pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
break;
@@ -1342,49 +1258,23 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qDestruct = qDestructDirect;
pThis->qAdd = qAddDirect;
pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
break;
}
-finalize_it:
- OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
- RETiRet;
-}
-
-
-/* cancellation cleanup handler for queueWorker ()
- * Updates admin structure and frees ressources.
- * Params:
- * arg1 - user pointer (in this case a qqueue_t)
- * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
- * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists!
- * rgerhards, 2008-01-16
- */
-static rsRetVal
-qqueueConsumerCancelCleanup(void *arg1, void *arg2)
-{
- DEFiRet;
-
- qqueue_t *pThis = (qqueue_t*) arg1;
- obj_t *pUsr = (obj_t*) arg2;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
- if(pUsr != NULL) {
- /* make sure the data element is not lost */
- dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
- }
-
finalize_it:
+ OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
RETiRet;
}
-
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
- * Note: cached copies of iQueueSize and bRunsDA are provided so that no mutex locks are required.
+ * Note: cached copies of iQueueSize is provided so that no mutex locks are required.
* The caller must have obtained them while the mutex was locked. Of course, these values may no
* longer be current, but that is OK for the discard check. At worst, the message is either processed
* or discarded when it should not have been. As discarding is in itself somewhat racy and erratic,
@@ -1394,7 +1284,7 @@ finalize_it:
* the return state!
* rgerhards, 2008-01-24
*/
-static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr)
{
DEFiRet;
rsRetVal iRetLocal;
@@ -1403,15 +1293,15 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, voi
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_assert(pUsr);
- if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
+ if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
- dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
+ DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
iQueueSize, iSeverity);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
- dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg "
+ DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg "
"(iRet: %d, severity %d)\n", iQueueSize, iRetLocal, iSeverity);
}
}
@@ -1421,38 +1311,191 @@ finalize_it:
}
-/* dequeue the queued object for the queue consumers.
- * rgerhards, 2008-10-21
+/* Finally remove n elements from the queue store.
*/
-static rsRetVal
-qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+static inline rsRetVal
+DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
+{
+ int i;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ /* now send delete request to storage driver */
+ for(i = 0 ; i < nElem ; ++i) {
+ pThis->qDel(pThis);
+ }
+
+ /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
+ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
+ ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
+dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ ++pThis->deqIDDel; /* one more batch dequeued */
+
+ RETiRet;
+}
+
+
+/* remove messages from the physical queue store that are fully processed. This is
+ * controlled via the to-delete list.
+ */
+static inline rsRetVal
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
{
+ toDeleteLst_t *pTdl;
+ qDeqID deqIDDel;
DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pBatch != NULL);
+
+ pTdl = tdlPeek(pThis); /* get current head element */
+ if(pTdl == NULL) { /* to-delete list empty */
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
+ } else if(pBatch->deqID == pThis->deqIDDel) {
+ deqIDDel = pThis->deqIDDel;
+ pTdl = tdlPeek(pThis);
+ while(pTdl != NULL && deqIDDel == pTdl->deqID) {
+ DoDeleteBatchFromQStore(pThis, pTdl->nElemDeq);
+ tdlPop(pThis);
+ ++deqIDDel;
+ pTdl = tdlPeek(pThis);
+ }
+ /* old entries deleted, now delete current ones... */
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
+ } else {
+ /* can not delete, insert into to-delete list */
+ dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Delete a batch of processed user objects from the queue, which includes
+ * destructing the objects themself. Any entries not marked as finally
+ * processed are enqueued again. The new enqueue is necessary because we have a
+ * rgerhards, 2009-05-13
+ */
+static inline rsRetVal
+DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
+{
+ int i;
void *pUsr;
+ int nEnqueued = 0;
+ rsRetVal localRet;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pBatch != NULL);
+
+ for(i = 0 ; i < pBatch->nElem ; ++i) {
+ pUsr = pBatch->pElem[i].pUsrp;
+ if( pBatch->pElem[i].state == BATCH_STATE_RDY
+ || pBatch->pElem[i].state == BATCH_STATE_SUB) {
+dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state);
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*) pUsr));
+ ++nEnqueued;
+ if(localRet != RS_RET_OK) {
+ DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
+ }
+ }
+ objDestruct(pUsr);
+ }
+
+ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
+
+ if(nEnqueued > 0)
+ qqueueChkPersist(pThis, nEnqueued);
+
+ iRet = DeleteBatchFromQStore(pThis, pBatch);
+
+ pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ // TODO: more fine init, new fields! 2010-06-14
+
+ RETiRet;
+}
+
+
+/* dequeue as many user pointers as are available, until we hit the configured
+ * upper limit of pointers. Note that this function also deletes all processed
+ * objects from the previous batch. However, it is perfectly valid that the
+ * previous batch contained NO objects at all. For example, this happens
+ * immediately after system startup or when a queue was exhausted and the queue
+ * worker needed to wait for new data.
+ * This must only be called when the queue mutex is LOOKED, otherwise serious
+ * malfunction will happen.
+ */
+static inline rsRetVal
+DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSize)
+{
+ int nDequeued;
+ int nDiscarded;
+ int nDeleted;
int iQueueSize;
- int bRunsDA; /* cache for early mutex release */
-
- /* dequeue element (still protected from mutex) */
- iRet = qqueueDel(pThis, &pUsr);
- qqueueChkPersist(pThis);
- iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
- bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
-
- /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
- * if we could successfully obtain a user pointer. Otherwise, we would bring the
- * cancel cleanup handler into big troubles (and we did ;)). Note that we can
- * NOT set the variable further below, as this may lead to an object leak. We
- * may get cancelled before we reach that part of the code, so the only
- * solution is to do it here. -- rgerhards, 2008-02-27
- */
- if(iRet == RS_RET_OK) {
- pWti->pUsrp = pUsr;
+ void *pUsr;
+ rsRetVal localRet;
+ DEFiRet;
+
+ nDeleted = pWti->batch.nElemDeq;
+ DeleteProcessedBatch(pThis, &pWti->batch);
+
+ nDequeued = nDiscarded = 0;
+ while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
+ CHKiRet(qqueueDeq(pThis, &pUsr));
+
+ /* check if we should discard this element */
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr);
+ if(localRet == RS_RET_QUEUE_FULL) {
+ ++nDiscarded;
+ continue;
+ } else if(localRet != RS_RET_OK) {
+ ABORT_FINALIZE(localRet);
+ }
+
+ /* all well, use this element */
+ pWti->batch.pElem[nDequeued].pUsrp = pUsr;
+ pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance
+ ++nDequeued;
}
+ /* it is sufficient to persist only when the bulk of work is done */
+ qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
+
+ pWti->batch.nElem = nDequeued;
+ pWti->batch.nElemDeq = nDequeued + nDiscarded;
+ pWti->batch.deqID = getNextDeqID(pThis);
+ *piRemainingQueueSize = iQueueSize;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* dequeue the queued object for the queue consumers.
+ * rgerhards, 2008-10-21
+ * I made a radical change - we now dequeue multiple elements, and store these objects in
+ * an array of user pointers. We expect that this increases performance.
+ * rgerhards, 2009-04-22
+ */
+static rsRetVal
+DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
+{
+ DEFiRet;
+ int iQueueSize = 0; /* keep the compiler happy... */
+
+ /* dequeue element batch (still protected from mutex) */
+ iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize);
+
/* awake some flow-controlled sources if we can do this right now */
/* TODO: this could be done better from a performance point of view -- do it only if
* we have someone waiting for the condition (or only when we hit the watermark right
* on the nail [exact value]) -- rgerhards, 2008-03-14
+ * now that we dequeue batches of pointers, this is much less an issue...
+ * rgerhards, 2009-04-22
*/
if(iQueueSize < pThis->iFullDlyMrk / 2) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
@@ -1462,37 +1505,15 @@ qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- /* rgerhards, 2008-09-30: I reversed the order of cond_signal und mutex_unlock
- * as of the pthreads recommendation on predictable scheduling behaviour. I don't see
- * any problems caused by this, but I add this comment in case some will be seen
- * in the next time.
- */
+ // TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
- /* do actual processing (the lengthy part, runs in parallel)
- * If we had a problem while dequeing, we do not call the consumer,
- * but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. However, this is really not a good thing.
- * rgerhards, 2008-01-03
- */
- if(iRet != RS_RET_OK)
- FINALIZE;
-
- /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue.
- * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
- * provide real-time creation of spool files.
- * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
- */
- CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
-
-finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
- dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
+ DBGOPRINT((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
"may happen\n", iRet);
}
+
RETiRet;
}
@@ -1535,7 +1556,7 @@ finalize_it:
* but you get the idea from the code above.
*/
static rsRetVal
-qqueueRateLimiter(qqueue_t *pThis)
+RateLimiter(qqueue_t *pThis)
{
DEFiRet;
int iDelay;
@@ -1548,7 +1569,7 @@ qqueueRateLimiter(qqueue_t *pThis)
iDelay = 0;
if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
/* time calls are expensive, so only do them when needed */
- time(&tCurr);
+ datetime.GetTime(&tCurr);
localtime_r(&tCurr, &m);
iHrCurr = m.tm_hour;
@@ -1584,7 +1605,7 @@ qqueueRateLimiter(qqueue_t *pThis)
}
if(iDelay > 0) {
- dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
+ DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
srSleep(iDelay, 0);
}
@@ -1592,37 +1613,105 @@ qqueueRateLimiter(qqueue_t *pThis)
}
+/* This dequeues the next batch. Note that this function must not be
+ * cancelled, else it will leave back an inconsistent state.
+ * rgerhards, 2009-05-20
+ */
+static inline rsRetVal
+DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ISOBJ_TYPE_assert(pWti, wti);
+
+ CHKiRet(DequeueConsumable(pThis, pWti));
+
+ if(pWti->batch.nElem == 0)
+ ABORT_FINALIZE(RS_RET_IDLE);
+
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This is called when a batch is processed and the worker does not
+ * ask for another batch (e.g. because it is to be terminated)
+ * Note that we must not be terminated while we delete a processed
+ * batch. Otherwise, we may not complete it, and then the cancel
+ * handler also tries to delete the batch. But then it finds some of
+ * the messages already destructed. This was a bug we have seen, especially
+ * with disk mode, where a delete takes rather long. Anyhow, the coneptual
+ * problem exists in all queue modes.
+ * rgerhards, 2009-05-27
+ */
+static rsRetVal
+batchProcessed(qqueue_t *pThis, wti_t *pWti)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ISOBJ_TYPE_assert(pWti, wti);
+
+ int iCancelStateSave;
+ /* at this spot, we must not be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ DeleteProcessedBatch(pThis, &pWti->batch);
+ qqueueChkPersist(pThis, pWti->batch.nElemDeq);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+
+ RETiRet;
+}
+
/* This is the queue consumer in the regular (non-DA) case. It is
* protected by the queue mutex, but MUST release it as soon as possible.
* rgerhards, 2008-01-21
*/
static rsRetVal
-qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerReg(qqueue_t *pThis, wti_t *pWti)
{
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
+ CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
+ /* at this spot, we may be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
+
+ CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
*/
+//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that...
if(pThis->iDeqSlowdown) {
- dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
+ DBGOPRINT((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
pThis->iDeqSlowdown);
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
}
+ /* but now cancellation is no longer permitted */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+
+ /* now we are done, but need to re-aquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
+
finalize_it:
+ dbgprintf("regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet,
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
+/* This is a special consumer to feed the disk-queue in disk-assisted mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
* to the disk queue. The disk queue will then call the actual consumer from
@@ -1632,151 +1721,92 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static rsRetVal
-qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerDA(qqueue_t *pThis, wti_t *pWti)
{
+ int i;
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
+ CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
+ /* at this spot, we may be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
+
+ /* iterate over returned results and enqueue them in DA queue */
+ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
+ /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
+ * the message. So far, we simply assume we always have msg_t, what currently is always the case.
+ * rgerhards, 2009-05-28
+ */
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
+ }
+
+ /* but now cancellation is no longer permitted */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+
+ /* now we are done, but need to re-aquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
finalize_it:
- dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
+ DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
}
/* must only be called when the queue mutex is locked, else results
* are not stable!
- * If we are a child, we have done our duty when the queue is empty. In that case,
- * we can terminate.
- * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue
*/
-static int
+static rsRetVal
qqueueChkStopWrkrDA(qqueue_t *pThis)
{
- /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
- * until we have done so.
- */
- int bStopWrkr;
-
- BEGINfunc
+ DEFiRet;
if(pThis->bEnqOnly) {
- bStopWrkr = 1;
- } else {
- if(pThis->bRunsDA == 2) {
- ASSERT(pThis->pqDA != NULL);
- if( pThis->pqDA->bEnqOnly
- && pThis->pqDA->sizeOnDiskMax > 0
- && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
- /* this queue can never grow, so we can give up... */
- bStopWrkr = 1;
- } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
- bStopWrkr = 1;
- } else {
- bStopWrkr = 0;
- }
- } else {
- bStopWrkr = 1;
- }
+ iRet = RS_RET_TERMINATE_WHEN_IDLE;
}
- ENDfunc
- return bStopWrkr;
+ RETiRet;
}
/* must only be called when the queue mutex is locked, else results
* are not stable!
* If we are a child, we have done our duty when the queue is empty. In that case,
- * we can terminate.
- * Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue
- */
-static int
-qqueueChkStopWrkrReg(qqueue_t *pThis)
-{
- return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
-}
-
-
-/* must only be called when the queue mutex is locked, else results
- * are not stable! DA queue version
- */
-static int
-qqueueIsIdleDA(qqueue_t *pThis)
-{
- /* remember: iQueueSize is the DA queue size, not the main queue! */
- /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
-}
-/* must only be called when the queue mutex is locked, else results
- * are not stable! Regular queue version
- */
-static int
-qqueueIsIdleReg(qqueue_t *pThis)
-{
-#if 0 /* enable for performance testing */
- int ret;
- ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
- if(ret) fprintf(stderr, "queue is idle\n");
- return ret;
-#else
- /* regular code! */
- return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
-#endif
-}
-
-
-/* This function is called when a worker thread for the regular queue is shut down.
- * If we are the primary queue, this is not really interesting to us. If, however,
- * we are the DA (child) queue, that means the DA queue is empty. In that case, we
- * need to signal the parent queue's DA worker, so that it can terminate DA mode.
- * rgerhards, 2008-01-26
- * rgerhards, 2008-02-27: HOWEVER, in a shutdown condition, it may be that the parent's worker thread pool
- * has already been terminated and destructed. This *is* a legal condition and happens
- * from time to time in practice. So we need to signal only if there still is a
- * parent DA worker queue. Please keep in mind that the the parent's DA worker
- * pool is DIFFERENT from our (DA queue) regular worker pool. So when the parent's
- * pWtpDA is destructed, there can still be some of our (DAq/wtp) threads be running.
- * I am telling this, because I, too, always get confused by those...
+ * we can terminate. Version for the regular worker thread.
*/
static rsRetVal
-qqueueRegOnWrkrShutdown(qqueue_t *pThis)
+ChkStopWrkrReg(qqueue_t *pThis)
{
DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pqParent != NULL) {
- pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
- if(pThis->pqParent->pWtpDA != NULL) { /* see comment in function header from 2008-02-27 */
- wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
- }
+ if(pThis->bEnqOnly) {
+ iRet = RS_RET_TERMINATE_NOW;
+ } else if(pThis->pqParent != NULL) {
+ iRet = RS_RET_TERMINATE_WHEN_IDLE;
}
RETiRet;
}
-/* The following function is called when a regular queue worker starts up. We need this
- * hook to indicate in the parent queue (if we are a child) that we are not done yet.
+/* return the configured "deq max at once" interval
+ * rgerhards, 2009-04-22
*/
static rsRetVal
-qqueueRegOnWrkrStartup(qqueue_t *pThis)
+GetDeqBatchSize(qqueue_t *pThis, int *pVal)
{
DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pqParent != NULL) {
- pThis->pqParent->bChildIsDone = 0;
- }
-
+ assert(pVal != NULL);
+ *pVal = pThis->iDeqBatchSize;
+if(pThis->pqParent != NULL) // TODO: check why we actually do this!
+ *pVal = 16;
RETiRet;
}
@@ -1784,12 +1814,12 @@ qqueueRegOnWrkrStartup(qqueue_t *pThis)
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
-rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
+rsRetVal
+qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
- rsRetVal iRetLocal;
- int bInitialized = 0; /* is queue already initialized? */
uchar pszBuf[64];
+ uchar *qName;
size_t lenBuf;
ASSERT(pThis != NULL);
@@ -1802,11 +1832,11 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
* influenced by properties which might have been set after queueConstruct ()
*/
if(pThis->pqParent == NULL) {
- pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
+ pThis->mut = (pthread_mutex_t *) MALLOC (sizeof (pthread_mutex_t));
pthread_mutex_init(pThis->mut, NULL);
} else {
/* child queue, we need to use parent's mutex */
- dbgoprint((obj_t*) pThis, "I am a child\n");
+ DBGOPRINT((obj_t*) pThis, "I am a child\n");
pThis->mut = pThis->pqParent->mut;
}
@@ -1820,28 +1850,26 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
- dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, "
- "full delay %d, light delay %d starting\n",
+ DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, "
+ "full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
- qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1,
- pThis->iFullDlyMrk, pThis->iLightDlyMrk);
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
+ pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk,
+ pThis->iDeqBatchSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
- /* create worker thread pools for regular operation. The DA pool is created on an as-needed
- * basis, which potentially means never under most circumstances.
+ /* create worker thread pools for regular and DA operation.
*/
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
+ CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1849,27 +1877,11 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis));
CHKiRet(wtpConstructFinalize (pThis->pWtpReg));
- /* initialize worker thread instances */
- if(pThis->bIsDA) {
- /* If we are disk-assisted, we need to check if there is a QIF file
- * which we need to load. -- rgerhards, 2008-01-15
- */
- iRetLocal = qqueueHaveQIF(pThis);
- if(iRetLocal == RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
- qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
- bInitialized = 1; /* we are done */
- } else {
- /* TODO: use logerror? -- rgerhards, 2008-01-16 */
- dbgoprint((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. "
- "Some data may be lost\n", iRetLocal);
- }
- }
+ /* set up DA system if we have a disk-assisted queue */
+ if(pThis->bIsDA)
+ InitDA(pThis, LOCK_MUTEX); /* initiate DA mode */
- if(!bInitialized) {
- dbgoprint((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA "
- "queue itself!)\n");
- }
+ DBGOPRINT((obj_t*) pThis, "queue finished initialization\n");
/* if the queue already contains data, we need to start the correct number of worker threads. This can be
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
@@ -1877,6 +1889,27 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
qqueueAdviseMaxWorkers(pThis);
pThis->bQueueStarted = 1;
+ /* support statistics gathering */
+ qName = obj.GetName((obj_t*)pThis);
+ CHKiRet(statsobj.Construct(&pThis->statsobj));
+ CHKiRet(statsobj.SetName(pThis->statsobj, qName));
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("size"),
+ ctrType_Int, &pThis->iQueueSize));
+
+ STATSCOUNTER_INIT(pThis->ctrEnqueued, pThis->mutCtrEnqueued);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("enqueued"),
+ ctrType_IntCtr, &pThis->ctrEnqueued));
+
+ STATSCOUNTER_INIT(pThis->ctrFull, pThis->mutCtrFull);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"),
+ ctrType_IntCtr, &pThis->ctrFull));
+
+ pThis->ctrMaxqsize = 0;
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"),
+ ctrType_Int, &pThis->ctrMaxqsize));
+
+ CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
+
finalize_it:
RETiRet;
}
@@ -1895,12 +1928,11 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
strm_t *psQIF = NULL; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
- obj_t *pUsr;
ASSERT(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK) {
- if(qqueueGetOverallQueueSize(pThis) > 0) {
+ if(getPhysicalQueueSize(pThis) > 0) {
/* This error code is OK, but we will probably not implement this any time
* The reason is that persistence happens via DA queues. But I would like to
* leave the code as is, as we so have a hook in case we need one.
@@ -1911,20 +1943,20 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
FINALIZE; /* if the queue is empty, we are happy and done... */
}
- dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis));
+ DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis));
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
- if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) {
+ if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
pThis->bNeedDelQIF = 0;
}
/* indicate spool file needs to be deleted */
- if (pThis->tVars.disk.pRead != NULL)
- CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
+ if(pThis->tVars.disk.pReadDel != NULL) /* may be NULL if we had a startup failure! */
+ CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
FINALIZE; /* nothing left to do, so be happy */
}
@@ -1943,31 +1975,19 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
*/
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
- objSerializeSCALAR(psQIF, iUngottenObjs, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(obj.EndSerialize(psQIF));
- /* now we must persist all objects on the ungotten queue - they can not go to
- * to the regular files. -- rgerhards, 2008-01-29
- */
- while(pThis->iUngottenObjs > 0) {
- CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
- CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
- objDestruct(pUsr);
- }
-
/* now persist the stream info */
- if (pThis->tVars.disk.pWrite != NULL)
- CHKiRet(strm.Serialize(pThis->tVars.disk.pWrite, psQIF));
- if (pThis->tVars.disk.pRead != NULL)
- CHKiRet(strm.Serialize(pThis->tVars.disk.pRead, psQIF));
+ CHKiRet(strm.Serialize(pThis->tVars.disk.pWrite, psQIF));
+ CHKiRet(strm.Serialize(pThis->tVars.disk.pReadDel, psQIF));
/* tell the input file object that it must not delete the file on close if the queue
* is non-empty - but only if we are not during a simple checkpoint
*/
- if(bIsCheckpoint != QUEUE_CHECKPOINT && pThis->tVars.disk.pRead != NULL) {
- CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+ if(bIsCheckpoint != QUEUE_CHECKPOINT) {
+ CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 0));
}
/* we have persisted the queue object. So whenever it comes to an empty queue,
@@ -1984,36 +2004,86 @@ finalize_it:
/* check if we need to persist the current queue info. If an
- * error occurs, thus should be ignored by caller (but we still
+ * error occurs, this should be ignored by caller (but we still
* abide to our regular call interface)...
* rgerhards, 2008-01-13
+ * nUpdates is the number of updates since the last call to this function.
+ * It may be > 1 due to batches. -- rgerhards, 2009-05-12
*/
-static rsRetVal qqueueChkPersist(qqueue_t *pThis)
+static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
{
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(nUpdates >= 0);
- if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
+ if(nUpdates == 0)
+ FINALIZE;
+
+ pThis->iUpdsSincePersist += nUpdates;
+ if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
qqueuePersist(pThis, QUEUE_CHECKPOINT);
pThis->iUpdsSincePersist = 0;
}
- return RS_RET_OK;
+finalize_it:
+ RETiRet;
+}
+
+
+/* persist a queue with all data elements to disk - this is used to handle
+ * bSaveOnShutdown. We utilize the DA worker to do this. This must only
+ * be called after all workers have been shut down and if bSaveOnShutdown
+ * is actually set. Note that this function may potentially run long,
+ * depending on the queue configuration (e.g. store on remote machine).
+ * rgerhards, 2009-05-26
+ */
+static inline rsRetVal
+DoSaveOnShutdown(qqueue_t *pThis)
+{
+ struct timespec tTimeout;
+ rsRetVal iRetLocal;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ /* we reduce the low water mark, otherwise the DA worker would terminate when
+ * it is reached.
+ */
+ DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown set, restarting DA worker...\n");
+ pThis->bShutdownImmediate = 0; /* would termiante the DA worker! */
+ pThis->iLowWtrMrk = 0;
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN); /* shutdown worker (only) when done (was _IMMEDIATE!) */
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* restart DA worker */
+
+ DBGOPRINT((obj_t*) pThis, "waiting for DA worker to terminate...\n");
+ timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ /* and run the primary queue's DA worker to drain the queue */
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
+ DBGOPRINT((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
+ iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ if(iRetLocal != RS_RET_OK) {
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
+ "continuing, but results are unpredictable\n", iRetLocal);
+ }
+
+ RETiRet;
}
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue)
- pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
-
- /* shut down all workers (handles *all* of the persistence logic)
- * See function head comment of queueShutdownWorkers () on why we don't call it
- * We also do not need to shutdown workers when we are in enqueue-only mode or we are a
+ /* shut down all workers
+ * We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)
* with a child! -- rgerhards, 2008-01-28
*/
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
- qqueueShutdownWorkers(pThis);
+ ShutdownWorkers(pThis);
+
+ if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
+ CHKiRet(DoSaveOnShutdown(pThis));
+ }
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
@@ -2049,7 +2119,7 @@ CODESTARTobjDestruct(qqueue)
* if need arises (what I doubt...) -- rgerhards, 2008-01-25
*/
CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
- dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
+ DBGOPRINT((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
}
/* finally, clean up some simple things... */
@@ -2065,14 +2135,18 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
+
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
- if(pThis->pszFilePrefix != NULL)
- free(pThis->pszFilePrefix);
+ free(pThis->pszFilePrefix);
+ free(pThis->pszSpoolDir);
- if(pThis->pszSpoolDir != NULL)
- free(pThis->pszSpoolDir);
+ /* some queues do not provide stats and thus have no statsobj! */
+ if(pThis->statsobj != NULL)
+ statsobj.Destruct(&pThis->statsobj);
ENDobjDestruct(qqueue)
@@ -2086,13 +2160,13 @@ qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
- if(pThis->pszFilePrefix != NULL)
- free(pThis->pszFilePrefix);
+ free(pThis->pszFilePrefix);
+ pThis->pszFilePrefix = NULL;
if(pszPrefix == NULL) /* just unset the prefix! */
ABORT_FINALIZE(RS_RET_OK);
- if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL)
+ if((pThis->pszFilePrefix = MALLOC(sizeof(uchar) * iLenPrefix + 1)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1);
pThis->lenFilePrefix = iLenPrefix;
@@ -2122,111 +2196,7 @@ finalize_it:
}
-/* enqueue a new user data element
- * Enqueues the new element and awakes worker thread.
- */
-rsRetVal
-qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
-{
- DEFiRet;
- int iCancelStateSave;
- struct timespec t;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* first check if we need to discard this message (which will cause CHKiRet() to exit)
- * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize
- * and bRunsDA parameters may not reflect the correct settings here, but they are
- * "good enough" in the sense that they can be used to drive the decision. Valgrind's
- * threading tools may point this access to be an error, but this is done
- * intentional. I do not see this causes problems to us.
- */
- CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
-
- /* Please note that this function is not cancel-safe and consequently
- * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
- * during its execution. If that is not done, race conditions occur if the
- * thread is canceled (most important use case is input module termination).
- * rgerhards, 2008-01-08
- */
- if(pThis->qType != QUEUETYPE_DIRECT) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
- }
-
- /* then check if we need to add an assistance disk queue */
- if(pThis->bIsDA)
- CHKiRet(qqueueChkStrtDA(pThis));
-
- /* handle flow control
- * There are two different flow control mechanisms: basic and advanced flow control.
- * Basic flow control has always been implemented and protects the queue structures
- * in that it makes sure no more data is enqueued than the queue is configured to
- * support. Enhanced flow control is being added today. There are some sources which
- * can easily be stopped, e.g. a file reader. This is the case because it is unlikely
- * that blocking those sources will have negative effects (after all, the file is
- * continued to be written). Other sources can somewhat be blocked (e.g. the kernel
- * log reader or the local log stream reader): in general, nothing is lost if messages
- * from these sources are not picked up immediately. HOWEVER, they can not block for
- * an extended period of time, as this either causes message loss or - even worse - some
- * other bad effects (e.g. unresponsive system in respect to the main system log socket).
- * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is
- * a prime example. If a UDP message is not received, it is simply lost. So we can't
- * do anything against UDP sockets that come in too fast. The core idea of advanced
- * flow control is that we take into account the different natures of the sources and
- * select flow control mechanisms that fit these needs. This also means, in the end
- * result, that non-blockable sources like UDP syslog receive priority in the system.
- * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14
- */
- if(flowCtlType == eFLOWCTL_FULL_DELAY) {
- while(pThis->iQueueSize >= pThis->iFullDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
- pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
- }
- } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
- if(pThis->iQueueSize >= pThis->iLightDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
- timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
- pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
- }
- }
-
- /* from our regular flow control settings, we are now ready to enqueue the object.
- * However, we now need to do a check if the queue permits to add more data. If that
- * is not the case, basic flow control enters the field, which means we wait for
- * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14
- */
- while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
- || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
- && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
- timeoutComp(&t, pThis->toEnq);
- if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
- objDestruct(pUsr);
- ABORT_FINALIZE(RS_RET_QUEUE_FULL);
- }
- }
-
- /* and finally enqueue the message */
- CHKiRet(qqueueAdd(pThis, pUsr));
- qqueueChkPersist(pThis);
-
-finalize_it:
- if(pThis->qType != QUEUETYPE_DIRECT) {
- /* make sure at least one worker is running. */
- qqueueAdviseMaxWorkers(pThis);
- /* and release the mutex */
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
- }
-
- RETiRet;
-}
-
-
-/* enqueue a single data object. This currently is a helper to qqueueMultiEnqObj.
+/* enqueue a single data object.
* Note that the queue mutex MUST already be locked when this function is called.
* rgerhards, 2009-06-16
*/
@@ -2236,14 +2206,11 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
DEFiRet;
struct timespec t;
+ STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued);
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
- CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
- /* then check if we need to add an assistance disk queue */
- if(pThis->bIsDA)
- CHKiRet(qqueueChkStrtDA(pThis));
-
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
* Basic flow control has always been implemented and protects the queue structures
@@ -2266,12 +2233,12 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
*/
if(flowCtlType == eFLOWCTL_FULL_DELAY) {
while(pThis->iQueueSize >= pThis->iFullDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
+ DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
}
} else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
if(pThis->iQueueSize >= pThis->iLightDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
+ DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
}
@@ -2285,10 +2252,12 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
|| (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
+ DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
timeoutComp(&t, pThis->toEnq);
+ STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull);
+// TODO : handle enqOnly => discard!
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
@@ -2296,12 +2265,13 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* and finally enqueue the message */
CHKiRet(qqueueAdd(pThis, pUsr));
- qqueueChkPersist(pThis); // TODO: optimize, do in outer function! (but we need parts from v5?)
+ STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize);
finalize_it:
RETiRet;
}
+/* ------------------------------ multi-enqueue functions ------------------------------ */
/* enqueue multiple user data elements at once. The aim is to provide a faster interface
* for object submission. Uses the multi_submit_t helper object.
* Please note that this function is not cancel-safe and consequently
@@ -2309,9 +2279,12 @@ finalize_it:
* during its execution. If that is not done, race conditions occur if the
* thread is canceled (most important use case is input module termination).
* rgerhards, 2009-06-16
+ * Note: there now exists multiple different functions implementing specially
+ * optimized algorithms for different config cases. -- rgerhards, 2010-06-09
*/
-rsRetVal
-qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
+/* now the function for all modes but direct */
+static rsRetVal
+qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int iCancelStateSave;
int i;
@@ -2320,78 +2293,89 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pMultiSub != NULL);
- if(pThis->qType != QUEUETYPE_DIRECT) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
- }
-
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
-dbgprintf("queueMultiEnq: %d\n", i);
CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
}
+ qqueueChkPersist(pThis, pMultiSub->nElem);
finalize_it:
- if(pThis->qType != QUEUETYPE_DIRECT) {
- /* make sure at least one worker is running. */
- qqueueAdviseMaxWorkers(pThis);
- /* and release the mutex */
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- dbgoprint((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+ /* make sure at least one worker is running. */
+ qqueueAdviseMaxWorkers(pThis);
+ /* and release the mutex */
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+
+ RETiRet;
+}
+
+/* now, the same function, but for direct mode */
+static rsRetVal
+qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
+{
+ int i;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pMultiSub != NULL);
+
+ for(i = 0 ; i < pMultiSub->nElem ; ++i) {
+ CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
}
+finalize_it:
RETiRet;
}
+/* ------------------------------ END multi-enqueue functions ------------------------------ */
-/* set queue mode to enqueue only or not
- * There is one subtle issue: this method may be called during queue
- * construction or while it is running. In the former case, the queue
- * mutex does not yet exist (it is NULL), while in the later case it
- * must be locked. The function detects the state and operates as
- * required.
- * rgerhards, 2008-01-16
+/* enqueue a new user data element in direct mode
+ * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better
+ * code later on (like multi submit!) 2010-06-10
+ * Enqueues the new element and awakes worker thread.
*/
-static rsRetVal
-qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
+rsRetVal
+qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
{
DEFiRet;
- DEFVARS_mutexProtection;
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ iRet = qAddDirect(pThis, pUsr);
+ RETiRet;
+}
+
+
+/* enqueue a new user data element
+ * Enqueues the new element and awakes worker thread.
+ */
+rsRetVal
+qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+{
+ DEFiRet;
+ int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, qqueue);
- /* for simplicity, we do one big mutex lock. This method is extremely seldom
- * called, so that doesn't matter... -- rgerhards, 2008-01-16
- */
- if(pThis->mut != NULL) {
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
}
- if(bEnqOnly == pThis->bEnqOnly)
- FINALIZE; /* no change, nothing to do */
-
- if(pThis->bQueueStarted) {
- /* we need to adjust queue operation only if we are not during initial param setup */
- if(bEnqOnly == 1) {
- /* switch to enqueue-only mode */
- /* this means we need to terminate all workers - that's it... */
- dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
- if(pThis->pWtpReg != NULL)
- wtpWakeupAllWrkr(pThis->pWtpReg);
- if(pThis->pWtpDA != NULL)
- wtpWakeupAllWrkr(pThis->pWtpDA);
- } else {
- /* switch back to regular mode */
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
- }
- }
+ CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr));
- pThis->bEnqOnly = bEnqOnly;
+ qqueueChkPersist(pThis, 1);
finalize_it:
- if(pThis->mut != NULL) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* make sure at least one worker is running. */
+ qqueueAdviseMaxWorkers(pThis);
+ /* and release the mutex */
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ DBGOPRINT((obj_t*) pThis, "EnqueueMsg advised worker start\n");
}
+
RETiRet;
}
@@ -2415,6 +2399,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
DEFpropSetMeth(qqueue, pUsr, void*)
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
+DEFpropSetMeth(qqueue, iDeqBatchSize, int)
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
@@ -2433,8 +2418,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
if(isProp("iQueueSize")) {
pThis->iQueueSize = pProp->val.num;
- } else if(isProp("iUngottenObjs")) {
- pThis->iUngottenObjs = pProp->val.num;
} else if(isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
} else if(isProp("tVars.disk.bytesRead")) {
@@ -2460,6 +2443,9 @@ BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(strm, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
/* now set our own handlers */
OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty);