Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement KIP-204 : DeleteRecords API #969

Merged
merged 6 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aiokafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .client import AIOKafkaAdminClient
from .new_partitions import NewPartitions
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"]
__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"]
67 changes: 66 additions & 1 deletion aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import IncompatibleBrokerVersion, for_code
from aiokafka.errors import (
IncompatibleBrokerVersion,
LeaderNotAvailableError,
NotLeaderForPartitionError,
for_code,
)
from aiokafka.protocol.admin import (
AlterConfigsRequest,
ApiVersionRequest_v0,
CreatePartitionsRequest,
CreateTopicsRequest,
DeleteRecordsRequest,
DeleteTopicsRequest,
DescribeConfigsRequest,
DescribeGroupsRequest,
Expand All @@ -24,6 +30,7 @@

from .config_resource import ConfigResource, ConfigResourceType
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -605,3 +612,61 @@
offset_plus_meta = OffsetAndMetadata(offset, metadata)
response_dict[tp] = offset_plus_meta
return response_dict

async def delete_records(
self,
records_to_delete: Dict[TopicPartition, RecordsToDelete],
timeout_ms: Optional[int] = None,
) -> Dict[TopicPartition, int]:
"""Delete records from partitions.

:param records_to_delete: A map of RecordsToDelete for each TopicPartition
:param timeout_ms: Milliseconds to wait for the deletion to complete.
:return: Appropriate version of DeleteRecordsResponse class.
"""
version = self._matching_api_version(DeleteRecordsRequest)

Check warning on line 627 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L627

Added line #L627 was not covered by tests

metadata = await self._get_cluster_metadata()

Check warning on line 629 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L629

Added line #L629 was not covered by tests

self._client.cluster.update_metadata(metadata)

Check warning on line 631 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L631

Added line #L631 was not covered by tests

requests = defaultdict(lambda: defaultdict(list))
responses = {}

Check warning on line 634 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L634

Added line #L634 was not covered by tests

for tp, records in records_to_delete.items():
leader = self._client.cluster.leader_for_partition(tp)

Check warning on line 637 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L637

Added line #L637 was not covered by tests
ods marked this conversation as resolved.
Show resolved Hide resolved
if leader is None:
raise NotLeaderForPartitionError()

Check warning on line 639 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L639

Added line #L639 was not covered by tests
elif leader == -1:
raise LeaderNotAvailableError()
requests[leader][tp.topic].append((tp.partition, records))

Check warning on line 642 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L641-L642

Added lines #L641 - L642 were not covered by tests

req_cls = DeleteRecordsRequest[version]

Check warning on line 644 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L644

Added line #L644 was not covered by tests

for leader, delete_request in requests.items():
request = req_cls(

Check warning on line 647 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L647

Added line #L647 was not covered by tests
self._convert_records_to_delete(delete_request),
timeout_ms or self._request_timeout_ms,
{},
)
response = await self._client.send(leader, request)

Check warning on line 652 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L652

Added line #L652 was not covered by tests
for topic, partitions in response.topics:
for partition_index, low_watermark, error_code in partitions:
if error_code:
err = for_code(error_code)
raise err
responses[TopicPartition(topic, partition_index)] = low_watermark
return responses

Check warning on line 659 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L656-L659

Added lines #L656 - L659 were not covered by tests

@staticmethod
def _convert_records_to_delete(
records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]],
):
return [
(
topic,
[(partition, rec.before_offset, {}) for partition, rec in records],
{},
vmaurin marked this conversation as resolved.
Show resolved Hide resolved
)
for topic, records in records_to_delete.items()
]
12 changes: 12 additions & 0 deletions aiokafka/admin/records_to_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class RecordsToDelete:
"""A class for deleting records on existing topics.
Arguments:
before_offset (int):
delete all the records before the given offset
"""

def __init__(
self,
before_offset,
):
self.before_offset = before_offset

Check warning on line 12 in aiokafka/admin/records_to_delete.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/records_to_delete.py#L12

Added line #L12 was not covered by tests
3 changes: 2 additions & 1 deletion aiokafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ def update_metadata(self, metadata):
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
_new_partitions[topic] = {}
for p_error, partition, leader, replicas, isr in partitions:
# Starting with v5, MetadataResponse contains more than 5 fields
for p_error, partition, leader, replicas, isr, *_ in partitions:
_new_partitions[topic][partition] = PartitionMetadata(
topic=topic,
partition=partition,
Expand Down
121 changes: 121 additions & 0 deletions aiokafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,3 +1276,124 @@ class ListPartitionReassignmentsRequest_v0(Request):
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]


