From c01df1c84ccd0fc3a16e9537c3c0083f4635555f Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Fri, 8 Nov 2024 11:03:59 +0100
Subject: [PATCH] Refactor SerializedDagModel and DagCode for dag versioning

Now that we have dag versioning, SerializedDagModel and DagCode objects
should no longer be deleted directly. Deletion starts with the DagModel,
which cascades to the DagVersion and from there to the DagCode and
SerializedDagModel. These models are also no longer updated in place;
each change adds a new row, so the last_updated column is replaced with
created_at.
---
 airflow/api/common/delete_dag.py              |   6 -
 .../endpoints/rpc_api_endpoint.py             |   1 -
 airflow/dag_processing/manager.py             |   2 +-
 .../versions/0047_3_0_0_add_dag_versioning.py |   4 +
 airflow/models/dag_version.py                 |   2 +-
 airflow/models/dagcode.py                     |  41 +----
 airflow/models/serialized_dag.py              |  81 +++------
 docs/apache-airflow/img/airflow_erd.sha256    |   2 +-
 docs/apache-airflow/img/airflow_erd.svg       | 151 +++++++++---------
 tests/models/test_dagcode.py                  |   8 +-
 tests/models/test_serialized_dag.py           |  27 +---
 11 files changed, 114 insertions(+), 211 deletions(-)

diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index 11b046648c5a1..1e9d4a80dbd67 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -28,7 +28,6 @@
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.models import DagModel
 from airflow.models.errors import ParseImportError
-from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.db import get_sqla_model_classes
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
@@ -64,11 +63,6 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")

-    # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
-    # There may be a lag, so explicitly removes serialized DAG here.
-    if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
-        SerializedDagModel.remove_dag(dag_id=dag_id, session=session)
-
     count = 0

     for model in get_sqla_model_classes():
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index 1cdd2536e1354..43acbde755142 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -138,7 +138,6 @@ def initialize_method_map() -> dict[str, Callable]:
         DagRun._get_log_template,
         RenderedTaskInstanceFields._update_runtime_evaluated_template_fields,
         SerializedDagModel.get_serialized_dag,
