Skip to content

Commit

Permalink
fix: 🔥 Works!
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyarancio committed Nov 12, 2024
1 parent b64a308 commit 6a523ea
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
75 changes: 38 additions & 37 deletions robotoff/utils/export.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"Functions to postprocess the database conversion into Parquet."

import json

import pyarrow as pa
Expand All @@ -15,40 +16,38 @@
]
)

# Arrow datatype for the `images` column: a list of image structs.
# Each entry carries the image key, the original image id (`imgid`),
# the available resized variants (100/200/400/full, each described by
# `_size_schema`), and upload metadata. Every field is nullable because
# the source JSON does not guarantee their presence.
IMAGES_DATATYPE = pa.list_(
    pa.struct(
        [
            pa.field("key", pa.string(), nullable=True),
            pa.field("imgid", pa.string(), nullable=True),
            pa.field(
                "sizes",
                pa.struct(
                    [
                        pa.field("100", _size_schema, nullable=True),
                        pa.field("200", _size_schema, nullable=True),
                        pa.field("400", _size_schema, nullable=True),
                        pa.field("full", _size_schema, nullable=True),
                    ]
                ),
                nullable=True,
            ),
            pa.field("uploaded_t", pa.string(), nullable=True),
            pa.field("uploader", pa.string(), nullable=True),
        ]
    )
)

# Column name -> curated Arrow datatype, consumed by `update_schema` to
# override the inferred schema of the converted Parquet file.
SCHEMAS = {"images": IMAGES_DATATYPE}


################
# Functions
################
def sink_to_parquet(parquet_path: str, output_path: str):
parquet_file = pq.ParquetFile(parquet_path)
parquet_file = pq.ParquetFile(parquet_path)
updated_schema = update_schema(parquet_file.schema.to_arrow_schema())
with pq.ParquetWriter(output_path, schema=updated_schema) as writer:
for batch in parquet_file.iter_batches(batch_size=1000):
Expand All @@ -74,10 +73,7 @@ def _postprocess_arrow_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
return batch


def _postprocess_images(
batch: pa.RecordBatch,
datatype: pa.DataType = IMAGES_DATATYPE
):
def _postprocess_images(batch: pa.RecordBatch, datatype: pa.DataType = IMAGES_DATATYPE):
postprocessed_images = []
images: list[dict | None] = [
json.loads(image) if image else None for image in batch["images"].to_pylist()
Expand Down Expand Up @@ -115,22 +111,27 @@ def _postprocess_images(
)
else:
postprocessed_images.append([])
images_array = pa.array(postprocessed_images, type=datatype)
batch = batch.set_column(1, "images", images_array)
images_col_index = batch.schema.get_field_index("images")
batch = batch.set_column(
images_col_index,
pa.field("images", datatype),
pa.array(postprocessed_images, type=datatype)
)
return batch


def update_schema(schema: pa.Schema) -> pa.Schema:
    """Return *schema* with every column listed in ``SCHEMAS`` replaced by
    its curated Arrow datatype (currently only the ``images`` column).

    :param schema: the Arrow schema inferred from the source Parquet file.
    :return: the schema with the curated column datatypes swapped in.
    """
    for field_name, field_datatype in SCHEMAS.items():
        # Exactly one replacement per field (the scraped diff had the old
        # one-line call fused with the new wrapped call, invoking the
        # helper twice per iteration).
        schema = _udpate_schema_by_field(
            schema=schema, field_name=field_name, field_datatype=field_datatype
        )
    return schema


def _udpate_schema_by_field(
    schema: pa.Schema, field_name: str, field_datatype: pa.DataType
) -> pa.Schema:
    """Replace the datatype of *field_name* in *schema*, keeping its position.

    NOTE(review): the name keeps the historical "udpate" typo because the
    caller (``update_schema``) references it by this name; renaming would
    change the interface. Return annotation fixed from ``pa.schema`` (the
    factory function) to the ``pa.Schema`` type.

    :param schema: the schema to modify (pyarrow schemas are immutable;
        a new schema is returned).
    :param field_name: name of the column whose datatype is replaced.
    :param field_datatype: the Arrow datatype to assign to the column.
    :return: a new schema with the column's datatype replaced.
    """
    field_index = schema.get_field_index(field_name)
    # Remove then re-insert at the same index so column order is preserved.
    schema = schema.remove(field_index)
    schema = schema.insert(field_index, pa.field(field_name, field_datatype))
    return schema
1 change: 0 additions & 1 deletion robotoff/utils/sql/jsonl_to_parquet.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ COPY(
ingredients_text_with_allergens,
ingredients_text,
COLUMNS('ingredients_text_\w{2}$'),
-- All columns containing ingredients_text_..
ingredients_with_specified_percent_n,
ingredients_with_unspecified_percent_n,
ciqual_food_name_tags,
Expand Down

0 comments on commit 6a523ea

Please sign in to comment.