diff options
author | Michael Biebl <biebl@debian.org> | 2014-08-18 17:48:20 +0200 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2014-08-18 17:48:20 +0200 |
commit | 1dfcd909d90f6fad4a612b6fd998d7473a9da399 (patch) | |
tree | b9fb75cfd347088f1850b48e298c5db564304ece /runtime/queue.c | |
parent | daeb0d03d4a65fa118ad25b34958fb9cacbbd6f4 (diff) | |
download | rsyslog-1dfcd909d90f6fad4a612b6fd998d7473a9da399.tar.gz |
Imported Upstream version 8.4.0upstream/8.4.0
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 31 |
1 files changed, 30 insertions, 1 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index e1d46d5..8d10064 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -73,6 +73,11 @@ DEFobjCurrIf(errmsg) DEFobjCurrIf(datetime) DEFobjCurrIf(statsobj) + +#ifdef ENABLE_IMDIAG +unsigned int iOverallQueueSize = 0; +#endif + /* forward-definitions */ static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); @@ -1030,6 +1035,14 @@ qqueueAdd(qqueue_t *pThis, msg_t *pMsg) if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); +# ifdef ENABLE_IMDIAG +# ifdef HAVE_ATOMIC_BUILTINS + /* mutex is never used due to conditional compilation */ + ATOMIC_INC(&iOverallQueueSize, &NULL); +# else + ++iOverallQueueSize; /* racy, but we can't wait for a mutex! */ +# endif +# endif DBGOPRINT((obj_t*) pThis, "qqueueAdd: entry added, size now log %d, phys %d entries\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } @@ -1491,6 +1504,14 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); +# ifdef ENABLE_IMDIAG +# ifdef HAVE_ATOMIC_BUILTINS + /* mutex is never used due to conditional compilation */ + ATOMIC_SUB(&iOverallQueueSize, nElem, &NULL); +# else + iOverallQueueSize -= nElem; /* racy, but we can't wait for a mutex! */ +# endif +# endif ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); DBGPRINTF("doDeleteBatch: delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); @@ -2222,7 +2243,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ * influenced by properties which might have been set after queueConstruct () */ if(pThis->pqParent == NULL) { - pThis->mut = (pthread_mutex_t *) MALLOC (sizeof (pthread_mutex_t)); + CHKmalloc(pThis->mut = (pthread_mutex_t *) MALLOC (sizeof (pthread_mutex_t))); pthread_mutex_init(pThis->mut, NULL); } else { /* child queue, we need to use parent's mutex */ @@ -2321,6 +2342,11 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); finalize_it: + if(iRet != RS_RET_OK) { + /* note: a child uses it's parent mutex, so do not delete it! */ + if(pThis->pqParent == NULL && pThis->mut != NULL) + free(pThis->mut); + } RETiRet; } @@ -3059,6 +3085,9 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) if(isProp("iQueueSize")) { pThis->iQueueSize = pProp->val.num; +# ifdef ENABLE_IMDIAG + iOverallQueueSize += pThis->iQueueSize; +# endif } else if(isProp("tVars.disk.sizeOnDisk")) { pThis->tVars.disk.sizeOnDisk = pProp->val.num; } else if(isProp("qType")) { |