Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix factory.py #1099

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 65 additions & 18 deletions cwltool/factory.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from __future__ import absolute_import

# move to a regular typing import when Python 3.3-3.6 is no longer supported
import functools
import os
import sys
from typing import Callable as tCallable # pylint: disable=unused-import
from typing import Any, Dict, Tuple, Union
mr-c marked this conversation as resolved.
Show resolved Hide resolved

from typing_extensions import Text # pylint: disable=unused-import
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from . import load_tool
from .context import LoadingContext, RuntimeContext
from .argparser import arg_parser
from .context import LoadingContext, RuntimeContext, getdefault
from .executors import SingleJobExecutor
from .process import Process
mr-c marked this conversation as resolved.
Show resolved Hide resolved
from .main import find_default_container
from .resolver import tool_resolver
from .secrets import SecretStore
from .utils import DEFAULT_TMP_PREFIX


class WorkflowStatus(Exception):
Expand All @@ -28,34 +35,74 @@ def __init__(self, t, factory): # type: (Process, Factory) -> None

def __call__(self, **kwargs):
# type: (**Any) -> Union[Text, Dict[Text, Text]]
runtime_context = self.factory.runtime_context.copy()
runtime_context.basedir = os.getcwd()
out, status = self.factory.executor(self.t, kwargs, runtime_context)
out, status = self.factory.executor(
self.t, kwargs, self.factory.runtimeContext)
if status != "success":
raise WorkflowStatus(out, status)
else:
return out


class Factory(object):
def __init__(self,
executor=None, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
loading_context=None, # type: LoadingContext
runtime_context=None # type: RuntimeContext
): # type: (...) -> None
argsl=None, # type: List[str]
args=None, # type: argparse.Namespace
executor=None, # type: Callable[..., Tuple[Dict[Text, Any], Text]]
mr-c marked this conversation as resolved.
Show resolved Hide resolved
loadingContext=None, # type: LoadingContext
mr-c marked this conversation as resolved.
Show resolved Hide resolved
runtimeContext=None # type: RuntimeContext
): # type: (...) -> None
if argsl is not None:
args = arg_parser().parse_args(argsl)
if executor is None:
executor = SingleJobExecutor()
self.executor = executor
self.loading_context = loading_context
if loading_context is None:
self.loading_context = LoadingContext()
if runtime_context is None:
self.runtime_context = RuntimeContext()
self.executor = SingleJobExecutor()
else:
self.executor = executor
if loadingContext is None:
self.loadingContext = LoadingContext(vars(args))
mr-c marked this conversation as resolved.
Show resolved Hide resolved
self._fix_loadingContext()
else:
self.runtime_context = runtime_context
self.loadingContext = loadingContext
if runtimeContext is None:
self.runtimeContext = RuntimeContext(vars(args))
self._fix_runtimeContext()
else:
self.runtimeContext = runtimeContext

def make(self, cwl):
"""Instantiate a CWL object from a CWl document."""
load = load_tool.load_tool(cwl, self.loading_context)
load = load_tool.load_tool(cwl, self.loadingContext)
if isinstance(load, int):
raise Exception("Error loading tool")
return Callable(load, self)

def _fix_loadingContext(self):
mr-c marked this conversation as resolved.
Show resolved Hide resolved
self.loadingContext.resolver = getdefault(
mr-c marked this conversation as resolved.
Show resolved Hide resolved
self.loadingContext.resolver, tool_resolver)

def _fix_runtimeContext(self):
mr-c marked this conversation as resolved.
Show resolved Hide resolved
self.runtimeContext.basedir = os.getcwd()
self.runtimeContext.find_default_container = functools.partial(
find_default_container,
default_container=None,
use_biocontainers=None)

if sys.platform == "darwin":
default_mac_path = "/private/tmp/docker_tmp"
if self.runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
self.runtimeContext.tmp_outdir_prefix = default_mac_path

for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
if getattr(self.runtimeContext, dirprefix) and getattr(self.runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX:
sl = "/" if getattr(self.runtimeContext, dirprefix).endswith("/") or dirprefix == "cachedir" \
else ""
setattr(self.runtimeContext, dirprefix,
os.path.abspath(getattr(self.runtimeContext, dirprefix)) + sl)
if not os.path.exists(os.path.dirname(getattr(self.runtimeContext, dirprefix))):
try:
os.makedirs(os.path.dirname(
getattr(self.runtimeContext, dirprefix)))
except Exception as e:
print("Failed to create directory: %s", e)

self.runtimeContext.secret_store = getdefault(
self.runtimeContext.secret_store, SecretStore())