Skip to content

Commit

Permalink
Merge branch main into branch 'rti-DNET'
Browse files Browse the repository at this point in the history
  • Loading branch information
byeonggiljun committed Apr 22, 2024
2 parents 800b570 + 4f39b72 commit f266230
Show file tree
Hide file tree
Showing 77 changed files with 1,154 additions and 1,245 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
/docs/api
**/.vscode/
**/build/
**/lib/
**/.DS_Store
/core/federated/RTI/build/
/cmake-build-debug/
Expand Down
54 changes: 17 additions & 37 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
set(CORE_ROOT ${CMAKE_CURRENT_SOURCE_DIR})
set(LF_ROOT ${CMAKE_CURRENT_LIST_DIR}/..)

if(${CMAKE_SYSTEM_NAME} STREQUAL "Windows")
set(CMAKE_SYSTEM_VERSION 10.0)
message("Using Windows SDK version ${CMAKE_VS_WINDOWS_TARGET_PLATFORM_VERSION}")
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Nrf52")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_NRF52)
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_ZEPHYR)
set(PLATFORM_ZEPHYR true)
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Rp2040")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_RP2040)
endif()
include(${LF_ROOT}/core/lf_utils.cmake)

# Get the general common sources for reactor-c
list(APPEND GENERAL_SOURCES tag.c clock.c port.c mixed_radix.c reactor_common.c lf_token.c environment.c)
Expand Down Expand Up @@ -50,23 +40,18 @@ endif()

# Include sources from subdirectories
include(utils/CMakeLists.txt)

if (DEFINED MODAL_REACTORS)
include(modal_models/CMakeLists.txt)
endif()

# Print sources used for compilation
list(JOIN REACTORC_SOURCES ", " PRINTABLE_SOURCE_LIST)
message(STATUS "Including the following sources: " ${PRINTABLE_SOURCE_LIST})

# Create the reactor-c library. If we are targeting Zephyr we have to use the
# Zephyr Cmake extension to create the library and add the sources.
if(PLATFORM_ZEPHYR)
message("--- Building Zephyr library")
zephyr_library_named(reactor-c)
zephyr_library_sources(${REACTORC_SOURCES})
zephyr_library_link_libraries(kernel)
else()
add_library(reactor-c)
target_sources(reactor-c PRIVATE ${REACTORC_SOURCES})
endif()
add_library(reactor-c)
target_sources(reactor-c PRIVATE ${REACTORC_SOURCES})
lf_enable_compiler_warnings(reactor-c)

if (DEFINED LF_TRACE)
include(${LF_ROOT}/trace/api/CMakeLists.txt)
Expand Down Expand Up @@ -98,9 +83,6 @@ include(${LF_ROOT}/platform/impl/CMakeLists.txt)
target_link_libraries(reactor-c PUBLIC lf::platform-api)
target_link_libraries(reactor-c PRIVATE lf::platform-impl)

# Apply compile definitions to the reactor-c library.
target_compile_definitions(reactor-c PUBLIC ${REACTORC_COMPILE_DEFS})

target_include_directories(reactor-c PUBLIC ../include)
target_include_directories(reactor-c PUBLIC ../include/core)
target_include_directories(reactor-c PUBLIC ../include/core/federated)
Expand Down Expand Up @@ -134,14 +116,18 @@ if(DEFINED _LF_CLOCK_SYNC_ON)
endif()
endif()

# Link with thread library, unless we are on the Zephyr platform.
if(NOT DEFINED LF_SINGLE_THREADED OR DEFINED LF_TRACE)
if (NOT PLATFORM_ZEPHYR)
find_package(Threads REQUIRED)
target_link_libraries(reactor-c PUBLIC Threads::Threads)
endif()
# Unless specified otherwise initial event queue and reaction queue to size 10
if (NOT DEFINED INITIAL_EVENT_QUEUE_SIZE)
set(INITIAL_EVENT_QUEUE_SIZE 10)
endif()
if (NOT DEFINED INITIAL_REACT_QUEUE_SIZE)
set(INITIAL_REACT_QUEUE_SIZE 10)
endif()

target_compile_definitions(reactor-c PRIVATE INITIAL_EVENT_QUEUE_SIZE=${INITIAL_EVENT_QUEUE_SIZE})
target_compile_definitions(reactor-c PRIVATE INITIAL_REACT_QUEUE_SIZE=${INITIAL_REACT_QUEUE_SIZE})
target_compile_definitions(reactor-c PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME})

