Compare indexing progress using the figgy timestamp #250

Merged 3 commits on Nov 19, 2024

2 changes: 1 addition & 1 deletion docs/admin/clean_index.md
@@ -30,4 +30,4 @@ When you're ready to use the new index, connect to an iex console on the indexer

## Step 3: stop and clean up the old index

When you're ready to delete the old index, remove its configuration from the `index_cache_collections` variable and deploy. Then you can connect to the indexer node and run `DpulCollections.Solr.delete_collection/1` with the old collection name. To delete all the database entries for that cache version use `IndexingPipeline.delete_cache_version/1`.
When you're ready to delete the old index, remove its configuration from the `index_cache_collections` variable and deploy. Then you can connect to the indexer node and run `DpulCollections.Solr.delete_collection/1` with the old collection name. To delete all the database entries for that cache version use `DpulCollections.IndexingPipeline.delete_cache_version/1`.
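The cleanup described in this doc boils down to two console calls. Below is a minimal iex sketch, assuming a hypothetical old collection name `"dpul-old"` and cache version `1`; substitute the real values for your deployment.

```elixir
# Sketch only: "dpul-old" and 1 are placeholder values, not taken from this PR.
iex> DpulCollections.Solr.delete_collection("dpul-old")
iex> DpulCollections.IndexingPipeline.delete_cache_version(1)
```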
4 changes: 4 additions & 0 deletions lib/dpul_collections/indexing_pipeline.ex
@@ -38,6 +38,10 @@ defmodule DpulCollections.IndexingPipeline do
"""
def get_hydration_cache_entry!(id), do: Repo.get!(Figgy.HydrationCacheEntry, id)

def get_hydration_cache_entry!(record_id, cache_version) do
Repo.get_by(Figgy.HydrationCacheEntry, record_id: record_id, cache_version: cache_version)
end

@doc """
Deletes a hydration_cache_entry.

42 changes: 33 additions & 9 deletions lib/dpul_collections/indexing_pipeline/coherence.ex
@@ -4,15 +4,9 @@ defmodule DpulCollections.IndexingPipeline.Coherence do

  @spec index_parity?() :: boolean()
  def index_parity?() do
    pms =
      Application.fetch_env!(:dpul_collections, DpulCollections.IndexingPipeline)
      |> Enum.map(fn pipeline ->
        IndexingPipeline.get_processor_marker!("figgy_indexer", pipeline[:cache_version])
      end)

    version_sorted = Enum.sort_by(pms, & &1.cache_version)
    date_sorted = Enum.sort_by(pms, & &1.cache_location, DateTime)
    version_sorted == date_sorted
    Application.fetch_env!(:dpul_collections, DpulCollections.IndexingPipeline)
    |> Enum.map(&index_progress_summary/1)
    |> is_new_cache_caught_up?()
  end

  @spec document_count_report() :: map()
@@ -26,4 +20,34 @@
      }
    end)
  end

  # Check the processor marker for the most recently indexed record.
  # Get the figgy timestamp for that record from its hydration cache entry.
  defp index_progress_summary(cache_version: cache_version, write_collection: _) do
    marker = IndexingPipeline.get_processor_marker!("figgy_indexer", cache_version)

    hydration_entry =
      IndexingPipeline.get_hydration_cache_entry!(
        marker.cache_record_id,
        cache_version
      )

    %{figgy_timestamp: hydration_entry.source_cache_order, cache_version: cache_version}
  end

  # if both indexes have hit the same figgy timestamp, we're caught up
  defp is_new_cache_caught_up?([
         %{figgy_timestamp: timestamps_are_equal},
         %{figgy_timestamp: timestamps_are_equal}
       ]) do
    true
  end

  # otherwise, if the more recent index has passed the older index, we're caught
  # up
  defp is_new_cache_caught_up?(index_progress_summaries) do
    version_sorted = Enum.sort_by(index_progress_summaries, & &1.cache_version)
    date_sorted = Enum.sort_by(index_progress_summaries, & &1.figgy_timestamp, DateTime)
    version_sorted == date_sorted
  end
end
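For intuition, here is a small illustrative sketch of the fallback comparison, using made-up cache versions and Figgy timestamps: because the newer cache version (2) has reached the later timestamp, sorting the summaries by `cache_version` and by `figgy_timestamp` produces the same order, so the check passes.

```elixir
# Made-up progress summaries: cache version 2 (the new index) has indexed a
# record with a later Figgy timestamp than cache version 1 (the old index).
summaries = [
  %{cache_version: 1, figgy_timestamp: ~U[2024-11-01 00:00:00Z]},
  %{cache_version: 2, figgy_timestamp: ~U[2024-11-15 00:00:00Z]}
]

# The same comparison the fallback clause performs: both sort orders agree,
# so the new cache is considered caught up.
Enum.sort_by(summaries, & &1.cache_version) ==
  Enum.sort_by(summaries, & &1.figgy_timestamp, DateTime)
# => true
```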
9 changes: 6 additions & 3 deletions test/dpul_collections/indexing_pipeline/coherence_test.exs
@@ -21,7 +21,8 @@ defmodule DpulCollections.IndexingPipeline.CoherenceTest do
  end

  test "index_parity?/0 is false when the old index is fresher than the new index" do
    {marker1, marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers()
    {marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1)
    FiggyTestFixtures.hydration_cache_markers(2)

    IndexingPipeline.write_processor_marker(%{
      type: "figgy_indexer",
@@ -41,7 +42,8 @@
  end

  test "index_parity?/0 is true when the new index is fresher than the old index" do
    {marker1, marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers()
    {marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1)
    FiggyTestFixtures.hydration_cache_markers(2)

    IndexingPipeline.write_processor_marker(%{
      type: "figgy_indexer",
@@ -61,7 +63,8 @@
  end

  test "index_parity?/0 is true when the new index and the old index have equal freshness" do
    {marker1, _marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers()
    {marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1)
    FiggyTestFixtures.hydration_cache_markers(2)

    IndexingPipeline.write_processor_marker(%{
      type: "figgy_indexer",