Skip to content

Commit

Permalink
allow some concurrent dag scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Nov 12, 2024
1 parent d54fc76 commit db9b19f
Showing 1 changed file with 36 additions and 15 deletions.
51 changes: 36 additions & 15 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import signal
import sys
import time
from asyncio import Semaphore
from collections import Counter, defaultdict, deque
from datetime import timedelta
from functools import lru_cache, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterator

from asgiref.sync import sync_to_async
from sqlalchemy import and_, delete, exists, func, not_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import AsyncSession
Expand Down Expand Up @@ -102,6 +102,8 @@
DR = DagRun
DM = DagModel

dagrun_lock_semaphore = Semaphore(3)


class ConcurrencyMap:
"""
Expand Down Expand Up @@ -142,6 +144,9 @@ def _is_parent_process() -> bool:
return multiprocessing.current_process().name == "MainProcess"


continue_scheduling = None


class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"""
SchedulerJobRunner runs for a specific time interval and schedules jobs that are ready to run.
Expand Down Expand Up @@ -264,18 +269,29 @@ def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
self.log.info("-" * 80)

async def _schedule_a_dag_run(self):
session = create_async_session()

async with session.begin():
dag_run = await DagRun.get_running_dag_run_to_examine(session=session)

# callback_tuples = asyncio.run(self.schedule_dag_run(guard, dag_run, session))

if dag_run is not None:
await self._schedule_dag_run(dag_run, session=session)
else:
print("nothing to schedule!")

try:
await dagrun_lock_semaphore.acquire()
global continue_scheduling
if continue_scheduling is False:
self.log.info("continue_scheduling is False, skipping")
return
session = create_async_session()

async with session.begin():
dag_run = await DagRun.get_running_dag_run_to_examine(session=session)
if not dag_run:
continue_scheduling = False
self.log.info("no more dag runs to schedule")
return
dagrun_lock_semaphore.release()
# callback_tuples = asyncio.run(self.schedule_dag_run(guard, dag_run, session))

if dag_run is not None:
await self._schedule_dag_run(dag_run, session=session)
else:
print("nothing to schedule!")
finally:
dagrun_lock_semaphore.release()
def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -> list[TI]:
"""
Find TIs that are ready for execution based on conditions.
Expand Down Expand Up @@ -1217,6 +1233,8 @@ def _do_scheduling(self, session: Session) -> int:
:return: Number of TIs enqueued in this iteration
"""
# Put a check in place to make sure we don't commit unexpectedly
global continue_scheduling
continue_scheduling = True
with prohibit_commit(session) as guard:
if settings.USE_JOB_SCHEDULE:
self._create_dagruns_for_dags(guard, session)
Expand All @@ -1225,8 +1243,11 @@ def _do_scheduling(self, session: Session) -> int:
guard.commit()

loop = asyncio.get_event_loop()
loop.run_until_complete(self._schedule_a_dag_run())

for num in range(25):
asyncio.ensure_future(self._schedule_a_dag_run(), loop=loop)
pending = asyncio.all_tasks(loop)
loop.run_until_complete(asyncio.gather(*pending))
time.sleep(15)
callback_tuples = []
# Send the callbacks after we commit to ensure the context is up to date when it gets run
# cache saves time during scheduling of many dag_runs for same dag
Expand Down

0 comments on commit db9b19f

Please sign in to comment.