Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
funilrys committed Dec 31, 2024
2 parents c2d417a + 6b3574c commit f6d7add
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 2 deletions.
2 changes: 1 addition & 1 deletion PyFunceble/ext/process_manager/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@
limitations under the License.
"""

__version__ = "1.0.5"
__version__ = "1.0.6"
23 changes: 23 additions & 0 deletions PyFunceble/ext/process_manager/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,27 @@ def wrapper(self, *args, **kwargs):

return wrapper

def relink_queues_after(func: Callable[..., Any]) -> Callable[..., Any]:
"""
Decorator which ensures that the input queue of the dependent manager is
the output queue of the current manager.
"""

@functools.wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs) # pylint: disable=not-callable

if self.output_queues and self.dependent_managers:
for index, manager in enumerate(self.dependent_managers):
try:
manager.input_queue = self.output_queues[index]
except IndexError:
manager.input_queue = self.output_queues[-1]

return result

return wrapper

@property
def name(self) -> str:
"""
Expand Down Expand Up @@ -559,6 +580,7 @@ def is_queue_full(self) -> bool:

return self.queue_full

@relink_queues_after
def add_dependent_manager(
self, manager: "ProcessManagerCore"
) -> "ProcessManagerCore":
Expand All @@ -573,6 +595,7 @@ def add_dependent_manager(

return self

@relink_queues_after
def remove_dependent_manager(
self, manager: "ProcessManagerCore"
) -> "ProcessManagerCore":
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def get_version():

return extracted


def get_long_description(): # pragma: no cover
"""
This function return the long description.
Expand Down Expand Up @@ -170,5 +171,4 @@ def get_long_description(): # pragma: no cover
"Programming Language :: Python :: 3",
"License :: OSI Approved",
],
test_suite="setup._test_suite",
)
50 changes: 50 additions & 0 deletions tests/test_process_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,56 @@ def test_add_dependent_manager(process_manager):
assert process_manager.dependent_managers[0].name == "ppm-dependent_manager"


def test_relink_queues(process_manager):
process_manager.output_queues = [MagicMock()]

dependent_manager = ProcessManagerCore(max_workers=2, generate_input_queue=False)
dependent_manager.STD_NAME = "dependent_manager"

assert dependent_manager.input_queue is None

process_manager.add_dependent_manager(dependent_manager)
assert dependent_manager.input_queue == process_manager.output_queues[0]


def test_relink_queues_with_no_output_queues(process_manager):
process_manager.output_queues = []

dependent_manager = ProcessManagerCore(max_workers=2, generate_input_queue=False)
dependent_manager.STD_NAME = "dependent_manager"

assert dependent_manager.input_queue is None

process_manager.add_dependent_manager(dependent_manager)
assert dependent_manager.input_queue is None


def test_relink_queues_with_more_dependencies_than_output_queues(process_manager):
process_manager.output_queues = [MagicMock()]

dependent_manager = ProcessManagerCore(max_workers=2, generate_input_queue=False)
dependent_manager.STD_NAME = "dependent_manager"

assert dependent_manager.input_queue is None

process_manager.add_dependent_manager(dependent_manager)

assert dependent_manager.input_queue == process_manager.output_queues[0]

new_dependent_manager = ProcessManagerCore(
max_workers=2, generate_input_queue=False
)
new_dependent_manager.STD_NAME = "new_dependent_manager"

assert new_dependent_manager.input_queue is None

process_manager.add_dependent_manager(new_dependent_manager)

assert new_dependent_manager.input_queue == process_manager.output_queues[0]

process_manager.add_dependent_manager(new_dependent_manager)


def test_remove_dependent_manager(process_manager):
dependent_manager = ProcessManagerCore(max_workers=2)

Expand Down

0 comments on commit f6d7add

Please sign in to comment.