Skip to content

Commit

Permalink
add parallel_range_blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzziqersoftware committed Nov 3, 2024
1 parent d5fb435 commit 4aaecf1
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 55 deletions.
107 changes: 91 additions & 16 deletions src/Tools.hh
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,6 @@ inline CallOnDestroy on_close_scope(std::function<void()> f) {
return CallOnDestroy(std::move(f));
}

// Worker loop for parallel_range. Each iteration atomically takes the next
// value from current_value and passes it to fn along with this worker's
// thread_num. The worker returns once the value it takes is not less than
// end_value. When fn returns true, the matching value is stored in
// result_value and current_value is set to end_value, so subsequent takes by
// any worker end their loops.
template <typename IntT>
void parallel_range_thread_fn(
    std::function<bool(IntT, size_t thread_num)>& fn,
    std::atomic<IntT>& current_value,
    std::atomic<IntT>& result_value,
    IntT end_value,
    size_t thread_num) {
  for (;;) {
    const IntT candidate = current_value.fetch_add(1);
    if (!(candidate < end_value)) {
      return;
    }
    if (!fn(candidate, thread_num)) {
      continue;
    }
    // Publish the match, then mark the range as exhausted.
    result_value.store(candidate);
    current_value.store(end_value);
  }
}

template <typename IntT>
void parallel_range_default_progress_fn(IntT start_value, IntT end_value, IntT current_value, uint64_t start_time) {
std::string format_str = "... %08";
Expand All @@ -66,6 +50,22 @@ void parallel_range_default_progress_fn(IntT start_value, IntT end_value, IntT c
elapsed_str.c_str(), remaining_str.c_str());
}

// Worker loop for parallel_range. Each iteration atomically claims the next
// unclaimed value from current_value and passes it to fn along with this
// worker's thread_num. The worker returns once current_value has reached
// end_value. When fn returns true, the matching value is stored in
// result_value and current_value is set to end_value so that no worker claims
// any further values.
//
// Values are claimed with compare-exchange rather than fetch_add so that
// current_value never advances past end_value. With fetch_add, every worker
// increments the counter once more after the range is exhausted; if end_value
// is at (or within num_threads of) the maximum of IntT, the counter wraps
// around and later workers start processing the range again.
template <typename IntT>
void parallel_range_thread_fn(
    std::function<bool(IntT, size_t thread_num)>& fn,
    std::atomic<IntT>& current_value,
    std::atomic<IntT>& result_value,
    IntT end_value,
    size_t thread_num) {
  IntT v = current_value.load();
  while (v < end_value) {
    // v < end_value here, so v + 1 cannot overflow IntT. If the exchange fails,
    // v has been reloaded with the current counter value; just try again.
    if (!current_value.compare_exchange_weak(v, static_cast<IntT>(v + 1))) {
      continue;
    }
    if (fn(v, thread_num)) {
      result_value = v;
      current_value = end_value;
    }
    v = current_value.load();
  }
}

// This function runs a function in parallel, using the specified number of
// threads. If the thread count is 0, the function uses the same number of
// threads as there are CPU cores in the system. If any instance of the callback
Expand Down Expand Up @@ -114,4 +114,79 @@ IntT parallel_range(
return result_value;
}

// Worker loop for parallel_range_blocks. Each iteration atomically claims the
// next block of up to block_size consecutive values from current_value and
// calls fn on each value in the block, passing this worker's thread_num. The
// worker returns once current_value has reached end_value. When fn returns
// true, the matching value is stored in result_value, current_value is set to
// end_value so that no worker claims any further blocks, and the rest of the
// current block is skipped. Blocks already claimed by other workers are still
// processed to completion.
//
// Blocks are claimed with compare-exchange rather than fetch_add so that
// current_value never advances past end_value. With fetch_add, every worker
// adds block_size once more after the range is exhausted, which can wrap the
// counter around when end_value is near the maximum of IntT and cause the
// range to be processed again. The final block is also clamped to end_value,
// and a non-positive block_size is treated as 1 so that the loop always makes
// progress.
template <typename IntT>
void parallel_range_blocks_thread_fn(
    std::function<bool(IntT, size_t thread_num)>& fn,
    std::atomic<IntT>& current_value,
    std::atomic<IntT>& result_value,
    IntT end_value,
    IntT block_size,
    size_t thread_num) {
  const IntT step = (block_size > 0) ? block_size : static_cast<IntT>(1);

  IntT block_start = current_value.load();
  while (block_start < end_value) {
    // Never claim past end_value; this also keeps block_end from overflowing.
    const IntT remaining = static_cast<IntT>(end_value - block_start);
    const IntT block_end = static_cast<IntT>(block_start + ((remaining < step) ? remaining : step));
    // If the exchange fails, block_start has been reloaded with the current
    // counter value; re-check the loop condition and try again.
    if (!current_value.compare_exchange_weak(block_start, block_end)) {
      continue;
    }
    for (IntT z = block_start; z < block_end; z++) {
      if (fn(z, thread_num)) {
        result_value = z;
        current_value = end_value;
        break;
      }
    }
    block_start = current_value.load();
  }
}

