Commit 29737a74 authored by Stefan Eissing's avatar Stefan Eissing
Browse files

On the trunk:

mod_http2: move stuff from master connection to worker threads, increase spare slave connections, create output beams in worker when needed.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1789535 13f79535-47bb-0310-9956-ffa450edef68
parent 01872a6d
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -2,7 +2,9 @@
Changes with Apache 2.5.0

  *) mod_http2: better performance, eliminated need for nested locks and
     thread privates. [Stefan Eissing]
     thread privates. Moving request setups from the main connection to the
     worker threads. Increase number of spare connections kept.
     [Stefan Eissing]
     
  *) core: Disallow multiple Listen on the same IP:port when listener buckets
     are configured (ListenCoresBucketsRatio > 0), consistently with the single
+1 −2
Original line number Diff line number Diff line
@@ -832,11 +832,10 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
        apr_bucket_file *bf = b->data;
        apr_file_t *fd = bf->fd;
        int can_beam = (bf->refcount.refcount == 1);
        if (can_beam && beam->last_beamed != fd && beam->can_beam_fn) {
        if (can_beam && beam->can_beam_fn) {
            can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
        }
        if (can_beam) {
            beam->last_beamed = fd;
            status = apr_bucket_setaside(b, beam->send_pool);
        }
        /* else: enter ENOTIMPL case below */
+10 −1
Original line number Diff line number Diff line
@@ -183,7 +183,6 @@ struct h2_bucket_beam {

    apr_size_t buckets_sent;  /* # of beam buckets sent */
    apr_size_t files_beamed;  /* how many file handles have been set aside */
    apr_file_t *last_beamed;  /* last file beamed */
    
    unsigned int aborted : 1;
    unsigned int closed : 1;
@@ -376,6 +375,16 @@ int h2_beam_report_consumption(h2_bucket_beam *beam);
void h2_beam_on_produced(h2_bucket_beam *beam, 
                         h2_beam_io_callback *io_cb, void *ctx);

/**
 * Register a callback that may prevent a file from being beam as
 * file handle, forcing the file content to be copied. Then no callback
 * is set (NULL), file handles are transferred directly.
 * @param beam the beam to set the callback on
 * @param io_cb the callback or NULL, called on receiver with bytes produced
 * @param ctx  the context to use in callback invocation
 * 
 * Call from the receiver side, callbacks invoked on either side.
 */
void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                          h2_beam_can_beam_callback *cb, void *ctx);

+2 −0
Original line number Diff line number Diff line
@@ -308,6 +308,7 @@ conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent)
                                             master->id, slave_id);
    /* Simulate that we had already a request on this connection. */
    c->keepalives             = 1;
    c->aborted                = 0;
    /* We cannot install the master connection socket on the slaves, as
     * modules mess with timeouts/blocking of the socket, with
     * unwanted side effects to the master connection processing.
@@ -335,6 +336,7 @@ void h2_slave_destroy(conn_rec *slave)
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
                  "h2_stream(%s): destroy slave", 
                  apr_table_get(slave->notes, H2_TASK_ID_NOTE));
    slave->sbh = NULL;
    apr_pool_destroy(slave->pool);
}

+57 −61
Original line number Diff line number Diff line
@@ -131,11 +131,6 @@ static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t len
    h2_stream_in_consumed(ctx, length);
}

static int can_always_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
{
    return 1;
}

static void stream_joined(h2_mplx *m, h2_stream *stream)
{
    ap_assert(!stream->task || stream->task->worker_done);
@@ -152,8 +147,10 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
        h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
        h2_beam_abort(stream->input);
    }
    if (stream->output) {
        h2_beam_on_produced(stream->output, NULL, NULL);
        h2_beam_leave(stream->output);
    }
    
    h2_stream_cleanup(stream);

@@ -246,10 +243,10 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
        m->readyq = h2_iq_create(m->pool, m->max_streams);

        m->workers = workers;
        m->workers_max = workers->max_workers;
        m->workers_limit = 6; /* the original h1 max parallel connections */
        m->max_active = workers->max_workers;
        m->limit_active = 6; /* the original h1 max parallel connections */
        m->last_limit_change = m->last_idle_block = apr_time_now();
        m->limit_change_interval = apr_time_from_msec(200);
        m->limit_change_interval = apr_time_from_msec(100);
        
        m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
        
@@ -301,7 +298,7 @@ static void task_destroy(h2_mplx *m, h2_task *task)
    int reuse_slave = 0;
    
    slave = task->c;
    reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
    reuse_slave = ((m->spare_slaves->nelts < (m->limit_active * 3 / 2))
                   && !task->rst_error);
    
    if (slave) {
@@ -328,12 +325,6 @@ static int stream_destroy_iter(void *ctx, void *val)
    h2_ihash_remove(m->spurge, stream->id);
    ap_assert(stream->state == H2_SS_CLEANUP);
    
    if (stream->output == NULL) {
        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c, 
                      H2_STRM_MSG(stream, "already with beams==NULL"));
        return 0;
    }
    
    if (stream->input) {
        /* Process outstanding events before destruction */
        input_consumed_signal(m, stream);    
@@ -342,9 +333,6 @@ static int stream_destroy_iter(void *ctx, void *val)
        stream->input = NULL;
    }

    h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy");
    h2_beam_destroy(stream->output);
    stream->output = NULL;
    if (stream->task) {
        task_destroy(m, stream->task);
        stream->task = NULL;
@@ -547,10 +535,13 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
    apr_status_t status = APR_SUCCESS;
    h2_stream *stream = h2_ihash_get(m->streams, stream_id);
    
    if (!stream || !stream->task) {
    if (!stream || !stream->task || m->aborted) {
        return APR_ECONNABORTED;
    }
    
    ap_assert(stream->output == NULL);
    stream->output = beam;
    
    if (APLOGctrace2(m->c)) {
        h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open");
    }
@@ -561,8 +552,8 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
    
    h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
    h2_beam_on_produced(stream->output, output_produced, m);
    if (!stream->task->output.copy_files) {
        h2_beam_on_file_beam(stream->output, can_always_beam_file, m);
    if (stream->task->output.copy_files) {
        h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
    }
    
    /* time to protect the beam against multi-threaded use */
@@ -721,7 +712,7 @@ static h2_task *next_stream_task(h2_mplx *m)
{
    h2_stream *stream;
    int sid;
    while (!m->aborted && (m->workers_busy < m->workers_limit)
    while (!m->aborted && (m->tasks_active < m->limit_active)
           && (sid = h2_iq_shift(m->q)) > 0) {
        
        stream = h2_ihash_get(m->streams, sid);
@@ -731,36 +722,37 @@ static h2_task *next_stream_task(h2_mplx *m)
            pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
            if (pslave) {
                slave = *pslave;
                slave->aborted = 0;
            }
            else {
                slave = h2_slave_create(m->c, stream->id, m->pool);
            }
            
            slave->sbh = m->c->sbh;
            slave->aborted = 0;
            if (!stream->task) {
                stream->task = h2_task_create(stream, slave);
            
                m->c->keepalives++;
                apr_table_setn(slave->notes, H2_TASK_ID_NOTE, stream->task->id);
                h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));

                if (sid > m->max_stream_started) {
                    m->max_stream_started = sid;
                }
                
                if (stream->input) {
                    h2_beam_on_consumed(stream->input, stream_input_ev, 
                                        stream_input_consumed, stream);
                    h2_beam_on_file_beam(stream->input, can_always_beam_file, m);
                    h2_beam_mutex_enable(stream->input);
                }
                
                h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
                stream->task = h2_task_create(slave, stream->id, 
                                              stream->request, m, stream->input, 
                                              stream->session->s->timeout,
                                              m->stream_max_mem);
                if (!stream->task) {
                    ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave,
                                  H2_STRM_LOG(APLOGNO(02941), stream, 
                                  "create task"));
                    return NULL;
                }
                
            }
            stream->task->worker_started = 1;
            stream->task->started_at = apr_time_now();
            ++m->workers_busy;
            
            ++m->tasks_active;
            return stream->task;
        }
    }
@@ -841,14 +833,14 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
         * a block by flow control.
         */
        if (task->done_at- m->last_limit_change >= m->limit_change_interval
            && m->workers_limit < m->workers_max) {
            && m->limit_active < m->max_active) {
            /* Well behaving stream, allow it more workers */
            m->workers_limit = H2MIN(m->workers_limit * 2, 
                                     m->workers_max);
            m->limit_active = H2MIN(m->limit_active * 2, 
                                     m->max_active);
            m->last_limit_change = task->done_at;
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                          "h2_mplx(%ld): increase worker limit to %d",
                          m->id, m->workers_limit);
                          m->id, m->limit_active);
        }
    }
    
@@ -870,7 +862,9 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
            h2_beam_mutex_disable(stream->input);
            h2_beam_leave(stream->input);
        }
        if (stream->output) {
            h2_beam_mutex_disable(stream->output);
        }
        check_data_for(m, stream->id);
    }
    else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
@@ -881,7 +875,9 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
            h2_beam_mutex_disable(stream->input);
            h2_beam_leave(stream->input);
        }
        if (stream->output) {
            h2_beam_mutex_disable(stream->output);
        }
        stream_joined(m, stream);
    }
    else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
@@ -903,7 +899,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
    
    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
        task_done(m, task, NULL);
        --m->workers_busy;
        --m->tasks_active;
        if (m->join_wait) {
            apr_thread_cond_signal(m->join_wait);
        }
@@ -982,14 +978,14 @@ static apr_status_t unschedule_slow_tasks(h2_mplx *m)
    /* Try to get rid of streams that occupy workers. Look for safe requests
     * that are repeatable. If none found, fail the connection.
     */
    n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->sredo));
    n = (m->tasks_active - m->limit_active - (int)h2_ihash_count(m->sredo));
    while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) {
        h2_task_rst(stream->task, H2_ERR_CANCEL);
        h2_ihash_add(m->sredo, stream);
        --n;
    }
    
    if ((m->workers_busy - h2_ihash_count(m->sredo)) > m->workers_limit) {
    if ((m->tasks_active - h2_ihash_count(m->sredo)) > m->limit_active) {
        h2_stream *stream = get_timed_out_busy_stream(m);
        if (stream) {
            /* Too many busy workers, unable to cancel enough streams
@@ -1009,7 +1005,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
    
    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
        apr_size_t scount = h2_ihash_count(m->streams);
        if (scount > 0 && m->workers_busy) {
        if (scount > 0 && m->tasks_active) {
            /* If we have streams in connection state 'IDLE', meaning
             * all streams are ready to sent data out, but lack
             * WINDOW_UPDATEs. 
@@ -1024,27 +1020,27 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
             */
            now = apr_time_now();
            m->last_idle_block = now;
            if (m->workers_limit > 2 
            if (m->limit_active > 2 
                && now - m->last_limit_change >= m->limit_change_interval) {
                if (m->workers_limit > 16) {
                    m->workers_limit = 16;
                if (m->limit_active > 16) {
                    m->limit_active = 16;
                }
                else if (m->workers_limit > 8) {
                    m->workers_limit = 8;
                else if (m->limit_active > 8) {
                    m->limit_active = 8;
                }
                else if (m->workers_limit > 4) {
                    m->workers_limit = 4;
                else if (m->limit_active > 4) {
                    m->limit_active = 4;
                }
                else if (m->workers_limit > 2) {
                    m->workers_limit = 2;
                else if (m->limit_active > 2) {
                    m->limit_active = 2;
                }
                m->last_limit_change = now;
                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                              "h2_mplx(%ld): decrease worker limit to %d",
                              m->id, m->workers_limit);
                              m->id, m->limit_active);
            }
            
            if (m->workers_busy > m->workers_limit) {
            if (m->tasks_active > m->limit_active) {
                status = unschedule_slow_tasks(m);
            }
        }
@@ -1257,7 +1253,7 @@ int h2_mplx_awaits_data(h2_mplx *m)
        if (h2_ihash_empty(m->streams)) {
            waiting = 0;
        }
        if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->workers_busy) {
        if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
            waiting = 0;
        }
        leave_mutex(m, acquired);
Loading