Logging during finalization from producer daemon thread might raise exception #189

Open
huadle opened this issue Jul 31, 2024 · 0 comments

huadle commented Jul 31, 2024

I've been running into a non-deterministic crash on shutdown from some programs that use kafka-python to produce messages and don't explicitly close the producer before the program ends. The full exception and a small reproduction that works in my environment are below.

I suspect this is because the sender daemon thread tries to log during shutdown, grabs the I/O lock for stderr, and is then killed while still holding it, so the Python runtime can't complete finalization. I'm happy to try my hand at writing a PR to disable logging during shutdown, but I want to see if there's anything I'm missing before I go ahead and do that.

From the logging docs (emphasis is mine):

logging.shutdown()
Informs the logging system to perform an orderly shutdown by flushing and closing all handlers. This should be called at application exit and no further use of the logging system should be made after this call.

When the logging module is imported, it registers this function as an exit handler (see atexit), so normally there’s no need to do that manually.
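One consequence of that registration worth keeping in mind: atexit handlers run in reverse order of registration. The sketch below illustrates that ordering in a child interpreter; the handler names are made up for illustration and nothing here is kafka-python code.

```python
import subprocess
import sys
import textwrap

# Script run in a separate interpreter so its atexit handlers really fire.
CHILD = textwrap.dedent("""
    import atexit
    import logging  # importing logging registers logging.shutdown with atexit

    def first_registered():
        print("first registered, runs last")

    def last_registered():
        print("last registered, runs first")

    atexit.register(first_registered)
    atexit.register(last_registered)
""")


def run_child():
    """Run CHILD in a fresh interpreter and return its stdout lines."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.splitlines()


if __name__ == "__main__":
    for line in run_child():
        print(line)
```

By the same rule, a handler registered after `import logging` should run before `logging.shutdown`, and one registered before it should run afterwards.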

Exception:

Fatal Python error: _enter_buffered_busy: could not acquire lock for <_io.BufferedWriter name='<stderr>'> at interpreter shutdown, possibly due to daemon threads
Python runtime state: finalizing (tstate=0x0000000001045980)
 
Current thread 0x00007f63d10f1b80 (most recent call first):
  <no Python frame>

Minimal repro:

# This is the minimal atexit handler I could find that reliably reproduces the crash
def _yield_on_exit():
    import os
    os.sched_yield()
 
import atexit
import json
 
atexit.register(_yield_on_exit)
 
import kafka
import logging
 
KAFKA_TOPIC = "my-topic"
 
logging.basicConfig(level=logging.DEBUG)  # bug doesn't repro without this line
 
producer = kafka.KafkaProducer(
    value_serializer=lambda m: json.dumps(m).encode("utf8"),
    client_id="kafka-python-producer",
    api_version=(1, 1, 1),
    bootstrap_servers=["my-bootstrap-server"],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="GSSAPI",
    retries=3,
    )
 
# atexit.register(producer.close)  # bug doesn't repro if you uncomment this line
 
producer.send(KAFKA_TOPIC, value={})
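
For anyone hitting the same crash, here is a minimal sketch of the pattern behind the commented-out `atexit.register(producer.close)` line: stop and join the logging daemon thread from an exit handler so it is no longer running when the interpreter finalizes. `BackgroundSender` is a hypothetical stand-in written for this example, not kafka-python's sender, and I'm assuming a plain stop-and-join is enough to model what `close()` does.

```python
import atexit
import logging
import threading

log = logging.getLogger("sender-sketch")


class BackgroundSender:
    """Hypothetical stand-in for a producer that owns a logging daemon thread."""

    def __init__(self, interval=0.01):
        self._stop = threading.Event()
        self._interval = interval
        self._thread = threading.Thread(
            target=self._run, name="sender-sketch", daemon=True
        )
        self._thread.start()

    def _run(self):
        # Log periodically until close() sets the stop event.
        while not self._stop.wait(self._interval):
            log.debug("sender loop tick")

    def close(self, timeout=5.0):
        """Ask the thread to stop and wait for it; return True if it exited."""
        self._stop.set()
        self._thread.join(timeout)
        return not self._thread.is_alive()


sender = BackgroundSender()

# Registered at exit so the thread is joined before finalization begins.
atexit.register(sender.close)
```

Calling `close()` a second time is harmless here, since joining a thread that has already finished returns immediately.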