-rw-r--r--  usr/src/uts/common/rpc/rpcib.c    |   4
-rw-r--r--  usr/src/uts/common/rpc/rpcmod.c   | 290
-rw-r--r--  usr/src/uts/common/rpc/svc.c      | 290
-rw-r--r--  usr/src/uts/common/rpc/svc.h      |  12
-rw-r--r--  usr/src/uts/common/rpc/svc_gen.c  |  11
-rw-r--r--  usr/src/uts/common/rpc/svc_rdma.c |   5

6 files changed, 349 insertions, 263 deletions
diff --git a/usr/src/uts/common/rpc/rpcib.c b/usr/src/uts/common/rpc/rpcib.c
index a2ded72177..07719f74fe 100644
--- a/usr/src/uts/common/rpc/rpcib.c
+++ b/usr/src/uts/common/rpc/rpcib.c
@@ -20,6 +20,7 @@
  */
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
  */
 
 /*
@@ -1420,7 +1421,8 @@ rib_svc_rcq_handler(ibt_cq_hdl_t cq_hdl, void *arg)
             rdp->rpcmsg.len = wc.wc_bytes_xfer;
             rdp->status = wc.wc_status;
             mp->b_wptr += sizeof (*rdp);
-            svc_queuereq((queue_t *)rib_stat->q, mp);
+            (void) svc_queuereq((queue_t *)rib_stat->q, mp,
+                FALSE);
             mutex_exit(&plugin_state_lock);
         } else {
             /*
diff --git a/usr/src/uts/common/rpc/rpcmod.c b/usr/src/uts/common/rpc/rpcmod.c
index fc99ca89b3..09863c970f 100644
--- a/usr/src/uts/common/rpc/rpcmod.c
+++ b/usr/src/uts/common/rpc/rpcmod.c
@@ -21,7 +21,10 @@
 /*
  * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
  * Use is subject to license terms.
+ */
+/*
  * Copyright 2012 Milan Jurik. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
  */
 
 /* Copyright (c) 1990 Mentat Inc. */
@@ -231,9 +234,6 @@ static void mir_wsrv(queue_t *q);
 static struct module_info rpcmod_info =
     {RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};
 
-/*
- * Read side has no service procedure.
- */
 static struct qinit rpcmodrinit = {
     (int (*)())rmm_rput,
     (int (*)())rmm_rsrv,
@@ -269,6 +269,9 @@ struct xprt_style_ops {
     void (*xo_rsrv)();
 };
 
+/*
+ * Read side has no service procedure.
+ */
 static struct xprt_style_ops xprt_clts_ops = {
     rpcmodopen,
     rpcmodclose,
@@ -291,7 +294,7 @@ static struct xprt_style_ops xprt_cots_ops = {
 * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
 */
 struct rpcm {
-    void    *rm_krpc_cell;    /* Reserved for use by KRPC */
+    void    *rm_krpc_cell;    /* Reserved for use by kRPC */
     struct xprt_style_ops    *rm_ops;
     int    rm_type;    /* Client or server side stream */
 #define    RM_CLOSING    0x1    /* somebody is trying to close slot */
@@ -312,7 +315,7 @@ struct temp_slot {
 };
 
 typedef struct mir_s {
-    void    *mir_krpc_cell;    /* Reserved for KRPC use. This field */
+    void    *mir_krpc_cell;    /* Reserved for kRPC use. This field */
                     /* must be first in the structure. */
     struct xprt_style_ops    *rm_ops;
     int    mir_type;        /* Client or server side stream */
@@ -362,7 +365,7 @@ typedef struct mir_s {
      *    to 1 whenever a new request is sent out (mir_wput)
      *    and cleared when the timer fires (mir_timer).  If
      *    the timer fires with this value equal to 0, then the
-     *    stream is considered idle and KRPC is notified.
+     *    stream is considered idle and kRPC is notified.
      */
     mir_clntreq : 1,
     /*
@@ -404,9 +407,9 @@ typedef struct mir_s {
                     /* that a kernel RPC server thread */
                     /* (see svc_run()) has on this rpcmod */
                     /* slot. Effectively, it is the */
-                    /* number * of unprocessed messages */
+                    /* number of unprocessed messages */
                     /* that have been passed up to the */
-                    /* KRPC layer */
+                    /* kRPC layer */
 
     mblk_t    *mir_svc_pend_mp;    /* Pending T_ORDREL_IND or */
                     /* T_DISCON_IND */
@@ -567,7 +570,7 @@ rmm_close(queue_t *q, int flag, cred_t *crp)
     return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
 }
 
-static void rpcmod_release(queue_t *, mblk_t *);
+static void rpcmod_release(queue_t *, mblk_t *, bool_t);
 /*
 * rpcmodopen -    open routine gets called when the module gets pushed
 *        onto the stream.
@@ -578,7 +581,7 @@ rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 {
     struct rpcm *rmp;
 
-    extern void (*rpc_rele)(queue_t *, mblk_t *);
+    extern void (*rpc_rele)(queue_t *, mblk_t *, bool_t);
 
     TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
@@ -687,13 +690,6 @@ rpcmodclose(queue_t *q, int flag, cred_t *crp)
     return (0);
 }
 
-#ifdef DEBUG
-int    rpcmod_send_msg_up = 0;
-int    rpcmod_send_uderr = 0;
-int    rpcmod_send_dup = 0;
-int    rpcmod_send_dup_cnt = 0;
-#endif
-
 /*
 * rpcmodrput -    Module read put procedure.  This is called from
 *        the module, driver, or stream head downstream.
@@ -715,54 +711,6 @@ rpcmodrput(queue_t *q, mblk_t *mp)
         return;
     }
 
-#ifdef DEBUG
-    if (rpcmod_send_msg_up > 0) {
-        mblk_t *nmp = copymsg(mp);
-        if (nmp) {
-            putnext(q, nmp);
-            rpcmod_send_msg_up--;
-        }
-    }
-    if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
-        mblk_t *nmp;
-        struct T_unitdata_ind *data;
-        struct T_uderror_ind *ud;
-        int d;
-        data = (struct T_unitdata_ind *)mp->b_rptr;
-        if (data->PRIM_type == T_UNITDATA_IND) {
-            d = sizeof (*ud) - sizeof (*data);
-            nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
-            if (nmp) {
-                ud = (struct T_uderror_ind *)nmp->b_rptr;
-                ud->PRIM_type = T_UDERROR_IND;
-                ud->DEST_length = data->SRC_length;
-                ud->DEST_offset = data->SRC_offset + d;
-                ud->OPT_length = data->OPT_length;
-                ud->OPT_offset = data->OPT_offset + d;
-                ud->ERROR_type = ENETDOWN;
-                if (data->SRC_length) {
-                    bcopy(mp->b_rptr +
-                        data->SRC_offset,
-                        nmp->b_rptr +
-                        ud->DEST_offset,
-                        data->SRC_length);
-                }
-                if (data->OPT_length) {
-                    bcopy(mp->b_rptr +
-                        data->OPT_offset,
-                        nmp->b_rptr +
-                        ud->OPT_offset,
-                        data->OPT_length);
-                }
-                nmp->b_wptr += d;
-                nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
-                nmp->b_datap->db_type = M_PROTO;
-                putnext(q, nmp);
-                rpcmod_send_uderr--;
-            }
-        }
-    }
-#endif
     switch (mp->b_datap->db_type) {
     default:
         putnext(q, mp);
@@ -774,14 +722,12 @@ rpcmodrput(queue_t *q, mblk_t *mp)
         pptr = (union T_primitives *)mp->b_rptr;
 
         /*
-         * Forward this message to krpc if it is data.
+         * Forward this message to kRPC if it is data.
         */
         if (pptr->type == T_UNITDATA_IND) {
-            mblk_t *nmp;
-
-        /*
-         * Check if the module is being popped.
-         */
+            /*
+             * Check if the module is being popped.
+             */
             mutex_enter(&rmp->rm_lock);
             if (rmp->rm_state & RM_CLOSING) {
                 mutex_exit(&rmp->rm_lock);
@@ -818,47 +764,21 @@ rpcmodrput(queue_t *q, mblk_t *mp)
         case RPC_SERVER:
             /*
             * rm_krpc_cell is exclusively used by the kRPC
-             * CLTS server
+             * CLTS server. Try to submit the message to
+             * kRPC. Since this is an unreliable channel, we
+             * can just free the message in case the kRPC
+             * does not accept new messages.
             */
-            if (rmp->rm_krpc_cell) {
-#ifdef DEBUG
-                /*
-                 * Test duplicate request cache and
-                 * rm_ref count handling by sending a
-                 * duplicate every so often, if
-                 * desired.
-                 */
-                if (rpcmod_send_dup &&
-                    rpcmod_send_dup_cnt++ %
-                    rpcmod_send_dup)
-                    nmp = copymsg(mp);
-                else
-                    nmp = NULL;
-#endif
+            if (rmp->rm_krpc_cell &&
+                svc_queuereq(q, mp, TRUE)) {
                 /*
                 * Raise the reference count on this
                 * module to prevent it from being
-                 * popped before krpc generates the
+                 * popped before kRPC generates the
                 * reply.
                 */
                 rmp->rm_ref++;
                 mutex_exit(&rmp->rm_lock);
-
-                /*
-                 * Submit the message to krpc.
-                 */
-                svc_queuereq(q, mp);
-#ifdef DEBUG
-                /*
-                 * Send duplicate if we created one.
-                 */
-                if (nmp) {
-                    mutex_enter(&rmp->rm_lock);
-                    rmp->rm_ref++;
-                    mutex_exit(&rmp->rm_lock);
-                    svc_queuereq(q, nmp);
-                }
-#endif
             } else {
                 mutex_exit(&rmp->rm_lock);
                 freemsg(mp);
@@ -1030,8 +950,9 @@ rpcmodwsrv(queue_t *q)
     }
 }
 
+/* ARGSUSED */
 static void
-rpcmod_release(queue_t *q, mblk_t *bp)
+rpcmod_release(queue_t *q, mblk_t *bp, bool_t enable)
 {
     struct rpcm *rmp;
 
@@ -1084,7 +1005,7 @@ rpcmod_release(queue_t *q, mblk_t *bp)
 static int    mir_clnt_dup_request(queue_t *q, mblk_t *mp);
 static void    mir_rput_proto(queue_t *q, mblk_t *mp);
 static int    mir_svc_policy_notify(queue_t *q, int event);
-static void    mir_svc_release(queue_t *wq, mblk_t *mp);
+static void    mir_svc_release(queue_t *wq, mblk_t *mp, bool_t);
 static void    mir_svc_start(queue_t *wq);
 static void    mir_svc_idle_start(queue_t *, mir_t *);
 static void    mir_svc_idle_stop(queue_t *, mir_t *);
@@ -1099,7 +1020,7 @@ static void    mir_disconnect(queue_t *, mir_t *ir);
 static int    mir_check_len(queue_t *, int32_t, mblk_t *);
 static void    mir_timer(void *);
 
-extern void    (*mir_rele)(queue_t *, mblk_t *);
+extern void    (*mir_rele)(queue_t *, mblk_t *, bool_t);
 extern void    (*mir_start)(queue_t *);
 extern void    (*clnt_stop_idle)(queue_t *);
@@ -1256,7 +1177,7 @@ mir_close(queue_t *q)
         mutex_exit(&mir->mir_mutex);
         qprocsoff(q);
 
-        /* Notify KRPC that this stream is going away. */
+        /* Notify kRPC that this stream is going away. */
         svc_queueclose(q);
     } else {
         mutex_exit(&mir->mir_mutex);
@@ -1337,7 +1258,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
     mir_t    *mir;
 
     RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
-    /* Set variables used directly by KRPC. */
+    /* Set variables used directly by kRPC. */
     if (!mir_rele)
         mir_rele = mir_svc_release;
     if (!mir_start)
@@ -1357,7 +1278,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
     * be held on the read-side queue until the stream is completely
     * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
     * the ioctl processing, the flag is cleared and any messages that
-     * arrived between the open and the ioctl are delivered to KRPC.
+     * arrived between the open and the ioctl are delivered to kRPC.
     *
     * Early data should never arrive on a client stream since
     * servers only respond to our requests and we do not send any.
@@ -1365,7 +1286,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
     * very common on a server stream where the client will start
     * sending data as soon as the connection is made (and this
     * is especially true with TCP where the protocol accepts the
-     * connection before nfsd or KRPC is notified about it).
+     * connection before nfsd or kRPC is notified about it).
     */
     mir->mir_hold_inbound = 1;
 
@@ -1420,7 +1341,7 @@ mir_rput(queue_t *q, mblk_t *mp)
     * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
     * with the corresponding ioctl, then don't accept
     * any inbound data.  This should never happen for streams
-     * created by nfsd or client-side KRPC because they are careful
+     * created by nfsd or client-side kRPC because they are careful
     * to set the mode of the stream before doing anything else.
     */
     if (mir->mir_type == 0) {
@@ -1456,7 +1377,7 @@ mir_rput(queue_t *q, mblk_t *mp)
     * If a module on the stream is trying set the Stream head's
     * high water mark, then set our hiwater to the requested
     * value.  We are the "stream head" for all inbound
-     * data messages since messages are passed directly to KRPC.
+     * data messages since messages are passed directly to kRPC.
     */
     if (MBLKL(mp) >= sizeof (struct stroptions)) {
         struct stroptions    *stropts;
@@ -1629,23 +1550,32 @@ mir_rput(queue_t *q, mblk_t *mp)
         case RPC_SERVER:
             /*
             * Check for flow control before passing the
-             * message to KRPC.
+             * message to kRPC.
             */
             if (!mir->mir_hold_inbound) {
                 if (mir->mir_krpc_cell) {
-                    /*
-                     * If the reference count is 0
-                     * (not including this request),
-                     * then the stream is transitioning
-                     * from idle to non-idle.  In this case,
-                     * we cancel the idle timer.
-                     */
-                    if (mir->mir_ref_cnt++ == 0)
-                        stop_timer = B_TRUE;
+
                     if (mir_check_len(q,
-                        (int32_t)msgdsize(mp), mp))
+                        (int32_t)msgdsize(head_mp),
+                        head_mp))
                         return;
-                    svc_queuereq(q, head_mp); /* to KRPC */
+
+                    if (q->q_first == NULL &&
+                        svc_queuereq(q, head_mp, TRUE)) {
+                        /*
+                         * If the reference count is 0
+                         * (not including this
+                         * request), then the stream is
+                         * transitioning from idle to
+                         * non-idle.  In this case, we
+                         * cancel the idle timer.
+                         */
+                        if (mir->mir_ref_cnt++ == 0)
+                            stop_timer = B_TRUE;
+                    } else {
+                        (void) putq(q, head_mp);
+                        mir->mir_inrservice = B_TRUE;
+                    }
                 } else {
                     /*
                     * Count # of times this happens. Should
@@ -1811,7 +1741,7 @@ mir_rput_proto(queue_t *q, mblk_t *mp)
             break;
         default:
             RPCLOG(1, "mir_rput: unexpected message %d "
-                "for KRPC client\n",
+                "for kRPC client\n",
                 ((union T_primitives *)mp->b_rptr)->type);
             break;
         }
@@ -1925,37 +1855,12 @@ mir_rput_proto(queue_t *q, mblk_t *mp)
 * outbound flow control is exerted.  When outbound flow control is
 * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
 * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
- *
- * For the server side, we have two types of messages queued. The first type
- * are messages that are ready to be XDR decoded and and then sent to the
- * RPC program's dispatch routine. The second type are "raw" messages that
- * haven't been processed, i.e. assembled from rpc record fragements into
- * full requests. The only time we will see the second type of message
- * queued is if we have a memory allocation failure while processing a
- * a raw message. The field mir_first_non_processed_mblk will mark the
- * first such raw message. So the flow for server side is:
- *
- *    - send processed queued messages to kRPC until we run out or find
- *      one that needs additional processing because we were short on memory
- *      earlier
- *    - process a message that was deferred because of lack of
- *      memory
- *    - continue processing messages until the queue empties or we
- *      have to stop because of lack of memory
- *    - during each of the above phase, if the queue is empty and
- *      there are no pending messages that were passed to the RPC
- *      layer, send upstream the pending disconnect/ordrel indication if
- *      there is one
- *
- * The read-side queue is also enabled by a bufcall callback if dupmsg
- * fails in mir_rput.
 */
 static void
 mir_rsrv(queue_t *q)
 {
     mir_t    *mir;
     mblk_t    *mp;
-    mblk_t    *cmp = NULL;
     boolean_t stop_timer = B_FALSE;
 
     mir = (mir_t *)q->q_ptr;
@@ -1966,43 +1871,28 @@ mir_rsrv(queue_t *q)
     case RPC_SERVER:
         if (mir->mir_ref_cnt == 0)
             mir->mir_hold_inbound = 0;
-        if (mir->mir_hold_inbound) {
-
-            ASSERT(cmp == NULL);
-            if (q->q_first == NULL) {
-
-                MIR_CLEAR_INRSRV(mir);
-
-                if (MIR_SVC_QUIESCED(mir)) {
-                    cmp = mir->mir_svc_pend_mp;
-                    mir->mir_svc_pend_mp = NULL;
-                }
-            }
-
-            mutex_exit(&mir->mir_mutex);
-
-            if (cmp != NULL) {
-                RPCLOG(16, "mir_rsrv: line %d: sending a held "
-                    "disconnect/ord rel indication upstream\n",
-                    __LINE__);
-                putnext(q, cmp);
-            }
+        if (mir->mir_hold_inbound)
+            break;
 
-            return;
-        }
         while (mp = getq(q)) {
             if (mir->mir_krpc_cell &&
                 (mir->mir_svc_no_more_msgs == 0)) {
-                /*
-                 * If we were idle, turn off idle timer since
-                 * we aren't idle any more.
-                 */
-                if (mir->mir_ref_cnt++ == 0)
-                    stop_timer = B_TRUE;
+
                 if (mir_check_len(q, (int32_t)msgdsize(mp), mp))
                     return;
-                svc_queuereq(q, mp);
+
+                if (svc_queuereq(q, mp, TRUE)) {
+                    /*
+                     * If we were idle, turn off idle timer
+                     * since we aren't idle any more.
+                     */
+                    if (mir->mir_ref_cnt++ == 0)
+                        stop_timer = B_TRUE;
+                } else {
+                    (void) putbq(q, mp);
+                    break;
+                }
             } else {
                 /*
                 * Count # of times this happens. Should be
@@ -2041,10 +1931,10 @@ mir_rsrv(queue_t *q)
     }
 
     if (q->q_first == NULL) {
+        mblk_t    *cmp = NULL;
 
         MIR_CLEAR_INRSRV(mir);
 
-        ASSERT(cmp == NULL);
         if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
             cmp = mir->mir_svc_pend_mp;
             mir->mir_svc_pend_mp = NULL;
@@ -2111,15 +2001,14 @@ mir_svc_start_close(queue_t *wq, mir_t *mir)
     ASSERT((wq->q_flag & QREADR) == 0);
     ASSERT(mir->mir_type == RPC_SERVER);
 
-
     /*
     * Do not accept any more messages.
     */
     mir->mir_svc_no_more_msgs = 1;
 
     /*
-     * Next two statements will make the read service procedure invoke
-     * svc_queuereq() on everything stuck in the streams read queue.
+     * Next two statements will make the read service procedure
+     * free everything stuck in the streams read queue.
     * It's not necessary because enabling the write queue will
     * have the same effect, but why not speed the process along?
     */
@@ -2134,11 +2023,11 @@ mir_svc_start_close(queue_t *wq, mir_t *mir)
 }
 
 /*
- * This routine is called directly by KRPC after a request is completed,
+ * This routine is called directly by kRPC after a request is completed,
 * whether a reply was sent or the request was dropped.
 */
 static void
-mir_svc_release(queue_t *wq, mblk_t *mp)
+mir_svc_release(queue_t *wq, mblk_t *mp, bool_t enable)
 {
     mir_t   *mir = (mir_t *)wq->q_ptr;
     mblk_t    *cmp = NULL;
@@ -2147,6 +2036,9 @@ mir_svc_release(queue_t *wq, mblk_t *mp)
     if (mp)
         freemsg(mp);
 
+    if (enable)
+        qenable(RD(wq));
+
     mutex_enter(&mir->mir_mutex);
 
     /*
@@ -2194,7 +2086,7 @@ mir_svc_release(queue_t *wq, mblk_t *mp)
 }
 
 /*
- * This routine is called by server-side KRPC when it is ready to
+ * This routine is called by server-side kRPC when it is ready to
 * handle inbound messages on the stream.
 */
 static void
@@ -2286,7 +2178,7 @@ mir_timer(void *arg)
         * For clients, the timer fires at clnt_idle_timeout
         * intervals.  If the activity marker (mir_clntreq) is
         * zero, then the stream has been idle since the last
-         * timer event and we notify KRPC.  If mir_clntreq is
+         * timer event and we notify kRPC.  If mir_clntreq is
         * non-zero, then the stream is active and we just
         * restart the timer for another interval.  mir_clntreq
         * is set to 1 in mir_wput for every request passed
@@ -2337,10 +2229,10 @@ printf("mir_timer[%d]: doing client timeout\n", now / hz);
             mutex_exit(&mir->mir_mutex);
             /*
             * We pass T_ORDREL_REQ as an integer value
-             * to KRPC as the indication that the stream
+             * to kRPC as the indication that the stream
             * is idle.  This is not a T_ORDREL_REQ message,
             * it is just a convenient value since we call
-             * the same KRPC routine for T_ORDREL_INDs and
+             * the same kRPC routine for T_ORDREL_INDs and
             * T_DISCON_INDs.
             */
             clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
@@ -2354,7 +2246,7 @@ printf("mir_timer[%d]: doing client timeout\n", now / hz);
         * by mir_wput when mir_type is set to RPC_SERVER and
         * by mir_svc_idle_start whenever the stream goes idle
         * (mir_ref_cnt == 0).  The timer is cancelled in
-         * mir_rput whenever a new inbound request is passed to KRPC
+         * mir_rput whenever a new inbound request is passed to kRPC
         * and the stream was previously idle.
         *
         * The timer interval can be changed for individual
@@ -2424,12 +2316,12 @@ mir_wput(queue_t *q, mblk_t *mp)
         !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
         /*
         * Since we know that M_DATA messages are created exclusively
-         * by KRPC, we expect that KRPC will leave room for our header
+         * by kRPC, we expect that kRPC will leave room for our header
         * and 4 byte align which is normal for XDR.
-         * If KRPC (or someone else) does not cooperate, then we
+         * If kRPC (or someone else) does not cooperate, then we
         * just throw away the message.
         */
-        RPCLOG(1, "mir_wput: KRPC did not leave space for record "
+        RPCLOG(1, "mir_wput: kRPC did not leave space for record "
             "fragment header (%d bytes left)\n",
             (int)(rptr - mp->b_datap->db_base));
         freemsg(mp);
@@ -2650,7 +2542,7 @@ ioc_eperm:
         /*
         * If the stream is not idle, then we hold the
         * orderly release until it becomes idle.  This
-         * ensures that KRPC will be able to reply to
+         * ensures that kRPC will be able to reply to
         * all requests that we have passed to it.
         *
         * We also queue the request if there is data already
@@ -2896,10 +2788,10 @@ mir_disconnect(queue_t *q, mir_t *mir)
         mutex_exit(&mir->mir_mutex);
 
         /*
-         * T_DISCON_REQ is passed to KRPC as an integer value
+         * T_DISCON_REQ is passed to kRPC as an integer value
         * (this is not a TPI message).  It is used as a
         * convenient value to indicate a sanity check
-         * failure -- the same KRPC routine is also called
+         * failure -- the same kRPC routine is also called
         * for T_DISCON_INDs and T_ORDREL_INDs.
         */
         clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
@@ -2944,7 +2836,7 @@ mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
     mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
     if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
         cmn_err(CE_NOTE,
-            "KRPC: record fragment from %s of size(%d) exceeds "
+            "kRPC: record fragment from %s of size(%d) exceeds "
             "maximum (%u). Disconnecting",
             (mir->mir_type == RPC_CLIENT) ? "server" :
             (mir->mir_type == RPC_SERVER) ? "client" :
diff --git a/usr/src/uts/common/rpc/svc.c b/usr/src/uts/common/rpc/svc.c
index 250b5984c6..c0ca1ede3f 100644
--- a/usr/src/uts/common/rpc/svc.c
+++ b/usr/src/uts/common/rpc/svc.c
@@ -20,11 +20,12 @@
 */
 
 /*
- * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
- * Use is subject to license terms.
+ * Copyright 2015 Nexenta Systems, Inc. All rights reserved.
 */
+
 /*
- * Copyright 2014 Nexenta Systems, Inc. All rights reserved.
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
 */
 
 /*
@@ -47,7 +48,7 @@
 * threads processing events on the transport. Some fields in the
 * master structure are protected by locks
 * - xp_req_lock protects the request queue:
- *    xp_req_head, xp_req_tail
+ *    xp_req_head, xp_req_tail, xp_reqs, xp_size, xp_full, xp_enable
 * - xp_thread_lock protects the thread (clone) counts
 *    xp_threads, xp_detached_threads, xp_wq
 * Each master transport is registered to exactly one thread pool.
@@ -72,7 +73,7 @@
 * to restrict resource usage by the service. Some fields are protected
 * by locks:
 * - p_req_lock protects several counts and flags:
- *    p_reqs, p_walkers, p_asleep, p_drowsy, p_req_cv
+ *    p_reqs, p_size, p_walkers, p_asleep, p_drowsy, p_req_cv
 * - p_thread_lock governs other thread counts:
 *    p_threads, p_detached_threads, p_reserved_threads, p_closing
 *
@@ -113,13 +114,6 @@
 * thread processes a request and sends a reply it returns to svc_run()
 * and svc_run() calls svc_poll() to find new input.
 *
- * There is no longer an "inconsistent" but "safe" optimization in the
- * svc_queuereq() code. This "inconsistent" state was leading to
- * inconsistencies between the actual number of requests and the value
- * of p_reqs (the total number of requests). Because of this, hangs were
- * occurring in svc_poll() where p_reqs was greater than one and no
- * requests were found on the request queues.
- *
 * svc_poll().
 * In order to avoid unnecessary locking, which causes performance
 * problems, we always look for a pending request on the current transport.
@@ -201,6 +195,7 @@
 #include <sys/user.h>
 #include <sys/stream.h>
 #include <sys/strsubr.h>
+#include <sys/strsun.h>
 #include <sys/tihdr.h>
 #include <sys/debug.h>
 #include <sys/cmn_err.h>
@@ -292,6 +287,11 @@ struct svc_globals {
 int rdma_check = 0;
 
 /*
+ * This allows disabling flow control in svc_queuereq().
+ */
+volatile int svc_flowcontrol_disable = 0;
+
+/*
 * Authentication parameters list.
 */
 static caddr_t rqcred_head;
@@ -300,15 +300,15 @@ static kmutex_t rqcred_lock;
 /*
 * Pointers to transport specific `rele' routines in rpcmod (set from rpcmod).
 */
-void (*rpc_rele)(queue_t *, mblk_t *) = NULL;
-void (*mir_rele)(queue_t *, mblk_t *) = NULL;
+void (*rpc_rele)(queue_t *, mblk_t *, bool_t) = NULL;
+void (*mir_rele)(queue_t *, mblk_t *, bool_t) = NULL;
 
 /* ARGSUSED */
 void
-rpc_rdma_rele(queue_t *q, mblk_t *mp)
+rpc_rdma_rele(queue_t *q, mblk_t *mp, bool_t enable)
 {
 }
-void (*rdma_rele)(queue_t *, mblk_t *) = rpc_rdma_rele;
+void (*rdma_rele)(queue_t *, mblk_t *, bool_t) = rpc_rdma_rele;
 
 
@@ -914,7 +914,7 @@ svc_xprt_qinit(SVCPOOL *pool, size_t qsize)
 * - insert a pointer to xprt into the xprt-ready queue (FIFO)
 * - if the xprt-ready queue is full turn the overflow flag on.
 *
- * NOTICE: pool->p_qtop is protected by the the pool's request lock
+ * NOTICE: pool->p_qtop is protected by the pool's request lock
 * and the caller (svc_queuereq()) must hold the lock.
 */
 static void
@@ -922,7 +922,7 @@ svc_xprt_qput(SVCPOOL *pool, SVCMASTERXPRT *xprt)
 {
     ASSERT(MUTEX_HELD(&pool->p_req_lock));
 
-    /* If the overflow flag is there is nothing we can do */
+    /* If the overflow flag is on there is nothing we can do */
     if (pool->p_qoverflow)
         return;
 
@@ -1871,15 +1871,8 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
     if (xprt && xprt->xp_req_head &&
         (!pool->p_qoverflow ||
         clone_xprt->xp_same_xprt++ < pool->p_max_same_xprt)) {
         mutex_enter(&xprt->xp_req_lock);
-        if (xprt->xp_req_head) {
-            mutex_enter(&pool->p_req_lock);
-            pool->p_reqs--;
-            if (pool->p_reqs == 0)
-                pool->p_qoverflow = FALSE;
-            mutex_exit(&pool->p_req_lock);
-
+        if (xprt->xp_req_head)
             return (xprt);
-        }
         mutex_exit(&xprt->xp_req_lock);
     }
     clone_xprt->xp_same_xprt = 0;
@@ -1921,9 +1914,6 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
             rw_exit(&pool->p_lrwlock);
 
             mutex_enter(&pool->p_req_lock);
-            pool->p_reqs--;
-            if (pool->p_reqs == 0)
-                pool->p_qoverflow = FALSE;
             pool->p_walkers--;
             mutex_exit(&pool->p_req_lock);
 
@@ -1994,9 +1984,6 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
             rw_exit(&pool->p_lrwlock);
 
             mutex_enter(&pool->p_req_lock);
-            pool->p_reqs--;
-            if (pool->p_reqs == 0)
-                pool->p_qoverflow = FALSE;
             pool->p_walkers--;
             mutex_exit(&pool->p_req_lock);
 
@@ -2086,6 +2073,139 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
 }
 
 /*
+ * calculate memory space used by message
+ */
+static size_t
+svc_msgsize(mblk_t *mp)
+{
+    size_t count = 0;
+
+    for (; mp; mp = mp->b_cont)
+        count += MBLKSIZE(mp);
+
+    return (count);
+}
+
+/*
+ * svc_flowcontrol() attempts to turn the flow control on or off for the
+ * transport.
+ *
+ * On input the xprt->xp_full determines whether the flow control is currently
+ * off (FALSE) or on (TRUE).  If it is off we do tests to see whether we should
+ * turn it on, and vice versa.
+ *
+ * There are two conditions considered for the flow control.  Both conditions
+ * have the low and the high watermark.  Once the high watermark is reached in
+ * EITHER condition the flow control is turned on.  For turning the flow
+ * control off BOTH conditions must be below the low watermark.
+ *
+ * Condition #1 - Number of requests queued:
+ *
+ * The max number of threads working on the pool is roughly pool->p_maxthreads.
+ * Every thread could handle up to pool->p_max_same_xprt requests from one
+ * transport before it moves to another transport.  See svc_poll() for details.
+ * In case all threads in the pool are working on a transport they will handle
+ * no more than enough_reqs (pool->p_maxthreads * pool->p_max_same_xprt)
+ * requests in one shot from that transport.  We are turning the flow control
+ * on once the high watermark is reached for a transport so that the underlying
+ * queue knows the rate of incoming requests is higher than we are able to
+ * handle.
+ *
+ * The high watermark: 2 * enough_reqs
+ * The low watermark: enough_reqs
+ *
+ * Condition #2 - Length of the data payload for the queued messages/requests:
+ *
+ * We want to prevent a particular pool exhausting the memory, so once the
+ * total length of queued requests for the whole pool reaches the high
+ * watermark we start to turn on the flow control for significant memory
+ * consumers (individual transports).  To keep the implementation simple
+ * enough, this condition is not exact, because we count only the data part of
+ * the queued requests and we ignore the overhead.  For our purposes this
+ * should be enough.  We should also consider that up to pool->p_maxthreads
+ * threads for the pool might work on large requests (this is not counted for
+ * this condition).  We need to leave some space for rest of the system and for
+ * other big memory consumers (like ZFS).  Also, after the flow control is
+ * turned on (on cots transports) we can start to accumulate a few megabytes in
+ * queues for each transport.
+ *
+ * Usually, the big memory consumers are NFS WRITE requests, so we do not
+ * expect to see this condition met for other than NFS pools.
+ *
+ * The high watermark: 1/5 of available memory
+ * The low watermark: 1/6 of available memory
+ *
+ * Once the high watermark is reached we turn the flow control on only for
+ * transports exceeding a per-transport memory limit.  The per-transport
+ * fraction of memory is calculated as:
+ *
+ *    the high watermark / number of transports
+ *
+ * For transports with less than the per-transport fraction of memory consumed,
+ * the flow control is not turned on, so they are not blocked by a few "hungry"
+ * transports.  Because of this, the total memory consumption for the
+ * particular pool might grow up to 2 * the high watermark.
+ *
+ * The individual transports are unblocked once their consumption is below:
+ *
+ *    per-transport fraction of memory / 2
+ *
+ * or once the total memory consumption for the whole pool falls below the low
+ * watermark.
+ *
+ */
+static void
+svc_flowcontrol(SVCMASTERXPRT *xprt)
+{
+    SVCPOOL *pool = xprt->xp_pool;
+    size_t totalmem = ptob(physmem);
+    int enough_reqs = pool->p_maxthreads * pool->p_max_same_xprt;
+
+    ASSERT(MUTEX_HELD(&xprt->xp_req_lock));
+
+    /* Should we turn the flow control on? */
+    if (xprt->xp_full == FALSE) {
+        /* Is flow control disabled? */
+        if (svc_flowcontrol_disable != 0)
+            return;
+
+        /* Is there enough requests queued? */
+        if (xprt->xp_reqs >= enough_reqs * 2) {
+            xprt->xp_full = TRUE;
+            return;
+        }
+
+        /*
+         * If this pool uses over 20% of memory and this transport is
+         * significant memory consumer then we are full
+         */
+        if (pool->p_size >= totalmem / 5 &&
+            xprt->xp_size >= totalmem / 5 / pool->p_lcount)
+            xprt->xp_full = TRUE;
+
+        return;
+    }
+
+    /* We might want to turn the flow control off */
+
+    /* Do we still have enough requests? */
+    if (xprt->xp_reqs > enough_reqs)
+        return;
+
+    /*
+     * If this pool still uses over 16% of memory and this transport is
+     * still significant memory consumer then we are still full
+     */
+    if (pool->p_size >= totalmem / 6 &&
+        xprt->xp_size >= totalmem / 5 / pool->p_lcount / 2)
+        return;
+
+    /* Turn the flow control off and make sure rpcmod is notified */
+    xprt->xp_full = FALSE;
+    xprt->xp_enable = TRUE;
+}
+
+/*
 * Main loop of the kernel RPC server
 * - wait for input (find a transport with a pending request).
 * - dequeue the request
@@ -2111,6 +2231,8 @@ svc_run(SVCPOOL *pool)
     for (;;) {
         SVCMASTERXPRT *next;
         mblk_t *mp;
+        bool_t enable;
+        size_t size;
 
         TRACE_0(TR_FAC_KRPC, TR_SVC_RUN, "svc_run");
@@ -2168,6 +2290,20 @@ svc_run(SVCPOOL *pool)
         mp = next->xp_req_head;
         next->xp_req_head = mp->b_next;
         mp->b_next = (mblk_t *)0;
+        size = svc_msgsize(mp);
+
+        mutex_enter(&pool->p_req_lock);
+        pool->p_reqs--;
+        if (pool->p_reqs == 0)
+            pool->p_qoverflow = FALSE;
+        pool->p_size -= size;
+        mutex_exit(&pool->p_req_lock);
+
+        next->xp_reqs--;
+        next->xp_size -= size;
+
+        if (next->xp_full)
+            svc_flowcontrol(next);
 
         TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_DEQ,
             "rpc_que_req_deq:pool %p mp %p", pool, mp);
@@ -2248,14 +2384,19 @@ svc_run(SVCPOOL *pool)
         * Release our reference on the rpcmod
         * slot attached to xp_wq->q_ptr.
         */
-        (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
+        mutex_enter(&xprt->xp_req_lock);
+        enable = xprt->xp_enable;
+        if (enable)
+            xprt->xp_enable = FALSE;
+        mutex_exit(&xprt->xp_req_lock);
+        (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL, enable);
     }
     /* NOTREACHED */
 }
 
 /*
 * Flush any pending requests for the queue and
- * and free the associated mblks.
+ * free the associated mblks.
 */
 void
 svc_queueclean(queue_t *q)
@@ -2270,14 +2411,21 @@ svc_queueclean(queue_t *q)
     mutex_enter(&xprt->xp_req_lock);
     pool = xprt->xp_pool;
     while ((mp = xprt->xp_req_head) != NULL) {
-        /* remove the request from the list and decrement p_reqs */
+        /* remove the request from the list */
         xprt->xp_req_head = mp->b_next;
-        mutex_enter(&pool->p_req_lock);
         mp->b_next = (mblk_t *)0;
-        pool->p_reqs--;
-        mutex_exit(&pool->p_req_lock);
-        (*RELE_PROC(xprt)) (xprt->xp_wq, mp);
+        (*RELE_PROC(xprt)) (xprt->xp_wq, mp, FALSE);
     }
+
+    mutex_enter(&pool->p_req_lock);
+    pool->p_reqs -= xprt->xp_reqs;
+    pool->p_size -= xprt->xp_size;
+    mutex_exit(&pool->p_req_lock);
+
+    xprt->xp_reqs = 0;
+    xprt->xp_size = 0;
+    xprt->xp_full = FALSE;
+    xprt->xp_enable = FALSE;
     mutex_exit(&xprt->xp_req_lock);
 }
 
@@ -2362,14 +2510,16 @@ svc_queueclose(queue_t *q)
 * - put a request at the tail of the transport request queue
 * - insert a hint for svc_poll() into the xprt-ready queue
 * - increment the `pending-requests' count for the pool
+ * - handle flow control
 * - wake up a thread sleeping in svc_poll() if necessary
 * - if all the threads are running ask the creator for a new one.
 */
-void
-svc_queuereq(queue_t *q, mblk_t *mp)
+bool_t
+svc_queuereq(queue_t *q, mblk_t *mp, bool_t flowcontrol)
 {
     SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
     SVCPOOL *pool = xprt->xp_pool;
+    size_t size;
 
     TRACE_0(TR_FAC_KRPC, TR_SVC_QUEUEREQ_START, "svc_queuereq_start");
@@ -2386,6 +2536,12 @@ svc_queuereq(queue_t *q, mblk_t *mp)
     * pending request count it looks atomic.
     */
     mutex_enter(&xprt->xp_req_lock);
+    if (flowcontrol && xprt->xp_full) {
+        mutex_exit(&xprt->xp_req_lock);
+
+        return (FALSE);
+    }
+    ASSERT(xprt->xp_full == FALSE);
     mutex_enter(&pool->p_req_lock);
     if (xprt->xp_req_head == NULL)
         xprt->xp_req_head = mp;
@@ -2396,15 +2552,24 @@ svc_queuereq(queue_t *q, mblk_t *mp)
     /*
     * Step 2.
     * Insert a hint into the xprt-ready queue, increment
-     * `pending-requests' count for the pool, and wake up
+     * counters, handle flow control, and wake up
     * a thread sleeping in svc_poll() if necessary.
     */
 
     /* Insert pointer to this transport into the xprt-ready queue */
     svc_xprt_qput(pool, xprt);
 
-    /* Increment the `pending-requests' count for the pool */
+    /* Increment counters */
     pool->p_reqs++;
+    xprt->xp_reqs++;
+
+    size = svc_msgsize(mp);
+    xprt->xp_size += size;
+    pool->p_size += size;
+
+    /* Handle flow control */
+    if (flowcontrol)
+        svc_flowcontrol(xprt);
 
     TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_ENQ,
         "rpc_que_req_enq:pool %p mp %p", pool, mp);
@@ -2449,6 +2614,8 @@ svc_queuereq(queue_t *q, mblk_t *mp)
 
     TRACE_1(TR_FAC_KRPC, TR_SVC_QUEUEREQ_END,
         "svc_queuereq_end:(%S)", "end");
+
+    return (TRUE);
 }
 
 /*
@@ -2539,6 +2706,7 @@ svc_detach_thread(SVCXPRT *clone_xprt)
 {
     SVCMASTERXPRT *xprt = clone_xprt->xp_master;
     SVCPOOL *pool = xprt->xp_pool;
+    bool_t enable;
 
     /* Thread must have a reservation */
     ASSERT(clone_xprt->xp_reserved);
@@ -2558,7 +2726,12 @@ svc_detach_thread(SVCXPRT *clone_xprt)
     mutex_exit(&pool->p_thread_lock);
 
     /* Release an rpcmod slot for this request */
-    (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
+    mutex_enter(&xprt->xp_req_lock);
+    enable = xprt->xp_enable;
+    if (enable)
+        xprt->xp_enable = FALSE;
+    mutex_exit(&xprt->xp_req_lock);
+    (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL, enable);
 
     /* Mark the clone (thread) as detached */
     clone_xprt->xp_reserved = FALSE;
@@ -2603,23 +2776,24 @@ rdma_stop(rdma_xprt_group_t *rdma_xprts)
             mutex_enter(&xprt->xp_req_lock);
             pool = xprt->xp_pool;
             while ((mp = xprt->xp_req_head) != NULL) {
-                /*
-                 * remove the request from the list and
-                 * decrement p_reqs
-                 */
+                rdma_recv_data_t *rdp = (rdma_recv_data_t *)
+                    mp->b_rptr;
+
+                /* remove the request from the list */
                 xprt->xp_req_head = mp->b_next;
-                mutex_enter(&pool->p_req_lock);
                 mp->b_next = (mblk_t *)0;
-                pool->p_reqs--;
-                mutex_exit(&pool->p_req_lock);
-                if (mp) {
-                    rdma_recv_data_t *rdp = (rdma_recv_data_t *)
-                        mp->b_rptr;
-                    RDMA_BUF_FREE(rdp->conn, &rdp->rpcmsg);
-                    RDMA_REL_CONN(rdp->conn);
-                    freemsg(mp);
-                }
+
+                RDMA_BUF_FREE(rdp->conn, &rdp->rpcmsg);
+                RDMA_REL_CONN(rdp->conn);
+                freemsg(mp);
             }
+            mutex_enter(&pool->p_req_lock);
+            pool->p_reqs -= xprt->xp_reqs;
+            pool->p_size -= xprt->xp_size;
+            mutex_exit(&pool->p_req_lock);
+            xprt->xp_reqs = 0;
+            xprt->xp_size = 0;
+            xprt->xp_full = FALSE;
+            xprt->xp_enable = FALSE;
             mutex_exit(&xprt->xp_req_lock);
             svc_queueclose(q);
 #ifdef DEBUG
diff --git a/usr/src/uts/common/rpc/svc.h b/usr/src/uts/common/rpc/svc.h
index dd3cb44c27..134b690a3f 100644
--- a/usr/src/uts/common/rpc/svc.h
+++ b/usr/src/uts/common/rpc/svc.h
@@ -20,6 +20,7 @@
 */
 /*
 * Copyright (c) 1989, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
 */
 /*    Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T    */
 /*      All Rights Reserved      */
@@ -254,7 +255,7 @@ struct __svcpool {
     * The pool's thread lock p_thread_lock protects:
     * - p_threads, p_detached_threads, p_reserved_threads and p_closing
     * The pool's request lock protects:
-     * - p_asleep, p_drowsy, p_reqs, p_walkers, p_req_cv.
+     * - p_asleep, p_drowsy, p_reqs, p_size, p_walkers, p_req_cv.
     * The following fields are `initialized constants':
     * - p_id, p_stksize, p_timeout.
     * Access to p_next and p_prev is protected by the pool
@@ -359,6 +360,8 @@ struct __svcpool {
     kmutex_t    p_user_lock;    /* Creator lock */
     void        (*p_offline)();    /* callout for unregister */
     void        (*p_shutdown)(); /* callout for shutdown */
+
+    size_t        p_size;        /* Total size of queued msgs */
 };
 
 /*
@@ -426,6 +429,11 @@ struct __svcmasterxprt {
     struct netbuf    xp_addrmask;    /* address mask */
 
     caddr_t        xp_p2;        /* private: for use by svc ops */
+
+    int        xp_full : 1;    /* xprt is full */
+    int        xp_enable : 1;    /* xprt needs to be enabled */
+    int        xp_reqs;    /* number of requests queued */
+    size_t        xp_size;    /* total size of queued msgs */
 };
 
 /*
@@ -787,7 +795,7 @@ extern int    svc_clts_kcreate(struct file *, uint_t, struct T_info_ack *,
                 SVCMASTERXPRT **);
 extern int    svc_cots_kcreate(struct file *, uint_t, struct T_info_ack *,
                 SVCMASTERXPRT **);
-extern void    svc_queuereq(queue_t *, mblk_t *);
+extern bool_t    svc_queuereq(queue_t *, mblk_t *, bool_t);
 extern void    svc_queueclean(queue_t *);
 extern void    svc_queueclose(queue_t *);
 extern int    svc_reserve_thread(SVCXPRT *);
diff --git a/usr/src/uts/common/rpc/svc_gen.c b/usr/src/uts/common/rpc/svc_gen.c
index fafbff299a..87ea2a7282 100644
--- a/usr/src/uts/common/rpc/svc_gen.c
+++ b/usr/src/uts/common/rpc/svc_gen.c
@@ -23,6 +23,9 @@
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
+/*
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
+ */
 
 /*    Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T    */
 /*      All Rights Reserved    */
@@ -32,8 +35,6 @@
 * under license from the Regents of the University of California.
 */
 
-#pragma ident    "%Z%%M%    %I%    %E% SMI"
-
 #include <sys/types.h>
 #include <sys/sysmacros.h>
 #include <sys/param.h>
@@ -177,6 +178,10 @@ svc_tli_kcreate(
     */
     xprt->xp_req_head = (mblk_t *)0;
     xprt->xp_req_tail = (mblk_t *)0;
+    xprt->xp_full = FALSE;
+    xprt->xp_enable = FALSE;
+    xprt->xp_reqs = 0;
+    xprt->xp_size = 0;
     mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
     mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
     xprt->xp_type = tinfo.SERV_type;
@@ -226,7 +231,7 @@ svc_tli_kcreate(
     if (hotstream && tinfo.SERV_type == T_CLTS) {
         udpmaj = ddi_name_to_major("udp");
         if (udpmaj != (major_t)-1 &&
-                getmajor(fp->f_vnode->v_rdev) == udpmaj)
+            getmajor(fp->f_vnode->v_rdev) == udpmaj)
             create_putlocks(wq, 1);
     }
 
diff --git a/usr/src/uts/common/rpc/svc_rdma.c b/usr/src/uts/common/rpc/svc_rdma.c
index 905b479112..e0a0fe32d1 100644
--- a/usr/src/uts/common/rpc/svc_rdma.c
+++ b/usr/src/uts/common/rpc/svc_rdma.c
@@ -20,6 +20,7 @@
 */
 /*
 * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
 */
 /*    Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T    */
 /*      All Rights Reserved    */
@@ -256,6 +257,10 @@ svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
     mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
     xprt->xp_req_head = (mblk_t *)0;
     xprt->xp_req_tail = (mblk_t *)0;
+    xprt->xp_full = FALSE;
+    xprt->xp_enable = FALSE;
+    xprt->xp_reqs = 0;
+    xprt->xp_size = 0;
     xprt->xp_threads = 0;
     xprt->xp_detached_threads = 0;
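
The functional core of this change is the new contract between a transport
module and svc_queuereq(): the third argument asks kRPC to apply flow
control, a FALSE return means the transport is currently full (xp_full), and
the rele callback's new bool_t argument tells the transport when kRPC has
lifted flow control (xp_enable) so the read queue can be re-enabled. The
sketch below is not part of the commit; my_rput, my_rsrv and my_rele are
hypothetical names, and it merely condenses the pattern that rpcmod's
mir_rput, mir_rsrv and mir_svc_release implement in the diff above:

	/* Hypothetical STREAMS transport glue, modeled on rpcmod. */
	#include <sys/stream.h>
	#include <sys/strsun.h>
	#include <rpc/types.h>
	#include <rpc/svc.h>

	static void
	my_rput(queue_t *q, mblk_t *mp)
	{
		/*
		 * Keep requests ordered: bypass the queue only when it is
		 * empty.  svc_queuereq() returns FALSE when the transport
		 * is flow-controlled; park the message and let the service
		 * procedure retry once kRPC re-enables the queue.
		 */
		if (q->q_first != NULL || !svc_queuereq(q, mp, TRUE))
			(void) putq(q, mp);
	}

	static void
	my_rsrv(queue_t *q)
	{
		mblk_t *mp;

		while ((mp = getq(q)) != NULL) {
			if (!svc_queuereq(q, mp, TRUE)) {
				(void) putbq(q, mp);	/* still full */
				break;
			}
		}
	}

	/* Registered as the transport's `rele' routine, called by kRPC. */
	static void
	my_rele(queue_t *wq, mblk_t *mp, bool_t enable)
	{
		if (mp != NULL)
			freemsg(mp);
		if (enable)
			qenable(RD(wq));	/* flow control was lifted */
	}

Because svc_flowcontrol() only sets xp_enable when a request completes or a
thread detaches, the qenable() in the rele path is what restarts a
flow-controlled stream; a transport that never re-enables its read queue
after a FALSE return would stall once the watermarks are crossed.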