Diffstat (limited to 'usr/src/uts/common/inet/squeue.c')
-rw-r--r--   usr/src/uts/common/inet/squeue.c | 1970
1 file changed, 861 insertions, 1109 deletions
diff --git a/usr/src/uts/common/inet/squeue.c b/usr/src/uts/common/inet/squeue.c index 4895e2249e..559abd9178 100644 --- a/usr/src/uts/common/inet/squeue.c +++ b/usr/src/uts/common/inet/squeue.c @@ -19,144 +19,95 @@ * CDDL HEADER END */ /* - * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + * Copyright 2008 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ -#pragma ident "%Z%%M% %I% %E% SMI" - /* - * Squeues - TCP/IP serialization mechanism. - * - * This is a general purpose high-performance serialization mechanism. It is - * similar to a taskq with a single worker thread, the difference is that it - * does not imply a context switch - the thread placing a request may actually - * process it. It is also biased for processing requests in interrupt context. - * - * Each squeue has a worker thread which may optionally be bound to a CPU. - * - * Only one thread may process requests from a given squeue at any time. This is - * called "entering" squeue. - * - * Each dispatched request is processed either by - * - * a) Dispatching thread or - * b) Some other thread that is currently processing squeue at the time of - * request or - * c) worker thread. - * - * INTERFACES: - * - * squeue_t *squeue_create(name, bind, wait, pri) - * - * name: symbolic name for squeue. - * wait: time to wait before waiking the worker thread after queueing - * request. - * bind: preferred CPU binding for the worker thread. - * pri: thread priority for the worker thread. - * - * This function never fails and may sleep. It returns a transparent pointer - * to the squeue_t structure that is passed to all other squeue operations. - * - * void squeue_bind(sqp, bind) - * - * Bind squeue worker thread to a CPU specified by the 'bind' argument. The - * 'bind' value of -1 binds to the preferred thread specified for - * squeue_create. - * - * NOTE: Any value of 'bind' other then -1 is not supported currently, but the - * API is present - in the future it may be useful to specify different - * binding. - * - * void squeue_unbind(sqp) - * - * Unbind the worker thread from its preferred CPU. - * - * void squeue_enter(*sqp, *mp, proc, arg, tag) - * - * Post a single request for processing. Each request consists of mblock 'mp', - * function 'proc' to execute and an argument 'arg' to pass to this - * function. The function is called as (*proc)(arg, mp, sqp); The tag is an - * arbitrary number from 0 to 255 which will be stored in mp to track exact - * caller of squeue_enter. The combination of function name and the tag should - * provide enough information to identify the caller. - * - * If no one is processing the squeue, squeue_enter() will call the function - * immediately. Otherwise it will add the request to the queue for later - * processing. Once the function is executed, the thread may continue - * executing all other requests pending on the queue. + * Squeues: General purpose serialization mechanism + * ------------------------------------------------ * - * NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1. - * NOTE: The argument can be conn_t only. Ideally we'd like to have generic - * argument, but we want to drop connection reference count here - this - * improves tail-call optimizations. - * XXX: The arg should have type conn_t. + * Background: + * ----------- * - * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag) + * This is a general purpose high-performance serialization mechanism + * currently used by TCP/IP. 
It is implement by means of a per CPU queue, + * a worker thread and a polling thread with are bound to the CPU + * associated with the squeue. The squeue is strictly FIFO for both read + * and write side and only one thread can process it at any given time. + * The design goal of squeue was to offer a very high degree of + * parallelization (on a per H/W execution pipeline basis) with at + * most one queuing. * - * Same as squeue_enter(), but the entering thread will only try to execute a - * single request. It will not continue executing any pending requests. + * The modules needing protection typically calls squeue_enter() or + * squeue_enter_chain() routine as soon as a thread enter the module + * from either direction. For each packet, the processing function + * and argument is stored in the mblk itself. When the packet is ready + * to be processed, the squeue retrieves the stored function and calls + * it with the supplied argument and the pointer to the packet itself. + * The called function can assume that no other thread is processing + * the squeue when it is executing. * - * void squeue_fill(*sqp, *mp, proc, arg, tag) + * Squeue/connection binding: + * -------------------------- * - * Just place the request on the queue without trying to execute it. Arrange - * for the worker thread to process the request. + * TCP/IP uses an IP classifier in conjunction with squeue where specific + * connections are assigned to specific squeue (based on various policies), + * at the connection creation time. Once assigned, the connection to + * squeue mapping is never changed and all future packets for that + * connection are processed on that squeue. The connection ("conn") to + * squeue mapping is stored in "conn_t" member "conn_sqp". * - * void squeue_profile_enable(sqp) - * void squeue_profile_disable(sqp) + * Since the processing of the connection cuts across multiple layers + * but still allows packets for different connnection to be processed on + * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or + * "Per Connection Vertical Perimeter". * - * Enable or disable profiling for specified 'sqp'. Profiling is only - * available when SQUEUE_PROFILE is set. + * Processing Model: + * ----------------- * - * void squeue_profile_reset(sqp) + * Squeue doesn't necessary processes packets with its own worker thread. + * The callers can pick if they just want to queue the packet, process + * their packet if nothing is queued or drain and process. The first two + * modes are typically employed when the packet was generated while + * already doing the processing behind the squeue and last mode (drain + * and process) is typically employed when the thread is entering squeue + * for the first time. The squeue still imposes a finite time limit + * for which a external thread can do processing after which it switches + * processing to its own worker thread. * - * Reset all profiling information to zero. Profiling is only - * available when SQUEUE_PROFILE is set. + * Once created, squeues are never deleted. Hence squeue pointers are + * always valid. This means that functions outside the squeue can still + * refer safely to conn_sqp and their is no need for ref counts. * - * void squeue_profile_start() - * void squeue_profile_stop() + * Only a thread executing in the squeue can change the squeue of the + * connection. It does so by calling a squeue framework function to do this. + * After changing the squeue, the thread must leave the squeue. 
It must not + * continue to execute any code that needs squeue protection. * - * Globally enable or disabled profiling for all squeues. + * The squeue framework, after entering the squeue, checks if the current + * squeue matches the conn_sqp. If the check fails, the packet is delivered + * to right squeue. * - * uintptr_t *squeue_getprivate(sqp, p) + * Polling Model: + * -------------- * - * Each squeue keeps small amount of private data space available for various - * consumers. Current consumers include TCP and NCA. Other consumers need to - * add their private tag to the sqprivate_t enum. The private information is - * limited to an uintptr_t value. The squeue has no knowledge of its content - * and does not manage it in any way. + * Squeues can control the rate of packet arrival into itself from the + * NIC or specific Rx ring within a NIC. As part of capability negotiation + * between IP and MAC layer, squeue are created for each TCP soft ring + * (or TCP Rx ring - to be implemented in future). As part of this + * negotiation, squeues get a cookie for underlying soft ring or Rx + * ring, a function to turn off incoming packets and a function to call + * to poll for packets. This helps schedule the receive side packet + * processing so that queue backlog doesn't build up and packet processing + * doesn't keep getting disturbed by high priority interrupts. As part + * of this mode, as soon as a backlog starts building, squeue turns off + * the interrupts and switches to poll mode. In poll mode, when poll + * thread goes down to retrieve packets, it retrieves them in the form of + * a chain which improves performance even more. As the squeue/softring + * system gets more packets, it gets more efficient by switching to + * polling more often and dealing with larger packet chains. * - * The typical use may be a breakdown of data structures per CPU (since - * squeues are usually per CPU). See NCA for examples of use. - * Currently 'p' may have one legal value SQPRIVATE_TCP. - * - * processorid_t squeue_binding(sqp) - * - * Returns the CPU binding for a given squeue. - * - * TUNABALES: - * - * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any - * squeue. Note that this is approximation - squeues have no control on the - * time it takes to process each request. This limit is only checked - * between processing individual messages. - * Default: 20 ms. - * - * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any - * squeue. Note that this is approximation - squeues have no control on the - * time it takes to process each request. This limit is only checked - * between processing individual messages. - * Default: 10 ms. - * - * squeue_workerdrain_ms: Maximum time in ms worker thread spends draining any - * squeue. Note that this is approximation - squeues have no control on the - * time it takes to process each request. This limit is only checked - * between processing individual messages. - * Default: 10 ms. - * - * squeue_workerwait_ms: When worker thread is interrupted because workerdrain - * expired, how much time to wait before waking worker thread again. - * Default: 10 ms. */ #include <sys/types.h> @@ -169,208 +120,30 @@ #include <sys/callb.h> #include <sys/sdt.h> #include <sys/ddi.h> +#include <sys/sunddi.h> #include <inet/ipclassifier.h> #include <inet/udp_impl.h> -/* - * State flags. - * Note: The MDB IP module depends on the values of these flags. 
- */ -#define SQS_PROC 0x0001 /* being processed */ -#define SQS_WORKER 0x0002 /* worker thread */ -#define SQS_ENTER 0x0004 /* enter thread */ -#define SQS_FAST 0x0008 /* enter-fast thread */ -#define SQS_USER 0x0010 /* A non interrupt user */ -#define SQS_BOUND 0x0020 /* Worker thread is bound */ -#define SQS_PROFILE 0x0040 /* Enable profiling */ -#define SQS_REENTER 0x0080 /* Re entered thread */ -#define SQS_TMO_PROG 0x0100 /* Timeout is being set */ - #include <sys/squeue_impl.h> static void squeue_fire(void *); static void squeue_drain(squeue_t *, uint_t, hrtime_t); static void squeue_worker(squeue_t *sqp); - -#if SQUEUE_PROFILE -static kmutex_t squeue_kstat_lock; -static int squeue_kstat_update(kstat_t *, int); -#endif +static void squeue_polling_thread(squeue_t *sqp); kmem_cache_t *squeue_cache; #define SQUEUE_MSEC_TO_NSEC 1000000 -int squeue_intrdrain_ms = 20; -int squeue_writerdrain_ms = 10; -int squeue_workerdrain_ms = 10; -int squeue_workerwait_ms = 10; +int squeue_drain_ms = 20; +int squeue_workerwait_ms = 0; /* The values above converted to ticks or nano seconds */ -static int squeue_intrdrain_ns = 0; -static int squeue_writerdrain_ns = 0; -static int squeue_workerdrain_ns = 0; +static int squeue_drain_ns = 0; static int squeue_workerwait_tick = 0; -/* - * The minimum packet queued when worker thread doing the drain triggers - * polling (if squeue allows it). The choice of 3 is arbitrary. You - * definitely don't want it to be 1 since that will trigger polling - * on very low loads as well (ssh seems to do be one such example - * where packet flow was very low yet somehow 1 packet ended up getting - * queued and worker thread fires every 10ms and blanking also gets - * triggered. - */ -int squeue_worker_poll_min = 3; - -#if SQUEUE_PROFILE -/* - * Set to B_TRUE to enable profiling. - */ -static int squeue_profile = B_FALSE; -#define SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE)) - -#define SQSTAT(sqp, x) ((sqp)->sq_stats.x++) -#define SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d)) - -struct squeue_kstat { - kstat_named_t sq_count; - kstat_named_t sq_max_qlen; - kstat_named_t sq_npackets_worker; - kstat_named_t sq_npackets_intr; - kstat_named_t sq_npackets_other; - kstat_named_t sq_nqueued_intr; - kstat_named_t sq_nqueued_other; - kstat_named_t sq_ndrains_worker; - kstat_named_t sq_ndrains_intr; - kstat_named_t sq_ndrains_other; - kstat_named_t sq_time_worker; - kstat_named_t sq_time_intr; - kstat_named_t sq_time_other; -} squeue_kstat = { - { "count", KSTAT_DATA_UINT64 }, - { "max_qlen", KSTAT_DATA_UINT64 }, - { "packets_worker", KSTAT_DATA_UINT64 }, - { "packets_intr", KSTAT_DATA_UINT64 }, - { "packets_other", KSTAT_DATA_UINT64 }, - { "queued_intr", KSTAT_DATA_UINT64 }, - { "queued_other", KSTAT_DATA_UINT64 }, - { "ndrains_worker", KSTAT_DATA_UINT64 }, - { "ndrains_intr", KSTAT_DATA_UINT64 }, - { "ndrains_other", KSTAT_DATA_UINT64 }, - { "time_worker", KSTAT_DATA_UINT64 }, - { "time_intr", KSTAT_DATA_UINT64 }, - { "time_other", KSTAT_DATA_UINT64 }, -}; -#endif - -#define SQUEUE_WORKER_WAKEUP(sqp) { \ - timeout_id_t tid = (sqp)->sq_tid; \ - \ - ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ - /* \ - * Queue isn't being processed, so take \ - * any post enqueue actions needed before leaving. \ - */ \ - if (tid != 0) { \ - /* \ - * Waiting for an enter() to process mblk(s). \ - */ \ - clock_t waited = lbolt - (sqp)->sq_awaken; \ - \ - if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) { \ - /* \ - * Times up and have a worker thread \ - * waiting for work, so schedule it. 
\ - */ \ - (sqp)->sq_tid = 0; \ - (sqp)->sq_awaken = lbolt; \ - cv_signal(&(sqp)->sq_async); \ - mutex_exit(&(sqp)->sq_lock); \ - (void) untimeout(tid); \ - return; \ - } \ - mutex_exit(&(sqp)->sq_lock); \ - return; \ - } else if ((sqp)->sq_state & SQS_TMO_PROG) { \ - mutex_exit(&(sqp)->sq_lock); \ - return; \ - } else if ((sqp)->sq_wait != 0) { \ - clock_t wait = (sqp)->sq_wait; \ - /* \ - * Wait up to sqp->sq_wait ms for an \ - * enter() to process this queue. We \ - * don't want to contend on timeout locks \ - * with sq_lock held for performance reasons, \ - * so drop the sq_lock before calling timeout \ - * but we need to check if timeout is required \ - * after re acquiring the sq_lock. Once \ - * the sq_lock is dropped, someone else could \ - * have processed the packet or the timeout could \ - * have already fired. \ - */ \ - (sqp)->sq_state |= SQS_TMO_PROG; \ - mutex_exit(&(sqp)->sq_lock); \ - tid = timeout(squeue_fire, (sqp), wait); \ - mutex_enter(&(sqp)->sq_lock); \ - /* Check again if we still need the timeout */ \ - if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) == \ - SQS_TMO_PROG) && ((sqp)->sq_tid == 0) && \ - ((sqp)->sq_first != NULL)) { \ - (sqp)->sq_state &= ~SQS_TMO_PROG; \ - (sqp)->sq_awaken = lbolt; \ - (sqp)->sq_tid = tid; \ - mutex_exit(&(sqp)->sq_lock); \ - return; \ - } else { \ - if ((sqp)->sq_state & SQS_TMO_PROG) { \ - (sqp)->sq_state &= ~SQS_TMO_PROG; \ - mutex_exit(&(sqp)->sq_lock); \ - (void) untimeout(tid); \ - } else { \ - /* \ - * The timer fired before we could \ - * reacquire the sq_lock. squeue_fire \ - * removes the SQS_TMO_PROG flag \ - * and we don't need to do anything \ - * else. \ - */ \ - mutex_exit(&(sqp)->sq_lock); \ - } \ - } \ - } else { \ - /* \ - * Schedule the worker thread. \ - */ \ - (sqp)->sq_awaken = lbolt; \ - cv_signal(&(sqp)->sq_async); \ - mutex_exit(&(sqp)->sq_lock); \ - } \ - ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock)); \ -} - -#define ENQUEUE_MP(sqp, mp, proc, arg) { \ - /* \ - * Enque our mblk. 
\ - */ \ - (mp)->b_queue = NULL; \ - ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ - ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL); \ - (mp)->b_queue = (queue_t *)(proc); \ - (mp)->b_prev = (mblk_t *)(arg); \ - \ - if ((sqp)->sq_last != NULL) \ - (sqp)->sq_last->b_next = (mp); \ - else \ - (sqp)->sq_first = (mp); \ - (sqp)->sq_last = (mp); \ - (sqp)->sq_count++; \ - ASSERT((sqp)->sq_count > 0); \ - DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp, \ - mblk_t *, mp); \ -} - +#define MAX_BYTES_TO_PICKUP 150000 #define ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \ /* \ @@ -390,89 +163,120 @@ struct squeue_kstat { \ } -#define SQS_POLLING_ON(sqp, rx_ring) { \ - ASSERT(rx_ring != NULL); \ +#define SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) { \ ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ - rx_ring->rr_blank(rx_ring->rr_handle, \ - MIN((sqp->sq_avg_drain_time * sqp->sq_count), \ - rx_ring->rr_max_blank_time), \ - rx_ring->rr_max_pkt_cnt); \ - rx_ring->rr_poll_state |= ILL_POLLING; \ - rx_ring->rr_poll_time = lbolt; \ + if (sq_poll_capable) { \ + ASSERT(rx_ring != NULL); \ + ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ + if (!(sqp->sq_state & SQS_POLLING)) { \ + sqp->sq_state |= SQS_POLLING; \ + rx_ring->rr_intr_disable(rx_ring->rr_intr_handle); \ + } \ + } \ } +#define SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) { \ + ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ + if (sq_poll_capable) { \ + ASSERT(rx_ring != NULL); \ + ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ + if (sqp->sq_state & SQS_POLLING) { \ + sqp->sq_state &= ~SQS_POLLING; \ + rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \ + } \ + } \ +} -#define SQS_POLLING_OFF(sqp, rx_ring) { \ - ASSERT(rx_ring != NULL); \ +#define SQS_POLL_RING(sqp, sq_poll_capable) { \ ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \ - rx_ring->rr_blank(rx_ring->rr_handle, \ - rx_ring->rr_min_blank_time, \ - rx_ring->rr_min_pkt_cnt); \ + if (sq_poll_capable) { \ + ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \ + if (!(sqp->sq_state & SQS_GET_PKTS)) { \ + sqp->sq_state |= SQS_GET_PKTS; \ + cv_signal(&sqp->sq_poll_cv); \ + } \ + } \ } +#ifdef DEBUG +#define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) { \ + (sqp)->sq_curmp = (mp); \ + (sqp)->sq_curproc = (proc); \ + (sqp)->sq_connp = (connp); \ + (mp)->b_tag = (sqp)->sq_tag = (tag); \ +} + +#define SQUEUE_DBG_CLEAR(sqp) { \ + (sqp)->sq_curmp = NULL; \ + (sqp)->sq_curproc = NULL; \ + (sqp)->sq_connp = NULL; \ +} +#else +#define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) +#define SQUEUE_DBG_CLEAR(sqp) +#endif + void squeue_init(void) { squeue_cache = kmem_cache_create("squeue_cache", sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0); - squeue_intrdrain_ns = squeue_intrdrain_ms * SQUEUE_MSEC_TO_NSEC; - squeue_writerdrain_ns = squeue_writerdrain_ms * SQUEUE_MSEC_TO_NSEC; - squeue_workerdrain_ns = squeue_workerdrain_ms * SQUEUE_MSEC_TO_NSEC; + squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC; squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms); } /* ARGSUSED */ squeue_t * -squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri) +squeue_create(clock_t wait, pri_t pri) { squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP); bzero(sqp, sizeof (squeue_t)); - (void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1); - sqp->sq_name[SQ_NAMELEN] = '\0'; - - sqp->sq_bind = bind; + sqp->sq_bind = PBIND_NONE; + sqp->sq_priority = pri; sqp->sq_wait = MSEC_TO_TICK(wait); - sqp->sq_avg_drain_time = - drv_hztousec(NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns)) / - NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns); - -#if SQUEUE_PROFILE - if 
((sqp->sq_kstat = kstat_create("ip", bind, name, - "net", KSTAT_TYPE_NAMED, - sizeof (squeue_kstat) / sizeof (kstat_named_t), - KSTAT_FLAG_VIRTUAL)) != NULL) { - sqp->sq_kstat->ks_lock = &squeue_kstat_lock; - sqp->sq_kstat->ks_data = &squeue_kstat; - sqp->sq_kstat->ks_update = squeue_kstat_update; - sqp->sq_kstat->ks_private = sqp; - kstat_install(sqp->sq_kstat); - } -#endif - sqp->sq_worker = thread_create(NULL, 0, squeue_worker, sqp, 0, &p0, TS_RUN, pri); + sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread, + sqp, 0, &p0, TS_RUN, pri); + + sqp->sq_enter = squeue_enter; + sqp->sq_drain = squeue_drain; + return (sqp); } -/* ARGSUSED */ +/* + * Bind squeue worker thread to the specified CPU, given by CPU id. + * If the CPU id value is -1, bind the worker thread to the value + * specified in sq_bind field. If a thread is already bound to a + * different CPU, unbind it from the old CPU and bind to the new one. + */ + void squeue_bind(squeue_t *sqp, processorid_t bind) { - ASSERT(bind == -1); - mutex_enter(&sqp->sq_lock); + ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE); + ASSERT(MUTEX_HELD(&cpu_lock)); + if (sqp->sq_state & SQS_BOUND) { - mutex_exit(&sqp->sq_lock); - return; + if (sqp->sq_bind == bind) { + mutex_exit(&sqp->sq_lock); + return; + } + thread_affinity_clear(sqp->sq_worker); + } else { + sqp->sq_state |= SQS_BOUND; } - sqp->sq_state |= SQS_BOUND; - mutex_exit(&sqp->sq_lock); + if (bind != PBIND_NONE) + sqp->sq_bind = bind; thread_affinity_set(sqp->sq_worker, sqp->sq_bind); + mutex_exit(&sqp->sq_lock); } void @@ -485,9 +289,98 @@ squeue_unbind(squeue_t *sqp) } sqp->sq_state &= ~SQS_BOUND; + thread_affinity_clear(sqp->sq_worker); mutex_exit(&sqp->sq_lock); +} - thread_affinity_clear(sqp->sq_worker); +void +squeue_worker_wakeup(squeue_t *sqp) +{ + timeout_id_t tid = (sqp)->sq_tid; + + ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); + + if (sqp->sq_wait == 0) { + ASSERT(tid == 0); + ASSERT(!(sqp->sq_state & SQS_TMO_PROG)); + sqp->sq_awaken = lbolt; + cv_signal(&sqp->sq_worker_cv); + mutex_exit(&sqp->sq_lock); + return; + } + + /* + * Queue isn't being processed, so take + * any post enqueue actions needed before leaving. + */ + if (tid != 0) { + /* + * Waiting for an enter() to process mblk(s). + */ + clock_t waited = lbolt - sqp->sq_awaken; + + if (TICK_TO_MSEC(waited) >= sqp->sq_wait) { + /* + * Times up and have a worker thread + * waiting for work, so schedule it. + */ + sqp->sq_tid = 0; + sqp->sq_awaken = lbolt; + cv_signal(&sqp->sq_worker_cv); + mutex_exit(&sqp->sq_lock); + (void) untimeout(tid); + return; + } + mutex_exit(&sqp->sq_lock); + return; + } else if (sqp->sq_state & SQS_TMO_PROG) { + mutex_exit(&sqp->sq_lock); + return; + } else { + clock_t wait = sqp->sq_wait; + /* + * Wait up to sqp->sq_wait ms for an + * enter() to process this queue. We + * don't want to contend on timeout locks + * with sq_lock held for performance reasons, + * so drop the sq_lock before calling timeout + * but we need to check if timeout is required + * after re acquiring the sq_lock. Once + * the sq_lock is dropped, someone else could + * have processed the packet or the timeout could + * have already fired. 
+ */ + sqp->sq_state |= SQS_TMO_PROG; + mutex_exit(&sqp->sq_lock); + tid = timeout(squeue_fire, sqp, wait); + mutex_enter(&sqp->sq_lock); + /* Check again if we still need the timeout */ + if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) == + SQS_TMO_PROG) && (sqp->sq_tid == 0) && + (sqp->sq_first != NULL)) { + sqp->sq_state &= ~SQS_TMO_PROG; + sqp->sq_tid = tid; + mutex_exit(&sqp->sq_lock); + return; + } else { + if (sqp->sq_state & SQS_TMO_PROG) { + sqp->sq_state &= ~SQS_TMO_PROG; + mutex_exit(&sqp->sq_lock); + (void) untimeout(tid); + } else { + /* + * The timer fired before we could + * reacquire the sq_lock. squeue_fire + * removes the SQS_TMO_PROG flag + * and we don't need to do anything + * else. + */ + mutex_exit(&sqp->sq_lock); + } + } + } + + ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); } /* @@ -500,18 +393,20 @@ squeue_unbind(squeue_t *sqp) * * The proc and arg for each mblk is already stored in the mblk in * appropriate places. + * + * The process_flag specifies if we are allowed to process the mblk + * and drain in the entering thread context. If process_flag is + * SQ_FILL, then we just queue the mblk and return (after signaling + * the worker thread if no one else is processing the squeue). */ +/* ARGSUSED */ void -squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail, - uint32_t cnt, uint8_t tag) +squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt, + int process_flag, uint8_t tag) { - int interrupt = servicing_interrupt(); - void *arg; + conn_t *connp; sqproc_t proc; hrtime_t now; -#if SQUEUE_PROFILE - hrtime_t start, delta; -#endif ASSERT(sqp != NULL); ASSERT(mp != NULL); @@ -520,355 +415,111 @@ squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail, ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); mutex_enter(&sqp->sq_lock); - if (!(sqp->sq_state & SQS_PROC)) { + + /* + * Try to process the packet if SQ_FILL flag is not set and + * we are allowed to process the squeue. The SQ_NODRAIN is + * ignored if the packet chain consists of more than 1 packet. + */ + if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) || + (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) { /* * See if anything is already queued. If we are the * first packet, do inline processing else queue the * packet and do the drain. */ - sqp->sq_run = curthread; if (sqp->sq_first == NULL && cnt == 1) { /* * Fast-path, ok to process and nothing queued. */ sqp->sq_state |= (SQS_PROC|SQS_FAST); + sqp->sq_run = curthread; mutex_exit(&sqp->sq_lock); /* * We are the chain of 1 packet so * go through this fast path. 
*/ - arg = mp->b_prev; + ASSERT(mp->b_prev != NULL); + ASSERT(mp->b_queue != NULL); + connp = (conn_t *)mp->b_prev; mp->b_prev = NULL; proc = (sqproc_t)mp->b_queue; mp->b_queue = NULL; - - ASSERT(proc != NULL); - ASSERT(arg != NULL); + ASSERT(proc != NULL && connp != NULL); ASSERT(mp->b_next == NULL); -#if SQUEUE_DEBUG - sqp->sq_isintr = interrupt; - sqp->sq_curmp = mp; - sqp->sq_curproc = proc; - sqp->sq_connp = arg; - mp->b_tag = sqp->sq_tag = tag; -#endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (interrupt) - SQSTAT(sqp, sq_npackets_intr); - else - SQSTAT(sqp, sq_npackets_other); - start = gethrtime(); - } -#endif - ((conn_t *)arg)->conn_on_sqp = B_TRUE; - DTRACE_PROBE3(squeue__proc__start, squeue_t *, - sqp, mblk_t *, mp, conn_t *, arg); - (*proc)(arg, mp, sqp); - DTRACE_PROBE2(squeue__proc__end, squeue_t *, - sqp, conn_t *, arg); - ((conn_t *)arg)->conn_on_sqp = B_FALSE; - -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - delta = gethrtime() - start; - if (interrupt) - SQDELTA(sqp, sq_time_intr, delta); - else - SQDELTA(sqp, sq_time_other, delta); - } -#endif -#if SQUEUE_DEBUG - sqp->sq_curmp = NULL; - sqp->sq_curproc = NULL; - sqp->sq_connp = NULL; - sqp->sq_isintr = 0; -#endif - - CONN_DEC_REF((conn_t *)arg); - ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); - mutex_enter(&sqp->sq_lock); - sqp->sq_state &= ~(SQS_PROC|SQS_FAST); - if (sqp->sq_first == NULL) { - /* - * We processed inline our packet and - * nothing new has arrived. We are done. - */ - sqp->sq_run = NULL; - mutex_exit(&sqp->sq_lock); - return; - } else if (sqp->sq_bind != CPU->cpu_id) { - /* - * If the current thread is not running - * on the CPU to which this squeue is bound, - * then don't allow it to drain. - */ - sqp->sq_run = NULL; - SQUEUE_WORKER_WAKEUP(sqp); - return; - } - } else { - ENQUEUE_CHAIN(sqp, mp, tail, cnt); -#if SQUEUE_DEBUG - mp->b_tag = tag; -#endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (servicing_interrupt()) - SQSTAT(sqp, sq_nqueued_intr); - else - SQSTAT(sqp, sq_nqueued_other); - if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) - sqp->sq_stats.sq_max_qlen = - sqp->sq_count; - } -#endif - } - - /* - * We are here because either we couldn't do inline - * processing (because something was already queued), - * or we had a chanin of more than one packet, - * or something else arrived after we were done with - * inline processing. - */ - ASSERT(MUTEX_HELD(&sqp->sq_lock)); - ASSERT(sqp->sq_first != NULL); - -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - start = gethrtime(); - } -#endif -#if SQUEUE_DEBUG - sqp->sq_isintr = interrupt; -#endif - - now = gethrtime(); - if (interrupt) { - squeue_drain(sqp, SQS_ENTER, now + - squeue_intrdrain_ns); - } else { - squeue_drain(sqp, SQS_USER, now + - squeue_writerdrain_ns); - } - -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - delta = gethrtime() - start; - if (interrupt) - SQDELTA(sqp, sq_time_intr, delta); - else - SQDELTA(sqp, sq_time_other, delta); - } -#endif -#if SQUEUE_DEBUG - sqp->sq_isintr = 0; -#endif - - /* - * If we didn't do a complete drain, the worker - * thread was already signalled by squeue_drain. - */ - sqp->sq_run = NULL; - mutex_exit(&sqp->sq_lock); - return; - } else { - ASSERT(sqp->sq_run != NULL); - /* - * Queue is already being processed. Just enqueue - * the packet and go away. 
- */ -#if SQUEUE_DEBUG - mp->b_tag = tag; -#endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (servicing_interrupt()) - SQSTAT(sqp, sq_nqueued_intr); - else - SQSTAT(sqp, sq_nqueued_other); - if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) - sqp->sq_stats.sq_max_qlen = sqp->sq_count; - } -#endif - - ENQUEUE_CHAIN(sqp, mp, tail, cnt); - mutex_exit(&sqp->sq_lock); - return; - } -} - -/* - * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg. - */ -void -squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, - uint8_t tag) -{ - int interrupt = servicing_interrupt(); - hrtime_t now; -#if SQUEUE_PROFILE - hrtime_t start, delta; -#endif -#if SQUEUE_DEBUG - conn_t *connp = (conn_t *)arg; - ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); - ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); -#endif - - ASSERT(proc != NULL); - ASSERT(sqp != NULL); - ASSERT(mp != NULL); - ASSERT(mp->b_next == NULL); - ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); - - mutex_enter(&sqp->sq_lock); - if (!(sqp->sq_state & SQS_PROC)) { - /* - * See if anything is already queued. If we are the - * first packet, do inline processing else queue the - * packet and do the drain. - */ - sqp->sq_run = curthread; - if (sqp->sq_first == NULL) { /* - * Fast-path, ok to process and nothing queued. + * Handle squeue switching. More details in the + * block comment at the top of the file */ - sqp->sq_state |= (SQS_PROC|SQS_FAST); - mutex_exit(&sqp->sq_lock); - -#if SQUEUE_DEBUG - sqp->sq_isintr = interrupt; - sqp->sq_curmp = mp; - sqp->sq_curproc = proc; - sqp->sq_connp = connp; - mp->b_tag = sqp->sq_tag = tag; -#endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (interrupt) - SQSTAT(sqp, sq_npackets_intr); - else - SQSTAT(sqp, sq_npackets_other); - start = gethrtime(); + if (connp->conn_sqp == sqp) { + SQUEUE_DBG_SET(sqp, mp, proc, connp, + tag); + connp->conn_on_sqp = B_TRUE; + DTRACE_PROBE3(squeue__proc__start, squeue_t *, + sqp, mblk_t *, mp, conn_t *, connp); + (*proc)(connp, mp, sqp); + DTRACE_PROBE2(squeue__proc__end, squeue_t *, + sqp, conn_t *, connp); + connp->conn_on_sqp = B_FALSE; + SQUEUE_DBG_CLEAR(sqp); + CONN_DEC_REF(connp); + } else { + SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, + connp, SQ_FILL, SQTAG_SQUEUE_CHANGE); } -#endif - ((conn_t *)arg)->conn_on_sqp = B_TRUE; - DTRACE_PROBE3(squeue__proc__start, squeue_t *, - sqp, mblk_t *, mp, conn_t *, arg); - (*proc)(arg, mp, sqp); - DTRACE_PROBE2(squeue__proc__end, squeue_t *, - sqp, conn_t *, arg); - ((conn_t *)arg)->conn_on_sqp = B_FALSE; - -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - delta = gethrtime() - start; - if (interrupt) - SQDELTA(sqp, sq_time_intr, delta); - else - SQDELTA(sqp, sq_time_other, delta); - } -#endif -#if SQUEUE_DEBUG - sqp->sq_curmp = NULL; - sqp->sq_curproc = NULL; - sqp->sq_connp = NULL; - sqp->sq_isintr = 0; -#endif - - CONN_DEC_REF((conn_t *)arg); ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); mutex_enter(&sqp->sq_lock); sqp->sq_state &= ~(SQS_PROC|SQS_FAST); - if (sqp->sq_first == NULL) { + sqp->sq_run = NULL; + if (sqp->sq_first == NULL || + process_flag == SQ_NODRAIN) { + if (sqp->sq_first != NULL) { + squeue_worker_wakeup(sqp); + return; + } /* - * We processed inline our packet and - * nothing new has arrived. We are done. + * We processed inline our packet and nothing + * new has arrived. We are done. In case any + * control actions are pending, wake up the + * worker. 
*/ - sqp->sq_run = NULL; + if (sqp->sq_state & SQS_WORKER_THR_CONTROL) + cv_signal(&sqp->sq_worker_cv); mutex_exit(&sqp->sq_lock); return; - } else if (sqp->sq_bind != CPU->cpu_id) { - /* - * If the current thread is not running - * on the CPU to which this squeue is bound, - * then don't allow it to drain. - */ - sqp->sq_run = NULL; - SQUEUE_WORKER_WAKEUP(sqp); - return; } } else { - ENQUEUE_MP(sqp, mp, proc, arg); -#if SQUEUE_DEBUG + ENQUEUE_CHAIN(sqp, mp, tail, cnt); +#ifdef DEBUG mp->b_tag = tag; #endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (servicing_interrupt()) - SQSTAT(sqp, sq_nqueued_intr); - else - SQSTAT(sqp, sq_nqueued_other); - if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) - sqp->sq_stats.sq_max_qlen = - sqp->sq_count; - } -#endif } - /* * We are here because either we couldn't do inline - * processing (because something was already queued) + * processing (because something was already queued), + * or we had a chain of more than one packet, * or something else arrived after we were done with * inline processing. */ ASSERT(MUTEX_HELD(&sqp->sq_lock)); ASSERT(sqp->sq_first != NULL); - -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - start = gethrtime(); - } -#endif -#if SQUEUE_DEBUG - sqp->sq_isintr = interrupt; -#endif - now = gethrtime(); - if (interrupt) { - squeue_drain(sqp, SQS_ENTER, now + - squeue_intrdrain_ns); - } else { - squeue_drain(sqp, SQS_USER, now + - squeue_writerdrain_ns); - } - -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - delta = gethrtime() - start; - if (interrupt) - SQDELTA(sqp, sq_time_intr, delta); - else - SQDELTA(sqp, sq_time_other, delta); - } -#endif -#if SQUEUE_DEBUG - sqp->sq_isintr = 0; -#endif + sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns); /* * If we didn't do a complete drain, the worker * thread was already signalled by squeue_drain. + * In case any control actions are pending, wake + * up the worker. */ sqp->sq_run = NULL; + if (sqp->sq_state & SQS_WORKER_THR_CONTROL) + cv_signal(&sqp->sq_worker_cv); mutex_exit(&sqp->sq_lock); return; } else { - ASSERT(sqp->sq_run != NULL); /* * We let a thread processing a squeue reenter only * once. This helps the case of incoming connection @@ -878,168 +529,42 @@ squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, * loopback connection where the two ends are bound * to the same squeue (which is typical on single * CPU machines). + * * We let the thread reenter only once for the fear * of stack getting blown with multiple traversal. */ + connp = (conn_t *)mp->b_prev; if (!(sqp->sq_state & SQS_REENTER) && - (sqp->sq_run == curthread) && sqp->sq_first == NULL && - (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { + (process_flag != SQ_FILL) && (sqp->sq_first == NULL) && + (sqp->sq_run == curthread) && (cnt == 1) && + (connp->conn_on_sqp == B_FALSE)) { sqp->sq_state |= SQS_REENTER; mutex_exit(&sqp->sq_lock); - ((conn_t *)arg)->conn_on_sqp = B_TRUE; - DTRACE_PROBE3(squeue__proc__start, squeue_t *, - sqp, mblk_t *, mp, conn_t *, arg); - (*proc)(arg, mp, sqp); - DTRACE_PROBE2(squeue__proc__end, squeue_t *, - sqp, conn_t *, arg); - ((conn_t *)arg)->conn_on_sqp = B_FALSE; - CONN_DEC_REF((conn_t *)arg); - - mutex_enter(&sqp->sq_lock); - sqp->sq_state &= ~SQS_REENTER; - mutex_exit(&sqp->sq_lock); - return; - } - /* - * Queue is already being processed. Just enqueue - * the packet and go away. 
- */ -#if SQUEUE_DEBUG - mp->b_tag = tag; -#endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (servicing_interrupt()) - SQSTAT(sqp, sq_nqueued_intr); - else - SQSTAT(sqp, sq_nqueued_other); - if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) - sqp->sq_stats.sq_max_qlen = sqp->sq_count; - } -#endif - - ENQUEUE_MP(sqp, mp, proc, arg); - mutex_exit(&sqp->sq_lock); - return; - } -} - -void -squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, - uint8_t tag) -{ - int interrupt = servicing_interrupt(); - boolean_t being_processed; -#if SQUEUE_DEBUG - conn_t *connp = (conn_t *)arg; -#endif -#if SQUEUE_PROFILE - hrtime_t start, delta; -#endif + ASSERT(mp->b_prev != NULL); + ASSERT(mp->b_queue != NULL); - ASSERT(proc != NULL); - ASSERT(sqp != NULL); - ASSERT(mp != NULL); - ASSERT(mp->b_next == NULL); - ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); - ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); - ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); - - mutex_enter(&sqp->sq_lock); - - being_processed = (sqp->sq_state & SQS_PROC); - if (!being_processed && (sqp->sq_first == NULL)) { - /* - * Fast-path, ok to process and nothing queued. - */ - sqp->sq_state |= (SQS_PROC|SQS_FAST); - sqp->sq_run = curthread; - mutex_exit(&sqp->sq_lock); - -#if SQUEUE_DEBUG - sqp->sq_isintr = interrupt; - sqp->sq_curmp = mp; - sqp->sq_curproc = proc; - sqp->sq_connp = connp; - mp->b_tag = sqp->sq_tag = tag; -#endif - -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (interrupt) - SQSTAT(sqp, sq_npackets_intr); - else - SQSTAT(sqp, sq_npackets_other); - start = gethrtime(); - } -#endif - - ((conn_t *)arg)->conn_on_sqp = B_TRUE; - DTRACE_PROBE3(squeue__proc__start, squeue_t *, - sqp, mblk_t *, mp, conn_t *, arg); - (*proc)(arg, mp, sqp); - DTRACE_PROBE2(squeue__proc__end, squeue_t *, - sqp, conn_t *, arg); - ((conn_t *)arg)->conn_on_sqp = B_FALSE; - -#if SQUEUE_DEBUG - sqp->sq_curmp = NULL; - sqp->sq_curproc = NULL; - sqp->sq_connp = NULL; - sqp->sq_isintr = 0; -#endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - delta = gethrtime() - start; - if (interrupt) - SQDELTA(sqp, sq_time_intr, delta); - else - SQDELTA(sqp, sq_time_other, delta); - } -#endif + mp->b_prev = NULL; + proc = (sqproc_t)mp->b_queue; + mp->b_queue = NULL; - CONN_DEC_REF((conn_t *)arg); - mutex_enter(&sqp->sq_lock); - sqp->sq_state &= ~(SQS_PROC|SQS_FAST); - sqp->sq_run = NULL; - if (sqp->sq_first == NULL) { /* - * We processed inline our packet and - * nothing new has arrived. We are done. + * Handle squeue switching. More details in the + * block comment at the top of the file */ - mutex_exit(&sqp->sq_lock); - } else { - SQUEUE_WORKER_WAKEUP(sqp); - } - return; - } else { - /* - * We let a thread processing a squeue reenter only - * once. This helps the case of incoming connection - * where a SYN-ACK-ACK that triggers the conn_ind - * doesn't have to queue the packet if listener and - * eager are on the same squeue. Also helps the - * loopback connection where the two ends are bound - * to the same squeue (which is typical on single - * CPU machines). - * We let the thread reenter only once for the fear - * of stack getting blown with multiple traversal. 
- */ - if (being_processed && !(sqp->sq_state & SQS_REENTER) && - (sqp->sq_run == curthread) && sqp->sq_first == NULL && - (((conn_t *)arg)->conn_on_sqp == B_FALSE)) { - sqp->sq_state |= SQS_REENTER; - mutex_exit(&sqp->sq_lock); - - ((conn_t *)arg)->conn_on_sqp = B_TRUE; - DTRACE_PROBE3(squeue__proc__start, squeue_t *, - sqp, mblk_t *, mp, conn_t *, arg); - (*proc)(arg, mp, sqp); - DTRACE_PROBE2(squeue__proc__end, squeue_t *, - sqp, conn_t *, arg); - ((conn_t *)arg)->conn_on_sqp = B_FALSE; - CONN_DEC_REF((conn_t *)arg); + if (connp->conn_sqp == sqp) { + connp->conn_on_sqp = B_TRUE; + DTRACE_PROBE3(squeue__proc__start, squeue_t *, + sqp, mblk_t *, mp, conn_t *, connp); + (*proc)(connp, mp, sqp); + DTRACE_PROBE2(squeue__proc__end, squeue_t *, + sqp, conn_t *, connp); + connp->conn_on_sqp = B_FALSE; + CONN_DEC_REF(connp); + } else { + SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, + connp, SQ_FILL, SQTAG_SQUEUE_CHANGE); + } mutex_enter(&sqp->sq_lock); sqp->sq_state &= ~SQS_REENTER; @@ -1047,80 +572,32 @@ squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg, return; } -#if SQUEUE_DEBUG + /* + * Queue is already being processed or there is already + * one or more paquets on the queue. Enqueue the + * packet and wakeup the squeue worker thread if the + * squeue is not being processed. + */ +#ifdef DEBUG mp->b_tag = tag; #endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (servicing_interrupt()) - SQSTAT(sqp, sq_nqueued_intr); - else - SQSTAT(sqp, sq_nqueued_other); - if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) - sqp->sq_stats.sq_max_qlen = sqp->sq_count; - } -#endif - ENQUEUE_MP(sqp, mp, proc, arg); - if (being_processed) { - /* - * Queue is already being processed. - * No need to do anything. - */ - mutex_exit(&sqp->sq_lock); + + ENQUEUE_CHAIN(sqp, mp, tail, cnt); + if (!(sqp->sq_state & SQS_PROC)) { + squeue_worker_wakeup(sqp); return; } - SQUEUE_WORKER_WAKEUP(sqp); - } -} - -/* - * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg - * without processing the squeue. - */ -/* ARGSUSED */ -void -squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg, - uint8_t tag) -{ -#if SQUEUE_DEBUG - conn_t *connp = (conn_t *)arg; -#endif - ASSERT(proc != NULL); - ASSERT(sqp != NULL); - ASSERT(mp != NULL); - ASSERT(mp->b_next == NULL); - ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp); - ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp); - - ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock)); - mutex_enter(&sqp->sq_lock); - ENQUEUE_MP(sqp, mp, proc, arg); -#if SQUEUE_DEBUG - mp->b_tag = tag; -#endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (servicing_interrupt()) - SQSTAT(sqp, sq_nqueued_intr); - else - SQSTAT(sqp, sq_nqueued_other); - if (sqp->sq_stats.sq_max_qlen < sqp->sq_count) - sqp->sq_stats.sq_max_qlen = sqp->sq_count; - } -#endif - - /* - * If queue is already being processed. No need to do anything. - */ - if (sqp->sq_state & SQS_PROC) { + /* + * In case any control actions are pending, wake + * up the worker. 
+ */ + if (sqp->sq_state & SQS_WORKER_THR_CONTROL) + cv_signal(&sqp->sq_worker_cv); mutex_exit(&sqp->sq_lock); return; } - - SQUEUE_WORKER_WAKEUP(sqp); } - /* * PRIVATE FUNCTIONS */ @@ -1151,7 +628,7 @@ squeue_fire(void *arg) if (!(state & SQS_PROC)) { sqp->sq_awaken = lbolt; - cv_signal(&sqp->sq_async); + cv_signal(&sqp->sq_worker_cv); } mutex_exit(&sqp->sq_lock); } @@ -1159,64 +636,52 @@ squeue_fire(void *arg) static void squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire) { - mblk_t *mp; - mblk_t *head; - sqproc_t proc; - conn_t *connp; - clock_t start = lbolt; - clock_t drain_time; - timeout_id_t tid; - uint_t cnt; - uint_t total_cnt = 0; + mblk_t *mp; + mblk_t *head; + sqproc_t proc; + conn_t *connp; + timeout_id_t tid; ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring; - int interrupt = servicing_interrupt(); - boolean_t poll_on = B_FALSE; - hrtime_t now; + hrtime_t now; + boolean_t did_wakeup = B_FALSE; + boolean_t sq_poll_capable; + sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0; +again: ASSERT(mutex_owned(&sqp->sq_lock)); - ASSERT(!(sqp->sq_state & SQS_PROC)); - -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (interrupt) - SQSTAT(sqp, sq_ndrains_intr); - else if (!(proc_type & SQS_WORKER)) - SQSTAT(sqp, sq_ndrains_other); - else - SQSTAT(sqp, sq_ndrains_worker); - } -#endif + ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | + SQS_POLL_QUIESCE_DONE))); + + head = sqp->sq_first; + sqp->sq_first = NULL; + sqp->sq_last = NULL; + sqp->sq_count = 0; if ((tid = sqp->sq_tid) != 0) sqp->sq_tid = 0; sqp->sq_state |= SQS_PROC | proc_type; - head = sqp->sq_first; - sqp->sq_first = NULL; - sqp->sq_last = NULL; - cnt = sqp->sq_count; + /* * We have backlog built up. Switch to polling mode if the - * device underneath allows it. Need to do it only for - * drain by non-interrupt thread so interrupts don't - * come and disrupt us in between. If its a interrupt thread, - * no need because most devices will not issue another - * interrupt till this one returns. + * device underneath allows it. Need to do it so that + * more packets don't come in and disturb us (by contending + * for sq_lock or higher priority thread preempting us). + * + * The worker thread is allowed to do active polling while we + * just disable the interrupts for drain by non worker (kernel + * or userland) threads so they can peacefully process the + * packets during time allocated to them. */ - if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) && - (sqp->sq_count > squeue_worker_poll_min)) { - ASSERT(sq_rx_ring != NULL); - SQS_POLLING_ON(sqp, sq_rx_ring); - poll_on = B_TRUE; - } - + SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring); mutex_exit(&sqp->sq_lock); if (tid != 0) (void) untimeout(tid); -again: + while ((mp = head) != NULL) { + head = mp->b_next; mp->b_next = NULL; @@ -1224,255 +689,548 @@ again: mp->b_queue = NULL; connp = (conn_t *)mp->b_prev; mp->b_prev = NULL; -#if SQUEUE_DEBUG - sqp->sq_curmp = mp; - sqp->sq_curproc = proc; - sqp->sq_connp = connp; - sqp->sq_tag = mp->b_tag; -#endif -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - if (interrupt) - SQSTAT(sqp, sq_npackets_intr); - else if (!(proc_type & SQS_WORKER)) - SQSTAT(sqp, sq_npackets_other); - else - SQSTAT(sqp, sq_npackets_worker); + /* + * Handle squeue switching. 
More details in the + * block comment at the top of the file + */ + if (connp->conn_sqp == sqp) { + SQUEUE_DBG_SET(sqp, mp, proc, connp, + mp->b_tag); + connp->conn_on_sqp = B_TRUE; + DTRACE_PROBE3(squeue__proc__start, squeue_t *, + sqp, mblk_t *, mp, conn_t *, connp); + (*proc)(connp, mp, sqp); + DTRACE_PROBE2(squeue__proc__end, squeue_t *, + sqp, conn_t *, connp); + connp->conn_on_sqp = B_FALSE; + CONN_DEC_REF(connp); + } else { + SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, + SQ_FILL, SQTAG_SQUEUE_CHANGE); } -#endif - - connp->conn_on_sqp = B_TRUE; - DTRACE_PROBE3(squeue__proc__start, squeue_t *, - sqp, mblk_t *, mp, conn_t *, connp); - (*proc)(connp, mp, sqp); - DTRACE_PROBE2(squeue__proc__end, squeue_t *, - sqp, conn_t *, connp); - connp->conn_on_sqp = B_FALSE; - CONN_DEC_REF(connp); } - -#if SQUEUE_DEBUG - sqp->sq_curmp = NULL; - sqp->sq_curproc = NULL; - sqp->sq_connp = NULL; -#endif + SQUEUE_DBG_CLEAR(sqp); mutex_enter(&sqp->sq_lock); - sqp->sq_count -= cnt; - total_cnt += cnt; + /* + * Check if there is still work to do (either more arrived or timer + * expired). If we are the worker thread and we are polling capable, + * continue doing the work since no one else is around to do the + * work anyway (but signal the poll thread to retrieve some packets + * in the meanwhile). If we are not the worker thread, just + * signal the worker thread to take up the work if processing time + * has expired. + */ if (sqp->sq_first != NULL) { - - now = gethrtime(); - if (!expire || (now < expire)) { - /* More arrived and time not expired */ - head = sqp->sq_first; - sqp->sq_first = NULL; - sqp->sq_last = NULL; - cnt = sqp->sq_count; - mutex_exit(&sqp->sq_lock); - goto again; - } - /* - * If we are not worker thread and we - * reached our time limit to do drain, - * signal the worker thread to pick - * up the work. - * If we were the worker thread, then - * we take a break to allow an interrupt - * or writer to pick up the load. + * Still more to process. If time quanta not expired, we + * should let the drain go on. The worker thread is allowed + * to drain as long as there is anything left. */ - if (proc_type != SQS_WORKER) { + now = gethrtime(); + if ((now < expire) || (proc_type == SQS_WORKER)) { + /* + * If time not expired or we are worker thread and + * this squeue is polling capable, continue to do + * the drain. + * + * We turn off interrupts for all userland threads + * doing drain but we do active polling only for + * worker thread. + */ + if (proc_type == SQS_WORKER) + SQS_POLL_RING(sqp, sq_poll_capable); + goto again; + } else { + did_wakeup = B_TRUE; sqp->sq_awaken = lbolt; - cv_signal(&sqp->sq_async); + cv_signal(&sqp->sq_worker_cv); } } /* - * Try to see if we can get a time estimate to process a packet. - * Do it only in interrupt context since less chance of context - * switch or pinning etc. to get a better estimate. + * If the poll thread is already running, just return. The + * poll thread continues to hold the proc and will finish + * processing. 
*/ - if (interrupt && ((drain_time = (lbolt - start)) > 0)) - sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) + - (20 * (drv_hztousec(drain_time)/total_cnt)))/100; - - sqp->sq_state &= ~(SQS_PROC | proc_type); + if (sqp->sq_state & SQS_GET_PKTS) { + ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | + SQS_POLL_QUIESCE_DONE))); + sqp->sq_state &= ~proc_type; + return; + } /* - * If polling was turned on, turn it off and reduce the default - * interrupt blank interval as well to bring new packets in faster - * (reduces the latency when there is no backlog). + * + * If we are the worker thread and no work is left, send the poll + * thread down once more to see if something arrived. Otherwise, + * turn the interrupts back on and we are done. */ - if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) { - ASSERT(sq_rx_ring != NULL); - SQS_POLLING_OFF(sqp, sq_rx_ring); + if ((proc_type == SQS_WORKER) && + (sqp->sq_state & SQS_POLL_CAPAB)) { + /* + * Do one last check to see if anything arrived + * in the NIC. We leave the SQS_PROC set to ensure + * that poll thread keeps the PROC and can decide + * if it needs to turn polling off or continue + * processing. + * + * If we drop the SQS_PROC here and poll thread comes + * up empty handed, it can not safely turn polling off + * since someone else could have acquired the PROC + * and started draining. The previously running poll + * thread and the current thread doing drain would end + * up in a race for turning polling on/off and more + * complex code would be required to deal with it. + * + * Its lot simpler for drain to hand the SQS_PROC to + * poll thread (if running) and let poll thread finish + * without worrying about racing with any other thread. + */ + ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | + SQS_POLL_QUIESCE_DONE))); + SQS_POLL_RING(sqp, sq_poll_capable); + sqp->sq_state &= ~proc_type; + } else { + /* + * The squeue is either not capable of polling or + * poll thread already finished processing and didn't + * find anything. Since there is nothing queued and + * we already turn polling on (for all threads doing + * drain), we should turn polling off and relinquish + * the PROC. + */ + ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED | + SQS_POLL_QUIESCE_DONE))); + SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring); + sqp->sq_state &= ~(SQS_PROC | proc_type); + if (!did_wakeup && sqp->sq_first != NULL) { + squeue_worker_wakeup(sqp); + mutex_enter(&sqp->sq_lock); + } + /* + * If we are not the worker and there is a pending quiesce + * event, wake up the worker + */ + if ((proc_type != SQS_WORKER) && + (sqp->sq_state & SQS_WORKER_THR_CONTROL)) + cv_signal(&sqp->sq_worker_cv); } } +/* + * Quiesce, Restart, or Cleanup of the squeue poll thread. + * + * Quiesce and Restart: After an squeue poll thread has been quiesced, it does + * not attempt to poll the underlying soft ring any more. The quiesce is + * triggered by the mac layer when it wants to quiesce a soft ring. Typically + * control operations such as changing the fanout of a NIC or VNIC (dladm + * setlinkprop) need to quiesce data flow before changing the wiring. + * The operation is done by the mac layer, but it calls back into IP to + * quiesce the soft ring. After completing the operation (say increase or + * decrease of the fanout) the mac layer then calls back into IP to restart + * the quiesced soft ring. + * + * Cleanup: This is triggered when the squeue binding to a soft ring is + * removed permanently. Typically interface plumb and unplumb would trigger + * this. 
It can also be triggered from the mac layer when a soft ring is + * being deleted say as the result of a fanout reduction. Since squeues are + * never deleted, the cleanup marks the squeue as fit for recycling and + * moves it to the zeroth squeue set. + */ static void -squeue_worker(squeue_t *sqp) +squeue_poll_thr_control(squeue_t *sqp) +{ + if (sqp->sq_state & SQS_POLL_THR_RESTART) { + /* Restart implies a previous quiesce */ + ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED); + sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED | + SQS_POLL_THR_RESTART); + sqp->sq_state |= SQS_POLL_CAPAB; + cv_signal(&sqp->sq_worker_cv); + return; + } + + if (sqp->sq_state & SQS_POLL_THR_QUIESCE) { + sqp->sq_state |= SQS_POLL_THR_QUIESCED; + sqp->sq_state &= ~SQS_POLL_THR_QUIESCE; + cv_signal(&sqp->sq_worker_cv); + return; + } +} + +/* + * POLLING Notes + * + * With polling mode, we want to do as much processing as we possibly can + * in worker thread context. The sweet spot is worker thread keeps doing + * work all the time in polling mode and writers etc. keep dumping packets + * to worker thread. Occassionally, we send the poll thread (running at + * lower priority to NIC to get the chain of packets to feed to worker). + * Sending the poll thread down to NIC is dependant on 3 criterions + * + * 1) Its always driven from squeue_drain and only if worker thread is + * doing the drain. + * 2) We clear the backlog once and more packets arrived in between. + * Before starting drain again, send the poll thread down if + * the drain is being done by worker thread. + * 3) Before exiting the squeue_drain, if the poll thread is not already + * working and we are the worker thread, try to poll one more time. + * + * For latency sake, we do allow any thread calling squeue_enter + * to process its packet provided: + * + * 1) Nothing is queued + * 2) If more packets arrived in between, the non worker thread are allowed + * to do the drain till their time quanta expired provided SQS_GET_PKTS + * wasn't set in between. + * + * Avoiding deadlocks with interrupts + * ================================== + * + * One of the big problem is that we can't send poll_thr down while holding + * the sq_lock since the thread can block. So we drop the sq_lock before + * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the + * poll thread is running so that no other thread can acquire the + * perimeter in between. If the squeue_drain gets done (no more work + * left), it leaves the SQS_PROC set if poll thread is running. + */ + +/* + * This is the squeue poll thread. In poll mode, it polls the underlying + * TCP softring and feeds packets into the squeue. The worker thread then + * drains the squeue. The poll thread also responds to control signals for + * quiesceing, restarting, or cleanup of an squeue. These are driven by + * control operations like plumb/unplumb or as a result of dynamic Rx ring + * related operations that are driven from the mac layer. 
+ */ +static void +squeue_polling_thread(squeue_t *sqp) { kmutex_t *lock = &sqp->sq_lock; - kcondvar_t *async = &sqp->sq_async; + kcondvar_t *async = &sqp->sq_poll_cv; + ip_mac_rx_t sq_get_pkts; + ip_accept_t ip_accept; + ill_rx_ring_t *sq_rx_ring; + ill_t *sq_ill; + mblk_t *head, *tail, *mp; + uint_t cnt; + void *sq_mac_handle; callb_cpr_t cprinfo; - hrtime_t now; -#if SQUEUE_PROFILE - hrtime_t start; -#endif + size_t bytes_to_pickup; + uint32_t ctl_state; - CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca"); + CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll"); mutex_enter(lock); for (;;) { - while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) { - CALLB_CPR_SAFE_BEGIN(&cprinfo); -still_wait: - cv_wait(async, lock); - if (sqp->sq_state & SQS_PROC) { - goto still_wait; - } - CALLB_CPR_SAFE_END(&cprinfo, lock); + CALLB_CPR_SAFE_BEGIN(&cprinfo); + cv_wait(async, lock); + CALLB_CPR_SAFE_END(&cprinfo, lock); + + ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL | + SQS_POLL_THR_QUIESCED); + if (ctl_state != 0) { + /* + * If the squeue is quiesced, then wait for a control + * request. A quiesced squeue must not poll the + * underlying soft ring. + */ + if (ctl_state == SQS_POLL_THR_QUIESCED) + continue; + /* + * Act on control requests to quiesce, cleanup or + * restart an squeue + */ + squeue_poll_thr_control(sqp); + continue; } -#if SQUEUE_PROFILE - if (SQ_PROFILING(sqp)) { - start = gethrtime(); + if (!(sqp->sq_state & SQS_POLL_CAPAB)) + continue; + + ASSERT((sqp->sq_state & + (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == + (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); + +poll_again: + sq_rx_ring = sqp->sq_rx_ring; + sq_get_pkts = sq_rx_ring->rr_rx; + sq_mac_handle = sq_rx_ring->rr_rx_handle; + ip_accept = sq_rx_ring->rr_ip_accept; + sq_ill = sq_rx_ring->rr_ill; + bytes_to_pickup = MAX_BYTES_TO_PICKUP; + mutex_exit(lock); + head = sq_get_pkts(sq_mac_handle, bytes_to_pickup); + mp = NULL; + if (head != NULL) { + /* + * We got the packet chain from the mac layer. It + * would be nice to be able to process it inline + * for better performance but we need to give + * IP a chance to look at this chain to ensure + * that packets are really meant for this squeue + * and do the IP processing. + */ + mp = ip_accept(sq_ill, sq_rx_ring, sqp, head, + &tail, &cnt); } -#endif + mutex_enter(lock); + if (mp != NULL) + ENQUEUE_CHAIN(sqp, mp, tail, cnt); - ASSERT(squeue_workerdrain_ns != 0); - now = gethrtime(); - sqp->sq_run = curthread; - squeue_drain(sqp, SQS_WORKER, now + squeue_workerdrain_ns); - sqp->sq_run = NULL; + ASSERT((sqp->sq_state & + (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) == + (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)); - if (sqp->sq_first != NULL) { + if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) { /* - * Doing too much processing by worker thread - * in presense of interrupts can be sub optimal. - * Instead, once a drain is done by worker thread - * for squeue_writerdrain_ns (the reason we are - * here), we force wait for squeue_workerwait_tick - * before doing more processing even if sq_wait is - * set to 0. - * - * This can be counterproductive for performance - * if worker thread is the only means to process - * the packets (interrupts or writers are not - * allowed inside the squeue). + * We have packets to process and worker thread + * is not running. Check to see if poll thread is + * allowed to process. Let it do processing only if it + * picked up some packets from the NIC otherwise + * wakeup the worker thread. 
             */
-            if (sqp->sq_tid == 0 &&
-                !(sqp->sq_state & SQS_TMO_PROG)) {
-                timeout_id_t tid;
+            if (mp != NULL) {
+                hrtime_t now;
+
+                now = gethrtime();
+                sqp->sq_run = curthread;
+                sqp->sq_drain(sqp, SQS_POLL_PROC, now +
+                    squeue_drain_ns);
+                sqp->sq_run = NULL;
+
+                if (sqp->sq_first == NULL)
+                    goto poll_again;
-                sqp->sq_state |= SQS_TMO_PROG;
-                mutex_exit(&sqp->sq_lock);
-                tid = timeout(squeue_fire, sqp,
-                    squeue_workerwait_tick);
-                mutex_enter(&sqp->sq_lock);
                 /*
-                 * Check again if we still need
-                 * the timeout
+                 * Couldn't do the entire drain because the
+                 * time limit expired, so let the
+                 * worker thread take over.
                  */
-                if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC))
-                    == SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
-                    (sqp->sq_first != NULL)) {
-                    sqp->sq_state &= ~SQS_TMO_PROG;
-                    sqp->sq_awaken = lbolt;
-                    sqp->sq_tid = tid;
-                } else if (sqp->sq_state & SQS_TMO_PROG) {
-                    /* timeout not needed */
-                    sqp->sq_state &= ~SQS_TMO_PROG;
-                    mutex_exit(&(sqp)->sq_lock);
-                    (void) untimeout(tid);
-                    mutex_enter(&sqp->sq_lock);
-                }
             }
-            CALLB_CPR_SAFE_BEGIN(&cprinfo);
-            cv_wait(async, lock);
-            CALLB_CPR_SAFE_END(&cprinfo, lock);
-        }
-
-#if SQUEUE_PROFILE
-        if (SQ_PROFILING(sqp)) {
-            SQDELTA(sqp, sq_time_worker, gethrtime() - start);
+            sqp->sq_awaken = lbolt;
+            /*
+             * Put SQS_PROC_HELD on so the worker
+             * thread can distinguish where it is called from. We
+             * could remove the SQS_PROC flag here and turn off
+             * polling so that it wouldn't matter who gets the
+             * processing, but we get better performance this way
+             * and save the cost of turning polling off and possibly
+             * on again as soon as we start draining again.
+             *
+             * We can't remove the SQS_PROC flag without turning
+             * polling off until we can guarantee that control
+             * will return to squeue_drain immediately.
+             */
+            sqp->sq_state |= SQS_PROC_HELD;
+            sqp->sq_state &= ~SQS_GET_PKTS;
+            cv_signal(&sqp->sq_worker_cv);
+        } else if (sqp->sq_first == NULL &&
+            !(sqp->sq_state & SQS_WORKER)) {
+            /*
+             * Nothing is queued and the worker thread is not
+             * running. Since we hold the proc, no other thread is
+             * processing the squeue. This means that there
+             * is no work to be done and nothing is queued
+             * in the squeue or in the NIC. Turn polling off and go
+             * back to interrupt mode.
+             */
+            sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
+            /* LINTED: constant in conditional context */
+            SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);
+        } else {
+            /*
+             * The worker thread is already running. We don't need
+             * to do anything. Indicate that the poll thread is done.
+             */
+            sqp->sq_state &= ~SQS_GET_PKTS;
+        }
+        if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
+            /*
+             * Act on control requests to quiesce, cleanup or
+             * restart an squeue
+             */
+            squeue_poll_thr_control(sqp);
         }
-#endif
     }
 }

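The control paths above (and squeue_worker_thr_control below) rely on a conventional request/acknowledge handshake over one mutex and two condition variables: the requesting side sets a request flag and signals, the other side acknowledges by setting a done flag and signaling back, and the requester loops on its condition variable until the done flag is visible. A minimal user-level sketch of that handshake, using POSIX threads and placeholder names rather than the squeue.c API, looks like this:

#include <pthread.h>
#include <stdbool.h>

typedef struct handshake {
    pthread_mutex_t lock;
    pthread_cond_t  poll_cv;     /* wakes the poll side */
    pthread_cond_t  worker_cv;   /* wakes the worker side */
    bool            quiesce_req; /* plays the role of SQS_POLL_THR_QUIESCE */
    bool            quiesced;    /* plays the role of SQS_POLL_THR_QUIESCED */
} handshake_t;

/* Worker side: ask the poll side to quiesce and wait until it has. */
void
worker_request_quiesce(handshake_t *h)
{
    pthread_mutex_lock(&h->lock);
    if (!h->quiesced) {
        h->quiesce_req = true;
        pthread_cond_signal(&h->poll_cv);
        while (!h->quiesced)
            pthread_cond_wait(&h->worker_cv, &h->lock);
    }
    pthread_mutex_unlock(&h->lock);
}

/* Poll side: called with the lock held once it notices the request. */
void
poll_ack_quiesce(handshake_t *h)
{
    h->quiesced = true;
    h->quiesce_req = false;
    pthread_cond_signal(&h->worker_cv);
}

In squeue.c the same pattern is carried in sq_state bits (SQS_POLL_THR_QUIESCE, SQS_POLL_THR_QUIESCED, SQS_POLL_THR_RESTART and friends) and the sq_poll_cv/sq_worker_cv condition variables, all manipulated under sq_lock.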
-#if SQUEUE_PROFILE
-static int
-squeue_kstat_update(kstat_t *ksp, int rw)
+/*
+ * The squeue worker thread acts on any control requests to quiesce, clean up
+ * or restart an ill_rx_ring_t by calling this function. The worker thread
+ * synchronizes with the squeue poll thread to complete the request and
+ * finally wakes up the requestor when the request is completed.
+ */
+static void
+squeue_worker_thr_control(squeue_t *sqp)
 {
-    struct squeue_kstat *sqsp = &squeue_kstat;
-    squeue_t *sqp = ksp->ks_private;
+    ill_t *ill;
+    ill_rx_ring_t *rx_ring;

-    if (rw == KSTAT_WRITE)
-        return (EACCES);
+    ASSERT(MUTEX_HELD(&sqp->sq_lock));

-#if SQUEUE_DEBUG
-    sqsp->sq_count.value.ui64 = sqp->sq_count;
-    sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen;
-#endif
-    sqsp->sq_npackets_worker.value.ui64 = sqp->sq_stats.sq_npackets_worker;
-    sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr;
-    sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other;
-    sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr;
-    sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other;
-    sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker;
-    sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr;
-    sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other;
-    sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker;
-    sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr;
-    sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other;
-    return (0);
-}
-#endif
+    if (sqp->sq_state & SQS_POLL_RESTART) {
+        /* Restart implies a previous quiesce. */
+        ASSERT((sqp->sq_state & (SQS_PROC_HELD |
+            SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
+            (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
+        /*
+         * Request the squeue poll thread to restart and wait till
+         * it actually restarts.
+         */
+        sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
+        sqp->sq_state |= SQS_POLL_THR_RESTART;
+        cv_signal(&sqp->sq_poll_cv);
+        while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
+            cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
+        sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
+            SQS_WORKER);
+        /*
+         * Signal any waiter that is waiting for the restart
+         * to complete.
+         */
+        sqp->sq_state |= SQS_POLL_RESTART_DONE;
+        cv_signal(&sqp->sq_ctrlop_done_cv);
+        return;
+    }

-void
-squeue_profile_enable(squeue_t *sqp)
-{
-    mutex_enter(&sqp->sq_lock);
-    sqp->sq_state |= SQS_PROFILE;
-    mutex_exit(&sqp->sq_lock);
-}
+    if (sqp->sq_state & SQS_PROC_HELD) {
+        /* The squeue poll thread handed control to us */
+        ASSERT(sqp->sq_state & SQS_PROC);
+    }

-void
-squeue_profile_disable(squeue_t *sqp)
-{
-    mutex_enter(&sqp->sq_lock);
-    sqp->sq_state &= ~SQS_PROFILE;
+    /*
+     * Prevent any other thread from processing the squeue
+     * until we finish the control actions by setting SQS_PROC,
+     * but allow ourselves to reenter by setting SQS_WORKER.
+     */
+    sqp->sq_state |= (SQS_PROC | SQS_WORKER);
+
+    /* Signal the squeue poll thread and wait for it to quiesce itself */
+    if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
+        sqp->sq_state |= SQS_POLL_THR_QUIESCE;
+        cv_signal(&sqp->sq_poll_cv);
+        while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
+            cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
+    }
+
+    rx_ring = sqp->sq_rx_ring;
+    ill = rx_ring->rr_ill;
+    /*
+     * The lock hierarchy is as follows:
+     * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
+     */
     mutex_exit(&sqp->sq_lock);
-}
+    mutex_enter(&ill->ill_lock);
+    mutex_enter(&sqp->sq_lock);

-void
-squeue_profile_reset(squeue_t *sqp)
-{
-#if SQUEUE_PROFILE
-    bzero(&sqp->sq_stats, sizeof (sqstat_t));
-#endif
-}
+    SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
+        sqp->sq_rx_ring);
+    sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
+    if (sqp->sq_state & SQS_POLL_CLEANUP) {
+        /*
+         * Disassociate this squeue from its ill_rx_ring_t.
+         * The rr_sqp, sq_rx_ring fields are protected by the
+         * corresponding squeue, ill_lock* and sq_lock. Holding any
+         * of them will ensure that the ring to squeue mapping does
+         * not change.
+         */
+        ASSERT(!(sqp->sq_state & SQS_DEFAULT));

-void
-squeue_profile_start(void)
-{
-#if SQUEUE_PROFILE
-    squeue_profile = B_TRUE;
-#endif
+        sqp->sq_rx_ring = NULL;
+        rx_ring->rr_sqp = NULL;
+
+        sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
+            SQS_POLL_QUIESCE_DONE);
+        sqp->sq_ill = NULL;
+
+        rx_ring->rr_rx_handle = NULL;
+        rx_ring->rr_intr_handle = NULL;
+        rx_ring->rr_intr_enable = NULL;
+        rx_ring->rr_intr_disable = NULL;
+        sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
+    } else {
+        sqp->sq_state &= ~SQS_POLL_QUIESCE;
+        sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
+    }
+    /*
+     * Signal any waiter that is waiting for the quiesce or cleanup
+     * to complete and also wait for it to actually see and reset the
+     * SQS_POLL_CLEANUP_DONE.
+     */
+    cv_signal(&sqp->sq_ctrlop_done_cv);
+    mutex_exit(&ill->ill_lock);
+    if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
+        cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
+        sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
+    }
 }

-void
-squeue_profile_stop(void)
+static void
+squeue_worker(squeue_t *sqp)
 {
-#if SQUEUE_PROFILE
-    squeue_profile = B_FALSE;
-#endif
+    kmutex_t *lock = &sqp->sq_lock;
+    kcondvar_t *async = &sqp->sq_worker_cv;
+    callb_cpr_t cprinfo;
+    hrtime_t now;
+
+    CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
+    mutex_enter(lock);
+
+    for (;;) {
+        for (;;) {
+            /*
+             * If the poll thread has handed control to us,
+             * we need to break out of the wait.
+             */
+            if (sqp->sq_state & SQS_PROC_HELD)
+                break;
+
+            /*
+             * If the squeue is not being processed and we either
+             * have messages to drain or some thread has signaled
+             * some control activity, we need to break.
+             */
+            if (!(sqp->sq_state & SQS_PROC) &&
+                ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
+                (sqp->sq_first != NULL)))
+                break;
+
+            /*
+             * If we have started some control action, then check
+             * for the SQS_WORKER flag (since we don't
+             * release the squeue) to make sure we own the squeue
+             * and break out.
+             */
+            if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
+                (sqp->sq_state & SQS_WORKER))
+                break;
+
+            CALLB_CPR_SAFE_BEGIN(&cprinfo);
+            cv_wait(async, lock);
+            CALLB_CPR_SAFE_END(&cprinfo, lock);
+        }
+        if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
+            squeue_worker_thr_control(sqp);
+            continue;
+        }
+        ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
+            SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
+            SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));
+
+        if (sqp->sq_state & SQS_PROC_HELD)
+            sqp->sq_state &= ~SQS_PROC_HELD;
+
+        now = gethrtime();
+        sqp->sq_run = curthread;
+        sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
+        sqp->sq_run = NULL;
+    }
 }

 uintptr_t *
@@ -1482,9 +1240,3 @@ squeue_getprivate(squeue_t *sqp, sqprivate_t p)

     return (&sqp->sq_private[p]);
 }
-
-processorid_t
-squeue_binding(squeue_t *sqp)
-{
-    return (sqp->sq_bind);
-}
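A final note on squeue_worker_thr_control above: because the documented lock order is cpu_lock -> ill_lock -> sqset_lock -> sq_lock, the function cannot take ill_lock while already holding sq_lock; it drops sq_lock and re-acquires both locks in hierarchy order. The generic sketch below shows that idiom with POSIX threads; the lock names are placeholders, not the kernel's own locks.

#include <pthread.h>

/* Placeholder locks mirroring the ill_lock -> sq_lock ordering. */
static pthread_mutex_t ill_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t sq_lock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Called with sq_lock held; releases both locks before returning.
 * To also take ill_lock, which sits above sq_lock in the hierarchy,
 * drop sq_lock first and re-acquire both in order; taking ill_lock
 * while holding sq_lock would invert the order and could deadlock
 * against a thread that acquires ill_lock before sq_lock.
 */
void
take_ill_then_sq(void)
{
    pthread_mutex_unlock(&sq_lock);
    pthread_mutex_lock(&ill_lock);
    pthread_mutex_lock(&sq_lock);

    /*
     * Any state observed under sq_lock before the drop may have
     * changed and must be re-validated here before acting on it.
     */

    pthread_mutex_unlock(&sq_lock);
    pthread_mutex_unlock(&ill_lock);
}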
