diff options
Diffstat (limited to 'modules/http2/h2_mplx.h')
-rw-r--r-- | modules/http2/h2_mplx.h | 121 |
1 files changed, 28 insertions, 93 deletions
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 40298476..821e6d65 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -37,21 +37,20 @@ struct apr_pool_t; struct apr_thread_mutex_t; struct apr_thread_cond_t; +struct h2_bucket_beam; struct h2_config; struct h2_ihash_t; struct h2_response; struct h2_task; struct h2_stream; struct h2_request; -struct h2_io_set; struct apr_thread_cond_t; struct h2_workers; -struct h2_int_queue; +struct h2_iqueue; struct h2_ngn_shed; struct h2_req_engine; #include <apr_queue.h> -#include "h2_io.h" typedef struct h2_mplx h2_mplx; @@ -72,10 +71,16 @@ struct h2_mplx { unsigned int aborted : 1; unsigned int need_registration : 1; - struct h2_int_queue *q; - struct h2_io_set *stream_ios; - struct h2_io_set *ready_ios; - struct h2_io_set *redo_ios; + struct h2_ihash_t *streams; /* all streams currently processing */ + struct h2_ihash_t *shold; /* all streams done with task ongoing */ + struct h2_ihash_t *spurge; /* all streams done, ready for destroy */ + + struct h2_iqueue *q; /* all stream ids that need to be started */ + struct h2_ihash_t *sready; /* all streams ready for response */ + struct h2_ihash_t *sresume; /* all streams that can be resumed */ + + struct h2_ihash_t *tasks; /* all tasks started and not destroyed */ + struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */ apr_uint32_t max_streams; /* max # of concurrent streams */ apr_uint32_t max_stream_started; /* highest stream id that started processing */ @@ -96,10 +101,11 @@ struct h2_mplx { apr_size_t stream_max_mem; apr_interval_time_t stream_timeout; + apr_pool_t *spare_io_pool; apr_array_header_t *spare_slaves; /* spare slave connections */ struct h2_workers *workers; - apr_size_t tx_handles_reserved; + int tx_handles_reserved; apr_size_t tx_chunk_size; h2_mplx_consumed_cb *input_consumed; @@ -160,15 +166,11 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m); * Notifies mplx that a stream has finished processing. * * @param m the mplx itself - * @param stream_id the id of the stream being done + * @param stream the id of the stream being done * @param rst_error if != 0, the stream was reset with the error given * */ -apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error); - -/* Return != 0 iff the multiplexer has output data for the given stream. - */ -int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id); +apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream); /** * Waits on output data from any stream in this session to become available. @@ -185,13 +187,12 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, * Process a stream request. * * @param m the multiplexer - * @param stream_id the identifier of the stream + * @param stream the identifier of the stream * @param r the request to be processed * @param cmp the stream priority compare function * @param ctx context data for the compare function */ -apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, - const struct h2_request *r, +apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_stream_pri_cmp *cmp, void *ctx); /** @@ -214,96 +215,30 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) */ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx); -/******************************************************************************* - * Input handling of streams. - ******************************************************************************/ -/** - * Reads a buckets for the given stream_id. Will return ARP_EAGAIN when - * called with APR_NONBLOCK_READ and no data present. Will return APR_EOF - * when the end of the stream input has been reached. - * The condition passed in will be used for blocking/signalling and will - * be protected by the mplx's own mutex. - */ -apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, - int stream_id, apr_bucket_brigade *bb, - apr_table_t *trailers, - struct apr_thread_cond_t *iowait); - -/** - * Appends data to the input of the given stream. Storage of input data is - * not subject to flow control. - */ -apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, - const char *data, apr_size_t len, int eos); +typedef apr_status_t stream_ev_callback(void *ctx, int stream_id); /** - * Closes the input for the given stream_id. + * Dispatch events for the master connection, such as + * - resume: new output data has arrived for a suspended stream + * - response: the response for a stream is ready */ -apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id); +apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, + stream_ev_callback *on_resume, + stream_ev_callback *on_response, + void *ctx); -/** - * Invoke the consumed callback for all streams that had bytes read since the - * last call to this function. If no stream had input data consumed, the - * callback is not invoked. - * The consumed callback may also be invoked at other times whenever - * the need arises. - * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update - * happened. - */ -apr_status_t h2_mplx_in_update_windows(h2_mplx *m); +apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id); /******************************************************************************* * Output handling of streams. ******************************************************************************/ /** - * Get a stream whose response is ready for submit. Will set response and - * any out data available in stream. - * @param m the mplxer to get a response from - * @param bb the brigade to place any existing repsonse body data into - */ -struct h2_stream *h2_mplx_next_submit(h2_mplx *m, - struct h2_ihash_t *streams); - -/** - * Reads output data into the given brigade. Will never block, but - * return APR_EAGAIN until data arrives or the stream is closed. - */ -apr_status_t h2_mplx_out_get_brigade(h2_mplx *mplx, int stream_id, - apr_bucket_brigade *bb, - apr_off_t len, apr_table_t **ptrailers); - -/** * Opens the output for the given stream with the specified response. */ apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id, - struct h2_response *response, - ap_filter_t* filter, apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait); - -/** - * Append the brigade to the stream output. Might block if amount - * of bytes buffered reaches configured max. - * @param stream_id the stream identifier - * @param filter the apache filter context of the data - * @param blocking == 0 iff call should return with APR_INCOMPLETE if - * the full brigade cannot be written at once - * @param bb the bucket brigade to append - * @param iowait a conditional used for block/signalling in h2_mplx - */ -apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, - ap_filter_t* filter, - int blocking, - apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait); - -/** - * Closes the output for stream stream_id. - */ -apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id); - -apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error); + struct h2_response *response); /******************************************************************************* * h2_mplx list Manipulation. |