From e753acdbd103be969b7d39773769c77dba388908 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 7 Nov 2024 15:28:32 +0800 Subject: [PATCH 01/10] test(tests/www/views/test_views_grid): extend Asset test cases to include both uri and name --- tests/www/views/test_views_grid.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py index c94539f05587..dd7179bc5884 100644 --- a/tests/www/views/test_views_grid.py +++ b/tests/www/views/test_views_grid.py @@ -431,8 +431,8 @@ def test_has_outlet_asset_flag(admin_client, dag_maker, session, app, monkeypatc lineagefile = File("/tmp/does_not_exist") EmptyOperator(task_id="task1") EmptyOperator(task_id="task2", outlets=[lineagefile]) - EmptyOperator(task_id="task3", outlets=[Asset("foo"), lineagefile]) - EmptyOperator(task_id="task4", outlets=[Asset("foo")]) + EmptyOperator(task_id="task3", outlets=[Asset(name="foo", uri="s3://bucket/key"), lineagefile]) + EmptyOperator(task_id="task4", outlets=[Asset(name="foo", uri="s3://bucket/key")]) m.setattr(app, "dag_bag", dag_maker.dagbag) resp = admin_client.get(f"/object/grid_data?dag_id={DAG_ID}", follow_redirects=True) @@ -471,7 +471,7 @@ def _expected_task_details(task_id, has_outlet_assets): @pytest.mark.need_serialized_dag def test_next_run_assets(admin_client, dag_maker, session, app, monkeypatch): with monkeypatch.context() as m: - assets = [Asset(uri=f"s3://bucket/key/{i}") for i in [1, 2]] + assets = [Asset(uri=f"s3://bucket/key/{i}", name=f"name_{i}", group="test-group") for i in [1, 2]] with dag_maker(dag_id=DAG_ID, schedule=assets, serialized=True, session=session): EmptyOperator(task_id="task1") From 1c90a8c3ca69f460f6af055e45373f1b63443935 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 7 Nov 2024 15:29:52 +0800 Subject: [PATCH 02/10] test(utils/test_json): extend Asset test cases to include both uri and name --- tests/utils/test_json.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utils/test_json.py b/tests/utils/test_json.py index 5a58b5d79032..86661c02d29c 100644 --- a/tests/utils/test_json.py +++ b/tests/utils/test_json.py @@ -86,7 +86,7 @@ def test_encode_raises(self): ) def test_encode_xcom_asset(self): - asset = Asset("mytest://asset") + asset = Asset(uri="mytest://asset", name="mytest") s = json.dumps(asset, cls=utils_json.XComEncoder) obj = json.loads(s, cls=utils_json.XComDecoder) assert asset.uri == obj.uri From d31c46436fa13777580ba08cbe5d8b33b4b9571f Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 7 Nov 2024 15:33:05 +0800 Subject: [PATCH 03/10] test(timetables/test_assets_timetable): extend Asset test cases to include both uri and name --- tests/timetables/test_assets_timetable.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/tests/timetables/test_assets_timetable.py b/tests/timetables/test_assets_timetable.py index bb942a4a01d4..3629156b24c3 100644 --- a/tests/timetables/test_assets_timetable.py +++ b/tests/timetables/test_assets_timetable.py @@ -105,7 +105,7 @@ def test_timetable() -> MockTimetable: @pytest.fixture def test_assets() -> list[Asset]: """Pytest fixture for creating a list of Asset objects.""" - return [Asset("test_asset")] + return [Asset(name="test_asset", uri="test://asset")] @pytest.fixture @@ -134,7 +134,15 @@ def test_serialization(asset_timetable: AssetOrTimeSchedule, monkeypatch: Any) - "timetable": "mock_serialized_timetable", "asset_condition": { "__type": "asset_all", - "objects": [{"__type": "asset", "uri": "test_asset", "name": "test_asset", "extra": {}}], + "objects": [ + { + "__type": "asset", + "name": "test_asset", + "uri": "test://asset/", + "group": "asset", + "extra": {}, + } + ], }, } @@ -152,7 +160,15 @@ def test_deserialization(monkeypatch: Any) -> None: "timetable": "mock_serialized_timetable", "asset_condition": { "__type": "asset_all", - "objects": [{"__type": "asset", "name": "test_asset", "uri": "test_asset", "extra": None}], + "objects": [ + { + "__type": "asset", + "name": "test_asset", + "uri": "test://asset/", + "group": "asset", + "extra": None, + } + ], }, } deserialized = AssetOrTimeSchedule.deserialize(mock_serialized_data) From 48af4ceb7017d926edea46bba9d50630a2fd112e Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 7 Nov 2024 15:40:33 +0800 Subject: [PATCH 04/10] fix(serialization): serialize both name, uri and group for Asset --- airflow/serialization/serialized_objects.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 4b7ee6d0871b..ab782e26f802 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -255,7 +255,7 @@ def encode_asset_condition(var: BaseAsset) -> dict[str, Any]: :meta private: """ if isinstance(var, Asset): - return {"__type": DAT.ASSET, "name": var.name, "uri": var.uri, "extra": var.extra} + return {"__type": DAT.ASSET, "name": var.name, "uri": var.uri, "group": var.group, "extra": var.extra} if isinstance(var, AssetAlias): return {"__type": DAT.ASSET_ALIAS, "name": var.name} if isinstance(var, AssetAll): @@ -273,7 +273,7 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset: """ dat = var["__type"] if dat == DAT.ASSET: - return Asset(uri=var["uri"], name=var["name"], extra=var["extra"]) + return Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"]) if dat == DAT.ASSET_ALL: return AssetAll(*(decode_asset_condition(x) for x in var["objects"])) if dat == DAT.ASSET_ANY: From 9941ba6f6d59dc64bc48a879740d260129890f40 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 12 Nov 2024 17:24:11 +0800 Subject: [PATCH 05/10] test(listeners/test_asset_listener): extend Asset test cases to include both uri and name --- tests/listeners/test_asset_listener.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/listeners/test_asset_listener.py b/tests/listeners/test_asset_listener.py index a075b87a7f3d..fe4bac7401b7 100644 --- a/tests/listeners/test_asset_listener.py +++ b/tests/listeners/test_asset_listener.py @@ -42,9 +42,11 @@ def clean_listener_manager(): @pytest.mark.db_test @provide_session def test_asset_listener_on_asset_changed_gets_calls(create_task_instance_of_operator, session): - asset_uri = "test_asset_uri" - asset = Asset(uri=asset_uri) - asset_model = AssetModel(uri=asset_uri) + asset_uri = "test://asset/" + asset_name = "test_asset_uri" + asset_group = "test-group" + asset = Asset(uri=asset_uri, name=asset_name, group=asset_group) + asset_model = AssetModel(uri=asset_uri, name=asset_name) session.add(asset_model) session.flush() @@ -60,3 +62,5 @@ def test_asset_listener_on_asset_changed_gets_calls(create_task_instance_of_oper assert len(asset_listener.changed) == 1 assert asset_listener.changed[0].uri == asset_uri + assert asset_listener.changed[0].name == asset_name + assert asset_listener.changed[0].group == asset_group From d73bc092d181939bb1bc3c82c891d6adce89c4fd Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 12 Nov 2024 18:03:41 +0800 Subject: [PATCH 06/10] test(jobs/test_scheduler_job): extend Asset test cases to include both uri and name --- tests/jobs/test_scheduler_job.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 2975fc49df5d..6e8d18b03a29 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3959,8 +3959,8 @@ def test_create_dag_runs_assets(self, session, dag_maker): - That dag_model has next_dagrun """ - asset1 = Asset(uri="ds1") - asset2 = Asset(uri="ds2") + asset1 = Asset(uri="test://asset1", name="test_asset", group="test_group") + asset2 = Asset(uri="test://asset2", name="test_asset_2", group="test_group") with dag_maker(dag_id="assets-1", start_date=timezone.utcnow(), session=session): BashOperator(task_id="task", bash_command="echo 1", outlets=[asset1]) @@ -4055,15 +4055,14 @@ def dict_from_obj(obj): ], ) def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable, enable): - ds = Asset("ds") - with dag_maker(dag_id="consumer", schedule=[ds], session=session): + asset = Asset(uri="test://asset_1", name="test_asset_1", group="test_group") + with dag_maker(dag_id="consumer", schedule=[asset], session=session): pass with dag_maker(dag_id="producer", schedule="@daily", session=session): - BashOperator(task_id="task", bash_command="echo 1", outlets=ds) + BashOperator(task_id="task", bash_command="echo 1", outlets=asset) asset_manger = AssetManager() - asset_id = session.scalars(select(AssetModel.id).filter_by(uri=ds.uri)).one() - + asset_id = session.scalars(select(AssetModel.id).filter_by(uri=asset.uri, name=asset.name)).one() ase_q = select(AssetEvent).where(AssetEvent.asset_id == asset_id).order_by(AssetEvent.timestamp) adrq_q = select(AssetDagRunQueue).where( AssetDagRunQueue.asset_id == asset_id, AssetDagRunQueue.target_dag_id == "consumer" @@ -4076,7 +4075,7 @@ def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable, dr1: DagRun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) asset_manger.register_asset_change( task_instance=dr1.get_task_instance("task", session=session), - asset=ds, + asset=asset, session=session, ) session.flush() @@ -4090,7 +4089,7 @@ def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable, dr2: DagRun = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED) asset_manger.register_asset_change( task_instance=dr2.get_task_instance("task", session=session), - asset=ds, + asset=asset, session=session, ) session.flush() @@ -6178,11 +6177,11 @@ def _find_assets_activation(session) -> tuple[list[AssetModel], list[AssetModel] def test_asset_orphaning(self, dag_maker, session): self.job_runner = SchedulerJobRunner(job=Job(), subdir=os.devnull) - asset1 = Asset(uri="ds1") - asset2 = Asset(uri="ds2") - asset3 = Asset(uri="ds3") - asset4 = Asset(uri="ds4") - asset5 = Asset(uri="ds5") + asset1 = Asset(uri="test://asset_1", name="test_asset_1", group="test_group") + asset2 = Asset(uri="test://asset_2", name="test_asset_2", group="test_group") + asset3 = Asset(uri="test://asset_3", name="test_asset_3", group="test_group") + asset4 = Asset(uri="test://asset_4", name="test_asset_4", group="test_group") + asset5 = Asset(uri="test://asset_5", name="test_asset_5", group="test_group") with dag_maker(dag_id="assets-1", schedule=[asset1, asset2], session=session): BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset4]) @@ -6221,7 +6220,7 @@ def test_asset_orphaning(self, dag_maker, session): def test_asset_orphaning_ignore_orphaned_assets(self, dag_maker, session): self.job_runner = SchedulerJobRunner(job=Job(), subdir=os.devnull) - asset1 = Asset(uri="ds1") + asset1 = Asset(uri="test://asset_1", name="test_asset_1", group="test_group") with dag_maker(dag_id="assets-1", schedule=[asset1], session=session): BashOperator(task_id="task", bash_command="echo 1") From d341bf4ba000f849605a3dc1fcde539bacf62126 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 12 Nov 2024 19:23:31 +0800 Subject: [PATCH 07/10] test(providers/openlineage): extend Asset test cases to include both uri and name --- .../tests/openlineage/plugins/test_utils.py | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/providers/tests/openlineage/plugins/test_utils.py b/providers/tests/openlineage/plugins/test_utils.py index e84fac118657..3d41e87cf015 100644 --- a/providers/tests/openlineage/plugins/test_utils.py +++ b/providers/tests/openlineage/plugins/test_utils.py @@ -334,9 +334,9 @@ def test_serialize_timetable(): from airflow.timetables.simple import AssetTriggeredTimetable asset = AssetAny( - Asset("2"), - AssetAlias("example-alias"), - Asset("3"), + Asset(name="2", uri="test://2", group="test-group"), + AssetAlias(name="example-alias", group="test-group"), + Asset(name="3", uri="test://3", group="test-group"), AssetAll(AssetAlias("this-should-not-be-seen"), Asset("4")), ) dag = MagicMock() @@ -347,14 +347,32 @@ def test_serialize_timetable(): "asset_condition": { "__type": DagAttributeTypes.ASSET_ANY, "objects": [ - {"__type": DagAttributeTypes.ASSET, "extra": {}, "name": "2", "uri": "2"}, + { + "__type": DagAttributeTypes.ASSET, + "extra": {}, + "uri": "test://2/", + "name": "2", + "group": "test-group", + }, {"__type": DagAttributeTypes.ASSET_ANY, "objects": []}, - {"__type": DagAttributeTypes.ASSET, "extra": {}, "name": "3", "uri": "3"}, + { + "__type": DagAttributeTypes.ASSET, + "extra": {}, + "uri": "test://3/", + "name": "3", + "group": "test-group", + }, { "__type": DagAttributeTypes.ASSET_ALL, "objects": [ {"__type": DagAttributeTypes.ASSET_ANY, "objects": []}, - {"__type": DagAttributeTypes.ASSET, "extra": {}, "name": "4", "uri": "4"}, + { + "__type": DagAttributeTypes.ASSET, + "extra": {}, + "uri": "4", + "name": "4", + "group": "asset", + }, ], }, ], From d5c2245deb3eb2d4d143ee49473fd6df532bb3aa Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 13 Nov 2024 17:35:47 +0800 Subject: [PATCH 08/10] test(decorators/test_python): extend Asset test cases to include both uri and name --- tests/decorators/test_python.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 668d0be99b65..8358c7d53967 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -989,12 +989,13 @@ def test_task_decorator_asset(dag_maker, session): result = None uri = "s3://bucket/name" + asset_name = "test_asset" with dag_maker(session=session) as dag: @dag.task() def up1() -> Asset: - return Asset(uri) + return Asset(uri=uri, name=asset_name) @dag.task() def up2(src: Asset) -> str: From 159e858d1dc66ed5d5a79672c8162e7232156c45 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 13 Nov 2024 18:20:55 +0800 Subject: [PATCH 09/10] test(models/test_dag): extend asset test cases to cover name, uri, group --- tests/models/test_dag.py | 84 +++++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 22 deletions(-) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 28f745b0614e..c4d01a660c6c 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -843,15 +843,24 @@ def test_bulk_write_to_db_assets(self): """ dag_id1 = "test_asset_dag1" dag_id2 = "test_asset_dag2" + task_id = "test_asset_task" + uri1 = "s3://asset/1" - a1 = Asset(uri1, extra={"not": "used"}) - a2 = Asset("s3://asset/2") - a3 = Asset("s3://asset/3") + a1 = Asset(uri=uri1, name="test_asset_1", extra={"not": "used"}, group="test-group") + a2 = Asset(uri="s3://asset/2", name="test_asset_2", group="test-group") + a3 = Asset(uri="s3://asset/3", name="test_asset-3", group="test-group") + dag1 = DAG(dag_id=dag_id1, start_date=DEFAULT_DATE, schedule=[a1]) EmptyOperator(task_id=task_id, dag=dag1, outlets=[a2, a3]) + dag2 = DAG(dag_id=dag_id2, start_date=DEFAULT_DATE, schedule=None) - EmptyOperator(task_id=task_id, dag=dag2, outlets=[Asset(uri1, extra={"should": "be used"})]) + EmptyOperator( + task_id=task_id, + dag=dag2, + outlets=[Asset(uri=uri1, name="test_asset_1", extra={"should": "be used"}, group="test-group")], + ) + session = settings.Session() dag1.clear() DAG.bulk_write_to_db([dag1, dag2], session=session) @@ -920,10 +929,10 @@ def test_bulk_write_to_db_does_not_activate(self, dag_maker, session): """ # Create four assets - two that have references and two that are unreferenced and marked as # orphans - asset1 = Asset(uri="ds1") - asset2 = Asset(uri="ds2") - asset3 = Asset(uri="ds3") - asset4 = Asset(uri="ds4") + asset1 = Asset(uri="test://asset1", name="asset1", group="test-group") + asset2 = Asset(uri="test://asset2", name="asset2", group="test-group") + asset3 = Asset(uri="test://asset3", name="asset3", group="test-group") + asset4 = Asset(uri="test://asset4", name="asset4", group="test-group") dag1 = DAG(dag_id="assets-1", start_date=DEFAULT_DATE, schedule=[asset1]) BashOperator(dag=dag1, task_id="task", bash_command="echo 1", outlets=[asset3]) @@ -1393,8 +1402,11 @@ def test_timetable_and_description_from_schedule_arg( assert dag.timetable.description == interval_description def test_timetable_and_description_from_asset(self): - dag = DAG("test_schedule_interval_arg", schedule=[Asset(uri="hello")], start_date=TEST_DATE) - assert dag.timetable == AssetTriggeredTimetable(Asset(uri="hello")) + uri = "test://asset" + dag = DAG( + "test_schedule_interval_arg", schedule=[Asset(uri=uri, group="test-group")], start_date=TEST_DATE + ) + assert dag.timetable == AssetTriggeredTimetable(Asset(uri=uri, group="test-group")) assert dag.timetable.description == "Triggered by assets" @pytest.mark.parametrize( @@ -2159,7 +2171,7 @@ def test_dags_needing_dagruns_not_too_early(self): session.close() def test_dags_needing_dagruns_assets(self, dag_maker, session): - asset = Asset(uri="hello") + asset = Asset(uri="test://asset", group="test-group") with dag_maker( session=session, dag_id="my_dag", @@ -2391,8 +2403,8 @@ def test__processor_dags_folder(self, session): @pytest.mark.need_serialized_dag def test_dags_needing_dagruns_asset_triggered_dag_info_queued_times(self, session, dag_maker): - asset1 = Asset(uri="ds1") - asset2 = Asset(uri="ds2") + asset1 = Asset(uri="test://asset1", group="test-group") + asset2 = Asset(uri="test://asset2", name="test_asset_2", group="test-group") for dag_id, asset in [("assets-1", asset1), ("assets-2", asset2)]: with dag_maker(dag_id=dag_id, start_date=timezone.utcnow(), session=session): @@ -2441,10 +2453,15 @@ def test_asset_expression(self, session: Session) -> None: dag = DAG( dag_id="test_dag_asset_expression", schedule=AssetAny( - Asset("s3://dag1/output_1.txt", extra={"hi": "bye"}), + Asset(uri="s3://dag1/output_1.txt", extra={"hi": "bye"}, group="test-group"), AssetAll( - Asset("s3://dag2/output_1.txt", extra={"hi": "bye"}), - Asset("s3://dag3/output_3.txt", extra={"hi": "bye"}), + Asset( + uri="s3://dag2/output_1.txt", + name="test_asset_2", + extra={"hi": "bye"}, + group="test-group", + ), + Asset("s3://dag3/output_3.txt", extra={"hi": "bye"}, group="test-group"), ), AssetAlias(name="test_name"), ), @@ -2455,9 +2472,32 @@ def test_asset_expression(self, session: Session) -> None: expression = session.scalars(select(DagModel.asset_expression).filter_by(dag_id=dag.dag_id)).one() assert expression == { "any": [ - "s3://dag1/output_1.txt", - {"all": ["s3://dag2/output_1.txt", "s3://dag3/output_3.txt"]}, - {"alias": "test_name"}, + { + "asset": { + "uri": "s3://dag1/output_1.txt", + "name": "s3://dag1/output_1.txt", + "group": "test-group", + } + }, + { + "all": [ + { + "asset": { + "uri": "s3://dag2/output_1.txt", + "name": "test_asset_2", + "group": "test-group", + } + }, + { + "asset": { + "uri": "s3://dag3/output_3.txt", + "name": "s3://dag3/output_3.txt", + "group": "test-group", + } + }, + ] + }, + {"alias": {"name": "test_name", "group": ""}}, ] } @@ -3016,9 +3056,9 @@ def test__time_restriction(dag_maker, dag_date, tasks_date, restrict): @pytest.mark.need_serialized_dag def test_get_asset_triggered_next_run_info(dag_maker, clear_assets): - asset1 = Asset(uri="ds1") - asset2 = Asset(uri="ds2") - asset3 = Asset(uri="ds3") + asset1 = Asset(uri="test://asset1", name="test_asset1", group="test-group") + asset2 = Asset(uri="test://asset2", group="test-group") + asset3 = Asset(uri="test://asset3", group="test-group") with dag_maker(dag_id="assets-1", schedule=[asset2]): pass dag1 = dag_maker.dag From 678640e254daa092c8c4bc7517c4f695470f6314 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 13 Nov 2024 18:36:12 +0800 Subject: [PATCH 10/10] fix(assets): extend Asset as_expression methods to include name, group fields (also AssetAlias group field) --- airflow/assets/__init__.py | 10 ++++++---- airflow/serialization/serialized_objects.py | 2 +- airflow/timetables/simple.py | 4 +++- .../core_api/routes/ui/test_assets.py | 16 +++++++++++++--- tests/assets/test_asset.py | 12 ++++++------ tests/www/views/test_views_grid.py | 7 ++++++- 6 files changed, 35 insertions(+), 16 deletions(-) diff --git a/airflow/assets/__init__.py b/airflow/assets/__init__.py index f1d36ac12b73..58522fa483af 100644 --- a/airflow/assets/__init__.py +++ b/airflow/assets/__init__.py @@ -343,7 +343,7 @@ def as_expression(self) -> Any: :meta private: """ - return self.uri + return {"asset": {"uri": self.uri, "name": self.name, "group": self.group}} def iter_assets(self) -> Iterator[tuple[str, Asset]]: yield self.uri, self @@ -390,7 +390,8 @@ def __init__(self, *objects: BaseAsset) -> None: raise TypeError("expect asset expressions in condition") self.objects = [ - _AssetAliasCondition(obj.name) if isinstance(obj, AssetAlias) else obj for obj in objects + _AssetAliasCondition(name=obj.name, group=obj.group) if isinstance(obj, AssetAlias) else obj + for obj in objects ] def evaluate(self, statuses: dict[str, bool]) -> bool: @@ -450,8 +451,9 @@ class _AssetAliasCondition(AssetAny): :meta private: """ - def __init__(self, name: str) -> None: + def __init__(self, name: str, group: str) -> None: self.name = name + self.group = group self.objects = expand_alias_to_assets(name) def __repr__(self) -> str: @@ -463,7 +465,7 @@ def as_expression(self) -> Any: :meta private: """ - return {"alias": self.name} + return {"alias": {"name": self.name, "group": self.group}} def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]: yield self.name, AssetAlias(self.name) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index ab782e26f802..1c943af3e373 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1053,7 +1053,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]: ) ) elif isinstance(obj, AssetAlias): - cond = _AssetAliasCondition(obj.name) + cond = _AssetAliasCondition(name=obj.name, group=obj.group) deps.extend(cond.iter_dag_dependencies(source=task.dag_id, target="")) return deps diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py index 3457c52a08aa..f75acea81bb6 100644 --- a/airflow/timetables/simple.py +++ b/airflow/timetables/simple.py @@ -169,7 +169,9 @@ def __init__(self, assets: BaseAsset) -> None: super().__init__() self.asset_condition = assets if isinstance(self.asset_condition, AssetAlias): - self.asset_condition = _AssetAliasCondition(self.asset_condition.name) + self.asset_condition = _AssetAliasCondition( + name=self.asset_condition.name, group=self.asset_condition.group + ) if not next(self.asset_condition.iter_assets(), False): self._summary = AssetTriggeredTimetable.UNRESOLVED_ALIAS_SUMMARY diff --git a/tests/api_fastapi/core_api/routes/ui/test_assets.py b/tests/api_fastapi/core_api/routes/ui/test_assets.py index b5c85b98ba6b..b8f001d25d95 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_assets.py +++ b/tests/api_fastapi/core_api/routes/ui/test_assets.py @@ -36,7 +36,7 @@ def cleanup(): def test_next_run_assets(test_client, dag_maker): - with dag_maker(dag_id="upstream", schedule=[Asset(uri="s3://bucket/key/1")], serialized=True): + with dag_maker(dag_id="upstream", schedule=[Asset(uri="s3://bucket/next-run-asset/1")], serialized=True): EmptyOperator(task_id="task1") dag_maker.create_dagrun() @@ -46,6 +46,16 @@ def test_next_run_assets(test_client, dag_maker): assert response.status_code == 200 assert response.json() == { - "asset_expression": {"all": ["s3://bucket/key/1"]}, - "events": [{"id": 20, "uri": "s3://bucket/key/1", "lastUpdate": None}], + "asset_expression": { + "all": [ + { + "asset": { + "uri": "s3://bucket/next-run-asset/1", + "name": "s3://bucket/next-run-asset/1", + "group": "asset", + } + } + ] + }, + "events": [{"id": 20, "uri": "s3://bucket/next-run-asset/1", "lastUpdate": None}], } diff --git a/tests/assets/test_asset.py b/tests/assets/test_asset.py index a454fd2826bd..91a628f8f149 100644 --- a/tests/assets/test_asset.py +++ b/tests/assets/test_asset.py @@ -597,22 +597,22 @@ def resolved_asset_alias_2(self, session, asset_1): return asset_alias_2 def test_init(self, asset_alias_1, asset_1, resolved_asset_alias_2): - cond = _AssetAliasCondition(name=asset_alias_1.name) + cond = _AssetAliasCondition(name=asset_alias_1.name, group=asset_alias_1.group) assert cond.objects == [] - cond = _AssetAliasCondition(name=resolved_asset_alias_2.name) + cond = _AssetAliasCondition(name=resolved_asset_alias_2.name, group=resolved_asset_alias_2.group) assert cond.objects == [Asset(uri=asset_1.uri)] def test_as_expression(self, asset_alias_1, resolved_asset_alias_2): for assset_alias in (asset_alias_1, resolved_asset_alias_2): - cond = _AssetAliasCondition(assset_alias.name) - assert cond.as_expression() == {"alias": assset_alias.name} + cond = _AssetAliasCondition(name=assset_alias.name, group=assset_alias.group) + assert cond.as_expression() == {"alias": {"name": assset_alias.name, "group": ""}} def test_evalute(self, asset_alias_1, resolved_asset_alias_2, asset_1): - cond = _AssetAliasCondition(asset_alias_1.name) + cond = _AssetAliasCondition(name=asset_alias_1.name, group=asset_alias_1.group) assert cond.evaluate({asset_1.uri: True}) is False - cond = _AssetAliasCondition(resolved_asset_alias_2.name) + cond = _AssetAliasCondition(name=resolved_asset_alias_2.name, group=resolved_asset_alias_2.group) assert cond.evaluate({asset_1.uri: True}) is True diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py index dd7179bc5884..b5e8a1dd9ddd 100644 --- a/tests/www/views/test_views_grid.py +++ b/tests/www/views/test_views_grid.py @@ -508,7 +508,12 @@ def test_next_run_assets(admin_client, dag_maker, session, app, monkeypatch): assert resp.status_code == 200, resp.json assert resp.json == { - "asset_expression": {"all": ["s3://bucket/key/1", "s3://bucket/key/2"]}, + "asset_expression": { + "all": [ + {"asset": {"uri": "s3://bucket/key/1", "name": "name_1", "group": "test-group"}}, + {"asset": {"uri": "s3://bucket/key/2", "name": "name_2", "group": "test-group"}}, + ] + }, "events": [ {"id": asset1_id, "uri": "s3://bucket/key/1", "lastUpdate": "2022-08-02T02:00:00+00:00"}, {"id": asset2_id, "uri": "s3://bucket/key/2", "lastUpdate": None},