From 98a45d0b54c09ca82b3540491915ad6cc61c1a97 Mon Sep 17 00:00:00 2001 From: Michael Biebl Date: Wed, 24 Feb 2010 20:31:30 +0100 Subject: Imported Upstream version 4.6.0 --- tcps_sess.c | 84 +++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 59 insertions(+), 25 deletions(-) (limited to 'tcps_sess.c') diff --git a/tcps_sess.c b/tcps_sess.c index cfee052..8d30738 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -36,6 +36,7 @@ #include "rsyslog.h" #include "dirty.h" +#include "unicode-helper.h" #include "module-template.h" #include "net.h" #include "tcpsrv.h" @@ -45,6 +46,7 @@ #include "netstrm.h" #include "msg.h" #include "datetime.h" +#include "prop.h" /* static data */ @@ -52,10 +54,14 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) DEFobjCurrIf(errmsg) DEFobjCurrIf(netstrm) +DEFobjCurrIf(prop) DEFobjCurrIf(datetime) static int iMaxLine; /* maximum size of a single message */ +static int iNbrTimeUsed = 0; /* how often has previous time been used so far? */ + + /* forward definitions */ static rsRetVal Close(tcps_sess_t *pThis); @@ -97,8 +103,10 @@ CODESTARTobjDestruct(tcps_sess) pThis->pSrv->pOnSessDestruct(&pThis->pUsr); } /* now destruct our own properties */ - free(pThis->fromHost); - free(pThis->fromHostIP); + if(pThis->fromHost != NULL) + CHKiRet(prop.Destruct(&pThis->fromHost)); + if(pThis->fromHostIP != NULL) + CHKiRet(prop.Destruct(&pThis->fromHostIP)); free(pThis->pMsg); ENDobjDestruct(tcps_sess) @@ -121,9 +129,13 @@ SetHost(tcps_sess_t *pThis, uchar *pszHost) ISOBJ_TYPE_assert(pThis, tcps_sess); - free(pThis->fromHost); - pThis->fromHost = pszHost; + if(pThis->fromHost == NULL) + CHKiRet(prop.Construct(&pThis->fromHost)); + CHKiRet(prop.SetString(pThis->fromHost, pszHost, ustrlen(pszHost))); + +finalize_it: + free(pszHost); /* we must free according to our (old) calling conventions */ RETiRet; } @@ -138,9 +150,13 @@ SetHostIP(tcps_sess_t *pThis, uchar *pszHostIP) ISOBJ_TYPE_assert(pThis, tcps_sess); - free(pThis->fromHostIP); - pThis->fromHostIP = pszHostIP; + if(pThis->fromHostIP == NULL) + CHKiRet(prop.Construct(&pThis->fromHostIP)); + CHKiRet(prop.SetString(pThis->fromHostIP, pszHostIP, ustrlen(pszHostIP))); + +finalize_it: + free(pszHostIP); RETiRet; } @@ -217,7 +233,7 @@ SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar * rgerhards, 2009-04-23 */ static rsRetVal -defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime) +defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { msg_t *pMsg; DEFiRet; @@ -231,17 +247,23 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); - /* first trim the buffer to what we have actually received */ - CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg)); - memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg); - pMsg->iLenRawMsg = pThis->iMsg; - MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName, pThis->pLstnInfo->lenInputName); + MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); + MsgSetInputName(pMsg, pThis->pLstnInfo->pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; pMsg->bParseHOSTNAME = 1; MsgSetRcvFrom(pMsg, pThis->fromHost); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); - CHKiRet(submitMsg(pMsg)); + MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset); + + if(pMultiSub == NULL) { + CHKiRet(submitMsg(pMsg)); + } else { + pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg(pMultiSub)); + } + finalize_it: /* reset status variables */ @@ -295,7 +317,7 @@ PrepareClose(tcps_sess_t *pThis) */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); datetime.getCurrTime(&stTime, &ttGenTime); - defaultDoSubmitMessage(pThis, &stTime, ttGenTime); + defaultDoSubmitMessage(pThis, &stTime, ttGenTime, NULL); } finalize_it: @@ -314,10 +336,11 @@ Close(tcps_sess_t *pThis) ISOBJ_TYPE_assert(pThis, tcps_sess); netstrm.Destruct(&pThis->pStrm); - free(pThis->fromHost); - pThis->fromHost = NULL; /* not really needed, but... */ - free(pThis->fromHostIP); - pThis->fromHostIP = NULL; /* not really needed, but... */ + if(pThis->fromHost != NULL) { + prop.Destruct(&pThis->fromHost); + } + if(pThis->fromHostIP != NULL) + prop.Destruct(&pThis->fromHostIP); RETiRet; } @@ -330,7 +353,7 @@ Close(tcps_sess_t *pThis) * rgerhards, 2008-03-14 */ static rsRetVal -processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime) +processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); @@ -376,7 +399,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); - defaultDoSubmitMessage(pThis, stTime, ttGenTime); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... @@ -387,7 +410,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt if(( (c == '\n') || ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ - defaultDoSubmitMessage(pThis, stTime, ttGenTime); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes! @@ -404,7 +427,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt pThis->iOctetsRemain--; if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ - defaultDoSubmitMessage(pThis, stTime, ttGenTime); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; } } @@ -429,9 +452,12 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt * this *is* the *correct* reception step for all the data we received, because * we have just received a bunch of data! -- rgerhards, 2009-06-16 */ +#define NUM_MULTISUB 1024 static rsRetVal DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) { + multi_submit_t multiSub; + msg_t *pMsgs[NUM_MULTISUB]; struct syslogTime stTime; time_t ttGenTime; char *pEnd; @@ -442,17 +468,25 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) assert(iLen > 0); datetime.getCurrTime(&stTime, &ttGenTime); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = NUM_MULTISUB; + multiSub.nElem = 0; /* We now copy the message to the session buffer. */ pEnd = pData + iLen; /* this is one off, which is intensional */ + iNbrTimeUsed = 0; /* full time query */ while(pData < pEnd) { - CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime)); + CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } + /* submit anything that was not yet submitted */ + CHKiRet(multiSubmitMsg(&multiSub)); + finalize_it: RETiRet; } +#undef NUM_MULTISUB /* queryInterface function @@ -499,6 +533,7 @@ CODESTARTObjClassExit(tcps_sess) objRelease(errmsg, CORE_COMPONENT); objRelease(netstrm, LM_NETSTRMS_FILENAME); objRelease(datetime, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); ENDObjClassExit(tcps_sess) @@ -511,6 +546,7 @@ BEGINObjClassInit(tcps_sess, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE c CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(netstrm, LM_NETSTRMS_FILENAME)); CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */ @@ -521,7 +557,5 @@ BEGINObjClassInit(tcps_sess, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE c OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, tcps_sessConstructFinalize); ENDObjClassInit(tcps_sess) - - /* vim:set ai: */ -- cgit v1.2.3