Skip to content

Commit

Permalink
canardTxPush now flushes expired frames
Browse files Browse the repository at this point in the history
  • Loading branch information
serges147 committed Dec 4, 2024
1 parent c67a59c commit ee67adc
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 19 deletions.
62 changes: 50 additions & 12 deletions libcanard/canard.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@
#define INITIAL_TOGGLE_STATE true

#define CONTAINER_OF(type, ptr, member) \
((type*) (((ptr) == NULL) ? NULL : (void*) (((char*) (ptr)) - offsetof(type, member))))
#define CONST_CONTAINER_OF(type, ptr, member) \
((const type*) (((ptr) == NULL) ? NULL : (const void*) (((const char*) (ptr)) - offsetof(type, member))))
#define MUTABLE_CONTAINER_OF(type, ptr, member) \
((type*) (((ptr) == NULL) ? NULL : (void*) (((char*) (ptr)) - offsetof(type, member))))

/// Used for inserting new items into AVL trees.
CANARD_PRIVATE struct CanardTreeNode* avlTrivialFactory(void* const user_reference)
Expand Down Expand Up @@ -346,8 +346,8 @@ CANARD_PRIVATE int8_t txAVLPriorityPredicate( //
{
typedef struct CanardTxQueueItem TxItem;

const TxItem* const target = CONST_CONTAINER_OF(TxItem, user_reference, priority_base);
const TxItem* const other = CONST_CONTAINER_OF(TxItem, node, priority_base);
const TxItem* const target = CONTAINER_OF(TxItem, user_reference, priority_base);
const TxItem* const other = CONTAINER_OF(TxItem, node, priority_base);
CANARD_ASSERT((target != NULL) && (other != NULL));
return (target->frame.extended_can_id >= other->frame.extended_can_id) ? +1 : -1;
}
Expand All @@ -362,8 +362,8 @@ CANARD_PRIVATE int8_t txAVLDeadlinePredicate( //
{
typedef struct CanardTxQueueItem TxItem;

const TxItem* const target = CONST_CONTAINER_OF(TxItem, user_reference, deadline_base);
const TxItem* const other = CONST_CONTAINER_OF(TxItem, node, deadline_base);
const TxItem* const target = CONTAINER_OF(TxItem, user_reference, deadline_base);
const TxItem* const other = CONTAINER_OF(TxItem, node, deadline_base);
CANARD_ASSERT((target != NULL) && (other != NULL));
return (target->tx_deadline_usec >= other->tx_deadline_usec) ? +1 : -1;
}
Expand Down Expand Up @@ -592,6 +592,35 @@ CANARD_PRIVATE int32_t txPushMultiFrame(struct CanardTxQueue* const que,
return out;
}

/// Flushes expired transfers by comparing deadline timestamps of the pending transfers with the current time.
CANARD_PRIVATE void txFlushExpiredTransfers(struct CanardTxQueue* const que,
const struct CanardInstance* const ins,
const CanardMicrosecond now_usec)
{
struct CanardTxQueueItem* tx_item = NULL;
while (NULL != (tx_item = MUTABLE_CONTAINER_OF( //
struct CanardTxQueueItem,
cavlFindExtremum(que->deadline_root, false),
deadline_base)))
{
if (now_usec <= tx_item->tx_deadline_usec)
{
// The queue is sorted by deadline, so we can stop here.
break;
}

// All frames of the transfer are released at once b/c they all have the same deadline.
struct CanardTxQueueItem* tx_item_to_free = NULL;
while (NULL != (tx_item_to_free = canardTxPop(que, tx_item)))
{
tx_item = tx_item_to_free->next_in_transfer;
canardTxFree(que, ins, tx_item_to_free);

que->stats.dropped_frames++;
}
}
}

// --------------------------------------------- RECEPTION ---------------------------------------------

#define RX_SESSIONS_PER_SUBSCRIPTION (CANARD_NODE_ID_MAX + 1U)
Expand Down Expand Up @@ -1052,7 +1081,7 @@ rxSubscriptionPredicateOnPortID(void* const user_reference, // NOSONAR Cavl API
{
CANARD_ASSERT((user_reference != NULL) && (node != NULL));
const CanardPortID sought = *((const CanardPortID*) user_reference);
const CanardPortID other = CONST_CONTAINER_OF(struct CanardRxSubscription, node, base)->port_id;
const CanardPortID other = CONTAINER_OF(struct CanardRxSubscription, node, base)->port_id;
static const int8_t NegPos[2] = {-1, +1};
// Clang-Tidy mistakenly identifies a narrowing cast to int8_t here, which is incorrect.
return (sought == other) ? 0 : NegPos[sought > other]; // NOLINT no narrowing conversion is taking place here
Expand All @@ -1063,7 +1092,7 @@ rxSubscriptionPredicateOnStruct(void* const user_reference, // NOSONAR Cavl API
const struct CanardTreeNode* const node)
{
return rxSubscriptionPredicateOnPortID( //
&CONTAINER_OF(struct CanardRxSubscription, user_reference, base)->port_id,
&MUTABLE_CONTAINER_OF(struct CanardRxSubscription, user_reference, base)->port_id,
node);
}

Expand Down Expand Up @@ -1119,6 +1148,15 @@ int32_t canardTxPush(struct CanardTxQueue* const que,
const struct CanardPayload payload,
const CanardMicrosecond now_usec)
{
// Before pushing payload (potentially in multiple frames), we need to try to flush any expired transfers.
// This is necessary to ensure that we don't exhaust the capacity of the queue by holding outdated frames.
// The flushing is done by comparing deadline timestamps of the pending transfers with the current time,
// which makes sense only if the current time is known (bigger than zero).
if (now_usec > 0)
{
txFlushExpiredTransfers(que, ins, now_usec);
}

(void) now_usec;

int32_t out = -CANARD_ERROR_INVALID_ARGUMENT;
Expand Down Expand Up @@ -1167,7 +1205,7 @@ struct CanardTxQueueItem* canardTxPeek(const struct CanardTxQueue* const que)
// Paragraph 6.7.2.1.15 of the C standard says:
// A pointer to a structure object, suitably converted, points to its initial member, and vice versa.
struct CanardTreeNode* const priority_node = cavlFindExtremum(que->priority_root, false);
out = CONTAINER_OF(struct CanardTxQueueItem, priority_node, priority_base);
out = MUTABLE_CONTAINER_OF(struct CanardTxQueueItem, priority_node, priority_base);
}
return out;
}
Expand Down Expand Up @@ -1223,7 +1261,7 @@ int8_t canardRxAccept(struct CanardInstance* const ins,
// This is the reason the function has a logarithmic time complexity of the number of subscriptions.
// Note also that this one of the two variable-complexity operations in the RX pipeline; the other one
// is memcpy(). Excepting these two cases, the entire RX pipeline contains neither loops nor recursion.
struct CanardRxSubscription* const sub = CONTAINER_OF( //
struct CanardRxSubscription* const sub = MUTABLE_CONTAINER_OF( //
struct CanardRxSubscription,
cavlSearch(&ins->rx_subscriptions[(size_t) model.transfer_kind],
&model.port_id,
Expand Down Expand Up @@ -1307,7 +1345,7 @@ int8_t canardRxUnsubscribe(struct CanardInstance* const ins,
{
CanardPortID port_id_mutable = port_id;

struct CanardRxSubscription* const sub = CONTAINER_OF( //
struct CanardRxSubscription* const sub = MUTABLE_CONTAINER_OF( //
struct CanardRxSubscription,
cavlSearch(&ins->rx_subscriptions[tk], &port_id_mutable, &rxSubscriptionPredicateOnPortID, NULL),
base);
Expand Down Expand Up @@ -1348,7 +1386,7 @@ int8_t canardRxGetSubscription(struct CanardInstance* const ins,
{
CanardPortID port_id_mutable = port_id;

struct CanardRxSubscription* const sub = CONTAINER_OF( //
struct CanardRxSubscription* const sub = MUTABLE_CONTAINER_OF( //
struct CanardRxSubscription,
cavlSearch(&ins->rx_subscriptions[tk], &port_id_mutable, &rxSubscriptionPredicateOnPortID, NULL),
base);
Expand Down
7 changes: 4 additions & 3 deletions libcanard/canard.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ struct CanardMemoryResource
/// Holds the statistics of a transmission queue.
struct CanardTxQueueStats
{
/// Holds number of dropped TX frames (due to timeout when `now > deadline`).
/// Holds number of dropped TX frames due to timeout (when `now > deadline`) or b/c of transmission failures.
size_t dropped_frames;
};

Expand Down Expand Up @@ -458,8 +458,9 @@ struct CanardInstance
/// The time complexity models given in the API documentation are made on the assumption that the memory management
/// functions have constant complexity O(1).
///
/// The following API functions may allocate memory: canardRxAccept(), canardTxPush().
/// The following API functions may deallocate memory: canardRxAccept(), canardRxSubscribe(), canardRxUnsubscribe().
/// The following API functions may allocate memory: canardTxPush(), canardRxAccept().
/// The following API functions may deallocate memory: canardTxPush(), canardTxFree(), canardRxAccept(),
/// canardRxSubscribe(), canardRxUnsubscribe().
/// The exact memory requirement and usage model is specified for each function in its documentation.
struct CanardMemoryResource memory;

Expand Down
14 changes: 10 additions & 4 deletions tests/helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,17 @@ class TxQueue
const CanardMicrosecond now_usec = 0ULL)
{
checkInvariants();
const auto size_before = que_.size;
const auto ret = canardTxPush(&que_, ins, transmission_deadline_usec, &metadata, payload, now_usec);
const auto num_added = static_cast<std::size_t>(ret);
enforce((ret < 0) || ((size_before + num_added) == que_.size), "Unexpected size change after push");

const auto size_before = que_.size;
const auto dropped_before = que_.stats.dropped_frames;

const auto ret = canardTxPush(&que_, ins, transmission_deadline_usec, &metadata, payload, now_usec);
const auto num_added = static_cast<std::size_t>(ret);

enforce((ret < 0) || ((size_before + num_added + dropped_before - que_.stats.dropped_frames) == que_.size),
"Unexpected size change after push");
checkInvariants();

return ret;
}

Expand Down
116 changes: 116 additions & 0 deletions tests/test_public_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -814,3 +814,119 @@ TEST_CASE("TxPayloadOwnership")
}
}
}

TEST_CASE("TxFlushExpired")
{
helpers::Instance ins;
helpers::TxQueue que{2, CANARD_MTU_CAN_FD}; // Limit capacity at 2 frames.

auto& tx_alloc = que.getAllocator();
auto& ins_alloc = ins.getAllocator();

std::array<std::uint8_t, 1024> payload{};
std::iota(payload.begin(), payload.end(), 0U);

REQUIRE(CANARD_NODE_ID_UNSET == ins.getNodeID());
REQUIRE(CANARD_MTU_CAN_FD == que.getMTU());
REQUIRE(0 == que.getSize());
REQUIRE(0 == tx_alloc.getNumAllocatedFragments());
REQUIRE(0 == ins_alloc.getNumAllocatedFragments());

CanardMicrosecond now = 10'000'000ULL; // 10s
const CanardMicrosecond deadline = 1'000'000ULL; // 1s

CanardTransferMetadata meta{};

// 1. Push single-frame with padding, peek. @ 10s
{
meta.priority = CanardPriorityNominal;
meta.transfer_kind = CanardTransferKindMessage;
meta.port_id = 321;
meta.remote_node_id = CANARD_NODE_ID_UNSET;
meta.transfer_id = 21;
REQUIRE(1 == que.push(&ins.getInstance(), now + deadline, meta, {8, payload.data()}, now));
REQUIRE(1 == que.getSize());
REQUIRE(1 == tx_alloc.getNumAllocatedFragments());
REQUIRE((8 + 4) == tx_alloc.getTotalAllocatedAmount());
REQUIRE(1 == ins_alloc.getNumAllocatedFragments());
REQUIRE(sizeof(CanardTxQueueItem) * 1 == ins_alloc.getTotalAllocatedAmount());

// Peek and check the payload.
CanardTxQueueItem* ti = que.peek();
REQUIRE(nullptr != ti); // Make sure we get the same frame again.
REQUIRE(ti->frame.payload.size == 12);
REQUIRE(ti->frame.payload.allocated_size == 12);
REQUIRE(0 == std::memcmp(ti->frame.payload.data, payload.data(), 8));
REQUIRE(ti->tx_deadline_usec == now + deadline);
REQUIRE(1 == tx_alloc.getNumAllocatedFragments());
REQUIRE((8 + 4) == tx_alloc.getTotalAllocatedAmount());
REQUIRE(1 == ins_alloc.getNumAllocatedFragments());
REQUIRE(sizeof(CanardTxQueueItem) * 1 == ins_alloc.getTotalAllocatedAmount());

// Don't pop and free the item - we gonna flush it by the next push at 12s.
}

now += 2 * deadline; // 10s -> 12s

// 2. Push two-frames, peek. @ 12s (after 2x deadline)
// These 2 frames should still fit into the queue (with capacity 2) despite one expired frame still there.`
{
que.setMTU(8);
ins.setNodeID(42);
meta.transfer_id = 22;
REQUIRE(2 == que.push(&ins.getInstance(), now + deadline, meta, {8, payload.data()}, now));
REQUIRE(2 == que.getSize());
REQUIRE(2 == tx_alloc.getNumAllocatedFragments());
REQUIRE((8 + 4) == tx_alloc.getTotalAllocatedAmount());
REQUIRE(2 == ins_alloc.getNumAllocatedFragments());
REQUIRE(sizeof(CanardTxQueueItem) * 2 == ins_alloc.getTotalAllocatedAmount());
REQUIRE(1 == que.getInstance().stats.dropped_frames);

// a) Peek and check the payload of the 1st frame
CanardTxQueueItem* ti = NULL;
{
ti = que.peek();
REQUIRE(nullptr != ti);
REQUIRE(ti->frame.payload.size == 8);
REQUIRE(ti->frame.payload.allocated_size == 8);
REQUIRE(0 == std::memcmp(ti->frame.payload.data, payload.data(), 7));
REQUIRE(ti->tx_deadline_usec == now + deadline);
REQUIRE(2 == tx_alloc.getNumAllocatedFragments());
REQUIRE((8 + 4) == tx_alloc.getTotalAllocatedAmount());
REQUIRE(2 == ins_alloc.getNumAllocatedFragments());
REQUIRE(sizeof(CanardTxQueueItem) * 2 == ins_alloc.getTotalAllocatedAmount());

// Don't pop and free the item - we gonna flush it by the next push @ 14s.
}
// b) Check the payload of the 2nd frame
{
ti = ti->next_in_transfer;
REQUIRE(nullptr != ti);
REQUIRE(ti->frame.payload.size == 4);
REQUIRE(ti->frame.payload.allocated_size == 4);
REQUIRE(0 == std::memcmp(ti->frame.payload.data, payload.data() + 7, 1));
REQUIRE(ti->tx_deadline_usec == now + deadline);

// Don't pop and free the item - we gonna flush it by the next push @ 14s.
}
}

now += 2 * deadline; // 12s -> 14s

// 3. Push three-frames, peek. @ 14s (after another 2x deadline)
// These 3 frames should not fit into the queue (with capacity 2),
// but as a side effect, the expired frames (from push @ 12s) should be flushed as well.
{
meta.transfer_id = 23;
REQUIRE(-CANARD_ERROR_OUT_OF_MEMORY ==
que.push(&ins.getInstance(), now + deadline, meta, {8 * 2, payload.data()}, now));
REQUIRE(0 == que.getSize());
REQUIRE(0 == tx_alloc.getNumAllocatedFragments());
REQUIRE(0 == tx_alloc.getTotalAllocatedAmount());
REQUIRE(0 == ins_alloc.getNumAllocatedFragments());
REQUIRE(0 == ins_alloc.getTotalAllocatedAmount());
REQUIRE(1 + 2 == que.getInstance().stats.dropped_frames);

REQUIRE(nullptr == que.peek());
}
}

0 comments on commit ee67adc

Please sign in to comment.