From ec48f50877461143b011de4d85a167eea47cc30d Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 16 Sep 2024 18:01:14 +0200 Subject: [PATCH] Fix wrong celery_app config on job and workflow handlers Fixes https://github.com/galaxyproject/galaxy/issues/18727 / https://sentry.galaxyproject.org/share/issue/8091a452f32645b89b37976362779532/: ``` Message (59965185/49195886) Job wrapper finish method failed Stack Trace Newest OperationalError: unable to open database file File "sqlalchemy/engine/base.py", line 146, in __init__ self._dbapi_connection = engine.raw_connection() File "sqlalchemy/engine/base.py", line 3300, in raw_connection return self.pool.connect() File "sqlalchemy/pool/base.py", line 449, in connect return _ConnectionFairy._checkout(self) File "sqlalchemy/pool/base.py", line 1263, in _checkout fairy = _ConnectionRecord.checkout(pool) File "sqlalchemy/pool/base.py", line 712, in checkout rec = pool._do_get() File "sqlalchemy/pool/impl.py", line 179, in _do_get with util.safe_reraise(): File "sqlalchemy/util/langhelpers.py", line 146, in __exit__ raise exc_value.with_traceback(exc_tb) File "sqlalchemy/pool/impl.py", line 177, in _do_get return self._create_connection() File "sqlalchemy/pool/base.py", line 390, in _create_connection return _ConnectionRecord(self) File "sqlalchemy/pool/base.py", line 674, in __init__ self.__connect() File "sqlalchemy/pool/base.py", line 900, in __connect with util.safe_reraise(): File "sqlalchemy/util/langhelpers.py", line 146, in __exit__ raise exc_value.with_traceback(exc_tb) File "sqlalchemy/pool/base.py", line 896, in __connect self.dbapi_connection = connection = pool._invoke_creator(self) File "sqlalchemy/engine/create.py", line 643, in connect return dialect.connect(*cargs, **cparams) File "sqlalchemy/engine/default.py", line 620, in connect return self.loaded_dbapi.connect(*cargs, **cparams) OperationalError: (sqlite3.OperationalError) unable to open database file (Background on this error at: https://sqlalche.me/e/20/e3q8) File "kombu/connection.py", line 472, in _reraise_as_library_errors yield File "kombu/connection.py", line 556, in _ensured return fun(*args, **kwargs) File "kombu/messaging.py", line 202, in _publish [maybe_declare(entity) for entity in declare] File "kombu/messaging.py", line 202, in [maybe_declare(entity) for entity in declare] File "kombu/messaging.py", line 107, in maybe_declare return maybe_declare(entity, self.channel, retry, **retry_policy) File "kombu/common.py", line 113, in maybe_declare return _maybe_declare(entity, channel) File "kombu/common.py", line 153, in _maybe_declare entity.declare(channel=channel) File "kombu/entity.py", line 617, in declare self._create_queue(nowait=nowait, channel=channel) File "kombu/entity.py", line 626, in _create_queue self.queue_declare(nowait=nowait, passive=False, channel=channel) File "kombu/entity.py", line 655, in queue_declare ret = channel.queue_declare( File "kombu/transport/virtual/base.py", line 537, in queue_declare self._new_queue(queue, **kwargs) File "kombu/transport/sqlalchemy/__init__.py", line 159, in _new_queue self._get_or_create(queue) File "kombu/transport/sqlalchemy/__init__.py", line 138, in _get_or_create obj = self.session.query(self.queue_cls) \ File "kombu/transport/sqlalchemy/__init__.py", line 133, in session _, Session = self._open() File "kombu/transport/sqlalchemy/__init__.py", line 125, in _open metadata.create_all(engine) File "sqlalchemy/sql/schema.py", line 5857, in create_all bind._run_ddl_visitor( File "sqlalchemy/engine/base.py", line 3250, in _run_ddl_visitor with self.begin() as conn: File "contextlib.py", line 137, in __enter__ return next(self.gen) File "sqlalchemy/engine/base.py", line 3240, in begin with self.connect() as conn: File "sqlalchemy/engine/base.py", line 3276, in connect return self._connection_cls(self) File "sqlalchemy/engine/base.py", line 148, in __init__ Connection._handle_dbapi_exception_noconnection( File "sqlalchemy/engine/base.py", line 2440, in _handle_dbapi_exception_noconnection raise sqlalchemy_exception.with_traceback(exc_info[2]) from e File "sqlalchemy/engine/base.py", line 146, in __init__ self._dbapi_connection = engine.raw_connection() File "sqlalchemy/engine/base.py", line 3300, in raw_connection return self.pool.connect() File "sqlalchemy/pool/base.py", line 449, in connect return _ConnectionFairy._checkout(self) File "sqlalchemy/pool/base.py", line 1263, in _checkout fairy = _ConnectionRecord.checkout(pool) File "sqlalchemy/pool/base.py", line 712, in checkout rec = pool._do_get() File "sqlalchemy/pool/impl.py", line 179, in _do_get with util.safe_reraise(): File "sqlalchemy/util/langhelpers.py", line 146, in __exit__ raise exc_value.with_traceback(exc_tb) File "sqlalchemy/pool/impl.py", line 177, in _do_get return self._create_connection() File "sqlalchemy/pool/base.py", line 390, in _create_connection return _ConnectionRecord(self) File "sqlalchemy/pool/base.py", line 674, in __init__ self.__connect() File "sqlalchemy/pool/base.py", line 900, in __connect with util.safe_reraise(): File "sqlalchemy/util/langhelpers.py", line 146, in __exit__ raise exc_value.with_traceback(exc_tb) File "sqlalchemy/pool/base.py", line 896, in __connect self.dbapi_connection = connection = pool._invoke_creator(self) File "sqlalchemy/engine/create.py", line 643, in connect return dialect.connect(*cargs, **cparams) File "sqlalchemy/engine/default.py", line 620, in connect return self.loaded_dbapi.connect(*cargs, **cparams) OperationalError: (sqlite3.OperationalError) unable to open database file (Background on this error at: https://sqlalche.me/e/20/e3q8) File "galaxy/jobs/runners/__init__.py", line 677, in _finish_or_resubmit_job job_wrapper.finish( File "galaxy/jobs/__init__.py", line 2070, in finish task_wrapper.delay() File "celery/canvas.py", line 353, in delay return self.apply_async(partial_args, partial_kwargs) File "celery/canvas.py", line 400, in apply_async return _apply(args, kwargs, **options) File "celery/app/task.py", line 594, in apply_async return app.send_task( File "celery/app/base.py", line 801, in send_task amqp.send_task_message(P, name, message, **options) File "celery/app/amqp.py", line 518, in send_task_message ret = producer.publish( File "kombu/messaging.py", line 186, in publish return _publish( File "kombu/connection.py", line 553, in _ensured with self._reraise_as_library_errors(): File "contextlib.py", line 155, in __exit__ self.gen.throw(typ, value, traceback) File "kombu/connection.py", line 476, in _reraise_as_library_errors raise ConnectionError(str(exc)) from exc ``` The issue was that we aren't setting `GALAXY_CONFIG_FILE` if the job and workflow handlers are started with `-c path/to/galaxy.yml`, and the celery config heuristic isn't looking at sys.argv. --- lib/galaxy/celery/__init__.py | 2 +- scripts/galaxy_main.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/galaxy/celery/__init__.py b/lib/galaxy/celery/__init__.py index 11432481529c..120df3c28906 100644 --- a/lib/galaxy/celery/__init__.py +++ b/lib/galaxy/celery/__init__.py @@ -220,7 +220,7 @@ def config_celery_app(config, celery_app): if config.celery_conf: celery_app.conf.update(config.celery_conf) # Handle special cases - if not celery_app.conf.broker_url: + if not config.celery_conf.get("broker_url"): celery_app.conf.broker_url = config.amqp_internal_connection diff --git a/scripts/galaxy_main.py b/scripts/galaxy_main.py index 02ef73928604..85d6b954dbb7 100755 --- a/scripts/galaxy_main.py +++ b/scripts/galaxy_main.py @@ -87,8 +87,15 @@ def load_galaxy_app(config_builder, config_env=False, log=None, attach_to_pools= kwds = config_builder.app_kwds() kwds = load_app_properties(**kwds) from galaxy.app import UniverseApplication + from galaxy.celery import ( + celery_app, + config_celery_app, + ) app = UniverseApplication(global_conf=config_builder.global_conf(), attach_to_pools=attach_to_pools, **kwds) + # Update celery app, which might not have taken into account the config file if set via the `-c` argument. + config_celery_app(app.config, celery_app) + app.database_heartbeat.start() app.application_stack.log_startup() return app