Skip to content

Commit

Permalink
uow: add ParentRecordCommitOp uow
Browse files Browse the repository at this point in the history
- adds a new uow to commit a parent record and bulk index all records
  and drafts
  • Loading branch information
ntarocco authored and zzacharo committed Sep 5, 2023
1 parent e57501d commit f42a5d5
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 75 deletions.
25 changes: 18 additions & 7 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,39 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: [3.8, 3.9]
python-version: [3.7, 3.8, 3.9]
requirements-level: [pypi]
db-service: [postgresql11, postgresql13]
db-service: [postgresql12, postgresql14]
search-service: [opensearch2, elasticsearch7]
exclude:
- python-version: 3.7
db-service: postgresql14
search-service: opensearch2

- python-version: 3.7
db-service: postgresql14
search-service: elasticsearch7

- python-version: 3.8
db-service: postgresql11
db-service: postgresql12
search-service: opensearch2

- python-version: 3.8
db-service: postgresql11
db-service: postgresql12
search-service: elasticsearch7

- python-version: 3.9
db-service: postgresql11
db-service: postgresql12
search-service: opensearch2

- python-version: 3.9
db-service: postgresql11
db-service: postgresql12
search-service: elasticsearch7
include:
- db-service: postgresql13
- db-service: postgresql12
DB_EXTRAS: "postgresql"

- db-service: postgresql14
DB_EXTRAS: "postgresql"

- search-service: opensearch2
Expand Down
38 changes: 30 additions & 8 deletions invenio_drafts_resources/records/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,38 @@ class Record(RecordBase):
versions = VersionsField(create=True, set_latest=True)

@classmethod
def get_records_by_parent(cls, parent, include_deleted=True):
def get_records_by_parent(cls, parent, with_deleted=True, ids_only=False):
"""Get all sibling records for the specified parent record."""
versions = cls.model_cls.query.filter_by(parent_id=parent.id)
if not include_deleted:
versions = versions.filter_by(is_deleted=False)
rec_models = cls.model_cls.query.filter_by(parent_id=parent.id)
if not with_deleted:
rec_models = rec_models.filter_by(is_deleted=False)

return (
cls(rec_model.data, model=rec_model, parent=parent)
for rec_model in versions
)
if ids_only:
return (rec_model.id for rec_model in rec_models)
else:
return (
cls(rec_model.data, model=rec_model, parent=parent)
for rec_model in rec_models
)

@classmethod
def get_latest_by_parent(cls, parent, id_only=False):
"""Get the latest record for the specified parent record.
It might return None if there is no latest published version yet.
"""
version = cls.versions_model_cls.query.filter_by(
parent_id=parent.id
).one_or_none()
has_latest = version and version.latest_id
if not has_latest:
return None

rec_model = cls.model_cls.query.filter_by(id=version.latest_id).one()
if id_only:
return rec_model.id
else:
return cls(rec_model.data, model=rec_model, parent=parent)

@classmethod
def publish(cls, draft):
Expand Down
29 changes: 3 additions & 26 deletions invenio_drafts_resources/services/records/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

"""Primary service for working with records and drafts."""

import itertools

from flask import current_app
from invenio_db import db
from invenio_records_resources.services import LinksTemplate
Expand All @@ -28,6 +26,8 @@
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.local import LocalProxy

from .uow import ParentRecordCommitOp


class RecordService(RecordServiceBase):
"""Record and draft service interface.
Expand Down Expand Up @@ -291,7 +291,7 @@ def create(self, identity, data, uow=None, expand=False):
uow=uow,
expand=expand,
)
uow.register(RecordCommitOp(res._record.parent))
uow.register(ParentRecordCommitOp(res._record.parent))
return res

@unit_of_work()
Expand Down Expand Up @@ -612,29 +612,6 @@ def _get_draft_and_parent_by_id(self, id_):

return draft, parent

@unit_of_work()
def _index_related_records(self, record, parent, uow=None):
"""Index all records that are related to the specified ones.
Soft deleted records (including published drafts) will not be indexed
because the JSON payload is empty.
"""
_parent = parent or record.parent
siblings = self.record_cls.get_records_by_parent(_parent, include_deleted=False)

if self.draft_cls is not None:
# if drafts are available, reindex them as well
drafts = self.draft_cls.get_records_by_parent(
_parent, include_deleted=False
)
siblings = itertools.chain(siblings, drafts)

# TODO only index the current record immediately;
# all siblings should be sent to a high-priority celery task
# instead (requires bulk indexing to work)
for sibling in siblings:
uow.register(RecordIndexOp(sibling, indexer=self.indexer))

@unit_of_work()
def cleanup_drafts(self, timedelta, uow=None, search_gc_deletes=60):
"""Hard delete of soft deleted drafts.
Expand Down
69 changes: 69 additions & 0 deletions invenio_drafts_resources/services/records/uow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2023 CERN.
#
# Invenio-Drafts-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Unit of work."""

from invenio_records_resources.services.uow import RecordCommitOp


class ParentRecordCommitOp(RecordCommitOp):
"""Parent record commit operation, bulk indexing records and drafts."""

def __init__(
self,
parent,
indexer_context=None,
):
"""Initialize the parent record commit operation.
:params indexer_context: a dict containing record/draft cls and indexers,
or service. Expected keys: `record_cls, draft_cls, indexer, draft_indexer`.
`None` if indexing disabled.
"""
super().__init__(parent, indexer=None)
self._indexer_context = indexer_context
if indexer_context:
_service = indexer_context.get("service", None)
if _service is not None:
self._record_cls = getattr(_service, "record_cls")
self._draft_cls = getattr(_service, "draft_cls")
self._record_indexer = getattr(_service, "indexer")
self._draft_indexer = getattr(_service, "draft_indexer")
else:
self._record_cls = indexer_context["record_cls"]
self._draft_cls = indexer_context["draft_cls"]
self._record_indexer = indexer_context["indexer"]
self._draft_indexer = indexer_context["draft_indexer"]

def _get_siblings(self):
"""Get all record and draft versions by parent.
This operation might be slow when a record has a high number of versions and drafts.
However, given that often versions should be re-indexed as soon as possible when the parent
is committed, the fetching operation is done synchronously.
"""
records_ids = self._record_cls.get_records_by_parent(
self._record, with_deleted=False, ids_only=True
)
drafts_ids = self._draft_cls.get_records_by_parent(
self._record, with_deleted=False, ids_only=True
)
return records_ids, drafts_ids

def on_commit(self, uow):
"""No commit operation."""
pass

def on_post_commit(self, uow):
"""Bulk index as last operation."""
if self._indexer_context is not None:
records_ids, drafts_ids = self._get_siblings()
if records_ids:
self._record_indexer.bulk_index(records_ids)
if drafts_ids:
self._draft_indexer.bulk_index(drafts_ids)
40 changes: 38 additions & 2 deletions tests/records/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def test_get_records_by_parent(app, db, location):
drafts = list(Draft.get_records_by_parent(parent))
assert len(drafts) == 1
assert id(parent) == id(drafts[0].parent)
drafts = list(Draft.get_records_by_parent(parent, include_deleted=False))
drafts = list(Draft.get_records_by_parent(parent, with_deleted=False))
assert len(drafts) == 0
records = list(Record.get_records_by_parent(parent))
assert len(records) == 1
Expand Down Expand Up @@ -395,7 +395,7 @@ def test_get_records_by_parent(app, db, location):
parent = record_v2.parent
drafts = list(Draft.get_records_by_parent(parent))
assert len(drafts) == 2
drafts = list(Draft.get_records_by_parent(parent, include_deleted=False))
drafts = list(Draft.get_records_by_parent(parent, with_deleted=False))
assert len(drafts) == 0
records = list(Record.get_records_by_parent(parent))
assert len(records) == 2
Expand All @@ -406,3 +406,39 @@ def test_get_records_by_parent(app, db, location):
assert state.latest_id == record_v2.id
assert state.latest_index == 2
assert state.next_draft_id is None


#
# Get latest by parent
#
def test_get_latest_by_parent(app, db, location):
"""Test get latest by parent."""
draft_v1 = Draft.create({})
db.session.commit()
parent = draft_v1.parent

# nothing published yet
assert not Draft.get_latest_by_parent(parent)
assert not Record.get_latest_by_parent(parent)

record_v1 = Record.publish(draft_v1)
draft_v1.delete() # simulate service `publish`, will soft delete drafts
db.session.commit()
parent = record_v1.parent

assert not Draft.get_latest_by_parent(parent)
assert Record.get_latest_by_parent(parent).id == record_v1.id

draft_v2 = Draft.new_version(record_v1)
draft_v2.commit()
db.session.commit()

assert Draft.get_latest_by_parent(parent).id == record_v1.id
assert Record.get_latest_by_parent(parent).id == record_v1.id

record_v2 = Record.publish(draft_v2)
draft_v2.delete() # simulate service `publish`, will soft delete drafts
db.session.commit()

assert not Draft.get_latest_by_parent(parent)
assert Record.get_latest_by_parent(parent).id == record_v2.id
32 changes: 0 additions & 32 deletions tests/services/test_record_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
- Read with missing pid
"""

from io import BytesIO

import pytest
from invenio_db import db
from invenio_files_rest.errors import InvalidOperationError
from invenio_pidstore.errors import PIDDoesNotExistError, PIDUnregistered
from invenio_pidstore.models import PIDStatus
from marshmallow.exceptions import ValidationError
Expand Down Expand Up @@ -360,32 +357,3 @@ def test_read_latest_version(app, service, identity_simple, input_data):
# Check that parent returns latest version
latest = service.read_latest(identity_simple, parent_recid)
assert latest["id"] == recid_2


def test_reindexing_all_siblings(app, service, identity_simple, input_data):
"""Test if reindexing sibling records includes drafts."""
record = create_and_publish(service, identity_simple, input_data)._obj
recid = record["id"]

# Edit the record, and make it somehow out of sync with the index
draft = service.edit(identity_simple, recid)._obj
draft.metadata["title"] = "Test (draft)"
draft.commit()
db.session.commit()
draft.index.refresh()

# Make sure that the DB is correct, but the index is out of date
# (note: service.search_draft(...) would always give me 0 results here)
metadata = service.read_draft(identity_simple, recid).data["metadata"]
assert metadata["title"] == "Test (draft)"
hits = [hit for hit in service.draft_cls.index.search() if hit["id"] == recid]
assert hits
assert hits[0]["metadata"]["title"] == record.metadata["title"]

# Reindex all sibling records and drafts
# and make sure that the draft is updated now
service._index_related_records(record, parent=record.parent)
draft.index.refresh()
hits = [hit for hit in service.draft_cls.index.search() if hit["id"] == recid]
assert hits
assert hits[0]["metadata"]["title"] == draft.metadata["title"]
Loading

0 comments on commit f42a5d5

Please sign in to comment.