We are sorry if this is a long text. This is a copy of an issue we also posted on apache/airflow and redis/redis-py. We include an MCVE to reproduce this bug. We are not sure who should fix this, but celery/kombu could have a workaround, which we discuss at the end of this issue.
TL;DR: The with timeout(seconds=OPERATION_TIMEOUT): block in airflow.executors.celery_executor.send_task_to_executor can leave redis in a badly broken, half-imported state. It may also affect other packages that we haven't discovered yet. This is a race condition, and it was very hard to debug at first. To reproduce this bug, we need:
a Celery timeout (we kept the default of 1.0 second)
the timeout to fire during an import
a long import (we aren't sure that the import of redis is long; this bug happens mostly with the redis package, but maybe this is confirmation bias)
Our environment:
airflow: this happens with both 2.6 and the latest 2.9.3 version
helm chart: 1.9, 1.14 or 1.15
python: 3.11.9
redis: 4.6.0 (airflow 2.6) and 5.0.7 (airflow 2.9)
kombu: 5.3.1 (airflow 2.6) and 5.3.7 (airflow 2.9)
We've observed this issue for at least several months in our Airflow deployment using the official Helm chart. We have the same issue as in the related issues/discussions:
Aug 8 08:29:02 airflow-XXX-scheduler-XXX-zhtb8 scheduler ERROR {timeout.py:68} ERROR - Process timed out, PID: 7
Aug 8 08:29:02 airflow-XXX-scheduler-XXX-zhtb8 scheduler {celery_executor.py:279} INFO - [Try 1 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='XXX', task_id='XXX', run_id='manual__2024-06-19T14:06:39+00:00', try_number=51, map_index=-1)).
Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler {celery_executor.py:290} ERROR - Error sending Celery task: module 'redis' has no attribute 'client'
Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler Celery Task ID: TaskInstanceKey(dag_id='XXX', task_id='XXX', run_id='manual__2024-06-19T14:06:39+00:00', try_number=51, map_index=-1)
Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 220, in send_task_to_executor
result = task_to_run.apply_async(args=[command], queue=queue)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/task.py", line 594, in apply_async
return app.send_task(
^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 797, in send_task
with self.producer_or_acquire(producer) as P:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 932, in producer_or_acquire
producer, self.producer_pool.acquire, block=True,
^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 1354, in producer_pool
return self.amqp.producer_pool
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/amqp.py", line 591, in producer_pool
self.app.connection_for_write()]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 829, in connection_for_write
return self._connection(url or self.conf.broker_write_url, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 880, in _connection
return self.amqp.Connection(
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/kombu/connection.py", line 201, in __init__
if not get_transport_cls(transport).can_parse_url:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/kombu/transport/__init__.py", line 90, in get_transport_cls
_transport_cache[transport] = resolve_transport(transport)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/kombu/transport/__init__.py", line 75, in resolve_transport
return symbol_by_name(transport)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/kombu/utils/imports.py", line 59, in symbol_by_name
module = imp(module_name, package=package, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._XXX>", line 1204, in _gcd_import
File "<frozen importlib._XXX>", line 1176, in _find_and_load
File "<frozen importlib._XXX>", line 1147, in _find_and_load_unlocked
File "<frozen importlib._XXX>", line 690, in _load_unlocked
File "<frozen importlib._XXX_external>", line 940, in exec_module
File "<frozen importlib._XXX>", line 241, in _call_with_frames_removed
File "/home/airflow/.local/lib/python3.11/site-packages/kombu/transport/redis.py", line 285, in <module>
class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline):
^^^^^^^^^^^^
Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler AttributeError: module 'redis' has no attribute 'client'
We've verified that we have neither a redis folder nor a redis.py file in our own code, so the redis package is not being shadowed. This is a very sporadic error. Most of the time it works, then it stops working for an unknown reason. Once it happens, the scheduler is broken and can't schedule anything (same error message) until we restart the scheduler process (restart the pod).
This happens quite randomly (once in tens to fifty Helm chart deployments), and we couldn't reproduce it reliably for debugging purposes.
What we found out is that once this happens, the bug won't disappear until we restart (kill) the scheduler pod.
We could reproduce it randomly with these steps in a test Airflow:
kubectl delete po -l release=airflow-XXX,component=scheduler --force --grace-period 0
Clear a DAG task and hope that the bug happens. It should be immediate; if not, repeat the whole process.
At first, we suspected a race condition in importing the redis package. So we injected debug code before the line class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline): with print(sys.path), print(redis), print(redis.__version__), and so on. Everything looked okay, except that print(dir(redis)) gave a different result than in a Python shell session inside the same container.
We noted that dir(redis) inside the troublesome scheduler lacks several attributes, notably redis.client.
We also discovered that in every case there is a timeout (as you can see in the log above). Sure enough, we found out later that the bug always happens when the import of redis is interrupted by the timeout. We printed line numbers in redis/__init__.py, and the import didn't run to the end. In very rare cases, airflow.utils.timeout doesn't work as intended: the timeout error is printed in the middle of import redis, but import redis still runs to the end, and then the bug doesn't happen. Most of the time, however, the timeout interrupts the import.
With this idea, we injected a sleep at the end of redis/__init__.py, and sure enough, we could reproduce this bug every time.
So we made a quick minimal, complete and verifiable example (MCVE) in which the timeout interrupts a very long first import of redis.
(This race condition is hard to reproduce on a local machine, so we introduced a delay into the very first import of redis to reproduce the bug reliably.)
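The same interruption can also be sketched in pure Python, without Airflow, Celery or redis. This sketch is Unix-only, must run in the main thread, and rests on our own assumptions. timeout() is a hypothetical SIGALRM-based context manager, loosely modeled on a signal-based timeout such as airflow.utils.timeout (we assume that is how it works). slowpkg is a throwaway package standing in for redis, with a delay at the end of its __init__.py like the one we injected.

```python
import importlib
import signal
import sys
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def timeout(seconds: float):
    """Hypothetical SIGALRM-based timeout (Unix only, main thread only)."""
    def handle_timeout(signum, frame):
        raise TimeoutError("Process timed out")

    previous = signal.signal(signal.SIGALRM, handle_timeout)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel a pending alarm
        signal.signal(signal.SIGALRM, previous)


def build_slow_package(root: Path) -> None:
    """Create slowpkg/, a stand-in for redis with a delay in __init__.py."""
    pkg = root / "slowpkg"
    pkg.mkdir()
    (pkg / "client.py").write_text("class Pipeline:\n    pass\n")
    (pkg / "__init__.py").write_text(
        "import time\n"
        "from slowpkg.client import Pipeline  # loads slowpkg.client first\n"
        "time.sleep(1)  # the injected delay: the alarm fires here\n"
    )


def reproduce() -> dict:
    with tempfile.TemporaryDirectory() as tmp:
        build_slow_package(Path(tmp))
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            try:
                with timeout(0.2):
                    importlib.import_module("slowpkg")
            except TimeoutError:
                pass  # the timeout interrupted the import half-way
            # Next attempt, without a timeout, so __init__.py runs to the end.
            pkg = importlib.import_module("slowpkg")
            return {
                "client_cached": "slowpkg.client" in sys.modules,
                "client_attr": hasattr(pkg, "client"),
            }
        finally:
            sys.path.remove(tmp)
            for name in [n for n in sys.modules if n.split(".")[0] == "slowpkg"]:
                del sys.modules[name]


if __name__ == "__main__":
    print(reproduce())  # {'client_cached': True, 'client_attr': False}
```

The second import completes normally, yet slowpkg.client still raises AttributeError. This matches what we see with redis.client in the scheduler.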
Please find below a compressed file with three files:
mock_kombu_transport_redis.py
mock_airflow.py
and redis.patch/__init__.py: the content of this file must be appended to the end of the redis-py package's __init__.py to add the delay to the import. mcve.zip
Running python -m mock_airflow gives:
FunctionTimedOut
After failed import redis
'redis' in sys.modules: True
'redis.client' in sys.modules: True
Reimport mock_kombu_transport_redis
After reimport redis
'redis' in sys.modules: True
'redis.client' in sys.modules: True
'client' in dir(redis): False
getattr(redis, 'client', None)=None
After reimport redis.client
'client' in dir(redis): False
getattr(redis, 'client', None)=None
After reimport: from redis import client
client=<module 'redis.client' from '***/site-packages/redis/client.py'>
'client' in dir(redis): False
getattr(redis, 'client', None)=None
client.Pipeline=<class 'redis.client.Pipeline'>
The line print(redis.client) will raise an error:
AttributeError: module 'redis' has no attribute 'client'
So an interrupted import gives a different result than a normal import. It seems that the broken import doesn't set non-public members of the package, such as redis.client in this case. Redis and StrictRedis are exposed explicitly, but redis.client is set implicitly.
In our case, re-importing after an interrupted import doesn't help either: the submodule attribute isn't set at all. We didn't dig further into Python's internals to find out why this happens.
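The behavior can be reproduced deterministically, without any timeout, by raising inside __init__.py. This is a sketch under our own assumptions: halfpkg and the HALFPKG_INTERRUPT variable are hypothetical, and the exception stands in for the timeout. In CPython's importlib, a module whose code raises is removed from sys.modules, but submodules that already finished loading stay cached. The next import therefore executes __init__.py into a fresh module object. Because the submodule is then served from the cache, importlib does not bind it as an attribute of the new parent. The exact sys.modules state right after a real timeout may differ, depending on how the timeout is delivered.

```python
import importlib
import os
import sys
import tempfile
from pathlib import Path

PKG = "halfpkg"  # hypothetical stand-in for redis


def build_package(root: Path) -> None:
    pkg = root / PKG
    pkg.mkdir()
    (pkg / "client.py").write_text("class Pipeline:\n    pass\n")
    (pkg / "__init__.py").write_text(
        "import os\n"
        "from halfpkg.client import Pipeline\n"
        "# Simulate the timeout striking after the submodule has loaded.\n"
        "if os.environ.get('HALFPKG_INTERRUPT') == '1':\n"
        "    raise TimeoutError('interrupted in the middle of __init__.py')\n"
    )


def inspect_reimport() -> dict:
    with tempfile.TemporaryDirectory() as tmp:
        build_package(Path(tmp))
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            os.environ["HALFPKG_INTERRUPT"] = "1"
            try:
                importlib.import_module(PKG)
            except TimeoutError:
                pass
            finally:
                del os.environ["HALFPKG_INTERRUPT"]
            state = {
                "parent_cached": PKG in sys.modules,             # dropped
                "client_cached": f"{PKG}.client" in sys.modules,  # kept
            }
            pkg = importlib.import_module(PKG)  # runs __init__.py again, fully
            state["client_attr"] = hasattr(pkg, "client")
            # "from X import Y" falls back to sys.modules["X.Y"], so this
            # succeeds although the attribute is missing (like the MCVE).
            from halfpkg import client
            state["from_import_ok"] = client is sys.modules[f"{PKG}.client"]
            state["client_attr_after_from"] = hasattr(pkg, "client")
            return state
        finally:
            sys.path.remove(tmp)
            for name in list(sys.modules):
                if name == PKG or name.startswith(PKG + "."):
                    del sys.modules[name]


if __name__ == "__main__":
    print(inspect_reimport())
```

The last two fields mirror the MCVE output above. In recent CPython versions, from redis import client returns the cached submodule, while getattr(redis, 'client', None) stays None.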
We see at least four options to work around this bug:
Increase the Celery operation_timeout (by config or env var). This isn't error-proof, but it at least drastically reduces how often this bug happens
Inject from redis import client into redis/__init__.py
Patch kombu/transport/redis.py with
from redis import client
and replace every redis.client with client
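The idea behind the kombu patch is to stop reading the submodule through the parent's attribute and fetch it from the import system instead. Here is a minimal sketch of that idea in isolation. resolve_submodule() is a hypothetical helper, not kombu's code, and brokenpkg is a hand-built stand-in for the broken redis state: both modules are cached in sys.modules, but the parent has no client attribute.

```python
import importlib
import sys
import types


def resolve_submodule(parent: types.ModuleType, name: str) -> types.ModuleType:
    """Hypothetical helper: return parent.<name> without trusting the attribute.

    importlib.import_module() returns the cached sys.modules entry, so this
    works even when the submodule was never bound on the parent object.
    """
    return importlib.import_module(f"{parent.__name__}.{name}")


# Hand-built stand-in for the broken state (names are hypothetical):
# both modules are cached, but the parent has no "client" attribute.
parent = types.ModuleType("brokenpkg")
parent.__path__ = []  # mark it as a package
child = types.ModuleType("brokenpkg.client")
child.Pipeline = type("Pipeline", (), {"__module__": child.__name__})
sys.modules["brokenpkg"] = parent
sys.modules["brokenpkg.client"] = child

print(hasattr(parent, "client"))              # False: parent.client raises
client = resolve_submodule(parent, "client")
print(client.Pipeline)                         # <class 'brokenpkg.client.Pipeline'>
```

As noted in the reply below, this only hides the half-import. The rest of the package may still be missing members.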
For now, we have opted for the second option in our dev environment. With the same script as above, we get a different output:
...
After reimport redis
'redis' in sys.modules: True
'redis.client' in sys.modules: True
'client' in dir(redis): True
getattr(redis, 'client', None)=<module 'redis.client' from 'XXX/site-packages/redis/client.py'>
...
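Why the second option restores the attribute can be sketched with a throwaway package. patchedpkg and PATCHEDPKG_INTERRUPT are hypothetical, and the raise stands in for the timeout. The injected from patchedpkg import client line sits after the interruption point, like code appended at the end of redis/__init__.py. On the retry, it binds client directly in the package namespace, even though patchedpkg.client is served from sys.modules.

```python
import importlib
import os
import sys
import tempfile
from pathlib import Path

PKG = "patchedpkg"  # hypothetical stand-in for a patched redis


def build_patched_package(root: Path) -> None:
    pkg = root / PKG
    pkg.mkdir()
    (pkg / "client.py").write_text("class Pipeline:\n    pass\n")
    (pkg / "__init__.py").write_text(
        "import os\n"
        "from patchedpkg.client import Pipeline\n"
        "if os.environ.get('PATCHEDPKG_INTERRUPT') == '1':\n"
        "    raise TimeoutError('interrupted in the middle of __init__.py')\n"
        "# The injected line: binds the name in the package namespace.\n"
        "from patchedpkg import client\n"
    )


def reimport_after_interrupt() -> bool:
    with tempfile.TemporaryDirectory() as tmp:
        build_patched_package(Path(tmp))
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            os.environ["PATCHEDPKG_INTERRUPT"] = "1"
            try:
                importlib.import_module(PKG)
            except TimeoutError:
                pass  # first import interrupted, as in the scheduler
            finally:
                del os.environ["PATCHEDPKG_INTERRUPT"]
            pkg = importlib.import_module(PKG)  # the retry runs to the end
            return hasattr(pkg, "client")
        finally:
            sys.path.remove(tmp)
            for name in list(sys.modules):
                if name == PKG or name.startswith(PKG + "."):
                    del sys.modules[name]


if __name__ == "__main__":
    print(reimport_after_interrupt())  # True
```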
We aren't sure whether this bug happens often enough to be taken into consideration upstream, but at least other devs won't lose days of debugging as we did ^^
This raises another question: could the timeout, or another mechanism, break an import and introduce this bug in another package? Could it cause another hard-to-catch race condition?
I want to point out that neither of the last two options actually solves the underlying issue; they just hide it. You cannot continue running with half-imported modules. They might not be fully executed and may therefore be missing members (more than just the submodule attribute).
As I said in the Discourse thread, the only real solution, other than preventing the interrupted import in the first place, is to restart the interpreter or to try to clean up sys.modules. I don't really know your use cases, so I can't give more specific information.
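Here is a minimal sketch of the sys.modules cleanup, under our own assumptions. purge_package() is a hypothetical helper, not part of kombu, redis or Airflow, and purgepkg is a throwaway package whose first import is interrupted. Purging drops the stale submodule as well, so the next import executes client.py again and binds it on the new parent. Objects that other modules already captured from the old, half-imported package are not replaced. That is one reason restarting the interpreter remains the safer option.

```python
import importlib
import os
import sys
import tempfile
from pathlib import Path

PKG = "purgepkg"  # hypothetical stand-in for redis


def purge_package(name: str) -> list[str]:
    """Drop a package and all of its submodules from sys.modules.

    Returns the removed names. References held elsewhere are untouched.
    """
    doomed = [m for m in sys.modules if m == name or m.startswith(name + ".")]
    for module_name in doomed:
        del sys.modules[module_name]
    return doomed


def recover_after_interrupt() -> bool:
    with tempfile.TemporaryDirectory() as tmp:
        pkg_dir = Path(tmp) / PKG
        pkg_dir.mkdir()
        (pkg_dir / "client.py").write_text("class Pipeline:\n    pass\n")
        (pkg_dir / "__init__.py").write_text(
            "import os\n"
            "from purgepkg.client import Pipeline\n"
            "if os.environ.get('PURGEPKG_INTERRUPT') == '1':\n"
            "    raise TimeoutError('interrupted in the middle of __init__.py')\n"
        )
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            os.environ["PURGEPKG_INTERRUPT"] = "1"
            try:
                importlib.import_module(PKG)
            except TimeoutError:
                pass  # leaves a cached purgepkg.client behind
            finally:
                del os.environ["PURGEPKG_INTERRUPT"]
            purge_package(PKG)  # also drops the stale purgepkg.client
            pkg = importlib.import_module(PKG)  # client.py is executed again
            return hasattr(pkg, "client")
        finally:
            sys.path.remove(tmp)
            purge_package(PKG)


if __name__ == "__main__":
    print(recover_after_interrupt())  # True
```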
We're having this issue as well. When you said we should try to clean up sys.modules, is this something that can be fixed further upstream? Or will every downstream user of kombu have to address it manually?