Skip to content

Commit

Permalink
Update StockPriceWatcher to return timeleft until next price tick
Browse files Browse the repository at this point in the history
  • Loading branch information
sgobotta committed Mar 11, 2024
1 parent f83d140 commit df70d90
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 15 deletions.
10 changes: 7 additions & 3 deletions lib/ex_finnhub/stock_prices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,22 @@ defmodule ExFinnhub.StockPrices do
@spec heartbeat(pid()) :: :ok
def heartbeat(pid), do: StockPriceServer.heartbeat(pid)

@spec subscribe_stock_price(binary()) :: {:ok, pid(), StockPrice.t() | nil}
@spec subscribe_stock_price(binary()) ::
{:ok, pid(), {StockPrice.t() | nil, non_neg_integer() | nil}}
def subscribe_stock_price(stock_symbol) do
maybe_start_stock_price_watcher(stock_symbol)
Channels.subscribe_stock_prices_topic(stock_symbol)
{:ok, pid} = maybe_start_stock_price_worker(stock_symbol)

{:ok, millis_to_next_price_update} =
StockPriceServer.get_interval_timeleft(pid)

case fetch_last_registered_stock_price(stock_symbol) do
{:ok, %StockPrice{} = stock_price} ->
{:ok, pid, stock_price}
{:ok, pid, {stock_price, millis_to_next_price_update}}

{:error, :no_result} ->
{:ok, pid, nil}
{:ok, pid, {nil, millis_to_next_price_update}}
end
end

Expand Down
18 changes: 17 additions & 1 deletion lib/ex_finnhub/stock_prices/stock_price_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ defmodule ExFinnhub.StockPrices.StockPriceSupervisor do
{StockPriceWatcher,
[
supplier: "finnhub",
module_name: watcher_module_name(symbol),
name: watcher_module_name(symbol),
on_get_timeleft_to_next_update: &get_timeleft_to_next_update/1,
symbol: symbol
]}
)
Expand All @@ -100,4 +101,19 @@ defmodule ExFinnhub.StockPrices.StockPriceSupervisor do
|> String.to_atom()
|> then(&Module.concat(__MODULE__, &1))
end

@spec get_timeleft_to_next_update(binary()) :: {:ok, non_neg_integer() | nil}
defp get_timeleft_to_next_update(symbol) do
maybe_timeleft =
case get_child(symbol) do
nil ->
nil

{pid, _state} ->
{:ok, timeleft} = StockPriceServer.get_interval_timeleft(pid)
if timeleft, do: timeleft, else: nil
end

{:ok, maybe_timeleft}
end
end
38 changes: 27 additions & 11 deletions lib/ex_finnhub/stock_prices/stock_price_watcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,32 @@ defmodule ExFinnhub.StockPrices.StockPriceWatcher do
require Logger

@spec child_spec(keyword()) :: map()
def child_spec(supplier: supplier, module_name: module_name, symbol: symbol),
do: %{
def child_spec(opts) do
supplier = Keyword.fetch!(opts, :supplier)

%{
id: supplier <> "-producer",
start: {
__MODULE__,
:start_link,
[
[
supplier: supplier,
name: module_name,
symbol: symbol
]
]
[opts]
},
restart: :permanent,
type: :worker
}
end

def start_link(opts) do
name = Keyword.fetch!(opts, :name)
supplier_name = Keyword.fetch!(opts, :supplier)
symbol = Keyword.fetch!(opts, :symbol)

on_get_timeleft_to_next_update =
Keyword.fetch!(
opts,
:on_get_timeleft_to_next_update
)

Broadway.start_link(__MODULE__,
name: name,
producer: [
Expand All @@ -50,21 +53,26 @@ defmodule ExFinnhub.StockPrices.StockPriceWatcher do
],
processors: [
default: [min_demand: 0, max_demand: 10]
],
context: [
get_timeleft_to_next_update: on_get_timeleft_to_next_update
]
)
end

def handle_message(_processor, %Broadway.Message{} = message, _context) do
def handle_message(_processor, %Broadway.Message{} = message, context) do
Logger.debug("Loading stock price from message=#{inspect(message)}")

with %Redis.Stream.Entry{} = entry <-
Redis.Client.parse_stream_entry(message.data),
{:ok, :loaded, %StockPrice{symbol: symbol} = stock_price} <-
load_stock_price_entry(entry),
{:ok, millis_until_next_update} <-
get_timeleft_to_next_update(context, symbol),
:ok <-
StockPrices.Channels.broadcast_new_stock_price!(
symbol,
stock_price
{stock_price, millis_until_next_update}
) do
message
else
Expand Down Expand Up @@ -116,4 +124,12 @@ defmodule ExFinnhub.StockPrices.StockPriceWatcher do

@spec get_stage :: ExFinance.Application.stage()
defp get_stage, do: ExFinance.Application.stage()

@spec get_timeleft_to_next_update(keyword(), binary()) ::
{:ok, non_neg_integer() | false}
defp get_timeleft_to_next_update(
[get_timeleft_to_next_update: get_timeleft_to_next_update],
symbol
),
do: get_timeleft_to_next_update.(symbol)
end

0 comments on commit df70d90

Please sign in to comment.