Skip to content
This repository has been archived by the owner on Dec 18, 2023. It is now read-only.

Commit

Permalink
Merge pull request #66 from thehyve/gbe-scan
Browse files Browse the repository at this point in the history
Call the gb-backend scan endpoint after the data load
  • Loading branch information
ewelinagr authored Apr 3, 2019
2 parents 735f16b + d81a132 commit f5a92ff
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 19 deletions.
23 changes: 16 additions & 7 deletions luigi-pipeline/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import luigi
import time
import threading
import asyncio

from .luigi_commons import BaseTask, ExternalProgramTask

Expand Down Expand Up @@ -234,24 +235,32 @@ class TransmartApiTask(BaseTask):
transmart_url = luigi.Parameter(description='URL of the tranSMART instance', significant=False)
transmart_username = luigi.Parameter(description='Username for an admin account', significant=False)
transmart_password = luigi.Parameter(description='Password for the admin account', significant=False)
gb_backend_url = luigi.Parameter(description='URL of the gb backend instance', significant=False)

max_status_check_retrial = 240

def run(self):
    """
    Run the post-load refresh: trigger the tranSMART after-data-loading
    update, wait for it to finish, then trigger the stored set query scan.

    The wait polls ``check_status`` with ``max_status_check_retrial`` as the
    maximum number of status checks; any exception it raises propagates and
    the subscription scan is not started.
    """
    reload_obj = TransmartApiCalls(keycloak_url=self.keycloak_url,
                                   username=self.transmart_username,
                                   password=self.transmart_password,
                                   transmart_url=self.transmart_url,
                                   gb_backend_url=self.gb_backend_url,
                                   client_id=self.client_id,
                                   client_secret=self.client_secret)

    logger.info('After data loading update; clearing and rebuilding caches, rebuilding subject sets')
    reload_obj.after_data_loading()

    logger.info('Waiting for the update to complete ...')
    # Drive the coroutine on a private loop. Closing the loop returned by
    # asyncio.get_event_loop() would leave the thread's default loop closed,
    # so any later run_until_complete() on it fails with
    # "RuntimeError: Event loop is closed".
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(reload_obj.check_status(self.max_status_check_retrial))
    finally:
        loop.close()

    # Scan only after the update has completed.
    logger.info('Scanning for new subscriptions')
    reload_obj.scan_subscription_queries()


class CbioportalDataValidation(ExternalProgramTask):
"""
Expand Down
1 change: 1 addition & 0 deletions luigi.cfg-sample
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ modifiers = modifiers.txt
[TransmartApiTask]
keycloak_url = https://somekeycloak.net/auth/realms/transmart-name
transmart_url = https://transmart.thehyve.net
gb_backend_url = https://gb-backend.thehyve.net
client_id = transmart
client_secret = ''
transmart_username = admin
Expand Down
57 changes: 45 additions & 12 deletions scripts/transmart_api_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@
import sys
import json
import requests
import asyncio


class TransmartApiException(Exception):
    """Error raised when a call made through TransmartApiCalls fails."""

class TransmartApiCalls(object):

def __init__(self, keycloak_url, username, password, transmart_url, client_id, client_secret):
def __init__(self, keycloak_url, username, password, transmart_url, gb_backend_url, client_id, client_secret):
self.url = keycloak_url
self.username = username
self.password = password
self.token = None
self.tm_url = transmart_url
self.gb_backend_url = gb_backend_url
self.client_id = client_id
self.client_secret = client_secret

Expand Down Expand Up @@ -57,10 +59,10 @@ def retrieve_token(self):

def scan_subscription_queries(self):
    """
    Triggers the scan of stored set queries in the Gb Backend app.

    Sends a single POST to ``/queries/sets/scan`` on ``self.gb_backend_url``
    (passed as ``server_url`` so the call does not go to the tranSMART URL).
    """
    self.post('/queries/sets/scan', server_url=self.gb_backend_url)


def clear_tree_nodes_cache(self):
Expand All @@ -84,13 +86,43 @@ def rebuild_tree_cache(self):

def after_data_loading(self):
    """
    Triggers the after data loading update of TranSMART.

    Logs the action and sends a GET to
    ``/v2/admin/system/after_data_loading_update``.
    """
    Console.info('After data loading update, clearing and rebuilding caches.')
    self.get('/v2/admin/system/after_data_loading_update')


def get(self, path):
def update_status(self):
    """
    Fetch the status report of the current after data loading update task.

    :return: the JSON-decoded body of ``/v2/admin/system/update_status``.
    """
    Console.info('After data loading update status check.')
    return self.get('/v2/admin/system/update_status').json()


async def check_status(self, n, sleep=30.0):
    """
    Polls the after data loading task status until it reaches a final state.

    Calls ``update_status`` up to `n` times and sleeps `sleep` seconds after
    every check that reports neither `COMPLETED` nor `FAILED`, so the total
    wait is at most `sleep*n` seconds.

    :param n: max status call retrials
    :param sleep: number of seconds before update_status is called again, default: 30s.
    :raises TransmartApiException: when the status is `FAILED`, or when no
        final status was reported within `n` checks.
    """
    # Count attempts from 1 so the progress log reads "1/n" .. "n/n".
    for attempt in range(1, n + 1):
        update_status = self.update_status()
        status = update_status['status']
        if status == 'COMPLETED':
            return
        if status == 'FAILED':
            # A FAILED report without a 'message' entry must still surface as
            # a TransmartApiException rather than a KeyError.
            failure = 'After data loading update failed. Error: %s' % (update_status.get('message'))
            Console.error(failure)
            raise TransmartApiException(failure)
        Console.info('%s/%s Current status of the update: %s. Sleeping for %s seconds ...' %
                     (attempt, n, status, sleep))
        await asyncio.sleep(sleep)
    Console.error('After data loading update took too long: %s seconds. Transmart Api task interrupted.' % (n*sleep))
    raise TransmartApiException('Timeout. Not able to finish an update task within %s seconds.' % (n*sleep))


def get(self, path, **kwargs):
"""
Performs a call to the server.
:param path: the API path to call.
Expand All @@ -101,20 +133,20 @@ def get(self, path):
'Accept': 'application/json',
'Authorization': 'Bearer ' + str(token)
}
url = self.tm_url + path
url = kwargs.get('server_url', self.tm_url) + path
response = None
Console.warning(url)
Console.info('Making a get call to: %s' % url)
try:
response = requests.get(url, headers=headers)
if not response.ok:
response.raise_for_status()
return response
except Exception as e:
Console.error('Retrieving %s failed: %s' % (path, response))
Console.error('Retrieving %s failed: %s' % (url, response))
raise TransmartApiException(e)


def post(self, path):
def post(self, path, **kwargs):
"""
Performs a post call to the server
:param path: the API path to call
Expand All @@ -125,15 +157,16 @@ def post(self, path):
'Accept': 'application/json',
'Authorization': 'Bearer ' + str(token)
}
url = self.tm_url + path
url = kwargs.get('server_url', self.tm_url) + path
response = None
Console.info('Making a post call to: %s' % url)
try:
response = requests.post(url, headers=headers)
if not response.ok:
response.raise_for_status()
return response
except Exception as e:
Console.error('Retrieving %s failed: %s' % (path, response))
Console.error('Retrieving %s failed: %s' % (url, response))
raise TransmartApiException(e)


Expand Down

0 comments on commit f5a92ff

Please sign in to comment.