Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove 'sample' parameter from stats::mean API #2389

Open
wants to merge 19 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cpp/include/raft/core/math.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ constexpr RAFT_INLINE_FUNCTION auto abs(T x)
!std::is_same_v<long long int, T>,
T>
{
return x < T{0} ? -x : x;
if constexpr (std::is_unsigned_v<T>) {
return x;
} else {
return x < T{0} ? -x : x;
}
}
#if defined(_RAFT_HAS_CUDA)
template <typename T>
Expand Down
218 changes: 201 additions & 17 deletions cpp/include/raft/linalg/detail/coalesced_reduction-inl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ struct ReductionThinPolicy {
static constexpr bool NoSequentialReduce = noLoop;
};

template <typename Type>
DI void KahanBabushkaNeumaierSum(Type& sum, Type& compensation, const Type& cur_value)
{
const Type t = sum + cur_value;
if (abs(sum) >= abs(cur_value)) {
compensation += (sum - t) + cur_value;
} else {
compensation += (cur_value - t) + sum;
}
sum = t;
}

template <typename Policy,
typename InType,
typename OutType,
Expand Down Expand Up @@ -130,6 +142,99 @@ RAFT_KERNEL __launch_bounds__(Policy::ThreadsPerBlock)
}
}

template <typename Policy,
typename InType,
typename OutType,
typename IdxType,
typename MainLambda,
typename FinalLambda>
RAFT_KERNEL __launch_bounds__(Policy::ThreadsPerBlock) coalescedSumThinKernel(OutType* dots,
const InType* data,
IdxType D,
IdxType N,
OutType init,
MainLambda main_op,
FinalLambda final_op,
bool inplace = false)
{
/* The strategy to achieve near-SOL memory bandwidth differs based on D:
* - For small D, we need to process multiple rows per logical warp in order to have
* multiple loads per thread and increase bytes in flight and amortize latencies.
* - For large D, we start with a sequential reduction. The compiler partially unrolls
* that loop (e.g. first a loop of stride 16, then 8, 4, and 1).
*/
IdxType i0 = threadIdx.y + (Policy::RowsPerBlock * static_cast<IdxType>(blockIdx.x));
if (i0 >= N) return;

OutType acc[Policy::RowsPerLogicalWarp];
OutType thread_c[Policy::RowsPerLogicalWarp];

#pragma unroll
for (int k = 0; k < Policy::RowsPerLogicalWarp; k++) {
acc[k] = init;
thread_c[k] = 0;
}

if constexpr (Policy::NoSequentialReduce) {
IdxType j = threadIdx.x;
if (j < D) {
#pragma unroll
for (IdxType k = 0; k < Policy::RowsPerLogicalWarp; k++) {
// Only the first row is known to be within bounds. Clamp to avoid out-of-mem read.
const IdxType i = raft::min(i0 + k * Policy::NumLogicalWarps, N - 1);
// acc[k] = reduce_op(acc[k], main_op(data[j + (D * i)], j));
KahanBabushkaNeumaierSum<OutType>(acc[k], thread_c[k], main_op(data[j + (D * i)], j));
}
}
} else {
for (IdxType j = threadIdx.x; j < D; j += Policy::LogicalWarpSize) {
#pragma unroll
for (IdxType k = 0; k < Policy::RowsPerLogicalWarp; k++) {
const IdxType i = raft::min(i0 + k * Policy::NumLogicalWarps, N - 1);
// acc[k] = reduce_op(acc[k], main_op(data[j + (D * i)], j));
KahanBabushkaNeumaierSum<OutType>(acc[k], thread_c[k], main_op(data[j + (D * i)], j));
}
}
}

/* This vector reduction has two benefits compared to naive separate reductions:
* - It avoids the LSU bottleneck when the number of columns is around 32 (e.g. for 32, 5 shuffles
* are required and there is no initial sequential reduction to amortize that cost).
* - It distributes the outputs to multiple threads, enabling a coalesced store when the number of
* rows per logical warp and logical warp size are equal.
*/
raft::logicalWarpReduceVector<Policy::LogicalWarpSize, Policy::RowsPerLogicalWarp>(
acc, threadIdx.x, raft::add_op());

raft::logicalWarpReduceVector<Policy::LogicalWarpSize, Policy::RowsPerLogicalWarp>(
thread_c, threadIdx.x, raft::add_op());

constexpr int reducOutVecWidth =
std::max(1, Policy::RowsPerLogicalWarp / Policy::LogicalWarpSize);
constexpr int reducOutGroupSize =
std::max(1, Policy::LogicalWarpSize / Policy::RowsPerLogicalWarp);
constexpr int reducNumGroups = Policy::LogicalWarpSize / reducOutGroupSize;

if (threadIdx.x % reducOutGroupSize == 0) {
const int groupId = threadIdx.x / reducOutGroupSize;
if (inplace) {
#pragma unroll
for (int k = 0; k < reducOutVecWidth; k++) {
const int reductionId = k * reducNumGroups + groupId;
const IdxType i = i0 + reductionId * Policy::NumLogicalWarps;
if (i < N) { dots[i] = final_op(dots[i] + acc[k] + thread_c[k]); }
}
} else {
#pragma unroll
for (int k = 0; k < reducOutVecWidth; k++) {
const int reductionId = k * reducNumGroups + groupId;
const IdxType i = i0 + reductionId * Policy::NumLogicalWarps;
if (i < N) { dots[i] = final_op(acc[k] + thread_c[k]); }
}
}
}
}

