Add MSCCLPP user buffer registration APIs and integrate with RCCL #1477

Merged
merged 14 commits into from Jan 14, 2025
26 changes: 19 additions & 7 deletions cmake/MSCCLPP.cmake
@@ -68,14 +68,20 @@ if(ENABLE_MSCCLPP)
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
execute_process(
COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/read-allred.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/read-allred.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)

execute_process(
COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/mscclpp_ibv_access_relaxed_ordering.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)

execute_process(
COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/mem-reg.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)

message(STATUS "Building mscclpp only for gfx942.")

mscclpp_cmake_arg(CMAKE_PREFIX_PATH)
@@ -102,17 +108,23 @@ if(ENABLE_MSCCLPP)

find_package(mscclpp_nccl REQUIRED)
execute_process(
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/cpx.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
execute_process(
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/read-allred.patch
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/cpx.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
execute_process(
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/read-allred.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)

execute_process(
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/mscclpp_ibv_access_relaxed_ordering.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)

execute_process(
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/mem-reg.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
endif()

execute_process(COMMAND objcopy
147 changes: 147 additions & 0 deletions ext-src/mem-reg.patch
@@ -0,0 +1,147 @@
diff --git a/apps/nccl/include/nccl.h b/apps/nccl/include/nccl.h
index 7f50792..b8b146d 100644
--- a/apps/nccl/include/nccl.h
+++ b/apps/nccl/include/nccl.h
@@ -344,6 +344,13 @@ ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcoun
ncclResult_t pncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount, ncclDataType_t datatype,
ncclComm_t comm, cudaStream_t stream);

+/*
+ * Register/Deregister
+ */
+ncclResult_t ncclCommRegister(ncclComm_t comm, void* buff, size_t size, void** handle);
+ncclResult_t ncclCommDeregister(ncclComm_t comm, void* handle);
+bool mscclpp_BuffIsRegistered(ncclComm_t comm, const void* buff, size_t count);
+size_t mscclpp_BufferSize(ncclComm_t comm, void* handle);
/*
* Send
*
diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu
index a697be2..1d4af61 100644
--- a/apps/nccl/src/nccl.cu
+++ b/apps/nccl/src/nccl.cu
@@ -65,6 +65,7 @@ struct ncclComm {
std::unordered_map<channelKey, ChannelInfo> channelInInfos;
std::unordered_map<channelKey, ChannelInfo> channelOutInfos;
std::unordered_map<channelKey, ChannelInfo> channelScratchInfos;
+ std::unordered_map<void*, channelKey> handleKeys;
std::shared_ptr<char> scratchBuff;
std::vector<mscclpp::RegisteredMemory> remoteScratchRegMemories;

@@ -73,6 +74,11 @@ struct ncclComm {
uint32_t buffFlag;
};

+struct handleInfo {
+ void * buff;
+ cudaIpcMemHandle_t ipcHandle;
+};
+
static size_t ncclTypeSize(ncclDataType_t type) {
switch (type) {
case ncclInt8:
@@ -577,6 +583,104 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t
return ncclSuccess;
}

+NCCL_API ncclResult_t ncclCommRegister(ncclComm_t comm, void* buff, size_t size, void** handle) {
+ size_t buffBytes = size;
+ CUdeviceptr buffBasePtr;
+ MSCCLPP_CUTHROW(cuMemGetAddressRange(&buffBasePtr, &buffBytes, (CUdeviceptr)buff));
+
+ int rank = comm->comm->bootstrap()->getRank();
+ channelKey buffKey{(void*)buffBasePtr, buffBytes};
+
+ std::vector<mscclpp::RegisteredMemory> remoteMemories;
+
+ // Creating the channels
+ auto buffIt = comm->channelScratchInfos.find(buffKey);
Contributor:
If there's no guarantee this will be called from a single thread, we probably need to lock access to channelScratchInfos and the rest of the maps. I suggest using one lock if this function doesn't fall on the critical path.

Contributor Author:
This map is supposed to be checked by the host process, and MSCCLPP is invoked only in the one-GPU-per-process case.
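
A minimal sketch of the single-lock approach suggested above, using stand-in types rather than the patch's ncclComm and channel maps (the RegistrationState struct and function names here are illustrative only and are not part of this patch):

#include <cstddef>
#include <mutex>
#include <unordered_map>

// Stand-in for the communicator-owned registration maps.
struct RegistrationState {
  std::unordered_map<void*, std::size_t> registeredBuffers;
  std::mutex lock;  // one lock serializing every map access
};

// Register and query paths take the same lock before touching the map,
// so concurrent callers cannot corrupt it.
void registerBuffer(RegistrationState& state, void* base, std::size_t bytes) {
  std::lock_guard<std::mutex> guard(state.lock);
  state.registeredBuffers.emplace(base, bytes);
}

bool isRegistered(RegistrationState& state, void* base) {
  std::lock_guard<std::mutex> guard(state.lock);
  return state.registeredBuffers.find(base) != state.registeredBuffers.end();
}
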

+ if (buffIt == comm->channelScratchInfos.end()) {
+ std::vector<mscclpp::SmChannel> channels =
+ setupSmChannels(comm, comm->remoteScratchRegMemories, const_cast<void*>((void*)buffBasePtr));
+ ChannelInfo channelInfo{channels, channels, setupSmChannelDeviceHandles(channels), setupSmChannelDeviceHandles(channels)};
+ buffIt = comm->channelScratchInfos.emplace(buffKey, channelInfo).first;
+ }
+ auto sendIt = comm->channelInInfos.find(buffKey);
+ if (sendIt == comm->channelInInfos.end()) {
+ std::vector<mscclpp::SmChannel> channels =
+ setupSmChannels(comm, comm->remoteScratchRegMemories, const_cast<void*>((void*)buffBasePtr));
+
+ remoteMemories =
+ setupRemoteMemories(comm->comm, rank, (void*)buffBasePtr, buffBytes, mscclpp::Transport::CudaIpc);
+ std::vector<mscclpp::SmChannel> channels1 =
+ setupSmChannels(comm, remoteMemories, const_cast<void*>((void*)buffBasePtr));
+
+ ChannelInfo channelInfo{channels, channels1, setupSmChannelDeviceHandles(channels), setupSmChannelDeviceHandles(channels1)};
+ sendIt = comm->channelInInfos.emplace(buffKey, channelInfo).first;
+ }
+ auto recvIt = comm->channelOutInfos.find(buffKey);
+ if (recvIt == comm->channelOutInfos.end()) {
+ remoteMemories =
+ setupRemoteMemories(comm->comm, rank, (void*)buffBasePtr, buffBytes, mscclpp::Transport::CudaIpc);
+ std::vector<mscclpp::SmChannel> outChannels =
+ setupSmChannels(comm, remoteMemories, const_cast<void*>((void*)buffBasePtr));
+ ChannelInfo channelInfo{outChannels, outChannels, setupSmChannelDeviceHandles(outChannels), setupSmChannelDeviceHandles(outChannels)};
+ recvIt = comm->channelOutInfos.emplace(buffKey, channelInfo).first;
+ }
+
+ cudaIpcMemHandle_t ipcHandle;
+ MSCCLPP_CUDATHROW(cudaIpcGetMemHandle(&ipcHandle, buffBasePtr));
+
+ struct handleInfo *p = (struct handleInfo *) malloc(sizeof(struct handleInfo));
+ p->buff = buffBasePtr;
+ p->ipcHandle = ipcHandle;
+ *handle = p;
+
+ auto it = comm->handleKeys.find(*handle);
+ if (it == comm->handleKeys.end()) {
+ comm->handleKeys[*handle] = buffKey;
+ }
+
+ return ncclSuccess;
+}
+
+NCCL_API ncclResult_t ncclCommDeregister(ncclComm_t comm, void* handle) {
+ if (comm && handle) {
+ channelKey buffKey = comm->handleKeys[handle];
+
+ auto scratchIt = comm->channelScratchInfos.find(buffKey);
+ if (scratchIt != comm->channelScratchInfos.end()) {
+ comm->channelScratchInfos.erase(scratchIt);
+ }
+
+ auto inIt = comm->channelInInfos.find(buffKey);
+ if (inIt != comm->channelInInfos.end()) {
+ comm->channelInInfos.erase(inIt);
+ }
+
+ auto outIt = comm->channelOutInfos.find(buffKey);
+ if (outIt != comm->channelOutInfos.end()) {
+ comm->channelOutInfos.erase(outIt);
+ }
+
+ free(handle);
+ }
+ return ncclSuccess;
+}
+
+bool mscclpp_BuffIsRegistered(ncclComm_t comm, const void* buff, size_t count){
+ size_t buffBytes;
+ CUdeviceptr buffBasePtr;
+ MSCCLPP_CUTHROW(cuMemGetAddressRange(&buffBasePtr, &buffBytes, (CUdeviceptr)buff));
+ channelKey buffKey{(void*)buffBasePtr, buffBytes};
+ auto buffIt = comm->channelScratchInfos.find(buffKey);
+ bool registered = buffIt != comm->channelScratchInfos.end();
+ return registered;
+}
+size_t
+mscclpp_BufferSize(ncclComm_t comm, void* handle){
+ if (!(comm && handle)){
+ return 0;
+ }
+ auto buffKeyIt = comm->handleKeys.find(handle);
+ return buffKeyIt != comm->handleKeys.end() ? buffKeyIt->second.bytes : 0;
+}
NCCL_API ncclResult_t ncclSend(const void*, size_t, ncclDataType_t, int, ncclComm_t, cudaStream_t) {
// TODO: implement this function
return ncclInternalError;
8 changes: 8 additions & 0 deletions src/include/mscclpp/mscclpp_nccl.h
@@ -38,6 +38,14 @@ extern "C" {
/* See ncclAllGather. */
ncclResult_t mscclpp_ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount,
ncclDataType_t datatype, mscclppComm_t comm, hipStream_t stream);

ncclResult_t mscclpp_ncclCommRegister(mscclppComm_t comm, void* buff, size_t size, void** handle);

ncclResult_t mscclpp_ncclCommDeregister(mscclppComm_t comm, void* handle);

bool mscclpp_BuffIsRegistered(mscclppComm_t comm, const void* buff, size_t count);

size_t mscclpp_BufferSize(mscclppComm_t comm, void* handle);
}

namespace std {
14 changes: 12 additions & 2 deletions src/misc/msccl/msccl_lifecycle.cc
@@ -524,8 +524,13 @@ ncclResult_t mscclEnqueueCheck(
NCCLCHECK(mscclGetCaptureStatus(comm->rank, stream));
}

const bool sendBuffRegistered = mscclpp_BuffIsRegistered(comm->mscclpp_comm, sendBuff, count);
const bool recvBuffRegistered = mscclpp_BuffIsRegistered(comm->mscclpp_comm, recvBuff, count);
const bool graphMode = threadLocalStatus.captureStatus != mscclNoCapture;
const bool buffsRegistedNonGraphMode = !graphMode && sendBuffRegistered && recvBuffRegistered;

/* check if one rank per GPU and graph mode is enabled */
if ((threadLocalStatus.captureStatus != mscclNoCapture) && comm->mscclCompatible && nBytes > 0 && (nBytes & 31) == 0) {
if ((graphMode || buffsRegistedNonGraphMode) && comm->mscclCompatible && nBytes > 0 && (nBytes & 31) == 0) {
bool isManagedBuffer = false;
if (sendBuff) CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(sendBuff)));
if (!isManagedBuffer && recvBuff) CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(recvBuff)));
@@ -565,8 +570,13 @@ ncclResult_t mscclEnqueueCheck(
NCCLCHECK(mscclGetCaptureStatus(comm->rank, stream));
}

const bool sendBuffRegistered = mscclpp_BuffIsRegistered(comm->mscclpp_comm, sendBuff, count);
const bool recvBuffRegistered = mscclpp_BuffIsRegistered(comm->mscclpp_comm, recvBuff, count);
const bool graphMode = threadLocalStatus.captureStatus != mscclNoCapture;
const bool buffsRegistedNonGraphMode = !graphMode && sendBuffRegistered && recvBuffRegistered;

/* check if one rank per GPU and graph mode is enabled */
if ((threadLocalStatus.captureStatus != mscclNoCapture) && comm->mscclCompatible && nBytes > 0 && (nBytes & 31) == 0) {
if ((graphMode || buffsRegistedNonGraphMode) && comm->mscclCompatible && nBytes > 0 && (nBytes & 31) == 0) {
bool isManagedBuffer = false;
if (sendBuff) CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(sendBuff)));
if (!isManagedBuffer && recvBuff) CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(recvBuff)));
2 changes: 2 additions & 0 deletions src/misc/mscclpp/mscclpp_nccl_syms.txt
@@ -30,3 +30,5 @@ ncclRedOpDestroy mscclpp_ncclRedOpDestroy
ncclReduce mscclpp_ncclReduce
ncclReduceScatter mscclpp_ncclReduceScatter
ncclSend mscclpp_ncclSend
ncclCommRegister mscclpp_ncclCommRegister
ncclCommDeregister mscclpp_ncclCommDeregister
27 changes: 27 additions & 0 deletions src/register.cc
@@ -10,6 +10,9 @@
#include "net.h"
#include "register.h"
#include "api_trace.h"
#ifdef ENABLE_MSCCLPP
#include "mscclpp/mscclpp_nccl.h"
#endif

ncclResult_t ncclNetDeregister(struct ncclComm* comm, struct ncclReg* reg) {
struct ncclRegCache* cache = &comm->regCache;
@@ -155,12 +158,36 @@ NCCL_API(ncclResult_t, ncclCommRegister, const ncclComm_t comm, void* buff, size
ncclResult_t ncclCommRegister_impl(const ncclComm_t comm, void* buff, size_t size, void** handle) {
NCCLCHECK(CommCheck(comm, "ncclCommRegister", "comm"));
if (comm->checkPointers) NCCLCHECK(CudaPtrCheck(buff, comm, "buff", "ncclCommRegister"));
#ifdef ENABLE_MSCCLPP
if (comm->mscclCompatible && size > 0 && (size & 31) == 0 && size <= comm->mscclpp_threshold){
bool isManagedBuffer = false;
CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(buff)));
if(!isManagedBuffer){
INFO(NCCL_INIT, "MSCCL++: ncclCommRegister");
NCCLCHECK(mscclpp_ncclCommRegister(comm->mscclpp_comm, buff, size, handle));
return ncclSuccess;
Collaborator:
I'm debating whether this is the right behaviour... Are these buffers reused in ways that might call MSCCL++ sometimes and RCCL other times? Are buffers always matched to message size, or could they use a buffer that's larger than the given message size? If so, the safest thing to do might be to register for BOTH RCCL and MSCCL++. Thoughts, @nusislam ?

Contributor Author:
The cases in which mscclppRegister should be used versus rcclRegister are mutually exclusive.

}
else{
WARN("MSCCL++: Cannot register user-buffers on managed memory. RCCL user-buffer registration will occur.");
}
}
#endif
INFO(NCCL_INIT, "RCCL: ncclCommRegister");
NCCLCHECK(ncclRegister(comm, buff, size, handle));
return ncclSuccess;
}

NCCL_API(ncclResult_t, ncclCommDeregister, const ncclComm_t comm, void* handle);
ncclResult_t ncclCommDeregister_impl(const ncclComm_t comm, void* handle) {

#ifdef ENABLE_MSCCLPP
const size_t size = mscclpp_BufferSize(comm->mscclpp_comm, handle);
if (comm->mscclCompatible && size > 0 && (size & 31) == 0 && size <= comm->mscclpp_threshold) {
NCCLCHECK(mscclpp_ncclCommDeregister(comm->mscclpp_comm, handle));
return ncclSuccess;
}
#endif

NCCLCHECK(CommCheck(comm, "ncclCommRegister", "comm"));
struct ncclReg* reg = (struct ncclReg*)handle;
struct ncclRegCache* cache = &comm->regCache;
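
For the dispatch discussed in the register.cc review thread above, a hedged usage sketch of the user-facing registration APIs exercised by this PR. It assumes an already-initialized ncclComm_t and hipStream_t, omits error checking, and treats the rccl/rccl.h include path and the "multiple of 32 bytes, below comm->mscclpp_threshold, not managed memory" conditions as read from the diff rather than as a guaranteed contract:

#include <hip/hip_runtime.h>
#include <rccl/rccl.h>

void runRegisteredAllReduce(ncclComm_t comm, hipStream_t stream, size_t count) {
  float* buf = nullptr;
  size_t bytes = count * sizeof(float);         // keep this a multiple of 32 bytes
  hipMalloc(&buf, bytes);                       // plain device memory, not hipMallocManaged

  void* handle = nullptr;
  ncclCommRegister(comm, buf, bytes, &handle);  // registers via MSCCL++ or RCCL per the checks above

  // In-place all-reduce on the registered buffer; with both buffers registered,
  // the non-graph path can also take the MSCCL++ route per msccl_lifecycle.cc.
  ncclAllReduce(buf, buf, count, ncclFloat, ncclSum, comm, stream);
  hipStreamSynchronize(stream);

  ncclCommDeregister(comm, handle);             // release the registration handle
  hipFree(buf);
}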