diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/datetime.c | 5 | ||||
-rw-r--r-- | runtime/debug.c | 8 | ||||
-rw-r--r-- | runtime/module-template.h | 2 | ||||
-rw-r--r-- | runtime/msg.c | 146 | ||||
-rw-r--r-- | runtime/msg.h | 12 | ||||
-rw-r--r-- | runtime/queue.c | 44 | ||||
-rw-r--r-- | runtime/queue.h | 2 | ||||
-rw-r--r-- | runtime/rsconf.c | 8 | ||||
-rw-r--r-- | runtime/ruleset.c | 25 |
9 files changed, 90 insertions, 162 deletions
diff --git a/runtime/datetime.c b/runtime/datetime.c index 10ab3c6..53bc165 100644 --- a/runtime/datetime.c +++ b/runtime/datetime.c @@ -900,6 +900,11 @@ time_t syslogTime2time_t(struct syslogTime *ts) case 12: MonthInDays = 334; //until 01 of December break; + default: /* this cannot happen (and would be a program error) + * but we need the code to keep the compiler silent. + */ + MonthInDays = 0; /* any value fits ;) */ + break; } diff --git a/runtime/debug.c b/runtime/debug.c index 307a8bb..1d306db 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -902,8 +902,12 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) lenCopy = lenMsg; memcpy(pszWriteBuf + offsWriteBuf, pszMsg, lenCopy); offsWriteBuf += lenCopy; - if(stddbg != -1) write(stddbg, pszWriteBuf, offsWriteBuf); - if(altdbg != -1) write(altdbg, pszWriteBuf, offsWriteBuf); + /* the write is included in an "if" just to silence compiler + * warnings. Here, we really don't care if the write fails, we + * have no good response to that in any case... -- rgerhards, 2012-11-28 + */ + if(stddbg != -1) if(write(stddbg, pszWriteBuf, offsWriteBuf)){}; + if(altdbg != -1) if(write(altdbg, pszWriteBuf, offsWriteBuf)){}; bWasNL = (pszMsg[lenMsg - 1] == '\n') ? 1 : 0; } diff --git a/runtime/module-template.h b/runtime/module-template.h index 72a139c..fe74bac 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -113,7 +113,7 @@ static rsRetVal modGetID(void **pID) \ /* macro to provide the v6 config system module name */ #define MODULE_CNFNAME(name) \ -static __attribute__((unused)) rsRetVal modGetCnfName(uchar **cnfName) \ +static rsRetVal modGetCnfName(uchar **cnfName) \ { \ *cnfName = (uchar*) name; \ return RS_RET_OK;\ diff --git a/runtime/msg.c b/runtime/msg.c index b627cd5..09f6d64 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -300,120 +300,20 @@ static uchar * jsonPathGetLeaf(uchar *name, int lenName); static struct json_object *jsonDeepCopy(struct json_object *src); -/* The following functions will support advanced output module - * multithreading, once this is implemented. Currently, we - * include them as hooks only. The idea is that we need to guard - * some msg objects data fields against concurrent access if - * we run on multiple threads. Please note that in any case this - * is not necessary for calls from INPUT modules, because they - * construct the message object and do this serially. Only when - * the message is in the processing queue, multiple threads may - * access a single object. Consequently, there are no guard functions - * for "set" methods, as these are called during input. Only "get" - * functions that modify important structures have them. - * rgerhards, 2007-07-20 - * We now support locked and non-locked operations, depending on - * the configuration of rsyslog. To support this, we use function - * pointers. Initially, we start in non-locked mode. There, all - * locking operations call into dummy functions. When locking is - * enabled, the function pointers are changed to functions doing - * actual work. We also introduced another MsgPrepareEnqueue() function - * which initializes the locking structures, if needed. This is - * necessary because internal messages during config file startup - * processing are always created in non-locking mode. So we can - * not initialize locking structures during constructions. We now - * postpone this until when the message is fully constructed and - * enqueued. Then we know the status of locking. This has a nice - * side effect, and that is that during the initial creation of - * the Msg object no locking needs to be done, which results in better - * performance. -- rgerhards, 2008-01-05 - */ -static void (*funcLock)(msg_t *pMsg); -static void (*funcUnlock)(msg_t *pMsg); -static void (*funcDeleteMutex)(msg_t *pMsg); -void (*funcMsgPrepareEnqueue)(msg_t *pMsg); -#if 1 /* This is a debug aid */ -#define MsgLock(pMsg) funcLock(pMsg) -#define MsgUnlock(pMsg) funcUnlock(pMsg) -#else -#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; } -#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); } -#endif - -/* the next function is a dummy to be used by the looking functions - * when the class is not yet running in an environment where locking - * is necessary. Please note that the need to lock can (and will) change - * during a single run. Typically, this is depending on the operation mode - * of the message queues (which is operator-configurable). -- rgerhards, 2008-01-05 - */ -static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg) -{ - /* empty be design */ -} - - -/* The following function prepares a message for enqueue into the queue. This is - * where a message may be accessed by multiple threads. This implementation here - * is the version for multiple concurrent acces. It initializes the locking - * structures. - * TODO: change to an iRet interface! -- rgerhards, 2008-07-14 - */ -static void MsgPrepareEnqueueLockingCase(msg_t *pThis) -{ - BEGINfunc - assert(pThis != NULL); - pthread_mutex_init(&pThis->mut, NULL); - pThis->bDoLock = 1; - ENDfunc -} - - -/* ... and now the locking and unlocking implementations: */ -static void MsgLockLockingCase(msg_t *pThis) +/* the locking and unlocking implementations: */ +static inline void +MsgLock(msg_t *pThis) { /* DEV debug only! dbgprintf("MsgLock(0x%lx)\n", (unsigned long) pThis); */ - assert(pThis != NULL); - if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */ - pthread_mutex_lock(&pThis->mut); + pthread_mutex_lock(&pThis->mut); } - -static void MsgUnlockLockingCase(msg_t *pThis) +static inline void +MsgUnlock(msg_t *pThis) { /* DEV debug only! dbgprintf("MsgUnlock(0x%lx)\n", (unsigned long) pThis); */ - assert(pThis != NULL); - if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */ - pthread_mutex_unlock(&pThis->mut); + pthread_mutex_unlock(&pThis->mut); } -/* delete the mutex object on message destruction (locking case) - */ -static void MsgDeleteMutexLockingCase(msg_t *pThis) -{ - assert(pThis != NULL); - pthread_mutex_destroy(&pThis->mut); -} - -/* enable multiple concurrent access on the message object - * This works on a class-wide basis and can bot be undone. - * That is, if it is once enabled, it can not be disabled during - * the same run. When this function is called, no other thread - * must manipulate message objects. Then we would have race conditions, - * but guarding against this is counter-productive because it - * would cost additional time. Plus, it would be a programming error. - * rgerhards, 2008-01-05 - */ -rsRetVal MsgEnableThreadSafety(void) -{ - DEFiRet; - funcLock = MsgLockLockingCase; - funcUnlock = MsgUnlockLockingCase; - funcMsgPrepareEnqueue = MsgPrepareEnqueueLockingCase; - funcDeleteMutex = MsgDeleteMutexLockingCase; - RETiRet; -} - -/* end locking functions */ - static inline int getProtocolVersion(msg_t *pM) { @@ -716,8 +616,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) /* initialize members in ORDER they appear in structure (think "cache line"!) */ pM->flowCtlType = 0; - pM->bDoLock = 0; - pM->bAlreadyFreed = 0; pM->bParseSuccess = 0; pM->iRefCount = 1; pM->iSeverity = -1; @@ -760,6 +658,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) pM->pszTIMESTAMP_Unix[0] = '\0'; pM->pszRcvdAt_Unix[0] = '\0'; pM->pszUUID = NULL; + pthread_mutex_init(&pM->mut, NULL); /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/ @@ -849,15 +748,6 @@ CODESTARTobjDestruct(msg) if(currRefCount == 0) { /* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */ - /* The if below is included to try to nail down a well-hidden bug causing - * segfaults. I hope that do to the test code the problem is sooner detected and - * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23. - * TODO: remove when no longer needed. - */ - if(pThis->bAlreadyFreed) - abort(); - pThis->bAlreadyFreed = 1; - /* end debug code */ if(pThis->pszRawMsg != pThis->szRawMsg) free(pThis->pszRawMsg); freeTAG(pThis); @@ -895,7 +785,7 @@ CODESTARTobjDestruct(msg) # ifndef HAVE_ATOMIC_BUILTINS MsgUnlock(pThis); # endif - funcDeleteMutex(pThis); + pthread_mutex_destroy(&pThis->mut); /* now we need to do our own optimization. Testing has shown that at least the glibc * malloc() subsystem returns memory to the OS far too late in our case. So we need * to help it a bit, by calling malloc_trim(), which will tell the alloc subsystem @@ -3656,18 +3546,6 @@ finalize_it: #undef isProp -/* This is a construction finalizer that must be called after all properties - * have been set. It does some final work on the message object. After this - * is done, the object is considered ready for full processing. - * rgerhards, 2008-07-08 - */ -static rsRetVal msgConstructFinalizer(msg_t *pThis) -{ - MsgPrepareEnqueue(pThis); - return RS_RET_OK; -} - - /* get the severity - this is an entry point that * satisfies the base object class getSeverity semantics. * rgerhards, 2008-01-14 @@ -3977,13 +3855,7 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE) /* set our own handlers */ OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty); - OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, msgConstructFinalizer); OBJSetMethodHandler(objMethod_GETSEVERITY, MsgGetSeverity); - /* initially, we have no need to lock message objects */ - funcLock = MsgLockingDummy; - funcUnlock = MsgLockingDummy; - funcDeleteMutex = MsgLockingDummy; - funcMsgPrepareEnqueue = MsgLockingDummy; /* some more inits */ # if HAVE_MALLOC_TRIM INIT_ATOMIC_HELPER_MUT(mutTrimCtr); diff --git a/runtime/msg.h b/runtime/msg.h index 396e861..ab47900 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -64,7 +64,6 @@ struct msg { once data has entered the queue, this property is no longer needed. */ pthread_mutex_t mut; int iRefCount; /* reference counter (0 = unused) */ - sbool bDoLock; /* use the mutex? */ sbool bAlreadyFreed; /* aid to help detect a well-hidden bad bug -- TODO: remove when no longer needed */ sbool bParseSuccess; /* set to reflect state of last executed higher level parser */ short iSeverity; /* the severity 0..7 */ @@ -177,7 +176,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, char *textpri(char *pRes, size_t pResLen, int pri); rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar); es_str_t* msgGetMsgVarNew(msg_t *pThis, uchar *name); -rsRetVal MsgEnableThreadSafety(void); uchar *getRcvFrom(msg_t *pM); void getTAG(msg_t *pM, uchar **ppBuf, int *piLen); char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt); @@ -213,16 +211,6 @@ msgUnsetJSON(msg_t *pMsg, uchar *varname) { } -/* The MsgPrepareEnqueue() function is a macro for performance reasons. - * It needs one global variable to work. This is acceptable, as it gains - * us quite some performance and is fully abstracted using this header file. - * The important thing is that no other module is permitted to actually - * access that global variable! -- rgerhards, 2008-01-05 - */ -extern void (*funcMsgPrepareEnqueue)(msg_t *pMsg); -#define MsgPrepareEnqueue(pMsg) funcMsgPrepareEnqueue(pMsg) - - /* ------------------------------ some inline functions ------------------------------ */ /* set raw message size. This is needed in some cases where a trunctation is necessary diff --git a/runtime/queue.c b/runtime/queue.c index fbf7710..bb40e54 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -242,6 +242,9 @@ getQueueTypeName(queueType_t t) case QUEUETYPE_DIRECT: r = "Direct"; break; + default: + r = "unknown queue type"; + break; } return r; } @@ -1379,7 +1382,7 @@ finalize_it: } -/* set default inisde queue object suitable for action queues. +/* set default inside queue object suitable for action queues. * This shall be called directly after queue construction. This functions has * been added in support of the new v6 config system. It expect properly pre-initialized * objects, but we need to differentiate between ruleset main and action queues. @@ -1413,6 +1416,36 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) } +/* set defaults inside queue object suitable for main/ruleset queues. + * See queueSetDefaultsActionQueue() for more details and background. + */ +void +qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) +{ + pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ + pThis->iMaxQueueSize = 50000; /* size of the main message queue above */ + pThis->iDeqBatchSize = 1024; /* default batch size */ + pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */ + pThis->iDiscardMrk = 49500; /* begin to discard messages */ + pThis->iDiscardSeverity = 8; /* turn off */ + pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ + pThis->iMaxFileSize = 16*1024*1024; + pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */ + pThis->bSyncQueueFiles = 0; + pThis->toQShutdown = 1500; /* queue shutdown */ + pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ + pThis->toEnq = 2000; /* timeout for queue enque */ + pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ + pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */ + pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + pThis->sizeOnDiskMax = 0; /* unlimited */ + pThis->iDeqSlowdown = 0; + pThis->iDeqtWinFromHr = 0; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ +} + + /* This function checks if the provided message shall be discarded and does so, if needed. * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to * provide real-time creation of spool files. @@ -2675,6 +2708,15 @@ qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals) return RS_RET_OK; } + +/* are any queue params set at all? 1 - yes, 0 - no */ +int +queueCnfParamsSet(struct cnfparamvals *pvals) +{ + return cnfparamvalsIsSet(&pblk, pvals); +} + + /* apply all params from param block to queue. Must be called before * finalizing. This supports the v6 config system. Defaults were already * set during queue creation. The pvals object is destructed by this diff --git a/runtime/queue.h b/runtime/queue.h index edb770c..91c100e 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -193,7 +193,9 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals); +int queueCnfParamsSet(struct cnfparamvals *pvals); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals); +void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); void qqueueSetDefaultsActionQueue(qqueue_t *pThis); void qqueueDbgPrint(qqueue_t *pThis); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index dcaa1ad..ac9cd80 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -292,6 +292,9 @@ getNOW(eNOWType eNow, es_str_t **estr) case NOW_MINUTE: len = snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "%2.2d", t.minute); break; + default: + len = snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "*invld eNow*"); + break; } /* now create a string object out of it and hand that over to the var */ @@ -476,6 +479,9 @@ cnfGetVar(char *name, void *usrptr) estr = msgGetCEEVarNew((msg_t*) usrptr, name+2); else estr = msgGetMsgVarNew((msg_t*) usrptr, (uchar*)name+1); + } else { /* if this happens, we have a program logic error */ + estr = es_newStrFromCStr("err: var must start with $", + strlen("err: var must start with $")); } if(Debug) { char *s; @@ -752,7 +758,7 @@ activateMainQueue() { DEFiRet; /* create message queue */ - CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"))) { + CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), NULL)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); FINALIZE; diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 8d2bb92..3459f54 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -844,7 +844,7 @@ doRulesetCreateQueue(rsconf_t *conf, int *pNewVal) rsname = (conf->rulesets.pCurr->pszName == NULL) ? (uchar*) "[ruleset]" : conf->rulesets.pCurr->pszName; DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname); - CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname)); + CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname, NULL)); finalize_it: RETiRet; @@ -904,6 +904,7 @@ rsRetVal rulesetProcessCnf(struct cnfobj *o) { struct cnfparamvals *pvals; + struct cnfparamvals *queueParams; rsRetVal localRet; uchar *rsName = NULL; uchar *parserName; @@ -911,6 +912,7 @@ rulesetProcessCnf(struct cnfobj *o) ruleset_t *pRuleset; struct cnfarray *ar; int i; + uchar *rsname; DEFiRet; pvals = nvlstGetParams(o->nvlst, &rspblk, NULL); @@ -938,14 +940,21 @@ rulesetProcessCnf(struct cnfobj *o) /* we have only two params, so we do NOT do the usual param loop */ parserIdx = cnfparamGetIdx(&rspblk, "parser"); - if(parserIdx == -1 || !pvals[parserIdx].bUsed) - FINALIZE; + if(parserIdx != -1 && pvals[parserIdx].bUsed) { + ar = pvals[parserIdx].val.d.ar; + for(i = 0 ; i < ar->nmemb ; ++i) { + parserName = (uchar*)es_str2cstr(ar->arr[i], NULL); + doRulesetAddParser(pRuleset, parserName); + free(parserName); + } + } - ar = pvals[parserIdx].val.d.ar; - for(i = 0 ; i < ar->nmemb ; ++i) { - parserName = (uchar*)es_str2cstr(ar->arr[i], NULL); - doRulesetAddParser(pRuleset, parserName); - free(parserName); + /* pick up ruleset queue parameters */ + qqueueDoCnfParams(o->nvlst, &queueParams); + if(queueCnfParamsSet(queueParams)) { + rsname = (pRuleset->pszName == NULL) ? (uchar*) "[ruleset]" : pRuleset->pszName; + DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname); + CHKiRet(createMainQueue(&pRuleset->pQueue, rsname, queueParams)); } finalize_it: |