diff --git a/inc/rtcan.h b/inc/rtcan.h index e5b7d23..2977417 100644 --- a/inc/rtcan.h +++ b/inc/rtcan.h @@ -59,6 +59,16 @@ typedef struct _rtcan_subscriber_t { */ struct _rtcan_subscriber_t* next_subscriber_ptr; + /** + * @brief Subscription period (optional) + */ + uint32_t subscription_period; + + /** + * @brief Timestamp of the last delivered message. + */ + uint32_t last_timestamp; + } rtcan_subscriber_t; /** @@ -233,7 +243,8 @@ rtcan_status_t rtcan_handle_rx_it(rtcan_handle_t* rtcan_h, rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h, uint32_t can_id, - TX_QUEUE* queue_ptr); + TX_QUEUE* queue_ptr, + uint32_t period); rtcan_status_t rtcan_msg_consumed(rtcan_handle_t* rtcan_h, rtcan_msg_t* msg_ptr); diff --git a/src/rtcan.c b/src/rtcan.c index 8c6e1c7..92bacd5 100644 --- a/src/rtcan.c +++ b/src/rtcan.c @@ -430,9 +430,10 @@ static rtcan_hashmap_node_t* create_hashmap_node(rtcan_handle_t* rtcan_h, * * @param[in] rtcan_h RTCAN handle * @param[in] queue_ptr Pointer to subscriber's associated queue + * @param[in] period Subscriber period in ticks (rate) */ static rtcan_subscriber_t* create_subscriber(rtcan_handle_t* rtcan_h, - TX_QUEUE* queue_ptr) + TX_QUEUE* queue_ptr, uint32_t period) { rtcan_subscriber_t* new_subscriber_ptr = NULL; @@ -447,6 +448,8 @@ static rtcan_subscriber_t* create_subscriber(rtcan_handle_t* rtcan_h, { new_subscriber_ptr->next_subscriber_ptr = NULL; new_subscriber_ptr->queue_ptr = queue_ptr; + new_subscriber_ptr->subscription_period = period; + new_subscriber_ptr->last_timestamp = tx_time_get(); } return new_subscriber_ptr; @@ -481,10 +484,11 @@ static rtcan_hashmap_node_t* find_hashmap_node(rtcan_handle_t* rtcan_h, * @param[in] rtcan_h RTCAN handle * @param[in] can_id CAN ID to receive notification for * @param[in] queue_ptr Destination to receive messages + * @param[in] period Subscription period in ticks (rate) */ rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h, uint32_t can_id, - TX_QUEUE* queue_ptr) + TX_QUEUE* queue_ptr, uint32_t period) { const uint32_t index = hashmap_index(can_id); @@ -497,7 +501,8 @@ rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h, if (no_errors(rtcan_h)) { new_node_ptr->first_subscriber_ptr = create_subscriber(rtcan_h, - queue_ptr); + queue_ptr, + period); } if (no_errors(rtcan_h)) @@ -534,7 +539,8 @@ rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h, { node_ptr = node_ptr->chained_node_ptr; node_ptr->first_subscriber_ptr = create_subscriber(rtcan_h, - queue_ptr); + queue_ptr, + period); } } else // add to existing node @@ -547,7 +553,8 @@ rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h, } subscriber_ptr->next_subscriber_ptr = create_subscriber(rtcan_h, - queue_ptr); + queue_ptr, + period); } } @@ -561,7 +568,8 @@ rtcan_status_t rtcan_subscribe(rtcan_handle_t* rtcan_h, } subscriber_ptr->next_subscriber_ptr = create_subscriber(rtcan_h, - queue_ptr); + queue_ptr, + period); } } @@ -684,20 +692,32 @@ static void rtcan_rx_thread_entry(ULONG input) msg_ptr->identifier); if (node_ptr != NULL) - { + { rtcan_subscriber_t* subscriber_ptr = node_ptr->first_subscriber_ptr; while (subscriber_ptr != NULL) { - msg_ptr->reference_count++; - - tx_status = tx_queue_send(subscriber_ptr->queue_ptr, - &msg_ptr, - TX_NO_WAIT); + UINT time_now = tx_time_get(); + UINT time_delta = time_now - subscriber_ptr->last_timestamp; - if (tx_status != TX_SUCCESS) + // only deliver message if enough ticks have passed + if(time_delta >= subscriber_ptr->subscription_period) { - msg_ptr->reference_count--; + msg_ptr->reference_count++; + + tx_status = tx_queue_send(subscriber_ptr->queue_ptr, + &msg_ptr, + TX_NO_WAIT); + + if (tx_status != TX_SUCCESS) + { + msg_ptr->reference_count--; + } + else + { + // update last delivered timestamp if success + subscriber_ptr->last_timestamp = time_now; + } } subscriber_ptr = subscriber_ptr->next_subscriber_ptr;