summaryrefslogtreecommitdiff
path: root/usr/src/uts/common/inet/squeue.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr/src/uts/common/inet/squeue.c')
-rw-r--r--usr/src/uts/common/inet/squeue.c456
1 files changed, 240 insertions, 216 deletions
diff --git a/usr/src/uts/common/inet/squeue.c b/usr/src/uts/common/inet/squeue.c
index 2e08dc359b..a1c0dbe697 100644
--- a/usr/src/uts/common/inet/squeue.c
+++ b/usr/src/uts/common/inet/squeue.c
@@ -23,7 +23,7 @@
*/
/*
- * Copyright 2012 Joyent, Inc. All rights reserved.
+ * Copyright 2017 Joyent, Inc.
*/
/*
@@ -61,6 +61,10 @@
* connection are processed on that squeue. The connection ("conn") to
* squeue mapping is stored in "conn_t" member "conn_sqp".
*
+ * If the squeue is not related to TCP/IP, then the value of sqp->sq_isip is
+ * false and it will not have an associated conn_t, which means many aspects of
+ * the system, such as polling and swtiching squeues will not be used.
+ *
* Since the processing of the connection cuts across multiple layers
* but still allows packets for different connnection to be processed on
* other CPU/squeues, squeues are also termed as "Vertical Perimeter" or
@@ -132,21 +136,20 @@
#include <sys/squeue_impl.h>
-static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);
+static void squeue_worker_wakeup(squeue_t *sqp);
+static void squeue_try_drain_one(squeue_t *, conn_t *);
kmem_cache_t *squeue_cache;
#define SQUEUE_MSEC_TO_NSEC 1000000
int squeue_drain_ms = 20;
-int squeue_workerwait_ms = 0;
/* The values above converted to ticks or nano seconds */
-static int squeue_drain_ns = 0;
-static int squeue_workerwait_tick = 0;
+static uint_t squeue_drain_ns = 0;
uintptr_t squeue_drain_stack_needed = 10240;
uint_t squeue_drain_stack_toodeep;
@@ -239,19 +242,16 @@ squeue_init(void)
sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
- squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}
-/* ARGSUSED */
squeue_t *
-squeue_create(clock_t wait, pri_t pri)
+squeue_create(pri_t pri, boolean_t isip)
{
squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
bzero(sqp, sizeof (squeue_t));
sqp->sq_bind = PBIND_NONE;
sqp->sq_priority = pri;
- sqp->sq_wait = MSEC_TO_TICK(wait);
sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
sqp, 0, &p0, TS_RUN, pri);
@@ -260,11 +260,36 @@ squeue_create(clock_t wait, pri_t pri)
sqp->sq_enter = squeue_enter;
sqp->sq_drain = squeue_drain;
+ sqp->sq_isip = isip;
return (sqp);
}
/*
+ * We need to kill the threads and then clean up. We should VERIFY that
+ * polling is disabled so we don't have to worry about disassociating from
+ * MAC/IP/etc.
+ */
+void
+squeue_destroy(squeue_t *sqp)
+{
+ kt_did_t worker, poll;
+ mutex_enter(&sqp->sq_lock);
+ VERIFY(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
+ SQS_POLL_QUIESCE_DONE | SQS_PAUSE | SQS_EXIT)));
+ worker = sqp->sq_worker->t_did;
+ poll = sqp->sq_poll_thr->t_did;
+ sqp->sq_state |= SQS_EXIT;
+ cv_signal(&sqp->sq_poll_cv);
+ cv_signal(&sqp->sq_worker_cv);
+ mutex_exit(&sqp->sq_lock);
+
+ thread_join(poll);
+ thread_join(worker);
+ kmem_cache_free(squeue_cache, sqp);
+}
+
+/*
* Bind squeue worker thread to the specified CPU, given by CPU id.
* If the CPU id value is -1, bind the worker thread to the value
* specified in sq_bind field. If a thread is already bound to a
@@ -309,97 +334,6 @@ squeue_unbind(squeue_t *sqp)
mutex_exit(&sqp->sq_lock);
}
-void
-squeue_worker_wakeup(squeue_t *sqp)
-{
- timeout_id_t tid = (sqp)->sq_tid;
-
- ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
-
- if (sqp->sq_wait == 0) {
- ASSERT(tid == 0);
- ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
- sqp->sq_awaken = ddi_get_lbolt();
- cv_signal(&sqp->sq_worker_cv);
- mutex_exit(&sqp->sq_lock);
- return;
- }
-
- /*
- * Queue isn't being processed, so take
- * any post enqueue actions needed before leaving.
- */
- if (tid != 0) {
- /*
- * Waiting for an enter() to process mblk(s).
- */
- clock_t now = ddi_get_lbolt();
- clock_t waited = now - sqp->sq_awaken;
-
- if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
- /*
- * Times up and have a worker thread
- * waiting for work, so schedule it.
- */
- sqp->sq_tid = 0;
- sqp->sq_awaken = now;
- cv_signal(&sqp->sq_worker_cv);
- mutex_exit(&sqp->sq_lock);
- (void) untimeout(tid);
- return;
- }
- mutex_exit(&sqp->sq_lock);
- return;
- } else if (sqp->sq_state & SQS_TMO_PROG) {
- mutex_exit(&sqp->sq_lock);
- return;
- } else {
- clock_t wait = sqp->sq_wait;
- /*
- * Wait up to sqp->sq_wait ms for an
- * enter() to process this queue. We
- * don't want to contend on timeout locks
- * with sq_lock held for performance reasons,
- * so drop the sq_lock before calling timeout
- * but we need to check if timeout is required
- * after re acquiring the sq_lock. Once
- * the sq_lock is dropped, someone else could
- * have processed the packet or the timeout could
- * have already fired.
- */
- sqp->sq_state |= SQS_TMO_PROG;
- mutex_exit(&sqp->sq_lock);
- tid = timeout(squeue_fire, sqp, wait);
- mutex_enter(&sqp->sq_lock);
- /* Check again if we still need the timeout */
- if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
- SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
- (sqp->sq_first != NULL)) {
- sqp->sq_state &= ~SQS_TMO_PROG;
- sqp->sq_tid = tid;
- mutex_exit(&sqp->sq_lock);
- return;
- } else {
- if (sqp->sq_state & SQS_TMO_PROG) {
- sqp->sq_state &= ~SQS_TMO_PROG;
- mutex_exit(&sqp->sq_lock);
- (void) untimeout(tid);
- } else {
- /*
- * The timer fired before we could
- * reacquire the sq_lock. squeue_fire
- * removes the SQS_TMO_PROG flag
- * and we don't need to do anything
- * else.
- */
- mutex_exit(&sqp->sq_lock);
- }
- }
- }
-
- ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
-}
-
/*
* squeue_enter() - enter squeue sqp with mblk mp (which can be
* a chain), while tail points to the end and cnt in number of
@@ -475,18 +409,21 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
* Handle squeue switching. More details in the
* block comment at the top of the file
*/
- if (connp->conn_sqp == sqp) {
+ if (sqp->sq_isip == B_FALSE || connp->conn_sqp == sqp) {
SQUEUE_DBG_SET(sqp, mp, proc, connp,
tag);
- connp->conn_on_sqp = B_TRUE;
+ if (sqp->sq_isip == B_TRUE)
+ connp->conn_on_sqp = B_TRUE;
DTRACE_PROBE3(squeue__proc__start, squeue_t *,
sqp, mblk_t *, mp, conn_t *, connp);
(*proc)(connp, mp, sqp, ira);
DTRACE_PROBE2(squeue__proc__end, squeue_t *,
sqp, conn_t *, connp);
- connp->conn_on_sqp = B_FALSE;
+ if (sqp->sq_isip == B_TRUE) {
+ connp->conn_on_sqp = B_FALSE;
+ CONN_DEC_REF(connp);
+ }
SQUEUE_DBG_CLEAR(sqp);
- CONN_DEC_REF(connp);
} else {
SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
@@ -497,23 +434,28 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
sqp->sq_run = NULL;
if (sqp->sq_first == NULL ||
process_flag == SQ_NODRAIN) {
- if (sqp->sq_first != NULL) {
- squeue_worker_wakeup(sqp);
- return;
+ /*
+ * Even if SQ_NODRAIN was specified, it may
+ * still be best to process a single queued
+ * item if it matches the active connection.
+ */
+ if (sqp->sq_first != NULL && sqp->sq_isip) {
+ squeue_try_drain_one(sqp, connp);
}
+
/*
- * We processed inline our packet and nothing
- * new has arrived. We are done. In case any
- * control actions are pending, wake up the
- * worker.
+ * If work or control actions are pending, wake
+ * up the worker thread.
*/
- if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
- cv_signal(&sqp->sq_worker_cv);
+ if (sqp->sq_first != NULL ||
+ sqp->sq_state & SQS_WORKER_THR_CONTROL) {
+ squeue_worker_wakeup(sqp);
+ }
mutex_exit(&sqp->sq_lock);
return;
}
} else {
- if (ira != NULL) {
+ if (sqp->sq_isip == B_TRUE && ira != NULL) {
mblk_t *attrmp;
ASSERT(cnt == 1);
@@ -565,10 +507,9 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
* up the worker.
*/
sqp->sq_run = NULL;
- if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
- cv_signal(&sqp->sq_worker_cv);
- mutex_exit(&sqp->sq_lock);
- return;
+ if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
+ squeue_worker_wakeup(sqp);
+ }
} else {
/*
* We let a thread processing a squeue reenter only
@@ -587,7 +528,8 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
if (!(sqp->sq_state & SQS_REENTER) &&
(process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
(sqp->sq_run == curthread) && (cnt == 1) &&
- (connp->conn_on_sqp == B_FALSE)) {
+ (sqp->sq_isip == B_FALSE ||
+ connp->conn_on_sqp == B_FALSE)) {
sqp->sq_state |= SQS_REENTER;
mutex_exit(&sqp->sq_lock);
@@ -602,15 +544,21 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
* Handle squeue switching. More details in the
* block comment at the top of the file
*/
- if (connp->conn_sqp == sqp) {
- connp->conn_on_sqp = B_TRUE;
+ if (sqp->sq_isip == B_FALSE || connp->conn_sqp == sqp) {
+ SQUEUE_DBG_SET(sqp, mp, proc, connp,
+ tag);
+ if (sqp->sq_isip == B_TRUE)
+ connp->conn_on_sqp = B_TRUE;
DTRACE_PROBE3(squeue__proc__start, squeue_t *,
sqp, mblk_t *, mp, conn_t *, connp);
(*proc)(connp, mp, sqp, ira);
DTRACE_PROBE2(squeue__proc__end, squeue_t *,
sqp, conn_t *, connp);
- connp->conn_on_sqp = B_FALSE;
- CONN_DEC_REF(connp);
+ if (sqp->sq_isip == B_TRUE) {
+ connp->conn_on_sqp = B_FALSE;
+ CONN_DEC_REF(connp);
+ }
+ SQUEUE_DBG_CLEAR(sqp);
} else {
SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
@@ -631,7 +579,7 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
#ifdef DEBUG
mp->b_tag = tag;
#endif
- if (ira != NULL) {
+ if (sqp->sq_isip && ira != NULL) {
mblk_t *attrmp;
ASSERT(cnt == 1);
@@ -657,54 +605,33 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
tail = mp = attrmp;
}
ENQUEUE_CHAIN(sqp, mp, tail, cnt);
- if (!(sqp->sq_state & SQS_PROC)) {
- squeue_worker_wakeup(sqp);
- return;
- }
/*
- * In case any control actions are pending, wake
- * up the worker.
+ * If the worker isn't running or control actions are pending,
+ * wake it it up now.
*/
- if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
- cv_signal(&sqp->sq_worker_cv);
- mutex_exit(&sqp->sq_lock);
- return;
+ if ((sqp->sq_state & SQS_PROC) == 0 ||
+ (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) {
+ squeue_worker_wakeup(sqp);
+ }
}
+ mutex_exit(&sqp->sq_lock);
}
/*
* PRIVATE FUNCTIONS
*/
+
+/*
+ * Wake up worker thread for squeue to process queued work.
+ */
static void
-squeue_fire(void *arg)
+squeue_worker_wakeup(squeue_t *sqp)
{
- squeue_t *sqp = arg;
- uint_t state;
-
- mutex_enter(&sqp->sq_lock);
-
- state = sqp->sq_state;
- if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
- mutex_exit(&sqp->sq_lock);
- return;
- }
-
- sqp->sq_tid = 0;
- /*
- * The timeout fired before we got a chance to set it.
- * Process it anyway but remove the SQS_TMO_PROG so that
- * the guy trying to set the timeout knows that it has
- * already been processed.
- */
- if (state & SQS_TMO_PROG)
- sqp->sq_state &= ~SQS_TMO_PROG;
+ ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
- if (!(state & SQS_PROC)) {
- sqp->sq_awaken = ddi_get_lbolt();
- cv_signal(&sqp->sq_worker_cv);
- }
- mutex_exit(&sqp->sq_lock);
+ cv_signal(&sqp->sq_worker_cv);
+ sqp->sq_awoken = gethrtime();
}
static void
@@ -714,10 +641,8 @@ squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
mblk_t *head;
sqproc_t proc;
conn_t *connp;
- timeout_id_t tid;
ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring;
hrtime_t now;
- boolean_t did_wakeup = B_FALSE;
boolean_t sq_poll_capable;
ip_recv_attr_t *ira, iras;
@@ -729,8 +654,7 @@ squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
(uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
ASSERT(mutex_owned(&sqp->sq_lock));
- sqp->sq_awaken = ddi_get_lbolt();
- cv_signal(&sqp->sq_worker_cv);
+ squeue_worker_wakeup(sqp);
squeue_drain_stack_toodeep++;
return;
}
@@ -746,9 +670,6 @@ again:
sqp->sq_last = NULL;
sqp->sq_count = 0;
- if ((tid = sqp->sq_tid) != 0)
- sqp->sq_tid = 0;
-
sqp->sq_state |= SQS_PROC | proc_type;
/*
@@ -765,9 +686,6 @@ again:
SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
mutex_exit(&sqp->sq_lock);
- if (tid != 0)
- (void) untimeout(tid);
-
while ((mp = head) != NULL) {
head = mp->b_next;
@@ -779,7 +697,7 @@ again:
mp->b_prev = NULL;
/* Is there an ip_recv_attr_t to handle? */
- if (ip_recv_attr_is_mblk(mp)) {
+ if (sqp->sq_isip == B_TRUE && ip_recv_attr_is_mblk(mp)) {
mblk_t *attrmp = mp;
ASSERT(attrmp->b_cont != NULL);
@@ -804,20 +722,25 @@ again:
/*
- * Handle squeue switching. More details in the
- * block comment at the top of the file
+ * Handle squeue switching. More details in the block comment at
+ * the top of the file. non-IP squeues cannot switch, as there
+ * is no conn_t.
*/
- if (connp->conn_sqp == sqp) {
+ if (sqp->sq_isip == B_FALSE || connp->conn_sqp == sqp) {
SQUEUE_DBG_SET(sqp, mp, proc, connp,
mp->b_tag);
- connp->conn_on_sqp = B_TRUE;
+ if (sqp->sq_isip == B_TRUE)
+ connp->conn_on_sqp = B_TRUE;
DTRACE_PROBE3(squeue__proc__start, squeue_t *,
sqp, mblk_t *, mp, conn_t *, connp);
(*proc)(connp, mp, sqp, ira);
DTRACE_PROBE2(squeue__proc__end, squeue_t *,
sqp, conn_t *, connp);
- connp->conn_on_sqp = B_FALSE;
- CONN_DEC_REF(connp);
+ if (sqp->sq_isip == B_TRUE) {
+ connp->conn_on_sqp = B_FALSE;
+ CONN_DEC_REF(connp);
+ }
+ SQUEUE_DBG_CLEAR(sqp);
} else {
SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
SQ_FILL, SQTAG_SQUEUE_CHANGE);
@@ -864,11 +787,9 @@ again:
if (proc_type == SQS_WORKER)
SQS_POLL_RING(sqp);
goto again;
- } else {
- did_wakeup = B_TRUE;
- sqp->sq_awaken = ddi_get_lbolt();
- cv_signal(&sqp->sq_worker_cv);
}
+
+ squeue_worker_wakeup(sqp);
}
/*
@@ -927,17 +848,14 @@ again:
SQS_POLL_QUIESCE_DONE)));
SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
sqp->sq_state &= ~(SQS_PROC | proc_type);
- if (!did_wakeup && sqp->sq_first != NULL) {
- squeue_worker_wakeup(sqp);
- mutex_enter(&sqp->sq_lock);
- }
/*
* If we are not the worker and there is a pending quiesce
* event, wake up the worker
*/
if ((proc_type != SQS_WORKER) &&
- (sqp->sq_state & SQS_WORKER_THR_CONTROL))
- cv_signal(&sqp->sq_worker_cv);
+ (sqp->sq_state & SQS_WORKER_THR_CONTROL)) {
+ squeue_worker_wakeup(sqp);
+ }
}
}
@@ -1051,6 +969,11 @@ squeue_polling_thread(squeue_t *sqp)
cv_wait(async, lock);
CALLB_CPR_SAFE_END(&cprinfo, lock);
+ if (sqp->sq_state & SQS_EXIT) {
+ mutex_exit(lock);
+ thread_exit();
+ }
+
ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
SQS_POLL_THR_QUIESCED);
if (ctl_state != 0) {
@@ -1076,6 +999,9 @@ squeue_polling_thread(squeue_t *sqp)
(SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
(SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
+ /* Only IP related squeues should reach this point */
+ VERIFY(sqp->sq_isip == B_TRUE);
+
poll_again:
sq_rx_ring = sqp->sq_rx_ring;
sq_get_pkts = sq_rx_ring->rr_rx;
@@ -1137,7 +1063,6 @@ poll_again:
*/
}
- sqp->sq_awaken = ddi_get_lbolt();
/*
* Put the SQS_PROC_HELD on so the worker
* thread can distinguish where its called from. We
@@ -1153,7 +1078,7 @@ poll_again:
*/
sqp->sq_state |= SQS_PROC_HELD;
sqp->sq_state &= ~SQS_GET_PKTS;
- cv_signal(&sqp->sq_worker_cv);
+ squeue_worker_wakeup(sqp);
} else if (sqp->sq_first == NULL &&
!(sqp->sq_state & SQS_WORKER)) {
/*
@@ -1173,8 +1098,9 @@ poll_again:
* wake up the worker, since it is currently
* not running.
*/
- if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
- cv_signal(&sqp->sq_worker_cv);
+ if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
+ squeue_worker_wakeup(sqp);
+ }
} else {
/*
* Worker thread is already running. We don't need
@@ -1205,6 +1131,7 @@ squeue_worker_thr_control(squeue_t *sqp)
ill_rx_ring_t *rx_ring;
ASSERT(MUTEX_HELD(&sqp->sq_lock));
+ VERIFY(sqp->sq_isip == B_TRUE);
if (sqp->sq_state & SQS_POLL_RESTART) {
/* Restart implies a previous quiesce. */
@@ -1316,6 +1243,11 @@ squeue_worker(squeue_t *sqp)
for (;;) {
for (;;) {
+ if (sqp->sq_state & SQS_EXIT) {
+ mutex_exit(lock);
+ thread_exit();
+ }
+
/*
* If the poll thread has handed control to us
* we need to break out of the wait.
@@ -1412,6 +1344,7 @@ squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
again:
sqp = connp->conn_sqp;
+ VERIFY(sqp->sq_isip == B_TRUE);
mutex_enter(&sqp->sq_lock);
if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
@@ -1483,36 +1416,109 @@ again:
}
}
-void
-squeue_synch_exit(conn_t *connp)
+/*
+ * If possible, attempt to immediately process a single queued request, should
+ * it match the supplied conn_t reference. This is primarily intended to elide
+ * squeue worker thread wake-ups during local TCP connect() or close()
+ * operations where the response is placed on the squeue during processing.
+ */
+static void
+squeue_try_drain_one(squeue_t *sqp, conn_t *compare_conn)
{
- squeue_t *sqp = connp->conn_sqp;
+ mblk_t *next, *mp = sqp->sq_first;
+ conn_t *connp;
+ sqproc_t proc = (sqproc_t)mp->b_queue;
+ ip_recv_attr_t iras, *ira = NULL;
- mutex_enter(&sqp->sq_lock);
- if (sqp->sq_run == curthread) {
- ASSERT(sqp->sq_state & SQS_PROC);
+ ASSERT(MUTEX_HELD(&sqp->sq_lock));
+ ASSERT((sqp->sq_state & SQS_PROC) == 0);
+ ASSERT(sqp->sq_run == NULL);
+ ASSERT(sqp->sq_isip);
+ VERIFY(mp != NULL);
- sqp->sq_state &= ~SQS_PROC;
- sqp->sq_run = NULL;
- connp->conn_on_sqp = B_FALSE;
+ /*
+ * There is no guarantee that compare_conn references a valid object at
+ * this time, so under no circumstance may it be deferenced unless it
+ * matches the squeue entry.
+ */
+ connp = (conn_t *)mp->b_prev;
+ if (connp != compare_conn) {
+ return;
+ }
- if (sqp->sq_first == NULL) {
- mutex_exit(&sqp->sq_lock);
- } else {
- /*
- * If this was a normal thread, then it would
- * (most likely) continue processing the pending
- * requests. Since the just completed operation
- * was executed synchronously, the thread should
- * not be delayed. To compensate, wake up the
- * worker thread right away when there are outstanding
- * requests.
- */
- sqp->sq_awaken = ddi_get_lbolt();
- cv_signal(&sqp->sq_worker_cv);
- mutex_exit(&sqp->sq_lock);
- }
+ next = mp->b_next;
+ proc = (sqproc_t)mp->b_queue;
+
+ ASSERT(proc != NULL);
+ ASSERT(sqp->sq_count > 0);
+
+ /* Dequeue item from squeue */
+ if (next == NULL) {
+ sqp->sq_first = NULL;
+ sqp->sq_last = NULL;
} else {
+ sqp->sq_first = next;
+ }
+ sqp->sq_count--;
+
+ sqp->sq_state |= SQS_PROC;
+ sqp->sq_run = curthread;
+ mutex_exit(&sqp->sq_lock);
+
+ /* Prep mblk_t and retrieve ira if needed */
+ mp->b_prev = NULL;
+ mp->b_queue = NULL;
+ mp->b_next = NULL;
+ if (ip_recv_attr_is_mblk(mp)) {
+ mblk_t *attrmp = mp;
+
+ ASSERT(attrmp->b_cont != NULL);
+
+ mp = attrmp->b_cont;
+ attrmp->b_cont = NULL;
+
+ ASSERT(mp->b_queue == NULL);
+ ASSERT(mp->b_prev == NULL);
+
+ if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
+ /* ill_t or ip_stack_t disappeared */
+ ip_drop_input("ip_recv_attr_from_mblk", mp, NULL);
+ ira_cleanup(&iras, B_TRUE);
+ CONN_DEC_REF(connp);
+ goto done;
+ }
+ ira = &iras;
+ }
+
+ SQUEUE_DBG_SET(sqp, mp, proc, connp, mp->b_tag);
+ connp->conn_on_sqp = B_TRUE;
+ DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp,
+ conn_t *, connp);
+ (*proc)(connp, mp, sqp, ira);
+ DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp);
+ connp->conn_on_sqp = B_FALSE;
+ CONN_DEC_REF(connp);
+ SQUEUE_DBG_CLEAR(sqp);
+
+ if (ira != NULL)
+ ira_cleanup(ira, B_TRUE);
+
+done:
+ mutex_enter(&sqp->sq_lock);
+ sqp->sq_state &= ~(SQS_PROC);
+ sqp->sq_run = NULL;
+}
+
+void
+squeue_synch_exit(conn_t *connp, int flag)
+{
+ squeue_t *sqp = connp->conn_sqp;
+
+ VERIFY(sqp->sq_isip == B_TRUE);
+ ASSERT(flag == SQ_NODRAIN || flag == SQ_PROCESS);
+
+ mutex_enter(&sqp->sq_lock);
+ if (sqp->sq_run != curthread) {
/*
* The caller doesn't own the squeue, clear the SQS_PAUSE flag,
* and wake up the squeue owner, such that owner can continue
@@ -1524,5 +1530,23 @@ squeue_synch_exit(conn_t *connp)
/* There should be only one thread blocking on sq_synch_cv. */
cv_signal(&sqp->sq_synch_cv);
mutex_exit(&sqp->sq_lock);
+ return;
}
+
+ ASSERT(sqp->sq_state & SQS_PROC);
+
+ sqp->sq_state &= ~SQS_PROC;
+ sqp->sq_run = NULL;
+ connp->conn_on_sqp = B_FALSE;
+
+ /* If the caller opted in, attempt to process the head squeue item. */
+ if (flag == SQ_PROCESS && sqp->sq_first != NULL) {
+ squeue_try_drain_one(sqp, connp);
+ }
+
+ /* Wake up the worker if further requests are pending. */
+ if (sqp->sq_first != NULL) {
+ squeue_worker_wakeup(sqp);
+ }
+ mutex_exit(&sqp->sq_lock);
}