Commit db852cb6 authored by Ryan Bloom's avatar Ryan Bloom
Browse files

Make the worker MPM shutdown and restart cleanly. This also

cleans up some race conditions, and gets the worker using
pools more cleanly.

Submitted by:	[Aaron Bannert <aaron@clove.org>]


git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@90635 13f79535-47bb-0310-9956-ffa450edef68
parent 348c220c
Loading
Loading
Loading
Loading
+7 −2
Original line number Diff line number Diff line
Changes with Apache 2.0.25-dev

  *) Make the worker MPM shutdown and restart cleanly.  This also
     cleans up some race conditions, and gets the worker using
     pools more cleanly.  [Aaron Bannert <aaron@clove.org>]

  *) Implement CRYPTO_set_locking_callback() in terms of apr_lock
     for mod_ssl
     [Madhusudan Mathihalli <madhusudan_mathihalli@hp.com>]
+101 −42
Original line number Diff line number Diff line
@@ -60,7 +60,11 @@

/* Assumption: increment and decrement are atomic on int */

int ap_increase_blanks(FDQueue *queue) 
/**
 * Threadsafe way to increment the number of empty slots ("blanks")
 * in the resource queue.
 */
int ap_increase_blanks(fd_queue_t *queue) 
{
    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
@@ -69,61 +73,129 @@ int ap_increase_blanks(FDQueue *queue)
    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }

    return FD_QUEUE_SUCCESS;
}

/**
 * Detects when the fd_queue_t is full. This utility function is expected
 * to be called from within critical sections, and is not threadsafe.
 */
static int ap_queue_full(fd_queue_t *queue)
{
    return (queue->blanks <= 0);
}

/**
 * Detects when the fd_queue_t is empty. This utility function is expected
 * to be called from within critical sections, and is not threadsafe.
 */
static int ap_queue_empty(fd_queue_t *queue)
{
    /*return (queue->head == queue->tail);*/
    return (queue->blanks >= queue->bounds - 1);
}

/**
 * Callback routine that is called to destroy this
 * fd_queue_t when it's pool is destroyed.
 */
static apr_status_t ap_queue_destroy(void *data) 
{
    FDQueue *queue = data;
    /* Ignore errors here, we can't do anything about them anyway */
    pthread_cond_destroy(&(queue->not_empty));
    pthread_cond_destroy(&(queue->not_full));
    fd_queue_t *queue = data;

    /* Ignore errors here, we can't do anything about them anyway.
     * XXX: We should at least try to signal an error here, it is
     * indicative of a programmer error. -aaron */
    pthread_cond_destroy(&queue->not_empty);
    pthread_cond_destroy(&queue->not_full);
    pthread_mutex_destroy(&queue->one_big_mutex);

    return FD_QUEUE_SUCCESS;
}

int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) 
/**
 * Initialize the fd_queue_t.
 */
int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) 
{
    int i;
    int bounds = queue_capacity + 1;
    pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
    pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
    queue->not_empty = not_empty;
    queue->not_full = not_full;
    pthread_mutex_init(&queue->one_big_mutex, NULL);
    int bounds;

    if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0)
        return FD_QUEUE_FAILURE;
    if (pthread_cond_init(&queue->not_empty, NULL) != 0)
        return FD_QUEUE_FAILURE;
    if (pthread_cond_init(&queue->not_full, NULL) != 0)
        return FD_QUEUE_FAILURE;

    bounds = queue_capacity + 1;
    queue->head = queue->tail = 0;
    queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
    queue->data = apr_palloc(a, bounds * sizeof(fd_queue_elem_t));
    queue->bounds = bounds;
    queue->blanks = 0;
    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
    queue->blanks = queue_capacity;

    /* Set all the sockets in the queue to NULL */
    for (i = 0; i < bounds; ++i)
        queue->data[i].sd = NULL;

    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);

    return FD_QUEUE_SUCCESS;
}

int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) 
/**
 * Push a new socket onto the queue. Blocks if the queue is full. Once
 * the push operation has completed, it signals other threads waiting
 * in apr_queue_pop() that they may continue consuming sockets.
 */
int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) 
{
    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }

    /* Keep waiting until we wake up and find that the queue is not full. */
    while (ap_queue_full(queue)) {
        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
    }

    queue->data[queue->tail].sd = sd;
    queue->data[queue->tail].p = p;
    queue->tail = (queue->tail + 1) % queue->bounds;
    queue->blanks--;

    pthread_cond_signal(&queue->not_empty);

    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    pthread_cond_signal(&(queue->not_empty));

    return FD_QUEUE_SUCCESS;
}

apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) 
/**
 * Retrieves the next available socket from the queue. If there are no
 * sockets available, it will block until one becomes available.
 * Once retrieved, the socket is placed into the address specified by
 * 'sd'.
 */
apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) 
{
    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    if (queue->head == queue->tail) {
        pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex);

    /* Keep waiting until we wake up and find that the queue is not empty. */
    if (ap_queue_empty(queue)) {
        pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
        /* If we wake up and it's still empty, then we were interrupted */
        if (ap_queue_empty(queue)) {
            if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
                return FD_QUEUE_FAILURE;
            }
            return FD_QUEUE_EINTR;
        }
    } 
    
    *sd = queue->data[queue->head].sd;
@@ -133,40 +205,27 @@ apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p)
    if (sd != NULL) {
        queue->head = (queue->head + 1) % queue->bounds;
    }
    queue->blanks++;

    /* we just consumed a slot, so we're no longer full */
    pthread_cond_signal(&queue->not_full);

    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    if (queue->blanks > 0) {
        pthread_cond_signal(&(queue->not_full));
    }
    return APR_SUCCESS;
}

int ap_queue_size(FDQueue *queue) 
{
    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
}

int ap_queue_full(FDQueue *queue) 
{
    return(queue->blanks <= 0);
    return APR_SUCCESS;
}

int ap_block_on_queue(FDQueue *queue) 
apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
{
    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    if (ap_queue_full(queue)) {
        pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex);
    }
    pthread_cond_broadcast(&queue->not_empty);
    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    return FD_QUEUE_SUCCESS;
}
void ap_queue_signal_all_wakeup(FDQueue *queue)
{
    pthread_cond_broadcast(&(queue->not_empty));
}
+23 −22
Original line number Diff line number Diff line
@@ -66,34 +66,35 @@
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <apr_errno.h>

#define FD_QUEUE_SUCCESS 0
#define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
                               of queue_pop semantics */
#define FD_QUEUE_EINTR APR_EINTR

typedef struct fd_queue_elem {
struct fd_queue_elem_t {
    apr_socket_t      *sd;
    apr_pool_t        *p;
} FDQueueElement;
};
typedef struct fd_queue_elem_t fd_queue_elem_t;

typedef struct fd_queue {
struct fd_queue_t {
    int                head;
    int                tail;
    FDQueueElement *data;
    fd_queue_elem_t   *data;
    int                bounds;
    int                blanks;
    pthread_mutex_t    one_big_mutex;
    pthread_cond_t     not_empty;
    pthread_cond_t     not_full;
} FDQueue;
    int                cancel_state;
};
typedef struct fd_queue_t fd_queue_t;

int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p);
int ap_queue_size(FDQueue *queue);
int ap_queue_full(FDQueue *queue);
int ap_block_on_queue(FDQueue *queue);
void ap_queue_signal_all_wakeup(FDQueue *queue);
int ap_increase_blanks(FDQueue *queue);
int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);

#endif /* FDQUEUE_H */
+34 −22
Original line number Diff line number Diff line
@@ -124,14 +124,13 @@ static int workers_may_exit = 0;
static int requests_this_child;
static int num_listensocks = 0;
static apr_socket_t **listensocks;
static FDQueue *worker_queue;
static fd_queue_t *worker_queue;

/* The structure used to pass unique initialization info to each thread */
typedef struct {
    int pid;
    int tid;
    int sd;
    apr_pool_t *tpool; /* "pthread" would be confusing */
} proc_info;

/* Structure used to pass information to the thread responsible for 
@@ -201,7 +200,9 @@ static const char *lock_fname;
static void signal_workers(void)
{
    workers_may_exit = 1;
    ap_queue_signal_all_wakeup(worker_queue);
    /* XXX: This will happen naturally on a graceful, and we don't care otherwise.
    ap_queue_signal_all_wakeup(worker_queue); */
    ap_queue_interrupt_all(worker_queue);
}

AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
@@ -553,7 +554,7 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
    proc_info * ti = dummy;
    int process_slot = ti->pid;
    int thread_slot = ti->tid;
    apr_pool_t *tpool = ti->tpool;
    apr_pool_t *tpool = apr_thread_pool_get(thd);
    apr_socket_t *csd = NULL;
    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
    apr_socket_t *sd = NULL;
@@ -564,8 +565,6 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)

    free(ti);

    apr_pool_create(&ptrans, tpool);

    apr_lock_acquire(worker_thread_count_mutex);
    worker_thread_count++;
    apr_lock_release(worker_thread_count_mutex);
