diff options
Diffstat (limited to 'plugins/imjournal/imjournal.c')
-rwxr-xr-x | plugins/imjournal/imjournal.c | 116 |
1 files changed, 95 insertions, 21 deletions
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c index ae29154..36c7e04 100755 --- a/plugins/imjournal/imjournal.c +++ b/plugins/imjournal/imjournal.c @@ -3,7 +3,7 @@ * To test under Linux: * emmit log message into systemd journal * - * Copyright (C) 2008-2012 Adiscon GmbH + * Copyright (C) 2008-2013 Adiscon GmbH * * This file is part of rsyslog. * @@ -33,6 +33,7 @@ #include <sys/poll.h> #include <sys/socket.h> #include <errno.h> +#include <systemd/sd-journal.h> #include "dirty.h" #include "cfsysline.h" @@ -47,7 +48,7 @@ #include "errmsg.h" #include "srUtils.h" #include "unicode-helper.h" -#include <systemd/sd-journal.h> +#include "ratelimit.h" MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP @@ -64,12 +65,18 @@ DEFobjCurrIf(errmsg) static struct configSettings_s { char *stateFile; int iPersistStateInterval; + int ratelimitInterval; + int ratelimitBurst; + int bIgnorePrevious; } cs; -/* module-gloval parameters */ +/* module-global parameters */ static struct cnfparamdescr modpdescr[] = { { "statefile", eCmdHdlrGetWord, 0 }, - { "persiststateinterval", eCmdHdlrInt, 0 } + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 }, + { "persiststateinterval", eCmdHdlrInt, 0 }, + { "ignorepreviousmessages", eCmdHdlrBinary, 0 } }; static struct cnfparamblk modpblk = { CNFPARAMBLK_VERSION, @@ -79,11 +86,12 @@ static struct cnfparamblk modpblk = #define DFLT_persiststateinterval 10 -static int bLegacyCnfModGlobalsPermitted = 0;/* are legacy module-global config parameters permitted? */ +static int bLegacyCnfModGlobalsPermitted = 1;/* are legacy module-global config parameters permitted? */ static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */ +static ratelimit_t *ratelimiter = NULL; static sd_journal *j; /* enqueue the the journal message into the message queue. @@ -121,7 +129,7 @@ enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval * msgAddJSON(pMsg, (uchar*)"!", json); } - CHKiRet(submitMsg2(pMsg)); + CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg)); finalize_it: RETiRet; @@ -244,7 +252,13 @@ readjournal() { SD_JOURNAL_FOREACH_DATA(j, get, l) { /* locate equal sign, this is always present */ equal_sign = memchr(get, '=', l); - assert (equal_sign != NULL); + + /* ... but we know better than to trust the specs */ + if (equal_sign == NULL) { + errmsg.LogError(0, RS_RET_ERR, "SD_JOURNAL_FOREACH_DATA()" + "returned a malformed field (has no '='): '%s'", get); + continue; /* skip the entry */ + } /* get length of journal data prefix */ prefixlen = ((char *)equal_sign - (char *)get); @@ -427,12 +441,13 @@ finalize_it: } -BEGINrunInput -CODESTARTrunInput - /* this is an endless loop - it is terminated when the thread is - * signalled to do so. This, however, is handled by the framework, - * right into the sleep below. - */ +/* This function loads a journal cursor from the state file. + */ +static rsRetVal +loadJournalState() +{ + DEFiRet; + if (cs.stateFile[0] != '/') { char *new_stateFile; @@ -470,8 +485,48 @@ CODESTARTrunInput errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "imjournal: " "open on state file `%s' failed\n", cs.stateFile); } + } else { + /* when IgnorePrevious, seek to the end of journal */ + if (cs.bIgnorePrevious) { + if (sd_journal_seek_tail(j) < 0) { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "sd_journal_seek_tail() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + + if (sd_journal_previous(j) < 0) { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "sd_journal_previous() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + } + } + +finalize_it: + RETiRet; +} + +BEGINrunInput +CODESTARTrunInput + CHKiRet(ratelimitNew(&ratelimiter, "imjournal", NULL)); + dbgprintf("imjournal: ratelimiting burst %d, interval %d\n", cs.ratelimitBurst, + cs.ratelimitInterval); + ratelimitSetLinuxLike(ratelimiter, cs.ratelimitInterval, cs.ratelimitBurst); + ratelimitSetNoTimeCache(ratelimiter); + + if (cs.stateFile) { + CHKiRet(loadJournalState()); } + /* this is an endless loop - it is terminated when the thread is + * signalled to do so. This, however, is handled by the framework. + */ while (glbl.GetGlobalInputTermState() == 0) { int count = 0, r; @@ -492,11 +547,13 @@ CODESTARTrunInput } CHKiRet(readjournal()); - /* TODO: This could use some finer metric. */ - count++; - if (count == cs.iPersistStateInterval) { - count = 0; - persistJournalState(); + if (cs.stateFile) { /* can't persist without a state file */ + /* TODO: This could use some finer metric. */ + count++; + if (count == cs.iPersistStateInterval) { + count = 0; + persistJournalState(); + } } } @@ -510,6 +567,8 @@ CODESTARTbeginCnfLoad cs.iPersistStateInterval = DFLT_persiststateinterval; cs.stateFile = NULL; + cs.ratelimitBurst = 20000; + cs.ratelimitInterval = 600; ENDbeginCnfLoad @@ -545,8 +604,11 @@ ENDwillRun /* close journal */ BEGINafterRun CODESTARTafterRun - persistJournalState(); + if (cs.stateFile) { /* can't persist without a state file */ + persistJournalState(); + } sd_journal_close(j); + ratelimitDestruct(ratelimiter); ENDafterRun @@ -589,6 +651,12 @@ CODESTARTsetModCnf cs.iPersistStateInterval = (int) pvals[i].val.d.n; } else if (!strcmp(modpblk.descr[i].name, "statefile")) { cs.stateFile = (char *)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "ratelimit.burst")) { + cs.ratelimitBurst = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "ratelimit.interval")) { + cs.ratelimitInterval = (int) pvals[i].val.d.n; + } else if (!strcmp(modpblk.descr[i].name, "ignorepreviousmessages")) { + cs.bIgnorePrevious = (int) pvals[i].val.d.n; } else { dbgprintf("imjournal: program error, non-handled " "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); @@ -632,10 +700,16 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imjournal"), sizeof("imjournal") - 1)); CHKiRet(prop.CreateStringProp(&pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"persiststateinterval", 0, eCmdHdlrInt, + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalpersiststateinterval", 0, eCmdHdlrInt, NULL, &cs.iPersistStateInterval, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"statefile", 0, eCmdHdlrGetWord, + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalratelimitinterval", 0, eCmdHdlrInt, + NULL, &cs.ratelimitInterval, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalratelimitburst", 0, eCmdHdlrInt, + NULL, &cs.ratelimitBurst, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalstatefile", 0, eCmdHdlrGetWord, NULL, &cs.stateFile, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalignorepreviousmessages", 0, eCmdHdlrBinary, + NULL, &cs.bIgnorePrevious, STD_LOADABLE_MODULE_ID)); ENDmodInit |