h2_bucket_beam.c 39 KB
Newer Older
powelld's avatar
powelld committed
                APR_BUCKET_REMOVE(bsender);
                H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
                continue;
            }
            else if (APR_BUCKET_IS_FILE(bsender)) {
                /* This is set aside into the target brigade pool so that 
                 * any read operation messes with that pool and not 
                 * the sender one. */
                apr_bucket_file *f = (apr_bucket_file *)bsender->data;
                apr_file_t *fd = f->fd;
                int setaside = (f->readpool != bb->p);
                
                if (setaside) {
                    status = apr_file_setaside(&fd, fd, bb->p);
                    if (status != APR_SUCCESS) {
                        goto leave;
                    }
                    ++beam->files_beamed;
                }
                ng = apr_brigade_insert_file(bb, fd, bsender->start, bsender->length, 
                                             bb->p);
#if APR_HAS_MMAP
                /* disable mmap handling as this leads to segfaults when
                 * the underlying file is changed while memory pointer has
                 * been handed out. See also PR 59348 */
                apr_bucket_file_enable_mmap(ng, 0);
#endif
                APR_BUCKET_REMOVE(bsender);
                H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);

                remain -= bsender->length;
                ++transferred;
                ++transferred_buckets;
                continue;
            }
            else {
                /* create a "receiver" standin bucket. we took care about the
                 * underlying sender bucket and its data when we placed it into
                 * the sender brigade.
                 * the beam bucket will notify us on destruction that bsender is
                 * no longer needed. */
                brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
                                               beam->buckets_sent++);
            }
            
            /* Place the sender bucket into our hold, to be destroyed when no
             * receiver bucket references it any more. */
            APR_BUCKET_REMOVE(bsender);
            H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
            
            beam->received_bytes += bsender->length;
            ++transferred_buckets;
            
            if (brecv) {
                APR_BRIGADE_INSERT_TAIL(bb, brecv);
                remain -= brecv->length;
                ++transferred;
            }
            else {
                /* let outside hook determine how bucket is beamed */
                leave_yellow(beam, &bl);
                brecv = h2_beam_bucket(beam, bb, bsender);
                enter_yellow(beam, &bl);
                
                while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
                    ++transferred;
                    remain -= brecv->length;
                    brecv = APR_BUCKET_NEXT(brecv);
                }
            }
        }

        if (remain < 0) {
            /* too much, put some back into out recv_buffer */
            remain = readbytes;
            for (brecv = APR_BRIGADE_FIRST(bb);
                 brecv != APR_BRIGADE_SENTINEL(bb);
                 brecv = APR_BUCKET_NEXT(brecv)) {
                remain -= (beam->tx_mem_limits? bucket_mem_used(brecv) 
                           : brecv->length);
                if (remain < 0) {
                    apr_bucket_split(brecv, brecv->length+remain);
                    beam->recv_buffer = apr_brigade_split_ex(bb, 
                                                             APR_BUCKET_NEXT(brecv), 
                                                             beam->recv_buffer);
                    break;
                }
            }
        }

        if (beam->closed && buffer_is_empty(beam)) {
            /* beam is closed and we have nothing more to receive */ 
            if (!beam->close_sent) {
                apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
                APR_BRIGADE_INSERT_TAIL(bb, b);
                beam->close_sent = 1;
                ++transferred;
                status = APR_SUCCESS;
            }
        }
        
        if (transferred_buckets > 0) {
           if (beam->cons_ev_cb) { 
               beam->cons_ev_cb(beam->cons_ctx, beam);
            }
        }
        
        if (transferred) {
            apr_thread_cond_broadcast(beam->change);
            status = APR_SUCCESS;
        }
        else {
            status = wait_not_empty(beam, block, bl.mutex);
            if (status != APR_SUCCESS) {
                goto leave;
            }
            goto transfer;
        }
leave:        
        leave_yellow(beam, &bl);
    }
    return status;
}

void h2_beam_on_consumed(h2_bucket_beam *beam, 
                         h2_beam_ev_callback *ev_cb,
                         h2_beam_io_callback *io_cb, void *ctx)
{
    h2_beam_lock bl;
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        beam->cons_ev_cb = ev_cb;
        beam->cons_io_cb = io_cb;
        beam->cons_ctx = ctx;
        leave_yellow(beam, &bl);
    }
}

void h2_beam_on_produced(h2_bucket_beam *beam, 
                         h2_beam_io_callback *io_cb, void *ctx)
{
    h2_beam_lock bl;
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        beam->prod_io_cb = io_cb;
        beam->prod_ctx = ctx;
        leave_yellow(beam, &bl);
    }
}

void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                          h2_beam_can_beam_callback *cb, void *ctx)
{
    h2_beam_lock bl;
    
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        beam->can_beam_fn = cb;
        beam->can_beam_ctx = ctx;
        leave_yellow(beam, &bl);
    }
}


apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
{
    apr_bucket *b;
    apr_off_t l = 0;
    h2_beam_lock bl;
    
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        for (b = H2_BLIST_FIRST(&beam->send_list); 
            b != H2_BLIST_SENTINEL(&beam->send_list);
            b = APR_BUCKET_NEXT(b)) {
            /* should all have determinate length */
            l += b->length;
        }
        leave_yellow(beam, &bl);
    }
    return l;
}

apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
{
    apr_bucket *b;
    apr_off_t l = 0;
    h2_beam_lock bl;
    
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        for (b = H2_BLIST_FIRST(&beam->send_list); 
            b != H2_BLIST_SENTINEL(&beam->send_list);
            b = APR_BUCKET_NEXT(b)) {
            l += bucket_mem_used(b);
        }
        leave_yellow(beam, &bl);
    }
    return l;
}

int h2_beam_empty(h2_bucket_beam *beam)
{
    int empty = 1;
    h2_beam_lock bl;
    
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        empty = (H2_BLIST_EMPTY(&beam->send_list) 
                 && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer)));
        leave_yellow(beam, &bl);
    }
    return empty;
}

int h2_beam_holds_proxies(h2_bucket_beam *beam)
{
    int has_proxies = 1;
    h2_beam_lock bl;
    
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        has_proxies = !H2_BPROXY_LIST_EMPTY(&beam->proxies);
        leave_yellow(beam, &bl);
    }
    return has_proxies;
}

int h2_beam_was_received(h2_bucket_beam *beam)
{
    int happend = 0;
    h2_beam_lock bl;
    
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        happend = (beam->received_bytes > 0);
        leave_yellow(beam, &bl);
    }
    return happend;
}

apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
{
    apr_size_t n = 0;
    h2_beam_lock bl;
    
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        n = beam->files_beamed;
        leave_yellow(beam, &bl);
    }
    return n;
}

int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
{
    return 0;
}

int h2_beam_report_consumption(h2_bucket_beam *beam)
{
    h2_beam_lock bl;
    int rv = 0;
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        rv = report_consumption(beam, &bl);
        leave_yellow(beam, &bl);
    }
    return rv;
}

void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
{
    if (beam && APLOG_C_IS_LEVEL(c,level)) {
        ap_log_cerror(APLOG_MARK, level, 0, c, 
                      "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s", 
                      (c->master? c->master->id : c->id), beam->id, beam->tag, 
                      beam->closed, beam->aborted, h2_beam_empty(beam), 
                      (long)h2_beam_get_buffered(beam), msg);
    }
}