Loading CHANGES +0 −3 Original line number Diff line number Diff line -*- coding: utf-8 -*- Changes with Apache 2.5.1 *) mod_http2: internal code cleanups and simplifications. Common output code for h2 and h2c protocols, using nested mutex locks for simplified calls. [Stefan Eissing] *) mod_proxy/ssl: Proxy SSL client certificate configuration and other proxy SSL configurations broken inside <Proxy> context. PR 63430. [Ruediger Pluem, Yann Ylavic] Loading modules/http2/h2.h +0 −12 Original line number Diff line number Diff line Loading @@ -112,7 +112,6 @@ typedef enum h2_stream_state_t { H2_SS_CLOSED_L, H2_SS_CLOSED, H2_SS_CLEANUP, H2_SS_DESTROYED, H2_SS_MAX } h2_stream_state_t; Loading @@ -124,17 +123,6 @@ typedef enum { H2_SEV_IN_DATA_PENDING, } h2_stream_event_t; typedef enum { H2_PS_NONE, H2_PS_QUEUED, H2_PS_RUNNING, H2_PS_FINISHED, } h2_processing_state_t; #define H2_PS_IS_RUNNING(s) ((s) == H2_PS_RUNNING) #define H2_PS_IS_NOT_RUNNING(s) ((s) != H2_PS_RUNNING) #define H2_PS_IS_WAS_STARTED(s) ((s) >= H2_PS_RUNNING) #define H2_PS_IS_HAS_FINISHED(s) ((s) == H2_PS_FINISHED) /* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal * format that will be fed to various httpd input filters to finally Loading modules/http2/h2_bucket_beam.c +165 −58 Original line number Diff line number Diff line Loading @@ -24,7 +24,6 @@ #include <httpd.h> #include <http_protocol.h> #include <http_request.h> #include <http_log.h> #include "h2_private.h" Loading Loading @@ -155,30 +154,6 @@ const apr_bucket_type_t h2_bucket_type_beam = { * h2_blist, a brigade without allocations ******************************************************************************/ static void h2_blist_cleanup(h2_blist *bl) { apr_bucket *e; while (!H2_BLIST_EMPTY(bl)) { e = H2_BLIST_FIRST(bl); apr_bucket_delete(e); } } static void brigade_move_to_blist(apr_bucket_brigade *bb, h2_blist *list) { apr_bucket *b; while (bb && !APR_BRIGADE_EMPTY(bb)) { b = APR_BRIGADE_FIRST(bb); APR_BUCKET_REMOVE(b); H2_BLIST_INSERT_TAIL(list, b); } } /******************************************************************************* * bucket beamer registration ******************************************************************************/ static apr_array_header_t *beamers; static apr_status_t cleanup_beamers(void *dummy) Loading Loading @@ -315,6 +290,17 @@ static apr_size_t calc_buffered(h2_bucket_beam *beam) return len; } static void r_purge_sent(h2_bucket_beam *beam) { apr_bucket *b; /* delete all sender buckets in purge brigade, needs to be called * from sender thread only */ while (!H2_BLIST_EMPTY(&beam->purge_list)) { b = H2_BLIST_FIRST(&beam->purge_list); apr_bucket_delete(b); } } static apr_size_t calc_space_left(h2_bucket_beam *beam) { if (beam->max_buf_size > 0) { Loading Loading @@ -449,7 +435,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } else { /* it should be there unless we screwed up */ ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->pool, ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool, APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not " "in hold, n=%d", beam->id, beam->tag, (int)proxy->n); Loading @@ -458,7 +444,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } /* notify anyone waiting on space to become available */ if (!bl.mutex) { h2_blist_cleanup(&beam->purge_list); r_purge_sent(beam); } else { apr_thread_cond_broadcast(beam->change); Loading @@ -467,6 +453,16 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } } static void h2_blist_cleanup(h2_blist *bl) { apr_bucket *e; while (!H2_BLIST_EMPTY(bl)) { e = H2_BLIST_FIRST(bl); apr_bucket_delete(e); } } static apr_status_t beam_close(h2_bucket_beam *beam) { if (!beam->closed) { Loading @@ -481,10 +477,40 @@ int h2_beam_is_closed(h2_bucket_beam *beam) return beam->closed; } static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool, apr_status_t (*cleanup)(void *)) { if (pool && pool != beam->pool) { apr_pool_pre_cleanup_register(pool, beam, cleanup); return 1; } return 0; } static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool, apr_status_t (*cleanup)(void *)) { if (pool && pool != beam->pool) { apr_pool_cleanup_kill(pool, beam, cleanup); return 1; } return 0; } static apr_status_t beam_recv_cleanup(void *data) { h2_bucket_beam *beam = data; /* receiver pool has gone away, clear references */ beam->recv_buffer = NULL; beam->recv_pool = NULL; return APR_SUCCESS; } static apr_status_t beam_send_cleanup(void *data) { h2_bucket_beam *beam = data; /* sender is going away, clear up all references to its memory */ r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); report_consumption(beam, NULL); while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) { h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies); Loading @@ -494,10 +520,22 @@ static apr_status_t beam_send_cleanup(void *data) } h2_blist_cleanup(&beam->purge_list); h2_blist_cleanup(&beam->hold_list); h2_blist_cleanup(&beam->send_list); beam->send_pool = NULL; return APR_SUCCESS; } static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) { if (beam->send_pool != pool) { if (beam->send_pool && beam->send_pool != beam->pool) { pool_kill(beam, beam->send_pool, beam_send_cleanup); beam_send_cleanup(beam); } beam->send_pool = pool; pool_register(beam, beam->send_pool, beam_send_cleanup); } } static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl) { if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) { Loading @@ -521,18 +559,74 @@ static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl) } } apr_status_t h2_beam_destroy(h2_bucket_beam *beam) static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool) { /* no more io callbacks */ apr_status_t status = APR_SUCCESS; int safe_send = (beam->owner == H2_BEAM_OWNER_SEND); int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV); /* * Owner of the beam is going away, depending on which side it owns, * cleanup strategies will differ. * * In general, receiver holds references to memory from sender. * Clean up receiver first, if safe, then cleanup sender, if safe. */ /* When called from pool destroy, io callbacks are disabled */ if (from_pool) { beam->cons_io_cb = NULL; } /* When modify send is not safe, this means we still have multi-thread * protection and the owner is receiving the buckets. If the sending * side has not gone away, this means we could have dangling buckets * in our lists that never get destroyed. This should not happen. */ ap_assert(safe_send || !beam->send_pool); if (!H2_BLIST_EMPTY(&beam->send_list)) { ap_assert(beam->send_pool); } if (safe_recv) { if (beam->recv_pool) { pool_kill(beam, beam->recv_pool, beam_recv_cleanup); beam->recv_pool = NULL; } recv_buffer_cleanup(beam, NULL); } else { beam->recv_buffer = NULL; beam->recv_pool = NULL; } if (safe_send && beam->send_pool) { pool_kill(beam, beam->send_pool, beam_send_cleanup); status = beam_send_cleanup(beam); } if (safe_recv) { ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies)); ap_assert(H2_BLIST_EMPTY(&beam->send_list)); ap_assert(H2_BLIST_EMPTY(&beam->hold_list)); ap_assert(H2_BLIST_EMPTY(&beam->purge_list)); } return status; } static apr_status_t beam_pool_cleanup(void *data) { return beam_cleanup(data, 1); } return beam_send_cleanup(beam); apr_status_t h2_beam_destroy(h2_bucket_beam *beam) { apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup); return beam_cleanup(beam, 0); } apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, int id, const char *tag, h2_beam_owner_t owner, apr_size_t max_buf_size, apr_interval_time_t timeout) { Loading @@ -547,6 +641,7 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, beam->id = id; beam->tag = tag; beam->pool = pool; beam->owner = owner; H2_BLIST_INIT(&beam->send_list); H2_BLIST_INIT(&beam->hold_list); H2_BLIST_INIT(&beam->purge_list); Loading @@ -555,11 +650,14 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, beam->max_buf_size = max_buf_size; beam->timeout = timeout; rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_NESTED, pool); rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool); if (APR_SUCCESS == rv) { rv = apr_thread_cond_create(&beam->change, pool); if (APR_SUCCESS == rv) { apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup); *pbeam = beam; } } *pbeam = (APR_SUCCESS == rv)? beam : NULL; return rv; } Loading Loading @@ -613,7 +711,7 @@ void h2_beam_abort(h2_bucket_beam *beam) if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { beam->aborted = 1; h2_blist_cleanup(&beam->purge_list); r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); report_consumption(beam, &bl); apr_thread_cond_broadcast(beam->change); Loading @@ -626,7 +724,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam) h2_beam_lock bl; if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { h2_blist_cleanup(&beam->purge_list); r_purge_sent(beam); beam_close(beam); report_consumption(beam, &bl); leave_yellow(beam, &bl); Loading Loading @@ -659,6 +757,17 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block) return status; } static void move_to_hold(h2_bucket_beam *beam, apr_bucket_brigade *sender_bb) { apr_bucket *b; while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) { b = APR_BRIGADE_FIRST(sender_bb); APR_BUCKET_REMOVE(b); H2_BLIST_INSERT_TAIL(&beam->send_list, b); } } static apr_status_t append_bucket(h2_bucket_beam *beam, apr_bucket *b, apr_read_type_e block, Loading @@ -680,19 +789,6 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, if (APR_BUCKET_IS_EOS(b)) { beam->closed = 1; } if (AP_BUCKET_IS_EOR(b)) { /* The problem with EOR buckets: * - we cannot delete it now, as it will destroy the request pool * and free data that we are still holding in the beam. * - if we add it to the send_list, as all other buckets, * it will most likely not be read, as an EOS came before. * This means we still juggle it when the beam is destroyed, * and rarely this seems to cause the pool to be freed twice... * if asan stack traces are to be believed... * - since we */ beam->closed = 1; } APR_BUCKET_REMOVE(b); H2_BLIST_INSERT_TAIL(&beam->send_list, b); return APR_SUCCESS; Loading Loading @@ -748,7 +844,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, /* this takes care of transient buckets and converts them * into heap ones. Other bucket types might or might not be * affected by this. */ status = apr_bucket_setaside(b, beam->pool); status = apr_bucket_setaside(b, beam->send_pool); } else if (APR_BUCKET_IS_HEAP(b)) { /* For heap buckets read from a receiver thread is fine. The Loading @@ -768,7 +864,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, } } else if (APR_BUCKET_IS_FILE(b) && can_beam) { status = apr_bucket_setaside(b, beam->pool); status = apr_bucket_setaside(b, beam->send_pool); } if (status == APR_ENOTIMPL) { Loading @@ -780,7 +876,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, * use pools/allocators safely. */ status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); if (status == APR_SUCCESS) { status = apr_bucket_setaside(b, beam->pool); status = apr_bucket_setaside(b, beam->send_pool); } } Loading @@ -795,6 +891,17 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, return APR_SUCCESS; } void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p) { h2_beam_lock bl; /* Called from the sender thread to add buckets to the beam */ if (enter_yellow(beam, &bl) == APR_SUCCESS) { r_purge_sent(beam); beam_set_send_pool(beam, p); leave_yellow(beam, &bl); } } apr_status_t h2_beam_send(h2_bucket_beam *beam, apr_bucket_brigade *sender_bb, apr_read_type_e block) Loading @@ -806,11 +913,11 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, /* Called from the sender thread to add buckets to the beam */ if (enter_yellow(beam, &bl) == APR_SUCCESS) { ap_assert(beam->pool); h2_blist_cleanup(&beam->purge_list); ap_assert(beam->send_pool); r_purge_sent(beam); if (beam->aborted) { brigade_move_to_blist(sender_bb, &beam->send_list); move_to_hold(beam, sender_bb); rv = APR_ECONNABORTED; } else if (sender_bb) { Loading @@ -820,7 +927,7 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) { if (space_left <= 0) { report_prod_io(beam, force_report, &bl); h2_blist_cleanup(&beam->purge_list); r_purge_sent(beam); rv = wait_not_full(beam, block, &space_left, &bl); if (APR_SUCCESS != rv) { break; Loading modules/http2/h2_bucket_beam.h +17 −0 Original line number Diff line number Diff line Loading @@ -150,6 +150,11 @@ typedef struct { typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam, apr_file_t *file); typedef enum { H2_BEAM_OWNER_SEND, H2_BEAM_OWNER_RECV } h2_beam_owner_t; /** * Will deny all transfer of apr_file_t across the beam and force * a data copy instead. Loading @@ -160,11 +165,13 @@ struct h2_bucket_beam { int id; const char *tag; apr_pool_t *pool; h2_beam_owner_t owner; h2_blist send_list; h2_blist hold_list; h2_blist purge_list; apr_bucket_brigade *recv_buffer; h2_bproxy_list proxies; apr_pool_t *send_pool; apr_pool_t *recv_pool; apr_size_t max_buf_size; Loading Loading @@ -208,6 +215,8 @@ struct h2_bucket_beam { * @param pool pool owning the beam, beam will cleanup when pool released * @param id identifier of the beam * @param tag tag identifying beam for logging * @param owner if the beam is owned by the sender or receiver, e.g. if * the pool owner is using this beam for sending or receiving * @param buffer_size maximum memory footprint of buckets buffered in beam, or * 0 for no limitation * @param timeout timeout for blocking operations Loading @@ -215,6 +224,7 @@ struct h2_bucket_beam { apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, int id, const char *tag, h2_beam_owner_t owner, apr_size_t buffer_size, apr_interval_time_t timeout); Loading @@ -235,6 +245,13 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, apr_bucket_brigade *bb, apr_read_type_e block); /** * Register the pool from which future buckets are send. This defines * the lifetime of the buckets, e.g. the pool should not be cleared/destroyed * until the data is no longer needed (or has been received). */ void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p); /** * Receive buckets from the beam into the given brigade. Will return APR_EOF * when reading past an EOS bucket. Reads can be blocking until data is Loading modules/http2/h2_bucket_eos.c +30 −13 Original line number Diff line number Diff line Loading @@ -24,17 +24,27 @@ #include "h2_private.h" #include "h2.h" #include "h2_ctx.h" #include "h2_mplx.h" #include "h2_session.h" #include "h2_stream.h" #include "h2_bucket_eos.h" typedef struct { apr_bucket_refcount refcount; conn_rec *c; int stream_id; h2_stream *stream; } h2_bucket_eos; static apr_status_t bucket_cleanup(void *data) { h2_stream **pstream = data; if (*pstream) { /* If bucket_destroy is called after us, this prevents * bucket_destroy from trying to destroy the stream again. */ *pstream = NULL; } return APR_SUCCESS; } static apr_status_t bucket_read(apr_bucket *b, const char **str, apr_size_t *len, apr_read_type_e block) { Loading @@ -45,13 +55,12 @@ static apr_status_t bucket_read(apr_bucket *b, const char **str, return APR_SUCCESS; } apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id) apr_bucket *h2_bucket_eos_make(apr_bucket *b, h2_stream *stream) { h2_bucket_eos *h; h = apr_bucket_alloc(sizeof(*h), b->list); h->c = c; h->stream_id = stream_id; h->stream = stream; b = apr_bucket_shared_make(b, h, 0, 0); b->type = &h2_bucket_type_eos; Loading @@ -59,27 +68,35 @@ apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id) return b; } apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, conn_rec *c, int stream_id) apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, h2_stream *stream) { apr_bucket *b = apr_bucket_alloc(sizeof(*b), list); APR_BUCKET_INIT(b); b->free = apr_bucket_free; b->list = list; b = h2_bucket_eos_make(b, c, stream_id); b = h2_bucket_eos_make(b, stream); if (stream) { h2_bucket_eos *h = b->data; apr_pool_pre_cleanup_register(stream->pool, &h->stream, bucket_cleanup); } return b; } static void bucket_destroy(void *data) { h2_bucket_eos *h = data; h2_session *session; if (apr_bucket_shared_destroy(h)) { if ((session = h2_ctx_get_session(h->c))) { h2_session_eos_sent(session, h->stream_id); h2_stream *stream = h->stream; if (stream && stream->pool) { apr_pool_cleanup_kill(stream->pool, &h->stream, bucket_cleanup); } apr_bucket_free(h); if (stream) { h2_stream_dispatch(stream, H2_SEV_EOS_SENT); } } } Loading Loading
CHANGES +0 −3 Original line number Diff line number Diff line -*- coding: utf-8 -*- Changes with Apache 2.5.1 *) mod_http2: internal code cleanups and simplifications. Common output code for h2 and h2c protocols, using nested mutex locks for simplified calls. [Stefan Eissing] *) mod_proxy/ssl: Proxy SSL client certificate configuration and other proxy SSL configurations broken inside <Proxy> context. PR 63430. [Ruediger Pluem, Yann Ylavic] Loading
modules/http2/h2.h +0 −12 Original line number Diff line number Diff line Loading @@ -112,7 +112,6 @@ typedef enum h2_stream_state_t { H2_SS_CLOSED_L, H2_SS_CLOSED, H2_SS_CLEANUP, H2_SS_DESTROYED, H2_SS_MAX } h2_stream_state_t; Loading @@ -124,17 +123,6 @@ typedef enum { H2_SEV_IN_DATA_PENDING, } h2_stream_event_t; typedef enum { H2_PS_NONE, H2_PS_QUEUED, H2_PS_RUNNING, H2_PS_FINISHED, } h2_processing_state_t; #define H2_PS_IS_RUNNING(s) ((s) == H2_PS_RUNNING) #define H2_PS_IS_NOT_RUNNING(s) ((s) != H2_PS_RUNNING) #define H2_PS_IS_WAS_STARTED(s) ((s) >= H2_PS_RUNNING) #define H2_PS_IS_HAS_FINISHED(s) ((s) == H2_PS_FINISHED) /* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal * format that will be fed to various httpd input filters to finally Loading
modules/http2/h2_bucket_beam.c +165 −58 Original line number Diff line number Diff line Loading @@ -24,7 +24,6 @@ #include <httpd.h> #include <http_protocol.h> #include <http_request.h> #include <http_log.h> #include "h2_private.h" Loading Loading @@ -155,30 +154,6 @@ const apr_bucket_type_t h2_bucket_type_beam = { * h2_blist, a brigade without allocations ******************************************************************************/ static void h2_blist_cleanup(h2_blist *bl) { apr_bucket *e; while (!H2_BLIST_EMPTY(bl)) { e = H2_BLIST_FIRST(bl); apr_bucket_delete(e); } } static void brigade_move_to_blist(apr_bucket_brigade *bb, h2_blist *list) { apr_bucket *b; while (bb && !APR_BRIGADE_EMPTY(bb)) { b = APR_BRIGADE_FIRST(bb); APR_BUCKET_REMOVE(b); H2_BLIST_INSERT_TAIL(list, b); } } /******************************************************************************* * bucket beamer registration ******************************************************************************/ static apr_array_header_t *beamers; static apr_status_t cleanup_beamers(void *dummy) Loading Loading @@ -315,6 +290,17 @@ static apr_size_t calc_buffered(h2_bucket_beam *beam) return len; } static void r_purge_sent(h2_bucket_beam *beam) { apr_bucket *b; /* delete all sender buckets in purge brigade, needs to be called * from sender thread only */ while (!H2_BLIST_EMPTY(&beam->purge_list)) { b = H2_BLIST_FIRST(&beam->purge_list); apr_bucket_delete(b); } } static apr_size_t calc_space_left(h2_bucket_beam *beam) { if (beam->max_buf_size > 0) { Loading Loading @@ -449,7 +435,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } else { /* it should be there unless we screwed up */ ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->pool, ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool, APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not " "in hold, n=%d", beam->id, beam->tag, (int)proxy->n); Loading @@ -458,7 +444,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } /* notify anyone waiting on space to become available */ if (!bl.mutex) { h2_blist_cleanup(&beam->purge_list); r_purge_sent(beam); } else { apr_thread_cond_broadcast(beam->change); Loading @@ -467,6 +453,16 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } } static void h2_blist_cleanup(h2_blist *bl) { apr_bucket *e; while (!H2_BLIST_EMPTY(bl)) { e = H2_BLIST_FIRST(bl); apr_bucket_delete(e); } } static apr_status_t beam_close(h2_bucket_beam *beam) { if (!beam->closed) { Loading @@ -481,10 +477,40 @@ int h2_beam_is_closed(h2_bucket_beam *beam) return beam->closed; } static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool, apr_status_t (*cleanup)(void *)) { if (pool && pool != beam->pool) { apr_pool_pre_cleanup_register(pool, beam, cleanup); return 1; } return 0; } static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool, apr_status_t (*cleanup)(void *)) { if (pool && pool != beam->pool) { apr_pool_cleanup_kill(pool, beam, cleanup); return 1; } return 0; } static apr_status_t beam_recv_cleanup(void *data) { h2_bucket_beam *beam = data; /* receiver pool has gone away, clear references */ beam->recv_buffer = NULL; beam->recv_pool = NULL; return APR_SUCCESS; } static apr_status_t beam_send_cleanup(void *data) { h2_bucket_beam *beam = data; /* sender is going away, clear up all references to its memory */ r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); report_consumption(beam, NULL); while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) { h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies); Loading @@ -494,10 +520,22 @@ static apr_status_t beam_send_cleanup(void *data) } h2_blist_cleanup(&beam->purge_list); h2_blist_cleanup(&beam->hold_list); h2_blist_cleanup(&beam->send_list); beam->send_pool = NULL; return APR_SUCCESS; } static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) { if (beam->send_pool != pool) { if (beam->send_pool && beam->send_pool != beam->pool) { pool_kill(beam, beam->send_pool, beam_send_cleanup); beam_send_cleanup(beam); } beam->send_pool = pool; pool_register(beam, beam->send_pool, beam_send_cleanup); } } static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl) { if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) { Loading @@ -521,18 +559,74 @@ static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl) } } apr_status_t h2_beam_destroy(h2_bucket_beam *beam) static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool) { /* no more io callbacks */ apr_status_t status = APR_SUCCESS; int safe_send = (beam->owner == H2_BEAM_OWNER_SEND); int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV); /* * Owner of the beam is going away, depending on which side it owns, * cleanup strategies will differ. * * In general, receiver holds references to memory from sender. * Clean up receiver first, if safe, then cleanup sender, if safe. */ /* When called from pool destroy, io callbacks are disabled */ if (from_pool) { beam->cons_io_cb = NULL; } /* When modify send is not safe, this means we still have multi-thread * protection and the owner is receiving the buckets. If the sending * side has not gone away, this means we could have dangling buckets * in our lists that never get destroyed. This should not happen. */ ap_assert(safe_send || !beam->send_pool); if (!H2_BLIST_EMPTY(&beam->send_list)) { ap_assert(beam->send_pool); } if (safe_recv) { if (beam->recv_pool) { pool_kill(beam, beam->recv_pool, beam_recv_cleanup); beam->recv_pool = NULL; } recv_buffer_cleanup(beam, NULL); } else { beam->recv_buffer = NULL; beam->recv_pool = NULL; } if (safe_send && beam->send_pool) { pool_kill(beam, beam->send_pool, beam_send_cleanup); status = beam_send_cleanup(beam); } if (safe_recv) { ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies)); ap_assert(H2_BLIST_EMPTY(&beam->send_list)); ap_assert(H2_BLIST_EMPTY(&beam->hold_list)); ap_assert(H2_BLIST_EMPTY(&beam->purge_list)); } return status; } static apr_status_t beam_pool_cleanup(void *data) { return beam_cleanup(data, 1); } return beam_send_cleanup(beam); apr_status_t h2_beam_destroy(h2_bucket_beam *beam) { apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup); return beam_cleanup(beam, 0); } apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, int id, const char *tag, h2_beam_owner_t owner, apr_size_t max_buf_size, apr_interval_time_t timeout) { Loading @@ -547,6 +641,7 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, beam->id = id; beam->tag = tag; beam->pool = pool; beam->owner = owner; H2_BLIST_INIT(&beam->send_list); H2_BLIST_INIT(&beam->hold_list); H2_BLIST_INIT(&beam->purge_list); Loading @@ -555,11 +650,14 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, beam->max_buf_size = max_buf_size; beam->timeout = timeout; rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_NESTED, pool); rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool); if (APR_SUCCESS == rv) { rv = apr_thread_cond_create(&beam->change, pool); if (APR_SUCCESS == rv) { apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup); *pbeam = beam; } } *pbeam = (APR_SUCCESS == rv)? beam : NULL; return rv; } Loading Loading @@ -613,7 +711,7 @@ void h2_beam_abort(h2_bucket_beam *beam) if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { beam->aborted = 1; h2_blist_cleanup(&beam->purge_list); r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); report_consumption(beam, &bl); apr_thread_cond_broadcast(beam->change); Loading @@ -626,7 +724,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam) h2_beam_lock bl; if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { h2_blist_cleanup(&beam->purge_list); r_purge_sent(beam); beam_close(beam); report_consumption(beam, &bl); leave_yellow(beam, &bl); Loading Loading @@ -659,6 +757,17 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block) return status; } static void move_to_hold(h2_bucket_beam *beam, apr_bucket_brigade *sender_bb) { apr_bucket *b; while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) { b = APR_BRIGADE_FIRST(sender_bb); APR_BUCKET_REMOVE(b); H2_BLIST_INSERT_TAIL(&beam->send_list, b); } } static apr_status_t append_bucket(h2_bucket_beam *beam, apr_bucket *b, apr_read_type_e block, Loading @@ -680,19 +789,6 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, if (APR_BUCKET_IS_EOS(b)) { beam->closed = 1; } if (AP_BUCKET_IS_EOR(b)) { /* The problem with EOR buckets: * - we cannot delete it now, as it will destroy the request pool * and free data that we are still holding in the beam. * - if we add it to the send_list, as all other buckets, * it will most likely not be read, as an EOS came before. * This means we still juggle it when the beam is destroyed, * and rarely this seems to cause the pool to be freed twice... * if asan stack traces are to be believed... * - since we */ beam->closed = 1; } APR_BUCKET_REMOVE(b); H2_BLIST_INSERT_TAIL(&beam->send_list, b); return APR_SUCCESS; Loading Loading @@ -748,7 +844,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, /* this takes care of transient buckets and converts them * into heap ones. Other bucket types might or might not be * affected by this. */ status = apr_bucket_setaside(b, beam->pool); status = apr_bucket_setaside(b, beam->send_pool); } else if (APR_BUCKET_IS_HEAP(b)) { /* For heap buckets read from a receiver thread is fine. The Loading @@ -768,7 +864,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, } } else if (APR_BUCKET_IS_FILE(b) && can_beam) { status = apr_bucket_setaside(b, beam->pool); status = apr_bucket_setaside(b, beam->send_pool); } if (status == APR_ENOTIMPL) { Loading @@ -780,7 +876,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, * use pools/allocators safely. */ status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); if (status == APR_SUCCESS) { status = apr_bucket_setaside(b, beam->pool); status = apr_bucket_setaside(b, beam->send_pool); } } Loading @@ -795,6 +891,17 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, return APR_SUCCESS; } void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p) { h2_beam_lock bl; /* Called from the sender thread to add buckets to the beam */ if (enter_yellow(beam, &bl) == APR_SUCCESS) { r_purge_sent(beam); beam_set_send_pool(beam, p); leave_yellow(beam, &bl); } } apr_status_t h2_beam_send(h2_bucket_beam *beam, apr_bucket_brigade *sender_bb, apr_read_type_e block) Loading @@ -806,11 +913,11 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, /* Called from the sender thread to add buckets to the beam */ if (enter_yellow(beam, &bl) == APR_SUCCESS) { ap_assert(beam->pool); h2_blist_cleanup(&beam->purge_list); ap_assert(beam->send_pool); r_purge_sent(beam); if (beam->aborted) { brigade_move_to_blist(sender_bb, &beam->send_list); move_to_hold(beam, sender_bb); rv = APR_ECONNABORTED; } else if (sender_bb) { Loading @@ -820,7 +927,7 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) { if (space_left <= 0) { report_prod_io(beam, force_report, &bl); h2_blist_cleanup(&beam->purge_list); r_purge_sent(beam); rv = wait_not_full(beam, block, &space_left, &bl); if (APR_SUCCESS != rv) { break; Loading
modules/http2/h2_bucket_beam.h +17 −0 Original line number Diff line number Diff line Loading @@ -150,6 +150,11 @@ typedef struct { typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam, apr_file_t *file); typedef enum { H2_BEAM_OWNER_SEND, H2_BEAM_OWNER_RECV } h2_beam_owner_t; /** * Will deny all transfer of apr_file_t across the beam and force * a data copy instead. Loading @@ -160,11 +165,13 @@ struct h2_bucket_beam { int id; const char *tag; apr_pool_t *pool; h2_beam_owner_t owner; h2_blist send_list; h2_blist hold_list; h2_blist purge_list; apr_bucket_brigade *recv_buffer; h2_bproxy_list proxies; apr_pool_t *send_pool; apr_pool_t *recv_pool; apr_size_t max_buf_size; Loading Loading @@ -208,6 +215,8 @@ struct h2_bucket_beam { * @param pool pool owning the beam, beam will cleanup when pool released * @param id identifier of the beam * @param tag tag identifying beam for logging * @param owner if the beam is owned by the sender or receiver, e.g. if * the pool owner is using this beam for sending or receiving * @param buffer_size maximum memory footprint of buckets buffered in beam, or * 0 for no limitation * @param timeout timeout for blocking operations Loading @@ -215,6 +224,7 @@ struct h2_bucket_beam { apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, int id, const char *tag, h2_beam_owner_t owner, apr_size_t buffer_size, apr_interval_time_t timeout); Loading @@ -235,6 +245,13 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, apr_bucket_brigade *bb, apr_read_type_e block); /** * Register the pool from which future buckets are send. This defines * the lifetime of the buckets, e.g. the pool should not be cleared/destroyed * until the data is no longer needed (or has been received). */ void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p); /** * Receive buckets from the beam into the given brigade. Will return APR_EOF * when reading past an EOS bucket. Reads can be blocking until data is Loading
modules/http2/h2_bucket_eos.c +30 −13 Original line number Diff line number Diff line Loading @@ -24,17 +24,27 @@ #include "h2_private.h" #include "h2.h" #include "h2_ctx.h" #include "h2_mplx.h" #include "h2_session.h" #include "h2_stream.h" #include "h2_bucket_eos.h" typedef struct { apr_bucket_refcount refcount; conn_rec *c; int stream_id; h2_stream *stream; } h2_bucket_eos; static apr_status_t bucket_cleanup(void *data) { h2_stream **pstream = data; if (*pstream) { /* If bucket_destroy is called after us, this prevents * bucket_destroy from trying to destroy the stream again. */ *pstream = NULL; } return APR_SUCCESS; } static apr_status_t bucket_read(apr_bucket *b, const char **str, apr_size_t *len, apr_read_type_e block) { Loading @@ -45,13 +55,12 @@ static apr_status_t bucket_read(apr_bucket *b, const char **str, return APR_SUCCESS; } apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id) apr_bucket *h2_bucket_eos_make(apr_bucket *b, h2_stream *stream) { h2_bucket_eos *h; h = apr_bucket_alloc(sizeof(*h), b->list); h->c = c; h->stream_id = stream_id; h->stream = stream; b = apr_bucket_shared_make(b, h, 0, 0); b->type = &h2_bucket_type_eos; Loading @@ -59,27 +68,35 @@ apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id) return b; } apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, conn_rec *c, int stream_id) apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, h2_stream *stream) { apr_bucket *b = apr_bucket_alloc(sizeof(*b), list); APR_BUCKET_INIT(b); b->free = apr_bucket_free; b->list = list; b = h2_bucket_eos_make(b, c, stream_id); b = h2_bucket_eos_make(b, stream); if (stream) { h2_bucket_eos *h = b->data; apr_pool_pre_cleanup_register(stream->pool, &h->stream, bucket_cleanup); } return b; } static void bucket_destroy(void *data) { h2_bucket_eos *h = data; h2_session *session; if (apr_bucket_shared_destroy(h)) { if ((session = h2_ctx_get_session(h->c))) { h2_session_eos_sent(session, h->stream_id); h2_stream *stream = h->stream; if (stream && stream->pool) { apr_pool_cleanup_kill(stream->pool, &h->stream, bucket_cleanup); } apr_bucket_free(h); if (stream) { h2_stream_dispatch(stream, H2_SEV_EOS_SENT); } } } Loading