summaryrefslogtreecommitdiff
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2014-08-18 17:48:20 +0200
committerMichael Biebl <biebl@debian.org>2014-08-18 17:48:20 +0200
commit1dfcd909d90f6fad4a612b6fd998d7473a9da399 (patch)
treeb9fb75cfd347088f1850b48e298c5db564304ece /runtime/queue.c
parentdaeb0d03d4a65fa118ad25b34958fb9cacbbd6f4 (diff)
downloadrsyslog-1dfcd909d90f6fad4a612b6fd998d7473a9da399.tar.gz
Imported Upstream version 8.4.0upstream/8.4.0
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c31
1 files changed, 30 insertions, 1 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index e1d46d5..8d10064 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -73,6 +73,11 @@ DEFobjCurrIf(errmsg)
DEFobjCurrIf(datetime)
DEFobjCurrIf(statsobj)
+
+#ifdef ENABLE_IMDIAG
+unsigned int iOverallQueueSize = 0;
+#endif
+
/* forward-definitions */
static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
@@ -1030,6 +1035,14 @@ qqueueAdd(qqueue_t *pThis, msg_t *pMsg)
if(pThis->qType != QUEUETYPE_DIRECT) {
ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
+# ifdef ENABLE_IMDIAG
+# ifdef HAVE_ATOMIC_BUILTINS
+ /* mutex is never used due to conditional compilation */
+ ATOMIC_INC(&iOverallQueueSize, &NULL);
+# else
+ ++iOverallQueueSize; /* racy, but we can't wait for a mutex! */
+# endif
+# endif
DBGOPRINT((obj_t*) pThis, "qqueueAdd: entry added, size now log %d, phys %d entries\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
@@ -1491,6 +1504,14 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
+# ifdef ENABLE_IMDIAG
+# ifdef HAVE_ATOMIC_BUILTINS
+ /* mutex is never used due to conditional compilation */
+ ATOMIC_SUB(&iOverallQueueSize, nElem, &NULL);
+# else
+ iOverallQueueSize -= nElem; /* racy, but we can't wait for a mutex! */
+# endif
+# endif
ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
DBGPRINTF("doDeleteBatch: delete batch from store, new sizes: log %d, phys %d\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
@@ -2222,7 +2243,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
* influenced by properties which might have been set after queueConstruct ()
*/
if(pThis->pqParent == NULL) {
- pThis->mut = (pthread_mutex_t *) MALLOC (sizeof (pthread_mutex_t));
+ CHKmalloc(pThis->mut = (pthread_mutex_t *) MALLOC (sizeof (pthread_mutex_t)));
pthread_mutex_init(pThis->mut, NULL);
} else {
/* child queue, we need to use parent's mutex */
@@ -2321,6 +2342,11 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
finalize_it:
+ if(iRet != RS_RET_OK) {
+ /* note: a child uses it's parent mutex, so do not delete it! */
+ if(pThis->pqParent == NULL && pThis->mut != NULL)
+ free(pThis->mut);
+ }
RETiRet;
}
@@ -3059,6 +3085,9 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
if(isProp("iQueueSize")) {
pThis->iQueueSize = pProp->val.num;
+# ifdef ENABLE_IMDIAG
+ iOverallQueueSize += pThis->iQueueSize;
+# endif
} else if(isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
} else if(isProp("qType")) {