Skip to content

Commit

Permalink
修复 cmd 层的 TTL 时间单位问题
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyuecai committed Jul 31, 2024
1 parent c465f67 commit 30a71a6
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 109 deletions.
9 changes: 4 additions & 5 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ class ExpireCmd : public Cmd {

private:
std::string key_;
int64_t sec_ = 0;
int64_t ttl_sec_ = 0;
void DoInitial() override;
std::string ToRedisProtocol() override;
rocksdb::Status s_;
Expand All @@ -596,7 +596,7 @@ class PexpireCmd : public Cmd {

private:
std::string key_;
int64_t msec_ = 0;
int64_t ttl_millsec = 0;
void DoInitial() override;
std::string ToRedisProtocol() override;
rocksdb::Status s_;
Expand All @@ -620,7 +620,7 @@ class ExpireatCmd : public Cmd {

private:
std::string key_;
int64_t time_stamp_ = 0;
int64_t time_stamp_sec_ = 0;
void DoInitial() override;
rocksdb::Status s_;
};
Expand All @@ -643,10 +643,9 @@ class PexpireatCmd : public Cmd {

private:
std::string key_;
int64_t time_stamp_ms_ = 0;
int64_t time_stamp_millsec_ = 0;
void DoInitial() override;
rocksdb::Status s_;
std::string ToRedisProtocol() override;
};

class TtlCmd : public Cmd {
Expand Down
4 changes: 2 additions & 2 deletions src/pika_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ Status PikaCache::Expire(std::string& key, int64_t ttl) {
return caches_[cache_index]->Expire(key, ttl);
}

Status PikaCache::Expireat(std::string& key, int64_t ttl) {
Status PikaCache::Expireat(std::string& key, int64_t milliseconds) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);
return caches_[cache_index]->Expireat(key, ttl);
return caches_[cache_index]->Expireat(key, milliseconds);
}

Status PikaCache::TTL(std::string& key, int64_t *ttl) {
Expand Down
54 changes: 16 additions & 38 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1199,14 +1199,14 @@ void ExpireCmd::DoInitial() {
return;
}
key_ = argv_[1];
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &sec_) == 0) {
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &ttl_sec_) == 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
}

