Commit 14260889 authored by Stefan Eissing's avatar Stefan Eissing
Browse files

Merge of 1761434,1761477 from trunk:

mod_http2: fix for output blocking race condition



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1761478 13f79535-47bb-0310-9956-ffa450edef68
parent 008f614d
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -2,6 +2,9 @@

Changes with Apache 2.4.24

  *) mod_http2: fix suspended handling for streams. Output could become
     blocked in rare cases.

  *) mpm_winnt: Prevent a denial of service when the 'data' AcceptFilter is in
     use by replacing it with the 'connect' filter. PR 59970. [Jacob Champion]

+31 −23
Original line number Diff line number Diff line
@@ -223,6 +223,28 @@ static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
    }
}

static void report_consumption(h2_bucket_beam *beam, int force)
{
    if (force || beam->received_bytes != beam->reported_consumed_bytes) {
        if (beam->consumed_fn) { 
            beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
                              - beam->reported_consumed_bytes);
        }
        beam->reported_consumed_bytes = beam->received_bytes;
    }
}

static void report_production(h2_bucket_beam *beam, int force)
{
    if (force || beam->sent_bytes != beam->reported_produced_bytes) {
        if (beam->produced_fn) { 
            beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
                              - beam->reported_produced_bytes);
        }
        beam->reported_produced_bytes = beam->sent_bytes;
    }
}

static apr_off_t calc_buffered(h2_bucket_beam *beam)
{
    apr_off_t len = 0;
@@ -279,7 +301,9 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
    *premain = calc_space_left(beam);
    while (!beam->aborted && *premain <= 0 
           && (block == APR_BLOCK_READ) && pbl->mutex) {
        apr_status_t status = wait_cond(beam, pbl->mutex);
        apr_status_t status;
        report_production(beam, 1);
        status = wait_cond(beam, pbl->mutex);
        if (APR_STATUS_IS_TIMEUP(status)) {
            return status;
        }
@@ -356,28 +380,6 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
    }
}

static void report_consumption(h2_bucket_beam *beam, int force)
{
    if (force || beam->received_bytes != beam->reported_consumed_bytes) {
        if (beam->consumed_fn) { 
            beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
                              - beam->reported_consumed_bytes);
        }
        beam->reported_consumed_bytes = beam->received_bytes;
    }
}

static void report_production(h2_bucket_beam *beam, int force)
{
    if (force || beam->sent_bytes != beam->reported_produced_bytes) {
        if (beam->produced_fn) { 
            beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
                              - beam->reported_produced_bytes);
        }
        beam->reported_produced_bytes = beam->sent_bytes;
    }
}

static void h2_blist_cleanup(h2_blist *bl)
{
    apr_bucket *e;
@@ -877,6 +879,9 @@ transfer:
        }
        
        if (transferred) {
            if (beam->m_cond) {
                apr_thread_cond_broadcast(beam->m_cond);
            }
            status = APR_SUCCESS;
        }
        else if (beam->closed) {
@@ -890,6 +895,9 @@ transfer:
            goto transfer;
        }
        else {
            if (beam->m_cond) {
                apr_thread_cond_broadcast(beam->m_cond);
            }
            status = APR_EAGAIN;
        }
leave:        
+58 −39
Original line number Diff line number Diff line
@@ -168,7 +168,7 @@ static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
    return 0;
}

static void have_out_data_for(h2_mplx *m, int stream_id);
static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response);
static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);

static void check_tx_reservation(h2_mplx *m) 
@@ -545,7 +545,7 @@ static int task_abort_connection(void *ctx, void *val)
    if (task->input.beam) {
        h2_beam_abort(task->input.beam);
    }
    if (task->output.beam) {
    if (task->worker_started && !task->worker_done && task->output.beam) {
        h2_beam_abort(task->output.beam);
    }
    return 1;
@@ -713,6 +713,23 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
    m->input_consumed_ctx = ctx;
}

