Skip to content

Commit

Permalink
Merge branch 'main' into pavel/fix_macos_cross_compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
shpala committed Feb 23, 2024
2 parents cf7440e + 95300a1 commit d1ef98a
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 64 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/check_license_and_history.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ name: Check License and History
on: # this workflow is planned to be called by the ci_pipeline and it will compare the PR files with the main
workflow_call:
workflow_dispatch:
#pull_request_review:
# types: [submitted]
push:
pull_request_target:


jobs:
changedfiles:
Expand Down
9 changes: 7 additions & 2 deletions .github/workflows/ci_pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ jobs:
ROCKSDB_DISABLE_JEMALLOC=1 PORTABLE=1 DEBUG_LEVEL=0 make -j 4 rocksdbjavastatic
Linux-build:
Linux-build-and-UnitTest:
if: ${{ (always() && !failure() && !cancelled()) && (github.event.review.state == 'approved' || github.event_name == 'workflow_dispatch' || startsWith(github.ref, 'refs/heads/release')) }}
needs: [Build]
runs-on: ubuntu-latest
Expand Down Expand Up @@ -317,6 +317,11 @@ jobs:
make clean
SPDB_RELEASE_BUILD=1 LIB_MODE=static DEBUG_LEVEL=0 PORTABLE=1 JAVA_HOME=/usr/lib/jvm/java-openjdk make -j$(nproc) rocksdbjavastatic
- name: Java Unit test
run: |
make clean
JAVA_HOME=/usr/lib/jvm/java-openjdk make -j$(nproc) jtest
- name: Build db_bench
run: |
yum install -y gflags-devel
Expand Down Expand Up @@ -364,7 +369,7 @@ jobs:
CI-all:
if: ${{ github.event.review.state == 'approved' || github.event_name == 'workflow_dispatch' || startsWith(github.ref, 'refs/heads/release') }}
needs: [Check-Licence-And-History, Build, QA-Tests, Fuzz, Linux-Arm-build, Linux-build, Macos-build, Windows-build-test]
needs: [Check-Licence-And-History, Build, QA-Tests, Fuzz, Linux-Arm-build, Linux-build-and-UnitTest, Macos-build, Windows-build-test]
runs-on: ubuntu-latest
steps:
- name: Summary
Expand Down
18 changes: 17 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
# Speedb Change Log

## Unreleased

### New Features

### Enhancements

### Bug Fixes
* LOG Consistency:Display the pinning policy options same as block cache options / metadata cache options (#804).

### Miscellaneous
* WriteController logging: Remove redundant reports when WC is not shared between dbs


## Incaberry 2.8.0 (31/1/2024)
Based on RocksDB 8.6.7

### New Features
Expand All @@ -16,6 +29,7 @@ Based on RocksDB 8.6.7
* LOG Enhancement: Have a separate LOG entry per CF Stats. This ensures that no CF stats data is lost in case the size of the combined CF stats text exceeds the LOG's threshold (#534).

### Bug Fixes
* Fix a bug in db_stress where non existence parameters have checked with enable_speedb_features.
* Added IsRefreshIterSupported() to memtable_rep, to publish if the memtable support Refresh() of the iterator.
Refresh() will return status NotSupported for memtables that do not support Refresh().
IsAllowRefresh() has been added.
Expand All @@ -28,6 +42,8 @@ db_stress has been updated as well to take into account that some memtables do n
* stress test: Fix TestIterateAgainstExpected not supporting 0 iterations. TestIterateAgainstExpected was not designed to support value of 0 in FLAGS_num_iterations.
RocksDB has a value of 10 by default and we've added the option to randomize the values from 0 to 100 in https://github.com/speedb-io/speedb/commit/434692a63318036a3995a53001337f18bf467903
* Add more checks for using db_stress with --enable_speedb_features=true
* Proactive Flushes: Have the initiator return a correct answer when it was requested to initate a flush (#812).
* stress test: Adding a trace file by default in PR https://github.com/speedb-io/speedb/pull/797 has revealed some incompatibilities between the trace file and several configurations (more details in https://github.com/speedb-io/speedb/issues/813). Keep the trace file and remove the IsDone assertion.
* Allow cross-compilation on macOS systems when using Cmake. See https://github.com/speedb-io/speedb/issues/822 for full details.

### Miscellaneous
Expand All @@ -36,7 +52,7 @@ RocksDB has a value of 10 by default and we've added the option to randomize the
* Options: Set level_compaction_dynamic_level_bytes as false by default. This flag is not working properly with Speedb. see https://github.com/speedb-io/speedb/issues/786 for more details.
* zlib: Update links to zlib 1.3 in CI and Makefile since the link in zlib.net is dead.

## Hazlenut 2.7.0 (27/10/2023)
## Hazelnut 2.7.0 (27/10/2023)
Based on RocksDB 8.1.1

### New Features
Expand Down
10 changes: 5 additions & 5 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1123,8 +1123,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
ROCKS_LOG_WARN(
ioptions_.logger,
"[%s] Stalling writes because we have %d immutable memtables "
"(waiting for flush), max_write_buffer_number is set to %d "
"rate %" PRIu64,
"(waiting for flush), max_write_buffer_number is set to %d. "
"delayed write rate: %" PRIu64,
name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate());
Expand All @@ -1146,8 +1146,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
1);
}
ROCKS_LOG_WARN(ioptions_.logger,
"[%s] Stalling writes because we have %d level-0 files "
"rate %" PRIu64,
"[%s] Stalling writes because we have %d level-0 files. "
"delayed write rate: %" PRIu64,
name_.c_str(), vstorage->l0_delay_trigger_count(),
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
Expand Down Expand Up @@ -1175,7 +1175,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
ROCKS_LOG_WARN(
ioptions_.logger,
"[%s] Stalling writes because of estimated pending compaction "
"bytes %" PRIu64 " rate %" PRIu64,
"bytes %" PRIu64 ". delayed write rate: %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate());
} else {
Expand Down
12 changes: 7 additions & 5 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,9 @@ class DBImpl : public DB {

// flush initiated by the write buffer manager to free some space
bool InitiateMemoryManagerFlushRequest(size_t min_size_to_flush);
bool InitiateMemoryManagerFlushRequestAtomicFlush(
size_t InitiateMemoryManagerFlushRequestAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options);
bool InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options);

virtual SequenceNumber GetLatestSequenceNumber() const override;
Expand Down Expand Up @@ -1995,15 +1995,17 @@ class DBImpl : public DB {
// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
FlushReason flush_reason,
bool entered_write_thread = false);
bool entered_write_thread = false,
size_t* num_flushes_initiated = nullptr);

// Atomic-flush memtables from quanlified CFs among `provided_candidate_cfds`
// (if non-empty) or amomg all column families and atomically record the
// result to the MANIFEST.
Status AtomicFlushMemTables(
const FlushOptions& options, FlushReason flush_reason,
const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},
bool entered_write_thread = false);
bool entered_write_thread = false,
size_t* num_flushes_initiated = nullptr);

// Wait until flushing this column family won't stall writes
Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
Expand Down Expand Up @@ -2156,7 +2158,7 @@ class DBImpl : public DB {
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req);

void SchedulePendingFlush(const FlushRequest& req);
bool SchedulePendingFlush(const FlushRequest& req);

void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
Expand Down
88 changes: 62 additions & 26 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2333,7 +2333,12 @@ void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
FlushReason flush_reason,
bool entered_write_thread) {
bool entered_write_thread,
size_t* num_flushes_initiated) {
if (num_flushes_initiated != nullptr) {
*num_flushes_initiated = 0U;
}

// This method should not be called if atomic_flush is true.
assert(!immutable_db_options_.atomic_flush);
if (!flush_options.wait && write_controller_->IsStopped()) {
Expand Down Expand Up @@ -2447,7 +2452,10 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
for (const auto& req : flush_reqs) {
SchedulePendingFlush(req);
bool pushed_req = SchedulePendingFlush(req);
if (pushed_req && (num_flushes_initiated != nullptr)) {
++(*num_flushes_initiated);
}
}
MaybeScheduleFlushOrCompaction();
}
Expand Down Expand Up @@ -2486,8 +2494,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
Status DBImpl::AtomicFlushMemTables(
const FlushOptions& flush_options, FlushReason flush_reason,
const autovector<ColumnFamilyData*>& provided_candidate_cfds,
bool entered_write_thread) {
bool entered_write_thread, size_t* num_flushes_initiated) {
assert(immutable_db_options_.atomic_flush);

if (num_flushes_initiated != nullptr) {
*num_flushes_initiated = 0U;
}

if (!flush_options.wait && write_controller_->IsStopped()) {
std::ostringstream oss;
oss << "Writes have been stopped, thus unable to perform manual flush. "
Expand Down Expand Up @@ -2598,7 +2611,10 @@ Status DBImpl::AtomicFlushMemTables(
}
}
GenerateFlushRequest(cfds, flush_reason, &flush_req);
SchedulePendingFlush(flush_req);
bool pushed_req = SchedulePendingFlush(flush_req);
if (pushed_req && (num_flushes_initiated != nullptr)) {
++(*num_flushes_initiated);
}
MaybeScheduleFlushOrCompaction();
}

Expand Down Expand Up @@ -3014,14 +3030,17 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
return cfd;
}

void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
mutex_.AssertHeld();
if (reject_new_background_jobs_) {
return;
return false;
}
if (flush_req.cfd_to_max_mem_id_to_persist.empty()) {
return;
return false;
}

bool pushed_req = false;

if (!immutable_db_options_.atomic_flush) {
// For the non-atomic flush case, we never schedule multiple column
// families in the same flush request.
Expand All @@ -3035,6 +3054,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
cfd->set_queued_for_flush(true);
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
pushed_req = true;
}
} else {
for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
Expand All @@ -3043,7 +3063,10 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
}
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
pushed_req = true;
}

return pushed_req;
}

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
Expand Down Expand Up @@ -3273,11 +3296,6 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
bg_job_limits.max_compactions, bg_flush_scheduled_,
bg_compaction_scheduled_);
}
*reason = bg_flush_args[0].flush_reason_;
if (write_buffer_manager_) {
write_buffer_manager_->FlushStarted(
*reason == FlushReason::kWriteBufferManagerInitiated);
}

status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer, thread_pri);
Expand Down Expand Up @@ -3325,6 +3343,12 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
Status s =
BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason,
&flush_rescheduled_to_retain_udt, thread_pri);

if (write_buffer_manager_) {
write_buffer_manager_->FlushStarted(
reason == FlushReason::kWriteBufferManagerInitiated);
}

if (s.IsTryAgain() && flush_rescheduled_to_retain_udt) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
Expand Down Expand Up @@ -4351,16 +4375,20 @@ bool DBImpl::InitiateMemoryManagerFlushRequest(size_t min_size_to_flush) {
flush_options.allow_write_stall = true;
flush_options.wait = false;

size_t num_flushes_initiated = 0U;
if (immutable_db_options_.atomic_flush) {
return InitiateMemoryManagerFlushRequestAtomicFlush(min_size_to_flush,
flush_options);
num_flushes_initiated = InitiateMemoryManagerFlushRequestAtomicFlush(
min_size_to_flush, flush_options);
} else {
return InitiateMemoryManagerFlushRequestNonAtomicFlush(min_size_to_flush,
flush_options);
num_flushes_initiated = InitiateMemoryManagerFlushRequestNonAtomicFlush(
min_size_to_flush, flush_options);
}

// TODO - Have Proactive Flushes handle num_flushes_initiated > 1
return (num_flushes_initiated > 0U);
}

bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
size_t DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options) {
assert(immutable_db_options_.atomic_flush);

Expand All @@ -4370,7 +4398,7 @@ bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(

SelectColumnFamiliesForAtomicFlush(&cfds);
if (cfds.empty()) {
return false;
return 0U;
}

// min_size_to_flush may be 0.
Expand All @@ -4391,7 +4419,7 @@ bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
}
}
if (total_size_to_flush < min_size_to_flush) {
return false;
return 0U;
}
}
}
Expand All @@ -4404,17 +4432,21 @@ bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(

TEST_SYNC_POINT(
"DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush::BeforeFlush");
size_t num_flushes_initiated = 0U;
auto s = AtomicFlushMemTables(
flush_options, FlushReason::kWriteBufferManagerInitiated, cfds);
flush_options, FlushReason::kWriteBufferManagerInitiated, cfds,
false /* entered_write_thread */, &num_flushes_initiated);

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"write buffer manager initiated Atomic flush finished, status: %s",
s.ToString().c_str());
return s.ok();

assert(s.ok() || (num_flushes_initiated == 0));
return num_flushes_initiated;
}

bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options) {
assert(immutable_db_options_.atomic_flush == false);

Expand Down Expand Up @@ -4456,7 +4488,7 @@ bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
}

if (cfd_to_flush == nullptr) {
return false;
return 0U;
}

orig_cfd_to_flush = cfd_to_flush;
Expand Down Expand Up @@ -4503,15 +4535,19 @@ bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(

TEST_SYNC_POINT(
"DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush::BeforeFlush");
auto s = FlushMemTable(cfd_to_flush, flush_options,
FlushReason::kWriteBufferManagerInitiated);
size_t num_flushes_initiated = 0U;

auto s = FlushMemTable(
cfd_to_flush, flush_options, FlushReason::kWriteBufferManagerInitiated,
false /* entered_write_thread */, &num_flushes_initiated);

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"[%s] write buffer manager initialize flush finished, status: %s\n",
cfd_to_flush->GetName().c_str(), s.ToString().c_str());

return s.ok();
assert(s.ok() || (num_flushes_initiated == 0));
return num_flushes_initiated;
}

} // namespace ROCKSDB_NAMESPACE
4 changes: 4 additions & 0 deletions db/write_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ void WriteController::HandleNewDelayReq(void* client_id,

{
std::lock_guard<std::mutex> logger_lock(loggers_map_mu_);
// The below WARN msg is intended only when the WC is shared among loggers.
if (loggers_to_client_ids_map_.size() == 1) {
return;
}
for (auto& logger_and_clients : loggers_to_client_ids_map_) {
ROCKS_LOG_WARN(logger_and_clients.first.get(),
"WC setting delay of %" PRIu64
Expand Down
Loading

0 comments on commit d1ef98a

Please sign in to comment.