From 5984f49eedc988f8bded8f40d1fc79d6c8c566ad Mon Sep 17 00:00:00 2001 From: gilbertlee-amd <44450918+gilbertlee-amd@users.noreply.github.com> Date: Tue, 21 Jan 2025 12:38:25 -0700 Subject: [PATCH] TransferBench V1.59 (#162) Adding NIC execution capabilities, various bug fixes introduced by header-only-library refactor --------- Co-authored-by: Mustafa Abduljabbar --- CHANGELOG.md | 24 + CMakeLists.txt | 14 +- Makefile | 25 +- examples/example.cfg | 14 +- src/client/Client.cpp | 41 +- src/client/Client.hpp | 4 +- src/client/EnvVars.hpp | 81 +- src/client/Presets/AllToAll.hpp | 60 +- src/client/Presets/Sweep.hpp | 44 +- src/client/Topology.hpp | 47 +- src/header/TransferBench.hpp | 1427 +++++++++++++++++++++++++++---- 11 files changed, 1572 insertions(+), 209 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f953b87..4dc2537 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,30 @@ Documentation for TransferBench is available at [https://rocm.docs.amd.com/projects/TransferBench](https://rocm.docs.amd.com/projects/TransferBench). +## v1.59.00 +### Added +- Adding in support for NIC executor, which allows for RDMA copies on NICs that support IBVerbs + By default, NIC executor will be enabled if IBVerbs is found in the dynamic linker cache +- NIC executor can be indexed in two methods + - "I" Ix.y will use NIC x as the source and NIC y as the destination. + E.g. (G0 I0.5 G4) + - "N" Nx.y will use NIC closest to GPU x as source, and NIC closest to GPU y as destination + E.g. (G0 N0.4 N4) +- The closest NIC can be overridden by the environment variable CLOSEST_NIC, which should be a comma-separated + list of NIC indices to use for the corresponding GPU +- This feature can be explicitly disabled at compile time by specifying DISABLE_NIC_EXEC=1 + +### Modified +- Changing default data size to 256M from 64M +- Adding NUM_QUEUE_PAIRS which enables NIC traffic in A2A. 
Each GPU will talk to the next GPU via the closest NIC +- Sweep preset now saves last sweep run configuration to /tmp/lastSweep.cfg and can be changed via SWEEP_FILE + +### Fixed +- Fixed bug with reporting when using subiterations +- Fixed bug with per-Transfer data size specification +- Fixed bug when using XCC prefered table + + ## v1.58.00 ### Fixed - Fixed broken specific DMA-engine copies diff --git a/CMakeLists.txt b/CMakeLists.txt index cc63b74..110c1f8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,7 +7,7 @@ else() endif() cmake_minimum_required(VERSION 3.5) -project(TransferBench VERSION 1.58.00 LANGUAGES CXX) +project(TransferBench VERSION 1.59.00 LANGUAGES CXX) # Default GPU architectures to build #================================================================================================== @@ -56,6 +56,18 @@ set( CMAKE_CXX_FLAGS "${flags_str} ${CMAKE_CXX_FLAGS}") set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -L${ROCM_PATH}/lib") include_directories(${ROCM_PATH}/include) +find_library(IBVERBS_LIBRARY ibverbs) +if (IBVERBS_LIBRARY) + if (DEFINED ENV{DISABLE_NIC_EXEC}) + message(STATUS "Disabling NIC Executor support") + else() + message(STATUS "Found ibverbs: ${IBVERBS_LIBRARY}. Building with NIC executor support. Can set DISABLE_NIC_EXEC=1 to disable") + add_definitions(-DNIC_EXEC_ENABLED) + link_libraries(ibverbs) + endif() +else() + message(WARNING "IBVerbs library not found. Building without NIC executor support") +endif() link_libraries(numa hsa-runtime64 pthread) set (CMAKE_RUNTIME_OUTPUT_DIRECTORY .) 
add_executable(TransferBench src/client/Client.cpp) diff --git a/Makefile b/Makefile index d81f602..4f613eb 100644 --- a/Makefile +++ b/Makefile @@ -11,9 +11,9 @@ NVCC=$(CUDA_PATH)/bin/nvcc # Compile TransferBenchCuda if nvcc detected ifeq ("$(shell test -e $(NVCC) && echo found)", "found") - EXE=TransferBenchCuda + EXE=TransferBenchCuda else - EXE=TransferBench + EXE=TransferBench endif CXXFLAGS = -I$(ROCM_PATH)/include -lnuma -L$(ROCM_PATH)/lib -lhsa-runtime64 @@ -21,13 +21,30 @@ NVFLAGS = -x cu -lnuma -arch=native COMMON_FLAGS = -O3 -I./src/header -I./src/client -I./src/client/Presets LDFLAGS += -lpthread +# Compile RDMA executor if IBVerbs is found in the Dynamic Linker cache +NIC_ENABLED = 0 +ifneq ($(DISABLE_NIC_EXEC),1) + ifneq ("$(shell ldconfig -p | grep -c ibverbs)", "0") + LDFLAGS += -libverbs -DNIC_EXEC_ENABLED + NVFLAGS += -libverbs -DNIC_EXEC_ENABLED + NIC_ENABLED = 1 + endif +endif + all: $(EXE) -TransferBench: ./src/client/Client.cpp $(shell find -regex ".*\.\hpp") +TransferBench: ./src/client/Client.cpp $(shell find -regex ".*\.\hpp") NicStatus $(HIPCC) $(CXXFLAGS) $(COMMON_FLAGS) $< -o $@ $(LDFLAGS) -TransferBenchCuda: ./src/client/Client.cpp $(shell find -regex ".*\.\hpp") +TransferBenchCuda: ./src/client/Client.cpp $(shell find -regex ".*\.\hpp") NicStatus $(NVCC) $(NVFLAGS) $(COMMON_FLAGS) $< -o $@ $(LDFLAGS) clean: rm -f *.o ./TransferBench ./TransferBenchCuda + +NicStatus: + ifeq ($(NIC_ENABLED), 1) + $(info Building with NIC executor support. Can set DISABLE_NIC_EXEC=1 to disable) + else + $(info Building without NIC executor support) + endif diff --git a/examples/example.cfg b/examples/example.cfg index 5f5d7cc..7372d9c 100644 --- a/examples/example.cfg +++ b/examples/example.cfg @@ -13,6 +13,7 @@ # 1) CPU CPU thread # 2) GPU GPU threadblock/Compute Unit (CU) # 3) DMA N/A. 
(May only be used for copies (single SRC/DST) +# 4) NIC Queue Pair # Each single line in the configuration file defines a set of Transfers (a Test) to run in parallel @@ -34,9 +35,11 @@ # #SEs : Number of SubExectors to use (CPU threads/ GPU threadblocks) # srcMemL : Source memory locations (Where the data is to be read from) # Executor : Executor is specified by a character indicating type, followed by device index (0-indexed) -# - C: CPU-executed (Indexed from 0 to # NUMA nodes - 1) -# - G: GPU-executed (Indexed from 0 to # GPUs - 1) -# - D: DMA-executor (Indexed from 0 to # GPUs - 1) +# - C: CPU-executed (Indexed from 0 to # NUMA nodes - 1) +# - G: GPU-executed (Indexed from 0 to # GPUs - 1) +# - D: DMA-executor (Indexed from 0 to # GPUs - 1) +# - I#.#: NIC executor (Indexed from 0 to # NICs - 1) +# - N#.#: Nearest NIC executor (Indexed from 0 to # GPUs - 1) # dstMemL : Destination memory locations (Where the data is to be written to) # bytesL : Number of bytes to copy (0 means use command-line specified size) # Must be a multiple of 4 and may be suffixed with ('K','M', or 'G') @@ -56,7 +59,10 @@ # 1 4 (C1->G2->G0) Uses 4 CUs on GPU2 to copy from CPU1 to GPU0 # 2 4 G0->G0->G1 G1->G1->G0 Copes from GPU0 to GPU1, and GPU1 to GPU0, each with 4 SEs # -2 (G0 G0 G1 4 1M) (G1 G1 G0 2 2M) Copies 1Mb from GPU0 to GPU1 with 4 SEs, and 2Mb from GPU1 to GPU0 with 2 SEs - +# 1 2 (F0->I0.2->F1) Uses 2 QPs to transfer data from GPU0 via NIC0 to GPU1 via NIC2 +# 1 1 (F0->N0.1->F1) Uses 1 QP to transfer data from GPU0 via GPU0's closest NIC to GPU1 via GPU1's closest NIC +# -2 (G0->N0.1->G1 2 128M) (G1->N1.0->G0 1 256M) Uses Nearest NIC executor to copy 128Mb from GPU0 to GPU1 with 2 QPs, +# and 256Mb from GPU1 to GPU0 with 1 QP # Round brackets and arrows' ->' may be included for human clarity, but will be ignored and are unnecessary # Lines starting with # will be ignored. 
Lines starting with ## will be echoed to output diff --git a/src/client/Client.cpp b/src/client/Client.cpp index 2bdcbbd..57b7461 100644 --- a/src/client/Client.cpp +++ b/src/client/Client.cpp @@ -121,13 +121,23 @@ int main(int argc, char **argv) { } } + // Track which transfers have already numBytes specified + std::vector bytesSpecified(transfers.size()); + int hasUnspecified = false; + for (int i = 0; i < transfers.size(); i++) { + bytesSpecified[i] = (transfers[i].numBytes != 0); + if (transfers[i].numBytes == 0) hasUnspecified = true; + } + // Run the specified numbers of bytes otherwise generate a range of values for (size_t bytes = (1<<10); bytes <= (1<<29); bytes *= 2) { size_t deltaBytes = std::max(1UL, bytes / ev.samplingFactor); size_t currBytes = (numBytesPerTransfer == 0) ? bytes : numBytesPerTransfer; do { - for (auto& t : transfers) - t.numBytes = currBytes; + for (int i = 0; i < transfers.size(); i++) { + if (!bytesSpecified[i]) + transfers[i].numBytes = currBytes; + } if (maxVarCount == 0) { if (TransferBench::RunTransfers(cfgOptions, transfers, results)) { @@ -162,17 +172,21 @@ int main(int argc, char **argv) { PrintResults(ev, ++testNum, bestTransfers, bestResults); PrintErrors(bestResults.errResults); } - if (numBytesPerTransfer != 0) break; + if (numBytesPerTransfer != 0 || !hasUnspecified) break; currBytes += deltaBytes; } while (currBytes < bytes * 2); - if (numBytesPerTransfer != 0) break; + if (numBytesPerTransfer != 0 || !hasUnspecified) break; } } } void DisplayUsage(char const* cmdName) { - printf("TransferBench v%s.%s\n", TransferBench::VERSION, CLIENT_VERSION); + std::string nicSupport = ""; +#if NIC_EXEC_ENABLED + nicSupport = " (with NIC support)"; +#endif + printf("TransferBench v%s.%s%s\n", TransferBench::VERSION, CLIENT_VERSION, nicSupport.c_str()); printf("========================================\n"); if (numa_available() == -1) { @@ -218,7 +232,7 @@ void PrintResults(EnvVars const& ev, int const testNum, ExeType const exeType = 
exeDevice.exeType; int32_t const exeIndex = exeDevice.exeIndex; - printf(" Executor: %3s %02d %c %7.3f GB/s %c %8.3f ms %c %12lu bytes %c %-7.3f GB/s (sum)\n", + printf(" Executor: %3s %02d %c %8.3f GB/s %c %8.3f ms %c %12lu bytes %c %-7.3f GB/s (sum)\n", ExeTypeName[exeType], exeIndex, sep, exeResult.avgBandwidthGbPerSec, sep, exeResult.avgDurationMsec, sep, exeResult.numBytes, sep, exeResult.sumBandwidthGbPerSec); @@ -230,14 +244,15 @@ void PrintResults(EnvVars const& ev, int const testNum, char exeSubIndexStr[32] = ""; if (t.exeSubIndex != -1) sprintf(exeSubIndexStr, ".%d", t.exeSubIndex); - - printf(" Transfer %02d %c %7.3f GB/s %c %8.3f ms %c %12lu bytes %c %s -> %s%02d%s:%03d -> %s\n", + printf(" Transfer %02d %c %8.3f GB/s %c %8.3f ms %c %12lu bytes %c %s -> %c%03d%s:%03d -> %s\n", idx, sep, r.avgBandwidthGbPerSec, sep, r.avgDurationMsec, sep, r.numBytes, sep, - MemDevicesToStr(t.srcs).c_str(), ExeTypeName[exeType], exeIndex, - exeSubIndexStr, t.numSubExecs, MemDevicesToStr(t.dsts).c_str()); + MemDevicesToStr(t.srcs).c_str(), + TransferBench::ExeTypeStr[t.exeDevice.exeType], t.exeDevice.exeIndex, + exeSubIndexStr, t.numSubExecs, + MemDevicesToStr(t.dsts).c_str()); // Show per-iteration timing information if (ev.showIterations) { @@ -269,7 +284,7 @@ void PrintResults(EnvVars const& ev, int const testNum, for (auto& time : times) { double iterDurationMsec = time.first; double iterBandwidthGbs = (t.numBytes / 1.0E9) / iterDurationMsec * 1000.0f; - printf(" Iter %03d %c %7.3f GB/s %c %8.3f ms %c", time.second, sep, iterBandwidthGbs, sep, iterDurationMsec, sep); + printf(" Iter %03d %c %8.3f GB/s %c %8.3f ms %c", time.second, sep, iterBandwidthGbs, sep, iterDurationMsec, sep); std::set usedXccs; if (time.second - 1 < r.perIterCUs.size()) { @@ -285,11 +300,11 @@ void PrintResults(EnvVars const& ev, int const testNum, printf(" %02d", x); printf("\n"); } - printf(" StandardDev %c %7.3f GB/s %c %8.3f ms %c\n", sep, stdDevBw, sep, stdDevTime, sep); + printf(" 
StandardDev %c %8.3f GB/s %c %8.3f ms %c\n", sep, stdDevBw, sep, stdDevTime, sep); } } } - printf(" Aggregate (CPU) %c %7.3f GB/s %c %8.3f ms %c %12lu bytes %c Overhead: %.3f ms\n", + printf(" Aggregate (CPU) %c %8.3f GB/s %c %8.3f ms %c %12lu bytes %c Overhead: %.3f ms\n", sep, results.avgTotalBandwidthGbPerSec, sep, results.avgTotalDurationMsec, sep, results.totalBytesTransferred, diff --git a/src/client/Client.hpp b/src/client/Client.hpp index 81dd7c5..b69dc1d 100644 --- a/src/client/Client.hpp +++ b/src/client/Client.hpp @@ -28,9 +28,9 @@ THE SOFTWARE. #include "TransferBench.hpp" #include "EnvVars.hpp" -size_t const DEFAULT_BYTES_PER_TRANSFER = (1<<26); +size_t const DEFAULT_BYTES_PER_TRANSFER = (1<<28); -char const ExeTypeName[4][4] = {"CPU", "GPU", "DMA", "IBV"}; +char const ExeTypeName[5][4] = {"CPU", "GPU", "DMA", "NIC", "NIC"}; // Display detected hardware void DisplayTopology(bool outputToCsv); diff --git a/src/client/EnvVars.hpp b/src/client/EnvVars.hpp index 869afde..4a2813b 100644 --- a/src/client/EnvVars.hpp +++ b/src/client/EnvVars.hpp @@ -100,6 +100,14 @@ class EnvVars int outputToCsv; // Output in CSV format int samplingFactor; // Affects how many different values of N are generated (when N set to 0) + // NIC options + int ibGidIndex; // GID Index for RoCE NICs + int roceVersion; // RoCE version number + int ipAddressFamily; // IP Address Famliy + uint8_t ibPort; // NIC port number to be used + int nicRelaxedOrder; // Use relaxed ordering for RDMA + std::string closestNicStr; // Holds the user-specified list of closest NICs + // Developer features int gpuMaxHwQueues; // Tracks GPU_MAX_HW_QUEUES environment variable @@ -147,8 +155,16 @@ class EnvVars validateDirect = GetEnvVar("VALIDATE_DIRECT" , 0); validateSource = GetEnvVar("VALIDATE_SOURCE" , 0); + ibGidIndex = GetEnvVar("IB_GID_INDEX" ,-1); + ibPort = GetEnvVar("IB_PORT_NUMBER" , 1); + roceVersion = GetEnvVar("ROCE_VERSION" , 2); + ipAddressFamily = GetEnvVar("IP_ADDRESS_FAMILY" , 4); + 
nicRelaxedOrder = GetEnvVar("NIC_RELAX_ORDER" , 1); + closestNicStr = GetEnvVar("CLOSEST_NIC" , ""); + gpuMaxHwQueues = GetEnvVar("GPU_MAX_HW_QUEUES" , 4); + // Check for fill pattern char* pattern = getenv("FILL_PATTERN"); if (pattern != NULL) { @@ -279,18 +295,32 @@ class EnvVars printf(" BLOCK_SIZE - # of threads per threadblock (Must be multiple of 64)\n"); printf(" BLOCK_BYTES - Controls granularity of how work is divided across subExecutors\n"); printf(" BYTE_OFFSET - Initial byte-offset for memory allocations. Must be multiple of 4\n"); +#if NIC_EXEC_ENABLED + printf(" CLOSEST_NIC - Comma-separated list of per-GPU closest NIC (default=auto)\n"); +#endif printf(" CU_MASK - CU mask for streams. Can specify ranges e.g '5,10-12,14'\n"); printf(" FILL_PATTERN - Big-endian pattern for source data, specified in hex digits. Must be even # of digits\n"); printf(" GFX_UNROLL - Unroll factor for GFX kernel (0=auto), must be less than %d\n", TransferBench::GetIntAttribute(ATR_GFX_MAX_UNROLL)); printf(" GFX_SINGLE_TEAM - Have subexecutors work together on full array instead of working on disjoint subarrays\n"); printf(" GFX_WAVE_ORDER - Stride pattern for GFX kernel (0=UWC,1=UCW,2=WUC,3=WCU,4=CUW,5=CWU)\n"); printf(" HIDE_ENV - Hide environment variable value listing\n"); +#if NIC_EXEC_ENABLED + printf(" IB_GID_INDEX - Required for RoCE NICs (default=-1/auto)\n"); + printf(" IB_PORT_NUMBER - RDMA port count for RDMA NIC (default=1)\n"); + printf(" IP_ADDRESS_FAMILY - IP address family (4=v4, 6=v6, default=v4)\n"); +#endif printf(" MIN_VAR_SUBEXEC - Minumum # of subexecutors to use for variable subExec Transfers\n"); printf(" MAX_VAR_SUBEXEC - Maximum # of subexecutors to use for variable subExec Transfers (0 for device limits)\n"); +#if NIC_EXEC_ENABLED + printf(" NIC_RELAX_ORDER - Set to non-zero to use relaxed ordering"); +#endif printf(" NUM_ITERATIONS - # of timed iterations per test. 
If negative, run for this many seconds instead\n"); printf(" NUM_SUBITERATIONS - # of sub-iterations to run per iteration. Must be non-negative\n"); printf(" NUM_WARMUPS - # of untimed warmup iterations per test\n"); printf(" OUTPUT_TO_CSV - Outputs to CSV format if set\n"); +#if NIC_EXEC_ENABLED + printf(" ROCE_VERSION - RoCE version (default=2)\n"); +#endif printf(" SAMPLING_FACTOR - Add this many samples (when possible) between powers of 2 when auto-generating data sizes\n"); printf(" SHOW_ITERATIONS - Show per-iteration timing info\n"); printf(" USE_HIP_EVENTS - Use HIP events for GFX executor timing\n"); @@ -301,6 +331,7 @@ class EnvVars printf(" VALIDATE_SOURCE - Validate GPU src memory immediately after preparation\n"); } + void Print(std::string const& name, int32_t const value, const char* format, ...) const { printf("%-20s%s%12d%s", name.c_str(), outputToCsv ? "," : " = ", value, outputToCsv ? "," : " : "); @@ -325,9 +356,12 @@ class EnvVars void DisplayEnvVars() const { int numGpuDevices = TransferBench::GetNumExecutors(EXE_GPU_GFX); - + std::string nicSupport = ""; +#if NIC_EXEC_ENABLED + nicSupport = " (with NIC support)"; +#endif if (!outputToCsv) { - printf("TransferBench v%s.%s\n", TransferBench::VERSION, CLIENT_VERSION); + printf("TransferBench v%s.%s%s\n", TransferBench::VERSION, CLIENT_VERSION, nicSupport.c_str()); printf("===============================================================\n"); if (!hideEnv) printf("[Common] (Suppress by setting HIDE_ENV=1)\n"); } @@ -341,6 +375,10 @@ class EnvVars "Each CU gets a mulitple of %d bytes to copy", blockBytes); Print("BYTE_OFFSET", byteOffset, "Using byte offset of %d", byteOffset); +#if NIC_EXEC_ENABLED + Print("CLOSEST_NIC", (closestNicStr == "" ? "auto" : "user-input"), + "Per-GPU closest NIC is set as %s", (closestNicStr == "" ? "auto" : closestNicStr.c_str())); +#endif Print("CU_MASK", getenv("CU_MASK") ? 1 : 0, "%s", (cuMask.size() ? 
GetCuMaskDesc().c_str() : "All")); Print("FILL_PATTERN", getenv("FILL_PATTERN") ? 1 : 0, @@ -359,11 +397,24 @@ class EnvVars gfxWaveOrder == 3 ? "Wavefront,CU,Unroll" : gfxWaveOrder == 4 ? "CU,Unroll,Wavefront" : "CU,Wavefront,Unroll")); +#if NIC_EXEC_ENABLED + Print("IP_ADDRESS_FAMILY", ipAddressFamily, + "IP address family is set to IPv%d", ipAddressFamily); + + Print("IB_GID_INDEX", ibGidIndex, + "RoCE GID index is set to %s", (ibGidIndex < 0 ? "auto" : std::to_string(ibGidIndex).c_str())); + Print("IB_PORT_NUMBER", ibPort, + "IB port number is set to %d", ibPort); +#endif Print("MIN_VAR_SUBEXEC", minNumVarSubExec, "Using at least %d subexecutor(s) for variable subExec tranfers", minNumVarSubExec); Print("MAX_VAR_SUBEXEC", maxNumVarSubExec, "Using up to %s subexecutors for variable subExec transfers", maxNumVarSubExec ? std::to_string(maxNumVarSubExec).c_str() : "all available"); +#if NIC_EXEC_ENABLED + Print("NIC_RELAX_ORDER", nicRelaxedOrder, + "Using %s ordering for NIC RDMA", nicRelaxedOrder ? "relaxed" : "strict"); +#endif Print("NUM_ITERATIONS", numIterations, (numIterations == 0) ? "Running infinitely" : "Running %d %s", abs(numIterations), (numIterations > 0 ? " timed iteration(s)" : "seconds(s) per Test")); @@ -371,6 +422,10 @@ class EnvVars "Running %s subiterations", (numSubIterations == 0 ? "infinite" : std::to_string(numSubIterations)).c_str()); Print("NUM_WARMUPS", numWarmups, "Running %d warmup iteration(s) per Test", numWarmups); +#if NIC_EXEC_ENABLED + Print("ROCE_VERSION", roceVersion, + "RoCE version is set to %d", roceVersion); +#endif Print("SHOW_ITERATIONS", showIterations, "%s per-iteration timing", showIterations ? "Showing" : "Hiding"); Print("USE_HIP_EVENTS", useHipEvents, @@ -381,7 +436,6 @@ class EnvVars "Running in %s mode", useInteractive ? "interactive" : "non-interactive"); Print("USE_SINGLE_STREAM", useSingleStream, "Using single stream per GFX %s", useSingleStream ? 
"device" : "Transfer"); - if (getenv("XCC_PREF_TABLE")) { printf("%36s: Preferred XCC Table (XCC_PREF_TABLE)\n", ""); printf("%36s: ", ""); @@ -479,6 +533,27 @@ class EnvVars cfg.gfx.useSingleTeam = gfxSingleTeam; cfg.gfx.waveOrder = gfxWaveOrder; + cfg.nic.ibGidIndex = ibGidIndex; + cfg.nic.ibPort = ibPort; + cfg.nic.ipAddressFamily = ipAddressFamily; + cfg.nic.useRelaxedOrder = nicRelaxedOrder; + cfg.nic.roceVersion = roceVersion; + + std::vector closestNics; + if(closestNicStr != "") { + std::stringstream ss(closestNicStr); + std::string item; + while (std::getline(ss, item, ',')) { + try { + int nic = std::stoi(item); + closestNics.push_back(nic); + } catch (const std::invalid_argument& e) { + printf("[ERROR] Invalid NIC index (%s) by user in %s\n", item.c_str(), closestNicStr.c_str()); + exit(1); + } + } + cfg.nic.closestNics = closestNics; + } return cfg; } }; diff --git a/src/client/Presets/AllToAll.hpp b/src/client/Presets/AllToAll.hpp index c7d5d76..216bd0d 100644 --- a/src/client/Presets/AllToAll.hpp +++ b/src/client/Presets/AllToAll.hpp @@ -47,6 +47,7 @@ void AllToAllPreset(EnvVars& ev, int a2aLocal = EnvVars::GetEnvVar("A2A_LOCAL" , 0); int a2aMode = EnvVars::GetEnvVar("A2A_MODE" , 0); int numGpus = EnvVars::GetEnvVar("NUM_GPU_DEVICES", numDetectedGpus); + int numQueuePairs = EnvVars::GetEnvVar("NUM_QUEUE_PAIRS", 0); int numSubExecs = EnvVars::GetEnvVar("NUM_SUB_EXEC" , 8); int useDmaExec = EnvVars::GetEnvVar("USE_DMA_EXEC" , 0); int useFineGrain = EnvVars::GetEnvVar("USE_FINE_GRAIN" , 1); @@ -60,6 +61,7 @@ void AllToAllPreset(EnvVars& ev, ev.Print("A2A_LOCAL" , a2aLocal , "%s local transfers", a2aLocal ? 
"Include" : "Exclude"); ev.Print("A2A_MODE" , a2aMode , a2aModeStr[a2aMode]); ev.Print("NUM_GPU_DEVICES", numGpus , "Using %d GPUs", numGpus); + ev.Print("NUM_QUEUE_PAIRS", numQueuePairs, "Using %d queue pairs for NIC transfers", numQueuePairs); ev.Print("NUM_SUB_EXEC" , numSubExecs , "Using %d subexecutors/CUs per Transfer", numSubExecs); ev.Print("USE_DMA_EXEC" , useDmaExec , "Using %s executor", useDmaExec ? "DMA" : "GFX"); ev.Print("USE_FINE_GRAIN" , useFineGrain , "Using %s-grained memory", useFineGrain ? "fine" : "coarse"); @@ -114,6 +116,23 @@ void AllToAllPreset(EnvVars& ev, } } + // Create a ring using NICs + std::vector nicTransferIdx(numGpus); + if (numQueuePairs > 0) { + int numNics = TransferBench::GetNumExecutors(EXE_NIC); + for (int i = 0; i < numGpus; i++) { + TransferBench::Transfer transfer; + transfer.numBytes = numBytesPerTransfer; + transfer.srcs.push_back({memType, i}); + transfer.dsts.push_back({memType, (i+1) % numGpus}); + transfer.exeDevice = {TransferBench::EXE_NIC_NEAREST, i}; + transfer.exeSubIndex = (i+1) % numGpus; + transfer.numSubExecs = numQueuePairs; + nicTransferIdx[i] = transfers.size(); + transfers.push_back(transfer); + } + } + printf("GPU-GFX All-To-All benchmark:\n"); printf("==========================\n"); printf("- Copying %lu bytes between %s pairs of GPUs using %d CUs (%lu Transfers)\n", @@ -138,15 +157,18 @@ void AllToAllPreset(EnvVars& ev, printf("SRC\\DST "); for (int dst = 0; dst < numGpus; dst++) printf("%cGPU %02d ", separator, dst); + if (numQueuePairs > 0) + printf("%cNIC(%02d QP)", separator, numQueuePairs); printf(" %cSTotal %cActual\n", separator, separator); double totalBandwidthGpu = 0.0; - double minExecutorBandwidth = std::numeric_limits::max(); - double maxExecutorBandwidth = 0.0; - std::vector colTotalBandwidth(numGpus+1, 0.0); + double minActualBandwidth = std::numeric_limits::max(); + double maxActualBandwidth = 0.0; + std::vector colTotalBandwidth(numGpus+2, 0.0); for (int src = 0; src < numGpus; 
src++) { double rowTotalBandwidth = 0; - double executorBandwidth = 0; + int transferCount = 0; + double minBandwidth = std::numeric_limits::max(); printf("GPU %02d", src); for (int dst = 0; dst < numGpus; dst++) { if (reIndex.count(std::make_pair(src, dst))) { @@ -155,24 +177,38 @@ void AllToAllPreset(EnvVars& ev, colTotalBandwidth[dst] += r.avgBandwidthGbPerSec; rowTotalBandwidth += r.avgBandwidthGbPerSec; totalBandwidthGpu += r.avgBandwidthGbPerSec; - executorBandwidth = std::max(executorBandwidth, - results.exeResults[transfers[transferIdx].exeDevice].avgBandwidthGbPerSec); + minBandwidth = std::min(minBandwidth, r.avgBandwidthGbPerSec); + transferCount++; printf("%c%8.3f ", separator, r.avgBandwidthGbPerSec); } else { printf("%c%8s ", separator, "N/A"); } } - printf(" %c%8.3f %c%8.3f\n", separator, rowTotalBandwidth, separator, executorBandwidth); - minExecutorBandwidth = std::min(minExecutorBandwidth, executorBandwidth); - maxExecutorBandwidth = std::max(maxExecutorBandwidth, executorBandwidth); - colTotalBandwidth[numGpus] += rowTotalBandwidth; + + if (numQueuePairs > 0) { + TransferBench::TransferResult const& r = results.tfrResults[nicTransferIdx[src]]; + colTotalBandwidth[numGpus] += r.avgBandwidthGbPerSec; + rowTotalBandwidth += r.avgBandwidthGbPerSec; + totalBandwidthGpu += r.avgBandwidthGbPerSec; + minBandwidth = std::min(minBandwidth, r.avgBandwidthGbPerSec); + transferCount++; + printf("%c%8.3f ", separator, r.avgBandwidthGbPerSec); + } + double actualBandwidth = minBandwidth * transferCount; + printf(" %c%8.3f %c%8.3f\n", separator, rowTotalBandwidth, separator, actualBandwidth); + minActualBandwidth = std::min(minActualBandwidth, actualBandwidth); + maxActualBandwidth = std::max(maxActualBandwidth, actualBandwidth); + colTotalBandwidth[numGpus+1] += rowTotalBandwidth; } printf("\nRTotal"); for (int dst = 0; dst < numGpus; dst++) { printf("%c%8.3f ", separator, colTotalBandwidth[dst]); } - printf(" %c%8.3f %c%8.3f %c%8.3f\n", separator, 
colTotalBandwidth[numGpus], - separator, minExecutorBandwidth, separator, maxExecutorBandwidth); + if (numQueuePairs > 0) { + printf("%c%8.3f ", separator, colTotalBandwidth[numGpus]); + } + printf(" %c%8.3f %c%8.3f %c%8.3f\n", separator, colTotalBandwidth[numGpus+1], + separator, minActualBandwidth, separator, maxActualBandwidth); printf("\n"); printf("Average bandwidth (GPU Timed): %8.3f GB/s\n", totalBandwidthGpu / transfers.size()); diff --git a/src/client/Presets/Sweep.hpp b/src/client/Presets/Sweep.hpp index 5162496..cb6a5fe 100644 --- a/src/client/Presets/Sweep.hpp +++ b/src/client/Presets/Sweep.hpp @@ -22,19 +22,21 @@ THE SOFTWARE. void LogTransfers(FILE *fp, int const testNum, std::vector const& transfers) { - fprintf(fp, "# Test %d\n", testNum); - fprintf(fp, "%d", -1 * (int)transfers.size()); - for (auto const& transfer : transfers) - { - fprintf(fp, " (%s->%c%d->%s %d %lu)", - MemDevicesToStr(transfer.srcs).c_str(), - ExeTypeStr[transfer.exeDevice.exeType], transfer.exeDevice.exeIndex, - MemDevicesToStr(transfer.dsts).c_str(), - transfer.numSubExecs, - transfer.numBytes); + if (fp) { + fprintf(fp, "# Test %d\n", testNum); + fprintf(fp, "%d", -1 * (int)transfers.size()); + for (auto const& transfer : transfers) + { + fprintf(fp, " (%s->%c%d->%s %d %lu)", + MemDevicesToStr(transfer.srcs).c_str(), + ExeTypeStr[transfer.exeDevice.exeType], transfer.exeDevice.exeIndex, + MemDevicesToStr(transfer.dsts).c_str(), + transfer.numSubExecs, + transfer.numBytes); + } + fprintf(fp, "\n"); + fflush(fp); } - fprintf(fp, "\n"); - fflush(fp); } void SweepPreset(EnvVars& ev, @@ -54,6 +56,7 @@ void SweepPreset(EnvVars& ev, int numGpuSubExecs = EnvVars::GetEnvVar("NUM_GPU_SE" , 4); std::string sweepDst = EnvVars::GetEnvVar("SWEEP_DST" , "CG"); std::string sweepExe = EnvVars::GetEnvVar("SWEEP_EXE" , "CDG"); + std::string sweepFile = EnvVars::GetEnvVar("SWEEP_FILE" , "/tmp/lastSweep.cfg"); int sweepMax = EnvVars::GetEnvVar("SWEEP_MAX" , 24); int sweepMin = 
EnvVars::GetEnvVar("SWEEP_MIN" , 1); int sweepRandBytes = EnvVars::GetEnvVar("SWEEP_RAND_BYTES" , 0); @@ -78,6 +81,7 @@ void SweepPreset(EnvVars& ev, ev.Print("NUM_GPU_SE", numGpuSubExecs, "Using %d subExecutors/CUs per GPU executed Transfer", numGpuSubExecs); ev.Print("SWEEP_DST", sweepDst.c_str(), "Destination Memory Types to sweep"); ev.Print("SWEEP_EXE", sweepExe.c_str(), "Executor Types to sweep"); + ev.Print("SWEEP_FILE", sweepFile.c_str(),"File to store the executing sweep configuration"); ev.Print("SWEEP_MAX", sweepMax, "Max simultaneous transfers (0 = no limit)"); ev.Print("SWEEP_MIN", sweepMin, "Min simultaenous transfers"); ev.Print("SWEEP_RAND_BYTES", sweepRandBytes, "Using %s number of bytes per Transfer", (sweepRandBytes ? "random" : "constant")); @@ -283,10 +287,14 @@ void SweepPreset(EnvVars& ev, std::uniform_int_distribution distribution(sweepMin, maxParallelTransfers); // Log sweep to configuration file - FILE *fp = fopen("lastSweep.cfg", "w"); + char absPath[1024]; + auto const res = realpath(sweepFile.c_str(), absPath); + + FILE *fp = fopen(sweepFile.c_str(), "w"); if (!fp) { - printf("[ERROR] Unable to open lastSweep.cfg. Check permissions\n"); - exit(1); + printf("[WARN] Unable to open %s. Skipping output of sweep configuration file\n", res ? absPath : sweepFile.c_str()); + } else { + printf("Sweep configuration saved to: %s\n", res ? 
absPath : sweepFile.c_str()); } // Create bitmask of numPossible triplets, of which M will be chosen @@ -333,7 +341,7 @@ void SweepPreset(EnvVars& ev, // Check for test limit if (numTestsRun == sweepTestLimit) { - printf("Test limit reached\n"); + printf("Sweep Test limit reached\n"); break; } @@ -341,7 +349,7 @@ void SweepPreset(EnvVars& ev, auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; double totalCpuTime = std::chrono::duration_cast>(cpuDelta).count(); if (sweepTimeLimit && totalCpuTime > sweepTimeLimit) { - printf("Time limit exceeded\n"); + printf("Sweep Time limit exceeded\n"); break; } @@ -357,5 +365,5 @@ void SweepPreset(EnvVars& ev, bitmask[i] = (i < M) ? 1 : 0; } } - fclose(fp); + if (fp) fclose(fp); } diff --git a/src/client/Topology.hpp b/src/client/Topology.hpp index 42f4b5f..3cde3e9 100644 --- a/src/client/Topology.hpp +++ b/src/client/Topology.hpp @@ -38,21 +38,53 @@ static int RemappedCpuIndex(int origIdx) return remappingCpu[origIdx]; } +static void PrintNicToGPUTopo(bool outputToCsv) +{ +#ifdef NIC_EXEC_ENABLED + printf(" NIC | Device Name | Active | PCIe Bus ID | NUMA | Closest GPU(s)\n"); + if(!outputToCsv) + printf("-----+-------------+--------+--------------+------+---------------\n"); + + int numGpus = TransferBench::GetNumExecutors(EXE_GPU_GFX); + auto const& ibvDeviceList = GetIbvDeviceList(); + for (int i = 0; i < ibvDeviceList.size(); i++) { + + std::string closestGpusStr = ""; + for (int j = 0; j < numGpus; j++) { + if (TransferBench::GetClosestNicToGpu(j) == i) { + if (closestGpusStr != "") closestGpusStr += ","; + closestGpusStr += std::to_string(j); + } + } + + printf(" %-3d | %-11s | %-6s | %-12s | %-4d | %-20s\n", + i, ibvDeviceList[i].name.c_str(), + ibvDeviceList[i].hasActivePort ? 
"Yes" : "No", + ibvDeviceList[i].busId.c_str(), + ibvDeviceList[i].numaNode, + closestGpusStr.c_str()); + } + printf("\n"); +#endif +} + void DisplayTopology(bool outputToCsv) { int numCpus = TransferBench::GetNumExecutors(EXE_CPU); int numGpus = TransferBench::GetNumExecutors(EXE_GPU_GFX); - + int numNics = TransferBench::GetNumExecutors(EXE_NIC); char sep = (outputToCsv ? ',' : '|'); if (outputToCsv) { printf("NumCpus,%d\n", numCpus); printf("NumGpus,%d\n", numGpus); + printf("NumNics,%d\n", numNics); } else { printf("\nDetected Topology:\n"); printf("==================\n"); printf(" %d configured CPU NUMA node(s) [%d total]\n", numCpus, numa_max_node() + 1); printf(" %d GPU device(s)\n", numGpus); + printf(" %d Supported NIC device(s)\n", numNics); } // Print out detected CPU topology @@ -91,8 +123,10 @@ void DisplayTopology(bool outputToCsv) } printf("\n"); - // Print out detected GPU topology + // Print out detected NIC topology + PrintNicToGPUTopo(outputToCsv); + // Print out detected GPU topology #if defined(__NVCC__) for (int i = 0; i < numGpus; i++) { hipDeviceProp_t prop; @@ -118,12 +152,12 @@ void DisplayTopology(bool outputToCsv) printf(" %c", sep); for (int j = 0; j < numGpus; j++) printf(" GPU %02d %c", j, sep); - printf(" PCIe Bus ID %c #CUs %c NUMA %c #DMA %c #XCC\n", sep, sep, sep, sep); + printf(" PCIe Bus ID %c #CUs %c NUMA %c #DMA %c #XCC %c NIC\n", sep, sep, sep, sep, sep); if (!outputToCsv) { for (int j = 0; j <= numGpus; j++) printf("--------+"); - printf("--------------+------+------+------+------\n"); + printf("--------------+------+------+------+------+------\n"); } // Loop over each GPU device @@ -149,12 +183,13 @@ void DisplayTopology(bool outputToCsv) char pciBusId[20]; HIP_CALL(hipDeviceGetPCIBusId(pciBusId, 20, i)); - printf(" %11s %c %4d %c %4d %c %4d %c %4d\n", + printf(" %-11s %c %-4d %c %-4d %c %-4d %c %-4d %c %-4d\n", pciBusId, sep, TransferBench::GetNumSubExecutors({EXE_GPU_GFX, i}), sep, 
TransferBench::GetClosestCpuNumaToGpu(i), sep, TransferBench::GetNumExecutorSubIndices({EXE_GPU_DMA, i}), sep, - TransferBench::GetNumExecutorSubIndices({EXE_GPU_GFX, i})); + TransferBench::GetNumExecutorSubIndices({EXE_GPU_GFX, i}), sep, + TransferBench::GetClosestNicToGpu(i)); } #endif } diff --git a/src/header/TransferBench.hpp b/src/header/TransferBench.hpp index 54ed692..19a1534 100644 --- a/src/header/TransferBench.hpp +++ b/src/header/TransferBench.hpp @@ -34,6 +34,19 @@ THE SOFTWARE. #include #include +#ifdef NIC_EXEC_ENABLED +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + #if defined(__NVCC__) #include #else @@ -51,7 +64,7 @@ namespace TransferBench using std::set; using std::vector; - constexpr char VERSION[] = "1.58"; + constexpr char VERSION[] = "1.59"; /** * Enumeration of supported Executor types @@ -64,11 +77,13 @@ namespace TransferBench EXE_CPU = 0, ///< CPU executor (subExecutor = CPU thread) EXE_GPU_GFX = 1, ///< GPU kernel-based executor (subExecutor = threadblock/CU) EXE_GPU_DMA = 2, ///< GPU SDMA executor (subExecutor = not supported) - EXE_IBV = 3, ///< IBVerbs executor (subExecutor = queue pair) + EXE_NIC = 3, ///< NIC RDMA executor (subExecutor = queue pair) + EXE_NIC_NEAREST = 4 ///< NIC RDMA nearest executor (subExecutor = queue pair) }; - char const ExeTypeStr[5] = "CGDI"; + char const ExeTypeStr[6] = "CGDIN"; inline bool IsCpuExeType(ExeType e){ return e == EXE_CPU; } inline bool IsGpuExeType(ExeType e){ return e == EXE_GPU_GFX || e == EXE_GPU_DMA; } + inline bool IsNicExeType(ExeType e){ return e == EXE_NIC || e == EXE_NIC_NEAREST; } /** * A ExeDevice defines a specific Executor @@ -120,11 +135,10 @@ namespace TransferBench */ struct Transfer { - size_t numBytes = (1<<26); ///< Number of bytes to Transfer + size_t numBytes = 0; ///< Number of bytes to Transfer vector srcs = {}; ///< List of source memory devices vector dsts = {}; ///< List of destination memory devices 
ExeDevice exeDevice = {}; ///< Executor to use - int32_t exeDstIndex = -1; ///< Destination executor index (for RDMA executor only) int32_t exeSubIndex = -1; ///< Executor subindex int numSubExecs = 0; ///< Number of subExecutors to use for this Transfer }; @@ -154,15 +168,6 @@ namespace TransferBench int validateSource = 0; ///< Validate src GPU memory immediately after preparation }; - /** - * DMA Executor options - */ - struct DmaOptions - { - int useHipEvents = 1; ///< Use HIP events for timing DMA Executor - int useHsaCopy = 0; ///< Use HSA copy instead of HIP copy to perform DMA - }; - /** * GFX Executor options */ @@ -178,6 +183,33 @@ namespace TransferBench int waveOrder = 0; ///< GFX-kernel wavefront ordering }; + /** + * DMA Executor options + */ + struct DmaOptions + { + int useHipEvents = 1; ///< Use HIP events for timing DMA Executor + int useHsaCopy = 0; ///< Use HSA copy instead of HIP copy to perform DMA + }; + + /** + * NIC Executor options + */ + struct NicOptions + { + vector closestNics = {}; ///< Overrides the auto-detected closest NIC per GPU + int ibGidIndex = -1; ///< GID Index for RoCE NICs (-1 is auto) + uint8_t ibPort = 1; ///< NIC port number to be used + int ipAddressFamily = 4; ///< 4=IPv4, 6=IPv6 (used for auto GID detection) + int maxRecvWorkReq = 16; ///< Maximum number of recv work requests per queue pair + int maxSendWorkReq = 16; ///< Maximum number of send work requests per queue pair + int queueSize = 100; ///< Completion queue size + int roceVersion = 2; ///< RoCE version (used for auto GID detection) + int useRelaxedOrder = 1; ///< Use relaxed ordering + int useNuma = 0; ///< Switch to closest numa thread for execution + }; + + /** * Configuration options for performing Transfers */ @@ -188,6 +220,7 @@ namespace TransferBench GfxOptions gfx; ///< GFX executor options DmaOptions dma; ///< DMA executor options + NicOptions nic; ///< NIC executor options }; /** @@ -243,6 +276,9 @@ namespace TransferBench // Only filled in if 
recordPerIteration = 1 vector perIterMsec; ///< Duration for each individual iteration vector>> perIterCUs; ///< GFX-Executor only. XCC:CU used per iteration + + ExeDevice exeDevice; ///< Tracks which executor performed this Transfer (e.g. for EXE_NIC_NEAREST) + ExeDevice exeDstDevice; ///< Tracks actual destination executor (only valid for EXE_NIC/EXE_NIC_NEAREST) }; /** @@ -344,6 +380,23 @@ namespace TransferBench */ int GetClosestCpuNumaToGpu(int gpuIndex); + /** + * Returns the index of the NUMA node closest to the given NIC + * + * @param[in] nicIndex Index of the NIC to query + * @returns NUMA node index closest to the NIC nicIndex, or -1 if unable to detect + */ + int GetClosestCpuNumaToNic(int nicIndex); + + /** + * Returns the index of the NIC closest to the given GPU + * + * @param[in] gpuIndex Index of the GPU to query + * @note This function is applicable when the IBV/RDMA executor is available + * @returns IB Verbs capable NIC index closest to GPU gpuIndex, or -1 if unable to detect + */ + int GetClosestNicToGpu(int gpuIndex); + /** * Helper function to parse a line containing Transfers into a vector of Transfers * @@ -353,7 +406,6 @@ namespace TransferBench */ ErrResult ParseTransfers(std::string str, std::vector& transfers); - }; //========================================================================================== // End of TransferBench API @@ -435,7 +487,7 @@ namespace TransferBench #endif // Macro for collecting XCC GFX kernel is running on -#if defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__) +#if defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__) || defined(__gfx950__) #define GetXccId(val) asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_XCC_ID)" : "=s" (val)); #else #define GetXccId(val) val = 0 @@ -459,6 +511,45 @@ namespace TransferBench return false; \ } while (0) +// Helper macros for calling RDMA functions and reporting errors +#ifdef VERBS_DEBUG +#define IBV_CALL(__func__, ...) 
\ + do { \ + int error = __func__(__VA_ARGS__); \ + if (error != 0) { \ + return {ERR_FATAL, "Encountered IbVerbs error (%d) at line (%d) " \ + "and function (%s)", (error), __LINE__, #__func__}; \ + } \ + } while (0) + +#define IBV_PTR_CALL(__ptr__, __func__, ...) \ + do { \ + __ptr__ = __func__(__VA_ARGS__); \ + if (__ptr__ == nullptr) { \ + return {ERR_FATAL, "Encountered IbVerbs nullptr error at line (%d) " \ + "and function (%s)", __LINE__, #__func__}; \ + } \ + } while (0) +#else +#define IBV_CALL(__func__, ...) \ + do { \ + int error = __func__(__VA_ARGS__); \ + if (error != 0) { \ + return {ERR_FATAL, "Encountered IbVerbs error (%d) in func (%s) " \ + , error, #__func__}; \ + } \ + } while (0) + +#define IBV_PTR_CALL(__ptr__, __func__, ...) \ + do { \ + __ptr__ = __func__(__VA_ARGS__); \ + if (__ptr__ == nullptr) { \ + return {ERR_FATAL, "Encountered IbVerbs nullptr error in func (%s) " \ + , #__func__}; \ + } \ + } while (0) +#endif + namespace TransferBench { @@ -469,13 +560,14 @@ namespace { // Constants //======================================================================================== - int constexpr MAX_BLOCKSIZE = 512; // Max threadblock size - int constexpr MAX_WAVEGROUPS = MAX_BLOCKSIZE / 64; // Max wavegroups/warps - int constexpr MAX_UNROLL = 8; // Max unroll factor - int constexpr MAX_SRCS = 8; // Max number of srcs per Transfer - int constexpr MAX_DSTS = 8; // Max number of dsts per Transfer - int constexpr MEMSET_CHAR = 75; // Value to memset (char) - float constexpr MEMSET_VAL = 13323083.0f; // Value to memset (double) + + int constexpr MAX_BLOCKSIZE = 512; // Max threadblock size + int constexpr MAX_WAVEGROUPS = MAX_BLOCKSIZE / 64; // Max wavegroups/warps + int constexpr MAX_UNROLL = 8; // Max unroll factor + int constexpr MAX_SRCS = 8; // Max srcs per Transfer + int constexpr MAX_DSTS = 8; // Max dsts per Transfer + int constexpr MEMSET_CHAR = 75; // Value to memset (char) + float constexpr MEMSET_VAL = 13323083.0f; // Value to 
memset (double) // Parsing-related functions //======================================================================================== @@ -594,7 +686,7 @@ namespace { if (mistakeCount > 0) { return {ERR_FATAL, "%lu out of %lu pages for memory allocation were not on NUMA node %d." - " This could be due to hardware memory issues", + " This could be due to hardware memory issues, or the use of numa-rebalancing daemons such as numad", mistakeCount, numPages, targetId}; } return ERR_NONE; @@ -744,6 +836,7 @@ namespace { break; case EXE_GPU_GFX: case EXE_GPU_DMA: if (exeIndex < 0 || exeIndex >= numGpus) + return {ERR_FATAL, "GPU index must be between 0 and %d inclusively", numGpus - 1}; agent = gpuAgents[exeIndex]; break; @@ -769,6 +862,29 @@ namespace { // Setup validation-related functions //======================================================================================== + static ErrResult GetActualExecutor(ConfigOptions const& cfg, + ExeDevice const& origExeDevice, + ExeDevice& actualExeDevice) + { + // By default, nothing needs to change + actualExeDevice = origExeDevice; + + // When using NIC_NEAREST, remap to the closest NIC to the GPU + if (origExeDevice.exeType == EXE_NIC_NEAREST) { + actualExeDevice.exeType = EXE_NIC; + + if (cfg.nic.closestNics.size() > 0) { + if (origExeDevice.exeIndex < 0 || origExeDevice.exeIndex >= cfg.nic.closestNics.size()) + return {ERR_FATAL, "NIC index is out of range (%d)", origExeDevice.exeIndex}; + + actualExeDevice.exeIndex = cfg.nic.closestNics[origExeDevice.exeIndex]; + } else { + actualExeDevice.exeIndex = GetClosestNicToGpu(origExeDevice.exeIndex); + } + } + return ERR_NONE; + } + // Validate that MemDevice exists static ErrResult CheckMemDevice(MemDevice const& memDevice) { @@ -848,6 +964,20 @@ namespace { } } + // Check NIC options +#ifdef NIC_EXEC_ENABLED + int numNics = GetNumExecutors(EXE_NIC); + for (auto const& nic : cfg.nic.closestNics) + if (nic < 0 || nic >= numNics) + errors.push_back({ERR_FATAL, "NIC index 
(%d) in user-specified closest NIC list must be between 0 and %d", + nic, numNics - 1}); + + size_t closetNicsSize = cfg.nic.closestNics.size(); + if (closetNicsSize > 0 && closetNicsSize < numGpus) + errors.push_back({ERR_FATAL, "User-specified closest NIC list must match GPU count of %d", + numGpus}); +#endif + // NVIDIA specific #if defined(__NVCC__) if (cfg.data.validateDirect) @@ -881,6 +1011,7 @@ namespace { { int numCpus = GetNumExecutors(EXE_CPU); int numGpus = GetNumExecutors(EXE_GPU_GFX); + int numNics = GetNumExecutors(EXE_NIC); std::set executors; std::map transferCount; @@ -1025,8 +1156,33 @@ namespace { } } break; - case EXE_IBV: - errors.push_back({ERR_FATAL, "Transfer %d: IBV executor currently not supported", i}); + case EXE_NIC: +#ifdef NIC_EXEC_ENABLED + { + int srcIndex = t.exeDevice.exeIndex; + int dstIndex = t.exeSubIndex; + if (srcIndex < 0 || srcIndex >= numNics) + errors.push_back({ERR_FATAL, "Transfer %d: src NIC executor indexes an out-of-range NIC (%d)", i, srcIndex}); + if (dstIndex < 0 || dstIndex >= numNics) + errors.push_back({ERR_FATAL, "Transfer %d: dst NIC executor indexes an out-of-range NIC (%d)", i, dstIndex}); + } +#else + errors.push_back({ERR_FATAL, "Transfer %d: NIC executor is requested but is not available", i}); +#endif + break; + case EXE_NIC_NEAREST: +#ifdef NIC_EXEC_ENABLED + { + ExeDevice srcExeDevice; + ErrResult errSrc = GetActualExecutor(cfg, t.exeDevice, srcExeDevice); + if (errSrc.errType != ERR_NONE) errors.push_back(errSrc); + ExeDevice dstExeDevice; + ErrResult errDst = GetActualExecutor(cfg, {t.exeDevice.exeType, t.exeSubIndex}, dstExeDevice); + if (errDst.errType != ERR_NONE) errors.push_back(errDst); + } +#else + errors.push_back({ERR_FATAL, "Transfer %d: NIC executor is requested but is not available", i}); +#endif break; } @@ -1106,7 +1262,6 @@ namespace { } } - // Check for fatal errors for (auto const& err : errors) if (err.errType == ERR_FATAL) return true; @@ -1147,6 +1302,7 @@ namespace { vector 
dstMem; ///< Destination memory vector subExecParamCpu; ///< Defines subarrays for each subexecutor vector subExecIdx; ///< Indices into subExecParamGpu + int numaNode; ///< NUMA node to use for this Transfer // For GFX executor SubExecParam* subExecParamGpuPtr; @@ -1159,6 +1315,29 @@ namespace { hsa_amd_sdma_engine_id_t sdmaEngineId; ///< DMA engine ID #endif +// For IBV executor +#ifdef NIC_EXEC_ENABLED + int srcNicIndex; ///< SRC NIC index + int dstNicIndex; ///< DST NIC index + ibv_context* srcContext; ///< Device context for SRC NIC + ibv_context* dstContext; ///< Device context for DST NIC + ibv_pd* srcProtect; ///< Protection domain for SRC NIC + ibv_pd* dstProtect; ///< Protection domain for DST NIC + ibv_cq* srcCompQueue; ///< Completion queue for SRC NIC + ibv_cq* dstCompQueue; ///< Completion queue for DST NIC + ibv_port_attr srcPortAttr; ///< Port attributes for SRC NIC + ibv_port_attr dstPortAttr; ///< Port attributes for DST NIC + ibv_gid srcGid; ///< GID handle for SRC NIC + ibv_gid dstGid; ///< GID handle for DST NIC + vector srcQueuePairs; ///< Queue pairs for SRC NIC + vector dstQueuePairs; ///< Queue pairs for DST NIC + ibv_mr* srcMemRegion; ///< Memory region for SRC + ibv_mr* dstMemRegion; ///< Memory region for DST + uint8_t qpCount; ///< Number of QPs to be used for transferring data + vector sgePerQueuePair; ///< Scatter-gather elements per queue pair + vector sendWorkRequests; ///< Send work requests per queue pair +#endif + // Counters double totalDurationMsec; ///< Total duration for all iterations for this Transfer vector perIterMsec; ///< Duration for each individual iteration @@ -1173,7 +1352,6 @@ namespace { int totalSubExecs; ///< Total number of subExecutors to use bool useSubIndices; ///< Use subexecutor indicies int numSubIndices; ///< Number of subindices this ExeDevice has - int wallClockRate; ///< (GFX-only) Device wall clock rate vector subExecParamCpu; ///< Subexecutor parameters for this executor vector resources; ///< 
Per-Transfer resources @@ -1182,8 +1360,750 @@ namespace { vector streams; ///< HIP streams to launch on vector startEvents; ///< HIP start timing event vector stopEvents; ///< HIP stop timing event + int wallClockRate; ///< (GFX-only) Device wall clock rate }; + // Structure to track PCIe topology + struct PCIeNode + { + std::string address; ///< PCIe address for this PCIe node + std::string description; ///< Description for this PCIe node + std::set children; ///< Children PCIe nodes + + // Default constructor + PCIeNode() : address(""), description("") {} + + // Constructor + PCIeNode(std::string const& addr) : address(addr) {} + + // Constructor + PCIeNode(std::string const& addr, std::string const& desc) + :address(addr), description(desc) {} + + // Comparison operator for std::set + bool operator<(PCIeNode const& other) const { + return address < other.address; + } + }; + +#ifdef NIC_EXEC_ENABLED + // Structure to track information about IBV devices + struct IbvDevice + { + ibv_device* devicePtr; + std::string name; + std::string busId; + bool hasActivePort; + int numaNode; + }; +#endif + +#ifdef NIC_EXEC_ENABLED +// Function to collect information about IBV devices +//======================================================================================== + static vector& GetIbvDeviceList() + { + static bool isInitialized = false; + static vector ibvDeviceList = {}; + + // Build list on first use + if (!isInitialized) { + + // Query the number of IBV devices + int numIbvDevices = 0; + ibv_device** deviceList = ibv_get_device_list(&numIbvDevices); + + if (deviceList && numIbvDevices > 0) { + // Loop over each device to collect information + for (int i = 0; i < numIbvDevices; i++) { + IbvDevice ibvDevice; + ibvDevice.devicePtr = deviceList[i]; + ibvDevice.name = deviceList[i]->name; + ibvDevice.hasActivePort = false; + { + struct ibv_context *context = ibv_open_device(ibvDevice.devicePtr); + if (context) { + struct ibv_device_attr deviceAttr; + if 
(!ibv_query_device(context, &deviceAttr)) { + for (int port = 1; port <= deviceAttr.phys_port_cnt; ++port) { + struct ibv_port_attr portAttr; + if (ibv_query_port(context, port, &portAttr)) continue; + if (portAttr.state == IBV_PORT_ACTIVE) + ibvDevice.hasActivePort = true; + break; + } + } + ibv_close_device(context); + } + } + ibvDevice.busId = ""; + { + std::string device_path(ibvDevice.devicePtr->dev_path); + if (std::filesystem::exists(device_path)) { + std::string pciPath = std::filesystem::canonical(device_path + "/device").string(); + std::size_t pos = pciPath.find_last_of('/'); + if (pos != std::string::npos) { + ibvDevice.busId = pciPath.substr(pos + 1); + } + } + } + + // Get nearest numa node for this device + ibvDevice.numaNode = -1; + std::filesystem::path devicePath = "/sys/bus/pci/devices/" + ibvDevice.busId + "/numa_node"; + std::string canonicalPath = std::filesystem::canonical(devicePath).string(); + + if (std::filesystem::exists(canonicalPath)) { + std::ifstream file(canonicalPath); + if (file.is_open()) { + std::string numaNodeStr; + std::getline(file, numaNodeStr); + int numaNodeVal; + if (sscanf(numaNodeStr.c_str(), "%d", &numaNodeVal) == 1) + ibvDevice.numaNode = numaNodeVal; + file.close(); + } + } + ibvDeviceList.push_back(ibvDevice); + } + } + ibv_free_device_list(deviceList); + isInitialized = true; + } + return ibvDeviceList; + } +#endif // NIC_EXEC_ENABLED + +#ifdef NIC_EXEC_ENABLED +// PCIe-related functions +//======================================================================================== + + // Prints off PCIe tree + static void PrintPCIeTree(PCIeNode const& node, + std::string const& prefix = "", + bool isLast = true) + { + if (!node.address.empty()) { + printf("%s%s%s", prefix.c_str(), (isLast ? 
"└── " : "├── "), node.address.c_str()); + if (!node.description.empty()) { + printf("(%s)", node.description.c_str()); + } + printf("\n"); + } + auto const& children = node.children; + for (auto it = children.begin(); it != children.end(); ++it) { + PrintPCIeTree(*it, prefix + (isLast ? " " : "│ "), std::next(it) == children.end()); + } + } + + // Inserts nodes along pcieAddress down a tree starting from root + static ErrResult InsertPCIePathToTree(std::string const& pcieAddress, + std::string const& description, + PCIeNode& root) + { + std::filesystem::path devicePath = "/sys/bus/pci/devices/" + pcieAddress; + std::string canonicalPath = std::filesystem::canonical(devicePath).string(); + + if (!std::filesystem::exists(devicePath)) { + return {ERR_FATAL, "Device path %s does not exist", devicePath.c_str()}; + } + + std::istringstream iss(canonicalPath); + std::string token; + + PCIeNode* currNode = &root; + while (std::getline(iss, token, '/')) { + auto it = (currNode->children.insert(PCIeNode(token))).first; + currNode = const_cast(&(*it)); + } + currNode->description = description; + + return ERR_NONE; + } + + // Returns root node for PCIe tree. 
Constructed on first use + static PCIeNode* GetPCIeTreeRoot() + { + static bool isInitialized = false; + static PCIeNode pcieRoot; + + // Build PCIe tree on first use + if (!isInitialized) { + // Add NICs to the tree + int numNics = GetNumExecutors(EXE_NIC); + auto const& ibvDeviceList = GetIbvDeviceList(); + for (IbvDevice const& ibvDevice : ibvDeviceList) { + if (!ibvDevice.hasActivePort || ibvDevice.busId == "") continue; + InsertPCIePathToTree(ibvDevice.busId, ibvDevice.name, pcieRoot); + } + + // Add GPUs to the tree + int numGpus = GetNumExecutors(EXE_GPU_GFX); + for (int i = 0; i < numGpus; ++i) { + char hipPciBusId[64]; + if (hipDeviceGetPCIBusId(hipPciBusId, sizeof(hipPciBusId), i) == hipSuccess) { + InsertPCIePathToTree(hipPciBusId, "GPU " + std::to_string(i), pcieRoot); + } + } +#ifdef VERBS_DEBUG + PrintPCIeTree(pcieRoot); +#endif + isInitialized = true; + } + return &pcieRoot; + } + + // Finds the lowest common ancestor in PCIe tree between two nodes + static PCIeNode const* GetLcaBetweenNodes(PCIeNode const* root, + std::string const& node1Address, + std::string const& node2Address) + { + if (!root || root->address == node1Address || root->address == node2Address) + return root; + + PCIeNode const* lcaFound1 = nullptr; + PCIeNode const* lcaFound2 = nullptr; + + // Recursively iterate over children + for (auto const& child : root->children) { + PCIeNode const* lca = GetLcaBetweenNodes(&child, node1Address, node2Address); + if (!lca) continue; + if (!lcaFound1) { + // First time found + lcaFound1 = lca; + } else { + // Second time found + lcaFound2 = lca; + break; + } + } + + // If two children were found, then current node is the lowest common ancestor + return (lcaFound1 && lcaFound2) ? 
root : lcaFound1; + } + + // Gets the depth of a node in the PCIe tree + static int GetLcaDepth(std::string const& targetBusID, + PCIeNode const* const& node, + int depth = 0) + { + if (!node) return -1; + if (targetBusID == node->address) return depth; + + for (auto const& child : node->children) { + int distance = GetLcaDepth(targetBusID, &child, depth + 1); + if (distance != -1) + return distance; + } + return -1; + } + + // Function to extract the bus number from a PCIe address (domain:bus:device.function) + static int ExtractBusNumber(std::string const& pcieAddress) + { + int domain, bus, device, function; + char delimiter; + + std::istringstream iss(pcieAddress); + iss >> std::hex >> domain >> delimiter >> bus >> delimiter >> device >> delimiter >> function; + if (iss.fail()) { +#ifdef VERBS_DEBUG + printf("Invalid PCIe address format: %s\n", pcieAddress.c_str()); +#endif + return -1; + } + return bus; + } + + // Function to compute the distance between two bus IDs + static int GetBusIdDistance(std::string const& pcieAddress1, + std::string const& pcieAddress2) + { + int bus1 = ExtractBusNumber(pcieAddress1); + int bus2 = ExtractBusNumber(pcieAddress2); + return (bus1 < 0 || bus2 < 0) ? 
-1 : std::abs(bus1 - bus2); + } + + // Given a target busID and a set of candidate devices, returns a set of indices + // that is "closest" to the target + static std::set GetNearestDevicesInTree(std::string const& targetBusId, + std::vector const& candidateBusIdList) + { + int maxDepth = -1; + int minDistance = std::numeric_limits::max(); + std::set matches = {}; + + // Loop over the candidates to find the ones with the lowest common ancestor (LCA) + for (int i = 0; i < candidateBusIdList.size(); i++) { + std::string const& candidateBusId = candidateBusIdList[i]; + if (candidateBusId == "") continue; + PCIeNode const* lca = GetLcaBetweenNodes(GetPCIeTreeRoot(), targetBusId, candidateBusId); + if (!lca) continue; + + int depth = GetLcaDepth(lca->address, GetPCIeTreeRoot()); + int currDistance = GetBusIdDistance(targetBusId, candidateBusId); + + // When more than one LCA match is found, choose the one with smallest busId difference + // NOTE: currDistance could be -1, which signals problem with parsing, however still + // remains a valid "closest" candidate, so is included + if (depth > maxDepth || (depth == maxDepth && depth >= 0 && currDistance < minDistance)) { + maxDepth = depth; + matches.clear(); + matches.insert(i); + minDistance = currDistance; + } else if (depth == maxDepth && depth >= 0 && currDistance == minDistance) { + matches.insert(i); + } + } + return matches; + } +#endif // NIC_EXEC_ENABLED + +#ifdef NIC_EXEC_ENABLED +// IB Verbs-related functions +//======================================================================================== + + // Create a queue pair + static ErrResult CreateQueuePair(ConfigOptions const& cfg, + struct ibv_pd* pd, + struct ibv_cq* cq, + struct ibv_qp*& qp) + { + // Set queue pair attributes + struct ibv_qp_init_attr attr = {}; + attr.qp_type = IBV_QPT_RC; // Set type to reliable connection + attr.send_cq = cq; // Send completion queue + attr.recv_cq = cq; // Recv completion queue + attr.cap.max_send_wr = 
cfg.nic.maxSendWorkReq; // Max send work requests + attr.cap.max_recv_wr = cfg.nic.maxRecvWorkReq; // Max recv work requests + attr.cap.max_send_sge = 1; // Max send scatter-gather entries + attr.cap.max_recv_sge = 1; // Max recv scatter-gather entries + + qp = ibv_create_qp(pd, &attr); + if (qp == NULL) + return {ERR_FATAL, "Error while creating QP"}; + + return ERR_NONE; + } + + // Initialize a queue pair + static ErrResult InitQueuePair(struct ibv_qp* qp, + uint8_t port, + unsigned flags) + { + struct ibv_qp_attr attr = {}; // Clear all attributes + attr.qp_state = IBV_QPS_INIT; // Set the QP state to INIT + attr.pkey_index = 0; // Set the partition key index to 0 + attr.port_num = port; // Set the port number to the defined IB_PORT + attr.qp_access_flags = flags; // Set the QP access flags to the provided flags + + int ret = ibv_modify_qp(qp, &attr, + IBV_QP_STATE | // Modify the QP state + IBV_QP_PKEY_INDEX | // Modify the partition key index + IBV_QP_PORT | // Modify the port number + IBV_QP_ACCESS_FLAGS); // Modify the access flags + + if (ret != 0) + return {ERR_FATAL, "Error during QP Init. 
IB Verbs Error code: %d", ret}; + + return ERR_NONE; + } + + // Transition QueuePair to Ready to Receive State + static ErrResult TransitionQpToRtr(ibv_qp* qp, + uint16_t const& dlid, + uint32_t const& dqpn, + ibv_gid const& gid, + uint8_t const& gidIndex, + uint8_t const& port, + bool const& isRoCE, + ibv_mtu const& mtu) + { + // Prepare QP attributes + struct ibv_qp_attr attr = {}; + attr.qp_state = IBV_QPS_RTR; + attr.path_mtu = mtu; + attr.rq_psn = 0; + attr.max_dest_rd_atomic = 1; + attr.min_rnr_timer = 12; + if (isRoCE) { + attr.ah_attr.is_global = 1; + attr.ah_attr.grh.dgid.global.subnet_prefix = gid.global.subnet_prefix; + attr.ah_attr.grh.dgid.global.interface_id = gid.global.interface_id; + attr.ah_attr.grh.flow_label = 0; + attr.ah_attr.grh.sgid_index = gidIndex; + attr.ah_attr.grh.hop_limit = 255; + } else { + attr.ah_attr.is_global = 0; + attr.ah_attr.dlid = dlid; + } + attr.ah_attr.sl = 0; + attr.ah_attr.src_path_bits = 0; + attr.ah_attr.port_num = port; + attr.dest_qp_num = dqpn; + + // Modify the QP + int ret = ibv_modify_qp(qp, &attr, + IBV_QP_STATE | + IBV_QP_AV | + IBV_QP_PATH_MTU | + IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | + IBV_QP_MAX_DEST_RD_ATOMIC | + IBV_QP_MIN_RNR_TIMER); + if (ret != 0) + return {ERR_FATAL, "Error during QP RTR. IB Verbs Error code: %d", ret}; + + return ERR_NONE; + } + + // Transition QueuePair to Ready to Send state + static ErrResult TransitionQpToRts(struct ibv_qp *qp) + { + struct ibv_qp_attr attr = {}; + attr.qp_state = IBV_QPS_RTS; + attr.sq_psn = 0; + attr.timeout = 14; + attr.retry_cnt = 7; + attr.rnr_retry = 7; + attr.max_rd_atomic = 1; + + int ret = ibv_modify_qp(qp, &attr, + IBV_QP_STATE | + IBV_QP_TIMEOUT | + IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | + IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC); + if (ret != 0) + return {ERR_FATAL, "Error during QP RTS. 
IB Verbs Error code: %d", ret}; + + return ERR_NONE; + } + + static bool IsConfiguredGid(union ibv_gid* gid) + { + const struct in6_addr *a = (struct in6_addr *)gid->raw; + int trailer = (a->s6_addr32[1] | a->s6_addr32[2] | a->s6_addr32[3]); + if (((a->s6_addr32[0] | trailer) == 0UL) || + ((a->s6_addr32[0] == htonl(0xfe800000)) && (trailer == 0UL))) { + return false; + } + return true; + } + + static bool LinkLocalGid(union ibv_gid* gid) + { + const struct in6_addr *a = (struct in6_addr *)gid->raw; + if (a->s6_addr32[0] == htonl(0xfe800000) && a->s6_addr32[1] == 0UL) { + return true; + } + return false; + } + + static bool IsValidGid(union ibv_gid* gid) + { + return (IsConfiguredGid(gid) && !LinkLocalGid(gid)); + } + + static sa_family_t GetGidAddressFamily(union ibv_gid* gid) + { + const struct in6_addr *a = (struct in6_addr *)gid->raw; + bool isIpV4Mapped = ((a->s6_addr32[0] | a->s6_addr32[1]) | + (a->s6_addr32[2] ^ htonl(0x0000ffff))) == 0UL; + bool isIpV4MappedMulticast = (a->s6_addr32[0] == htonl(0xff0e0000) && + ((a->s6_addr32[1] | + (a->s6_addr32[2] ^ htonl(0x0000ffff))) == 0UL)); + return (isIpV4Mapped || isIpV4MappedMulticast) ? 
AF_INET : AF_INET6; + } + + static bool MatchGidAddressFamily(sa_family_t const& af, + void* prefix, + int prefixLen, + union ibv_gid* gid) + { + struct in_addr *base = NULL; + struct in6_addr *base6 = NULL; + struct in6_addr *addr6 = NULL;; + if (af == AF_INET) { + base = (struct in_addr *)prefix; + } else { + base6 = (struct in6_addr *)prefix; + } + addr6 = (struct in6_addr *)gid->raw; +#define NETMASK(bits) (htonl(0xffffffff ^ ((1 << (32 - bits)) - 1))) + int i = 0; + while (prefixLen > 0 && i < 4) { + if (af == AF_INET) { + int mask = NETMASK(prefixLen); + if ((base->s_addr & mask) ^ (addr6->s6_addr32[3] & mask)) + break; + prefixLen = 0; + break; + } else { + if (prefixLen >= 32) { + if (base6->s6_addr32[i] ^ addr6->s6_addr32[i]) + break; + prefixLen -= 32; + ++i; + } else { + int mask = NETMASK(prefixLen); + if ((base6->s6_addr32[i] & mask) ^ (addr6->s6_addr32[i] & mask)) + break; + prefixLen = 0; + } + } + } + return (prefixLen == 0) ? true : false; +#undef NETMASK + } + + static ErrResult GetRoceVersionNumber(struct ibv_context* const& context, + int const& portNum, + int const& gidIndex, + int* version) + { + char const* deviceName = ibv_get_device_name(context->device); + char gidRoceVerStr[16] = {}; + char roceTypePath[PATH_MAX] = {}; + sprintf(roceTypePath, "/sys/class/infiniband/%s/ports/%d/gid_attrs/types/%d", + deviceName, portNum, gidIndex); + + int fd = open(roceTypePath, O_RDONLY); + if (fd == -1) + return {ERR_FATAL, "Failed while opening RoCE file path (%s)", roceTypePath}; + + int ret = read(fd, gidRoceVerStr, 15); + close(fd); + + if (ret == -1) + return {ERR_FATAL, "Failed while reading RoCE version"}; + + if (strlen(gidRoceVerStr)) { + if (strncmp(gidRoceVerStr, "IB/RoCE v1", strlen("IB/RoCE v1")) == 0 + || strncmp(gidRoceVerStr, "RoCE v1", strlen("RoCE v1")) == 0) { + *version = 1; + } + else if (strncmp(gidRoceVerStr, "RoCE v2", strlen("RoCE v2")) == 0) { + *version = 2; + } + } + return ERR_NONE; + } + + static ErrResult 
GetGidIndex(ConfigOptions const& cfg, + struct ibv_context* context, + int const& gidTblLen, + int& gidIndex) + { + // Use GID index if user specified + if (gidIndex >= 0) return ERR_NONE; + + // Try to find the best GID index + int port = cfg.nic.ibPort; + sa_family_t targetAddrFam = (cfg.nic.ipAddressFamily == 6)? AF_INET6 : AF_INET; + int targetRoCEVer = cfg.nic.roceVersion; + + // Initially assume gidIndex = 0 + int gidIndexCurr = 0; + union ibv_gid gidCurr; + IBV_CALL(ibv_query_gid, context, port, gidIndexCurr, &gidCurr); + sa_family_t gidCurrFam = GetGidAddressFamily(&gidCurr); + bool gidCurrIsValid = IsValidGid(&gidCurr); + int gidCurrRoceVersion; + ERR_CHECK(GetRoceVersionNumber(context, port, gidIndexCurr, &gidCurrRoceVersion)); + + // Loop over GID table to find the best match + for (int gidIndexTest = 1; gidIndexTest < gidTblLen; ++gidIndexTest) { + union ibv_gid gidTest; + IBV_CALL(ibv_query_gid, context, cfg.nic.ibPort, gidIndexTest, &gidTest); + if (!IsValidGid(&gidTest)) continue; + + sa_family_t gidTestFam = GetGidAddressFamily(&gidTest); + bool gidTestMatchSubnet = MatchGidAddressFamily(targetAddrFam, NULL, 0, &gidTest); + int gidTestRoceVersion; + ERR_CHECK(GetRoceVersionNumber(context, port, gidIndexTest, &gidTestRoceVersion)); + + if (!gidCurrIsValid || + (gidTestFam == targetAddrFam && gidTestMatchSubnet && + (gidCurrFam != targetAddrFam || gidTestRoceVersion == targetRoCEVer))) { + // Switch to better match + gidIndexCurr = gidIndexTest; + gidCurrFam = gidTestFam; + gidCurrIsValid = true; + gidCurrRoceVersion = gidTestRoceVersion; + } + } + + gidIndex = gidIndexCurr; + return ERR_NONE; + } + + static ErrResult PrepareNicTransferResources(ConfigOptions const& cfg, + ExeDevice const& srcExeDevice, + Transfer const& t, + TransferResources& rss) + + { + // Switch to the closest NUMA node to this NIC + int numaNode = GetIbvDeviceList()[srcExeDevice.exeIndex].numaNode; + if (numaNode != -1) + numa_run_on_node(numaNode); + + int const port = 
cfg.nic.ibPort; + + // Figure out destination NIC (Accounts for possible remap due to use of EXE_NIC_NEAREST) + ExeDevice dstExeDevice; + ERR_CHECK(GetActualExecutor(cfg, {t.exeDevice.exeType, t.exeSubIndex}, dstExeDevice)); + + rss.srcNicIndex = srcExeDevice.exeIndex; + rss.dstNicIndex = dstExeDevice.exeIndex; + rss.qpCount = t.numSubExecs; + + // Check for valid NICs and active ports + int numNics = GetNumExecutors(EXE_NIC); + if (rss.srcNicIndex < 0 || rss.srcNicIndex >= numNics) + return {ERR_FATAL, "SRC NIC index is out of range (%d)", rss.srcNicIndex}; + if (rss.dstNicIndex < 0 || rss.dstNicIndex >= numNics) + return {ERR_FATAL, "DST NIC index is out of range (%d)", rss.dstNicIndex}; + if (!GetIbvDeviceList()[rss.srcNicIndex].hasActivePort) + return {ERR_FATAL, "SRC NIC %d is not active\n", rss.srcNicIndex}; + if (!GetIbvDeviceList()[rss.dstNicIndex].hasActivePort) + return {ERR_FATAL, "DST NIC %d is not active\n", rss.dstNicIndex}; + + // Queue pair flags + unsigned int rdmaAccessFlags = (IBV_ACCESS_LOCAL_WRITE | + IBV_ACCESS_REMOTE_READ | + IBV_ACCESS_REMOTE_WRITE | + IBV_ACCESS_REMOTE_ATOMIC); + + unsigned int rdmaMemRegFlags = rdmaAccessFlags; + if (cfg.nic.useRelaxedOrder) rdmaMemRegFlags |= IBV_ACCESS_RELAXED_ORDERING; + + // Open NIC contexts + IBV_PTR_CALL(rss.srcContext, ibv_open_device, GetIbvDeviceList()[rss.srcNicIndex].devicePtr); + IBV_PTR_CALL(rss.dstContext, ibv_open_device, GetIbvDeviceList()[rss.dstNicIndex].devicePtr); + + // Open protection domains + IBV_PTR_CALL(rss.srcProtect, ibv_alloc_pd, rss.srcContext); + IBV_PTR_CALL(rss.dstProtect, ibv_alloc_pd, rss.dstContext); + + // Register memory region + IBV_PTR_CALL(rss.srcMemRegion, ibv_reg_mr, rss.srcProtect, rss.srcMem[0], rss.numBytes, rdmaMemRegFlags); + IBV_PTR_CALL(rss.dstMemRegion, ibv_reg_mr, rss.dstProtect, rss.dstMem[0], rss.numBytes, rdmaMemRegFlags); + + // Create completion queues + IBV_PTR_CALL(rss.srcCompQueue, ibv_create_cq, rss.srcContext, cfg.nic.queueSize, NULL, NULL, 0); 
+ IBV_PTR_CALL(rss.dstCompQueue, ibv_create_cq, rss.dstContext, cfg.nic.queueSize, NULL, NULL, 0); + + // Get port attributes + IBV_CALL(ibv_query_port, rss.srcContext, port, &rss.srcPortAttr); + IBV_CALL(ibv_query_port, rss.dstContext, port, &rss.dstPortAttr); + + + if (rss.srcPortAttr.link_layer != rss.dstPortAttr.link_layer) + return {ERR_FATAL, "SRC NIC (%d) and DST NIC (%d) do not have the same link layer", rss.srcNicIndex, rss.dstNicIndex}; + + // Prepare GID index + int srcGidIndex = cfg.nic.ibGidIndex; + int dstGidIndex = cfg.nic.ibGidIndex; + + // Check for RDMA over Converged Ethernet (RoCE) and update GID index appropriately + bool isRoCE = (rss.srcPortAttr.link_layer == IBV_LINK_LAYER_ETHERNET); + if (isRoCE) { + // Try to auto-detect the GID index + ERR_CHECK(GetGidIndex(cfg, rss.srcContext, rss.srcPortAttr.gid_tbl_len, srcGidIndex)); + ERR_CHECK(GetGidIndex(cfg, rss.dstContext, rss.dstPortAttr.gid_tbl_len, dstGidIndex)); + IBV_CALL(ibv_query_gid, rss.srcContext, port, srcGidIndex, &rss.srcGid); + IBV_CALL(ibv_query_gid, rss.dstContext, port, dstGidIndex, &rss.dstGid); + } + + // Prepare queue pairs and send elements + rss.srcQueuePairs.resize(rss.qpCount); + rss.dstQueuePairs.resize(rss.qpCount); + rss.sgePerQueuePair.resize(rss.qpCount); + rss.sendWorkRequests.resize(rss.qpCount); + + for (int i = 0; i < rss.qpCount; ++i) { + + // Create scatter-gather element for the portion of memory assigned to this queue pair + ibv_sge sg = {}; + sg.addr = (uint64_t)rss.subExecParamCpu[i].src[0]; + sg.length = rss.subExecParamCpu[i].N * sizeof(float); + sg.lkey = rss.srcMemRegion->lkey; + rss.sgePerQueuePair[i] = sg; + + // Create send work request + ibv_send_wr wr = {}; + wr.wr_id = i; + wr.sg_list = &rss.sgePerQueuePair[i]; + wr.num_sge = 1; + wr.opcode = IBV_WR_RDMA_WRITE; + wr.send_flags = IBV_SEND_SIGNALED; + wr.wr.rdma.remote_addr = (uint64_t)rss.subExecParamCpu[i].dst[0]; + wr.wr.rdma.rkey = rss.dstMemRegion->rkey; + rss.sendWorkRequests[i] = wr; + + // 
Create SRC/DST queue pairs + ERR_CHECK(CreateQueuePair(cfg, rss.srcProtect, rss.srcCompQueue, rss.srcQueuePairs[i])); + ERR_CHECK(CreateQueuePair(cfg, rss.dstProtect, rss.dstCompQueue, rss.dstQueuePairs[i])); + + // Initialize SRC/DST queue pairs + ERR_CHECK(InitQueuePair(rss.srcQueuePairs[i], port, rdmaAccessFlags)); + ERR_CHECK(InitQueuePair(rss.dstQueuePairs[i], port, rdmaAccessFlags)); + + // Transition the SRC queue pair to ready to receive + ERR_CHECK(TransitionQpToRtr(rss.srcQueuePairs[i], rss.dstPortAttr.lid, + rss.dstQueuePairs[i]->qp_num, rss.dstGid, + dstGidIndex, port, isRoCE, + rss.srcPortAttr.active_mtu)); + + // Transition the SRC queue pair to ready to send + ERR_CHECK(TransitionQpToRts(rss.srcQueuePairs[i])); + + // Transition the DST queue pair to ready to receive + ERR_CHECK(TransitionQpToRtr(rss.dstQueuePairs[i], rss.srcPortAttr.lid, + rss.srcQueuePairs[i]->qp_num, rss.srcGid, + srcGidIndex, port, isRoCE, + rss.dstPortAttr.active_mtu)); + + // Transition the DST queue pair to ready to send + ERR_CHECK(TransitionQpToRts(rss.dstQueuePairs[i])); + } + + return ERR_NONE; + } + + static ErrResult TeardownNicTransferResources(TransferResources& rss) + { + // Deregister memory regions + IBV_CALL(ibv_dereg_mr, rss.srcMemRegion); + IBV_CALL(ibv_dereg_mr, rss.dstMemRegion); + + // Destroy queue pairs + for (auto srcQueuePair : rss.srcQueuePairs) + IBV_CALL(ibv_destroy_qp, srcQueuePair); + rss.srcQueuePairs.clear(); + for (auto dstQueuePair : rss.dstQueuePairs) + IBV_CALL(ibv_destroy_qp, dstQueuePair); + rss.dstQueuePairs.clear(); + + // Destroy completion queues + IBV_CALL(ibv_destroy_cq, rss.srcCompQueue); + IBV_CALL(ibv_destroy_cq, rss.dstCompQueue); + + // Deallocate protection domains + IBV_CALL(ibv_dealloc_pd, rss.srcProtect); + IBV_CALL(ibv_dealloc_pd, rss.dstProtect); + + // Destroy context + IBV_CALL(ibv_close_device, rss.srcContext); + IBV_CALL(ibv_close_device, rss.dstContext); + + return ERR_NONE; + } +#endif // NIC_EXEC_ENABLED + // Data 
validation-related functions //======================================================================================== @@ -1248,17 +2168,17 @@ namespace { float* output; size_t initOffset = cfg.data.byteOffset / sizeof(float); - for (auto resource : transferResources) { - int transferIdx = resource->transferIdx; + for (auto rss : transferResources) { + int transferIdx = rss->transferIdx; Transfer const& t = transfers[transferIdx]; size_t N = t.numBytes / sizeof(float); float const* expected = dstReference[t.srcs.size()].data(); - for (int dstIdx = 0; dstIdx < resource->dstMem.size(); dstIdx++) { + for (int dstIdx = 0; dstIdx < rss->dstMem.size(); dstIdx++) { if (IsCpuMemType(t.dsts[dstIdx].memType) || cfg.data.validateDirect) { - output = (resource->dstMem[dstIdx]) + initOffset; + output = (rss->dstMem[dstIdx]) + initOffset; } else { - ERR_CHECK(hipMemcpy(outputBuffer.data(), (resource->dstMem[dstIdx]) + initOffset, t.numBytes, hipMemcpyDefault)); + ERR_CHECK(hipMemcpy(outputBuffer.data(), (rss->dstMem[dstIdx]) + initOffset, t.numBytes, hipMemcpyDefault)); ERR_CHECK(hipDeviceSynchronize()); output = outputBuffer.data(); } @@ -1286,7 +2206,7 @@ namespace { // Initializes counters static ErrResult PrepareSubExecParams(ConfigOptions const& cfg, Transfer const& transfer, - TransferResources& resources) + TransferResources& rss) { // Each subExecutor needs to know src/dst pointers and how many elements to transfer // Figure out the sub-array each subExecutor works on for this Transfer @@ -1300,15 +2220,15 @@ namespace { int const maxSubExecToUse = std::min((size_t)(N + targetMultiple - 1) / targetMultiple, (size_t)transfer.numSubExecs); - vector& subExecParam = resources.subExecParamCpu; + vector& subExecParam = rss.subExecParamCpu; subExecParam.clear(); subExecParam.resize(transfer.numSubExecs); size_t assigned = 0; for (int i = 0; i < transfer.numSubExecs; ++i) { SubExecParam& p = subExecParam[i]; - p.numSrcs = resources.srcMem.size(); - p.numDsts = 
resources.dstMem.size(); + p.numSrcs = rss.srcMem.size(); + p.numDsts = rss.dstMem.size(); p.startCycle = 0; p.stopCycle = 0; p.hwId = 0; @@ -1319,8 +2239,8 @@ namespace { p.N = N; p.teamSize = transfer.numSubExecs; p.teamIdx = i; - for (int iSrc = 0; iSrc < p.numSrcs; ++iSrc) p.src[iSrc] = resources.srcMem[iSrc] + initOffset; - for (int iDst = 0; iDst < p.numDsts; ++iDst) p.dst[iDst] = resources.dstMem[iDst] + initOffset; + for (int iSrc = 0; iSrc < p.numSrcs; ++iSrc) p.src[iSrc] = rss.srcMem[iSrc] + initOffset; + for (int iDst = 0; iDst < p.numDsts; ++iDst) p.dst[iDst] = rss.dstMem[iDst] + initOffset; } else { // Otherwise, each subexecutor works on separate subarrays int const subExecLeft = std::max(0, maxSubExecToUse - i); @@ -1330,8 +2250,8 @@ namespace { p.N = subExecLeft ? std::min(leftover, ((roundedN / subExecLeft) * targetMultiple)) : 0; p.teamSize = 1; p.teamIdx = 0; - for (int iSrc = 0; iSrc < p.numSrcs; ++iSrc) p.src[iSrc] = resources.srcMem[iSrc] + initOffset + assigned; - for (int iDst = 0; iDst < p.numDsts; ++iDst) p.dst[iDst] = resources.dstMem[iDst] + initOffset + assigned; + for (int iSrc = 0; iSrc < p.numSrcs; ++iSrc) p.src[iSrc] = rss.srcMem[iSrc] + initOffset + assigned; + for (int iDst = 0; iDst < p.numDsts; ++iDst) p.dst[iDst] = rss.dstMem[iDst] + initOffset + assigned; assigned += p.N; } @@ -1352,7 +2272,7 @@ namespace { } // Clear counters - resources.totalDurationMsec = 0.0; + rss.totalDurationMsec = 0.0; return ERR_NONE; } @@ -1367,12 +2287,12 @@ namespace { exeInfo.totalDurationMsec = 0.0; // Loop over each transfer this executor is involved in - for (auto& resources : exeInfo.resources) { - Transfer const& t = transfers[resources.transferIdx]; - resources.numBytes = t.numBytes; + for (auto& rss : exeInfo.resources) { + Transfer const& t = transfers[rss.transferIdx]; + rss.numBytes = t.numBytes; // Allocate source memory - resources.srcMem.resize(t.srcs.size()); + rss.srcMem.resize(t.srcs.size()); for (int iSrc = 0; iSrc < 
t.srcs.size(); ++iSrc) { MemDevice const& srcMemDevice = t.srcs[iSrc]; @@ -1381,11 +2301,11 @@ namespace { srcMemDevice.memIndex != exeDevice.exeIndex) { ERR_CHECK(EnablePeerAccess(exeDevice.exeIndex, srcMemDevice.memIndex)); } - ERR_CHECK(AllocateMemory(srcMemDevice, t.numBytes + cfg.data.byteOffset, (void**)&resources.srcMem[iSrc])); + ERR_CHECK(AllocateMemory(srcMemDevice, t.numBytes + cfg.data.byteOffset, (void**)&rss.srcMem[iSrc])); } // Allocate destination memory - resources.dstMem.resize(t.dsts.size()); + rss.dstMem.resize(t.dsts.size()); for (int iDst = 0; iDst < t.dsts.size(); ++iDst) { MemDevice const& dstMemDevice = t.dsts[iDst]; @@ -1394,7 +2314,7 @@ namespace { dstMemDevice.memIndex != exeDevice.exeIndex) { ERR_CHECK(EnablePeerAccess(exeDevice.exeIndex, dstMemDevice.memIndex)); } - ERR_CHECK(AllocateMemory(dstMemDevice, t.numBytes + cfg.data.byteOffset, (void**)&resources.dstMem[iDst])); + ERR_CHECK(AllocateMemory(dstMemDevice, t.numBytes + cfg.data.byteOffset, (void**)&rss.dstMem[iDst])); } if (exeDevice.exeType == EXE_GPU_DMA && (t.exeSubIndex != -1 || cfg.dma.useHsaCopy)) { @@ -1402,22 +2322,22 @@ namespace { // Collect HSA agent information hsa_amd_pointer_info_t info; info.size = sizeof(info); - ERR_CHECK(hsa_amd_pointer_info(resources.dstMem[0], &info, NULL, NULL, NULL)); - resources.dstAgent = info.agentOwner; + ERR_CHECK(hsa_amd_pointer_info(rss.dstMem[0], &info, NULL, NULL, NULL)); + rss.dstAgent = info.agentOwner; - ERR_CHECK(hsa_amd_pointer_info(resources.srcMem[0], &info, NULL, NULL, NULL)); - resources.srcAgent = info.agentOwner; + ERR_CHECK(hsa_amd_pointer_info(rss.srcMem[0], &info, NULL, NULL, NULL)); + rss.srcAgent = info.agentOwner; // Create HSA completion signal - ERR_CHECK(hsa_signal_create(1, 0, NULL, &resources.signal)); + ERR_CHECK(hsa_signal_create(1, 0, NULL, &rss.signal)); if (t.exeSubIndex != -1) - resources.sdmaEngineId = (hsa_amd_sdma_engine_id_t)(1U << t.exeSubIndex); + rss.sdmaEngineId = (hsa_amd_sdma_engine_id_t)(1U << 
t.exeSubIndex); #endif } // Prepare subexecutor parameters - ERR_CHECK(PrepareSubExecParams(cfg, t, resources)); + ERR_CHECK(PrepareSubExecParams(cfg, t, rss)); } // Prepare additional requirements for GPU-based executors @@ -1476,11 +2396,11 @@ namespace { exeDevice.exeIndex)); #endif int transferOffset = 0; - for (auto& resources : exeInfo.resources) { - Transfer const& t = transfers[resources.transferIdx]; - resources.subExecParamGpuPtr = exeInfo.subExecParamGpu + transferOffset; - for (auto p : resources.subExecParamCpu) { - resources.subExecIdx.push_back(exeInfo.subExecParamCpu.size()); + for (auto& rss : exeInfo.resources) { + Transfer const& t = transfers[rss.transferIdx]; + rss.subExecParamGpuPtr = exeInfo.subExecParamGpu + transferOffset; + for (auto p : rss.subExecParamCpu) { + rss.subExecIdx.push_back(exeInfo.subExecParamCpu.size()); exeInfo.subExecParamCpu.push_back(p); transferOffset++; } @@ -1495,6 +2415,17 @@ namespace { ERR_CHECK(hipDeviceSynchronize()); } + // Prepare for NIC-based executors + if (IsNicExeType(exeDevice.exeType)) { +#ifdef NIC_EXEC_ENABLED + for (auto& rss : exeInfo.resources) { + Transfer const& t = transfers[rss.transferIdx]; + ERR_CHECK(PrepareNicTransferResources(cfg, exeDevice, t, rss)); + } +#else + return {ERR_FATAL, "RDMA executor is not supported"}; +#endif + } return ERR_NONE; } @@ -1508,23 +2439,30 @@ namespace { ExeInfo& exeInfo) { // Loop over each transfer this executor is involved in - for (auto& resources : exeInfo.resources) { - Transfer const& t = transfers[resources.transferIdx]; + for (auto& rss : exeInfo.resources) { + Transfer const& t = transfers[rss.transferIdx]; // Deallocate source memory for (int iSrc = 0; iSrc < t.srcs.size(); ++iSrc) { - ERR_CHECK(DeallocateMemory(t.srcs[iSrc].memType, resources.srcMem[iSrc], t.numBytes + cfg.data.byteOffset)); + ERR_CHECK(DeallocateMemory(t.srcs[iSrc].memType, rss.srcMem[iSrc], t.numBytes + cfg.data.byteOffset)); } // Deallocate destination memory for (int iDst = 0; 
iDst < t.dsts.size(); ++iDst) { - ERR_CHECK(DeallocateMemory(t.dsts[iDst].memType, resources.dstMem[iDst], t.numBytes + cfg.data.byteOffset)); + ERR_CHECK(DeallocateMemory(t.dsts[iDst].memType, rss.dstMem[iDst], t.numBytes + cfg.data.byteOffset)); } // Destroy HSA signal for DMA executor #if !defined(__NVCC__) if (exeDevice.exeType == EXE_GPU_DMA && (t.exeSubIndex != -1 || cfg.dma.useHsaCopy)) { - ERR_CHECK(hsa_signal_destroy(resources.signal)); + ERR_CHECK(hsa_signal_destroy(rss.signal)); + } +#endif + + // Destroy NIC related resources +#ifdef NIC_EXEC_ENABLED + if (IsNicExeType(exeDevice.exeType)) { + ERR_CHECK(TeardownNicTransferResources(rss)); } #endif } @@ -1557,68 +2495,69 @@ namespace { //======================================================================================== // Kernel for CPU execution (run by a single subexecutor) - static void CpuReduceKernel(SubExecParam const& p) + static void CpuReduceKernel(SubExecParam const& p, int numSubIterations) { if (p.N == 0) return; - int const& numSrcs = p.numSrcs; - int const& numDsts = p.numDsts; - - if (numSrcs == 0) { - for (int i = 0; i < numDsts; ++i) { - memset(p.dst[i], MEMSET_CHAR, p.N * sizeof(float)); - //for (int j = 0; j < p.N; j++) p.dst[i][j] = MEMSET_VAL; - } - } else if (numSrcs == 1) { - float const* __restrict__ src = p.src[0]; - if (numDsts == 0) { - float sum = 0.0; - for (int j = 0; j < p.N; j++) - sum += p.src[0][j]; + int subIteration = 0; + do { + int const& numSrcs = p.numSrcs; + int const& numDsts = p.numDsts; - // Add a dummy check to ensure the read is not optimized out - if (sum != sum) { - printf("[ERROR] Nan detected\n"); + if (numSrcs == 0) { + for (int i = 0; i < numDsts; ++i) { + memset(p.dst[i], MEMSET_CHAR, p.N * sizeof(float)); + //for (int j = 0; j < p.N; j++) p.dst[i][j] = MEMSET_VAL; + } + } else if (numSrcs == 1) { + float const* __restrict__ src = p.src[0]; + if (numDsts == 0) { + float sum = 0.0; + for (int j = 0; j < p.N; j++) + sum += p.src[0][j]; + + // Add a 
dummy check to ensure the read is not optimized out + if (sum != sum) { + printf("[ERROR] Nan detected\n"); + } + } else { + for (int i = 0; i < numDsts; ++i) + memcpy(p.dst[i], src, p.N * sizeof(float)); } } else { - for (int i = 0; i < numDsts; ++i) - memcpy(p.dst[i], src, p.N * sizeof(float)); - } - } else { - float sum = 0.0f; - for (int j = 0; j < p.N; j++) { - sum = p.src[0][j]; - for (int i = 1; i < numSrcs; i++) sum += p.src[i][j]; - for (int i = 0; i < numDsts; i++) p.dst[i][j] = sum; + float sum = 0.0f; + for (int j = 0; j < p.N; j++) { + sum = p.src[0][j]; + for (int i = 1; i < numSrcs; i++) sum += p.src[i][j]; + for (int i = 0; i < numDsts; i++) p.dst[i][j] = sum; + } } - } + } while (++subIteration != numSubIterations); } // Execution of a single CPU Transfers static void ExecuteCpuTransfer(int const iteration, ConfigOptions const& cfg, int const exeIndex, - TransferResources& resources) + TransferResources& rss) { auto cpuStart = std::chrono::high_resolution_clock::now(); vector childThreads; - int subIteration = 0; - do { - for (auto const& subExecParam : resources.subExecParamCpu) - childThreads.emplace_back(std::thread(CpuReduceKernel, std::cref(subExecParam))); - for (auto& subExecThread : childThreads) - subExecThread.join(); - childThreads.clear(); - } while (++subIteration != cfg.general.numSubIterations); + for (auto const& subExecParam : rss.subExecParamCpu) + childThreads.emplace_back(std::thread(CpuReduceKernel, std::cref(subExecParam), cfg.general.numSubIterations)); + + for (auto& subExecThread : childThreads) + subExecThread.join(); + childThreads.clear(); auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; double deltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0; if (iteration >= 0) { - resources.totalDurationMsec += deltaMsec; + rss.totalDurationMsec += deltaMsec; if (cfg.general.recordPerIteration) - resources.perIterMsec.push_back(deltaMsec); + rss.perIterMsec.push_back(deltaMsec); } } @@ -1632,12 
+2571,12 @@ namespace { auto cpuStart = std::chrono::high_resolution_clock::now(); vector asyncTransfers; - for (auto& resource : exeInfo.resources) { + for (auto& rss : exeInfo.resources) { asyncTransfers.emplace_back(std::thread(ExecuteCpuTransfer, iteration, std::cref(cfg), exeIndex, - std::ref(resource))); + std::ref(rss))); } for (auto& asyncTransfer : asyncTransfers) asyncTransfer.join(); @@ -1649,6 +2588,90 @@ namespace { return ERR_NONE; } +#ifdef NIC_EXEC_ENABLED + // Execution of a single NIC Transfer + static ErrResult ExecuteNicTransfer(int const iteration, + ConfigOptions const& cfg, + int const exeIndex, + TransferResources& rss) + { + auto cpuStart = std::chrono::high_resolution_clock::now(); + + // Switch to the closest NUMA node to this NIC + if (cfg.nic.useNuma) { + int numaNode = GetIbvDeviceList()[exeIndex].numaNode; + if (numaNode != -1) + numa_run_on_node(numaNode); + } + + int subIteration = 0; + do { + // Loop over each of the queue pairs and post the send + ibv_send_wr* badWorkReq; + for (int qpIndex = 0; qpIndex < rss.qpCount; qpIndex++) { + int error = ibv_post_send(rss.srcQueuePairs[qpIndex], &rss.sendWorkRequests[qpIndex], &badWorkReq); + if (error) + return {ERR_FATAL, "Transfer %d: Error when calling ibv_post_send for QP %d Error code %d\n", + rss.transferIdx, qpIndex, error}; + } + + // Poll the completion queue until all queue pairs are complete + // The order of completion doesn't matter because this completion queue is dedicated to this Transfer + int numComplete = 0; + ibv_wc wc; + while (numComplete < rss.qpCount) { + int nc = ibv_poll_cq(rss.srcCompQueue, 1, &wc); + if (nc > 0) { + numComplete++; + if (wc.status != IBV_WC_SUCCESS) { + return {ERR_FATAL, "Transfer %d: Received unsuccessful work completion", rss.transferIdx}; + } + } else if (nc < 0) { + return {ERR_FATAL, "Transfer %d: Received negative work completion", rss.transferIdx}; + } + } + } while (++subIteration != cfg.general.numSubIterations); + + auto cpuDelta = 
std::chrono::high_resolution_clock::now() - cpuStart; + double deltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0; + + if (iteration >= 0) { + rss.totalDurationMsec += deltaMsec; + if (cfg.general.recordPerIteration) + rss.perIterMsec.push_back(deltaMsec); + } + return ERR_NONE; + } + + // Execution of a single NIC executor + static ErrResult RunNicExecutor(int const iteration, + ConfigOptions const& cfg, + int const exeIndex, + ExeInfo& exeInfo) + { + vector> asyncTransfers; + + auto cpuStart = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < exeInfo.resources.size(); i++) { + asyncTransfers.emplace_back(std::async(std::launch::async, + ExecuteNicTransfer, + iteration, + std::cref(cfg), + exeIndex, + std::ref(exeInfo.resources[i]))); + } + for (auto& asyncTransfer : asyncTransfers) + ERR_CHECK(asyncTransfer.get()); + + auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; + double deltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0; + + if (iteration >= 0) + exeInfo.totalDurationMsec += deltaMsec; + + return ERR_NONE; + } +#endif // GFX Executor-related functions //======================================================================================== @@ -1789,7 +2812,7 @@ namespace { if (numSrcs == 0) val = MemsetVal(); size_t const loop3Stride = nTeams * nWaves * warpSize; - for( size_t idx = numFloat4 * 4 + (teamIdx * teamStride2 + waveIdx * waveStride2) * warpSize + tIdx; idx < p.N; idx += loop3Stride) { + for ( size_t idx = numFloat4 * 4 + (teamIdx * teamStride2 + waveIdx * waveStride2) * warpSize + tIdx; idx < p.N; idx += loop3Stride) { if (numSrcs) { val = p.src[0][idx]; for (int s = 1; s < numSrcs; s++) @@ -1848,11 +2871,11 @@ namespace { hipEvent_t const stopEvent, int const xccDim, ConfigOptions const& cfg, - TransferResources& resources) + TransferResources& rss) { auto cpuStart = std::chrono::high_resolution_clock::now(); - int numSubExecs = resources.subExecParamCpu.size(); + int 
numSubExecs = rss.subExecParamCpu.size(); dim3 const gridSize(xccDim, numSubExecs, 1); dim3 const blockSize(cfg.gfx.blockSize, 1); @@ -1862,13 +2885,13 @@ namespace { GpuKernelTable[cfg.gfx.blockSize/64 - 1][cfg.gfx.unrollFactor - 1] <<>> - (resources.subExecParamGpuPtr, cfg.gfx.waveOrder, cfg.general.numSubIterations); + (rss.subExecParamGpuPtr, cfg.gfx.waveOrder, cfg.general.numSubIterations); if (stopEvent != NULL) ERR_CHECK(hipEventRecord(stopEvent, stream)); #else hipExtLaunchKernelGGL(GpuKernelTable[cfg.gfx.blockSize/64 - 1][cfg.gfx.unrollFactor - 1], gridSize, blockSize, 0, stream, startEvent, stopEvent, - 0, resources.subExecParamGpuPtr, cfg.gfx.waveOrder, cfg.general.numSubIterations); + 0, rss.subExecParamGpuPtr, cfg.gfx.waveOrder, cfg.general.numSubIterations); #endif ERR_CHECK(hipStreamSynchronize(stream)); @@ -1883,15 +2906,15 @@ namespace { ERR_CHECK(hipEventElapsedTime(&gpuDeltaMsec, startEvent, stopEvent)); deltaMsec = gpuDeltaMsec; } - resources.totalDurationMsec += deltaMsec; + rss.totalDurationMsec += deltaMsec; if (cfg.general.recordPerIteration) { - resources.perIterMsec.push_back(deltaMsec); + rss.perIterMsec.push_back(deltaMsec); std::set> CUs; for (int i = 0; i < numSubExecs; i++) { - CUs.insert(std::make_pair(resources.subExecParamGpuPtr[i].xccId, - GetId(resources.subExecParamGpuPtr[i].hwId))); + CUs.insert(std::make_pair(rss.subExecParamGpuPtr[i].xccId, + GetId(rss.subExecParamGpuPtr[i].hwId))); } - resources.perIterCUs.push_back(CUs); + rss.perIterCUs.push_back(CUs); } } return ERR_NONE; @@ -1965,12 +2988,12 @@ namespace { // Determine timing for each of the individual transfers that were part of this launch if (!cfg.gfx.useMultiStream) { for (int i = 0; i < exeInfo.resources.size(); i++) { - TransferResources& resources = exeInfo.resources[i]; + TransferResources& rss = exeInfo.resources[i]; long long minStartCycle = std::numeric_limits::max(); long long maxStopCycle = std::numeric_limits::min(); std::set> CUs; - for (auto subExecIdx : 
resources.subExecIdx) { + for (auto subExecIdx : rss.subExecIdx) { minStartCycle = std::min(minStartCycle, exeInfo.subExecParamGpu[subExecIdx].startCycle); maxStopCycle = std::max(maxStopCycle, exeInfo.subExecParamGpu[subExecIdx].stopCycle); if (cfg.general.recordPerIteration) { @@ -1980,10 +3003,10 @@ namespace { } double deltaMsec = (maxStopCycle - minStartCycle) / (double)(exeInfo.wallClockRate); - resources.totalDurationMsec += deltaMsec; + rss.totalDurationMsec += deltaMsec; if (cfg.general.recordPerIteration) { - resources.perIterMsec.push_back(deltaMsec); - resources.perIterCUs.push_back(CUs); + rss.perIterMsec.push_back(deltaMsec); + rss.perIterCUs.push_back(CUs); } } } @@ -1991,7 +3014,6 @@ namespace { return ERR_NONE; } - // DMA Executor-related functions //======================================================================================== @@ -2106,6 +3128,9 @@ namespace { case EXE_CPU: return RunCpuExecutor(iteration, cfg, exeDevice.exeIndex, exeInfo); case EXE_GPU_GFX: return RunGpuExecutor(iteration, cfg, exeDevice.exeIndex, exeInfo); case EXE_GPU_DMA: return RunDmaExecutor(iteration, cfg, exeDevice.exeIndex, exeInfo); +#ifdef NIC_EXEC_ENABLED + case EXE_NIC: return RunNicExecutor(iteration, cfg, exeDevice.exeIndex, exeInfo); +#endif default: return {ERR_FATAL, "Unsupported executor (%d)", exeDevice.exeType}; } } @@ -2182,16 +3207,17 @@ namespace { std::map executorMap; for (int i = 0; i < transfers.size(); i++) { Transfer const& t = transfers[i]; - - ExeInfo& exeInfo = executorMap[t.exeDevice]; - exeInfo.totalBytes += t.numBytes; - exeInfo.totalSubExecs += t.numSubExecs; - exeInfo.useSubIndices |= (t.exeSubIndex != -1); + ExeDevice exeDevice; + ERR_APPEND(GetActualExecutor(cfg, t.exeDevice, exeDevice), errResults); TransferResources resource = {}; resource.transferIdx = i; - exeInfo.resources.push_back(resource); + ExeInfo& exeInfo = executorMap[exeDevice]; + exeInfo.totalBytes += t.numBytes; + exeInfo.totalSubExecs += t.numSubExecs; + 
exeInfo.useSubIndices |= (t.exeSubIndex != -1 || (t.exeDevice.exeType == EXE_GPU_GFX && !cfg.gfx.prefXccTable.empty())); + exeInfo.resources.push_back(resource); minNumSrcs = std::min(minNumSrcs, (int)t.srcs.size()); maxNumSrcs = std::max(maxNumSrcs, (int)t.srcs.size()); maxNumBytes = std::max(maxNumBytes, t.numBytes); @@ -2322,8 +3348,8 @@ namespace { results.tfrResults.resize(transfers.size()); results.numTimedIterations = numTimedIterations; results.totalBytesTransferred = 0; - results.avgTotalDurationMsec = (totalCpuTimeSec * 1000.0) / numTimedIterations; - results.overheadMsec = 0.0; + results.avgTotalDurationMsec = (totalCpuTimeSec * 1000.0) / (numTimedIterations * cfg.general.numSubIterations); + results.overheadMsec = results.avgTotalDurationMsec; for (auto& exeInfoPair : executorMap) { ExeDevice const& exeDevice = exeInfoPair.first; ExeInfo& exeInfo = exeInfoPair.second; @@ -2331,25 +3357,32 @@ namespace { // Copy over executor results ExeResult& exeResult = results.exeResults[exeDevice]; exeResult.numBytes = exeInfo.totalBytes; - exeResult.avgDurationMsec = exeInfo.totalDurationMsec / numTimedIterations; + exeResult.avgDurationMsec = exeInfo.totalDurationMsec / (numTimedIterations * cfg.general.numSubIterations); exeResult.avgBandwidthGbPerSec = (exeResult.numBytes / 1.0e6) / exeResult.avgDurationMsec; exeResult.sumBandwidthGbPerSec = 0.0; exeResult.transferIdx.clear(); results.totalBytesTransferred += exeInfo.totalBytes; - results.overheadMsec = std::max(results.overheadMsec, (results.avgTotalDurationMsec - + results.overheadMsec = std::min(results.overheadMsec, (results.avgTotalDurationMsec - exeResult.avgDurationMsec)); // Copy over transfer results - for (auto const& resources : exeInfo.resources) { - int const transferIdx = resources.transferIdx; - TransferResult& tfrResult = results.tfrResults[transferIdx]; + for (auto const& rss : exeInfo.resources) { + int const transferIdx = rss.transferIdx; exeResult.transferIdx.push_back(transferIdx); - 
tfrResult.numBytes = resources.numBytes; - tfrResult.avgDurationMsec = resources.totalDurationMsec / numTimedIterations; - tfrResult.avgBandwidthGbPerSec = (resources.numBytes / 1.0e6) / tfrResult.avgDurationMsec; + + TransferResult& tfrResult = results.tfrResults[transferIdx]; + tfrResult.exeDevice = exeDevice; +#ifdef NIC_EXEC_ENABLED + tfrResult.exeDstDevice = {exeDevice.exeType, rss.dstNicIndex}; +#else + tfrResult.exeDstDevice = exeDevice; +#endif + tfrResult.numBytes = rss.numBytes; + tfrResult.avgDurationMsec = rss.totalDurationMsec / numTimedIterations; + tfrResult.avgBandwidthGbPerSec = (rss.numBytes / 1.0e6) / tfrResult.avgDurationMsec; if (cfg.general.recordPerIteration) { - tfrResult.perIterMsec = resources.perIterMsec; - tfrResult.perIterCUs = resources.perIterCUs; + tfrResult.perIterMsec = rss.perIterMsec; + tfrResult.perIterCUs = rss.perIterCUs; } exeResult.sumBandwidthGbPerSec += tfrResult.avgBandwidthGbPerSec; } @@ -2390,7 +3423,7 @@ namespace { { // Replace any round brackets or '->' with spaces, for (int i = 1; line[i]; i++) - if (line[i] == '(' || line[i] == ')' || line[i] == '-' || line[i] == '>' ) line[i] = ' '; + if (line[i] == '(' || line[i] == ')' || line[i] == '-' || line[i] == ':' || line[i] == '>' ) line[i] = ' '; transfers.clear(); @@ -2424,17 +3457,18 @@ namespace { transfer.numSubExecs = numSubExecs; if (iss.fail()) { return {ERR_FATAL, - "Parsing error: Unable to read valid Transfer %d (SRC EXE DST) triplet", i+1}; + "Parsing error: Unable to read valid Transfer %d (SRC EXE DST) triplet", i+1}; } + transfer.numBytes = 0; } else { iss >> srcStr >> exeStr >> dstStr >> transfer.numSubExecs >> numBytesToken; if (iss.fail()) { return {ERR_FATAL, - "Parsing error: Unable to read valid Transfer %d (SRC EXE DST $CU #Bytes) tuple", i+1}; + "Parsing error: Unable to read valid Transfer %d (SRC EXE DST $CU #Bytes) tuple", i+1}; } if (sscanf(numBytesToken.c_str(), "%lu", &transfer.numBytes) != 1) { return {ERR_FATAL, - "Parsing error: Unable to 
read valid Transfer %d (SRC EXE DST #CU #Bytes) tuple", i+1}; + "Parsing error: Unable to read valid Transfer %d (SRC EXE DST #CU #Bytes) tuple", i+1}; } char units = numBytesToken.back(); @@ -2448,7 +3482,6 @@ namespace { ERR_CHECK(ParseMemType(srcStr, transfer.srcs)); ERR_CHECK(ParseMemType(dstStr, transfer.dsts)); ERR_CHECK(ParseExeType(exeStr, transfer.exeDevice, transfer.exeSubIndex)); - transfers.push_back(transfer); } return ERR_NONE; @@ -2466,6 +3499,12 @@ namespace { if (status != hipSuccess) numDetectedGpus = 0; return numDetectedGpus; } +#ifdef NIC_EXEC_ENABLED + case EXE_NIC: case EXE_NIC_NEAREST: + { + return GetIbvDeviceList().size(); + } +#endif default: return 0; } @@ -2572,6 +3611,102 @@ namespace { #endif } + int GetClosestCpuNumaToNic(int nicIndex) + { +#ifdef NIC_EXEC_ENABLED + int numNics = GetNumExecutors(EXE_NIC); + if (nicIndex < 0 || nicIndex >= numNics) return -1; + return GetIbvDeviceList()[nicIndex].numaNode; +#else + return -1; +#endif + } + + + int GetClosestNicToGpu(int gpuIndex) + { +#ifdef NIC_EXEC_ENABLED + static bool isInitialized = false; + static std::vector closestNicId; + + int numGpus = GetNumExecutors(EXE_GPU_GFX); + if (gpuIndex < 0 || gpuIndex >= numGpus) return -1; + + // Build closest NICs per GPU on first use + if (!isInitialized) { + closestNicId.resize(numGpus, -1); + + // Build up list of NIC bus addresses + std::vector ibvAddressList; + auto const& ibvDeviceList = GetIbvDeviceList(); + for (auto const& ibvDevice : ibvDeviceList) + ibvAddressList.push_back(ibvDevice.hasActivePort ? 
ibvDevice.busId : ""); + + // Track how many times a device has been assigned as "closest" + // This allows distributed work across devices using multiple ports (sharing the same busID) + // NOTE: This isn't necessarily optimal, but likely to work in most cases involving multi-port + // Counter example: + // + // G0 prefers (N0,N1), picks N0 + // G1 prefers (N1,N2), picks N1 + // G2 prefers N0, picks N0 + // + // instead of G0->N1, G1->N2, G2->N0 + + std::vector assignedCount(ibvDeviceList.size(), 0); + + // Loop over each GPU to find the closest NIC(s) based on PCIe address + for (int i = 0; i < numGpus; i++) { + // Collect PCIe address for the GPU + char hipPciBusId[64]; + hipError_t err = hipDeviceGetPCIBusId(hipPciBusId, sizeof(hipPciBusId), i); + if (err != hipSuccess) { +#ifdef VERBS_DEBUG + printf("Failed to get PCI Bus ID for HIP device %d: %s\n", i, hipGetErrorString(err)); +#endif + closestNicId[i] = -1; + continue; + } + + // Find closest NICs + std::set closestNicIdxs = GetNearestDevicesInTree(hipPciBusId, ibvAddressList); + + // Pick the least-used NIC to assign as closest + int closestIdx = -1; + for (auto idx : closestNicIdxs) { + if (closestIdx == -1 || assignedCount[idx] < assignedCount[closestIdx]) + closestIdx = idx; + } + + // The following will only use distance between bus IDs + // to determine the closest NIC to GPU if the PCIe tree approach fails + if (closestIdx < 0) { +#ifdef VERBS_DEBUG + printf("[WARN] Falling back to PCIe bus ID distance to determine proximity\n"); +#endif + + int minDistance = std::numeric_limits::max(); + for (int j = 0; j < ibvDeviceList.size(); j++) { + if (ibvDeviceList[j].busId != "") { + int distance = GetBusIdDistance(hipPciBusId, ibvDeviceList[j].busId); + if (distance < minDistance && distance >= 0) { + minDistance = distance; + closestIdx = j; + } + } + } + } + closestNicId[i] = closestIdx; + if (closestIdx != -1) assignedCount[closestIdx]++; + } + isInitialized = true; + } + return closestNicId[gpuIndex]; 
+#else + return -1; +#endif + } + // Undefine CUDA compatibility macros #if defined(__NVCC__)