static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
    h2_mplx *m = ctx;
    apr_status_t status;
    h2_stream *stream;
    int acquired;
    
    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        stream = h2_ihash_get(m->streams, beam->id);
        if (stream) {
            have_out_data_for(m, stream, 0);
        }
        leave_mutex(m, acquired);
    }
}

static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
    apr_status_t status = APR_SUCCESS;
@@ -735,6 +752,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
        h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
        h2_beam_timeout_set(task->output.beam, m->stream_timeout);
        h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
        h2_beam_on_produced(task->output.beam, output_produced, m);
        m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
        if (!task->output.copy_files) {
            h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
@@ -743,13 +761,12 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
        task->output.opened = 1;
    }
    
    h2_ihash_add(m->sready, stream);
    if (response && response->http_status < 300) {
        /* we might see some file buckets in the output, see
         * if we have enough handles reserved. */
        check_tx_reservation(m);
    }
    have_out_data_for(m, stream_id);
    have_out_data_for(m, stream, 1);
    return status;
}

@@ -803,7 +820,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
                    APLOG_TRACE2);
    }
    output_consumed_signal(m, task);
    have_out_data_for(m, task->stream_id);
    have_out_data_for(m, stream, 0);
    return status;
}

@@ -837,14 +854,20 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
    return status;
}

static void have_out_data_for(h2_mplx *m, int stream_id)
static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
{
    (void)stream_id;
    AP_DEBUG_ASSERT(m);
    h2_ihash_t *set;
    ap_assert(m);
    ap_assert(stream);
    
    set = response?  m->sready : m->sresume;
    if (!h2_ihash_get(set, stream->id)) {
        h2_ihash_add(set, stream);
        if (m->added_output) {
            apr_thread_cond_signal(m->added_output);
        }
    }
}

apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
@@ -1071,11 +1094,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                          "h2_mplx(%s): task_done, stream still open", 
                          task->id);
            if (h2_stream_is_suspended(stream)) {
            /* more data will not arrive, resume the stream */
                h2_ihash_add(m->sresume, stream);
                have_out_data_for(m, stream->id);
            }
            have_out_data_for(m, stream, 0);
        }
        else {
            /* stream no longer active, was it placed in hold? */
@@ -1463,6 +1483,10 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
                              "h2_mplx(%ld-%d): on_resume", 
                              m->id, stream->id);
                task = h2_ihash_get(m->tasks, stream->id);
                if (task && task->rst_error) {
                    h2_stream_rst(stream, task->rst_error);
                }
                h2_stream_set_suspended(stream, 0);
                status = on_resume(on_ctx, stream->id);
            }
@@ -1473,47 +1497,42 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
    return status;
}

static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
{
    h2_mplx *m = ctx;
    apr_status_t status;
    h2_stream *stream;
    h2_task *task;
    int acquired;
    
    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        stream = h2_ihash_get(m->streams, beam->id);
        if (stream && h2_stream_is_suspended(stream)) {
        stream = h2_ihash_get(m->streams, stream_id);
        if (stream && !h2_ihash_get(m->sresume, stream->id)) {
            /* not marked for resume again already */
            h2_stream_set_suspended(stream, 1);
            task = h2_ihash_get(m->tasks, stream->id);
            if (stream->started && (!task || task->worker_done)) {
                h2_ihash_add(m->sresume, stream);
            h2_beam_on_produced(beam, NULL, NULL);
            have_out_data_for(m, beam->id);
            }
        }
        leave_mutex(m, acquired);
    }
    return status;
}

apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
int h2_mplx_is_busy(h2_mplx *m)
{
    apr_status_t status;
    h2_stream *stream;
    h2_task *task;
    int acquired;
    int acquired, busy = 1;
    
    AP_DEBUG_ASSERT(m);
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        stream = h2_ihash_get(m->streams, stream_id);
        if (stream) {
            h2_stream_set_suspended(stream, 1);
            task = h2_ihash_get(m->tasks, stream->id);
            if (stream->started && (!task || task->worker_done)) {
                h2_ihash_add(m->sresume, stream);
            }
            else if (task->output.beam) {
                /* register callback so that we can resume on new output */
                h2_beam_on_produced(task->output.beam, output_produced, m);
        if (h2_ihash_empty(m->streams)) {
            busy = 0;
        }
        if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) {
            busy = 0;
        }
        leave_mutex(m, acquired);
    }
    return status;
    return busy;
}
+2 −0
Original line number Diff line number Diff line
@@ -158,6 +158,8 @@ void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask)
 */
