Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HDP flush for gfx940 #1434

Open
Wants to merge 11 commits into the base branch `develop` from the contributor's branch.
6 changes: 2 additions & 4 deletions src/device/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@
__trace_hwreg()\
if (ncclShmem.work.header.type == ncclWorkTypeP2p) { \
struct ncclWorkElemP2p *p2pElems = ncclShmem.work.p2pElems; \
collTrace->p2p[0].connIndex = 0; \
collTrace->p2p[0].connIndex = p2pElems[0].connIndex; \
collTrace->p2pOpCount[0] = p2pElems[0].opCount; \
collTrace->p2p[0].ngroups = p2pElems[0].ngroups; \
collTrace->p2p[0].nWarps = p2pElems[0].nWarps; \
collTrace->p2p[0].warpStart = p2pElems[0].warpStart; \
collTrace->p2p[0].peer = p2pElems[0].p2pType == ncclWorkP2pTypeRecv ? (uint16_t)(p2pElems[0].peer) : -1; \
collTrace->p2p[1].connIndex = 0; \
collTrace->p2p[1].connIndex = p2pElems[1].connIndex; \
collTrace->p2pOpCount[1] = p2pElems[1].opCount; \
collTrace->p2p[1].ngroups = p2pElems[1].ngroups; \
collTrace->p2p[1].nWarps = p2pElems[1].nWarps; \
Expand All @@ -74,8 +74,6 @@
struct ncclWorkElem *elems = ncclShmem.work.elems; \
collTrace->opCount = elems[0].opCount; \
collTrace->coll.nWarps = elems[0].nWarps; \
collTrace->coll.bid = elems[0].bid; \
collTrace->coll.nChannels = elems[0].nChannels; \
collTrace->type = (launch_type) | ncclCollTraceCollElemType; \
} \
}
Expand Down
3 changes: 1 addition & 2 deletions src/device/op128.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ template<> __device__ __forceinline__ void st_global<0>(uintptr_t addr, BytePack
} \
template<> \
__device__ __forceinline__ void st_##space<bytes>(addr_cxx_ty addr, BytePack<bytes> value) { \
data_cxx_ty tmp = value.native; \
*((data_cxx_ty *)addr) = tmp; \
__builtin_nontemporal_store(value.native, (data_cxx_ty *)addr); \
}

// #if __CUDA_ARCH__ >= 700
Expand Down
6 changes: 5 additions & 1 deletion src/device/prims_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,16 @@ class Primitives<
if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) ||
((flags & (Send*RoleWaitSend)) && !noSendWait)) {
int spins = 0;
static int repeat = 100;
while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) {
__builtin_amdgcn_s_sleep(1);
connStepCache = loadStepValue(connStepPtr);
if (checkAbort(spins)) break;
//if (spins == 0) printf("r=%d b=%d t=%d SPUN OUT got=%d want=%d\n", ncclShmem.comm.rank, blockIdx.x, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice));
if (spins == 0) traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice));
if (spins == 0 && repeat > 0) {
repeat --;
traceData(__LINE__, threadIdx.x, int(connStepCache + (isSendNotRecv ? NCCL_STEPS : 0)), int(step+StepPerSlice));
}
}
__asm__ __volatile__("s_wakeup");
}
Expand Down
2 changes: 1 addition & 1 deletion src/enqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct ncclKernelMatch {
};

