From 4dcf16ee5072801b4fc4bc33887344503e8b6da3 Mon Sep 17 00:00:00 2001
From: Rudolph Pienaar
Date: Thu, 5 Sep 2024 18:10:49 -0400
Subject: [PATCH] Update to new chrisclient

---
 dylld.py         | 400 +++++++++++++++++++++--------------------------
 requirements.txt |   2 +-
 2 files changed, 182 insertions(+), 220 deletions(-)

diff --git a/dylld.py b/dylld.py
index 33c229a..a24b1b5 100644
--- a/dylld.py
+++ b/dylld.py
@@ -1,35 +1,36 @@
 #!/usr/bin/env python
-from collections.abc import Iterator
-from pathlib import Path
-from argparse import ArgumentParser, Namespace, ArgumentDefaultsHelpFormatter
+from collections.abc import Iterator
+from pathlib import Path
+from argparse import ArgumentParser, Namespace, ArgumentDefaultsHelpFormatter
 
-from chris_plugin import chris_plugin, PathMapper
+from chris_plugin import chris_plugin, PathMapper
 
-from pathlib import Path
+from pathlib import Path
 
-from io import TextIOWrapper
-import os, sys
-os.environ['XDG_CONFIG_HOME'] = '/tmp'
-import pudb
-from pudb.remote import set_trace
+from io import TextIOWrapper
+import os, sys
 
-from loguru import logger
-from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
-from threading import current_thread, get_native_id
+os.environ["XDG_CONFIG_HOME"] = "/tmp"
+import pudb
+from pudb.remote import set_trace
 
-from typing import Callable, Any
+from loguru import logger
+from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
+from threading import current_thread, get_native_id
 
-from datetime import datetime, timezone
-import json
+from typing import Callable, Any
 
-from state import data
-from logic import behavior
-from control import action
-from control.filter import PathFilter
-from pftag import pftag
-from pflog import pflog
+from datetime import datetime, timezone
+import json
 
-LOG = logger.debug
+from state import data
+from logic import behavior
+from control import action
+from control.filter import PathFilter
+from pftag import pftag
+from pflog import pflog
+
+LOG = logger.debug
 
 logger_format = (
     "{time:YYYY-MM-DD HH:mm:ss} │ "
@@ -42,11 +43,11 @@
 logger.remove()
 logger.add(sys.stderr, format=logger_format)
 
-pluginInputDir:Path
-pluginOutputDir:Path
-ld_forestResult:list = []
+pluginInputDir: Path
+pluginOutputDir: Path
+ld_forestResult: list = []
 
-__version__ = '4.4.38'
+__version__ = "4.4.40"
 
 DISPLAY_TITLE = r"""
 _ _ _ _ _
@@ -60,154 +61,126 @@
 """
 
 
-parser: ArgumentParser = ArgumentParser(
-    description = '''
+parser: ArgumentParser = ArgumentParser(
+    description="""
 A ChRIS plugin that dynamically builds a workflow to compute length
 discrepencies from extremity X-Rays
-''',
-    formatter_class=ArgumentDefaultsHelpFormatter)
-
+""",
+    formatter_class=ArgumentDefaultsHelpFormatter,
+)
 
-parser.add_argument('-V', '--version', action='version',
-                    version=f'%(prog)s {__version__}')
 parser.add_argument(
-    '--pattern',
-    default = '**/*dcm',
-    help = '''
-    pattern for file names to include (you should quote this!)
-    (this flag triggers the PathMapper on the inputdir).'''
-)
-parser.add_argument(
-    '--pluginInstanceID',
-    default = '',
-    help = 'plugin instance ID from which to start analysis'
-)
-parser.add_argument(
-    '--CUBEurl',
-    default = 'http://localhost:8000/api/v1/',
-    help = 'CUBE URL'
-)
-parser.add_argument(
-    '--CUBEuser',
-    default = 'chris',
-    help = 'CUBE/ChRIS username'
-)
-parser.add_argument(
-    '--CUBEpassword',
-    default = 'chris1234',
-    help = 'CUBE/ChRIS password'
-)
-parser.add_argument(
-    '--orthancURL',
-    default = 'https://orthanc-chris-public.apps.ocp-prod.massopen.cloud/',
-    help = 'IP of the orthanc to receive analysis results'
-)
-parser.add_argument(
-    '--orthancuser',
-    default = 'fnndsc',
-    help = 'Orthanc username'
+    "-V", "--version", action="version", version=f"%(prog)s {__version__}"
 )
+
 parser.add_argument(
-    '--orthancpassword',
-    default = 'Lerkyacyids5',
-    help = 'Orthanc password'
+    "--pattern",
+    default="**/*dcm",
+    help="""
+    pattern for file names to include (you should quote this!)
+    (this flag triggers the PathMapper on the inputdir).""",
 )
 parser.add_argument(
-    '--orthancremote',
-    default = '',
-    help = 'remote orthanc modality'
+    "--pluginInstanceID",
+    default="",
+    help="plugin instance ID from which to start analysis",
 )
 parser.add_argument(
-    '--verbosity',
-    default = '0',
-    help = 'verbosity level of app'
+    "--CUBEurl", default="http://localhost:8000/api/v1/", help="CUBE URL"
 )
+parser.add_argument("--CUBEuser", default="chris", help="CUBE/ChRIS username")
+parser.add_argument("--CUBEpassword", default="chris1234", help="CUBE/ChRIS password")
 parser.add_argument(
-    "--thread",
-    help = "use threading to branch in parallel",
-    dest = 'thread',
-    action = 'store_true',
-    default = False
+    "--orthancURL",
+    default="https://orthanc-chris-public.apps.ocp-prod.massopen.cloud/",
+    help="IP of the orthanc to receive analysis results",
 )
+parser.add_argument("--orthancuser", default="fnndsc", help="Orthanc username")
 parser.add_argument(
-    "--pftelDB",
-    help = "an optional pftel telemetry logger, of form '/api/v1///'",
-    default = ''
+    "--orthancpassword", default="Lerkyacyids5", help="Orthanc password"
 )
+parser.add_argument("--orthancremote", default="", help="remote orthanc modality")
+parser.add_argument("--verbosity", default="0", help="verbosity level of app")
 parser.add_argument(
-    "--inNode",
-    help = "perform in-node implicit parallelization in conjunction with --thread",
-    dest = 'inNode',
-    action = 'store_true',
-    default = False
+    "--thread",
+    help="use threading to branch in parallel",
+    dest="thread",
+    action="store_true",
+    default=False,
 )
 parser.add_argument(
-    "--notimeout",
-    help = "if specified, then controller never timesout while waiting on nodes to complete",
-    dest = 'notimeout',
-    action = 'store_true',
-    default = False
+    "--pftelDB",
+    help="an optional pftel telemetry logger, of form '/api/v1///'",
+    default="",
 )
 parser.add_argument(
-    "--debug",
-    help = "if true, toggle telnet pudb debugging",
-    dest = 'debug',
-    action = 'store_true',
-    default = False
+    "--inNode",
+    help="perform in-node implicit parallelization in conjunction with --thread",
+    dest="inNode",
+    action="store_true",
+    default=False,
 )
 parser.add_argument(
-    "--debugTermSize",
-    help = "the terminal 'cols,rows' size for debugging",
-    default = '253,62'
+    "--notimeout",
+    help="if specified, then controller never timesout while waiting on nodes to complete",
+    dest="notimeout",
+    action="store_true",
+    default=False,
 )
 parser.add_argument(
-    "--debugPort",
-    help = "the debugging telnet port",
-    default = '7900'
+    "--debug",
+    help="if true, toggle telnet pudb debugging",
+    dest="debug",
+    action="store_true",
+    default=False,
 )
 parser.add_argument(
-    "--debugHost",
-    help = "the debugging telnet host",
-    default = '0.0.0.0'
+    "--debugTermSize",
+    help="the terminal 'cols,rows' size for debugging",
+    default="253,62",
 )
+parser.add_argument("--debugPort", help="the debugging telnet port", default="7900")
+parser.add_argument("--debugHost", help="the debugging telnet host", default="0.0.0.0")
 
-def Env_setup( options : Namespace,
-               inputdir : Path,
-               outputdir : Path,
-               debugPortOffset : int = 0) -> data.env:
+
+def Env_setup(
+    options: Namespace, inputdir: Path, outputdir: Path, debugPortOffset: int = 0
+) -> data.env:
     """
-    Setup the environment
+    Setup the environment
 
-    Args:
-        options (Namespace): options passed from the CLI caller
-        inputdir (Path): plugin global input directory
-        outputdir (Path): plugin global output directory
-        debugPortOffset (int, optional): offset added to debug port -- useful for multithreading. Defaults to 0.
+    Args:
+        options (Namespace): options passed from the CLI caller
+        inputdir (Path): plugin global input directory
+        outputdir (Path): plugin global output directory
+        debugPortOffset (int, optional): offset added to debug port -- useful for multithreading. Defaults to 0.
 
-    Returns:
-        data.env: an instantiated environment object. Note in multithreaded
-        runs, each thread gets its own object.
+    Returns:
+        data.env: an instantiated environment object. Note in multithreaded
+        runs, each thread gets its own object.
     """
-    Env: data.env = data.env()
-    Env.CUBE.set(inputdir = str(inputdir))
-    Env.CUBE.set(outputdir = str(outputdir))
-    Env.CUBE.set(url = str(options.CUBEurl))
-    Env.CUBE.set(username = str(options.CUBEuser))
-    Env.CUBE.set(password = str(options.CUBEpassword))
-    Env.orthanc.set(url = str(options.orthancURL))
-    Env.orthanc.set(username = str(options.orthancuser))
-    Env.orthanc.set(password = str(options.orthancpassword))
-    Env.orthanc.set(remote = str(options.orthancremote))
-    Env.set(inputdir = inputdir)
-    Env.set(outputdir = outputdir)
-    Env.debug_setup( debug = options.debug,
-                     termsize = options.debugTermSize,
-                     port = int(options.debugPort) + debugPortOffset,
-                     host = options.debugHost
+    Env: data.env = data.env()
+    Env.CUBE.set(inputdir=str(inputdir))
+    Env.CUBE.set(outputdir=str(outputdir))
+    Env.CUBE.set(url=str(options.CUBEurl))
+    Env.CUBE.set(username=str(options.CUBEuser))
+    Env.CUBE.set(password=str(options.CUBEpassword))
+    Env.orthanc.set(url=str(options.orthancURL))
+    Env.orthanc.set(username=str(options.orthancuser))
+    Env.orthanc.set(password=str(options.orthancpassword))
+    Env.orthanc.set(remote=str(options.orthancremote))
+    Env.set(inputdir=inputdir)
+    Env.set(outputdir=outputdir)
+    Env.debug_setup(
+        debug=options.debug,
+        termsize=options.debugTermSize,
+        port=int(options.debugPort) + debugPortOffset,
+        host=options.debugHost,
    )
    return Env
 
+
 def preamble(options: Namespace) -> str:
     """
     Just show some preamble "noise" in the output terminal and also process
@@ -221,26 +194,27 @@ def preamble(options: Namespace) -> str:
     """
     print(DISPLAY_TITLE)
 
-    pftelDB:str = ""
+    pftelDB: str = ""
 
     if options.pftelDB:
-        tagger:pftag.Pftag = pftag.Pftag({})
-        pftelDB = tagger(options.pftelDB)['result']
+        tagger: pftag.Pftag = pftag.Pftag({})
+        pftelDB = tagger(options.pftelDB)["result"]
 
     LOG("plugin arguments...")
-    for k,v in options.__dict__.items():
-        LOG("%25s: [%s]" % (k, v))
+    for k, v in options.__dict__.items():
+        LOG("%25s: [%s]" % (k, v))
     LOG("")
 
     LOG("base environment...")
-    for k,v in os.environ.items():
-        LOG("%25s: [%s]" % (k, v))
+    for k, v in os.environ.items():
+        LOG("%25s: [%s]" % (k, v))
     LOG("")
 
     LOG("Starting growth cycle...")
     return pftelDB
 
-def ground_prep(options: Namespace, Env : data.env) -> action.PluginRun:
+
+def ground_prep(options: Namespace, Env: data.env) -> action.PluginRun:
     """
     Do some per-tree setup -- prepare the ground!
 
@@ -257,16 +231,18 @@ def ground_prep(options: Namespace, Env : data.env) -> action.PluginRun:
     LOG("Prepping ground for tree in thread %s..." % get_native_id())
 
     LOG("Constructing object to filter parent field")
-    PLinputFilter: action.PluginRun = action.PluginRun(env = Env, options = options)
+    PLinputFilter: action.PluginRun = action.PluginRun(env=Env, options=options)
 
     if len(options.pluginInstanceID):
-        Env.CUBE.parentPluginInstanceID = options.pluginInstanceID
+        Env.CUBE.parentPluginInstanceID = options.pluginInstanceID
     else:
-        Env.CUBE.parentPluginInstanceID = \
-            Env.CUBE.parentPluginInstanceID_discover()['parentPluginInstanceID']
+        Env.CUBE.parentPluginInstanceID = Env.CUBE.parentPluginInstanceID_discover()[
+            "parentPluginInstanceID"
+        ]
     return PLinputFilter
 
-def replantSeed_catchError(PLseed:action.PluginRun, input: Path) -> dict:
+
+def replantSeed_catchError(PLseed: action.PluginRun, input: Path) -> dict:
     """
     Re-run a failed filter (pl-shexec) with explicit error catching
 
@@ -277,12 +253,13 @@ def replantSeed_catchError(PLseed:action.PluginRun, input: Path) -> dict:
     Returns:
         dict: the detailed error log from the failed run
     """
-    global LOG
+    global LOG
     LOG("Some error was returned when planting the seed!")
-    LOG('Replanting seed with error catching on...')
-    d_seedreplant:dict = PLseed(str(input), append = "--jsonReturn")
+    LOG("Replanting seed with error catching on...")
+    d_seedreplant: dict = PLseed(str(input), append="--jsonReturn")
    return d_seedreplant
 
+
 def tree_grow(options: Namespace, input: Path, output: Path = None) -> dict:
     """
     Based on some conditional applied to the file space, direct the
@@ -300,54 +277,44 @@ def tree_grow(options: Namespace, input: Path, output: Path = None) -> dict:
 
     # set_trace(term_size=(253, 62), host = '0.0.0.0', port = 7900)
 
-    Env:data.env = Env_setup(
-        options,
-        pluginInputDir,
-        pluginOutputDir,
-        get_native_id()
-    )
+    Env: data.env = Env_setup(options, pluginInputDir, pluginOutputDir, get_native_id())
 
     Env.set_telnet_trace_if_specified()
-    timenow:Callable[[], str] = lambda: datetime.now(timezone.utc).astimezone().isoformat()
-    conditional:behavior.Filter = behavior.Filter()
-    conditional.obj_pass = behavior.unconditionalPass
-    PLinputFilter:action.PluginRun = ground_prep(options, Env)
-    LLD:action.LLDcomputeflow = action.LLDcomputeflow(env = Env, options = options)
-    str_threadName:str = current_thread().getName()
-    d_seedGet:dict = {
-        "status" : False,
-        "message" : "unable to plant seed"
-    }
-    d_treeGrow:dict = {
-        "status" : False,
-        "message" : "unable to grow tree"
-    }
-    d_ret:dict = {
-        "seed" : {},
-        "tree" : {}
-    }
-
-    Path('%s/start-%s.touch' % (Env.outputdir.touch(), str_threadName))
+    timenow: Callable[[], str] = (
+        lambda: datetime.now(timezone.utc).astimezone().isoformat()
+    )
+    conditional: behavior.Filter = behavior.Filter()
+    conditional.obj_pass = behavior.unconditionalPass
+    PLinputFilter: action.PluginRun = ground_prep(options, Env)
+    LLD: action.LLDcomputeflow = action.LLDcomputeflow(env=Env, options=options)
+    str_threadName: str = current_thread().getName()
+    d_seedGet: dict = {"status": False, "message": "unable to plant seed"}
+    d_treeGrow: dict = {"status": False, "message": "unable to grow tree"}
+    d_ret: dict = {"seed": {}, "tree": {}}
+
+    Path("%s/start-%s.touch" % (Env.outputdir.touch(), str_threadName))
     LOG("Growing a new tree in thread %s..." % str_threadName)
-    str_heartbeat:str = str(Env.outputdir.joinpath('heartbeat-%s.log' % \
-        str_threadName))
-    fl:TextIOWrapper = open(str_heartbeat, 'w')
-    fl.write('Start time: {}\n'.format(timenow()))
+    str_heartbeat: str = str(
+        Env.outputdir.joinpath("heartbeat-%s.log" % str_threadName)
+    )
+    fl: TextIOWrapper = open(str_heartbeat, "w")
+    fl.write("Start time: {}\n".format(timenow()))
     if conditional.obj_pass(str(input)):
         LOG("Planting seed off %s" % str(input))
-        d_seedGet = PLinputFilter(str(input))
-        if d_seedGet['status']:
-            d_treeGrow = LLD(d_seedGet['branchInstanceID'])
+        d_seedGet = PLinputFilter(str(input))
+        if d_seedGet["status"]:
+            d_treeGrow = LLD(d_seedGet["branchInstanceID"])
         else:
-            d_seedGet['failed'] = replantSeed_catchError(PLinputFilter, input)
-    fl.write('End time: {}\n'.format(timenow()))
+            d_seedGet["failed"] = replantSeed_catchError(PLinputFilter, input)
+    fl.write("End time: {}\n".format(timenow()))
     fl.close()
-    d_ret['seed'] = d_seedGet
-    d_ret['tree'] = d_treeGrow
+    d_ret["seed"] = d_seedGet
+    d_ret["tree"] = d_treeGrow
     ld_forestResult.append(d_ret)
     return d_ret
 
-def treeGrowth_savelog(outputdir : Path) -> None:
+
+def treeGrowth_savelog(outputdir: Path) -> None:
     """
 
     Write the global log file on the tree growth to the passed
@@ -357,22 +324,22 @@
     """
     global ld_forestResult
 
-    with open(str(outputdir.joinpath('treeLog.json')), 'w') as f:
+    with open(str(outputdir.joinpath("treeLog.json")), "w") as f:
         f.write(json.dumps(ld_forestResult, indent=4))
     f.close()
 
+
 # documentation: https://fnndsc.github.io/chris_plugin/chris_plugin.html#chris_plugin
 @chris_plugin(
-    parser = parser,
-    title = 'Leg-Length Discrepency - Dynamic Compute Flow',
-    category = '',  # ref. https://chrisstore.co/plugins
-    min_memory_limit = '100Mi',  # supported units: Mi, Gi
-    min_cpu_limit = '1000m',  # millicores, e.g. "1000m" = 1 CPU core
-    min_gpu_limit = 0  # set min_gpu_limit=1 to enable GPU
+    parser=parser,
+    title="Leg-Length Discrepency - Dynamic Compute Flow",
+    category="",  # ref. https://chrisstore.co/plugins
+    min_memory_limit="100Mi",  # supported units: Mi, Gi
+    min_cpu_limit="1000m",  # millicores, e.g. "1000m" = 1 CPU core
+    min_gpu_limit=0,  # set min_gpu_limit=1 to enable GPU
 )
 @pflog.tel_logTime(
-    event = 'dylld',
-    log = 'Leg Length Discepency Dynamic Workflow controller'
+    event="dylld", log="Leg Length Discepency Dynamic Workflow controller"
 )
 def main(options: Namespace, inputdir: Path, outputdir: Path):
     """
@@ -382,37 +349,32 @@ def main(options: Namespace, inputdir: Path, outputdir: Path):
     """
     # set_trace(term_size=(253, 62), host = '0.0.0.0', port = 7900)
     global pluginInputDir, pluginOutputDir
-    pluginInputDir = inputdir
-    pluginOutputDir = outputdir
-
+    pluginInputDir = inputdir
+    pluginOutputDir = outputdir
 
-    options.pftelDB = preamble(options)
+    options.pftelDB = preamble(options)
 
-    output:Path
+    output: Path
     if not options.inNode:
-        mapper:PathMapper = PathMapper.file_mapper(
-            inputdir,
-            outputdir,
-            glob = options.pattern
-        )
+        mapper: PathMapper = PathMapper.file_mapper(
+            inputdir, outputdir, glob=options.pattern
+        )
     else:
-        mapper:PathMapper = PathMapper.dir_mapper_deep(
-            inputdir,
-            outputdir
-        )
+        mapper: PathMapper = PathMapper.dir_mapper_deep(inputdir, outputdir)
 
     if int(options.thread):
         with ThreadPoolExecutor(max_workers=len(os.sched_getaffinity(0))) as pool:
-            results:Iterator = pool.map(lambda t: tree_grow(options, *t), mapper)
+            results: Iterator = pool.map(lambda t: tree_grow(options, *t), mapper)
 
             # raise any Exceptions which happened in threads
             for _ in results:
                 pass
     else:
         for input, output in mapper:
-            d_results:dict = tree_grow(options, input, output)
+            d_results: dict = tree_grow(options, input, output)
 
     LOG("Ending growth cycle...")
     treeGrowth_savelog(outputdir)
 
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     main()
diff --git a/requirements.txt b/requirements.txt
index 189642f..1958f28 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,7 @@ pudb
 pflogf
 pfmisc
 loguru
-python-chrisclient==2.9.1
+python-chrisclient==2.11.1
 pftag==1.2.22
 pflog==1.2.26
 pftel-client