Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MSCCLPP user buffer registration APIs and integrate with RCCL #1477

Merged
merged 14 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions cmake/MSCCLPP.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,20 @@ if(ENABLE_MSCCLPP)
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
execute_process(
COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/read-allred.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
execute_process(
COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/read-allred.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)

execute_process(
isaki001 marked this conversation as resolved.
Show resolved Hide resolved
COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/mscclpp_ibv_access_relaxed_ordering.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)


execute_process(
COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/mem-reg.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)

isaki001 marked this conversation as resolved.
Show resolved Hide resolved
message(STATUS "Building mscclpp only for gfx942.")

mscclpp_cmake_arg(CMAKE_PREFIX_PATH)
Expand All @@ -102,17 +108,23 @@ if(ENABLE_MSCCLPP)

find_package(mscclpp_nccl REQUIRED)
execute_process(
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/cpx.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
execute_process(
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/read-allred.patch
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/cpx.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
execute_process(
)
execute_process(
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/read-allred.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)

execute_process(
isaki001 marked this conversation as resolved.
Show resolved Hide resolved
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/mscclpp_ibv_access_relaxed_ordering.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
)

execute_process(
COMMAND git apply --reverse ${CMAKE_CURRENT_SOURCE_DIR}/ext-src/mem-reg.patch
WORKING_DIRECTORY ${MSCCLPP_SOURCE}
)
isaki001 marked this conversation as resolved.
Show resolved Hide resolved
endif()

execute_process(COMMAND objcopy
Expand Down
147 changes: 147 additions & 0 deletions ext-src/mem-reg.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
diff --git a/apps/nccl/include/nccl.h b/apps/nccl/include/nccl.h
index 7f50792..b8b146d 100644
--- a/apps/nccl/include/nccl.h
+++ b/apps/nccl/include/nccl.h
@@ -344,6 +344,13 @@ ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcoun
ncclResult_t pncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount, ncclDataType_t datatype,
ncclComm_t comm, cudaStream_t stream);

+/*
+ * Register/Deregister
+ */
+ncclResult_t ncclCommRegister(ncclComm_t comm, void* buff, size_t size, void** handle);
+ncclResult_t ncclCommDeregister(ncclComm_t comm, void* handle);
+ncclResult_t ncclBuffIsRegistered(ncclComm_t comm, const void* buff, size_t count, bool* registered);
+size_t ncclBufferSize(ncclComm_t comm, void* handle);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're inserting functions into mscclpp, you can just give them names such as mscclppCommRegister, and then there's no need to rename via mscclpp_nccl_syms.txt.

Copy link
Contributor Author

@nusislam nusislam Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ncclCommRegister() and ncclCommDeRegister() are NCCL APIs. We need to keep these names so that a workload can also be run with the mscclpp backend.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious why ncclCommRegister and ncclCommDeRegister are not implement in mscclpp? The stub functions already exist: https://github.com/microsoft/mscclpp/blob/6d26b92665383b60ffae7e4dd9ab4cc047d34721/apps/nccl/src/nccl.cu#L809C1-L817C2

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nusislam Should these changes be submitted as a mscclpp PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually, yes. But at the moment, will use this patch.

/*
* Send
*
diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu
index a697be2..1d4af61 100644
--- a/apps/nccl/src/nccl.cu
+++ b/apps/nccl/src/nccl.cu
@@ -65,6 +65,7 @@ struct ncclComm {
std::unordered_map<channelKey, ChannelInfo> channelInInfos;
std::unordered_map<channelKey, ChannelInfo> channelOutInfos;
std::unordered_map<channelKey, ChannelInfo> channelScratchInfos;
+ std::unordered_map<void*, channelKey> handleKeys;
std::shared_ptr<char> scratchBuff;
std::vector<mscclpp::RegisteredMemory> remoteScratchRegMemories;

@@ -73,6 +74,11 @@ struct ncclComm {
uint32_t buffFlag;
};

+struct handleInfo {
+ void * buff;
+ cudaIpcMemHandle_t ipcHandle;
+};
+
static size_t ncclTypeSize(ncclDataType_t type) {
switch (type) {
case ncclInt8:
@@ -577,6 +583,104 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t
return ncclSuccess;
}

+NCCL_API ncclResult_t ncclCommRegister(ncclComm_t comm, void* buff, size_t size, void** handle) {
+ size_t buffBytes = size;
+ CUdeviceptr buffBasePtr;
+ MSCCLPP_CUTHROW(cuMemGetAddressRange(&buffBasePtr, &buffBytes, (CUdeviceptr)buff));
+
+ int rank = comm->comm->bootstrap()->getRank();
+ channelKey buffKey{(void*)buffBasePtr, buffBytes};
+
+ std::vector<mscclpp::RegisteredMemory> remoteMemories;
+
+ // Creating the channels
+ auto buffIt = comm->channelScratchInfos.find(buffKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's no guarantee this will be called from a single thread, we probably need to lock access to channelScratchInfos and the rest of the maps. I suggest using one lock if this function doesn't fall on the critical path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This map is supposed to be checked by the host process and MSCCLPP is invoked only when using one GPU per process case.

+ if (buffIt == comm->channelScratchInfos.end()) {
+ std::vector<mscclpp::SmChannel> channels =
+ setupSmChannels(comm, comm->remoteScratchRegMemories, const_cast<void*>((void*)buffBasePtr));
+ ChannelInfo channelInfo{channels, channels, setupSmChannelDeviceHandles(channels), setupSmChannelDeviceHandles(channels)};
+ buffIt = comm->channelScratchInfos.emplace(buffKey, channelInfo).first;
+ }
+ auto sendIt = comm->channelInInfos.find(buffKey);
+ if (sendIt == comm->channelInInfos.end()) {
+ std::vector<mscclpp::SmChannel> channels =
+ setupSmChannels(comm, comm->remoteScratchRegMemories, const_cast<void*>((void*)buffBasePtr));
+
+ remoteMemories =
+ setupRemoteMemories(comm->comm, rank, (void*)buffBasePtr, buffBytes, mscclpp::Transport::CudaIpc);
+ std::vector<mscclpp::SmChannel> channels1 =
+ setupSmChannels(comm, remoteMemories, const_cast<void*>((void*)buffBasePtr));
+
+ ChannelInfo channelInfo{channels, channels1, setupSmChannelDeviceHandles(channels), setupSmChannelDeviceHandles(channels1)};
+ sendIt = comm->channelInInfos.emplace(buffKey, channelInfo).first;
+ }
+ auto recvIt = comm->channelOutInfos.find(buffKey);
+ if (recvIt == comm->channelOutInfos.end()) {
+ remoteMemories =
+ setupRemoteMemories(comm->comm, rank, (void*)buffBasePtr, buffBytes, mscclpp::Transport::CudaIpc);
+ std::vector<mscclpp::SmChannel> outChannels =
+ setupSmChannels(comm, remoteMemories, const_cast<void*>((void*)buffBasePtr));
+ ChannelInfo channelInfo{outChannels, outChannels, setupSmChannelDeviceHandles(outChannels), setupSmChannelDeviceHandles(outChannels)};
+ recvIt = comm->channelOutInfos.emplace(buffKey, channelInfo).first;
+ }
+
+ cudaIpcMemHandle_t ipcHandle;
+ MSCCLPP_CUDATHROW(cudaIpcGetMemHandle(&ipcHandle, buffBasePtr));
+
+ struct handleInfo *p = (struct handleInfo *) malloc(sizeof(struct handleInfo));
+ p->buff = buffBasePtr;
+ p->ipcHandle = ipcHandle;
+ *handle = p;
+
+ auto it = comm->handleKeys.find(*handle);
+ if (it == comm->handleKeys.end()) {
+ comm->handleKeys[*handle] = buffKey;
+ }
+
+ return ncclSuccess;
+}
+
+NCCL_API ncclResult_t ncclCommDeregister(ncclComm_t comm, void* handle) {
+ if (comm && handle) {
+ channelKey buffKey = comm->handleKeys[handle];
+
+ auto scratchIt = comm->channelScratchInfos.find(buffKey);
+ if (scratchIt != comm->channelScratchInfos.end()) {
+ comm->channelScratchInfos.erase(scratchIt);
+ }
+
+ auto inIt = comm->channelInInfos.find(buffKey);
+ if (inIt != comm->channelInInfos.end()) {
+ comm->channelInInfos.erase(inIt);
+ }
+
+ auto outIt = comm->channelOutInfos.find(buffKey);
+ if (outIt != comm->channelOutInfos.end()) {
+ comm->channelOutInfos.erase(outIt);
+ }
+
+ free(handle);
+ }
+ return ncclSuccess;
+}
+
+NCCL_API ncclResult_t ncclBuffIsRegistered(ncclComm_t comm, const void* buff, size_t count, bool* registered){
+ size_t buffBytes;
+ CUdeviceptr buffBasePtr;
+ MSCCLPP_CUTHROW(cuMemGetAddressRange(&buffBasePtr, &buffBytes, (CUdeviceptr)buff));
+ channelKey buffKey{(void*)buffBasePtr, buffBytes};
+ auto buffIt = comm->channelScratchInfos.find(buffKey);
+ *registered = buffIt != comm->channelScratchInfos.end();
+ return ncclSuccess;
+}
+size_t
+ncclBufferSize(ncclComm_t comm, void* handle){
+ if (!(comm && handle)){
+ return 0;
+ }
+ auto buffKeyIt = comm->handleKeys.find(handle);
+ return buffKeyIt != comm->handleKeys.end() ? buffKeyIt->second.bytes : 0;
+}
NCCL_API ncclResult_t ncclSend(const void*, size_t, ncclDataType_t, int, ncclComm_t, cudaStream_t) {
// TODO: implement this function
return ncclInternalError;
8 changes: 8 additions & 0 deletions src/include/mscclpp/mscclpp_nccl.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ extern "C" {
/* See ncclAllGather. */
ncclResult_t mscclpp_ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount,
ncclDataType_t datatype, mscclppComm_t comm, hipStream_t stream);

ncclResult_t mscclpp_ncclCommRegister(mscclppComm_t comm, void* buff, size_t size, void** handle);

ncclResult_t mscclpp_ncclCommDeregister(mscclppComm_t comm, void* handle);

ncclResult_t mscclpp_ncclBuffIsRegistered(mscclppComm_t comm, const void* buff, size_t count, bool* registered);

size_t mscclpp_ncclBufferSize(mscclppComm_t comm, void* handle);
}

namespace std {
Expand Down
18 changes: 16 additions & 2 deletions src/misc/msccl/msccl_lifecycle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,15 @@ ncclResult_t mscclEnqueueCheck(
NCCLCHECK(mscclGetCaptureStatus(comm->rank, stream));
}

bool sendBuffRegistered = false;
bool recvBuffRegistered = false;
mscclpp_ncclBuffIsRegistered(comm->mscclpp_comm, sendBuff, count, &sendBuffRegistered);
mscclpp_ncclBuffIsRegistered(comm->mscclpp_comm, recvBuff, count, &recvBuffRegistered);
const bool graphMode = threadLocalStatus.captureStatus != mscclNoCapture;
const bool buffsRegistedNonGraphMode = !graphMode && sendBuffRegistered && recvBuffRegistered;

/* check if one rank per GPU and graph mode is enabled */
if ((threadLocalStatus.captureStatus != mscclNoCapture) && comm->mscclCompatible && nBytes > 0 && (nBytes & 31) == 0) {
if ((graphMode || buffsRegistedNonGraphMode) && comm->mscclCompatible && nBytes > 0 && (nBytes & 31) == 0) {
bool isManagedBuffer = false;
if (sendBuff) CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(sendBuff)));
if (!isManagedBuffer && recvBuff) CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(recvBuff)));
Expand Down Expand Up @@ -565,8 +572,15 @@ ncclResult_t mscclEnqueueCheck(
NCCLCHECK(mscclGetCaptureStatus(comm->rank, stream));
}

