Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: PkPatternMatchDel inconsistent between rediscache and db #2777

Merged
merged 9 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,11 +466,15 @@ class PKPatternMatchDelCmd : public Cmd {
PKPatternMatchDelCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::ADMIN)) {}
void Do() override;
void DoThroughDB() override;
void DoUpdateCache() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKPatternMatchDelCmd(*this); }
void DoBinlog() override;

private:
std::vector<std::string> remove_keys_;
storage::DataType type_ = storage::DataType::kAll;
std::string pattern_;
void DoInitial() override;
Expand Down
42 changes: 37 additions & 5 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3129,14 +3129,46 @@ void PKPatternMatchDelCmd::DoInitial() {
pattern_ = argv_[1];
}

//TODO: may lead to inconsistent between rediscache and db, because currently it only cleans db
void PKPatternMatchDelCmd::Do() {
int ret = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDel(type_, pattern_, &ret);
if (s.ok()) {
res_.AppendInteger(ret);
int64_t count = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDelWithRemoveKeys(type_, pattern_, &count, &remove_keys_);

if(s.ok()){
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)和{中间加空格。

res_.AppendInteger(count);
s_ = rocksdb::Status::OK();
std::vector<std::string>::const_iterator it;
for (it = remove_keys_.begin(); it != remove_keys_.end(); it++) {
RemSlotKey(*it, db_);
}
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
if (count >= 0) {
s_ = rocksdb::Status::OK();
std::vector<std::string>::const_iterator it;
for (it = remove_keys_.begin(); it != remove_keys_.end(); it++) {
RemSlotKey(*it, db_);
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid duplicate code for removing slot keys.

The code for removing slot keys is duplicated. Consider refactoring to avoid duplication.

-  } else {
-    res_.SetRes(CmdRes::kErrOther, s.ToString());
-    if (count >= 0) {
-      s_ = rocksdb::Status::OK();
-      std::vector<std::string>::const_iterator it;
-      for (it = remove_keys_.begin(); it != remove_keys_.end(); it++) {
-        RemSlotKey(*it, db_);
-      }
-    }
-  }
+  } else if (count >= 0) {
+    s_ = rocksdb::Status::OK();
+  } else {
+    res_.SetRes(CmdRes::kErrOther, s.ToString());
+  }
+  for (const auto& key : remove_keys_) {
+    RemSlotKey(key, db_);
+  }

Committable suggestion was skipped due to low confidence.

}
}

void PKPatternMatchDelCmd::DoThroughDB() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

既然记录了删除的key,那是不是binlog内容也可以修改下?直接删除指定的key即可?省去了slave遍历的开销。

Do();
}

void PKPatternMatchDelCmd::DoUpdateCache() {
if(s_.ok()) {
db_->cache()->Del(remove_keys_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果remove_key特别多的话 这个地方 remove_keys_ 会不会有放不下的情况,如果redis有按照字符匹配去删除的命令是不是直接执行命令会好一点

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果remove_key特别多的话 这个地方 remove_keys_ 会不会有放不下的情况,如果redis有按照字符匹配去删除的命令是不是直接执行命令会好一点

我感觉有几个问题

  1. 这样的话感觉需要 rediscache 中支持 scan命令。
  2. remove_keys_中感觉需要存删除失败的键,这样才能保证缓存和磁盘上数据的一致性(会有这样的问题么)
  3. PKPatternMatchDelCmd返回值,需要记录所有失败的信息么,还是和DEL命令一样。(得保证扫描完所有数据才行,现在是遇到错误就停止)

}
}

void PKPatternMatchDelCmd::DoBinlog() {
std::string opt = "del";
for(auto& key: remove_keys_) {
argv_.clear();
argv_.emplace_back(opt);
argv_.emplace_back(key);
Cmd::DoBinlog();
Comment on lines +3179 to +3185
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize binlog operations.

The binlog operations can be optimized by reducing the number of argv_ modifications.

-  std::string opt = "del";
-  for(auto& key: remove_keys_) {
-    argv_.clear();
-    argv_.emplace_back(opt);
-    argv_.emplace_back(key);
-    Cmd::DoBinlog();
-  }
+  argv_.clear();
+  argv_.emplace_back("del");
+  for (const auto& key : remove_keys_) {
+    argv_.resize(2);
+    argv_[1] = key;
+    Cmd::DoBinlog();
+  }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void PKPatternMatchDelCmd::DoBinlog() {
std::string opt = "del";
for(auto& key: remove_keys_) {
argv_.clear();
argv_.emplace_back(opt);
argv_.emplace_back(key);
Cmd::DoBinlog();
void PKPatternMatchDelCmd::DoBinlog() {
argv_.clear();
argv_.emplace_back("del");
for (const auto& key : remove_keys_) {
argv_.resize(2);
argv_[1] = key;
Cmd::DoBinlog();

}
}

Expand Down
2 changes: 2 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,8 @@ class Storage {
// the pattern
Status PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret);

Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys);

// Iterate over a collection of elements
// return next_key that the user need to use as the start_key argument
// in the next call
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ class Redis {
Status Persist(const Slice& key);
Status TTL(const Slice& key, int64_t* timestamp);
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret);
Status PKPatternMatchDelWithRemoveKeys(const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys);

Status GetType(const Slice& key, enum DataType& type);
Status IsExist(const Slice& key);
Expand Down
88 changes: 88 additions & 0 deletions src/storage/src/redis_strings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,7 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
batch.Clear();
} else {
*ret = total_delete;
delete iter;
return s;
}
}
Expand All @@ -1762,6 +1763,93 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
}

*ret = total_delete;
delete iter;
return s;
}

