diff options
author | Michael Biebl <biebl@debian.org> | 2013-03-18 15:39:58 +0100 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2013-03-18 15:39:58 +0100 |
commit | 86831d7a4f485e19befa8cc500d17766798ad07c (patch) | |
tree | 67b3cc93fcf2734e594c4e0c11609eaf1359e0f8 /runtime/stream.c | |
parent | ac1a840d0afd724fa614eed5d64112a9ecf097f8 (diff) | |
download | rsyslog-86831d7a4f485e19befa8cc500d17766798ad07c.tar.gz |
Imported Upstream version 7.3.8upstream/7.3.8
Diffstat (limited to 'runtime/stream.c')
-rw-r--r-- | runtime/stream.c | 233 |
1 files changed, 169 insertions, 64 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index 193d14d..3e890c7 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -74,11 +74,12 @@ DEFobjStaticHelpers DEFobjCurrIf(zlibw) /* forward definitions */ -static rsRetVal strmFlushInternal(strm_t *pThis); +static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); -static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush); +static rsRetVal doZipFinish(strm_t *pThis); static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); @@ -341,7 +342,10 @@ static rsRetVal strmCloseFile(strm_t *pThis) (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName); if(pThis->tOperationsMode != STREAMMODE_READ) { - strmFlushInternal(pThis); + strmFlushInternal(pThis, 0); + if(pThis->iZipLevel) { + doZipFinish(pThis); + } if(pThis->bAsyncWrite) { strmWaitAsyncWriterDone(pThis); } @@ -361,7 +365,13 @@ static rsRetVal strmCloseFile(strm_t *pThis) pThis->fdDir = -1; } - if(pThis->bDeleteOnClose && pThis->pszCurrFName != NULL) { + if(pThis->bDeleteOnClose) { + if(pThis->pszCurrFName == NULL) { + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, + pThis->iFileNumDigits)); + } + DBGPRINTF("strmCloseFile: deleting '%s'\n", pThis->pszCurrFName); if(unlink((char*) pThis->pszCurrFName) == -1) { char errStr[1024]; int err = errno; @@ -369,12 +379,13 @@ static rsRetVal strmCloseFile(strm_t *pThis) DBGPRINTF("error %d unlinking '%s' - ignored: %s\n", errno, pThis->pszCurrFName, errStr); } - free(pThis->pszCurrFName); /* no longer needed in any case (just for open) */ + free(pThis->pszCurrFName); pThis->pszCurrFName = NULL; } pThis->iCurrOffs = 0; /* we are back at begin of file */ +finalize_it: RETiRet; } @@ -682,6 +693,7 @@ BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! pThis->fd = -1; pThis->fdDir = -1; pThis->iUngetC = -1; + pThis->bVeryReliableZip = 0; pThis->sType = STREAMTYPE_FILE_SINGLE; pThis->sIOBufSize = glblGetIOBufSize(); pThis->tOpenMode = 0600; @@ -785,6 +797,7 @@ stopWriter(strm_t *pThis) BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ int i; CODESTARTobjDestruct(strm) + /* we need to stop the ZIP writer */ if(pThis->bAsyncWrite) /* Note: mutex will be unlocked in stopWriter! */ d_pthread_mutex_lock(&pThis->mut); @@ -927,14 +940,14 @@ finalize_it: /* write memory buffer to a stream object. */ static inline rsRetVal -doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush) { DEFiRet; ASSERT(pThis != NULL); if(pThis->iZipLevel) { - CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + CHKiRet(doZipWrite(pThis, pBuf, lenBuf, bFlush)); } else { /* write without zipping */ CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); @@ -979,7 +992,7 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) * the background thread. -- rgerhards, 2009-07-07 */ static rsRetVal -strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlushZip) { DEFiRet; @@ -998,7 +1011,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) if(pThis->bAsyncWrite) { CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); } else { - CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(doWriteInternal(pThis, pBuf, lenBuf, bFlushZip)); } @@ -1019,17 +1032,20 @@ asyncWriterThread(void *pPtr) sbool bTimedOut = 0; strm_t *pThis = (strm_t*) pPtr; int err; + uchar thrdName[256] = "rs:"; ISOBJ_TYPE_assert(pThis, strm); BEGINfunc + ustrncpy(thrdName+3, pThis->pszFName, sizeof(thrdName)-4); + dbgOutputTID((char*)thrdName); # if HAVE_PRCTL && defined PR_SET_NAME - if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { + if(prctl(PR_SET_NAME, (char*)thrdName, 0, 0, 0) != 0) { DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } # endif + d_pthread_mutex_lock(&pThis->mut); while(1) { /* loop broken inside */ - d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { if(pThis->bStopWriter) { pthread_cond_broadcast(&pThis->isEmpty); @@ -1038,18 +1054,17 @@ asyncWriterThread(void *pPtr) } if(bTimedOut && pThis->iBufPtr > 0) { /* if we timed out, we need to flush pending data */ - strmFlushInternal(pThis); + strmFlushInternal(pThis, 0); bTimedOut = 0; - continue; /* now we should have data */ + d_pthread_mutex_unlock(&pThis->mut); + continue; } bTimedOut = 0; timeoutComp(&t, pThis->iFlushInterval * 1000); /* *1000 millisconds */ if(pThis->bDoTimedWait) { if((err = pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t)) != 0) { - if(err == ETIMEDOUT) { - bTimedOut = 1; - } else { - bTimedOut = 1; + bTimedOut = 1; /* simulate in any case */ + if(err != ETIMEDOUT) { char errStr[1024]; rs_strerror_r(err, errStr, sizeof(errStr)); DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n", @@ -1064,8 +1079,12 @@ asyncWriterThread(void *pPtr) bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; - doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); + + /* now we can do the actual write in parallel */ + d_pthread_mutex_unlock(&pThis->mut); + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state // TODO: error check????? 2009-07-06 + d_pthread_mutex_lock(&pThis->mut); --pThis->iCnt; if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { @@ -1073,8 +1092,8 @@ asyncWriterThread(void *pPtr) if(pThis->iCnt == 0) pthread_cond_broadcast(&pThis->isEmpty); } - d_pthread_mutex_unlock(&pThis->mut); } + d_pthread_mutex_unlock(&pThis->mut); finalize_it: ENDfunc @@ -1175,63 +1194,97 @@ finalize_it: * rgerhards, 2009-06-04 */ static rsRetVal -doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush) { - z_stream zstrm; int zRet; /* zlib return state */ - sbool bzInitDone = RSFALSE; DEFiRet; + unsigned outavail; assert(pThis != NULL); assert(pBuf != NULL); - /* allocate deflate state */ - zstrm.zalloc = Z_NULL; - zstrm.zfree = Z_NULL; - zstrm.opaque = Z_NULL; - zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */ - /* see note in file header for the params we use with deflateInit2() */ - zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); - if(zRet != Z_OK) { - DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet); - ABORT_FINALIZE(RS_RET_ZLIB_ERR); + if(!pThis->bzInitDone) { + /* allocate deflate state */ + pThis->zstrm.zalloc = Z_NULL; + pThis->zstrm.zfree = Z_NULL; + pThis->zstrm.opaque = Z_NULL; + /* see note in file header for the params we use with deflateInit2() */ + zRet = zlibw.DeflateInit2(&pThis->zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); + if(zRet != Z_OK) { + DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet); + ABORT_FINALIZE(RS_RET_ZLIB_ERR); + } + pThis->bzInitDone = RSTRUE; } - bzInitDone = RSTRUE; /* now doing the compression */ - zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */ - zstrm.avail_in = lenBuf; + pThis->zstrm.next_in = (Bytef*) pBuf; + pThis->zstrm.avail_in = lenBuf; /* run deflate() on buffer until everything has been compressed */ do { - DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); - zstrm.avail_out = pThis->sIOBufSize; - zstrm.next_out = pThis->pZipBuf; - zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */ - DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); - assert(zRet != Z_STREAM_ERROR); /* state not clobbered */ - if(zstrm.avail_out == pThis->sIOBufSize) - break; /* this is valid, indicates end of compression --> see zlib howto */ - CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out)); - } while (zstrm.avail_out == 0); - assert(zstrm.avail_in == 0); /* all input will be used */ + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in); + pThis->zstrm.avail_out = pThis->sIOBufSize; + pThis->zstrm.next_out = pThis->pZipBuf; + zRet = zlibw.Deflate(&pThis->zstrm, bFlush ? Z_SYNC_FLUSH : Z_NO_FLUSH); /* no bad return value */ + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out); + outavail =pThis->sIOBufSize - pThis->zstrm.avail_out; + if(outavail != 0) { + CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail)); + } + } while (pThis->zstrm.avail_out == 0); finalize_it: - if(bzInitDone) { - zRet = zlibw.DeflateEnd(&zstrm); - if(zRet != Z_OK) { - DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); - } + if(pThis->bzInitDone && pThis->bVeryReliableZip) { + doZipFinish(pThis); } - RETiRet; } + +/* finish zlib buffer, to be called before closing the ZIP file (if + * running in stream mode). + */ +static rsRetVal +doZipFinish(strm_t *pThis) +{ + int zRet; /* zlib return state */ + DEFiRet; + unsigned outavail; + assert(pThis != NULL); + + if(!pThis->bzInitDone) + goto done; + + pThis->zstrm.avail_in = 0; + /* run deflate() on buffer until everything has been compressed */ + do { + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in); + pThis->zstrm.avail_out = pThis->sIOBufSize; + pThis->zstrm.next_out = pThis->pZipBuf; + zRet = zlibw.Deflate(&pThis->zstrm, Z_FINISH); /* no bad return value */ + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out); + outavail = pThis->sIOBufSize - pThis->zstrm.avail_out; + if(outavail != 0) { + CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail)); + } + } while (pThis->zstrm.avail_out == 0); + +finalize_it: + zRet = zlibw.DeflateEnd(&pThis->zstrm); + if(zRet != Z_OK) { + DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); + } + + pThis->bzInitDone = 0; +done: RETiRet; +} + /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 */ static rsRetVal -strmFlushInternal(strm_t *pThis) +strmFlushInternal(strm_t *pThis, int bFlushZip) { DEFiRet; @@ -1241,7 +1294,7 @@ strmFlushInternal(strm_t *pThis) (long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : ""); if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { - iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); + iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr, bFlushZip); } RETiRet; @@ -1263,7 +1316,7 @@ strmFlush(strm_t *pThis) if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); - CHKiRet(strmFlushInternal(pThis)); + CHKiRet(strmFlushInternal(pThis, 1)); finalize_it: if(pThis->bAsyncWrite) @@ -1286,7 +1339,7 @@ static rsRetVal strmSeek(strm_t *pThis, off64_t offs) if(pThis->fd == -1) { CHKiRet(strmOpenFile(pThis)); } else { - CHKiRet(strmFlushInternal(pThis)); + CHKiRet(strmFlushInternal(pThis, 0)); } long long i; DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs); @@ -1298,6 +1351,56 @@ finalize_it: RETiRet; } +/* multi-file seek, seeks to file number & offset within file. This + * is a support function for the queue, in circular mode. DO NOT USE + * IT FOR OTHER NEEDS - it may not work as expected. It will + * seek to the new position and delete interim files, as it skips them. + * Note: this code can be removed when the queue gets a new disk store + * handler (if and when it does ;)). + * The output parameter bytesDel receives the number of bytes that have + * been deleted (if a file is deleted) or 0 if nothing was deleted. + * rgerhards, 2012-11-07 + */ +rsRetVal +strmMultiFileSeek(strm_t *pThis, int FNum, off64_t offs, off64_t *bytesDel) +{ + struct stat statBuf; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + + if(FNum == 0 && offs == 0) { /* happens during queue init */ + *bytesDel = 0; + FINALIZE; + } + + if(pThis->iCurrFNum != FNum) { + /* Note: we assume that no more than one file is skipped - an + * assumption that is being used also by the whole rest of the + * code and most notably the queue subsystem. + */ + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, + pThis->iFileNumDigits)); + stat((char*)pThis->pszCurrFName, &statBuf); + *bytesDel = statBuf.st_size; + DBGPRINTF("strmMultiFileSeek: detected new filenum, was %d, new %d, " + "deleting '%s' (%lld bytes)\n", pThis->iCurrFNum, FNum, + pThis->pszCurrFName, (long long) *bytesDel); + unlink((char*)pThis->pszCurrFName); + free(pThis->pszCurrFName); + pThis->pszCurrFName = NULL; + pThis->iCurrFNum = FNum; + } else { + *bytesDel = 0; + } + pThis->iCurrOffs = offs; + +finalize_it: + RETiRet; +} + + /* seek to current offset. This is primarily a helper to readjust the OS file * pointer after a strm object has been deserialized. @@ -1329,7 +1432,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlushInternal(pThis)); + CHKiRet(strmFlushInternal(pThis, 0)); } /* we now always have space for one character, so we simply copy it */ *(pThis->pIOBuf + pThis->iBufPtr) = c; @@ -1389,17 +1492,17 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); - if(pThis->bAsyncWrite) - d_pthread_mutex_lock(&pThis->mut); - + /* DEV DEBUG ONLY DBGPRINTF("strmWrite(%p[%s], '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pThis->pszCurrFName, pBuf,(long) lenBuf, pThis->bDisabled, (long) pThis->iSizeLimit, (long long) pThis->iCurrOffs); */ if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + iOffset = 0; do { if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */ } iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) @@ -1414,7 +1517,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) * write it. This seems more natural than waiting (hours?) for the next message... */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */ } finalize_it: @@ -1442,6 +1545,7 @@ DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) DEFpropSetMeth(strm, sType, strmType_t) DEFpropSetMeth(strm, iZipLevel, int) +DEFpropSetMeth(strm, bVeryReliableZip, int) DEFpropSetMeth(strm, bSync, int) DEFpropSetMeth(strm, sIOBufSize, size_t) DEFpropSetMeth(strm, iSizeLimit, off_t) @@ -1573,7 +1677,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) ISOBJ_TYPE_assert(pThis, strm); ISOBJ_TYPE_assert(pStrm, strm); - strmFlushInternal(pThis); + strmFlushInternal(pThis, 0); CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis)); objSerializeSCALAR(pStrm, iCurrFNum, INT); @@ -1770,6 +1874,7 @@ CODESTARTobjQueryInterface(strm) pIf->SettOpenMode = strmSettOpenMode; pIf->SetsType = strmSetsType; pIf->SetiZipLevel = strmSetiZipLevel; + pIf->SetbVeryReliableZip = strmSetbVeryReliableZip; pIf->SetbSync = strmSetbSync; pIf->SetsIOBufSize = strmSetsIOBufSize; pIf->SetiSizeLimit = strmSetiSizeLimit; |