Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run one worker on main thread #375

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ endif()

# Link with thread library, unless we are on the Zephyr platform.
if(NOT DEFINED LF_SINGLE_THREADED OR DEFINED LF_TRACE)
if (NOT PLATFORM_ZEPHYR)
if (NOT (PLATFORM_ZEPHYR OR PLATFORM_RP2040))
find_package(Threads REQUIRED)
target_link_libraries(reactor-c PUBLIC Threads::Threads)
endif()
Expand Down
1 change: 1 addition & 0 deletions core/platform/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr")
set(PLATFORM_ZEPHYR true)
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Rp2040")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_RP2040)
set(PLATFORM_RP2040 true)
endif()

# Prepend all sources with platform
Expand Down
117 changes: 111 additions & 6 deletions core/platform/lf_rp2040_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* @author{Abhi Gundrala <[email protected]>}
*/

#if !defined(LF_SINGLE_THREADED)
#error "Only the single-threaded runtime has support for RP2040"
#endif

#include "lf_rp2040_support.h"
#include "platform.h"
#include "utils/util.h"
Expand All @@ -51,6 +47,11 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
static critical_section_t _lf_crit_sec;

/**
* critical section struct for atomics implementation
*/
static critical_section_t _lf_atomics_crit_sec;

/**
* binary semaphore for lf event notification
* used by external isr or second core thread.
Expand All @@ -63,13 +64,16 @@ static uint32_t _lf_num_nested_crit_sec = 0;

/**
* Initialize basic runtime infrastructure and
* synchronization structs for an single-threaded runtime.
* synchronization structs for a single-threaded runtime.
*/
void _lf_initialize_clock(void) {
// init stdio lib
// may fail, but failure may be ok/expected if printing is not needed
// (i.e. if neither USB nor UART are enabled)
stdio_init_all();
// init sync structs
critical_section_init(&_lf_crit_sec);
critical_section_init(&_lf_atomics_crit_sec);
sem_init(&_lf_sem_irq_event, 0, 1);
}

Expand Down Expand Up @@ -203,9 +207,110 @@ int lf_enable_interrupts_nested() {
*/
int _lf_single_threaded_notify_of_event() {
// notify main sleep loop of event
sem_release(&_lf_sem_irq_event);
if (sem_release(&_lf_sem_irq_event)) {
return 0;
}
return 1;
}

#else // LF_SINGLE_THREADED

#warning "Threaded runtime on RP2040 is still experimental"

/**
* @brief Get the number of cores on the host machine.
*/
int lf_available_cores() {
return 2;
}

static void *(*thread_1) (void *);
static void* thread_1_args;
static int num_create_threads_called = 0;
static semaphore_t thread_1_done;
static void* thread_1_return;

#define MAGIC_THREAD1_ID 314159

void core1_entry() {
thread_1_return = thread_1(thread_1_args);
sem_reset(&thread_1_done, 1);
}

int lf_thread_create(lf_thread_t* thread, void *(*lf_thread) (void *), void* arguments) {
// make sure this fn is only called once
if (num_create_threads_called != 0) {
return 1;
}
thread_1 = lf_thread;
thread_1_args = arguments;
num_create_threads_called += 1;
sem_init(&thread_1_done, 0, 1);
multicore_launch_core1(core1_entry);
*thread = MAGIC_THREAD1_ID;
return 0;
}

int lf_thread_join(lf_thread_t thread, void** thread_return) {
if (thread != MAGIC_THREAD1_ID) {
return 1;
}
sem_acquire_blocking(&thread_1_done);
// release in case join is called again
if (!sem_release(&thread_1_done)) {
// shouldn't be possible; lf_thread_join is only called from main thread
return 1;
}
if (thread_return) {
*thread_return = thread_1_return;
}
return 0;
}

int lf_mutex_init(lf_mutex_t* mutex) {
recursive_mutex_init(mutex);
return 0;
}

int lf_mutex_lock(lf_mutex_t* mutex) {
recursive_mutex_enter_blocking(mutex);
return 0;
}

int lf_mutex_unlock(lf_mutex_t* mutex) {
recursive_mutex_exit(mutex);
return 0;
}

int lf_cond_init(lf_cond_t* cond, lf_mutex_t* mutex) {
sem_init(&(cond->sema), 0, 1);
cond->mutex = mutex;
return 0;
}

int lf_cond_broadcast(lf_cond_t* cond) {
sem_reset(&(cond->sema), 1);
return 0;
}

int lf_cond_signal(lf_cond_t* cond) {
sem_reset(&(cond->sema), 1);
return 0;
}

int lf_cond_wait(lf_cond_t* cond) {
lf_mutex_unlock(cond->mutex);
sem_acquire_blocking(&(cond->sema));
lf_mutex_lock(cond->mutex);
return 0;
}

int _lf_cond_timedwait(lf_cond_t* cond, instant_t absolute_time_ns) {
absolute_time_t a = from_us_since_boot(absolute_time_ns / 1000);
bool acquired_permit = sem_acquire_block_until(&(cond->sema), a);
return acquired_permit ? 0 : LF_TIMEOUT;
}

#endif // LF_SINGLE_THREADED


Expand Down
54 changes: 34 additions & 20 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -1026,17 +1026,6 @@ void lf_print_snapshot(environment_t* env) {
}
}

