summaryrefslogtreecommitdiff
path: root/tools/omfile.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/omfile.c')
-rw-r--r--tools/omfile.c83
1 files changed, 80 insertions, 3 deletions
diff --git a/tools/omfile.c b/tools/omfile.c
index ab17afa..39c0173 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -17,7 +17,7 @@
* pipes. These have been moved to ompipe, to reduced the entanglement
* between the two different functionalities. -- rgerhards
*
- * Copyright 2007-2013 Adiscon GmbH.
+ * Copyright 2007-2014 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -71,6 +71,7 @@
#include "statsobj.h"
#include "sigprov.h"
#include "cryprov.h"
+#include "janitor.h"
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
@@ -95,7 +96,7 @@ DEFobjCurrIf(statsobj)
* That should be sufficient (and even than, there would no really bad effect ;)).
* The variable below is the global counter/clock.
*/
-#if HAVE_ATOMIC_BUILTINS_64BIT
+#if HAVE_ATOMIC_BUILTINS64
static uint64 clockFileAccess = 0;
#else
static unsigned clockFileAccess = 0;
@@ -107,7 +108,7 @@ static pthread_mutex_t mutClock;
static inline uint64
getClockFileAccess(void)
{
-#if HAVE_ATOMIC_BUILTINS_64BIT
+#if HAVE_ATOMIC_BUILTINS64
return ATOMIC_INC_AND_FETCH_uint64(&clockFileAccess, &mutClock);
#else
return ATOMIC_INC_AND_FETCH_unsigned(&clockFileAccess, &mutClock);
@@ -122,6 +123,7 @@ struct s_dynaFileCacheEntry {
strm_t *pStrm; /* our output stream */
void *sigprovFileData; /* opaque data ptr for provider use */
uint64 clkTickAccessed;/* for LRU - based on clockFileAccess */
+ short nInactive; /* number of minutes not writen - for close timeout */
};
typedef struct s_dynaFileCacheEntry dynaFileCacheEntry;
@@ -137,6 +139,7 @@ typedef struct _instanceData {
uchar *fname; /* file or template name (display only) */
uchar *tplName; /* name of assigned template */
strm_t *pStrm; /* our output stream */
+ short nInactive; /* number of minutes not writen (STATIC files only) */
char bDynamicName; /* 0 - static name, 1 - dynamic name (with properties) */
int fCreateMode; /* file creation mode for open() */
int fDirCreateMode; /* creation mode for mkdir() */
@@ -172,6 +175,7 @@ typedef struct _instanceData {
int iZipLevel; /* zip mode to use for this selector */
int iIOBufSize; /* size of associated io buffer */
int iFlushInterval; /* how fast flush buffer on inactivity? */
+ short iCloseTimeout; /* after how many *minutes* shall the file be closed if inactive? */
sbool bFlushOnTXEnd; /* flush write buffers when transaction has ended? */
sbool bUseAsyncWriter; /* use async stream writer? */
sbool bVeryRobustZip;
@@ -181,6 +185,8 @@ typedef struct _instanceData {
STATSCOUNTER_DEF(ctrEvict, mutCtrEvict);
STATSCOUNTER_DEF(ctrMiss, mutCtrMiss);
STATSCOUNTER_DEF(ctrMax, mutCtrMax);
+ STATSCOUNTER_DEF(ctrCloseTimeouts, mutCtrCloseTimeouts);
+ char janitorID[128]; /* holds ID for janitor calls */
} instanceData;
@@ -271,6 +277,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "dynafile", eCmdHdlrString, 0 }, /* "dynafile" MUST be present */
{ "sig.provider", eCmdHdlrGetWord, 0 },
{ "cry.provider", eCmdHdlrGetWord, 0 },
+ { "closetimeout", eCmdHdlrPositiveInt, 0 },
{ "template", eCmdHdlrGetWord, 0 }
};
static struct cnfparamblk actpblk =
@@ -772,6 +779,7 @@ prepareDynFile(instanceData *__restrict__ const pData, const uchar *__restrict__
DBGPRINTF("Added new entry %d for file cache, file '%s'.\n", iFirstFree, newFileName);
finalize_it:
+ pCache[pData->iCurrElt]->nInactive = 0;
RETiRet;
}
@@ -826,6 +834,7 @@ writeFile(instanceData *__restrict__ const pData,
"Could not open output file '%s'", pData->fname);
}
}
+ pData->nInactive = 0;
}
CHKiRet(doWrite(pData,
@@ -908,6 +917,54 @@ finalize_it:
cnfparamvalsDestruct(pvals, &modpblk);
ENDsetModCnf
+/* This function checks dynafile cache for janitor action */
+static inline void
+janitorChkDynaFiles(instanceData *__restrict__ const pData)
+{
+ int i;
+ dynaFileCacheEntry **pCache = pData->dynCache;
+
+ for(i = 0 ; i < pData->iCurrCacheSize ; ++i) {
+ if(pCache[i] == NULL)
+ continue;
+ DBGPRINTF("omfile janitor: checking dynafile %d:%s, inactive since %d\n", i,
+ pCache[i]->pName == NULL ? UCHAR_CONSTANT("[OPEN FAILED]") : pCache[i]->pName,
+ (int) pCache[i]->nInactive);
+ if(pCache[i]->nInactive >= pData->iCloseTimeout) {
+ STATSCOUNTER_INC(pData->ctrCloseTimeouts, pData->mutCtrCloseTimeouts);
+ dynaFileDelCacheEntry(pData, i, 1);
+ if(pData->iCurrElt == i)
+ pData->iCurrElt = -1; /* no longer available! */
+ } else {
+ pCache[i]->nInactive += janitorInterval;
+ }
+ }
+}
+
+/* callback for the janitor. This cleans out files (if so configured) */
+void
+janitorCB(void *pUsr)
+{
+ instanceData *__restrict__ const pData = (instanceData *) pUsr;
+ pthread_mutex_lock(&pData->mutWrite);
+ if(pData->bDynamicName) {
+ janitorChkDynaFiles(pData);
+ } else {
+ if(pData->pStrm != NULL) {
+ DBGPRINTF("omfile janitor: checking file %s, inactive since %d\n",
+ pData->fname, pData->nInactive);
+ if(pData->nInactive >= pData->iCloseTimeout) {
+ STATSCOUNTER_INC(pData->ctrCloseTimeouts, pData->mutCtrCloseTimeouts);
+ closeFile(pData);
+ } else {
+ pData->nInactive += janitorInterval;
+ }
+ }
+ }
+ pthread_mutex_unlock(&pData->mutWrite);
+}
+
+
BEGINendCnfLoad
CODESTARTendCnfLoad
loadModConf = NULL; /* done loading */
@@ -947,6 +1004,8 @@ BEGINfreeInstance
CODESTARTfreeInstance
free(pData->tplName);
free(pData->fname);
+ if(pData->iCloseTimeout > 0)
+ janitorDelEtry(pData->janitorID);
if(pData->bDynamicName) {
dynaFileFreeCache(pData);
} else if(pData->pStrm != NULL)
@@ -1032,6 +1091,7 @@ setInstParamDefaults(instanceData *__restrict__ const pData)
pData->cryprovName = NULL;
pData->useSigprov = 0;
pData->useCryprov = 0;
+ pData->iCloseTimeout = -1;
}
@@ -1065,6 +1125,9 @@ setupInstStatsCtrs(instanceData *__restrict__ const pData)
STATSCOUNTER_INIT(pData->ctrMax, pData->mutCtrMax);
CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("maxused"),
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pData->ctrMax)));
+ STATSCOUNTER_INIT(pData->ctrCloseTimeouts, pData->mutCtrCloseTimeouts);
+ CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("closetimeouts"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pData->ctrCloseTimeouts)));
CHKiRet(statsobj.ConstructFinalize(pData->stats));
finalize_it:
@@ -1236,6 +1299,8 @@ CODESTARTnewActInst
pData->sigprovName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "cry.provider")) {
pData->cryprovName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "closetimeout")) {
+ pData->iCloseTimeout = (int) pvals[i].val.d.n;
} else {
dbgprintf("omfile: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
@@ -1275,6 +1340,17 @@ CODESTARTnewActInst
// TODO: add pData->iSizeLimit = 0; /* default value, use outchannels to configure! */
setupInstStatsCtrs(pData);
+ if(pData->iCloseTimeout == -1) { /* unset? */
+ pData->iCloseTimeout = (pData->bDynamicName) ? 10 : 0;
+ }
+
+ snprintf(pData->janitorID, sizeof(pData->janitorID), "omfile:%sfile:%s:%p",
+ (pData->bDynamicName) ? "dyna" : "", pData->fname, pData);
+ pData->janitorID[sizeof(pData->janitorID)-1] = '\0'; /* just in case... */
+
+ if(pData->iCloseTimeout > 0)
+ janitorAddEtry(janitorCB, pData->janitorID, pData);
+
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
@@ -1366,6 +1442,7 @@ CODESTARTparseSelectorAct
pData->iFlushInterval = cs.iFlushInterval;
pData->bUseAsyncWriter = cs.bUseAsyncWriter;
pData->bVeryRobustZip = 0; /* cannot be specified via legacy conf */
+ pData->iCloseTimeout = 0; /* cannot be specified via legacy conf */
setupInstStatsCtrs(pData);
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct