Commit 04a9b08a authored by Yann Ylavic's avatar Yann Ylavic
Browse files

Merge r1643279, r1703241, r1802535, r1819847, r1819848, r1819852, r1819853 from trunk:

mpm_event(opt): avoid casts/comparisons from unsigned to signed (atomics).


mpm_event/worker: make ap_queue_term() atomic (acquire/release the mutex once).


mpm_event: ap_queue_info_try_get_idler() may atomically decrement and then
re-increment the number idlers if it went under or to zero.  We can avoid
this by switching to a compare-and-swap scheme.


mpm_event: avoid unexpected compiler optimizations.

Make sure the compiler doesn't play games with our synchronization variables
by marking them volatile.


mpm_event: make sure wakeup_listener() does its minimal job.

Even if the listener thread is not created yet (i.e. about to be), we must
still tell it to leave, and terminate the worker queue in any case.


mpm_event: worker factor vs pollset.

Make sure the worker factor is at least one (w.r.t. WORKER_FACTOR_SCALE), and
use it to size the pollset appropriately (including K-A and lingering close
connections), in addition to the listening sockets.


mpm_event: remove atomics for timeout_queue's total counter.

It's always updated under the timeout_mutex lock, or read for logging and
scoreboard updates (not critical).

For the read cases a volatile access is enough, while removing the atomic ops
for the already protected write cases saves cycles and context switches.


Submitted by: ylavic
Reviewed by: ylavic, jim, icing


