Skip to content

Commit

Permalink
adding UARTChannel implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tanneberger committed Jan 16, 2025
1 parent eaeb7c9 commit 3ce9382
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 32 deletions.
2 changes: 1 addition & 1 deletion include/reactor-uc/federated.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare
bool *payload_used_buf, size_t payload_size, size_t payload_buf_capacity);

void Federated_distribute_start_tag(Environment *env, instant_t start_time);
#endif
#endif
2 changes: 2 additions & 0 deletions include/reactor-uc/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#define LF_LOG_LEVEL_DEBUG 4

// Add color codes to the output
#ifndef LF_COLORIZE_LOGS
#define LF_COLORIZE_LOGS 1
#endif

// The default log level for any unspecified module
#ifndef LF_LOG_LEVEL_ALL
Expand Down
15 changes: 14 additions & 1 deletion include/reactor-uc/network_channel.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef REACTOR_UC_NETWORK_CHANNEL_H
#define REACTOR_UC_NETWORK_CHANNEL_H

#include "nanopb/pb.h"
#include <nanopb/pb.h>

#include "proto/message.pb.h"
#include "reactor-uc/tag.h"
Expand Down Expand Up @@ -35,6 +35,11 @@ typedef enum {
NETWORK_CHANNEL_TYPE_UART
} NetworkChannelType;

typedef enum {
NETWORK_CHANNEL_RECEIVE_TYPE_POLL,
NETWORK_CHANNEL_RECEIVE_TYPE_ASYNC,
} NetworkChannelReceiveType;

char *NetworkChannel_state_to_string(NetworkChannelState state);

typedef struct FederatedConnectionBundle FederatedConnectionBundle;
Expand All @@ -50,6 +55,8 @@ struct NetworkChannel {
* @brief Type of the network channel to differentiate between different implementations such as TcpIp or CoapUdpIp.
*/
NetworkChannelType type;

NetworkChannelReceiveType receive_type;

/**
* @brief Get the current state of the connection.
Expand Down Expand Up @@ -88,6 +95,12 @@ struct NetworkChannel {
* @brief Free up NetworkChannel, join threads etc.
*/
void (*free)(NetworkChannel *self);

bool data_available;
bool needs_polling;

void (*poll)(NetworkChannel *self);

};

#endif // REACTOR_UC_NETWORK_CHANNEL_H
43 changes: 37 additions & 6 deletions include/reactor-uc/platform/riot/uart_channel.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,39 @@
//
// Created by tanneberger on 1/3/25.
//
#ifndef REACTOR_UC_UART_CHANNEL_H
#define REACTOR_UC_UART_CHANNEL_H

#ifndef UART_CHANNEL_H
#define UART_CHANNEL_H
#define MODULE_PERIPH_UART_RXSTART_IRQ

#endif //UART_CHANNEL_H
#include "reactor-uc/network_channel.h"
#include "reactor-uc/environment.h"

#include "periph/uart.h"
#include "cond.h"

typedef struct UARTChannel UARTChannel;
typedef struct FederatedConnectionBundle FederatedConnectionBundle;

#define UART_CHANNEL_BUFFERSIZE 1024
#define UART_CHANNEL_EXPECTED_CONNECT_DURATION MSEC(10) //TODO:

struct UARTChannel {
NetworkChannel super;
NetworkChannelState state;

FederateMessage output;
unsigned char write_buffer[UART_CHANNEL_BUFFERSIZE];
unsigned char read_buffer[UART_CHANNEL_BUFFERSIZE];
unsigned char connection_thread_stack[THREAD_STACKSIZE_MAIN];
int connection_thread_pid;

unsigned int read_index;
uart_t uart_dev;
mutex_t receive_lock;
cond_t receive_cv;

FederatedConnectionBundle *federated_connection;
void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message);
};

void UARTChannel_ctor(UARTChannel *self, Environment *env, uint32_t baud);

#endif
6 changes: 6 additions & 0 deletions make/riot/external_modules/reactor-uc/Makefile.dep
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ ifeq ($(filter -DNETWORK_CHANNEL_COAP_RIOT, $(CFLAGS)), -DNETWORK_CHANNEL_COAP_R
# Enable coap
USEMODULE += gcoap
endif

ifeq ($(filter -DNETWORK_CHANNEL_UART_RIOT, $(CFLAGS)), -DNETWORK_CHANNEL_UART_RIOT)
USEMODULE += periph_uart
USEMODULE += periph_uart_rxstart_irq
endif

4 changes: 2 additions & 2 deletions src/logging.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ void log_message(int level, const char *module, const char *fmt, ...) {
#endif
log_printf("[%s] [%s] ", level_str, module);
Platform_vprintf(fmt, args);
#ifdef LF_COLORIZE_LOGS
#if LF_COLORIZE_LOGS==1
log_printf(ANSI_COLOR_RESET);
#endif
log_printf("\n");
va_end(args);
}
}
82 changes: 63 additions & 19 deletions src/platform/riot/uart_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/environment.h"
#include "reactor-uc/serialization.h"
#include "led.h"

#define UART_CHANNEL_ERR(fmt, ...) LF_ERR(NET, "UARTChannel: " fmt, ##__VA_ARGS__)
#define UART_CHANNEL_WARN(fmt, ...) LF_WARN(NET, "UARTChannel: " fmt, ##__VA_ARGS__)
Expand Down Expand Up @@ -30,19 +31,18 @@ static bool UARTChannel_is_connected(NetworkChannel *untyped_self) {
}

static lf_ret_t UARTChannel_send_blocking(NetworkChannel *untyped_self, const FederateMessage *message) {
UART_CHANNEL_DEBUG("Send blocking");
UARTChannel *self = (UARTChannel *)untyped_self;

if (self->state == NETWORK_CHANNEL_STATE_CONNECTED) {
int message_size = serialize_to_protobuf(message, self->write_buffer, UART_CHANNEL_BUFFERSIZE);

uart_write(self->uart_dev, self->write_buffer, message_size);
UART_CHANNEL_DEBUG("Sending Message of Size: %i", message_size);

uart_write(self->uart_dev, self->write_buffer, message_size);
return LF_OK;
} else {
return LF_ERR;
}

}

static void UARTChannel_register_receive_callback(NetworkChannel *untyped_self,
Expand All @@ -57,34 +57,77 @@ static void UARTChannel_register_receive_callback(NetworkChannel *untyped_self,
}


void uart_receive_callback(void* arg, uint8_t received_byte) {
void _UARTChannel_receive_callback(void* arg, uint8_t received_byte) {
UARTChannel* self = (UARTChannel*) arg;
const uint32_t minimum_message_size = 8;

const uint32_t minimum_message_size = 12;
self->read_buffer[self->read_index] = received_byte;

self->read_index++;

if (self->read_index >= minimum_message_size) {
UART_CHANNEL_DEBUG("Has %d bytes in read_buffer from last recv. Trying to deserialize", self->read_index);
int bytes_left = deserialize_from_protobuf(&self->output, self->read_buffer, self->read_index);
if (bytes_left >= 0) {
UART_CHANNEL_DEBUG("%d bytes left after deserialize", bytes_left);

memcpy(self->read_buffer, self->read_buffer + (self->read_index - bytes_left), bytes_left);
self->read_index = bytes_left;
self->receive_callback(self->federated_connection, &self->output);
}
cond_signal(&self->receive_cv);
}
}


void _UARTChannel_decode_loop(void *arg) {
UARTChannel* self = (UARTChannel*) arg;
mutex_lock(&self->receive_lock);

UART_CHANNEL_DEBUG("Entering decoding loop");
while (true) {
cond_wait(&self->receive_cv, &self->receive_lock);

int bytes_left = deserialize_from_protobuf(&self->output, self->read_buffer, self->read_index);
UART_CHANNEL_DEBUG("Bytes Left after attempted to deserialize %d", bytes_left);

if (bytes_left >= 0) {
int read_index = self->read_index;
self->read_index = bytes_left;

memcpy(self->read_buffer, self->read_buffer + (read_index - bytes_left), bytes_left);

if (self->receive_callback != NULL) {
UART_CHANNEL_DEBUG("calling user callback!");
self->receive_callback(self->federated_connection, &self->output);
}
}
}
}

void UARTChannel_ctor(UARTChannel *self, Environment *env, uint32_t baud) {
assert(self != NULL);
assert(env != NULL);

uart_init(self->uart_dev, baud, uart_receive_callback, self);
self->uart_dev = UART_DEV(0);

int result = uart_init(self->uart_dev, baud, _UARTChannel_receive_callback, self);

if (result == -ENODEV) {
UART_CHANNEL_ERR("Invalid UART device!");
} else if (result == ENOTSUP) {
UART_CHANNEL_ERR("Given configuration to uart is not supported!");
} else if (result < 0) {
UART_CHANNEL_ERR("UART Init error occurred");
}

result = uart_mode(self->uart_dev, UART_DATA_BITS_8, UART_PARITY_EVEN, UART_STOP_BITS_2);

if (result != UART_OK) {
UART_CHANNEL_ERR("Problem to configure UART device!");
return;
}

cond_init(&self->receive_cv);
mutex_init(&self->receive_lock);

// Create decoding thread
self->connection_thread_pid = thread_create(
self->connection_thread_stack,
sizeof(self->connection_thread_stack),
THREAD_PRIORITY_MAIN - 1,
0,
_UARTChannel_decode_loop, self, "uart_channel_decode_loop");

// Super fields
self->super.expected_connect_duration = UART_CHANNEL_EXPECTED_CONNECT_DURATION;
self->super.type = NETWORK_CHANNEL_TYPE_UART;
self->super.is_connected = UARTChannel_is_connected;
Expand All @@ -95,6 +138,7 @@ void UARTChannel_ctor(UARTChannel *self, Environment *env, uint32_t baud) {
self->super.free = UARTChannel_free;

// Concrete fields
self->read_index = 0;
self->receive_callback = NULL;
self->federated_connection = NULL;
self->state = NETWORK_CHANNEL_STATE_CONNECTED;
Expand Down
1 change: 0 additions & 1 deletion src/schedulers/dynamic/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ void Scheduler_run(Scheduler *untyped_self) {

self->run_timestep(untyped_self);
self->clean_up_timestep(untyped_self);

env->enter_critical_section(env);
}

Expand Down
4 changes: 2 additions & 2 deletions src/serialization.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "reactor-uc/serialization.h"

#include "nanopb/pb_decode.h"
#include "nanopb/pb_encode.h"
#include <nanopb/pb_decode.h>
#include <nanopb/pb_encode.h>

#ifdef MIN
#undef MIN
Expand Down

0 comments on commit 3ce9382

Please sign in to comment.