/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include "h2.h" #include "h2_private.h" #include "h2_mplx.h" #include "h2_task.h" #include "h2_worker.h" #include "h2_workers.h" static int in_list(h2_workers *workers, h2_mplx *m) { h2_mplx *e; for (e = H2_MPLX_LIST_FIRST(&workers->mplxs); e != H2_MPLX_LIST_SENTINEL(&workers->mplxs); e = H2_MPLX_NEXT(e)) { if (e == m) { return 1; } } return 0; } static void cleanup_zombies(h2_workers *workers, int lock) { if (lock) { apr_thread_mutex_lock(workers->lock); } while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) { h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies); H2_WORKER_REMOVE(zombie); ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_workers: cleanup zombie %d", zombie->id); h2_worker_destroy(zombie); } if (lock) { apr_thread_mutex_unlock(workers->lock); } } static h2_task *next_task(h2_workers *workers) { h2_task *task = NULL; h2_mplx *last = NULL; int has_more; /* Get the next h2_mplx to process that has a task to hand out. * If it does, place it at the end of the queu and return the * task to the worker. * If it (currently) has no tasks, remove it so that it needs * to register again for scheduling. * If we run out of h2_mplx in the queue, we need to wait for * new mplx to arrive. Depending on how many workers do exist, * we do a timed wait or block indefinitely. */ while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) { h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs); if (last == m) { break; } H2_MPLX_REMOVE(m); --workers->mplx_count; task = h2_mplx_pop_task(m, &has_more); if (has_more) { H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); ++workers->mplx_count; if (!last) { last = m; } } } return task; } /** * Get the next task for the given worker. Will block until a task arrives * or the max_wait timer expires and more than min workers exist. */ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, h2_task **ptask, int *psticky) { apr_status_t status; apr_time_t wait_until = 0, now; h2_workers *workers = ctx; h2_task *task = NULL; *ptask = NULL; *psticky = 0; status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { ++workers->idle_workers; ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_worker(%d): looking for work", worker->id); while (!h2_worker_is_aborted(worker) && !workers->aborted && !(task = next_task(workers))) { /* Need to wait for a new tasks to arrive. If we are above * minimum workers, we do a timed wait. When timeout occurs * and we have still more workers, we shut down one after * the other. */ cleanup_zombies(workers, 0); if (workers->worker_count > workers->min_workers) { now = apr_time_now(); if (now >= wait_until) { wait_until = now + apr_time_from_sec(workers->max_idle_secs); } ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_worker(%d): waiting signal, " "workers=%d, idle=%d", worker->id, (int)workers->worker_count, workers->idle_workers); status = apr_thread_cond_timedwait(workers->mplx_added, workers->lock, wait_until - now); if (status == APR_TIMEUP && workers->worker_count > workers->min_workers) { /* waited long enough without getting a task and * we are above min workers, abort this one. */ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_workers: aborting idle worker"); h2_worker_abort(worker); break; } } else { ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_worker(%d): waiting signal (eternal), " "worker_count=%d, idle=%d", worker->id, (int)workers->worker_count, workers->idle_workers); apr_thread_cond_wait(workers->mplx_added, workers->lock); } } /* Here, we either have gotten task or decided to shut down * the calling worker. */ if (task) { /* Ok, we got something to give back to the worker for execution. * If we have more idle workers than h2_mplx in our queue, then * we let the worker be sticky, e.g. making it poll the task's * h2_mplx instance for more work before asking back here. * This avoids entering our global lock as long as enough idle * workers remain. Stickiness of a worker ends when the connection * has no new tasks to process, so the worker will get back here * eventually. */ *ptask = task; *psticky = (workers->max_workers >= workers->mplx_count); if (workers->mplx_count && workers->idle_workers > 1) { apr_thread_cond_signal(workers->mplx_added); } } --workers->idle_workers; apr_thread_mutex_unlock(workers->lock); } return *ptask? APR_SUCCESS : APR_EOF; } static void worker_done(h2_worker *worker, void *ctx) { h2_workers *workers = ctx; apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_worker(%d): done", worker->id); H2_WORKER_REMOVE(worker); --workers->worker_count; H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker); apr_thread_mutex_unlock(workers->lock); } } static apr_status_t add_worker(h2_workers *workers) { h2_worker *w = h2_worker_create(workers->next_worker_id++, workers->pool, workers->thread_attr, get_mplx_next, worker_done, workers); if (!w) { return APR_ENOMEM; } ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_workers: adding worker(%d)", w->id); ++workers->worker_count; H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w); return APR_SUCCESS; } static apr_status_t h2_workers_start(h2_workers *workers) { apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_workers: starting"); while (workers->worker_count < workers->min_workers && status == APR_SUCCESS) { status = add_worker(workers); } apr_thread_mutex_unlock(workers->lock); } return status; } h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, int min_workers, int max_workers, apr_size_t max_tx_handles) { apr_status_t status; h2_workers *workers; apr_pool_t *pool; AP_DEBUG_ASSERT(s); AP_DEBUG_ASSERT(server_pool); /* let's have our own pool that will be parent to all h2_worker * instances we create. This happens in various threads, but always * guarded by our lock. Without this pool, all subpool creations would * happen on the pool handed to us, which we do not guard. */ apr_pool_create(&pool, server_pool); apr_pool_tag(pool, "h2_workers"); workers = apr_pcalloc(pool, sizeof(h2_workers)); if (workers) { workers->s = s; workers->pool = pool; workers->min_workers = min_workers; workers->max_workers = max_workers; workers->max_idle_secs = 10; workers->max_tx_handles = max_tx_handles; workers->spare_tx_handles = workers->max_tx_handles; apr_threadattr_create(&workers->thread_attr, workers->pool); if (ap_thread_stacksize != 0) { apr_threadattr_stacksize_set(workers->thread_attr, ap_thread_stacksize); ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s, "h2_workers: using stacksize=%ld", (long)ap_thread_stacksize); } APR_RING_INIT(&workers->workers, h2_worker, link); APR_RING_INIT(&workers->zombies, h2_worker, link); APR_RING_INIT(&workers->mplxs, h2_mplx, link); status = apr_thread_mutex_create(&workers->lock, APR_THREAD_MUTEX_DEFAULT, workers->pool); if (status == APR_SUCCESS) { status = apr_thread_cond_create(&workers->mplx_added, workers->pool); } if (status == APR_SUCCESS) { status = apr_thread_mutex_create(&workers->tx_lock, APR_THREAD_MUTEX_DEFAULT, workers->pool); } if (status == APR_SUCCESS) { status = h2_workers_start(workers); } if (status != APR_SUCCESS) { h2_workers_destroy(workers); workers = NULL; } } return workers; } void h2_workers_destroy(h2_workers *workers) { /* before we go, cleanup any zombie workers that may have accumulated */ cleanup_zombies(workers, 1); if (workers->mplx_added) { apr_thread_cond_destroy(workers->mplx_added); workers->mplx_added = NULL; } if (workers->lock) { apr_thread_mutex_destroy(workers->lock); workers->lock = NULL; } while (!H2_MPLX_LIST_EMPTY(&workers->mplxs)) { h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs); H2_MPLX_REMOVE(m); } while (!H2_WORKER_LIST_EMPTY(&workers->workers)) { h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers); H2_WORKER_REMOVE(w); } if (workers->pool) { apr_pool_destroy(workers->pool); /* workers is gone */ } } apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) { apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s, "h2_workers: register mplx(%ld), idle=%d", m->id, workers->idle_workers); if (in_list(workers, m)) { status = APR_EAGAIN; } else { H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); ++workers->mplx_count; status = APR_SUCCESS; } if (workers->idle_workers > 0) { apr_thread_cond_signal(workers->mplx_added); } else if (status == APR_SUCCESS && workers->worker_count < workers->max_workers) { ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_workers: got %d worker, adding 1", workers->worker_count); add_worker(workers); } apr_thread_mutex_unlock(workers->lock); } return status; } apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m) { apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { status = APR_EAGAIN; if (in_list(workers, m)) { H2_MPLX_REMOVE(m); status = APR_SUCCESS; } apr_thread_mutex_unlock(workers->lock); } return status; } void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs) { if (idle_secs <= 0) { ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s, APLOGNO(02962) "h2_workers: max_worker_idle_sec value of %d" " is not valid, ignored.", idle_secs); return; } workers->max_idle_secs = idle_secs; } apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count) { apr_status_t status = apr_thread_mutex_lock(workers->tx_lock); if (status == APR_SUCCESS) { count = H2MIN(workers->spare_tx_handles, count); workers->spare_tx_handles -= count; ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, "h2_workers: reserved %d tx handles, %d/%d left", (int)count, (int)workers->spare_tx_handles, (int)workers->max_tx_handles); apr_thread_mutex_unlock(workers->tx_lock); return count; } return 0; } void h2_workers_tx_free(h2_workers *workers, apr_size_t count) { apr_status_t status = apr_thread_mutex_lock(workers->tx_lock); if (status == APR_SUCCESS) { workers->spare_tx_handles += count; ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, "h2_workers: freed %d tx handles, %d/%d left", (int)count, (int)workers->spare_tx_handles, (int)workers->max_tx_handles); apr_thread_mutex_unlock(workers->tx_lock); } }