Skip to content

Commit

Permalink
⚡️ add state monitoring stream (#1143)
Browse files Browse the repository at this point in the history
* add state monitoring stream

* add new nats streams

* pass state down and dont do check

* import new stream and subject

* update sub for new stream

* accept new state var

* new sub vars added

* remove state check

* update job for new streams added

* update nats tests

* remove imports

* # pylint: disable=protected-access

* update logger

* force passing args by keywords

* update tests

* update comment

---------

Co-authored-by: cl0ete <cloete.dupreez@gmail.com>
Co-authored-by: Mourits de Beer <31511766+ff137@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 1, 2024
1 parent 6881517 commit 96560f1
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 25 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/continuous-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,9 @@ jobs:
nats --creds ./nats.creds \
--server ${{ secrets.NATS_URL }} \
stream rm acapy_events -f
nats --creds ./nats.creds \
--server ${{ secrets.NATS_URL }} \
stream rm cloudapi_aries_state_monitoring -f
nats --creds ./nats.creds \
--server ${{ secrets.NATS_URL }} \
Expand Down Expand Up @@ -638,6 +641,19 @@ jobs:
--replicas 3 \
--compression s2
nats --creds ./nats.creds \
--server ${{ secrets.NATS_URL }} \
stream add cloudapi_aries_state_monitoring --subjects "cloudapi.aries.state_monitoring.*.*.>" \
--defaults \
--storage file \
--replicas 3 \
--compression s2 \
--retention limits \
--discard old \
--max-age 1m \
--dupe-window 1m \
--max-msgs-per-subject 1000
rm -f ./nats.creds
- name: Helmfile Apply # Apply default helmfile (without RDS proxy) when resetting deployments.
Expand Down
2 changes: 2 additions & 0 deletions shared/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,7 @@
NATS_SERVER = os.getenv("NATS_SERVER", "nats://nats:4222")
NATS_SUBJECT = os.getenv("NATS_SUBJECT", "cloudapi.aries.events")
NATS_STREAM = os.getenv("NATS_STREAM", "cloudapi_aries_events")
NATS_STATE_STREAM = os.getenv("NATS_STATE_STREAM", "cloudapi_aries_state_monitoring")
NATS_STATE_SUBJECT = os.getenv("NATS_STATE_SUBJECT", "cloudapi.aries.state_monitoring")
NATS_CREDS_FILE = os.getenv("NATS_CREDS_FILE", "")
ENDORSER_DURABLE_CONSUMER = os.getenv("ENDORSER_DURABLE_CONSUMER", "endorser")
36 changes: 30 additions & 6 deletions tilt/cloudapi/nats/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,34 @@ spec:
{{- range $stream, $config := .Values.postInstall.streams }}
nats --server nats://{{ template "common.names.fullname" $ }}:{{ default 4222 $.Values.nats.service.ports.client }} stream add {{ $stream }} \
--subjects {{ join "," $config.subjects | quote }} \
{{- if $config.defaults }}
--defaults \
{{- end }}
--storage {{ $config.storage }} \
--replicas={{ $.Values.nats.replicaCount }}
--subjects {{ join "," $config.subjects | quote }} \
{{- if $config.defaults }}
--defaults \
{{- end }}
--storage {{ $config.storage }} \
{{- if $config.compression }}
--compression {{ $config.compression }} \
{{- end }}
{{- if $config.maxAge }}
--max-age {{ $config.maxAge }} \
{{- end }}
{{- if $config.maxBytes }}
--max-bytes {{ $config.maxBytes }} \
{{- end }}
{{- if $config.maxMsgs }}
--max-msgs {{ $config.maxMsgs }} \
{{- end }}
{{- if $config.maxMsgsPerSubject }}
--max-msgs-per-subject {{ $config.maxMsgsPerSubject }} \
{{- end }}
{{- if $config.retention }}
--retention {{ $config.retention }} \
{{- end }}
{{- if $config.discard }}
--discard {{ $config.discard }} \
{{- end }}
{{- if $config.duplicateWindow }}
--dupe-window {{ $config.duplicateWindow }} \
{{- end }}
--replicas={{ $.Values.nats.replicaCount }}
{{- end }}
10 changes: 10 additions & 0 deletions tilt/cloudapi/nats/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ postInstall:
- cloudapi.aries.events.*.*
defaults: true
storage: file
cloudapi_aries_state_monitoring:
subjects:
- cloudapi.aries.state_monitoring.*.*.>
defaults: true
storage: file
retention: limits
discard: old
maxAge: 1m
duplicateWindow: 1m
maxMsgsPerSubject: 1000
acapy_events:
subjects:
- acapy.>
Expand Down
3 changes: 2 additions & 1 deletion waypoint/routers/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async def nats_event_stream_generator(
group_id=group_id,
wallet_id=wallet_id,
topic=topic,
state=desired_state,
stop_event=stop_event,
duration=SSE_TIMEOUT,
look_back=look_back,
Expand All @@ -65,7 +66,7 @@ async def nats_event_stream_generator(
break

payload = event.payload
if payload.get(field) == field_id and payload.get("state") == desired_state:
if payload.get(field) == field_id:
logger.trace("Event found yielding event {}", event)
yield event.model_dump_json()
stop_event.set()
Expand Down
26 changes: 16 additions & 10 deletions waypoint/services/nats_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from nats.js.api import ConsumerConfig, DeliverPolicy
from nats.js.client import JetStreamContext

from shared.constants import NATS_STREAM, NATS_SUBJECT
from shared.constants import NATS_STATE_STREAM, NATS_STATE_SUBJECT
from shared.log_config import get_logger
from shared.models.webhook_events import CloudApiWebhookEventGeneric

Expand All @@ -25,7 +25,7 @@ def __init__(self, jetstream: JetStreamContext):
self.js_context: JetStreamContext = jetstream

async def _subscribe(
self, group_id: str, wallet_id: str, look_back: int
self, *, group_id: str, wallet_id: str, topic: str, state: str, look_back: int
) -> JetStreamContext.PullSubscription:
try:
logger.trace(
Expand All @@ -35,14 +35,14 @@ async def _subscribe(
)
group_id = group_id or "*"
subscribe_kwargs = {
"subject": f"{NATS_SUBJECT}.{group_id}.{wallet_id}",
"stream": NATS_STREAM,
"subject": f"{NATS_STATE_SUBJECT}.{group_id}.{wallet_id}.{topic}.{state}",
"stream": NATS_STATE_STREAM,
}

# Get the current time in UTC
current_time = datetime.now(timezone.utc)

# Subtract 30 seconds
# Subtract look_back time from the current time
look_back_time = current_time - timedelta(seconds=look_back)

# Format the time in the required format
Expand Down Expand Up @@ -70,22 +70,29 @@ async def _subscribe(
@asynccontextmanager
async def process_events(
self,
*,
group_id: str,
wallet_id: str,
topic: str,
state: str,
stop_event: asyncio.Event,
duration: int = 10,
look_back: int = 300,
):
logger.debug(
"Processing events for group {} and wallet {} on topic {}",
"Processing events for group {} and wallet {} on topic {} with state {}",
group_id,
wallet_id,
topic,
state,
)

subscription = await self._subscribe(
group_id=group_id, wallet_id=wallet_id, look_back=look_back
group_id=group_id,
wallet_id=wallet_id,
topic=topic,
state=state,
look_back=look_back,
)

async def event_generator():
Expand All @@ -101,9 +108,8 @@ async def event_generator():
try:
messages = await subscription.fetch(batch=5, timeout=0.2)
for message in messages:
if message.headers.get("event_topic") == topic:
event = orjson.loads(message.data)
yield CloudApiWebhookEventGeneric(**event)
event = orjson.loads(message.data)
yield CloudApiWebhookEventGeneric(**event)
await message.ack()
except TimeoutError:
logger.trace("Timeout fetching messages continuing...")
Expand Down
41 changes: 33 additions & 8 deletions waypoint/tests/services/test_nats_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from nats.js.api import ConsumerConfig, DeliverPolicy
from nats.js.client import JetStreamContext

from shared.constants import NATS_STREAM, NATS_SUBJECT
from shared.constants import NATS_STATE_STREAM, NATS_STATE_SUBJECT
from shared.models.webhook_events import CloudApiWebhookEventGeneric
from shared.services.nats_jetstream import init_nats_client
from waypoint.services.nats_service import NatsEventsProcessor
Expand Down Expand Up @@ -60,11 +60,15 @@ async def test_nats_events_processor_subscribe(
opt_start_time="2024-10-24T09:17:17.998149541Z",
)
subscription = await processor._subscribe( # pylint: disable=protected-access
"group_id", "wallet_id", 300
group_id="group_id",
wallet_id="wallet_id",
topic="proofs",
state="done",
look_back=300,
)
mock_nats_client.pull_subscribe.assert_called_once_with(
subject=f"{NATS_SUBJECT}.group_id.wallet_id",
stream=NATS_STREAM,
subject=f"{NATS_STATE_SUBJECT}.group_id.wallet_id.proofs.done",
stream=NATS_STATE_STREAM,
config=mock_config.return_value,
)
assert isinstance(subscription, JetStreamContext.PullSubscription)
Expand All @@ -79,7 +83,13 @@ async def test_nats_events_processor_subscribe_error(
mock_nats_client.pull_subscribe.side_effect = exception

with pytest.raises(exception):
await processor._subscribe("group_id", "wallet_id", 300)
await processor._subscribe( # pylint: disable=protected-access
group_id="group_id",
wallet_id="wallet_id",
topic="proofs",
state="done",
look_back=300,
)


@pytest.mark.anyio
Expand All @@ -106,7 +116,12 @@ async def test_process_events(

stop_event = asyncio.Event()
async with processor.process_events(
group_id, "wallet_id", "test_topic", stop_event, duration=1
group_id=group_id,
wallet_id="wallet_id",
topic="test_topic",
state="state",
stop_event=stop_event,
duration=1,
) as event_generator:
events = []
async for event in event_generator:
Expand All @@ -133,7 +148,12 @@ async def test_process_events_cancelled_error(

with patch.object(mock_subscription, "fetch", side_effect=asyncio.CancelledError):
async with processor.process_events(
"group_id", "wallet_id", "test_topic", stop_event, duration=1
group_id="group_id",
wallet_id="wallet_id",
topic="test_topic",
state="state",
stop_event=stop_event,
duration=1,
) as event_generator:
events = []
async for event in event_generator:
Expand All @@ -155,7 +175,12 @@ async def test_process_events_timeout_error(

stop_event = asyncio.Event()
async with processor.process_events(
"group_id", "wallet_id", "test_topic", stop_event, duration=1
group_id="group_id",
wallet_id="wallet_id",
topic="test_topic",
state="state",
stop_event=stop_event,
duration=1,
) as event_generator:
events = []
async for event in event_generator:
Expand Down

0 comments on commit 96560f1

Please sign in to comment.