Skip to content

Commit

Permalink
api: use local proxy instead of the decorator
Browse files Browse the repository at this point in the history
* Introduces single variable local proxy instead of decorator- before we
depended on the with_api_client decorator, and the click context- now it's only
one LocalProxy.
* Removes Client class because it was using inheritance (from BaseAPIClient)
and it wasn't a correct relation. Without it it was just a collection of static
methods so we decided to leave them as separate functions.
* Removes mock_base_api_client fixture because it was returning a Client class
instance, when the class is no longer present. Something similar is
necessery because now we have a repeating code in lots of places-see the
issue: #219 (comment)
  • Loading branch information
okraskaj committed Dec 7, 2018
1 parent ce387bf commit b907da2
Show file tree
Hide file tree
Showing 13 changed files with 371 additions and 388 deletions.
2 changes: 0 additions & 2 deletions reana_client/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,3 @@
"""Python api for connecting to REANA server."""

from __future__ import absolute_import, print_function

from .client import Client
81 changes: 41 additions & 40 deletions reana_client/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@
# under the terms of the MIT License; see LICENSE file for more details.
"""REANA REST API client."""

import traceback
import enum
import json
import logging
import os
import traceback
from functools import partial
from werkzeug.local import LocalProxy

import pkg_resources
from bravado.exception import HTTPError
from reana_commons.api_client import BaseAPIClient
from reana_commons.api_client import get_current_api_client
from werkzeug.local import LocalProxy

from reana_client.errors import FileDeletionError, FileUploadError
from reana_client.utils import get_workflow_root
from reana_commons.api_client import BaseAPIClient, get_current_api_client