// Like parallel_range, but faster due to fewer atomic memory operations: each
// worker thread claims block_size consecutive values at a time instead of one.
// fn is called with each value and the number of the calling thread. If any
// call returns true, this function returns one of the values for which fn
// returned true; otherwise it returns end_value. After a call returns true,
// workers stop claiming new blocks, but blocks already claimed by other
// workers are still processed to completion.
//
// If num_threads is 0, the number of threads reported by
// std::thread::hardware_concurrency() is used, or a single thread if that
// reports 0. If progress_fn is not null, it is called on the calling thread
// about once per second until all blocks have been claimed.
//
// block_size must be positive and must evenly divide the input range
// (std::logic_error is thrown otherwise), but it is not required that
// start_value or end_value themselves be multiples of block_size. For example,
// the following are both valid inputs:
//   start_value = 0x00000000, end_value = 0x10000000, block_size = 0x1000
//   start_value = 0x47F92AC2, end_value = 0x67F92AC2, block_size = 0x10000
// However, this would be invalid:
//   start_value = 0x0000004F, end_value = 0x10000059, block_size = 0x10000
template <typename IntT = uint64_t>
IntT parallel_range_blocks(
    std::function<bool(IntT value, size_t thread_num)> fn,
    IntT start_value,
    IntT end_value,
    IntT block_size,
    size_t num_threads = 0,
    std::function<void(IntT start_value, IntT end_value, IntT current_value, uint64_t start_time_usecs)> progress_fn = parallel_range_default_progress_fn<IntT>) {
  // Check this first: a zero block_size would make the modulo below undefined.
  if (!(block_size > 0)) {
    throw std::logic_error("block_size must be positive");
  }
  if ((end_value - start_value) % block_size) {
    throw std::logic_error("block_size must evenly divide the entire range");
  }

  if (num_threads == 0) {
    num_threads = std::thread::hardware_concurrency();
    // hardware_concurrency may return 0 if the value cannot be determined;
    // without this fallback no workers would be started at all.
    if (num_threads == 0) {
      num_threads = 1;
    }
  }

  std::atomic<IntT> current_value(start_value);
  std::atomic<IntT> result_value(end_value);
  std::vector<std::thread> threads;
  threads.reserve(num_threads);
  try {
    while (threads.size() < num_threads) {
      threads.emplace_back(
          parallel_range_blocks_thread_fn<IntT>,
          std::ref(fn),
          std::ref(current_value),
          std::ref(result_value),
          end_value,
          block_size,
          threads.size());
    }

    if (progress_fn != nullptr) {
      uint64_t start_time = now();
      IntT progress_current_value;
      while ((progress_current_value = current_value.load()) < end_value) {
        progress_fn(start_value, end_value, progress_current_value, start_time);
        usleep(1000000);
      }
    }
  } catch (...) {
    // Destroying a joinable std::thread calls std::terminate, so if thread
    // creation or progress_fn throws, tell the workers to stop claiming blocks
    // and wait for them before propagating the exception.
    current_value = end_value;
    for (auto& t : threads) {
      if (t.joinable()) {
        t.join();
      }
    }
    throw;
  }

  for (auto& t : threads) {
    t.join();
  }

  return result_value.load();
}

} // namespace phosg
130 changes: 91 additions & 39 deletions src/ToolsTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,105 @@ using namespace std;
using namespace phosg;

