Loading modules/http2/h2_bucket_beam.c +4 −0 Original line number Diff line number Diff line Loading @@ -1035,7 +1035,11 @@ transfer: ++transferred; } else { /* let outside hook determine how bucket is beamed */ leave_yellow(beam, &bl); brecv = h2_beam_bucket(beam, bb, bsender); enter_yellow(beam, &bl); while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) { ++transferred; remain -= brecv->length; Loading modules/http2/h2_mplx.c +30 −27 Original line number Diff line number Diff line Loading @@ -319,12 +319,14 @@ static int stream_destroy_iter(void *ctx, void *val) return 0; } static void purge_streams(h2_mplx *m) static void purge_streams(h2_mplx *m, int lock) { if (!h2_ihash_empty(m->spurge)) { H2_MPLX_ENTER_MAYBE(m, lock); while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) { /* repeat until empty */ } H2_MPLX_LEAVE_MAYBE(m, lock); } } Loading Loading @@ -587,7 +589,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, status = APR_SUCCESS; } else { purge_streams(m); purge_streams(m, 0); h2_ihash_iter(m->streams, report_consumption_iter, m); m->added_output = iowait; status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); Loading Loading @@ -816,35 +818,37 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) } stream = h2_ihash_get(m->streams, task->stream_id); if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) { if (stream) { /* stream not done yet. */ if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) { /* reset and schedule again */ h2_task_redo(task); h2_ihash_remove(m->sredo, stream->id); h2_iq_add(m->q, stream->id, NULL, NULL); return; } if (stream) { else { /* stream not cleaned up, stay around */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, H2_STRM_MSG(stream, "task_done, stream open")); /* more data will not arrive, resume the stream */ check_data_for(m, stream, 0); if (stream->input) { h2_beam_mutex_disable(stream->input); h2_beam_leave(stream->input); h2_beam_mutex_disable(stream->input); } if (stream->output) { h2_beam_mutex_disable(stream->output); } check_data_for(m, stream, 0); } } else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) { /* stream is done, was just waiting for this. */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, H2_STRM_MSG(stream, "task_done, in hold")); /* stream was just waiting for us. */ if (stream->input) { h2_beam_mutex_disable(stream->input); h2_beam_leave(stream->input); h2_beam_mutex_disable(stream->input); } if (stream->output) { h2_beam_mutex_disable(stream->output); Loading @@ -870,6 +874,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) task_done(m, task, NULL); --m->tasks_active; if (m->join_wait) { apr_thread_cond_signal(m->join_wait); } Loading Loading @@ -1145,6 +1150,7 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn, && !h2_ihash_get(m->sredo, stream->id)) { h2_ihash_add(m->sredo, stream); } if (task->engine) { /* cannot report that as done until engine returns */ } Loading Loading @@ -1178,6 +1184,7 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, /* update input windows for streams */ h2_ihash_iter(m->streams, report_consumption_iter, m); purge_streams(m, 1); n = h2_fifo_count(m->readyq); while (n > 0 Loading @@ -1186,10 +1193,6 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, on_resume(on_ctx, stream); } H2_MPLX_ENTER(m); purge_streams(m); H2_MPLX_LEAVE(m); return APR_SUCCESS; } Loading modules/http2/h2_task.c +1 −1 Original line number Diff line number Diff line Loading @@ -383,7 +383,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb) /* There are cases where we need to parse a serialized http/1.1 * response. One example is a 100-continue answer in serialized mode * or via a mod_proxy setup */ while (!task->output.sent_response) { while (bb && !task->output.sent_response) { status = h2_from_h1_parse_response(task, f, bb); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, "h2_task(%s): parsed response", task->id); Loading modules/http2/h2_util.c +22 −23 Original line number Diff line number Diff line Loading @@ -793,6 +793,22 @@ apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem) return fifo_push(fifo, elem, 0); } static void *pull_head(h2_fifo *fifo) { void *elem; ap_assert(fifo->count > 0); elem = fifo->elems[fifo->head]; --fifo->count; if (fifo->count > 0) { fifo->head = nth_index(fifo, 1); if (fifo->count+1 == fifo->nelems) { apr_thread_cond_broadcast(fifo->not_full); } } return elem; } static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block) { apr_status_t rv; Loading @@ -809,14 +825,8 @@ static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block) } ap_assert(fifo->count > 0); *pelem = fifo->elems[fifo->head]; --fifo->count; if (fifo->count > 0) { fifo->head = nth_index(fifo, 1); if (fifo->count+1 == fifo->nelems) { apr_thread_cond_broadcast(fifo->not_full); } } *pelem = pull_head(fifo); apr_thread_mutex_unlock(fifo->lock); } return rv; Loading Loading @@ -848,29 +858,18 @@ static apr_status_t fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx, int } ap_assert(fifo->count > 0); elem = fifo->elems[fifo->head]; elem = pull_head(fifo); apr_thread_mutex_unlock(fifo->lock); switch (fn(elem, ctx)) { case H2_FIFO_OP_PULL: --fifo->count; if (fifo->count > 0) { fifo->head = nth_index(fifo, 1); if (fifo->count+1 == fifo->nelems) { apr_thread_cond_broadcast(fifo->not_full); } } break; case H2_FIFO_OP_REPUSH: if (fifo->count > 1) { fifo->head = nth_index(fifo, 1); if (fifo->count < fifo->nelems) { fifo->elems[nth_index(fifo, fifo->count-1)] = elem; } } return h2_fifo_push(fifo, elem); break; } apr_thread_mutex_unlock(fifo->lock); } return rv; } Loading modules/http2/h2_workers.c +3 −2 Original line number Diff line number Diff line Loading @@ -249,8 +249,7 @@ static apr_status_t workers_pool_cleanup(void *data) if (!workers->aborted) { workers->aborted = 1; /* before we go, cleanup any zombies and abort the rest */ cleanup_zombies(workers); /* abort all idle slots */ for (;;) { slot = pop_slot(&workers->idle); if (slot) { Loading @@ -266,6 +265,8 @@ static apr_status_t workers_pool_cleanup(void *data) h2_fifo_term(workers->mplxs); h2_fifo_interrupt(workers->mplxs); cleanup_zombies(workers); } return APR_SUCCESS; } Loading Loading
modules/http2/h2_bucket_beam.c +4 −0 Original line number Diff line number Diff line Loading @@ -1035,7 +1035,11 @@ transfer: ++transferred; } else { /* let outside hook determine how bucket is beamed */ leave_yellow(beam, &bl); brecv = h2_beam_bucket(beam, bb, bsender); enter_yellow(beam, &bl); while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) { ++transferred; remain -= brecv->length; Loading
modules/http2/h2_mplx.c +30 −27 Original line number Diff line number Diff line Loading @@ -319,12 +319,14 @@ static int stream_destroy_iter(void *ctx, void *val) return 0; } static void purge_streams(h2_mplx *m) static void purge_streams(h2_mplx *m, int lock) { if (!h2_ihash_empty(m->spurge)) { H2_MPLX_ENTER_MAYBE(m, lock); while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) { /* repeat until empty */ } H2_MPLX_LEAVE_MAYBE(m, lock); } } Loading Loading @@ -587,7 +589,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, status = APR_SUCCESS; } else { purge_streams(m); purge_streams(m, 0); h2_ihash_iter(m->streams, report_consumption_iter, m); m->added_output = iowait; status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); Loading Loading @@ -816,35 +818,37 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) } stream = h2_ihash_get(m->streams, task->stream_id); if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) { if (stream) { /* stream not done yet. */ if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) { /* reset and schedule again */ h2_task_redo(task); h2_ihash_remove(m->sredo, stream->id); h2_iq_add(m->q, stream->id, NULL, NULL); return; } if (stream) { else { /* stream not cleaned up, stay around */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, H2_STRM_MSG(stream, "task_done, stream open")); /* more data will not arrive, resume the stream */ check_data_for(m, stream, 0); if (stream->input) { h2_beam_mutex_disable(stream->input); h2_beam_leave(stream->input); h2_beam_mutex_disable(stream->input); } if (stream->output) { h2_beam_mutex_disable(stream->output); } check_data_for(m, stream, 0); } } else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) { /* stream is done, was just waiting for this. */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, H2_STRM_MSG(stream, "task_done, in hold")); /* stream was just waiting for us. */ if (stream->input) { h2_beam_mutex_disable(stream->input); h2_beam_leave(stream->input); h2_beam_mutex_disable(stream->input); } if (stream->output) { h2_beam_mutex_disable(stream->output); Loading @@ -870,6 +874,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) task_done(m, task, NULL); --m->tasks_active; if (m->join_wait) { apr_thread_cond_signal(m->join_wait); } Loading Loading @@ -1145,6 +1150,7 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn, && !h2_ihash_get(m->sredo, stream->id)) { h2_ihash_add(m->sredo, stream); } if (task->engine) { /* cannot report that as done until engine returns */ } Loading Loading @@ -1178,6 +1184,7 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, /* update input windows for streams */ h2_ihash_iter(m->streams, report_consumption_iter, m); purge_streams(m, 1); n = h2_fifo_count(m->readyq); while (n > 0 Loading @@ -1186,10 +1193,6 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, on_resume(on_ctx, stream); } H2_MPLX_ENTER(m); purge_streams(m); H2_MPLX_LEAVE(m); return APR_SUCCESS; } Loading
modules/http2/h2_task.c +1 −1 Original line number Diff line number Diff line Loading @@ -383,7 +383,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb) /* There are cases where we need to parse a serialized http/1.1 * response. One example is a 100-continue answer in serialized mode * or via a mod_proxy setup */ while (!task->output.sent_response) { while (bb && !task->output.sent_response) { status = h2_from_h1_parse_response(task, f, bb); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, "h2_task(%s): parsed response", task->id); Loading
modules/http2/h2_util.c +22 −23 Original line number Diff line number Diff line Loading @@ -793,6 +793,22 @@ apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem) return fifo_push(fifo, elem, 0); } static void *pull_head(h2_fifo *fifo) { void *elem; ap_assert(fifo->count > 0); elem = fifo->elems[fifo->head]; --fifo->count; if (fifo->count > 0) { fifo->head = nth_index(fifo, 1); if (fifo->count+1 == fifo->nelems) { apr_thread_cond_broadcast(fifo->not_full); } } return elem; } static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block) { apr_status_t rv; Loading @@ -809,14 +825,8 @@ static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block) } ap_assert(fifo->count > 0); *pelem = fifo->elems[fifo->head]; --fifo->count; if (fifo->count > 0) { fifo->head = nth_index(fifo, 1); if (fifo->count+1 == fifo->nelems) { apr_thread_cond_broadcast(fifo->not_full); } } *pelem = pull_head(fifo); apr_thread_mutex_unlock(fifo->lock); } return rv; Loading Loading @@ -848,29 +858,18 @@ static apr_status_t fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx, int } ap_assert(fifo->count > 0); elem = fifo->elems[fifo->head]; elem = pull_head(fifo); apr_thread_mutex_unlock(fifo->lock); switch (fn(elem, ctx)) { case H2_FIFO_OP_PULL: --fifo->count; if (fifo->count > 0) { fifo->head = nth_index(fifo, 1); if (fifo->count+1 == fifo->nelems) { apr_thread_cond_broadcast(fifo->not_full); } } break; case H2_FIFO_OP_REPUSH: if (fifo->count > 1) { fifo->head = nth_index(fifo, 1); if (fifo->count < fifo->nelems) { fifo->elems[nth_index(fifo, fifo->count-1)] = elem; } } return h2_fifo_push(fifo, elem); break; } apr_thread_mutex_unlock(fifo->lock); } return rv; } Loading
modules/http2/h2_workers.c +3 −2 Original line number Diff line number Diff line Loading @@ -249,8 +249,7 @@ static apr_status_t workers_pool_cleanup(void *data) if (!workers->aborted) { workers->aborted = 1; /* before we go, cleanup any zombies and abort the rest */ cleanup_zombies(workers); /* abort all idle slots */ for (;;) { slot = pop_slot(&workers->idle); if (slot) { Loading @@ -266,6 +265,8 @@ static apr_status_t workers_pool_cleanup(void *data) h2_fifo_term(workers->mplxs); h2_fifo_interrupt(workers->mplxs); cleanup_zombies(workers); } return APR_SUCCESS; } Loading