h2_mplx.c 38.9 KB
Newer Older
powelld's avatar
powelld committed
             * h2 workers. As worker threads are a scarce resource, 
             * we need to take measures that we do not get DoSed.
             * 
             * This is what we call an 'idle block'. Limit the amount 
             * of busy workers we allow for this connection until it
             * well behaves.
             */
            now = apr_time_now();
            m->last_idle_block = now;
            if (m->limit_active > 2 
                && now - m->last_limit_change >= m->limit_change_interval) {
                if (m->limit_active > 16) {
                    m->limit_active = 16;
                }
                else if (m->limit_active > 8) {
                    m->limit_active = 8;
                }
                else if (m->limit_active > 4) {
                    m->limit_active = 4;
                }
                else if (m->limit_active > 2) {
                    m->limit_active = 2;
                }
                m->last_limit_change = now;
                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                              "h2_mplx(%ld): decrease worker limit to %d",
                              m->id, m->limit_active);
            }
            
            if (m->tasks_active > m->limit_active) {
                status = unschedule_slow_tasks(m);
            }
        }
        else if (!h2_iq_empty(m->q)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                          "h2_mplx(%ld): idle, but %d streams to process",
                          m->id, (int)h2_iq_count(m->q));
            status = APR_EAGAIN;
        }
        else {
            /* idle, have streams, but no tasks active. what are we waiting for?
             * WINDOW_UPDATEs from client? */
            h2_stream *stream = NULL;
            
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                          "h2_mplx(%ld): idle, no tasks ongoing, %d streams",
                          m->id, (int)h2_ihash_count(m->streams));
            h2_ihash_shift(m->streams, (void**)&stream, 1);
            if (stream) {
                h2_ihash_add(m->streams, stream);
                if (stream->output && !stream->out_checked) {
                    /* FIXME: this looks like a race between the session thinking
                     * it is idle and the EOF on a stream not being sent.
                     * Signal to caller to leave IDLE state.
                     */
                    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                                  H2_STRM_MSG(stream, "output closed=%d, mplx idle"
                                              ", out has %ld bytes buffered"),
                                  h2_beam_is_closed(stream->output),
                                  (long)h2_beam_get_buffered(stream->output));
                    h2_ihash_add(m->streams, stream);
                    check_data_for(m, stream, 0);
                    stream->out_checked = 1;
                    status = APR_EAGAIN;
                }
            }
        }
    }
    register_if_needed(m);

    H2_MPLX_LEAVE(m);
    return status;
}

/*******************************************************************************
 * HTTP/2 request engines
 ******************************************************************************/

typedef struct {
    h2_mplx * m;
    h2_req_engine *ngn;
    int streams_updated;
} ngn_update_ctx;

static int ngn_update_window(void *ctx, void *val)
{
    ngn_update_ctx *uctx = ctx;
    h2_stream *stream = val;
    if (stream->task && stream->task->assigned == uctx->ngn
        && output_consumed_signal(uctx->m, stream->task)) {
        ++uctx->streams_updated;
    }
    return 1;
}

static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
{
    ngn_update_ctx ctx;
        
    ctx.m = m;
    ctx.ngn = ngn;
    ctx.streams_updated = 0;
    h2_ihash_iter(m->streams, ngn_update_window, &ctx);
    
    return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}

apr_status_t h2_mplx_req_engine_push(const char *ngn_type, 
                                     request_rec *r,
                                     http2_req_engine_init *einit)
{
    apr_status_t status;
    h2_mplx *m;
    h2_task *task;
    h2_stream *stream;
    
    task = h2_ctx_rget_task(r);
    if (!task) {
        return APR_ECONNABORTED;
    }
    m = task->mplx;
    
    H2_MPLX_ENTER(m);

    stream = h2_ihash_get(m->streams, task->stream_id);
    if (stream) {
        status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
    }
    else {
        status = APR_ECONNABORTED;
    }

    H2_MPLX_LEAVE(m);
    return status;
}

apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, 
                                     apr_read_type_e block, 
                                     int capacity, 
                                     request_rec **pr)
{   
    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
    apr_status_t status;
    int want_shutdown;
    
    H2_MPLX_ENTER(m);

    want_shutdown = (block == APR_BLOCK_READ);

    /* Take this opportunity to update output consummation 
     * for this engine */
    ngn_out_update_windows(m, ngn);
    
    if (want_shutdown && !h2_iq_empty(m->q)) {
        /* For a blocking read, check first if requests are to be
         * had and, if not, wait a short while before doing the
         * blocking, and if unsuccessful, terminating read.
         */
        status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
        if (APR_STATUS_IS_EAGAIN(status)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                          "h2_mplx(%ld): start block engine pull", m->id);
            apr_thread_cond_timedwait(m->task_thawed, m->lock, 
                                      apr_time_from_msec(20));
            status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
        }
    }
    else {
        status = h2_ngn_shed_pull_request(shed, ngn, capacity,
                                          want_shutdown, pr);
    }

    H2_MPLX_LEAVE(m);
    return status;
}
 
void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
                             apr_status_t status)
{
    h2_task *task = h2_ctx_cget_task(r_conn);
    
    if (task) {
        h2_mplx *m = task->mplx;
        h2_stream *stream;

        H2_MPLX_ENTER_ALWAYS(m);

        stream = h2_ihash_get(m->streams, task->stream_id);
        
        ngn_out_update_windows(m, ngn);
        h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
        
        if (status != APR_SUCCESS && stream 
            && h2_task_can_redo(task) 
            && !h2_ihash_get(m->sredo, stream->id)) {
            h2_ihash_add(m->sredo, stream);
        }

        if (task->engine) { 
            /* cannot report that as done until engine returns */
        }
        else {
            task_done(m, task, ngn);
        }

        H2_MPLX_LEAVE(m);
    }
}

/*******************************************************************************
 * mplx master events dispatching
 ******************************************************************************/

int h2_mplx_has_master_events(h2_mplx *m)
{
    return apr_atomic_read32(&m->event_pending) > 0;
}

apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
                                            stream_ev_callback *on_resume, 
                                            void *on_ctx)
{
    h2_stream *stream;
    int n, id;
    
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                  "h2_mplx(%ld): dispatch events", m->id);        
    apr_atomic_set32(&m->event_pending, 0);

    /* update input windows for streams */
    h2_ihash_iter(m->streams, report_consumption_iter, m);    
    purge_streams(m, 1);
    
    n = h2_ififo_count(m->readyq);
    while (n > 0 
           && (h2_ififo_try_pull(m->readyq, &id) == APR_SUCCESS)) {
        --n;
        stream = h2_ihash_get(m->streams, id);
        if (stream) {
            on_resume(on_ctx, stream);
        }
    }
    
    return APR_SUCCESS;
}

apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
{
    check_data_for(m, stream, 1);
    return APR_SUCCESS;
}

int h2_mplx_awaits_data(h2_mplx *m)
{
    int waiting = 1;
     
    H2_MPLX_ENTER_ALWAYS(m);

    if (h2_ihash_empty(m->streams)) {
        waiting = 0;
    }
    else if (!m->tasks_active && !h2_ififo_count(m->readyq)
             && h2_iq_empty(m->q)) {
        waiting = 0;
    }

    H2_MPLX_LEAVE(m);
    return waiting;
}