Skip to content

Commit

Permalink
Improve parallel i/o task performance
Browse files Browse the repository at this point in the history
  • Loading branch information
kingcrimsontianyu committed Feb 13, 2025
1 parent 1691bd0 commit 80f67dc
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions cpp/include/kvikio/parallel_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,30 +119,34 @@ std::future<std::size_t> parallel_io(F op,
return detail::submit_task(op, buf, size, file_offset, devPtr_offset);
}

// We know an upper bound of the total number of tasks
std::vector<std::future<std::size_t>> tasks;
tasks.reserve(size / task_size + 2);

// 1) Submit `task_size` sized tasks
while (size >= task_size) {
tasks.push_back(detail::submit_task(op, buf, task_size, file_offset, devPtr_offset));
std::vector<std::future<std::size_t>> tasks_before_last;
auto const num_full_tasks = size / task_size;
auto const remaining_bytes = size - num_full_tasks * task_size;
auto num_tasks{num_full_tasks};
if (remaining_bytes != 0) { ++num_tasks; }
tasks_before_last.reserve(num_tasks - 1);

// 1) Submit all tasks before the last one. Each of these is a `task_size`-sized task.
for (std::size_t i = 0; i < num_tasks - 1; ++i) {
tasks_before_last.push_back(
detail::submit_task(op, buf, task_size, file_offset, devPtr_offset));
file_offset += task_size;
devPtr_offset += task_size;
size -= task_size;
}

// 2) Submit a task for the remainder
if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); }
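// 2) Submit the last task. It performs the final I/O operation itself and then waits for all
// the previously submitted tasks, summing their results into the returned total.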
auto last_task_size = (remaining_bytes == 0) ? task_size : remaining_bytes;

// Finally, we sum the result of all tasks.
auto gather_tasks = [tasks = std::move(tasks)]() mutable -> std::size_t {
std::size_t ret = 0;
for (auto& task : tasks) {
auto last_task = [=, tasks_before_last = std::move(tasks_before_last)]() mutable -> std::size_t {
std::size_t ret = op(buf, last_task_size, file_offset, devPtr_offset);
for (auto& task : tasks_before_last) {
ret += task.get();
}
return ret;
};
return detail::submit_move_only_task(std::move(gather_tasks));

return detail::submit_move_only_task(std::move(last_task));
}

} // namespace kvikio

0 comments on commit 80f67dc

Please sign in to comment.