Skip to content

Commit

Permalink
Merge branch 'feature/v0.4.7' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
KelSolaar committed Dec 24, 2024
2 parents 9b62207 + 2e44188 commit 62ae8bc
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 23 deletions.
173 changes: 150 additions & 23 deletions colour/utilities/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from __future__ import annotations

import atexit
import concurrent.futures
import multiprocessing
import os
Expand Down Expand Up @@ -65,7 +66,9 @@
"ExecutionNode",
"ControlFlowNode",
"For",
"ThreadPoolExecutorManager",
"ParallelForThread",
"ProcessPoolExecutorManager",
"ParallelForMultiprocess",
]

Expand Down Expand Up @@ -2140,6 +2143,65 @@ def _task_thread(args: Sequence) -> tuple[int, Any]:
return i, sub_graph.get_output("output")


class ThreadPoolExecutorManager:
"""
Define a singleton class managing our
:class:`concurrent.futures.ThreadPoolExecutor` class instance.
Attributes
----------
- :attr:`~colour.utilities.ThreadPoolExecutorManager.ThreadPoolExecutor`
Methods
-------
- :meth:`~colour.utilities.ThreadPoolExecutorManager.get_executor`
- :meth:`~colour.utilities.ThreadPoolExecutorManager.shutdown_executor`
"""

ThreadPoolExecutor: concurrent.futures.ThreadPoolExecutor | None = None

@staticmethod
def get_executor(
max_workers: int | None = None,
) -> concurrent.futures.ThreadPoolExecutor:
"""
Return the :class:`concurrent.futures.ThreadPoolExecutor` class instance or
create it if not existing.
Parameters
----------
max_workers
Maximum worker count.
Returns
-------
:class:`concurrent.futures.ThreadPoolExecutor`
Notes
-----
The :class:`concurrent.futures.ThreadPoolExecutor` class instance is
automatically shutdown on process exit.
"""

if ThreadPoolExecutorManager.ThreadPoolExecutor is None:
ThreadPoolExecutorManager.ThreadPoolExecutor = (
concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
)

return ThreadPoolExecutorManager.ThreadPoolExecutor

@atexit.register
@staticmethod
def shutdown_executor() -> None:
"""
Shutdown the :class:`concurrent.futures.ThreadPoolExecutor` class instance.
"""

if ThreadPoolExecutorManager.ThreadPoolExecutor is not None:
ThreadPoolExecutorManager.ThreadPoolExecutor.shutdown(wait=True)
ThreadPoolExecutorManager.ThreadPoolExecutor = None


class ParallelForThread(ControlFlowNode):
"""
Define an advanced ``for`` loop node distributing the work across multiple
Expand Down Expand Up @@ -2198,18 +2260,20 @@ def process(self) -> None:
self.log(f'Processing "{node}" node...')

results = {}
with concurrent.futures.ThreadPoolExecutor(
thread_pool_executor = ThreadPoolExecutorManager.get_executor(
max_workers=self.get_input("workers")
) as executor:
futures = [
executor.submit(self.get_input("task"), (i, element, node, self))
for i, element in enumerate(self.get_input("array"))
]
)
futures = [
thread_pool_executor.submit(
self.get_input("task"), (i, element, node, self)
)
for i, element in enumerate(self.get_input("array"))
]

for future in concurrent.futures.as_completed(futures):
index, element = future.result()
self.log(f'Processed "{element}" element with index "{index}".')
results[index] = element
for future in concurrent.futures.as_completed(futures):
index, element = future.result()
self.log(f'Processed "{element}" element with index "{index}".')
results[index] = element

results = dict(sorted(results.items()))
self.set_output("results", list(results.values()))
Expand Down Expand Up @@ -2253,6 +2317,68 @@ def _task_multiprocess(args: Sequence) -> tuple[int, Any]:
return i, sub_graph.get_output("output")


class ProcessPoolExecutorManager:
"""
Define a singleton class managing our
:class:`concurrent.futures.ProcessPoolExecutor` class instance.
Attributes
----------
- :attr:`~colour.utilities.ProcessPoolExecutorManager.ProcessPoolExecutor`
Methods
-------
- :meth:`~colour.utilities.ProcessPoolExecutorManager.get_executor`
- :meth:`~colour.utilities.ProcessPoolExecutorManager.shutdown_executor`
"""

ProcessPoolExecutor: concurrent.futures.ProcessPoolExecutor | None = None

@staticmethod
def get_executor(
max_workers: int | None = None,
) -> concurrent.futures.ProcessPoolExecutor:
"""
Return the :class:`concurrent.futures.ProcessPoolExecutor` class instance or
create it if not existing.
Parameters
----------
max_workers
Maximum worker count.
Returns
-------
:class:`concurrent.futures.ProcessPoolExecutor`
Notes
-----
The :class:`concurrent.futures.ProcessPoolExecutor` class instance is
automatically shutdown on process exit.
"""

if ProcessPoolExecutorManager.ProcessPoolExecutor is None:
context = multiprocessing.get_context("spawn")
ProcessPoolExecutorManager.ProcessPoolExecutor = (
concurrent.futures.ProcessPoolExecutor(
mp_context=context, max_workers=max_workers
)
)

return ProcessPoolExecutorManager.ProcessPoolExecutor

@atexit.register
@staticmethod
def shutdown_executor() -> None:
"""
Shutdown the :class:`concurrent.futures.ProcessPoolExecutor` class instance.
"""

if ProcessPoolExecutorManager.ProcessPoolExecutor is not None:
ProcessPoolExecutorManager.ProcessPoolExecutor.shutdown(wait=True)
ProcessPoolExecutorManager.ProcessPoolExecutor = None


class ParallelForMultiprocess(ControlFlowNode):
"""
Define an advanced ``for`` loop node distributing the work across multiple
Expand Down Expand Up @@ -2305,19 +2431,20 @@ def process(self) -> None:
self.log(f'Processing "{node}" node...')

results = {}
context = multiprocessing.get_context("spawn")
with concurrent.futures.ProcessPoolExecutor(
mp_context=context, max_workers=self.get_input("processes")
) as executor:
futures = [
executor.submit(self.get_input("task"), (i, element, node, self))
for i, element in enumerate(self.get_input("array"))
]

for future in concurrent.futures.as_completed(futures):
index, element = future.result()
self.log(f'Processed "{element}" element with index "{index}".')
results[index] = element
process_pool_executor = ProcessPoolExecutorManager.get_executor(
max_workers=self.get_input("processes")
)
futures = [
process_pool_executor.submit(
self.get_input("task"), (i, element, node, self)
)
for i, element in enumerate(self.get_input("array"))
]

for future in concurrent.futures.as_completed(futures):
index, element = future.result()
self.log(f'Processed "{element}" element with index "{index}".')
results[index] = element

results = dict(sorted(results.items()))
self.set_output("results", list(results.values()))
Expand Down
34 changes: 34 additions & 0 deletions colour/utilities/tests/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
TreeNode,
is_pydot_installed,
)
from colour.utilities.network import (
ProcessPoolExecutorManager,
ThreadPoolExecutorManager,
)

__author__ = "Colour Developers"
__copyright__ = "Copyright 2013 Colour Developers"
Expand All @@ -35,7 +39,9 @@
"TestPortNode",
"TestPortGraph",
"TestFor",
"TestThreadPoolExecutorManager",
"TestParallelForThread",
"TestProcessPoolExecutorManager",
"TestParallelForMultiProcess",
]

Expand Down Expand Up @@ -1084,6 +1090,20 @@ def process(self, **kwargs: Any) -> None:
super().process(**kwargs)


class TestThreadPoolExecutorManager:
"""
Define :class:`colour.utilities.network.ThreadPoolExecutorManager` class unit tests
methods.
"""

def test_ThreadPoolExecutorManager(self) -> None:
"""Test :class:`colour.utilities.network.ThreadPoolExecutorManager` class."""

executor = ThreadPoolExecutorManager.get_executor()

assert executor is not None


class TestParallelForThread:
"""
Define :class:`colour.utilities.network.ParallelForThread` class unit tests
Expand All @@ -1108,6 +1128,20 @@ def test_ParallelForThread(self) -> None:
assert sum_array.get_output("summation") == 140


class TestProcessPoolExecutorManager:
"""
Define :class:`colour.utilities.network.ProcessPoolExecutorManager` class unit tests
methods.
"""

def test_ProcessPoolExecutorManager(self) -> None:
"""Test :class:`colour.utilities.network.ProcessPoolExecutorManager` class."""

executor = ProcessPoolExecutorManager.get_executor()

assert executor is not None


class TestParallelForMultiProcess:
"""
Define :class:`colour.utilities.network.ParallelForMultiProcess` class unit
Expand Down

0 comments on commit 62ae8bc

Please sign in to comment.