current_rs_api_client = LocalProxy(
partial(get_current_api_client, component='reana-server'))
Expand All @@ -43,8 +43,8 @@ def ping():
'REANA server health check failed: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand All @@ -68,8 +68,8 @@ def get_workflows(access_token):
'The list of workflows could not be retrieved: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand All @@ -95,8 +95,8 @@ def get_workflow_status(workflow, access_token):
'Analysis status could not be retrieved: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand Down Expand Up @@ -124,8 +124,8 @@ def create_workflow(reana_specification, name, access_token):
'Workflow creation failed: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand Down Expand Up @@ -182,8 +182,8 @@ def upload_file(workflow_id, file_, file_name, access_token):
'File could not be uploaded: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand All @@ -210,8 +210,8 @@ def get_workflow_logs(workflow_id, access_token):
'Workflow logs could not be retrieved: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand Down Expand Up @@ -245,8 +245,8 @@ def download_file(workflow_id, file_name, access_token):
'Output file could not be downloaded: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand All @@ -265,7 +265,7 @@ def delete_file(workflow_id, file_name, access_token):
file_name=file_name,
access_token=access_token).result()
if http_response.status_code == 200 and (response['deleted'] or
response['failed']):
response['failed']):
return response
elif not (response['deleted'] or response['failed']):
raise FileDeletionError('{} did not match any existing '
Expand All @@ -281,14 +281,14 @@ def delete_file(workflow_id, file_name, access_token):
'File could not be downloaded: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e


def get_files(workflow_id, access_token):
def list_files(workflow_id, access_token):
"""Return the list of file for a given workflow workspace.
:param workflow_id: UUID which identifies the workflow.
Expand All @@ -314,8 +314,8 @@ def get_files(workflow_id, access_token):
'File list could not be retrieved: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand Down Expand Up @@ -355,8 +355,7 @@ def upload_to_server(workflow, paths, access_token):

if os.path.isdir(path):
logging.debug("'{}' is a directory.".format(path))
logging.info("Uploading contents of folder '{}' ..."
.format(path))
logging.info("Uploading contents of folder '{}' ...".format(path))
for root, dirs, files in os.walk(path, topdown=False):
uploaded_files = []
for next_path in files + dirs:
Expand Down Expand Up @@ -394,7 +393,7 @@ def upload_to_server(workflow, paths, access_token):
save_path = "/".join(
save_path.strip("/").split('/')[1:])
logging.debug("'{}' is an absolute filepath."
.format(os.path.basename(fname)))
.format(os.path.basename(fname)))
logging.info("Uploading '{}' ...".format(fname))
try:
response = upload_file(workflow, f, save_path,
Expand All @@ -407,8 +406,8 @@ def upload_to_server(workflow, paths, access_token):
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
logging.info("Something went wrong while uploading {}".
format(fname))
logging.info("Something went wrong while uploading {}"
.format(fname))


def get_workflow_parameters(workflow, access_token):
Expand All @@ -432,8 +431,8 @@ def get_workflow_parameters(workflow, access_token):
'Workflow parameters could not be retrieved: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand All @@ -443,10 +442,11 @@ def delete_workflow(workflow, all_runs, hard_delete,
workspace, access_token):
"""Delete a workflow."""
try:
parameters = {'all_runs': True if all_runs == 1 else False,
'hard_delete': True if hard_delete == 1 else False,
'workspace': True if hard_delete == 1 or
workspace == 1 else False}
parameters = {
'all_runs': True if all_runs == 1 else False,
'hard_delete': True if hard_delete == 1 else False,
'workspace': True if hard_delete == 1 or workspace == 1 else False
}
(response,
http_response) = current_rs_api_client.api.set_workflow_status(
workflow_id_or_name=workflow,
Expand All @@ -466,8 +466,8 @@ def delete_workflow(workflow, all_runs, hard_delete,
'Workflow run could not be deleted: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand All @@ -477,7 +477,8 @@ def stop_workflow(workflow, force_stop, access_token):
"""Stop a workflow."""
try:
parameters = {'force_stop': force_stop}
(response, http_response) = current_rs_api_client.api.set_workflow_status(
(response, http_response) = current_rs_api_client.api\
.set_workflow_status(
workflow_id_or_name=workflow,
status='stop',
access_token=access_token,
Expand All @@ -494,8 +495,8 @@ def stop_workflow(workflow, force_stop, access_token):
'Workflow run could not be stopped: '
'\nStatus: {}\nReason: {}\n'
'Message: {}'.format(e.response.status_code,
e.response.reason,
e.response.json()['message']))
e.response.reason,
e.response.json()['message']))
raise Exception(e.response.json()['message'])
except Exception as e:
raise e
Expand Down
1 change: 0 additions & 1 deletion reana_client/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import click

from reana_client.api import Client
from reana_client.cli import workflow, files, ping

DEBUG_LOG_FORMAT = '[%(asctime)s] p%(process)s ' \
Expand Down
21 changes: 9 additions & 12 deletions reana_client/cli/cwl_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
from cwltool.main import printdeps
from cwltool.workflow import findfiles

from reana_client.api import Client
from reana_client.api.client import (create_workflow, current_rs_api_client,
get_workflow_logs, start_workflow,
upload_file)
from reana_client.cli.utils import add_access_token_options
from reana_client.config import default_user
from reana_client.decorators import with_api_client
from reana_client.utils import load_workflow_spec
from reana_client.version import __version__


PY3 = sys.version_info > (3,)


Expand Down Expand Up @@ -68,7 +68,6 @@ def get_file_dependencies_obj(cwl_obj, basedir):
@click.argument('processfile', required=False)
@click.argument('jobfile')
@click.pass_context
@with_api_client
def cwl_runner(ctx, quiet, outdir, basedir, processfile, jobfile,
access_token):
"""Run CWL files in a standard format <workflow.cwl> <job.json>."""
Expand Down Expand Up @@ -104,9 +103,9 @@ def cwl_runner(ctx, quiet, outdir, basedir, processfile, jobfile,
reana_spec['workflow']['spec'] = replace_location_in_cwl_spec(
reana_spec['workflow']['spec'])

logging.info('Connecting to {0}'.format(ctx.obj.client.server_url))
response = ctx.obj.client.create_workflow(reana_spec, 'cwl-test',
access_token)
logging.info('Connecting to {0}'.format(
current_rs_api_client.swagger_spec.api_url))
response = create_workflow(reana_spec, 'cwl-test', access_token)
logging.error(response)
workflow_name = response['workflow_name']
workflow_id = response['workflow_id']
Expand All @@ -122,20 +121,18 @@ def cwl_runner(ctx, quiet, outdir, basedir, processfile, jobfile,
file_path = cwl_file_object.get('location')
abs_file_path = os.path.join(basedir, file_path)
with open(abs_file_path, 'r') as f:
ctx.obj.client.upload_file(workflow_id, f, file_path,
access_token)
upload_file(workflow_id, f, file_path, access_token)
logging.error('File {} uploaded.'.format(file_path))

response = ctx.obj.client.start_workflow(
response = start_workflow(
workflow_id, access_token, reana_spec['inputs']['parameters'])
logging.error(response)

first_logs = ""
while True:
sleep(1)
logging.error('Polling workflow logs')
response = ctx.obj.client.get_workflow_logs(workflow_id,
access_token)
response = get_workflow_logs(workflow_id, access_token)
logs = response['logs']
if logs != first_logs:

Expand Down
Loading

0 comments on commit b907da2

Please sign in to comment.