Skip to content

Commit

Permalink
PLAT-1156 ws issues
Browse files Browse the repository at this point in the history
This MR adds reconnect functionality for websockets
  • Loading branch information
Victor Ramirez committed Sep 18, 2024
1 parent 1fecd61 commit ac6d620
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 406 deletions.
30 changes: 27 additions & 3 deletions coinmetrics/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from datetime import date, datetime
from logging import getLogger
from typing import Dict, List, Optional, Union, cast, Callable, Any
from types import FrameType
from urllib.parse import urlencode

import signal
import requests
from requests import HTTPError, Response
import websocket
Expand Down Expand Up @@ -62,9 +63,16 @@
class CmStream:
def __init__(self, ws_url: str):
self.ws_url = ws_url
self._stop_event_received = False
self._events_handlers_set = False

def run(
self, on_message: MessageHandlerType = None, on_error: MessageHandlerType = None, on_close: Optional[Callable[[websocket.WebSocketApp, int, str], None]] = None) -> None:
self,
on_message: MessageHandlerType = None,
on_error: MessageHandlerType = None,
on_close: Optional[Callable[[websocket.WebSocketApp, int, str], None]] = None,
reconnect: bool = True
) -> None:
if on_message is None:
on_message = self._on_message
if on_error is None:
Expand All @@ -76,7 +84,23 @@ def run(
self.ws_url, on_message=on_message, on_error=on_error, on_close=on_close
)
self.ws = ws
self.ws.run_forever()

self._stop_event_received = False
self._register_signal_handlers([signal.SIGINT])
self.ws.run_forever(reconnect=reconnect)

def _register_signal_handlers(self, signal_types: List[int]) -> None:
if self._events_handlers_set:
return
for signal_type in signal_types:
previous_handler = signal.getsignal(signal_type)
if callable(previous_handler):
def handler(signum: int, frame: Optional[FrameType]) -> None:
self._stop_event_received = True
previous_handler(signum, frame)
signal.signal(signal_type, handler)

self._events_handlers_set = True

def _on_message(self, stream: websocket.WebSocketApp, message: str) -> None:
print(f"{message}")
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ requests==2.27.1; (python_version >= "2.7" and python_full_version < "3.0.0") or
six==1.16.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.3.0"
urllib3==1.26.8; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" and python_version < "4"
websocket-client==1.2.3; python_version >= "3.6"
typer==0.6.1
typer==0.6.1
3 changes: 2 additions & 1 deletion test/test_debugging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from coinmetrics.api_client import CoinMetricsClient
import os
from datetime import datetime

client = CoinMetricsClient(str(os.environ.get("CM_API_KEY")), debug_mode=True)
cm_api_key_set = os.environ.get("CM_API_KEY") is not None
Expand All @@ -20,7 +21,7 @@ def test_debugging_get_rrs() -> None:
eth_reference_rates = client.get_asset_metrics(
metrics="ReferenceRateUSD",
assets="eth",
start_time="2020-01-01",
start_time=datetime.now().date(),
frequency="1d",
)
for data in eth_reference_rates:
Expand Down
Loading

0 comments on commit ac6d620

Please sign in to comment.