Skip to content

Commit

Permalink
Implement KIP-204 : DeleteRecords API
Browse files Browse the repository at this point in the history
When doing stream processing, it is convenient to use "transient" topics:
* retention time is infinite
* records get deleted when consumed

The Java Kafka Streams client uses the admin client's deleteRecords
operation to do this. That operation is missing from aiokafka.

The KIP reference https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API

refs #967
  • Loading branch information
Vincent Maurin committed Jan 25, 2024
1 parent f8d0d15 commit 181f956
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 3 deletions.
3 changes: 2 additions & 1 deletion aiokafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .client import AIOKafkaAdminClient
from .new_partitions import NewPartitions
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"]
__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"]
72 changes: 71 additions & 1 deletion aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import IncompatibleBrokerVersion, for_code
from aiokafka.errors import (
IncompatibleBrokerVersion,
LeaderNotAvailableError,
NotLeaderForPartitionError,
for_code,
)
from aiokafka.protocol.admin import (
AlterConfigsRequest,
ApiVersionRequest_v0,
CreatePartitionsRequest,
CreateTopicsRequest,
DeleteRecordsRequest,
DeleteTopicsRequest,
DescribeConfigsRequest,
DescribeGroupsRequest,
Expand All @@ -24,6 +30,7 @@

from .config_resource import ConfigResource, ConfigResourceType
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -605,3 +612,66 @@ async def list_consumer_group_offsets(
offset_plus_meta = OffsetAndMetadata(offset, metadata)
response_dict[tp] = offset_plus_meta
return response_dict

async def delete_records(
    self,
    records_to_delete: Dict[TopicPartition, RecordsToDelete],
    timeout_ms: Optional[int] = None,
) -> Dict[TopicPartition, int]:
    """Delete records from partitions.

    For each partition, the records located before the ``before_offset`` of
    its RecordsToDelete are requested for deletion. Partitions are grouped by
    leader broker and one request is awaited per leader, in turn.

    :param records_to_delete: A map of RecordsToDelete for each TopicPartition
    :param timeout_ms: Milliseconds to wait for the deletion to complete.
        When omitted (or 0) the client's request timeout is used.
    :return: A map of each TopicPartition to the low watermark reported by
        the broker for that partition.
    :raises NotLeaderForPartitionError: the cluster metadata has no leader
        entry (``None``) for one of the partitions.
    :raises LeaderNotAvailableError: the leader of one of the partitions is
        reported as ``-1``.
    :raises Exception: the error built by ``for_code`` for the first non-zero
        partition error code found in a response. Requests already sent to
        other leaders are not undone.
    """
    version = self._matching_api_version(DeleteRecordsRequest)

    # Refresh the cluster metadata before resolving partition leaders.
    if self._version_info[MetadataRequest[0].API_KEY] < (0, 10):
        metadata_request = MetadataRequest[0]([])
    else:
        metadata_request = MetadataRequest[1](None)
    metadata = await self._send_request(metadata_request)
    self._client.cluster.update_metadata(metadata)

    # leader node id -> topic name -> [(partition, RecordsToDelete), ...]
    requests = defaultdict(lambda: defaultdict(list))
    for tp, records in records_to_delete.items():
        leader = self._client.cluster.leader_for_partition(tp)
        if leader is None:
            raise NotLeaderForPartitionError()
        elif leader == -1:
            raise LeaderNotAvailableError()
        requests[leader][tp.topic].append((tp.partition, records))

    req_cls = DeleteRecordsRequest[version]
    # Only the flexible request version declares tagged fields in its schema
    # (v2 sets FLEXIBLE_VERSION = True; v0/v1 do not set it).
    flexible = getattr(req_cls, "FLEXIBLE_VERSION", False)
    timeout = timeout_ms or self._request_timeout_ms

    responses = {}
    for leader, delete_request in requests.items():
        topics = self._convert_records_to_delete(delete_request)
        if flexible:
            request = req_cls(topics, timeout, {})
        else:
            # Strip the tagged-field placeholders so the arguments mirror the
            # v0/v1 schema exactly: (topics, timeout_ms) with
            # (partition_index, offset) partition entries.
            topics = [
                (name, [(index, offset) for index, offset, *_ in parts])
                for name, parts, *_ in topics
            ]
            request = req_cls(topics, timeout)

        response = await self._client.send(leader, request)
        # v2 responses append a tagged-fields element to every topic and
        # partition entry; `*_` accepts both the short and the long shape.
        for topic, partitions, *_ in response.topics:
            for partition_index, low_watermark, error_code, *_ in partitions:
                if error_code:
                    raise for_code(error_code)
                responses[TopicPartition(topic, partition_index)] = low_watermark
    return responses

Check warning on line 664 in aiokafka/admin/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/client.py#L661-L664

Added lines #L661 - L664 were not covered by tests

@staticmethod
def _convert_records_to_delete(
    records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]],
):
    """Turn per-topic deletion targets into DeleteRecords request entries.

    Every topic becomes ``(topic, partitions, {})`` and every partition
    becomes ``(partition, before_offset, {})``. The empty dicts sit in the
    position of the trailing ``tags`` field of the DeleteRecordsRequest_v2
    schema.
    """
    topics = []
    for topic, records in records_to_delete.items():
        partitions = []
        for partition, rec in records:
            partitions.append((partition, rec.before_offset, {}))
        topics.append((topic, partitions, {}))
    return topics
