diff options
Diffstat (limited to 'plugins/omhdfs/omhdfs.c')
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 43 |
1 files changed, 34 insertions, 9 deletions
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index f8a7e73..e173fb3 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * Copyright 2010 Rainer Gerhards and Adiscon GmbH. + * Copyright 2010-2014 Rainer Gerhards and Adiscon GmbH. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -36,7 +36,12 @@ #include <unistd.h> #include <sys/file.h> #include <pthread.h> -#include <hdfs.h> +#ifdef HAVE_HDFS_H +# include <hdfs.h> +#endif +#ifdef HAVE_HADOOP_HDFS_H +# include <hadoop/hdfs.h> +#endif #include "syslogd-types.h" #include "srUtils.h" @@ -51,7 +56,7 @@ MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP -MODULE_CNFNAME("omhdfs") +/* MODULE_CNFNAME("omhdfs") we need this only when we convert the module to v2 config system */ /* internal structures */ @@ -60,6 +65,7 @@ DEFobjCurrIf(errmsg) /* global data */ static struct hashtable *files; /* holds all file objects that we know */ +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; typedef struct configSettings_s { uchar *fileName; @@ -69,11 +75,6 @@ typedef struct configSettings_s { } configSettings_t; static configSettings_t cs; - -BEGINinitConfVars /* (re)set config variables to default values */ -CODESTARTinitConfVars -ENDinitConfVars - typedef struct { uchar *name; hdfsFS fs; @@ -91,6 +92,10 @@ typedef struct _instanceData { unsigned offsBuf; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* forward definitions (down here, need data types) */ static inline rsRetVal fileClose(file_t *pFile); @@ -387,6 +392,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINfreeInstance CODESTARTfreeInstance if(pData->pFile != NULL) @@ -394,8 +404,15 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume + instanceData *pData = pWrkrData->pData; CODESTARTtryResume + pthread_mutex_lock(&mutDoAct); fileClose(pData->pFile); fileOpen(pData->pFile); if(pData->pFile->fh == NULL){ @@ -403,6 +420,7 @@ CODESTARTtryResume pData->pFile->name); iRet = RS_RET_SUSPENDED; } + pthread_mutex_unlock(&mutDoAct); ENDtryResume @@ -413,20 +431,26 @@ ENDbeginTransaction BEGINdoAction + instanceData *pData = pWrkrData->pData; CODESTARTdoAction DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name); + pthread_mutex_lock(&mutDoAct); iRet = addData(pData, ppString[0]); -dbgprintf("omhdfs: done doAction\n"); + DBGPRINTF("omhdfs: done doAction\n"); + pthread_mutex_unlock(&mutDoAct); ENDdoAction BEGINendTransaction + instanceData *pData = pWrkrData->pData; CODESTARTendTransaction dbgprintf("omhdfs: endTransaction\n"); + pthread_mutex_lock(&mutDoAct); if(pData->offsBuf != 0) { DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n"); iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf); } + pthread_mutex_unlock(&mutDoAct); ENDendTransaction @@ -526,6 +550,7 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_doHUP ENDqueryEtryPt |