Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:Slotsrestore #8

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ add_subdirectory(src/pstd)
add_subdirectory(src/net)
add_subdirectory(src/storage)
add_subdirectory(src/cache)
add_subdirectory(src/utils)
if (USE_PIKA_TOOLS)
add_subdirectory(tools)
endif()
Expand Down Expand Up @@ -767,7 +768,9 @@ message("pika BUILD_DATE = ${PIKA_BUILD_DATE}")

set(PIKA_BUILD_VERSION_CC ${CMAKE_BINARY_DIR}/pika_build_version.cc
src/pika_cache_load_thread.cc
)
src/utils/src/pika_lzf.cc
src/utils/src/pika_redis.cc
)
message("PIKA_BUILD_VERSION_CC : " ${PIKA_BUILD_VERSION_CC})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/build_version.cc.in ${PIKA_BUILD_VERSION_CC} @ONLY)

Expand Down Expand Up @@ -805,6 +808,7 @@ add_dependencies(${PROJECT_NAME}
rediscache
storage
cache
utils
)

target_include_directories(${PROJECT_NAME}
Expand All @@ -818,6 +822,7 @@ target_link_libraries(${PROJECT_NAME}
storage
net
pstd
utils
${GLOG_LIBRARY}
librocksdb.a
${LIB_PROTOBUF}
Expand Down
8 changes: 7 additions & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ max-bytes-for-level-multiplier : 10
# slotmigrate [yes | no]
slotmigrate : no

# slotmigrate thread num
slotmigrate-thread-num : 8

# thread-migrate-keys-num
thread-migrate-keys-num : 64

# BlockBasedTable block_size, default 4k
# block-size: 4096

Expand Down Expand Up @@ -494,7 +500,7 @@ cache-lfu-decay-time: 1
#
# For example:
#
# user : worker on >password ~key* +@all
#user : default on >123456 ~* &* +@all

# Using an external ACL file
#
Expand Down
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const std::string kCmdNameSlotsMgrtSlotAsync = "slotsmgrtslot-async";
const std::string kCmdNameSlotsMgrtExecWrapper = "slotsmgrt-exec-wrapper";
const std::string kCmdNameSlotsMgrtAsyncStatus = "slotsmgrt-async-status";
const std::string kCmdNameSlotsMgrtAsyncCancel = "slotsmgrt-async-cancel";
const std::string kCmdNameSlotsrestore = "slotsrestore";

// Kv
const std::string kCmdNameSet = "set";
Expand Down
8 changes: 4 additions & 4 deletions include/pika_migrate_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class PikaParseSendThread : public net::Thread {
void ExitThread(void);

private:
int MigrateOneKey(net::NetCli* cli, const std::string& key, const char key_type, bool async);
int MigrateOneKey(std::unique_ptr<net::NetCli> cli, const std::string& key, const char key_type, bool async);
void DelKeysAndWriteBinlog(std::deque<std::pair<const char, std::string>>& send_keys, const std::shared_ptr<DB>& db);
bool CheckMigrateRecv(int64_t need_receive_num);
void *ThreadMain() override;
Expand All @@ -34,8 +34,8 @@ class PikaParseSendThread : public net::Thread {
int64_t timeout_ms_ = 60;
int32_t mgrtkeys_num_ = 0;
std::atomic<bool> should_exit_;
PikaMigrateThread *migrate_thread_ = nullptr;
net::NetCli *cli_ = nullptr;
std::unique_ptr<PikaMigrateThread> migrate_thread_;
std::unique_ptr<net::NetCli> cli_;
pstd::Mutex working_mutex_;
std::shared_ptr<DB> db_;
};
Expand Down Expand Up @@ -90,7 +90,7 @@ class PikaMigrateThread : public net::Thread {
std::mutex request_migrate_mutex_;

int32_t workers_num_ = 0;
std::vector<PikaParseSendThread*> workers_;
std::vector<std::unique_ptr<PikaParseSendThread>> workers_;

std::atomic<int32_t> working_thread_num_;
pstd::CondVar workers_cond_;
Expand Down
28 changes: 24 additions & 4 deletions include/pika_slot_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ class PikaMigrate {
void Unlock() {
mutex_.unlock();
}
net::NetCli* GetMigrateClient(const std::string& host, const int port, int timeout);
std::unique_ptr<net::NetCli> GetMigrateClient(const std::string& host, const int port, int timeout);

private:
std::map<std::string, void*> migrate_clients_;
pstd::Mutex mutex_;
void KillMigrateClient(net::NetCli* migrate_cli);
void KillMigrateClient(std::unique_ptr<net::NetCli> migrate_cli);
void KillAllMigrateClient();
int64_t TTLByType(const char key_type, const std::string& key, const std::shared_ptr<DB>& db);
int MigrateSend(net::NetCli* migrate_cli, const std::string& key, const char type, std::string& detail,
int MigrateSend(std::unique_ptr<net::NetCli> migrate_cli, const std::string& key, const char type, std::string& detail,
const std::shared_ptr<DB>& db);
bool MigrateRecv(net::NetCli* migrate_cli, int need_receive, std::string& detail);
bool MigrateRecv(std::unique_ptr<net::NetCli> migrate_cli, int need_receive, std::string& detail);
int ParseKey(const std::string& key, const char type, std::string& wbuf_str, const std::shared_ptr<DB>& db);
int ParseKKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
int ParseZKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
Expand Down Expand Up @@ -277,4 +277,24 @@ class SlotsCleanupOffCmd : public Cmd {
void DoInitial() override;
};

////slotsrestore key ttl(ms) value(rdb)
struct RestoreKey {
std::string key;
int64_t ttlms;
std::string value;
};

class SlotsrestoreCmd : public Cmd {
public:
SlotsrestoreCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new SlotsrestoreCmd(*this); }

private:
void DoInitial() override;
std::vector<struct RestoreKey> restore_keys_;
};

#endif
2 changes: 1 addition & 1 deletion include/pika_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@

#define PIKA_MAJOR 3
#define PIKA_MINOR 5
#define PIKA_PATCH 2
#define PIKA_PATCH 3

#endif // INCLUDE_PIKA_VERSION_H_
5 changes: 5 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(
std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSlotsCleanupOff, std::move(slotscleanupoffptr)));

std::unique_ptr<Cmd> slotsrestoreptr =
std::make_unique<SlotsrestoreCmd>(kCmdNameSlotsrestore, -4, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow);
cmd_table->insert(
std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSlotsrestore, std::move(slotsrestoreptr)));

// Kv
////SetCmd
std::unique_ptr<Cmd> setptr =
Expand Down
Loading
Loading