This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

feat: added timeout monitoring of threads triggered by healthcheck (#25)
shawnsarwar authored Aug 12, 2020
1 parent 61be2f5 commit b620552
Showing 9 changed files with 163 additions and 9 deletions.
9 changes: 8 additions & 1 deletion aet/api.py
@@ -225,7 +225,14 @@ def decorated(self, *args, **kwargs):

def request_healthcheck(self) -> Response:
with self.app.app_context():
return Response({"healthy": True})
try:
expired = self.consumer.healthcheck()
if not expired:
return Response({"healthy": True})
else:
return Response(expired, 500)
except Exception as err:
return Response(f"Unexpected error: {err}", 500)

# Generic CRUD

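A minimal client-side probe of the reworked endpoint, for illustration only; the port and credentials here are assumptions mirroring the test fixtures below, not values fixed by this commit:

import requests

# hypothetical probe; the port (7099) and admin credentials are assumptions
res = requests.get('http://localhost:7099/health',
                   auth=requests.auth.HTTPBasicAuth('admin', 'password'))
if res.status_code == 500:
    # the body carries the expired jobs and their idle times in seconds,
    # or an 'Unexpected error: ...' message
    print(f'unhealthy: {res.text}')
else:
    print('healthy')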
36 changes: 36 additions & 0 deletions aet/consumer.py
@@ -18,8 +18,12 @@
# specific language governing permissions and limitations
# under the License.

import json
from time import sleep
from typing import Any, ClassVar, Dict
import sys
import threading
import traceback

from aether.python.redis.task import TaskHelper
from flask import Response
@@ -90,6 +94,38 @@ def stop(self, *args, **kwargs):
sleep(.25)
LOG.info('Shutdown Complete')

def healthcheck(self, *args, **kwargs):
idle_times = self.job_manager.status()
max_idle = self.consumer_settings.get('MAX_JOB_IDLE_SEC', 600)
if not idle_times:
return {}
LOG.info(f'idle times (s) {idle_times}')
expired = {k: v for k, v in idle_times.items() if v > max_idle}
if expired:
LOG.error(f'Expired threads (s) {expired}')
if self.consumer_settings.get('DUMP_STACK_ON_EXPIRE', False):
LOG.critical('Healthcheck failed! Dumping stack.')
self.dump_stack()
return expired

def dump_stack(self):
_names = {th.ident: th.name for th in threading.enumerate()}
_threads = []
for _id, stack in sys._current_frames().items():
_info = {
'id': _id,
'name': _names.get(_id)}
frames = []
for filename, lineno, name, line in traceback.extract_stack(stack):
frames.append({
'filename': filename,
'line_no': lineno,
'name': name,
'line': (line or '').strip() or None})  # line can be None if source is unavailable
_info['frames'] = frames
_threads.append(_info)
LOG.error(json.dumps(_threads, indent=2))

# Generic API Functions that aren't pure delegation to Redis

def validate(self, job, _type=None, verbose=False, tenant=None):
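dump_stack() relies only on the standard library; a self-contained sketch of the same pattern (not this library's API), runnable on its own:

import json
import sys
import threading
import traceback

def snapshot_threads():
    # map thread idents to names, then capture each live thread's stack
    names = {t.ident: t.name for t in threading.enumerate()}
    snapshot = []
    for ident, frame in sys._current_frames().items():
        frames = [{'filename': f.filename, 'line_no': f.lineno,
                   'name': f.name, 'line': (f.line or '').strip() or None}
                  for f in traceback.extract_stack(frame)]
        snapshot.append({'id': ident, 'name': names.get(ident), 'frames': frames})
    return snapshot

print(json.dumps(snapshot_threads(), indent=2))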
27 changes: 25 additions & 2 deletions aet/job.py
@@ -20,6 +20,7 @@

from abc import abstractmethod
from copy import deepcopy
from datetime import datetime
import enum
from time import sleep
from threading import Thread
@@ -97,10 +98,11 @@ def reference(cls) -> JobReference:
name = require_property(cls.name)
return JobReference(name)

def __init__(self, _id: str, tenant: str, resources: InstanceManager):
def __init__(self, _id: str, tenant: str, resources: InstanceManager, context: 'JobManager'):
self._id = _id
self.tenant = tenant
self.resources = resources
self.context = context
self.log_stack = []
self.log = callback_logger(f'j-{self.tenant}-{self._id}', self.log_stack, 100)
self._setup()
@@ -158,6 +160,7 @@ def _run(self):
try:
c = 0
while self.status is not JobStatus.STOPPED:
self.context.check_in(self._id, datetime.now())
c += 1
if c % self.report_interval == 0:
self.log.debug(f'thread {self._id} running : {self.status}')
@@ -187,10 +190,13 @@ def _run(self):
except RuntimeError as rer:
self.log.critical(f'RuntimeError: {self._id} | {rer}')
self.safe_sleep(self.sleep_delay)

self.context.set_inactive(self._id)
self.log.debug(f'Job {self._id} stopped normally.')
except Exception as fatal:
self.log.critical(f'job {self._id} failed with critical error {type(fatal)}: {fatal}')
self.log.error(''.join(traceback.format_tb(fatal.__traceback__)))
self.context.set_inactive(self._id)
self.status = JobStatus.DEAD
return # we still want to be able to read the logs so don't re-raise

@@ -272,6 +278,7 @@ def safe_sleep(self, dur):
def stop(self, *args, **kwargs):
# return thread to be observed
self.log.info(f'Job {self._id} caught stop signal.')
self.context.set_inactive(self._id)
self.status = JobStatus.STOPPED
return self._thread

@@ -294,6 +301,7 @@ def get_job_id(job: Union[str, Dict[str, Any]], tenant: str):

def __init__(self, task_master: TaskHelper, job_class: Callable = BaseJob):
self.jobs = {}
self.check_ins = {}
self.task = task_master
self.job_class = job_class # type: ignore
self.resources = InstanceManager(self.job_class._resources)
@@ -310,6 +318,21 @@ def stop(self, *args, **kwargs):
LOG.info('Stopping Resources...')
[t.join() for t in self.resources.stop()]

def set_inactive(self, _id):
if _id in self.check_ins:
del self.check_ins[_id]

def check_in(self, _id, ts: datetime):
self.check_ins[_id] = ts

def status(self):
_now = datetime.now()
# if a job is inactive (stopped / paused intentionally or died naturally)
# it is absent from check_ins, so it defaults to _now and reads as idle for 0s
idle_times = {_id: int((_now - self.check_ins.get(_id, _now)).total_seconds())
for _id in self.jobs.keys()}
return idle_times

# Job Initialization

def _init_jobs(self):
@@ -347,7 +370,7 @@ def _init_job(
self.jobs[_id].set_config(job)
else:
LOG.debug(f'Creating new job {_id}')
self.jobs[_id] = self.job_class(_id, tenant, self.resources)
self.jobs[_id] = self.job_class(_id, tenant, self.resources, self)
self.jobs[_id].set_config(job)

def list_jobs(self, tenant: str):
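The check-in bookkeeping above reduces to a small, stdlib-only pattern; a sketch with names chosen to mirror the methods here:

from datetime import datetime

check_ins = {}

def check_in(_id):
    check_ins[_id] = datetime.now()

def idle_times(job_ids):
    # jobs without a stamp (inactive or never started) default to _now,
    # so they report an idle time of zero seconds
    _now = datetime.now()
    return {_id: int((_now - check_ins.get(_id, _now)).total_seconds())
            for _id in job_ids}

check_in('job-1')
print(idle_times(['job-1', 'job-2']))  # job-2 never checked in -> 0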
10 changes: 7 additions & 3 deletions aet/kafka.py
@@ -104,7 +104,9 @@ def _default_filter_config(self) -> FilterConfig:
pass_conditions=self.config.get("aether_emit_flag_values"))

def _get_topic_filter_config(self, topic) -> FilterConfig:
return self._topic_filter_configs.get(topic, None)
if topic not in self._topic_filter_configs:
self.set_topic_filter_config(topic, self._default_filter_config())
return self._topic_filter_configs.get(topic)

def get_approval_filter(self, config: FilterConfig):
# If {aether_emit_flag_required} is True, each message is checked for a passing value.
@@ -144,6 +146,8 @@ def _default_mask_config(self):
emit_level=self.config.get("aether_masking_schema_emit_level"))

def _get_topic_mask_config(self, topic):
if topic not in self._topic_mask_configs:
self.set_topic_mask_config(topic, self._default_mask_config())
return self._topic_mask_configs.get(topic, None)

def get_mask_from_schema(self, schema, config: MaskConfig):
@@ -272,8 +276,8 @@ def _reader_to_messages(self, reader, last_schema, mask, approval_filter, topic)
# we get a mess of unicode that can't be json parsed so we need ast
raw_schema = ast.literal_eval(str(reader.meta))
schema = json.loads(raw_schema.get("avro.schema"))
filter_config = self._get_topic_filter_config(topic) or self._default_filter_config()
mask_config = self._get_topic_mask_config(topic) or self._default_mask_config()
filter_config = self._get_topic_filter_config(topic)
mask_config = self._get_topic_mask_config(topic)

if schema != last_schema.get(topic):
last_schema[topic] = schema
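Both getters now share the same lazy-default lookup; in isolation the pattern is just:

from typing import Callable, Dict, TypeVar

T = TypeVar('T')

def get_or_default(configs: Dict[str, T], topic: str, make_default: Callable[[], T]) -> T:
    # the first lookup for a topic installs the default,
    # so later callers never receive None
    if topic not in configs:
        configs[topic] = make_default()
    return configs[topic]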
10 changes: 10 additions & 0 deletions aet/settings.py
@@ -25,6 +25,9 @@
class Settings(dict):
# A container for our settings
def __init__(self, file_path=None, alias=None, exclude=None):
# takes precedence over env and initial values. Case sensitive
# useful for tests
self.overrides = {}
if not exclude:
self.exclude = []
else:
@@ -39,13 +42,20 @@ def get(self, key, default=None):
except KeyError:
return default

def override(self, key, value):
self.overrides[key] = value

def __getattr__(self, name):
try:
if name in self.overrides:
return self.overrides[name]
return super().__getattr__(name)
except AttributeError:
return self.get(name)

def __getitem__(self, key):
if key in self.overrides:
return self.overrides[key]
if self.alias and key in self.alias:
key = self.alias.get(key)
result = os.environ.get(key.upper())
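A usage sketch of the new override() hook; constructing Settings with no arguments is an assumption made for brevity (the tests below feed it an existing config mapping):

from aet.settings import Settings

conf = Settings()
conf.override('MAX_JOB_IDLE_SEC', 1)  # wins over env vars and file values
assert conf['MAX_JOB_IDLE_SEC'] == 1  # __getitem__ checks overrides first
assert conf.MAX_JOB_IDLE_SEC == 1     # so does attribute access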
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -7,15 +7,15 @@ services:
# ---------------------------------

zookeeper-test:
image: confluentinc/cp-zookeeper:5.2.1
image: confluentinc/cp-zookeeper:5.5.1
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
extra_hosts:
- moby:127.0.0.1

kafka-test:
image: confluentinc/cp-kafka:5.2.1
image: confluentinc/cp-kafka:5.5.1
links:
- zookeeper-test
environment:
2 changes: 1 addition & 1 deletion setup.cfg
@@ -34,7 +34,7 @@ python_files = tests/test*.py
addopts = --maxfail=100 -p no:warnings
# For super verbose tests...
# log_cli = 1
# log_cli_level = ERROR
# log_cli_level = DEBUG
# log_cli_format = %(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)
# log_cli_date_format=%Y-%m-%d %H:%M:%S

45 changes: 45 additions & 0 deletions tests/__init__.py
@@ -319,6 +319,12 @@ def share_resource(self, _id, prop):
return r.definition.get(prop)


class StuckJob(IJob):
def _get_messages(self, config):
sleep(60)
return []


class MockCallable(object):
value: Optional[Task] = None

@@ -346,6 +352,23 @@ def __init__(self, CON_CONF, KAFKA_CONF):
)


class MockStuckConsumer(BaseConsumer):
def __init__(self, CON_CONF, KAFKA_CONF):
_settings = CON_CONF.copy()
CON = settings.Settings(_settings)
CON.override('MAX_JOB_IDLE_SEC', 1)
CON.override('DUMP_STACK_ON_EXPIRE', True)  # key must match the healthcheck's setting
CON.override('EXPOSE_PORT', 7099)
LOG.debug(CON.copy())
self.job_class = StuckJob
super(MockStuckConsumer, self).__init__(
CON,
KAFKA_CONF,
self.job_class,
redis_instance=get_redis()
)


def send_plain_messages(producer, topic, schema, messages, encoding, is_json=True):
for msg in messages:
if is_json:
@@ -616,6 +639,17 @@ def mocked_consumer():
sleep(.5)


@pytest.mark.unit
@pytest.fixture(scope="module")
def mocked_stuck_consumer():
consumer = MockStuckConsumer(settings.CONSUMER_CONFIG, settings.KAFKA_CONFIG)
LOG.debug('Starting mocked_stuck_consumer')
yield consumer
LOG.debug('Fixture mocked_stuck_consumer complete, stopping.')
consumer.stop()
sleep(.5)


# API Assets
@pytest.mark.unit
@pytest.fixture(scope="module")
@@ -626,3 +660,14 @@ def mocked_api(mocked_consumer) -> Iterable[APIServer]:
mocked_consumer.stop()
# api.stop()
sleep(.5)


@pytest.mark.unit
@pytest.fixture(scope="module")
def mocked_stuck_api(mocked_stuck_consumer) -> Iterable[APIServer]:
yield mocked_stuck_consumer.api
# teardown
LOG.debug('Fixture api complete, stopping.')
mocked_stuck_consumer.stop()
# api.stop()
sleep(.5)
29 changes: 29 additions & 0 deletions tests/test_sdk.py
@@ -243,8 +243,10 @@ def test_publishing_enum_pass(default_consumer_args,
topic_size), "Should have generated the right number of messages"
# set configs
consumer_kwargs = default_consumer_args
consumer_kwargs["aether_emit_flag_required"] = True
consumer_kwargs["aether_emit_flag_values"] = publish_on
# get messages for this emit level
LOG.debug(json.dumps(consumer_kwargs, indent=2))
iter_consumer = KafkaConsumer(**consumer_kwargs)
iter_consumer.subscribe([topic])
iter_consumer.seek_to_beginning()
@@ -586,6 +588,33 @@ def test_api_post_calls(call, result, json_body, raises_error, mocked_api):
assert(result(val)), val


@pytest.mark.unit
@pytest.mark.parametrize("call,method,json_body,raises_error", [
('job/add', 'POST', {
"id": "old_timer", "resources": [], "poll_interval": 1}, False),
('health', 'GET', None, True),
])
def test_api_stuck_job(call, method, json_body, raises_error, mocked_stuck_api):
user = settings.CONSUMER_CONFIG.get('ADMIN_USER')
pw = settings.CONSUMER_CONFIG.get('ADMIN_PW')
auth = requests.auth.HTTPBasicAuth(user, pw)
port = 7099
url = f'http://localhost:{port}/{call}'
req = requests.Request(method, url, json=json_body, auth=auth)
with requests.Session() as s:
res = s.send(req.prepare())
try:
LOG.debug(res.text)
res.raise_for_status()
if raises_error:
assert(False), res
sleep(2)
except Exception:
if not raises_error:
assert(False), res


######
#
# Job Manager Tests
