Skip to content

Commit

Permalink
Change rank_count check
Browse files Browse the repository at this point in the history
  • Loading branch information
olegkkruglov committed Aug 28, 2024
1 parent fe90a18 commit 43e79d5
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,21 @@ result_t finalize_compute_kernel_dense_impl<Float>::operator()(const descriptor_
const auto nobs_nd = pr::table2ndarray_1d<Float>(q, input.get_partial_n_rows());

auto rows_count_global = nobs_nd.get_data()[0];
auto is_distributed = (comm_.get_rank_count() > 1);
{
ONEDAL_PROFILER_TASK(allreduce_rows_count_global);
comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait();
if (is_distributed) {
comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait();
}
}
if (res_op.test(result_options::min)) {
ONEDAL_ASSERT(input.get_partial_min().get_column_count() == column_count);
const auto min =
pr::table2ndarray_1d<Float>(q, input.get_partial_min(), sycl::usm::alloc::device);

{ comm_.allreduce(min.flatten(q, {}), spmd::reduce_op::min).wait(); }
if (is_distributed) {
comm_.allreduce(min.flatten(q, {}), spmd::reduce_op::min).wait();
}
res.set_min(homogen_table::wrap(min.flatten(q, {}), 1, column_count));
}

Expand All @@ -178,46 +183,46 @@ result_t finalize_compute_kernel_dense_impl<Float>::operator()(const descriptor_
if (res_op_partial.test(result_options::sum)) {
auto sums_nd =
pr::table2ndarray_1d<Float>(q, input.get_partial_sum(), sycl::usm::alloc::device);
if (comm_.get_rank_count() > 1) {
auto sums2_nd = pr::table2ndarray_1d<Float>(q,
input.get_partial_sum_squares(),
sycl::usm::alloc::device);

auto sums2cent_nd = pr::table2ndarray_1d<Float>(q,
input.get_partial_sum_squares_centered(),
sycl::usm::alloc::device);
if (is_distributed) {
auto sums_nd_copy =
pr::ndarray<Float, 1>::empty(q, { column_count }, sycl::usm::alloc::device);
auto copy_event = copy(q, sums_nd_copy, sums_nd, {});
copy_event.wait_and_throw();
sums_nd = sums_nd_copy;
}

{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums_nd.flatten(q, {}), spmd::reduce_op::sum).wait();
}
auto sums2_nd = pr::table2ndarray_1d<Float>(q,
input.get_partial_sum_squares(),
sycl::usm::alloc::device);
if (comm_.get_rank_count() > 1) {
{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums_nd.flatten(q, {}), spmd::reduce_op::sum).wait();
}

auto sums2_nd_copy =
pr::ndarray<Float, 1>::empty(q, { column_count }, sycl::usm::alloc::device);
auto copy_event = copy(q, sums2_nd_copy, sums2_nd, {});
copy_event = copy(q, sums2_nd_copy, sums2_nd, {});
copy_event.wait_and_throw();
sums2_nd = sums2_nd_copy;
}
{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums2_nd.flatten(q, {}), spmd::reduce_op::sum).wait();
}
auto sums2cent_nd = pr::table2ndarray_1d<Float>(q,
input.get_partial_sum_squares_centered(),
sycl::usm::alloc::device);
if (comm_.get_rank_count() > 1) {

{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums2_nd.flatten(q, {}), spmd::reduce_op::sum).wait();
}
auto sums2cent_nd_copy =
pr::ndarray<Float, 1>::empty(q, { column_count }, sycl::usm::alloc::device);
auto copy_event = copy(q, sums2cent_nd_copy, sums2cent_nd, {});
copy_event = copy(q, sums2cent_nd_copy, sums2cent_nd, {});
copy_event.wait_and_throw();
sums2cent_nd = sums2cent_nd_copy;
{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums2cent_nd.flatten(q, {}), spmd::reduce_op::sum).wait();
}
}
{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums2cent_nd.flatten(q, {}), spmd::reduce_op::sum).wait();
}

auto [result_means,
result_variance,
result_raw_moment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,41 +66,38 @@ result_t finalize_compute_kernel_dense_impl<Float>::operator()(const descriptor_

const auto nobs_host = pr::table2ndarray<Float>(q, input.get_partial_n_rows());
auto rows_count_global = nobs_host.get_data()[0];
{
ONEDAL_PROFILER_TASK(allreduce_rows_count_global);
comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait();
}

ONEDAL_ASSERT(rows_count_global > 0);

auto sums = pr::table2ndarray_1d<Float>(q, input.get_partial_sum(), sycl::usm::alloc::device);
auto xtx =
pr::table2ndarray<Float>(q, input.get_partial_crossproduct(), sycl::usm::alloc::device);

if (comm_.get_rank_count() > 1) {
{
ONEDAL_PROFILER_TASK(allreduce_rows_count_global);
comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait();
}
auto sums_copy =
pr::ndarray<Float, 1>::empty(q, { column_count }, sycl::usm::alloc::device);
auto copy_event = copy(q, sums_copy, sums, {});
copy_event.wait_and_throw();
sums = sums_copy;
}
{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums.flatten(q, {}), spmd::reduce_op::sum).wait();
}
{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums.flatten(q, {}), spmd::reduce_op::sum).wait();
}

auto xtx =
pr::table2ndarray<Float>(q, input.get_partial_crossproduct(), sycl::usm::alloc::device);
if (comm_.get_rank_count() > 1) {
auto xtx_copy = pr::ndarray<Float, 2>::empty(q,
{ column_count, column_count },
sycl::usm::alloc::device);
auto copy_event = copy(q, xtx_copy, xtx, {});
copy_event = copy(q, xtx_copy, xtx, {});
copy_event.wait_and_throw();
xtx = xtx_copy;
{
ONEDAL_PROFILER_TASK(allreduce_xtx, q);
comm_.allreduce(xtx.flatten(q, {}), spmd::reduce_op::sum).wait();
}
}