int main(int, char**) {
printf("-- on_close_scope\n");
bool called = false;
{
auto g1 = on_close_scope([&]() {
called = true;
});
expect(!called);
printf("-- on_close_scope\n");
bool called = false;
{
auto g1 = on_close_scope([&]() {
called = true;
});
expect(!called);
}
expect(called);
}
expect(called);

const size_t num_threads = thread::hardware_concurrency();

printf("-- parallel_range\n");
vector<uint8_t> hits(0x1000000, 0);
auto handle_value = [&](uint64_t v, size_t thread_num) -> bool {
hits[v] = thread_num + 1;
return false;
};
parallel_range<uint64_t>(handle_value, 0, hits.size(), num_threads, nullptr);

vector<size_t> thread_counts(num_threads, 0);
for (size_t x = 0; x < hits.size(); x++) {
expect_ne(hits[x], 0);
thread_counts.at(hits[x] - 1)++;
{
printf("-- parallel_range\n");
vector<uint8_t> hits(0x1000000, 0);
auto handle_value = [&](uint64_t v, size_t thread_num) -> bool {
hits[v] = thread_num + 1;
return false;
};
uint64_t start_time = now();
parallel_range<uint64_t>(handle_value, 0, hits.size(), num_threads, nullptr);
uint64_t duration = now() - start_time;
fprintf(stderr, "---- time: %" PRIu64 "\n", duration);

vector<size_t> thread_counts(num_threads, 0);
for (size_t x = 0; x < hits.size(); x++) {
expect_ne(hits[x], 0);
thread_counts.at(hits[x] - 1)++;
}

size_t sum = 0;
for (size_t x = 0; x < thread_counts.size(); x++) {
expect_ne(thread_counts[x], 0);
fprintf(stderr, "---- thread %zu: %zu\n", x, thread_counts[x]);
sum += thread_counts[x];
}
expect_eq(sum, hits.size());
}

{
printf("-- parallel_range_blocks\n");
vector<uint8_t> hits(0x1000000, 0);
auto handle_value = [&](uint64_t v, size_t thread_num) -> bool {
hits[v] = thread_num + 1;
return false;
};
uint64_t start_time = now();
parallel_range_blocks<uint64_t>(handle_value, 0, hits.size(), 0x1000, num_threads, nullptr);
uint64_t duration = now() - start_time;
fprintf(stderr, "---- time: %" PRIu64 "\n", duration);

vector<size_t> thread_counts(num_threads, 0);
for (size_t x = 0; x < hits.size(); x++) {
expect_ne(hits[x], 0);
thread_counts.at(hits[x] - 1)++;
}

size_t sum = 0;
for (size_t x = 0; x < thread_counts.size(); x++) {
expect_ne(thread_counts[x], 0);
fprintf(stderr, "---- thread %zu: %zu\n", x, thread_counts[x]);
sum += thread_counts[x];
}
expect_eq(sum, hits.size());
}

{
printf("-- parallel_range return value\n");
uint64_t target_value = 0xC349;
auto is_equal = [&](uint64_t v, size_t) -> bool {
return (v == target_value);
};
expect_eq((parallel_range<uint64_t>(is_equal, 0, 0x10000, num_threads, nullptr)), target_value);
// Note: We can't check that parallel_range ends early when fn returns true
// because it's not actually guaranteed to do so - it's only guaranteed to
// return any of the values for which fn returns true. One could imagine a sequence of events in
// which the target value's call takes a very long time, and all other threads
// could finish checking all other values before the target one returns true.
target_value = 0xCC349; // > end_value; should not be found
expect_eq((parallel_range<uint64_t>(is_equal, 0, 0x10000, num_threads, nullptr)), 0x10000);
}

size_t sum = 0;
for (size_t x = 0; x < thread_counts.size(); x++) {
expect_ne(thread_counts[x], 0);
fprintf(stderr, "---- thread %zu: %zu\n", x, thread_counts[x]);
sum += thread_counts[x];
{
printf("-- parallel_range_blocks return value\n");
uint64_t target_value = 0xC349;
auto is_equal = [&](uint64_t v, size_t) -> bool {
return (v == target_value);
};
expect_eq((parallel_range_blocks<uint64_t>(is_equal, 0, 0x100000, 0x1000, num_threads, nullptr)), target_value);
// Note: We can't check that parallel_range_blocks ends early when fn returns true
// because it's not actually guaranteed to do so - it's only guaranteed to
// return any of the values for which fn returns true. One could imagine a sequence of events in
// which the target value's call takes a very long time, and all other threads
// could finish checking all other values before the target one returns true.
target_value = 0xCCC349; // > end_value; should not be found
expect_eq((parallel_range_blocks<uint64_t>(is_equal, 0, 0x100000, 0x1000, num_threads, nullptr)), 0x100000);
}
expect_eq(sum, hits.size());

printf("-- parallel_range return value\n");
uint64_t target_value = 0xC349;
auto is_equal = [&](uint64_t v, size_t) -> bool {
return (v == target_value);
};
expect_eq((parallel_range<uint64_t>(is_equal, 0, 0x10000, num_threads, nullptr)), target_value);
// Note: We can't check that parallel_range ends early when fn returns true
// because it's not actually guaranteed to do so - it's only guaranteed to
// return any of the values for which fn returns true. One could imagine a sequence of events in
// which the target value's call takes a very long time, and all other threads
// could finish checking all other values before the target one returns true.
target_value = 0xCC349; // > end_value; should not be found
expect_eq((parallel_range<uint64_t>(is_equal, 0, 0x10000, num_threads, nullptr)), 0x10000);

printf("ToolsTest: all tests passed\n");
return 0;
Expand Down

0 comments on commit 4aaecf1

Please sign in to comment.