Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[local-worker-mgr] Force using 'spawn' for process creation #307

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions compiler_opt/distributed/local/local_worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class TaskResult:
value: Any


def _get_context():
return multiprocessing.get_context('spawn')


SerializedClass = bytes


Expand All @@ -74,6 +78,7 @@ def _run_impl(pipe: connection.Connection, worker_class: SerializedClass, *args,
# jobs, this effectively limits the number of clang instances spawned.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
obj = cloudpickle.loads(worker_class)(*args, **kwargs)

# Pipes are not thread safe
pipe_lock = threading.Lock()
Expand Down Expand Up @@ -122,15 +127,15 @@ class _Stub:
"""Client stub to a worker hosted by a process."""

def __init__(self):
parent_pipe, child_pipe = multiprocessing.get_context().Pipe()
parent_pipe, child_pipe = _get_context().Pipe()
self._pipe = parent_pipe
self._pipe_lock = threading.Lock()

# this is the process hosting one worker instance.
# we set aside 1 thread to coordinate running jobs, and the main thread
# to handle high priority requests. The expectation is that the user
# achieves concurrency through multiprocessing, not multithreading.
self._process = multiprocessing.get_context().Process(
self._process = _get_context().Process(
target=functools.partial(_run, child_pipe, cloudpickle.dumps(cls), *
args, **kwargs))
# lock for the msgid -> reply future map. The map will be set to None
Expand Down