{
ONEDAL_PROFILER_TASK(allreduce_xtx, q);
comm_.allreduce(xtx.flatten(q, {}), spmd::reduce_op::sum).wait();
}
ONEDAL_ASSERT(rows_count_global > 0);

if (desc.get_result_options().test(result_options::cov_matrix)) {
auto [cov, cov_event] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,29 @@ train_result<Task> finalize_train_kernel_norm_eq_impl<Float, Task>::operator()(
const pr::ndshape<2> betas_shape{ response_count, feature_count + 1 };

auto xtx_nd = pr::table2ndarray<Float>(q, input.get_partial_xtx(), sycl::usm::alloc::device);
if (comm_.get_rank_count() > 1) {
auto xtx_nd_copy = pr::ndarray<Float, 2>::empty(q, xtx_shape, sycl::usm::alloc::device);
auto copy_event = copy(q, xtx_nd_copy, xtx_nd, {});
copy_event.wait_and_throw();
xtx_nd = xtx_nd_copy;
}
auto xty_nd = pr::table2ndarray<Float, pr::ndorder::f>(q,
input.get_partial_xty(),
sycl::usm::alloc::device);
if (comm_.get_rank_count() > 1) {
auto xty_nd_copy =
pr::ndarray<Float, 2, pr::ndorder::f>::empty(q, betas_shape, sycl::usm::alloc::device);
auto copy_event = copy(q, xty_nd_copy, xty_nd, {});
copy_event.wait_and_throw();
xty_nd = xty_nd_copy;
}

const auto betas_size = check_mul_overflow(response_count, feature_count + 1);
auto betas_arr = array<Float>::zeros(q, betas_size, alloc);

if (comm_.get_rank_count() > 1) {
auto xtx_nd_copy = pr::ndarray<Float, 2>::empty(q, xtx_shape, sycl::usm::alloc::device);
auto copy_event = copy(q, xtx_nd_copy, xtx_nd, {});
copy_event.wait_and_throw();
xtx_nd = xtx_nd_copy;
{
ONEDAL_PROFILER_TASK(xtx_allreduce);
auto xtx_arr =
dal::array<Float>::wrap(q, xtx_nd.get_mutable_data(), xtx_nd.get_count());
comm_.allreduce(xtx_arr).wait();
}
auto xty_nd_copy =
pr::ndarray<Float, 2, pr::ndorder::f>::empty(q, betas_shape, sycl::usm::alloc::device);
copy_event = copy(q, xty_nd_copy, xty_nd, {});
copy_event.wait_and_throw();
xty_nd = xty_nd_copy;
{
ONEDAL_PROFILER_TASK(xty_allreduce);
auto xty_arr =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,43 +59,41 @@ result_t finalize_train_kernel_cov_impl<Float>::operator()(const descriptor_t& d

const auto nobs_host = pr::table2ndarray<Float>(q, input.get_partial_n_rows());
auto rows_count_global = nobs_host.get_data()[0];
{
ONEDAL_PROFILER_TASK(allreduce_rows_count_global);
comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait();
}

auto sums = pr::table2ndarray_1d<Float>(q, input.get_partial_sum(), sycl::usm::alloc::device);
auto xtx =
pr::table2ndarray<Float>(q, input.get_partial_crossproduct(), sycl::usm::alloc::device);
if (comm_.get_rank_count() > 1) {
{
ONEDAL_PROFILER_TASK(allreduce_rows_count_global);
comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait();
}
auto sums_copy =
pr::ndarray<Float, 1>::empty(q, { column_count }, sycl::usm::alloc::device);
auto copy_event = copy(q, sums_copy, sums, {});
copy_event.wait_and_throw();
sums = sums_copy;
}

{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums.flatten(q, {}), spmd::reduce_op::sum).wait();
}

if (desc.get_result_options().test(result_options::means)) {
auto [means, means_event] = compute_means(q, sums, rows_count_global, {});
result.set_means(homogen_table::wrap(means.flatten(q, { means_event }), 1, column_count));
}

auto xtx =
pr::table2ndarray<Float>(q, input.get_partial_crossproduct(), sycl::usm::alloc::device);
if (comm_.get_rank_count() > 1) {
auto xtx_copy = pr::ndarray<Float, 2>::empty(q,
{ column_count, column_count },
sycl::usm::alloc::device);
auto copy_event = copy(q, xtx_copy, xtx, {});
copy_event = copy(q, xtx_copy, xtx, {});
copy_event.wait_and_throw();
xtx = xtx_copy;

{
ONEDAL_PROFILER_TASK(allreduce_sums, q);
comm_.allreduce(sums.flatten(q, {}), spmd::reduce_op::sum).wait();
}

{
ONEDAL_PROFILER_TASK(allreduce_xtx, q);
comm_.allreduce(xtx.flatten(q, {}), spmd::reduce_op::sum).wait();
}
}
{
ONEDAL_PROFILER_TASK(allreduce_xtx, q);
comm_.allreduce(xtx.flatten(q, {}), spmd::reduce_op::sum).wait();

if (desc.get_result_options().test(result_options::means)) {
auto [means, means_event] = compute_means(q, sums, rows_count_global, {});
result.set_means(homogen_table::wrap(means.flatten(q, { means_event }), 1, column_count));
}
auto [cov, cov_event] = compute_covariance(q, rows_count_global, xtx, sums, {});

Expand Down

0 comments on commit 43e79d5

Please sign in to comment.