diff --git a/.github/workflows/publish_nightly_docker_image.yml b/.github/workflows/publish_nightly_docker_image.yml index cc1a508a1e..98f2d627e3 100644 --- a/.github/workflows/publish_nightly_docker_image.yml +++ b/.github/workflows/publish_nightly_docker_image.yml @@ -18,13 +18,13 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - + - name: Log in to Docker Hub uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a with: username: ${{ secrets.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - + - name: Extract metadata (tags, labels) for Docker id: meta uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 @@ -32,7 +32,7 @@ jobs: images: pikadb/pika-dev-nightly tags: | type=schedule,prefix={{branch}},pattern={{date 'YYYYMMDD'}} - + - name: Build and push Docker image timeout-minutes: 1440 uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 @@ -56,13 +56,13 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - + - name: Log in to Docker Hub uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a with: username: ${{ secrets.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - + - name: Extract metadata (tags, labels) for Docker id: meta uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 @@ -70,7 +70,7 @@ jobs: images: pikadb/codis-dev-nightly tags: | type=schedule,prefix={{branch}},pattern={{date 'YYYYMMDD'}} - + - name: Build and push Docker image timeout-minutes: 1440 uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 @@ -81,4 +81,3 @@ jobs: push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - diff --git a/.github/workflows/publish_release_docker_image.yml b/.github/workflows/publish_release_docker_image.yml index 47b5f8df0b..92748d32e2 100644 --- a/.github/workflows/publish_release_docker_image.yml +++ b/.github/workflows/publish_release_docker_image.yml @@ -12,25 +12,25 @@ jobs: steps: - name: Check out the repo uses: actions/checkout@v3 - + - name: Set up QEMU uses: docker/setup-qemu-action@v2 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - + - name: Log in to Docker Hub uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a with: username: ${{ secrets.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - + - name: Extract metadata (tags, labels) for Docker id: meta uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 with: images: pikadb/pika - + - name: Build and push Docker image timeout-minutes: 1440 uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 @@ -48,25 +48,25 @@ jobs: steps: - name: Check out the repo uses: actions/checkout@v3 - + - name: Set up QEMU uses: docker/setup-qemu-action@v2 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - + - name: Log in to Docker Hub uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a with: username: ${{ secrets.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - + - name: Extract metadata (tags, labels) for Docker id: meta uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 with: images: pikadb/codis - + - name: Build and push Docker image timeout-minutes: 1440 uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 @@ -77,4 +77,3 @@ jobs: push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - diff --git a/.github/workflows/release.yml 
b/.github/workflows/release.yml new file mode 100644 index 0000000000..d44f604665 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,110 @@ +name: Release + +on: + push: + tags: + - "v*.*.*" + +env: + BUILD_TYPE: Release + +jobs: + build: + name: Build binary + strategy: + matrix: + include: + - arch: x86_64-unknown-linux-gnu + os: ubuntu-latest + file_name: ${{ github.event.repository.name }}-${{ github.ref_name }}-linux-amd64 + file_ext: .tar.gz + - arch: aarch64-unknown-linux-gnu + os: ubuntu-latest + file_name: ${{ github.event.repository.name }}-${{ github.ref_name }}-linux-arm64 + file_ext: .tar.gz + - arch: x86_64-apple-darwin + os: macos-latest + file_name: ${{ github.event.repository.name }}-${{ github.ref_name }}-darwin-amd64 + file_ext: .tar.gz + - arch: aarch64-apple-darwin + os: macos-latest + file_name: ${{ github.event.repository.name }}-${{ github.ref_name }}-darwin-arm64 + file_ext: .tar.gz + + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Install Deps - linux + if: contains(matrix.arch, 'linux') + run: | + sudo apt-get install -y autoconf libprotobuf-dev protobuf-compiler + sudo apt-get install -y clang-tidy-12 + + - name: Install Deps - darwin + if: contains(matrix.os, 'macos') + run: | + brew update + brew install --overwrite python autoconf protobuf llvm wget git + brew install gcc@10 automake cmake make binutils + + - name: Configure CMake - linux + if: contains(matrix.arch, 'linux') + run: cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS="-s" -DCMAKE_EXE_LINKER_FLAGS="-s" + + - name: Configure CMake - darwin + if: contains(matrix.os, 'macos') + run: | + export CC=/usr/local/opt/gcc@10/bin/gcc-10 + cmake -B build -DCMAKE_C_COMPILER=/usr/local/opt/gcc@10/bin/gcc-10 -DUSE_PIKA_TOOLS=ON -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} + + - name: Cache Build - linux + if: contains(matrix.arch, 'linux') + uses: actions/cache@v3 + id: cache-ubuntu + with: + key: ${{ runner.os }}-build-ubuntu-${{ hashFiles('**/CMakeLists.txt') }} + path: | + ${{ github.workspace }}/buildtrees + ${{ github.workspace }}/deps + + - name: Build + run: cmake --build build --config ${{ env.BUILD_TYPE }} + + - name: Calculate checksum and rename binary + shell: bash + run: | + cd build/ + chmod +x ${{ github.event.repository.name }} + tar -zcvf ${{ matrix.file_name }}${{ matrix.file_ext }} ${{ github.event.repository.name }} + echo $(shasum -a 256 ${{ matrix.file_name }}${{ matrix.file_ext }} | cut -f1 -d' ') > ${{ matrix.file_name }}${{ matrix.file_ext }}.sha256sum + + - name: Upload artifacts + uses: actions/upload-artifact@v3 + with: + name: ${{ matrix.file_name }}${{ matrix.file_ext }} + path: build/${{ matrix.file_name }}${{ matrix.file_ext }} + + - name: Upload checksum of artifacts + uses: actions/upload-artifact@v3 + with: + name: ${{ matrix.file_name }}${{ matrix.file_ext }}.sha256sum + path: build/${{ matrix.file_name }}${{ matrix.file_ext }}.sha256sum + + release: + name: Release artifacts + needs: [ build ] + runs-on: ubuntu-latest + steps: + - name: Download artifacts + uses: actions/download-artifact@v3 + + - name: Publish release + uses: softprops/action-gh-release@v1 + with: + name: "Release ${{ github.ref_name }}" + generate_release_notes: true + files: | + **/${{ github.event.repository.name }}-* diff --git a/codis/pkg/topom/topom_stats.go b/codis/pkg/topom/topom_stats.go index 0d6acb726a..9186e05a13 100644 --- a/codis/pkg/topom/topom_stats.go +++ 
b/codis/pkg/topom/topom_stats.go @@ -66,7 +66,7 @@ func (s *Topom) RefreshRedisStats(timeout time.Duration) (*sync2.Future, error) for _, g := range ctx.group { for _, x := range g.Servers { goStats(x.Addr, func(addr string) (*RedisStats, error) { - m, err := s.stats.redisp.InfoFull(addr) + m, err := s.stats.redisp.InfoFullv2(addr) if err != nil { return nil, err } diff --git a/codis/pkg/utils/redis/client.go b/codis/pkg/utils/redis/client.go index f0ef98e98e..21ae9e83b6 100644 --- a/codis/pkg/utils/redis/client.go +++ b/codis/pkg/utils/redis/client.go @@ -157,6 +157,24 @@ func (c *Client) Info() (map[string]string, error) { return info, nil } +func (c *Client) InfoReplicationIpPort() (map[string]string, error) { + text, err := redigo.String(c.Do("INFO", "replication")) + if err != nil { + return nil, errors.Trace(err) + } + info := make(map[string]string) + for _, line := range strings.Split(text, "\n") { + kv := strings.SplitN(line, ":", 2) + if len(kv) != 2 { + continue + } + if key := strings.TrimSpace(kv[0]); key != "" { + info[key] = strings.TrimSpace(kv[1]) + } + } + return info, nil +} + func (c *Client) InfoKeySpace() (map[int]string, error) { text, err := redigo.String(c.Do("INFO", "keyspace")) if err != nil { @@ -262,6 +280,32 @@ func (c *Client) InfoFull() (map[string]string, error) { } } +func (c *Client) InfoFullv2() (map[string]string, error) { + if info, err := c.InfoReplicationIpPort(); err != nil { + return nil, errors.Trace(err) + } else { + host := info["master_host"] + port := info["master_port"] + if host != "" || port != "" { + info["master_addr"] = net.JoinHostPort(host, port) + } + r, err := c.Do("CONFIG", "GET", "maxmemory") + if err != nil { + return nil, errors.Trace(err) + } + p, err := redigo.Values(r, nil) + if err != nil || len(p) != 2 { + return nil, errors.Errorf("invalid response = %v", r) + } + v, err := redigo.Int(p[1], nil) + if err != nil { + return nil, errors.Errorf("invalid response = %v", r) + } + info["maxmemory"] = strconv.Itoa(v) + return info, nil + } +} + func (c *Client) SetMaster(master string) error { if master == "" || strings.ToUpper(master) == "NO:ONE" { if _, err := c.Do("SLAVEOF", "NO", "ONE"); err != nil { @@ -509,6 +553,15 @@ func (p *Pool) InfoFull(addr string) (_ map[string]string, err error) { return c.InfoFull() } +func (p *Pool) InfoFullv2(addr string) (_ map[string]string, err error) { + c, err := p.GetClient(addr) + if err != nil { + return nil, err + } + defer p.PutClient(c) + return c.InfoFullv2() +} + type InfoCache struct { mu sync.Mutex diff --git a/conf/pika.conf b/conf/pika.conf index 7f91a580ca..e902d45a1e 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -268,6 +268,7 @@ max-cache-statistic-keys : 0 # a small compact is triggered automatically if the small compaction feature is enabled. # small-compaction-threshold default value is 5000 and the value range is [1, 100000]. small-compaction-threshold : 5000 +small-compaction-duration-threshold : 10000 # The maximum total size of all live memtables of the RocksDB instance that owned by Pika. 
# Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB
diff --git a/include/pika_admin.h b/include/pika_admin.h
index 1bd18b8f32..808e3fad4a 100644
--- a/include/pika_admin.h
+++ b/include/pika_admin.h
@@ -261,7 +261,9 @@ class InfoCmd : public Cmd {
   bool rescan_ = false;  // whether to rescan the keyspace
   bool off_ = false;
   std::set<std::string> keyspace_scan_dbs_;
-
+  time_t db_size_last_time_ = 0;
+  uint64_t db_size_ = 0;
+  uint64_t log_size_ = 0;
   const static std::string kInfoSection;
   const static std::string kAllSection;
   const static std::string kServerSection;
diff --git a/include/pika_conf.h b/include/pika_conf.h
index 40bf3206a9..e65cd9a132 100644
--- a/include/pika_conf.h
+++ b/include/pika_conf.h
@@ -220,6 +220,10 @@ class PikaConf : public pstd::BaseConf {
     std::shared_lock l(rwlock_);
     return small_compaction_threshold_;
   }
+  int small_compaction_duration_threshold() {
+    std::shared_lock l(rwlock_);
+    return small_compaction_duration_threshold_;
+  }
   int max_background_flushes() {
     std::shared_lock l(rwlock_);
     return max_background_flushes_;
@@ -425,6 +429,11 @@ class PikaConf : public pstd::BaseConf {
     TryPushDiffCommands("small-compaction-threshold", std::to_string(value));
     small_compaction_threshold_ = value;
   }
+  void SetSmallCompactionDurationThreshold(const int value) {
+    std::lock_guard l(rwlock_);
+    TryPushDiffCommands("small-compaction-duration-threshold", std::to_string(value));
+    small_compaction_duration_threshold_ = value;
+  }
   void SetMaxClientResponseSize(const int value) {
     std::lock_guard l(rwlock_);
     TryPushDiffCommands("max-client-response-size", std::to_string(value));
@@ -684,6 +693,7 @@ class PikaConf : public pstd::BaseConf {
   int max_cache_statistic_keys_ = 0;
   int small_compaction_threshold_ = 0;
+  int small_compaction_duration_threshold_ = 0;
   int max_background_flushes_ = 0;
   int max_background_compactions_ = 0;
   int max_background_jobs_ = 0;
diff --git a/include/pika_server.h b/include/pika_server.h
index 72a59a3f2c..b54d136def 100644
--- a/include/pika_server.h
+++ b/include/pika_server.h
@@ -195,6 +195,7 @@ class PikaServer : public pstd::noncopyable {
   bool IsDBExist(const std::string& db_name);
   bool IsDBSlotExist(const std::string& db_name, uint32_t slot_id);
   bool IsDBBinlogIoError(const std::string& db_name);
+  std::set<std::string> GetAllDBName();
   pstd::Status DoSameThingSpecificDB(const std::set<std::string>& dbs, const TaskArg& arg);
   std::shared_mutex& GetDBLock() {
     return dbs_rw_;
   }
@@ -218,6 +219,7 @@
   void PrepareSlotTrySync();
   void SlotSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
   void SlotSetSmallCompactionThreshold(uint32_t small_compaction_threshold);
+  void SlotSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold);
   bool GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id, BinlogOffset* boffset);
   std::shared_ptr<Slot> GetSlotByDBName(const std::string& db_name);
   std::shared_ptr<Slot> GetDBSlotById(const std::string& db_name, uint32_t slot_id);
diff --git a/include/pika_set.h b/include/pika_set.h
index 76643ce192..dca5bac17d 100644
--- a/include/pika_set.h
+++ b/include/pika_set.h
@@ -119,7 +119,6 @@ class SScanCmd : public Cmd {
   std::string key_, pattern_ = "*";
   int64_t cursor_ = 0;
   int64_t count_ = 10;
-  rocksdb::Status s_;
   void DoInitial() override;
   void Clear() override {
     pattern_ = "*";
@@ -231,6 +230,7 @@ class SInterstoreCmd : public SetOperationCmd {
 private:
   void DoInitial() override;
+  rocksdb::Status s_;
 };

 class SIsmemberCmd : public Cmd {
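The getter/setter pair added to pika_conf.h and the slot-propagation hook added to pika_server.h are the whole runtime path for the new knob. As a rough illustration of how they chain together (the ApplyDurationThreshold wrapper below is hypothetical and not part of the patch; its body mirrors the CONFIG SET handler added to src/pika_admin.cc further down in this diff):

// Illustrative only: the propagation path of CONFIG SET small-compaction-duration-threshold.
void ApplyDurationThreshold(const std::string& value, std::string* ret) {
  int64_t ival = 0;
  if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival < 0) {
    *ret = "-ERR Invalid argument '" + value + "'\r\n";
    return;
  }
  g_pika_conf->SetSmallCompactionDurationThreshold(static_cast<int>(ival));             // config layer + rewrite queue
  g_pika_server->SlotSetSmallCompactionDurationThreshold(static_cast<uint32_t>(ival));  // pushed to every slot's storage
  *ret = "+OK\r\n";
}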
@@ -339,6 +339,7 @@ class SRandmemberCmd : public Cmd {
   std::string key_;
   int64_t count_ = 1;
   bool reply_arr = false;
+  rocksdb::Status s_;
   void DoInitial() override;
   void Clear() override {
     count_ = 1;
diff --git a/include/pika_zset.h b/include/pika_zset.h
index 755f1b479e..f189ecf434 100644
--- a/include/pika_zset.h
+++ b/include/pika_zset.h
@@ -279,6 +279,7 @@ class ZRemCmd : public Cmd {
   std::string key_;
   std::vector<std::string> members_;
   int32_t deleted_ = 0;
+  rocksdb::Status s_;
   void DoInitial() override;
 };
@@ -481,6 +482,7 @@ class ZRevrangebylexCmd : public ZsetRangebylexParentCmd {
 private:
   void DoInitial() override;
+  rocksdb::Status s_;
 };

 class ZLexcountCmd : public Cmd {
diff --git a/include/rsync_client.h b/include/rsync_client.h
index d1077a8a61..388bd8ff0e 100644
--- a/include/rsync_client.h
+++ b/include/rsync_client.h
@@ -33,6 +33,7 @@ extern std::unique_ptr<PikaConf> g_pika_conf;
 const std::string kDumpMetaFileName = "DUMP_META_DATA";
 const std::string kUuidPrefix = "snapshot-uuid:";
+const size_t kInvalidOffset = 0xFFFFFFFF;

 namespace rsync {
@@ -42,8 +43,7 @@
 class WaitObject;
 class WaitObjectManager;

 using pstd::Status;
-
-
+using ResponseSPtr = std::shared_ptr<RsyncService::RsyncResponse>;
 class RsyncClient : public net::Thread {
 public:
  enum State {
@@ -151,18 +151,18 @@ class WaitObject {
   void Reset(const std::string& filename, RsyncService::Type t, size_t offset) {
     std::lock_guard guard(mu_);
-    resp_ = nullptr;
+    resp_.reset();
     filename_ = filename;
     type_ = t;
     offset_ = offset;
   }

-  pstd::Status Wait(RsyncService::RsyncResponse*& resp) {
+  pstd::Status Wait(ResponseSPtr& resp) {
     pstd::Status s = Status::Timeout("rsync timeout", "timeout");
     {
       std::unique_lock lock(mu_);
       auto cv_s = cond_.wait_for(lock, std::chrono::seconds(1), [this] {
-        return resp_ != nullptr;
+        return resp_.get() != nullptr;
       });
       if (!cv_s) {
         return s;
@@ -175,19 +175,19 @@ class WaitObject {
   void WakeUp(RsyncService::RsyncResponse* resp) {
     std::unique_lock lock(mu_);
-    resp_ = resp;
+    resp_.reset(resp);
+    offset_ = kInvalidOffset;
     cond_.notify_all();
   }

-  RsyncService::RsyncResponse* Response() {return resp_;}
   std::string Filename() {return filename_;}
   RsyncService::Type Type() {return type_;}
   size_t Offset() {return offset_;}
 private:
   std::string filename_;
   RsyncService::Type type_;
-  size_t offset_ = 0xFFFFFFFF;
-  RsyncService::RsyncResponse* resp_ = nullptr;
+  size_t offset_ = kInvalidOffset;
+  ResponseSPtr resp_ = nullptr;
   std::condition_variable cond_;
   std::mutex mu_;
 };
@@ -222,7 +222,8 @@ class WaitObjectManager {
       return;
     }
     if (resp->type() == RsyncService::kRsyncFile &&
-        (resp->file_resp().filename() != wo_vec_[index]->Filename())) {
+        ((resp->file_resp().filename() != wo_vec_[index]->Filename()) ||
+         (resp->file_resp().offset() != wo_vec_[index]->Offset()))) {
       delete resp;
       return;
     }
diff --git a/src/pika_admin.cc b/src/pika_admin.cc
index 5977f05965..859fce548b 100644
--- a/src/pika_admin.cc
+++ b/src/pika_admin.cc
@@ -301,6 +301,8 @@ void BgsaveCmd::DoInitial() {
         bgsave_dbs_.insert(db);
       }
     }
+  } else {
+    bgsave_dbs_ = g_pika_server->GetAllDBName();
   }
 }
@@ -323,8 +325,10 @@ void CompactCmd::DoInitial() {
   if (argv_.size() == 1) {
     struct_type_ = "all";
+    compact_dbs_ = g_pika_server->GetAllDBName();
   } else if (argv_.size() == 2) {
     struct_type_ = argv_[1];
+    compact_dbs_ = g_pika_server->GetAllDBName();
   } else if (argv_.size() == 3) {
     std::vector<std::string> dbs;
     pstd::StringSplit(argv_[1], COMMA, dbs);
@@ -866,6 +870,8 @@ void InfoCmd::DoInitial() {
         keyspace_scan_dbs_.insert(db);
       }
     }
+  } else {
+    keyspace_scan_dbs_ = g_pika_server->GetAllDBName();
   }
   LogCommand();
return; @@ -1097,61 +1103,6 @@ void InfoCmd::InfoCPU(std::string& info) { info.append(tmp_stream.str()); } -void InfoCmd::InfoShardingReplication(std::string& info) { - int role = 0; - std::string slave_list_string; - uint32_t slave_num = g_pika_server->GetShardingSlaveListString(slave_list_string); - if (slave_num != 0U) { - role |= PIKA_ROLE_MASTER; - } - std::string common_master; - std::string master_ip; - int master_port = 0; - g_pika_rm->FindCommonMaster(&common_master); - if (!common_master.empty()) { - role |= PIKA_ROLE_SLAVE; - if (!pstd::ParseIpPortString(common_master, master_ip, master_port)) { - return; - } - } - - std::stringstream tmp_stream; - tmp_stream << "# Replication("; - switch (role) { - case PIKA_ROLE_SINGLE: - case PIKA_ROLE_MASTER: - tmp_stream << "MASTER)\r\nrole:master\r\n"; - break; - case PIKA_ROLE_SLAVE: - tmp_stream << "SLAVE)\r\nrole:slave\r\n"; - break; - case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE: - tmp_stream << "Master && SLAVE)\r\nrole:master&&slave\r\n"; - break; - default: - info.append("ERR: server role is error\r\n"); - return; - } - switch (role) { - case PIKA_ROLE_SLAVE: - tmp_stream << "master_host:" << master_ip << "\r\n"; - tmp_stream << "master_port:" << master_port << "\r\n"; - tmp_stream << "master_link_status:up" - << "\r\n"; - tmp_stream << "slave_priority:" << g_pika_conf->slave_priority() << "\r\n"; - break; - case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE: - tmp_stream << "master_host:" << master_ip << "\r\n"; - tmp_stream << "master_port:" << master_port << "\r\n"; - tmp_stream << "master_link_status:up" - << "\r\n"; - case PIKA_ROLE_SINGLE: - case PIKA_ROLE_MASTER: - tmp_stream << "connected_slaves:" << slave_num << "\r\n" << slave_list_string; - } - info.append(tmp_stream.str()); -} - void InfoCmd::InfoReplication(std::string& info) { int host_role = g_pika_server->role(); std::stringstream tmp_stream; @@ -1289,7 +1240,7 @@ void InfoCmd::InfoKeyspace(std::string& info) { std::shared_lock rwl(g_pika_server->dbs_rw_); for (const auto& db_item : g_pika_server->dbs_) { - if (keyspace_scan_dbs_.empty() || keyspace_scan_dbs_.find(db_item.first) != keyspace_scan_dbs_.end()) { + if (keyspace_scan_dbs_.find(db_item.first) != keyspace_scan_dbs_.end()) { db_name = db_item.second->GetDBName(); key_scan_info = db_item.second->GetKeyScanInfo(); key_infos = key_scan_info.key_infos; @@ -1332,13 +1283,24 @@ void InfoCmd::InfoKeyspace(std::string& info) { void InfoCmd::InfoData(std::string& info) { std::stringstream tmp_stream; std::stringstream db_fatal_msg_stream; - - uint64_t db_size = pstd::Du(g_pika_conf->db_path()); + uint64_t db_size = 0; + time_t current_time_s = time(nullptr); + uint64_t log_size = 0; + + if (current_time_s - 60 >= db_size_last_time_) { + db_size_last_time_ = current_time_s; + db_size = pstd::Du(g_pika_conf->db_path()); + db_size_ = db_size; + log_size = pstd::Du(g_pika_conf->log_path()); + log_size_ = log_size; + } else { + db_size = db_size_; + log_size = log_size_; + } tmp_stream << "# Data" << "\r\n"; tmp_stream << "db_size:" << db_size << "\r\n"; tmp_stream << "db_size_human:" << (db_size >> 20) << "M\r\n"; - uint64_t log_size = pstd::Du(g_pika_conf->log_path()); tmp_stream << "log_size:" << log_size << "\r\n"; tmp_stream << "log_size_human:" << (log_size >> 20) << "M\r\n"; tmp_stream << "compression:" << g_pika_conf->compression() << "\r\n"; @@ -1347,8 +1309,8 @@ void InfoCmd::InfoData(std::string& info) { std::map background_errors; uint64_t total_background_errors = 0; uint64_t total_memtable_usage = 0; - uint64_t 
memtable_usage = 0;
   uint64_t total_table_reader_usage = 0;
+  uint64_t memtable_usage = 0;
   uint64_t table_reader_usage = 0;
   std::shared_lock db_rwl(g_pika_server->dbs_rw_);
   for (const auto& db_item : g_pika_server->dbs_) {
@@ -1733,6 +1695,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
     EncodeNumber(&config_body, g_pika_conf->small_compaction_threshold());
   }

+  if (pstd::stringmatch(pattern.data(), "small-compaction-duration-threshold", 1) != 0) {
+    elements += 2;
+    EncodeString(&config_body, "small-compaction-duration-threshold");
+    EncodeNumber(&config_body, g_pika_conf->small_compaction_duration_threshold());
+  }
+
   if (pstd::stringmatch(pattern.data(), "max-background-flushes", 1) != 0) {
     elements += 2;
     EncodeString(&config_body, "max-background-flushes");
@@ -2090,7 +2058,7 @@ void ConfigCmd::ConfigGet(std::string& ret) {
 void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr<Slot> slot) {
   std::string set_item = config_args_v_[1];
   if (set_item == "*") {
-    ret = "*28\r\n";
+    ret = "*29\r\n";
     EncodeString(&ret, "timeout");
     EncodeString(&ret, "requirepass");
     EncodeString(&ret, "masterauth");
@@ -2109,6 +2077,7 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr<Slot> slot) {
     EncodeString(&ret, "write-binlog");
     EncodeString(&ret, "max-cache-statistic-keys");
     EncodeString(&ret, "small-compaction-threshold");
+    EncodeString(&ret, "small-compaction-duration-threshold");
     EncodeString(&ret, "max-client-response-size");
     EncodeString(&ret, "db-sync-speed");
     EncodeString(&ret, "compact-cron");
@@ -2241,6 +2210,14 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr<Slot> slot) {
     g_pika_conf->SetSmallCompactionThreshold(static_cast<int>(ival));
     g_pika_server->SlotSetSmallCompactionThreshold(static_cast<uint32_t>(ival));
     ret = "+OK\r\n";
+  } else if (set_item == "small-compaction-duration-threshold") {
+    if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
+      ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'small-compaction-duration-threshold'\r\n";
+      return;
+    }
+    g_pika_conf->SetSmallCompactionDurationThreshold(static_cast<int>(ival));
+    g_pika_server->SlotSetSmallCompactionDurationThreshold(static_cast<uint32_t>(ival));
+    ret = "+OK\r\n";
   } else if (set_item == "max-client-response-size") {
     if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
       ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-client-response-size'\r\n";
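The "*28" to "*29" change above is easy to miss but load-bearing: the reply to CONFIG SET with a bare "*" is a RESP array, and the *N\r\n header must announce exactly as many bulk strings as follow it. A minimal sketch of that framing rule (EncodeRespArray is an illustrative stand-in, not Pika's EncodeString/EncodeNumber helpers):

#include <string>
#include <vector>

// Illustrative RESP array framing: the header count must match the element count.
std::string EncodeRespArray(const std::vector<std::string>& items) {
  std::string out = "*" + std::to_string(items.size()) + "\r\n";   // e.g. "*29\r\n"
  for (const auto& s : items) {
    out += "$" + std::to_string(s.size()) + "\r\n" + s + "\r\n";   // one bulk string per item
  }
  return out;
}
// Appending "small-compaction-duration-threshold" without bumping the header from 28
// to 29 would leave RESP clients mis-parsing the reply (reading one element short).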
SetConfInt("small-compaction-threshold", small_compaction_threshold_); + SetConfInt("small-compaction-duration-threshold", small_compaction_duration_threshold_); SetConfInt("max-client-response-size", static_cast(max_client_response_size_)); SetConfInt("db-sync-speed", db_sync_speed_); SetConfStr("compact-cron", compact_cron_); diff --git a/src/pika_server.cc b/src/pika_server.cc index 7b9cdf4f9b..92e502cd3f 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -430,6 +430,15 @@ bool PikaServer::IsDBBinlogIoError(const std::string& db_name) { return db ? db->IsBinlogIoError() : true; } +std::set PikaServer::GetAllDBName() { + std::set dbs; + std::shared_lock l(dbs_rw_); + for (const auto& db_item : dbs_) { + dbs.insert(db_item.first); + } + return dbs; +} + Status PikaServer::DoSameThingSpecificDB(const std::set& dbs, const TaskArg& arg) { std::shared_lock rwl(dbs_rw_); for (const auto& db_item : dbs_) { @@ -527,6 +536,17 @@ void PikaServer::SlotSetSmallCompactionThreshold(uint32_t small_compaction_thres } } +void PikaServer::SlotSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) { + std::shared_lock rwl(dbs_rw_); + for (const auto& db_item : dbs_) { + for (const auto& slot_item : db_item.second->slots_) { + slot_item.second->DbRWLockReader(); + slot_item.second->db()->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold); + slot_item.second->DbRWUnLock(); + } + } +} + bool PikaServer::GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id, BinlogOffset* const boffset) { std::shared_ptr slot = @@ -1346,13 +1366,7 @@ void PikaServer::AutoCompactRange() { if (last_check_compact_time_.tv_sec == 0 || now.tv_sec - last_check_compact_time_.tv_sec >= interval * 3600) { gettimeofday(&last_check_compact_time_, nullptr); if ((static_cast(free_size) / static_cast(total_size)) * 100 >= usage) { - std::set dbs; - { - std::shared_lock l(dbs_rw_); - for (const auto& db_item : dbs_) { - dbs.insert(db_item.first); - } - } + std::set dbs = g_pika_server->GetAllDBName(); Status s = DoSameThingSpecificDB(dbs, {TaskType::kCompactAll}); if (s.ok()) { LOG(INFO) << "[Interval]schedule compactRange, freesize: " << free_size / 1048576 diff --git a/src/pika_set.cc b/src/pika_set.cc index fb4e620035..6e0174ab5d 100644 --- a/src/pika_set.cc +++ b/src/pika_set.cc @@ -257,7 +257,11 @@ void SRemCmd::DoInitial() { void SRemCmd::Do(std::shared_ptr slot) { s_ = slot->db()->SRem(key_, members_, &deleted_); - res_.AppendInteger(deleted_); + if (s_.ok() || s_.IsNotFound()) { + res_.AppendInteger(deleted_); + } else { + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } } void SRemCmd::DoThroughDB(std::shared_ptr slot) { @@ -282,11 +286,15 @@ void SUnionCmd::DoInitial() { void SUnionCmd::Do(std::shared_ptr slot) { std::vector members; - slot->db()->SUnion(keys_, &members); - res_.AppendArrayLenUint64(members.size()); - for (const auto& member : members) { - res_.AppendStringLenUint64(member.size()); - res_.AppendContent(member); + s_ = slot->db()->SUnion(keys_, &members); + if (s_.ok() || s_.IsNotFound()) { + res_.AppendArrayLenUint64(members.size()); + for (const auto& member : members) { + res_.AppendStringLenUint64(member.size()); + res_.AppendContent(member); + } + } else { + res_.SetRes(CmdRes::kErrOther, s_.ToString()); } } @@ -374,11 +382,15 @@ void SInterCmd::DoInitial() { void SInterCmd::Do(std::shared_ptr slot) { std::vector members; - slot->db()->SInter(keys_, &members); - res_.AppendArrayLenUint64(members.size()); - for (const auto& member : 
diff --git a/src/pika_set.cc b/src/pika_set.cc
index fb4e620035..6e0174ab5d 100644
--- a/src/pika_set.cc
+++ b/src/pika_set.cc
@@ -257,7 +257,11 @@ void SRemCmd::DoInitial() {
 void SRemCmd::Do(std::shared_ptr<Slot> slot) {
   s_ = slot->db()->SRem(key_, members_, &deleted_);
-  res_.AppendInteger(deleted_);
+  if (s_.ok() || s_.IsNotFound()) {
+    res_.AppendInteger(deleted_);
+  } else {
+    res_.SetRes(CmdRes::kErrOther, s_.ToString());
+  }
 }

 void SRemCmd::DoThroughDB(std::shared_ptr<Slot> slot) {
@@ -282,11 +286,15 @@ void SUnionCmd::DoInitial() {
 void SUnionCmd::Do(std::shared_ptr<Slot> slot) {
   std::vector<std::string> members;
-  slot->db()->SUnion(keys_, &members);
-  res_.AppendArrayLenUint64(members.size());
-  for (const auto& member : members) {
-    res_.AppendStringLenUint64(member.size());
-    res_.AppendContent(member);
+  s_ = slot->db()->SUnion(keys_, &members);
+  if (s_.ok() || s_.IsNotFound()) {
+    res_.AppendArrayLenUint64(members.size());
+    for (const auto& member : members) {
+      res_.AppendStringLenUint64(member.size());
+      res_.AppendContent(member);
+    }
+  } else {
+    res_.SetRes(CmdRes::kErrOther, s_.ToString());
   }
 }
@@ -374,11 +382,15 @@ void SInterCmd::DoInitial() {
 void SInterCmd::Do(std::shared_ptr<Slot> slot) {
   std::vector<std::string> members;
-  slot->db()->SInter(keys_, &members);
-  res_.AppendArrayLenUint64(members.size());
-  for (const auto& member : members) {
-    res_.AppendStringLenUint64(member.size());
-    res_.AppendContent(member);
+  s_ = slot->db()->SInter(keys_, &members);
+  if (s_.ok() || s_.IsNotFound()) {
+    res_.AppendArrayLenUint64(members.size());
+    for (const auto& member : members) {
+      res_.AppendStringLenUint64(member.size());
+      res_.AppendContent(member);
+    }
+  } else {
+    res_.SetRes(CmdRes::kErrOther, s_.ToString());
   }
 }
@@ -395,11 +407,11 @@ void SInterstoreCmd::DoInitial() {
 void SInterstoreCmd::Do(std::shared_ptr<Slot> slot) {
   int32_t count = 0;
-  rocksdb::Status s = slot->db()->SInterstore(dest_key_, keys_, value_to_dest_, &count);
-  if (s.ok()) {
+  s_ = slot->db()->SInterstore(dest_key_, keys_, value_to_dest_, &count);
+  if (s_.ok()) {
     res_.AppendInteger(count);
   } else {
-    res_.SetRes(CmdRes::kErrOther, s.ToString());
+    res_.SetRes(CmdRes::kErrOther, s_.ToString());
   }
 }
@@ -469,11 +481,15 @@ void SDiffCmd::DoInitial() {
 void SDiffCmd::Do(std::shared_ptr<Slot> slot) {
   std::vector<std::string> members;
-  slot->db()->SDiff(keys_, &members);
-  res_.AppendArrayLenUint64(members.size());
-  for (const auto& member : members) {
-    res_.AppendStringLenUint64(member.size());
-    res_.AppendContent(member);
+  s_ = slot->db()->SDiff(keys_, &members);
+  if (s_.ok() || s_.IsNotFound()) {
+    res_.AppendArrayLenUint64(members.size());
+    for (const auto& member : members) {
+      res_.AppendStringLenUint64(member.size());
+      res_.AppendContent(member);
+    }
+  } else {
+    res_.SetRes(CmdRes::kErrOther, s_.ToString());
   }
 }
@@ -522,12 +538,12 @@ void SMoveCmd::DoInitial() {
 void SMoveCmd::Do(std::shared_ptr<Slot> slot) {
   int32_t res = 0;
-  rocksdb::Status s = slot->db()->SMove(src_key_, dest_key_, member_, &res);
-  if (s.ok() || s.IsNotFound()) {
+  s_ = slot->db()->SMove(src_key_, dest_key_, member_, &res);
+  if (s_.ok() || s_.IsNotFound()) {
     res_.AppendInteger(res);
     move_success_ = res;
   } else {
-    res_.SetRes(CmdRes::kErrOther, s.ToString());
+    res_.SetRes(CmdRes::kErrOther, s_.ToString());
   }
 }
@@ -594,8 +610,8 @@ void SRandmemberCmd::DoInitial() {
 void SRandmemberCmd::Do(std::shared_ptr<Slot> slot) {
   std::vector<std::string> members;
-  rocksdb::Status s = slot->db()->SRandmember(key_, static_cast<int32_t>(count_), &members);
-  if (s.ok() || s.IsNotFound()) {
+  s_ = slot->db()->SRandmember(key_, static_cast<int32_t>(count_), &members);
+  if (s_.ok() || s_.IsNotFound()) {
     if (!reply_arr && (static_cast<uint32_t>(!members.empty()) != 0U)) {
       res_.AppendStringLenUint64(members[0].size());
       res_.AppendContent(members[0]);
@@ -607,7 +623,7 @@ void SRandmemberCmd::Do(std::shared_ptr<Slot> slot) {
       }
     }
   } else {
-    res_.SetRes(CmdRes::kErrOther, s.ToString());
+    res_.SetRes(CmdRes::kErrOther, s_.ToString());
   }
 }
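All of the rewritten Do() bodies in pika_set.cc follow one template, so it is worth stating once: the status goes into the member s_ (so DoThroughDB and any cache path can inspect it afterwards), and IsNotFound is treated as a legitimate empty result rather than an error. Schematically, with a made-up SomeSetCmd/SomeRead pair standing in for the real commands:

// Common shape of the rewritten Do() methods (schematic, not compilable on its own).
void SomeSetCmd::Do(std::shared_ptr<Slot> slot) {
  std::vector<std::string> members;
  s_ = slot->db()->SomeRead(keys_, &members);   // status kept in a member, not a local
  if (s_.ok() || s_.IsNotFound()) {             // NotFound == empty set, not an error
    res_.AppendArrayLenUint64(members.size());
    for (const auto& member : members) {
      res_.AppendStringLenUint64(member.size());
      res_.AppendContent(member);
    }
  } else {                                      // real engine errors still surface
    res_.SetRes(CmdRes::kErrOther, s_.ToString());
  }
}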
diff --git a/src/pika_zset.cc b/src/pika_zset.cc
index fb7132b961..15606c01b6 100644
--- a/src/pika_zset.cc
+++ b/src/pika_zset.cc
@@ -683,12 +683,11 @@ void ZRemCmd::DoInitial() {
 void ZRemCmd::Do(std::shared_ptr<Slot> slot) {
   int32_t count = 0;
-  rocksdb::Status s = slot->db()->ZRem(key_, members_, &count);
-  if (s.ok() || s.IsNotFound()) {
-    AddSlotKey("z", key_, slot);
+  s_ = slot->db()->ZRem(key_, members_, &count);
+  if (s_.ok() || s_.IsNotFound()) {
     res_.AppendInteger(count);
   } else {
-    res_.SetRes(CmdRes::kErrOther, s.ToString());
+    res_.SetRes(CmdRes::kErrOther, s_.ToString());
   }
 }
@@ -1201,9 +1200,9 @@ void ZRevrangebylexCmd::Do(std::shared_ptr<Slot> slot) {
     return;
   }
   std::vector<std::string> members;
-  rocksdb::Status s = slot->db()->ZRangebylex(key_, min_member_, max_member_, left_close_, right_close_, &members);
-  if (!s.ok() && !s.IsNotFound()) {
-    res_.SetRes(CmdRes::kErrOther, s.ToString());
+  s_ = slot->db()->ZRangebylex(key_, min_member_, max_member_, left_close_, right_close_, &members);
+  if (!s_.ok() && !s_.IsNotFound()) {
+    res_.SetRes(CmdRes::kErrOther, s_.ToString());
     return;
   }
   FitLimit(count_, offset_, static_cast<int32_t>(members.size()));
diff --git a/src/rsync_client.cc b/src/rsync_client.cc
index f0254628a4..12da1ddad5 100644
--- a/src/rsync_client.cc
+++ b/src/rsync_client.cc
@@ -139,7 +139,8 @@ void* RsyncClient::ThreadMain() {
 }

 Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
-  std::unique_ptr<RsyncWriter> writer(new RsyncWriter(dir_ + "/" + filename));
+  const std::string filepath = dir_ + "/" + filename;
+  std::unique_ptr<RsyncWriter> writer(new RsyncWriter(filepath));
   Status s = Status::OK();
   size_t offset = 0;
   int retries = 0;
@@ -150,7 +151,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
       writer.reset();
     }
     if (!s.ok()) {
-      DeleteFile(filename);
+      DeleteFile(filepath);
     }
   };
@@ -183,12 +184,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
       continue;
     }

-    RsyncResponse* resp = nullptr;
-    DEFER {
-      if (resp) {
-        delete resp;
-      }
-    };
+    std::shared_ptr<RsyncResponse> resp = nullptr;
     s = wo->Wait(resp);
     if (s.IsTimeout() || resp == nullptr) {
       LOG(WARNING) << "rsync request timeout";
@@ -335,19 +331,14 @@ Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::set<std::string>* file_set) {
-    WaitObject* wo = wo_mgr_->UpdateWaitObject(0, "", kRsyncMeta, 0xFFFFFFFF);
+    WaitObject* wo = wo_mgr_->UpdateWaitObject(0, "", kRsyncMeta, kInvalidOffset);
     s = client_thread_->Write(master_ip_, master_port_, to_send);
     if (!s.ok()) {
       retries++;
     }
-    RsyncResponse* resp = nullptr;
-    DEFER {
-      if (resp) {
-        delete resp;
-      }
-    };
+    std::shared_ptr<RsyncResponse> resp;
     s = wo->Wait(resp);
-    if (s.IsTimeout() || resp == nullptr) {
+    if (s.IsTimeout() || resp.get() == nullptr) {
       LOG(WARNING) << "rsync CopyRemoteMeta request timeout, "
                    << "retry times: " << retries;
       retries++;
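The DEFER blocks deleted above existed only to free the raw RsyncResponse. With the shared_ptr-based WaitObject, WakeUp() adopts the heap object via resp_.reset(resp) and the waiter shares ownership, so the response is freed when the last holder drops it, including on the timeout path. A stripped-down sketch of that handoff (Waiter is a simplified stand-in for WaitObject, with int standing in for the protobuf response):

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>

struct Waiter {
  std::mutex mu;
  std::condition_variable cond;
  std::shared_ptr<int> resp;  // stand-in for ResponseSPtr = shared_ptr<RsyncService::RsyncResponse>

  void WakeUp(int* raw) {
    std::unique_lock<std::mutex> lock(mu);
    resp.reset(raw);          // adopt the heap object; no DEFER/delete needed anywhere else
    cond.notify_all();
  }

  bool Wait(std::shared_ptr<int>& out) {
    std::unique_lock<std::mutex> lock(mu);
    if (!cond.wait_for(lock, std::chrono::seconds(1), [this] { return resp != nullptr; })) {
      return false;           // timeout: a late WakeUp still parks the response in resp, so it is not leaked
    }
    out = resp;               // shared ownership; freed when the last holder goes away
    return true;
  }
};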
": create thread error " : ": other error"); } + LOG(INFO) << "RsyncServer started ..."; return res; } diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index b756c5635f..fce6d546c5 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -66,6 +66,7 @@ struct StorageOptions { bool share_block_cache = false; size_t statistics_max_size = 0; size_t small_compaction_threshold = 5000; + size_t small_compaction_duration_threshold = 10000; Status ResetOptions(const OptionType& option_type, const std::unordered_map& options_map); }; @@ -1034,6 +1035,7 @@ class Storage { Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold); + Status SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold); std::string GetCurrentTaskType(); Status GetUsage(const std::string& property, uint64_t* result); diff --git a/src/storage/src/base_value_format.h b/src/storage/src/base_value_format.h index c819f786d8..35b200c2ea 100644 --- a/src/storage/src/base_value_format.h +++ b/src/storage/src/base_value_format.h @@ -34,7 +34,6 @@ class InternalValue { } return Status::OK(); } - void set_version(int32_t version = 0) { version_ = version; } static const size_t kDefaultValueSuffixLength = sizeof(int32_t) * 2; virtual rocksdb::Slice Encode() { size_t usize = user_value_.size(); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index ca56c55091..b6b848c6d4 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -12,8 +12,9 @@ Redis::Redis(Storage* const s, const DataType& type) : storage_(s), type_(type), lock_mgr_(std::make_shared(1000, 0, std::make_shared())), - small_compaction_threshold_(5000) { - statistics_store_ = std::make_unique>(); + small_compaction_threshold_(5000), + small_compaction_duration_threshold_(10000) { + statistics_store_ = std::make_unique>(); scan_cursors_store_ = std::make_unique>(); scan_cursors_store_->SetCapacity(5000); default_compact_range_options_.exclusive_manual_compaction = false; @@ -46,23 +47,40 @@ Status Redis::SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys) { return Status::OK(); } -Status Redis::SetSmallCompactionThreshold(size_t small_compaction_threshold) { +Status Redis::SetSmallCompactionThreshold(uint64_t small_compaction_threshold) { small_compaction_threshold_ = small_compaction_threshold; return Status::OK(); } -Status Redis::UpdateSpecificKeyStatistics(const std::string& key, size_t count) { - if ((statistics_store_->Capacity() != 0U) && (count != 0U)) { - size_t total = 0; - statistics_store_->Lookup(key, &total); - statistics_store_->Insert(key, total + count); - AddCompactKeyTaskIfNeeded(key, total + count); +Status Redis::SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold) { + small_compaction_duration_threshold_ = small_compaction_duration_threshold; + return Status::OK(); +} + +Status Redis::UpdateSpecificKeyStatistics(const std::string& key, uint64_t count) { + if ((statistics_store_->Capacity() != 0U) && (count != 0U) && (small_compaction_threshold_ != 0U)) { + KeyStatistics data; + statistics_store_->Lookup(key, &data); + data.AddModifyCount(count); + statistics_store_->Insert(key, data); + AddCompactKeyTaskIfNeeded(key, data.ModifyCount(), data.AvgDuration()); + } + return Status::OK(); +} + +Status Redis::UpdateSpecificKeyDuration(const std::string& key, uint64_t duration) { + if 
diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h
index ac1b560f51..f1615e7b8f 100644
--- a/src/storage/src/redis.h
+++ b/src/storage/src/redis.h
@@ -14,6 +14,8 @@
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"

+#include "pstd/include/env.h"
+
 #include "src/lock_mgr.h"
 #include "src/lru_cache.h"
 #include "src/mutex_impl.h"
@@ -30,6 +32,61 @@ class Redis {
   rocksdb::DB* GetDB() { return db_; }

+  struct KeyStatistics {
+    size_t window_size;
+    std::deque<uint64_t> durations;
+
+    uint64_t modify_count;
+
+    KeyStatistics() : KeyStatistics(10) {}
+
+    KeyStatistics(size_t size) : window_size(size + 2), modify_count(0) {}
+
+    void AddDuration(uint64_t duration) {
+      durations.push_back(duration);
+      while (durations.size() > window_size) {
+        durations.pop_front();
+      }
+    }
+    uint64_t AvgDuration() {
+      if (durations.size() < window_size) {
+        return 0;
+      }
+      uint64_t min = durations[0];
+      uint64_t max = durations[0];
+      uint64_t sum = 0;
+      for (auto duration : durations) {
+        if (duration < min) {
+          min = duration;
+        }
+        if (duration > max) {
+          max = duration;
+        }
+        sum += duration;
+      }
+      return (sum - max - min) / (durations.size() - 2);
+    }
+    void AddModifyCount(uint64_t count) {
+      modify_count += count;
+    }
+    uint64_t ModifyCount() {
+      return modify_count;
+    }
+  };
+
+  struct KeyStatisticsDurationGuard {
+    Redis* ctx;
+    std::string key;
+    uint64_t start_us;
+    KeyStatisticsDurationGuard(Redis* that, const std::string& key): ctx(that), key(key), start_us(pstd::NowMicros()) {
+    }
+    ~KeyStatisticsDurationGuard() {
+      uint64_t end_us = pstd::NowMicros();
+      uint64_t duration = end_us > start_us ? end_us - start_us : 0;
+      ctx->UpdateSpecificKeyDuration(key, duration);
+    }
+  };
+
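Two details of KeyStatistics above are worth spelling out. The constructor sizes the window as size + 2 so that AvgDuration, after discarding the single slowest and single fastest sample, still averages size of them; and it returns 0 until the window is full, so a cold key cannot trigger a compaction off one slow scan. A worked example under those rules, plus the RAII usage pattern the data-type files below adopt:

// Worked example of AvgDuration's trimmed mean (default window: 10 + 2 = 12 samples):
//   durations = {30, 25, 40, 35, 500, 30, 25, 35, 30, 1, 40, 29}   // microseconds
//   min = 1, max = 500, sum = 820
//   AvgDuration = (820 - 500 - 1) / (12 - 2) = 319 / 10 = 31
// The one slow outlier (500 us) and the one lucky hit (1 us) are both discarded, so the
// average tracks steady-state scan cost; with fewer than 12 samples it reports 0.
//
// Usage pattern added throughout redis_hashes.cc / redis_sets.cc / redis_zsets.cc below:
//   KeyStatisticsDurationGuard guard(this, key.ToString());   // starts the clock
//   auto iter = db_->NewIterator(read_options, handles_[1]);  // ... iterate ...
//   // guard's destructor reports the elapsed microseconds via UpdateSpecificKeyDuration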
   Status SetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options);
   void SetWriteWalOptions(const bool is_wal_disable);
@@ -54,7 +111,8 @@ class Redis {
   virtual Status TTL(const Slice& key, int64_t* timestamp) = 0;

   Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
-  Status SetSmallCompactionThreshold(size_t small_compaction_threshold);
+  Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold);
+  Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
   void GetRocksDBInfo(std::string &info, const char *prefix);

 protected:
@@ -75,11 +133,13 @@ class Redis {
   Status StoreScanNextPoint(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_point);

   // For Statistics
-  std::atomic<size_t> small_compaction_threshold_;
-  std::unique_ptr<LRUCache<std::string, size_t>> statistics_store_;
+  std::atomic_uint64_t small_compaction_threshold_;
+  std::atomic_uint64_t small_compaction_duration_threshold_;
+  std::unique_ptr<LRUCache<std::string, KeyStatistics>> statistics_store_;

-  Status UpdateSpecificKeyStatistics(const std::string& key, size_t count);
-  Status AddCompactKeyTaskIfNeeded(const std::string& key, size_t total);
+  Status UpdateSpecificKeyStatistics(const std::string& key, uint64_t count);
+  Status UpdateSpecificKeyDuration(const std::string& key, uint64_t duration);
+  Status AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration);
 };

 }  // namespace storage
diff --git a/src/storage/src/redis_hashes.cc b/src/storage/src/redis_hashes.cc
index 6d28f76ec7..4d1c9bf6b7 100644
--- a/src/storage/src/redis_hashes.cc
+++ b/src/storage/src/redis_hashes.cc
@@ -22,6 +22,7 @@ RedisHashes::RedisHashes(Storage* const s, const DataType& type) : Redis(s, type
 Status RedisHashes::Open(const StorageOptions& storage_options, const std::string& db_path) {
   statistics_store_->SetCapacity(storage_options.statistics_max_size);
   small_compaction_threshold_ = storage_options.small_compaction_threshold;
+  small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold;
   rocksdb::Options ops(storage_options.options);
   Status s = rocksdb::DB::Open(ops, db_path, &db_);
@@ -298,6 +299,7 @@ Status RedisHashes::HGetall(const Slice& key, std::vector<FieldValue>* fvs) {
     version = parsed_hashes_meta_value.version();
     HashesDataKey hashes_data_key(key, version, "");
     Slice prefix = hashes_data_key.Encode();
+    KeyStatisticsDurationGuard guard(this, key.ToString());
     auto iter = db_->NewIterator(read_options, handles_[1]);
     for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
       ParsedHashesDataKey parsed_hashes_data_key(iter->key());
@@ -516,6 +518,7 @@ Status RedisHashes::HKeys(const Slice& key, std::vector<std::string>* fields) {
     version = parsed_hashes_meta_value.version();
     HashesDataKey hashes_data_key(key, version, "");
     Slice prefix = hashes_data_key.Encode();
+    KeyStatisticsDurationGuard guard(this, key.ToString());
     auto iter = db_->NewIterator(read_options, handles_[1]);
     for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
       ParsedHashesDataKey parsed_hashes_data_key(iter->key());
@@ -788,6 +791,7 @@ Status RedisHashes::HVals(const Slice& key, std::vector<std::string>* values) {
     version = parsed_hashes_meta_value.version();
     HashesDataKey hashes_data_key(key, version, "");
     Slice prefix = hashes_data_key.Encode();
+    KeyStatisticsDurationGuard guard(this, key.ToString());
     auto iter = db_->NewIterator(read_options, handles_[1]);
     for
(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { values->push_back(iter->value().ToString()); @@ -850,6 +854,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p HashesDataKey hashes_data_prefix(key, version, sub_field); HashesDataKey hashes_start_data_key(key, version, start_point); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { @@ -900,6 +905,7 @@ Status RedisHashes::HScanx(const Slice& key, const std::string& start_field, con HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, version, start_field); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { @@ -956,6 +962,7 @@ Status RedisHashes::PKHScanRange(const Slice& key, const Slice& field_start, con HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, version, field_start); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(start_no_limit ? prefix : hashes_start_data_key.Encode()); iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Next()) { @@ -1016,6 +1023,7 @@ Status RedisHashes::PKHRScanRange(const Slice& key, const Slice& field_start, co HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, start_key_version, start_key_field); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->SeekForPrev(hashes_start_data_key.Encode().ToString()); iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Prev()) { diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index 6360f94620..e2d484b3e4 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -26,6 +26,7 @@ RedisLists::RedisLists(Storage* const s, const DataType& type) : Redis(s, type) Status RedisLists::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); Status s = rocksdb::DB::Open(ops, db_path, &db_); diff --git a/src/storage/src/redis_sets.cc b/src/storage/src/redis_sets.cc index 4d8c7b2eee..f76217eb32 100644 --- a/src/storage/src/redis_sets.cc +++ b/src/storage/src/redis_sets.cc @@ -21,8 +21,6 @@ namespace storage { RedisSets::RedisSets(Storage* const s, const DataType& type) : Redis(s, type) { - spop_counts_store_ = std::make_unique>(); - spop_counts_store_->SetCapacity(1000); } RedisSets::~RedisSets() = default; @@ -30,6 +28,7 @@ RedisSets::~RedisSets() = default; 
rocksdb::Status RedisSets::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); rocksdb::Status s = rocksdb::DB::Open(ops, db_path, &db_); @@ -325,6 +324,7 @@ rocksdb::Status RedisSets::SDiff(const std::vector& keys, std::vect version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -393,6 +393,7 @@ rocksdb::Status RedisSets::SDiffstore(const Slice& destination, const std::vecto version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -493,6 +494,7 @@ rocksdb::Status RedisSets::SInter(const std::vector& keys, std::vec version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -575,6 +577,7 @@ rocksdb::Status RedisSets::SInterstore(const Slice& destination, const std::vect version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -685,6 +688,7 @@ rocksdb::Status RedisSets::SMembers(const Slice& key, std::vector* version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(key, version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -832,7 +836,7 @@ rocksdb::Status RedisSets::SMove(const Slice& source, const Slice& destination, return s; } -rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* members, bool* need_compact, int64_t cnt) { +rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* members, int64_t cnt) { std::default_random_engine engine; std::string meta_value; @@ -890,6 +894,7 @@ rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* memb SetsMemberKey sets_member_key(key, version, Slice()); int64_t del_count = 0; + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(default_read_options_, 
handles_[1]); for (iter->Seek(sets_member_key.Encode()); iter->Valid() && cur_index < size; @@ -911,34 +916,14 @@ rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* memb parsed_sets_meta_value.ModifyCount(static_cast(-cnt)); batch.Put(handles_[0], key, meta_value); delete iter; - } - } } else { return s; } - uint64_t count = 0; - uint64_t duration = pstd::NowMicros() - start_us; - AddAndGetSpopCount(key.ToString(), &count); - if (duration >= SPOP_COMPACT_THRESHOLD_DURATION - || count >= SPOP_COMPACT_THRESHOLD_COUNT) { - *need_compact = true; - ResetSpopCount(key.ToString()); - } return db_->Write(default_write_options_, &batch); } -rocksdb::Status RedisSets::ResetSpopCount(const std::string& key) { return spop_counts_store_->Remove(key); } - -rocksdb::Status RedisSets::AddAndGetSpopCount(const std::string& key, uint64_t* count) { - size_t old_count = 0; - spop_counts_store_->Lookup(key, &old_count); - spop_counts_store_->Insert(key, old_count + 1); - *count = old_count + 1; - return rocksdb::Status::OK(); -} - rocksdb::Status RedisSets::SRandmember(const Slice& key, int32_t count, std::vector* members) { if (count == 0) { return rocksdb::Status::OK(); @@ -988,6 +973,7 @@ rocksdb::Status RedisSets::SRandmember(const Slice& key, int32_t count, std::vec int32_t cur_index = 0; int32_t idx = 0; SetsMemberKey sets_member_key(key, version, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(default_read_options_, handles_[1]); for (iter->Seek(sets_member_key.Encode()); iter->Valid() && cur_index < size; iter->Next(), cur_index++) { if (static_cast(idx) >= targets.size()) { @@ -1087,6 +1073,7 @@ rocksdb::Status RedisSets::SUnion(const std::vector& keys, std::vec for (const auto& key_version : vaild_sets) { SetsMemberKey sets_member_key(key_version.key, key_version.version, Slice()); prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, key_version.key); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -1136,6 +1123,7 @@ rocksdb::Status RedisSets::SUnionstore(const Slice& destination, const std::vect for (const auto& key_version : vaild_sets) { SetsMemberKey sets_member_key(key_version.key, key_version.version, Slice()); prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, key_version.key); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -1220,6 +1208,7 @@ rocksdb::Status RedisSets::SScan(const Slice& key, int64_t cursor, const std::st SetsMemberKey sets_member_prefix(key, version, sub_member); SetsMemberKey sets_member_key(key, version, start_point); std::string prefix = sets_member_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(sets_member_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { diff --git a/src/storage/src/redis_sets.h b/src/storage/src/redis_sets.h index aa303b16a5..2898d0e9e7 100644 --- a/src/storage/src/redis_sets.h +++ b/src/storage/src/redis_sets.h @@ -10,15 +10,10 @@ #include #include -#include "pstd/include/env.h" - #include "src/custom_comparator.h" #include "src/lru_cache.h" #include "src/redis.h" 
-#define SPOP_COMPACT_THRESHOLD_COUNT 500 -#define SPOP_COMPACT_THRESHOLD_DURATION (1000 * 1000) // 1000ms - namespace storage { class RedisSets : public Redis { @@ -46,7 +41,7 @@ class RedisSets : public Redis { Status SMembers(const Slice& key, std::vector* members); Status SMembersWithTTL(const Slice& key, std::vector* members, int64_t* ttl); Status SMove(const Slice& source, const Slice& destination, const Slice& member, int32_t* ret); - Status SPop(const Slice& key, std::vector* members, bool* need_compact, int64_t cnt); + Status SPop(const Slice& key, std::vector* members, int64_t cnt); Status SRandmember(const Slice& key, int32_t count, std::vector* members); Status SRem(const Slice& key, const std::vector& members, int32_t* ret); Status SUnion(const std::vector& keys, std::vector* members); @@ -71,12 +66,6 @@ class RedisSets : public Redis { // Iterate all data void ScanDatabase(); - - private: - // For compact in time after multiple spop - std::unique_ptr> spop_counts_store_; - Status ResetSpopCount(const std::string& key); - Status AddAndGetSpopCount(const std::string& key, uint64_t* count); }; } // namespace storage diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index e516f9d140..4da415901f 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -31,6 +31,7 @@ RedisZSets::RedisZSets(Storage* const s, const DataType& type) : Redis(s, type) Status RedisZSets::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); Status s = rocksdb::DB::Open(ops, db_path, &db_); @@ -229,6 +230,7 @@ Status RedisZSets::ZPopMax(const Slice& key, const int64_t count, std::vector::max(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); int32_t del_cnt = 0; for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && del_cnt < num; iter->Prev()) { @@ -274,6 +276,7 @@ Status RedisZSets::ZPopMin(const Slice& key, const int64_t count, std::vector::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); int32_t del_cnt = 0; for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && del_cnt < num; iter->Next()) { @@ -439,6 +442,7 @@ Status RedisZSets::ZCount(const Slice& key, double min, double max, bool left_cl int32_t stop_index = parsed_zsets_meta_value.count() - 1; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, min, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { bool left_pass = false; @@ -563,6 +567,7 @@ Status RedisZSets::ZRange(const Slice& key, int32_t start, int32_t stop, std::ve int32_t cur_index = 0; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= 
diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc
index e516f9d140..4da415901f 100644
--- a/src/storage/src/redis_zsets.cc
+++ b/src/storage/src/redis_zsets.cc
@@ -31,6 +31,7 @@ RedisZSets::RedisZSets(Storage* const s, const DataType& type) : Redis(s, type)
 Status RedisZSets::Open(const StorageOptions& storage_options, const std::string& db_path) {
   statistics_store_->SetCapacity(storage_options.statistics_max_size);
   small_compaction_threshold_ = storage_options.small_compaction_threshold;
+  small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold;

   rocksdb::Options ops(storage_options.options);
   Status s = rocksdb::DB::Open(ops, db_path, &db_);
@@ -229,6 +230,7 @@ Status RedisZSets::ZPopMax(const Slice& key, const int64_t count, std::vector<ScoreMember>*
   ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits<double>::max(), Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]);
   int32_t del_cnt = 0;
   for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && del_cnt < num; iter->Prev()) {
@@ -274,6 +276,7 @@ Status RedisZSets::ZPopMin(const Slice& key, const int64_t count, std::vector<ScoreMember>*
   ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits<double>::lowest(), Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]);
   int32_t del_cnt = 0;
   for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && del_cnt < num; iter->Next()) {
@@ -439,6 +442,7 @@ Status RedisZSets::ZCount(const Slice& key, double min, double max, bool left_cl
   int32_t stop_index = parsed_zsets_meta_value.count() - 1;
   ScoreMember score_member;
   ZSetsScoreKey zsets_score_key(key, version, min, Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]);
   for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) {
     bool left_pass = false;
@@ -563,6 +567,7 @@ Status RedisZSets::ZRange(const Slice& key, int32_t start, int32_t stop, std::ve
   int32_t cur_index = 0;
   ScoreMember score_member;
   ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits<double>::lowest(), Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]);
   for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) {
     if (cur_index >= start_index) {
@@ -661,6 +666,7 @@ Status RedisZSets::ZRangebyscore(const Slice& key, double min, double max, bool
   int64_t skipped = 0;
   ScoreMember score_member;
   ZSetsScoreKey zsets_score_key(key, version, min, Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]);
   for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && index <= stop_index; iter->Next(), ++index) {
     bool left_pass = false;
@@ -725,6 +731,7 @@ Status RedisZSets::ZRank(const Slice& key, const Slice& member, int32_t* rank) {
   int32_t stop_index = parsed_zsets_meta_value.count() - 1;
   ScoreMember score_member;
   ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits<double>::lowest(), Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]);
   for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && index <= stop_index; iter->Next(), ++index) {
     ParsedZSetsScoreKey parsed_zsets_score_key(iter->key());
@@ -830,6 +837,7 @@ Status RedisZSets::ZRemrangebyrank(const Slice& key, int32_t start, int32_t stop
       return s;
     }
     ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits<double>::lowest(), Slice());
+    KeyStatisticsDurationGuard guard(this, key.ToString());
     rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]);
     for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) {
       if (cur_index >= start_index) {
@@ -878,6 +886,7 @@ Status RedisZSets::ZRemrangebyscore(const Slice& key, double min, double max, bo
     int32_t stop_index = parsed_zsets_meta_value.count() - 1;
     int32_t version = parsed_zsets_meta_value.version();
     ZSetsScoreKey zsets_score_key(key, version, min, Slice());
+    KeyStatisticsDurationGuard guard(this, key.ToString());
     rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]);
     for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) {
       bool left_pass = false;
@@ -953,6 +962,7 @@ Status RedisZSets::ZRevrange(const Slice& key, int32_t start, int32_t stop, std:
   int32_t cur_index = count - 1;
   ScoreMember score_member;
   ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits<double>::max(), Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]);
   for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && cur_index >= start_index;
        iter->Prev(), --cur_index) {
@@ -991,6 +1001,7 @@ Status RedisZSets::ZRevrangebyscore(const Slice& key, double min, double max, bo
   int64_t skipped = 0;
   ScoreMember score_member;
   ZSetsScoreKey zsets_score_key(key, version, std::nextafter(max, std::numeric_limits<double>::max()), Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]);
   for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && left > 0; iter->Prev(), --left) {
     bool left_pass = false;
@@ -1055,6 +1066,7 @@ Status RedisZSets::ZRevrank(const Slice& key, const Slice& member, int32_t* rank
     int32_t left = parsed_zsets_meta_value.count();
     int32_t version = parsed_zsets_meta_value.version();
     ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits<double>::max(), Slice());
+    KeyStatisticsDurationGuard guard(this, key.ToString());
     rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]);
     for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && left >= 0; iter->Prev(), --left, ++rev_index) {
       ParsedZSetsScoreKey parsed_zsets_score_key(iter->key());
@@ -1137,6 +1149,7 @@ Status RedisZSets::ZUnionstore(const Slice& destination, const std::vector
       ZSetsScoreKey zsets_score_key(keys[idx], version, std::numeric_limits<double>::lowest(), Slice());
+      KeyStatisticsDurationGuard guard(this, keys[idx]);
       rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]);
       for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index;
            iter->Next(), ++cur_index) {
@@ -1253,6 +1266,7 @@ Status RedisZSets::ZInterstore(const Slice& destination, const std::vector
     ZSetsScoreKey zsets_score_key(vaild_zsets[0].key, vaild_zsets[0].version, std::numeric_limits<double>::lowest(), Slice());
+    KeyStatisticsDurationGuard guard(this, vaild_zsets[0].key);
     rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]);
     for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) {
       ParsedZSetsScoreKey parsed_zsets_score_key(iter->key());
@@ -1357,6 +1371,7 @@ Status RedisZSets::ZRangebylex(const Slice& key, const Slice& min, const Slice&
   int32_t cur_index = 0;
   int32_t stop_index = parsed_zsets_meta_value.count() - 1;
   ZSetsMemberKey zsets_member_key(key, version, Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
   for (iter->Seek(zsets_member_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) {
     bool left_pass = false;
@@ -1417,6 +1432,7 @@ Status RedisZSets::ZRemrangebylex(const Slice& key, const Slice& min, const Slic
   int32_t cur_index = 0;
   int32_t stop_index = parsed_zsets_meta_value.count() - 1;
   ZSetsMemberKey zsets_member_key(key, version, Slice());
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
   for (iter->Seek(zsets_member_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) {
     bool left_pass = false;
@@ -1641,6 +1657,7 @@ Status RedisZSets::ZScan(const Slice& key, int64_t cursor, const std::string& pa
   ZSetsMemberKey zsets_member_prefix(key, version, sub_member);
   ZSetsMemberKey zsets_member_key(key, version, start_point);
   std::string prefix = zsets_member_prefix.Encode().ToString();
+  KeyStatisticsDurationGuard guard(this, key.ToString());
   rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]);
   for (iter->Seek(zsets_member_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix);
        iter->Next()) {
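KeyStatisticsDurationGuard itself is not shown in this diff; judging from the call sites, it is a C++ RAII helper that times each key's iterator scan and feeds the per-key statistics that drive small compaction once the accumulated duration crosses small-compaction-duration-threshold. A rough sketch of the idea, written in Go (the example language used throughout these notes) with defer standing in for the destructor; all names here are assumptions, not the real implementation:

```go
package storage

import "time"

// statsRecorder stands in for the per-key statistics store; the real
// KeyStatisticsDurationGuard is a C++ type whose fields this diff omits.
type statsRecorder interface {
	AddScanDuration(key string, d time.Duration)
}

// scanGuard mimics the guard: capture the start time now, and report the
// elapsed scan time when the returned function runs (like the destructor).
func scanGuard(rec statsRecorder, key string) func() {
	start := time.Now()
	return func() { rec.AddScanDuration(key, time.Since(start)) }
}

// scanMembers shows the intended usage: the guard fires when the key's
// iterator scan finishes, so chronically slow keys accumulate duration
// statistics and can be scheduled for compaction.
func scanMembers(rec statsRecorder, key string, iterate func()) {
	defer scanGuard(rec, key)()
	iterate()
}
```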
diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc
index a7c25043c7..d16548b9c2 100644
--- a/src/storage/src/storage.cc
+++ b/src/storage/src/storage.cc
@@ -337,11 +337,7 @@ Status Storage::SMove(const Slice& source, const Slice& destination, const Slice
 }

 Status Storage::SPop(const Slice& key, std::vector<std::string>* members, int64_t count) {
-  bool need_compact = false;
-  Status status = sets_db_->SPop(key, members, &need_compact, count);
-  if (need_compact) {
-    AddBGTask({kSets, kCompactRange, {key.ToString(), key.ToString()}});
-  }
+  Status status = sets_db_->SPop(key, members, count);
   return status;
 }

@@ -1661,6 +1657,14 @@ Status Storage::SetSmallCompactionThreshold(uint32_t small_compaction_threshold)
   return Status::OK();
 }

+Status Storage::SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) {
+  std::vector<Redis*> dbs = {sets_db_.get(), zsets_db_.get(), hashes_db_.get(), lists_db_.get()};
+  for (const auto& db : dbs) {
+    db->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold);
+  }
+  return Status::OK();
+}
+
 std::string Storage::GetCurrentTaskType() {
   int type = current_task_type_;
   switch (type) {
diff --git a/tests/helpers/redis_queue.py b/tests/helpers/redis_queue.py
index 3d0eb8fe12..9203c2d0db 100644
--- a/tests/helpers/redis_queue.py
+++ b/tests/helpers/redis_queue.py
@@ -6,13 +6,18 @@
 START_FLAG = True

+
 def enqueue(client: redis.Redis, queue_name: str):
     while START_FLAG:
+        n = client.zcard(queue_name)
+        if n >= 1000:
+            time.sleep(0.1)
+            continue
         now_ms = int(time.time() * 1000)
         pipeline = client.pipeline(transaction=False)
         for i in range(10):
             score = now_ms << 5 | i
-            pipeline.zadd(queue_name, {str(i): score})
+            pipeline.zadd(queue_name, {str(score): score})
         pipeline.execute()
     print("enqueue exit")

@@ -23,6 +28,7 @@ def dequeue(client: redis.Redis, queue_name: str):
         start_time = time.time()
         n = client.zcard(queue_name)
         if n <= 10:
+            time.sleep(0.1)
             continue
         res = client.zremrangebyrank(queue_name, 0, 9)
         latency = time.time() - start_time
@@ -45,37 +51,58 @@ def compact(client: redis.Redis, queue_name: str):
     print("compact exit")

+def auto_compact(client: redis.Redis):
+    client.config_set("max-cache-statistic-keys", 10000)
+    client.config_set("small-compaction-threshold", 10000)
+    client.config_set("small-compaction-duration-threshold", 10000)
+
+
 def main():
-    if len(sys.argv) != 4:
-        print("Usage: python redis_queue.py <host> <port> <passwd>")
+    if len(sys.argv) != 5:
+        print("Usage: python redis_queue.py $redis_host $port $passwd [compact | auto_compact]")
         sys.exit(1)
     host = sys.argv[1]
     port = int(sys.argv[2])
     passwd = sys.argv[3]
-    client_enqueue = redis.Redis(host=host, port=port, password=passwd)
-    client_dequeue = redis.Redis(host=host, port=port, password=passwd)
-    client_compact = redis.Redis(host=host, port=port, password=passwd)
+    mode = sys.argv[4]
+
+    thread_list = []
     queue_name = "test_queue"
+
+    client_enqueue = redis.Redis(host=host, port=port, password=passwd)
     t1 = threading.Thread(target=enqueue, args=(client_enqueue, queue_name))
-    t1.start()
+    thread_list.append(t1)
+
+    client_dequeue = redis.Redis(host=host, port=port, password=passwd)
     t2 = threading.Thread(target=dequeue, args=(client_dequeue, queue_name))
-    t2.start()
-    t3 = threading.Thread(target=compact, args=(client_compact, queue_name))
-    t3.start()
-
-    def signal_handler(signal, frame):
+    thread_list.append(t2)
+
+    client_compact = redis.Redis(host=host, port=port, password=passwd)
+    if mode == "compact":
+        t3 = threading.Thread(target=compact, args=(client_compact, queue_name))
+        thread_list.append(t3)
+    elif mode == "auto_compact":
+        auto_compact(client_compact)
+    else:
+        print("invalid compact mode: {}".format(mode))
+        sys.exit(1)
+
+    for t in thread_list:
+        t.start()
+
+    def signal_handler(signal, frame):
         print("revc signal: {}".format(signal))
         global START_FLAG
         START_FLAG = False
-        t1.join()
-        t2.join()
-        t3.join()
+        for t in thread_list:
+            t.join()
         print("exit")
         sys.exit(0)

     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
     signal.signal(signal.SIGQUIT, signal_handler)
+
     while True:
         time.sleep(60)
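The enqueue change above fixes a real bug: ZADD treats the dict key as the member, so `{str(i): score}` kept rewriting the same ten members ("0".."9") instead of growing the queue, while `{str(score): score}` makes each entry unique. The score layout packs a millisecond timestamp with a 5-bit sequence number, so up to 32 entries from the same millisecond stay ordered and distinct. A small sketch of the same encoding:

```go
package main

import (
	"fmt"
	"time"
)

// queueScore reproduces the helper's "now_ms << 5 | i" encoding: the upper
// bits order entries by enqueue millisecond, and the low 5 bits keep up to
// 32 entries from the same millisecond distinct.
func queueScore(nowMs int64, seq int64) int64 {
	return nowMs<<5 | (seq & 0x1f)
}

func main() {
	nowMs := time.Now().UnixMilli()
	for i := int64(0); i < 10; i++ {
		// Using the score itself as the member, as the fixed script does,
		// yields a fresh member per batch instead of overwriting "0".."9".
		fmt.Println(queueScore(nowMs, i))
	}
}
```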
"github.com/bsm/ginkgo/v2" @@ -10,9 +11,33 @@ import ( "github.com/redis/go-redis/v9" ) +func issueBLPop(ctx *context.Context, client *redis.Client, lists []string, timeout time.Duration) { + client.BLPop(*ctx, timeout, lists...) +} + +func issueBRPop(ctx *context.Context, client *redis.Client, lists []string, timeout time.Duration) { + client.BRPop(*ctx, timeout, lists...) +} + +func issueLPush(ctx *context.Context, client *redis.Client, list string, value string) { + defer GinkgoRecover() + LPush := client.LPush(*ctx, list, value) + Expect(LPush.Err()).NotTo(HaveOccurred()) + Expect(LPush.Val()).To(Equal([]string{list, "v"})) +} + +func issueRPush(ctx *context.Context, client *redis.Client, list string, value string) { + defer GinkgoRecover() + RPush := client.RPush(*ctx, list, value) + Expect(RPush.Err()).NotTo(HaveOccurred()) + Expect(RPush.Val()).To(Equal([]string{list, "v"})) +} + var _ = Describe("List Commands", func() { ctx := context.TODO() var client *redis.Client + var blocked bool + var blockedLock sync.Mutex BeforeEach(func() { client = redis.NewClient(pikaOptions1()) @@ -121,6 +146,507 @@ var _ = Describe("List Commands", func() { } }) + It("should BLPopBlocks Timeout Auto UnBlock", func() { + err := client.Del(ctx, "list") + Expect(err.Err()).NotTo(HaveOccurred()) + blocked = true + + go func() { + val, err := client.BLPop(ctx, time.Second, "list").Result() + Expect(err).To(Equal(redis.Nil)) + Expect(val).To(BeNil()) + blockedLock.Lock() + blocked = false + blockedLock.Unlock() + }() + + time.Sleep(2 * time.Second) + blockedLock.Lock() + Expect(blocked).To(Equal(false)) + blockedLock.Unlock() + }) + + It("should BRPopBlocks Timeout Auto UnBlock", func() { + err := client.Del(ctx, "list") + Expect(err.Err()).NotTo(HaveOccurred()) + blocked = true + + go func() { + val, err := client.BRPop(ctx, time.Second, "list").Result() + Expect(err).To(Equal(redis.Nil)) + Expect(val).To(BeNil()) + blockedLock.Lock() + blocked = false + blockedLock.Unlock() + }() + + time.Sleep(2 * time.Second) + blockedLock.Lock() + Expect(blocked).To(Equal(false)) + blockedLock.Unlock() + }) + + It("should BLPopBlocks LPush UnBlock", func() { + err := client.Del(ctx, "list") + Expect(err.Err()).NotTo(HaveOccurred()) + + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bLPop := client.BLPop(ctx, 0, "list") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list", "a"})) + done <- true + }() + <-started + + select { + case <-done: + Fail("BLPop is not blocked") + case <-time.After(time.Second): + // ok + } + + lPush := client.LPush(ctx, "list", "a") + Expect(lPush.Err()).NotTo(HaveOccurred()) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BLPop is still blocked") + } + }) + + It("should BRPopBlocks LPush UnBlock", func() { + err := client.Del(ctx, "list") + Expect(err.Err()).NotTo(HaveOccurred()) + + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bRPop := client.BRPop(ctx, 0, "list") + Expect(bRPop.Err()).NotTo(HaveOccurred()) + Expect(bRPop.Val()).To(Equal([]string{"list", "a"})) + done <- true + }() + <-started + + select { + case <-done: + Fail("BRPop is not blocked") + case <-time.After(time.Second): + // ok + } + + lPush := client.LPush(ctx, "list", "a") + Expect(lPush.Err()).NotTo(HaveOccurred()) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BRPop is still blocked") + } + }) + 
+ It("should BLPopBlocks RPopLPush UnBlock", func() { + err := client.Del(ctx, "list1", "list2") + Expect(err.Err()).NotTo(HaveOccurred()) + + lPush := client.LPush(ctx, "list1", "a") + Expect(lPush.Err()).NotTo(HaveOccurred()) + + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bLPop := client.BLPop(ctx, 0, "list2") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list2", "a"})) + done <- true + }() + <-started + + select { + case <-done: + Fail("BLPop is not blocked") + case <-time.After(time.Second): + // ok + } + + rPopLPush := client.RPopLPush(ctx, "list1", "list2") + Expect(rPopLPush.Err()).NotTo(HaveOccurred()) + Expect(rPopLPush.Val()).To(Equal("a")) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BLPop is still blocked") + } + }) + + It("should BRPopBlocks RPopLPush UnBlock", func() { + err := client.Del(ctx, "list1", "list2") + Expect(err.Err()).NotTo(HaveOccurred()) + + lPush := client.LPush(ctx, "list1", "a") + Expect(lPush.Err()).NotTo(HaveOccurred()) + + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bLPop := client.BRPop(ctx, 0, "list2") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list2", "a"})) + done <- true + }() + <-started + + select { + case <-done: + Fail("BRPop is not blocked") + case <-time.After(time.Second): + // ok + } + + rPopLPush := client.RPopLPush(ctx, "list1", "list2") + Expect(rPopLPush.Err()).NotTo(HaveOccurred()) + Expect(rPopLPush.Val()).To(Equal("a")) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BRPop is still blocked") + } + }) + + It("should BLPop BRPop Lists", func() { + err := client.Del(ctx, "list1", "list2", "list3") + Expect(err.Err()).NotTo(HaveOccurred()) + + rPush := client.RPush(ctx, "list1", "a", "large", "c") + Expect(rPush.Err()).NotTo(HaveOccurred()) + rPush = client.RPush(ctx, "list2", "d", "large", "f") + Expect(rPush.Err()).NotTo(HaveOccurred()) + + bLPop := client.BLPop(ctx, time.Second, "list1", "list2") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list1", "a"})) + + bRPop := client.BRPop(ctx, time.Second, "list1", "list2") + Expect(bRPop.Err()).NotTo(HaveOccurred()) + Expect(bRPop.Val()).To(Equal([]string{"list1", "c"})) + + lLen := client.LLen(ctx, "list1") + Expect(lLen.Err()).NotTo(HaveOccurred()) + Expect(lLen.Val()).To(Equal(int64(1))) + + lLen = client.LLen(ctx, "list2") + Expect(lLen.Err()).NotTo(HaveOccurred()) + Expect(lLen.Val()).To(Equal(int64(3))) + + bLPop = client.BLPop(ctx, time.Second, "list2", "list1") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list2", "d"})) + + bRPop = client.BRPop(ctx, time.Second, "list2", "list1") + Expect(bRPop.Err()).NotTo(HaveOccurred()) + Expect(bRPop.Val()).To(Equal([]string{"list2", "f"})) + + lLen = client.LLen(ctx, "list1") + Expect(lLen.Err()).NotTo(HaveOccurred()) + Expect(lLen.Val()).To(Equal(int64(1))) + + lLen = client.LLen(ctx, "list2") + Expect(lLen.Err()).NotTo(HaveOccurred()) + Expect(lLen.Val()).To(Equal(int64(1))) + + bLPop = client.BLPop(ctx, time.Second, "list3", "list2") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list2", "large"})) + + bRPop = client.BRPop(ctx, time.Second, "list3", "list1") + Expect(bRPop.Err()).NotTo(HaveOccurred()) + Expect(bRPop.Val()).To(Equal([]string{"list1", 
"large"})) + }) + + It("should BLPop Same Key Multiple Times", func() { + err := client.Del(ctx, "list1", "list2") + Expect(err.Err()).NotTo(HaveOccurred()) + + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bLPop := client.BLPop(ctx, 0, "list1", "list2", "list2", "list1") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list1", "a"})) + done <- true + }() + <-started + + select { + case <-done: + Fail("BLPop is not blocked") + case <-time.After(time.Second): + // ok + } + + LPush := client.LPush(ctx, "list1", "a") + Expect(LPush.Err()).NotTo(HaveOccurred()) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BLPop is still blocked") + } + }) + + It("should BRPop Same Key Multiple Times", func() { + err := client.Del(ctx, "list1", "list2") + Expect(err.Err()).NotTo(HaveOccurred()) + + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bRPop := client.BRPop(ctx, 0, "list1", "list2", "list2", "list1") + Expect(bRPop.Err()).NotTo(HaveOccurred()) + Expect(bRPop.Val()).To(Equal([]string{"list2", "a"})) + done <- true + }() + <-started + + select { + case <-done: + Fail("BRPop is not blocked") + case <-time.After(time.Second): + // ok + } + + RPush := client.RPush(ctx, "list2", "a") + Expect(RPush.Err()).NotTo(HaveOccurred()) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BRPop is still blocked") + } + }) + + It("should BLPop After Push Multi Value", func() { + err := client.Del(ctx, "list") + Expect(err.Err()).NotTo(HaveOccurred()) + + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bLPop := client.BLPop(ctx, 0, "list") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list", "bar"})) + done <- true + }() + <-started + + select { + case <-done: + Fail("BRPop is not blocked") + case <-time.After(time.Second): + // ok + } + + lPush := client.LPush(ctx, "list", "foo", "bar") + Expect(lPush.Err()).NotTo(HaveOccurred()) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BRPop is still blocked") + } + }) + + It("should BRPop After Push Multi Value", func() { + err := client.Del(ctx, "list") + Expect(err.Err()).NotTo(HaveOccurred()) + + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bRPop := client.BRPop(ctx, 0, "list") + Expect(bRPop.Err()).NotTo(HaveOccurred()) + Expect(bRPop.Val()).To(Equal([]string{"list", "bar"})) + done <- true + }() + <-started + + select { + case <-done: + Fail("BRPop is not blocked") + case <-time.After(time.Second): + // ok + } + + rPush := client.RPush(ctx, "list", "foo", "bar") + Expect(rPush.Err()).NotTo(HaveOccurred()) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BRPop is still blocked") + } + }) + + It("should BLPop Serve Priority", func() { + err := client.Del(ctx, "list") + Expect(err.Err()).NotTo(HaveOccurred()) + + go func() { + defer GinkgoRecover() + + bLPop := client.BLPop(ctx, 0, "list") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list", "v1"})) + }() + time.Sleep(500 * time.Millisecond) + + go func() { + defer GinkgoRecover() + + bLPop := client.BLPop(ctx, 0, "list") + Expect(bLPop.Err()).NotTo(HaveOccurred()) + Expect(bLPop.Val()).To(Equal([]string{"list", "v2"})) + }() + 
time.Sleep(500 * time.Millisecond) + + rPush := client.RPush(ctx, "list", "v1", "v2") + Expect(rPush.Err()).NotTo(HaveOccurred()) + time.Sleep(1 * time.Second) + }) + + It("should BRPop Serve Priority", func() { + err := client.Del(ctx, "list") + Expect(err.Err()).NotTo(HaveOccurred()) + + go func() { + defer GinkgoRecover() + + bRPop := client.BRPop(ctx, 0, "list") + Expect(bRPop.Err()).NotTo(HaveOccurred()) + Expect(bRPop.Val()).To(Equal([]string{"list", "v4"})) + }() + time.Sleep(500 * time.Millisecond) + + go func() { + defer GinkgoRecover() + + bRPop := client.BRPop(ctx, 0, "list") + Expect(bRPop.Err()).NotTo(HaveOccurred()) + Expect(bRPop.Val()).To(Equal([]string{"list", "v3"})) + }() + time.Sleep(500 * time.Millisecond) + + rPush := client.RPush(ctx, "list", "v3", "v4") + Expect(rPush.Err()).NotTo(HaveOccurred()) + time.Sleep(1 * time.Second) + }) + + /** + It("should Concurrency Block UnBlock", func() { + lists1 := []string{"blist100", "blist101", "blist102", "blist103"} + lists2 := []string{"blist0", "blist1", "blist2", "blist3"} + + err := client.Del(ctx, lists1...) + Expect(err.Err()).NotTo(HaveOccurred()) + err = client.Del(ctx, lists2...) + Expect(err.Err()).NotTo(HaveOccurred()) + + // Add some blocking connections + timeout := 30 * time.Second + for i := 1; i <= 25; i++ { + go issueBLPop(&ctx, client, lists1, timeout) + go issueBRPop(&ctx, client, lists1, timeout) + } + + // Concurrent timeout test + timeout = 2 * time.Second + for i := 1; i <= 50; i++ { + go issueBLPop(&ctx, client, lists2, timeout) + go issueBRPop(&ctx, client, lists2, timeout) + } + time.Sleep(6 * time.Second) + + // Add 100 threads to execute blpop/brpop, and they will be blocked at the same time + timeout = 0 + for i := 1; i <= 50; i++ { + go issueBLPop(&ctx, client, lists2, timeout) + go issueBRPop(&ctx, client, lists2, timeout) + } + + // Push 200 data to ensure that the blocking of the 100 connections can be removed + for i := 1; i <= 50; i++ { + go issueLPush(&ctx, client, "blist2", "v") + go issueRPush(&ctx, client, "blist0", "v") + go issueLPush(&ctx, client, "blist1", "v") + go issueRPush(&ctx, client, "blist3", "v") + } + time.Sleep(5 * time.Second) + }) + + It("should Mixed Concurrency Block UnBlock", func() { + lists1 := []string{"list0", "list1"} + lists2 := []string{"list2", "list3"} + err := client.Del(ctx, lists1...) + Expect(err.Err()).NotTo(HaveOccurred()) + err = client.Del(ctx, lists2...) 
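The two Serve Priority cases pin down an ordering assumption: blocked clients are woken FIFO, and each wakeup takes the element its own command would take (head for BLPOP, tail for BRPOP), so the first BLPop waiter sees v1 while the first BRPop waiter sees v4. A toy model of the BLPOP half, purely illustrative:

```go
package main

import "fmt"

func main() {
	list := []string{"v1", "v2"} // state after RPush("list", "v1", "v2")

	// Two BLPop waiters registered in order: each wakeup pops the head,
	// so waiter 1 receives "v1" and waiter 2 receives "v2".
	for waiter := 1; waiter <= 2; waiter++ {
		head := list[0]
		list = list[1:]
		fmt.Printf("BLPop waiter %d gets %s\n", waiter, head)
	}
}
```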
+	/**
+	It("should Concurrency Block UnBlock", func() {
+		lists1 := []string{"blist100", "blist101", "blist102", "blist103"}
+		lists2 := []string{"blist0", "blist1", "blist2", "blist3"}
+
+		err := client.Del(ctx, lists1...)
+		Expect(err.Err()).NotTo(HaveOccurred())
+		err = client.Del(ctx, lists2...)
+		Expect(err.Err()).NotTo(HaveOccurred())
+
+		// Add some blocking connections
+		timeout := 30 * time.Second
+		for i := 1; i <= 25; i++ {
+			go issueBLPop(&ctx, client, lists1, timeout)
+			go issueBRPop(&ctx, client, lists1, timeout)
+		}
+
+		// Concurrent timeout test
+		timeout = 2 * time.Second
+		for i := 1; i <= 50; i++ {
+			go issueBLPop(&ctx, client, lists2, timeout)
+			go issueBRPop(&ctx, client, lists2, timeout)
+		}
+		time.Sleep(6 * time.Second)
+
+		// Add 100 threads to execute blpop/brpop, and they will be blocked at the same time
+		timeout = 0
+		for i := 1; i <= 50; i++ {
+			go issueBLPop(&ctx, client, lists2, timeout)
+			go issueBRPop(&ctx, client, lists2, timeout)
+		}
+
+		// Push 200 data to ensure that the blocking of the 100 connections can be removed
+		for i := 1; i <= 50; i++ {
+			go issueLPush(&ctx, client, "blist2", "v")
+			go issueRPush(&ctx, client, "blist0", "v")
+			go issueLPush(&ctx, client, "blist1", "v")
+			go issueRPush(&ctx, client, "blist3", "v")
+		}
+		time.Sleep(5 * time.Second)
+	})
+
+	It("should Mixed Concurrency Block UnBlock", func() {
+		lists1 := []string{"list0", "list1"}
+		lists2 := []string{"list2", "list3"}
+		err := client.Del(ctx, lists1...)
+		Expect(err.Err()).NotTo(HaveOccurred())
+		err = client.Del(ctx, lists2...)
+		Expect(err.Err()).NotTo(HaveOccurred())
+
+		for i := 1; i <= 25; i++ {
+			go issueBLPop(&ctx, client, lists1, 3 * time.Second)
+			go issueBRPop(&ctx, client, lists1, 3 * time.Second)
+			go issueBLPop(&ctx, client, lists2, 0)
+			go issueBRPop(&ctx, client, lists2, 0)
+		}
+		// Ensure that both blpop/brpop have been executed, and that 50 of the connections are about to start timing out and unblocking
+		time.Sleep(3 * time.Second)
+		// And push 100 pieces of data concurrently to ensure that the blocking of the first 50 connections can be removed
+		for i := 1; i <= 25; i++ {
+			go issueLPush(&ctx, client, "list2", "v")
+			go issueRPush(&ctx, client, "list3", "v")
+			go issueLPush(&ctx, client, "list2", "v")
+			go issueRPush(&ctx, client, "list3", "v")
+		}
+		time.Sleep(10 * time.Second)
+	})
+	**/
+
 	//It("should BRPopLPush", func() {
 	//	_, err := client.BRPopLPush(ctx, "list1", "list2", time.Second).Result()
 	//	Expect(err).To(Equal(redis.Nil))
diff --git a/tests/integration/options.go b/tests/integration/options.go
index 3e3320a310..dc7a0bb8ed 100644
--- a/tests/integration/options.go
+++ b/tests/integration/options.go
@@ -23,8 +23,8 @@ func pikaOptions1() *redis.Options {
 		ReadTimeout:  30 * time.Second,
 		WriteTimeout: 30 * time.Second,
 		MaxRetries:   -1,
-		PoolSize:     10,
-		PoolTimeout:  30 * time.Second,
+		PoolSize:     30,
+		PoolTimeout:  60 * time.Second,
 	}
 }

@@ -36,7 +36,7 @@ func pikaOptions2() *redis.Options {
 		ReadTimeout:  30 * time.Second,
 		WriteTimeout: 30 * time.Second,
 		MaxRetries:   -1,
-		PoolSize:     10,
-		PoolTimeout:  30 * time.Second,
+		PoolSize:     30,
+		PoolTimeout:  60 * time.Second,
 	}
 }
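The PoolSize/PoolTimeout bump in options.go is not cosmetic: every concurrently blocked BLPOP/BRPOP goroutine pins one pooled connection for the whole wait, so the old pool of 10 could starve the suites above. A sketch that reproduces the failure mode; the address and sizes are illustrative assumptions:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{
		Addr:        "127.0.0.1:9221",
		PoolSize:    2,
		PoolTimeout: time.Second,
	})

	// Each blocked pop occupies a pooled connection until it returns.
	for i := 0; i < 2; i++ {
		go client.BLPop(ctx, 30*time.Second, "idle-list")
	}
	time.Sleep(100 * time.Millisecond)

	// With the whole pool parked, the next command waits PoolTimeout
	// and then fails with a pool-timeout error.
	if err := client.Ping(ctx).Err(); err != nil {
		fmt.Println("pool exhausted:", err)
	}
}
```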
time.Second, "blist0") + clientMaster.BLPop(*ctx, 1 * time.Second, "blist0") + clientMaster.BLPop(*ctx, 1 * time.Second, "blist0") +} + var _ = Describe("shuould replication ", func() { Describe("all replication test", func() { ctx := context.TODO() @@ -327,6 +369,7 @@ var _ = Describe("shuould replication ", func() { Expect(clientSlave.Close()).NotTo(HaveOccurred()) Expect(clientMaster.Close()).NotTo(HaveOccurred()) }) + It("Let The slave become a replica of The master ", func() { infoRes := clientSlave.Info(ctx, "replication") Expect(infoRes.Err()).NotTo(HaveOccurred()) @@ -456,6 +499,66 @@ var _ = Describe("shuould replication ", func() { slave_unionstore_set := clientSlave.SMembers(ctx, "set_out") Expect(slave_unionstore_set.Err()).NotTo(HaveOccurred()) Expect(master_unionstore_set.Val()).To(Equal(slave_unionstore_set.Val())) + + // Blocked master-slave replication test + lists := []string{"list0", "list1"} + err := clientMaster.Del(ctx, lists...) + Expect(err.Err()).NotTo(HaveOccurred()) + + for i := 1; i <= 10; i++ { + random_str1 := randomString(5) + random_str2 := randomString(5) + random_str3 := randomString(5) + random_str4 := randomString(5) + random_str5 := randomString(5) + + go issueBLPopCheck(&ctx, clientMaster, "list1", random_str1) + time.Sleep(1 * time.Second) + go issueBRpopCheck(&ctx, clientMaster, "list0", random_str2) + time.Sleep(1 * time.Second) + go issueBLPopCheck(&ctx, clientMaster, "list1", random_str3) + time.Sleep(1 * time.Second) + go issueBRpopCheck(&ctx, clientMaster, "list0", random_str4) + time.Sleep(1 * time.Second) + go issueBLPopCheck(&ctx, clientMaster, "list1", random_str5) + time.Sleep(1 * time.Second) + + clientMaster.LPush(ctx, "list1", random_str1) + time.Sleep(1 * time.Second) + clientMaster.RPush(ctx, "list0", random_str2) + time.Sleep(1 * time.Second) + clientMaster.LPush(ctx, "list1", random_str3) + time.Sleep(1 * time.Second) + clientMaster.RPush(ctx, "list0", random_str4) + time.Sleep(1 * time.Second) + clientMaster.LPush(ctx, "list1", random_str5) + + for i := int64(0); i < clientMaster.LLen(ctx, "list0").Val(); i++ { + Expect(clientMaster.LIndex(ctx, "list0", i)).To(Equal(clientSlave.LIndex(ctx, "list0", i))) + } + for i := int64(0); i < clientMaster.LLen(ctx, "list1").Val(); i++ { + Expect(clientMaster.LIndex(ctx, "list1", i)).To(Equal(clientSlave.LIndex(ctx, "list1", i))) + } + } + + // High frequency pop/push during unblocking process + lists = []string{"blist0", "blist1"} + err = clientMaster.Del(ctx, lists...) + Expect(err.Err()).NotTo(HaveOccurred()) + + for i := 1; i <= 5; i++ { + go func() { + clientMaster.BLPop(ctx, 0, lists...) + }() + go func() { + clientMaster.BRPop(ctx, 0, lists...) 
+				}()
+			}
+			execute(&ctx, clientMaster, 5, issuePushPopFrequency)
+
+			for i := int64(0); i < clientMaster.LLen(ctx, "blist0").Val(); i++ {
+				Expect(clientMaster.LIndex(ctx, "blist0", i)).To(Equal(clientSlave.LIndex(ctx, "blist0", i)))
+			}
 		})
 	})
diff --git a/tests/integration/set_test.go b/tests/integration/set_test.go
index 0d6c4d0125..439c2167e4 100644
--- a/tests/integration/set_test.go
+++ b/tests/integration/set_test.go
@@ -85,6 +85,10 @@ var _ = Describe("Set Commands", func() {
 		sDiff := client.SDiff(ctx, "set1", "set2")
 		Expect(sDiff.Err()).NotTo(HaveOccurred())
 		Expect(sDiff.Val()).To(ConsistOf([]string{"a", "b"}))
+
+		sDiff = client.SDiff(ctx, "nonexistent_set1", "nonexistent_set2")
+		Expect(sDiff.Err()).NotTo(HaveOccurred())
+		Expect(sDiff.Val()).To(HaveLen(0))
 	})

 	It("should SDiffStore", func() {
@@ -129,6 +133,10 @@ var _ = Describe("Set Commands", func() {
 		sInter := client.SInter(ctx, "set1", "set2")
 		Expect(sInter.Err()).NotTo(HaveOccurred())
 		Expect(sInter.Val()).To(Equal([]string{"c"}))
+
+		sInter = client.SInter(ctx, "nonexistent_set1", "nonexistent_set2")
+		Expect(sInter.Err()).NotTo(HaveOccurred())
+		Expect(sInter.Val()).To(HaveLen(0))
 	})

 	//It("should SInterCard", func() {
@@ -336,6 +344,10 @@ var _ = Describe("Set Commands", func() {
 		sMembers := client.SMembers(ctx, "set")
 		Expect(sMembers.Err()).NotTo(HaveOccurred())
 		Expect(sMembers.Val()).To(ConsistOf([]string{"three", "two"}))
+
+		sRem = client.SRem(ctx, "nonexistent_set", "one")
+		Expect(sRem.Err()).NotTo(HaveOccurred())
+		Expect(sRem.Val()).To(Equal(int64(0)))
 	})

 	It("should SUnion", func() {
@@ -356,6 +368,10 @@ var _ = Describe("Set Commands", func() {
 		sUnion := client.SUnion(ctx, "set1", "set2")
 		Expect(sUnion.Err()).NotTo(HaveOccurred())
 		Expect(sUnion.Val()).To(HaveLen(5))
+
+		sUnion = client.SUnion(ctx, "nonexistent_set1", "nonexistent_set2")
+		Expect(sUnion.Err()).NotTo(HaveOccurred())
+		Expect(sUnion.Val()).To(HaveLen(0))
 	})

 	It("should SUnionStore", func() {
diff --git a/tests/integration/txn_test.go b/tests/integration/txn_test.go
index 0945467341..7bddecda1b 100644
--- a/tests/integration/txn_test.go
+++ b/tests/integration/txn_test.go
@@ -230,6 +230,164 @@ var _ = Describe("Text Txn", func() {
 		AssertEqualRedisString(setValue, cmders[2])
 	})

+	It("blpop within transaction", func() {
+		txnClient.Del(ctx, "list")
+		txPipeline := txnClient.TxPipeline()
+		txPipeline.BLPop(ctx, 1*time.Second, "list")
+		_, err := txPipeline.Exec(ctx)
+		Expect(err).To(Equal(redis.Nil))
+	})
+
+	It("brpop within transaction", func() {
+		txnClient.Del(ctx, "list")
+		txPipeline := txnClient.TxPipeline()
+		txPipeline.BRPop(ctx, 1*time.Second, "list")
+		_, err := txPipeline.Exec(ctx)
+		Expect(err).To(Equal(redis.Nil))
+	})
+
+	It("should BLPOP, LPUSH + DEL should not awake blocked client", func() {
+		cmdClient.Del(ctx, "list")
+
+		started := make(chan bool)
+		done := make(chan bool)
+		go func() {
+			defer GinkgoRecover()
+
+			started <- true
+			bLPop := cmdClient.BLPop(ctx, 0, "list")
+			Expect(bLPop.Err()).NotTo(HaveOccurred())
+			Expect(bLPop.Val()).To(Equal([]string{"list", "a"}))
+			done <- true
+		}()
+		<-started
+
+		select {
+		case <-done:
+			Fail("BLPop is not blocked")
+		case <-time.After(time.Second):
+			// ok
+		}
+
+		txnClient.Watch(ctx, func(tx *redis.Tx) error {
+			pipe := tx.TxPipeline()
+			pipe.LPush(ctx, "list", "a")
+			pipe.Del(ctx, "list")
+			_, err := pipe.Exec(ctx)
+			Expect(err).NotTo(HaveOccurred())
+			return nil
+		}, "list")
+
+		select {
+		case <-done:
+			Fail("BLPop should still be blocked")
+		case <-time.After(time.Second):
+			// ok
+		}
+
+		cmdClient.Del(ctx, "list")
+		cmdClient.LPush(ctx, "list", "a")
+
+		select {
+		case <-done:
+			// ok
+		case <-time.After(time.Second):
+			Fail("BLPop is still blocked")
+		}
+	})
+
+	It("should BLPOP, LPUSH + DEL + SET should not awake blocked client", func() {
+		cmdClient.Del(ctx, "list")
+
+		started := make(chan bool)
+		done := make(chan bool)
+		go func() {
+			defer GinkgoRecover()
+
+			started <- true
+			bLPop := cmdClient.BLPop(ctx, 0, "list")
+			Expect(bLPop.Err()).NotTo(HaveOccurred())
+			Expect(bLPop.Val()).To(Equal([]string{"list", "b"}))
+			done <- true
+		}()
+		<-started
+
+		select {
+		case <-done:
+			Fail("BLPop is not blocked")
+		case <-time.After(time.Second):
+			// ok
+		}
+
+		txnClient.Watch(ctx, func(tx *redis.Tx) error {
+			pipe := tx.TxPipeline()
+			pipe.LPush(ctx, "list", "a")
+			pipe.Del(ctx, "list")
+			pipe.Set(ctx, "list", "foo", 0)
+			_, err := pipe.Exec(ctx)
+			Expect(err).NotTo(HaveOccurred())
+			return nil
+		}, "list")
+
+		select {
+		case <-done:
+			Fail("BLPop should still be blocked")
+		case <-time.After(time.Second):
+			// ok
+		}
+
+		cmdClient.Del(ctx, "list")
+		cmdClient.LPush(ctx, "list", "b")
+
+		select {
+		case <-done:
+			// ok
+		case <-time.After(time.Second):
+			Fail("BLPop is still blocked")
+		}
+	})
+
+	It("should isolate MULTI/EXEC from the point of view of BLPOP", func() {
+		cmdClient.Del(ctx, "list")
+
+		started := make(chan bool)
+		done := make(chan bool)
+		go func() {
+			defer GinkgoRecover()
+
+			started <- true
+			bLPop := cmdClient.BLPop(ctx, 0, "list")
+			Expect(bLPop.Err()).NotTo(HaveOccurred())
+			Expect(bLPop.Val()).To(Equal([]string{"list", "c"}))
+			done <- true
+		}()
+		<-started
+
+		select {
+		case <-done:
+			Fail("BLPop is not blocked")
+		case <-time.After(time.Second):
+			// ok
+		}
+
+		txnClient.Watch(ctx, func(tx *redis.Tx) error {
+			pipe := tx.TxPipeline()
+			pipe.LPush(ctx, "list", "a")
+			pipe.LPush(ctx, "list", "b")
+			pipe.LPush(ctx, "list", "c")
+			_, err := pipe.Exec(ctx)
+			Expect(err).NotTo(HaveOccurred())
+			return nil
+		}, "list")
+
+		select {
+		case <-done:
+			// ok
+		case <-time.After(time.Second):
+			Fail("BLPop is still blocked")
+		}
+	})
+
 })

 // Because when there is only one list result, Redis returns in two cases, one with * and one without *,
 // but go-redis knows only the ones without *
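The transaction cases encode Redis semantics worth spelling out: inside MULTI/EXEC a blocking pop must not block; with no data it returns an empty reply immediately, which go-redis surfaces as redis.Nil, and the queued commands only become visible to blocked clients atomically at EXEC. A minimal standalone sketch of the first property; the address is an illustrative assumption:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:9221"})

	// BLPOP inside MULTI/EXEC does not block: with an empty list it
	// returns nil right away instead of waiting out the timeout.
	pipe := client.TxPipeline()
	pipe.BLPop(ctx, time.Second, "missing-list")
	_, err := pipe.Exec(ctx)
	fmt.Println(errors.Is(err, redis.Nil)) // expected: true
}
```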