diff --git a/lib/astarte_flow/blocks/random_producer.ex b/lib/astarte_flow/blocks/random_producer.ex index a84867b..a4a6abc 100644 --- a/lib/astarte_flow/blocks/random_producer.ex +++ b/lib/astarte_flow/blocks/random_producer.ex @@ -59,22 +59,15 @@ defmodule Astarte.Flow.Blocks.RandomProducer do defmodule Config do @moduledoc false - @type t() :: %__MODULE__{ - key: String.t(), - type: Astarte.Flow.Blocks.RandomProducer.supported_types(), - min: number() | nil, - max: number() | nil, - p: float() | nil, - delay_ms: integer() | nil - } - defstruct [ :key, :type, :min, :max, :p, - :delay_ms + :polling_interval_ms, + :pending_demand, + :queue ] end @@ -105,7 +98,11 @@ defmodule Astarte.Flow.Blocks.RandomProducer do with {:ok, type} <- validate_type(type), {:ok, state} <- init_state(key, type, opts) do delay_ms = Keyword.get(opts, :delay_ms) - {:producer, %Config{state | delay_ms: delay_ms}} + send(self(), :poll) + + {:producer, + %Config{state | polling_interval_ms: delay_ms, pending_demand: 0, queue: :queue.new()}, + dispatcher: GenStage.BroadcastDispatcher} else {:error, reason} -> {:stop, reason} @@ -113,10 +110,44 @@ defmodule Astarte.Flow.Blocks.RandomProducer do end @impl true - def handle_demand(demand, config) when demand > 0 do - messages = for _ <- 1..demand, do: generate_message(config) + def handle_demand(incoming_demand, %Config{pending_demand: demand} = config) do + dispatch_messages(%{config | pending_demand: demand + incoming_demand}, [ + generate_message(config) + ]) + end + + def handle_info(:poll, config) do + %Config{ + polling_interval_ms: polling_interval_ms, + queue: queue + } = config + + # Schedule next polling + _ = Process.send_after(self(), :poll, polling_interval_ms) + + new_queue = + generate_message(config) + |> :queue.in(queue) + + new_state = %{config | queue: new_queue} + dispatch_messages(new_state, []) + end - {:noreply, messages, config} + defp dispatch_messages(%Config{pending_demand: 0} = state, messages) do + {:noreply, Enum.reverse(messages), state} + end + + defp dispatch_messages(%Config{pending_demand: demand, queue: queue} = state, messages) do + case :queue.out(queue) do + {{:value, message}, updated_queue} -> + updated_state = %{state | pending_demand: demand - 1, queue: updated_queue} + updated_messages = [message | messages] + + dispatch_messages(updated_state, updated_messages) + + {:empty, _queue} -> + {:noreply, Enum.reverse(messages), state} + end end defp validate_type(type) when type in [:integer, :real, :boolean] do @@ -163,11 +194,9 @@ defmodule Astarte.Flow.Blocks.RandomProducer do end end - defp generate_message(%Config{key: key, type: type, delay_ms: delay_ms} = state) do + defp generate_message(%Config{key: key, type: type, polling_interval_ms: delay_ms} = state) do data = generate_data(state) - maybe_sleep(delay_ms) - %Message{ key: key, type: type,