12 changes: 12 additions & 0 deletions aiokafka/admin/records_to_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class RecordsToDelete:
    """Describe which records to delete from a single partition.

    Instances are used as the values of the mapping given to
    ``AIOKafkaAdminClient.delete_records``.

    Arguments:
        before_offset (int):
            delete all the records before the given offset
    """

    def __init__(
        self,
        before_offset: int,
    ):
        self.before_offset = before_offset

    def __repr__(self) -> str:
        # Readable form for logs and debugging instead of the default
        # "<RecordsToDelete object at 0x...>".
        return f"{type(self).__name__}(before_offset={self.before_offset!r})"

Check warning on line 12 in aiokafka/admin/records_to_delete.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/admin/records_to_delete.py#L12

Added line #L12 was not covered by tests
121 changes: 121 additions & 0 deletions aiokafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,3 +1276,124 @@ class ListPartitionReassignmentsRequest_v0(Request):
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]


class DeleteRecordsResponse_v0(Response):
    """DeleteRecords response (API key 21), version 0.

    Carries the throttle time and, for every topic, one entry per partition
    holding the partition index, its low watermark and an error code.
    """

    API_KEY = 21
    API_VERSION = 0
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        (
            "topics",
            Array(
                ("name", String("utf-8")),
                (
                    "partitions",
                    Array(
                        ("partition_index", Int32),
                        ("low_watermark", Int64),
                        ("error_code", Int16),
                    ),
                ),
            ),
        ),
    )


class DeleteRecordsResponse_v1(Response):
    """DeleteRecords response (API key 21), version 1.

    Reuses the version 0 schema unchanged.
    """

    API_KEY = 21
    API_VERSION = 1
    SCHEMA = DeleteRecordsResponse_v0.SCHEMA


class DeleteRecordsResponse_v2(Response):
    """DeleteRecords response (API key 21), version 2.

    Same fields as version 0, but encoded with the compact array/string
    types and with a ``tags`` (TaggedFields) entry appended to every
    partition, every topic and the top level.
    """

    # NOTE(review): presumably decoded topic and partition entries have one
    # more element than in v0/v1 because of ``tags`` - confirm that callers
    # unpacking these entries allow for it.
    API_KEY = 21
    API_VERSION = 2
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        (
            "topics",
            CompactArray(
                ("name", CompactString("utf-8")),
                (
                    "partitions",
                    CompactArray(
                        ("partition_index", Int32),
                        ("low_watermark", Int64),
                        ("error_code", Int16),
                        ("tags", TaggedFields),
                    ),
                ),
                ("tags", TaggedFields),
            ),
        ),
        ("tags", TaggedFields),
    )


