diff options
author | Michael Biebl <biebl@debian.org> | 2011-03-26 19:16:55 +0100 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2011-03-26 19:16:55 +0100 |
commit | dea543841bb1f1c46586a68da21d55b44c6bf158 (patch) | |
tree | 46dd88ab15d5da40adae5ca8748f73e079df979a /action.c | |
parent | a362b3333f84518889cefc1706d4e6adaa92888c (diff) | |
download | rsyslog-dea543841bb1f1c46586a68da21d55b44c6bf158.tar.gz |
Imported Upstream version 5.7.9upstream/5.7.9
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 139 |
1 files changed, 109 insertions, 30 deletions
@@ -4,7 +4,44 @@ * * File begun on 2007-08-06 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH. + * Some notes on processing (this hopefully makes it easier to find + * the right code in question): For performance reasons, this module + * uses different methods of message submission based on the user-selected + * configuration. This code is similar, but can not be abstracted because + * of the performanse-affecting differences in it. As such, it is often + * necessary to triple-check that everything works well in *all* modes. + * The different modes (and calling sequence) are: + * + * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval + * - doSubmitToActionQComplexBatch + * - helperSubmitToActionQComplexBatch + * - doActionCallAction + * handles duplicate message processing, but in essence calls + * - actionWriteToAction + * - qqueueEnqObj + * (now queue engine processing) + * if(pThis->bWriteAllMarkMsgs == FALSE) - this is the DEFAULT + * - doSubmitToActionQNotAllMarkBatch + * - doSubmitToActionQBatch (and from here like in the else case below!) + * else + * - doSubmitToActionQBatch + * - doSubmitToActionQ + * - qqueueEnqObj + * (now queue engine processing) + * + * Note that bWriteAllMakrMsgs on or off creates almost the same processing. + * The difference ist that if WriteAllMarkMsgs is not set, we need to + * preprocess the batch and drop mark messages which are not yet due for + * writing. + * + * After dequeue, processing is as follows: + * - processBatchMain + * - processAction + * - submitBatch + * - tryDoAction + * - + * + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -899,7 +936,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC + && pBatch->pElem[i].state != BATCH_STATE_DISC//) { && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, @@ -1164,11 +1201,33 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. This is also utilized to submit messages in complex case once + * the complex logic has been applied ;) + * rgerhards, 2010-06-08 + */ +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + /* This function builds up a batch of messages to be (later) * submitted to the action queue. + * Note: this function is also called from syslogd itself as part of its + * flush processing. If so, pBatch will be NULL and idxBtch undefined. */ rsRetVal -actionWriteToAction(action_t *pAction) +actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch) { msg_t *pMsgSave; /* to save current message pointer, necessary to restore it in case it needs to be updated (e.g. repeated msgs) */ @@ -1239,7 +1298,7 @@ actionWriteToAction(action_t *pAction) pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */ } - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); /* now check if we need to drop the message because otherwise the action would be too * frequently called. -- rgerhards, 2008-04-08 @@ -1257,14 +1316,43 @@ actionWriteToAction(action_t *pAction) FINALIZE; } - /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) + * rgerhards, 2008-09-17 */ pAction->tLastExec = getActNow(pAction); /* re-init time flags */ pAction->f_time = pAction->f_pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); + if( pBatch != NULL + && (pAction->bExecWhenPrevSusp == 1 && pBatch->pElem[idxBtch].bPrevWasSuspended)) { + /* in that case, we need to create a special batch which reflects the + * suspended state. Otherwise, that information would be dropped inside + * the queue engine. TODO: in later releases (v6?) create a better + * solution than what we do here. However, for v5 this sounds much too + * intrusive. -- rgerhardsm, 2011-03-16 + * (Code is copied over from queue.c and slightly modified) + */ + batch_t singleBatch; + batch_obj_t batchObj; + int i; + memset(&batchObj, 0, sizeof(batch_obj_t)); + memset(&singleBatch, 0, sizeof(batch_t)); + batchObj.state = BATCH_STATE_RDY; + batchObj.pUsrp = (obj_t*) pAction->f_pMsg; + batchObj.bPrevWasSuspended = 1; + batchObj.bFilterOK = 1; + singleBatch.nElem = 1; /* there always is only one in direct mode */ + singleBatch.pElem = &batchObj; + + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, &singleBatch); + + for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { + free(batchObj.staticActStrings[i]); + } + } else { /* standard case, just submit */ + iRet = doSubmitToActionQ(pAction, pAction->f_pMsg); + } if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ @@ -1292,10 +1380,12 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, msg_t *pMsg) +doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) { + msg_t *pMsg; DEFiRet; + pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp); pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ @@ -1322,7 +1412,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) * isolated messages), but back off so we'll flush less often in the future. */ if(getActNow(pAction) > REPEATTIME(pAction)) { - iRet = actionWriteToAction(pAction); + iRet = actionWriteToAction(pAction, pBatch, idxBtch); BACKOFF(pAction); } } else {/* new message, save it */ @@ -1331,7 +1421,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) */ if(pAction->f_pMsg != NULL) { if(pAction->f_prevcount > 0) - actionWriteToAction(pAction); + actionWriteToAction(pAction, pBatch, idxBtch); /* we do not care about iRet above - I think it's right but if we have * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01 */ @@ -1339,33 +1429,21 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) } pAction->f_pMsg = MsgAddRef(pMsg); /* call the output driver */ - iRet = actionWriteToAction(pAction); + iRet = actionWriteToAction(pAction, pBatch, idxBtch); } finalize_it: - RETiRet; -} - -/* This submits the message to the action queue in case we do NOT need to handle repeat - * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. - * rgerhards, 2010-06-08 - */ -static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) -{ - DEFiRet; - - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); - else - iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + /* we need to update the batch to handle failover processing correctly */ + if(iRet == RS_RET_OK) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 0; + } else if(iRet == RS_RET_ACTION_FAILED) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 1; + } RETiRet; } - /* This submits the message to the action queue in case where we need to handle * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop * for the synchronization. Here, we just modify the filter condition to be false when @@ -1482,8 +1560,9 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC) { - doActionCallAction(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + && pBatch->pElem[i].state != BATCH_STATE_DISC + && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { + doActionCallAction(pAction, pBatch, i); } } |