# Macro for translating a command-line argument into compile definition for
# reactor-c lib
macro(define X)
Expand All @@ -151,12 +137,6 @@ macro(define X)
endif(DEFINED ${X})
endmacro()

# FIXME: May want these to be application dependent, hence passed as
# parameters to Cmake.
target_compile_definitions(reactor-c PRIVATE INITIAL_EVENT_QUEUE_SIZE=10)
target_compile_definitions(reactor-c PRIVATE INITIAL_REACT_QUEUE_SIZE=10)
target_compile_definitions(reactor-c PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME})

# Search and apply all possible compile definitions
message(STATUS "Applying preprocessor definitions...")
define(_LF_CLOCK_SYNC_ATTENUATION)
Expand Down
89 changes: 67 additions & 22 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,31 @@
#include "scheduler.h"
#endif

//////////////////
// Local functions, not intended for use outside this file.

/**
* @brief Callback function to determine whether two events have the same trigger.
* This function is used by event queue and recycle.
* Return 1 if the triggers are identical, 0 otherwise.
* @param event1 A pointer to an event.
* @param event2 A pointer to an event.
*/
static int event_matches(void* event1, void* event2) {
return (((event_t*)event1)->trigger == ((event_t*)event2)->trigger);
}

/**
* @brief Callback function to print information about an event.
* This function is used by event queue and recycle.
* @param element A pointer to an event.
*/
static void print_event(void* event) {
event_t* e = (event_t*)event;
LF_PRINT_DEBUG("tag: " PRINTF_TAG ", trigger: %p, token: %p", e->base.tag.time, e->base.tag.microstep,
(void*)e->trigger, (void*)e->token);
}

/**
* @brief Initialize the threaded part of the environment struct.
*/
Expand All @@ -53,9 +78,12 @@ static void environment_init_threaded(environment_t* env, int num_workers) {
LF_MUTEX_INIT(&env->mutex);
LF_COND_INIT(&env->event_q_changed, &env->mutex);
LF_COND_INIT(&env->global_tag_barrier_requestors_reached_zero, &env->mutex);

#else
(void)env;
(void)num_workers;
#endif
}

/**
* @brief Initialize the single-threaded-specific parts of the environment struct.
*/
Expand All @@ -67,6 +95,8 @@ static void environment_init_single_threaded(environment_t* env) {
env->reaction_q = pqueue_init(INITIAL_REACT_QUEUE_SIZE, in_reverse_order, get_reaction_index, get_reaction_position,
set_reaction_position, reaction_matches, print_reaction);

#else
(void)env;
#endif
}

Expand Down Expand Up @@ -97,6 +127,10 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
} else {
env->modes = NULL;
}
#else
(void)env;
(void)num_modes;
(void)num_state_resets;
#endif
}

Expand All @@ -113,31 +147,26 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi
env->_lf_intended_tag_fields = NULL;
env->_lf_intended_tag_fields_size = 0;
}
#else
(void)env;
(void)num_is_present_fields;
#endif
}

void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) {
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

tag_t stop_tag = FOREVER_TAG_INITIALIZER;
if (duration >= 0LL) {
// A duration has been specified. Calculate the stop time.
stop_tag.time = env->current_tag.time + duration;
stop_tag.microstep = 0;
}
env->stop_tag = stop_tag;
}

static void environment_free_threaded(environment_t* env) {
#if !defined(LF_SINGLE_THREADED)
free(env->thread_ids);
lf_sched_free(env->scheduler);
#else
(void)env;
#endif
}

static void environment_free_single_threaded(environment_t* env) {
#ifdef LF_SINGLE_THREADED
pqueue_free(env->reaction_q);
#else
(void)env;
#endif
}

Expand All @@ -148,15 +177,22 @@ static void environment_free_modes(environment_t* env) {
free(env->modes->state_resets);
free(env->modes);
}
#else
(void)env;
#endif
}

static void environment_free_federated(environment_t* env) {
#ifdef FEDERATED_DECENTRALIZED
free(env->_lf_intended_tag_fields);
#else
(void)env;
#endif
}

//////////////////
// Functions defined in environment.h.

void environment_free(environment_t* env) {
free(env->name);
free(env->timer_triggers);
Expand All @@ -165,20 +201,32 @@ void environment_free(environment_t* env) {
free(env->reset_reactions);
free(env->is_present_fields);
free(env->is_present_fields_abbreviated);
pqueue_free(env->event_q);
pqueue_free(env->recycle_q);
pqueue_free(env->next_q);
pqueue_tag_free(env->event_q);
pqueue_tag_free(env->recycle_q);

environment_free_threaded(env);
environment_free_single_threaded(env);
environment_free_modes(env);
environment_free_federated(env);
}

void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) {
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

tag_t stop_tag = FOREVER_TAG_INITIALIZER;
if (duration >= 0LL) {
// A duration has been specified. Calculate the stop time.
stop_tag.time = env->current_tag.time + duration;
stop_tag.microstep = 0;
}
env->stop_tag = stop_tag;
}

