summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--usr/src/uts/common/rpc/rpcib.c4
-rw-r--r--usr/src/uts/common/rpc/rpcmod.c290
-rw-r--r--usr/src/uts/common/rpc/svc.c290
-rw-r--r--usr/src/uts/common/rpc/svc.h12
-rw-r--r--usr/src/uts/common/rpc/svc_gen.c11
-rw-r--r--usr/src/uts/common/rpc/svc_rdma.c5
6 files changed, 349 insertions, 263 deletions
diff --git a/usr/src/uts/common/rpc/rpcib.c b/usr/src/uts/common/rpc/rpcib.c
index a2ded72177..07719f74fe 100644
--- a/usr/src/uts/common/rpc/rpcib.c
+++ b/usr/src/uts/common/rpc/rpcib.c
@@ -20,6 +20,7 @@
*/
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
*/
/*
@@ -1420,7 +1421,8 @@ rib_svc_rcq_handler(ibt_cq_hdl_t cq_hdl, void *arg)
rdp->rpcmsg.len = wc.wc_bytes_xfer;
rdp->status = wc.wc_status;
mp->b_wptr += sizeof (*rdp);
- svc_queuereq((queue_t *)rib_stat->q, mp);
+ (void) svc_queuereq((queue_t *)rib_stat->q, mp,
+ FALSE);
mutex_exit(&plugin_state_lock);
} else {
/*
diff --git a/usr/src/uts/common/rpc/rpcmod.c b/usr/src/uts/common/rpc/rpcmod.c
index fc99ca89b3..09863c970f 100644
--- a/usr/src/uts/common/rpc/rpcmod.c
+++ b/usr/src/uts/common/rpc/rpcmod.c
@@ -21,7 +21,10 @@
/*
* Copyright 2010 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
+ */
+/*
* Copyright 2012 Milan Jurik. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
*/
/* Copyright (c) 1990 Mentat Inc. */
@@ -231,9 +234,6 @@ static void mir_wsrv(queue_t *q);
static struct module_info rpcmod_info =
{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};
-/*
- * Read side has no service procedure.
- */
static struct qinit rpcmodrinit = {
(int (*)())rmm_rput,
(int (*)())rmm_rsrv,
@@ -269,6 +269,9 @@ struct xprt_style_ops {
void (*xo_rsrv)();
};
+/*
+ * Read side has no service procedure.
+ */
static struct xprt_style_ops xprt_clts_ops = {
rpcmodopen,
rpcmodclose,
@@ -291,7 +294,7 @@ static struct xprt_style_ops xprt_cots_ops = {
* Per rpcmod "slot" data structure. q->q_ptr points to one of these.
*/
struct rpcm {
- void *rm_krpc_cell; /* Reserved for use by KRPC */
+ void *rm_krpc_cell; /* Reserved for use by kRPC */
struct xprt_style_ops *rm_ops;
int rm_type; /* Client or server side stream */
#define RM_CLOSING 0x1 /* somebody is trying to close slot */
@@ -312,7 +315,7 @@ struct temp_slot {
};
typedef struct mir_s {
- void *mir_krpc_cell; /* Reserved for KRPC use. This field */
+ void *mir_krpc_cell; /* Reserved for kRPC use. This field */
/* must be first in the structure. */
struct xprt_style_ops *rm_ops;
int mir_type; /* Client or server side stream */
@@ -362,7 +365,7 @@ typedef struct mir_s {
* to 1 whenever a new request is sent out (mir_wput)
* and cleared when the timer fires (mir_timer). If
* the timer fires with this value equal to 0, then the
- * stream is considered idle and KRPC is notified.
+ * stream is considered idle and kRPC is notified.
*/
mir_clntreq : 1,
/*
@@ -404,9 +407,9 @@ typedef struct mir_s {
/* that a kernel RPC server thread */
/* (see svc_run()) has on this rpcmod */
/* slot. Effectively, it is the */
- /* number * of unprocessed messages */
+ /* number of unprocessed messages */
/* that have been passed up to the */
- /* KRPC layer */
+ /* kRPC layer */
mblk_t *mir_svc_pend_mp; /* Pending T_ORDREL_IND or */
/* T_DISCON_IND */
@@ -567,7 +570,7 @@ rmm_close(queue_t *q, int flag, cred_t *crp)
return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
}
-static void rpcmod_release(queue_t *, mblk_t *);
+static void rpcmod_release(queue_t *, mblk_t *, bool_t);
/*
* rpcmodopen - open routine gets called when the module gets pushed
* onto the stream.
@@ -578,7 +581,7 @@ rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
struct rpcm *rmp;
- extern void (*rpc_rele)(queue_t *, mblk_t *);
+ extern void (*rpc_rele)(queue_t *, mblk_t *, bool_t);
TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
@@ -687,13 +690,6 @@ rpcmodclose(queue_t *q, int flag, cred_t *crp)
return (0);
}
-#ifdef DEBUG
-int rpcmod_send_msg_up = 0;
-int rpcmod_send_uderr = 0;
-int rpcmod_send_dup = 0;
-int rpcmod_send_dup_cnt = 0;
-#endif
-
/*
* rpcmodrput - Module read put procedure. This is called from
* the module, driver, or stream head downstream.
@@ -715,54 +711,6 @@ rpcmodrput(queue_t *q, mblk_t *mp)
return;
}
-#ifdef DEBUG
- if (rpcmod_send_msg_up > 0) {
- mblk_t *nmp = copymsg(mp);
- if (nmp) {
- putnext(q, nmp);
- rpcmod_send_msg_up--;
- }
- }
- if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
- mblk_t *nmp;
- struct T_unitdata_ind *data;
- struct T_uderror_ind *ud;
- int d;
- data = (struct T_unitdata_ind *)mp->b_rptr;
- if (data->PRIM_type == T_UNITDATA_IND) {
- d = sizeof (*ud) - sizeof (*data);
- nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
- if (nmp) {
- ud = (struct T_uderror_ind *)nmp->b_rptr;
- ud->PRIM_type = T_UDERROR_IND;
- ud->DEST_length = data->SRC_length;
- ud->DEST_offset = data->SRC_offset + d;
- ud->OPT_length = data->OPT_length;
- ud->OPT_offset = data->OPT_offset + d;
- ud->ERROR_type = ENETDOWN;
- if (data->SRC_length) {
- bcopy(mp->b_rptr +
- data->SRC_offset,
- nmp->b_rptr +
- ud->DEST_offset,
- data->SRC_length);
- }
- if (data->OPT_length) {
- bcopy(mp->b_rptr +
- data->OPT_offset,
- nmp->b_rptr +
- ud->OPT_offset,
- data->OPT_length);
- }
- nmp->b_wptr += d;
- nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
- nmp->b_datap->db_type = M_PROTO;
- putnext(q, nmp);
- rpcmod_send_uderr--;
- }
- }
- }
-#endif
switch (mp->b_datap->db_type) {
default:
putnext(q, mp);
@@ -774,14 +722,12 @@ rpcmodrput(queue_t *q, mblk_t *mp)
pptr = (union T_primitives *)mp->b_rptr;
/*
- * Forward this message to krpc if it is data.
+ * Forward this message to kRPC if it is data.
*/
if (pptr->type == T_UNITDATA_IND) {
- mblk_t *nmp;
-
- /*
- * Check if the module is being popped.
- */
+ /*
+ * Check if the module is being popped.
+ */
mutex_enter(&rmp->rm_lock);
if (rmp->rm_state & RM_CLOSING) {
mutex_exit(&rmp->rm_lock);
@@ -818,47 +764,21 @@ rpcmodrput(queue_t *q, mblk_t *mp)
case RPC_SERVER:
/*
* rm_krpc_cell is exclusively used by the kRPC
- * CLTS server
+ * CLTS server. Try to submit the message to
+ * kRPC. Since this is an unreliable channel, we
+ * can just free the message in case the kRPC
+ * does not accept new messages.
*/
- if (rmp->rm_krpc_cell) {
-#ifdef DEBUG
- /*
- * Test duplicate request cache and
- * rm_ref count handling by sending a
- * duplicate every so often, if
- * desired.
- */
- if (rpcmod_send_dup &&
- rpcmod_send_dup_cnt++ %
- rpcmod_send_dup)
- nmp = copymsg(mp);
- else
- nmp = NULL;
-#endif
+ if (rmp->rm_krpc_cell &&
+ svc_queuereq(q, mp, TRUE)) {
/*
* Raise the reference count on this
* module to prevent it from being
- * popped before krpc generates the
+ * popped before kRPC generates the
* reply.
*/
rmp->rm_ref++;
mutex_exit(&rmp->rm_lock);
-
- /*
- * Submit the message to krpc.
- */
- svc_queuereq(q, mp);
-#ifdef DEBUG
- /*
- * Send duplicate if we created one.
- */
- if (nmp) {
- mutex_enter(&rmp->rm_lock);
- rmp->rm_ref++;
- mutex_exit(&rmp->rm_lock);
- svc_queuereq(q, nmp);
- }
-#endif
} else {
mutex_exit(&rmp->rm_lock);
freemsg(mp);
@@ -1030,8 +950,9 @@ rpcmodwsrv(queue_t *q)
}
}
+/* ARGSUSED */
static void
-rpcmod_release(queue_t *q, mblk_t *bp)
+rpcmod_release(queue_t *q, mblk_t *bp, bool_t enable)
{
struct rpcm *rmp;
@@ -1084,7 +1005,7 @@ rpcmod_release(queue_t *q, mblk_t *bp)
static int mir_clnt_dup_request(queue_t *q, mblk_t *mp);
static void mir_rput_proto(queue_t *q, mblk_t *mp);
static int mir_svc_policy_notify(queue_t *q, int event);
-static void mir_svc_release(queue_t *wq, mblk_t *mp);
+static void mir_svc_release(queue_t *wq, mblk_t *mp, bool_t);
static void mir_svc_start(queue_t *wq);
static void mir_svc_idle_start(queue_t *, mir_t *);
static void mir_svc_idle_stop(queue_t *, mir_t *);
@@ -1099,7 +1020,7 @@ static void mir_disconnect(queue_t *, mir_t *ir);
static int mir_check_len(queue_t *, int32_t, mblk_t *);
static void mir_timer(void *);
-extern void (*mir_rele)(queue_t *, mblk_t *);
+extern void (*mir_rele)(queue_t *, mblk_t *, bool_t);
extern void (*mir_start)(queue_t *);
extern void (*clnt_stop_idle)(queue_t *);
@@ -1256,7 +1177,7 @@ mir_close(queue_t *q)
mutex_exit(&mir->mir_mutex);
qprocsoff(q);
- /* Notify KRPC that this stream is going away. */
+ /* Notify kRPC that this stream is going away. */
svc_queueclose(q);
} else {
mutex_exit(&mir->mir_mutex);
@@ -1337,7 +1258,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
mir_t *mir;
RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
- /* Set variables used directly by KRPC. */
+ /* Set variables used directly by kRPC. */
if (!mir_rele)
mir_rele = mir_svc_release;
if (!mir_start)
@@ -1357,7 +1278,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
* be held on the read-side queue until the stream is completely
* initialized with a RPC_CLIENT or RPC_SERVER ioctl. During
* the ioctl processing, the flag is cleared and any messages that
- * arrived between the open and the ioctl are delivered to KRPC.
+ * arrived between the open and the ioctl are delivered to kRPC.
*
* Early data should never arrive on a client stream since
* servers only respond to our requests and we do not send any.
@@ -1365,7 +1286,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
* very common on a server stream where the client will start
* sending data as soon as the connection is made (and this
* is especially true with TCP where the protocol accepts the
- * connection before nfsd or KRPC is notified about it).
+ * connection before nfsd or kRPC is notified about it).
*/
mir->mir_hold_inbound = 1;
@@ -1420,7 +1341,7 @@ mir_rput(queue_t *q, mblk_t *mp)
* If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
* with the corresponding ioctl, then don't accept
* any inbound data. This should never happen for streams
- * created by nfsd or client-side KRPC because they are careful
+ * created by nfsd or client-side kRPC because they are careful
* to set the mode of the stream before doing anything else.
*/
if (mir->mir_type == 0) {
@@ -1456,7 +1377,7 @@ mir_rput(queue_t *q, mblk_t *mp)
* If a module on the stream is trying set the Stream head's
* high water mark, then set our hiwater to the requested
* value. We are the "stream head" for all inbound
- * data messages since messages are passed directly to KRPC.
+ * data messages since messages are passed directly to kRPC.
*/
if (MBLKL(mp) >= sizeof (struct stroptions)) {
struct stroptions *stropts;
@@ -1629,23 +1550,32 @@ mir_rput(queue_t *q, mblk_t *mp)
case RPC_SERVER:
/*
* Check for flow control before passing the
- * message to KRPC.
+ * message to kRPC.
*/
if (!mir->mir_hold_inbound) {
if (mir->mir_krpc_cell) {
- /*
- * If the reference count is 0
- * (not including this request),
- * then the stream is transitioning
- * from idle to non-idle. In this case,
- * we cancel the idle timer.
- */
- if (mir->mir_ref_cnt++ == 0)
- stop_timer = B_TRUE;
+
if (mir_check_len(q,
- (int32_t)msgdsize(mp), mp))
+ (int32_t)msgdsize(head_mp),
+ head_mp))
return;
- svc_queuereq(q, head_mp); /* to KRPC */
+
+ if (q->q_first == NULL &&
+ svc_queuereq(q, head_mp, TRUE)) {
+ /*
+ * If the reference count is 0
+ * (not including this
+ * request), then the stream is
+ * transitioning from idle to
+ * non-idle. In this case, we
+ * cancel the idle timer.
+ */
+ if (mir->mir_ref_cnt++ == 0)
+ stop_timer = B_TRUE;
+ } else {
+ (void) putq(q, head_mp);
+ mir->mir_inrservice = B_TRUE;
+ }
} else {
/*
* Count # of times this happens. Should
@@ -1811,7 +1741,7 @@ mir_rput_proto(queue_t *q, mblk_t *mp)
break;
default:
RPCLOG(1, "mir_rput: unexpected message %d "
- "for KRPC client\n",
+ "for kRPC client\n",
((union T_primitives *)mp->b_rptr)->type);
break;
}
@@ -1925,37 +1855,12 @@ mir_rput_proto(queue_t *q, mblk_t *mp)
* outbound flow control is exerted. When outbound flow control is
* relieved, mir_wsrv qenables the read-side queue. Read-side queues
* are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
- *
- * For the server side, we have two types of messages queued. The first type
- * are messages that are ready to be XDR decoded and and then sent to the
- * RPC program's dispatch routine. The second type are "raw" messages that
- * haven't been processed, i.e. assembled from rpc record fragements into
- * full requests. The only time we will see the second type of message
- * queued is if we have a memory allocation failure while processing a
- * a raw message. The field mir_first_non_processed_mblk will mark the
- * first such raw message. So the flow for server side is:
- *
- * - send processed queued messages to kRPC until we run out or find
- * one that needs additional processing because we were short on memory
- * earlier
- * - process a message that was deferred because of lack of
- * memory
- * - continue processing messages until the queue empties or we
- * have to stop because of lack of memory
- * - during each of the above phase, if the queue is empty and
- * there are no pending messages that were passed to the RPC
- * layer, send upstream the pending disconnect/ordrel indication if
- * there is one
- *
- * The read-side queue is also enabled by a bufcall callback if dupmsg
- * fails in mir_rput.
*/
static void
mir_rsrv(queue_t *q)
{
mir_t *mir;
mblk_t *mp;
- mblk_t *cmp = NULL;
boolean_t stop_timer = B_FALSE;
mir = (mir_t *)q->q_ptr;
@@ -1966,43 +1871,28 @@ mir_rsrv(queue_t *q)
case RPC_SERVER:
if (mir->mir_ref_cnt == 0)
mir->mir_hold_inbound = 0;
- if (mir->mir_hold_inbound) {
-
- ASSERT(cmp == NULL);
- if (q->q_first == NULL) {
-
- MIR_CLEAR_INRSRV(mir);
-
- if (MIR_SVC_QUIESCED(mir)) {
- cmp = mir->mir_svc_pend_mp;
- mir->mir_svc_pend_mp = NULL;
- }
- }
-
- mutex_exit(&mir->mir_mutex);
-
- if (cmp != NULL) {
- RPCLOG(16, "mir_rsrv: line %d: sending a held "
- "disconnect/ord rel indication upstream\n",
- __LINE__);
- putnext(q, cmp);
- }
+ if (mir->mir_hold_inbound)
+ break;
- return;
- }
while (mp = getq(q)) {
if (mir->mir_krpc_cell &&
(mir->mir_svc_no_more_msgs == 0)) {
- /*
- * If we were idle, turn off idle timer since
- * we aren't idle any more.
- */
- if (mir->mir_ref_cnt++ == 0)
- stop_timer = B_TRUE;
+
if (mir_check_len(q,
(int32_t)msgdsize(mp), mp))
return;
- svc_queuereq(q, mp);
+
+ if (svc_queuereq(q, mp, TRUE)) {
+ /*
+ * If we were idle, turn off idle timer
+ * since we aren't idle any more.
+ */
+ if (mir->mir_ref_cnt++ == 0)
+ stop_timer = B_TRUE;
+ } else {
+ (void) putbq(q, mp);
+ break;
+ }
} else {
/*
* Count # of times this happens. Should be
@@ -2041,10 +1931,10 @@ mir_rsrv(queue_t *q)
}
if (q->q_first == NULL) {
+ mblk_t *cmp = NULL;
MIR_CLEAR_INRSRV(mir);
- ASSERT(cmp == NULL);
if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
cmp = mir->mir_svc_pend_mp;
mir->mir_svc_pend_mp = NULL;
@@ -2111,15 +2001,14 @@ mir_svc_start_close(queue_t *wq, mir_t *mir)
ASSERT((wq->q_flag & QREADR) == 0);
ASSERT(mir->mir_type == RPC_SERVER);
-
/*
* Do not accept any more messages.
*/
mir->mir_svc_no_more_msgs = 1;
/*
- * Next two statements will make the read service procedure invoke
- * svc_queuereq() on everything stuck in the streams read queue.
+ * Next two statements will make the read service procedure
+ * free everything stuck in the streams read queue.
* It's not necessary because enabling the write queue will
* have the same effect, but why not speed the process along?
*/
@@ -2134,11 +2023,11 @@ mir_svc_start_close(queue_t *wq, mir_t *mir)
}
/*
- * This routine is called directly by KRPC after a request is completed,
+ * This routine is called directly by kRPC after a request is completed,
* whether a reply was sent or the request was dropped.
*/
static void
-mir_svc_release(queue_t *wq, mblk_t *mp)
+mir_svc_release(queue_t *wq, mblk_t *mp, bool_t enable)
{
mir_t *mir = (mir_t *)wq->q_ptr;
mblk_t *cmp = NULL;
@@ -2147,6 +2036,9 @@ mir_svc_release(queue_t *wq, mblk_t *mp)
if (mp)
freemsg(mp);
+ if (enable)
+ qenable(RD(wq));
+
mutex_enter(&mir->mir_mutex);
/*
@@ -2194,7 +2086,7 @@ mir_svc_release(queue_t *wq, mblk_t *mp)
}
/*
- * This routine is called by server-side KRPC when it is ready to
+ * This routine is called by server-side kRPC when it is ready to
* handle inbound messages on the stream.
*/
static void
@@ -2286,7 +2178,7 @@ mir_timer(void *arg)
* For clients, the timer fires at clnt_idle_timeout
* intervals. If the activity marker (mir_clntreq) is
* zero, then the stream has been idle since the last
- * timer event and we notify KRPC. If mir_clntreq is
+ * timer event and we notify kRPC. If mir_clntreq is
* non-zero, then the stream is active and we just
* restart the timer for another interval. mir_clntreq
* is set to 1 in mir_wput for every request passed
@@ -2337,10 +2229,10 @@ printf("mir_timer[%d]: doing client timeout\n", now / hz);
mutex_exit(&mir->mir_mutex);
/*
* We pass T_ORDREL_REQ as an integer value
- * to KRPC as the indication that the stream
+ * to kRPC as the indication that the stream
* is idle. This is not a T_ORDREL_REQ message,
* it is just a convenient value since we call
- * the same KRPC routine for T_ORDREL_INDs and
+ * the same kRPC routine for T_ORDREL_INDs and
* T_DISCON_INDs.
*/
clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
@@ -2354,7 +2246,7 @@ printf("mir_timer[%d]: doing client timeout\n", now / hz);
* by mir_wput when mir_type is set to RPC_SERVER and
* by mir_svc_idle_start whenever the stream goes idle
* (mir_ref_cnt == 0). The timer is cancelled in
- * mir_rput whenever a new inbound request is passed to KRPC
+ * mir_rput whenever a new inbound request is passed to kRPC
* and the stream was previously idle.
*
* The timer interval can be changed for individual
@@ -2424,12 +2316,12 @@ mir_wput(queue_t *q, mblk_t *mp)
!IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
/*
* Since we know that M_DATA messages are created exclusively
- * by KRPC, we expect that KRPC will leave room for our header
+ * by kRPC, we expect that kRPC will leave room for our header
* and 4 byte align which is normal for XDR.
- * If KRPC (or someone else) does not cooperate, then we
+ * If kRPC (or someone else) does not cooperate, then we
* just throw away the message.
*/
- RPCLOG(1, "mir_wput: KRPC did not leave space for record "
+ RPCLOG(1, "mir_wput: kRPC did not leave space for record "
"fragment header (%d bytes left)\n",
(int)(rptr - mp->b_datap->db_base));
freemsg(mp);
@@ -2650,7 +2542,7 @@ ioc_eperm:
/*
* If the stream is not idle, then we hold the
* orderly release until it becomes idle. This
- * ensures that KRPC will be able to reply to
+ * ensures that kRPC will be able to reply to
* all requests that we have passed to it.
*
* We also queue the request if there is data already
@@ -2896,10 +2788,10 @@ mir_disconnect(queue_t *q, mir_t *mir)
mutex_exit(&mir->mir_mutex);
/*
- * T_DISCON_REQ is passed to KRPC as an integer value
+ * T_DISCON_REQ is passed to kRPC as an integer value
* (this is not a TPI message). It is used as a
* convenient value to indicate a sanity check
- * failure -- the same KRPC routine is also called
+ * failure -- the same kRPC routine is also called
* for T_DISCON_INDs and T_ORDREL_INDs.
*/
clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
@@ -2944,7 +2836,7 @@ mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
cmn_err(CE_NOTE,
- "KRPC: record fragment from %s of size(%d) exceeds "
+ "kRPC: record fragment from %s of size(%d) exceeds "
"maximum (%u). Disconnecting",
(mir->mir_type == RPC_CLIENT) ? "server" :
(mir->mir_type == RPC_SERVER) ? "client" :
diff --git a/usr/src/uts/common/rpc/svc.c b/usr/src/uts/common/rpc/svc.c
index 250b5984c6..c0ca1ede3f 100644
--- a/usr/src/uts/common/rpc/svc.c
+++ b/usr/src/uts/common/rpc/svc.c
@@ -20,11 +20,12 @@
*/
/*
- * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
- * Use is subject to license terms.
+ * Copyright 2015 Nexenta Systems, Inc. All rights reserved.
*/
+
/*
- * Copyright 2014 Nexenta Systems, Inc. All rights reserved.
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
*/
/*
@@ -47,7 +48,7 @@
* threads processing events on the transport. Some fields in the
* master structure are protected by locks
* - xp_req_lock protects the request queue:
- * xp_req_head, xp_req_tail
+ * xp_req_head, xp_req_tail, xp_reqs, xp_size, xp_full, xp_enable
* - xp_thread_lock protects the thread (clone) counts
* xp_threads, xp_detached_threads, xp_wq
* Each master transport is registered to exactly one thread pool.
@@ -72,7 +73,7 @@
* to restrict resource usage by the service. Some fields are protected
* by locks:
* - p_req_lock protects several counts and flags:
- * p_reqs, p_walkers, p_asleep, p_drowsy, p_req_cv
+ * p_reqs, p_size, p_walkers, p_asleep, p_drowsy, p_req_cv
* - p_thread_lock governs other thread counts:
* p_threads, p_detached_threads, p_reserved_threads, p_closing
*
@@ -113,13 +114,6 @@
* thread processes a request and sends a reply it returns to svc_run()
* and svc_run() calls svc_poll() to find new input.
*
- * There is no longer an "inconsistent" but "safe" optimization in the
- * svc_queuereq() code. This "inconsistent" state was leading to
- * inconsistencies between the actual number of requests and the value
- * of p_reqs (the total number of requests). Because of this, hangs were
- * occurring in svc_poll() where p_reqs was greater than one and no
- * requests were found on the request queues.
- *
* svc_poll().
* In order to avoid unnecessary locking, which causes performance
* problems, we always look for a pending request on the current transport.
@@ -201,6 +195,7 @@
#include <sys/user.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
+#include <sys/strsun.h>
#include <sys/tihdr.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
@@ -292,6 +287,11 @@ struct svc_globals {
int rdma_check = 0;
/*
+ * This allows disabling flow control in svc_queuereq().
+ */
+volatile int svc_flowcontrol_disable = 0;
+
+/*
* Authentication parameters list.
*/
static caddr_t rqcred_head;
@@ -300,15 +300,15 @@ static kmutex_t rqcred_lock;
/*
* Pointers to transport specific `rele' routines in rpcmod (set from rpcmod).
*/
-void (*rpc_rele)(queue_t *, mblk_t *) = NULL;
-void (*mir_rele)(queue_t *, mblk_t *) = NULL;
+void (*rpc_rele)(queue_t *, mblk_t *, bool_t) = NULL;
+void (*mir_rele)(queue_t *, mblk_t *, bool_t) = NULL;
/* ARGSUSED */
void
-rpc_rdma_rele(queue_t *q, mblk_t *mp)
+rpc_rdma_rele(queue_t *q, mblk_t *mp, bool_t enable)
{
}
-void (*rdma_rele)(queue_t *, mblk_t *) = rpc_rdma_rele;
+void (*rdma_rele)(queue_t *, mblk_t *, bool_t) = rpc_rdma_rele;
/*
@@ -914,7 +914,7 @@ svc_xprt_qinit(SVCPOOL *pool, size_t qsize)
* - insert a pointer to xprt into the xprt-ready queue (FIFO)
* - if the xprt-ready queue is full turn the overflow flag on.
*
- * NOTICE: pool->p_qtop is protected by the the pool's request lock
+ * NOTICE: pool->p_qtop is protected by the pool's request lock
* and the caller (svc_queuereq()) must hold the lock.
*/
static void
@@ -922,7 +922,7 @@ svc_xprt_qput(SVCPOOL *pool, SVCMASTERXPRT *xprt)
{
ASSERT(MUTEX_HELD(&pool->p_req_lock));
- /* If the overflow flag is there is nothing we can do */
+ /* If the overflow flag is on there is nothing we can do */
if (pool->p_qoverflow)
return;
@@ -1871,15 +1871,8 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
if (xprt && xprt->xp_req_head && (!pool->p_qoverflow ||
clone_xprt->xp_same_xprt++ < pool->p_max_same_xprt)) {
mutex_enter(&xprt->xp_req_lock);
- if (xprt->xp_req_head) {
- mutex_enter(&pool->p_req_lock);
- pool->p_reqs--;
- if (pool->p_reqs == 0)
- pool->p_qoverflow = FALSE;
- mutex_exit(&pool->p_req_lock);
-
+ if (xprt->xp_req_head)
return (xprt);
- }
mutex_exit(&xprt->xp_req_lock);
}
clone_xprt->xp_same_xprt = 0;
@@ -1921,9 +1914,6 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
rw_exit(&pool->p_lrwlock);
mutex_enter(&pool->p_req_lock);
- pool->p_reqs--;
- if (pool->p_reqs == 0)
- pool->p_qoverflow = FALSE;
pool->p_walkers--;
mutex_exit(&pool->p_req_lock);
@@ -1994,9 +1984,6 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
rw_exit(&pool->p_lrwlock);
mutex_enter(&pool->p_req_lock);
- pool->p_reqs--;
- if (pool->p_reqs == 0)
- pool->p_qoverflow = FALSE;
pool->p_walkers--;
mutex_exit(&pool->p_req_lock);
@@ -2086,6 +2073,139 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
}
/*
+ * calculate memory space used by message
+ */
+static size_t
+svc_msgsize(mblk_t *mp)
+{
+ size_t count = 0;
+
+ for (; mp; mp = mp->b_cont)
+ count += MBLKSIZE(mp);
+
+ return (count);
+}
+
+/*
+ * svc_flowcontrol() attempts to turn the flow control on or off for the
+ * transport.
+ *
+ * On input the xprt->xp_full determines whether the flow control is currently
+ * off (FALSE) or on (TRUE). If it is off we do tests to see whether we should
+ * turn it on, and vice versa.
+ *
+ * There are two conditions considered for the flow control. Both conditions
+ * have the low and the high watermark. Once the high watermark is reached in
+ * EITHER condition the flow control is turned on. For turning the flow
+ * control off BOTH conditions must be below the low watermark.
+ *
+ * Condition #1 - Number of requests queued:
+ *
+ * The max number of threads working on the pool is roughly pool->p_maxthreads.
+ * Every thread could handle up to pool->p_max_same_xprt requests from one
+ * transport before it moves to another transport. See svc_poll() for details.
+ * In case all threads in the pool are working on a transport they will handle
+ * no more than enough_reqs (pool->p_maxthreads * pool->p_max_same_xprt)
+ * requests in one shot from that transport. We are turning the flow control
+ * on once the high watermark is reached for a transport so that the underlying
+ * queue knows the rate of incoming requests is higher than we are able to
+ * handle.
+ *
+ * The high watermark: 2 * enough_reqs
+ * The low watermark: enough_reqs
+ *
+ * Condition #2 - Length of the data payload for the queued messages/requests:
+ *
+ * We want to prevent a particular pool exhausting the memory, so once the
+ * total length of queued requests for the whole pool reaches the high
+ * watermark we start to turn on the flow control for significant memory
+ * consumers (individual transports). To keep the implementation simple
+ * enough, this condition is not exact, because we count only the data part of
+ * the queued requests and we ignore the overhead. For our purposes this
+ * should be enough. We should also consider that up to pool->p_maxthreads
+ * threads for the pool might work on large requests (this is not counted for
+ * this condition). We need to leave some space for rest of the system and for
+ * other big memory consumers (like ZFS). Also, after the flow control is
+ * turned on (on cots transports) we can start to accumulate a few megabytes in
+ * queues for each transport.
+ *
+ * Usually, the big memory consumers are NFS WRITE requests, so we do not
+ * expect to see this condition met for other than NFS pools.
+ *
+ * The high watermark: 1/5 of available memory
+ * The low watermark: 1/6 of available memory
+ *
+ * Once the high watermark is reached we turn the flow control on only for
+ * transports exceeding a per-transport memory limit. The per-transport
+ * fraction of memory is calculated as:
+ *
+ * the high watermark / number of transports
+ *
+ * For transports with less than the per-transport fraction of memory consumed,
+ * the flow control is not turned on, so they are not blocked by a few "hungry"
+ * transports. Because of this, the total memory consumption for the
+ * particular pool might grow up to 2 * the high watermark.
+ *
+ * The individual transports are unblocked once their consumption is below:
+ *
+ * per-transport fraction of memory / 2
+ *
+ * or once the total memory consumption for the whole pool falls below the low
+ * watermark.
+ *
+ */
+static void
+svc_flowcontrol(SVCMASTERXPRT *xprt)
+{
+ SVCPOOL *pool = xprt->xp_pool;
+ size_t totalmem = ptob(physmem);
+ int enough_reqs = pool->p_maxthreads * pool->p_max_same_xprt;
+
+ ASSERT(MUTEX_HELD(&xprt->xp_req_lock));
+
+ /* Should we turn the flow control on? */
+ if (xprt->xp_full == FALSE) {
+ /* Is flow control disabled? */
+ if (svc_flowcontrol_disable != 0)
+ return;
+
+ /* Is there enough requests queued? */
+ if (xprt->xp_reqs >= enough_reqs * 2) {
+ xprt->xp_full = TRUE;
+ return;
+ }
+
+ /*
+ * If this pool uses over 20% of memory and this transport is
+ * significant memory consumer then we are full
+ */
+ if (pool->p_size >= totalmem / 5 &&
+ xprt->xp_size >= totalmem / 5 / pool->p_lcount)
+ xprt->xp_full = TRUE;
+
+ return;
+ }
+
+ /* We might want to turn the flow control off */
+
+ /* Do we still have enough requests? */
+ if (xprt->xp_reqs > enough_reqs)
+ return;
+
+ /*
+ * If this pool still uses over 16% of memory and this transport is
+ * still significant memory consumer then we are still full
+ */
+ if (pool->p_size >= totalmem / 6 &&
+ xprt->xp_size >= totalmem / 5 / pool->p_lcount / 2)
+ return;
+
+ /* Turn the flow control off and make sure rpcmod is notified */
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = TRUE;
+}
+
+/*
* Main loop of the kernel RPC server
* - wait for input (find a transport with a pending request).
* - dequeue the request
@@ -2111,6 +2231,8 @@ svc_run(SVCPOOL *pool)
for (;;) {
SVCMASTERXPRT *next;
mblk_t *mp;
+ bool_t enable;
+ size_t size;
TRACE_0(TR_FAC_KRPC, TR_SVC_RUN, "svc_run");
@@ -2168,6 +2290,20 @@ svc_run(SVCPOOL *pool)
mp = next->xp_req_head;
next->xp_req_head = mp->b_next;
mp->b_next = (mblk_t *)0;
+ size = svc_msgsize(mp);
+
+ mutex_enter(&pool->p_req_lock);
+ pool->p_reqs--;
+ if (pool->p_reqs == 0)
+ pool->p_qoverflow = FALSE;
+ pool->p_size -= size;
+ mutex_exit(&pool->p_req_lock);
+
+ next->xp_reqs--;
+ next->xp_size -= size;
+
+ if (next->xp_full)
+ svc_flowcontrol(next);
TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_DEQ,
"rpc_que_req_deq:pool %p mp %p", pool, mp);
@@ -2248,14 +2384,19 @@ svc_run(SVCPOOL *pool)
* Release our reference on the rpcmod
* slot attached to xp_wq->q_ptr.
*/
- (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
+ mutex_enter(&xprt->xp_req_lock);
+ enable = xprt->xp_enable;
+ if (enable)
+ xprt->xp_enable = FALSE;
+ mutex_exit(&xprt->xp_req_lock);
+ (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL, enable);
}
/* NOTREACHED */
}
/*
* Flush any pending requests for the queue and
- * and free the associated mblks.
+ * free the associated mblks.
*/
void
svc_queueclean(queue_t *q)
@@ -2270,14 +2411,21 @@ svc_queueclean(queue_t *q)
mutex_enter(&xprt->xp_req_lock);
pool = xprt->xp_pool;
while ((mp = xprt->xp_req_head) != NULL) {
- /* remove the request from the list and decrement p_reqs */
+ /* remove the request from the list */
xprt->xp_req_head = mp->b_next;
- mutex_enter(&pool->p_req_lock);
mp->b_next = (mblk_t *)0;
- pool->p_reqs--;
- mutex_exit(&pool->p_req_lock);
- (*RELE_PROC(xprt)) (xprt->xp_wq, mp);
+ (*RELE_PROC(xprt)) (xprt->xp_wq, mp, FALSE);
}
+
+ mutex_enter(&pool->p_req_lock);
+ pool->p_reqs -= xprt->xp_reqs;
+ pool->p_size -= xprt->xp_size;
+ mutex_exit(&pool->p_req_lock);
+
+ xprt->xp_reqs = 0;
+ xprt->xp_size = 0;
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = FALSE;
mutex_exit(&xprt->xp_req_lock);
}
@@ -2362,14 +2510,16 @@ svc_queueclose(queue_t *q)
* - put a request at the tail of the transport request queue
* - insert a hint for svc_poll() into the xprt-ready queue
* - increment the `pending-requests' count for the pool
+ * - handle flow control
* - wake up a thread sleeping in svc_poll() if necessary
* - if all the threads are running ask the creator for a new one.
*/
-void
-svc_queuereq(queue_t *q, mblk_t *mp)
+bool_t
+svc_queuereq(queue_t *q, mblk_t *mp, bool_t flowcontrol)
{
SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
SVCPOOL *pool = xprt->xp_pool;
+ size_t size;
TRACE_0(TR_FAC_KRPC, TR_SVC_QUEUEREQ_START, "svc_queuereq_start");
@@ -2386,6 +2536,12 @@ svc_queuereq(queue_t *q, mblk_t *mp)
* pending request count it looks atomic.
*/
mutex_enter(&xprt->xp_req_lock);
+ if (flowcontrol && xprt->xp_full) {
+ mutex_exit(&xprt->xp_req_lock);
+
+ return (FALSE);
+ }
+ ASSERT(xprt->xp_full == FALSE);
mutex_enter(&pool->p_req_lock);
if (xprt->xp_req_head == NULL)
xprt->xp_req_head = mp;
@@ -2396,15 +2552,24 @@ svc_queuereq(queue_t *q, mblk_t *mp)
/*
* Step 2.
* Insert a hint into the xprt-ready queue, increment
- * `pending-requests' count for the pool, and wake up
+ * counters, handle flow control, and wake up
* a thread sleeping in svc_poll() if necessary.
*/
/* Insert pointer to this transport into the xprt-ready queue */
svc_xprt_qput(pool, xprt);
- /* Increment the `pending-requests' count for the pool */
+ /* Increment counters */
pool->p_reqs++;
+ xprt->xp_reqs++;
+
+ size = svc_msgsize(mp);
+ xprt->xp_size += size;
+ pool->p_size += size;
+
+ /* Handle flow control */
+ if (flowcontrol)
+ svc_flowcontrol(xprt);
TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_ENQ,
"rpc_que_req_enq:pool %p mp %p", pool, mp);
@@ -2449,6 +2614,8 @@ svc_queuereq(queue_t *q, mblk_t *mp)
TRACE_1(TR_FAC_KRPC, TR_SVC_QUEUEREQ_END,
"svc_queuereq_end:(%S)", "end");
+
+ return (TRUE);
}
/*
@@ -2539,6 +2706,7 @@ svc_detach_thread(SVCXPRT *clone_xprt)
{
SVCMASTERXPRT *xprt = clone_xprt->xp_master;
SVCPOOL *pool = xprt->xp_pool;
+ bool_t enable;
/* Thread must have a reservation */
ASSERT(clone_xprt->xp_reserved);
@@ -2558,7 +2726,12 @@ svc_detach_thread(SVCXPRT *clone_xprt)
mutex_exit(&pool->p_thread_lock);
/* Release an rpcmod slot for this request */
- (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
+ mutex_enter(&xprt->xp_req_lock);
+ enable = xprt->xp_enable;
+ if (enable)
+ xprt->xp_enable = FALSE;
+ mutex_exit(&xprt->xp_req_lock);
+ (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL, enable);
/* Mark the clone (thread) as detached */
clone_xprt->xp_reserved = FALSE;
@@ -2603,23 +2776,24 @@ rdma_stop(rdma_xprt_group_t *rdma_xprts)
mutex_enter(&xprt->xp_req_lock);
pool = xprt->xp_pool;
while ((mp = xprt->xp_req_head) != NULL) {
- /*
- * remove the request from the list and
- * decrement p_reqs
- */
+ rdma_recv_data_t *rdp = (rdma_recv_data_t *)mp->b_rptr;
+
+ /* remove the request from the list */
xprt->xp_req_head = mp->b_next;
- mutex_enter(&pool->p_req_lock);
mp->b_next = (mblk_t *)0;
- pool->p_reqs--;
- mutex_exit(&pool->p_req_lock);
- if (mp) {
- rdma_recv_data_t *rdp = (rdma_recv_data_t *)
- mp->b_rptr;
- RDMA_BUF_FREE(rdp->conn, &rdp->rpcmsg);
- RDMA_REL_CONN(rdp->conn);
- freemsg(mp);
- }
+
+ RDMA_BUF_FREE(rdp->conn, &rdp->rpcmsg);
+ RDMA_REL_CONN(rdp->conn);
+ freemsg(mp);
}
+ mutex_enter(&pool->p_req_lock);
+ pool->p_reqs -= xprt->xp_reqs;
+ pool->p_size -= xprt->xp_size;
+ mutex_exit(&pool->p_req_lock);
+ xprt->xp_reqs = 0;
+ xprt->xp_size = 0;
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = FALSE;
mutex_exit(&xprt->xp_req_lock);
svc_queueclose(q);
#ifdef DEBUG
diff --git a/usr/src/uts/common/rpc/svc.h b/usr/src/uts/common/rpc/svc.h
index dd3cb44c27..134b690a3f 100644
--- a/usr/src/uts/common/rpc/svc.h
+++ b/usr/src/uts/common/rpc/svc.h
@@ -20,6 +20,7 @@
*/
/*
* Copyright (c) 1989, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
*/
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
@@ -254,7 +255,7 @@ struct __svcpool {
* The pool's thread lock p_thread_lock protects:
* - p_threads, p_detached_threads, p_reserved_threads and p_closing
* The pool's request lock protects:
- * - p_asleep, p_drowsy, p_reqs, p_walkers, p_req_cv.
+ * - p_asleep, p_drowsy, p_reqs, p_size, p_walkers, p_req_cv.
* The following fields are `initialized constants':
* - p_id, p_stksize, p_timeout.
* Access to p_next and p_prev is protected by the pool
@@ -359,6 +360,8 @@ struct __svcpool {
kmutex_t p_user_lock; /* Creator lock */
void (*p_offline)(); /* callout for unregister */
void (*p_shutdown)(); /* callout for shutdown */
+
+ size_t p_size; /* Total size of queued msgs */
};
/*
@@ -426,6 +429,11 @@ struct __svcmasterxprt {
struct netbuf xp_addrmask; /* address mask */
caddr_t xp_p2; /* private: for use by svc ops */
+
+ int xp_full : 1; /* xprt is full */
+ int xp_enable : 1; /* xprt needs to be enabled */
+ int xp_reqs; /* number of requests queued */
+ size_t xp_size; /* total size of queued msgs */
};
/*
@@ -787,7 +795,7 @@ extern int svc_clts_kcreate(struct file *, uint_t, struct T_info_ack *,
SVCMASTERXPRT **);
extern int svc_cots_kcreate(struct file *, uint_t, struct T_info_ack *,
SVCMASTERXPRT **);
-extern void svc_queuereq(queue_t *, mblk_t *);
+extern bool_t svc_queuereq(queue_t *, mblk_t *, bool_t);
extern void svc_queueclean(queue_t *);
extern void svc_queueclose(queue_t *);
extern int svc_reserve_thread(SVCXPRT *);
diff --git a/usr/src/uts/common/rpc/svc_gen.c b/usr/src/uts/common/rpc/svc_gen.c
index fafbff299a..87ea2a7282 100644
--- a/usr/src/uts/common/rpc/svc_gen.c
+++ b/usr/src/uts/common/rpc/svc_gen.c
@@ -23,6 +23,9 @@
* Copyright 2004 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
+/*
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
+ */
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
@@ -32,8 +35,6 @@
* under license from the Regents of the University of California.
*/
-#pragma ident "%Z%%M% %I% %E% SMI"
-
#include <sys/types.h>
#include <sys/sysmacros.h>
#include <sys/param.h>
@@ -177,6 +178,10 @@ svc_tli_kcreate(
*/
xprt->xp_req_head = (mblk_t *)0;
xprt->xp_req_tail = (mblk_t *)0;
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = FALSE;
+ xprt->xp_reqs = 0;
+ xprt->xp_size = 0;
mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
xprt->xp_type = tinfo.SERV_type;
@@ -226,7 +231,7 @@ svc_tli_kcreate(
if (hotstream && tinfo.SERV_type == T_CLTS) {
udpmaj = ddi_name_to_major("udp");
if (udpmaj != (major_t)-1 &&
- getmajor(fp->f_vnode->v_rdev) == udpmaj)
+ getmajor(fp->f_vnode->v_rdev) == udpmaj)
create_putlocks(wq, 1);
}
diff --git a/usr/src/uts/common/rpc/svc_rdma.c b/usr/src/uts/common/rpc/svc_rdma.c
index 905b479112..e0a0fe32d1 100644
--- a/usr/src/uts/common/rpc/svc_rdma.c
+++ b/usr/src/uts/common/rpc/svc_rdma.c
@@ -20,6 +20,7 @@
*/
/*
* Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
*/
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
@@ -256,6 +257,10 @@ svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
xprt->xp_req_head = (mblk_t *)0;
xprt->xp_req_tail = (mblk_t *)0;
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = FALSE;
+ xprt->xp_reqs = 0;
+ xprt->xp_size = 0;
xprt->xp_threads = 0;
xprt->xp_detached_threads = 0;