template <typename Policy,
typename InType,
typename OutType = InType,
Expand All @@ -156,8 +261,13 @@ void coalescedReductionThin(OutType* dots,
static_cast<int>(Policy::NoSequentialReduce));
dim3 threads(Policy::LogicalWarpSize, Policy::NumLogicalWarps, 1);
dim3 blocks(ceildiv<IdxType>(N, Policy::RowsPerBlock), 1, 1);
coalescedReductionThinKernel<Policy>
<<<blocks, threads, 0, stream>>>(dots, data, D, N, init, main_op, reduce_op, final_op, inplace);
if constexpr (std::is_same_v<ReduceLambda, raft::add_op>) {
coalescedSumThinKernel<Policy>
<<<blocks, threads, 0, stream>>>(dots, data, D, N, init, main_op, final_op, inplace);
} else {
coalescedReductionThinKernel<Policy><<<blocks, threads, 0, stream>>>(
dots, data, D, N, init, main_op, reduce_op, final_op, inplace);
}
RAFT_CUDA_TRY(cudaPeekAtLastError());
}

Expand Down Expand Up @@ -240,6 +350,44 @@ RAFT_KERNEL __launch_bounds__(TPB) coalescedReductionMediumKernel(OutType* dots,
}
}

template <int TPB,
typename InType,
typename OutType,
typename IdxType,
typename MainLambda,
typename FinalLambda>
RAFT_KERNEL __launch_bounds__(TPB) coalescedSumMediumKernel(OutType* dots,
const InType* data,
IdxType D,
IdxType N,
OutType init,
MainLambda main_op,
FinalLambda final_op,
bool inplace = false)
{
typedef cub::BlockReduce<OutType, TPB, cub::BLOCK_REDUCE_RAKING> BlockReduce;
__shared__ typename BlockReduce::TempStorage temp_storage1;
__shared__ typename BlockReduce::TempStorage temp_storage2;
OutType thread_data = init;
OutType thread_c = (OutType)0;

IdxType rowStart = blockIdx.x * D;
for (IdxType i = threadIdx.x; i < D; i += TPB) {
IdxType idx = rowStart + i;
KahanBabushkaNeumaierSum<OutType>(thread_data, thread_c, main_op(data[idx], i));
}
OutType block_acc = BlockReduce(temp_storage1).Sum(thread_data);
OutType block_c = BlockReduce(temp_storage2).Sum(thread_c);

if (threadIdx.x == 0) {
if (inplace) {
dots[blockIdx.x] = final_op(dots[blockIdx.x] + block_acc + block_c);
} else {
dots[blockIdx.x] = final_op(block_acc + block_c);
}
}
}

template <int TPB,
typename InType,
typename OutType = InType,
Expand All @@ -259,8 +407,13 @@ void coalescedReductionMedium(OutType* dots,
FinalLambda final_op = raft::identity_op())
{
common::nvtx::range<common::nvtx::domain::raft> fun_scope("coalescedReductionMedium<%d>", TPB);
coalescedReductionMediumKernel<TPB>
<<<N, TPB, 0, stream>>>(dots, data, D, N, init, main_op, reduce_op, final_op, inplace);
if constexpr (std::is_same_v<ReduceLambda, raft::add_op>) {
coalescedSumMediumKernel<TPB>
<<<N, TPB, 0, stream>>>(dots, data, D, N, init, main_op, final_op, inplace);
} else {
coalescedReductionMediumKernel<TPB>
<<<N, TPB, 0, stream>>>(dots, data, D, N, init, main_op, reduce_op, final_op, inplace);
}
RAFT_CUDA_TRY(cudaPeekAtLastError());
}

Expand Down Expand Up @@ -322,6 +475,32 @@ RAFT_KERNEL __launch_bounds__(Policy::ThreadsPerBlock)
if (threadIdx.x == 0) { buffer[Policy::BlocksPerRow * blockIdx.x + blockIdx.y] = acc; }
}