@@ -574,12 +573,10 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
    for(n=0 ; n <= num_listensocks ; ++n)
	apr_poll_socket_add(pollset, listensocks[n], APR_POLLIN);

    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
    ap_queue_init(worker_queue, ap_threads_per_child, pchild);

    /* TODO: Switch to a system where threads reuse the results from earlier
       poll calls - manoj */
    while (1) {
        /* TODO: requests_this_child should be synchronized - aaron */
        if (requests_this_child <= 0) {
            check_infinite_requests();
        }
@@ -644,6 +641,9 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
        }
    got_fd:
        if (!workers_may_exit) {
            /* create a new transaction pool for each accepted socket */
            apr_pool_create(&ptrans, tpool);

            if ((rv = apr_accept(&csd, sd, ptrans)) != APR_SUCCESS) {
                csd = NULL;
                ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, 
@@ -658,7 +658,6 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
            }
            if (csd != NULL) {
                ap_queue_push(worker_queue, csd, ptrans);
                ap_block_on_queue(worker_queue);
            }
        }
        else {
@@ -673,13 +672,15 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
        }
    }

    apr_pool_destroy(tpool);
    ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
        (request_rec *) NULL);
    dying = 1;
    ap_scoreboard_image->parent[process_slot].quiescing = 1;
    kill(ap_my_pid, SIGTERM);

/* this is uncommented when we make a pool-pool
    apr_thread_exit(thd, APR_SUCCESS);
*/
    return NULL;
}

@@ -688,30 +689,35 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
    proc_info * ti = dummy;
    int process_slot = ti->pid;
    int thread_slot = ti->tid;
    apr_pool_t *tpool = ti->tpool;
    apr_socket_t *csd = NULL;
    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
    apr_status_t rv;

    free(ti);

    /* apr_pool_create(&ptrans, tpool); */

    while (!workers_may_exit) {
        ap_queue_pop(worker_queue, &csd, &ptrans);
        if (!csd) {
        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
        /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
         * from an explicit call to ap_queue_interrupt_all(). This allows
         * us to unblock threads stuck in ap_queue_pop() when a shutdown
         * is pending. */
        if (rv == FD_QUEUE_EINTR || !csd) {
            continue;
        }
        ap_increase_blanks(worker_queue);
        process_socket(ptrans, csd, process_slot, thread_slot);
        requests_this_child--;
        apr_pool_clear(ptrans);
        requests_this_child--; /* FIXME: should be synchronized - aaron */
        apr_pool_destroy(ptrans);
    }

    apr_pool_destroy(tpool);
    ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
        (request_rec *) NULL);
    apr_lock_acquire(worker_thread_count_mutex);
    worker_thread_count--;
    apr_lock_release(worker_thread_count_mutex);

    apr_thread_exit(thd, APR_SUCCESS);
    return NULL;
}

@@ -738,13 +744,19 @@ static void *start_threads(apr_thread_t *thd, void * dummy)
    int threads_created = 0;
    apr_thread_t *listener;

    /* We must create the fd queue before we start up the listener
     * and worker threads. */
    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
    ap_queue_init(worker_queue, ap_threads_per_child, pchild);

    my_info = (proc_info *)malloc(sizeof(proc_info));
    my_info->pid = my_child_num;
    my_info->tid = i;
    my_info->sd = 0;
    apr_pool_create(&my_info->tpool, pchild);
    apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
    while (1) {
        /* Does ap_threads_per_child include the listener thread?
         * Why does this forloop start at 1? -aaron */
        for (i = 1; i < ap_threads_per_child; i++) {
            int status = ap_scoreboard_image->servers[child_num_arg][i].status;

@@ -761,7 +773,6 @@ static void *start_threads(apr_thread_t *thd, void * dummy)
	    my_info->pid = my_child_num;
            my_info->tid = i;
	    my_info->sd = 0;
	    apr_pool_create(&my_info->tpool, pchild);
	
  	    /* We are creating threads right now */
	    (void) ap_update_child_status(my_child_num, i, SERVER_STARTING, 
@@ -794,6 +805,7 @@ static void *start_threads(apr_thread_t *thd, void * dummy)
     *  "life_status" is almost right, but it's in the worker's structure, and 
     *  the name could be clearer.   gla
     */
    apr_thread_exit(thd, APR_SUCCESS);
    return NULL;
}

@@ -870,7 +882,7 @@ static void child_main(int child_num_arg)
    apr_lock_create(&pipe_of_death_mutex, APR_MUTEX, APR_INTRAPROCESS, 
                    NULL, pchild);

    ts = apr_palloc(pchild, sizeof(*ts));
    ts = (thread_starter *)apr_palloc(pchild, sizeof(*ts));

    apr_threadattr_create(&thread_attr, pchild);
    apr_threadattr_detach_set(thread_attr, 0);    /* 0 means PTHREAD_CREATE_JOINABLE */