Forward-merge branch-23.12 to branch-24.02 #5672

Merged · merged 1 commit into branch-24.02 from branch-23.12 on Nov 28, 2023
31 changes: 31 additions & 0 deletions cpp/include/cuml/linear_model/qn_mg.hpp
@@ -63,6 +63,37 @@ void qnFit(raft::handle_t& handle,
float* f,
int* num_iters);

/**
 * @brief support sparse input (Compressed Sparse Row format) for MNMG logistic regression fit
 * using quasi-Newton methods
* @param[in] handle: the internal cuml handle object
* @param[in] input_values: vector holding non-zero values of all partitions for that rank
* @param[in] input_cols: vector holding column indices of non-zero values of all partitions for
* that rank
* @param[in] input_row_ids: vector holding row pointers of non-zero values of all partitions for
* that rank
* @param[in] X_nnz: the number of non-zero values of that rank
* @param[in] input_desc: PartDescriptor object for the input
* @param[in] labels: labels data
* @param[out] coef: learned coefficients
* @param[in] pams: model parameters
* @param[in] n_classes: number of outputs (number of classes or `1` for regression)
* @param[out] f: host pointer holding the final objective value
* @param[out] num_iters: host pointer holding the actual number of iterations taken
*/
void qnFitSparse(raft::handle_t& handle,
std::vector<Matrix::Data<float>*>& input_values,
int* input_cols,
int* input_row_ids,
int X_nnz,
Matrix::PartDescriptor& input_desc,
std::vector<Matrix::Data<float>*>& labels,
float* coef,
const qn_params& pams,
int n_classes,
float* f,
int* num_iters);

}; // namespace opg
}; // namespace GLM
}; // namespace ML
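For reference, here is a minimal standalone sketch (SciPy, not cuML code) of the CSR pieces the new `qnFitSparse` entry point is documented to take: the non-zero values, their column indices, the row pointers, and the non-zero count. The matrix and names below are illustrative only.

```python
# Illustrative only: the CSR arrays that qnFitSparse consumes, shown with SciPy.
# Nothing here is cuML API; it just spells out what input_values / input_cols /
# input_row_ids / X_nnz correspond to for a small 3x4 matrix.
import numpy as np
from scipy.sparse import csr_matrix

dense = np.array([[1.0, 0.0, 2.0, 0.0],
                  [0.0, 0.0, 3.0, 0.0],
                  [4.0, 5.0, 0.0, 0.0]], dtype=np.float32)
X = csr_matrix(dense)

values  = X.data      # non-zero values        -> input_values  ([1, 2, 3, 4, 5])
cols    = X.indices   # column index per value -> input_cols    ([0, 2, 2, 0, 1])
row_ids = X.indptr    # row pointers            -> input_row_ids ([0, 2, 3, 5])
nnz     = X.nnz       # number of non-zeros     -> X_nnz         (5)
```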
21 changes: 14 additions & 7 deletions cpp/src/glm/qn/mg/glm_base_mg.cuh
Expand Up @@ -16,6 +16,7 @@

#include <raft/core/comms.hpp>
#include <raft/core/handle.hpp>
#include <raft/linalg/add.cuh>
#include <raft/linalg/multiply.cuh>
#include <raft/util/cudart_utils.hpp>

@@ -112,34 +113,42 @@ struct GLMWithDataMG : ML::GLM::detail::GLMWithData<T, GLMObjective> {
T* dev_scalar,
cudaStream_t stream)
{
raft::comms::comms_t const& communicator = raft::resource::get_comms(*(this->handle_p));
SimpleDenseMat<T> W(wFlat.data, this->C, this->dims);
SimpleDenseMat<T> G(gradFlat.data, this->C, this->dims);
SimpleVec<T> lossVal(dev_scalar, 1);

// Ensure the same coefficients on all GPUs
communicator.bcast(wFlat.data, this->C * this->dims, 0, stream);
communicator.sync_stream(stream);

// apply regularization
auto regularizer_obj = this->objective;
auto lossFunc = regularizer_obj->loss;
auto reg = regularizer_obj->reg;
G.fill(0, stream);
float reg_host = 0;
T reg_host = 0;
if (reg->l2_penalty != 0) {
reg->reg_grad(dev_scalar, G, W, lossFunc->fit_intercept, stream);
raft::update_host(&reg_host, dev_scalar, 1, stream);
// note: avoid syncing here because there's a sync before reg_host is used.
raft::resource::sync_stream(*(this->handle_p));
}

// apply linearFwd, getLossAndDz, linearBwd
ML::GLM::detail::linearFwd(
lossFunc->handle, *(this->Z), *(this->X), W); // linear part: forward pass

raft::comms::comms_t const& communicator = raft::resource::get_comms(*(this->handle_p));

lossFunc->getLossAndDZ(dev_scalar, *(this->Z), *(this->y), stream); // loss specific part

// normalize local loss before allreduce sum
T factor = 1.0 * (*this->y).len / this->n_samples;
raft::linalg::multiplyScalar(dev_scalar, dev_scalar, factor, 1, stream);

// Each GPU computes reg_host independently, so the values may show tiny divergence.
// Average reg_host across ranks to avoid the divergence.
T reg_factor = reg_host / this->n_ranks;
raft::linalg::addScalar(dev_scalar, dev_scalar, reg_factor, 1, stream);

communicator.allreduce(dev_scalar, dev_scalar, 1, raft::comms::op_t::SUM, stream);
communicator.sync_stream(stream);

@@ -154,11 +163,9 @@ struct GLMWithDataMG : ML::GLM::detail::GLMWithData<T, GLMObjective> {
communicator.allreduce(G.data, G.data, this->C * this->dims, raft::comms::op_t::SUM, stream);
communicator.sync_stream(stream);

float loss_host;
T loss_host;
raft::update_host(&loss_host, dev_scalar, 1, stream);
raft::resource::sync_stream(*(this->handle_p));
loss_host += reg_host;
lossVal.fill(loss_host + reg_host, stream);

return loss_host;
}
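The aggregation in the hunk above can be checked in isolation. A minimal NumPy sketch, under the assumption that each rank holds one scalar loss value and one identical copy of the regularization term: scaling the local value by n_local / n_total and adding reg / n_ranks before the allreduce(SUM) yields the sample-weighted combination of the local losses plus the regularization term counted exactly once.

```python
# Not cuML code: a NumPy stand-in for the per-rank scaling followed by allreduce(SUM).
import numpy as np

n_total = 10
local_losses = np.array([0.7, 0.4])   # hypothetical per-rank loss values
local_counts = np.array([6, 4])       # per-rank sample counts, sum == n_total
reg = 0.05                            # regularization term, identical on every rank
n_ranks = len(local_losses)

# what each rank contributes to the allreduce(SUM)
per_rank = local_losses * local_counts / n_total + reg / n_ranks
global_value = per_rank.sum()

# the sum counts the regularization term exactly once
assert np.isclose(global_value,
                  (local_losses * local_counts).sum() / n_total + reg)
```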
73 changes: 73 additions & 0 deletions cpp/src/glm/qn_mg.cu
Expand Up @@ -29,6 +29,8 @@
#include <vector>
using namespace MLCommon;

#include <iostream>

namespace ML {
namespace GLM {
namespace opg {
@@ -172,6 +174,77 @@ void qnFit(raft::handle_t& handle,
handle, input_data, input_desc, labels, coef, pams, X_col_major, n_classes, f, num_iters);
}

template <typename T, typename I>
void qnFitSparse_impl(const raft::handle_t& handle,
const qn_params& pams,
T* X_values,
I* X_cols,
I* X_row_ids,
I X_nnz,
T* y,
size_t N,
size_t D,
size_t C,
T* w0,
T* f,
int* num_iters,
size_t n_samples,
int rank,
int n_ranks)
{
auto X_simple = SimpleSparseMat<T>(X_values, X_cols, X_row_ids, X_nnz, N, D);

ML::GLM::opg::qn_fit_x_mg(handle,
pams,
X_simple,
y,
C,
w0,
f,
num_iters,
n_samples,
rank,
n_ranks); // ignore sample_weight, svr_eps
return;
}

void qnFitSparse(raft::handle_t& handle,
std::vector<Matrix::Data<float>*>& input_values,
int* input_cols,
int* input_row_ids,
int X_nnz,
Matrix::PartDescriptor& input_desc,
std::vector<Matrix::Data<float>*>& labels,
float* coef,
const qn_params& pams,
int n_classes,
float* f,
int* num_iters)
{
RAFT_EXPECTS(input_values.size() == 1,
"qn_mg.cu currently does not accept more than one input matrix");

auto data_input_values = input_values[0];
auto data_y = labels[0];

qnFitSparse_impl<float, int>(handle,
pams,
data_input_values->ptr,
input_cols,
input_row_ids,
X_nnz,
data_y->ptr,
input_desc.totalElementsOwnedBy(input_desc.rank),
input_desc.N,
n_classes,
coef,
f,
num_iters,
input_desc.M,
input_desc.rank,
input_desc.uniqueRanks().size());
}

}; // namespace opg
}; // namespace GLM
}; // namespace ML
18 changes: 17 additions & 1 deletion python/cuml/dask/linear_model/logistic_regression.py
Expand Up @@ -21,6 +21,7 @@
from raft_dask.common.comms import get_raft_comm_state
from dask.distributed import get_worker

from cuml.common.sparse_utils import is_sparse, has_scipy
from cuml.dask.common import parts_to_ranks
from cuml.dask.common.input_utils import DistributedDataHandler, concatenate
from raft_dask.common.comms import Comms
@@ -29,7 +30,9 @@
from cuml.internals.safe_imports import gpu_only_import

cp = gpu_only_import("cupy")
cupyx = gpu_only_import("cupyx")
np = cpu_only_import("numpy")
scipy = cpu_only_import("scipy")


class LogisticRegression(LinearRegression):
@@ -172,7 +175,20 @@ def _create_model(sessionId, datatype, **kwargs):

@staticmethod
def _func_fit(f, data, n_rows, n_cols, partsToSizes, rank):
inp_X = concatenate([X for X, _ in data])
if is_sparse(data[0][0]) is False:
inp_X = concatenate([X for X, _ in data])

elif has_scipy() and scipy.sparse.isspmatrix(data[0][0]):
inp_X = scipy.sparse.vstack([X for X, _ in data])

elif cupyx.scipy.sparse.isspmatrix(data[0][0]):
inp_X = cupyx.scipy.sparse.vstack([X for X, _ in data])

else:
raise ValueError(
"input matrix must be dense, scipy sparse, or cupy sparse"
)

inp_y = concatenate([y for _, y in data])
n_ranks = max([p[0] for p in partsToSizes]) + 1
aggregated_partsToSizes = [[i, 0] for i in range(n_ranks)]
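The three-way dispatch added to `_func_fit` (dense concatenate, SciPy vstack, cupyx vstack) can be illustrated outside of Dask. A rough sketch with SciPy only; `stack_partitions` is our name, not cuML's, and the cupyx branch has the same shape using `cupyx.scipy.sparse.vstack`.

```python
# Standalone illustration of the partition-stacking logic, SciPy path only.
import numpy as np
import scipy.sparse

def stack_partitions(parts):
    first = parts[0]
    if not scipy.sparse.isspmatrix(first):
        return np.concatenate(parts)     # dense partitions are concatenated
    return scipy.sparse.vstack(parts)    # sparse partitions keep the CSR layout

a = scipy.sparse.random(2, 4, density=0.5, format="csr")
b = scipy.sparse.random(3, 4, density=0.5, format="csr")
stacked = stack_partitions([a, b])
assert stacked.shape == (5, 4)
```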
55 changes: 45 additions & 10 deletions python/cuml/linear_model/base_mg.pyx
@@ -30,6 +30,9 @@ from cuml.common.opg_data_utils_mg cimport *
from cuml.internals.input_utils import input_to_cuml_array
from cuml.decomposition.utils cimport *

from cuml.common.sparse_utils import is_sparse
from cuml.internals.array_sparse import SparseCumlArray


class MGFitMixin(object):

@@ -45,8 +48,10 @@ class MGFitMixin(object):
:param partsToSizes: array of tuples in the format: [(rank,size)]
:return: self
"""

self._set_output_type(input_data[0][0])
self._set_n_features_in(n_cols)
sparse_input = is_sparse(input_data[0][0])

X_arys = []
y_arys = []
@@ -57,8 +62,14 @@
else:
check_dtype = self.dtype

X_m, _, self.n_cols, _ = \
input_to_cuml_array(input_data[i][0], check_dtype=check_dtype, order=order)
if sparse_input:

X_m = SparseCumlArray(input_data[i][0], convert_index=np.int32)
_, self.n_cols = X_m.shape
else:
X_m, _, self.n_cols, _ = \
input_to_cuml_array(input_data[i][0], check_dtype=check_dtype, order=order)

X_arys.append(X_m)

if i == 0:
@@ -81,18 +92,42 @@
rank_to_sizes,
rank)

cdef uintptr_t X_arg = opg.build_data_t(X_arys)
cdef uintptr_t X_arg
cdef uintptr_t y_arg = opg.build_data_t(y_arys)

# call inheriting class _fit that does all cython pointers and calls
self._fit(X=X_arg,
y=y_arg,
coef_ptr=coef_ptr_arg,
input_desc=part_desc)
cdef uintptr_t X_cols
cdef uintptr_t X_row_ids

if sparse_input is False:

X_arg = opg.build_data_t(X_arys)

# call inheriting class _fit that does all cython pointers and calls
self._fit(X=X_arg,
y=y_arg,
coef_ptr=coef_ptr_arg,
input_desc=part_desc)

opg.free_data_t(X_arg, self.dtype)

else:

assert len(X_arys) == 1, "does not support more than one sparse input matrix"
X_arg = opg.build_data_t([x.data for x in X_arys])
X_cols = X_arys[0].indices.ptr
X_row_ids = X_arys[0].indptr.ptr
X_nnz = sum([x.nnz for x in X_arys])

# call inheriting class _fit that does all cython pointers and calls
self._fit(X=[X_arg, X_cols, X_row_ids, X_nnz],
y=y_arg,
coef_ptr=coef_ptr_arg,
input_desc=part_desc)

for ary in X_arys:
del ary

opg.free_rank_size_pair(rank_to_sizes)
opg.free_part_descriptor(part_desc)
opg.free_data_t(X_arg, self.dtype)
opg.free_data_t(y_arg, self.dtype)

return self
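For context on the sparse branch above: a SparseCumlArray wraps a CSR matrix, and `_fit` receives its values, column indices, and row pointers as device pointers plus the nnz count. A rough CuPy-only illustration of those three arrays for a small CSR matrix (nothing here touches cuML internals):

```python
# Illustrative only: the three device arrays behind a CSR matrix in CuPy.
import cupy as cp
import cupyx.scipy.sparse as cpsp

X = cpsp.csr_matrix(cp.array([[0.0, 1.0, 0.0],
                              [2.0, 0.0, 3.0]], dtype=cp.float32))

values_ptr  = X.data.data.ptr     # device pointer to the non-zero values
cols_ptr    = X.indices.data.ptr  # device pointer to the column indices
row_ids_ptr = X.indptr.data.ptr   # device pointer to the row pointers
nnz         = X.nnz               # 3 in this example
```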