Skip to content

Commit

Permalink
Improve concurrency
Browse files Browse the repository at this point in the history
This change rewrites the way broker actions handle concurrency.
Previously, this was offloaded in a manner that was wasteful in the way
it handled pre-action setup.
Now, we don't go concurrent until we're ready to perform the action we
want.
  • Loading branch information
JacobCallahan committed Sep 20, 2024
1 parent 9358211 commit 15e5851
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 86 deletions.
95 changes: 28 additions & 67 deletions broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
It defines the `Host` class, which represents a cloud resource, and the `Broker` class,
which provides methods for managing hosts.
The `Broker` class is decorated with `mp_decorator`, which enables multiprocessing for
certain methods. The `Host` class is defined in the `broker.hosts` module,
The `Host` class is defined in the `broker.hosts` module,
and the provider classes are defined in the `broker.providers` module.
Exceptions are defined in the `broker.exceptions` module,
Expand Down Expand Up @@ -40,44 +39,6 @@ def _try_teardown(host_obj):
return exceptions.HostError(host_obj, f"error during teardown:\n{err}")


class mp_decorator:
"""Decorator wrapping Broker methods to enable multiprocessing.
The decorated method is expected to return an itearable.
"""

MAX_WORKERS = None
""" If set to integer, the count of workers will be limited to that amount.
If set to None, the max workers count of the EXECUTOR will match the count of items."""

EXECUTOR = ThreadPoolExecutor

def __init__(self, func=None):
self.func = func

def __get__(self, instance, owner):
"""Support instance methods."""
if not instance:
return self.func

def mp_split(*args, **kwargs):
count = instance._kwargs.get("_count", None)
if count is None:
return self.func(instance, *args, **kwargs)

results = []
max_workers_count = self.MAX_WORKERS or count
with self.EXECUTOR(max_workers=max_workers_count) as workers:
completed_futures = as_completed(
workers.submit(self.func, instance, *args, **kwargs) for _ in range(count)
)
for f in completed_futures:
results.extend(f.result())
return results

return mp_split


class Broker:
"""Main Broker class to be used as the primary interface for the Broker API."""

Expand Down Expand Up @@ -112,6 +73,7 @@ def __init__(self, **kwargs):

def _act(self, provider, method, checkout=False):
"""Perform a general action against a provider's method."""
count = self._kwargs.get("_count", 1)
logger.debug(f"Resolving action {method} on provider {provider}.")
provider_inst = provider(**self._kwargs)
helpers.emit(
Expand All @@ -123,54 +85,53 @@ def _act(self, provider, method, checkout=False):
)
method_obj = getattr(provider_inst, method)
logger.debug(f"On {provider_inst=} executing {method_obj=} with params {self._kwargs=}.")
result = method_obj(**self._kwargs)
logger.debug(f"Action {result=}")
# Overkill for a single action, cleaner than splitting the logic
with ThreadPoolExecutor() as workers:
tasks = [workers.submit(method_obj, **self._kwargs) for _ in range(count)]
result = []
for task in as_completed(tasks):
try:
result.append(task.result())
except exceptions.ProviderError as err:
result.append(err)
logger.debug(f"Result:\n{result}")
if result and checkout:
return provider_inst.construct_host(
provider_params=result, host_classes=self.host_classes, **self._kwargs
)
return [
provider_inst.construct_host(
provider_params=res, host_classes=self.host_classes, **self._kwargs
)
for res in result
if not isinstance(res, exceptions.ProviderError)
]
else:
return result
return result if len(result) > 1 else result[0]

def _update_provider_actions(self, kwargs):
if not self._provider_actions:
for key, action in PROVIDER_ACTIONS.items():
if key in kwargs:
self._provider_actions[key] = action

@mp_decorator
def _checkout(self):
"""Checkout one or more VMs.
def checkout(self):
"""Checkout one or more hosts.
:return: List of Host objects
:return: Host obj or list of Host objects
"""
hosts = []
logger.debug(f"Doing _checkout(): {self._provider_actions=}")
logger.debug(f"Starting checkout(s): {self._provider_actions=}")
if not self._provider_actions:
raise self.BrokerError("Could not determine an appropriate provider")
for action in self._provider_actions:
provider, method = PROVIDER_ACTIONS[action]
logger.info(f"Using provider {provider.__name__} to checkout")
try:
host = self._act(provider, method, checkout=True)
except exceptions.ProviderError as err:
host = err
hosts.append(host)
if host and not isinstance(host, exceptions.ProviderError):
logger.info(f"{host.__class__.__name__}: {host.hostname}")
return hosts

def checkout(self):
"""Checkout one or more VMs.
:return: Host obj or list of Host objects
"""
hosts = self._checkout()
# perform the checkout, concurrently if _count is set
hosts = self._act(provider, method, checkout=True)
err = None
for host in hosts[:]:
if isinstance(host, exceptions.ProviderError):
err = host
hosts.remove(host)
else:
logger.info(f"{host.__class__.__name__}: {host.hostname}")
helpers.emit(hosts=[host.to_dict() for host in hosts])
self._hosts.extend(hosts)
helpers.update_inventory([host.to_dict() for host in hosts])
Expand Down
19 changes: 0 additions & 19 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,3 @@ def test_multi_manager():

class SomeException(Exception):
pass


class MyBroker:
@broker.mp_decorator
def workload(self):
return []

@broker.mp_decorator
def failing_workload(self):
raise SomeException()


def test_mp_decorator():
tested_broker = MyBroker()
tested_broker._kwargs = dict(_count=2)

tested_broker.workload()
with pytest.raises(SomeException):
tested_broker.failing_workload()

0 comments on commit 15e5851

Please sign in to comment.