Commit 1e953cc3 authored by Stefan Eissing's avatar Stefan Eissing
Browse files

On the trunk:

mod_http2: code cleanup after eliminating nested locks, giving worker slots their own mutex.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1790113 13f79535-47bb-0310-9956-ffa450edef68
parent cfb69e2f
Loading
Loading
Loading
Loading
+271 −309
Original line number Diff line number Diff line
@@ -55,56 +55,22 @@ typedef struct {
    apr_time_t now;
} stream_iter_ctx;

/* NULL or the mutex hold by this thread, used for recursive calls
 */
static const int nested_lock = 0;

static apr_threadkey_t *thread_lock;

apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
    if (nested_lock) {
        return apr_threadkey_private_create(&thread_lock, NULL, pool);
    }
    return APR_SUCCESS;
}

static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
{
    apr_status_t status;
#define H2_MPLX_ENTER(m)    \
    do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
        return rv;\
    } } while(0)

    if (nested_lock) {
        void *mutex = NULL;
        /* Enter the mutex if this thread already holds the lock or
         * if we can acquire it. Only on the later case do we unlock
         * onleaving the mutex.
         * This allow recursive entering of the mutex from the saem thread,
         * which is what we need in certain situations involving callbacks
         */
        apr_threadkey_private_get(&mutex, thread_lock);
        if (mutex == m->lock) {
            *pacquired = 0;
            ap_assert(NULL); /* nested, why? */
            return APR_SUCCESS;
        }
    }
    status = apr_thread_mutex_lock(m->lock);
    *pacquired = (status == APR_SUCCESS);
    if (nested_lock && *pacquired) {
        apr_threadkey_private_set(m->lock, thread_lock);
    }
    return status;
}
#define H2_MPLX_ENTER_ALWAYS(m)    \
    apr_thread_mutex_lock(m->lock)

#define H2_MPLX_LEAVE(m)    \
    apr_thread_mutex_unlock(m->lock)
 
static void leave_mutex(h2_mplx *m, int acquired)
{
    if (acquired) {
        if (nested_lock) {
            apr_threadkey_private_set(NULL, thread_lock);
        }
        apr_thread_mutex_unlock(m->lock);
    }
}

static void check_data_for(h2_mplx *m, int stream_id);

@@ -259,14 +225,15 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,

int h2_mplx_shutdown(h2_mplx *m)
{
    int acquired, max_stream_started = 0;
    int max_stream_started = 0;
    
    H2_MPLX_ENTER(m);

    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
    max_stream_started = m->max_stream_started;
    /* Clear schedule queue, disabling existing streams from starting */ 
    h2_iq_clear(m->q);
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return max_stream_started;
}

@@ -363,18 +330,16 @@ static int stream_iter_wrap(void *ctx, void *stream)

apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    stream_iter_ctx_t x;
    
    H2_MPLX_ENTER(m);

    x.cb = cb;
    x.ctx = ctx;
    h2_ihash_iter(m->streams, stream_iter_wrap, &x);
        
        leave_mutex(m, acquired);
    }
    return status;
    H2_MPLX_LEAVE(m);
    return APR_SUCCESS;
}

static int report_stream_iter(void *ctx, void *val) {
@@ -430,14 +395,13 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
    apr_status_t status;
    int i, wait_secs = 60;
    int acquired;

    /* How to shut down a h2 connection:
     * 0. abort and tell the workers that no more tasks will come from us */
    m->aborted = 1;
    h2_workers_unregister(m->workers, m);
    
    enter_mutex(m, &acquired);
    H2_MPLX_ENTER_ALWAYS(m);

    /* How to shut down a h2 connection:
     * 1. cancel all streams still active */
@@ -482,7 +446,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
        h2_ihash_iter(m->shold, unexpected_stream_iter, m);
    }
    
    leave_mutex(m, acquired);
    H2_MPLX_LEAVE(m);

    /* 5. unregister again, now that our workers are done */
    h2_workers_unregister(m->workers, m);
@@ -493,41 +457,39 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)

apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
    apr_status_t status = APR_SUCCESS;
    int acquired;
    H2_MPLX_ENTER(m);
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                  H2_STRM_MSG(stream, "cleanup"));
    stream_cleanup(m, stream);        
        leave_mutex(m, acquired);
    }
    return status;
    
    H2_MPLX_LEAVE(m);
    return APR_SUCCESS;
}

h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
{
    h2_stream *s = NULL;
    int acquired;
    
    if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER_ALWAYS(m);

    s = h2_ihash_get(m->streams, id);
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return s;
}

