stats: make stat import idempotent
zzacharo committed Oct 31, 2024
1 parent cba6520 commit e18b304
Showing 4 changed files with 44 additions and 19 deletions.
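How the import becomes idempotent: every migrated event is written with a deterministic id (migrated_<original _id>) and _op_type "create", so a document can only be created once; a second run gets a 409 Conflict, which the loader tells the bulk helper to ignore. A minimal sketch of the pattern, assuming a local OpenSearch instance; the host, index name, and sample document are placeholders, not the migration kit's configuration:

from opensearchpy import OpenSearch
from opensearchpy.helpers import parallel_bulk

client = OpenSearch(hosts=["localhost:9200"])

legacy_docs = [
    {"_id": "abc123", "_source": {"unique_id": "ui_12345", "visitor_id": ""}},
]

def actions(docs):
    for doc in docs:
        yield {
            "_op_type": "create",               # 409 Conflict if the id already exists
            "_index": "cds-rdm-stats-2024-10",  # hypothetical target index
            "_id": f"migrated_{doc['_id']}",    # deterministic per source document
            "_source": doc["_source"],
        }

# ignore_status=409 turns "already imported" conflicts into no-ops,
# so running the load twice leaves the index unchanged.
for ok, info in parallel_bulk(client, actions(legacy_docs), ignore_status=409):
    pass  # draining the generator is what sends the requests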
12 changes: 7 additions & 5 deletions cds_migrator_kit/rdm/migration/stats/event_generator.py
@@ -99,7 +99,7 @@ def process_download_event(entry, rec_context, logger):
"via_api": False,
"is_robot": entry.get("bot", False),
"country": entry.get("country", ""),
"visitor_id": entry["visitor_id"],
"visitor_id": entry.get("visitor_id", ""),
"unique_session_id": entry["unique_session_id"],
# Note: id_bibrec doesn't have the new format pids
"unique_id": f"ui_{_record_version['new_recid']}",
@@ -168,7 +168,7 @@ def process_pageview_event(entry, rec_context, logger):
"referrer": None,
"via_api": False,
"is_robot": entry.get("bot", False),
"visitor_id": entry["visitor_id"],
"visitor_id": entry.get("visitor_id", ""),
# Note: id_bibrec doesn't have the new format pids
"unique_id": f"ui_{rec_context['latest_version']}",
"unique_session_id": entry["unique_session_id"],
@@ -189,14 +189,15 @@ def prepare_new_doc(
try:
new_doc = deepcopy(doc)
# remove to avoid reindexing
new_doc.pop("_id", None)
new_doc["_id"] = f"migrated_{new_doc['_id']}"

new_doc.pop("_score", None)

event_type = new_doc["_source"].pop("event_type", None)

if event_type != doc_type:
raise Exception("Inconsistent doc type")
processed_doc = {}

if event_type == "events.downloads":
processed_doc = process_download_event(
new_doc["_source"], rec_context, logger
@@ -228,9 +229,10 @@ def prepare_new_doc(
month = f"{date_object.month:02}"

yield {
"_op_type": "index",
"_op_type": "create",
"_index": f"{dest_search_index_prefix}-{index_type}-{year}-{month}",
"_source": processed_doc,
"_id": new_doc["_id"],
}
except Exception as ex:
logger.error(
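The switch from "index" to "create" above is what makes duplicates detectable: "index" silently overwrites a document with the same id, while "create" refuses and answers 409 Conflict. A standalone illustration of that behavior, with the index name, id, and body made up for the example:

from opensearchpy import OpenSearch
from opensearchpy.exceptions import ConflictError

client = OpenSearch(hosts=["localhost:9200"])
doc = {"unique_id": "ui_12345", "visitor_id": ""}

# first run: the document is created
client.index(index="stats-2024-10", id="migrated_abc", body=doc, op_type="create")
try:
    # second run, same deterministic id: OpenSearch answers 409 Conflict
    client.index(index="stats-2024-10", id="migrated_abc", body=doc, op_type="create")
except ConflictError:
    print("already imported, skipping")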
24 changes: 19 additions & 5 deletions cds_migrator_kit/rdm/migration/stats/load.py
@@ -26,7 +26,7 @@

from opensearchpy import OpenSearch
from opensearchpy.exceptions import OpenSearchException
from opensearchpy.helpers import bulk
from opensearchpy.helpers import bulk, parallel_bulk, BulkIndexError

_QUERY_VIEWS = {
"query": {
@@ -83,12 +83,26 @@ def _generate_new_events(self, data, rec_context, logger, doc_type):
self.config["DEST_SEARCH_INDEX_PREFIX"],
)
if self.dry_run:
for new_doc in new_docs:
for new_doc in new_docs_generated:
logger.info(json.dumps(new_doc))
else:
bulk(self.dest_os_client, new_docs_generated, raise_on_error=True)
[
_
for _ in parallel_bulk(
self.dest_os_client,
actions=new_docs_generated,
raise_on_error=True,
raise_on_exception=True,
ignore_status=409,
)
]

except BulkIndexError as ex:
for error in ex.errors:
logger.error(error)

except Exception as ex:
logger.error(ex)
logger.error(str(ex))

def _process_legacy_events_for_recid(self, recid, rec_context, index, event_type):
data = os_search(
Expand Down Expand Up @@ -169,7 +183,7 @@ def validate_stats_for_recid(self, recid, record, event_type):
)
except AssertionError as e:
logger.error(
f"Not all events of type {event_type} were migrated for record: {recid}"
f"Not all events of type {event_type} were migrated for record: {recid}. Legacy count: {legacy_total['count']} - RDM count: {new_total['count']}"
)

def _load(self, entry):
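parallel_bulk is lazy: it yields one (ok, info) tuple per action, which is why the loader drains it with a throwaway list comprehension. A sketch of the same loop, assuming a client and an action generator like those above; collections.deque with maxlen=0 drains the generator without materializing a list:

from collections import deque
from opensearchpy import OpenSearch
from opensearchpy.helpers import parallel_bulk, BulkIndexError

client = OpenSearch(hosts=["localhost:9200"])

def load(actions, logger):
    try:
        # drain the generator so the bulk requests are actually sent
        deque(
            parallel_bulk(
                client,
                actions=actions,
                raise_on_error=True,
                raise_on_exception=True,
                ignore_status=409,  # duplicate "create" means already migrated
            ),
            maxlen=0,
        )
    except BulkIndexError as ex:
        for error in ex.errors:  # one entry per failed action
            logger.error(error)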
@@ -33,6 +33,7 @@ class CDSAffiliations(CdsOverdo):

__ignore_keys__ = {
# IGNORED
"037__9",
"0247_2", # DOI, summer student notes do not have it
"0247_a", # DOI
"0248_a", # oai identifier, not needed to migrate, TBD
@@ -45,6 +46,7 @@ class CDSAffiliations(CdsOverdo):
"269__a",
"269__b",
"269__c",
"300__a",
"270__m", # document contact email
"595__a", # always value CERN EDS, not displayed, TODO: do we keep?
"595__z", # SOME RECORD HAVE UNCL as value, do we keep it? what does UNCL mean
25 changes: 16 additions & 9 deletions cds_migrator_kit/rdm/migration/transform/transform.py
@@ -225,17 +225,21 @@ def _match_affiliation(self, affiliation_name):
# Step 1: check if there is a curated input
if match.curated_affiliation:
return match.curated_affiliation
# Step 2: check if there is an exact match
elif match.ror_exact_match:
return {"id": normalize_ror(match.ror_exact_match)}
# Step 3: check if there is not exact match
elif match.ror_not_exact_match:
_affiliation_ror_id = normalize_ror(match.ror_not_exact_match)
raise RecordFlaggedCuration(
subfield="u",
value={"id": normalize_ror(match.ror_not_exact_match)},
value={"id": _affiliation_ror_id},
field="author",
message=f"Affiliation {normalize_ror(match.ror_not_exact_match)} not found as an exact match, ROR id should be checked.",
message=f"Affiliation {_affiliation_ror_id} not found as an exact match, ROR id should be checked.",
stage="vocabulary match",
)
else:
# Step 4: set the originally inserted value from legacy
raise RecordFlaggedCuration(
subfield="u",
value={"name": affiliation_name},
@@ -306,12 +310,12 @@ def _resource_type(entry):
# filter empty keys
return {k: v for k, v in metadata.items() if v}



def _custom_fields(self, json_entry, json_output):

def field_experiments(record_json, custom_fields_dict):
experiments = record_json.get("custom_fields", {}).get("cern:experiments", [])
experiments = record_json.get("custom_fields", {}).get(
"cern:experiments", []
)
for experiment in experiments:
result = search_vocabulary(experiment, "experiments")

@@ -331,9 +335,10 @@ def field_experiments(record_json, custom_fields_dict):
stage="vocabulary match",
)


def field_departments(record_json, custom_fields_dict):
departments = record_json.get("custom_fields", {}).get("cern:departments", [])
departments = record_json.get("custom_fields", {}).get(
"cern:departments", []
)
for department in departments:
result = search_vocabulary(department, "departments")
if result["hits"]["total"]:
@@ -350,7 +355,9 @@ def field_departments(record_json, custom_fields_dict):
)

def field_accelerators(record_json, custom_fields_dict):
accelerators = record_json.get("custom_fields", {}).get("cern:accelerators", [])
accelerators = record_json.get("custom_fields", {}).get(
"cern:accelerators", []
)
for accelerator in accelerators:
result = search_vocabulary(accelerator, "accelerators")
if result["hits"]["total"]:
@@ -559,7 +566,7 @@ def compute_files(file_dump, versions_dict):
{
file["full_name"]: {
"eos_tmp_path": tmp_eos_root
/ full_path.relative_to(legacy_path_root),
/ full_path.relative_to(legacy_path_root),
"id_bibdoc": file["bibdocid"],
"key": file["full_name"],
"metadata": {},