void ExpireCmd::Do() {
int32_t res = db_->storage()->Expire(key_, sec_);
int32_t res = db_->storage()->Expire(key_, ttl_sec_ * 1000);
if (res != -1) {
res_.AppendInteger(res);
s_ = rocksdb::Status::OK();
Expand All @@ -1230,7 +1230,7 @@ std::string ExpireCmd::ToRedisProtocol() {
RedisAppendContent(content, key_);
// sec
char buf[100];
int64_t expireat = time(nullptr) + sec_;
int64_t expireat = time(nullptr) + ttl_sec_;
pstd::ll2string(buf, 100, expireat);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
Expand All @@ -1244,7 +1244,7 @@ void ExpireCmd::DoThroughDB() {

void ExpireCmd::DoUpdateCache() {
if (s_.ok()) {
db_->cache()->Expire(key_, sec_);
db_->cache()->Expire(key_, ttl_sec_);
}
}

Expand All @@ -1254,14 +1254,14 @@ void PexpireCmd::DoInitial() {
return;
}
key_ = argv_[1];
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &msec_) == 0) {
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &ttl_millsec) == 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
}

void PexpireCmd::Do() {
int64_t res = db_->storage()->Expire(key_, msec_ / 1000);
int64_t res = db_->storage()->Expire(key_, ttl_millsec);
if (res != -1) {
res_.AppendInteger(res);
s_ = rocksdb::Status::OK();
Expand All @@ -1276,16 +1276,16 @@ std::string PexpireCmd::ToRedisProtocol() {
content.reserve(RAW_ARGS_LEN);
RedisAppendLenUint64(content, argv_.size(), "*");

// to expireat cmd
std::string expireat_cmd("expireat");
// to pexpireat cmd
std::string expireat_cmd("pexpireat");
RedisAppendLenUint64(content, expireat_cmd.size(), "$");
RedisAppendContent(content, expireat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// sec
char buf[100];
int64_t expireat = time(nullptr) + msec_ / 1000;
int64_t expireat = pstd::NowMillis() + ttl_millsec;
pstd::ll2string(buf, 100, expireat);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
Expand All @@ -1299,7 +1299,7 @@ void PexpireCmd::DoThroughDB() {

void PexpireCmd::DoUpdateCache() {
if (s_.ok()) {
db_->cache()->Expire(key_, msec_ / 1000);
db_->cache()->Expire(key_, ttl_millsec);
}
}

Expand All @@ -1309,14 +1309,14 @@ void ExpireatCmd::DoInitial() {
return;
}
key_ = argv_[1];
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_) == 0) {
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_sec_) == 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
}

void ExpireatCmd::Do() {
int32_t res = db_->storage()->Expireat(key_, time_stamp_);
int32_t res = db_->storage()->Expireat(key_, time_stamp_sec_ * 1000);
if (res != -1) {
res_.AppendInteger(res);
s_ = rocksdb::Status::OK();
Expand All @@ -1332,7 +1332,7 @@ void ExpireatCmd::DoThroughDB() {

void ExpireatCmd::DoUpdateCache() {
if (s_.ok()) {
db_->cache()->Expireat(key_, time_stamp_);
db_->cache()->Expireat(key_, time_stamp_sec_);
}
}

Expand All @@ -1342,36 +1342,14 @@ void PexpireatCmd::DoInitial() {
return;
}
key_ = argv_[1];
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_ms_) == 0) {
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &time_stamp_millsec_) == 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
}

std::string PexpireatCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLenUint64(content, argv_.size(), "*");

// to expireat cmd
std::string expireat_cmd("expireat");
RedisAppendLenUint64(content, expireat_cmd.size(), "$");
RedisAppendContent(content, expireat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// sec
char buf[100];
int64_t expireat = time_stamp_ms_ / 1000;
pstd::ll2string(buf, 100, expireat);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
return content;
}

void PexpireatCmd::Do() {
int32_t res = db_->storage()->Expireat(key_, static_cast<int32_t>(time_stamp_ms_ / 1000));
int32_t res = db_->storage()->Expireat(key_, static_cast<int32_t>(time_stamp_millsec_));
if (res != -1) {
res_.AppendInteger(res);
s_ = rocksdb::Status::OK();
Expand All @@ -1387,7 +1365,7 @@ void PexpireatCmd::DoThroughDB() {

void PexpireatCmd::DoUpdateCache() {
if (s_.ok()) {
db_->cache()->Expireat(key_, time_stamp_ms_ / 1000);
db_->cache()->Expireat(key_, time_stamp_millsec_ / 1000);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -957,10 +957,10 @@ class Storage {
// While any error happens, you need to check type_status for
// the error message

// Set a timeout on key
// Set a timeout on key, milliseconds unit
// return -1 operation exception errors happen in database
// return >=0 success
int32_t Expire(const Slice& key, int64_t ttl);
int32_t Expire(const Slice& key, int64_t ttl_millsec);

// Removes the specified keys
// return -1 operation exception errors happen in database
Expand Down Expand Up @@ -1005,12 +1005,12 @@ class Storage {

// EXPIREAT has the same effect and semantic as EXPIRE, but instead of
// specifying the number of seconds representing the TTL (time to live), it
// takes an absolute Unix timestamp (seconds since January 1, 1970). A
// takes an absolute Unix timestamp (milliseconds since January 1, 1970). A
// timestamp in the past will delete the key immediately.
// return -1 operation exception errors happen in database
// return 0 if key does not exist
// return >=1 if the timueout was set
int32_t Expireat(const Slice& key, int64_t timestamp);
int32_t Expireat(const Slice& key, int64_t timestamp_millsec);

// Remove the existing timeout on key, turning the key from volatile (a key
// with an expire set) to persistent (a key that will never expire as no
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/base_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ class ParsedInternalValue {
SetCtimeToValue();
}

void SetRelativeTimestamp(int64_t ttl) {
void SetRelativeTimestamp(int64_t ttl_millsec) {
pstd::TimeType unix_time = pstd::NowMillis();
etime_ = unix_time + ttl * 1000;
etime_ = unix_time + ttl_millsec;
SetEtimeToValue();
}

Expand Down
24 changes: 12 additions & 12 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ class Redis {
Status ScanStreamsKeyNum(KeyInfo* key_info);

// Keys Commands
virtual Status StringsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});
virtual Status HashesExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});
virtual Status ListsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});
virtual Status ZsetsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});
virtual Status SetsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});
virtual Status StringsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {});
virtual Status HashesExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {});
virtual Status ListsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {});
virtual Status ZsetsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {});
virtual Status SetsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta = {});

virtual Status StringsDel(const Slice& key, std::string&& prefetch_meta = {});
virtual Status HashesDel(const Slice& key, std::string&& prefetch_meta = {});
Expand All @@ -130,11 +130,11 @@ class Redis {
virtual Status SetsDel(const Slice& key, std::string&& prefetch_meta = {});
virtual Status StreamsDel(const Slice& key, std::string&& prefetch_meta = {});

virtual Status StringsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});
virtual Status HashesExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});
virtual Status ListsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});
virtual Status SetsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});
virtual Status ZsetsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});
virtual Status StringsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {});
virtual Status HashesExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {});
virtual Status ListsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {});
virtual Status SetsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {});
virtual Status ZsetsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta = {});

virtual Status StringsPersist(const Slice& key, std::string&& prefetch_meta = {});
virtual Status HashesPersist(const Slice& key, std::string&& prefetch_meta = {});
Expand Down Expand Up @@ -185,8 +185,8 @@ class Redis {

Status Exists(const Slice& key);
Status Del(const Slice& key);
Status Expire(const Slice& key, int64_t timestamp);
Status Expireat(const Slice& key, int64_t timestamp);
Status Expire(const Slice& key, int64_t ttl_millsec);
Status Expireat(const Slice& key, int64_t timestamp_millsec);
Status Persist(const Slice& key);
Status TTL(const Slice& key, int64_t* timestamp);
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret);
Expand Down
12 changes: 6 additions & 6 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ Status Redis::PKHRScanRange(const Slice& key, const Slice& field_start, const st
return Status::OK();
}

Status Redis::HashesExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta) {
Status Redis::HashesExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta) {
std::string meta_value(std::move(prefetch_meta));
ScopeRecordLock l(lock_mgr_, key);
BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -1183,8 +1183,8 @@ Status Redis::HashesExpire(const Slice& key, int64_t ttl, std::string&& prefetch
return Status::NotFound();
}

if (ttl > 0) {
parsed_hashes_meta_value.SetRelativeTimestamp(ttl);
if (ttl_millsec > 0) {
parsed_hashes_meta_value.SetRelativeTimestamp(ttl_millsec);
s = db_->Put(default_write_options_, handles_[kMetaCF], base_meta_key.Encode(), meta_value);
} else {
parsed_hashes_meta_value.InitialMetaValue();
Expand Down Expand Up @@ -1231,7 +1231,7 @@ Status Redis::HashesDel(const Slice& key, std::string&& prefetch_meta) {
return s;
}

Status Redis::HashesExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta) {
Status Redis::HashesExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta) {
std::string meta_value(std::move(prefetch_meta));
ScopeRecordLock l(lock_mgr_, key);
BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -1259,8 +1259,8 @@ Status Redis::HashesExpireat(const Slice& key, int64_t timestamp, std::string&&
} else if (parsed_hashes_meta_value.Count() == 0) {
return Status::NotFound();
} else {
if (timestamp > 0) {
parsed_hashes_meta_value.SetEtime(static_cast<uint64_t>(timestamp));
if (timestamp_millsec > 0) {
parsed_hashes_meta_value.SetEtime(static_cast<uint64_t>(timestamp_millsec));
} else {
parsed_hashes_meta_value.InitialMetaValue();
}
Expand Down
12 changes: 6 additions & 6 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ Status Redis::RPushx(const Slice& key, const std::vector<std::string>& values, u
return s;
}

Status Redis::ListsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta) {
Status Redis::ListsExpire(const Slice& key, int64_t ttl_millsec, std::string&& prefetch_meta) {
std::string meta_value(std::move(prefetch_meta));
ScopeRecordLock l(lock_mgr_, key);
BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -1124,8 +1124,8 @@ Status Redis::ListsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_
return Status::NotFound();
}

if (ttl > 0) {
parsed_lists_meta_value.SetRelativeTimestamp(ttl);
if (ttl_millsec > 0) {
parsed_lists_meta_value.SetRelativeTimestamp(ttl_millsec);
s = db_->Put(default_write_options_, handles_[kMetaCF], base_meta_key.Encode(), meta_value);
} else {
parsed_lists_meta_value.InitialMetaValue();
Expand Down Expand Up @@ -1172,7 +1172,7 @@ Status Redis::ListsDel(const Slice& key, std::string&& prefetch_meta) {
return s;
}

Status Redis::ListsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta) {
Status Redis::ListsExpireat(const Slice& key, int64_t timestamp_millsec, std::string&& prefetch_meta) {
std::string meta_value(std::move(prefetch_meta));
ScopeRecordLock l(lock_mgr_, key);
BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -1200,8 +1200,8 @@ Status Redis::ListsExpireat(const Slice& key, int64_t timestamp, std::string&& p
} else if (parsed_lists_meta_value.Count() == 0) {
return Status::NotFound();
} else {
if (timestamp > 0) {
parsed_lists_meta_value.SetEtime(static_cast<uint64_t>(timestamp));
if (timestamp_millsec > 0) {
parsed_lists_meta_value.SetEtime(static_cast<uint64_t>(timestamp_millsec));
} else {
parsed_lists_meta_value.InitialMetaValue();
}
Expand Down
Loading

0 comments on commit 30a71a6

Please sign in to comment.