Merge pull request #349 from lf-lang/rti-DNET
Downstream next event tag (DNET), a new signal for more efficient centralized federated execution
byeonggiljun authored Jan 24, 2025
2 parents 906be3f + 31d28e2 commit 856e98a
Showing 14 changed files with 667 additions and 260 deletions.
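In brief (paraphrasing the header comments added below in rti_common.h): a DNET is a signal the RTI sends to a federate, giving roughly the latest tag for which next event tag (NET) signals from that federate are still unnecessary for granting tag advances to its downstream federates. It is computed from the downstream federates' most recent NETs minus the minimum connection delays on the paths to them. As an illustrative example with made-up numbers, if a federate's only downstream neighbor has its next event at 100 ms and the minimum delay on the path between them is 10 ms, the RTI can send that federate a DNET just under 90 ms; NET signals at tags up to that point cannot enable any downstream tag advance grant, so the federate can refrain from sending them (the federate-side handling is outside this diff). The result is fewer signals exchanged during centralized coordination.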
9 changes: 6 additions & 3 deletions core/federated/RTI/main.c
@@ -136,6 +136,7 @@ void usage(int argc, const char* argv[]) {
lf_print(" clock sync attempt (default is 10). Applies to 'init' and 'on'.\n");
lf_print(" -a, --auth Turn on HMAC authentication options.\n");
lf_print(" -t, --tracing Turn on tracing.\n");
lf_print(" -d, --disable_dnet Turn off the use of DNET signals.\n");

lf_print("Command given:");
for (int i = 0; i < argc; i++) {
@@ -263,6 +264,8 @@ int process_args(int argc, const char* argv[]) {
rti.authentication_enabled = true;
} else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) {
rti.base.tracing_enabled = true;
} else if (strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--dnet_disabled") == 0) {
rti.base.dnet_disabled = true;
} else if (strcmp(argv[i], " ") == 0) {
// Tolerate spaces
continue;
@@ -316,9 +319,9 @@ int main(int argc, const char* argv[]) {
assert(rti.base.number_of_scheduling_nodes < UINT16_MAX);

// Allocate memory for the federates
rti.base.scheduling_nodes =
(scheduling_node_t**)calloc(rti.base.number_of_scheduling_nodes, sizeof(scheduling_node_t*));
for (uint16_t i = 0; i < rti.base.number_of_scheduling_nodes; i++) {
int n = rti.base.number_of_scheduling_nodes;
rti.base.scheduling_nodes = (scheduling_node_t**)calloc(n, sizeof(scheduling_node_t*));
for (uint16_t i = 0; i < n; i++) {
federate_info_t* fed_info = (federate_info_t*)calloc(1, sizeof(federate_info_t));
initialize_federate(fed_info, i);
rti.base.scheduling_nodes[i] = (scheduling_node_t*)fed_info;
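As an illustrative usage (assuming the other RTI options are unchanged from before), launching the RTI with something like `RTI -n 2 -d` or `RTI -n 2 --disable_dnet` runs a two-federate federation with DNET signals turned off, falling back to the previous NET/TAG-only behavior.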
296 changes: 205 additions & 91 deletions core/federated/RTI/rti_common.c

Large diffs are not rendered by default.

92 changes: 66 additions & 26 deletions core/federated/RTI/rti_common.h
@@ -53,17 +53,16 @@ typedef struct scheduling_node_t {
tag_t last_granted; // The maximum TAG that has been granted so far (or NEVER if none granted)
tag_t last_provisionally_granted; // The maximum PTAG that has been provisionally granted (or NEVER if none granted)
tag_t next_event; // Most recent NET received from the scheduling node (or NEVER if none received).
tag_t last_DNET; // Most recent DNET.
scheduling_node_state_t state; // State of the scheduling node.
uint16_t* upstream; // Array of upstream scheduling node ids.
interval_t* upstream_delay; // Minimum delay on connections from upstream scheduling nodes.
// Here, NEVER encodes no delay. 0LL is a microstep delay.
int num_upstream; // Size of the array of upstream scheduling nodes and delays.
uint16_t* downstream; // Array of downstream scheduling node ids.
int num_downstream; // Size of the array of downstream scheduling nodes.
execution_mode_t mode; // FAST or REALTIME.
minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node.
size_t num_min_delays; // Size of min_delays array.
int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE
uint16_t* immediate_upstreams; // Array of immediate upstream scheduling node ids.
interval_t* immediate_upstream_delays; // Minimum delay on connections from immediate upstream scheduling nodes.
// Here, NEVER encodes no delay. 0LL is a microstep delay.
uint16_t num_immediate_upstreams; // Size of the array of immediate upstream scheduling nodes and delays.
uint16_t* immediate_downstreams; // Array of immediate downstream scheduling node ids.
uint16_t num_immediate_downstreams; // Size of the array of immediate downstream scheduling nodes.
execution_mode_t mode; // FAST or REALTIME.
int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE
} scheduling_node_t;

/**
@@ -76,7 +75,13 @@ typedef struct rti_common_t {
scheduling_node_t** scheduling_nodes;

// Number of scheduling nodes
int32_t number_of_scheduling_nodes;
uint16_t number_of_scheduling_nodes;

// Matrix of minimum delays between pairs of nodes.
// Rows represent upstream nodes and Columns represent downstream nodes.
// FOREVER_TAG means there is no path, and ZERO_TAG means there is no delay.
// This could be NULL if the matrix is not being used, so accesses should test for NULL first.
tag_t* min_delays;

// RTI's decided stop tag for the scheduling nodes
tag_t max_stop_tag;
@@ -87,6 +92,9 @@
// Boolean indicating that tracing is enabled.
bool tracing_enabled;

// Boolean indicating that DNET is enabled.
bool dnet_disabled;

// The RTI mutex for making thread-safe access to the shared state.
lf_mutex_t* mutex;
} rti_common_t;
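A minimal sketch of how this flattened matrix might be consulted; the row-major indexing with the upstream node as the row is an assumption based on the comment above, and the actual accessors live in rti_common.c, whose diff is not rendered on this page:

    // Hypothetical lookup helper (indexing scheme assumed, not confirmed by this diff).
    static tag_t min_delay_between(rti_common_t* rti, uint16_t upstream_id, uint16_t downstream_id) {
      if (rti->min_delays == NULL) {
        return FOREVER_TAG; // Matrix not computed or invalidated; treat as "no known path".
      }
      return rti->min_delays[(size_t)upstream_id * rti->number_of_scheduling_nodes + downstream_id];
    }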
@@ -246,34 +254,66 @@ tag_t earliest_future_incoming_message_tag(scheduling_node_t* e);
tag_t eimt_strict(scheduling_node_t* e);

/**
* Return true if the node is in a zero-delay cycle.
* If necessary, update the `min_delays` and the fields that indicate cycles.
* These fields will be updated only if they have not been previously updated or if invalidate_min_delays
* has been called since they were last updated.
* @param node The node.
*/
bool is_in_zero_delay_cycle(scheduling_node_t* node);
void update_min_delays();
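One conventional way to fill such an all-pairs matrix is a shortest-path relaxation over the connection graph. The sketch below is conceptual only: the actual algorithm in rti_common.c is not rendered in this diff and may well differ, and connection_delay_to_tag is a hypothetical helper that maps a connection delay to a tag following the conventions above (NEVER means no delay, 0LL means one microstep, and a positive delay d becomes the tag (d, 0)):

    // Conceptual sketch only; not the committed implementation.
    static void fill_min_delays_all_pairs(rti_common_t* rti) {
      size_t n = rti->number_of_scheduling_nodes;
      for (size_t k = 0; k < n * n; k++) rti->min_delays[k] = FOREVER_TAG; // No path by default.
      // Seed the matrix with the immediate upstream connections of every node.
      for (uint16_t j = 0; j < n; j++) {
        scheduling_node_t* node = rti->scheduling_nodes[j];
        for (uint16_t u = 0; u < node->num_immediate_upstreams; u++) {
          uint16_t i = node->immediate_upstreams[u];
          tag_t d = connection_delay_to_tag(node->immediate_upstream_delays[u]); // Hypothetical helper.
          if (lf_tag_compare(d, rti->min_delays[i * n + j]) < 0) rti->min_delays[i * n + j] = d;
        }
      }
      // Relax paths through each intermediate node (Floyd-Warshall style).
      for (size_t k = 0; k < n; k++) {
        for (size_t i = 0; i < n; i++) {
          for (size_t j = 0; j < n; j++) {
            tag_t through = lf_tag_add(rti->min_delays[i * n + k], rti->min_delays[k * n + j]);
            if (lf_tag_compare(through, rti->min_delays[i * n + j]) < 0) {
              rti->min_delays[i * n + j] = through;
            }
          }
        }
      }
    }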

/**
* Return true if the node is in a cycle (possibly a zero-delay cycle).
* @param node The node.
* Return the latest tag g that satisfies lf_tag_add(g, minimum_delay) < next_event_tag.
* In other words, this function behaves like the tag subtraction next_event_tag - minimum_delay.
* minimum_delay cannot be NEVER.
*
* This function is called in function downstream_next_event_tag.
* @param next_event_tag The next event tag of a downstream node.
* @param minimum_delay The minimum delay between the target upstream node and the downstream node.
*/
bool is_in_cycle(scheduling_node_t* node);
tag_t get_dnet_candidate(tag_t next_event_tag, tag_t minimum_delay);
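For example, assuming the usual tag arithmetic in which adding a positive time delay resets the microstep: with next_event_tag = (100 ms, 0) and minimum_delay = (10 ms, 0), the returned candidate is the latest tag that is still strictly more than 10 ms before (100 ms, 0), i.e., a tag just below 90 ms, because (90 ms, 0) plus the delay would land exactly on (100 ms, 0) and not strictly before it.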

/**
* For the given scheduling node (enclave or federate), if necessary, update the `min_delays`,
* `num_min_delays`, and the fields that indicate cycles. These fields will be
* updated only if they have not been previously updated or if invalidate_min_delays_upstream
* has been called since they were last updated.
* @brief Determine whether the specified scheduling node needs to receive a downstream next event tag (DNET)
* and, if so, return its value.
*
* This function is called upon receiving a NET from one of the specified node's downstream nodes.
*
* It computes the minimum tag M, over all downstream scheduling nodes, of each downstream node's most recent
* NET minus the minimum "after" delay on the path to that node (see get_dnet_candidate). If M is earlier than
* the startup tag, the result is NEVER_TAG.
*
* @param node The target node that may receive a new DNET.
* @param node_sending_new_net_id The ID of the node that sent the new NET. If that node's new NET cannot
* change the DNET value, this function can return immediately. If it can, the target node's downstream
* federates have to be examined to compute the exact new DNET value.
* @return If needed, return the tag value. Otherwise, return the NEVER_TAG.
*/
tag_t downstream_next_event_tag(scheduling_node_t* node, uint16_t node_sending_new_net_id);
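A minimal sketch of the core computation described above, assuming the flattened upstream-by-downstream `min_delays` matrix declared earlier in this header; the real implementation (including the startup-tag clamping and the early exit based on node_sending_new_net_id) is in rti_common.c, which is not rendered on this page:

    // Illustrative sketch only; not the committed implementation.
    static tag_t downstream_next_event_tag_sketch(rti_common_t* rti, scheduling_node_t* node) {
      tag_t result = FOREVER_TAG;
      size_t n = rti->number_of_scheduling_nodes;
      for (uint16_t j = 0; j < n; j++) {
        tag_t path_delay = rti->min_delays[node->id * n + j];
        if (lf_tag_compare(path_delay, FOREVER_TAG) == 0) continue; // j is not downstream of node.
        tag_t candidate = get_dnet_candidate(rti->scheduling_nodes[j]->next_event, path_delay);
        if (lf_tag_compare(candidate, result) < 0) result = candidate;
      }
      return result; // The real function returns NEVER_TAG if this falls before the start tag.
    }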

/**
* Notify the specified scheduling node of a downstream next event tag (DNET).
* @param e The target node.
* @param tag The downstream next event tag for e.
*/
void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag);

/**
* Return true if the node is in a zero-delay cycle.
* @param node The node.
*/
void update_min_delays_upstream(scheduling_node_t* node);
bool is_in_zero_delay_cycle(scheduling_node_t* node);

/**
* For the given scheduling node (enclave or federate), invalidate the `min_delays`,
* `num_min_delays`, and the fields that indicate cycles.
* This should be called whenever the structure of the connections upstream of the
* given node have changed.
* Return true if the node is in a cycle (possibly a zero-delay cycle).
* @param node The node.
*/
void invalidate_min_delays_upstream(scheduling_node_t* node);
bool is_in_cycle(scheduling_node_t* node);

/**
* Invalidate the `min_delays` matrix and the fields that indicate cycles of all nodes.
* This should be called whenever the structure of the connections has changed.
*/
void invalidate_min_delays();

/**
* Free dynamically allocated memory on the scheduling nodes and the scheduling node array itself.
12 changes: 8 additions & 4 deletions core/federated/RTI/rti_local.c
@@ -53,9 +53,9 @@ void initialize_local_rti(environment_t* envs, int num_envs) {
rti_local->base.scheduling_nodes[i] = (scheduling_node_t*)enclave_info;

// Encode the connection topology into the enclave_info object.
enclave_info->base.num_downstream = lf_get_downstream_of(i, &enclave_info->base.downstream);
enclave_info->base.num_upstream = lf_get_upstream_of(i, &enclave_info->base.upstream);
lf_get_upstream_delay_of(i, &enclave_info->base.upstream_delay);
enclave_info->base.num_immediate_downstreams = _lf_get_downstream_of(i, &enclave_info->base.immediate_downstreams);
enclave_info->base.num_immediate_upstreams = _lf_get_upstream_of(i, &enclave_info->base.immediate_upstreams);
_lf_get_upstream_delay_of(i, &enclave_info->base.immediate_upstream_delays);

enclave_info->base.state = GRANTED;
}
@@ -112,7 +112,7 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
}

// If this enclave has no upstream, then we give a TAG till forever straight away.
if (e->base.num_upstream == 0) {
if (e->base.num_immediate_upstreams == 0) {
LF_PRINT_LOG("RTI: enclave %u has no upstream. Giving it a to the NET", e->base.id);
e->base.last_granted = next_event_tag;
}
@@ -191,6 +191,10 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
LF_PRINT_LOG("RTI: enclave %u callback with PTAG " PRINTF_TAG " ", e->id, tag.time - lf_time_start(), tag.microstep);
}

void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) {
// Nothing to do here.
}

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
// Nothing to do here.
}
90 changes: 58 additions & 32 deletions core/federated/RTI/rti_remote.c
@@ -125,8 +125,8 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
// Note that this is transitive.
// NOTE: This is not needed for enclaves because zero-delay loops are prohibited.
// It's only needed for federates, which is why this is implemented here.
for (int j = 0; j < e->num_upstream; j++) {
scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->upstream[j]];
for (int j = 0; j < e->num_immediate_upstreams; j++) {
scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->immediate_upstreams[j]];

// Ignore this federate if it has resigned.
if (upstream->state == NOT_CONNECTED)
@@ -146,6 +146,35 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
}
}

void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) {
if (e->state == NOT_CONNECTED) {
return;
}
// Need to make sure that the destination federate's thread has already
// sent the starting MSG_TYPE_TIMESTAMP message.
while (e->state == PENDING) {
// Need to wait here.
lf_cond_wait(&sent_start_time);
}
size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t);
unsigned char buffer[message_length];
buffer[0] = MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG;
encode_int64(tag.time, &(buffer[1]));
encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)]));

if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(send_DNET, e->id, &tag);
}
if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) {
lf_print_error("RTI failed to send downstream next event tag to federate %d.", e->id);
e->state = NOT_CONNECTED;
} else {
e->last_DNET = tag;
LF_PRINT_LOG("RTI sent to federate %d the Downstream Next Event Tag (DNET) " PRINTF_TAG ".", e->id,
tag.time - start_time, tag.microstep);
}
}
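The federate-side handler for this message lives outside the RTI code and is not part of this diff; a minimal decoding sketch, assuming the same wire layout as the encoding above (one message-type byte, then a 64-bit time and a 32-bit microstep):

    // Hypothetical federate-side decode, mirroring the encoding above.
    // Assumes `buffer` points at the byte that holds MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG.
    tag_t dnet;
    dnet.time = extract_int64(&(buffer[1]));
    dnet.microstep = (uint32_t)extract_int32(&(buffer[1 + sizeof(int64_t)]));
    // A federate would typically record this tag and suppress NET signals at tags up to it;
    // the exact policy is defined on the federate side, not in this file.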

void update_federate_next_event_tag_locked(uint16_t federate_id, tag_t next_event_tag) {
federate_info_t* fed = GET_FED_INFO(federate_id);
tag_t min_in_transit_tag = pqueue_tag_peek_tag(fed->in_transit_message_tags);
@@ -1200,31 +1229,29 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) {
} else {
federate_info_t* fed = GET_FED_INFO(fed_id);
// Read the number of upstream and downstream connections
fed->enclave.num_upstream = extract_int32(&(connection_info_header[1]));
fed->enclave.num_downstream = extract_int32(&(connection_info_header[1 + sizeof(int32_t)]));
LF_PRINT_DEBUG("RTI got %d upstreams and %d downstreams from federate %d.", fed->enclave.num_upstream,
fed->enclave.num_downstream, fed_id);
fed->enclave.num_immediate_upstreams = extract_int32(&(connection_info_header[1]));
fed->enclave.num_immediate_downstreams = extract_int32(&(connection_info_header[1 + sizeof(int32_t)]));
LF_PRINT_DEBUG("RTI got %d upstreams and %d downstreams from federate %d.", fed->enclave.num_immediate_upstreams,
fed->enclave.num_immediate_downstreams, fed_id);

// Allocate memory for the upstream and downstream pointers
if (fed->enclave.num_upstream > 0) {
fed->enclave.upstream = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream);
LF_ASSERT_NON_NULL(fed->enclave.upstream);
if (fed->enclave.num_immediate_upstreams > 0) {
fed->enclave.immediate_upstreams = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_immediate_upstreams);
// Allocate memory for the upstream delay pointers
fed->enclave.upstream_delay = (interval_t*)malloc(sizeof(interval_t) * fed->enclave.num_upstream);
LF_ASSERT_NON_NULL(fed->enclave.upstream_delay);
fed->enclave.immediate_upstream_delays =
(interval_t*)malloc(sizeof(interval_t) * fed->enclave.num_immediate_upstreams);
} else {
fed->enclave.upstream = (uint16_t*)NULL;
fed->enclave.upstream_delay = (interval_t*)NULL;
fed->enclave.immediate_upstreams = (uint16_t*)NULL;
fed->enclave.immediate_upstream_delays = (interval_t*)NULL;
}
if (fed->enclave.num_downstream > 0) {
fed->enclave.downstream = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream);
LF_ASSERT_NON_NULL(fed->enclave.downstream);
if (fed->enclave.num_immediate_downstreams > 0) {
fed->enclave.immediate_downstreams = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_immediate_downstreams);
} else {
fed->enclave.downstream = (uint16_t*)NULL;
fed->enclave.immediate_downstreams = (uint16_t*)NULL;
}

size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * fed->enclave.num_upstream) +
(sizeof(uint16_t) * fed->enclave.num_downstream);
size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * fed->enclave.num_immediate_upstreams) +
(sizeof(uint16_t) * fed->enclave.num_immediate_downstreams);
unsigned char* connections_info_body = NULL;
if (connections_info_body_size > 0) {
connections_info_body = (unsigned char*)malloc(connections_info_body_size);
@@ -1235,16 +1262,16 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) {
// Keep track of where we are in the buffer
size_t message_head = 0;
// First, read the info about upstream federates
for (int i = 0; i < fed->enclave.num_upstream; i++) {
fed->enclave.upstream[i] = extract_uint16(&(connections_info_body[message_head]));
for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) {
fed->enclave.immediate_upstreams[i] = extract_uint16(&(connections_info_body[message_head]));
message_head += sizeof(uint16_t);
fed->enclave.upstream_delay[i] = extract_int64(&(connections_info_body[message_head]));
fed->enclave.immediate_upstream_delays[i] = extract_int64(&(connections_info_body[message_head]));
message_head += sizeof(int64_t);
}

// Next, read the info about downstream federates
for (int i = 0; i < fed->enclave.num_downstream; i++) {
fed->enclave.downstream[i] = extract_uint16(&(connections_info_body[message_head]));
for (int i = 0; i < fed->enclave.num_immediate_downstreams; i++) {
fed->enclave.immediate_downstreams[i] = extract_uint16(&(connections_info_body[message_head]));
message_head += sizeof(uint16_t);
}

@@ -1596,6 +1623,7 @@ void initialize_RTI(rti_remote_t* rti) {
rti_remote->clock_sync_exchanges_per_interval = 10;
rti_remote->authentication_enabled = false;
rti_remote->base.tracing_enabled = false;
rti_remote->base.dnet_disabled = false;
rti_remote->stop_in_progress = false;
}

@@ -1606,17 +1634,15 @@
void clock_sync_subtract_offset(instant_t* t) { (void)t; }

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
invalidate_min_delays();
for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) {
scheduling_node_t* node = scheduling_nodes[i];
if (node->upstream != NULL) {
free(node->upstream);
free(node->upstream_delay);
}
if (node->min_delays != NULL) {
free(node->min_delays);
if (node->immediate_upstreams != NULL) {
free(node->immediate_upstreams);
free(node->immediate_upstream_delays);
}
if (node->downstream != NULL) {
free(node->downstream);
if (node->immediate_downstreams != NULL) {
free(node->immediate_downstreams);
}
free(node);
}
1 change: 1 addition & 0 deletions core/federated/RTI/rti_remote.h
@@ -153,6 +153,7 @@ typedef struct rti_remote_t {
* Boolean indicating that authentication is enabled.
*/
bool authentication_enabled;

/**
* Boolean indicating that a stop request is already in progress.
*/
(Diffs for the remaining changed files are not rendered.)
