Commit e7c64e94 authored by Yann Ylavic's avatar Yann Ylavic
Browse files

Revert r1823629 to stage backport commits too.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1823636 13f79535-47bb-0310-9956-ffa450edef68
parent 9dbfbe89
Loading
Loading
Loading
Loading
+0 −6
Original line number Diff line number Diff line
                                                         -*- coding: utf-8 -*-
Changes with Apache 2.4.30
 
  *) mpm_event: Let the listener thread do its maintenance job on resources
     shortage.  PR 61979.  [Yann Ylavic]

  *) mpm_event: Wakeup the listener to re-enable listening sockets.
     [Yann Ylavic]

  *) mod_ssl: The SSLCompression directive will now give an error if used
     with an OpenSSL build which does not support any compression methods.
     [Joe Orton]
+119 −159
Original line number Diff line number Diff line
@@ -173,10 +173,10 @@ static int max_workers = 0; /* MaxRequestWorkers */
static int server_limit = 0;                /* ServerLimit */
static int thread_limit = 0;                /* ThreadLimit */
static int had_healthy_child = 0;
static volatile int dying = 0;
static volatile int workers_may_exit = 0;
static volatile int start_thread_may_exit = 0;
static volatile int listener_may_exit = 0;
static int dying = 0;
static int workers_may_exit = 0;
static int start_thread_may_exit = 0;
static int listener_may_exit = 0;
static int listener_is_wakeable = 0;        /* Pollset supports APR_POLLSET_WAKEABLE */
static int num_listensocks = 0;
static apr_int32_t conns_this_child;        /* MaxConnectionsPerChild, only access
@@ -287,7 +287,7 @@ static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
    apr_time_t next_expiry;

    APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
    ++*q->total;
    apr_atomic_inc32(q->total);
    ++q->count;

    /* Cheaply update the overall queues' next expiry according to the
@@ -309,7 +309,7 @@ static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el)
{
    APR_RING_REMOVE(el, timeout_list);
    APR_RING_ELEM_INIT(el, timeout_list);
    --*q->total;
    apr_atomic_dec32(q->total);
    --q->count;
}

@@ -440,8 +440,6 @@ static pid_t ap_my_pid; /* Linux getpid() doesn't work except in main
static pid_t parent_pid;
static apr_os_thread_t *listener_os_thread;

static int ap_child_slot;       /* Current child process slot in scoreboard */

/* The LISTENER_SIGNAL signal will be sent from the main thread to the
 * listener thread to wake it up for graceful termination (what a child
 * process from an old generation does when the admin does "apachectl
@@ -455,27 +453,19 @@ static int ap_child_slot; /* Current child process slot in scoreboard */
 */
static apr_socket_t **worker_sockets;

static volatile apr_uint32_t listensocks_disabled;

static void disable_listensocks(void)
static void disable_listensocks(int process_slot)
{
    int i;
    if (apr_atomic_cas32(&listensocks_disabled, 1, 0) != 0) {
        return;
    }
    if (event_pollset) {
    for (i = 0; i < num_listensocks; i++) {
        apr_pollset_remove(event_pollset, &listener_pollfd[i]);
    }
    }
    ap_scoreboard_image->parent[ap_child_slot].not_accepting = 1;
    ap_scoreboard_image->parent[process_slot].not_accepting = 1;
}

static void enable_listensocks(void)
static void enable_listensocks(int process_slot)
{
    int i;
    if (listener_may_exit
            || apr_atomic_cas32(&listensocks_disabled, 0, 1) != 1) {
    if (listener_may_exit) {
        return;
    }
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00457)
@@ -493,29 +483,7 @@ static void enable_listensocks(void)
     * XXX: This is not yet optimal. If many workers suddenly become available,
     * XXX: the parent may kill some processes off too soon.
     */
    ap_scoreboard_image->parent[ap_child_slot].not_accepting = 0;
}

static APR_INLINE apr_uint32_t listeners_disabled(void)
{
    return apr_atomic_read32(&listensocks_disabled);
}

static APR_INLINE int connections_above_limit(void)
{
    apr_uint32_t i_count = ap_queue_info_get_idlers(worker_queue_info);
    if (i_count > 0) {
        apr_uint32_t c_count = apr_atomic_read32(&connection_count);
        apr_uint32_t l_count = apr_atomic_read32(&lingering_count);
        if (c_count <= l_count
                /* Off by 'listeners_disabled()' to avoid flip flop */
                || c_count - l_count < (apr_uint32_t)threads_per_child +
                                       (i_count - listeners_disabled()) *
                                       (worker_factor / WORKER_FACTOR_SCALE)) {
            return 0;
        }
    }
    return 1;
    ap_scoreboard_image->parent[process_slot].not_accepting = 0;
}

