diff --git a/CMakeLists.txt b/CMakeLists.txt index 95ec7e2e02..883a171582 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,7 +51,7 @@ elseif(${BUILD_TYPE} STREQUAL RELWITHDEBINFO) set(LIB_BUILD_TYPE RELWITHDEBINFO) else() set(LIB_BUILD_TYPE RELEASE) - set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG") + set(CMAKE_CXX_FLAGS_RELEASE "-O2 -g -DNDEBUG") endif() if(CMAKE_SYSTEM_NAME MATCHES "Darwin") diff --git a/src/pika_admin.cc b/src/pika_admin.cc index e9cc766cae..3e93d223c2 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -476,8 +476,7 @@ std::string FlushallCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t std::string flushdb_cmd("flushdb"); RedisAppendLenUint64(content, flushdb_cmd.size(), "$"); RedisAppendContent(content, flushdb_cmd); - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset, - content, {}); + return content; } void FlushallCmd::Execute() { diff --git a/src/pika_binlog.cc b/src/pika_binlog.cc index 534710f108..d0a612f24c 100644 --- a/src/pika_binlog.cc +++ b/src/pika_binlog.cc @@ -12,6 +12,7 @@ #include #include "include/pika_binlog_transverter.h" +#include "pstd/include/pstd_defer.h" #include "pstd_status.h" using pstd::Status; @@ -168,7 +169,25 @@ Status Binlog::Put(const std::string& item) { if (!opened_.load()) { return Status::Busy("Binlog is not open yet"); } - Status s = Put(item.c_str(), static_cast(item.size())); + uint32_t filenum = 0; + uint32_t term = 0; + uint64_t offset = 0; + uint64_t logic_id = 0; + + Lock(); + DEFER { + Unlock(); + }; + + Status s = GetProducerStatus(&filenum, &offset, &term, &logic_id); + if (!s.ok()) { + return s; + } + logic_id++; + std::string data = PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, + time(nullptr), term, logic_id, filenum, offset, item, {}); + + s = Put(data.c_str(), static_cast(data.size())); if (!s.ok()) { binlog_io_error_.store(true); } diff --git a/src/pika_command.cc b/src/pika_command.cc index e41fe2ea55..757c54cf1d 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -910,8 +910,7 @@ std::string Cmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_i RedisAppendContent(content, v); } - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset, - content, {}); + return content; } bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_) || (arity_ < 0 && num < -arity_)); } diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index 7164f5c009..4a94f957ab 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -355,31 +355,12 @@ Status ConsensusCoordinator::ProposeLog(const std::shared_ptr& cmd_ptr, std LogOffset log_offset; - stable_logger_->Logger()->Lock(); -// std::this_thread::sleep_for(std::chrono::seconds(20)); - // build BinlogItem - uint32_t filenum = 0; - uint32_t term = 0; - uint64_t offset = 0; - uint64_t logic_id = 0; - Status s = stable_logger_->Logger()->GetProducerStatus(&filenum, &offset, &term, &logic_id); - if (!s.ok()) { - stable_logger_->Logger()->Unlock(); - return s; - } BinlogItem item; - item.set_exec_time(time(nullptr)); - item.set_term_id(term); - item.set_logic_id(logic_id + 1); - item.set_filenum(filenum); - item.set_offset(offset); // make sure stable log and mem log consistent - s = InternalAppendLog(item, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr)); + Status s = InternalAppendLog(item, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr)); if (!s.ok()) { - stable_logger_->Logger()->Unlock(); return s; } - stable_logger_->Logger()->Unlock(); g_pika_server->SignalAuxiliary(); return Status::OK(); @@ -409,9 +390,7 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr& cmd_pt return Status::OK(); } - stable_logger_->Logger()->Lock(); Status s = InternalAppendLog(attribute, cmd_ptr, nullptr, nullptr); - stable_logger_->Logger()->Unlock(); InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr)); return Status::OK(); diff --git a/src/pika_kv.cc b/src/pika_kv.cc index 7886a9cc73..c6d7d981d4 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -126,8 +126,7 @@ std::string SetCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logi // value RedisAppendLenUint64(content, value_.size(), "$"); RedisAppendContent(content, value_); - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset, - content, {}); + return content; } else { return Cmd::ToBinlog(exec_time, term_id, logic_id, filenum, offset); } @@ -510,9 +509,7 @@ std::string SetnxCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t lo // value RedisAppendLenUint64(content, value_.size(), "$"); RedisAppendContent(content, value_); - - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset, - content, {}); + return content; } void SetexCmd::DoInitial() { @@ -561,8 +558,7 @@ std::string SetexCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t lo // value RedisAppendLenUint64(content, value_.size(), "$"); RedisAppendContent(content, value_); - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset, - content, {}); + return content; } void PsetexCmd::DoInitial() { @@ -610,8 +606,7 @@ std::string PsetexCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t l // value RedisAppendLenUint64(content, value_.size(), "$"); RedisAppendContent(content, value_); - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset, - content, {}); + return content; } void DelvxCmd::DoInitial() { @@ -891,9 +886,7 @@ std::string ExpireCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t l std::string at(buf); RedisAppendLenUint64(content, at.size(), "$"); RedisAppendContent(content, at); - - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset, - content, {}); + return content; } void PexpireCmd::DoInitial() { @@ -938,9 +931,7 @@ std::string PexpireCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t std::string at(buf); RedisAppendLenUint64(content, at.size(), "$"); RedisAppendContent(content, at); - - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset, - content, {}); + return content; } void ExpireatCmd::DoInitial() { @@ -997,9 +988,7 @@ std::string PexpireatCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_ std::string at(buf); RedisAppendLenUint64(content, at.size(), "$"); RedisAppendContent(content, at); - - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset, - content, {}); + return content; } void PexpireatCmd::Do(std::shared_ptr slot) {