Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into stream_base
Browse files Browse the repository at this point in the history
  • Loading branch information
KKorpse authored Dec 15, 2023
2 parents ae337ab + ba0d5ce commit b7c8d11
Show file tree
Hide file tree
Showing 18 changed files with 207 additions and 51 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ jobs:
include:
- arch: x86_64-unknown-linux-gnu
os: ubuntu-latest
file_name: pika-${{ github.ref_name }}-linux-amd64
file_name: ${{ github.event.repository.name }}-${{ github.ref_name }}-linux-amd64
file_ext: .tar.gz
- arch: aarch64-unknown-linux-gnu
os: ubuntu-latest
file_name: pika-${{ github.ref_name }}-linux-arm64
file_name: ${{ github.event.repository.name }}-${{ github.ref_name }}-linux-arm64
file_ext: .tar.gz
- arch: x86_64-apple-darwin
os: macos-latest
file_name: pika-${{ github.ref_name }}-darwin-amd64
file_name: ${{ github.event.repository.name }}-${{ github.ref_name }}-darwin-amd64
file_ext: .tar.gz
- arch: aarch64-apple-darwin
os: macos-latest
file_name: pika-${{ github.ref_name }}-darwin-arm64
file_name: ${{ github.event.repository.name }}-${{ github.ref_name }}-darwin-arm64
file_ext: .tar.gz

runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -77,8 +77,8 @@ jobs:
shell: bash
run: |
cd build/
chmod +x pika
tar -zcvf ${{ matrix.file_name }}${{ matrix.file_ext }} pika
chmod +x ${{ github.event.repository.name }}
tar -zcvf ${{ matrix.file_name }}${{ matrix.file_ext }} ${{ github.event.repository.name }}
echo $(shasum -a 256 ${{ matrix.file_name }}${{ matrix.file_ext }} | cut -f1 -d' ') > ${{ matrix.file_name }}${{ matrix.file_ext }}.sha256sum
- name: Upload artifacts
Expand Down Expand Up @@ -107,4 +107,4 @@ jobs:
name: "Release ${{ github.ref_name }}"
generate_release_notes: true
files: |
**/pika-*
**/${{ github.event.repository.name }}-*
2 changes: 1 addition & 1 deletion codis/pkg/topom/topom_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *Topom) RefreshRedisStats(timeout time.Duration) (*sync2.Future, error)
for _, g := range ctx.group {
for _, x := range g.Servers {
goStats(x.Addr, func(addr string) (*RedisStats, error) {
m, err := s.stats.redisp.InfoFull(addr)
m, err := s.stats.redisp.InfoFullv2(addr)
if err != nil {
return nil, err
}
Expand Down
53 changes: 53 additions & 0 deletions codis/pkg/utils/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,24 @@ func (c *Client) Info() (map[string]string, error) {
return info, nil
}

func (c *Client) InfoReplicationIpPort() (map[string]string, error) {
text, err := redigo.String(c.Do("INFO", "replication"))
if err != nil {
return nil, errors.Trace(err)
}
info := make(map[string]string)
for _, line := range strings.Split(text, "\n") {
kv := strings.SplitN(line, ":", 2)
if len(kv) != 2 {
continue
}
if key := strings.TrimSpace(kv[0]); key != "" {
info[key] = strings.TrimSpace(kv[1])
}
}
return info, nil
}

func (c *Client) InfoKeySpace() (map[int]string, error) {
text, err := redigo.String(c.Do("INFO", "keyspace"))
if err != nil {
Expand Down Expand Up @@ -262,6 +280,32 @@ func (c *Client) InfoFull() (map[string]string, error) {
}
}

func (c *Client) InfoFullv2() (map[string]string, error) {
if info, err := c.InfoReplicationIpPort(); err != nil {
return nil, errors.Trace(err)
} else {
host := info["master_host"]
port := info["master_port"]
if host != "" || port != "" {
info["master_addr"] = net.JoinHostPort(host, port)
}
r, err := c.Do("CONFIG", "GET", "maxmemory")
if err != nil {
return nil, errors.Trace(err)
}
p, err := redigo.Values(r, nil)
if err != nil || len(p) != 2 {
return nil, errors.Errorf("invalid response = %v", r)
}
v, err := redigo.Int(p[1], nil)
if err != nil {
return nil, errors.Errorf("invalid response = %v", r)
}
info["maxmemory"] = strconv.Itoa(v)
return info, nil
}
}

func (c *Client) SetMaster(master string) error {
if master == "" || strings.ToUpper(master) == "NO:ONE" {
if _, err := c.Do("SLAVEOF", "NO", "ONE"); err != nil {
Expand Down Expand Up @@ -509,6 +553,15 @@ func (p *Pool) InfoFull(addr string) (_ map[string]string, err error) {
return c.InfoFull()
}

func (p *Pool) InfoFullv2(addr string) (_ map[string]string, err error) {
c, err := p.GetClient(addr)
if err != nil {
return nil, err
}
defer p.PutClient(c)
return c.InfoFullv2()
}

type InfoCache struct {
mu sync.Mutex

Expand Down
12 changes: 12 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,18 @@ class TimeCmd : public Cmd {
void DoInitial() override;
};

class LastsaveCmd : public Cmd {
public:
LastsaveCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new LastsaveCmd(*this); }

private:
void DoInitial() override;
};

class DelbackupCmd : public Cmd {
public:
DelbackupCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
Expand Down
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const std::string kCmdNameCommand = "command";
const std::string kCmdNameDiskRecovery = "diskrecovery";
const std::string kCmdNameClearReplicationID = "clearreplicationid";
const std::string kCmdNameDisableWal = "disablewal";
const std::string kCmdNameLastSave = "lastsave";
const std::string kCmdNameCache = "cache";
const std::string kCmdNameClearCache = "clearcache";

Expand Down
2 changes: 1 addition & 1 deletion include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ class PikaConf : public pstd::BaseConf {
bool cache_index_and_filter_blocks_ = false;
bool pin_l0_filter_and_index_blocks_in_cache_ = false;
bool optimize_filters_for_hits_ = false;
bool level_compaction_dynamic_level_bytes_ = false;
bool level_compaction_dynamic_level_bytes_ = true;
int64_t rate_limiter_bandwidth_ = 0;
int64_t rate_limiter_refill_period_us_ = 0;
int64_t rate_limiter_fairness_ = 0;
Expand Down
12 changes: 12 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,12 @@ class PikaServer : public pstd::noncopyable {
void ClearHitRatio(void);
void ProcessCronTask();
double HitRatio();

/*
* lastsave used
*/
int64_t GetLastSave() const {return lastsave_;}
void UpdateLastSave(int64_t lastsave) {lastsave_ = lastsave;}
private:
/*
* TimingTask use
Expand All @@ -577,6 +583,7 @@ class PikaServer : public pstd::noncopyable {
void AutoKeepAliveRSync();
void AutoUpdateNetworkMetric();
void PrintThreadPoolQueueStatus();
int64_t GetLastSaveTime(const std::string& dump_dir);

std::string host_;
int port_ = 0;
Expand Down Expand Up @@ -701,6 +708,11 @@ class PikaServer : public pstd::noncopyable {
*/
std::shared_mutex mu_;
std::shared_mutex cache_info_rwlock_;

/*
* lastsave used
*/
int64_t lastsave_ = 0;
};

#endif
2 changes: 1 addition & 1 deletion src/build_version.cc.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
const char* pika_build_git_sha =
"pika_git_sha:@PIKA_GIT_SHA@";
const char* pika_build_git_date = "pika_build_git_date:@PIKA_GIT_DATE@";
const char* pika_build_compile_date = "pika_build_date:@PIKA_BUILD_DATE@";
const char* pika_build_compile_date = "@PIKA_BUILD_DATE@";
11 changes: 11 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2610,6 +2610,17 @@ void TimeCmd::Do(std::shared_ptr<Slot> slot) {
}
}

void LastsaveCmd::DoInitial() {
if (argv_.size() != 1) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameLastSave);
return;
}
}

void LastsaveCmd::Do(std::shared_ptr<Slot> slot) {
res_.AppendInteger(g_pika_server->GetLastSave());
}

void DelbackupCmd::DoInitial() {
if (argv_.size() != 1) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameDelbackup);
Expand Down
3 changes: 3 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
extern std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;


void InitCmdTable(CmdTable* cmd_table) {
// Admin
////Slaveof
Expand Down Expand Up @@ -110,6 +111,8 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameCache, std::move(cacheptr)));
std::unique_ptr<Cmd> clearcacheptr = std::make_unique<ClearCacheCmd>(kCmdNameClearCache, 1, kCmdFlagsAdmin | kCmdFlagsWrite);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameClearCache, std::move(clearcacheptr)));
std::unique_ptr<Cmd> lastsaveptr = std::make_unique<LastsaveCmd>(kCmdNameLastSave, 1, kCmdFlagsAdmin | kCmdFlagsRead);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameLastSave, std::move(lastsaveptr)));

#ifdef WITH_COMMAND_DOCS
std::unique_ptr<Cmd> commandptr = std::make_unique<CommandCmd>(kCmdNameCommand, -1, kCmdFlagsRead | kCmdFlagsAdmin);
Expand Down
2 changes: 1 addition & 1 deletion src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ int PikaConf::Load() {

std::string lcdlb;
GetConfStr("level-compaction-dynamic-level-bytes", &lcdlb);
level_compaction_dynamic_level_bytes_ = lcdlb == "yes";
level_compaction_dynamic_level_bytes_ = lcdlb == "yes" || lcdlb.empty();

// daemonize
std::string dmz;
Expand Down
53 changes: 36 additions & 17 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ PikaServer::PikaServer()
pika_client_processor_ = std::make_unique<PikaClientProcessor>(g_pika_conf->thread_pool_size(), 100000);
instant_ = std::make_unique<Instant>();
exit_mutex_.lock();
int64_t lastsave = GetLastSaveTime(g_pika_conf->bgsave_path());
UpdateLastSave(lastsave);
}

PikaServer::~PikaServer() {
Expand Down Expand Up @@ -548,7 +550,7 @@ void PikaServer::SlotSetSmallCompactionDurationThreshold(uint32_t small_compacti
}

bool PikaServer::GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id,
BinlogOffset* const boffset) {
BinlogOffset* const boffset) {
std::shared_ptr<SyncMasterSlot> slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, slot_id));
if (!slot) {
Expand Down Expand Up @@ -1233,7 +1235,7 @@ void PikaServer::ResetLastSecQuerynum() {
}

void PikaServer::UpdateQueryNumAndExecCountDB(const std::string& db_name, const std::string& command,
bool is_write) {
bool is_write) {
std::string cmd(command);
statistic_.server_stat.qps.querynum++;
statistic_.server_stat.exec_count_db[pstd::StringToUpper(cmd)]++;
Expand Down Expand Up @@ -1537,13 +1539,13 @@ void PikaServer::AutoUpdateNetworkMetric() {
}

void PikaServer::PrintThreadPoolQueueStatus() {
// Print the current queue size if it exceeds QUEUE_SIZE_THRESHOLD_PERCENTAGE/100 of the maximum queue size.
size_t cur_size = ClientProcessorThreadPoolCurQueueSize();
size_t max_size = ClientProcessorThreadPoolMaxQueueSize();
size_t thread_hold = (max_size / 100) * QUEUE_SIZE_THRESHOLD_PERCENTAGE;
if (cur_size > thread_hold) {
LOG(INFO) << "The current queue size of the Pika Server's client thread processor thread pool: " << cur_size;
}
// Print the current queue size if it exceeds QUEUE_SIZE_THRESHOLD_PERCENTAGE/100 of the maximum queue size.
size_t cur_size = ClientProcessorThreadPoolCurQueueSize();
size_t max_size = ClientProcessorThreadPoolMaxQueueSize();
size_t thread_hold = (max_size / 100) * QUEUE_SIZE_THRESHOLD_PERCENTAGE;
if (cur_size > thread_hold) {
LOG(INFO) << "The current queue size of the Pika Server's client thread processor thread pool: " << cur_size;
}
}

void PikaServer::InitStorageOptions() {
Expand Down Expand Up @@ -1600,14 +1602,14 @@ void PikaServer::InitStorageOptions() {
}

storage_options_.options.rate_limiter =
std::shared_ptr<rocksdb::RateLimiter>(
rocksdb::NewGenericRateLimiter(
g_pika_conf->rate_limiter_bandwidth(),
g_pika_conf->rate_limiter_refill_period_us(),
static_cast<int32_t>(g_pika_conf->rate_limiter_fairness()),
rocksdb::RateLimiter::Mode::kWritesOnly,
g_pika_conf->rate_limiter_auto_tuned()
));
std::shared_ptr<rocksdb::RateLimiter>(
rocksdb::NewGenericRateLimiter(
g_pika_conf->rate_limiter_bandwidth(),
g_pika_conf->rate_limiter_refill_period_us(),
static_cast<int32_t>(g_pika_conf->rate_limiter_fairness()),
rocksdb::RateLimiter::Mode::kWritesOnly,
g_pika_conf->rate_limiter_auto_tuned()
));

// For Storage small compaction
storage_options_.statistics_max_size = g_pika_conf->max_cache_statistic_keys();
Expand Down Expand Up @@ -1778,6 +1780,23 @@ void PikaServer::Bgslotscleanup(std::vector<int> cleanupSlots, const std::shared
bgslots_cleanup_thread_.StartThread();
bgslots_cleanup_thread_.Schedule(&DoBgslotscleanup, static_cast<void*>(this));
}
int64_t PikaServer::GetLastSaveTime(const std::string& dir_path) {
std::vector<std::string> dump_dir;
// Dump file is not exist
if (!pstd::FileExists(dir_path)) {
LOG(INFO) << "Dump file is not exist,path: " << dir_path;
return 0;
}
if (pstd::GetChildren(dir_path, dump_dir) != 0) {
return 0;
}
std::string dump_file = dir_path + dump_dir[0];
struct stat fileStat;
if (stat(dump_file.c_str(), &fileStat) == 0) {
return static_cast<int64_t>(fileStat.st_mtime);
}
return 0;
}

void DoBgslotscleanup(void* arg) {
auto p = static_cast<PikaServer*>(arg);
Expand Down
Loading

0 comments on commit b7c8d11

Please sign in to comment.