Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3177: consumer hangs when assigned to partition that doesn't exist #124

Open
wbarnha opened this issue Mar 8, 2024 · 0 comments
Open

Comments

@wbarnha
Copy link
Owner

wbarnha commented Mar 8, 2024

Hi guys,
the code below doesn't work. I'm probably doing something wrong, but I couldn't make it to work.
Kafka-python version is 1.1.1
and Kafka is 0.8.2.2.

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer, KafkaProducer, TopicPartition


def test_it():
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092", consumer_timeout_ms=100)
    consumer.assign([TopicPartition("frontera-todo", 1)])  # make sure frontera-todo has only one partition

    m = next(consumer)  # hang

    consumer.close()

test_it()

Kafka broker output
[2016-05-11 14:46:14,669] ERROR Closing socket for /0:0:0:0:0:0:0:1 because of error (kafka.network.Processor) kafka.common.KafkaException: Wrong request type 16 at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:64) at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50) at kafka.network.Processor.read(SocketServer.scala:450) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745)

Traceback on KeyboardInterrupt

Traceback (most recent call last):
  File "/Users/sibiryakov/src/sh/crawl-frontier/frontera/tests/test_kafka2.py", line 25, in <module>
    test_it()
  File "/Users/sibiryakov/src/sh/crawl-frontier/frontera/tests/test_kafka2.py", line 11, in test_it
    m = next(consumer)
  File "/usr/local/lib/python2.7/site-packages/six.py", line 558, in next
    return type(self).__next__(self)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 844, in __next__
    return next(self._iterator)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 779, in _message_generator
    self._update_fetch_positions(partitions)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 762, in _update_fetch_positions
    self._fetcher.update_fetch_positions(partitions)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/fetcher.py", line 162, in update_fetch_positions
    self._reset_offset(tp)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/fetcher.py", line 188, in _reset_offset
    offset = self._offset(partition, timestamp)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/fetcher.py", line 221, in _offset
    self._client.poll(future=refresh_future, sleep=True)
  File "/usr/local/lib/python2.7/site-packages/kafka/client_async.py", line 430, in poll
    responses.extend(self._poll(timeout, sleep=sleep))
  File "/usr/local/lib/python2.7/site-packages/kafka/client_async.py", line 445, in _poll
    for key, events in self._selector.select(timeout):
  File "/usr/local/lib/python2.7/site-packages/kafka/selectors34.py", line 598, in select
    kev_list = self._kqueue.control(None, max_ev, timeout)
KeyboardInterrupt`

It certainly shouldn't hang, instead raise an exception.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant