Skip to content

Commit

Permalink
fix bug and adjust logic
Browse files Browse the repository at this point in the history
  • Loading branch information
buzhimingyonghu committed Jan 14, 2025
1 parent 21e1e9e commit 2afafc0
Show file tree
Hide file tree
Showing 17 changed files with 275 additions and 286 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ src/build_version.cc
.cache

.idea/


#build
build/
buildtrees
Expand Down
25 changes: 11 additions & 14 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ slow-cmd-thread-pool-size : 1
admin-thread-pool-size : 2

# Slow cmd list e.g. hgetall, mset
slow-cmd-list :
slow-cmd-list :

# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed.
# Default commands: info, ping, monitor
Expand Down Expand Up @@ -94,7 +94,7 @@ write-buffer-size : 256M
# If <= 0, a proper value is automatically calculated.
# (usually 1/8 of write-buffer-size, rounded up to a multiple of 4KB)
# Supported Units [K|M|G], arena-block-size default unit is in [bytes].
arena-block-size :
arena-block-size :

# Timeout of Pika's connection; counting down starts when there are no requests
# on a connection (it enters sleep state), when the countdown reaches 0, the connection
Expand All @@ -108,12 +108,12 @@ timeout : 60
# [NOTICE] If this admin password is the same as user password (including both being empty),
# in this scenario, users are not subject to the restrictions imposed by the userblacklist.
# PS: "user password" refers to value of the parameter below: userpass.
requirepass :
requirepass :

# Password for replication verification, used for authentication when a slave
# connects to a master to request replication.
# [NOTICE] The value of this parameter must match the "requirepass" setting on the master.
masterauth :
masterauth :

# The [password of user], which is empty by default.
# [NOTICE] If this user password is the same as admin password (including both being empty),
Expand Down Expand Up @@ -154,7 +154,7 @@ consensus-level : 0

# The Prefix of dump file's name.
# All the files generated by the command "bgsave" will be named with this prefix.
dump-prefix :
dump-prefix :

# daemonize [yes | no].
#daemonize : yes
Expand Down Expand Up @@ -568,7 +568,7 @@ cache-num : 16
# cache-model 0:cache_none 1:cache_read
cache-model : 1
# cache-type: string, set, zset, list, hash, bit
cache-type : string, set, zset, list, hash, bit
cache-type: string, set, zset, list, hash, bit

# Maximum number of keys in the zset redis cache
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
Expand Down Expand Up @@ -600,10 +600,10 @@ cache-maxmemory : 10737418240
cache-maxmemory-policy : 1

# cache-maxmemory-samples
cache-maxmemory-samples : 5
cache-maxmemory-samples: 5

# cache-lfu-decay-time
cache-lfu-decay-time : 1
cache-lfu-decay-time: 1


# is possible to manage access to Pub/Sub channels with ACL rules as well. The
Expand Down Expand Up @@ -657,12 +657,12 @@ cache-lfu-decay-time : 1
# 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election'
# which serves for the scenario of codis-pika cluster reelection
# You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING]
internal-used-unfinished-full-sync :
internal-used-unfinished-full-sync :

# for wash data from 4.0.0 to 4.0.1
# https://github.com/OpenAtomFoundation/pika/issues/2886
# default value: true
wash-data : true
wash-data: true

# Pika automatic compact compact strategy, a complement to rocksdb compact.
# Trigger the compact background task periodically according to `compact-interval`
Expand Down Expand Up @@ -696,7 +696,4 @@ dont-compact-sst-created-in-seconds : 20
# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
best-delete-min-ratio : 10
# Generated by ReplicationID CONFIG REWRITE
replication-id : 6573021a3fdc3550d4e6a9bec5967726486d139b164b57b33d
run-id : ad00e48e2067e5a9d6e7893f83f83ab08cf5c86b
best-delete-min-ratio : 10
2 changes: 1 addition & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
// enable copy, used default copy
// Cmd(const Cmd&);
void ProcessCommand(const HintKeys& hint_key = HintKeys());
void InternalProcessCommand(const HintKeys& hint_key,bool is_consistency = false);
void InternalProcessCommand(const HintKeys& hint_key);
void DoCommand(const HintKeys& hint_key);
bool DoReadCommandInCache();
void LogCommand() const;
Expand Down
31 changes: 13 additions & 18 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

