Skip to content

Commit

Permalink
feat:incr send binlog withttl (#2833)
Browse files Browse the repository at this point in the history
Co-authored-by: chejinge <[email protected]>
  • Loading branch information
chejinge and brother-jin committed Aug 7, 2024
1 parent f804191 commit f61950e
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 34 deletions.
9 changes: 9 additions & 0 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class IncrCmd : public Cmd {
int64_t new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int32_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class IncrbyCmd : public Cmd {
Expand All @@ -138,6 +140,8 @@ class IncrbyCmd : public Cmd {
int64_t by_ = 0, new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int32_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class IncrbyfloatCmd : public Cmd {
Expand All @@ -161,6 +165,8 @@ class IncrbyfloatCmd : public Cmd {
double by_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int32_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class DecrCmd : public Cmd {
Expand Down Expand Up @@ -251,8 +257,11 @@ class AppendCmd : public Cmd {
private:
std::string key_;
std::string value_;
std::string new_value_;
void DoInitial() override;
rocksdb::Status s_;
int32_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class MgetCmd : public Cmd {
Expand Down
111 changes: 107 additions & 4 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ void IncrCmd::DoInitial() {
}

void IncrCmd::Do() {
s_ = db_->storage()->Incrby(key_, 1, &new_value_);
s_ = db_->storage()->Incrby(key_, 1, &new_value_, &expired_timestamp_sec_);
if (s_.ok()) {
res_.AppendContent(":" + std::to_string(new_value_));
AddSlotKey("k", key_, db_);
Expand All @@ -290,6 +290,32 @@ void IncrCmd::DoUpdateCache() {
}
}

std::string IncrCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 4, "*");

// to pksetexat cmd
std::string pksetexat_cmd("pksetexat");
RedisAppendLenUint64(content, pksetexat_cmd.size(), "$");
RedisAppendContent(content, pksetexat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
// value
std::string new_value_str = std::to_string(new_value_);
RedisAppendLenUint64(content, new_value_str.size(), "$");
RedisAppendContent(content, new_value_str);
return content;
}

void IncrbyCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameIncrby);
Expand All @@ -303,7 +329,7 @@ void IncrbyCmd::DoInitial() {
}

void IncrbyCmd::Do() {
s_ = db_->storage()->Incrby(key_, by_, &new_value_);
s_ = db_->storage()->Incrby(key_, by_, &new_value_, &expired_timestamp_sec_);
if (s_.ok()) {
res_.AppendContent(":" + std::to_string(new_value_));
AddSlotKey("k", key_, db_);
Expand All @@ -327,6 +353,32 @@ void IncrbyCmd::DoUpdateCache() {
}
}

std::string IncrbyCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 4, "*");

// to pksetexat cmd
std::string pksetexat_cmd("pksetexat");
RedisAppendLenUint64(content, pksetexat_cmd.size(), "$");
RedisAppendContent(content, pksetexat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
// value
std::string new_value_str = std::to_string(new_value_);
RedisAppendLenUint64(content, new_value_str.size(), "$");
RedisAppendContent(content, new_value_str);
return content;
}

void IncrbyfloatCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameIncrbyfloat);
Expand All @@ -341,7 +393,7 @@ void IncrbyfloatCmd::DoInitial() {
}

void IncrbyfloatCmd::Do() {
s_ = db_->storage()->Incrbyfloat(key_, value_, &new_value_);
s_ = db_->storage()->Incrbyfloat(key_, value_, &new_value_, &expired_timestamp_sec_);
if (s_.ok()) {
res_.AppendStringLenUint64(new_value_.size());
res_.AppendContent(new_value_);
Expand Down Expand Up @@ -369,6 +421,32 @@ void IncrbyfloatCmd::DoUpdateCache() {
}
}

std::string IncrbyfloatCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 4, "*");

// to pksetexat cmd
std::string pksetexat_cmd("pksetexat");
RedisAppendLenUint64(content, pksetexat_cmd.size(), "$");
RedisAppendContent(content, pksetexat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
// value
RedisAppendLenUint64(content, new_value_.size(), "$");
RedisAppendContent(content, new_value_);
return content;
}


void DecrCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameDecr);
Expand Down Expand Up @@ -485,7 +563,7 @@ void AppendCmd::DoInitial() {

void AppendCmd::Do() {
int32_t new_len = 0;
s_ = db_->storage()->Append(key_, value_, &new_len);
s_ = db_->storage()->Append(key_, value_, &new_len, &expired_timestamp_sec_, new_value_);
if (s_.ok() || s_.IsNotFound()) {
res_.AppendInteger(new_len);
AddSlotKey("k", key_, db_);
Expand All @@ -505,6 +583,31 @@ void AppendCmd::DoUpdateCache() {
}
}

std::string AppendCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 4, "*");

// to pksetexat cmd
std::string pksetexat_cmd("pksetexat");
RedisAppendLenUint64(content, pksetexat_cmd.size(), "$");
RedisAppendContent(content, pksetexat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
// value
RedisAppendLenUint64(content, new_value_.size(), "$");
RedisAppendContent(content, new_value_);
return content;
}

void MgetCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameMget);
Expand Down
6 changes: 3 additions & 3 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class Storage {
// If key already exists and is a string, this command appends the value at
// the end of the string
// return the length of the string after the append operation
Status Append(const Slice& key, const Slice& value, int32_t* ret);
Status Append(const Slice& key, const Slice& value, int32_t* ret, int32_t* expired_timestamp_sec, std::string& out_new_value);

// Count the number of set bits (population counting) in a string.
// return the number of bits set to 1
Expand All @@ -276,11 +276,11 @@ class Storage {

// Increments the number stored at key by increment.
// If the key does not exist, it is set to 0 before performing the operation
Status Incrby(const Slice& key, int64_t value, int64_t* ret);
Status Incrby(const Slice& key, int64_t value, int64_t* ret, int32_t* expired_timestamp_sec);

// Increment the string representing a floating point number
// stored at key by the specified increment.
Status Incrbyfloat(const Slice& key, const Slice& value, std::string* ret);
Status Incrbyfloat(const Slice& key, const Slice& value, std::string* ret, int32_t* expired_timestamp_sec);

// Set key to hold the string value and set key to timeout after a given
// number of seconds
Expand Down
16 changes: 13 additions & 3 deletions src/storage/src/redis_strings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ Status RedisStrings::PKPatternMatchDelWithRemoveKeys(const DataType& data_type,
return s;
}

Status RedisStrings::Append(const Slice& key, const Slice& value, int32_t* ret) {
Status RedisStrings::Append(const Slice& key, const Slice& value, int32_t* ret, int32_t* expired_timestamp_sec, std::string& out_new_value) {
std::string old_value;
*ret = 0;
*expired_timestamp_sec = 0;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, key, &old_value);
if (s.ok()) {
Expand All @@ -171,15 +172,18 @@ Status RedisStrings::Append(const Slice& key, const Slice& value, int32_t* ret)
int32_t timestamp = parsed_strings_value.timestamp();
std::string old_user_value = parsed_strings_value.value().ToString();
std::string new_value = old_user_value + value.ToString();
out_new_value = new_value;
StringsValue strings_value(new_value);
strings_value.set_timestamp(timestamp);
*ret = static_cast<int32_t>(new_value.size());
return db_->Put(default_write_options_, key, strings_value.Encode());
*expired_timestamp_sec = timestamp;
}
} else if (s.IsNotFound()) {
*ret = static_cast<int32_t>(value.size());
StringsValue strings_value(value);
return db_->Put(default_write_options_, key, strings_value.Encode());
*expired_timestamp_sec = 0;
}
return s;
}
Expand Down Expand Up @@ -539,9 +543,10 @@ Status RedisStrings::GetSet(const Slice& key, const Slice& value, std::string* o
return db_->Put(default_write_options_, key, strings_value.Encode());
}

Status RedisStrings::Incrby(const Slice& key, int64_t value, int64_t* ret) {
Status RedisStrings::Incrby(const Slice& key, int64_t value, int64_t* ret, int32_t* expired_timestamp_sec) {
std::string old_value;
std::string new_value;
*expired_timestamp_sec = 0;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, key, &old_value);
char buf[32] = {0};
Expand All @@ -568,20 +573,23 @@ Status RedisStrings::Incrby(const Slice& key, int64_t value, int64_t* ret) {
StringsValue strings_value(new_value);
strings_value.set_timestamp(timestamp);
return db_->Put(default_write_options_, key, strings_value.Encode());
*expired_timestamp_sec = timestamp;
}
} else if (s.IsNotFound()) {
*ret = value;
Int64ToStr(buf, 32, value);
StringsValue strings_value(buf);
*expired_timestamp_sec = 0;
return db_->Put(default_write_options_, key, strings_value.Encode());
} else {
return s;
}
}

Status RedisStrings::Incrbyfloat(const Slice& key, const Slice& value, std::string* ret) {
Status RedisStrings::Incrbyfloat(const Slice& key, const Slice& value, std::string* ret, int32_t* expired_timestamp_sec) {
std::string old_value;
std::string new_value;
*expired_timestamp_sec = 0;
long double long_double_by;
if (StrToLongDouble(value.data(), value.size(), &long_double_by) == -1) {
return Status::Corruption("Value is not a vaild float");
Expand Down Expand Up @@ -611,11 +619,13 @@ Status RedisStrings::Incrbyfloat(const Slice& key, const Slice& value, std::stri
StringsValue strings_value(new_value);
strings_value.set_timestamp(timestamp);
return db_->Put(default_write_options_, key, strings_value.Encode());
*expired_timestamp_sec = timestamp;
}
} else if (s.IsNotFound()) {
LongDoubleToStr(long_double_by, &new_value);
*ret = new_value;
StringsValue strings_value(new_value);
*expired_timestamp_sec = 0;
return db_->Put(default_write_options_, key, strings_value.Encode());
} else {
return s;
Expand Down
6 changes: 3 additions & 3 deletions src/storage/src/redis_strings.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RedisStrings : public Redis {
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;
// Strings Command
Status Append(const Slice& key, const Slice& value, int32_t* ret);
Status Append(const Slice& key, const Slice& value, int32_t* ret, int32_t* expired_timestamp_sec, std::string& out_new_value);
Status BitCount(const Slice& key, int64_t start_offset, int64_t end_offset, int32_t* ret, bool have_range);
Status BitOp(BitOpType op, const std::string& dest_key, const std::vector<std::string>& src_keys, std::string &value_to_dest, int64_t* ret);
Status Decrby(const Slice& key, int64_t value, int64_t* ret);
Expand All @@ -37,8 +37,8 @@ class RedisStrings : public Redis {
Status Getrange(const Slice& key, int64_t start_offset, int64_t end_offset, std::string* ret);
Status GetrangeWithValue(const Slice& key, int64_t start_offset, int64_t end_offset, std::string* ret, std::string* value, int64_t* ttl);
Status GetSet(const Slice& key, const Slice& value, std::string* old_value);
Status Incrby(const Slice& key, int64_t value, int64_t* ret);
Status Incrbyfloat(const Slice& key, const Slice& value, std::string* ret);
Status Incrby(const Slice& key, int64_t value, int64_t* ret, int32_t* expired_timestamp_sec);
Status Incrbyfloat(const Slice& key, const Slice& value, std::string* ret, int32_t* expired_timestamp_sec);
Status MGet(const std::vector<std::string>& keys, std::vector<ValueStatus>* vss);
Status MGetWithTTL(const std::vector<std::string>& keys, std::vector<ValueStatus>* vss);
Status MSet(const std::vector<KeyValue>& kvs);
Expand Down
13 changes: 8 additions & 5 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ Status Storage::GetrangeWithValue(const Slice& key, int64_t start_offset, int64_
return strings_db_->GetrangeWithValue(key, start_offset, end_offset, ret, value, ttl);
}

Status Storage::Append(const Slice& key, const Slice& value, int32_t* ret) {
return strings_db_->Append(key, value, ret);
Status Storage::Append(const Slice& key, const Slice& value, int32_t* ret, int32_t* expired_timestamp_sec, std::string& out_new_value) {
return strings_db_->Append(key, value, ret, expired_timestamp_sec, out_new_value);
}

Status Storage::BitCount(const Slice& key, int64_t start_offset, int64_t end_offset, int32_t* ret, bool have_range) {
Expand All @@ -225,10 +225,13 @@ Status Storage::BitPos(const Slice& key, int32_t bit, int64_t start_offset, int6

Status Storage::Decrby(const Slice& key, int64_t value, int64_t* ret) { return strings_db_->Decrby(key, value, ret); }

Status Storage::Incrby(const Slice& key, int64_t value, int64_t* ret) { return strings_db_->Incrby(key, value, ret); }

Status Storage::Incrbyfloat(const Slice& key, const Slice& value, std::string* ret) {
return strings_db_->Incrbyfloat(key, value, ret);
Status Storage::Incrby(const Slice& key, int64_t value, int64_t* ret, int32_t* expired_timestamp_sec) {
return strings_db_->Incrby(key, value, ret, expired_timestamp_sec);
}

Status Storage::Incrbyfloat(const Slice& key, const Slice& value, std::string* ret, int32_t* expired_timestamp_sec) {
return strings_db_->Incrbyfloat(key, value, ret, expired_timestamp_sec);
}

Status Storage::Setex(const Slice& key, const Slice& value, int32_t ttl) { return strings_db_->Setex(key, value, ttl); }
Expand Down
Loading

0 comments on commit f61950e

Please sign in to comment.