fix: 🎨 WIP
jeremyarancio committed Nov 8, 2024
1 parent d2259a8 commit dbd45d7
Showing 3 changed files with 255 additions and 135 deletions.
11 changes: 9 additions & 2 deletions robotoff/products.py
@@ -18,6 +18,7 @@
 from robotoff import settings
 from robotoff.types import JSONType, ProductIdentifier, ServerType
 from robotoff.utils import get_logger, gzip_jsonl_iter, http_session, jsonl_iter
+from robotoff.utils import export

logger = get_logger(__name__)

@@ -592,9 +593,15 @@ def convert_jsonl_to_parquet(
         .replace("{output_path}", output_file_path)
     )
     try:
-        duckdb.sql(query)
+        logger.info("Query the JSONL using DuckDB.")
+        arrow_batches = duckdb.sql(query).fetch_arrow_reader(batch_size=100000)
+        logger.info("Post-process extracted data using Arrow")
+        # arrow_batches = export.postprocess_arrow_batches(arrow_batches)
+        logger.info("Write post-processed data into Parquet.")
+        export.sink_to_parquet(output_file_path, batches=arrow_batches)
 
     except duckdb.Error as e:
-        logger.error(f"Error executing query: {query}\nError message: {e}")
+        logger.error("Error executing query: %s\nError message: %s", query, e)
         raise
     logger.info("JSONL successfully converted into Parquet file.")

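For context, a minimal standalone sketch (an illustration, not code from this commit) of the streaming flow the new try block sets up: DuckDB runs the extraction query and returns an Arrow RecordBatchReader, so results are written to Parquet batch by batch instead of being materialized at once. The path and query below are assumptions; the real query is templated from settings.

import duckdb
import pyarrow.parquet as pq

# Hypothetical query; the actual one is built via string templating above.
query = "SELECT * FROM read_ndjson('products.jsonl.gz', ignore_errors=True)"

# fetch_arrow_reader() streams the result as Arrow record batches, keeping
# memory usage bounded by the batch size rather than the dataset size.
reader = duckdb.sql(query).fetch_arrow_reader(batch_size=100000)

with pq.ParquetWriter("products.parquet", schema=reader.schema) as writer:
    for batch in reader:
        writer.write_batch(batch)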
116 changes: 116 additions & 0 deletions robotoff/utils/export.py
@@ -0,0 +1,116 @@
"Functions to postprocess the database conversion into Parquet."

import json

import pyarrow as pa
import pyarrow.parquet as pq


################
# Schemas
################
## Images field
_size_schema = pa.struct(
    [
        pa.field("h", pa.int32(), nullable=True),
        pa.field("w", pa.int32(), nullable=True),
    ]
)

_dict_schema = pa.struct(
    [
        pa.field("key", pa.string(), nullable=True),
        pa.field("imgid", pa.string(), nullable=True),
        pa.field(
            "sizes",
            pa.struct(
                [
                    pa.field("100", _size_schema, nullable=True),
                    pa.field("200", _size_schema, nullable=True),
                    pa.field("400", _size_schema, nullable=True),
                    pa.field("full", _size_schema, nullable=True),
                ]
            ),
            nullable=True,
        ),
        pa.field("uploaded_t", pa.string(), nullable=True),
        pa.field("uploader", pa.string(), nullable=True),
    ]
)

IMAGES_DATATYPE = pa.list_(_dict_schema)
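# Illustrative example (not from this commit) of the raw JSON `images` value
# this schema models: a mapping from image key to metadata, e.g.
#
#   {"front_fr": {"imgid": "1",
#                 "sizes": {"100": {"h": 100, "w": 56},
#                           "full": {"h": 1200, "w": 675}},
#                 "uploaded_t": "1620000000",
#                 "uploader": "some-user"}}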


################
# Functions
################
def sink_to_parquet(path: str, batches: pa.RecordBatchReader):
    # Post-processing replaces the raw JSON `images` string column with the
    # typed IMAGES_DATATYPE column, so the writer schema is updated to match,
    # keeping the column at its original position.
    schema = batches.schema
    images_idx = schema.get_field_index("images")
    schema = schema.set(images_idx, pa.field("images", IMAGES_DATATYPE))
    with pq.ParquetWriter(path, schema=schema) as writer:
        for batch in batches:
            # Iterating the reader already consumes each batch once; an extra
            # read_next_batch() call here would skip every other batch.
            batch = _postprocess_arrow_batch(batch)
            writer.write_batch(batch)


def postprocess_arrow_batches(batches: pa.RecordBatchReader) -> pa.RecordBatchReader:
    # Note: the list comprehension materializes every batch in memory. The
    # reader schema must also reflect the new `images` datatype, not the
    # original JSON string column.
    schema = batches.schema
    images_idx = schema.get_field_index("images")
    schema = schema.set(images_idx, pa.field("images", IMAGES_DATATYPE))
    return pa.RecordBatchReader.from_batches(
        schema=schema,
        batches=[_postprocess_arrow_batch(batch) for batch in batches],
    )


def _postprocess_arrow_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
    batch = _postprocess_images(batch)
    return batch


def _postprocess_images(
    batch: pa.RecordBatch, datatype: pa.DataType = IMAGES_DATATYPE
) -> pa.RecordBatch:
    """Convert the raw JSON `images` column into a typed list-of-struct column."""
    postprocessed_images = []
    images: list[dict | None] = [
        json.loads(image) if image else None for image in batch["images"].to_pylist()
    ]
    for image in images:
        if image:
            postprocessed_images.append(
                [
                    {
                        "key": key,
                        "imgid": str(value.get("imgid", "unknown")),
                        "sizes": {
                            size: {
                                "h": value.get("sizes", {}).get(size, {}).get("h", 0),
                                "w": value.get("sizes", {}).get(size, {}).get("w", 0),
                            }
                            for size in ("100", "200", "400", "full")
                        },
                        "uploaded_t": str(value.get("uploaded_t", "unknown")),
                        "uploader": str(value.get("uploader", "unknown")),
                    }
                    for key, value in image.items()
                ]
            )
        else:
            postprocessed_images.append([])
    images_array = pa.array(postprocessed_images, type=datatype)
    # Look the column up by name instead of hard-coding its position.
    images_idx = batch.schema.get_field_index("images")
    return batch.set_column(images_idx, pa.field("images", datatype), images_array)
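A hypothetical usage sketch of this module (the sample row, barcode, and output path are invented for illustration): wrap a table whose images column holds JSON strings in a RecordBatchReader, then sink it to Parquet with the typed schema.

import json

import pyarrow as pa

from robotoff.utils import export

raw_images = json.dumps({"1": {"imgid": "1", "sizes": {"100": {"h": 100, "w": 56}}}})
table = pa.table({"code": ["3017620422003"], "images": [raw_images]})
reader = pa.RecordBatchReader.from_batches(table.schema, table.to_batches())

# Streams the batches to Parquet, converting the JSON `images` strings into
# the typed IMAGES_DATATYPE column on the way.
export.sink_to_parquet("products.parquet", batches=reader)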
