diff --git a/CMakeLists.txt b/CMakeLists.txt index 6eb2725c2..426847bb8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -148,6 +148,7 @@ ENABLE_TESTING() ADD_SUBDIRECTORY(src/pstd) ADD_SUBDIRECTORY(src/net) +ADD_SUBDIRECTORY(src/praft) ADD_SUBDIRECTORY(src/storage) ADD_SUBDIRECTORY(src) @@ -200,4 +201,3 @@ ADD_CUSTOM_TARGET(cpplint echo '${LINT_FILES}' | xargs -n12 -P8 --linelength=120 --filter=-legal/copyright,-build/header_guard,-runtime/references ) - diff --git a/cmake/gflags.cmake b/cmake/gflags.cmake index 04f317de9..5b45360b9 100644 --- a/cmake/gflags.cmake +++ b/cmake/gflags.cmake @@ -7,9 +7,6 @@ INCLUDE_GUARD() INCLUDE(cmake/utils.cmake) -SET(MY_BUILD_TYPE ${CMAKE_BUILD_TYPE}) -SET(CMAKE_BUILD_TYPE ${THIRD_PARTY_BUILD_TYPE}) - FetchContent_DeclareGitHubWithMirror(gflags gflags/gflags v2.2.2 SHA256=19713a36c9f32b33df59d1c79b4958434cb005b5b47dc5400a7a4b078111d9b5 @@ -22,6 +19,7 @@ FetchContent_MakeAvailableWithArgs(gflags BUILD_gflags_LIB=ON BUILD_gflags_nothreads_LIB=OFF BUILD_TESTING=OFF + CMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} ) FIND_PACKAGE(Threads REQUIRED) @@ -30,6 +28,4 @@ TARGET_LINK_LIBRARIES(gflags_static Threads::Threads) SET(GFLAGS_INCLUDE_PATH ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/include) SET(GFLAGS_LIBRARY ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a) -SET(GFLAGS_LIB ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a) - -SET(CMAKE_BUILD_TYPE ${MY_BUILD_TYPE}) \ No newline at end of file +SET(GFLAGS_LIB ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a) \ No newline at end of file diff --git a/cmake/glog.cmake b/cmake/glog.cmake index 4113ba48b..e694689f0 100644 --- a/cmake/glog.cmake +++ b/cmake/glog.cmake @@ -26,4 +26,4 @@ IF (CMAKE_BUILD_TYPE STREQUAL "Release") SET(GLOG_LIB ${CMAKE_CURRENT_BINARY_DIR}/_deps/glog-build/libglog.a CACHE BOOL "" FORCE) ELSEIF (CMAKE_BUILD_TYPE STREQUAL "Debug") SET(GLOG_LIB ${CMAKE_CURRENT_BINARY_DIR}/_deps/glog-build/libglogd.a CACHE BOOL "" FORCE) -ENDIF() +ENDIF() \ No newline at end of file diff --git a/cmake/leveldb.cmake b/cmake/leveldb.cmake index 17a3cf034..939df493b 100644 --- a/cmake/leveldb.cmake +++ b/cmake/leveldb.cmake @@ -6,9 +6,9 @@ INCLUDE_GUARD() FETCHCONTENT_DECLARE( - leveldb - GIT_REPOSITORY https://github.com/google/leveldb.git - GIT_TAG main + leveldb + GIT_REPOSITORY https://github.com/google/leveldb.git + GIT_TAG main ) SET(LEVELDB_BUILD_TESTS OFF CACHE BOOL "" FORCE) SET(LEVELDB_BUILD_BENCHMARKS OFF CACHE BOOL "" FORCE) diff --git a/cmake/modules/glog/Findgflags.cmake b/cmake/modules/glog/Findgflags.cmake index 97701ee84..b9b83ac84 100644 --- a/cmake/modules/glog/Findgflags.cmake +++ b/cmake/modules/glog/Findgflags.cmake @@ -8,4 +8,4 @@ IF (gflags_SOURCE_DIR) # add_library(gflags_static::gflags_static ALIAS gflags_static) INSTALL(TARGETS gflags_static EXPORT glog-targets) -ENDIF () +ENDIF () \ No newline at end of file diff --git a/cmake/protobuf.cmake b/cmake/protobuf.cmake index 67ffd150e..2717ef5aa 100644 --- a/cmake/protobuf.cmake +++ b/cmake/protobuf.cmake @@ -164,4 +164,4 @@ IF (NOT PROTOBUF_FOUND) SET(PROTOBUF_PROTOC_EXECUTABLE ${extern_protobuf_PROTOC_EXECUTABLE} CACHE FILEPATH "protobuf executable." FORCE) PROMPT_PROTOBUF_LIB(extern_protobuf zlib) -ENDIF (NOT PROTOBUF_FOUND) +ENDIF (NOT PROTOBUF_FOUND) \ No newline at end of file diff --git a/cmake/utils.cmake b/cmake/utils.cmake index 2241392d0..71aea8ba5 100644 --- a/cmake/utils.cmake +++ b/cmake/utils.cmake @@ -44,4 +44,4 @@ FUNCTION(FetchContent_DeclareGitHubWithMirror dep repo tag hash) https://github.com/${repo}/archive/${tag}.zip ${hash} ) -ENDFUNCTION() +ENDFUNCTION() \ No newline at end of file diff --git a/pikiwidb.conf b/pikiwidb.conf index e720a2133..3affebcc5 100644 --- a/pikiwidb.conf +++ b/pikiwidb.conf @@ -7,6 +7,9 @@ daemonize no # port 0 is not permitted. port 9221 +# Braft relies on brpc to communicate via the default port number plus the port offset +raft-port-offset 10 + # If you want you can bind a single interface, if the bind option is not # specified all the interfaces will listen for incoming connections. # diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index efea214ed..c5451e69f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,15 +8,17 @@ AUX_SOURCE_DIRECTORY(. PIKIWIDB_SRC) ADD_EXECUTABLE(pikiwidb ${PIKIWIDB_SRC}) SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin) -TARGET_INCLUDE_DIRECTORIES(pikiwidb PRIVATE - ${PROJECT_SOURCE_DIR}/src - ${PROJECT_SOURCE_DIR}/src/pstd - ${PROJECT_SOURCE_DIR}/src/net - ${PROJECT_SOURCE_DIR}/src/storage/include - ${rocksdb_SOURCE_DIR}/ - ${rocksdb_SOURCE_DIR}/include +TARGET_INCLUDE_DIRECTORIES(pikiwidb + PRIVATE ${PROJECT_SOURCE_DIR}/src + PRIVATE ${PROJECT_SOURCE_DIR}/src/pstd + PRIVATE ${PROJECT_SOURCE_DIR}/src/net + PRIVATE ${PROJECT_SOURCE_DIR}/src/storage/include + PRIVATE ${rocksdb_SOURCE_DIR}/ + PRIVATE ${rocksdb_SOURCE_DIR}/include PRIVATE ${BRAFT_INCLUDE_DIR} PRIVATE ${BRPC_INCLUDE_DIR} + PRIVATE ${GFLAGS_INCLUDE_PATH} + PRIVATE ${PROJECT_SOURCE_DIR}/src/praft ) IF (CMAKE_SYSTEM_NAME MATCHES "Darwin") @@ -34,5 +36,6 @@ ELSE () MESSAGE(FATAL_ERROR "only support linux or macOS") ENDIF () -TARGET_LINK_LIBRARIES(pikiwidb net; dl; fmt; storage; rocksdb; braft brpc ssl crypto zlib protobuf leveldb gflags "${MAC_LIBRARY}") -SET_TARGET_PROPERTIES(pikiwidb PROPERTIES LINKER_LANGUAGE CXX) +TARGET_LINK_LIBRARIES(pikiwidb net; dl; fmt; storage; rocksdb; pstd braft brpc ssl crypto zlib protobuf leveldb gflags rt crypto dl z praft "${MAC_LIBRARY}") + +SET_TARGET_PROPERTIES(pikiwidb PROPERTIES LINKER_LANGUAGE CXX) \ No newline at end of file diff --git a/src/base_cmd.h b/src/base_cmd.h index f7b0f0b6c..5fb96e0d9 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -26,6 +26,10 @@ const std::string kCmdNameDel = "del"; const std::string kCmdNameExists = "exists"; const std::string kCmdNamePExpire = "pexpire"; +// raft cmd +const std::string kCmdNameRaftCluster = "raft.cluster"; +const std::string kCmdNameRaftNode = "raft.node"; + // string cmd const std::string kCmdNameSet = "set"; const std::string kCmdNameGet = "get"; @@ -62,6 +66,7 @@ const std::string kCmdNameFlushdb = "flushdb"; const std::string kCmdNameFlushall = "flushall"; const std::string kCmdNameAuth = "auth"; const std::string kCmdNameSelect = "select"; +const std::string kCmdNameInfo = "info"; // hash cmd const std::string kCmdNameHSet = "hset"; @@ -119,6 +124,7 @@ enum CmdFlags { kCmdFlagsProtected = (1 << 12), // Don't accept in scripts kCmdFlagsModuleNoCluster = (1 << 13), // No cluster mode support kCmdFlagsNoMulti = (1 << 14), // Cannot be pipelined + kCmdFlagsRaft = (1 << 15), // raft }; enum AclCategory { @@ -142,7 +148,8 @@ enum AclCategory { kAclCategoryDangerous = (1 << 17), kAclCategoryConnection = (1 << 18), kAclCategoryTransaction = (1 << 19), - kAclCategoryScripting = (1 << 20) + kAclCategoryScripting = (1 << 20), + kAclCategoryRaft = (1 << 21), }; /** diff --git a/src/client.cc b/src/client.cc index 4296a7b96..0434489dc 100644 --- a/src/client.cc +++ b/src/client.cc @@ -15,6 +15,7 @@ #include "pstd_string.h" #include "slow_log.h" #include "store.h" +#include "praft.h" namespace pikiwidb { @@ -130,6 +131,10 @@ void CmdRes::SetRes(CmdRes::CmdRet _ret, const std::string& content) { case kInvalidCursor: AppendStringRaw("-ERR invalid cursor"); break; + case kWrongLeader: + AppendStringRaw("-ERR wrong leader"); + AppendStringRaw(content); + AppendStringRaw(CRLF); default: break; } @@ -268,6 +273,15 @@ int PClient::handlePacket(const char* start, int bytes) { } } + if (isJoinCmdTarget()) { + // Proccees the packet at one turn. + auto [len, is_disconnect] = PRAFT.ProcessClusterJoinCmdResponse(this, start, bytes); + if (is_disconnect) { + conn->ActiveClose(); + } + return len; + } + auto parseRet = parser_.ParseRequest(ptr, end); if (parseRet == PParseResult::kError) { if (!parser_.IsInitialState()) { @@ -410,7 +424,7 @@ PClient::PClient(TcpConnection* obj) int PClient::HandlePackets(pikiwidb::TcpConnection* obj, const char* start, int size) { int total = 0; - + LOG(INFO) << start; while (total < size) { auto processed = handlePacket(start + total, size - total); if (processed <= 0) { @@ -437,6 +451,9 @@ void PClient::OnConnect() { if (g_config.masterauth.empty()) { SetAuth(); } + } else if (isJoinCmdTarget()) { + SetName("ClusterJoinCmdConnection"); + PRAFT.SendNodeInfoRequest(this); } else { if (g_config.password.empty()) { SetAuth(); @@ -509,6 +526,10 @@ bool PClient::isPeerMaster() const { return repl_addr.GetIP() == PeerIP() && repl_addr.GetPort() == PeerPort(); } +bool PClient::isJoinCmdTarget() const { + return PRAFT.GetJoinCtx().GetPeerIp() == PeerIP() && PRAFT.GetJoinCtx().GetPort() == PeerPort(); +} + int PClient::uniqueID() const { if (auto c = getTcpConnection(); c) { return c->GetUniqueId(); @@ -673,6 +694,7 @@ void PClient::FeedMonitors(const std::vector& params) { } } } + void PClient::SetKey(std::vector& names) { keys_ = std::move(names); // use std::move clear copy expense } diff --git a/src/client.h b/src/client.h index e4aefe02e..16e470495 100644 --- a/src/client.h +++ b/src/client.h @@ -48,6 +48,7 @@ class CmdRes { kErrOther, KIncrByOverFlow, kInvalidCursor, + kWrongLeader, }; CmdRes() = default; @@ -209,6 +210,8 @@ class PClient : public std::enable_shared_from_this, public CmdRes { bool isPeerMaster() const; int uniqueID() const; + bool isJoinCmdTarget() const; + // TcpConnection's life is undetermined, so use weak ptr for safety. std::weak_ptr tcp_connection_; diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index 34f72ca31..adc463a94 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -7,6 +7,8 @@ #include "cmd_admin.h" #include "store.h" +#include "braft/raft.h" +#include "praft.h" namespace pikiwidb { @@ -78,4 +80,77 @@ void SelectCmd::DoCmd(PClient* client) { client->SetRes(CmdRes::kOK); } +InfoCmd::InfoCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsReadonly, kAclCategoryAdmin) {} + +bool InfoCmd::DoInitial(PClient* client) { return true; } + +/* +* INFO raft +* Querying Node Information. +* Reply: +* raft_node_id:595100767 + raft_state:up + raft_role:follower + raft_is_voting:yes + raft_leader_id:1733428433 + raft_current_term:1 + raft_num_nodes:2 + raft_num_voting_nodes:2 + raft_node1:id=1733428433,state=connected,voting=yes,addr=localhost,port=5001,last_conn_secs=5,conn_errors=0,conn_oks=1 +*/ +// @todo The info raft command is only supported for the time being +void InfoCmd::DoCmd(PClient* client) { + if (client->argv_.size() <= 1) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + } + + auto cmd = client->argv_[1]; + if (!strcasecmp(cmd.c_str(), "RAFT")) { + if (client->argv_.size() != 2) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + } + + if (!PRAFT.IsInitialized()) { + return client->SetRes(CmdRes::kErrOther, "don't already cluster member"); + } + + auto node_status = PRAFT.GetNodeStatus(); + if (node_status.state == braft::State::STATE_END) { + return client->SetRes(CmdRes::kErrOther, "Node is not initialized"); + } + + std::string message(""); + message += "raft_group_id:" + PRAFT.GetGroupId() + "\r\n"; + message += "raft_node_id:" + PRAFT.GetNodeId() + "\r\n"; + if (braft::is_active_state(node_status.state)) { + message += "raft_state:up\r\n"; + } else { + message += "raft_state:down\r\n"; + } + message += "raft_role:" + std::string(braft::state2str(node_status.state)) + "\r\n"; + // message += "raft_is_voting:" + node_status.is_voting + "\r\n"; + message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n"; + message += "raft_current_term:" + std::to_string(node_status.term) + "\r\n"; + // message += "raft_num_nodes:" + std::to_string(node_status.num_nodes) + "\r\n"; + // message += "raft_num_voting_nodes:" + std::to_string(node_status.num_voting_nodes) + "\r\n"; + + if (PRAFT.IsLeader()) { + std::vector peers; + auto status = PRAFT.GetListPeers(&peers); + if (!status.ok()) { + return client->SetRes(CmdRes::kErrOther, status.error_str()); + } + + for (int i = 0; i < peers.size(); i++) { + message += "raft_node" + std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() + ",port=" + std::to_string(peers[i].addr.port) + "\r\n"; + } + } + + client->AppendString(message); + } else { + client->SetRes(CmdRes::kErrOther, "ERR the cmd is not supported"); + } +} + } // namespace pikiwidb \ No newline at end of file diff --git a/src/cmd_admin.h b/src/cmd_admin.h index 9b20bca3f..476660e01 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -84,4 +84,15 @@ class SelectCmd : public BaseCmd { void DoCmd(PClient* client) override; }; +class InfoCmd : public BaseCmd { + public: + InfoCmd(const std::string& name, int16_t arity); + + protected: + bool DoInitial(PClient* client) override; + + private: + void DoCmd(PClient* client) override; +}; + } // namespace pikiwidb diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc new file mode 100644 index 000000000..08b17b4e1 --- /dev/null +++ b/src/cmd_raft.cc @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include +#include +#include + +#include "braft/configuration.h" +#include "client.h" +#include "cmd_raft.h" +#include "event_loop.h" +#include "pikiwidb.h" +#include "praft.h" +#include "pstd_status.h" +#include "pstd_string.h" + +#define VALID_NODE_ID(x) ((x) > 0) + +namespace pikiwidb { + +RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} + +bool RaftNodeCmd::DoInitial(PClient* client) { return true; } + +/* RAFT.NODE ADD [id] [address:port] + * Add a new node to the cluster. The [id] can be an explicit non-zero value, + * or zero to let the cluster choose one. + * Reply: + * -NOCLUSTER || + * -LOADING || + * -CLUSTERDOWN || + * -MOVED : || + * *2 + * : + * : + * + * RAFT.NODE REMOVE [id] + * Remove an existing node from the cluster. + * Reply: + * -NOCLUSTER || + * -LOADING || + * -CLUSTERDOWN || + * -MOVED : || + * +OK + */ +void RaftNodeCmd::DoCmd(PClient* client) { + // Check whether it is a leader. If it is not a leader, return the leader information + if (!PRAFT.IsLeader()) { + return client->SetRes(CmdRes::kWrongLeader, PRAFT.GetLeaderId()); + } + + auto cmd = client->argv_[1]; + if (!strcasecmp(cmd.c_str(), "ADD")) { + if (client->argv_.size() != 4) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + } + + // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. + // So we do not need to parse and use nodeid like redis; + auto s = PRAFT.AddPeer(client->argv_[3]); + if (s.ok()) { + client->SetRes(CmdRes::kOK); + } else { + client->SetRes(CmdRes::kErrOther); + } + } else if (!strcasecmp(cmd.c_str(), "REMOVE")) { + if (client->argv_.size() != 3) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + } + + // (KKorpse)TODO: Redirect to leader if not leader. + auto s = PRAFT.RemovePeer(client->argv_[2]); + if (s.ok()) { + client->SetRes(CmdRes::kOK); + } else { + client->SetRes(CmdRes::kErrOther); + } + } else { + client->SetRes(CmdRes::kErrOther, "ERR RAFT.NODE supports ADD / REMOVE only"); + } +} + +RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} + +bool RaftClusterCmd::DoInitial(PClient* client) { return true; } + +// The endpoint must be in the league format of ip:port +std::string GetIpFromEndPoint(std::string& endpoint) { + auto pos = endpoint.find(':'); + if (pos == std::string::npos) { + return ""; + } + + return endpoint.substr(0, pos); +} + +// The endpoint must be in the league format of ip:port +int GetPortFromEndPoint(std::string& endpoint) { + auto pos = endpoint.find(':'); + if (pos == std::string::npos) { + return 0; + } + + int ret = 0; + pstd::String2int(endpoint.substr(pos + 1), &ret); + return ret; +} + +/* RAFT.CLUSTER INIT + * Initializes a new Raft cluster. + * is an optional 32 character string, if set, cluster will use it for the id + * Reply: + * +OK [dbid] + * + * RAFT.CLUSTER JOIN [addr:port] + * Join an existing cluster. + * The operation is asynchronous and may take place/retry in the background. + * Reply: + * +OK + */ +void RaftClusterCmd::DoCmd(PClient* client) { + if (client->argv_.size() < 2) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + } + + if (PRAFT.IsInitialized()) { + return client->SetRes(CmdRes::kErrOther, "ERR Already cluster member"); + } + + auto cmd = client->argv_[1]; + if (!strcasecmp(cmd.c_str(), "INIT")) { + if (client->argv_.size() != 2 && client->argv_.size() != 3) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + } + + std::string cluster_id; + if (client->argv_.size() == 3) { + cluster_id = client->argv_[2]; + if (cluster_id.size() != RAFT_DBID_LEN) { + return client->SetRes(CmdRes::kInvalidParameter, + "ERR cluster id must be " + std::to_string(RAFT_DBID_LEN) + " characters"); + } + } else { + cluster_id = pstd::RandomHexChars(RAFT_DBID_LEN); + } + auto s = PRAFT.Init(cluster_id, false); + if (!s.ok()) { + return client->SetRes(CmdRes::kErrOther, s.error_str()); + } + client->SetRes(CmdRes::kOK); + } else if (!strcasecmp(cmd.c_str(), "JOIN")) { + if (client->argv_.size() < 3) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + } + + // (KKorpse)TODO: Support multiple nodes join at the same time. + if (client->argv_.size() > 3) { + return client->SetRes(CmdRes::kInvalidParameter, "ERR too many arguments"); + } + + auto addr = client->argv_[2]; + if (braft::PeerId(addr).is_empty()) { + return client->SetRes(CmdRes::kInvalidParameter, "ERR invalid ip::port: " + addr); + } + + auto on_new_conn = [](TcpConnection* obj) { + if (g_pikiwidb) { + g_pikiwidb->OnNewConnection(obj); + } + }; + auto fail_cb = [&](EventLoop* loop, const char* peer_ip, int port) { + PRAFT.OnJoinCmdConnectionFailed(loop, peer_ip, port); + }; + + auto loop = EventLoop::Self(); + auto peer_ip = GetIpFromEndPoint(addr); + auto port = GetPortFromEndPoint(addr); + // FIXME: The client here is not smart pointer, may cause undefined behavior. + // should use shared_ptr in DoCmd() rather than raw pointer. + auto ret = PRAFT.GetJoinCtx().Set(client, peer_ip, port); + if (!ret) { // other clients have joined + client->SetRes(CmdRes::kErrOther, "other clients have joined"); + } else { + loop->Connect(peer_ip.c_str(), port, on_new_conn, fail_cb); + // Not reply any message here, we will reply after the connection is established. + client->Clear(); + } + } else { + client->SetRes(CmdRes::kErrOther, "ERR RAFT.CLUSTER supports INIT / JOIN only"); + } +} +} // namespace pikiwidb \ No newline at end of file diff --git a/src/cmd_raft.h b/src/cmd_raft.h new file mode 100644 index 000000000..26652d269 --- /dev/null +++ b/src/cmd_raft.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include "braft/raft.h" +#include "brpc/server.h" +#include "base_cmd.h" + +namespace pikiwidb { + +class RaftNodeCmd : public BaseCmd { + public: + RaftNodeCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + +class RaftClusterCmd : public BaseCmd { + public: + RaftClusterCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + +} // namespace pikiwidb \ No newline at end of file diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc index 74cce77b5..159ba3da4 100644 --- a/src/cmd_table_manager.cc +++ b/src/cmd_table_manager.cc @@ -13,6 +13,7 @@ #include "cmd_kv.h" #include "cmd_list.h" #include "cmd_set.h" +#include "cmd_raft.h" #include "cmd_table_manager.h" namespace pikiwidb { @@ -42,6 +43,13 @@ void CmdTableManager::InitCmdTable() { ADD_COMMAND(Flushall, 1); ADD_COMMAND(Select, 2); + // info + ADD_COMMAND(Info, -1); + + // raft + ADD_COMMAND(RaftCluster, -1); + ADD_COMMAND(RaftNode, -2); + // keyspace ADD_COMMAND(Del, -2); ADD_COMMAND(Exists, -2); diff --git a/src/config.cc b/src/config.cc index dbbe20979..592df73c5 100644 --- a/src/config.cc +++ b/src/config.cc @@ -91,6 +91,7 @@ bool LoadPikiwiDBConfig(const char* cfgFile, PConfig& cfg) { cfg.ip = parser.GetData("bind", cfg.ip); cfg.port = parser.GetData("port"); + cfg.raft_port_offset = parser.GetData("raft-port-offset"); cfg.timeout = parser.GetData("timeout"); cfg.dbpath = parser.GetData("db-path"); diff --git a/src/config.h b/src/config.h index 14aecf70e..1c5728523 100644 --- a/src/config.h +++ b/src/config.h @@ -27,6 +27,7 @@ struct PConfig { PString ip; unsigned short port; + unsigned short raft_port_offset; int timeout; diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc index e0c9f5ba7..369cfa333 100644 --- a/src/pikiwidb.cc +++ b/src/pikiwidb.cc @@ -256,6 +256,8 @@ void PikiwiDB::Run() { } void PikiwiDB::Stop() { + pikiwidb::PRAFT.ShutDown(); + pikiwidb::PRAFT.Join(); slave_threads_.Exit(); worker_threads_.Exit(); } @@ -293,7 +295,6 @@ static void closeStd() { int main(int ac, char* av[]) { [[maybe_unused]] rocksdb::DB* db; g_pikiwidb = std::make_unique(); - if (!g_pikiwidb->ParseArgs(ac - 1, av + 1)) { Usage(); return -1; diff --git a/src/pikiwidb.h b/src/pikiwidb.h index 4dfdb685b..389096e24 100644 --- a/src/pikiwidb.h +++ b/src/pikiwidb.h @@ -10,9 +10,14 @@ #include "event_loop.h" #include "io_thread_pool.h" #include "tcp_connection.h" +#include "praft/praft.h" #define kPIKIWIDB_VERSION "4.0.0" +namespace pikiwidb { +class PRaft; +} // namespace pikiwidb + class PikiwiDB final { public: PikiwiDB() = default; diff --git a/src/praft/CMakeLists.txt b/src/praft/CMakeLists.txt new file mode 100644 index 000000000..30d7c4505 --- /dev/null +++ b/src/praft/CMakeLists.txt @@ -0,0 +1,33 @@ +# Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. An additional grant +# of patent rights can be found in the PATENTS file in the same directory. + +FILE(GLOB PRAFT_PROTO "${CMAKE_CURRENT_SOURCE_DIR}/*.proto") +EXECUTE_PROCESS( + COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} --cpp_out=${CMAKE_CURRENT_SOURCE_DIR} -I=${CMAKE_CURRENT_SOURCE_DIR} ${PRAFT_PROTO} +) + +FILE(GLOB PRAFT_SRC + "${CMAKE_CURRENT_SOURCE_DIR}/*.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/*.h" +) +SET(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin) +ADD_LIBRARY(praft ${PRAFT_SRC}) + +TARGET_INCLUDE_DIRECTORIES(praft + PRIVATE ${PROJECT_SOURCE_DIR}/src + PRIVATE ${PROJECT_SOURCE_DIR}/src/pstd + PRIVATE ${PROJECT_SOURCE_DIR}/src/net + PRIVATE ${PROJECT_SOURCE_DIR}/src/storage/include + PRIVATE ${rocksdb_SOURCE_DIR}/ + PRIVATE ${rocksdb_SOURCE_DIR}/include + PRIVATE ${BRAFT_INCLUDE_DIR} + PRIVATE ${BRPC_INCLUDE_DIR} + PRIVATE ${GFLAGS_INCLUDE_PATH} + PRIVATE ${PROJECT_SOURCE_DIR}/src/praft +) + +TARGET_LINK_LIBRARIES(praft net; dl; fmt; storage; pstd braft brpc ssl crypto zlib protobuf leveldb gflags rocksdb rt crypto dl z) + +SET_TARGET_PROPERTIES(praft PROPERTIES LINKER_LANGUAGE CXX) \ No newline at end of file diff --git a/src/praft/praft.cc b/src/praft/praft.cc new file mode 100644 index 000000000..595fb3f08 --- /dev/null +++ b/src/praft/praft.cc @@ -0,0 +1,361 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +// +// praft.cc + +#include "praft.h" +#include +#include +#include +#include "client.h" +#include "config.h" +#include "pstd_string.h" +#include "braft/configuration.h" +#include "event_loop.h" +#include "pikiwidb.h" + +namespace pikiwidb { + +PRaft& PRaft::Instance() { + static PRaft store; + return store; +} + +butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { + if (node_ && server_) { + return {0, "OK"}; + } + + server_ = std::make_unique(); + DummyServiceImpl service(&PRAFT); + auto port = g_config.port + pikiwidb::g_config.raft_port_offset; + // Add your service into RPC server + if (server_->AddService(&service, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return {EINVAL, "Fail to add service"}; + } + // raft can share the same RPC server. Notice the second parameter, because + // adding services into a running server is not allowed and the listen + // address of this server is impossible to get before the server starts. You + // have to specify the address of the server. + if (braft::add_service(server_.get(), port) != 0) { + LOG(ERROR) << "Fail to add raft service"; + return {EINVAL, "Fail to add raft service"}; + } + + // It's recommended to start the server before Counter is started to avoid + // the case that it becomes the leader while the service is unreacheable by + // clients. + // Notice the default options of server is used here. Check out details from + // the doc of brpc if you would like change some options; + if (server_->Start(port, nullptr) != 0) { + LOG(ERROR) << "Fail to start Server"; + return {EINVAL, "Fail to start Server"}; + } + + // It's ok to start PRaft; + assert(group_id.size() == RAFT_DBID_LEN); + this->dbid_ = group_id; + + // FIXME: g_config.ip is default to 127.0.0.0, which may not work in cluster. + raw_addr_ = g_config.ip + ":" + std::to_string(port); + butil::ip_t ip; + auto ret = butil::str2ip(g_config.ip.c_str(), &ip); + if (ret != 0) { + LOG(ERROR) << "Fail to covert str_ip to butil::ip_t"; + return {EINVAL, "Fail to covert str_ip to butil::ip_t"}; + } + butil::EndPoint addr(ip, port); + + // Default init in one node. + /* + initial_conf takes effect only when the replication group is started from an empty node. + The Configuration is restored from the snapshot and log files when the data in the replication group is not empty. + initial_conf is used only to create replication groups. + The first node adds itself to initial_conf and then calls add_peer to add other nodes. + Set initial_conf to empty for other nodes. + You can also start empty nodes simultaneously by setting the same inital_conf(ip:port of multiple nodes) for multiple nodes. + */ + std::string initial_conf(""); + if (!initial_conf_is_null) { + initial_conf = raw_addr_ + ":0,"; + } + if (node_options_.initial_conf.parse_from(initial_conf) != 0) { + LOG(ERROR) << "Fail to parse configuration, address: " << raw_addr_; + return {EINVAL, "Fail to parse address."}; + } + + // node_options_.election_timeout_ms = FLAGS_election_timeout_ms; + node_options_.fsm = this; + node_options_.node_owns_fsm = false; + // node_options_.snapshot_interval_s = FLAGS_snapshot_interval; + std::string prefix = "local://" + g_config.dbpath + "_praft"; + node_options_.log_uri = prefix + "/log"; + node_options_.raft_meta_uri = prefix + "/raft_meta"; + node_options_.snapshot_uri = prefix + "/snapshot"; + // node_options_.disable_cli = FLAGS_disable_cli; + node_ = std::make_unique("pikiwidb", braft::PeerId(addr)); // group_id + if (node_->init(node_options_) != 0) { + node_.reset(); + LOG(ERROR) << "Fail to init raft node"; + return {EINVAL, "Fail to init raft node"}; + } + + return {0, "OK"}; +} + +bool PRaft::IsLeader() const { + if (!node_) { + LOG(ERROR) << "Node is not initialized"; + return false; + } + return node_->is_leader(); +} + +std::string PRaft::GetLeaderId() const { + if (!node_) { + LOG(ERROR) << "Node is not initialized"; + return std::string("Fail to get leader id"); + } + return node_->leader_id().to_string(); +} + +std::string PRaft::GetNodeId() const { + if (!node_) { + LOG(ERROR) << "Node is not initialized"; + return std::string("Fail to get node id"); + } + return node_->node_id().to_string(); +} + +std::string PRaft::GetGroupId() const { + if (!node_) { + LOG(ERROR) << "Node is not initialized"; + return std::string("Fail to get cluster id"); + } + return dbid_; +} + +braft::NodeStatus PRaft::GetNodeStatus() const { + braft::NodeStatus node_status; + if (!node_) { + LOG(ERROR) << "Node is not initialized"; + } else { + node_->get_status(&node_status); + } + + return node_status; +} + +butil::Status PRaft::GetListPeers(std::vector* peers) { + if (!node_) { + LOG(ERROR) << "Node is not initialized"; + } else { + return node_->list_peers(peers); + } +} + +// Gets the cluster id, which is used to initialize node +void PRaft::SendNodeInfoRequest(PClient *client) { + assert(client); + + UnboundedBuffer req; + req.PushData("INFO raft", 9); + req.PushData("\r\n", 2); + client->SendPacket(req); +} + +void PRaft::SendNodeAddRequest(PClient *client) { + assert(client); + + // Node id in braft are ip:port, the node id param in RAFT.NODE ADD cmd will be ignored. + int unused_node_id = 0; + auto port = g_config.port + pikiwidb::g_config.raft_port_offset; + auto raw_addr = g_config.ip + ":" + std::to_string(port); + UnboundedBuffer req; + req.PushData("RAFT.NODE ADD ", 14); + req.PushData(std::to_string(unused_node_id).c_str(), std::to_string(unused_node_id).size()); + req.PushData(" ", 1); + req.PushData(raw_addr.data(), raw_addr.size()); + req.PushData("\r\n", 2); + client->SendPacket(req); +} + +std::tuple PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len) { + assert(start); + auto join_client = join_ctx_.GetClient(); + if (!join_client) { + LOG(WARNING) << "No client when processing cluster join cmd response."; + return std::make_tuple(0, true); + } + + bool is_disconnect = true; + std::string reply(start, len); + if (reply.find("+OK") != std::string::npos) { + LOG(INFO) << "Joined Raft cluster, node id:" << PRAFT.GetNodeId() << "dbid:" << PRAFT.dbid_; + join_client->SetRes(CmdRes::kOK); + join_client->SendPacket(join_client->Message()); + is_disconnect = false; + } else if (reply.find("-ERR wrong leader") != std::string::npos) { + // Resolve the ip address of the leader + pstd::StringTrimLeft(reply, "-ERR wrong leader"); + pstd::StringTrim(reply); + braft::PeerId peerId; + peerId.parse(reply); + + // Establish a connection with the leader and send the add request + auto on_new_conn = [](TcpConnection* obj) { + if (g_pikiwidb) { + g_pikiwidb->OnNewConnection(obj); + } + }; + auto fail_cb = [&](EventLoop* loop, const char* peer_ip, int port) { + PRAFT.OnJoinCmdConnectionFailed(loop, peer_ip, port); + }; + + auto loop = EventLoop::Self(); + auto peer_ip = std::string(butil::ip2str(peerId.addr.ip).c_str()); + auto port = peerId.addr.port; + // FIXME: The client here is not smart pointer, may cause undefined behavior. + // should use shared_ptr in DoCmd() rather than raw pointer. + PRAFT.GetJoinCtx().Set(join_client, peer_ip, port); + loop->Connect(peer_ip.c_str(), port, on_new_conn, fail_cb); + + // Not reply any message here, we will reply after the connection is established. + join_client->Clear(); + } else if (reply.find("raft_group_id") != std::string::npos) { + std::string prefix = "raft_group_id:"; + std::string::size_type prefix_length = prefix.length(); + std::string::size_type group_id_start = reply.find(prefix); + group_id_start += prefix_length; // 定位到raft_group_id的起始位置 + std::string::size_type group_id_end = reply.find("\r\n", group_id_start); + if (group_id_end != std::string::npos) { + std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start); + // initialize the slave node + auto s = PRAFT.Init(raft_group_id, true); + if (!s.ok()) { + join_client->SetRes(CmdRes::kErrOther, s.error_str()); + join_client->SendPacket(join_client->Message()); + return std::make_tuple(len, is_disconnect); + } + + PRAFT.SendNodeAddRequest(client); + is_disconnect = false; + } else { + LOG(ERROR) << "Joined Raft cluster fail, because of invalid raft_group_id"; + join_client->SetRes(CmdRes::kErrOther, "Invalid raft_group_id"); + join_client->SendPacket(join_client->Message()); + } + } else { + LOG(ERROR) << "Joined Raft cluster fail, " << start; + join_client->SetRes(CmdRes::kErrOther, std::string(start, len)); + join_client->SendPacket(join_client->Message()); + } + + return std::make_tuple(len, is_disconnect); +} + +butil::Status PRaft::AddPeer(const std::string& peer) { + if (!node_) { + LOG(ERROR) << "Node is not initialized"; + return {EINVAL, "Node is not initialized"}; + } + + braft::SynchronizedClosure done; + node_->add_peer(peer, &done); + done.wait(); + + if (!done.status().ok()) { + LOG(WARNING) << "Fail to add peer " << peer << " to node " << node_->node_id() << ", status " << done.status(); + return done.status(); + } + + return {0, "OK"}; +} + +butil::Status PRaft::RemovePeer(const std::string& peer) { + if (!node_) { + LOG(ERROR) << "Node is not initialized"; + return {EINVAL, "Node is not initialized"}; + } + + braft::SynchronizedClosure done; + node_->remove_peer(peer, &done); + done.wait(); + + if (!done.status().ok()) { + LOG(WARNING) << "Fail to remove peer " << peer << " from node " << node_->node_id() << ", status " << done.status(); + return done.status(); + } + + return {0, "OK"}; +} + +void PRaft::OnJoinCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) { + auto cli = join_ctx_.GetClient(); + if (cli) { + cli->SetRes(CmdRes::kErrOther, + "ERR failed to connect to cluster for join, please check logs " + std::string(peer_ip) + ":" + std::to_string(port)); + cli->SendPacket(cli->Message()); + } +} + +// Shut this node and server down. +void PRaft::ShutDown() { + if (node_) { + node_->shutdown(nullptr); + } + + if (server_) { + server_->Stop(0); + } +} + +// Blocking this thread until the node is eventually down. +void PRaft::Join() { + if (node_) { + node_->join(); + } + + if (server_) { + server_->Join(); + } +} + +void PRaft::Apply(braft::Task& task) { + if (node_) { + node_->apply(task); + } +} + +// @braft::StateMachine +void PRaft::on_apply(braft::Iterator& iter) { + // A batch of tasks are committed, which must be processed through + // |iter| + for (; iter.valid(); iter.next()) { + } +} + +void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {} + +int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { return 0; } + +void PRaft::on_leader_start(int64_t term) { + LOG(WARNING) << "Node " << node_->node_id() << "start to be leader, term" << term; +} + +void PRaft::on_leader_stop(const butil::Status& status) {} + +void PRaft::on_shutdown() {} +void PRaft::on_error(const ::braft::Error& e) {} +void PRaft::on_configuration_committed(const ::braft::Configuration& conf) {} +void PRaft::on_stop_following(const ::braft::LeaderChangeContext& ctx) {} +void PRaft::on_start_following(const ::braft::LeaderChangeContext& ctx) {} + +} // namespace pikiwidb \ No newline at end of file diff --git a/src/praft/praft.h b/src/praft/praft.h new file mode 100644 index 000000000..41d87283a --- /dev/null +++ b/src/praft/praft.h @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "braft/configuration.h" +#include "braft/raft.h" +#include "braft/util.h" +#include "brpc/server.h" +#include "brpc/controller.h" +#include "butil/status.h" +#include "client.h" +#include "common.h" +#include "config.h" +#include "event_loop.h" +#include "gflags/gflags.h" +#include "praft.pb.h" +#include "tcp_connection.h" + +namespace pikiwidb { + +#define RAFT_DBID_LEN 32 + +#define PRAFT PRaft::Instance() + +extern PConfig g_config; + +class JoinCmdContext { + friend class PRaft; + + public: + JoinCmdContext() = default; + ~JoinCmdContext() = default; + + bool Set(PClient* client, const std::string& peer_ip, int port) { + std::unique_lock lck(mtx_); + if (client_ != nullptr) { + return false; + } + assert(client); + client_ = client; + peer_ip_ = peer_ip; + port_ = port; + return true; + } + + void Clear() { + std::unique_lock lck(mtx_); + client_ = nullptr; + peer_ip_.clear(); + port_ = 0; + } + + // @todo the function seems useless + bool IsEmpty() { + std::unique_lock lck(mtx_); + return client_ == nullptr; + } + + PClient* GetClient() { return client_; } + braft::PeerId GetPeerIp() { return peer_ip_; } + int GetPort() { return port_; } + + private: + std::mutex mtx_; + PClient* client_ = nullptr; + std::string peer_ip_; + int port_ = 0; +}; + +class PRaft : public braft::StateMachine { + public: + PRaft() : server_(nullptr), node_(nullptr) {} + + ~PRaft() override = default; + + static PRaft& Instance(); + + //===--------------------------------------------------------------------===// + // Braft API + //===--------------------------------------------------------------------===// + butil::Status Init(std::string& cluster_id, bool initial_conf_is_null); + butil::Status AddPeer(const std::string& peer); + butil::Status RemovePeer(const std::string& peer); + butil::Status RaftRecvEntry(); + + void ShutDown(); + void Join(); + void Apply(braft::Task& task); + + //===--------------------------------------------------------------------===// + // ClusterJoin command + //===--------------------------------------------------------------------===// + JoinCmdContext& GetJoinCtx() { return join_ctx_; } + void SendNodeInfoRequest(PClient *client); + void SendNodeAddRequest(PClient *client); + std::tuple ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len); + void OnJoinCmdConnectionFailed(EventLoop*, const char* peer_ip, int port); + + bool IsLeader() const; + std::string GetLeaderId() const; + std::string GetNodeId() const; + std::string GetGroupId() const; + braft::NodeStatus GetNodeStatus() const; + butil::Status GetListPeers(std::vector* peers); + + bool IsInitialized() const { return node_ != nullptr && server_ != nullptr; } + + private: + void on_apply(braft::Iterator& iter) override; + void on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) override; + int on_snapshot_load(braft::SnapshotReader* reader) override; + + void on_leader_start(int64_t term) override; + void on_leader_stop(const butil::Status& status) override; + + void on_shutdown() override; + void on_error(const ::braft::Error& e) override; + void on_configuration_committed(const ::braft::Configuration& conf) override; + void on_stop_following(const ::braft::LeaderChangeContext& ctx) override; + void on_start_following(const ::braft::LeaderChangeContext& ctx) override;; + + private: + std::unique_ptr server_; // brpc + std::unique_ptr node_; + braft::NodeOptions node_options_; // options for raft node + std::string raw_addr_; // ip:port of this node + + JoinCmdContext join_ctx_; // context for cluster join command + std::string dbid_; // dbid of group, +}; + +class DummyServiceImpl : public DummyService { +public: + explicit DummyServiceImpl(PRaft* praft) : praft_(praft) {} + void DummyMethod(::google::protobuf::RpcController* controller, + const ::pikiwidb::DummyRequest* request, + ::pikiwidb::DummyResponse* response, + ::google::protobuf::Closure* done) {} +private: + PRaft* praft_; +}; + +} // namespace pikiwidb \ No newline at end of file diff --git a/src/praft/praft.proto b/src/praft/praft.proto new file mode 100644 index 000000000..61a495f21 --- /dev/null +++ b/src/praft/praft.proto @@ -0,0 +1,13 @@ +syntax="proto3"; +package pikiwidb; +option cc_generic_services = true; + +message DummyRequest { +}; + +message DummyResponse { +}; + +service DummyService { + rpc DummyMethod(DummyRequest) returns (DummyResponse); +};