From 82695b0c961e5f8328a65f52de208ba195dbc1f9 Mon Sep 17 00:00:00 2001 From: Vincent Maurin Date: Mon, 29 Jan 2024 20:40:28 +0100 Subject: [PATCH] Implement KIP-204 : DeleteRecords API (#969) * Implement KIP-204 : DeleteRecords API When doing stream processing, it is convenient to use "transient" topic : * retention time is infinite * records get deleted when consumed The java kafka streams client is using the deleteRecords of the admin client to perform this operation. It is lacking in aiokafka. The KIP reference https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API refs #967 * Use common method to get metadata * Explain the unpacking catch all * Remove usage of TaggedFields TaggedFields doesn't seem to work properly at the moment. Maybe they should be replaced by an implementation closer to the java client with their "flexibleVersions" * Fix linting errors (format) * Add change log --------- Co-authored-by: Vincent Maurin Co-authored-by: Denis Otkidach --- CHANGES/969.feature | 1 + aiokafka/admin/__init__.py | 3 +- aiokafka/admin/client.py | 62 +++++++++++++- aiokafka/admin/records_to_delete.py | 12 +++ aiokafka/cluster.py | 3 +- aiokafka/protocol/admin.py | 123 ++++++++++++++++++++++++++++ aiokafka/protocol/types.py | 1 + tests/test_admin.py | 37 ++++++++- 8 files changed, 238 insertions(+), 4 deletions(-) create mode 100644 CHANGES/969.feature create mode 100644 aiokafka/admin/records_to_delete.py diff --git a/CHANGES/969.feature b/CHANGES/969.feature new file mode 100644 index 00000000..96c0793c --- /dev/null +++ b/CHANGES/969.feature @@ -0,0 +1 @@ +Implement DeleteRecords API (KIP-204) (pr #969 by @vmaurin) diff --git a/aiokafka/admin/__init__.py b/aiokafka/admin/__init__.py index 61913cc8..75d807b8 100644 --- a/aiokafka/admin/__init__.py +++ b/aiokafka/admin/__init__.py @@ -1,5 +1,6 @@ from .client import AIOKafkaAdminClient from .new_partitions import NewPartitions from .new_topic import 
NewTopic +from .records_to_delete import RecordsToDelete -__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"] +__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"] diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index d8b08752..375f470c 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -6,12 +6,18 @@ from aiokafka import __version__ from aiokafka.client import AIOKafkaClient -from aiokafka.errors import IncompatibleBrokerVersion, for_code +from aiokafka.errors import ( + IncompatibleBrokerVersion, + LeaderNotAvailableError, + NotLeaderForPartitionError, + for_code, +) from aiokafka.protocol.admin import ( AlterConfigsRequest, ApiVersionRequest_v0, CreatePartitionsRequest, CreateTopicsRequest, + DeleteRecordsRequest, DeleteTopicsRequest, DescribeConfigsRequest, DescribeGroupsRequest, @@ -24,6 +30,7 @@ from .config_resource import ConfigResource, ConfigResourceType from .new_topic import NewTopic +from .records_to_delete import RecordsToDelete log = logging.getLogger(__name__) @@ -605,3 +612,56 @@ async def list_consumer_group_offsets( offset_plus_meta = OffsetAndMetadata(offset, metadata) response_dict[tp] = offset_plus_meta return response_dict + + async def delete_records( + self, + records_to_delete: Dict[TopicPartition, RecordsToDelete], + timeout_ms: Optional[int] = None, + ) -> Dict[TopicPartition, int]: + """Delete records from partitions. + + :param records_to_delete: A map of RecordsToDelete for each TopicPartition + :param timeout_ms: Milliseconds to wait for the deletion to complete. + :return: Appropriate version of DeleteRecordsResponse class. 
+ """ + version = self._matching_api_version(DeleteRecordsRequest) + + metadata = await self._get_cluster_metadata() + + self._client.cluster.update_metadata(metadata) + + requests = defaultdict(lambda: defaultdict(list)) + responses = {} + + for tp, records in records_to_delete.items(): + leader = self._client.cluster.leader_for_partition(tp) + if leader is None: + raise NotLeaderForPartitionError() + elif leader == -1: + raise LeaderNotAvailableError() + requests[leader][tp.topic].append((tp.partition, records)) + + req_cls = DeleteRecordsRequest[version] + + for leader, delete_request in requests.items(): + request = req_cls( + self._convert_records_to_delete(delete_request), + timeout_ms or self._request_timeout_ms, + ) + response = await self._client.send(leader, request) + for topic, partitions in response.topics: + for partition_index, low_watermark, error_code in partitions: + if error_code: + err = for_code(error_code) + raise err + responses[TopicPartition(topic, partition_index)] = low_watermark + return responses + + @staticmethod + def _convert_records_to_delete( + records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]], + ): + return [ + (topic, [(partition, rec.before_offset) for partition, rec in records]) + for topic, records in records_to_delete.items() + ] diff --git a/aiokafka/admin/records_to_delete.py b/aiokafka/admin/records_to_delete.py new file mode 100644 index 00000000..76edc146 --- /dev/null +++ b/aiokafka/admin/records_to_delete.py @@ -0,0 +1,12 @@ +class RecordsToDelete: + """A class for deleting records on existing topics. 
+ Arguments: + before_offset (int): + delete all the records before the given offset + """ + + def __init__( + self, + before_offset, + ): + self.before_offset = before_offset diff --git a/aiokafka/cluster.py b/aiokafka/cluster.py index e0cc1cf7..08b59d2d 100644 --- a/aiokafka/cluster.py +++ b/aiokafka/cluster.py @@ -254,7 +254,8 @@ def update_metadata(self, metadata): error_type = Errors.for_code(error_code) if error_type is Errors.NoError: _new_partitions[topic] = {} - for p_error, partition, leader, replicas, isr in partitions: + # Starting with v5, MetadataResponse contains more than 5 fields + for p_error, partition, leader, replicas, isr, *_ in partitions: _new_partitions[topic][partition] = PartitionMetadata( topic=topic, partition=partition, diff --git a/aiokafka/protocol/admin.py b/aiokafka/protocol/admin.py index 0e5eace6..133c9a2a 100644 --- a/aiokafka/protocol/admin.py +++ b/aiokafka/protocol/admin.py @@ -1276,3 +1276,126 @@ class ListPartitionReassignmentsRequest_v0(Request): ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0] ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0] + + +class DeleteRecordsResponse_v0(Response): + API_KEY = 21 + API_VERSION = 0 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ( + "topics", + Array( + ("name", String("utf-8")), + ( + "partitions", + Array( + ("partition_index", Int32), + ("low_watermark", Int64), + ("error_code", Int16), + ), + ), + ), + ), + ) + + +class DeleteRecordsResponse_v1(Response): + API_KEY = 21 + API_VERSION = 1 + SCHEMA = DeleteRecordsResponse_v0.SCHEMA + + +class DeleteRecordsResponse_v2(Response): + API_KEY = 21 + API_VERSION = 2 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ( + "topics", + CompactArray( + ("name", CompactString("utf-8")), + ( + "partitions", + CompactArray( + ("partition_index", Int32), + ("low_watermark", Int64), + ("error_code", Int16), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ), + ), + ("tags", 
TaggedFields), + ) + + +class DeleteRecordsRequest_v0(Request): + API_KEY = 21 + API_VERSION = 0 + RESPONSE_TYPE = DeleteRecordsResponse_v0 + SCHEMA = Schema( + ( + "topics", + Array( + ("name", String("utf-8")), + ( + "partitions", + Array( + ("partition_index", Int32), + ("offset", Int64), + ), + ), + ), + ), + ("timeout_ms", Int32), + ) + + +class DeleteRecordsRequest_v1(Request): + API_KEY = 21 + API_VERSION = 1 + RESPONSE_TYPE = DeleteRecordsResponse_v1 + SCHEMA = DeleteRecordsRequest_v0.SCHEMA + + +class DeleteRecordsRequest_v2(Request): + API_KEY = 21 + API_VERSION = 2 + FLEXIBLE_VERSION = True + RESPONSE_TYPE = DeleteRecordsResponse_v2 + SCHEMA = Schema( + ( + "topics", + CompactArray( + ("name", CompactString("utf-8")), + ( + "partitions", + CompactArray( + ("partition_index", Int32), + ("offset", Int64), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ), + ), + ("timeout_ms", Int32), + ("tags", TaggedFields), + ) + + +DeleteRecordsRequest = [ + DeleteRecordsRequest_v0, + DeleteRecordsRequest_v1, + # FIXME: We have some problems with `TaggedFields` + # DeleteRecordsRequest_v2, +] + +DeleteRecordsResponse = [ + DeleteRecordsResponse_v0, + DeleteRecordsResponse_v1, + # FIXME: We have some problems with `TaggedFields` + # DeleteRecordsResponse_v2, +] diff --git a/aiokafka/protocol/types.py b/aiokafka/protocol/types.py index f1e106c5..cc20c6bd 100644 --- a/aiokafka/protocol/types.py +++ b/aiokafka/protocol/types.py @@ -313,6 +313,7 @@ def encode(self, value): return UnsignedVarInt32.encode(len(value) + 1) + value +# FIXME: TaggedFields doesn't seem to work properly so they should be avoided class TaggedFields(AbstractType): @classmethod def decode(cls, data): diff --git a/tests/test_admin.py b/tests/test_admin.py index fcafeff7..a81b9663 100644 --- a/tests/test_admin.py +++ b/tests/test_admin.py @@ -1,6 +1,6 @@ import asyncio -from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic +from aiokafka.admin import 
AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType from aiokafka.consumer import AIOKafkaConsumer from aiokafka.producer import AIOKafkaProducer @@ -201,3 +201,38 @@ async def test_list_consumer_group_offsets(self): assert resp[tp].offset == msg.offset + 1 resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp]) assert resp[tp].offset == msg.offset + 1 + + @kafka_versions(">=1.1.0") + @run_until_complete + async def test_delete_records(self): + admin = await self.create_admin() + + await admin.create_topics([NewTopic(self.topic, 1, 1)]) + + async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer: + first_message = await producer.send_and_wait( + self.topic, partition=0, value=b"some-message" + ) + await producer.send_and_wait( + self.topic, partition=0, value=b"other-message" + ) + + await admin.delete_records( + { + TopicPartition(self.topic, 0): RecordsToDelete( + before_offset=first_message.offset + 1 + ) + } + ) + + consumer = AIOKafkaConsumer( + self.topic, + bootstrap_servers=self.hosts, + enable_auto_commit=False, + auto_offset_reset="earliest", + ) + await consumer.start() + self.add_cleanup(consumer.stop) + + msg = await consumer.getone() + assert msg.value == b"other-message"