diff --git a/dandiapi/analytics/tasks/__init__.py b/dandiapi/analytics/tasks/__init__.py
index 1e1ba3d16..3314bc305 100644
--- a/dandiapi/analytics/tasks/__init__.py
+++ b/dandiapi/analytics/tasks/__init__.py
@@ -10,6 +10,7 @@
 from django.db.models.aggregates import Max
 from django.db.models.expressions import F
 from django.db.utils import IntegrityError
+from more_itertools import batched
 from s3logparse import s3logparse
 
 from dandiapi.analytics.models import ProcessedS3Log
@@ -80,10 +81,19 @@ def process_s3_log_file_task(s3_log_key: str) -> None:
         logger.info('Already processed log file %s', s3_log_key)
         return
 
+    # we need to store all of the fully hydrated blob objects in memory in order to use
+    # bulk_update, but this turns out to not be very costly. 1,000 blobs use about ~8kb
+    # of memory.
+    asset_blobs = []
+
+    # batch the blob queries to avoid a large WHERE IN clause
+    for batch in batched(download_counts, 1_000):
+        asset_blobs += AssetBlob.objects.filter(blob__in=batch)
+
+    for asset_blob in asset_blobs:
+        asset_blob.download_count = F('download_count') + download_counts[asset_blob.blob]
+
     # note this task is run serially per log file. this is to avoid the contention between
     # multiple log files trying to update the same blobs. this serialization is enforced through
     # the task queue configuration.
-    for blob, download_count in download_counts.items():
-        AssetBlob.objects.filter(blob=blob).update(
-            download_count=F('download_count') + download_count
-        )
+    AssetBlob.objects.bulk_update(asset_blobs, ['download_count'], batch_size=1_000)
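
For reference, `more_itertools.batched` (the new import above) splits any iterable into tuples of at most n items; iterating a dict yields its keys, which is how the loop slices `download_counts` into 1,000-key chunks for the `blob__in` filter. A minimal standalone sketch, using made-up blob keys rather than real S3 log data:

```python
from more_itertools import batched

# made-up stand-in for the real download_counts mapping of blob key -> count
download_counts = {f'blobs/abc{i}': i + 1 for i in range(7)}

# batched() yields tuples of at most 3 keys here; the diff uses chunks of 1,000
for batch in batched(download_counts, 3):
    print(batch)
# ('blobs/abc0', 'blobs/abc1', 'blobs/abc2')
# ('blobs/abc3', 'blobs/abc4', 'blobs/abc5')
# ('blobs/abc6',)
```

On the write side, `bulk_update(..., batch_size=1_000)` likewise chunks the UPDATE statements, and because each assigned value is an `F('download_count') + n` expression, the increment is still evaluated in the database rather than from a possibly stale value read into Python.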