class DeleteRecordsResponse_v0(Response):
API_KEY = 21
API_VERSION = 0
SCHEMA = Schema(
("throttle_time_ms", Int32),
(
"topics",
Array(
("name", String("utf-8")),
(
"partitions",
Array(
("partition_index", Int32),
("low_watermark", Int64),
("error_code", Int16),
),
),
),
),
)


class DeleteRecordsResponse_v1(Response):
API_KEY = 21
API_VERSION = 1
SCHEMA = DeleteRecordsResponse_v0.SCHEMA


class DeleteRecordsResponse_v2(Response):
API_KEY = 21
API_VERSION = 2
SCHEMA = Schema(
("throttle_time_ms", Int32),
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("low_watermark", Int64),
("error_code", Int16),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
)


class DeleteRecordsRequest_v0(Request):
API_KEY = 21
API_VERSION = 0
RESPONSE_TYPE = DeleteRecordsResponse_v0
SCHEMA = Schema(
(
"topics",
Array(
("name", String("utf-8")),
(
"partitions",
Array(
("partition_index", Int32),
("offset", Int64),
),
),
),
),
("timeout_ms", Int32),
)


class DeleteRecordsRequest_v1(Request):
API_KEY = 21
API_VERSION = 1
RESPONSE_TYPE = DeleteRecordsResponse_v1
SCHEMA = DeleteRecordsRequest_v0.SCHEMA


class DeleteRecordsRequest_v2(Request):
API_KEY = 21
API_VERSION = 2
FLEXIBLE_VERSION = True
RESPONSE_TYPE = DeleteRecordsResponse_v2
SCHEMA = Schema(
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("offset", Int64),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("timeout_ms", Int32),
("tags", TaggedFields),
)


DeleteRecordsRequest = [
DeleteRecordsRequest_v0,
DeleteRecordsRequest_v1,
DeleteRecordsRequest_v2,
]

DeleteRecordsResponse = [
DeleteRecordsResponse_v0,
DeleteRecordsResponse_v1,
DeleteRecordsResponse_v2,
]
vmaurin marked this conversation as resolved.
Show resolved Hide resolved
37 changes: 36 additions & 1 deletion tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio

from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic
from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete
from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType
from aiokafka.consumer import AIOKafkaConsumer
from aiokafka.producer import AIOKafkaProducer
Expand Down Expand Up @@ -201,3 +201,38 @@
assert resp[tp].offset == msg.offset + 1
resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp])
assert resp[tp].offset == msg.offset + 1

@kafka_versions(">=1.1.0")
@run_until_complete
async def test_delete_records(self):
admin = await self.create_admin()

Check warning on line 208 in tests/test_admin.py

View check run for this annotation

Codecov / codecov/patch

tests/test_admin.py#L208

Added line #L208 was not covered by tests

await admin.create_topics([NewTopic(self.topic, 1, 1)])

Check warning on line 210 in tests/test_admin.py

View check run for this annotation

Codecov / codecov/patch

tests/test_admin.py#L210

Added line #L210 was not covered by tests

async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer:
first_message = await producer.send_and_wait(

Check warning on line 213 in tests/test_admin.py

View check run for this annotation

Codecov / codecov/patch

tests/test_admin.py#L213

Added line #L213 was not covered by tests
self.topic, partition=0, value=b"some-message"
)
await producer.send_and_wait(

Check warning on line 216 in tests/test_admin.py

View check run for this annotation

Codecov / codecov/patch

tests/test_admin.py#L216

Added line #L216 was not covered by tests
self.topic, partition=0, value=b"other-message"
)

await admin.delete_records(

Check warning on line 220 in tests/test_admin.py

View check run for this annotation

Codecov / codecov/patch

tests/test_admin.py#L220

Added line #L220 was not covered by tests
{
TopicPartition(self.topic, 0): RecordsToDelete(
before_offset=first_message.offset + 1
)
}
)

consumer = AIOKafkaConsumer(

Check warning on line 228 in tests/test_admin.py

View check run for this annotation

Codecov / codecov/patch

tests/test_admin.py#L228

Added line #L228 was not covered by tests
self.topic,
bootstrap_servers=self.hosts,
enable_auto_commit=False,
auto_offset_reset="earliest",
)
await consumer.start()
self.add_cleanup(consumer.stop)

Check warning on line 235 in tests/test_admin.py

View check run for this annotation

Codecov / codecov/patch

tests/test_admin.py#L234-L235

Added lines #L234 - L235 were not covered by tests

msg = await consumer.getone()
assert msg.value == b"other-message"

Check warning on line 238 in tests/test_admin.py

View check run for this annotation

Codecov / codecov/patch

tests/test_admin.py#L237-L238

Added lines #L237 - L238 were not covered by tests
Loading