
Ensure dask-deltatable can write datetimes with different resolution (#…
j-bennet authored Jul 12, 2023
1 parent 9adc684 commit d897cb1
Showing 3 changed files with 117 additions and 11 deletions.
24 changes: 24 additions & 0 deletions dask_deltatable/_schema.py
@@ -59,6 +59,30 @@ def __hash__(self):
return hash(_schema2bytes(self.schema))


def pyarrow_to_deltalake(schema: pa.Schema) -> pa.Schema:
"""Adjust data types to make schema compatible with Delta Lake dtypes.
Not all Arrow data types are supported by Delta Lake. See also
``deltalake.schema.delta_arrow_schema_from_pandas``.
Notes
-----
We shouldn't need this when https://github.com/delta-io/delta-rs/issues/686 is closed
"""
schema_out = []
for field in schema:
if isinstance(field.type, pa.TimestampType):
f = pa.field(
name=field.name,
type=pa.timestamp("us"),
nullable=field.nullable,
metadata=field.metadata,
)
schema_out.append(f)
else:
schema_out.append(field)
return pa.schema(schema_out, metadata=schema.metadata)


def _pandas_in_schemas(schemas):
"""Check if any schema contains pandas metadata."""
has_pandas = False
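For reference, a minimal sketch of the new ``pyarrow_to_deltalake`` helper in use (the schema here is made up for illustration): timestamp fields are downcast to the microsecond resolution Delta Lake accepts, and everything else passes through unchanged.

```python
import pyarrow as pa

from dask_deltatable._schema import pyarrow_to_deltalake

# Hypothetical input schema: "ts" uses nanosecond resolution, "value" is not a timestamp.
schema_in = pa.schema(
    [
        pa.field("ts", pa.timestamp("ns"), nullable=False),
        pa.field("value", pa.float64()),
    ]
)

schema_out = pyarrow_to_deltalake(schema_in)
assert schema_out.field("ts").type == pa.timestamp("us")  # downcast to microseconds
assert schema_out.field("value").type == pa.float64()     # non-timestamp field untouched
assert not schema_out.field("ts").nullable                # nullability is preserved
```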
84 changes: 76 additions & 8 deletions dask_deltatable/write.py
@@ -30,7 +30,7 @@
)
from toolz.itertoolz import pluck

from ._schema import validate_compatible
from ._schema import pyarrow_to_deltalake, validate_compatible


def to_deltalake(
@@ -54,9 +54,71 @@ def to_deltalake(
storage_options: dict[str, str] | None = None,
partition_filters: list[tuple[str, str, Any]] | None = None,
):
"""Write a given dask.DataFrame to a delta table
"""Write a given dask.DataFrame to a delta table. The returned value is a Dask Scalar,
and the writing operation is only triggered when calling ``.compute()``.
TODO:
Parameters
----------
table_or_uri: str | Path | DeltaTable
URI of a table or a DeltaTable object.
df: dd.DataFrame
Data to write
schema : pa.Schema | None. Default None
Optional schema to write.
partition_by : list[str] | str | None. Default None
List of columns to partition the table by. Only required
when creating a new table
filesystem : pa_fs.FileSystem | None. Default None
Optional filesystem to pass to PyArrow. If not provided, it will
be inferred from the uri. The filesystem has to be rooted in the table root;
use pyarrow.fs.SubTreeFileSystem to re-root an existing pyarrow filesystem there.
mode : Literal["error", "append", "overwrite", "ignore"]. Default "error"
How to handle existing data. Default is to error if table already exists.
If 'append', will add new data.
If 'overwrite', will replace table with new data.
If 'ignore', will not write anything if table already exists.
file_options : ds.ParquetFileWriteOptions | None. Default None
Optional write options for Parquet (ParquetFileWriteOptions).
Can be created with defaults using ``ds.ParquetFileFormat().make_write_options()``.
Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx
for the list of available options.
max_partitions : int | None. Default None
The maximum number of partitions that will be used.
max_open_files : int. Default 1024
Limits the maximum number of
files that can be left open while writing. If an attempt is made to open
too many files then the least recently used file will be closed.
If this setting is set too low you may end up fragmenting your
data into many small files.
max_rows_per_file : int. Default 10 * 1024 * 1024
Maximum number of rows per file.
If greater than 0 then this will limit how many rows are placed in any single file.
Otherwise there will be no limit and one file will be created in each output directory
unless files need to be closed to respect max_open_files
min_rows_per_group : int. Default 64 * 1024
Minimum number of rows per group. When the value is set,
the dataset writer will batch incoming data and only write the row groups to the disk
when sufficient rows have accumulated.
max_rows_per_group : int. Default 128 * 1024
Maximum number of rows per group.
If the value is set, then the dataset writer may split up large incoming batches into multiple row groups.
If this value is set, then min_rows_per_group should also be set
name: str | None. Default None
User-provided identifier for this table.
description : str | None. Default None
User-provided description for this table
configuration : Mapping[str, str | None] | None. Default None
A map containing configuration options for the metadata action.
overwrite_schema : bool. Default False
If True, allows updating the schema of the table.
storage_options : dict[str, str] | None. Default None
Options passed to the native delta filesystem. Unused if 'filesystem' is defined
partition_filters : list[tuple[str, str, Any]] | None. Default None
The partition filters that will be used for partition overwrite.
Returns
-------
dask.Scalar
"""
table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)

@@ -76,9 +76,14 @@ def to_deltalake(
if isinstance(partition_by, str):
partition_by = [partition_by]

if schema is not None:
schema = pyarrow_to_deltalake(schema)

if table: # already exists
if schema != table.schema().to_pyarrow() and not (
mode == "overwrite" and overwrite_schema
if (
schema is not None
and schema != table.schema().to_pyarrow()
and not (mode == "overwrite" and overwrite_schema)
):
raise ValueError(
"Schema of data does not match table schema\n"
@@ -214,9 +281,10 @@ def _write_partition(
filesystem,
max_partitions,
) -> tuple[pa.Schema, list[AddAction]]:
# TODO: what to do with the schema, if provided
data = pa.Table.from_pandas(df)
schema = schema or data.schema
if schema is None:
# Infer the schema from the pandas data and downcast timestamps to the
# microsecond resolution Delta Lake supports
schema = pyarrow_to_deltalake(pa.Schema.from_pandas(df))
data = pa.Table.from_pandas(df, schema=schema)

add_actions: list[AddAction] = []

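As a usage sketch of the API documented above (the table path and data are made up; ``to_deltalake`` is assumed importable from the ``dask_deltatable.write`` module changed in this commit): nothing is written until ``.compute()`` is called on the returned Dask scalar.

```python
import dask.dataframe as dd
import pandas as pd

from dask_deltatable.write import to_deltalake

# Hypothetical data with second-resolution timestamps (unit= requires pandas >= 2.0);
# with this commit they are downcast to microseconds before writing.
df = pd.DataFrame(
    {
        "ts": pd.date_range("2023-01-01", periods=4, freq="1D", unit="s"),
        "x": range(4),
    }
)
ddf = dd.from_pandas(df, npartitions=2)

result = to_deltalake("/tmp/example_delta_table", ddf, mode="overwrite")
result.compute()  # the Delta table is only written here
```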
20 changes: 17 additions & 3 deletions tests/test_write.py
@@ -2,6 +2,8 @@

import os

import dask.dataframe as dd
import pandas as pd
import pytest
from dask.dataframe.utils import assert_eq
from dask.datasets import timeseries
@@ -41,11 +43,8 @@ def test_roundtrip(tmpdir, with_index):
# partition_freq="1w",
dtypes=dtypes,
)
# FIXME: us is the only precision delta supports. This lib should likely
# cast this itself

ddf = ddf.reset_index()
ddf.timestamp = ddf.timestamp.astype("datetime64[us]")
if with_index:
ddf = ddf.set_index("timestamp")

@@ -62,3 +61,18 @@ def test_roundtrip(tmpdir, with_index):
# By default, arrow reads with ns resolution
ddf.timestamp = ddf.timestamp.astype("datetime64[ns]")
assert_eq(ddf, ddf_read)


@pytest.mark.parametrize("unit", ["s", "ms", "us", "ns"])
def test_datetime(tmpdir, unit):
"""Ensure we can write datetime with different resolutions,
at least one-way only"""
tmpdir = str(tmpdir)
ts = pd.date_range("2023-01-01", periods=10, freq="1D", unit=unit) # type: ignore[call-arg]
df = pd.DataFrame({"ts": pd.Series(ts)})
ddf = dd.from_pandas(df, npartitions=2)
out = to_deltalake(tmpdir, ddf)
out.compute()
ddf_read = read_delta_table(tmpdir)
# arrow reads back with ns
assert ddf_read.ts.dtype == "datetime64[ns]"
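
Since the data always comes back with nanosecond resolution, a caller that cares about the original unit would cast after reading. A small, hypothetical follow-up to the test above, reusing its ``tmpdir`` and ``unit`` variables (the cast assumes pandas >= 2.0):

```python
ddf_read = read_delta_table(tmpdir)
# Reads come back as datetime64[ns]; cast back if the original resolution matters.
ddf_read["ts"] = ddf_read["ts"].astype(f"datetime64[{unit}]")
assert ddf_read.ts.dtype == f"datetime64[{unit}]"
```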
