diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index c788e765..b7a419a2 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -1093,7 +1093,7 @@ async def next_record(self, partitions): for tp in list(self._records.keys()): if partitions and tp not in partitions: - # Cleanup results for unassigned partitons + # Cleanup results for unassigned partitions if not self._subscriptions.is_assigned(tp): del self._records[tp] continue @@ -1129,7 +1129,7 @@ async def fetched_records(self, partitions, timeout=0, max_records=None): drained = {} for tp in list(self._records.keys()): if partitions and tp not in partitions: - # Cleanup results for unassigned partitons + # Cleanup results for unassigned partitions if not self._subscriptions.is_assigned(tp): del self._records[tp] continue diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 4895ab51..4916a08c 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1104,7 +1104,7 @@ async def test_consumer_rebalance_on_new_topic(self): # Wait for group to stabilize assign1 = await listener1.wait_assign() assign2 = await listener2.wait_assign() - # We expect 2 partitons for autocreated topics + # We expect 2 partitions for autocreated topics my_partitions = {TopicPartition(my_topic, 0), TopicPartition(my_topic, 1)} self.assertEqual(assign1 | assign2, my_partitions) self.assertEqual(consumer1.assignment() | consumer2.assignment(), my_partitions) @@ -1118,7 +1118,7 @@ async def test_consumer_rebalance_on_new_topic(self): # Wait for group to stabilize assign1 = await listener1.wait_assign() assign2 = await listener2.wait_assign() - # We expect 2 partitons for autocreated topics + # We expect 2 partitions for autocreated topics my_partitions = { TopicPartition(my_topic, 0), TopicPartition(my_topic, 1),