Commit ee242ef7 authored by Stefan Eissing's avatar Stefan Eissing
Browse files

On the trunk:

mod_http2: less and more granular mutex use for improved performance.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1790284 13f79535-47bb-0310-9956-ffa450edef68
parent c57bcf66
Loading
Loading
Loading
Loading
+4 −2
Original line number Diff line number Diff line
                                                         -*- coding: utf-8 -*-
Changes with Apache 2.5.0

  *) mod_http2/mod_proxy_http2: less read attempts on bucket beams that already
     delivered EOS/headers. Fixed bug in re-attempting proxy request after 
  *) mod_http2: less and more granular mutex use for improved performance.
     [Stefan Eissing]
     
  *) mod_proxy_http2: Fixed bug in re-attempting proxy requests after 
     connection error. [Stefan Eissing]
  
  *) core: Disallow multiple Listen on the same IP:port when listener buckets
+48 −54
Original line number Diff line number Diff line
@@ -65,14 +65,19 @@ apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
        return rv;\
    } } while(0)

#define H2_MPLX_LEAVE(m)    \
    apr_thread_mutex_unlock(m->lock)
 
#define H2_MPLX_ENTER_ALWAYS(m)    \
    apr_thread_mutex_lock(m->lock)

#define H2_MPLX_LEAVE(m)    \
    apr_thread_mutex_unlock(m->lock)
#define H2_MPLX_ENTER_MAYBE(m, lock)    \
    if (lock) apr_thread_mutex_lock(m->lock)

#define H2_MPLX_LEAVE_MAYBE(m, lock)    \
    if (lock) apr_thread_mutex_unlock(m->lock)

static void check_data_for(h2_mplx *m, int stream_id);
static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);

static void stream_output_consumed(void *ctx, 
                                   h2_bucket_beam *beam, apr_off_t length)
@@ -121,6 +126,7 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
    h2_stream_cleanup(stream);

    h2_iq_remove(m->q, stream->id);
    h2_fifo_remove(m->readyq, stream);
    h2_ihash_remove(m->streams, stream->id);
    h2_ihash_add(m->shold, stream);
    
@@ -206,7 +212,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
        m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
        m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
        m->q = h2_iq_create(m->pool, m->max_streams);
        m->readyq = h2_iq_create(m->pool, m->max_streams);

        status = h2_fifo_set_create(&m->readyq, m->pool, m->max_streams);
        if (status != APR_SUCCESS) {
            apr_pool_destroy(m->pool);
            return NULL;
        }

        m->workers = workers;
        m->max_active = workers->max_workers;
@@ -481,15 +492,10 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)

static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
    h2_mplx *m = ctx;
    
    H2_MPLX_ENTER_ALWAYS(m);

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                  "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
    check_data_for(m, beam->id);
    h2_stream *stream = ctx;
    h2_mplx *m = stream->session->mplx;
    
    H2_MPLX_LEAVE(m);
    check_data_for(m, stream, 1);
}

static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
@@ -513,7 +519,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
    }
    
    h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
    h2_beam_on_produced(stream->output, output_produced, m);
    h2_beam_on_produced(stream->output, output_produced, stream);
    if (stream->task->output.copy_files) {
        h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
    }
@@ -523,7 +529,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
    
    /* we might see some file buckets in the output, see
     * if we have enough handles reserved. */
    check_data_for(m, stream->id);
    check_data_for(m, stream, 0);
    return status;
}

@@ -563,7 +569,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
    status = h2_beam_close(task->output.beam);
    h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
    output_consumed_signal(m, task);
    check_data_for(m, task->stream_id);
    check_data_for(m, stream, 0);
    return status;
}

@@ -577,7 +583,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
    if (m->aborted) {
        status = APR_ECONNABORTED;
    }
    else if (apr_atomic_read32(&m->event_pending) > 0) {
    else if (h2_mplx_has_master_events(m)) {
        status = APR_SUCCESS;
    }
    else {
@@ -597,14 +603,16 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
    return status;
}

static void check_data_for(h2_mplx *m, int stream_id)
static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
{
    ap_assert(m);
    h2_iq_append(m->readyq, stream_id);
    if (h2_fifo_push(m->readyq, stream) == APR_SUCCESS) {
        apr_atomic_set32(&m->event_pending, 1);
        H2_MPLX_ENTER_MAYBE(m, lock);
        if (m->added_output) {
            apr_thread_cond_signal(m->added_output);
        }
        H2_MPLX_LEAVE_MAYBE(m, lock);
    }
}

apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
@@ -656,7 +664,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
        h2_ihash_add(m->streams, stream);
        if (h2_stream_is_ready(stream)) {
            /* already have a response */
            check_data_for(m, stream->id);
            check_data_for(m, stream, 0);
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                          H2_STRM_MSG(stream, "process, add to readyq")); 
        }
@@ -828,7 +836,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
        if (stream->output) {
            h2_beam_mutex_disable(stream->output);
        }
        check_data_for(m, stream->id);
        check_data_for(m, stream, 0);
    }
    else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