#include <utility>

#include "include/pika_define.h"
#include "pstd/include/env.h"
#include "include/pika_binlog_transverter.h"
#include "include/pika_client_conn.h"
#include "include/pika_define.h"
#include "include/pika_slave_node.h"
#include "include/pika_stable_log.h"
#include "pstd/include/env.h"

class Context : public pstd::noncopyable {
public:
Expand Down Expand Up @@ -50,7 +50,7 @@ class SyncProgress {
pstd::Status AddSlaveNode(const std::string& ip, int port, const std::string& db_name, int session_id);
pstd::Status RemoveSlaveNode(const std::string& ip, int port);
pstd::Status Update(const std::string& ip, int port, const LogOffset& start, const LogOffset& end,
LogOffset* committed_index);
LogOffset* committed_index);
int SlaveSize();
int SlaveBinlogStateSize() {
std::shared_lock l(rwlock_);
Expand Down Expand Up @@ -81,10 +81,7 @@ class MemLog {
struct LogItem {
LogItem(const LogOffset& _offset, std::shared_ptr<Cmd> _cmd_ptr, std::shared_ptr<PikaClientConn> _conn_ptr,
std::shared_ptr<std::string> _resp_ptr)
: offset(_offset),
cmd_ptr(std::move(_cmd_ptr)),
conn_ptr(std::move(_conn_ptr)),
resp_ptr(std::move(_resp_ptr)) {}
: offset(_offset), cmd_ptr(std::move(_cmd_ptr)), conn_ptr(std::move(_conn_ptr)), resp_ptr(std::move(_resp_ptr)) {}
LogOffset offset;
std::shared_ptr<Cmd> cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr;
Expand Down Expand Up @@ -142,10 +139,10 @@ class Log {

private:
int FindLogIndex(const LogOffset& offset);
std::shared_mutex logs_mutex_;
pstd::Mutex logs_mutex_;
std::vector<LogItem> logs_;
LogOffset last_index_;
LogOffset first_index_;
LogOffset last_index_ = LogOffset();
LogOffset first_index_ = LogOffset();
};

class ConsensusCoordinator {
Expand Down Expand Up @@ -222,11 +219,11 @@ class ConsensusCoordinator {

pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, const BinlogOffset& end_offset,
std::vector<LogOffset>* log_offset);
pstd::Status FindBinlogFileNum(const std::map<uint32_t, std::string>& binlogs, uint64_t target_index,
uint32_t start_filenum, uint32_t* founded_filenum);
std::vector<LogOffset>* log_offset);
pstd::Status FindBinlogFileNum(const std::map<uint32_t, std::string>& binlogs, uint64_t target_index, uint32_t start_filenum,
uint32_t* founded_filenum);
pstd::Status FindLogicOffsetBySearchingBinlog(const BinlogOffset& hint_offset, uint64_t target_index,
LogOffset* found_offset);
LogOffset* found_offset);
pstd::Status FindLogicOffset(const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset);
pstd::Status GetLogsBefore(const BinlogOffset& start_offset, std::vector<LogOffset>* hints);

Expand All @@ -250,12 +247,10 @@ class ConsensusCoordinator {

// pacificA
public:
void InitContext(){
context_->Init();
}
void InitContext() { context_->Init(); }
bool checkFinished(const LogOffset& offset);
pstd::Status AppendEntries(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_logoffset);
void SetIsConsistency(bool is_consistency);
void SetConsistency(bool is_consistency);
bool GetISConsistency();
pstd::Status SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name);
pstd::Status Truncate(const LogOffset& offset);
Expand Down
2 changes: 1 addition & 1 deletion include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ enum SlaveState {
};

// debug only
const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync"};
const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync", "Candidate"};

enum BinlogSyncState {
kNotSync = 0,
Expand Down
92 changes: 39 additions & 53 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
#include <memory>
#include <set>

#include "src/cache/include/config.h"
#include "net/include/bg_thread.h"
#include "net/include/net_pubsub.h"
#include "net/include/thread_pool.h"
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_status.h"
#include "pstd/include/pstd_string.h"
#include "src/cache/include/config.h"
#include "storage/backupable.h"
#include "storage/storage.h"

Expand Down Expand Up @@ -101,9 +101,7 @@ class PikaServer : public pstd::noncopyable {
void SetDispatchQueueLimit(int queue_limit);
void SetSlowCmdThreadPoolFlag(bool flag);
storage::StorageOptions storage_options();
std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() {
return pika_dispatch_thread_;
}
std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() { return pika_dispatch_thread_; }

/*
* DB use
Expand All @@ -117,21 +115,11 @@ class PikaServer : public pstd::noncopyable {
std::shared_ptr<DB> GetDB(const std::string& db_name);
std::set<std::string> GetAllDBName();
pstd::Status DoSameThingSpecificDB(const std::set<std::string>& dbs, const TaskArg& arg);
std::shared_mutex& GetDBLock() {
return dbs_rw_;
}
void DBLockShared() {
dbs_rw_.lock_shared();
}
void DBLock() {
dbs_rw_.lock();
}
void DBUnlock() {
dbs_rw_.unlock();
}
void DBUnlockShared() {
dbs_rw_.unlock_shared();
}
std::shared_mutex& GetDBLock() { return dbs_rw_; }
void DBLockShared() { dbs_rw_.lock_shared(); }
void DBLock() { dbs_rw_.lock(); }
void DBUnlock() { dbs_rw_.unlock(); }
void DBUnlockShared() { dbs_rw_.unlock_shared(); }

/*
* DB use
Expand All @@ -153,7 +141,7 @@ class PikaServer : public pstd::noncopyable {
bool TryAddSlave(const std::string& ip, int64_t port, int fd, const std::vector<DBStruct>& table_structs);
pstd::Mutex slave_mutex_; // protect slaves_;
std::vector<SlaveItem> slaves_;
int slave_size(){
int slave_size() {
std::lock_guard l(slave_mutex_);
return slaves_.size();
}
Expand All @@ -168,7 +156,7 @@ class PikaServer : public pstd::noncopyable {
*/
void SyncError();
void RemoveMaster();
bool SetMaster(std::string& master_ip, int master_port, bool is_consistency=false);
bool SetMaster(std::string& master_ip, int master_port, bool is_consistency = false);

/*
* Slave State Machine
Expand All @@ -181,8 +169,8 @@ class PikaServer : public pstd::noncopyable {
void UpdateMetaSyncTimestamp();
void UpdateMetaSyncTimestampWithoutLock();
bool IsFirstMetaSync();
bool IsConsistency();
void SetIsConsistency(bool is_consistency);
bool IsConsistency();
void SetConsistency(bool is_consistency);
void SetFirstMetaSync(bool v);

/*
Expand Down Expand Up @@ -271,12 +259,8 @@ class PikaServer : public pstd::noncopyable {
/*
* Disk usage statistic
*/
uint64_t GetDBSize() const {
return disk_statistic_.db_size_.load();
}
uint64_t GetLogSize() const {
return disk_statistic_.log_size_.load();
}
uint64_t GetDBSize() const { return disk_statistic_.db_size_.load(); }
uint64_t GetLogSize() const { return disk_statistic_.log_size_.load(); }

/*
* Network Statistic used
Expand Down Expand Up @@ -320,9 +304,11 @@ class PikaServer : public pstd::noncopyable {
/*
* Async migrate used
*/
int SlotsMigrateOne(const std::string& key, const std::shared_ptr<DB> &db);
bool SlotsMigrateBatch(const std::string &ip, int64_t port, int64_t time_out, int64_t slots, int64_t keys_num, const std::shared_ptr<DB>& db);
void GetSlotsMgrtSenderStatus(std::string *ip, int64_t* port, int64_t *slot, bool *migrating, int64_t *moved, int64_t *remained);
int SlotsMigrateOne(const std::string& key, const std::shared_ptr<DB>& db);
bool SlotsMigrateBatch(const std::string& ip, int64_t port, int64_t time_out, int64_t slots, int64_t keys_num,
const std::shared_ptr<DB>& db);
void GetSlotsMgrtSenderStatus(std::string* ip, int64_t* port, int64_t* slot, bool* migrating, int64_t* moved,
int64_t* remained);
bool SlotsMigrateAsyncCancel();
std::shared_mutex bgslots_protector_;

Expand Down Expand Up @@ -450,17 +436,15 @@ class PikaServer : public pstd::noncopyable {
storage::Status RewriteStorageOptions(const storage::OptionType& option_type,
const std::unordered_map<std::string, std::string>& options);

/*
* Instantaneous Metric used
*/
/*
* Instantaneous Metric used
*/
std::unique_ptr<Instant> instant_;

/*
* Diskrecovery used
*/
std::map<std::string, std::shared_ptr<DB>> GetDB() {
return dbs_;
}
/*
* Diskrecovery used
*/
std::map<std::string, std::shared_ptr<DB>> GetDB() { return dbs_; }

/*
* acl init
Expand Down Expand Up @@ -488,39 +472,41 @@ class PikaServer : public pstd::noncopyable {
* Cache used
*/
static void DoCacheBGTask(void* arg);
void ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cache::CacheConfig *cache_cfg = nullptr);
void ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cache::CacheConfig* cache_cfg = nullptr);
void ClearCacheDbAsync(std::shared_ptr<DB> db);
void ClearCacheDbAsyncV2(std::shared_ptr<DB> db);
void ResetCacheConfig(std::shared_ptr<DB> db);
void ClearHitRatio(std::shared_ptr<DB> db);
void OnCacheStartPosChanged(int zset_cache_start_direction, std::shared_ptr<DB> db);
void UpdateCacheInfo(void);
void ResetDisplayCacheInfo(int status, std::shared_ptr<DB> db);
void CacheConfigInit(cache::CacheConfig &cache_cfg);
void CacheConfigInit(cache::CacheConfig& cache_cfg);
void ProcessCronTask();
double HitRatio();
void SetLogNetActivities(bool value);
/*
* disable compact
*/
* disable compact
*/
void DisableCompact();

/*
* lastsave used
*/
int64_t GetLastSave() const {return lastsave_;}
void UpdateLastSave(int64_t lastsave) {lastsave_ = lastsave;}
void InitStatistic(CmdTable *inited_cmd_table) {
int64_t GetLastSave() const { return lastsave_; }
void UpdateLastSave(int64_t lastsave) { lastsave_ = lastsave; }
void InitStatistic(CmdTable* inited_cmd_table) {
// we insert all cmd name to statistic_.server_stat.exec_count_db,
// then when we can call PikaServer::UpdateQueryNumAndExecCountDB(const std::string&, const std::string&, bool) in parallel without lock
// although exec_count_db(unordered_map) is not thread-safe, but we won't trigger any insert or erase operation toward exec_count_db(unordered_map) during the running of pika
auto &exec_stat_map = statistic_.server_stat.exec_count_db;
// then when we can call PikaServer::UpdateQueryNumAndExecCountDB(const std::string&, const std::string&, bool) in
// parallel without lock although exec_count_db(unordered_map) is not thread-safe, but we won't trigger any insert
// or erase operation toward exec_count_db(unordered_map) during the running of pika
auto& exec_stat_map = statistic_.server_stat.exec_count_db;
for (auto& it : *inited_cmd_table) {
std::string cmd_name = it.first; //value copy is needed
pstd::StringToUpper(cmd_name); //cmd_name now is all uppercase
std::string cmd_name = it.first; // value copy is needed
pstd::StringToUpper(cmd_name); // cmd_name now is all uppercase
exec_stat_map.insert(std::make_pair(cmd_name, 0));
}
}

private:
/*
* TimingTask use
Expand Down
Loading

0 comments on commit 2afafc0

Please sign in to comment.