Skip to content

Commit

Permalink
fix: Support some commands and API of blackwidow.
Browse files Browse the repository at this point in the history
* KEYS
* Storage::DoCompactRange()
* Storage::PKHScanRange()
* Storage::PKHRSranRange()
  • Loading branch information
KKorpse committed Jan 11, 2024
1 parent 5b43462 commit e560b89
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 1 deletion.
3 changes: 2 additions & 1 deletion include/pika_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
#ifndef PIKA_STREAM_H_
#define PIKA_STREAM_H_

#include "include/acl.h"
#include "include/pika_command.h"
#include "include/pika_slot.h"
#include "storage/storage.h"
#include "storage/src/redis_streams.h"
#include "storage/storage.h"

/*
* stream
Expand Down
2 changes: 2 additions & 0 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,8 @@ void KeysCmd::DoInitial() {
type_ = storage::DataType::kLists;
} else if (strcasecmp(opt.data(), "hash") == 0) {
type_ = storage::DataType::kHashes;
} else if (strcasecmp(opt.data(), "stream") == 0) {
type_ = storage::DataType::kStreams;
} else {
res_.SetRes(CmdRes::kSyntaxErr);
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ enum Operation {
kCleanZSets,
kCleanSets,
kCleanLists,
kCleanStreams,
kCompactRange
};

Expand Down
106 changes: 106 additions & 0 deletions src/storage/src/redis_streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,112 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret)
return s;
}

rocksdb::Status RedisStreams::PKScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern,
int32_t limit, std::vector<std::string>* keys, std::string* next_key) {
next_key->clear();

std::string key;
int32_t remain = limit;
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

bool start_no_limit = key_start.compare("") == 0;
bool end_no_limit = key_end.compare("") == 0;

if (!start_no_limit && !end_no_limit && (key_start.compare(key_end) > 0)) {
return rocksdb::Status::InvalidArgument("error in given range");
}

rocksdb::Iterator* it = db_->NewIterator(iterator_options, handles_[0]);
if (start_no_limit) {
it->SeekToFirst();
} else {
it->Seek(key_start);
}

while (it->Valid() && remain > 0 && (end_no_limit || it->key().compare(key_end) <= 0)) {
ParsedStreamMetaValue parsed_meta_value(it->value());
if (parsed_meta_value.length() == 0) {
it->Next();
} else {
key = it->key().ToString();
if (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0) {
keys->push_back(key);
}
remain--;
it->Next();
}
}

while (it->Valid() && (end_no_limit || it->key().compare(key_end) <= 0)) {
ParsedStreamMetaValue parsed_meta_value(it->value());
if (parsed_meta_value.length() == 0) {
it->Next();
} else {
*next_key = it->key().ToString();
break;
}
}
delete it;
return rocksdb::Status::OK();
}

Status RedisStreams::PKRScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit,
std::vector<std::string>* keys, std::string* next_key) {
next_key->clear();

std::string key;
int32_t remain = limit;
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

bool start_no_limit = key_start.compare("") == 0;
bool end_no_limit = key_end.compare("") == 0;

if (!start_no_limit && !end_no_limit && (key_start.compare(key_end) < 0)) {
return Status::InvalidArgument("error in given range");
}

rocksdb::Iterator* it = db_->NewIterator(iterator_options, handles_[0]);
if (start_no_limit) {
it->SeekToLast();
} else {
it->SeekForPrev(key_start);
}

while (it->Valid() && remain > 0 && (end_no_limit || it->key().compare(key_end) >= 0)) {
ParsedStreamMetaValue parsed_streams_meta_value(it->value());
if (parsed_streams_meta_value.length() == 0) {
it->Prev();
} else {
key = it->key().ToString();
if (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0) {
keys->push_back(key);
}
remain--;
it->Prev();
}
}

while (it->Valid() && (end_no_limit || it->key().compare(key_end) >= 0)) {
ParsedStreamMetaValue parsed_streams_meta_value(it->value());
if (parsed_streams_meta_value.length() == 0) {
it->Prev();
} else {
*next_key = it->key().ToString();
break;
}
}
delete it;
return Status::OK();
}

Status RedisStreams::Del(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/redis_streams.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ class RedisStreams : public Redis {
Status ScanKeyNum(KeyInfo* keyinfo) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit,
std::vector<std::string>* keys, std::string* next_key);
Status PKRScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit,
std::vector<std::string>* keys, std::string* next_key);

//===--------------------------------------------------------------------===//
// Keys Commands
Expand Down
43 changes: 43 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,22 @@ int64_t Storage::Scan(const DataType& dtype, int64_t cursor, const std::string&
}
}
start_key = prefix;
case 'x':
is_finish = streams_db_->Scan(start_key, pattern, keys, &leftover_visits, &next_key);
if ((leftover_visits == 0) && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(DataType::kStreams, cursor_ret, std::string("x") + next_key);
break;
} else if (is_finish) {
if (DataType::kStreams == dtype) {
cursor_ret = 0;
break;
} else if (leftover_visits == 0) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(DataType::kStreams, cursor_ret, std::string("k") + prefix);
break;
}
}
case 'z':
is_finish = zsets_db_->Scan(start_key, pattern, keys, &leftover_visits, &next_key);
if ((leftover_visits == 0) && !is_finish) {
Expand Down Expand Up @@ -1079,6 +1095,9 @@ Status Storage::PKScanRange(const DataType& data_type, const Slice& key_start, c
case DataType::kSets:
s = sets_db_->PKScanRange(key_start, key_end, pattern, limit, keys, next_key);
break;
case DataType::kStreams:
s = streams_db_->PKScanRange(key_start, key_end, pattern, limit, keys, next_key);
break;
default:
s = Status::Corruption("Unsupported data types");
break;
Expand Down Expand Up @@ -1108,6 +1127,9 @@ Status Storage::PKRScanRange(const DataType& data_type, const Slice& key_start,
case DataType::kSets:
s = sets_db_->PKRScanRange(key_start, key_end, pattern, limit, keys, next_key);
break;
case DataType::kStreams:
s = streams_db_->PKRScanRange(key_start, key_end, pattern, limit, keys, next_key);
break;
default:
s = Status::Corruption("Unsupported data types");
break;
Expand Down Expand Up @@ -1161,6 +1183,9 @@ Status Storage::Scanx(const DataType& data_type, const std::string& start_key, c
case DataType::kSets:
sets_db_->Scan(start_key, pattern, keys, &count, next_key);
break;
case DataType::kStreams:
streams_db_->Scan(start_key, pattern, keys, &count, next_key);
break;
default:
Status::Corruption("Unsupported data types");
break;
Expand Down Expand Up @@ -1407,6 +1432,11 @@ Status Storage::Keys(const DataType& data_type, const std::string& pattern, std:
if (!s.ok()) {
return s;
}
} else if (data_type == DataType::kStreams) {
s = streams_db_->ScanKeys(pattern, keys);
if (!s.ok()) {
return s;
}
} else {
s = strings_db_->ScanKeys(pattern, keys);
if (!s.ok()) {
Expand All @@ -1428,6 +1458,10 @@ Status Storage::Keys(const DataType& data_type, const std::string& pattern, std:
if (!s.ok()) {
return s;
}
s = streams_db_->ScanKeys(pattern, keys);
if (!s.ok()) {
return s;
}
}
return s;
}
Expand Down Expand Up @@ -1648,13 +1682,17 @@ Status Storage::DoCompact(const DataType& type) {
} else if (type == kLists) {
current_task_type_ = Operation::kCleanLists;
s = lists_db_->CompactRange(nullptr, nullptr);
} else if (type == kStreams) {
current_task_type_ = Operation::kCleanStreams;
s = streams_db_->CompactRange(nullptr, nullptr);
} else {
current_task_type_ = Operation::kCleanAll;
s = strings_db_->CompactRange(nullptr, nullptr);
s = hashes_db_->CompactRange(nullptr, nullptr);
s = sets_db_->CompactRange(nullptr, nullptr);
s = zsets_db_->CompactRange(nullptr, nullptr);
s = lists_db_->CompactRange(nullptr, nullptr);
s = streams_db_->CompactRange(nullptr, nullptr);
}
current_task_type_ = Operation::kNone;
return s;
Expand Down Expand Up @@ -1702,6 +1740,9 @@ Status Storage::DoCompactRange(const DataType& type, const std::string& start, c
} else if (type == kLists) {
s = lists_db_->CompactRange(&slice_meta_begin, &slice_meta_end, kMeta);
s = lists_db_->CompactRange(&slice_data_begin, &slice_data_end, kData);
} else if (type == kStreams) {
s = streams_db_->CompactRange(&slice_meta_begin, &slice_meta_end, kMeta);
s = streams_db_->CompactRange(&slice_data_begin, &slice_data_end, kData);
}
return s;
}
Expand Down Expand Up @@ -1745,6 +1786,8 @@ std::string Storage::GetCurrentTaskType() {
return "Set";
case kCleanLists:
return "List";
case kCleanStreams:
return "Stream";
case kNone:
default:
return "No";
Expand Down
26 changes: 26 additions & 0 deletions tests/integration/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,32 @@ var _ = Describe("Stream Commands", func() {
return atomic.LoadInt32(&messageCount)
}).Should(Equal(int32(totalMessages)))
})

It("should create a stream and find it using keys * command", func() {
Expect(client.Del(ctx, "mystream").Err()).NotTo(HaveOccurred())
// Creating a stream and adding entries
_, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "mystream",
ID: "*",
Values: map[string]interface{}{"key1": "value1", "key2": "value2"},
}).Result()
Expect(err).NotTo(HaveOccurred())

// Using keys * to find all keys including the stream
keys, err := client.Keys(ctx, "*").Result()
Expect(err).NotTo(HaveOccurred())

// Checking if the stream 'mystream' exists in the returned keys
found := false
for _, key := range keys {
if key == "mystream" {
found = true
break
}
}
Expect(found).To(BeTrue(), "Stream 'mystream' should exist in keys")
})



It("XADD wrong number of args", func() {
Expand Down

0 comments on commit e560b89

Please sign in to comment.