Skip to content

Commit

Permalink
Optionally specify stream for pointers in CUDA algorithms
Browse files Browse the repository at this point in the history
Summary:
Work may be queued on CUDA streams for asynchronous execution. The
memory referenced by pointers passed to any algorithm can therefore be
mutated after the algorithm instance is constructed. By also passing in
the streams these mutations are queued on, the algorithms can synchronize
with those mutations to ensure no stale data is used.

By passing in these streams, any work done by these algorithms will
*also* be queued, which effectively removes a single synchronization
step from any algorithm run.

Differential Revision: D4589394

fbshipit-source-id: 0c8cd6ba9c9018f33d6f4c55a037083fc4164acb
  • Loading branch information
pietern authored and facebook-github-bot committed Feb 20, 2017
1 parent 0722775 commit d6ca382
Show file tree
Hide file tree
Showing 16 changed files with 396 additions and 177 deletions.
7 changes: 4 additions & 3 deletions gloo/benchmark/cuda_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ class CudaAllreduceRingBenchmark : public Benchmark {

protected:
virtual float* allocate(int elements) override {
ptrs_.push_back(make_unique<CudaMemory<float> >(elements, context_->rank_));
return **ptrs_[0];
ptrs_.push_back(CudaMemory<float>(elements));
ptrs_[ptrs_.size()-1].set(context_->rank_);
return *ptrs_[ptrs_.size()-1];
}

std::vector<std::unique_ptr<CudaMemory<float> > > ptrs_;
std::vector<CudaMemory<float> > ptrs_;
};

} // namespace
Expand Down
22 changes: 8 additions & 14 deletions gloo/cuda.cu
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,31 @@

