summaryrefslogtreecommitdiff
path: root/runtime/stream.c
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2013-03-18 15:39:58 +0100
committerMichael Biebl <biebl@debian.org>2013-03-18 15:39:58 +0100
commit86831d7a4f485e19befa8cc500d17766798ad07c (patch)
tree67b3cc93fcf2734e594c4e0c11609eaf1359e0f8 /runtime/stream.c
parentac1a840d0afd724fa614eed5d64112a9ecf097f8 (diff)
downloadrsyslog-86831d7a4f485e19befa8cc500d17766798ad07c.tar.gz
Imported Upstream version 7.3.8upstream/7.3.8
Diffstat (limited to 'runtime/stream.c')
-rw-r--r--runtime/stream.c233
1 files changed, 169 insertions, 64 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 193d14d..3e890c7 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -74,11 +74,12 @@ DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
/* forward definitions */
-static rsRetVal strmFlushInternal(strm_t *pThis);
+static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
static void *asyncWriterThread(void *pPtr);
-static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush);
+static rsRetVal doZipFinish(strm_t *pThis);
static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
@@ -341,7 +342,10 @@ static rsRetVal strmCloseFile(strm_t *pThis)
(pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName);
if(pThis->tOperationsMode != STREAMMODE_READ) {
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
+ if(pThis->iZipLevel) {
+ doZipFinish(pThis);
+ }
if(pThis->bAsyncWrite) {
strmWaitAsyncWriterDone(pThis);
}
@@ -361,7 +365,13 @@ static rsRetVal strmCloseFile(strm_t *pThis)
pThis->fdDir = -1;
}
- if(pThis->bDeleteOnClose && pThis->pszCurrFName != NULL) {
+ if(pThis->bDeleteOnClose) {
+ if(pThis->pszCurrFName == NULL) {
+ CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
+ pThis->pszFName, pThis->lenFName, pThis->iCurrFNum,
+ pThis->iFileNumDigits));
+ }
+ DBGPRINTF("strmCloseFile: deleting '%s'\n", pThis->pszCurrFName);
if(unlink((char*) pThis->pszCurrFName) == -1) {
char errStr[1024];
int err = errno;
@@ -369,12 +379,13 @@ static rsRetVal strmCloseFile(strm_t *pThis)
DBGPRINTF("error %d unlinking '%s' - ignored: %s\n",
errno, pThis->pszCurrFName, errStr);
}
- free(pThis->pszCurrFName); /* no longer needed in any case (just for open) */
+ free(pThis->pszCurrFName);
pThis->pszCurrFName = NULL;
}
pThis->iCurrOffs = 0; /* we are back at begin of file */
+finalize_it:
RETiRet;
}
@@ -682,6 +693,7 @@ BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro!
pThis->fd = -1;
pThis->fdDir = -1;
pThis->iUngetC = -1;
+ pThis->bVeryReliableZip = 0;
pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
pThis->tOpenMode = 0600;
@@ -785,6 +797,7 @@ stopWriter(strm_t *pThis)
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
int i;
CODESTARTobjDestruct(strm)
+ /* we need to stop the ZIP writer */
if(pThis->bAsyncWrite)
/* Note: mutex will be unlocked in stopWriter! */
d_pthread_mutex_lock(&pThis->mut);
@@ -927,14 +940,14 @@ finalize_it:
/* write memory buffer to a stream object.
*/
static inline rsRetVal
-doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
DEFiRet;
ASSERT(pThis != NULL);
if(pThis->iZipLevel) {
- CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf, bFlush));
} else {
/* write without zipping */
CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
@@ -979,7 +992,7 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
* the background thread. -- rgerhards, 2009-07-07
*/
static rsRetVal
-strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlushZip)
{
DEFiRet;
@@ -998,7 +1011,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
if(pThis->bAsyncWrite) {
CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
} else {
- CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
+ CHKiRet(doWriteInternal(pThis, pBuf, lenBuf, bFlushZip));
}
@@ -1019,17 +1032,20 @@ asyncWriterThread(void *pPtr)
sbool bTimedOut = 0;
strm_t *pThis = (strm_t*) pPtr;
int err;
+ uchar thrdName[256] = "rs:";
ISOBJ_TYPE_assert(pThis, strm);
BEGINfunc
+ ustrncpy(thrdName+3, pThis->pszFName, sizeof(thrdName)-4);
+ dbgOutputTID((char*)thrdName);
# if HAVE_PRCTL && defined PR_SET_NAME
- if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) {
+ if(prctl(PR_SET_NAME, (char*)thrdName, 0, 0, 0) != 0) {
DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
}
# endif
+ d_pthread_mutex_lock(&pThis->mut);
while(1) { /* loop broken inside */
- d_pthread_mutex_lock(&pThis->mut);
while(pThis->iCnt == 0) {
if(pThis->bStopWriter) {
pthread_cond_broadcast(&pThis->isEmpty);
@@ -1038,18 +1054,17 @@ asyncWriterThread(void *pPtr)
}
if(bTimedOut && pThis->iBufPtr > 0) {
/* if we timed out, we need to flush pending data */
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
bTimedOut = 0;
- continue; /* now we should have data */
+ d_pthread_mutex_unlock(&pThis->mut);
+ continue;
}
bTimedOut = 0;
timeoutComp(&t, pThis->iFlushInterval * 1000); /* *1000 millisconds */
if(pThis->bDoTimedWait) {
if((err = pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t)) != 0) {
- if(err == ETIMEDOUT) {
- bTimedOut = 1;
- } else {
- bTimedOut = 1;
+ bTimedOut = 1; /* simulate in any case */
+ if(err != ETIMEDOUT) {
char errStr[1024];
rs_strerror_r(err, errStr, sizeof(errStr));
DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n",
@@ -1064,8 +1079,12 @@ asyncWriterThread(void *pPtr)
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
- doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
+
+ /* now we can do the actual write in parallel */
+ d_pthread_mutex_unlock(&pThis->mut);
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state
// TODO: error check????? 2009-07-06
+ d_pthread_mutex_lock(&pThis->mut);
--pThis->iCnt;
if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) {
@@ -1073,8 +1092,8 @@ asyncWriterThread(void *pPtr)
if(pThis->iCnt == 0)
pthread_cond_broadcast(&pThis->isEmpty);
}
- d_pthread_mutex_unlock(&pThis->mut);
}
+ d_pthread_mutex_unlock(&pThis->mut);
finalize_it:
ENDfunc
@@ -1175,63 +1194,97 @@ finalize_it:
* rgerhards, 2009-06-04
*/
static rsRetVal
-doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
- z_stream zstrm;
int zRet; /* zlib return state */
- sbool bzInitDone = RSFALSE;
DEFiRet;
+ unsigned outavail;
assert(pThis != NULL);
assert(pBuf != NULL);
- /* allocate deflate state */
- zstrm.zalloc = Z_NULL;
- zstrm.zfree = Z_NULL;
- zstrm.opaque = Z_NULL;
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- /* see note in file header for the params we use with deflateInit2() */
- zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
- ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ if(!pThis->bzInitDone) {
+ /* allocate deflate state */
+ pThis->zstrm.zalloc = Z_NULL;
+ pThis->zstrm.zfree = Z_NULL;
+ pThis->zstrm.opaque = Z_NULL;
+ /* see note in file header for the params we use with deflateInit2() */
+ zRet = zlibw.DeflateInit2(&pThis->zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pThis->bzInitDone = RSTRUE;
}
- bzInitDone = RSTRUE;
/* now doing the compression */
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- zstrm.avail_in = lenBuf;
+ pThis->zstrm.next_in = (Bytef*) pBuf;
+ pThis->zstrm.avail_in = lenBuf;
/* run deflate() on buffer until everything has been compressed */
do {
- DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
- zstrm.avail_out = pThis->sIOBufSize;
- zstrm.next_out = pThis->pZipBuf;
- zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
- DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
- assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
- if(zstrm.avail_out == pThis->sIOBufSize)
- break; /* this is valid, indicates end of compression --> see zlib howto */
- CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
- } while (zstrm.avail_out == 0);
- assert(zstrm.avail_in == 0); /* all input will be used */
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, bFlush ? Z_SYNC_FLUSH : Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail =pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
+ }
+ } while (pThis->zstrm.avail_out == 0);
finalize_it:
- if(bzInitDone) {
- zRet = zlibw.DeflateEnd(&zstrm);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
- }
+ if(pThis->bzInitDone && pThis->bVeryReliableZip) {
+ doZipFinish(pThis);
}
-
RETiRet;
}
+
+/* finish zlib buffer, to be called before closing the ZIP file (if
+ * running in stream mode).
+ */
+static rsRetVal
+doZipFinish(strm_t *pThis)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ assert(pThis != NULL);
+
+ if(!pThis->bzInitDone)
+ goto done;
+
+ pThis->zstrm.avail_in = 0;
+ /* run deflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail = pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
+ }
+ } while (pThis->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = zlibw.DeflateEnd(&pThis->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
+ }
+
+ pThis->bzInitDone = 0;
+done: RETiRet;
+}
+
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
*/
static rsRetVal
-strmFlushInternal(strm_t *pThis)
+strmFlushInternal(strm_t *pThis, int bFlushZip)
{
DEFiRet;
@@ -1241,7 +1294,7 @@ strmFlushInternal(strm_t *pThis)
(long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : "");
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
- iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr, bFlushZip);
}
RETiRet;
@@ -1263,7 +1316,7 @@ strmFlush(strm_t *pThis)
if(pThis->bAsyncWrite)
d_pthread_mutex_lock(&pThis->mut);
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 1));
finalize_it:
if(pThis->bAsyncWrite)
@@ -1286,7 +1339,7 @@ static rsRetVal strmSeek(strm_t *pThis, off64_t offs)
if(pThis->fd == -1) {
CHKiRet(strmOpenFile(pThis));
} else {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
long long i;
DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs);
@@ -1298,6 +1351,56 @@ finalize_it:
RETiRet;
}
+/* multi-file seek, seeks to file number & offset within file. This
+ * is a support function for the queue, in circular mode. DO NOT USE
+ * IT FOR OTHER NEEDS - it may not work as expected. It will
+ * seek to the new position and delete interim files, as it skips them.
+ * Note: this code can be removed when the queue gets a new disk store
+ * handler (if and when it does ;)).
+ * The output parameter bytesDel receives the number of bytes that have
+ * been deleted (if a file is deleted) or 0 if nothing was deleted.
+ * rgerhards, 2012-11-07
+ */
+rsRetVal
+strmMultiFileSeek(strm_t *pThis, int FNum, off64_t offs, off64_t *bytesDel)
+{
+ struct stat statBuf;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ if(FNum == 0 && offs == 0) { /* happens during queue init */
+ *bytesDel = 0;
+ FINALIZE;
+ }
+
+ if(pThis->iCurrFNum != FNum) {
+ /* Note: we assume that no more than one file is skipped - an
+ * assumption that is being used also by the whole rest of the
+ * code and most notably the queue subsystem.
+ */
+ CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
+ pThis->pszFName, pThis->lenFName, pThis->iCurrFNum,
+ pThis->iFileNumDigits));
+ stat((char*)pThis->pszCurrFName, &statBuf);
+ *bytesDel = statBuf.st_size;
+ DBGPRINTF("strmMultiFileSeek: detected new filenum, was %d, new %d, "
+ "deleting '%s' (%lld bytes)\n", pThis->iCurrFNum, FNum,
+ pThis->pszCurrFName, (long long) *bytesDel);
+ unlink((char*)pThis->pszCurrFName);
+ free(pThis->pszCurrFName);
+ pThis->pszCurrFName = NULL;
+ pThis->iCurrFNum = FNum;
+ } else {
+ *bytesDel = 0;
+ }
+ pThis->iCurrOffs = offs;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* seek to current offset. This is primarily a helper to readjust the OS file
* pointer after a strm object has been deserialized.
@@ -1329,7 +1432,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
/* we now always have space for one character, so we simply copy it */
*(pThis->pIOBuf + pThis->iBufPtr) = c;
@@ -1389,17 +1492,17 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
ASSERT(pThis != NULL);
ASSERT(pBuf != NULL);
-//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
- if(pThis->bAsyncWrite)
- d_pthread_mutex_lock(&pThis->mut);
-
+ /* DEV DEBUG ONLY DBGPRINTF("strmWrite(%p[%s], '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pThis->pszCurrFName, pBuf,(long) lenBuf, pThis->bDisabled, (long) pThis->iSizeLimit, (long long) pThis->iCurrOffs); */
if(pThis->bDisabled)
ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+
iOffset = 0;
do {
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
if(iWrite > lenBuf)
@@ -1414,7 +1517,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
* write it. This seems more natural than waiting (hours?) for the next message...
*/
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
finalize_it:
@@ -1442,6 +1545,7 @@ DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
DEFpropSetMeth(strm, sType, strmType_t)
DEFpropSetMeth(strm, iZipLevel, int)
+DEFpropSetMeth(strm, bVeryReliableZip, int)
DEFpropSetMeth(strm, bSync, int)
DEFpropSetMeth(strm, sIOBufSize, size_t)
DEFpropSetMeth(strm, iSizeLimit, off_t)
@@ -1573,7 +1677,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis));
objSerializeSCALAR(pStrm, iCurrFNum, INT);
@@ -1770,6 +1874,7 @@ CODESTARTobjQueryInterface(strm)
pIf->SettOpenMode = strmSettOpenMode;
pIf->SetsType = strmSetsType;
pIf->SetiZipLevel = strmSetiZipLevel;
+ pIf->SetbVeryReliableZip = strmSetbVeryReliableZip;
pIf->SetbSync = strmSetbSync;
pIf->SetsIOBufSize = strmSetsIOBufSize;
pIf->SetiSizeLimit = strmSetiSizeLimit;