diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index 17d73e93e..c08890ac7 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -136,6 +136,7 @@ void usage(int argc, const char* argv[]) { lf_print(" clock sync attempt (default is 10). Applies to 'init' and 'on'.\n"); lf_print(" -a, --auth Turn on HMAC authentication options.\n"); lf_print(" -t, --tracing Turn on tracing.\n"); + lf_print(" -d, --disable_dnet Turn off the use of DNET signals.\n"); lf_print("Command given:"); for (int i = 0; i < argc; i++) { @@ -263,6 +264,8 @@ int process_args(int argc, const char* argv[]) { rti.authentication_enabled = true; } else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) { rti.base.tracing_enabled = true; + } else if (strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--dnet_disabled") == 0) { + rti.base.dnet_disabled = true; } else if (strcmp(argv[i], " ") == 0) { // Tolerate spaces continue; @@ -316,9 +319,9 @@ int main(int argc, const char* argv[]) { assert(rti.base.number_of_scheduling_nodes < UINT16_MAX); // Allocate memory for the federates - rti.base.scheduling_nodes = - (scheduling_node_t**)calloc(rti.base.number_of_scheduling_nodes, sizeof(scheduling_node_t*)); - for (uint16_t i = 0; i < rti.base.number_of_scheduling_nodes; i++) { + int n = rti.base.number_of_scheduling_nodes; + rti.base.scheduling_nodes = (scheduling_node_t**)calloc(n, sizeof(scheduling_node_t*)); + for (uint16_t i = 0; i < n; i++) { federate_info_t* fed_info = (federate_info_t*)calloc(1, sizeof(federate_info_t)); initialize_federate(fed_info, i); rti.base.scheduling_nodes[i] = (scheduling_node_t*)fed_info; diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index 3a1b16fab..f99edb543 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -20,6 +20,7 @@ extern instant_t start_time; void initialize_rti_common(rti_common_t* _rti_common) { rti_common = _rti_common; + rti_common->min_delays = NULL; rti_common->max_stop_tag = NEVER_TAG; rti_common->number_of_scheduling_nodes = 0; rti_common->num_scheduling_nodes_handling_stop = 0; @@ -33,12 +34,15 @@ void initialize_rti_common(rti_common_t* _rti_common) { #define IS_IN_ZERO_DELAY_CYCLE 1 #define IS_IN_CYCLE 2 -void invalidate_min_delays_upstream(scheduling_node_t* node) { - if (node->min_delays != NULL) - free(node->min_delays); - node->min_delays = NULL; - node->num_min_delays = 0; - node->flags = 0; // All flags cleared because they get set lazily. +void invalidate_min_delays() { + if (rti_common->min_delays != NULL) { + uint16_t n = rti_common->number_of_scheduling_nodes; + for (uint16_t i = 0; i < n; i++) { + scheduling_node_t* node = rti_common->scheduling_nodes[i]; + node->flags = 0; // All flags cleared because they get set lazily. + } + free(rti_common->min_delays); + } } void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) { @@ -47,14 +51,15 @@ void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) { e->last_granted = NEVER_TAG; e->last_provisionally_granted = NEVER_TAG; e->next_event = NEVER_TAG; + e->last_DNET = NEVER_TAG; e->state = NOT_CONNECTED; - e->upstream = NULL; - e->upstream_delay = NULL; - e->num_upstream = 0; - e->downstream = NULL; - e->num_downstream = 0; + e->immediate_upstreams = NULL; + e->immediate_upstream_delays = NULL; + e->num_immediate_upstreams = 0; + e->immediate_downstreams = NULL; + e->num_immediate_downstreams = 0; e->mode = REALTIME; - invalidate_min_delays_upstream(e); + e->flags = 0; } void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) { @@ -68,8 +73,8 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) { enclave->completed.time - start_time, enclave->completed.microstep); // Check downstream scheduling_nodes to see whether they should now be granted a TAG. - for (int i = 0; i < enclave->num_downstream; i++) { - scheduling_node_t* downstream = rti_common->scheduling_nodes[enclave->downstream[i]]; + for (int i = 0; i < enclave->num_immediate_downstreams; i++) { + scheduling_node_t* downstream = rti_common->scheduling_nodes[enclave->immediate_downstreams[i]]; // Notify downstream enclave if appropriate. notify_advance_grant_if_safe(downstream); bool* visited = (bool*)calloc(rti_common->number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. @@ -85,35 +90,38 @@ tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) { // First, we need to find the shortest path (minimum delay) path to each upstream node // and then find the minimum of the node's recorded NET plus the minimum path delay. // Update the shortest paths, if necessary. - update_min_delays_upstream(e); + update_min_delays(); // Next, find the tag of the earliest possible incoming message from upstream enclaves or // federates, which will be the smallest upstream NET plus the least delay. // This could be NEVER_TAG if the RTI has not seen a NET from some upstream node. tag_t t_d = FOREVER_TAG; - for (int i = 0; i < e->num_min_delays; i++) { - // Node e->min_delays[i].id is upstream of e with min delay e->min_delays[i].min_delay. - scheduling_node_t* upstream = rti_common->scheduling_nodes[e->min_delays[i].id]; - // If we haven't heard from the upstream node, then assume it can send an event at the start time. - if (lf_tag_compare(upstream->next_event, NEVER_TAG) == 0) { - tag_t start_tag = {.time = start_time, .microstep = 0}; - upstream->next_event = start_tag; - } - // The min_delay here is a tag_t, not an interval_t because it may account for more than - // one connection. No delay at all is represented by (0,0). A delay of 0 is represented - // by (0,1). If the time part of the delay is greater than 0, then we want to ignore - // the microstep in upstream->next_event because that microstep will have been lost. - // Otherwise, we want preserve it and add to it. This is handled by lf_tag_add(). - tag_t earliest_tag_from_upstream = lf_tag_add(upstream->next_event, e->min_delays[i].min_delay); - - /* Following debug message is too verbose for normal use: - LF_PRINT_DEBUG("RTI: Earliest next event upstream of fed/encl %d at fed/encl %d has tag " PRINTF_TAG ".", - e->id, - upstream->id, - earliest_tag_from_upstream.time - start_time, earliest_tag_from_upstream.microstep); - */ - if (lf_tag_compare(earliest_tag_from_upstream, t_d) < 0) { - t_d = earliest_tag_from_upstream; + int n = rti_common->number_of_scheduling_nodes; + for (int i = 0; i < n; i++) { + if (lf_tag_compare(rti_common->min_delays[i * n + e->id], FOREVER_TAG) != 0) { + // Node i is upstream of e with min delay rti_common->min_delays[i * n + e->id] + scheduling_node_t* upstream = rti_common->scheduling_nodes[i]; + // If we haven't heard from the upstream node, then assume it can send an event at the start time. + if (lf_tag_compare(upstream->next_event, NEVER_TAG) == 0) { + tag_t start_tag = {.time = start_time, .microstep = 0}; + upstream->next_event = start_tag; + } + // The min_delay here is a tag_t, not an interval_t because it may account for more than + // one connection. No delay at all is represented by (0,0). A delay of 0 is represented + // by (0,1). If the time part of the delay is greater than 0, then we want to ignore + // the microstep in upstream->next_event because that microstep will have been lost. + // Otherwise, we want preserve it and add to it. This is handled by lf_tag_add(). + tag_t earliest_tag_from_upstream = lf_tag_add(upstream->next_event, rti_common->min_delays[i * n + e->id]); + + /* Following debug message is too verbose for normal use: + LF_PRINT_DEBUG("RTI: Earliest next event upstream of fed/encl %d at fed/encl %d has tag " PRINTF_TAG ".", + e->id, + upstream->id, + earliest_tag_from_upstream.time - start_time, earliest_tag_from_upstream.microstep); + */ + if (lf_tag_compare(earliest_tag_from_upstream, t_d) < 0) { + t_d = earliest_tag_from_upstream; + } } } return t_d; @@ -125,8 +133,8 @@ tag_t eimt_strict(scheduling_node_t* e) { // This will be the smallest upstream NET plus the least delay. // This could be NEVER_TAG if the RTI has not seen a NET from some upstream node. tag_t t_d = FOREVER_TAG; - for (int i = 0; i < e->num_upstream; i++) { - scheduling_node_t* upstream = rti_common->scheduling_nodes[e->upstream[i]]; + for (int i = 0; i < e->num_immediate_upstreams; i++) { + scheduling_node_t* upstream = rti_common->scheduling_nodes[e->immediate_upstreams[i]]; // Skip this node if it is part of a zero-delay cycle. if (is_in_zero_delay_cycle(upstream)) continue; @@ -142,7 +150,7 @@ tag_t eimt_strict(scheduling_node_t* e) { if (lf_tag_compare(upstream->next_event, earliest) < 0) { earliest = upstream->next_event; } - tag_t earliest_tag_from_upstream = lf_delay_tag(earliest, e->upstream_delay[i]); + tag_t earliest_tag_from_upstream = lf_delay_tag(earliest, e->immediate_upstream_delays[i]); LF_PRINT_DEBUG("RTI: Strict EIMT of fed/encl %d at fed/encl %d has tag " PRINTF_TAG ".", e->id, upstream->id, earliest_tag_from_upstream.time - start_time, earliest_tag_from_upstream.microstep); if (lf_tag_compare(earliest_tag_from_upstream, t_d) < 0) { @@ -158,8 +166,8 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { // Find the earliest LTC of upstream scheduling_nodes (M). tag_t min_upstream_completed = FOREVER_TAG; - for (int j = 0; j < e->num_upstream; j++) { - scheduling_node_t* upstream = rti_common->scheduling_nodes[e->upstream[j]]; + for (int j = 0; j < e->num_immediate_upstreams; j++) { + scheduling_node_t* upstream = rti_common->scheduling_nodes[e->immediate_upstreams[j]]; // Ignore this enclave/federate if it is not connected. if (upstream->state == NOT_CONNECTED) @@ -168,7 +176,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { // Adjust by the "after" delay. // Note that "no delay" is encoded as NEVER, // whereas one microstep delay is encoded as 0LL. - tag_t candidate = lf_delay_strict(upstream->completed, e->upstream_delay[j]); + tag_t candidate = lf_delay_strict(upstream->completed, e->immediate_upstream_delays[j]); if (lf_tag_compare(candidate, min_upstream_completed) < 0) { min_upstream_completed = candidate; @@ -217,7 +225,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { "(adjusted by after delay). Granting tag advance (TAG) for " PRINTF_TAG, e->id, t_d.time - lf_time_start(), t_d.microstep, e->next_event.time - lf_time_start(), e->next_event.microstep); - result.tag = e->next_event; + result.tag = lf_tag_latest_earlier(t_d); } else if ( // Scenario (2) above lf_tag_compare(t_d, e->next_event) == 0 // EIMT equal to NET && is_in_zero_delay_cycle(e) // The node is part of a ZDC @@ -239,8 +247,8 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { void notify_downstream_advance_grant_if_safe(scheduling_node_t* e, bool visited[]) { visited[e->id] = true; - for (int i = 0; i < e->num_downstream; i++) { - scheduling_node_t* downstream = rti_common->scheduling_nodes[e->downstream[i]]; + for (int i = 0; i < e->num_immediate_downstreams; i++) { + scheduling_node_t* downstream = rti_common->scheduling_nodes[e->immediate_downstreams[i]]; if (visited[downstream->id]) continue; notify_advance_grant_if_safe(downstream); @@ -257,18 +265,37 @@ void update_scheduling_node_next_event_tag_locked(scheduling_node_t* e, tag_t ne // Check to see whether we can reply now with a tag advance grant. // If the enclave has no upstream scheduling_nodes, then it does not wait for // nor expect a reply. It just proceeds to advance time. - if (e->num_upstream > 0) { + if (e->num_immediate_upstreams > 0) { notify_advance_grant_if_safe(e); } else { // Even though there was no grant, mark the tag as if there was. e->last_granted = next_event_tag; } + + update_min_delays(); // Check downstream scheduling_nodes to see whether they should now be granted a TAG. - // To handle cycles, need to create a boolean array to keep - // track of which downstream scheduling_nodes have been visited. - bool* visited = (bool*)calloc(rti_common->number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. - notify_downstream_advance_grant_if_safe(e, visited); - free(visited); + int n = rti_common->number_of_scheduling_nodes; + for (int j = 0; j < n; j++) { + if (lf_tag_compare(rti_common->min_delays[e->id * n + j], FOREVER_TAG) != 0) { + // The node j is a downstream node of e. + scheduling_node_t* downstream = rti_common->scheduling_nodes[j]; + notify_advance_grant_if_safe(downstream); + } + } + + if (!rti_common->dnet_disabled) { + // Send DNET to the node e's upstream federates if needed + for (int i = 0; i < n; i++) { + if (lf_tag_compare(rti_common->min_delays[i * n + e->id], FOREVER_TAG) != 0 && i != e->id) { + // The node i is an upstream node of e. + scheduling_node_t* upstream = rti_common->scheduling_nodes[i]; + tag_t dnet = downstream_next_event_tag(upstream, e->id); + if (lf_tag_compare(upstream->last_DNET, dnet) != 0 && lf_tag_compare(upstream->next_event, dnet) <= 0) { + notify_downstream_next_event_tag(upstream, dnet); + } + } + } + } } void notify_advance_grant_if_safe(scheduling_node_t* e) { @@ -303,32 +330,33 @@ static void _update_min_delays_upstream(scheduling_node_t* end, scheduling_node_ // NOTE: It would be better to iterate through these sorted by minimum delay, // but for most programs, the gain might be negligible since there are relatively few // upstream nodes. - for (int i = 0; i < intermediate->num_upstream; i++) { + for (int i = 0; i < intermediate->num_immediate_upstreams; i++) { // Add connection delay to path delay so far. Because tag addition is not commutative, // the calculation order should be carefully handled. Specifically, we should calculate // intermediate->upstream_delay[i] + delay_from_intermediate_so_far, // NOT delay_from_intermediate_so_far + intermediate->upstream_delay[i]. // Before calculating path delay, convert intermediate->upstream_delay[i] to a tag // cause there is no function that adds a tag to an interval. - tag_t connection_delay = lf_delay_tag(ZERO_TAG, intermediate->upstream_delay[i]); + tag_t connection_delay = lf_delay_tag(ZERO_TAG, intermediate->immediate_upstream_delays[i]); tag_t path_delay = lf_tag_add(connection_delay, delay_from_intermediate_so_far); // If the path delay is less than the so-far recorded path delay from upstream, update upstream. - if (lf_tag_compare(path_delay, path_delays[intermediate->upstream[i]]) < 0) { - if (path_delays[intermediate->upstream[i]].time == FOREVER) { + if (lf_tag_compare(path_delay, path_delays[intermediate->immediate_upstreams[i]]) < 0) { + if (path_delays[intermediate->immediate_upstreams[i]].time == FOREVER) { // Found a finite path. *count = *count + 1; } - path_delays[intermediate->upstream[i]] = path_delay; + path_delays[intermediate->immediate_upstreams[i]] = path_delay; // Since the path delay to upstream has changed, recursively update those upstream of it. // Do not do this, however, if the upstream node is the end node because this means we have // completed a cycle. - if (end->id != intermediate->upstream[i]) { - _update_min_delays_upstream(end, rti_common->scheduling_nodes[intermediate->upstream[i]], path_delays, count); + if (end->id != intermediate->immediate_upstreams[i]) { + _update_min_delays_upstream(end, rti_common->scheduling_nodes[intermediate->immediate_upstreams[i]], + path_delays, count); } else { // Found a cycle. end->flags = end->flags | IS_IN_CYCLE; // Is it a zero-delay cycle? - if (lf_tag_compare(path_delay, ZERO_TAG) == 0 && intermediate->upstream_delay[i] < 0) { + if (lf_tag_compare(path_delay, ZERO_TAG) == 0 && intermediate->immediate_upstream_delays[i] < 0) { end->flags = end->flags | IS_IN_ZERO_DELAY_CYCLE; } else { // Clear the flag. @@ -339,51 +367,137 @@ static void _update_min_delays_upstream(scheduling_node_t* end, scheduling_node_ } } -void update_min_delays_upstream(scheduling_node_t* node) { +void update_min_delays() { + int n = rti_common->number_of_scheduling_nodes; // Check whether cached result is valid. - if (node->min_delays == NULL) { + if (rti_common->min_delays == NULL) { + // Allocate memory for the array of min_delays. + rti_common->min_delays = (tag_t*)calloc((n * n), sizeof(tag_t)); + + for (int j = 0; j < n; j++) { + // This is not Dijkstra's algorithm, but rather one optimized for sparse upstream nodes. + // There must be a name for this algorithm. + + scheduling_node_t* node = rti_common->scheduling_nodes[j]; + // Array of results on the stack: + tag_t path_delays[n]; + // This will be the number of non-FOREVER entries put into path_delays. + size_t count = 0; + + for (int i = 0; i < n; i++) { + path_delays[i] = FOREVER_TAG; + } + _update_min_delays_upstream(node, NULL, path_delays, &count); + + // Put the results onto the matrix. + LF_PRINT_DEBUG("++++ Node %hu is in ZDC: %d", node->id, is_in_zero_delay_cycle(node)); + int k = 0; + for (int i = 0; i < n; i++) { + rti_common->min_delays[i * n + j] = path_delays[i]; + if (lf_tag_compare(path_delays[i], FOREVER_TAG) < 0) { + // Node i is upstream. + if (k >= count) { + lf_print_error_and_exit("Internal error! Count of upstream nodes %zu for node %d is wrong!", count, i); + } + // N^2 debug statement could be a problem with large benchmarks. + LF_PRINT_DEBUG("++++ Node %hu is upstream with delay" PRINTF_TAG "\n", i, path_delays[i].time, + path_delays[i].microstep); + } + } + } + } +} + +tag_t get_dnet_candidate(tag_t next_event_tag, tag_t minimum_delay) { + // (A.t, A.m) - (B.t - B.m) + // B cannot be NEVER_TAG as (0, 0) denotes no delay. + // Also, we assume B is not FOREVER_TAG because FOREVER_TAG delay means that there is no connection. + // 1) If A.t = NEVER, return NEVER. + // 2) If A.t = FOREVER, return FOREVER. + // 3) A.t is not NEVER neither FOREVER + // a) If A < B (A.t < B.t or A.t == B.t and A.m < B.m) return NEVER + // b) A >= B + // i) If A.t >= B.t = 0 and A.m >= B.m return (A.t - B.t, A.m - B.m) + // ii) If A.t >= B.t > 0 and A.m >= B.m return (A.t - B.t, UINT_MAX) + // iii) If A.t >= B.t > 0 and A.m < B.m return (A.t - B.t - 1, UINT_MAX) + + if (next_event_tag.time == NEVER || lf_tag_compare(next_event_tag, minimum_delay) < 0) + return NEVER_TAG; + if (next_event_tag.time == FOREVER) + return FOREVER_TAG; + tag_t result = {.time = next_event_tag.time - minimum_delay.time, + .microstep = next_event_tag.microstep - minimum_delay.microstep}; + if (next_event_tag.microstep < minimum_delay.microstep) { + result.time -= 1; + result.microstep = UINT_MAX; + } else { + if (minimum_delay.time == 0) { + // + } else { + result.microstep = UINT_MAX; + } + } + return result; +} - // This is not Dijkstra's algorithm, but rather one optimized for sparse upstream nodes. - // There must be a name for this algorithm. +tag_t downstream_next_event_tag(scheduling_node_t* target_node, uint16_t node_sending_new_NET_id) { + if (is_in_zero_delay_cycle(target_node)) { + return NEVER_TAG; + } - // Array of results on the stack: - tag_t path_delays[rti_common->number_of_scheduling_nodes]; - // This will be the number of non-FOREVER entries put into path_delays. - size_t count = 0; + tag_t result = FOREVER_TAG; + scheduling_node_t* node_sending_new_NET = rti_common->scheduling_nodes[node_sending_new_NET_id]; + if (is_in_zero_delay_cycle(node_sending_new_NET)) { + return NEVER_TAG; + } - for (int i = 0; i < rti_common->number_of_scheduling_nodes; i++) { - path_delays[i] = FOREVER_TAG; - } - _update_min_delays_upstream(node, NULL, path_delays, &count); - - // Put the results onto the node's struct. - node->num_min_delays = count; - node->min_delays = (minimum_delay_t*)calloc(count, sizeof(minimum_delay_t)); - LF_PRINT_DEBUG("++++ Node %hu is in ZDC: %d", node->id, is_in_zero_delay_cycle(node)); - int k = 0; - for (int i = 0; i < rti_common->number_of_scheduling_nodes; i++) { - if (lf_tag_compare(path_delays[i], FOREVER_TAG) < 0) { - // Node i is upstream. - if (k >= count) { - lf_print_error_and_exit("Internal error! Count of upstream nodes %zu for node %d is wrong!", count, i); + int n = rti_common->number_of_scheduling_nodes; + int index = target_node->id * n + node_sending_new_NET_id; + tag_t candidate = get_dnet_candidate(node_sending_new_NET->next_event, rti_common->min_delays[index]); + + if (lf_tag_compare(target_node->last_DNET, candidate) >= 0) { + // This function is called because a downstream node of target_node sent a new NET. + // If the candidate computed by that downstream node is earlier than or equal to the last DNET, + // this candidate must be the minimum among every candidate. + // Else if this candidate is later than the last DNET, we need to loop up every downstream node + // to compute the DNET value. + result = candidate; + } else { + for (int j = 0; j < n; j++) { + if (target_node->id != j && (lf_tag_compare(rti_common->min_delays[target_node->id * n + j], FOREVER_TAG) != 0)) { + // The node j is a downstream node and not the target node itself. + scheduling_node_t* target_dowstream = rti_common->scheduling_nodes[j]; + // if (is_in_zero_delay_cycle(target_dowstream)) { + // // The target node is an upstream of ZDC. Do not send DNET to this node. + // return NEVER_TAG; + // } + + // Minimum tag increment between the node and its downstream node. + tag_t delay = rti_common->min_delays[target_node->id * n + j]; + candidate = get_dnet_candidate(target_dowstream->next_event, delay); + + if (lf_tag_compare(result, candidate) > 0) { + result = candidate; } - minimum_delay_t min_delay = {.id = i, .min_delay = path_delays[i]}; - node->min_delays[k++] = min_delay; - // N^2 debug statement could be a problem with large benchmarks. - // LF_PRINT_DEBUG("++++ Node %hu is upstream with delay" PRINTF_TAG "\n", i, path_delays[i].time, - // path_delays[i].microstep); } } } + if (result.time < start_time) { + // DNET with the time smaller than the start time acts as the same as DNET of the NEVER tag. + // Thus, set the result as NEVER_TAG to prevent sending unnecessary DNETs. + result = NEVER_TAG; + } + + return result; } bool is_in_zero_delay_cycle(scheduling_node_t* node) { - update_min_delays_upstream(node); + update_min_delays(); return node->flags & IS_IN_ZERO_DELAY_CYCLE; } bool is_in_cycle(scheduling_node_t* node) { - update_min_delays_upstream(node); + update_min_delays(); return node->flags & IS_IN_CYCLE; } diff --git a/core/federated/RTI/rti_common.h b/core/federated/RTI/rti_common.h index 615ca8a8a..3d87dc301 100644 --- a/core/federated/RTI/rti_common.h +++ b/core/federated/RTI/rti_common.h @@ -53,17 +53,16 @@ typedef struct scheduling_node_t { tag_t last_granted; // The maximum TAG that has been granted so far (or NEVER if none granted) tag_t last_provisionally_granted; // The maximum PTAG that has been provisionally granted (or NEVER if none granted) tag_t next_event; // Most recent NET received from the scheduling node (or NEVER if none received). + tag_t last_DNET; // Most recent DNET. scheduling_node_state_t state; // State of the scheduling node. - uint16_t* upstream; // Array of upstream scheduling node ids. - interval_t* upstream_delay; // Minimum delay on connections from upstream scheduling nodes. - // Here, NEVER encodes no delay. 0LL is a microstep delay. - int num_upstream; // Size of the array of upstream scheduling nodes and delays. - uint16_t* downstream; // Array of downstream scheduling node ids. - int num_downstream; // Size of the array of downstream scheduling nodes. - execution_mode_t mode; // FAST or REALTIME. - minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node. - size_t num_min_delays; // Size of min_delays array. - int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE + uint16_t* immediate_upstreams; // Array of immediate upstream scheduling node ids. + interval_t* immediate_upstream_delays; // Minimum delay on connections from immdediate upstream scheduling nodes. + // Here, NEVER encodes no delay. 0LL is a microstep delay. + uint16_t num_immediate_upstreams; // Size of the array of immediate upstream scheduling nodes and delays. + uint16_t* immediate_downstreams; // Array of immediate downstream scheduling node ids. + uint16_t num_immediate_downstreams; // Size of the array of immediate downstream scheduling nodes. + execution_mode_t mode; // FAST or REALTIME. + int flags; // One of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE } scheduling_node_t; /** @@ -76,7 +75,13 @@ typedef struct rti_common_t { scheduling_node_t** scheduling_nodes; // Number of scheduling nodes - int32_t number_of_scheduling_nodes; + uint16_t number_of_scheduling_nodes; + + // Matrix of minimum delays between pairs of nodes. + // Rows represent upstream nodes and Columns represent downstream nodes. + // FOREVER_TAG means there is no path, and ZERO_TAG means there is no delay. + // This could be NULL if the matrix is not being used, so accesses should test for NULL first. + tag_t* min_delays; // RTI's decided stop tag for the scheduling nodes tag_t max_stop_tag; @@ -87,6 +92,9 @@ typedef struct rti_common_t { // Boolean indicating that tracing is enabled. bool tracing_enabled; + // Boolean indicating that DNET is enabled. + bool dnet_disabled; + // The RTI mutex for making thread-safe access to the shared state. lf_mutex_t* mutex; } rti_common_t; @@ -246,34 +254,66 @@ tag_t earliest_future_incoming_message_tag(scheduling_node_t* e); tag_t eimt_strict(scheduling_node_t* e); /** - * Return true if the node is in a zero-delay cycle. + * If necessary, update the `min_delays` and the fields that indicate cycles. + * These fields will be updated only if they have not been previously updated or if invalidate_min_delays + * has been called since they were last updated. * @param node The node. */ -bool is_in_zero_delay_cycle(scheduling_node_t* node); +void update_min_delays(); /** - * Return true if the node is in a cycle (possibly a zero-delay cycle). - * @param node The node. + * Find the tag g that is the latest tag that satisfies lf_tag_add(g, minimum_delay) < next_event_tag. + * This function behaves like the tag subtraction, next_event_tag - minimum_delay. + * minimum_delay cannot be NEVER. + * + * This function is called in function downstream_next_event_tag. + * @param next_event_tag The next event tag of a downstream node. + * @param minimum_delay The minimum delay between the target upstream node and the downstream node. */ -bool is_in_cycle(scheduling_node_t* node); +tag_t get_dnet_candidate(tag_t next_event_tag, tag_t minimum_delay); /** - * For the given scheduling node (enclave or federate), if necessary, update the `min_delays`, - * `num_min_delays`, and the fields that indicate cycles. These fields will be - * updated only if they have not been previously updated or if invalidate_min_delays_upstream - * has been called since they were last updated. + * @brief Determine whether the specified scheduling node is needed to receive a downstream next event tag (DNET), + * and, if so, return the details. + * + * This function is called upon receiving a NET from one of the specified node's downstream nodes. + * + * This function calculates the minimum tag M over + * all downstream scheduling nodes of the most recent NET from that node minus the "after delay" (see function + * get_dnet_candidate). If M is earlier than the startup tag, then set the result as the NEVER_TAG. + * + * @param node The target node that may receive a new DNET. + * @param node_sending_new_net_id The ID of the node that sends a new NET. If this node's new NET does not + * change the DNET value, we can exit this function immediately. If it does, we have to look up the target node's + * downstream federates to compute the exact new DNET value. + * @return If needed, return the tag value. Otherwise, return the NEVER_TAG. + */ +tag_t downstream_next_event_tag(scheduling_node_t* node, uint16_t node_sending_new_net_id); + +/** + * Notify a downstream next event tag (DNET) signal to the specified scheduling node. + * @param e The target node. + * @param tag The downstream next event tag for e. + */ +void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag); + +/** + * Return true if the node is in a zero-delay cycle. * @param node The node. */ -void update_min_delays_upstream(scheduling_node_t* node); +bool is_in_zero_delay_cycle(scheduling_node_t* node); /** - * For the given scheduling node (enclave or federate), invalidate the `min_delays`, - * `num_min_delays`, and the fields that indicate cycles. - * This should be called whenever the structure of the connections upstream of the - * given node have changed. + * Return true if the node is in a cycle (possibly a zero-delay cycle). * @param node The node. */ -void invalidate_min_delays_upstream(scheduling_node_t* node); +bool is_in_cycle(scheduling_node_t* node); + +/** + * Invalidate the `min_delays`, `num_min_delays`, and the fields that indicate cycles + * of all nodes. This should be called whenever the structure of the connections have changed. + */ +void invalidate_min_delays(); /** * Free dynamically allocated memory on the scheduling nodes and the scheduling node array itself. diff --git a/core/federated/RTI/rti_local.c b/core/federated/RTI/rti_local.c index 9e70137d5..f1a91e1cf 100644 --- a/core/federated/RTI/rti_local.c +++ b/core/federated/RTI/rti_local.c @@ -53,9 +53,9 @@ void initialize_local_rti(environment_t* envs, int num_envs) { rti_local->base.scheduling_nodes[i] = (scheduling_node_t*)enclave_info; // Encode the connection topology into the enclave_info object. - enclave_info->base.num_downstream = lf_get_downstream_of(i, &enclave_info->base.downstream); - enclave_info->base.num_upstream = lf_get_upstream_of(i, &enclave_info->base.upstream); - lf_get_upstream_delay_of(i, &enclave_info->base.upstream_delay); + enclave_info->base.num_immediate_downstreams = _lf_get_downstream_of(i, &enclave_info->base.immediate_downstreams); + enclave_info->base.num_immediate_upstreams = _lf_get_upstream_of(i, &enclave_info->base.immediate_upstreams); + _lf_get_upstream_delay_of(i, &enclave_info->base.immediate_upstream_delays); enclave_info->base.state = GRANTED; } @@ -112,7 +112,7 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) { } // If this enclave has no upstream, then we give a TAG till forever straight away. - if (e->base.num_upstream == 0) { + if (e->base.num_immediate_upstreams == 0) { LF_PRINT_LOG("RTI: enclave %u has no upstream. Giving it a to the NET", e->base.id); e->base.last_granted = next_event_tag; } @@ -191,6 +191,10 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { LF_PRINT_LOG("RTI: enclave %u callback with PTAG " PRINTF_TAG " ", e->id, tag.time - lf_time_start(), tag.microstep); } +void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) { + // Nothing to do here. +} + void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) { // Nothing to do here. } diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 6f705d2b9..7f3fd6f15 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -125,8 +125,8 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // Note that this is transitive. // NOTE: This is not needed for enclaves because zero-delay loops are prohibited. // It's only needed for federates, which is why this is implemented here. - for (int j = 0; j < e->num_upstream; j++) { - scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->upstream[j]]; + for (int j = 0; j < e->num_immediate_upstreams; j++) { + scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->immediate_upstreams[j]]; // Ignore this federate if it has resigned. if (upstream->state == NOT_CONNECTED) @@ -146,6 +146,35 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { } } +void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) { + if (e->state == NOT_CONNECTED) { + return; + } + // Need to make sure that the destination federate's thread has already + // sent the starting MSG_TYPE_TIMESTAMP message. + while (e->state == PENDING) { + // Need to wait here. + lf_cond_wait(&sent_start_time); + } + size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); + unsigned char buffer[message_length]; + buffer[0] = MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG; + encode_int64(tag.time, &(buffer[1])); + encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)])); + + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_DNET, e->id, &tag); + } + if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) { + lf_print_error("RTI failed to send downstream next event tag to federate %d.", e->id); + e->state = NOT_CONNECTED; + } else { + e->last_DNET = tag; + LF_PRINT_LOG("RTI sent to federate %d the Downstream Next Event Tag (DNET) " PRINTF_TAG ".", e->id, + tag.time - start_time, tag.microstep); + } +} + void update_federate_next_event_tag_locked(uint16_t federate_id, tag_t next_event_tag) { federate_info_t* fed = GET_FED_INFO(federate_id); tag_t min_in_transit_tag = pqueue_tag_peek_tag(fed->in_transit_message_tags); @@ -1200,31 +1229,29 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) { } else { federate_info_t* fed = GET_FED_INFO(fed_id); // Read the number of upstream and downstream connections - fed->enclave.num_upstream = extract_int32(&(connection_info_header[1])); - fed->enclave.num_downstream = extract_int32(&(connection_info_header[1 + sizeof(int32_t)])); - LF_PRINT_DEBUG("RTI got %d upstreams and %d downstreams from federate %d.", fed->enclave.num_upstream, - fed->enclave.num_downstream, fed_id); + fed->enclave.num_immediate_upstreams = extract_int32(&(connection_info_header[1])); + fed->enclave.num_immediate_downstreams = extract_int32(&(connection_info_header[1 + sizeof(int32_t)])); + LF_PRINT_DEBUG("RTI got %d upstreams and %d downstreams from federate %d.", fed->enclave.num_immediate_upstreams, + fed->enclave.num_immediate_downstreams, fed_id); // Allocate memory for the upstream and downstream pointers - if (fed->enclave.num_upstream > 0) { - fed->enclave.upstream = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream); - LF_ASSERT_NON_NULL(fed->enclave.upstream); + if (fed->enclave.num_immediate_upstreams > 0) { + fed->enclave.immediate_upstreams = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_immediate_upstreams); // Allocate memory for the upstream delay pointers - fed->enclave.upstream_delay = (interval_t*)malloc(sizeof(interval_t) * fed->enclave.num_upstream); - LF_ASSERT_NON_NULL(fed->enclave.upstream_delay); + fed->enclave.immediate_upstream_delays = + (interval_t*)malloc(sizeof(interval_t) * fed->enclave.num_immediate_upstreams); } else { - fed->enclave.upstream = (uint16_t*)NULL; - fed->enclave.upstream_delay = (interval_t*)NULL; + fed->enclave.immediate_upstreams = (uint16_t*)NULL; + fed->enclave.immediate_upstream_delays = (interval_t*)NULL; } - if (fed->enclave.num_downstream > 0) { - fed->enclave.downstream = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream); - LF_ASSERT_NON_NULL(fed->enclave.downstream); + if (fed->enclave.num_immediate_downstreams > 0) { + fed->enclave.immediate_downstreams = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_immediate_downstreams); } else { - fed->enclave.downstream = (uint16_t*)NULL; + fed->enclave.immediate_downstreams = (uint16_t*)NULL; } - size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * fed->enclave.num_upstream) + - (sizeof(uint16_t) * fed->enclave.num_downstream); + size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * fed->enclave.num_immediate_upstreams) + + (sizeof(uint16_t) * fed->enclave.num_immediate_downstreams); unsigned char* connections_info_body = NULL; if (connections_info_body_size > 0) { connections_info_body = (unsigned char*)malloc(connections_info_body_size); @@ -1235,16 +1262,16 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) { // Keep track of where we are in the buffer size_t message_head = 0; // First, read the info about upstream federates - for (int i = 0; i < fed->enclave.num_upstream; i++) { - fed->enclave.upstream[i] = extract_uint16(&(connections_info_body[message_head])); + for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) { + fed->enclave.immediate_upstreams[i] = extract_uint16(&(connections_info_body[message_head])); message_head += sizeof(uint16_t); - fed->enclave.upstream_delay[i] = extract_int64(&(connections_info_body[message_head])); + fed->enclave.immediate_upstream_delays[i] = extract_int64(&(connections_info_body[message_head])); message_head += sizeof(int64_t); } // Next, read the info about downstream federates - for (int i = 0; i < fed->enclave.num_downstream; i++) { - fed->enclave.downstream[i] = extract_uint16(&(connections_info_body[message_head])); + for (int i = 0; i < fed->enclave.num_immediate_downstreams; i++) { + fed->enclave.immediate_downstreams[i] = extract_uint16(&(connections_info_body[message_head])); message_head += sizeof(uint16_t); } @@ -1596,6 +1623,7 @@ void initialize_RTI(rti_remote_t* rti) { rti_remote->clock_sync_exchanges_per_interval = 10; rti_remote->authentication_enabled = false; rti_remote->base.tracing_enabled = false; + rti_remote->base.dnet_disabled = false; rti_remote->stop_in_progress = false; } @@ -1606,17 +1634,15 @@ void clock_sync_add_offset(instant_t* t) { (void)t; } void clock_sync_subtract_offset(instant_t* t) { (void)t; } void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) { + invalidate_min_delays(); for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) { scheduling_node_t* node = scheduling_nodes[i]; - if (node->upstream != NULL) { - free(node->upstream); - free(node->upstream_delay); - } - if (node->min_delays != NULL) { - free(node->min_delays); + if (node->immediate_upstreams != NULL) { + free(node->immediate_upstreams); + free(node->immediate_upstream_delays); } - if (node->downstream != NULL) { - free(node->downstream); + if (node->immediate_downstreams != NULL) { + free(node->immediate_downstreams); } free(node); } diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index de6b144aa..99e439588 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -153,6 +153,7 @@ typedef struct rti_remote_t { * Boolean indicating that authentication is enabled. */ bool authentication_enabled; + /** * Boolean indicating that a stop request is already in progress. */ diff --git a/core/federated/RTI/test/rti_common_test.c b/core/federated/RTI/test/rti_common_test.c index 107d08057..63affda5a 100644 --- a/core/federated/RTI/test/rti_common_test.c +++ b/core/federated/RTI/test/rti_common_test.c @@ -7,7 +7,7 @@ #include "tag.h" // The RTI under test. -static rti_common_t test_rti; +static rti_common_t test_RTI; /******************************************Start of Utility * Functions******************************************************/ @@ -17,16 +17,16 @@ static rti_common_t test_rti; * @param node The node to be freed */ void delete_scheduling_node(scheduling_node_t* node) { - if (node->upstream != NULL) { - free(node->upstream); + if (node->immediate_upstreams != NULL) { + free(node->immediate_upstreams); } - if (node->upstream_delay != NULL) { - free(node->upstream_delay); + if (node->immediate_upstream_delays != NULL) { + free(node->immediate_upstream_delays); } - if (node->downstream != NULL) { - free(node->downstream); + if (node->immediate_downstreams != NULL) { + free(node->immediate_downstreams); } - invalidate_min_delays_upstream(node); + free(node); } /** @@ -34,35 +34,35 @@ void delete_scheduling_node(scheduling_node_t* node) { * Before calling this function, reset_common_RTI should be called to * reset every scheduling nodes. * @param id The ID of the scheduling node. - * @param num_upstream The number of upstreams of the scheduling node. - * @param num_downstream The number of downstreams of the scheduling node. - * @param upstream The array of IDs from upstream nodes. - * @param upstream_delay The array of delays from upstream nodes. - * @param downstream The array of IDs from downstream nodes. + * @param num_immediate_upstreams The number of upstreams of the scheduling node. + * @param num_immediate_downstreams The number of downstreams of the scheduling node. + * @param immediate_upstreams The array of IDs from immediate upstream nodes. + * @param immediate_upstream_delays The array of delays from immediate upstream nodes. + * @param immediate_downstreams The array of IDs from immediate downstream nodes. */ -void set_scheduling_node(int id, int num_upstream, int num_downstream, int* upstream, interval_t* upstream_delay, - int* downstream) { - // Save the number of upstream and downstream nodes. - test_rti.scheduling_nodes[id]->num_upstream = num_upstream; - test_rti.scheduling_nodes[id]->num_downstream = num_downstream; - - // If there is any upstream nodes, store IDs and delays from the upstream nodes into the structure. - if (test_rti.scheduling_nodes[id]->num_upstream > 0) { - test_rti.scheduling_nodes[id]->upstream = - (uint16_t*)calloc(test_rti.scheduling_nodes[id]->num_upstream, sizeof(uint16_t)); - test_rti.scheduling_nodes[id]->upstream_delay = - (interval_t*)calloc(test_rti.scheduling_nodes[id]->num_upstream, sizeof(interval_t)); - for (int i = 0; i < test_rti.scheduling_nodes[id]->num_upstream; i++) { - test_rti.scheduling_nodes[id]->upstream[i] = upstream[i]; - test_rti.scheduling_nodes[id]->upstream_delay[i] = upstream_delay[i]; +void set_scheduling_node(int id, int num_immediate_upstreams, int num_immediate_downstreams, int* immediate_upstreams, + interval_t* immediate_upstream_delays, int* immediate_downstreams) { + // Save the number of immediate upstream and immediate downstream nodes. + test_RTI.scheduling_nodes[id]->num_immediate_upstreams = num_immediate_upstreams; + test_RTI.scheduling_nodes[id]->num_immediate_downstreams = num_immediate_downstreams; + + // If there is any immediate upstream nodes, store IDs and delays from the upstream nodes into the structure. + if (test_RTI.scheduling_nodes[id]->num_immediate_upstreams > 0) { + test_RTI.scheduling_nodes[id]->immediate_upstreams = + (uint16_t*)calloc(test_RTI.scheduling_nodes[id]->num_immediate_upstreams, sizeof(uint16_t)); + test_RTI.scheduling_nodes[id]->immediate_upstream_delays = + (interval_t*)calloc(test_RTI.scheduling_nodes[id]->num_immediate_upstreams, sizeof(interval_t)); + for (int i = 0; i < test_RTI.scheduling_nodes[id]->num_immediate_upstreams; i++) { + test_RTI.scheduling_nodes[id]->immediate_upstreams[i] = immediate_upstreams[i]; + test_RTI.scheduling_nodes[id]->immediate_upstream_delays[i] = immediate_upstream_delays[i]; } } - // If there is any downstream nodes, store IDs of the downstream nodes into the structure. - if (test_rti.scheduling_nodes[id]->num_downstream > 0) { - test_rti.scheduling_nodes[id]->downstream = - (uint16_t*)calloc(test_rti.scheduling_nodes[id]->num_downstream, sizeof(uint16_t)); - for (int i = 0; i < test_rti.scheduling_nodes[id]->num_downstream; i++) { - test_rti.scheduling_nodes[id]->downstream[i] = downstream[i]; + // If there is any immediate downstream nodes, store IDs of the downstream nodes into the structure. + if (test_RTI.scheduling_nodes[id]->num_immediate_downstreams > 0) { + test_RTI.scheduling_nodes[id]->immediate_downstreams = + (uint16_t*)calloc(test_RTI.scheduling_nodes[id]->num_immediate_downstreams, sizeof(uint16_t)); + for (int i = 0; i < test_RTI.scheduling_nodes[id]->num_immediate_downstreams; i++) { + test_RTI.scheduling_nodes[id]->immediate_downstreams[i] = immediate_downstreams[i]; } } } @@ -72,15 +72,15 @@ void set_scheduling_node(int id, int num_upstream, int num_downstream, int* upst * This includes freeing every scheduling node and the array of nodes. */ void reset_common_RTI() { + invalidate_min_delays(); // For every scheduling nodes, delete them and free themselves, too. - for (uint16_t i = 0; i < test_rti.number_of_scheduling_nodes; i++) { - delete_scheduling_node(test_rti.scheduling_nodes[i]); - free(test_rti.scheduling_nodes[i]); + for (uint16_t i = 0; i < test_RTI.number_of_scheduling_nodes; i++) { + delete_scheduling_node(test_RTI.scheduling_nodes[i]); } // Free the array of scheduling nodes either. This will be re-created // in set_common_RTI(). - if (test_rti.scheduling_nodes != NULL) { - free(test_rti.scheduling_nodes); + if (test_RTI.scheduling_nodes != NULL) { + free(test_RTI.scheduling_nodes); } } @@ -90,17 +90,18 @@ void reset_common_RTI() { * @param num_nodes The number of scheduling nodes. */ void set_common_RTI(uint16_t num_nodes) { - reset_common_RTI(); - - test_rti.number_of_scheduling_nodes = num_nodes; + test_RTI.number_of_scheduling_nodes = num_nodes; // Allocate memory for the scheduling nodes - test_rti.scheduling_nodes = - (scheduling_node_t**)calloc(test_rti.number_of_scheduling_nodes, sizeof(scheduling_node_t*)); - for (uint16_t i = 0; i < test_rti.number_of_scheduling_nodes; i++) { + test_RTI.scheduling_nodes = + (scheduling_node_t**)calloc(test_RTI.number_of_scheduling_nodes, sizeof(scheduling_node_t*)); + + test_RTI.min_delays = NULL; + + for (uint16_t i = 0; i < test_RTI.number_of_scheduling_nodes; i++) { scheduling_node_t* scheduling_node = (scheduling_node_t*)malloc(sizeof(scheduling_node_t)); initialize_scheduling_node(scheduling_node, i); - test_rti.scheduling_nodes[i] = scheduling_node; + test_RTI.scheduling_nodes[i] = scheduling_node; } } @@ -110,8 +111,8 @@ void set_common_RTI(uint16_t num_nodes) { * @param state The state that every scheduling node will have. */ void set_state_of_nodes(scheduling_node_state_t state) { - for (uint16_t i = 0; i < test_rti.number_of_scheduling_nodes; i++) { - test_rti.scheduling_nodes[i]->state = state; + for (uint16_t i = 0; i < test_RTI.number_of_scheduling_nodes; i++) { + test_RTI.scheduling_nodes[i]->state = state; } } /******************************************End of Utility @@ -119,6 +120,7 @@ void set_state_of_nodes(scheduling_node_state_t state) { void valid_cache() { set_common_RTI(2); + uint16_t n = test_RTI.number_of_scheduling_nodes; // Construct the structure illustrated below. // node[0] --> node[1] @@ -127,15 +129,19 @@ void valid_cache() { set_state_of_nodes(GRANTED); + test_RTI.min_delays = (tag_t*)calloc((n * n), sizeof(tag_t)); + test_RTI.min_delays[0] = (tag_t){.time = NSEC(1), .microstep = 0}; + // If min_delays is not null (the cached data is valid), nothing should be changed. - test_rti.scheduling_nodes[1]->num_min_delays = 1; - test_rti.scheduling_nodes[1]->min_delays = (minimum_delay_t*)calloc(1, sizeof(minimum_delay_t)); - update_min_delays_upstream(test_rti.scheduling_nodes[1]); - assert(test_rti.scheduling_nodes[1]->num_min_delays == 1); + update_min_delays(); + assert(lf_tag_compare(test_RTI.min_delays[0], (tag_t){.time = NSEC(1), .microstep = 0}) == 0); + + reset_common_RTI(); } void not_connected() { set_common_RTI(2); + uint16_t n = test_RTI.number_of_scheduling_nodes; // Construct the structure illustrated below. // node[0] --> node[1] @@ -144,13 +150,20 @@ void not_connected() { set_state_of_nodes(NOT_CONNECTED); - // If the nodes are not connected, num_min_delays should not be changed. - update_min_delays_upstream(test_rti.scheduling_nodes[1]); - assert(test_rti.scheduling_nodes[1]->num_min_delays == 0); + // If the nodes are not connected, the matrix should be filled with FOREVER_TAG. + update_min_delays(); + for (uint16_t i = 0; i < n; i++) { + for (uint16_t j = 0; j < n; j++) { + assert(lf_tag_compare(test_RTI.min_delays[i * n + j], FOREVER_TAG) == 0); + } + } + + reset_common_RTI(); } static void two_nodes_no_delay() { set_common_RTI(2); + uint16_t n = test_RTI.number_of_scheduling_nodes; // Construct the structure illustrated below. // node[0] --> node[1] @@ -159,18 +172,22 @@ static void two_nodes_no_delay() { set_state_of_nodes(GRANTED); - update_min_delays_upstream(test_rti.scheduling_nodes[0]); - assert(test_rti.scheduling_nodes[1]->num_min_delays == 0); // node[0] has no upstream nodes. + update_min_delays(); + // The min_delay from 0 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 0 to 1 should be ZERO_TAG which means no delay. + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 1], ZERO_TAG) == 0); + // The min_delay from 1 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 1 to 1 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 1], FOREVER_TAG) == 0); - update_min_delays_upstream(test_rti.scheduling_nodes[1]); - assert(test_rti.scheduling_nodes[1]->num_min_delays == 1); // node[1] has one upstream nodes. - assert(test_rti.scheduling_nodes[1]->min_delays[0].id == 0); // node[1]'s upstream node is node[0]. - // The min_delay between them is node[0] and node[1] which means no delay. - assert(lf_tag_compare(test_rti.scheduling_nodes[1]->min_delays[0].min_delay, ZERO_TAG) == 0); + reset_common_RTI(); } static void two_nodes_zero_delay() { set_common_RTI(2); + uint16_t n = test_RTI.number_of_scheduling_nodes; // Construct the structure illustrated below. // node[0] --/0/--> node[1] @@ -179,19 +196,22 @@ static void two_nodes_zero_delay() { set_state_of_nodes(GRANTED); - update_min_delays_upstream(test_rti.scheduling_nodes[0]); - assert(test_rti.scheduling_nodes[1]->num_min_delays == 0); // node[0] has no upstream nodes. + update_min_delays(); + // The min_delay from 0 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 0 to 1 should be (0, 1). + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 1], (tag_t){.time = 0, .microstep = 1}) == 0); + // The min_delay from 1 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 1 to 1 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 1], FOREVER_TAG) == 0); - update_min_delays_upstream(test_rti.scheduling_nodes[1]); - assert(test_rti.scheduling_nodes[1]->num_min_delays == 1); // node[1] has one upstream nodes. - assert(test_rti.scheduling_nodes[1]->min_delays[0].id == 0); // node[1]'s upstream node is node[0]. - // The min_delay between node[0] and node[1] is (0, 1) which means zero delay. - assert(lf_tag_compare(test_rti.scheduling_nodes[1]->min_delays[0].min_delay, (tag_t){.time = 0, .microstep = 1}) == - 0); + reset_common_RTI(); } static void two_nodes_normal_delay() { set_common_RTI(2); + uint16_t n = test_RTI.number_of_scheduling_nodes; // Construct the structure illustrated below. // node[0] --/1 nsec/--> node[1] @@ -200,19 +220,81 @@ static void two_nodes_normal_delay() { set_state_of_nodes(GRANTED); - update_min_delays_upstream(test_rti.scheduling_nodes[0]); - assert(test_rti.scheduling_nodes[1]->num_min_delays == 0); // node[0] has no upstream nodes. + update_min_delays(); + // The min_delay from 0 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 0 to 1 should be (1, 0). + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 1], (tag_t){.time = 1, .microstep = 0}) == 0); + // The min_delay from 1 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 1 to 1 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 1], FOREVER_TAG) == 0); - update_min_delays_upstream(test_rti.scheduling_nodes[1]); - assert(test_rti.scheduling_nodes[1]->num_min_delays == 1); // node[1] has one upstream nodes. - assert(test_rti.scheduling_nodes[1]->min_delays[0].id == 0); // node[1]'s upstream node is node[0]. - // The min_delay between node[0] and node[1] is (1 nsec, 0). - assert(lf_tag_compare(test_rti.scheduling_nodes[1]->min_delays[0].min_delay, - (tag_t){.time = NSEC(1), .microstep = 0}) == 0); + reset_common_RTI(); +} + +static void two_nodes_cycle() { + set_common_RTI(2); + uint16_t n = test_RTI.number_of_scheduling_nodes; + + // Construct the structure illustrated below. + // node[0] --/1 nsec/--> node[1] --> node[0] + set_scheduling_node(0, 1, 1, (int[]){1}, (interval_t[]){NEVER}, (int[]){1}); + set_scheduling_node(1, 1, 1, (int[]){0}, (interval_t[]){NSEC(1)}, (int[]){0}); + + set_state_of_nodes(GRANTED); + + update_min_delays(); + // The min_delay from 0 to 0 should be (1, 0). + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 0], (tag_t){.time = 1, .microstep = 0}) == 0); + // The min_delay from 0 to 1 should be (1, 0). + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 1], (tag_t){.time = 1, .microstep = 0}) == 0); + // The min_delay from 1 to 0 should be ZERO_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 0], ZERO_TAG) == 0); + // The min_delay from 1 to 1 should be (1, 0). + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 1], (tag_t){.time = 1, .microstep = 0}) == 0); + + // Both of them are in a cycle. + assert(is_in_cycle(test_RTI.scheduling_nodes[0]) == 1); + assert(is_in_cycle(test_RTI.scheduling_nodes[1]) == 1); + // Both of them are in a zero delay cycle. + assert(is_in_zero_delay_cycle(test_RTI.scheduling_nodes[0]) == 0); + assert(is_in_zero_delay_cycle(test_RTI.scheduling_nodes[1]) == 0); + + reset_common_RTI(); +} + +static void two_nodes_ZDC() { + set_common_RTI(2); + uint16_t n = test_RTI.number_of_scheduling_nodes; + + // Construct the structure illustrated below. + // node[0] --> node[1] --> node[0] + set_scheduling_node(0, 1, 1, (int[]){1}, (interval_t[]){NEVER}, (int[]){1}); + set_scheduling_node(1, 1, 1, (int[]){0}, (interval_t[]){NEVER}, (int[]){0}); + + set_state_of_nodes(GRANTED); + + update_min_delays(); + // The min_delay from 0 to 0 should be ZERO_TAG. + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 0], ZERO_TAG) == 0); + // The min_delay from 0 to 1 should be ZERO_TAG. + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 1], ZERO_TAG) == 0); + // The min_delay from 1 to 0 should be ZERO_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 0], ZERO_TAG) == 0); + // The min_delay from 1 to 1 should be ZERO_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 1], ZERO_TAG) == 0); + + // Both of them are in a zero delay cycle. + assert(is_in_zero_delay_cycle(test_RTI.scheduling_nodes[0]) == 1); + assert(is_in_zero_delay_cycle(test_RTI.scheduling_nodes[1]) == 1); + + reset_common_RTI(); } static void multiple_nodes() { set_common_RTI(4); + uint16_t n = test_RTI.number_of_scheduling_nodes; // Construct the structure illustrated below. // node[0] --/1 nsec/--> node[1] --/0/--> node[2] --/2 nsec/--> node[3] @@ -223,36 +305,56 @@ static void multiple_nodes() { set_state_of_nodes(GRANTED); - update_min_delays_upstream(test_rti.scheduling_nodes[2]); - assert(test_rti.scheduling_nodes[2]->num_min_delays == 2); // node[2] has two upstream nodes. - assert(test_rti.scheduling_nodes[2]->min_delays[1].id == 1); // node[2]'s first upstream node is node[1]. - // The min_delay between node[1] and node[2] is (0, 1), which denotes zero delay. - assert(lf_tag_compare(test_rti.scheduling_nodes[2]->min_delays[1].min_delay, (tag_t){0, 1}) == 0); - assert(test_rti.scheduling_nodes[2]->min_delays[0].id == 0); // node[2]'s second upstream node is node[0]. - // The min_delay between node[0] and node[2] is (1 nsec, 1) = 1 nsec + zero delay. - assert(lf_tag_compare(test_rti.scheduling_nodes[2]->min_delays[0].min_delay, (tag_t){NSEC(1), 1}) == 0); - - update_min_delays_upstream(test_rti.scheduling_nodes[3]); - assert(test_rti.scheduling_nodes[3]->num_min_delays == 3); // node[3] has three upstream nodes. - assert(test_rti.scheduling_nodes[3]->min_delays[2].id == 2); // node[3]'s first upstream node is node [2]. - // The min_delay between node[2] and node[3] is (2 nsec, 0). - assert(lf_tag_compare(test_rti.scheduling_nodes[3]->min_delays[2].min_delay, (tag_t){NSEC(2), 0}) == 0); - assert(test_rti.scheduling_nodes[3]->min_delays[1].id == 1); // node[3]'s second upstream node is node [1]. - // The min_delay between node[1] and node[3] is (2 nsec, 0) = zero_delay + 2 nsec. - assert(lf_tag_compare(test_rti.scheduling_nodes[3]->min_delays[1].min_delay, (tag_t){NSEC(2), 0}) == 0); - assert(test_rti.scheduling_nodes[3]->min_delays[0].id == 0); // node[3]'s third upstream node is node [0]. - // The min_delay between node[0] and node[3] is (3 nsec, 0) = 1 nsec + zero_delay + 2 nsec. - assert(lf_tag_compare(test_rti.scheduling_nodes[3]->min_delays[0].min_delay, (tag_t){NSEC(3), 0}) == 0); + update_min_delays(); + // The min_delay from 0 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 0 to 1 should be (1, 0). + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 1], (tag_t){.time = 1, .microstep = 0}) == 0); + // The min_delay from 0 to 2 should be (1, 1). + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 2], (tag_t){.time = 1, .microstep = 1}) == 0); + // The min_delay from 0 to 3 should be (3, 0). + assert(lf_tag_compare(test_RTI.min_delays[0 * n + 3], (tag_t){.time = 3, .microstep = 0}) == 0); + + // The min_delay from 1 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 1 to 1 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 1], FOREVER_TAG) == 0); + // The min_delay from 1 to 2 should be (0, 1). + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 2], (tag_t){.time = 0, .microstep = 1}) == 0); + // The min_delay from 1 to 3 should be (2, 0). + assert(lf_tag_compare(test_RTI.min_delays[1 * n + 3], (tag_t){.time = 2, .microstep = 0}) == 0); + + // The min_delay from 2 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[2 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 2 to 1 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[2 * n + 1], FOREVER_TAG) == 0); + // The min_delay from 2 to 2 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[2 * n + 2], FOREVER_TAG) == 0); + // The min_delay from 2 to 3 should be (2, 0). + assert(lf_tag_compare(test_RTI.min_delays[2 * n + 3], (tag_t){.time = 2, .microstep = 0}) == 0); + + // The min_delay from 3 to 0 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[3 * n + 0], FOREVER_TAG) == 0); + // The min_delay from 3 to 1 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[3 * n + 1], FOREVER_TAG) == 0); + // The min_delay from 3 to 2 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[3 * n + 2], FOREVER_TAG) == 0); + // The min_delay from 3 to 3 should be FOREVER_TAG. + assert(lf_tag_compare(test_RTI.min_delays[3 * n + 3], FOREVER_TAG) == 0); + + reset_common_RTI(); } int main() { - initialize_rti_common(&test_rti); + initialize_rti_common(&test_RTI); - // Tests for the function update_min_delays_upstream() + // Tests for the function update_min_delays valid_cache(); not_connected(); two_nodes_no_delay(); two_nodes_zero_delay(); two_nodes_normal_delay(); + two_nodes_cycle(); + two_nodes_ZDC(); multiple_nodes(); } diff --git a/core/federated/federate.c b/core/federated/federate.c index fc2f86911..f7f52e37a 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -86,9 +86,12 @@ federate_instance_t _fed = {.socket_TCP_RTI = -1, .is_last_TAG_provisional = false, .has_upstream = false, .has_downstream = false, + .received_any_DNET = false, + .last_DNET = {.time = NEVER, .microstep = 0u}, .received_stop_request_from_rti = false, .last_sent_LTC = {.time = NEVER, .microstep = 0u}, .last_sent_NET = {.time = NEVER, .microstep = 0u}, + .last_skipped_NET = {.time = NEVER, .microstep = 0u}, .min_delay_from_physical_action_to_federate_output = NEVER}; federation_metadata_t federation_metadata = { @@ -1455,6 +1458,36 @@ static void handle_stop_request_message() { tag_to_stop.microstep); } +/** + * Handle a downstream next event tag (DNET) message from the RTI. + */ +static void handle_downstream_next_event_tag() { + size_t bytes_to_read = sizeof(instant_t) + sizeof(microstep_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + "Failed to read downstream next event tag from RTI."); + tag_t DNET = extract_tag(buffer); + + // Trace the event when tracing is enabled + tracepoint_federate_from_rti(receive_DNET, _lf_my_fed_id, &DNET); + + LF_PRINT_LOG("Received Downstream Next Event Tag (DNET): " PRINTF_TAG ".", DNET.time - start_time, DNET.microstep); + _fed.received_any_DNET = true; + + environment_t* env; + _lf_get_environments(&env); + if (lf_tag_compare(DNET, _fed.last_skipped_NET) < 0) { + LF_PRINT_LOG("The incoming DNET " PRINTF_TAG " is earlier than the last skipped NET " PRINTF_TAG + ". Send the skipped NET", + DNET.time - start_time, DNET.microstep, _fed.last_skipped_NET.time, _fed.last_skipped_NET.microstep); + send_tag(MSG_TYPE_NEXT_EVENT_TAG, _fed.last_skipped_NET); + _fed.last_sent_NET = _fed.last_skipped_NET; + _fed.last_skipped_NET = NEVER_TAG; + } + + _fed.last_DNET = DNET; +} + /** * Send a resign signal to the RTI. */ @@ -1559,6 +1592,9 @@ static void* listen_to_rti_TCP(void* args) { lf_print_error_and_exit("Failed to complete the reading of an absent message from the RTI."); } break; + case MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG: + handle_downstream_next_event_tag(); + break; case MSG_TYPE_FAILED: handle_rti_failed_message(); break; @@ -2229,6 +2265,22 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) if (lf_tag_compare(_fed.last_TAG, tag) >= 0) { LF_PRINT_DEBUG("Granted tag " PRINTF_TAG " because TAG or PTAG has been received.", _fed.last_TAG.time - start_time, _fed.last_TAG.microstep); + + // In case a downstream federate needs the NET of this tag or has not received any DNET, send NET. + if (!_fed.received_any_DNET || + (lf_tag_compare(_fed.last_DNET, tag) < 0 && lf_tag_compare(_fed.last_DNET, _fed.last_sent_NET) >= 0)) { + send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag); + _fed.last_sent_NET = tag; + _fed.last_skipped_NET = NEVER_TAG; + LF_PRINT_LOG("Sent a next event tag (NET) " PRINTF_TAG " to RTI based on the last DNET " PRINTF_TAG ".", + tag.time - start_time, tag.microstep, _fed.last_DNET.time - start_time, _fed.last_DNET.microstep); + } else { + _fed.last_skipped_NET = tag; + LF_PRINT_LOG("Skip sending a next event tag (NET) " PRINTF_TAG " to RTI based on the last DNET " PRINTF_TAG + " and the last sent NET" PRINTF_TAG ".", + tag.time - start_time, tag.microstep, _fed.last_DNET.time - start_time, _fed.last_DNET.microstep, + _fed.last_sent_NET.time - start_time, _fed.last_sent_NET.microstep); + } return _fed.last_TAG; } @@ -2250,9 +2302,15 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) // This if statement does not fall through but rather returns. // NET is not bounded by physical time or has no downstream federates. // Normal case. - send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag); - _fed.last_sent_NET = tag; - LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.", tag.time - start_time, tag.microstep); + if (lf_tag_compare(_fed.last_DNET, tag) < 0 || (_fed.has_upstream && lf_tag_compare(_fed.last_TAG, tag) < 0)) { + send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag); + _fed.last_sent_NET = tag; + _fed.last_skipped_NET = NEVER_TAG; + LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.", tag.time - start_time, tag.microstep); + } else { + _fed.last_skipped_NET = tag; + LF_PRINT_LOG("Skip sending next event tag (NET) " PRINTF_TAG " to RTI.", tag.time - start_time, tag.microstep); + } if (!wait_for_reply) { LF_PRINT_LOG("Not waiting for reply to NET."); @@ -2288,6 +2346,7 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) if (lf_tag_compare(next_tag, tag) != 0) { send_tag(MSG_TYPE_NEXT_EVENT_TAG, next_tag); _fed.last_sent_NET = next_tag; + _fed.last_skipped_NET = NEVER_TAG; LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI from loop.", next_tag.time - lf_time_start(), next_tag.microstep); } @@ -2484,6 +2543,10 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int tracepoint_federate_to_rti(send_TAGGED_MSG, _lf_my_fed_id, ¤t_message_intended_tag); } + if (lf_tag_compare(_fed.last_DNET, current_message_intended_tag) > 0) { + _fed.last_DNET = current_message_intended_tag; + } + int result = write_to_socket_close_on_error(socket, header_length, header_buffer); if (result == 0) { // Header sent successfully. Send the body. diff --git a/core/tag.c b/core/tag.c index e777eccc1..10e3f282f 100644 --- a/core/tag.c +++ b/core/tag.c @@ -143,6 +143,18 @@ tag_t lf_delay_strict(tag_t tag, interval_t interval) { return result; } +tag_t lf_tag_latest_earlier(tag_t tag) { + if (lf_tag_compare(tag, NEVER_TAG) == 0 || lf_tag_compare(tag, FOREVER_TAG) == 0) { + return tag; + } else if (tag.microstep == 0) { + tag.time -= 1; + tag.microstep = UINT_MAX; + } else { + tag.microstep -= 1; + } + return tag; +} + instant_t lf_time_logical(void* env) { assert(env != GLOBAL_ENVIRONMENT); return ((environment_t*)env)->current_tag.time; diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 50c59daa1..e7697f259 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -164,10 +164,25 @@ typedef struct federate_instance_t { tag_t last_sent_LTC; /** - * A record of the most recently sent NET (next event tag) message. + * A record of the most recently sent NET (next event tag) signal. */ tag_t last_sent_NET; + /** + * A record of the most recently skipped NET (next event tag) signal. + */ + tag_t last_skipped_NET; + + /** + * Indicator of whether this federate has received any DNET (downstream next event tag) signal. + */ + bool received_any_DNET; + + /** + * A record of the most recent DNET (downstream next event tag) signal. + */ + tag_t last_DNET; + /** * For use in federates with centralized coordination, the minimum * time delay between a physical action within this federate and an diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 47826be3e..36a5dcac4 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -617,6 +617,18 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_FAILED 25 +/** + * Byte identifying a downstream next event tag (DNET) message sent + * from the RTI in centralized coordination. + * The next eight bytes will be the timestamp. + * The next four bytes will be the microstep. + * This signal from the RTI tells the destination federate that downstream + * federates do not need for it to send any next event tag (NET) signal + * with a tag _g_ less than the specified tag. Thus, it should only send + * those signals if needs permission from the RTI to advance to _g_. + */ +#define MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG 26 + ///////////////////////////////////////////// //// Rejection codes diff --git a/tag/api/tag.h b/tag/api/tag.h index 97c1aa0d7..f6243b552 100644 --- a/tag/api/tag.h +++ b/tag/api/tag.h @@ -168,6 +168,14 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval); */ tag_t lf_delay_strict(tag_t tag, interval_t interval); +/** + * @brief Return the greatest tag earlier than the given tag. + * + * If the given tag is `FOREVER_TAG` or `NEVER_TAG`, however, just return the given tag. + * @param tag The tag. + */ +tag_t lf_tag_latest_earlier(tag_t tag); + /** * Return the current logical time in nanoseconds. * On many platforms, this is the number of nanoseconds diff --git a/trace/api/types/trace_types.h b/trace/api/types/trace_types.h index 6d8758fa4..b7cc7be0c 100644 --- a/trace/api/types/trace_types.h +++ b/trace/api/types/trace_types.h @@ -49,6 +49,7 @@ typedef enum { send_P2P_MSG, send_ADR_AD, send_ADR_QR, + send_DNET, // Receiving messages receive_ACK, receive_FAILED, @@ -71,6 +72,7 @@ typedef enum { receive_P2P_MSG, receive_ADR_AD, receive_ADR_QR, + receive_DNET, receive_UNIDENTIFIED, NUM_EVENT_TYPES } trace_event_t; @@ -112,6 +114,7 @@ static const char* trace_event_names[] = { "Sending P2P_MSG", "Sending ADR_AD", "Sending ADR_QR", + "Sending DNET", // Receiving messages "Receiving ACK", "Receiving FAILED", @@ -134,6 +137,7 @@ static const char* trace_event_names[] = { "Receiving P2P_MSG", "Receiving ADR_AD", "Receiving ADR_QR", + "Receiving DNET", "Receiving UNIDENTIFIED", }; diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index d66000e4b..a680d27c4 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -26,6 +26,7 @@ .NET { stroke: #118ab2; fill: #118ab2} \ .PTAG { stroke: #06d6a0; fill: #06d6a0} \ .TAG { stroke: #08a578; fill: #08a578} \ + .DNET { stroke: purple; fill: purple} \ .TIMESTAMP { stroke: grey; fill: grey } \ .FED_ID {stroke: #80DD99; fill: #80DD99 } \ .ADV {stroke-linecap="round" ; stroke: "red" ; fill: "red"} \ @@ -61,6 +62,7 @@ "Sending P2P_MSG": "P2P_MSG", "Sending ADR_AD": "ADR_AD", "Sending ADR_QR": "ADR_QR", + "Sending DNET": "DNET", "Receiving ACK": "ACK", "Receiving FAILED": "FAILED", "Receiving TIMESTAMP": "TIMESTAMP", @@ -82,6 +84,7 @@ "Receiving P2P_MSG": "P2P_MSG", "Receiving ADR_AD": "ADR_AD", "Receiving ADR_QR": "ADR_QR", + "Receiving DNET": "DNET", "Receiving UNIDENTIFIED": "UNIDENTIFIED", "Scheduler advancing time ends": "AdvLT" }