static void abort_socket_nonblocking(apr_socket_t *csd)
@@ -573,18 +541,6 @@ static void close_worker_sockets(void)
static void wakeup_listener(void)
{
    listener_may_exit = 1;
    disable_listensocks();

    /* Unblock the listener if it's poll()ing */
    if (event_pollset && listener_is_wakeable) {
        apr_pollset_wakeup(event_pollset);
    }

    /* unblock the listener if it's waiting for a worker */
    if (worker_queue_info) {
        ap_queue_info_term(worker_queue_info);
    }

    if (!listener_os_thread) {
        /* XXX there is an obscure path that this doesn't handle perfectly:
         *     right after listener thread is created but before
@@ -593,6 +549,15 @@ static void wakeup_listener(void)
         */
        return;
    }

    /* Unblock the listener if it's poll()ing */
    if (listener_is_wakeable) {
        apr_pollset_wakeup(event_pollset);
    }

    /* unblock the listener if it's waiting for a worker */
    ap_queue_info_term(worker_queue_info);

    /*
     * we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all
     * platforms and wake up the listener thread since it is the only thread
@@ -613,7 +578,7 @@ static int terminate_mode = ST_INIT;

static void signal_threads(int mode)
{
    if (terminate_mode >= mode) {
    if (terminate_mode == mode) {
        return;
    }
    terminate_mode = mode;
@@ -747,7 +712,6 @@ static int child_fatal;

static apr_status_t decrement_connection_count(void *cs_)
{
    int is_last_connection;
    event_conn_state_t *cs = cs_;
    switch (cs->pub.state) {
        case CONN_STATE_LINGER_NORMAL:
@@ -760,14 +724,9 @@ static apr_status_t decrement_connection_count(void *cs_)
        default:
            break;
    }
    /* Unblock the listener if it's waiting for connection_count = 0,
     * or if the listening sockets were disabled due to limits and can
     * now accept new connections.
     */
    is_last_connection = !apr_atomic_dec32(&connection_count);
    if (listener_is_wakeable
            && ((is_last_connection && listener_may_exit)
                || (listeners_disabled() && !connections_above_limit()))) {
    /* Unblock the listener if it's waiting for connection_count = 0 */
    if (!apr_atomic_dec32(&connection_count)
             && listener_is_wakeable && listener_may_exit) {
        apr_pollset_wakeup(event_pollset);
    }
    return APR_SUCCESS;
@@ -1219,16 +1178,17 @@ static void check_infinite_requests(void)
    }
}

static void close_listeners(int *closed)
static void close_listeners(int process_slot, int *closed)
{
    if (!*closed) {
        int i;
        disable_listensocks(process_slot);
        ap_close_listeners_ex(my_bucket->listeners);
        *closed = 1;
        dying = 1;
        ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
        ap_scoreboard_image->parent[process_slot].quiescing = 1;
        for (i = 0; i < threads_per_child; ++i) {
            ap_update_child_status_from_indexes(ap_child_slot, i,
            ap_update_child_status_from_indexes(process_slot, i,
                                                SERVER_GRACEFUL, NULL);
        }
        /* wake up the main thread */
@@ -1501,7 +1461,7 @@ static void process_timeout_queue(struct timeout_queue *q,
    struct timeout_queue *qp;
    apr_status_t rv;

    if (!*q->total) {
    if (!apr_atomic_read32(q->total)) {
        return;
    }

@@ -1551,8 +1511,8 @@ static void process_timeout_queue(struct timeout_queue *q,
        APR_RING_UNSPLICE(first, last, timeout_list);
        APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t,
                             timeout_list);
        AP_DEBUG_ASSERT(*q->total >= count && qp->count >= count);
        *q->total -= count;
        AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count);
        apr_atomic_sub32(q->total, count);
        qp->count -= count;
        total += count;
    }
@@ -1578,7 +1538,8 @@ static void process_keepalive_queue(apr_time_t timeout_time)
    if (!timeout_time) {
        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                     "All workers are busy or dying, will close %u "
                     "keep-alive connections", *keepalive_q->total);
                     "keep-alive connections",
                     apr_atomic_read32(keepalive_q->total));
    }
    process_timeout_queue(keepalive_q, timeout_time,
                          start_lingering_close_nonblocking);
@@ -1586,14 +1547,22 @@ static void process_keepalive_queue(apr_time_t timeout_time)

static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
{
    timer_event_t *te;
    apr_status_t rc;
    proc_info *ti = dummy;
    int process_slot = ti->pslot;
    struct process_score *ps = ap_get_scoreboard_process(process_slot);
    apr_pool_t *tpool = apr_thread_pool_get(thd);
    int closed = 0;
    void *csd = NULL;
    apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
    ap_listen_rec *lr;
    int have_idle_worker = 0;
    apr_time_t last_log;
    const apr_pollfd_t *out_pfd;
    apr_int32_t num = 0;
    apr_interval_time_t timeout_interval;
    apr_time_t timeout_time = 0, now, last_log;
    listener_poll_type *pt;
    int closed = 0, listeners_disabled = 0;

    last_log = apr_time_now();
    free(ti);
@@ -1602,9 +1571,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
    if (rc != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                     "failed to initialize pollset, "
                     "shutdown process now");
        resource_shortage = 1;
        signal_threads(ST_UNGRACEFUL);
                     "attempting to shutdown process gracefully");
        signal_threads(ST_GRACEFUL);
        return NULL;
    }

