diff --git a/lib/astarte_flow/blocks/random_producer.ex b/lib/astarte_flow/blocks/random_producer.ex index a84867b..5bbe151 100644 --- a/lib/astarte_flow/blocks/random_producer.ex +++ b/lib/astarte_flow/blocks/random_producer.ex @@ -59,22 +59,15 @@ defmodule Astarte.Flow.Blocks.RandomProducer do defmodule Config do @moduledoc false - @type t() :: %__MODULE__{ - key: String.t(), - type: Astarte.Flow.Blocks.RandomProducer.supported_types(), - min: number() | nil, - max: number() | nil, - p: float() | nil, - delay_ms: integer() | nil - } - defstruct [ :key, :type, :min, :max, :p, - :delay_ms + :delay_ms, + :pending_demand, + :queue ] end @@ -104,8 +97,11 @@ defmodule Astarte.Flow.Blocks.RandomProducer do with {:ok, type} <- validate_type(type), {:ok, state} <- init_state(key, type, opts) do - delay_ms = Keyword.get(opts, :delay_ms) - {:producer, %Config{state | delay_ms: delay_ms}} + delay_ms = Keyword.get(opts, :delay_ms) || 0 + send(self(), :poll) + + {:producer, %Config{state | delay_ms: delay_ms, pending_demand: 0, queue: :queue.new()}, + dispatcher: GenStage.BroadcastDispatcher} else {:error, reason} -> {:stop, reason} @@ -113,10 +109,45 @@ defmodule Astarte.Flow.Blocks.RandomProducer do end @impl true - def handle_demand(demand, config) when demand > 0 do - messages = for _ <- 1..demand, do: generate_message(config) + def handle_demand(incoming_demand, %Config{pending_demand: demand} = config) do + dispatch_messages(%{config | pending_demand: demand + incoming_demand}, [ + generate_message(config) + ]) + end + + @impl true + def handle_info(:poll, config) do + %Config{ + delay_ms: delay_ms, + queue: queue + } = config + + # Schedule next polling + _ = Process.send_after(self(), :poll, delay_ms) + + new_queue = + generate_message(config) + |> :queue.in(queue) + + new_state = %{config | queue: new_queue} + dispatch_messages(new_state, []) + end + + defp dispatch_messages(%Config{pending_demand: 0} = state, messages) do + {:noreply, Enum.reverse(messages), state} + end + + defp dispatch_messages(%Config{pending_demand: demand, queue: queue} = state, messages) do + case :queue.out(queue) do + {{:value, message}, updated_queue} -> + updated_state = %{state | pending_demand: demand - 1, queue: updated_queue} + updated_messages = [message | messages] - {:noreply, messages, config} + dispatch_messages(updated_state, updated_messages) + + {:empty, _queue} -> + {:noreply, Enum.reverse(messages), state} + end end defp validate_type(type) when type in [:integer, :real, :boolean] do @@ -166,8 +197,6 @@ defmodule Astarte.Flow.Blocks.RandomProducer do defp generate_message(%Config{key: key, type: type, delay_ms: delay_ms} = state) do data = generate_data(state) - maybe_sleep(delay_ms) - %Message{ key: key, type: type, @@ -176,9 +205,6 @@ defmodule Astarte.Flow.Blocks.RandomProducer do } end - defp maybe_sleep(nil), do: :ok - defp maybe_sleep(delay_ms) when is_integer(delay_ms), do: :timer.sleep(delay_ms) - defp generate_data(%Config{type: :integer, min: min, max: max}) do Enum.random(min..max) end