#ifdef ENABLE_COLLTRACE
#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll + ((p_comm)->collTraceThread ? 2 : 0))
#define ncclGetKernelIndex(p_comm) ((p_comm)->unroll + ((p_comm)->collTraceEnabled ? 2 : 0))
static ncclKernelMatch const ncclKerns[4] = {
{(void *)ncclDevKernel_Generic, true},
{(void *)ncclDevKernel_Generic_4, true},
Expand Down
1 change: 1 addition & 0 deletions src/include/comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ struct ncclComm {
union ncclCollTraceTail *collTraceTail;
pthread_t collTraceThread;
volatile bool collTraceExit;
bool collTraceEnabled;
#endif

ncclConfig_t config;
Expand Down
2 changes: 0 additions & 2 deletions src/include/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,6 @@ struct ncclCollTrace {
uint64_t data_1;
struct {
uint8_t nWarps;
uint8_t bid;
uint8_t nChannels;
} coll;
struct {
int16_t peer;
Expand Down
40 changes: 25 additions & 15 deletions src/init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ static constexpr int64_t defaultEnableMscclpp = 0;
RCCL_PARAM(MscclppEnabled, "MSCCLPP_ENABLE", defaultEnableMscclpp);

// GDRCOPY support: Off by default
NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 0);
NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 1);

// GDRCOPY support
gdr_t ncclGdrCopy = NULL;
Expand Down Expand Up @@ -218,6 +218,7 @@ void NCCL_NO_OPTIMIZE commPoison(ncclComm_t comm) {
}

RCCL_PARAM(KernelCollTraceEnable, "KERNEL_COLL_TRACE_ENABLE", 0);
RCCL_PARAM(KernelCollTraceThreadEnable, "KERNEL_COLL_TRACE_THREAD_ENABLE", 0);

#ifdef ENABLE_COLLTRACE
// Should be in sync with 'ALL_COLLS' in Generator.cmake
Expand All @@ -231,16 +232,14 @@ void *ncclCommThreadMain(void *arg) {
do {
int numActiveChans = MAXCHANNELS;
for (int channel = 0; channel < MAXCHANNELS; channel++) {
int tail = comm->collTraceTail[channel].tail%COLLTRACE_NUM_ITEMS;
int tail = comm->collTraceTail[channel].tail;
int count;
if (head[channel] <= tail)
count = tail - head[channel];
else
count = COLLTRACE_NUM_ITEMS + head[channel] - tail;
count = tail - head[channel];
if (count == 0) {
numActiveChans--;
continue;
}
count = count%COLLTRACE_NUM_ITEMS;
for (int i = 0; i < count; i++) {
volatile struct ncclCollTrace *td = comm->collTrace+COLLTRACE_NUM_ITEMS*channel+head[channel];
uint8_t type = td->type;
Expand All @@ -260,7 +259,7 @@ void *ncclCommThreadMain(void *arg) {
sprintf(line, "## [%012.6f] [%02d:%02d:%02d] %06lx", (double)(td->timeStamp)/vega_gpu_rtc_freq, comm->rank, td->bid, td->channelId, td->opCount);
offset = strlen(line);
if (type == ncclCollTraceCollElemType) {
sprintf(line+offset, " CE %s nw %d bi %d nc %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks);
sprintf(line+offset, " CE %s nw %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, comm->busId, comm->nRanks);
} else if (type == ncclCollTraceP2pElemType) {
sprintf(line+offset, " PE %s %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d", funcNames[fIdx],
td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups,
Expand All @@ -275,7 +274,7 @@ void *ncclCommThreadMain(void *arg) {
sprintf(line+offset, " CL %s", funcNames[fIdx]);
offset = strlen(line);
if ((type&0xf0) == ncclCollTraceCollElemType)
sprintf(line+offset, " nw %d bi %d nc %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, comm->busId, comm->nRanks);
sprintf(line+offset, " nw %d busId %lx nRanks %d", td->coll.nWarps, comm->busId, comm->nRanks);
else if ((type&0xf0) == ncclCollTraceP2pElemType)
sprintf(line+offset, " %d -> %d/%d/%d/%d conn/nw/ws/ng %d/%d/%d/%d -> %d busId %lx nRanks %d",
td->p2p[0].peer, td->p2p[0].connIndex, td->p2p[0].nWarps, td->p2p[0].warpStart, td->p2p[0].ngroups,
Expand All @@ -296,14 +295,16 @@ void *ncclCommThreadMain(void *arg) {
INFO(NCCL_COLL, "%s", line);
td->type = ncclCollTraceNotReady;
head[channel] ++;
head[channel] %= COLLTRACE_NUM_ITEMS;
}
}
if (comm->collTraceExit && numActiveChans == 0)
break;
usleep(1000); //sleep 1ms
} while(true);
pthread_exit(NULL);
if (comm->collTraceThread)
pthread_exit(NULL);
else
return 0;
}
#endif

Expand Down Expand Up @@ -398,7 +399,12 @@ static ncclResult_t commFree(ncclComm_t comm) {

#ifdef ENABLE_COLLTRACE
comm->collTraceExit = 1;
if (comm->collTraceThread) pthread_join(comm->collTraceThread, NULL);
if (comm->collTraceEnabled) {
if (comm->collTraceThread)
pthread_join(comm->collTraceThread, NULL);
else
ncclCommThreadMain((void *)comm);
}
NCCLCHECK(ncclCudaHostFree((void *)comm->collTrace));
NCCLCHECK(ncclCudaHostFree((void *)comm->collTraceTail));
#endif
Expand Down Expand Up @@ -579,10 +585,14 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in
NCCLCHECK(ncclCudaHostCalloc(&comm->collTraceTail, MAXCHANNELS));
NCCLCHECK(ncclCudaHostCalloc(&comm->collTrace, COLLTRACE_NUM_ITEMS*MAXCHANNELS));
comm->collTraceExit = 0;
if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable())
pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm);
else
comm->collTraceThread = 0;
comm->collTraceEnabled = false; // we can enable colltrace without starting a thread
if ((ncclDebugLevel >= NCCL_LOG_INFO) && rcclParamKernelCollTraceEnable()) {
comm->collTraceEnabled = true;
if (rcclParamKernelCollTraceThreadEnable())
pthread_create(&comm->collTraceThread, NULL, ncclCommThreadMain, (void *)comm);
else
comm->collTraceThread = 0;
}
#endif
comm->collNetSupport = 0;
memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix));
Expand Down
28 changes: 27 additions & 1 deletion src/transport/net.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,11 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph
NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, netId, 0, &req.useGdr));

// Determine whether we need to flush the GDR buffer on recv or not
if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush));
if (req.useGdr) {
NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush));
CUDACHECK(hipDeviceGetAttribute((int*)&req.curr_hdp_reg, hipDeviceAttributeHdpMemFlushCntl, myInfo->cudaDev));
recv->conn.curr_hdp_reg = req.curr_hdp_reg;
}

// We don't support PXN on receive yet
tpProxyRank = comm->topParentRanks[myInfo->rank];
Expand Down Expand Up @@ -654,6 +658,7 @@ static ncclResult_t recvProxySetup(struct ncclProxyConnection* connection, struc
resources->needFlush = req->needFlush;
resources->channelId = req->channelId;
resources->connIndex = req->connIndex;
resources->curr_hdp_reg = req->curr_hdp_reg;
ncclNetProperties_t props;
NCCLCHECK(proxyState->ncclNet->getProperties(req->netDev, &props));
/* DMA-BUF support */
Expand Down Expand Up @@ -1349,6 +1354,8 @@ static ncclResult_t sendProxyProgress(struct ncclProxyState* proxyState, struct
return ncclSuccess;
}

RCCL_PARAM(NetHdpFlush, "NET_HDP_FLUSH", 1);

static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct ncclProxyArgs* args) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt++;
Expand Down Expand Up @@ -1538,16 +1545,31 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
if (totalSize > 0 && p == NCCL_PROTO_SIMPLE && needFlush) {
// GDRCOPY support
struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources);
if (rcclParamNetHdpFlush() && resources->curr_hdp_reg) {
static bool once = true;
*resources->curr_hdp_reg = 0x1;
__sync_synchronize();
if (once) {
once = false;
INFO(NCCL_INIT, "%s: flushed HDP %p", __func__, resources->curr_hdp_reg);
}
}
if (resources->gdcFlush) {
#if defined (__x86_64__)
// Force a PCI-E read from GPU memory
static bool once = true;
asm volatile ("mov (%0), %%eax" :: "l"(resources->gdcFlush) : "%eax");
if (once) {
once = false;
INFO(NCCL_INIT, "%s: issued GDC flush", __func__);
}
#else
WARN("NET: GDR Flush only supported on x86_64");
return ncclInternalError;
#endif
} else {
int subCount = 0;
static bool once = true;
for (int i=0; i<subGroup->groupSize; i++) {
struct ncclProxySubArgs* sub = subGroup + i;
if (step < sub->nsteps) {
Expand All @@ -1564,6 +1586,10 @@ static ncclResult_t recvProxyProgress(struct ncclProxyState* proxyState, struct
}
struct recvNetResources* resources = (struct recvNetResources*) (subGroup->connection->transportResources);
NCCLCHECK(proxyState->ncclNet->iflush(resources->netRecvComm, subCount, ptrs, sizes, mhandles, subGroup->requests+(step%NCCL_STEPS)));
if (once) {
once = false;
INFO(NCCL_INIT, "%s: issued GDR flush", __func__);
}
}
}
args->idle = 0;
Expand Down