From 81f5c98331fa1d943139bb54e74c231e9e309324 Mon Sep 17 00:00:00 2001 From: cjh <1271435567@qq.com> Date: Thu, 8 Aug 2024 17:07:28 +0800 Subject: [PATCH] Revert "fix: change storage ttl time from seconds to milliseconds (#2822)" This reverts commit 645da7ea98af4e293fb8d770ac6d201c44de952c. --- .github/workflows/pika.yml | 4 +- include/pika_cache.h | 2 +- include/pika_kv.h | 25 ++-- src/net/examples/performance/server.cc | 2 +- src/pika_admin.cc | 8 +- src/pika_bit.cc | 2 +- src/pika_cache.cc | 4 +- src/pika_cache_load_thread.cc | 6 +- src/pika_conf.cc | 4 +- src/pika_geo.cc | 2 +- src/pika_kv.cc | 144 +++++++++++++--------- src/pika_list.cc | 2 +- src/pika_server.cc | 16 +-- src/pika_zset.cc | 10 +- src/pstd/include/env.h | 7 +- src/pstd/src/env.cc | 7 +- src/storage/include/storage/storage.h | 43 +++---- src/storage/src/base_data_value_format.h | 12 +- src/storage/src/base_filter.h | 9 +- src/storage/src/base_meta_value_format.h | 55 +++------ src/storage/src/base_value_format.h | 19 +-- src/storage/src/lists_filter.h | 3 +- src/storage/src/lists_meta_value_format.h | 50 ++------ src/storage/src/redis.h | 60 ++++----- src/storage/src/redis_hashes.cc | 61 ++++----- src/storage/src/redis_lists.cc | 47 +++---- src/storage/src/redis_sets.cc | 59 +++++---- src/storage/src/redis_streams.cc | 3 + src/storage/src/redis_strings.cc | 138 +++++++++++---------- src/storage/src/redis_zsets.cc | 63 +++++----- src/storage/src/storage.cc | 95 ++++++-------- src/storage/src/strings_filter.h | 3 +- src/storage/src/strings_value_format.h | 44 ++----- src/storage/src/zsets_filter.h | 3 +- src/storage/tests/keys_test.cc | 3 +- src/storage/tests/lists_filter_test.cc | 4 +- src/storage/tests/strings_filter_test.cc | 2 +- src/storage/tests/strings_test.cc | 28 ++--- 38 files changed, 496 insertions(+), 553 deletions(-) diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index 7d42a8b6c0..aa7a877c03 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -59,7 +59,7 @@ jobs: working-directory: ${{ github.workspace }}/build # Execute tests defined by the CMake configuration. # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail - run: ctest -C ${{ env.BUILD_TYPE }} --verbose + run: ctest -C ${{ env.BUILD_TYPE }} - name: Unit Test working-directory: ${{ github.workspace }} @@ -192,7 +192,7 @@ jobs: - name: Test working-directory: ${{ github.workspace }}/build - run: ctest -C ${{ env.BUILD_TYPE }} --verbose + run: ctest --rerun-failed --output-on-failure -C ${{ env.BUILD_TYPE }} - name: Unit Test working-directory: ${{ github.workspace }} diff --git a/include/pika_cache.h b/include/pika_cache.h index d82627ced7..5073b1b174 100644 --- a/include/pika_cache.h +++ b/include/pika_cache.h @@ -67,7 +67,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this< rocksdb::Status Del(const std::vector& keys); rocksdb::Status Expire(std::string& key, int64_t ttl); - rocksdb::Status Expireat(std::string& key, int64_t ttl_sec); + rocksdb::Status Expireat(std::string& key, int64_t ttl); rocksdb::Status TTL(std::string& key, int64_t* ttl); rocksdb::Status Persist(std::string& key); rocksdb::Status Type(std::string& key, std::string* value); diff --git a/include/pika_kv.h b/include/pika_kv.h index 84a526f0dc..a02f6b1a4c 100644 --- a/include/pika_kv.h +++ b/include/pika_kv.h @@ -36,12 +36,12 @@ class SetCmd : public Cmd { std::string value_; std::string target_; int32_t success_ = 0; - int64_t ttl_millsec = 0; + int64_t sec_ = 0; bool has_ttl_ = false; SetCmd::SetCondition condition_{kNONE}; void DoInitial() override; void Clear() override { - ttl_millsec = 0; + sec_ = 0; success_ = 0; condition_ = kNONE; } @@ -69,7 +69,7 @@ class GetCmd : public Cmd { private: std::string key_; std::string value_; - int64_t ttl_millsec_ = 0; + int64_t sec_ = 0; void DoInitial() override; rocksdb::Status s_; }; @@ -351,7 +351,7 @@ class SetexCmd : public Cmd { private: std::string key_; - int64_t ttl_sec_ = 0; + int64_t sec_ = 0; std::string value_; void DoInitial() override; rocksdb::Status s_; @@ -376,7 +376,7 @@ class PsetexCmd : public Cmd { private: std::string key_; - int64_t ttl_millsec = 0; + int64_t usec_ = 0; std::string value_; void DoInitial() override; rocksdb::Status s_; @@ -540,7 +540,7 @@ class StrlenCmd : public Cmd { private: std::string key_; std::string value_; - int64_t ttl_millsec = 0; + int64_t sec_ = 0; void DoInitial() override; rocksdb::Status s_; }; @@ -581,7 +581,7 @@ class ExpireCmd : public Cmd { private: std::string key_; - int64_t ttl_sec_ = 0; + int64_t sec_ = 0; void DoInitial() override; std::string ToRedisProtocol() override; rocksdb::Status s_; @@ -605,7 +605,7 @@ class PexpireCmd : public Cmd { private: std::string key_; - int64_t ttl_millsec = 0; + int64_t msec_ = 0; void DoInitial() override; std::string ToRedisProtocol() override; rocksdb::Status s_; @@ -629,7 +629,7 @@ class ExpireatCmd : public Cmd { private: std::string key_; - int64_t time_stamp_sec_ = 0; + int64_t time_stamp_ = 0; void DoInitial() override; rocksdb::Status s_; }; @@ -652,9 +652,10 @@ class PexpireatCmd : public Cmd { private: std::string key_; - int64_t time_stamp_millsec_ = 0; + int64_t time_stamp_ms_ = 0; void DoInitial() override; rocksdb::Status s_; + std::string ToRedisProtocol() override; }; class TtlCmd : public Cmd { @@ -809,9 +810,9 @@ class PKSetexAtCmd : public Cmd { private: std::string key_; std::string value_; - int64_t time_stamp_sec_ = 0; + int64_t time_stamp_ = 0; void DoInitial() override; - void Clear() override { time_stamp_sec_ = 0; } + void Clear() override { time_stamp_ = 0; } rocksdb::Status s_; }; diff --git a/src/net/examples/performance/server.cc b/src/net/examples/performance/server.cc index ce70abddcc..5b7b65cbc7 100644 --- a/src/net/examples/performance/server.cc +++ b/src/net/examples/performance/server.cc @@ -86,7 +86,7 @@ int main(int argc, char* argv[]) { std::unique_ptr st_thread(NewDispatchThread(ip, port, 24, &conn_factory, 1000)); st_thread->StartThread(); - pstd::TimeType st, ed; + uint64_t st, ed; while (!should_stop) { st = NowMicros(); diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 9cefd99d0a..33d7ccfd61 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -2765,8 +2765,8 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { "The rsync rate limit now is " << new_throughput_limit << "(Which Is Around " << (new_throughput_limit >> 20) << " MB/s)"; res_.AppendStringRaw("+OK\r\n"); - } else if (set_item == "rsync-timeout-ms") { - if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) { + } else if(set_item == "rsync-timeout-ms"){ + if(pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0){ res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rsync-timeout-ms'\r\n"); return; } @@ -2961,9 +2961,9 @@ void DbsizeCmd::Do() { if (!dbs) { res_.SetRes(CmdRes::kInvalidDB); } else { - if (g_pika_conf->slotmigrate()) { + if (g_pika_conf->slotmigrate()){ int64_t dbsize = 0; - for (int i = 0; i < g_pika_conf->default_slot_num(); ++i) { + for (int i = 0; i < g_pika_conf->default_slot_num(); ++i){ int32_t card = 0; rocksdb::Status s = dbs->storage()->SCard(SlotKeyPrefix+std::to_string(i), &card); if (s.ok() && card >= 0) { diff --git a/src/pika_bit.cc b/src/pika_bit.cc index ee48d0ba5f..2367364054 100644 --- a/src/pika_bit.cc +++ b/src/pika_bit.cc @@ -114,7 +114,7 @@ void BitGetCmd::DoThroughDB() { Do(); } -void BitGetCmd::DoUpdateCache() { +void BitGetCmd::DoUpdateCache(){ if (s_.ok()) { db_->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_KV, key_, db_); } diff --git a/src/pika_cache.cc b/src/pika_cache.cc index b7d1f45eb1..d6dff559de 100644 --- a/src/pika_cache.cc +++ b/src/pika_cache.cc @@ -126,10 +126,10 @@ Status PikaCache::Expire(std::string& key, int64_t ttl) { return caches_[cache_index]->Expire(key, ttl); } -Status PikaCache::Expireat(std::string& key, int64_t ttl_sec) { +Status PikaCache::Expireat(std::string& key, int64_t ttl) { int cache_index = CacheIndex(key); std::lock_guard lm(*cache_mutexs_[cache_index]); - return caches_[cache_index]->Expireat(key, ttl_sec); + return caches_[cache_index]->Expireat(key, ttl); } Status PikaCache::TTL(std::string& key, int64_t *ttl) { diff --git a/src/pika_cache_load_thread.cc b/src/pika_cache_load_thread.cc index f9bb040a40..b2205a7d49 100644 --- a/src/pika_cache_load_thread.cc +++ b/src/pika_cache_load_thread.cc @@ -113,13 +113,13 @@ bool PikaCacheLoadThread::LoadSet(std::string& key, const std::shared_ptr& d } std::vector values; - int64_t ttl_millsec = -1; - rocksdb::Status s = db->storage()->SMembersWithTTL(key, &values, &ttl_millsec); + int64_t ttl = -1; + rocksdb::Status s = db->storage()->SMembersWithTTL(key, &values, &ttl); if (!s.ok()) { LOG(WARNING) << "load set failed, key=" << key; return false; } - db->cache()->WriteSetToCache(key, values, ttl_millsec > 0 ? ttl_millsec / 1000 : ttl_millsec); + db->cache()->WriteSetToCache(key, values, ttl); return true; } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index ab457ab3f5..afd05589ff 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -184,7 +184,7 @@ int PikaConf::Load() { std::string admin_cmd_list; GetConfStr("admin-cmd-list", &admin_cmd_list); - if (admin_cmd_list == "") { + if (admin_cmd_list == ""){ admin_cmd_list = "info, monitor, ping"; SetAdminCmd(admin_cmd_list); } @@ -713,7 +713,7 @@ int PikaConf::Load() { int64_t tmp_rsync_timeout_ms = -1; GetConfInt64("rsync-timeout-ms", &tmp_rsync_timeout_ms); - if (tmp_rsync_timeout_ms <= 0) { + if(tmp_rsync_timeout_ms <= 0){ rsync_timeout_ms_.store(1000); } else { rsync_timeout_ms_.store(tmp_rsync_timeout_ms); diff --git a/src/pika_geo.cc b/src/pika_geo.cc index 7e7575eca1..acb7d38dcb 100644 --- a/src/pika_geo.cc +++ b/src/pika_geo.cc @@ -366,7 +366,7 @@ static void GetAllNeighbors(const std::shared_ptr& db, std::string& key, Geo int32_t count = 0; int32_t card = db->storage()->Exists({range.storekey}); if (card) { - if (db->storage()->Del({range.storekey}) > 0) { + if (db->storage()->Del({range.storekey}) > 0){ db->cache()->Del({range.storekey}); } } diff --git a/src/pika_kv.cc b/src/pika_kv.cc index 663c7df8b1..abcf0e53d8 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -22,7 +22,7 @@ void SetCmd::DoInitial() { key_ = argv_[1]; value_ = argv_[2]; condition_ = SetCmd::kNONE; - ttl_millsec = 0; + sec_ = 0; size_t index = 3; while (index != argv_.size()) { std::string opt = argv_[index]; @@ -46,13 +46,13 @@ void SetCmd::DoInitial() { res_.SetRes(CmdRes::kSyntaxErr); return; } - if (pstd::string2int(argv_[index].data(), argv_[index].size(), &ttl_millsec) == 0) { + if (pstd::string2int(argv_[index].data(), argv_[index].size(), &sec_) == 0) { res_.SetRes(CmdRes::kInvalidInt); return; } - if (strcasecmp(opt.data(), "ex") == 0) { - ttl_millsec *= 1000; + if (strcasecmp(opt.data(), "px") == 0) { + sec_ /= 1000; } has_ttl_ = true; } else { @@ -67,16 +67,16 @@ void SetCmd::Do() { int32_t res = 1; switch (condition_) { case SetCmd::kXX: - s_ = db_->storage()->Setxx(key_, value_, &res, ttl_millsec); + s_ = db_->storage()->Setxx(key_, value_, &res, sec_); break; case SetCmd::kNX: - s_ = db_->storage()->Setnx(key_, value_, &res, ttl_millsec); + s_ = db_->storage()->Setnx(key_, value_, &res, sec_); break; case SetCmd::kVX: - s_ = db_->storage()->Setvx(key_, target_, value_, &success_, ttl_millsec); + s_ = db_->storage()->Setvx(key_, target_, value_, &success_, sec_); break; case SetCmd::kEXORPX: - s_ = db_->storage()->Setex(key_, value_, ttl_millsec); + s_ = db_->storage()->Setex(key_, value_, sec_); break; default: s_ = db_->storage()->Set(key_, value_); @@ -111,7 +111,7 @@ void SetCmd::DoUpdateCache() { } if (s_.ok()) { if (has_ttl_) { - db_->cache()->Setxx(key_, value_, ttl_millsec / 1000); + db_->cache()->Setxx(key_, value_, sec_); } else { db_->cache()->SetxxWithoutTTL(key_, value_); } @@ -133,9 +133,7 @@ std::string SetCmd::ToRedisProtocol() { RedisAppendContent(content, key_); // time_stamp char buf[100]; - - // TODO 精度损失 - auto time_stamp = time(nullptr) + ttl_millsec / 1000; + auto time_stamp = time(nullptr) + sec_; pstd::ll2string(buf, 100, time_stamp); std::string at(buf); RedisAppendLenUint64(content, at.size(), "$"); @@ -158,7 +156,7 @@ void GetCmd::DoInitial() { } void GetCmd::Do() { - s_ = db_->storage()->GetWithTTL(key_, &value_, &ttl_millsec_); + s_ = db_->storage()->GetWithTTL(key_, &value_, &sec_); if (s_.ok()) { res_.AppendStringLenUint64(value_.size()); res_.AppendContent(value_); @@ -188,7 +186,7 @@ void GetCmd::DoThroughDB() { void GetCmd::DoUpdateCache() { if (s_.ok()) { - db_->cache()->WriteKVToCache(key_, value_, ttl_millsec_ / 1000); + db_->cache()->WriteKVToCache(key_, value_, sec_); } } @@ -571,7 +569,7 @@ void AppendCmd::Do() { } } -void AppendCmd::DoThroughDB() { +void AppendCmd::DoThroughDB(){ Do(); } @@ -634,7 +632,7 @@ void MgetCmd::AssembleResponseFromCache() { void MgetCmd::Do() { // Without using the cache and querying only the DB, we need to use keys_. // This line will only be assigned when querying the DB directly. - if (cache_miss_keys_.size() == 0) { + if(cache_miss_keys_.size() == 0) { cache_miss_keys_ = keys_; } db_value_status_array_.clear(); @@ -706,8 +704,7 @@ void MgetCmd::DoUpdateCache() { size_t db_index = 0; for (const auto key : cache_miss_keys_) { if (db_index < db_value_status_array_.size() && db_value_status_array_[db_index].status.ok()) { - int64_t ttl_millsec = db_value_status_array_[db_index].ttl_millsec; - db_->cache()->WriteKVToCache(const_cast(key), db_value_status_array_[db_index].value, ttl_millsec > 0 ? ttl_millsec / 1000 : ttl_millsec); + db_->cache()->WriteKVToCache(const_cast(key), db_value_status_array_[db_index].value, db_value_status_array_[db_index].ttl); } db_index++; } @@ -841,7 +838,7 @@ void SetexCmd::DoInitial() { return; } key_ = argv_[1]; - if (pstd::string2int(argv_[2].data(), argv_[2].size(), &ttl_sec_) == 0) { + if (pstd::string2int(argv_[2].data(), argv_[2].size(), &sec_) == 0) { res_.SetRes(CmdRes::kInvalidInt); return; } @@ -849,7 +846,7 @@ void SetexCmd::DoInitial() { } void SetexCmd::Do() { - s_ = db_->storage()->Setex(key_, value_, ttl_sec_ * 1000); + s_ = db_->storage()->Setex(key_, value_, sec_); if (s_.ok()) { res_.SetRes(CmdRes::kOk); AddSlotKey("k", key_, db_); @@ -866,7 +863,7 @@ void SetexCmd::DoThroughDB() { void SetexCmd::DoUpdateCache() { if (s_.ok()) { - db_->cache()->Setxx(key_, value_, ttl_sec_); + db_->cache()->Setxx(key_, value_, sec_); } } @@ -884,7 +881,7 @@ std::string SetexCmd::ToRedisProtocol() { RedisAppendContent(content, key_); // time_stamp char buf[100]; - auto time_stamp = time(nullptr) + ttl_sec_; + auto time_stamp = time(nullptr) + sec_; pstd::ll2string(buf, 100, time_stamp); std::string at(buf); RedisAppendLenUint64(content, at.size(), "$"); @@ -901,7 +898,7 @@ void PsetexCmd::DoInitial() { return; } key_ = argv_[1]; - if (pstd::string2int(argv_[2].data(), argv_[2].size(), &ttl_millsec) == 0) { + if (pstd::string2int(argv_[2].data(), argv_[2].size(), &usec_) == 0) { res_.SetRes(CmdRes::kInvalidInt); return; } @@ -909,7 +906,7 @@ void PsetexCmd::DoInitial() { } void PsetexCmd::Do() { - s_ = db_->storage()->Setex(key_, value_, ttl_millsec); + s_ = db_->storage()->Setex(key_, value_, usec_ / 1000); if (s_.ok()) { res_.SetRes(CmdRes::kOk); } else if (s_.IsInvalidArgument()) { @@ -925,7 +922,7 @@ void PsetexCmd::DoThroughDB() { void PsetexCmd::DoUpdateCache() { if (s_.ok()) { - db_->cache()->Setxx(key_, value_, ttl_millsec / 1000); + db_->cache()->Setxx(key_, value_, usec_ / 1000); } } @@ -943,7 +940,7 @@ std::string PsetexCmd::ToRedisProtocol() { RedisAppendContent(content, key_); // time_stamp char buf[100]; - auto time_stamp = pstd::NowMillis() + ttl_millsec; + auto time_stamp = time(nullptr) + usec_ / 1000; pstd::ll2string(buf, 100, time_stamp); std::string at(buf); RedisAppendLenUint64(content, at.size(), "$"); @@ -1050,7 +1047,7 @@ void MsetCmd::DoBinlog() { set_argv[0] = "set"; set_cmd_->SetConn(GetConn()); set_cmd_->SetResp(resp_.lock()); - for(auto& kv: kvs_) { + for(auto& kv: kvs_){ set_argv[1] = kv.key; set_argv[2] = kv.value; set_cmd_->Initial(set_argv, db_name_); @@ -1238,7 +1235,7 @@ void StrlenCmd::ReadCache() { void StrlenCmd::DoThroughDB() { res_.clear(); - s_ = db_->storage()->GetWithTTL(key_, &value_, &ttl_millsec); + s_ = db_->storage()->GetWithTTL(key_, &value_, &sec_); if (s_.ok() || s_.IsNotFound()) { res_.AppendInteger(value_.size()); } else { @@ -1248,7 +1245,7 @@ void StrlenCmd::DoThroughDB() { void StrlenCmd::DoUpdateCache() { if (s_.ok()) { - db_->cache()->WriteKVToCache(key_, value_, ttl_millsec > 0 ? ttl_millsec : ttl_millsec / 1000); + db_->cache()->WriteKVToCache(key_, value_, sec_); } } @@ -1305,14 +1302,14 @@ void ExpireCmd::DoInitial() { return; } key_ = argv_[1]; - if (pstd::string2int(argv_[2].data(), argv_[2].size(), &ttl_sec_) == 0) { + if (pstd::string2int(argv_[2].data(), argv_[2].size(), &sec_) == 0) { res_.SetRes(CmdRes::kInvalidInt); return; } } void ExpireCmd::Do() { - int32_t res = db_->storage()->Expire(key_, ttl_sec_ * 1000); + int32_t res = db_->storage()->Expire(key_, sec_); if (res != -1) { res_.AppendInteger(res); s_ = rocksdb::Status::OK(); @@ -1336,7 +1333,7 @@ std::string ExpireCmd::ToRedisProtocol() { RedisAppendContent(content, key_); // sec char buf[100]; - int64_t expireat = time(nullptr) + ttl_sec_; + int64_t expireat = time(nullptr) + sec_; pstd::ll2string(buf, 100, expireat); std::string at(buf); RedisAppendLenUint64(content, at.size(), "$"); @@ -1350,7 +1347,7 @@ void ExpireCmd::DoThroughDB() { void ExpireCmd::DoUpdateCache() { if (s_.ok()) { - db_->cache()->Expire(key_, ttl_sec_); + db_->cache()->Expire(key_, sec_); } } @@ -1360,14 +1357,14 @@ void PexpireCmd::DoInitial() { return; } key_ = argv_[1]; - if (pstd::string2int(argv_[2].data(), argv_[2].size(), &ttl_millsec) == 0) { + if (pstd::string2int(argv_[2].data(), argv_[2].size(), &msec_) == 0) { res_.SetRes(CmdRes::kInvalidInt); return; } } void PexpireCmd::Do() { - int64_t res = db_->storage()->Expire(key_, ttl_millsec); + int64_t res = db_->storage()->Expire(key_, msec_ / 1000); if (res != -1) { res_.AppendInteger(res); s_ = rocksdb::Status::OK(); @@ -1382,8 +1379,8 @@ std::string PexpireCmd::ToRedisProtocol() { content.reserve(RAW_ARGS_LEN); RedisAppendLenUint64(content, argv_.size(), "*"); - // to pexpireat cmd - std::string expireat_cmd("pexpireat"); + // to expireat cmd + std::string expireat_cmd("expireat"); RedisAppendLenUint64(content, expireat_cmd.size(), "$"); RedisAppendContent(content, expireat_cmd); // key @@ -1391,7 +1388,7 @@ std::string PexpireCmd::ToRedisProtocol() { RedisAppendContent(content, key_); // sec char buf[100]; - int64_t expireat = pstd::NowMillis() + ttl_millsec; + int64_t expireat = time(nullptr) + msec_ / 1000; pstd::ll2string(buf, 100, expireat); std::string at(buf); RedisAppendLenUint64(content, at.size(), "$"); @@ -1399,13 +1396,13 @@ std::string PexpireCmd::ToRedisProtocol() { return content; } -void PexpireCmd::DoThroughDB() { +void PexpireCmd::DoThroughDB(){ Do(); } void PexpireCmd::DoUpdateCache() { if (s_.ok()) { - db_->cache()->Expire(key_, ttl_millsec); + db_->cache()->Expire(key_, msec_ / 1000); } } @@ -1415,14 +1412,14 @@ void ExpireatCmd::DoInitial() { return; } key_ = argv_[1]; - if (pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_sec_) == 0) { + if (pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_) == 0) { res_.SetRes(CmdRes::kInvalidInt); return; } } void ExpireatCmd::Do() { - int32_t res = db_->storage()->Expireat(key_, time_stamp_sec_ * 1000); + int32_t res = db_->storage()->Expireat(key_, time_stamp_); if (res != -1) { res_.AppendInteger(res); s_ = rocksdb::Status::OK(); @@ -1438,7 +1435,7 @@ void ExpireatCmd::DoThroughDB() { void ExpireatCmd::DoUpdateCache() { if (s_.ok()) { - db_->cache()->Expireat(key_, time_stamp_sec_); + db_->cache()->Expireat(key_, time_stamp_); } } @@ -1448,14 +1445,36 @@ void PexpireatCmd::DoInitial() { return; } key_ = argv_[1]; - if (pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_millsec_) == 0) { + if (pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_ms_) == 0) { res_.SetRes(CmdRes::kInvalidInt); return; } } +std::string PexpireatCmd::ToRedisProtocol() { + std::string content; + content.reserve(RAW_ARGS_LEN); + RedisAppendLenUint64(content, argv_.size(), "*"); + + // to expireat cmd + std::string expireat_cmd("expireat"); + RedisAppendLenUint64(content, expireat_cmd.size(), "$"); + RedisAppendContent(content, expireat_cmd); + // key + RedisAppendLenUint64(content, key_.size(), "$"); + RedisAppendContent(content, key_); + // sec + char buf[100]; + int64_t expireat = time_stamp_ms_ / 1000; + pstd::ll2string(buf, 100, expireat); + std::string at(buf); + RedisAppendLenUint64(content, at.size(), "$"); + RedisAppendContent(content, at); + return content; +} + void PexpireatCmd::Do() { - int32_t res = db_->storage()->Expireat(key_, static_cast(time_stamp_millsec_)); + int32_t res = db_->storage()->Expireat(key_, static_cast(time_stamp_ms_ / 1000)); if (res != -1) { res_.AppendInteger(res); s_ = rocksdb::Status::OK(); @@ -1471,7 +1490,7 @@ void PexpireatCmd::DoThroughDB() { void PexpireatCmd::DoUpdateCache() { if (s_.ok()) { - db_->cache()->Expireat(key_, time_stamp_millsec_ / 1000); + db_->cache()->Expireat(key_, time_stamp_ms_ / 1000); } } @@ -1484,11 +1503,11 @@ void TtlCmd::DoInitial() { } void TtlCmd::Do() { - int64_t ttl_sec_ = db_->storage()->TTL(key_); - if (ttl_sec_ == -3) { + int64_t timestamp = db_->storage()->TTL(key_); + if (timestamp == -3) { res_.SetRes(CmdRes::kErrOther, "ttl internal error"); } else { - res_.AppendInteger(ttl_sec_); + res_.AppendInteger(timestamp); } } @@ -1517,17 +1536,28 @@ void PttlCmd::DoInitial() { } void PttlCmd::Do() { - int64_t ttl_millsec = db_->storage()->PTTL(key_); - if (ttl_millsec == -3) { + int64_t timestamp = db_->storage()->TTL(key_); + if (timestamp == -3) { res_.SetRes(CmdRes::kErrOther, "ttl internal error"); } else { - res_.AppendInteger(ttl_millsec); + res_.AppendInteger(timestamp); } } void PttlCmd::ReadCache() { - // redis cache don't support pttl cache, so read directly from db - DoThroughDB(); + int64_t timestamp = db_->cache()->TTL(key_); + if (timestamp == -3) { + res_.SetRes(CmdRes::kErrOther, "ttl internal error"); + } else if (timestamp != -2) { + if (timestamp == -1) { + res_.AppendInteger(-1); + } else { + res_.AppendInteger(timestamp * 1000); + } + } else { + // mean this key not exist + res_.SetRes(CmdRes::kCacheMiss); + } } void PttlCmd::DoThroughDB() { @@ -1760,14 +1790,14 @@ void PKSetexAtCmd::DoInitial() { } key_ = argv_[1]; value_ = argv_[3]; - if ((pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_sec_) == 0) || time_stamp_sec_ >= INT32_MAX) { + if ((pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_) == 0) || time_stamp_ >= INT32_MAX) { res_.SetRes(CmdRes::kInvalidInt); return; } } void PKSetexAtCmd::Do() { - s_ = db_->storage()->PKSetexAt(key_, value_, static_cast(time_stamp_sec_ * 1000)); + s_ = db_->storage()->PKSetexAt(key_, value_, static_cast(time_stamp_)); if (s_.ok()) { res_.SetRes(CmdRes::kOk); } else if (s_.IsInvalidArgument()) { @@ -1783,7 +1813,7 @@ void PKSetexAtCmd::DoThroughDB() { void PKSetexAtCmd::DoUpdateCache() { if (s_.ok()) { - auto expire = time_stamp_sec_ - static_cast(std::time(nullptr)); + auto expire = time_stamp_ - static_cast(std::time(nullptr)); if (expire <= 0) [[unlikely]] { db_->cache()->Del({key_}); return; diff --git a/src/pika_list.cc b/src/pika_list.cc index 0851ff4c7c..06ae9e24f2 100644 --- a/src/pika_list.cc +++ b/src/pika_list.cc @@ -130,7 +130,7 @@ void LLenCmd::Do() { void LLenCmd::ReadCache() { uint64_t llen = 0; auto s = db_->cache()->LLen(key_, &llen); - if (s.ok()) { + if (s.ok()){ res_.AppendInteger(llen); } else if (s.IsNotFound()) { res_.SetRes(CmdRes::kCacheMiss); diff --git a/src/pika_server.cc b/src/pika_server.cc index 032a66c486..0d7d94ec17 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -605,7 +605,7 @@ int32_t PikaServer::GetSlaveListString(std::string& slave_list_str) { master_boffset.offset - sent_slave_boffset.offset; tmp_stream << "(" << db->DBName() << ":" << lag << ")"; } - } else if (s.ok() && slave_state == SlaveState::kSlaveDbSync) { + } else if (s.ok() && slave_state == SlaveState::kSlaveDbSync){ tmp_stream << "(" << db->DBName() << ":full syncing)"; } else { tmp_stream << "(" << db->DBName() << ":not syncing)"; @@ -1585,7 +1585,7 @@ void DoBgslotsreload(void* arg) { rocksdb::Status s; std::vector keys; int64_t cursor_ret = -1; - while(cursor_ret != 0 && p->GetSlotsreloading()) { + while(cursor_ret != 0 && p->GetSlotsreloading()){ cursor_ret = reload.db->storage()->Scan(storage::DataType::kAll, reload.cursor, reload.pattern, reload.count, &keys); std::vector::const_iterator iter; @@ -1593,8 +1593,8 @@ void DoBgslotsreload(void* arg) { std::string key_type; int s = GetKeyType(*iter, key_type, reload.db); //if key is slotkey, can't add to SlotKey - if (s > 0) { - if (key_type == "s" && ((*iter).find(SlotKeyPrefix) != std::string::npos || (*iter).find(SlotTagPrefix) != std::string::npos)) { + if (s > 0){ + if (key_type == "s" && ((*iter).find(SlotKeyPrefix) != std::string::npos || (*iter).find(SlotTagPrefix) != std::string::npos)){ continue; } @@ -1703,7 +1703,7 @@ void DoBgslotscleanup(void* arg) { std::vector keys; int64_t cursor_ret = -1; std::vector cleanupSlots(cleanup.cleanup_slots); - while (cursor_ret != 0 && p->GetSlotscleaningup()) { + while (cursor_ret != 0 && p->GetSlotscleaningup()){ cursor_ret = g_pika_server->bgslots_cleanup_.db->storage()->Scan(storage::DataType::kAll, cleanup.cursor, cleanup.pattern, cleanup.count, &keys); std::string key_type; @@ -1712,12 +1712,12 @@ void DoBgslotscleanup(void* arg) { if ((*iter).find(SlotKeyPrefix) != std::string::npos || (*iter).find(SlotTagPrefix) != std::string::npos) { continue; } - if (std::find(cleanupSlots.begin(), cleanupSlots.end(), GetSlotID(g_pika_conf->default_slot_num(), *iter)) != cleanupSlots.end()) { + if (std::find(cleanupSlots.begin(), cleanupSlots.end(), GetSlotID(g_pika_conf->default_slot_num(), *iter)) != cleanupSlots.end()){ if (GetKeyType(*iter, key_type, g_pika_server->bgslots_cleanup_.db) <= 0) { LOG(WARNING) << "slots clean get key type for slot " << GetSlotID(g_pika_conf->default_slot_num(), *iter) << " key " << *iter << " error"; continue; } - if (DeleteKey(*iter, key_type[0], g_pika_server->bgslots_cleanup_.db) <= 0) { + if (DeleteKey(*iter, key_type[0], g_pika_server->bgslots_cleanup_.db) <= 0){ LOG(WARNING) << "slots clean del for slot " << GetSlotID(g_pika_conf->default_slot_num(), *iter) << " key "<< *iter << " error"; } } @@ -1728,7 +1728,7 @@ void DoBgslotscleanup(void* arg) { keys.clear(); } - for (int cleanupSlot : cleanupSlots) { + for (int cleanupSlot : cleanupSlots){ WriteDelKeyToBinlog(GetSlotKey(cleanupSlot), g_pika_server->bgslots_cleanup_.db); WriteDelKeyToBinlog(GetSlotsTagKey(cleanupSlot), g_pika_server->bgslots_cleanup_.db); } diff --git a/src/pika_zset.cc b/src/pika_zset.cc index ad8abd1423..43da6e3b9f 100644 --- a/src/pika_zset.cc +++ b/src/pika_zset.cc @@ -77,7 +77,7 @@ void ZCardCmd::Do() { } } -void ZCardCmd::ReadCache() { +void ZCardCmd::ReadCache(){ res_.SetRes(CmdRes::kCacheMiss); } @@ -590,7 +590,7 @@ void ZRevrangebyscoreCmd::Do() { } } -void ZRevrangebyscoreCmd::ReadCache() { +void ZRevrangebyscoreCmd::ReadCache(){ if (min_score_ == storage::ZSET_SCORE_MAX || max_score_ == storage::ZSET_SCORE_MIN || max_score_ < min_score_) { res_.AppendContent("*0"); @@ -827,7 +827,7 @@ void ZUnionstoreCmd::DoBinlog() { del_cmd->SetResp(resp_.lock()); del_cmd->DoBinlog(); - if (value_to_dest_.empty()) { + if(value_to_dest_.empty()){ // The union operation got an empty set, only use del to simulate overwrite the dest_key with empty set return; } @@ -975,7 +975,7 @@ void ZRankCmd::ReadCache() { auto s = db_->cache()->ZRank(key_, member_, &rank, db_); if (s.ok()) { res_.AppendInteger(rank); - } else if (s.IsNotFound()) { + } else if (s.IsNotFound()){ res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); @@ -1020,7 +1020,7 @@ void ZRevrankCmd::ReadCache() { auto s = db_->cache()->ZRevrank(key_, member_, &revrank, db_); if (s.ok()) { res_.AppendInteger(revrank); - } else if (s.IsNotFound()) { + } else if (s.IsNotFound()){ res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); diff --git a/src/pstd/include/env.h b/src/pstd/include/env.h index f11680206f..8e8cbbaa37 100644 --- a/src/pstd/include/env.h +++ b/src/pstd/include/env.h @@ -17,8 +17,6 @@ class SequentialFile; class RWFile; class RandomRWFile; -using TimeType = uint64_t; - /* * Set the resource limits of a process */ @@ -63,10 +61,7 @@ class FileLock : public pstd::noncopyable { int GetChildren(const std::string& dir, std::vector& result); void GetDescendant(const std::string& dir, std::vector& result); -TimeType NowMicros(); - -TimeType NowMillis(); - +uint64_t NowMicros(); void SleepForMicroseconds(int micros); Status NewSequentialFile(const std::string& fname, std::unique_ptr& result); diff --git a/src/pstd/src/env.cc b/src/pstd/src/env.cc index 1abfe35cf2..7dadf924ea 100644 --- a/src/pstd/src/env.cc +++ b/src/pstd/src/env.cc @@ -217,16 +217,11 @@ uint64_t Du(const std::string& path) { return sum; } -TimeType NowMicros() { +uint64_t NowMicros() { auto now = std::chrono::system_clock::now(); return std::chrono::duration_cast(now.time_since_epoch()).count(); } -TimeType NowMillis() { - auto now = std::chrono::system_clock::now(); - return std::chrono::duration_cast(now.time_since_epoch()).count(); -} - void SleepForMicroseconds(int micros) { std::this_thread::sleep_for(std::chrono::microseconds(micros)); } SequentialFile::~SequentialFile() = default; diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 752f0476ed..5759d54bdf 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -105,8 +105,8 @@ struct KeyInfo { struct ValueStatus { std::string value; Status status; - int64_t ttl_millsec; - bool operator==(const ValueStatus& vs) const { return (vs.value == value && vs.status == status && vs.ttl_millsec == ttl_millsec); } + int64_t ttl; + bool operator==(const ValueStatus& vs) const { return (vs.value == value && vs.status == status && vs.ttl == ttl); } }; struct FieldValue { @@ -190,7 +190,7 @@ class Storage { Status Set(const Slice& key, const Slice& value); // Set key to hold the string value. if key exist - Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl_millsec = 0); + Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0); // Get the value of key. If the key does not exist // the special value nil is returned @@ -198,7 +198,7 @@ class Storage { // Get the value and ttl of key. If the key does not exist // the special value nil is returned. If the key has no ttl, ttl is -1 - Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl_millsec); + Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl); // Atomically sets key to value and returns the old value stored at key // Returns an error when key exists but does not hold a string value. @@ -227,7 +227,7 @@ class Storage { // Set key to hold string value if key does not exist // return 1 if the key was set // return 0 if the key was not set - Status Setnx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl_millsec = 0); + Status Setnx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0); // Sets the given keys to their respective values. // MSETNX will not perform any operation at all even @@ -238,7 +238,7 @@ class Storage { // return 1 if the key currently hold the give value And override success // return 0 if the key doesn't exist And override fail // return -1 if the key currently does not hold the given value And override fail - Status Setvx(const Slice& key, const Slice& value, const Slice& new_value, int32_t* ret, int64_t ttl_millsec = 0); + Status Setvx(const Slice& key, const Slice& value, const Slice& new_value, int32_t* ret, int64_t ttl = 0); // delete the key that holds a given value // return 1 if the key currently hold the give value And delete success @@ -255,7 +255,7 @@ class Storage { Status Getrange(const Slice& key, int64_t start_offset, int64_t end_offset, std::string* ret); Status GetrangeWithValue(const Slice& key, int64_t start_offset, int64_t end_offset, - std::string* ret, std::string* value, int64_t* ttl_millsec); + std::string* ret, std::string* value, int64_t* ttl); // If key already exists and is a string, this command appends the value at // the end of the string @@ -293,7 +293,7 @@ class Storage { // Set key to hold the string value and set key to timeout after a given // number of seconds - Status Setex(const Slice& key, const Slice& value, int64_t ttl_millsec); + Status Setex(const Slice& key, const Slice& value, int64_t ttl); // Returns the length of the string value stored at key. An error // is returned when key holds a non-string value. @@ -303,7 +303,7 @@ class Storage { // specifying the number of seconds representing the TTL (time to live), it // takes an absolute Unix timestamp (seconds since January 1, 1970). A // timestamp in the past will delete the key immediately. - Status PKSetexAt(const Slice& key, const Slice& value, int64_t time_stamp_millsec_); + Status PKSetexAt(const Slice& key, const Slice& value, int64_t timestamp); // Hashes Commands @@ -334,7 +334,7 @@ class Storage { // reply is twice the size of the hash. Status HGetall(const Slice& key, std::vector* fvs); - Status HGetallWithTTL(const Slice& key, std::vector* fvs, int64_t* ttl_millsec); + Status HGetallWithTTL(const Slice& key, std::vector* fvs, int64_t* ttl); // Returns all field names in the hash stored at key. Status HKeys(const Slice& key, std::vector* fields); @@ -467,7 +467,7 @@ class Storage { // This has the same effect as running SINTER with one argument key. Status SMembers(const Slice& key, std::vector* members); - Status SMembersWithTTL(const Slice& key, std::vector* members, int64_t * ttl_millsec); + Status SMembersWithTTL(const Slice& key, std::vector* members, int64_t *ttl); // Remove the specified members from the set stored at key. Specified members // that are not a member of this set are ignored. If key does not exist, it is @@ -540,7 +540,7 @@ class Storage { // (the head of the list), 1 being the next element and so on. Status LRange(const Slice& key, int64_t start, int64_t stop, std::vector* ret); - Status LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::vector* ret, int64_t * ttl_millsec); + Status LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::vector* ret, int64_t *ttl); // Removes the first count occurrences of elements equal to value from the // list stored at key. The count argument influences the operation in the @@ -703,7 +703,7 @@ class Storage { Status ZRange(const Slice& key, int32_t start, int32_t stop, std::vector* score_members); Status ZRangeWithTTL(const Slice& key, int32_t start, int32_t stop, std::vector* score_members, - int64_t * ttl_millsec); + int64_t *ttl); // Returns all the elements in the sorted set at key with a score between min // and max (including elements with score equal to min or max). The elements @@ -957,10 +957,10 @@ class Storage { // While any error happens, you need to check type_status for // the error message - // Set a timeout on key, milliseconds unit + // Set a timeout on key // return -1 operation exception errors happen in database // return >=0 success - int32_t Expire(const Slice& key, int64_t ttl_millsec); + int32_t Expire(const Slice& key, int64_t ttl); // Removes the specified keys // return -1 operation exception errors happen in database @@ -1005,12 +1005,12 @@ class Storage { // EXPIREAT has the same effect and semantic as EXPIRE, but instead of // specifying the number of seconds representing the TTL (time to live), it - // takes an absolute Unix timestamp (milliseconds since January 1, 1970). A + // takes an absolute Unix timestamp (seconds since January 1, 1970). A // timestamp in the past will delete the key immediately. // return -1 operation exception errors happen in database // return 0 if key does not exist // return >=1 if the timueout was set - int32_t Expireat(const Slice& key, int64_t timestamp_millsec); + int32_t Expireat(const Slice& key, int64_t timestamp); // Remove the existing timeout on key, turning the key from volatile (a key // with an expire set) to persistent (a key that will never expire as no @@ -1027,13 +1027,6 @@ class Storage { // return > 0 TTL in seconds int64_t TTL(const Slice& key); - // Returns the remaining time to live of a key that has a timeout. - // return -3 operation exception errors happen in database - // return -2 if the key does not exist - // return -1 if the key exists but has not associated expire - // return > 0 TTL in milliseconds - int64_t PTTL(const Slice& key); - // Reutrns the data all type of the key // if single is true, the query will return the first one Status GetType(const std::string& key, enum DataType& type); @@ -1122,7 +1115,7 @@ class Storage { // For scan keys in data base std::atomic scan_keynum_exit_ = {false}; - Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl_millsec); + Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl); }; } // namespace storage diff --git a/src/storage/src/base_data_value_format.h b/src/storage/src/base_data_value_format.h index 556da85d39..ce118613a6 100644 --- a/src/storage/src/base_data_value_format.h +++ b/src/storage/src/base_data_value_format.h @@ -40,8 +40,7 @@ class BaseDataValue : public InternalValue { dst += user_value_.size(); memcpy(dst, reserve_, kSuffixReserveLength); dst += kSuffixReserveLength; - uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, ctime); + EncodeFixed64(dst, ctime_); dst += kTimestampLength; return rocksdb::Slice(start_pos, needed); } @@ -59,8 +58,7 @@ class ParsedBaseDataValue : public ParsedInternalValue { if (value_->size() >= kBaseDataValueSuffixLength) { user_value_ = rocksdb::Slice(value_->data(), value_->size() - kBaseDataValueSuffixLength); memcpy(reserve_, value_->data() + user_value_.size(), kSuffixReserveLength); - uint64_t ctime = DecodeFixed64(value_->data() + user_value_.size() + kSuffixReserveLength); - ctime_ = (ctime & ~(1ULL << 63)); + ctime_ = DecodeFixed64(value_->data() + user_value_.size() + kSuffixReserveLength); } } @@ -72,8 +70,7 @@ class ParsedBaseDataValue : public ParsedInternalValue { if (value.size() >= kBaseDataValueSuffixLength) { user_value_ = rocksdb::Slice(value.data(), value.size() - kBaseDataValueSuffixLength); memcpy(reserve_, value.data() + user_value_.size(), kSuffixReserveLength); - uint64_t ctime = DecodeFixed64(value.data() + user_value_.size() + kSuffixReserveLength); - ctime_ = (ctime & ~(1ULL << 63)); + ctime_ = DecodeFixed64(value.data() + user_value_.size() + kSuffixReserveLength); } } @@ -84,8 +81,7 @@ class ParsedBaseDataValue : public ParsedInternalValue { void SetCtimeToValue() override { if (value_) { char* dst = const_cast(value_->data()) + value_->size() - kTimestampLength; - uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, ctime); + EncodeFixed64(dst, ctime_); } } diff --git a/src/storage/src/base_filter.h b/src/storage/src/base_filter.h index 934b2d96d7..5dd17b09c6 100644 --- a/src/storage/src/base_filter.h +++ b/src/storage/src/base_filter.h @@ -28,7 +28,9 @@ class BaseMetaFilter : public rocksdb::CompactionFilter { BaseMetaFilter() = default; bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& value, std::string* new_value, bool* value_changed) const override { - auto cur_time = pstd::NowMillis(); + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); + auto cur_time = static_cast(unix_time); /* * For the filtering of meta information, because the field designs of string * and list are different, their filtering policies are written separately. @@ -179,8 +181,9 @@ class BaseDataFilter : public rocksdb::CompactionFilter { return true; } - pstd::TimeType unix_time = pstd::NowMillis(); - if (cur_meta_etime_ != 0 && cur_meta_etime_ < unix_time) { + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); + if (cur_meta_etime_ != 0 && cur_meta_etime_ < static_cast(unix_time)) { TRACE("Drop[Timeout]"); return true; } diff --git a/src/storage/src/base_meta_value_format.h b/src/storage/src/base_meta_value_format.h index 588c980624..150774479a 100644 --- a/src/storage/src/base_meta_value_format.h +++ b/src/storage/src/base_meta_value_format.h @@ -39,22 +39,18 @@ class BaseMetaValue : public InternalValue { dst += sizeof(version_); memcpy(dst, reserve_, sizeof(reserve_)); dst += sizeof(reserve_); - // The most significant bit is 1 for milliseconds and 0 for seconds. - // The previous data was stored in seconds, but the subsequent data was stored in milliseconds - uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, ctime); + EncodeFixed64(dst, ctime_); dst += sizeof(ctime_); - uint64_t etime = etime_ > 0 ? (etime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, etime); + EncodeFixed64(dst, etime_); return {start_, needed}; } uint64_t UpdateVersion() { - pstd::TimeType unix_time = pstd::NowMillis(); + int64_t unix_time = pstd::NowMicros() / 1000000; if (version_ >= unix_time) { version_++; } else { - version_ = unix_time; + version_ = uint64_t(unix_time); } return version_; } @@ -75,20 +71,9 @@ class ParsedBaseMetaValue : public ParsedInternalValue { offset += sizeof(version_); memcpy(reserve_, internal_value_str->data() + offset, sizeof(reserve_)); offset += sizeof(reserve_); - uint64_t ctime = DecodeFixed64(internal_value_str->data() + offset); + ctime_ = DecodeFixed64(internal_value_str->data() + offset); offset += sizeof(ctime_); - uint64_t etime = DecodeFixed64(internal_value_str->data() + offset); - - ctime_ = (ctime & ~(1ULL << 63)); - // if ctime_==ctime, means ctime_ storaged in seconds - if (ctime_ == ctime) { - ctime_ *= 1000; - } - etime_ = (etime & ~(1ULL << 63)); - // if etime_==etime, means etime_ storaged in seconds - if (etime == etime_) { - etime_ *= 1000; - } + etime_ = DecodeFixed64(internal_value_str->data() + offset); } count_ = DecodeFixed32(internal_value_str->data() + kTypeLength); } @@ -106,20 +91,9 @@ class ParsedBaseMetaValue : public ParsedInternalValue { offset += sizeof(uint64_t); memcpy(reserve_, internal_value_slice.data() + offset, sizeof(reserve_)); offset += sizeof(reserve_); - uint64_t ctime = DecodeFixed64(internal_value_slice.data() + offset); + ctime_ = DecodeFixed64(internal_value_slice.data() + offset); offset += sizeof(ctime_); - uint64_t etime = DecodeFixed64(internal_value_slice.data() + offset); - - ctime_ = (ctime & ~(1ULL << 63)); - // if ctime_!=ctime, means ctime_ storaged in seconds - if (ctime_ == ctime) { - ctime_ *= 1000; - } - etime_ = (etime & ~(1ULL << 63)); - // if etime_!=etime, means etime_ storaged in seconds - if (etime == etime_) { - etime_ *= 1000; - } + etime_ = DecodeFixed64(internal_value_slice.data() + offset); } count_ = DecodeFixed32(internal_value_slice.data() + kTypeLength); } @@ -140,16 +114,14 @@ class ParsedBaseMetaValue : public ParsedInternalValue { void SetCtimeToValue() override { if (value_) { char* dst = const_cast(value_->data()) + value_->size() - 2 * kTimestampLength; - uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, ctime); + EncodeFixed64(dst, ctime_); } } void SetEtimeToValue() override { if (value_) { char* dst = const_cast(value_->data()) + value_->size() - kTimestampLength; - uint64_t etime = etime_ > 0 ? (etime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, etime); + EncodeFixed64(dst, etime_); } } @@ -199,11 +171,12 @@ class ParsedBaseMetaValue : public ParsedInternalValue { } uint64_t UpdateVersion() { - pstd::TimeType unix_time = pstd::NowMillis(); - if (version_ >= unix_time) { + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); + if (version_ >= static_cast(unix_time)) { version_++; } else { - version_ = unix_time; + version_ = static_cast(unix_time); } SetVersionToValue(); return version_; diff --git a/src/storage/src/base_value_format.h b/src/storage/src/base_value_format.h index 51698fa607..3f0f181f97 100644 --- a/src/storage/src/base_value_format.h +++ b/src/storage/src/base_value_format.h @@ -41,7 +41,7 @@ constexpr char DataTypeToTag(DataType type) { class InternalValue { public: explicit InternalValue(DataType type, const rocksdb::Slice& user_value) : type_(type), user_value_(user_value) { - ctime_ = pstd::NowMillis(); + ctime_ = pstd::NowMicros() / 1e6; } virtual ~InternalValue() { @@ -51,9 +51,10 @@ class InternalValue { } void SetEtime(uint64_t etime = 0) { etime_ = etime; } void setCtime(uint64_t ctime) { ctime_ = ctime; } - rocksdb::Status SetRelativeTimeByMillsec(int64_t ttl_millsec) { - pstd::TimeType unix_time = pstd::NowMillis(); - etime_ = unix_time + ttl_millsec; + rocksdb::Status SetRelativeTimestamp(int64_t ttl) { + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); + etime_ = uint64_t(unix_time + ttl); return rocksdb::Status::OK(); } void SetVersion(uint64_t version = 0) { version_ = version; } @@ -121,9 +122,10 @@ class ParsedInternalValue { SetCtimeToValue(); } - void SetRelativeTimestamp(int64_t ttl_millsec) { - pstd::TimeType unix_time = pstd::NowMillis(); - etime_ = unix_time + ttl_millsec; + void SetRelativeTimestamp(int64_t ttl) { + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); + etime_ = unix_time + ttl; SetEtimeToValue(); } @@ -133,7 +135,8 @@ class ParsedInternalValue { if (etime_ == 0) { return false; } - pstd::TimeType unix_time = pstd::NowMillis(); + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); return etime_ < unix_time; } diff --git a/src/storage/src/lists_filter.h b/src/storage/src/lists_filter.h index 92186d5149..a5d8ccb1a6 100644 --- a/src/storage/src/lists_filter.h +++ b/src/storage/src/lists_filter.h @@ -88,7 +88,8 @@ class ListsDataFilter : public rocksdb::CompactionFilter { return true; } - pstd::TimeType unix_time = pstd::NowMillis(); + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); if (cur_meta_etime_ != 0 && cur_meta_etime_ < static_cast(unix_time)) { TRACE("Drop[Timeout]"); return true; diff --git a/src/storage/src/lists_meta_value_format.h b/src/storage/src/lists_meta_value_format.h index b417d9a186..c860ed9e12 100644 --- a/src/storage/src/lists_meta_value_format.h +++ b/src/storage/src/lists_meta_value_format.h @@ -44,18 +44,15 @@ class ListsMetaValue : public InternalValue { dst += kListValueIndexLength; memcpy(dst, reserve_, sizeof(reserve_)); dst += kSuffixReserveLength; - // The most significant bit is 1 for milliseconds and 0 for seconds. - // The previous data was stored in seconds, but the subsequent data was stored in milliseconds - uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, ctime); + EncodeFixed64(dst, ctime_); dst += kTimestampLength; - uint64_t etime = etime_ > 0 ? (etime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, etime); + EncodeFixed64(dst, etime_); return {start_, needed}; } uint64_t UpdateVersion() { - pstd::TimeType unix_time = pstd::NowMillis(); + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); if (version_ >= static_cast(unix_time)) { version_++; } else { @@ -98,21 +95,10 @@ class ParsedListsMetaValue : public ParsedInternalValue { offset += kListValueIndexLength; memcpy(reserve_, internal_value_str->data() + offset, sizeof(reserve_)); offset += kSuffixReserveLength; - uint64_t ctime = DecodeFixed64(internal_value_str->data() + offset); + ctime_ = DecodeFixed64(internal_value_str->data() + offset); offset += kTimestampLength; - uint64_t etime = DecodeFixed64(internal_value_str->data() + offset); + etime_ = DecodeFixed64(internal_value_str->data() + offset); offset += kTimestampLength; - - ctime_ = (ctime & ~(1ULL << 63)); - // if ctime_==ctime, means ctime_ storaged in seconds - if (ctime_ == ctime) { - ctime_ *= 1000; - } - etime_ = (etime & ~(1ULL << 63)); - // if etime_==etime, means etime_ storaged in seconds - if (etime == etime_) { - etime_ *= 1000; - } } count_ = DecodeFixed64(internal_value_str->data() + kTypeLength); } @@ -136,21 +122,10 @@ class ParsedListsMetaValue : public ParsedInternalValue { offset += kListValueIndexLength; memcpy(reserve_, internal_value_slice.data() + offset, sizeof(reserve_)); offset += kSuffixReserveLength; - uint64_t ctime = DecodeFixed64(internal_value_slice.data() + offset); + ctime_ = DecodeFixed64(internal_value_slice.data() + offset); offset += kTimestampLength; - uint64_t etime = DecodeFixed64(internal_value_slice.data() + offset); + etime_ = DecodeFixed64(internal_value_slice.data() + offset); offset += kTimestampLength; - - ctime_ = (ctime & ~(1ULL << 63)); - // if ctime_==ctime, means ctime_ storaged in seconds - if (ctime_ == ctime) { - ctime_ *= 1000; - } - etime_ = (etime & ~(1ULL << 63)); - // if etime_==etime, means etime_ storaged in seconds - if (etime == etime_) { - etime_ *= 1000; - } } count_ = DecodeFixed64(internal_value_slice.data() + kTypeLength); } @@ -171,16 +146,14 @@ class ParsedListsMetaValue : public ParsedInternalValue { void SetCtimeToValue() override { if (value_) { char* dst = const_cast(value_->data()) + value_->size() - 2 * kTimestampLength; - uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, ctime); + EncodeFixed64(dst, ctime_); } } void SetEtimeToValue() override { if (value_) { char* dst = const_cast(value_->data()) + value_->size() - kTimestampLength; - uint64_t etime = etime_ > 0 ? (etime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, etime); + EncodeFixed64(dst, etime_); } } @@ -225,7 +198,8 @@ class ParsedListsMetaValue : public ParsedInternalValue { } uint64_t UpdateVersion() { - pstd::TimeType unix_time = pstd::NowMillis(); + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); if (version_ >= static_cast(unix_time)) { version_++; } else { diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 4c26aa4822..0f61f02fa9 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -117,11 +117,11 @@ class Redis { Status ScanStreamsKeyNum(KeyInfo* key_info); // Keys Commands - virtual Status StringsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {}); - virtual Status HashesExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {}); - virtual Status ListsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {}); - virtual Status ZsetsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {}); - virtual Status SetsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {}); + virtual Status StringsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {}); + virtual Status HashesExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {}); + virtual Status ListsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {}); + virtual Status ZsetsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {}); + virtual Status SetsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {}); virtual Status StringsDel(const Slice& key, std::string&& prefetch_meta = {}); virtual Status HashesDel(const Slice& key, std::string&& prefetch_meta = {}); @@ -130,11 +130,11 @@ class Redis { virtual Status SetsDel(const Slice& key, std::string&& prefetch_meta = {}); virtual Status StreamsDel(const Slice& key, std::string&& prefetch_meta = {}); - virtual Status StringsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {}); - virtual Status HashesExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {}); - virtual Status ListsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {}); - virtual Status SetsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {}); - virtual Status ZsetsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {}); + virtual Status StringsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {}); + virtual Status HashesExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {}); + virtual Status ListsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {}); + virtual Status SetsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {}); + virtual Status ZsetsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {}); virtual Status StringsPersist(const Slice& key, std::string&& prefetch_meta = {}); virtual Status HashesPersist(const Slice& key, std::string&& prefetch_meta = {}); @@ -142,11 +142,11 @@ class Redis { virtual Status ZsetsPersist(const Slice& key, std::string&& prefetch_meta = {}); virtual Status SetsPersist(const Slice& key, std::string&& prefetch_meta = {}); - virtual Status StringsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta = {}); - virtual Status HashesTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta = {}); - virtual Status ListsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta = {}); - virtual Status ZsetsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta = {}); - virtual Status SetsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta = {}); + virtual Status StringsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {}); + virtual Status HashesTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {}); + virtual Status ListsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {}); + virtual Status ZsetsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {}); + virtual Status SetsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {}); // Strings Commands Status Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* expired_timestamp_sec, std::string& out_new_value); @@ -156,12 +156,12 @@ class Redis { Status Get(const Slice& key, std::string* value); Status HyperloglogGet(const Slice& key, std::string* value); Status MGet(const Slice& key, std::string* value); - Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl_millsec); - Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl_millsec); + Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl); + Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl); Status GetBit(const Slice& key, int64_t offset, int32_t* ret); Status Getrange(const Slice& key, int64_t start_offset, int64_t end_offset, std::string* ret); Status GetrangeWithValue(const Slice& key, int64_t start_offset, int64_t end_offset, - std::string* ret, std::string* value, int64_t* ttl_millsec); + std::string* ret, std::string* value, int64_t* ttl); Status GetSet(const Slice& key, const Slice& value, std::string* old_value); Status Incrby(const Slice& key, int64_t value, int64_t* ret, int64_t* expired_timestamp_sec); Status Incrbyfloat(const Slice& key, const Slice& value, std::string* ret, int64_t* expired_timestamp_sec); @@ -169,11 +169,11 @@ class Redis { Status MSetnx(const std::vector& kvs, int32_t* ret); Status Set(const Slice& key, const Slice& value); Status HyperloglogSet(const Slice& key, const Slice& value); - Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl_millsec = 0); + Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0); Status SetBit(const Slice& key, int64_t offset, int32_t value, int32_t* ret); - Status Setex(const Slice& key, const Slice& value, int64_t ttl_millsec); - Status Setnx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl_millsec = 0); - Status Setvx(const Slice& key, const Slice& value, const Slice& new_value, int32_t* ret, int64_t ttl_millsec = 0); + Status Setex(const Slice& key, const Slice& value, int64_t ttl); + Status Setnx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0); + Status Setvx(const Slice& key, const Slice& value, const Slice& new_value, int32_t* ret, int64_t ttl = 0); Status Delvx(const Slice& key, const Slice& value, int32_t* ret); Status Setrange(const Slice& key, int64_t start_offset, const Slice& value, int32_t* ret); Status Strlen(const Slice& key, int32_t* len); @@ -181,14 +181,14 @@ class Redis { Status BitPos(const Slice& key, int32_t bit, int64_t* ret); Status BitPos(const Slice& key, int32_t bit, int64_t start_offset, int64_t* ret); Status BitPos(const Slice& key, int32_t bit, int64_t start_offset, int64_t end_offset, int64_t* ret); - Status PKSetexAt(const Slice& key, const Slice& value, int64_t time_stamp_millsec_); + Status PKSetexAt(const Slice& key, const Slice& value, int64_t timestamp); Status Exists(const Slice& key); Status Del(const Slice& key); - Status Expire(const Slice& key, int64_t ttl_millsec); - Status Expireat(const Slice& key, int64_t timestamp_millsec); + Status Expire(const Slice& key, int64_t timestamp); + Status Expireat(const Slice& key, int64_t timestamp); Status Persist(const Slice& key); - Status TTL(const Slice& key, int64_t* ttl_millsec); + Status TTL(const Slice& key, int64_t* timestamp); Status PKPatternMatchDelWithRemoveKeys(const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count); Status GetType(const Slice& key, enum DataType& type); @@ -198,7 +198,7 @@ class Redis { Status HExists(const Slice& key, const Slice& field); Status HGet(const Slice& key, const Slice& field, std::string* value); Status HGetall(const Slice& key, std::vector* fvs); - Status HGetallWithTTL(const Slice& key, std::vector* fvs, int64_t* ttl_millsec); + Status HGetallWithTTL(const Slice& key, std::vector* fvs, int64_t* ttl); Status HIncrby(const Slice& key, const Slice& field, int64_t value, int64_t* ret); Status HIncrbyfloat(const Slice& key, const Slice& field, const Slice& by, std::string* new_value); Status HKeys(const Slice& key, std::vector* fields); @@ -255,7 +255,7 @@ class Redis { Status SInterstore(const Slice& destination, const std::vector& keys, std::vector& value_to_dest, int32_t* ret); Status SIsmember(const Slice& key, const Slice& member, int32_t* ret); Status SMembers(const Slice& key, std::vector* members); - Status SMembersWithTTL(const Slice& key, std::vector* members, int64_t* ttl_millsec); + Status SMembersWithTTL(const Slice& key, std::vector* members, int64_t* ttl); Status SMove(const Slice& source, const Slice& destination, const Slice& member, int32_t* ret); Status SPop(const Slice& key, std::vector* members, int64_t cnt); Status SRandmember(const Slice& key, int32_t count, std::vector* members); @@ -276,7 +276,7 @@ class Redis { Status LPush(const Slice& key, const std::vector& values, uint64_t* ret); Status LPushx(const Slice& key, const std::vector& values, uint64_t* len); Status LRange(const Slice& key, int64_t start, int64_t stop, std::vector* ret); - Status LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::vector* ret, int64_t* ttl_millsec); + Status LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::vector* ret, int64_t* ttl); Status LRem(const Slice& key, int64_t count, const Slice& value, uint64_t* ret); Status LSet(const Slice& key, int64_t index, const Slice& value); Status LTrim(const Slice& key, int64_t start, int64_t stop); @@ -291,7 +291,7 @@ class Redis { Status ZCount(const Slice& key, double min, double max, bool left_close, bool right_close, int32_t* ret); Status ZIncrby(const Slice& key, const Slice& member, double increment, double* ret); Status ZRange(const Slice& key, int32_t start, int32_t stop, std::vector* score_members); - Status ZRangeWithTTL(const Slice& key, int32_t start, int32_t stop, std::vector* score_members, int64_t* ttl_millsec); + Status ZRangeWithTTL(const Slice& key, int32_t start, int32_t stop, std::vector* score_members, int64_t* ttl); Status ZRangebyscore(const Slice& key, double min, double max, bool left_close, bool right_close, int64_t count, int64_t offset, std::vector* score_members); Status ZRank(const Slice& key, const Slice& member, int32_t* rank); diff --git a/src/storage/src/redis_hashes.cc b/src/storage/src/redis_hashes.cc index 1a947c07e7..9193dddd1c 100644 --- a/src/storage/src/redis_hashes.cc +++ b/src/storage/src/redis_hashes.cc @@ -31,7 +31,8 @@ Status Redis::ScanHashesKeyNum(KeyInfo* key_info) { iterator_options.snapshot = snapshot; iterator_options.fill_cache = false; - pstd::TimeType curtime = pstd::NowMillis(); + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -115,7 +116,7 @@ Status Redis::HDel(const Slice& key, const std::vector& fields, int } } *ret = del_cnt; - if (!parsed_hashes_meta_value.CheckModifyCount(-del_cnt)) { + if (!parsed_hashes_meta_value.CheckModifyCount(-del_cnt)){ return Status::InvalidArgument("hash size overflow"); } parsed_hashes_meta_value.ModifyCount(-del_cnt); @@ -220,7 +221,7 @@ Status Redis::HGetall(const Slice& key, std::vector* fvs) { return s; } -Status Redis::HGetallWithTTL(const Slice& key, std::vector* fvs, int64_t* ttl_millsec) { +Status Redis::HGetallWithTTL(const Slice& key, std::vector* fvs, int64_t* ttl) { rocksdb::ReadOptions read_options; const rocksdb::Snapshot* snapshot; @@ -248,12 +249,13 @@ Status Redis::HGetallWithTTL(const Slice& key, std::vector* fvs, int return Status::NotFound("Stale"); } else { // ttl - *ttl_millsec = parsed_hashes_meta_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *ttl = parsed_hashes_meta_value.Etime(); + if (*ttl == 0) { + *ttl = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2; } version = parsed_hashes_meta_value.Version(); @@ -330,7 +332,7 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64 statistic++; } else if (s.IsNotFound()) { Int64ToStr(value_buf, 32, value); - if (!parsed_hashes_meta_value.CheckModifyCount(1)) { + if (!parsed_hashes_meta_value.CheckModifyCount(1)){ return Status::InvalidArgument("hash size overflow"); } BaseDataValue internal_value(value_buf); @@ -424,7 +426,7 @@ Status Redis::HIncrbyfloat(const Slice& key, const Slice& field, const Slice& by statistic++; } else if (s.IsNotFound()) { LongDoubleToStr(long_double_by, new_value); - if (!parsed_hashes_meta_value.CheckModifyCount(1)) { + if (!parsed_hashes_meta_value.CheckModifyCount(1)){ return Status::InvalidArgument("hash size overflow"); } parsed_hashes_meta_value.ModifyCount(1); @@ -652,7 +654,7 @@ Status Redis::HMSet(const Slice& key, const std::vector& fvs) { return s; } } - if (!parsed_hashes_meta_value.CheckModifyCount(count)) { + if (!parsed_hashes_meta_value.CheckModifyCount(count)){ return Status::InvalidArgument("hash size overflow"); } parsed_hashes_meta_value.ModifyCount(count); @@ -720,7 +722,7 @@ Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int statistic++; } } else if (s.IsNotFound()) { - if (!parsed_hashes_meta_value.CheckModifyCount(1)) { + if (!parsed_hashes_meta_value.CheckModifyCount(1)){ return Status::InvalidArgument("hash size overflow"); } parsed_hashes_meta_value.ModifyCount(1); @@ -787,7 +789,7 @@ Status Redis::HSetnx(const Slice& key, const Slice& field, const Slice& value, i if (s.ok()) { *ret = 0; } else if (s.IsNotFound()) { - if (!parsed_hashes_meta_value.CheckModifyCount(1)) { + if (!parsed_hashes_meta_value.CheckModifyCount(1)){ return Status::InvalidArgument("hash size overflow"); } parsed_hashes_meta_value.ModifyCount(1); @@ -1120,7 +1122,7 @@ Status Redis::PKHRScanRange(const Slice& key, const Slice& field_start, const st return Status::NotFound(); } else { uint64_t version = parsed_hashes_meta_value.Version(); - uint64_t start_key_version = start_no_limit ? version + 1 : version; + int32_t start_key_version = start_no_limit ? version + 1 : version; std::string start_key_field = start_no_limit ? "" : field_start.ToString(); HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, start_key_version, start_key_field); @@ -1155,7 +1157,7 @@ Status Redis::PKHRScanRange(const Slice& key, const Slice& field_start, const st return Status::OK(); } -Status Redis::HashesExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta) { +Status Redis::HashesExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseMetaKey base_meta_key(key); @@ -1184,8 +1186,8 @@ Status Redis::HashesExpire(const Slice& key, int64_t ttl_millsec, std::string&& return Status::NotFound(); } - if (ttl_millsec > 0) { - parsed_hashes_meta_value.SetRelativeTimestamp(ttl_millsec); + if (ttl > 0) { + parsed_hashes_meta_value.SetRelativeTimestamp(ttl); s = db_->Put(default_write_options_, handles_[kMetaCF], base_meta_key.Encode(), meta_value); } else { parsed_hashes_meta_value.InitialMetaValue(); @@ -1232,7 +1234,7 @@ Status Redis::HashesDel(const Slice& key, std::string&& prefetch_meta) { return s; } -Status Redis::HashesExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta) { +Status Redis::HashesExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseMetaKey base_meta_key(key); @@ -1260,8 +1262,8 @@ Status Redis::HashesExpireat(const Slice& key, int64_t timestamp_millsec, std::s } else if (parsed_hashes_meta_value.Count() == 0) { return Status::NotFound(); } else { - if (timestamp_millsec > 0) { - parsed_hashes_meta_value.SetEtime(static_cast(timestamp_millsec)); + if (timestamp > 0) { + parsed_hashes_meta_value.SetEtime(static_cast(timestamp)); } else { parsed_hashes_meta_value.InitialMetaValue(); } @@ -1311,7 +1313,7 @@ Status Redis::HashesPersist(const Slice& key, std::string&& prefetch_meta) { return s; } -Status Redis::HashesTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta) { +Status Redis::HashesTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); Status s; BaseMetaKey base_meta_key(key); @@ -1334,22 +1336,23 @@ Status Redis::HashesTTL(const Slice& key, int64_t* ttl_millsec, std::string&& pr if (s.ok()) { ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value); if (parsed_hashes_meta_value.IsStale()) { - *ttl_millsec = -2; + *timestamp = -2; return Status::NotFound("Stale"); } else if (parsed_hashes_meta_value.Count() == 0) { - *ttl_millsec = -2; + *timestamp = -2; return Status::NotFound(); } else { - *ttl_millsec = parsed_hashes_meta_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *timestamp = parsed_hashes_meta_value.Etime(); + if (*timestamp == 0) { + *timestamp = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2; } } } else if (s.IsNotFound()) { - *ttl_millsec = -2; + *timestamp = -2; } return s; } diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index cdf4ff122d..db007ee2cf 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -30,7 +30,8 @@ Status Redis::ScanListsKeyNum(KeyInfo* key_info) { iterator_options.snapshot = snapshot; iterator_options.fill_cache = false; - pstd::TimeType curtime = pstd::NowMillis(); + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -470,7 +471,7 @@ Status Redis::LRange(const Slice& key, int64_t start, int64_t stop, std::vector< } } -Status Redis::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::vector* ret, int64_t* ttl_millsec) { +Status Redis::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::vector* ret, int64_t* ttl) { rocksdb::ReadOptions read_options; const rocksdb::Snapshot* snapshot; @@ -498,12 +499,13 @@ Status Redis::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std:: return Status::NotFound("Stale"); } else { // ttl - *ttl_millsec = parsed_lists_meta_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *ttl = parsed_lists_meta_value.Etime(); + if (*ttl == 0) { + *ttl = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2; } uint64_t version = parsed_lists_meta_value.Version(); @@ -1095,7 +1097,7 @@ Status Redis::RPushx(const Slice& key, const std::vector& values, u return s; } -Status Redis::ListsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta) { +Status Redis::ListsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseMetaKey base_meta_key(key); @@ -1124,8 +1126,8 @@ Status Redis::ListsExpire(const Slice& key, int64_t ttl_millsec, std::string&& p return Status::NotFound(); } - if (ttl_millsec > 0) { - parsed_lists_meta_value.SetRelativeTimestamp(ttl_millsec); + if (ttl > 0) { + parsed_lists_meta_value.SetRelativeTimestamp(ttl); s = db_->Put(default_write_options_, handles_[kMetaCF], base_meta_key.Encode(), meta_value); } else { parsed_lists_meta_value.InitialMetaValue(); @@ -1172,7 +1174,7 @@ Status Redis::ListsDel(const Slice& key, std::string&& prefetch_meta) { return s; } -Status Redis::ListsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta) { +Status Redis::ListsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseMetaKey base_meta_key(key); @@ -1200,8 +1202,8 @@ Status Redis::ListsExpireat(const Slice& key, int64_t timestamp_millsec, std::st } else if (parsed_lists_meta_value.Count() == 0) { return Status::NotFound(); } else { - if (timestamp_millsec > 0) { - parsed_lists_meta_value.SetEtime(static_cast(timestamp_millsec)); + if (timestamp > 0) { + parsed_lists_meta_value.SetEtime(static_cast(timestamp)); } else { parsed_lists_meta_value.InitialMetaValue(); } @@ -1251,7 +1253,7 @@ Status Redis::ListsPersist(const Slice& key, std::string&& prefetch_meta) { return s; } -Status Redis::ListsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta) { +Status Redis::ListsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); BaseMetaKey base_meta_key(key); Status s; @@ -1274,23 +1276,24 @@ Status Redis::ListsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& pre if (s.ok()) { ParsedListsMetaValue parsed_lists_meta_value(&meta_value); if (parsed_lists_meta_value.IsStale()) { - *ttl_millsec = -2; + *timestamp = -2; return Status::NotFound("Stale"); } else if (parsed_lists_meta_value.Count() == 0) { - *ttl_millsec = -2; + *timestamp = -2; return Status::NotFound(); } else { // Return -1 for lists with no set expiration, and calculate remaining time for others - *ttl_millsec = parsed_lists_meta_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *timestamp = parsed_lists_meta_value.Etime(); + if (*timestamp == 0) { + *timestamp = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2; } } } else if (s.IsNotFound()) { - *ttl_millsec = -2; + *timestamp = -2; } return s; } diff --git a/src/storage/src/redis_sets.cc b/src/storage/src/redis_sets.cc index 5f33d9574b..db5044b440 100644 --- a/src/storage/src/redis_sets.cc +++ b/src/storage/src/redis_sets.cc @@ -34,7 +34,8 @@ rocksdb::Status Redis::ScanSetsKeyNum(KeyInfo* key_info) { iterator_options.snapshot = snapshot; iterator_options.fill_cache = false; - pstd::TimeType curtime = pstd::NowMillis(); + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -123,7 +124,7 @@ rocksdb::Status Redis::SAdd(const Slice& key, const std::vector& me if (cnt == 0) { return rocksdb::Status::OK(); } else { - if (!parsed_sets_meta_value.CheckModifyCount(cnt)) { + if (!parsed_sets_meta_value.CheckModifyCount(cnt)){ return Status::InvalidArgument("set size overflow"); } parsed_sets_meta_value.ModifyCount(cnt); @@ -742,7 +743,7 @@ rocksdb::Status Redis::SMembers(const Slice& key, std::vector* memb Status Redis::SMembersWithTTL(const Slice& key, std::vector* members, - int64_t* ttl_millsec) { + int64_t* ttl) { rocksdb::ReadOptions read_options; const rocksdb::Snapshot* snapshot; @@ -770,12 +771,13 @@ Status Redis::SMembersWithTTL(const Slice& key, return Status::NotFound("Stale"); } else { // ttl - *ttl_millsec = parsed_sets_meta_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *ttl = parsed_sets_meta_value.Etime(); + if (*ttl == 0) { + *ttl = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2; } version = parsed_sets_meta_value.Version(); @@ -836,7 +838,7 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons s = db_->Get(default_read_options_, handles_[kSetsDataCF], sets_member_key.Encode(), &member_value); if (s.ok()) { *ret = 1; - if (!parsed_sets_meta_value.CheckModifyCount(-1)) { + if (!parsed_sets_meta_value.CheckModifyCount(-1)){ return Status::InvalidArgument("set size overflow"); } parsed_sets_meta_value.ModifyCount(-1); @@ -884,7 +886,7 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons SetsMemberKey sets_member_key(destination, version, member); s = db_->Get(default_read_options_, handles_[kSetsDataCF], sets_member_key.Encode(), &member_value); if (s.IsNotFound()) { - if (!parsed_sets_meta_value.CheckModifyCount(1)) { + if (!parsed_sets_meta_value.CheckModifyCount(1)){ return Status::InvalidArgument("set size overflow"); } parsed_sets_meta_value.ModifyCount(1); @@ -919,6 +921,8 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector* members, rocksdb::WriteBatch batch; ScopeRecordLock l(lock_mgr_, key); + uint64_t start_us = pstd::NowMicros(); + BaseMetaKey base_meta_key(key); Status s = db_->Get(default_read_options_, handles_[kMetaCF], base_meta_key.Encode(), &meta_value); if (s.ok() && !ExpectedMetaValue(DataType::kSets, meta_value)) { @@ -996,7 +1000,7 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector* members, } } - if (!parsed_sets_meta_value.CheckModifyCount(static_cast(-cnt))) { + if (!parsed_sets_meta_value.CheckModifyCount(static_cast(-cnt))){ return Status::InvalidArgument("set size overflow"); } parsed_sets_meta_value.ModifyCount(static_cast(-cnt)); @@ -1145,7 +1149,7 @@ rocksdb::Status Redis::SRem(const Slice& key, const std::vector& me } } *ret = cnt; - if (!parsed_sets_meta_value.CheckModifyCount(-cnt)) { + if (!parsed_sets_meta_value.CheckModifyCount(-cnt)){ return Status::InvalidArgument("set size overflow"); } parsed_sets_meta_value.ModifyCount(-cnt); @@ -1403,7 +1407,7 @@ rocksdb::Status Redis::SScan(const Slice& key, int64_t cursor, const std::string return rocksdb::Status::OK(); } -rocksdb::Status Redis::SetsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta) { +rocksdb::Status Redis::SetsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseMetaKey base_meta_key(key); @@ -1432,8 +1436,8 @@ rocksdb::Status Redis::SetsExpire(const Slice& key, int64_t ttl_millsec, std::st return rocksdb::Status::NotFound(); } - if (ttl_millsec > 0) { - parsed_sets_meta_value.SetRelativeTimestamp(ttl_millsec); + if (ttl > 0) { + parsed_sets_meta_value.SetRelativeTimestamp(ttl); s = db_->Put(default_write_options_, handles_[kMetaCF], base_meta_key.Encode(), meta_value); } else { parsed_sets_meta_value.InitialMetaValue(); @@ -1480,7 +1484,7 @@ rocksdb::Status Redis::SetsDel(const Slice& key, std::string&& prefetch_meta) { return s; } -rocksdb::Status Redis::SetsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta) { +rocksdb::Status Redis::SetsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseMetaKey base_meta_key(key); @@ -1508,8 +1512,8 @@ rocksdb::Status Redis::SetsExpireat(const Slice& key, int64_t timestamp_millsec, } else if (parsed_sets_meta_value.Count() == 0) { return rocksdb::Status::NotFound(); } else { - if (timestamp_millsec > 0) { - parsed_sets_meta_value.SetEtime(static_cast(timestamp_millsec)); + if (timestamp > 0) { + parsed_sets_meta_value.SetEtime(static_cast(timestamp)); } else { parsed_sets_meta_value.InitialMetaValue(); } @@ -1559,7 +1563,7 @@ rocksdb::Status Redis::SetsPersist(const Slice& key, std::string&& prefetch_meta return s; } -rocksdb::Status Redis::SetsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta) { +rocksdb::Status Redis::SetsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); BaseMetaKey base_meta_key(key); rocksdb::Status s; @@ -1582,22 +1586,23 @@ rocksdb::Status Redis::SetsTTL(const Slice& key, int64_t* ttl_millsec, std::stri if (s.ok()) { ParsedSetsMetaValue parsed_setes_meta_value(&meta_value); if (parsed_setes_meta_value.IsStale()) { - *ttl_millsec = -2; + *timestamp = -2; return rocksdb::Status::NotFound("Stale"); } else if (parsed_setes_meta_value.Count() == 0) { - *ttl_millsec = -2; + *timestamp = -2; return rocksdb::Status::NotFound(); } else { - *ttl_millsec = parsed_setes_meta_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *timestamp = parsed_setes_meta_value.Etime(); + if (*timestamp == 0) { + *timestamp = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2; } } } else if (s.IsNotFound()) { - *ttl_millsec = -2; + *timestamp = -2; } return s; } diff --git a/src/storage/src/redis_streams.cc b/src/storage/src/redis_streams.cc index f3abdc5b08..606fb99c05 100644 --- a/src/storage/src/redis_streams.cc +++ b/src/storage/src/redis_streams.cc @@ -338,6 +338,9 @@ Status Redis::ScanStreamsKeyNum(KeyInfo* key_info) { iterator_options.snapshot = snapshot; iterator_options.fill_cache = false; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { if (!ExpectedMetaValue(DataType::kStreams, iter->value().ToString())) { diff --git a/src/storage/src/redis_strings.cc b/src/storage/src/redis_strings.cc index 1c46b6ab69..c69be1d46e 100644 --- a/src/storage/src/redis_strings.cc +++ b/src/storage/src/redis_strings.cc @@ -33,7 +33,8 @@ Status Redis::ScanStringsKeyNum(KeyInfo* key_info) { iterator_options.snapshot = snapshot; iterator_options.fill_cache = false; - pstd::TimeType curtime = pstd::NowMillis(); + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); // Note: This is a string type and does not need to pass the column family as // a parameter, use the default column family @@ -381,23 +382,24 @@ void ClearValueAndSetTTL(std::string* value, int64_t* ttl, int64_t ttl_value) { } int64_t CalculateTTL(int64_t expiry_time) { - pstd::TimeType current_time = pstd::NowMillis(); + int64_t current_time; + rocksdb::Env::Default()->GetCurrentTime(¤t_time); return expiry_time - current_time >= 0 ? expiry_time - current_time : -2; } -Status HandleParsedStringsValue(ParsedStringsValue& parsed_strings_value, std::string* value, int64_t* ttl_millsec) { +Status HandleParsedStringsValue(ParsedStringsValue& parsed_strings_value, std::string* value, int64_t* ttl) { if (parsed_strings_value.IsStale()) { - ClearValueAndSetTTL(value, ttl_millsec, -2); + ClearValueAndSetTTL(value, ttl, -2); return Status::NotFound("Stale"); } else { parsed_strings_value.StripSuffix(); int64_t expiry_time = parsed_strings_value.Etime(); - *ttl_millsec = (expiry_time == 0) ? -1 : CalculateTTL(expiry_time); + *ttl = (expiry_time == 0) ? -1 : CalculateTTL(expiry_time); } return Status::OK(); } -Status Redis::GetWithTTL(const Slice& key, std::string* value, int64_t* ttl_millsec) { +Status Redis::GetWithTTL(const Slice& key, std::string* value, int64_t* ttl) { value->clear(); BaseKey base_key(key); Status s = db_->Get(default_read_options_, base_key.Encode(), value); @@ -416,15 +418,15 @@ Status Redis::GetWithTTL(const Slice& key, std::string* value, int64_t* ttl_mill if (s.ok()) { ParsedStringsValue parsed_strings_value(value); - return HandleParsedStringsValue(parsed_strings_value, value, ttl_millsec); + return HandleParsedStringsValue(parsed_strings_value, value, ttl); } else if (s.IsNotFound()) { - ClearValueAndSetTTL(value, ttl_millsec, -2); + ClearValueAndSetTTL(value, ttl, -2); } return s; } -Status Redis::MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl_millsec) { +Status Redis::MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl) { value->clear(); BaseKey base_key(key); Status s = db_->Get(default_read_options_, base_key.Encode(), value); @@ -436,9 +438,9 @@ Status Redis::MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl_mil if (s.ok()) { ParsedStringsValue parsed_strings_value(value); - return HandleParsedStringsValue(parsed_strings_value, value, ttl_millsec); + return HandleParsedStringsValue(parsed_strings_value, value, ttl); } else if (s.IsNotFound()) { - ClearValueAndSetTTL(value, ttl_millsec, -2); + ClearValueAndSetTTL(value, ttl, -2); } return s; @@ -529,7 +531,7 @@ Status Redis::Getrange(const Slice& key, int64_t start_offset, int64_t end_offse } Status Redis::GetrangeWithValue(const Slice& key, int64_t start_offset, int64_t end_offset, - std::string* ret, std::string* value, int64_t* ttl_millsec) { + std::string* ret, std::string* value, int64_t* ttl) { *ret = ""; BaseKey base_key(key); Status s = db_->Get(default_read_options_, base_key.Encode(), value); @@ -548,17 +550,18 @@ Status Redis::GetrangeWithValue(const Slice& key, int64_t start_offset, int64_t ParsedStringsValue parsed_strings_value(value); if (parsed_strings_value.IsStale()) { value->clear(); - *ttl_millsec = -2; + *ttl = -2; return Status::NotFound("Stale"); } else { parsed_strings_value.StripSuffix(); // get ttl - *ttl_millsec = parsed_strings_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *ttl = parsed_strings_value.Etime(); + if (*ttl == 0) { + *ttl = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2; } int64_t size = value->size(); @@ -584,7 +587,7 @@ Status Redis::GetrangeWithValue(const Slice& key, int64_t start_offset, int64_t } } else if (s.IsNotFound()) { value->clear(); - *ttl_millsec = -2; + *ttl = -2; } return s; } @@ -778,7 +781,7 @@ Status Redis::Set(const Slice& key, const Slice& value) { return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode()); } -Status Redis::Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl_millsec) { +Status Redis::Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl) { bool not_found = true; std::string old_value; StringsValue strings_value(value); @@ -810,8 +813,8 @@ Status Redis::Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t return s; } else { *ret = 1; - if (ttl_millsec > 0) { - strings_value.SetRelativeTimeByMillsec(ttl_millsec); + if (ttl > 0) { + strings_value.SetRelativeTimestamp(ttl); } return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode()); } @@ -876,12 +879,12 @@ Status Redis::SetBit(const Slice& key, int64_t offset, int32_t on, int32_t* ret) } } -Status Redis::Setex(const Slice& key, const Slice& value, int64_t ttl_millsec) { - if (ttl_millsec <= 0) { +Status Redis::Setex(const Slice& key, const Slice& value, int64_t ttl) { + if (ttl <= 0) { return Status::InvalidArgument("invalid expire time"); } StringsValue strings_value(value); - auto s = strings_value.SetRelativeTimeByMillsec(ttl_millsec); + auto s = strings_value.SetRelativeTimestamp(ttl); if (s != Status::OK()) { return s; } @@ -891,7 +894,7 @@ Status Redis::Setex(const Slice& key, const Slice& value, int64_t ttl_millsec) { return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode()); } -Status Redis::Setnx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl_millsec) { +Status Redis::Setnx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl) { *ret = 0; std::string old_value; @@ -908,8 +911,8 @@ Status Redis::Setnx(const Slice& key, const Slice& value, int32_t* ret, int64_t s = Status::NotFound(); StringsValue strings_value(value); - if (ttl_millsec > 0) { - strings_value.SetRelativeTimeByMillsec(ttl_millsec); + if (ttl > 0) { + strings_value.SetRelativeTimestamp(ttl); } s = db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode()); if (s.ok()) { @@ -919,7 +922,7 @@ Status Redis::Setnx(const Slice& key, const Slice& value, int32_t* ret, int64_t } Status Redis::Setvx(const Slice& key, const Slice& value, const Slice& new_value, int32_t* ret, - int64_t ttl_millsec) { + int64_t ttl) { *ret = 0; std::string old_value; @@ -943,8 +946,8 @@ Status Redis::Setvx(const Slice& key, const Slice& value, const Slice& new_value } else { if (value.compare(parsed_strings_value.UserValue()) == 0) { StringsValue strings_value(new_value); - if (ttl_millsec > 0) { - strings_value.SetRelativeTimeByMillsec(ttl_millsec); + if (ttl > 0) { + strings_value.SetRelativeTimestamp(ttl); } s = db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode()); if (!s.ok()) { @@ -1289,18 +1292,16 @@ Status Redis::BitPos(const Slice& key, int32_t bit, int64_t start_offset, int64_ } //TODO(wangshaoyi): timestamp uint64_t -Status Redis::PKSetexAt(const Slice& key, const Slice& value, int64_t time_stamp_millsec_) { +Status Redis::PKSetexAt(const Slice& key, const Slice& value, int64_t timestamp) { StringsValue strings_value(value); - if (time_stamp_millsec_ < 0) { - time_stamp_millsec_ = pstd::NowMillis() - 1; - } + BaseKey base_key(key); ScopeRecordLock l(lock_mgr_, key); - strings_value.SetEtime(uint64_t(time_stamp_millsec_)); + strings_value.SetEtime(uint64_t(timestamp)); return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode()); } -Status Redis::StringsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta) { +Status Redis::StringsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta) { std::string value(std::move(prefetch_meta)); BaseKey base_key(key); @@ -1326,8 +1327,8 @@ Status Redis::StringsExpire(const Slice& key, int64_t ttl_millsec, std::string&& if (parsed_strings_value.IsStale()) { return Status::NotFound("Stale"); } - if (ttl_millsec > 0) { - parsed_strings_value.SetRelativeTimestamp(ttl_millsec); + if (ttl > 0) { + parsed_strings_value.SetRelativeTimestamp(ttl); return db_->Put(default_write_options_, base_key.Encode(), value); } else { return db_->Delete(default_write_options_, base_key.Encode()); @@ -1367,7 +1368,7 @@ Status Redis::StringsDel(const Slice& key, std::string&& prefetch_meta) { return s; } -Status Redis::StringsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta) { +Status Redis::StringsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta) { std::string value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseKey base_key(key); @@ -1393,8 +1394,8 @@ Status Redis::StringsExpireat(const Slice& key, int64_t timestamp_millsec, std:: if (parsed_strings_value.IsStale()) { return Status::NotFound("Stale"); } else { - if (timestamp_millsec > 0) { - parsed_strings_value.SetEtime(static_cast(timestamp_millsec)); + if (timestamp > 0) { + parsed_strings_value.SetEtime(static_cast(timestamp)); return db_->Put(default_write_options_, base_key.Encode(), value); } else { return db_->Delete(default_write_options_, base_key.Encode()); @@ -1442,7 +1443,7 @@ Status Redis::StringsPersist(const Slice& key, std::string&& prefetch_meta) { return s; } -Status Redis::StringsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta) { +Status Redis::StringsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta) { std::string value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseKey base_key(key); @@ -1466,19 +1467,20 @@ Status Redis::StringsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& p if (s.ok()) { ParsedStringsValue parsed_strings_value(&value); if (parsed_strings_value.IsStale()) { - *ttl_millsec = -2; + *timestamp = -2; return Status::NotFound("Stale"); } else { - *ttl_millsec = parsed_strings_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *timestamp = parsed_strings_value.Etime(); + if (*timestamp == 0) { + *timestamp = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2; } } } else if (s.IsNotFound()) { - *ttl_millsec = -2; + *timestamp = -2; } return s; } @@ -1570,7 +1572,7 @@ rocksdb::Status Redis::Del(const Slice& key) { return rocksdb::Status::NotFound(); } -rocksdb::Status Redis::Expire(const Slice& key, int64_t ttl_millsec) { +rocksdb::Status Redis::Expire(const Slice& key, int64_t ttl) { std::string meta_value; BaseMetaKey base_meta_key(key); rocksdb::Status s = db_->Get(default_read_options_, handles_[kMetaCF], base_meta_key.Encode(), &meta_value); @@ -1578,15 +1580,15 @@ rocksdb::Status Redis::Expire(const Slice& key, int64_t ttl_millsec) { auto type = static_cast(static_cast(meta_value[0])); switch (type) { case DataType::kSets: - return SetsExpire(key, ttl_millsec, std::move(meta_value)); + return SetsExpire(key, ttl, std::move(meta_value)); case DataType::kZSets: - return ZsetsExpire(key, ttl_millsec, std::move(meta_value)); + return ZsetsExpire(key, ttl, std::move(meta_value)); case DataType::kHashes: - return HashesExpire(key, ttl_millsec, std::move(meta_value)); + return HashesExpire(key, ttl, std::move(meta_value)); case DataType::kLists: - return ListsExpire(key, ttl_millsec, std::move(meta_value)); + return ListsExpire(key, ttl, std::move(meta_value)); case DataType::kStrings: - return StringsExpire(key, ttl_millsec, std::move(meta_value)); + return StringsExpire(key, ttl, std::move(meta_value)); default: return rocksdb::Status::NotFound(); } @@ -1594,7 +1596,7 @@ rocksdb::Status Redis::Expire(const Slice& key, int64_t ttl_millsec) { return rocksdb::Status::NotFound(); } -rocksdb::Status Redis::Expireat(const Slice& key, int64_t timestamp_millsec) { +rocksdb::Status Redis::Expireat(const Slice& key, int64_t ttl) { std::string meta_value; BaseMetaKey base_meta_key(key); rocksdb::Status s = db_->Get(default_read_options_, handles_[kMetaCF], base_meta_key.Encode(), &meta_value); @@ -1602,15 +1604,15 @@ rocksdb::Status Redis::Expireat(const Slice& key, int64_t timestamp_millsec) { auto type = static_cast(static_cast(meta_value[0])); switch (type) { case DataType::kSets: - return SetsExpireat(key, timestamp_millsec, std::move(meta_value)); + return SetsExpireat(key, ttl, std::move(meta_value)); case DataType::kZSets: - return ZsetsExpireat(key, timestamp_millsec, std::move(meta_value)); + return ZsetsExpireat(key, ttl, std::move(meta_value)); case DataType::kHashes: - return HashesExpireat(key, timestamp_millsec, std::move(meta_value)); + return HashesExpireat(key, ttl, std::move(meta_value)); case DataType::kLists: - return ListsExpireat(key, timestamp_millsec, std::move(meta_value)); + return ListsExpireat(key, ttl, std::move(meta_value)); case DataType::kStrings: - return StringsExpireat(key, timestamp_millsec, std::move(meta_value)); + return StringsExpireat(key, ttl, std::move(meta_value)); default: return rocksdb::Status::NotFound(); } @@ -1642,7 +1644,7 @@ rocksdb::Status Redis::Persist(const Slice& key) { return rocksdb::Status::NotFound(); } -rocksdb::Status Redis::TTL(const Slice& key, int64_t* ttl_millsec) { +rocksdb::Status Redis::TTL(const Slice& key, int64_t* timestamp) { std::string meta_value; BaseMetaKey base_meta_key(key); rocksdb::Status s = db_->Get(default_read_options_, handles_[kMetaCF], base_meta_key.Encode(), &meta_value); @@ -1650,15 +1652,15 @@ rocksdb::Status Redis::TTL(const Slice& key, int64_t* ttl_millsec) { auto type = static_cast(static_cast(meta_value[0])); switch (type) { case DataType::kSets: - return SetsTTL(key, ttl_millsec, std::move(meta_value)); + return SetsTTL(key, timestamp, std::move(meta_value)); case DataType::kZSets: - return ZsetsTTL(key, ttl_millsec, std::move(meta_value)); + return ZsetsTTL(key, timestamp, std::move(meta_value)); case DataType::kHashes: - return HashesTTL(key, ttl_millsec, std::move(meta_value)); + return HashesTTL(key, timestamp, std::move(meta_value)); case DataType::kLists: - return ListsTTL(key, ttl_millsec, std::move(meta_value)); + return ListsTTL(key, timestamp, std::move(meta_value)); case DataType::kStrings: - return StringsTTL(key, ttl_millsec, std::move(meta_value)); + return StringsTTL(key, timestamp, std::move(meta_value)); default: return rocksdb::Status::NotFound(); } diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index fa6c78f912..632f7fb80a 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -36,7 +36,8 @@ Status Redis::ScanZsetsKeyNum(KeyInfo* key_info) { iterator_options.snapshot = snapshot; iterator_options.fill_cache = false; - pstd::TimeType curtime = pstd::NowMillis(); + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -107,7 +108,7 @@ Status Redis::ZPopMax(const Slice& key, const int64_t count, std::vectorkey()); } delete iter; - if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { + if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)){ return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); @@ -165,7 +166,7 @@ Status Redis::ZPopMin(const Slice& key, const int64_t count, std::vectorkey()); } delete iter; - if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { + if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)){ return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); @@ -260,7 +261,7 @@ Status Redis::ZAdd(const Slice& key, const std::vector& score_membe cnt++; } } - if (!parsed_zsets_meta_value.CheckModifyCount(cnt)) { + if (!parsed_zsets_meta_value.CheckModifyCount(cnt)){ return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(cnt); @@ -443,7 +444,7 @@ Status Redis::ZIncrby(const Slice& key, const Slice& member, double increment, d statistic++; } else if (s.IsNotFound()) { score = increment; - if (!parsed_zsets_meta_value.CheckModifyCount(1)) { + if (!parsed_zsets_meta_value.CheckModifyCount(1)){ return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(1); @@ -534,7 +535,7 @@ Status Redis::ZRange(const Slice& key, int32_t start, int32_t stop, std::vector< } Status Redis::ZRangeWithTTL(const Slice& key, int32_t start, int32_t stop, std::vector* score_members, - int64_t* ttl_millsec) { + int64_t* ttl) { score_members->clear(); rocksdb::ReadOptions read_options; const rocksdb::Snapshot* snapshot = nullptr; @@ -563,12 +564,13 @@ Status Redis::ZRangeWithTTL(const Slice& key, int32_t start, int32_t stop, std:: return Status::NotFound("Stale"); } else { // ttl - *ttl_millsec = parsed_zsets_meta_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *ttl = parsed_zsets_meta_value.Etime(); + if (*ttl == 0) { + *ttl = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *ttl = *ttl - curtime >= 0 ? *ttl - curtime : -2; } int32_t count = parsed_zsets_meta_value.Count(); @@ -795,7 +797,7 @@ Status Redis::ZRem(const Slice& key, const std::vector& members, in } } *ret = del_cnt; - if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { + if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)){ return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); @@ -862,7 +864,7 @@ Status Redis::ZRemrangebyrank(const Slice& key, int32_t start, int32_t stop, int } delete iter; *ret = del_cnt; - if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { + if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)){ return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); @@ -942,7 +944,7 @@ Status Redis::ZRemrangebyscore(const Slice& key, double min, double max, bool le } delete iter; *ret = del_cnt; - if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { + if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)){ return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); @@ -1654,7 +1656,7 @@ Status Redis::ZRemrangebylex(const Slice& key, const Slice& min, const Slice& ma delete iter; } if (del_cnt > 0) { - if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { + if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)){ return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); @@ -1669,7 +1671,7 @@ Status Redis::ZRemrangebylex(const Slice& key, const Slice& min, const Slice& ma return s; } -Status Redis::ZsetsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta) { +Status Redis::ZsetsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseMetaKey base_meta_key(key); @@ -1698,8 +1700,8 @@ Status Redis::ZsetsExpire(const Slice& key, int64_t ttl_millsec, std::string&& p return Status::NotFound(); } - if (ttl_millsec > 0) { - parsed_zsets_meta_value.SetRelativeTimestamp(ttl_millsec); + if (ttl > 0) { + parsed_zsets_meta_value.SetRelativeTimestamp(ttl); } else { parsed_zsets_meta_value.InitialMetaValue(); } @@ -1745,7 +1747,7 @@ Status Redis::ZsetsDel(const Slice& key, std::string&& prefetch_meta) { return s; } -Status Redis::ZsetsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta) { +Status Redis::ZsetsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); ScopeRecordLock l(lock_mgr_, key); BaseMetaKey base_meta_key(key); @@ -1773,8 +1775,8 @@ Status Redis::ZsetsExpireat(const Slice& key, int64_t timestamp_millsec, std::st } else if (parsed_zsets_meta_value.Count() == 0) { return Status::NotFound(); } else { - if (timestamp_millsec > 0) { - parsed_zsets_meta_value.SetEtime(uint64_t(timestamp_millsec)); + if (timestamp > 0) { + parsed_zsets_meta_value.SetEtime(uint64_t(timestamp)); } else { parsed_zsets_meta_value.InitialMetaValue(); } @@ -1910,7 +1912,7 @@ Status Redis::ZsetsPersist(const Slice& key, std::string&& prefetch_meta) { return s; } -Status Redis::ZsetsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& prefetch_meta) { +Status Redis::ZsetsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta) { std::string meta_value(std::move(prefetch_meta)); BaseMetaKey base_meta_key(key); Status s; @@ -1933,22 +1935,23 @@ Status Redis::ZsetsTTL(const Slice& key, int64_t* ttl_millsec, std::string&& pre if (s.ok()) { ParsedZSetsMetaValue parsed_zsets_meta_value(&meta_value); if (parsed_zsets_meta_value.IsStale()) { - *ttl_millsec = -2; + *timestamp = -2; return Status::NotFound("Stale"); } else if (parsed_zsets_meta_value.Count() == 0) { - *ttl_millsec = -2; + *timestamp = -2; return Status::NotFound(); } else { - *ttl_millsec = parsed_zsets_meta_value.Etime(); - if (*ttl_millsec == 0) { - *ttl_millsec = -1; + *timestamp = parsed_zsets_meta_value.Etime(); + if (*timestamp == 0) { + *timestamp = -1; } else { - pstd::TimeType curtime = pstd::NowMillis(); - *ttl_millsec = *ttl_millsec - curtime >= 0 ? *ttl_millsec - curtime : -2; + int64_t curtime; + rocksdb::Env::Default()->GetCurrentTime(&curtime); + *timestamp = *timestamp - curtime >= 0 ? *timestamp - curtime : -2; } } } else if (s.IsNotFound()) { - *ttl_millsec = -2; + *timestamp = -2; } return s; } diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 716bfeb32f..173ecfb976 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -125,7 +125,9 @@ Status Storage::StoreCursorStartKey(const DataType& dtype, int64_t cursor, char return cursors_store_->Insert(index_key, index_value); } -std::unique_ptr& Storage::GetDBInstance(const Slice& key) { return GetDBInstance(key.ToString()); } +std::unique_ptr& Storage::GetDBInstance(const Slice& key) { + return GetDBInstance(key.ToString()); +} std::unique_ptr& Storage::GetDBInstance(const std::string& key) { auto inst_index = slot_indexer_->GetInstanceID(GetSlotID(slot_num_, key)); @@ -138,9 +140,9 @@ Status Storage::Set(const Slice& key, const Slice& value) { return inst->Set(key, value); } -Status Storage::Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl_millsec) { +Status Storage::Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl) { auto& inst = GetDBInstance(key); - return inst->Setxx(key, value, ret, ttl_millsec); + return inst->Setxx(key, value, ret, ttl); } Status Storage::Get(const Slice& key, std::string* value) { @@ -148,14 +150,14 @@ Status Storage::Get(const Slice& key, std::string* value) { return inst->Get(key, value); } -Status Storage::GetWithTTL(const Slice& key, std::string* value, int64_t* ttl_millsec) { +Status Storage::GetWithTTL(const Slice& key, std::string* value, int64_t* ttl) { auto& inst = GetDBInstance(key); - return inst->GetWithTTL(key, value, ttl_millsec); + return inst->GetWithTTL(key, value, ttl); } -Status Storage::MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl_millsec) { +Status Storage::MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl) { auto& inst = GetDBInstance(key); - return inst->MGetWithTTL(key, value, ttl_millsec); + return inst->MGetWithTTL(key, value, ttl); } Status Storage::GetSet(const Slice& key, const Slice& value, std::string* old_value) { @@ -194,7 +196,7 @@ Status Storage::MGet(const std::vector& keys, std::vectorMGet(key, &value); if (s.ok()) { vss->push_back({value, Status::OK()}); - } else if (s.IsNotFound()) { + } else if(s.IsNotFound()) { vss->push_back({std::string(), Status::NotFound()}); } else { vss->clear(); @@ -210,12 +212,12 @@ Status Storage::MGetWithTTL(const std::vector& keys, std::vectorMGetWithTTL(key, &value, &ttl_millsec); + int64_t ttl; + s = inst->MGetWithTTL(key, &value, &ttl); if (s.ok()) { - vss->push_back({value, Status::OK(), ttl_millsec}); + vss->push_back({value, Status::OK(), ttl}); } else if (s.IsNotFound()) { - vss->push_back({std::string(), Status::NotFound(), ttl_millsec}); + vss->push_back({std::string(), Status::NotFound(), ttl}); } else { vss->clear(); return s; @@ -224,9 +226,9 @@ Status Storage::MGetWithTTL(const std::vector& keys, std::vectorSetnx(key, value, ret, ttl_millsec); + return inst->Setnx(key, value, ret, ttl); } // disallowed in codis, only runs in pika classic mode @@ -255,9 +257,9 @@ Status Storage::MSetnx(const std::vector& kvs, int32_t* ret) { return s; } -Status Storage::Setvx(const Slice& key, const Slice& value, const Slice& new_value, int32_t* ret, int64_t ttl_millsec) { +Status Storage::Setvx(const Slice& key, const Slice& value, const Slice& new_value, int32_t* ret, int64_t ttl) { auto& inst = GetDBInstance(key); - return inst->Setvx(key, value, new_value, ret, ttl_millsec); + return inst->Setvx(key, value, new_value, ret, ttl); } Status Storage::Delvx(const Slice& key, const Slice& value, int32_t* ret) { @@ -276,9 +278,9 @@ Status Storage::Getrange(const Slice& key, int64_t start_offset, int64_t end_off } Status Storage::GetrangeWithValue(const Slice& key, int64_t start_offset, int64_t end_offset, - std::string* ret, std::string* value, int64_t* ttl_millsec) { + std::string* ret, std::string* value, int64_t* ttl) { auto& inst = GetDBInstance(key); - return inst->GetrangeWithValue(key, start_offset, end_offset, ret, value, ttl_millsec); + return inst->GetrangeWithValue(key, start_offset, end_offset, ret, value, ttl); } Status Storage::Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* expired_timestamp_sec, std::string& out_new_value) { @@ -355,9 +357,9 @@ Status Storage::Incrbyfloat(const Slice& key, const Slice& value, std::string* r return inst->Incrbyfloat(key, value, ret, expired_timestamp_sec); } -Status Storage::Setex(const Slice& key, const Slice& value, int64_t ttl_millsec) { +Status Storage::Setex(const Slice& key, const Slice& value, int64_t ttl) { auto& inst = GetDBInstance(key); - return inst->Setex(key, value, ttl_millsec); + return inst->Setex(key, value, ttl); } Status Storage::Strlen(const Slice& key, int32_t* len) { @@ -365,12 +367,9 @@ Status Storage::Strlen(const Slice& key, int32_t* len) { return inst->Strlen(key, len); } -Status Storage::PKSetexAt(const Slice& key, const Slice& value, int64_t time_stamp_millsec_) { +Status Storage::PKSetexAt(const Slice& key, const Slice& value, int64_t timestamp) { auto& inst = GetDBInstance(key); - if (time_stamp_millsec_ < 0) { - time_stamp_millsec_ = pstd::NowMillis() - 1; - } - return inst->PKSetexAt(key, value, time_stamp_millsec_); + return inst->PKSetexAt(key, value, timestamp); } // Hashes Commands @@ -399,9 +398,9 @@ Status Storage::HGetall(const Slice& key, std::vector* fvs) { return inst->HGetall(key, fvs); } -Status Storage::HGetallWithTTL(const Slice& key, std::vector* fvs, int64_t* ttl_millsec) { +Status Storage::HGetallWithTTL(const Slice& key, std::vector* fvs, int64_t* ttl) { auto& inst = GetDBInstance(key); - return inst->HGetallWithTTL(key, fvs, ttl_millsec); + return inst->HGetallWithTTL(key, fvs, ttl); } Status Storage::HKeys(const Slice& key, std::vector* fields) { @@ -627,9 +626,9 @@ Status Storage::SMembers(const Slice& key, std::vector* members) { return inst->SMembers(key, members); } -Status Storage::SMembersWithTTL(const Slice& key, std::vector* members, int64_t * ttl_millsec) { +Status Storage::SMembersWithTTL(const Slice& key, std::vector* members, int64_t *ttl) { auto& inst = GetDBInstance(key); - return inst->SMembersWithTTL(key, members, ttl_millsec); + return inst->SMembersWithTTL(key, members, ttl); } Status Storage::SMove(const Slice& source, const Slice& destination, const Slice& member, int32_t* ret) { @@ -754,9 +753,9 @@ Status Storage::LRange(const Slice& key, int64_t start, int64_t stop, std::vecto return inst->LRange(key, start, stop, ret); } -Status Storage::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::vector* ret, int64_t * ttl_millsec) { +Status Storage::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::vector* ret, int64_t *ttl) { auto& inst = GetDBInstance(key); - return inst->LRangeWithTTL(key, start, stop, ret, ttl_millsec); + return inst->LRangeWithTTL(key, start, stop, ret, ttl); } Status Storage::LTrim(const Slice& key, int64_t start, int64_t stop) { @@ -886,10 +885,10 @@ Status Storage::ZRange(const Slice& key, int32_t start, int32_t stop, std::vecto return inst->ZRange(key, start, stop, score_members); } Status Storage::ZRangeWithTTL(const Slice& key, int32_t start, int32_t stop, std::vector* score_members, - int64_t * ttl_millsec) { + int64_t *ttl) { score_members->clear(); auto& inst = GetDBInstance(key); - return inst->ZRangeWithTTL(key, start, stop, score_members, ttl_millsec); + return inst->ZRangeWithTTL(key, start, stop, score_members, ttl); } Status Storage::ZRangebyscore(const Slice& key, double min, double max, bool left_close, bool right_close, @@ -1171,10 +1170,10 @@ Status Storage::XInfo(const Slice& key, StreamInfoResult &result) { } // Keys Commands -int32_t Storage::Expire(const Slice& key, int64_t ttl_millsec) { +int32_t Storage::Expire(const Slice& key, int64_t ttl) { auto& inst = GetDBInstance(key); int32_t ret = 0; - Status s = inst->Expire(key, ttl_millsec); + Status s = inst->Expire(key, ttl); if (s.ok()) { ret++; } else if (!s.IsNotFound()) { @@ -1450,11 +1449,11 @@ Status Storage::Scanx(const DataType& data_type, const std::string& start_key, c return Status::OK(); } -int32_t Storage::Expireat(const Slice& key, int64_t timestamp_millsec) { +int32_t Storage::Expireat(const Slice& key, int64_t timestamp) { Status s; int32_t count = 0; auto& inst = GetDBInstance(key); - s = inst->Expireat(key, timestamp_millsec); + s = inst->Expireat(key, timestamp); if (s.ok()) { count++; } else if (!s.IsNotFound()) { @@ -1475,28 +1474,16 @@ int32_t Storage::Persist(const Slice& key) { return count; } -int64_t Storage::PTTL(const Slice& key) { - int64_t ttl_millsec = 0; - auto& inst = GetDBInstance(key); - Status s = inst->TTL(key, &ttl_millsec); - if (s.ok() || s.IsNotFound()) { - return ttl_millsec; - } else if (!s.IsNotFound()) { - return -3; - } - return ttl_millsec; -} - int64_t Storage::TTL(const Slice& key) { - int64_t ttl_millsec = 0; + int64_t timestamp = 0; auto& inst = GetDBInstance(key); - Status s = inst->TTL(key, &ttl_millsec); + Status s = inst->TTL(key, ×tamp); if (s.ok() || s.IsNotFound()) { - return ttl_millsec > 0 ? ttl_millsec / 1000 : ttl_millsec; + return timestamp; } else if (!s.IsNotFound()) { return -3; } - return ttl_millsec > 0 ? ttl_millsec / 1000 : ttl_millsec; + return timestamp; } Status Storage::GetType(const std::string& key, enum DataType& type) { @@ -1792,7 +1779,7 @@ Status Storage::SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys) { } Status Storage::SetSmallCompactionThreshold(uint32_t small_compaction_threshold) { - for (const auto& inst : insts_) { + for (const auto& inst: insts_) { inst->SetSmallCompactionThreshold(small_compaction_threshold); } return Status::OK(); diff --git a/src/storage/src/strings_filter.h b/src/storage/src/strings_filter.h index c53478bb11..fc03595d82 100644 --- a/src/storage/src/strings_filter.h +++ b/src/storage/src/strings_filter.h @@ -20,7 +20,8 @@ class StringsFilter : public rocksdb::CompactionFilter { StringsFilter() = default; bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& value, std::string* new_value, bool* value_changed) const override { - pstd::TimeType unix_time = pstd::NowMillis(); + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); auto cur_time = static_cast(unix_time); ParsedStringsValue parsed_strings_value(value); TRACE("==========================START=========================="); diff --git a/src/storage/src/strings_value_format.h b/src/storage/src/strings_value_format.h index 550104b339..6e001d7475 100644 --- a/src/storage/src/strings_value_format.h +++ b/src/storage/src/strings_value_format.h @@ -35,13 +35,9 @@ class StringsValue : public InternalValue { dst += usize; memcpy(dst, reserve_, kSuffixReserveLength); dst += kSuffixReserveLength; - // The most significant bit is 1 for milliseconds and 0 for seconds. - // The previous data was stored in seconds, but the subsequent data was stored in milliseconds - uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, ctime); + EncodeFixed64(dst, ctime_); dst += kTimestampLength; - uint64_t etime = etime_ > 0 ? (etime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, etime); + EncodeFixed64(dst, etime_); return {start_, needed}; } }; @@ -82,20 +78,9 @@ class ParsedStringsValue : public ParsedInternalValue { offset += user_value_.size(); memcpy(reserve_, internal_value_str->data() + offset, kSuffixReserveLength); offset += kSuffixReserveLength; - uint64_t ctime = DecodeFixed64(internal_value_str->data() + offset); + ctime_ = DecodeFixed64(internal_value_str->data() + offset); offset += sizeof(ctime_); - uint64_t etime = DecodeFixed64(internal_value_str->data() + offset); - - ctime_ = (ctime & ~(1ULL << 63)); - // if ctime_==ctime, means ctime_ storaged in seconds - if (ctime_ == ctime) { - ctime_ *= 1000; - } - etime_ = (etime & ~(1ULL << 63)); - // if etime_==etime, means etime_ storaged in seconds - if (etime == etime_) { - etime_ *= 1000; - } + etime_ = DecodeFixed64(internal_value_str->data() + offset); } } @@ -109,20 +94,9 @@ class ParsedStringsValue : public ParsedInternalValue { offset += user_value_.size(); memcpy(reserve_, internal_value_slice.data() + offset, kSuffixReserveLength); offset += kSuffixReserveLength; - uint64_t ctime = DecodeFixed64(internal_value_slice.data() + offset); + ctime_ = DecodeFixed64(internal_value_slice.data() + offset); offset += kTimestampLength; - uint64_t etime = DecodeFixed64(internal_value_slice.data() + offset); - - ctime_ = (ctime & ~(1ULL << 63)); - // if ctime_==ctime, means ctime_ storaged in seconds - if (ctime_ == ctime) { - ctime_ *= 1000; - } - etime_ = (etime & ~(1ULL << 63)); - // if etime_==etime, means etime_ storaged in seconds - if (etime == etime_) { - etime_ *= 1000; - } + etime_ = DecodeFixed64(internal_value_slice.data() + offset); } } @@ -140,8 +114,7 @@ class ParsedStringsValue : public ParsedInternalValue { if (value_) { char* dst = const_cast(value_->data()) + value_->size() - kStringsValueSuffixLength + kSuffixReserveLength; - uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, ctime); + EncodeFixed64(dst, ctime_); } } @@ -149,8 +122,7 @@ class ParsedStringsValue : public ParsedInternalValue { if (value_) { char* dst = const_cast(value_->data()) + value_->size() - kStringsValueSuffixLength + kSuffixReserveLength + kTimestampLength; - uint64_t etime = etime_ > 0 ? (etime_ | (1ULL << 63)) : 0; - EncodeFixed64(dst, etime); + EncodeFixed64(dst, etime_); } } diff --git a/src/storage/src/zsets_filter.h b/src/storage/src/zsets_filter.h index 629f12e669..8c046c7d3b 100644 --- a/src/storage/src/zsets_filter.h +++ b/src/storage/src/zsets_filter.h @@ -80,7 +80,8 @@ class ZSetsScoreFilter : public rocksdb::CompactionFilter { return true; } - pstd::TimeType unix_time = pstd::NowMillis(); + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); if (cur_meta_etime_ != 0 && cur_meta_etime_ < static_cast(unix_time)) { TRACE("Drop[Timeout]"); return true; diff --git a/src/storage/tests/keys_test.cc b/src/storage/tests/keys_test.cc index eeb7f8d9db..09292872f0 100644 --- a/src/storage/tests/keys_test.cc +++ b/src/storage/tests/keys_test.cc @@ -5179,7 +5179,8 @@ TEST_F(KeysTest, ExpireatTest) { s = db.Set("EXPIREAT_KEY", "VALUE"); ASSERT_TRUE(s.ok()); - pstd::TimeType unix_time = pstd::NowMillis(); + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); int64_t timestamp = unix_time + 1; ret = db.Expireat("EXPIREAT_KEY", timestamp); ASSERT_EQ(ret, 1); diff --git a/src/storage/tests/lists_filter_test.cc b/src/storage/tests/lists_filter_test.cc index aa343c4322..18e979d803 100644 --- a/src/storage/tests/lists_filter_test.cc +++ b/src/storage/tests/lists_filter_test.cc @@ -102,7 +102,7 @@ TEST_F(ListsFilterTest, DataFilterTest) { EncodeFixed64(str, 1); ListsMetaValue lists_meta_value2(Slice(str, sizeof(uint64_t))); version = lists_meta_value2.UpdateVersion(); - lists_meta_value2.SetRelativeTimeByMillsec(1); + lists_meta_value2.SetRelativeTimestamp(1); s = meta_db->Put(rocksdb::WriteOptions(), handles[0], bmk.Encode(), lists_meta_value2.Encode()); ASSERT_TRUE(s.ok()); ListsDataKey lists_data_key2("FILTER_TEST_KEY", version, 1); @@ -119,7 +119,7 @@ TEST_F(ListsFilterTest, DataFilterTest) { EncodeFixed64(str, 1); ListsMetaValue lists_meta_value3(Slice(str, sizeof(uint64_t))); version = lists_meta_value3.UpdateVersion(); - lists_meta_value3.SetRelativeTimeByMillsec(1); + lists_meta_value3.SetRelativeTimestamp(1); s = meta_db->Put(rocksdb::WriteOptions(), handles[0], bmk.Encode(), lists_meta_value3.Encode()); ASSERT_TRUE(s.ok()); std::this_thread::sleep_for(std::chrono::milliseconds(2000)); diff --git a/src/storage/tests/strings_filter_test.cc b/src/storage/tests/strings_filter_test.cc index 4c6f90d0c7..26af189877 100644 --- a/src/storage/tests/strings_filter_test.cc +++ b/src/storage/tests/strings_filter_test.cc @@ -21,7 +21,7 @@ TEST(StringsFilterTest, FilterTest) { int64_t ttl = 1; StringsValue strings_value("FILTER_VALUE"); - strings_value.SetRelativeTimeByMillsec(ttl); + strings_value.SetRelativeTimestamp(ttl); is_stale = filter->Filter(0, "FILTER_KEY", strings_value.Encode(), &new_value, &value_changed); ASSERT_FALSE(is_stale); std::this_thread::sleep_for(std::chrono::milliseconds(2000)); diff --git a/src/storage/tests/strings_test.cc b/src/storage/tests/strings_test.cc index ca5adabac0..361f6d082b 100644 --- a/src/storage/tests/strings_test.cc +++ b/src/storage/tests/strings_test.cc @@ -44,7 +44,7 @@ class StringsTest : public ::testing::Test { static bool make_expired(storage::Storage* const db, const Slice& key) { std::map type_status; - int ret = db->Expire(key, 1 * 100); + int ret = db->Expire(key, 1); if ((ret == 0) || !type_status[storage::DataType::kStrings].ok()) { return false; } @@ -87,7 +87,7 @@ TEST_F(StringsTest, AppendTest) { // ***************** Group 2 Test ***************** s = db.Set("GP2_APPEND_KEY", "VALUE"); ASSERT_TRUE(s.ok()); - ret = db.Expire("GP2_APPEND_KEY", 100 * 1000); + ret = db.Expire("GP2_APPEND_KEY", 100); ASSERT_EQ(ret, 1); type_status.clear(); type_ttl = db.TTL("GP2_APPEND_KEY"); @@ -764,7 +764,7 @@ TEST_F(StringsTest, SetvxTest) { ASSERT_TRUE(s.ok()); std::map type_status; - ret = db.Expire("GP6_SETVX_KEY", 10 * 1000); + ret = db.Expire("GP6_SETVX_KEY", 10); ASSERT_EQ(ret, 1); sleep(1); @@ -772,7 +772,7 @@ TEST_F(StringsTest, SetvxTest) { ASSERT_LT(0, ttl); ASSERT_GT(10, ttl); - s = db.Setvx("GP6_SETVX_KEY", "GP6_SETVX_VALUE", "GP6_SETVX_NEW_VALUE", &ret, 20 * 1000); + s = db.Setvx("GP6_SETVX_KEY", "GP6_SETVX_VALUE", "GP6_SETVX_NEW_VALUE", &ret, 20); ASSERT_TRUE(s.ok()); ASSERT_EQ(ret, 1); @@ -941,13 +941,16 @@ TEST_F(StringsTest, BitPosTest) { // PKSetexAt TEST_F(StringsTest, PKSetexAtTest) { - pstd::TimeType unix_time; +#ifdef OS_MACOSX + return ; +#endif + int64_t unix_time; + rocksdb::Env::Default()->GetCurrentTime(&unix_time); int64_t ttl_ret; std::map type_status; // ***************** Group 1 Test ***************** - unix_time = pstd::NowMillis(); - s = db.PKSetexAt("GP1_PKSETEX_KEY", "VALUE", unix_time + 100*1000); + s = db.PKSetexAt("GP1_PKSETEX_KEY", "VALUE", unix_time + 100); ASSERT_TRUE(s.ok()); type_status.clear(); @@ -957,10 +960,9 @@ TEST_F(StringsTest, PKSetexAtTest) { ASSERT_GE(ttl_ret, 90); // ***************** Group 2 Test ***************** - unix_time = pstd::NowMillis(); s = db.Set("GP2_PKSETEX_KEY", "VALUE"); ASSERT_TRUE(s.ok()); - s = db.PKSetexAt("GP2_PKSETEX_KEY", "VALUE", unix_time + 100*1000); + s = db.PKSetexAt("GP2_PKSETEX_KEY", "VALUE", unix_time + 100); ASSERT_TRUE(s.ok()); type_status.clear(); @@ -970,8 +972,7 @@ TEST_F(StringsTest, PKSetexAtTest) { ASSERT_GE(ttl_ret, 90); // ***************** Group 3 Test ***************** - unix_time = pstd::NowMillis(); - s = db.PKSetexAt("GP3_PKSETEX_KEY", "VALUE", unix_time - 100*1000); + s = db.PKSetexAt("GP3_PKSETEX_KEY", "VALUE", unix_time - 100); ASSERT_TRUE(s.ok()); type_status.clear(); @@ -979,10 +980,9 @@ TEST_F(StringsTest, PKSetexAtTest) { ASSERT_EQ(ttl_ret, -2); // ***************** Group 4 Test ***************** - unix_time = pstd::NowMillis(); s = db.Set("GP4_PKSETEX_KEY", "VALUE"); ASSERT_TRUE(s.ok()); - s = db.PKSetexAt("GP4_PKSETEX_KEY", "VALUE", unix_time - 100*1000); + s = db.PKSetexAt("GP4_PKSETEX_KEY", "VALUE", unix_time - 100); ASSERT_TRUE(s.ok()); type_status.clear(); @@ -990,7 +990,6 @@ TEST_F(StringsTest, PKSetexAtTest) { ASSERT_EQ(ttl_ret, -2); // ***************** Group 5 Test ***************** - unix_time = pstd::NowMillis(); s = db.PKSetexAt("GP5_PKSETEX_KEY", "VALUE", -unix_time); ASSERT_TRUE(s.ok()); @@ -999,7 +998,6 @@ TEST_F(StringsTest, PKSetexAtTest) { ASSERT_EQ(ttl_ret, -2); // ***************** Group 6 Test ***************** - unix_time = pstd::NowMillis(); s = db.Set("GP6_PKSETEX_KEY", "VALUE"); ASSERT_TRUE(s.ok()); s = db.PKSetexAt("GP6_PKSETEX_KEY", "VALUE", -unix_time);