static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
    h2_mplx *m = ctx;
    int acquired;
    
    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
    H2_MPLX_ENTER_ALWAYS(m);

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                  "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
    check_data_for(m, beam->id);
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
}

static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
@@ -568,17 +530,17 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER(m);

    if (m->aborted) {
        status = APR_ECONNABORTED;
    }
    else {
        status = out_open(m, stream_id, beam);
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -609,9 +571,9 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                 apr_thread_cond_t *iowait)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER(m);

    if (m->aborted) {
        status = APR_ECONNABORTED;
    }
@@ -630,8 +592,8 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
        }
        m->added_output = NULL;
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -648,9 +610,9 @@ static void check_data_for(h2_mplx *m, int stream_id)
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER(m);

    if (m->aborted) {
        status = APR_ECONNABORTED;
    }
@@ -658,9 +620,10 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
        h2_iq_sort(m->q, cmp, ctx);
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                      "h2_mplx(%ld): reprioritize tasks", m->id);
        status = APR_SUCCESS;
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -682,13 +645,14 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
                             h2_stream_pri_cmp *cmp, void *ctx)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER(m);

    if (m->aborted) {
        status = APR_ECONNABORTED;
    }
    else {
        status = APR_SUCCESS;
        h2_ihash_add(m->streams, stream);
        if (h2_stream_is_ready(stream)) {
            /* already have a response */
@@ -703,8 +667,8 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
                          H2_STRM_MSG(stream, "process, added to q")); 
        }
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -762,11 +726,10 @@ static h2_task *next_stream_task(h2_mplx *m)
h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
{
    h2_task *task = NULL;
    apr_status_t status;
    int acquired;
    
    H2_MPLX_ENTER_ALWAYS(m);

    *has_more = 0;
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    if (!m->aborted) {
        task = next_stream_task(m);
        if (task != NULL && !h2_iq_empty(m->q)) {
@@ -776,8 +739,8 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
            m->is_registered = 0; /* h2_workers will discard this mplx */
        }
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return task;
}

@@ -895,9 +858,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)

void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
    int acquired;
    H2_MPLX_ENTER_ALWAYS(m);

    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
    task_done(m, task, NULL);
    --m->tasks_active;
    if (m->join_wait) {
@@ -908,8 +870,8 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
        *ptask = next_stream_task(m);
    }
    register_if_needed(m);
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
}

/*******************************************************************************
@@ -1001,10 +963,11 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
{
    apr_status_t status = APR_SUCCESS;
    apr_time_t now;            
    int acquired;
    apr_size_t scount;
    
    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
        apr_size_t scount = h2_ihash_count(m->streams);
    H2_MPLX_ENTER(m);

    scount = h2_ihash_count(m->streams);
    if (scount > 0 && m->tasks_active) {
        /* If we have streams in connection state 'IDLE', meaning
         * all streams are ready to sent data out, but lack
@@ -1045,8 +1008,8 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
        }
    }
    register_if_needed(m);
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -1090,7 +1053,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
    apr_status_t status;
    h2_mplx *m;
    h2_task *task;
    int acquired;
    h2_stream *stream;
    
    task = h2_ctx_rget_task(r);
    if (!task) {
@@ -1098,17 +1061,17 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
    }
    m = task->mplx;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
    H2_MPLX_ENTER(m);

    stream = h2_ihash_get(m->streams, task->stream_id);
    if (stream) {
        status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
    }
    else {
        status = APR_ECONNABORTED;
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -1120,10 +1083,11 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
    apr_status_t status;
    int acquired;
    int want_shutdown;
    
    H2_MPLX_ENTER(m);

    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        int want_shutdown = (block == APR_BLOCK_READ);
    want_shutdown = (block == APR_BLOCK_READ);

    /* Take this opportunity to update output consummation 
     * for this engine */
@@ -1147,8 +1111,8 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
        status = h2_ngn_shed_pull_request(shed, ngn, capacity,
                                          want_shutdown, pr);
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}
 
@@ -1159,10 +1123,11 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
    
    if (task) {
        h2_mplx *m = task->mplx;
        int acquired;
        h2_stream *stream;

        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
            h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
        H2_MPLX_ENTER_ALWAYS(m);

        stream = h2_ihash_get(m->streams, task->stream_id);
        
        ngn_out_update_windows(m, ngn);
        h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
@@ -1178,10 +1143,8 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
        else {
            task_done(m, task, ngn);
        }
            /* Take this opportunity to update output consummation 
             * for this engine */
            leave_mutex(m, acquired);
        }

        H2_MPLX_LEAVE(m);
    }
}

