Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
feat: (producer) thread level healthchecks (#886) (#887)
Browse files Browse the repository at this point in the history
* feat: deep healthchecks of threads

* fix: add fallback time format to handle some old timestamps in production
  • Loading branch information
shawnsarwar authored Aug 18, 2020
1 parent 1688236 commit b8f8abd
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 10 deletions.
47 changes: 44 additions & 3 deletions aether-producer/aether/producer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime
from functools import wraps
import json
import signal
import socket
from functools import wraps

from confluent_kafka.admin import AdminClient
from flask import Flask, Response, request, jsonify
from flask import (
Flask, jsonify, request, Response
)

import gevent
from gevent.pool import Pool
Expand All @@ -31,6 +35,7 @@
from aether.producer.settings import KAFKA_SETTINGS, SETTINGS, LOG_LEVEL, get_logger
from aether.producer.topic import KafkaStatus, TopicStatus, RealmManager


# How to access Kernel: API (default) | DB
if SETTINGS.get('kernel_access_type', 'api').lower() != 'db':
from aether.producer.kernel_api import KernelAPIClient as KernelClient
Expand Down Expand Up @@ -67,6 +72,7 @@ def __init__(self):
# Clear objects and start
self.kafka_status = KafkaStatus.SUBMISSION_PENDING
self.realm_managers = {}
self.thread_idle = {}
self.run()

def keep_alive_loop(self):
Expand Down Expand Up @@ -153,6 +159,13 @@ def broker_info(self):
except Exception as err:
return {'error': f'{err}'}

def thread_checkin(self, _id):
    """Record a liveness heartbeat for the worker identified by ``_id``.

    The stored timestamp is later compared against a max-idle threshold
    to decide whether the worker thread has stalled.
    """
    self.thread_idle.update({_id: datetime.now()})

def thread_set_inactive(self, _id):
    """Drop the heartbeat record for ``_id`` so it is no longer
    considered by the idle-time health check. Safe to call for an
    ``_id`` that was never checked in.
    """
    # pop with a default is a no-op when the key is absent,
    # matching the original membership-test-then-delete behavior.
    self.thread_idle.pop(_id, None)

# Connect to offset
def init_db(self):
init_offset_db()
Expand Down Expand Up @@ -182,6 +195,26 @@ def check_realms(self):
continue
self.logger.debug('No longer checking for new Realms')

def check_thread_health(self):
    """Return realms whose worker threads have exceeded the allowed
    idle time, as ``{realm: idle_seconds}``; empty dict when healthy.
    """
    # Seconds a worker may go without a check-in before it is expired.
    max_idle = SETTINGS.get('MAX_JOB_IDLE_SEC', 600)
    idle_times = self.get_thread_idle()
    if idle_times:
        self.logger.debug(f'idle times (s) {idle_times}')
        expired = {
            realm: idle
            for realm, idle in idle_times.items()
            if idle > max_idle
        }
        if expired:
            self.logger.error(f'Expired threads (s) {expired}')
        return expired
    return {}

def get_thread_idle(self):
    """Return whole seconds of idle time per registered realm manager.

    A realm with no check-in record (intentionally stopped/paused, died,
    or still starting up) defaults to the current time, i.e. reports
    zero idle seconds and is effectively excluded from expiry.
    """
    now = datetime.now()
    idle_times = {}
    for realm in self.realm_managers:
        last_seen = self.thread_idle.get(realm, now)
        idle_times[realm] = int((now - last_seen).total_seconds())
    return idle_times

# Flask Functions

def add_endpoints(self):
Expand Down Expand Up @@ -236,7 +269,15 @@ def decorated(self, *args, **kwargs):

def request_healthcheck(self):
    """Flask handler: 200 when no worker thread has exceeded its idle
    limit; 500 with a JSON map of expired realms otherwise; 500 with a
    plain error string on any unexpected failure.
    """
    with self.app.app_context():
        try:
            expired = self.check_thread_health()
            if expired:
                return Response(json.dumps(expired), 500, mimetype='application/json')
            return Response({'healthy': True})
        except Exception as err:
            # Broad catch: the healthcheck endpoint itself must not crash;
            # failures surface as a 500 with the error text.
            self.app.logger.error(f'Unexpected HC error: {err}')
            return Response(f'Unexpected error: {err}', 500)

def request_kernelcheck(self):
with self.app.app_context():
Expand Down
6 changes: 5 additions & 1 deletion aether-producer/aether/producer/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

_WINDOW_SIZE_SEC = int(SETTINGS.get('window_size_sec', 3))
_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
_FAILBACK_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'


class KernelClient(object):
Expand All @@ -46,7 +47,10 @@ def get_time_window_filter(self, query_time):
# based on the insert time and now() to provide a buffer.

def fn(row):
committed = datetime.strptime(row.get('modified')[:26], _TIME_FORMAT)
try:
committed = datetime.strptime(row.get('modified')[:26], _TIME_FORMAT)
except ValueError:
committed = datetime.strptime(row.get('modified')[:19], _FAILBACK_TIME_FORMAT)
lag_time = (query_time - committed).total_seconds()
if lag_time > _WINDOW_SIZE_SEC:
return True
Expand Down
10 changes: 10 additions & 0 deletions aether-producer/aether/producer/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Settings(dict):
# A container for our settings

def __init__(self, file_path=None):
    """Build the settings container and populate it from *file_path*."""
    # Case-sensitive override map, consulted before env vars and file
    # values by __getitem__; managed via override()/clear_overrides().
    self.overrides = dict()
    self.load(file_path)

def get(self, key, default=None):
Expand All @@ -37,12 +38,21 @@ def get_required(self, key):
return self.__getitem__(key)

def __getitem__(self, key):
    """Resolve *key* with precedence: explicit override (case
    sensitive) > upper-cased environment variable > lower-cased value
    from the loaded settings file (KeyError if nowhere defined).
    """
    try:
        return self.overrides[key]
    except KeyError:
        pass
    value = os.environ.get(key.upper())
    if value is not None:
        return value
    return super().__getitem__(key.lower())

def override(self, key, value):
    """Force *key* (case sensitive) to resolve to *value*, bypassing
    env-var and file lookups — intended for testing.
    """
    self.overrides.update({key: value})

def clear_overrides(self):
    """Discard every override set via :meth:`override`, restoring
    normal env-var/file resolution."""
    self.overrides = dict()

def load(self, path):
with open(path) as f:
obj = json.load(f)
Expand Down
9 changes: 7 additions & 2 deletions aether-producer/aether/producer/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import io
import json
import traceback
from typing import (Any, Dict)
from typing import (Any, Dict, TYPE_CHECKING)

from datetime import datetime

from confluent_kafka import Producer
Expand All @@ -36,6 +37,9 @@
from aether.producer.db import Offset
from aether.producer.settings import SETTINGS, KAFKA_SETTINGS, get_logger

if TYPE_CHECKING:
from aether.producer import ProducerManager

logger = get_logger('topic')


Expand Down Expand Up @@ -105,7 +109,7 @@ class RealmManager(object):

# Creates a long running job on RealmManager.update_kafka

def __init__(self, context, realm):
def __init__(self, context: 'ProducerManager', realm: str):
self.context = context
self.realm = realm
self.sleep_time = int(SETTINGS.get('sleep_time', 10))
Expand Down Expand Up @@ -239,6 +243,7 @@ def get_topic_size(self, sw: SchemaWrapper):

def update_loop(self):
while not self.context.killed:
self.context.thread_checkin(self.realm)
logger.info(f'Looking for updates on: {self.realm}')
self.producer.poll(0)
self.update_schemas()
Expand Down
1 change: 1 addition & 0 deletions aether-producer/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ def __init__(self):
self.kafka_admin_client = MockAdminInterface()
self.logger = get_logger('tests')
self.realm_managers = {}
self.thread_idle = {}
18 changes: 14 additions & 4 deletions aether-producer/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,38 @@
def test_manager_http_endpoint_service():
man = MockProducerManager()
try:
SETTINGS.override('MAX_JOB_IDLE_SEC', 1)
_realm = 'fake_realm'
auth = requests.auth.HTTPBasicAuth(man.admin_name, man.admin_password)
man.serve()
man.add_endpoints()
sleep(1)

url = 'http://localhost:%s' % SETTINGS.get('server_port', 9005)
r = requests.head(f'{url}/healthcheck')
assert(r.status_code == 200)
assert(r.status_code == 200), r.text

r = requests.head(f'{url}/kernelcheck')
assert(r.status_code == 424)
assert(r.status_code == 424), r.text

protected_endpoints = ['status', 'topics']
for e in protected_endpoints:
r = requests.head(f'{url}/{e}')
assert(r.status_code == 401)
assert(r.status_code == 401), r.text

for e in protected_endpoints:
r = requests.head(f'{url}/{e}', auth=auth)
assert(r.status_code == 200)
assert(r.status_code == 200), r.text

man.realm_managers[_realm] = {}
man.thread_checkin(_realm)
sleep(2)
r = requests.get(f'{url}/healthcheck')
assert(r.status_code == 500)
assert(_realm in r.json().keys())

finally:
SETTINGS.clear_overrides()
man.http.stop()
man.http.close()
man.worker_pool.kill()
Expand Down

0 comments on commit b8f8abd

Please sign in to comment.