@@ -1161,48 +1169,33 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                                            stream_ev_callback *on_resume, 
                                            void *on_ctx)
{
    int ids[100];
    h2_stream *stream;
    size_t i, n;
    
    H2_MPLX_ENTER(m);
    int n;
    
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                  "h2_mplx(%ld): dispatch events", m->id);        
    apr_atomic_set32(&m->event_pending, 0);
    purge_streams(m);

    /* update input windows for streams */
    h2_ihash_iter(m->streams, report_consumption_iter, m);
    
    if (!h2_iq_empty(m->readyq)) {
        n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
        for (i = 0; i < n; ++i) {
            stream = h2_ihash_get(m->streams, ids[i]);
            if (stream) {
                H2_MPLX_LEAVE(m);

    n = h2_fifo_count(m->readyq);
    while (n > 0 
           && (h2_fifo_try_pull(m->readyq, (void**)&stream) == APR_SUCCESS)) {
        --n;
        on_resume(on_ctx, stream);

                H2_MPLX_ENTER(m);
            }
        }
    }
    if (!h2_iq_empty(m->readyq)) {
        apr_atomic_set32(&m->event_pending, 1);
    }
    
    H2_MPLX_ENTER(m);
    purge_streams(m);
    H2_MPLX_LEAVE(m);
    
    return APR_SUCCESS;
}

apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
{
    H2_MPLX_ENTER(m);

    check_data_for(m, stream_id);

    H2_MPLX_LEAVE(m);
    check_data_for(m, stream, 1);
    return APR_SUCCESS;
}

@@ -1215,7 +1208,8 @@ int h2_mplx_awaits_data(h2_mplx *m)
    if (h2_ihash_empty(m->streams)) {
        waiting = 0;
    }
    if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
    if ((h2_fifo_count(m->readyq) == 0) 
        && h2_iq_empty(m->q) && !m->tasks_active) {
        waiting = 0;
    }

+2 −2
Original line number Diff line number Diff line
@@ -68,7 +68,7 @@ struct h2_mplx {
    struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
    
    struct h2_iqueue *q;            /* all stream ids that need to be started */
    struct h2_iqueue *readyq;       /* all stream ids ready for output */
    struct h2_fifo *readyq;         /* all streams ready for output */
        
    struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
    
@@ -158,7 +158,7 @@ apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                 struct apr_thread_cond_t *iowait);

apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id);
apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream);

/*******************************************************************************
 * Stream processing.
+1 −1
Original line number Diff line number Diff line
@@ -859,7 +859,7 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
    
    if (status == APR_EAGAIN) {
        /* TODO: ugly, someone needs to retrieve the response first */
        h2_mplx_keep_active(stream->session->mplx, stream->id);
        h2_mplx_keep_active(stream->session->mplx, stream);
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                      H2_STRM_MSG(stream, "prep, response eagain"));
        return status;
+37 −6
Original line number Diff line number Diff line
@@ -438,12 +438,12 @@ int h2_iq_count(h2_iqueue *q)
}


void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
{
    int i;
    
    if (h2_iq_contains(q, sid)) {
        return;
        return 0;
    }
    if (q->nelts >= q->nalloc) {
        iq_grow(q, q->nalloc * 2);
@@ -456,11 +456,12 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
        /* bubble it to the front of the queue */
        iq_bubble_up(q, i, q->head, cmp, ctx);
    }
    return 1;
}

void h2_iq_append(h2_iqueue *q, int sid)
int h2_iq_append(h2_iqueue *q, int sid)
{
    h2_iq_add(q, sid, NULL, NULL);
    return h2_iq_add(q, sid, NULL, NULL);
}

int h2_iq_remove(h2_iqueue *q, int sid)
@@ -612,6 +613,7 @@ int h2_iq_contains(h2_iqueue *q, int sid)
struct h2_fifo {
    void **elems;
    int nelems;
    int set;
    int head;
    int count;
    int aborted;
@@ -636,7 +638,20 @@ static apr_status_t fifo_destroy(void *data)
    return APR_SUCCESS;
}

apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
static int index_of(h2_fifo *fifo, void *elem)
{
    int i;
    
    for (i = 0; i < fifo->count; ++i) {
        if (elem == fifo->elems[nth_index(fifo, i)]) {
            return i;
        }
    }
    return -1;
}

static apr_status_t create_int(h2_fifo **pfifo, apr_pool_t *pool, 
                               int capacity, int as_set)
{
    apr_status_t rv;
    h2_fifo *fifo;
@@ -667,6 +682,7 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
        return APR_ENOMEM;
    }
    fifo->nelems = capacity;
    fifo->set = as_set;
    
    *pfifo = fifo;
    apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
@@ -674,6 +690,16 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
    return APR_SUCCESS;
}

apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
{
    return create_int(pfifo, pool, capacity, 0);
}

apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
{
    return create_int(pfifo, pool, capacity, 1);
}

apr_status_t h2_fifo_term(h2_fifo *fifo)
{
    apr_status_t rv;
@@ -725,7 +751,12 @@ static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block)
    }

    if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
        if (fifo->count == fifo->nelems) {
        if (fifo->set && index_of(fifo, elem) >= 0) {
            /* set mode, elem already member */
            apr_thread_mutex_unlock(fifo->lock);
            return APR_EEXIST;
        }
        else if (fifo->count == fifo->nelems) {
            if (block) {
                while (fifo->count == fifo->nelems) {
                    if (fifo->aborted) {
Loading