h2_bucket_beam.h 15.4 KB
Newer Older
powelld's avatar
powelld committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405
/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef h2_bucket_beam_h
#define h2_bucket_beam_h

struct apr_thread_mutex_t;
struct apr_thread_cond_t;

/*******************************************************************************
 * apr_bucket list without bells and whistles
 ******************************************************************************/
 
/**
 * h2_blist can hold a list of buckets just like apr_bucket_brigade, but
 * does not to any allocations or related features.
 */
typedef struct {
    APR_RING_HEAD(h2_bucket_list, apr_bucket) list;
} h2_blist;

#define H2_BLIST_INIT(b)        APR_RING_INIT(&(b)->list, apr_bucket, link);
#define H2_BLIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
#define H2_BLIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, apr_bucket, link)
#define H2_BLIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
#define H2_BLIST_LAST(b)	APR_RING_LAST(&(b)->list)
#define H2_BLIST_INSERT_HEAD(b, e) do {				\
	apr_bucket *ap__b = (e);                                        \
	APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link);	\
    } while (0)
#define H2_BLIST_INSERT_TAIL(b, e) do {				\
	apr_bucket *ap__b = (e);					\
	APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link);	\
    } while (0)
#define H2_BLIST_CONCAT(a, b) do {					\
        APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link);	\
    } while (0)
#define H2_BLIST_PREPEND(a, b) do {					\
        APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link);	\
    } while (0)

/*******************************************************************************
 * h2_bucket_beam
 ******************************************************************************/

/**
 * A h2_bucket_beam solves the task of transferring buckets, esp. their data,
 * across threads with zero buffer copies.
 *
 * When a thread, let's call it the sender thread, wants to send buckets to
 * another, the green thread, it creates a h2_bucket_beam and adds buckets
 * via the h2_beam_send(). It gives the beam to the green thread which then
 * can receive buckets into its own brigade via h2_beam_receive().
 *
 * Sending and receiving can happen concurrently.
 *
 * The beam can limit the amount of data it accepts via the buffer_size. This
 * can also be adjusted during its lifetime. Sends and receives can be done blocking. 
 * A timeout can be set for such blocks.
 *
 * Care needs to be taken when terminating the beam. The beam registers at
 * the pool it was created with and will cleanup after itself. However, if
 * received buckets do still exist, already freed memory might be accessed.
 * The beam does a assertion on this condition.
 * 
 * The proper way of shutting down a beam is to first make sure there are no
 * more green buckets out there, then cleanup the beam to purge eventually
 * still existing sender buckets and then, possibly, terminate the beam itself
 * (or the pool it was created with).
 *
 * The following restrictions apply to bucket transport:
 * - only EOS and FLUSH meta buckets are copied through. All other meta buckets
 *   are kept in the beams hold.
 * - all kind of data buckets are transported through:
 *   - transient buckets are converted to heap ones on send
 *   - heap and pool buckets require no extra handling
 *   - buckets with indeterminate length are read on send
 *   - file buckets will transfer the file itself into a new bucket, if allowed
 *   - all other buckets are read on send to make sure data is present
 *
 * This assures that when the sender thread sends its sender buckets, the data
 * is made accessible while still on the sender side. The sender bucket then enters
 * the beams hold storage.
 * When the green thread calls receive, sender buckets in the hold are wrapped
 * into special beam buckets. Beam buckets on read present the data directly
 * from the internal sender one, but otherwise live on the green side. When a
 * beam bucket gets destroyed, it notifies its beam that the corresponding
 * sender bucket from the hold may be destroyed.
 * Since the destruction of green buckets happens in the green thread, any
 * corresponding sender bucket can not immediately be destroyed, as that would
 * result in race conditions.
 * Instead, the beam transfers such sender buckets from the hold to the purge
 * storage. Next time there is a call from the sender side, the buckets in
 * purge will be deleted.
 *
 * There are callbacks that can be registesender with a beam:
 * - a "consumed" callback that gets called on the sender side with the
 *   amount of data that has been received by the green side. The amount
 *   is a delta from the last callback invocation. The sender side can trigger
 *   these callbacks by calling h2_beam_send() with a NULL brigade.
 * - a "can_beam_file" callback that can prohibit the transfer of file handles
 *   through the beam. This will cause file buckets to be read on send and
 *   its data buffer will then be transports just like a heap bucket would.
 *   When no callback is registered, no restrictions apply and all files are
 *   passed through.
 *   File handles transfersender to the green side will stay there until the
 *   receiving brigade's pool is destroyed/cleared. If the pool lives very
 *   long or if many different files are beamed, the process might run out
 *   of available file handles.
 *
 * The name "beam" of course is inspired by good old transporter
 * technology where humans are kept inside the transporter's memory
 * buffers until the transmission is complete. Star gates use a similar trick.
 */

