path: root/usr/src/uts/common/inet/tcp/tcp_fusion.c
author		Anders Persson <Anders.Persson@Sun.COM>	2009-06-29 13:59:58 -0700
committer	Anders Persson <Anders.Persson@Sun.COM>	2009-06-29 13:59:58 -0700
commit		7b8f5432e28de8a1817b54aeae412921f8db38ec (patch)
tree		c0c9a644b4bde15e2e49fd0d19aee59c2b3f606e	/usr/src/uts/common/inet/tcp/tcp_fusion.c
parent		4da9f95ba48d107db68c73219b0e39a146588343 (diff)
download	illumos-joyent-7b8f5432e28de8a1817b54aeae412921f8db38ec.tar.gz
6826274 remove synchronous streams from tcp
Diffstat (limited to 'usr/src/uts/common/inet/tcp/tcp_fusion.c')
-rw-r--r--	usr/src/uts/common/inet/tcp/tcp_fusion.c	723
1 file changed, 78 insertions, 645 deletions
diff --git a/usr/src/uts/common/inet/tcp/tcp_fusion.c b/usr/src/uts/common/inet/tcp/tcp_fusion.c
index c6af3564bc..dfecc57795 100644
--- a/usr/src/uts/common/inet/tcp/tcp_fusion.c
+++ b/usr/src/uts/common/inet/tcp/tcp_fusion.c
@@ -52,53 +52,14 @@
* fails, we fall back to the regular TCP data path; if it succeeds,
* both endpoints proceed to use tcp_fuse_output() as the transmit path.
* tcp_fuse_output() enqueues application data directly onto the peer's
- * receive queue; no protocol processing is involved. After enqueueing
- * the data, the sender can either push (putnext) data up the receiver's
- * read queue; or the sender can simply return and let the receiver
- * retrieve the enqueued data via the synchronous streams entry point
- * tcp_fuse_rrw(). The latter path is taken if synchronous streams is
- * enabled (the default). It is disabled if sockfs no longer resides
- * directly on top of tcp module due to a module insertion or removal.
- * It also needs to be temporarily disabled when sending urgent data
- * because the tcp_fuse_rrw() path bypasses the M_PROTO processing done
- * by strsock_proto() hook.
+ * receive queue; no protocol processing is involved.
*
* Synchronization is handled by squeue and the mutex tcp_non_sq_lock.
* One of the requirements for fusion to succeed is that both endpoints
* need to be using the same squeue. This ensures that neither side
- * can disappear while the other side is still sending data. By itself,
- * squeue is not sufficient for guaranteeing safety when synchronous
- * streams is enabled. The reason is that tcp_fuse_rrw() doesn't enter
- * the squeue and its access to tcp_rcv_list and other fusion-related
- * fields needs to be sychronized with the sender. tcp_non_sq_lock is
- * used for this purpose. When there is urgent data, the sender needs
- * to push the data up the receiver's streams read queue. In order to
- * avoid holding the tcp_non_sq_lock across putnext(), the sender sets
- * the peer tcp's tcp_fuse_syncstr_plugged bit and releases tcp_non_sq_lock
- * (see macro TCP_FUSE_SYNCSTR_PLUG_DRAIN()). If tcp_fuse_rrw() enters
- * after this point, it will see that synchronous streams is plugged and
- * will wait on tcp_fuse_plugcv. After the sender has finished pushing up
- * all urgent data, it will clear the tcp_fuse_syncstr_plugged bit using
- * TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(). This will cause any threads waiting
- * on tcp_fuse_plugcv to return EBUSY, and in turn cause strget() to call
- * getq_noenab() to dequeue data from the stream head instead. Once the
- * data on the stream head has been consumed, tcp_fuse_rrw() may again
- * be used to process tcp_rcv_list. However, if TCP_FUSE_SYNCSTR_STOP()
- * has been called, all future calls to tcp_fuse_rrw() will return EBUSY,
- * effectively disabling synchronous streams.
- *
- * The following note applies only to the synchronous streams mode.
- *
- * Flow control is done by checking the size of receive buffer and
- * the number of data blocks, both set to different limits. This is
- * different than regular streams flow control where cumulative size
- * check dominates block count check -- streams queue high water mark
- * typically represents bytes. Each enqueue triggers notifications
- * to the receiving process; a build up of data blocks indicates a
- * slow receiver and the sender should be blocked or informed at the
- * earliest moment instead of further wasting system resources. In
- * effect, this is equivalent to limiting the number of outstanding
- * segments in flight.
+ * can disappear while the other side is still sending data. Flow
+ * control information is manipulated outside the squeue, so the
+ * tcp_non_sq_lock must be held when touching tcp_flow_stopped.
*/
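The rewritten header comment above reduces the synchronization story to two pieces: the shared squeue keeps both endpoints alive, and tcp_non_sq_lock guards the flow-control state touched outside the squeue. The following is a rough userland illustration of that lock discipline only -- a sketch with hypothetical struct and field names standing in for the kernel tcp_t, and a pthread mutex in place of the kernel mutex -- not the kernel implementation itself.

/* Hedged userland model of the tcp_non_sq_lock discipline described above. */
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for one fused endpoint; not the kernel tcp_t. */
typedef struct fused_ep {
	pthread_mutex_t	non_sq_lock;	/* models tcp_non_sq_lock */
	bool		flow_stopped;	/* models tcp_flow_stopped */
} fused_ep_t;

/* Flow-control state is only read or written while holding non_sq_lock. */
static void
ep_set_flow_stopped(fused_ep_t *ep, bool stopped)
{
	(void) pthread_mutex_lock(&ep->non_sq_lock);
	ep->flow_stopped = stopped;
	(void) pthread_mutex_unlock(&ep->non_sq_lock);
}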
/*
@@ -108,26 +69,6 @@
boolean_t do_tcp_fusion = B_TRUE;
/*
- * Enabling this flag allows sockfs to retrieve data directly
- * from a fused tcp endpoint using synchronous streams interface.
- */
-boolean_t do_tcp_direct_sockfs = B_FALSE;
-
-/*
- * This is the minimum amount of outstanding writes allowed on
- * a synchronous streams-enabled receiving endpoint before the
- * sender gets flow-controlled. Setting this value to 0 means
- * that the data block limit is equivalent to the byte count
- * limit, which essentially disables the check.
- */
-#define TCP_FUSION_RCV_UNREAD_MIN 8
-uint_t tcp_fusion_rcv_unread_min = TCP_FUSION_RCV_UNREAD_MIN;
-
-static void tcp_fuse_syncstr_enable(tcp_t *);
-static void tcp_fuse_syncstr_disable(tcp_t *);
-static boolean_t strrput_sig(queue_t *, boolean_t);
-
-/*
* Return true if this connection needs some IP functionality
*/
static boolean_t
@@ -332,7 +273,7 @@ tcp_fuse(tcp_t *tcp, uchar_t *iphdr, tcph_t *tcph)
* able to flow control it in case it sends down huge amount
* of data while we're still detached. To prevent that we
* inherit the listener's recv_hiwater value; this is temporary
- * since we'll repeat the process intcp_accept_finish().
+ * since we'll repeat the process in tcp_accept_finish().
*/
if (!tcp->tcp_refuse) {
(void) tcp_fuse_set_rcv_hiwat(tcp,
@@ -419,6 +360,7 @@ void
tcp_unfuse(tcp_t *tcp)
{
tcp_t *peer_tcp = tcp->tcp_loopback_peer;
+ tcp_stack_t *tcps = tcp->tcp_tcps;
ASSERT(tcp->tcp_fused && peer_tcp != NULL);
ASSERT(peer_tcp->tcp_fused && peer_tcp->tcp_loopback_peer == tcp);
@@ -426,11 +368,49 @@ tcp_unfuse(tcp_t *tcp)
ASSERT(tcp->tcp_unsent == 0 && peer_tcp->tcp_unsent == 0);
/*
- * We disable synchronous streams, drain any queued data and
- * clear tcp_direct_sockfs. The synchronous streams entry
- * points will become no-ops after this point.
+ * Cancel any pending push timers.
*/
- tcp_fuse_disable_pair(tcp, B_TRUE);
+ if (tcp->tcp_push_tid != 0) {
+ (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
+ tcp->tcp_push_tid = 0;
+ }
+ if (peer_tcp->tcp_push_tid != 0) {
+ (void) TCP_TIMER_CANCEL(peer_tcp, peer_tcp->tcp_push_tid);
+ peer_tcp->tcp_push_tid = 0;
+ }
+
+ /*
+ * Drain any pending data; Note that in case of a detached tcp, the
+ * draining will happen later after the tcp is unfused. For non-
+ * urgent data, this can be handled by the regular tcp_rcv_drain().
+ * If we have urgent data sitting in the receive list, we will
+ * need to send up a SIGURG signal first before draining the data.
+ * All of these will be handled by the code in tcp_fuse_rcv_drain()
+ * when called from tcp_rcv_drain().
+ */
+ if (!TCP_IS_DETACHED(tcp)) {
+ (void) tcp_fuse_rcv_drain(tcp->tcp_rq, tcp,
+ &tcp->tcp_fused_sigurg_mp);
+ }
+ if (!TCP_IS_DETACHED(peer_tcp)) {
+ (void) tcp_fuse_rcv_drain(peer_tcp->tcp_rq, peer_tcp,
+ &peer_tcp->tcp_fused_sigurg_mp);
+ }
+
+ /* Lift up any flow-control conditions */
+ mutex_enter(&tcp->tcp_non_sq_lock);
+ if (tcp->tcp_flow_stopped) {
+ tcp_clrqfull(tcp);
+ TCP_STAT(tcps, tcp_fusion_backenabled);
+ }
+ mutex_exit(&tcp->tcp_non_sq_lock);
+
+ mutex_enter(&peer_tcp->tcp_non_sq_lock);
+ if (peer_tcp->tcp_flow_stopped) {
+ tcp_clrqfull(peer_tcp);
+ TCP_STAT(tcps, tcp_fusion_backenabled);
+ }
+ mutex_exit(&peer_tcp->tcp_non_sq_lock);
/*
* Update th_seq and th_ack in the header template
@@ -447,8 +427,8 @@ tcp_unfuse(tcp_t *tcp)
ASSERT(peer_tcp->tcp_fused_sigurg_mp != NULL);
freeb(peer_tcp->tcp_fused_sigurg_mp);
peer_tcp->tcp_fused_sigurg_mp = NULL;
- }
- if (!IPCL_IS_NONSTR(tcp->tcp_connp)) {
+
+ ASSERT(!IPCL_IS_NONSTR(tcp->tcp_connp));
ASSERT(tcp->tcp_fused_sigurg_mp != NULL);
freeb(tcp->tcp_fused_sigurg_mp);
tcp->tcp_fused_sigurg_mp = NULL;
@@ -470,8 +450,8 @@ tcp_fuse_output_urg(tcp_t *tcp, mblk_t *mp)
tcp_stack_t *tcps = tcp->tcp_tcps;
ASSERT(tcp->tcp_fused);
- ASSERT(!IPCL_IS_NONSTR(tcp->tcp_connp));
ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp);
+ ASSERT(!IPCL_IS_NONSTR(tcp->tcp_connp));
ASSERT(DB_TYPE(mp) == M_PROTO || DB_TYPE(mp) == M_PCPROTO);
ASSERT(mp->b_cont != NULL && DB_TYPE(mp->b_cont) == M_DATA);
ASSERT(MBLKL(mp) >= sizeof (*tei) && MBLKL(mp->b_cont) > 0);
@@ -555,7 +535,6 @@ boolean_t
tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
{
tcp_t *peer_tcp = tcp->tcp_loopback_peer;
- uint_t max_unread;
boolean_t flow_stopped, peer_data_queued = B_FALSE;
boolean_t urgent = (DB_TYPE(mp) != M_DATA);
boolean_t push = B_TRUE;
@@ -609,7 +588,6 @@ tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
freemsg(mp);
return (B_TRUE);
}
- max_unread = peer_tcp->tcp_fuse_rcv_unread_hiwater;
/*
* Handle urgent data; we either send up SIGURG to the peer now
@@ -617,14 +595,6 @@ tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
* or if we're short of memory for M_PCSIG mblk.
*/
if (urgent) {
- /*
- * We stop synchronous streams when we have urgent data
- * queued to prevent tcp_fuse_rrw() from pulling it. If
- * for some reasons the urgent data can't be delivered
- * below, synchronous streams will remain stopped until
- * someone drains the tcp_rcv_list.
- */
- TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp);
tcp_fuse_output_urg(tcp, mp);
mp1 = mp->b_cont;
@@ -755,38 +725,17 @@ tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
freemsg(mp1);
}
- mutex_enter(&peer_tcp->tcp_non_sq_lock);
- /*
- * Wake up and signal the peer; it is okay to do this before
- * enqueueing because we are holding the lock. One of the
- * advantages of synchronous streams is the ability for us to
- * find out when the application performs a read on the socket,
- * by way of tcp_fuse_rrw() entry point being called. Every
- * data that gets enqueued onto the receiver is treated as if
- * it has arrived at the receiving endpoint, thus generating
- * SIGPOLL/SIGIO for asynchronous socket just as in the strrput()
- * case. However, we only wake up the application when necessary,
- * i.e. during the first enqueue. When tcp_fuse_rrw() is called
- * it will send everything upstream.
- */
- if (peer_tcp->tcp_direct_sockfs && !urgent &&
- !TCP_IS_DETACHED(peer_tcp)) {
- /* Update poll events and send SIGPOLL/SIGIO if necessary */
- STR_WAKEUP_SENDSIG(STREAM(peer_tcp->tcp_rq),
- peer_tcp->tcp_rcv_list);
- }
-
/*
* Enqueue data into the peer's receive list; we may or may not
* drain the contents depending on the conditions below.
*
- * tcp_hard_binding indicates that accept has not yet completed,
- * in which case we use tcp_rcv_enqueue() instead of calling
- * su_recv directly. Queued data will be drained when the accept
- * completes (in tcp_accept_finish()).
+ * For non-STREAMS sockets we normally queue data directly in the
+ * socket by calling the su_recv upcall. However, if the peer is
+ * detached we use tcp_rcv_enqueue() instead. Queued data will be
+ * drained when the accept completes (in tcp_accept_finish()).
*/
if (IPCL_IS_NONSTR(peer_tcp->tcp_connp) &&
- !peer_tcp->tcp_hard_binding) {
+ !TCP_IS_DETACHED(peer_tcp)) {
int error;
int flags = 0;
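The comment in this hunk describes two delivery paths for the peer's data: a direct socket upcall when the peer is an attached non-STREAMS endpoint, and tcp_rcv_enqueue() otherwise. A hedged sketch of that branch follows; is_nonstr(), is_detached(), deliver_to_socket() and enqueue_on_rcv_list() are illustrative stand-ins, not the kernel interfaces.

/*
 * Hedged sketch of the delivery decision described above; all names are
 * hypothetical stand-ins for the kernel checks and upcalls.
 */
#include <stdbool.h>
#include <stddef.h>

typedef struct msg msg_t;		/* stands in for an mblk chain */

extern bool is_nonstr(void *peer);	/* non-STREAMS (socket) endpoint? */
extern bool is_detached(void *peer);	/* accept not yet completed? */
extern int  deliver_to_socket(void *peer, msg_t *mp, size_t len);
extern void enqueue_on_rcv_list(void *peer, msg_t *mp, size_t len);

static void
fuse_deliver(void *peer, msg_t *mp, size_t len)
{
	if (is_nonstr(peer) && !is_detached(peer)) {
		/* Attached socket: hand the data straight to sockfs. */
		(void) deliver_to_socket(peer, mp, len);
	} else {
		/*
		 * Detached (accept in progress) or STREAMS endpoint: queue
		 * on the receive list; it is drained later, e.g. when the
		 * accept completes.
		 */
		enqueue_on_rcv_list(peer, mp, len);
	}
}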
@@ -814,59 +763,31 @@ tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
*/
tcp->tcp_valid_bits &= ~TCP_URG_VALID;
freemsg(mp);
- mutex_exit(&peer_tcp->tcp_non_sq_lock);
return (B_TRUE);
}
tcp_rcv_enqueue(peer_tcp, mp, recv_size);
- }
- /* In case it wrapped around and also to keep it constant */
- peer_tcp->tcp_rwnd += recv_size;
- /*
- * We increase the peer's unread message count here whilst still
- * holding it's tcp_non_sq_lock. This ensures that the increment
- * occurs in the same lock acquisition perimeter as the enqueue.
- * Depending on lock hierarchy, we can release these locks which
- * creates a window in which we can race with tcp_fuse_rrw()
- */
- peer_tcp->tcp_fuse_rcv_unread_cnt++;
+ /* In case it wrapped around and also to keep it constant */
+ peer_tcp->tcp_rwnd += recv_size;
+ }
/*
* Exercise flow-control when needed; we will get back-enabled
- * in either tcp_accept_finish(), tcp_unfuse(), or tcp_fuse_rrw().
- * If tcp_direct_sockfs is on or if the peer endpoint is detached,
- * we emulate streams flow control by checking the peer's queue
- * size and high water mark; otherwise we simply use canputnext()
- * to decide if we need to stop our flow.
+ * in either tcp_accept_finish(), tcp_unfuse(), or when data is
+ * consumed. If peer endpoint is detached, we emulate streams flow
+ * control by checking the peer's queue size and high water mark;
+ * otherwise we simply use canputnext() to decide if we need to stop
+ * our flow.
*
- * The outstanding unread data block check does not apply for a
- * detached receiver; this is to avoid unnecessary blocking of the
- * sender while the accept is currently in progress and is quite
- * similar to the regular tcp.
- */
- if (TCP_IS_DETACHED(peer_tcp) || max_unread == 0)
- max_unread = UINT_MAX;
-
- /*
* Since we are accessing our tcp_flow_stopped and might modify it,
- * we need to take tcp->tcp_non_sq_lock. The lock for the highest
- * address is held first. Dropping peer_tcp->tcp_non_sq_lock should
- * not be an issue here since we are within the squeue and the peer
- * won't disappear.
+ * we need to take tcp->tcp_non_sq_lock.
*/
- if (tcp > peer_tcp) {
- mutex_exit(&peer_tcp->tcp_non_sq_lock);
- mutex_enter(&tcp->tcp_non_sq_lock);
- mutex_enter(&peer_tcp->tcp_non_sq_lock);
- } else {
- mutex_enter(&tcp->tcp_non_sq_lock);
- }
+ mutex_enter(&tcp->tcp_non_sq_lock);
flow_stopped = tcp->tcp_flow_stopped;
- if (((peer_tcp->tcp_direct_sockfs || TCP_IS_DETACHED(peer_tcp)) &&
- (peer_tcp->tcp_rcv_cnt >= peer_tcp->tcp_fuse_rcv_hiwater ||
- peer_tcp->tcp_fuse_rcv_unread_cnt >= max_unread)) ||
- (!peer_tcp->tcp_direct_sockfs && !TCP_IS_DETACHED(peer_tcp) &&
+ if ((TCP_IS_DETACHED(peer_tcp) &&
+ (peer_tcp->tcp_rcv_cnt >= peer_tcp->tcp_fuse_rcv_hiwater)) ||
+ (!TCP_IS_DETACHED(peer_tcp) &&
!IPCL_IS_NONSTR(peer_tcp->tcp_connp) &&
!canputnext(peer_tcp->tcp_rq))) {
peer_data_queued = B_TRUE;
@@ -877,9 +798,8 @@ tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
tcp_setqfull(tcp);
flow_stopped = B_TRUE;
TCP_STAT(tcps, tcp_fusion_flowctl);
- DTRACE_PROBE4(tcp__fuse__output__flowctl, tcp_t *, tcp,
- uint_t, send_size, uint_t, peer_tcp->tcp_rcv_cnt,
- uint_t, peer_tcp->tcp_fuse_rcv_unread_cnt);
+ DTRACE_PROBE3(tcp__fuse__output__flowctl, tcp_t *, tcp,
+ uint_t, send_size, uint_t, peer_tcp->tcp_rcv_cnt);
} else if (flow_stopped && !peer_data_queued &&
(TCP_UNSENT_BYTES(tcp) <= tcp->tcp_xmit_lowater)) {
tcp_clrqfull(tcp);
@@ -888,21 +808,6 @@ tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
}
mutex_exit(&tcp->tcp_non_sq_lock);
- /*
- * If we are in synchronous streams mode and the peer read queue is
- * not full then schedule a push timer if one is not scheduled
- * already. This is needed for applications which use MSG_PEEK to
- * determine the number of bytes available before issuing a 'real'
- * read. It also makes flow control more deterministic, particularly
- * for smaller message sizes.
- */
- if (!urgent && peer_tcp->tcp_direct_sockfs &&
- peer_tcp->tcp_push_tid == 0 && !TCP_IS_DETACHED(peer_tcp) &&
- canputnext(peer_tcp->tcp_rq)) {
- peer_tcp->tcp_push_tid = TCP_TIMER(peer_tcp, tcp_push_timer,
- MSEC_TO_TICK(tcps->tcps_push_timer_interval));
- }
- mutex_exit(&peer_tcp->tcp_non_sq_lock);
ipst->ips_loopback_packets++;
tcp->tcp_last_sent_len = send_size;
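The simplified flow-control check in the hunk above boils down to: for a detached peer, compare its queued byte count against its fusion receive high-water mark; for an attached STREAMS peer, rely on canputnext(). A compact model of that decision, with illustrative names only, not the kernel fields:

/*
 * Hedged model of the sender-side flow-control check described above;
 * field and helper names are illustrative.
 */
#include <stdbool.h>
#include <stdint.h>

typedef struct peer_state {
	bool		detached;	/* accept still in progress */
	bool		nonstr;		/* non-STREAMS (socket) endpoint */
	uint32_t	rcv_cnt;	/* bytes queued on its receive list */
	uint32_t	rcv_hiwater;	/* fusion receive high-water mark */
} peer_state_t;

extern bool stream_can_put(peer_state_t *peer);	/* models canputnext() */

/* Return true when the sender should assert flow control. */
static bool
fuse_should_flow_stop(peer_state_t *peer)
{
	if (peer->detached)
		return (peer->rcv_cnt >= peer->rcv_hiwater);
	if (!peer->nonstr)
		return (!stream_can_put(peer));
	return (false);
}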
@@ -928,11 +833,9 @@ tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
!TCP_IS_DETACHED(peer_tcp)) {
/*
* Drain the peer's receive queue if it has urgent data or if
- * we're not flow-controlled. There is no need for draining
- * normal data when tcp_direct_sockfs is on because the peer
- * will pull the data via tcp_fuse_rrw().
+ * we're not flow-controlled.
*/
- if (urgent || (!flow_stopped && !peer_tcp->tcp_direct_sockfs)) {
+ if (urgent || !flow_stopped) {
ASSERT(peer_tcp->tcp_rcv_list != NULL);
/*
* For TLI-based streams, a thread in tcp_accept_swap()
@@ -945,12 +848,6 @@ tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
membar_consumer();
(void) tcp_fuse_rcv_drain(peer_tcp->tcp_rq, peer_tcp,
NULL);
- /*
- * If synchronous streams was stopped above due
- * to the presence of urgent data, re-enable it.
- */
- if (urgent)
- TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(peer_tcp);
}
}
return (B_TRUE);
@@ -976,7 +873,6 @@ tcp_fuse_rcv_drain(queue_t *q, tcp_t *tcp, mblk_t **sigurg_mpp)
#endif
tcp_stack_t *tcps = tcp->tcp_tcps;
tcp_t *peer_tcp = tcp->tcp_loopback_peer;
- boolean_t sd_rd_eof = B_FALSE;
ASSERT(tcp->tcp_loopback);
ASSERT(tcp->tcp_fused || tcp->tcp_fused_sigurg);
@@ -1036,22 +932,6 @@ tcp_fuse_rcv_drain(queue_t *q, tcp_t *tcp, mblk_t **sigurg_mpp)
return (B_FALSE);
}
- /*
- * In the synchronous streams case, we generate SIGPOLL/SIGIO for
- * each M_DATA that gets enqueued onto the receiver. At this point
- * we are about to drain any queued data via putnext(). In order
- * to avoid extraneous signal generation from strrput(), we set
- * STRGETINPROG flag at the stream head prior to the draining and
- * restore it afterwards. This masks out signal generation only
- * for M_DATA messages and does not affect urgent data. We only do
- * this if the STREOF flag is not set which can happen if the
- * application shuts down the read side of a stream. In this case
- * we simply free these messages to approximate the flushq behavior
- * which normally occurs when STREOF is on the stream head read queue.
- */
- if (tcp->tcp_direct_sockfs)
- sd_rd_eof = strrput_sig(q, B_FALSE);
-
/* Drain the data */
while ((mp = tcp->tcp_rcv_list) != NULL) {
tcp->tcp_rcv_list = mp->b_next;
@@ -1060,441 +940,27 @@ tcp_fuse_rcv_drain(queue_t *q, tcp_t *tcp, mblk_t **sigurg_mpp)
cnt += msgdsize(mp);
#endif
ASSERT(!IPCL_IS_NONSTR(connp));
- if (sd_rd_eof) {
- freemsg(mp);
- } else {
- putnext(q, mp);
- TCP_STAT(tcps, tcp_fusion_putnext);
- }
+ putnext(q, mp);
+ TCP_STAT(tcps, tcp_fusion_putnext);
}
- if (tcp->tcp_direct_sockfs && !sd_rd_eof)
- (void) strrput_sig(q, B_TRUE);
-
#ifdef DEBUG
ASSERT(cnt == tcp->tcp_rcv_cnt);
#endif
tcp->tcp_rcv_last_head = NULL;
tcp->tcp_rcv_last_tail = NULL;
tcp->tcp_rcv_cnt = 0;
- tcp->tcp_fuse_rcv_unread_cnt = 0;
tcp->tcp_rwnd = tcp->tcp_recv_hiwater;
+ mutex_enter(&peer_tcp->tcp_non_sq_lock);
if (peer_tcp->tcp_flow_stopped && (TCP_UNSENT_BYTES(peer_tcp) <=
peer_tcp->tcp_xmit_lowater)) {
tcp_clrqfull(peer_tcp);
TCP_STAT(tcps, tcp_fusion_backenabled);
}
-
- return (B_TRUE);
-}
-
-/*
- * Synchronous stream entry point for sockfs to retrieve
- * data directly from tcp_rcv_list.
- * tcp_fuse_rrw() might end up modifying the peer's tcp_flow_stopped,
- * for which it must take the tcp_non_sq_lock of the peer as well
- * making any change. The order of taking the locks is based on
- * the TCP pointer itself. Before we get the peer we need to take
- * our tcp_non_sq_lock so that the peer doesn't disappear. However,
- * we cannot drop the lock if we have to grab the peer's lock (because
- * of ordering), since the peer might disappear in the interim. So,
- * we take our tcp_non_sq_lock, get the peer, increment the ref on the
- * peer's conn, drop all the locks and then take the tcp_non_sq_lock in the
- * desired order. Incrementing the conn ref on the peer means that the
- * peer won't disappear when we drop our tcp_non_sq_lock.
- */
-int
-tcp_fuse_rrw(queue_t *q, struiod_t *dp)
-{
- tcp_t *tcp = Q_TO_CONN(q)->conn_tcp;
- mblk_t *mp;
- tcp_t *peer_tcp;
- tcp_stack_t *tcps = tcp->tcp_tcps;
-
- mutex_enter(&tcp->tcp_non_sq_lock);
-
- /*
- * If tcp_fuse_syncstr_plugged is set, then another thread is moving
- * the underlying data to the stream head. We need to wait until it's
- * done, then return EBUSY so that strget() will dequeue data from the
- * stream head to ensure data is drained in-order.
- */
-plugged:
- if (tcp->tcp_fuse_syncstr_plugged) {
- do {
- cv_wait(&tcp->tcp_fuse_plugcv, &tcp->tcp_non_sq_lock);
- } while (tcp->tcp_fuse_syncstr_plugged);
-
- mutex_exit(&tcp->tcp_non_sq_lock);
- TCP_STAT(tcps, tcp_fusion_rrw_plugged);
- TCP_STAT(tcps, tcp_fusion_rrw_busy);
- return (EBUSY);
- }
-
- peer_tcp = tcp->tcp_loopback_peer;
-
- /*
- * If someone had turned off tcp_direct_sockfs or if synchronous
- * streams is stopped, we return EBUSY. This causes strget() to
- * dequeue data from the stream head instead.
- */
- if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped) {
- mutex_exit(&tcp->tcp_non_sq_lock);
- TCP_STAT(tcps, tcp_fusion_rrw_busy);
- return (EBUSY);
- }
-
- /*
- * Grab lock in order. The highest addressed tcp is locked first.
- * We don't do this within the tcp_rcv_list check since if we
- * have to drop the lock, for ordering, then the tcp_rcv_list
- * could change.
- */
- if (peer_tcp > tcp) {
- CONN_INC_REF(peer_tcp->tcp_connp);
- mutex_exit(&tcp->tcp_non_sq_lock);
- mutex_enter(&peer_tcp->tcp_non_sq_lock);
- mutex_enter(&tcp->tcp_non_sq_lock);
- /*
- * This might have changed in the interim
- * Once read-side tcp_non_sq_lock is dropped above
- * anything can happen, we need to check all
- * known conditions again once we reaquire
- * read-side tcp_non_sq_lock.
- */
- if (tcp->tcp_fuse_syncstr_plugged) {
- mutex_exit(&peer_tcp->tcp_non_sq_lock);
- CONN_DEC_REF(peer_tcp->tcp_connp);
- goto plugged;
- }
- if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped) {
- mutex_exit(&tcp->tcp_non_sq_lock);
- mutex_exit(&peer_tcp->tcp_non_sq_lock);
- CONN_DEC_REF(peer_tcp->tcp_connp);
- TCP_STAT(tcps, tcp_fusion_rrw_busy);
- return (EBUSY);
- }
- CONN_DEC_REF(peer_tcp->tcp_connp);
- } else {
- mutex_enter(&peer_tcp->tcp_non_sq_lock);
- }
-
- if ((mp = tcp->tcp_rcv_list) != NULL) {
-
- DTRACE_PROBE3(tcp__fuse__rrw, tcp_t *, tcp,
- uint32_t, tcp->tcp_rcv_cnt, ssize_t, dp->d_uio.uio_resid);
-
- tcp->tcp_rcv_list = NULL;
- TCP_STAT(tcps, tcp_fusion_rrw_msgcnt);
-
- /*
- * At this point nothing should be left in tcp_rcv_list.
- * The only possible case where we would have a chain of
- * b_next-linked messages is urgent data, but we wouldn't
- * be here if that's true since urgent data is delivered
- * via putnext() and synchronous streams is stopped until
- * tcp_fuse_rcv_drain() is finished.
- */
- ASSERT(DB_TYPE(mp) == M_DATA && mp->b_next == NULL);
-
- tcp->tcp_rcv_last_head = NULL;
- tcp->tcp_rcv_last_tail = NULL;
- tcp->tcp_rcv_cnt = 0;
- tcp->tcp_fuse_rcv_unread_cnt = 0;
-
- if (peer_tcp->tcp_flow_stopped &&
- (TCP_UNSENT_BYTES(peer_tcp) <=
- peer_tcp->tcp_xmit_lowater)) {
- tcp_clrqfull(peer_tcp);
- TCP_STAT(tcps, tcp_fusion_backenabled);
- }
- }
mutex_exit(&peer_tcp->tcp_non_sq_lock);
- /*
- * Either we just dequeued everything or we get here from sockfs
- * and have nothing to return; in this case clear RSLEEP.
- */
- ASSERT(tcp->tcp_rcv_last_head == NULL);
- ASSERT(tcp->tcp_rcv_last_tail == NULL);
- ASSERT(tcp->tcp_rcv_cnt == 0);
- ASSERT(tcp->tcp_fuse_rcv_unread_cnt == 0);
- STR_WAKEUP_CLEAR(STREAM(q));
-
- mutex_exit(&tcp->tcp_non_sq_lock);
- dp->d_mp = mp;
- return (0);
-}
-
-/*
- * Synchronous stream entry point used by certain ioctls to retrieve
- * information about or peek into the tcp_rcv_list.
- */
-int
-tcp_fuse_rinfop(queue_t *q, infod_t *dp)
-{
- tcp_t *tcp = Q_TO_CONN(q)->conn_tcp;
- mblk_t *mp;
- uint_t cmd = dp->d_cmd;
- int res = 0;
- int error = 0;
- struct stdata *stp = STREAM(q);
-
- mutex_enter(&tcp->tcp_non_sq_lock);
- /* If shutdown on read has happened, return nothing */
- mutex_enter(&stp->sd_lock);
- if (stp->sd_flag & STREOF) {
- mutex_exit(&stp->sd_lock);
- goto done;
- }
- mutex_exit(&stp->sd_lock);
-
- /*
- * It is OK not to return an answer if tcp_rcv_list is
- * currently not accessible.
- */
- if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped ||
- tcp->tcp_fuse_syncstr_plugged || (mp = tcp->tcp_rcv_list) == NULL)
- goto done;
-
- if (cmd & INFOD_COUNT) {
- /*
- * We have at least one message and
- * could return only one at a time.
- */
- dp->d_count++;
- res |= INFOD_COUNT;
- }
- if (cmd & INFOD_BYTES) {
- /*
- * Return size of all data messages.
- */
- dp->d_bytes += tcp->tcp_rcv_cnt;
- res |= INFOD_BYTES;
- }
- if (cmd & INFOD_FIRSTBYTES) {
- /*
- * Return size of first data message.
- */
- dp->d_bytes = msgdsize(mp);
- res |= INFOD_FIRSTBYTES;
- dp->d_cmd &= ~INFOD_FIRSTBYTES;
- }
- if (cmd & INFOD_COPYOUT) {
- mblk_t *mp1;
- int n;
-
- if (DB_TYPE(mp) == M_DATA) {
- mp1 = mp;
- } else {
- mp1 = mp->b_cont;
- ASSERT(mp1 != NULL);
- }
-
- /*
- * Return data contents of first message.
- */
- ASSERT(DB_TYPE(mp1) == M_DATA);
- while (mp1 != NULL && dp->d_uiop->uio_resid > 0) {
- n = MIN(dp->d_uiop->uio_resid, MBLKL(mp1));
- if (n != 0 && (error = uiomove((char *)mp1->b_rptr, n,
- UIO_READ, dp->d_uiop)) != 0) {
- goto done;
- }
- mp1 = mp1->b_cont;
- }
- res |= INFOD_COPYOUT;
- dp->d_cmd &= ~INFOD_COPYOUT;
- }
-done:
- mutex_exit(&tcp->tcp_non_sq_lock);
-
- dp->d_res |= res;
-
- return (error);
-}
-
-/*
- * Enable synchronous streams on a fused tcp loopback endpoint.
- */
-static void
-tcp_fuse_syncstr_enable(tcp_t *tcp)
-{
- queue_t *rq = tcp->tcp_rq;
- struct stdata *stp = STREAM(rq);
-
- /* We can only enable synchronous streams for sockfs mode */
- tcp->tcp_direct_sockfs = tcp->tcp_issocket && do_tcp_direct_sockfs;
-
- if (!tcp->tcp_direct_sockfs)
- return;
-
- mutex_enter(&stp->sd_lock);
- mutex_enter(QLOCK(rq));
-
- /*
- * We replace our q_qinfo with one that has the qi_rwp entry point.
- * Clear SR_SIGALLDATA because we generate the equivalent signal(s)
- * for every enqueued data in tcp_fuse_output().
- */
- rq->q_qinfo = &tcp_loopback_rinit;
- rq->q_struiot = tcp_loopback_rinit.qi_struiot;
- stp->sd_struiordq = rq;
- stp->sd_rput_opt &= ~SR_SIGALLDATA;
-
- mutex_exit(QLOCK(rq));
- mutex_exit(&stp->sd_lock);
-}
-
-/*
- * Disable synchronous streams on a fused tcp loopback endpoint.
- */
-static void
-tcp_fuse_syncstr_disable(tcp_t *tcp)
-{
- queue_t *rq = tcp->tcp_rq;
- struct stdata *stp = STREAM(rq);
-
- if (!tcp->tcp_direct_sockfs)
- return;
-
- mutex_enter(&stp->sd_lock);
- mutex_enter(QLOCK(rq));
-
- /*
- * Reset q_qinfo to point to the default tcp entry points.
- * Also restore SR_SIGALLDATA so that strrput() can generate
- * the signals again for future M_DATA messages.
- */
- rq->q_qinfo = &tcp_rinitv4; /* No open - same as rinitv6 */
- rq->q_struiot = tcp_rinitv4.qi_struiot;
- stp->sd_struiordq = NULL;
- stp->sd_rput_opt |= SR_SIGALLDATA;
- tcp->tcp_direct_sockfs = B_FALSE;
-
- mutex_exit(QLOCK(rq));
- mutex_exit(&stp->sd_lock);
-}
-
-/*
- * Enable synchronous streams on a pair of fused tcp endpoints.
- */
-void
-tcp_fuse_syncstr_enable_pair(tcp_t *tcp)
-{
- tcp_t *peer_tcp = tcp->tcp_loopback_peer;
-
- ASSERT(tcp->tcp_fused);
- ASSERT(peer_tcp != NULL);
-
- tcp_fuse_syncstr_enable(tcp);
- tcp_fuse_syncstr_enable(peer_tcp);
-}
-
-/*
- * Used to enable/disable signal generation at the stream head. We already
- * generated the signal(s) for these messages when they were enqueued on the
- * receiver. We also check if STREOF is set here. If it is, we return false
- * and let the caller decide what to do.
- */
-static boolean_t
-strrput_sig(queue_t *q, boolean_t on)
-{
- struct stdata *stp = STREAM(q);
-
- mutex_enter(&stp->sd_lock);
- if (stp->sd_flag == STREOF) {
- mutex_exit(&stp->sd_lock);
- return (B_TRUE);
- }
- if (on)
- stp->sd_flag &= ~STRGETINPROG;
- else
- stp->sd_flag |= STRGETINPROG;
- mutex_exit(&stp->sd_lock);
-
- return (B_FALSE);
-}
-
-/*
- * Disable synchronous streams on a pair of fused tcp endpoints and drain
- * any queued data; called either during unfuse or upon transitioning from
- * a socket to a stream endpoint due to _SIOCSOCKFALLBACK.
- */
-void
-tcp_fuse_disable_pair(tcp_t *tcp, boolean_t unfusing)
-{
- tcp_t *peer_tcp = tcp->tcp_loopback_peer;
- tcp_stack_t *tcps = tcp->tcp_tcps;
-
- ASSERT(tcp->tcp_fused);
- ASSERT(peer_tcp != NULL);
- /*
- * Force any tcp_fuse_rrw() calls to block until we've moved the data
- * onto the stream head.
- */
- TCP_FUSE_SYNCSTR_PLUG_DRAIN(tcp);
- TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp);
-
- /*
- * Cancel any pending push timers.
- */
- if (tcp->tcp_push_tid != 0) {
- (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
- tcp->tcp_push_tid = 0;
- }
- if (peer_tcp->tcp_push_tid != 0) {
- (void) TCP_TIMER_CANCEL(peer_tcp, peer_tcp->tcp_push_tid);
- peer_tcp->tcp_push_tid = 0;
- }
-
- /*
- * Drain any pending data; the detached check is needed because
- * we may be called as a result of a tcp_unfuse() triggered by
- * tcp_fuse_output(). Note that in case of a detached tcp, the
- * draining will happen later after the tcp is unfused. For non-
- * urgent data, this can be handled by the regular tcp_rcv_drain().
- * If we have urgent data sitting in the receive list, we will
- * need to send up a SIGURG signal first before draining the data.
- * All of these will be handled by the code in tcp_fuse_rcv_drain()
- * when called from tcp_rcv_drain().
- */
- if (!TCP_IS_DETACHED(tcp)) {
- (void) tcp_fuse_rcv_drain(tcp->tcp_rq, tcp,
- (unfusing ? &tcp->tcp_fused_sigurg_mp : NULL));
- }
- if (!TCP_IS_DETACHED(peer_tcp)) {
- (void) tcp_fuse_rcv_drain(peer_tcp->tcp_rq, peer_tcp,
- (unfusing ? &peer_tcp->tcp_fused_sigurg_mp : NULL));
- }
-
- /*
- * Make all current and future tcp_fuse_rrw() calls fail with EBUSY.
- * To ensure threads don't sneak past the checks in tcp_fuse_rrw(),
- * a given stream must be stopped prior to being unplugged (but the
- * ordering of operations between the streams is unimportant).
- */
- TCP_FUSE_SYNCSTR_STOP(tcp);
- TCP_FUSE_SYNCSTR_STOP(peer_tcp);
- TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(tcp);
- TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(peer_tcp);
-
- /* Lift up any flow-control conditions */
- if (tcp->tcp_flow_stopped) {
- tcp_clrqfull(tcp);
- TCP_STAT(tcps, tcp_fusion_backenabled);
- }
- if (peer_tcp->tcp_flow_stopped) {
- tcp_clrqfull(peer_tcp);
- TCP_STAT(tcps, tcp_fusion_backenabled);
- }
-
- /* Disable synchronous streams */
- if (!IPCL_IS_NONSTR(tcp->tcp_connp))
- tcp_fuse_syncstr_disable(tcp);
- if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp))
- tcp_fuse_syncstr_disable(peer_tcp);
+ return (B_TRUE);
}
/*
@@ -1549,23 +1015,6 @@ tcp_fuse_maxpsz_set(tcp_t *tcp)
maxpsz = peer_tcp->tcp_fuse_rcv_hiwater;
maxpsz = P2ROUNDUP_TYPED(maxpsz, PAGESIZE, uint_t) >> 1;
- /*
- * Calculate the peer's limit for the number of outstanding unread
- * data block. This is the amount of data blocks that are allowed
- * to reside in the receiver's queue before the sender gets flow
- * controlled. It is used only in the synchronous streams mode as
- * a way to throttle the sender when it performs consecutive writes
- * faster than can be read. The value is derived from SO_SNDBUF in
- * order to give the sender some control; we divide it with a large
- * value (16KB) to produce a fairly low initial limit.
- */
- if (tcp_fusion_rcv_unread_min == 0) {
- /* A value of 0 means that we disable the check */
- peer_tcp->tcp_fuse_rcv_unread_hiwater = 0;
- } else {
- peer_tcp->tcp_fuse_rcv_unread_hiwater =
- MAX(sndbuf >> 14, tcp_fusion_rcv_unread_min);
- }
return (maxpsz);
}
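The surviving computation above takes the peer's fusion receive high-water mark, rounds it up to a page boundary and halves it. A small worked example follows, assuming a 4 KB page and the usual illumos-style P2ROUNDUP round-up-to-power-of-two macro; both the page size and the sample hiwater value are assumptions for illustration, not taken from this diff.

/* Worked example of the maxpsz computation above (assumed values). */
#include <stdio.h>
#include <stdint.h>

#define	PAGESIZE	4096u
#define	P2ROUNDUP(x, align)	(-(-(x) & -(align)))

int
main(void)
{
	uint32_t hiwater = 50000;	/* hypothetical peer rcv hiwater */
	uint32_t maxpsz = P2ROUNDUP(hiwater, PAGESIZE) >> 1;

	/* 50000 rounds up to 53248, halved to 26624. */
	(void) printf("maxpsz = %u\n", (unsigned)maxpsz);
	return (0);
}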
@@ -1584,32 +1033,16 @@ tcp_fuse_backenable(tcp_t *tcp)
ASSERT(tcp->tcp_connp->conn_sqp ==
peer_tcp->tcp_connp->conn_sqp);
- /*
- * Normally we would not get backenabled in synchronous
- * streams mode, but in case this happens, we need to plug
- * synchronous streams during our drain to prevent a race
- * with tcp_fuse_rrw() or tcp_fuse_rinfop().
- */
- TCP_FUSE_SYNCSTR_PLUG_DRAIN(tcp);
if (tcp->tcp_rcv_list != NULL)
(void) tcp_fuse_rcv_drain(tcp->tcp_rq, tcp, NULL);
- if (peer_tcp > tcp) {
- mutex_enter(&peer_tcp->tcp_non_sq_lock);
- mutex_enter(&tcp->tcp_non_sq_lock);
- } else {
- mutex_enter(&tcp->tcp_non_sq_lock);
- mutex_enter(&peer_tcp->tcp_non_sq_lock);
- }
-
+ mutex_enter(&peer_tcp->tcp_non_sq_lock);
if (peer_tcp->tcp_flow_stopped &&
(TCP_UNSENT_BYTES(peer_tcp) <=
peer_tcp->tcp_xmit_lowater)) {
tcp_clrqfull(peer_tcp);
}
mutex_exit(&peer_tcp->tcp_non_sq_lock);
- mutex_exit(&tcp->tcp_non_sq_lock);
- TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(tcp);
TCP_STAT(tcp->tcp_tcps, tcp_fusion_backenabled);
}
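The tcp_fuse_backenable() change above clears the peer's qfull condition only once its unsent byte count has fallen to its transmit low-water mark, giving the usual high/low-water hysteresis. A minimal model of that check, with illustrative field names rather than the kernel ones:

/* Hedged model of the back-enable check above; names are illustrative. */
#include <stdbool.h>
#include <stdint.h>

typedef struct sender_state {
	bool		flow_stopped;	/* models tcp_flow_stopped */
	uint32_t	unsent_bytes;	/* models TCP_UNSENT_BYTES() */
	uint32_t	xmit_lowater;	/* models tcp_xmit_lowater */
} sender_state_t;

/* Clear flow control only after draining below the low-water mark. */
static bool
fuse_backenable(sender_state_t *snd)
{
	if (snd->flow_stopped && snd->unsent_bytes <= snd->xmit_lowater) {
		snd->flow_stopped = false;	/* models tcp_clrqfull() */
		return (true);
	}
	return (false);
}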