summaryrefslogtreecommitdiff
path: root/modules/http2/h2_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2/h2_stream.c')
-rw-r--r--modules/http2/h2_stream.c507
1 files changed, 286 insertions, 221 deletions
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index 0a1dadf9..a7a67641 100644
--- a/modules/http2/h2_stream.c
+++ b/modules/http2/h2_stream.c
@@ -24,6 +24,8 @@
#include <nghttp2/nghttp2.h>
#include "h2_private.h"
+#include "h2.h"
+#include "h2_bucket_beam.h"
#include "h2_conn.h"
#include "h2_config.h"
#include "h2_h2.h"
@@ -36,7 +38,6 @@
#include "h2_stream.h"
#include "h2_task.h"
#include "h2_ctx.h"
-#include "h2_task_input.h"
#include "h2_task.h"
#include "h2_util.h"
@@ -52,6 +53,20 @@ static int state_transition[][7] = {
/*CL*/{ 1, 1, 0, 0, 1, 1, 1 },
};
+static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, char *tag)
+{
+ if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
+ conn_rec *c = s->session->c;
+ char buffer[4 * 1024];
+ const char *line = "(null)";
+ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
+
+ len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer);
+ ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%ld-%d): %s",
+ c->id, s->id, len? buffer : line);
+ }
+}
+
static int set_state(h2_stream *stream, h2_stream_state_t state)
{
int allowed = state_transition[state][stream->state];
@@ -135,37 +150,97 @@ static int output_open(h2_stream *stream)
}
}
-static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
+static apr_status_t stream_pool_cleanup(void *ctx)
+{
+ h2_stream *stream = ctx;
+ apr_status_t status;
+
+ if (stream->input) {
+ h2_beam_destroy(stream->input);
+ stream->input = NULL;
+ }
+ if (stream->files) {
+ apr_file_t *file;
+ int i;
+ for (i = 0; i < stream->files->nelts; ++i) {
+ file = APR_ARRAY_IDX(stream->files, i, apr_file_t*);
+ status = apr_file_close(file);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, stream->session->c,
+ "h2_stream(%ld-%d): destroy, closed file %d",
+ stream->session->id, stream->id, i);
+ }
+ stream->files = NULL;
+ }
+ return APR_SUCCESS;
+}
-h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
+h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
+ int initiated_on, const h2_request *creq)
{
+ h2_request *req;
h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
+
stream->id = id;
stream->state = H2_STREAM_ST_IDLE;
stream->pool = pool;
stream->session = session;
set_state(stream, H2_STREAM_ST_OPEN);
- stream->request = h2_request_create(id, pool,
- h2_config_geti(session->config, H2_CONF_SER_HEADERS));
+ if (creq) {
+ /* take it into our pool and assure correct ids */
+ req = h2_request_clone(pool, creq);
+ req->id = id;
+ req->initiated_on = initiated_on;
+ }
+ else {
+ req = h2_req_create(id, pool,
+ h2_config_geti(session->config, H2_CONF_SER_HEADERS));
+ }
+ stream->request = req;
+
+ apr_pool_cleanup_register(pool, stream, stream_pool_cleanup,
+ apr_pool_cleanup_null);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
"h2_stream(%ld-%d): opened", session->id, stream->id);
return stream;
}
-apr_status_t h2_stream_destroy(h2_stream *stream)
+void h2_stream_cleanup(h2_stream *stream)
+{
+ AP_DEBUG_ASSERT(stream);
+ if (stream->buffer) {
+ apr_brigade_cleanup(stream->buffer);
+ }
+ if (stream->input) {
+ apr_status_t status;
+ status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ, 1);
+ if (status == APR_EAGAIN) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ "h2_stream(%ld-%d): wait on input shutdown",
+ stream->session->id, stream->id);
+ status = h2_beam_shutdown(stream->input, APR_BLOCK_READ, 1);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
+ "h2_stream(%ld-%d): input shutdown returned",
+ stream->session->id, stream->id);
+ }
+ }
+}
+
+void h2_stream_destroy(h2_stream *stream)
{
AP_DEBUG_ASSERT(stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
+ "h2_stream(%ld-%d): destroy",
+ stream->session->id, stream->id);
if (stream->pool) {
apr_pool_destroy(stream->pool);
}
- return APR_SUCCESS;
}
-void h2_stream_cleanup(h2_stream *stream)
+void h2_stream_eos_destroy(h2_stream *stream)
{
- h2_session_stream_destroy(stream->session, stream);
- /* stream is gone */
+ h2_session_stream_done(stream->session, stream);
+ /* stream possibly destroyed */
}
apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
@@ -187,33 +262,7 @@ void h2_stream_rst(h2_stream *stream, int error_code)
struct h2_response *h2_stream_get_response(h2_stream *stream)
{
- return stream->sos? stream->sos->response : NULL;
-}
-
-apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
- apr_bucket_brigade *bb)
-{
- apr_status_t status = APR_SUCCESS;
- h2_sos *sos;
-
- if (!output_open(stream)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): output closed",
- stream->session->id, stream->id);
- return APR_ECONNRESET;
- }
-
- sos = h2_sos_mplx_create(stream, response);
- if (sos->response->sos_filter) {
- sos = h2_filter_sos_create(sos->response->sos_filter, sos);
- }
- stream->sos = sos;
-
- status = stream->sos->buffer(stream->sos, bb);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
- "h2_stream(%ld-%d): set_response(%d)",
- stream->session->id, stream->id, stream->sos->response->http_status);
- return status;
+ return stream->response;
}
apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
@@ -224,26 +273,57 @@ apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
return APR_ECONNRESET;
}
set_state(stream, H2_STREAM_ST_OPEN);
- status = h2_request_rwrite(stream->request, r);
+ status = h2_request_rwrite(stream->request, stream->pool, r);
stream->request->serialize = h2_config_geti(h2_config_rget(r),
H2_CONF_SER_HEADERS);
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
+ "h2_request(%d): rwrite %s host=%s://%s%s",
+ stream->request->id, stream->request->method,
+ stream->request->scheme, stream->request->authority,
+ stream->request->path);
return status;
}
-void h2_stream_set_h2_request(h2_stream *stream, int initiated_on,
- const h2_request *req)
-{
- h2_request_copy(stream->pool, stream->request, req);
- stream->request->initiated_on = initiated_on;
- stream->request->eoh = 0;
-}
-
apr_status_t h2_stream_add_header(h2_stream *stream,
const char *name, size_t nlen,
const char *value, size_t vlen)
{
AP_DEBUG_ASSERT(stream);
+ if (!stream->response) {
+ if (name[0] == ':') {
+ if ((vlen) > stream->session->s->limit_req_line) {
+ /* pseudo header: approximation of request line size check */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ "h2_stream(%ld-%d): pseudo header %s too long",
+ stream->session->id, stream->id, name);
+ return h2_stream_set_error(stream,
+ HTTP_REQUEST_URI_TOO_LARGE);
+ }
+ }
+ else if ((nlen + 2 + vlen) > stream->session->s->limit_req_fieldsize) {
+ /* header too long */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ "h2_stream(%ld-%d): header %s too long",
+ stream->session->id, stream->id, name);
+ return h2_stream_set_error(stream,
+ HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE);
+ }
+
+ if (name[0] != ':') {
+ ++stream->request_headers_added;
+ if (stream->request_headers_added
+ > stream->session->s->limit_req_fields) {
+ /* too many header lines */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ "h2_stream(%ld-%d): too many header lines",
+ stream->session->id, stream->id);
+ return h2_stream_set_error(stream,
+ HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE);
+ }
+ }
+ }
+
if (h2_stream_is_scheduled(stream)) {
return h2_request_add_trailer(stream->request, stream->pool,
name, nlen, value, vlen);
@@ -275,21 +355,22 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
close_input(stream);
}
+ if (stream->response) {
+ /* already have a response, probably an HTTP error code */
+ return h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
+ }
+
/* Seeing the end-of-headers, we have everything we need to
* start processing it.
*/
status = h2_request_end_headers(stream->request, stream->pool,
eos, push_enabled);
if (status == APR_SUCCESS) {
- if (!eos) {
- stream->request->body = 1;
- }
- stream->input_remaining = stream->request->content_length;
-
- status = h2_mplx_process(stream->session->mplx, stream->id,
- stream->request, cmp, ctx);
+ stream->request->body = !eos;
stream->scheduled = 1;
+ stream->input_remaining = stream->request->content_length;
+ status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): scheduled %s %s://%s%s",
stream->session->id, stream->id,
@@ -298,7 +379,7 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
}
else {
h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
"h2_stream(%ld-%d): RST=2 (internal err) %s %s://%s%s",
stream->session->id, stream->id,
stream->request->method, stream->request->scheme,
@@ -326,8 +407,8 @@ apr_status_t h2_stream_close_input(h2_stream *stream)
return APR_ECONNRESET;
}
- if (close_input(stream)) {
- status = h2_mplx_in_close(stream->session->mplx, stream->id);
+ if (close_input(stream) && stream->input) {
+ status = h2_beam_close(stream->input);
}
return status;
}
@@ -335,25 +416,29 @@ apr_status_t h2_stream_close_input(h2_stream *stream)
apr_status_t h2_stream_write_data(h2_stream *stream,
const char *data, size_t len, int eos)
{
+ conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS;
AP_DEBUG_ASSERT(stream);
+ if (!stream->input) {
+ return APR_EOF;
+ }
if (input_closed(stream) || !stream->request->eoh) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d",
stream->session->id, stream->id, input_closed(stream),
stream->request->eoh);
return APR_EINVAL;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): add %ld input bytes",
stream->session->id, stream->id, (long)len);
if (!stream->request->chunked) {
stream->input_remaining -= len;
if (stream->input_remaining < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c,
APLOGNO(02961)
"h2_stream(%ld-%d): got %ld more content bytes than announced "
"in content-length header: %ld",
@@ -365,10 +450,18 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
}
}
- status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos);
+ if (!stream->tmp) {
+ stream->tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+ }
+ apr_brigade_write(stream->tmp, NULL, NULL, data, len);
if (eos) {
+ APR_BRIGADE_INSERT_TAIL(stream->tmp,
+ apr_bucket_eos_create(c->bucket_alloc));
close_input(stream);
}
+
+ status = h2_beam_send(stream->input, stream->tmp, APR_BLOCK_READ);
+ apr_brigade_cleanup(stream->tmp);
return status;
}
@@ -387,44 +480,160 @@ int h2_stream_is_suspended(const h2_stream *stream)
return stream->suspended;
}
-apr_status_t h2_stream_out_prepare(h2_stream *stream,
- apr_off_t *plen, int *peos)
+static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
{
- if (stream->rst_error) {
- *plen = 0;
- *peos = 1;
+ conn_rec *c = stream->session->c;
+ apr_bucket *b;
+ apr_status_t status;
+
+ if (!stream->output) {
+ return APR_EOF;
+ }
+ status = h2_beam_receive(stream->output, stream->buffer,
+ APR_NONBLOCK_READ, amount);
+ /* The buckets we receive are using the stream->buffer pool as
+ * lifetime which is exactly what we want since this is stream->pool.
+ *
+ * However: when we send these buckets down the core output filters, the
+ * filter might decide to setaside them into a pool of its own. And it
+ * might decide, after having sent the buckets, to clear its pool.
+ *
+ * This is problematic for file buckets because it then closes the contained
+ * file. Any split-off buckets we send afterwards will result in an
+ * APR_EBADF.
+ */
+ for (b = APR_BRIGADE_FIRST(stream->buffer);
+ b != APR_BRIGADE_SENTINEL(stream->buffer);
+ b = APR_BUCKET_NEXT(b)) {
+ if (APR_BUCKET_IS_FILE(b)) {
+ apr_bucket_file *f = (apr_bucket_file *)b->data;
+ apr_pool_t *fpool = apr_file_pool_get(f->fd);
+ if (fpool != c->pool) {
+ apr_bucket_setaside(b, c->pool);
+ if (!stream->files) {
+ stream->files = apr_array_make(stream->pool,
+ 5, sizeof(apr_file_t*));
+ }
+ APR_ARRAY_PUSH(stream->files, apr_file_t*) = f->fd;
+ }
+ }
+ }
+ return status;
+}
+
+apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
+ h2_bucket_beam *output)
+{
+ apr_status_t status = APR_SUCCESS;
+ conn_rec *c = stream->session->c;
+
+ if (!output_open(stream)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_stream(%ld-%d): output closed",
+ stream->session->id, stream->id);
return APR_ECONNRESET;
}
+
+ stream->response = response;
+ stream->output = output;
+ stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+
+ h2_stream_filter(stream);
+ if (stream->output) {
+ status = fill_buffer(stream, 0);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ "h2_stream(%ld-%d): set_response(%d)",
+ stream->session->id, stream->id,
+ stream->response->http_status);
+ return status;
+}
- AP_DEBUG_ASSERT(stream->sos);
- return stream->sos->prepare(stream->sos, plen, peos);
+apr_status_t h2_stream_set_error(h2_stream *stream, int http_status)
+{
+ h2_response *response;
+
+ if (stream->submitted) {
+ return APR_EINVAL;
+ }
+ response = h2_response_die(stream->id, http_status, stream->request,
+ stream->pool);
+ return h2_stream_set_response(stream, response, NULL);
}
-apr_status_t h2_stream_readx(h2_stream *stream,
- h2_io_data_cb *cb, void *ctx,
- apr_off_t *plen, int *peos)
+static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9);
+
+apr_status_t h2_stream_out_prepare(h2_stream *stream,
+ apr_off_t *plen, int *peos)
{
+ conn_rec *c = stream->session->c;
+ apr_status_t status = APR_SUCCESS;
+ apr_off_t requested;
+
if (stream->rst_error) {
+ *plen = 0;
+ *peos = 1;
return APR_ECONNRESET;
}
- if (!stream->sos) {
- return APR_EGENERAL;
+
+ if (*plen > 0) {
+ requested = H2MIN(*plen, DATA_CHUNK_SIZE);
}
- return stream->sos->readx(stream->sos, cb, ctx, plen, peos);
+ else {
+ requested = DATA_CHUNK_SIZE;
+ }
+ *plen = requested;
+
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
+ h2_util_bb_avail(stream->buffer, plen, peos);
+ if (!*peos && *plen < requested) {
+ /* try to get more data */
+ status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE);
+ if (APR_STATUS_IS_EOF(status)) {
+ apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
+ status = APR_SUCCESS;
+ }
+ else if (status == APR_EAGAIN) {
+ /* did not receive more, it's ok */
+ status = APR_SUCCESS;
+ }
+ *plen = requested;
+ h2_util_bb_avail(stream->buffer, plen, peos);
+ }
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s",
+ c->id, stream->id, (long)*plen, *peos,
+ (stream->response && stream->response->trailers)?
+ "yes" : "no");
+ if (!*peos && !*plen && status == APR_SUCCESS) {
+ return APR_EAGAIN;
+ }
+ return status;
}
+
apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
apr_off_t *plen, int *peos)
{
+ conn_rec *c = stream->session->c;
+ apr_status_t status = APR_SUCCESS;
+
if (stream->rst_error) {
return APR_ECONNRESET;
}
- if (!stream->sos) {
- return APR_EGENERAL;
+ status = h2_append_brigade(bb, stream->buffer, plen, peos);
+ if (status == APR_SUCCESS && !*peos && !*plen) {
+ status = APR_EAGAIN;
}
- return stream->sos->read_to(stream->sos, bb, plen, peos);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
+ "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
+ c->id, stream->id, (long)*plen, *peos);
+ return status;
}
+
int h2_stream_input_is_open(const h2_stream *stream)
{
return input_open(stream);
@@ -469,7 +678,7 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream)
apr_table_t *h2_stream_get_trailers(h2_stream *stream)
{
- return stream->sos? stream->sos->get_trailers(stream->sos) : NULL;
+ return stream->response? stream->response->trailers : NULL;
}
const h2_priority *h2_stream_get_priority(h2_stream *stream)
@@ -486,147 +695,3 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream)
return NULL;
}
-/*******************************************************************************
- * h2_sos_mplx
- ******************************************************************************/
-
-typedef struct h2_sos_mplx {
- h2_mplx *m;
- apr_bucket_brigade *bb;
- apr_bucket_brigade *tmp;
- apr_table_t *trailers;
- apr_off_t buffer_size;
-} h2_sos_mplx;
-
-#define H2_SOS_MPLX_OUT(lvl,msos,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((msos)->m->c,lvl)) \
- h2_util_bb_log((msos)->m->c,(msos)->m->id,lvl,msg,(msos)->bb); \
- } while(0)
-
-
-static apr_status_t mplx_transfer(h2_sos_mplx *msos, int stream_id,
- apr_pool_t *pool)
-{
- apr_status_t status;
- apr_table_t *trailers = NULL;
-
- if (!msos->tmp) {
- msos->tmp = apr_brigade_create(msos->bb->p, msos->bb->bucket_alloc);
- }
- status = h2_mplx_out_get_brigade(msos->m, stream_id, msos->tmp,
- msos->buffer_size-1, &trailers);
- if (!APR_BRIGADE_EMPTY(msos->tmp)) {
- h2_transfer_brigade(msos->bb, msos->tmp, pool);
- }
- if (trailers) {
- msos->trailers = trailers;
- }
- return status;
-}
-
-static apr_status_t h2_sos_mplx_read_to(h2_sos *sos, apr_bucket_brigade *bb,
- apr_off_t *plen, int *peos)
-{
- h2_sos_mplx *msos = sos->ctx;
- apr_status_t status;
-
- status = h2_append_brigade(bb, msos->bb, plen, peos);
- if (status == APR_SUCCESS && !*peos && !*plen) {
- status = APR_EAGAIN;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, msos->m->c,
- "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
- msos->m->id, sos->stream->id, (long)*plen, *peos);
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
- "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
- msos->m->id, sos->stream->id, (long)*plen, *peos);
- return status;
-}
-
-static apr_status_t h2_sos_mplx_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx,
- apr_off_t *plen, int *peos)
-{
- h2_sos_mplx *msos = sos->ctx;
- apr_status_t status = APR_SUCCESS;
-
- status = h2_util_bb_readx(msos->bb, cb, ctx, plen, peos);
- if (status == APR_SUCCESS && !*peos && !*plen) {
- status = APR_EAGAIN;
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, msos->m->c,
- "h2_stream(%ld-%d): readx, len=%ld eos=%d",
- msos->m->id, sos->stream->id, (long)*plen, *peos);
- return status;
-}
-
-static apr_status_t h2_sos_mplx_prepare(h2_sos *sos, apr_off_t *plen, int *peos)
-{
- h2_sos_mplx *msos = sos->ctx;
- apr_status_t status = APR_SUCCESS;
-
- H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prepare_pre");
-
- if (APR_BRIGADE_EMPTY(msos->bb)) {
- status = mplx_transfer(msos, sos->stream->id, sos->stream->pool);
- }
- h2_util_bb_avail(msos->bb, plen, peos);
-
- H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prepare_post");
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
- "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s",
- msos->m->id, sos->stream->id, (long)*plen, *peos,
- msos->trailers? "yes" : "no");
- if (!*peos && !*plen) {
- status = APR_EAGAIN;
- }
-
- return status;
-}
-
-static apr_table_t *h2_sos_mplx_get_trailers(h2_sos *sos)
-{
- h2_sos_mplx *msos = sos->ctx;
-
- return msos->trailers;
-}
-
-static apr_status_t h2_sos_mplx_buffer(h2_sos *sos, apr_bucket_brigade *bb)
-{
- h2_sos_mplx *msos = sos->ctx;
- apr_status_t status = APR_SUCCESS;
-
- if (bb && !APR_BRIGADE_EMPTY(bb)) {
- H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_pre");
- status = mplx_transfer(msos, sos->stream->id, sos->stream->pool);
- H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_post");
- }
- return status;
-}
-
-static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response)
-{
- h2_sos *sos;
- h2_sos_mplx *msos;
-
- msos = apr_pcalloc(stream->pool, sizeof(*msos));
- msos->m = stream->session->mplx;
- msos->bb = apr_brigade_create(stream->pool, msos->m->c->bucket_alloc);
- msos->buffer_size = 32 * 1024;
-
- sos = apr_pcalloc(stream->pool, sizeof(*sos));
- sos->stream = stream;
- sos->response = response;
-
- sos->ctx = msos;
- sos->buffer = h2_sos_mplx_buffer;
- sos->prepare = h2_sos_mplx_prepare;
- sos->readx = h2_sos_mplx_readx;
- sos->read_to = h2_sos_mplx_read_to;
- sos->get_trailers = h2_sos_mplx_get_trailers;
-
- sos->response = response;
-
- return sos;
-}
-