/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#include <apr_atomic.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
#include <apr_time.h>

#include <httpd.h>
#include <http_core.h>
#include <http_log.h>

#include "mod_http2.h"

#include "h2_private.h"
#include "h2_bucket_beam.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
#include "h2_response.h"
#include "h2_mplx.h"
#include "h2_ngn_shed.h"
#include "h2_request.h"
#include "h2_stream.h"
#include "h2_task.h"
#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_util.h"

static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg,
                        conn_rec *c, int level)
{
    if (beam && APLOG_C_IS_LEVEL(c, level)) {
        char buffer[2048];
        apr_size_t off = 0;

        off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
        off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);

        ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s",
                      c->id, id, msg, buffer);
    }
}

/* utility for iterating over ihash task sets */
typedef struct {
    h2_mplx *m;
    h2_task *task;
    apr_time_t now;
} task_iter_ctx;

/* NULL or the mutex held by this thread, used for recursive calls */
static apr_threadkey_t *thread_lock;

apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
    return apr_threadkey_private_create(&thread_lock, NULL, pool);
}

static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
{
    apr_status_t status;
    void *mutex = NULL;

    /* Enter the mutex if this thread already holds the lock or
     * if we can acquire it. Only in the latter case do we unlock
     * on leaving the mutex.
     * This allows recursive entering of the mutex from the same thread,
     * which is what we need in certain situations involving callbacks.
     */
    AP_DEBUG_ASSERT(m);
    apr_threadkey_private_get(&mutex, thread_lock);
    if (mutex == m->lock) {
        *pacquired = 0;
        return APR_SUCCESS;
    }

    AP_DEBUG_ASSERT(m->lock);
    status = apr_thread_mutex_lock(m->lock);
    *pacquired = (status == APR_SUCCESS);
    if (*pacquired) {
        apr_threadkey_private_set(m->lock, thread_lock);
    }
    return status;
}

static void leave_mutex(h2_mplx *m, int acquired)
{
    if (acquired) {
        apr_threadkey_private_set(NULL, thread_lock);
        apr_thread_mutex_unlock(m->lock);
    }
}

static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
{
    leave_mutex(ctx, 1);
}

static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
{
    h2_mplx *m = ctx;
    int acquired;
    apr_status_t status;

    status = enter_mutex(m, &acquired);
    if (status == APR_SUCCESS) {
        pbl->mutex = m->lock;
        pbl->leave = acquired? beam_leave : NULL;
        pbl->leave_ctx = m;
    }
    return status;
}
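/* A minimal usage sketch of the enter_mutex()/leave_mutex() pair above
 * (illustrative only; `do_something_locked` is a hypothetical helper, not
 * part of this file). The thread key makes m->lock re-entrant for the
 * owning thread: a beam callback that fires while we already hold the lock
 * may call enter_mutex() again without deadlocking, and only the outermost
 * caller, the one that saw acquired == 1, unlocks again:
 *
 *     int acquired;
 *     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
 *         do_something_locked(m);    - may re-enter via enter_mutex(m, ...)
 *         leave_mutex(m, acquired);  - unlocks only if acquired here
 *     }
 */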
static void stream_output_consumed(void *ctx, h2_bucket_beam *beam,
                                   apr_off_t length)
{
    h2_task *task = ctx;
    if (length > 0 && task && task->assigned) {
        h2_req_engine_out_consumed(task->assigned, task->c, length);
    }
}

static void stream_input_consumed(void *ctx, h2_bucket_beam *beam,
                                  apr_off_t length)
{
    h2_mplx *m = ctx;
    if (m->input_consumed && length) {
        m->input_consumed(m->input_consumed_ctx, beam->id, length);
    }
}

static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
{
    h2_mplx *m = ctx;
    if (m->tx_handles_reserved > 0) {
        --m->tx_handles_reserved;
        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
                      "h2_mplx(%ld-%d): beaming file %s, tx_avail %d",
                      m->id, beam->id, beam->tag, m->tx_handles_reserved);
        return 1;
    }
    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
                  "h2_mplx(%ld-%d): can_beam_file denied on %s",
                  m->id, beam->id, beam->tag);
    return 0;
}

static void have_out_data_for(h2_mplx *m, int stream_id);
static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);

static void check_tx_reservation(h2_mplx *m)
{
    if (m->tx_handles_reserved <= 0) {
        m->tx_handles_reserved += h2_workers_tx_reserve(m->workers,
            H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
    }
}

static void check_tx_free(h2_mplx *m)
{
    if (m->tx_handles_reserved > m->tx_chunk_size) {
        apr_size_t count = m->tx_handles_reserved - m->tx_chunk_size;
        m->tx_handles_reserved = m->tx_chunk_size;
        h2_workers_tx_free(m->workers, count);
    }
    else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
        h2_workers_tx_free(m->workers, m->tx_handles_reserved);
        m->tx_handles_reserved = 0;
    }
}

static int purge_stream(void *ctx, void *val)
{
    h2_mplx *m = ctx;
    h2_stream *stream = val;
    h2_task *task = h2_ihash_get(m->tasks, stream->id);
    h2_ihash_remove(m->spurge, stream->id);
    h2_stream_destroy(stream);
    if (task) {
        task_destroy(m, task, 1);
    }
    return 0;
}

static void purge_streams(h2_mplx *m)
{
    if (!h2_ihash_empty(m->spurge)) {
        while (!h2_ihash_iter(m->spurge, purge_stream, m)) {
            /* repeat until empty */
        }
        h2_ihash_clear(m->spurge);
    }
}

static void h2_mplx_destroy(h2_mplx *m)
{
    AP_DEBUG_ASSERT(m);
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                  "h2_mplx(%ld): destroy, tasks=%d",
                  m->id, (int)h2_ihash_count(m->tasks));
    check_tx_free(m);
    if (m->pool) {
        apr_pool_destroy(m->pool);
    }
}
/**
 * An h2_mplx needs to be thread-safe, since it will be called by
 * the h2_session thread *and* the h2_worker threads. Therefore:
 * - calls are protected by a mutex lock, m->lock
 * - the pool needs its own allocator, since apr_allocator_t is
 *   not re-entrant. The separate allocator works without a
 *   separate lock since we already protect h2_mplx itself.
 *   Since HTTP/2 connections can be expected to live longer than
 *   their HTTP/1 cousins, the separate allocator seems to work better
 *   than protecting a shared h2_session one with its own lock.
 */
h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
                        const h2_config *conf,
                        apr_interval_time_t stream_timeout,
                        h2_workers *workers)
{
    apr_status_t status = APR_SUCCESS;
    apr_allocator_t *allocator = NULL;
    h2_mplx *m;
    AP_DEBUG_ASSERT(conf);

    status = apr_allocator_create(&allocator);
    if (status != APR_SUCCESS) {
        return NULL;
    }

    m = apr_pcalloc(parent, sizeof(h2_mplx));
    if (m) {
        m->id = c->id;
        APR_RING_ELEM_INIT(m, link);
        m->c = c;
        apr_pool_create_ex(&m->pool, parent, NULL, allocator);
        if (!m->pool) {
            return NULL;
        }
        apr_pool_tag(m->pool, "h2_mplx");
        apr_allocator_owner_set(allocator, m->pool);

        status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
                                         m->pool);
        if (status != APR_SUCCESS) {
            h2_mplx_destroy(m);
            return NULL;
        }

        status = apr_thread_cond_create(&m->task_thawed, m->pool);
        if (status != APR_SUCCESS) {
            h2_mplx_destroy(m);
            return NULL;
        }

        m->bucket_alloc = apr_bucket_alloc_create(m->pool);
        m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
        m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);

        m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
        m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
        m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
        m->q = h2_iq_create(m->pool, m->max_streams);
        m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
        m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
        m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));

        m->stream_timeout = stream_timeout;
        m->workers = workers;
        m->workers_max = workers->max_workers;
        m->workers_def_limit = 4;
        m->workers_limit = m->workers_def_limit;
        m->last_limit_change = m->last_idle_block = apr_time_now();
        m->limit_change_interval = apr_time_from_msec(200);

        m->tx_handles_reserved = 0;
        m->tx_chunk_size = 4;

        m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));

        m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams,
                                         m->stream_max_mem);
        h2_ngn_shed_set_ctx(m->ngn_shed, m);
    }
    return m;
}

apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
{
    int acquired, max_stream_started = 0;

    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
        max_stream_started = m->max_stream_started;
        /* Clear schedule queue, disabling existing streams from starting */
        h2_iq_clear(m->q);
        leave_mutex(m, acquired);
    }
    return max_stream_started;
}
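/* How the pieces fit together, as seen from the master connection (a
 * sketch, not verbatim from h2_session; error handling omitted):
 *
 *     h2_mplx *m = h2_mplx_create(c, session_pool, conf, timeout, workers);
 *     ...                                 - schedule streams, dispatch events
 *     h2_mplx_shutdown(m);                - stop starting new streams
 *     h2_mplx_release_and_join(m, cond);  - wait for workers, then destroy
 *
 * h2_mplx_child_init() must have run once per child process beforehand,
 * since enter_mutex() relies on the thread_lock key created there.
 */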
static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
{
    if (stream->input && stream->started) {
        h2_beam_send(stream->input, NULL, 0); /* trigger updates */
    }
}

static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
    if (task->output.beam && task->worker_started && task->assigned) {
        /* trigger updates */
        h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
    }
    return 0;
}

static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
{
    conn_rec *slave = NULL;
    int reuse_slave = 0;
    apr_status_t status;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
                  "h2_task(%s): destroy", task->id);
    if (called_from_master) {
        /* Process outstanding events before destruction */
        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
        if (stream) {
            input_consumed_signal(m, stream);
        }
    }

    /* The pool is cleared/destroyed which also closes all
     * allocated file handles. Give this count back to our
     * file handle pool. */
    if (task->output.beam) {
        m->tx_handles_reserved +=
            h2_beam_get_files_beamed(task->output.beam);
        h2_beam_on_produced(task->output.beam, NULL, NULL);
        status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ, 1);
        if (status != APR_SUCCESS) {
            ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c,
                          APLOGNO(03385) "h2_task(%s): output shutdown "
                          "incomplete", task->id);
        }
    }

    slave = task->c;
    reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
                   && !task->rst_error);

    h2_ihash_remove(m->tasks, task->stream_id);
    if (m->redo_tasks) {
        h2_ihash_remove(m->redo_tasks, task->stream_id);
    }
    h2_task_destroy(task);

    if (slave) {
        if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
            APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
        }
        else {
            slave->sbh = NULL;
            h2_slave_destroy(slave, NULL);
        }
    }

    check_tx_free(m);
}
static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
{
    h2_task *task;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
                  "h2_stream(%ld-%d): done", m->c->id, stream->id);
    /* Situation: we are, on the master connection, done with processing
     * the stream. Either we have handled it successfully, or the stream
     * was reset by the client, or the connection is gone and we are
     * shutting down the whole session.
     *
     * We possibly have created a task for this stream to be processed
     * on a slave connection. The processing might actually be ongoing
     * right now or has already finished. A finished task waits for its
     * stream to be done. This is the common case.
     *
     * If the stream had input (e.g. the request had a body), a task
     * may have read, or is still reading, buckets from the input beam.
     * This means that the task is referencing memory from the stream's
     * pool (or the master connection bucket alloc). Before we can free
     * the stream pool, we need to make sure that those references are
     * gone. This is what h2_beam_shutdown() on the input waits for.
     *
     * With the input handled, we can tear down that beam and care
     * about the output beam. The stream might still have buffered some
     * buckets read from the output, so we need to get rid of those. That
     * is done by h2_stream_cleanup().
     *
     * Now it is safe to destroy the task (if it exists and is finished).
     *
     * FIXME: we currently destroy the stream, even if the task is still
     * ongoing. This is not ok, since task->request is coming from stream
     * memory. We should either copy it on task creation or wait with the
     * stream destruction until the task is done.
     */
    h2_iq_remove(m->q, stream->id);
    h2_ihash_remove(m->sready, stream->id);
    h2_ihash_remove(m->sresume, stream->id);
    h2_ihash_remove(m->streams, stream->id);
    if (stream->input) {
        m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
        h2_beam_on_consumed(stream->input, NULL, NULL);
        /* Let anyone blocked reading know that there is no more to come */
        h2_beam_abort(stream->input);
        /* Remove mutex after, so that abort still finds cond to signal */
        h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
    }
    h2_stream_cleanup(stream);

    task = h2_ihash_get(m->tasks, stream->id);
    if (task) {
        if (!task->worker_done) {
            /* task still running, cleanup once it is done */
            if (rst_error) {
                h2_task_rst(task, rst_error);
            }
            h2_ihash_add(m->shold, stream);
            return;
        }
        else {
            /* already finished */
            task_destroy(m, task, 0);
        }
    }
    h2_stream_destroy(stream);
}

static int stream_done_iter(void *ctx, void *val)
{
    stream_done((h2_mplx*)ctx, val, 0);
    return 0;
}

static int task_print(void *ctx, void *val)
{
    h2_mplx *m = ctx;
    h2_task *task = val;

    if (task && task->request) {
        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);

        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
                      "->03198: h2_stream(%s): %s %s %s -> %s %d"
                      "[orph=%d/started=%d/done=%d]",
                      task->id, task->request->method,
                      task->request->authority, task->request->path,
                      task->response? "http" : (task->rst_error? "reset" : "?"),
                      task->response? task->response->http_status : task->rst_error,
                      (stream? 0 : 1), task->worker_started,
                      task->worker_done);
    }
    else if (task) {
        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
                      "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
    }
    else {
        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
                      "->03198: h2_stream(%ld-NULL): NULL", m->id);
    }
    return 1;
}

static int task_abort_connection(void *ctx, void *val)
{
    h2_task *task = val;
    if (task->c) {
        task->c->aborted = 1;
    }
    if (task->input.beam) {
        h2_beam_abort(task->input.beam);
    }
    if (task->output.beam) {
        h2_beam_abort(task->output.beam);
    }
    return 1;
}

static int report_stream_iter(void *ctx, void *val)
{
    h2_mplx *m = ctx;
    h2_stream *stream = val;
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                  "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
                  "submitted=%d, suspended=%d",
                  m->id, stream->id, stream->started, stream->scheduled,
                  stream->submitted, stream->suspended);
    return 1;
}
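/* The stream sets above form a small state machine; a summary of how a
 * stream moves between them (derived from stream_done() and task_done(),
 * worded here for orientation, not an addition to the logic):
 *
 *     m->streams  - all streams the master connection still processes
 *     m->shold    - stream is done, but its task still runs on a worker;
 *                   parked until the worker reports task_done()
 *     m->spurge   - stream and task both finished; destroyed later by
 *                   purge_streams() on the master thread, since pools may
 *                   only be destroyed from the thread that uses them
 */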
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
    apr_status_t status;
    int acquired;

    h2_workers_unregister(m->workers, m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        int i, wait_secs = 5;

        if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                          "h2_mplx(%ld): release_join with %d streams open, "
                          "%d streams resume, %d streams ready, %d tasks",
                          m->id, (int)h2_ihash_count(m->streams),
                          (int)h2_ihash_count(m->sresume),
                          (int)h2_ihash_count(m->sready),
                          (int)h2_ihash_count(m->tasks));
            h2_ihash_iter(m->streams, report_stream_iter, m);
        }

        /* disable WINDOW_UPDATE callbacks */
        h2_mplx_set_consumed_cb(m, NULL, NULL);

        if (!h2_ihash_empty(m->shold)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                          "h2_mplx(%ld): start release_join with %d streams in hold",
                          m->id, (int)h2_ihash_count(m->shold));
        }
        if (!h2_ihash_empty(m->spurge)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                          "h2_mplx(%ld): start release_join with %d streams to purge",
                          m->id, (int)h2_ihash_count(m->spurge));
        }

        h2_iq_clear(m->q);
        apr_thread_cond_broadcast(m->task_thawed);
        while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
            /* iterate until all streams have been removed */
        }
        AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));

        if (!h2_ihash_empty(m->shold)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                          "h2_mplx(%ld): 2. release_join with %d streams in hold",
                          m->id, (int)h2_ihash_count(m->shold));
        }
        if (!h2_ihash_empty(m->spurge)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                          "h2_mplx(%ld): 2. release_join with %d streams to purge",
                          m->id, (int)h2_ihash_count(m->spurge));
        }

        /* If we still have busy workers, we cannot release our memory
         * pool yet, as tasks have references to us.
         * Any operation on the task slave connection will from now on
         * fail with ECONNRESET/ABORTED, so processing them should fail
         * and workers *should* return in a timely fashion. */
        for (i = 0; m->workers_busy > 0; ++i) {
            h2_ihash_iter(m->tasks, task_abort_connection, m);

            m->join_wait = wait;
            status = apr_thread_cond_timedwait(wait, m->lock,
                                               apr_time_from_sec(wait_secs));

            if (APR_STATUS_IS_TIMEUP(status)) {
                if (i > 0) {
                    /* Uh oh. We are still waiting for assigned workers to
                     * report that they are done. Unless we have a bug, a
                     * worker seems to be hanging. If we exit now, all will
                     * be deallocated and the worker, once it does return,
                     * will walk all over freed memory... */
                    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
                                  "h2_mplx(%ld): release, waiting for %d seconds now for "
                                  "%d h2_workers to return, have still %d tasks outstanding",
                                  m->id, i*wait_secs, m->workers_busy,
                                  (int)h2_ihash_count(m->tasks));
                    if (i == 1) {
                        h2_ihash_iter(m->tasks, task_print, m);
                    }
                }
                h2_mplx_abort(m);
                apr_thread_cond_broadcast(m->task_thawed);
            }
        }

        AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
        if (!h2_ihash_empty(m->spurge)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                          "h2_mplx(%ld): 3. release_join %d streams to purge",
                          m->id, (int)h2_ihash_count(m->spurge));
            purge_streams(m);
        }
        AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));

        if (!h2_ihash_empty(m->tasks)) {
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
                          "h2_mplx(%ld): release_join -> destroy, "
                          "%d tasks still present",
                          m->id, (int)h2_ihash_count(m->tasks));
        }
        leave_mutex(m, acquired);
        h2_mplx_destroy(m);
        /* all gone */
    }
    return status;
}
void h2_mplx_abort(h2_mplx *m)
{
    int acquired;

    AP_DEBUG_ASSERT(m);
    if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
        m->aborted = 1;
        h2_ngn_shed_abort(m->ngn_shed);
        leave_mutex(m, acquired);
    }
}

apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
{
    apr_status_t status = APR_SUCCESS;
    int acquired;

    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                      "h2_mplx(%ld-%d): marking stream as done.",
                      m->id, stream->id);
        stream_done(m, stream, stream->rst_error);
        purge_streams(m);
        leave_mutex(m, acquired);
    }
    return status;
}

void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
{
    m->input_consumed = cb;
    m->input_consumed_ctx = ctx;
}

static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
    apr_status_t status = APR_SUCCESS;
    h2_task *task = h2_ihash_get(m->tasks, stream_id);
    h2_stream *stream = h2_ihash_get(m->streams, stream_id);

    if (!task || !stream) {
        return APR_ECONNABORTED;
    }

    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                  "h2_mplx(%s): open response: %d, rst=%d",
                  task->id, response->http_status, response->rst_error);

    h2_task_set_response(task, response);

    if (task->output.beam) {
        h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
        h2_beam_timeout_set(task->output.beam, m->stream_timeout);
        h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
        m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
        h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
        h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
    }

    h2_ihash_add(m->sready, stream);
    if (response && response->http_status < 300) {
        /* we might see some file buckets in the output, see
         * if we have enough handles reserved. */
        check_tx_reservation(m);
    }
    have_out_data_for(m, stream_id);
    return status;
}

apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
{
    apr_status_t status;
    int acquired;

    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        if (m->aborted) {
            status = APR_ECONNABORTED;
        }
        else {
            status = out_open(m, stream_id, response);
        }
        leave_mutex(m, acquired);
    }
    return status;
}
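/* Sketch of the intended call, as seen from the worker side (hedged: the
 * actual caller lives elsewhere, e.g. in h2_task, not in this file). Once
 * a worker has produced response headers for a stream, it announces them
 * to the master connection like this:
 *
 *     h2_response *response = ...;   - built from the slave's response
 *     status = h2_mplx_out_open(m, stream_id, response);
 *
 * which files the stream under m->sready; the next call to
 * h2_mplx_dispatch_master_events() then fires the on_response callback.
 */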
static apr_status_t out_close(h2_mplx *m, h2_task *task)
{
    apr_status_t status = APR_SUCCESS;
    h2_stream *stream;

    if (!task) {
        return APR_ECONNABORTED;
    }

    stream = h2_ihash_get(m->streams, task->stream_id);
    if (!stream) {
        return APR_ECONNABORTED;
    }

    if (!task->response && !task->rst_error) {
        /* In case a close comes before a response was created,
         * insert an error one so that our streams can properly reset. */
        h2_response *r = h2_response_die(task->stream_id, 500,
                                         task->request, m->pool);
        status = out_open(m, task->stream_id, r);
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, APLOGNO(03393)
                      "h2_mplx(%s): close, no response, no rst", task->id);
    }
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
                  "h2_mplx(%s): close", task->id);
    if (task->output.beam) {
        status = h2_beam_close(task->output.beam);
        h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
                    APLOG_TRACE2);
    }
    output_consumed_signal(m, task);
    have_out_data_for(m, task->stream_id);
    return status;
}

apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                 apr_thread_cond_t *iowait)
{
    apr_status_t status;
    int acquired;

    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        if (m->aborted) {
            status = APR_ECONNABORTED;
        }
        else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
            status = APR_SUCCESS;
        }
        else {
            purge_streams(m);
            m->added_output = iowait;
            status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
            if (APLOGctrace2(m->c)) {
                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                              "h2_mplx(%ld): trywait on data for %f ms",
                              m->id, timeout/1000.0);
            }
            m->added_output = NULL;
        }
        leave_mutex(m, acquired);
    }
    return status;
}

static void have_out_data_for(h2_mplx *m, int stream_id)
{
    (void)stream_id;
    AP_DEBUG_ASSERT(m);
    if (m->added_output) {
        apr_thread_cond_signal(m->added_output);
    }
}

apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
    apr_status_t status;
    int acquired;

    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        if (m->aborted) {
            status = APR_ECONNABORTED;
        }
        else {
            h2_iq_sort(m->q, cmp, ctx);
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                          "h2_mplx(%ld): reprioritize tasks", m->id);
        }
        leave_mutex(m, acquired);
    }
    return status;
}

apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
                             h2_stream_pri_cmp *cmp, void *ctx)
{
    apr_status_t status;
    int do_registration = 0;
    int acquired;

    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        if (m->aborted) {
            status = APR_ECONNABORTED;
        }
        else {
            h2_ihash_add(m->streams, stream);
            if (stream->response) {
                /* already have a response, schedule for submit */
                h2_ihash_add(m->sready, stream);
            }
            else {
                h2_beam_create(&stream->input, stream->pool, stream->id,
                               "input", 0);
                if (!m->need_registration) {
                    m->need_registration = h2_iq_empty(m->q);
                }
                if (m->workers_busy < m->workers_max) {
                    do_registration = m->need_registration;
                }
                h2_iq_add(m->q, stream->id, cmp, ctx);

                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
                              "h2_mplx(%ld-%d): process, body=%d",
                              m->c->id, stream->id, stream->request->body);
            }
        }
        leave_mutex(m, acquired);
    }
    if (do_registration) {
        m->need_registration = 0;
        h2_workers_register(m->workers, m);
    }
    return status;
}
static h2_task *pop_task(h2_mplx *m)
{
    h2_task *task = NULL;
    h2_stream *stream;
    int sid;

    while (!m->aborted && !task && (m->workers_busy < m->workers_limit)
           && (sid = h2_iq_shift(m->q)) > 0) {

        stream = h2_ihash_get(m->streams, sid);
        if (stream) {
            conn_rec *slave, **pslave;
            int new_conn = 0;

            pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
            if (pslave) {
                slave = *pslave;
            }
            else {
                slave = h2_slave_create(m->c, m->pool, NULL);
                new_conn = 1;
            }

            slave->sbh = m->c->sbh;
            slave->aborted = 0;
            task = h2_task_create(slave, stream->request, stream->input, m);
            h2_ihash_add(m->tasks, task);

            m->c->keepalives++;
            apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
            if (new_conn) {
                h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
            }
            stream->started = 1;
            task->worker_started = 1;
            task->started_at = apr_time_now();
            if (sid > m->max_stream_started) {
                m->max_stream_started = sid;
            }

            if (stream->input) {
                h2_beam_timeout_set(stream->input, m->stream_timeout);
                h2_beam_on_consumed(stream->input, stream_input_consumed, m);
                h2_beam_on_file_beam(stream->input, can_beam_file, m);
                h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
            }

            ++m->workers_busy;
        }
    }
    return task;
}

h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
{
    h2_task *task = NULL;
    apr_status_t status;
    int acquired;

    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        if (m->aborted) {
            *has_more = 0;
        }
        else {
            task = pop_task(m);
            *has_more = !h2_iq_empty(m->q);
        }

        if (has_more && !task) {
            m->need_registration = 1;
        }
        leave_mutex(m, acquired);
    }
    return task;
}
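/* The worker side of the contract above, sketched (the real loop lives in
 * the h2_worker code; this is a simplified illustration, not a copy):
 *
 *     int has_more = 0;
 *     h2_task *task = h2_mplx_pop_task(m, &has_more);
 *     while (task) {
 *         ...                                 - process on slave connection
 *         h2_mplx_task_done(m, task, &task);  - report done, get next task
 *     }
 *
 * Passing a non-NULL ptask to h2_mplx_task_done() lets a worker pick up
 * its next task under the same mutex acquisition.
 */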
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
    if (task->frozen) {
        /* this task was handed over to an engine for processing
         * and the original worker has finished. That means the
         * engine may start processing now. */
        h2_task_thaw(task);
        /* we do not want the task to block on writing response
         * bodies into the mplx. */
        h2_task_set_io_blocking(task, 0);
        apr_thread_cond_broadcast(m->task_thawed);
        return;
    }
    else {
        h2_stream *stream;

        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                      "h2_mplx(%ld): task(%s) done", m->id, task->id);
        out_close(m, task);
        stream = h2_ihash_get(m->streams, task->stream_id);

        if (ngn) {
            apr_off_t bytes = 0;
            if (task->output.beam) {
                h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
                bytes += h2_beam_get_buffered(task->output.beam);
            }
            if (bytes > 0) {
                /* we need to report consumed and current buffered output
                 * to the engine. The request will be streamed out or
                 * cancelled, no more data is coming from it and the engine
                 * should update its calculations before we destroy this
                 * information. */
                h2_req_engine_out_consumed(ngn, task->c, bytes);
            }
        }

        if (task->engine) {
            if (!h2_req_engine_is_shutdown(task->engine)) {
                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
                              "h2_mplx(%ld): task(%s) has not-shutdown "
                              "engine(%s)", m->id, task->id,
                              h2_req_engine_get_id(task->engine));
            }
            h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
        }

        if (!m->aborted && stream && m->redo_tasks
            && h2_ihash_get(m->redo_tasks, task->stream_id)) {
            /* reset and schedule again */
            h2_task_redo(task);
            h2_ihash_remove(m->redo_tasks, task->stream_id);
            h2_iq_add(m->q, task->stream_id, NULL, NULL);
            return;
        }

        task->worker_done = 1;
        task->done_at = apr_time_now();
        if (task->output.beam) {
            h2_beam_on_consumed(task->output.beam, NULL, NULL);
            h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
        }
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                      "h2_mplx(%s): request done, %f ms elapsed", task->id,
                      (task->done_at - task->started_at) / 1000.0);
        if (task->started_at > m->last_idle_block) {
            /* this task finished without causing an 'idle block', e.g.
             * a block by flow control. */
            if (task->done_at - m->last_limit_change >= m->limit_change_interval
                && m->workers_limit < m->workers_max) {
                /* Well behaving stream, allow it more workers */
                m->workers_limit = H2MIN(m->workers_limit * 2,
                                         m->workers_max);
                m->last_limit_change = task->done_at;
                m->need_registration = 1;
                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                              "h2_mplx(%ld): increase worker limit to %d",
                              m->id, m->workers_limit);
            }
        }

        if (stream) {
            /* hang around until the stream deregisters */
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                          "h2_mplx(%s): task_done, stream still open",
                          task->id);
            if (h2_stream_is_suspended(stream)) {
                /* more data will not arrive, resume the stream */
                h2_ihash_add(m->sresume, stream);
                have_out_data_for(m, stream->id);
            }
        }
        else {
            /* stream no longer active, was it placed in hold? */
            stream = h2_ihash_get(m->shold, task->stream_id);
            if (stream) {
                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                              "h2_mplx(%s): task_done, stream in hold",
                              task->id);
                /* We cannot destroy the stream here since this is
                 * called from a worker thread and freeing memory pools
                 * is only safe from the single thread using them (and
                 * their parent pool / allocator) */
                h2_ihash_remove(m->shold, stream->id);
                h2_ihash_add(m->spurge, stream);
            }
            else {
                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                              "h2_mplx(%s): task_done, stream not found",
                              task->id);
                task_destroy(m, task, 0);
            }

            if (m->join_wait) {
                apr_thread_cond_signal(m->join_wait);
            }
        }
    }
}

void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
    int acquired;

    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
        task_done(m, task, NULL);
        --m->workers_busy;
        if (ptask) {
            /* caller wants another task */
            *ptask = pop_task(m);
        }
        leave_mutex(m, acquired);
    }
}

/*******************************************************************************
 * h2_mplx DoS protection
 ******************************************************************************/

static int latest_repeatable_unsubmitted_iter(void *data, void *val)
{
    task_iter_ctx *ctx = data;
    h2_task *task = val;
    if (!task->worker_done && h2_task_can_redo(task)
        && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
        /* this task occupies a worker, its response has not been submitted
         * yet, it has not been cancelled and it is a repeatable request
         * -> it can be re-scheduled later */
        if (!ctx->task || ctx->task->started_at < task->started_at) {
            /* we did not have one yet or this one was started later */
            ctx->task = task;
        }
    }
    return 1;
}

static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m)
{
    task_iter_ctx ctx;
    ctx.m = m;
    ctx.task = NULL;
    h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
    return ctx.task;
}

static int timed_out_busy_iter(void *data, void *val)
{
    task_iter_ctx *ctx = data;
    h2_task *task = val;
    if (!task->worker_done
        && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
        /* timed out stream occupying a worker, found */
        ctx->task = task;
        return 0;
    }
    return 1;
}

static h2_task *get_timed_out_busy_task(h2_mplx *m)
{
    task_iter_ctx ctx;
    ctx.m = m;
    ctx.task = NULL;
    ctx.now = apr_time_now();
    h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
    return ctx.task;
}

static apr_status_t unschedule_slow_tasks(h2_mplx *m)
{
    h2_task *task;
    int n;

    if (!m->redo_tasks) {
        m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
    }
    /* Try to get rid of streams that occupy workers. Look for safe requests
     * that are repeatable. If none found, fail the connection. */
    n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
    while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
        h2_task_rst(task, H2_ERR_CANCEL);
        h2_ihash_add(m->redo_tasks, task);
        --n;
    }

    if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
        task = get_timed_out_busy_task(m);
        if (task) {
            /* Too many busy workers, unable to cancel enough streams
             * and with a busy, timed out stream, we tell the client
             * to go away... */
            return APR_TIMEUP;
        }
    }
    return APR_SUCCESS;
}
apr_status_t h2_mplx_idle(h2_mplx *m)
{
    apr_status_t status = APR_SUCCESS;
    apr_time_t now;
    int acquired;

    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
        apr_size_t scount = h2_ihash_count(m->streams);
        if (scount > 0 && m->workers_busy) {
            /* We have streams in connection state 'IDLE': all of them are
             * ready to send data out, but lack WINDOW_UPDATEs.
             *
             * This is ok, unless we have streams that still occupy
             * h2 workers. As worker threads are a scarce resource,
             * we need to take measures so that we do not get DoSed.
             *
             * This is what we call an 'idle block'. Limit the amount of
             * busy workers we allow for this connection until it
             * behaves well again. */
            now = apr_time_now();
            m->last_idle_block = now;
            if (m->workers_limit > 2
                && now - m->last_limit_change >= m->limit_change_interval) {
                if (m->workers_limit > 16) {
                    m->workers_limit = 16;
                }
                else if (m->workers_limit > 8) {
                    m->workers_limit = 8;
                }
                else if (m->workers_limit > 4) {
                    m->workers_limit = 4;
                }
                else if (m->workers_limit > 2) {
                    m->workers_limit = 2;
                }
                m->last_limit_change = now;
                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                              "h2_mplx(%ld): decrease worker limit to %d",
                              m->id, m->workers_limit);
            }

            if (m->workers_busy > m->workers_limit) {
                status = unschedule_slow_tasks(m);
            }
        }
        leave_mutex(m, acquired);
    }
    return status;
}
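/* A worked example of the limit dynamics above, assuming workers_max = 25
 * (the number 25 is illustrative; the tier boundaries 16/8/4/2 and the
 * doubling come from h2_mplx_idle() and task_done() respectively):
 *
 *     start          workers_limit = 4    (workers_def_limit)
 *     well-behaved   4 -> 8 -> 16 -> 25   (doubled per interval by
 *                                          task_done(), capped at
 *                                          workers_max)
 *     idle-blocked   25 -> 16 -> 8 -> 4 -> 2  (one tier per interval)
 *
 * Each change is rate-limited by limit_change_interval (200 ms), so a
 * connection oscillates slowly between trust and throttling.
 */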
/*******************************************************************************
 * HTTP/2 request engines
 ******************************************************************************/

typedef struct {
    h2_mplx *m;
    h2_req_engine *ngn;
    int streams_updated;
} ngn_update_ctx;

static int ngn_update_window(void *ctx, void *val)
{
    ngn_update_ctx *uctx = ctx;
    h2_task *task = val;
    if (task && task->assigned == uctx->ngn
        && output_consumed_signal(uctx->m, task)) {
        ++uctx->streams_updated;
    }
    return 1;
}

static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
{
    ngn_update_ctx ctx;

    ctx.m = m;
    ctx.ngn = ngn;
    ctx.streams_updated = 0;
    h2_ihash_iter(m->tasks, ngn_update_window, &ctx);

    return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}

apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
                                     request_rec *r,
                                     http2_req_engine_init *einit)
{
    apr_status_t status;
    h2_mplx *m;
    h2_task *task;
    int acquired;

    task = h2_ctx_rget_task(r);
    if (!task) {
        return APR_ECONNABORTED;
    }
    m = task->mplx;
    task->r = r;

    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);

        if (stream) {
            status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
        }
        else {
            status = APR_ECONNABORTED;
        }
        leave_mutex(m, acquired);
    }
    return status;
}

apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
                                     apr_read_type_e block,
                                     apr_uint32_t capacity,
                                     request_rec **pr)
{
    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
    apr_status_t status;
    h2_task *task = NULL;
    int acquired;

    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        int want_shutdown = (block == APR_BLOCK_READ);

        /* Take this opportunity to update output consumption
         * for this engine */
        ngn_out_update_windows(m, ngn);

        if (want_shutdown && !h2_iq_empty(m->q)) {
            /* For a blocking read, check first whether requests are
             * available and, if not, wait a short while before attempting
             * the blocking (and, on no success, shutting down) read. */
            status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
            if (APR_STATUS_IS_EAGAIN(status)) {
                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                              "h2_mplx(%ld): start block engine pull", m->id);
                apr_thread_cond_timedwait(m->task_thawed, m->lock,
                                          apr_time_from_msec(20));
                status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
            }
        }
        else {
            status = h2_ngn_shed_pull_task(shed, ngn, capacity,
                                           want_shutdown, &task);
        }
        leave_mutex(m, acquired);
    }
    *pr = task? task->r : NULL;
    return status;
}

void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
{
    h2_task *task = h2_ctx_cget_task(r_conn);

    if (task) {
        h2_mplx *m = task->mplx;
        int acquired;

        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
            /* Take this opportunity to update output consumption
             * for this engine */
            ngn_out_update_windows(m, ngn);
            h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
            if (task->engine) {
                /* cannot report it as done until the engine returns */
            }
            else {
                task_done(m, task, ngn);
            }
            leave_mutex(m, acquired);
        }
    }
}

/*******************************************************************************
 * mplx master events dispatching
 ******************************************************************************/

static int update_window(void *ctx, void *val)
{
    input_consumed_signal(ctx, val);
    return 1;
}

apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                                            stream_ev_callback *on_resume,
                                            stream_ev_callback *on_response,
                                            void *on_ctx)
{
    apr_status_t status;
    int acquired;
    int streams[32];
    h2_stream *stream;
    h2_task *task;
    size_t i, n;

    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
                      "h2_mplx(%ld): dispatch events", m->id);

        /* update input windows for streams */
        h2_ihash_iter(m->streams, update_window, m);

        if (on_response && !h2_ihash_empty(m->sready)) {
            n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
            for (i = 0; i < n; ++i) {
                stream = h2_ihash_get(m->streams, streams[i]);
                if (!stream) {
                    continue;
                }
                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
                              "h2_mplx(%ld-%d): on_response",
                              m->id, stream->id);
                task = h2_ihash_get(m->tasks, stream->id);
                if (task) {
                    task->submitted = 1;
                    if (task->rst_error) {
                        h2_stream_rst(stream, task->rst_error);
                    }
                    else {
                        AP_DEBUG_ASSERT(task->response);
                        h2_stream_set_response(stream, task->response,
                                               task->output.beam);
                    }
                }
                else {
                    /* We have the stream ready without a task. This happens
                     * when we fail streams early. A response should already
                     * be present. */
                    AP_DEBUG_ASSERT(stream->response || stream->rst_error);
                }
                status = on_response(on_ctx, stream->id);
            }
        }

        if (on_resume && !h2_ihash_empty(m->sresume)) {
            n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
            for (i = 0; i < n; ++i) {
                stream = h2_ihash_get(m->streams, streams[i]);
                if (!stream) {
                    continue;
                }
                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
                              "h2_mplx(%ld-%d): on_resume",
                              m->id, stream->id);
                h2_stream_set_suspended(stream, 0);
                status = on_resume(on_ctx, stream->id);
            }
        }

        leave_mutex(m, acquired);
    }
    return status;
}
static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
    h2_mplx *m = ctx;
    apr_status_t status;
    h2_stream *stream;
    int acquired;

    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        stream = h2_ihash_get(m->streams, beam->id);
        if (stream && h2_stream_is_suspended(stream)) {
            h2_ihash_add(m->sresume, stream);
            h2_beam_on_produced(beam, NULL, NULL);
            have_out_data_for(m, beam->id);
        }
        leave_mutex(m, acquired);
    }
}

apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
{
    apr_status_t status;
    h2_stream *stream;
    h2_task *task;
    int acquired;

    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        stream = h2_ihash_get(m->streams, stream_id);
        if (stream) {
            h2_stream_set_suspended(stream, 1);
            task = h2_ihash_get(m->tasks, stream->id);
            if (stream->started && (!task || task->worker_done)) {
                h2_ihash_add(m->sresume, stream);
            }
            else {
                /* register callback so that we can resume on new output */
                h2_beam_on_produced(task->output.beam, output_produced, m);
            }
        }
        leave_mutex(m, acquired);
    }
    return status;
}
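/* How the master connection consumes the event sets, sketched (the real
 * caller is the h2_session event loop; the callback names here are
 * illustrative, not from this file):
 *
 *     static apr_status_t on_stream_response(void *ctx, int stream_id);
 *     static apr_status_t on_stream_resume(void *ctx, int stream_id);
 *
 *     status = h2_mplx_dispatch_master_events(m, on_stream_resume,
 *                                             on_stream_response, session);
 *
 * on_response fires once per stream shifted out of m->sready (submitting
 * the response headers), on_resume once per stream shifted out of
 * m->sresume (restarting output after a suspend).
 */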