Skip to content

Commit

Permalink
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakio815 committed Feb 8, 2024
2 parents c2f6f0a + 85d54a4 commit 376694e
Show file tree
Hide file tree
Showing 23 changed files with 348 additions and 312 deletions.
40 changes: 17 additions & 23 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,14 @@ static void environment_init_threaded(environment_t* env, int num_workers) {
#if !defined(LF_SINGLE_THREADED)
env->num_workers = num_workers;
env->thread_ids = (lf_thread_t*)calloc(num_workers, sizeof(lf_thread_t));
LF_ASSERT(env->thread_ids, "Out of memory");
LF_ASSERT_NON_NULL(env->thread_ids);
env->barrier.requestors = 0;
env->barrier.horizon = FOREVER_TAG;

// Initialize synchronization objects.
if (lf_mutex_init(&env->mutex) != 0) {
lf_print_error_and_exit("Could not initialize environment mutex");
}
if (lf_cond_init(&env->event_q_changed, &env->mutex) != 0) {
lf_print_error_and_exit("Could not initialize environment event queue condition variable");
}
if (lf_cond_init(&env->global_tag_barrier_requestors_reached_zero, &env->mutex)) {
lf_print_error_and_exit("Could not initialize environment tag barrier condition variable");
}

LF_MUTEX_INIT(&env->mutex);
LF_COND_INIT(&env->event_q_changed, &env->mutex);
LF_COND_INIT(&env->global_tag_barrier_requestors_reached_zero, &env->mutex);

#endif
}
Expand All @@ -84,16 +77,16 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
#ifdef MODAL_REACTORS
if (num_modes > 0) {
mode_environment_t* modes = (mode_environment_t *) calloc(1, sizeof(mode_environment_t));
LF_ASSERT(modes, "Out of memory");
LF_ASSERT_NON_NULL(modes);
modes->modal_reactor_states = (reactor_mode_state_t**) calloc(num_modes, sizeof(reactor_mode_state_t*));
LF_ASSERT(modes->modal_reactor_states, "Out of memory");
LF_ASSERT_NON_NULL(modes->modal_reactor_states);
modes->modal_reactor_states_size = num_modes;
modes->triggered_reactions_request = 0;

modes->state_resets_size = num_state_resets;
if (modes->state_resets_size > 0) {
modes->state_resets = (mode_state_variable_reset_data_t *) calloc(modes->state_resets_size, sizeof(mode_state_variable_reset_data_t));
LF_ASSERT(modes->state_resets, "Out of memory");
LF_ASSERT_NON_NULL(modes->state_resets);
} else {
modes->state_resets = NULL;
}
Expand All @@ -113,10 +106,11 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi
#ifdef FEDERATED_DECENTRALIZED
if (num_is_present_fields > 0) {
env->_lf_intended_tag_fields = (tag_t**) calloc(num_is_present_fields, sizeof(tag_t*));
LF_ASSERT(env->_lf_intended_tag_fields, "Out of memory");
LF_ASSERT_NON_NULL(env->_lf_intended_tag_fields);
env->_lf_intended_tag_fields_size = num_is_present_fields;
} else {
env->_lf_intended_tag_fields_size = NULL;
env->_lf_intended_tag_fields = NULL;
env->_lf_intended_tag_fields_size = 0;
}
#endif
}
Expand Down Expand Up @@ -198,7 +192,7 @@ int environment_init(
) {

env->name = malloc(strlen(name) + 1); // +1 for the null terminator
LF_ASSERT(env->name, "Out of memory");
LF_ASSERT_NON_NULL(env->name);
strcpy(env->name, name);

env->id = id;
Expand All @@ -207,31 +201,31 @@ int environment_init(
env->timer_triggers_size=num_timers;
if(env->timer_triggers_size > 0) {
env->timer_triggers = (trigger_t **) calloc(num_timers, sizeof(trigger_t));
LF_ASSERT(env->timer_triggers, "Out of memory");
LF_ASSERT_NON_NULL(env->timer_triggers);
} else {
env->timer_triggers = NULL;
}

env->startup_reactions_size=num_startup_reactions;
if (env->startup_reactions_size > 0) {
env->startup_reactions = (reaction_t **) calloc(num_startup_reactions, sizeof(reaction_t));
LF_ASSERT(env->startup_reactions, "Out of memory");
LF_ASSERT_NON_NULL(env->startup_reactions);
} else {
env->startup_reactions = NULL;
}

env->shutdown_reactions_size=num_shutdown_reactions;
if(env->shutdown_reactions_size > 0) {
env->shutdown_reactions = (reaction_t **) calloc(num_shutdown_reactions, sizeof(reaction_t));
LF_ASSERT(env->shutdown_reactions, "Out of memory");
LF_ASSERT_NON_NULL(env->shutdown_reactions);
} else {
env->shutdown_reactions = NULL;
}

env->reset_reactions_size=num_reset_reactions;
if (env->reset_reactions_size > 0) {
env->reset_reactions = (reaction_t **) calloc(num_reset_reactions, sizeof(reaction_t));
LF_ASSERT(env->reset_reactions, "Out of memory");
LF_ASSERT_NON_NULL(env->reset_reactions);
} else {
env->reset_reactions = NULL;
}
Expand All @@ -241,9 +235,9 @@ int environment_init(

if (env->is_present_fields_size > 0) {
env->is_present_fields = (bool**)calloc(num_is_present_fields, sizeof(bool*));
LF_ASSERT(env->is_present_fields, "Out of memory");
LF_ASSERT_NON_NULL(env->is_present_fields);
env->is_present_fields_abbreviated = (bool**)calloc(num_is_present_fields, sizeof(bool*));
LF_ASSERT(env->is_present_fields_abbreviated, "Out of memory");
LF_ASSERT_NON_NULL(env->is_present_fields_abbreviated);
} else {
env->is_present_fields = NULL;
env->is_present_fields_abbreviated = NULL;
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ int main(int argc, const char* argv[]) {
if (rti.base.tracing_enabled) {
_lf_number_of_workers = rti.base.number_of_scheduling_nodes;
rti.base.trace = trace_new(NULL, rti_trace_file_name);
LF_ASSERT(rti.base.trace, "Out of memory");
LF_ASSERT_NON_NULL(rti.base.trace);
start_trace(rti.base.trace);
lf_print("Tracing the RTI execution in %s file.", rti_trace_file_name);
}
Expand Down
4 changes: 2 additions & 2 deletions core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) {
void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {
// FIXME: Consolidate this message with NET to get NMR (Next Message Request).
// Careful with handling startup and shutdown.
lf_mutex_lock(rti_common->mutex);
LF_MUTEX_LOCK(rti_common->mutex);

enclave->completed = completed;

Expand All @@ -78,7 +78,7 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {
free(visited);
}

lf_mutex_unlock(rti_common->mutex);
LF_MUTEX_UNLOCK(rti_common->mutex);
}

tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) {
Expand Down
26 changes: 13 additions & 13 deletions core/federated/RTI/rti_local.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ lf_mutex_t rti_mutex;

void initialize_local_rti(environment_t *envs, int num_envs) {
rti_local = (rti_local_t*)calloc(1, sizeof(rti_local_t));
LF_ASSERT(rti_local, "Out of memory");
LF_ASSERT_NON_NULL(rti_local);

initialize_rti_common(&rti_local->base);
LF_ASSERT(lf_mutex_init(&rti_mutex) == 0, "Could not create mutex");
LF_MUTEX_INIT(&rti_mutex);
rti_local->base.mutex = &rti_mutex;
rti_local->base.number_of_scheduling_nodes = num_envs;
rti_local->base.tracing_enabled = (envs[0].trace != NULL);
Expand Down Expand Up @@ -73,7 +73,7 @@ void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t * e
enclave->env = env;

// Initialize the next event condition variable.
LF_ASSERT(lf_cond_init(&enclave->next_event_condition, &rti_mutex) == 0, "Could not create cond var");
LF_COND_INIT(&enclave->next_event_condition, &rti_mutex);
}

tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
Expand All @@ -86,8 +86,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
}
// This is called from a critical section within the source enclave. Leave
// this critical section and acquire the RTI mutex.
LF_ASSERT(lf_mutex_unlock(&e->env->mutex) == 0, "Could not unlock mutex");
LF_ASSERT(lf_mutex_lock(rti_local->base.mutex) == 0, "Could not lock mutex");
LF_MUTEX_UNLOCK(&e->env->mutex);
LF_MUTEX_LOCK(rti_local->base.mutex);
tracepoint_federate_to_rti(e->env->trace, send_NET, e->base.id, &next_event_tag);
// First, update the enclave data structure to record this next_event_tag,
// and notify any downstream scheduling_nodes, and unblock them if appropriate.
Expand All @@ -105,8 +105,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
next_event_tag.time - lf_time_start(), next_event_tag.microstep);
tracepoint_federate_from_rti(e->env->trace, receive_TAG, e->base.id, &next_event_tag);
// Release RTI mutex and re-enter the critical section of the source enclave before returning.
LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
LF_ASSERT(lf_mutex_lock(&e->env->mutex) == 0, "Could not lock mutex");
LF_MUTEX_UNLOCK(rti_local->base.mutex);
LF_MUTEX_LOCK(&e->env->mutex);
return next_event_tag;
}

Expand Down Expand Up @@ -136,8 +136,8 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
e->base.id, e->base.next_event.time - lf_time_start(), e->base.next_event.microstep);
tracepoint_federate_from_rti(e->env->trace, receive_TAG, e->base.id, &result.tag);
// Release RTI mutex and re-enter the critical section of the source enclave.
LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
LF_ASSERT(lf_mutex_lock(&e->env->mutex) == 0, "Could not lock mutex");
LF_MUTEX_UNLOCK(rti_local->base.mutex);
LF_MUTEX_LOCK(&e->env->mutex);
return result.tag;
}

