Skip to content

Commit

Permalink
Merge pull request #2069 from dandi/zarrbargo
Browse files Browse the repository at this point in the history
Add API support for Embargoed Zarrs
  • Loading branch information
jjnesbitt authored Nov 13, 2024
2 parents d8fe661 + 4277b10 commit 8665adc
Show file tree
Hide file tree
Showing 15 changed files with 399 additions and 137 deletions.
18 changes: 9 additions & 9 deletions dandiapi/api/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ def is_blob(self):
def is_zarr(self):
return self.zarr is not None

@property
def is_embargoed(self) -> bool:
    """Return the ``embargoed`` flag of the blob if one is set, otherwise of the zarr."""
    # The blob takes precedence; the zarr is only consulted when there is no blob.
    backing = self.blob if self.blob is not None else self.zarr
    return backing.embargoed  # type: ignore # noqa: PGH003

@property
def size(self):
if self.is_blob:
Expand Down Expand Up @@ -195,7 +202,7 @@ def is_different_from(
metadata: dict,
path: str,
) -> bool:
from dandiapi.zarr.models import EmbargoedZarrArchive, ZarrArchive
from dandiapi.zarr.models import ZarrArchive

if isinstance(asset_blob, AssetBlob) and self.blob is not None and self.blob != asset_blob:
return True
Expand All @@ -207,9 +214,6 @@ def is_different_from(
):
return True

if isinstance(zarr_archive, EmbargoedZarrArchive):
raise NotImplementedError

if self.path != path:
return True

Expand All @@ -234,9 +238,8 @@ def full_metadata(self):
'access': [
{
'schemaKey': 'AccessRequirements',
# TODO: When embargoed zarrs land, include that logic here
'status': AccessType.EmbargoedAccess.value
if self.blob and self.blob.embargoed
if self.is_embargoed
else AccessType.OpenAccess.value,
}
],
Expand Down Expand Up @@ -283,7 +286,4 @@ def total_size(cls):
.aggregate(size=models.Sum('size'))['size']
or 0
for cls in (AssetBlob, ZarrArchive)
# Adding Zarrs to embargoed dandisets is not supported, so there is
# no point in adding EmbargoedZarr here since it would also result in an error.
# TODO: add EmbargoedZarr whenever supported
)
9 changes: 5 additions & 4 deletions dandiapi/api/services/asset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,16 @@ def add_asset_to_version(
raise ZarrArchiveBelongsToDifferentDandisetError

with transaction.atomic():
# Creating an asset in an OPEN dandiset that points to an embargoed blob results in that
# blob being unembargoed
# Creating an asset in an OPEN dandiset that points to an
# embargoed blob results in that blob being unembargoed.
# NOTE: This only applies to asset blobs, as zarrs cannot belong to
# multiple dandisets at once.
if (
asset_blob is not None
and asset_blob.embargoed
and version.dandiset.embargo_status == Dandiset.EmbargoStatus.OPEN
):
asset_blob.embargoed = False
asset_blob.save()
AssetBlob.objects.filter(blob_id=asset_blob.blob_id).update(embargoed=False)
transaction.on_commit(
lambda: remove_asset_blob_embargoed_tag_task.delay(blob_id=asset_blob.blob_id)
)
Expand Down
53 changes: 11 additions & 42 deletions dandiapi/api/services/embargo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,32 @@
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
import logging
from typing import TYPE_CHECKING

from botocore.config import Config
from django.conf import settings
from django.db import transaction
from more_itertools import chunked

from dandiapi.api.mail import send_dandiset_unembargoed_message
from dandiapi.api.models import AssetBlob, Dandiset, Version
from dandiapi.api.services import audit
from dandiapi.api.services.asset.exceptions import DandisetOwnerRequiredError
from dandiapi.api.services.embargo.utils import _delete_object_tags, remove_dandiset_embargo_tags
from dandiapi.api.services.exceptions import DandiError
from dandiapi.api.services.metadata import validate_version_metadata
from dandiapi.api.storage import get_boto_client
from dandiapi.api.tasks import unembargo_dandiset_task
from dandiapi.zarr.models import ZarrArchive

from .exceptions import (
AssetBlobEmbargoedError,
AssetTagRemovalError,
DandisetActiveUploadsError,
DandisetNotEmbargoedError,
)

if TYPE_CHECKING:
from django.contrib.auth.models import User
from mypy_boto3_s3 import S3Client


logger = logging.getLogger(__name__)
ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE = 5000


def _delete_asset_blob_tags(client: S3Client, blob: str):
    """Call ``delete_object_tagging`` for the key ``blob`` in the dandisets bucket."""
    bucket_name = settings.DANDI_DANDISETS_BUCKET_NAME
    client.delete_object_tagging(Bucket=bucket_name, Key=blob)


# NOTE: In testing this took ~2 minutes for 100,000 files
def _remove_dandiset_asset_blob_embargo_tags(dandiset: Dandiset):
    """
    Delete the S3 tags of every embargoed asset blob used by ``dandiset``.

    Blob names are streamed from the database and processed in batches of
    ``ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE``, each batch being fanned out to a thread pool.

    :raises AssetTagRemovalError: if any tag deletion in a batch raised an exception
    """
    client = get_boto_client(config=Config(max_pool_connections=100))
    blob_names = (
        AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=dandiset)
        .values_list('blob', flat=True)
        .iterator(chunk_size=ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE)
    )

    # Work batch by batch so the full list of embargoed blobs is never held in memory
    for batch in chunked(blob_names, ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE):
        with ThreadPoolExecutor(max_workers=100) as pool:
            pending = [
                pool.submit(_delete_asset_blob_tags, client=client, blob=name) for name in batch
            ]

        # One future was submitted per blob name, in order, so the two can be zipped
        failed = [name for name, future in zip(batch, pending) if future.exception() is not None]
        if failed:
            raise AssetTagRemovalError('Some blobs failed to remove tags', blobs=failed)


@transaction.atomic()
Expand All @@ -80,13 +45,17 @@ def unembargo_dandiset(ds: Dandiset, user: User):

# Remove tags in S3
logger.info('Removing tags...')
_remove_dandiset_asset_blob_embargo_tags(ds)
remove_dandiset_embargo_tags(ds)

# Update embargoed flag on asset blobs
updated = AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=ds).update(
# Update embargoed flag on asset blobs and zarrs
updated_blobs = AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=ds).update(
embargoed=False
)
logger.info('Updated %s asset blobs', updated)
updated_zarrs = ZarrArchive.objects.filter(
embargoed=True, assets__versions__dandiset=ds
).update(embargoed=False)
logger.info('Updated %s asset blobs', updated_blobs)
logger.info('Updated %s zarrs', updated_zarrs)

# Set status to OPEN
Dandiset.objects.filter(pk=ds.pk).update(embargo_status=Dandiset.EmbargoStatus.OPEN)
Expand Down Expand Up @@ -118,7 +87,7 @@ def remove_asset_blob_embargoed_tag(asset_blob: AssetBlob) -> None:
if asset_blob.embargoed:
raise AssetBlobEmbargoedError

_delete_asset_blob_tags(client=get_boto_client(), blob=asset_blob.blob.name)
_delete_object_tags(client=get_boto_client(), blob=asset_blob.blob.name)


def kickoff_dandiset_unembargo(*, user: User, dandiset: Dandiset):
Expand Down
103 changes: 103 additions & 0 deletions dandiapi/api/services/embargo/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
import logging
from typing import TYPE_CHECKING

from botocore.config import Config
from django.conf import settings
from django.db.models import Q
from more_itertools import chunked

from dandiapi.api.models.asset import Asset
from dandiapi.api.storage import get_boto_client
from dandiapi.zarr.models import zarr_s3_path

from .exceptions import AssetTagRemovalError

if TYPE_CHECKING:
from mypy_boto3_s3 import S3Client

from dandiapi.api.models.dandiset import Dandiset


logger = logging.getLogger(__name__)
TAG_REMOVAL_CHUNK_SIZE = 5000


def retry(times: int, exceptions: tuple[type[Exception], ...]):
    """
    Build a decorator that retries the wrapped function/method on failure.

    The wrapped callable is attempted up to ``times`` times with any of the
    listed ``exceptions`` swallowed, and is then called one final time whose
    outcome (return value or exception) is passed straight to the caller. The
    callable is therefore invoked at most ``times + 1`` times in total.
    Exceptions not listed in ``exceptions`` propagate immediately.

    :param times: The number of failed attempts to swallow before the final attempt
    :param exceptions: Tuple of exception types that trigger a retry attempt
    :return: A decorator applying the retry behaviour to a callable
    """
    import functools

    def decorator(func):
        # Preserve the wrapped callable's name/docstring for logging and introspection
        @functools.wraps(func)
        def newfn(*args, **kwargs):
            for _ in range(times):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    # Swallow the listed exception and try again
                    continue
            # Final attempt: any exception raised here reaches the caller
            return func(*args, **kwargs)

        return newfn

    return decorator


@retry(times=3, exceptions=(Exception,))
def _delete_object_tags(client: S3Client, blob: str):
    """
    Call ``delete_object_tagging`` for the key ``blob`` in the dandisets bucket.

    Any exception triggers a retry via the ``retry`` decorator.
    """
    bucket_name = settings.DANDI_DANDISETS_BUCKET_NAME
    client.delete_object_tagging(Bucket=bucket_name, Key=blob)


@retry(times=3, exceptions=(Exception,))
def _delete_zarr_object_tags(client: S3Client, zarr: str):
    """
    Delete the S3 tags of every object listed under the key prefix of a zarr.

    Objects are listed page by page under ``zarr_s3_path(zarr_id=zarr)`` and each
    page's keys are handed to a thread pool. Any exception, including the
    ``AssetTagRemovalError`` raised below, triggers a retry of the whole
    function via the ``retry`` decorator.

    :raises AssetTagRemovalError: if any tag deletion within a page raised an exception
    """
    listing = client.get_paginator('list_objects_v2').paginate(
        Bucket=settings.DANDI_DANDISETS_BUCKET_NAME,
        Prefix=zarr_s3_path(zarr_id=zarr),
    )

    with ThreadPoolExecutor(max_workers=100) as pool:
        for page in listing:
            # Treat a page without a 'Contents' entry as containing no objects
            object_keys = [entry['Key'] for entry in page.get('Contents', [])]
            pending = [
                pool.submit(_delete_object_tags, client=client, blob=object_key)
                for object_key in object_keys
            ]

            # Futures were submitted one per key, in order, so the two can be zipped
            failed = [
                object_key
                for object_key, future in zip(object_keys, pending)
                if future.exception() is not None
            ]
            if failed:
                raise AssetTagRemovalError('Some zarr files failed to remove tags', blobs=failed)


def remove_dandiset_embargo_tags(dandiset: Dandiset):
    """
    Delete the S3 tags of all embargoed blobs and zarrs used by assets of ``dandiset``.

    Assets belonging to any version of the dandiset whose blob or zarr is flagged
    as embargoed are streamed from the database as ``(blob, zarr_id)`` rows and
    processed in chunks of ``TAG_REMOVAL_CHUNK_SIZE``. Within a chunk, each tag
    removal is submitted to a thread pool.

    :param dandiset: The dandiset whose embargoed asset data should be untagged
    :raises AssetTagRemovalError: if any submitted removal raised an exception; the
        ``blobs`` argument holds the ``(blob, zarr_id)`` rows whose removal failed
    """
    client = get_boto_client(config=Config(max_pool_connections=100))
    embargoed_assets = (
        Asset.objects.filter(versions__dandiset=dandiset)
        .filter(Q(blob__embargoed=True) | Q(zarr__embargoed=True))
        .values_list('blob__blob', 'zarr__zarr_id')
        .iterator(chunk_size=TAG_REMOVAL_CHUNK_SIZE)
    )

    # Chunk the assets so we're never storing a list of all embargoed assets
    for chunk in chunked(embargoed_assets, TAG_REMOVAL_CHUNK_SIZE):
        # Keep every future next to the row it was submitted for. Pairing by position
        # against the chunk is only valid if each row yields exactly one future.
        submitted = []
        with ThreadPoolExecutor(max_workers=100) as e:
            for entry in chunk:
                blob, zarr = entry
                if blob is not None:
                    future = e.submit(_delete_object_tags, client=client, blob=blob)
                    submitted.append((entry, future))
                if zarr is not None:
                    future = e.submit(_delete_zarr_object_tags, client=client, zarr=zarr)
                    submitted.append((entry, future))

        # Check if any failed and raise exception if so
        failed = [entry for entry, future in submitted if future.exception() is not None]
        if failed:
            raise AssetTagRemovalError('Some assets failed to remove tags', blobs=failed)
2 changes: 1 addition & 1 deletion dandiapi/api/services/publish/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _lock_dandiset_for_publishing(*, user: User, dandiset: Dandiset) -> None: #
if dandiset.embargo_status != Dandiset.EmbargoStatus.OPEN:
raise NotAllowedError('Operation only allowed on OPEN dandisets', 400)

