Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use master's wall time(physical time) as ttl in binlog #2699

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/pika_binlog_transverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "include/pika_command.h"
#include "include/pika_define.h"
#include "storage/storage.h"
#include "storage/storage_time.h"


uint32_t BinlogItem::exec_time() const { return exec_time_; }
Expand Down Expand Up @@ -84,6 +85,7 @@ bool PikaBinlogTransverter::BinlogDecode(BinlogType type, const std::string& bin
LOG(ERROR) << "Binlog Item type error, expect type:" << type << " actualy type: " << binlog_type;
return false;
}
storage::g_storage_logictime->UpdateLogicTime(binlog_item->exec_time());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update logical time before fetching execution time.

-  storage::g_storage_logictime->UpdateLogicTime(binlog_item->exec_time());
+  pstd::GetFixed32(&binlog_str, &binlog_item->exec_time_);
+  storage::g_storage_logictime->UpdateLogicTime(binlog_item->exec_time());

Update the execution time from binlog_str before calling UpdateLogicTime to ensure the logic time is updated with the correct value.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
storage::g_storage_logictime->UpdateLogicTime(binlog_item->exec_time());
pstd::GetFixed32(&binlog_str, &binlog_item->exec_time_);
storage::g_storage_logictime->UpdateLogicTime(binlog_item->exec_time());

pstd::GetFixed32(&binlog_str, &binlog_item->exec_time_);
pstd::GetFixed32(&binlog_str, &binlog_item->term_id_);
pstd::GetFixed64(&binlog_str, &binlog_item->logic_id_);
Expand Down
2 changes: 2 additions & 0 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
return c_ptr;
}
if (g_pika_server->readonly(current_db_) && opt != kCmdNameExec) {
storage::g_storage_logictime->SetProtectionMode(true);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor to improve clarity in setting protection mode.

- storage::g_storage_logictime->SetProtectionMode(true);
- c_ptr->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica.");
+ if (g_pika_server->readonly(current_db_)) {
+   storage::g_storage_logictime->SetProtectionMode(true);
+   c_ptr->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica.");
+ }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
storage::g_storage_logictime->SetProtectionMode(true);
if (g_pika_server->readonly(current_db_)) {
storage::g_storage_logictime->SetProtectionMode(true);
c_ptr->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica.");
}

c_ptr->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica.");
return c_ptr;
}
Expand All @@ -185,6 +186,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
}
}
}
storage::g_storage_logictime->SetProtectionMode(false);

// Process Command
c_ptr->Execute();
Expand Down
44 changes: 44 additions & 0 deletions src/storage/include/storage/storage_time.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2024-present The storage Authors. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef STORAGE_TIME_H
#define STORAGE_TIME_H

#include <atomic>
#include <memory>
#include <cstdint>
#include <rocksdb/env.h> // Assuming you're using RocksDB for time functions

namespace storage {

class LogicTime {
public:
// Constructor
LogicTime() : logic_time_(0), protection_mode_(false) {}

// Get the current logical time or the system time
int64_t Now();

// Update the logical time
void UpdateLogicTime(int64_t time);

// Get the protection mode status
bool ProtectionMode();

// Set the protection mode
void SetProtectionMode(bool on);

private:
std::atomic<int64_t> logic_time_; // Logical time value
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
std::atomic<bool> protection_mode_; // Protection mode flag
};
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure encapsulation and thread safety in LogicTime.

The LogicTime class uses std::atomic for logic_time_ and protection_mode_, which is good for thread safety. However, consider making the atomic variables private and providing thread-safe public methods to manipulate these values to ensure encapsulation and safety.


// Global unique pointer to LogicTime instance
extern std::unique_ptr<LogicTime> g_storage_logictime;

} // namespace storage

#endif // STORAGE_TIME_H

5 changes: 2 additions & 3 deletions src/storage/src/base_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
BaseMetaFilter() = default;
bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& value, std::string* new_value,
bool* value_changed) const override {
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
auto cur_time = static_cast<uint64_t>(unix_time);
int64_t unix_time = g_storage_logictime->Now();
auto cur_time = static_cast<uint32_t>(unix_time);
/*
* For the filtering of meta information, because the field designs of string
* and list are different, their filtering policies are written separately.
Expand Down
5 changes: 3 additions & 2 deletions src/storage/src/base_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "src/mutex.h"

#include "pstd/include/env.h"
#include "storage/storage_time.h"

namespace storage {

Expand Down Expand Up @@ -134,8 +135,8 @@ class ParsedInternalValue {
if (etime_ == 0) {
return false;
}
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);

int64_t unix_time = g_storage_logictime->Now();
return etime_ < unix_time;
}

Expand Down
37 changes: 37 additions & 0 deletions src/storage/src/storage_time.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <memory>
#include "storage/storage_time.h"
#include <glog/logging.h>

namespace storage {

std::unique_ptr<LogicTime> g_storage_logictime = std::make_unique<LogicTime>();

int64_t LogicTime::Now() {
if (protection_mode_) {
chejinge marked this conversation as resolved.
Show resolved Hide resolved
return logic_time_;
}
int64_t t;
rocksdb::Env::Default()->GetCurrentTime(&t);
return t;
}
Comment on lines +14 to +21
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure thread safety when accessing logic_time_ outside of protection_mode_.

Consider using atomic operations or mutexes to ensure thread safety for logic_time_ access in non-protection mode environments.


void LogicTime::UpdateLogicTime(int64_t time) {
logic_time_ = time;
}

bool LogicTime::ProtectionMode() {
return protection_mode_;
}

void LogicTime::SetProtectionMode(bool on) {
protection_mode_ = on;
LOG(INFO) << "Set protection mode to: " << on;
}
Comment on lines +33 to +36
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for logging failures.

Implement error handling or fallback mechanisms for logging within SetProtectionMode to enhance reliability in case of logging system failures or initialization issues.


} // namespace storage

Loading