Add a benchmark for CBTree concurrent writes.
Before updating CBTree for ARM (where it is misbehaving currently),
we should have a proper test for two scenarios:

+ Writing on multiple threads.
+ Reading on multiple threads while there are also active writes.

If read threads wait for values to be inserted, it defeats the purpose
of benchmarking. Therefore, we should first populate a tree with
values for the read threads. The read threads will then read values
that are already in the tree, while the write threads continue to insert
new values.

Setting up the tree for the second scenario essentially involves
performing the first scenario. This is why both scenarios are combined
into a single test.
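
The resulting phase structure looks roughly like this (a simplified
sketch of the test added below, with barrier names abbreviated; flags,
key generation, and error handling omitted):

    Barrier start_write(num_writers + 1), finish_write(num_writers + 1);
    Barrier start_rw(num_threads + 1), finish_rw(num_threads + 1);

    // Writers: insert the first half of the keys, then the second half.
    // Readers: read back the first half while the second half is written.

    LOG_TIMING(INFO, "write-only phase") {
      start_write.Wait();   // release the writer threads
      finish_write.Wait();  // first half of the keys is now inserted
    }
    LOG_TIMING(INFO, "read/write phase") {
      start_rw.Wait();      // release writers and readers together
      finish_rw.Wait();     // both writes and reads are done
    }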

The new test provides the following new features (compared to just
running DoTestConcurrentInsert with higher parameters):

+ A value is read by a different thread than the one that inserted it.
+ Reader threads aren't tied to any particular writer thread.
+ Keys are better distributed than the previous shuffle method.
+ Allows measuring read-heavy performance (with a flag).

Reading threads start concurrently with writing threads, rather than only
after each write thread has finished (unlike DoTestConcurrentInsert).

Note that running only concurrent reads should not differ from
TestScanPerformance, since no locking takes place and readers do not
interfere with one another. So no new test is required for that scenario.

Change-Id: I1b0b16e269c70716962fc5ebb4ddca1e2cbe68a4
Reviewed-on: http://gerrit.cloudera.org:8080/21447
Reviewed-by: Zoltan Chovan <[email protected]>
Reviewed-by: Ashwani Raina <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
(cherry picked from commit f4a47fe)
Reviewed-on: http://gerrit.cloudera.org:8080/21985
martonka authored and alexeyserbin committed Oct 29, 2024
1 parent bb469cb commit 8d0e398
Showing 1 changed file with 197 additions and 0 deletions.
197 changes: 197 additions & 0 deletions src/kudu/tablet/cbtree-test.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <array>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
@@ -25,6 +27,7 @@
#include <unordered_set>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

@@ -36,7 +39,9 @@
#include "kudu/util/debug/sanitizer_scopes.h"
#include "kudu/util/faststring.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/memory/memory.h"
#include "kudu/util/memory/overwrite.h"
#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
@@ -49,6 +54,17 @@ using std::unordered_set;
using std::vector;
using strings::Substitute;

DEFINE_int32(concurrent_rw_benchmark_num_writer_threads, 4,
"Number of writer threads in TestConcurrentReadWritePerformance");
DEFINE_int32(concurrent_rw_benchmark_num_reader_threads, 4,
"Number of reader threads in TestConcurrentReadWritePerformance");
DEFINE_int32(concurrent_rw_benchmark_num_inserts, 1000000,
"Number of inserts in TestConcurrentReadWritePerformance");
// Boosting the amount of reads might be needed because reads are significantly faster
// than writes.
DEFINE_int32(concurrent_rw_benchmark_reader_boost, 1,
"Multiply the amount of values each reader thread reads in "
"TestConcurrentReadWritePerformance");

namespace kudu {
namespace tablet {
namespace btree {
@@ -889,6 +905,187 @@ TEST_F(TestCBTree, TestIteratorSeekAtOrBefore) {
}
}

// All applications of CBTree use a threadsafe arena with default node sizes.
struct ProdTreeTraits : public btree::BTreeTraits {
typedef ThreadSafeMemoryTrackingArena ArenaType;
};

// We benchmark two scenarios:
// 1. Writing on multiple threads.
// 2. Reading on multiple threads while there are also active writes.
//
// If read threads wait for values to be inserted, it defeats the purpose of benchmarking.
// Therefore, we should first populate a tree with values for the read threads. The read threads
// will then read values that are already in the tree, while the write threads continue to insert
// new values.
//
// Setting up the tree for the second scenario essentially involves performing the first scenario.
// This is why both scenarios are combined into a single test.
TEST_F(TestCBTree, ConcurrentReadWriteBenchmark) {
SKIP_IF_SLOW_NOT_ALLOWED();
constexpr int kTrials = 10;
// Short names to keep the formulas below readable
const int num_writer_threads = FLAGS_concurrent_rw_benchmark_num_writer_threads;
const int num_reader_threads = FLAGS_concurrent_rw_benchmark_num_reader_threads;
const int num_inserts = FLAGS_concurrent_rw_benchmark_num_inserts;
const int reader_boost = FLAGS_concurrent_rw_benchmark_reader_boost;

const int num_threads = num_writer_threads + num_reader_threads;
// Number of nodes we write in the 1st phase, and read back in the 2nd, while there are still
// concurrent writes going on.
const int num_inserts_first_phase = num_inserts / 2;

// We apply a deterministic mapping to i that is "random enough" for the current purpose.
auto generate_shuffled_kv = [](std::array<char, 32>& kbuf, std::array<char, 32>& vbuf, int i) {
// Any prime p satisfying p % 4 == 3 that is at least an order of magnitude larger than the
// number of threads works (p < num_inserts is fine).
constexpr int p = 10007; // The first such prime above 10000
auto random_shuffle = [](int x) {
int32_t r = static_cast<int32_t>((static_cast<uint64_t>(x) * x) % p);
if (x <= p / 2)
return r;
else
return p - r;
};
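// Why this permutes [0, p): when p % 4 == 3, -1 is a quadratic non-residue mod p,
// so x -> x*x % p maps the lower half of [0, p) onto {0} plus the quadratic
// residues, while p - (x*x % p) maps the upper half onto the non-residues.
// A standalone sanity check might look like the following (a sketch, not part
// of the benchmark):
//
//   std::unordered_set<int32_t> seen;
//   for (int x = 0; x < p; ++x) {
//     CHECK(seen.insert(random_shuffle(x)).second) << "collision at " << x;
//   }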
snprintf(kbuf.data(), kbuf.size(), "key_%d_%d", random_shuffle(i % p), i); // max 23 bytes used
snprintf(vbuf.data(), vbuf.size(), "val_%d", i);
};
unique_ptr<CBTree<ProdTreeTraits>> tree;
vector<thread> threads;
// We need 2 internal barriers to know when to stop the first LOG_TIMING(...) and
// start the second.
Barrier start_write_barrier(num_writer_threads + 1);
Barrier finish_write_barrier(num_writer_threads + 1);
Barrier start_rw_barrier(num_threads + 1);
Barrier finish_rw_barrier(num_threads + 1);
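// Shutdown protocol: the main thread resets `tree` to null and then releases the start
// barriers; worker threads interpret a null tree as the signal to exit.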

// Writer threads insert keys from [0, num_inserts_first_phase), then wait for the internal
// barriers. Then they insert keys from [num_inserts_first_phase, num_inserts). We want to
// insert randomly distributed values without any significant performance penalty.
// generate_shuffled_kv applies the shuffling described above.
for (int tidx = 0; tidx < num_writer_threads; tidx++) {
threads.emplace_back([&, tidx]() {
std::array<char, 32> kbuf;
std::array<char, 32> vbuf;
while (true) {
start_write_barrier.Wait();
if (!tree) {
start_rw_barrier.Wait(); // Allow readers to wake up too.
return;
}
// To prevent a one-to-one mapping between reader and writer threads (even if
// num_writer_threads == num_reader_threads), in the first phase each writer thread writes
// a contiguous section of the keys, while reader threads pick keys in a round-robin
// fashion.
int interval_length =
(num_inserts_first_phase + num_writer_threads - 1) / num_writer_threads;
int start = interval_length * tidx;
int until = std::min(interval_length * (tidx + 1), num_inserts_first_phase);
for (int i = start; i < until; ++i) {
generate_shuffled_kv(kbuf, vbuf, i);
if (!tree->Insert(Slice(kbuf.data()), Slice(vbuf.data()))) {
ADD_FAILURE() << "Failed insert at iteration " << i;
break;
}
}
finish_write_barrier.Wait();
start_rw_barrier.Wait();
if (!tree) {
return;
}
for (int i = num_inserts_first_phase + tidx; i < num_inserts;
i += num_writer_threads) {
generate_shuffled_kv(kbuf, vbuf, i);
if (!tree->Insert(Slice(kbuf.data()), Slice(vbuf.data()))) {
ADD_FAILURE() << "Failed insert at iteration " << i;
break;
}
}
finish_rw_barrier.Wait();
}
});
}

// We want to read values while writes are also happening. However, waiting with ASSERT_EVENTUALLY
// would completely skew the performance measurements. So we read values that are already
// guaranteed to be in the tree.
for (int tidx = 0; tidx < num_reader_threads; tidx++) {
threads.emplace_back([&, tidx]() {
std::array<char, 32> kbuf;
std::array<char, 32> vbuf;
while (true) {
// Once this barrier is passed, the 1st phase is done and the first half of the keys are
// already in the tree.
start_rw_barrier.Wait();
if (!tree) {
return;
}
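// Each reader walks the first-phase keys round-robin; the index wraps via
// i % num_inserts_first_phase, so reader_boost > 1 multiplies the amount of
// read work without requiring more keys in the tree.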
for (int64_t i = tidx;
i < static_cast<int64_t>(num_inserts_first_phase) * reader_boost;
i += num_reader_threads) {
generate_shuffled_kv(kbuf, vbuf, static_cast<int32_t>(i % num_inserts_first_phase));
VerifyGet(*tree, Slice(kbuf.data()), Slice(vbuf.data()));
}
finish_rw_barrier.Wait();
}
});
}

std::shared_ptr<MemoryTrackingBufferAllocator> mtbf;
std::shared_ptr<ThreadSafeMemoryTrackingArena> arena_ptr;

bool skip_normal_shutdown = false;

for (int trial = 0; trial < kTrials; trial++) {
// shared_ptrs are passed on the interfaces, so at first glance it looks safe to reset the
// pointers in any order. It is not: the tree allocates from the arena, so it must be
// destroyed before the arena it allocates from.
tree.reset();
arena_ptr.reset();
mtbf = std::make_shared<MemoryTrackingBufferAllocator>(HeapBufferAllocator::Get(),
MemTracker::GetRootTracker());
arena_ptr = std::make_shared<ThreadSafeMemoryTrackingArena>(16, mtbf);
tree.reset(new CBTree<ProdTreeTraits>(arena_ptr));

LOG_TIMING(
INFO,
Substitute(
"Writing $0 values on $1 threads", num_inserts_first_phase, num_writer_threads)) {
start_write_barrier.Wait();
finish_write_barrier.Wait();
}
if (::testing::Test::HasFatalFailure()) {
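// If a fatal failure was recorded during the write phase, don't start phase 2:
// reset the tree and release the threads blocked on start_rw_barrier so they
// exit (a null tree is the exit signal).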
tree.reset(nullptr);
start_rw_barrier.Wait();
skip_normal_shutdown = true;
break;
}
LOG_TIMING(INFO,
Substitute("Writing $0 values on $1 threads and reading $2 values on $3 threads",
num_inserts - num_inserts_first_phase,
num_writer_threads,
static_cast<uint64_t>(num_inserts_first_phase)
* reader_boost,
num_reader_threads)) {
start_rw_barrier.Wait();
finish_rw_barrier.Wait();
}
if (::testing::Test::HasFatalFailure()) {
// Normal shutdown is fine. Threads are already waiting for the next start.
break;
}
}

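// Normal shutdown: wake all threads one last time with a null tree so their
// loops terminate.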
if (!skip_normal_shutdown) {
tree.reset(nullptr);
start_write_barrier.Wait();
start_rw_barrier.Wait();
}

for (thread& thr : threads) {
thr.join();
}
}

} // namespace btree
} // namespace tablet
} // namespace kudu
