diff options
author | Michael Biebl <biebl@debian.org> | 2014-04-03 03:08:50 +0200 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2014-04-03 03:08:50 +0200 |
commit | 9374a46543e9c43c009f80def8c3b2506b0b377e (patch) | |
tree | 8853fd40ee8d55ff24304ff8a4421640f3493c58 /plugins/omprog/omprog.c | |
parent | 209e193f14ec562df5aad945f04cd88b227cc602 (diff) | |
download | rsyslog-upstream/8.2.0.tar.gz |
Imported Upstream version 8.2.0upstream/8.2.0
Diffstat (limited to 'plugins/omprog/omprog.c')
-rw-r--r-- | plugins/omprog/omprog.c | 281 |
1 files changed, 215 insertions, 66 deletions
diff --git a/plugins/omprog/omprog.c b/plugins/omprog/omprog.c index cd07dcf..fb611f6 100644 --- a/plugins/omprog/omprog.c +++ b/plugins/omprog/omprog.c @@ -6,7 +6,7 @@ * * File begun on 2009-04-01 by RGerhards * - * Copyright 2009-2012 Adiscon GmbH. + * Copyright 2009-2014 Adiscon GmbH. * * This file is part of rsyslog. * @@ -34,7 +34,9 @@ #include <signal.h> #include <errno.h> #include <unistd.h> +#include <fcntl.h> #include <wait.h> +#include <pthread.h> #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" @@ -56,12 +58,21 @@ typedef struct _instanceData { uchar *szBinary; /* name of binary to call */ char **aParams; /* Optional Parameters for binary command */ uchar *tplName; /* assigned output template */ - pid_t pid; /* pid of currently running process */ - int fdPipe; /* file descriptor to write to */ - int bIsRunning; /* is binary currently running? 0-no, 1-yes */ int iParams; /* Holds the count of parameters if set*/ + int bForceSingleInst; /* only a single wrkr instance of program permitted? */ + uchar *outputFileName; /* name of file for std[out/err] or NULL if to discard */ + pthread_mutex_t mut; /* make sure only one instance is active */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + pid_t pid; /* pid of currently running process */ + int fdOutput; /* it's fd (-1 if closed) */ + int fdPipeOut; /* file descriptor to write to */ + int fdPipeIn; /* fd we receive messages from the program (if we want to) */ + int bIsRunning; /* is binary currently running? 0-no, 1-yes */ +} wrkrInstanceData_t; + typedef struct configSettings_s { uchar *szBinary; /* name of binary to call */ } configSettings_t; @@ -72,6 +83,8 @@ static configSettings_t cs; /* action (instance) parameters */ static struct cnfparamdescr actpdescr[] = { { "binary", eCmdHdlrString, CNFPARAM_REQUIRED }, + { "output", eCmdHdlrString, 0 }, + { "forcesingleinstance", eCmdHdlrBinary, 0 }, { "template", eCmdHdlrGetWord, 0 } }; static struct cnfparamblk actpblk = @@ -89,8 +102,17 @@ ENDinitConfVars BEGINcreateInstance CODESTARTcreateInstance + pthread_mutex_init(&pData->mut, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->fdPipeIn = -1; + pWrkrData->fdPipeOut = -1; + pWrkrData->fdOutput = -1; + pWrkrData->bIsRunning = 0; +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -102,8 +124,9 @@ ENDisCompatibleWithFeature BEGINfreeInstance int i; CODESTARTfreeInstance - if(pData->szBinary != NULL) - free(pData->szBinary); + pthread_mutex_destroy(&pData->mut); + free(pData->szBinary); + free(pData->outputFileName); if(pData->aParams != NULL) { for (i = 0; i < pData->iParams; i++) { free(pData->aParams[i]); @@ -112,6 +135,10 @@ CODESTARTfreeInstance } ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -123,27 +150,99 @@ CODESTARTtryResume ENDtryResume +/* As this is assume to be a debug function, we only make + * best effort to write the message but do *not* try very + * hard to handle errors. -- rgerhards, 2014-01-16 + */ +static void +writeProgramOutput(wrkrInstanceData_t *__restrict__ const pWrkrData, + const char *__restrict__ const buf, + const ssize_t lenBuf) +{ + char errStr[1024]; + ssize_t r; + +dbgprintf("omprog: writeProgramOutput, fd %d\n", pWrkrData->fdOutput); + if(pWrkrData->fdOutput == -1) { + pWrkrData->fdOutput = open((char*)pWrkrData->pData->outputFileName, + O_WRONLY | O_APPEND | O_CREAT, 0600); + if(pWrkrData->fdOutput == -1) { + DBGPRINTF("omprog: error opening output file %s: %s\n", + pWrkrData->pData->outputFileName, + rs_strerror_r(errno, errStr, sizeof(errStr))); + goto done; + } + } + + r = write(pWrkrData->fdOutput, buf, (size_t) lenBuf); + if(r != lenBuf) { + DBGPRINTF("omprog: problem writing output file %s: bytes " + "requested %lld, written %lld, msg: %s\n", + pWrkrData->pData->outputFileName, (long long) lenBuf, (long long) r, + rs_strerror_r(errno, errStr, sizeof(errStr))); + } +done: return; +} + + +/* check output of the executed program + * If configured to care about the output, we check if there is some and, + * if so, properly handle it. + */ +static void +checkProgramOutput(wrkrInstanceData_t *__restrict__ const pWrkrData) +{ + char buf[4096]; + ssize_t r; + +dbgprintf("omprog: checking prog output, fd %d\n", pWrkrData->fdPipeIn); + if(pWrkrData->fdPipeIn == -1) + goto done; + + do { +memset(buf, 0, sizeof(buf)); + r = read(pWrkrData->fdPipeIn, buf, sizeof(buf)); +dbgprintf("omprog: read state %lld, data '%s'\n", (long long) r, buf); + if(r > 0) + writeProgramOutput(pWrkrData, buf, r); + } while(r > 0); + +done: return; +} + + + /* execute the child process (must be called in child context * after fork). */ - -static void execBinary(instanceData *pData, int fdStdin) +static void +execBinary(wrkrInstanceData_t *pWrkrData, int fdStdin, int fdStdOutErr) { int i, iRet; struct sigaction sigAct; sigset_t set; + char errStr[1024]; char *newenviron[] = { NULL }; - assert(pData != NULL); - fclose(stdin); if(dup(fdStdin) == -1) { - DBGPRINTF("omprog: dup() failed\n"); + DBGPRINTF("omprog: dup() stdin failed\n"); /* do some more error handling here? Maybe if the module * gets some more widespread use... */ } - /*fclose(stdout);*/ + if(pWrkrData->pData->outputFileName == NULL) { + close(fdStdOutErr); + } else { + close(1); + if(dup(fdStdOutErr) == -1) { + DBGPRINTF("omprog: dup() stdout failed\n"); + } + close(2); + if(dup(fdStdOutErr) == -1) { + DBGPRINTF("omprog: dup() stderr failed\n"); + } + } /* we close all file handles as we fork soon * Is there a better way to do this? - mail me! rgerhards@adiscon.com @@ -159,15 +258,26 @@ static void execBinary(instanceData *pData, int fdStdin) sigAct.sa_handler = SIG_DFL; for(i = 1 ; i < NSIG ; ++i) sigaction(i, &sigAct, NULL); + /* we need to block SIGINT, otherwise our program is cancelled when we are + * stopped in debug mode. + */ + sigAct.sa_handler = SIG_IGN; + sigaction(SIGINT, &sigAct, NULL); sigemptyset(&set); sigprocmask(SIG_SETMASK, &set, NULL); alarm(0); /* finally exec child */ - iRet = execve((char*)pData->szBinary, pData->aParams, newenviron); - if (iRet == -1) { - dbgprintf("omprog: failed to execute binary '%s' with return code: %d\n", pData->szBinary, errno); + iRet = execve((char*)pWrkrData->pData->szBinary, pWrkrData->pData->aParams, newenviron); + if(iRet == -1) { + /* Note: this will go to stdout of the **child**, so rsyslog will never + * see it except when stdout is captured. If we use the plugin interface, + * we can use this to convey a proper status back! + */ + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("omprog: failed to execute binary '%s': %s\n", + pWrkrData->pData->szBinary, errStr); } /* we should never reach this point, but if we do, we terminate */ @@ -179,19 +289,23 @@ static void execBinary(instanceData *pData, int fdStdin) * rgerhards, 2009-04-01 */ static rsRetVal -openPipe(instanceData *pData) +openPipe(wrkrInstanceData_t *pWrkrData) { - int pipefd[2]; + int pipestdin[2]; + int pipestdout[2]; pid_t cpid; + int flags; DEFiRet; - assert(pData != NULL); - - if(pipe(pipefd) == -1) { + if(pipe(pipestdin) == -1) { + ABORT_FINALIZE(RS_RET_ERR_CREAT_PIPE); + } + if(pipe(pipestdout) == -1) { ABORT_FINALIZE(RS_RET_ERR_CREAT_PIPE); } - DBGPRINTF("omprog: executing program '%s' with '%d' parameters\n", pData->szBinary, pData->iParams); + DBGPRINTF("omprog: executing program '%s' with '%d' parameters\n", + pWrkrData->pData->szBinary, pWrkrData->pData->iParams); /* NO OUTPUT AFTER FORK! */ @@ -199,21 +313,31 @@ openPipe(instanceData *pData) if(cpid == -1) { ABORT_FINALIZE(RS_RET_ERR_FORK); } + pWrkrData->pid = cpid; if(cpid == 0) { - /* we are now the child, just set the right selectors and - * exec the binary. If that fails, there is not much we can do. - */ - close(pipefd[1]); - execBinary(pData, pipefd[0]); + /* we are now the child, just exec the binary. */ + close(pipestdin[1]); /* close those pipe "ports" that */ + close(pipestdout[0]); /* we don't need */ + execBinary(pWrkrData, pipestdin[0], pipestdout[1]); /*NO CODE HERE - WILL NEVER BE REACHED!*/ } DBGPRINTF("omprog: child has pid %d\n", (int) cpid); - pData->fdPipe = pipefd[1]; - pData->pid = cpid; - close(pipefd[0]); - pData->bIsRunning = 1; + if(pWrkrData->pData->outputFileName != NULL) { + pWrkrData->fdPipeIn = dup(pipestdout[0]); + /* we need to set our fd to be non-blocking! */ + flags = fcntl(pWrkrData->fdPipeIn, F_GETFL); + flags |= O_NONBLOCK; + fcntl(pWrkrData->fdPipeIn, F_SETFL, flags); + } else { + pWrkrData->fdPipeIn = -1; + } + close(pipestdin[0]); + close(pipestdout[1]); + pWrkrData->pid = cpid; + pWrkrData->fdPipeOut = pipestdin[1]; + pWrkrData->bIsRunning = 1; finalize_it: RETiRet; } @@ -222,34 +346,48 @@ finalize_it: /* clean up after a terminated child */ static inline rsRetVal -cleanup(instanceData *pData) +cleanup(wrkrInstanceData_t *pWrkrData) { int status; int ret; char errStr[1024]; DEFiRet; - assert(pData != NULL); - assert(pData->bIsRunning == 1); - ret = waitpid(pData->pid, &status, 0); - if(ret != pData->pid) { + assert(pWrkrData->bIsRunning == 1); + ret = waitpid(pWrkrData->pid, &status, 0); + if(ret != pWrkrData->pid) { /* if waitpid() fails, we can not do much - try to ignore it... */ DBGPRINTF("omprog: waitpid() returned state %d[%s], future malfunction may happen\n", ret, rs_strerror_r(errno, errStr, sizeof(errStr))); } else { /* check if we should print out some diagnostic information */ DBGPRINTF("omprog: waitpid status return for program '%s': %2.2x\n", - pData->szBinary, status); + pWrkrData->pData->szBinary, status); if(WIFEXITED(status)) { errmsg.LogError(0, NO_ERRCODE, "program '%s' exited normally, state %d", - pData->szBinary, WEXITSTATUS(status)); + pWrkrData->pData->szBinary, WEXITSTATUS(status)); } else if(WIFSIGNALED(status)) { errmsg.LogError(0, NO_ERRCODE, "program '%s' terminated by signal %d.", - pData->szBinary, WTERMSIG(status)); + pWrkrData->pData->szBinary, WTERMSIG(status)); } } - pData->bIsRunning = 0; + checkProgramOutput(pWrkrData); /* try to catch any late messages */ + + if(pWrkrData->fdOutput != -1) { + close(pWrkrData->fdOutput); + pWrkrData->fdOutput = -1; + } + if(pWrkrData->fdPipeIn != -1) { + close(pWrkrData->fdPipeIn); + pWrkrData->fdPipeIn = -1; + } + if(pWrkrData->fdPipeOut != -1) { + close(pWrkrData->fdPipeOut); + pWrkrData->fdPipeOut = -1; + } + pWrkrData->bIsRunning = 0; + pWrkrData->bIsRunning = 0; RETiRet; } @@ -257,24 +395,22 @@ cleanup(instanceData *pData) /* try to restart the binary when it has stopped. */ static inline rsRetVal -tryRestart(instanceData *pData) +tryRestart(wrkrInstanceData_t *pWrkrData) { DEFiRet; - assert(pData != NULL); - assert(pData->bIsRunning == 0); + assert(pWrkrData->bIsRunning == 0); - iRet = openPipe(pData); + iRet = openPipe(pWrkrData); RETiRet; } - /* write to pipe * note that we do not try to run block-free. If the users fears something * may block (and this not be acceptable), the action should be run on its * own action queue. */ static rsRetVal -writePipe(instanceData *pData, uchar *szMsg) +writePipe(wrkrInstanceData_t *pWrkrData, uchar *szMsg) { int lenWritten; int lenWrite; @@ -282,33 +418,33 @@ writePipe(instanceData *pData, uchar *szMsg) char errStr[1024]; DEFiRet; - assert(pData != NULL); - lenWrite = strlen((char*)szMsg); writeOffset = 0; - do - { - lenWritten = write(pData->fdPipe, ((char*)szMsg)+writeOffset, lenWrite); + do { + checkProgramOutput(pWrkrData); +dbgprintf("omprog: writing to prog (fd %d): %s\n", pWrkrData->fdPipeOut, szMsg); + lenWritten = write(pWrkrData->fdPipeOut, ((char*)szMsg)+writeOffset, lenWrite); if(lenWritten == -1) { switch(errno) { - case EPIPE: - DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n", - pData->szBinary); - CHKiRet(cleanup(pData)); - CHKiRet(tryRestart(pData)); - break; - default: - DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno, - rs_strerror_r(errno, errStr, sizeof(errStr))); - ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE); - break; + case EPIPE: + DBGPRINTF("omprog: program '%s' terminated, trying to restart\n", + pWrkrData->pData->szBinary); + CHKiRet(cleanup(pWrkrData)); + CHKiRet(tryRestart(pWrkrData)); + break; + default: + DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno, + rs_strerror_r(errno, errStr, sizeof(errStr))); + ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE); + break; } } else { writeOffset += lenWritten; } } while(lenWritten != lenWrite); + checkProgramOutput(pWrkrData); finalize_it: RETiRet; @@ -316,15 +452,22 @@ finalize_it: BEGINdoAction + instanceData *pData; CODESTARTdoAction - if(pData->bIsRunning == 0) { - openPipe(pData); +dbgprintf("DDDD:omprog processing message\n"); + pData = pWrkrData->pData; + if(pData->bForceSingleInst) + pthread_mutex_lock(&pData->mut); + if(pWrkrData->bIsRunning == 0) { + openPipe(pWrkrData); } - iRet = writePipe(pData, ppString[0]); + iRet = writePipe(pWrkrData, ppString[0]); if(iRet != RS_RET_OK) iRet = RS_RET_SUSPENDED; + if(pData->bForceSingleInst) + pthread_mutex_unlock(&pData->mut); ENDdoAction @@ -333,9 +476,9 @@ setInstParamDefaults(instanceData *pData) { pData->szBinary = NULL; pData->aParams = NULL; + pData->outputFileName = NULL; pData->iParams = 0; - pData->fdPipe = -1; - pData->bIsRunning = 0; + pData->bForceSingleInst = 0; } BEGINnewActInst @@ -437,6 +580,10 @@ CODESTARTnewActInst pData->aParams[iPrm] = NULL; } + } else if(!strcmp(actpblk.descr[i].name, "output")) { + pData->outputFileName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "forcesingleinstance")) { + pData->bForceSingleInst = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { @@ -447,6 +594,7 @@ CODESTARTnewActInst CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? "RSYSLOG_FileFormat" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); + DBGPRINTF("omprog: bForceSingleInst %d\n", pData->bForceSingleInst); CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst @@ -496,6 +644,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt |