Skip to content

Commit

Permalink
tests: add test for record deletion of specific version
Browse files Browse the repository at this point in the history
  • Loading branch information
zzacharo committed Oct 10, 2023
1 parent fef8916 commit 259975d
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 5 deletions.
2 changes: 0 additions & 2 deletions invenio_rdm_records/records/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,6 @@ def next_latest_published_record_by_parent(cls, parent):
:param parent: parent record.
:param excluded_latest: latest record to exclude find next published version
"""

with db.session.no_autoflush:
rec_model_query = (
cls.model_cls.query.filter_by(parent_id=parent.id)
Expand Down Expand Up @@ -488,7 +487,6 @@ def get_latest_published_by_parent(cls, parent):
It might return None if there is no latest published version i.e not
published yet or all versions are deleted.
"""

latest_record = cls.get_latest_by_parent(parent)
if latest_record.deletion_status != RecordDeletionStatusEnum.PUBLISHED.value:
return None
Expand Down
174 changes: 171 additions & 3 deletions tests/services/test_record_deletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,31 @@

"""Service-level tests for record deletion."""

from copy import deepcopy
from datetime import datetime

import arrow
import pytest
from invenio_access.permissions import system_identity

from invenio_rdm_records.proxies import current_rdm_records
from invenio_rdm_records.records.api import RDMRecord
from invenio_rdm_records.records.systemfields.deletion_status import (
RecordDeletionStatusEnum,
)
from invenio_rdm_records.services.errors import DeletionStatusException


def test_record_deletion(running_app, minimal_record):
def assert_parent_resolved_to_record(
service, identity, parent_pid, record, status=RecordDeletionStatusEnum.PUBLISHED
):
"""."""
# resolve parent pid to latest
resolved_rec = service.pids.resolve(identity, parent_pid, "recid")
assert resolved_rec._record.pid.pid_value == record._record.pid.pid_value
assert resolved_rec._obj.deletion_status == status


def test_record_deletion(running_app, minimal_record, search_clear):
"""Test simple record deletion of a record."""
superuser_identity = running_app.superuser_identity
service = current_rdm_records.records_service
Expand Down Expand Up @@ -62,7 +73,7 @@ def test_record_deletion(running_app, minimal_record):
assert record._obj.tombstone is None


def test_invalid_record_deletion_workflows(running_app, minimal_record):
def test_invalid_record_deletion_workflows(running_app, minimal_record, search_clear):
"""Test the wrong order of deletion operations."""
superuser_identity = running_app.superuser_identity
service = current_rdm_records.records_service
Expand Down Expand Up @@ -96,3 +107,160 @@ def test_invalid_record_deletion_workflows(running_app, minimal_record):
# we cannot directly restore a record marked for purge
with pytest.raises(DeletionStatusException):
service.restore_record(superuser_identity, record.id)


def test_record_deletion_of_specific_version(running_app, minimal_record, search_clear):
"""Test record deletion of a specific version."""
superuser_identity = running_app.superuser_identity
service = current_rdm_records.records_service
draft = service.create(superuser_identity, minimal_record)
record_v1 = service.publish(superuser_identity, draft.id)
published_parent_pid = record_v1._record.parent.pid.pid_value

# publish v2
minimal_record_v2 = deepcopy(minimal_record)
minimal_record_v2["metadata"]["title"] = f"{minimal_record['metadata']['title']} v2"
draft_v2 = service.new_version(superuser_identity, record_v1.id)
service.update_draft(superuser_identity, draft_v2.id, minimal_record_v2)
record_v2 = service.publish(superuser_identity, draft_v2.id)

assert record_v2._obj.deletion_status == RecordDeletionStatusEnum.PUBLISHED

# resolve parent pid to record v2
assert_parent_resolved_to_record(
service, superuser_identity, published_parent_pid, record_v2
)

# search
res = service.search(
superuser_identity,
params={
"status": RecordDeletionStatusEnum.PUBLISHED.value,
"allversions": True,
},
)
assert res.total == 2

# delete the record v1
tombstone_info = {"note": "no specific reason, tbh"}
record = service.delete_record(superuser_identity, record_v1.id, tombstone_info)

# resolve parent pid to record v1
assert_parent_resolved_to_record(
service, superuser_identity, published_parent_pid, record_v2
)

RDMRecord.index.refresh()

# search
res = service.search(
superuser_identity,
params={
"status": RecordDeletionStatusEnum.PUBLISHED.value,
"allversions": True,
},
)
assert res.total == 1

# delete the record v2
tombstone_info = {"note": "no specific reason, tbh"}
record = service.delete_record(superuser_identity, record_v2.id, tombstone_info)
tombstone = record._obj.tombstone

# resolve parent pid to record v2 but deleted (v2 was the last version)
assert_parent_resolved_to_record(
service,
superuser_identity,
published_parent_pid,
record_v2, # resolved rec
status=RecordDeletionStatusEnum.DELETED,
)
assert tombstone.note == tombstone_info["note"]

RDMRecord.index.refresh()

# search
res = service.search(
superuser_identity,
params={
"status": RecordDeletionStatusEnum.PUBLISHED.value,
"allversions": True,
},
)
assert res.total == 0

# restore the record v1
record = service.restore_record(superuser_identity, record_v1.id)
assert record._obj.deletion_status == RecordDeletionStatusEnum.PUBLISHED
assert not record._obj.deletion_status.is_deleted
assert record._obj.tombstone is None

# resolve parent pid to record v1
assert_parent_resolved_to_record(
service, superuser_identity, published_parent_pid, record_v1
)

RDMRecord.index.refresh()

# search
res = service.search(
superuser_identity,
params={
"status": RecordDeletionStatusEnum.PUBLISHED.value,
"allversions": True,
},
)
assert res.total == 1

# create new version from restored v1
minimal_record_v2 = deepcopy(minimal_record)
minimal_record_v2["metadata"]["title"] = f"{minimal_record['metadata']['title']} v3"
draft_v3 = service.new_version(superuser_identity, record_v1.id)
service.update_draft(superuser_identity, draft_v3.id, minimal_record_v2)
record_v3 = service.publish(superuser_identity, draft_v3.id)

# resolve parent pid to record v3
assert_parent_resolved_to_record(
service, superuser_identity, published_parent_pid, record_v3
)

RDMRecord.index.refresh()

# search
res = service.search(
superuser_identity,
params={
"status": RecordDeletionStatusEnum.PUBLISHED.value,
"allversions": True,
},
)
assert res.total == 2

# restore the record v2
record = service.restore_record(superuser_identity, record_v2.id)
assert record._obj.deletion_status == RecordDeletionStatusEnum.PUBLISHED
assert not record._obj.deletion_status.is_deleted
assert record._obj.tombstone is None

# resolve parent pid to record v3
assert_parent_resolved_to_record(
service, superuser_identity, published_parent_pid, record_v3
)

RDMRecord.index.refresh()

# search
res = service.search(
superuser_identity,
params={
"status": RecordDeletionStatusEnum.PUBLISHED.value,
"allversions": True,
},
)

assert res.total == 3
hits = [(hit["id"], hit["versions"]["index"]) for hit in res.hits]
# assert the correct order of (id, version.index)
assert hits[0] == (record_v3.id, 3)
assert hits[1] == (record_v2.id, 2)
assert hits[2] == (record_v1.id, 1)

0 comments on commit 259975d

Please sign in to comment.