Skip to content

Commit

Permalink
Edit random_producer
Browse files Browse the repository at this point in the history
Edit random_producer in order to add immediate number creation instead of waiting demand*milliseconds

Signed-off-by: Eddy Babetto <eddy.babetto@secomind.com>
  • Loading branch information
eddbbt committed Feb 29, 2024
1 parent f1c56e9 commit da23a20
Showing 1 changed file with 46 additions and 20 deletions.
66 changes: 46 additions & 20 deletions lib/astarte_flow/blocks/random_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,15 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
defmodule Config do
@moduledoc false

@type t() :: %__MODULE__{
key: String.t(),
type: Astarte.Flow.Blocks.RandomProducer.supported_types(),
min: number() | nil,
max: number() | nil,
p: float() | nil,
delay_ms: integer() | nil
}

defstruct [
:key,
:type,
:min,
:max,
:p,
:delay_ms
:delay_ms,
:pending_demand,
:queue
]
end

Expand Down Expand Up @@ -104,19 +97,57 @@ defmodule Astarte.Flow.Blocks.RandomProducer do

with {:ok, type} <- validate_type(type),
{:ok, state} <- init_state(key, type, opts) do
delay_ms = Keyword.get(opts, :delay_ms)
{:producer, %Config{state | delay_ms: delay_ms}}
delay_ms = Keyword.get(opts, :delay_ms) || 0
send(self(), :poll)

{:producer, %Config{state | delay_ms: delay_ms, pending_demand: 0, queue: :queue.new()},
dispatcher: GenStage.BroadcastDispatcher}
else
{:error, reason} ->
{:stop, reason}
end
end

@impl true
def handle_demand(demand, config) when demand > 0 do
messages = for _ <- 1..demand, do: generate_message(config)
def handle_demand(incoming_demand, %Config{pending_demand: demand} = config) do
dispatch_messages(%{config | pending_demand: demand + incoming_demand}, [
generate_message(config)
])
end

@impl true
def handle_info(:poll, config) do
%Config{
delay_ms: delay_ms,
queue: queue
} = config

# Schedule next polling
_ = Process.send_after(self(), :poll, delay_ms)

new_queue =
generate_message(config)
|> :queue.in(queue)

new_state = %{config | queue: new_queue}
dispatch_messages(new_state, [])
end

defp dispatch_messages(%Config{pending_demand: 0} = state, messages) do
{:noreply, Enum.reverse(messages), state}
end

defp dispatch_messages(%Config{pending_demand: demand, queue: queue} = state, messages) do
case :queue.out(queue) do
{{:value, message}, updated_queue} ->
updated_state = %{state | pending_demand: demand - 1, queue: updated_queue}
updated_messages = [message | messages]

{:noreply, messages, config}
dispatch_messages(updated_state, updated_messages)

{:empty, _queue} ->
{:noreply, Enum.reverse(messages), state}
end
end

defp validate_type(type) when type in [:integer, :real, :boolean] do
Expand Down Expand Up @@ -166,8 +197,6 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
defp generate_message(%Config{key: key, type: type, delay_ms: delay_ms} = state) do

Check warning on line 197 in lib/astarte_flow/blocks/random_producer.ex

View workflow job for this annotation

GitHub Actions / Check Dialyzer

variable "delay_ms" is unused (if the variable is not meant to be used, prefix it with an underscore)
data = generate_data(state)

maybe_sleep(delay_ms)

%Message{
key: key,
type: type,
Expand All @@ -176,9 +205,6 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
}
end

defp maybe_sleep(nil), do: :ok
defp maybe_sleep(delay_ms) when is_integer(delay_ms), do: :timer.sleep(delay_ms)

defp generate_data(%Config{type: :integer, min: min, max: max}) do
Enum.random(min..max)
end
Expand Down

0 comments on commit da23a20

Please sign in to comment.