path: root/usr/src/uts/common/inet/squeue.c
Diffstat (limited to 'usr/src/uts/common/inet/squeue.c')
-rw-r--r--  usr/src/uts/common/inet/squeue.c  1970
1 file changed, 861 insertions, 1109 deletions
diff --git a/usr/src/uts/common/inet/squeue.c b/usr/src/uts/common/inet/squeue.c
index 4895e2249e..559abd9178 100644
--- a/usr/src/uts/common/inet/squeue.c
+++ b/usr/src/uts/common/inet/squeue.c
@@ -19,144 +19,95 @@
* CDDL HEADER END
*/
/*
- * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
-#pragma ident "%Z%%M% %I% %E% SMI"
-
/*
- * Squeues - TCP/IP serialization mechanism.
- *
- * This is a general purpose high-performance serialization mechanism. It is
- * similar to a taskq with a single worker thread, the difference is that it
- * does not imply a context switch - the thread placing a request may actually
- * process it. It is also biased for processing requests in interrupt context.
- *
- * Each squeue has a worker thread which may optionally be bound to a CPU.
- *
- * Only one thread may process requests from a given squeue at any time. This is
- * called "entering" squeue.
- *
- * Each dispatched request is processed either by
- *
- * a) Dispatching thread or
- * b) Some other thread that is currently processing squeue at the time of
- * request or
- * c) worker thread.
- *
- * INTERFACES:
- *
- * squeue_t *squeue_create(name, bind, wait, pri)
- *
- * name: symbolic name for squeue.
- * wait: time to wait before waiking the worker thread after queueing
- * request.
- * bind: preferred CPU binding for the worker thread.
- * pri: thread priority for the worker thread.
- *
- * This function never fails and may sleep. It returns a transparent pointer
- * to the squeue_t structure that is passed to all other squeue operations.
- *
- * void squeue_bind(sqp, bind)
- *
- * Bind squeue worker thread to a CPU specified by the 'bind' argument. The
- * 'bind' value of -1 binds to the preferred thread specified for
- * squeue_create.
- *
- * NOTE: Any value of 'bind' other then -1 is not supported currently, but the
- * API is present - in the future it may be useful to specify different
- * binding.
- *
- * void squeue_unbind(sqp)
- *
- * Unbind the worker thread from its preferred CPU.
- *
- * void squeue_enter(*sqp, *mp, proc, arg, tag)
- *
- * Post a single request for processing. Each request consists of mblock 'mp',
- * function 'proc' to execute and an argument 'arg' to pass to this
- * function. The function is called as (*proc)(arg, mp, sqp); The tag is an
- * arbitrary number from 0 to 255 which will be stored in mp to track exact
- * caller of squeue_enter. The combination of function name and the tag should
- * provide enough information to identify the caller.
- *
- * If no one is processing the squeue, squeue_enter() will call the function
- * immediately. Otherwise it will add the request to the queue for later
- * processing. Once the function is executed, the thread may continue
- * executing all other requests pending on the queue.
+ * Squeues: General purpose serialization mechanism
+ * ------------------------------------------------
*
- * NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
- * NOTE: The argument can be conn_t only. Ideally we'd like to have generic
- * argument, but we want to drop connection reference count here - this
- * improves tail-call optimizations.
- * XXX: The arg should have type conn_t.
+ * Background:
+ * -----------
*
- * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
+ * This is a general purpose high-performance serialization mechanism
+ * currently used by TCP/IP. It is implemented by means of a per CPU queue,
+ * a worker thread and a polling thread which are bound to the CPU
+ * associated with the squeue. The squeue is strictly FIFO for both read
+ * and write side and only one thread can process it at any given time.
+ * The design goal of squeue was to offer a very high degree of
+ * parallelization (on a per H/W execution pipeline basis) with at
+ * most one queuing.
*
- * Same as squeue_enter(), but the entering thread will only try to execute a
- * single request. It will not continue executing any pending requests.
+ * The modules needing protection typically call the squeue_enter() or
+ * squeue_enter_chain() routine as soon as a thread enters the module
+ * from either direction. For each packet, the processing function
+ * and argument are stored in the mblk itself. When the packet is ready
+ * to be processed, the squeue retrieves the stored function and calls
+ * it with the supplied argument and the pointer to the packet itself.
+ * The called function can assume that no other thread is processing
+ * the squeue when it is executing.
*
- * void squeue_fill(*sqp, *mp, proc, arg, tag)
+ * Squeue/connection binding:
+ * --------------------------
*
- * Just place the request on the queue without trying to execute it. Arrange
- * for the worker thread to process the request.
+ * TCP/IP uses an IP classifier in conjunction with squeues, where specific
+ * connections are assigned to specific squeues (based on various policies)
+ * at connection creation time. Once assigned, the connection to
+ * squeue mapping is never changed and all future packets for that
+ * connection are processed on that squeue. The connection ("conn") to
+ * squeue mapping is stored in "conn_t" member "conn_sqp".
*
- * void squeue_profile_enable(sqp)
- * void squeue_profile_disable(sqp)
+ * Since the processing of the connection cuts across multiple layers
+ * but still allows packets for different connections to be processed on
+ * other CPUs/squeues, squeues are also termed "Vertical Perimeter" or
+ * "Per Connection Vertical Perimeter".
*
- * Enable or disable profiling for specified 'sqp'. Profiling is only
- * available when SQUEUE_PROFILE is set.
+ * Processing Model:
+ * -----------------
*
- * void squeue_profile_reset(sqp)
+ * Squeue doesn't necessarily process packets with its own worker thread.
+ * The callers can pick whether they just want to queue the packet, process
+ * their packet if nothing is queued, or drain and process. The first two
+ * modes are typically employed when the packet was generated while
+ * already doing the processing behind the squeue and the last mode (drain
+ * and process) is typically employed when the thread is entering the squeue
+ * for the first time. The squeue still imposes a finite time limit
+ * for which an external thread can do processing, after which it switches
+ * processing to its own worker thread.
*
- * Reset all profiling information to zero. Profiling is only
- * available when SQUEUE_PROFILE is set.
+ * Once created, squeues are never deleted. Hence squeue pointers are
+ * always valid. This means that functions outside the squeue can still
+ * refer safely to conn_sqp and there is no need for ref counts.
*
- * void squeue_profile_start()
- * void squeue_profile_stop()
+ * Only a thread executing in the squeue can change the squeue of the
+ * connection. It does so by calling a squeue framework function.
+ * After changing the squeue, the thread must leave the squeue. It must not
+ * continue to execute any code that needs squeue protection.
*
- * Globally enable or disabled profiling for all squeues.
+ * The squeue framework, after entering the squeue, checks if the current
+ * squeue matches the conn_sqp. If the check fails, the packet is delivered
+ * to the right squeue.
*
- * uintptr_t *squeue_getprivate(sqp, p)
+ * Polling Model:
+ * --------------
*
- * Each squeue keeps small amount of private data space available for various
- * consumers. Current consumers include TCP and NCA. Other consumers need to
- * add their private tag to the sqprivate_t enum. The private information is
- * limited to an uintptr_t value. The squeue has no knowledge of its content
- * and does not manage it in any way.
+ * Squeues can control the rate of packet arrival into themselves from the
+ * NIC or a specific Rx ring within a NIC. As part of the capability
+ * negotiation between IP and the MAC layer, an squeue is created for each
+ * TCP soft ring (or TCP Rx ring - to be implemented in the future). As part
+ * of this negotiation, squeues get a cookie for the underlying soft ring or
+ * Rx ring, a function to turn off incoming packets and a function to call
+ * to poll for packets. This helps schedule the receive side packet
+ * processing so that queue backlog doesn't build up and packet processing
+ * doesn't keep getting disturbed by high priority interrupts. As part
+ * of this mode, as soon as a backlog starts building, the squeue turns off
+ * the interrupts and switches to poll mode. In poll mode, when the poll
+ * thread goes down to retrieve packets, it retrieves them in the form of
+ * a chain which improves performance even more. As the squeue/softring
+ * system gets more packets, it gets more efficient by switching to
+ * polling more often and dealing with larger packet chains.
*
- * The typical use may be a breakdown of data structures per CPU (since
- * squeues are usually per CPU). See NCA for examples of use.
- * Currently 'p' may have one legal value SQPRIVATE_TCP.
- *
- * processorid_t squeue_binding(sqp)
- *
- * Returns the CPU binding for a given squeue.
- *
- * TUNABALES:
- *
- * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
- * squeue. Note that this is approximation - squeues have no control on the
- * time it takes to process each request. This limit is only checked
- * between processing individual messages.
- * Default: 20 ms.
- *
- * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any
- * squeue. Note that this is approximation - squeues have no control on the
- * time it takes to process each request. This limit is only checked
- * between processing individual messages.
- * Default: 10 ms.
- *
- * squeue_workerdrain_ms: Maximum time in ms worker thread spends draining any
- * squeue. Note that this is approximation - squeues have no control on the
- * time it takes to process each request. This limit is only checked
- * between processing individual messages.
- * Default: 10 ms.
- *
- * squeue_workerwait_ms: When worker thread is interrupted because workerdrain
- * expired, how much time to wait before waking worker thread again.
- * Default: 10 ms.
*/
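
A minimal sketch of the dispatch convention described above (the example_dispatch() helper and the example_input() sqproc_t are hypothetical; the b_queue/b_prev stashing, the CONN_DEC_REF after processing, and the squeue_enter() signature are taken from the code in this file):

	static void
	example_dispatch(conn_t *connp, mblk_t *mp)
	{
		/* The reference is consumed by CONN_DEC_REF once the proc has run. */
		CONN_INC_REF(connp);
		mp->b_queue = (queue_t *)example_input;	/* sqproc_t to execute */
		mp->b_prev = (mblk_t *)connp;		/* argument handed to it */
		squeue_enter(connp->conn_sqp, mp, mp, 1, SQ_PROCESS, 0);
	}

Elsewhere in this hunk the SQUEUE_ENTER_ONE() wrapper is used for the same dispatch. With SQ_FILL the call just queues the mblk and signals the worker if nothing is processing the squeue; with SQ_NODRAIN the entering thread processes only if nothing is already queued.
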
#include <sys/types.h>
@@ -169,208 +120,30 @@
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
+#include <sys/sunddi.h>
#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>
-/*
- * State flags.
- * Note: The MDB IP module depends on the values of these flags.
- */
-#define SQS_PROC 0x0001 /* being processed */
-#define SQS_WORKER 0x0002 /* worker thread */
-#define SQS_ENTER 0x0004 /* enter thread */
-#define SQS_FAST 0x0008 /* enter-fast thread */
-#define SQS_USER 0x0010 /* A non interrupt user */
-#define SQS_BOUND 0x0020 /* Worker thread is bound */
-#define SQS_PROFILE 0x0040 /* Enable profiling */
-#define SQS_REENTER 0x0080 /* Re entered thread */
-#define SQS_TMO_PROG 0x0100 /* Timeout is being set */
-
#include <sys/squeue_impl.h>
static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
-
-#if SQUEUE_PROFILE
-static kmutex_t squeue_kstat_lock;
-static int squeue_kstat_update(kstat_t *, int);
-#endif
+static void squeue_polling_thread(squeue_t *sqp);
kmem_cache_t *squeue_cache;
#define SQUEUE_MSEC_TO_NSEC 1000000
-int squeue_intrdrain_ms = 20;
-int squeue_writerdrain_ms = 10;
-int squeue_workerdrain_ms = 10;
-int squeue_workerwait_ms = 10;
+int squeue_drain_ms = 20;
+int squeue_workerwait_ms = 0;
/* The values above converted to ticks or nano seconds */
-static int squeue_intrdrain_ns = 0;
-static int squeue_writerdrain_ns = 0;
-static int squeue_workerdrain_ns = 0;
+static int squeue_drain_ns = 0;
static int squeue_workerwait_tick = 0;
-/*
- * The minimum packet queued when worker thread doing the drain triggers
- * polling (if squeue allows it). The choice of 3 is arbitrary. You
- * definitely don't want it to be 1 since that will trigger polling
- * on very low loads as well (ssh seems to do be one such example
- * where packet flow was very low yet somehow 1 packet ended up getting
- * queued and worker thread fires every 10ms and blanking also gets
- * triggered.
- */
-int squeue_worker_poll_min = 3;
-
-#if SQUEUE_PROFILE
-/*
- * Set to B_TRUE to enable profiling.
- */
-static int squeue_profile = B_FALSE;
-#define SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE))
-
-#define SQSTAT(sqp, x) ((sqp)->sq_stats.x++)
-#define SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d))
-
-struct squeue_kstat {
- kstat_named_t sq_count;
- kstat_named_t sq_max_qlen;
- kstat_named_t sq_npackets_worker;
- kstat_named_t sq_npackets_intr;
- kstat_named_t sq_npackets_other;
- kstat_named_t sq_nqueued_intr;
- kstat_named_t sq_nqueued_other;
- kstat_named_t sq_ndrains_worker;
- kstat_named_t sq_ndrains_intr;
- kstat_named_t sq_ndrains_other;
- kstat_named_t sq_time_worker;
- kstat_named_t sq_time_intr;
- kstat_named_t sq_time_other;
-} squeue_kstat = {
- { "count", KSTAT_DATA_UINT64 },
- { "max_qlen", KSTAT_DATA_UINT64 },
- { "packets_worker", KSTAT_DATA_UINT64 },
- { "packets_intr", KSTAT_DATA_UINT64 },
- { "packets_other", KSTAT_DATA_UINT64 },
- { "queued_intr", KSTAT_DATA_UINT64 },
- { "queued_other", KSTAT_DATA_UINT64 },
- { "ndrains_worker", KSTAT_DATA_UINT64 },
- { "ndrains_intr", KSTAT_DATA_UINT64 },
- { "ndrains_other", KSTAT_DATA_UINT64 },
- { "time_worker", KSTAT_DATA_UINT64 },
- { "time_intr", KSTAT_DATA_UINT64 },
- { "time_other", KSTAT_DATA_UINT64 },
-};
-#endif
-
-#define SQUEUE_WORKER_WAKEUP(sqp) { \
- timeout_id_t tid = (sqp)->sq_tid; \
- \
- ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
- /* \
- * Queue isn't being processed, so take \
- * any post enqueue actions needed before leaving. \
- */ \
- if (tid != 0) { \
- /* \
- * Waiting for an enter() to process mblk(s). \
- */ \
- clock_t waited = lbolt - (sqp)->sq_awaken; \
- \
- if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) { \
- /* \
- * Times up and have a worker thread \
- * waiting for work, so schedule it. \
- */ \
- (sqp)->sq_tid = 0; \
- (sqp)->sq_awaken = lbolt; \
- cv_signal(&(sqp)->sq_async); \
- mutex_exit(&(sqp)->sq_lock); \
- (void) untimeout(tid); \
- return; \
- } \
- mutex_exit(&(sqp)->sq_lock); \
- return; \
- } else if ((sqp)->sq_state & SQS_TMO_PROG) { \
- mutex_exit(&(sqp)->sq_lock); \
- return; \
- } else if ((sqp)->sq_wait != 0) { \
- clock_t wait = (sqp)->sq_wait; \
- /* \
- * Wait up to sqp->sq_wait ms for an \
- * enter() to process this queue. We \
- * don't want to contend on timeout locks \
- * with sq_lock held for performance reasons, \
- * so drop the sq_lock before calling timeout \
- * but we need to check if timeout is required \
- * after re acquiring the sq_lock. Once \
- * the sq_lock is dropped, someone else could \
- * have processed the packet or the timeout could \
- * have already fired. \
- */ \
- (sqp)->sq_state |= SQS_TMO_PROG; \
- mutex_exit(&(sqp)->sq_lock); \
- tid = timeout(squeue_fire, (sqp), wait); \
- mutex_enter(&(sqp)->sq_lock); \
- /* Check again if we still need the timeout */ \
- if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) == \
- SQS_TMO_PROG) && ((sqp)->sq_tid == 0) && \
- ((sqp)->sq_first != NULL)) { \
- (sqp)->sq_state &= ~SQS_TMO_PROG; \
- (sqp)->sq_awaken = lbolt; \
- (sqp)->sq_tid = tid; \
- mutex_exit(&(sqp)->sq_lock); \
- return; \
- } else { \
- if ((sqp)->sq_state & SQS_TMO_PROG) { \
- (sqp)->sq_state &= ~SQS_TMO_PROG; \
- mutex_exit(&(sqp)->sq_lock); \
- (void) untimeout(tid); \
- } else { \
- /* \
- * The timer fired before we could \
- * reacquire the sq_lock. squeue_fire \
- * removes the SQS_TMO_PROG flag \
- * and we don't need to do anything \
- * else. \
- */ \
- mutex_exit(&(sqp)->sq_lock); \
- } \
- } \
- } else { \
- /* \
- * Schedule the worker thread. \
- */ \
- (sqp)->sq_awaken = lbolt; \
- cv_signal(&(sqp)->sq_async); \
- mutex_exit(&(sqp)->sq_lock); \
- } \
- ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock)); \
-}
-
-#define ENQUEUE_MP(sqp, mp, proc, arg) { \
- /* \
- * Enque our mblk. \
- */ \
- (mp)->b_queue = NULL; \
- ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
- ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL); \
- (mp)->b_queue = (queue_t *)(proc); \
- (mp)->b_prev = (mblk_t *)(arg); \
- \
- if ((sqp)->sq_last != NULL) \
- (sqp)->sq_last->b_next = (mp); \
- else \
- (sqp)->sq_first = (mp); \
- (sqp)->sq_last = (mp); \
- (sqp)->sq_count++; \
- ASSERT((sqp)->sq_count > 0); \
- DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp, \
- mblk_t *, mp); \
-}
-
+#define MAX_BYTES_TO_PICKUP 150000
#define ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \
/* \
@@ -390,89 +163,120 @@ struct squeue_kstat {
\
}
-#define SQS_POLLING_ON(sqp, rx_ring) { \
- ASSERT(rx_ring != NULL); \
+#define SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) { \
ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
- rx_ring->rr_blank(rx_ring->rr_handle, \
- MIN((sqp->sq_avg_drain_time * sqp->sq_count), \
- rx_ring->rr_max_blank_time), \
- rx_ring->rr_max_pkt_cnt); \
- rx_ring->rr_poll_state |= ILL_POLLING; \
- rx_ring->rr_poll_time = lbolt; \
+ if (sq_poll_capable) { \
+ ASSERT(rx_ring != NULL); \
+ ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
+ if (!(sqp->sq_state & SQS_POLLING)) { \
+ sqp->sq_state |= SQS_POLLING; \
+ rx_ring->rr_intr_disable(rx_ring->rr_intr_handle); \
+ } \
+ } \
}
+#define SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) { \
+ ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
+ if (sq_poll_capable) { \
+ ASSERT(rx_ring != NULL); \
+ ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
+ if (sqp->sq_state & SQS_POLLING) { \
+ sqp->sq_state &= ~SQS_POLLING; \
+ rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
+ } \
+ } \
+}
-#define SQS_POLLING_OFF(sqp, rx_ring) { \
- ASSERT(rx_ring != NULL); \
+#define SQS_POLL_RING(sqp, sq_poll_capable) { \
ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
- rx_ring->rr_blank(rx_ring->rr_handle, \
- rx_ring->rr_min_blank_time, \
- rx_ring->rr_min_pkt_cnt); \
+ if (sq_poll_capable) { \
+ ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
+ if (!(sqp->sq_state & SQS_GET_PKTS)) { \
+ sqp->sq_state |= SQS_GET_PKTS; \
+ cv_signal(&sqp->sq_poll_cv); \
+ } \
+ } \
}
+#ifdef DEBUG
+#define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) { \
+ (sqp)->sq_curmp = (mp); \
+ (sqp)->sq_curproc = (proc); \
+ (sqp)->sq_connp = (connp); \
+ (mp)->b_tag = (sqp)->sq_tag = (tag); \
+}
+
+#define SQUEUE_DBG_CLEAR(sqp) { \
+ (sqp)->sq_curmp = NULL; \
+ (sqp)->sq_curproc = NULL; \
+ (sqp)->sq_connp = NULL; \
+}
+#else
+#define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
+#define SQUEUE_DBG_CLEAR(sqp)
+#endif
+
void
squeue_init(void)
{
squeue_cache = kmem_cache_create("squeue_cache",
sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
- squeue_intrdrain_ns = squeue_intrdrain_ms * SQUEUE_MSEC_TO_NSEC;
- squeue_writerdrain_ns = squeue_writerdrain_ms * SQUEUE_MSEC_TO_NSEC;
- squeue_workerdrain_ns = squeue_workerdrain_ms * SQUEUE_MSEC_TO_NSEC;
+ squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}
/* ARGSUSED */
squeue_t *
-squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri)
+squeue_create(clock_t wait, pri_t pri)
{
squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
bzero(sqp, sizeof (squeue_t));
- (void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1);
- sqp->sq_name[SQ_NAMELEN] = '\0';
-
- sqp->sq_bind = bind;
+ sqp->sq_bind = PBIND_NONE;
+ sqp->sq_priority = pri;
sqp->sq_wait = MSEC_TO_TICK(wait);
- sqp->sq_avg_drain_time =
- drv_hztousec(NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns)) /
- NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns);
-
-#if SQUEUE_PROFILE
- if ((sqp->sq_kstat = kstat_create("ip", bind, name,
- "net", KSTAT_TYPE_NAMED,
- sizeof (squeue_kstat) / sizeof (kstat_named_t),
- KSTAT_FLAG_VIRTUAL)) != NULL) {
- sqp->sq_kstat->ks_lock = &squeue_kstat_lock;
- sqp->sq_kstat->ks_data = &squeue_kstat;
- sqp->sq_kstat->ks_update = squeue_kstat_update;
- sqp->sq_kstat->ks_private = sqp;
- kstat_install(sqp->sq_kstat);
- }
-#endif
-
sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
sqp, 0, &p0, TS_RUN, pri);
+ sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
+ sqp, 0, &p0, TS_RUN, pri);
+
+ sqp->sq_enter = squeue_enter;
+ sqp->sq_drain = squeue_drain;
+
return (sqp);
}
-/* ARGSUSED */
+/*
+ * Bind the squeue worker thread to the CPU specified by the CPU id.
+ * If the CPU id value is -1 (PBIND_NONE), bind the worker thread to the
+ * CPU specified in the sq_bind field. If the thread is already bound to
+ * a different CPU, unbind it from the old CPU and bind it to the new one.
+ */
+
void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
- ASSERT(bind == -1);
-
mutex_enter(&sqp->sq_lock);
+ ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
+ ASSERT(MUTEX_HELD(&cpu_lock));
+
if (sqp->sq_state & SQS_BOUND) {
- mutex_exit(&sqp->sq_lock);
- return;
+ if (sqp->sq_bind == bind) {
+ mutex_exit(&sqp->sq_lock);
+ return;
+ }
+ thread_affinity_clear(sqp->sq_worker);
+ } else {
+ sqp->sq_state |= SQS_BOUND;
}
- sqp->sq_state |= SQS_BOUND;
- mutex_exit(&sqp->sq_lock);
+ if (bind != PBIND_NONE)
+ sqp->sq_bind = bind;
thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
+ mutex_exit(&sqp->sq_lock);
}
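
A hedged usage sketch (the wait value of 0 and the minclsyspri priority are illustrative, and cpu_id is a CPU chosen by the caller): an squeue is created once and its worker thread bound while holding cpu_lock, as the ASSERT above requires.

	squeue_t *sqp;

	sqp = squeue_create(0, minclsyspri);	/* never fails, may sleep */
	mutex_enter(&cpu_lock);
	squeue_bind(sqp, cpu_id);		/* PBIND_NONE would reuse sqp->sq_bind */
	mutex_exit(&cpu_lock);
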
void
@@ -485,9 +289,98 @@ squeue_unbind(squeue_t *sqp)
}
sqp->sq_state &= ~SQS_BOUND;
+ thread_affinity_clear(sqp->sq_worker);
mutex_exit(&sqp->sq_lock);
+}
- thread_affinity_clear(sqp->sq_worker);
+void
+squeue_worker_wakeup(squeue_t *sqp)
+{
+ timeout_id_t tid = (sqp)->sq_tid;
+
+ ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
+
+ if (sqp->sq_wait == 0) {
+ ASSERT(tid == 0);
+ ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
+ sqp->sq_awaken = lbolt;
+ cv_signal(&sqp->sq_worker_cv);
+ mutex_exit(&sqp->sq_lock);
+ return;
+ }
+
+ /*
+ * Queue isn't being processed, so take
+ * any post enqueue actions needed before leaving.
+ */
+ if (tid != 0) {
+ /*
+ * Waiting for an enter() to process mblk(s).
+ */
+ clock_t waited = lbolt - sqp->sq_awaken;
+
+ if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
+ /*
+ * Times up and have a worker thread
+ * waiting for work, so schedule it.
+ */
+ sqp->sq_tid = 0;
+ sqp->sq_awaken = lbolt;
+ cv_signal(&sqp->sq_worker_cv);
+ mutex_exit(&sqp->sq_lock);
+ (void) untimeout(tid);
+ return;
+ }
+ mutex_exit(&sqp->sq_lock);
+ return;
+ } else if (sqp->sq_state & SQS_TMO_PROG) {
+ mutex_exit(&sqp->sq_lock);
+ return;
+ } else {
+ clock_t wait = sqp->sq_wait;
+ /*
+ * Wait up to sqp->sq_wait ms for an
+ * enter() to process this queue. We
+ * don't want to contend on timeout locks
+ * with sq_lock held for performance reasons,
+	 * so drop the sq_lock before calling timeout,
+	 * but we need to check if a timeout is required
+	 * after reacquiring the sq_lock. Once
+ * the sq_lock is dropped, someone else could
+ * have processed the packet or the timeout could
+ * have already fired.
+ */
+ sqp->sq_state |= SQS_TMO_PROG;
+ mutex_exit(&sqp->sq_lock);
+ tid = timeout(squeue_fire, sqp, wait);
+ mutex_enter(&sqp->sq_lock);
+ /* Check again if we still need the timeout */
+ if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
+ SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
+ (sqp->sq_first != NULL)) {
+ sqp->sq_state &= ~SQS_TMO_PROG;
+ sqp->sq_tid = tid;
+ mutex_exit(&sqp->sq_lock);
+ return;
+ } else {
+ if (sqp->sq_state & SQS_TMO_PROG) {
+ sqp->sq_state &= ~SQS_TMO_PROG;
+ mutex_exit(&sqp->sq_lock);
+ (void) untimeout(tid);
+ } else {
+ /*
+ * The timer fired before we could
+ * reacquire the sq_lock. squeue_fire
+ * removes the SQS_TMO_PROG flag
+ * and we don't need to do anything
+ * else.
+ */
+ mutex_exit(&sqp->sq_lock);
+ }
+ }
+ }
+
+ ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
}
/*
@@ -500,18 +393,20 @@ squeue_unbind(squeue_t *sqp)
*
* The proc and arg for each mblk is already stored in the mblk in
* appropriate places.
+ *
+ * The process_flag specifies if we are allowed to process the mblk
+ * and drain in the entering thread context. If process_flag is
+ * SQ_FILL, then we just queue the mblk and return (after signaling
+ * the worker thread if no one else is processing the squeue).
*/
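
For instance (a hedged fragment, with the proc and arg already stashed in mp as in the sketch near the top of the file), a caller that must not drain simply queues the work for the worker thread:

	squeue_enter(connp->conn_sqp, mp, mp, 1, SQ_FILL, 0);
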
+/* ARGSUSED */
void
-squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail,
- uint32_t cnt, uint8_t tag)
+squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
+ int process_flag, uint8_t tag)
{
- int interrupt = servicing_interrupt();
- void *arg;
+ conn_t *connp;
sqproc_t proc;
hrtime_t now;
-#if SQUEUE_PROFILE
- hrtime_t start, delta;
-#endif
ASSERT(sqp != NULL);
ASSERT(mp != NULL);
@@ -520,355 +415,111 @@ squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail,
ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
mutex_enter(&sqp->sq_lock);
- if (!(sqp->sq_state & SQS_PROC)) {
+
+ /*
+ * Try to process the packet if SQ_FILL flag is not set and
+ * we are allowed to process the squeue. The SQ_NODRAIN is
+ * ignored if the packet chain consists of more than 1 packet.
+ */
+ if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
+ (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
/*
* See if anything is already queued. If we are the
* first packet, do inline processing else queue the
* packet and do the drain.
*/
- sqp->sq_run = curthread;
if (sqp->sq_first == NULL && cnt == 1) {
/*
* Fast-path, ok to process and nothing queued.
*/
sqp->sq_state |= (SQS_PROC|SQS_FAST);
+ sqp->sq_run = curthread;
mutex_exit(&sqp->sq_lock);
/*
* We are the chain of 1 packet so
* go through this fast path.
*/
- arg = mp->b_prev;
+ ASSERT(mp->b_prev != NULL);
+ ASSERT(mp->b_queue != NULL);
+ connp = (conn_t *)mp->b_prev;
mp->b_prev = NULL;
proc = (sqproc_t)mp->b_queue;
mp->b_queue = NULL;
-
- ASSERT(proc != NULL);
- ASSERT(arg != NULL);
+ ASSERT(proc != NULL && connp != NULL);
ASSERT(mp->b_next == NULL);
-#if SQUEUE_DEBUG
- sqp->sq_isintr = interrupt;
- sqp->sq_curmp = mp;
- sqp->sq_curproc = proc;
- sqp->sq_connp = arg;
- mp->b_tag = sqp->sq_tag = tag;
-#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (interrupt)
- SQSTAT(sqp, sq_npackets_intr);
- else
- SQSTAT(sqp, sq_npackets_other);
- start = gethrtime();
- }
-#endif
- ((conn_t *)arg)->conn_on_sqp = B_TRUE;
- DTRACE_PROBE3(squeue__proc__start, squeue_t *,
- sqp, mblk_t *, mp, conn_t *, arg);
- (*proc)(arg, mp, sqp);
- DTRACE_PROBE2(squeue__proc__end, squeue_t *,
- sqp, conn_t *, arg);
- ((conn_t *)arg)->conn_on_sqp = B_FALSE;
-
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- delta = gethrtime() - start;
- if (interrupt)
- SQDELTA(sqp, sq_time_intr, delta);
- else
- SQDELTA(sqp, sq_time_other, delta);
- }
-#endif
-#if SQUEUE_DEBUG
- sqp->sq_curmp = NULL;
- sqp->sq_curproc = NULL;
- sqp->sq_connp = NULL;
- sqp->sq_isintr = 0;
-#endif
-
- CONN_DEC_REF((conn_t *)arg);
- ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
- mutex_enter(&sqp->sq_lock);
- sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
- if (sqp->sq_first == NULL) {
- /*
- * We processed inline our packet and
- * nothing new has arrived. We are done.
- */
- sqp->sq_run = NULL;
- mutex_exit(&sqp->sq_lock);
- return;
- } else if (sqp->sq_bind != CPU->cpu_id) {
- /*
- * If the current thread is not running
- * on the CPU to which this squeue is bound,
- * then don't allow it to drain.
- */
- sqp->sq_run = NULL;
- SQUEUE_WORKER_WAKEUP(sqp);
- return;
- }
- } else {
- ENQUEUE_CHAIN(sqp, mp, tail, cnt);
-#if SQUEUE_DEBUG
- mp->b_tag = tag;
-#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (servicing_interrupt())
- SQSTAT(sqp, sq_nqueued_intr);
- else
- SQSTAT(sqp, sq_nqueued_other);
- if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
- sqp->sq_stats.sq_max_qlen =
- sqp->sq_count;
- }
-#endif
- }
-
- /*
- * We are here because either we couldn't do inline
- * processing (because something was already queued),
- * or we had a chanin of more than one packet,
- * or something else arrived after we were done with
- * inline processing.
- */
- ASSERT(MUTEX_HELD(&sqp->sq_lock));
- ASSERT(sqp->sq_first != NULL);
-
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- start = gethrtime();
- }
-#endif
-#if SQUEUE_DEBUG
- sqp->sq_isintr = interrupt;
-#endif
-
- now = gethrtime();
- if (interrupt) {
- squeue_drain(sqp, SQS_ENTER, now +
- squeue_intrdrain_ns);
- } else {
- squeue_drain(sqp, SQS_USER, now +
- squeue_writerdrain_ns);
- }
-
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- delta = gethrtime() - start;
- if (interrupt)
- SQDELTA(sqp, sq_time_intr, delta);
- else
- SQDELTA(sqp, sq_time_other, delta);
- }
-#endif
-#if SQUEUE_DEBUG
- sqp->sq_isintr = 0;
-#endif
-
- /*
- * If we didn't do a complete drain, the worker
- * thread was already signalled by squeue_drain.
- */
- sqp->sq_run = NULL;
- mutex_exit(&sqp->sq_lock);
- return;
- } else {
- ASSERT(sqp->sq_run != NULL);
- /*
- * Queue is already being processed. Just enqueue
- * the packet and go away.
- */
-#if SQUEUE_DEBUG
- mp->b_tag = tag;
-#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (servicing_interrupt())
- SQSTAT(sqp, sq_nqueued_intr);
- else
- SQSTAT(sqp, sq_nqueued_other);
- if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
- sqp->sq_stats.sq_max_qlen = sqp->sq_count;
- }
-#endif
-
- ENQUEUE_CHAIN(sqp, mp, tail, cnt);
- mutex_exit(&sqp->sq_lock);
- return;
- }
-}
-
-/*
- * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg.
- */
-void
-squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
- uint8_t tag)
-{
- int interrupt = servicing_interrupt();
- hrtime_t now;
-#if SQUEUE_PROFILE
- hrtime_t start, delta;
-#endif
-#if SQUEUE_DEBUG
- conn_t *connp = (conn_t *)arg;
- ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
- ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
-#endif
-
- ASSERT(proc != NULL);
- ASSERT(sqp != NULL);
- ASSERT(mp != NULL);
- ASSERT(mp->b_next == NULL);
- ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
-
- mutex_enter(&sqp->sq_lock);
- if (!(sqp->sq_state & SQS_PROC)) {
- /*
- * See if anything is already queued. If we are the
- * first packet, do inline processing else queue the
- * packet and do the drain.
- */
- sqp->sq_run = curthread;
- if (sqp->sq_first == NULL) {
/*
- * Fast-path, ok to process and nothing queued.
+ * Handle squeue switching. More details in the
+ * block comment at the top of the file
*/
- sqp->sq_state |= (SQS_PROC|SQS_FAST);
- mutex_exit(&sqp->sq_lock);
-
-#if SQUEUE_DEBUG
- sqp->sq_isintr = interrupt;
- sqp->sq_curmp = mp;
- sqp->sq_curproc = proc;
- sqp->sq_connp = connp;
- mp->b_tag = sqp->sq_tag = tag;
-#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (interrupt)
- SQSTAT(sqp, sq_npackets_intr);
- else
- SQSTAT(sqp, sq_npackets_other);
- start = gethrtime();
+ if (connp->conn_sqp == sqp) {
+ SQUEUE_DBG_SET(sqp, mp, proc, connp,
+ tag);
+ connp->conn_on_sqp = B_TRUE;
+ DTRACE_PROBE3(squeue__proc__start, squeue_t *,
+ sqp, mblk_t *, mp, conn_t *, connp);
+ (*proc)(connp, mp, sqp);
+ DTRACE_PROBE2(squeue__proc__end, squeue_t *,
+ sqp, conn_t *, connp);
+ connp->conn_on_sqp = B_FALSE;
+ SQUEUE_DBG_CLEAR(sqp);
+ CONN_DEC_REF(connp);
+ } else {
+ SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
+ connp, SQ_FILL, SQTAG_SQUEUE_CHANGE);
}
-#endif
- ((conn_t *)arg)->conn_on_sqp = B_TRUE;
- DTRACE_PROBE3(squeue__proc__start, squeue_t *,
- sqp, mblk_t *, mp, conn_t *, arg);
- (*proc)(arg, mp, sqp);
- DTRACE_PROBE2(squeue__proc__end, squeue_t *,
- sqp, conn_t *, arg);
- ((conn_t *)arg)->conn_on_sqp = B_FALSE;
-
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- delta = gethrtime() - start;
- if (interrupt)
- SQDELTA(sqp, sq_time_intr, delta);
- else
- SQDELTA(sqp, sq_time_other, delta);
- }
-#endif
-#if SQUEUE_DEBUG
- sqp->sq_curmp = NULL;
- sqp->sq_curproc = NULL;
- sqp->sq_connp = NULL;
- sqp->sq_isintr = 0;
-#endif
-
- CONN_DEC_REF((conn_t *)arg);
ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
mutex_enter(&sqp->sq_lock);
sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
- if (sqp->sq_first == NULL) {
+ sqp->sq_run = NULL;
+ if (sqp->sq_first == NULL ||
+ process_flag == SQ_NODRAIN) {
+ if (sqp->sq_first != NULL) {
+ squeue_worker_wakeup(sqp);
+ return;
+ }
/*
- * We processed inline our packet and
- * nothing new has arrived. We are done.
+ * We processed inline our packet and nothing
+ * new has arrived. We are done. In case any
+ * control actions are pending, wake up the
+ * worker.
*/
- sqp->sq_run = NULL;
+ if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
+ cv_signal(&sqp->sq_worker_cv);
mutex_exit(&sqp->sq_lock);
return;
- } else if (sqp->sq_bind != CPU->cpu_id) {
- /*
- * If the current thread is not running
- * on the CPU to which this squeue is bound,
- * then don't allow it to drain.
- */
- sqp->sq_run = NULL;
- SQUEUE_WORKER_WAKEUP(sqp);
- return;
}
} else {
- ENQUEUE_MP(sqp, mp, proc, arg);
-#if SQUEUE_DEBUG
+ ENQUEUE_CHAIN(sqp, mp, tail, cnt);
+#ifdef DEBUG
mp->b_tag = tag;
#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (servicing_interrupt())
- SQSTAT(sqp, sq_nqueued_intr);
- else
- SQSTAT(sqp, sq_nqueued_other);
- if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
- sqp->sq_stats.sq_max_qlen =
- sqp->sq_count;
- }
-#endif
}
-
/*
* We are here because either we couldn't do inline
- * processing (because something was already queued)
+ * processing (because something was already queued),
+ * or we had a chain of more than one packet,
* or something else arrived after we were done with
* inline processing.
*/
ASSERT(MUTEX_HELD(&sqp->sq_lock));
ASSERT(sqp->sq_first != NULL);
-
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- start = gethrtime();
- }
-#endif
-#if SQUEUE_DEBUG
- sqp->sq_isintr = interrupt;
-#endif
-
now = gethrtime();
- if (interrupt) {
- squeue_drain(sqp, SQS_ENTER, now +
- squeue_intrdrain_ns);
- } else {
- squeue_drain(sqp, SQS_USER, now +
- squeue_writerdrain_ns);
- }
-
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- delta = gethrtime() - start;
- if (interrupt)
- SQDELTA(sqp, sq_time_intr, delta);
- else
- SQDELTA(sqp, sq_time_other, delta);
- }
-#endif
-#if SQUEUE_DEBUG
- sqp->sq_isintr = 0;
-#endif
+ sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);
/*
* If we didn't do a complete drain, the worker
* thread was already signalled by squeue_drain.
+ * In case any control actions are pending, wake
+ * up the worker.
*/
sqp->sq_run = NULL;
+ if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
+ cv_signal(&sqp->sq_worker_cv);
mutex_exit(&sqp->sq_lock);
return;
} else {
- ASSERT(sqp->sq_run != NULL);
/*
* We let a thread processing a squeue reenter only
* once. This helps the case of incoming connection
@@ -878,168 +529,42 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
* loopback connection where the two ends are bound
* to the same squeue (which is typical on single
* CPU machines).
+ *
* We let the thread reenter only once for the fear
* of stack getting blown with multiple traversal.
*/
+ connp = (conn_t *)mp->b_prev;
if (!(sqp->sq_state & SQS_REENTER) &&
- (sqp->sq_run == curthread) && sqp->sq_first == NULL &&
- (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
+ (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
+ (sqp->sq_run == curthread) && (cnt == 1) &&
+ (connp->conn_on_sqp == B_FALSE)) {
sqp->sq_state |= SQS_REENTER;
mutex_exit(&sqp->sq_lock);
- ((conn_t *)arg)->conn_on_sqp = B_TRUE;
- DTRACE_PROBE3(squeue__proc__start, squeue_t *,
- sqp, mblk_t *, mp, conn_t *, arg);
- (*proc)(arg, mp, sqp);
- DTRACE_PROBE2(squeue__proc__end, squeue_t *,
- sqp, conn_t *, arg);
- ((conn_t *)arg)->conn_on_sqp = B_FALSE;
- CONN_DEC_REF((conn_t *)arg);
-
- mutex_enter(&sqp->sq_lock);
- sqp->sq_state &= ~SQS_REENTER;
- mutex_exit(&sqp->sq_lock);
- return;
- }
- /*
- * Queue is already being processed. Just enqueue
- * the packet and go away.
- */
-#if SQUEUE_DEBUG
- mp->b_tag = tag;
-#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (servicing_interrupt())
- SQSTAT(sqp, sq_nqueued_intr);
- else
- SQSTAT(sqp, sq_nqueued_other);
- if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
- sqp->sq_stats.sq_max_qlen = sqp->sq_count;
- }
-#endif
-
- ENQUEUE_MP(sqp, mp, proc, arg);
- mutex_exit(&sqp->sq_lock);
- return;
- }
-}
-
-void
-squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
- uint8_t tag)
-{
- int interrupt = servicing_interrupt();
- boolean_t being_processed;
-#if SQUEUE_DEBUG
- conn_t *connp = (conn_t *)arg;
-#endif
-#if SQUEUE_PROFILE
- hrtime_t start, delta;
-#endif
+ ASSERT(mp->b_prev != NULL);
+ ASSERT(mp->b_queue != NULL);
- ASSERT(proc != NULL);
- ASSERT(sqp != NULL);
- ASSERT(mp != NULL);
- ASSERT(mp->b_next == NULL);
- ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
- ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
- ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
-
- mutex_enter(&sqp->sq_lock);
-
- being_processed = (sqp->sq_state & SQS_PROC);
- if (!being_processed && (sqp->sq_first == NULL)) {
- /*
- * Fast-path, ok to process and nothing queued.
- */
- sqp->sq_state |= (SQS_PROC|SQS_FAST);
- sqp->sq_run = curthread;
- mutex_exit(&sqp->sq_lock);
-
-#if SQUEUE_DEBUG
- sqp->sq_isintr = interrupt;
- sqp->sq_curmp = mp;
- sqp->sq_curproc = proc;
- sqp->sq_connp = connp;
- mp->b_tag = sqp->sq_tag = tag;
-#endif
-
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (interrupt)
- SQSTAT(sqp, sq_npackets_intr);
- else
- SQSTAT(sqp, sq_npackets_other);
- start = gethrtime();
- }
-#endif
-
- ((conn_t *)arg)->conn_on_sqp = B_TRUE;
- DTRACE_PROBE3(squeue__proc__start, squeue_t *,
- sqp, mblk_t *, mp, conn_t *, arg);
- (*proc)(arg, mp, sqp);
- DTRACE_PROBE2(squeue__proc__end, squeue_t *,
- sqp, conn_t *, arg);
- ((conn_t *)arg)->conn_on_sqp = B_FALSE;
-
-#if SQUEUE_DEBUG
- sqp->sq_curmp = NULL;
- sqp->sq_curproc = NULL;
- sqp->sq_connp = NULL;
- sqp->sq_isintr = 0;
-#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- delta = gethrtime() - start;
- if (interrupt)
- SQDELTA(sqp, sq_time_intr, delta);
- else
- SQDELTA(sqp, sq_time_other, delta);
- }
-#endif
+ mp->b_prev = NULL;
+ proc = (sqproc_t)mp->b_queue;
+ mp->b_queue = NULL;
- CONN_DEC_REF((conn_t *)arg);
- mutex_enter(&sqp->sq_lock);
- sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
- sqp->sq_run = NULL;
- if (sqp->sq_first == NULL) {
/*
- * We processed inline our packet and
- * nothing new has arrived. We are done.
+ * Handle squeue switching. More details in the
+ * block comment at the top of the file
*/
- mutex_exit(&sqp->sq_lock);
- } else {
- SQUEUE_WORKER_WAKEUP(sqp);
- }
- return;
- } else {
- /*
- * We let a thread processing a squeue reenter only
- * once. This helps the case of incoming connection
- * where a SYN-ACK-ACK that triggers the conn_ind
- * doesn't have to queue the packet if listener and
- * eager are on the same squeue. Also helps the
- * loopback connection where the two ends are bound
- * to the same squeue (which is typical on single
- * CPU machines).
- * We let the thread reenter only once for the fear
- * of stack getting blown with multiple traversal.
- */
- if (being_processed && !(sqp->sq_state & SQS_REENTER) &&
- (sqp->sq_run == curthread) && sqp->sq_first == NULL &&
- (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
- sqp->sq_state |= SQS_REENTER;
- mutex_exit(&sqp->sq_lock);
-
- ((conn_t *)arg)->conn_on_sqp = B_TRUE;
- DTRACE_PROBE3(squeue__proc__start, squeue_t *,
- sqp, mblk_t *, mp, conn_t *, arg);
- (*proc)(arg, mp, sqp);
- DTRACE_PROBE2(squeue__proc__end, squeue_t *,
- sqp, conn_t *, arg);
- ((conn_t *)arg)->conn_on_sqp = B_FALSE;
- CONN_DEC_REF((conn_t *)arg);
+ if (connp->conn_sqp == sqp) {
+ connp->conn_on_sqp = B_TRUE;
+ DTRACE_PROBE3(squeue__proc__start, squeue_t *,
+ sqp, mblk_t *, mp, conn_t *, connp);
+ (*proc)(connp, mp, sqp);
+ DTRACE_PROBE2(squeue__proc__end, squeue_t *,
+ sqp, conn_t *, connp);
+ connp->conn_on_sqp = B_FALSE;
+ CONN_DEC_REF(connp);
+ } else {
+ SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
+ connp, SQ_FILL, SQTAG_SQUEUE_CHANGE);
+ }
mutex_enter(&sqp->sq_lock);
sqp->sq_state &= ~SQS_REENTER;
@@ -1047,80 +572,32 @@ squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
return;
}
-#if SQUEUE_DEBUG
+ /*
+	 * The queue is already being processed or there are already
+	 * one or more packets on the queue. Enqueue the
+	 * packet and wake up the squeue worker thread if the
+	 * squeue is not being processed.
+ */
+#ifdef DEBUG
mp->b_tag = tag;
#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (servicing_interrupt())
- SQSTAT(sqp, sq_nqueued_intr);
- else
- SQSTAT(sqp, sq_nqueued_other);
- if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
- sqp->sq_stats.sq_max_qlen = sqp->sq_count;
- }
-#endif
- ENQUEUE_MP(sqp, mp, proc, arg);
- if (being_processed) {
- /*
- * Queue is already being processed.
- * No need to do anything.
- */
- mutex_exit(&sqp->sq_lock);
+
+ ENQUEUE_CHAIN(sqp, mp, tail, cnt);
+ if (!(sqp->sq_state & SQS_PROC)) {
+ squeue_worker_wakeup(sqp);
return;
}
- SQUEUE_WORKER_WAKEUP(sqp);
- }
-}
-
-/*
- * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg
- * without processing the squeue.
- */
-/* ARGSUSED */
-void
-squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg,
- uint8_t tag)
-{
-#if SQUEUE_DEBUG
- conn_t *connp = (conn_t *)arg;
-#endif
- ASSERT(proc != NULL);
- ASSERT(sqp != NULL);
- ASSERT(mp != NULL);
- ASSERT(mp->b_next == NULL);
- ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
- ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
-
- ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
- mutex_enter(&sqp->sq_lock);
- ENQUEUE_MP(sqp, mp, proc, arg);
-#if SQUEUE_DEBUG
- mp->b_tag = tag;
-#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (servicing_interrupt())
- SQSTAT(sqp, sq_nqueued_intr);
- else
- SQSTAT(sqp, sq_nqueued_other);
- if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
- sqp->sq_stats.sq_max_qlen = sqp->sq_count;
- }
-#endif
-
- /*
- * If queue is already being processed. No need to do anything.
- */
- if (sqp->sq_state & SQS_PROC) {
+ /*
+ * In case any control actions are pending, wake
+ * up the worker.
+ */
+ if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
+ cv_signal(&sqp->sq_worker_cv);
mutex_exit(&sqp->sq_lock);
return;
}
-
- SQUEUE_WORKER_WAKEUP(sqp);
}
-
/*
* PRIVATE FUNCTIONS
*/
@@ -1151,7 +628,7 @@ squeue_fire(void *arg)
if (!(state & SQS_PROC)) {
sqp->sq_awaken = lbolt;
- cv_signal(&sqp->sq_async);
+ cv_signal(&sqp->sq_worker_cv);
}
mutex_exit(&sqp->sq_lock);
}
@@ -1159,64 +636,52 @@ squeue_fire(void *arg)
static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
- mblk_t *mp;
- mblk_t *head;
- sqproc_t proc;
- conn_t *connp;
- clock_t start = lbolt;
- clock_t drain_time;
- timeout_id_t tid;
- uint_t cnt;
- uint_t total_cnt = 0;
+ mblk_t *mp;
+ mblk_t *head;
+ sqproc_t proc;
+ conn_t *connp;
+ timeout_id_t tid;
ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring;
- int interrupt = servicing_interrupt();
- boolean_t poll_on = B_FALSE;
- hrtime_t now;
+ hrtime_t now;
+ boolean_t did_wakeup = B_FALSE;
+ boolean_t sq_poll_capable;
+ sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
+again:
ASSERT(mutex_owned(&sqp->sq_lock));
- ASSERT(!(sqp->sq_state & SQS_PROC));
-
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (interrupt)
- SQSTAT(sqp, sq_ndrains_intr);
- else if (!(proc_type & SQS_WORKER))
- SQSTAT(sqp, sq_ndrains_other);
- else
- SQSTAT(sqp, sq_ndrains_worker);
- }
-#endif
+ ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
+ SQS_POLL_QUIESCE_DONE)));
+
+ head = sqp->sq_first;
+ sqp->sq_first = NULL;
+ sqp->sq_last = NULL;
+ sqp->sq_count = 0;
if ((tid = sqp->sq_tid) != 0)
sqp->sq_tid = 0;
sqp->sq_state |= SQS_PROC | proc_type;
- head = sqp->sq_first;
- sqp->sq_first = NULL;
- sqp->sq_last = NULL;
- cnt = sqp->sq_count;
+
/*
* We have backlog built up. Switch to polling mode if the
- * device underneath allows it. Need to do it only for
- * drain by non-interrupt thread so interrupts don't
- * come and disrupt us in between. If its a interrupt thread,
- * no need because most devices will not issue another
- * interrupt till this one returns.
+ * device underneath allows it. Need to do it so that
+	 * more packets don't come in and disturb us (by contending
+	 * for sq_lock or by a higher priority thread preempting us).
+	 *
+	 * The worker thread is allowed to do active polling, while we
+	 * just disable the interrupts for drain by non-worker (kernel
+	 * or userland) threads so they can peacefully process the
+	 * packets during the time allocated to them.
*/
- if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) &&
- (sqp->sq_count > squeue_worker_poll_min)) {
- ASSERT(sq_rx_ring != NULL);
- SQS_POLLING_ON(sqp, sq_rx_ring);
- poll_on = B_TRUE;
- }
-
+ SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
mutex_exit(&sqp->sq_lock);
if (tid != 0)
(void) untimeout(tid);
-again:
+
while ((mp = head) != NULL) {
+
head = mp->b_next;
mp->b_next = NULL;
@@ -1224,255 +689,548 @@ again:
mp->b_queue = NULL;
connp = (conn_t *)mp->b_prev;
mp->b_prev = NULL;
-#if SQUEUE_DEBUG
- sqp->sq_curmp = mp;
- sqp->sq_curproc = proc;
- sqp->sq_connp = connp;
- sqp->sq_tag = mp->b_tag;
-#endif
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- if (interrupt)
- SQSTAT(sqp, sq_npackets_intr);
- else if (!(proc_type & SQS_WORKER))
- SQSTAT(sqp, sq_npackets_other);
- else
- SQSTAT(sqp, sq_npackets_worker);
+ /*
+ * Handle squeue switching. More details in the
+ * block comment at the top of the file
+ */
+ if (connp->conn_sqp == sqp) {
+ SQUEUE_DBG_SET(sqp, mp, proc, connp,
+ mp->b_tag);
+ connp->conn_on_sqp = B_TRUE;
+ DTRACE_PROBE3(squeue__proc__start, squeue_t *,
+ sqp, mblk_t *, mp, conn_t *, connp);
+ (*proc)(connp, mp, sqp);
+ DTRACE_PROBE2(squeue__proc__end, squeue_t *,
+ sqp, conn_t *, connp);
+ connp->conn_on_sqp = B_FALSE;
+ CONN_DEC_REF(connp);
+ } else {
+ SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp,
+ SQ_FILL, SQTAG_SQUEUE_CHANGE);
}
-#endif
-
- connp->conn_on_sqp = B_TRUE;
- DTRACE_PROBE3(squeue__proc__start, squeue_t *,
- sqp, mblk_t *, mp, conn_t *, connp);
- (*proc)(connp, mp, sqp);
- DTRACE_PROBE2(squeue__proc__end, squeue_t *,
- sqp, conn_t *, connp);
- connp->conn_on_sqp = B_FALSE;
- CONN_DEC_REF(connp);
}
-
-#if SQUEUE_DEBUG
- sqp->sq_curmp = NULL;
- sqp->sq_curproc = NULL;
- sqp->sq_connp = NULL;
-#endif
+ SQUEUE_DBG_CLEAR(sqp);
mutex_enter(&sqp->sq_lock);
- sqp->sq_count -= cnt;
- total_cnt += cnt;
+ /*
+ * Check if there is still work to do (either more arrived or timer
+ * expired). If we are the worker thread and we are polling capable,
+ * continue doing the work since no one else is around to do the
+ * work anyway (but signal the poll thread to retrieve some packets
+ * in the meanwhile). If we are not the worker thread, just
+ * signal the worker thread to take up the work if processing time
+ * has expired.
+ */
if (sqp->sq_first != NULL) {
-
- now = gethrtime();
- if (!expire || (now < expire)) {
- /* More arrived and time not expired */
- head = sqp->sq_first;
- sqp->sq_first = NULL;
- sqp->sq_last = NULL;
- cnt = sqp->sq_count;
- mutex_exit(&sqp->sq_lock);
- goto again;
- }
-
/*
- * If we are not worker thread and we
- * reached our time limit to do drain,
- * signal the worker thread to pick
- * up the work.
- * If we were the worker thread, then
- * we take a break to allow an interrupt
- * or writer to pick up the load.
+		 * Still more to process. If the time quantum has not expired, we
+ * should let the drain go on. The worker thread is allowed
+ * to drain as long as there is anything left.
*/
- if (proc_type != SQS_WORKER) {
+ now = gethrtime();
+ if ((now < expire) || (proc_type == SQS_WORKER)) {
+ /*
+			 * If the time has not expired, or we are the worker
+			 * thread and this squeue is polling capable, continue
+			 * to do the drain.
+			 *
+			 * We turn off interrupts for all userland threads
+			 * doing drain but we do active polling only for
+			 * the worker thread.
+ */
+ if (proc_type == SQS_WORKER)
+ SQS_POLL_RING(sqp, sq_poll_capable);
+ goto again;
+ } else {
+ did_wakeup = B_TRUE;
sqp->sq_awaken = lbolt;
- cv_signal(&sqp->sq_async);
+ cv_signal(&sqp->sq_worker_cv);
}
}
/*
- * Try to see if we can get a time estimate to process a packet.
- * Do it only in interrupt context since less chance of context
- * switch or pinning etc. to get a better estimate.
+ * If the poll thread is already running, just return. The
+ * poll thread continues to hold the proc and will finish
+ * processing.
*/
- if (interrupt && ((drain_time = (lbolt - start)) > 0))
- sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) +
- (20 * (drv_hztousec(drain_time)/total_cnt)))/100;
-
- sqp->sq_state &= ~(SQS_PROC | proc_type);
+ if (sqp->sq_state & SQS_GET_PKTS) {
+ ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
+ SQS_POLL_QUIESCE_DONE)));
+ sqp->sq_state &= ~proc_type;
+ return;
+ }
/*
- * If polling was turned on, turn it off and reduce the default
- * interrupt blank interval as well to bring new packets in faster
- * (reduces the latency when there is no backlog).
+ *
+ * If we are the worker thread and no work is left, send the poll
+ * thread down once more to see if something arrived. Otherwise,
+ * turn the interrupts back on and we are done.
*/
- if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) {
- ASSERT(sq_rx_ring != NULL);
- SQS_POLLING_OFF(sqp, sq_rx_ring);
+ if ((proc_type == SQS_WORKER) &&
+ (sqp->sq_state & SQS_POLL_CAPAB)) {
+ /*
+ * Do one last check to see if anything arrived
+ * in the NIC. We leave the SQS_PROC set to ensure
+ * that poll thread keeps the PROC and can decide
+ * if it needs to turn polling off or continue
+ * processing.
+ *
+		 * If we drop the SQS_PROC here and the poll thread comes
+		 * up empty-handed, it cannot safely turn polling off
+		 * since someone else could have acquired the PROC
+ * and started draining. The previously running poll
+ * thread and the current thread doing drain would end
+ * up in a race for turning polling on/off and more
+ * complex code would be required to deal with it.
+ *
+		 * It's a lot simpler for the drain to hand SQS_PROC to the
+		 * poll thread (if running) and let the poll thread finish
+		 * without worrying about racing with any other thread.
+ */
+ ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
+ SQS_POLL_QUIESCE_DONE)));
+ SQS_POLL_RING(sqp, sq_poll_capable);
+ sqp->sq_state &= ~proc_type;
+ } else {
+ /*
+ * The squeue is either not capable of polling or
+ * poll thread already finished processing and didn't
+ * find anything. Since there is nothing queued and
+ * we already turn polling on (for all threads doing
+		 * we already turned polling on (for all threads doing
+ * the PROC.
+ */
+ ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
+ SQS_POLL_QUIESCE_DONE)));
+ SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
+ sqp->sq_state &= ~(SQS_PROC | proc_type);
+ if (!did_wakeup && sqp->sq_first != NULL) {
+ squeue_worker_wakeup(sqp);
+ mutex_enter(&sqp->sq_lock);
+ }
+ /*
+ * If we are not the worker and there is a pending quiesce
+ * event, wake up the worker
+ */
+ if ((proc_type != SQS_WORKER) &&
+ (sqp->sq_state & SQS_WORKER_THR_CONTROL))
+ cv_signal(&sqp->sq_worker_cv);
}
}
+/*
+ * Quiesce, Restart, or Cleanup of the squeue poll thread.
+ *
+ * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
+ * not attempt to poll the underlying soft ring any more. The quiesce is
+ * triggered by the mac layer when it wants to quiesce a soft ring. Typically
+ * control operations such as changing the fanout of a NIC or VNIC (dladm
+ * setlinkprop) need to quiesce data flow before changing the wiring.
+ * The operation is done by the mac layer, but it calls back into IP to
+ * quiesce the soft ring. After completing the operation (say increase or
+ * decrease of the fanout) the mac layer then calls back into IP to restart
+ * the quiesced soft ring.
+ *
+ * Cleanup: This is triggered when the squeue binding to a soft ring is
+ * removed permanently. Typically interface plumb and unplumb would trigger
+ * this. It can also be triggered from the mac layer when a soft ring is
+ * being deleted say as the result of a fanout reduction. Since squeues are
+ * never deleted, the cleanup marks the squeue as fit for recycling and
+ * moves it to the zeroth squeue set.
+ */
static void
-squeue_worker(squeue_t *sqp)
+squeue_poll_thr_control(squeue_t *sqp)
+{
+ if (sqp->sq_state & SQS_POLL_THR_RESTART) {
+ /* Restart implies a previous quiesce */
+ ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
+ sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
+ SQS_POLL_THR_RESTART);
+ sqp->sq_state |= SQS_POLL_CAPAB;
+ cv_signal(&sqp->sq_worker_cv);
+ return;
+ }
+
+ if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
+ sqp->sq_state |= SQS_POLL_THR_QUIESCED;
+ sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
+ cv_signal(&sqp->sq_worker_cv);
+ return;
+ }
+}
+
+/*
+ * POLLING Notes
+ *
+ * With polling mode, we want to do as much processing as we possibly can
+ * in worker thread context. The sweet spot is when the worker thread keeps
+ * doing work all the time in polling mode and writers etc. keep dumping
+ * packets to the worker thread. Occasionally, we send the poll thread
+ * (running at a lower priority) down to the NIC to get a chain of packets
+ * to feed to the worker. Sending the poll thread down to the NIC depends
+ * on three criteria:
+ *
+ * 1) It is always driven from squeue_drain and only if the worker thread is
+ *    doing the drain.
+ * 2) We clear the backlog once and more packets arrived in between.
+ * Before starting drain again, send the poll thread down if
+ * the drain is being done by worker thread.
+ * 3) Before exiting the squeue_drain, if the poll thread is not already
+ * working and we are the worker thread, try to poll one more time.
+ *
+ * For latency's sake, we do allow any thread calling squeue_enter
+ * to process its packet provided:
+ *
+ * 1) Nothing is queued
+ * 2) If more packets arrived in between, the non-worker threads are allowed
+ *    to do the drain till their time quantum expires, provided SQS_GET_PKTS
+ *    wasn't set in between.
+ *
+ * Avoiding deadlocks with interrupts
+ * ==================================
+ *
+ * One of the big problems is that we can't send the poll thread down while holding
+ * the sq_lock since the thread can block. So we drop the sq_lock before
+ * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
+ * poll thread is running so that no other thread can acquire the
+ * perimeter in between. If the squeue_drain gets done (no more work
+ * left), it leaves the SQS_PROC set if poll thread is running.
+ */
+
+/*
+ * This is the squeue poll thread. In poll mode, it polls the underlying
+ * TCP softring and feeds packets into the squeue. The worker thread then
+ * drains the squeue. The poll thread also responds to control signals for
+ * quiescing, restarting, or cleanup of an squeue. These are driven by
+ * control operations like plumb/unplumb or as a result of dynamic Rx ring
+ * related operations that are driven from the mac layer.
+ */
+static void
+squeue_polling_thread(squeue_t *sqp)
{
kmutex_t *lock = &sqp->sq_lock;
- kcondvar_t *async = &sqp->sq_async;
+ kcondvar_t *async = &sqp->sq_poll_cv;
+ ip_mac_rx_t sq_get_pkts;
+ ip_accept_t ip_accept;
+ ill_rx_ring_t *sq_rx_ring;
+ ill_t *sq_ill;
+ mblk_t *head, *tail, *mp;
+ uint_t cnt;
+ void *sq_mac_handle;
callb_cpr_t cprinfo;
- hrtime_t now;
-#if SQUEUE_PROFILE
- hrtime_t start;
-#endif
+ size_t bytes_to_pickup;
+ uint32_t ctl_state;
- CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca");
+ CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
mutex_enter(lock);
for (;;) {
- while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) {
- CALLB_CPR_SAFE_BEGIN(&cprinfo);
-still_wait:
- cv_wait(async, lock);
- if (sqp->sq_state & SQS_PROC) {
- goto still_wait;
- }
- CALLB_CPR_SAFE_END(&cprinfo, lock);
+ CALLB_CPR_SAFE_BEGIN(&cprinfo);
+ cv_wait(async, lock);
+ CALLB_CPR_SAFE_END(&cprinfo, lock);
+
+ ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
+ SQS_POLL_THR_QUIESCED);
+ if (ctl_state != 0) {
+ /*
+ * If the squeue is quiesced, then wait for a control
+ * request. A quiesced squeue must not poll the
+ * underlying soft ring.
+ */
+ if (ctl_state == SQS_POLL_THR_QUIESCED)
+ continue;
+ /*
+ * Act on control requests to quiesce, cleanup or
+ * restart an squeue
+ */
+ squeue_poll_thr_control(sqp);
+ continue;
}
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- start = gethrtime();
+ if (!(sqp->sq_state & SQS_POLL_CAPAB))
+ continue;
+
+ ASSERT((sqp->sq_state &
+ (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
+ (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
+
+poll_again:
+ sq_rx_ring = sqp->sq_rx_ring;
+ sq_get_pkts = sq_rx_ring->rr_rx;
+ sq_mac_handle = sq_rx_ring->rr_rx_handle;
+ ip_accept = sq_rx_ring->rr_ip_accept;
+ sq_ill = sq_rx_ring->rr_ill;
+ bytes_to_pickup = MAX_BYTES_TO_PICKUP;
+ mutex_exit(lock);
+ head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
+ mp = NULL;
+ if (head != NULL) {
+ /*
+			 * We got the packet chain from the mac layer. It
+			 * would be nice to be able to process it inline
+			 * for better performance, but we need to give
+			 * IP a chance to look at this chain, ensure
+			 * that the packets are really meant for this squeue,
+			 * and do the IP processing.
+ */
+ mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
+ &tail, &cnt);
}
-#endif
+ mutex_enter(lock);
+ if (mp != NULL)
+ ENQUEUE_CHAIN(sqp, mp, tail, cnt);
- ASSERT(squeue_workerdrain_ns != 0);
- now = gethrtime();
- sqp->sq_run = curthread;
- squeue_drain(sqp, SQS_WORKER, now + squeue_workerdrain_ns);
- sqp->sq_run = NULL;
+ ASSERT((sqp->sq_state &
+ (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
+ (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
- if (sqp->sq_first != NULL) {
+ if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
/*
- * Doing too much processing by worker thread
- * in presense of interrupts can be sub optimal.
- * Instead, once a drain is done by worker thread
- * for squeue_writerdrain_ns (the reason we are
- * here), we force wait for squeue_workerwait_tick
- * before doing more processing even if sq_wait is
- * set to 0.
- *
- * This can be counterproductive for performance
- * if worker thread is the only means to process
- * the packets (interrupts or writers are not
- * allowed inside the squeue).
+			 * We have packets to process and the worker thread
+			 * is not running. Check whether the poll thread is
+			 * allowed to process them. Let it do the processing
+			 * only if it picked up some packets from the NIC;
+			 * otherwise wake up the worker thread.
*/
- if (sqp->sq_tid == 0 &&
- !(sqp->sq_state & SQS_TMO_PROG)) {
- timeout_id_t tid;
+ if (mp != NULL) {
+ hrtime_t now;
+
+ now = gethrtime();
+ sqp->sq_run = curthread;
+ sqp->sq_drain(sqp, SQS_POLL_PROC, now +
+ squeue_drain_ns);
+ sqp->sq_run = NULL;
+
+ if (sqp->sq_first == NULL)
+ goto poll_again;
- sqp->sq_state |= SQS_TMO_PROG;
- mutex_exit(&sqp->sq_lock);
- tid = timeout(squeue_fire, sqp,
- squeue_workerwait_tick);
- mutex_enter(&sqp->sq_lock);
/*
- * Check again if we still need
- * the timeout
+				 * Couldn't do the entire drain because the
+				 * time limit expired; let the
+				 * worker thread take over.
*/
- if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC))
- == SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
- (sqp->sq_first != NULL)) {
- sqp->sq_state &= ~SQS_TMO_PROG;
- sqp->sq_awaken = lbolt;
- sqp->sq_tid = tid;
- } else if (sqp->sq_state & SQS_TMO_PROG) {
- /* timeout not needed */
- sqp->sq_state &= ~SQS_TMO_PROG;
- mutex_exit(&(sqp)->sq_lock);
- (void) untimeout(tid);
- mutex_enter(&sqp->sq_lock);
- }
}
- CALLB_CPR_SAFE_BEGIN(&cprinfo);
- cv_wait(async, lock);
- CALLB_CPR_SAFE_END(&cprinfo, lock);
- }
-
-#if SQUEUE_PROFILE
- if (SQ_PROFILING(sqp)) {
- SQDELTA(sqp, sq_time_worker, gethrtime() - start);
+ sqp->sq_awaken = lbolt;
+ /*
+			 * Set SQS_PROC_HELD so the worker
+			 * thread can distinguish where it's called from. We
+			 * could remove the SQS_PROC flag here and turn off
+			 * polling so that it wouldn't matter who does the
+			 * processing, but we get better performance this way
+			 * and save the cost of turning polling off and possibly
+			 * back on as soon as we start draining again.
+ *
+ * We can't remove the SQS_PROC flag without turning
+ * polling off until we can guarantee that control
+ * will return to squeue_drain immediately.
+ */
+ sqp->sq_state |= SQS_PROC_HELD;
+ sqp->sq_state &= ~SQS_GET_PKTS;
+ cv_signal(&sqp->sq_worker_cv);
+ } else if (sqp->sq_first == NULL &&
+ !(sqp->sq_state & SQS_WORKER)) {
+ /*
+ * Nothing queued and worker thread not running.
+			 * Since we hold SQS_PROC, no other thread is
+			 * processing the squeue. This means that there
+			 * is no work to be done and nothing is queued
+			 * in the squeue or in the NIC. Turn polling off and go
+ * back to interrupt mode.
+ */
+ sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
+ /* LINTED: constant in conditional context */
+ SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);
+ } else {
+ /*
+ * Worker thread is already running. We don't need
+ * to do anything. Indicate that poll thread is done.
+ */
+ sqp->sq_state &= ~SQS_GET_PKTS;
+ }
+ if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
+ /*
+			 * Act on control requests to quiesce, clean up, or
+			 * restart a squeue.
+ */
+ squeue_poll_thr_control(sqp);
}
-#endif
}
}
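SQS_POLLING_OFF, used above to fall back from poll mode to interrupt mode, is defined elsewhere in the tree. A hypothetical expansion, inferred only from the rr_intr_* fields that squeue_worker_thr_control clears further down, might look roughly like this; the _SKETCH name and the body are assumptions, not the real macro:

/*
 * Hypothetical sketch of what SQS_POLLING_OFF does: when the squeue is
 * poll capable, stop treating the ring as polled and re-enable the ring's
 * interrupt so packets are delivered through the interrupt path again.
 */
#define	SQS_POLLING_OFF_SKETCH(sqp, sq_poll_capable, rx_ring) {	\
	if (sq_poll_capable) {						\
		ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
		(sqp)->sq_state &= ~SQS_POLLING;			\
		(rx_ring)->rr_intr_enable((rx_ring)->rr_intr_handle);	\
	}								\
}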
-#if SQUEUE_PROFILE
-static int
-squeue_kstat_update(kstat_t *ksp, int rw)
+/*
+ * The squeue worker thread acts on any control requests to quiesce, clean up,
+ * or restart an ill_rx_ring_t by calling this function. The worker thread
+ * synchronizes with the squeue poll thread to complete the request and finally
+ * wakes up the requestor when the request is completed.
+ */
+static void
+squeue_worker_thr_control(squeue_t *sqp)
{
- struct squeue_kstat *sqsp = &squeue_kstat;
- squeue_t *sqp = ksp->ks_private;
+ ill_t *ill;
+ ill_rx_ring_t *rx_ring;
- if (rw == KSTAT_WRITE)
- return (EACCES);
+ ASSERT(MUTEX_HELD(&sqp->sq_lock));
-#if SQUEUE_DEBUG
- sqsp->sq_count.value.ui64 = sqp->sq_count;
- sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen;
-#endif
- sqsp->sq_npackets_worker.value.ui64 = sqp->sq_stats.sq_npackets_worker;
- sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr;
- sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other;
- sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr;
- sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other;
- sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker;
- sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr;
- sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other;
- sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker;
- sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr;
- sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other;
- return (0);
-}
-#endif
+ if (sqp->sq_state & SQS_POLL_RESTART) {
+ /* Restart implies a previous quiesce. */
+ ASSERT((sqp->sq_state & (SQS_PROC_HELD |
+ SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
+ (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
+ /*
+ * Request the squeue poll thread to restart and wait till
+ * it actually restarts.
+ */
+ sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
+ sqp->sq_state |= SQS_POLL_THR_RESTART;
+ cv_signal(&sqp->sq_poll_cv);
+ while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
+ cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
+ sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
+ SQS_WORKER);
+ /*
+ * Signal any waiter that is waiting for the restart
+ * to complete
+ */
+ sqp->sq_state |= SQS_POLL_RESTART_DONE;
+ cv_signal(&sqp->sq_ctrlop_done_cv);
+ return;
+ }
-void
-squeue_profile_enable(squeue_t *sqp)
-{
- mutex_enter(&sqp->sq_lock);
- sqp->sq_state |= SQS_PROFILE;
- mutex_exit(&sqp->sq_lock);
-}
+ if (sqp->sq_state & SQS_PROC_HELD) {
+ /* The squeue poll thread handed control to us */
+ ASSERT(sqp->sq_state & SQS_PROC);
+ }
-void
-squeue_profile_disable(squeue_t *sqp)
-{
- mutex_enter(&sqp->sq_lock);
- sqp->sq_state &= ~SQS_PROFILE;
+ /*
+	 * Prevent any other thread from processing the squeue
+	 * until we finish the control actions by setting SQS_PROC.
+	 * But allow ourselves to re-enter by setting SQS_WORKER.
+ */
+ sqp->sq_state |= (SQS_PROC | SQS_WORKER);
+
+ /* Signal the squeue poll thread and wait for it to quiesce itself */
+ if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
+ sqp->sq_state |= SQS_POLL_THR_QUIESCE;
+ cv_signal(&sqp->sq_poll_cv);
+ while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
+ cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
+ }
+
+ rx_ring = sqp->sq_rx_ring;
+ ill = rx_ring->rr_ill;
+ /*
+ * The lock hierarchy is as follows.
+ * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
+ */
mutex_exit(&sqp->sq_lock);
-}
+ mutex_enter(&ill->ill_lock);
+ mutex_enter(&sqp->sq_lock);
-void
-squeue_profile_reset(squeue_t *sqp)
-{
-#if SQUEUE_PROFILE
- bzero(&sqp->sq_stats, sizeof (sqstat_t));
-#endif
-}
+ SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
+ sqp->sq_rx_ring);
+ sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
+ if (sqp->sq_state & SQS_POLL_CLEANUP) {
+ /*
+ * Disassociate this squeue from its ill_rx_ring_t.
+		 * The rr_sqp and sq_rx_ring fields are protected by the
+		 * corresponding squeue, the ill_lock, and the sq_lock. Holding
+		 * any of them ensures that the ring-to-squeue mapping does
+		 * not change.
+ */
+ ASSERT(!(sqp->sq_state & SQS_DEFAULT));
-void
-squeue_profile_start(void)
-{
-#if SQUEUE_PROFILE
- squeue_profile = B_TRUE;
-#endif
+ sqp->sq_rx_ring = NULL;
+ rx_ring->rr_sqp = NULL;
+
+ sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
+ SQS_POLL_QUIESCE_DONE);
+ sqp->sq_ill = NULL;
+
+ rx_ring->rr_rx_handle = NULL;
+ rx_ring->rr_intr_handle = NULL;
+ rx_ring->rr_intr_enable = NULL;
+ rx_ring->rr_intr_disable = NULL;
+ sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
+ } else {
+ sqp->sq_state &= ~SQS_POLL_QUIESCE;
+ sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
+ }
+ /*
+ * Signal any waiter that is waiting for the quiesce or cleanup
+ * to complete and also wait for it to actually see and reset the
+ * SQS_POLL_CLEANUP_DONE.
+ */
+ cv_signal(&sqp->sq_ctrlop_done_cv);
+ mutex_exit(&ill->ill_lock);
+ if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
+ cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
+ sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
+ }
}
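The lock hierarchy noted in squeue_worker_thr_control above (cpu_lock -> ill_lock -> sqset_lock -> sq_lock) is why the function briefly drops sq_lock before taking ill_lock. A minimal, generic sketch of that drop-and-reacquire pattern, with hypothetical lock names:

/*
 * Sketch: acquiring a lock that sits above an already-held lock in the
 * hierarchy.  Drop the inner lock, take both in hierarchy order, then
 * re-validate any state that was observed before the inner lock was
 * dropped, since it may have changed in the window.
 */
static void
lock_outer_then_inner(kmutex_t *outer, kmutex_t *inner)
{
	ASSERT(MUTEX_HELD(inner));
	mutex_exit(inner);
	mutex_enter(outer);
	mutex_enter(inner);
	/* ... re-check state guarded by 'inner' before acting on it ... */
}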
-void
-squeue_profile_stop(void)
+static void
+squeue_worker(squeue_t *sqp)
{
-#if SQUEUE_PROFILE
- squeue_profile = B_FALSE;
-#endif
+ kmutex_t *lock = &sqp->sq_lock;
+ kcondvar_t *async = &sqp->sq_worker_cv;
+ callb_cpr_t cprinfo;
+ hrtime_t now;
+
+ CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
+ mutex_enter(lock);
+
+ for (;;) {
+ for (;;) {
+ /*
+			 * If the poll thread has handed control to us,
+			 * we need to break out of the wait.
+ */
+ if (sqp->sq_state & SQS_PROC_HELD)
+ break;
+
+ /*
+			 * If the squeue is not being processed, and we either
+			 * have messages to drain or some thread has signaled
+			 * some control activity, we need to break out.
+ */
+ if (!(sqp->sq_state & SQS_PROC) &&
+ ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
+ (sqp->sq_first != NULL)))
+ break;
+
+ /*
+			 * If we have started some control action, check
+			 * for the SQS_WORKER flag (since we don't
+			 * release the squeue) to make sure we own the squeue,
+			 * and break out.
+ */
+ if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
+ (sqp->sq_state & SQS_WORKER))
+ break;
+
+ CALLB_CPR_SAFE_BEGIN(&cprinfo);
+ cv_wait(async, lock);
+ CALLB_CPR_SAFE_END(&cprinfo, lock);
+ }
+ if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
+ squeue_worker_thr_control(sqp);
+ continue;
+ }
+ ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
+ SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
+ SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));
+
+ if (sqp->sq_state & SQS_PROC_HELD)
+ sqp->sq_state &= ~SQS_PROC_HELD;
+
+ now = gethrtime();
+ sqp->sq_run = curthread;
+ sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
+ sqp->sq_run = NULL;
+ }
}
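The requestor side of these control operations, which blocks on sq_ctrlop_done_cv, is outside this hunk. Judging only from the flags and condvars used above, it presumably looks roughly like the following; the function name, and the assumption that SQS_POLL_QUIESCE is covered by the SQS_WORKER_THR_CONTROL mask the worker checks, are guesses:

/*
 * Hypothetical sketch of a control requestor (e.g. a quiesce driven by an
 * unplumb).  Not the actual requestor, which lives outside this hunk.
 */
static void
squeue_request_quiesce_sketch(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	/* Ask the worker thread to run squeue_worker_thr_control(). */
	sqp->sq_state |= SQS_POLL_QUIESCE;
	cv_signal(&sqp->sq_worker_cv);

	/* Wait for the worker to report that the quiesce has completed. */
	while (!(sqp->sq_state & SQS_POLL_QUIESCE_DONE))
		cv_wait(&sqp->sq_ctrlop_done_cv, &sqp->sq_lock);
	mutex_exit(&sqp->sq_lock);
}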
uintptr_t *
@@ -1482,9 +1240,3 @@ squeue_getprivate(squeue_t *sqp, sqprivate_t p)
return (&sqp->sq_private[p]);
}
-
-processorid_t
-squeue_binding(squeue_t *sqp)
-{
- return (sqp->sq_bind);
-}