summaryrefslogtreecommitdiff
path: root/plugins/omprog/omprog.c
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2014-04-03 03:08:50 +0200
committerMichael Biebl <biebl@debian.org>2014-04-03 03:08:50 +0200
commit9374a46543e9c43c009f80def8c3b2506b0b377e (patch)
tree8853fd40ee8d55ff24304ff8a4421640f3493c58 /plugins/omprog/omprog.c
parent209e193f14ec562df5aad945f04cd88b227cc602 (diff)
downloadrsyslog-upstream/8.2.0.tar.gz
Imported Upstream version 8.2.0upstream/8.2.0
Diffstat (limited to 'plugins/omprog/omprog.c')
-rw-r--r--plugins/omprog/omprog.c281
1 files changed, 215 insertions, 66 deletions
diff --git a/plugins/omprog/omprog.c b/plugins/omprog/omprog.c
index cd07dcf..fb611f6 100644
--- a/plugins/omprog/omprog.c
+++ b/plugins/omprog/omprog.c
@@ -6,7 +6,7 @@
*
* File begun on 2009-04-01 by RGerhards
*
- * Copyright 2009-2012 Adiscon GmbH.
+ * Copyright 2009-2014 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -34,7 +34,9 @@
#include <signal.h>
#include <errno.h>
#include <unistd.h>
+#include <fcntl.h>
#include <wait.h>
+#include <pthread.h>
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
@@ -56,12 +58,21 @@ typedef struct _instanceData {
uchar *szBinary; /* name of binary to call */
char **aParams; /* Optional Parameters for binary command */
uchar *tplName; /* assigned output template */
- pid_t pid; /* pid of currently running process */
- int fdPipe; /* file descriptor to write to */
- int bIsRunning; /* is binary currently running? 0-no, 1-yes */
int iParams; /* Holds the count of parameters if set*/
+ int bForceSingleInst; /* only a single wrkr instance of program permitted? */
+ uchar *outputFileName; /* name of file for std[out/err] or NULL if to discard */
+ pthread_mutex_t mut; /* make sure only one instance is active */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ pid_t pid; /* pid of currently running process */
+ int fdOutput; /* it's fd (-1 if closed) */
+ int fdPipeOut; /* file descriptor to write to */
+ int fdPipeIn; /* fd we receive messages from the program (if we want to) */
+ int bIsRunning; /* is binary currently running? 0-no, 1-yes */
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
uchar *szBinary; /* name of binary to call */
} configSettings_t;
@@ -72,6 +83,8 @@ static configSettings_t cs;
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "binary", eCmdHdlrString, CNFPARAM_REQUIRED },
+ { "output", eCmdHdlrString, 0 },
+ { "forcesingleinstance", eCmdHdlrBinary, 0 },
{ "template", eCmdHdlrGetWord, 0 }
};
static struct cnfparamblk actpblk =
@@ -89,8 +102,17 @@ ENDinitConfVars
BEGINcreateInstance
CODESTARTcreateInstance
+ pthread_mutex_init(&pData->mut, NULL);
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->fdPipeIn = -1;
+ pWrkrData->fdPipeOut = -1;
+ pWrkrData->fdOutput = -1;
+ pWrkrData->bIsRunning = 0;
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -102,8 +124,9 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
int i;
CODESTARTfreeInstance
- if(pData->szBinary != NULL)
- free(pData->szBinary);
+ pthread_mutex_destroy(&pData->mut);
+ free(pData->szBinary);
+ free(pData->outputFileName);
if(pData->aParams != NULL) {
for (i = 0; i < pData->iParams; i++) {
free(pData->aParams[i]);
@@ -112,6 +135,10 @@ CODESTARTfreeInstance
}
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -123,27 +150,99 @@ CODESTARTtryResume
ENDtryResume
+/* As this is assume to be a debug function, we only make
+ * best effort to write the message but do *not* try very
+ * hard to handle errors. -- rgerhards, 2014-01-16
+ */
+static void
+writeProgramOutput(wrkrInstanceData_t *__restrict__ const pWrkrData,
+ const char *__restrict__ const buf,
+ const ssize_t lenBuf)
+{
+ char errStr[1024];
+ ssize_t r;
+
+dbgprintf("omprog: writeProgramOutput, fd %d\n", pWrkrData->fdOutput);
+ if(pWrkrData->fdOutput == -1) {
+ pWrkrData->fdOutput = open((char*)pWrkrData->pData->outputFileName,
+ O_WRONLY | O_APPEND | O_CREAT, 0600);
+ if(pWrkrData->fdOutput == -1) {
+ DBGPRINTF("omprog: error opening output file %s: %s\n",
+ pWrkrData->pData->outputFileName,
+ rs_strerror_r(errno, errStr, sizeof(errStr)));
+ goto done;
+ }
+ }
+
+ r = write(pWrkrData->fdOutput, buf, (size_t) lenBuf);
+ if(r != lenBuf) {
+ DBGPRINTF("omprog: problem writing output file %s: bytes "
+ "requested %lld, written %lld, msg: %s\n",
+ pWrkrData->pData->outputFileName, (long long) lenBuf, (long long) r,
+ rs_strerror_r(errno, errStr, sizeof(errStr)));
+ }
+done: return;
+}
+
+
+/* check output of the executed program
+ * If configured to care about the output, we check if there is some and,
+ * if so, properly handle it.
+ */
+static void
+checkProgramOutput(wrkrInstanceData_t *__restrict__ const pWrkrData)
+{
+ char buf[4096];
+ ssize_t r;
+
+dbgprintf("omprog: checking prog output, fd %d\n", pWrkrData->fdPipeIn);
+ if(pWrkrData->fdPipeIn == -1)
+ goto done;
+
+ do {
+memset(buf, 0, sizeof(buf));
+ r = read(pWrkrData->fdPipeIn, buf, sizeof(buf));
+dbgprintf("omprog: read state %lld, data '%s'\n", (long long) r, buf);
+ if(r > 0)
+ writeProgramOutput(pWrkrData, buf, r);
+ } while(r > 0);
+
+done: return;
+}
+
+
+
/* execute the child process (must be called in child context
* after fork).
*/
-
-static void execBinary(instanceData *pData, int fdStdin)
+static void
+execBinary(wrkrInstanceData_t *pWrkrData, int fdStdin, int fdStdOutErr)
{
int i, iRet;
struct sigaction sigAct;
sigset_t set;
+ char errStr[1024];
char *newenviron[] = { NULL };
- assert(pData != NULL);
-
fclose(stdin);
if(dup(fdStdin) == -1) {
- DBGPRINTF("omprog: dup() failed\n");
+ DBGPRINTF("omprog: dup() stdin failed\n");
/* do some more error handling here? Maybe if the module
* gets some more widespread use...
*/
}
- /*fclose(stdout);*/
+ if(pWrkrData->pData->outputFileName == NULL) {
+ close(fdStdOutErr);
+ } else {
+ close(1);
+ if(dup(fdStdOutErr) == -1) {
+ DBGPRINTF("omprog: dup() stdout failed\n");
+ }
+ close(2);
+ if(dup(fdStdOutErr) == -1) {
+ DBGPRINTF("omprog: dup() stderr failed\n");
+ }
+ }
/* we close all file handles as we fork soon
* Is there a better way to do this? - mail me! rgerhards@adiscon.com
@@ -159,15 +258,26 @@ static void execBinary(instanceData *pData, int fdStdin)
sigAct.sa_handler = SIG_DFL;
for(i = 1 ; i < NSIG ; ++i)
sigaction(i, &sigAct, NULL);
+ /* we need to block SIGINT, otherwise our program is cancelled when we are
+ * stopped in debug mode.
+ */
+ sigAct.sa_handler = SIG_IGN;
+ sigaction(SIGINT, &sigAct, NULL);
sigemptyset(&set);
sigprocmask(SIG_SETMASK, &set, NULL);
alarm(0);
/* finally exec child */
- iRet = execve((char*)pData->szBinary, pData->aParams, newenviron);
- if (iRet == -1) {
- dbgprintf("omprog: failed to execute binary '%s' with return code: %d\n", pData->szBinary, errno);
+ iRet = execve((char*)pWrkrData->pData->szBinary, pWrkrData->pData->aParams, newenviron);
+ if(iRet == -1) {
+ /* Note: this will go to stdout of the **child**, so rsyslog will never
+ * see it except when stdout is captured. If we use the plugin interface,
+ * we can use this to convey a proper status back!
+ */
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ DBGPRINTF("omprog: failed to execute binary '%s': %s\n",
+ pWrkrData->pData->szBinary, errStr);
}
/* we should never reach this point, but if we do, we terminate */
@@ -179,19 +289,23 @@ static void execBinary(instanceData *pData, int fdStdin)
* rgerhards, 2009-04-01
*/
static rsRetVal
-openPipe(instanceData *pData)
+openPipe(wrkrInstanceData_t *pWrkrData)
{
- int pipefd[2];
+ int pipestdin[2];
+ int pipestdout[2];
pid_t cpid;
+ int flags;
DEFiRet;
- assert(pData != NULL);
-
- if(pipe(pipefd) == -1) {
+ if(pipe(pipestdin) == -1) {
+ ABORT_FINALIZE(RS_RET_ERR_CREAT_PIPE);
+ }
+ if(pipe(pipestdout) == -1) {
ABORT_FINALIZE(RS_RET_ERR_CREAT_PIPE);
}
- DBGPRINTF("omprog: executing program '%s' with '%d' parameters\n", pData->szBinary, pData->iParams);
+ DBGPRINTF("omprog: executing program '%s' with '%d' parameters\n",
+ pWrkrData->pData->szBinary, pWrkrData->pData->iParams);
/* NO OUTPUT AFTER FORK! */
@@ -199,21 +313,31 @@ openPipe(instanceData *pData)
if(cpid == -1) {
ABORT_FINALIZE(RS_RET_ERR_FORK);
}
+ pWrkrData->pid = cpid;
if(cpid == 0) {
- /* we are now the child, just set the right selectors and
- * exec the binary. If that fails, there is not much we can do.
- */
- close(pipefd[1]);
- execBinary(pData, pipefd[0]);
+ /* we are now the child, just exec the binary. */
+ close(pipestdin[1]); /* close those pipe "ports" that */
+ close(pipestdout[0]); /* we don't need */
+ execBinary(pWrkrData, pipestdin[0], pipestdout[1]);
/*NO CODE HERE - WILL NEVER BE REACHED!*/
}
DBGPRINTF("omprog: child has pid %d\n", (int) cpid);
- pData->fdPipe = pipefd[1];
- pData->pid = cpid;
- close(pipefd[0]);
- pData->bIsRunning = 1;
+ if(pWrkrData->pData->outputFileName != NULL) {
+ pWrkrData->fdPipeIn = dup(pipestdout[0]);
+ /* we need to set our fd to be non-blocking! */
+ flags = fcntl(pWrkrData->fdPipeIn, F_GETFL);
+ flags |= O_NONBLOCK;
+ fcntl(pWrkrData->fdPipeIn, F_SETFL, flags);
+ } else {
+ pWrkrData->fdPipeIn = -1;
+ }
+ close(pipestdin[0]);
+ close(pipestdout[1]);
+ pWrkrData->pid = cpid;
+ pWrkrData->fdPipeOut = pipestdin[1];
+ pWrkrData->bIsRunning = 1;
finalize_it:
RETiRet;
}
@@ -222,34 +346,48 @@ finalize_it:
/* clean up after a terminated child
*/
static inline rsRetVal
-cleanup(instanceData *pData)
+cleanup(wrkrInstanceData_t *pWrkrData)
{
int status;
int ret;
char errStr[1024];
DEFiRet;
- assert(pData != NULL);
- assert(pData->bIsRunning == 1);
- ret = waitpid(pData->pid, &status, 0);
- if(ret != pData->pid) {
+ assert(pWrkrData->bIsRunning == 1);
+ ret = waitpid(pWrkrData->pid, &status, 0);
+ if(ret != pWrkrData->pid) {
/* if waitpid() fails, we can not do much - try to ignore it... */
DBGPRINTF("omprog: waitpid() returned state %d[%s], future malfunction may happen\n", ret,
rs_strerror_r(errno, errStr, sizeof(errStr)));
} else {
/* check if we should print out some diagnostic information */
DBGPRINTF("omprog: waitpid status return for program '%s': %2.2x\n",
- pData->szBinary, status);
+ pWrkrData->pData->szBinary, status);
if(WIFEXITED(status)) {
errmsg.LogError(0, NO_ERRCODE, "program '%s' exited normally, state %d",
- pData->szBinary, WEXITSTATUS(status));
+ pWrkrData->pData->szBinary, WEXITSTATUS(status));
} else if(WIFSIGNALED(status)) {
errmsg.LogError(0, NO_ERRCODE, "program '%s' terminated by signal %d.",
- pData->szBinary, WTERMSIG(status));
+ pWrkrData->pData->szBinary, WTERMSIG(status));
}
}
- pData->bIsRunning = 0;
+ checkProgramOutput(pWrkrData); /* try to catch any late messages */
+
+ if(pWrkrData->fdOutput != -1) {
+ close(pWrkrData->fdOutput);
+ pWrkrData->fdOutput = -1;
+ }
+ if(pWrkrData->fdPipeIn != -1) {
+ close(pWrkrData->fdPipeIn);
+ pWrkrData->fdPipeIn = -1;
+ }
+ if(pWrkrData->fdPipeOut != -1) {
+ close(pWrkrData->fdPipeOut);
+ pWrkrData->fdPipeOut = -1;
+ }
+ pWrkrData->bIsRunning = 0;
+ pWrkrData->bIsRunning = 0;
RETiRet;
}
@@ -257,24 +395,22 @@ cleanup(instanceData *pData)
/* try to restart the binary when it has stopped.
*/
static inline rsRetVal
-tryRestart(instanceData *pData)
+tryRestart(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- assert(pData != NULL);
- assert(pData->bIsRunning == 0);
+ assert(pWrkrData->bIsRunning == 0);
- iRet = openPipe(pData);
+ iRet = openPipe(pWrkrData);
RETiRet;
}
-
/* write to pipe
* note that we do not try to run block-free. If the users fears something
* may block (and this not be acceptable), the action should be run on its
* own action queue.
*/
static rsRetVal
-writePipe(instanceData *pData, uchar *szMsg)
+writePipe(wrkrInstanceData_t *pWrkrData, uchar *szMsg)
{
int lenWritten;
int lenWrite;
@@ -282,33 +418,33 @@ writePipe(instanceData *pData, uchar *szMsg)
char errStr[1024];
DEFiRet;
- assert(pData != NULL);
-
lenWrite = strlen((char*)szMsg);
writeOffset = 0;
- do
- {
- lenWritten = write(pData->fdPipe, ((char*)szMsg)+writeOffset, lenWrite);
+ do {
+ checkProgramOutput(pWrkrData);
+dbgprintf("omprog: writing to prog (fd %d): %s\n", pWrkrData->fdPipeOut, szMsg);
+ lenWritten = write(pWrkrData->fdPipeOut, ((char*)szMsg)+writeOffset, lenWrite);
if(lenWritten == -1) {
switch(errno) {
- case EPIPE:
- DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n",
- pData->szBinary);
- CHKiRet(cleanup(pData));
- CHKiRet(tryRestart(pData));
- break;
- default:
- DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno,
- rs_strerror_r(errno, errStr, sizeof(errStr)));
- ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE);
- break;
+ case EPIPE:
+ DBGPRINTF("omprog: program '%s' terminated, trying to restart\n",
+ pWrkrData->pData->szBinary);
+ CHKiRet(cleanup(pWrkrData));
+ CHKiRet(tryRestart(pWrkrData));
+ break;
+ default:
+ DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno,
+ rs_strerror_r(errno, errStr, sizeof(errStr)));
+ ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE);
+ break;
}
} else {
writeOffset += lenWritten;
}
} while(lenWritten != lenWrite);
+ checkProgramOutput(pWrkrData);
finalize_it:
RETiRet;
@@ -316,15 +452,22 @@ finalize_it:
BEGINdoAction
+ instanceData *pData;
CODESTARTdoAction
- if(pData->bIsRunning == 0) {
- openPipe(pData);
+dbgprintf("DDDD:omprog processing message\n");
+ pData = pWrkrData->pData;
+ if(pData->bForceSingleInst)
+ pthread_mutex_lock(&pData->mut);
+ if(pWrkrData->bIsRunning == 0) {
+ openPipe(pWrkrData);
}
- iRet = writePipe(pData, ppString[0]);
+ iRet = writePipe(pWrkrData, ppString[0]);
if(iRet != RS_RET_OK)
iRet = RS_RET_SUSPENDED;
+ if(pData->bForceSingleInst)
+ pthread_mutex_unlock(&pData->mut);
ENDdoAction
@@ -333,9 +476,9 @@ setInstParamDefaults(instanceData *pData)
{
pData->szBinary = NULL;
pData->aParams = NULL;
+ pData->outputFileName = NULL;
pData->iParams = 0;
- pData->fdPipe = -1;
- pData->bIsRunning = 0;
+ pData->bForceSingleInst = 0;
}
BEGINnewActInst
@@ -437,6 +580,10 @@ CODESTARTnewActInst
pData->aParams[iPrm] = NULL;
}
+ } else if(!strcmp(actpblk.descr[i].name, "output")) {
+ pData->outputFileName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "forcesingleinstance")) {
+ pData->bForceSingleInst = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
@@ -447,6 +594,7 @@ CODESTARTnewActInst
CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
"RSYSLOG_FileFormat" : (char*)pData->tplName),
OMSR_NO_RQD_TPL_OPTS));
+ DBGPRINTF("omprog: bForceSingleInst %d\n", pData->bForceSingleInst);
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
@@ -496,6 +644,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt