Commit d20fa8ce authored by kelsey's avatar kelsey
Browse files

Improvements to documentation, logging, connection error handling behavior,...

Improvements to documentation, logging, connection error handling behavior, and container queue management
parent 1c30be19
Loading
Loading
Loading
Loading
+8 −3
Original line number Diff line number Diff line
@@ -175,11 +175,16 @@ Under Linux, one has to resort to `kill -SIGUSR1 <pid>` from another terminal.

The apache-httpd configuration is installed at `<install_dir>/etc/apache24/httpd.conf`
and includes `<install_dir>/etc/apache24/httpd_tlmsp.conf`, which in turn 
defines two virtual hosts:
defines several virtual hosts:

* One on `localhost:4443` configured to use `${TLMSP_UCL}/apache.ucl`
* One on `localhost:4444` configured to use `${TLMSP_UCL}/apache.1mbox.ucl`
| Listen address | Configuration file              |
|----------------|---------------------------------|
| localhost:4443 | ${TLMSP_UCL}/apache.ucl         |
| localhost:4444 | ${TLMSP_UCL}/apache.1mbox.ucl   |
| localhost:4445 | ${TLMSP_UCL}/apache.2mbox.ucl   |
| localhost:4446 | ${TLMSP_UCL}/apache.251mbox.ucl |

<br>
The apache-httpd TLMSP integration requires the UCL file to define two contexts,
one with the tag 'header' and one with the tag 'body', into which it places
response headers and bodies, respectively.
+1 −0
Original line number Diff line number Diff line
@@ -270,6 +270,7 @@ if [ "${do_configure}" = "yes" ]; then
    announce "Configuring OpenSSL"
    require_success ./config \
		    ${debug:+-d} \
		    --strict-warnings \
		    --prefix=${install_dir}
fi
announce "Building OpenSSL"
+32 −13
Original line number Diff line number Diff line
@@ -421,7 +421,6 @@ connection_died(struct demo_connection *conn)
	struct client_state *client = conn_state->client;

	demo_connection_stop_io(conn);
	demo_connection_wait_for_none(conn);

	/*
	 * If we arrived here due to SSL_ERROR_WANT_RECONNECT, there will be
@@ -508,6 +507,7 @@ read_containers(struct demo_connection *conn)
	int ssl_result;
	int ssl_error;
	TLMSP_Container *container;
	size_t length;
	tlmsp_context_id_t context_id;

	/*
@@ -535,13 +535,29 @@ read_containers(struct demo_connection *conn)
	} else {
		ssl_result = TLMSP_container_read(ssl, &container);
		if (ssl_result > 0) {
			demo_conn_log(2, conn, "Received container (length=%u) "
			    "in context %u using container API",
			    TLMSP_container_length(container),
			    TLMSP_container_context(container));
			if (!container_queue_add(&conn->read_queue, container)) {
			context_id = TLMSP_container_context(container);
			length = TLMSP_container_length(container);
			if (TLMSP_container_deleted(container)) {
				demo_conn_log(2, conn, "Received deleted "
				    "container in context %u using container API",
				    context_id);
				TLMSP_container_free(ssl, container);
				return (true);
			} else if (!TLMSP_container_readable(container)) {
				demo_conn_print_error(conn,
				    "Opaque container unexpectedly received in "
				    "context %d using containe API", context_id);
				TLMSP_container_free(ssl, container);
				return (false);
			} else {
				demo_conn_log(2, conn, "Received container "
				    "(length=%u) in context %u using container "
				    "API", length, context_id);
				if (!container_queue_add(&conn->read_queue,
					container)) {
					TLMSP_container_free(ssl, container);
					return (false);
				}
			}
		}
	}
@@ -597,6 +613,7 @@ write_containers(struct demo_connection *conn)
	struct client_state *client = conn_state->client;
	SSL *ssl = conn->ssl;
	TLMSP_Container *container;
	size_t length;
	tlmsp_context_id_t context_id;
	int result;
	int ssl_result;
@@ -605,12 +622,12 @@ write_containers(struct demo_connection *conn)
	container = container_queue_head(&conn->write_queue);
	while (container != NULL) {
		context_id = TLMSP_container_context(container);
		length = TLMSP_container_length(container);
		demo_conn_log(2, conn, "Sending container (length=%u) "
		    "in context %u using %s API",
		    TLMSP_container_length(container), context_id,
		    "in context %u using %s API", length, context_id,
		    client->use_stream_api ? "stream" : "container");
		demo_conn_log_buf(3, conn, TLMSP_container_get_data(container),
		    TLMSP_container_length(container), true, "Container data");
		    length, true, "Container data");
		if (client->use_stream_api) {
			if (!TLMSP_set_current_context(ssl, context_id)) {
				demo_conn_print_error(conn,
@@ -619,8 +636,7 @@ write_containers(struct demo_connection *conn)
				return (false);
			}
			ssl_result = SSL_write(ssl,
			    TLMSP_container_get_data(container),
			    TLMSP_container_length(container));
			    TLMSP_container_get_data(container), length);
			if (ssl_result > 0) {
				TLMSP_container_free(ssl, container);
			}
@@ -628,12 +644,15 @@ write_containers(struct demo_connection *conn)
			ssl_result = TLMSP_container_write(ssl, container);
		}
		if (ssl_result > 0) {
			demo_conn_log(2, conn, "Container send complete (result=%d)", ssl_result);
			demo_conn_log(2, conn,
			    "Container send complete (result=%d)", ssl_result);
			container_queue_remove_head(&conn->write_queue);
			container = container_queue_head(&conn->write_queue);
		} else
		} else {
			demo_conn_log(2, conn, "Channel not ready");
			break;
		}
	}
	result = true;
	if (ssl_result <= 0) {
		ssl_error = SSL_get_error(ssl, ssl_result);
+80 −152
Original line number Diff line number Diff line
@@ -89,20 +89,9 @@ struct payload_handler_state {
};


static bool demo_activity_queue_initial(struct demo_connection *conn,
                                        struct tlmsp_cfg_activity **activities,
                                        unsigned int num_activities,
                                        bool replies);
static bool demo_activity_set_up_time_triggered(struct demo_connection *conn,
                                                struct tlmsp_cfg_activity **activities,
                                                unsigned int num_activities,
                                                bool replies);
static void demo_activity_get_activities(struct demo_connection *conn,
                                         bool replies,
                                         struct tlmsp_cfg_activity ***activities,
                                         unsigned int *num_activities);
static bool demo_activity_queue_initial(struct demo_connection *from_conn,
                                        bool sends, bool replies);
static bool demo_activity_queue_payload(struct demo_connection *conn,
                                        bool replies,
                                        struct tlmsp_cfg_payload *payload,
                                        struct match_groups *match_groups,
                                        bool present);
@@ -136,7 +125,7 @@ static bool demo_activity_queue_range_to_buffer(struct container_queue_range *ra
                                                uint8_t **buf, size_t *len);
static bool demo_activity_apply_match_actions(struct demo_connection *inbound_conn,
                                              struct demo_activity_match_state *match_state);
static bool demo_activity_apply_actions(struct demo_connection *conn,
static bool demo_activity_apply_actions(struct demo_connection *activity_conn,
                                        struct tlmsp_cfg_activity *activity,
                                        struct match_groups *match_groups, bool sends,
                                        bool replies);
@@ -159,24 +148,28 @@ static struct demo_connection *demo_activity_get_dataflow_conn(struct demo_conne
 * this connection, and if so, add them to the write queue.
 */
bool
demo_activity_conn_queue_initial(struct demo_connection *conn)
demo_activity_conn_queue_initial(struct demo_connection *to_conn)
{
	struct tlmsp_cfg_activity **activities;
	unsigned int num_activities;
	bool is_middlebox;

	is_middlebox = (to_conn->splice != NULL);

	demo_activity_get_activities(conn, false, &activities, &num_activities);
	if (!demo_activity_queue_initial(conn, activities, num_activities, false))
	/*
	 * Queue initial containers to this connection originating from this
	 * connection's activities.  For an endpoint that includes both
	 * sends and replies (which are synonymous), and for a middlebox
	 * that only includes replies.
	 */
	if (!demo_activity_queue_initial(to_conn, !is_middlebox, true))
		return (false);

	if (conn->splice != NULL) {
	if (is_middlebox) {
		/*
		 * On a middlebox, this connection can have initial data
		 * queued to it due to defined reply activities.
		 * On a middlebox, queue initial containers to this
		 * connection originating from the other-side connection,
		 * which would only be sends.
		 */
		demo_activity_get_activities(conn, true, &activities,
		    &num_activities);
		if (!demo_activity_queue_initial(conn, activities,
			num_activities, true))
		if (!demo_activity_queue_initial(to_conn->other_side, true, false))
			return (false);
	}

@@ -184,31 +177,23 @@ demo_activity_conn_queue_initial(struct demo_connection *conn)
}

static bool
demo_activity_queue_initial(struct demo_connection *conn,
    struct tlmsp_cfg_activity **activities, unsigned int num_activities,
demo_activity_queue_initial(struct demo_connection *from_conn, bool sends,
    bool replies)
{
	struct tlmsp_cfg_activity *activity;
	struct demo_connection *apply_conn;
	unsigned int i;

	/*
	 * The conn here is the conn to queue writes to, but the
	 * conn that demo_activity_apply_actions() requires is the
	 * source conn.
	 */
	apply_conn  = demo_activity_get_dataflow_conn(conn, replies);
	for (i = 0; i < num_activities; i++) {
		activity = activities[i];
	for (i = 0; i < from_conn->num_activities; i++) {
		activity = from_conn->activities[i];
		if (!activity->match.initial)
			continue;

		if (activity->present)
			demo_conn_present(conn, "Activity %s initial",
			demo_conn_present(from_conn, "Activity %s initial",
			    activity->tag);

		if (!demo_activity_apply_actions(apply_conn, activity, NULL,
			!replies, replies))
		if (!demo_activity_apply_actions(from_conn, activity, NULL,
			sends, replies))
			return (false);
	}

@@ -216,79 +201,19 @@ demo_activity_queue_initial(struct demo_connection *conn,
}

/*
 * Determine if there are any containers that are to be sent on this
 * connection on a time-triggered basis, and if so, set up timers for them.
 * Arrange for all time-triggered containers originating from this
 * connection's activities to be queued at the proper time.
 */
bool
demo_activity_conn_set_up_time_triggered(struct demo_connection *conn)
{
	struct tlmsp_cfg_activity **activities;
	unsigned int num_activities;

	demo_activity_get_activities(conn, false, &activities, &num_activities);
	if (!demo_activity_set_up_time_triggered(conn, activities, num_activities,
		false))
		return (false);

	if (conn->splice != NULL) {
		/*
		 * Middleboxes have an 'other side' for each connection that
		 * messages can be sent on.  If the other side connection
		 * has any time-triggered activities, set them up for this
		 * connection now.
		 */
		demo_activity_get_activities(conn, true, &activities,
		    &num_activities);
		if (!demo_activity_set_up_time_triggered(conn->other_side,
			activities, num_activities, true))
			return (false);
	}

	return (true);
}

void
demo_activity_conn_tear_down_time_triggered(struct demo_connection *conn)
{
	struct ev_loop *loop = conn->loop;
	struct demo_time_triggered_msg *msg, *next;

	msg = conn->time_triggered_messages;
	while (msg != NULL) {
		next = msg->next;
		ev_timer_stop(EV_A_ &msg->timer);
		free(msg);
		msg = next;
	}
}

bool
demo_activity_conn_start_time_triggered(struct demo_connection *conn)
{
	struct ev_loop *loop = conn->loop;
	struct demo_time_triggered_msg *msg;

	msg = conn->time_triggered_messages;
	while (msg != NULL) {
		ev_timer_start(EV_A_ &msg->timer);
		msg = msg->next;
	}

	return (true);
}

static bool
demo_activity_set_up_time_triggered(struct demo_connection *conn,
    struct tlmsp_cfg_activity **activities, unsigned int num_activities,
    bool replies)
demo_activity_conn_set_up_time_triggered(struct demo_connection *from_conn)
{
	struct tlmsp_cfg_activity *activity;
	struct demo_time_triggered_msg *msg;
	unsigned int i;
	ev_tstamp at, every;

	for (i = 0; i < num_activities; i++) {
		activity = activities[i];
	for (i = 0; i < from_conn->num_activities; i++) {
		activity = from_conn->activities[i];

		/*
		 * skip non-time triggered messages and initial-only time-
@@ -301,7 +226,7 @@ demo_activity_set_up_time_triggered(struct demo_connection *conn,
		if (msg == NULL)
			return (false);

		msg->conn = conn;
		msg->conn = from_conn;
		at = activity->match.at;
		every = activity->match.every;
		ev_timer_init(&msg->timer, demo_activity_time_triggered_cb,
@@ -311,45 +236,57 @@ demo_activity_set_up_time_triggered(struct demo_connection *conn,
		msg->activity = activity;

		if (every == 0.0)
			demo_conn_log(2, conn, "Adding one-shot activity %s at "
			demo_conn_log(2, from_conn, "Adding one-shot activity %s at "
			    "%u ms", activity->tag, (unsigned int)(at * 1000.0));
		else
			demo_conn_log(2, conn, "Adding periodic activity %s at "
			demo_conn_log(2, from_conn, "Adding periodic activity %s at "
			    "%u ms, period %u ms", activity->tag,
			    (unsigned int)(((at == 0.0) ? every : at) * 1000.0),
			    (unsigned int)(every * 1000.0));

		if (conn->time_triggered_messages == NULL)
			conn->time_triggered_messages = msg;
		if (from_conn->time_triggered_messages == NULL)
			from_conn->time_triggered_messages = msg;
		else {
			msg->next = conn->time_triggered_messages;
			conn->time_triggered_messages = msg;
			msg->next = from_conn->time_triggered_messages;
			from_conn->time_triggered_messages = msg;
		}
	}

	return (true);
}

/*
 * For the given connection, retrieve the list of activities that can send
 * data on it via time triggers or send-* actions (replies == false) or via
 * reply actions (replies == true).  The replies == true case only applies
 * to splices, as non-splices can only send via time triggers or send-*.
 */
static void
demo_activity_get_activities(struct demo_connection *conn, bool replies,
    struct tlmsp_cfg_activity ***activities, unsigned int *num_activities)
void
demo_activity_conn_tear_down_time_triggered(struct demo_connection *from_conn)
{
	struct ev_loop *loop = from_conn->loop;
	struct demo_time_triggered_msg *msg, *next;

	msg = from_conn->time_triggered_messages;
	while (msg != NULL) {
		next = msg->next;
		ev_timer_stop(EV_A_ &msg->timer);
		free(msg);
		msg = next;
	}
}

bool
demo_activity_conn_start_time_triggered(struct demo_connection *from_conn)
{
	struct demo_connection *source_conn;
	struct ev_loop *loop = from_conn->loop;
	struct demo_time_triggered_msg *msg;

	source_conn = demo_activity_get_dataflow_conn(conn, replies);
	msg = from_conn->time_triggered_messages;
	while (msg != NULL) {
		ev_timer_start(EV_A_ &msg->timer);
		msg = msg->next;
	}

	*activities = source_conn->activities;
	*num_activities = source_conn->num_activities;
	return (true);
}

static bool
demo_activity_queue_payload(struct demo_connection *conn, bool replies,
demo_activity_queue_payload(struct demo_connection *conn,
    struct tlmsp_cfg_payload *payload, struct match_groups *match_groups,
    bool present)
{
@@ -368,10 +305,6 @@ demo_activity_queue_payload(struct demo_connection *conn, bool replies,
	if (payload->type == TLMSP_CFG_PAYLOAD_NONE)
		return (true);

	/* skip if processing replies and this isn't one */
	if (replies && !payload->reply)
		return (true);

	data = NULL;
	free_data = false;
	result = false;
@@ -1395,43 +1328,38 @@ demo_activity_apply_match_actions(struct demo_connection *inbound_conn,
}

static bool
demo_activity_apply_actions(struct demo_connection *conn,
demo_activity_apply_actions(struct demo_connection *activity_conn,
    struct tlmsp_cfg_activity *activity, struct match_groups *match_groups,
    bool sends, bool replies)
{
	struct tlmsp_cfg_action *action;
	struct demo_connection *outbound_conn_for_replies;
	struct demo_connection *send_conn, *reply_conn;
	struct demo_connection *outbound_conn;
	struct demo_connection *send_conn;
	bool reply_filter;
	unsigned int i;

	outbound_conn = demo_activity_get_dataflow_conn(conn, false);
	outbound_conn_for_replies = demo_activity_get_dataflow_conn(conn, true);

	send_conn = demo_activity_get_dataflow_conn(activity_conn, false);
	reply_conn = demo_activity_get_dataflow_conn(activity_conn, true);
	for (i = 0; i < activity->num_actions; i++) {
		action = &activity->actions[i];

		/* XXX fault support */

		if (TLMSP_CFG_PAYLOAD_ENABLED(&action->send)) {
			demo_conn_log(5, conn, "Applying '%s' action",
			/* Skip types that aren't to be evaluated */
			if (!((sends && !action->send.reply) ||
				(replies && action->send.reply)))
				continue;

			demo_conn_log(5, activity_conn, "Applying '%s' action",
			    action->send.reply ? "reply" : "send");
			send_conn = action->send.reply ?
			    outbound_conn_for_replies : outbound_conn;
			if (sends && replies)
				reply_filter = action->send.reply;
			else if (sends)
				reply_filter = false;
			else
				reply_filter = true;
			if (!demo_activity_queue_payload(send_conn,
				reply_filter, &action->send,
				match_groups, activity->present))
			outbound_conn = action->send.reply ?
			    reply_conn : send_conn;
			if (!demo_activity_queue_payload(outbound_conn,
				&action->send, match_groups, activity->present))
				return (false);
		} else if (action->renegotiate) {
			if (!SSL_renegotiate_abbreviated(conn->ssl)) {
				demo_conn_print_error_ssl_errq(conn,
			if (!SSL_renegotiate_abbreviated(activity_conn->ssl)) {
				demo_conn_print_error_ssl_errq(activity_conn,
				    "Failed to schedule a renegotiation");
				return (false);
			}
+4 −4
Original line number Diff line number Diff line
@@ -15,10 +15,10 @@ struct demo_activity_match_state;
struct tlmsp_cfg;
struct tlmsp_cfg_activity;

bool demo_activity_conn_queue_initial(struct demo_connection *conn);
bool demo_activity_conn_set_up_time_triggered(struct demo_connection *conn);
bool demo_activity_conn_start_time_triggered(struct demo_connection *conn);
void demo_activity_conn_tear_down_time_triggered(struct demo_connection *conn);
bool demo_activity_conn_queue_initial(struct demo_connection *to_conn);
bool demo_activity_conn_set_up_time_triggered(struct demo_connection *from_conn);
bool demo_activity_conn_start_time_triggered(struct demo_connection *from_conn);
void demo_activity_conn_tear_down_time_triggered(struct demo_connection *from_conn);
struct demo_activity_match_state *demo_activity_create_match_state(struct tlmsp_cfg_activity **activities,
                                                                   unsigned int num_activities);
bool demo_activity_process_read_queue(struct demo_connection *conn);
Loading