Skip to content

Commit

Permalink
Use trio for process and scenario launching!
Browse files Browse the repository at this point in the history
After attempting to find an OS portable way to spawn subprocesses using
the stdlib and coming out unsatisfied, I've decided use the new
subprocess launching support in `trio`! This will of course require that
the project moves to python 3.6+ giving us access to a lot of neat
features of modern python including async/await support and adherence to
the structured concurrency principles prominent in `trio`. It turns out
this is a good fit since SIPp already has a built in cancellation
mechanism via the SIGUSR1 signal.

There's a lot of "core" changes to go over in this commit:
- drop the "run protocol" and "runner creation" related hooks since they
  really shouldn't be overridden until there's some need for it and it's
  likely smarter to keep those "machinery" details strictly internal for now
- the run "protocol" has now been relegated to an async function:
  `pysipp.launch.run_all_agents()`
- many routines have been converted to async functions particularly at the
  runner (`pysipp.TrioRunner.run()`, `.get()`) and scenario
  (`pysipp.Scenario.arun()`) levels allowing us to expose both a sync and
  async interface for running subprocesses / agents
- drop all the epoll/select loop stuff as this is entirely delegated to
  `trio.open_process()` and it's underlying machinery and APIs

Resolves #53
  • Loading branch information
goodboy committed Dec 24, 2020
1 parent 0492895 commit 3df7a5f
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 182 deletions.
70 changes: 4 additions & 66 deletions pysipp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@
'''
import sys
from os.path import dirname
from . import launch, report, plugin, netplug, agent
from . import plugin, netplug, agent
from .load import iter_scen_dirs
from .agent import client, server


class SIPpFailure(RuntimeError):
"""SIPp commands failed
"""


__package__ = 'pysipp'
__author__ = 'Tyler Goodlet (tgoodlet@gmail.com)'

Expand Down Expand Up @@ -106,12 +101,11 @@ def scenario(dirpath=None, proxyaddr=None, autolocalsocks=True,
# same as above
scen = plugin.mng.hook.pysipp_conf_scen_protocol(
agents=[uas, uac], confpy=None,
scenkwargs=scenkwargs
)

if proxyaddr:
assert isinstance(
proxyaddr, tuple), 'proxyaddr must be a (addr, port) tuple'
if proxyaddr is not None:
assert isinstance(proxyaddr, tuple), (
'proxyaddr must be a (addr, port) tuple')
scen.clientdefaults.proxyaddr = proxyaddr

return scen
Expand Down Expand Up @@ -196,61 +190,5 @@ def pysipp_conf_scen(agents, scen):
ua.rtp_echo = True


@plugin.hookimpl
def pysipp_new_runner():
"""Provision and assign a default cmd runner
"""
return launch.PopenRunner()


@plugin.hookimpl
def pysipp_run_protocol(scen, runner, block, timeout, raise_exc):
""""Run all rendered commands with the provided runner or the built-in
PopenRunner which runs commands locally.
"""
# use provided runner or default provided by hook
runner = runner or plugin.mng.hook.pysipp_new_runner()
agents = scen.prepare()

def finalize(cmds2procs=None, timeout=180, raise_exc=True):
"""Wait for all remaining agents in the scenario to finish executing
and perform error and logfile reporting.
"""
cmds2procs = cmds2procs or runner.get(timeout=timeout)
agents2procs = list(zip(agents, cmds2procs.values()))
msg = report.err_summary(agents2procs)
if msg:
# report logs and stderr
report.emit_logfiles(agents2procs)
if raise_exc:
# raise RuntimeError on agent failure(s)
# (HINT: to rerun type `scen()` from the debugger)
raise SIPpFailure(msg)

return cmds2procs

try:
# run all agents (raises RuntimeError on timeout)
cmds2procs = runner(
(ua.render() for ua in agents),
block=block, timeout=timeout
)
except launch.TimeoutError: # sucessful timeout
cmds2procs = finalize(timeout=0, raise_exc=False)
if raise_exc:
raise
else:
# async
if not block:
# XXX async run must bundle up results for later processing
scen.finalize = finalize
return finalize

# sync
finalize(cmds2procs, raise_exc=raise_exc)

return runner


# register the default hook set
plugin.mng.register(sys.modules[__name__])
66 changes: 48 additions & 18 deletions pysipp/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import re
import itertools
import tempfile
from functools import partial
from copy import deepcopy
from distutils import spawn
from collections import namedtuple, OrderedDict
from . import command, plugin, utils

import trio

from . import command, plugin, utils, launch, report

log = utils.get_logger()

Expand Down Expand Up @@ -61,20 +65,21 @@ def name(self):
ipcaddr = tuple_property(('ipc_host', 'ipc_port'))
call_load = tuple_property(('rate', 'limit', 'call_count'))

def __call__(self, block=True, timeout=180, runner=None, raise_exc=True,
**kwargs):

def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)

def run(
self,
timeout=180,
**kwargs
):

# create and configure a temp scenario
scen = plugin.mng.hook.pysipp_conf_scen_protocol(
agents=[self], confpy=None, scenkwargs={},
)
# run the standard protocol
# (attach allocted runner for reuse/post-portem)
return plugin.mng.hook.pysipp_run_protocol(
scen=scen, block=block, timeout=timeout,
runner=runner,
raise_exc=raise_exc, **kwargs
)
return scen.run(timeout=timeout, **kwargs)

def is_client(self):
return 'uac' in self.name.lower()
Expand Down Expand Up @@ -255,8 +260,13 @@ class ScenarioType(object):
If called it will invoke the standard run hooks.
"""

def __init__(self, agents, defaults, clientdefaults=None,
serverdefaults=None, confpy=None, enable_screen_file=True):
def __init__(
self, agents, defaults, clientdefaults=None,
serverdefaults=None, confpy=None, enable_screen_file=True
):
# placeholder for process "runner"
self._runner = None

# agents iterable in launch-order
self._agents = agents
ua_attrs = UserAgent.keys()
Expand Down Expand Up @@ -432,10 +442,30 @@ def from_agents(self, agents=None, autolocalsocks=True, **scenkwargs):
return type(self)(
self.prepare(agents), self._defaults, confpy=self.mod)

def __call__(self, agents=None, block=True, timeout=180, runner=None,
raise_exc=True, copy_agents=False, **kwargs):
return plugin.mng.hook.pysipp_run_protocol(
scen=self,
block=block, timeout=timeout, runner=runner,
raise_exc=raise_exc, **kwargs
async def arun(
self,
timeout=180,
runner=None,
):
agents = self.prepare()
runner = runner or launch.TrioRunner()

return await launch.run_all_agents(runner, agents, timeout=timeout)

def run(
self,
timeout=180,
**kwargs
):
"""Run scenario blocking to completion."""
return trio.run(
partial(
self.arun,
timeout=timeout,
**kwargs
)
)

def __call__(self, *args, **kwargs):
# TODO: deprecation warning here
return self.run(*args, **kwargs)
Loading

0 comments on commit 3df7a5f

Please sign in to comment.