-        SerializedDagModel.remove_deleted_dags,
         SkipMixin._skip,
         SkipMixin._skip_all_except,
         TaskInstance._check_and_change_state_before_execution,
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 3bc467e2f7063..d60f236d1d99f 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -506,7 +506,7 @@ def deactivate_stale_dags(
         """
         Detect DAGs which are no longer present in files.

-        Deactivate them and remove them in the serialized_dag table.
+        Deactivate them.
""" to_deactivate = set() query = select(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).where(DagModel.is_active) diff --git a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py index ca685ae4e071d..076aa633be50b 100644 --- a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py +++ b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py @@ -81,6 +81,8 @@ def upgrade(): ondelete="CASCADE", ) batch_op.create_unique_constraint("dag_code_dag_version_id_uq", ["dag_version_id"]) + batch_op.drop_column("last_updated") + batch_op.add_column(sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow)) with op.batch_alter_table( "serialized_dag", recreate="always", naming_convention=naming_convention @@ -100,6 +102,8 @@ def upgrade(): ondelete="CASCADE", ) batch_op.create_unique_constraint("serialized_dag_dag_version_id_uq", ["dag_version_id"]) + batch_op.drop_column("last_updated") + batch_op.add_column(sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow)) with op.batch_alter_table("task_instance", schema=None) as batch_op: batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False))) diff --git a/airflow/models/dag_version.py b/airflow/models/dag_version.py index a0ce924caa8ba..4bbd171b3f96e 100644 --- a/airflow/models/dag_version.py +++ b/airflow/models/dag_version.py @@ -62,7 +62,7 @@ class DagVersion(Base): ) dag_runs = relationship("DagRun", back_populates="dag_version", cascade="all, delete, delete-orphan") task_instances = relationship("TaskInstance", back_populates="dag_version") - created_at = Column(UtcDateTime, default=timezone.utcnow) + created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow) __table_args__ = ( UniqueConstraint("dag_id", "version_number", name="dag_id_v_name_v_number_unique_constraint"), diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py index c78f6cafaa6fa..c78457e6059f6 100644 --- a/airflow/models/dagcode.py +++ b/airflow/models/dagcode.py @@ -18,16 +18,15 @@ import logging import struct -from typing import TYPE_CHECKING, Collection +from typing import TYPE_CHECKING import uuid6 -from sqlalchemy import BigInteger, Column, ForeignKey, String, Text, delete, select +from sqlalchemy import BigInteger, Column, ForeignKey, String, Text, select from sqlalchemy.dialects.mysql import MEDIUMTEXT from sqlalchemy.orm import relationship from sqlalchemy.sql.expression import literal from sqlalchemy_utils import UUIDType -from airflow.api_internal.internal_api_call import internal_api_call from airflow.configuration import conf from airflow.exceptions import DagCodeNotFound from airflow.models.base import Base @@ -58,7 +57,7 @@ class DagCode(Base): fileloc_hash = Column(BigInteger, nullable=False) fileloc = Column(String(2000), nullable=False) # The max length of fileloc exceeds the limit of indexing. 
- last_updated = Column(UtcDateTime, nullable=False) + created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow) source_code = Column(Text().with_variant(MEDIUMTEXT(), "mysql"), nullable=False) dag_version_id = Column( UUIDType(binary=False), ForeignKey("dag_version.id", ondelete="CASCADE"), nullable=False, unique=True @@ -74,7 +73,7 @@ def __init__(self, dag_version, full_filepath: str, source_code: str | None = No @classmethod @provide_session - def write_dag(cls, dag_version: DagVersion, fileloc: str, session: Session = NEW_SESSION) -> DagCode: + def write_code(cls, dag_version: DagVersion, fileloc: str, session: Session = NEW_SESSION) -> DagCode: """ Write code into database. @@ -87,36 +86,6 @@ def write_dag(cls, dag_version: DagVersion, fileloc: str, session: Session = NEW log.debug("DAG file %s written into DagCode table", fileloc) return dag_code - @classmethod - @internal_api_call - @provide_session - def remove_deleted_code( - cls, - alive_dag_filelocs: Collection[str], - processor_subdir: str, - session: Session = NEW_SESSION, - ) -> None: - """ - Delete code not included in alive_dag_filelocs. - - :param alive_dag_filelocs: file paths of alive DAGs - :param processor_subdir: dag processor subdir - :param session: ORM Session - """ - alive_fileloc_hashes = [cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs] - - log.debug("Deleting code from %s table ", cls.__tablename__) - - session.execute( - delete(cls) - .where( - cls.fileloc_hash.notin_(alive_fileloc_hashes), - cls.fileloc.notin_(alive_dag_filelocs), - cls.fileloc.contains(processor_subdir), - ) - .execution_options(synchronize_session="fetch") - ) - @classmethod @provide_session def has_dag(cls, fileloc: str, session: Session = NEW_SESSION) -> bool: @@ -172,7 +141,7 @@ def _get_code_from_db(cls, fileloc, session: Session = NEW_SESSION) -> str: dag_code = session.scalar( select(cls) .where(cls.fileloc_hash == cls.dag_fileloc_hash(fileloc)) - .order_by(cls.last_updated.desc()) + .order_by(cls.created_at.desc()) .limit(1) ) if not dag_code: diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 0d5667cd48fc9..37c3478c86328 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -22,11 +22,11 @@ import logging import zlib from datetime import timedelta -from typing import TYPE_CHECKING, Any, Collection +from typing import TYPE_CHECKING, Any import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, or_, select +from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select from sqlalchemy.orm import backref, foreign, relationship from sqlalchemy.sql.expression import func, literal from sqlalchemy_utils import UUIDType @@ -83,7 +83,7 @@ class SerializedDagModel(Base): dag_id = Column(String(ID_LEN), nullable=False) _data = Column("data", sqlalchemy_jsonfield.JSONField(json=json), nullable=True) _data_compressed = Column("data_compressed", LargeBinary, nullable=True) - last_updated = Column(UtcDateTime, nullable=False) + created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow) dag_hash = Column(String(32), nullable=False) processor_subdir = Column(String(2000), nullable=True) @@ -110,7 +110,6 @@ class SerializedDagModel(Base): def __init__(self, dag: DAG, processor_subdir: str | None = None) -> None: self.dag_id = dag.dag_id - self.last_updated = timezone.utcnow() self.processor_subdir = processor_subdir dag_data = SerializedDAG.to_dict(dag) @@ -186,7 +185,7 
@@ def write_dag( if session.scalar( select(literal(True)).where( cls.dag_id == dag.dag_id, - (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated, + (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.created_at, ) ): return False @@ -196,7 +195,7 @@ def write_dag( serialized_dag_db = session.execute( select(cls.dag_hash, cls.processor_subdir) .where(cls.dag_id == dag.dag_id) - .order_by(cls.last_updated.desc()) + .order_by(cls.created_at.desc()) ).first() if ( @@ -215,13 +214,12 @@ def write_dag( new_serialized_dag.dag_version = dagv session.add(new_serialized_dag) log.debug("DAG: %s written to the DB", dag.dag_id) - - DagCode.write_dag(dagv, dag.fileloc, session=session) + DagCode.write_code(dagv, dag.fileloc, session=session) return True @classmethod def latest_item_select_object(cls, dag_id): - return select(cls).where(cls.dag_id == dag_id).order_by(cls.last_updated.desc()).limit(1) + return select(cls).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1) @classmethod @provide_session @@ -237,7 +235,7 @@ def get_latest_serialized_dags( """ # Subquery to get the latest serdag per dag_id latest_serdag_subquery = ( - session.query(cls.dag_id, func.max(cls.last_updated).label("last_updated")) + session.query(cls.dag_id, func.max(cls.created_at).label("created_at")) .filter(cls.dag_id.in_(dag_ids)) .group_by(cls.dag_id) .subquery() @@ -246,7 +244,7 @@ def get_latest_serialized_dags( select(cls) .join( latest_serdag_subquery, - cls.last_updated == latest_serdag_subquery.c.last_updated, + cls.created_at == latest_serdag_subquery.c.created_at, ) .where(cls.dag_id.in_(dag_ids)) ).all() @@ -262,7 +260,7 @@ def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, SerializedDA :returns: a dict of DAGs read from database """ latest_serialized_dag_subquery = ( - session.query(cls.dag_id, func.max(cls.last_updated).label("max_updated")) + session.query(cls.dag_id, func.max(cls.created_at).label("max_created")) .group_by(cls.dag_id) .subquery() ) @@ -270,7 +268,7 @@ def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, SerializedDA select(cls).join( latest_serialized_dag_subquery, (cls.dag_id == latest_serialized_dag_subquery.c.dag_id) - and (cls.last_updated == latest_serialized_dag_subquery.c.max_updated), + and (cls.created_at == latest_serialized_dag_subquery.c.max_created), ) ) @@ -313,47 +311,6 @@ def dag(self) -> SerializedDAG: raise ValueError("invalid or missing serialized DAG data") return SerializedDAG.from_dict(data) - @classmethod - @provide_session - def remove_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> None: - """ - Delete a DAG with given dag_id. - - :param dag_id: dag_id to be deleted - :param session: ORM Session. - """ - session.execute(cls.__table__.delete().where(cls.dag_id == dag_id)) - - @classmethod - @internal_api_call - @provide_session - def remove_deleted_dags( - cls, - alive_dag_filelocs: Collection[str], - processor_subdir: str | None = None, - session: Session = NEW_SESSION, - ) -> None: - """ - Delete DAGs not included in alive_dag_filelocs. 
- - :param alive_dag_filelocs: file paths of alive DAGs - :param processor_subdir: dag processor subdir - :param session: ORM Session - """ - log.debug( - "Deleting Serialized DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__ - ) - # Deleting the DagModel cascade deletes the serialized Dag through the dag version relationship - session.execute( - DagModel.__table__.delete().where( - DagModel.fileloc.notin_(alive_dag_filelocs), - or_( - DagModel.processor_subdir.is_(None), - DagModel.processor_subdir == processor_subdir, - ), - ) - ) - @classmethod @provide_session def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool: @@ -418,7 +375,7 @@ def get_last_updated_datetime(cls, dag_id: str, session: Session = NEW_SESSION) :param session: ORM Session """ return session.scalar( - select(cls.last_updated).where(cls.dag_id == dag_id).order_by(cls.last_updated.desc()).limit(1) + select(cls.created_at).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1) ) @classmethod @@ -429,7 +386,7 @@ def get_max_last_updated_datetime(cls, session: Session = NEW_SESSION) -> dateti :param session: ORM Session """ - return session.scalar(select(func.max(cls.last_updated))) + return session.scalar(select(func.max(cls.created_at))) @classmethod @provide_session @@ -442,7 +399,7 @@ def get_latest_version_hash(cls, dag_id: str, session: Session = NEW_SESSION) -> :return: DAG Hash, or None if the DAG is not found """ return session.scalar( - select(cls.dag_hash).where(cls.dag_id == dag_id).order_by(cls.last_updated.desc()).limit(1) + select(cls.dag_hash).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1) ) @classmethod @@ -461,9 +418,9 @@ def get_latest_version_hash_and_updated_datetime( :return: A tuple of DAG Hash and last updated datetime, or None if the DAG is not found """ return session.execute( - select(cls.dag_hash, cls.last_updated) + select(cls.dag_hash, cls.created_at) .where(cls.dag_id == dag_id) - .order_by(cls.last_updated.desc()) + .order_by(cls.created_at.desc()) .limit(1) ).one_or_none() @@ -476,7 +433,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ :param session: ORM Session """ latest_sdag_subquery = ( - session.query(cls.dag_id, func.max(cls.last_updated).label("max_updated")) + session.query(cls.dag_id, func.max(cls.created_at).label("max_created")) .group_by(cls.dag_id) .subquery() ) @@ -485,7 +442,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ select(cls.dag_id, func.json_extract(cls._data, "$.dag.dag_dependencies")).join( latest_sdag_subquery, (cls.dag_id == latest_sdag_subquery.c.dag_id) - and (cls.last_updated == latest_sdag_subquery.c.max_updated), + and (cls.created_at == latest_sdag_subquery.c.max_created), ) ) iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query) @@ -494,7 +451,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[ select(cls.dag_id, func.json_extract_path(cls._data, "dag", "dag_dependencies")).join( latest_sdag_subquery, (cls.dag_id == latest_sdag_subquery.c.dag_id) - and (cls.last_updated == latest_sdag_subquery.c.max_updated), + and (cls.created_at == latest_sdag_subquery.c.max_created), ) ) return {dag_id: [DagDependency(**d) for d in (deps_data or [])] for dag_id, deps_data in iterator} diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 8960e963a7522..000f872b20e27 100644 --- 
a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -2f5a0c99e82990e669d975bf366dc82202c5d460063aaebff3e10abd2acc15f9 \ No newline at end of file +49c3285d6abc128193c2255882a104bde24a9014d90693ecc83a908ea5d61e5d \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 26858f7a54008..41fe37adaaab6 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1539,6 +1539,7 @@ created_at [TIMESTAMP] + NOT NULL dag_id @@ -1758,94 +1759,94 @@ dag_code - -dag_code - -id - - [UUID] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL - -fileloc_hash - - [BIGINT] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -source_code - - [TEXT] - NOT NULL + +dag_code + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL + +fileloc_hash + + [BIGINT] + NOT NULL + +source_code + + [TEXT] + NOT NULL dag_version--dag_code - -0..N + +0..N 1 serialized_dag - -serialized_dag - -id - - [UUID] - NOT NULL - -dag_hash - - [VARCHAR(32)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -data - - [JSON] - -data_compressed - - [BYTEA] - -last_updated - - [TIMESTAMP] - NOT NULL - -processor_subdir - - [VARCHAR(2000)] + +serialized_dag + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_hash + + [VARCHAR(32)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +data + + [JSON] + +data_compressed + + [BYTEA] + +processor_subdir + + [VARCHAR(2000)] dag_version--serialized_dag - -0..N + +0..N 1 diff --git a/tests/models/test_dagcode.py b/tests/models/test_dagcode.py index fd7d761f9103f..fc1dfa330f7e8 100644 --- a/tests/models/test_dagcode.py +++ b/tests/models/test_dagcode.py @@ -95,7 +95,7 @@ def _compare_example_dags(self, example_dags): session.query(DagCode.fileloc, DagCode.fileloc_hash, DagCode.source_code) .filter(DagCode.fileloc == dag.fileloc) .filter(DagCode.fileloc_hash == dag_fileloc_hash) - .order_by(DagCode.last_updated.desc()) + .order_by(DagCode.created_at.desc()) .limit(1) .one() ) @@ -129,7 +129,7 @@ def test_db_code_created_on_serdag_change(self, session): result = ( session.query(DagCode) .filter(DagCode.fileloc == example_dag.fileloc) - .order_by(DagCode.last_updated.desc()) + .order_by(DagCode.created_at.desc()) .limit(1) .one() ) @@ -146,14 +146,14 @@ def test_db_code_created_on_serdag_change(self, session): new_result = ( session.query(DagCode) .filter(DagCode.fileloc == example_dag.fileloc) - .order_by(DagCode.last_updated.desc()) + .order_by(DagCode.created_at.desc()) .limit(1) .one() ) assert new_result.fileloc == example_dag.fileloc assert new_result.source_code != result.source_code - assert new_result.last_updated > result.last_updated + assert new_result.created_at > result.created_at def test_has_dag(self, dag_maker): """Test has_dag method.""" diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index d0bfe37a69cc0..599cb1396d714 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -109,7 +109,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self): s_dag_1 = SDM.get(example_bash_op_dag.dag_id) assert s_dag_1.dag_hash == s_dag.dag_hash - assert s_dag.last_updated == s_dag_1.last_updated + assert 
s_dag.created_at == s_dag_1.created_at assert dag_updated is False # Update DAG @@ -119,7 +119,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self): dag_updated = SDM.write_dag(dag=example_bash_op_dag) s_dag_2 = SDM.get(example_bash_op_dag.dag_id) - assert s_dag.last_updated != s_dag_2.last_updated + assert s_dag.created_at != s_dag_2.created_at assert s_dag.dag_hash != s_dag_2.dag_hash assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"] assert dag_updated is True @@ -141,7 +141,7 @@ def test_serialized_dag_is_updated_if_processor_subdir_changed(self): s_dag_1 = SDM.get(example_bash_op_dag.dag_id) assert s_dag_1.dag_hash == s_dag.dag_hash - assert s_dag.last_updated == s_dag_1.last_updated + assert s_dag.created_at == s_dag_1.created_at assert dag_updated is False session.flush() @@ -177,27 +177,6 @@ def test_read_all_dags_only_picks_the_latest_serdags(self, session): # assert only the latest SDM is returned assert len(sdags) != len(serialized_dags2) - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode - def test_remove_dags_by_id(self): - """DAGs can be removed from database.""" - example_dags_list = list(self._write_example_dags().values()) - # Tests removing by dag_id. - dag_removed_by_id = example_dags_list[0] - SDM.remove_dag(dag_removed_by_id.dag_id) - assert not SDM.has_dag(dag_removed_by_id.dag_id) - - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode - def test_remove_dags_by_filepath(self): - """DAGs can be removed from database.""" - example_dags_list = list(self._write_example_dags().values()) - # Tests removing by file path. - dag_removed_by_file = example_dags_list[0] - # remove repeated files for those DAGs that define multiple dags in the same file (set comprehension) - example_dag_files = list({dag.fileloc for dag in example_dags_list}) - example_dag_files.remove(dag_removed_by_file.fileloc) - SDM.remove_deleted_dags(example_dag_files, processor_subdir="/tmp/test") - assert not SDM.has_dag(dag_removed_by_file.dag_id) - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_bulk_sync_to_db(self): dags = [