Skip to content

Commit

Permalink
Prevent sending redundant reply to stop request
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardalee committed Dec 21, 2023
1 parent 444ebee commit ab7605e
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 21 deletions.
11 changes: 5 additions & 6 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,10 @@ void handle_stop_request_message(federate_info_t* fed) {
read_from_socket_errexit(fed->socket, bytes_to_read, buffer,
"RTI failed to read the MSG_TYPE_STOP_REQUEST payload from federate %d.", fed->enclave.id);

if (rti_remote->base.tracing_enabled) {
tracepoint_rti_from_federate(rti_remote->base.trace, receive_STOP_REQ, fed->enclave.id, &proposed_stop_tag);
}

// Acquire a mutex lock to ensure that this state does not change while a
// message is in transport or being used to determine a TAG.
lf_mutex_lock(&rti_mutex);
Expand All @@ -620,10 +624,6 @@ void handle_stop_request_message(federate_info_t* fed) {
// Extract the proposed stop tag for the federate
tag_t proposed_stop_tag = extract_tag(buffer);

if (rti_remote->base.tracing_enabled) {
tracepoint_rti_from_federate(rti_remote->base.trace, receive_STOP_REQ, fed->enclave.id, &proposed_stop_tag);
}

// Update the maximum stop tag received from federates
if (lf_tag_compare(proposed_stop_tag, rti_remote->base.max_stop_tag) > 0) {
rti_remote->base.max_stop_tag = proposed_stop_tag;
Expand All @@ -638,8 +638,7 @@ void handle_stop_request_message(federate_info_t* fed) {

if (rti_remote->base.num_scheduling_nodes_handling_stop == rti_remote->base.number_of_scheduling_nodes) {
// We now have information about the stop time of all
// federates. This is extremely unlikely, but it can occur
// all federates call lf_request_stop() at the same tag.
// federates, and mark_federate_requesting_stop has sent out stop time to all.
lf_mutex_unlock(&rti_mutex);
return;
}
Expand Down
26 changes: 14 additions & 12 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -2165,12 +2165,12 @@ int _lf_fd_send_stop_request_to_rti(tag_t stop_tag) {
lf_mutex_unlock(&outbound_socket_mutex);
return -1;
}
// Trace the event when tracing is enabled
tracepoint_federate_to_rti(_fed.trace, send_STOP_REQ, _lf_my_fed_id, &stop_tag);
write_to_socket_with_mutex(_fed.socket_TCP_RTI, MSG_TYPE_STOP_REQUEST_LENGTH,
buffer, &outbound_socket_mutex,
"Failed to send stop time " PRINTF_TIME " to the RTI.", stop_tag.time - start_time);
lf_mutex_unlock(&outbound_socket_mutex);
// Trace the event when tracing is enabled
tracepoint_federate_to_rti(_fed.trace, send_STOP_REQ, _lf_my_fed_id, &stop_tag);
return 0;
} else {
lf_mutex_unlock(&outbound_socket_mutex);
Expand Down Expand Up @@ -2221,10 +2221,7 @@ void handle_stop_granted_message() {
env[i].stop_tag.microstep);

if (env[i].barrier.requestors) _lf_decrement_tag_barrier_locked(&env[i]);
// We signal instead of broadcast under the assumption that only
// one worker thread can call wait_until at a given time because
// the call to wait_until is protected by a mutex lock
lf_cond_signal(&env[i].event_q_changed);
lf_cond_broadcast(&env[i].event_q_changed);
lf_mutex_unlock(&env[i].mutex);
}
}
Expand Down Expand Up @@ -2268,6 +2265,12 @@ void handle_stop_request_message() {
}
lf_mutex_unlock(&global_mutex);

if (already_blocked) {
// Either we have sent a stop request to the RTI ourselves,
// or we have previously received a stop request from the RTI.
// Nothing more to do. Tag advance is already blocked on enclaves.
return;
}

// Iterate over the scheduling enclaves to find the maximum current tag
// and adjust the tag_to_stop if any of those is greater than tag_to_stop.
Expand All @@ -2281,10 +2284,9 @@ void handle_stop_request_message() {
tag_to_stop = env->current_tag;
tag_to_stop.microstep++;
}
if (!already_blocked) {
// Set a barrier to prevent the enclave from advancing past the so-far tag to stop.
_lf_increment_tag_barrier_locked(&env[i], tag_to_stop);
}
// Set a barrier to prevent the enclave from advancing past the so-far tag to stop.
_lf_increment_tag_barrier_locked(&env[i], tag_to_stop);

lf_mutex_unlock(&env[i].mutex);
}
// Send the reply, which is the least tag at which we can stop.
Expand All @@ -2297,14 +2299,14 @@ void handle_stop_request_message() {
lf_mutex_unlock(&outbound_socket_mutex);
return;
}
// Trace the event when tracing is enabled
tracepoint_federate_to_rti(_fed.trace, send_STOP_REQ_REP, _lf_my_fed_id, &tag_to_stop);
// Send the current logical time to the RTI. This message does not have an identifying byte
// since the RTI is waiting for a response from this federate.
write_to_socket_with_mutex(
_fed.socket_TCP_RTI, MSG_TYPE_STOP_REQUEST_REPLY_LENGTH, outgoing_buffer, &outbound_socket_mutex,
"Failed to send the answer to MSG_TYPE_STOP_REQUEST to RTI.");
lf_mutex_unlock(&outbound_socket_mutex);
// Trace the event when tracing is enabled
tracepoint_federate_to_rti(_fed.trace, send_STOP_REQ_REP, _lf_my_fed_id, &tag_to_stop);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion core/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
NEVER_TAG.time - start_time, 0);

environment_init_tags(env, start_time, duration);
// Start tracing if enalbed
// Start tracing if enabled.
start_trace(env->trace);
#ifdef MODAL_REACTORS
// Set up modal infrastructure
Expand Down
4 changes: 2 additions & 2 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -611,9 +611,9 @@ void lf_request_stop() {
}

#ifdef FEDERATED
// In the federated case, do not set lf_stop_requested because the RTI might grant a
// In the federated case, the RTI might grant a
// later stop tag than the current tag. The above code has raised
// a barrier no greater than the requested stop tag for each enclave.
// a barrier no greater than max_current_tag.
if (_lf_fd_send_stop_request_to_rti(max_current_tag) != 0) {
// Message was not sent to the RTI.
// Decrement the barriers to reverse our previous increment.
Expand Down

0 comments on commit ab7605e

Please sign in to comment.