Commit b64a308

fix: 🚧 WIP

jeremyarancio committed Nov 11, 2024
1 parent dbd45d7 commit b64a308

Showing 5 changed files with 236 additions and 206 deletions.
87 changes: 45 additions & 42 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -82,7 +82,7 @@ google-cloud-batch = "~0.17.26"
duckdb = "~1.0.0"
google-cloud-storage = "~2.14.0"
pandas = "~2.2.2"
-pyarrow = "~17.0.0"
+pyarrow = "18.0.0"
rich = "~13.9.2" # Used for CLI pretty print

[tool.poetry.dependencies.sentry-sdk]
37 changes: 21 additions & 16 deletions robotoff/products.py
@@ -582,27 +582,32 @@ def convert_jsonl_to_parquet(
    dataset_path: Path = settings.JSONL_DATASET_PATH,
    query_path: Path = settings.JSONL_TO_PARQUET_SQL_QUERY,
) -> None:
+    from pyarrow import ArrowException
    logger.info("Start JSONL to Parquet conversion process.")
    if not dataset_path.exists() or not query_path.exists():
        raise FileNotFoundError(
            f"{str(dataset_path)} or {str(query_path)} was not found."
        )
-    query = (
-        query_path.read_text()
-        .replace("{dataset_path}", str(dataset_path))
-        .replace("{output_path}", output_file_path)
-    )
-    try:
-        logger.info("Query the JSONL using DuckDB.")
-        arrow_batches = duckdb.sql(query).fetch_arrow_reader(batch_size=100000)
-        logger.info("Post-process extracted data using Arrow")
-        # arrow_batches = export.postprocess_arrow_batches(arrow_batches)
-        logger.info("Write post-processed data into Parquet.")
-        export.sink_to_parquet(output_file_path, batches=arrow_batches)
-
-    except duckdb.Error as e:
-        logger.error("Error executing query: %s\nError message: %s", query, e)
-        raise
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        try:
+            tmp_parquet_path = os.path.join(tmp_dir, "temp.parquet")
+            query = (
+                query_path.read_text()
+                .replace("{dataset_path}", str(dataset_path))
+                .replace("{output_path}", tmp_parquet_path)
+            )
+            logger.info("Query the JSONL using DuckDB.")
+            duckdb.sql(query)
+            # logger.info("Post-process extracted data using Arrow")
+            # arrow_batches = export.load_parquet()
+            logger.info("Write post-processed data into Parquet.")
+            export.sink_to_parquet(tmp_parquet_path, output_file_path)
+        except duckdb.Error as e:
+            logger.error("Error executing query: %s\nError message: %s", query, e)
+            raise
+        except ArrowException as e:
+            logger.error(e)
+            raise
    logger.info("JSONL successfully converted into Parquet file.")
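
For context on the hunk above: the new flow has DuckDB materialise the query result as a temporary Parquet file, which export.sink_to_parquet then re-reads and rewrites to the final destination. A minimal, self-contained sketch of that pattern follows; the SQL template and file names are illustrative assumptions, since the real query lives in the file referenced by settings.JSONL_TO_PARQUET_SQL_QUERY and is not part of this diff.

import os
import tempfile

import duckdb

# Assumed shape of the SQL template; the real query is read from
# settings.JSONL_TO_PARQUET_SQL_QUERY and is not shown in this commit.
QUERY_TEMPLATE = """
COPY (
    SELECT * FROM read_ndjson('{dataset_path}', ignore_errors=true)
) TO '{output_path}' (FORMAT PARQUET);
"""

with tempfile.TemporaryDirectory() as tmp_dir:
    tmp_parquet_path = os.path.join(tmp_dir, "temp.parquet")
    query = (
        QUERY_TEMPLATE
        .replace("{dataset_path}", "products.jsonl.gz")  # hypothetical input file
        .replace("{output_path}", tmp_parquet_path)
    )
    duckdb.sql(query)  # runs the COPY and writes the intermediate Parquet file
    # The intermediate file would then be rewritten to its final location,
    # e.g. export.sink_to_parquet(tmp_parquet_path, "products.parquet").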


54 changes: 37 additions & 17 deletions robotoff/utils/export.py
@@ -1,12 +1,9 @@
"Functions to postprocess the database conversion into Parquet."

import json
from typing import Iterator

import pyarrow as pa
import pyarrow.parquet as pq


################
# Schemas
################
@@ -41,28 +38,35 @@

IMAGES_DATATYPE = pa.list_(_dict_schema)

+SCHEMAS = {
+    "images": IMAGES_DATATYPE
+}



################
# Functions
################
-def sink_to_parquet(path: str, batches: pa.RecordBatchReader):
-    schema = batches.schema
-    schema = schema.remove(schema.get_field_index("images"))
-    schema = schema.append(pa.field("images", IMAGES_DATATYPE))
-    with pq.ParquetWriter(path, schema=schema) as writer:
-        for batch in batches:
-            batch = batches.read_next_batch()
+def sink_to_parquet(parquet_path: str, output_path: str):
+    parquet_file = pq.ParquetFile(parquet_path)
+    updated_schema = update_schema(parquet_file.schema.to_arrow_schema())
+    with pq.ParquetWriter(output_path, schema=updated_schema) as writer:
+        for batch in parquet_file.iter_batches(batch_size=1000):
            batch = _postprocess_arrow_batch(batch)
            # batch = _postprocess_arrow_batch(batch)
            writer.write_batch(batch)


-def postprocess_arrow_batches(batches: pa.RecordBatchReader) -> pa.RecordBatchReader:
-
-    return pa.RecordBatchReader.from_batches(
-        schema=batches.schema,
-        batches=[_postprocess_arrow_batch(batch) for batch in batches]
-    )
+# def postprocess_arrow_batches(batches: pa.RecordBatchReader) -> pa.RecordBatchReader:
+#     schema = _update_schema_by_field(
+#         schema=batches.schema,
+#         field_name="images",
+#         field_datatype=IMAGES_DATATYPE,
+#     )
+#     batches = [_postprocess_arrow_batch(batch) for batch in batches]
+#     return pa.RecordBatchReader.from_batches(
+#         schema=schema,
+#         batches=batches,
+#     )


def _postprocess_arrow_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
@@ -114,3 +118,19 @@ def _postprocess_images(
    images_array = pa.array(postprocessed_images, type=datatype)
    batch = batch.set_column(1, "images", images_array)
    return batch


+def update_schema(schema: pa.Schema) -> pa.Schema:
+    for field_name, field_datatype in SCHEMAS.items():
+        schema = _update_schema_by_field(schema=schema, field_name=field_name, field_datatype=field_datatype)
+    return schema
+
+
+def _update_schema_by_field(schema: pa.Schema, field_name: str, field_datatype: pa.DataType) -> pa.Schema:
+    field_index = schema.get_field_index(field_name)
+    schema = schema.remove(field_index)
+    schema = schema.insert(
+        field_index,
+        pa.field(field_name, field_datatype)
+    )
+    return schema
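
As a usage note on the schema helpers added above: the remove/insert pair swaps a field's datatype while keeping its position in the schema. A small, self-contained sketch of the same pattern with a toy schema (the struct fields below are stand-ins, not the real IMAGES_DATATYPE definition):

import pyarrow as pa

# Toy schema: "images" starts out as a plain string column.
schema = pa.schema([("code", pa.string()), ("images", pa.string())])

# Hypothetical target datatype standing in for IMAGES_DATATYPE.
images_type = pa.list_(pa.struct([("key", pa.string()), ("imgid", pa.string())]))

# Same pattern as _update_schema_by_field: drop the old field, then insert the
# new definition at the original index so the column order is preserved.
idx = schema.get_field_index("images")
schema = schema.remove(idx).insert(idx, pa.field("images", images_type))

print(schema)
# code: string
# images: list<item: struct<key: string, imgid: string>>

sink_to_parquet applies update_schema to the source file's Arrow schema, opens a ParquetWriter with the result, and streams record batches into it via ParquetFile.iter_batches.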
