Commit 6f0b0709 authored by Stefan Eissing's avatar Stefan Eissing
Browse files

On the trunk:

mod_http2: input buffering and dynamic flow windows for increased throughput.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1788981 13f79535-47bb-0310-9956-ffa450edef68
parent 23637c59
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
                                                         -*- coding: utf-8 -*-
Changes with Apache 2.5.0

  *) mod_http2: input buffering and dynamic flow windows for increased 
     throughput. [Stefan Eissing]

  *) mod_http2: h2 workers with improved scalability for better scheduling
     performance. There are H2MaxWorkers threads created at start and the
     number is kept constant. [Stefan Eissing]
+2 −1
Original line number Diff line number Diff line
@@ -115,7 +115,8 @@ typedef enum {
    H2_SEV_CLOSED_L,
    H2_SEV_CLOSED_R,
    H2_SEV_CANCELLED,
    H2_SEV_EOS_SENT
    H2_SEV_EOS_SENT,
    H2_SEV_IN_DATA_PENDING,
} h2_stream_event_t;


+13 −11
Original line number Diff line number Diff line
@@ -246,6 +246,7 @@ static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
    apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
    h2_beam_io_callback *cb = beam->cons_io_cb;
     
    if (len > 0) {
        if (cb) {
            void *ctx = beam->cons_ctx;
            
@@ -255,6 +256,7 @@ static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
            rv = 1;
        }
        beam->cons_bytes_reported += len;
    }
    return rv;
}

@@ -1250,9 +1252,9 @@ void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
    if (beam && APLOG_C_IS_LEVEL(c,level)) {
        ap_log_cerror(APLOG_MARK, level, 0, c, 
                      "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s", 
                      c->id, beam->id, beam->tag, beam->closed, beam->aborted, 
                      h2_beam_empty(beam), (long)h2_beam_get_buffered(beam),
                      msg);
                      (c->master? c->master->id : c->id), beam->id, beam->tag, 
                      beam->closed, beam->aborted, h2_beam_empty(beam), 
                      (long)h2_beam_get_buffered(beam), msg);
    }
}

+69 −43
Original line number Diff line number Diff line
@@ -44,55 +44,80 @@
#define UNSET       -1
#define H2MIN(x,y) ((x) < (y) ? (x) : (y))

static apr_status_t consume_brigade(h2_filter_cin *cin, 
                                    apr_bucket_brigade *bb, 
                                    apr_read_type_e block)
static apr_status_t recv_RAW_DATA(conn_rec *c, h2_filter_cin *cin, 
                                  apr_bucket *b, apr_read_type_e block)
{
    h2_session *session = cin->session;
    apr_status_t status = APR_SUCCESS;
    apr_size_t readlen = 0;
    apr_size_t len;
    const char *data;
    ssize_t n;
    
    while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
    status = apr_bucket_read(b, &data, &len, block);
    
    while (status == APR_SUCCESS && len > 0) {
        n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
        
        apr_bucket* bucket = APR_BRIGADE_FIRST(bb);
        if (APR_BUCKET_IS_METADATA(bucket)) {
            /* we do nothing regarding any meta here */
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                      H2_SSSN_MSG(session, "fed %ld bytes to nghttp2, %ld read"),
                      (long)len, (long)n);
        if (n < 0) {
            if (nghttp2_is_fatal((int)n)) {
                h2_session_event(session, H2_SESSION_EV_PROTO_ERROR, 
                                 (int)n, nghttp2_strerror((int)n));
                status = APR_EGENERAL;
            }
        }
        else {
            const char *bucket_data = NULL;
            apr_size_t bucket_length = 0;
            status = apr_bucket_read(bucket, &bucket_data,
                                     &bucket_length, block);
            
            if (status == APR_SUCCESS && bucket_length > 0) {
                apr_size_t consumed = 0;
            session->io.bytes_read += n;
            if (len <= n) {
                break;
            }
            len -= n;
            data += n;
        }
    }
    
                status = cin->cb(cin->cb_ctx, bucket_data, bucket_length, &consumed);
                if (status == APR_SUCCESS && bucket_length > consumed) {
                    /* We have data left in the bucket. Split it. */
                    status = apr_bucket_split(bucket, consumed);
    return status;
}
                readlen += consumed;
                cin->start_read = apr_time_now();

static apr_status_t recv_RAW_brigade(conn_rec *c, h2_filter_cin *cin, 
                                     apr_bucket_brigade *bb, 
                                     apr_read_type_e block)
{
    apr_status_t status = APR_SUCCESS;
    apr_bucket* b;
    int consumed = 0;
    
    h2_util_bb_log(c, c->id, APLOG_TRACE2, "RAW_in", bb);
    while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
        b = APR_BRIGADE_FIRST(bb);

        if (APR_BUCKET_IS_METADATA(b)) {
            /* nop */
        }
        else {
            status = recv_RAW_DATA(c, cin, b, block);
        }
        apr_bucket_delete(bucket);
        consumed = 1;
        apr_bucket_delete(b);
    }
    
    if (readlen == 0 && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
    if (!consumed && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
        return APR_EAGAIN;
    }
    return status;
}

h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx)
h2_filter_cin *h2_filter_cin_create(h2_session *session)
{
    h2_filter_cin *cin;
    
    cin = apr_pcalloc(p, sizeof(*cin));
    cin->pool      = p;
    cin->cb        = cb;
    cin->cb_ctx    = ctx;
    cin->start_read = UNSET;
    cin = apr_pcalloc(session->pool, sizeof(*cin));
    if (!cin) {
        return NULL;
    }
    cin->session = session;
    return cin;
}

@@ -110,11 +135,14 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
    h2_filter_cin *cin = f->ctx;
    apr_status_t status = APR_SUCCESS;
    apr_interval_time_t saved_timeout = UNSET;
    const int trace1 = APLOGctrace1(f->c);
    
    if (trace1) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
                      "h2_session(%ld): read, %s, mode=%d, readbytes=%ld", 
                      (long)f->c->id, (block == APR_BLOCK_READ)? 
                      "BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes);
    }
    
    if (mode == AP_MODE_INIT || mode == AP_MODE_SPECULATIVE) {
        return ap_get_brigade(f->next, brigade, mode, block, readbytes);
@@ -125,20 +153,16 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
    }
    
    if (!cin->bb) {
        cin->bb = apr_brigade_create(cin->pool, f->c->bucket_alloc);
        cin->bb = apr_brigade_create(cin->session->pool, f->c->bucket_alloc);
    }

    if (!cin->socket) {
        cin->socket = ap_get_conn_socket(f->c);
    }
    
    cin->start_read = apr_time_now();
    if (APR_BRIGADE_EMPTY(cin->bb)) {
        /* We only do a blocking read when we have no streams to process. So,
         * in httpd scoreboard lingo, we are in a KEEPALIVE connection state.
         * When reading non-blocking, we do have streams to process and update
         * child with NULL request. That way, any current request information
         * in the scoreboard is preserved.
         */
        if (block == APR_BLOCK_READ) {
            if (cin->timeout > 0) {
@@ -155,13 +179,15 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
    
    switch (status) {
        case APR_SUCCESS:
            status = consume_brigade(cin, cin->bb, block);
            status = recv_RAW_brigade(f->c, cin, cin->bb, block);
            break;
        case APR_EOF:
        case APR_EAGAIN:
        case APR_TIMEUP:
            if (trace1) {
                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
                              "h2_session(%ld): read", f->c->id);
            }
            break;
        default:
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, f->c, APLOGNO(03046)
+4 −9
Original line number Diff line number Diff line
@@ -21,21 +21,16 @@ struct h2_headers;
struct h2_stream;
struct h2_session;

typedef apr_status_t h2_filter_cin_cb(void *ctx, 
                                      const char *data, apr_size_t len,
                                      apr_size_t *readlen);

typedef struct h2_filter_cin {
    apr_pool_t *pool;
    apr_bucket_brigade *bb;
    h2_filter_cin_cb *cb;
    void *cb_ctx;
    apr_socket_t *socket;
    apr_interval_time_t timeout;
    apr_time_t start_read;
    apr_bucket_brigade *bb;
    struct h2_session *session;
    apr_bucket *cur;
} h2_filter_cin;

h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx);
h2_filter_cin *h2_filter_cin_create(struct h2_session *session);

void h2_filter_cin_timeout_set(h2_filter_cin *cin, apr_interval_time_t timeout);

Loading