Skip to content

Commit

Permalink
Better handling of startup
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardalee committed Dec 27, 2023
1 parent a31a5d4 commit b849176
Showing 1 changed file with 16 additions and 21 deletions.
37 changes: 16 additions & 21 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1128,16 +1128,14 @@ void update_last_known_status_on_input_ports(tag_t tag) {
*/
void update_last_known_status_on_input_port(tag_t tag, int port_id) {
trigger_t* input_port_action = _lf_action_for_port(port_id)->trigger;
if (lf_tag_compare(tag,
input_port_action->last_known_status_tag) >= 0) {
if (lf_tag_compare(tag,
input_port_action->last_known_status_tag) == 0) {
// If the intended tag for an input port is equal to the last known status, we need
// to increment the microstep. This is a direct result of the behavior of the lf_delay_tag()
// semantics in tag.h.
tag.microstep++;
}
LF_PRINT_DEBUG(
if (lf_tag_compare(tag, input_port_action->last_known_status_tag) >= 0) {
if (lf_tag_compare(tag, input_port_action->last_known_status_tag) == 0) {
// If the intended tag for an input port is equal to the last known status, we need
// to increment the microstep. This is a direct result of the behavior of the lf_delay_tag()
// semantics in tag.h.
tag.microstep++;
}
LF_PRINT_LOG(
"Updating the last known status tag of port %d from " PRINTF_TAG " to " PRINTF_TAG ".",
port_id,
input_port_action->last_known_status_tag.time - lf_time_start(),
Expand Down Expand Up @@ -1311,25 +1309,26 @@ static trigger_handle_t schedule_message_received_from_network_locked(
// in the future relative to the tag of this
// federate. By default, assume it is not.
bool message_tag_is_in_the_future = lf_tag_compare(tag, env->current_tag) > 0;

// Assign the intended tag temporarily to restore later.
tag_t previous_intended_tag = trigger->intended_tag;
trigger->intended_tag = tag;

// Calculate the extra_delay required to be passed
// to the schedule function.
interval_t extra_delay = tag.time - env->current_tag.time;
if (!message_tag_is_in_the_future) {
if (!message_tag_is_in_the_future && _lf_execution_started) {
#ifdef FEDERATED_CENTRALIZED
// If the coordination is centralized, receiving a message
// that does not carry a timestamp that is in the future
// would indicate a critical condition, showing that the
// time advance mechanism is not working correctly.
lf_print_error_and_exit("Received a message at tag " PRINTF_TAG " that"
" has a tag " PRINTF_TAG " that has violated the STP offset. "
"Centralized coordination should not have these types of messages.",
env->current_tag.time - start_time, env->current_tag.microstep,
tag.time - start_time, tag.microstep);
LF_MUTEX_UNLOCK(&env->mutex);
lf_print_error_and_exit(
"Received a message at tag " PRINTF_TAG " that has a tag " PRINTF_TAG
" that has violated the STP offset. "
"Centralized coordination should not have these types of messages.",
env->current_tag.time - start_time, env->current_tag.microstep,
tag.time - start_time, tag.microstep);
#else
// Set the delay back to 0
extra_delay = 0LL;
Expand Down Expand Up @@ -1563,10 +1562,7 @@ void handle_tagged_message(int socket, int fed_id) {
// can be checked in this scenario without this race condition. The message with
// intended_tag of 9 in this case needs to wait one microstep to be processed.
if (lf_tag_compare(intended_tag, lf_tag(env)) == 0 // The event is meant for the current tag.
#if defined FEDERATED_DECENTRALIZED
// Not sure why this test is only needed for decentralized coordination.
&& _lf_execution_started
#endif // FEDERATED_DECENTRALIZED
// Check that MLAA is blocking at the right level. Otherwise, data can be lost.
&& action->trigger->reactions[0]->index >= max_level_allowed_to_advance
&& !action->trigger->is_physical
Expand Down Expand Up @@ -1602,7 +1598,6 @@ void handle_tagged_message(int socket, int fed_id) {
} else {
// If no port absent reaction is waiting for this message, or if the intended
// tag is in the future, use schedule functions to process the message.

update_last_known_status_on_input_port(intended_tag, port_id);

// Before that, if the current time >= stop time, discard the message.
Expand Down

0 comments on commit b849176

Please sign in to comment.