Loading build/initial-build.sh +6 −3 Original line number Original line Diff line number Diff line Loading @@ -17,16 +17,19 @@ local_install_dir=$1 shift shift build_script_dir=$(pwd) build_script_dir=$(pwd) tlmsp_tools_dir=$(pwd)/.. tlmsp_tools_dir=$(pwd)/.. openssl_dir=${build_script_dir}/../../openssl openssl_dir=${build_script_dir}/../../tlmsp-openssl if [ ! -f ${build_script_dir}/$(basename $0) ]; then if [ ! -f ${build_script_dir}/$(basename $0) ]; then echo "This script is intended to be run from the directory that contains it" echo "This script is intended to be run from the directory that contains it" exit 4 exit 4 fi fi if [ ! -d ${openssl_dir} ]; then openssl_dir=${build_script_dir}/../../openssl if [ ! -d ${openssl_dir} ]; then if [ ! -d ${openssl_dir} ]; then echo "The openssl source directory needs to be alongside the tlmsp-tools directory" echo "The openssl source directory needs to be alongside the tlmsp-tools directory" exit 5 exit 5 fi fi fi if [ ! -f "${openssl_dir}/include/openssl/tlmsp.h" ]; then if [ ! -f "${openssl_dir}/include/openssl/tlmsp.h" ]; then echo "The openssl source directory does include TLMSP support" echo "The openssl source directory does include TLMSP support" exit 6 exit 6 Loading client/client.c +12 −15 Original line number Original line Diff line number Diff line Loading @@ -500,12 +500,6 @@ conn_cb(EV_P_ ev_io *w, int revents) connection_died(conn); connection_died(conn); return; return; } } /* * As processing read data above may have added to an * otherwise empty write queue, ensure response to EV_WRITE. */ if (demo_connection_writes_pending(conn)) demo_connection_wait_for(conn, EV_WRITE); } } /* /* Loading Loading @@ -546,7 +540,7 @@ read_containers(struct demo_connection *conn) if (ssl_result > 0) { if (ssl_result > 0) { if (!TLMSP_get_last_read_context(ssl, &context_id)) if (!TLMSP_get_last_read_context(ssl, &context_id)) return (false); return (false); demo_conn_log(3, conn, "received container (length=%u) " demo_conn_log(2, conn, "Received container (length=%u) " "in context %u using stream API", ssl_result, "in context %u using stream API", ssl_result, context_id); context_id); if (!TLMSP_container_create(ssl, &container, context_id, if (!TLMSP_container_create(ssl, &container, context_id, Loading @@ -560,7 +554,7 @@ read_containers(struct demo_connection *conn) } else { } else { ssl_result = TLMSP_container_read(ssl, &container); ssl_result = TLMSP_container_read(ssl, &container); if (ssl_result > 0) { if (ssl_result > 0) { demo_conn_log(3, conn, "received container (length=%u) " demo_conn_log(2, conn, "Received container (length=%u) " "in context %u using container API", "in context %u using container API", TLMSP_container_length(container), TLMSP_container_length(container), TLMSP_container_context(container)); TLMSP_container_context(container)); Loading @@ -579,12 +573,15 @@ read_containers(struct demo_connection *conn) ssl_error = SSL_get_error(ssl, ssl_result); ssl_error = SSL_get_error(ssl, ssl_result); switch (ssl_error) { switch (ssl_error) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ: demo_conn_log(5, conn, "SSL_ERROR_WANT_READ"); demo_connection_wait_for(conn, EV_READ); demo_connection_wait_for(conn, EV_READ); break; break; case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE: demo_conn_log(5, conn, "SSL_ERROR_WANT_WRITE"); demo_connection_wait_for(conn, EV_WRITE); demo_connection_wait_for(conn, EV_WRITE); break; break; case SSL_ERROR_WANT_RECONNECT: case SSL_ERROR_WANT_RECONNECT: demo_conn_log(5, conn, "SSL_ERROR_WANT_RECONNECT"); conn_state->reconnect_state = conn_state->reconnect_state = TLMSP_get_reconnect_state(ssl); TLMSP_get_reconnect_state(ssl); result = false; result = false; Loading @@ -598,7 +595,7 @@ read_containers(struct demo_connection *conn) return (result); return (result); } } demo_conn_log_buf(4, conn, "Container data", demo_conn_log_buf(3, conn, "Container data", TLMSP_container_get_data(container), TLMSP_container_get_data(container), TLMSP_container_length(container), true); TLMSP_container_length(container), true); Loading Loading @@ -628,11 +625,11 @@ write_containers(struct demo_connection *conn) container = container_queue_head(&conn->write_queue); container = container_queue_head(&conn->write_queue); while (container != NULL) { while (container != NULL) { context_id = TLMSP_container_context(container); context_id = TLMSP_container_context(container); demo_conn_log(3, conn, "sending container (length=%u) " demo_conn_log(2, conn, "Sending container (length=%u) " "in context %u using %s API", "in context %u using %s API", TLMSP_container_length(container), context_id, TLMSP_container_length(container), context_id, client->use_stream_api ? "stream" : "container"); client->use_stream_api ? "stream" : "container"); demo_conn_log_buf(4, conn, "Container data", demo_conn_log_buf(3, conn, "Container data", TLMSP_container_get_data(container), TLMSP_container_get_data(container), TLMSP_container_length(container), true); TLMSP_container_length(container), true); if (client->use_stream_api) { if (client->use_stream_api) { Loading @@ -652,7 +649,7 @@ write_containers(struct demo_connection *conn) ssl_result = TLMSP_container_write(ssl, container); ssl_result = TLMSP_container_write(ssl, container); } } if (ssl_result > 0) { if (ssl_result > 0) { demo_conn_log(3, conn, "container send complete (result=%d)", ssl_result); demo_conn_log(2, conn, "Container send complete (result=%d)", ssl_result); container_queue_remove_head(&conn->write_queue); container_queue_remove_head(&conn->write_queue); container = container_queue_head(&conn->write_queue); container = container_queue_head(&conn->write_queue); } else } else Loading @@ -663,15 +660,15 @@ write_containers(struct demo_connection *conn) ssl_error = SSL_get_error(ssl, ssl_result); ssl_error = SSL_get_error(ssl, ssl_result); switch (ssl_error) { switch (ssl_error) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ: demo_conn_log(4, conn, "SSL_ERROR_WANT_READ"); demo_conn_log(5, conn, "SSL_ERROR_WANT_READ"); demo_connection_wait_for(conn, EV_READ); demo_connection_wait_for(conn, EV_READ); break; break; case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE: demo_conn_log(4, conn, "SSL_ERROR_WANT_WRITE"); demo_conn_log(5, conn, "SSL_ERROR_WANT_WRITE"); demo_connection_wait_for(conn, EV_WRITE); demo_connection_wait_for(conn, EV_WRITE); break; break; case SSL_ERROR_WANT_RECONNECT: case SSL_ERROR_WANT_RECONNECT: demo_conn_log(4, conn, "SSL_ERROR_WANT_RECONNECT"); demo_conn_log(5, conn, "SSL_ERROR_WANT_RECONNECT"); conn_state->reconnect_state = TLMSP_get_reconnect_state(ssl); conn_state->reconnect_state = TLMSP_get_reconnect_state(ssl); result = false; result = false; break; break; Loading libdemo/activity.c +55 −26 Original line number Original line Diff line number Diff line Loading @@ -339,8 +339,8 @@ demo_activity_queue_payload(struct demo_connection *conn, struct tlmsp_cfg_payload *payload, struct match_groups *match_groups, struct tlmsp_cfg_payload *payload, struct match_groups *match_groups, bool create_if_empty) bool create_if_empty) { { struct container_queue *q = &conn->write_queue; struct container_queue *q = &conn->write_queue; SSL *ssl = q->conn->ssl; TLMSP_Container *container; TLMSP_Container *container; const uint8_t *data; const uint8_t *data; size_t len; size_t len; Loading Loading @@ -368,7 +368,7 @@ demo_activity_queue_payload(struct demo_connection *conn, container_len = MAX_CONTAINER_SIZE; container_len = MAX_CONTAINER_SIZE; else else container_len = remaining; container_len = remaining; if (!TLMSP_container_create(q->ssl, &container, if (!TLMSP_container_create(ssl, &container, payload->context->id, &data[offset], payload->context->id, &data[offset], container_len)) { container_len)) { demo_conn_print_error_ssl_errq(conn, demo_conn_print_error_ssl_errq(conn, Loading @@ -379,10 +379,10 @@ demo_activity_queue_payload(struct demo_connection *conn, if (!container_queue_add(q, container)) { if (!container_queue_add(q, container)) { demo_conn_print_error(conn, demo_conn_print_error(conn, "Failed to add payload container to queue\n"); "Failed to add payload container to queue\n"); TLMSP_container_free(q->ssl, container); TLMSP_container_free(ssl, container); goto out; goto out; } } demo_conn_log(3, conn, "Queued container (length=%u) in " demo_conn_log(2, conn, "Queued container (length=%u) in " "context %u", container_len, payload->context->id); "context %u", container_len, payload->context->id); offset += container_len; offset += container_len; Loading Loading @@ -436,11 +436,11 @@ demo_activity_add_time_triggered_message(struct demo_connection *conn, msg->free_data = free_data; msg->free_data = free_data; if (every == 0.0) if (every == 0.0) demo_conn_log(3, conn, "adding on-shot message at %u ms " demo_conn_log(2, conn, "Adding on-shot message at %u ms " "(length=%u) in context %u", (unsigned int)(at * 1000.0), "(length=%u) in context %u", (unsigned int)(at * 1000.0), len, msg->context_id); len, msg->context_id); else else demo_conn_log(3, conn, "adding periodic message at %u ms, " demo_conn_log(2, conn, "Adding periodic message at %u ms, " "period %u ms (length=%u) in context %u", "period %u ms (length=%u) in context %u", (unsigned int)(((at == 0.0) ? every : at) * 1000.0), (unsigned int)(((at == 0.0) ? every : at) * 1000.0), (unsigned int)(every * 1000.0), len, msg->context_id); (unsigned int)(every * 1000.0), len, msg->context_id); Loading Loading @@ -492,7 +492,7 @@ demo_activity_time_triggered_cb(EV_P_ ev_timer *w, int revents) TLMSP_container_free(conn->ssl, container); TLMSP_container_free(conn->ssl, container); return; return; } } demo_conn_log(3, conn, "Queued time-triggered container " demo_conn_log(2, conn, "Queued time-triggered container " "(length=%u) in context %u", container_len, msg->context_id); "(length=%u) in context %u", container_len, msg->context_id); offset += container_len; offset += container_len; Loading Loading @@ -589,10 +589,6 @@ demo_activity_run_payload_handler(struct demo_connection *log_conn, goto stderr_fail; goto stderr_fail; } } printf("stdin READ(%u)=%d WRITE(%u)=%d\n", PIPE_READ_FD, stdin_pipe[PIPE_READ_FD], PIPE_WRITE_FD, stdin_pipe[PIPE_WRITE_FD]); printf("stdout READ(%u)=%d WRITE(%u)=%d\n", PIPE_READ_FD, stdout_pipe[PIPE_READ_FD], PIPE_WRITE_FD, stdout_pipe[PIPE_WRITE_FD]); printf("stderr READ(%u)=%d WRITE(%u)=%d\n", PIPE_READ_FD, stderr_pipe[PIPE_READ_FD], PIPE_WRITE_FD, stderr_pipe[PIPE_WRITE_FD]); child_pid = fork(); child_pid = fork(); if (0 == child_pid) { if (0 == child_pid) { /* child */ /* child */ Loading Loading @@ -718,8 +714,6 @@ demo_activity_handler_stdin_cb(EV_P_ ev_io *w, int revents) bytes_written = write(w->fd, state->out_buf, state->out_remaining); bytes_written = write(w->fd, state->out_buf, state->out_remaining); if (bytes_written == -1) { if (bytes_written == -1) { demo_conn_log(5, state->log_conn, "stdin -1"); demo_conn_log(5, state->log_conn, "stdin -1"); if (errno != EAGAIN) if (errno != EAGAIN) ev_break(EV_A_ EVBREAK_ONE); ev_break(EV_A_ EVBREAK_ONE); return; return; Loading @@ -743,6 +737,7 @@ demo_activity_handler_stdout_cb(EV_P_ ev_io *w, int revents) size_t space, new_size; size_t space, new_size; ssize_t bytes_read; ssize_t bytes_read; uint8_t *p; uint8_t *p; demo_conn_log(5, state->log_conn, "Handler stdout event"); demo_conn_log(5, state->log_conn, "Handler stdout event"); /* /* * If we have less than the realloc threshold, extend the buffer. * If we have less than the realloc threshold, extend the buffer. Loading Loading @@ -783,6 +778,7 @@ demo_activity_handler_stderr_cb(EV_P_ ev_io *w, int revents) struct payload_handler_state *state = w->data; struct payload_handler_state *state = w->data; ssize_t bytes_read; ssize_t bytes_read; size_t space; size_t space; demo_conn_log(5, state->log_conn, "Handler stderr event"); demo_conn_log(5, state->log_conn, "Handler stderr event"); /* always leave one byte at the end for a NUL */ /* always leave one byte at the end for a NUL */ space = sizeof(state->err_buf) - state->err_offset - 1; space = sizeof(state->err_buf) - state->err_offset - 1; Loading Loading @@ -969,6 +965,23 @@ demo_activity_process_read_queue(struct demo_connection *conn) return (false); return (false); } while ((selected_match_state != NULL) && (conn->read_queue.head != NULL)); } while ((selected_match_state != NULL) && (conn->read_queue.head != NULL)); /* * We may have added data to the connection's write queue as a * result of the match-action activity processed above, so ensure * the write event is set. */ if (demo_connection_writes_pending(conn)) demo_connection_wait_for(conn, EV_WRITE); /* * If this connection is part of a splice, the above match-action * activity may have queued new data to the other side's write * queue. */ if (conn->splice != NULL) { if (demo_connection_writes_pending(conn->other_side)) demo_connection_wait_for(conn->other_side, EV_WRITE); } return (true); return (true); } } Loading Loading @@ -1006,8 +1019,10 @@ demo_activity_container_match(struct demo_connection *conn, "(%"PRIu64" ?= %" PRIu64")", entry->container_number, "(%"PRIu64" ?= %" PRIu64")", entry->container_number, match->container.param.n); match->container.param.n); if (entry->container_number == match->container.param.n) if (entry->container_number == match->container.param.n) { demo_conn_log(4, conn, "Found container number match"); match_found = true; match_found = true; } break; break; case TLMSP_CFG_MATCH_CONTAINER_PROBABILITY: case TLMSP_CFG_MATCH_CONTAINER_PROBABILITY: demo_conn_log(5, conn, "Checking container probability " demo_conn_log(5, conn, "Checking container probability " Loading @@ -1015,11 +1030,13 @@ demo_activity_container_match(struct demo_connection *conn, match->container.param.p * match_state->containers_inspected); match->container.param.p * match_state->containers_inspected); if ((double)match_state->containers_matched < if ((double)match_state->containers_matched < (match->container.param.p * match_state->containers_inspected)) (match->container.param.p * match_state->containers_inspected)) { demo_conn_log(4, conn, "Found container probability match"); match_found = true; match_found = true; } break; break; case TLMSP_CFG_MATCH_CONTAINER_ALL: case TLMSP_CFG_MATCH_CONTAINER_ALL: demo_conn_log(5, conn, "Matching all containers"); demo_conn_log(4, conn, "Found all-containers match"); match_found = true; match_found = true; break; break; } } Loading Loading @@ -1129,6 +1146,7 @@ demo_activity_pattern_match(struct demo_connection *conn, result = memmem(search_buf, search_len, pattern_buf->p, result = memmem(search_buf, search_len, pattern_buf->p, pattern_buf->len); pattern_buf->len); if (result != NULL) { if (result != NULL) { demo_conn_log(4, conn, "Found static pattern match"); match_found = true; match_found = true; match_offset = result - search_buf; match_offset = result - search_buf; match_len = pattern_buf->len; match_len = pattern_buf->len; Loading Loading @@ -1170,7 +1188,7 @@ demo_activity_pattern_match(struct demo_connection *conn, num_match_groups = pcre2_match(match->pattern.param.regex, num_match_groups = pcre2_match(match->pattern.param.regex, search_buf, search_len, 0, 0, match_data, NULL); search_buf, search_len, 0, 0, match_data, NULL); if (num_match_groups > 0) { if (num_match_groups > 0) { demo_conn_log(5, conn, "Match with %d groups", demo_conn_log(4, conn, "Found regex match with %d groups", num_match_groups); num_match_groups); match_found = true; match_found = true; output = pcre2_get_ovector_pointer(match_data); output = pcre2_get_ovector_pointer(match_data); Loading Loading @@ -1206,8 +1224,6 @@ demo_activity_pattern_match(struct demo_connection *conn, } } if (match_found) { if (match_found) { demo_conn_log(5, conn, "Match found (offset=%zu, length=%zu)", match_offset, match_len); /* /* * Set up the match result range * Set up the match result range */ */ Loading @@ -1222,7 +1238,7 @@ demo_activity_pattern_match(struct demo_connection *conn, ((match_offset + match_len) <= (offset + entry->length))) { ((match_offset + match_len) <= (offset + entry->length))) { match_range->last = entry; match_range->last = entry; match_range->last_remainder = match_range->last_remainder = match_offset + match_len - offset; entry->length - (match_offset + match_len - offset); } } offset += entry->length; offset += entry->length; } } Loading Loading @@ -1415,6 +1431,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con struct container_queue_range *match_range, bool drop_all, struct container_queue_range *match_range, bool drop_all, bool delete_match) bool delete_match) { { SSL *write_q_ssl = write_q->conn->ssl; TLMSP_Container *container, *new_container; TLMSP_Container *container, *new_container; const uint8_t *src; const uint8_t *src; size_t delete_or_forward_bytes; size_t delete_or_forward_bytes; Loading @@ -1439,7 +1456,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "(context=%u, length=%zu)", "(context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); if (!TLMSP_container_delete(write_q->ssl, container)) { if (!TLMSP_container_delete(write_q_ssl, container)) { demo_conn_print_error_ssl_errq(log_conn, demo_conn_print_error_ssl_errq(log_conn, "Failed to add delete preamble container"); "Failed to add delete preamble container"); return (false); return (false); Loading @@ -1465,7 +1482,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "container (context=%u, length=%zu)", "container (context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); TLMSP_container_free(write_q->ssl, container); TLMSP_container_free(write_q_ssl, container); } else { } else { /* deleting or forwarding */ /* deleting or forwarding */ if (delete_match) { if (delete_match) { Loading @@ -1473,7 +1490,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "container (context=%u, length=%zu)", "container (context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); if (!TLMSP_container_delete(write_q->ssl, container)) { if (!TLMSP_container_delete(write_q_ssl, container)) { demo_conn_print_error_ssl_errq(log_conn, demo_conn_print_error_ssl_errq(log_conn, "Failed to add delete preamble/match container"); "Failed to add delete preamble/match container"); return (false); return (false); Loading @@ -1484,6 +1501,10 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "Failed to add preamble/match container to write queue"); "Failed to add preamble/match container to write queue"); return (false); return (false); } } demo_conn_log(2, log_conn, "Queued forwarded premable/match " "container (length=%u) in context %u", TLMSP_container_length(container), TLMSP_container_context(container)); } } container = container_queue_head(read_q); container = container_queue_head(read_q); } } Loading @@ -1499,7 +1520,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con delete_or_forward_bytes = TLMSP_container_length(container) - delete_or_forward_bytes = TLMSP_container_length(container) - match_range->last_remainder; match_range->last_remainder; if (!drop_all && !delete_match) { if (!drop_all && !delete_match) { if (!TLMSP_container_create(write_q->ssl, if (!TLMSP_container_create(write_q_ssl, &new_container, &new_container, TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_get_data(container), TLMSP_container_get_data(container), Loading @@ -1515,6 +1536,10 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "of match data to write queue"); "of match data to write queue"); return (false); return (false); } } demo_conn_log(2, log_conn, "Queued partial match data " "container (length=%u) in context %u", TLMSP_container_length(new_container), TLMSP_container_context(new_container)); } } /* remove leading data on container staying in queue */ /* remove leading data on container staying in queue */ src = TLMSP_container_get_data(container); src = TLMSP_container_get_data(container); Loading @@ -1526,14 +1551,14 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "(context=%u, length=%zu)", "(context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); TLMSP_container_free(write_q->ssl, container); TLMSP_container_free(write_q_ssl, container); } else { } else { if (delete_match) { if (delete_match) { demo_conn_log(5, log_conn, "Deleting final match container " demo_conn_log(5, log_conn, "Deleting final match container " "(context=%u, length=%zu)", "(context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); if (!TLMSP_container_delete(write_q->ssl, container)) { if (!TLMSP_container_delete(write_q_ssl, container)) { demo_conn_print_error_ssl_errq(log_conn, demo_conn_print_error_ssl_errq(log_conn, "Failed to delete complete container for " "Failed to delete complete container for " "tail end of match data"); "tail end of match data"); Loading @@ -1546,6 +1571,10 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "match data to write queue"); "match data to write queue"); return (false); return (false); } } demo_conn_log(2, log_conn, "Queued forwarded final match data " "container (length=%u) in context %u", TLMSP_container_length(container), TLMSP_container_context(container)); } } } } Loading libdemo/connection.c +19 −7 Original line number Original line Diff line number Diff line Loading @@ -16,6 +16,12 @@ #include "splice.h" #include "splice.h" #include "print.h" #include "print.h" /* XXX make configurable */ #define READ_QUEUE_MAX_IDLE_MS 100 #define READ_QUEUE_MAX_DEPTH_BYTES (1*1024*1024) static void demo_conn_connected_cb(EV_P_ ev_io *w, int revents); static void demo_conn_connected_cb(EV_P_ ev_io *w, int revents); Loading Loading @@ -96,8 +102,14 @@ demo_connection_init_io(struct demo_connection *conn, SSL_CTX *ssl_ctx, int sock return (false); return (false); } } container_queue_init(&conn->read_queue, conn->ssl); /* container_queue_init(&conn->write_queue, conn->ssl); * Read queues on endpoints do not have an idle timer, * but middlebox read queues do. */ container_queue_init(&conn->read_queue, conn, (conn->splice == NULL) ? 0 : READ_QUEUE_MAX_IDLE_MS, READ_QUEUE_MAX_DEPTH_BYTES); container_queue_init(&conn->write_queue, conn, 0, 0); if (!demo_activity_conn_queue_initial(conn)) { if (!demo_activity_conn_queue_initial(conn)) { demo_conn_print_error(conn, demo_conn_print_error(conn, Loading Loading @@ -179,11 +191,11 @@ demo_connection_events_arrived(struct demo_connection *conn, int events) { { if (events & EV_ERROR) if (events & EV_ERROR) demo_conn_log(3, conn, "ERROR event"); demo_conn_log(5, conn, "ERROR event"); if (events & EV_READ) if (events & EV_READ) demo_conn_log(3, conn, "READ event"); demo_conn_log(5, conn, "READ event"); if (events & EV_WRITE) if (events & EV_WRITE) demo_conn_log(3, conn, "WRITE event"); demo_conn_log(5, conn, "WRITE event"); conn->wait_events &= ~events; conn->wait_events &= ~events; } } Loading @@ -200,9 +212,9 @@ demo_connection_wait_for(struct demo_connection *conn, int events) { { if (events & EV_READ) if (events & EV_READ) demo_conn_log(3, conn, "wait for readable"); demo_conn_log(5, conn, "Wait for readable"); if (events & EV_WRITE) if (events & EV_WRITE) demo_conn_log(3, conn, "wait for writable"); demo_conn_log(5, conn, "Wait for writable"); conn->wait_events |= events; conn->wait_events |= events; } } Loading libdemo/container_queue.c +71 −4 Original line number Original line Diff line number Diff line Loading @@ -7,23 +7,69 @@ #include <stdlib.h> #include <stdlib.h> #include "connection.h" #include "container_queue.h" #include "container_queue.h" #include "print.h" #include "splice.h" static void container_queue_idle_cb(EV_P_ ev_timer *w, int revents); void void container_queue_init(struct container_queue *q, SSL *ssl) container_queue_init(struct container_queue *q, struct demo_connection *conn, unsigned int max_idle, size_t max_depth) { { q->ssl = ssl; q->conn = conn; q->max_idle = max_idle; q->max_depth = max_depth; q->head = NULL; q->head = NULL; q->tail = NULL; q->tail = NULL; q->length = 0; q->length = 0; q->container_counter = 0; q->container_counter = 0; /* XXX not yet */ q->max_idle = 0; if (q->max_idle != 0) { ev_init(&q->idle_timer, container_queue_idle_cb); q->idle_timer.repeat = (double)q->max_idle / 1000.0; q->idle_timer.data = q; } } static void container_queue_idle_cb(EV_P_ ev_timer *w, int revents) { struct container_queue *q = w->data; struct demo_connection *other_side = q->conn->other_side; struct demo_splice *splice = q->conn->splice; TLMSP_Container *container; /* This should only be getting enabled on a middlebox */ if (splice != NULL) { demo_conn_log(2, q->conn, "Read queue idle timer expired, " "forwarding all containers in queue"); demo_splice_stop_io(splice); /* * On a middlebox, we've exceeded our time window for * matching, so forward what has been accumulated. */ container = container_queue_head(q); while (container != NULL) { container_queue_remove_head(q); container_queue_add(&other_side->write_queue, container); container = container_queue_head(q); } demo_connection_wait_for(other_side, EV_WRITE); demo_splice_resume_io(splice); } } } bool bool container_queue_add(struct container_queue *q, TLMSP_Container *container) container_queue_add(struct container_queue *q, TLMSP_Container *container) { { struct ev_loop *loop = q->conn->loop; struct container_queue_entry *entry; struct container_queue_entry *entry; entry = malloc(sizeof(*entry)); entry = malloc(sizeof(*entry)); Loading @@ -43,6 +89,10 @@ container_queue_add(struct container_queue *q, TLMSP_Container *container) q->tail = entry; q->tail = entry; q->length += entry->length; q->length += entry->length; if (q->max_idle != 0) { ev_timer_again(EV_A_ &q->idle_timer); } return (true); return (true); } } Loading @@ -63,14 +113,23 @@ container_queue_head_entry(struct container_queue *q) TLMSP_Container * TLMSP_Container * container_queue_remove_head(struct container_queue *q) container_queue_remove_head(struct container_queue *q) { { struct ev_loop *loop = q->conn->loop; struct container_queue_entry *entry; struct container_queue_entry *entry; TLMSP_Container *container = NULL; TLMSP_Container *container = NULL; entry = q->head; entry = q->head; if (entry != NULL) { if (entry != NULL) { q->head = entry->next; q->head = entry->next; if (q->head == NULL) if (q->head == NULL) { q->tail = NULL; q->tail = NULL; if (q->max_idle != 0) { /* * Queue is empty, no need for an idle timer. */ ev_timer_stop(EV_A_ &q->idle_timer); } } q->length -= entry->length; q->length -= entry->length; container = entry->container; container = entry->container; free(entry); free(entry); Loading @@ -82,12 +141,20 @@ container_queue_remove_head(struct container_queue *q) void void container_queue_drain(struct container_queue *q) container_queue_drain(struct container_queue *q) { { struct ev_loop *loop = q->conn->loop; TLMSP_Container *container; TLMSP_Container *container; container = container_queue_head(q); container = container_queue_head(q); while (container != NULL) { while (container != NULL) { container_queue_remove_head(q); container_queue_remove_head(q); TLMSP_container_free(q->ssl, container); TLMSP_container_free(q->conn->ssl, container); container = container_queue_head(q); container = container_queue_head(q); } } if (q->max_idle != 0) { /* * Queue is empty, no need for an idle timer. */ ev_timer_stop(EV_A_ &q->idle_timer); } } } Loading
build/initial-build.sh +6 −3 Original line number Original line Diff line number Diff line Loading @@ -17,16 +17,19 @@ local_install_dir=$1 shift shift build_script_dir=$(pwd) build_script_dir=$(pwd) tlmsp_tools_dir=$(pwd)/.. tlmsp_tools_dir=$(pwd)/.. openssl_dir=${build_script_dir}/../../openssl openssl_dir=${build_script_dir}/../../tlmsp-openssl if [ ! -f ${build_script_dir}/$(basename $0) ]; then if [ ! -f ${build_script_dir}/$(basename $0) ]; then echo "This script is intended to be run from the directory that contains it" echo "This script is intended to be run from the directory that contains it" exit 4 exit 4 fi fi if [ ! -d ${openssl_dir} ]; then openssl_dir=${build_script_dir}/../../openssl if [ ! -d ${openssl_dir} ]; then if [ ! -d ${openssl_dir} ]; then echo "The openssl source directory needs to be alongside the tlmsp-tools directory" echo "The openssl source directory needs to be alongside the tlmsp-tools directory" exit 5 exit 5 fi fi fi if [ ! -f "${openssl_dir}/include/openssl/tlmsp.h" ]; then if [ ! -f "${openssl_dir}/include/openssl/tlmsp.h" ]; then echo "The openssl source directory does include TLMSP support" echo "The openssl source directory does include TLMSP support" exit 6 exit 6 Loading
client/client.c +12 −15 Original line number Original line Diff line number Diff line Loading @@ -500,12 +500,6 @@ conn_cb(EV_P_ ev_io *w, int revents) connection_died(conn); connection_died(conn); return; return; } } /* * As processing read data above may have added to an * otherwise empty write queue, ensure response to EV_WRITE. */ if (demo_connection_writes_pending(conn)) demo_connection_wait_for(conn, EV_WRITE); } } /* /* Loading Loading @@ -546,7 +540,7 @@ read_containers(struct demo_connection *conn) if (ssl_result > 0) { if (ssl_result > 0) { if (!TLMSP_get_last_read_context(ssl, &context_id)) if (!TLMSP_get_last_read_context(ssl, &context_id)) return (false); return (false); demo_conn_log(3, conn, "received container (length=%u) " demo_conn_log(2, conn, "Received container (length=%u) " "in context %u using stream API", ssl_result, "in context %u using stream API", ssl_result, context_id); context_id); if (!TLMSP_container_create(ssl, &container, context_id, if (!TLMSP_container_create(ssl, &container, context_id, Loading @@ -560,7 +554,7 @@ read_containers(struct demo_connection *conn) } else { } else { ssl_result = TLMSP_container_read(ssl, &container); ssl_result = TLMSP_container_read(ssl, &container); if (ssl_result > 0) { if (ssl_result > 0) { demo_conn_log(3, conn, "received container (length=%u) " demo_conn_log(2, conn, "Received container (length=%u) " "in context %u using container API", "in context %u using container API", TLMSP_container_length(container), TLMSP_container_length(container), TLMSP_container_context(container)); TLMSP_container_context(container)); Loading @@ -579,12 +573,15 @@ read_containers(struct demo_connection *conn) ssl_error = SSL_get_error(ssl, ssl_result); ssl_error = SSL_get_error(ssl, ssl_result); switch (ssl_error) { switch (ssl_error) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ: demo_conn_log(5, conn, "SSL_ERROR_WANT_READ"); demo_connection_wait_for(conn, EV_READ); demo_connection_wait_for(conn, EV_READ); break; break; case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE: demo_conn_log(5, conn, "SSL_ERROR_WANT_WRITE"); demo_connection_wait_for(conn, EV_WRITE); demo_connection_wait_for(conn, EV_WRITE); break; break; case SSL_ERROR_WANT_RECONNECT: case SSL_ERROR_WANT_RECONNECT: demo_conn_log(5, conn, "SSL_ERROR_WANT_RECONNECT"); conn_state->reconnect_state = conn_state->reconnect_state = TLMSP_get_reconnect_state(ssl); TLMSP_get_reconnect_state(ssl); result = false; result = false; Loading @@ -598,7 +595,7 @@ read_containers(struct demo_connection *conn) return (result); return (result); } } demo_conn_log_buf(4, conn, "Container data", demo_conn_log_buf(3, conn, "Container data", TLMSP_container_get_data(container), TLMSP_container_get_data(container), TLMSP_container_length(container), true); TLMSP_container_length(container), true); Loading Loading @@ -628,11 +625,11 @@ write_containers(struct demo_connection *conn) container = container_queue_head(&conn->write_queue); container = container_queue_head(&conn->write_queue); while (container != NULL) { while (container != NULL) { context_id = TLMSP_container_context(container); context_id = TLMSP_container_context(container); demo_conn_log(3, conn, "sending container (length=%u) " demo_conn_log(2, conn, "Sending container (length=%u) " "in context %u using %s API", "in context %u using %s API", TLMSP_container_length(container), context_id, TLMSP_container_length(container), context_id, client->use_stream_api ? "stream" : "container"); client->use_stream_api ? "stream" : "container"); demo_conn_log_buf(4, conn, "Container data", demo_conn_log_buf(3, conn, "Container data", TLMSP_container_get_data(container), TLMSP_container_get_data(container), TLMSP_container_length(container), true); TLMSP_container_length(container), true); if (client->use_stream_api) { if (client->use_stream_api) { Loading @@ -652,7 +649,7 @@ write_containers(struct demo_connection *conn) ssl_result = TLMSP_container_write(ssl, container); ssl_result = TLMSP_container_write(ssl, container); } } if (ssl_result > 0) { if (ssl_result > 0) { demo_conn_log(3, conn, "container send complete (result=%d)", ssl_result); demo_conn_log(2, conn, "Container send complete (result=%d)", ssl_result); container_queue_remove_head(&conn->write_queue); container_queue_remove_head(&conn->write_queue); container = container_queue_head(&conn->write_queue); container = container_queue_head(&conn->write_queue); } else } else Loading @@ -663,15 +660,15 @@ write_containers(struct demo_connection *conn) ssl_error = SSL_get_error(ssl, ssl_result); ssl_error = SSL_get_error(ssl, ssl_result); switch (ssl_error) { switch (ssl_error) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ: demo_conn_log(4, conn, "SSL_ERROR_WANT_READ"); demo_conn_log(5, conn, "SSL_ERROR_WANT_READ"); demo_connection_wait_for(conn, EV_READ); demo_connection_wait_for(conn, EV_READ); break; break; case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE: demo_conn_log(4, conn, "SSL_ERROR_WANT_WRITE"); demo_conn_log(5, conn, "SSL_ERROR_WANT_WRITE"); demo_connection_wait_for(conn, EV_WRITE); demo_connection_wait_for(conn, EV_WRITE); break; break; case SSL_ERROR_WANT_RECONNECT: case SSL_ERROR_WANT_RECONNECT: demo_conn_log(4, conn, "SSL_ERROR_WANT_RECONNECT"); demo_conn_log(5, conn, "SSL_ERROR_WANT_RECONNECT"); conn_state->reconnect_state = TLMSP_get_reconnect_state(ssl); conn_state->reconnect_state = TLMSP_get_reconnect_state(ssl); result = false; result = false; break; break; Loading
libdemo/activity.c +55 −26 Original line number Original line Diff line number Diff line Loading @@ -339,8 +339,8 @@ demo_activity_queue_payload(struct demo_connection *conn, struct tlmsp_cfg_payload *payload, struct match_groups *match_groups, struct tlmsp_cfg_payload *payload, struct match_groups *match_groups, bool create_if_empty) bool create_if_empty) { { struct container_queue *q = &conn->write_queue; struct container_queue *q = &conn->write_queue; SSL *ssl = q->conn->ssl; TLMSP_Container *container; TLMSP_Container *container; const uint8_t *data; const uint8_t *data; size_t len; size_t len; Loading Loading @@ -368,7 +368,7 @@ demo_activity_queue_payload(struct demo_connection *conn, container_len = MAX_CONTAINER_SIZE; container_len = MAX_CONTAINER_SIZE; else else container_len = remaining; container_len = remaining; if (!TLMSP_container_create(q->ssl, &container, if (!TLMSP_container_create(ssl, &container, payload->context->id, &data[offset], payload->context->id, &data[offset], container_len)) { container_len)) { demo_conn_print_error_ssl_errq(conn, demo_conn_print_error_ssl_errq(conn, Loading @@ -379,10 +379,10 @@ demo_activity_queue_payload(struct demo_connection *conn, if (!container_queue_add(q, container)) { if (!container_queue_add(q, container)) { demo_conn_print_error(conn, demo_conn_print_error(conn, "Failed to add payload container to queue\n"); "Failed to add payload container to queue\n"); TLMSP_container_free(q->ssl, container); TLMSP_container_free(ssl, container); goto out; goto out; } } demo_conn_log(3, conn, "Queued container (length=%u) in " demo_conn_log(2, conn, "Queued container (length=%u) in " "context %u", container_len, payload->context->id); "context %u", container_len, payload->context->id); offset += container_len; offset += container_len; Loading Loading @@ -436,11 +436,11 @@ demo_activity_add_time_triggered_message(struct demo_connection *conn, msg->free_data = free_data; msg->free_data = free_data; if (every == 0.0) if (every == 0.0) demo_conn_log(3, conn, "adding on-shot message at %u ms " demo_conn_log(2, conn, "Adding on-shot message at %u ms " "(length=%u) in context %u", (unsigned int)(at * 1000.0), "(length=%u) in context %u", (unsigned int)(at * 1000.0), len, msg->context_id); len, msg->context_id); else else demo_conn_log(3, conn, "adding periodic message at %u ms, " demo_conn_log(2, conn, "Adding periodic message at %u ms, " "period %u ms (length=%u) in context %u", "period %u ms (length=%u) in context %u", (unsigned int)(((at == 0.0) ? every : at) * 1000.0), (unsigned int)(((at == 0.0) ? every : at) * 1000.0), (unsigned int)(every * 1000.0), len, msg->context_id); (unsigned int)(every * 1000.0), len, msg->context_id); Loading Loading @@ -492,7 +492,7 @@ demo_activity_time_triggered_cb(EV_P_ ev_timer *w, int revents) TLMSP_container_free(conn->ssl, container); TLMSP_container_free(conn->ssl, container); return; return; } } demo_conn_log(3, conn, "Queued time-triggered container " demo_conn_log(2, conn, "Queued time-triggered container " "(length=%u) in context %u", container_len, msg->context_id); "(length=%u) in context %u", container_len, msg->context_id); offset += container_len; offset += container_len; Loading Loading @@ -589,10 +589,6 @@ demo_activity_run_payload_handler(struct demo_connection *log_conn, goto stderr_fail; goto stderr_fail; } } printf("stdin READ(%u)=%d WRITE(%u)=%d\n", PIPE_READ_FD, stdin_pipe[PIPE_READ_FD], PIPE_WRITE_FD, stdin_pipe[PIPE_WRITE_FD]); printf("stdout READ(%u)=%d WRITE(%u)=%d\n", PIPE_READ_FD, stdout_pipe[PIPE_READ_FD], PIPE_WRITE_FD, stdout_pipe[PIPE_WRITE_FD]); printf("stderr READ(%u)=%d WRITE(%u)=%d\n", PIPE_READ_FD, stderr_pipe[PIPE_READ_FD], PIPE_WRITE_FD, stderr_pipe[PIPE_WRITE_FD]); child_pid = fork(); child_pid = fork(); if (0 == child_pid) { if (0 == child_pid) { /* child */ /* child */ Loading Loading @@ -718,8 +714,6 @@ demo_activity_handler_stdin_cb(EV_P_ ev_io *w, int revents) bytes_written = write(w->fd, state->out_buf, state->out_remaining); bytes_written = write(w->fd, state->out_buf, state->out_remaining); if (bytes_written == -1) { if (bytes_written == -1) { demo_conn_log(5, state->log_conn, "stdin -1"); demo_conn_log(5, state->log_conn, "stdin -1"); if (errno != EAGAIN) if (errno != EAGAIN) ev_break(EV_A_ EVBREAK_ONE); ev_break(EV_A_ EVBREAK_ONE); return; return; Loading @@ -743,6 +737,7 @@ demo_activity_handler_stdout_cb(EV_P_ ev_io *w, int revents) size_t space, new_size; size_t space, new_size; ssize_t bytes_read; ssize_t bytes_read; uint8_t *p; uint8_t *p; demo_conn_log(5, state->log_conn, "Handler stdout event"); demo_conn_log(5, state->log_conn, "Handler stdout event"); /* /* * If we have less than the realloc threshold, extend the buffer. * If we have less than the realloc threshold, extend the buffer. Loading Loading @@ -783,6 +778,7 @@ demo_activity_handler_stderr_cb(EV_P_ ev_io *w, int revents) struct payload_handler_state *state = w->data; struct payload_handler_state *state = w->data; ssize_t bytes_read; ssize_t bytes_read; size_t space; size_t space; demo_conn_log(5, state->log_conn, "Handler stderr event"); demo_conn_log(5, state->log_conn, "Handler stderr event"); /* always leave one byte at the end for a NUL */ /* always leave one byte at the end for a NUL */ space = sizeof(state->err_buf) - state->err_offset - 1; space = sizeof(state->err_buf) - state->err_offset - 1; Loading Loading @@ -969,6 +965,23 @@ demo_activity_process_read_queue(struct demo_connection *conn) return (false); return (false); } while ((selected_match_state != NULL) && (conn->read_queue.head != NULL)); } while ((selected_match_state != NULL) && (conn->read_queue.head != NULL)); /* * We may have added data to the connection's write queue as a * result of the match-action activity processed above, so ensure * the write event is set. */ if (demo_connection_writes_pending(conn)) demo_connection_wait_for(conn, EV_WRITE); /* * If this connection is part of a splice, the above match-action * activity may have queued new data to the other side's write * queue. */ if (conn->splice != NULL) { if (demo_connection_writes_pending(conn->other_side)) demo_connection_wait_for(conn->other_side, EV_WRITE); } return (true); return (true); } } Loading Loading @@ -1006,8 +1019,10 @@ demo_activity_container_match(struct demo_connection *conn, "(%"PRIu64" ?= %" PRIu64")", entry->container_number, "(%"PRIu64" ?= %" PRIu64")", entry->container_number, match->container.param.n); match->container.param.n); if (entry->container_number == match->container.param.n) if (entry->container_number == match->container.param.n) { demo_conn_log(4, conn, "Found container number match"); match_found = true; match_found = true; } break; break; case TLMSP_CFG_MATCH_CONTAINER_PROBABILITY: case TLMSP_CFG_MATCH_CONTAINER_PROBABILITY: demo_conn_log(5, conn, "Checking container probability " demo_conn_log(5, conn, "Checking container probability " Loading @@ -1015,11 +1030,13 @@ demo_activity_container_match(struct demo_connection *conn, match->container.param.p * match_state->containers_inspected); match->container.param.p * match_state->containers_inspected); if ((double)match_state->containers_matched < if ((double)match_state->containers_matched < (match->container.param.p * match_state->containers_inspected)) (match->container.param.p * match_state->containers_inspected)) { demo_conn_log(4, conn, "Found container probability match"); match_found = true; match_found = true; } break; break; case TLMSP_CFG_MATCH_CONTAINER_ALL: case TLMSP_CFG_MATCH_CONTAINER_ALL: demo_conn_log(5, conn, "Matching all containers"); demo_conn_log(4, conn, "Found all-containers match"); match_found = true; match_found = true; break; break; } } Loading Loading @@ -1129,6 +1146,7 @@ demo_activity_pattern_match(struct demo_connection *conn, result = memmem(search_buf, search_len, pattern_buf->p, result = memmem(search_buf, search_len, pattern_buf->p, pattern_buf->len); pattern_buf->len); if (result != NULL) { if (result != NULL) { demo_conn_log(4, conn, "Found static pattern match"); match_found = true; match_found = true; match_offset = result - search_buf; match_offset = result - search_buf; match_len = pattern_buf->len; match_len = pattern_buf->len; Loading Loading @@ -1170,7 +1188,7 @@ demo_activity_pattern_match(struct demo_connection *conn, num_match_groups = pcre2_match(match->pattern.param.regex, num_match_groups = pcre2_match(match->pattern.param.regex, search_buf, search_len, 0, 0, match_data, NULL); search_buf, search_len, 0, 0, match_data, NULL); if (num_match_groups > 0) { if (num_match_groups > 0) { demo_conn_log(5, conn, "Match with %d groups", demo_conn_log(4, conn, "Found regex match with %d groups", num_match_groups); num_match_groups); match_found = true; match_found = true; output = pcre2_get_ovector_pointer(match_data); output = pcre2_get_ovector_pointer(match_data); Loading Loading @@ -1206,8 +1224,6 @@ demo_activity_pattern_match(struct demo_connection *conn, } } if (match_found) { if (match_found) { demo_conn_log(5, conn, "Match found (offset=%zu, length=%zu)", match_offset, match_len); /* /* * Set up the match result range * Set up the match result range */ */ Loading @@ -1222,7 +1238,7 @@ demo_activity_pattern_match(struct demo_connection *conn, ((match_offset + match_len) <= (offset + entry->length))) { ((match_offset + match_len) <= (offset + entry->length))) { match_range->last = entry; match_range->last = entry; match_range->last_remainder = match_range->last_remainder = match_offset + match_len - offset; entry->length - (match_offset + match_len - offset); } } offset += entry->length; offset += entry->length; } } Loading Loading @@ -1415,6 +1431,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con struct container_queue_range *match_range, bool drop_all, struct container_queue_range *match_range, bool drop_all, bool delete_match) bool delete_match) { { SSL *write_q_ssl = write_q->conn->ssl; TLMSP_Container *container, *new_container; TLMSP_Container *container, *new_container; const uint8_t *src; const uint8_t *src; size_t delete_or_forward_bytes; size_t delete_or_forward_bytes; Loading @@ -1439,7 +1456,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "(context=%u, length=%zu)", "(context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); if (!TLMSP_container_delete(write_q->ssl, container)) { if (!TLMSP_container_delete(write_q_ssl, container)) { demo_conn_print_error_ssl_errq(log_conn, demo_conn_print_error_ssl_errq(log_conn, "Failed to add delete preamble container"); "Failed to add delete preamble container"); return (false); return (false); Loading @@ -1465,7 +1482,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "container (context=%u, length=%zu)", "container (context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); TLMSP_container_free(write_q->ssl, container); TLMSP_container_free(write_q_ssl, container); } else { } else { /* deleting or forwarding */ /* deleting or forwarding */ if (delete_match) { if (delete_match) { Loading @@ -1473,7 +1490,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "container (context=%u, length=%zu)", "container (context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); if (!TLMSP_container_delete(write_q->ssl, container)) { if (!TLMSP_container_delete(write_q_ssl, container)) { demo_conn_print_error_ssl_errq(log_conn, demo_conn_print_error_ssl_errq(log_conn, "Failed to add delete preamble/match container"); "Failed to add delete preamble/match container"); return (false); return (false); Loading @@ -1484,6 +1501,10 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "Failed to add preamble/match container to write queue"); "Failed to add preamble/match container to write queue"); return (false); return (false); } } demo_conn_log(2, log_conn, "Queued forwarded premable/match " "container (length=%u) in context %u", TLMSP_container_length(container), TLMSP_container_context(container)); } } container = container_queue_head(read_q); container = container_queue_head(read_q); } } Loading @@ -1499,7 +1520,7 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con delete_or_forward_bytes = TLMSP_container_length(container) - delete_or_forward_bytes = TLMSP_container_length(container) - match_range->last_remainder; match_range->last_remainder; if (!drop_all && !delete_match) { if (!drop_all && !delete_match) { if (!TLMSP_container_create(write_q->ssl, if (!TLMSP_container_create(write_q_ssl, &new_container, &new_container, TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_get_data(container), TLMSP_container_get_data(container), Loading @@ -1515,6 +1536,10 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "of match data to write queue"); "of match data to write queue"); return (false); return (false); } } demo_conn_log(2, log_conn, "Queued partial match data " "container (length=%u) in context %u", TLMSP_container_length(new_container), TLMSP_container_context(new_container)); } } /* remove leading data on container staying in queue */ /* remove leading data on container staying in queue */ src = TLMSP_container_get_data(container); src = TLMSP_container_get_data(container); Loading @@ -1526,14 +1551,14 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "(context=%u, length=%zu)", "(context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); TLMSP_container_free(write_q->ssl, container); TLMSP_container_free(write_q_ssl, container); } else { } else { if (delete_match) { if (delete_match) { demo_conn_log(5, log_conn, "Deleting final match container " demo_conn_log(5, log_conn, "Deleting final match container " "(context=%u, length=%zu)", "(context=%u, length=%zu)", TLMSP_container_context(container), TLMSP_container_context(container), TLMSP_container_length(container)); TLMSP_container_length(container)); if (!TLMSP_container_delete(write_q->ssl, container)) { if (!TLMSP_container_delete(write_q_ssl, container)) { demo_conn_print_error_ssl_errq(log_conn, demo_conn_print_error_ssl_errq(log_conn, "Failed to delete complete container for " "Failed to delete complete container for " "tail end of match data"); "tail end of match data"); Loading @@ -1546,6 +1571,10 @@ demo_activity_apply_match_delete_drop_or_forward(struct demo_connection *log_con "match data to write queue"); "match data to write queue"); return (false); return (false); } } demo_conn_log(2, log_conn, "Queued forwarded final match data " "container (length=%u) in context %u", TLMSP_container_length(container), TLMSP_container_context(container)); } } } } Loading
libdemo/connection.c +19 −7 Original line number Original line Diff line number Diff line Loading @@ -16,6 +16,12 @@ #include "splice.h" #include "splice.h" #include "print.h" #include "print.h" /* XXX make configurable */ #define READ_QUEUE_MAX_IDLE_MS 100 #define READ_QUEUE_MAX_DEPTH_BYTES (1*1024*1024) static void demo_conn_connected_cb(EV_P_ ev_io *w, int revents); static void demo_conn_connected_cb(EV_P_ ev_io *w, int revents); Loading Loading @@ -96,8 +102,14 @@ demo_connection_init_io(struct demo_connection *conn, SSL_CTX *ssl_ctx, int sock return (false); return (false); } } container_queue_init(&conn->read_queue, conn->ssl); /* container_queue_init(&conn->write_queue, conn->ssl); * Read queues on endpoints do not have an idle timer, * but middlebox read queues do. */ container_queue_init(&conn->read_queue, conn, (conn->splice == NULL) ? 0 : READ_QUEUE_MAX_IDLE_MS, READ_QUEUE_MAX_DEPTH_BYTES); container_queue_init(&conn->write_queue, conn, 0, 0); if (!demo_activity_conn_queue_initial(conn)) { if (!demo_activity_conn_queue_initial(conn)) { demo_conn_print_error(conn, demo_conn_print_error(conn, Loading Loading @@ -179,11 +191,11 @@ demo_connection_events_arrived(struct demo_connection *conn, int events) { { if (events & EV_ERROR) if (events & EV_ERROR) demo_conn_log(3, conn, "ERROR event"); demo_conn_log(5, conn, "ERROR event"); if (events & EV_READ) if (events & EV_READ) demo_conn_log(3, conn, "READ event"); demo_conn_log(5, conn, "READ event"); if (events & EV_WRITE) if (events & EV_WRITE) demo_conn_log(3, conn, "WRITE event"); demo_conn_log(5, conn, "WRITE event"); conn->wait_events &= ~events; conn->wait_events &= ~events; } } Loading @@ -200,9 +212,9 @@ demo_connection_wait_for(struct demo_connection *conn, int events) { { if (events & EV_READ) if (events & EV_READ) demo_conn_log(3, conn, "wait for readable"); demo_conn_log(5, conn, "Wait for readable"); if (events & EV_WRITE) if (events & EV_WRITE) demo_conn_log(3, conn, "wait for writable"); demo_conn_log(5, conn, "Wait for writable"); conn->wait_events |= events; conn->wait_events |= events; } } Loading
libdemo/container_queue.c +71 −4 Original line number Original line Diff line number Diff line Loading @@ -7,23 +7,69 @@ #include <stdlib.h> #include <stdlib.h> #include "connection.h" #include "container_queue.h" #include "container_queue.h" #include "print.h" #include "splice.h" static void container_queue_idle_cb(EV_P_ ev_timer *w, int revents); void void container_queue_init(struct container_queue *q, SSL *ssl) container_queue_init(struct container_queue *q, struct demo_connection *conn, unsigned int max_idle, size_t max_depth) { { q->ssl = ssl; q->conn = conn; q->max_idle = max_idle; q->max_depth = max_depth; q->head = NULL; q->head = NULL; q->tail = NULL; q->tail = NULL; q->length = 0; q->length = 0; q->container_counter = 0; q->container_counter = 0; /* XXX not yet */ q->max_idle = 0; if (q->max_idle != 0) { ev_init(&q->idle_timer, container_queue_idle_cb); q->idle_timer.repeat = (double)q->max_idle / 1000.0; q->idle_timer.data = q; } } static void container_queue_idle_cb(EV_P_ ev_timer *w, int revents) { struct container_queue *q = w->data; struct demo_connection *other_side = q->conn->other_side; struct demo_splice *splice = q->conn->splice; TLMSP_Container *container; /* This should only be getting enabled on a middlebox */ if (splice != NULL) { demo_conn_log(2, q->conn, "Read queue idle timer expired, " "forwarding all containers in queue"); demo_splice_stop_io(splice); /* * On a middlebox, we've exceeded our time window for * matching, so forward what has been accumulated. */ container = container_queue_head(q); while (container != NULL) { container_queue_remove_head(q); container_queue_add(&other_side->write_queue, container); container = container_queue_head(q); } demo_connection_wait_for(other_side, EV_WRITE); demo_splice_resume_io(splice); } } } bool bool container_queue_add(struct container_queue *q, TLMSP_Container *container) container_queue_add(struct container_queue *q, TLMSP_Container *container) { { struct ev_loop *loop = q->conn->loop; struct container_queue_entry *entry; struct container_queue_entry *entry; entry = malloc(sizeof(*entry)); entry = malloc(sizeof(*entry)); Loading @@ -43,6 +89,10 @@ container_queue_add(struct container_queue *q, TLMSP_Container *container) q->tail = entry; q->tail = entry; q->length += entry->length; q->length += entry->length; if (q->max_idle != 0) { ev_timer_again(EV_A_ &q->idle_timer); } return (true); return (true); } } Loading @@ -63,14 +113,23 @@ container_queue_head_entry(struct container_queue *q) TLMSP_Container * TLMSP_Container * container_queue_remove_head(struct container_queue *q) container_queue_remove_head(struct container_queue *q) { { struct ev_loop *loop = q->conn->loop; struct container_queue_entry *entry; struct container_queue_entry *entry; TLMSP_Container *container = NULL; TLMSP_Container *container = NULL; entry = q->head; entry = q->head; if (entry != NULL) { if (entry != NULL) { q->head = entry->next; q->head = entry->next; if (q->head == NULL) if (q->head == NULL) { q->tail = NULL; q->tail = NULL; if (q->max_idle != 0) { /* * Queue is empty, no need for an idle timer. */ ev_timer_stop(EV_A_ &q->idle_timer); } } q->length -= entry->length; q->length -= entry->length; container = entry->container; container = entry->container; free(entry); free(entry); Loading @@ -82,12 +141,20 @@ container_queue_remove_head(struct container_queue *q) void void container_queue_drain(struct container_queue *q) container_queue_drain(struct container_queue *q) { { struct ev_loop *loop = q->conn->loop; TLMSP_Container *container; TLMSP_Container *container; container = container_queue_head(q); container = container_queue_head(q); while (container != NULL) { while (container != NULL) { container_queue_remove_head(q); container_queue_remove_head(q); TLMSP_container_free(q->ssl, container); TLMSP_container_free(q->conn->ssl, container); container = container_queue_head(q); container = container_queue_head(q); } } if (q->max_idle != 0) { /* * Queue is empty, no need for an idle timer. */ ev_timer_stop(EV_A_ &q->idle_timer); } } }