Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Measure Hydration/Transformation/Indexing Times #153

Merged
merged 24 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/dpul_collections/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ defmodule DpulCollections.Application do
# Start a worker by calling: DpulCollections.Worker.start_link(arg)
# {DpulCollections.Worker, arg},
# Start to serve requests, typically the last entry
DpulCollectionsWeb.Endpoint
DpulCollectionsWeb.Endpoint,
DpulCollections.IndexMetricsTracker
] ++ environment_children(Application.fetch_env!(:dpul_collections, :current_env))

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
171 changes: 171 additions & 0 deletions lib/dpul_collections/index_metrics_tracker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
defmodule DpulCollections.IndexMetricsTracker do
use GenServer
alias DpulCollections.IndexingPipeline.Metrics
alias DpulCollections.IndexingPipeline.DatabaseProducer

@type processor_state :: %{
start_time: integer(),
end_time: integer(),
polling_started: boolean(),
acked_count: integer()
}
@type state :: %{(processor_key :: String.t()) => processor_state()}

def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

@impl true
def init(_) do
:ok =
:telemetry.attach(
"metrics-ack-tracker",
[:database_producer, :ack, :done],
&handle_ack_received/4,
nil
)

{:ok, %{}}
end

@spec register_fresh_index(source :: module()) :: term()
def register_fresh_index(source) do
tpendragon marked this conversation as resolved.
Show resolved Hide resolved
GenServer.call(__MODULE__, {:fresh_index, source})
end

@spec register_polling_started(source :: module()) :: term()
def register_polling_started(source) do
GenServer.call(__MODULE__, {:poll_started, source})
end

@spec index_durations(source :: module()) :: term()
def index_durations(source) do
Metrics.index_metrics(source.processor_marker_key(), "full_index")
end

def reset() do
GenServer.call(__MODULE__, {:reset})
end

@impl true
@spec handle_call(term(), term(), state()) :: term()
def handle_call({:reset}, _, _state) do
{:reply, nil, %{}}
end

@impl true
@spec handle_call(term(), term(), state()) :: term()
def handle_call({:fresh_index, source}, _, state) do
new_state =
put_in(state, [source.processor_marker_key()], %{
start_time: :erlang.monotonic_time(),
acked_count: 0
})

{:reply, nil, new_state}
end

@spec handle_call(term(), term(), state()) :: term()
def handle_call({:poll_started, source}, _, state) do
# Record that polling has started if we've recorded a start time but not an
# end time for a source. Then the next time the source finishes acknowledgements
# we'll record an end time.
if get_in(state, [source.processor_marker_key(), :start_time]) != nil &&
get_in(state, [source.processor_marker_key(), :end_time]) == nil do
state = put_in(state, [source.processor_marker_key(), :polling_started], true)

{:reply, nil, state}
else
{:reply, nil, state}
end
end

@spec handle_call(term(), term(), state()) :: term()
def handle_call(
{:ack_received, metadata = %{processor_marker_key: processor_marker_key}},
_,
state
) do
state =
state
|> put_in(
[processor_marker_key],
handle_ack_received(metadata, Map.get(state, processor_marker_key))
)

{:reply, nil, state}
end

# If there's no stored info yet, do nothing.
@spec handle_ack_received(DatabaseProducer.ack_event_metadata(), processor_state()) ::
processor_state()
defp handle_ack_received(_event, nil), do: nil
# If there's a start and end time, do nothing
defp handle_ack_received(
_event,
processor_state = %{start_time: _start_time, end_time: _end_time}
),
do: processor_state

# If there's a start, trigger for end time, and the unacked_count is 0, create the IndexMetric.
defp handle_ack_received(
%{
processor_marker_key: processor_marker_key,
acked_count: new_acked_count,
unacked_count: 0
},
processor_state = %{
start_time: _start_time,
polling_started: true,
acked_count: old_acked_count
}
) do
processor_state =
processor_state
|> put_in([:end_time], :erlang.monotonic_time())
|> Map.delete(:polling_started)
|> put_in([:acked_count], old_acked_count + new_acked_count)

duration = processor_state[:end_time] - processor_state[:start_time]

:telemetry.execute(
[:dpulc, :indexing_pipeline, event(processor_marker_key), :time_to_poll],
%{duration: duration},
%{source: processor_marker_key}
)

Metrics.create_index_metric(%{
type: processor_marker_key,
measurement_type: "full_index",
duration: System.convert_time_unit(duration, :native, :second),
records_acked: processor_state[:acked_count]
})

processor_state
end

# If there's a start time, record the acked_count
defp handle_ack_received(
%{acked_count: new_acked_count},
processor_state = %{start_time: _start_time, acked_count: old_acked_count}
) do
processor_state
|> put_in([:acked_count], old_acked_count + new_acked_count)
end

defp handle_ack_received([:database_producer, :ack, :done], _measurements, metadata, _config) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this one recurse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I should probably call that function something different

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to ack_telemetry_callback

GenServer.call(__MODULE__, {:ack_received, metadata})
end

def event("figgy_hydrator") do
:hydrator
end

def event("figgy_transformer") do
:transformer
end

def event("figgy_indexer") do
:indexer
end
end
27 changes: 23 additions & 4 deletions lib/dpul_collections/indexing_pipeline/database_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do
records =
source_module.get_cache_entries_since!(last_queried_marker, total_demand, cache_version)

if last_queried_marker == nil && length(records) > 0 do
DpulCollections.IndexMetricsTracker.register_fresh_index(source_module)
end

new_state =
state
|> Map.put(
Expand All @@ -82,6 +86,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do

# Set a timer to try fulfilling demand again later
if new_state.stored_demand > 0 do
DpulCollections.IndexMetricsTracker.register_polling_started(source_module)
Process.send_after(self(), :check_for_updates, 50)
end

Expand Down Expand Up @@ -136,7 +141,12 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do
})
end

notify_ack(pending_markers |> length())
notify_ack(
pending_markers |> length(),
new_state.pulled_records |> length(),
state.source_module.processor_marker_key()
)

{:noreply, messages, new_state}
end

Expand Down Expand Up @@ -214,12 +224,21 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do

# This happens when ack is finished, we listen to this telemetry event in
# tests so we know when the Producer's done processing a message.
@spec notify_ack(integer()) :: any()
defp notify_ack(acked_message_count) do
@spec notify_ack(integer(), integer(), String.t()) :: any()
@type ack_event_metadata :: %{
acked_count: integer(),
unacked_count: integer(),
processor_marker_key: String.t()
}
defp notify_ack(acked_message_count, unacked_count, processor_marker_key) do
:telemetry.execute(
[:database_producer, :ack, :done],
%{},
%{acked_count: acked_message_count}
%{
acked_count: acked_message_count,
unacked_count: unacked_count,
processor_marker_key: processor_marker_key
}
)
end

Expand Down
21 changes: 21 additions & 0 deletions lib/dpul_collections/indexing_pipeline/index_metric.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule DpulCollections.IndexingPipeline.IndexMetric do
use Ecto.Schema
import Ecto.Changeset

schema "index_metrics" do
field :type, :string
field :measurement_type, :string
# Duration in seconds
field :duration, :integer
field :records_acked, :integer, default: 0

timestamps(type: :utc_datetime_usec)
end

@doc false
def changeset(index_metric, attrs) do
index_metric
|> cast(attrs, [:type, :measurement_type, :duration, :records_acked])
|> validate_required([:type, :measurement_type, :duration, :records_acked])
end
end
29 changes: 29 additions & 0 deletions lib/dpul_collections/indexing_pipeline/metrics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule DpulCollections.IndexingPipeline.Metrics do
import Ecto.Query, warn: false
alias DpulCollections.Repo
alias DpulCollections.IndexingPipeline.IndexMetric

@doc """
Creates an IndexMetric
"""
def create_index_metric(attrs \\ %{}) do
{:ok, index_metric} =
%IndexMetric{}
|> IndexMetric.changeset(attrs)
|> Repo.insert()

index_metric
end

@doc """
Get index metrics by type
"""
def index_metrics(type, measurement_type) do
query =
from r in IndexMetric,
where: r.type == ^type and r.measurement_type == ^measurement_type,
order_by: [desc: r.inserted_at]

Repo.all(query)
end
end
Loading