From 34433070f3ad597b7bc52c15b732ebc177c2c33d Mon Sep 17 00:00:00 2001 From: ikolomi Date: Mon, 8 Jul 2024 12:33:24 +0300 Subject: [PATCH 1/2] 1. Enabled all tests on macos latest with all engine versions. 2. Changed pubsub tests to be run in separate task. 3. Fixed uds message parsing in glide-core. 4. Refactored pubsub tests --- .github/workflows/python.yml | 87 +- glide-core/src/socket_listener.rs | 12 +- python/pytest.ini | 1 - python/pytest_pubsub.ini | 4 + python/python/glide/glide_client.py | 2 +- python/python/tests/test_pubsub.py | 2155 ++++++++++++++++----------- 6 files changed, 1388 insertions(+), 873 deletions(-) create mode 100644 python/pytest_pubsub.ini diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index df228b43e2..85f7038baf 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -52,7 +52,7 @@ jobs: test-ubuntu-latest: runs-on: ubuntu-latest needs: load-engine-matrix - timeout-minutes: 25 + timeout-minutes: 35 strategy: fail-fast: false matrix: @@ -128,6 +128,46 @@ jobs: with: language-flag: -python + test-pubsub-ubuntu-latest: + runs-on: ubuntu-latest + needs: load-engine-matrix + timeout-minutes: 35 + strategy: + fail-fast: false + matrix: + engine: ${{ fromJson(needs.load-engine-matrix.outputs.matrix) }} + python: + - "3.8" + - "3.9" + - "3.10" + - "3.11" + - "3.12" + + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python }} + + - name: Build Python wrapper + uses: ./.github/workflows/build-python-wrapper + with: + os: "ubuntu" + target: "x86_64-unknown-linux-gnu" + github-token: ${{ secrets.GITHUB_TOKEN }} + engine-version: ${{ matrix.engine.version }} + + - name: Test pubsub with pytest + working-directory: ./python + run: | + source .env/bin/activate + cd python/tests/ + pytest -c ../../pytest_pubsub.ini --asyncio-mode=auto test_pubsub.py::TestPubSub + lint-rust: runs-on: ubuntu-latest timeout-minutes: 15 @@ -141,9 +181,14 @@ jobs: cargo-toml-folder: ./python name: lint python-rust - build-macos-latest: + test-macos-latest: runs-on: macos-latest - timeout-minutes: 25 + needs: load-engine-matrix + timeout-minutes: 35 + strategy: + fail-fast: false + matrix: + engine: ${{ fromJson(needs.load-engine-matrix.outputs.matrix) }} steps: - uses: actions/checkout@v4 with: @@ -157,13 +202,43 @@ jobs: os: "macos" target: "aarch64-apple-darwin" github-token: ${{ secrets.GITHUB_TOKEN }} - engine-version: "7.2.5" + engine-version: ${{ matrix.engine.version }} - - name: Test compatibility with pytest + - name: Test with pytest working-directory: ./python run: | source .env/bin/activate - pytest --asyncio-mode=auto -m smoke_test + pytest --asyncio-mode=auto + + test-pubsub-macos-latest: + runs-on: macos-latest + needs: load-engine-matrix + timeout-minutes: 35 + strategy: + fail-fast: false + matrix: + engine: ${{ fromJson(needs.load-engine-matrix.outputs.matrix) }} + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + - name: Set up Homebrew + uses: Homebrew/actions/setup-homebrew@master + + - name: Build Python wrapper + uses: ./.github/workflows/build-python-wrapper + with: + os: "macos" + target: "aarch64-apple-darwin" + github-token: ${{ secrets.GITHUB_TOKEN }} + engine-version: ${{ matrix.engine.version }} + + - name: Test pubsub with pytest + working-directory: ./python + run: | + source .env/bin/activate + cd python/tests/ + pytest -c ../../pytest_pubsub.ini --asyncio-mode=auto 
test_pubsub.py::TestPubSub

  build-amazonlinux-latest:
    runs-on: ubuntu-latest
diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs
index bce4c8bf6c..ba60676391 100644
--- a/glide-core/src/socket_listener.rs
+++ b/glide-core/src/socket_listener.rs
@@ -117,9 +117,15 @@ impl UnixStreamListener {
                     return ReadSocketClosed.into();
                 }
                 Ok(_) => {
-                    return match self.rotating_buffer.get_requests() {
-                        Ok(requests) => ReceivedValues(requests),
-                        Err(err) => UnhandledError(err.into()).into(),
+                    match self.rotating_buffer.get_requests() {
+                        Ok(requests) => {
+                            if !requests.is_empty() {
+                                return ReceivedValues(requests);
+                            }
+                            // no complete request parsed yet - continue reading from the socket
+                            continue;
+                        }
+                        Err(err) => return UnhandledError(err.into()).into(),
                     };
                 }
                 Err(ref e)
diff --git a/python/pytest.ini b/python/pytest.ini
index 7c29dff74f..236b85a8ad 100644
--- a/python/pytest.ini
+++ b/python/pytest.ini
@@ -1,5 +1,4 @@
 [pytest]
 markers =
     smoke_test: mark a test as a build verification testing.
-# TODO: Remove pubsub exclusion after the flakey tests are fixed
 addopts = -k "not redis_modules and not pubsub"
diff --git a/python/pytest_pubsub.ini b/python/pytest_pubsub.ini
new file mode 100644
index 0000000000..bf42185756
--- /dev/null
+++ b/python/pytest_pubsub.ini
@@ -0,0 +1,4 @@
+[pytest]
+markers =
+    smoke_test: mark a test as a build verification testing.
+addopts = -k "not redis_modules"
diff --git a/python/python/glide/glide_client.py b/python/python/glide/glide_client.py
index 2ee030c555..0ba5a87c73 100644
--- a/python/python/glide/glide_client.py
+++ b/python/python/glide/glide_client.py
@@ -152,7 +152,7 @@ async def close(self, err_message: Optional[str] = None) -> None:
         try:
             self._pubsub_lock.acquire()
             for pubsub_future in self._pubsub_futures:
-                if not response_future.done() and not pubsub_future.cancelled():
+                if not pubsub_future.done() and not pubsub_future.cancelled():
                     pubsub_future.set_exception(ClosingError(""))
         finally:
             self._pubsub_lock.release()
diff --git a/python/python/tests/test_pubsub.py b/python/python/tests/test_pubsub.py
index ae33e5ae0e..87a37dc954 100644
--- a/python/python/tests/test_pubsub.py
+++ b/python/python/tests/test_pubsub.py
@@ -15,7 +15,7 @@
 )
 from glide.constants import OK
 from glide.exceptions import ConfigurationError
-from glide.glide_client import GlideClient, GlideClusterClient, TGlideClient
+from glide.glide_client import BaseClient, GlideClient, GlideClusterClient, TGlideClient
 from tests.conftest import create_client
 from tests.utils.utils import check_if_server_version_lt, get_random_string
@@ -33,49 +33,57 @@ class MethodTesting(IntEnum):
     "Uses callback-based subscription method."
 
 
-async def create_two_clients(
+async def create_two_clients_with_pubsub(
     request,
     cluster_mode,
-    pub_sub,
-    pub_sub2: Optional[Any] = None,
+    client1_pubsub: Optional[Any] = None,
+    client2_pubsub: Optional[Any] = None,
     protocol: ProtocolVersion = ProtocolVersion.RESP3,
+    timeout: Optional[int] = None,
 ) -> Tuple[
     Union[GlideClient, GlideClusterClient], Union[GlideClient, GlideClusterClient]
 ]:
     """
-    Sets 2 up clients for testing purposes.
+    Sets up 2 clients for testing purposes with optional pubsub configuration.
 
     Args:
         request: pytest request for creating a client.
         cluster_mode: the cluster mode.
-        pub_sub: pubsub configuration subscription for a client.
-        pub_sub2: pubsub configuration subscription for a client.
+        client1_pubsub: pubsub configuration subscription for the first client.
+        client2_pubsub: pubsub configuration subscription for the second client.
protocol: what protocol to use, used for the test: `test_pubsub_resp2_raise_an_error`.
     """
-    cluster_mode_pubsub, standalone_mode_pubsub = None, None
+    cluster_mode_pubsub1, standalone_mode_pubsub1 = None, None
     cluster_mode_pubsub2, standalone_mode_pubsub2 = None, None
 
     if cluster_mode:
-        cluster_mode_pubsub = pub_sub
-        cluster_mode_pubsub2 = pub_sub2
+        cluster_mode_pubsub1 = client1_pubsub
+        cluster_mode_pubsub2 = client2_pubsub
     else:
-        standalone_mode_pubsub = pub_sub
-        standalone_mode_pubsub2 = pub_sub2
+        standalone_mode_pubsub1 = client1_pubsub
+        standalone_mode_pubsub2 = client2_pubsub
 
-    client = await create_client(
+    client1 = await create_client(
         request,
         cluster_mode=cluster_mode,
-        cluster_mode_pubsub=cluster_mode_pubsub2,
-        standalone_mode_pubsub=standalone_mode_pubsub2,
+        cluster_mode_pubsub=cluster_mode_pubsub1,
+        standalone_mode_pubsub=standalone_mode_pubsub1,
         protocol=protocol,
+        timeout=timeout,
     )
-    client2 = await create_client(
-        request,
-        cluster_mode=cluster_mode,
-        cluster_mode_pubsub=cluster_mode_pubsub,
-        standalone_mode_pubsub=standalone_mode_pubsub,
-        protocol=protocol,
-    )
-    return client, client2
+    try:
+        client2 = await create_client(
+            request,
+            cluster_mode=cluster_mode,
+            cluster_mode_pubsub=cluster_mode_pubsub2,
+            standalone_mode_pubsub=standalone_mode_pubsub2,
+            protocol=protocol,
+            timeout=timeout,
+        )
+    except Exception:
+        await client1.close()
+        raise
+
+    return client1, client2
 
 
 def decode_pubsub_msg(msg: Optional[CoreCommands.PubSubMsg]) -> CoreCommands.PubSubMsg:
@@ -148,6 +156,64 @@ def new_message(msg: CoreCommands.PubSubMsg, context: Any):
     received_messages.append(msg)
 
 
+async def client_cleanup(
+    client: Union[GlideClient, GlideClusterClient],
+    cluster_mode_subs: Optional[
+        GlideClusterClientConfiguration.PubSubSubscriptions
+    ] = None,
+):
+    """
+    This function tries its best to clear the state associated with the client.
+    It explicitly calls client.close() and deletes the object.
+    In addition, it tries to clean up cluster mode subscriptions, since it was found that closing the client via close() is not enough.
+    Note that unsubscribing is not feasible in the current implementation, since it is unknown on which node the subscriptions are configured.
+    """
+    # if is_cluster:
+    #     pubsub_subs = cast(ClusterClientConfiguration.PubSubSubscriptions, pubsub_subs)
+    # else:
+    #     pubsub_subs = cast(GlideClientConfiguration.PubSubSubscriptions, pubsub_subs)
+
+    # for channel_type, channel_patterns in pubsub_subs:
+    #     if channel_type == ClusterClientConfiguration.PubSubChannelModes.Exact or channel_type == GlideClientConfiguration.PubSubChannelModes.Exact:
+    #         cmd = "UNSUBSCRIBE"
+    #     elif channel_type == ClusterClientConfiguration.PubSubChannelModes.Pattern or channel_type == GlideClientConfiguration.PubSubChannelModes.Pattern:
+    #         cmd = "PUNSUBSCRIBE"
+    #     else:
+    #         cmd = "SUNSUBSCRIBE"
+
+    #         # explicit unsubscribe commands are needed because close() alone might not drop the subscriptions;
+    #         # however, UNSUBSCRIBE commands are unsupported, also, the routing might be wrong in cluster mode
+    #     for channel_pattern in channel_patterns:
+
+    #         await listening_client.custom_command(
+    #             ["UNSUBSCRIBE", *list(channels_and_messages.keys())]
+    #         )
+
+    if cluster_mode_subs:
+        for (
+            channel_type,
+            channel_patterns,
+        ) in cluster_mode_subs.channels_and_patterns.items():
+            if channel_type == GlideClusterClientConfiguration.PubSubChannelModes.Exact:
+                cmd = "UNSUBSCRIBE"
+            elif (
+                channel_type
+                == GlideClusterClientConfiguration.PubSubChannelModes.Pattern
+            ):
+                cmd = "PUNSUBSCRIBE"
+            elif not await check_if_server_version_lt(client, "7.0.0"):
+                cmd = "SUNSUBSCRIBE"
+            else:
+                continue
+
+            for channel_pattern in channel_patterns:
+                await client.custom_command([cmd, channel_pattern])
+
+    await client.close()
+    del client
+    await asyncio.sleep(1)
+
+
 @pytest.mark.asyncio
 class TestPubSub:
     @pytest.mark.parametrize("cluster_mode", [True, False])
@@ -167,29 +233,31 @@ async def test_pubsub_exact_happy_path(
         Async, Sync, and Callback. It verifies that a message published to a specific channel
         is correctly received by a subscriber.
""" - channel = get_random_string(10) - message = get_random_string(5) - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = get_random_string(5) + + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, + {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 # allow the message to propagate @@ -205,11 +273,18 @@ async def test_pubsub_exact_happy_path( await check_no_messages_left(method, listening_client, callback_messages, 1) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_happy_path_coexistence( @@ -222,23 +297,25 @@ async def test_pubsub_exact_happy_path_coexistence( and received using both async and sync methods to ensure that both methods can coexist and function correctly. 
""" - channel = get_random_string(10) - message = get_random_string(5) - message2 = get_random_string(7) - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = get_random_string(5) + message2 = get_random_string(7) + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, + {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: for msg in [message, message2]: result = await publishing_client.publish(msg, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -267,11 +344,18 @@ async def test_pubsub_exact_happy_path_coexistence( assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -287,44 +371,46 @@ async def test_pubsub_exact_happy_path_many_channels( unique message. It verifies that messages are correctly published and received using different retrieval methods: async, sync, and callback. 
""" - NUM_CHANNELS = 256 - shard_prefix = "{same-shard}" - - # Create a map of channels to random messages with shard prefix - channels_and_messages = { - f"{shard_prefix}{get_random_string(10)}": get_random_string(5) - for _ in range(NUM_CHANNELS) - } + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + shard_prefix = "{same-shard}" - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages + # Create a map of channels to random messages with shard prefix + channels_and_messages = { + f"{shard_prefix}{get_random_string(10)}": get_random_string(5) + for _ in range(NUM_CHANNELS) + } - pub_sub = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - channels_and_messages.keys() - ) - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - channels_and_messages.keys() - ) - }, - callback=callback, - context=context, - ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + channels_and_messages.keys() + ) + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + channels_and_messages.keys() + ) + }, + callback=callback, + context=context, + ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: # Publish messages to each channel for channel, message in channels_and_messages.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -349,14 +435,21 @@ async def test_pubsub_exact_happy_path_many_channels( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["UNSUBSCRIBE", *list(channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["UNSUBSCRIBE", *list(channels_and_messages.keys())] + # ) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None ) + if publishing_client: + await client_cleanup(publishing_client, None) + @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_happy_path_many_channels_co_existence( self, request, cluster_mode: bool @@ -368,37 +461,39 @@ async def test_pubsub_exact_happy_path_many_channels_co_existence( It verifies that messages are correctly published and received using both async and sync methods to ensure that both methods can coexist and function correctly. 
""" - NUM_CHANNELS = 256 - shard_prefix = "{same-shard}" + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + shard_prefix = "{same-shard}" - # Create a map of channels to random messages with shard prefix - channels_and_messages = { - f"{shard_prefix}{get_random_string(10)}": get_random_string(5) - for _ in range(NUM_CHANNELS) - } + # Create a map of channels to random messages with shard prefix + channels_and_messages = { + f"{shard_prefix}{get_random_string(10)}": get_random_string(5) + for _ in range(NUM_CHANNELS) + } - pub_sub = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - channels_and_messages.keys() - ) - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - channels_and_messages.keys() - ) - }, - ) + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + channels_and_messages.keys() + ) + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + channels_and_messages.keys() + ) + }, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: # Publish messages to each channel for channel, message in channels_and_messages.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -424,14 +519,21 @@ async def test_pubsub_exact_happy_path_many_channels_co_existence( assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["UNSUBSCRIBE", *list(channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["UNSUBSCRIBE", *list(channels_and_messages.keys())] + # ) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None ) + if publishing_client: + await client_cleanup(publishing_client, None) + @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -446,38 +548,45 @@ async def test_sharded_pubsub( Async, Sync, and Callback. It verifies that a message published to a specific sharded channel is correctly received by a subscriber. 
""" - channel = get_random_string(10) - message = get_random_string(5) - publish_response = 1 - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, - {}, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = get_random_string(5) + publish_response = 1 + + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, + {}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) - min_version = "7.0.0" - if await check_if_server_version_lt(publishing_client, min_version): - pytest.skip(reason=f"Valkey version required >= {min_version}") + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) + min_version = "7.0.0" + if await check_if_server_version_lt(publishing_client, min_version): + pytest.skip(reason=f"Valkey version required >= {min_version}") - try: + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True ) == publish_response ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # allow the message to propagate await asyncio.sleep(1) @@ -488,14 +597,22 @@ async def test_sharded_pubsub( assert pubsub_msg.channel == channel assert pubsub_msg.pattern is None - finally: # assert there are no messages to read await check_no_messages_left(method, listening_client, callback_messages, 1) - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["SUNSUBSCRIBE", channel]) + + finally: + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["SUNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True]) async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): @@ -510,38 +627,48 @@ async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): both async and sync methods. This ensures that the asynchronous and synchronous message retrieval methods can coexist without interfering with each other and operate as expected. 
""" - channel = get_random_string(10) - message = get_random_string(5) - message2 = get_random_string(7) - publish_response = 1 if cluster_mode else OK - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, - {}, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = get_random_string(5) + message2 = get_random_string(7) + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, + {}, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - min_version = "7.0.0" - if await check_if_server_version_lt(publishing_client, min_version): - pytest.skip(reason=f"Valkey version required >= {min_version}") + min_version = "7.0.0" + if await check_if_server_version_lt(publishing_client, min_version): + pytest.skip(reason=f"Valkey version required >= {min_version}") - try: + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True ) - == publish_response + == 1 ) assert ( await cast(GlideClusterClient, publishing_client).publish( message2, channel, sharded=True ) - == publish_response + == 1 ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message2, channel, sharded=True + # ) + # allow the messages to propagate await asyncio.sleep(1) @@ -567,11 +694,18 @@ async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["SUNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["SUNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( @@ -587,45 +721,47 @@ async def test_sharded_pubsub_many_channels( across multiple sharded channels. It covers three different message retrieval methods: Async, Sync, and Callback. 
""" - NUM_CHANNELS = 256 - shard_prefix = "{same-shard}" - publish_response = 1 - - # Create a map of channels to random messages with shard prefix - channels_and_messages = { - f"{shard_prefix}{get_random_string(10)}": get_random_string(5) - for _ in range(NUM_CHANNELS) - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + shard_prefix = "{same-shard}" + publish_response = 1 + + # Create a map of channels to random messages with shard prefix + channels_and_messages = { + f"{shard_prefix}{get_random_string(10)}": get_random_string(5) + for _ in range(NUM_CHANNELS) + } - pub_sub = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( - channels_and_messages.keys() - ) - }, - {}, - callback=callback, - context=context, - ) + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( + channels_and_messages.keys() + ) + }, + {}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - min_version = "7.0.0" - if await check_if_server_version_lt(publishing_client, min_version): - pytest.skip(reason=f"Valkey version required >= {min_version}") + min_version = "7.0.0" + if await check_if_server_version_lt(publishing_client, min_version): + pytest.skip(reason=f"Redis version required >= {min_version}") - try: # Publish messages to each channel for channel, message in channels_and_messages.items(): + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True @@ -633,6 +769,10 @@ async def test_sharded_pubsub_many_channels( == publish_response ) + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # Allow the messages to propagate await asyncio.sleep(1) @@ -655,14 +795,21 @@ async def test_sharded_pubsub_many_channels( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["SUNSUBSCRIBE", *list(channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["SUNSUBSCRIBE", *list(channels_and_messages.keys())] + # ) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None ) + if publishing_client: + await client_cleanup(publishing_client, None) + 
@pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -676,32 +823,38 @@ async def test_pubsub_pattern( This test verifies the behavior of PUBSUB when subscribing to a pattern and receiving messages using three different methods: Async, Sync, and Callback. """ - PATTERN = "{{{}}}:{}".format("channel", "*") - channels = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages + listening_client, publishing_client = None, None + try: + PATTERN = "{{{}}}:{}".format("channel", "*") + channels = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ), + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ), + } - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - callback=callback, - context=context, - ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + callback=callback, + context=context, + ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: for channel, message in channels.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -724,11 +877,18 @@ async def test_pubsub_pattern( await check_no_messages_left(method, listening_client, callback_messages, 2) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): @@ -739,25 +899,31 @@ async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): and received using both async and sync methods to ensure that both methods can coexist and function 
correctly. """ - PATTERN = "{{{}}}:{}".format("channel", "*") - channels = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), - } + listening_client, publishing_client = None, None + try: + PATTERN = "{{{}}}:{}".format("channel", "*") + channels = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ), + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ), + } - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - ) + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: for channel, message in channels.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -784,11 +950,18 @@ async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -804,33 +977,37 @@ async def test_pubsub_pattern_many_channels( and received. It verifies that messages are correctly published and received using different retrieval methods: async, sync, and callback. 
""" - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("channel", "*") - channels = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("channel", "*") + channels = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ) + for _ in range(NUM_CHANNELS) + } - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - callback=callback, - context=context, - ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + callback=callback, + context=context, + ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: for channel, message in channels.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -855,11 +1032,18 @@ async def test_pubsub_pattern_many_channels( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -877,60 +1061,68 @@ async def test_pubsub_combined_exact_and_pattern_one_client( - Subscribing to channels using a pattern and verifying message reception. - Ensuring that messages are correctly published and received using different retrieval methods (async, sync, callback). 
""" - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("pattern", "*") - - # Create dictionaries of channels and their corresponding messages - exact_channels_and_messages = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(10) - for _ in range(NUM_CHANNELS) - } - pattern_channels_and_messages = { - "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - - all_channels_and_messages = { - **exact_channels_and_messages, - **pattern_channels_and_messages, - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - # Setup PUBSUB for exact channels - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() - ), - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}, - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() - ), - GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}, - }, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("pattern", "*") - publishing_client, listening_client = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) + # Create dictionaries of channels and their corresponding messages + exact_channels_and_messages = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 10 + ) + for _ in range(NUM_CHANNELS) + } + pattern_channels_and_messages = { + "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string( + 5 + ) + for _ in range(NUM_CHANNELS) + } + + all_channels_and_messages = { + **exact_channels_and_messages, + **pattern_channels_and_messages, + } + + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + # Setup PUBSUB for exact channels + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ), + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + PATTERN + }, + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ), + GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}, + }, + callback=callback, + context=context, + ) + + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, + ) - try: # Publish messages to all channels for channel, message in all_channels_and_messages.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -961,14 +1153,21 @@ async def test_pubsub_combined_exact_and_pattern_one_client( method, listening_client, callback_messages, NUM_CHANNELS * 2 ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["UNSUBSCRIBE", 
*list(exact_channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # ) + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client: + await client_cleanup( + listening_client, pub_sub_exact if cluster_mode else None ) - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -988,76 +1187,91 @@ async def test_pubsub_combined_exact_and_pattern_multiple_clients( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. """ - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("pattern", "*") - - # Create dictionaries of channels and their corresponding messages - exact_channels_and_messages = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(10) - for _ in range(NUM_CHANNELS) - } - pattern_channels_and_messages = { - "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - - all_channels_and_messages = { - **exact_channels_and_messages, - **pattern_channels_and_messages, - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - # Setup PUBSUB for exact channels - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() + ( + listening_client_exact, + publishing_client, + listening_client_pattern, + client_dont_care, + ) = (None, None, None, None) + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("pattern", "*") + + # Create dictionaries of channels and their corresponding messages + exact_channels_and_messages = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 10 ) - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() + for _ in range(NUM_CHANNELS) + } + pattern_channels_and_messages = { + "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string( + 5 ) - }, - callback=callback, - context=context, - ) + for _ in range(NUM_CHANNELS) + } - publishing_client, listening_client_exact = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) + all_channels_and_messages = { + **exact_channels_and_messages, + **pattern_channels_and_messages, + } - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages_pattern + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - callback=callback, - context=context, - ) + if method == 
MethodTesting.Callback: + callback = new_message + context = callback_messages - _, listening_client_pattern = await create_two_clients( - request, cluster_mode, pub_sub_pattern - ) + # Setup PUBSUB for exact channels + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ) + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ) + }, + callback=callback, + context=context, + ) + + listening_client_exact, publishing_client = ( + await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, + ) + ) + + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages_pattern + + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + callback=callback, + context=context, + ) + + listening_client_pattern, client_dont_care = ( + await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_pattern + ) + ) - try: # Publish messages to all channels for channel, message in all_channels_and_messages.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -1107,14 +1321,29 @@ async def test_pubsub_combined_exact_and_pattern_multiple_clients( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client_exact.custom_command( - ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client_exact.custom_command( + # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # ) + # await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client_exact: + await client_cleanup( + listening_client_exact, pub_sub_exact if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) + + if listening_client_pattern: + await client_cleanup( + listening_client_pattern, pub_sub_pattern if cluster_mode else None ) - await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN]) + + if client_dont_care: + await client_cleanup(client_dont_care, None) @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( @@ -1133,73 +1362,83 @@ async def test_pubsub_combined_exact_pattern_and_sharded_one_client( - Subscribing to channels using a with sharded subscription and verifying message reception. - Ensuring that messages are correctly published and received using different retrieval methods (async, sync, callback). 
""" - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("pattern", "*") - SHARD_PREFIX = "{same-shard}" - - # Create dictionaries of channels and their corresponding messages - exact_channels_and_messages = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(10) - for _ in range(NUM_CHANNELS) - } - pattern_channels_and_messages = { - "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - sharded_channels_and_messages = { - f"{SHARD_PREFIX}:{get_random_string(10)}": get_random_string(7) - for _ in range(NUM_CHANNELS) - } - - publish_response = 1 - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - # Setup PUBSUB for exact channels - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() - ), - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}, - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( - sharded_channels_and_messages.keys() - ), - }, - {}, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("pattern", "*") + SHARD_PREFIX = "{same-shard}" + + # Create dictionaries of channels and their corresponding messages + exact_channels_and_messages = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 10 + ) + for _ in range(NUM_CHANNELS) + } + pattern_channels_and_messages = { + "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string( + 5 + ) + for _ in range(NUM_CHANNELS) + } + sharded_channels_and_messages = { + f"{SHARD_PREFIX}:{get_random_string(10)}": get_random_string(7) + for _ in range(NUM_CHANNELS) + } - publishing_client, listening_client = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) + publish_response = 1 + + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + # Setup PUBSUB for exact channels + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ), + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + PATTERN + }, + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( + sharded_channels_and_messages.keys() + ), + }, + {}, + callback=callback, + context=context, + ) - # Setup PUBSUB for sharded channels (Redis version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Redis version required >= 7.0.0") + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, + ) + + # Setup PUBSUB for sharded channels (Redis version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Redis version required >= 7.0.0") - try: # Publish messages to all channels for channel, message in { **exact_channels_and_messages, **pattern_channels_and_messages, }.items(): + # TODO: enable when client closing works assert ( await publishing_client.publish(message, channel) == publish_response ) + # await publishing_client.publish(message, channel) # Publish sharded messages to all 
channels for channel, message in sharded_channels_and_messages.items(): + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True @@ -1207,6 +1446,10 @@ async def test_pubsub_combined_exact_pattern_and_sharded_one_client( == publish_response ) + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # allow the messages to propagate await asyncio.sleep(1) @@ -1240,18 +1483,25 @@ async def test_pubsub_combined_exact_pattern_and_sharded_one_client( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] - ) - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) - await listening_client.custom_command( - ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # ) + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + # await listening_client.custom_command( + # ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())] + # ) + if listening_client: + await client_cleanup( + listening_client, pub_sub_exact if cluster_mode else None ) + if publishing_client: + await client_cleanup(publishing_client, None) + @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -1271,106 +1521,123 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. 
""" - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("pattern", "*") - SHARD_PREFIX = "{same-shard}" - - # Create dictionaries of channels and their corresponding messages - exact_channels_and_messages = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(10) - for _ in range(NUM_CHANNELS) - } - pattern_channels_and_messages = { - "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - sharded_channels_and_messages = { - f"{SHARD_PREFIX}:{get_random_string(10)}": get_random_string(7) - for _ in range(NUM_CHANNELS) - } - - publish_response = 1 - - callback, context = None, None - callback_messages_exact: List[CoreCommands.PubSubMsg] = [] - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages_exact - - # Setup PUBSUB for exact channels - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() + ( + listening_client_exact, + publishing_client, + listening_client_pattern, + listening_client_sharded, + ) = (None, None, None, None) + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("pattern", "*") + SHARD_PREFIX = "{same-shard}" + + # Create dictionaries of channels and their corresponding messages + exact_channels_and_messages = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 10 ) - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() + for _ in range(NUM_CHANNELS) + } + pattern_channels_and_messages = { + "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string( + 5 ) - }, - callback=callback, - context=context, - ) - - publishing_client, listening_client_exact = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) + for _ in range(NUM_CHANNELS) + } + sharded_channels_and_messages = { + f"{SHARD_PREFIX}:{get_random_string(10)}": get_random_string(7) + for _ in range(NUM_CHANNELS) + } - # Setup PUBSUB for sharded channels (Valkey version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Valkey version required >= 7.0.0") + publish_response = 1 - if method == MethodTesting.Callback: - context = callback_messages_pattern + callback, context = None, None + callback_messages_exact: List[CoreCommands.PubSubMsg] = [] + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - callback=callback, - context=context, - ) + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages_exact - if method == MethodTesting.Callback: - context = callback_messages_sharded + # Setup PUBSUB for exact channels + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ) + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ) + }, + callback=callback, + context=context, + ) - pub_sub_sharded = 
create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( - sharded_channels_and_messages.keys() + listening_client_exact, publishing_client = ( + await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, ) - }, - {}, - callback=callback, - context=context, - ) + ) - listening_client_sharded, listening_client_pattern = await create_two_clients( - request, cluster_mode, pub_sub_pattern, pub_sub_sharded - ) + # Setup PUBSUB for sharded channels (Valkey version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Valkey version required >= 7.0.0") + + if method == MethodTesting.Callback: + context = callback_messages_pattern + + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + callback=callback, + context=context, + ) + + if method == MethodTesting.Callback: + context = callback_messages_sharded + + pub_sub_sharded = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( + sharded_channels_and_messages.keys() + ) + }, + {}, + callback=callback, + context=context, + ) + + listening_client_pattern, listening_client_sharded = ( + await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_pattern, pub_sub_sharded + ) + ) - try: # Publish messages to all channels for channel, message in { **exact_channels_and_messages, **pattern_channels_and_messages, }.items(): + # TODO: enable when client closing works assert ( await publishing_client.publish(message, channel) == publish_response ) + # await publishing_client.publish(message, channel) # Publish sharded messages to all channels for channel, message in sharded_channels_and_messages.items(): + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True @@ -1378,6 +1645,10 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client( == publish_response ) + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # allow the messages to propagate await asyncio.sleep(1) @@ -1446,18 +1717,33 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client_exact.custom_command( - ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client_exact.custom_command( + # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # ) + # await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN]) + # await listening_client_sharded.custom_command( + # ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())] + # ) + if listening_client_exact: + await 
client_cleanup( + listening_client_exact, pub_sub_exact if cluster_mode else None ) - await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN]) - await listening_client_sharded.custom_command( - ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())] + + if publishing_client: + await client_cleanup(publishing_client, None) + + if listening_client_pattern: + await client_cleanup( + listening_client_pattern, pub_sub_pattern if cluster_mode else None ) + if listening_client_sharded: + await client_cleanup(listening_client_sharded, None) + @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -1478,77 +1764,92 @@ async def test_pubsub_combined_different_channels_with_same_name( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. """ - CHANNEL_NAME = "same-channel-name" - MESSAGE_EXACT = get_random_string(10) - MESSAGE_PATTERN = get_random_string(7) - MESSAGE_SHARDED = get_random_string(5) - - callback, context = None, None - callback_messages_exact: List[CoreCommands.PubSubMsg] = [] - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages_exact - - # Setup PUBSUB for exact channel - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - callback=callback, - context=context, - ) - - publishing_client, listening_client_exact = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) - - # (Valkey version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Valkey version required >= 7.0.0") - - # Setup PUBSUB for pattern channel - if method == MethodTesting.Callback: - context = callback_messages_pattern + ( + listening_client_exact, + publishing_client, + listening_client_pattern, + listening_client_sharded, + ) = (None, None, None, None) + try: + CHANNEL_NAME = "same-channel-name" + MESSAGE_EXACT = get_random_string(10) + MESSAGE_PATTERN = get_random_string(7) + MESSAGE_SHARDED = get_random_string(5) + + callback, context = None, None + callback_messages_exact: List[CoreCommands.PubSubMsg] = [] + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages_exact + + # Setup PUBSUB for exact channel + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, + callback=callback, + context=context, + ) - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { - CHANNEL_NAME - } - }, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, - callback=callback, - context=context, - ) + listening_client_exact, publishing_client = ( + await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, + ) + ) - if method == MethodTesting.Callback: - context = callback_messages_sharded 
+ # (Valkey version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Valkey version required >= 7.0.0") + + # Setup PUBSUB for pattern channel + if method == MethodTesting.Callback: + context = callback_messages_pattern + + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, + callback=callback, + context=context, + ) - pub_sub_sharded = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { - CHANNEL_NAME - } - }, - {}, - callback=callback, - context=context, - ) + if method == MethodTesting.Callback: + context = callback_messages_sharded + + pub_sub_sharded = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + CHANNEL_NAME + } + }, + {}, + callback=callback, + context=context, + ) - listening_client_sharded, listening_client_pattern = await create_two_clients( - request, cluster_mode, pub_sub_pattern, pub_sub_sharded - ) + listening_client_pattern, listening_client_sharded = ( + await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_pattern, pub_sub_sharded + ) + ) - try: # Publish messages to each channel + # TODO: enable when client closing works assert await publishing_client.publish(MESSAGE_EXACT, CHANNEL_NAME) == 2 assert await publishing_client.publish(MESSAGE_PATTERN, CHANNEL_NAME) == 2 assert ( @@ -1558,6 +1859,12 @@ async def test_pubsub_combined_different_channels_with_same_name( == 1 ) + # await publishing_client.publish(MESSAGE_EXACT, CHANNEL_NAME) + # await publishing_client.publish(MESSAGE_PATTERN, CHANNEL_NAME) + # await cast(GlideClusterClient, publishing_client).publish( + # MESSAGE_SHARDED, CHANNEL_NAME, sharded=True + # ) + # allow the message to propagate await asyncio.sleep(1) @@ -1594,20 +1901,35 @@ async def test_pubsub_combined_different_channels_with_same_name( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client_exact.custom_command( - ["UNSUBSCRIBE", CHANNEL_NAME] - ) - await listening_client_pattern.custom_command( - ["PUNSUBSCRIBE", CHANNEL_NAME] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client_exact.custom_command( + # ["UNSUBSCRIBE", CHANNEL_NAME] + # ) + # await listening_client_pattern.custom_command( + # ["PUNSUBSCRIBE", CHANNEL_NAME] + # ) + # await listening_client_sharded.custom_command( + # ["SUNSUBSCRIBE", CHANNEL_NAME] + # ) + if listening_client_exact: + await client_cleanup( + listening_client_exact, pub_sub_exact if cluster_mode else None ) - await listening_client_sharded.custom_command( - ["SUNSUBSCRIBE", CHANNEL_NAME] + + if publishing_client: + await client_cleanup(publishing_client, None) + + if listening_client_pattern: + await client_cleanup( + listening_client_pattern, pub_sub_pattern if 
cluster_mode else None ) + if listening_client_sharded: + await client_cleanup(listening_client_sharded, None) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -1628,47 +1950,53 @@ async def test_pubsub_two_publishing_clients_same_name( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. """ - CHANNEL_NAME = "channel-name" - MESSAGE_EXACT = get_random_string(10) - MESSAGE_PATTERN = get_random_string(7) - callback, context_exact, context_pattern = None, None, None - callback_messages_exact: List[CoreCommands.PubSubMsg] = [] - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context_exact = callback_messages_exact - context_pattern = callback_messages_pattern - - # Setup PUBSUB for exact channel - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - callback=callback, - context=context_exact, - ) - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { - CHANNEL_NAME - } - }, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, - callback=callback, - context=context_pattern, - ) + client_exact, client_pattern = None, None + try: + CHANNEL_NAME = "channel-name" + MESSAGE_EXACT = get_random_string(10) + MESSAGE_PATTERN = get_random_string(7) + callback, context_exact, context_pattern = None, None, None + callback_messages_exact: List[CoreCommands.PubSubMsg] = [] + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context_exact = callback_messages_exact + context_pattern = callback_messages_pattern + + # Setup PUBSUB for exact channel + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, + callback=callback, + context=context_exact, + ) + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, + callback=callback, + context=context_pattern, + ) - client_pattern, client_exact = await create_two_clients( - request, cluster_mode, pub_sub_exact, pub_sub_pattern - ) + client_exact, client_pattern = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_exact, pub_sub_pattern + ) - try: # Publish messages to each channel - both clients publishing for msg in [MESSAGE_EXACT, MESSAGE_PATTERN]: result = await client_pattern.publish(msg, CHANNEL_NAME) + # TODO: enable when client closing works if cluster_mode: assert result == 2 @@ -1697,12 +2025,21 @@ async def test_pubsub_two_publishing_clients_same_name( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to 
unsubscribe from the channels - await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) - await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) + # await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME]) + if client_exact: + await client_cleanup( + client_exact, pub_sub_exact if cluster_mode else None + ) + + if client_pattern: + await client_cleanup( + client_pattern, pub_sub_pattern if cluster_mode else None + ) @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( @@ -1724,72 +2061,83 @@ async def test_pubsub_three_publishing_clients_same_name_with_sharded( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. """ - CHANNEL_NAME = "same-channel-name" - MESSAGE_EXACT = get_random_string(10) - MESSAGE_PATTERN = get_random_string(7) - MESSAGE_SHARDED = get_random_string(5) - publish_response = 2 if cluster_mode else OK - callback, context_exact, context_pattern, context_sharded = ( + client_exact, client_pattern, client_sharded, client_dont_care = ( None, None, None, None, ) - callback_messages_exact: List[CoreCommands.PubSubMsg] = [] - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context_exact = callback_messages_exact - context_pattern = callback_messages_pattern - context_sharded = callback_messages_sharded - - # Setup PUBSUB for exact channel - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - callback=callback, - context=context_exact, - ) - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { - CHANNEL_NAME - } - }, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, - callback=callback, - context=context_pattern, - ) - # Setup PUBSUB for pattern channels - pub_sub_sharded = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { - CHANNEL_NAME - } - }, - {}, - callback=callback, - context=context_sharded, - ) + try: + CHANNEL_NAME = "same-channel-name" + MESSAGE_EXACT = get_random_string(10) + MESSAGE_PATTERN = get_random_string(7) + MESSAGE_SHARDED = get_random_string(5) + publish_response = 2 if cluster_mode else OK + callback, context_exact, context_pattern, context_sharded = ( + None, + None, + None, + None, + ) + callback_messages_exact: List[CoreCommands.PubSubMsg] = [] + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context_exact = callback_messages_exact + context_pattern = callback_messages_pattern + context_sharded = callback_messages_sharded + + # Setup PUBSUB for exact channel + pub_sub_exact = create_pubsub_subscription( + 
cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, + callback=callback, + context=context_exact, + ) + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, + callback=callback, + context=context_pattern, + ) + # Setup PUBSUB for pattern channels + pub_sub_sharded = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + CHANNEL_NAME + } + }, + {}, + callback=callback, + context=context_sharded, + ) - client_pattern, client_exact = await create_two_clients( - request, cluster_mode, pub_sub_exact, pub_sub_pattern - ) - _, client_sharded = await create_two_clients( - request, cluster_mode, pub_sub_sharded - ) - # (Valkey version > 7) - if await check_if_server_version_lt(client_pattern, "7.0.0"): - pytest.skip("Valkey version required >= 7.0.0") + client_exact, client_pattern = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_exact, pub_sub_pattern + ) + client_sharded, client_dont_care = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_sharded + ) + # (Valkey version > 7) + if await check_if_server_version_lt(client_pattern, "7.0.0"): + pytest.skip("Valkey version required >= 7.0.0") - try: # Publish messages to each channel - both clients publishing + # TODO: enable when client closing works assert ( await client_pattern.publish(MESSAGE_EXACT, CHANNEL_NAME) == publish_response @@ -1805,6 +2153,12 @@ async def test_pubsub_three_publishing_clients_same_name_with_sharded( == 1 ) + # await client_pattern.publish(MESSAGE_EXACT, CHANNEL_NAME) + # await client_sharded.publish(MESSAGE_PATTERN, CHANNEL_NAME) + # await cast(GlideClusterClient, client_exact).publish( + # MESSAGE_SHARDED, CHANNEL_NAME, sharded=True + # ) + # allow the message to propagate await asyncio.sleep(1) @@ -1840,16 +2194,33 @@ async def test_pubsub_three_publishing_clients_same_name_with_sharded( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) - await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME]) - await client_sharded.custom_command(["SUNSUBSCRIBE", CHANNEL_NAME]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) + # await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME]) + # await client_sharded.custom_command(["SUNSUBSCRIBE", CHANNEL_NAME]) + if client_exact: + await client_cleanup( + client_exact, pub_sub_exact if cluster_mode else None + ) + + if client_pattern: + await client_cleanup( + client_pattern, pub_sub_pattern if cluster_mode else None + ) + + if client_sharded: + await client_cleanup( + 
client_sharded, pub_sub_sharded if cluster_mode else None + ) + + if client_dont_care: + await client_cleanup(client_dont_care, None) @pytest.mark.skip( - reason="no way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649" + reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions" ) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool): @@ -1868,9 +2239,8 @@ async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool): - Ensuring that no additional messages are left after the expected messages are received. """ channel = get_random_string(10) - message = get_random_string(512 * 1024 * 1024) - message2 = get_random_string(512 * 1024 * 1024) - publish_response = 1 if cluster_mode else OK + message = "1" * 512 * 1024 * 1024 + message2 = "2" * 512 * 1024 * 1024 pub_sub = create_pubsub_subscription( cluster_mode, @@ -1878,26 +2248,35 @@ async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool): {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub, + timeout=10000, ) try: - assert await publishing_client.publish(message, channel) == publish_response - assert ( - await publishing_client.publish(message2, channel) == publish_response - ) + # TODO: enable when client closing works + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 + + result = await publishing_client.publish(message2, channel) + if cluster_mode: + assert result == 1 # allow the message to propagate - await asyncio.sleep(5) + await asyncio.sleep(15) - async_msg = await listening_client.get_pubsub_message() - sync_msg = listening_client.try_get_pubsub_message() - assert sync_msg + # await publishing_client.publish(message, channel) + # await publishing_client.publish(message2, channel) + async_msg = await listening_client.get_pubsub_message() assert async_msg.message == message.encode() assert async_msg.channel == channel.encode() assert async_msg.pattern is None + sync_msg = listening_client.try_get_pubsub_message() + assert sync_msg assert sync_msg.message == message2.encode() assert sync_msg.channel == channel.encode() assert sync_msg.pattern is None @@ -1909,14 +2288,21 @@ async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool): assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + 
listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.skip( - reason="no way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649" + reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions" ) @pytest.mark.parametrize("cluster_mode", [True]) async def test_pubsub_sharded_max_size_message(self, request, cluster_mode: bool): @@ -1934,37 +2320,51 @@ async def test_pubsub_sharded_max_size_message(self, request, cluster_mode: bool - Verifying that the messages are received correctly using both async and sync methods. - Ensuring that no additional messages are left after the expected messages are received. """ - channel = get_random_string(10) - message = get_random_string(512 * 1024 * 1024) - message2 = get_random_string(512 * 1024 * 1024) - publish_response = 1 if cluster_mode else OK - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, - {}, - ) + publishing_client, listening_client = None, None + try: + channel = get_random_string(10) + message = "1" * 512 * 1024 * 1024 + message2 = "2" * 512 * 1024 * 1024 + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, + {}, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub, + timeout=10000, + ) - # (Redis version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Redis version required >= 7.0.0") + # (Redis version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Redis version required >= 7.0.0") - try: + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True ) - == publish_response + == 1 ) + assert ( - await publishing_client.publish(message2, channel) == publish_response + await cast(GlideClusterClient, publishing_client).publish( + message2, channel, sharded=True + ) + == 1 ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # await publishing_client.publish(message2, channel) + # allow the message to propagate - await asyncio.sleep(5) + await asyncio.sleep(15) async_msg = await listening_client.get_pubsub_message() sync_msg = listening_client.try_get_pubsub_message() @@ -1985,14 +2385,21 @@ async def test_pubsub_sharded_max_size_message(self, request, cluster_mode: bool assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe 
from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.skip( - reason="no way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649" + reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions" ) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_max_size_message_callback( @@ -2011,45 +2418,56 @@ async def test_pubsub_exact_max_size_message_callback( - Publishing a maximum size message to the channel. - Verifying that the message is received correctly using the callback method. """ - channel = get_random_string(10) - message = get_random_string(512 * 1024 * 1024) - publish_response = 1 if cluster_mode else OK - - callback_messages: List[CoreCommands.PubSubMsg] = [] - callback, context = new_message, callback_messages - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = "0" * 12 * 1024 * 1024 + + callback_messages: List[CoreCommands.PubSubMsg] = [] + callback, context = new_message, callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, + {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub, timeout=10000 + ) - try: - assert await publishing_client.publish(message, channel) == publish_response + # TODO: enable when client closing works + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 + # await publishing_client.publish(message, channel) # allow the message to propagate - await asyncio.sleep(5) + await asyncio.sleep(15) assert len(callback_messages) == 1 - assert callback_messages[0].message == message - assert callback_messages[0].channel == channel + assert callback_messages[0].message == message.encode() + assert callback_messages[0].channel == channel.encode() assert callback_messages[0].pattern is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + 
listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.skip( - reason="no way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649" + reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions" ) @pytest.mark.parametrize("cluster_mode", [True]) async def test_pubsub_sharded_max_size_message_callback( @@ -2068,51 +2486,64 @@ async def test_pubsub_sharded_max_size_message_callback( - Publishing a maximum size message to the channel. - Verifying that the message is received correctly using the callback method. """ - channel = get_random_string(10) - message = get_random_string(512 * 1024 * 1024) - publish_response = 1 if cluster_mode else OK - - callback_messages: List[CoreCommands.PubSubMsg] = [] - callback, context = new_message, callback_messages - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, - {}, - callback=callback, - context=context, - ) + publishing_client, listening_client = None, None + try: + channel = get_random_string(10) + message = "0" * 512 * 1024 * 1024 + + callback_messages: List[CoreCommands.PubSubMsg] = [] + callback, context = new_message, callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, + {}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub, timeout=10000 + ) - # (Valkey version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Valkey version required >= 7.0.0") + # (Valkey version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Valkey version required >= 7.0.0") - try: + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True ) - == publish_response + == 1 ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # allow the message to propagate - await asyncio.sleep(5) + await asyncio.sleep(15) assert len(callback_messages) == 1 - assert callback_messages[0].message == message - assert callback_messages[0].channel == channel + assert callback_messages[0].message == message.encode() + assert callback_messages[0].channel == channel.encode() assert callback_messages[0].pattern is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + 
+                await client_cleanup(
+                    listening_client, pub_sub if cluster_mode else None
+                )
+
+            if publishing_client:
+                await client_cleanup(publishing_client, None)
 
     @pytest.mark.parametrize("cluster_mode", [True, False])
     async def test_pubsub_resp2_raise_an_error(self, request, cluster_mode: bool):
@@ -2126,7 +2557,7 @@ async def test_pubsub_resp2_raise_an_error(self, request, cluster_mode: bool):
         )
 
         with pytest.raises(ConfigurationError):
-            await create_two_clients(
+            await create_two_clients_with_pubsub(
                 request, cluster_mode, pub_sub_exact, protocol=ProtocolVersion.RESP2
             )
 
@@ -2145,4 +2576,4 @@ async def test_pubsub_context_with_no_callback_raise_error(
         )
 
         with pytest.raises(ConfigurationError):
-            await create_two_clients(request, cluster_mode, pub_sub_exact)
+            await create_two_clients_with_pubsub(request, cluster_mode, pub_sub_exact)
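
Taken together, the first patch moves every test in this file onto the same teardown shape: clients start as None, the test body runs inside try, and the finally block hands each client to client_cleanup, passing the subscription config along only in cluster mode. The sketch below is illustrative only and not part of either patch; it assumes the helpers introduced above (create_pubsub_subscription, create_two_clients_with_pubsub, client_cleanup) and a hypothetical channel name.

    # A minimal sketch of the try/finally cleanup pattern adopted by this PR.
    # "example-channel" and the test body are hypothetical; the helpers are
    # the ones defined in test_pubsub.py.
    async def example_pubsub_test(request, cluster_mode: bool) -> None:
        listening_client, publishing_client = None, None
        try:
            pub_sub = create_pubsub_subscription(
                cluster_mode,
                {
                    GlideClusterClientConfiguration.PubSubChannelModes.Exact: {
                        "example-channel"
                    }
                },
                {GlideClientConfiguration.PubSubChannelModes.Exact: {"example-channel"}},
            )
            listening_client, publishing_client = await create_two_clients_with_pubsub(
                request, cluster_mode, pub_sub
            )

            await publishing_client.publish("hello", "example-channel")
            await asyncio.sleep(1)  # allow the message to propagate

            msg = await listening_client.get_pubsub_message()
            assert msg.message == b"hello"
        finally:
            # Cleanup runs even when an assertion above fails; the subscription
            # config is passed only in cluster mode, mirroring the tests.
            if listening_client:
                await client_cleanup(
                    listening_client, pub_sub if cluster_mode else None
                )
            if publishing_client:
                await client_cleanup(publishing_client, None)
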
From b435c50ceb6c26e53026a112b1c2ae46d6ebf3b0 Mon Sep 17 00:00:00 2001
From: ikolomi
Date: Mon, 8 Jul 2024 14:28:35 +0300
Subject: [PATCH 2/2] CR cleanup

---
 .github/workflows/python.yml       |   4 +-
 glide-core/src/socket_listener.rs  |   2 +-
 python/pytest.ini                  |   2 +-
 python/pytest_pubsub.ini           |   4 -
 python/python/tests/test_pubsub.py | 510 ++++++-----------------------
 5 files changed, 99 insertions(+), 423 deletions(-)
 delete mode 100644 python/pytest_pubsub.ini

diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index 85f7038baf..df1e2236fb 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -166,7 +166,7 @@ jobs:
           run: |
               source .env/bin/activate
               cd python/tests/
-              pytest -c ../../pytest_pubsub.ini --asyncio-mode=auto test_pubsub.py::TestPubSub
+              pytest --asyncio-mode=auto -k test_pubsub
 
     lint-rust:
         runs-on: ubuntu-latest
@@ -238,7 +238,7 @@ jobs:
           run: |
               source .env/bin/activate
               cd python/tests/
-              pytest -c ../../pytest_pubsub.ini --asyncio-mode=auto test_pubsub.py::TestPubSub
+              pytest --asyncio-mode=auto -k test_pubsub
 
     build-amazonlinux-latest:
         runs-on: ubuntu-latest
diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs
index ba60676391..3dc52fc0ed 100644
--- a/glide-core/src/socket_listener.rs
+++ b/glide-core/src/socket_listener.rs
@@ -120,9 +120,9 @@ impl UnixStreamListener {
                 match self.rotating_buffer.get_requests() {
                     Ok(requests) => {
                         if !requests.is_empty() {
-                            // continue to read from socket
                             return ReceivedValues(requests);
                         }
+                        // continue to read from socket
                         continue;
                     }
                     Err(err) => return UnhandledError(err.into()).into(),
diff --git a/python/pytest.ini b/python/pytest.ini
index 236b85a8ad..0624078cea 100644
--- a/python/pytest.ini
+++ b/python/pytest.ini
@@ -1,4 +1,4 @@
 [pytest]
 markers =
     smoke_test: mark a test as a build verification testing.
-addopts = -k "not redis_modules and not pubsub"
+addopts = -k "not server_modules and not pubsub"
diff --git a/python/pytest_pubsub.ini b/python/pytest_pubsub.ini
deleted file mode 100644
index bf42185756..0000000000
--- a/python/pytest_pubsub.ini
+++ /dev/null
@@ -1,4 +0,0 @@
-[pytest]
-markers =
-    smoke_test: mark a test as a build verification testing.
-addopts = -k "not redis_modules"
diff --git a/python/python/tests/test_pubsub.py b/python/python/tests/test_pubsub.py
index 87a37dc954..23b5bb6709 100644
--- a/python/python/tests/test_pubsub.py
+++ b/python/python/tests/test_pubsub.py
@@ -40,9 +40,7 @@ async def create_two_clients_with_pubsub(
     client2_pubsub: Optional[Any] = None,
     protocol: ProtocolVersion = ProtocolVersion.RESP3,
     timeout: Optional[int] = None,
-) -> Tuple[
-    Union[GlideClient, GlideClusterClient], Union[GlideClient, GlideClusterClient]
-]:
+) -> Tuple[TGlideClient, TGlideClient]:
     """
     Sets up 2 clients for testing purposes with optional pubsub configuration.
 
@@ -157,7 +155,7 @@ def new_message(msg: CoreCommands.PubSubMsg, context: Any):
 
 
 async def client_cleanup(
-    client: Union[GlideClient, GlideClusterClient],
+    client: Optional[Union[GlideClient, GlideClusterClient]],
     cluster_mode_subs: Optional[
         GlideClusterClientConfiguration.PubSubSubscriptions
     ] = None,
@@ -168,26 +166,9 @@ async def client_cleanup(
     In addition, it tries to clean up cluster mode subscriptions since it was found that closing the client via close() is not enough.
     Note that unsubscribing is not feasible in the current implementation since it is unknown on which node the subs are configured
     """
-    # if is_cluster:
-    #     pubsub_subs = cast(ClusterClientConfiguration.PubSubSubscriptions, pubsub_subs)
-    # else :
-    #     pubsub_subs = cast(GlideClientConfiguration.PubSubSubscriptions, pubsub_subs)
-
-    # for channel_type, channel_patterns in pubsub_subs:
-    #     if channel_type == ClusterClientConfiguration.PubSubChannelModes.Exact or channel_type == GlideClientConfiguration.PubSubChannelModes.Exact:
-    #         cmd = "UNSUBSCRIBE"
-    #     elif channel_type == ClusterClientConfiguration.PubSubChannelModes.Pattern or channel_type == GlideClientConfiguration.PubSubChannelModes.Pattern:
-    #         cmd = "PUNSUBSCRIBE"
-    #     else:
-    #         cmd = "SUNSUBSCRIBE"
-
-    #     # we need unsubscribe commands because close might
-    #     # UNSUBSCRIBE commands are unsupported, also, the routing might be wrong in cluster mode
-    #     for channel_patern in channel_patterns:
-
-    #     await listening_client.custom_command(
-    #         ["UNSUBSCRIBE", *list(channels_and_messages.keys())]
-    #     )
+
+    if client is None:
+        return
 
     if cluster_mode_subs:
         for (
@@ -204,6 +185,7 @@ async def client_cleanup(
             elif not await check_if_server_version_lt(client, "7.0.0"):
                 cmd = "SUNSUBSCRIBE"
             else:
+                # disregard sharded config for versions < 7.0.0
                 continue
 
             for channel_patern in channel_patterns:
@@ -211,6 +193,7 @@ async def client_cleanup(
 
     await client.close()
     del client
+    # The closure is not completed in the glide-core instantly
     await asyncio.sleep(1)
 
@@ -257,7 +240,6 @@ async def test_pubsub_exact_happy_path(
         )
 
         result = await publishing_client.publish(message, channel)
-        # TODO: enable when client closing works
         if cluster_mode:
             assert result == 1
         # allow the message to propagate
@@ -273,18 +255,8 @@ async def test_pubsub_exact_happy_path(
             await check_no_messages_left(method, listening_client, callback_messages, 1)
 
         finally:
-            # if cluster_mode:
-            #     # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running
-            #     # In cluster mode, we check how many subscriptions received the message
-            #     # So to avoid flakiness, we make sure to unsubscribe from the channels
-            #     await listening_client.custom_command(["UNSUBSCRIBE", channel])
-            if listening_client:
-                await client_cleanup(
-                    listening_client, pub_sub if cluster_mode else None
-                )
-
-            if publishing_client:
-                await
client_cleanup(publishing_client, None) + await client_cleanup(listening_client, pub_sub if cluster_mode else None) + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_happy_path_coexistence( @@ -315,7 +287,6 @@ async def test_pubsub_exact_happy_path_coexistence( for msg in [message, message2]: result = await publishing_client.publish(msg, channel) - # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -344,18 +315,8 @@ async def test_pubsub_exact_happy_path_coexistence( assert listening_client.try_get_pubsub_message() is None finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command(["UNSUBSCRIBE", channel]) - if listening_client: - await client_cleanup( - listening_client, pub_sub if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) + await client_cleanup(listening_client, pub_sub if cluster_mode else None) + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -410,7 +371,6 @@ async def test_pubsub_exact_happy_path_many_channels( # Publish messages to each channel for channel, message in channels_and_messages.items(): result = await publishing_client.publish(message, channel) - # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -435,20 +395,8 @@ async def test_pubsub_exact_happy_path_many_channels( ) finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command( - # ["UNSUBSCRIBE", *list(channels_and_messages.keys())] - # ) - if listening_client: - await client_cleanup( - listening_client, pub_sub if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) + await client_cleanup(listening_client, pub_sub if cluster_mode else None) + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_happy_path_many_channels_co_existence( @@ -493,7 +441,6 @@ async def test_pubsub_exact_happy_path_many_channels_co_existence( # Publish messages to each channel for channel, message in channels_and_messages.items(): result = await publishing_client.publish(message, channel) - # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -519,20 +466,8 @@ async def test_pubsub_exact_happy_path_many_channels_co_existence( assert listening_client.try_get_pubsub_message() is None finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command( - # ["UNSUBSCRIBE", *list(channels_and_messages.keys())] - # ) - if 
listening_client: - await client_cleanup( - listening_client, pub_sub if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) + await client_cleanup(listening_client, pub_sub if cluster_mode else None) + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( @@ -575,7 +510,6 @@ async def test_sharded_pubsub( if await check_if_server_version_lt(publishing_client, min_version): pytest.skip(reason=f"Valkey version required >= {min_version}") - # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True @@ -583,10 +517,6 @@ async def test_sharded_pubsub( == publish_response ) - # await cast(GlideClusterClient, publishing_client).publish( - # message, channel, sharded=True - # ) - # allow the message to propagate await asyncio.sleep(1) @@ -601,18 +531,8 @@ async def test_sharded_pubsub( await check_no_messages_left(method, listening_client, callback_messages, 1) finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command(["SUNSUBSCRIBE", channel]) - if listening_client: - await client_cleanup( - listening_client, pub_sub if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) + await client_cleanup(listening_client, pub_sub if cluster_mode else None) + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True]) async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): @@ -647,7 +567,6 @@ async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): if await check_if_server_version_lt(publishing_client, min_version): pytest.skip(reason=f"Valkey version required >= {min_version}") - # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True @@ -661,14 +580,6 @@ async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): == 1 ) - # await cast(GlideClusterClient, publishing_client).publish( - # message, channel, sharded=True - # ) - - # await cast(GlideClusterClient, publishing_client).publish( - # message2, channel, sharded=True - # ) - # allow the messages to propagate await asyncio.sleep(1) @@ -694,18 +605,8 @@ async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): assert listening_client.try_get_pubsub_message() is None finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command(["SUNSUBSCRIBE", channel]) - if listening_client: - await client_cleanup( - listening_client, pub_sub if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) + await client_cleanup(listening_client, pub_sub if cluster_mode else None) + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True]) 
@pytest.mark.parametrize( @@ -761,7 +662,6 @@ async def test_sharded_pubsub_many_channels( # Publish messages to each channel for channel, message in channels_and_messages.items(): - # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True @@ -769,10 +669,6 @@ async def test_sharded_pubsub_many_channels( == publish_response ) - # await cast(GlideClusterClient, publishing_client).publish( - # message, channel, sharded=True - # ) - # Allow the messages to propagate await asyncio.sleep(1) @@ -795,18 +691,10 @@ async def test_sharded_pubsub_many_channels( ) finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command( - # ["SUNSUBSCRIBE", *list(channels_and_messages.keys())] - # ) if listening_client: await client_cleanup( listening_client, pub_sub if cluster_mode else None ) - if publishing_client: await client_cleanup(publishing_client, None) @@ -854,7 +742,6 @@ async def test_pubsub_pattern( for channel, message in channels.items(): result = await publishing_client.publish(message, channel) - # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -877,18 +764,8 @@ async def test_pubsub_pattern( await check_no_messages_left(method, listening_client, callback_messages, 2) finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) - if listening_client: - await client_cleanup( - listening_client, pub_sub if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) + await client_cleanup(listening_client, pub_sub if cluster_mode else None) + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): @@ -923,7 +800,6 @@ async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): for channel, message in channels.items(): result = await publishing_client.publish(message, channel) - # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -950,18 +826,8 @@ async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): assert listening_client.try_get_pubsub_message() is None finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) - if listening_client: - await client_cleanup( - listening_client, pub_sub if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) + await client_cleanup(listening_client, pub_sub if cluster_mode else None) + await client_cleanup(publishing_client, 
None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -1007,7 +873,6 @@ async def test_pubsub_pattern_many_channels( for channel, message in channels.items(): result = await publishing_client.publish(message, channel) - # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -1032,18 +897,8 @@ async def test_pubsub_pattern_many_channels( ) finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) - if listening_client: - await client_cleanup( - listening_client, pub_sub if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) + await client_cleanup(listening_client, pub_sub if cluster_mode else None) + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -1122,7 +977,6 @@ async def test_pubsub_combined_exact_and_pattern_one_client( # Publish messages to all channels for channel, message in all_channels_and_messages.items(): result = await publishing_client.publish(message, channel) - # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -1153,21 +1007,10 @@ async def test_pubsub_combined_exact_and_pattern_one_client( method, listening_client, callback_messages, NUM_CHANNELS * 2 ) finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client.custom_command( - # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] - # ) - # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) - if listening_client: - await client_cleanup( - listening_client, pub_sub_exact if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) + await client_cleanup( + listening_client, pub_sub_exact if cluster_mode else None + ) + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -1271,7 +1114,6 @@ async def test_pubsub_combined_exact_and_pattern_multiple_clients( # Publish messages to all channels for channel, message in all_channels_and_messages.items(): result = await publishing_client.publish(message, channel) - # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -1321,29 +1163,14 @@ async def test_pubsub_combined_exact_and_pattern_multiple_clients( ) finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await listening_client_exact.custom_command( - # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] - # ) - # await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN]) - if listening_client_exact: - await client_cleanup( - 
+            await client_cleanup(client_dont_care, None)
 
     @pytest.mark.parametrize("cluster_mode", [True])
     @pytest.mark.parametrize(
@@ -1429,16 +1256,13 @@ async def test_pubsub_combined_exact_pattern_and_sharded_one_client(
                 **exact_channels_and_messages,
                 **pattern_channels_and_messages,
             }.items():
-                # TODO: enable when client closing works
                 assert (
                     await publishing_client.publish(message, channel)
                     == publish_response
                 )
-                # await publishing_client.publish(message, channel)
 
             # Publish sharded messages to all channels
             for channel, message in sharded_channels_and_messages.items():
-                # TODO: enable when client closing works
                 assert (
                     await cast(GlideClusterClient, publishing_client).publish(
                         message, channel, sharded=True
@@ -1446,10 +1270,6 @@ async def test_pubsub_combined_exact_pattern_and_sharded_one_client(
                     == publish_response
                 )
 
-            # await cast(GlideClusterClient, publishing_client).publish(
-            #     message, channel, sharded=True
-            # )
-
             # allow the messages to propagate
             await asyncio.sleep(1)
 
@@ -1483,24 +1303,10 @@ async def test_pubsub_combined_exact_pattern_and_sharded_one_client(
             )
 
         finally:
-            # if cluster_mode:
-            #     # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running
-            #     # In cluster mode, we check how many subscriptions received the message
-            #     # So to avoid flakiness, we make sure to unsubscribe from the channels
-            #     await listening_client.custom_command(
-            #         ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())]
-            #     )
-            #     await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN])
-            #     await listening_client.custom_command(
-            #         ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())]
-            #     )
-            if listening_client:
-                await client_cleanup(
-                    listening_client, pub_sub_exact if cluster_mode else None
-                )
-
-            if publishing_client:
-                await client_cleanup(publishing_client, None)
+            await client_cleanup(
+                listening_client, pub_sub_exact if cluster_mode else None
+            )
+            await client_cleanup(publishing_client, None)
 
     @pytest.mark.parametrize("cluster_mode", [True])
     @pytest.mark.parametrize(
@@ -1527,6 +1333,13 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client(
             listening_client_pattern,
             listening_client_sharded,
         ) = (None, None, None, None)
+
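+        # Pre-initialize the pubsub configs referenced in the finally block so
+        # cleanup does not hit an unbound local when setup fails early.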
+        (
+            pub_sub_exact,
+            pub_sub_sharded,
+            pub_sub_pattern,
+        ) = (None, None, None)
+
         try:
             NUM_CHANNELS = 256
             PATTERN = "{{{}}}:{}".format("pattern", "*")
@@ -1628,16 +1441,13 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client(
                 **exact_channels_and_messages,
                 **pattern_channels_and_messages,
             }.items():
-                # TODO: enable when client closing works
                 assert (
                     await publishing_client.publish(message, channel)
                     == publish_response
                 )
-                # await publishing_client.publish(message, channel)
 
             # Publish sharded messages to all channels
             for channel, message in sharded_channels_and_messages.items():
-                # TODO: enable when client closing works
                 assert (
                     await cast(GlideClusterClient, publishing_client).publish(
                         message, channel, sharded=True
@@ -1645,10 +1455,6 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client(
                     == publish_response
                 )
 
-            # await cast(GlideClusterClient, publishing_client).publish(
-            #     message, channel, sharded=True
-            # )
-
             # allow the messages to propagate
             await asyncio.sleep(1)
 
@@ -1717,32 +1523,16 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client(
             )
 
         finally:
-            # if cluster_mode:
-            #     # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running
-            #     # In cluster mode, we check how many subscriptions received the message
-            #     # So to avoid flakiness, we make sure to unsubscribe from the channels
-            #     await listening_client_exact.custom_command(
-            #         ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())]
-            #     )
-            #     await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN])
-            #     await listening_client_sharded.custom_command(
-            #         ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())]
-            #     )
-            if listening_client_exact:
-                await client_cleanup(
-                    listening_client_exact, pub_sub_exact if cluster_mode else None
-                )
-
-            if publishing_client:
-                await client_cleanup(publishing_client, None)
-
-            if listening_client_pattern:
-                await client_cleanup(
-                    listening_client_pattern, pub_sub_pattern if cluster_mode else None
-                )
-
-            if listening_client_sharded:
-                await client_cleanup(listening_client_sharded, None)
+            await client_cleanup(
+                listening_client_exact, pub_sub_exact if cluster_mode else None
+            )
+            await client_cleanup(publishing_client, None)
+            await client_cleanup(
+                listening_client_pattern, pub_sub_pattern if cluster_mode else None
+            )
+            await client_cleanup(
+                listening_client_sharded, pub_sub_sharded if cluster_mode else None
+            )
 
     @pytest.mark.parametrize("cluster_mode", [True])
     @pytest.mark.parametrize(
@@ -1770,6 +1560,13 @@ async def test_pubsub_combined_different_channels_with_same_name(
             listening_client_pattern,
             listening_client_sharded,
         ) = (None, None, None, None)
+
+        (
+            pub_sub_exact,
+            pub_sub_sharded,
+            pub_sub_pattern,
+        ) = (None, None, None)
+
         try:
             CHANNEL_NAME = "same-channel-name"
             MESSAGE_EXACT = get_random_string(10)
@@ -1849,7 +1646,6 @@ async def test_pubsub_combined_different_channels_with_same_name(
             )
 
             # Publish messages to each channel
-            # TODO: enable when client closing works
             assert await publishing_client.publish(MESSAGE_EXACT, CHANNEL_NAME) == 2
             assert await publishing_client.publish(MESSAGE_PATTERN, CHANNEL_NAME) == 2
             assert (
@@ -1859,12 +1655,6 @@ async def test_pubsub_combined_different_channels_with_same_name(
                 == 1
            )
 
-            # await publishing_client.publish(MESSAGE_EXACT, CHANNEL_NAME)
-            # await publishing_client.publish(MESSAGE_PATTERN, CHANNEL_NAME)
-            # await cast(GlideClusterClient, publishing_client).publish(
-            #     MESSAGE_SHARDED, CHANNEL_NAME, sharded=True
-            # )
-
             # allow the message to propagate
             await asyncio.sleep(1)
 
@@ -1901,34 +1691,16 @@ async def test_pubsub_combined_different_channels_with_same_name(
             )
 
         finally:
-            # if cluster_mode:
-            #     # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running
-            #     # In cluster mode, we check how many subscriptions received the message
-            #     # So to avoid flakiness, we make sure to unsubscribe from the channels
-            #     await listening_client_exact.custom_command(
-            #         ["UNSUBSCRIBE", CHANNEL_NAME]
-            #     )
-            #     await listening_client_pattern.custom_command(
["PUNSUBSCRIBE", CHANNEL_NAME] - # ) - # await listening_client_sharded.custom_command( - # ["SUNSUBSCRIBE", CHANNEL_NAME] - # ) - if listening_client_exact: - await client_cleanup( - listening_client_exact, pub_sub_exact if cluster_mode else None - ) - - if publishing_client: - await client_cleanup(publishing_client, None) - - if listening_client_pattern: - await client_cleanup( - listening_client_pattern, pub_sub_pattern if cluster_mode else None - ) - - if listening_client_sharded: - await client_cleanup(listening_client_sharded, None) + await client_cleanup( + listening_client_exact, pub_sub_exact if cluster_mode else None + ) + await client_cleanup(publishing_client, None) + await client_cleanup( + listening_client_pattern, pub_sub_pattern if cluster_mode else None + ) + await client_cleanup( + listening_client_sharded, pub_sub_sharded if cluster_mode else None + ) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -1996,7 +1768,6 @@ async def test_pubsub_two_publishing_clients_same_name( # Publish messages to each channel - both clients publishing for msg in [MESSAGE_EXACT, MESSAGE_PATTERN]: result = await client_pattern.publish(msg, CHANNEL_NAME) - # TODO: enable when client closing works if cluster_mode: assert result == 2 @@ -2025,21 +1796,10 @@ async def test_pubsub_two_publishing_clients_same_name( ) finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) - # await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME]) - if client_exact: - await client_cleanup( - client_exact, pub_sub_exact if cluster_mode else None - ) - - if client_pattern: - await client_cleanup( - client_pattern, pub_sub_pattern if cluster_mode else None - ) + await client_cleanup(client_exact, pub_sub_exact if cluster_mode else None) + await client_cleanup( + client_pattern, pub_sub_pattern if cluster_mode else None + ) @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( @@ -2137,7 +1897,6 @@ async def test_pubsub_three_publishing_clients_same_name_with_sharded( pytest.skip("Valkey version required >= 7.0.0") # Publish messages to each channel - both clients publishing - # TODO: enable when client closing works assert ( await client_pattern.publish(MESSAGE_EXACT, CHANNEL_NAME) == publish_response @@ -2153,12 +1912,6 @@ async def test_pubsub_three_publishing_clients_same_name_with_sharded( == 1 ) - # await client_pattern.publish(MESSAGE_EXACT, CHANNEL_NAME) - # await client_sharded.publish(MESSAGE_PATTERN, CHANNEL_NAME) - # await cast(GlideClusterClient, client_exact).publish( - # MESSAGE_SHARDED, CHANNEL_NAME, sharded=True - # ) - # allow the message to propagate await asyncio.sleep(1) @@ -2194,30 +1947,14 @@ async def test_pubsub_three_publishing_clients_same_name_with_sharded( ) finally: - # if cluster_mode: - # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # # In cluster mode, we check how many subscriptions received the message - # # So to avoid flakiness, we make sure to unsubscribe from the channels - # await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) - # await 
-            #     await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME])
-            #     await client_sharded.custom_command(["SUNSUBSCRIBE", CHANNEL_NAME])
-            if client_exact:
-                await client_cleanup(
-                    client_exact, pub_sub_exact if cluster_mode else None
-                )
-
-            if client_pattern:
-                await client_cleanup(
-                    client_pattern, pub_sub_pattern if cluster_mode else None
-                )
-
-            if client_sharded:
-                await client_cleanup(
-                    client_sharded, pub_sub_sharded if cluster_mode else None
-                )
-
-            if client_dont_care:
-                await client_cleanup(client_dont_care, None)
+            await client_cleanup(client_exact, pub_sub_exact if cluster_mode else None)
+            await client_cleanup(
+                client_pattern, pub_sub_pattern if cluster_mode else None
+            )
+            await client_cleanup(
+                client_sharded, pub_sub_sharded if cluster_mode else None
+            )
+            await client_cleanup(client_dont_care, None)
 
     @pytest.mark.skip(
         reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions"
     )
     @pytest.mark.parametrize("cluster_mode", [True, False])
     async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool):
@@ -2256,7 +1993,6 @@ async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool):
         )
 
         try:
-            # TODO: enable when client closing works
             result = await publishing_client.publish(message, channel)
             if cluster_mode:
                 assert result == 1
@@ -2267,9 +2003,6 @@ async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool):
             # allow the message to propagate
             await asyncio.sleep(15)
 
-            # await publishing_client.publish(message, channel)
-            # await publishing_client.publish(message2, channel)
-
             async_msg = await listening_client.get_pubsub_message()
             assert async_msg.message == message.encode()
             assert async_msg.channel == channel.encode()
@@ -2288,18 +2021,8 @@ async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool):
             assert listening_client.try_get_pubsub_message() is None
 
         finally:
-            # if cluster_mode:
-            #     # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running
-            #     # In cluster mode, we check how many subscriptions received the message
-            #     # So to avoid flakiness, we make sure to unsubscribe from the channels
-            #     await listening_client.custom_command(["UNSUBSCRIBE", channel])
-            if listening_client:
-                await client_cleanup(
-                    listening_client, pub_sub if cluster_mode else None
-                )
-
-            if publishing_client:
-                await client_cleanup(publishing_client, None)
+            await client_cleanup(listening_client, pub_sub if cluster_mode else None)
+            await client_cleanup(publishing_client, None)
 
     @pytest.mark.skip(
         reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions"
     )
     @pytest.mark.parametrize("cluster_mode", [True])
@@ -2343,7 +2066,6 @@ async def test_pubsub_sharded_max_size_message(self, request, cluster_mode: bool
             if await check_if_server_version_lt(publishing_client, "7.0.0"):
                 pytest.skip("Redis version required >= 7.0.0")
 
-            # TODO: enable when client closing works
            assert (
                 await cast(GlideClusterClient, publishing_client).publish(
                     message, channel, sharded=True
@@ -2358,11 +2080,6 @@ async def test_pubsub_sharded_max_size_message(self, request, cluster_mode: bool
                 == 1
             )
 
-            # await cast(GlideClusterClient, publishing_client).publish(
-            #     message, channel, sharded=True
-            # )
-            # await publishing_client.publish(message2, channel)
-
             # allow the message to propagate
             await asyncio.sleep(15)
 
@@ -2385,18 +2102,8 @@ async def test_pubsub_sharded_max_size_message(self, request, cluster_mode: bool
             assert listening_client.try_get_pubsub_message() is None
 
         finally:
-            # if cluster_mode:
-            #     # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running
-            #     # In cluster mode, we check how many subscriptions received the message
-            #     # So to avoid flakiness, we make sure to unsubscribe from the channels
-            #     await listening_client.custom_command(["UNSUBSCRIBE", channel])
-            if listening_client:
-                await client_cleanup(
-                    listening_client, pub_sub if cluster_mode else None
-                )
-
-            if publishing_client:
-                await client_cleanup(publishing_client, None)
+            await client_cleanup(listening_client, pub_sub if cluster_mode else None)
+            await client_cleanup(publishing_client, None)
 
     @pytest.mark.skip(
         reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions"
     )
     @pytest.mark.parametrize("cluster_mode", [True, False])
@@ -2438,11 +2145,9 @@ async def test_pubsub_exact_max_size_message_callback(
                 request, cluster_mode, pub_sub, timeout=10000
             )
 
-            # TODO: enable when client closing works
             result = await publishing_client.publish(message, channel)
             if cluster_mode:
                 assert result == 1
-            # await publishing_client.publish(message, channel)
 
             # allow the message to propagate
             await asyncio.sleep(15)
@@ -2453,18 +2158,8 @@ async def test_pubsub_exact_max_size_message_callback(
             assert callback_messages[0].pattern is None
 
         finally:
-            # if cluster_mode:
-            #     # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running
-            #     # In cluster mode, we check how many subscriptions received the message
-            #     # So to avoid flakiness, we make sure to unsubscribe from the channels
-            #     await listening_client.custom_command(["UNSUBSCRIBE", channel])
-            if listening_client:
-                await client_cleanup(
-                    listening_client, pub_sub if cluster_mode else None
-                )
-
-            if publishing_client:
-                await client_cleanup(publishing_client, None)
+            await client_cleanup(listening_client, pub_sub if cluster_mode else None)
+            await client_cleanup(publishing_client, None)
 
     @pytest.mark.skip(
         reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions"
     )
     @pytest.mark.parametrize("cluster_mode", [True])
@@ -2510,7 +2205,6 @@ async def test_pubsub_sharded_max_size_message_callback(
             if await check_if_server_version_lt(publishing_client, "7.0.0"):
                 pytest.skip("Valkey version required >= 7.0.0")
 
-            # TODO: enable when client closing works
             assert (
                 await cast(GlideClusterClient, publishing_client).publish(
                     message, channel, sharded=True
@@ -2518,10 +2212,6 @@ async def test_pubsub_sharded_max_size_message_callback(
                 == 1
             )
 
-            # await cast(GlideClusterClient, publishing_client).publish(
-            #     message, channel, sharded=True
-            # )
-
             # allow the message to propagate
             await asyncio.sleep(15)
 
@@ -2532,18 +2222,8 @@ async def test_pubsub_sharded_max_size_message_callback(
             assert callback_messages[0].pattern is None
 
         finally:
-            # if cluster_mode:
-            #     # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running
-            #     # In cluster mode, we check how many subscriptions received the message
-            #     # So to avoid flakiness, we make sure to unsubscribe from the channels
-            #     await listening_client.custom_command(["UNSUBSCRIBE", channel])
-            if listening_client:
-                await client_cleanup(
-                    listening_client, pub_sub if cluster_mode else None
-                )
-
-            if publishing_client:
-                await client_cleanup(publishing_client, None)
+            await client_cleanup(listening_client, pub_sub if cluster_mode else None)
+            await client_cleanup(publishing_client, None)
 
     @pytest.mark.parametrize("cluster_mode", [True, False])
     async def test_pubsub_resp2_raise_an_error(self, request, cluster_mode: bool):