typedef void h2_beam_mutex_leave(void *ctx,  struct apr_thread_mutex_t *lock);

typedef struct {
    apr_thread_mutex_t *mutex;
    h2_beam_mutex_leave *leave;
    void *leave_ctx;
} h2_beam_lock;

typedef struct h2_bucket_beam h2_bucket_beam;

typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);

typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
                                 apr_off_t bytes);
typedef void h2_beam_ev_callback(void *ctx, h2_bucket_beam *beam);

typedef struct h2_beam_proxy h2_beam_proxy;
typedef struct {
    APR_RING_HEAD(h2_beam_proxy_list, h2_beam_proxy) list;
} h2_bproxy_list;

typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam,
                                      apr_file_t *file);

typedef enum {
    H2_BEAM_OWNER_SEND,
    H2_BEAM_OWNER_RECV
} h2_beam_owner_t;

/**
 * Will deny all transfer of apr_file_t across the beam and force
 * a data copy instead.
 */
int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file);

struct h2_bucket_beam {
    int id;
    const char *tag;
    apr_pool_t *pool;
    h2_beam_owner_t owner;
    h2_blist send_list;
    h2_blist hold_list;
    h2_blist purge_list;
    apr_bucket_brigade *recv_buffer;
    h2_bproxy_list proxies;
    apr_pool_t *send_pool;
    apr_pool_t *recv_pool;
    
    apr_size_t max_buf_size;
    apr_interval_time_t timeout;

    apr_off_t sent_bytes;     /* amount of bytes send */
    apr_off_t received_bytes; /* amount of bytes received */

    apr_size_t buckets_sent;  /* # of beam buckets sent */
    apr_size_t files_beamed;  /* how many file handles have been set aside */
    
    unsigned int aborted : 1;
    unsigned int closed : 1;
    unsigned int close_sent : 1;
    unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */

    struct apr_thread_mutex_t *lock;
    struct apr_thread_cond_t *change;
    
    apr_off_t cons_bytes_reported;    /* amount of bytes reported as consumed */
    h2_beam_ev_callback *cons_ev_cb;
    h2_beam_io_callback *cons_io_cb;
    void *cons_ctx;

    apr_off_t prod_bytes_reported;    /* amount of bytes reported as produced */
    h2_beam_io_callback *prod_io_cb;
    void *prod_ctx;

    h2_beam_can_beam_callback *can_beam_fn;
    void *can_beam_ctx;
};

/**
 * Creates a new bucket beam for transfer of buckets across threads.
 *
 * The pool the beam is created with will be protected by the given 
 * mutex and will be used in multiple threads. It needs a pool allocator
 * that is only used inside that same mutex.
 *
 * @param pbeam         will hold the created beam on return
 * @param pool          pool owning the beam, beam will cleanup when pool released
 * @param id            identifier of the beam
 * @param tag           tag identifying beam for logging
 * @param owner         if the beam is owned by the sender or receiver, e.g. if
 *                      the pool owner is using this beam for sending or receiving
 * @param buffer_size   maximum memory footprint of buckets buffered in beam, or
 *                      0 for no limitation
 * @param timeout       timeout for blocking operations
 */
apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
                            apr_pool_t *pool, 
                            int id, const char *tag,
                            h2_beam_owner_t owner,  
                            apr_size_t buffer_size,
                            apr_interval_time_t timeout);

/**
 * Destroys the beam immediately without cleanup.
 */ 
apr_status_t h2_beam_destroy(h2_bucket_beam *beam);

/**
 * Send buckets from the given brigade through the beam. Will hold buckets 
 * internally as long as they have not been processed by the receiving side.
 * All accepted buckets are removed from the given brigade. Will return with
 * APR_EAGAIN on non-blocking sends when not all buckets could be accepted.
 * 
 * Call from the sender side only.
 */
apr_status_t h2_beam_send(h2_bucket_beam *beam,  
                          apr_bucket_brigade *bb, 
                          apr_read_type_e block);

/**
 * Register the pool from which future buckets are send. This defines
 * the lifetime of the buckets, e.g. the pool should not be cleared/destroyed
 * until the data is no longer needed (or has been received).
 */
void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p);

/**
 * Receive buckets from the beam into the given brigade. Will return APR_EOF
 * when reading past an EOS bucket. Reads can be blocking until data is 
 * available or the beam has been closed. Non-blocking calls return APR_EAGAIN
 * if no data is available.
 *
 * Call from the receiver side only.
 */
