diff options
Diffstat (limited to 'usr/src/uts/common/inet/squeue.c')
-rw-r--r-- | usr/src/uts/common/inet/squeue.c | 456 |
1 files changed, 240 insertions, 216 deletions
diff --git a/usr/src/uts/common/inet/squeue.c b/usr/src/uts/common/inet/squeue.c index 2e08dc359b..a1c0dbe697 100644 --- a/usr/src/uts/common/inet/squeue.c +++ b/usr/src/uts/common/inet/squeue.c @@ -23,7 +23,7 @@ */ /* - * Copyright 2012 Joyent, Inc. All rights reserved. + * Copyright 2017 Joyent, Inc. */ /* @@ -61,6 +61,10 @@ * connection are processed on that squeue. The connection ("conn") to * squeue mapping is stored in "conn_t" member "conn_sqp". * + * If the squeue is not related to TCP/IP, then the value of sqp->sq_isip is + * false and it will not have an associated conn_t, which means many aspects of + * the system, such as polling and switching squeues will not be used. + * * Since the processing of the connection cuts across multiple layers * but still allows packets for different connnection to be processed on * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or @@ -132,21 +136,20 @@ #include <sys/squeue_impl.h> -static void squeue_fire(void *); static void squeue_drain(squeue_t *, uint_t, hrtime_t); static void squeue_worker(squeue_t *sqp); static void squeue_polling_thread(squeue_t *sqp); +static void squeue_worker_wakeup(squeue_t *sqp); +static void squeue_try_drain_one(squeue_t *, conn_t *); kmem_cache_t *squeue_cache; #define SQUEUE_MSEC_TO_NSEC 1000000 int squeue_drain_ms = 20; -int squeue_workerwait_ms = 0; /* The values above converted to ticks or nano seconds */ -static int squeue_drain_ns = 0; -static int squeue_workerwait_tick = 0; +static uint_t squeue_drain_ns = 0; uintptr_t squeue_drain_stack_needed = 10240; uint_t squeue_drain_stack_toodeep; @@ -239,19 +242,16 @@ squeue_init(void) sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0); squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC; - squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms); } -/* ARGSUSED */ squeue_t * -squeue_create(clock_t wait, pri_t pri) +squeue_create(pri_t pri, boolean_t isip) { squeue_t *sqp = kmem_cache_alloc(squeue_cache, 
KM_SLEEP); bzero(sqp, sizeof (squeue_t)); sqp->sq_bind = PBIND_NONE; sqp->sq_priority = pri; - sqp->sq_wait = MSEC_TO_TICK(wait); sqp->sq_worker = thread_create(NULL, 0, squeue_worker, sqp, 0, &p0, TS_RUN, pri); @@ -260,11 +260,36 @@ squeue_create(clock_t wait, pri_t pri) sqp->sq_enter = squeue_enter; sqp->sq_drain = squeue_drain; + sqp->sq_isip = isip; return (sqp); } /* + * We need to kill the threads and then clean up. We should VERIFY that + * polling is disabled so we don't have to worry about disassociating from + * MAC/IP/etc. + */ +void +squeue_destroy(squeue_t *sqp) +{ + kt_did_t worker, poll; + mutex_enter(&sqp->sq_lock); + VERIFY(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | + SQS_POLL_QUIESCE_DONE | SQS_PAUSE | SQS_EXIT))); + worker = sqp->sq_worker->t_did; + poll = sqp->sq_poll_thr->t_did; + sqp->sq_state |= SQS_EXIT; + cv_signal(&sqp->sq_poll_cv); + cv_signal(&sqp->sq_worker_cv); + mutex_exit(&sqp->sq_lock); + + thread_join(poll); + thread_join(worker); + kmem_cache_free(squeue_cache, sqp); +} + +/* * Bind squeue worker thread to the specified CPU, given by CPU id. * If the CPU id value is -1, bind the worker thread to the value * specified in sq_bind field. If a thread is already bound to a @@ -309,97 +334,6 @@ squeue_unbind(squeue_t *sqp) mutex_exit(&sqp->sq_lock); } -void -squeue_worker_wakeup(squeue_t *sqp) -{ - timeout_id_t tid = (sqp)->sq_tid; - - ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); - - if (sqp->sq_wait == 0) { - ASSERT(tid == 0); - ASSERT(!(sqp->sq_state & SQS_TMO_PROG)); - sqp->sq_awaken = ddi_get_lbolt(); - cv_signal(&sqp->sq_worker_cv); - mutex_exit(&sqp->sq_lock); - return; - } - - /* - * Queue isn't being processed, so take - * any post enqueue actions needed before leaving. - */ - if (tid != 0) { - /* - * Waiting for an enter() to process mblk(s). 
- */ - clock_t now = ddi_get_lbolt(); - clock_t waited = now - sqp->sq_awaken; - - if (TICK_TO_MSEC(waited) >= sqp->sq_wait) { - /* - * Times up and have a worker thread - * waiting for work, so schedule it. - */ - sqp->sq_tid = 0; - sqp->sq_awaken = now; - cv_signal(&sqp->sq_worker_cv); - mutex_exit(&sqp->sq_lock); - (void) untimeout(tid); - return; - } - mutex_exit(&sqp->sq_lock); - return; - } else if (sqp->sq_state & SQS_TMO_PROG) { - mutex_exit(&sqp->sq_lock); - return; - } else { - clock_t wait = sqp->sq_wait; - /* - * Wait up to sqp->sq_wait ms for an - * enter() to process this queue. We - * don't want to contend on timeout locks - * with sq_lock held for performance reasons, - * so drop the sq_lock before calling timeout - * but we need to check if timeout is required - * after re acquiring the sq_lock. Once - * the sq_lock is dropped, someone else could - * have processed the packet or the timeout could - * have already fired. - */ - sqp->sq_state |= SQS_TMO_PROG; - mutex_exit(&sqp->sq_lock); - tid = timeout(squeue_fire, sqp, wait); - mutex_enter(&sqp->sq_lock); - /* Check again if we still need the timeout */ - if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) == - SQS_TMO_PROG) && (sqp->sq_tid == 0) && - (sqp->sq_first != NULL)) { - sqp->sq_state &= ~SQS_TMO_PROG; - sqp->sq_tid = tid; - mutex_exit(&sqp->sq_lock); - return; - } else { - if (sqp->sq_state & SQS_TMO_PROG) { - sqp->sq_state &= ~SQS_TMO_PROG; - mutex_exit(&sqp->sq_lock); - (void) untimeout(tid); - } else { - /* - * The timer fired before we could - * reacquire the sq_lock. squeue_fire - * removes the SQS_TMO_PROG flag - * and we don't need to do anything - * else. 
- */ - mutex_exit(&sqp->sq_lock); - } - } - } - - ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); -} - /* * squeue_enter() - enter squeue sqp with mblk mp (which can be * a chain), while tail points to the end and cnt in number of @@ -475,18 +409,21 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, * Handle squeue switching. More details in the * block comment at the top of the file */ - if (connp->conn_sqp == sqp) { + if (sqp->sq_isip == B_FALSE || connp->conn_sqp == sqp) { SQUEUE_DBG_SET(sqp, mp, proc, connp, tag); - connp->conn_on_sqp = B_TRUE; + if (sqp->sq_isip == B_TRUE) + connp->conn_on_sqp = B_TRUE; DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp, conn_t *, connp); (*proc)(connp, mp, sqp, ira); DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp); - connp->conn_on_sqp = B_FALSE; + if (sqp->sq_isip == B_TRUE) { + connp->conn_on_sqp = B_FALSE; + CONN_DEC_REF(connp); + } SQUEUE_DBG_CLEAR(sqp); - CONN_DEC_REF(connp); } else { SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE); @@ -497,23 +434,28 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, sqp->sq_run = NULL; if (sqp->sq_first == NULL || process_flag == SQ_NODRAIN) { - if (sqp->sq_first != NULL) { - squeue_worker_wakeup(sqp); - return; + /* + * Even if SQ_NODRAIN was specified, it may + * still be best to process a single queued + * item if it matches the active connection. + */ + if (sqp->sq_first != NULL && sqp->sq_isip) { + squeue_try_drain_one(sqp, connp); } + /* - * We processed inline our packet and nothing - * new has arrived. We are done. In case any - * control actions are pending, wake up the - * worker. + * If work or control actions are pending, wake + * up the worker thread. 
*/ - if (sqp->sq_state & SQS_WORKER_THR_CONTROL) - cv_signal(&sqp->sq_worker_cv); + if (sqp->sq_first != NULL || + sqp->sq_state & SQS_WORKER_THR_CONTROL) { + squeue_worker_wakeup(sqp); + } mutex_exit(&sqp->sq_lock); return; } } else { - if (ira != NULL) { + if (sqp->sq_isip == B_TRUE && ira != NULL) { mblk_t *attrmp; ASSERT(cnt == 1); @@ -565,10 +507,9 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, * up the worker. */ sqp->sq_run = NULL; - if (sqp->sq_state & SQS_WORKER_THR_CONTROL) - cv_signal(&sqp->sq_worker_cv); - mutex_exit(&sqp->sq_lock); - return; + if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { + squeue_worker_wakeup(sqp); + } } else { /* * We let a thread processing a squeue reenter only @@ -587,7 +528,8 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, if (!(sqp->sq_state & SQS_REENTER) && (process_flag != SQ_FILL) && (sqp->sq_first == NULL) && (sqp->sq_run == curthread) && (cnt == 1) && - (connp->conn_on_sqp == B_FALSE)) { + (sqp->sq_isip == B_FALSE || + connp->conn_on_sqp == B_FALSE)) { sqp->sq_state |= SQS_REENTER; mutex_exit(&sqp->sq_lock); @@ -602,15 +544,21 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, * Handle squeue switching. 
More details in the * block comment at the top of the file */ - if (connp->conn_sqp == sqp) { - connp->conn_on_sqp = B_TRUE; + if (sqp->sq_isip == B_FALSE || connp->conn_sqp == sqp) { + SQUEUE_DBG_SET(sqp, mp, proc, connp, + tag); + if (sqp->sq_isip == B_TRUE) + connp->conn_on_sqp = B_TRUE; DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp, conn_t *, connp); (*proc)(connp, mp, sqp, ira); DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp); - connp->conn_on_sqp = B_FALSE; - CONN_DEC_REF(connp); + if (sqp->sq_isip == B_TRUE) { + connp->conn_on_sqp = B_FALSE; + CONN_DEC_REF(connp); + } + SQUEUE_DBG_CLEAR(sqp); } else { SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE); @@ -631,7 +579,7 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, #ifdef DEBUG mp->b_tag = tag; #endif - if (ira != NULL) { + if (sqp->sq_isip && ira != NULL) { mblk_t *attrmp; ASSERT(cnt == 1); @@ -657,54 +605,33 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, tail = mp = attrmp; } ENQUEUE_CHAIN(sqp, mp, tail, cnt); - if (!(sqp->sq_state & SQS_PROC)) { - squeue_worker_wakeup(sqp); - return; - } /* - * In case any control actions are pending, wake - * up the worker. + * If the worker isn't running or control actions are pending, + * wake it up now. */ - if (sqp->sq_state & SQS_WORKER_THR_CONTROL) - cv_signal(&sqp->sq_worker_cv); - mutex_exit(&sqp->sq_lock); - return; + if ((sqp->sq_state & SQS_PROC) == 0 || + (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) { + squeue_worker_wakeup(sqp); + } } + mutex_exit(&sqp->sq_lock); } /* * PRIVATE FUNCTIONS */ + +/* + * Wake up worker thread for squeue to process queued work. 
+ */ static void -squeue_fire(void *arg) +squeue_worker_wakeup(squeue_t *sqp) { - squeue_t *sqp = arg; - uint_t state; - - mutex_enter(&sqp->sq_lock); - - state = sqp->sq_state; - if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) { - mutex_exit(&sqp->sq_lock); - return; - } - - sqp->sq_tid = 0; - /* - * The timeout fired before we got a chance to set it. - * Process it anyway but remove the SQS_TMO_PROG so that - * the guy trying to set the timeout knows that it has - * already been processed. - */ - if (state & SQS_TMO_PROG) - sqp->sq_state &= ~SQS_TMO_PROG; + ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); - if (!(state & SQS_PROC)) { - sqp->sq_awaken = ddi_get_lbolt(); - cv_signal(&sqp->sq_worker_cv); - } - mutex_exit(&sqp->sq_lock); + cv_signal(&sqp->sq_worker_cv); + sqp->sq_awoken = gethrtime(); } static void @@ -714,10 +641,8 @@ squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire) mblk_t *head; sqproc_t proc; conn_t *connp; - timeout_id_t tid; ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring; hrtime_t now; - boolean_t did_wakeup = B_FALSE; boolean_t sq_poll_capable; ip_recv_attr_t *ira, iras; @@ -729,8 +654,7 @@ squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire) if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() - (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) { ASSERT(mutex_owned(&sqp->sq_lock)); - sqp->sq_awaken = ddi_get_lbolt(); - cv_signal(&sqp->sq_worker_cv); + squeue_worker_wakeup(sqp); squeue_drain_stack_toodeep++; return; } @@ -746,9 +670,6 @@ again: sqp->sq_last = NULL; sqp->sq_count = 0; - if ((tid = sqp->sq_tid) != 0) - sqp->sq_tid = 0; - sqp->sq_state |= SQS_PROC | proc_type; /* @@ -765,9 +686,6 @@ again: SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring); mutex_exit(&sqp->sq_lock); - if (tid != 0) - (void) untimeout(tid); - while ((mp = head) != NULL) { head = mp->b_next; @@ -779,7 +697,7 @@ again: mp->b_prev = NULL; /* Is there an ip_recv_attr_t to handle? 
*/ - if (ip_recv_attr_is_mblk(mp)) { + if (sqp->sq_isip == B_TRUE && ip_recv_attr_is_mblk(mp)) { mblk_t *attrmp = mp; ASSERT(attrmp->b_cont != NULL); @@ -804,20 +722,25 @@ again: /* - * Handle squeue switching. More details in the - * block comment at the top of the file + * Handle squeue switching. More details in the block comment at + * the top of the file. non-IP squeues cannot switch, as there + * is no conn_t. */ - if (connp->conn_sqp == sqp) { + if (sqp->sq_isip == B_FALSE || connp->conn_sqp == sqp) { SQUEUE_DBG_SET(sqp, mp, proc, connp, mp->b_tag); - connp->conn_on_sqp = B_TRUE; + if (sqp->sq_isip == B_TRUE) + connp->conn_on_sqp = B_TRUE; DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp, conn_t *, connp); (*proc)(connp, mp, sqp, ira); DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp); - connp->conn_on_sqp = B_FALSE; - CONN_DEC_REF(connp); + if (sqp->sq_isip == B_TRUE) { + connp->conn_on_sqp = B_FALSE; + CONN_DEC_REF(connp); + } + SQUEUE_DBG_CLEAR(sqp); } else { SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE); @@ -864,11 +787,9 @@ again: if (proc_type == SQS_WORKER) SQS_POLL_RING(sqp); goto again; - } else { - did_wakeup = B_TRUE; - sqp->sq_awaken = ddi_get_lbolt(); - cv_signal(&sqp->sq_worker_cv); } + + squeue_worker_wakeup(sqp); } /* @@ -927,17 +848,14 @@ again: SQS_POLL_QUIESCE_DONE))); SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring); sqp->sq_state &= ~(SQS_PROC | proc_type); - if (!did_wakeup && sqp->sq_first != NULL) { - squeue_worker_wakeup(sqp); - mutex_enter(&sqp->sq_lock); - } /* * If we are not the worker and there is a pending quiesce * event, wake up the worker */ if ((proc_type != SQS_WORKER) && - (sqp->sq_state & SQS_WORKER_THR_CONTROL)) - cv_signal(&sqp->sq_worker_cv); + (sqp->sq_state & SQS_WORKER_THR_CONTROL)) { + squeue_worker_wakeup(sqp); + } } } @@ -1051,6 +969,11 @@ squeue_polling_thread(squeue_t *sqp) cv_wait(async, lock); CALLB_CPR_SAFE_END(&cprinfo, lock); + if 
(sqp->sq_state & SQS_EXIT) { + mutex_exit(lock); + thread_exit(); + } + ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL | SQS_POLL_THR_QUIESCED); if (ctl_state != 0) { @@ -1076,6 +999,9 @@ squeue_polling_thread(squeue_t *sqp) (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); + /* Only IP related squeues should reach this point */ + VERIFY(sqp->sq_isip == B_TRUE); + poll_again: sq_rx_ring = sqp->sq_rx_ring; sq_get_pkts = sq_rx_ring->rr_rx; @@ -1137,7 +1063,6 @@ poll_again: */ } - sqp->sq_awaken = ddi_get_lbolt(); /* * Put the SQS_PROC_HELD on so the worker * thread can distinguish where its called from. We @@ -1153,7 +1078,7 @@ poll_again: */ sqp->sq_state |= SQS_PROC_HELD; sqp->sq_state &= ~SQS_GET_PKTS; - cv_signal(&sqp->sq_worker_cv); + squeue_worker_wakeup(sqp); } else if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_WORKER)) { /* @@ -1173,8 +1098,9 @@ poll_again: * wake up the worker, since it is currently * not running. */ - if (sqp->sq_state & SQS_WORKER_THR_CONTROL) - cv_signal(&sqp->sq_worker_cv); + if (sqp->sq_state & SQS_WORKER_THR_CONTROL) { + squeue_worker_wakeup(sqp); + } } else { /* * Worker thread is already running. We don't need @@ -1205,6 +1131,7 @@ squeue_worker_thr_control(squeue_t *sqp) ill_rx_ring_t *rx_ring; ASSERT(MUTEX_HELD(&sqp->sq_lock)); + VERIFY(sqp->sq_isip == B_TRUE); if (sqp->sq_state & SQS_POLL_RESTART) { /* Restart implies a previous quiesce. */ @@ -1316,6 +1243,11 @@ squeue_worker(squeue_t *sqp) for (;;) { for (;;) { + if (sqp->sq_state & SQS_EXIT) { + mutex_exit(lock); + thread_exit(); + } + /* * If the poll thread has handed control to us * we need to break out of the wait. 
@@ -1412,6 +1344,7 @@ squeue_synch_enter(conn_t *connp, mblk_t *use_mp) again: sqp = connp->conn_sqp; + VERIFY(sqp->sq_isip == B_TRUE); mutex_enter(&sqp->sq_lock); if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) { @@ -1483,36 +1416,109 @@ again: } } -void -squeue_synch_exit(conn_t *connp) +/* + * If possible, attempt to immediately process a single queued request, should + * it match the supplied conn_t reference. This is primarily intended to elide + * squeue worker thread wake-ups during local TCP connect() or close() + * operations where the response is placed on the squeue during processing. + */ +static void +squeue_try_drain_one(squeue_t *sqp, conn_t *compare_conn) { - squeue_t *sqp = connp->conn_sqp; + mblk_t *next, *mp = sqp->sq_first; + conn_t *connp; + sqproc_t proc = (sqproc_t)mp->b_queue; + ip_recv_attr_t iras, *ira = NULL; - mutex_enter(&sqp->sq_lock); - if (sqp->sq_run == curthread) { - ASSERT(sqp->sq_state & SQS_PROC); + ASSERT(MUTEX_HELD(&sqp->sq_lock)); + ASSERT((sqp->sq_state & SQS_PROC) == 0); + ASSERT(sqp->sq_run == NULL); + ASSERT(sqp->sq_isip); + VERIFY(mp != NULL); - sqp->sq_state &= ~SQS_PROC; - sqp->sq_run = NULL; - connp->conn_on_sqp = B_FALSE; + /* + * There is no guarantee that compare_conn references a valid object at + * this time, so under no circumstance may it be dereferenced unless it + * matches the squeue entry. + */ + connp = (conn_t *)mp->b_prev; + if (connp != compare_conn) { + return; + } - if (sqp->sq_first == NULL) { - mutex_exit(&sqp->sq_lock); - } else { - /* - * If this was a normal thread, then it would - * (most likely) continue processing the pending - * requests. Since the just completed operation - * was executed synchronously, the thread should - * not be delayed. To compensate, wake up the - * worker thread right away when there are outstanding - * requests. 
- */ - sqp->sq_awaken = ddi_get_lbolt(); - cv_signal(&sqp->sq_worker_cv); - mutex_exit(&sqp->sq_lock); - } + next = mp->b_next; + proc = (sqproc_t)mp->b_queue; + + ASSERT(proc != NULL); + ASSERT(sqp->sq_count > 0); + + /* Dequeue item from squeue */ + if (next == NULL) { + sqp->sq_first = NULL; + sqp->sq_last = NULL; } else { + sqp->sq_first = next; + } + sqp->sq_count--; + + sqp->sq_state |= SQS_PROC; + sqp->sq_run = curthread; + mutex_exit(&sqp->sq_lock); + + /* Prep mblk_t and retrieve ira if needed */ + mp->b_prev = NULL; + mp->b_queue = NULL; + mp->b_next = NULL; + if (ip_recv_attr_is_mblk(mp)) { + mblk_t *attrmp = mp; + + ASSERT(attrmp->b_cont != NULL); + + mp = attrmp->b_cont; + attrmp->b_cont = NULL; + + ASSERT(mp->b_queue == NULL); + ASSERT(mp->b_prev == NULL); + + if (!ip_recv_attr_from_mblk(attrmp, &iras)) { + /* ill_t or ip_stack_t disappeared */ + ip_drop_input("ip_recv_attr_from_mblk", mp, NULL); + ira_cleanup(&iras, B_TRUE); + CONN_DEC_REF(connp); + goto done; + } + ira = &iras; + } + + SQUEUE_DBG_SET(sqp, mp, proc, connp, mp->b_tag); + connp->conn_on_sqp = B_TRUE; + DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp, + conn_t *, connp); + (*proc)(connp, mp, sqp, ira); + DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp); + connp->conn_on_sqp = B_FALSE; + CONN_DEC_REF(connp); + SQUEUE_DBG_CLEAR(sqp); + + if (ira != NULL) + ira_cleanup(ira, B_TRUE); + +done: + mutex_enter(&sqp->sq_lock); + sqp->sq_state &= ~(SQS_PROC); + sqp->sq_run = NULL; +} + +void +squeue_synch_exit(conn_t *connp, int flag) +{ + squeue_t *sqp = connp->conn_sqp; + + VERIFY(sqp->sq_isip == B_TRUE); + ASSERT(flag == SQ_NODRAIN || flag == SQ_PROCESS); + + mutex_enter(&sqp->sq_lock); + if (sqp->sq_run != curthread) { /* * The caller doesn't own the squeue, clear the SQS_PAUSE flag, * and wake up the squeue owner, such that owner can continue @@ -1524,5 +1530,23 @@ squeue_synch_exit(conn_t *connp) /* There should be only one thread blocking on sq_synch_cv. 
*/ cv_signal(&sqp->sq_synch_cv); mutex_exit(&sqp->sq_lock); + return; } + + ASSERT(sqp->sq_state & SQS_PROC); + + sqp->sq_state &= ~SQS_PROC; + sqp->sq_run = NULL; + connp->conn_on_sqp = B_FALSE; + + /* If the caller opted in, attempt to process the head squeue item. */ + if (flag == SQ_PROCESS && sqp->sq_first != NULL) { + squeue_try_drain_one(sqp, connp); + } + + /* Wake up the worker if further requests are pending. */ + if (sqp->sq_first != NULL) { + squeue_worker_wakeup(sqp); + } + mutex_exit(&sqp->sq_lock); } |