Commit 5f7e679f authored by Stefan Eissing's avatar Stefan Eissing
Browse files

On the trunk:

mod_http2: separate mutex instances for each bucket beam, resulting in 
     less lock contention. input beams only created when necessary.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1784571 13f79535-47bb-0310-9956-ffa450edef68
parent 89e33de2
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
                                                         -*- coding: utf-8 -*-
Changes with Apache 2.5.0

  *) mod_http2: separate mutex instances for each bucket beam, resulting in 
     less lock contention. input beams only created when necessary.
     [Stefan Eissing]
     
  *) mod_syslog: Support use of optional "tag" in syslog entries.
     PR 60525. [Ben Rubson <ben.rubson gmail.com>, Jim Jagielski]

+78 −40
Original line number Diff line number Diff line
@@ -195,6 +195,19 @@ static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam,
 * bucket beam that can transport buckets across threads
 ******************************************************************************/

static void mutex_leave(void *ctx, apr_thread_mutex_t *lock)
{
    apr_thread_mutex_unlock(lock);
}

static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
{
    h2_bucket_beam *beam = ctx;
    pbl->mutex = beam->lock;
    pbl->leave = mutex_leave;
    return apr_thread_mutex_lock(pbl->mutex);
}

static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
    h2_beam_mutex_enter *enter = beam->m_enter;
@@ -227,26 +240,37 @@ static apr_off_t bucket_mem_used(apr_bucket *b)
    }
}

static int report_consumption(h2_bucket_beam *beam)
static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
    int rv = 0;
    if (beam->cons_io_cb) { 
        beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes
                         - beam->cons_bytes_reported);
    apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
    h2_beam_io_callback *cb = beam->cons_io_cb;
     
    if (cb) {
        void *ctx = beam->cons_ctx;
        
        if (pbl) leave_yellow(beam, pbl);
        cb(ctx, beam, len);
        if (pbl) enter_yellow(beam, pbl);
        rv = 1;
    }
    beam->cons_bytes_reported = beam->received_bytes;
    beam->cons_bytes_reported += len;
    return rv;
}

static void report_prod_io(h2_bucket_beam *beam, int force)
static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
{
    if (force || beam->prod_bytes_reported != beam->sent_bytes) {
        if (beam->prod_io_cb) { 
            beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes
                             - beam->prod_bytes_reported);
    apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
    if (force || len > 0) {
        h2_beam_io_callback *cb = beam->prod_io_cb; 
        if (cb) {
            void *ctx = beam->prod_ctx;
            
            leave_yellow(beam, pbl);
            cb(ctx, beam, len);
            enter_yellow(beam, pbl);
        }
        beam->prod_bytes_reported = beam->sent_bytes;
        beam->prod_bytes_reported += len;
    }
}

@@ -293,10 +317,10 @@ static apr_size_t calc_space_left(h2_bucket_beam *beam)
static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
{
    if (beam->timeout > 0) {
        return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
        return apr_thread_cond_timedwait(beam->cond, lock, beam->timeout);
    }
    else {
        return apr_thread_cond_wait(beam->m_cond, lock);
        return apr_thread_cond_wait(beam->cond, lock);
    }
}

@@ -307,7 +331,7 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
    while (!beam->aborted && *premain <= 0 
           && (block == APR_BLOCK_READ) && pbl->mutex) {
        apr_status_t status;
        report_prod_io(beam, 1);
        report_prod_io(beam, 1, pbl);
        status = wait_cond(beam, pbl->mutex);
        if (APR_STATUS_IS_TIMEUP(status)) {
            return status;
@@ -378,8 +402,8 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
        if (!bl.mutex) {
            r_purge_sent(beam);
        }
        else if (beam->m_cond) {
            apr_thread_cond_broadcast(beam->m_cond);
        else if (beam->cond) {
            apr_thread_cond_broadcast(beam->cond);
        }
        leave_yellow(beam, &bl);
    }
@@ -399,8 +423,8 @@ static apr_status_t beam_close(h2_bucket_beam *beam)
{
    if (!beam->closed) {
        beam->closed = 1;
        if (beam->m_cond) {
            apr_thread_cond_broadcast(beam->m_cond);
        if (beam->cond) {
            apr_thread_cond_broadcast(beam->cond);
        }
    }
    return APR_SUCCESS;
@@ -445,7 +469,7 @@ static apr_status_t beam_send_cleanup(void *data)
    /* sender is going away, clear up all references to its memory */
    r_purge_sent(beam);
    h2_blist_cleanup(&beam->send_list);
    report_consumption(beam);
    report_consumption(beam, NULL);
    while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
        h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
        H2_BPROXY_REMOVE(proxy);
@@ -555,10 +579,16 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
    H2_BPROXY_LIST_INIT(&beam->proxies);
    beam->tx_mem_limits = 1;
    beam->max_buf_size = max_buf_size;
    apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);

    status = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, 
                                     pool);
    if (status == APR_SUCCESS) {
        status = apr_thread_cond_create(&beam->cond, pool);
        if (status == APR_SUCCESS) {
            apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
            *pbeam = beam;
    
        }
    }
    return status;
}

@@ -586,7 +616,6 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)

void h2_beam_mutex_set(h2_bucket_beam *beam, 
                       h2_beam_mutex_enter m_enter,
                       apr_thread_cond_t *cond,
                       void *m_ctx)
{
    h2_beam_lock bl;
@@ -594,11 +623,20 @@ void h2_beam_mutex_set(h2_bucket_beam *beam,
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        beam->m_enter = m_enter;
        beam->m_ctx   = m_ctx;
        beam->m_cond  = cond;
        leave_yellow(beam, &bl);
    }
}

void h2_beam_mutex_enable(h2_bucket_beam *beam)
{
    h2_beam_mutex_set(beam, mutex_enter, beam);
}

void h2_beam_mutex_disable(h2_bucket_beam *beam)
{
    h2_beam_mutex_set(beam, NULL, NULL);
}

void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
{
    h2_beam_lock bl;
@@ -630,10 +668,10 @@ void h2_beam_abort(h2_bucket_beam *beam)
            beam->aborted = 1;
            r_purge_sent(beam);
            h2_blist_cleanup(&beam->send_list);
            report_consumption(beam);
            report_consumption(beam, &bl);
        }
        if (beam->m_cond) {
            apr_thread_cond_broadcast(beam->m_cond);
        if (beam->cond) {
            apr_thread_cond_broadcast(beam->cond);
        }
        leave_yellow(beam, &bl);
    }
@@ -646,7 +684,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        r_purge_sent(beam);
        beam_close(beam);
        report_consumption(beam);
        report_consumption(beam, &bl);
        leave_yellow(beam, &bl);
    }
    return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
