Skip to content

Commit

Permalink
Make param validation consistent for DAG validation and triggering (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl authored Sep 10, 2023
1 parent 13d2f4a commit 3e34079
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
AirflowSkipException,
DuplicateTaskIdFound,
FailStopDagInvalidTriggerRule,
ParamValidationError,
RemovedInAirflow3Warning,
TaskNotFound,
)
Expand Down Expand Up @@ -3276,20 +3277,20 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed

def validate_schedule_and_params(self):
"""
Validate Param values when the schedule_interval is not None.
Validate Param values when the DAG has schedule defined.
Raise exception if there are any Params in the DAG which neither have a default value nor
have the null in schema['type'] list, but the DAG have a schedule_interval which is not None.
Raise exception if there are any Params which can not be resolved by their schema definition.
"""
if not self.timetable.can_be_scheduled:
return

for v in self.params.values():
# As type can be an array, we would check if `null` is an allowed type or not
if not v.has_value and ("type" not in v.schema or "null" not in v.schema["type"]):
raise AirflowException(
"DAG Schedule must be None, if there are any required params without default values"
)
try:
self.params.validate()
except ParamValidationError as pverr:
raise AirflowException(
"DAG is not allowed to define a Schedule, "
"if there are any required params without default values or default values are not valid."
) from pverr

def iter_invalid_owner_links(self) -> Iterator[tuple[str, str]]:
"""
Expand Down

0 comments on commit 3e34079

Please sign in to comment.