diff --git a/robotoff/utils/export.py b/robotoff/utils/export.py
index d67b18d7f3..6335da54f6 100644
--- a/robotoff/utils/export.py
+++ b/robotoff/utils/export.py
@@ -1,4 +1,5 @@
 "Functions to postprocess the database conversion into Parquet."
+
 import json
 
 import pyarrow as pa
@@ -15,40 +16,38 @@
     ]
 )
 
-_dict_schema = pa.struct(
-    [
-        pa.field("key", pa.string(), nullable=True),
-        pa.field("imgid", pa.string(), nullable=True),
-        pa.field(
-            "sizes",
-            pa.struct(
-                [
-                    pa.field("100", _size_schema, nullable=True),
-                    pa.field("200", _size_schema, nullable=True),
-                    pa.field("400", _size_schema, nullable=True),
-                    pa.field("full", _size_schema, nullable=True),
-                ]
+
+IMAGES_DATATYPE = pa.list_(
+    pa.struct(
+        [
+            pa.field("key", pa.string(), nullable=True),
+            pa.field("imgid", pa.string(), nullable=True),
+            pa.field(
+                "sizes",
+                pa.struct(
+                    [
+                        pa.field("100", _size_schema, nullable=True),
+                        pa.field("200", _size_schema, nullable=True),
+                        pa.field("400", _size_schema, nullable=True),
+                        pa.field("full", _size_schema, nullable=True),
+                    ]
+                ),
+                nullable=True,
             ),
-            nullable=True,
-        ),
-        pa.field("uploaded_t", pa.string(), nullable=True),
-        pa.field("uploader", pa.string(), nullable=True),
-    ]
+            pa.field("uploaded_t", pa.string(), nullable=True),
+            pa.field("uploader", pa.string(), nullable=True),
+        ]
+    )
 )
 
-IMAGES_DATATYPE = pa.list_(_dict_schema)
-
-SCHEMAS = {
-    "images": IMAGES_DATATYPE
-}
-
+SCHEMAS = {"images": IMAGES_DATATYPE}
 
 
 ################
 # Functions
 ################
 def sink_to_parquet(parquet_path: str, output_path: str):
-    parquet_file = pq.ParquetFile(parquet_path) 
+    parquet_file = pq.ParquetFile(parquet_path)
     updated_schema = update_schema(parquet_file.schema.to_arrow_schema())
     with pq.ParquetWriter(output_path, schema=updated_schema) as writer:
         for batch in parquet_file.iter_batches(batch_size=1000):
@@ -74,10 +73,7 @@ def _postprocess_arrow_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
     return batch
 
 
-def _postprocess_images(
-        batch: pa.RecordBatch,
-        datatype: pa.DataType = IMAGES_DATATYPE
-    ):
+def _postprocess_images(batch: pa.RecordBatch, datatype: pa.DataType = IMAGES_DATATYPE):
     postprocessed_images = []
     images: list[dict | None] = [
         json.loads(image) if image else None for image in batch["images"].to_pylist()
@@ -115,22 +111,27 @@
             )
         else:
             postprocessed_images.append([])
-    images_array = pa.array(postprocessed_images, type=datatype)
-    batch = batch.set_column(1, "images", images_array)
+    images_col_index = batch.schema.get_field_index("images")
+    batch = batch.set_column(
+        images_col_index,
+        pa.field("images", datatype),
+        pa.array(postprocessed_images, type=datatype)
+    )
     return batch
 
 
 def update_schema(schema: pa.Schema) -> pa.Schema:
     for field_name, field_datatype in SCHEMAS.items():
-        schema = _udpate_schema_by_field(schema=schema, field_name=field_name, field_datatype=field_datatype)
+        schema = _udpate_schema_by_field(
+            schema=schema, field_name=field_name, field_datatype=field_datatype
+        )
     return schema
 
 
-def _udpate_schema_by_field(schema: pa.Schema, field_name: str, field_datatype: pa.DataType) -> pa.schema:
+def _udpate_schema_by_field(
+    schema: pa.Schema, field_name: str, field_datatype: pa.DataType
+) -> pa.schema:
     field_index = schema.get_field_index(field_name)
     schema = schema.remove(field_index)
-    schema = schema.insert(
-        field_index,
-        pa.field(field_name, field_datatype)
-    )
+    schema = schema.insert(field_index, pa.field(field_name, field_datatype))
     return schema
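A note on the `set_column` change above: the column index is now looked up by name via `schema.get_field_index` instead of being hardcoded to `1`, and an explicit `pa.field` is passed so the batch schema picks up the new datatype along with the data. A minimal, self-contained sketch of that pattern (toy schema and data, not the PR's real `IMAGES_DATATYPE`):

```python
import pyarrow as pa

# Toy list-of-struct datatype standing in for IMAGES_DATATYPE.
images_type = pa.list_(
    pa.struct([pa.field("key", pa.string()), pa.field("imgid", pa.string())])
)

# Batch where "images" is still a raw JSON string column.
batch = pa.RecordBatch.from_pydict(
    {"code": ["0001"], "images": ['[{"key": "front_fr", "imgid": "1"}]']}
)

# Replace the column by name: passing a pa.field (not just a name) makes the
# batch schema carry the new datatype, matching what update_schema declares
# for the ParquetWriter.
idx = batch.schema.get_field_index("images")
batch = batch.set_column(
    idx,
    pa.field("images", images_type),
    pa.array([[{"key": "front_fr", "imgid": "1"}]], type=images_type),
)
print(batch.schema.field("images").type)
# list<item: struct<key: string, imgid: string>>
```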
diff --git a/robotoff/utils/sql/jsonl_to_parquet.sql b/robotoff/utils/sql/jsonl_to_parquet.sql
index f9d954ae94..bf469155e5 100644
--- a/robotoff/utils/sql/jsonl_to_parquet.sql
+++ b/robotoff/utils/sql/jsonl_to_parquet.sql
@@ -45,7 +45,6 @@ COPY(
     ingredients_text_with_allergens,
     ingredients_text,
     COLUMNS('ingredients_text_\w{2}$'),
-    -- All columns containing ingredients_text_..
     ingredients_with_specified_percent_n,
     ingredients_with_unspecified_percent_n,
     ciqual_food_name_tags,
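For context on the `COLUMNS('ingredients_text_\w{2}$')` line the removed comment was describing: DuckDB's `COLUMNS()` expression selects every column whose name matches the regex, here the two-letter-language variants of `ingredients_text`. A hedged sketch using the DuckDB Python client (toy table, not the actual JSONL dump):

```python
import duckdb

con = duckdb.connect()
con.sql(
    "CREATE TABLE products ("
    "ingredients_text_fr TEXT, ingredients_text_en TEXT, ingredients_n INTEGER)"
)
# COLUMNS() with a regex expands to every matching column name.
rel = con.sql(r"SELECT COLUMNS('ingredients_text_\w{2}$') FROM products")
print(rel.columns)  # ['ingredients_text_fr', 'ingredients_text_en']
```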