From 39d2b2b5e0f834b445429a1b452ee731b7a365b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=81=E5=B0=8F=E5=B8=85?= <56024577+dingxiaoshuai123@users.noreply.github.com> Date: Tue, 31 Oct 2023 22:35:04 +0800 Subject: [PATCH] test:replication_test (#2089) * test:replication_test * test: Using go's synchronization statements --- .github/workflows/pika.yml | 36 +- tests/integration/pika_replication_test.py | 939 --------------------- tests/integration/replication_test.go | 463 ++++++++++ 3 files changed, 473 insertions(+), 965 deletions(-) delete mode 100644 tests/integration/pika_replication_test.py create mode 100644 tests/integration/replication_test.go diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index f7fd4deac4..868ced268b 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -38,9 +38,7 @@ jobs: if: ${{ steps.cache.output.cache-hit != 'true' }} run: | sudo apt-get install -y autoconf libprotobuf-dev protobuf-compiler - sudo apt-get install -y clang-tidy-12 python3-pip - python3 -m pip install --upgrade pip - python3 -m pip install redis + sudo apt-get install -y clang-tidy-12 - name: Configure CMake # Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make. @@ -68,12 +66,6 @@ jobs: chmod +x ../tests/integration/start_master_and_slave.sh ../tests/integration/start_master_and_slave.sh - - name: Run Python E2E Tests - working-directory: ${{ github.workspace }}/build - run: | - python3 ../tests/integration/pika_replication_test.py - python3 ../tests/unit/Blpop_Brpop_test.py - - name: Run Go E2E Tests working-directory: ${{ github.workspace }}/build run: | @@ -89,11 +81,9 @@ jobs: steps: - name: Install deps run: | - yum install -y wget git autoconf centos-release-scl + yum install -y wget git autoconf centos-release-scl gcc yum install -y devtoolset-10-gcc devtoolset-10-gcc-c++ devtoolset-10-make devtoolset-10-bin-util - yum install -y llvm-toolset-7 llvm-toolset-7-clang tcl which python3 - python3 -m pip install --upgrade pip - python3 -m pip install redis + yum install -y llvm-toolset-7 llvm-toolset-7-clang tcl which - name: Set up Go uses: actions/setup-go@v3 @@ -134,11 +124,13 @@ jobs: chmod +x ../tests/integration/start_master_and_slave.sh ../tests/integration/start_master_and_slave.sh - - name: Run Python E2E Tests + - name: Run Go E2E Tests working-directory: ${{ github.workspace }}/build run: | - python3 ../tests/integration/pika_replication_test.py - python3 ../tests/unit/Blpop_Brpop_test.py + cd ../tests/integration/ + chmod +x integrate_test.sh + sh integrate_test.sh + build_on_macos: runs-on: macos-latest @@ -166,8 +158,6 @@ jobs: brew update brew install --overwrite python autoconf protobuf llvm wget git brew install gcc@10 automake cmake make binutils - python3 -m pip install --upgrade pip - python3 -m pip install redis - name: Configure CMake run: | @@ -193,12 +183,6 @@ jobs: chmod +x ../tests/integration/start_master_and_slave.sh ../tests/integration/start_master_and_slave.sh - - name: Run Python E2E Tests - working-directory: ${{ github.workspace }}/build - run: | - python3 ../tests/integration/pika_replication_test.py - python3 ../tests/unit/Blpop_Brpop_test.py - - name: Run Go E2E Tests working-directory: ${{ github.workspace }}/build run: | @@ -218,13 +202,13 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - + - name: Extract metadata (tags, labels) for Docker id: meta uses: 
docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
        with:
          images: pikadb/pika
-
+
      - name: Build Docker image
        uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671
        with:
diff --git a/tests/integration/pika_replication_test.py b/tests/integration/pika_replication_test.py
deleted file mode 100644
index 6b816a64d9..0000000000
--- a/tests/integration/pika_replication_test.py
+++ /dev/null
@@ -1,939 +0,0 @@
-# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
-# These are the basic replication tests for pika
-#
-# It also covers the issues and PRs below:
-# relevant issues:
-# https://github.com/OpenAtomFoundation/pika/issues/1638
-# https://github.com/OpenAtomFoundation/pika/issues/1608
-# relevant PRs:
-# https://github.com/OpenAtomFoundation/pika/pull/1658
-# https://github.com/OpenAtomFoundation/pika/issues/1638
-#
-# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
-import threading
-import time
-import redis
-import random
-import string
-
-
-def test_del_replication():
-    print("start del multiple keys replication test")
-    # Create the Redis clients
-    master = redis.Redis(host=master_ip, port=int(master_port), db=0)
-    slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0)
-    if delay_slave_of:
-        slave.slaveof("no", "one")
-    else:
-        slave.slaveof(master_ip, master_port)
-    time.sleep(1)
-    # Replay the operations from the log
-    master.delete('blist0', 'blist1', 'blist2', 'blist3')
-    master.delete('blist100', 'blist101', 'blist102', 'blist103')
-    master.delete('blist0', 'blist1', 'blist2', 'blist3')
-
-    master.rpush('blist3', 'v2')
-    master.lpush('blist2', 'v2')
-    master.lpop('blist3')
-    master.rpop('blist2')
-
-    master.lpush('blist2', 'v2')
-    master.lpop('blist2')
-    master.rpush('blist3', 'v2')
-    master.rpop('blist3')
-
-    master.lpush('blist2', 'v2')
-    master.lpop('blist2')
-    master.rpush('blist3', 'v2')
-    master.lpush('blist2', 'v2')
-
-    master.rpop('blist3')
-    master.lpop('blist2')
-    master.lpush('blist2', 'v2')
-    master.rpush('blist3', 'v2')
-
-    master.rpop('blist3')
-    master.lpop('blist2')
-    master.rpush('blist3', 'v2')
-    master.lpush('blist2', 'v2')
-
-    master.rpop('blist3')
-    master.rpush('blist3', 'v2')
-    master.lpush('blist2', 'v2')
-    master.rpush('blist3', 'v2')
-
-    master.rpush('blist3', 'v2')
-    master.lpush('blist2', 'v2')
-    master.lpush('blist2', 'v2')
-    master.rpush('blist3', 'v2')
-
-    master.lpush('blist2', 'v2')
-    master.rpush('blist3', 'v2')
-    master.delete('blist1', 'large', 'blist2')
-
-    master.rpush('blist1', 'a', 'large', 'c')
-    master.rpush('blist2', 'd', 'large', 'f')
-
-    master.lpop('blist1')
-    master.rpop('blist1')
-    master.lpop('blist2')
-    master.rpop('blist2')
-
-    master.delete('blist3')
-    master.lpop('blist2')
-    master.rpop('blist1')
-
-    if delay_slave_of:
-        slave.slaveof(master_ip, master_port)
-        time.sleep(25)
-    else:
-        time.sleep(10)
-    # Retrieve all keys from the master and slave
-    m_keys = master.keys()
-    s_keys = slave.keys()
-
-    # print(m_keys)
-    # print(s_keys)
-    # Check if the keys in the master and slave are the same
-    assert set(m_keys) == set(s_keys), f'Expected: s_keys == m_keys, but got s_keys: {s_keys}, m_keys: {m_keys}'
-
-    lists_ = ['blist1', 'blist2', 'blist3']
-    for this_list in lists_:
-        # Check if the length of the list stored at this_list is the same in master and slave
-        assert master.llen(this_list) == slave.llen(this_list), \
-            f'Expected: master.llen({this_list}) == slave.llen({this_list}), but got {master.llen(this_list)} != {slave.llen(this_list)}'
-        # Check if each element in the list is the same in the
master and slave - for i in range(0, master.llen(this_list)): - mv = master.lindex(this_list, i) - sv = slave.lindex(this_list, i) - assert mv == sv, \ - f"Expected: master.lindex({this_list}, {i}) == slave.lindex({this_list}, {i}), but got {mv} != {sv}" - - master.close() - slave.close() - print("Del multiple keys replication OK [Passed]") - - -def test_msetnx_replication(): - print("start test_msetnx_replication") - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('1mset_key', '2mset_key', '3mset_key', '4mset_key') - - def random_mset_thread(keys_): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - for i in range(0, 3): - kvs1 = {} - kvs2 = {} - kvs3 = {} - kvs4 = {} - letters = string.ascii_letters - for key in keys_: - kvs1[key] = ''.join(random.choice(letters) for _ in range(5)) - kvs2[key] = ''.join(random.choice(letters) for _ in range(5)) - kvs3[key] = ''.join(random.choice(letters) for _ in range(5)) - kvs4[key] = ''.join(random.choice(letters) for _ in range(5)) - pika.set(keys_[2], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[3], ''.join(random.choice(letters) for _ in range(5))) - pika.delete(*keys_) - pika.msetnx(kvs1) - pika.set(keys_[0], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[1], ''.join(random.choice(letters) for _ in range(5))) - pika.delete(*keys_) - pika.msetnx(kvs2) - pika.set(keys_[1], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[2], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[0], ''.join(random.choice(letters) for _ in range(5))) - pika.delete(*keys_) - pika.msetnx(kvs3) - pika.set(keys_[3], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[0], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[1], ''.join(random.choice(letters) for _ in range(5))) - pika.delete(*keys_) - pika.msetnx(kvs4) - pika.set(keys_[3], ''.join(random.choice(letters) for _ in range(5))) - - keys = ['1mset_key', '2mset_key', '3mset_key', '4mset_key'] - threads = [] - for i in range(0, 50): - t = threading.Thread(target=random_mset_thread, args=(keys,)) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() - - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - - for key in keys: - m_v = master.get(key) - s_v = slave.get(key) - assert m_v == s_v, f'Expected: master_v == slave_v, but got slave_v:{s_v}, master_v:{m_v}, using key:{key}' - print("test_msetnx_replication OK [Passed]") - - -def test_mset_replication(): - print("start test_mset_replication") - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - keys = ['1mset_key', '2mset_key', '3mset_key', '4mset_key'] - master.delete('1mset_key', '2mset_key', '3mset_key', '4mset_key') - - def random_mset_thread(keys_): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - for i in range(0, 3): - kvs1 = {} - kvs2 = {} - kvs3 = {} - kvs4 = {} - letters = string.ascii_letters - for key in keys_: - kvs1[key] = ''.join(random.choice(letters) for _ in range(5)) - kvs2[key] = ''.join(random.choice(letters) for _ 
in range(5)) - kvs3[key] = ''.join(random.choice(letters) for _ in range(5)) - kvs4[key] = ''.join(random.choice(letters) for _ in range(5)) - pika.set(keys_[2], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[3], ''.join(random.choice(letters) for _ in range(5))) - pika.mset(kvs1) - pika.set(keys_[0], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[1], ''.join(random.choice(letters) for _ in range(5))) - pika.mset(kvs2) - pika.set(keys_[1], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[2], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[0], ''.join(random.choice(letters) for _ in range(5))) - pika.mset(kvs3) - pika.set(keys_[3], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[0], ''.join(random.choice(letters) for _ in range(5))) - pika.set(keys_[1], ''.join(random.choice(letters) for _ in range(5))) - pika.mset(kvs4) - pika.set(keys_[3], ''.join(random.choice(letters) for _ in range(5))) - - threads = [] - for i in range(0, 50): - t = threading.Thread(target=random_mset_thread, args=(keys,)) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() - - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - - for key in keys: - m_v = master.get(key) - s_v = slave.get(key) - assert m_v == s_v, f'Expected: master_v == slave_v, but got slave_v:{s_v}, master_v:{m_v}, using key:{key}' - print("test_mset_replication OK [Passed]") - - -def test_smove_replication(): - print("start test_smove_replication") - - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('source_set', 'dest_set') - - def random_smove_thread(): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - letters = string.ascii_letters - for i in range(0, 1): - member = ''.join(random.choice(letters) for _ in range(5)) - pika.sadd('source_set', member) - pika.sadd('source_set', member) - pika.sadd('source_set', member) - pika.srem('dest_set', member) - pika.srem('dest_set', member) - pika.smove('source_set', 'dest_set', member) - - threads = [] - for i in range(0, 10): - t = threading.Thread(target=random_smove_thread) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() - - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - m_source_set = master.smembers('source_set') - m_dest_set = master.smembers('dest_set') - s_source_set = slave.smembers('source_set') - s_dest_set = slave.smembers('dest_set') - - assert m_source_set == s_source_set, f'Expected: source_set on master == source_set on slave, but got source_set on slave:{s_source_set}, source_set on master:{m_source_set}' - assert m_dest_set == s_dest_set, f'Expected: dest_set on master == dest_set on slave, but got dest_set on slave:{s_dest_set}, dest_set on master:{m_dest_set}' - print("start test_smove_replication OK [Passed]") - - -def test_rpoplpush_replication(): - print("start test_rpoplpush_replication") - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('blist0', 'blist1', 'blist') - 
- def rpoplpush_thread1(): - nonlocal master - for i in range(0, 50): - letters = string.ascii_letters - random_str1 = ''.join(random.choice(letters) for _ in range(5)) - random_str2 = ''.join(random.choice(letters) for _ in range(5)) - random_str3 = ''.join(random.choice(letters) for _ in range(5)) - master.lpush('blist0', random_str1) - master.rpoplpush('blist0', 'blist') - master.lpush('blist', random_str1, random_str2, random_str3) - - master.lpop('blist') - master.rpop('blist') - master.lpush('blist0', random_str3) - master.rpoplpush('blist0', 'blist') - master.rpush('blist', random_str3, random_str2, random_str1) - master.lpop('blist') - master.lpush('blist0', random_str2) - master.rpoplpush('blist0', 'blist') - master.rpop('blist') - - t1 = threading.Thread(target=rpoplpush_thread1) - t2 = threading.Thread(target=rpoplpush_thread1) - t3 = threading.Thread(target=rpoplpush_thread1) - t4 = threading.Thread(target=rpoplpush_thread1) - t5 = threading.Thread(target=rpoplpush_thread1) - t6 = threading.Thread(target=rpoplpush_thread1) - t7 = threading.Thread(target=rpoplpush_thread1) - t8 = threading.Thread(target=rpoplpush_thread1) - t9 = threading.Thread(target=rpoplpush_thread1) - t10 = threading.Thread(target=rpoplpush_thread1) - t1.start() - t2.start() - t3.start() - t4.start() - t5.start() - t6.start() - t7.start() - t8.start() - t9.start() - t10.start() - - t1.join() - t2.join() - t3.join() - t4.join() - t5.join() - t6.join() - t7.join() - t8.join() - t9.join() - t10.join() - - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - m_keys = master.keys() - s_keys = slave.keys() - assert s_keys == m_keys, f'Expected: s_keys == m_keys, but got s_keys: {s_keys}, m_keys: {m_keys}' - - for i in range(0, master.llen('blist')): - # print(master.lindex('blist', i)) - # print(slave.lindex('blist', i)) - assert master.lindex('blist', i) == slave.lindex('blist', i), \ - f"Expected:master.lindex('blist', i) == slave.linex('blist', i), but got False when i = {i}" - print("test_rpoplpush_replication OK [Passed]") - - -def test_sdiffstore_replication(): - print("start test_sdiffstore_replication") - - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('set1', 'set2', 'dest_set') - - def random_sdiffstore_thread(): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - letters = string.ascii_letters - for i in range(0, 10): - pika.sadd('set1', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(letters) for _ in range(5))) - 
pika.sadd('set2', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(letters) for _ in range(5))) - pika.sadd('dest_set', ''.join(random.choice(letters) for _ in range(5))) - pika.sdiffstore('dest_set', 'set1', 'set2') - - threads = [] - for i in range(0, 10): - t = threading.Thread(target=random_sdiffstore_thread) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() - - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - - m_set1 = master.smembers('set1') - m_set2 = master.smembers('set2') - m_dest_set = master.smembers('dest_set') - s_set1 = slave.smembers('set1') - s_set2 = slave.smembers('set2') - s_dest_set = slave.smembers('dest_set') - - assert m_set1 == s_set1, f'Expected: set1 on master == set1 on slave, but got set1 on slave:{s_set1}, set1 on master:{m_set1}' - assert m_set2 == s_set2, f'Expected: set2 on master == set2 on slave, but got set2 on slave:{s_set2}, set2 on master:{m_set2}' - assert m_dest_set == s_dest_set, f'Expected: dest_set on master == dest_set on slave, but got dest_set on slave:{s_dest_set}, dest_set on master:{m_dest_set}' - print("test_sdiffstore_replication OK [Passed]") - - -def test_sinterstore_replication(): - print("start test_sinterstore_replication") - - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('set1', 'set2', 'dest_set') - - def random_sinterstore_thread(): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - letters = string.ascii_letters - for i in range(0, 10): - member = ''.join(random.choice(letters) for _ in range(5)) - member2 = ''.join(random.choice(letters) for _ in range(5)) - member3 = ''.join(random.choice(letters) for _ in range(5)) - member4 = ''.join(random.choice(letters) for _ in range(5)) - member5 = ''.join(random.choice(letters) for _ in range(5)) - member6 = ''.join(random.choice(letters) for _ in range(5)) - pika.sadd('set1', member) - pika.sadd('set2', member) - pika.sadd('set1', member2) - pika.sadd('set2', member2) - pika.sadd('set1', member3) - pika.sadd('set2', member3) - pika.sadd('set1', member4) - pika.sadd('set2', member4) - pika.sadd('set1', member5) - pika.sadd('set2', member5) - pika.sadd('set1', member6) - pika.sadd('set2', member6) - pika.sadd('dest_set', ''.join(random.choice(letters) for _ in range(5))) - pika.sinterstore('dest_set', 'set1', 'set2') - pika.sadd('dest_set', ''.join(random.choice(letters) for _ in range(5))) - - threads = [] - for i in range(0, 10): - t = threading.Thread(target=random_sinterstore_thread) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() - - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - - m_dest_set = master.smembers('dest_set') - s_dest_set = slave.smembers('dest_set') - - assert m_dest_set == s_dest_set, f'Expected: dest_set on master == dest_set on slave, but got dest_set on slave:{s_dest_set}, dest_set on master:{m_dest_set}' - print("test_sinterstore_replication OK [Passed]") - - -def test_zunionstore_replication(): - print("start test_zunionstore_replication") - - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = 
redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('zset1', 'zset2', 'zset_out') - - def random_zunionstore_thread(): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - for i in range(0, 10): - pika.zadd('zset1', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset2', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset2', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset1', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset2', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset1', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset2', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset2', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset1', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset1', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset2', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zadd('zset1', {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - pika.zunionstore('zset_out', ['zset1', 'zset2']) - pika.zadd('zset_out', - {''.join(random.choice(string.ascii_letters) for _ in range(5)): random.randint(1, 5)}) - - threads = [] - for i in range(0, 10): - t = threading.Thread(target=random_zunionstore_thread) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - - m_zset_out = master.zrange('zset_out', 0, -1, withscores=True) - s_zset_out = slave.zrange('zset_out', 0, -1, withscores=True) - - assert m_zset_out == s_zset_out, f'Expected: zset_out on master == zset_out on slave, but got zset_out on slave:{s_zset_out}, zset_out on master:{m_zset_out}' - print("test_zunionstore_replication OK [Passed]") - - -def test_zinterstore_replication(): - print("start test_zinterstore_replication") - - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('zset1', 'zset2', 'zset_out') - - def random_zinterstore_thread(): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - for i in range(0, 10): - member = ''.join(random.choice(string.ascii_letters) for _ in range(5)) - member2 = ''.join(random.choice(string.ascii_letters) for _ in range(5)) - member3 = ''.join(random.choice(string.ascii_letters) for _ in range(5)) - member4 = ''.join(random.choice(string.ascii_letters) for _ in range(5)) - pika.zadd('zset1', {member: random.randint(1, 5)}) - pika.zadd('zset2', {member: random.randint(1, 5)}) - pika.zadd('zset1', {member2: random.randint(1, 5)}) - pika.zadd('zset2', {member2: random.randint(1, 5)}) - pika.zadd('zset1', {member3: random.randint(1, 5)}) - pika.zadd('zset2', 
{member3: random.randint(1, 5)}) - pika.zadd('zset1', {member4: random.randint(1, 5)}) - pika.zadd('zset2', {member4: random.randint(1, 5)}) - pika.zinterstore('zset_out', ['zset1', 'zset2']) - - - threads = [] - for i in range(0, 10): - t = threading.Thread(target=random_zinterstore_thread) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() - - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - - m_zset_out = master.zrange('zset_out', 0, -1, withscores=True) - s_zset_out = slave.zrange('zset_out', 0, -1, withscores=True) - - if len(m_zset_out) != len(s_zset_out): - print(f"Length mismatch: Master has {len(m_zset_out)} elements, Slave has {len(s_zset_out)} elements") - - for i, (m_item, s_item) in enumerate(zip(m_zset_out, s_zset_out)): - if m_item != s_item: - print(f"Mismatch at rank {i + 1}: Master has {m_item}, Slave has {s_item}") - - assert m_zset_out == s_zset_out, f'Expected: zset_out on master == zset_out on slave, but got zset_out on slave:{s_zset_out}, zset_out on master:{m_zset_out}' - - print("test_zinterstore_replication OK [Passed]") - - -def test_sunionstore_replication(): - print("start test_sunionstore_replication") - - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('set1', 'set2', 'set_out') - - def random_sunionstore_thread(): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - for i in range(0, 10): - pika.sadd('set1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sadd('set2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.sunionstore('set_out', ['set1', 'set2']) - - threads = [] - for i in range(0, 10): - t = threading.Thread(target=random_sunionstore_thread) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() - - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - - m_set_out = master.smembers('set_out') - s_set_out = slave.smembers('set_out') - - assert m_set_out == s_set_out, f'Expected: set_out on master == set_out on slave, but got set_out on slave:{s_set_out}, set_out on master:{m_set_out}' - print("test_sunionstore_replication OK [Passed]") - - -def test_bitop_replication(): - print("start test_bitop_replication") - - master = 
redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('bitkey1', 'bitkey2', 'bitkey_out1', 'bitkey_out2') - - def random_bitop_thread(): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - for i in range(0, 100): # Consider increasing the range to a larger number to get more meaningful results. - offset1 = random.randint(0, 100) # You may want to adjust the range based on your use case. - offset2 = random.randint(0, 100) # You may want to adjust the range based on your use case. - value1 = random.choice([0, 1]) - value2 = random.choice([0, 1]) - pika.setbit('bitkey1', offset1, value1) - pika.setbit('bitkey2', offset1, value1) - pika.bitop('AND', 'bitkey_out1', 'bitkey1', 'bitkey2') - pika.setbit('bitkey1', offset1 + offset2, value2) - pika.setbit('bitkey2', offset2, value2) - pika.bitop('OR', 'bitkey_out2', 'bitkey1', 'bitkey2') - - threads = [] - for i in range(0, 10): - t = threading.Thread(target=random_bitop_thread) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() - if delay_slave_of: - slave.slaveof(master_ip, master_port) - time.sleep(25) - else: - time.sleep(10) - - m_key_out_count1 = master.bitcount('bitkey_out1') - s_key_out_count1 = slave.bitcount('bitkey_out1') - - m_key_out_count2 = master.bitcount('bitkey_out2') - s_key_out_count2 = slave.bitcount('bitkey_out2') - - assert m_key_out_count1 == s_key_out_count1, f'Expected: bitcount of bitkey_out1 on master == bitcount of bitkey_out1 on slave, but got bitcount of bitkey_out1 on slave:{s_key_out_count1}, bitcount of bitkey_out1 on master:{m_key_out_count1}' - assert m_key_out_count2 == s_key_out_count2, f'Expected: bitcount of bitkey_out2 on master == bitcount of bitkey_out2 on slave, but got bitcount of bitkey_out2 on slave:{s_key_out_count2}, bitcount of bitkey_out1 on master:{m_key_out_count2}' - print("test_bitop_replication OK [Passed]") - - -def test_pfmerge_replication(): - print("start test_pfmerge_replication") - - master = redis.Redis(host=master_ip, port=int(master_port), db=0) - slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) - if delay_slave_of: - slave.slaveof("no", "one") - else: - slave.slaveof(master_ip, master_port) - time.sleep(1) - - master.delete('hll1', 'hll2', 'hll_out') - - def random_pfmerge_thread(): - pika = redis.Redis(host=master_ip, port=int(master_port), db=0) - for i in range(0, 1): - pika.pfadd('hll1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.pfadd('hll2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.pfadd('hll2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.pfadd('hll1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.pfadd('hll2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.pfadd('hll1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.pfadd('hll2', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.pfadd('hll1', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.pfadd('hll_out', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - pika.pfmerge('hll_out', 'hll1', 'hll2') - pika.pfadd('hll_out', ''.join(random.choice(string.ascii_letters) for _ in range(5))) - - threads = [] - for i in range(0, 50): - t = 
threading.Thread(target=random_pfmerge_thread)
-        threads.append(t)
-
-    for t in threads:
-        t.start()
-    for t in threads:
-        t.join()
-    if delay_slave_of:
-        slave.slaveof(master_ip, master_port)
-        time.sleep(25)
-    else:
-        time.sleep(10)
-
-    m_hll_out = master.pfcount('hll_out')
-    s_hll_out = slave.pfcount('hll_out')
-
-    assert m_hll_out == s_hll_out, f'Expected: hll_out on master == hll_out on slave, but got hll_out on slave:{s_hll_out}, hll_out on master:{m_hll_out}'
-    print("test_pfmerge_replication OK [Passed]")
-
-def test_migrateslot_replication():
-    print("start test_migrateslot_replication")
-    master = redis.Redis(host=master_ip, port=int(master_port), db=0)
-    slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0)
-
-    # enable slot migration
-    master.config_set("slotmigrate", "yes")
-    slave.config_set("slotmigrate", "no")
-
-    setKey1 = "setKey_000"
-    setKey2 = "setKey_001"
-    setKey3 = "setKey_002"
-    setKey4 = "setKey_store"
-
-    slave.slaveof(master_ip, master_port)
-    time.sleep(20)
-
-    master.delete(setKey1)
-    master.delete(setKey2)
-    master.delete(setKey3)
-    master.delete(setKey4)
-
-    letters = string.ascii_letters
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
-    master.sdiffstore(setKey4, setKey1, setKey2)
-
-    time.sleep(25)
-
-    m_set1 = master.smembers(setKey1)
-    m_set2 = master.smembers(setKey2)
-    m_set3 = master.smembers(setKey3)
-    m_dest_set = master.smembers(setKey4)
-    s_set1 = slave.smembers(setKey1)
-    s_set2 = slave.smembers(setKey2)
-    s_set3 = slave.smembers(setKey3)
-    s_dest_set = slave.smembers(setKey4)
-
-    assert m_set1 == s_set1, f'Expected: set1 on master == set1 on slave, but got set1 on slave:{s_set1}, set1 on master:{m_set1}'
-    assert m_set2 == s_set2, f'Expected: set2 on master == set2 on slave, but got set2 on slave:{s_set2}, set2 on master:{m_set2}'
-    assert m_set3 == s_set3, f'Expected: set3 on master == set3 on slave, but got set3 on slave:{s_set3}, set3 on master:{m_set3}'
-    assert m_dest_set == s_dest_set, f'Expected: dest_set on master == dest_set on slave, but got dest_set on slave:{s_dest_set}, dest_set on master:{m_dest_set}'
-
-    # disconnect the master and slave
-    slave.slaveof("no", "one")
-
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
-    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
-    master.sdiffstore(setKey4, setKey1, setKey2)
-
-    # reconnect the master and slave
-    slave.slaveof(master_ip, master_port)
-    time.sleep(25)
-
-    m_set1 = master.smembers(setKey1)
-    m_set2 = master.smembers(setKey2)
-    m_set3 = master.smembers(setKey3)
-    m_dest_set = master.smembers(setKey4)
-    time.sleep(15)
-    s_set1 = slave.smembers(setKey1)
-    s_set2 = slave.smembers(setKey2)
-    s_set3 = slave.smembers(setKey3)
-    s_dest_set = slave.smembers(setKey4)
-
-    assert m_set1 == s_set1, f'Expected: set1 on master == set1 on slave, but got set1 on slave:{s_set1}, set1 on master:{m_set1}'
-    assert m_set2 == s_set2, f'Expected: set2 on master == set2 on slave, but got set2 on slave:{s_set2}, set2 on master:{m_set2}'
-    assert m_set3 == s_set3, f'Expected: set3 on master == set3 on slave, but got set3 on slave:{s_set3}, set3 on master:{m_set3}'
-    assert m_dest_set == s_dest_set, f'Expected: dest_set on master == dest_set on slave, but got dest_set on slave:{s_dest_set}, dest_set on master:{m_dest_set}'
-
-    # the slave node should not have slot keys
-    s_keys = slave.keys()
-    for key in s_keys:
-        assert not (str(key).startswith("_internal:slotkey:4migrate:") or str(key).startswith("_internal:slottag:4migrate:")), f'Expected: slave should not have slot keys, but got {key}'
-
-    master.config_set("slotmigrate", "no")
-
-    i_keys = master.keys("_internal:slotkey:4migrate*")
-    master.delete(*i_keys)
-    print("test_migrateslot_replication OK [Passed]")
-
-master_ip = '127.0.0.1'
-master_port = '9221'
-slave_ip = '127.0.0.1'
-slave_port = '9231'
-
-
-# ---------For Github Action---------Start
-# Simulate the slave server going down: after being disconnected for a while, it reconnects to the master server.
-delay_slave_of = False #Don't change this to True, unless you've added a long sleep after every slaveof - -test_migrateslot_replication() -master = redis.Redis(host=master_ip, port=int(master_port), db=0) -slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0) -slave.slaveof(master_ip, master_port) -time.sleep(25) -test_rpoplpush_replication() -test_bitop_replication() -test_msetnx_replication() -test_mset_replication() -test_smove_replication() -test_del_replication() -test_pfmerge_replication() -test_sdiffstore_replication() -test_sinterstore_replication() -test_zunionstore_replication() -test_zinterstore_replication() -test_sunionstore_replication() -# ---------For Github Action---------End - - -# ---------For Local Stress Test---------Start -# delay_slave_of = False -# for i in range(0, 200): -# test_rpoplpush_replication() -# test_bitop_replication() -# test_del_replication() -# test_msetnx_replication() -# test_mset_replication() -# test_smove_replication() -# test_pfmerge_replication() -# test_sunionstore_replication() -# test_sdiffstore_replication() -# test_sinterstore_replication() -# test_zunionstore_replication() -# test_zinterstore_replication() -# delay_slave_of = not delay_slave_of -# ---------For Local Stress Test---------End - diff --git a/tests/integration/replication_test.go b/tests/integration/replication_test.go new file mode 100644 index 0000000000..a8ac037721 --- /dev/null +++ b/tests/integration/replication_test.go @@ -0,0 +1,463 @@ +package pika_integration + +import ( + "context" + "fmt" + "math/rand" + "strings" + "sync" + "time" + + . "github.com/bsm/ginkgo/v2" + . "github.com/bsm/gomega" + "github.com/redis/go-redis/v9" +) + +type command_func func(*context.Context, *redis.Client, *sync.WaitGroup) + +func cleanEnv(ctx context.Context, clientMaster, clientSlave *redis.Client) { + r := clientSlave.Do(ctx, "slaveof", "no", "one") + Expect(r.Err()).NotTo(HaveOccurred()) + Expect(r.Val()).To(Equal("OK")) + r = clientSlave.Do(ctx, "clearreplicationid") + r = clientMaster.Do(ctx, "clearreplicationid") + time.Sleep(1 * time.Second) +} + +func trySlave(ctx context.Context, clientSlave *redis.Client, ip string, port string) bool { + Expect(clientSlave.Do(ctx, "slaveof", ip, port).Val()).To(Equal("OK")) + infoRes := clientSlave.Info(ctx, "replication") + Expect(infoRes.Err()).NotTo(HaveOccurred()) + Expect(infoRes.Val()).To(ContainSubstring("role:slave")) + var count = 0 + for { + infoRes = clientSlave.Info(ctx, "replication") + Expect(infoRes.Err()).NotTo(HaveOccurred()) + count++ + if strings.Contains(infoRes.Val(), "master_link_status:up") { + return true + } else if count > 200 { + return false + } + time.Sleep(100 * time.Millisecond) + } +} + +func randomString(length int) string { + rand.Seed(time.Now().UnixNano()) + b := make([]byte, length) + rand.Read(b) + return fmt.Sprintf("%x", b)[:length] +} + +func randomInt(max int) int { + rand.Seed(time.Now().UnixNano()) + return rand.Intn(max) +} + +func rpoplpushThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < 5; i++ { + letters1 := randomString(5) + letters2 := randomString(5) + letters3 := randomString(5) + + clientMaster.LPush(*ctx, "blist0", letters1) + clientMaster.RPopLPush(*ctx, "blist0", "blist") + clientMaster.LPush(*ctx, "blist", letters1, letters2, letters3) + + clientMaster.LPop(*ctx, "blist") + clientMaster.RPop(*ctx, "blist") + clientMaster.LPush(*ctx, "blist0", letters3) + clientMaster.RPopLPush(*ctx, "blist0", "blist") + 
clientMaster.RPush(*ctx, "blist", letters3, letters2, letters1) + clientMaster.LPop(*ctx, "blist") + clientMaster.LPush(*ctx, "blist0", letters2) + clientMaster.RPopLPush(*ctx, "blist0", "blist") + clientMaster.RPop(*ctx, "blist") + } + +} + +func randomBitopThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < 10; i++ { + offset1 := randomInt(50) + offset2 := randomInt(50) + value1 := randomInt(1) + value2 := randomInt(1) + + clientMaster.SetBit(*ctx, "bitkey1", int64(offset1), value1) + clientMaster.SetBit(*ctx, "bitkey2", int64(offset1), value1) + clientMaster.BitOpAnd(*ctx, "bitkey_out", "bitkey1", "bitkey2") + clientMaster.SetBit(*ctx, "bitkey1", int64(offset1+offset2), value2) + clientMaster.SetBit(*ctx, "bitkey2", int64(offset2), value2) + clientMaster.BitOpOr(*ctx, "bitkey_out2", "bitkey1", "bitkey2") + } + +} + +func randomSmoveThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) { + defer wg.Done() + member := randomString(5) + clientMaster.SAdd(*ctx, "sourceSet", member) + clientMaster.SAdd(*ctx, "sourceSet", member) + clientMaster.SAdd(*ctx, "sourceSet", member) + clientMaster.SRem(*ctx, "destSet", member) + clientMaster.SRem(*ctx, "destSet", member) + clientMaster.SMove(*ctx, "sourceSet", "destSet", member) +} + +func randomSdiffstoreThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < 5; i++ { + clientMaster.SAdd(*ctx, "set1", randomString(5)) + clientMaster.SAdd(*ctx, "set2", randomString(5)) + clientMaster.SAdd(*ctx, "set1", randomString(5)) + clientMaster.SAdd(*ctx, "set2", randomString(5)) + clientMaster.SAdd(*ctx, "set1", randomString(5)) + clientMaster.SAdd(*ctx, "set2", randomString(5)) + clientMaster.SAdd(*ctx, "set1", randomString(5)) + clientMaster.SAdd(*ctx, "set2", randomString(5)) + clientMaster.SAdd(*ctx, "set1", randomString(5)) + clientMaster.SAdd(*ctx, "set2", randomString(5)) + clientMaster.SAdd(*ctx, "set1", randomString(5)) + clientMaster.SAdd(*ctx, "set2", randomString(5)) + clientMaster.SAdd(*ctx, "dest_set", randomString(5)) + clientMaster.SDiffStore(*ctx, "dest_set", "set1", "set2") + } +} + +func randomSinterstoreThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < 5; i++ { + member := randomString(5) + member2 := randomString(5) + member3 := randomString(5) + member4 := randomString(5) + member5 := randomString(5) + member6 := randomString(5) + clientMaster.SAdd(*ctx, "set1", member) + clientMaster.SAdd(*ctx, "set2", member) + clientMaster.SAdd(*ctx, "set1", member2) + clientMaster.SAdd(*ctx, "set2", member2) + clientMaster.SAdd(*ctx, "set1", member3) + clientMaster.SAdd(*ctx, "set2", member3) + clientMaster.SAdd(*ctx, "set1", member4) + clientMaster.SAdd(*ctx, "set2", member4) + clientMaster.SAdd(*ctx, "set1", member5) + clientMaster.SAdd(*ctx, "set2", member5) + clientMaster.SAdd(*ctx, "set1", member6) + clientMaster.SAdd(*ctx, "set2", member6) + clientMaster.SInterStore(*ctx, "dest_set", "set1", "set2") + } +} + +func test_del_replication(ctx *context.Context, clientMaster, clientSlave *redis.Client) { + clientMaster.Del(*ctx, "blist0", "blist1", "blist2", "blist3") + clientMaster.Del(*ctx, "blist100", "blist101", "blist102", "blist103") + clientMaster.Del(*ctx, "blist0", "blist1", "blist2", "blist3") + clientMaster.RPush(*ctx, "blist3", "v2") + clientMaster.RPush(*ctx, "blist2", "v2") + clientMaster.RPop(*ctx, "blist2") + 
clientMaster.LPop(*ctx, "blist3") + + clientMaster.LPush(*ctx, "blist2", "v2") + clientMaster.LPop(*ctx, "blist2") + clientMaster.RPush(*ctx, "blist3", "v2") + clientMaster.RPop(*ctx, "blist3") + + clientMaster.LPush(*ctx, "blist2", "v2") + clientMaster.LPop(*ctx, "blist2") + clientMaster.RPush(*ctx, "blist3", "v2") + clientMaster.LPush(*ctx, "blist2", "v2") + + clientMaster.RPop(*ctx, "blist3") + clientMaster.LPop(*ctx, "blist2") + clientMaster.LPush(*ctx, "blist2", "v2") + clientMaster.RPush(*ctx, "blist3", "v2") + + clientMaster.RPop(*ctx, "blist3") + clientMaster.LPop(*ctx, "blist2") + clientMaster.RPush(*ctx, "blist3", "v2") + clientMaster.LPush(*ctx, "blist2", "v2") + + clientMaster.RPop(*ctx, "blist3") + clientMaster.RPush(*ctx, "blist3", "v2") + clientMaster.LPush(*ctx, "blist2", "v2") + clientMaster.RPush(*ctx, "blist3", "v2") + + clientMaster.RPush(*ctx, "blist3", "v2") + clientMaster.LPush(*ctx, "blist2", "v2") + clientMaster.RPush(*ctx, "blist3", "v2") + clientMaster.LPush(*ctx, "blist2", "v2") + + clientMaster.LPush(*ctx, "blist2", "v2") + clientMaster.RPush(*ctx, "blist3", "v2") + clientMaster.Del(*ctx, "blist1", "large", "blist2") + + clientMaster.RPush(*ctx, "blist1", "a", "latge", "c") + clientMaster.RPush(*ctx, "blist2", "d", "latge", "f") + + clientMaster.LPop(*ctx, "blist1") + clientMaster.RPop(*ctx, "blist1") + clientMaster.LPop(*ctx, "blist2") + clientMaster.RPop(*ctx, "blist2") + + clientMaster.Del(*ctx, "blist3") + clientMaster.LPop(*ctx, "blist2") + clientMaster.RPop(*ctx, "blist1") + time.Sleep(15 * time.Second) + + for i := int64(0); i < clientMaster.LLen(*ctx, "blist1").Val(); i++ { + Expect(clientMaster.LIndex(*ctx, "blist", i)).To(Equal(clientSlave.LIndex(*ctx, "blist", i))) + } + for i := int64(0); i < clientMaster.LLen(*ctx, "blist2").Val(); i++ { + Expect(clientMaster.LIndex(*ctx, "blist2", i)).To(Equal(clientSlave.LIndex(*ctx, "blist2", i))) + } + for i := int64(0); i < clientMaster.LLen(*ctx, "blist3").Val(); i++ { + Expect(clientMaster.LIndex(*ctx, "blist3", i)).To(Equal(clientSlave.LIndex(*ctx, "blist3", i))) + } + +} + +func randomZunionstoreThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < 5; i++ { + clientMaster.Do(*ctx, "zadd", "zset1", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset2", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset2", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset1", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset2", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset1", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset2", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset2", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset1", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset1", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset2", randomInt(10), randomString(5)) + clientMaster.Do(*ctx, "zadd", "zset1", randomInt(10), randomString(5)) + clientMaster.ZUnionStore(*ctx, "zset_out", &redis.ZStore{Keys: []string{"zset1", "zset2"}, Weights: []float64{1, 1}}) + clientMaster.Do(*ctx, "zadd", "zset_out", randomInt(10), randomString(5)) + } +} + +func randomZinterstoreThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < 5; i++ { + member := randomString(5) + member2 := randomString(5) + member3 := randomString(5) 
+		member4 := randomString(5)
+		clientMaster.Do(*ctx, "zadd", "zset1", randomInt(5), member)
+		clientMaster.Do(*ctx, "zadd", "zset2", randomInt(5), member)
+		clientMaster.Do(*ctx, "zadd", "zset2", randomInt(5), member2)
+		clientMaster.Do(*ctx, "zadd", "zset1", randomInt(5), member2)
+		clientMaster.Do(*ctx, "zadd", "zset2", randomInt(5), member3)
+		clientMaster.Do(*ctx, "zadd", "zset1", randomInt(5), member3)
+		clientMaster.Do(*ctx, "zadd", "zset1", randomInt(5), member4)
+		clientMaster.Do(*ctx, "zadd", "zset2", randomInt(5), member4)
+		clientMaster.ZInterStore(*ctx, "zset_out", &redis.ZStore{Keys: []string{"zset1", "zset2"}, Weights: []float64{1, 1}})
+	}
+}
+
+func randomSunionstoreThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) {
+	defer wg.Done()
+	for i := 0; i < 5; i++ {
+		clientMaster.SAdd(*ctx, "set1", randomString(5))
+		clientMaster.SAdd(*ctx, "set2", randomString(5))
+		clientMaster.SAdd(*ctx, "set1", randomString(5))
+		clientMaster.SAdd(*ctx, "set1", randomString(5))
+		clientMaster.SAdd(*ctx, "set2", randomString(5))
+		clientMaster.SAdd(*ctx, "set1", randomString(5))
+		clientMaster.SAdd(*ctx, "set1", randomString(5))
+		clientMaster.SAdd(*ctx, "set2", randomString(5))
+		clientMaster.SAdd(*ctx, "set2", randomString(5))
+		clientMaster.SAdd(*ctx, "set2", randomString(5))
+		clientMaster.SAdd(*ctx, "set1", randomString(5))
+		clientMaster.SAdd(*ctx, "set2", randomString(5))
+		clientMaster.SAdd(*ctx, "set1", randomString(5))
+		clientMaster.SAdd(*ctx, "set2", randomString(5))
+		clientMaster.SUnionStore(*ctx, "set_out", "set1", "set2")
+	}
+}
+
+func execute(ctx *context.Context, clientMaster *redis.Client, num_thread int, f command_func) {
+	var wg sync.WaitGroup
+	wg.Add(num_thread)
+	for i := 1; i <= num_thread; i++ {
+		go f(ctx, clientMaster, &wg)
+	}
+	wg.Wait()
+	time.Sleep(10 * time.Second)
+}
+
+//func randomPfmergeThread(ctx *context.Context, clientMaster *redis.Client) {
+//	clientMaster.PFAdd(*ctx, "hll1", randomString(5))
+//	clientMaster.PFAdd(*ctx, "hll2", randomString(5))
+//	clientMaster.PFAdd(*ctx, "hll2", randomString(5))
+//	clientMaster.PFAdd(*ctx, "hll1", randomString(5))
+//	clientMaster.PFAdd(*ctx, "hll2", randomString(5))
+//	clientMaster.PFAdd(*ctx, "hll1", randomString(5))
+//	clientMaster.PFAdd(*ctx, "hll2", randomString(5))
+//	clientMaster.PFAdd(*ctx, "hll1", randomString(5))
+//	clientMaster.PFAdd(*ctx, "hll_out", randomString(5))
+//	clientMaster.PFMerge(*ctx, "hll_out", "hll1", "hll2")
+//	clientMaster.PFAdd(*ctx, "hll_out", randomString(5))
+//}
+
+var _ = Describe("should replicate", func() {
+	Describe("all replication tests", func() {
+		ctx := context.TODO()
+		var clientSlave *redis.Client
+		var clientMaster *redis.Client
+
+		BeforeEach(func() {
+			clientMaster = redis.NewClient(pikaOptions1())
+			clientSlave = redis.NewClient(pikaOptions2())
+			cleanEnv(ctx, clientMaster, clientSlave)
+			Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
+			Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
+			time.Sleep(1 * time.Second)
+		})
+		AfterEach(func() {
+			cleanEnv(ctx, clientMaster, clientSlave)
+			Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
+			Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
+			time.Sleep(1 * time.Second)
+			Expect(clientSlave.Close()).NotTo(HaveOccurred())
+			Expect(clientMaster.Close()).NotTo(HaveOccurred())
+		})
+		It("Let the slave become a replica of the master", func() {
+			infoRes := clientSlave.Info(ctx, "replication")
+			Expect(infoRes.Err()).NotTo(HaveOccurred())
+			Expect(infoRes.Val()).To(ContainSubstring("role:master"))
+			infoRes = clientMaster.Info(ctx, "replication")
+			Expect(infoRes.Err()).NotTo(HaveOccurred())
+			Expect(infoRes.Val()).To(ContainSubstring("role:master"))
+			Expect(clientSlave.Do(ctx, "slaveof", "127.0.0.1", "9231").Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same"))
+
+			var count = 0
+			for {
+				res := trySlave(ctx, clientSlave, "127.0.0.1", "9221")
+				if res {
+					break
+				} else if count > 4 {
+					break
+				} else {
+					cleanEnv(ctx, clientMaster, clientSlave)
+					count++
+				}
+			}
+
+			infoRes = clientSlave.Info(ctx, "replication")
+			Expect(infoRes.Err()).NotTo(HaveOccurred())
+			Expect(infoRes.Val()).To(ContainSubstring("master_link_status:up"))
+
+			infoRes = clientMaster.Info(ctx, "replication")
+			Expect(infoRes.Err()).NotTo(HaveOccurred())
+			Expect(infoRes.Val()).To(ContainSubstring("connected_slaves:1"))
+
+			slaveWrite := clientSlave.Set(ctx, "foo", "bar", 0)
+			Expect(slaveWrite.Err()).To(MatchError("ERR Server in read-only"))
+
+			clientMaster.Del(ctx, "blist0", "blist1", "blist")
+			execute(&ctx, clientMaster, 4, rpoplpushThread)
+			for i := int64(0); i < clientMaster.LLen(ctx, "blist").Val(); i++ {
+				Expect(clientMaster.LIndex(ctx, "blist", i)).To(Equal(clientSlave.LIndex(ctx, "blist", i)))
+			}
+
+			Expect(clientMaster.Del(ctx, "bitkey1", "bitkey2", "bitkey_out1", "bitkey_out2").Err()).NotTo(HaveOccurred())
+			execute(&ctx, clientMaster, 4, randomBitopThread)
+			master_key_out_count1 := clientMaster.Do(ctx, "bitcount", "bitkey_out1", 0, -1)
+			slave_key_out_count1 := clientSlave.Do(ctx, "bitcount", "bitkey_out1", 0, -1)
+			Expect(master_key_out_count1.Val()).To(Equal(slave_key_out_count1.Val()))
+
+			master_key_out_count2 := clientMaster.Do(ctx, "bitcount", "bitkey_out2", 0, -1)
+			slave_key_out_count2 := clientSlave.Do(ctx, "bitcount", "bitkey_out2", 0, -1)
+			Expect(master_key_out_count2.Val()).To(Equal(slave_key_out_count2.Val()))
+
+			clientMaster.Del(ctx, "sourceSet", "destSet")
+			execute(&ctx, clientMaster, 4, randomSmoveThread)
+			master_source_set := clientMaster.SMembers(ctx, "sourceSet")
+			Expect(master_source_set.Err()).NotTo(HaveOccurred())
+			slave_source_set := clientSlave.SMembers(ctx, "sourceSet")
+			Expect(slave_source_set.Err()).NotTo(HaveOccurred())
+			Expect(master_source_set.Val()).To(Equal(slave_source_set.Val()))
+
+			master_dest_set := clientMaster.SMembers(ctx, "destSet")
+			Expect(master_dest_set.Err()).NotTo(HaveOccurred())
+			slave_dest_set := clientSlave.SMembers(ctx, "destSet")
+			Expect(slave_dest_set.Err()).NotTo(HaveOccurred())
+			Expect(master_dest_set.Val()).To(Equal(slave_dest_set.Val()))
+
+			test_del_replication(&ctx, clientMaster, clientSlave)
+
+			clientMaster.Del(ctx, "set1", "set2", "dest_set")
+			execute(&ctx, clientMaster, 4, randomSdiffstoreThread)
+			master_set1 := clientMaster.SMembers(ctx, "set1")
+			Expect(master_set1.Err()).NotTo(HaveOccurred())
+			slave_set1 := clientSlave.SMembers(ctx, "set1")
+			Expect(slave_set1.Err()).NotTo(HaveOccurred())
+			Expect(master_set1.Val()).To(Equal(slave_set1.Val()))
+
+			master_set2 := clientMaster.SMembers(ctx, "set2")
+			Expect(master_set2.Err()).NotTo(HaveOccurred())
+			slave_set2 := clientSlave.SMembers(ctx, "set2")
+			Expect(slave_set2.Err()).NotTo(HaveOccurred())
+			Expect(master_set2.Val()).To(Equal(slave_set2.Val()))
+
+			master_dest_store_set := clientMaster.SMembers(ctx, "dest_set")
+			Expect(master_dest_store_set.Err()).NotTo(HaveOccurred())
+			slave_dest_store_set := clientSlave.SMembers(ctx, "dest_set")
+			Expect(slave_dest_store_set.Err()).NotTo(HaveOccurred())
+			Expect(master_dest_store_set.Val()).To(Equal(slave_dest_store_set.Val()))
+
+			clientMaster.Del(ctx, "set1", "set2", "dest_set")
+			execute(&ctx, clientMaster, 4, randomSinterstoreThread)
+			master_dest_interstore_set := clientMaster.SMembers(ctx, "dest_set")
+			Expect(master_dest_interstore_set.Err()).NotTo(HaveOccurred())
+			slave_dest_interstore_set := clientSlave.SMembers(ctx, "dest_set")
+			Expect(slave_dest_interstore_set.Err()).NotTo(HaveOccurred())
+			Expect(master_dest_interstore_set.Val()).To(Equal(slave_dest_interstore_set.Val()))
+
+			//clientMaster.FlushAll(ctx)
+			//time.Sleep(3 * time.Second)
+			//go randomPfmergeThread(&ctx, clientMaster)
+			//go randomPfmergeThread(&ctx, clientMaster)
+			//go randomPfmergeThread(&ctx, clientMaster)
+			//go randomPfmergeThread(&ctx, clientMaster)
+			//time.Sleep(10 * time.Second)
+			//master_hll_out := clientMaster.PFCount(ctx, "hll_out")
+			//Expect(master_hll_out.Err()).NotTo(HaveOccurred())
+			//slave_hll_out := clientSlave.PFCount(ctx, "hll_out")
+			//Expect(slave_hll_out.Err()).NotTo(HaveOccurred())
+			//Expect(master_hll_out.Val()).To(Equal(slave_hll_out.Val()))
+
+			clientMaster.Del(ctx, "zset1", "zset2", "zset_out")
+			execute(&ctx, clientMaster, 4, randomZunionstoreThread)
+			master_zset_out := clientMaster.ZRange(ctx, "zset_out", 0, -1)
+			Expect(master_zset_out.Err()).NotTo(HaveOccurred())
+			slave_zset_out := clientSlave.ZRange(ctx, "zset_out", 0, -1)
+			Expect(slave_zset_out.Err()).NotTo(HaveOccurred())
+			Expect(master_zset_out.Val()).To(Equal(slave_zset_out.Val()))
+
+			clientMaster.Del(ctx, "zset1", "zset2", "zset_out")
+			execute(&ctx, clientMaster, 4, randomZinterstoreThread)
+			master_zset_out = clientMaster.ZRange(ctx, "zset_out", 0, -1)
+			Expect(master_zset_out.Err()).NotTo(HaveOccurred())
+			slave_zset_out = clientSlave.ZRange(ctx, "zset_out", 0, -1)
+			Expect(slave_zset_out.Err()).NotTo(HaveOccurred())
+			Expect(master_zset_out.Val()).To(Equal(slave_zset_out.Val()))
+
+			clientMaster.Del(ctx, "set1", "set2", "set_out")
+			execute(&ctx, clientMaster, 4, randomSunionstoreThread)
+			master_unionstore_set := clientMaster.SMembers(ctx, "set_out")
+			Expect(master_unionstore_set.Err()).NotTo(HaveOccurred())
+			slave_unionstore_set := clientSlave.SMembers(ctx, "set_out")
+			Expect(slave_unionstore_set.Err()).NotTo(HaveOccurred())
+			Expect(master_unionstore_set.Val()).To(Equal(slave_unionstore_set.Val()))
+		})
+
+	})
+
+})
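
The core of this migration is replacing the Python version's hand-rolled threading.Thread lists and fixed time.sleep() waits with two Go synchronization patterns: a sync.WaitGroup that fans out and joins the concurrent writer goroutines (the execute helper above), and a polling loop on INFO replication that waits for master_link_status:up (the trySlave helper above). The following is a minimal, self-contained sketch of that pattern, assuming a Pika master at 127.0.0.1:9221 and a slave at 127.0.0.1:9231 as in the test; the waitForSync helper and the demo_list key are illustrative names, not part of the patch.

package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
)

// waitForSync polls INFO replication on the slave until the link is up,
// instead of sleeping for a fixed interval as the Python tests did.
func waitForSync(ctx context.Context, slave *redis.Client, tries int) bool {
	for i := 0; i < tries; i++ {
		info, err := slave.Info(ctx, "replication").Result()
		if err == nil && strings.Contains(info, "master_link_status:up") {
			return true
		}
		time.Sleep(100 * time.Millisecond)
	}
	return false
}

func main() {
	ctx := context.TODO()
	master := redis.NewClient(&redis.Options{Addr: "127.0.0.1:9221"})
	slave := redis.NewClient(&redis.Options{Addr: "127.0.0.1:9231"})

	// Fan out concurrent writers and join them with a WaitGroup,
	// mirroring the execute() helper in replication_test.go.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < 5; j++ {
				master.LPush(ctx, "demo_list", fmt.Sprintf("w%d-v%d", id, j))
			}
		}(i)
	}
	wg.Wait()

	fmt.Println("slave in sync:", waitForSync(ctx, slave, 200))
}

Waiting on a replication-state predicate rather than a fixed sleep is what lets the Go suite drop the Python version's 10-25 second delays without becoming flaky.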