Expand All @@ -146,24 +146,24 @@ void rti_logical_tag_complete_locked(enclave_info_t* enclave, tag_t completed) {
return;
}
// Release the enclave mutex while doing the local RTI work.
LF_ASSERT(lf_mutex_unlock(&enclave->env->mutex) == 0, "Could not unlock mutex");
LF_MUTEX_UNLOCK(&enclave->env->mutex);
tracepoint_federate_to_rti(enclave->env->trace, send_LTC, enclave->base.id, &completed);
_logical_tag_complete(&enclave->base, completed);
// Acquire the enclave mutex again before returning.
LF_ASSERT(lf_mutex_lock(&enclave->env->mutex) == 0, "Could not lock mutex");
LF_MUTEX_LOCK(&enclave->env->mutex);
}

void rti_update_other_net_locked(enclave_info_t* src, enclave_info_t * target, tag_t net) {
// Here we do NOT leave the critical section of the target enclave before we
// acquire the RTI mutex. This means that we cannot block within this function.
LF_ASSERT(lf_mutex_lock(rti_local->base.mutex) == 0, "Could not lock mutex");
LF_MUTEX_LOCK(rti_local->base.mutex);
tracepoint_federate_to_federate(src->env->trace, send_TAGGED_MSG, src->base.id, target->base.id, &net);

// If our proposed NET is less than the current NET, update it.
if (lf_tag_compare(net, target->base.next_event) < 0) {
target->base.next_event = net;
}
LF_ASSERT(lf_mutex_unlock(rti_local->base.mutex) == 0, "Could not unlock mutex");
LF_MUTEX_UNLOCK(rti_local->base.mutex);
}

///////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 376694e

Please sign in to comment.