Skip to content

Commit

Permalink
fix:3.5 spop
Browse files Browse the repository at this point in the history
  • Loading branch information
brother-jin committed Jan 17, 2025
1 parent 230d72f commit 7c89623
Showing 1 changed file with 56 additions and 74 deletions.
130 changes: 56 additions & 74 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -835,94 +835,76 @@ rocksdb::Status RedisSets::SMove(const Slice& source, const Slice& destination,
return s;
}

rocksdb::Status RedisSets::SPop(const Slice& key, std::vector<std::string>* members, int64_t cnt) {
std::default_random_engine engine;
rocksdb::Status RedisSets::SPop(const Slice& key, std::vector<std::string>* members, int64_t count) {
std::default_random_engine engine(std::random_device{}());

// Lock granularity optimization
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
auto status = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (!status.ok()) {
return status.IsNotFound() ? Status::NotFound() : status;
}

uint64_t start_us = pstd::NowMicros();
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
if (parsed_sets_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_sets_meta_value.count() == 0) {
return Status::NotFound();
} else {
int32_t length = parsed_sets_meta_value.count();
if (length < cnt) {
int32_t size = parsed_sets_meta_value.count();
int32_t cur_index = 0;
int32_t version = parsed_sets_meta_value.version();
SetsMemberKey sets_member_key(key, version, Slice());
auto iter = db_->NewIterator(default_read_options_, handles_[1]);
for (iter->Seek(sets_member_key.Encode());
iter->Valid() && cur_index < size;
iter->Next(), cur_index++) {

batch.Delete(handles_[1], iter->key());
ParsedSetsMemberKey parsed_sets_member_key(iter->key());
members->push_back(parsed_sets_member_key.member().ToString());
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
if (parsed_sets_meta_value.IsStale() || parsed_sets_meta_value.count() == 0) {
return Status::NotFound("Empty or stale set");
}

}
int32_t set_size = parsed_sets_meta_value.count();
if (set_size < count) {
count = set_size; // Limit to actual size
}

//parsed_sets_meta_value.ModifyCount(-cnt);
//batch.Put(handles_[0], key, meta_value);
batch.Delete(handles_[0], key);
delete iter;
int32_t version = parsed_sets_meta_value.version();
SetsMemberKey sets_member_key(key, version, Slice());

} else {
engine.seed(time(nullptr));
int32_t cur_index = 0;
int32_t size = parsed_sets_meta_value.count();
int32_t target_index = -1;
int32_t version = parsed_sets_meta_value.version();
std::unordered_set<int32_t> sets_index;
int32_t modnum = size;

for (int64_t cur_round = 0;
cur_round < cnt;
cur_round++) {
do {
target_index = static_cast<int32_t>( engine() % modnum);
} while (sets_index.find(target_index) != sets_index.end());
sets_index.insert(target_index);
}
// Optimize: Generate sorted random indices
std::unordered_set<int32_t> random_indices;
while (random_indices.size() < count) {
random_indices.insert(engine() % set_size);
}

SetsMemberKey sets_member_key(key, version, Slice());
int64_t del_count = 0;
KeyStatisticsDurationGuard guard(this, key.ToString());
auto iter = db_->NewIterator(default_read_options_, handles_[1]);
for (iter->Seek(sets_member_key.Encode());
iter->Valid() && cur_index < size;
iter->Next(), cur_index++) {
if (del_count == cnt) {
break;
}
if (sets_index.find(cur_index) != sets_index.end()) {
del_count++;
batch.Delete(handles_[1], iter->key());
ParsedSetsMemberKey parsed_sets_member_key(iter->key());
members->push_back(parsed_sets_member_key.member().ToString());
}
}
std::vector<int32_t> sorted_indices(random_indices.begin(), random_indices.end());
std::sort(sorted_indices.begin(), sorted_indices.end());

if (!parsed_sets_meta_value.CheckModifyCount(static_cast<int32_t>(-cnt))){
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.ModifyCount(static_cast<int32_t>(-cnt));
batch.Put(handles_[0], key, meta_value);
delete iter;
auto iter = db_->NewIterator(default_read_options_, handles_[1]);
int32_t cur_index = 0;
int64_t deleted_count = 0;

for (int32_t target_index : sorted_indices) {
for (iter->Seek(sets_member_key.Encode());
iter->Valid() && cur_index <= target_index;
iter->Next(), ++cur_index) {

if (cur_index == target_index) {
batch.Delete(handles_[1], iter->key());
ParsedSetsMemberKey parsed_sets_member_key(iter->key());
members->push_back(parsed_sets_member_key.member().ToString());
deleted_count++;
break;
}
}
} else {
return s;
}
return db_->Write(default_write_options_, &batch);
delete iter;

// Update metadata
if (deleted_count > 0) {
parsed_sets_meta_value.ModifyCount(-deleted_count);
if (parsed_sets_meta_value.count() == 0) {
batch.Delete(handles_[0], key);
} else {
batch.Put(handles_[0], key, meta_value);
}
return db_->Write(default_write_options_, &batch);
}

return Status::InvalidArgument("No elements removed");
}


rocksdb::Status RedisSets::SRandmember(const Slice& key, int32_t count, std::vector<std::string>* members) {
if (count == 0) {
return rocksdb::Status::OK();
Expand Down

0 comments on commit 7c89623

Please sign in to comment.