summaryrefslogtreecommitdiff
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c242
1 files changed, 109 insertions, 133 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index bb40e54..4c8d3ac 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -59,7 +59,6 @@
#include "datetime.h"
#include "unicode-helper.h"
#include "statsobj.h"
-#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */
#ifdef OS_SOLARIS
# include <sched.h>
@@ -74,7 +73,7 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(statsobj)
/* forward-definitions */
-static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr);
+static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
@@ -83,7 +82,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr);
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
@@ -131,7 +130,7 @@ static inline void displayBatchState(batch_t *pBatch)
{
int i;
for(i = 0 ; i < pBatch->nElem ; ++i) {
- DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state);
+ DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->eltState[i]);
}
}
@@ -242,8 +241,8 @@ getQueueTypeName(queueType_t t)
case QUEUETYPE_DIRECT:
r = "Direct";
break;
- default:
- r = "unknown queue type";
+ default:
+ r = "invalid/unknown queue mode";
break;
}
return r;
@@ -315,16 +314,16 @@ getLogicalQueueSize(qqueue_t *pThis)
*/
static inline void queueDrain(qqueue_t *pThis)
{
- void *pUsr;
+ msg_t *pMsg;
ASSERT(pThis != NULL);
BEGINfunc
DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
- pThis->qDeq(pThis, &pUsr);
- if(pUsr != NULL) {
- objDestruct(pUsr);
+ pThis->qDeq(pThis, &pMsg);
+ if(pMsg != NULL) {
+ msgDestruct(&pMsg);
}
pThis->qDel(pThis);
}
@@ -414,7 +413,7 @@ StartDA(qqueue_t *pThis)
*/
pThis->pqDA->pqParent = pThis;
- CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr));
+ CHKiRet(qqueueSetpAction(pThis->pqDA, pThis->pAction));
CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
@@ -549,7 +548,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis)
}
-static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
+static rsRetVal qAddFixedArray(qqueue_t *pThis, msg_t* in)
{
DEFiRet;
@@ -563,7 +562,7 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
}
-static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
+static rsRetVal qDeqFixedArray(qqueue_t *pThis, msg_t **out)
{
DEFiRet;
@@ -624,7 +623,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
RETiRet;
}
-static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pMsg)
{
qLinkedList_t *pEntry;
DEFiRet;
@@ -632,7 +631,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
- pEntry->pUsr = pUsr;
+ pEntry->pMsg = pMsg;
if(pThis->tVars.linklist.pDelRoot == NULL) {
pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry;
@@ -650,14 +649,13 @@ finalize_it:
}
-static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
+static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppMsg)
{
qLinkedList_t *pEntry;
DEFiRet;
pEntry = pThis->tVars.linklist.pDeqRoot;
- ISOBJ_TYPE_assert(pEntry->pUsr, msg);
- *ppUsr = pEntry->pUsr;
+ *ppMsg = pEntry->pMsg;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
RETiRet;
@@ -747,18 +745,12 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
{
DEFiRet;
strm_t *psQIF = NULL;
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
struct stat stat_buf;
ISOBJ_TYPE_assert(pThis, qqueue);
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
-
/* check if the file exists */
- if(stat((char*) pszQIFNam, &stat_buf) == -1) {
+ if(stat((char*) pThis->pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n");
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
@@ -773,7 +765,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(strm.Construct(&psQIF));
CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ));
CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
- CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam));
CHKiRet(strm.ConstructFinalize(psQIF));
/* first, we try to read the property bag for ourselfs */
@@ -785,9 +777,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- /* create a duplicate for the read "pointer".
- */
-
+ /* create a duplicate for the read "pointer". */
CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
@@ -806,7 +796,7 @@ finalize_it:
strm.Destruct(&psQIF);
if(iRet != RS_RET_OK) {
- DBGOPRINT((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
+ DBGOPRINT((obj_t*) pThis, "state %d reading .qi file - can not read persisted info (if any)\n",
iRet);
}
@@ -898,7 +888,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
RETiRet;
}
-static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg)
{
DEFiRet;
number_t nWriteCount;
@@ -906,7 +896,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
ASSERT(pThis != NULL);
CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount));
- CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
+ CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite));
CHKiRet(strm.Flush(pThis->tVars.disk.pWrite));
CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */
@@ -916,7 +906,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
* the in-memory representation. The instance will be re-created upon
* dequeue. -- rgerhards, 2008-07-09
*/
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n",
nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly);
@@ -926,43 +916,11 @@ finalize_it:
}
-static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
+static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg)
{
DEFiRet;
- iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
- RETiRet;
-}
-
-
-static rsRetVal qDelDisk(qqueue_t *pThis)
-{
- obj_t *pDummyObj; /* we need to deserialize it... */
- DEFiRet;
-
- int64 offsIn;
- int64 offsOut;
-
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn));
- CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL));
- objDestruct(pDummyObj);
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut));
-
- /* This time it is a bit tricky: we free disk space only upon file deletion. So we need
- * to keep track of what we have read until we get an out-offset that is lower than the
- * in-offset (which indicates file change). Then, we can subtract the whole thing from
- * the on-disk size. -- rgerhards, 2008-01-30
- */
- if(offsIn < offsOut) {
- pThis->tVars.disk.bytesRead += offsOut - offsIn;
- } else {
- pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead;
- pThis->tVars.disk.bytesRead = offsOut;
- DBGOPRINT((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
- /* awake possibly waiting enq process */
- pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
- }
-
-finalize_it:
+ iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, pThis->tVars.disk.pReadDeq, NULL,
+ NULL, msgConstructForDeserializer, NULL, MsgDeserialize);
RETiRet;
}
@@ -979,10 +937,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
{
batch_t singleBatch;
batch_obj_t batchObj;
+ batch_state_t batchState = BATCH_STATE_RDY;
sbool active = 1;
int i;
DEFiRet;
@@ -1000,17 +959,17 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
*/
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
- batchObj.state = BATCH_STATE_RDY;
- batchObj.pUsrp = (obj_t*) pUsr;
+ batchObj.pMsg = pMsg;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
+ singleBatch.eltState = &batchState;
singleBatch.active = &active;
- iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
+ iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate);
/* delete the batch string params: TODO: create its own "class" for this */
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
free(batchObj.staticActStrings[i]);
}
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
RETiRet;
}
@@ -1032,7 +991,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
* We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate);
+ iRet = pThis->pConsumer(pThis->pAction, pBatch, &pThis->bShutdownImmediate);
RETiRet;
}
@@ -1053,13 +1012,13 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
* things truely different. -- rgerhards, 2008-02-12
*/
static rsRetVal
-qqueueAdd(qqueue_t *pThis, void *pUsr)
+qqueueAdd(qqueue_t *pThis, msg_t *pMsg)
{
DEFiRet;
ASSERT(pThis != NULL);
- CHKiRet(pThis->qAdd(pThis, pUsr));
+ CHKiRet(pThis->qAdd(pThis, pMsg));
if(pThis->qType != QUEUETYPE_DIRECT) {
ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
@@ -1075,7 +1034,7 @@ finalize_it:
/* generic code to dequeue a queue entry
*/
static rsRetVal
-qqueueDeq(qqueue_t *pThis, void **ppUsr)
+qqueueDeq(qqueue_t *pThis, msg_t **ppMsg)
{
DEFiRet;
@@ -1086,7 +1045,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- iRet = pThis->qDeq(pThis, ppUsr);
+ iRet = pThis->qDeq(pThis, ppMsg);
ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq);
// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
@@ -1184,11 +1143,11 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
rsRetVal iRetLocal;
DEFiRet;
-RUNLOG_STR("trying to shutdown workers within Action Timeout");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
+ DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout");
DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n");
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
@@ -1397,7 +1356,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
pThis->iDeqBatchSize = 128; /* default batch size */
pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */
pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */
- pThis->iDiscardMrk = 9800; /* begin to discard messages */
+ pThis->iDiscardMrk = 980; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
pThis->iMaxFileSize = 1024*1024;
@@ -1459,22 +1418,21 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
* the return state!
* rgerhards, 2008-01-24
*/
-static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pMsg)
{
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
ISOBJ_TYPE_assert(pThis, qqueue);
- ISOBJ_assert(pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
- iRetLocal = objGetSeverity(pUsr, &iSeverity);
+ iRetLocal = MsgGetSeverity(pMsg, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
iQueueSize, iSeverity);
STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg "
@@ -1493,13 +1451,32 @@ static inline rsRetVal
DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
{
int i;
+ off64_t bytesDel;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
/* now send delete request to storage driver */
- for(i = 0 ; i < nElem ; ++i) {
- pThis->qDel(pThis);
+ if(pThis->qType == QUEUETYPE_DISK) {
+ strmMultiFileSeek(pThis->tVars.disk.pReadDel, pThis->tVars.disk.deqFileNumOut,
+ pThis->tVars.disk.deqOffs, &bytesDel);
+ /* We need to correct the on-disk file size. This time it is a bit tricky:
+ * we free disk space only upon file deletion. So we need to keep track of what we
+ * have read until we get an out-offset that is lower than the in-offset (which
+ * indicates file change). Then, we can subtract the whole thing from the on-disk
+ * size. -- rgerhards, 2008-01-30
+ */
+ if(bytesDel != 0) {
+ pThis->tVars.disk.sizeOnDisk -= bytesDel;
+ DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk "
+ "space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk);
+ /* awake possibly waiting enq process */
+ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
+ }
+ } else { /* memory queue */
+ for(i = 0 ; i < nElem ; ++i) {
+ pThis->qDel(pThis);
+ }
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
@@ -1560,7 +1537,7 @@ static inline rsRetVal
DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
{
int i;
- void *pUsr;
+ msg_t *pMsg;
int nEnqueued = 0;
rsRetVal localRet;
DEFiRet;
@@ -1569,17 +1546,16 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
- pUsr = pBatch->pElem[i].pUsrp;
- if( pBatch->pElem[i].state == BATCH_STATE_RDY
- || pBatch->pElem[i].state == BATCH_STATE_SUB) {
- localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
- (obj_t*)MsgAddRef((msg_t*) pUsr));
+ pMsg = pBatch->pElem[i].pMsg;
+ if( pBatch->eltState[i] == BATCH_STATE_RDY
+ || pBatch->eltState[i] == BATCH_STATE_SUB) {
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
++nEnqueued;
if(localRet != RS_RET_OK) {
DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
}
}
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
}
DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
@@ -1611,7 +1587,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
int nDiscarded;
int nDeleted;
int iQueueSize;
- void *pUsr;
+ msg_t *pMsg;
rsRetVal localRet;
DEFiRet;
@@ -1619,11 +1595,14 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = nDiscarded = 0;
+ if(pThis->qType == QUEUETYPE_DISK) {
+ pThis->tVars.disk.deqFileNumIn = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
+ }
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
- CHKiRet(qqueueDeq(pThis, &pUsr));
+ CHKiRet(qqueueDeq(pThis, &pMsg));
/* check if we should discard this element */
- localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr);
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg);
if(localRet == RS_RET_QUEUE_FULL) {
++nDiscarded;
continue;
@@ -1632,11 +1611,16 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
}
/* all well, use this element */
- pWti->batch.pElem[nDequeued].pUsrp = pUsr;
- pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ pWti->batch.pElem[nDequeued].pMsg = pMsg;
+ pWti->batch.eltState[nDequeued] = BATCH_STATE_RDY;
++nDequeued;
}
+ if(pThis->qType == QUEUETYPE_DISK) {
+ strm.GetCurrOffset(pThis->tVars.disk.pReadDeq, &pThis->tVars.disk.deqOffs);
+ pThis->tVars.disk.deqFileNumOut = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
+ }
+
/* it is sufficient to persist only when the bulk of work is done */
qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
@@ -1644,7 +1628,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
pWti->batch.nElemDeq = nDequeued + nDiscarded;
pWti->batch.deqID = getNextDeqID(pThis);
*piRemainingQueueSize = iQueueSize;
-
finalize_it:
RETiRet;
}
@@ -1680,7 +1663,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- // TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
@@ -1874,7 +1856,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
/* at this spot, we may be cancelled */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
- CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate));
+ CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -1932,13 +1914,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
- /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
- * the message. So far, we simply assume we always have msg_t, what currently is always the case.
- * rgerhards, 2009-05-28
- */
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
- (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
- pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
+ CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
+ MsgAddRef(pWti->batch.pElem[i].pMsg)));
+ pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */
}
/* but now cancellation is no longer permitted */
@@ -2018,6 +1996,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
uchar pszBuf[64];
+ uchar pszQIFNam[MAXFNAME];
int wrk;
uchar *qName;
size_t lenBuf;
@@ -2040,8 +2019,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->qDeq = qDeqLinkedList;
+ pThis->qDel = qDelLinkedList;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
@@ -2049,10 +2028,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qDestruct = qDestructDisk;
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
- pThis->qDel = qDelDisk;
+ pThis->qDel = NULL; /* delete for disk handled via special code! */
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
+ /* pre-construct file name for .qi file */
+ pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar),
+ "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
+ pThis->pszQIFNam = ustrdup(pszQIFNam);
+ DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam,
+ (int) pThis->lenQIFNam);
break;
case QUEUETYPE_DIRECT:
pThis->qConstruct = qConstructDirect;
@@ -2179,7 +2164,7 @@ finalize_it:
}
-/* persist the queue to disk. If we have something to persist, we first
+/* persist the queue to disk (write the .qi file). If we have something to persist, we first
* save the information on the queue properties itself and then we call
* the queue-type specific drivers.
* Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint,
@@ -2190,8 +2175,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
{
DEFiRet;
strm_t *psQIF = NULL; /* Queue Info File */
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
ASSERT(pThis != NULL);
@@ -2209,13 +2192,9 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis));
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
-
if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) {
if(pThis->bNeedDelQIF) {
- unlink((char*)pszQIFNam);
+ unlink((char*)pThis->pszQIFNam);
pThis->bNeedDelQIF = 0;
}
/* indicate spool file needs to be deleted */
@@ -2228,7 +2207,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC));
CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles));
CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
- CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam));
CHKiRet(strm.ConstructFinalize(psQIF));
/* first, write the property bag for ourselfs
@@ -2240,7 +2219,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
- objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(obj.EndSerialize(psQIF));
/* now persist the stream info */
@@ -2470,7 +2448,7 @@ finalize_it:
* rgerhards, 2009-06-16
*/
static inline rsRetVal
-doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
{
DEFiRet;
int err;
@@ -2479,7 +2457,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued);
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
- CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg));
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@@ -2556,7 +2534,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
if(pThis->toEnq == 0 || pThis->bEnqOnly) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n");
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq);
@@ -2568,7 +2546,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n");
@@ -2576,7 +2554,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
}
/* and finally enqueue the message */
- CHKiRet(qqueueAdd(pThis, pUsr));
+ CHKiRet(qqueueAdd(pThis, pMsg));
STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize);
finalize_it:
@@ -2652,11 +2630,11 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
+qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pUsr);
+ iRet = qAddDirect(pThis, pMsg);
RETiRet;
}
@@ -2665,7 +2643,7 @@ qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
{
DEFiRet;
int iCancelStateSave;
@@ -2677,7 +2655,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
d_pthread_mutex_lock(pThis->mut);
}
- CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr));
+ CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg));
qqueueChkPersist(pThis, 1);
@@ -2806,7 +2784,7 @@ DEFpropSetMeth(qqueue, iLightDlyMrk, int)
DEFpropSetMeth(qqueue, bIsDA, int)
DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
-DEFpropSetMeth(qqueue, pUsr, void*)
+DEFpropSetMeth(qqueue, pAction, action_t*)
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
DEFpropSetMeth(qqueue, iDeqBatchSize, int)
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
@@ -2829,8 +2807,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
pThis->iQueueSize = pProp->val.num;
} else if(isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
- } else if(isProp("tVars.disk.bytesRead")) {
- pThis->tVars.disk.bytesRead = pProp->val.num;
} else if(isProp("qType")) {
if(pThis->qType != pProp->val.num)
ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);