summaryrefslogtreecommitdiff
path: root/plugins/omhdfs/omhdfs.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omhdfs/omhdfs.c')
-rw-r--r--plugins/omhdfs/omhdfs.c43
1 files changed, 34 insertions, 9 deletions
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index f8a7e73..e173fb3 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -4,7 +4,7 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2010-2014 Rainer Gerhards and Adiscon GmbH.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@@ -36,7 +36,12 @@
#include <unistd.h>
#include <sys/file.h>
#include <pthread.h>
-#include <hdfs.h>
+#ifdef HAVE_HDFS_H
+# include <hdfs.h>
+#endif
+#ifdef HAVE_HADOOP_HDFS_H
+# include <hadoop/hdfs.h>
+#endif
#include "syslogd-types.h"
#include "srUtils.h"
@@ -51,7 +56,7 @@
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
-MODULE_CNFNAME("omhdfs")
+/* MODULE_CNFNAME("omhdfs") we need this only when we convert the module to v2 config system */
/* internal structures
*/
@@ -60,6 +65,7 @@ DEFobjCurrIf(errmsg)
/* global data */
static struct hashtable *files; /* holds all file objects that we know */
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
typedef struct configSettings_s {
uchar *fileName;
@@ -69,11 +75,6 @@ typedef struct configSettings_s {
} configSettings_t;
static configSettings_t cs;
-
-BEGINinitConfVars /* (re)set config variables to default values */
-CODESTARTinitConfVars
-ENDinitConfVars
-
typedef struct {
uchar *name;
hdfsFS fs;
@@ -91,6 +92,10 @@ typedef struct _instanceData {
unsigned offsBuf;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* forward definitions (down here, need data types) */
static inline rsRetVal fileClose(file_t *pFile);
@@ -387,6 +392,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINfreeInstance
CODESTARTfreeInstance
if(pData->pFile != NULL)
@@ -394,8 +404,15 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINtryResume
+ instanceData *pData = pWrkrData->pData;
CODESTARTtryResume
+ pthread_mutex_lock(&mutDoAct);
fileClose(pData->pFile);
fileOpen(pData->pFile);
if(pData->pFile->fh == NULL){
@@ -403,6 +420,7 @@ CODESTARTtryResume
pData->pFile->name);
iRet = RS_RET_SUSPENDED;
}
+ pthread_mutex_unlock(&mutDoAct);
ENDtryResume
@@ -413,20 +431,26 @@ ENDbeginTransaction
BEGINdoAction
+ instanceData *pData = pWrkrData->pData;
CODESTARTdoAction
DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name);
+ pthread_mutex_lock(&mutDoAct);
iRet = addData(pData, ppString[0]);
-dbgprintf("omhdfs: done doAction\n");
+ DBGPRINTF("omhdfs: done doAction\n");
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
BEGINendTransaction
+ instanceData *pData = pWrkrData->pData;
CODESTARTendTransaction
dbgprintf("omhdfs: endTransaction\n");
+ pthread_mutex_lock(&mutDoAct);
if(pData->offsBuf != 0) {
DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n");
iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf);
}
+ pthread_mutex_unlock(&mutDoAct);
ENDendTransaction
@@ -526,6 +550,7 @@ BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_doHUP
ENDqueryEtryPt