diff --git a/CHANGES/969.feature b/CHANGES/969.feature new file mode 100644 index 00000000..96c0793c --- /dev/null +++ b/CHANGES/969.feature @@ -0,0 +1 @@ +Implement DeleteRecords API (KIP-204) (pr #969 by @vmaurin) diff --git a/aiokafka/admin/__init__.py b/aiokafka/admin/__init__.py index 61913cc8..75d807b8 100644 --- a/aiokafka/admin/__init__.py +++ b/aiokafka/admin/__init__.py @@ -1,5 +1,6 @@ from .client import AIOKafkaAdminClient from .new_partitions import NewPartitions from .new_topic import NewTopic +from .records_to_delete import RecordsToDelete -__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"] +__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"] diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index d8b08752..375f470c 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -6,12 +6,18 @@ from aiokafka import __version__ from aiokafka.client import AIOKafkaClient -from aiokafka.errors import IncompatibleBrokerVersion, for_code +from aiokafka.errors import ( + IncompatibleBrokerVersion, + LeaderNotAvailableError, + NotLeaderForPartitionError, + for_code, +) from aiokafka.protocol.admin import ( AlterConfigsRequest, ApiVersionRequest_v0, CreatePartitionsRequest, CreateTopicsRequest, + DeleteRecordsRequest, DeleteTopicsRequest, DescribeConfigsRequest, DescribeGroupsRequest, @@ -24,6 +30,7 @@ from .config_resource import ConfigResource, ConfigResourceType from .new_topic import NewTopic +from .records_to_delete import RecordsToDelete log = logging.getLogger(__name__) @@ -605,3 +612,56 @@ async def list_consumer_group_offsets( offset_plus_meta = OffsetAndMetadata(offset, metadata) response_dict[tp] = offset_plus_meta return response_dict + + async def delete_records( + self, + records_to_delete: Dict[TopicPartition, RecordsToDelete], + timeout_ms: Optional[int] = None, + ) -> Dict[TopicPartition, int]: + """Delete records from partitions. + + :param records_to_delete: A map of RecordsToDelete for each TopicPartition + :param timeout_ms: Milliseconds to wait for the deletion to complete. + :return: Appropriate version of DeleteRecordsResponse class. + """ + version = self._matching_api_version(DeleteRecordsRequest) + + metadata = await self._get_cluster_metadata() + + self._client.cluster.update_metadata(metadata) + + requests = defaultdict(lambda: defaultdict(list)) + responses = {} + + for tp, records in records_to_delete.items(): + leader = self._client.cluster.leader_for_partition(tp) + if leader is None: + raise NotLeaderForPartitionError() + elif leader == -1: + raise LeaderNotAvailableError() + requests[leader][tp.topic].append((tp.partition, records)) + + req_cls = DeleteRecordsRequest[version] + + for leader, delete_request in requests.items(): + request = req_cls( + self._convert_records_to_delete(delete_request), + timeout_ms or self._request_timeout_ms, + ) + response = await self._client.send(leader, request) + for topic, partitions in response.topics: + for partition_index, low_watermark, error_code in partitions: + if error_code: + err = for_code(error_code) + raise err + responses[TopicPartition(topic, partition_index)] = low_watermark + return responses + + @staticmethod + def _convert_records_to_delete( + records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]], + ): + return [ + (topic, [(partition, rec.before_offset) for partition, rec in records]) + for topic, records in records_to_delete.items() + ] diff --git a/aiokafka/admin/records_to_delete.py b/aiokafka/admin/records_to_delete.py new file mode 100644 index 00000000..76edc146 --- /dev/null +++ b/aiokafka/admin/records_to_delete.py @@ -0,0 +1,12 @@ +class RecordsToDelete: + """A class for deleting records on existing topics. + Arguments: + before_offset (int): + delete all the records before the given offset + """ + + def __init__( + self, + before_offset, + ): + self.before_offset = before_offset diff --git a/aiokafka/cluster.py b/aiokafka/cluster.py index e0cc1cf7..08b59d2d 100644 --- a/aiokafka/cluster.py +++ b/aiokafka/cluster.py @@ -254,7 +254,8 @@ def update_metadata(self, metadata): error_type = Errors.for_code(error_code) if error_type is Errors.NoError: _new_partitions[topic] = {} - for p_error, partition, leader, replicas, isr in partitions: + # Starting with v5, MetadataResponse contains more than 5 fields + for p_error, partition, leader, replicas, isr, *_ in partitions: _new_partitions[topic][partition] = PartitionMetadata( topic=topic, partition=partition, diff --git a/aiokafka/protocol/admin.py b/aiokafka/protocol/admin.py index 0e5eace6..133c9a2a 100644 --- a/aiokafka/protocol/admin.py +++ b/aiokafka/protocol/admin.py @@ -1276,3 +1276,126 @@ class ListPartitionReassignmentsRequest_v0(Request): ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0] ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0] + + +class DeleteRecordsResponse_v0(Response): + API_KEY = 21 + API_VERSION = 0 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ( + "topics", + Array( + ("name", String("utf-8")), + ( + "partitions", + Array( + ("partition_index", Int32), + ("low_watermark", Int64), + ("error_code", Int16), + ), + ), + ), + ), + ) + + +class DeleteRecordsResponse_v1(Response): + API_KEY = 21 + API_VERSION = 1 + SCHEMA = DeleteRecordsResponse_v0.SCHEMA + + +class DeleteRecordsResponse_v2(Response): + API_KEY = 21 + API_VERSION = 2 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ( + "topics", + CompactArray( + ("name", CompactString("utf-8")), + ( + "partitions", + CompactArray( + ("partition_index", Int32), + ("low_watermark", Int64), + ("error_code", Int16), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ) + + +class DeleteRecordsRequest_v0(Request): + API_KEY = 21 + API_VERSION = 0 + RESPONSE_TYPE = DeleteRecordsResponse_v0 + SCHEMA = Schema( + ( + "topics", + Array( + ("name", String("utf-8")), + ( + "partitions", + Array( + ("partition_index", Int32), + ("offset", Int64), + ), + ), + ), + ), + ("timeout_ms", Int32), + ) + + +class DeleteRecordsRequest_v1(Request): + API_KEY = 21 + API_VERSION = 1 + RESPONSE_TYPE = DeleteRecordsResponse_v1 + SCHEMA = DeleteRecordsRequest_v0.SCHEMA + + +class DeleteRecordsRequest_v2(Request): + API_KEY = 21 + API_VERSION = 2 + FLEXIBLE_VERSION = True + RESPONSE_TYPE = DeleteRecordsResponse_v2 + SCHEMA = Schema( + ( + "topics", + CompactArray( + ("name", CompactString("utf-8")), + ( + "partitions", + CompactArray( + ("partition_index", Int32), + ("offset", Int64), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ), + ), + ("timeout_ms", Int32), + ("tags", TaggedFields), + ) + + +DeleteRecordsRequest = [ + DeleteRecordsRequest_v0, + DeleteRecordsRequest_v1, + # FIXME: We have some problems with `TaggedFields` + # DeleteRecordsRequest_v2, +] + +DeleteRecordsResponse = [ + DeleteRecordsResponse_v0, + DeleteRecordsResponse_v1, + # FIXME: We have some problems with `TaggedFields` + # DeleteRecordsResponse_v2, +] diff --git a/aiokafka/protocol/types.py b/aiokafka/protocol/types.py index f1e106c5..cc20c6bd 100644 --- a/aiokafka/protocol/types.py +++ b/aiokafka/protocol/types.py @@ -313,6 +313,7 @@ def encode(self, value): return UnsignedVarInt32.encode(len(value) + 1) + value +# FIXME: TaggedFields doesn't seem to work properly so they should be avoided class TaggedFields(AbstractType): @classmethod def decode(cls, data): diff --git a/tests/test_admin.py b/tests/test_admin.py index fcafeff7..a81b9663 100644 --- a/tests/test_admin.py +++ b/tests/test_admin.py @@ -1,6 +1,6 @@ import asyncio -from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic +from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType from aiokafka.consumer import AIOKafkaConsumer from aiokafka.producer import AIOKafkaProducer @@ -201,3 +201,38 @@ async def test_list_consumer_group_offsets(self): assert resp[tp].offset == msg.offset + 1 resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp]) assert resp[tp].offset == msg.offset + 1 + + @kafka_versions(">=1.1.0") + @run_until_complete + async def test_delete_records(self): + admin = await self.create_admin() + + await admin.create_topics([NewTopic(self.topic, 1, 1)]) + + async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer: + first_message = await producer.send_and_wait( + self.topic, partition=0, value=b"some-message" + ) + await producer.send_and_wait( + self.topic, partition=0, value=b"other-message" + ) + + await admin.delete_records( + { + TopicPartition(self.topic, 0): RecordsToDelete( + before_offset=first_message.offset + 1 + ) + } + ) + + consumer = AIOKafkaConsumer( + self.topic, + bootstrap_servers=self.hosts, + enable_auto_commit=False, + auto_offset_reset="earliest", + ) + await consumer.start() + self.add_cleanup(consumer.stop) + + msg = await consumer.getone() + assert msg.value == b"other-message"