Skip to content

Commit

Permalink
Edit random_producer
Browse files Browse the repository at this point in the history
Edit random_producer in order to add immediate number creation instead of waiting demand*milliseconds

Signed-off-by: Eddy Babetto <eddy.babetto@secomind.com>
  • Loading branch information
eddbbt committed Feb 29, 2024
1 parent f1c56e9 commit c360022
Showing 1 changed file with 46 additions and 17 deletions.
63 changes: 46 additions & 17 deletions lib/astarte_flow/blocks/random_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,15 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
defmodule Config do
@moduledoc false

@type t() :: %__MODULE__{
key: String.t(),
type: Astarte.Flow.Blocks.RandomProducer.supported_types(),
min: number() | nil,
max: number() | nil,
p: float() | nil,
delay_ms: integer() | nil
}

defstruct [
:key,
:type,
:min,
:max,
:p,
:delay_ms
:polling_interval_ms,
:pending_demand,
:queue
]
end

Expand Down Expand Up @@ -105,18 +98,56 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
with {:ok, type} <- validate_type(type),
{:ok, state} <- init_state(key, type, opts) do
delay_ms = Keyword.get(opts, :delay_ms)
{:producer, %Config{state | delay_ms: delay_ms}}
send(self(), :poll)

{:producer,
%Config{state | polling_interval_ms: delay_ms, pending_demand: 0, queue: :queue.new()},
dispatcher: GenStage.BroadcastDispatcher}
else
{:error, reason} ->
{:stop, reason}
end
end

@impl true
def handle_demand(demand, config) when demand > 0 do
messages = for _ <- 1..demand, do: generate_message(config)
def handle_demand(incoming_demand, %Config{pending_demand: demand} = config) do
dispatch_messages(%{config | pending_demand: demand + incoming_demand}, [
generate_message(config)
])
end

def handle_info(:poll, config) do
%Config{
polling_interval_ms: polling_interval_ms,
queue: queue
} = config

# Schedule next polling
_ = Process.send_after(self(), :poll, polling_interval_ms)

new_queue =
generate_message(config)
|> :queue.in(queue)

new_state = %{config | queue: new_queue}
dispatch_messages(new_state, [])
end

{:noreply, messages, config}
defp dispatch_messages(%Config{pending_demand: 0} = state, messages) do
{:noreply, Enum.reverse(messages), state}
end

defp dispatch_messages(%Config{pending_demand: demand, queue: queue} = state, messages) do
case :queue.out(queue) do
{{:value, message}, updated_queue} ->
updated_state = %{state | pending_demand: demand - 1, queue: updated_queue}
updated_messages = [message | messages]

dispatch_messages(updated_state, updated_messages)

{:empty, _queue} ->
{:noreply, Enum.reverse(messages), state}
end
end

defp validate_type(type) when type in [:integer, :real, :boolean] do
Expand Down Expand Up @@ -163,11 +194,9 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
end
end

defp generate_message(%Config{key: key, type: type, delay_ms: delay_ms} = state) do
defp generate_message(%Config{key: key, type: type, polling_interval_ms: delay_ms} = state) do
data = generate_data(state)

maybe_sleep(delay_ms)

%Message{
key: key,
type: type,
Expand Down

0 comments on commit c360022

Please sign in to comment.