Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs: register RoR affiliation job #390

Merged
merged 4 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions invenio_vocabularies/contrib/common/ror/datastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@


class RORHTTPReader(BaseReader):
"""ROR HTTP Reader returning an in-memory binary stream of the latest ROR data dump ZIP file."""
"""ROR HTTP Reader.

Returning an in-memory
binary stream of the latest ROR data dump ZIP file.
"""

def __init__(self, origin=None, mode="r", since=None, *args, **kwargs):
"""Constructor."""
Expand All @@ -30,7 +34,8 @@ def __init__(self, origin=None, mode="r", since=None, *args, **kwargs):

def _iter(self, fp, *args, **kwargs):
raise NotImplementedError(
"RORHTTPReader downloads one file and therefore does not iterate through items"
"RORHTTPReader downloads one file "
"and therefore does not iterate through items"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are there so many formatting changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the checks were not run in the past

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CI has been running green recently.

I just checked with my local setup and I don't get the same reformatting as you:

$ black --version
black, 24.10.0 (compiled: yes)
Python (CPython) 3.9.18

$ black .
All done! ✨ 🍰 ✨
215 files left unchanged.

Are you using another version of black, or another tool maybe?

Copy link
Contributor Author

@kpsherva kpsherva Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mine is 24.8 but since the major is the same, I wouldn't expect breaking changes. I guess it is not a blocker to merge? Also CI is green after my reformatting.

)

def _get_last_dump_date(self, linksets):
Expand All @@ -53,11 +58,16 @@ def _get_last_dump_date(self, linksets):
return last_dump_date
else:
raise ReaderError(
"Couldn't find JSON-LD in publisher's linkset to determine last dump date."
"Couldn't find JSON-LD in publisher's linkset "
"to determine last dump date."
)

def read(self, item=None, *args, **kwargs):
"""Reads the latest ROR data dump ZIP file from Zenodo and yields an in-memory binary stream of it."""
"""Reads the latest ROR data dump.

Read from ZIP file from
Zenodo and yields an in-memory binary stream of it.
"""
if item:
raise NotImplementedError(
"RORHTTPReader does not support being chained after another reader"
Expand All @@ -68,7 +78,8 @@ def read(self, item=None, *args, **kwargs):
landing_page = requests.get(dataset_doi_link, allow_redirects=True)
landing_page.raise_for_status()

# Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. latest version) of the ROR data dump.
# Call the signposting `linkset+json` endpoint for
# the Concept DOI (i.e. latest version) of the ROR data dump.
# See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint
if "linkset" not in landing_page.links:
raise ReaderError("Linkset not found in the ROR dataset record.")
Expand All @@ -94,8 +105,10 @@ def read(self, item=None, *args, **kwargs):
raise ReaderError(f"Expected 1 ZIP item but got {len(zip_files)}")

# Download the ZIP file and fully load the response bytes content in memory.
# The bytes content are then wrapped by a BytesIO to be file-like object (as required by `zipfile.ZipFile`).
# Using directly `file_resp.raw` is not possible since `zipfile.ZipFile` requires the file-like object to be seekable.
# The bytes content is then wrapped in a BytesIO to make it a
# file-like object (as required by `zipfile.ZipFile`).
# Using `file_resp.raw` directly is not possible since
# `zipfile.ZipFile` requires the file-like object to be seekable.
file_resp = requests.get(file_url)
file_resp.raise_for_status()
yield io.BytesIO(file_resp.content)
Expand Down
88 changes: 88 additions & 0 deletions invenio_vocabularies/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021-2022 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Jobs module."""

import datetime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Just missing the copyright header.

from datetime import timezone

from invenio_i18n import gettext as _
from invenio_jobs.jobs import JobType
from marshmallow import Schema, fields
from marshmallow_utils.fields import TZDateTime

from invenio_vocabularies.services.tasks import process_datastream


class ArgsSchema(Schema):
"""Schema of task input arguments."""

since = TZDateTime(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about moving this to a base class/mixin in invenio-jobs, as it will be added to all jobs?

timezone=timezone.utc,
format="iso",
metadata={
"description": _(
"YYYY-MM-DD HH:mm format. "
"Leave field empty if it should continue since last successful run."
)
},
)
job_arg_schema = fields.String(
metadata={"type": "hidden"},
dump_default="ArgsSchema",
load_default="ArgsSchema",
)


class ProcessDataStreamJob(JobType):
    """Base job type wired to the ``process_datastream`` task.

    Task arguments are described by ``ArgsSchema``. No job ``id`` is set at
    this level; ``ProcessRORAffiliationsJob`` in this module supplies its own.
    """

    # Deliberately left unset on the generic base.
    id = None
    task = process_datastream
    arguments_schema = ArgsSchema


class ProcessRORAffiliationsJob(ProcessDataStreamJob):
    """Process ROR affiliations datastream registered task."""

    description = "Process ROR affiliations"
    title = "Load ROR affiliations"
    id = "process_ror_affiliations"

    @classmethod
    def default_args(cls, job_obj, since=None, **kwargs):
        """Generate the default datastream arguments for a job run.

        :param job_obj: Job object; its ``last_runs["success"]`` entry is
            consulted when ``since`` is not provided.
        :param since: Explicit lower bound passed to the ``ror-http`` reader.
            When given, it is used as-is.
        :param kwargs: Accepted for interface compatibility; not used here.
        :returns: A dict with a single ``"config"`` key holding the readers,
            writers and transformers configuration of the datastream.
        """
        # Only compute a fallback when the caller did not pass `since`.
        # Previously the `else` branch also ran for an explicit `since` and
        # silently replaced it with the current time.
        if since is None:
            last_success = job_obj.last_runs["success"]
            if last_success:
                # Continue from the start of the last successful run.
                since = last_success.started_at
            else:
                # NOTE(review): this is a naive datetime, while the schema's
                # `since` field is declared with timezone=UTC — confirm
                # whether an aware value is expected downstream.
                since = datetime.datetime.now()

        return {
            "config": {
                "readers": [
                    {
                        "args": {"since": since},
                        "type": "ror-http",
                    },
                    {"args": {"regex": "_schema_v2\\.json$"}, "type": "zip"},
                    {"type": "json"},
                ],
                "writers": [
                    {
                        "args": {
                            "writer": {
                                "type": "affiliations-service",
                                "args": {"update": True},
                            }
                        },
                        "type": "async",
                    }
                ],
                "transformers": [{"type": "ror-affiliations"}],
            }
        }
4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ install_requires =
invenio-i18n>=2.0.0,<3.0.0
invenio-records-resources>=6.0.0,<7.0.0
invenio-administration>=2.0.0,<3.0.0
invenio-jobs>=1.0.0,<2.0.0
lxml>=4.5.0
PyYAML>=5.4.1
regex>=2024.7.24
Expand Down Expand Up @@ -110,7 +111,8 @@ invenio_i18n.translations =
invenio_vocabularies = invenio_vocabularies
invenio_celery.tasks =
invenio_vocabularies_services = invenio_vocabularies.services.tasks

invenio_jobs.jobs =
process_ror_affiliations = invenio_vocabularies.jobs:ProcessRORAffiliationsJob

[build_sphinx]
source-dir = docs/
Expand Down