author    Jerry Jelinek <jerry.jelinek@joyent.com>    2015-05-13 11:27:08 +0000
committer Jerry Jelinek <jerry.jelinek@joyent.com>    2015-05-13 11:27:08 +0000
commit    3b589280f3e40ebc05dce708bc90bfefe0ad6730 (patch)
tree      99dd749659ffcfb58754e76e72eab600b28f8e31 /usr/src/uts/common
parent    58d6b4e51657a5c05c3c9a7bbf051442156aacd2 (diff)
parent    2695d4f4d1e2a6022c8a279d40c3cb750964974d (diff)
[illumos-gate merge]
commit 2695d4f4d1e2a6022c8a279d40c3cb750964974d
    3783 Flow control is needed in rpcmod when the NFS server is unable to keep up with the network
commit 7d593912b33208f97b6a9b5aa18d6bc0227c8a67
    5819 want dumpadm(1m) option to print estimated dump size
Diffstat (limited to 'usr/src/uts/common')
-rw-r--r--  usr/src/uts/common/rpc/rpcib.c    |   4
-rw-r--r--  usr/src/uts/common/rpc/rpcmod.c   | 290
-rw-r--r--  usr/src/uts/common/rpc/svc.c      | 290
-rw-r--r--  usr/src/uts/common/rpc/svc.h      |  12
-rw-r--r--  usr/src/uts/common/rpc/svc_gen.c  |  11
-rw-r--r--  usr/src/uts/common/rpc/svc_rdma.c |   5
6 files changed, 349 insertions, 263 deletions
diff --git a/usr/src/uts/common/rpc/rpcib.c b/usr/src/uts/common/rpc/rpcib.c
index a2ded72177..07719f74fe 100644
--- a/usr/src/uts/common/rpc/rpcib.c
+++ b/usr/src/uts/common/rpc/rpcib.c
@@ -20,6 +20,7 @@
*/
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
*/
/*
@@ -1420,7 +1421,8 @@ rib_svc_rcq_handler(ibt_cq_hdl_t cq_hdl, void *arg)
rdp->rpcmsg.len = wc.wc_bytes_xfer;
rdp->status = wc.wc_status;
mp->b_wptr += sizeof (*rdp);
- svc_queuereq((queue_t *)rib_stat->q, mp);
+ (void) svc_queuereq((queue_t *)rib_stat->q, mp,
+ FALSE);
mutex_exit(&plugin_state_lock);
} else {
/*
diff --git a/usr/src/uts/common/rpc/rpcmod.c b/usr/src/uts/common/rpc/rpcmod.c
index fc99ca89b3..09863c970f 100644
--- a/usr/src/uts/common/rpc/rpcmod.c
+++ b/usr/src/uts/common/rpc/rpcmod.c
@@ -21,7 +21,10 @@
/*
* Copyright 2010 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
+ */
+/*
* Copyright 2012 Milan Jurik. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
*/
/* Copyright (c) 1990 Mentat Inc. */
@@ -231,9 +234,6 @@ static void mir_wsrv(queue_t *q);
static struct module_info rpcmod_info =
{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};
-/*
- * Read side has no service procedure.
- */
static struct qinit rpcmodrinit = {
(int (*)())rmm_rput,
(int (*)())rmm_rsrv,
@@ -269,6 +269,9 @@ struct xprt_style_ops {
void (*xo_rsrv)();
};
+/*
+ * Read side has no service procedure.
+ */
static struct xprt_style_ops xprt_clts_ops = {
rpcmodopen,
rpcmodclose,
@@ -291,7 +294,7 @@ static struct xprt_style_ops xprt_cots_ops = {
* Per rpcmod "slot" data structure. q->q_ptr points to one of these.
*/
struct rpcm {
- void *rm_krpc_cell; /* Reserved for use by KRPC */
+ void *rm_krpc_cell; /* Reserved for use by kRPC */
struct xprt_style_ops *rm_ops;
int rm_type; /* Client or server side stream */
#define RM_CLOSING 0x1 /* somebody is trying to close slot */
@@ -312,7 +315,7 @@ struct temp_slot {
};
typedef struct mir_s {
- void *mir_krpc_cell; /* Reserved for KRPC use. This field */
+ void *mir_krpc_cell; /* Reserved for kRPC use. This field */
/* must be first in the structure. */
struct xprt_style_ops *rm_ops;
int mir_type; /* Client or server side stream */
@@ -362,7 +365,7 @@ typedef struct mir_s {
* to 1 whenever a new request is sent out (mir_wput)
* and cleared when the timer fires (mir_timer). If
* the timer fires with this value equal to 0, then the
- * stream is considered idle and KRPC is notified.
+ * stream is considered idle and kRPC is notified.
*/
mir_clntreq : 1,
/*
@@ -404,9 +407,9 @@ typedef struct mir_s {
/* that a kernel RPC server thread */
/* (see svc_run()) has on this rpcmod */
/* slot. Effectively, it is the */
- /* number * of unprocessed messages */
+ /* number of unprocessed messages */
/* that have been passed up to the */
- /* KRPC layer */
+ /* kRPC layer */
mblk_t *mir_svc_pend_mp; /* Pending T_ORDREL_IND or */
/* T_DISCON_IND */
@@ -567,7 +570,7 @@ rmm_close(queue_t *q, int flag, cred_t *crp)
return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
}
-static void rpcmod_release(queue_t *, mblk_t *);
+static void rpcmod_release(queue_t *, mblk_t *, bool_t);
/*
* rpcmodopen - open routine gets called when the module gets pushed
* onto the stream.
@@ -578,7 +581,7 @@ rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
struct rpcm *rmp;
- extern void (*rpc_rele)(queue_t *, mblk_t *);
+ extern void (*rpc_rele)(queue_t *, mblk_t *, bool_t);
TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
@@ -687,13 +690,6 @@ rpcmodclose(queue_t *q, int flag, cred_t *crp)
return (0);
}
-#ifdef DEBUG
-int rpcmod_send_msg_up = 0;
-int rpcmod_send_uderr = 0;
-int rpcmod_send_dup = 0;
-int rpcmod_send_dup_cnt = 0;
-#endif
-
/*
* rpcmodrput - Module read put procedure. This is called from
* the module, driver, or stream head downstream.
@@ -715,54 +711,6 @@ rpcmodrput(queue_t *q, mblk_t *mp)
return;
}
-#ifdef DEBUG
- if (rpcmod_send_msg_up > 0) {
- mblk_t *nmp = copymsg(mp);
- if (nmp) {
- putnext(q, nmp);
- rpcmod_send_msg_up--;
- }
- }
- if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
- mblk_t *nmp;
- struct T_unitdata_ind *data;
- struct T_uderror_ind *ud;
- int d;
- data = (struct T_unitdata_ind *)mp->b_rptr;
- if (data->PRIM_type == T_UNITDATA_IND) {
- d = sizeof (*ud) - sizeof (*data);
- nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
- if (nmp) {
- ud = (struct T_uderror_ind *)nmp->b_rptr;
- ud->PRIM_type = T_UDERROR_IND;
- ud->DEST_length = data->SRC_length;
- ud->DEST_offset = data->SRC_offset + d;
- ud->OPT_length = data->OPT_length;
- ud->OPT_offset = data->OPT_offset + d;
- ud->ERROR_type = ENETDOWN;
- if (data->SRC_length) {
- bcopy(mp->b_rptr +
- data->SRC_offset,
- nmp->b_rptr +
- ud->DEST_offset,
- data->SRC_length);
- }
- if (data->OPT_length) {
- bcopy(mp->b_rptr +
- data->OPT_offset,
- nmp->b_rptr +
- ud->OPT_offset,
- data->OPT_length);
- }
- nmp->b_wptr += d;
- nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
- nmp->b_datap->db_type = M_PROTO;
- putnext(q, nmp);
- rpcmod_send_uderr--;
- }
- }
- }
-#endif
switch (mp->b_datap->db_type) {
default:
putnext(q, mp);
@@ -774,14 +722,12 @@ rpcmodrput(queue_t *q, mblk_t *mp)
pptr = (union T_primitives *)mp->b_rptr;
/*
- * Forward this message to krpc if it is data.
+ * Forward this message to kRPC if it is data.
*/
if (pptr->type == T_UNITDATA_IND) {
- mblk_t *nmp;
-
- /*
- * Check if the module is being popped.
- */
+ /*
+ * Check if the module is being popped.
+ */
mutex_enter(&rmp->rm_lock);
if (rmp->rm_state & RM_CLOSING) {
mutex_exit(&rmp->rm_lock);
@@ -818,47 +764,21 @@ rpcmodrput(queue_t *q, mblk_t *mp)
case RPC_SERVER:
/*
* rm_krpc_cell is exclusively used by the kRPC
- * CLTS server
+ * CLTS server. Try to submit the message to
+ * kRPC. Since this is an unreliable channel, we
+ * can just free the message in case the kRPC
+ * does not accept new messages.
*/
- if (rmp->rm_krpc_cell) {
-#ifdef DEBUG
- /*
- * Test duplicate request cache and
- * rm_ref count handling by sending a
- * duplicate every so often, if
- * desired.
- */
- if (rpcmod_send_dup &&
- rpcmod_send_dup_cnt++ %
- rpcmod_send_dup)
- nmp = copymsg(mp);
- else
- nmp = NULL;
-#endif
+ if (rmp->rm_krpc_cell &&
+ svc_queuereq(q, mp, TRUE)) {
/*
* Raise the reference count on this
* module to prevent it from being
- * popped before krpc generates the
+ * popped before kRPC generates the
* reply.
*/
rmp->rm_ref++;
mutex_exit(&rmp->rm_lock);
-
- /*
- * Submit the message to krpc.
- */
- svc_queuereq(q, mp);
-#ifdef DEBUG
- /*
- * Send duplicate if we created one.
- */
- if (nmp) {
- mutex_enter(&rmp->rm_lock);
- rmp->rm_ref++;
- mutex_exit(&rmp->rm_lock);
- svc_queuereq(q, nmp);
- }
-#endif
} else {
mutex_exit(&rmp->rm_lock);
freemsg(mp);
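
In condensed form, the new CLTS enqueue path in rpcmodrput() reads as follows
(a sketch assembled from the hunk above; identifiers are from the patch, but
the condensation itself is not literal patch text). Because CLTS is an
unreliable channel, a message kRPC refuses can simply be freed:

	mutex_enter(&rmp->rm_lock);
	if (rmp->rm_krpc_cell && svc_queuereq(q, mp, TRUE)) {
		/* Accepted: hold rpcmod open until kRPC sends the reply. */
		rmp->rm_ref++;
		mutex_exit(&rmp->rm_lock);
	} else {
		/* Refused or no server: an unreliable channel may drop. */
		mutex_exit(&rmp->rm_lock);
		freemsg(mp);
	}
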
@@ -1030,8 +950,9 @@ rpcmodwsrv(queue_t *q)
}
}
+/* ARGSUSED */
static void
-rpcmod_release(queue_t *q, mblk_t *bp)
+rpcmod_release(queue_t *q, mblk_t *bp, bool_t enable)
{
struct rpcm *rmp;
@@ -1084,7 +1005,7 @@ rpcmod_release(queue_t *q, mblk_t *bp)
static int mir_clnt_dup_request(queue_t *q, mblk_t *mp);
static void mir_rput_proto(queue_t *q, mblk_t *mp);
static int mir_svc_policy_notify(queue_t *q, int event);
-static void mir_svc_release(queue_t *wq, mblk_t *mp);
+static void mir_svc_release(queue_t *wq, mblk_t *mp, bool_t);
static void mir_svc_start(queue_t *wq);
static void mir_svc_idle_start(queue_t *, mir_t *);
static void mir_svc_idle_stop(queue_t *, mir_t *);
@@ -1099,7 +1020,7 @@ static void mir_disconnect(queue_t *, mir_t *ir);
static int mir_check_len(queue_t *, int32_t, mblk_t *);
static void mir_timer(void *);
-extern void (*mir_rele)(queue_t *, mblk_t *);
+extern void (*mir_rele)(queue_t *, mblk_t *, bool_t);
extern void (*mir_start)(queue_t *);
extern void (*clnt_stop_idle)(queue_t *);
@@ -1256,7 +1177,7 @@ mir_close(queue_t *q)
mutex_exit(&mir->mir_mutex);
qprocsoff(q);
- /* Notify KRPC that this stream is going away. */
+ /* Notify kRPC that this stream is going away. */
svc_queueclose(q);
} else {
mutex_exit(&mir->mir_mutex);
@@ -1337,7 +1258,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
mir_t *mir;
RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
- /* Set variables used directly by KRPC. */
+ /* Set variables used directly by kRPC. */
if (!mir_rele)
mir_rele = mir_svc_release;
if (!mir_start)
@@ -1357,7 +1278,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
* be held on the read-side queue until the stream is completely
* initialized with a RPC_CLIENT or RPC_SERVER ioctl. During
* the ioctl processing, the flag is cleared and any messages that
- * arrived between the open and the ioctl are delivered to KRPC.
+ * arrived between the open and the ioctl are delivered to kRPC.
*
* Early data should never arrive on a client stream since
* servers only respond to our requests and we do not send any.
@@ -1365,7 +1286,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
* very common on a server stream where the client will start
* sending data as soon as the connection is made (and this
* is especially true with TCP where the protocol accepts the
- * connection before nfsd or KRPC is notified about it).
+ * connection before nfsd or kRPC is notified about it).
*/
mir->mir_hold_inbound = 1;
@@ -1420,7 +1341,7 @@ mir_rput(queue_t *q, mblk_t *mp)
* If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
* with the corresponding ioctl, then don't accept
* any inbound data. This should never happen for streams
- * created by nfsd or client-side KRPC because they are careful
+ * created by nfsd or client-side kRPC because they are careful
* to set the mode of the stream before doing anything else.
*/
if (mir->mir_type == 0) {
@@ -1456,7 +1377,7 @@ mir_rput(queue_t *q, mblk_t *mp)
* If a module on the stream is trying set the Stream head's
* high water mark, then set our hiwater to the requested
* value. We are the "stream head" for all inbound
- * data messages since messages are passed directly to KRPC.
+ * data messages since messages are passed directly to kRPC.
*/
if (MBLKL(mp) >= sizeof (struct stroptions)) {
struct stroptions *stropts;
@@ -1629,23 +1550,32 @@ mir_rput(queue_t *q, mblk_t *mp)
case RPC_SERVER:
/*
* Check for flow control before passing the
- * message to KRPC.
+ * message to kRPC.
*/
if (!mir->mir_hold_inbound) {
if (mir->mir_krpc_cell) {
- /*
- * If the reference count is 0
- * (not including this request),
- * then the stream is transitioning
- * from idle to non-idle. In this case,
- * we cancel the idle timer.
- */
- if (mir->mir_ref_cnt++ == 0)
- stop_timer = B_TRUE;
+
if (mir_check_len(q,
- (int32_t)msgdsize(mp), mp))
+ (int32_t)msgdsize(head_mp),
+ head_mp))
return;
- svc_queuereq(q, head_mp); /* to KRPC */
+
+ if (q->q_first == NULL &&
+ svc_queuereq(q, head_mp, TRUE)) {
+ /*
+ * If the reference count is 0
+ * (not including this
+ * request), then the stream is
+ * transitioning from idle to
+ * non-idle. In this case, we
+ * cancel the idle timer.
+ */
+ if (mir->mir_ref_cnt++ == 0)
+ stop_timer = B_TRUE;
+ } else {
+ (void) putq(q, head_mp);
+ mir->mir_inrservice = B_TRUE;
+ }
} else {
/*
* Count # of times this happens. Should
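
In contrast, a connection-oriented stream must not drop requests. Condensed
from the hunk above (identifiers are from the patch; the sketch is not literal
patch text), the backpressure handshake in mir_rput() is:

	if (q->q_first == NULL && svc_queuereq(q, head_mp, TRUE)) {
		/* kRPC accepted: an idle stream just became busy. */
		if (mir->mir_ref_cnt++ == 0)
			stop_timer = B_TRUE;
	} else {
		/* kRPC is full: park the request on our own queue. */
		(void) putq(q, head_mp);
		mir->mir_inrservice = B_TRUE;
	}

The parked messages are drained later by mir_rsrv() once mir_svc_release()
runs with enable set and qenable()s the read queue (see the mir_svc_release()
hunk below).
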
@@ -1811,7 +1741,7 @@ mir_rput_proto(queue_t *q, mblk_t *mp)
break;
default:
RPCLOG(1, "mir_rput: unexpected message %d "
- "for KRPC client\n",
+ "for kRPC client\n",
((union T_primitives *)mp->b_rptr)->type);
break;
}
@@ -1925,37 +1855,12 @@ mir_rput_proto(queue_t *q, mblk_t *mp)
* outbound flow control is exerted. When outbound flow control is
* relieved, mir_wsrv qenables the read-side queue. Read-side queues
* are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
- *
- * For the server side, we have two types of messages queued. The first type
- * are messages that are ready to be XDR decoded and and then sent to the
- * RPC program's dispatch routine. The second type are "raw" messages that
- * haven't been processed, i.e. assembled from rpc record fragements into
- * full requests. The only time we will see the second type of message
- * queued is if we have a memory allocation failure while processing a
- * a raw message. The field mir_first_non_processed_mblk will mark the
- * first such raw message. So the flow for server side is:
- *
- * - send processed queued messages to kRPC until we run out or find
- * one that needs additional processing because we were short on memory
- * earlier
- * - process a message that was deferred because of lack of
- * memory
- * - continue processing messages until the queue empties or we
- * have to stop because of lack of memory
- * - during each of the above phase, if the queue is empty and
- * there are no pending messages that were passed to the RPC
- * layer, send upstream the pending disconnect/ordrel indication if
- * there is one
- *
- * The read-side queue is also enabled by a bufcall callback if dupmsg
- * fails in mir_rput.
*/
static void
mir_rsrv(queue_t *q)
{
mir_t *mir;
mblk_t *mp;
- mblk_t *cmp = NULL;
boolean_t stop_timer = B_FALSE;
mir = (mir_t *)q->q_ptr;
@@ -1966,43 +1871,28 @@ mir_rsrv(queue_t *q)
case RPC_SERVER:
if (mir->mir_ref_cnt == 0)
mir->mir_hold_inbound = 0;
- if (mir->mir_hold_inbound) {
-
- ASSERT(cmp == NULL);
- if (q->q_first == NULL) {
-
- MIR_CLEAR_INRSRV(mir);
-
- if (MIR_SVC_QUIESCED(mir)) {
- cmp = mir->mir_svc_pend_mp;
- mir->mir_svc_pend_mp = NULL;
- }
- }
-
- mutex_exit(&mir->mir_mutex);
-
- if (cmp != NULL) {
- RPCLOG(16, "mir_rsrv: line %d: sending a held "
- "disconnect/ord rel indication upstream\n",
- __LINE__);
- putnext(q, cmp);
- }
+ if (mir->mir_hold_inbound)
+ break;
- return;
- }
while (mp = getq(q)) {
if (mir->mir_krpc_cell &&
(mir->mir_svc_no_more_msgs == 0)) {
- /*
- * If we were idle, turn off idle timer since
- * we aren't idle any more.
- */
- if (mir->mir_ref_cnt++ == 0)
- stop_timer = B_TRUE;
+
if (mir_check_len(q,
(int32_t)msgdsize(mp), mp))
return;
- svc_queuereq(q, mp);
+
+ if (svc_queuereq(q, mp, TRUE)) {
+ /*
+ * If we were idle, turn off idle timer
+ * since we aren't idle any more.
+ */
+ if (mir->mir_ref_cnt++ == 0)
+ stop_timer = B_TRUE;
+ } else {
+ (void) putbq(q, mp);
+ break;
+ }
} else {
/*
* Count # of times this happens. Should be
@@ -2041,10 +1931,10 @@ mir_rsrv(queue_t *q)
}
if (q->q_first == NULL) {
+ mblk_t *cmp = NULL;
MIR_CLEAR_INRSRV(mir);
- ASSERT(cmp == NULL);
if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
cmp = mir->mir_svc_pend_mp;
mir->mir_svc_pend_mp = NULL;
@@ -2111,15 +2001,14 @@ mir_svc_start_close(queue_t *wq, mir_t *mir)
ASSERT((wq->q_flag & QREADR) == 0);
ASSERT(mir->mir_type == RPC_SERVER);
-
/*
* Do not accept any more messages.
*/
mir->mir_svc_no_more_msgs = 1;
/*
- * Next two statements will make the read service procedure invoke
- * svc_queuereq() on everything stuck in the streams read queue.
+ * Next two statements will make the read service procedure
+ * free everything stuck in the streams read queue.
* It's not necessary because enabling the write queue will
* have the same effect, but why not speed the process along?
*/
@@ -2134,11 +2023,11 @@ mir_svc_start_close(queue_t *wq, mir_t *mir)
}
/*
- * This routine is called directly by KRPC after a request is completed,
+ * This routine is called directly by kRPC after a request is completed,
* whether a reply was sent or the request was dropped.
*/
static void
-mir_svc_release(queue_t *wq, mblk_t *mp)
+mir_svc_release(queue_t *wq, mblk_t *mp, bool_t enable)
{
mir_t *mir = (mir_t *)wq->q_ptr;
mblk_t *cmp = NULL;
@@ -2147,6 +2036,9 @@ mir_svc_release(queue_t *wq, mblk_t *mp)
if (mp)
freemsg(mp);
+ if (enable)
+ qenable(RD(wq));
+
mutex_enter(&mir->mir_mutex);
/*
@@ -2194,7 +2086,7 @@ mir_svc_release(queue_t *wq, mblk_t *mp)
}
/*
- * This routine is called by server-side KRPC when it is ready to
+ * This routine is called by server-side kRPC when it is ready to
* handle inbound messages on the stream.
*/
static void
@@ -2286,7 +2178,7 @@ mir_timer(void *arg)
* For clients, the timer fires at clnt_idle_timeout
* intervals. If the activity marker (mir_clntreq) is
* zero, then the stream has been idle since the last
- * timer event and we notify KRPC. If mir_clntreq is
+ * timer event and we notify kRPC. If mir_clntreq is
* non-zero, then the stream is active and we just
* restart the timer for another interval. mir_clntreq
* is set to 1 in mir_wput for every request passed
@@ -2337,10 +2229,10 @@ printf("mir_timer[%d]: doing client timeout\n", now / hz);
mutex_exit(&mir->mir_mutex);
/*
* We pass T_ORDREL_REQ as an integer value
- * to KRPC as the indication that the stream
+ * to kRPC as the indication that the stream
* is idle. This is not a T_ORDREL_REQ message,
* it is just a convenient value since we call
- * the same KRPC routine for T_ORDREL_INDs and
+ * the same kRPC routine for T_ORDREL_INDs and
* T_DISCON_INDs.
*/
clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
@@ -2354,7 +2246,7 @@ printf("mir_timer[%d]: doing client timeout\n", now / hz);
* by mir_wput when mir_type is set to RPC_SERVER and
* by mir_svc_idle_start whenever the stream goes idle
* (mir_ref_cnt == 0). The timer is cancelled in
- * mir_rput whenever a new inbound request is passed to KRPC
+ * mir_rput whenever a new inbound request is passed to kRPC
* and the stream was previously idle.
*
* The timer interval can be changed for individual
@@ -2424,12 +2316,12 @@ mir_wput(queue_t *q, mblk_t *mp)
!IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
/*
* Since we know that M_DATA messages are created exclusively
- * by KRPC, we expect that KRPC will leave room for our header
+ * by kRPC, we expect that kRPC will leave room for our header
* and 4 byte align which is normal for XDR.
- * If KRPC (or someone else) does not cooperate, then we
+ * If kRPC (or someone else) does not cooperate, then we
* just throw away the message.
*/
- RPCLOG(1, "mir_wput: KRPC did not leave space for record "
+ RPCLOG(1, "mir_wput: kRPC did not leave space for record "
"fragment header (%d bytes left)\n",
(int)(rptr - mp->b_datap->db_base));
freemsg(mp);
@@ -2650,7 +2542,7 @@ ioc_eperm:
/*
* If the stream is not idle, then we hold the
* orderly release until it becomes idle. This
- * ensures that KRPC will be able to reply to
+ * ensures that kRPC will be able to reply to
* all requests that we have passed to it.
*
* We also queue the request if there is data already
@@ -2896,10 +2788,10 @@ mir_disconnect(queue_t *q, mir_t *mir)
mutex_exit(&mir->mir_mutex);
/*
- * T_DISCON_REQ is passed to KRPC as an integer value
+ * T_DISCON_REQ is passed to kRPC as an integer value
* (this is not a TPI message). It is used as a
* convenient value to indicate a sanity check
- * failure -- the same KRPC routine is also called
+ * failure -- the same kRPC routine is also called
* for T_DISCON_INDs and T_ORDREL_INDs.
*/
clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
@@ -2944,7 +2836,7 @@ mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
cmn_err(CE_NOTE,
- "KRPC: record fragment from %s of size(%d) exceeds "
+ "kRPC: record fragment from %s of size(%d) exceeds "
"maximum (%u). Disconnecting",
(mir->mir_type == RPC_CLIENT) ? "server" :
(mir->mir_type == RPC_SERVER) ? "client" :
diff --git a/usr/src/uts/common/rpc/svc.c b/usr/src/uts/common/rpc/svc.c
index 250b5984c6..c0ca1ede3f 100644
--- a/usr/src/uts/common/rpc/svc.c
+++ b/usr/src/uts/common/rpc/svc.c
@@ -20,11 +20,12 @@
*/
/*
- * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
- * Use is subject to license terms.
+ * Copyright 2015 Nexenta Systems, Inc. All rights reserved.
*/
+
/*
- * Copyright 2014 Nexenta Systems, Inc. All rights reserved.
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
*/
/*
@@ -47,7 +48,7 @@
* threads processing events on the transport. Some fields in the
* master structure are protected by locks
* - xp_req_lock protects the request queue:
- * xp_req_head, xp_req_tail
+ * xp_req_head, xp_req_tail, xp_reqs, xp_size, xp_full, xp_enable
* - xp_thread_lock protects the thread (clone) counts
* xp_threads, xp_detached_threads, xp_wq
* Each master transport is registered to exactly one thread pool.
@@ -72,7 +73,7 @@
* to restrict resource usage by the service. Some fields are protected
* by locks:
* - p_req_lock protects several counts and flags:
- * p_reqs, p_walkers, p_asleep, p_drowsy, p_req_cv
+ * p_reqs, p_size, p_walkers, p_asleep, p_drowsy, p_req_cv
* - p_thread_lock governs other thread counts:
* p_threads, p_detached_threads, p_reserved_threads, p_closing
*
@@ -113,13 +114,6 @@
* thread processes a request and sends a reply it returns to svc_run()
* and svc_run() calls svc_poll() to find new input.
*
- * There is no longer an "inconsistent" but "safe" optimization in the
- * svc_queuereq() code. This "inconsistent" state was leading to
- * inconsistencies between the actual number of requests and the value
- * of p_reqs (the total number of requests). Because of this, hangs were
- * occurring in svc_poll() where p_reqs was greater than one and no
- * requests were found on the request queues.
- *
* svc_poll().
* In order to avoid unnecessary locking, which causes performance
* problems, we always look for a pending request on the current transport.
@@ -201,6 +195,7 @@
#include <sys/user.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
+#include <sys/strsun.h>
#include <sys/tihdr.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
@@ -292,6 +287,11 @@ struct svc_globals {
int rdma_check = 0;
/*
+ * This allows disabling flow control in svc_queuereq().
+ */
+volatile int svc_flowcontrol_disable = 0;
+
+/*
* Authentication parameters list.
*/
static caddr_t rqcred_head;
@@ -300,15 +300,15 @@ static kmutex_t rqcred_lock;
/*
* Pointers to transport specific `rele' routines in rpcmod (set from rpcmod).
*/
-void (*rpc_rele)(queue_t *, mblk_t *) = NULL;
-void (*mir_rele)(queue_t *, mblk_t *) = NULL;
+void (*rpc_rele)(queue_t *, mblk_t *, bool_t) = NULL;
+void (*mir_rele)(queue_t *, mblk_t *, bool_t) = NULL;
/* ARGSUSED */
void
-rpc_rdma_rele(queue_t *q, mblk_t *mp)
+rpc_rdma_rele(queue_t *q, mblk_t *mp, bool_t enable)
{
}
-void (*rdma_rele)(queue_t *, mblk_t *) = rpc_rdma_rele;
+void (*rdma_rele)(queue_t *, mblk_t *, bool_t) = rpc_rdma_rele;
/*
@@ -914,7 +914,7 @@ svc_xprt_qinit(SVCPOOL *pool, size_t qsize)
* - insert a pointer to xprt into the xprt-ready queue (FIFO)
* - if the xprt-ready queue is full turn the overflow flag on.
*
- * NOTICE: pool->p_qtop is protected by the the pool's request lock
+ * NOTICE: pool->p_qtop is protected by the pool's request lock
* and the caller (svc_queuereq()) must hold the lock.
*/
static void
@@ -922,7 +922,7 @@ svc_xprt_qput(SVCPOOL *pool, SVCMASTERXPRT *xprt)
{
ASSERT(MUTEX_HELD(&pool->p_req_lock));
- /* If the overflow flag is there is nothing we can do */
+ /* If the overflow flag is on there is nothing we can do */
if (pool->p_qoverflow)
return;
@@ -1871,15 +1871,8 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
if (xprt && xprt->xp_req_head && (!pool->p_qoverflow ||
clone_xprt->xp_same_xprt++ < pool->p_max_same_xprt)) {
mutex_enter(&xprt->xp_req_lock);
- if (xprt->xp_req_head) {
- mutex_enter(&pool->p_req_lock);
- pool->p_reqs--;
- if (pool->p_reqs == 0)
- pool->p_qoverflow = FALSE;
- mutex_exit(&pool->p_req_lock);
-
+ if (xprt->xp_req_head)
return (xprt);
- }
mutex_exit(&xprt->xp_req_lock);
}
clone_xprt->xp_same_xprt = 0;
@@ -1921,9 +1914,6 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
rw_exit(&pool->p_lrwlock);
mutex_enter(&pool->p_req_lock);
- pool->p_reqs--;
- if (pool->p_reqs == 0)
- pool->p_qoverflow = FALSE;
pool->p_walkers--;
mutex_exit(&pool->p_req_lock);
@@ -1994,9 +1984,6 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
rw_exit(&pool->p_lrwlock);
mutex_enter(&pool->p_req_lock);
- pool->p_reqs--;
- if (pool->p_reqs == 0)
- pool->p_qoverflow = FALSE;
pool->p_walkers--;
mutex_exit(&pool->p_req_lock);
@@ -2086,6 +2073,139 @@ svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
}
/*
+ * calculate memory space used by message
+ */
+static size_t
+svc_msgsize(mblk_t *mp)
+{
+ size_t count = 0;
+
+ for (; mp; mp = mp->b_cont)
+ count += MBLKSIZE(mp);
+
+ return (count);
+}
+
+/*
+ * svc_flowcontrol() attempts to turn the flow control on or off for the
+ * transport.
+ *
+ * On input the xprt->xp_full determines whether the flow control is currently
+ * off (FALSE) or on (TRUE). If it is off we do tests to see whether we should
+ * turn it on, and vice versa.
+ *
+ * There are two conditions considered for the flow control. Both conditions
+ * have a low and a high watermark. Once the high watermark is reached in
+ * EITHER condition the flow control is turned on. For turning the flow
+ * control off BOTH conditions must be below the low watermark.
+ *
+ * Condition #1 - Number of requests queued:
+ *
+ * The max number of threads working on the pool is roughly pool->p_maxthreads.
+ * Every thread could handle up to pool->p_max_same_xprt requests from one
+ * transport before it moves to another transport. See svc_poll() for details.
+ * In case all threads in the pool are working on a transport they will handle
+ * no more than enough_reqs (pool->p_maxthreads * pool->p_max_same_xprt)
+ * requests in one shot from that transport. We are turning the flow control
+ * on once the high watermark is reached for a transport so that the underlying
+ * queue knows the rate of incoming requests is higher than we are able to
+ * handle.
+ *
+ * The high watermark: 2 * enough_reqs
+ * The low watermark: enough_reqs
+ *
+ * Condition #2 - Length of the data payload for the queued messages/requests:
+ *
+ * We want to prevent a particular pool from exhausting memory, so once the
+ * total length of queued requests for the whole pool reaches the high
+ * watermark we start to turn on the flow control for significant memory
+ * consumers (individual transports). To keep the implementation simple
+ * enough, this condition is not exact, because we count only the data part of
+ * the queued requests and we ignore the overhead. For our purposes this
+ * should be enough. We should also consider that up to pool->p_maxthreads
+ * threads for the pool might work on large requests (this is not counted for
+ * this condition). We need to leave some space for the rest of the system and for
+ * other big memory consumers (like ZFS). Also, after the flow control is
+ * turned on (on cots transports) we can start to accumulate a few megabytes in
+ * queues for each transport.
+ *
+ * Usually, the big memory consumers are NFS WRITE requests, so we do not
+ * expect to see this condition met for pools other than NFS.
+ *
+ * The high watermark: 1/5 of available memory
+ * The low watermark: 1/6 of available memory
+ *
+ * Once the high watermark is reached we turn the flow control on only for
+ * transports exceeding a per-transport memory limit. The per-transport
+ * fraction of memory is calculated as:
+ *
+ * the high watermark / number of transports
+ *
+ * For transports with less than the per-transport fraction of memory consumed,
+ * the flow control is not turned on, so they are not blocked by a few "hungry"
+ * transports. Because of this, the total memory consumption for the
+ * particular pool might grow up to 2 * the high watermark.
+ *
+ * The individual transports are unblocked once their consumption is below:
+ *
+ * per-transport fraction of memory / 2
+ *
+ * or once the total memory consumption for the whole pool falls below the low
+ * watermark.
+ *
+ */
+static void
+svc_flowcontrol(SVCMASTERXPRT *xprt)
+{
+ SVCPOOL *pool = xprt->xp_pool;
+ size_t totalmem = ptob(physmem);
+ int enough_reqs = pool->p_maxthreads * pool->p_max_same_xprt;
+
+ ASSERT(MUTEX_HELD(&xprt->xp_req_lock));
+
+ /* Should we turn the flow control on? */
+ if (xprt->xp_full == FALSE) {
+ /* Is flow control disabled? */
+ if (svc_flowcontrol_disable != 0)
+ return;
+
+ /* Are there enough requests queued? */
+ if (xprt->xp_reqs >= enough_reqs * 2) {
+ xprt->xp_full = TRUE;
+ return;
+ }
+
+ /*
+ * If this pool uses over 20% of memory and this transport is
+ * a significant memory consumer, then we are full
+ */
+ if (pool->p_size >= totalmem / 5 &&
+ xprt->xp_size >= totalmem / 5 / pool->p_lcount)
+ xprt->xp_full = TRUE;
+
+ return;
+ }
+
+ /* We might want to turn the flow control off */
+
+ /* Do we still have enough requests? */
+ if (xprt->xp_reqs > enough_reqs)
+ return;
+
+ /*
+ * If this pool still uses over 16% of memory and this transport is
+ * still a significant memory consumer, then we are still full
+ */
+ if (pool->p_size >= totalmem / 6 &&
+ xprt->xp_size >= totalmem / 5 / pool->p_lcount / 2)
+ return;
+
+ /* Turn the flow control off and make sure rpcmod is notified */
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = TRUE;
+}
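
To make the watermark arithmetic concrete, the stand-alone program below
evaluates both conditions for one invented configuration; every input value
(thread limit, per-transport burst, transport count, physical memory) is
hypothetical and is not taken from the patch:

	#include <stdio.h>

	int
	main(void)
	{
		/* Hypothetical pool parameters, for illustration only. */
		long p_maxthreads = 256;	/* pool thread limit */
		long p_max_same_xprt = 8;	/* requests per transport visit */
		long p_lcount = 16;		/* transports in the pool */
		long long totalmem = 32LL << 30;	/* assume 32 GB physmem */

		long enough_reqs = p_maxthreads * p_max_same_xprt;

		/* Condition #1: number of queued requests. */
		printf("xp_reqs high watermark: %ld\n", enough_reqs * 2);
		printf("xp_reqs low watermark:  %ld\n", enough_reqs);

		/* Condition #2: bytes of queued payload. */
		printf("p_size high watermark:  %lld\n", totalmem / 5);
		printf("p_size low watermark:   %lld\n", totalmem / 6);
		printf("xp_size share:          %lld\n", totalmem / 5 / p_lcount);
		printf("xp_size release point:  %lld\n",
		    totalmem / 5 / p_lcount / 2);

		return (0);
	}

With these numbers a transport is flow-controlled at 4096 queued requests and
released at 2048, matching the 2 * enough_reqs high and enough_reqs low rule
described in the block comment above.
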
+
+/*
* Main loop of the kernel RPC server
* - wait for input (find a transport with a pending request).
* - dequeue the request
@@ -2111,6 +2231,8 @@ svc_run(SVCPOOL *pool)
for (;;) {
SVCMASTERXPRT *next;
mblk_t *mp;
+ bool_t enable;
+ size_t size;
TRACE_0(TR_FAC_KRPC, TR_SVC_RUN, "svc_run");
@@ -2168,6 +2290,20 @@ svc_run(SVCPOOL *pool)
mp = next->xp_req_head;
next->xp_req_head = mp->b_next;
mp->b_next = (mblk_t *)0;
+ size = svc_msgsize(mp);
+
+ mutex_enter(&pool->p_req_lock);
+ pool->p_reqs--;
+ if (pool->p_reqs == 0)
+ pool->p_qoverflow = FALSE;
+ pool->p_size -= size;
+ mutex_exit(&pool->p_req_lock);
+
+ next->xp_reqs--;
+ next->xp_size -= size;
+
+ if (next->xp_full)
+ svc_flowcontrol(next);
TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_DEQ,
"rpc_que_req_deq:pool %p mp %p", pool, mp);
@@ -2248,14 +2384,19 @@ svc_run(SVCPOOL *pool)
* Release our reference on the rpcmod
* slot attached to xp_wq->q_ptr.
*/
- (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
+ mutex_enter(&xprt->xp_req_lock);
+ enable = xprt->xp_enable;
+ if (enable)
+ xprt->xp_enable = FALSE;
+ mutex_exit(&xprt->xp_req_lock);
+ (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL, enable);
}
/* NOTREACHED */
}
/*
* Flush any pending requests for the queue and
- * and free the associated mblks.
+ * free the associated mblks.
*/
void
svc_queueclean(queue_t *q)
@@ -2270,14 +2411,21 @@ svc_queueclean(queue_t *q)
mutex_enter(&xprt->xp_req_lock);
pool = xprt->xp_pool;
while ((mp = xprt->xp_req_head) != NULL) {
- /* remove the request from the list and decrement p_reqs */
+ /* remove the request from the list */
xprt->xp_req_head = mp->b_next;
- mutex_enter(&pool->p_req_lock);
mp->b_next = (mblk_t *)0;
- pool->p_reqs--;
- mutex_exit(&pool->p_req_lock);
- (*RELE_PROC(xprt)) (xprt->xp_wq, mp);
+ (*RELE_PROC(xprt)) (xprt->xp_wq, mp, FALSE);
}
+
+ mutex_enter(&pool->p_req_lock);
+ pool->p_reqs -= xprt->xp_reqs;
+ pool->p_size -= xprt->xp_size;
+ mutex_exit(&pool->p_req_lock);
+
+ xprt->xp_reqs = 0;
+ xprt->xp_size = 0;
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = FALSE;
mutex_exit(&xprt->xp_req_lock);
}
@@ -2362,14 +2510,16 @@ svc_queueclose(queue_t *q)
* - put a request at the tail of the transport request queue
* - insert a hint for svc_poll() into the xprt-ready queue
* - increment the `pending-requests' count for the pool
+ * - handle flow control
* - wake up a thread sleeping in svc_poll() if necessary
* - if all the threads are running ask the creator for a new one.
*/
-void
-svc_queuereq(queue_t *q, mblk_t *mp)
+bool_t
+svc_queuereq(queue_t *q, mblk_t *mp, bool_t flowcontrol)
{
SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
SVCPOOL *pool = xprt->xp_pool;
+ size_t size;
TRACE_0(TR_FAC_KRPC, TR_SVC_QUEUEREQ_START, "svc_queuereq_start");
@@ -2386,6 +2536,12 @@ svc_queuereq(queue_t *q, mblk_t *mp)
* pending request count it looks atomic.
*/
mutex_enter(&xprt->xp_req_lock);
+ if (flowcontrol && xprt->xp_full) {
+ mutex_exit(&xprt->xp_req_lock);
+
+ return (FALSE);
+ }
+ ASSERT(xprt->xp_full == FALSE);
mutex_enter(&pool->p_req_lock);
if (xprt->xp_req_head == NULL)
xprt->xp_req_head = mp;
@@ -2396,15 +2552,24 @@ svc_queuereq(queue_t *q, mblk_t *mp)
/*
* Step 2.
* Insert a hint into the xprt-ready queue, increment
- * `pending-requests' count for the pool, and wake up
+ * counters, handle flow control, and wake up
* a thread sleeping in svc_poll() if necessary.
*/
/* Insert pointer to this transport into the xprt-ready queue */
svc_xprt_qput(pool, xprt);
- /* Increment the `pending-requests' count for the pool */
+ /* Increment counters */
pool->p_reqs++;
+ xprt->xp_reqs++;
+
+ size = svc_msgsize(mp);
+ xprt->xp_size += size;
+ pool->p_size += size;
+
+ /* Handle flow control */
+ if (flowcontrol)
+ svc_flowcontrol(xprt);
TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_ENQ,
"rpc_que_req_enq:pool %p mp %p", pool, mp);
@@ -2449,6 +2614,8 @@ svc_queuereq(queue_t *q, mblk_t *mp)
TRACE_1(TR_FAC_KRPC, TR_SVC_QUEUEREQ_END,
"svc_queuereq_end:(%S)", "end");
+
+ return (TRUE);
}
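
Seen from a transport implementor's side, the new interface reduces to a small
contract. A hypothetical read-side put procedure (the name xxx_rput and its
queueing policy are invented for illustration; svc_queuereq() and putq() are
real) might use it like this:

	static void
	xxx_rput(queue_t *q, mblk_t *mp)
	{
		if (svc_queuereq(q, mp, TRUE)) {
			/* Accepted: kRPC now owns mp. */
			return;
		}
		/*
		 * Flow-controlled: the caller still owns mp. A reliable
		 * transport parks it and retries from its service procedure
		 * (as rpcmod's mir_rput() does); an unreliable one may
		 * simply freemsg() it.
		 */
		(void) putq(q, mp);
	}
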
/*
@@ -2539,6 +2706,7 @@ svc_detach_thread(SVCXPRT *clone_xprt)
{
SVCMASTERXPRT *xprt = clone_xprt->xp_master;
SVCPOOL *pool = xprt->xp_pool;
+ bool_t enable;
/* Thread must have a reservation */
ASSERT(clone_xprt->xp_reserved);
@@ -2558,7 +2726,12 @@ svc_detach_thread(SVCXPRT *clone_xprt)
mutex_exit(&pool->p_thread_lock);
/* Release an rpcmod slot for this request */
- (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
+ mutex_enter(&xprt->xp_req_lock);
+ enable = xprt->xp_enable;
+ if (enable)
+ xprt->xp_enable = FALSE;
+ mutex_exit(&xprt->xp_req_lock);
+ (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL, enable);
/* Mark the clone (thread) as detached */
clone_xprt->xp_reserved = FALSE;
@@ -2603,23 +2776,24 @@ rdma_stop(rdma_xprt_group_t *rdma_xprts)
mutex_enter(&xprt->xp_req_lock);
pool = xprt->xp_pool;
while ((mp = xprt->xp_req_head) != NULL) {
- /*
- * remove the request from the list and
- * decrement p_reqs
- */
+ rdma_recv_data_t *rdp = (rdma_recv_data_t *)mp->b_rptr;
+
+ /* remove the request from the list */
xprt->xp_req_head = mp->b_next;
- mutex_enter(&pool->p_req_lock);
mp->b_next = (mblk_t *)0;
- pool->p_reqs--;
- mutex_exit(&pool->p_req_lock);
- if (mp) {
- rdma_recv_data_t *rdp = (rdma_recv_data_t *)
- mp->b_rptr;
- RDMA_BUF_FREE(rdp->conn, &rdp->rpcmsg);
- RDMA_REL_CONN(rdp->conn);
- freemsg(mp);
- }
+
+ RDMA_BUF_FREE(rdp->conn, &rdp->rpcmsg);
+ RDMA_REL_CONN(rdp->conn);
+ freemsg(mp);
}
+ mutex_enter(&pool->p_req_lock);
+ pool->p_reqs -= xprt->xp_reqs;
+ pool->p_size -= xprt->xp_size;
+ mutex_exit(&pool->p_req_lock);
+ xprt->xp_reqs = 0;
+ xprt->xp_size = 0;
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = FALSE;
mutex_exit(&xprt->xp_req_lock);
svc_queueclose(q);
#ifdef DEBUG
diff --git a/usr/src/uts/common/rpc/svc.h b/usr/src/uts/common/rpc/svc.h
index dd3cb44c27..134b690a3f 100644
--- a/usr/src/uts/common/rpc/svc.h
+++ b/usr/src/uts/common/rpc/svc.h
@@ -20,6 +20,7 @@
*/
/*
* Copyright (c) 1989, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
*/
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
@@ -254,7 +255,7 @@ struct __svcpool {
* The pool's thread lock p_thread_lock protects:
* - p_threads, p_detached_threads, p_reserved_threads and p_closing
* The pool's request lock protects:
- * - p_asleep, p_drowsy, p_reqs, p_walkers, p_req_cv.
+ * - p_asleep, p_drowsy, p_reqs, p_size, p_walkers, p_req_cv.
* The following fields are `initialized constants':
* - p_id, p_stksize, p_timeout.
* Access to p_next and p_prev is protected by the pool
@@ -359,6 +360,8 @@ struct __svcpool {
kmutex_t p_user_lock; /* Creator lock */
void (*p_offline)(); /* callout for unregister */
void (*p_shutdown)(); /* callout for shutdown */
+
+ size_t p_size; /* Total size of queued msgs */
};
/*
@@ -426,6 +429,11 @@ struct __svcmasterxprt {
struct netbuf xp_addrmask; /* address mask */
caddr_t xp_p2; /* private: for use by svc ops */
+
+ int xp_full : 1; /* xprt is full */
+ int xp_enable : 1; /* xprt needs to be enabled */
+ int xp_reqs; /* number of requests queued */
+ size_t xp_size; /* total size of queued msgs */
};
/*
@@ -787,7 +795,7 @@ extern int svc_clts_kcreate(struct file *, uint_t, struct T_info_ack *,
SVCMASTERXPRT **);
extern int svc_cots_kcreate(struct file *, uint_t, struct T_info_ack *,
SVCMASTERXPRT **);
-extern void svc_queuereq(queue_t *, mblk_t *);
+extern bool_t svc_queuereq(queue_t *, mblk_t *, bool_t);
extern void svc_queueclean(queue_t *);
extern void svc_queueclose(queue_t *);
extern int svc_reserve_thread(SVCXPRT *);
diff --git a/usr/src/uts/common/rpc/svc_gen.c b/usr/src/uts/common/rpc/svc_gen.c
index fafbff299a..87ea2a7282 100644
--- a/usr/src/uts/common/rpc/svc_gen.c
+++ b/usr/src/uts/common/rpc/svc_gen.c
@@ -23,6 +23,9 @@
* Copyright 2004 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
+/*
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
+ */
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
@@ -32,8 +35,6 @@
* under license from the Regents of the University of California.
*/
-#pragma ident "%Z%%M% %I% %E% SMI"
-
#include <sys/types.h>
#include <sys/sysmacros.h>
#include <sys/param.h>
@@ -177,6 +178,10 @@ svc_tli_kcreate(
*/
xprt->xp_req_head = (mblk_t *)0;
xprt->xp_req_tail = (mblk_t *)0;
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = FALSE;
+ xprt->xp_reqs = 0;
+ xprt->xp_size = 0;
mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
xprt->xp_type = tinfo.SERV_type;
@@ -226,7 +231,7 @@ svc_tli_kcreate(
if (hotstream && tinfo.SERV_type == T_CLTS) {
udpmaj = ddi_name_to_major("udp");
if (udpmaj != (major_t)-1 &&
- getmajor(fp->f_vnode->v_rdev) == udpmaj)
+ getmajor(fp->f_vnode->v_rdev) == udpmaj)
create_putlocks(wq, 1);
}
diff --git a/usr/src/uts/common/rpc/svc_rdma.c b/usr/src/uts/common/rpc/svc_rdma.c
index 905b479112..e0a0fe32d1 100644
--- a/usr/src/uts/common/rpc/svc_rdma.c
+++ b/usr/src/uts/common/rpc/svc_rdma.c
@@ -20,6 +20,7 @@
*/
/*
* Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
*/
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
@@ -256,6 +257,10 @@ svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
xprt->xp_req_head = (mblk_t *)0;
xprt->xp_req_tail = (mblk_t *)0;
+ xprt->xp_full = FALSE;
+ xprt->xp_enable = FALSE;
+ xprt->xp_reqs = 0;
+ xprt->xp_size = 0;
xprt->xp_threads = 0;
xprt->xp_detached_threads = 0;