Commit 7819465c authored by Ryan Bloom's avatar Ryan Bloom
Browse files

Get the worker MPM working again. This should fix the serialization

problems, and it makes up initialize the queue only once.


git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@89930 13f79535-47bb-0310-9956-ffa450edef68
parent 143fc83e
Loading
Loading
Loading
Loading
+63 −59
Original line number Diff line number Diff line
@@ -57,112 +57,116 @@
 */

#include "fdqueue.h"
#include "apr_pools.h"

/* Assumption: queue itself is allocated by the user */
/* Assumption: increment and decrement are atomic on int */

int ap_queue_size(FDQueue *queue) {
    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
}

int ap_queue_full(FDQueue *queue) {
    return(queue->blanks <= 0);
}

int ap_block_on_queue(FDQueue *queue) {
#if 0
int ap_increase_blanks(FDQueue *queue) 
{
    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
#endif
    if (ap_queue_full(queue)) {
        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
    }
#if 0
    queue->blanks++;
    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
#endif
    return FD_QUEUE_SUCCESS;
}

static int increase_blanks(FDQueue *queue) {
    queue->blanks++;
    return FD_QUEUE_SUCCESS;
}

static apr_status_t ap_queue_destroy(void *data) {
static apr_status_t ap_queue_destroy(void *data) 
{
    FDQueue *queue = data;
    /* Ignore errors here, we can't do anything about them anyway */
    pthread_cond_destroy(&queue->not_empty);
    pthread_cond_destroy(&queue->not_full);
    pthread_cond_destroy(&(queue->not_empty));
    pthread_cond_destroy(&(queue->not_full));
    pthread_mutex_destroy(&queue->one_big_mutex);
    return FD_QUEUE_SUCCESS;
}

int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) {
int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) 
{
    int i;
    int bounds = queue_capacity + 1;
    pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
    pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
    queue->not_empty = not_empty;
    queue->not_full = not_full;
    pthread_mutex_init(&queue->one_big_mutex, NULL);
    pthread_cond_init(&queue->not_empty, NULL);
    pthread_cond_init(&queue->not_full, NULL);
    queue->head = queue->tail = 0;
    queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
    queue->bounds = bounds;
    queue->blanks = queue_capacity;
    queue->blanks = 0;
    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
    for (i=0; i < bounds; ++i)
        queue->data[i].sd = NULL;
    return FD_QUEUE_SUCCESS;
}

int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) {
int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) 
{
    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    queue->data[queue->tail].sd = sd;
    queue->data[queue->tail].p = p;
    queue->tail = (queue->tail + 1) % queue->bounds;
    queue->blanks--;
    pthread_cond_signal(&queue->not_empty);
#if 0
    if (queue->head == (queue->tail + 1) % queue->bounds) {
#endif
    if (ap_queue_full(queue)) {
        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    pthread_cond_signal(&(queue->not_empty));
    return FD_QUEUE_SUCCESS;
}

apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty) {
    increase_blanks(queue);
    /* We have just removed one from the queue.  By definition, it is
     * no longer full.  We can ALWAYS signal the listener thread at
     * this point.  However, the original code didn't do it this way,
     * so I am leaving the original code in, just commented out.  BTW,
     * originally, the increase_blanks wasn't in this function either.
     *
     if (queue->blanks > 0) {
     */
    pthread_cond_signal(&queue->not_full);

    /*    }    */
    if (queue->head == queue->tail) {
        if (block_if_empty) {
            pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
fprintf(stderr, "Found a non-empty queue  :-)\n");
apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) 
{
    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    if (queue->head == queue->tail) {
        pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex);
    } 
    
    *sd = queue->data[queue->head].sd;
    *p = queue->data[queue->head].p;
    queue->data[queue->head].sd = NULL;
    if (*sd != NULL) {
    queue->data[queue->head].p = NULL;
    if (sd != NULL) {
        queue->head = (queue->head + 1) % queue->bounds;
    }
    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    if (queue->blanks > 0) {
        pthread_cond_signal(&(queue->not_full));
    }
    return APR_SUCCESS;
}

int ap_queue_size(FDQueue *queue) 
{
    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
}

int ap_queue_full(FDQueue *queue) 
{
    return(queue->blanks <= 0);
}

int ap_block_on_queue(FDQueue *queue) 
{
    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    if (ap_queue_full(queue)) {
        pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex);
    }
    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    return FD_QUEUE_SUCCESS;
}

void ap_queue_signal_all_wakeup(FDQueue *queue)
{
fprintf(stderr, "trying to broadcast to all workers\n");
    pthread_cond_broadcast(&queue->not_empty);
    pthread_cond_broadcast(&(queue->not_empty));
}
+2 −1
Original line number Diff line number Diff line
@@ -87,10 +87,11 @@ typedef struct fd_queue {

int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty);
apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p);
int ap_queue_size(FDQueue *queue);
int ap_queue_full(FDQueue *queue);
int ap_block_on_queue(FDQueue *queue);
void ap_queue_signal_all_wakeup(FDQueue *queue);
int ap_increase_blanks(FDQueue *queue);

#endif /* FDQUEUE_H */
+10 −10
Original line number Diff line number Diff line
@@ -510,7 +510,6 @@ static void check_infinite_requests(void)
/* Sets workers_may_exit if we received a character on the pipe_of_death */
static void check_pipe_of_death(void)
{
fprintf(stderr, "looking at pipe of death\n");
    apr_lock_acquire(pipe_of_death_mutex);
    if (!workers_may_exit) {
        apr_status_t ret;
@@ -684,7 +683,8 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
    free(ti);

    while (!workers_may_exit) {
        ap_queue_pop(worker_queue, &csd, &ptrans, 1);
        ap_queue_pop(worker_queue, &csd, &ptrans);
        ap_increase_blanks(worker_queue);
        process_socket(ptrans, csd, process_slot, thread_slot);
        requests_this_child--;
        apr_pool_clear(ptrans);
@@ -729,21 +729,21 @@ static void *start_threads(apr_thread_t *thd, void * dummy)
    apr_thread_t **threads = ts->threads;
    apr_threadattr_t *thread_attr = ts->threadattr;
    int child_num_arg = ts->child_num_arg;
    int i;
    int my_child_num = child_num_arg;
    proc_info *my_info = NULL;
    apr_status_t rv;
    int i = 0;
    int threads_created = 0;
    apr_thread_t *listener;

    while (1) {
    my_info = (proc_info *)malloc(sizeof(proc_info));
    my_info->pid = my_child_num;
    my_info->tid = i;
    my_info->sd = 0;
    apr_pool_create(&my_info->tpool, pchild);
    apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
        for (i=0; i < ap_threads_per_child; i++) {
    while (1) {
        for (i=1; i < ap_threads_per_child; i++) {
            int status = ap_scoreboard_image->servers[child_num_arg][i].status;

            if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {