Commit cfb69e2f authored by Stefan Eissing's avatar Stefan Eissing
Browse files

On the trunk:

mod_http2/mod_proxy_http2: less read attempts on bucket beams that already
     delivered EOS/headers. Fixed bug in re-attempting proxy request after 
     connection error.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1790102 13f79535-47bb-0310-9956-ffa450edef68
parent ebf53ccf
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
                                                         -*- coding: utf-8 -*-
Changes with Apache 2.5.0

  *) mod_http2/mod_proxy_http2: less read attempts on bucket beams that already
     delivered EOS/headers. Fixed bug in re-attempting proxy request after 
     connection error. [Stefan Eissing]
  
  *) core: Disallow multiple Listen on the same IP:port when listener buckets
     are configured (ListenCoresBucketsRatio > 0), consistently with the single
     bucket case (default), thus avoiding the leak of the corresponding socket
+13 −2
Original line number Diff line number Diff line
@@ -151,6 +151,7 @@ static void ngn_add_task(h2_req_engine *ngn, h2_task *task, request_rec *r)
    entry->task = task;
    entry->r = r;
    H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
    ngn->no_assigned++;
}


@@ -176,6 +177,17 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
        task->assigned = NULL;
    }
    
    if (task->engine) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, 
                      "h2_ngn_shed(%ld): push task(%s) hosting engine %s " 
                      "already with %d tasks", 
                      shed->c->id, task->id, task->engine->id,
                      task->engine->no_assigned);
        task->assigned = task->engine;
        ngn_add_task(task->engine, task, r);
        return APR_SUCCESS;
    }
    
    ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
    if (ngn && !ngn->shutdown) {
        /* this task will be processed in another thread,
@@ -187,7 +199,6 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
            h2_task_freeze(task);
        }
        ngn_add_task(ngn, task, r);
        ngn->no_assigned++;
        return APR_SUCCESS;
    }
    
@@ -211,11 +222,11 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
        status = einit(newngn, newngn->id, newngn->type, newngn->pool,
                       shed->req_buffer_size, r,
                       &newngn->out_consumed, &newngn->out_consumed_ctx);
        
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395)
                      "h2_ngn_shed(%ld): create engine %s (%s)", 
                      shed->c->id, newngn->id, newngn->type);
        if (status == APR_SUCCESS) {
            ap_assert(task->engine == NULL);
            newngn->task = task;
            task->engine = newngn;
            task->assigned = newngn;
+97 −74
Original line number Diff line number Diff line
@@ -764,17 +764,76 @@ static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
    return NULL;
}

apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, 
                                   int *peos, h2_headers **presponse)
static apr_status_t add_data(h2_stream *stream, apr_off_t requested,
                             apr_off_t *plen, int *peos, int *complete, 
                             h2_headers **pheaders)
{
    apr_status_t status = APR_SUCCESS;
    apr_off_t requested, max_chunk = H2_DATA_CHUNK_SIZE;
    apr_bucket *b, *e;
    conn_rec *c;
    
    if (presponse) {
        *presponse = NULL;
    *peos = 0;
    *plen = 0;
    *complete = 0;
    if (pheaders) {
        *pheaders = NULL;
    }

    H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_data");
    b = APR_BRIGADE_FIRST(stream->out_buffer);
    while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
        e = APR_BUCKET_NEXT(b);
        if (APR_BUCKET_IS_METADATA(b)) {
            if (APR_BUCKET_IS_FLUSH(b)) {
                APR_BUCKET_REMOVE(b);
                apr_bucket_destroy(b);
            }
            else if (APR_BUCKET_IS_EOS(b)) {
                *peos = 1;
                return APR_SUCCESS;
            }
            else if (H2_BUCKET_IS_HEADERS(b)) {
                if (*plen > 0) {
                    /* data before the response, can only return up to here */
                    return APR_SUCCESS;
                }
                else if (pheaders) {
                    *pheaders = h2_bucket_headers_get(b);
                    APR_BUCKET_REMOVE(b);
                    apr_bucket_destroy(b);
                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
                                  H2_STRM_MSG(stream, "prep, -> response %d"), 
                                  (*pheaders)->status);
                    return APR_SUCCESS;
                }
                else {
                    return APR_EAGAIN;
                }
            }
        }
        else if (b->length == 0) {
            APR_BUCKET_REMOVE(b);
            apr_bucket_destroy(b);
        }
        else {
            ap_assert(b->length != (apr_size_t)-1);
            *plen += b->length;
            if (*plen >= requested) {
                *plen = requested;
                return APR_SUCCESS;
            }
        }
        b = e;
    }
    *complete = 1;
    return APR_SUCCESS;
}

apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, 
                                   int *peos, h2_headers **pheaders)
{
    apr_status_t status = APR_SUCCESS;
    apr_off_t requested, missing, max_chunk = H2_DATA_CHUNK_SIZE;
    conn_rec *c;
    int complete;

    ap_assert(stream);
    
@@ -793,15 +852,34 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
    if (stream->session->io.write_size > 0) {
        max_chunk = stream->session->io.write_size - 9; /* header bits */ 
    }
    *plen = requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
    requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
    
    h2_util_bb_avail(stream->out_buffer, plen, peos);
    if (!*peos && *plen < requested && *plen < stream->max_mem) {
        H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
    /* count the buffered data until eos or a headers bucket */
    status = add_data(stream, requested, plen, peos, &complete, pheaders);
    
    if (status == APR_EAGAIN) {
        /* TODO: ugly, someone needs to retrieve the response first */
        h2_mplx_keep_active(stream->session->mplx, stream->id);
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                      H2_STRM_MSG(stream, "prep, response eagain"));
        return status;
    }
    else if (status != APR_SUCCESS) {
        return status;
    }
    
    if (pheaders && *pheaders) {
        return APR_SUCCESS;
    }
    
    missing = H2MIN(requested, stream->max_mem) - *plen;
    if (complete && !*peos && missing > 0) {
        if (stream->output) {
            H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
            status = h2_beam_receive(stream->output, stream->out_buffer, 
                                     APR_NONBLOCK_READ, 
                                     stream->max_mem - *plen);
            H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
        }
        else {
            status = APR_EOF;
@@ -810,79 +888,24 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
        if (APR_STATUS_IS_EOF(status)) {
            apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
            APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
            *peos = 1;
            status = APR_SUCCESS;
        }
        else if (status == APR_EAGAIN) {
            status = APR_SUCCESS;
        }
        *plen = requested;
        h2_util_bb_avail(stream->out_buffer, plen, peos);
        H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
    }
    else {
        H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "ok");
    }

    b = APR_BRIGADE_FIRST(stream->out_buffer);
    while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
        e = APR_BUCKET_NEXT(b);
        if (APR_BUCKET_IS_FLUSH(b)
            || (!APR_BUCKET_IS_METADATA(b) && b->length == 0)) {
            APR_BUCKET_REMOVE(b);
            apr_bucket_destroy(b);
        }
        else {
            break;
        }
        b = e;
    }

    b = get_first_headers_bucket(stream->out_buffer);
    if (b) {
        /* there are HEADERS to submit */
        *peos = 0;
        *plen = 0;
        if (b == APR_BRIGADE_FIRST(stream->out_buffer)) {
            if (presponse) {
                *presponse = h2_bucket_headers_get(b);
                APR_BUCKET_REMOVE(b);
                apr_bucket_destroy(b);
                status = APR_SUCCESS;
            }
            else {
                /* someone needs to retrieve the response first */
                h2_mplx_keep_active(stream->session->mplx, stream->id);
                status = APR_EAGAIN;
            }
        }
        else {
            apr_bucket *e = APR_BRIGADE_FIRST(stream->out_buffer);
            while (e != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
                if (e == b) {
                    break;
                }
                else if (e->length != (apr_size_t)-1) {
                    *plen += e->length;
                }
                e = APR_BUCKET_NEXT(e);
            }
        else if (status == APR_SUCCESS) {
            /* do it again, now that we have gotten more */
            status = add_data(stream, requested, plen, peos, &complete, pheaders);
        }
    }
    
    if (status == APR_SUCCESS) {
        if (presponse && *presponse) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
                          H2_STRM_MSG(stream, "prepare, response %d"), 
                          (*presponse)->status);
        }
        else if (*peos || *plen) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
        if (*peos || *plen) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                          H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
                          (long)*plen, *peos);
        }
        else {
            status = APR_EAGAIN;
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                          H2_STRM_MSG(stream, "prepare, no data"));
        }
    }
+31 −46
Original line number Diff line number Diff line
@@ -218,19 +218,12 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine,
{
    h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, 
                                             &proxy_http2_module);
    if (ctx) {
        conn_rec *c = ctx->owner;
        h2_proxy_ctx *nctx;
        
        /* we need another lifetime for this. If we do not host
         * an engine, the context lives in r->pool. Since we expect
         * to server more than r, we need to live longer */
        nctx = apr_pcalloc(pool, sizeof(*nctx));
        if (nctx == NULL) {
            return APR_ENOMEM;
    if (!ctx) {
        ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
                      "h2_proxy_session, engine init, no ctx found");
        return APR_ENOTIMPL;
    }
        memcpy(nctx, ctx, sizeof(*nctx));
        ctx = nctx;
    
    ctx->pool = pool;
    ctx->engine = engine;
    ctx->engine_id = id;
@@ -239,16 +232,10 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine,
    ctx->req_buffer_size = req_buffer_size;
    ctx->capacity = 100;
    
        ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);

    *pconsumed = out_consumed;
    *pctx = ctx;
    return APR_SUCCESS;
}
    ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
                  "h2_proxy_session, engine init, no ctx found");
    return APR_ENOTIMPL;
}

static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
{
@@ -420,7 +407,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
    return status;
}

static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
static apr_status_t push_request_somewhere(h2_proxy_ctx *ctx, request_rec *r)
{
    conn_rec *c = ctx->owner;
    const char *engine_type, *hostname;
@@ -430,21 +417,15 @@ static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
    engine_type = apr_psprintf(ctx->pool, "proxy_http2 %s%s", hostname, 
                               ctx->server_portstr);
    
    if (c->master && req_engine_push && ctx->next && is_h2 && is_h2(c)) {
    if (c->master && req_engine_push && r && is_h2 && is_h2(c)) {
        /* If we are have req_engine capabilities, push the handling of this
         * request (e.g. slave connection) to a proxy_http2 engine which 
         * uses the same backend. We may be called to create an engine 
         * ourself. */
        if (req_engine_push(engine_type, ctx->next, proxy_engine_init)
            == APR_SUCCESS) {
            /* to renew the lifetime, we might have set a new ctx */
            ctx = ap_get_module_config(c->conn_config, &proxy_http2_module);
        if (req_engine_push(engine_type, r, proxy_engine_init) == APR_SUCCESS) {
            if (ctx->engine == NULL) {
                /* Another engine instance has taken over processing of this
                 * request. */
                ctx->r_status = SUSPENDED;
                ctx->next = NULL;
                return ctx;
                /* request has been assigned to an engine in another thread */
                return SUSPENDED;
            }
        }
    }
@@ -465,7 +446,7 @@ static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, 
                      "H2: hosting engine %s", ctx->engine_id);
    }
    return ctx;
    return APR_SUCCESS;
}

static int proxy_http2_handler(request_rec *r, 
@@ -507,6 +488,7 @@ static int proxy_http2_handler(request_rec *r,
        default:
            return DECLINED;
    }
    
    ctx = apr_pcalloc(r->pool, sizeof(*ctx));
    ctx->owner      = r->connection;
    ctx->pool       = r->pool;
@@ -520,6 +502,7 @@ static int proxy_http2_handler(request_rec *r,
    ctx->r_status   = HTTP_SERVICE_UNAVAILABLE;
    ctx->next       = r;
    r = NULL;
    
    ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, ctx);

    /* scheme says, this is for us. */
@@ -552,9 +535,10 @@ run_connect:
    /* If we are not already hosting an engine, try to push the request 
     * to an already existing engine or host a new engine here. */
    if (!ctx->engine) {
        ctx = push_request_somewhere(ctx);
        ctx->r_status = push_request_somewhere(ctx, ctx->next);
        if (ctx->r_status == SUSPENDED) {
            /* request was pushed to another engine */
            /* request was pushed to another thread, leave processing here */
            ctx->next = NULL;
            goto cleanup;
        }
    }
@@ -567,7 +551,7 @@ run_connect:
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03352)
                      "H2: failed to make connection to backend: %s",
                      ctx->p_conn->hostname);
        goto cleanup;
        goto reconnect;
    }
    
    /* Step Three: Create conn_rec for the socket we have open now. */
@@ -579,7 +563,7 @@ run_connect:
                          "setup new connection: is_ssl=%d %s %s %s", 
                          ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname, 
                          locurl, ctx->p_conn->hostname);
            goto cleanup;
            goto reconnect;
        }
        
        if (!ctx->p_conn->data) {
@@ -614,7 +598,7 @@ run_session:
        ctx->engine = NULL;
    }

cleanup:
reconnect:
    if (!reconnected && next_request(ctx, 1) == APR_SUCCESS) {
        /* Still more to do, tear down old conn and start over */
        if (ctx->p_conn) {
@@ -627,6 +611,7 @@ cleanup:
        goto run_connect;
    }
    
cleanup:
    if (ctx->p_conn) {
        if (status != APR_SUCCESS) {
            /* close socket when errors happened or session shut down (EOF) */