Doesn't seem to support kafka-python library version >= 2.0.0 #22

Open
naveen-thotaka opened this issue Jan 25, 2021 · 3 comments

Comments

@naveen-thotaka

naveen-thotaka commented Jan 25, 2021

The Kafka fixture seems to support kafka-python only up to version 1.4.7, not the latest version, 2.0.2.
There seem to be a few breaking changes between kafka-python <= 1.4.7 and >= 2.0.0.

I can raise a PR to support it.

@vangheem
Contributor

Ah yes, the Kafka image used here is very old and unsupported. It should be updated to a better-maintained image.

bitnami/kafka together with bitnami/zookeeper works well.

@naveen-thotaka
Author

naveen-thotaka commented Jan 26, 2021

Hi @vangheem, thanks for the response!

The Kafka Docker image (spotify/kafka) seems to work fine, at least for now.
What I am actually having issues with is using the fixture with the Python Kafka client library (kafka-python >= 2.0.0).
There seem to be a few changes in kafka-python after version 1.4.7 that break the Kafka.check() method.

I needed to make the following changes to containers/kafka.py to get it working with kafka-python >= 2.0.0:

class Kafka(BaseImage):
    label = 'kafka'
    name = 'kafka'
    port = 9092

    def check(self):
        from kafka import KafkaClient
-       from kafka.common import KafkaUnavailableError
+       from kafka.errors import KafkaUnavailableError
+       from time import sleep
+       sleep(30)
        try:
-           KafkaClient(f"{self.host}:{self.get_port()}")
+           KafkaClient(bootstrap_servers=f"{self.host}:{self.get_port()}")
            return True
        except KafkaUnavailableError:
            pass
        return False

Strangely, I also needed to add a 30-second sleep before creating the client; otherwise it crashes with "ValueError: Invalid file object".
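A fixed 30-second sleep either waits longer than necessary or, on a slow machine, not long enough. A bounded poll that treats connection errors as "not ready yet" avoids both. This is a minimal sketch, not part of the fixture library: `wait_until_ready` is a hypothetical helper, and it assumes that any exception raised while the broker is still starting (such as the ValueError above) simply means "try again later".

```python
import time
from typing import Callable


def wait_until_ready(check: Callable[[], bool], timeout: float = 60.0,
                     interval: float = 1.0) -> bool:
    """Poll `check` until it returns True or `timeout` seconds elapse.

    Hypothetical helper: any exception raised by `check` (for example the
    ValueError seen with kafka-python >= 2.0.0) is treated as "not ready yet"
    instead of crashing the fixture.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            if check():
                return True
        except Exception:
            pass  # broker not reachable yet; retry after a short pause
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

In `Kafka.check()`, the `KafkaClient(bootstrap_servers=...)` construction could be wrapped in a small function and passed as `check`, calling `KafkaClient.close()` afterwards to release the probe's sockets. Catching every exception is deliberately permissive; the trade-off is that a genuine misconfiguration only surfaces as a timeout rather than as an immediate error.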

I still need to debug further to find the exact cause. Looking at your code, I think it expects a KafkaUnavailableError, which the previous version raised on a failed connection but which this version of kafka-python may no longer raise.

@naveen-thotaka
Author

OK, as suspected, kafka-python no longer seems to raise KafkaUnavailableError when it cannot connect.
Instead, I get "ValueError: Invalid file object" because of an invalid socket while creating a KafkaClient and connecting to the bootstrap server.

I was able to make it work with the latest kafka-python version using the following changes:

class Kafka(BaseImage):
    label = 'kafka'
    name = 'kafka'
    port = 9092

    def check(self):
        from kafka import KafkaClient
-       from kafka.common import KafkaUnavailableError
        try:
-           KafkaClient(f"{self.host}:{self.get_port()}")
+           KafkaClient(bootstrap_servers=f"{self.host}:{self.get_port()}")
            return True
-       except KafkaUnavailableError:
+       except ValueError:
            pass
        return False

ValueError is a rather generic exception to catch, but kafka-python no longer seems to raise any Kafka-specific exception here. Please suggest if there is a better way to handle it.
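One alternative that does not depend on which exception kafka-python happens to raise is to probe the broker's TCP port with the standard library before (or instead of) building a client. This is a sketch under stated assumptions, not the fixture's API: `port_is_open` is a hypothetical helper, and an open port is a weaker signal than a successful client bootstrap, because the broker may accept connections slightly before it can answer metadata requests.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        # create_connection raises an OSError subclass (ConnectionRefusedError,
        # socket.timeout, ...) when nothing is listening yet.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

In the fixture, `check()` could return `port_is_open(self.host, self.get_port())`, or use it as a first gate and only then construct the `KafkaClient`, so that client-library errors can only occur once something is actually listening.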
