summaryrefslogtreecommitdiff
path: root/plugins/imjournal/imjournal.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imjournal/imjournal.c')
-rwxr-xr-xplugins/imjournal/imjournal.c116
1 files changed, 95 insertions, 21 deletions
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c
index ae29154..36c7e04 100755
--- a/plugins/imjournal/imjournal.c
+++ b/plugins/imjournal/imjournal.c
@@ -3,7 +3,7 @@
* To test under Linux:
* emmit log message into systemd journal
*
- * Copyright (C) 2008-2012 Adiscon GmbH
+ * Copyright (C) 2008-2013 Adiscon GmbH
*
* This file is part of rsyslog.
*
@@ -33,6 +33,7 @@
#include <sys/poll.h>
#include <sys/socket.h>
#include <errno.h>
+#include <systemd/sd-journal.h>
#include "dirty.h"
#include "cfsysline.h"
@@ -47,7 +48,7 @@
#include "errmsg.h"
#include "srUtils.h"
#include "unicode-helper.h"
-#include <systemd/sd-journal.h>
+#include "ratelimit.h"
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
@@ -64,12 +65,18 @@ DEFobjCurrIf(errmsg)
static struct configSettings_s {
char *stateFile;
int iPersistStateInterval;
+ int ratelimitInterval;
+ int ratelimitBurst;
+ int bIgnorePrevious;
} cs;
-/* module-gloval parameters */
+/* module-global parameters */
static struct cnfparamdescr modpdescr[] = {
{ "statefile", eCmdHdlrGetWord, 0 },
- { "persiststateinterval", eCmdHdlrInt, 0 }
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 },
+ { "persiststateinterval", eCmdHdlrInt, 0 },
+ { "ignorepreviousmessages", eCmdHdlrBinary, 0 }
};
static struct cnfparamblk modpblk =
{ CNFPARAMBLK_VERSION,
@@ -79,11 +86,12 @@ static struct cnfparamblk modpblk =
#define DFLT_persiststateinterval 10
-static int bLegacyCnfModGlobalsPermitted = 0;/* are legacy module-global config parameters permitted? */
+static int bLegacyCnfModGlobalsPermitted = 1;/* are legacy module-global config parameters permitted? */
static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */
static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */
+static ratelimit_t *ratelimiter = NULL;
static sd_journal *j;
/* enqueue the the journal message into the message queue.
@@ -121,7 +129,7 @@ enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval *
msgAddJSON(pMsg, (uchar*)"!", json);
}
- CHKiRet(submitMsg2(pMsg));
+ CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg));
finalize_it:
RETiRet;
@@ -244,7 +252,13 @@ readjournal() {
SD_JOURNAL_FOREACH_DATA(j, get, l) {
/* locate equal sign, this is always present */
equal_sign = memchr(get, '=', l);
- assert (equal_sign != NULL);
+
+ /* ... but we know better than to trust the specs */
+ if (equal_sign == NULL) {
+ errmsg.LogError(0, RS_RET_ERR, "SD_JOURNAL_FOREACH_DATA()"
+ "returned a malformed field (has no '='): '%s'", get);
+ continue; /* skip the entry */
+ }
/* get length of journal data prefix */
prefixlen = ((char *)equal_sign - (char *)get);
@@ -427,12 +441,13 @@ finalize_it:
}
-BEGINrunInput
-CODESTARTrunInput
- /* this is an endless loop - it is terminated when the thread is
- * signalled to do so. This, however, is handled by the framework,
- * right into the sleep below.
- */
+/* This function loads a journal cursor from the state file.
+ */
+static rsRetVal
+loadJournalState()
+{
+ DEFiRet;
+
if (cs.stateFile[0] != '/') {
char *new_stateFile;
@@ -470,8 +485,48 @@ CODESTARTrunInput
errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "imjournal: "
"open on state file `%s' failed\n", cs.stateFile);
}
+ } else {
+ /* when IgnorePrevious, seek to the end of journal */
+ if (cs.bIgnorePrevious) {
+ if (sd_journal_seek_tail(j) < 0) {
+ char errStr[256];
+
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR,
+ "sd_journal_seek_tail() failed: '%s'", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if (sd_journal_previous(j) < 0) {
+ char errStr[256];
+
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR,
+ "sd_journal_previous() failed: '%s'", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+BEGINrunInput
+CODESTARTrunInput
+ CHKiRet(ratelimitNew(&ratelimiter, "imjournal", NULL));
+ dbgprintf("imjournal: ratelimiting burst %d, interval %d\n", cs.ratelimitBurst,
+ cs.ratelimitInterval);
+ ratelimitSetLinuxLike(ratelimiter, cs.ratelimitInterval, cs.ratelimitBurst);
+ ratelimitSetNoTimeCache(ratelimiter);
+
+ if (cs.stateFile) {
+ CHKiRet(loadJournalState());
}
+ /* this is an endless loop - it is terminated when the thread is
+ * signalled to do so. This, however, is handled by the framework.
+ */
while (glbl.GetGlobalInputTermState() == 0) {
int count = 0, r;
@@ -492,11 +547,13 @@ CODESTARTrunInput
}
CHKiRet(readjournal());
- /* TODO: This could use some finer metric. */
- count++;
- if (count == cs.iPersistStateInterval) {
- count = 0;
- persistJournalState();
+ if (cs.stateFile) { /* can't persist without a state file */
+ /* TODO: This could use some finer metric. */
+ count++;
+ if (count == cs.iPersistStateInterval) {
+ count = 0;
+ persistJournalState();
+ }
}
}
@@ -510,6 +567,8 @@ CODESTARTbeginCnfLoad
cs.iPersistStateInterval = DFLT_persiststateinterval;
cs.stateFile = NULL;
+ cs.ratelimitBurst = 20000;
+ cs.ratelimitInterval = 600;
ENDbeginCnfLoad
@@ -545,8 +604,11 @@ ENDwillRun
/* close journal */
BEGINafterRun
CODESTARTafterRun
- persistJournalState();
+ if (cs.stateFile) { /* can't persist without a state file */
+ persistJournalState();
+ }
sd_journal_close(j);
+ ratelimitDestruct(ratelimiter);
ENDafterRun
@@ -589,6 +651,12 @@ CODESTARTsetModCnf
cs.iPersistStateInterval = (int) pvals[i].val.d.n;
} else if (!strcmp(modpblk.descr[i].name, "statefile")) {
cs.stateFile = (char *)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(modpblk.descr[i].name, "ratelimit.burst")) {
+ cs.ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "ratelimit.interval")) {
+ cs.ratelimitInterval = (int) pvals[i].val.d.n;
+ } else if (!strcmp(modpblk.descr[i].name, "ignorepreviousmessages")) {
+ cs.bIgnorePrevious = (int) pvals[i].val.d.n;
} else {
dbgprintf("imjournal: program error, non-handled "
"param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
@@ -632,10 +700,16 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imjournal"), sizeof("imjournal") - 1));
CHKiRet(prop.CreateStringProp(&pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"persiststateinterval", 0, eCmdHdlrInt,
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalpersiststateinterval", 0, eCmdHdlrInt,
NULL, &cs.iPersistStateInterval, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"statefile", 0, eCmdHdlrGetWord,
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalratelimitinterval", 0, eCmdHdlrInt,
+ NULL, &cs.ratelimitInterval, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalratelimitburst", 0, eCmdHdlrInt,
+ NULL, &cs.ratelimitBurst, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalstatefile", 0, eCmdHdlrGetWord,
NULL, &cs.stateFile, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalignorepreviousmessages", 0, eCmdHdlrBinary,
+ NULL, &cs.bIgnorePrevious, STD_LOADABLE_MODULE_ID));
ENDmodInit