Skip to content

Commit

Permalink
Merge pull request #250 from pulibrary/104-figgy-timestamp
Browse files Browse the repository at this point in the history
Compare indexing progress using the figgy timestamp
  • Loading branch information
tpendragon authored Nov 19, 2024
2 parents 6f1609b + 756e5a9 commit 76b5456
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/admin/clean_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ When you're ready to use the new index, connect to a iex console on the indexer

## Step 3: stop and clean up the old index

When you're ready to delete the old index, remove its configuration from the `index_cache_collections` variable and deploy. Then you can connect to the indexer node and run `DpulCollections.Solr.delete_collection/1` with the old collection name. To delete all the database entries for that cache version use `IndexingPipeline.delete_cache_version/1`.
When you're ready to delete the old index, remove its configuration from the `index_cache_collections` variable and deploy. Then you can connect to the indexer node and run `DpulCollections.Solr.delete_collection/1` with the old collection name. To delete all the database entries for that cache version use `DpulCollections.IndexingPipeline.delete_cache_version/1`.
4 changes: 4 additions & 0 deletions lib/dpul_collections/indexing_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ defmodule DpulCollections.IndexingPipeline do
"""
def get_hydration_cache_entry!(id), do: Repo.get!(Figgy.HydrationCacheEntry, id)

def get_hydration_cache_entry!(record_id, cache_version) do
Repo.get_by(Figgy.HydrationCacheEntry, record_id: record_id, cache_version: cache_version)
end

@doc """
Deletes a hydration_cache_entry.
Expand Down
42 changes: 33 additions & 9 deletions lib/dpul_collections/indexing_pipeline/coherence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@ defmodule DpulCollections.IndexingPipeline.Coherence do

@spec index_parity?() :: boolean()
def index_parity?() do
pms =
Application.fetch_env!(:dpul_collections, DpulCollections.IndexingPipeline)
|> Enum.map(fn pipeline ->
IndexingPipeline.get_processor_marker!("figgy_indexer", pipeline[:cache_version])
end)

version_sorted = Enum.sort_by(pms, & &1.cache_version)
date_sorted = Enum.sort_by(pms, & &1.cache_location, DateTime)
version_sorted == date_sorted
Application.fetch_env!(:dpul_collections, DpulCollections.IndexingPipeline)
|> Enum.map(&index_progress_summary/1)
|> is_new_cache_caught_up?()
end

@spec document_count_report() :: map()
Expand All @@ -26,4 +20,34 @@ defmodule DpulCollections.IndexingPipeline.Coherence do
}
end)
end

# Check the processor marker for the most recently indexed record.
# Get the figgy timestamp for that record from its hydration cache entry.
defp index_progress_summary(cache_version: cache_version, write_collection: _) do
marker = IndexingPipeline.get_processor_marker!("figgy_indexer", cache_version)

hydration_entry =
IndexingPipeline.get_hydration_cache_entry!(
marker.cache_record_id,
cache_version
)

%{figgy_timestamp: hydration_entry.source_cache_order, cache_version: cache_version}
end

# if both indexes have hit the same figgy timestamp, we're caught up
defp is_new_cache_caught_up?([
%{figgy_timestamp: timestamps_are_equal},
%{figgy_timestamp: timestamps_are_equal}
]) do
true
end

# otherwise, if the more recent index has passed the older index, we're caught
# up
defp is_new_cache_caught_up?(index_progress_summaries) do
version_sorted = Enum.sort_by(index_progress_summaries, & &1.cache_version)
date_sorted = Enum.sort_by(index_progress_summaries, & &1.figgy_timestamp, DateTime)
version_sorted == date_sorted
end
end
9 changes: 6 additions & 3 deletions test/dpul_collections/indexing_pipeline/coherence_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ defmodule DpulCollections.IndexingPipeline.CoherenceTest do
end

test "index_parity?/0 is false when the old index is fresher than the new index" do
{marker1, marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers()
{marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1)
FiggyTestFixtures.hydration_cache_markers(2)

IndexingPipeline.write_processor_marker(%{
type: "figgy_indexer",
Expand All @@ -41,7 +42,8 @@ defmodule DpulCollections.IndexingPipeline.CoherenceTest do
end

test "index_parity?/0 is true when the new index is fresher than the old index" do
{marker1, marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers()
{marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1)
FiggyTestFixtures.hydration_cache_markers(2)

IndexingPipeline.write_processor_marker(%{
type: "figgy_indexer",
Expand All @@ -61,7 +63,8 @@ defmodule DpulCollections.IndexingPipeline.CoherenceTest do
end

test "index_parity?/0 is true when the new index and the old index have equal freshness" do
{marker1, _marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers()
{marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1)
FiggyTestFixtures.hydration_cache_markers(2)

IndexingPipeline.write_processor_marker(%{
type: "figgy_indexer",
Expand Down

0 comments on commit 76b5456

Please sign in to comment.