Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add reduced rate subscriptions #16

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion inc/rtcan.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ typedef struct _rtcan_subscriber_t {
*/
struct _rtcan_subscriber_t* next_subscriber_ptr;

/**
* @brief Subscription period (optional)
*/
uint32_t subscription_period;

/**
* @brief Timestamp of the last delivered message.
*/
uint32_t last_timestamp;

} rtcan_subscriber_t;

/**
Expand Down Expand Up @@ -233,7 +243,8 @@ rtcan_status_t rtcan_handle_rx_it(rtcan_handle_t* rtcan_h,

rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h,
uint32_t can_id,
TX_QUEUE* queue_ptr);
TX_QUEUE* queue_ptr,
uint32_t period);

rtcan_status_t rtcan_msg_consumed(rtcan_handle_t* rtcan_h,
rtcan_msg_t* msg_ptr);
Expand Down
48 changes: 34 additions & 14 deletions src/rtcan.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,10 @@ static rtcan_hashmap_node_t* create_hashmap_node(rtcan_handle_t* rtcan_h,
*
* @param[in] rtcan_h RTCAN handle
* @param[in] queue_ptr Pointer to subscriber's associated queue
* @param[in] period Subscriber period in ticks (rate)
*/
static rtcan_subscriber_t* create_subscriber(rtcan_handle_t* rtcan_h,
TX_QUEUE* queue_ptr)
TX_QUEUE* queue_ptr, uint32_t period)
{
rtcan_subscriber_t* new_subscriber_ptr = NULL;

Expand All @@ -447,6 +448,8 @@ static rtcan_subscriber_t* create_subscriber(rtcan_handle_t* rtcan_h,
{
new_subscriber_ptr->next_subscriber_ptr = NULL;
new_subscriber_ptr->queue_ptr = queue_ptr;
new_subscriber_ptr->subscription_period = period;
new_subscriber_ptr->last_timestamp = tx_time_get();
}

return new_subscriber_ptr;
Expand Down Expand Up @@ -481,10 +484,11 @@ static rtcan_hashmap_node_t* find_hashmap_node(rtcan_handle_t* rtcan_h,
* @param[in] rtcan_h RTCAN handle
* @param[in] can_id CAN ID to receive notification for
* @param[in] queue_ptr Destination to receive messages
* @param[in] period Subscription period in ticks (rate)
*/
rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h,
uint32_t can_id,
TX_QUEUE* queue_ptr)
TX_QUEUE* queue_ptr, uint32_t period)
{
const uint32_t index = hashmap_index(can_id);

Expand All @@ -497,7 +501,8 @@ rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h,
if (no_errors(rtcan_h))
{
new_node_ptr->first_subscriber_ptr = create_subscriber(rtcan_h,
queue_ptr);
queue_ptr,
period);
}

if (no_errors(rtcan_h))
Expand Down Expand Up @@ -534,7 +539,8 @@ rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h,
{
node_ptr = node_ptr->chained_node_ptr;
node_ptr->first_subscriber_ptr = create_subscriber(rtcan_h,
queue_ptr);
queue_ptr,
period);
}
}
else // add to existing node
Expand All @@ -547,7 +553,8 @@ rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h,
}

subscriber_ptr->next_subscriber_ptr = create_subscriber(rtcan_h,
queue_ptr);
queue_ptr,
period);
}

}
Expand All @@ -561,7 +568,8 @@ rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h,
}

subscriber_ptr->next_subscriber_ptr = create_subscriber(rtcan_h,
queue_ptr);
queue_ptr,
period);
}
}

Expand Down Expand Up @@ -684,20 +692,32 @@ static void rtcan_rx_thread_entry(ULONG input)
msg_ptr->identifier);

if (node_ptr != NULL)
{
{
rtcan_subscriber_t* subscriber_ptr = node_ptr->first_subscriber_ptr;

while (subscriber_ptr != NULL)
{
msg_ptr->reference_count++;

tx_status = tx_queue_send(subscriber_ptr->queue_ptr,
&msg_ptr,
TX_NO_WAIT);
UINT time_now = tx_time_get();
UINT time_delta = time_now - subscriber_ptr->last_timestamp;

if (tx_status != TX_SUCCESS)
// only deliver message if enough ticks have passed
if(time_delta >= subscriber_ptr->subscription_period)
{
msg_ptr->reference_count--;
msg_ptr->reference_count++;

tx_status = tx_queue_send(subscriber_ptr->queue_ptr,
&msg_ptr,
TX_NO_WAIT);

if (tx_status != TX_SUCCESS)
{
msg_ptr->reference_count--;
}
else
{
// update last delivered timestamp if success
subscriber_ptr->last_timestamp = time_now;
}
}

subscriber_ptr = subscriber_ptr->next_subscriber_ptr;
Expand Down