Skip to content

Commit

Permalink
test tbb
Browse files Browse the repository at this point in the history
  • Loading branch information
abbycin committed Feb 28, 2017
1 parent 1f7a09f commit 66ecde1
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 20 deletions.
2 changes: 1 addition & 1 deletion channel/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Channel

A C++ implementation of **MPSC** (multi-producer single-consumer) [channel](./channel.cpp).
A C++ implementation of **MPSC** (multi-producer single-consumer) [channel](./channel.h).

see [mpsc.rs](./mpsc.rs).
72 changes: 53 additions & 19 deletions channel/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,33 @@
#include <mutex>
#include <tuple>

#ifdef TBB_ALLOC // -ltbbmalloc
#include <tbb/scalable_allocator.h>

template<typename T>
T* alloc()
{
return static_cast<T*>(scalable_malloc(sizeof(T)));
}

void dealloc(void* x)
{
scalable_free(x);
}
#else
template<typename T>
T* alloc()
{
return new T;
}

template<typename T>
void dealloc(T* x)
{
delete x;
}
#endif

class SpinLock
{
public:
Expand All @@ -23,16 +50,16 @@ class SpinLock
public:
Guard(T& lk)
: lk_(lk)
{
lk_.lock();
}

~Guard()
{
lk_.unlock();
}
private:
T& lk_;
{
lk_.lock();
}

~Guard()
{
lk_.unlock();
}
private:
T& lk_;
};

SpinLock()
Expand Down Expand Up @@ -100,7 +127,7 @@ class Queue
return size_ == 0;
}

size_t size() const
size_t unsafe_size() const
{
return size_.load();
}
Expand All @@ -115,14 +142,14 @@ class Queue
size_ += 1;
}

bool pop(T& data)
bool try_pop(T& data)
{
auto tail = tail_.load(std::memory_order_release);
auto tail = tail_.load(std::memory_order_relaxed);
auto next = tail->next.load(std::memory_order_acquire);
if(next == nullptr)
return false;
data = next->data;
tail_.store(next, std::memory_order_acquire);
tail_.store(next, std::memory_order_release);
delete tail;
size_ -= 1;
return true;
Expand All @@ -134,6 +161,13 @@ class Queue
std::atomic<Node*> tail_;
};

#ifdef TBB_QUEUE // -ltbb
#include<tbb/concurrent_queue.h>
template<typename T> using Queue_t = tbb::concurrent_queue<T>;
#else
template<typename T> using Queue_t = Queue<T>;
#endif

template<typename> class Sender;

template<typename> class Receiver;
Expand All @@ -159,15 +193,15 @@ class ReceiverImpl
void send(const T& data)
{
queue_.push(data);
if(queue_.size() > 128)
if(queue_.unsafe_size() > 128)
return;
std::lock_guard<std::mutex> guard(mtx_);
cond_.notify_one();
}

bool try_recv(T& data)
{
return queue_.pop(data);
return queue_.try_pop(data);
}

void recv(T& data)
Expand All @@ -176,7 +210,7 @@ class ReceiverImpl
cond_.wait(lk, [this] {
return !queue_.empty();
});
while(!queue_.pop(data));
while(!queue_.try_pop(data));
}

template<typename Rep, typename Period>
Expand All @@ -186,11 +220,11 @@ class ReceiverImpl
cond_.wait_for(lk, timeout, [this] {
return !queue_.empty();
});
return queue_.pop(data);
return queue_.try_pop(data);
}

private:
Queue<T> queue_;
Queue_t<T> queue_;
std::mutex mtx_;
std::condition_variable cond_;
//std::condition_variable_any cond_;
Expand Down
4 changes: 4 additions & 0 deletions channel/mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::thread;
use std::env::args;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::Instant;

fn sender(tx: Sender<u64>, data: u64, mut limit: u64) {
loop {
Expand Down Expand Up @@ -34,6 +35,7 @@ fn main() {
let num: u64 = args().nth(1).unwrap().parse().unwrap();
let (tx, rx) = channel();
let (tx1, tx2, tx3) = (tx.clone(), tx.clone(), tx.clone());
let now = Instant::now();
let rcv = thread::spawn(move || {
receiver(rx);
});
Expand All @@ -54,4 +56,6 @@ fn main() {
tid3.join().unwrap();
tx.send(309).unwrap();
rcv.join().unwrap();
let dur = now.elapsed();
println!("{}.{}", dur.as_secs(), dur.subsec_nanos());
}
16 changes: 16 additions & 0 deletions channel/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "channel.h"
#include <iostream>
#include <chrono>
#include <string>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -38,6 +39,17 @@ void receiver(Receiver<size_t> rx)
printf("receiver done.\n");
}

auto now()
{
return std::chrono::high_resolution_clock::now();
}

template<typename T> // no GNU extension.
auto duration(const T& dur)
{
return std::chrono::duration_cast<std::chrono::nanoseconds>(dur).count();
}

int main(int argc, char* argv[])
{
if(argc != 2)
Expand All @@ -51,6 +63,7 @@ int main(int argc, char* argv[])
auto pair = channel<size_t>();
auto& tx = std::get<0>(pair);
auto& rx = std::get<1>(pair);
auto start = now();

std::thread rcv(receiver, std::move(rx));

Expand All @@ -64,4 +77,7 @@ int main(int argc, char* argv[])
x.join();
tx.send(309);
rcv.join();
auto end = now();
auto dur = static_cast<double>(duration(end - start));
printf("%.9f\n", dur / 1000000000);
}

0 comments on commit 66ecde1

Please sign in to comment.