Skip to content

Commit

Permalink
asyncio loop profiling
Browse files Browse the repository at this point in the history
  • Loading branch information
adal-chiriliuc-reef committed Nov 4, 2024
1 parent dbad222 commit 4142a40
Showing 1 changed file with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@
_GIVE_AVERAGE_JOB_SEND_TIME_BONUS = False
_SEND_MACHINE_SPECS = False

# asyncio event loop profiling sleep interval
_LOOP_PROFILING_INTERVAL = 0.1

# always-on executor classes have spin_up_time=0, but realistically
# we need a bit more for all the back-and-forth messaging, especially
# when we talk with a lot of executors
Expand Down Expand Up @@ -442,6 +445,9 @@ class BatchContext:
stage_start_time: dict[str, datetime]
average_job_send_time: timedelta | None = None

loop_profiling_task: asyncio.Task[None] | None = None
loop_profiling_timings: list[float] | None = None

# for tests
_loop: asyncio.AbstractEventLoop | None = None

Expand Down Expand Up @@ -558,6 +564,20 @@ def emit_telemetry_event(self) -> SystemEvent | None:
executors = None
manifests[miner_hotkey] = executors

if self.loop_profiling_timings and len(self.loop_profiling_timings) >= 2:
loop_profiling = dict(
interval=_LOOP_PROFILING_INTERVAL,
count=len(self.loop_profiling_timings),
min=min(self.loop_profiling_timings),
max=max(self.loop_profiling_timings),
mean=statistics.mean(self.loop_profiling_timings),
median=statistics.median(self.loop_profiling_timings),
stddev=statistics.stdev(self.loop_profiling_timings),
variance=statistics.variance(self.loop_profiling_timings),
)
else:
loop_profiling = None

data = dict(
validator_hotkey=self.own_keypair.ss58_address,
stage_start_time={
Expand All @@ -566,6 +586,7 @@ def emit_telemetry_event(self) -> SystemEvent | None:
average_job_send_time=_timedelta_dump(self.average_job_send_time),
counts=counts,
manifests=manifests,
loop_profiling=loop_profiling,
)
return self.system_event(
type=SystemEvent.EventType.VALIDATOR_TELEMETRY,
Expand Down Expand Up @@ -673,6 +694,22 @@ def _init_context(
return ctx


async def _loop_profiling(ctx: BatchContext) -> None:
ctx.loop_profiling_timings = []

while True:
try:
sleep_before_ns = time.monotonic_ns()
await asyncio.sleep(_LOOP_PROFILING_INTERVAL)
sleep_after_ns = time.monotonic_ns()

sleep_duration_sec = (sleep_after_ns - sleep_before_ns) / 1_000_000_000
ctx.loop_profiling_timings.append(sleep_duration_sec)

except asyncio.CancelledError:
break


def _get_max_spin_up_time(ctx: BatchContext) -> int:
max_spin_up_time = _MIN_SPIN_UP_TIME
for executors in ctx.executors.values():
Expand Down Expand Up @@ -1633,6 +1670,8 @@ async def execute_synthetic_batch_run(
await ctx.checkpoint_system_event("BATCH_BEGIN", dt=start_time)

try:
ctx.loop_profiling_task = asyncio.create_task(_loop_profiling(ctx), name="_loop_profiling")

await ctx.checkpoint_system_event("_db_get_previous_online_executor_count")
await _db_get_previous_online_executor_count(ctx)

Expand Down Expand Up @@ -1690,6 +1729,14 @@ async def execute_synthetic_batch_run(
else:
logger.warning("No executors available")

if ctx.loop_profiling_task is not None:
ctx.loop_profiling_task.cancel()
try:
await ctx.loop_profiling_task
except asyncio.CancelledError:
pass
ctx.loop_profiling_task = None

except (Exception, asyncio.CancelledError) as exc:
logger.error("Synthetic jobs batch failure: %r", exc)
ctx.system_event(
Expand Down

0 comments on commit 4142a40

Please sign in to comment.