git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1823642 13f79535-47bb-0310-9956-ffa450edef68
parent bd6cc3d9
Loading
Loading
Loading
Loading
+55 −40
Original line number Diff line number Diff line
@@ -173,10 +173,10 @@ static int max_workers = 0; /* MaxRequestWorkers */
static int server_limit = 0;                /* ServerLimit */
static int thread_limit = 0;                /* ThreadLimit */
static int had_healthy_child = 0;
static int dying = 0;
static int workers_may_exit = 0;
static int start_thread_may_exit = 0;
static int listener_may_exit = 0;
static volatile int dying = 0;
static volatile int workers_may_exit = 0;
static volatile int start_thread_may_exit = 0;
static volatile int listener_may_exit = 0;
static int listener_is_wakeable = 0;        /* Pollset supports APR_POLLSET_WAKEABLE */
static int num_listensocks = 0;
static apr_int32_t conns_this_child;        /* MaxConnectionsPerChild, only access
@@ -287,7 +287,7 @@ static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
    apr_time_t next_expiry;

    APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
    apr_atomic_inc32(q->total);
    ++*q->total;
    ++q->count;

    /* Cheaply update the overall queues' next expiry according to the
@@ -309,7 +309,7 @@ static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el)
{
    APR_RING_REMOVE(el, timeout_list);
    APR_RING_ELEM_INIT(el, timeout_list);
    apr_atomic_dec32(q->total);
    --*q->total;
    --q->count;
}

@@ -541,23 +541,25 @@ static void close_worker_sockets(void)
static void wakeup_listener(void)
{
    listener_may_exit = 1;
    if (!listener_os_thread) {
        /* XXX there is an obscure path that this doesn't handle perfectly:
         *     right after listener thread is created but before
         *     listener_os_thread is set, the first worker thread hits an
         *     error and starts graceful termination
         */
        return;
    }

    /* Unblock the listener if it's poll()ing */
    if (listener_is_wakeable) {
    if (event_pollset && listener_is_wakeable) {
        apr_pollset_wakeup(event_pollset);
    }

    /* unblock the listener if it's waiting for a worker */
    if (worker_queue_info) {
        ap_queue_info_term(worker_queue_info);
    }

    if (!listener_os_thread) {
        /* XXX there is an obscure path that this doesn't handle perfectly:
         *     right after listener thread is created but before
         *     listener_os_thread is set, the first worker thread hits an
         *     error and starts graceful termination
         */
        return;
    }
    /*
     * we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all
     * platforms and wake up the listener thread since it is the only thread
@@ -578,7 +580,7 @@ static int terminate_mode = ST_INIT;

static void signal_threads(int mode)
{
    if (terminate_mode == mode) {
    if (terminate_mode >= mode) {
        return;
    }
    terminate_mode = mode;
@@ -1461,7 +1463,7 @@ static void process_timeout_queue(struct timeout_queue *q,
    struct timeout_queue *qp;
    apr_status_t rv;

    if (!apr_atomic_read32(q->total)) {
    if (!*q->total) {
        return;
    }

@@ -1511,8 +1513,8 @@ static void process_timeout_queue(struct timeout_queue *q,
        APR_RING_UNSPLICE(first, last, timeout_list);
        APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t,
                             timeout_list);
        AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count);
        apr_atomic_sub32(q->total, count);
        AP_DEBUG_ASSERT(*q->total >= count && qp->count >= count);
        *q->total -= count;
        qp->count -= count;
        total += count;
    }
@@ -1538,8 +1540,7 @@ static void process_keepalive_queue(apr_time_t timeout_time)
    if (!timeout_time) {
        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                     "All workers are busy or dying, will close %u "
                     "keep-alive connections",
                     apr_atomic_read32(keepalive_q->total));
                     "keep-alive connections", *keepalive_q->total);
    }
    process_timeout_queue(keepalive_q, timeout_time,
                          start_lingering_close_nonblocking);
@@ -1578,6 +1579,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
        timer_event_t *te;
        const apr_pollfd_t *out_pfd;
        apr_int32_t num = 0;
        apr_uint32_t c_count, l_count, i_count;
        apr_interval_time_t timeout_interval;
        apr_time_t now, timeout_time;
        int workers_were_busy = 0;
@@ -1603,8 +1605,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                             "keep-alive: %d lingering: %d suspended: %u)",
                             apr_atomic_read32(&connection_count),
                             apr_atomic_read32(&clogged_count),
                             apr_atomic_read32(write_completion_q->total),
                             apr_atomic_read32(keepalive_q->total),
                             *(volatile apr_uint32_t*)write_completion_q->total,
                             *(volatile apr_uint32_t*)keepalive_q->total,
                             apr_atomic_read32(&lingering_count),
                             apr_atomic_read32(&suspended_count));
                if (dying) {
@@ -1762,11 +1764,12 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                                 "All workers busy, not accepting new conns "
                                 "in this process");
                }
                else if (  (int)apr_atomic_read32(&connection_count)
                           - (int)apr_atomic_read32(&lingering_count)
                         > threads_per_child
                           + ap_queue_info_get_idlers(worker_queue_info) *
                             worker_factor / WORKER_FACTOR_SCALE)
                else if ((c_count = apr_atomic_read32(&connection_count))
                             > (l_count = apr_atomic_read32(&lingering_count))
                         && (c_count - l_count
                                > ap_queue_info_get_idlers(worker_queue_info)
                                  * worker_factor / WORKER_FACTOR_SCALE
                                  + threads_per_child))
                {
                    if (!listeners_disabled)
                        disable_listensocks(process_slot);
@@ -1875,14 +1878,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)

            apr_thread_mutex_unlock(timeout_mutex);

            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
            ps->write_completion = apr_atomic_read32(write_completion_q->total);
            ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
            ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
            ps->connections = apr_atomic_read32(&connection_count);
            ps->suspended = apr_atomic_read32(&suspended_count);
            ps->lingering_close = apr_atomic_read32(&lingering_count);
        }
        else if ((workers_were_busy || dying)
                 && apr_atomic_read32(keepalive_q->total)) {
                 && *(volatile apr_uint32_t*)keepalive_q->total) {
            apr_thread_mutex_lock(timeout_mutex);
            process_keepalive_queue(0); /* kill'em all \m/ */
            apr_thread_mutex_unlock(timeout_mutex);
@@ -1908,10 +1911,12 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
        }

        if (listeners_disabled && !workers_were_busy
            && (int)apr_atomic_read32(&connection_count)
               - (int)apr_atomic_read32(&lingering_count)
               < ((int)ap_queue_info_get_idlers(worker_queue_info) - 1)
                 * worker_factor / WORKER_FACTOR_SCALE + threads_per_child)
            && ((c_count = apr_atomic_read32(&connection_count))
                    >= (l_count = apr_atomic_read32(&lingering_count))
                && (i_count = ap_queue_info_get_idlers(worker_queue_info)) > 0
                && (c_count - l_count
                        < (i_count - 1) * worker_factor / WORKER_FACTOR_SCALE
                          + threads_per_child)))
        {
            listeners_disabled = 0;
            enable_listensocks(process_slot);
@@ -2143,9 +2148,14 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy)
    int loops;
    int prev_threads_created;
    int max_recycled_pools = -1;
    int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
    /* XXX don't we need more to handle K-A or lingering close? */
    const apr_uint32_t pollset_size = threads_per_child * 2;
    const int good_methods[] = { APR_POLLSET_KQUEUE,
                                 APR_POLLSET_PORT,
                                 APR_POLLSET_EPOLL };
    /* XXX: K-A or lingering close connection included in the async factor */
    const apr_uint32_t async_factor = worker_factor / WORKER_FACTOR_SCALE;
    const apr_uint32_t pollset_size = (apr_uint32_t)num_listensocks +
                                      (apr_uint32_t)threads_per_child *
                                      (async_factor > 2 ? async_factor : 2);

    /* We must create the fd queues before we start up the listener
     * and worker threads. */
