From d81a1328305880bb7ebb2848b2120e3b23fceb53 Mon Sep 17 00:00:00 2001 From: ewelinagr Date: Tue, 2 Apr 2019 15:53:36 +0200 Subject: [PATCH] Call gb-backend scan call after the data load TMT-799 --- luigi-pipeline/main.py | 23 +++++++++----- luigi.cfg-sample | 1 + scripts/transmart_api_calls.py | 57 +++++++++++++++++++++++++++------- 3 files changed, 62 insertions(+), 19 deletions(-) diff --git a/luigi-pipeline/main.py b/luigi-pipeline/main.py index 2d47219..f21ce1b 100644 --- a/luigi-pipeline/main.py +++ b/luigi-pipeline/main.py @@ -4,6 +4,7 @@ import luigi import time import threading +import asyncio from .luigi_commons import BaseTask, ExternalProgramTask @@ -234,24 +235,32 @@ class TransmartApiTask(BaseTask): transmart_url = luigi.Parameter(description='URL of the tranSMART instance', significant=False) transmart_username = luigi.Parameter(description='Username for an admin account', significant=False) transmart_password = luigi.Parameter(description='Password for the admin account', significant=False) + gb_backend_url = luigi.Parameter(description='URL of the gb backend instance', significant=False) + + max_status_check_retrial = 240 def run(self): reload_obj = TransmartApiCalls(keycloak_url=self.keycloak_url, username=self.transmart_username, password=self.transmart_password, transmart_url=self.transmart_url, + gb_backend_url=self.gb_backend_url, client_id=self.client_id, client_secret=self.client_secret) - # logger.info('Clearing tree cache') - # reload_obj.clear_tree_nodes_cache() - # logger.info('Rebuilding tree cache') - # reload_obj.rebuild_tree_cache() - # logger.info('Scanning for new subscriptions') - # reload_obj.scan_subscription_queries() - logger.info('After data loading update; clearing caches and scanning query subscriptions') + logger.info('After data loading update; clearing and rebuilding caches, rebuilding subject sets') reload_obj.after_data_loading() + logger.info('Waiting for the update to complete ...') + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(reload_obj.check_status(self.max_status_check_retrial)) + finally: + loop.close() + + logger.info('Scanning for new subscriptions') + reload_obj.scan_subscription_queries() + class CbioportalDataValidation(ExternalProgramTask): """ diff --git a/luigi.cfg-sample b/luigi.cfg-sample index 6a0293f..461c11c 100644 --- a/luigi.cfg-sample +++ b/luigi.cfg-sample @@ -81,6 +81,7 @@ modifiers = modifiers.txt [TransmartApiTask] keycloak_url = https://somekeycloak.net/auth/realms/transmart-name transmart_url = https://transmart.thehyve.net +gb_backend_url = https://gb-backend.thehyve.net client_id = transmart client_secret = '' transmart_username = admin diff --git a/scripts/transmart_api_calls.py b/scripts/transmart_api_calls.py index b679c2b..22c0f7d 100644 --- a/scripts/transmart_api_calls.py +++ b/scripts/transmart_api_calls.py @@ -2,6 +2,7 @@ import sys import json import requests +import asyncio class TransmartApiException(Exception): @@ -9,12 +10,13 @@ class TransmartApiException(Exception): class TransmartApiCalls(object): - def __init__(self, keycloak_url, username, password, transmart_url, client_id, client_secret): + def __init__(self, keycloak_url, username, password, transmart_url, gb_backend_url, client_id, client_secret): self.url = keycloak_url self.username = username self.password = password self.token = None self.tm_url = transmart_url + self.gb_backend_url = gb_backend_url self.client_id = client_id self.client_secret = client_secret @@ -57,10 +59,10 @@ def retrieve_token(self): def scan_subscription_queries(self): """ - Triggers the scan of stored set queries in TranSMART. + Triggers the scan of stored set queries in Gb Backend app. """ - self.post('/v2/queries/sets/scan') + self.post('/queries/sets/scan', server_url=self.gb_backend_url) def clear_tree_nodes_cache(self): @@ -84,13 +86,43 @@ def rebuild_tree_cache(self): def after_data_loading(self): """ - Trigger a clear of the caches of TranSMART and scans for query subscriptions + Triggers a clear of the caches of TranSMART and scans for query subscriptions """ - Console.info('After data loading update, clearing caches. Scanning query subscriptions') + Console.info('After data loading update, clearing and rebuilding caches.') self.get('/v2/admin/system/after_data_loading_update') - def get(self, path): + def update_status(self): + """ + Gets a status report about the current after data loading update task + """ + Console.info('After data loading update status check.') + response = self.get('/v2/admin/system/update_status') + return response.json() + + + async def check_status(self, n, sleep=30.0): + """ + Checks the after data loading task status periodically every $sleep seconds + Waits max `sleep*n` seconds for the status to be `COMPLETED` or `FAILED` + :param n: max status call retrials + :param sleep: number of seconds before update_status is called again, default: 30s. + """ + for i in range(n): + update_status = self.update_status() + if update_status['status'] == 'COMPLETED': + return + elif update_status['status'] == 'FAILED': + Console.error('After data loading update failed. Error: %s' % (update_status['message'])) + raise TransmartApiException('After data loading update failed. Error: %s' % (update_status['message'])) + Console.info('%s/%s Current status of the update: %s. Sleeping for %s seconds ...' % + (i, n, update_status['status'], sleep)) + await asyncio.sleep(sleep) + Console.error('After data loading update took too long: %s seconds. Transmart Api task interrupted.' % (n*sleep)) + raise TransmartApiException('Timeout. Not able to finish an update task within %s seconds.' % (n*sleep)) + + + def get(self, path, **kwargs): """ Performs a call to the server. :param path: the API path to call. @@ -101,20 +133,20 @@ def get(self, path): 'Accept': 'application/json', 'Authorization': 'Bearer ' + str(token) } - url = self.tm_url + path + url = kwargs.get('server_url', self.tm_url) + path response = None - Console.warning(url) + Console.info('Making a get call to: %s' % url) try: response = requests.get(url, headers=headers) if not response.ok: response.raise_for_status() return response except Exception as e: - Console.error('Retrieving %s failed: %s' % (path, response)) + Console.error('Retrieving %s failed: %s' % (url, response)) raise TransmartApiException(e) - def post(self, path): + def post(self, path, **kwargs): """ Performs a post call to the server :param path: the API path to call @@ -125,15 +157,16 @@ def post(self, path): 'Accept': 'application/json', 'Authorization': 'Bearer ' + str(token) } - url = self.tm_url + path + url = kwargs.get('server_url', self.tm_url) + path response = None + Console.info('Making a post call to: %s' % url) try: response = requests.post(url, headers=headers) if not response.ok: response.raise_for_status() return response except Exception as e: - Console.error('Retrieving %s failed: %s' % (path, response)) + Console.error('Retrieving %s failed: %s' % (url, response)) raise TransmartApiException(e)