Skip to content

Commit

Permalink
rework: threads all the way down (#44)
Browse files Browse the repository at this point in the history
A pretty sizeable rework to the underlying code but no changes to the API

ALL flushing to Supergood is done in a separate thread. Currently the library uses a synchronous scheme when it detects it is operating in a fork. This was in an effort to optimize data integrity, but it does add additional delay.

Instead, a worker thread is spun up just like it would have been on the main thread. What this means is that sometimes a process that shuts down too quickly after finishing their HTTP calls won't end up flushing to Supergood.

In the future, we intend on utilizing a separately deployable agent that can manage all this off-process, but for now this is my best effort to balance data integrity without compromising speed.

---------

Co-authored-by: Steve Bunting <steve@supergood.ai>
  • Loading branch information
stevenbunting and Steve Bunting authored Dec 2, 2024
1 parent 1480cba commit 7ed21c8
Show file tree
Hide file tree
Showing 20 changed files with 356 additions and 433 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ jobs:
pytest tests/test_core.py
pytest tests/test_ignored_domains.py
pytest tests/test_remote_config.py
pytest tests/test_repeating_thread.py
pytest tests/caching/test_location_request_body.py
pytest tests/caching/test_location_request_headers.py
pytest tests/redaction/test_no_redaction.py
Expand Down
271 changes: 67 additions & 204 deletions src/supergood/client.py

Large diffs are not rendered by default.

125 changes: 125 additions & 0 deletions src/supergood/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import os
import threading
from collections.abc import Callable
from queue import Empty, Full, Queue
from time import sleep
from typing import Optional


# A version of the threading.Timer class with 2 differences:
# - instead of running the function once after <interval> seconds, it loops and runs again <interval> seconds later
# - also, runs the provided function immediately on start (for initital config fetch)
class Repeater(threading.Timer):
def run(self):
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)


class Worker:
def __init__(self, repeater):
# type: (Callable[[dict],None], Optional[int]) -> None
# print(f"[{os.getpid()}] worker init")
self._queue = Queue(42)
self._lock = threading.Lock()
self._thread = None
self._pid = None
self._fn = repeater

@property
def is_alive(self):
# type: () -> bool
if self._pid != os.getpid():
# This case occurs when an initialized client has been forked
# threads do not get persisted on fork, so they must be re-started
return False
if not self._thread:
return True
return self._thread.is_alive()

def _ensure_running(self):
# type: () -> None
if not self.is_alive:
self.start()

def start(self):
# type: () -> None
with self._lock:
if not self.is_alive:
self._thread = threading.Thread(
target=self._run, name="supergood-repeater"
)
self._thread.daemon = True
try:
self._thread.start()
self._pid = os.getpid()
except RuntimeError:
# thread init failed.
# May be out of available thread ids, or shutting down
self._thread = None

def flush(self):
# type: () -> None
with self._lock:
if self._thread:
try:
self._queue.put_nowait({})
except Full:
# full, drop events
pass

def kill(self):
# type: () -> None
with self._lock:
if self._thread:
try:
self._queue.put_nowait(None)
except Full:
# full, drop events
pass
self._thread = None
self._pid = None

def append(self, entry):
# type: (dict) -> None
self._ensure_running()
with self._lock:
try:
self._queue.put(entry)
return True
except Full as e:
# full, drop events
return False

def _run(self):
# type: () -> None
while True:
entries = {}
# get() blocks here. it should receive a None object to terminate gracefully
entry = self._queue.get()
if entry is None:
# terminate
return
entries.update(entry)
# once we've gotten _a_ thing, check to see if we can bundle a few together. Up to 10
terminate = False
for _ in range(10):
try:
entry = self._queue.get_nowait()
if entry is None:
# terminate
terminate = True
break
entries.update(entry)
except Empty:
# nothing else to do immediately, flush what you got
break

if len(entries) != 0:
# TODO: invoke this with a timeout?
self._fn(entries)
elif terminate:
return
else:
sleep(0)
8 changes: 6 additions & 2 deletions tests/caching/test_location_request_body.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ def test_request_body(self, httpserver, supergood_client):
url=httpserver.url_for("/200"),
data="blah scoobydoobydoo blah",
)
supergood_client.flush_cache()
# verify that the event was not appended to the worker
entries = supergood_client.flush_thread.append.call_args
assert entries is None
assert Api.post_events.call_args is None
requests.request(
method="get", url=httpserver.url_for("/200"), data="blah scrappydootoo blah"
)
supergood_client.flush_cache()
# in this case the event _was_ added to the worker
entries = supergood_client.flush_thread.append.call_args[0][0]
supergood_client.flush_cache(entries)
args = Api.post_events.call_args[0][0]
assert len(args) == 1
7 changes: 4 additions & 3 deletions tests/caching/test_location_request_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ def test_request_headers(self, httpserver, supergood_client):
}
)
requests.get(httpserver.url_for("/200"), headers={"X-test": "scoobydoo"})
supergood_client.flush_cache()
assert Api.post_events.call_args is None
entries = supergood_client.flush_thread.append.call_args
assert entries is None
requests.get(httpserver.url_for("/200"), headers={"X-test": "scrappydootoo"})
supergood_client.flush_cache()
entries = supergood_client.flush_thread.append.call_args[0][0]
supergood_client.flush_cache(entries)
args = Api.post_events.call_args[0][0]
assert len(args) == 1
7 changes: 4 additions & 3 deletions tests/caching/test_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ def test_method_matching(self, httpserver: HTTPServer, supergood_client):
)
# First call is ignored due to matching the ignored POST methods
assert response1.json()["string"] == "abc"
supergood_client.flush_cache()
assert Api.post_events.call_args is None
entries = supergood_client.flush_thread.append.call_args
assert entries is None
response2 = requests.request(method="get", url=httpserver.url_for("/200"))
# Second call is cached and flushed because it does not match (i.e. is a new endpoint)
assert response2.json()["string"] == "def"
supergood_client.flush_cache()
entries = supergood_client.flush_thread.append.call_args[0][0]
supergood_client.flush_cache(entries)
args = Api.post_events.call_args[0][0]
assert len(args) == 1
105 changes: 37 additions & 68 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,48 @@
from unittest.mock import MagicMock

import pytest

from supergood import Client
from tests.helper import get_config, get_remote_config


@pytest.fixture(scope="session")
def broken_redaction(session_mocker):
session_mocker.patch(
"supergood.client.redact_values", side_effect=Exception
).start()
yield session_mocker


@pytest.fixture(scope="session")
@pytest.fixture(scope="function")
def monkeysession():
with pytest.MonkeyPatch.context() as mp:
yield mp


@pytest.fixture(scope="session")
def broken_client(broken_redaction, monkeysession):
config = get_config()
remote_config = get_remote_config()
broken_redaction.patch("supergood.api.Api.post_events", return_value=None).start()
broken_redaction.patch("supergood.api.Api.post_errors", return_value=None).start()
broken_redaction.patch(
"supergood.api.Api.get_config", return_value=remote_config
).start()
Client.initialize(
client_id="client_id",
client_secret_id="client_secret_id",
base_url="https://api.supergood.ai",
telemetry_url="https://telemetry.supergood.ai",
config=config,
)
monkeysession.setenv("SG_OVERRIDE_AUTO_FLUSH", "false")
monkeysession.setenv("SG_OVERRIDE_AUTO_CONFIG", "false")
Client._get_config()
yield Client
Client.kill() # on exit


@pytest.fixture(scope="session")
def supergood_client(request, session_mocker, monkeysession):
# Allows for a param dictionary to control behavior
# currently looks for "config" "remote_config" and "auto"
config = get_config()
auto = False
remote_config = get_remote_config()
if getattr(request, "param", None):
if request.param.get("config", None):
config = request.param["config"]
if request.param.get("remote_config", None):
remote_config = request.param["remote_config"]
if request.param.get("auto", None):
auto = request.param["auto"]

session_mocker.patch("supergood.api.Api.post_events", return_value=None).start()
session_mocker.patch("supergood.api.Api.post_errors", return_value=None).start()
session_mocker.patch(
"supergood.api.Api.get_config", return_value=remote_config
).start()
session_mocker.patch("supergood.api.Api.post_telemetry", return_value=None).start()

if not auto:
monkeysession.setenv("SG_OVERRIDE_AUTO_FLUSH", "false")
monkeysession.setenv("SG_OVERRIDE_AUTO_CONFIG", "false")

Client.initialize(
client_id="client_id",
client_secret_id="client_secret_id",
base_url="https://api.supergood.ai",
telemetry_url="https://telemetry.supergood.ai",
config=config,
)
Client._get_config()
yield Client
Client.kill() # on exit
@pytest.fixture
def supergood_client(request, mocker):
with pytest.MonkeyPatch.context() as mp:
config = get_config()
remote_config = get_remote_config()

if getattr(request, "param", None):
if request.param.get("config", None):
config = request.param["config"]
if request.param.get("remote_config", None):
remote_config = request.param["remote_config"]

Client.initialize(
client_id="client_id",
client_secret_id="client_secret_id",
base_url="https://api.supergood.ai",
telemetry_url="https://telemetry.supergood.ai",
config=config,
)
# first 3 are just to make sure we don't post anything externally
mocker.patch("supergood.api.Api.post_events", return_value=None).start()
mocker.patch("supergood.api.Api.post_errors", return_value=None).start()
mocker.patch("supergood.api.Api.post_telemetry", return_value=None).start()
# next we make sure we don't call get externally, and stub in our remote config
mocker.patch("supergood.api.Api.get_config", return_value=remote_config).start()
# Turns off the worker, pytest mocks don't always play well with threads
mocker.patch("supergood.worker.Worker.start", return_value=None).start()
mocker.patch("supergood.worker.Repeater.start", return_value=None).start()
mocker.patch("supergood.worker.Worker.append", return_value=True).start()
mp.setenv("SG_OVERRIDE_AUTO_FLUSH", "false")
mp.setenv("SG_OVERRIDE_AUTO_CONFIG", "false")
Client._get_config()
yield Client
Client.kill()
35 changes: 21 additions & 14 deletions tests/redaction/test_no_redaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,28 @@
class TestNoRedaction:
def test_ignore_redaction(self, httpserver, supergood_client):
supergood_client.base_config["ignoreRedaction"] = True
httpserver.expect_request("/200").respond_with_json(
{
"string": "abc",
"complex_string": "Alex Klarfeld 911!",
# httpserver.expect_request("/200").respond_with_json(
# {
# "string": "abc",
# "complex_string": "Alex Klarfeld 911!",
# }
# )
# requests.get(httpserver.url_for("/200"))
inputs = {
"request_id": {
"request": {
"body": "",
"headers": {},
"method": "GET",
"url": httpserver.url_for("/200"),
},
"response": {"body": {"string": "abc"}, "headers": {}},
}
)
requests.get(httpserver.url_for("/200"))
supergood_client.flush_cache()
}
supergood_client.flush_cache(inputs)
args = Api.post_events.call_args[0][0]
response_body = args[0]["response"]["body"]
assert response_body["string"] == "abc" # not redacted!
assert response_body["complex_string"] == "Alex Klarfeld 911!"
assert "metadata" in args[0]
assert args[0]["metadata"] == {
"endpointId": "endpoint-id",
"vendorId": "vendor-id",
}

def test_no_redaction(self, httpserver, supergood_client):
httpserver.expect_request("/201").respond_with_json(
Expand All @@ -39,7 +44,9 @@ def test_no_redaction(self, httpserver, supergood_client):
}
)
requests.get(httpserver.url_for("/201"))
supergood_client.flush_cache()
append_call = supergood_client.flush_thread.append.call_args[0][0]
supergood_client.flush_cache(append_call)

args = Api.post_events.call_args[0][0]
body = args[0]["response"]["body"]
assert body["string"] == "abc"
Expand Down
3 changes: 2 additions & 1 deletion tests/redaction/test_redact_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def test_redact_all(self, httpserver, supergood_client):
}
)
requests.get(httpserver.url_for("/200"))
supergood_client.flush_cache()
entries = supergood_client.flush_thread.append.call_args[0][0]
supergood_client.flush_cache(entries)
args = Api.post_events.call_args[0][0]
response_body = args[0]["response"]["body"]
metadata = args[0]["metadata"]
Expand Down
9 changes: 6 additions & 3 deletions tests/redaction/test_redact_arrays.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def test_redact_array_element(self, httpserver, supergood_client):
}
)
requests.get(httpserver.url_for("/200"))
supergood_client.flush_cache()
entries = supergood_client.flush_thread.append.call_args[0][0]
supergood_client.flush_cache(entries)
args = Api.post_events.call_args[0][0]
body = args[0]["response"]["body"]
metadata = args[0]["metadata"]
Expand All @@ -51,7 +52,8 @@ def test_redact_nested_array_element(self, httpserver, supergood_client):
}
)
requests.get(httpserver.url_for("/200"))
supergood_client.flush_cache()
entries = supergood_client.flush_thread.append.call_args[0][0]
supergood_client.flush_cache(entries)
args = Api.post_events.call_args[0][0]
body = args[0]["response"]["body"]
metadata = args[0]["metadata"]
Expand All @@ -77,7 +79,8 @@ def test_redact_array_sub_element(self, httpserver, supergood_client):
}
)
requests.get(httpserver.url_for("/200"))
supergood_client.flush_cache()
entries = supergood_client.flush_thread.append.call_args[0][0]
supergood_client.flush_cache(entries)
args = Api.post_events.call_args[0][0]
body = args[0]["response"]["body"]
assert len(body) == 1
Expand Down
Loading

0 comments on commit 7ed21c8

Please sign in to comment.