summaryrefslogtreecommitdiff
path: root/modules/http2/h2_session.c
diff options
context:
space:
mode:
authorStefan Fritsch <sf@sfritsch.de>2016-07-05 23:20:42 +0200
committerStefan Fritsch <sf@sfritsch.de>2016-07-05 23:20:42 +0200
commitd5ffc4eb85d71c901c85119cf873e343349e97e2 (patch)
tree564636012ef7538ed4d7096b83c994dbda76c9db /modules/http2/h2_session.c
parent48eddd3d39fa2668ee29198ebfb33c41d4738c21 (diff)
downloadapache2-upstream.tar.gz
Imported Upstream version 2.4.23upstream
Diffstat (limited to 'modules/http2/h2_session.c')
-rw-r--r--modules/http2/h2_session.c899
1 files changed, 452 insertions, 447 deletions
diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c
index 928bb4a6..598df177 100644
--- a/modules/http2/h2_session.c
+++ b/modules/http2/h2_session.c
@@ -28,6 +28,7 @@
#include <scoreboard.h>
#include "h2_private.h"
+#include "h2.h"
#include "h2_bucket_eoc.h"
#include "h2_bucket_eos.h"
#include "h2_config.h"
@@ -56,7 +57,7 @@ static int h2_session_status_from_apr_status(apr_status_t rv)
return NGHTTP2_ERR_WOULDBLOCK;
}
else if (APR_STATUS_IS_EOF(rv)) {
- return NGHTTP2_ERR_EOF;
+ return NGHTTP2_ERR_EOF;
}
return NGHTTP2_ERR_PROTO;
}
@@ -78,6 +79,18 @@ static int is_accepting_streams(h2_session *session);
static void dispatch_event(h2_session *session, h2_session_event_t ev,
int err, const char *msg);
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
+{
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): EOS bucket cleanup -> done",
+ session->id, stream->id);
+ h2_ihash_remove(session->streams, stream->id);
+ h2_mplx_stream_done(session->mplx, stream);
+
+ dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
+ return APR_SUCCESS;
+}
+
typedef struct stream_sel_ctx {
h2_session *session;
h2_stream *candidate;
@@ -112,7 +125,7 @@ static void cleanup_streams(h2_session *session)
while (1) {
h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
if (ctx.candidate) {
- h2_session_stream_destroy(session, ctx.candidate);
+ h2_session_stream_done(session, ctx.candidate);
ctx.candidate = NULL;
}
else {
@@ -121,23 +134,20 @@ static void cleanup_streams(h2_session *session)
}
}
-h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
+h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+ int initiated_on, const h2_request *req)
{
h2_stream * stream;
apr_pool_t *stream_pool;
- if (session->spare) {
- stream_pool = session->spare;
- session->spare = NULL;
- }
- else {
- apr_pool_create(&stream_pool, session->pool);
- apr_pool_tag(stream_pool, "h2_stream");
- }
-
- stream = h2_stream_open(stream_id, stream_pool, session);
+ apr_pool_create(&stream_pool, session->pool);
+ apr_pool_tag(stream_pool, "h2_stream");
+ stream = h2_stream_open(stream_id, stream_pool, session,
+ initiated_on, req);
+ nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
h2_ihash_add(session->streams, stream);
+
if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
if (stream_id > session->remote.emitted_max) {
++session->remote.emitted_count;
@@ -151,6 +161,7 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
session->remote.emitted_max = stream->id;
}
}
+ dispatch_event(session, H2_SESSION_EV_STREAM_OPEN, 0, NULL);
return stream;
}
@@ -252,13 +263,18 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2,
h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03063)
- "h2_session(%ld): recv unknown FRAME[%s], frames=%ld/%ld (r/s)",
+ "h2_session(%ld): recv invalid FRAME[%s], frames=%ld/%ld (r/s)",
session->id, buffer, (long)session->frames_received,
(long)session->frames_sent);
}
return 0;
}
+static h2_stream *get_stream(h2_session *session, int stream_id)
+{
+ return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+}
+
static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
int32_t stream_id,
const uint8_t *data, size_t len, void *userp)
@@ -274,7 +290,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
return 0;
}
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
"h2_stream(%ld-%d): on_data_chunk for unknown stream",
@@ -309,8 +325,12 @@ static apr_status_t stream_release(h2_session *session,
h2_stream *stream,
uint32_t error_code)
{
+ conn_rec *c = session->c;
+ apr_bucket *b;
+ apr_status_t status;
+
if (!error_code) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): handled, closing",
session->id, (int)stream->id);
if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
@@ -320,16 +340,18 @@ static apr_status_t stream_release(h2_session *session,
}
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03065)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
"h2_stream(%ld-%d): closing with err=%d %s",
session->id, (int)stream->id, (int)error_code,
h2_h2_err_description(error_code));
h2_stream_rst(stream, error_code);
}
- return h2_conn_io_writeb(&session->io,
- h2_bucket_eos_create(session->c->bucket_alloc,
- stream));
+ b = h2_bucket_eos_create(c->bucket_alloc, stream);
+ APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+ status = h2_conn_io_pass(&session->io, session->bbtmp);
+ apr_brigade_cleanup(session->bbtmp);
+ return status;
}
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -339,7 +361,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
h2_stream *stream;
(void)ngh2;
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (stream) {
stream_release(session, stream, error_code);
}
@@ -355,12 +377,12 @@ static int on_begin_headers_cb(nghttp2_session *ngh2,
/* We may see HEADERs at the start of a stream or after all DATA
* streams to carry trailers. */
(void)ngh2;
- s = h2_session_get_stream(session, frame->hd.stream_id);
+ s = get_stream(session, frame->hd.stream_id);
if (s) {
/* nop */
}
else {
- s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id);
+ s = h2_session_open_stream(userp, frame->hd.stream_id, 0, NULL);
}
return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
}
@@ -375,26 +397,24 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
h2_stream * stream;
apr_status_t status;
- (void)ngh2;
(void)flags;
if (!is_accepting_streams(session)) {
/* just ignore */
return 0;
}
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02920)
- "h2_session: stream(%ld-%d): on_header for unknown stream",
+ "h2_session: stream(%ld-%d): on_header unknown stream",
session->id, (int)frame->hd.stream_id);
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
status = h2_stream_add_header(stream, (const char *)name, namelen,
(const char *)value, valuelen);
-
- if (status != APR_SUCCESS) {
+ if (status != APR_SUCCESS && !stream->response) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
return 0;
@@ -429,7 +449,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
/* This can be HEADERS for a new stream, defining the request,
* or HEADER may come after DATA at the end of a stream as in
* trailers */
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
@@ -453,7 +473,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
}
break;
case NGHTTP2_DATA:
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
@@ -490,7 +510,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
"h2_session(%ld-%d): RST_STREAM by client, errror=%d",
session->id, (int)frame->hd.stream_id,
(int)frame->rst_stream.error_code);
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream && stream->request && stream->request->initiated_on) {
++session->pushes_reset;
}
@@ -533,13 +553,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
return 0;
}
-static apr_status_t pass_data(void *ctx,
- const char *data, apr_off_t length)
-{
- return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
-}
-
-
static char immortal_zeros[H2_MAX_PADLEN];
static int on_send_data_cb(nghttp2_session *ngh2,
@@ -556,6 +569,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
int eos;
h2_stream *stream;
apr_bucket *b;
+ apr_off_t len = length;
(void)ngh2;
(void)source;
@@ -564,65 +578,57 @@ static int on_send_data_cb(nghttp2_session *ngh2,
}
padlen = (unsigned char)frame->data.padlen;
- stream = h2_session_get_stream(session, stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): send_data_cb for %ld bytes",
+ session->id, (int)stream_id, (long)length);
+
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
APLOGNO(02924)
- "h2_stream(%ld-%d): send_data",
+ "h2_stream(%ld-%d): send_data, lookup stream",
session->id, (int)stream_id);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): send_data_cb for %ld bytes",
- session->id, (int)stream_id, (long)length);
-
- if (h2_conn_io_is_buffered(&session->io)) {
- status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
- if (status == APR_SUCCESS) {
- if (padlen) {
- status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
- }
-
- if (status == APR_SUCCESS) {
- apr_off_t len = length;
- status = h2_stream_readx(stream, pass_data, session, &len, &eos);
- if (status == APR_SUCCESS && len != length) {
- status = APR_EINVAL;
- }
- }
-
- if (status == APR_SUCCESS && padlen) {
- if (padlen) {
- status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
- }
- }
- }
+ status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+ if (padlen && status == APR_SUCCESS) {
+ status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
}
- else {
- status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
- if (padlen && status == APR_SUCCESS) {
- status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
- }
- if (status == APR_SUCCESS) {
- apr_off_t len = length;
- status = h2_stream_read_to(stream, session->io.output, &len, &eos);
- if (status == APR_SUCCESS && len != length) {
- status = APR_EINVAL;
- }
- }
-
- if (status == APR_SUCCESS && padlen) {
- b = apr_bucket_immortal_create(immortal_zeros, padlen,
- session->c->bucket_alloc);
- status = h2_conn_io_writeb(&session->io, b);
- }
+
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ "h2_stream(%ld-%d): writing frame header",
+ session->id, (int)stream_id);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+
+ status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ "h2_stream(%ld-%d): send_data_cb, reading stream",
+ session->id, (int)stream_id);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ else if (len != length) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ "h2_stream(%ld-%d): send_data_cb, wanted %ld bytes, "
+ "got %ld from stream",
+ session->id, (int)stream_id, (long)length, (long)len);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
}
+ if (padlen) {
+ b = apr_bucket_immortal_create(immortal_zeros, padlen,
+ session->c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+ }
+ status = h2_conn_io_pass(&session->io, session->bbtmp);
+
+ apr_brigade_cleanup(session->bbtmp);
if (status == APR_SUCCESS) {
stream->data_frames_sent++;
- h2_conn_io_consider_pass(&session->io);
return 0;
}
else {
@@ -630,9 +636,8 @@ static int on_send_data_cb(nghttp2_session *ngh2,
APLOGNO(02925)
"h2_stream(%ld-%d): failed send_data_cb",
session->id, (int)stream_id);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
}
-
- return h2_session_status_from_apr_status(status);
}
static int on_frame_send_cb(nghttp2_session *ngh2,
@@ -679,43 +684,30 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb)
return APR_SUCCESS;
}
-static void h2_session_cleanup(h2_session *session)
+static void h2_session_destroy(h2_session *session)
{
- AP_DEBUG_ASSERT(session);
- /* This is an early cleanup of the session that may
- * discard what is no longer necessary for *new* streams
- * and general HTTP/2 processing.
- * At this point, all frames are in transit or somehwere in
- * our buffers or passed down output filters.
- * h2 streams might still being written out.
- */
- if (session->c) {
- h2_ctx_clear(session->c);
+ AP_DEBUG_ASSERT(session);
+
+ h2_ihash_clear(session->streams);
+ if (session->mplx) {
+ h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+ h2_mplx_release_and_join(session->mplx, session->iowait);
+ session->mplx = NULL;
}
+
+ ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
+ session->c->input_filters), "H2_IN");
if (session->ngh2) {
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
}
- if (session->spare) {
- apr_pool_destroy(session->spare);
- session->spare = NULL;
+ if (session->c) {
+ h2_ctx_clear(session->c);
}
-}
-
-static void h2_session_destroy(h2_session *session)
-{
- AP_DEBUG_ASSERT(session);
- h2_session_cleanup(session);
if (APLOGctrace1(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- "h2_session(%ld): destroy, %d streams open",
- session->id, (int)h2_ihash_count(session->streams));
- }
- if (session->mplx) {
- h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
- h2_mplx_release_and_join(session->mplx, session->iowait);
- session->mplx = NULL;
+ "h2_session(%ld): destroy", session->id);
}
if (session->pool) {
apr_pool_destroy(session->pool);
@@ -897,7 +889,7 @@ static h2_session *h2_session_create_int(conn_rec *c,
h2_session_receive, session);
ap_add_input_filter("H2_IN", session->cin, r, c);
- h2_conn_io_init(&session->io, c, session->config, session->pool);
+ h2_conn_io_init(&session->io, c, session->config);
session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
status = init_callbacks(c, &callbacks);
@@ -1030,7 +1022,7 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
}
/* Now we need to auto-open stream 1 for the request we got. */
- stream = h2_session_open_stream(session, 1);
+ stream = h2_session_open_stream(session, 1, 0, NULL);
if (!stream) {
status = APR_EGENERAL;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
@@ -1095,65 +1087,6 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
return status;
}
-typedef struct {
- h2_session *session;
- int resume_count;
-} resume_ctx;
-
-static int resume_on_data(void *ctx, void *val)
-{
- h2_stream *stream = val;
- resume_ctx *rctx = (resume_ctx*)ctx;
- h2_session *session = rctx->session;
- AP_DEBUG_ASSERT(session);
- AP_DEBUG_ASSERT(stream);
-
- if (h2_stream_is_suspended(stream)) {
- apr_status_t status;
- apr_off_t len = -1;
- int eos;
-
- status = h2_stream_out_prepare(stream, &len, &eos);
- if (status == APR_SUCCESS) {
- int rv;
- h2_stream_set_suspended(stream, 0);
- ++rctx->resume_count;
-
- rv = nghttp2_session_resume_data(session->ngh2, stream->id);
- ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
- APLOG_ERR : APLOG_DEBUG, 0, session->c,
- APLOGNO(02936)
- "h2_stream(%ld-%d): resuming %s, len=%ld, eos=%d",
- session->id, stream->id,
- rv? nghttp2_strerror(rv) : "", (long)len, eos);
- }
- }
- return 1;
-}
-
-static int h2_session_resume_streams_with_data(h2_session *session)
-{
- AP_DEBUG_ASSERT(session);
- if (!h2_ihash_is_empty(session->streams)
- && session->mplx && !session->mplx->aborted) {
- resume_ctx ctx;
-
- ctx.session = session;
- ctx.resume_count = 0;
-
- /* Resume all streams where we have data in the out queue and
- * which had been suspended before. */
- h2_ihash_iter(session->streams, resume_on_data, &ctx);
- return ctx.resume_count;
- }
- return 0;
-}
-
-h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
-{
- return h2_ihash_get(session->streams, stream_id);
-}
-
static ssize_t stream_data_cb(nghttp2_session *ng2s,
int32_t stream_id,
uint8_t *buf,
@@ -1179,7 +1112,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
(void)ng2s;
(void)buf;
(void)source;
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02937)
@@ -1209,7 +1142,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
* it. Remember at our h2_stream that we need to do this.
*/
nread = 0;
- h2_stream_set_suspended(stream, 1);
+ h2_mplx_suspend_stream(session->mplx, stream->id);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
"h2_stream(%ld-%d): suspending",
session->id, (int)stream_id);
@@ -1252,102 +1185,6 @@ typedef struct {
size_t offset;
} nvctx_t;
-/**
- * Start submitting the response to a stream request. This is possible
- * once we have all the response headers. The response body will be
- * read by the session using the callback we supply.
- */
-static apr_status_t submit_response(h2_session *session, h2_stream *stream)
-{
- apr_status_t status = APR_SUCCESS;
- h2_response *response = h2_stream_get_response(stream);
- int rv = 0;
- AP_DEBUG_ASSERT(session);
- AP_DEBUG_ASSERT(stream);
- AP_DEBUG_ASSERT(response || stream->rst_error);
-
- if (stream->submitted) {
- rv = NGHTTP2_PROTOCOL_ERROR;
- }
- else if (response && response->headers) {
- nghttp2_data_provider provider, *pprovider = NULL;
- h2_ngheader *ngh;
- const h2_priority *prio;
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
- "h2_stream(%ld-%d): submit response %d",
- session->id, stream->id, response->http_status);
-
- if (response->content_length != 0) {
- memset(&provider, 0, sizeof(provider));
- provider.source.fd = stream->id;
- provider.read_callback = stream_data_cb;
- pprovider = &provider;
- }
-
- /* If this stream is not a pushed one itself,
- * and HTTP/2 server push is enabled here,
- * and the response is in the range 200-299 *),
- * and the remote side has pushing enabled,
- * -> find and perform any pushes on this stream
- * *before* we submit the stream response itself.
- * This helps clients avoid opening new streams on Link
- * headers that get pushed right afterwards.
- *
- * *) the response code is relevant, as we do not want to
- * make pushes on 401 or 403 codes, neiterh on 301/302
- * and friends. And if we see a 304, we do not push either
- * as the client, having this resource in its cache, might
- * also have the pushed ones as well.
- */
- if (stream->request && !stream->request->initiated_on
- && H2_HTTP_2XX(response->http_status)
- && h2_session_push_enabled(session)) {
-
- h2_stream_submit_pushes(stream);
- }
-
- prio = h2_stream_get_priority(stream);
- if (prio) {
- h2_session_set_prio(session, stream, prio);
- /* no showstopper if that fails for some reason */
- }
-
- ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
- response->headers);
- rv = nghttp2_submit_response(session->ngh2, response->stream_id,
- ngh->nv, ngh->nvlen, pprovider);
- }
- else {
- int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
- "h2_stream(%ld-%d): RST_STREAM, err=%d",
- session->id, stream->id, err);
-
- rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
- stream->id, err);
- }
-
- stream->submitted = 1;
- if (stream->request && stream->request->initiated_on) {
- ++session->pushes_submitted;
- }
- else {
- ++session->responses_submitted;
- }
-
- if (nghttp2_is_fatal(rv)) {
- status = APR_EGENERAL;
- dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
- APLOGNO(02940) "submit_response: %s",
- nghttp2_strerror(rv));
- }
-
- return status;
-}
-
struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
h2_push *push)
{
@@ -1372,15 +1209,13 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
session->id, is->id, nid,
push->req->method, push->req->path, is->id);
- stream = h2_session_open_stream(session, nid);
+ stream = h2_session_open_stream(session, nid, is->id, push->req);
if (stream) {
- h2_stream_set_h2_request(stream, is->id, push->req);
status = stream_schedule(session, stream, 1);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): scheduling push stream",
session->id, stream->id);
- h2_stream_cleanup(stream);
stream = NULL;
}
++session->unsent_promises;
@@ -1503,34 +1338,6 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
return status;
}
-apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
-{
- apr_pool_t *pool = h2_stream_detach_pool(stream);
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): cleanup by EOS bucket destroy",
- session->id, stream->id);
- /* this may be called while the session has already freed
- * some internal structures or even when the mplx is locked. */
- if (session->mplx) {
- h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
- }
-
- if (session->streams) {
- h2_ihash_remove(session->streams, stream->id);
- }
- h2_stream_destroy(stream);
-
- if (pool) {
- apr_pool_clear(pool);
- if (session->spare) {
- apr_pool_destroy(session->spare);
- }
- session->spare = pool;
- }
- return APR_SUCCESS;
-}
-
int h2_session_push_enabled(h2_session *session)
{
/* iff we can and they can and want */
@@ -1557,6 +1364,7 @@ static apr_status_t h2_session_send(h2_session *session)
if (socket) {
apr_socket_timeout_set(socket, saved_timeout);
}
+ session->have_written = 1;
if (rv != 0) {
if (nghttp2_is_fatal(rv)) {
dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
@@ -1570,6 +1378,148 @@ static apr_status_t h2_session_send(h2_session *session)
return APR_SUCCESS;
}
+/**
+ * A stream was resumed as new output data arrived.
+ */
+static apr_status_t on_stream_resume(void *ctx, int stream_id)
+{
+ h2_session *session = ctx;
+ h2_stream *stream = get_stream(session, stream_id);
+ apr_status_t status = APR_SUCCESS;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): on_resume", session->id, stream_id);
+ if (stream) {
+ int rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+ session->have_written = 1;
+ ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
+ APLOG_ERR : APLOG_DEBUG, 0, session->c,
+ APLOGNO(02936)
+ "h2_stream(%ld-%d): resuming %s",
+ session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+ }
+ return status;
+}
+
+/**
+ * A response for the stream is ready.
+ */
+static apr_status_t on_stream_response(void *ctx, int stream_id)
+{
+ h2_session *session = ctx;
+ h2_stream *stream = get_stream(session, stream_id);
+ apr_status_t status = APR_SUCCESS;
+ h2_response *response;
+ int rv = 0;
+
+ AP_DEBUG_ASSERT(session);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): on_response", session->id, stream_id);
+ if (!stream) {
+ return APR_NOTFOUND;
+ }
+
+ response = h2_stream_get_response(stream);
+ AP_DEBUG_ASSERT(response || stream->rst_error);
+
+ if (stream->submitted) {
+ rv = NGHTTP2_PROTOCOL_ERROR;
+ }
+ else if (response && response->headers) {
+ nghttp2_data_provider provider, *pprovider = NULL;
+ h2_ngheader *ngh;
+ const h2_priority *prio;
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
+ "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+ session->id, stream->id, response->http_status,
+ (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
+
+ if (response->content_length != 0) {
+ memset(&provider, 0, sizeof(provider));
+ provider.source.fd = stream->id;
+ provider.read_callback = stream_data_cb;
+ pprovider = &provider;
+ }
+
+ /* If this stream is not a pushed one itself,
+ * and HTTP/2 server push is enabled here,
+ * and the response is in the range 200-299 *),
+ * and the remote side has pushing enabled,
+ * -> find and perform any pushes on this stream
+ * *before* we submit the stream response itself.
+ * This helps clients avoid opening new streams on Link
+ * headers that get pushed right afterwards.
+ *
+ * *) the response code is relevant, as we do not want to
+ * make pushes on 401 or 403 codes, neiterh on 301/302
+ * and friends. And if we see a 304, we do not push either
+ * as the client, having this resource in its cache, might
+ * also have the pushed ones as well.
+ */
+ if (stream->request && !stream->request->initiated_on
+ && H2_HTTP_2XX(response->http_status)
+ && h2_session_push_enabled(session)) {
+
+ h2_stream_submit_pushes(stream);
+ }
+
+ prio = h2_stream_get_priority(stream);
+ if (prio) {
+ h2_session_set_prio(session, stream, prio);
+ /* no showstopper if that fails for some reason */
+ }
+
+ ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
+ response->headers);
+ rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+ ngh->nv, ngh->nvlen, pprovider);
+ }
+ else {
+ int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
+ "h2_stream(%ld-%d): RST_STREAM, err=%d",
+ session->id, stream->id, err);
+
+ rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+ stream->id, err);
+ }
+
+ stream->submitted = 1;
+ session->have_written = 1;
+
+ if (stream->request && stream->request->initiated_on) {
+ ++session->pushes_submitted;
+ }
+ else {
+ ++session->responses_submitted;
+ }
+
+ if (nghttp2_is_fatal(rv)) {
+ status = APR_EGENERAL;
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+ APLOGNO(02940) "submit_response: %s",
+ nghttp2_strerror(rv));
+ }
+
+ ++session->unsent_submits;
+
+ /* Unsent push promises are written immediately, as nghttp2
+ * 1.5.0 realizes internal stream data structures only on
+ * send and we might need them for other submits.
+ * Also, to conserve memory, we send at least every 10 submits
+ * so that nghttp2 does not buffer all outbound items too
+ * long.
+ */
+ if (status == APR_SUCCESS
+ && (session->unsent_promises || session->unsent_submits > 10)) {
+ status = h2_session_send(session);
+ }
+ return status;
+}
+
static apr_status_t h2_session_receive(void *ctx, const char *data,
apr_size_t len, apr_size_t *readlen)
{
@@ -1697,36 +1647,6 @@ static int has_suspended_streams(h2_session *session)
return has_suspended;
}
-static apr_status_t h2_session_submit(h2_session *session)
-{
- apr_status_t status = APR_EAGAIN;
- h2_stream *stream;
-
- if (has_unsubmitted_streams(session)) {
- /* If we have responses ready, submit them now. */
- while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
- status = submit_response(session, stream);
- ++session->unsent_submits;
-
- /* Unsent push promises are written immediately, as nghttp2
- * 1.5.0 realizes internal stream data structures only on
- * send and we might need them for other submits.
- * Also, to conserve memory, we send at least every 10 submits
- * so that nghttp2 does not buffer all outbound items too
- * long.
- */
- if (status == APR_SUCCESS
- && (session->unsent_promises || session->unsent_submits > 10)) {
- status = h2_session_send(session);
- if (status != APR_SUCCESS) {
- break;
- }
- }
- }
- }
- return status;
-}
-
static const char *StateNames[] = {
"INIT", /* H2_SESSION_ST_INIT */
"DONE", /* H2_SESSION_ST_DONE */
@@ -1757,12 +1677,51 @@ static int is_accepting_streams(h2_session *session)
}
}
+static void update_child_status(h2_session *session, int status, const char *msg)
+{
+ /* Assume that we also change code/msg when something really happened and
+ * avoid updating the scoreboard in between */
+ if (session->last_status_code != status
+ || session->last_status_msg != msg) {
+ apr_snprintf(session->status, sizeof(session->status),
+ "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
+ msg? msg : "-",
+ (int)session->open_streams,
+ (int)session->remote.emitted_count,
+ (int)session->responses_submitted,
+ (int)session->pushes_submitted,
+ (int)session->pushes_reset + session->streams_reset);
+ ap_update_child_status_descr(session->c->sbh, status, session->status);
+ }
+}
+
static void transit(h2_session *session, const char *action, h2_session_state nstate)
{
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03078)
- "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
- state_name(session->state), action, state_name(nstate));
- session->state = nstate;
+ if (session->state != nstate) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03078)
+ "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
+ state_name(session->state), action, state_name(nstate));
+ session->state = nstate;
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ update_child_status(session, (session->open_streams == 0?
+ SERVER_BUSY_KEEPALIVE
+ : SERVER_BUSY_READ), "idle");
+ break;
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
+ update_child_status(session, SERVER_CLOSING, "remote goaway");
+ break;
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ update_child_status(session, SERVER_CLOSING, "local goaway");
+ break;
+ case H2_SESSION_ST_DONE:
+ update_child_status(session, SERVER_CLOSING, "done");
+ break;
+ default:
+ /* nop */
+ break;
+ }
+ }
}
static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
@@ -1771,7 +1730,6 @@ static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
case H2_SESSION_ST_INIT:
transit(session, "init", H2_SESSION_ST_BUSY);
break;
-
default:
/* nop */
break;
@@ -1827,7 +1785,7 @@ static void h2_session_ev_conn_error(h2_session *session, int arg, const char *m
break;
default:
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03401)
"h2_session(%ld): conn error -> shutdown", session->id);
h2_session_shutdown(session, arg, msg, 0);
break;
@@ -1844,7 +1802,7 @@ static void h2_session_ev_proto_error(h2_session *session, int arg, const char *
break;
default:
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03402)
"h2_session(%ld): proto error -> shutdown", session->id);
h2_session_shutdown(session, arg, msg, 0);
break;
@@ -1870,47 +1828,52 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
case H2_SESSION_ST_BUSY:
case H2_SESSION_ST_LOCAL_SHUTDOWN:
case H2_SESSION_ST_REMOTE_SHUTDOWN:
- /* nothing for input and output to do. If we remain
- * in this state, we go into a tight loop and suck up
- * CPU cycles. Ideally, we'd like to do a blocking read, but that
- * is not possible if we have scheduled tasks and wait
- * for them to produce something. */
- if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
- }
- if (h2_ihash_is_empty(session->streams)) {
- if (!is_accepting_streams(session)) {
- /* We are no longer accepting new streams and have
- * finished processing existing ones. Time to leave. */
- h2_session_shutdown(session, arg, msg, 0);
- transit(session, "no io", H2_SESSION_ST_DONE);
+ /* Nothing to READ, nothing to WRITE on the master connection.
+ * Possible causes:
+ * - we wait for the client to send us sth
+ * - we wait for started tasks to produce output
+ * - we have finished all streams and the client has sent GO_AWAY
+ */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_session(%ld): NO_IO event, %d streams open",
+ session->id, session->open_streams);
+ if (session->open_streams > 0) {
+ if (has_unsubmitted_streams(session)
+ || has_suspended_streams(session)) {
+ /* waiting for at least one stream to produce data */
+ transit(session, "no io", H2_SESSION_ST_WAIT);
}
else {
- apr_time_t now = apr_time_now();
- /* When we have no streams, no task event are possible,
- * switch to blocking reads */
- transit(session, "no io", H2_SESSION_ST_IDLE);
- session->idle_until = (session->remote.emitted_count?
- session->s->keep_alive_timeout :
- session->s->timeout) + now;
- session->keep_sync_until = now + apr_time_from_sec(1);
+ /* we have streams open, and all are submitted and none
+ * is suspended. The only thing keeping us from WRITEing
+ * more must be the flow control.
+ * This means we only wait for WINDOW_UPDATE from the
+ * client and can block on READ. */
+ transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
+ session->idle_until = apr_time_now() + session->s->timeout;
+ session->keep_sync_until = session->idle_until;
+ /* Make sure we have flushed all previously written output
+ * so that the client will react. */
+ if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ return;
+ }
}
}
- else if (!has_unsubmitted_streams(session)
- && !has_suspended_streams(session)) {
- /* none of our streams is waiting for a response or
- * new output data from task processing,
- * switch to blocking reads. We are probably waiting on
- * window updates. */
- transit(session, "no io", H2_SESSION_ST_IDLE);
- session->idle_until = apr_time_now() + session->s->timeout;
- session->keep_sync_until = session->idle_until;
+ else if (is_accepting_streams(session)) {
+ /* When we have no streams, but accept new, switch to idle */
+ apr_time_t now = apr_time_now();
+ transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
+ session->idle_until = (session->remote.emitted_count?
+ session->s->keep_alive_timeout :
+ session->s->timeout) + now;
+ session->keep_sync_until = now + apr_time_from_sec(1);
}
else {
- /* Unable to do blocking reads, as we wait on events from
- * task processing in other threads. Do a busy wait with
- * backoff timer. */
- transit(session, "no io", H2_SESSION_ST_WAIT);
+ /* We are no longer accepting new streams and there are
+ * none left. Time to leave. */
+ h2_session_shutdown(session, arg, msg, 0);
+ transit(session, "no io", H2_SESSION_ST_DONE);
}
break;
default:
@@ -1938,7 +1901,6 @@ static void h2_session_ev_data_read(h2_session *session, int arg, const char *ms
case H2_SESSION_ST_WAIT:
transit(session, "data read", H2_SESSION_ST_BUSY);
break;
- /* fall through */
default:
/* nop */
break;
@@ -1983,6 +1945,37 @@ static void h2_session_ev_pre_close(h2_session *session, int arg, const char *ms
}
}
+static void h2_session_ev_stream_open(h2_session *session, int arg, const char *msg)
+{
+ ++session->open_streams;
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ if (session->open_streams == 1) {
+ /* enter tiomeout, since we have a stream again */
+ session->idle_until = (session->s->timeout + apr_time_now());
+ }
+ break;
+ default:
+ break;
+ }
+}
+
+static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
+{
+ --session->open_streams;
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ if (session->open_streams == 0) {
+ /* enter keepalive timeout, since we no longer have streams */
+ session->idle_until = (session->s->keep_alive_timeout
+ + apr_time_now());
+ }
+ break;
+ default:
+ break;
+ }
+}
+
static void dispatch_event(h2_session *session, h2_session_event_t ev,
int arg, const char *msg)
{
@@ -2023,6 +2016,12 @@ static void dispatch_event(h2_session *session, h2_session_event_t ev,
case H2_SESSION_EV_PRE_CLOSE:
h2_session_ev_pre_close(session, arg, msg);
break;
+ case H2_SESSION_EV_STREAM_OPEN:
+ h2_session_ev_stream_open(session, arg, msg);
+ break;
+ case H2_SESSION_EV_STREAM_DONE:
+ h2_session_ev_stream_done(session, arg, msg);
+ break;
default:
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld): unknown event %d",
@@ -2037,39 +2036,25 @@ static void dispatch_event(h2_session *session, h2_session_event_t ev,
static const int MAX_WAIT_MICROS = 200 * 1000;
-static void update_child_status(h2_session *session, int status, const char *msg)
-{
- /* Assume that we also change code/msg when something really happened and
- * avoid updating the scoreboard in between */
- if (session->last_status_code != status
- || session->last_status_msg != msg) {
- apr_snprintf(session->status, sizeof(session->status),
- "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
- msg? msg : "-",
- (int)h2_ihash_count(session->streams),
- (int)session->remote.emitted_count,
- (int)session->responses_submitted,
- (int)session->pushes_submitted,
- (int)session->pushes_reset + session->streams_reset);
- ap_update_child_status_descr(session->c->sbh, status, session->status);
- }
-}
-
apr_status_t h2_session_process(h2_session *session, int async)
{
apr_status_t status = APR_SUCCESS;
conn_rec *c = session->c;
- int rv, have_written, have_read, mpm_state, no_streams;
+ int rv, mpm_state, trace = APLOGctrace3(c);
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): process start, async=%d", session->id, async);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): process start, async=%d",
+ session->id, async);
+ }
if (c->cs) {
c->cs->state = CONN_STATE_WRITE_COMPLETION;
}
while (1) {
- have_read = have_written = 0;
+ trace = APLOGctrace3(c);
+ session->have_read = session->have_written = 0;
if (!ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
if (mpm_state == AP_MPMQ_STOPPING) {
@@ -2102,14 +2087,15 @@ apr_status_t h2_session_process(h2_session *session, int async)
break;
case H2_SESSION_ST_IDLE:
- no_streams = h2_ihash_is_empty(session->streams);
- update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE
- : SERVER_BUSY_READ), "idle");
- /* make certain, the client receives everything before we idle */
- if (!session->keep_sync_until
- && async && no_streams && !session->r && session->remote.emitted_count) {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): async idle, nonblock read", session->id);
+ /* make certain, we send everything before we idle */
+ if (!session->keep_sync_until && async && !session->open_streams
+ && !session->r && session->remote.emitted_count) {
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): async idle, nonblock read, "
+ "%d streams open", session->id,
+ session->open_streams);
+ }
/* We do not return to the async mpm immediately, since under
* load, mpms show the tendency to throw keep_alive connections
* away very rapidly.
@@ -2122,7 +2108,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
- have_read = 1;
+ session->have_read = 1;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status)) {
@@ -2136,12 +2122,19 @@ apr_status_t h2_session_process(h2_session *session, int async)
}
else {
ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, c,
+ APLOGNO(03403)
"h2_session(%ld): idle, no data, error",
session->id);
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "timeout");
}
}
else {
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): sync idle, stutter 1-sec, "
+ "%d streams open", session->id,
+ session->open_streams);
+ }
/* We wait in smaller increments, using a 1 second timeout.
* That gives us the chance to check for MPMQ_STOPPING often.
*/
@@ -2153,7 +2146,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
h2_filter_cin_timeout_set(session->cin, apr_time_from_sec(1));
status = h2_session_read(session, 1);
if (status == APR_SUCCESS) {
- have_read = 1;
+ session->have_read = 1;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (status == APR_EAGAIN) {
@@ -2167,11 +2160,26 @@ apr_status_t h2_session_process(h2_session *session, int async)
session->keep_sync_until = 0;
}
if (now > session->idle_until) {
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): keepalive timeout",
+ session->id);
+ }
dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
}
+ else if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): keepalive, %f sec left",
+ session->id, (session->idle_until - now) / 1000000.0f);
+ }
/* continue reading handling */
}
else {
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): idle(1 sec timeout) "
+ "read failed", session->id);
+ }
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
}
}
@@ -2186,8 +2194,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
h2_filter_cin_timeout_set(session->cin, session->s->timeout);
status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
- have_read = 1;
- update_child_status(session, SERVER_BUSY_READ, "busy");
+ session->have_read = 1;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (status == APR_EAGAIN) {
@@ -2202,42 +2209,32 @@ apr_status_t h2_session_process(h2_session *session, int async)
}
}
- if (!h2_ihash_is_empty(session->streams)) {
- /* resume any streams for which data is available again */
- h2_session_resume_streams_with_data(session);
- /* Submit any responses/push_promises that are ready */
- status = h2_session_submit(session);
- if (status == APR_SUCCESS) {
- have_written = 1;
- }
- else if (status != APR_EAGAIN) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR, "submit error");
- break;
- }
- /* send out window updates for our inputs */
- status = h2_mplx_in_update_windows(session->mplx);
- if (status != APR_SUCCESS && status != APR_EAGAIN) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR, "window update error");
- break;
- }
+ /* trigger window updates, stream resumes and submits */
+ status = h2_mplx_dispatch_master_events(session->mplx,
+ on_stream_resume,
+ on_stream_response,
+ session);
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): dispatch error",
+ session->id);
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
+ H2_ERR_INTERNAL_ERROR,
+ "dispatch error");
+ break;
}
if (nghttp2_session_want_write(session->ngh2)) {
ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
status = h2_session_send(session);
- if (status == APR_SUCCESS) {
- have_written = 1;
- }
- else {
+ if (status != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR, "writing");
+ H2_ERR_INTERNAL_ERROR, "writing");
break;
}
}
- if (have_read || have_written) {
+ if (session->have_read || session->have_written) {
if (session->wait_us) {
session->wait_us = 0;
}
@@ -2253,13 +2250,15 @@ apr_status_t h2_session_process(h2_session *session, int async)
session->start_wait = apr_time_now();
if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ break;
}
- update_child_status(session, SERVER_BUSY_READ, "wait");
}
else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
/* waited long enough */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_TIMEUP, c,
- "h2_session: wait for data");
+ if (trace) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, APR_TIMEUP, c,
+ "h2_session: wait for data");
+ }
dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
break;
}
@@ -2268,8 +2267,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
}
- if (APLOGctrace1(c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ if (trace) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c,
"h2_session: wait for data, %ld micros",
(long)session->wait_us);
}
@@ -2279,13 +2278,18 @@ apr_status_t h2_session_process(h2_session *session, int async)
session->wait_us = 0;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
- else if (status == APR_TIMEUP) {
+ else if (APR_STATUS_IS_TIMEUP(status)) {
/* go back to checking all inputs again */
transit(session, "wait cycle", session->local.accepting?
H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN);
}
+ else if (APR_STATUS_IS_ECONNRESET(status)
+ || APR_STATUS_IS_ECONNABORTED(status)) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
+ APLOGNO(03404)
"h2_session(%ld): waiting on conditional",
session->id);
h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR,
@@ -2294,7 +2298,6 @@ apr_status_t h2_session_process(h2_session *session, int async)
break;
case H2_SESSION_ST_DONE:
- update_child_status(session, SERVER_CLOSING, "done");
status = APR_EOF;
goto out;
@@ -2317,10 +2320,12 @@ apr_status_t h2_session_process(h2_session *session, int async)
}
out:
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): [%s] process returns",
- session->id, state_name(session->state));
-
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): [%s] process returns",
+ session->id, state_name(session->state));
+ }
+
if ((session->state != H2_SESSION_ST_DONE)
&& (APR_STATUS_IS_EOF(status)
|| APR_STATUS_IS_ECONNRESET(status)