summaryrefslogtreecommitdiff
path: root/runtime
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2012-12-07 15:57:10 +0100
committerMichael Biebl <biebl@debian.org>2012-12-07 15:57:10 +0100
commited0fad5385d95f30f7073bf3013e4ecabc4b29e4 (patch)
tree7445e112c605e9bbe04c70bf347fb4587a459d4b /runtime
parent1796f8e02b6d0bc29ab65427d2ebf97f82f41999 (diff)
downloadrsyslog-ed0fad5385d95f30f7073bf3013e4ecabc4b29e4.tar.gz
Imported Upstream version 7.2.4upstream/7.2.4
Diffstat (limited to 'runtime')
-rw-r--r--runtime/datetime.c5
-rw-r--r--runtime/debug.c8
-rw-r--r--runtime/module-template.h2
-rw-r--r--runtime/msg.c146
-rw-r--r--runtime/msg.h12
-rw-r--r--runtime/queue.c44
-rw-r--r--runtime/queue.h2
-rw-r--r--runtime/rsconf.c8
-rw-r--r--runtime/ruleset.c25
9 files changed, 90 insertions, 162 deletions
diff --git a/runtime/datetime.c b/runtime/datetime.c
index 10ab3c6..53bc165 100644
--- a/runtime/datetime.c
+++ b/runtime/datetime.c
@@ -900,6 +900,11 @@ time_t syslogTime2time_t(struct syslogTime *ts)
case 12:
MonthInDays = 334; //until 01 of December
break;
+ default: /* this cannot happen (and would be a program error)
+ * but we need the code to keep the compiler silent.
+ */
+ MonthInDays = 0; /* any value fits ;) */
+ break;
}
diff --git a/runtime/debug.c b/runtime/debug.c
index 307a8bb..1d306db 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -902,8 +902,12 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg)
lenCopy = lenMsg;
memcpy(pszWriteBuf + offsWriteBuf, pszMsg, lenCopy);
offsWriteBuf += lenCopy;
- if(stddbg != -1) write(stddbg, pszWriteBuf, offsWriteBuf);
- if(altdbg != -1) write(altdbg, pszWriteBuf, offsWriteBuf);
+ /* the write is included in an "if" just to silence compiler
+ * warnings. Here, we really don't care if the write fails, we
+ * have no good response to that in any case... -- rgerhards, 2012-11-28
+ */
+ if(stddbg != -1) if(write(stddbg, pszWriteBuf, offsWriteBuf)){};
+ if(altdbg != -1) if(write(altdbg, pszWriteBuf, offsWriteBuf)){};
bWasNL = (pszMsg[lenMsg - 1] == '\n') ? 1 : 0;
}
diff --git a/runtime/module-template.h b/runtime/module-template.h
index 72a139c..fe74bac 100644
--- a/runtime/module-template.h
+++ b/runtime/module-template.h
@@ -113,7 +113,7 @@ static rsRetVal modGetID(void **pID) \
/* macro to provide the v6 config system module name
*/
#define MODULE_CNFNAME(name) \
-static __attribute__((unused)) rsRetVal modGetCnfName(uchar **cnfName) \
+static rsRetVal modGetCnfName(uchar **cnfName) \
{ \
*cnfName = (uchar*) name; \
return RS_RET_OK;\
diff --git a/runtime/msg.c b/runtime/msg.c
index b627cd5..09f6d64 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -300,120 +300,20 @@ static uchar * jsonPathGetLeaf(uchar *name, int lenName);
static struct json_object *jsonDeepCopy(struct json_object *src);
-/* The following functions will support advanced output module
- * multithreading, once this is implemented. Currently, we
- * include them as hooks only. The idea is that we need to guard
- * some msg objects data fields against concurrent access if
- * we run on multiple threads. Please note that in any case this
- * is not necessary for calls from INPUT modules, because they
- * construct the message object and do this serially. Only when
- * the message is in the processing queue, multiple threads may
- * access a single object. Consequently, there are no guard functions
- * for "set" methods, as these are called during input. Only "get"
- * functions that modify important structures have them.
- * rgerhards, 2007-07-20
- * We now support locked and non-locked operations, depending on
- * the configuration of rsyslog. To support this, we use function
- * pointers. Initially, we start in non-locked mode. There, all
- * locking operations call into dummy functions. When locking is
- * enabled, the function pointers are changed to functions doing
- * actual work. We also introduced another MsgPrepareEnqueue() function
- * which initializes the locking structures, if needed. This is
- * necessary because internal messages during config file startup
- * processing are always created in non-locking mode. So we can
- * not initialize locking structures during constructions. We now
- * postpone this until when the message is fully constructed and
- * enqueued. Then we know the status of locking. This has a nice
- * side effect, and that is that during the initial creation of
- * the Msg object no locking needs to be done, which results in better
- * performance. -- rgerhards, 2008-01-05
- */
-static void (*funcLock)(msg_t *pMsg);
-static void (*funcUnlock)(msg_t *pMsg);
-static void (*funcDeleteMutex)(msg_t *pMsg);
-void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
-#if 1 /* This is a debug aid */
-#define MsgLock(pMsg) funcLock(pMsg)
-#define MsgUnlock(pMsg) funcUnlock(pMsg)
-#else
-#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; }
-#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); }
-#endif
-
-/* the next function is a dummy to be used by the looking functions
- * when the class is not yet running in an environment where locking
- * is necessary. Please note that the need to lock can (and will) change
- * during a single run. Typically, this is depending on the operation mode
- * of the message queues (which is operator-configurable). -- rgerhards, 2008-01-05
- */
-static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg)
-{
- /* empty be design */
-}
-
-
-/* The following function prepares a message for enqueue into the queue. This is
- * where a message may be accessed by multiple threads. This implementation here
- * is the version for multiple concurrent acces. It initializes the locking
- * structures.
- * TODO: change to an iRet interface! -- rgerhards, 2008-07-14
- */
-static void MsgPrepareEnqueueLockingCase(msg_t *pThis)
-{
- BEGINfunc
- assert(pThis != NULL);
- pthread_mutex_init(&pThis->mut, NULL);
- pThis->bDoLock = 1;
- ENDfunc
-}
-
-
-/* ... and now the locking and unlocking implementations: */
-static void MsgLockLockingCase(msg_t *pThis)
+/* the locking and unlocking implementations: */
+static inline void
+MsgLock(msg_t *pThis)
{
/* DEV debug only! dbgprintf("MsgLock(0x%lx)\n", (unsigned long) pThis); */
- assert(pThis != NULL);
- if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */
- pthread_mutex_lock(&pThis->mut);
+ pthread_mutex_lock(&pThis->mut);
}
-
-static void MsgUnlockLockingCase(msg_t *pThis)
+static inline void
+MsgUnlock(msg_t *pThis)
{
/* DEV debug only! dbgprintf("MsgUnlock(0x%lx)\n", (unsigned long) pThis); */
- assert(pThis != NULL);
- if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */
- pthread_mutex_unlock(&pThis->mut);
+ pthread_mutex_unlock(&pThis->mut);
}
-/* delete the mutex object on message destruction (locking case)
- */
-static void MsgDeleteMutexLockingCase(msg_t *pThis)
-{
- assert(pThis != NULL);
- pthread_mutex_destroy(&pThis->mut);
-}
-
-/* enable multiple concurrent access on the message object
- * This works on a class-wide basis and can bot be undone.
- * That is, if it is once enabled, it can not be disabled during
- * the same run. When this function is called, no other thread
- * must manipulate message objects. Then we would have race conditions,
- * but guarding against this is counter-productive because it
- * would cost additional time. Plus, it would be a programming error.
- * rgerhards, 2008-01-05
- */
-rsRetVal MsgEnableThreadSafety(void)
-{
- DEFiRet;
- funcLock = MsgLockLockingCase;
- funcUnlock = MsgUnlockLockingCase;
- funcMsgPrepareEnqueue = MsgPrepareEnqueueLockingCase;
- funcDeleteMutex = MsgDeleteMutexLockingCase;
- RETiRet;
-}
-
-/* end locking functions */
-
static inline int getProtocolVersion(msg_t *pM)
{
@@ -716,8 +616,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
/* initialize members in ORDER they appear in structure (think "cache line"!) */
pM->flowCtlType = 0;
- pM->bDoLock = 0;
- pM->bAlreadyFreed = 0;
pM->bParseSuccess = 0;
pM->iRefCount = 1;
pM->iSeverity = -1;
@@ -760,6 +658,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
pM->pszTIMESTAMP_Unix[0] = '\0';
pM->pszRcvdAt_Unix[0] = '\0';
pM->pszUUID = NULL;
+ pthread_mutex_init(&pM->mut, NULL);
/* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/
@@ -849,15 +748,6 @@ CODESTARTobjDestruct(msg)
if(currRefCount == 0)
{
/* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */
- /* The if below is included to try to nail down a well-hidden bug causing
- * segfaults. I hope that do to the test code the problem is sooner detected and
- * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23.
- * TODO: remove when no longer needed.
- */
- if(pThis->bAlreadyFreed)
- abort();
- pThis->bAlreadyFreed = 1;
- /* end debug code */
if(pThis->pszRawMsg != pThis->szRawMsg)
free(pThis->pszRawMsg);
freeTAG(pThis);
@@ -895,7 +785,7 @@ CODESTARTobjDestruct(msg)
# ifndef HAVE_ATOMIC_BUILTINS
MsgUnlock(pThis);
# endif
- funcDeleteMutex(pThis);
+ pthread_mutex_destroy(&pThis->mut);
/* now we need to do our own optimization. Testing has shown that at least the glibc
* malloc() subsystem returns memory to the OS far too late in our case. So we need
* to help it a bit, by calling malloc_trim(), which will tell the alloc subsystem
@@ -3656,18 +3546,6 @@ finalize_it:
#undef isProp
-/* This is a construction finalizer that must be called after all properties
- * have been set. It does some final work on the message object. After this
- * is done, the object is considered ready for full processing.
- * rgerhards, 2008-07-08
- */
-static rsRetVal msgConstructFinalizer(msg_t *pThis)
-{
- MsgPrepareEnqueue(pThis);
- return RS_RET_OK;
-}
-
-
/* get the severity - this is an entry point that
* satisfies the base object class getSeverity semantics.
* rgerhards, 2008-01-14
@@ -3977,13 +3855,7 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE)
/* set our own handlers */
OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty);
- OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, msgConstructFinalizer);
OBJSetMethodHandler(objMethod_GETSEVERITY, MsgGetSeverity);
- /* initially, we have no need to lock message objects */
- funcLock = MsgLockingDummy;
- funcUnlock = MsgLockingDummy;
- funcDeleteMutex = MsgLockingDummy;
- funcMsgPrepareEnqueue = MsgLockingDummy;
/* some more inits */
# if HAVE_MALLOC_TRIM
INIT_ATOMIC_HELPER_MUT(mutTrimCtr);
diff --git a/runtime/msg.h b/runtime/msg.h
index 396e861..ab47900 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -64,7 +64,6 @@ struct msg {
once data has entered the queue, this property is no longer needed. */
pthread_mutex_t mut;
int iRefCount; /* reference counter (0 = unused) */
- sbool bDoLock; /* use the mutex? */
sbool bAlreadyFreed; /* aid to help detect a well-hidden bad bug -- TODO: remove when no longer needed */
sbool bParseSuccess; /* set to reflect state of last executed higher level parser */
short iSeverity; /* the severity 0..7 */
@@ -177,7 +176,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
char *textpri(char *pRes, size_t pResLen, int pri);
rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar);
es_str_t* msgGetMsgVarNew(msg_t *pThis, uchar *name);
-rsRetVal MsgEnableThreadSafety(void);
uchar *getRcvFrom(msg_t *pM);
void getTAG(msg_t *pM, uchar **ppBuf, int *piLen);
char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt);
@@ -213,16 +211,6 @@ msgUnsetJSON(msg_t *pMsg, uchar *varname) {
}
-/* The MsgPrepareEnqueue() function is a macro for performance reasons.
- * It needs one global variable to work. This is acceptable, as it gains
- * us quite some performance and is fully abstracted using this header file.
- * The important thing is that no other module is permitted to actually
- * access that global variable! -- rgerhards, 2008-01-05
- */
-extern void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
-#define MsgPrepareEnqueue(pMsg) funcMsgPrepareEnqueue(pMsg)
-
-
/* ------------------------------ some inline functions ------------------------------ */
/* set raw message size. This is needed in some cases where a trunctation is necessary
diff --git a/runtime/queue.c b/runtime/queue.c
index fbf7710..bb40e54 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -242,6 +242,9 @@ getQueueTypeName(queueType_t t)
case QUEUETYPE_DIRECT:
r = "Direct";
break;
+ default:
+ r = "unknown queue type";
+ break;
}
return r;
}
@@ -1379,7 +1382,7 @@ finalize_it:
}
-/* set default inisde queue object suitable for action queues.
+/* set default inside queue object suitable for action queues.
* This shall be called directly after queue construction. This functions has
* been added in support of the new v6 config system. It expect properly pre-initialized
* objects, but we need to differentiate between ruleset main and action queues.
@@ -1413,6 +1416,36 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
}
+/* set defaults inside queue object suitable for main/ruleset queues.
+ * See queueSetDefaultsActionQueue() for more details and background.
+ */
+void
+qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
+{
+ pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */
+ pThis->iMaxQueueSize = 50000; /* size of the main message queue above */
+ pThis->iDeqBatchSize = 1024; /* default batch size */
+ pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */
+ pThis->iDiscardMrk = 49500; /* begin to discard messages */
+ pThis->iDiscardSeverity = 8; /* turn off */
+ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
+ pThis->iMaxFileSize = 16*1024*1024;
+ pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
+ pThis->bSyncQueueFiles = 0;
+ pThis->toQShutdown = 1500; /* queue shutdown */
+ pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
+ pThis->toEnq = 2000; /* timeout for queue enque */
+ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
+ pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */
+ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+ pThis->sizeOnDiskMax = 0; /* unlimited */
+ pThis->iDeqSlowdown = 0;
+ pThis->iDeqtWinFromHr = 0;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+}
+
+
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
@@ -2675,6 +2708,15 @@ qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
return RS_RET_OK;
}
+
+/* are any queue params set at all? 1 - yes, 0 - no */
+int
+queueCnfParamsSet(struct cnfparamvals *pvals)
+{
+ return cnfparamvalsIsSet(&pblk, pvals);
+}
+
+
/* apply all params from param block to queue. Must be called before
* finalizing. This supports the v6 config system. Defaults were already
* set during queue creation. The pvals object is destructed by this
diff --git a/runtime/queue.h b/runtime/queue.h
index edb770c..91c100e 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -193,7 +193,9 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals);
+int queueCnfParamsSet(struct cnfparamvals *pvals);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals);
+void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);
void qqueueSetDefaultsActionQueue(qqueue_t *pThis);
void qqueueDbgPrint(qqueue_t *pThis);
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index dcaa1ad..ac9cd80 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -292,6 +292,9 @@ getNOW(eNOWType eNow, es_str_t **estr)
case NOW_MINUTE:
len = snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "%2.2d", t.minute);
break;
+ default:
+ len = snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "*invld eNow*");
+ break;
}
/* now create a string object out of it and hand that over to the var */
@@ -476,6 +479,9 @@ cnfGetVar(char *name, void *usrptr)
estr = msgGetCEEVarNew((msg_t*) usrptr, name+2);
else
estr = msgGetMsgVarNew((msg_t*) usrptr, (uchar*)name+1);
+ } else { /* if this happens, we have a program logic error */
+ estr = es_newStrFromCStr("err: var must start with $",
+ strlen("err: var must start with $"));
}
if(Debug) {
char *s;
@@ -752,7 +758,7 @@ activateMainQueue()
{
DEFiRet;
/* create message queue */
- CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"))) {
+ CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), NULL)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
FINALIZE;
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 8d2bb92..3459f54 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -844,7 +844,7 @@ doRulesetCreateQueue(rsconf_t *conf, int *pNewVal)
rsname = (conf->rulesets.pCurr->pszName == NULL) ? (uchar*) "[ruleset]" : conf->rulesets.pCurr->pszName;
DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname);
- CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname));
+ CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname, NULL));
finalize_it:
RETiRet;
@@ -904,6 +904,7 @@ rsRetVal
rulesetProcessCnf(struct cnfobj *o)
{
struct cnfparamvals *pvals;
+ struct cnfparamvals *queueParams;
rsRetVal localRet;
uchar *rsName = NULL;
uchar *parserName;
@@ -911,6 +912,7 @@ rulesetProcessCnf(struct cnfobj *o)
ruleset_t *pRuleset;
struct cnfarray *ar;
int i;
+ uchar *rsname;
DEFiRet;
pvals = nvlstGetParams(o->nvlst, &rspblk, NULL);
@@ -938,14 +940,21 @@ rulesetProcessCnf(struct cnfobj *o)
/* we have only two params, so we do NOT do the usual param loop */
parserIdx = cnfparamGetIdx(&rspblk, "parser");
- if(parserIdx == -1 || !pvals[parserIdx].bUsed)
- FINALIZE;
+ if(parserIdx != -1 && pvals[parserIdx].bUsed) {
+ ar = pvals[parserIdx].val.d.ar;
+ for(i = 0 ; i < ar->nmemb ; ++i) {
+ parserName = (uchar*)es_str2cstr(ar->arr[i], NULL);
+ doRulesetAddParser(pRuleset, parserName);
+ free(parserName);
+ }
+ }
- ar = pvals[parserIdx].val.d.ar;
- for(i = 0 ; i < ar->nmemb ; ++i) {
- parserName = (uchar*)es_str2cstr(ar->arr[i], NULL);
- doRulesetAddParser(pRuleset, parserName);
- free(parserName);
+ /* pick up ruleset queue parameters */
+ qqueueDoCnfParams(o->nvlst, &queueParams);
+ if(queueCnfParamsSet(queueParams)) {
+ rsname = (pRuleset->pszName == NULL) ? (uchar*) "[ruleset]" : pRuleset->pszName;
+ DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname);
+ CHKiRet(createMainQueue(&pRuleset->pQueue, rsname, queueParams));
}
finalize_it: