Commit a21809bf authored by Stefan Eissing's avatar Stefan Eissing
Browse files

On the trunk:

M    modules/http2/h2_bucket_beam.c
     - renaming: former red/green is now send/recv, better to read
M    modules/http2/h2_from_h1.c
     - produce a response also when body it totally absent (PR 60599)
M    modules/http2/h2_session.c
     - more work on cleaner connection shutdown



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1779896 13f79535-47bb-0310-9956-ffa450edef68
parent 189be21d
Loading
Loading
Loading
Loading
+34 −34
Original line number Diff line number Diff line
@@ -361,7 +361,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
            if (b != H2_BLIST_SENTINEL(&beam->hold_list)) {
                /* bucket is in hold as it should be, mark this one
                 * and all before it for purging. We might have placed meta
                 * buckets without a green proxy into the hold before it 
                 * buckets without a receiver proxy into the hold before it 
                 * and schedule them for purging now */
                for (b = H2_BLIST_FIRST(&beam->hold_list); 
                     b != H2_BLIST_SENTINEL(&beam->hold_list);
@@ -455,7 +455,7 @@ static void beam_set_recv_pool(h2_bucket_beam *beam, apr_pool_t *pool)
static apr_status_t beam_send_cleanup(void *data)
{
    h2_bucket_beam *beam = data;
    /* sender has gone away, clear up all references to its memory */
    /* sender is going away, clear up all references to its memory */
    r_purge_sent(beam);
    h2_blist_cleanup(&beam->send_list);
    report_consumption(beam);
@@ -740,7 +740,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
    

    /* The fundamental problem is that reading a sender bucket from
     * a green thread is a total NO GO, because the bucket might use
     * a receiver thread is a total NO GO, because the bucket might use
     * its pool/bucket_alloc from a foreign thread and that will
     * corrupt. */
    status = APR_ENOTIMPL;
@@ -751,7 +751,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
        status = apr_bucket_setaside(b, beam->send_pool);
    }
    else if (APR_BUCKET_IS_HEAP(b)) {
        /* For heap buckets read from a green thread is fine. The
        /* For heap buckets read from a receiver thread is fine. The
         * data will be there and live until the bucket itself is
         * destroyed. */
        status = APR_SUCCESS;
@@ -760,7 +760,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
        /* pool buckets are bastards that register at pool cleanup
         * to morph themselves into heap buckets. That may happen anytime,
         * even after the bucket data pointer has been read. So at
         * any time inside the green thread, the pool bucket memory
         * any time inside the receiver thread, the pool bucket memory
         * may disappear. yikes. */
        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
        if (status == APR_SUCCESS) {
@@ -879,13 +879,13 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
                             apr_off_t readbytes)
{
    h2_beam_lock bl;
    apr_bucket *bsender, *bgreen, *ng;
    apr_bucket *bsender, *brecv, *ng;
    int transferred = 0;
    apr_status_t status = APR_SUCCESS;
    apr_off_t remain = readbytes;
    int transferred_buckets = 0;
    
    /* Called from the green thread to take buckets from the beam */
    /* Called from the receiver thread to take buckets from the beam */
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
transfer:
        if (beam->aborted) {
@@ -896,26 +896,26 @@ transfer:
            goto leave;
        }

        /* transfer enough buckets from our green brigade, if we have one */
        /* transfer enough buckets from our receiver brigade, if we have one */
        beam_set_recv_pool(beam, bb->p);
        while (beam->recv_buffer
               && !APR_BRIGADE_EMPTY(beam->recv_buffer)
               && (readbytes <= 0 || remain >= 0)) {
            bgreen = APR_BRIGADE_FIRST(beam->recv_buffer);
            if (readbytes > 0 && bgreen->length > 0 && remain <= 0) {
            brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
            if (readbytes > 0 && brecv->length > 0 && remain <= 0) {
                break;
            }            
            APR_BUCKET_REMOVE(bgreen);
            APR_BRIGADE_INSERT_TAIL(bb, bgreen);
            remain -= bgreen->length;
            APR_BUCKET_REMOVE(brecv);
            APR_BRIGADE_INSERT_TAIL(bb, brecv);
            remain -= brecv->length;
            ++transferred;
        }

        /* transfer from our sender brigade, transforming sender buckets to
         * green ones until we have enough */
         * receiver ones until we have enough */
        while (!H2_BLIST_EMPTY(&beam->send_list) && (readbytes <= 0 || remain >= 0)) {
            bsender = H2_BLIST_FIRST(&beam->send_list);
            bgreen = NULL;
            brecv = NULL;
            
            if (readbytes > 0 && bsender->length > 0 && remain <= 0) {
                break;
@@ -923,15 +923,15 @@ transfer:
                        
            if (APR_BUCKET_IS_METADATA(bsender)) {
                if (APR_BUCKET_IS_EOS(bsender)) {
                    bgreen = apr_bucket_eos_create(bb->bucket_alloc);
                    brecv = apr_bucket_eos_create(bb->bucket_alloc);
                    beam->close_sent = 1;
                }
                else if (APR_BUCKET_IS_FLUSH(bsender)) {
                    bgreen = apr_bucket_flush_create(bb->bucket_alloc);
                    brecv = apr_bucket_flush_create(bb->bucket_alloc);
                }
                else if (AP_BUCKET_IS_ERROR(bsender)) {
                    ap_bucket_error *eb = (ap_bucket_error *)bsender;
                    bgreen = ap_bucket_error_create(eb->status, eb->data,
                    brecv = ap_bucket_error_create(eb->status, eb->data,
                                                    bb->p, bb->bucket_alloc);
                }
            }
@@ -966,33 +966,33 @@ transfer:
                continue;
            }
            else {
                /* create a "green" standin bucket. we took care about the
                /* create a "receiver" standin bucket. we took care about the
                 * underlying sender bucket and its data when we placed it into
                 * the sender brigade.
                 * the beam bucket will notify us on destruction that bsender is
                 * no longer needed. */
                bgreen = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
                brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
                                               beam->buckets_sent++);
            }
            
            /* Place the sender bucket into our hold, to be destroyed when no
             * green bucket references it any more. */
             * receiver bucket references it any more. */
            APR_BUCKET_REMOVE(bsender);
            H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
            beam->received_bytes += bsender->length;
            ++transferred_buckets;
            
            if (bgreen) {
                APR_BRIGADE_INSERT_TAIL(bb, bgreen);
                remain -= bgreen->length;
            if (brecv) {
                APR_BRIGADE_INSERT_TAIL(bb, brecv);
                remain -= brecv->length;
                ++transferred;
            }
            else {
                bgreen = h2_beam_bucket(beam, bb, bsender);
                while (bgreen && bgreen != APR_BRIGADE_SENTINEL(bb)) {
                brecv = h2_beam_bucket(beam, bb, bsender);
                while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
                    ++transferred;
                    remain -= bgreen->length;
                    bgreen = APR_BUCKET_NEXT(bgreen);
                    remain -= brecv->length;
                    brecv = APR_BUCKET_NEXT(brecv);
                }
            }
        }
@@ -1000,14 +1000,14 @@ transfer:
        if (readbytes > 0 && remain < 0) {
            /* too much, put some back */
            remain = readbytes;
            for (bgreen = APR_BRIGADE_FIRST(bb);
                 bgreen != APR_BRIGADE_SENTINEL(bb);
                 bgreen = APR_BUCKET_NEXT(bgreen)) {
                 remain -= bgreen->length;
            for (brecv = APR_BRIGADE_FIRST(bb);
                 brecv != APR_BRIGADE_SENTINEL(bb);
                 brecv = APR_BUCKET_NEXT(brecv)) {
                 remain -= brecv->length;
                 if (remain < 0) {
                     apr_bucket_split(bgreen, bgreen->length+remain);
                     apr_bucket_split(brecv, brecv->length+remain);
                     beam->recv_buffer = apr_brigade_split_ex(bb, 
                                                        APR_BUCKET_NEXT(bgreen), 
                                                        APR_BUCKET_NEXT(brecv), 
                                                        beam->recv_buffer);
                     break;
                 }
+4 −0
Original line number Diff line number Diff line
@@ -527,6 +527,10 @@ apr_status_t h2_filter_headers_out(ap_filter_t *f, apr_bucket_brigade *bb)
            if (AP_BUCKET_IS_ERROR(b) && !eb) {
                eb = b->data;
            }
            else if (APR_BUCKET_IS_EOS(b) || AP_BUCKET_IS_EOR(b)) {
                body_bucket = b;
                break;
            } 
            else if (AP_BUCKET_IS_EOC(b)) {
                /* If we see an EOC bucket it is a signal that we should get out
                 * of the way doing nothing.
+32 −56
Original line number Diff line number Diff line
@@ -95,7 +95,7 @@ typedef struct stream_sel_ctx {
    h2_stream *candidate;
} stream_sel_ctx;

static int find_cleanup_stream(h2_stream *stream, void *ictx)
static int find_unprocessed_stream(h2_stream *stream, void *ictx)
{
    stream_sel_ctx *ctx = ictx;
    if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
@@ -115,20 +115,17 @@ static int find_cleanup_stream(h2_stream *stream, void *ictx)
    return 1;
}

static void cleanup_streams(h2_session *session)
static void cleanup_unprocessed_streams(h2_session *session)
{
    stream_sel_ctx ctx;
    ctx.session = session;
    ctx.candidate = NULL;
    while (1) {
        h2_mplx_stream_do(session->mplx, find_cleanup_stream, &ctx);
        if (ctx.candidate) {
            h2_session_stream_done(session, ctx.candidate);
        ctx.candidate = NULL;
        }
        else {
        h2_mplx_stream_do(session->mplx, find_unprocessed_stream, &ctx);
        if (!ctx.candidate) {
            break;
        }
        h2_session_stream_done(session, ctx.candidate);
    }
}

@@ -317,7 +314,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
    return 0;
}

static apr_status_t stream_release(h2_session *session, 
static apr_status_t stream_closed(h2_session *session, 
                                  h2_stream *stream,
                                  uint32_t error_code) 
{
@@ -329,12 +326,11 @@ static apr_status_t stream_release(h2_session *session,
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                      "h2_stream(%ld-%d): handled, closing", 
                      session->id, (int)stream->id);
        if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
            if (stream->id > session->local.completed_max) {
        if (H2_STREAM_CLIENT_INITIATED(stream->id)
            && stream->id > session->local.completed_max) {
            session->local.completed_max = stream->id;
        }
    }
    }
    else {
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
                      "h2_stream(%ld-%d): closing with err=%d %s", 
@@ -342,7 +338,11 @@ static apr_status_t stream_release(h2_session *session,
                      h2_h2_err_description(error_code));
        h2_stream_rst(stream, error_code);
    }
    
    /* The stream might have data in the buffers of the main connection.
     * We can only free the allocated resources once all had been written.
     * Send a special buckets on the connection that gets destroyed when
     * all preceding data has been handled. On its destruction, it is safe
     * to purge all resources of the stream. */
    b = h2_bucket_eos_create(c->bucket_alloc, stream);
    APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
    status = h2_conn_io_pass(&session->io, session->bbtmp);
@@ -359,7 +359,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
    (void)ngh2;
    stream = get_stream(session, stream_id);
    if (stream) {
        stream_release(session, stream, error_code);
        stream_closed(session, stream, error_code);
    }
    return 0;
}
@@ -729,32 +729,6 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb)
    return APR_SUCCESS;
}

static void h2_session_cleanup(h2_session *session)
{
    ap_assert(session);    

    if (session->mplx) {
        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
        h2_mplx_release_and_join(session->mplx, session->iowait);
        session->mplx = NULL;
    }

    ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
                                     session->c->input_filters), "H2_IN");
    if (session->ngh2) {
        nghttp2_session_del(session->ngh2);
        session->ngh2 = NULL;
    }
    if (session->c) {
        h2_ctx_clear(session->c);
    }

    if (APLOGctrace1(session->c)) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                      "h2_session(%ld): cleanup", session->id);
    }
}

static apr_status_t h2_session_shutdown_notice(h2_session *session)
{
    apr_status_t status;
@@ -829,17 +803,11 @@ static apr_status_t h2_session_shutdown(h2_session *session, int error,
static apr_status_t session_pool_cleanup(void *data)
{
    h2_session *session = data;
    /* On a controlled connection shutdown, this gets never
     * called as we deregister and destroy our pool manually.
     * However when we have an async mpm, and handed it our idle
     * connection, it will just cleanup once the connection is closed
     * from the other side (and sometimes even from out side) and
     * here we arrive then.
     */
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                  "session(%ld): pool_cleanup", session->id);
    
    if (session->state != H2_SESSION_ST_DONE) {
    if (session->state != H2_SESSION_ST_DONE
        && session->state != H2_SESSION_ST_INIT) {
        /* Not good. The connection is being torn down and we have
         * not sent a goaway. This is considered a protocol error and
         * the client has to assume that any streams "in flight" may have
@@ -853,8 +821,16 @@ static apr_status_t session_pool_cleanup(void *data)
                      "goodbye, clients will be confused, should not happen", 
                      session->id);
    }
    h2_session_cleanup(session);
    session->pool = NULL;

    if (session->mplx) {
        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
        h2_mplx_release_and_join(session->mplx, session->iowait);
        session->mplx = NULL;
    }
    if (session->ngh2) {
        nghttp2_session_del(session->ngh2);
        session->ngh2 = NULL;
    }
    return APR_SUCCESS;
}

@@ -1774,7 +1750,7 @@ static void h2_session_ev_init(h2_session *session, int arg, const char *msg)

static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
{
    cleanup_streams(session);
    cleanup_unprocessed_streams(session);
    if (!session->remote.shutdown) {
        update_child_status(session, SERVER_CLOSING, "local goaway");
    }
@@ -1787,7 +1763,7 @@ static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char
        session->remote.error = arg;
        session->remote.accepting = 0;
        session->remote.shutdown = 1;
        cleanup_streams(session);
        cleanup_unprocessed_streams(session);
        update_child_status(session, SERVER_CLOSING, "remote goaway");
        transit(session, "remote goaway", H2_SESSION_ST_DONE);
    }