Skip to content

Commit

Permalink
Python: add XGROUP SETID command (#1683)
Browse files Browse the repository at this point in the history
* Python: add XGROUP SETID command

* Add comments to IT

* Fix stream ID arg

* PR suggestions
  • Loading branch information
aaron-congo authored Jun 27, 2024
1 parent 4b12c86 commit 20ba73b
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* Python: Added XREADGROUP command ([#1679](https://github.com/aws/glide-for-redis/pull/1679))
* Python: Added XACK command ([#1681](https://github.com/aws/glide-for-redis/pull/1681))
* Python: Added FLUSHDB command ([#1680](https://github.com/aws/glide-for-redis/pull/1680))
* Python: Added XGROUP SETID command ([#1683](https://github.com/aws/glide-for-redis/pull/1683))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ enum RequestType {
FunctionDump = 196;
FunctionRestore = 197;
XPending = 198;
XGroupSetId = 199;
}

message Command {
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub enum RequestType {
FunctionDump = 196,
FunctionRestore = 197,
XPending = 198,
XGroupSetId = 199,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -417,6 +418,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::FunctionDump => RequestType::FunctionDump,
ProtobufRequestType::FunctionRestore => RequestType::FunctionRestore,
ProtobufRequestType::XPending => RequestType::XPending,
ProtobufRequestType::XGroupSetId => RequestType::XGroupSetId,
}
}
}
Expand Down Expand Up @@ -625,6 +627,7 @@ impl RequestType {
RequestType::FunctionDump => Some(get_two_word_command("FUNCTION", "DUMP")),
RequestType::FunctionRestore => Some(get_two_word_command("FUNCTION", "RESTORE")),
RequestType::XPending => Some(cmd("XPENDING")),
RequestType::XGroupSetId => Some(get_two_word_command("XGROUP", "SETID")),
}
}
}
36 changes: 36 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,42 @@ async def xgroup_del_consumer(
),
)

async def xgroup_set_id(
self,
key: str,
group_name: str,
stream_id: str,
entries_read_id: Optional[str] = None,
) -> TOK:
"""
Set the last delivered ID for a consumer group.
See https://valkey.io/commands/xgroup-setid for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
stream_id (str): The stream entry ID that should be set as the last delivered ID for the consumer group.
entries_read_id (Optional[str]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This argument can only be specified if you are using Redis version 7.0.0 or above.
Returns:
TOK: A simple "OK" response.
Examples:
>>> await client.xgroup_set_id("mystream", "mygroup", "0")
OK # The last delivered ID for consumer group "mygroup" was set to 0.
"""
args = [key, group_name, stream_id]
if entries_read_id is not None:
args.extend(["ENTRIESREAD", entries_read_id])

return cast(
TOK,
await self._execute_command(RequestType.XGroupSetId, args),
)

async def xreadgroup(
self,
keys_and_ids: Mapping[str, str],
Expand Down
29 changes: 29 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,35 @@ def xgroup_del_consumer(
RequestType.XGroupDelConsumer, [key, group_name, consumer_name]
)

def xgroup_set_id(
self: TTransaction,
key: str,
group_name: str,
stream_id: str,
entries_read_id: Optional[str] = None,
) -> TTransaction:
"""
Set the last delivered ID for a consumer group.
See https://valkey.io/commands/xgroup-setid for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
stream_id (str): The stream entry ID that should be set as the last delivered ID for the consumer group.
entries_read_id (Optional[str]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This argument can only be specified if you are using Redis version 7.0.0 or above.
Command response:
TOK: A simple "OK" response.
"""
args = [key, group_name, stream_id]
if entries_read_id is not None:
args.extend(["ENTRIESREAD", entries_read_id])

return self.append_command(RequestType.XGroupSetId, args)

def xreadgroup(
self: TTransaction,
keys_and_ids: Mapping[str, str],
Expand Down
84 changes: 84 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5562,6 +5562,90 @@ async def test_xack(
with pytest.raises(RequestError):
await redis_client.xack(string_key, group_name, [stream_id1_0])

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xgroup_set_id(
self, redis_client: TGlideClient, cluster_mode, protocol, request
):
key = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:{get_random_string(10)}"
string_key = f"{{testKey}}:{get_random_string(10)}"
group_name = get_random_string(10)
consumer_name = get_random_string(10)
stream_id0 = "0"
stream_id1_0 = "1-0"
stream_id1_1 = "1-1"
stream_id1_2 = "1-2"

# setup: create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries
# List
assert (
await redis_client.xadd(key, [("f0", "v0")], StreamAddOptions(stream_id1_0))
== stream_id1_0
)
assert (
await redis_client.xadd(key, [("f1", "v1")], StreamAddOptions(stream_id1_1))
== stream_id1_1
)
assert (
await redis_client.xadd(key, [("f2", "v2")], StreamAddOptions(stream_id1_2))
== stream_id1_2
)
assert await redis_client.xgroup_create(key, group_name, stream_id0) == OK
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key: {
stream_id1_0: [["f0", "v0"]],
stream_id1_1: [["f1", "v1"]],
stream_id1_2: [["f2", "v2"]],
}
}
# sanity check: xreadgroup should not return more entries since they're all already in the Pending Entries List
assert (
await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) is None
)

# reset the last delivered ID for the consumer group to "1-1"
# ENTRIESREAD is only supported in Redis version 7.0.0 and above
if await check_if_server_version_lt(redis_client, "7.0.0"):
assert await redis_client.xgroup_set_id(key, group_name, stream_id1_1) == OK
else:
assert (
await redis_client.xgroup_set_id(
key, group_name, stream_id1_1, entries_read_id=stream_id0
)
== OK
)

# the entries_read_id cannot be the first, last, or zero ID. Here we pass the first ID and assert that an
# error is raised.
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(
key, group_name, stream_id1_1, entries_read_id=stream_id1_0
)

# xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key: {
stream_id1_2: [["f2", "v2"]],
}
}

# an error is raised if XGROUP SETID is called with a non-existing key
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(non_existing_key, group_name, stream_id0)

# an error is raised if XGROUP SETID is called with a non-existing group
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(key, "non_existing_group", stream_id0)

# setting the ID to a non-existing ID is allowed
assert await redis_client.xgroup_set_id(key, group_name, "99-99") == OK

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo") == OK
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(string_key, group_name, stream_id0)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_pfadd(self, redis_client: TGlideClient):
Expand Down
2 changes: 2 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ async def transaction_test(
args.append({key11: {"0-2": [["foo", "bar"]]}})
transaction.xack(key11, group_name1, ["0-2"])
args.append(1)
transaction.xgroup_set_id(key11, group_name1, "0-2")
args.append(OK)
transaction.xgroup_del_consumer(key11, group_name1, consumer)
args.append(0)
transaction.xgroup_destroy(key11, group_name1)
Expand Down

0 comments on commit 20ba73b

Please sign in to comment.