Skip to content

Commit

Permalink
Python: command WAIT
Browse files Browse the repository at this point in the history
  • Loading branch information
TJ Zhang committed Jun 28, 2024
1 parent b231a72 commit ee0a769
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 0 deletions.
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ enum RequestType {
FunctionRestore = 197;
XPending = 198;
XGroupSetId = 199;
Wait = 200;
}

message Command {
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub enum RequestType {
FunctionRestore = 197,
XPending = 198,
XGroupSetId = 199,
Wait = 200,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -419,6 +420,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::FunctionRestore => RequestType::FunctionRestore,
ProtobufRequestType::XPending => RequestType::XPending,
ProtobufRequestType::XGroupSetId => RequestType::XGroupSetId,
ProtobufRequestType::Wait => RequestType::Wait,
}
}
}
Expand Down Expand Up @@ -628,6 +630,7 @@ impl RequestType {
RequestType::FunctionRestore => Some(get_two_word_command("FUNCTION", "RESTORE")),
RequestType::XPending => Some(cmd("XPENDING")),
RequestType::XGroupSetId => Some(get_two_word_command("XGROUP", "SETID")),
RequestType::Wait => Some(cmd("WAIT")),
}
}
}
30 changes: 30 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,3 +658,33 @@ async def lolwut(
TClusterResponse[str],
await self._execute_command(RequestType.Lolwut, args, route),
)

async def wait(
self,
numreplicas: int,
timeout: int,
route: Optional[Route] = None,
) -> int:
"""
Blocks the current client until all the previous write commands are successfully transferred
and acknowledged by at least `numreplicas` of replicas. If `timeout` is
reached, the command returns even if the specified number of replicas were not yet reached.
See https://valkey.io/commands/wait for more details.
Args:
numreplicas (int): The number of replicas to reach.
timeout (int): The timeout value specified in milliseconds.
Returns:
str: The number of replicas reached by all the writes performed in the context of the current connection.
Examples:
>>> await client.set("key", "value");
>>> await client.wait(1, 1000);
"""
args = [str(numreplicas), str(timeout)]
return cast(
int,
await self._execute_command(RequestType.Wait, args, route),
)
29 changes: 29 additions & 0 deletions python/python/glide/async_commands/standalone_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,32 @@ async def lolwut(
str,
await self._execute_command(RequestType.Lolwut, args),
)

async def wait(
self,
numreplicas: int,
timeout: int,
) -> int:
"""
Blocks the current client until all the previous write commands are successfully transferred
and acknowledged by at least `numreplicas` of replicas. If `timeout` is
reached, the command returns even if the specified number of replicas were not yet reached.
See https://valkey.io/commands/wait for more details.
Args:
numreplicas (int): The number of replicas to reach.
timeout (int): The timeout value specified in milliseconds.
Returns:
str: The number of replicas reached by all the writes performed in the context of the current connection.
Examples:
>>> await client.set("key", "value");
>>> await client.wait(1, 1000);
"""
args = [str(numreplicas), str(timeout)]
return cast(
int,
await self._execute_command(RequestType.Wait, args),
)
22 changes: 22 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4135,4 +4135,26 @@ def copy(

return self.append_command(RequestType.Copy, args)

def wait(
self: TTransaction,
numreplicas: int,
timeout: int,
) -> TTransaction:
"""
Blocks the current client until all the previous write commands are successfully transferred
and acknowledged by at least `numreplicas` of replicas. If `timeout` is
reached, the command returns even if the specified number of replicas were not yet reached.
See https://valkey.io/commands/wait for more details.
Args:
numreplicas (int): The number of replicas to reach.
timeout (int): The timeout value specified in milliseconds.
Command Response:
str: The number of replicas reached by all the writes performed in the context of the current connection.
"""
args = [str(numreplicas), str(timeout)]
return self.append_command(RequestType.Wait, args)

# TODO: add all CLUSTER commands
9 changes: 9 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6970,6 +6970,15 @@ async def test_copy_database(self, redis_client: GlideClient):
finally:
assert await redis_client.select(0) == OK

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_wait(self, redis_client: TGlideClient):
key = f"{{key}}-1{get_random_string(5)}"
value = get_random_string(5)

await redis_client.set(key, value)
assert await redis_client.wait(1, 1000) >= 0

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_lolwut(self, redis_client: TGlideClient):
Expand Down
2 changes: 2 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ async def transaction_test(
args.append("one")
transaction.srandmember_count(key7, 1)
args.append(["one"])
transaction.wait(1, 1000)
args.append(0)
transaction.flushall(FlushMode.ASYNC)
args.append(OK)
transaction.flushall()
Expand Down

0 comments on commit ee0a769

Please sign in to comment.