From 181f956e9565fe99835fa913ea8cc2181ea69600 Mon Sep 17 00:00:00 2001 From: Vincent Maurin Date: Thu, 25 Jan 2024 16:59:39 +0100 Subject: [PATCH] Implement KIP-202 : DeleteRecords API When doing stream processing, it is convinient to use "transient" topic : * retention time is infinite * records get deleted when consumed The java kafka streams client is using the deleteRecords of the admin client to perform this operation. It is lacking in aiokafka The KIP reference https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API refs #967 --- aiokafka/admin/__init__.py | 3 +- aiokafka/admin/client.py | 72 ++++++++++++++++- aiokafka/admin/records_to_delete.py | 12 +++ aiokafka/protocol/admin.py | 121 ++++++++++++++++++++++++++++ tests/test_admin.py | 37 ++++++++- 5 files changed, 242 insertions(+), 3 deletions(-) create mode 100644 aiokafka/admin/records_to_delete.py diff --git a/aiokafka/admin/__init__.py b/aiokafka/admin/__init__.py index 61913cc8..75d807b8 100644 --- a/aiokafka/admin/__init__.py +++ b/aiokafka/admin/__init__.py @@ -1,5 +1,6 @@ from .client import AIOKafkaAdminClient from .new_partitions import NewPartitions from .new_topic import NewTopic +from .records_to_delete import RecordsToDelete -__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"] +__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"] diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index d8b08752..2838b238 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -6,12 +6,18 @@ from aiokafka import __version__ from aiokafka.client import AIOKafkaClient -from aiokafka.errors import IncompatibleBrokerVersion, for_code +from aiokafka.errors import ( + IncompatibleBrokerVersion, + LeaderNotAvailableError, + NotLeaderForPartitionError, + for_code, +) from aiokafka.protocol.admin import ( AlterConfigsRequest, ApiVersionRequest_v0, CreatePartitionsRequest, CreateTopicsRequest, + DeleteRecordsRequest, DeleteTopicsRequest, DescribeConfigsRequest, DescribeGroupsRequest, @@ -24,6 +30,7 @@ from .config_resource import ConfigResource, ConfigResourceType from .new_topic import NewTopic +from .records_to_delete import RecordsToDelete log = logging.getLogger(__name__) @@ -605,3 +612,66 @@ async def list_consumer_group_offsets( offset_plus_meta = OffsetAndMetadata(offset, metadata) response_dict[tp] = offset_plus_meta return response_dict + + async def delete_records( + self, + records_to_delete: Dict[TopicPartition, RecordsToDelete], + timeout_ms: Optional[int] = None, + ) -> Dict[TopicPartition, int]: + """Delete records from partitions. + + :param records_to_delete: A map of RecordsToDelete for each TopicPartition + :param timeout_ms: Milliseconds to wait for the deletion to complete. + :return: Appropriate version of DeleteRecordsResponse class. + """ + version = self._matching_api_version(DeleteRecordsRequest) + + if self._version_info[MetadataRequest[0].API_KEY] < (0, 10): + metadata_request = MetadataRequest[0]([]) + else: + metadata_request = MetadataRequest[1](None) + + metadata = await self._send_request(metadata_request) + + self._client.cluster.update_metadata(metadata) + + requests = defaultdict(lambda: defaultdict(list)) + responses = {} + + for tp, records in records_to_delete.items(): + leader = self._client.cluster.leader_for_partition(tp) + if leader is None: + raise NotLeaderForPartitionError() + elif leader == -1: + raise LeaderNotAvailableError() + requests[leader][tp.topic].append((tp.partition, records)) + + req_cls = DeleteRecordsRequest[version] + + for leader, delete_request in requests.items(): + request = req_cls( + self._convert_records_to_delete(delete_request), + timeout_ms or self._request_timeout_ms, + {}, + ) + response = await self._client.send(leader, request) + for topic, partitions in response.topics: + for partition_index, low_watermark, error_code in partitions: + if error_code: + err = for_code(error_code) + raise err + responses[TopicPartition(topic, partition_index)] = low_watermark + return responses + + @staticmethod + def _convert_records_to_delete( + records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]], + ): + return [ + ( + topic, + [(partition, rec.before_offset, {}) for partition, rec in records], + {}, + ) + for topic, records in records_to_delete.items() + ] diff --git a/aiokafka/admin/records_to_delete.py b/aiokafka/admin/records_to_delete.py new file mode 100644 index 00000000..76edc146 --- /dev/null +++ b/aiokafka/admin/records_to_delete.py @@ -0,0 +1,12 @@ +class RecordsToDelete: + """A class for deleting records on existing topics. + Arguments: + before_offset (int): + delete all the records before the given offset + """ + + def __init__( + self, + before_offset, + ): + self.before_offset = before_offset diff --git a/aiokafka/protocol/admin.py b/aiokafka/protocol/admin.py index 0e5eace6..c130346a 100644 --- a/aiokafka/protocol/admin.py +++ b/aiokafka/protocol/admin.py @@ -1276,3 +1276,124 @@ class ListPartitionReassignmentsRequest_v0(Request): ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0] ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0] + + +class DeleteRecordsResponse_v0(Response): + API_KEY = 21 + API_VERSION = 0 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ( + "topics", + Array( + ("name", String("utf-8")), + ( + "partitions", + Array( + ("partition_index", Int32), + ("low_watermark", Int64), + ("error_code", Int16), + ), + ), + ), + ), + ) + + +class DeleteRecordsResponse_v1(Response): + API_KEY = 21 + API_VERSION = 1 + SCHEMA = DeleteRecordsResponse_v0.SCHEMA + + +class DeleteRecordsResponse_v2(Response): + API_KEY = 21 + API_VERSION = 2 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ( + "topics", + CompactArray( + ("name", CompactString("utf-8")), + ( + "partitions", + CompactArray( + ("partition_index", Int32), + ("low_watermark", Int64), + ("error_code", Int16), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ) + + +class DeleteRecordsRequest_v0(Request): + API_KEY = 21 + API_VERSION = 0 + RESPONSE_TYPE = DeleteRecordsResponse_v0 + SCHEMA = Schema( + ( + "topics", + Array( + ("name", String("utf-8")), + ( + "partitions", + Array( + ("partition_index", Int32), + ("offset", Int64), + ), + ), + ), + ), + ("timeout_ms", Int32), + ) + + +class DeleteRecordsRequest_v1(Request): + API_KEY = 21 + API_VERSION = 1 + RESPONSE_TYPE = DeleteRecordsResponse_v1 + SCHEMA = DeleteRecordsRequest_v0.SCHEMA + + +class DeleteRecordsRequest_v2(Request): + API_KEY = 21 + API_VERSION = 2 + FLEXIBLE_VERSION = True + RESPONSE_TYPE = DeleteRecordsResponse_v2 + SCHEMA = Schema( + ( + "topics", + CompactArray( + ("name", CompactString("utf-8")), + ( + "partitions", + CompactArray( + ("partition_index", Int32), + ("offset", Int64), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ), + ), + ("timeout_ms", Int32), + ("tags", TaggedFields), + ) + + +DeleteRecordsRequest = [ + DeleteRecordsRequest_v0, + DeleteRecordsRequest_v1, + DeleteRecordsRequest_v2, +] + +DeleteRecordsResponse = [ + DeleteRecordsResponse_v0, + DeleteRecordsResponse_v1, + DeleteRecordsResponse_v2, +] diff --git a/tests/test_admin.py b/tests/test_admin.py index fcafeff7..a81b9663 100644 --- a/tests/test_admin.py +++ b/tests/test_admin.py @@ -1,6 +1,6 @@ import asyncio -from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic +from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType from aiokafka.consumer import AIOKafkaConsumer from aiokafka.producer import AIOKafkaProducer @@ -201,3 +201,38 @@ async def test_list_consumer_group_offsets(self): assert resp[tp].offset == msg.offset + 1 resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp]) assert resp[tp].offset == msg.offset + 1 + + @kafka_versions(">=1.1.0") + @run_until_complete + async def test_delete_records(self): + admin = await self.create_admin() + + await admin.create_topics([NewTopic(self.topic, 1, 1)]) + + async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer: + first_message = await producer.send_and_wait( + self.topic, partition=0, value=b"some-message" + ) + await producer.send_and_wait( + self.topic, partition=0, value=b"other-message" + ) + + await admin.delete_records( + { + TopicPartition(self.topic, 0): RecordsToDelete( + before_offset=first_message.offset + 1 + ) + } + ) + + consumer = AIOKafkaConsumer( + self.topic, + bootstrap_servers=self.hosts, + enable_auto_commit=False, + auto_offset_reset="earliest", + ) + await consumer.start() + self.add_cleanup(consumer.stop) + + msg = await consumer.getone() + assert msg.value == b"other-message"