if dandiset.zarr_archives.exists() or dandiset.embargoed_zarr_archives.exists():
if dandiset.zarr_archives.exists():
raise NotAllowedError('Cannot publish dandisets which contain zarrs', 400)

with transaction.atomic():
Expand Down
73 changes: 70 additions & 3 deletions dandiapi/api/tests/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,6 @@ def test_asset_total_size(

assert Asset.total_size() == asset_blob.size + zarr_archive.size

# TODO: add testing for embargoed zarr added, whenever embargoed zarrs
# supported, ATM they are not and tested by test_zarr_rest_create_embargoed_dandiset


@pytest.mark.django_db
def test_asset_full_metadata(draft_asset_factory):
Expand Down Expand Up @@ -233,6 +230,42 @@ def test_asset_full_metadata_zarr(draft_asset_factory, zarr_archive):
}


@pytest.mark.django_db
def test_asset_full_metadata_access(draft_asset_factory, asset_blob_factory, zarr_archive_factory):
    """Check that the ``access`` entry of ``full_metadata`` follows the blob/zarr embargo flag."""
    raw_metadata = {
        'foo': 'bar',
        'schemaVersion': settings.DANDI_SCHEMA_VERSION,
    }

    def expected_access(access_type):
        return [{'schemaKey': 'AccessRequirements', 'status': access_type.value}]

    # Zarr-backed assets
    embargoed_zarr_asset: Asset = draft_asset_factory(
        metadata=raw_metadata, blob=None, zarr=zarr_archive_factory(embargoed=True)
    )
    open_zarr_asset: Asset = draft_asset_factory(
        metadata=raw_metadata, blob=None, zarr=zarr_archive_factory(embargoed=False)
    )

    # Blob-backed assets
    embargoed_blob_asset: Asset = draft_asset_factory(
        metadata=raw_metadata, blob=asset_blob_factory(embargoed=True), zarr=None
    )
    open_blob_asset: Asset = draft_asset_factory(
        metadata=raw_metadata, blob=asset_blob_factory(embargoed=False), zarr=None
    )

    # Test that access is correctly inferred from embargo status
    cases = [
        (embargoed_zarr_asset, AccessType.EmbargoedAccess),
        (embargoed_blob_asset, AccessType.EmbargoedAccess),
        (open_zarr_asset, AccessType.OpenAccess),
        (open_blob_asset, AccessType.OpenAccess),
    ]
    for asset, access_type in cases:
        assert asset.full_metadata['access'] == expected_access(access_type)


# API Tests


Expand Down Expand Up @@ -1048,6 +1081,40 @@ def test_asset_create_existing_path(api_client, user, draft_version, asset_blob,
assert resp.status_code == 409


# Must use transaction=True as the tested function uses a transaction on_commit hook
@pytest.mark.django_db(transaction=True)
def test_asset_create_on_open_dandiset_embargoed_asset_blob(
    api_client, user, draft_version, embargoed_asset_blob, mocker
):
    """Creating an asset from an embargoed blob via the API unembargoes that blob."""
    tag_removal_delay = mocker.patch(
        'dandiapi.api.services.asset.remove_asset_blob_embargoed_tag_task.delay'
    )

    # Precondition: the blob starts out embargoed
    assert embargoed_asset_blob.embargoed

    assign_perm('owner', user, draft_version.dandiset)
    api_client.force_authenticate(user)

    path = 'test/create/asset.txt'
    payload = {
        'metadata': {
            'encodingFormat': 'application/x-nwb',
            'path': path,
        },
        'blob_id': embargoed_asset_blob.blob_id,
    }
    url = (
        f'/api/dandisets/{draft_version.dandiset.identifier}'
        f'/versions/{draft_version.version}/assets/'
    )

    resp = api_client.post(url, payload, format='json')
    assert resp.status_code == 200

    # Check that asset blob is no longer embargoed
    embargoed_asset_blob.refresh_from_db()
    assert not embargoed_asset_blob.embargoed

    # Check that the tag removal task was dispatched exactly once
    tag_removal_delay.assert_called_once()


@pytest.mark.django_db
def test_asset_rest_rename(api_client, user, draft_version, asset_blob):
assign_perm('owner', user, draft_version.dandiset)
Expand Down
Loading

0 comments on commit 8665adc

Please sign in to comment.