rocksdb::Status Redis::PKPatternMatchDelWithRemoveKeys(const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

std::string key;
std::string meta_value;
int32_t total_delete = 0;
rocksdb::Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]);
iter->SeekToFirst();
while (iter->Valid()) {
auto meta_type = static_cast<enum DataType>(static_cast<uint8_t>(iter->value()[0]));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for null pointer before dereferencing.

Ensure iter is not null before dereferencing it.

if (iter == nullptr) {
  return rocksdb::Status::InvalidArgument("Iterator creation failed.");
}

ParsedBaseMetaKey parsed_meta_key(iter->key().ToString());
key = iter->key().ToString();
meta_value = iter->value().ToString();

if (meta_type == DataType::kStrings) {
ParsedStringsValue parsed_strings_value(&meta_value);
if (!parsed_strings_value.IsStale() &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) != 0)) {
batch.Delete(key);
remove_keys->push_back(parsed_meta_key.Key().data());
}
} else if (meta_type == DataType::kLists) {
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
if (!parsed_lists_meta_value.IsStale() && (parsed_lists_meta_value.Count() != 0U) &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) !=
0)) {
parsed_lists_meta_value.InitialMetaValue();
batch.Put(handles_[kMetaCF], iter->key(), meta_value);
remove_keys->push_back(parsed_meta_key.Key().data());
}
} else if (meta_type == DataType::kStreams) {
StreamMetaValue stream_meta_value;
stream_meta_value.ParseFrom(meta_value);
if ((stream_meta_value.length() != 0) &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) != 0)) {
stream_meta_value.InitMetaValue();
batch.Put(handles_[kMetaCF], key, stream_meta_value.value());
remove_keys->push_back(parsed_meta_key.Key().data());
}
} else {
ParsedBaseMetaValue parsed_meta_value(&meta_value);
if (!parsed_meta_value.IsStale() && (parsed_meta_value.Count() != 0) &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) !=
0)) {
parsed_meta_value.InitialMetaValue();
batch.Put(handles_[kMetaCF], iter->key(), meta_value);
remove_keys->push_back(parsed_meta_key.Key().data());
}
}

if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
batch.Clear();
} else {
remove_keys->erase(remove_keys->end() - batch.Count(), remove_keys->end());
delete iter;
*ret = total_delete;
return s;
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
}
}
iter->Next();
}
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
batch.Clear();
} else {
remove_keys->erase(remove_keys->end() - batch.Count(), remove_keys->end());
}
}

*ret = total_delete;
baixin01 marked this conversation as resolved.
Show resolved Hide resolved
delete iter;
return s;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

释放iter迭代器

}

} // namespace storage
17 changes: 17 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1399,13 +1399,30 @@ Status Storage::PKRScanRange(const DataType& data_type, const Slice& key_start,
return Status::OK();
}

Status Storage::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern,
int64_t* ret, std::vector<std::string>* remove_keys) {
Status s;
*ret = 0;
for (const auto& inst : insts_) {
int64_t tmp_ret = 0;
s = inst->PKPatternMatchDelWithRemoveKeys(pattern, &tmp_ret, remove_keys);
if (!s.ok()) {
*ret += tmp_ret;
return s;
}
*ret += tmp_ret;
}
return s;
}

Status Storage::PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret) {
Status s;
*ret = 0;
for (const auto& inst : insts_) {
int32_t tmp_ret = 0;
s = inst->PKPatternMatchDel(pattern, &tmp_ret);
if (!s.ok()) {
*ret += tmp_ret;
return s;
}
*ret += tmp_ret;
Expand Down
Loading