From df70d90f1edb627a6dc9939b00520456f2ed3471 Mon Sep 17 00:00:00 2001 From: Santiago Botta Date: Mon, 11 Mar 2024 13:13:22 -0300 Subject: [PATCH] Update StockPriceWatcher to return timeleft until next price tick --- lib/ex_finnhub/stock_prices.ex | 10 +++-- .../stock_prices/stock_price_supervisor.ex | 18 ++++++++- .../stock_prices/stock_price_watcher.ex | 38 +++++++++++++------ 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/lib/ex_finnhub/stock_prices.ex b/lib/ex_finnhub/stock_prices.ex index 742827b..4ca17a4 100644 --- a/lib/ex_finnhub/stock_prices.ex +++ b/lib/ex_finnhub/stock_prices.ex @@ -13,18 +13,22 @@ defmodule ExFinnhub.StockPrices do @spec heartbeat(pid()) :: :ok def heartbeat(pid), do: StockPriceServer.heartbeat(pid) - @spec subscribe_stock_price(binary()) :: {:ok, pid(), StockPrice.t() | nil} + @spec subscribe_stock_price(binary()) :: + {:ok, pid(), {StockPrice.t() | nil, non_neg_integer() | nil}} def subscribe_stock_price(stock_symbol) do maybe_start_stock_price_watcher(stock_symbol) Channels.subscribe_stock_prices_topic(stock_symbol) {:ok, pid} = maybe_start_stock_price_worker(stock_symbol) + {:ok, millis_to_next_price_update} = + StockPriceServer.get_interval_timeleft(pid) + case fetch_last_registered_stock_price(stock_symbol) do {:ok, %StockPrice{} = stock_price} -> - {:ok, pid, stock_price} + {:ok, pid, {stock_price, millis_to_next_price_update}} {:error, :no_result} -> - {:ok, pid, nil} + {:ok, pid, {nil, millis_to_next_price_update}} end end diff --git a/lib/ex_finnhub/stock_prices/stock_price_supervisor.ex b/lib/ex_finnhub/stock_prices/stock_price_supervisor.ex index e64ea1f..24e7583 100644 --- a/lib/ex_finnhub/stock_prices/stock_price_supervisor.ex +++ b/lib/ex_finnhub/stock_prices/stock_price_supervisor.ex @@ -88,7 +88,8 @@ defmodule ExFinnhub.StockPrices.StockPriceSupervisor do {StockPriceWatcher, [ supplier: "finnhub", - module_name: watcher_module_name(symbol), + name: watcher_module_name(symbol), + on_get_timeleft_to_next_update: &get_timeleft_to_next_update/1, symbol: symbol ]} ) @@ -100,4 +101,19 @@ defmodule ExFinnhub.StockPrices.StockPriceSupervisor do |> String.to_atom() |> then(&Module.concat(__MODULE__, &1)) end + + @spec get_timeleft_to_next_update(binary()) :: {:ok, non_neg_integer() | nil} + defp get_timeleft_to_next_update(symbol) do + maybe_timeleft = + case get_child(symbol) do + nil -> + nil + + {pid, _state} -> + {:ok, timeleft} = StockPriceServer.get_interval_timeleft(pid) + if timeleft, do: timeleft, else: nil + end + + {:ok, maybe_timeleft} + end end diff --git a/lib/ex_finnhub/stock_prices/stock_price_watcher.ex b/lib/ex_finnhub/stock_prices/stock_price_watcher.ex index 1d86632..f881f77 100644 --- a/lib/ex_finnhub/stock_prices/stock_price_watcher.ex +++ b/lib/ex_finnhub/stock_prices/stock_price_watcher.ex @@ -9,29 +9,32 @@ defmodule ExFinnhub.StockPrices.StockPriceWatcher do require Logger @spec child_spec(keyword()) :: map() - def child_spec(supplier: supplier, module_name: module_name, symbol: symbol), - do: %{ + def child_spec(opts) do + supplier = Keyword.fetch!(opts, :supplier) + + %{ id: supplier <> "-producer", start: { __MODULE__, :start_link, - [ - [ - supplier: supplier, - name: module_name, - symbol: symbol - ] - ] + [opts] }, restart: :permanent, type: :worker } + end def start_link(opts) do name = Keyword.fetch!(opts, :name) supplier_name = Keyword.fetch!(opts, :supplier) symbol = Keyword.fetch!(opts, :symbol) + on_get_timeleft_to_next_update = + Keyword.fetch!( + opts, + :on_get_timeleft_to_next_update + ) + Broadway.start_link(__MODULE__, name: name, producer: [ @@ -50,21 +53,26 @@ defmodule ExFinnhub.StockPrices.StockPriceWatcher do ], processors: [ default: [min_demand: 0, max_demand: 10] + ], + context: [ + get_timeleft_to_next_update: on_get_timeleft_to_next_update ] ) end - def handle_message(_processor, %Broadway.Message{} = message, _context) do + def handle_message(_processor, %Broadway.Message{} = message, context) do Logger.debug("Loading stock price from message=#{inspect(message)}") with %Redis.Stream.Entry{} = entry <- Redis.Client.parse_stream_entry(message.data), {:ok, :loaded, %StockPrice{symbol: symbol} = stock_price} <- load_stock_price_entry(entry), + {:ok, millis_until_next_update} <- + get_timeleft_to_next_update(context, symbol), :ok <- StockPrices.Channels.broadcast_new_stock_price!( symbol, - stock_price + {stock_price, millis_until_next_update} ) do message else @@ -116,4 +124,12 @@ defmodule ExFinnhub.StockPrices.StockPriceWatcher do @spec get_stage :: ExFinance.Application.stage() defp get_stage, do: ExFinance.Application.stage() + + @spec get_timeleft_to_next_update(keyword(), binary()) :: + {:ok, non_neg_integer() | false} + defp get_timeleft_to_next_update( + [get_timeleft_to_next_update: get_timeleft_to_next_update], + symbol + ), + do: get_timeleft_to_next_update.(symbol) end