Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSLWantReadError: The operation did not complete (read) #119

Open
wbarnha opened this issue Mar 8, 2024 · 0 comments
Open

SSLWantReadError: The operation did not complete (read) #119

wbarnha opened this issue Mar 8, 2024 · 0 comments

Comments

@wbarnha
Copy link
Owner

wbarnha commented Mar 8, 2024

hi, I use kafka-python-1.4.2 to connect the kafka_2.11-0.11.0.1, and i use SSL_SASL mode as security choose.
I got ERROR as below, but if i remove the eventlet.monkey_patch() , every things get ok. Is anyone knew why?

this is my dev info:
python version is 2.7.5
(centos)Linux version 3.10.0-514.44.5.10_44.x86_64

and my script is:
####################################

#-*- coding: utf-8 -*-
import eventlet
import os
from kafka import KafkaConsumer
import time
import ssl
from oslo_utils import eventletutils
eventlet.monkey_patch()
import logging
from kafka.client_async import selectors
if eventletutils.is_monkey_patched('select'):
    # monkeypatch the vendored SelectSelector._select like eventlet does
    # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
    from eventlet.green import select
    selectors.SelectSelector._select = staticmethod(select.select)

    # Force to use the select selectors
    KAFKA_SELECTOR = selectors.SelectSelector
else:
    KAFKA_SELECTOR = selectors.DefaultSelector
context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
def log_init(log_name, filemode='w'):
    logging.basicConfig(filename=os.path.join(os.getcwd(), log_name), level=logging.DEBUG, filemode=filemode,
                        format='%(asctime)s - %(levelname)s: %(message)s')

log_init('test')

consumer = KafkaConsumer('topic1',
                         group_id='my-group',
                         bootstrap_servers=['127.0.0.1:9092'],
                         ssl_context=context,
                         auto_offset_reset="latest",
                         api_version = (0,11),
                         sasl_mechanism="PLAIN",
                         sasl_plain_username="admin",
                         sasl_plain_password="pass",
                         selector=KAFKA_SELECTOR,
                         security_protocol='SASL_SSL')
for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" %(msg.topic,msg.partition,msg.offset,msg.key,msg.value)
        logging.info(recv)

####################################

and I get the ERROR as this:

ERROR: <BrokerConnection node_id=bootstrap host=xxx:9092 <authenticating> [IPv4 ('xxx', 9092)]>: Error receiving reply from server
Traceback (most recent call last):
  File "xxxx/lib/python2.7/site-packages/kafka_python-1.4.2-py2.7.egg/kafka/conn.py", line 558, in _try_authenticate_plain
    data = self._recv_bytes_blocking(4)
  File "xxxx/lib/python2.7/site-packages/kafka_python-1.4.2-py2.7.egg/kafka/conn.py", line 535, in _recv_bytes_blocking
    fragment = self._sock.recv(n - len(data))
  File "xxxx/lib/python2.7/site-packages/eventlet/green/ssl.py", line 198, in recv
    read = self.read(buflen)
  File "xxxx/lib/python2.7/site-packages/eventlet/green/ssl.py", line 138, in read
    super(GreenSSLSocket, self).read, *args, **kwargs)
  File "xxxx/lib/python2.7/site-packages/eventlet/green/ssl.py", line 108, in _call_trampolining
    return func(*a, **kw)
  File "/usr/lib64/python2.7/ssl.py", line 634, in read
    v = self._sslobj.read(len or 1024)
SSLWantReadError: The operation did not complete (read) (_ssl.c:1936)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant