
Merge pull request #272 from BCG-Gamma/dev/2.0.0dev0
BUILD: release version 2.0.0dev0
j-ittner authored Sep 10, 2021
2 parents d62f7ea + 74ded60 commit 2f0af64
Showing 10 changed files with 216 additions and 79 deletions.
43 changes: 38 additions & 5 deletions RELEASE_NOTES.rst
@@ -1,9 +1,35 @@
Release Notes
=============

*pytools* 2.0
-------------

2.0.0
~~~~~

- API: revised job/queue API in :mod:`pytools.parallelization`
- method :meth:`.JobRunner.run_jobs` now expects a single iterable of :class:`.Job`
objects instead of individual jobs as positional arguments
- method :meth:`.JobRunner.run_queues` now expects a single iterable of
:class:`.JobQueue` objects instead of individual queues as positional arguments
- method :meth:`.JobRunner.run_queues` returns a list of results instead of an
iterator
- methods :meth:`.JobRunner.run_queue` and :meth:`.JobRunner.run_queues` are now
thread-safe
- rename method `collate` of class :class:`.JobQueue` to :meth:`.JobQueue.aggregate`
- :class:`.SimpleQueue` is now an abstract class, expecting subclasses to implement
method :meth:`.SimpleQueue.aggregate`
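
A minimal usage sketch of the revised API, under the assumption that :class:`.Job`
subclasses implement ``run()`` and that :class:`.JobRunner` accepts the usual
:class:`.ParallelizableMixin` keyword arguments such as ``n_jobs`` (the ``SquareJob``
and ``SumQueue`` classes below are hypothetical)::

    from pytools.parallelization import Job, JobRunner, SimpleQueue

    class SquareJob(Job):
        # hypothetical job: run() computes and returns this job's result
        def __init__(self, x: int) -> None:
            self.x = x

        def run(self) -> int:
            return self.x * self.x

    class SumQueue(SimpleQueue):
        # SimpleQueue is now abstract: subclasses implement aggregate()
        def aggregate(self, job_results):
            return sum(job_results)

    runner = JobRunner(n_jobs=2)

    # run_jobs now takes a single iterable of jobs and returns a list of results
    squares = runner.run_jobs([SquareJob(x) for x in range(5)])

    # run_queues now takes a single iterable of queues and returns a list with
    # one aggregated result per queue
    [total] = runner.run_queues([SumQueue(jobs=[SquareJob(x) for x in range(3)])])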


*pytools* 1.2
-------------

1.2.3
~~~~~

This is a maintenance release to catch up with *pytools* 1.1.6.


1.2.2
~~~~~

@@ -32,12 +58,19 @@ This is a maintenance release to catch up with *pytools* 1.1.4.
*pytools* 1.1
-------------

1.1.6
~~~~~

- FIX: ensure correct weight labels when rendering dendrograms as plain text using the
:class:`.DendrogramReportStyle`


1.1.5
~~~~~

- FIX: fixed a rare case where :meth:`~.Expression.eq_` returned `False` for two
equivalent expressions if one of them included an :class:`~.ExpressionAlias`
- FIX: accept any type of numerical values as leaf weights of :class:`~.LinkageTree`
- FIX: fixed a rare case where :meth:`.Expression.eq_` returned `False` for two
equivalent expressions if one of them included an :class:`.ExpressionAlias`
- FIX: accept any type of numerical values as leaf weights of :class:`.LinkageTree`


1.1.4
@@ -50,7 +83,7 @@ This is a maintenance release to catch up with *pytools* 1.1.4.
~~~~~

- FIX: comparing two :class:`.InfixExpression` objects using method
:meth:`~.Expression.eq_` would erroneously yield ``True`` if both expressions
:meth:`.Expression.eq_` would erroneously yield ``True`` if both expressions
had the same operator but a different number of operands, and the operands of the
shorter expression were equal to the operands at the start of the longer expression

@@ -90,7 +123,7 @@ This is a maintenance release to catch up with *pytools* 1.1.4.
1.0.6
~~~~~

- FIX: back-port 1.1 bugfix for :meth:`~.Expression.eq_`
- FIX: back-port 1.1 bugfix for :meth:`.Expression.eq_`


1.0.5
7 changes: 4 additions & 3 deletions azure-pipelines.yml
@@ -1,9 +1,9 @@
trigger:
- 1.2.x
- 2.0.x
- release/*

pr:
- 1.2.x
- 2.0.x
- release/*

# set the build name
@@ -23,7 +23,7 @@ resources:
type: github
endpoint: BCG-Gamma
name: BCG-Gamma/pytools
ref: 1.2.x
ref: 2.0.x

variables:
${{ if not(startsWith(variables['Build.SourceBranch'], 'refs/pull/')) }}:
@@ -468,6 +468,7 @@ stages:
echo "Current version: $version"
echo "Detecting pre-release ('rc' in version)"
prerelease=False
[[ $version == *dev* ]] && prerelease=True && echo "Development release identified"
[[ $version == *rc* ]] && prerelease=True && echo "Pre-release identified"
echo "##vso[task.setvariable variable=current_version]$version"
echo "##vso[task.setvariable variable=is_prerelease]$prerelease"
2 changes: 1 addition & 1 deletion src/pytools/__init__.py
@@ -1,4 +1,4 @@
"""
A collection of Python extensions and tools used in BCG GAMMA's open-source libraries.
"""
__version__ = "1.2.2"
__version__ = "2.0.0dev0"
134 changes: 72 additions & 62 deletions src/pytools/parallelization/_parallelization.py
@@ -5,12 +5,12 @@
import logging
from abc import ABCMeta, abstractmethod
from functools import wraps
from multiprocessing import Lock
from typing import (
Any,
Callable,
Generic,
Iterable,
Iterator,
List,
Optional,
Sequence,
@@ -32,12 +32,12 @@
#

__all__ = [
"ParallelizableMixin",
"Job",
"JobQueue",
"JobRunner",
"SimpleQueue",
"NestedQueue",
"ParallelizableMixin",
"SimpleQueue",
]

#
@@ -161,6 +161,13 @@ class JobQueue(Generic[T_Job_Result, T_Queue_Result], metaclass=ABCMeta):
Supports :meth:`.len` to determine the number of jobs in this queue.
"""

#: The lock used by class :class:`.JobRunner` to prevent parallel executions of the
#: same queue
lock: Lock

def __init__(self) -> None:
self.lock = Lock()

@abstractmethod
def jobs(self) -> Iterable[Job[T_Job_Result]]:
"""
@@ -179,14 +186,14 @@ def on_run(self) -> None:
"""

@abstractmethod
def collate(self, job_results: List[T_Job_Result]) -> T_Queue_Result:
def aggregate(self, job_results: List[T_Job_Result]) -> T_Queue_Result:
"""
Called by :meth:`.JobRunner.run` to collate the results of all jobs once they
Called by :meth:`.JobRunner.run` to aggregate the results of all jobs once they
have all been run.
:param job_results: list of job results, ordered corresponding to the sequence
of jobs generated by method :meth:`.jobs`
:return: the collated result of running the queue
:return: the aggregated result of running the queue
"""
pass

@@ -197,7 +204,7 @@ def __len__(self) -> int:

class JobRunner(ParallelizableMixin):
"""
Runs job queues in parallel and collates results.
Runs job queues in parallel and aggregates results.
"""

@classmethod
@@ -220,65 +227,73 @@ def from_parallelizable(
verbose=parallelizable.verbose,
)

def run_jobs(self, *jobs: Job[T_Job_Result]) -> List[T_Job_Result]:
def run_jobs(self, jobs: Iterable[Job[T_Job_Result]]) -> List[T_Job_Result]:
"""
Run all given jobs in parallel.
:param jobs: the jobs to run in parallel
:return: the results of all jobs
"""
simple_queue: JobQueue[T_Job_Result, List[T_Job_Result]] = SimpleQueue(
jobs=jobs
)
return self.run_queue(simple_queue)
with self._parallel() as parallel:
return parallel(joblib.delayed(lambda job: job.run())(job) for job in jobs)

def run_queue(self, queue: JobQueue[Any, T_Queue_Result]) -> T_Queue_Result:
"""
Run all jobs in the given queue, in parallel.
:param queue: the queue to run
:return: the result of all jobs, collated using method :meth:`.JobQueue.collate`
:raise AssertionError: the number of results does not match the number of jobs
in the queue
:return: the result of all jobs, aggregated using method
:meth:`.JobQueue.aggregate`
"""

queue.on_run()
with queue.lock:

with self._parallel() as parallel:
results: List[T_Job_Result] = parallel(
joblib.delayed(lambda job: job.run())(job) for job in queue.jobs()
)
# notify the queue that we're about to run it
queue.on_run()

if len(results) != len(queue):
raise AssertionError(
f"Number of results ({len(results)}) does not match length of "
f"queue ({len(queue)}): check method {type(queue).__name__}.__len__()"
)
results = self.run_jobs(queue.jobs())

if len(results) != len(queue):
raise AssertionError(
f"Number of results ({len(results)}) does not match length of "
f"queue ({len(queue)}): check method {type(queue).__name__}.__len__"
)

return queue.collate(job_results=results)
return queue.aggregate(job_results=results)

def run_queues(
self, *queues: JobQueue[Any, T_Queue_Result]
) -> Iterator[T_Queue_Result]:
self, queues: Iterable[JobQueue[Any, T_Queue_Result]]
) -> List[T_Queue_Result]:
"""
Run all jobs in the given queues, in parallel.
:param queues: the queues to run
:return: the result of all jobs, collated per queue using method
:meth:`.JobQueue.collate`
:raise AssertionError: the number of results does not match the total number of
jobs in the queues
:param queues: the queues to run in parallel
:return: the result of all jobs in all queues, aggregated per queue using method
:meth:`.JobQueue.aggregate`
"""

for queue in queues:
queue.on_run()
queues: Sequence[JobQueue[T_Queue_Result]] = to_tuple(
queues, element_type=JobQueue, arg_name="queues"
)

with self._parallel() as parallel:
results: List[T_Job_Result] = parallel(
joblib.delayed(lambda job: job.run())(job)
for queue in queues
for job in queue.jobs()
)
try:
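# acquire the lock of every queue up front so that no other runner can
# execute any of these queues at the same time (see attribute JobQueue.lock);
# the locks are released in the finally block below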
for queue in queues:
queue.lock.acquire()

# notify the queues that we're about to run them
for queue in queues:
queue.on_run()

with self._parallel() as parallel:
results: List[T_Job_Result] = parallel(
joblib.delayed(lambda job: job.run())(job)
for queue in queues
for job in queue.jobs()
)

finally:
for queue in queues:
queue.lock.release()

queues_len = sum(len(queue) for queue in queues)
if len(results) != queues_len:
@@ -287,11 +302,12 @@ def run_queues(
f"queues ({queues_len}): check method __len__() of the queue class(es)"
)

first_job = 0
for queue in queues:
last_job = first_job + len(queue)
yield queue.collate(results[first_job:last_job])
first_job = last_job
# split the results into a list for each queue
queue_ends = list(itertools.accumulate(len(queue) for queue in queues))
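# e.g. queues of lengths 2, 3, and 1 give queue_ends == [2, 5, 6]; zipping
# [0, 2, 5] with [2, 5, 6] yields the per-queue slice bounds (0, 2), (2, 5), (5, 6)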
return [
queue.aggregate(results[first_job:last_job])
for queue, first_job, last_job in zip(queues, [0, *queue_ends], queue_ends)
]

def _parallel(self) -> joblib.Parallel:
# Generate a :class:`joblib.Parallel` instance using the parallelization
@@ -300,9 +316,13 @@ def _parallel(self) -> joblib.Parallel:


@inheritdoc(match="""[see superclass]""")
class SimpleQueue(JobQueue[T_Job_Result, List[T_Job_Result]], Generic[T_Job_Result]):
class SimpleQueue(
JobQueue[T_Job_Result, T_Queue_Result],
Generic[T_Job_Result, T_Queue_Result],
metaclass=ABCMeta,
):
"""
A simple queue, running a given list of jobs and returning their results as a list.
A simple queue, running a given list of jobs.
"""

#: The jobs run by this queue.
@@ -313,28 +333,18 @@ def __init__(self, jobs: Iterable[Job[T_Job_Result]]) -> None:
:param jobs: jobs to be run by this queue in the given order
"""
super().__init__()
self._jobs = to_tuple(jobs)
self._jobs = to_tuple(jobs, element_type=Job, arg_name="jobs")

def jobs(self) -> Iterable[Job[T_Job_Result]]:
"""[see superclass]"""
return self._jobs

def collate(self, job_results: List[T_Job_Result]) -> T_Queue_Result:
"""
Return the list of job results as-is, without collating them any further.
:param job_results: list of job results, ordered corresponding to the sequence
of jobs generated by method :meth:`.jobs`
:return: the identical list of job results
"""
return job_results

def __len__(self) -> int:
return len(self._jobs)


@inheritdoc(match="""[see superclass]""")
class NestedQueue(JobQueue[T_Job_Result, List[T_Job_Result]]):
class NestedQueue(JobQueue[T_Job_Result, List[T_Job_Result]], Generic[T_Job_Result]):
"""
Runs all jobs in a given list of compatible queues and returns their results as a
flat list.
@@ -356,9 +366,9 @@ def jobs(self) -> Iterable[Job[T_Job_Result]]:
"""[see superclass]"""
return itertools.chain.from_iterable(queue.jobs() for queue in self.queues)

def collate(self, job_results: List[T_Job_Result]) -> T_Queue_Result:
def aggregate(self, job_results: List[T_Job_Result]) -> List[T_Job_Result]:
"""
Return the list of job results as-is, without collating them any further.
Return the list of job results as-is, without aggregating them any further.
:param job_results: list of job results, ordered corresponding to the sequence
of jobs generated by method :meth:`.jobs`
2 changes: 1 addition & 1 deletion src/pytools/viz/_viz.py
@@ -5,7 +5,7 @@
"""
import logging
from abc import ABCMeta, abstractmethod
from threading import Lock
from multiprocessing import Lock
from typing import Any, Dict, Generic, Iterable, Optional, Type, TypeVar, Union, cast

from ..api import AllTracker, inheritdoc