From ab8aff95e85ecc76b4c7bdef4b7af98ece0bfa1e Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Tue, 26 Nov 2024 19:57:10 -0600 Subject: [PATCH 01/25] Fix collective trace --- src/device/common.h | 6 ++---- src/include/device.h | 2 -- src/init.cc | 4 ++-- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/device/common.h b/src/device/common.h index 390608a9d..524f2216b 100644 --- a/src/device/common.h +++ b/src/device/common.h @@ -57,13 +57,13 @@ __trace_hwreg()\ if (ncclShmem.work.header.type == ncclWorkTypeP2p) { \ struct ncclWorkElemP2p *p2pElems = ncclShmem.work.p2pElems; \ - collTrace->p2p[0].connIndex = 0; \ + collTrace->p2p[0].connIndex = p2pElems[0].connIndex; \ collTrace->p2pOpCount[0] = p2pElems[0].opCount; \ collTrace->p2p[0].ngroups = p2pElems[0].ngroups; \ collTrace->p2p[0].nWarps = p2pElems[0].nWarps; \ collTrace->p2p[0].warpStart = p2pElems[0].warpStart; \ collTrace->p2p[0].peer = p2pElems[0].p2pType == ncclWorkP2pTypeRecv ? (uint16_t)(p2pElems[0].peer) : -1; \ - collTrace->p2p[1].connIndex = 0; \ + collTrace->p2p[1].connIndex = p2pElems[1].connIndex; \ collTrace->p2pOpCount[1] = p2pElems[1].opCount; \ collTrace->p2p[1].ngroups = p2pElems[1].ngroups; \ collTrace->p2p[1].nWarps = p2pElems[1].nWarps; \ @@ -74,8 +74,6 @@ struct ncclWorkElem *elems = ncclShmem.work.elems; \ collTrace->opCount = elems[0].opCount; \ collTrace->coll.nWarps = elems[0].nWarps; \ - collTrace->coll.bid = elems[0].bid; \ - collTrace->coll.nChannels = elems[0].nChannels; \ collTrace->type = (launch_type) | ncclCollTraceCollElemType; \ } \ } diff --git a/src/include/device.h b/src/include/device.h index 983601e82..05c9462c9 100644 --- a/src/include/device.h +++ b/src/include/device.h @@ -388,8 +388,6 @@ struct ncclCollTrace { uint64_t data_1; struct { uint8_t nWarps; - uint8_t bid; - uint8_t nChannels; } coll; struct { int16_t peer; diff --git a/src/init.cc b/src/init.cc index 9bec5637f..d1e2eb787 100644 --- a/src/init.cc +++ b/src/init.cc @@ -260,7 +260,7 @@ void *ncclCommThreadMain(void *arg) { sprintf(line, "## [%012.6f] [%02d:%02d:%02d] %06lx", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->channelId, td->opCount); offset = strlen(line); if (type == ncclCollTraceCollElemType) { - sprintf(line+offset, " CE %s nw %d bi %d nc %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks); + sprintf(line+offset, " CE %s nw %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, comm->busId, comm->nRanks); } else if (type == ncclCollTraceP2pElemType) { sprintf(line+offset, " PE %s %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d", funcNames[fIdx], td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups, @@ -275,7 +275,7 @@ void *ncclCommThreadMain(void *arg) { sprintf(line+offset, " CL %s", funcNames[fIdx]); offset = strlen(line); if ((type&0xf0) == ncclCollTraceCollElemType) - sprintf(line+offset, " nw %d bi %d nc %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks); + sprintf(line+offset, " nw %d busId %lx nRanks %d", td->coll.nWarps, comm->busId, comm->nRanks); else if ((type&0xf0) == ncclCollTraceP2pElemType) sprintf(line+offset, " %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d", td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups, From 1a2aa40f6dc109653cc8ac021850f053d947d440 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Wed, 27 
Nov 2024 13:12:24 -0600 Subject: [PATCH 02/25] Use nontemporal for st_global --- src/device/op128.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/device/op128.h b/src/device/op128.h index 2bf7d0739..8ee0b4224 100644 --- a/src/device/op128.h +++ b/src/device/op128.h @@ -199,8 +199,8 @@ template<> __device__ __forceinline__ void st_global<0>(uintptr_t addr, BytePack } \ template<> \ __device__ __forceinline__ void st_##space(addr_cxx_ty addr, BytePack value) { \ - data_cxx_ty tmp = value.native; \ - *((data_cxx_ty *)addr) = tmp; \ + data_cxx_ty tmp = __builtin_nontemporal_load((data_cxx_ty *)&value.native); \ + __builtin_nontemporal_store(tmp, (data_cxx_ty *)addr); \ } // #if __CUDA_ARCH__ >= 700 From 56f1474823177e8408b958b05aa27c53a21b9564 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Wed, 27 Nov 2024 16:03:03 -0600 Subject: [PATCH 03/25] Fix previous commit --- src/device/op128.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/device/op128.h b/src/device/op128.h index 8ee0b4224..72f90d0c1 100644 --- a/src/device/op128.h +++ b/src/device/op128.h @@ -199,8 +199,7 @@ template<> __device__ __forceinline__ void st_global<0>(uintptr_t addr, BytePack } \ template<> \ __device__ __forceinline__ void st_##space(addr_cxx_ty addr, BytePack value) { \ - data_cxx_ty tmp = __builtin_nontemporal_load((data_cxx_ty *)&value.native); \ - __builtin_nontemporal_store(tmp, (data_cxx_ty *)addr); \ + __builtin_nontemporal_store(value.native, (data_cxx_ty *)addr); \ } // #if __CUDA_ARCH__ >= 700 From 873737075a1e68da69cb87c7979d44cf3d35af22 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Mon, 9 Dec 2024 09:44:42 -0800 Subject: [PATCH 04/25] Add HDP flush to data receive path --- src/transport/net.cc | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/transport/net.cc b/src/transport/net.cc index 8abb47a37..7b8057145 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -249,7 +249,11 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, netId, 0, &req.useGdr)); // Determine whether we need to flush the GDR buffer on recv or not - if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush)); + if (req.useGdr) { + NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush)); + CUDACHECK(hipDeviceGetAttribute((int*)&req.curr_hdp_reg, hipDeviceAttributeHdpMemFlushCntl, myInfo->cudaDev)); + recv->conn.curr_hdp_reg = req.curr_hdp_reg; + } // We don't support PXN on receive yet tpProxyRank = comm->topParentRanks[myInfo->rank]; @@ -1538,6 +1542,8 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct if (totalSize > 0 && p == NCCL_PROTO_SIMPLE && needFlush) { // GDRCOPY support struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources); + if (resources->curr_hdp_reg) *resources->curr_hdp_reg = 0x1; + __sync_synchronize(); if (resources->gdcFlush) { #if defined (__x86_64__) // Force a PCI-E read from GPU memory From 99837475af8ba739a373c9a8b2eeada46c3eb4ba Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Mon, 9 Dec 2024 10:04:08 -0800 Subject: [PATCH 05/25] Fix previous commit --- src/transport/net.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transport/net.cc b/src/transport/net.cc index 7b8057145..47b1c565b 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -658,6 +658,7 @@ static ncclResult_t recvProxySetup(struct 
ncclProxyConnection* connection, struc resources->needFlush = req->needFlush; resources->channelId = req->channelId; resources->connIndex = req->connIndex; + resources->curr_hdp_reg = req->curr_hdp_reg; ncclNetProperties_t props; NCCLCHECK(proxyState->ncclNet->getProperties(req->netDev, &props)); /* DMA-BUF support */ From 605a16488fdfb469cbbb4facf5041edbc483b5a8 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Wed, 11 Dec 2024 11:35:10 -0800 Subject: [PATCH 06/25] Control flushing by NCCL_NET_FORCE_FLUSH and RCCL_NET_HDP_FLUSH --- src/graph/paths.cc | 4 ++-- src/transport/net.cc | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/graph/paths.cc b/src/graph/paths.cc index 8e64e108c..cb26ed92a 100644 --- a/src/graph/paths.cc +++ b/src/graph/paths.cc @@ -450,7 +450,7 @@ ncclResult_t ncclTopoCheckGdr(struct ncclTopoSystem* system, int64_t busId, int6 } // Set to 0 to disable the flush on Hopper when using GDR -NCCL_PARAM(NetForceFlush, "NET_FORCE_FLUSH", 0); +NCCL_PARAM(NetForceFlush, "NET_FORCE_FLUSH", 1); // Determine whether we need to flush the GDR recv buffers ncclResult_t ncclTopoNeedFlush(struct ncclTopoSystem* system, int64_t busId, int* flush) { @@ -458,7 +458,7 @@ ncclResult_t ncclTopoNeedFlush(struct ncclTopoSystem* system, int64_t busId, int NCCLCHECK(ncclTopoIdToIndex(system, GPU, busId, &g)); struct ncclTopoNode* gpu = system->nodes[GPU].nodes+g; #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) - *flush = 1; + *flush = ncclParamNetForceFlush(); #else // Flush is required on Ampere and earlier *flush = gpu->gpu.cudaCompCap < 90 ? 1 : ncclParamNetForceFlush(); diff --git a/src/transport/net.cc b/src/transport/net.cc index 47b1c565b..bcd0da656 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -1354,6 +1354,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct return ncclSuccess; } +RCCL_PARAM(NetHdpFlush, "NET_HDP_FLUSH", 1); + static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct ncclProxyArgs* args) { #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) g_npkit_net_poll_cnt++; @@ -1543,8 +1545,10 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct if (totalSize > 0 && p == NCCL_PROTO_SIMPLE && needFlush) { // GDRCOPY support struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources); - if (resources->curr_hdp_reg) *resources->curr_hdp_reg = 0x1; - __sync_synchronize(); + if (rcclParamNetHdpFlush() && resources->curr_hdp_reg) { + *resources->curr_hdp_reg = 0x1; + __sync_synchronize(); + } if (resources->gdcFlush) { #if defined (__x86_64__) // Force a PCI-E read from GPU memory From 68cb0cf5f4f009020acdb2bc6e6a5cb588f54ff2 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Wed, 11 Dec 2024 13:22:18 -0800 Subject: [PATCH 07/25] Introduce RCCL_NET_HDP_FLUSH and RCCL_NET_GDR_FLUSH Both are on by default. Turning both off skips every flush and will likely result in data errors.
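[Editor's annotation, not part of the commit message: these two knobs gate two independent flush mechanisms on the GDR receive path. RCCL_NET_HDP_FLUSH gates the HDP write-cache flush added in PATCH 04, using the register pointer obtained from hipDeviceGetAttribute(..., hipDeviceAttributeHdpMemFlushCntl, dev); RCCL_NET_GDR_FLUSH gates the network plugin's iflush() read-back. A minimal sketch of the host-side ordering on the proxy thread, where hdpFlushOn/gdrFlushOn and the buffer arguments are illustrative stand-ins, not RCCL symbols:

  // Proxy thread, once the NIC reports the receive complete:
  if (hdpFlushOn && curr_hdp_reg) {
    *curr_hdp_reg = 0x1;     // MMIO write drains the HDP write cache
    __sync_synchronize();    // order the flush before consumers read the data
  }
  if (gdrFlushOn) {          // independent of the HDP flush
    // PCIe read-back through the NIC makes GDR-written data globally visible
    NCCLCHECK(proxyState->ncclNet->iflush(netRecvComm, nbufs, ptrs, sizes, mhandles, req));
  }

With both knobs set to 0 neither flush runs, hence the data-error warning above.]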
--- src/graph/paths.cc | 4 ++-- src/transport/net.cc | 13 ++++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/graph/paths.cc b/src/graph/paths.cc index cb26ed92a..8e64e108c 100644 --- a/src/graph/paths.cc +++ b/src/graph/paths.cc @@ -450,7 +450,7 @@ ncclResult_t ncclTopoCheckGdr(struct ncclTopoSystem* system, int64_t busId, int6 } // Set to 0 to disable the flush on Hopper when using GDR -NCCL_PARAM(NetForceFlush, "NET_FORCE_FLUSH", 1); +NCCL_PARAM(NetForceFlush, "NET_FORCE_FLUSH", 0); // Determine whether we need to flush the GDR recv buffers ncclResult_t ncclTopoNeedFlush(struct ncclTopoSystem* system, int64_t busId, int* flush) { @@ -458,7 +458,7 @@ ncclResult_t ncclTopoNeedFlush(struct ncclTopoSystem* system, int64_t busId, int NCCLCHECK(ncclTopoIdToIndex(system, GPU, busId, &g)); struct ncclTopoNode* gpu = system->nodes[GPU].nodes+g; #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) - *flush = ncclParamNetForceFlush(); + *flush = 1; #else // Flush is required on Ampere and earlier *flush = gpu->gpu.cudaCompCap < 90 ? 1 : ncclParamNetForceFlush(); diff --git a/src/transport/net.cc b/src/transport/net.cc index bcd0da656..ed7fd780c 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -1355,6 +1355,7 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct } RCCL_PARAM(NetHdpFlush, "NET_HDP_FLUSH", 1); +RCCL_PARAM(NetGdrFlush, "NET_GDR_FLUSH", 1); static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct ncclProxyArgs* args) { #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) @@ -1546,8 +1547,13 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct // GDRCOPY support struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources); if (rcclParamNetHdpFlush() && resources->curr_hdp_reg) { + static bool once = true; *resources->curr_hdp_reg = 0x1; __sync_synchronize(); + if (once) { + once = false; + INFO(NCCL_INIT, "%s: flushed HDP %p", __func__, resources->curr_hdp_reg); + } } if (resources->gdcFlush) { #if defined (__x86_64__) @@ -1557,8 +1563,9 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct WARN("NET: GDR Flush only supported on x86_64"); return ncclInternalError; #endif - } else { + } else if (rcclParamNetGdrFlush()) { int subCount = 0; + static bool once = true; for (int i=0; igroupSize; i++) { struct ncclProxySubArgs* sub = subGroup + i; if (step < sub->nsteps) { @@ -1575,6 +1582,10 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct } struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources); NCCLCHECK(proxyState->ncclNet->iflush(resources->netRecvComm, subCount, ptrs, sizes, mhandles, subGroup->requests+(step%NCCL_STEPS))); + if (once) { + once = false; + INFO(NCCL_INIT, "%s: issued GDR flush", __func__); + } } } args->idle = 0; From 5684290e99417ed0ee1e73566630153443393b86 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Thu, 12 Dec 2024 11:48:54 -0800 Subject: [PATCH 08/25] Enable GDR copy by default --- src/init.cc | 2 +- src/transport/net.cc | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/init.cc b/src/init.cc index d1e2eb787..a59d912f3 100644 --- a/src/init.cc +++ b/src/init.cc @@ -122,7 +122,7 @@ static constexpr int64_t defaultEnableMscclpp = 0; RCCL_PARAM(MscclppEnabled, "MSCCLPP_ENABLE", defaultEnableMscclpp); // GDRCOPY support: Off by default 
-NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 0); +NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 1); // GDRCOPY support gdr_t ncclGdrCopy = NULL; diff --git a/src/transport/net.cc b/src/transport/net.cc index ed7fd780c..ca2ddce5c 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -1558,7 +1558,12 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct if (resources->gdcFlush) { #if defined (__x86_64__) // Force a PCI-E read from GPU memory + static bool once = true; asm volatile ("mov (%0), %%eax" :: "l"(resources->gdcFlush) : "%eax"); + if (once) { + once = false; + INFO(NCCL_INIT, "%s: issued GDC flush", __func__); + } #else WARN("NET: GDR Flush only supported on x86_64"); return ncclInternalError; From 35ddf445a77a2a3b5cb663d796c23a4022dd302b Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Thu, 19 Dec 2024 14:19:37 -0800 Subject: [PATCH 09/25] Remove GDR flush env var because it is disabled by GDC flush --- src/transport/net.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/transport/net.cc b/src/transport/net.cc index ca2ddce5c..5f7846bd1 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -1355,7 +1355,6 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct } RCCL_PARAM(NetHdpFlush, "NET_HDP_FLUSH", 1); -RCCL_PARAM(NetGdrFlush, "NET_GDR_FLUSH", 1); static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct ncclProxyArgs* args) { #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) @@ -1568,7 +1567,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct WARN("NET: GDR Flush only supported on x86_64"); return ncclInternalError; #endif - } else if (rcclParamNetGdrFlush()) { + } else { int subCount = 0; static bool once = true; for (int i=0; igroupSize; i++) { From 01a29667b67c7b66414daa7d998efb6fe8f71b62 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Thu, 9 Jan 2025 09:55:01 -0800 Subject: [PATCH 10/25] Output kernel collective trace at comm destroy by default --- src/enqueue.cc | 2 +- src/include/comm.h | 1 + src/init.cc | 34 ++++++++++++++++++++++------------ 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/enqueue.cc b/src/enqueue.cc index 9cb0044a8..b275c3d3b 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -31,7 +31,7 @@ struct ncclKernelMatch { }; #ifdef ENABLE_COLLTRACE -#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll + ((p_comm)->collTraceThread ? 2 : 0)) +#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll + ((p_comm)->collTraceEnabled ? 
2 : 0)) static ncclKernelMatch const ncclKerns[4] = { {(void *)ncclDevKernel_Generic, true}, {(void *)ncclDevKernel_Generic_4, true}, diff --git a/src/include/comm.h b/src/include/comm.h index be3422ec4..182a0d03a 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -403,6 +403,7 @@ struct ncclComm { union ncclCollTraceTail *collTraceTail; pthread_t collTraceThread; volatile bool collTraceExit; + bool collTraceEnabled; #endif ncclConfig_t config; diff --git a/src/init.cc b/src/init.cc index a59d912f3..4cbbca2e4 100644 --- a/src/init.cc +++ b/src/init.cc @@ -218,6 +218,7 @@ void NCCL_NO_OPTIMIZE commPoison(ncclComm_t comm) { } RCCL_PARAM(KernelCollTraceEnable, "KERNEL_COLL_TRACE_ENABLE", 0); +RCCL_PARAM(KernelCollTraceThreadEnable, "KERNEL_COLL_TRACE_THREAD_ENABLE", 0); #ifdef ENABLE_COLLTRACE // Should be in sync with 'ALL_COLLS' in Generator.cmake @@ -231,16 +232,14 @@ void *ncclCommThreadMain(void *arg) { do { int numActiveChans = MAXCHANNELS; for (int channel = 0; channel < MAXCHANNELS; channel++) { - int tail = comm->collTraceTail[channel].tail%COLLTRACE_NUM_ITEMS; + int tail = comm->collTraceTail[channel].tail; int count; - if (head[channel] <= tail) - count = tail - head[channel]; - else - count = COLLTRACE_NUM_ITEMS + head[channel] - tail; + count = tail - head[channel]; if (count == 0) { numActiveChans--; continue; } + count = count%COLLTRACE_NUM_ITEMS; for (int i = 0; i < count; i++) { volatile struct ncclCollTrace *td = comm->collTrace+COLLTRACE_NUM_ITEMS*channel+head[channel]; uint8_t type = td->type; @@ -296,14 +295,16 @@ void *ncclCommThreadMain(void *arg) { INFO(NCCL_COLL, "%s", line); td->type = ncclCollTraceNotReady; head[channel] ++; - head[channel] %= COLLTRACE_NUM_ITEMS; } } if (comm->collTraceExit && numActiveChans == 0) break; usleep(1000); //sleep 1ms } while(true); - pthread_exit(NULL); + if (comm->collTraceThread) + pthread_exit(NULL); + else + return 0; } #endif @@ -398,7 +399,12 @@ static ncclResult_t commFree(ncclComm_t comm) { #ifdef ENABLE_COLLTRACE comm->collTraceExit = 1; - if (comm->collTraceThread) pthread_join(comm->collTraceThread, NULL); + if (comm->collTraceEnabled) { + if (comm->collTraceThread) + pthread_join(comm->collTraceThread, NULL); + else + ncclCommThreadMain((void *)comm); + } NCCLCHECK(ncclCudaHostFree((void *)comm->collTrace)); NCCLCHECK(ncclCudaHostFree((void *)comm->collTraceTail)); #endif @@ -579,10 +585,14 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in NCCLCHECK(ncclCudaHostCalloc(&comm->collTraceTail, MAXCHANNELS)); NCCLCHECK(ncclCudaHostCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS)); comm->collTraceExit = 0; - if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable()) - pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm); - else - comm->collTraceThread = 0; + comm->collTraceEnabled = false; // we can enable colltrace without starting a thread + if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable()) { + comm->collTraceEnabled = true; + if (rcclParamKernelCollTraceThreadEnable()) + pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm); + else + comm->collTraceThread = 0; + } #endif comm->collNetSupport = 0; memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix)); From 974a3de6ba8859dd004630746aeb2def85c0955a Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Sat, 14 Dec 2024 16:17:33 -0800 Subject: [PATCH 11/25] Limit kernel timeout messages to 100 --- src/device/prims_simple.h | 6 
+++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index bb9b02bc4..ce407fc8b 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -116,12 +116,16 @@ class Primitives< if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) || ((flags & (Send*RoleWaitSend)) && !noSendWait)) { int spins = 0; + static int repeat = 100; while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) { __builtin_amdgcn_s_sleep(1); connStepCache = loadStepValue(connStepPtr); if (checkAbort(spins)) break; //if (spins == 0) printf("r=%d b=%d t=%d SPUN OUT got=%d want=%d\n", ncclShmem.comm.rank, blockIdx.x, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice)); - if (spins == 0) traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice)); + if (spins == 0 && repeat > 0) { + repeat --; + traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice)); + } } __asm__ __volatile__("s_wakeup"); } From d3017d61264e43bb551385f3ca38d534316c0997 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Mon, 13 Jan 2025 10:01:43 -0800 Subject: [PATCH 12/25] Use system relaxed atomic for loadInt --- src/device/common_kernel.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/device/common_kernel.h b/src/device/common_kernel.h index 4161bb6b1..4e198e0f7 100644 --- a/src/device/common_kernel.h +++ b/src/device/common_kernel.h @@ -23,7 +23,7 @@ inline __device__ int min(int a, ssize_t b) { return (a < b) ? a : b; } inline __device__ int loadInt(int* ptr) { int v; - v = atomicAdd((unsigned long long *)ptr, 0); + v = __atomic_load_n(ptr, __ATOMIC_RELAXED); return v; } From 097b47380a6e1c1795c5466e99e4759724e81b86 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Mon, 13 Jan 2025 12:20:24 -0800 Subject: [PATCH 13/25] Refine timeout messages and use atomic for setting offset from CPU --- src/device/prims_simple.h | 3 ++- src/transport/net.cc | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index ce407fc8b..cb1de6f9e 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -56,6 +56,7 @@ class Primitives< uint32_t* next_hdp_reg; void* mhandle; void* netDeviceHandle; + int repeat; #if defined(ENABLE_NPKIT) public: @@ -116,7 +117,7 @@ class Primitives< if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) || ((flags & (Send*RoleWaitSend)) && !noSendWait)) { int spins = 0; - static int repeat = 100; + repeat = 50; while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) { __builtin_amdgcn_s_sleep(1); connStepCache = loadStepValue(connStepPtr); diff --git a/src/transport/net.cc b/src/transport/net.cc index 5f7846bd1..a261225bf 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -1167,7 +1167,7 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct int sharedBuffSlot = sub->posted%maxDepth; int offset; NCCLCHECK(sharedBuffersGet(proxyState, sub->channelId, sharedBuffSlot*args->nsubs+s, &offset, NULL)); - resources->recvMem->connFifo[buffSlot].offset = offset; + __atomic_store_n(&resources->recvMem->connFifo[buffSlot].offset, offset, __ATOMIC_RELAXED); __sync_synchronize(); } volatile uint64_t* sendHead = resources->gdcSync ? 
resources->gdcSync : &resources->sendMem->head; @@ -1435,7 +1435,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct int sharedBuffSlot = sub->posted%maxDepth; int offset; NCCLCHECK(sharedBuffersGet(proxyState, sub->channelId, sharedBuffSlot*args->nsubs+s+i, &offset, sizes+subCount)); - connFifo[buffSlot].offset = offset; + __atomic_store_n(&connFifo[buffSlot].offset, offset, __ATOMIC_RELAXED); ptrs[subCount] = localBuff+offset; } } else { From ae3b55e1d96cb19e6ae76ef9087c0e19a4bd1792 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Tue, 14 Jan 2025 14:43:59 -0800 Subject: [PATCH 14/25] Use atomic for exchanging size and offset between CPU and GPU --- src/device/prims_simple.h | 2 +- src/transport/net.cc | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index cb1de6f9e..b2f0a098b 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -133,7 +133,7 @@ class Primitives< if (flags & (Recv*RoleWaitRecv | Send*RoleWaitSend)) { if (flags & ConnFifoEnabled) - connFifo[step%NCCL_STEPS].size = nelts*sizeof(T); + __atomic_store_n(&connFifo[step%NCCL_STEPS].size, nelts*sizeof(T), __ATOMIC_RELAXED); void **ptrs = isSendNotRecv ? (ncclShmem.groups[group].dsts + Dst) : (ncclShmem.groups[group].srcs + Src); diff --git a/src/transport/net.cc b/src/transport/net.cc index a261225bf..d9e494533 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -1194,7 +1194,7 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct sub->npKitSizesFifo[buffSlot] = size; #endif bool shared = (p == NCCL_PROTO_SIMPLE) && resources->shared; - char* buff = shared ? localBuff+connFifo[buffSlot].offset : localBuff+buffSlot*stepSize; + char* buff = shared ? localBuff+__atomic_load_n(&connFifo[buffSlot].offset, __ATOMIC_RELAXED) : localBuff+buffSlot*stepSize; int ready = 1; if (p == NCCL_PROTO_LL128) { ready = resources->useGdr; @@ -1219,7 +1219,7 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct if (f1[0] != flag || f2[0] != flag) { ready = 0; break; } } } else if (p == NCCL_PROTO_SIMPLE && resources->shared) { - buff = sub->reg ? (char*)sub->recvbuff : localBuff+resources->recvMem->connFifo[buffSlot].offset; + buff = sub->reg ? (char*)sub->recvbuff : localBuff+__atomic_load_n(&resources->recvMem->connFifo[buffSlot].offset, __ATOMIC_RELAXED); } if (ready) { // flush HDP if not done @@ -1317,11 +1317,11 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct sub->nsteps++; } else { // Signal the GPU the send is complete and it can return. - connFifo[sub->base%NCCL_STEPS].size = -1; + __atomic_store_n(&connFifo[sub->base%NCCL_STEPS].size, -1, __ATOMIC_RELAXED); } } // Make sure size is reset to -1 before we update the head. - if (sub->reg == 0) connFifo[buffSlot].size = -1; + if (sub->reg == 0) __atomic_store_n(&connFifo[buffSlot].size, -1, __ATOMIC_RELAXED); __sync_synchronize(); TRACE(NCCL_NET, "sendProxy [%ld/%d] request %p done", sub->done, buffSlot, sub->requests[buffSlot]); sub->done += args->sliceSteps; @@ -1428,7 +1428,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct if (p == NCCL_PROTO_SIMPLE && resources->shared) { if (sub->reg) { // Wait until CUDA kernel has started before we access the user buffer directly. 
- if (connFifo[sub->base%NCCL_STEPS].size == -1) continue; + if (__atomic_load_n(&connFifo[sub->base%NCCL_STEPS].size, __ATOMIC_RELAXED) == -1) continue; ptrs[subCount] = sub->recvbuff; sizes[subCount] = std::min(MAX_NET_SIZE, sub->nbytes); } else { @@ -1530,7 +1530,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct // There is a __sync_synchronize() later to ensure it is reset before it is set again by the GPU. struct recvNetResources* resources = (struct recvNetResources*) (sub->connection->transportResources); volatile struct ncclConnFifo* connFifo = (volatile struct ncclConnFifo*)resources->recvMem->connFifo; - connFifo[sub->base%NCCL_STEPS].size = -1; + __atomic_store_n(&connFifo[sub->base%NCCL_STEPS].size, -1, __ATOMIC_RELAXED); } } } @@ -1578,7 +1578,7 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct char* localBuff = NCCL_NET_MAP_GET_POINTER(&resources->map, cpu, buffs[p]); int buffSlot = (sub->base+sub->received-args->sliceSteps)%NCCL_STEPS; ptrs[subCount] = resources->shared ? - (sub->reg ? (char*)sub->recvbuff : localBuff+resources->recvMem->connFifo[buffSlot].offset) : + (sub->reg ? (char*)sub->recvbuff : localBuff+__atomic_load_n(&resources->recvMem->connFifo[buffSlot].offset, __ATOMIC_RELAXED)) : localBuff+buffSlot*stepSize; mhandles[subCount] = sub->mhandle; subCount++; From 157da0b88a6250a6ef190b0ce56cc90a34421c01 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Fri, 17 Jan 2025 16:46:22 -0800 Subject: [PATCH 15/25] Add kernel trace for barrier timeout --- src/device/primitives.h | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/device/primitives.h b/src/device/primitives.h index c9f824535..983032e40 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -25,7 +25,19 @@ if (wid == 0) { \ barrier_next[w] += nthreads/WARP_SIZE; \ atomicAdd((unsigned long long *)barriers, 1); \ - while (atomicAdd((unsigned long long *)barriers, 0) < barrier_next[w]) __builtin_amdgcn_s_sleep(1); \ + int spins = 0; \ + int rate_limit = 50; \ + while (atomicAdd((unsigned long long *)barriers, 0) < barrier_next[w]) { \ + spins++; \ + if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ + spins = 0; \ + } \ + if (spins == 0 && rate_limit > 0) { \ + rate_limit --; \ + traceData(__LINE__, threadIdx.x, *barriers, barrier_next[w]); \ + } \ + __builtin_amdgcn_s_sleep(1); \ + } \ __asm__ __volatile__("s_wakeup"); \ } \ } \ From e6085f6feaa5c81e94d2adc2efed97be15d0f177 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Mon, 20 Jan 2025 12:01:42 -0800 Subject: [PATCH 16/25] Add backup barrier to avoid race in atomicAdd --- src/device/common.h | 8 +++++--- src/device/primitives.h | 12 +++++++++--- src/device/prims_ll.h | 2 +- src/device/prims_ll128.h | 2 +- src/device/prims_simple.h | 2 +- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/device/common.h b/src/device/common.h index 524f2216b..c29a85a90 100644 --- a/src/device/common.h +++ b/src/device/common.h @@ -110,7 +110,7 @@ struct ncclShmemGroup { void* userOutput; void* srcs[NCCL_MAX_ARITY+1]; void* dsts[NCCL_MAX_ARITY+1]; - uint64_t barrier; + uint64_t barrier[2]; uint64_t barrier_next[NCCL_MAX_GROUPS]; union { unpackGroupShmem unpack; @@ -260,8 +260,10 @@ __forceinline__ __device__ void ncclKernelMain(struct ncclDevComm* comm, struct } break; case 1: - if (tid < WARP_SIZE + NCCL_MAX_GROUPS) - ncclShmem.groups[tid-WARP_SIZE].barrier = 0; + if (tid < WARP_SIZE + NCCL_MAX_GROUPS) { + ncclShmem.groups[tid-WARP_SIZE].barrier[0] = 
0; + ncclShmem.groups[tid-WARP_SIZE].barrier[1] = 0; + } break; case 2: if (tid < 2*WARP_SIZE + NCCL_MAX_GROUPS*NCCL_MAX_GROUPS) diff --git a/src/device/primitives.h b/src/device/primitives.h index 983032e40..675d660ec 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -24,17 +24,23 @@ const int wid = threadIdx.x%WARP_SIZE; \ if (wid == 0) { \ barrier_next[w] += nthreads/WARP_SIZE; \ - atomicAdd((unsigned long long *)barriers, 1); \ + atomicAdd((unsigned long long *)&barriers[w%2], 1); \ + atomicAdd((unsigned long long *)&barriers[(w+1)%2], 1); \ int spins = 0; \ int rate_limit = 50; \ - while (atomicAdd((unsigned long long *)barriers, 0) < barrier_next[w]) { \ + while (atomicAdd((unsigned long long *)&barriers[0], 0) < barrier_next[w] && \ + atomicAdd((unsigned long long *)&barriers[1], 0) < barrier_next[w]) { \ spins++; \ if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ + if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ + ncclShmem.aborted = 1; \ + break; \ + } \ spins = 0; \ } \ if (spins == 0 && rate_limit > 0) { \ rate_limit --; \ - traceData(__LINE__, threadIdx.x, *barriers, barrier_next[w]); \ + traceData(__LINE__, threadIdx.x, barriers[0]+(barriers[1]<<32), barrier_next[w]); \ } \ __builtin_amdgcn_s_sleep(1); \ } \ diff --git a/src/device/prims_ll.h b/src/device/prims_ll.h index 3be676a23..16e795b07 100644 --- a/src/device/prims_ll.h +++ b/src/device/prims_ll.h @@ -616,7 +616,7 @@ class Primitives: tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), group(group), stepLines(ncclShmem.comm.buffSizes[NCCL_PROTO_LL]/NCCL_STEPS/sizeof(ncclLLFifoLine)) { auto *channel = &ncclShmem.channel; - barriers = &ncclShmem.groups[group].barrier; + barriers = ncclShmem.groups[group].barrier; barrier_next = ncclShmem.groups[group].barrier_next; // If we are going to support oneshot collNet + LL, then we would need to add connector index here int nrecv=0, nsend=0; diff --git a/src/device/prims_ll128.h b/src/device/prims_ll128.h index 8b72b3578..3270f0166 100644 --- a/src/device/prims_ll128.h +++ b/src/device/prims_ll128.h @@ -506,7 +506,7 @@ class Primitives: flagThread((tid%4)==3), group(group), stepSize(ncclShmem.comm.buffSizes[NCCL_PROTO_LL128]/NCCL_STEPS/sizeof(uint64_t)) { auto *channel = &ncclShmem.channel; - barriers = &ncclShmem.groups[group].barrier; + barriers = ncclShmem.groups[group].barrier; barrier_next = ncclShmem.groups[group].barrier_next; int nrecv=0, nsend=0; while (nrecv < MaxRecv && recvPeers[nrecv] >= 0) { diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index b2f0a098b..d9ecdb68b 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -673,7 +673,7 @@ class Primitives< stepSize(stepSize_ == 0 ? 
ncclShmem.comm.buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/sizeof(T) : stepSize_) { // For send operations, we need an extra warp to overlap the threadfence and the copy - barriers = &ncclShmem.groups[group].barrier; + barriers = ncclShmem.groups[group].barrier; barrier_next = ncclShmem.groups[group].barrier_next; this->nworkers = nthreads; From 452079b2eb2a7b5bca98e1eb9da51854f74189d2 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Mon, 20 Jan 2025 15:50:58 -0800 Subject: [PATCH 17/25] Use different counters for different warps --- src/device/common.h | 9 +++---- src/device/primitives.h | 53 +++++++++++++-------------------------- src/device/prims_ll.h | 1 + src/device/prims_ll128.h | 1 + src/device/prims_simple.h | 1 + 5 files changed, 25 insertions(+), 40 deletions(-) diff --git a/src/device/common.h b/src/device/common.h index c29a85a90..80b20b7df 100644 --- a/src/device/common.h +++ b/src/device/common.h @@ -110,8 +110,9 @@ struct ncclShmemGroup { void* userOutput; void* srcs[NCCL_MAX_ARITY+1]; void* dsts[NCCL_MAX_ARITY+1]; - uint64_t barrier[2]; + uint64_t barrier[NCCL_MAX_GROUPS]; uint64_t barrier_next[NCCL_MAX_GROUPS]; + uint8_t warpStart; union { unpackGroupShmem unpack; } devicePlugin; @@ -260,10 +261,8 @@ __forceinline__ __device__ void ncclKernelMain(struct ncclDevComm* comm, struct } break; case 1: - if (tid < WARP_SIZE + NCCL_MAX_GROUPS) { - ncclShmem.groups[tid-WARP_SIZE].barrier[0] = 0; - ncclShmem.groups[tid-WARP_SIZE].barrier[1] = 0; - } + if (tid < WARP_SIZE + NCCL_MAX_GROUPS*NCCL_MAX_GROUPS) + ncclShmem.groups[(tid-WARP_SIZE)/NCCL_MAX_GROUPS].barrier[(tid-WARP_SIZE)%NCCL_MAX_GROUPS] = 0; break; case 2: if (tid < 2*WARP_SIZE + NCCL_MAX_GROUPS*NCCL_MAX_GROUPS) diff --git a/src/device/primitives.h b/src/device/primitives.h index 675d660ec..cf50cf94d 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -15,7 +15,6 @@ #define NCCL_SPINS_BEFORE_CHECK_ABORT 1000000 -#if defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__) #define barrier_by_group() do { \ if (nthreads == NCCL_MAX_NTHREADS) { \ __builtin_amdgcn_s_barrier(); \ @@ -23,48 +22,32 @@ const int w = threadIdx.x/WARP_SIZE; \ const int wid = threadIdx.x%WARP_SIZE; \ if (wid == 0) { \ - barrier_next[w] += nthreads/WARP_SIZE; \ - atomicAdd((unsigned long long *)&barriers[w%2], 1); \ - atomicAdd((unsigned long long *)&barriers[(w+1)%2], 1); \ + barrier_next[w] += 1; \ + barriers[w] += 1; \ int spins = 0; \ int rate_limit = 50; \ - while (atomicAdd((unsigned long long *)&barriers[0], 0) < barrier_next[w] && \ - atomicAdd((unsigned long long *)&barriers[1], 0) < barrier_next[w]) { \ - spins++; \ - if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ - if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ - ncclShmem.aborted = 1; \ - break; \ + for (int i = 0; i < nthreads/WARP_SIZE; i++) { \ + uint8_t warp = ncclShmem.groups[group].warpStart + i; \ + while (atomicAdd((unsigned long long *)&barriers[warp], 0) < barrier_next[w]) { \ + spins++; \ + if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ + if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ + ncclShmem.aborted = 1; \ + break; \ + } \ + spins = 0; \ } \ - spins = 0; \ - } \ - if (spins == 0 && rate_limit > 0) { \ - rate_limit --; \ - traceData(__LINE__, threadIdx.x, barriers[0]+(barriers[1]<<32), barrier_next[w]); \ + if (spins == 0 && rate_limit > 0) { \ + rate_limit --; \ + traceData(__LINE__, threadIdx.x, barriers[warp], barrier_next[warp]); \ + } \ + __builtin_amdgcn_s_sleep(1); \ } \ - __builtin_amdgcn_s_sleep(1); 
\ + __asm__ __volatile__("s_wakeup"); \ } \ - __asm__ __volatile__("s_wakeup"); \ - } \ - } \ -} while (0) -#else -#define barrier_by_group() do { \ - if (nthreads == NCCL_MAX_NTHREADS) { \ - __threadfence(); __builtin_amdgcn_s_barrier(); \ - } else { \ - const int w = threadIdx.x/WARP_SIZE; \ - const int wid = threadIdx.x%WARP_SIZE; \ - __threadfence(); \ - if (wid == 0) { \ - barrier_next[w] += nthreads/WARP_SIZE; \ - atomicAdd((unsigned long long *)barriers, 1); \ - while (atomicAdd((unsigned long long *)barriers, 0) < barrier_next[w]) __builtin_amdgcn_s_sleep(1); \ - __asm__ __volatile__("s_wakeup"); \ } \ } \ } while (0) -#endif /* Protocol classes: ProtoSimple, ProtoLL, ProtoLL128 * We use these as template args to the Primtiives class instead of integral diff --git a/src/device/prims_ll.h b/src/device/prims_ll.h index 16e795b07..d4f46d93b 100644 --- a/src/device/prims_ll.h +++ b/src/device/prims_ll.h @@ -616,6 +616,7 @@ class Primitives: tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), group(group), stepLines(ncclShmem.comm.buffSizes[NCCL_PROTO_LL]/NCCL_STEPS/sizeof(ncclLLFifoLine)) { auto *channel = &ncclShmem.channel; + if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; barriers = ncclShmem.groups[group].barrier; barrier_next = ncclShmem.groups[group].barrier_next; // If we are going to support oneshot collNet + LL, then we would need to add connector index here diff --git a/src/device/prims_ll128.h b/src/device/prims_ll128.h index 3270f0166..e3925869e 100644 --- a/src/device/prims_ll128.h +++ b/src/device/prims_ll128.h @@ -506,6 +506,7 @@ class Primitives: flagThread((tid%4)==3), group(group), stepSize(ncclShmem.comm.buffSizes[NCCL_PROTO_LL128]/NCCL_STEPS/sizeof(uint64_t)) { auto *channel = &ncclShmem.channel; + if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; barriers = ncclShmem.groups[group].barrier; barrier_next = ncclShmem.groups[group].barrier_next; int nrecv=0, nsend=0; diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index d9ecdb68b..d02d3c626 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -673,6 +673,7 @@ class Primitives< stepSize(stepSize_ == 0 ? 
ncclShmem.comm.buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/sizeof(T) : stepSize_) { // For send operations, we need an extra warp to overlap the threadfence and the copy + if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; barriers = ncclShmem.groups[group].barrier; barrier_next = ncclShmem.groups[group].barrier_next; this->nworkers = nthreads; From 5bb601a4cb1ec26804fa2be18ed25e9dd5b3101c Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Tue, 21 Jan 2025 08:40:05 -0800 Subject: [PATCH 18/25] Rework barrier implementation --- src/device/common.h | 9 +++------ src/device/primitives.h | 6 +++--- src/device/prims_ll.h | 7 ++++--- src/device/prims_ll128.h | 7 ++++--- src/device/prims_simple.h | 7 ++++--- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/device/common.h b/src/device/common.h index 80b20b7df..a0c15fec6 100644 --- a/src/device/common.h +++ b/src/device/common.h @@ -110,8 +110,6 @@ struct ncclShmemGroup { void* userOutput; void* srcs[NCCL_MAX_ARITY+1]; void* dsts[NCCL_MAX_ARITY+1]; - uint64_t barrier[NCCL_MAX_GROUPS]; - uint64_t barrier_next[NCCL_MAX_GROUPS]; uint8_t warpStart; union { unpackGroupShmem unpack; @@ -123,6 +121,7 @@ struct ncclShmemGroup { struct ncclShmemData { struct ncclShmemGroup groups[NCCL_MAX_GROUPS]; + uint64_t barrier[NCCL_MAX_GROUPS]; uint64_t redOpArgs[NCCL_MAX_ARITY+1]; int channelId; int aborted; @@ -261,12 +260,10 @@ __forceinline__ __device__ void ncclKernelMain(struct ncclDevComm* comm, struct } break; case 1: - if (tid < WARP_SIZE + NCCL_MAX_GROUPS*NCCL_MAX_GROUPS) - ncclShmem.groups[(tid-WARP_SIZE)/NCCL_MAX_GROUPS].barrier[(tid-WARP_SIZE)%NCCL_MAX_GROUPS] = 0; + if (tid < WARP_SIZE + NCCL_MAX_GROUPS) + ncclShmem.barrier[(tid-WARP_SIZE)%NCCL_MAX_GROUPS] = 0; break; case 2: - if (tid < 2*WARP_SIZE + NCCL_MAX_GROUPS*NCCL_MAX_GROUPS) - ncclShmem.groups[(tid-2*WARP_SIZE)/NCCL_MAX_GROUPS].barrier_next[(tid-2*WARP_SIZE)%NCCL_MAX_GROUPS] = 0; break; case 3: /* set abort flag to 0 */ diff --git a/src/device/primitives.h b/src/device/primitives.h index cf50cf94d..8f7fba0a0 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -22,13 +22,13 @@ const int w = threadIdx.x/WARP_SIZE; \ const int wid = threadIdx.x%WARP_SIZE; \ if (wid == 0) { \ - barrier_next[w] += 1; \ + barrier_next += 1; \ barriers[w] += 1; \ int spins = 0; \ int rate_limit = 50; \ for (int i = 0; i < nthreads/WARP_SIZE; i++) { \ uint8_t warp = ncclShmem.groups[group].warpStart + i; \ - while (atomicAdd((unsigned long long *)&barriers[warp], 0) < barrier_next[w]) { \ + while (atomicAdd((unsigned long long *)&barriers[warp], 0) < barrier_next) { \ spins++; \ if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ @@ -39,7 +39,7 @@ } \ if (spins == 0 && rate_limit > 0) { \ rate_limit --; \ - traceData(__LINE__, threadIdx.x, barriers[warp], barrier_next[warp]); \ + traceData(__LINE__, threadIdx.x, barriers[warp]+((uint64_t)warp<<32), barrier_next); \ } \ __builtin_amdgcn_s_sleep(1); \ } \ diff --git a/src/device/prims_ll.h b/src/device/prims_ll.h index d4f46d93b..70d9cacce 100644 --- a/src/device/prims_ll.h +++ b/src/device/prims_ll.h @@ -66,7 +66,7 @@ class Primitives: inline __device__ uint32_t sendFlag(int i) { return NCCL_LL_FLAG(sendStep[i]+1); } uint64_t* barriers; - uint64_t* barrier_next; + uint64_t barrier_next; inline __device__ void barrier() { #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) @@ -617,8 +617,9 @@ class Primitives: 
stepLines(ncclShmem.comm.buffSizes[NCCL_PROTO_LL]/NCCL_STEPS/sizeof(ncclLLFifoLine)) { auto *channel = &ncclShmem.channel; if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; - barriers = ncclShmem.groups[group].barrier; - barrier_next = ncclShmem.groups[group].barrier_next; + barriers = ncclShmem.barrier; + if (tid%WARP_SIZE == 0) barriers[threadIdx.x/WARP_SIZE] = 0; + barrier_next = 0; // If we are going to support oneshot collNet + LL, then we would need to add connector index here int nrecv=0, nsend=0; // We compare with Fan::MaxRecv here because this->MaxRecv is always at least 1 diff --git a/src/device/prims_ll128.h b/src/device/prims_ll128.h index e3925869e..3356b5afa 100644 --- a/src/device/prims_ll128.h +++ b/src/device/prims_ll128.h @@ -62,7 +62,7 @@ class Primitives: inline __device__ uint64_t sendFlag(int i) { return sendStep[i]+1; } uint64_t* barriers; - uint64_t* barrier_next; + uint64_t barrier_next; #if defined(ENABLE_NPKIT) public: @@ -507,8 +507,9 @@ class Primitives: stepSize(ncclShmem.comm.buffSizes[NCCL_PROTO_LL128]/NCCL_STEPS/sizeof(uint64_t)) { auto *channel = &ncclShmem.channel; if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; - barriers = ncclShmem.groups[group].barrier; - barrier_next = ncclShmem.groups[group].barrier_next; + barriers = ncclShmem.barrier; + if (tid%WARP_SIZE == 0) barriers[threadIdx.x/WARP_SIZE] = 0; + barrier_next = 0; int nrecv=0, nsend=0; while (nrecv < MaxRecv && recvPeers[nrecv] >= 0) { loadRecvConn(&channel->peers[recvPeers[nrecv]]->recv[connIndexRecv], nrecv); diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index d02d3c626..c2d90a253 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -52,7 +52,7 @@ class Primitives< uint64_t connStepCache; // Cache last seen value of (*connStepPtr) int connStepSize; // Connection step size uint64_t* barriers; - uint64_t* barrier_next; + uint64_t barrier_next; uint32_t* next_hdp_reg; void* mhandle; void* netDeviceHandle; @@ -674,8 +674,9 @@ class Primitives< // For send operations, we need an extra warp to overlap the threadfence and the copy if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; - barriers = ncclShmem.groups[group].barrier; - barrier_next = ncclShmem.groups[group].barrier_next; + barriers = ncclShmem.barrier; + if (tid%WARP_SIZE == 0) barriers[threadIdx.x/WARP_SIZE] = 0; + barrier_next = 0; this->nworkers = nthreads; int nrecv=0, nsend=0; From 6ba73cd8b4c63275ea6c3cd4a9a5cc265d7160ed Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Tue, 21 Jan 2025 23:21:45 +0000 Subject: [PATCH 19/25] Fix for other GFX --- src/device/primitives.h | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/src/device/primitives.h b/src/device/primitives.h index 8f7fba0a0..8e79d4b22 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -15,6 +15,7 @@ #define NCCL_SPINS_BEFORE_CHECK_ABORT 1000000 +#if defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__) #define barrier_by_group() do { \ if (nthreads == NCCL_MAX_NTHREADS) { \ __builtin_amdgcn_s_barrier(); \ @@ -28,7 +29,7 @@ int rate_limit = 50; \ for (int i = 0; i < nthreads/WARP_SIZE; i++) { \ uint8_t warp = ncclShmem.groups[group].warpStart + i; \ - while (atomicAdd((unsigned long long *)&barriers[warp], 0) < barrier_next) { \ + while (__hip_atomic_load(barriers+warp, __ATOMIC_RELAXED, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next) { \ spins++; \ if (spins == 
NCCL_SPINS_BEFORE_CHECK_ABORT) { \ if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ @@ -48,7 +49,42 @@ } \ } \ } while (0) - +#else +#define barrier_by_group() do { \ + if (nthreads == NCCL_MAX_NTHREADS) { \ + __threadfence(); __builtin_amdgcn_s_barrier(); \ + } else { \ + const int w = threadIdx.x/WARP_SIZE; \ + const int wid = threadIdx.x%WARP_SIZE; \ + __threadfence(); \ + if (wid == 0) { \ + barrier_next += 1; \ + barriers[w] += 1; \ + int spins = 0; \ + int rate_limit = 50; \ + for (int i = 0; i < nthreads/WARP_SIZE; i++) { \ + uint8_t warp = ncclShmem.groups[group].warpStart + i; \ + while (__hip_atomic_load(barriers+warp, __ATOMIC_RELAXED, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next) { \ + spins++; \ + if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ + if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ + ncclShmem.aborted = 1; \ + break; \ + } \ + spins = 0; \ + } \ + if (spins == 0 && rate_limit > 0) { \ + rate_limit --; \ + traceData(__LINE__, threadIdx.x, barriers[warp]+((uint64_t)warp<<32), barrier_next); \ + } \ + __builtin_amdgcn_s_sleep(1); \ + } \ + __asm__ __volatile__("s_wakeup"); \ + } \ + } \ + } \ +} while (0) +#endif /* Protocol classes: ProtoSimple, ProtoLL, ProtoLL128 * We use these as template args to the Primtiives class instead of integral * enums (e.g. NCCL_PROTO_LL) because for SIMPLE we need to carry a few extra From 967e9c3de5fe4dae6d5c8cc08532f0405da45821 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Wed, 22 Jan 2025 01:14:37 +0000 Subject: [PATCH 20/25] Use __hip_atomic_store and __hip_atomic_load --- src/device/primitives.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/device/primitives.h b/src/device/primitives.h index 8e79d4b22..c616efa78 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -24,12 +24,12 @@ const int wid = threadIdx.x%WARP_SIZE; \ if (wid == 0) { \ barrier_next += 1; \ - barriers[w] += 1; \ + __hip_atomic_store(barriers+w, barrier_next, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \ int spins = 0; \ int rate_limit = 50; \ for (int i = 0; i < nthreads/WARP_SIZE; i++) { \ uint8_t warp = ncclShmem.groups[group].warpStart + i; \ - while (__hip_atomic_load(barriers+warp, __ATOMIC_RELAXED, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next) { \ + while (__hip_atomic_load(barriers+warp, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next) { \ spins++; \ if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ @@ -59,12 +59,12 @@ __threadfence(); \ if (wid == 0) { \ barrier_next += 1; \ - barriers[w] += 1; \ + __hip_atomic_store(barriers+w, barrier_next, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \ int spins = 0; \ int rate_limit = 50; \ for (int i = 0; i < nthreads/WARP_SIZE; i++) { \ uint8_t warp = ncclShmem.groups[group].warpStart + i; \ - while (__hip_atomic_load(barriers+warp, __ATOMIC_RELAXED, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next) { \ + while (__hip_atomic_load(barriers+warp, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next) { \ spins++; \ if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ From 2388a1536e9c03b79a8a1fa1cd465ea0f1dc9b9a Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Thu, 23 Jan 2025 18:54:53 +0000 Subject: [PATCH 21/25] Fix bug in previous commit --- src/device/primitives.h | 4 ++-- src/device/prims_ll.h | 2 +- src/device/prims_ll128.h | 2 +- src/device/prims_simple.h | 2 +- 4 
files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/device/primitives.h b/src/device/primitives.h index c616efa78..439712607 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -40,7 +40,7 @@ } \ if (spins == 0 && rate_limit > 0) { \ rate_limit --; \ - traceData(__LINE__, threadIdx.x, barriers[warp]+((uint64_t)warp<<32), barrier_next); \ + traceData(__LINE__, threadIdx.x, __hip_atomic_load(barriers+warp, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP)+((uint64_t)warp<<32), barrier_next); \ } \ __builtin_amdgcn_s_sleep(1); \ } \ @@ -75,7 +75,7 @@ } \ if (spins == 0 && rate_limit > 0) { \ rate_limit --; \ - traceData(__LINE__, threadIdx.x, barriers[warp]+((uint64_t)warp<<32), barrier_next); \ + traceData(__LINE__, threadIdx.x, __hip_atomic_load(barriers+warp, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP)+((uint64_t)warp<<32), barrier_next); \ } \ __builtin_amdgcn_s_sleep(1); \ } \ diff --git a/src/device/prims_ll.h b/src/device/prims_ll.h index 70d9cacce..d5a2996a8 100644 --- a/src/device/prims_ll.h +++ b/src/device/prims_ll.h @@ -618,7 +618,7 @@ class Primitives: auto *channel = &ncclShmem.channel; if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; barriers = ncclShmem.barrier; - if (tid%WARP_SIZE == 0) barriers[threadIdx.x/WARP_SIZE] = 0; + if (tid%WARP_SIZE == 0 && threadIdx.x/WARP_SIZE < NCCL_MAX_GROUPS) barriers[threadIdx.x/WARP_SIZE] = 0; barrier_next = 0; // If we are going to support oneshot collNet + LL, then we would need to add connector index here int nrecv=0, nsend=0; diff --git a/src/device/prims_ll128.h b/src/device/prims_ll128.h index 3356b5afa..7fb966124 100644 --- a/src/device/prims_ll128.h +++ b/src/device/prims_ll128.h @@ -508,7 +508,7 @@ class Primitives: auto *channel = &ncclShmem.channel; if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; barriers = ncclShmem.barrier; - if (tid%WARP_SIZE == 0) barriers[threadIdx.x/WARP_SIZE] = 0; + if (tid%WARP_SIZE == 0 && threadIdx.x/WARP_SIZE < NCCL_MAX_GROUPS) barriers[threadIdx.x/WARP_SIZE] = 0; barrier_next = 0; int nrecv=0, nsend=0; while (nrecv < MaxRecv && recvPeers[nrecv] >= 0) { diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index c2d90a253..9359c5995 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -675,7 +675,7 @@ class Primitives< // For send operations, we need an extra warp to overlap the threadfence and the copy if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; barriers = ncclShmem.barrier; - if (tid%WARP_SIZE == 0) barriers[threadIdx.x/WARP_SIZE] = 0; + if (tid%WARP_SIZE == 0 && threadIdx.x/WARP_SIZE < NCCL_MAX_GROUPS) barriers[threadIdx.x/WARP_SIZE] = 0; barrier_next = 0; this->nworkers = nthreads; From 410ff3cfc00adbe0972787dd308b01226a41c96d Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Fri, 24 Jan 2025 01:50:17 +0000 Subject: [PATCH 22/25] Don't reset barrier values in running kernel --- src/device/common.h | 9 ++++++--- src/device/primitives.h | 21 +++++++++------------ src/device/prims_ll.h | 7 +++---- src/device/prims_ll128.h | 7 +++---- src/device/prims_simple.h | 7 +++---- 5 files changed, 24 insertions(+), 27 deletions(-) diff --git a/src/device/common.h b/src/device/common.h index a0c15fec6..80b20b7df 100644 --- a/src/device/common.h +++ b/src/device/common.h @@ -110,6 +110,8 @@ struct ncclShmemGroup { void* userOutput; void* srcs[NCCL_MAX_ARITY+1]; void* dsts[NCCL_MAX_ARITY+1]; + uint64_t barrier[NCCL_MAX_GROUPS]; + uint64_t barrier_next[NCCL_MAX_GROUPS]; uint8_t 
warpStart; union { unpackGroupShmem unpack; @@ -121,7 +123,6 @@ struct ncclShmemGroup { struct ncclShmemData { struct ncclShmemGroup groups[NCCL_MAX_GROUPS]; - uint64_t barrier[NCCL_MAX_GROUPS]; uint64_t redOpArgs[NCCL_MAX_ARITY+1]; int channelId; int aborted; @@ -260,10 +261,12 @@ __forceinline__ __device__ void ncclKernelMain(struct ncclDevComm* comm, struct } break; case 1: - if (tid < WARP_SIZE + NCCL_MAX_GROUPS) - ncclShmem.barrier[(tid-WARP_SIZE)%NCCL_MAX_GROUPS] = 0; + if (tid < WARP_SIZE + NCCL_MAX_GROUPS*NCCL_MAX_GROUPS) + ncclShmem.groups[(tid-WARP_SIZE)/NCCL_MAX_GROUPS].barrier[(tid-WARP_SIZE)%NCCL_MAX_GROUPS] = 0; break; case 2: + if (tid < 2*WARP_SIZE + NCCL_MAX_GROUPS*NCCL_MAX_GROUPS) + ncclShmem.groups[(tid-2*WARP_SIZE)/NCCL_MAX_GROUPS].barrier_next[(tid-2*WARP_SIZE)%NCCL_MAX_GROUPS] = 0; break; case 3: /* set abort flag to 0 */ diff --git a/src/device/primitives.h b/src/device/primitives.h index 439712607..e04aacecb 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -23,13 +23,13 @@ const int w = threadIdx.x/WARP_SIZE; \ const int wid = threadIdx.x%WARP_SIZE; \ if (wid == 0) { \ - barrier_next += 1; \ - __hip_atomic_store(barriers+w, barrier_next, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \ + barrier_next[w] += 1; \ + __hip_atomic_store(barriers+w, barrier_next[w], __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \ int spins = 0; \ int rate_limit = 50; \ for (int i = 0; i < nthreads/WARP_SIZE; i++) { \ - uint8_t warp = ncclShmem.groups[group].warpStart + i; \ - while (__hip_atomic_load(barriers+warp, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next) { \ + uint8_t warpIter = ncclShmem.groups[group].warpStart + i; \ + while (__hip_atomic_load(barriers+warpIter, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next[w]) { \ spins++; \ if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ @@ -40,7 +40,7 @@ } \ if (spins == 0 && rate_limit > 0) { \ rate_limit --; \ - traceData(__LINE__, threadIdx.x, __hip_atomic_load(barriers+warp, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP)+((uint64_t)warp<<32), barrier_next); \ + traceData(__LINE__, threadIdx.x, __hip_atomic_load(barriers+warpIter, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP)+((uint64_t)warpIter<<32), barrier_next[w]); \ } \ __builtin_amdgcn_s_sleep(1); \ } \ @@ -58,13 +58,11 @@ const int wid = threadIdx.x%WARP_SIZE; \ __threadfence(); \ if (wid == 0) { \ - barrier_next += 1; \ - __hip_atomic_store(barriers+w, barrier_next, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \ + barrier_next[w] += nthreads/WARP_SIZE; \ + __hip_atomic_fetch_add(barriers, 1, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP); \ int spins = 0; \ int rate_limit = 50; \ - for (int i = 0; i < nthreads/WARP_SIZE; i++) { \ - uint8_t warp = ncclShmem.groups[group].warpStart + i; \ - while (__hip_atomic_load(barriers+warp, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next) { \ + while (__hip_atomic_load(barriers, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP) < barrier_next[w]) { \ spins++; \ if (spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { \ if (__atomic_load_n(ncclShmem.comm.abortFlag, __ATOMIC_SEQ_CST)) { \ @@ -75,12 +73,11 @@ } \ if (spins == 0 && rate_limit > 0) { \ rate_limit --; \ - traceData(__LINE__, threadIdx.x, __hip_atomic_load(barriers+warp, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP)+((uint64_t)warp<<32), barrier_next); \ + traceData(__LINE__, threadIdx.x, __hip_atomic_load(barriers, __ATOMIC_ACQUIRE, 
__HIP_MEMORY_SCOPE_WORKGROUP), barrier_next[w]); \ } \ __builtin_amdgcn_s_sleep(1); \ } \ __asm__ __volatile__("s_wakeup"); \ - } \ } \ } \ } while (0) diff --git a/src/device/prims_ll.h b/src/device/prims_ll.h index d5a2996a8..d4f46d93b 100644 --- a/src/device/prims_ll.h +++ b/src/device/prims_ll.h @@ -66,7 +66,7 @@ class Primitives: inline __device__ uint32_t sendFlag(int i) { return NCCL_LL_FLAG(sendStep[i]+1); } uint64_t* barriers; - uint64_t barrier_next; + uint64_t* barrier_next; inline __device__ void barrier() { #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) @@ -617,9 +617,8 @@ class Primitives: stepLines(ncclShmem.comm.buffSizes[NCCL_PROTO_LL]/NCCL_STEPS/sizeof(ncclLLFifoLine)) { auto *channel = &ncclShmem.channel; if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; - barriers = ncclShmem.barrier; - if (tid%WARP_SIZE == 0 && threadIdx.x/WARP_SIZE < NCCL_MAX_GROUPS) barriers[threadIdx.x/WARP_SIZE] = 0; - barrier_next = 0; + barriers = ncclShmem.groups[group].barrier; + barrier_next = ncclShmem.groups[group].barrier_next; // If we are going to support oneshot collNet + LL, then we would need to add connector index here int nrecv=0, nsend=0; // We compare with Fan::MaxRecv here because this->MaxRecv is always at least 1 diff --git a/src/device/prims_ll128.h b/src/device/prims_ll128.h index 7fb966124..e3925869e 100644 --- a/src/device/prims_ll128.h +++ b/src/device/prims_ll128.h @@ -62,7 +62,7 @@ class Primitives: inline __device__ uint64_t sendFlag(int i) { return sendStep[i]+1; } uint64_t* barriers; - uint64_t barrier_next; + uint64_t* barrier_next; #if defined(ENABLE_NPKIT) public: @@ -507,9 +507,8 @@ class Primitives: stepSize(ncclShmem.comm.buffSizes[NCCL_PROTO_LL128]/NCCL_STEPS/sizeof(uint64_t)) { auto *channel = &ncclShmem.channel; if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; - barriers = ncclShmem.barrier; - if (tid%WARP_SIZE == 0 && threadIdx.x/WARP_SIZE < NCCL_MAX_GROUPS) barriers[threadIdx.x/WARP_SIZE] = 0; - barrier_next = 0; + barriers = ncclShmem.groups[group].barrier; + barrier_next = ncclShmem.groups[group].barrier_next; int nrecv=0, nsend=0; while (nrecv < MaxRecv && recvPeers[nrecv] >= 0) { loadRecvConn(&channel->peers[recvPeers[nrecv]]->recv[connIndexRecv], nrecv); diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index 9359c5995..d02d3c626 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -52,7 +52,7 @@ class Primitives< uint64_t connStepCache; // Cache last seen value of (*connStepPtr) int connStepSize; // Connection step size uint64_t* barriers; - uint64_t barrier_next; + uint64_t* barrier_next; uint32_t* next_hdp_reg; void* mhandle; void* netDeviceHandle; @@ -674,9 +674,8 @@ class Primitives< // For send operations, we need an extra warp to overlap the threadfence and the copy if (tid == 0) ncclShmem.groups[group].warpStart = threadIdx.x/WARP_SIZE; - barriers = ncclShmem.barrier; - if (tid%WARP_SIZE == 0 && threadIdx.x/WARP_SIZE < NCCL_MAX_GROUPS) barriers[threadIdx.x/WARP_SIZE] = 0; - barrier_next = 0; + barriers = ncclShmem.groups[group].barrier; + barrier_next = ncclShmem.groups[group].barrier_next; this->nworkers = nthreads; int nrecv=0, nsend=0; From f7cc41652e78b412cd2e2a4d8c7fd1fe7743a52e Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Fri, 24 Jan 2025 17:49:55 +0000 Subject: [PATCH 23/25] Update trace format --- src/device/primitives.h | 4 +++- src/device/prims_simple.h | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git 
a/src/device/primitives.h b/src/device/primitives.h index e04aacecb..5e8bb7b15 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -40,7 +40,9 @@ } \ if (spins == 0 && rate_limit > 0) { \ rate_limit --; \ - traceData(__LINE__, threadIdx.x, __hip_atomic_load(barriers+warpIter, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP)+((uint64_t)warpIter<<32), barrier_next[w]); \ + uint64_t tmp = __hip_atomic_load(barriers + warpIter, __ATOMIC_RELAXED, __HIP_MEMORY_SCOPE_WORKGROUP) \ + + ((uint64_t)group<<48) + ((uint64_t)warpIter<<32) + (((uint64_t)ncclShmem.groups[group].warpStart)<<16); \ + traceData(_LINE_, threadIdx.x, tmp, barrier_next[w] + ((uint64_t)i<<32)); \ } \ __builtin_amdgcn_s_sleep(1); \ } \ diff --git a/src/device/prims_simple.h b/src/device/prims_simple.h index d02d3c626..54f9bbcf6 100644 --- a/src/device/prims_simple.h +++ b/src/device/prims_simple.h @@ -43,7 +43,7 @@ class Primitives< Fan fan; int index; // Peer index I'm responsible for int flags; - int group; + const int group; uint64_t step; struct ncclConnFifo* connFifo = NULL; T* connEltsFifo; From 84c65c4831fd32c5c0d5d2b13239f507fb8a6648 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Fri, 24 Jan 2025 18:44:03 +0000 Subject: [PATCH 24/25] Fix typo --- src/device/primitives.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/device/primitives.h b/src/device/primitives.h index 5e8bb7b15..7f131a192 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -42,7 +42,7 @@ rate_limit --; \ uint64_t tmp = __hip_atomic_load(barriers + warpIter, __ATOMIC_RELAXED, __HIP_MEMORY_SCOPE_WORKGROUP) \ + ((uint64_t)group<<48) + ((uint64_t)warpIter<<32) + (((uint64_t)ncclShmem.groups[group].warpStart)<<16); \ - traceData(_LINE_, threadIdx.x, tmp, barrier_next[w] + ((uint64_t)i<<32)); \ + traceData(__LINE__, threadIdx.x, tmp, barrier_next[w] + ((uint64_t)i<<32)); \ } \ __builtin_amdgcn_s_sleep(1); \ } \ From af13e869b014915ee4e67847f6b1e7be621afffe Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Fri, 24 Jan 2025 19:27:41 +0000 Subject: [PATCH 25/25] Switch back to hip_atomic_fetch_add --- src/device/primitives.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/device/primitives.h b/src/device/primitives.h index 7f131a192..0b7a25569 100644 --- a/src/device/primitives.h +++ b/src/device/primitives.h @@ -15,7 +15,7 @@ #define NCCL_SPINS_BEFORE_CHECK_ABORT 1000000 -#if defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__) +#if 0 #define barrier_by_group() do { \ if (nthreads == NCCL_MAX_NTHREADS) { \ __builtin_amdgcn_s_barrier(); \
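[Editor's annotation on the series, not part of any commit: after PATCH 25 replaces the gfx940/941/942 guard with #if 0, every target compiles the second barrier_by_group() variant, i.e. the single shared-counter barrier built up across PATCHES 15-22: a release fetch-add publishes each warp's arrival, an acquire polling load waits for the whole group, the abort flag is honored, and timeout traces are rate-limited. A minimal standalone sketch of that shape, with groupBarrier, counter, next, and traceBudget as illustrative names rather than RCCL symbols:

  __device__ void groupBarrier(uint64_t* counter, uint64_t* next,
                               int nthreads, uint32_t* abortFlag) {
    // One lane per warp publishes arrival, then spins until all warps arrive.
    const int wid = threadIdx.x % WARP_SIZE;
    __threadfence();
    if (wid == 0) {
      *next += nthreads / WARP_SIZE;  // each warp adds 1; wait for this total
      __hip_atomic_fetch_add(counter, 1, __ATOMIC_RELEASE, __HIP_MEMORY_SCOPE_WORKGROUP);
      int spins = 0, traceBudget = 50;  // cap timeout traces per call site
      while (__hip_atomic_load(counter, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP) < *next) {
        if (++spins == NCCL_SPINS_BEFORE_CHECK_ABORT) {
          if (__atomic_load_n(abortFlag, __ATOMIC_SEQ_CST)) break;  // honor abort
          spins = 0;
          if (traceBudget > 0) { traceBudget--; /* traceData(...) in the tree */ }
        }
        __builtin_amdgcn_s_sleep(1);  // back off while waiting
      }
      __asm__ __volatile__("s_wakeup");
    }
  }

In the tree, counter is ncclShmem.groups[group].barrier and next is the warp's slot in barrier_next[]; both are zeroed once at kernel entry rather than in the Primitives constructors, which is the behavior PATCH 22 ("Don't reset barrier values in running kernel") restores.]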