diff options
| author | Stefan Fritsch <sf@sfritsch.de> | 2016-07-05 23:20:42 +0200 |
|---|---|---|
| committer | Stefan Fritsch <sf@sfritsch.de> | 2016-07-05 23:20:42 +0200 |
| commit | d5ffc4eb85d71c901c85119cf873e343349e97e2 (patch) | |
| tree | 564636012ef7538ed4d7096b83c994dbda76c9db /modules/http2/h2_mplx.c | |
| parent | 48eddd3d39fa2668ee29198ebfb33c41d4738c21 (diff) | |
| download | apache2-upstream.tar.gz | |
Imported Upstream version 2.4.23upstream
Diffstat (limited to 'modules/http2/h2_mplx.c')
| -rw-r--r-- | modules/http2/h2_mplx.c | 1432 |
1 files changed, 709 insertions, 723 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index a4dbf1f4..001eb7f6 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -29,38 +29,46 @@ #include "mod_http2.h" #include "h2_private.h" +#include "h2_bucket_beam.h" #include "h2_config.h" #include "h2_conn.h" #include "h2_ctx.h" #include "h2_h2.h" -#include "h2_int_queue.h" -#include "h2_io.h" -#include "h2_io_set.h" #include "h2_response.h" #include "h2_mplx.h" #include "h2_ngn_shed.h" #include "h2_request.h" #include "h2_stream.h" #include "h2_task.h" -#include "h2_task_input.h" -#include "h2_task_output.h" #include "h2_worker.h" #include "h2_workers.h" #include "h2_util.h" -#define H2_MPLX_IO_OUT(lvl,m,io,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ - h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \ - } while(0) - -#define H2_MPLX_IO_IN(lvl,m,io,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ - h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \ - } while(0) +static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, + conn_rec *c, int level) +{ + if (beam && APLOG_C_IS_LEVEL(c,level)) { + char buffer[2048]; + apr_size_t off = 0; + + off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed); + off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red); + off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green); + off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold); + off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge); + + ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", + c->id, id, msg, buffer); + } +} +/* utility for iterating over ihash task sets */ +typedef struct { + h2_mplx *m; + h2_task *task; + apr_time_t now; +} task_iter_ctx; /* NULL or the mutex hold by this thread, used for recursive calls */ @@ -82,12 +90,14 @@ static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) * This allow recursive entering of the mutex from the saem thread, * which is what we need in certain situations involving callbacks */ + AP_DEBUG_ASSERT(m); apr_threadkey_private_get(&mutex, thread_lock); if (mutex == m->lock) { *pacquired = 0; return APR_SUCCESS; } - + + AP_DEBUG_ASSERT(m->lock); status = apr_thread_mutex_lock(m->lock); *pacquired = (status == APR_SUCCESS); if (*pacquired) { @@ -104,23 +114,68 @@ static void leave_mutex(h2_mplx *m, int acquired) } } -static int is_aborted(h2_mplx *m, apr_status_t *pstatus) +static void beam_leave(void *ctx, apr_thread_mutex_t *lock) { - AP_DEBUG_ASSERT(m); - if (m->aborted) { - *pstatus = APR_ECONNABORTED; + leave_mutex(ctx, 1); +} + +static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl) +{ + h2_mplx *m = ctx; + int acquired; + apr_status_t status; + + status = enter_mutex(m, &acquired); + if (status == APR_SUCCESS) { + pbl->mutex = m->lock; + pbl->leave = acquired? beam_leave : NULL; + pbl->leave_ctx = m; + } + return status; +} + +static void stream_output_consumed(void *ctx, + h2_bucket_beam *beam, apr_off_t length) +{ + h2_task *task = ctx; + if (length > 0 && task && task->assigned) { + h2_req_engine_out_consumed(task->assigned, task->c, length); + } +} + +static void stream_input_consumed(void *ctx, + h2_bucket_beam *beam, apr_off_t length) +{ + h2_mplx *m = ctx; + if (m->input_consumed && length) { + m->input_consumed(m->input_consumed_ctx, beam->id, length); + } +} + +static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) +{ + h2_mplx *m = ctx; + if (m->tx_handles_reserved > 0) { + --m->tx_handles_reserved; + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): beaming file %s, tx_avail %d", + m->id, beam->id, beam->tag, m->tx_handles_reserved); return 1; } + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): can_beam_file denied on %s", + m->id, beam->id, beam->tag); return 0; } static void have_out_data_for(h2_mplx *m, int stream_id); +static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master); static void check_tx_reservation(h2_mplx *m) { - if (m->tx_handles_reserved == 0) { + if (m->tx_handles_reserved <= 0) { m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, - H2MIN(m->tx_chunk_size, h2_io_set_size(m->stream_ios))); + H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks))); } } @@ -131,19 +186,41 @@ static void check_tx_free(h2_mplx *m) m->tx_handles_reserved = m->tx_chunk_size; h2_workers_tx_free(m->workers, count); } - else if (m->tx_handles_reserved - && (!m->stream_ios || h2_io_set_is_empty(m->stream_ios))) { + else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) { h2_workers_tx_free(m->workers, m->tx_handles_reserved); m->tx_handles_reserved = 0; } } +static int purge_stream(void *ctx, void *val) +{ + h2_mplx *m = ctx; + h2_stream *stream = val; + h2_task *task = h2_ihash_get(m->tasks, stream->id); + h2_ihash_remove(m->spurge, stream->id); + h2_stream_destroy(stream); + if (task) { + task_destroy(m, task, 1); + } + return 0; +} + +static void purge_streams(h2_mplx *m) +{ + if (!h2_ihash_empty(m->spurge)) { + while(!h2_ihash_iter(m->spurge, purge_stream, m)) { + /* repeat until empty */ + } + h2_ihash_clear(m->spurge); + } +} + static void h2_mplx_destroy(h2_mplx *m) { AP_DEBUG_ASSERT(m); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): destroy, ios=%d", - m->id, (int)h2_io_set_size(m->stream_ios)); + "h2_mplx(%ld): destroy, tasks=%d", + m->id, (int)h2_ihash_count(m->tasks)); check_tx_free(m); if (m->pool) { apr_pool_destroy(m->pool); @@ -204,9 +281,15 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->bucket_alloc = apr_bucket_alloc_create(m->pool); m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); + + m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->q = h2_iq_create(m->pool, m->max_streams); - m->stream_ios = h2_io_set_create(m->pool); - m->ready_ios = h2_io_set_create(m->pool); + m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); + m->stream_timeout = stream_timeout; m->workers = workers; m->workers_max = workers->max_workers; @@ -240,75 +323,66 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m) return max_stream_started; } -static void workers_register(h2_mplx *m) -{ - /* h2_workers is only a hub for all the h2_worker instances. - * At the end-of-life of this h2_mplx, we always unregister at - * the workers. The thing to manage are all the h2_worker instances - * out there. Those may hold a reference to this h2_mplx and we cannot - * call them to unregister. - * - * Therefore: ref counting for h2_workers in not needed, ref counting - * for h2_worker using this is critical. - */ - m->need_registration = 0; - h2_workers_register(m->workers, m); -} - -static int io_in_consumed_signal(h2_mplx *m, h2_io *io) +static void input_consumed_signal(h2_mplx *m, h2_stream *stream) { - if (io->input_consumed && m->input_consumed) { - m->input_consumed(m->input_consumed_ctx, - io->id, io->input_consumed); - io->input_consumed = 0; - return 1; + if (stream->input && stream->started) { + h2_beam_send(stream->input, NULL, 0); /* trigger updates */ } - return 0; } -static int io_out_consumed_signal(h2_mplx *m, h2_io *io) +static int output_consumed_signal(h2_mplx *m, h2_task *task) { - if (io->output_consumed && io->task && io->task->assigned) { - h2_req_engine_out_consumed(io->task->assigned, io->task->c, - io->output_consumed); - io->output_consumed = 0; - return 1; + if (task->output.beam && task->worker_started && task->assigned) { + /* trigger updates */ + h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ); } return 0; } -static void io_destroy(h2_mplx *m, h2_io *io, int events) + +static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master) { - int reuse_slave; + conn_rec *slave = NULL; + int reuse_slave = 0; + apr_status_t status; - /* cleanup any buffered input */ - h2_io_in_shutdown(io); - if (events) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_task(%s): destroy", task->id); + if (called_from_master) { /* Process outstanding events before destruction */ - io_in_consumed_signal(m, io); + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); + if (stream) { + input_consumed_signal(m, stream); + } } /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. */ - m->tx_handles_reserved += io->files_handles_owned; - - h2_io_set_remove(m->stream_ios, io); - h2_io_set_remove(m->ready_ios, io); - if (m->redo_ios) { - h2_io_set_remove(m->redo_ios, io); + if (task->output.beam) { + m->tx_handles_reserved += + h2_beam_get_files_beamed(task->output.beam); + h2_beam_on_produced(task->output.beam, NULL, NULL); + status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ, 1); + if (status != APR_SUCCESS){ + ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, + APLOGNO(03385) "h2_task(%s): output shutdown " + "incomplete", task->id); + } } - + + slave = task->c; reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc) - && !io->rst_error && io->eor); - if (io->task) { - conn_rec *slave = io->task->c; - h2_task_destroy(io->task); - io->task = NULL; - + && !task->rst_error); + + h2_ihash_remove(m->tasks, task->stream_id); + if (m->redo_tasks) { + h2_ihash_remove(m->redo_tasks, task->stream_id); + } + h2_task_destroy(task); + + if (slave) { if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) { - apr_bucket_delete(io->eor); - io->eor = NULL; APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave; } else { @@ -316,59 +390,104 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) h2_slave_destroy(slave, NULL); } } - - if (io->pool) { - apr_pool_destroy(io->pool); - } - + check_tx_free(m); } -static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) +static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) { - /* Remove io from ready set, we will never submit it */ - h2_io_set_remove(m->ready_ios, io); - if (!io->worker_started || io->worker_done) { - /* already finished or not even started yet */ - h2_iq_remove(m->q, io->id); - io_destroy(m, io, 1); - return 0; + h2_task *task; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_stream(%ld-%d): done", m->c->id, stream->id); + /* Situation: we are, on the master connection, done with processing + * the stream. Either we have handled it successfully, or the stream + * was reset by the client or the connection is gone and we are + * shutting down the whole session. + * + * We possibly have created a task for this stream to be processed + * on a slave connection. The processing might actually be ongoing + * right now or has already finished. A finished task waits for its + * stream to be done. This is the common case. + * + * If the stream had input (e.g. the request had a body), a task + * may have read, or is still reading buckets from the input beam. + * This means that the task is referencing memory from the stream's + * pool (or the master connection bucket alloc). Before we can free + * the stream pool, we need to make sure that those references are + * gone. This is what h2_beam_shutdown() on the input waits for. + * + * With the input handled, we can tear down that beam and care + * about the output beam. The stream might still have buffered some + * buckets read from the output, so we need to get rid of those. That + * is done by h2_stream_cleanup(). + * + * Now it is save to destroy the task (if it exists and is finished). + * + * FIXME: we currently destroy the stream, even if the task is still + * ongoing. This is not ok, since task->request is coming from stream + * memory. We should either copy it on task creation or wait with the + * stream destruction until the task is done. + */ + h2_iq_remove(m->q, stream->id); + h2_ihash_remove(m->sready, stream->id); + h2_ihash_remove(m->sresume, stream->id); + h2_ihash_remove(m->streams, stream->id); + if (stream->input) { + m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); + h2_beam_on_consumed(stream->input, NULL, NULL); + /* Let anyone blocked reading know that there is no more to come */ + h2_beam_abort(stream->input); + /* Remove mutex after, so that abort still finds cond to signal */ + h2_beam_mutex_set(stream->input, NULL, NULL, NULL); } - else { - /* cleanup once task is done */ - h2_io_make_orphaned(io, rst_error); - return 1; + h2_stream_cleanup(stream); + + task = h2_ihash_get(m->tasks, stream->id); + if (task) { + if (!task->worker_done) { + /* task still running, cleanup once it is done */ + if (rst_error) { + h2_task_rst(task, rst_error); + } + h2_ihash_add(m->shold, stream); + return; + } + else { + /* already finished */ + task_destroy(m, task, 0); + } } + h2_stream_destroy(stream); } -static int stream_done_iter(void *ctx, h2_io *io) +static int stream_done_iter(void *ctx, void *val) { - return io_stream_done((h2_mplx*)ctx, io, 0); + stream_done((h2_mplx*)ctx, val, 0); + return 0; } -static int stream_print(void *ctx, h2_io *io) +static int task_print(void *ctx, void *val) { h2_mplx *m = ctx; - if (io && io->request) { + h2_task *task = val; + + if (task && task->request) { + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ - "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d" - "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", - m->id, io->id, - io->request->method, io->request->authority, io->request->path, - io->response? "http" : (io->rst_error? "reset" : "?"), - io->response? io->response->http_status : io->rst_error, - io->orphaned, io->worker_started, io->worker_done, - io->eos_in, io->eos_out); - } - else if (io) { + "->03198: h2_stream(%s): %s %s %s -> %s %d" + "[orph=%d/started=%d/done=%d]", + task->id, task->request->method, + task->request->authority, task->request->path, + task->response? "http" : (task->rst_error? "reset" : "?"), + task->response? task->response->http_status : task->rst_error, + (stream? 0 : 1), task->worker_started, + task->worker_done); + } + else if (task) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ - "->03198: h2_stream(%ld-%d): NULL -> %s %d" - "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", - m->id, io->id, - io->response? "http" : (io->rst_error? "reset" : "?"), - io->response? io->response->http_status : io->rst_error, - io->orphaned, io->worker_started, io->worker_done, - io->eos_in, io->eos_out); + "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id); } else { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ @@ -377,6 +496,32 @@ static int stream_print(void *ctx, h2_io *io) return 1; } +static int task_abort_connection(void *ctx, void *val) +{ + h2_task *task = val; + if (task->c) { + task->c->aborted = 1; + } + if (task->input.beam) { + h2_beam_abort(task->input.beam); + } + if (task->output.beam) { + h2_beam_abort(task->output.beam); + } + return 1; +} + +static int report_stream_iter(void *ctx, void *val) { + h2_mplx *m = ctx; + h2_stream *stream = val; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, " + "submitted=%d, suspended=%d", + m->id, stream->id, stream->started, stream->scheduled, + stream->submitted, stream->suspended); + return 1; +} + apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) { apr_status_t status; @@ -386,30 +531,62 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { int i, wait_secs = 5; + + if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): release_join with %d streams open, " + "%d streams resume, %d streams ready, %d tasks", + m->id, (int)h2_ihash_count(m->streams), + (int)h2_ihash_count(m->sresume), + (int)h2_ihash_count(m->sready), + (int)h2_ihash_count(m->tasks)); + h2_ihash_iter(m->streams, report_stream_iter, m); + } /* disable WINDOW_UPDATE callbacks */ h2_mplx_set_consumed_cb(m, NULL, NULL); + if (!h2_ihash_empty(m->shold)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): start release_join with %d streams in hold", + m->id, (int)h2_ihash_count(m->shold)); + } + if (!h2_ihash_empty(m->spurge)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): start release_join with %d streams to purge", + m->id, (int)h2_ihash_count(m->spurge)); + } + h2_iq_clear(m->q); apr_thread_cond_broadcast(m->task_thawed); - while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { - /* iterate until all ios have been orphaned or destroyed */ + while (!h2_ihash_iter(m->streams, stream_done_iter, m)) { + /* iterate until all streams have been removed */ } + AP_DEBUG_ASSERT(h2_ihash_empty(m->streams)); + if (!h2_ihash_empty(m->shold)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): 2. release_join with %d streams in hold", + m->id, (int)h2_ihash_count(m->shold)); + } + if (!h2_ihash_empty(m->spurge)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): 2. release_join with %d streams to purge", + m->id, (int)h2_ihash_count(m->spurge)); + } + /* If we still have busy workers, we cannot release our memory - * pool yet, as slave connections have child pools of their respective - * h2_io's. - * Any remaining ios are processed in these workers. Any operation - * they do on their input/outputs will be errored ECONNRESET/ABORTED, - * so processing them should fail and workers *should* return. + * pool yet, as tasks have references to us. + * Any operation on the task slave connection will from now on + * be errored ECONNRESET/ABORTED, so processing them should fail + * and workers *should* return in a timely fashion. */ for (i = 0; m->workers_busy > 0; ++i) { + h2_ihash_iter(m->tasks, task_abort_connection, m); + m->join_wait = wait; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): release_join, waiting on %d worker to report back", - m->id, (int)h2_io_set_size(m->stream_ios)); - status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); + if (APR_STATUS_IS_TIMEUP(status)) { if (i > 0) { /* Oh, oh. Still we wait for assigned workers to report that @@ -419,11 +596,11 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) */ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198) "h2_mplx(%ld): release, waiting for %d seconds now for " - "%d h2_workers to return, have still %d requests outstanding", + "%d h2_workers to return, have still %d tasks outstanding", m->id, i*wait_secs, m->workers_busy, - (int)h2_io_set_size(m->stream_ios)); + (int)h2_ihash_count(m->tasks)); if (i == 1) { - h2_io_set_iter(m->stream_ios, stream_print, m); + h2_ihash_iter(m->tasks, task_print, m); } } h2_mplx_abort(m); @@ -431,13 +608,21 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } } - if (!h2_io_set_is_empty(m->stream_ios)) { - ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, - "h2_mplx(%ld): release_join, %d streams still open", - m->id, (int)h2_io_set_size(m->stream_ios)); + AP_DEBUG_ASSERT(h2_ihash_empty(m->shold)); + if (!h2_ihash_empty(m->spurge)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): 3. release_join %d streams to purge", + m->id, (int)h2_ihash_count(m->spurge)); + purge_streams(m); + } + AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge)); + + if (!h2_ihash_empty(m->tasks)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) + "h2_mplx(%ld): release_join -> destroy, " + "%d tasks still present", + m->id, (int)h2_ihash_count(m->tasks)); } - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) - "h2_mplx(%ld): release_join -> destroy", m->id); leave_mutex(m, acquired); h2_mplx_destroy(m); /* all gone */ @@ -457,112 +642,18 @@ void h2_mplx_abort(h2_mplx *m) } } -apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) +apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream) { apr_status_t status = APR_SUCCESS; int acquired; - /* This maybe called from inside callbacks that already hold the lock. - * E.g. when we are streaming out DATA and the EOF triggers the stream - * release. - */ AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - - /* there should be an h2_io, once the stream has been scheduled - * for processing, e.g. when we received all HEADERs. But when - * a stream is cancelled very early, it will not exist. */ - if (io) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld-%d): marking stream as done.", - m->id, stream_id); - io_stream_done(m, io, rst_error); - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, - int stream_id, apr_bucket_brigade *bb, - apr_table_t *trailers, - struct apr_thread_cond_t *iowait) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre"); - - h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait); - status = h2_io_in_read(io, bb, -1, trailers); - while (APR_STATUS_IS_EAGAIN(status) - && !is_aborted(m, &status) - && block == APR_BLOCK_READ) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): wait on in data (BLOCK_READ)", - m->id, stream_id); - status = h2_io_signal_wait(m, io); - if (status == APR_SUCCESS) { - status = h2_io_in_read(io, bb, -1, trailers); - } - } - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post"); - h2_io_signal_exit(io); - } - else { - status = APR_EOF; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, - const char *data, apr_size_t len, int eos) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre"); - status = h2_io_in_write(io, data, len, eos); - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post"); - h2_io_signal(io, H2_IO_READ); - io_in_consumed_signal(m, io); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - status = h2_io_in_close(io); - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close"); - h2_io_signal(io, H2_IO_READ); - io_in_consumed_signal(m, io); - } - else { - status = APR_ECONNABORTED; - } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld-%d): marking stream as done.", + m->id, stream->id); + stream_done(m, stream, stream->rst_error); + purge_streams(m); leave_mutex(m, acquired); } return status; @@ -574,215 +665,42 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) m->input_consumed_ctx = ctx; } -typedef struct { - h2_mplx * m; - int streams_updated; -} update_ctx; - -static int update_window(void *ctx, h2_io *io) -{ - update_ctx *uctx = (update_ctx*)ctx; - if (io_in_consumed_signal(uctx->m, io)) { - ++uctx->streams_updated; - } - return 1; -} - -apr_status_t h2_mplx_in_update_windows(h2_mplx *m) +static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) { - apr_status_t status; - int acquired; + apr_status_t status = APR_SUCCESS; + h2_task *task = h2_ihash_get(m->tasks, stream_id); + h2_stream *stream = h2_ihash_get(m->streams, stream_id); - AP_DEBUG_ASSERT(m); - if (m->aborted) { + if (!task || !stream) { return APR_ECONNABORTED; } - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - update_ctx ctx; - - ctx.m = m; - ctx.streams_updated = 0; - - status = APR_EAGAIN; - h2_io_set_iter(m->stream_ios, update_window, &ctx); - - if (ctx.streams_updated) { - status = APR_SUCCESS; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_out_get_brigade(h2_mplx *m, int stream_id, - apr_bucket_brigade *bb, - apr_off_t len, apr_table_t **ptrailers) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_pre"); - - status = h2_io_out_get_brigade(io, bb, len); - - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_post"); - if (status == APR_SUCCESS) { - h2_io_signal(io, H2_IO_WRITE); - } - } - else { - status = APR_ECONNABORTED; - } - *ptrailers = io->response? io->response->trailers : NULL; - leave_mutex(m, acquired); - } - return status; -} - -h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) -{ - apr_status_t status; - h2_stream *stream = NULL; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_shift(m->ready_ios); - if (io && !m->aborted) { - stream = h2_ihash_get(streams, io->id); - if (stream) { - io->submitted = 1; - if (io->rst_error) { - h2_stream_rst(stream, io->rst_error); - } - else { - AP_DEBUG_ASSERT(io->response); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre"); - h2_stream_set_response(stream, io->response, io->bbout); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post"); - } - } - else { - /* We have the io ready, but the stream has gone away, maybe - * reset by the client. Should no longer happen since such - * streams should clear io's from the ready queue. - */ - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03347) - "h2_mplx(%ld): stream for response %d closed, " - "resetting io to close request processing", - m->id, io->id); - h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED); - if (!io->worker_started || io->worker_done) { - io_destroy(m, io, 1); - } - else { - /* hang around until the h2_task is done, but - * shutdown input and send out any events (e.g. window - * updates) asap. */ - h2_io_in_shutdown(io); - io_in_consumed_signal(m, io); - } - } - - h2_io_signal(io, H2_IO_WRITE); - } - leave_mutex(m, acquired); - } - return stream; -} - -static apr_status_t out_write(h2_mplx *m, h2_io *io, - ap_filter_t* f, int blocking, - apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) -{ - apr_status_t status = APR_SUCCESS; - /* We check the memory footprint queued for this stream_id - * and block if it exceeds our configured limit. - * We will not split buckets to enforce the limit to the last - * byte. After all, the bucket is already in memory. - */ - while (status == APR_SUCCESS - && !APR_BRIGADE_EMPTY(bb) - && !is_aborted(m, &status)) { - - status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX, - &m->tx_handles_reserved); - io_out_consumed_signal(m, io); - - /* Wait for data to drain until there is room again or - * stream timeout expires */ - h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait); - while (status == APR_SUCCESS - && !APR_BRIGADE_EMPTY(bb) - && iowait - && (m->stream_max_mem <= h2_io_out_length(io)) - && !is_aborted(m, &status)) { - if (!blocking) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_mplx(%ld-%d): incomplete write", - m->id, io->id); - return APR_INCOMPLETE; - } - if (f) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_mplx(%ld-%d): waiting for out drain", - m->id, io->id); - } - status = h2_io_signal_wait(m, io); - } - h2_io_signal_exit(io); - } - apr_brigade_cleanup(bb); - return status; -} - -static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, - ap_filter_t* f, apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) -{ - apr_status_t status = APR_SUCCESS; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%s): open response: %d, rst=%d", + task->id, response->http_status, response->rst_error); - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - if (f) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_mplx(%ld-%d): open response: %d, rst=%d", - m->id, stream_id, response->http_status, - response->rst_error); - } - - h2_io_set_response(io, response); - h2_io_set_add(m->ready_ios, io); - if (response && response->http_status < 300) { - /* we might see some file buckets in the output, see - * if we have enough handles reserved. */ - check_tx_reservation(m); - } - if (bb) { - status = out_write(m, io, f, 0, bb, iowait); - if (status == APR_INCOMPLETE) { - /* write will have transferred as much data as possible. - caller has to deal with non-empty brigade */ - status = APR_SUCCESS; - } - } - have_out_data_for(m, stream_id); + h2_task_set_response(task, response); + + if (task->output.beam) { + h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem); + h2_beam_timeout_set(task->output.beam, m->stream_timeout); + h2_beam_on_consumed(task->output.beam, stream_output_consumed, task); + m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam); + h2_beam_on_file_beam(task->output.beam, can_beam_file, m); + h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m); } - else { - status = APR_ECONNABORTED; + + h2_ihash_add(m->sready, stream); + if (response && response->http_status < 300) { + /* we might see some file buckets in the output, see + * if we have enough handles reserved. */ + check_tx_reservation(m); } + have_out_data_for(m, stream_id); return status; } -apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, - ap_filter_t* f, apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) +apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response) { apr_status_t status; int acquired; @@ -793,125 +711,47 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, status = APR_ECONNABORTED; } else { - status = out_open(m, stream_id, response, f, bb, iowait); - if (APLOGctrace1(m->c)) { - h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb); - } + status = out_open(m, stream_id, response); } leave_mutex(m, acquired); } return status; } -apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, - ap_filter_t* f, int blocking, - apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) +static apr_status_t out_close(h2_mplx *m, h2_task *task) { - apr_status_t status; - int acquired; + apr_status_t status = APR_SUCCESS; + h2_stream *stream; - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - status = out_write(m, io, f, blocking, bb, iowait); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): write", m->id, io->id); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write"); - - have_out_data_for(m, stream_id); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); + if (!task) { + return APR_ECONNABORTED; } - return status; -} -apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - if (!io->response && !io->rst_error) { - /* In case a close comes before a response was created, - * insert an error one so that our streams can properly - * reset. - */ - h2_response *r = h2_response_die(stream_id, APR_EGENERAL, - io->request, m->pool); - status = out_open(m, stream_id, r, NULL, NULL, NULL); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, - "h2_mplx(%ld-%d): close, no response, no rst", - m->id, io->id); - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): close with eor=%s", - m->id, io->id, io->eor? "yes" : "no"); - status = h2_io_out_close(io); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close"); - io_out_consumed_signal(m, io); - - have_out_data_for(m, stream_id); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); + stream = h2_ihash_get(m->streams, task->stream_id); + if (!stream) { + return APR_ECONNABORTED; } - return status; -} -apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->rst_error && !io->orphaned) { - h2_io_rst(io, error); - if (!io->response) { - h2_io_set_add(m->ready_ios, io); - } - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst"); - - have_out_data_for(m, stream_id); - h2_io_signal(io, H2_IO_WRITE); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); + if (!task->response && !task->rst_error) { + /* In case a close comes before a response was created, + * insert an error one so that our streams can properly reset. + */ + h2_response *r = h2_response_die(task->stream_id, 500, + task->request, m->pool); + status = out_open(m, task->stream_id, r); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, APLOGNO(03393) + "h2_mplx(%s): close, no response, no rst", task->id); } - return status; -} - -int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id) -{ - apr_status_t status; - int has_data = 0; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - has_data = h2_io_out_has_data(io); - } - else { - has_data = 0; - } - leave_mutex(m, acquired); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, + "h2_mplx(%s): close", task->id); + if (task->output.beam) { + status = h2_beam_close(task->output.beam); + h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, + APLOG_TRACE2); } - return has_data; + output_consumed_signal(m, task); + have_out_data_for(m, task->stream_id); + return status; } apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, @@ -925,7 +765,11 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, if (m->aborted) { status = APR_ECONNABORTED; } + else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) { + status = APR_SUCCESS; + } else { + purge_streams(m); m->added_output = iowait; status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); if (APLOGctrace2(m->c)) { @@ -969,22 +813,7 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) return status; } -static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request) -{ - apr_pool_t *io_pool; - h2_io *io; - - apr_pool_create(&io_pool, m->pool); - apr_pool_tag(io_pool, "h2_io"); - io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request); - h2_io_set_add(m->stream_ios, io); - - return io; -} - - -apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, - const h2_request *req, +apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; @@ -997,24 +826,32 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, status = APR_ECONNABORTED; } else { - h2_io *io = open_io(m, stream_id, req); - - if (!io->request->body) { - status = h2_io_in_close(io); + h2_ihash_add(m->streams, stream); + if (stream->response) { + /* already have a respone, schedule for submit */ + h2_ihash_add(m->sready, stream); + } + else { + h2_beam_create(&stream->input, stream->pool, stream->id, + "input", 0); + if (!m->need_registration) { + m->need_registration = h2_iq_empty(m->q); + } + if (m->workers_busy < m->workers_max) { + do_registration = m->need_registration; + } + h2_iq_add(m->q, stream->id, cmp, ctx); + + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, + "h2_mplx(%ld-%d): process, body=%d", + m->c->id, stream->id, stream->request->body); } - - m->need_registration = m->need_registration || h2_iq_empty(m->q); - do_registration = (m->need_registration && m->workers_busy < m->workers_max); - h2_iq_add(m->q, io->id, cmp, ctx); - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%ld-%d): process", m->c->id, stream_id); - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process"); } leave_mutex(m, acquired); } - if (status == APR_SUCCESS && do_registration) { - workers_register(m); + if (do_registration) { + m->need_registration = 0; + h2_workers_register(m->workers, m); } return status; } @@ -1022,21 +859,16 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, static h2_task *pop_task(h2_mplx *m) { h2_task *task = NULL; + h2_stream *stream; int sid; - while (!m->aborted && !task - && (m->workers_busy < m->workers_limit) - && (sid = h2_iq_shift(m->q)) > 0) { - h2_io *io = h2_io_set_get(m->stream_ios, sid); - if (io && io->orphaned) { - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - } - else if (io) { + while (!m->aborted && !task && (m->workers_busy < m->workers_limit) + && (sid = h2_iq_shift(m->q)) > 0) { + + stream = h2_ihash_get(m->streams, sid); + if (stream) { conn_rec *slave, **pslave; int new_conn = 0; - + pslave = (conn_rec **)apr_array_pop(m->spare_slaves); if (pslave) { slave = *pslave; @@ -1047,17 +879,29 @@ static h2_task *pop_task(h2_mplx *m) } slave->sbh = m->c->sbh; - io->task = task = h2_task_create(m->id, io->request, slave, m); + slave->aborted = 0; + task = h2_task_create(slave, stream->request, stream->input, m); + h2_ihash_add(m->tasks, task); + m->c->keepalives++; apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id); if (new_conn) { h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave)); } - io->worker_started = 1; - io->started_at = apr_time_now(); + stream->started = 1; + task->worker_started = 1; + task->started_at = apr_time_now(); if (sid > m->max_stream_started) { m->max_stream_started = sid; } + + if (stream->input) { + h2_beam_timeout_set(stream->input, m->stream_timeout); + h2_beam_on_consumed(stream->input, stream_input_consumed, m); + h2_beam_on_file_beam(stream->input, can_beam_file, m); + h2_beam_mutex_set(stream->input, beam_enter, task->cond, m); + } + ++m->workers_busy; } } @@ -1090,100 +934,119 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) { - if (task) { - h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + if (task->frozen) { + /* this task was handed over to an engine for processing + * and the original worker has finished. That means the + * engine may start processing now. */ + h2_task_thaw(task); + /* we do not want the task to block on writing response + * bodies into the mplx. */ + h2_task_set_io_blocking(task, 0); + apr_thread_cond_broadcast(m->task_thawed); + return; + } + else { + h2_stream *stream; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): task(%s) done", m->id, task->id); + out_close(m, task); + stream = h2_ihash_get(m->streams, task->stream_id); - if (task->frozen) { - /* this task was handed over to an engine for processing - * and the original worker has finished. That means the - * engine may start processing now. */ - h2_task_thaw(task); - /* we do not want the task to block on writing response - * bodies into the mplx. */ - /* FIXME: this implementation is incomplete. */ - h2_task_set_io_blocking(task, 0); - apr_thread_cond_broadcast(m->task_thawed); + if (ngn) { + apr_off_t bytes = 0; + if (task->output.beam) { + h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ); + bytes += h2_beam_get_buffered(task->output.beam); + } + if (bytes > 0) { + /* we need to report consumed and current buffered output + * to the engine. The request will be streamed out or cancelled, + * no more data is coming from it and the engine should update + * its calculations before we destroy this information. */ + h2_req_engine_out_consumed(ngn, task->c, bytes); + } } - else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): task(%s) done", m->id, task->id); - /* clean our references and report request as done. Signal - * that we want another unless we have been aborted */ - /* TODO: this will keep a worker attached to this h2_mplx as - * long as it has requests to handle. Might no be fair to - * other mplx's. Perhaps leave after n requests? */ - h2_mplx_out_close(m, task->stream_id); - - if (ngn && io) { - apr_off_t bytes = io->output_consumed + h2_io_out_length(io); - if (bytes > 0) { - /* we need to report consumed and current buffered output - * to the engine. The request will be streamed out or cancelled, - * no more data is coming from it and the engine should update - * its calculations before we destroy this information. */ - h2_req_engine_out_consumed(ngn, task->c, bytes); - io->output_consumed = 0; - } + + if (task->engine) { + if (!h2_req_engine_is_shutdown(task->engine)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): task(%s) has not-shutdown " + "engine(%s)", m->id, task->id, + h2_req_engine_get_id(task->engine)); } - - if (task->engine) { - if (!h2_req_engine_is_shutdown(task->engine)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): task(%s) has not-shutdown " - "engine(%s)", m->id, task->id, - h2_req_engine_get_id(task->engine)); - } - h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); + h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); + } + + if (!m->aborted && stream && m->redo_tasks + && h2_ihash_get(m->redo_tasks, task->stream_id)) { + /* reset and schedule again */ + h2_task_redo(task); + h2_ihash_remove(m->redo_tasks, task->stream_id); + h2_iq_add(m->q, task->stream_id, NULL, NULL); + return; + } + + task->worker_done = 1; + task->done_at = apr_time_now(); + if (task->output.beam) { + h2_beam_on_consumed(task->output.beam, NULL, NULL); + h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): request done, %f ms elapsed", task->id, + (task->done_at - task->started_at) / 1000.0); + if (task->started_at > m->last_idle_block) { + /* this task finished without causing an 'idle block', e.g. + * a block by flow control. + */ + if (task->done_at- m->last_limit_change >= m->limit_change_interval + && m->workers_limit < m->workers_max) { + /* Well behaving stream, allow it more workers */ + m->workers_limit = H2MIN(m->workers_limit * 2, + m->workers_max); + m->last_limit_change = task->done_at; + m->need_registration = 1; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): increase worker limit to %d", + m->id, m->workers_limit); } - - if (io) { - apr_time_t now = apr_time_now(); - if (!io->orphaned && m->redo_ios - && h2_io_set_get(m->redo_ios, io->id)) { - /* reset and schedule again */ - h2_io_redo(io); - h2_io_set_remove(m->redo_ios, io); - h2_iq_add(m->q, io->id, NULL, NULL); - } - else { - io->worker_done = 1; - io->done_at = now; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): request(%d) done, %f ms" - " elapsed", m->id, io->id, - (io->done_at - io->started_at) / 1000.0); - if (io->started_at > m->last_idle_block) { - /* this task finished without causing an 'idle block', e.g. - * a block by flow control. - */ - if (now - m->last_limit_change >= m->limit_change_interval - && m->workers_limit < m->workers_max) { - /* Well behaving stream, allow it more workers */ - m->workers_limit = H2MIN(m->workers_limit * 2, - m->workers_max); - m->last_limit_change = now; - m->need_registration = 1; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): increase worker limit to %d", - m->id, m->workers_limit); - } - } - } - - if (io->orphaned) { - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - } - else { - /* hang around until the stream deregisters */ - } + } + + if (stream) { + /* hang around until the stream deregisters */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): task_done, stream still open", + task->id); + if (h2_stream_is_suspended(stream)) { + /* more data will not arrive, resume the stream */ + h2_ihash_add(m->sresume, stream); + have_out_data_for(m, stream->id); + } + } + else { + /* stream no longer active, was it placed in hold? */ + stream = h2_ihash_get(m->shold, task->stream_id); + if (stream) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): task_done, stream in hold", + task->id); + /* We cannot destroy the stream here since this is + * called from a worker thread and freeing memory pools + * is only safe in the only thread using it (and its + * parent pool / allocator) */ + h2_ihash_remove(m->shold, stream->id); + h2_ihash_add(m->spurge, stream); } else { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): task %s without corresp. h2_io", - m->id, task->id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): task_done, stream not found", + task->id); + task_destroy(m, task, 0); + } + + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); } } } @@ -1208,80 +1071,76 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) * h2_mplx DoS protection ******************************************************************************/ -typedef struct { - h2_mplx *m; - h2_io *io; - apr_time_t now; -} io_iter_ctx; - -static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io) +static int latest_repeatable_unsubmitted_iter(void *data, void *val) { - io_iter_ctx *ctx = data; - if (io->worker_started && !io->worker_done - && h2_io_is_repeatable(io) - && !h2_io_set_get(ctx->m->redo_ios, io->id)) { - /* this io occupies a worker, the response has not been submitted yet, + task_iter_ctx *ctx = data; + h2_task *task = val; + if (!task->worker_done && h2_task_can_redo(task) + && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) { + /* this task occupies a worker, the response has not been submitted yet, * not been cancelled and it is a repeatable request * -> it can be re-scheduled later */ - if (!ctx->io || ctx->io->started_at < io->started_at) { + if (!ctx->task || ctx->task->started_at < task->started_at) { /* we did not have one or this one was started later */ - ctx->io = io; + ctx->task = task; } } return 1; } -static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m) +static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m) { - io_iter_ctx ctx; + task_iter_ctx ctx; ctx.m = m; - ctx.io = NULL; - h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx); - return ctx.io; + ctx.task = NULL; + h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx); + return ctx.task; } -static int timed_out_busy_iter(void *data, h2_io *io) +static int timed_out_busy_iter(void *data, void *val) { - io_iter_ctx *ctx = data; - if (io->worker_started && !io->worker_done - && (ctx->now - io->started_at) > ctx->m->stream_timeout) { + task_iter_ctx *ctx = data; + h2_task *task = val; + if (!task->worker_done + && (ctx->now - task->started_at) > ctx->m->stream_timeout) { /* timed out stream occupying a worker, found */ - ctx->io = io; + ctx->task = task; return 0; } return 1; } -static h2_io *get_timed_out_busy_stream(h2_mplx *m) + +static h2_task *get_timed_out_busy_task(h2_mplx *m) { - io_iter_ctx ctx; + task_iter_ctx ctx; ctx.m = m; - ctx.io = NULL; + ctx.task = NULL; ctx.now = apr_time_now(); - h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx); - return ctx.io; + h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx); + return ctx.task; } -static apr_status_t unschedule_slow_ios(h2_mplx *m) +static apr_status_t unschedule_slow_tasks(h2_mplx *m) { - h2_io *io; + h2_task *task; int n; - if (!m->redo_ios) { - m->redo_ios = h2_io_set_create(m->pool); + if (!m->redo_tasks) { + m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id)); } /* Try to get rid of streams that occupy workers. Look for safe requests * that are repeatable. If none found, fail the connection. */ - n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios)); - while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) { - h2_io_set_add(m->redo_ios, io); - h2_io_rst(io, H2_ERR_CANCEL); + n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks)); + while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) { + h2_task_rst(task, H2_ERR_CANCEL); + h2_ihash_add(m->redo_tasks, task); --n; } - if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) { - io = get_timed_out_busy_stream(m); - if (io) { + if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) { + task = get_timed_out_busy_task(m); + if (task) { /* Too many busy workers, unable to cancel enough streams * and with a busy, timed out stream, we tell the client * to go away... */ @@ -1298,7 +1157,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m) int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { - apr_size_t scount = h2_io_set_size(m->stream_ios); + apr_size_t scount = h2_ihash_count(m->streams); if (scount > 0 && m->workers_busy) { /* If we have streams in connection state 'IDLE', meaning * all streams are ready to sent data out, but lack @@ -1335,7 +1194,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m) } if (m->workers_busy > m->workers_limit) { - status = unschedule_slow_ios(m); + status = unschedule_slow_tasks(m); } } leave_mutex(m, acquired); @@ -1353,11 +1212,12 @@ typedef struct { int streams_updated; } ngn_update_ctx; -static int ngn_update_window(void *ctx, h2_io *io) +static int ngn_update_window(void *ctx, void *val) { ngn_update_ctx *uctx = ctx; - if (io && io->task && io->task->assigned == uctx->ngn - && io_out_consumed_signal(uctx->m, io)) { + h2_task *task = val; + if (task && task->assigned == uctx->ngn + && output_consumed_signal(uctx->m, task)) { ++uctx->streams_updated; } return 1; @@ -1370,7 +1230,7 @@ static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn) ctx.m = m; ctx.ngn = ngn; ctx.streams_updated = 0; - h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx); + h2_ihash_iter(m->tasks, ngn_update_window, &ctx); return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN; } @@ -1392,12 +1252,13 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); - if (!io || io->orphaned) { - status = APR_ECONNABORTED; + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); + + if (stream) { + status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); } else { - status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); + status = APR_ECONNABORTED; } leave_mutex(m, acquired); } @@ -1469,4 +1330,129 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn) } } } - + +/******************************************************************************* + * mplx master events dispatching + ******************************************************************************/ + +static int update_window(void *ctx, void *val) +{ + input_consumed_signal(ctx, val); + return 1; +} + +apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, + stream_ev_callback *on_resume, + stream_ev_callback *on_response, + void *on_ctx) +{ + apr_status_t status; + int acquired; + int streams[32]; + h2_stream *stream; + h2_task *task; + size_t i, n; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld): dispatch events", m->id); + + /* update input windows for streams */ + h2_ihash_iter(m->streams, update_window, m); + + if (on_response && !h2_ihash_empty(m->sready)) { + n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams)); + for (i = 0; i < n; ++i) { + stream = h2_ihash_get(m->streams, streams[i]); + if (!stream) { + continue; + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): on_response", + m->id, stream->id); + task = h2_ihash_get(m->tasks, stream->id); + if (task) { + task->submitted = 1; + if (task->rst_error) { + h2_stream_rst(stream, task->rst_error); + } + else { + AP_DEBUG_ASSERT(task->response); + h2_stream_set_response(stream, task->response, task->output.beam); + } + } + else { + /* We have the stream ready without a task. This happens + * when we fail streams early. A response should already + * be present. */ + AP_DEBUG_ASSERT(stream->response || stream->rst_error); + } + status = on_response(on_ctx, stream->id); + } + } + + if (on_resume && !h2_ihash_empty(m->sresume)) { + n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams)); + for (i = 0; i < n; ++i) { + stream = h2_ihash_get(m->streams, streams[i]); + if (!stream) { + continue; + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): on_resume", + m->id, stream->id); + h2_stream_set_suspended(stream, 0); + status = on_resume(on_ctx, stream->id); + } + } + + leave_mutex(m, acquired); + } + return status; +} + +static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) +{ + h2_mplx *m = ctx; + apr_status_t status; + h2_stream *stream; + int acquired; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + stream = h2_ihash_get(m->streams, beam->id); + if (stream && h2_stream_is_suspended(stream)) { + h2_ihash_add(m->sresume, stream); + h2_beam_on_produced(beam, NULL, NULL); + have_out_data_for(m, beam->id); + } + leave_mutex(m, acquired); + } +} + +apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id) +{ + apr_status_t status; + h2_stream *stream; + h2_task *task; + int acquired; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + stream = h2_ihash_get(m->streams, stream_id); + if (stream) { + h2_stream_set_suspended(stream, 1); + task = h2_ihash_get(m->tasks, stream->id); + if (stream->started && (!task || task->worker_done)) { + h2_ihash_add(m->sresume, stream); + } + else { + /* register callback so that we can resume on new output */ + h2_beam_on_produced(task->output.beam, output_produced, m); + } + } + leave_mutex(m, acquired); + } + return status; +} |
