Skip to content

Commit

Permalink
Merge branch 'unstable' into fix_slave_binlog_lock
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored Feb 26, 2024
2 parents 478c6bc + 0004932 commit 3aa8982
Show file tree
Hide file tree
Showing 101 changed files with 5,922 additions and 8,142 deletions.
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ elseif(${BUILD_TYPE} STREQUAL RELWITHDEBINFO)
else()
set(LIB_BUILD_TYPE RELEASE)
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -g -DNDEBUG")

endif()

if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
Expand Down Expand Up @@ -172,7 +171,6 @@ set(GTEST_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
set(GTEST_MAIN_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
set(GMOCK_INCLUDE_DIR ${INSTALL_INCLUDEDIR})


ExternalProject_Add(gflags
URL
https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz
Expand Down
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,26 @@ Users can directly download the latest binary version package from [releases](ht
```


* #### 3.4 Running with docker-compose
docker-compose.yaml
```yaml
pikadb:
image: pikadb/pika:lastest
container_name: pikadb
ports:
- "6379:9221"
volumes:
- ./data/pika:/pika/log
# Specify the configuration file path. If you need to specify a configuration file, specify it here.
# Note: pika.conf should be in the ./deploy/pika directory
#- ./deploy/pika:/pika/conf
- ./data/pika/db:/pika/db
- ./data/pika/dump:/pika/dump
- ./data/pika/dbsync:/pika/dbsync
privileged: true
restart: always
```


## Performance test

Expand Down
19 changes: 18 additions & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,24 @@ Pika 力求在完全兼容 Redis 协议、 继承 Redis 便捷运维设计的前
--image redis -it --rm --restart=Never \
-- /usr/local/bin/redis-cli -h pika-sample -p 9221 info
```

* #### 3.4 使用 docker-compose
docker-compose.yaml
```yaml
pikadb:
image: pikadb/pika:lastest
container_name: pikadb
ports:
- "6379:9221"
volumes:
- ./data/pika:/pika/log
# 指定配置文件路径,如果有需要指定配置文件则在这里指定 注意: pika.conf 要在./deploy/pika目录中
#- ./deploy/pika:/pika/conf
- ./data/pika/db:/pika/db
- ./data/pika/dump:/pika/dump
- ./data/pika/dbsync:/pika/dbsync
privileged: true
restart: always
```
## Pika 未来工作规划

### 1、Pika 单机版
Expand Down
4 changes: 4 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
# Port 10221 is used for Rsync, and port 11221 is used for Replication, while the listening port is 9221.
port : 9221

db-instance-num : 3
rocksdb-ttl-second : 86400 * 7;
rocksdb-periodic-second : 86400 * 3;

# Random value identifying the Pika server, its string length must be 40.
# If not set, Pika will generate a random string with a length of 40 random characters.
# run-id :
Expand Down
2 changes: 1 addition & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ class DiskRecoveryCmd : public Cmd {

private:
void DoInitial() override;
std::map<std::string, uint64_t> background_errors_;
std::map<int, uint64_t> background_errors_;
};

class ClearReplicationIDCmd : public Cmd {
Expand Down
2 changes: 0 additions & 2 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_status.h"
#include "pstd/include/noncopyable.h"

#include "include/pika_define.h"


std::string NewFileName(const std::string& name, uint32_t current);

class Version final : public pstd::noncopyable {
Expand Down
2 changes: 1 addition & 1 deletion include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,4 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
std::vector<std::shared_ptr<pstd::Mutex>> cache_mutexs_;
};

#endif
#endif
2 changes: 1 addition & 1 deletion include/pika_cache_load_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class PikaCacheLoadThread : public net::Thread {
private:
std::atomic_bool should_exit_;
std::deque<std::tuple<const char, std::string, const std::shared_ptr<DB>>> loadkeys_queue_;

pstd::CondVar loadkeys_cond_;
pstd::Mutex loadkeys_mutex_;

Expand Down
27 changes: 24 additions & 3 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
#include <set>
#include <unordered_set>

#include "rocksdb/compression_type.h"

#include "pstd/include/base_conf.h"
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_string.h"

#include "acl.h"
#include "include/pika_define.h"
#include "include/pika_meta.h"
#include "rocksdb/compression_type.h"

#define kBinlogReadWinDefaultSize 9000
Expand Down Expand Up @@ -76,6 +77,15 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return db_path_;
}
int db_instance_num() {
return db_instance_num_;
}
uint64_t rocksdb_ttl_second() {
return rocksdb_ttl_second_.load();
}
uint64_t rocksdb_periodic_compaction_second() {
return rocksdb_periodic_second_.load();
}
std::string db_sync_path() {
std::shared_lock l(rwlock_);
return db_sync_path_;
Expand Down Expand Up @@ -376,7 +386,6 @@ class PikaConf : public pstd::BaseConf {
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
int binlog_file_size() { return binlog_file_size_; }
PikaMeta* local_meta() { return local_meta_.get(); }
std::vector<rocksdb::CompressionType> compression_per_level();
std::string compression_all_levels() const { return compression_per_level_; };
static rocksdb::CompressionType GetCompression(const std::string& value);
Expand Down Expand Up @@ -416,6 +425,15 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("slaveof", value);
slaveof_ = value;
}

void SetRocksdbTTLSecond(uint64_t ttl) {
rocksdb_ttl_second_.store(ttl);
}

void SetRocksdbPeriodicSecond(uint64_t value) {
rocksdb_periodic_second_.store(value);
}

void SetReplicationID(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("replication-id", value);
Expand Down Expand Up @@ -655,6 +673,7 @@ class PikaConf : public pstd::BaseConf {
int ConfigRewriteReplicationID();

private:
// TODO: replace mutex with atomic value
int port_ = 0;
int slave_priority_ = 0;
int thread_num_ = 0;
Expand All @@ -668,6 +687,7 @@ class PikaConf : public pstd::BaseConf {
std::string log_path_;
std::string log_level_;
std::string db_path_;
int db_instance_num_ = 0;
std::string db_sync_path_;
std::string compact_cron_;
std::string compact_interval_;
Expand Down Expand Up @@ -719,6 +739,8 @@ class PikaConf : public pstd::BaseConf {
int max_background_compactions_ = 0;
int max_background_jobs_ = 0;
int max_cache_files_ = 0;
std::atomic<uint64_t> rocksdb_ttl_second_ = 0;
std::atomic<uint64_t> rocksdb_periodic_second_ = 0;
int max_bytes_for_level_multiplier_ = 0;
int64_t block_size_ = 0;
int64_t block_cache_ = 0;
Expand Down Expand Up @@ -787,7 +809,6 @@ class PikaConf : public pstd::BaseConf {
int64_t blob_file_size_ = 256 * 1024 * 1024; // 256M
std::string blob_compression_type_ = "none";

std::unique_ptr<PikaMeta> local_meta_;
std::shared_mutex rwlock_;

// Rsync Rate limiting configuration
Expand Down
4 changes: 2 additions & 2 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

#include <utility>

#include "include/pika_define.h"
#include "pstd/include/env.h"
#include "include/pika_binlog_transverter.h"
#include "include/pika_client_conn.h"
#include "include/pika_define.h"
#include "include/pika_slave_node.h"
#include "include/pika_stable_log.h"
#include "pstd/include/env.h"

class Context : public pstd::noncopyable {
public:
Expand Down
8 changes: 4 additions & 4 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ const std::string kDefaultRsyncAuth = "default";
const int kMaxRsyncParallelNum = 4;

struct DBStruct {
DBStruct(std::string tn)
: db_name(std::move(tn)) {}
DBStruct(std::string tn, int32_t inst_num)
: db_name(std::move(tn)), db_instance_num(inst_num) {}

bool operator==(const DBStruct& db_struct) const {
return db_name == db_struct.db_name;
return db_name == db_struct.db_name && db_instance_num == db_struct.db_instance_num;
}
std::string db_name;
int32_t db_instance_num = 0;
};

// slave item
struct SlaveItem {
std::string ip_port;
std::string ip;
Expand Down
33 changes: 0 additions & 33 deletions include/pika_meta.h

This file was deleted.

47 changes: 47 additions & 0 deletions include/pika_monitor_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_MONITOR_THREAD_H_
#define PIKA_MONITOR_THREAD_H_

#include <atomic>
#include <deque>
#include <list>
#include <queue>

#include "net/include/net_thread.h"
#include "pstd/include/pstd_mutex.h"
#include "include/pika_define.h"
#include "include/pika_client_conn.h"

class PikaMonitorThread : public net::Thread {
public:
PikaMonitorThread();
~PikaMonitorThread() override;

void AddMonitorClient(const std::shared_ptr<PikaClientConn>& client_ptr);
void AddMonitorMessage(const std::string& monitor_message);
int32_t ThreadClientList(std::vector<ClientInfo>* client = nullptr);
bool ThreadClientKill(const std::string& ip_port = "all");
bool HasMonitorClients();

private:
void AddCronTask(const MonitorCronTask& task);
bool FindClient(const std::string& ip_port);
net::WriteStatus SendMessage(int32_t fd, std::string& message);
void RemoveMonitorClient(const std::string& ip_port);

std::atomic<bool> has_monitor_clients_;
pstd::Mutex monitor_mutex_protector_;
pstd::CondVar monitor_cond_;

std::list<ClientInfo> monitor_clients_;
std::deque<std::string> monitor_messages_;
std::queue<MonitorCronTask> cron_tasks_;

void* ThreadMain() override;
void RemoveMonitorClient(int32_t client_fd);
};
#endif
2 changes: 1 addition & 1 deletion include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
#include "net/include/net_conn.h"
#include "net/include/thread_pool.h"
#include "pstd/include/pstd_status.h"
#include "include/pika_define.h"

#include "include/pika_binlog_reader.h"
#include "include/pika_define.h"
#include "include/pika_repl_bgworker.h"
#include "include/pika_repl_client_thread.h"

Expand Down
2 changes: 1 addition & 1 deletion include/pika_rsync_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#ifndef PIKA_RSYNC_SERVICE_H_
#define PIKA_RSYNC_SERVICE_H_

#include "iostream"
#include <iostream>

class PikaRsyncService {
public:
Expand Down
17 changes: 4 additions & 13 deletions include/pika_slot_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,14 @@
const std::string SlotKeyPrefix = "_internal:slotkey:4migrate:";
const std::string SlotTagPrefix = "_internal:slottag:4migrate:";

extern uint32_t crc32tab[256];
const size_t MaxKeySendSize = 10 * 1024;

void CRC32TableInit(uint32_t poly);

extern void InitCRC32Table();

extern uint32_t CRC32Update(uint32_t crc, const char* buf, int len);
extern uint32_t CRC32CheckSum(const char* buf, int len);

int GetSlotID(const std::string &str);
int GetKeyType(const std::string& key, std::string& key_type, const std::shared_ptr<DB>& db);
int DeleteKey(const std::string& key, const char key_type, const std::shared_ptr<DB>& db);
int GetSlotsID(const std::string& str, uint32_t* pcrc, int* phastag);
int GetKeyType(const std::string& key, std::string &key_type, const std::shared_ptr<DB>& db);
void AddSlotKey(const std::string& type, const std::string& key, const std::shared_ptr<DB>& db);
void RemSlotKey(const std::string& key, const std::shared_ptr<DB>& db);
int DeleteKey(const std::string& key, const char key_type, const std::shared_ptr<DB>& db);
void RemSlotKeyByType(const std::string& type, const std::string& key, const std::shared_ptr<DB>& db);
std::string GetSlotKey(int slot);
std::string GetSlotKey(uint32_t slot);
std::string GetSlotsTagKey(uint32_t crc);

class PikaMigrate {
Expand Down
4 changes: 2 additions & 2 deletions src/cache/include/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class RedisCache {
static void ResetHitAndMissNum(void);
Status Open(void);
int32_t ActiveExpireCycle(void);

// Normal Commands
bool Exists(std::string& key);
int64_t DbSize(void);
Expand Down Expand Up @@ -163,7 +163,7 @@ class RedisCache {
void FreeHitemList(hitem *items, uint32_t size);
void FreeZitemList(zitem *items, uint32_t size);
void ConvertObjectToString(robj *obj, std::string *value);

private:
RedisCache(const RedisCache&);
RedisCache& operator=(const RedisCache&);
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,4 @@ Status RedisCache::HStrlen(std::string& key, std::string &field, uint64_t *len)

} // namespace cache

/* EOF */
/* EOF */
Loading

0 comments on commit 3aa8982

Please sign in to comment.