Commit 7905acac authored by Stefan Eissing's avatar Stefan Eissing
Browse files

On the 2.4.x branch:

Merged /httpd/httpd/trunk:r1789740,1790102,1790113,1790284,1790754,1790826-1790827,1790842



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1790847 13f79535-47bb-0310-9956-ffa450edef68
parent 3804b8c0
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
@@ -2,6 +2,10 @@

Changes with Apache 2.4.26

  *) mod_proxy_http2: Fixed bug in re-attempting proxy requests after 
     connection error. Reliability of reconnect handling improved. 
     [Stefan Eissing]
  
  *) mod_http2: better performance, eliminated need for nested locks and
     thread privates. Moving request setups from the main connection to the
     worker threads. Increase number of spare connections kept.
@@ -22,9 +26,6 @@ Changes with Apache 2.4.26
     format from 2.2 in the Last Modified column. PR60846.
     [Hank Ibell <hwibell gmail.com>]

  *) mod_http2: fixed PR60869 by making h2 workers exit explicitly waking up
     all threads to exit in a defined way. [Stefan Eissing]
     
  *) core: Add %{REMOTE_PORT} to the expression parser. PR59938
     [Hank Ibell <hwibell gmail.com>]

+4 −0
Original line number Diff line number Diff line
@@ -1035,7 +1035,11 @@ transfer:
                ++transferred;
            }
            else {
                /* let outside hook determine how bucket is beamed */
                leave_yellow(beam, &bl);
                brecv = h2_beam_bucket(beam, bb, bsender);
                enter_yellow(beam, &bl);
                
                while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
                    ++transferred;
                    remain -= brecv->length;
+17 −14
Original line number Diff line number Diff line
@@ -428,38 +428,41 @@ static void add_stats(apr_bucket_brigade *bb, h2_session *s,

static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b)
{
    h2_mplx *m = task->mplx;
    h2_stream *stream = h2_mplx_stream_get(m, task->stream_id);
    h2_session *s;
    conn_rec *c;
    
    conn_rec *c = task->c->master;
    h2_ctx *h2ctx = h2_ctx_get(c, 0);
    h2_session *session;
    h2_stream *stream;
    apr_bucket_brigade *bb;
    apr_bucket *e;
    int32_t connFlowIn, connFlowOut;
    
    
    if (!h2ctx || (session = h2_ctx_session_get(h2ctx)) == NULL) {
        return APR_SUCCESS;
    }
    
    stream = h2_session_stream_get(session, task->stream_id);
    if (!stream) {
        /* stream already done */
        return APR_SUCCESS;
    }
    s = stream->session;
    c = s->c;
    
    bb = apr_brigade_create(stream->pool, c->bucket_alloc);
    
    connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2); 
    connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2);
    connFlowIn = nghttp2_session_get_effective_local_window_size(session->ngh2); 
    connFlowOut = nghttp2_session_get_remote_window_size(session->ngh2);
     
    bbout(bb, "{\n");
    bbout(bb, "  \"version\": \"draft-01\",\n");
    add_settings(bb, s, 0);
    add_peer_settings(bb, s, 0);
    add_settings(bb, session, 0);
    add_peer_settings(bb, session, 0);
    bbout(bb, "  \"connFlowIn\": %d,\n", connFlowIn);
    bbout(bb, "  \"connFlowOut\": %d,\n", connFlowOut);
    bbout(bb, "  \"sentGoAway\": %d,\n", s->local.shutdown);
    bbout(bb, "  \"sentGoAway\": %d,\n", session->local.shutdown);

    add_streams(bb, s, 0);
    add_streams(bb, session, 0);
    
    add_stats(bb, s, stream, 1);
    add_stats(bb, session, stream, 1);
    bbout(bb, "}\n");
    
    while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) {
+306 −347
Original line number Diff line number Diff line
@@ -55,58 +55,29 @@ typedef struct {
    apr_time_t now;
} stream_iter_ctx;

/* NULL or the mutex hold by this thread, used for recursive calls
 */
static const int nested_lock = 0;

static apr_threadkey_t *thread_lock;

apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
    if (nested_lock) {
        return apr_threadkey_private_create(&thread_lock, NULL, pool);
    }
    return APR_SUCCESS;
}

static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
{
    apr_status_t status;
#define H2_MPLX_ENTER(m)    \
    do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
        return rv;\
    } } while(0)

    if (nested_lock) {
        void *mutex = NULL;
        /* Enter the mutex if this thread already holds the lock or
         * if we can acquire it. Only on the later case do we unlock
         * onleaving the mutex.
         * This allow recursive entering of the mutex from the saem thread,
         * which is what we need in certain situations involving callbacks
         */
        apr_threadkey_private_get(&mutex, thread_lock);
        if (mutex == m->lock) {
            *pacquired = 0;
            ap_assert(NULL); /* nested, why? */
            return APR_SUCCESS;
        }
    }
    status = apr_thread_mutex_lock(m->lock);
    *pacquired = (status == APR_SUCCESS);
    if (nested_lock && *pacquired) {
        apr_threadkey_private_set(m->lock, thread_lock);
    }
    return status;
}
#define H2_MPLX_LEAVE(m)    \
    apr_thread_mutex_unlock(m->lock)
 
static void leave_mutex(h2_mplx *m, int acquired)
{
    if (acquired) {
        if (nested_lock) {
            apr_threadkey_private_set(NULL, thread_lock);
        }
        apr_thread_mutex_unlock(m->lock);
    }
}
#define H2_MPLX_ENTER_ALWAYS(m)    \
    apr_thread_mutex_lock(m->lock)

#define H2_MPLX_ENTER_MAYBE(m, lock)    \
    if (lock) apr_thread_mutex_lock(m->lock)

#define H2_MPLX_LEAVE_MAYBE(m, lock)    \
    if (lock) apr_thread_mutex_unlock(m->lock)

static void check_data_for(h2_mplx *m, int stream_id);
static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);

static void stream_output_consumed(void *ctx, 
                                   h2_bucket_beam *beam, apr_off_t length)
@@ -155,6 +126,7 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
    h2_stream_cleanup(stream);

    h2_iq_remove(m->q, stream->id);
    h2_fifo_remove(m->readyq, stream);
    h2_ihash_remove(m->streams, stream->id);
    h2_ihash_add(m->shold, stream);
    
@@ -240,7 +212,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
        m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
        m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
        m->q = h2_iq_create(m->pool, m->max_streams);
        m->readyq = h2_iq_create(m->pool, m->max_streams);

        status = h2_fifo_set_create(&m->readyq, m->pool, m->max_streams);
        if (status != APR_SUCCESS) {
            apr_pool_destroy(m->pool);
            return NULL;
        }

        m->workers = workers;
        m->max_active = workers->max_workers;
@@ -259,14 +236,15 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,

int h2_mplx_shutdown(h2_mplx *m)
{
    int acquired, max_stream_started = 0;
    int max_stream_started = 0;
    
    H2_MPLX_ENTER(m);

    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
    max_stream_started = m->max_stream_started;
    /* Clear schedule queue, disabling existing streams from starting */ 
    h2_iq_clear(m->q);
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return max_stream_started;
}

@@ -341,12 +319,14 @@ static int stream_destroy_iter(void *ctx, void *val)
    return 0;
}

static void purge_streams(h2_mplx *m)
static void purge_streams(h2_mplx *m, int lock)
{
    if (!h2_ihash_empty(m->spurge)) {
        H2_MPLX_ENTER_MAYBE(m, lock);
        while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
            /* repeat until empty */
        }
        H2_MPLX_LEAVE_MAYBE(m, lock);
    }
}

@@ -363,18 +343,16 @@ static int stream_iter_wrap(void *ctx, void *stream)

apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    stream_iter_ctx_t x;
    
    H2_MPLX_ENTER(m);

    x.cb = cb;
    x.ctx = ctx;
    h2_ihash_iter(m->streams, stream_iter_wrap, &x);
        
        leave_mutex(m, acquired);
    }
    return status;
    H2_MPLX_LEAVE(m);
    return APR_SUCCESS;
}

static int report_stream_iter(void *ctx, void *val) {
@@ -430,14 +408,13 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
    apr_status_t status;
    int i, wait_secs = 60;
    int acquired;

    /* How to shut down a h2 connection:
     * 0. abort and tell the workers that no more tasks will come from us */
    m->aborted = 1;
    h2_workers_unregister(m->workers, m);
    
    enter_mutex(m, &acquired);
    H2_MPLX_ENTER_ALWAYS(m);

    /* How to shut down a h2 connection:
     * 1. cancel all streams still active */
@@ -482,7 +459,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
        h2_ihash_iter(m->shold, unexpected_stream_iter, m);
    }
    
    leave_mutex(m, acquired);
    H2_MPLX_LEAVE(m);

    /* 5. unregister again, now that our workers are done */
    h2_workers_unregister(m->workers, m);
@@ -493,41 +470,34 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)

apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
    apr_status_t status = APR_SUCCESS;
    int acquired;
    H2_MPLX_ENTER(m);
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                  H2_STRM_MSG(stream, "cleanup"));
    stream_cleanup(m, stream);        
        leave_mutex(m, acquired);
    }
    return status;
    
    H2_MPLX_LEAVE(m);
    return APR_SUCCESS;
}

h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
{
    h2_stream *s = NULL;
    int acquired;
    
    if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER_ALWAYS(m);

    s = h2_ihash_get(m->streams, id);
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return s;
}

static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
    h2_mplx *m = ctx;
    int acquired;
    h2_stream *stream = ctx;
    h2_mplx *m = stream->session->mplx;
    
    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                      "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
        check_data_for(m, beam->id);
        leave_mutex(m, acquired);
    }
    check_data_for(m, stream, 1);
}

static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
@@ -551,7 +521,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
    }
    
    h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
    h2_beam_on_produced(stream->output, output_produced, m);
    h2_beam_on_produced(stream->output, output_produced, stream);
    if (stream->task->output.copy_files) {
        h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
    }
@@ -561,24 +531,24 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
    
    /* we might see some file buckets in the output, see
     * if we have enough handles reserved. */
    check_data_for(m, stream->id);
    check_data_for(m, stream, 0);
    return status;
}

apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER(m);

    if (m->aborted) {
        status = APR_ECONNABORTED;
    }
    else {
        status = out_open(m, stream_id, beam);
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -601,7 +571,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
    status = h2_beam_close(task->output.beam);
    h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
    output_consumed_signal(m, task);
    check_data_for(m, task->stream_id);
    check_data_for(m, stream, 0);
    return status;
}

@@ -609,17 +579,17 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                 apr_thread_cond_t *iowait)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER(m);

    if (m->aborted) {
        status = APR_ECONNABORTED;
    }
        else if (apr_atomic_read32(&m->event_pending) > 0) {
    else if (h2_mplx_has_master_events(m)) {
        status = APR_SUCCESS;
    }
    else {
            purge_streams(m);
        purge_streams(m, 0);
        h2_ihash_iter(m->streams, report_consumption_iter, m);
        m->added_output = iowait;
        status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
@@ -630,27 +600,29 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
        }
        m->added_output = NULL;
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

static void check_data_for(h2_mplx *m, int stream_id)
static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
{
    ap_assert(m);
    h2_iq_append(m->readyq, stream_id);
    if (h2_fifo_push(m->readyq, stream) == APR_SUCCESS) {
        apr_atomic_set32(&m->event_pending, 1);
        H2_MPLX_ENTER_MAYBE(m, lock);
        if (m->added_output) {
            apr_thread_cond_signal(m->added_output);
        }
        H2_MPLX_LEAVE_MAYBE(m, lock);
    }
}

apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER(m);

    if (m->aborted) {
        status = APR_ECONNABORTED;
    }
@@ -658,9 +630,10 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
        h2_iq_sort(m->q, cmp, ctx);
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                      "h2_mplx(%ld): reprioritize tasks", m->id);
        status = APR_SUCCESS;
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -682,17 +655,18 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
                             h2_stream_pri_cmp *cmp, void *ctx)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    H2_MPLX_ENTER(m);

    if (m->aborted) {
        status = APR_ECONNABORTED;
    }
    else {
        status = APR_SUCCESS;
        h2_ihash_add(m->streams, stream);
        if (h2_stream_is_ready(stream)) {
            /* already have a response */
                check_data_for(m, stream->id);
            check_data_for(m, stream, 0);
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                          H2_STRM_MSG(stream, "process, add to readyq")); 
        }
@@ -703,8 +677,8 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
                          H2_STRM_MSG(stream, "process, added to q")); 
        }
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -762,11 +736,10 @@ static h2_task *next_stream_task(h2_mplx *m)
h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
{
    h2_task *task = NULL;
    apr_status_t status;
    int acquired;
    
    H2_MPLX_ENTER_ALWAYS(m);

    *has_more = 0;
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    if (!m->aborted) {
        task = next_stream_task(m);
        if (task != NULL && !h2_iq_empty(m->q)) {
@@ -776,8 +749,8 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
            m->is_registered = 0; /* h2_workers will discard this mplx */
        }
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return task;
}

@@ -814,7 +787,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
    if (task->engine) {
        if (!m->aborted && !task->c->aborted 
            && !h2_req_engine_is_shutdown(task->engine)) {
            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022)
                          "h2_mplx(%ld): task(%s) has not-shutdown "
                          "engine(%s)", m->id, task->id, 
                          h2_req_engine_get_id(task->engine));
@@ -845,35 +818,37 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
    }
    
    stream = h2_ihash_get(m->streams, task->stream_id);
    if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) {
    if (stream) {
        /* stream not done yet. */
        if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) {
            /* reset and schedule again */
            h2_task_redo(task);
            h2_ihash_remove(m->sredo, stream->id);
            h2_iq_add(m->q, stream->id, NULL, NULL);
        return;
        }
    
    if (stream) {
        else {
            /* stream not cleaned up, stay around */
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                          H2_STRM_MSG(stream, "task_done, stream open")); 
            /* more data will not arrive, resume the stream */
            check_data_for(m, stream, 0);
            
            if (stream->input) {
            h2_beam_mutex_disable(stream->input);
                h2_beam_leave(stream->input);
                h2_beam_mutex_disable(stream->input);
            }
            if (stream->output) {
                h2_beam_mutex_disable(stream->output);
            }
        check_data_for(m, stream->id);
        }
    }
    else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
        /* stream is done, was just waiting for this. */
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                      H2_STRM_MSG(stream, "task_done, in hold"));
        /* stream was just waiting for us. */
        if (stream->input) {
            h2_beam_mutex_disable(stream->input);
            h2_beam_leave(stream->input);
            h2_beam_mutex_disable(stream->input);
        }
        if (stream->output) {
            h2_beam_mutex_disable(stream->output);
@@ -895,11 +870,11 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)

void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
    int acquired;
    H2_MPLX_ENTER_ALWAYS(m);

    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
    task_done(m, task, NULL);
    --m->tasks_active;
    
    if (m->join_wait) {
        apr_thread_cond_signal(m->join_wait);
    }
@@ -908,8 +883,8 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
        *ptask = next_stream_task(m);
    }
    register_if_needed(m);
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
}

/*******************************************************************************
@@ -1001,10 +976,11 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
{
    apr_status_t status = APR_SUCCESS;
    apr_time_t now;            
    int acquired;
    apr_size_t scount;
    
    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
        apr_size_t scount = h2_ihash_count(m->streams);
    H2_MPLX_ENTER(m);

    scount = h2_ihash_count(m->streams);
    if (scount > 0 && m->tasks_active) {
        /* If we have streams in connection state 'IDLE', meaning
         * all streams are ready to sent data out, but lack
@@ -1045,8 +1021,8 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
        }
    }
    register_if_needed(m);
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -1090,7 +1066,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
    apr_status_t status;
    h2_mplx *m;
    h2_task *task;
    int acquired;
    h2_stream *stream;
    
    task = h2_ctx_rget_task(r);
    if (!task) {
@@ -1098,17 +1074,17 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
    }
    m = task->mplx;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
    H2_MPLX_ENTER(m);

    stream = h2_ihash_get(m->streams, task->stream_id);
    if (stream) {
        status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
    }
    else {
        status = APR_ECONNABORTED;
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}

@@ -1120,10 +1096,11 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
    apr_status_t status;
    int acquired;
    int want_shutdown;
    
    H2_MPLX_ENTER(m);

    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        int want_shutdown = (block == APR_BLOCK_READ);
    want_shutdown = (block == APR_BLOCK_READ);

    /* Take this opportunity to update output consummation 
     * for this engine */
@@ -1147,8 +1124,8 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
        status = h2_ngn_shed_pull_request(shed, ngn, capacity,
                                          want_shutdown, pr);
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return status;
}
 
@@ -1159,10 +1136,11 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
    
    if (task) {
        h2_mplx *m = task->mplx;
        int acquired;
        h2_stream *stream;

        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
            h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
        H2_MPLX_ENTER_ALWAYS(m);

        stream = h2_ihash_get(m->streams, task->stream_id);
        
        ngn_out_update_windows(m, ngn);
        h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
@@ -1172,16 +1150,15 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
            && !h2_ihash_get(m->sredo, stream->id)) {
            h2_ihash_add(m->sredo, stream);
        }

        if (task->engine) { 
            /* cannot report that as done until engine returns */
        }
        else {
            task_done(m, task, ngn);
        }
            /* Take this opportunity to update output consummation 
             * for this engine */
            leave_mutex(m, acquired);
        }

        H2_MPLX_LEAVE(m);
    }
}

@@ -1198,65 +1175,47 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                                            stream_ev_callback *on_resume, 
                                            void *on_ctx)
{
    apr_status_t status;
    int acquired;
    int ids[100];
    h2_stream *stream;
    size_t i, n;
    int n;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                  "h2_mplx(%ld): dispatch events", m->id);        
    apr_atomic_set32(&m->event_pending, 0);
        purge_streams(m);

    /* update input windows for streams */
    h2_ihash_iter(m->streams, report_consumption_iter, m);    
    purge_streams(m, 1);
    
        if (!h2_iq_empty(m->readyq)) {
            n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
            for (i = 0; i < n; ++i) {
                stream = h2_ihash_get(m->streams, ids[i]);
                if (stream) {
                    leave_mutex(m, acquired);
    n = h2_fifo_count(m->readyq);
    while (n > 0 
           && (h2_fifo_try_pull(m->readyq, (void**)&stream) == APR_SUCCESS)) {
        --n;
        on_resume(on_ctx, stream);
                    enter_mutex(m, &acquired);
                }
            }
    }
        if (!h2_iq_empty(m->readyq)) {
            apr_atomic_set32(&m->event_pending, 1);
        } 
        leave_mutex(m, acquired);
    }
    return status;
    
    return APR_SUCCESS;
}

apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        check_data_for(m, stream_id);
        leave_mutex(m, acquired);
    }
    return status;
    check_data_for(m, stream, 1);
    return APR_SUCCESS;
}

int h2_mplx_awaits_data(h2_mplx *m)
{
    apr_status_t status;
    int acquired, waiting = 1;
    int waiting = 1;
     
    H2_MPLX_ENTER_ALWAYS(m);

    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
    if (h2_ihash_empty(m->streams)) {
        waiting = 0;
    }
        if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
    if ((h2_fifo_count(m->readyq) == 0) 
        && h2_iq_empty(m->q) && !m->tasks_active) {
        waiting = 0;
    }
        leave_mutex(m, acquired);
    }

    H2_MPLX_LEAVE(m);
    return waiting;
}
+2 −2
Original line number Diff line number Diff line
@@ -68,7 +68,7 @@ struct h2_mplx {
    struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
    
    struct h2_iqueue *q;            /* all stream ids that need to be started */
    struct h2_iqueue *readyq;       /* all stream ids ready for output */
    struct h2_fifo *readyq;         /* all streams ready for output */
        
    struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
    
@@ -158,7 +158,7 @@ apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                 struct apr_thread_cond_t *iowait);

apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id);
apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream);

/*******************************************************************************
 * Stream processing.
Loading