Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: PkPatternMatchDel inconsistent between rediscache and db #2839

Merged
merged 3 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ ExternalProject_Add(rediscache
set(REDISCACHE_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
set(REDISCACHE_LIBRARY ${INSTALL_LIBDIR}/librediscache.a)


option(USE_PIKA_TOOLS "compile pika-tools" OFF)
if (USE_PIKA_TOOLS)
ExternalProject_Add(hiredis
Expand Down
5 changes: 1 addition & 4 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,9 @@ slave-priority : 100
# The disable_auto_compactions option is [true | false]
disable_auto_compactions : false

<<<<<<< HEAD
=======
# Rocksdb max_subcompactions, increasing this value can accelerate the exec speed of a single compaction task
# it's recommended to increase it's value if large compaction is found in you instance
max-subcompactions : 1
>>>>>>> f95f867c (fix: Revised RocksDB-Related Parameters in Pika (#2728))
# The minimum disk usage ratio for checking resume.
# If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume.
# Its default value is 0.7.
Expand Down Expand Up @@ -496,7 +493,7 @@ default-slot-num : 1024
# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit.
# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes).
throttle-bytes-per-second : 207200000
<<<<<<< HEAD

# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small.
# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command
# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust.
Expand Down
4 changes: 4 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,13 +479,17 @@ class PKPatternMatchDelCmd : public Cmd {
PKPatternMatchDelCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::ADMIN)) {}
void Do() override;
void DoThroughDB() override;
void DoUpdateCache() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKPatternMatchDelCmd(*this); }

private:
storage::DataType type_ = storage::kAll;
std::vector<std::string> remove_keys_;
std::string pattern_;
int64_t max_count_;
void DoInitial() override;
};

Expand Down
61 changes: 57 additions & 4 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3098,15 +3098,68 @@ void PKPatternMatchDelCmd::DoInitial() {
res_.SetRes(CmdRes::kInvalidDbType, kCmdNamePKPatternMatchDel);
return;
}
max_count_ = storage::BATCH_DELETE_LIMIT;
if (argv_.size() > 2) {
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &max_count_) == 0 || max_count_ < 1 || max_count_ > storage::BATCH_DELETE_LIMIT) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

argv_[2]是类型,这里应该是argv[3],上边的if条件也得是>3

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

argv_[2]是类型,这里应该是argv[3],上边的if条件也得是>3

done

res_.SetRes(CmdRes::kInvalidInt);
return;
}
}
}

void PKPatternMatchDelCmd::Do() {
int ret = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDel(type_, pattern_, &ret);
if (s.ok()) {
res_.AppendInteger(ret);
int64_t count = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDelWithRemoveKeys(type_, pattern_, &count, &remove_keys_, max_count_);

if(s.ok()) {
res_.AppendInteger(count);
s_ = rocksdb::Status::OK();
for (const auto& key : remove_keys_) {
RemSlotKey(key, db_);
}
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
if (count >= 0) {
s_ = rocksdb::Status::OK();
for (const auto& key : remove_keys_) {
RemSlotKey(key, db_);
}
}
}
}

void PKPatternMatchDelCmd::DoThroughDB() {
Do();
}

void PKPatternMatchDelCmd::DoUpdateCache() {
if(s_.ok()) {
std::vector<std::string> v;
for (auto key : remove_keys_) {
if (argv_.size() > 2) {
//only delete the corresponding prefix
switch (type_) {
case storage::kSets:
v.emplace_back(PCacheKeyPrefixS + key);
break;
case storage::kLists:
v.emplace_back(PCacheKeyPrefixL + key);
break;
case storage::kStrings:
v.emplace_back(PCacheKeyPrefixK + key);
break;
case storage::kZSets:
v.emplace_back(PCacheKeyPrefixZ + key);
break;
case storage::kHashes:
v.emplace_back(PCacheKeyPrefixH + key);
break;
default:
break;
}
}
}
db_->cache()->Del(v);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ class Storage {

// Traverses the database of the specified type, removing the Key that matches
// the pattern
Status PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret);
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count);

// Iterate over a collection of elements
// return next_key that the user need to use as the start_key argument
Expand Down
3 changes: 1 addition & 2 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class Redis {
virtual Status GetProperty(const std::string& property, uint64_t* out) = 0;
virtual Status ScanKeyNum(KeyInfo* key_info) = 0;
virtual Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) = 0;
virtual Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) = 0;

// Keys Commands
virtual Status Expire(const Slice& key, int32_t ttl) = 0;
Expand All @@ -110,7 +109,7 @@ class Redis {
virtual Status Expireat(const Slice& key, int32_t timestamp) = 0;
virtual Status Persist(const Slice& key) = 0;
virtual Status TTL(const Slice& key, int64_t* timestamp) = 0;

virtual Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) = 0;
Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ Status RedisHashes::ScanKeys(const std::string& pattern, std::vector<std::string
return Status::OK();
}

Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
Status RedisHashes::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -154,27 +154,28 @@ Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret)

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
if (!parsed_hashes_meta_value.IsStale() && (parsed_hashes_meta_value.count() != 0) &&
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
parsed_hashes_meta_value.InitialMetaValue();
batch.Put(handles_[0], key, meta_value);
remove_keys->push_back(key.data());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key已经是string类型了吧?直接push_back(key)就可以了。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key已经是string类型了吧?直接push_back(key)就可以了。

done

}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>( batch.Count());
total_delete += static_cast<int64_t>( batch.Count());
batch.Clear();
} else {
*ret = total_delete;
Expand All @@ -186,7 +187,7 @@ Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret)
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_hashes.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class RedisHashes : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* key_info) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;

// Hashes Commands
Status HDel(const Slice& key, const std::vector<std::string>& fields, int32_t* ret);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Status RedisLists::ScanKeys(const std::string& pattern, std::vector<std::string>
return Status::OK();
}

Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
Status RedisLists::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -161,27 +161,28 @@ Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

182行的逻辑,里边会执行batch.clear,所以用batch.Count()去比不准确。要不你就跟4.0里的一样,把182 if里的删掉,统一在外边调用一次batch.put。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

182行的逻辑,里边会执行batch.clear,所以用batch.Count()去比不准确。要不你就跟4.0里的一样,把182 if里的删掉,统一在外边调用一次batch.put。

done

key = iter->key().ToString();
meta_value = iter->value().ToString();
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
if (!parsed_lists_meta_value.IsStale() && (parsed_lists_meta_value.count() != 0U) &&
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
parsed_lists_meta_value.InitialMetaValue();
batch.Put(handles_[0], key, meta_value);
remove_keys->push_back(key.data());
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
} else {
*ret = total_delete;
Expand All @@ -193,7 +194,7 @@ Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RedisLists : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* key_info) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;

// Lists commands;
Status LIndex(const Slice& key, int64_t index, std::string* element);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ rocksdb::Status RedisSets::ScanKeys(const std::string& pattern, std::vector<std:
return rocksdb::Status::OK();
}

rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
rocksdb::Status RedisSets::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -161,27 +161,28 @@ rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
rocksdb::Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
if (!parsed_sets_meta_value.IsStale() && (parsed_sets_meta_value.count() != 0) &&
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
parsed_sets_meta_value.InitialMetaValue();
batch.Put(handles_[0], key, meta_value);
remove_keys->push_back(key.data());
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
} else {
*ret = total_delete;
Expand All @@ -193,7 +194,7 @@ rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_sets.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class RedisSets : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* key_info) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;

// Setes Commands
Status SAdd(const Slice& key, const std::vector<std::string>& members, int32_t* ret);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/src/redis_streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ Status RedisStreams::ScanKeys(const std::string& pattern, std::vector<std::strin
return Status::OK();
}

Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
Status RedisStreams::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -452,15 +452,15 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret)

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
StreamMetaValue stream_meta_value;
Expand All @@ -469,11 +469,12 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret)
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
stream_meta_value.InitMetaValue();
batch.Put(handles_[0], key, stream_meta_value.value());
remove_keys->push_back(key.data());
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
} else {
*ret = total_delete;
Expand All @@ -485,7 +486,7 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret)
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_streams.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class RedisStreams : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* keyinfo) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;
Status PKScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit,
std::vector<std::string>* keys, std::string* next_key);
Status PKRScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit,
Expand Down
Loading
Loading