int environment_init(environment_t* env, const char* name, int id, int num_workers, int num_timers,
int num_startup_reactions, int num_shutdown_reactions, int num_reset_reactions,
int num_is_present_fields, int num_modes, int num_state_resets, int num_watchdogs,
const char* trace_file_name) {
(void)trace_file_name; // Will be used with future enclave support.

env->name = malloc(strlen(name) + 1); // +1 for the null terminator
LF_ASSERT_NON_NULL(env->name);
Expand Down Expand Up @@ -241,12 +289,9 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
env->_lf_handle = 1;

// Initialize our priority queues.
env->event_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_reverse_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->recycle_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->next_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->event_q = pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, pqueue_tag_compare, event_matches, print_event);
env->recycle_q =
pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, event_matches, print_event);

// Initialize functionality depending on target properties.
environment_init_threaded(env, num_workers);
Expand Down
7 changes: 1 addition & 6 deletions core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ int process_args(int argc, const char* argv[]) {
}
i++;
long num_federates = strtol(argv[i], NULL, 10);
if (num_federates == 0L || num_federates == LONG_MAX || num_federates == LONG_MIN) {
if (num_federates <= 0L || num_federates == LONG_MAX || num_federates == LONG_MIN) {
lf_print_error("--number_of_federates needs a valid positive integer argument.");
usage(argc, argv);
return 0;
Expand Down Expand Up @@ -272,11 +272,6 @@ int process_args(int argc, const char* argv[]) {
return 0;
}
}
if (rti.base.number_of_scheduling_nodes == 0) {
lf_print_error("--number_of_federates needs a valid positive integer argument.");
usage(argc, argv);
return 0;
}
return 1;
}
int main(int argc, const char* argv[]) {
Expand Down
3 changes: 1 addition & 2 deletions core/federated/RTI/rti.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Docker file for building the image of the rti
FROM alpine:latest
COPY core /lingua-franca/core
COPY include /lingua-franca/include
COPY . /lingua-franca
WORKDIR /lingua-franca/core/federated/RTI
RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \
mkdir container && \
Expand Down
28 changes: 2 additions & 26 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,27 +280,7 @@ void send_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) {
encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)]));

if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(rti_remote->base.trace, send_DNET, e->id, &tag);
}
if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) {
lf_print_error("RTI failed to send downstream next event tag to federate %d.", e->id);
e->state = NOT_CONNECTED;
} else {
e->last_DNET = tag;
LF_PRINT_LOG("RTI sent to federate %d the Downstream Next Event Tag (DNET) " PRINTF_TAG ".", e->id,
tag.time - start_time, tag.microstep);
}
}

void send_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) {
size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t);
unsigned char buffer[message_length];
buffer[0] = MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG;
encode_int64(tag.time, &(buffer[1]));
encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)]));

if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(rti_remote->base.trace, send_DNET, e->id, &tag);
tracepoint_rti_to_federate(send_DNET, e->id, &tag);
}
if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) {
lf_print_error("RTI failed to send downstream next event tag to federate %d.", e->id);
Expand Down Expand Up @@ -931,11 +911,6 @@ void* clock_synchronization_thread(void* noargs) {
}

// Initiate a clock synchronization every rti->clock_sync_period_ns
// Initiate a clock synchronization every rti->clock_sync_period_ns
struct timespec sleep_time = {(time_t)rti_remote->clock_sync_period_ns / BILLION,
rti_remote->clock_sync_period_ns % BILLION};
struct timespec remaining_time;

bool any_federates_connected = true;
while (any_federates_connected) {
// Sleep
Expand Down Expand Up @@ -1382,6 +1357,7 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) {
unsigned char* connections_info_body = NULL;
if (connections_info_body_size > 0) {
connections_info_body = (unsigned char*)malloc(connections_info_body_size);
LF_ASSERT_NON_NULL(connections_info_body);
read_from_socket_fail_on_error(socket_id, connections_info_body_size, connections_info_body, NULL,
"RTI failed to read MSG_TYPE_NEIGHBOR_STRUCTURE message body from federate %d.",
fed_id);
Expand Down
Loading

0 comments on commit f266230

Please sign in to comment.