diff options
Diffstat (limited to 'tools/omfile.c')
-rw-r--r-- | tools/omfile.c | 83 |
1 files changed, 80 insertions, 3 deletions
diff --git a/tools/omfile.c b/tools/omfile.c index ab17afa..39c0173 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -17,7 +17,7 @@ * pipes. These have been moved to ompipe, to reduced the entanglement * between the two different functionalities. -- rgerhards * - * Copyright 2007-2013 Adiscon GmbH. + * Copyright 2007-2014 Adiscon GmbH. * * This file is part of rsyslog. * @@ -71,6 +71,7 @@ #include "statsobj.h" #include "sigprov.h" #include "cryprov.h" +#include "janitor.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -95,7 +96,7 @@ DEFobjCurrIf(statsobj) * That should be sufficient (and even than, there would no really bad effect ;)). * The variable below is the global counter/clock. */ -#if HAVE_ATOMIC_BUILTINS_64BIT +#if HAVE_ATOMIC_BUILTINS64 static uint64 clockFileAccess = 0; #else static unsigned clockFileAccess = 0; @@ -107,7 +108,7 @@ static pthread_mutex_t mutClock; static inline uint64 getClockFileAccess(void) { -#if HAVE_ATOMIC_BUILTINS_64BIT +#if HAVE_ATOMIC_BUILTINS64 return ATOMIC_INC_AND_FETCH_uint64(&clockFileAccess, &mutClock); #else return ATOMIC_INC_AND_FETCH_unsigned(&clockFileAccess, &mutClock); @@ -122,6 +123,7 @@ struct s_dynaFileCacheEntry { strm_t *pStrm; /* our output stream */ void *sigprovFileData; /* opaque data ptr for provider use */ uint64 clkTickAccessed;/* for LRU - based on clockFileAccess */ + short nInactive; /* number of minutes not writen - for close timeout */ }; typedef struct s_dynaFileCacheEntry dynaFileCacheEntry; @@ -137,6 +139,7 @@ typedef struct _instanceData { uchar *fname; /* file or template name (display only) */ uchar *tplName; /* name of assigned template */ strm_t *pStrm; /* our output stream */ + short nInactive; /* number of minutes not writen (STATIC files only) */ char bDynamicName; /* 0 - static name, 1 - dynamic name (with properties) */ int fCreateMode; /* file creation mode for open() */ int fDirCreateMode; /* creation mode for mkdir() */ @@ -172,6 +175,7 @@ typedef struct _instanceData { int iZipLevel; /* zip mode to use for this selector */ int iIOBufSize; /* size of associated io buffer */ int iFlushInterval; /* how fast flush buffer on inactivity? */ + short iCloseTimeout; /* after how many *minutes* shall the file be closed if inactive? */ sbool bFlushOnTXEnd; /* flush write buffers when transaction has ended? */ sbool bUseAsyncWriter; /* use async stream writer? */ sbool bVeryRobustZip; @@ -181,6 +185,8 @@ typedef struct _instanceData { STATSCOUNTER_DEF(ctrEvict, mutCtrEvict); STATSCOUNTER_DEF(ctrMiss, mutCtrMiss); STATSCOUNTER_DEF(ctrMax, mutCtrMax); + STATSCOUNTER_DEF(ctrCloseTimeouts, mutCtrCloseTimeouts); + char janitorID[128]; /* holds ID for janitor calls */ } instanceData; @@ -271,6 +277,7 @@ static struct cnfparamdescr actpdescr[] = { { "dynafile", eCmdHdlrString, 0 }, /* "dynafile" MUST be present */ { "sig.provider", eCmdHdlrGetWord, 0 }, { "cry.provider", eCmdHdlrGetWord, 0 }, + { "closetimeout", eCmdHdlrPositiveInt, 0 }, { "template", eCmdHdlrGetWord, 0 } }; static struct cnfparamblk actpblk = @@ -772,6 +779,7 @@ prepareDynFile(instanceData *__restrict__ const pData, const uchar *__restrict__ DBGPRINTF("Added new entry %d for file cache, file '%s'.\n", iFirstFree, newFileName); finalize_it: + pCache[pData->iCurrElt]->nInactive = 0; RETiRet; } @@ -826,6 +834,7 @@ writeFile(instanceData *__restrict__ const pData, "Could not open output file '%s'", pData->fname); } } + pData->nInactive = 0; } CHKiRet(doWrite(pData, @@ -908,6 +917,54 @@ finalize_it: cnfparamvalsDestruct(pvals, &modpblk); ENDsetModCnf +/* This function checks dynafile cache for janitor action */ +static inline void +janitorChkDynaFiles(instanceData *__restrict__ const pData) +{ + int i; + dynaFileCacheEntry **pCache = pData->dynCache; + + for(i = 0 ; i < pData->iCurrCacheSize ; ++i) { + if(pCache[i] == NULL) + continue; + DBGPRINTF("omfile janitor: checking dynafile %d:%s, inactive since %d\n", i, + pCache[i]->pName == NULL ? UCHAR_CONSTANT("[OPEN FAILED]") : pCache[i]->pName, + (int) pCache[i]->nInactive); + if(pCache[i]->nInactive >= pData->iCloseTimeout) { + STATSCOUNTER_INC(pData->ctrCloseTimeouts, pData->mutCtrCloseTimeouts); + dynaFileDelCacheEntry(pData, i, 1); + if(pData->iCurrElt == i) + pData->iCurrElt = -1; /* no longer available! */ + } else { + pCache[i]->nInactive += janitorInterval; + } + } +} + +/* callback for the janitor. This cleans out files (if so configured) */ +void +janitorCB(void *pUsr) +{ + instanceData *__restrict__ const pData = (instanceData *) pUsr; + pthread_mutex_lock(&pData->mutWrite); + if(pData->bDynamicName) { + janitorChkDynaFiles(pData); + } else { + if(pData->pStrm != NULL) { + DBGPRINTF("omfile janitor: checking file %s, inactive since %d\n", + pData->fname, pData->nInactive); + if(pData->nInactive >= pData->iCloseTimeout) { + STATSCOUNTER_INC(pData->ctrCloseTimeouts, pData->mutCtrCloseTimeouts); + closeFile(pData); + } else { + pData->nInactive += janitorInterval; + } + } + } + pthread_mutex_unlock(&pData->mutWrite); +} + + BEGINendCnfLoad CODESTARTendCnfLoad loadModConf = NULL; /* done loading */ @@ -947,6 +1004,8 @@ BEGINfreeInstance CODESTARTfreeInstance free(pData->tplName); free(pData->fname); + if(pData->iCloseTimeout > 0) + janitorDelEtry(pData->janitorID); if(pData->bDynamicName) { dynaFileFreeCache(pData); } else if(pData->pStrm != NULL) @@ -1032,6 +1091,7 @@ setInstParamDefaults(instanceData *__restrict__ const pData) pData->cryprovName = NULL; pData->useSigprov = 0; pData->useCryprov = 0; + pData->iCloseTimeout = -1; } @@ -1065,6 +1125,9 @@ setupInstStatsCtrs(instanceData *__restrict__ const pData) STATSCOUNTER_INIT(pData->ctrMax, pData->mutCtrMax); CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("maxused"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pData->ctrMax))); + STATSCOUNTER_INIT(pData->ctrCloseTimeouts, pData->mutCtrCloseTimeouts); + CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("closetimeouts"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pData->ctrCloseTimeouts))); CHKiRet(statsobj.ConstructFinalize(pData->stats)); finalize_it: @@ -1236,6 +1299,8 @@ CODESTARTnewActInst pData->sigprovName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "cry.provider")) { pData->cryprovName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "closetimeout")) { + pData->iCloseTimeout = (int) pvals[i].val.d.n; } else { dbgprintf("omfile: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); @@ -1275,6 +1340,17 @@ CODESTARTnewActInst // TODO: add pData->iSizeLimit = 0; /* default value, use outchannels to configure! */ setupInstStatsCtrs(pData); + if(pData->iCloseTimeout == -1) { /* unset? */ + pData->iCloseTimeout = (pData->bDynamicName) ? 10 : 0; + } + + snprintf(pData->janitorID, sizeof(pData->janitorID), "omfile:%sfile:%s:%p", + (pData->bDynamicName) ? "dyna" : "", pData->fname, pData); + pData->janitorID[sizeof(pData->janitorID)-1] = '\0'; /* just in case... */ + + if(pData->iCloseTimeout > 0) + janitorAddEtry(janitorCB, pData->janitorID, pData); + CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst @@ -1366,6 +1442,7 @@ CODESTARTparseSelectorAct pData->iFlushInterval = cs.iFlushInterval; pData->bUseAsyncWriter = cs.bUseAsyncWriter; pData->bVeryRobustZip = 0; /* cannot be specified via legacy conf */ + pData->iCloseTimeout = 0; /* cannot be specified via legacy conf */ setupInstStatsCtrs(pData); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct |