Commit 5a4a1733 authored by Stefan Eissing's avatar Stefan Eissing
Browse files

On the trunk:

mod_http2: better performance, eliminated need for nested locks and thread privates.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1789395 13f79535-47bb-0310-9956-ffa450edef68
parent 04c61b0c
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
                                                         -*- coding: utf-8 -*-
Changes with Apache 2.5.0

  *) mod_http2: better performance, eliminated need for nested locks and
     thread privates. [Stefan Eissing]
     
  *) core: Disallow multiple Listen on the same IP:port when listener buckets
     are configured (ListenCoresBucketsRatio > 0), consistently with the single
     bucket case (default), thus avoiding the leak of the corresponding socket
+45 −64
Original line number Diff line number Diff line
@@ -57,35 +57,40 @@ typedef struct {

/* NULL or the mutex held by this thread, used for recursive calls
 */
static const int nested_lock = 0;

static apr_threadkey_t *thread_lock;

/**
 * Child process initialisation for the multiplexer module.
 *
 * Only when nested locking is compiled in (nested_lock != 0) is the
 * thread-private key `thread_lock` created from `pool`; otherwise there
 * is nothing to set up.
 *
 * @param pool the pool passed to apr_threadkey_private_create()
 * @param s    not used by this function
 * @return APR_SUCCESS when nested locking is disabled, otherwise the
 *         status of apr_threadkey_private_create()
 */
apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
    if (!nested_lock) {
        /* no thread private needed without nested locking */
        return APR_SUCCESS;
    }
    return apr_threadkey_private_create(&thread_lock, NULL, pool);
}

static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
{
    apr_status_t status;
    void *mutex = NULL;
    
    if (nested_lock) {
        void *mutex = NULL;
        /* Enter the mutex if this thread already holds the lock or
         * if we can acquire it. Only in the latter case do we unlock
         * on leaving the mutex.
         * This allows recursive entering of the mutex from the same thread,
         * which is what we need in certain situations involving callbacks.
         */
    ap_assert(m);
        apr_threadkey_private_get(&mutex, thread_lock);
        if (mutex == m->lock) {
            *pacquired = 0;
            ap_assert(NULL); /* nested, why? */
            return APR_SUCCESS;
        }

    ap_assert(m->lock);
    }
    status = apr_thread_mutex_lock(m->lock);
    *pacquired = (status == APR_SUCCESS);
    if (*pacquired) {
    if (nested_lock && *pacquired) {
        apr_threadkey_private_set(m->lock, thread_lock);
    }
    return status;
@@ -94,7 +99,9 @@ static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
/**
 * Leave the multiplexer mutex again.
 *
 * Does nothing unless `acquired` is non-zero. With nested locking
 * enabled, the thread-private slot `thread_lock` is reset to NULL before
 * m->lock is unlocked.
 *
 * @param m        the multiplexer whose lock is released
 * @param acquired non-zero if the lock is to be released by this call
 */
static void leave_mutex(h2_mplx *m, int acquired)
{
    if (!acquired) {
        return;
    }
    if (nested_lock) {
        /* forget the held mutex before giving it up */
        apr_threadkey_private_set(NULL, thread_lock);
    }
    apr_thread_mutex_unlock(m->lock);
}
@@ -105,38 +112,23 @@ static void stream_output_consumed(void *ctx,
                                   h2_bucket_beam *beam, apr_off_t length)
{
    h2_stream *stream = ctx;
    h2_mplx *m = stream->session->mplx;
    h2_task *task = stream->task;
    int acquired;
    
    if (length > 0 && task && task->assigned) {
        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
        h2_req_engine_out_consumed(task->assigned, task->c, length); 
            leave_mutex(m, acquired);
        }
    }
}

/**
 * Input beam event callback: flags the multiplexer of the stream's
 * session as having a pending event by setting m->event_pending to 1.
 *
 * The callback context is the h2_stream itself; the multiplexer is
 * reached through stream->session->mplx. (A second, stale declaration
 * that read `ctx` directly as the h2_mplx has been removed - it
 * redefined `m` and contradicted the stream context.)
 *
 * @param ctx  the h2_stream this callback was registered with
 * @param beam the beam raising the event, not used here
 */
static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
{
    h2_stream *stream = ctx;
    h2_mplx *m = stream->session->mplx;
    apr_atomic_set32(&m->event_pending, 1); 
}

/**
 * Input beam consumption callback: hands the consumed byte count on to
 * the stream via h2_stream_in_consumed().
 *
 * The callback context is the h2_stream. No multiplexer lock is taken
 * here and no per-mplx consumed callback is invoked; the duplicated
 * signature and the locked m->input_consumed path that treated `ctx`
 * as an h2_mplx have been dropped.
 *
 * @param ctx    the h2_stream this callback was registered with
 * @param beam   the beam reporting consumption, not used here
 * @param length number of bytes consumed, forwarded unchanged
 */
static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
{
    h2_stream_in_consumed(ctx, length);
}

static int can_always_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
@@ -289,6 +281,12 @@ static int input_consumed_signal(h2_mplx *m, h2_stream *stream)
    return 0;
}

static int report_consumption_iter(void *ctx, void *val)
{
    input_consumed_signal(ctx, val);
    return 1;
}

static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
    if (task->output.beam) {
@@ -426,6 +424,10 @@ static int stream_cancel_iter(void *ctx, void *val) {
    h2_mplx *m = ctx;
    h2_stream *stream = val;

    /* disable input consumed reporting */
    if (stream->input) {
        h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
    }
    /* take over event monitoring */
    h2_stream_set_monitor(stream, NULL);
    /* Reset, should transit to CLOSED state */
@@ -527,12 +529,6 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
    return s;
}

/**
 * Store the input consumption callback and its context in the
 * multiplexer. Passing NULL for `cb` leaves no callback registered.
 *
 * @param m   the multiplexer to register the callback at
 * @param cb  the function to store in m->input_consumed
 * @param ctx the value to store in m->input_consumed_ctx
 */
void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
{
    /* the two fields are independent of each other */
    m->input_consumed_ctx = ctx;
    m->input_consumed = cb;
}

static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
    h2_mplx *m = ctx;
@@ -618,18 +614,6 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
    return status;
}

/**
 * Iterator callback: asks the input beam of a stream, if it has one,
 * to report its consumption via h2_beam_report_consumption().
 *
 * @param ctx not used
 * @param val the h2_stream currently visited
 * @return always 1
 */
static int report_input_consumption(void *ctx, void *val)
{
    h2_stream *visited = val;

    (void)ctx;
    if (!visited->input) {
        /* stream without input beam, nothing to report */
        return 1;
    }
    h2_beam_report_consumption(visited->input);
    return 1;
}


apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                 apr_thread_cond_t *iowait)
{
@@ -645,7 +629,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
        }
        else {
            purge_streams(m);
            h2_ihash_iter(m->streams, report_input_consumption, m);
            h2_ihash_iter(m->streams, report_consumption_iter, m);
            m->added_output = iowait;
            status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
            if (APLOGctrace2(m->c)) {
@@ -757,7 +741,7 @@ static h2_task *next_stream_task(h2_mplx *m)
                
                if (stream->input) {
                    h2_beam_on_consumed(stream->input, stream_input_ev, 
                                        stream_input_consumed, m);
                                        stream_input_consumed, stream);
                    h2_beam_on_file_beam(stream->input, can_always_beam_file, m);
                    h2_beam_mutex_enable(stream->input);
                }
@@ -1207,12 +1191,6 @@ int h2_mplx_has_master_events(h2_mplx *m)
    return apr_atomic_read32(&m->event_pending) > 0;
}

static int report_consumption_iter(void *ctx, void *val)
{
    input_consumed_signal(ctx, val);
    return 1;
}

apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
                                            stream_ev_callback *on_resume, 
                                            void *on_ctx)
@@ -1227,16 +1205,19 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                      "h2_mplx(%ld): dispatch events", m->id);        
        apr_atomic_set32(&m->event_pending, 0);
        purge_streams(m);
        
        /* update input windows for streams */
        h2_ihash_iter(m->streams, report_consumption_iter, m);
        purge_streams(m);
        
        if (!h2_iq_empty(m->readyq)) {
            n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
            for (i = 0; i < n; ++i) {
                stream = h2_ihash_get(m->streams, ids[i]);
                if (stream) {
                    leave_mutex(m, acquired);
                    on_resume(on_ctx, stream);
                    enter_mutex(m, &acquired);
                }
            }
        }
+0 −21
Original line number Diff line number Diff line
@@ -53,12 +53,6 @@ struct h2_req_engine;

typedef struct h2_mplx h2_mplx;

/**
 * Callback invoked for every stream that had input data read since
 * the last invocation.
 */
typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed);

struct h2_mplx {
    long id;
    conn_rec *c;
@@ -100,9 +94,6 @@ struct h2_mplx {
    
    struct h2_workers *workers;
    
    h2_mplx_consumed_cb *input_consumed;
    void *input_consumed_ctx;

    struct h2_ngn_shed *ngn_shed;
};

@@ -194,18 +185,6 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
 */
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);

/**
 * Register a callback for the amount of input data consumed per stream. The
 * callback will only ever be invoked from the thread creating this h2_mplx,
 * e.g. when calls from that thread into this h2_mplx are made.
 *
 * @param m the multiplexer to register the callback at
 * @param cb the function to invoke
 * @param ctx user supplied argument to invocation.
 */
void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);


typedef apr_status_t stream_ev_callback(void *ctx, struct h2_stream *stream);

/**
+0 −65
Original line number Diff line number Diff line
@@ -77,68 +77,6 @@ static h2_stream *get_stream(h2_session *session, int stream_id)
    return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
}

/**
 * Consumption callback of the session: reports `bytes_read` bytes of
 * input as consumed on the given stream to nghttp2 and, when
 * H2_NG2_LOCAL_WIN_SIZE is defined, adapts the local window size of the
 * stream.
 *
 * Consumption is reported in slices of at most INT_MAX bytes. Each slice
 * is computed from the amount still outstanding and exactly that slice is
 * passed on, so that amounts larger than INT_MAX are neither truncated
 * nor reported more than once.
 *
 * @param ctx        the h2_session the callback was registered with
 * @param stream_id  identifier of the stream that consumed input
 * @param bytes_read number of bytes consumed; values <= 0 are ignored
 */
static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
{
    h2_session *session = ctx;
    h2_stream *stream;
    
    if (bytes_read > 0) {
        apr_off_t consumed = bytes_read;
        
        while (consumed > 0) {
            /* slice off what is still outstanding, not the total */
            int len = (consumed > INT_MAX)? INT_MAX : (int)consumed;
            nghttp2_session_consume(session->ngh2, stream_id, len);
            consumed -= len;
        }

        /* keeps `stream` referenced when the window sizing is compiled out */
        (void)stream;
#ifdef H2_NG2_LOCAL_WIN_SIZE
        if ((stream = get_stream(session, stream_id))) {
            int cur_size = nghttp2_session_get_stream_local_window_size(
                session->ngh2, stream->id);
            int win = stream->in_window_size;
            int thigh = win * 8/10;
            int tlow = win * 2/10;
            const int win_max = 2*1024*1024;
            const int win_min = 32*1024;
            
            /* Work in progress, probably should add directives for these
             * values once this stabilizes somewhat. The general idea is
             * to adapt stream window sizes if the input window changes
             * a) very quickly (< good RTT) from full to empty
             * b) only a little bit (> bad RTT)
             * where in a) it grows and in b) it shrinks again.
             */
            if (cur_size > thigh && bytes_read > thigh && win < win_max) {
                /* almost empty again with one reported consumption, how
                 * long did this take? */
                long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
                if (ms < 40) {
                    win = H2MIN(win_max, win + (64*1024));
                }
            }
            else if (cur_size < tlow && bytes_read < tlow && win > win_min) {
                /* staying full, for how long already? */
                long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
                if (ms > 700) {
                    win = H2MAX(win_min, win - (32*1024));
                }
            }
            
            if (win != stream->in_window_size) {
                stream->in_window_size = win;
                nghttp2_session_set_local_window_size(session->ngh2, 
                        NGHTTP2_FLAG_NONE, stream_id, win);
            } 
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                          "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
                          session->id, stream_id, (long)bytes_read, 
                          cur_size, stream->in_window_size);
        }
#endif
    }
}

static void dispatch_event(h2_session *session, h2_session_event_t ev, 
                             int err, const char *msg);

@@ -755,7 +693,6 @@ static apr_status_t session_cleanup(h2_session *session, const char *trigger)
    }

    transit(session, trigger, H2_SESSION_ST_CLEANUP);
    h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
    h2_mplx_release_and_join(session->mplx, session->iowait);
    session->mplx = NULL;

@@ -880,8 +817,6 @@ static apr_status_t h2_session_create_int(h2_session **psession,
    session->mplx = h2_mplx_create(c, session->pool, session->config, 
                                   workers);
    
    h2_mplx_set_consumed_cb(session->mplx, update_window, session);
    
    /* connection input filter that feeds the session */
    session->cin = h2_filter_cin_create(session);
    ap_add_input_filter("H2_IN", session->cin, r, c);
+60 −0
Original line number Diff line number Diff line
@@ -969,4 +969,64 @@ int h2_stream_was_closed(const h2_stream *stream)
    }
}

/**
 * Report that `amount` bytes of the stream's input have been consumed.
 *
 * The amount is passed to nghttp2_session_consume() on the session of
 * the stream, in slices of at most INT_MAX bytes. When
 * H2_NG2_LOCAL_WIN_SIZE is defined, the local window size of the stream
 * is afterwards grown or shrunk, depending on the current window, the
 * amount reported and the time since stream->in_last_write.
 *
 * @param stream the stream whose input was consumed
 * @param amount number of bytes consumed; values <= 0 are a no-op
 * @return always APR_SUCCESS
 */
apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
{
    h2_session *session = stream->session;
    apr_off_t remaining;
    
    if (amount <= 0) {
        return APR_SUCCESS;
    }

    for (remaining = amount; remaining > 0; ) {
        int chunk = (remaining > INT_MAX)? INT_MAX : remaining;
        nghttp2_session_consume(session->ngh2, stream->id, chunk);
        remaining -= chunk;
    }

#ifdef H2_NG2_LOCAL_WIN_SIZE
    {
        int cur_size = nghttp2_session_get_stream_local_window_size(
            session->ngh2, stream->id);
        int win = stream->in_window_size;
        int thigh = win * 8/10;
        int tlow = win * 2/10;
        const int win_max = 2*1024*1024;
        const int win_min = 32*1024;
        
        /* Work in progress, probably should add directives for these
         * values once this stabilizes somewhat. The idea: adapt the
         * stream window size if the input window changes
         * a) very quickly (< good RTT) from full to empty: grow it
         * b) only a little bit (> bad RTT): shrink it again
         */
        if (cur_size > thigh && amount > thigh && win < win_max) {
            /* nearly drained by a single report, how fast was that? */
            long elapsed_ms = apr_time_msec(apr_time_now() 
                                            - stream->in_last_write);
            if (elapsed_ms < 40) {
                win = H2MIN(win_max, win + (64*1024));
            }
        }
        else if (cur_size < tlow && amount < tlow && win > win_min) {
            /* window stays full, since when? */
            long elapsed_ms = apr_time_msec(apr_time_now() 
                                            - stream->in_last_write);
            if (elapsed_ms > 700) {
                win = H2MAX(win_min, win - (32*1024));
            }
        }
        
        if (stream->in_window_size != win) {
            stream->in_window_size = win;
            nghttp2_session_set_local_window_size(session->ngh2, 
                    NGHTTP2_FLAG_NONE, stream->id, win);
        } 
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                      "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
                      session->id, stream->id, (long)amount, 
                      cur_size, stream->in_window_size);
    }
#endif
    return APR_SUCCESS;   
}
Loading