@@ -1198,13 +1161,12 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                                            stream_ev_callback *on_resume, 
                                            void *on_ctx)
{
    apr_status_t status;
    int acquired;
    int ids[100];
    h2_stream *stream;
    size_t i, n;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER(m);

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                  "h2_mplx(%ld): dispatch events", m->id);        
    apr_atomic_set32(&m->event_pending, 0);
@@ -1218,45 +1180,45 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
        for (i = 0; i < n; ++i) {
            stream = h2_ihash_get(m->streams, ids[i]);
            if (stream) {
                    leave_mutex(m, acquired);
                H2_MPLX_LEAVE(m);

                on_resume(on_ctx, stream);
                    enter_mutex(m, &acquired);

                H2_MPLX_ENTER(m);
            }
        }
    }
    if (!h2_iq_empty(m->readyq)) {
        apr_atomic_set32(&m->event_pending, 1);
    } 
        leave_mutex(m, acquired);
    }
    return status;

    H2_MPLX_LEAVE(m);
    return APR_SUCCESS;
}

apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
{
    apr_status_t status;
    int acquired;
    H2_MPLX_ENTER(m);

    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    check_data_for(m, stream_id);
        leave_mutex(m, acquired);
    }
    return status;

    H2_MPLX_LEAVE(m);
    return APR_SUCCESS;
}

int h2_mplx_awaits_data(h2_mplx *m)
{
    apr_status_t status;
    int acquired, waiting = 1;
    int waiting = 1;
     
    H2_MPLX_ENTER_ALWAYS(m);

    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    if (h2_ihash_empty(m->streams)) {
        waiting = 0;
    }
    if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
        waiting = 0;
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return waiting;
}
+21 −8
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ struct h2_slot {
    int sticks;
    h2_task *task;
    apr_thread_t *thread;
    apr_thread_mutex_t *lock;
    apr_thread_cond_t *not_idle;
};

@@ -78,6 +79,17 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
    slot->workers = workers;
    slot->aborted = 0;
    slot->task = NULL;

    if (!slot->lock) {
        status = apr_thread_mutex_create(&slot->lock,
                                         APR_THREAD_MUTEX_DEFAULT,
                                         workers->pool);
        if (status != APR_SUCCESS) {
            push_slot(&workers->free, slot);
            return status;
        }
    }

    if (!slot->not_idle) {
        status = apr_thread_cond_create(&slot->not_idle, workers->pool);
        if (status != APR_SUCCESS) {
@@ -112,9 +124,9 @@ static void wake_idle_worker(h2_workers *workers)
{
    h2_slot *slot = pop_slot(&workers->idle);
    if (slot) {
        apr_thread_mutex_lock(workers->lock);
        apr_thread_mutex_lock(slot->lock);
        apr_thread_cond_signal(slot->not_idle);
        apr_thread_mutex_unlock(workers->lock);
        apr_thread_mutex_unlock(slot->lock);
    }
    else if (workers->dynamic) {
        add_worker(workers);
@@ -185,15 +197,16 @@ static apr_status_t get_next(h2_slot *slot)
            return APR_SUCCESS;
        }
        
        apr_thread_mutex_lock(workers->lock);
        cleanup_zombies(workers);

        ++workers->idle_workers;

        apr_thread_mutex_lock(slot->lock);
        push_slot(&workers->idle, slot);
        apr_thread_cond_wait(slot->not_idle, workers->lock);
        --workers->idle_workers;
        apr_thread_cond_wait(slot->not_idle, slot->lock);
        apr_thread_mutex_unlock(slot->lock);

        apr_thread_mutex_unlock(workers->lock);
        --workers->idle_workers;
    }
    return APR_EOF;
}
@@ -239,21 +252,21 @@ static apr_status_t workers_pool_cleanup(void *data)
    h2_slot *slot;
    
    if (!workers->aborted) {
        apr_thread_mutex_lock(workers->lock);
        workers->aborted = 1;
        /* before we go, cleanup any zombies and abort the rest */
        cleanup_zombies(workers);
        for (;;) {
            slot = pop_slot(&workers->idle);
            if (slot) {
                apr_thread_mutex_lock(slot->lock);
                slot->aborted = 1;
                apr_thread_cond_signal(slot->not_idle);
                apr_thread_mutex_unlock(slot->lock);
            }
            else {
                break;
            }
        }
        apr_thread_mutex_unlock(workers->lock);

        h2_fifo_term(workers->mplxs);
        h2_fifo_interrupt(workers->mplxs);