template <typename Policy, typename InType, typename OutType, typename IdxType, typename MainLambda>
RAFT_KERNEL __launch_bounds__(Policy::ThreadsPerBlock) coalescedSumThickKernel(
OutType* buffer, const InType* data, IdxType D, IdxType N, OutType init, MainLambda main_op)
{
typedef cub::BlockReduce<OutType, Policy::ThreadsPerBlock, cub::BLOCK_REDUCE_RAKING> BlockReduce;
__shared__ typename BlockReduce::TempStorage temp_storage1;
__shared__ typename BlockReduce::TempStorage temp_storage2;

OutType thread_data = init;
OutType thread_c = (OutType)0;

IdxType rowStart = blockIdx.x * D;
for (IdxType i = blockIdx.y * Policy::ThreadsPerBlock + threadIdx.x; i < D;
i += Policy::BlockStride) {
IdxType idx = rowStart + i;
KahanBabushkaNeumaierSum<OutType>(thread_data, thread_c, main_op(data[idx], i));
}

OutType block_acc = BlockReduce(temp_storage1).Sum(thread_data);
OutType block_c = BlockReduce(temp_storage2).Sum(thread_c);

if (threadIdx.x == 0) {
buffer[Policy::BlocksPerRow * blockIdx.x + blockIdx.y] = block_acc + block_c;
}
}

template <typename ThickPolicy,
typename ThinPolicy,
typename InType,
Expand Down Expand Up @@ -355,9 +534,13 @@ void coalescedReductionThick(OutType* dots,
* 2. coalescedReductionThinKernel reduces [N x BlocksPerRow] to [N x 1]. It doesn't apply any
* main_op but applies final_op. If in-place, the existing and new values are reduced.
*/

coalescedReductionThickKernel<ThickPolicy>
<<<blocks, threads, 0, stream>>>(buffer.data(), data, D, N, init, main_op, reduce_op);
if constexpr (std::is_same_v<ReduceLambda, raft::add_op>) {
coalescedSumThickKernel<ThickPolicy>
<<<blocks, threads, 0, stream>>>(buffer.data(), data, D, N, init, main_op);
} else {
coalescedReductionThickKernel<ThickPolicy>
<<<blocks, threads, 0, stream>>>(buffer.data(), data, D, N, init, main_op, reduce_op);
}
RAFT_CUDA_TRY(cudaPeekAtLastError());

coalescedReductionThin<ThinPolicy>(dots,
Expand Down Expand Up @@ -391,18 +574,16 @@ void coalescedReductionThickDispatcher(OutType* dots,
{
// Note: multiple elements per thread to take advantage of the sequential reduction and loop
// unrolling
if (D < IdxType(32768)) {
coalescedReductionThick<ReductionThickPolicy<256, 32>, ReductionThinPolicy<32, 128, 1>>(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
} else {
coalescedReductionThick<ReductionThickPolicy<256, 64>, ReductionThinPolicy<32, 128, 1>>(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
}
coalescedReductionThick<ReductionThickPolicy<256, 64>, ReductionThinPolicy<32, 128, 1>>(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
}

// Primitive to perform reductions along the coalesced dimension of the matrix, i.e. reduce along
// rows for row major or reduce along columns for column major layout. Can do an inplace reduction
// adding to original values of dots if requested.
// In case of an add-reduction, a compensated summation will be performed in order to reduce
// numerical error. Note that the compensation will only be performed 'per-thread' for performance
// reasons and therefore not be equivalent to a sequential compensation.
template <typename InType,
typename OutType = InType,
typename IdxType = int,
Expand All @@ -422,14 +603,17 @@ void coalescedReduction(OutType* dots,
{
/* The primitive selects one of three implementations based on heuristics:
* - Thin: very efficient when D is small and/or N is large
* At most one warp is processing each row
* - Thick: used when N is very small and D very large
* - Medium: used when N is too small to fill the GPU with the thin kernel
* Multiple blocks (32/64) processing each row
* - Medium: everything in between
* One block is processing each row
*/
const IdxType numSMs = raft::getMultiProcessorCount();
if (D <= IdxType(256) || N >= IdxType(4) * numSMs) {
if (D <= IdxType(512) || (N >= IdxType(16) * numSMs && D < IdxType(2048))) {
coalescedReductionThinDispatcher(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
} else if (N < numSMs && D >= IdxType(16384)) {
} else if (N < numSMs && D >= IdxType(1 << 17)) {
coalescedReductionThickDispatcher(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
} else {
Expand Down
Loading
Loading