@@ -3339,6 +3349,10 @@ static int event_pre_config(apr_pool_t * pconf, apr_pool_t * plog,
    had_healthy_child = 0;
    ap_extended_status = 0;

    event_pollset = NULL;
    worker_queue_info = NULL;
    listener_os_thread = NULL;

    return OK;
}

@@ -3752,8 +3766,9 @@ static const char *set_worker_factor(cmd_parms * cmd, void *dummy,
        return "AsyncRequestWorkerFactor argument must be a positive number";

    worker_factor = val * WORKER_FACTOR_SCALE;
    if (worker_factor == 0)
        worker_factor = 1;
    if (worker_factor < WORKER_FACTOR_SCALE) {
        worker_factor = WORKER_FACTOR_SCALE;
    }
    return NULL;
}

+37 −46
Original line number Diff line number Diff line
@@ -27,10 +27,10 @@ struct recycled_pool

struct fd_queue_info_t
{
    apr_uint32_t idlers;     /**
    apr_uint32_t volatile idlers; /**
                                   * >= zero_pt: number of idle worker threads
                              * < zero_pt:  number of threads blocked waiting
                              *             for an idle worker
                                   * <  zero_pt: number of threads blocked,
                                   *             waiting for an idle worker
                                   */
    apr_thread_mutex_t *idlers_mutex;
    apr_thread_cond_t *wait_for_idler;
@@ -38,7 +38,7 @@ struct fd_queue_info_t
    int max_idlers;
    int max_recycled_pools;
    apr_uint32_t recycled_pools_count;
    struct recycled_pool *recycled_pools;
    struct recycled_pool *volatile recycled_pools;
};

