From e83ce625bb4ec4e3904ecd84526b67e43f3e86a2 Mon Sep 17 00:00:00 2001 From: Nell Hardcastle Date: Wed, 7 Aug 2024 12:58:55 -0700 Subject: [PATCH 1/2] deps(worker): Drop all ElasticSearch and APM deps from worker --- services/datalad/Pipfile | 3 - services/datalad/Pipfile.lock | 56 ++----------------- services/datalad/datalad_service/app.py | 4 -- .../datalad_service/common/elasticsearch.py | 21 ------- services/datalad/datalad_service/config.py | 6 -- .../datalad/datalad_service/tasks/publish.py | 5 -- .../datalad_service/tasks/validator.py | 10 ++-- 7 files changed, 9 insertions(+), 96 deletions(-) delete mode 100644 services/datalad/datalad_service/common/elasticsearch.py diff --git a/services/datalad/Pipfile b/services/datalad/Pipfile index 11f3fbbe2..74f7c7817 100644 --- a/services/datalad/Pipfile +++ b/services/datalad/Pipfile @@ -20,10 +20,7 @@ requests = "*" GitPython = "*" PyJWT = ">=2" gunicorn = "*" -elastic-apm = "*" -falcon-elastic-apm = "*" boto3 = "*" -elasticsearch = "*" pygit2 = "==1.15.1" pygithub = "==2.3.0" greenlet = "*" diff --git a/services/datalad/Pipfile.lock b/services/datalad/Pipfile.lock index 2e5b3ef03..ae944b3da 100644 --- a/services/datalad/Pipfile.lock +++ b/services/datalad/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "776c79e7c5f454085dd75d45c31303fda75957d6872232d5f7cff7ff57b97c48" + "sha256": "778766c74be9f304a058acdc157034a0bdbcf66b6804db84366905a73e11aa6a" }, "pipfile-spec": 6, "requires": {}, @@ -292,40 +292,6 @@ "markers": "python_version >= '3.8'", "version": "==2.6.1" }, - "ecs-logging": { - "hashes": [ - "sha256:1dc9e216f614129db0e6a2f9f926da4e4cf8edf8de16d1045a20aa8e950291d3", - "sha256:f6e22d267770b06f797076f49b5fcc9d97108b22f452f5f9ed4b5367b1e61b5b" - ], - "markers": "python_version >= '3.6'", - "version": "==2.2.0" - }, - "elastic-apm": { - "hashes": [ - "sha256:1aeef4562486bd9ad611bba15f9eca5aeede6a737777541cb68c47c6cf5df7d4", - "sha256:5a1a17580560e70fba40b0a2e7682348422de5a9d2d13511efd05646eb4c62ae" - ], - "index": "pypi", - "markers": "python_version >= '3.6' and python_version < '4'", - "version": "==6.23.0" - }, - "elastic-transport": { - "hashes": [ - "sha256:16339d392b4bbe86ad00b4bdeecff10edf516d32bc6c16053846625f2c6ea250", - "sha256:5d4bb6b8e9d74a9c16de274e91a5caf65a3a8d12876f1e99152975e15b2746fe" - ], - "markers": "python_version >= '3.7'", - "version": "==8.13.1" - }, - "elasticsearch": { - "hashes": [ - "sha256:aa2490029dd96f4015b333c1827aa21fd6c0a4d223b00dfb0fe933b8d09a511b", - "sha256:cef8ef70a81af027f3da74a4f7d9296b390c636903088439087b8262a468c130" - ], - "index": "pypi", - "markers": "python_version >= '3.7'", - "version": "==8.14.0" - }, "falcon": { "hashes": [ "sha256:04a92f159d392098a11d14b8ca71d17129d8b1ef37b7a3577f1f8bcb7b3aecba", @@ -366,13 +332,6 @@ "markers": "python_version >= '3.5'", "version": "==3.1.3" }, - "falcon-elastic-apm": { - "hashes": [ - "sha256:4d25a9f2f73e62c9d453ee51350061d7add8f3b33dcf91b3d389b4acd84420b0" - ], - "index": "pypi", - "version": "==0.1.1" - }, "gitdb": { "hashes": [ "sha256:81a3407ddd2ee8df444cbacea00e2d038e40150acfa3001696fe0dcf1d3adfa4", @@ -1066,18 +1025,12 @@ "markers": "python_version < '3.12'", "version": "==1.2.0" }, - "boto": { - "hashes": [ - "sha256:147758d41ae7240dc989f0039f27da8ca0d53734be0eb869ef16e3adcfa462e8", - "sha256:ea0d3b40a2d852767be77ca343b58a9e3a4b00d9db440efb8da74b4e58025e5a" - ], - "version": "==2.49.0" - }, "boto3": { "hashes": [ "sha256:b33e9a8f8be80d3053b8418836a7c1900410b23a30c7cb040927d601a1082e68", 
"sha256:cbbd453270b8ce94ef9da60dfbb6f9ceeb3eeee226b635aa9ec44b1def98cc96" ], + "index": "pypi", "markers": "python_version >= '3.8'", "version": "==1.34.156" }, @@ -1167,7 +1120,7 @@ "sha256:f9338cc05451f1942d0d8203ec2c346c830f8e86469903d5126c1f0a13a2bcbb", "sha256:ffef8fd58a36fb5f1196919638f73dd3ae0db1a878982b27a9a5a176ede4ba91" ], - "markers": "platform_python_implementation != 'PyPy'", + "markers": "python_version >= '3.8'", "version": "==1.17.0" }, "chardet": { @@ -1271,6 +1224,7 @@ "sha256:fd1abc0d89e30cc4e02e4064dc67fcc51bd941eb395c502aac3ec19fab46b519", "sha256:ff8fa367d09b717b2a17a052544193ad76cd49979c805768879cb63d9ca50561" ], + "index": "pypi", "markers": "python_full_version >= '3.7.0'", "version": "==3.3.2" }, @@ -1386,7 +1340,6 @@ "sha256:ee0c405832ade84d4de74b9029bedb7b31200600fa524d218fc29bfa371e97f5", "sha256:fdcb265de28585de5b859ae13e3846a8e805268a823a12a4da2597f1f5afc9f0" ], - "markers": "python_version >= '3.7'", "version": "==43.0.0" }, "datalad": { @@ -1702,6 +1655,7 @@ "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760", "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6" ], + "index": "pypi", "markers": "python_version >= '3.8'", "version": "==2.32.3" }, diff --git a/services/datalad/datalad_service/app.py b/services/datalad/datalad_service/app.py index b437a91b3..7d7a6ae3a 100644 --- a/services/datalad/datalad_service/app.py +++ b/services/datalad/datalad_service/app.py @@ -1,6 +1,5 @@ import falcon import falcon.asgi -from falcon_elastic_apm import ElasticApmMiddleware import datalad_service.config from datalad_service.tasks.audit import audit_datasets @@ -40,9 +39,6 @@ def create_app(): raise Exception("Required DATALAD_DATASET_PATH environment variable is not defined") middleware = [AuthenticateMiddleware(), CustomErrorHandlerMiddleware()] - if datalad_service.config.ELASTIC_APM_SERVER_URL: - middleware.append(ElasticApmMiddleware(service_name='datalad-service', - server_url=datalad_service.config.ELASTIC_APM_SERVER_URL)) app = falcon.asgi.App(middleware=middleware) app.router_options.converters['path'] = PathConverter diff --git a/services/datalad/datalad_service/common/elasticsearch.py b/services/datalad/datalad_service/common/elasticsearch.py deleted file mode 100644 index 9cc410dc6..000000000 --- a/services/datalad/datalad_service/common/elasticsearch.py +++ /dev/null @@ -1,21 +0,0 @@ -from elasticsearch import Elasticsearch -from datetime import datetime -from datalad_service.config import ELASTICSEARCH_CONNECTION - - -class ValidationLogger: - def __init__(self, dataset_id, user): - self.es = Elasticsearch([ELASTICSEARCH_CONNECTION + ':9200']) - self.dataset_id = dataset_id - self.user = user - - def log(self, stdout, stderr, error): - body = { - 'dataset_id': self.dataset_id, - 'stdout': str(stdout), - 'stderr': str(stderr), - 'error': str(error), - 'timestamp': datetime.now(), - 'user': self.user - } - self.es.index(index='logs-validation', body=body) diff --git a/services/datalad/datalad_service/config.py b/services/datalad/datalad_service/config.py index 91b063f3b..651f52181 100644 --- a/services/datalad/datalad_service/config.py +++ b/services/datalad/datalad_service/config.py @@ -31,11 +31,5 @@ # Redis Host REDIS_HOST = os.getenv('REDIS_HOST', 'redis') -# The path to connect to Elastic APM server -ELASTIC_APM_SERVER_URL = os.getenv('ELASTIC_APM_SERVER_URL') - -# Elasticsearch URL -ELASTICSEARCH_CONNECTION = os.getenv('ELASTICSEARCH_CONNECTION') - # Site URL CRN_SERVER_URL = os.getenv('CRN_SERVER_URL') diff 
--git a/services/datalad/datalad_service/tasks/publish.py b/services/datalad/datalad_service/tasks/publish.py index 1d6704e0a..61ae8f43e 100644 --- a/services/datalad/datalad_service/tasks/publish.py +++ b/services/datalad/datalad_service/tasks/publish.py @@ -3,7 +3,6 @@ import os.path import re -import elasticapm import pygit2 import boto3 from github import Github @@ -54,14 +53,12 @@ def create_remotes_and_export(dataset_path, cookies=None): export_dataset(dataset_path, cookies) -@elasticapm.capture_span() def create_remotes(dataset_path): dataset = os.path.basename(dataset_path) s3_sibling(dataset_path) github_sibling(dataset_path, dataset) -@elasticapm.capture_span() def export_dataset(dataset_path, cookies=None, s3_export=s3_export, github_export=github_export, update_s3_sibling=update_s3_sibling, github_enabled=DATALAD_GITHUB_EXPORTS_ENABLED): """ Export dataset to S3 and GitHub. @@ -113,7 +110,6 @@ def check_remote_has_version(dataset_path, remote, tag): return remote_id_A == remote_id_B and tree_id_A == tree_id_B -@elasticapm.capture_span() async def delete_s3_sibling(dataset_id): try: client = boto3.client( @@ -145,7 +141,6 @@ async def delete_s3_sibling(dataset_id): f'Attempt to delete dataset {dataset_id} from {get_s3_remote()} has failed. ({e})') -@elasticapm.capture_span() async def delete_github_sibling(dataset_id): ses = Github(DATALAD_GITHUB_LOGIN, DATALAD_GITHUB_PASS) org = ses.get_organization(DATALAD_GITHUB_ORG) diff --git a/services/datalad/datalad_service/tasks/validator.py b/services/datalad/datalad_service/tasks/validator.py index 4aa30fdc6..7b330586c 100644 --- a/services/datalad/datalad_service/tasks/validator.py +++ b/services/datalad/datalad_service/tasks/validator.py @@ -5,7 +5,6 @@ import re from datalad_service.config import GRAPHQL_ENDPOINT -from datalad_service.common.elasticsearch import ValidationLogger LEGACY_VALIDATOR_VERSION = json.load( @@ -34,7 +33,7 @@ async def setup_validator(): await process.wait() -async def run_and_decode(args, timeout, esLogger): +async def run_and_decode(args, timeout): """Run a subprocess and return the JSON output.""" process = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) try: @@ -53,7 +52,7 @@ async def run_and_decode(args, timeout, esLogger): esLogger.log(stdout, stderr, err) -async def validate_dataset_call(dataset_path, ref, esLogger): +async def validate_dataset_call(dataset_path, ref): """ Synchronous dataset validation. 
@@ -127,8 +126,7 @@ def issues_mutation(dataset_id, ref, issues, validator_metadata): async def validate_dataset(dataset_id, dataset_path, ref, cookies=None, user=''): - esLogger = ValidationLogger(dataset_id, user) - validator_output = await validate_dataset_call(dataset_path, ref, esLogger) + validator_output = await validate_dataset_call(dataset_path, ref) all_issues = validator_output['issues']['warnings'] + \ validator_output['issues']['errors'] if validator_output: @@ -146,7 +144,7 @@ async def validate_dataset(dataset_id, dataset_path, ref, cookies=None, user='') raise Exception('Validation failed unexpectedly') # New schema validator second in case of issues - validator_output_deno = await validate_dataset_deno_call(dataset_path, ref, esLogger) + validator_output_deno = await validate_dataset_deno_call(dataset_path, ref) if validator_output_deno: if 'issues' in validator_output_deno: r = requests.post( From 55d74517a572386cf43c0058aa2c08fc28c56326 Mon Sep 17 00:00:00 2001 From: Nell Hardcastle Date: Wed, 7 Aug 2024 13:26:52 -0700 Subject: [PATCH 2/2] fix(worker): Log validation failures to ASGI logs instead of ElasticSearch --- .../datalad_service/tasks/validator.py | 19 ++++++++++++------- services/datalad/tests/test_validator.py | 12 ++++++++---- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/services/datalad/datalad_service/tasks/validator.py b/services/datalad/datalad_service/tasks/validator.py index 7b330586c..b2c9a8ad7 100644 --- a/services/datalad/datalad_service/tasks/validator.py +++ b/services/datalad/datalad_service/tasks/validator.py @@ -1,11 +1,14 @@ import asyncio import json +import logging import os -import requests import re +import requests + from datalad_service.config import GRAPHQL_ENDPOINT +logger = logging.getLogger('datalad_service.' + __name__) LEGACY_VALIDATOR_VERSION = json.load( open('package.json'))['dependencies']['bids-validator'] @@ -33,12 +36,13 @@ async def setup_validator(): await process.wait() -async def run_and_decode(args, timeout): +async def run_and_decode(args, timeout, logger): """Run a subprocess and return the JSON output.""" process = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) try: await asyncio.wait_for(process.wait(), timeout=timeout) except asyncio.TimeoutError: + logger.warning(f'Timed out while running `{" ".join(args)}`') process.kill() # Retrieve what we can from the process @@ -49,10 +53,12 @@ async def run_and_decode(args, timeout): try: return json.loads(escape_ansi(stdout.decode('utf-8'))) except json.decoder.JSONDecodeError as err: - esLogger.log(stdout, stderr, err) + logger.exception(err) + logger.info(stdout) + logger.error(stderr) -async def validate_dataset_call(dataset_path, ref): +async def validate_dataset_call(dataset_path, ref, logger=logger): """ Synchronous dataset validation. @@ -62,11 +68,11 @@ async def validate_dataset_call(dataset_path, ref): return await run_and_decode( ['./node_modules/.bin/bids-validator', '--json', '--ignoreSubjectConsistency', dataset_path], timeout=300, - esLogger=esLogger, + logger=logger, ) -async def validate_dataset_deno_call(dataset_path, ref, esLogger): +async def validate_dataset_deno_call(dataset_path, ref): """ Synchronous dataset validation. 
@@ -77,7 +83,6 @@ async def validate_dataset_deno_call(dataset_path, ref, esLogger): f'https://deno.land/x/bids_validator@{DENO_VALIDATOR_VERSION}/bids-validator.ts', '--json', dataset_path], timeout=300, - esLogger=esLogger, ) diff --git a/services/datalad/tests/test_validator.py b/services/datalad/tests/test_validator.py index 16e3d14bf..0c9e2e95d 100644 --- a/services/datalad/tests/test_validator.py +++ b/services/datalad/tests/test_validator.py @@ -12,10 +12,12 @@ class MockLogger: async def test_validator_error(new_dataset): logger = MockLogger() - logger.log = Mock() + logger.info = Mock() + logger.error = Mock() + logger.exception = Mock() await validate_dataset_call(new_dataset.path, 'HEAD', logger) # new_dataset completes validation with errors, should not call logger - assert not logger.log.called + assert not logger.exception.called @pytest.fixture @@ -35,6 +37,8 @@ async def invalidJson(): async def test_validator_bad_json(new_dataset, mock_validator_crash): logger = MockLogger() - logger.log = Mock() + logger.info = Mock() + logger.error = Mock() + logger.exception = Mock() await validate_dataset_call(new_dataset.path, 'HEAD', logger) - assert logger.log.called + assert logger.exception.called
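---

Note (not part of the patches above): after this series, validation failures are reported through the standard library logger created with `logging.getLogger('datalad_service.' + __name__)` rather than being indexed into Elasticsearch, so they only surface if the worker process routes those loggers somewhere visible. The snippet below is a minimal, illustrative sketch of how a worker entrypoint *might* wire the `datalad_service` logger hierarchy to stderr so the ASGI server's process logs capture them; the handler layout, format string, and log level are assumptions for illustration, not configuration taken from this repository.

```python
# Illustrative sketch only -- assumed logging configuration, not code from the patch.
import logging.config

logging.config.dictConfig({
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        # Assumed format; adjust to match the deployment's log conventions.
        'plain': {'format': '%(asctime)s %(levelname)s %(name)s: %(message)s'},
    },
    'handlers': {
        # StreamHandler defaults to stderr, which gunicorn/uvicorn capture.
        'stderr': {'class': 'logging.StreamHandler', 'formatter': 'plain'},
    },
    'loggers': {
        # Matches logging.getLogger('datalad_service.' + __name__) in validator.py,
        # so logger.exception(), logger.error(), and logger.warning() calls from the
        # validator tasks are emitted to the worker's process output.
        'datalad_service': {'handlers': ['stderr'], 'level': 'INFO'},
    },
})
```

With a configuration along these lines, a bad JSON payload from the validator subprocess shows up as an exception traceback plus the captured stdout/stderr in the worker logs, which is the behavior the second commit's tests assert via the mocked `info`, `error`, and `exception` methods.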