From b8f8abdbc736b32bc865f51a7972545cd49fe12b Mon Sep 17 00:00:00 2001 From: shawnsarwar Date: Tue, 18 Aug 2020 13:01:51 +0200 Subject: [PATCH] feat: (producer) thread level healthchecks (#886) (#887) * feat: deep healthchecks of threads * fix: add failback time format to handle some old timestamps in production --- aether-producer/aether/producer/__init__.py | 47 +++++++++++++++++++-- aether-producer/aether/producer/kernel.py | 6 ++- aether-producer/aether/producer/settings.py | 10 +++++ aether-producer/aether/producer/topic.py | 9 +++- aether-producer/tests/__init__.py | 1 + aether-producer/tests/test_integration.py | 18 ++++++-- 6 files changed, 81 insertions(+), 10 deletions(-) diff --git a/aether-producer/aether/producer/__init__.py b/aether-producer/aether/producer/__init__.py index d8433a709..87cb7911d 100644 --- a/aether-producer/aether/producer/__init__.py +++ b/aether-producer/aether/producer/__init__.py @@ -16,12 +16,16 @@ # specific language governing permissions and limitations # under the License. +from datetime import datetime +from functools import wraps +import json import signal import socket -from functools import wraps from confluent_kafka.admin import AdminClient -from flask import Flask, Response, request, jsonify +from flask import ( + Flask, jsonify, request, Response +) import gevent from gevent.pool import Pool @@ -31,6 +35,7 @@ from aether.producer.settings import KAFKA_SETTINGS, SETTINGS, LOG_LEVEL, get_logger from aether.producer.topic import KafkaStatus, TopicStatus, RealmManager + # How to access Kernel: API (default) | DB if SETTINGS.get('kernel_access_type', 'api').lower() != 'db': from aether.producer.kernel_api import KernelAPIClient as KernelClient @@ -67,6 +72,7 @@ def __init__(self): # Clear objects and start self.kafka_status = KafkaStatus.SUBMISSION_PENDING self.realm_managers = {} + self.thread_idle = {} self.run() def keep_alive_loop(self): @@ -153,6 +159,13 @@ def broker_info(self): except Exception as err: return {'error': f'{err}'} + def thread_checkin(self, _id): + self.thread_idle[_id] = datetime.now() + + def thread_set_inactive(self, _id): + if _id in self.thread_idle: + del self.thread_idle[_id] + # Connect to offset def init_db(self): init_offset_db() @@ -182,6 +195,26 @@ def check_realms(self): continue self.logger.debug('No longer checking for new Realms') + def check_thread_health(self): + max_idle = SETTINGS.get('MAX_JOB_IDLE_SEC', 600) + idle_times = self.get_thread_idle() + if not idle_times: + return {} + self.logger.debug(f'idle times (s) {idle_times}') + expired = {k: v for k, v in idle_times.items() if v > max_idle} + if expired: + self.logger.error(f'Expired threads (s) {expired}') + return expired + + def get_thread_idle(self): + _now = datetime.now() + # if a job is inactive (stopped / paused intentionally or died naturally) + # or if a job is still starting up + # then it's removed from the check and given a default value of _now + idle_times = {_id: int((_now - self.thread_idle.get(_id, _now)).total_seconds()) + for _id in self.realm_managers.keys()} + return idle_times + # Flask Functions def add_endpoints(self): @@ -236,7 +269,15 @@ def decorated(self, *args, **kwargs): def request_healthcheck(self): with self.app.app_context(): - return Response({'healthy': True}) + try: + expired = self.check_thread_health() + if not expired: + return Response({'healthy': True}) + else: + return Response(json.dumps(expired), 500, mimetype='application/json') + except Exception as err: + self.app.logger.error(f'Unexpected HC error: {err}') + return Response(f'Unexpected error: {err}', 500) def request_kernelcheck(self): with self.app.app_context(): diff --git a/aether-producer/aether/producer/kernel.py b/aether-producer/aether/producer/kernel.py index 030edf0a7..a553ba5c2 100644 --- a/aether-producer/aether/producer/kernel.py +++ b/aether-producer/aether/producer/kernel.py @@ -25,6 +25,7 @@ _WINDOW_SIZE_SEC = int(SETTINGS.get('window_size_sec', 3)) _TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' +_FAILBACK_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S' class KernelClient(object): @@ -46,7 +47,10 @@ def get_time_window_filter(self, query_time): # based on the insert time and now() to provide a buffer. def fn(row): - committed = datetime.strptime(row.get('modified')[:26], _TIME_FORMAT) + try: + committed = datetime.strptime(row.get('modified')[:26], _TIME_FORMAT) + except ValueError: + committed = datetime.strptime(row.get('modified')[:19], _FAILBACK_TIME_FORMAT) lag_time = (query_time - committed).total_seconds() if lag_time > _WINDOW_SIZE_SEC: return True diff --git a/aether-producer/aether/producer/settings.py b/aether-producer/aether/producer/settings.py index 2d6c3da3c..58fe255e5 100644 --- a/aether-producer/aether/producer/settings.py +++ b/aether-producer/aether/producer/settings.py @@ -25,6 +25,7 @@ class Settings(dict): # A container for our settings def __init__(self, file_path=None): + self.overrides = {} self.load(file_path) def get(self, key, default=None): @@ -37,12 +38,21 @@ def get_required(self, key): return self.__getitem__(key) def __getitem__(self, key): + if key in self.overrides: + return self.overrides[key] result = os.environ.get(key.upper()) if result is None: result = super().__getitem__(key.lower()) return result + def override(self, key, value): + # case sensitive override of a setting (for testing purposes) + self.overrides[key] = value + + def clear_overrides(self): + self.overrides = {} + def load(self, path): with open(path) as f: obj = json.load(f) diff --git a/aether-producer/aether/producer/topic.py b/aether-producer/aether/producer/topic.py index cfddf69b0..6a5b24daf 100644 --- a/aether-producer/aether/producer/topic.py +++ b/aether-producer/aether/producer/topic.py @@ -22,7 +22,8 @@ import io import json import traceback -from typing import (Any, Dict) +from typing import (Any, Dict, TYPE_CHECKING) + from datetime import datetime from confluent_kafka import Producer @@ -36,6 +37,9 @@ from aether.producer.db import Offset from aether.producer.settings import SETTINGS, KAFKA_SETTINGS, get_logger +if TYPE_CHECKING: + from aether.producer import ProducerManager + logger = get_logger('topic') @@ -105,7 +109,7 @@ class RealmManager(object): # Creates a long running job on RealmManager.update_kafka - def __init__(self, context, realm): + def __init__(self, context: 'ProducerManager', realm: str): self.context = context self.realm = realm self.sleep_time = int(SETTINGS.get('sleep_time', 10)) @@ -239,6 +243,7 @@ def get_topic_size(self, sw: SchemaWrapper): def update_loop(self): while not self.context.killed: + self.context.thread_checkin(self.realm) logger.info(f'Looking for updates on: {self.realm}') self.producer.poll(0) self.update_schemas() diff --git a/aether-producer/tests/__init__.py b/aether-producer/tests/__init__.py index ff5902c4e..f39eaf834 100644 --- a/aether-producer/tests/__init__.py +++ b/aether-producer/tests/__init__.py @@ -64,3 +64,4 @@ def __init__(self): self.kafka_admin_client = MockAdminInterface() self.logger = get_logger('tests') self.realm_managers = {} + self.thread_idle = {} diff --git a/aether-producer/tests/test_integration.py b/aether-producer/tests/test_integration.py index cc303aaed..aad835a8c 100644 --- a/aether-producer/tests/test_integration.py +++ b/aether-producer/tests/test_integration.py @@ -32,6 +32,8 @@ def test_manager_http_endpoint_service(): man = MockProducerManager() try: + SETTINGS.override('MAX_JOB_IDLE_SEC', 1) + _realm = 'fake_realm' auth = requests.auth.HTTPBasicAuth(man.admin_name, man.admin_password) man.serve() man.add_endpoints() @@ -39,21 +41,29 @@ def test_manager_http_endpoint_service(): url = 'http://localhost:%s' % SETTINGS.get('server_port', 9005) r = requests.head(f'{url}/healthcheck') - assert(r.status_code == 200) + assert(r.status_code == 200), r.text r = requests.head(f'{url}/kernelcheck') - assert(r.status_code == 424) + assert(r.status_code == 424), r.text protected_endpoints = ['status', 'topics'] for e in protected_endpoints: r = requests.head(f'{url}/{e}') - assert(r.status_code == 401) + assert(r.status_code == 401), r.text for e in protected_endpoints: r = requests.head(f'{url}/{e}', auth=auth) - assert(r.status_code == 200) + assert(r.status_code == 200), r.text + + man.realm_managers[_realm] = {} + man.thread_checkin(_realm) + sleep(2) + r = requests.get(f'{url}/healthcheck') + assert(r.status_code == 500) + assert(_realm in r.json().keys()) finally: + SETTINGS.clear_overrides() man.http.stop() man.http.close() man.worker_pool.kill()