// Start threads in the thread pool.
void start_threads(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

LF_PRINT_LOG("Starting %u worker threads in environment", env->num_workers);
for (unsigned int i = 0; i < env->num_workers; i++) {
if (lf_thread_create(&env->thread_ids[i], worker, env) != 0) {
lf_print_error_and_exit("Could not start thread-%u", i);
}
}
}

/**
* @brief Determine the number of workers.
Expand Down Expand Up @@ -1161,25 +1150,50 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
_lf_initialize_start_tag(env);

lf_print("Environment %u: ---- Spawning %d workers.",env->id, env->num_workers);
start_threads(env);

for (unsigned int j = 0; j < env->num_workers; j++) {
if (i == 0 && j == 0) {
// The first worker thread of the first environment will be
// run on the main thread, rather than creating a new thread.
// This is important for bare-metal platforms, who can't
// afford to have the main thread sit idle.
continue;
}
if (lf_thread_create(&env->thread_ids[j], worker, env) != 0) {
lf_print_error_and_exit("Could not start thread-%u", j);
}
}

// Unlock mutex and allow threads proceed
LF_MUTEX_UNLOCK(&env->mutex);
}

// main thread worker (first worker thread of first environment)
void* main_thread_exit_status = NULL;
if (num_envs > 0 && envs[0].num_workers > 0) {
environment_t *env = &envs[0];
main_thread_exit_status = worker(env);
}

for (int i = 0; i<num_envs; i++) {
// Wait for the worker threads to exit.
environment_t* env = &envs[i];
void* worker_thread_exit_status = NULL;
int ret = 0;
for (int i = 0; i < env->num_workers; i++) {
int failure = lf_thread_join(env->thread_ids[i], &worker_thread_exit_status);
if (failure) {
lf_print_error("Failed to join thread listening for incoming messages: %s", strerror(failure));
}
if (worker_thread_exit_status != NULL) {
lf_print_error("---- Worker %d reports error code %p", i, worker_thread_exit_status);
for (int j = 0; j < env->num_workers; j++) {
if (i == 0 && j == 0) {
// main thread worker
worker_thread_exit_status = main_thread_exit_status;
} else {
int failure = lf_thread_join(env->thread_ids[j], &worker_thread_exit_status);
if (failure) {
lf_print_error("Failed to join thread listening for incoming messages: %s", strerror(failure));
}
}
if (worker_thread_exit_status != NULL) {
lf_print_error("---- Worker %d reports error code %p", j, worker_thread_exit_status);
ret = 1;
}
}
}

if (ret == 0) {
Expand Down
12 changes: 12 additions & 0 deletions include/core/platform/lf_rp2040_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,16 @@
#define LF_TIME_BUFFER_LENGTH 80
#define _LF_TIMEOUT 1

#ifndef LF_SINGLE_THREADED
#warning "Threaded support on rp2040 is still experimental"

typedef recursive_mutex_t lf_mutex_t;
typedef struct {
semaphore_t sema;
lf_mutex_t* mutex;
} lf_cond_t;
typedef int lf_thread_t;

#endif // LF_SINGLE_THREADED

#endif // LF_PICO_SUPPORT_H