diff options
author | Michael Biebl <biebl@debian.org> | 2010-02-24 20:31:30 +0100 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2010-02-24 20:31:30 +0100 |
commit | 98a45d0b54c09ca82b3540491915ad6cc61c1a97 (patch) | |
tree | 0ec782ee7d1097acfdf2962b3b9b3404db314002 /runtime/stream.h | |
parent | b743785de633f7ff5c39f980496d359e4758ec83 (diff) | |
download | rsyslog-98a45d0b54c09ca82b3540491915ad6cc61c1a97.tar.gz |
Imported Upstream version 4.6.0upstream/4.6.0
Diffstat (limited to 'runtime/stream.h')
-rw-r--r-- | runtime/stream.h | 138 |
1 files changed, 100 insertions, 38 deletions
diff --git a/runtime/stream.h b/runtime/stream.h index 371358a..89175b0 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -19,7 +19,29 @@ * can easily be persistet. The bottom line is that it makes much sense to * use this class whereever possible as its features may grow in the future. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * An important note on writing gzip format via zlib (kept anonymous + * by request): + * + * -------------------------------------------------------------------------- + * We'd like to make sure the output file is in full gzip format + * (compatible with gzip -d/zcat etc). There is a flag in how the output + * is initialized within zlib to properly add the gzip wrappers to the + * output. (gzip is effectively a small metadata wrapper around raw + * zstream output.) + * + * I had written an old bit of code to do this - the documentation on + * deflatInit2() was pretty tricky to nail down on this specific feature: + * + * int deflateInit2 (z_streamp strm, int level, int method, int windowBits, + * int memLevel, int strategy); + * + * I believe "31" would be the value for the "windowBits" field that you'd + * want to try: + * + * deflateInit2(zstrmptr, 6, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); + * -------------------------------------------------------------------------- + * + * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -47,20 +69,26 @@ #include "obj-types.h" #include "glbl.h" #include "stream.h" +#include "zlibw.h" +#include "apc.h" /* stream types */ typedef enum { STREAMTYPE_FILE_SINGLE = 0, /**< read a single file */ STREAMTYPE_FILE_CIRCULAR = 1, /**< circular files */ - STREAMTYPE_FILE_MONITOR = 2 /**< monitor a (third-party) file */ + STREAMTYPE_FILE_MONITOR = 2, /**< monitor a (third-party) file */ + STREAMTYPE_NAMED_PIPE = 3 /**< file is a named pipe (so far, tested for output only) */ } strmType_t; -typedef enum { +typedef enum { /* when extending, do NOT change existing modes! */ STREAMMMODE_INVALID = 0, STREAMMODE_READ = 1, - STREAMMODE_WRITE = 2 + STREAMMODE_WRITE = 2, + STREAMMODE_WRITE_TRUNC = 3, + STREAMMODE_WRITE_APPEND = 4 } strmMode_t; +#define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */ /* The strm_t data structure */ typedef struct strm_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ @@ -71,61 +99,95 @@ typedef struct strm_s { int lenFName; strmMode_t tOperationsMode; mode_t tOpenMode; - int iAddtlOpenFlags; /* can be used to specifiy additional (compatible!) open flags */ int64 iMaxFileSize;/* maximum size a file may grow to */ int iMaxFiles; /* maximum number of files if a circular mode is in use */ int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */ - int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */ + bool bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */ int64 iCurrOffs;/* current offset */ int64 *pUsrWCntr; /* NULL or a user-provided counter that receives the nbr of bytes written since the last CntrSet() */ /* dynamic properties, valid only during file open, not to be persistet */ - size_t sIOBufSize;/* size of IO buffer */ + bool bDisabled; /* should file no longer be written to? (currently set only if omfile file size limit fails) */ + bool bSync; /* sync this file after every write? */ + size_t sIOBufSize;/* size of IO buffer */ uchar *pszDir; /* Directory */ int lenDir; int fd; /* the file descriptor, -1 if closed */ + int fdDir; /* the directory's descriptor, in case bSync is requested (-1 if closed) */ uchar *pszCurrFName; /* name of current file (if open) */ - uchar *pIOBuf; /* io Buffer */ + uchar *pIOBuf; /* the iobuffer currently in use to gather data */ size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ size_t iBufPtr; /* pointer into current buffer */ int iUngetC; /* char set via UngetChar() call or -1 if none set */ - int bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */ + bool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */ + bool bInClose; /* used to break "deadly close loops", tells us we are already inside a close */ + int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */ + Bytef *pZipBuf; + /* support for async flush procesing */ + bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */ + bool bStopWriter; /* shall writer thread terminate? */ + bool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */ + int iFlushInterval; /* flush in which interval - 0, no flushing */ + apc_id_t apcID; /* id of current Apc request (used for cancelling) */ + pthread_mutex_t mut;/* mutex for flush in async mode */ + pthread_cond_t notFull; + pthread_cond_t notEmpty; + pthread_cond_t isEmpty; + unsigned short iEnq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */ + unsigned short iDeq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */ + short iCnt; /* current nbr of elements in buffer */ + struct { + uchar *pBuf; + size_t lenBuf; + } asyncBuf[STREAM_ASYNC_NUMBUFS]; + pthread_t writerThreadID; + int apcRequested; /* is an apc Requested? */ + /* support for omfile size-limiting commands, special counters, NOT persisted! */ + off_t iSizeLimit; /* file size limit, 0 = no limit */ + uchar *pszSizeLimitCmd; /* command to carry out when size limit is reached */ + bool bIsTTY; /* is this a tty file? */ } strm_t; + /* interfaces */ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ + rsRetVal (*Construct)(strm_t **ppThis); + rsRetVal (*ConstructFinalize)(strm_t *pThis); + rsRetVal (*Destruct)(strm_t **ppThis); + rsRetVal (*SetMaxFileSize)(strm_t *pThis, int64 iMaxFileSize); + rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName); + rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC); + rsRetVal (*UnreadChar)(strm_t *pThis, uchar c); + rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr); + rsRetVal (*SeekCurrOffs)(strm_t *pThis); + rsRetVal (*Write)(strm_t *pThis, uchar *pBuf, size_t lenBuf); + rsRetVal (*WriteChar)(strm_t *pThis, uchar c); + rsRetVal (*WriteLong)(strm_t *pThis, long i); + rsRetVal (*SetFName)(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); + rsRetVal (*SetDir)(strm_t *pThis, uchar *pszDir, size_t iLenDir); + rsRetVal (*Flush)(strm_t *pThis); + rsRetVal (*RecordBegin)(strm_t *pThis); + rsRetVal (*RecordEnd)(strm_t *pThis); + rsRetVal (*Serialize)(strm_t *pThis, strm_t *pStrm); + rsRetVal (*GetCurrOffset)(strm_t *pThis, int64 *pOffs); + rsRetVal (*SetWCntr)(strm_t *pThis, number_t *pWCnt); + INTERFACEpropSetMeth(strm, bDeleteOnClose, int); + INTERFACEpropSetMeth(strm, iMaxFileSize, int); + INTERFACEpropSetMeth(strm, iMaxFiles, int); + INTERFACEpropSetMeth(strm, iFileNumDigits, int); + INTERFACEpropSetMeth(strm, tOperationsMode, int); + INTERFACEpropSetMeth(strm, tOpenMode, mode_t); + INTERFACEpropSetMeth(strm, sType, strmType_t); + INTERFACEpropSetMeth(strm, iZipLevel, int); + INTERFACEpropSetMeth(strm, bSync, int); + INTERFACEpropSetMeth(strm, sIOBufSize, size_t); + INTERFACEpropSetMeth(strm, iSizeLimit, off_t); + INTERFACEpropSetMeth(strm, iFlushInterval, int); + INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*); ENDinterface(strm) -#define strmCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ +#define strmCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */ /* prototypes */ -rsRetVal strmConstruct(strm_t **ppThis); -rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis); -rsRetVal strmDestruct(strm_t **ppThis); -rsRetVal strmSetMaxFileSize(strm_t *pThis, int64 iMaxFileSize); -rsRetVal strmSetFileName(strm_t *pThis, uchar *pszName, size_t iLenName); -rsRetVal strmReadChar(strm_t *pThis, uchar *pC); -rsRetVal strmUnreadChar(strm_t *pThis, uchar c); -rsRetVal strmReadLine(strm_t *pThis, cstr_t **ppCStr); -rsRetVal strmSeekCurrOffs(strm_t *pThis); -rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); -rsRetVal strmWriteChar(strm_t *pThis, uchar c); -rsRetVal strmWriteLong(strm_t *pThis, long i); -rsRetVal strmSetFName(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); -rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir); -rsRetVal strmFlush(strm_t *pThis); -rsRetVal strmRecordBegin(strm_t *pThis); -rsRetVal strmRecordEnd(strm_t *pThis); -rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm); -rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal); -rsRetVal strmGetCurrOffset(strm_t *pThis, int64 *pOffs); -rsRetVal strmSetWCntr(strm_t *pThis, number_t *pWCnt); PROTOTYPEObjClassInit(strm); -PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int); -PROTOTYPEpropSetMeth(strm, iMaxFileSize, int); -PROTOTYPEpropSetMeth(strm, iMaxFiles, int); -PROTOTYPEpropSetMeth(strm, iFileNumDigits, int); -PROTOTYPEpropSetMeth(strm, tOperationsMode, int); -PROTOTYPEpropSetMeth(strm, tOpenMode, mode_t); -PROTOTYPEpropSetMeth(strm, sType, strmType_t); #endif /* #ifndef STREAM_H_INCLUDED */ |