Skip to content

Commit

Permalink
fix(airflow): fix xcom pull in _try_to_adopt_job (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Jul 18, 2024
1 parent 3cfea92 commit f62dd0b
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions spark_on_k8s/airflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,22 @@ def _persist_spark_history_ui_link(self, context: Context):
def _try_to_adopt_job(self, context: Context, spark_app_manager: SparkAppManager) -> bool:
from spark_on_k8s.utils.spark_app_status import SparkAppStatus

xcom_driver_namespace = context["ti"].xcom_pull(key=self._XCOM_DRIVER_POD_NAMESPACE)
xcom_driver_namespace = context["ti"].xcom_pull(
dag_id=context["ti"].dag_id,
task_ids=context["ti"].task_id,
map_indexes=context["ti"].map_index,
key=self._XCOM_DRIVER_POD_NAMESPACE,
include_prior_dates=True,
)
if not xcom_driver_namespace or xcom_driver_namespace != self.namespace:
return False
xcom_driver_pod_name = context["ti"].xcom_pull(key=self._XCOM_DRIVER_POD_NAME)
xcom_driver_pod_name = context["ti"].xcom_pull(
dag_id=context["ti"].dag_id,
task_ids=context["ti"].task_id,
map_indexes=context["ti"].map_index,
key=self._XCOM_DRIVER_POD_NAME,
include_prior_dates=True,
)
if xcom_driver_pod_name:
with contextlib.suppress(Exception):
app_status = spark_app_manager.app_status(
Expand Down

0 comments on commit f62dd0b

Please sign in to comment.