From 98c7e1abfefc7c0ad2cafb84d53055855951f7c2 Mon Sep 17 00:00:00 2001 From: Steve Bunting Date: Thu, 5 Dec 2024 10:12:05 -0800 Subject: [PATCH] Revert "rework: threads all the way down (#44)" This reverts commit 7ed21c81b8d346b20f3662738ce75fe50ab7cd96. --- .github/workflows/test.yml | 1 + src/supergood/client.py | 271 +++++++++++++----- src/supergood/worker.py | 125 -------- tests/caching/test_location_request_body.py | 8 +- .../caching/test_location_request_headers.py | 7 +- tests/caching/test_method.py | 7 +- tests/conftest.py | 105 ++++--- tests/redaction/test_no_redaction.py | 35 +-- tests/redaction/test_redact_all.py | 3 +- tests/redaction/test_redact_arrays.py | 9 +- tests/redaction/test_redact_by_default.py | 6 +- tests/redaction/test_redaction.py | 6 +- tests/redaction/test_redaction_failures.py | 13 +- tests/redaction/test_top_level_redaction.py | 3 +- tests/test_core.py | 109 ++++--- tests/test_dont_log.py | 20 +- tests/test_ignored_domains.py | 22 +- tests/test_remote_config.py | 6 +- tests/test_repeating_thread.py | 29 ++ tests/vendors/test_httpx.py | 4 +- 20 files changed, 433 insertions(+), 356 deletions(-) delete mode 100644 src/supergood/worker.py create mode 100644 tests/test_repeating_thread.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d5a9928..9ae41fd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -35,6 +35,7 @@ jobs: pytest tests/test_core.py pytest tests/test_ignored_domains.py pytest tests/test_remote_config.py + pytest tests/test_repeating_thread.py pytest tests/caching/test_location_request_body.py pytest tests/caching/test_location_request_headers.py pytest tests/redaction/test_no_redaction.py diff --git a/src/supergood/client.py b/src/supergood/client.py index 78eab39..4093d28 100644 --- a/src/supergood/client.py +++ b/src/supergood/client.py @@ -23,12 +23,12 @@ ) from .logger import Logger from .remote_config import get_vendor_endpoint_from_config, parse_remote_config_json +from .repeating_thread import RepeatingThread from .vendors.aiohttp import patch as patch_aiohttp from .vendors.http import patch as patch_http from .vendors.httpx import patch as patch_httpx from .vendors.requests import patch as patch_requests from .vendors.urllib3 import patch as patch_urllib3 -from .worker import Repeater, Worker load_dotenv() @@ -47,6 +47,8 @@ def initialize( metadata={}, ): self.uninitialized = False + # This PID is used to detect when the client is running in a forked process + self.main_pid = os.getpid() # Storage for thread-local tags self.thread_local = threading.local() self.base_url = base_url if base_url else DEFAULT_SUPERGOOD_BASE_URL @@ -72,8 +74,8 @@ def initialize( self.base_config = DEFAULT_SUPERGOOD_CONFIG self.base_config.update(config) - # # By default will spin up threads to handle flushing and config fetching - # # can be changed by setting the appropriate config variable + # By default will spin up threads to handle flushing and config fetching + # can be changed by setting the appropriate config variable auto_flush = True auto_config = True if not self.base_config["runThreads"]: @@ -88,18 +90,26 @@ def initialize( self.log = Logger(self.__class__.__name__, self.base_config, self.api) self.remote_config = None - self.remote_config_thread = Repeater(10, self._get_config) - self.remote_config_thread.daemon = True if auto_config and self.base_config["useRemoteConfig"]: - self.remote_config_thread.start() + self.remote_config_initial_pull = threading.Thread( + daemon=True, 
target=self._get_config + ) + self.remote_config_initial_pull.start() elif not self.base_config["useRemoteConfig"]: self.log.debug("Running supergood in remote config off mode!") else: self.log.debug("auto config off. Remember to request manually") + self.remote_config_refresh_thread = RepeatingThread( + self._get_config, self.base_config["configInterval"] / 1000 + ) + if auto_config and self.base_config["useRemoteConfig"]: + self.remote_config_refresh_thread.start() + self.api.set_logger(self.log) self._request_cache = {} + self._response_cache = {} # Initialize patches here patch_requests(self._cache_request, self._cache_response) @@ -108,24 +118,18 @@ def initialize( patch_aiohttp(self._cache_request, self._cache_response) patch_httpx(self._cache_request, self._cache_response) - self.flush_thread = Worker(self.flush_cache) + self.flush_thread = RepeatingThread( + self.flush_cache, self.base_config["flushInterval"] / 1000 + ) + self.flush_lock = threading.Lock() if auto_flush: self.flush_thread.start() else: self.log.debug("auto flush off, remember to flush manually") - # Exit gracefully when possible - atexit.register(self.close) - - def close(self) -> None: - try: - self.log.debug("Closing client auto-flush, force flushing remaining cache") - except ValueError: - # logfile is already closed, cannot log - pass - self.flush_thread.flush() - self.flush_thread.kill() - self.remote_config_thread.cancel() + # On clean exit, or terminated exit - exit gracefully + if self.base_config["runThreads"]: + atexit.register(self.close) def _build_log_payload(self, urls=None, size=None, num_events=None): payload = {} @@ -267,29 +271,45 @@ def _cache_response( "statusText": safe_decode(response_status_text), "respondedAt": datetime.utcnow().strftime(self.time_format), } - # Append to flush worker. If the worker is not started yet, this will start it. 
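The append removed above fed the old queue-backed Worker (its module is deleted further down in this patch); the revert replaces it with the PID-guarded branch that follows: responses are cached in the process that called initialize(), and flushed synchronously in forked children, because background threads do not survive a fork. The pattern in isolation, as an illustrative sketch with hypothetical names rather than the client's exact code:

    import os

    class ForkAwareBuffer:
        def __init__(self):
            self.main_pid = os.getpid()  # captured once, in the initializing process
            self.cache = {}

        def add(self, key, event, flush_now):
            if os.getpid() == self.main_pid:
                # Same process: the background flush thread will drain the cache.
                self.cache[key] = event
            else:
                # Forked child: worker threads are not inherited across fork(),
                # so nothing would ever drain the cache; post the event now.
                flush_now([event])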
- self.flush_thread.append( - { - request_id: { - "request": request["request"], - "response": response, - "metadata": request.get("metadata", {}), - } + if os.getpid() == self.main_pid: + # If we're in the main thread, push to the cache + self._response_cache[request_id] = { + "request": request["request"], + "response": response, + "metadata": request.get("metadata", {}), } - ) + else: + # Otherwise, flush synchronously + self.sync_flush_cache( + [ + { + "request": request["request"], + "response": response, + "metadata": request.get("metadata", {}), + } + ] + ) + except Exception: url = None if request and request.get("request", None): - url = request.get("request").get("url", None) + url = request.get("request").get("url") payload = self._build_log_payload(urls=[url] if url else []) trace = "".join(traceback.format_exc()) self.log.error(ERRORS["CACHING_RESPONSE"], trace, payload) + def close(self) -> None: + self.log.debug("Closing client auto-flush, force flushing remaining cache") + self.flush_thread.cancel() + self.remote_config_refresh_thread.cancel() + self.flush_cache(force=True) + def kill(self) -> None: self.log.debug("Killing client auto-flush, deleting remaining cache.") - self.flush_thread.kill() - self.remote_config_thread.cancel() + self.flush_thread.cancel() + self.remote_config_refresh_thread.cancel() self._request_cache.clear() + self._response_cache.clear() def _get_config(self) -> None: try: @@ -303,50 +323,167 @@ def _get_config(self) -> None: trace = "".join(traceback.format_exc()) self.log.error(ERRORS["FETCHING_CONFIG"], trace, payload) - def flush_cache(self, entries: dict) -> None: + def _take_lock(self, block=False) -> bool: + return self.flush_lock.acquire(blocking=block) + + def _release_lock(self): + try: + self.flush_lock.release() + except RuntimeError: # releasing a non-held lock + payload = self._build_log_payload() + trace = "".join(traceback.format_exc()) + self.log.error(ERRORS["LOCK_STATE"], trace, payload) + + def flush_cache(self, force=False) -> None: + # In remote config mode, don't flush until a remote config is fetched if self.remote_config is None and self.base_config["useRemoteConfig"]: self.log.info("Config not loaded yet, cannot flush") return - data = list(entries.values()) + # if we're not force flushing, and another flush is in progress, just skip + # if we are force flushing, block until the previous flush completes. 
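In other words, a timer-driven flush gives up immediately when another flush holds the lock, while a forced flush (close() or atexit) blocks until it can run. The acquire/skip pattern as a standalone sketch, names hypothetical:

    import threading

    flush_lock = threading.Lock()

    def flush(force=False):
        # blocking=False turns overlapping periodic flushes into cheap no-ops;
        # blocking=True makes a forced flush wait for the in-flight one.
        if not flush_lock.acquire(blocking=force):
            return  # another flush is already running; skip this tick
        try:
            pass  # drain the caches and post the events here
        finally:
            flush_lock.release()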
+ acquired = self._take_lock(block=force) + if acquired == False: + self.log.info("Flush already in progress, skipping") + return + # FLUSH LOCK PROTECTION START + response_keys = [] + request_keys = [] try: - # In force redact all mode, always force redact everything - if self.base_config["forceRedactAll"]: - redact_all(data, self.remote_config, by_default=False) - # In redact by default mode, redact any non-allowed keys - elif self.base_config["redactByDefault"]: - redact_all(data, self.remote_config, by_default=True) - # Otherwise, redact using the remote config in remote config mode - elif self.base_config["useRemoteConfig"]: - to_delete = redact_values( - data, - self.remote_config, - self.base_config, - ) - if to_delete: - data = [ - item for (ind, item) in enumerate(data) if ind not in to_delete - ] + response_keys = list(self._response_cache.keys()) + request_keys = list(self._request_cache.keys()) + # If there are no responses in cache, just exit + if len(response_keys) == 0 and not force: + return + + # If we're forcing a flush but there's nothing in the cache, exit here + if force and len(response_keys) == 0 and len(request_keys) == 0: + return + + data = list(self._response_cache.values()) + + if force: + data += list(self._request_cache.values()) + try: + # In force redact all mode, always force redact everything + if self.base_config["forceRedactAll"]: + redact_all(data, self.remote_config, by_default=False) + # In redact by default mode, redact any non-allowed keys + elif self.base_config["redactByDefault"]: + redact_all(data, self.remote_config, by_default=True) + # Otherwise, redact using the remote config in remote config mode + elif self.base_config["useRemoteConfig"]: + to_delete = redact_values( + data, + self.remote_config, + self.base_config, + ) + if to_delete: + data = [ + item + for (ind, item) in enumerate(data) + if ind not in to_delete + ] + except Exception: + urls = [] + for entry in data: + if entry.get("request", None): + urls.append(entry.get("request").get("url")) + payload = self._build_log_payload(num_events=len(data), urls=urls) + trace = "".join(traceback.format_exc()) + self.log.error(ERRORS["REDACTION"], trace, payload) + else: # Only post if no exceptions + self.log.debug(f"Flushing {len(data)} items") + try: + self.api.post_telemetry( + { + "numResponseCacheKeys": len(response_keys), + "numRequestCacheKeys": len(request_keys), + } + ) + except Exception as e: + # telemetry post is nice to have, if it fails just log and ignore + self.log.warning(f"Error posting telemetry: {e}") + self.api.post_events(data) except Exception: - urls = [] - for entry in data: - if entry.get("request", None): - urls.append(entry.get("request").get("url")) - payload = self._build_log_payload(num_events=len(data), urls=urls) trace = "".join(traceback.format_exc()) - self.log.error(ERRORS["REDACTION"], trace, payload) - else: # Only post if no exceptions - self.log.debug(f"Flushing {len(data)} items") + payload = "" try: - self.api.post_telemetry( - { - "numEntries": len(entries), - } - ) - except Exception as e: - # telemetry post is nice to have, if it fails just log and ignore - self.log.warning(f"Error posting telemetry: {e}") - self.api.post_events(data) + urls = [] + for entry in data: + if entry.get("request", None): + urls.append(entry.get("request").get("url")) + num_events = len(data) + payload = self._build_log_payload(num_events=num_events, urls=urls) + except Exception: + # something is really messed up, just report out + payload = self._build_log_payload() + 
self.log.error(ERRORS["POSTING_EVENTS"], trace, payload) + finally: # always occurs, even from internal returns + for response_key in response_keys: + self._response_cache.pop(response_key, None) + if force: + for request_key in request_keys: + self._request_cache.pop(request_key, None) + self.flush_lock.release() + # FLUSH LOCK PROTECTION END + + def sync_flush_cache(self, data) -> None: + """ + If the client detects it is running in a forked process, we probably dont + want to spin up new threads within that fork. Instead, flush each item synchronously + after caching the response. This does have the effect of blocking until the flush occurs + """ + if self.remote_config is None and self.base_config["useRemoteConfig"]: + # Forked processes get a copy of the remote config (if it has been pulled) for free + # however, the config fetch is expensive if it hasn't been pulled yet. + # not a great solution, but for now just drop the event. + # in the future we're planning on potentially writing these to disk for a cleanup job later + self.log.info("Config not loaded yet, cannot flush") + return + + # don't worry about the flush lock because each flush is only handling one event + try: + # don't worry about anything on the cache except for the data provided to us + try: + if self.base_config["forceRedactAll"]: + redact_all(data) + elif self.base_config["useRemoteConfig"]: + to_delete = redact_values( + data, + self.remote_config, + self.base_config, + ) + if to_delete: + data = [ + item + for (ind, item) in enumerate(data) + if ind not in to_delete + ] + except Exception: + urls = [] + for entry in data: + if entry.get("request", None): + urls.append(entry.get("request").get("url")) + payload = self._build_log_payload(num_events=len(data), urls=urls) + trace = "".join(traceback.format_exc()) + self.log.error(ERRORS["REDACTION"], trace, payload) + else: # Only post if no exceptions + self.log.debug(f"Flushing {len(data)} items") + self.api.post_events(data) + except Exception: + trace = "".join(traceback.format_exc()) + try: + urls = [] + for entry in data: + if entry.get("request", None): + urls.append(entry.get("request").get("url")) + num_events = len(data) + payload = self._build_log_payload(num_events=num_events, urls=urls) + except Exception: + # something is really messed up, just report out + payload = self._build_log_payload() + self.log.error(ERRORS["POSTING_EVENTS"], trace, payload) def _format_tags(self, tags): # takes a list of tags (dicts) and rolls them up into one dictionary diff --git a/src/supergood/worker.py b/src/supergood/worker.py deleted file mode 100644 index 3c1b5ca..0000000 --- a/src/supergood/worker.py +++ /dev/null @@ -1,125 +0,0 @@ -import os -import threading -from collections.abc import Callable -from queue import Empty, Full, Queue -from time import sleep -from typing import Optional - - -# A version of the threading.Timer class with 2 differences: -# - instead of running the function once after seconds, it loops and runs again seconds later -# - also, runs the provided function immediately on start (for initital config fetch) -class Repeater(threading.Timer): - def run(self): - if not self.finished.is_set(): - self.function(*self.args, **self.kwargs) - while not self.finished.wait(self.interval): - self.function(*self.args, **self.kwargs) - - -class Worker: - def __init__(self, repeater): - # type: (Callable[[dict],None], Optional[int]) -> None - # print(f"[{os.getpid()}] worker init") - self._queue = Queue(42) - self._lock = threading.Lock() - self._thread = None 
- self._pid = None - self._fn = repeater - - @property - def is_alive(self): - # type: () -> bool - if self._pid != os.getpid(): - # This case occurs when an initialized client has been forked - # threads do not get persisted on fork, so they must be re-started - return False - if not self._thread: - return True - return self._thread.is_alive() - - def _ensure_running(self): - # type: () -> None - if not self.is_alive: - self.start() - - def start(self): - # type: () -> None - with self._lock: - if not self.is_alive: - self._thread = threading.Thread( - target=self._run, name="supergood-repeater" - ) - self._thread.daemon = True - try: - self._thread.start() - self._pid = os.getpid() - except RuntimeError: - # thread init failed. - # May be out of available thread ids, or shutting down - self._thread = None - - def flush(self): - # type: () -> None - with self._lock: - if self._thread: - try: - self._queue.put_nowait({}) - except Full: - # full, drop events - pass - - def kill(self): - # type: () -> None - with self._lock: - if self._thread: - try: - self._queue.put_nowait(None) - except Full: - # full, drop events - pass - self._thread = None - self._pid = None - - def append(self, entry): - # type: (dict) -> None - self._ensure_running() - with self._lock: - try: - self._queue.put(entry) - return True - except Full as e: - # full, drop events - return False - - def _run(self): - # type: () -> None - while True: - entries = {} - # get() blocks here. it should receive a None object to terminate gracefully - entry = self._queue.get() - if entry is None: - # terminate - return - entries.update(entry) - # once we've gotten _a_ thing, check to see if we can bundle a few together. Up to 10 - terminate = False - for _ in range(10): - try: - entry = self._queue.get_nowait() - if entry is None: - # terminate - terminate = True - break - entries.update(entry) - except Empty: - # nothing else to do immediately, flush what you got - break - - if len(entries) != 0: - # TODO: invoke this with a timeout? 
- self._fn(entries) - elif terminate: - return - else: - sleep(0) diff --git a/tests/caching/test_location_request_body.py b/tests/caching/test_location_request_body.py index 3442d32..86e709e 100644 --- a/tests/caching/test_location_request_body.py +++ b/tests/caching/test_location_request_body.py @@ -28,15 +28,11 @@ def test_request_body(self, httpserver, supergood_client): url=httpserver.url_for("/200"), data="blah scoobydoobydoo blah", ) - # verify that the event was not appended to the worker - entries = supergood_client.flush_thread.append.call_args - assert entries is None + supergood_client.flush_cache() assert Api.post_events.call_args is None requests.request( method="get", url=httpserver.url_for("/200"), data="blah scrappydootoo blah" ) - # in this case the event _was_ added to the worker - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] assert len(args) == 1 diff --git a/tests/caching/test_location_request_headers.py b/tests/caching/test_location_request_headers.py index 61cb84f..7dc4878 100644 --- a/tests/caching/test_location_request_headers.py +++ b/tests/caching/test_location_request_headers.py @@ -24,10 +24,9 @@ def test_request_headers(self, httpserver, supergood_client): } ) requests.get(httpserver.url_for("/200"), headers={"X-test": "scoobydoo"}) - entries = supergood_client.flush_thread.append.call_args - assert entries is None + supergood_client.flush_cache() + assert Api.post_events.call_args is None requests.get(httpserver.url_for("/200"), headers={"X-test": "scrappydootoo"}) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] assert len(args) == 1 diff --git a/tests/caching/test_method.py b/tests/caching/test_method.py index 39532c8..2494a1e 100644 --- a/tests/caching/test_method.py +++ b/tests/caching/test_method.py @@ -29,12 +29,11 @@ def test_method_matching(self, httpserver: HTTPServer, supergood_client): ) # First call is ignored due to matching the ignored POST methods assert response1.json()["string"] == "abc" - entries = supergood_client.flush_thread.append.call_args - assert entries is None + supergood_client.flush_cache() + assert Api.post_events.call_args is None response2 = requests.request(method="get", url=httpserver.url_for("/200")) # Second call is cached and flushed because it does not match (i.e. 
is a new endpoint) assert response2.json()["string"] == "def" - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] assert len(args) == 1 diff --git a/tests/conftest.py b/tests/conftest.py index 7e07428..fc50ef8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,48 +1,79 @@ -from unittest.mock import MagicMock - import pytest from supergood import Client from tests.helper import get_config, get_remote_config -@pytest.fixture(scope="function") +@pytest.fixture(scope="session") +def broken_redaction(session_mocker): + session_mocker.patch( + "supergood.client.redact_values", side_effect=Exception + ).start() + yield session_mocker + + +@pytest.fixture(scope="session") def monkeysession(): with pytest.MonkeyPatch.context() as mp: yield mp -@pytest.fixture -def supergood_client(request, mocker): - with pytest.MonkeyPatch.context() as mp: - config = get_config() - remote_config = get_remote_config() - - if getattr(request, "param", None): - if request.param.get("config", None): - config = request.param["config"] - if request.param.get("remote_config", None): - remote_config = request.param["remote_config"] - - Client.initialize( - client_id="client_id", - client_secret_id="client_secret_id", - base_url="https://api.supergood.ai", - telemetry_url="https://telemetry.supergood.ai", - config=config, - ) - # first 3 are just to make sure we don't post anything externally - mocker.patch("supergood.api.Api.post_events", return_value=None).start() - mocker.patch("supergood.api.Api.post_errors", return_value=None).start() - mocker.patch("supergood.api.Api.post_telemetry", return_value=None).start() - # next we make sure we don't call get externally, and stub in our remote config - mocker.patch("supergood.api.Api.get_config", return_value=remote_config).start() - # Turns off the worker, pytest mocks don't always play well with threads - mocker.patch("supergood.worker.Worker.start", return_value=None).start() - mocker.patch("supergood.worker.Repeater.start", return_value=None).start() - mocker.patch("supergood.worker.Worker.append", return_value=True).start() - mp.setenv("SG_OVERRIDE_AUTO_FLUSH", "false") - mp.setenv("SG_OVERRIDE_AUTO_CONFIG", "false") - Client._get_config() - yield Client - Client.kill() +@pytest.fixture(scope="session") +def broken_client(broken_redaction, monkeysession): + config = get_config() + remote_config = get_remote_config() + broken_redaction.patch("supergood.api.Api.post_events", return_value=None).start() + broken_redaction.patch("supergood.api.Api.post_errors", return_value=None).start() + broken_redaction.patch( + "supergood.api.Api.get_config", return_value=remote_config + ).start() + Client.initialize( + client_id="client_id", + client_secret_id="client_secret_id", + base_url="https://api.supergood.ai", + telemetry_url="https://telemetry.supergood.ai", + config=config, + ) + monkeysession.setenv("SG_OVERRIDE_AUTO_FLUSH", "false") + monkeysession.setenv("SG_OVERRIDE_AUTO_CONFIG", "false") + Client._get_config() + yield Client + Client.kill() # on exit + + +@pytest.fixture(scope="session") +def supergood_client(request, session_mocker, monkeysession): + # Allows for a param dictionary to control behavior + # currently looks for "config" "remote_config" and "auto" + config = get_config() + auto = False + remote_config = get_remote_config() + if getattr(request, "param", None): + if request.param.get("config", None): + config = 
request.param["config"] + if request.param.get("remote_config", None): + remote_config = request.param["remote_config"] + if request.param.get("auto", None): + auto = request.param["auto"] + + session_mocker.patch("supergood.api.Api.post_events", return_value=None).start() + session_mocker.patch("supergood.api.Api.post_errors", return_value=None).start() + session_mocker.patch( + "supergood.api.Api.get_config", return_value=remote_config + ).start() + session_mocker.patch("supergood.api.Api.post_telemetry", return_value=None).start() + + if not auto: + monkeysession.setenv("SG_OVERRIDE_AUTO_FLUSH", "false") + monkeysession.setenv("SG_OVERRIDE_AUTO_CONFIG", "false") + + Client.initialize( + client_id="client_id", + client_secret_id="client_secret_id", + base_url="https://api.supergood.ai", + telemetry_url="https://telemetry.supergood.ai", + config=config, + ) + Client._get_config() + yield Client + Client.kill() # on exit diff --git a/tests/redaction/test_no_redaction.py b/tests/redaction/test_no_redaction.py index e5c262f..937f172 100644 --- a/tests/redaction/test_no_redaction.py +++ b/tests/redaction/test_no_redaction.py @@ -9,28 +9,23 @@ class TestNoRedaction: def test_ignore_redaction(self, httpserver, supergood_client): supergood_client.base_config["ignoreRedaction"] = True - # httpserver.expect_request("/200").respond_with_json( - # { - # "string": "abc", - # "complex_string": "Alex Klarfeld 911!", - # } - # ) - # requests.get(httpserver.url_for("/200")) - inputs = { - "request_id": { - "request": { - "body": "", - "headers": {}, - "method": "GET", - "url": httpserver.url_for("/200"), - }, - "response": {"body": {"string": "abc"}, "headers": {}}, + httpserver.expect_request("/200").respond_with_json( + { + "string": "abc", + "complex_string": "Alex Klarfeld 911!", } - } - supergood_client.flush_cache(inputs) + ) + requests.get(httpserver.url_for("/200")) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] response_body = args[0]["response"]["body"] assert response_body["string"] == "abc" # not redacted! + assert response_body["complex_string"] == "Alex Klarfeld 911!" 
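The session-scoped fixture above takes its knobs from request.param, so individual test classes opt in through indirect parametrization, as the redaction tests in this patch do. A minimal usage example; the config dict contents here are illustrative:

    import pytest

    @pytest.mark.parametrize(
        "supergood_client",
        [{"config": {"ignoreRedaction": True}, "auto": False}],
        indirect=True,  # routes the dict into the fixture's request.param
    )
    def test_with_custom_config(supergood_client):
        ...  # client was initialized with the parametrized config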
+ assert "metadata" in args[0] + assert args[0]["metadata"] == { + "endpointId": "endpoint-id", + "vendorId": "vendor-id", + } def test_no_redaction(self, httpserver, supergood_client): httpserver.expect_request("/201").respond_with_json( @@ -44,9 +39,7 @@ def test_no_redaction(self, httpserver, supergood_client): } ) requests.get(httpserver.url_for("/201")) - append_call = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(append_call) - + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] body = args[0]["response"]["body"] assert body["string"] == "abc" diff --git a/tests/redaction/test_redact_all.py b/tests/redaction/test_redact_all.py index 8c51166..2affc50 100644 --- a/tests/redaction/test_redact_all.py +++ b/tests/redaction/test_redact_all.py @@ -29,8 +29,7 @@ def test_redact_all(self, httpserver, supergood_client): } ) requests.get(httpserver.url_for("/200")) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] response_body = args[0]["response"]["body"] metadata = args[0]["metadata"] diff --git a/tests/redaction/test_redact_arrays.py b/tests/redaction/test_redact_arrays.py index 5d429fd..3c765ba 100644 --- a/tests/redaction/test_redact_arrays.py +++ b/tests/redaction/test_redact_arrays.py @@ -28,8 +28,7 @@ def test_redact_array_element(self, httpserver, supergood_client): } ) requests.get(httpserver.url_for("/200")) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] body = args[0]["response"]["body"] metadata = args[0]["metadata"] @@ -52,8 +51,7 @@ def test_redact_nested_array_element(self, httpserver, supergood_client): } ) requests.get(httpserver.url_for("/200")) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] body = args[0]["response"]["body"] metadata = args[0]["metadata"] @@ -79,8 +77,7 @@ def test_redact_array_sub_element(self, httpserver, supergood_client): } ) requests.get(httpserver.url_for("/200")) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] body = args[0]["response"]["body"] assert len(body) == 1 diff --git a/tests/redaction/test_redact_by_default.py b/tests/redaction/test_redact_by_default.py index f1037e9..1417267 100644 --- a/tests/redaction/test_redact_by_default.py +++ b/tests/redaction/test_redact_by_default.py @@ -30,8 +30,7 @@ def test_redact_by_default(self, httpserver, supergood_client): } ) requests.get(httpserver.url_for("/200")) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] response_body = args[0]["response"]["body"] metadata = args[0]["metadata"] @@ -63,8 +62,7 @@ def test_allowed_keys_of_arrays(self, httpserver, supergood_client): ] ) requests.get(httpserver.url_for("/200")) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] response_body = args[0]["response"]["body"] metadata = args[0]["metadata"] diff --git a/tests/redaction/test_redaction.py 
b/tests/redaction/test_redaction.py index 6e6389e..89d6ec9 100644 --- a/tests/redaction/test_redaction.py +++ b/tests/redaction/test_redaction.py @@ -33,8 +33,7 @@ def test_redact_one(self, httpserver, supergood_client): } ) requests.get(httpserver.url_for("/200")) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] response_body = args[0]["response"]["body"] metadata = args[0]["metadata"] @@ -57,8 +56,7 @@ def test_each_redaction(self, httpserver, supergood_client): } httpserver.expect_request("/200").respond_with_json(response_json) requests.get(httpserver.url_for("/200")) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] body = args[0]["response"]["body"] for key in response_json: diff --git a/tests/redaction/test_redaction_failures.py b/tests/redaction/test_redaction_failures.py index f2f865c..c9eef2d 100644 --- a/tests/redaction/test_redaction_failures.py +++ b/tests/redaction/test_redaction_failures.py @@ -7,7 +7,7 @@ @pytest.mark.parametrize( - "supergood_client", + "broken_client", [ { "remote_config": get_remote_config( @@ -20,15 +20,12 @@ indirect=True, ) class TestRedactionFails: - def test_redaction_fails(self, httpserver, supergood_client, mocker): + def test_redaction_fails(self, httpserver, broken_client): httpserver.expect_request("/200").respond_with_json({"string": "blah"}) requests.get(httpserver.url_for("/200")) - mocker.patch("supergood.client.redact_values", side_effect=Exception).start() - entries = supergood_client.flush_thread.append.call_args[0][0] - assert len(entries) == 1 # there is something to be flushed! - supergood_client.flush_cache(entries) + assert len(broken_client._response_cache) == 1 # event to flush + broken_client.flush_cache() # redaction fails! logs error but does not flush args = Api.post_events.call_args - assert not args # but nothing got flushed, because redaction failed. 
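The behavior asserted here falls out of the redaction guard in flush_cache: when redaction raises, the batch is dropped and the failure is reported through post_errors, so unredacted data never leaves the process. Reduced to a skeleton with injected callables, not the client's exact signatures:

    import traceback

    def redact_and_post(data, redact, post, report_error):
        # Fail closed: if redaction raises, drop the batch and report the
        # error; unredacted events must never reach post().
        try:
            redact(data)
        except Exception:
            report_error("REDACTION", "".join(traceback.format_exc()))
        else:
            post(data)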
- # what did happen was an error post + assert not args assert Api.post_errors.call_count == 1 assert Api.post_errors.call_args[0][2] == ERRORS["REDACTION"] diff --git a/tests/redaction/test_top_level_redaction.py b/tests/redaction/test_top_level_redaction.py index 3218365..0971f8c 100644 --- a/tests/redaction/test_top_level_redaction.py +++ b/tests/redaction/test_top_level_redaction.py @@ -25,8 +25,7 @@ class TestTopLevelRedaction: def test_top_level_redactions(self, httpserver, supergood_client): httpserver.expect_request("/200").respond_with_json({"string": "abc"}) requests.get(httpserver.url_for("/200")) - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] assert len(args) == 1 assert args[0]["request"]["body"] == None diff --git a/tests/test_core.py b/tests/test_core.py index eb46892..15c593c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,14 +1,24 @@ import asyncio +import json +import re +import sys +import time +import unittest +from urllib.parse import urlparse import aiohttp +import pytest import requests import urllib3 from dotenv import load_dotenv from pytest_httpserver import HTTPServer +from werkzeug.wrappers import Response +from supergood import Client from supergood.api import Api from supergood.constants import ERRORS from tests.constants import * +from tests.helper import get_remote_config load_dotenv() @@ -24,8 +34,13 @@ def test_captures_all_outgoing_200_http_requests( for i in range(CALL_COUNT): response = requests.get(httpserver.url_for("/200")) - assert supergood_client.flush_thread.append.call_args_list is not None - assert len(supergood_client.flush_thread.append.call_args_list) == 5 + supergood_client.flush_cache() + assert Api.post_events.call_args is not None + args = Api.post_events.call_args[0][0] + assert args[0]["request"] is not None + assert args[0]["response"] is not None + assert len(args) == CALL_COUNT + supergood_client.kill() def test_captures_non_success_status_and_errors( self, httpserver: HTTPServer, supergood_client @@ -34,18 +49,16 @@ def test_captures_non_success_status_and_errors( for code in http_error_codes: httpserver.expect_request(f"/{code}").respond_with_data(status=code) - requests.get(httpserver.url_for(f"/{code}")) + response = requests.get(httpserver.url_for(f"/{code}")) - assert supergood_client.flush_thread.append.call_args_list is not None + supergood_client.flush_cache() - args = supergood_client.flush_thread.append.call_args_list + args = Api.post_events.call_args[0][0] assert len(args) == len(http_error_codes) - responded_codes = set( - map(lambda x: list(x[0][0].values())[0]["response"]["status"], args) - ) - + responded_codes = set(map(lambda x: x["response"]["status"], args)) assert responded_codes == set(http_error_codes) + supergood_client.kill() def test_post_requests(self, httpserver: HTTPServer, supergood_client): httpserver.expect_request("/200", method="POST").respond_with_json( @@ -54,10 +67,12 @@ def test_post_requests(self, httpserver: HTTPServer, supergood_client): response = requests.post(httpserver.url_for("/200")) response.raise_for_status() json = response.json() + supergood_client.flush_cache() assert json["payload"] == "value" - args = supergood_client.flush_thread.append.call_args[0][0] + args = Api.post_events.call_args[0][0] assert len(args) == 1 + supergood_client.kill() def test_post_request_for_large_payload( self, httpserver: HTTPServer, supergood_client @@ -66,28 
+81,48 @@ def test_post_request_for_large_payload( response.raise_for_status() json = response.json() + supergood_client.flush_cache() + assert json.get("mapResults", {}).get("totalItems", 0) >= 0 - args = supergood_client.flush_thread.append.call_args[0][0] - assert list(args.values())[0]["request"] is not None - assert list(args.values())[0]["request"] is not None + args = Api.post_events.call_args[0][0][0] + assert args["request"] is not None + assert args["response"] is not None + supergood_client.kill() + + def test_hanging_response(self, httpserver: HTTPServer, supergood_client): + HANG_TIME_IN_SECONDS = 2 + requests.get(f"{TEST_BED_URL}/200?sleep={HANG_TIME_IN_SECONDS}") + supergood_client.flush_cache() + + args = Api.post_events.call_args[0][0] + assert len(args) == 1 + supergood_client.kill() - def test_posting_errors(self, supergood_client, mocker): - _mock = mocker.patch( - "supergood.worker.Worker.append", side_effect=Exception("Test Error") + def test_posting_errors(self, supergood_client, session_mocker): + _mock = session_mocker.patch( + "supergood.api.Api.post_events", side_effect=Exception("Test Error") ) _mock.start() requests.get(f"{TEST_BED_URL}/200") + supergood_client.flush_cache() error_message = Api.post_errors.call_args[0][2] - assert error_message == ERRORS["CACHING_RESPONSE"] + assert error_message == ERRORS["POSTING_EVENTS"] + + _mock.resetmock() + session_mocker.patch("supergood.api.Api.post_events").start() + supergood_client.kill() def test_different_http_library(self, httpserver: HTTPServer, supergood_client): http = urllib3.PoolManager() http.request("GET", f"{TEST_BED_URL}/200") - args = supergood_client.flush_thread.append.call_args_list + supergood_client.flush_cache() + + args = Api.post_events.call_args[0][0] assert len(args) == 1 + supergood_client.kill() def test_aiohttp_library(self, httpserver: HTTPServer, supergood_client): async def aiohttp_get_request(): @@ -107,13 +142,15 @@ async def aiohttp_post_request(): get_response = asyncio.run(aiohttp_get_request()) post_response = asyncio.run(aiohttp_post_request()) - args = supergood_client.flush_thread.append.call_args_list + supergood_client.flush_cache() + + args = Api.post_events.call_args[0][0] assert len(args) == 2 assert get_response["success"] == "true" assert post_response["success"] == "true" - first = list(args[0][0][0].values())[0] - assert first["response"] is not None - assert first["request"] is not None + assert args[0]["response"] is not None + assert args[0]["request"] is not None + supergood_client.kill() def test_tagging(self, httpserver: HTTPServer, supergood_client): tags = {"m": "mini", "w": "wumbo"} @@ -122,13 +159,13 @@ def test_tagging(self, httpserver: HTTPServer, supergood_client): with supergood_client.tagging(tags): requests.get(httpserver.url_for("/tagging")) - assert supergood_client.flush_thread.append.call_args is not None - args = supergood_client.flush_thread.append.call_args[0][0] - first = list(args.values())[0] - assert first["request"] is not None - assert first["response"] is not None - assert first["metadata"] is not None - assert first["metadata"]["tags"] == tags + supergood_client.flush_cache() + assert Api.post_events.call_args is not None + args = Api.post_events.call_args[0][0] + assert args[0]["request"] is not None + assert args[0]["response"] is not None + assert args[0]["metadata"] is not None + assert args[0]["metadata"]["tags"] == tags def test_layered_tagging(self, httpserver: HTTPServer, supergood_client): outer_tag = {"m": "mini"} @@ -142,13 
+179,13 @@ def test_layered_tagging(self, httpserver: HTTPServer, supergood_client): with supergood_client.tagging(inner_tag): requests.get(httpserver.url_for("/inner")) requests.get(httpserver.url_for("/outeragain")) - - assert supergood_client.flush_thread.append.call_args is not None - call_list = supergood_client.flush_thread.append.call_args_list - assert len(call_list) == 3 + supergood_client.flush_cache() + assert Api.post_events.call_args is not None + args = Api.post_events.call_args[0][0] + assert len(args) == 3 # First call, tag shouldbe only the outer tag - assert list(call_list[0][0][0].values())[0]["metadata"]["tags"] == outer_tag + assert args[0]["metadata"]["tags"] == outer_tag # Second call, should have both - assert list(call_list[1][0][0].values())[0]["metadata"]["tags"] == both_tags + assert args[1]["metadata"]["tags"] == both_tags # Third call, back to only outer - assert list(call_list[2][0][0].values())[0]["metadata"]["tags"] == outer_tag + assert args[2]["metadata"]["tags"] == outer_tag diff --git a/tests/test_dont_log.py b/tests/test_dont_log.py index d7355f4..cdc6c4a 100644 --- a/tests/test_dont_log.py +++ b/tests/test_dont_log.py @@ -1,6 +1,5 @@ import pytest import requests -from pytest_httpserver import HTTPServer from supergood.api import Api from tests.helper import get_config @@ -23,14 +22,13 @@ class TestDontLog: ], indirect=True, ) - def test_ignores_fields_when_set(self, httpserver: HTTPServer, supergood_client): - httpserver.expect_request("/ignores").respond_with_data("super secret response") - requests.post(httpserver.url_for("/ignores"), data={"mumbo": "jumbo"}) - - args = supergood_client.flush_thread.append.call_args[0][0] - + def test_ignores_fields_when_set(self, supergood_client): + requests.get(f"{TEST_BED_URL}/200") + supergood_client.flush_cache() + supergood_client.kill() + args = Api.post_events.call_args[0][0] assert len(args) == 1 - assert list(args.values())[0]["request"]["body"] == "" - assert list(args.values())[0]["request"]["headers"] == {} - assert list(args.values())[0]["response"]["body"] == "" - assert list(args.values())[0]["response"]["headers"] == {} + assert args[0]["request"]["body"] == "" + assert args[0]["request"]["headers"] == {} + assert args[0]["response"]["body"] == "" + assert args[0]["response"]["headers"] == {} diff --git a/tests/test_ignored_domains.py b/tests/test_ignored_domains.py index 3144426..cd3936c 100644 --- a/tests/test_ignored_domains.py +++ b/tests/test_ignored_domains.py @@ -1,11 +1,11 @@ import pytest import requests -from pytest_httpserver import HTTPServer from supergood.api import Api from tests.helper import get_config TEST_BED_URL = "http://supergood-testbed.herokuapp.com" +EXTERNAL_URL = "https://api.ipify.org/?format=json" class TestIgnoreDomain: @@ -20,19 +20,11 @@ class TestIgnoreDomain: ], indirect=True, ) - def test_ignores_requests_to_ignored_domains( - self, httpserver: HTTPServer, supergood_client - ): + def test_ignores_requests_to_ignored_domains(self, supergood_client): requests.get(f"{TEST_BED_URL}/200") - args = supergood_client.flush_thread.append.call_args_list - assert len(args) == 0 - httpserver.expect_request("/ignore-domain").respond_with_data("wumbo") - requests.get(httpserver.url_for("/ignore-domain")) - args = supergood_client.flush_thread.append.call_args_list + requests.get(f"{EXTERNAL_URL}") + supergood_client.flush_cache() + supergood_client.kill() + args = Api.post_events.call_args[0][0] assert len(args) == 1 - list(args[0][0][0].values())[0] - # double check that it's 
the expected domain - assert ( - "http://localhost/ignore-domain" - == list(args[0][0][0].values())[0]["request"]["url"] - ) + assert EXTERNAL_URL == args[0]["request"]["url"] diff --git a/tests/test_remote_config.py b/tests/test_remote_config.py index 6652782..f9364c0 100644 --- a/tests/test_remote_config.py +++ b/tests/test_remote_config.py @@ -27,9 +27,11 @@ def test_client_ignores_before_config(self, httpserver, supergood_client): httpserver.expect_request("/200").respond_with_json({"key": "val"}) requests.get(httpserver.url_for("/200")) assert supergood_client._request_cache == {} + assert supergood_client._response_cache == {} supergood_client._get_config() # Now there's a config httpserver.expect_request("/200").respond_with_json({"key": "val"}) requests.get(httpserver.url_for("/200")) - args = supergood_client.flush_thread.append.call_args[0][0] + supergood_client.flush_cache() + args = Api.post_events.call_args[0][0] assert len(args) == 1 - assert (list(args.values()))[0]["response"]["body"] == {"key": "val"} + assert args[0]["response"]["body"] == {"key": "val"} diff --git a/tests/test_repeating_thread.py b/tests/test_repeating_thread.py new file mode 100644 index 0000000..c9c311c --- /dev/null +++ b/tests/test_repeating_thread.py @@ -0,0 +1,29 @@ +import time + +from supergood.repeating_thread import RepeatingThread + + +class TestRepeatingThread: + def test_repeating_thread(self): + class A: + def inc(self): + self.i = self.i + 1 + + def __init__(self): + self.i = 0 + self.thread = RepeatingThread(self.inc, 1) + + def start(self): + self.thread.start() + + def cancel(self): + self.thread.cancel() + + a = A() + assert a.thread._thread == None + a.start() + for i in range(1, 3): + time.sleep(1.1) + assert a.i == i + a.cancel() + assert a.thread._thread.finished.is_set() diff --git a/tests/vendors/test_httpx.py b/tests/vendors/test_httpx.py index 021fb9a..7564e2d 100644 --- a/tests/vendors/test_httpx.py +++ b/tests/vendors/test_httpx.py @@ -15,10 +15,10 @@ def test_httpx_streaming(self, httpserver: HTTPServer, supergood_client): for data in s.iter_bytes(chunk_size=1): resp += data assert resp.decode("utf-8") == raw_response - entries = supergood_client.flush_thread.append.call_args[0][0] - supergood_client.flush_cache(entries) + supergood_client.flush_cache() args = Api.post_events.call_args[0][0] # verify 2 entries, one each for each response field assert len(args[0]["response"]["body"]) == 2 # verifies valid JSON is indexible assert args[0]["response"]["body"][0]["data"][0]["valid"] == "json" + supergood_client.kill()
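tests/test_repeating_thread.py above pins down the RepeatingThread contract even though src/supergood/repeating_thread.py itself does not appear in this patch: the wrapped _thread is None before start(), the callable fires once per interval, and cancel() sets the inner timer's finished event. A minimal implementation consistent with those assertions and with the client's RepeatingThread(fn, seconds) call sites; an assumed sketch, not the module's actual source:

    import threading

    class _Repeater(threading.Timer):
        # Like threading.Timer, but re-runs the function every `interval`
        # seconds until cancel() sets the inherited `finished` event.
        def run(self):
            while not self.finished.wait(self.interval):
                self.function(*self.args, **self.kwargs)

    class RepeatingThread:
        def __init__(self, function, interval):
            self.function = function
            self.interval = interval
            self._thread = None  # the test asserts this is None before start()

        def start(self):
            if self._thread is None or not self._thread.is_alive():
                self._thread = _Repeater(self.interval, self.function)
                self._thread.daemon = True
                self._thread.start()

        def cancel(self):
            if self._thread is not None:
                self._thread.cancel()  # sets `finished`, ending the run() loop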