diff --git a/cpp/include/kvikio/parallel_operation.hpp b/cpp/include/kvikio/parallel_operation.hpp index b69f67b37a..9efc4053c2 100644 --- a/cpp/include/kvikio/parallel_operation.hpp +++ b/cpp/include/kvikio/parallel_operation.hpp @@ -119,30 +119,34 @@ std::future parallel_io(F op, return detail::submit_task(op, buf, size, file_offset, devPtr_offset); } - // We know an upper bound of the total number of tasks - std::vector> tasks; - tasks.reserve(size / task_size + 2); - - // 1) Submit `task_size` sized tasks - while (size >= task_size) { - tasks.push_back(detail::submit_task(op, buf, task_size, file_offset, devPtr_offset)); + std::vector> tasks_before_last; + auto const num_full_tasks = size / task_size; + auto const remaining_bytes = size - num_full_tasks * task_size; + auto num_tasks{num_full_tasks}; + if (remaining_bytes != 0) { ++num_tasks; } + tasks_before_last.reserve(num_tasks - 1); + + // 1) Submit all tasks before the last one. These are all `task_size` sized tasks. + for (std::size_t i = 0; i < num_tasks - 1; ++i) { + tasks_before_last.push_back( + detail::submit_task(op, buf, task_size, file_offset, devPtr_offset)); file_offset += task_size; devPtr_offset += task_size; size -= task_size; } - // 2) Submit a task for the remainder - if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); } + // 2) Submit the last task + auto last_task_size = (remaining_bytes == 0) ? task_size : remaining_bytes; - // Finally, we sum the result of all tasks. - auto gather_tasks = [tasks = std::move(tasks)]() mutable -> std::size_t { - std::size_t ret = 0; - for (auto& task : tasks) { + auto last_task = [=, tasks_before_last = std::move(tasks_before_last)]() mutable -> std::size_t { + std::size_t ret = op(buf, last_task_size, file_offset, devPtr_offset); + for (auto& task : tasks_before_last) { ret += task.get(); } return ret; }; - return detail::submit_move_only_task(std::move(gather_tasks)); + + return detail::submit_move_only_task(std::move(last_task)); } } // namespace kvikio