diff --git a/cds_migrator_kit/rdm/migration/stats/event_generator.py b/cds_migrator_kit/rdm/migration/stats/event_generator.py
index 8d0f36b..195fd2d 100644
--- a/cds_migrator_kit/rdm/migration/stats/event_generator.py
+++ b/cds_migrator_kit/rdm/migration/stats/event_generator.py
@@ -99,7 +99,7 @@ def process_download_event(entry, rec_context, logger):
         "via_api": False,
         "is_robot": entry.get("bot", False),
         "country": entry.get("country", ""),
-        "visitor_id": entry["visitor_id"],
+        "visitor_id": entry.get("visitor_id", ""),
         "unique_session_id": entry["unique_session_id"],
         # Note: id_bibrec doesn't have the new format pids
         "unique_id": f"ui_{_record_version['new_recid']}",
@@ -168,7 +168,7 @@ def process_pageview_event(entry, rec_context, logger):
         "referrer": None,
         "via_api": False,
         "is_robot": entry.get("bot", False),
-        "visitor_id": entry["visitor_id"],
+        "visitor_id": entry.get("visitor_id", ""),
         # Note: id_bibrec doesn't have the new format pids
         "unique_id": f"ui_{rec_context['latest_version']}",
         "unique_session_id": entry["unique_session_id"],
