From 3ce9382addb0cf5d6d56109f04a657538e7a6eb1 Mon Sep 17 00:00:00 2001 From: tanneberger Date: Thu, 16 Jan 2025 22:36:28 +0100 Subject: [PATCH] adding UARTChannel implementation --- include/reactor-uc/federated.h | 2 +- include/reactor-uc/logging.h | 2 + include/reactor-uc/network_channel.h | 15 +++- .../reactor-uc/platform/riot/uart_channel.h | 43 ++++++++-- .../external_modules/reactor-uc/Makefile.dep | 6 ++ src/logging.c | 4 +- src/platform/riot/uart_channel.c | 82 ++++++++++++++----- src/schedulers/dynamic/scheduler.c | 1 - src/serialization.c | 4 +- 9 files changed, 127 insertions(+), 32 deletions(-) diff --git a/include/reactor-uc/federated.h b/include/reactor-uc/federated.h index e606d7d1..6163b44e 100644 --- a/include/reactor-uc/federated.h +++ b/include/reactor-uc/federated.h @@ -73,4 +73,4 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare bool *payload_used_buf, size_t payload_size, size_t payload_buf_capacity); void Federated_distribute_start_tag(Environment *env, instant_t start_time); -#endif \ No newline at end of file +#endif diff --git a/include/reactor-uc/logging.h b/include/reactor-uc/logging.h index 9ea8d7ba..a98f8b2f 100644 --- a/include/reactor-uc/logging.h +++ b/include/reactor-uc/logging.h @@ -11,7 +11,9 @@ #define LF_LOG_LEVEL_DEBUG 4 // Add color codes to the output +#ifndef LF_COLORIZE_LOGS #define LF_COLORIZE_LOGS 1 +#endif // The default log level for any unspecified module #ifndef LF_LOG_LEVEL_ALL diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index a90d1384..32ba324e 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -1,7 +1,7 @@ #ifndef REACTOR_UC_NETWORK_CHANNEL_H #define REACTOR_UC_NETWORK_CHANNEL_H -#include "nanopb/pb.h" +#include #include "proto/message.pb.h" #include "reactor-uc/tag.h" @@ -35,6 +35,11 @@ typedef enum { NETWORK_CHANNEL_TYPE_UART } NetworkChannelType; +typedef enum { + NETWORK_CHANNEL_RECEIVE_TYPE_POLL, + NETWORK_CHANNEL_RECEIVE_TYPE_ASYNC, +} NetworkChannelReceiveType; + char *NetworkChannel_state_to_string(NetworkChannelState state); typedef struct FederatedConnectionBundle FederatedConnectionBundle; @@ -50,6 +55,8 @@ struct NetworkChannel { * @brief Type of the network channel to differentiate between different implementations such as TcpIp or CoapUdpIp. */ NetworkChannelType type; + + NetworkChannelReceiveType receive_type; /** * @brief Get the current state of the connection. @@ -88,6 +95,12 @@ struct NetworkChannel { * @brief Free up NetworkChannel, join threads etc. */ void (*free)(NetworkChannel *self); + + bool data_available; + bool needs_polling; + + void (*poll)(NetworkChannel *self); + }; #endif // REACTOR_UC_NETWORK_CHANNEL_H diff --git a/include/reactor-uc/platform/riot/uart_channel.h b/include/reactor-uc/platform/riot/uart_channel.h index fbc0f17d..5a96f59b 100644 --- a/include/reactor-uc/platform/riot/uart_channel.h +++ b/include/reactor-uc/platform/riot/uart_channel.h @@ -1,8 +1,39 @@ -// -// Created by tanneberger on 1/3/25. -// +#ifndef REACTOR_UC_UART_CHANNEL_H +#define REACTOR_UC_UART_CHANNEL_H -#ifndef UART_CHANNEL_H -#define UART_CHANNEL_H +#define MODULE_PERIPH_UART_RXSTART_IRQ -#endif //UART_CHANNEL_H +#include "reactor-uc/network_channel.h" +#include "reactor-uc/environment.h" + +#include "periph/uart.h" +#include "cond.h" + +typedef struct UARTChannel UARTChannel; +typedef struct FederatedConnectionBundle FederatedConnectionBundle; + +#define UART_CHANNEL_BUFFERSIZE 1024 +#define UART_CHANNEL_EXPECTED_CONNECT_DURATION MSEC(10) //TODO: + +struct UARTChannel { + NetworkChannel super; + NetworkChannelState state; + + FederateMessage output; + unsigned char write_buffer[UART_CHANNEL_BUFFERSIZE]; + unsigned char read_buffer[UART_CHANNEL_BUFFERSIZE]; + unsigned char connection_thread_stack[THREAD_STACKSIZE_MAIN]; + int connection_thread_pid; + + unsigned int read_index; + uart_t uart_dev; + mutex_t receive_lock; + cond_t receive_cv; + + FederatedConnectionBundle *federated_connection; + void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message); +}; + +void UARTChannel_ctor(UARTChannel *self, Environment *env, uint32_t baud); + +#endif diff --git a/make/riot/external_modules/reactor-uc/Makefile.dep b/make/riot/external_modules/reactor-uc/Makefile.dep index dd437ed2..e89bb662 100644 --- a/make/riot/external_modules/reactor-uc/Makefile.dep +++ b/make/riot/external_modules/reactor-uc/Makefile.dep @@ -32,3 +32,9 @@ ifeq ($(filter -DNETWORK_CHANNEL_COAP_RIOT, $(CFLAGS)), -DNETWORK_CHANNEL_COAP_R # Enable coap USEMODULE += gcoap endif + +ifeq ($(filter -DNETWORK_CHANNEL_UART_RIOT, $(CFLAGS)), -DNETWORK_CHANNEL_UART_RIOT) + USEMODULE += periph_uart + USEMODULE += periph_uart_rxstart_irq +endif + diff --git a/src/logging.c b/src/logging.c index c99bdeec..76158195 100644 --- a/src/logging.c +++ b/src/logging.c @@ -62,9 +62,9 @@ void log_message(int level, const char *module, const char *fmt, ...) { #endif log_printf("[%s] [%s] ", level_str, module); Platform_vprintf(fmt, args); -#ifdef LF_COLORIZE_LOGS +#if LF_COLORIZE_LOGS==1 log_printf(ANSI_COLOR_RESET); #endif log_printf("\n"); va_end(args); -} \ No newline at end of file +} diff --git a/src/platform/riot/uart_channel.c b/src/platform/riot/uart_channel.c index c3128310..b0a830a9 100644 --- a/src/platform/riot/uart_channel.c +++ b/src/platform/riot/uart_channel.c @@ -2,6 +2,7 @@ #include "reactor-uc/logging.h" #include "reactor-uc/environment.h" #include "reactor-uc/serialization.h" +#include "led.h" #define UART_CHANNEL_ERR(fmt, ...) LF_ERR(NET, "UARTChannel: " fmt, ##__VA_ARGS__) #define UART_CHANNEL_WARN(fmt, ...) LF_WARN(NET, "UARTChannel: " fmt, ##__VA_ARGS__) @@ -30,19 +31,18 @@ static bool UARTChannel_is_connected(NetworkChannel *untyped_self) { } static lf_ret_t UARTChannel_send_blocking(NetworkChannel *untyped_self, const FederateMessage *message) { - UART_CHANNEL_DEBUG("Send blocking"); UARTChannel *self = (UARTChannel *)untyped_self; if (self->state == NETWORK_CHANNEL_STATE_CONNECTED) { int message_size = serialize_to_protobuf(message, self->write_buffer, UART_CHANNEL_BUFFERSIZE); - uart_write(self->uart_dev, self->write_buffer, message_size); + UART_CHANNEL_DEBUG("Sending Message of Size: %i", message_size); + uart_write(self->uart_dev, self->write_buffer, message_size); return LF_OK; } else { return LF_ERR; } - } static void UARTChannel_register_receive_callback(NetworkChannel *untyped_self, @@ -57,34 +57,77 @@ static void UARTChannel_register_receive_callback(NetworkChannel *untyped_self, } -void uart_receive_callback(void* arg, uint8_t received_byte) { +void _UARTChannel_receive_callback(void* arg, uint8_t received_byte) { UARTChannel* self = (UARTChannel*) arg; - const uint32_t minimum_message_size = 8; - + const uint32_t minimum_message_size = 12; + self->read_buffer[self->read_index] = received_byte; - + self->read_index++; + if (self->read_index >= minimum_message_size) { - UART_CHANNEL_DEBUG("Has %d bytes in read_buffer from last recv. Trying to deserialize", self->read_index); - int bytes_left = deserialize_from_protobuf(&self->output, self->read_buffer, self->read_index); - if (bytes_left >= 0) { - UART_CHANNEL_DEBUG("%d bytes left after deserialize", bytes_left); - - memcpy(self->read_buffer, self->read_buffer + (self->read_index - bytes_left), bytes_left); - self->read_index = bytes_left; - self->receive_callback(self->federated_connection, &self->output); - } + cond_signal(&self->receive_cv); } } - +void _UARTChannel_decode_loop(void *arg) { + UARTChannel* self = (UARTChannel*) arg; + mutex_lock(&self->receive_lock); + + UART_CHANNEL_DEBUG("Entering decoding loop"); + while (true) { + cond_wait(&self->receive_cv, &self->receive_lock); + + int bytes_left = deserialize_from_protobuf(&self->output, self->read_buffer, self->read_index); + UART_CHANNEL_DEBUG("Bytes Left after attempted to deserialize %d", bytes_left); + + if (bytes_left >= 0) { + int read_index = self->read_index; + self->read_index = bytes_left; + + memcpy(self->read_buffer, self->read_buffer + (read_index - bytes_left), bytes_left); + + if (self->receive_callback != NULL) { + UART_CHANNEL_DEBUG("calling user callback!"); + self->receive_callback(self->federated_connection, &self->output); + } + } + } +} void UARTChannel_ctor(UARTChannel *self, Environment *env, uint32_t baud) { assert(self != NULL); assert(env != NULL); - uart_init(self->uart_dev, baud, uart_receive_callback, self); + self->uart_dev = UART_DEV(0); + + int result = uart_init(self->uart_dev, baud, _UARTChannel_receive_callback, self); + + if (result == -ENODEV) { + UART_CHANNEL_ERR("Invalid UART device!"); + } else if (result == ENOTSUP) { + UART_CHANNEL_ERR("Given configuration to uart is not supported!"); + } else if (result < 0) { + UART_CHANNEL_ERR("UART Init error occurred"); + } + + result = uart_mode(self->uart_dev, UART_DATA_BITS_8, UART_PARITY_EVEN, UART_STOP_BITS_2); + + if (result != UART_OK) { + UART_CHANNEL_ERR("Problem to configure UART device!"); + return; + } + + cond_init(&self->receive_cv); + mutex_init(&self->receive_lock); + + // Create decoding thread + self->connection_thread_pid = thread_create( + self->connection_thread_stack, + sizeof(self->connection_thread_stack), + THREAD_PRIORITY_MAIN - 1, + 0, + _UARTChannel_decode_loop, self, "uart_channel_decode_loop"); - // Super fields self->super.expected_connect_duration = UART_CHANNEL_EXPECTED_CONNECT_DURATION; self->super.type = NETWORK_CHANNEL_TYPE_UART; self->super.is_connected = UARTChannel_is_connected; @@ -95,6 +138,7 @@ void UARTChannel_ctor(UARTChannel *self, Environment *env, uint32_t baud) { self->super.free = UARTChannel_free; // Concrete fields + self->read_index = 0; self->receive_callback = NULL; self->federated_connection = NULL; self->state = NETWORK_CHANNEL_STATE_CONNECTED; diff --git a/src/schedulers/dynamic/scheduler.c b/src/schedulers/dynamic/scheduler.c index f41b1c44..69852ac3 100644 --- a/src/schedulers/dynamic/scheduler.c +++ b/src/schedulers/dynamic/scheduler.c @@ -281,7 +281,6 @@ void Scheduler_run(Scheduler *untyped_self) { self->run_timestep(untyped_self); self->clean_up_timestep(untyped_self); - env->enter_critical_section(env); } diff --git a/src/serialization.c b/src/serialization.c index 55d63424..1b111d6a 100644 --- a/src/serialization.c +++ b/src/serialization.c @@ -1,7 +1,7 @@ #include "reactor-uc/serialization.h" -#include "nanopb/pb_decode.h" -#include "nanopb/pb_encode.h" +#include +#include #ifdef MIN #undef MIN