Skip to content

Commit

Permalink
debugs
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
  • Loading branch information
wild-endeavor committed Jan 10, 2025
1 parent 5a1e301 commit e7260db
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
1 change: 0 additions & 1 deletion flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,6 @@ def initialize():
import sys
import threading
print(f"{threading.current_thread().name}: name: {__name__}")
print(f"{threading.current_thread().name}: {sys.modules}")
print(f"{threading.current_thread().name}: {sys.path}")
print(f"{threading.current_thread().name}: {sys.thread_info}")

Expand Down
16 changes: 12 additions & 4 deletions flytekit/core/worker_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ def __init__(self, remote: FlyteRemote, ss: SerializationSettings, tag: str, roo
# Import this to ensure context is loaded... python is reloading this module because its in a different thread
FlyteContextManager.current_context()

Check warning on line 175 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L175

Added line #L175 was not covered by tests

self.stopping_condition = threading.Event()

Check warning on line 177 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L177

Added line #L177 was not covered by tests
# Things for actually kicking off and monitoring
self.__runner_thread: threading.Thread = threading.Thread(

Check warning on line 179 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L179

Added line #L179 was not covered by tests
target=self._execute, daemon=True, name="controller-thread"
)
self.__runner_thread.start()
atexit.register(self._close)
atexit.register(self._close, event=self.stopping_condition, runner=self.__runner_thread)

Check warning on line 183 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L182-L183

Added lines #L182 - L183 were not covered by tests

# Executions should be tracked in the following way:
# a) you should be able to list by label, all executions generated by the current eager task,
Expand All @@ -200,8 +201,11 @@ def __init__(self, remote: FlyteRemote, ss: SerializationSettings, tag: str, roo
self.tag = tag
self.root_tag = root_tag

def _close(self) -> None:
self.__runner_thread.join()
@staticmethod
def _close(event: threading.Event, runner: threading.Thread) -> None:
event.set()
printf("----------------------------- in close ------------------------")
runner.join()

Check warning on line 208 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L206-L208

Added lines #L206 - L208 were not covered by tests

def reconcile_one(self, update: Update):
"""
Expand Down Expand Up @@ -286,6 +290,9 @@ def _poll(self) -> None:
FlyteContextManager.current_context()

Check warning on line 290 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L290

Added line #L290 was not covered by tests
# This needs to be a while loop that runs forever,
while True:
if self.stopping_condition.is_set():
print(f"STOPPING RUNNER THREAD!")
break

Check warning on line 295 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L294-L295

Added lines #L294 - L295 were not covered by tests
# Gather all items that need processing
update_items = self._get_update_items()
print(f"DEBUG: poll loop {update_items}")

Check warning on line 298 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L297-L298

Added lines #L297 - L298 were not covered by tests
Expand Down Expand Up @@ -394,13 +401,14 @@ async def add(self, entity: RunnableEntity, input_kwargs: dict[str, typing.Any])
# wait for it to finish one way or another
while True:
developer_logger.debug(f"Watching id {id(i)}")
print(f"In Add loop - Watching id {id(i)}")

Check warning on line 404 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L403-L404

Added lines #L403 - L404 were not covered by tests
if i.status == ItemStatus.SUCCESS:
return i.result

Check warning on line 406 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L406

Added line #L406 was not covered by tests
elif i.status == ItemStatus.FAILED:
assert i.error is not None
raise i.error

Check warning on line 409 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L408-L409

Added lines #L408 - L409 were not covered by tests
else:
await asyncio.sleep(1) # Small delay to avoid busy-waiting
await asyncio.sleep(2) # Small delay to avoid busy-waiting

Check warning on line 411 in flytekit/core/worker_queue.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/worker_queue.py#L411

Added line #L411 was not covered by tests

def render_html(self) -> str:
"""Render the callstack as a deck presentation to be shown after eager workflow execution."""
Expand Down

0 comments on commit e7260db

Please sign in to comment.