Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix random_producer delay #160

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions lib/astarte_flow/blocks/random_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,15 @@
defmodule Config do
@moduledoc false

@type t() :: %__MODULE__{
key: String.t(),
type: Astarte.Flow.Blocks.RandomProducer.supported_types(),
min: number() | nil,
max: number() | nil,
p: float() | nil,
delay_ms: integer() | nil
}

defstruct [
:key,
:type,
:min,
:max,
:p,
:delay_ms
:delay_ms,
:pending_demand,
:queue
]
end

Expand Down Expand Up @@ -104,19 +97,57 @@

with {:ok, type} <- validate_type(type),
{:ok, state} <- init_state(key, type, opts) do
delay_ms = Keyword.get(opts, :delay_ms)
{:producer, %Config{state | delay_ms: delay_ms}}
delay_ms = Keyword.get(opts, :delay_ms) || 0
send(self(), :poll)

{:producer, %Config{state | delay_ms: delay_ms, pending_demand: 0, queue: :queue.new()},
dispatcher: GenStage.BroadcastDispatcher}
else
{:error, reason} ->
{:stop, reason}
end
end

@impl true
def handle_demand(demand, config) when demand > 0 do
messages = for _ <- 1..demand, do: generate_message(config)
def handle_demand(incoming_demand, %Config{pending_demand: demand} = config) do
dispatch_messages(%{config | pending_demand: demand + incoming_demand}, [
generate_message(config)
])
end

@impl true
def handle_info(:poll, config) do
%Config{
delay_ms: delay_ms,
queue: queue
} = config

# Schedule next polling
_ = Process.send_after(self(), :poll, delay_ms)

new_queue =
generate_message(config)
|> :queue.in(queue)

new_state = %{config | queue: new_queue}
dispatch_messages(new_state, [])
end

defp dispatch_messages(%Config{pending_demand: 0} = state, messages) do
{:noreply, Enum.reverse(messages), state}
end

defp dispatch_messages(%Config{pending_demand: demand, queue: queue} = state, messages) do
case :queue.out(queue) do
{{:value, message}, updated_queue} ->
updated_state = %{state | pending_demand: demand - 1, queue: updated_queue}
updated_messages = [message | messages]

{:noreply, messages, config}
dispatch_messages(updated_state, updated_messages)

{:empty, _queue} ->
{:noreply, Enum.reverse(messages), state}
end
end

defp validate_type(type) when type in [:integer, :real, :boolean] do
Expand Down Expand Up @@ -163,11 +194,9 @@
end
end

defp generate_message(%Config{key: key, type: type, delay_ms: delay_ms} = state) do

Check warning on line 197 in lib/astarte_flow/blocks/random_producer.ex

View workflow job for this annotation

GitHub Actions / Check Dialyzer

variable "delay_ms" is unused (if the variable is not meant to be used, prefix it with an underscore)
data = generate_data(state)

maybe_sleep(delay_ms)

%Message{
key: key,
type: type,
Expand All @@ -176,9 +205,6 @@
}
end

defp maybe_sleep(nil), do: :ok
defp maybe_sleep(delay_ms) when is_integer(delay_ms), do: :timer.sleep(delay_ms)

defp generate_data(%Config{type: :integer, min: min, max: max}) do
Enum.random(min..max)
end
Expand Down
Loading