summaryrefslogtreecommitdiff
path: root/modules/http2/h2_mplx.h
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2/h2_mplx.h')
-rw-r--r--modules/http2/h2_mplx.h121
1 files changed, 28 insertions, 93 deletions
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index 40298476..821e6d65 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -37,21 +37,20 @@
struct apr_pool_t;
struct apr_thread_mutex_t;
struct apr_thread_cond_t;
+struct h2_bucket_beam;
struct h2_config;
struct h2_ihash_t;
struct h2_response;
struct h2_task;
struct h2_stream;
struct h2_request;
-struct h2_io_set;
struct apr_thread_cond_t;
struct h2_workers;
-struct h2_int_queue;
+struct h2_iqueue;
struct h2_ngn_shed;
struct h2_req_engine;
#include <apr_queue.h>
-#include "h2_io.h"
typedef struct h2_mplx h2_mplx;
@@ -72,10 +71,16 @@ struct h2_mplx {
unsigned int aborted : 1;
unsigned int need_registration : 1;
- struct h2_int_queue *q;
- struct h2_io_set *stream_ios;
- struct h2_io_set *ready_ios;
- struct h2_io_set *redo_ios;
+ struct h2_ihash_t *streams; /* all streams currently processing */
+ struct h2_ihash_t *shold; /* all streams done with task ongoing */
+ struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
+
+ struct h2_iqueue *q; /* all stream ids that need to be started */
+ struct h2_ihash_t *sready; /* all streams ready for response */
+ struct h2_ihash_t *sresume; /* all streams that can be resumed */
+
+ struct h2_ihash_t *tasks; /* all tasks started and not destroyed */
+ struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */
apr_uint32_t max_streams; /* max # of concurrent streams */
apr_uint32_t max_stream_started; /* highest stream id that started processing */
@@ -96,10 +101,11 @@ struct h2_mplx {
apr_size_t stream_max_mem;
apr_interval_time_t stream_timeout;
+ apr_pool_t *spare_io_pool;
apr_array_header_t *spare_slaves; /* spare slave connections */
struct h2_workers *workers;
- apr_size_t tx_handles_reserved;
+ int tx_handles_reserved;
apr_size_t tx_chunk_size;
h2_mplx_consumed_cb *input_consumed;
@@ -160,15 +166,11 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m);
* Notifies mplx that a stream has finished processing.
*
* @param m the mplx itself
- * @param stream_id the id of the stream being done
+ * @param stream the id of the stream being done
* @param rst_error if != 0, the stream was reset with the error given
*
*/
-apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
-
-/* Return != 0 iff the multiplexer has output data for the given stream.
- */
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
+apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream);
/**
* Waits on output data from any stream in this session to become available.
@@ -185,13 +187,12 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
* Process a stream request.
*
* @param m the multiplexer
- * @param stream_id the identifier of the stream
+ * @param stream the identifier of the stream
* @param r the request to be processed
* @param cmp the stream priority compare function
* @param ctx context data for the compare function
*/
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
- const struct h2_request *r,
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
h2_stream_pri_cmp *cmp, void *ctx);
/**
@@ -214,96 +215,30 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
*/
void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
-/*******************************************************************************
- * Input handling of streams.
- ******************************************************************************/
-/**
- * Reads a buckets for the given stream_id. Will return ARP_EAGAIN when
- * called with APR_NONBLOCK_READ and no data present. Will return APR_EOF
- * when the end of the stream input has been reached.
- * The condition passed in will be used for blocking/signalling and will
- * be protected by the mplx's own mutex.
- */
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
- int stream_id, apr_bucket_brigade *bb,
- apr_table_t *trailers,
- struct apr_thread_cond_t *iowait);
-
-/**
- * Appends data to the input of the given stream. Storage of input data is
- * not subject to flow control.
- */
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
- const char *data, apr_size_t len, int eos);
+typedef apr_status_t stream_ev_callback(void *ctx, int stream_id);
/**
- * Closes the input for the given stream_id.
+ * Dispatch events for the master connection, such as
+ * - resume: new output data has arrived for a suspended stream
+ * - response: the response for a stream is ready
*/
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
+apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
+ stream_ev_callback *on_resume,
+ stream_ev_callback *on_response,
+ void *ctx);
-/**
- * Invoke the consumed callback for all streams that had bytes read since the
- * last call to this function. If no stream had input data consumed, the
- * callback is not invoked.
- * The consumed callback may also be invoked at other times whenever
- * the need arises.
- * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update
- * happened.
- */
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
+apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id);
/*******************************************************************************
* Output handling of streams.
******************************************************************************/
/**
- * Get a stream whose response is ready for submit. Will set response and
- * any out data available in stream.
- * @param m the mplxer to get a response from
- * @param bb the brigade to place any existing repsonse body data into
- */
-struct h2_stream *h2_mplx_next_submit(h2_mplx *m,
- struct h2_ihash_t *streams);
-
-/**
- * Reads output data into the given brigade. Will never block, but
- * return APR_EAGAIN until data arrives or the stream is closed.
- */
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *mplx, int stream_id,
- apr_bucket_brigade *bb,
- apr_off_t len, apr_table_t **ptrailers);
-
-/**
* Opens the output for the given stream with the specified response.
*/
apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
- struct h2_response *response,
- ap_filter_t* filter, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait);
-
-/**
- * Append the brigade to the stream output. Might block if amount
- * of bytes buffered reaches configured max.
- * @param stream_id the stream identifier
- * @param filter the apache filter context of the data
- * @param blocking == 0 iff call should return with APR_INCOMPLETE if
- * the full brigade cannot be written at once
- * @param bb the bucket brigade to append
- * @param iowait a conditional used for block/signalling in h2_mplx
- */
-apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id,
- ap_filter_t* filter,
- int blocking,
- apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait);
-
-/**
- * Closes the output for stream stream_id.
- */
-apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
-
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error);
+ struct h2_response *response);
/*******************************************************************************
* h2_mplx list Manipulation.