apr_status_t h2_beam_receive(h2_bucket_beam *beam, 
                             apr_bucket_brigade *green_buckets, 
                             apr_read_type_e block,
                             apr_off_t readbytes);

/**
 * Determine if beam is empty. 
 */
int h2_beam_empty(h2_bucket_beam *beam);

/**
 * Determine if beam has handed out proxy buckets that are not destroyed. 
 */
int h2_beam_holds_proxies(h2_bucket_beam *beam);

/**
 * Abort the beam. Will cleanup any buffered buckets and answer all send
 * and receives with APR_ECONNABORTED.
 * 
 * Call from the sender side only.
 */
void h2_beam_abort(h2_bucket_beam *beam);

/**
 * Close the beam. Sending an EOS bucket serves the same purpose. 
 * 
 * Call from the sender side only.
 */
apr_status_t h2_beam_close(h2_bucket_beam *beam);

/**
 * Receives leaves the beam, e.g. will no longer read. This will
 * interrupt any sender blocked writing and fail future send. 
 * 
 * Call from the receiver side only.
 */
apr_status_t h2_beam_leave(h2_bucket_beam *beam);

int h2_beam_is_closed(h2_bucket_beam *beam);

/**
 * Return APR_SUCCESS when all buckets in transit have been handled. 
 * When called with APR_BLOCK_READ and a mutex set, will wait until the green
 * side has consumed all data. Otherwise APR_EAGAIN is returned.
 * With clear_buffers set, any queued data is discarded.
 * If a timeout is set on the beam, waiting might also time out and
 * return APR_ETIMEUP.
 *
 * Call from the sender side only.
 */
apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block);

/** 
 * Set/get the timeout for blocking read/write operations. Only works
 * if a mutex has been set for the beam.
 */
void h2_beam_timeout_set(h2_bucket_beam *beam, 
                         apr_interval_time_t timeout);
apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam);

/**
 * Set/get the maximum buffer size for beam data (memory footprint).
 */
void h2_beam_buffer_size_set(h2_bucket_beam *beam, 
                             apr_size_t buffer_size);
apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);

/**
 * Register a callback to be invoked on the sender side with the
 * amount of bytes that have been consumed by the receiver, since the
 * last callback invocation or reset.
 * @param beam the beam to set the callback on
 * @param ev_cb the callback or NULL, called when bytes are consumed
 * @param io_cb the callback or NULL, called on sender with bytes consumed
 * @param ctx  the context to use in callback invocation
 * 
 * Call from the sender side, io callbacks invoked on sender side, ev callback
 * from any side.
 */
void h2_beam_on_consumed(h2_bucket_beam *beam, 
                         h2_beam_ev_callback *ev_cb,
                         h2_beam_io_callback *io_cb, void *ctx);

/**
 * Call any registered consumed handler, if any changes have happened
 * since the last invocation. 
 * @return !=0 iff a handler has been called
 *
 * Needs to be invoked from the sending side.
 */
int h2_beam_report_consumption(h2_bucket_beam *beam);

/**
 * Register a callback to be invoked on the receiver side with the
 * amount of bytes that have been produces by the sender, since the
 * last callback invocation or reset.
 * @param beam the beam to set the callback on
 * @param io_cb the callback or NULL, called on receiver with bytes produced
 * @param ctx  the context to use in callback invocation
 * 
 * Call from the receiver side, callbacks invoked on either side.
 */
void h2_beam_on_produced(h2_bucket_beam *beam, 
                         h2_beam_io_callback *io_cb, void *ctx);

/**
 * Register a callback that may prevent a file from being beam as
 * file handle, forcing the file content to be copied. Then no callback
 * is set (NULL), file handles are transferred directly.
 * @param beam the beam to set the callback on
 * @param io_cb the callback or NULL, called on receiver with bytes produced
 * @param ctx  the context to use in callback invocation
 * 
 * Call from the receiver side, callbacks invoked on either side.
 */
void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                          h2_beam_can_beam_callback *cb, void *ctx);

/**
 * Get the amount of bytes currently buffered in the beam (unread).
 */
apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam);

/**
 * Get the memory used by the buffered buckets, approximately.
 */
apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam);

/**
 * Return != 0 iff (some) data from the beam has been received.
 */
int h2_beam_was_received(h2_bucket_beam *beam);

apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam);

typedef apr_bucket *h2_bucket_beamer(h2_bucket_beam *beam, 
                                     apr_bucket_brigade *dest,
                                     const apr_bucket *src);

void h2_register_bucket_beamer(h2_bucket_beamer *beamer);

void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg);

#endif /* h2_bucket_beam_h */