Skip to content

Commit

Permalink
fix: reindex-algolia - include UUIDs from ALL related objects
Browse files Browse the repository at this point in the history
Prior to this change we essentially looped over courses and appended
their UUIDs to related parent objects (programs and pathways), but now
we have to do this inverse: loop over higher level objects and absorb
UUIDs from all child objects.  This is a fundamental change to the
access pattern that would normally have no impact to the output since it
just changes the order in which UUIDs are accumulated; in the old world
where we accumulate all the UUIDs into a giant 16GB+ dictionary object,
the order they are added doesn't matter because they'd all get there by
the end.  However, now without being able to accumulate all UUIDs
in-memory, we have to essentially change the UUID access pattern to be
insensitive to constantly clearing the memory between batches.
  • Loading branch information
pwnage101 committed Jul 10, 2023
1 parent 620f1b4 commit 54a8bc9
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 115 deletions.
211 changes: 139 additions & 72 deletions enterprise_catalog/apps/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from enterprise_catalog.apps.catalog.constants import (
COURSE,
COURSE_RUN,
DISCOVERY_COURSE_KEY_BATCH_SIZE,
DISCOVERY_PROGRAM_KEY_BATCH_SIZE,
LEARNER_PATHWAY,
Expand Down Expand Up @@ -500,20 +501,37 @@ def index_enterprise_catalog_in_algolia_task(self, force=False, dry_run=False):
raise exep


def get_pathways_by_associated_content():
""" Prefetch course id and program id to pathway id mapping. """
pathway_membership_by_course_key = defaultdict(set)
pathway_membership_by_program_key = defaultdict(set)
pathways = ContentMetadata.objects.filter(content_type=LEARNER_PATHWAY).prefetch_related(
def _precalculate_content_mappings():
"""
Precalculate various mappings between different types of related content.
Returns:
2-tuple(dict):
- First element: Mapping of program content_key to list of course run and course ContentMetadata objects.
- Second element: Mapping of learner pathway content_key to list of program and course ContentMetadata
objects.
"""
program_to_courses_mapping = defaultdict(set)
pathway_to_programs_courses_mapping = defaultdict(set)
courses_programs = ContentMetadata.objects.filter(
content_type__in=[COURSE, PROGRAM],
).prefetch_related(
'associated_content_metadata'
)
for pathway in pathways:
for associated_content in pathway.associated_content_metadata.all():
if associated_content.content_type == COURSE:
pathway_membership_by_course_key[associated_content.content_key].add(pathway)
elif associated_content.content_type == PROGRAM:
pathway_membership_by_program_key[associated_content.content_key].add(pathway)
return pathway_membership_by_course_key, pathway_membership_by_program_key
for metadata in courses_programs:
if metadata.content_type == COURSE:
for associated_content in metadata.associated_content_metadata.all():
if associated_content.content_type == PROGRAM:
program_to_courses_mapping[associated_content.content_key].add(metadata)
elif associated_content.content_type == LEARNER_PATHWAY:
pathway_to_programs_courses_mapping[associated_content.content_key].add(metadata)
# This else block represents metadata.content_type == PROGRAM
else:
for associated_content in metadata.associated_content_metadata.all():
if associated_content.content_type == LEARNER_PATHWAY:
pathway_to_programs_courses_mapping[associated_content.content_key].add(metadata)

return program_to_courses_mapping, pathway_to_programs_courses_mapping


def add_metadata_to_algolia_objects(
Expand Down Expand Up @@ -579,8 +597,8 @@ def add_metadata_to_algolia_objects(
def _get_algolia_products_for_batch(
batch_num,
content_keys_batch,
course_to_pathway_mapping,
program_to_pathway_mapping,
program_to_courses_mapping,
pathway_to_programs_courses_mapping,
context_accumulator,
):
"""
Expand All @@ -598,75 +616,119 @@ def _get_algolia_products_for_batch(
catalog_uuids_by_key = defaultdict(set)
customer_uuids_by_key = defaultdict(set)
catalog_queries_by_key = defaultdict(set)
# Retrieve ContentMetadata records for courses, course runs, programs and learner pathways that match the specified
# content_keys in the content_key, parent_content_key or associated content metadata's content_key.
query = (

# Create a shared convenience queryset to prefetch catalogs for all metadata lookups below.
all_catalog_queries = CatalogQuery.objects.prefetch_related('enterprise_catalogs')

# Retrieve ContentMetadata records for:
# * Course runs, courses, programs and learner pathways that are directly requested, and
# * Courses and programs indirectly related to something directly requested.
# - e.g. A course that was not directly requested, but is a member of a program which was requested.
# - e.g. A program that was not directly requested, but is a member of a pathway which was requested.
content_metadata_no_courseruns = ContentMetadata.objects.filter(
# All content (courses, course runs, programs, pathways) directly requested.
Q(content_key__in=content_keys_batch)
| Q(parent_content_key__in=content_keys_batch)
# All course runs, courses, or programs contained in programs or pathways requested.
| Q(
content_type__in=[COURSE_RUN, COURSE, PROGRAM],
associated_content_metadata__content_type__in=[PROGRAM, LEARNER_PATHWAY],
associated_content_metadata__content_key__in=content_keys_batch,
content_type__in=[PROGRAM, LEARNER_PATHWAY]
)
# All programs, pathways to which any requested course belongs.
| Q(
content_type__in=[PROGRAM, LEARNER_PATHWAY],
associated_content_metadata__content_type__in=[COURSE, PROGRAM],
associated_content_metadata__content_key__in=content_keys_batch,
)
).prefetch_related(
Prefetch('catalog_queries', queryset=all_catalog_queries),
)

catalog_queries = CatalogQuery.objects.prefetch_related(
'enterprise_catalogs',
)
associated_programs_query = ContentMetadata.objects.filter(content_type__in=[PROGRAM, LEARNER_PATHWAY])
content_metadata = ContentMetadata.objects.filter(query).prefetch_related(
Prefetch('catalog_queries', queryset=catalog_queries),
Prefetch('associated_content_metadata', queryset=associated_programs_query, to_attr='associated_programs'),
# Retrieve ContentMetadata records for any course run which is part of any course found in the previous query.
course_content_keys = content_metadata_no_courseruns.filter(
content_type=COURSE,
).values_list('content_key', flat=True)
content_metadata_courseruns = ContentMetadata.objects.filter(
parent_content_key__in=course_content_keys
).prefetch_related(
Prefetch('catalog_queries', queryset=all_catalog_queries),
)

# Retrieve the associated enterprise_catalog_uuids and enterprise_customer_uuids for each selected ContentMetadata
# record, storing them for later retrieval.
for metadata in content_metadata:
# Combine both querysets to represent all the ContentMetadata needed to process this batch.
#
# DEFICIENCY: This final set does not guarantee inclusion of courses (or course runs) indirectly related to a
# requested to a pathway via an association chain of course->program->pathway. This maybe should be added! When it
# is added, a related change must be made in the third pass (below) to chain
# `pathway_to_programs_courses_mapping` and `program_to_courses_mapping` to actually collect the UUIDs.
content_metadata_to_process = content_metadata_no_courseruns.union(content_metadata_courseruns)

# First pass over the batch of content. The goal for this pass is to collect all the UUIDs directly associated with
# each content. This DOES NOT capture any UUIDs indirectly related to programs or pathways via associated courses
# or programs.
for metadata in content_metadata_to_process:
if metadata.content_type in (COURSE, PROGRAM, LEARNER_PATHWAY):
content_key = metadata.content_key
else:
# Course runs should contribute their UUIDs to the parent course.
content_key = metadata.parent_content_key

associated_queries = metadata.catalog_queries.all()
enterprise_catalog_uuids = set()
enterprise_customer_uuids = set()
enterprise_catalog_queries = set()
for query in associated_queries:
enterprise_catalog_queries.add((str(query.uuid), query.title))
associated_catalogs = query.enterprise_catalogs.all()
associated_catalog_queries = metadata.catalog_queries.all()
for catalog_query in associated_catalog_queries:
catalog_queries_by_key[content_key].add((str(catalog_query.uuid), catalog_query.title))
# This line is possible thanks to `all_catalog_queries` with the prefectch_related() above.
associated_catalogs = catalog_query.enterprise_catalogs.all()
for catalog in associated_catalogs:
enterprise_catalog_uuids.add(str(catalog.uuid))
enterprise_customer_uuids.add(str(catalog.enterprise_uuid))

# add to any existing enterprise catalog uuids, enterprise customer uuids or catalog query uuids
catalog_uuids_by_key[content_key].update(enterprise_catalog_uuids)
customer_uuids_by_key[content_key].update(enterprise_customer_uuids)
catalog_queries_by_key[content_key].update(enterprise_catalog_queries)

if metadata.content_type == COURSE:
for program_metadata in metadata.associated_programs:
content_key = program_metadata.content_key
catalog_uuids_by_key[content_key].update(enterprise_catalog_uuids)
customer_uuids_by_key[content_key].update(enterprise_customer_uuids)
catalog_queries_by_key[content_key].update(enterprise_catalog_queries)

for pathway in course_to_pathway_mapping[metadata.content_key]:
catalog_queries_by_key[pathway.content_key].update(enterprise_catalog_queries)
catalog_uuids_by_key[pathway.content_key].update(enterprise_catalog_uuids)
customer_uuids_by_key[pathway.content_key].update(enterprise_customer_uuids)

if metadata.content_type == PROGRAM:
for pathway in program_to_pathway_mapping[content_key]:
catalog_queries_by_key[pathway.content_key].update(enterprise_catalog_queries)
catalog_uuids_by_key[pathway.content_key].update(enterprise_catalog_uuids)
customer_uuids_by_key[pathway.content_key].update(enterprise_customer_uuids)
catalog_uuids_by_key[content_key].add(str(catalog.uuid))
customer_uuids_by_key[content_key].add(str(catalog.enterprise_uuid))

# Second pass. This time the goal is to capture indirect relationships on programs:
# * For each program:
# - Absorb all UUIDs associated with every associated course.
for metadata in content_metadata_to_process:
if metadata.content_type != PROGRAM:
continue
program_content_key = metadata.content_key
for metadata in program_to_courses_mapping[program_content_key]:
catalog_queries_by_key[program_content_key].update(catalog_queries_by_key[metadata.content_key])
catalog_uuids_by_key[program_content_key].update(catalog_uuids_by_key[metadata.content_key])
customer_uuids_by_key[program_content_key].update(customer_uuids_by_key[metadata.content_key])

# Third pass. This time the goal is to capture indirect relationships on pathways:
# * For each pathway:
# - Absorb all UUIDs associated with every associated course.
# - Absorb all UUIDs associated with every associated program.
for metadata in content_metadata_to_process:
if metadata.content_type != LEARNER_PATHWAY:
continue
pathway_content_key = metadata.content_key

for metadata in pathway_to_programs_courses_mapping[pathway_content_key]:
catalog_queries_by_key[pathway_content_key].update(catalog_queries_by_key[metadata.content_key])
catalog_uuids_by_key[pathway_content_key].update(catalog_uuids_by_key[metadata.content_key])
customer_uuids_by_key[pathway_content_key].update(customer_uuids_by_key[metadata.content_key])

# Extra disabled logic to additionally absorb UUIDs from courses linked to this pathway indirectly via a
# program (chain of association is course -> program -> pathway). This doesn't work because
# content_metadata_to_process queryset for this batch has insuficcient records to support this feature.
#
# if metadata.content_type == PROGRAM:
# for course_metadata in program_to_courses_mapping[metadata.content_key]:
# catalog_queries_by_key[pathway_content_key].update(
# catalog_queries_by_key[course_metadata.content_key]
# )
# catalog_uuids_by_key[pathway_content_key].update(
# catalog_uuids_by_key[course_metadata.content_key]
# )
# customer_uuids_by_key[pathway_content_key].update(
# customer_uuids_by_key[course_metadata.content_key]
# )

# iterate over courses, programs and pathways and add their metadata to the list of objects to be indexed
filtered_content_metadata = content_metadata.filter(
Q(content_type=COURSE)
| Q(content_type=PROGRAM)
| Q(content_type=LEARNER_PATHWAY)
content_metadata_to_index = (
metadata for metadata in content_metadata_to_process
if metadata.content_type in [COURSE, PROGRAM, LEARNER_PATHWAY]
)
for metadata in filtered_content_metadata:
num_content_metadata_indexed = 0
for metadata in content_metadata_to_index:
# Check if we've indexed the course recently
# (programs/pathways are indexed every time regardless of last indexing)
if _was_recently_indexed(metadata.content_key) and metadata.content_type not in [PROGRAM, LEARNER_PATHWAY]:
Expand All @@ -684,6 +746,8 @@ def _get_algolia_products_for_batch(
catalog_queries_by_key[metadata.content_key],
)

num_content_metadata_indexed += 1

# In case there are multiple CourseMetadata records that share the exact same content_uuid (which would cause an
# algolia objectID collision), do not send more than one. Note that selection of duplicate content is
# non-deterministic because we do not use order_by() on the queryset.
Expand All @@ -703,8 +767,8 @@ def _get_algolia_products_for_batch(
logger.info(
f'[ENTERPRISE_CATALOG_ALGOLIA_REINDEX] batch#{batch_num}: '
f'{len(content_keys_batch)} content keys, '
f'{content_metadata.count()} content metadata found, '
f'{filtered_content_metadata.count()} content metadata processed, '
f'{content_metadata_to_process.count()} content metadata found, '
f'{num_content_metadata_indexed} content metadata indexed, '
f'{len(algolia_products_by_object_id)} generated algolia products kept, '
f'{duplicate_algolia_records_discarded} generated algolia products discarded.'
)
Expand Down Expand Up @@ -733,7 +797,10 @@ def _index_content_keys_in_algolia(content_keys, algolia_client):
f'[ENTERPRISE_CATALOG_ALGOLIA_REINDEX] There are {len(content_keys)} total content keys to include in the'
f' Algolia index.'
)
course_to_pathway_mapping, program_to_pathway_mapping = get_pathways_by_associated_content()
(
program_to_courses_mapping,
pathway_to_programs_courses_mapping,
) = _precalculate_content_mappings()
context_accumulator = {
'total_algolia_products_count': 0,
'discarded_algolia_object_ids': defaultdict(int),
Expand All @@ -744,8 +811,8 @@ def _index_content_keys_in_algolia(content_keys, algolia_client):
_get_algolia_products_for_batch(
batch_num,
content_keys_batch,
course_to_pathway_mapping,
program_to_pathway_mapping,
program_to_courses_mapping,
pathway_to_programs_courses_mapping,
context_accumulator,
)
for batch_num, content_keys_batch
Expand Down
Loading

0 comments on commit 54a8bc9

Please sign in to comment.