diff --git a/lib/dpul_collections/application.ex b/lib/dpul_collections/application.ex index a23cea48..015f1eae 100644 --- a/lib/dpul_collections/application.ex +++ b/lib/dpul_collections/application.ex @@ -20,7 +20,8 @@ defmodule DpulCollections.Application do # Start a worker by calling: DpulCollections.Worker.start_link(arg) # {DpulCollections.Worker, arg}, # Start to serve requests, typically the last entry - DpulCollectionsWeb.Endpoint + DpulCollectionsWeb.Endpoint, + DpulCollections.IndexMetricsTracker ] ++ environment_children(Application.fetch_env!(:dpul_collections, :current_env)) # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex new file mode 100644 index 00000000..b16b6bf7 --- /dev/null +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -0,0 +1,171 @@ +defmodule DpulCollections.IndexMetricsTracker do + use GenServer + alias DpulCollections.IndexingPipeline.Metrics + alias DpulCollections.IndexingPipeline.DatabaseProducer + + @type processor_state :: %{ + start_time: integer(), + end_time: integer(), + polling_started: boolean(), + acked_count: integer() + } + @type state :: %{(processor_key :: String.t()) => processor_state()} + + def start_link(_) do + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + @impl true + def init(_) do + :ok = + :telemetry.attach( + "metrics-ack-tracker", + [:database_producer, :ack, :done], + &ack_telemetry_callback/4, + nil + ) + + {:ok, %{}} + end + + @spec register_fresh_start(source :: module()) :: term() + def register_fresh_start(source) do + GenServer.call(__MODULE__, {:fresh_index, source}) + end + + @spec register_polling_started(source :: module()) :: term() + def register_polling_started(source) do + GenServer.call(__MODULE__, {:poll_started, source}) + end + + @spec processor_durations(source :: module()) :: term() + def processor_durations(source) do + Metrics.index_metrics(source.processor_marker_key(), "full_index") + end + + def reset() do + GenServer.call(__MODULE__, {:reset}) + end + + @impl true + @spec handle_call(term(), term(), state()) :: term() + def handle_call({:reset}, _, _state) do + {:reply, nil, %{}} + end + + @impl true + @spec handle_call(term(), term(), state()) :: term() + def handle_call({:fresh_index, source}, _, state) do + new_state = + put_in(state, [source.processor_marker_key()], %{ + start_time: :erlang.monotonic_time(), + acked_count: 0 + }) + + {:reply, nil, new_state} + end + + @spec handle_call(term(), term(), state()) :: term() + def handle_call({:poll_started, source}, _, state) do + # Record that polling has started if we've recorded a start time but not an + # end time for a source. Then the next time the source finishes acknowledgements + # we'll record an end time. + if get_in(state, [source.processor_marker_key(), :start_time]) != nil && + get_in(state, [source.processor_marker_key(), :end_time]) == nil do + state = put_in(state, [source.processor_marker_key(), :polling_started], true) + + {:reply, nil, state} + else + {:reply, nil, state} + end + end + + @spec handle_call(term(), term(), state()) :: term() + def handle_call( + {:ack_received, metadata = %{processor_marker_key: processor_marker_key}}, + _, + state + ) do + state = + state + |> put_in( + [processor_marker_key], + handle_ack_received(metadata, Map.get(state, processor_marker_key)) + ) + + {:reply, nil, state} + end + + # If there's no stored info yet, do nothing. + @spec handle_ack_received(DatabaseProducer.ack_event_metadata(), processor_state()) :: + processor_state() + defp handle_ack_received(_event, nil), do: nil + # If there's a start and end time, do nothing + defp handle_ack_received( + _event, + processor_state = %{start_time: _start_time, end_time: _end_time} + ), + do: processor_state + + # If there's a start, trigger for end time, and the unacked_count is 0, create the IndexMetric. + defp handle_ack_received( + %{ + processor_marker_key: processor_marker_key, + acked_count: new_acked_count, + unacked_count: 0 + }, + processor_state = %{ + start_time: _start_time, + polling_started: true, + acked_count: old_acked_count + } + ) do + processor_state = + processor_state + |> put_in([:end_time], :erlang.monotonic_time()) + |> Map.delete(:polling_started) + |> put_in([:acked_count], old_acked_count + new_acked_count) + + duration = processor_state[:end_time] - processor_state[:start_time] + + :telemetry.execute( + [:dpulc, :indexing_pipeline, event(processor_marker_key), :time_to_poll], + %{duration: duration}, + %{source: processor_marker_key} + ) + + Metrics.create_index_metric(%{ + type: processor_marker_key, + measurement_type: "full_index", + duration: System.convert_time_unit(duration, :native, :second), + records_acked: processor_state[:acked_count] + }) + + processor_state + end + + # If there's a start time, record the acked_count + defp handle_ack_received( + %{acked_count: new_acked_count}, + processor_state = %{start_time: _start_time, acked_count: old_acked_count} + ) do + processor_state + |> put_in([:acked_count], old_acked_count + new_acked_count) + end + + defp ack_telemetry_callback([:database_producer, :ack, :done], _measurements, metadata, _config) do + GenServer.call(__MODULE__, {:ack_received, metadata}) + end + + def event("figgy_hydrator") do + :hydrator + end + + def event("figgy_transformer") do + :transformer + end + + def event("figgy_indexer") do + :indexer + end +end diff --git a/lib/dpul_collections/indexing_pipeline/database_producer.ex b/lib/dpul_collections/indexing_pipeline/database_producer.ex index ba9d13f2..2f9b6d8a 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer.ex @@ -64,6 +64,10 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do records = source_module.get_cache_entries_since!(last_queried_marker, total_demand, cache_version) + if last_queried_marker == nil && length(records) > 0 do + DpulCollections.IndexMetricsTracker.register_fresh_start(source_module) + end + new_state = state |> Map.put( @@ -82,6 +86,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do # Set a timer to try fulfilling demand again later if new_state.stored_demand > 0 do + DpulCollections.IndexMetricsTracker.register_polling_started(source_module) Process.send_after(self(), :check_for_updates, 50) end @@ -136,7 +141,12 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do }) end - notify_ack(pending_markers |> length()) + notify_ack( + pending_markers |> length(), + new_state.pulled_records |> length(), + state.source_module.processor_marker_key() + ) + {:noreply, messages, new_state} end @@ -214,12 +224,21 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do # This happens when ack is finished, we listen to this telemetry event in # tests so we know when the Producer's done processing a message. - @spec notify_ack(integer()) :: any() - defp notify_ack(acked_message_count) do + @spec notify_ack(integer(), integer(), String.t()) :: any() + @type ack_event_metadata :: %{ + acked_count: integer(), + unacked_count: integer(), + processor_marker_key: String.t() + } + defp notify_ack(acked_message_count, unacked_count, processor_marker_key) do :telemetry.execute( [:database_producer, :ack, :done], %{}, - %{acked_count: acked_message_count} + %{ + acked_count: acked_message_count, + unacked_count: unacked_count, + processor_marker_key: processor_marker_key + } ) end diff --git a/lib/dpul_collections/indexing_pipeline/index_metric.ex b/lib/dpul_collections/indexing_pipeline/index_metric.ex new file mode 100644 index 00000000..0ae96c4d --- /dev/null +++ b/lib/dpul_collections/indexing_pipeline/index_metric.ex @@ -0,0 +1,21 @@ +defmodule DpulCollections.IndexingPipeline.IndexMetric do + use Ecto.Schema + import Ecto.Changeset + + schema "index_metrics" do + field :type, :string + field :measurement_type, :string + # Duration in seconds + field :duration, :integer + field :records_acked, :integer, default: 0 + + timestamps(type: :utc_datetime_usec) + end + + @doc false + def changeset(index_metric, attrs) do + index_metric + |> cast(attrs, [:type, :measurement_type, :duration, :records_acked]) + |> validate_required([:type, :measurement_type, :duration, :records_acked]) + end +end diff --git a/lib/dpul_collections/indexing_pipeline/metrics.ex b/lib/dpul_collections/indexing_pipeline/metrics.ex new file mode 100644 index 00000000..ce15d958 --- /dev/null +++ b/lib/dpul_collections/indexing_pipeline/metrics.ex @@ -0,0 +1,29 @@ +defmodule DpulCollections.IndexingPipeline.Metrics do + import Ecto.Query, warn: false + alias DpulCollections.Repo + alias DpulCollections.IndexingPipeline.IndexMetric + + @doc """ + Creates an IndexMetric + """ + def create_index_metric(attrs \\ %{}) do + {:ok, index_metric} = + %IndexMetric{} + |> IndexMetric.changeset(attrs) + |> Repo.insert() + + index_metric + end + + @doc """ + Get index metrics by type + """ + def index_metrics(type, measurement_type) do + query = + from r in IndexMetric, + where: r.type == ^type and r.measurement_type == ^measurement_type, + order_by: [desc: r.inserted_at] + + Repo.all(query) + end +end diff --git a/lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex b/lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex new file mode 100644 index 00000000..91f6e8fe --- /dev/null +++ b/lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex @@ -0,0 +1,130 @@ +defmodule DpulCollectionsWeb.IndexingPipeline.DashboardPage do + alias DpulCollections.IndexingPipeline.Figgy.IndexingProducerSource + alias DpulCollections.IndexingPipeline.Figgy.TransformationProducerSource + alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource + alias DpulCollections.IndexMetricsTracker + use Phoenix.LiveDashboard.PageBuilder + + @impl true + def mount(_params, _session, socket) do + socket = + assign(socket, + hydration_times: IndexMetricsTracker.processor_durations(HydrationProducerSource), + transformation_times: + IndexMetricsTracker.processor_durations(TransformationProducerSource), + indexing_times: IndexMetricsTracker.processor_durations(IndexingProducerSource) + ) + + {:ok, socket, temporary_assigns: [item_count: nil]} + end + + @impl true + def menu_link(_, _) do + {:ok, "Index Metrics"} + end + + defp hydration_times(_params, _node) do + hydration_times = + IndexMetricsTracker.processor_durations(HydrationProducerSource) + |> Enum.map(&Map.from_struct/1) + + {hydration_times, length(hydration_times)} + end + + defp transformation_times(_params, _node) do + transformation_times = + IndexMetricsTracker.processor_durations(TransformationProducerSource) + |> Enum.map(&Map.from_struct/1) + + {transformation_times, length(transformation_times)} + end + + defp indexing_times(_params, _node) do + indexing_times = + IndexMetricsTracker.processor_durations(IndexingProducerSource) + |> Enum.map(&Map.from_struct/1) + + {indexing_times, length(indexing_times)} + end + + @impl true + def render(assigns) do + ~H""" + <.live_table + id="hydration-table" + dom_id="hydration-table" + page={@page} + title="Hydration Metric Times (1 hour .. 2 days)" + row_fetcher={&hydration_times/2} + rows_name="metrics" + > + <:col field={:updated_at} sortable={:desc} /> + <:col :let={record} field={:duration} header="Duration (hh:mm:ss)"> + <%= to_hh_mm_ss(record.duration) %> + + <:col field={:records_acked} header="Record Count" /> + <:col :let={record} field={:per_second} header="Records per Second"> + <%= per_second(record) %> + + + <.live_table + id="transformation-table" + dom_id="transformation-table" + page={@page} + title="Transformation Metric Times (30 minutes .. 2 hours)" + row_fetcher={&transformation_times/2} + rows_name="metrics" + > + <:col field={:updated_at} sortable={:desc} /> + <:col :let={record} field={:duration} header="Duration (hh:mm:ss)"> + <%= to_hh_mm_ss(record.duration) %> + + <:col field={:records_acked} header="Record Count" /> + <:col :let={record} field={:per_second} header="Records per Second"> + <%= per_second(record) %> + + + <.live_table + id="indexing-table" + dom_id="indexing-table" + page={@page} + title="Indexing Metric Times (10 minutes .. 1 hour)" + row_fetcher={&indexing_times/2} + rows_name="metrics" + > + <:col field={:updated_at} sortable={:desc} /> + <:col :let={record} field={:duration} header="Duration (hh:mm:ss)"> + <%= to_hh_mm_ss(record.duration) %> + + <:col field={:records_acked} header="Record Count" /> + <:col :let={record} field={:per_second} header="Records per Second"> + <%= per_second(record) %> + + + """ + end + + defp per_second(%{duration: 0, records_acked: records_acked}) do + records_acked + end + + defp per_second(%{duration: duration, records_acked: records_acked}) do + records_acked / duration + end + + # Pulled from + # https://nickjanetakis.com/blog/formatting-seconds-into-hh-mm-ss-with-elixir-and-python + # and modified to be consistently hh:mm:ss + defp to_hh_mm_ss(0), do: "00:00:00" + + defp to_hh_mm_ss(seconds) do + units = [3600, 60, 1] + # Returns a list of how many hours, minutes, and seconds there are, reducing + # the total seconds by that amount if it's greater than 1. + t = + Enum.map_reduce(units, seconds, fn unit, val -> {div(val, unit), rem(val, unit)} end) + |> elem(0) + + Enum.map_join(t, ":", fn x -> x |> Integer.to_string() |> String.pad_leading(2, "0") end) + end +end diff --git a/lib/dpul_collections_web/router.ex b/lib/dpul_collections_web/router.ex index 3bcab5e4..8ddf3cbf 100644 --- a/lib/dpul_collections_web/router.ex +++ b/lib/dpul_collections_web/router.ex @@ -50,7 +50,8 @@ defmodule DpulCollectionsWeb.Router do live_dashboard "/dashboard", metrics: DpulCollectionsWeb.Telemetry, additional_pages: [ - broadway: BroadwayDashboard + broadway: BroadwayDashboard, + index_metrics: DpulCollectionsWeb.IndexingPipeline.DashboardPage ] forward "/mailbox", Plug.Swoosh.MailboxPreview diff --git a/priv/repo/migrations/20241120163247_create_index_metrics.exs b/priv/repo/migrations/20241120163247_create_index_metrics.exs new file mode 100644 index 00000000..9473be67 --- /dev/null +++ b/priv/repo/migrations/20241120163247_create_index_metrics.exs @@ -0,0 +1,17 @@ +defmodule DpulCollections.Repo.Migrations.CreateIndexMetrics do + use Ecto.Migration + + def change do + create table(:index_metrics) do + add :type, :string + add :measurement_type, :string + add :duration, :integer + add :records_acked, :integer + + timestamps(type: :utc_datetime_usec) + end + + create index(:index_metrics, [:type]) + create index(:index_metrics, [:measurement_type]) + end +end diff --git a/test/dpul_collections/index_metrics_tracker_test.exs b/test/dpul_collections/index_metrics_tracker_test.exs new file mode 100644 index 00000000..73297e2f --- /dev/null +++ b/test/dpul_collections/index_metrics_tracker_test.exs @@ -0,0 +1,73 @@ +defmodule DpulCollections.IndexMetricsTrackerTest do + alias DpulCollections.IndexingPipeline.IndexMetric + alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource + alias DpulCollections.IndexMetricsTracker + alias Phoenix.ActionClauseError + use DpulCollections.DataCase + + describe "processor_durations/1" do + setup do + IndexMetricsTracker.reset() + :ok + end + + test "registers index times" do + # Act + # Send an ack done with acked_count 1, before anything - this should be + # ignored + :telemetry.execute( + [:database_producer, :ack, :done], + %{}, + %{ + acked_count: 1, + unacked_count: 0, + processor_marker_key: HydrationProducerSource.processor_marker_key() + } + ) + + IndexMetricsTracker.register_fresh_start(HydrationProducerSource) + # Send an ack done with acked_count 1 + :telemetry.execute( + [:database_producer, :ack, :done], + %{}, + %{ + acked_count: 1, + unacked_count: 0, + processor_marker_key: HydrationProducerSource.processor_marker_key() + } + ) + + IndexMetricsTracker.register_polling_started(HydrationProducerSource) + # Send an ack done with unacked_count 1, this tracks ack but doesn't + # finish. + :telemetry.execute( + [:database_producer, :ack, :done], + %{}, + %{ + acked_count: 1, + unacked_count: 1, + processor_marker_key: HydrationProducerSource.processor_marker_key() + } + ) + + # Send an ack done with unacked_count 0, this triggers an index time + # create. + :telemetry.execute( + [:database_producer, :ack, :done], + %{}, + %{ + acked_count: 1, + unacked_count: 0, + processor_marker_key: HydrationProducerSource.processor_marker_key() + } + ) + + [metric = %IndexMetric{}] = IndexMetricsTracker.processor_durations(HydrationProducerSource) + + # Assert + # This is 0 because it takes less than a second to run. + assert metric.duration == 0 + assert metric.records_acked == 3 + end + end +end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index a6134ce0..68b72741 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -1,9 +1,10 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do + alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource use DpulCollections.DataCase alias DpulCollections.Repo alias DpulCollections.IndexingPipeline.Figgy - alias DpulCollections.{IndexingPipeline, Solr} + alias DpulCollections.{IndexingPipeline, Solr, IndexMetricsTracker} import SolrTestSupport setup do @@ -50,6 +51,18 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do {Figgy.HydrationConsumer, cache_version: cache_version, batch_size: 50} ] + test_pid = self() + + :ok = + :telemetry.attach( + "hydration-full-run", + [:dpulc, :indexing_pipeline, :hydrator, :time_to_poll], + fn _, measurements, _, _ -> + send(test_pid, {:hydrator_time_to_poll_hit, measurements}) + end, + nil + ) + Supervisor.start_link(children, strategy: :one_for_one, name: DpulCollections.TestSupervisor) task = @@ -146,6 +159,11 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do assert hydration_entry.cache_order != hydration_entry_again.cache_order Supervisor.stop(DpulCollections.TestSupervisor, :normal) + + # Ensure metrics are being sent. + assert_receive {:hydrator_time_to_poll_hit, %{duration: _}} + [hydration_metric_1 | _] = IndexMetricsTracker.processor_durations(HydrationProducerSource) + assert hydration_metric_1.duration > 0 end test "indexes expected fields" do diff --git a/test/dpul_collections_web/indexing_pipeline/dashboard_page_test.exs b/test/dpul_collections_web/indexing_pipeline/dashboard_page_test.exs new file mode 100644 index 00000000..a798649c --- /dev/null +++ b/test/dpul_collections_web/indexing_pipeline/dashboard_page_test.exs @@ -0,0 +1,48 @@ +defmodule DpuLCollectionsWeb.IndexingPipeline.DashboardPageTest do + alias DpulCollections.IndexingPipeline.Figgy.IndexingProducerSource + alias DpulCollections.IndexingPipeline.Figgy.TransformationProducerSource + alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource + alias DpulCollections.IndexingPipeline.Metrics + use DpulCollectionsWeb.ConnCase + import Phoenix.LiveViewTest + @endpoint DpulCollectionsWeb.Endpoint + + test "GET /dev/dashboard/index_metrics", %{conn: conn} do + Metrics.create_index_metric(%{ + type: HydrationProducerSource.processor_marker_key(), + measurement_type: "full_index", + duration: 0, + records_acked: 20 + }) + + Metrics.create_index_metric(%{ + type: TransformationProducerSource.processor_marker_key(), + measurement_type: "full_index", + duration: 10, + records_acked: 20 + }) + + Metrics.create_index_metric(%{ + type: IndexingProducerSource.processor_marker_key(), + measurement_type: "full_index", + duration: 200, + records_acked: 60 + }) + + {:ok, view, html} = + conn + |> put_req_header("authorization", "Basic " <> Base.encode64("admin:test")) + |> get(~p"/dev/dashboard/index_metrics") + |> live + + assert html =~ "Hydration Metric Times" + assert html =~ "Transformation Metric Times" + assert html =~ "Indexing Metric Times" + assert has_element?(view, "td.hydration-table-per_second", "20") + assert has_element?(view, "td.hydration-table-duration", "00:00:00") + assert has_element?(view, "td.transformation-table-per_second", "2") + assert has_element?(view, "td.transformation-table-duration", "00:00:10") + assert has_element?(view, "td.indexing-table-per_second", "0.3") + assert has_element?(view, "td.indexing-table-duration", "00:03:20") + end +end