diff options
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r-- | runtime/wtp.c | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c index 19151e7..66942e6 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -8,7 +8,7 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -91,6 +91,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); pthread_attr_init(&pThis->attrThrd); /* Set thread scheduling policy to default */ +#warning do we need this any longer? I think it was a cure for an already fixed bug.. #ifdef HAVE_PTHREAD_SETSCHEDPARAM pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy); pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param); @@ -121,7 +122,8 @@ wtpConstructFinalize(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker thread pool (numworkerThreads %d)\n", + wtpGetDbgHdr(pThis), pThis->iNumWorkerThreads); /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ @@ -233,9 +235,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */ d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); - pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ /* awake workers in retry loop */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy); wtiWakeupThrd(pThis->pWrkr[i]); } d_pthread_mutex_unlock(pThis->pmutUsr); @@ -455,7 +457,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) { DEFiRet; int nMissing; /* number workers missing to run */ - int i; + int i, nRunning; ISOBJ_TYPE_assert(pThis, wtp); @@ -475,7 +477,13 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) CHKiRet(wtpStartWrkr(pThis)); } } else { - pthread_cond_signal(pThis->pcondBusy); + /* we have needed number of workers, but they may be sleeping */ + for(i = 0, nRunning = 0; i < pThis->iNumWorkerThreads && nRunning < nMaxWrkr; ++i) { + if (wtiGetState(pThis->pWrkr[i]) != WRKTHRD_STOPPED) { + pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy); + nRunning++; + } + } } @@ -490,7 +498,6 @@ DEFpropSetMeth(wtp, wtpState, wtpState_t) DEFpropSetMeth(wtp, iNumWorkerThreads, int) DEFpropSetMeth(wtp, pUsr, void*) DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) -DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) |