diff options
author | Michael Biebl <biebl@debian.org> | 2010-02-24 20:31:30 +0100 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2010-02-24 20:31:30 +0100 |
commit | 98a45d0b54c09ca82b3540491915ad6cc61c1a97 (patch) | |
tree | 0ec782ee7d1097acfdf2962b3b9b3404db314002 /action.c | |
parent | b743785de633f7ff5c39f980496d359e4758ec83 (diff) | |
download | rsyslog-98a45d0b54c09ca82b3540491915ad6cc61c1a97.tar.gz |
Imported Upstream version 4.6.0upstream/4.6.0
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 205 |
1 files changed, 119 insertions, 86 deletions
@@ -4,7 +4,7 @@ * * File begun on 2007-08-06 by RGerhards (extracted from syslogd.c) * - * Copyright 2007 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -43,6 +43,7 @@ #include "srUtils.h" #include "errmsg.h" #include "datetime.h" +#include "unicode-helper.h" /* forward definitions */ rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg); @@ -72,6 +73,7 @@ static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */ static int64 iActionQueMaxFileSize = 1024*1024; static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ +static int bActionQSyncQeueFiles = 0; /* sync queue files */ static int iActionQtoQShutdown = 0; /* queue shutdown */ static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ static int iActionQtoEnq = 2000; /* timeout for queue enque */ @@ -151,6 +153,7 @@ actionResetQueueParams(void) iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ iActionQueMaxFileSize = 1024*1024; iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ + bActionQSyncQeueFiles = 0; iActionQtoQShutdown = 0; /* queue shutdown */ iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ iActionQtoEnq = 2000; /* timeout for queue enque */ @@ -176,6 +179,7 @@ actionResetQueueParams(void) */ rsRetVal actionDestruct(action_t *pThis) { + int i; DEFiRet; ASSERT(pThis != NULL); @@ -193,6 +197,33 @@ rsRetVal actionDestruct(action_t *pThis) pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); + + /* message ptr cleanup */ + for(i = 0 ; i < pThis->iNumTpls ; ++i) { + if(pThis->ppMsgs[i] != NULL) { + switch(pThis->eParamPassing) { + case ACT_ARRAY_PASSING: +#if 0 /* later! */ + iArr = 0; + while(((char **)pThis->ppMsgs[i])[iArr] != NULL) { + d_free(((char **)pThis->ppMsgs[i])[iArr++]); + ((char **)pThis->ppMsgs[i])[iArr++] = NULL; + } + d_free(pThis->ppMsgs[i]); + pThis->ppMsgs[i] = NULL; +#endif + break; + case ACT_STRING_PASSING: + d_free(pThis->ppMsgs[i]); + break; + default: + assert(0); + } + } + } + d_free(pThis->ppMsgs); + d_free(pThis->lenMsgs); + d_free(pThis); RETiRet; @@ -209,10 +240,7 @@ rsRetVal actionConstruct(action_t **ppThis) ASSERT(ppThis != NULL); - if((pThis = (action_t*) calloc(1, sizeof(action_t))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } - + CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t))); pThis->iResumeInterval = glbliActionResumeInterval; pThis->iResumeRetryCount = glbliActionResumeRetryCount; pThis->tLastOccur = time(NULL); /* done once per action on startup only */ @@ -273,6 +301,7 @@ actionConstructFinalize(action_t *pThis) setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", bActionQSyncQeueFiles); setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); @@ -295,7 +324,7 @@ actionConstructFinalize(action_t *pThis) CHKiRet(qqueueStart(pThis->pQueue)); - dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue); + DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); /* and now reset the queue params (see comment in its function header!) */ actionResetQueueParams(); @@ -383,7 +412,7 @@ static rsRetVal actionTryResume(action_t *pThis) if(iRet == RS_RET_OK) actionResume(pThis); - dbgprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n", + DBGPRINTF("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n", iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); RETiRet; @@ -429,37 +458,33 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) int iSleepPeriod; int bCallAction; int iCancelStateSave; - uchar **ppMsgs; /* array of message pointers for doAction */ ASSERT(pAction != NULL); - /* create the array for doAction() message pointers */ - if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + /* We now must guard the output module against execution by multiple threads. The + * plugin interface specifies that output modules must not be thread-safe (except + * if they notify us they are - functionality not yet implemented...). + * rgerhards, 2008-01-30 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + pthread_setcancelstate(iCancelStateSave, NULL); /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]))); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pAction->ppMsgs[i]), &(pAction->lenMsgs[i]))); break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(ppMsgs[i]))); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pAction->ppMsgs[i]))); break; default:assert(0); /* software bug if this happens! */ } } + iRetries = 0; - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - pthread_setcancelstate(iCancelStateSave, NULL); do { /* on first invocation, this if should never be true. We just put it at the top * of the loop so that processing (and code) is simplified. This code is actually @@ -484,9 +509,9 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) if(bCallAction) { /* call configured action */ - iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData); + iRet = pAction->pMod->mod.om.doAction(pAction->ppMsgs, pMsg->msgFlags, pAction->pModData); if(iRet == RS_RET_SUSPENDED) { - dbgprintf("Action requested to be suspended, done that.\n"); + DBGPRINTF("Action requested to be suspended, done that.\n"); actionSuspend(pAction, getActNow(pAction)); } } @@ -494,34 +519,35 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) } while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */ if(iRet == RS_RET_DISABLE_ACTION) { - dbgprintf("Action requested to be disabled, done that.\n"); + DBGPRINTF("Action requested to be disabled, done that.\n"); pAction->bEnabled = 0; /* that's it... */ } - pthread_cleanup_pop(1); /* unlock mutex */ - finalize_it: /* cleanup */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { - if(ppMsgs[i] != NULL) { + if(pAction->ppMsgs[i] != NULL) { switch(pAction->eParamPassing) { case ACT_ARRAY_PASSING: iArr = 0; - while(((char **)ppMsgs[i])[iArr] != NULL) - d_free(((char **)ppMsgs[i])[iArr++]); - d_free(ppMsgs[i]); + while(((char **)pAction->ppMsgs[i])[iArr] != NULL) { + d_free(((char **)pAction->ppMsgs[i])[iArr++]); + ((char **)pAction->ppMsgs[i])[iArr++] = NULL; + } + d_free(pAction->ppMsgs[i]); + pAction->ppMsgs[i] = NULL; break; case ACT_STRING_PASSING: - d_free(ppMsgs[i]); break; default: assert(0); } } } - d_free(ppMsgs); - msgDestruct(&pMsg); /* we are now finished with the message */ + pthread_cleanup_pop(1); /* unlock mutex */ + + msgDestruct(&pMsg); /* we are now finished with the message */ RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -569,16 +595,16 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT if (!strcasecmp((char *) pszType, "fixedarray")) { ActionQueType = QUEUETYPE_FIXED_ARRAY; - dbgprintf("action queue type set to FIXED_ARRAY\n"); + DBGPRINTF("action queue type set to FIXED_ARRAY\n"); } else if (!strcasecmp((char *) pszType, "linkedlist")) { ActionQueType = QUEUETYPE_LINKEDLIST; - dbgprintf("action queue type set to LINKEDLIST\n"); + DBGPRINTF("action queue type set to LINKEDLIST\n"); } else if (!strcasecmp((char *) pszType, "disk")) { ActionQueType = QUEUETYPE_DISK; - dbgprintf("action queue type set to DISK\n"); + DBGPRINTF("action queue type set to DISK\n"); } else if (!strcasecmp((char *) pszType, "direct")) { ActionQueType = QUEUETYPE_DIRECT; - dbgprintf("action queue type set to DIRECT (no queueing at all)\n"); + DBGPRINTF("action queue type set to DIRECT (no queueing at all)\n"); } else { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "unknown actionqueue parameter: %s", (char *) pszType); iRet = RS_RET_INVALID_PARAMS; @@ -624,15 +650,15 @@ actionWriteToAction(action_t *pAction) /* we need to care about multiple occurences */ if( pAction->iExecEveryNthOccurTO > 0 && (getActNow(pAction) - pAction->tLastOccur) > pAction->iExecEveryNthOccurTO) { - dbgprintf("n-th occurence handling timed out (%d sec), restarting from 0\n", + DBGPRINTF("n-th occurence handling timed out (%d sec), restarting from 0\n", (int) (getActNow(pAction) - pAction->tLastOccur)); pAction->iNbrNoExec = 0; pAction->tLastOccur = getActNow(pAction); } if(pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) { ++pAction->iNbrNoExec; - dbgprintf("action %p passed %d times to execution - less than neded - discarding\n", - pAction, pAction->iNbrNoExec); + DBGPRINTF("action %p passed %d times to execution - less than neded - discarding\n", + pAction, pAction->iNbrNoExec); FINALIZE; } else { pAction->iNbrNoExec = 0; /* we execute the action now, so the number of no execs is down to */ @@ -649,36 +675,34 @@ actionWriteToAction(action_t *pAction) */ if(pAction->f_prevcount > 1) { msg_t *pMsg; + size_t lenRepMsg; uchar szRepMsg[1024]; if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) { /* it failed - nothing we can do against it... */ - dbgprintf("Message duplication failed, dropping repeat message.\n"); + DBGPRINTF("Message duplication failed, dropping repeat message.\n"); ABORT_FINALIZE(RS_RET_ERR); } if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */ - snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times", + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times", pAction->f_prevcount); } else { - snprintf((char*)szRepMsg, sizeof(szRepMsg), "message repeated %d times: [%.800s]", + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]", pAction->f_prevcount, getMSG(pAction->f_pMsg)); } - /* We now need to update the other message properties. - * ... RAWMSG is a problem ... Please note that digital + /* We now need to update the other message properties. Please note that digital * signatures inside the message are also invalidated. */ datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime)); memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime)); - MsgSetMSG(pMsg, (char*)szRepMsg); - MsgSetRawMsg(pMsg, (char*)szRepMsg); - + MsgReplaceMSG(pMsg, szRepMsg, lenRepMsg); pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */ pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */ } - dbgprintf("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); /* now check if we need to drop the message because otherwise the action would be too * frequently called. -- rgerhards, 2008-04-08 @@ -686,10 +710,10 @@ actionWriteToAction(action_t *pAction) * a purely logical point of view. However, if safes us to check the system time in * (those common) cases where ExecOnceInterval is not used. -- rgerhards, 2008-09-16 */ - if(pAction->f_time != 0 && pAction->iSecsExecOnceInterval > 0 && + if(pAction->iSecsExecOnceInterval > 0 && pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) { /* in this case we need to discard the message - its not yet time to exec the action */ - dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", + DBGPRINTF("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", (int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction), (int) (pAction->iSecsExecOnceInterval + pAction->tLastExec)); pAction->tLastExec = getActNow(pAction); /* re-init time flags */ @@ -697,6 +721,7 @@ actionWriteToAction(action_t *pAction) } /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ pAction->f_time = pAction->f_pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. @@ -726,32 +751,13 @@ finalize_it: } -/* call the configured action. Does all necessary housekeeping. - * rgerhards, 2007-08-01 - * FYI: currently, this function is only called from the queue - * consumer. So we (conceptually) run detached from the input - * threads (which also means we may run much later than when the - * message was generated). +/* helper to actonCallAction, mostly needed because of this damn + * pthread_cleanup_push() POSIX macro... */ -#pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal -actionCallAction(action_t *pAction, msg_t *pMsg) +static rsRetVal +doActionCallAction(action_t *pAction, msg_t *pMsg) { DEFiRet; - int iCancelStateSave; - - ISOBJ_TYPE_assert(pMsg, msg); - ASSERT(pAction != NULL); - - /* Make sure nodbody else modifies/uses this action object. Right now, this - * is important because of "message repeated n times" processing and potentially - * multiple worker threads. -- rgerhards, 2007-12-11 - */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - LockObj(pAction); - pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - pthread_setcancelstate(iCancelStateSave, NULL); - /* first, we need to check if this is a disabled * entry. If so, we must not further process it. * rgerhards 2005-09-26 @@ -776,12 +782,12 @@ actionCallAction(action_t *pAction, msg_t *pMsg) /* suppress duplicate messages */ if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL && (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) && - !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && + !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) && - !strcmp(getPROCID(pMsg), getPROCID(pAction->f_pMsg)) && - !strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) { + !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) && + !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) { pAction->f_prevcount++; - dbgprintf("msg repeated %d times, %ld sec of %d.\n", + DBGPRINTF("msg repeated %d times, %ld sec of %d.\n", pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time, repeatinterval[pAction->f_repeatcount]); /* use current message, so we have the new timestamp (means we need to discard previous one) */ @@ -812,10 +818,36 @@ actionCallAction(action_t *pAction, msg_t *pMsg) } finalize_it: + RETiRet; +} + +/* call the configured action. Does all necessary housekeeping. + * rgerhards, 2007-08-01 + * FYI: currently, this function is only called from the queue + * consumer. So we (conceptually) run detached from the input + * threads (which also means we may run much later than when the + * message was generated). + */ +#pragma GCC diagnostic ignored "-Wempty-body" +rsRetVal +actionCallAction(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + int iCancelStateSave; + + ISOBJ_TYPE_assert(pMsg, msg); + ASSERT(pAction != NULL); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + LockObj(pAction); + pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + pthread_setcancelstate(iCancelStateSave, NULL); + iRet = doActionCallAction(pAction, pMsg); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); UnlockObj(pAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ pthread_setcancelstate(iCancelStateSave, NULL); + RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -838,6 +870,7 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); @@ -877,7 +910,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques assert(ppAction != NULL); assert(pMod != NULL); assert(pOMSR != NULL); - dbgprintf("Module %s processed this config line.\n", module.GetName(pMod)); + DBGPRINTF("Module %s processed this config line.\n", module.GetName(pMod)); CHKiRet(actionConstruct(&pAction)); /* create action object first */ pAction->pMod = pMod; @@ -901,9 +934,9 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques */ if(pAction->iNumTpls > 0) { /* we first need to create the template pointer array */ - if((pAction->ppTpl = calloc(pAction->iNumTpls, sizeof(struct template *))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *))); + CHKmalloc(pAction->ppMsgs = (uchar**) calloc(pAction->iNumTpls, sizeof(uchar *))); + CHKmalloc(pAction->lenMsgs = (size_t*) calloc(pAction->iNumTpls, sizeof(size_t))); } for(i = 0 ; i < pAction->iNumTpls ; ++i) { @@ -936,7 +969,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->eParamPassing = ACT_STRING_PASSING; } - dbgprintf("template: '%s' assigned\n", pTplName); + DBGPRINTF("template: '%s' assigned\n", pTplName); } pAction->pMod = pMod; @@ -945,7 +978,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) pAction->f_ReduceRepeated = bReduceRepeatMsgs; else { - dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n"); + DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n"); pAction->f_ReduceRepeated = 0; } pAction->bEnabled = 1; /* action is enabled */ |