apr_uint32_t h2_mplx_shutdown(h2_mplx *m);

int h2_mplx_is_busy(h2_mplx *m);

/*******************************************************************************
 * IO lifetime of streams.
 ******************************************************************************/
+15 −41
Original line number Diff line number Diff line
@@ -1168,8 +1168,6 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
        return NGHTTP2_ERR_CALLBACK_FAILURE;
    }
    
    AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream));
    
    status = h2_stream_out_prepare(stream, &nread, &eos);
    if (nread) {
        *data_flags |=  NGHTTP2_DATA_FLAG_NO_COPY;
@@ -1179,9 +1177,10 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
        case APR_SUCCESS:
            break;
            
        case APR_ECONNABORTED:
        case APR_ECONNRESET:
            return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
                stream->id, stream->rst_error);
                stream->id, H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
            
        case APR_EAGAIN:
            /* If there is no data available, our session will automatically
@@ -1431,7 +1430,17 @@ static apr_status_t on_stream_resume(void *ctx, int stream_id)
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
                  "h2_stream(%ld-%d): on_resume", session->id, stream_id);
    if (stream) {
        int rv = nghttp2_session_resume_data(session->ngh2, stream_id);
        int rv;
        if (stream->rst_error) {
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
                          "h2_stream(%ld-%d): RST_STREAM, err=%d",
                          session->id, stream->id, stream->rst_error);
            rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
                                           stream->id, stream->rst_error);
        }
        else {
            rv = nghttp2_session_resume_data(session->ngh2, stream_id);
        }
        session->have_written = 1;
        ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
                      APLOG_ERR : APLOG_DEBUG, 0, session->c,
@@ -1459,7 +1468,7 @@ static apr_status_t on_stream_response(void *ctx, int stream_id)
    if (!stream) {
        return APR_NOTFOUND;
    }
    else if (!stream->response) {
    else if (stream->rst_error || !stream->response) {
        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
        
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
@@ -1652,40 +1661,6 @@ static apr_status_t h2_session_read(h2_session *session, int block)
    return rstatus;
}

static int unsubmitted_iter(void *ctx, void *val)
{
    h2_stream *stream = val;
    if (h2_stream_needs_submit(stream)) {
        *((int *)ctx) = 1;
        return 0;
    }
    return 1;
}

static int has_unsubmitted_streams(h2_session *session)
{
    int has_unsubmitted = 0;
    h2_ihash_iter(session->streams, unsubmitted_iter, &has_unsubmitted);
    return has_unsubmitted;
}

static int suspended_iter(void *ctx, void *val)
{
    h2_stream *stream = val;
    if (h2_stream_is_suspended(stream)) {
        *((int *)ctx) = 1;
        return 0;
    }
    return 1;
}

static int has_suspended_streams(h2_session *session)
{
    int has_suspended = 0;
    h2_ihash_iter(session->streams, suspended_iter, &has_suspended);
    return has_suspended;
}

static const char *StateNames[] = {
    "INIT",      /* H2_SESSION_ST_INIT */
    "DONE",      /* H2_SESSION_ST_DONE */
@@ -1830,8 +1805,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
                          session->id, session->open_streams);
            h2_conn_io_flush(&session->io);
            if (session->open_streams > 0) {
                if (has_unsubmitted_streams(session) 
                    || has_suspended_streams(session)) {
                if (h2_mplx_is_busy(session->mplx)) {
                    /* waiting for at least one stream to produce data */
                    transit(session, "no io", H2_SESSION_ST_WAIT);
                }
Loading