Skip to content

Commit

Permalink
expand tests (#48)
Browse files Browse the repository at this point in the history
* simplify MPSC and expand SPSC tests
  • Loading branch information
geseq authored Mar 2, 2024
1 parent 255624e commit 4e01132
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
28 changes: 17 additions & 11 deletions include/mpsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,30 @@ class MPSC {
MPSC() = default;

put_t put(const T &value) noexcept {
alignas(64) thread_local static std::size_t reader_index_cache;
alignas(64) thread_local static std::size_t write_index;
alignas(64) thread_local static Producer p;
do {
while (write_index > (reader_index_cache + common_.index_mask_)) {
write_index = next_free_index_.load(std::memory_order_acquire);
reader_index_cache = consumer_.reader_index_.load(std::memory_order_relaxed);
while (p.write_index_cache_ > (p.reader_index_cache_ + common_.index_mask_)) {
p.write_index_cache_ = next_free_index_.load(std::memory_order_acquire);
p.reader_index_cache_ = consumer_.reader_index_.load(std::memory_order_relaxed);
if constexpr (std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value) {
return false;
} else {
common_.put_wait_.wait([this] { return write_index <= (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_); });
common_.put_wait_.wait(
[this] { return p.write_index_cache_ <= (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_); });
}
}
} while (!next_free_index_.compare_exchange_strong(write_index, write_index + 1, std::memory_order_acq_rel, std::memory_order_acquire));
} while (
!next_free_index_.compare_exchange_strong(p.write_index_cache_, p.write_index_cache_ + 1, std::memory_order_acq_rel, std::memory_order_acquire));

contents_[write_index & common_.index_mask_] = value;
contents_[p.write_index_cache_ & common_.index_mask_] = value;

// commit in the correct order to avoid problems
while (last_committed_index_.load(std::memory_order_relaxed) != write_index) {
while (last_committed_index_.load(std::memory_order_relaxed) != p.write_index_cache_) {
// we don't return at this point even in case of ReturnImmediatelyStrategy as we've already taken the token
common_.put_wait_.wait([this] { return last_committed_index_.load(std::memory_order_relaxed) == write_index; });
common_.put_wait_.wait([this] { return last_committed_index_.load(std::memory_order_relaxed) == p.write_index_cache_; });
}

last_committed_index_.store(++write_index, std::memory_order_release);
last_committed_index_.store(++p.write_index_cache_, std::memory_order_release);

common_.get_wait_.notify();
common_.put_wait_.notify();
Expand Down Expand Up @@ -93,6 +94,11 @@ class MPSC {
const std::size_t index_mask_ = roundUpNextPowerOfTwo(min_size) - 1;
};

struct alignas(64) Producer {
std::size_t reader_index_cache_{0};
std::size_t write_index_cache_{0};
};

struct alignas(64) Consumer {
std::size_t last_committed_index_cache_{0};
std::size_t reader_index_2_{0};
Expand Down
28 changes: 23 additions & 5 deletions test/fastchan_spsc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ void testSPSCSingleThreaded_PutGet() {
// Test put and get with a single thread
for (int i = 0; i < iterations; ++i) {
if constexpr (std::is_same<put_wait_strategy, fastchan::ReturnImmediateStrategy>::value) {
assert(chan.put(i));
auto result = false;
do {
result = chan.put(i);
} while (!result);
} else {
chan.put(i);
}
Expand Down Expand Up @@ -123,17 +126,32 @@ void testSPSC() {

int main() {
testSPSC<fastchan::PauseWaitStrategy, fastchan::PauseWaitStrategy>();
testSPSC<fastchan::PauseWaitStrategy, fastchan::YieldWaitStrategy>();
testSPSC<fastchan::PauseWaitStrategy, fastchan::NoOpWaitStrategy>();
testSPSC<fastchan::PauseWaitStrategy, fastchan::CVWaitStrategy>();
testSPSC<fastchan::PauseWaitStrategy, fastchan::ReturnImmediateStrategy>();
testSPSC<fastchan::ReturnImmediateStrategy, fastchan::PauseWaitStrategy>();
testSPSC<fastchan::ReturnImmediateStrategy, fastchan::ReturnImmediateStrategy>();

testSPSC<fastchan::YieldWaitStrategy, fastchan::YieldWaitStrategy>();
testSPSC<fastchan::YieldWaitStrategy, fastchan::PauseWaitStrategy>();
testSPSC<fastchan::YieldWaitStrategy, fastchan::NoOpWaitStrategy>();
testSPSC<fastchan::YieldWaitStrategy, fastchan::CVWaitStrategy>();
testSPSC<fastchan::YieldWaitStrategy, fastchan::ReturnImmediateStrategy>();
testSPSC<fastchan::ReturnImmediateStrategy, fastchan::YieldWaitStrategy>();
testSPSC<fastchan::ReturnImmediateStrategy, fastchan::ReturnImmediateStrategy>();

testSPSC<fastchan::NoOpWaitStrategy, fastchan::NoOpWaitStrategy>();
testSPSC<fastchan::NoOpWaitStrategy, fastchan::YieldWaitStrategy>();
testSPSC<fastchan::NoOpWaitStrategy, fastchan::PauseWaitStrategy>();
testSPSC<fastchan::NoOpWaitStrategy, fastchan::CVWaitStrategy>();
testSPSC<fastchan::NoOpWaitStrategy, fastchan::ReturnImmediateStrategy>();

testSPSC<fastchan::CVWaitStrategy, fastchan::CVWaitStrategy>();
testSPSC<fastchan::CVWaitStrategy, fastchan::YieldWaitStrategy>();
testSPSC<fastchan::CVWaitStrategy, fastchan::PauseWaitStrategy>();
testSPSC<fastchan::CVWaitStrategy, fastchan::NoOpWaitStrategy>();
testSPSC<fastchan::CVWaitStrategy, fastchan::ReturnImmediateStrategy>();

testSPSC<fastchan::ReturnImmediateStrategy, fastchan::PauseWaitStrategy>();
testSPSC<fastchan::ReturnImmediateStrategy, fastchan::YieldWaitStrategy>();
testSPSC<fastchan::ReturnImmediateStrategy, fastchan::NoOpWaitStrategy>();
testSPSC<fastchan::ReturnImmediateStrategy, fastchan::CVWaitStrategy>();
testSPSC<fastchan::ReturnImmediateStrategy, fastchan::ReturnImmediateStrategy>();

Expand Down

0 comments on commit 4e01132

Please sign in to comment.