bool sendBuffRegistered = false;
bool recvBuffRegistered = false;
mscclpp_ncclBuffIsRegistered(comm->mscclpp_comm, sendBuff, count, &sendBuffRegistered);
mscclpp_ncclBuffIsRegistered(comm->mscclpp_comm, recvBuff, count, &recvBuffRegistered);
const bool graphMode = threadLocalStatus.captureStatus != mscclNoCapture;
const bool buffsRegistedNonGraphMode = !graphMode && sendBuffRegistered && recvBuffRegistered;

/* check if one rank per GPU and graph mode is enabled */
if ((threadLocalStatus.captureStatus != mscclNoCapture) && comm->mscclCompatible && nBytes > 0 && (nBytes & 31) == 0) {
if ((graphMode || buffsRegistedNonGraphMode) && comm->mscclCompatible && nBytes > 0 && (nBytes & 31) == 0) {
bool isManagedBuffer = false;
if (sendBuff) CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(sendBuff)));
if (!isManagedBuffer && recvBuff) CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(recvBuff)));
Expand Down
4 changes: 4 additions & 0 deletions src/misc/mscclpp/mscclpp_nccl_syms.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ ncclRedOpDestroy mscclpp_ncclRedOpDestroy
ncclReduce mscclpp_ncclReduce
ncclReduceScatter mscclpp_ncclReduceScatter
ncclSend mscclpp_ncclSend
ncclCommRegister mscclpp_ncclCommRegister
ncclCommDeregister mscclpp_ncclCommDeregister
ncclBuffIsRegistered mscclpp_ncclBuffIsRegistered
ncclBufferSize mscclpp_ncclBufferSize
75 changes: 53 additions & 22 deletions src/register.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include "net.h"
#include "register.h"
#include "api_trace.h"
#ifdef ENABLE_MSCCLPP
#include "mscclpp/mscclpp_nccl.h"
#endif

