diff --git a/robotoff/products.py b/robotoff/products.py index 284e4d8445..f90fc1db77 100644 --- a/robotoff/products.py +++ b/robotoff/products.py @@ -18,6 +18,7 @@ from robotoff import settings from robotoff.types import JSONType, ProductIdentifier, ServerType from robotoff.utils import get_logger, gzip_jsonl_iter, http_session, jsonl_iter +from robotoff.utils import export logger = get_logger(__name__) @@ -592,9 +593,15 @@ def convert_jsonl_to_parquet( .replace("{output_path}", output_file_path) ) try: - duckdb.sql(query) + logger.info("Query the JSONL using DuckDB.") + arrow_batches = duckdb.sql(query).fetch_arrow_reader(batch_size=100000) + logger.info("Post-process extracted data using Arrow") + # arrow_batches = export.postprocess_arrow_batches(arrow_batches) + logger.info("Write post-processed data into Parquet.") + export.sink_to_parquet(output_file_path, batches=arrow_batches) + except duckdb.Error as e: - logger.error(f"Error executing query: {query}\nError message: {e}") + logger.error("Error executing query: %s\nError message: %s", query, e) raise logger.info("JSONL successfully converted into Parquet file.") diff --git a/robotoff/utils/export.py b/robotoff/utils/export.py new file mode 100644 index 0000000000..8df44af415 --- /dev/null +++ b/robotoff/utils/export.py @@ -0,0 +1,116 @@ +"Functions to postprocess the database conversion into Parquet." + +import json +from typing import Iterator + +import pyarrow as pa +import pyarrow.parquet as pq + + +################ +# Schemas +################ +## Images field +_size_schema = pa.struct( + [ + pa.field("h", pa.int32(), nullable=True), + pa.field("w", pa.int32(), nullable=True), + ] +) + +_dict_schema = pa.struct( + [ + pa.field("key", pa.string(), nullable=True), + pa.field("imgid", pa.string(), nullable=True), + pa.field( + "sizes", + pa.struct( + [ + pa.field("100", _size_schema, nullable=True), + pa.field("200", _size_schema, nullable=True), + pa.field("400", _size_schema, nullable=True), + pa.field("full", _size_schema, nullable=True), + ] + ), + nullable=True, + ), + pa.field("uploaded_t", pa.string(), nullable=True), + pa.field("uploader", pa.string(), nullable=True), + ] +) + +IMAGES_DATATYPE = pa.list_(_dict_schema) + + +################ +# Functions +################ +def sink_to_parquet(path: str, batches: pa.RecordBatchReader): + schema = batches.schema + schema = schema.remove(schema.get_field_index("images")) + schema = schema.append(pa.field("images", IMAGES_DATATYPE)) + with pq.ParquetWriter(path, schema=schema) as writer: + for batch in batches: + batch = batches.read_next_batch() + batch = _postprocess_arrow_batch(batch) + # batch = _postprocess_arrow_batch(batch) + writer.write_batch(batch) + + +def postprocess_arrow_batches(batches: pa.RecordBatchReader) -> pa.RecordBatchReader: + + return pa.RecordBatchReader.from_batches( + schema=batches.schema, + batches=[_postprocess_arrow_batch(batch) for batch in batches] + ) + + +def _postprocess_arrow_batch(batch: pa.RecordBatch) -> pa.RecordBatch: + batch = _postprocess_images(batch) + return batch + + +def _postprocess_images( + batch: pa.RecordBatch, + datatype: pa.DataType = IMAGES_DATATYPE + ): + postprocessed_images = [] + images: list[dict | None] = [ + json.loads(image) if image else None for image in batch["images"].to_pylist() + ] + for image in images: + if image: + postprocessed_images.append( + [ + { + "key": key, + "imgid": str(value.get("imgid", "unknown")), + "sizes": { + "100": { + "h": value.get("sizes", {}).get("100", {}).get("h", 0), + "w": value.get("sizes", {}).get("100", {}).get("w", 0), + }, + "200": { + "h": value.get("sizes", {}).get("200", {}).get("h", 0), + "w": value.get("sizes", {}).get("200", {}).get("w", 0), + }, + "400": { + "h": value.get("sizes", {}).get("400", {}).get("h", 0), + "w": value.get("sizes", {}).get("400", {}).get("w", 0), + }, + "full": { + "h": value.get("sizes", {}).get("full", {}).get("h", 0), + "w": value.get("sizes", {}).get("full", {}).get("w", 0), + }, + }, + "uploaded_t": str(value.get("uploaded_t", "unknown")), + "uploader": str(value.get("uploader", "unknown")), + } + for key, value in image.items() + ] + ) + else: + postprocessed_images.append([]) + images_array = pa.array(postprocessed_images, type=datatype) + batch = batch.set_column(1, "images", images_array) + return batch diff --git a/robotoff/utils/sql/jsonl_to_parquet.sql b/robotoff/utils/sql/jsonl_to_parquet.sql index deb91d7e9d..cdd6ba7445 100644 --- a/robotoff/utils/sql/jsonl_to_parquet.sql +++ b/robotoff/utils/sql/jsonl_to_parquet.sql @@ -1,135 +1,132 @@ SET threads to 4; SET preserve_insertion_order = false; -COPY ( - SELECT - code, - additives_n, - additives_tags, - allergens_from_ingredients, - allergens_from_user, - allergens_tags, - brands_tags, - categories_properties_tags, - categories, - checkers_tags, - cities_tags, - compared_to_category, - complete, - completeness, - correctors_tags, - countries_tags, - to_timestamp(created_t)::datetime AS created_t, -- Convert from unixtime to datetime - creator, - data_quality_errors_tags, - data_quality_info_tags, - data_quality_warnings_tags, - data_sources_tags, - ecoscore_data, - ecoscore_grade, - ecoscore_score, - ecoscore_tags, - editors, - emb_codes, - emb_codes_tags, - entry_dates_tags, - environment_impact_level, - food_groups_tags, - forest_footprint_data, - generic_name, - grades, - images, - informers_tags, - ingredients_analysis_tags, - ingredients_from_palm_oil_n, - ingredients_n, - ingredients_tags, - ingredients_text_with_allergens, - ingredients_text, - COLUMNS('ingredients_text_\w{2}$'), -- All columns containing ingredients_text_.. - ingredients_with_specified_percent_n, - ingredients_with_unspecified_percent_n, - ciqual_food_name_tags, - ingredients_percent_analysis, - ingredients_original_tags, - ingredients_without_ciqual_codes_n, - ingredients_without_ciqual_codes, - ingredients, - known_ingredients_n, - labels_tags, - lang, - languages_tags, - languages_codes, - last_edit_dates_tags, - last_editor, - to_timestamp(last_image_t)::datetime AS last_image_t, - last_modified_by, - to_timestamp(last_modified_t)::datetime AS last_modified_t, - to_timestamp(last_updated_t)::datetime AS last_updated_t, - link, - main_countries_tags, - manufacturing_places, - manufacturing_places_tags, - max_imgid, - misc_tags, - minerals_tags, - new_additives_n, - no_nutrition_data, - nova_group, - nova_groups, - nova_groups_markers, - nova_groups_tags, - nucleotides_tags, - nutrient_levels_tags, - unknown_nutrients_tags, - nutriments, - nutriscore_data, - nutriscore_grade, - nutriscore_score, - nutriscore_tags, - nutrition_data_prepared_per, - nutrition_data, - nutrition_grades_tags, - nutrition_score_beverage, - nutrition_score_warning_fruits_vegetables_nuts_estimate_from_ingredients, - nutrition_score_warning_no_fiber, - nutrition_score_warning_no_fruits_vegetables_nuts, - obsolete_since_date, - obsolete, - origins_tags, - packaging_recycling_tags, - packaging_shapes_tags, - packaging_tags, - packagings_materials, - packagings_n, - packagings_n, - photographers, - pnns_groups_1_tags, - pnns_groups_2_tags, - popularity_key, - popularity_tags, - product_name, - product_quantity_unit, - product_quantity, - purchase_places_tags, - quantity, - rev, - scans_n, - scores, - serving_quantity, - serving_size, - sources, - sources_fields, - specific_ingredients, - states_tags, - stores, - stores_tags, - traces_tags, - unique_scans_n, - unknown_ingredients_n, - vitamins_tags, - weighers_tags, - with_non_nutritive_sweeteners, - with_sweeteners, - FROM read_ndjson('{dataset_path}', ignore_errors=True) -) TO '{output_path}' (FORMAT PARQUET) -; \ No newline at end of file +SELECT + code, + additives_n, + additives_tags, + allergens_from_ingredients, + allergens_from_user, + allergens_tags, + brands_tags, + categories_properties_tags, + categories, + checkers_tags, + cities_tags, + compared_to_category, + complete, + completeness, + correctors_tags, + countries_tags, + to_timestamp(created_t)::datetime AS created_t, -- Convert from unixtime to datetime + creator, + data_quality_errors_tags, + data_quality_info_tags, + data_quality_warnings_tags, + data_sources_tags, + ecoscore_data, + ecoscore_grade, + ecoscore_score, + ecoscore_tags, + editors, + emb_codes, + emb_codes_tags, + entry_dates_tags, + environment_impact_level, + food_groups_tags, + forest_footprint_data, + generic_name, + grades, + images, + informers_tags, + ingredients_analysis_tags, + ingredients_from_palm_oil_n, + ingredients_n, + ingredients_tags, + ingredients_text_with_allergens, + ingredients_text, + COLUMNS('ingredients_text_\w{2}$'), -- All columns containing ingredients_text_.. + ingredients_with_specified_percent_n, + ingredients_with_unspecified_percent_n, + ciqual_food_name_tags, + ingredients_percent_analysis, + ingredients_original_tags, + ingredients_without_ciqual_codes_n, + ingredients_without_ciqual_codes, + ingredients, + known_ingredients_n, + labels_tags, + lang, + languages_tags, + languages_codes, + last_edit_dates_tags, + last_editor, + to_timestamp(last_image_t)::datetime AS last_image_t, + last_modified_by, + to_timestamp(last_modified_t)::datetime AS last_modified_t, + to_timestamp(last_updated_t)::datetime AS last_updated_t, + link, + main_countries_tags, + manufacturing_places, + manufacturing_places_tags, + max_imgid, + misc_tags, + minerals_tags, + new_additives_n, + no_nutrition_data, + nova_group, + nova_groups, + nova_groups_markers, + nova_groups_tags, + nucleotides_tags, + nutrient_levels_tags, + unknown_nutrients_tags, + nutriments, + nutriscore_data, + nutriscore_grade, + nutriscore_score, + nutriscore_tags, + nutrition_data_prepared_per, + nutrition_data, + nutrition_grades_tags, + nutrition_score_beverage, + nutrition_score_warning_fruits_vegetables_nuts_estimate_from_ingredients, + nutrition_score_warning_no_fiber, + nutrition_score_warning_no_fruits_vegetables_nuts, + obsolete_since_date, + obsolete, + origins_tags, + packaging_recycling_tags, + packaging_shapes_tags, + packaging_tags, + packagings_materials, + packagings_n, + packagings_n, + photographers, + pnns_groups_1_tags, + pnns_groups_2_tags, + popularity_key, + popularity_tags, + product_name, + product_quantity_unit, + product_quantity, + purchase_places_tags, + quantity, + rev, + scans_n, + scores, + serving_quantity, + serving_size, + sources, + sources_fields, + specific_ingredients, + states_tags, + stores, + stores_tags, + traces_tags, + unique_scans_n, + unknown_ingredients_n, + vitamins_tags, + weighers_tags, + with_non_nutritive_sweeteners, + with_sweeteners, +FROM read_ndjson('{dataset_path}', ignore_errors=True) \ No newline at end of file