static apr_status_t queue_info_cleanup(void *data_)
@@ -97,15 +97,11 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
                                    apr_pool_t * pool_to_recycle)
{
    apr_status_t rv;
    apr_int32_t prev_idlers;

    ap_push_pool(queue_info, pool_to_recycle);

    /* Atomically increment the count of idle workers */
    prev_idlers = apr_atomic_inc32(&(queue_info->idlers)) - zero_pt;

    /* If other threads are waiting on a worker, wake one up */
    if (prev_idlers < 0) {
    if (apr_atomic_inc32(&queue_info->idlers) < zero_pt) {
        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            AP_DEBUG_ASSERT(0);
@@ -127,31 +123,32 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,

apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info)
{
    apr_int32_t new_idlers;
    new_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;
    if (--new_idlers <= 0) {
        apr_atomic_inc32(&(queue_info->idlers));    /* back out dec */
    /* Don't block if there isn't any idle worker. */
    for (;;) {
        apr_uint32_t idlers = queue_info->idlers;
        if (idlers <= zero_pt) {
            return APR_EAGAIN;
        }
        if (apr_atomic_cas32(&queue_info->idlers, idlers - 1,
                             idlers) == idlers) {
            return APR_SUCCESS;
        }
    }
}

apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
                                          int *had_to_block)
{
    apr_status_t rv;
    apr_int32_t prev_idlers;

    /* Atomically decrement the idle worker count, saving the old value */
    /* See TODO in ap_queue_info_set_idle() */
    prev_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;

    /* Block if there weren't any idle workers */
    if (prev_idlers <= 0) {
    /* Block if there isn't any idle worker.
     * apr_atomic_add32(x, -1) does the same as dec32(x), except
     * that it returns the previous value (unlike dec32's bool).
     */
    if (apr_atomic_add32(&queue_info->idlers, -1) <= zero_pt) {
        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            AP_DEBUG_ASSERT(0);
            /* See TODO in ap_queue_info_set_idle() */
            apr_atomic_inc32(&(queue_info->idlers));    /* back out dec */
            return rv;
        }
@@ -205,11 +202,11 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,

apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info)
{
    apr_int32_t val;
    val = (apr_int32_t)apr_atomic_read32(&queue_info->idlers) - zero_pt;
    if (val < 0)
    apr_uint32_t val;
    val = apr_atomic_read32(&queue_info->idlers);
    if (val <= zero_pt)
        return 0;
    return val;
    return val - zero_pt;
}

void ap_push_pool(fd_queue_info_t * queue_info,
@@ -490,13 +487,20 @@ apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd,
    return rv;
}

static apr_status_t queue_interrupt(fd_queue_t * queue, int all)
static apr_status_t queue_interrupt(fd_queue_t * queue, int all, int term)
{
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    /* we must hold one_big_mutex when setting this... otherwise,
     * we could end up setting it and waking everybody up just after a
     * would-be popper checks it but right before they block
     */
    if (term) {
        queue->terminated = 1;
    }
    if (all)
        apr_thread_cond_broadcast(queue->not_empty);
    else
@@ -506,28 +510,15 @@ static apr_status_t queue_interrupt(fd_queue_t * queue, int all)

apr_status_t ap_queue_interrupt_all(fd_queue_t * queue)
{
    return queue_interrupt(queue, 1);
    return queue_interrupt(queue, 1, 0);
}

apr_status_t ap_queue_interrupt_one(fd_queue_t * queue)
{
    return queue_interrupt(queue, 0);
    return queue_interrupt(queue, 0, 0);
}

apr_status_t ap_queue_term(fd_queue_t * queue)
{
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    /* we must hold one_big_mutex when setting this... otherwise,
     * we could end up setting it and waking everybody up just after a
     * would-be popper checks it but right before they block
     */
    queue->terminated = 1;
    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    return ap_queue_interrupt_all(queue);
    return queue_interrupt(queue, 1, 1);
}
+14 −15
Original line number Diff line number Diff line
@@ -382,18 +382,7 @@ apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
    return rv;
}

apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
{
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    apr_thread_cond_broadcast(queue->not_empty);
    return apr_thread_mutex_unlock(queue->one_big_mutex);
}

apr_status_t ap_queue_term(fd_queue_t *queue)
static apr_status_t queue_interrupt_all(fd_queue_t *queue, int term)
{
    apr_status_t rv;

@@ -404,9 +393,19 @@ apr_status_t ap_queue_term(fd_queue_t *queue)
     * we could end up setting it and waking everybody up just after a
     * would-be popper checks it but right before they block
     */
    if (term) {
        queue->terminated = 1;
    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    return ap_queue_interrupt_all(queue);
    apr_thread_cond_broadcast(queue->not_empty);
    return apr_thread_mutex_unlock(queue->one_big_mutex);
}

apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
{
    return queue_interrupt_all(queue, 0);
}

apr_status_t ap_queue_term(fd_queue_t *queue)
{
    return queue_interrupt_all(queue, 1);
}