@@ -189,14 +189,15 @@ def prepare_new_doc(
     try:
         new_doc = deepcopy(doc)
         # remove to avoid reindexing
-        new_doc.pop("_id", None)
+        new_doc["_id"] = f"migrated_{new_doc['_id']}"
+        new_doc.pop("_score", None)
         event_type = new_doc["_source"].pop("event_type", None)
         if event_type != doc_type:
             raise Exception("Inconsistent doc type")
-        processed_doc = {}
+
         if event_type == "events.downloads":
             processed_doc = process_download_event(
                 new_doc["_source"], rec_context, logger
@@ -228,9 +229,10 @@ def prepare_new_doc(
             month = f"{date_object.month:02}"
             yield {
-                "_op_type": "index",
+                "_op_type": "create",
                 "_index": f"{dest_search_index_prefix}-{index_type}-{year}-{month}",
                 "_source": processed_doc,
+                "_id": new_doc["_id"],
             }
     except Exception as ex:
         logger.error(
diff --git a/cds_migrator_kit/rdm/migration/stats/load.py b/cds_migrator_kit/rdm/migration/stats/load.py
index 5fb57fc..d6d5a35 100644
--- a/cds_migrator_kit/rdm/migration/stats/load.py
+++ b/cds_migrator_kit/rdm/migration/stats/load.py
@@ -19,6 +19,7 @@
     os_count,
     os_scroll,
     os_search,
+    bulk_index_documents,
 )


@@ -26,7 +27,7 @@
 from opensearchpy import OpenSearch
 from opensearchpy.exceptions import OpenSearchException
-from opensearchpy.helpers import bulk
+from opensearchpy.helpers import bulk, parallel_bulk, BulkIndexError


 _QUERY_VIEWS = {
     "query": {
@@ -83,12 +84,13 @@ def _generate_new_events(self, data, rec_context, logger, doc_type):
                 self.config["DEST_SEARCH_INDEX_PREFIX"],
             )
             if self.dry_run:
-                for new_doc in new_docs:
+                for new_doc in new_docs_generated:
                     logger.info(json.dumps(new_doc))
             else:
-                bulk(self.dest_os_client, new_docs_generated, raise_on_error=True)
+                bulk_index_documents(self.dest_os_client, new_docs_generated, logger)
+
         except Exception as ex:
-            logger.error(ex)
+            logger.error(str(ex))

     def _process_legacy_events_for_recid(self, recid, rec_context, index, event_type):
         data = os_search(
@@ -169,7 +171,7 @@ def validate_stats_for_recid(self, recid, record, event_type):
             )
         except AssertionError as e:
             logger.error(
-                f"Not all events of type {event_type} were migrated for record: {recid}"
+                f"Not all events of type {event_type} were migrated for record: {recid}. Legacy count: {legacy_total['count']} - RDM count: {new_total['count']}"
             )

     def _load(self, entry):
diff --git a/cds_migrator_kit/rdm/migration/stats/search.py b/cds_migrator_kit/rdm/migration/stats/search.py
index ab35a4a..488f4a9 100644
--- a/cds_migrator_kit/rdm/migration/stats/search.py
+++ b/cds_migrator_kit/rdm/migration/stats/search.py
@@ -7,11 +7,13 @@

 """CDS-RDM migration stats search module."""

+import json
 import time
 from copy import deepcopy

 from opensearchpy import OpenSearch
 from opensearchpy.exceptions import OpenSearchException
+from opensearchpy.helpers import parallel_bulk, BulkIndexError


 def generate_query(doc_type, identifier, legacy_to_rdm_events_map):
@@ -77,3 +79,37 @@ def os_count(src_os_client, index, q):
             ex = _ex
             time.sleep(10)
     raise ex
+
+
+def bulk_index_documents(
+    client,
+    documents,
+    logger,
+    chunk_size=500,
+    max_chunk_bytes=50 * 1024 * 1024,
+):
+    """
+    Index documents into OpenSearch using parallel_bulk, logging documents that fail to index.
+    """
+    try:
+        # Consume the parallel_bulk generator; iterating it drives the chunked indexing
+        for ok, action in parallel_bulk(
+            client,
+            actions=documents,
+            chunk_size=chunk_size,
+            max_chunk_bytes=max_chunk_bytes,
+            raise_on_error=True,  # raise BulkIndexError so failed documents are logged below
+            raise_on_exception=True,
+            ignore_status=409,  # Ignore 409 Conflict status for existing documents
+        ):
+            pass
+
+    except BulkIndexError as e:
+        for error in e.errors:
+            _failed_doc = {
+                "_op_type": "create",
+                "_index": error["create"]["_index"],
+                "_source": error["create"]["data"],
+                "_id": error["create"]["_id"],
+            }
+            logger.error(f"Failed to index: {json.dumps(_failed_doc)}")
diff --git a/cds_migrator_kit/rdm/migration/transform/models/affiliations.py b/cds_migrator_kit/rdm/migration/transform/models/affiliations.py
index ad5a396..393256e 100644
--- a/cds_migrator_kit/rdm/migration/transform/models/affiliations.py
+++ b/cds_migrator_kit/rdm/migration/transform/models/affiliations.py
@@ -33,6 +33,7 @@ class CDSAffiliations(CdsOverdo):

     __ignore_keys__ = {
         # IGNORED
+        "037__9",
         "0247_2",  # DOI, summer student notes do not have it
         "0247_a",  # DOI
         "0248_a",  # oai identifier, not needed to migrate, TBD
@@ -45,6 +46,7 @@ class CDSAffiliations(CdsOverdo):
         "269__a",
         "269__b",
         "269__c",
+        "300__a",
         "270__m",  # document contact email
         "595__a",  # always value CERN EDS, not displayed, TODO: do we keep?
         "595__z",  # SOME RECORD HAVE UNCL as value, do we keep it? what does UNCL mean
diff --git a/cds_migrator_kit/rdm/migration/transform/transform.py b/cds_migrator_kit/rdm/migration/transform/transform.py
index d5cd7e0..433467e 100644
--- a/cds_migrator_kit/rdm/migration/transform/transform.py
+++ b/cds_migrator_kit/rdm/migration/transform/transform.py
@@ -225,17 +225,21 @@ def _match_affiliation(self, affiliation_name):
         # Step 1: check if there is a curated input
         if match.curated_affiliation:
             return match.curated_affiliation
+        # Step 2: check if there is an exact match
         elif match.ror_exact_match:
             return {"id": normalize_ror(match.ror_exact_match)}
+        # Step 3: check if there is a non-exact match
         elif match.ror_not_exact_match:
+            _affiliation_ror_id = normalize_ror(match.ror_not_exact_match)
             raise RecordFlaggedCuration(
                 subfield="u",
-                value={"id": normalize_ror(match.ror_not_exact_match)},
+                value={"id": _affiliation_ror_id},
                 field="author",
-                message=f"Affiliation {normalize_ror(match.ror_not_exact_match)} not found as an exact match, ROR id should be checked.",
+                message=f"Affiliation {_affiliation_ror_id} not found as an exact match, ROR id should be checked.",
                 stage="vocabulary match",
             )
         else:
+            # Step 4: fall back to the value originally inserted on legacy
             raise RecordFlaggedCuration(
                 subfield="u",
                 value={"name": affiliation_name},
@@ -306,12 +310,12 @@ def _resource_type(entry):
         # filter empty keys
         return {k: v for k, v in metadata.items() if v}

-
-
     def _custom_fields(self, json_entry, json_output):
         def field_experiments(record_json, custom_fields_dict):
-            experiments = record_json.get("custom_fields", {}).get("cern:experiments", [])
+            experiments = record_json.get("custom_fields", {}).get(
+                "cern:experiments", []
+            )
             for experiment in experiments:
                 result = search_vocabulary(experiment, "experiments")
@@ -331,9 +335,10 @@ def field_experiments(record_json, custom_fields_dict):
                     stage="vocabulary match",
                 )

-
         def field_departments(record_json, custom_fields_dict):
-            departments = record_json.get("custom_fields", {}).get("cern:departments", [])
+            departments = record_json.get("custom_fields", {}).get(
+                "cern:departments", []
+            )
             for department in departments:
                 result = search_vocabulary(department, "departments")
                 if result["hits"]["total"]:
@@ -350,7 +355,9 @@ def field_departments(record_json, custom_fields_dict):
                 )

         def field_accelerators(record_json, custom_fields_dict):
-            accelerators = record_json.get("custom_fields", {}).get("cern:accelerators", [])
+            accelerators = record_json.get("custom_fields", {}).get(
+                "cern:accelerators", []
+            )
             for accelerator in accelerators:
                 result = search_vocabulary(accelerator, "accelerators")
                 if result["hits"]["total"]:
@@ -559,7 +566,7 @@ def compute_files(file_dump, versions_dict):
             {
                 file["full_name"]: {
                     "eos_tmp_path": tmp_eos_root
-                        / full_path.relative_to(legacy_path_root),
+                    / full_path.relative_to(legacy_path_root),
                     "id_bibdoc": file["bibdocid"],
                     "key": file["full_name"],
                     "metadata": {},