From e10796bb90a5bc7444f2d85b8475573d17778742 Mon Sep 17 00:00:00 2001 From: Anna Headley <845363+hackartisan@users.noreply.github.com> Date: Tue, 19 Nov 2024 15:25:16 -0500 Subject: [PATCH 1/3] Compare indexing progress using the figgy timestamp refs #104 --- docs/admin/clean_index.md | 2 +- lib/dpul_collections/indexing_pipeline.ex | 4 ++++ lib/dpul_collections/indexing_pipeline/coherence.ex | 13 +++++++++++-- .../indexing_pipeline/coherence_test.exs | 9 ++++++--- 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/docs/admin/clean_index.md b/docs/admin/clean_index.md index 8e459192..f5716fb6 100644 --- a/docs/admin/clean_index.md +++ b/docs/admin/clean_index.md @@ -30,4 +30,4 @@ When you're ready to use the new index, connect to a iex console on the indexer ## Step 3: stop and clean up the old index -When you're ready to delete the old index, remove its configuration from the `index_cache_collections` variable and deploy. Then you can connect to the indexer node and run `DpulCollections.Solr.delete_collection/1` with the old collection name. To delete all the database entries for that cache version use `IndexingPipeline.delete_cache_version/1`. +When you're ready to delete the old index, remove its configuration from the `index_cache_collections` variable and deploy. Then you can connect to the indexer node and run `DpulCollections.Solr.delete_collection/1` with the old collection name. To delete all the database entries for that cache version use `DpulCollections.IndexingPipeline.delete_cache_version/1`. diff --git a/lib/dpul_collections/indexing_pipeline.ex b/lib/dpul_collections/indexing_pipeline.ex index f928152e..e70c5338 100644 --- a/lib/dpul_collections/indexing_pipeline.ex +++ b/lib/dpul_collections/indexing_pipeline.ex @@ -38,6 +38,10 @@ defmodule DpulCollections.IndexingPipeline do """ def get_hydration_cache_entry!(id), do: Repo.get!(Figgy.HydrationCacheEntry, id) + def get_hydration_cache_entry!(record_id, cache_version) do + Repo.get_by(Figgy.HydrationCacheEntry, record_id: record_id, cache_version: cache_version) + end + @doc """ Deletes a hydration_cache_entry. diff --git a/lib/dpul_collections/indexing_pipeline/coherence.ex b/lib/dpul_collections/indexing_pipeline/coherence.ex index d5450c53..60e7a874 100644 --- a/lib/dpul_collections/indexing_pipeline/coherence.ex +++ b/lib/dpul_collections/indexing_pipeline/coherence.ex @@ -10,8 +10,17 @@ defmodule DpulCollections.IndexingPipeline.Coherence do IndexingPipeline.get_processor_marker!("figgy_indexer", pipeline[:cache_version]) end) - version_sorted = Enum.sort_by(pms, & &1.cache_version) - date_sorted = Enum.sort_by(pms, & &1.cache_location, DateTime) + # the cache_location in a processor marker isn't consistent between + # cache versions -- it's just a timestamp. So we pull the hydration + # cache entries and compare them based on the figgy timestamp itself + hydration_entries = + pms + |> Enum.map(fn marker -> + IndexingPipeline.get_hydration_cache_entry!(marker.cache_record_id, marker.cache_version) + end) + + version_sorted = Enum.sort_by(hydration_entries, & &1.cache_version) + date_sorted = Enum.sort_by(hydration_entries, & &1.source_cache_order, DateTime) version_sorted == date_sorted end diff --git a/test/dpul_collections/indexing_pipeline/coherence_test.exs b/test/dpul_collections/indexing_pipeline/coherence_test.exs index ce5b8ee6..ac4467bf 100644 --- a/test/dpul_collections/indexing_pipeline/coherence_test.exs +++ b/test/dpul_collections/indexing_pipeline/coherence_test.exs @@ -21,7 +21,8 @@ defmodule DpulCollections.IndexingPipeline.CoherenceTest do end test "index_parity?/0 is false when the old index is fresher than the new index" do - {marker1, marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers() + {marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1) + FiggyTestFixtures.hydration_cache_markers(2) IndexingPipeline.write_processor_marker(%{ type: "figgy_indexer", @@ -41,7 +42,8 @@ defmodule DpulCollections.IndexingPipeline.CoherenceTest do end test "index_parity?/0 is true when the new index is fresher than the old index" do - {marker1, marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers() + {marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1) + FiggyTestFixtures.hydration_cache_markers(2) IndexingPipeline.write_processor_marker(%{ type: "figgy_indexer", @@ -61,7 +63,8 @@ defmodule DpulCollections.IndexingPipeline.CoherenceTest do end test "index_parity?/0 is true when the new index and the old index have equal freshness" do - {marker1, _marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers() + {marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1) + FiggyTestFixtures.hydration_cache_markers(2) IndexingPipeline.write_processor_marker(%{ type: "figgy_indexer", From 837735fc2af1e02b4682a63e984644d32f02227e Mon Sep 17 00:00:00 2001 From: Anna Headley <845363+hackartisan@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:13:18 -0500 Subject: [PATCH 2/3] Split up index_parity logic for clarity --- .../indexing_pipeline/coherence.ex | 50 ++++++++++++------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/coherence.ex b/lib/dpul_collections/indexing_pipeline/coherence.ex index 60e7a874..3b23424e 100644 --- a/lib/dpul_collections/indexing_pipeline/coherence.ex +++ b/lib/dpul_collections/indexing_pipeline/coherence.ex @@ -4,24 +4,10 @@ defmodule DpulCollections.IndexingPipeline.Coherence do @spec index_parity?() :: boolean() def index_parity?() do - pms = + index_progress_summaries = Application.fetch_env!(:dpul_collections, DpulCollections.IndexingPipeline) - |> Enum.map(fn pipeline -> - IndexingPipeline.get_processor_marker!("figgy_indexer", pipeline[:cache_version]) - end) - - # the cache_location in a processor marker isn't consistent between - # cache versions -- it's just a timestamp. So we pull the hydration - # cache entries and compare them based on the figgy timestamp itself - hydration_entries = - pms - |> Enum.map(fn marker -> - IndexingPipeline.get_hydration_cache_entry!(marker.cache_record_id, marker.cache_version) - end) - - version_sorted = Enum.sort_by(hydration_entries, & &1.cache_version) - date_sorted = Enum.sort_by(hydration_entries, & &1.source_cache_order, DateTime) - version_sorted == date_sorted + |> Enum.map(&index_progress_summary/1) + |> is_new_cache_caught_up?() end @spec document_count_report() :: map() @@ -35,4 +21,34 @@ defmodule DpulCollections.IndexingPipeline.Coherence do } end) end + + # Check the processor marker for the most recently indexed record. + # Get the figgy timestamp for that record from its hydration cache entry. + defp index_progress_summary(cache_version: cache_version, write_collection: _) do + marker = IndexingPipeline.get_processor_marker!("figgy_indexer", cache_version) + + hydration_entry = + IndexingPipeline.get_hydration_cache_entry!( + marker.cache_record_id, + cache_version + ) + + %{figgy_timestamp: hydration_entry.source_cache_order, cache_version: cache_version} + end + + # if both indexes have hit the same figgy timestamp, we're caught up + defp is_new_cache_caught_up?([ + %{figgy_timestamp: timestamps_are_equal}, + %{figgy_timestamp: timestamps_are_equal} + ]) do + true + end + + # otherwise, if the more recent index has passed the older index, we're caught + # up + defp is_new_cache_caught_up?(index_progress_summaries) do + version_sorted = Enum.sort_by(index_progress_summaries, & &1.cache_version) + date_sorted = Enum.sort_by(index_progress_summaries, & &1.figgy_timestamp, DateTime) + version_sorted == date_sorted + end end From 756e5a9899f8b3467fb35dd30a41b2bf0e3c5f56 Mon Sep 17 00:00:00 2001 From: Anna Headley <845363+hackartisan@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:15:32 -0500 Subject: [PATCH 3/3] Remove extraneous variable --- lib/dpul_collections/indexing_pipeline/coherence.ex | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/coherence.ex b/lib/dpul_collections/indexing_pipeline/coherence.ex index 3b23424e..f3a146a4 100644 --- a/lib/dpul_collections/indexing_pipeline/coherence.ex +++ b/lib/dpul_collections/indexing_pipeline/coherence.ex @@ -4,10 +4,9 @@ defmodule DpulCollections.IndexingPipeline.Coherence do @spec index_parity?() :: boolean() def index_parity?() do - index_progress_summaries = - Application.fetch_env!(:dpul_collections, DpulCollections.IndexingPipeline) - |> Enum.map(&index_progress_summary/1) - |> is_new_cache_caught_up?() + Application.fetch_env!(:dpul_collections, DpulCollections.IndexingPipeline) + |> Enum.map(&index_progress_summary/1) + |> is_new_cache_caught_up?() end @spec document_count_report() :: map()