diff options
Diffstat (limited to 'runtime/stream.c')
-rw-r--r-- | runtime/stream.c | 955 |
1 files changed, 814 insertions, 141 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index 1cff2da..36f4400 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -1,4 +1,3 @@ -//TODO: O_TRUC mode! /* The serial stream class. * * A serial stream provides serial data access. In theory, serial streams @@ -7,8 +6,17 @@ * "driver"). * * File begun on 2008-01-09 by RGerhards + * Large modifications in 2009-06 to support using it with omfile, including zip writer. + * Note that this file obtains the zlib wrapper object is needed, but it never frees it + * again. While this sounds like a leak (and one may argue it actually is), there is no + * harm associated with that. The reason is that strm is a core object, so it is terminated + * only when rsyslogd exists. As we could only release on termination (or else bear more + * overhead for keeping track of how many users we have), not releasing zlibw is OK, because + * it will be released when rsyslogd terminates. We may want to revisit this decision if + * it turns out to be problematic. Then, we need to quasi-refcount the number of accesses + * to the object. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -39,24 +47,198 @@ #include <unistd.h> #include <sys/stat.h> /* required for HP UX */ #include <errno.h> +#include <pthread.h> #include "rsyslog.h" #include "stringbuf.h" #include "srUtils.h" #include "obj.h" #include "stream.h" +#include "unicode-helper.h" +#include "module-template.h" +#if HAVE_SYS_PRCTL_H +# include <sys/prctl.h> +#endif + +#define inline /* static data */ DEFobjStaticHelpers +DEFobjCurrIf(zlibw) + +/* forward definitions */ +static rsRetVal strmFlush(strm_t *pThis); +static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmCloseFile(strm_t *pThis); +static void *asyncWriterThread(void *pPtr); +static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); + /* methods */ -/* first, we define type-specific handlers. The provide a generic functionality, +/* Try to resolve a size limit situation. This is used to support custom-file size handlers + * for omfile. It first runs the command, and then checks if we are still above the size + * treshold. Note that this works only with single file names, NOT with circular names. + * Note that pszCurrFName can NOT be taken from pThis, because the stream is closed when + * we are called (and that destroys pszCurrFName, as there is NO CURRENT file name!). So + * we need to receive the name as a parameter. + * initially wirtten 2005-06-21, moved to this class & updates 2009-06-01, both rgerhards + */ +static rsRetVal +resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName) +{ + uchar *pParams; + uchar *pCmd; + uchar *p; + off_t actualFileSize; + rsRetVal localRet; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + assert(pszCurrFName != NULL); + + if(pThis->pszSizeLimitCmd == NULL) { + ABORT_FINALIZE(RS_RET_NON_SIZELIMITCMD); /* nothing we can do in this case... */ + } + + /* we first check if we have command line parameters. We assume this, + * when we have a space in the program name. If we find it, everything after + * the space is treated as a single argument. + */ + CHKmalloc(pCmd = ustrdup(pThis->pszSizeLimitCmd)); + + for(p = pCmd ; *p && *p != ' ' ; ++p) { + /* JUST SKIP */ + } + + if(*p == ' ') { + *p = '\0'; /* pretend string-end */ + pParams = p+1; + } else + pParams = NULL; + + /* the execProg() below is probably not great, but at least is is + * fairly secure now. Once we change the way file size limits are + * handled, we should also revisit how this command is run (and + * with which parameters). rgerhards, 2007-07-20 + */ + execProg(pCmd, 1, pParams); + + free(pCmd); + + localRet = getFileSize(pszCurrFName, &actualFileSize); + + if(localRet == RS_RET_OK && actualFileSize >= pThis->iSizeLimit) { + ABORT_FINALIZE(RS_RET_SIZELIMITCMD_DIDNT_RESOLVE); /* OK, it didn't work out... */ + } else if(localRet != RS_RET_FILE_NOT_FOUND) { + /* file not found is OK, the command may have moved away the file */ + ABORT_FINALIZE(localRet); + } + +finalize_it: + if(iRet != RS_RET_OK) { + if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) { + DBGPRINTF("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); + } else { + DBGPRINTF("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + } + pThis->bDisabled = 1; + } + + RETiRet; +} + + +/* Check if the file has grown beyond the configured omfile iSizeLimit + * and, if so, initiate processing. + */ +static rsRetVal +doSizeLimitProcessing(strm_t *pThis) +{ + uchar *pszCurrFName = NULL; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + ASSERT(pThis->iSizeLimit != 0); + ASSERT(pThis->fd != -1); + + if(pThis->iCurrOffs >= pThis->iSizeLimit) { + /* strmClosefile() destroys the current file name, so we + * need to preserve it. + */ + CHKmalloc(pszCurrFName = ustrdup(pThis->pszCurrFName)); + CHKiRet(strmCloseFile(pThis)); + CHKiRet(resolveFileSizeLimit(pThis, pszCurrFName)); + } + +finalize_it: + free(pszCurrFName); + RETiRet; +} + + +/* now, we define type-specific handlers. The provide a generic functionality, * but for this specific type of strm. The mapping to these handlers happens during * strm construction. Later on, handlers are called by pointers present in the * strm instance object. */ +/* do the physical open() call on a file. + */ +static rsRetVal +doPhysOpen(strm_t *pThis) +{ + int iFlags = 0; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + /* compute which flags we need to provide to open */ + switch(pThis->tOperationsMode) { + case STREAMMODE_READ: + iFlags = O_CLOEXEC | O_NOCTTY | O_RDONLY; + break; + case STREAMMODE_WRITE: /* legacy mode used inside queue engine */ + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT; + break; + case STREAMMODE_WRITE_TRUNC: + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_TRUNC; + break; + case STREAMMODE_WRITE_APPEND: + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_APPEND; + break; + default:assert(0); + break; + } + if(pThis->sType == STREAMTYPE_NAMED_PIPE) { + DBGPRINTF("Note: stream '%s' is a named pipe, open with O_NONBLOCK\n", pThis->pszCurrFName); + iFlags |= O_NONBLOCK; + } + + pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); + DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName, pThis->fd, pThis->tOpenMode); + if(pThis->fd == -1) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + dbgoprint((obj_t*) pThis, "open error %d, file '%s': %s\n", errno, pThis->pszCurrFName, errStr); + if(err == ENOENT) + ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); + else + ABORT_FINALIZE(RS_RET_IO_ERROR); + } else { + if(!ustrcmp(pThis->pszCurrFName, UCHAR_CONSTANT(_PATH_CONSOLE)) || isatty(pThis->fd)) { + DBGPRINTF("file %d is a tty-type file\n", pThis->fd); + pThis->bIsTTY = 1; + } else { + pThis->bIsTTY = 0; + } + } + +finalize_it: + RETiRet; +} + + /* open a strm file * It is OK to call this function when the stream is already open. In that * case, it returns immediately with RS_RET_OK @@ -64,10 +246,8 @@ DEFobjStaticHelpers static rsRetVal strmOpenFile(strm_t *pThis) { DEFiRet; - int iFlags; ASSERT(pThis != NULL); - ASSERT(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE); if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); @@ -88,34 +268,41 @@ static rsRetVal strmOpenFile(strm_t *pThis) } } - /* compute which flags we need to provide to open */ - if(pThis->tOperationsMode == STREAMMODE_READ) - iFlags = O_RDONLY; - else - iFlags = O_WRONLY | O_CREAT; - - iFlags |= pThis->iAddtlOpenFlags; - - pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); - if(pThis->fd == -1) { - int ierrnoSave = errno; - dbgoprint((obj_t*) pThis, "open error %d, file '%s'\n", errno, pThis->pszCurrFName); - if(ierrnoSave == ENOENT) - ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); - else - ABORT_FINALIZE(RS_RET_IO_ERROR); - } + CHKiRet(doPhysOpen(pThis)); pThis->iCurrOffs = 0; + if(pThis->tOperationsMode == STREAMMODE_WRITE_APPEND) { + /* we need to obtain the current offset */ + off_t offset; + CHKiRet(getFileSize(pThis->pszCurrFName, &offset)); + pThis->iCurrOffs = offset; + } - dbgoprint((obj_t*) pThis, "opened file '%s' for %s (0x%x) as %d\n", pThis->pszCurrFName, - (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", iFlags, pThis->fd); + dbgoprint((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName, + (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", pThis->fd); finalize_it: RETiRet; } +/* wait for the output writer thread to be done. This must be called before actions + * that require data to be persisted. May be called in non-async mode and is a null + * operation than. Must be called with the mutex locked. + */ +static inline void +strmWaitAsyncWriterDone(strm_t *pThis) +{ + BEGINfunc + if(pThis->bAsyncWrite) { + /* awake writer thread and make it write out everything */ + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + } + ENDfunc +} + + /* close a strm file * Note that the bDeleteOnClose flag is honored. If it is set, the file will be * deleted after close. This is in support for the qRead thread. @@ -128,14 +315,33 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - if(pThis->tOperationsMode == STREAMMODE_WRITE) - strmFlush(pThis); + if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { + pThis->bInClose = 1; + if(pThis->bAsyncWrite) { + strmFlush(pThis); + } else { + strmWaitAsyncWriterDone(pThis); + } + pThis->bInClose = 0; + } - close(pThis->fd); // TODO: error check + close(pThis->fd); pThis->fd = -1; + if(pThis->fdDir != -1) { + /* close associated directory handle, if it is open */ + close(pThis->fdDir); + pThis->fdDir = -1; + } + if(pThis->bDeleteOnClose) { - unlink((char*) pThis->pszCurrFName); // TODO: check returncode + if(unlink((char*) pThis->pszCurrFName) == -1) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("error %d unlinking '%s' - ignored: %s\n", + errno, pThis->pszCurrFName, errStr); + } } pThis->iCurrOffs = 0; /* we are back at begin of file */ @@ -229,15 +435,12 @@ strmHandleEOF(strm_t *pThis) ISOBJ_TYPE_assert(pThis, strm); switch(pThis->sType) { case STREAMTYPE_FILE_SINGLE: + case STREAMTYPE_NAMED_PIPE: ABORT_FINALIZE(RS_RET_EOF); break; case STREAMTYPE_FILE_CIRCULAR: /* we have multiple files and need to switch to the next one */ /* TODO: think about emulating EOF in this case (not yet needed) */ -#if 0 - if(pThis->iMaxFiles == 0) /* TODO: why do we need this? ;) */ - ABORT_FINALIZE(RS_RET_EOF); -#endif dbgoprint((obj_t*) pThis, "file %d EOF\n", pThis->fd); CHKiRet(strmNextFile(pThis)); break; @@ -295,7 +498,7 @@ finalize_it: * NOTE: needs to be enhanced to support sticking with a strm entry (if not * deleted). */ -rsRetVal strmReadChar(strm_t *pThis, uchar *pC) +static rsRetVal strmReadChar(strm_t *pThis, uchar *pC) { DEFiRet; @@ -329,7 +532,7 @@ finalize_it: * character buffering capability. * rgerhards, 2008-01-07 */ -rsRetVal strmUnreadChar(strm_t *pThis, uchar c) +static rsRetVal strmUnreadChar(strm_t *pThis, uchar c) { ASSERT(pThis != NULL); ASSERT(pThis->iUngetC == -1); @@ -351,7 +554,7 @@ rsRetVal strmUnreadChar(strm_t *pThis, uchar c) * are pthread_killed() upon termination. So if we use their native pointer, they * can cleanup (but only then). */ -rsRetVal +static rsRetVal strmReadLine(strm_t *pThis, cstr_t **ppCStr) { DEFiRet; @@ -360,19 +563,19 @@ strmReadLine(strm_t *pThis, cstr_t **ppCStr) ASSERT(pThis != NULL); ASSERT(ppCStr != NULL); - CHKiRet(rsCStrConstruct(ppCStr)); + CHKiRet(cstrConstruct(ppCStr)); /* now read the line */ CHKiRet(strmReadChar(pThis, &c)); while(c != '\n') { - CHKiRet(rsCStrAppendChar(*ppCStr, c)); + CHKiRet(cstrAppendChar(*ppCStr, c)); CHKiRet(strmReadChar(pThis, &c)); } CHKiRet(cstrFinalize(*ppCStr)); finalize_it: if(iRet != RS_RET_OK && *ppCStr != NULL) - rsCStrDestruct(ppCStr); + cstrDestruct(ppCStr); RETiRet; } @@ -383,26 +586,75 @@ finalize_it: BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! */ pThis->iCurrFNum = 1; pThis->fd = -1; + pThis->fdDir = -1; pThis->iUngetC = -1; pThis->sType = STREAMTYPE_FILE_SINGLE; pThis->sIOBufSize = glblGetIOBufSize(); - pThis->tOpenMode = 0600; /* TODO: make configurable */ + pThis->tOpenMode = 0600; ENDobjConstruct(strm) /* ConstructionFinalizer * rgerhards, 2008-01-09 */ -rsRetVal strmConstructFinalize(strm_t *pThis) +static rsRetVal strmConstructFinalize(strm_t *pThis) { + rsRetVal localRet; + int i; DEFiRet; ASSERT(pThis != NULL); - if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */ - if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - pThis->iBufPtrMax = 0; /* results in immediate read request */ + pThis->iBufPtrMax = 0; /* results in immediate read request */ + if(pThis->iZipLevel) { /* do we need a zip buf? */ + localRet = objUse(zlibw, LM_ZLIBW_FILENAME); + if(localRet != RS_RET_OK) { + pThis->iZipLevel = 0; + DBGPRINTF("stream was requested with zip mode, but zlibw module unavailable (%d) - using " + "without zip\n", localRet); + } else { + /* we use the same size as the original buf, as we would like + * to make sure we can write out everything with a SINGLE api call! + * We add another 128 bytes to take care of the gzip header and "all eventualities". + */ + CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * pThis->sIOBufSize + 128)); + } + } + + /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */ + if(pThis->bSync && !pThis->bIsTTY) { + pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY | O_CLOEXEC | O_NOCTTY); + if(pThis->fdDir == -1) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("error %d opening directory file for fsync() use - fsync for directory disabled: %s\n", + errno, errStr); + } + } + + /* if we have a flush interval, we need to do async writes in any case */ + if(pThis->iFlushInterval != 0) { + pThis->bAsyncWrite = 1; + } + + /* if we work asynchronously, we need a couple of synchronization objects */ + if(pThis->bAsyncWrite) { + pthread_mutex_init(&pThis->mut, 0); + pthread_cond_init(&pThis->notFull, 0); + pthread_cond_init(&pThis->notEmpty, 0); + pthread_cond_init(&pThis->isEmpty, 0); + pThis->iCnt = pThis->iEnq = pThis->iDeq = 0; + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } + pThis->pIOBuf = pThis->asyncBuf[0].pBuf; + pThis->bStopWriter = 0; + if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) + DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); + } else { + /* we work synchronously, so we need to alloc a fixed pIOBuf */ + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } finalize_it: @@ -410,24 +662,57 @@ finalize_it: } +/* stop the writer thread (we MUST be runnnig asynchronously when this method + * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06 + */ +static inline void +stopWriter(strm_t *pThis) +{ + BEGINfunc + pThis->bStopWriter = 1; + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + pthread_join(pThis->writerThreadID, NULL); + ENDfunc +} + + /* destructor for the strm object */ BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ + int i; CODESTARTobjDestruct(strm) - if(pThis->tOperationsMode == STREAMMODE_WRITE) + if(pThis->bAsyncWrite) + /* Note: mutex will be unlocked in stopWriter! */ + d_pthread_mutex_lock(&pThis->mut); + + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); - /* ... then free resources */ + if(pThis->bAsyncWrite) { + stopWriter(pThis); + pthread_mutex_destroy(&pThis->mut); + pthread_cond_destroy(&pThis->notFull); + pthread_cond_destroy(&pThis->notEmpty); + pthread_cond_destroy(&pThis->isEmpty); + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + free(pThis->asyncBuf[i].pBuf); + } + } else { + free(pThis->pIOBuf); + } + + /* Finally, we can free the resources. + * IMPORTANT: we MUST free this only AFTER the ansyncWriter has been stopped, else + * we get random errors... + */ if(pThis->fd != -1) strmCloseFile(pThis); - if(pThis->pszDir != NULL) - free(pThis->pszDir); - if(pThis->pIOBuf != NULL) - free(pThis->pIOBuf); - if(pThis->pszCurrFName != NULL) - free(pThis->pszCurrFName); - if(pThis->pszFName != NULL) - free(pThis->pszFName); + free(pThis->pszDir); + free(pThis->pZipBuf); + free(pThis->pszCurrFName); + free(pThis->pszFName); + ENDobjDestruct(strm) @@ -443,6 +728,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) if(pThis->fd == -1) FINALIZE; + /* wait for output to be empty, so that our counts are correct */ + strmWaitAsyncWriterDone(pThis); + if(pThis->iCurrOffs >= pThis->iMaxFileSize) { dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); @@ -453,47 +741,375 @@ finalize_it: RETiRet; } + +/* try to recover a tty after a write error. This may have happend + * due to vhangup(), and, if so, we can simply re-open it. + */ +#ifdef linux +# define ERR_TTYHUP EIO +#else +# define ERR_TTYHUP EBADF +#endif +static rsRetVal +tryTTYRecover(strm_t *pThis, int err) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + if(err == ERR_TTYHUP) { + close(pThis->fd); + CHKiRet(doPhysOpen(pThis)); + } + +finalize_it: + RETiRet; +} +#undef ER_TTYHUP + + +/* issue write() api calls until either the buffer is completely + * written or an error occured (it may happen that multiple writes + * are required, what is perfectly legal. On exit, *pLenBuf contains + * the number of bytes actually written. + * rgerhards, 2009-06-08 + */ +static rsRetVal +doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) +{ + ssize_t lenBuf; + ssize_t iTotalWritten; + ssize_t iWritten; + char *pWriteBuf; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + lenBuf = *pLenBuf; + pWriteBuf = (char*) pBuf; + iTotalWritten = 0; + do { + iWritten = write(pThis->fd, pWriteBuf, lenBuf); + if(iWritten < 0) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("log file (%d) write error %d: %s\n", pThis->fd, err, errStr); + if(err == EINTR) { + /*NO ERROR, just continue */; + } else { + if(pThis->bIsTTY) { + CHKiRet(tryTTYRecover(pThis, err)); + } else { + ABORT_FINALIZE(RS_RET_IO_ERROR); + /* Would it make sense to cover more error cases? So far, I + * do not see good reason to do so. + */ + } + } + } + /* advance buffer to next write position */ + iTotalWritten += iWritten; + lenBuf -= iWritten; + pWriteBuf += iWritten; + } while(lenBuf > 0); /* Warning: do..while()! */ + + dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + +finalize_it: + *pLenBuf = iTotalWritten; + RETiRet; +} + + + /* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 */ -static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +static inline rsRetVal +doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; - int iWritten; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->iZipLevel) { + CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + } else { + /* write without zipping */ + CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel). Note that the stream mutex has already been locked by the + * strmWrite...() calls. Also note that we always have only a single producer, + * so we can simply serially assign the next free buffer to it and be sure that + * the very some producer comes back in sequence to submit the then-filled buffers. + * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 + */ +static inline rsRetVal +doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; + pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; + + pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */ + if(++pThis->iCnt == 1) + pthread_cond_signal(&pThis->notEmpty); + + RETiRet; +} + + +/* schedule writing to the stream. Depending on our concurrency settings, + * this either directly writes to the stream or schedules writing via + * the background thread. -- rgerhards, 2009-07-07 + */ +static rsRetVal +strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); + } else { + CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); + } + + pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ + +finalize_it: + RETiRet; +} + + + +/* This is the writer thread for asynchronous mode. + * -- rgerhards, 2009-07-06 + */ +static void* +asyncWriterThread(void *pPtr) +{ + int iDeq; + struct timespec t; + bool bTimedOut = 0; + strm_t *pThis = (strm_t*) pPtr; + ISOBJ_TYPE_assert(pThis, strm); + + BEGINfunc +# if HAVE_PRCTL && defined PR_SET_NAME + if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); + } +#endif + + while(1) { /* loop broken inside */ + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt == 0) { + if(pThis->bStopWriter) { + pthread_cond_broadcast(&pThis->isEmpty); + d_pthread_mutex_unlock(&pThis->mut); + goto finalize_it; /* break main loop */ + } + if(bTimedOut && pThis->iBufPtr > 0) { + /* if we timed out, we need to flush pending data */ + strmFlush(pThis); + bTimedOut = 0; + continue; /* now we should have data */ + } + bTimedOut = 0; + timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ + if(pThis->bDoTimedWait) { + if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { + int err = errno; + if(err == ETIMEDOUT) { + bTimedOut = 1; + } else { + bTimedOut = 1; + char errStr[1024]; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n", + err, errStr); + } + } + } else { + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } + } + + bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ + + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); + // TODO: error check????? 2009-07-06 + + --pThis->iCnt; + if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { + pthread_cond_signal(&pThis->notFull); + if(pThis->iCnt == 0) + pthread_cond_broadcast(&pThis->isEmpty); + } + d_pthread_mutex_unlock(&pThis->mut); + } + +finalize_it: + ENDfunc + return NULL; /* to keep pthreads happy */ +} + + +/* sync the file to disk, so that any unwritten data is persisted. This + * also syncs the directory and thus makes sure that the file survives + * fatal failure. Note that we do NOT return an error status if the + * sync fails. Doing so would probably cause more trouble than it + * is worth (read: data loss may occur where we otherwise might not + * have it). -- rgerhards, 2009-06-08 + */ +#undef SYNCCALL +#if HAVE_FDATASYNC +# define SYNCCALL(x) fdatasync(x) +#else +# define SYNCCALL(x) fsync(x) +#endif +static rsRetVal +syncFile(strm_t *pThis) +{ + int ret; + DEFiRet; + + if(pThis->bIsTTY) + FINALIZE; /* TTYs can not be synced */ + + DBGPRINTF("syncing file %d\n", pThis->fd); + ret = SYNCCALL(pThis->fd); + if(ret != 0) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("sync failed for file %d with error (%d): %s - ignoring\n", + pThis->fd, err, errStr); + } + + if(pThis->fdDir != -1) { + ret = fsync(pThis->fdDir); + } + +finalize_it: + RETiRet; +} +#undef SYNCCALL + +/* physically write to the output file. the provided data is ready for + * writing (e.g. zipped if we are requested to do that). + * Note that if the write() API fails, we do not reset any pointers, but return + * an error code. That means we may redo work in the next iteration. + * rgerhards, 2009-06-04 + */ +static rsRetVal +strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + size_t iWritten; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); - iWritten = write(pThis->fd, pBuf, lenBuf); - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, iWritten); - /* TODO: handle error case -- rgerhards, 2008-01-07 */ - - /* Now indicate buffer empty again. We do this in any case, because there - * is no way we could react more intelligently to an error during write. - * This MUST be done BEFORE strCheckNextOutputFile(), otherwise we have an - * endless loop. We reset the buffer pointer also in finalize_it - this is - * necessary if we run into problems. Not resetting it would again cause an - * endless loop. So it is better to loose some data (which also justifies - * duplicating that code, too...) -- rgerhards, 2008-01-10 - */ - pThis->iBufPtr = 0; + iWritten = lenBuf; + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); + pThis->iCurrOffs += iWritten; /* update user counter, if provided */ if(pThis->pUsrWCntr != NULL) *pThis->pUsrWCntr += iWritten; - if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) + if(pThis->bSync) { + CHKiRet(syncFile(pThis)); + } + + if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) { CHKiRet(strmCheckNextOutputFile(pThis)); + } else if(pThis->iSizeLimit != 0) { + CHKiRet(doSizeLimitProcessing(pThis)); + } + +finalize_it: + RETiRet; +} + + +/* write the output buffer in zip mode + * This means we compress it first and then do a physical write. + * Note that we always do a full deflateInit ... deflate ... deflateEnd + * sequence. While this is not optimal, we need to do it because we need + * to ensure that the file is readable even when we are aborted. Doing the + * full sequence brings us as far towards this goal as possible (and not + * doing it would be a total failure). It may be worth considering to + * add a config switch so that the user can decide the risk he is ready + * to take, but so far this is not yet implemented (not even requested ;)). + * rgerhards, 2009-06-04 + * For the time being, we take a very conservative approach and do not run this + * method multithreaded. This is done in an effort to solve a segfault condition + * that seems to be related to the zip code. -- rgerhards, 2009-09-22 + * TODO: make multithreaded again! + */ +static rsRetVal +doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + z_stream zstrm; + int zRet; /* zlib return state */ + bool bzInitDone = FALSE; + DEFiRet; + assert(pThis != NULL); + assert(pBuf != NULL); + + /* allocate deflate state */ + zstrm.zalloc = Z_NULL; + zstrm.zfree = Z_NULL; + zstrm.opaque = Z_NULL; + zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */ + /* see note in file header for the params we use with deflateInit2() */ + zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); + if(zRet != Z_OK) { + DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet); + ABORT_FINALIZE(RS_RET_ZLIB_ERR); + } + bzInitDone = TRUE; + + /* now doing the compression */ + zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */ + zstrm.avail_in = lenBuf; + /* run deflate() on buffer until everything has been compressed */ + do { + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); + zstrm.avail_out = pThis->sIOBufSize; + zstrm.next_out = pThis->pZipBuf; + zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */ + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); + assert(zRet != Z_STREAM_ERROR); /* state not clobbered */ + if(zstrm.avail_out == pThis->sIOBufSize) + break; /* this is valid, indicates end of compression --> see zlib howto */ + CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out)); + } while (zstrm.avail_out == 0); + assert(zstrm.avail_in == 0); /* all input will be used */ finalize_it: - pThis->iBufPtr = 0; /* see comment above */ + if(bzInitDone) { + zRet = zlibw.DeflateEnd(&zstrm); + if(zRet != Z_OK) { + DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); + } + } RETiRet; } @@ -503,15 +1119,16 @@ finalize_it: * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 */ -rsRetVal strmFlush(strm_t *pThis) +static rsRetVal +strmFlush(strm_t *pThis) { DEFiRet; ASSERT(pThis != NULL); dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); - if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) { - iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); + if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { + iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); } RETiRet; @@ -545,7 +1162,7 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs) /* seek to current offset. This is primarily a helper to readjust the OS file * pointer after a strm object has been deserialized. */ -rsRetVal strmSeekCurrOffs(strm_t *pThis) +static rsRetVal strmSeekCurrOffs(strm_t *pThis) { DEFiRet; @@ -558,12 +1175,18 @@ rsRetVal strmSeekCurrOffs(strm_t *pThis) /* write a *single* character to a stream object -- rgerhards, 2008-01-10 */ -rsRetVal strmWriteChar(strm_t *pThis, uchar c) +static rsRetVal strmWriteChar(strm_t *pThis, uchar c) { DEFiRet; ASSERT(pThis != NULL); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); @@ -573,12 +1196,19 @@ rsRetVal strmWriteChar(strm_t *pThis, uchar c) pThis->iBufPtr++; finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + RETiRet; } -/* write an integer value (actually a long) to a stream object */ -rsRetVal strmWriteLong(strm_t *pThis, long i) +/* write an integer value (actually a long) to a stream object + * Note that we do not need to lock the mutex here, because we call + * strmWrite(), which does the lock (aka: we must not lock it, else we + * would run into a recursive lock, resulting in a deadlock!) + */ +static rsRetVal strmWriteLong(strm_t *pThis, long i) { DEFiRet; uchar szBuf[32]; @@ -593,45 +1223,67 @@ finalize_it: } -/* write memory buffer to a stream object +/* write memory buffer to a stream object. + * process the data in chunks and copy it over to our buffer. The caller-provided data + * may theoritically be larger than our buffer. In that case, we do multiple copies. One + * may argue if it were more efficient to write out the caller-provided buffer in that case + * and earlier versions of rsyslog did this. However, this introduces a lot of complexity + * inside the buffered writer and potential performance bottlenecks when trying to solve + * it. Now keep in mind that we actually do (almost?) never have a case where the + * caller-provided buffer is larger than our one. So instead of optimizing a case + * which normally does not exist, we expect some degradation in its case but make us + * perform better in the regular cases. -- rgerhards, 2009-07-07 */ -rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +static rsRetVal +strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; - size_t iPartial; + size_t iWrite; + size_t iOffset; ASSERT(pThis != NULL); ASSERT(pBuf != NULL); - /* check if the to-be-written data is larger than our buffer size */ - if(lenBuf >= pThis->sIOBufSize) { - /* it is - so we do a direct write, that is most efficient. - * TODO: is it really? think about disk block sizes! - */ - CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); - } else { - /* data fits into a buffer - we just need to see if it - * fits into the current buffer... - */ - if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) { - /* nope, so we must split it */ - iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ - if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial); - pThis->iBufPtr += iPartial; - } +//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + + iOffset = 0; + do { + if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ - memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial); - pThis->iBufPtr = lenBuf - iPartial; - } else { - /* we have space, so we simply copy over the string */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); - pThis->iBufPtr += lenBuf; } + iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ + if(iWrite > lenBuf) + iWrite = lenBuf; + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite); + pThis->iBufPtr += iWrite; + iOffset += iWrite; + lenBuf -= iWrite; + } while(lenBuf > 0); + + /* now check if the buffer right at the end of the write is full and, if so, + * write it. This seems more natural than waiting (hours?) for the next message... + */ + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } finalize_it: + if(pThis->bAsyncWrite) { + if(pThis->bDoTimedWait == 0) { + /* we potentially have a partial buffer, so re-activate the + * writer thread that it can set and pick up timeouts. + */ + pThis->bDoTimedWait = 1; + pthread_cond_signal(&pThis->notEmpty); + } + d_pthread_mutex_unlock(&pThis->mut); + } + RETiRet; } @@ -644,34 +1296,27 @@ DEFpropSetMeth(strm, iFileNumDigits, int) DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) DEFpropSetMeth(strm, sType, strmType_t) - -rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) +DEFpropSetMeth(strm, iZipLevel, int) +DEFpropSetMeth(strm, bSync, int) +DEFpropSetMeth(strm, sIOBufSize, size_t) +DEFpropSetMeth(strm, iSizeLimit, off_t) +DEFpropSetMeth(strm, iFlushInterval, int) +DEFpropSetMeth(strm, pszSizeLimitCmd, uchar*) + +static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { pThis->iMaxFiles = iNewVal; pThis->iFileNumDigits = getNumberDigits(iNewVal); return RS_RET_OK; } -rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal) -{ - DEFiRet; - - if(iNewVal & O_APPEND) - ABORT_FINALIZE(RS_RET_PARAM_ERROR); - - pThis->iAddtlOpenFlags = iNewVal; - -finalize_it: - RETiRet; -} - /* set the stream's file prefix * The passed-in string is duplicated. So if the caller does not need * it any longer, it must free it. * rgerhards, 2008-01-09 */ -rsRetVal +static rsRetVal strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) { DEFiRet; @@ -685,7 +1330,7 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) if(pThis->pszFName != NULL) free(pThis->pszFName); - if((pThis->pszFName = malloc(sizeof(uchar) * iLenName + 1)) == NULL) + if((pThis->pszFName = malloc(sizeof(uchar) * (iLenName + 1))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszFName, pszName, iLenName + 1); /* always think about the \0! */ @@ -701,7 +1346,7 @@ finalize_it: * it any longer, it must free it. * rgerhards, 2008-01-09 */ -rsRetVal +static rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) { DEFiRet; @@ -745,7 +1390,7 @@ finalize_it: * * rgerhards, 2008-01-10 */ -rsRetVal strmRecordBegin(strm_t *pThis) +static rsRetVal strmRecordBegin(strm_t *pThis) { ASSERT(pThis != NULL); ASSERT(pThis->bInRecord == 0); @@ -753,7 +1398,7 @@ rsRetVal strmRecordBegin(strm_t *pThis) return RS_RET_OK; } -rsRetVal strmRecordEnd(strm_t *pThis) +static rsRetVal strmRecordEnd(strm_t *pThis) { DEFiRet; ASSERT(pThis != NULL); @@ -775,7 +1420,7 @@ rsRetVal strmRecordEnd(strm_t *pThis) * We do not serialize the dynamic properties. * rgerhards, 2008-01-10 */ -rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) +static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) { DEFiRet; int i; @@ -821,7 +1466,7 @@ finalize_it: * any new set overwrites the previous one. * rgerhards, 2008-02-27 */ -rsRetVal +static rsRetVal strmSetWCntr(strm_t *pThis, number_t *pWCnt) { DEFiRet; @@ -841,8 +1486,8 @@ strmSetWCntr(strm_t *pThis, number_t *pWCnt) /* This function can be used as a generic way to set properties. * rgerhards, 2008-01-11 */ -#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1) -rsRetVal strmSetProperty(strm_t *pThis, var_t *pProp) +#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, UCHAR_CONSTANT(name), sizeof(name) - 1) +static rsRetVal strmSetProperty(strm_t *pThis, var_t *pProp) { DEFiRet; @@ -881,7 +1526,7 @@ finalize_it: * reported on the second call may actually be lower than on the first call. This is due to * file circulation. A caller must deal with that. -- rgerhards, 2008-01-30 */ -rsRetVal +static rsRetVal strmGetCurrOffset(strm_t *pThis, int64 *pOffs) { DEFiRet; @@ -909,8 +1554,38 @@ CODESTARTobjQueryInterface(strm) * work here (if we can support an older interface version - that, * of course, also affects the "if" above). */ - /*xxxpIf->oID = OBJvm; SAMPLE */ - + pIf->Construct = strmConstruct; + pIf->ConstructFinalize = strmConstructFinalize; + pIf->Destruct = strmDestruct; + pIf->ReadChar = strmReadChar; + pIf->UnreadChar = strmUnreadChar; + pIf->ReadLine = strmReadLine; + pIf->SeekCurrOffs = strmSeekCurrOffs; + pIf->Write = strmWrite; + pIf->WriteChar = strmWriteChar; + pIf->WriteLong = strmWriteLong; + pIf->SetFName = strmSetFName; + pIf->SetDir = strmSetDir; + pIf->Flush = strmFlush; + pIf->RecordBegin = strmRecordBegin; + pIf->RecordEnd = strmRecordEnd; + pIf->Serialize = strmSerialize; + pIf->GetCurrOffset = strmGetCurrOffset; + pIf->SetWCntr = strmSetWCntr; + /* set methods */ + pIf->SetbDeleteOnClose = strmSetbDeleteOnClose; + pIf->SetiMaxFileSize = strmSetiMaxFileSize; + pIf->SetiMaxFiles = strmSetiMaxFiles; + pIf->SetiFileNumDigits = strmSetiFileNumDigits; + pIf->SettOperationsMode = strmSettOperationsMode; + pIf->SettOpenMode = strmSettOpenMode; + pIf->SetsType = strmSetsType; + pIf->SetiZipLevel = strmSetiZipLevel; + pIf->SetbSync = strmSetbSync; + pIf->SetsIOBufSize = strmSetsIOBufSize; + pIf->SetiSizeLimit = strmSetiSizeLimit; + pIf->SetiFlushInterval = strmSetiFlushInterval; + pIf->SetpszSizeLimitCmd = strmSetpszSizeLimitCmd; finalize_it: ENDobjQueryInterface(strm) @@ -927,7 +1602,5 @@ BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); ENDObjClassInit(strm) - -/* - * vi:set ai: +/* vi:set ai: */ |