class DeleteRecordsRequest_v0(Request):
    """DeleteRecords request (API key 21), version 0.

    Lists, per topic, the partitions to truncate together with an offset for
    each of them, followed by a timeout in milliseconds. Answered with
    DeleteRecordsResponse_v0.
    """

    API_KEY = 21
    API_VERSION = 0
    RESPONSE_TYPE = DeleteRecordsResponse_v0
    SCHEMA = Schema(
        (
            "topics",
            Array(
                ("name", String("utf-8")),
                (
                    "partitions",
                    Array(
                        ("partition_index", Int32),
                        ("offset", Int64),
                    ),
                ),
            ),
        ),
        ("timeout_ms", Int32),
    )


class DeleteRecordsRequest_v1(Request):
    """DeleteRecords request (API key 21), version 1.

    Reuses the version 0 request schema; answered with
    DeleteRecordsResponse_v1.
    """

    API_KEY = 21
    API_VERSION = 1
    RESPONSE_TYPE = DeleteRecordsResponse_v1
    SCHEMA = DeleteRecordsRequest_v0.SCHEMA


class DeleteRecordsRequest_v2(Request):
    """DeleteRecords request (API key 21), version 2.

    Same fields as version 0, but marked as a flexible version: compact
    array/string types are used and a ``tags`` (TaggedFields) entry is
    appended to every partition, every topic and the top level. Answered
    with DeleteRecordsResponse_v2.
    """

    API_KEY = 21
    API_VERSION = 2
    FLEXIBLE_VERSION = True
    RESPONSE_TYPE = DeleteRecordsResponse_v2
    SCHEMA = Schema(
        (
            "topics",
            CompactArray(
                ("name", CompactString("utf-8")),
                (
                    "partitions",
                    CompactArray(
                        ("partition_index", Int32),
                        ("offset", Int64),
                        ("tags", TaggedFields),
                    ),
                ),
                ("tags", TaggedFields),
            ),
        ),
        ("timeout_ms", Int32),
        ("tags", TaggedFields),
    )


# Request classes indexed by API version: DeleteRecordsRequest[n] is the
# version-n request class.
DeleteRecordsRequest = [
    DeleteRecordsRequest_v0,
    DeleteRecordsRequest_v1,
    DeleteRecordsRequest_v2,
]

# Response classes in the same order, so index n matches request version n.
DeleteRecordsResponse = [
    DeleteRecordsResponse_v0,
    DeleteRecordsResponse_v1,
    DeleteRecordsResponse_v2,
]
37 changes: 36 additions & 1 deletion tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio

from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic
from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete
from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType
from aiokafka.consumer import AIOKafkaConsumer
from aiokafka.producer import AIOKafkaProducer
Expand Down Expand Up @@ -201,3 +201,38 @@ async def test_list_consumer_group_offsets(self):
assert resp[tp].offset == msg.offset + 1
resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp])
assert resp[tp].offset == msg.offset + 1

@kafka_versions(">=1.1.0")
@run_until_complete
async def test_delete_records(self):
    """Records before the requested offset are no longer readable."""
    admin = await self.create_admin()
    await admin.create_topics([NewTopic(self.topic, 1, 1)])

    # Two messages on partition 0; only the first is to be deleted.
    async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer:
        first_message = await producer.send_and_wait(
            self.topic, partition=0, value=b"some-message"
        )
        await producer.send_and_wait(
            self.topic, partition=0, value=b"other-message"
        )

    # NOTE(review): the scraped source lost its indentation; this call is
    # assumed to sit after the producer block - confirm against the repo.
    target = TopicPartition(self.topic, 0)
    cutoff = RecordsToDelete(before_offset=first_message.offset + 1)
    await admin.delete_records({target: cutoff})

    consumer = AIOKafkaConsumer(
        self.topic,
        bootstrap_servers=self.hosts,
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    await consumer.start()
    self.add_cleanup(consumer.stop)

    # Reading from the earliest offset must now yield the second message.
    msg = await consumer.getone()
    assert msg.value == b"other-message"

Check warning on line 238 in tests/test_admin.py

View check run for this annotation

Codecov / codecov/patch

tests/test_admin.py#L237-L238

Added lines #L237 - L238 were not covered by tests

0 comments on commit 181f956

Please sign in to comment.