ncclResult_t ncclNetDeregister(struct ncclComm* comm, struct ncclReg* reg) {
struct ncclRegCache* cache = &comm->regCache;
Expand Down Expand Up @@ -155,32 +158,60 @@ NCCL_API(ncclResult_t, ncclCommRegister, const ncclComm_t comm, void* buff, size
ncclResult_t ncclCommRegister_impl(const ncclComm_t comm, void* buff, size_t size, void** handle) {
NCCLCHECK(CommCheck(comm, "ncclCommRegister", "comm"));
if (comm->checkPointers) NCCLCHECK(CudaPtrCheck(buff, comm, "buff", "ncclCommRegister"));
NCCLCHECK(ncclRegister(comm, buff, size, handle));
#ifdef ENABLE_MSCCLPP
if (comm->mscclCompatible && size > 0 && (size & 31) == 0 && size <= comm->mscclpp_threshold){
bool isManagedBuffer = false;
CUDACHECK(hipPointerGetAttribute(&isManagedBuffer, HIP_POINTER_ATTRIBUTE_IS_MANAGED, const_cast<void*>(buff)));
if(!isManagedBuffer){
INFO(NCCL_INIT, "MSCCL++: ncclCommRegister");
NCCLCHECK(mscclpp_ncclCommRegister(comm->mscclpp_comm, buff, size, handle));
}
else{
WARN("MSCCL++: Cannot register user-buffers on managed memory");
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will do nothing, issue a warning, then fall through and return ncclSuccess. Is that the right behaviour? Should it not return an error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the expectation is that if no registration happens in MSCCL++ due to the buffer being in managed memory, no MSCCL++ or RCCL registration will happen and execution will fall back to RCCL kernel without UBR.

I actually need to do a small adjustment to not cause a RCCL deregistration error when MSCCL++ detects no registered buffer because of this. I just tested a simple fix for this.

But since we always fallback to RCCL kernel when MSCCL++ is not enabled, I think a warning without return an error is OK. Also, I haven't seen any errors returned in any UBR related routines, except when RCCL cannot find the registration handle during deregistration. Of course, this definitely makes it harder to see if anything goes wrong.

}
else
#endif
{
INFO(NCCL_INIT, "RCCL: ncclCommRegister");
NCCLCHECK(ncclRegister(comm, buff, size, handle));
}
isaki001 marked this conversation as resolved.
Show resolved Hide resolved
return ncclSuccess;
}

NCCL_API(ncclResult_t, ncclCommDeregister, const ncclComm_t comm, void* handle);
ncclResult_t ncclCommDeregister_impl(const ncclComm_t comm, void* handle) {
NCCLCHECK(CommCheck(comm, "ncclCommRegister", "comm"));
struct ncclReg* reg = (struct ncclReg*)handle;
struct ncclRegCache* cache = &comm->regCache;
int slot;
for (slot=0; slot<cache->population && cache->slots[slot] != reg; slot++);
if (slot == cache->population) {
WARN("Deregister: Could not find handle");
return ncclInvalidUsage;
}
if (--reg->refs) return ncclSuccess;
NCCLCHECK(ncclNetDeregister(comm, reg));
if (reg->state & NVLS_REG_COMPLETE) {
NCCLCHECK(ncclNvlsDeregBuffer(&reg->mcHandle, reg->regAddr, reg->dev, reg->regSize));
reg->regAddr = (CUdeviceptr)NULL;
}
if (reg->state & COLLNET_REG_COMPLETE) {
NCCLCHECK(ncclCollnetDeregBuffer(comm, reg->proxyconn, reg->collnetHandle));
}
free(reg);
memmove(cache->slots+slot, cache->slots+slot+1, (cache->population-slot-1)*sizeof(struct ncclReg*));
cache->population -= 1;

#ifdef ENABLE_MSCCLPP

const size_t size = mscclpp_ncclBufferSize(comm->mscclpp_comm, handle);
if (comm->mscclCompatible && size > 0 && (size & 31) == 0 && size <= comm->mscclpp_threshold){
NCCLCHECK(mscclpp_ncclCommDeregister(comm->mscclpp_comm, handle));
}
else
#endif
{
NCCLCHECK(CommCheck(comm, "ncclCommRegister", "comm"));
struct ncclReg* reg = (struct ncclReg*)handle;
struct ncclRegCache* cache = &comm->regCache;
int slot;
for (slot=0; slot<cache->population && cache->slots[slot] != reg; slot++);
if (slot == cache->population) {
WARN("Deregister: Could not find handle");
return ncclInvalidUsage;
}
if (--reg->refs) return ncclSuccess;
NCCLCHECK(ncclNetDeregister(comm, reg));
if (reg->state & NVLS_REG_COMPLETE) {
NCCLCHECK(ncclNvlsDeregBuffer(&reg->mcHandle, reg->regAddr, reg->dev, reg->regSize));
reg->regAddr = (CUdeviceptr)NULL;
}
if (reg->state & COLLNET_REG_COMPLETE) {
NCCLCHECK(ncclCollnetDeregBuffer(comm, reg->proxyconn, reg->collnetHandle));
}
free(reg);
memmove(cache->slots+slot, cache->slots+slot+1, (cache->population-slot-1)*sizeof(struct ncclReg*));
cache->population -= 1;
}
corey-derochie-amd marked this conversation as resolved.
Show resolved Hide resolved
return ncclSuccess;
}