Skip to content

Commit

Permalink
style: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Nov 12, 2024
1 parent 0cf5a60 commit 8b00864
Showing 1 changed file with 3 additions and 6 deletions.
9 changes: 3 additions & 6 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from functools import lru_cache, partial
from itertools import groupby
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator, Sequence
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator

from sqlalchemy import and_, delete, exists, func, not_, select, text, update
from sqlalchemy.exc import OperationalError
Expand Down Expand Up @@ -2141,17 +2141,14 @@ def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]:
incoming_uri_to_name[asset.uri] = asset.name
session.add(AssetActive.for_asset(asset))

def _get_first_item(x: Sequence[Any]) -> Any:
return x[0]

warnings_to_have = {
dag_id: DagWarning(
dag_id=dag_id,
error_type=DagWarningType.ASSET_CONFLICT,
message="\n".join([row[1] for row in group]),
message="\n".join([message for _, message in group]),
)
for dag_id, group in groupby(
sorted(_activate_assets_generate_warnings(), key=_get_first_item), key=_get_first_item
sorted(_activate_assets_generate_warnings()), key=operator.itemgetter(0)
)
}

Expand Down

0 comments on commit 8b00864

Please sign in to comment.