namespace gloo {

const cudaStream_t kStreamNotSet = (cudaStream_t)(-1);

template<typename T>
CudaDevicePointer<T>
CudaDevicePointer<T>::createWithNewStream(
CudaDevicePointer<T>::create(
T* ptr,
size_t count) {
size_t count,
cudaStream_t stream) {
CudaDevicePointer p(ptr, count);

// Create new stream for operations concerning this pointer
{
if (stream == kStreamNotSet) {
CudaDeviceScope scope(p.deviceId_);
int loPri, hiPri;
CUDA_CHECK(cudaDeviceGetStreamPriorityRange(&loPri, &hiPri));
CUDA_CHECK(cudaStreamCreateWithPriority(
&p.stream_, cudaStreamNonBlocking, hiPri));
p.streamOwner_ = true;
} else {
p.stream_ = stream;
}

return p;
}

template<typename T>
CudaDevicePointer<T>
CudaDevicePointer<T>::createWithStream(
T* ptr,
size_t count,
cudaStream_t stream) {
CudaDevicePointer p(ptr, count);
p.stream_ = stream;
return p;
}

template<typename T>
CudaDevicePointer<T>::CudaDevicePointer(T* ptr, size_t count)
: device_(ptr),
Expand Down
30 changes: 22 additions & 8 deletions gloo/cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,35 @@

namespace gloo {

extern const cudaStream_t kStreamNotSet;

template<typename T>
class CudaDevicePointer {
public:
static CudaDevicePointer createWithNewStream(
T* ptr,
size_t count);

static CudaDevicePointer createWithStream(
static CudaDevicePointer create(
T* ptr,
size_t count,
cudaStream_t stream);
cudaStream_t stream = kStreamNotSet);

CudaDevicePointer(CudaDevicePointer&&) noexcept;
~CudaDevicePointer();

T* operator*() const {
return device_;
}

int getCount() const {
return count_;
}

int getDeviceID() const {
return deviceId_;
}

cudaStream_t getStream() const {
return stream_;
}

// Copy contents of device pointer to host.
void copyToHostAsync(T* dst);

Expand All @@ -42,7 +56,7 @@ class CudaDevicePointer {
// Instances must be created through static functions
CudaDevicePointer(T* ptr, size_t count);

// Instances cannot be copied or assigned
// Instances cannot be copied or copy-assigned
CudaDevicePointer(const CudaDevicePointer&) = delete;
CudaDevicePointer& operator=(const CudaDevicePointer&) = delete;

Expand All @@ -60,7 +74,7 @@ class CudaDevicePointer {
// this pointer lives on. The stream can be specified at
// construction time if one has already been created outside this
// library. If it is not specified, a new stream is created.
cudaStream_t stream_ = 0;
cudaStream_t stream_ = kStreamNotSet;
cudaEvent_t event_ = 0;

// If no stream is specified at construction time, this class
Expand Down
28 changes: 20 additions & 8 deletions gloo/cuda_allreduce_ring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,29 @@ namespace gloo {
template <typename T>
CudaAllreduceRing<T>::CudaAllreduceRing(
const std::shared_ptr<Context>& context,
std::vector<T*> ptrs,
int count)
const std::vector<T*>& ptrs,
int count,
const std::vector<cudaStream_t>& streams)
: Allreduce<T>(context, nullptr),
count_(count),
bytes_(count * sizeof(T)),
leftPair_(this->getLeftPair()),
rightPair_(this->getRightPair()) {
count_(count),
bytes_(count * sizeof(T)),
leftPair_(this->getLeftPair()),
rightPair_(this->getRightPair()) {
auto newStream = true;
if (streams.size() > 0) {
GLOO_ENFORCE_EQ(streams.size(), ptrs.size());
newStream = false;
}

hostPtrs_.resize(ptrs.size());
for (int i = 0; i < ptrs.size(); i++) {
devicePtrs_.push_back(
CudaDevicePointer<T>::createWithNewStream(ptrs[i], count_));
if (newStream) {
devicePtrs_.push_back(
CudaDevicePointer<T>::create(ptrs[i], count_));
} else {
devicePtrs_.push_back(
CudaDevicePointer<T>::create(ptrs[i], count_, streams[i]));
}
CUDA_CHECK(cudaMallocHost(&hostPtrs_[i], bytes_));
}

Expand Down
5 changes: 3 additions & 2 deletions gloo/cuda_allreduce_ring.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ class CudaAllreduceRing : public Allreduce<T> {
public:
CudaAllreduceRing(
const std::shared_ptr<Context>& context,
std::vector<T*> ptrs,
int count);
const std::vector<T*>& ptrs,
int count,
const std::vector<cudaStream_t>& streams = std::vector<cudaStream_t>());

virtual ~CudaAllreduceRing();

Expand Down
29 changes: 20 additions & 9 deletions gloo/cuda_allreduce_ring_chunked.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,29 @@ namespace gloo {
template <typename T>
CudaAllreduceRingChunked<T>::CudaAllreduceRingChunked(
const std::shared_ptr<Context>& context,
std::vector<T*> ptrs,
int count)
const std::vector<T*>& ptrs,
int count,
const std::vector<cudaStream_t>& streams)
: Allreduce<T>(context, nullptr),
count_(count),
bytes_(count * sizeof(T)),
leftPair_(this->getLeftPair()),
rightPair_(this->getRightPair()) {
// Setup device and host memory
count_(count),
bytes_(count * sizeof(T)),
leftPair_(this->getLeftPair()),
rightPair_(this->getRightPair()) {
auto newStream = true;
if (streams.size() > 0) {
GLOO_ENFORCE_EQ(streams.size(), ptrs.size());
newStream = false;
}

hostPtrs_.resize(ptrs.size());
for (int i = 0; i < ptrs.size(); i++) {
devicePtrs_.push_back(
CudaDevicePointer<T>::createWithNewStream(ptrs[i], count_));
if (newStream) {
devicePtrs_.push_back(
CudaDevicePointer<T>::create(ptrs[i], count_));
} else {
devicePtrs_.push_back(
CudaDevicePointer<T>::create(ptrs[i], count_, streams[i]));
}
CUDA_CHECK(cudaMallocHost(&hostPtrs_[i], bytes_));
}

Expand Down
5 changes: 3 additions & 2 deletions gloo/cuda_allreduce_ring_chunked.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ class CudaAllreduceRingChunked : public Allreduce<T> {
public:
CudaAllreduceRingChunked(
const std::shared_ptr<Context>& context,
std::vector<T*> ptrs,
int count);
const std::vector<T*>& ptrs,
int count,
const std::vector<cudaStream_t>& streams = std::vector<cudaStream_t>());

virtual ~CudaAllreduceRingChunked();

Expand Down
14 changes: 8 additions & 6 deletions gloo/cuda_broadcast_one_to_all.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ CudaBroadcastOneToAll<T>::CudaBroadcastOneToAll(
const std::shared_ptr<Context>& context,
T* ptr,
int count,
int rootRank)
int rootRank,
cudaStream_t stream)
: Broadcast<T>(context, rootRank),
devicePtr_(CudaDevicePointer<T>::createWithNewStream(ptr, count)) {
auto bytes = count * sizeof(T);
CUDA_CHECK(cudaMallocHost(&hostPtr_, bytes));
devicePtr_(CudaDevicePointer<T>::create(ptr, count, stream)),
count_(count),
bytes_(count * sizeof(T)) {
CUDA_CHECK(cudaMallocHost(&hostPtr_, bytes_));
if (this->contextRank_ == this->rootRank_) {
for (int i = 0; i < this->contextSize_; i++) {
if (i == this->contextRank_) {
Expand All @@ -31,11 +33,11 @@ CudaBroadcastOneToAll<T>::CudaBroadcastOneToAll(

auto& pair = this->context_->getPair(i);
sendDataBuffers_.push_back(
pair->createSendBuffer(0, hostPtr_, bytes));
pair->createSendBuffer(0, hostPtr_, bytes_));
}
} else {
auto& rootPair = this->context_->getPair(this->rootRank_);
recvDataBuffer_ = rootPair->createRecvBuffer(0, hostPtr_, bytes);
recvDataBuffer_ = rootPair->createRecvBuffer(0, hostPtr_, bytes_);
}
}

Expand Down
6 changes: 5 additions & 1 deletion gloo/cuda_broadcast_one_to_all.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class CudaBroadcastOneToAll : public Broadcast<T> {
const std::shared_ptr<Context>& context,
T* ptr,
int count,
int rootRank = 0);
int rootRank = 0,
cudaStream_t stream = kStreamNotSet);

virtual ~CudaBroadcastOneToAll();

Expand All @@ -31,6 +32,9 @@ class CudaBroadcastOneToAll : public Broadcast<T> {
CudaDevicePointer<T> devicePtr_;
T* hostPtr_;

const int count_;
const int bytes_;

// For the sender (root)
std::vector<std::unique_ptr<transport::Buffer>> sendDataBuffers_;

Expand Down
33 changes: 26 additions & 7 deletions gloo/cuda_private.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,45 @@
namespace gloo {

template<typename T>
__global__ void initializeMemory(T* ptr, const T val, const size_t n)
{
__global__ void initializeMemory(T* ptr, const T val, const size_t n) {
int i = blockIdx.x * blockDim.x + threadIdx.x;
for (; i < n; i += blockDim.x) {
ptr[i] = val;
}
}

template<typename T>
CudaMemory<T>::CudaMemory(size_t n, T val): n_(n), bytes_(n * sizeof(T)) {
CudaMemory<T>::CudaMemory(size_t n): n_(n), bytes_(n * sizeof(T)) {
CUDA_CHECK(cudaGetDevice(&device_));
CUDA_CHECK(cudaMalloc(&ptr_, bytes_));
initializeMemory<<<1, 32>>>(ptr_, val, n);
}

template<typename T>
CudaMemory<T>::CudaMemory(CudaMemory<T>&& other) noexcept
: n_(other.n_),
bytes_(other.bytes_),
device_(other.device_),
ptr_(other.ptr_) {
// Nullify pointer on move source
other.ptr_ = nullptr;
}

template<typename T>
CudaMemory<T>::~CudaMemory() {
CudaDeviceGuard guard;
CUDA_CHECK(cudaSetDevice(device_));
CUDA_CHECK(cudaFree(ptr_));
CudaDeviceScope scope(device_);
if (ptr_ != nullptr) {
CUDA_CHECK(cudaFree(ptr_));
}
}

template<typename T>
void CudaMemory<T>::set(T val, cudaStream_t stream) {
CudaDeviceScope scope(device_);
if (stream == kStreamNotSet) {
initializeMemory<<<1, 32>>>(ptr_, val, n_);
} else {
initializeMemory<<<1, 32, 0, stream>>>(ptr_, val, n_);
}
}

template<typename T>
Expand Down
8 changes: 7 additions & 1 deletion gloo/cuda_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,22 @@ class CudaDeviceScope {
template<typename T>
class CudaMemory {
public:
CudaMemory(size_t n, T val);
explicit CudaMemory(size_t n);
CudaMemory(CudaMemory&&) noexcept;
~CudaMemory();

void set(T val, cudaStream_t stream = kStreamNotSet);

T* operator*() const {
return ptr_;
}

std::unique_ptr<T[]> copyToHost();

protected:
CudaMemory(const CudaMemory&) = delete;
CudaMemory& operator=(const CudaMemory&) = delete;

size_t n_;
size_t bytes_;
int device_;
Expand Down
11 changes: 11 additions & 0 deletions gloo/test/base_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <thread>
#include <vector>

#include "gloo/context.h"
#include "gloo/rendezvous/hash_store.h"
#include "gloo/transport/tcp/device.h"

Expand Down Expand Up @@ -53,6 +54,16 @@ class BaseTest : public ::testing::Test {
}
}

void spawn(
int size,
std::function<void(int, std::shared_ptr<Context>)> fn) {
spawnThreads(size, [&](int rank) {
auto context = std::make_shared<::gloo::Context>(rank, size);
context->connectFullMesh(*store_, device_);
fn(rank, std::move(context));
});
}

std::shared_ptr<::gloo::transport::Device> device_;
std::unique_ptr<::gloo::rendezvous::Store> store_;
};
Expand Down
Loading

0 comments on commit d6ca382

Please sign in to comment.