@@ -1615,23 +1583,18 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
    apr_signal(LISTENER_SIGNAL, dummy_signal_handler);

    for (;;) {
        timer_event_t *te;
        const apr_pollfd_t *out_pfd;
        apr_int32_t num = 0;
        apr_interval_time_t timeout_interval;
        apr_time_t now, timeout_time;
        int workers_were_busy = 0;

        if (conns_this_child <= 0)
            check_infinite_requests();

        if (listener_may_exit) {
            close_listeners(&closed);
            close_listeners(process_slot, &closed);
            if (terminate_mode == ST_UNGRACEFUL
                || apr_atomic_read32(&connection_count) == 0)
                break;
        }

        if (conns_this_child <= 0)
            check_infinite_requests();

        now = apr_time_now();
        if (APLOGtrace6(ap_server_conf)) {
            /* trace log status every second */
@@ -1643,8 +1606,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                             "keep-alive: %d lingering: %d suspended: %u)",
                             apr_atomic_read32(&connection_count),
                             apr_atomic_read32(&clogged_count),
                             *(volatile apr_uint32_t*)write_completion_q->total,
                             *(volatile apr_uint32_t*)keepalive_q->total,
                             apr_atomic_read32(write_completion_q->total),
                             apr_atomic_read32(keepalive_q->total),
                             apr_atomic_read32(&lingering_count),
                             apr_atomic_read32(&suspended_count));
                if (dying) {
@@ -1706,13 +1669,11 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
        rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
        if (rc != APR_SUCCESS) {
            if (APR_STATUS_IS_EINTR(rc)) {
                /* Woken up, if we are exiting or listeners are disabled we
                 * must fall through to kill kept-alive connections or test
                 * whether listeners should be re-enabled. Otherwise we only
                 * need to update timeouts (logic is above, so simply restart
                 * the loop).
                /* Woken up, if we are exiting we must fall through to kill
                 * kept-alive connections, otherwise we only need to update
                 * timeouts (logic is above, so restart the loop).
                 */
                if (!listener_may_exit && !listeners_disabled()) {
                if (!listener_may_exit) {
                    continue;
                }
                timeout_time = 0;
@@ -1727,14 +1688,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
        }

        if (listener_may_exit) {
            close_listeners(&closed);
            close_listeners(process_slot, &closed);
            if (terminate_mode == ST_UNGRACEFUL
                || apr_atomic_read32(&connection_count) == 0)
                break;
        }

        for (; num; --num, ++out_pfd) {
            listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
        while (num) {
            pt = (listener_poll_type *) out_pfd->client_data;
            if (pt->type == PT_CSD) {
                /* one of the sockets is readable */
                event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
@@ -1794,16 +1755,24 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                    ap_assert(0);
                }
            }
            else if (pt->type == PT_ACCEPT && !listeners_disabled()) {
            else if (pt->type == PT_ACCEPT) {
                /* A Listener Socket is ready for an accept() */
                if (workers_were_busy) {
                    disable_listensocks();
                    if (!listeners_disabled)
                        disable_listensocks(process_slot);
                    listeners_disabled = 1;
                    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                                 "All workers busy, not accepting new conns "
                                 "in this process");
                }
                else if (connections_above_limit()) {
                    disable_listensocks();
                else if (  (int)apr_atomic_read32(&connection_count)
                           - (int)apr_atomic_read32(&lingering_count)
                         > threads_per_child
                           + ap_queue_info_get_idlers(worker_queue_info) *
                             worker_factor / WORKER_FACTOR_SCALE)
                {
                    if (!listeners_disabled)
                        disable_listensocks(process_slot);
                    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                                 "Too many open connections (%u), "
                                 "not accepting new conns in this process",
@@ -1811,41 +1780,34 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                                 "Idle workers: %u",
                                 ap_queue_info_get_idlers(worker_queue_info));
                    workers_were_busy = 1;
                    listeners_disabled = 1;
                }
                else if (!listener_may_exit) {
                    void *csd = NULL;
                    ap_listen_rec *lr = (ap_listen_rec *) pt->baton;
                    apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
                else if (listeners_disabled) {
                    listeners_disabled = 0;
                    enable_listensocks(process_slot);
                }
                if (!listeners_disabled) {
                    lr = (ap_listen_rec *) pt->baton;
                    ap_pop_pool(&ptrans, worker_queue_info);

                    if (ptrans == NULL) {
                        /* create a new transaction pool for each accepted socket */
                        apr_allocator_t *allocator = NULL;
                        apr_allocator_t *allocator;

                        rc = apr_allocator_create(&allocator);
                        if (rc == APR_SUCCESS) {
                        apr_allocator_create(&allocator);
                        apr_allocator_max_free_set(allocator,
                                                   ap_max_mem_free);
                            rc = apr_pool_create_ex(&ptrans, pconf, NULL,
                                                    allocator);
                            if (rc == APR_SUCCESS) {
                                apr_pool_tag(ptrans, "transaction");
                        apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
                        apr_allocator_owner_set(allocator, ptrans);
                            }
                        }
                        if (rc != APR_SUCCESS) {
                        if (ptrans == NULL) {
                            ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
                                         ap_server_conf, APLOGNO(03097)
                                         "Failed to create transaction pool");
                            if (allocator) {
                                apr_allocator_destroy(allocator);
                            }
                            resource_shortage = 1;
                            signal_threads(ST_GRACEFUL);
                            continue;
                            return NULL;
                        }
                    }
                    apr_pool_tag(ptrans, "transaction");

                    get_worker(&have_idle_worker, 1, &workers_were_busy);
                    rc = lr->accept_func(&csd, lr, ptrans);
@@ -1872,7 +1834,9 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                    }
                }
            }               /* if:else on pt->type */
        } /* for processing poll */
            out_pfd++;
            num--;
        }                   /* while for processing poll */

        /* XXX possible optimization: stash the current time for use as
         * r->request_time for new requests
@@ -1912,14 +1876,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)

            apr_thread_mutex_unlock(timeout_mutex);

            ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
            ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
            ps->write_completion = apr_atomic_read32(write_completion_q->total);
            ps->connections = apr_atomic_read32(&connection_count);
            ps->suspended = apr_atomic_read32(&suspended_count);
            ps->lingering_close = apr_atomic_read32(&lingering_count);
        }
        else if ((workers_were_busy || dying)
                 && *(volatile apr_uint32_t*)keepalive_q->total) {
                 && apr_atomic_read32(keepalive_q->total)) {
            apr_thread_mutex_lock(timeout_mutex);
            process_keepalive_queue(0); /* kill'em all \m/ */
            apr_thread_mutex_unlock(timeout_mutex);
@@ -1944,14 +1908,22 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
            }
        }

        if (listeners_disabled()
                && !workers_were_busy
                && !connections_above_limit()) {
            enable_listensocks();
        if (listeners_disabled && !workers_were_busy
            && (int)apr_atomic_read32(&connection_count)
               - (int)apr_atomic_read32(&lingering_count)
               < ((int)ap_queue_info_get_idlers(worker_queue_info) - 1)
                 * worker_factor / WORKER_FACTOR_SCALE + threads_per_child)
        {
            listeners_disabled = 0;
            enable_listensocks(process_slot);
        }
        /*
         * XXX: do we need to set some timeout that re-enables the listensocks
         * XXX: in case no other event occurs?
         */
    }     /* listener main loop */

    close_listeners(&closed);
    close_listeners(process_slot, &closed);
    ap_queue_term(worker_queue);

    apr_thread_exit(thd, APR_SUCCESS);
@@ -1998,8 +1970,12 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
    proc_info *ti = dummy;
    int process_slot = ti->pslot;
    int thread_slot = ti->tslot;
    apr_socket_t *csd = NULL;
    event_conn_state_t *cs;
    apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
    apr_status_t rv;
    int is_idle = 0;
    timer_event_t *te = NULL;

    free(ti);

@@ -2010,11 +1986,6 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
                                        SERVER_STARTING, NULL);

    while (!workers_may_exit) {
        apr_socket_t *csd = NULL;
        event_conn_state_t *cs;
        timer_event_t *te = NULL;
        apr_pool_t *ptrans;         /* Pool for per-transaction stuff */

        if (!is_idle) {
            rv = ap_queue_info_set_idle(worker_queue_info, NULL);
            if (rv != APR_SUCCESS) {
@@ -2038,6 +2009,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
            break;
        }

        te = NULL;
        rv = ap_queue_pop_something(worker_queue, &csd, &cs, &ptrans, &te);

        if (rv != APR_SUCCESS) {
@@ -2172,14 +2144,9 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy)
    int loops;
    int prev_threads_created;
    int max_recycled_pools = -1;
    const int good_methods[] = { APR_POLLSET_KQUEUE,
                                 APR_POLLSET_PORT,
                                 APR_POLLSET_EPOLL };
    /* XXX: K-A or lingering close connection included in the async factor */
    const apr_uint32_t async_factor = worker_factor / WORKER_FACTOR_SCALE;
    const apr_uint32_t pollset_size = (apr_uint32_t)num_listensocks +
                                      (apr_uint32_t)threads_per_child *
                                      (async_factor > 2 ? async_factor : 2);
    int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
    /* XXX don't we need more to handle K-A or lingering close? */
    const apr_uint32_t pollset_size = threads_per_child * 2;

    /* We must create the fd queues before we start up the listener
     * and worker threads. */
@@ -2399,7 +2366,6 @@ static void child_main(int child_num_arg, int child_bucket)
    retained->mpm->mpm_state = AP_MPMQ_STARTING;

    ap_my_pid = getpid();
    ap_child_slot = child_num_arg;
    ap_fatal_signal_child_setup(ap_server_conf);
    apr_pool_create(&pchild, pconf);

@@ -3374,11 +3340,6 @@ static int event_pre_config(apr_pool_t * pconf, apr_pool_t * plog,
    had_healthy_child = 0;
    ap_extended_status = 0;

    event_pollset = NULL;
    worker_queue_info = NULL;
    listener_os_thread = NULL;
    listensocks_disabled = 0;

    return OK;
}

@@ -3792,9 +3753,8 @@ static const char *set_worker_factor(cmd_parms * cmd, void *dummy,
        return "AsyncRequestWorkerFactor argument must be a positive number";

    worker_factor = val * WORKER_FACTOR_SCALE;
    if (worker_factor < WORKER_FACTOR_SCALE) {
        worker_factor = WORKER_FACTOR_SCALE;
    }
    if (worker_factor == 0)
        worker_factor = 1;
    return NULL;
}

+46 −37

File changed.

Preview size limit exceeded, changes collapsed.

+15 −14
Original line number Diff line number Diff line
@@ -382,30 +382,31 @@ apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
    return rv;
}

static apr_status_t queue_interrupt_all(fd_queue_t *queue, int term)
apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
{
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    /* we must hold one_big_mutex when setting this... otherwise,
     * we could end up setting it and waking everybody up just after a
     * would-be popper checks it but right before they block
     */
    if (term) {
        queue->terminated = 1;
    }
    apr_thread_cond_broadcast(queue->not_empty);
    return apr_thread_mutex_unlock(queue->one_big_mutex);
}

apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
{
    return queue_interrupt_all(queue, 0);
}

apr_status_t ap_queue_term(fd_queue_t *queue)
{
    return queue_interrupt_all(queue, 1);
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    /* we must hold one_big_mutex when setting this... otherwise,
     * we could end up setting it and waking everybody up just after a
     * would-be popper checks it but right before they block
     */
    queue->terminated = 1;
    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    return ap_queue_interrupt_all(queue);
}