@@ -680,8 +718,8 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
                status = APR_EAGAIN;
                break;
            }
            if (beam->m_cond) {
                apr_thread_cond_broadcast(beam->m_cond);
            if (beam->cond) {
                apr_thread_cond_broadcast(beam->cond);
            }
            status = wait_cond(beam, bl.mutex);
        }
@@ -868,12 +906,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
                b = APR_BRIGADE_FIRST(sender_bb);
                status = append_bucket(beam, b, block, &bl);
            }
            report_prod_io(beam, force_report);
            if (beam->m_cond) {
                apr_thread_cond_broadcast(beam->m_cond);
            report_prod_io(beam, force_report, &bl);
            if (beam->cond) {
                apr_thread_cond_broadcast(beam->cond);
            }
        }
        report_consumption(beam);
        report_consumption(beam, &bl);
        leave_yellow(beam, &bl);
    }
    return status;
@@ -1040,15 +1078,15 @@ transfer:
        }
        
        if (transferred) {
            if (beam->m_cond) {
                apr_thread_cond_broadcast(beam->m_cond);
            if (beam->cond) {
                apr_thread_cond_broadcast(beam->cond);
            }
            status = APR_SUCCESS;
        }
        else if (beam->closed) {
            status = APR_EOF;
        }
        else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
        else if (block == APR_BLOCK_READ && bl.mutex && beam->cond) {
            status = wait_cond(beam, bl.mutex);
            if (status != APR_SUCCESS) {
                goto leave;
@@ -1056,8 +1094,8 @@ transfer:
            goto transfer;
        }
        else {
            if (beam->m_cond) {
                apr_thread_cond_broadcast(beam->m_cond);
            if (beam->cond) {
                apr_thread_cond_broadcast(beam->cond);
            }
            status = APR_EAGAIN;
        }
@@ -1198,7 +1236,7 @@ int h2_beam_report_consumption(h2_bucket_beam *beam)
    h2_beam_lock bl;
    int rv = 0;
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        rv = report_consumption(beam);
        rv = report_consumption(beam, &bl);
        leave_yellow(beam, &bl);
    }
    return rv;
+5 −2
Original line number Diff line number Diff line
@@ -190,9 +190,10 @@ struct h2_bucket_beam {
    unsigned int close_sent : 1;
    unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */

    struct apr_thread_mutex_t *lock;
    struct apr_thread_cond_t *cond;
    void *m_ctx;
    h2_beam_mutex_enter *m_enter;
    struct apr_thread_cond_t *m_cond;
    
    apr_off_t cons_bytes_reported;    /* amount of bytes reported as consumed */
    h2_beam_ev_callback *cons_ev_cb;
@@ -315,9 +316,11 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block);

void h2_beam_mutex_set(h2_bucket_beam *beam, 
                       h2_beam_mutex_enter m_enter,
                       struct apr_thread_cond_t *cond,
                       void *m_ctx);

void h2_beam_mutex_enable(h2_bucket_beam *beam);
void h2_beam_mutex_disable(h2_bucket_beam *beam);

/** 
 * Set/get the timeout for blocking read/write operations. Only works
 * if a mutex has been set for the beam.
+52 −46
Original line number Diff line number Diff line
@@ -100,33 +100,19 @@ static void leave_mutex(h2_mplx *m, int acquired)
    }
}

static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
{
    leave_mutex(ctx, 1);
}

static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
{
    h2_mplx *m = ctx;
    int acquired;
    apr_status_t status;
    
    status = enter_mutex(m, &acquired);
    if (status == APR_SUCCESS) {
        pbl->mutex = m->lock;
        pbl->leave = acquired? beam_leave : NULL;
        pbl->leave_ctx = m;
    }
    return status;
}

static void stream_output_consumed(void *ctx, 
                                   h2_bucket_beam *beam, apr_off_t length)
{
    h2_stream *stream = ctx;
    h2_mplx *m = stream->session->mplx;
    h2_task *task = stream->task;
    int acquired;
    
    if (length > 0 && task && task->assigned) {
        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
            h2_req_engine_out_consumed(task->assigned, task->c, length); 
            leave_mutex(m, acquired);
        }
    }
}

@@ -139,10 +125,17 @@ static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
static void stream_input_consumed(void *ctx, 
                                  h2_bucket_beam *beam, apr_off_t length)
{
    if (length > 0) { 
        h2_mplx *m = ctx;
    if (m->input_consumed && length) {
        int acquired;
    
        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
            if (m->input_consumed) {
                m->input_consumed(m->input_consumed_ctx, beam->id, length);
            }
            leave_mutex(m, acquired);
        }
    }
}

static int can_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
@@ -190,7 +183,9 @@ static void stream_joined(h2_mplx *m, h2_stream *stream)
    
    h2_ihash_remove(m->shold, stream->id);
    h2_ihash_add(m->spurge, stream);
    if (stream->input) {
        m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
    }
    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
}

@@ -198,9 +193,11 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
{
    ap_assert(stream->state == H2_SS_CLEANUP);

    h2_beam_on_produced(stream->output, NULL, NULL);
    if (stream->input) {
        h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
        h2_beam_abort(stream->input);
    }
    h2_beam_on_produced(stream->output, NULL, NULL);
    h2_beam_leave(stream->output);
    
    h2_stream_cleanup(stream);
@@ -376,18 +373,21 @@ static int stream_destroy_iter(void *ctx, void *val)
    h2_ihash_remove(m->spurge, stream->id);
    ap_assert(stream->state == H2_SS_CLEANUP);
    
    if (stream->input == NULL || stream->output == NULL) {
    if (stream->output == NULL) {
        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c, 
                      H2_STRM_MSG(stream, "already with beams==NULL"));
        return 0;
    }
    
    if (stream->input) {
        /* Process outstanding events before destruction */
        input_consumed_signal(m, stream);    
    
        h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
    h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy");
        h2_beam_destroy(stream->input);
        stream->input = NULL;
    }
    
    h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy");
    h2_beam_destroy(stream->output);
    stream->output = NULL;
    if (stream->task) {
@@ -628,7 +628,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
    }
    
    /* time to protect the beam against multi-threaded use */
    h2_beam_mutex_set(stream->output, beam_enter, stream->task->cond, m);
    h2_beam_mutex_enable(stream->output);
    
    /* we might see some file buckets in the output, see
     * if we have enough handles reserved. */
@@ -812,11 +812,13 @@ static h2_task *next_stream_task(h2_mplx *m)
                    m->max_stream_started = sid;
                }
                
                if (stream->input) {
                    h2_beam_timeout_set(stream->input, m->stream_timeout);
                    h2_beam_on_consumed(stream->input, stream_input_ev, 
                                        stream_input_consumed, m);
                    h2_beam_on_file_beam(stream->input, can_beam_file, m);
                h2_beam_mutex_set(stream->input, beam_enter, stream->task->cond, m);
                    h2_beam_mutex_enable(stream->input);
                }
                
                h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
                h2_beam_timeout_set(stream->output, m->stream_timeout);
@@ -931,18 +933,22 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                      H2_STRM_MSG(stream, "task_done, stream open")); 
        /* more data will not arrive, resume the stream */
        h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
        h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
        if (stream->input) {
            h2_beam_mutex_disable(stream->input);
            h2_beam_leave(stream->input);
        }
        h2_beam_mutex_disable(stream->output);
        have_out_data_for(m, stream);
    }
    else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                      H2_STRM_MSG(stream, "task_done, in hold"));
        /* stream was just waiting for us. */
        h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
        h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
        if (stream->input) {
            h2_beam_mutex_disable(stream->input);
            h2_beam_leave(stream->input);
        }
        h2_beam_mutex_disable(stream->output);
        stream_joined(m, stream);
    }
    else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
+1 −6
Original line number Diff line number Diff line
@@ -1738,12 +1738,7 @@ static void ev_stream_open(h2_session *session, h2_stream *stream)
    }
    
    ap_assert(!stream->scheduled);
    if (stream->request) {
        const h2_request *r = stream->request;
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                      H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
                      r->method, r->scheme, r->authority, r->path, r->chunked);
        stream->scheduled = 1;
    if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
        h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
    }
    else {
Loading