Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: change storage ttl time from seconds to milliseconds #2822

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
working-directory: ${{ github.workspace }}/build
# Execute tests defined by the CMake configuration.
# See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail
run: ctest -C ${{ env.BUILD_TYPE }}
run: ctest -C ${{ env.BUILD_TYPE }} --verbose

- name: Unit Test
working-directory: ${{ github.workspace }}
Expand Down Expand Up @@ -111,7 +111,7 @@ jobs:

- name: Test
working-directory: ${{ github.workspace }}/build
run: ctest --rerun-failed --output-on-failure -C ${{ env.BUILD_TYPE }}
run: ctest -C ${{ env.BUILD_TYPE }} --verbose

- name: Unit Test
working-directory: ${{ github.workspace }}
Expand Down
17 changes: 3 additions & 14 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1433,7 +1433,7 @@ void PttlCmd::DoInitial() {
}

void PttlCmd::Do() {
int64_t timestamp = db_->storage()->TTL(key_);
int64_t timestamp = db_->storage()->PTTL(key_);
if (timestamp == -3) {
res_.SetRes(CmdRes::kErrOther, "ttl internal error");
} else {
Expand All @@ -1442,19 +1442,8 @@ void PttlCmd::Do() {
}

void PttlCmd::ReadCache() {
int64_t timestamp = db_->cache()->TTL(key_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expire, expireat, pexpire, pexireat这些也需要修改吧。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这几个方法没有没有重写 ReadCche 方法,应该不用改动

if (timestamp == -3) {
res_.SetRes(CmdRes::kErrOther, "ttl internal error");
} else if (timestamp != -2) {
if (timestamp == -1) {
res_.AppendInteger(-1);
} else {
res_.AppendInteger(timestamp * 1000);
}
} else {
// mean this key not exist
res_.SetRes(CmdRes::kCacheMiss);
}
// redis cache don't support pttl cache, so read directly from db
DoThroughDB();
}

void PttlCmd::DoThroughDB() {
Expand Down
7 changes: 7 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,13 @@ class Storage {
// return > 0 TTL in seconds
int64_t TTL(const Slice& key);

// Returns the remaining time to live of a key that has a timeout.
// return -3 operation exception errors happen in database
// return -2 if the key does not exist
// return -1 if the key exists but has not associated expire
// return > 0 TTL in milliseconds
int64_t PTTL(const Slice& key);

// Reutrns the data all type of the key
// if single is true, the query will return the first one
Status GetType(const std::string& key, enum DataType& type);
Expand Down
7 changes: 2 additions & 5 deletions src/storage/src/base_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
BaseMetaFilter() = default;
bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& value, std::string* new_value,
bool* value_changed) const override {
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
auto cur_time = static_cast<uint64_t>(unix_time);
auto cur_time = rocksdb::Env::Default()->NowMicros() / 1000;
/*
* For the filtering of meta information, because the field designs of string
* and list are different, their filtering policies are written separately.
Expand Down Expand Up @@ -181,8 +179,7 @@ class BaseDataFilter : public rocksdb::CompactionFilter {
return true;
}

int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
if (cur_meta_etime_ != 0 && cur_meta_etime_ < static_cast<uint64_t>(unix_time)) {
TRACE("Drop[Timeout]");
return true;
Expand Down
5 changes: 2 additions & 3 deletions src/storage/src/base_meta_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BaseMetaValue : public InternalValue {
}

uint64_t UpdateVersion() {
int64_t unix_time = pstd::NowMicros() / 1000000;
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
if (version_ >= unix_time) {
version_++;
} else {
Expand Down Expand Up @@ -171,8 +171,7 @@ class ParsedBaseMetaValue : public ParsedInternalValue {
}

uint64_t UpdateVersion() {
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
if (version_ >= static_cast<uint64_t>(unix_time)) {
version_++;
} else {
Expand Down
13 changes: 5 additions & 8 deletions src/storage/src/base_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ class InternalValue {
void SetEtime(uint64_t etime = 0) { etime_ = etime; }
void setCtime(uint64_t ctime) { ctime_ = ctime; }
rocksdb::Status SetRelativeTimestamp(int64_t ttl) {
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
etime_ = uint64_t(unix_time + ttl);
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
etime_ = uint64_t(unix_time) + ttl * 1000;
return rocksdb::Status::OK();
}
void SetVersion(uint64_t version = 0) { version_ = version; }
Expand Down Expand Up @@ -123,9 +122,8 @@ class ParsedInternalValue {
}

void SetRelativeTimestamp(int64_t ttl) {
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
etime_ = unix_time + ttl;
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
etime_ = unix_time + ttl * 1000;
SetEtimeToValue();
}

Expand All @@ -135,8 +133,7 @@ class ParsedInternalValue {
if (etime_ == 0) {
return false;
}
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
return etime_ < unix_time;
}

Expand Down
3 changes: 1 addition & 2 deletions src/storage/src/lists_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ class ListsDataFilter : public rocksdb::CompactionFilter {
return true;
}

int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
if (cur_meta_etime_ != 0 && cur_meta_etime_ < static_cast<uint64_t>(unix_time)) {
TRACE("Drop[Timeout]");
return true;
Expand Down
6 changes: 2 additions & 4 deletions src/storage/src/lists_meta_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ class ListsMetaValue : public InternalValue {
}

uint64_t UpdateVersion() {
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
if (version_ >= static_cast<uint64_t>(unix_time)) {
version_++;
} else {
Expand Down Expand Up @@ -198,8 +197,7 @@ class ParsedListsMetaValue : public ParsedInternalValue {
}

uint64_t UpdateVersion() {
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
if (version_ >= static_cast<uint64_t>(unix_time)) {
version_++;
} else {
Expand Down
9 changes: 3 additions & 6 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ Status Redis::ScanHashesKeyNum(KeyInfo* key_info) {
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;

rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Expand Down Expand Up @@ -253,8 +252,7 @@ Status Redis::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fvs, int
if (*ttl == 0) {
*ttl = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2;
}

Expand Down Expand Up @@ -1345,8 +1343,7 @@ Status Redis::HashesTTL(const Slice& key, int64_t* timestamp, std::string&& pref
if (*timestamp == 0) {
*timestamp = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2;
}
}
Expand Down
9 changes: 3 additions & 6 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ Status Redis::ScanListsKeyNum(KeyInfo* key_info) {
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;

rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Expand Down Expand Up @@ -503,8 +502,7 @@ Status Redis::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::
if (*ttl == 0) {
*ttl = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2;
}

Expand Down Expand Up @@ -1287,8 +1285,7 @@ Status Redis::ListsTTL(const Slice& key, int64_t* timestamp, std::string&& prefe
if (*timestamp == 0) {
*timestamp = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2;
}
}
Expand Down
9 changes: 3 additions & 6 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ rocksdb::Status Redis::ScanSetsKeyNum(KeyInfo* key_info) {
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;

rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Expand Down Expand Up @@ -775,8 +774,7 @@ Status Redis::SMembersWithTTL(const Slice& key,
if (*ttl == 0) {
*ttl = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2;
}

Expand Down Expand Up @@ -1596,8 +1594,7 @@ rocksdb::Status Redis::SetsTTL(const Slice& key, int64_t* timestamp, std::string
if (*timestamp == 0) {
*timestamp = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2;
}
}
Expand Down
3 changes: 0 additions & 3 deletions src/storage/src/redis_streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,6 @@ Status Redis::ScanStreamsKeyNum(KeyInfo* key_info) {
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);

rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
if (!ExpectedMetaValue(DataType::kStreams, iter->value().ToString())) {
Expand Down
12 changes: 4 additions & 8 deletions src/storage/src/redis_strings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ Status Redis::ScanStringsKeyNum(KeyInfo* key_info) {
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;

// Note: This is a string type and does not need to pass the column family as
// a parameter, use the default column family
Expand Down Expand Up @@ -378,8 +377,7 @@ void ClearValueAndSetTTL(std::string* value, int64_t* ttl, int64_t ttl_value) {
}

int64_t CalculateTTL(int64_t expiry_time) {
int64_t current_time;
rocksdb::Env::Default()->GetCurrentTime(&current_time);
int64_t current_time = rocksdb::Env::Default()->NowMicros() / 1000;
return expiry_time - current_time >= 0 ? expiry_time - current_time : -2;
}

Expand Down Expand Up @@ -555,8 +553,7 @@ Status Redis::GetrangeWithValue(const Slice& key, int64_t start_offset, int64_t
if (*ttl == 0) {
*ttl = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2;
}

Expand Down Expand Up @@ -1469,8 +1466,7 @@ Status Redis::StringsTTL(const Slice& key, int64_t* timestamp, std::string&& pre
if (*timestamp == 0) {
*timestamp = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2;
}
}
Expand Down
9 changes: 3 additions & 6 deletions src/storage/src/redis_zsets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ Status Redis::ScanZsetsKeyNum(KeyInfo* key_info) {
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;

rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Expand Down Expand Up @@ -568,8 +567,7 @@ Status Redis::ZRangeWithTTL(const Slice& key, int32_t start, int32_t stop, std::
if (*ttl == 0) {
*ttl = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2;
}

Expand Down Expand Up @@ -1945,8 +1943,7 @@ Status Redis::ZsetsTTL(const Slice& key, int64_t* timestamp, std::string&& prefe
if (*timestamp == 0) {
*timestamp = -1;
} else {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int64_t curtime = rocksdb::Env::Default()->NowMicros() / 1000;
*timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2;
}
}
Expand Down
20 changes: 15 additions & 5 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ Status Storage::StoreCursorStartKey(const DataType& dtype, int64_t cursor, char
return cursors_store_->Insert(index_key, index_value);
}

std::unique_ptr<Redis>& Storage::GetDBInstance(const Slice& key) {
return GetDBInstance(key.ToString());
}
std::unique_ptr<Redis>& Storage::GetDBInstance(const Slice& key) { return GetDBInstance(key.ToString()); }

std::unique_ptr<Redis>& Storage::GetDBInstance(const std::string& key) {
auto inst_index = slot_indexer_->GetInstanceID(GetSlotID(slot_num_, key));
Expand Down Expand Up @@ -1470,7 +1468,7 @@ int32_t Storage::Persist(const Slice& key) {
return count;
}

int64_t Storage::TTL(const Slice& key) {
int64_t Storage::PTTL(const Slice& key) {
int64_t timestamp = 0;
auto& inst = GetDBInstance(key);
Status s = inst->TTL(key, &timestamp);
Expand All @@ -1482,6 +1480,18 @@ int64_t Storage::TTL(const Slice& key) {
return timestamp;
}

int64_t Storage::TTL(const Slice& key) {
int64_t timestamp = 0;
auto& inst = GetDBInstance(key);
Status s = inst->TTL(key, &timestamp);
if (s.ok() || s.IsNotFound()) {
return timestamp > 0 ? timestamp / 1000 : timestamp;
} else if (!s.IsNotFound()) {
return -3;
}
return timestamp > 0 ? timestamp / 1000 : timestamp;
}

Status Storage::GetType(const std::string& key, enum DataType& type) {
auto& inst = GetDBInstance(key);
inst->GetType(key, type);
Expand Down Expand Up @@ -1775,7 +1785,7 @@ Status Storage::SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys) {
}

Status Storage::SetSmallCompactionThreshold(uint32_t small_compaction_threshold) {
for (const auto& inst: insts_) {
for (const auto& inst : insts_) {
inst->SetSmallCompactionThreshold(small_compaction_threshold);
}
return Status::OK();
Expand Down
3 changes: 1 addition & 2 deletions src/storage/src/strings_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ class StringsFilter : public rocksdb::CompactionFilter {
StringsFilter() = default;
bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& value, std::string* new_value,
bool* value_changed) const override {
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
int64_t unix_time = rocksdb::Env::Default()->NowMicros() / 1000;
auto cur_time = static_cast<uint64_t>(unix_time);
ParsedStringsValue parsed_strings_value(value);
TRACE("==========================START==========================");
Expand Down
Loading
Loading