From e560b899c7875b42d9103b7785188be824ceae90 Mon Sep 17 00:00:00 2001 From: Korpse <543514071@qq.com> Date: Thu, 11 Jan 2024 22:26:38 +0800 Subject: [PATCH] fix: Support some commands and API of blackwidow. * KEYS * Storage::DoCompactRange() * Storage::PKHScanRange() * Storage::PKHRSranRange() --- include/pika_stream.h | 3 +- src/pika_kv.cc | 2 + src/storage/include/storage/storage.h | 1 + src/storage/src/redis_streams.cc | 106 ++++++++++++++++++++++++++ src/storage/src/redis_streams.h | 4 + src/storage/src/storage.cc | 43 +++++++++++ tests/integration/stream_test.go | 26 +++++++ 7 files changed, 184 insertions(+), 1 deletion(-) diff --git a/include/pika_stream.h b/include/pika_stream.h index 356cca5a46..a86f0d21cf 100644 --- a/include/pika_stream.h +++ b/include/pika_stream.h @@ -6,10 +6,11 @@ #ifndef PIKA_STREAM_H_ #define PIKA_STREAM_H_ +#include "include/acl.h" #include "include/pika_command.h" #include "include/pika_slot.h" -#include "storage/storage.h" #include "storage/src/redis_streams.h" +#include "storage/storage.h" /* * stream diff --git a/src/pika_kv.cc b/src/pika_kv.cc index f021195898..8837653c05 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -611,6 +611,8 @@ void KeysCmd::DoInitial() { type_ = storage::DataType::kLists; } else if (strcasecmp(opt.data(), "hash") == 0) { type_ = storage::DataType::kHashes; + } else if (strcasecmp(opt.data(), "stream") == 0) { + type_ = storage::DataType::kStreams; } else { res_.SetRes(CmdRes::kSyntaxErr); } diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index d872f41dc2..cbd55ae3a3 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -148,6 +148,7 @@ enum Operation { kCleanZSets, kCleanSets, kCleanLists, + kCleanStreams, kCompactRange }; diff --git a/src/storage/src/redis_streams.cc b/src/storage/src/redis_streams.cc index f0e82dc337..2ea34efb3c 100644 --- a/src/storage/src/redis_streams.cc +++ b/src/storage/src/redis_streams.cc @@ -491,6 +491,112 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret) return s; } +rocksdb::Status RedisStreams::PKScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, + int32_t limit, std::vector* keys, std::string* next_key) { + next_key->clear(); + + std::string key; + int32_t remain = limit; + rocksdb::ReadOptions iterator_options; + const rocksdb::Snapshot* snapshot; + ScopeSnapshot ss(db_, &snapshot); + iterator_options.snapshot = snapshot; + iterator_options.fill_cache = false; + + bool start_no_limit = key_start.compare("") == 0; + bool end_no_limit = key_end.compare("") == 0; + + if (!start_no_limit && !end_no_limit && (key_start.compare(key_end) > 0)) { + return rocksdb::Status::InvalidArgument("error in given range"); + } + + rocksdb::Iterator* it = db_->NewIterator(iterator_options, handles_[0]); + if (start_no_limit) { + it->SeekToFirst(); + } else { + it->Seek(key_start); + } + + while (it->Valid() && remain > 0 && (end_no_limit || it->key().compare(key_end) <= 0)) { + ParsedStreamMetaValue parsed_meta_value(it->value()); + if (parsed_meta_value.length() == 0) { + it->Next(); + } else { + key = it->key().ToString(); + if (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0) { + keys->push_back(key); + } + remain--; + it->Next(); + } + } + + while (it->Valid() && (end_no_limit || it->key().compare(key_end) <= 0)) { + ParsedStreamMetaValue parsed_meta_value(it->value()); + if (parsed_meta_value.length() == 0) { + it->Next(); + } else { + *next_key = it->key().ToString(); + break; + } + } + delete it; + return rocksdb::Status::OK(); +} + +Status RedisStreams::PKRScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit, + std::vector* keys, std::string* next_key) { + next_key->clear(); + + std::string key; + int32_t remain = limit; + rocksdb::ReadOptions iterator_options; + const rocksdb::Snapshot* snapshot; + ScopeSnapshot ss(db_, &snapshot); + iterator_options.snapshot = snapshot; + iterator_options.fill_cache = false; + + bool start_no_limit = key_start.compare("") == 0; + bool end_no_limit = key_end.compare("") == 0; + + if (!start_no_limit && !end_no_limit && (key_start.compare(key_end) < 0)) { + return Status::InvalidArgument("error in given range"); + } + + rocksdb::Iterator* it = db_->NewIterator(iterator_options, handles_[0]); + if (start_no_limit) { + it->SeekToLast(); + } else { + it->SeekForPrev(key_start); + } + + while (it->Valid() && remain > 0 && (end_no_limit || it->key().compare(key_end) >= 0)) { + ParsedStreamMetaValue parsed_streams_meta_value(it->value()); + if (parsed_streams_meta_value.length() == 0) { + it->Prev(); + } else { + key = it->key().ToString(); + if (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0) { + keys->push_back(key); + } + remain--; + it->Prev(); + } + } + + while (it->Valid() && (end_no_limit || it->key().compare(key_end) >= 0)) { + ParsedStreamMetaValue parsed_streams_meta_value(it->value()); + if (parsed_streams_meta_value.length() == 0) { + it->Prev(); + } else { + *next_key = it->key().ToString(); + break; + } + } + delete it; + return Status::OK(); +} + Status RedisStreams::Del(const Slice& key) { std::string meta_value; ScopeRecordLock l(lock_mgr_, key); diff --git a/src/storage/src/redis_streams.h b/src/storage/src/redis_streams.h index c36aa69d77..df4da61411 100644 --- a/src/storage/src/redis_streams.h +++ b/src/storage/src/redis_streams.h @@ -150,6 +150,10 @@ class RedisStreams : public Redis { Status ScanKeyNum(KeyInfo* keyinfo) override; Status ScanKeys(const std::string& pattern, std::vector* keys) override; Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override; + Status PKScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit, + std::vector* keys, std::string* next_key); + Status PKRScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit, + std::vector* keys, std::string* next_key); //===--------------------------------------------------------------------===// // Keys Commands diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index b326102b26..3dbb6c29b4 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -929,6 +929,22 @@ int64_t Storage::Scan(const DataType& dtype, int64_t cursor, const std::string& } } start_key = prefix; + case 'x': + is_finish = streams_db_->Scan(start_key, pattern, keys, &leftover_visits, &next_key); + if ((leftover_visits == 0) && !is_finish) { + cursor_ret = cursor + step_length; + StoreCursorStartKey(DataType::kStreams, cursor_ret, std::string("x") + next_key); + break; + } else if (is_finish) { + if (DataType::kStreams == dtype) { + cursor_ret = 0; + break; + } else if (leftover_visits == 0) { + cursor_ret = cursor + step_length; + StoreCursorStartKey(DataType::kStreams, cursor_ret, std::string("k") + prefix); + break; + } + } case 'z': is_finish = zsets_db_->Scan(start_key, pattern, keys, &leftover_visits, &next_key); if ((leftover_visits == 0) && !is_finish) { @@ -1079,6 +1095,9 @@ Status Storage::PKScanRange(const DataType& data_type, const Slice& key_start, c case DataType::kSets: s = sets_db_->PKScanRange(key_start, key_end, pattern, limit, keys, next_key); break; + case DataType::kStreams: + s = streams_db_->PKScanRange(key_start, key_end, pattern, limit, keys, next_key); + break; default: s = Status::Corruption("Unsupported data types"); break; @@ -1108,6 +1127,9 @@ Status Storage::PKRScanRange(const DataType& data_type, const Slice& key_start, case DataType::kSets: s = sets_db_->PKRScanRange(key_start, key_end, pattern, limit, keys, next_key); break; + case DataType::kStreams: + s = streams_db_->PKRScanRange(key_start, key_end, pattern, limit, keys, next_key); + break; default: s = Status::Corruption("Unsupported data types"); break; @@ -1161,6 +1183,9 @@ Status Storage::Scanx(const DataType& data_type, const std::string& start_key, c case DataType::kSets: sets_db_->Scan(start_key, pattern, keys, &count, next_key); break; + case DataType::kStreams: + streams_db_->Scan(start_key, pattern, keys, &count, next_key); + break; default: Status::Corruption("Unsupported data types"); break; @@ -1407,6 +1432,11 @@ Status Storage::Keys(const DataType& data_type, const std::string& pattern, std: if (!s.ok()) { return s; } + } else if (data_type == DataType::kStreams) { + s = streams_db_->ScanKeys(pattern, keys); + if (!s.ok()) { + return s; + } } else { s = strings_db_->ScanKeys(pattern, keys); if (!s.ok()) { @@ -1428,6 +1458,10 @@ Status Storage::Keys(const DataType& data_type, const std::string& pattern, std: if (!s.ok()) { return s; } + s = streams_db_->ScanKeys(pattern, keys); + if (!s.ok()) { + return s; + } } return s; } @@ -1648,6 +1682,9 @@ Status Storage::DoCompact(const DataType& type) { } else if (type == kLists) { current_task_type_ = Operation::kCleanLists; s = lists_db_->CompactRange(nullptr, nullptr); + } else if (type == kStreams) { + current_task_type_ = Operation::kCleanStreams; + s = streams_db_->CompactRange(nullptr, nullptr); } else { current_task_type_ = Operation::kCleanAll; s = strings_db_->CompactRange(nullptr, nullptr); @@ -1655,6 +1692,7 @@ Status Storage::DoCompact(const DataType& type) { s = sets_db_->CompactRange(nullptr, nullptr); s = zsets_db_->CompactRange(nullptr, nullptr); s = lists_db_->CompactRange(nullptr, nullptr); + s = streams_db_->CompactRange(nullptr, nullptr); } current_task_type_ = Operation::kNone; return s; @@ -1702,6 +1740,9 @@ Status Storage::DoCompactRange(const DataType& type, const std::string& start, c } else if (type == kLists) { s = lists_db_->CompactRange(&slice_meta_begin, &slice_meta_end, kMeta); s = lists_db_->CompactRange(&slice_data_begin, &slice_data_end, kData); + } else if (type == kStreams) { + s = streams_db_->CompactRange(&slice_meta_begin, &slice_meta_end, kMeta); + s = streams_db_->CompactRange(&slice_data_begin, &slice_data_end, kData); } return s; } @@ -1745,6 +1786,8 @@ std::string Storage::GetCurrentTaskType() { return "Set"; case kCleanLists: return "List"; + case kCleanStreams: + return "Stream"; case kNone: default: return "No"; diff --git a/tests/integration/stream_test.go b/tests/integration/stream_test.go index 82d47a8e1e..3c49ad9973 100644 --- a/tests/integration/stream_test.go +++ b/tests/integration/stream_test.go @@ -204,6 +204,32 @@ var _ = Describe("Stream Commands", func() { return atomic.LoadInt32(&messageCount) }).Should(Equal(int32(totalMessages))) }) + + It("should create a stream and find it using keys * command", func() { + Expect(client.Del(ctx, "mystream").Err()).NotTo(HaveOccurred()) + // Creating a stream and adding entries + _, err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "mystream", + ID: "*", + Values: map[string]interface{}{"key1": "value1", "key2": "value2"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + // Using keys * to find all keys including the stream + keys, err := client.Keys(ctx, "*").Result() + Expect(err).NotTo(HaveOccurred()) + + // Checking if the stream 'mystream' exists in the returned keys + found := false + for _, key := range keys { + if key == "mystream" { + found = true + break + } + } + Expect(found).To(BeTrue(), "Stream 'mystream' should exist in keys") + }) + It("XADD wrong number of args", func() {