Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write operation #29

Merged
merged 4 commits into from
Jul 10, 2023
Merged

Write operation #29

merged 4 commits into from
Jul 10, 2023

Conversation

fjetter
Copy link
Contributor

@fjetter fjetter commented Jun 23, 2023

Closes #4

It effectively translates/copied most of https://github.com/delta-io/delta-rs/blob/e5dd8e2167b94e6856aa531d878584397d5bea69/python/deltalake/writer.py#L142-L342 to dask. I omitted the overwrite path for now mostly for simplicities sake. The only genuine addition here is how this would have to be wired together with dask (the HLG / map_partition / Scalar foo)

There is a lot of overlap and I have to hook into some internal flagged APIs of delta-rs . This can be hopefully cleaned up eventually (cc @MrPowers)

@codecov-commenter
Copy link

codecov-commenter commented Jun 28, 2023

Codecov Report

Merging #29 (3a17af6) into main (ba3801c) will decrease coverage by 19.57%.
The diff coverage is 62.43%.

❗ Your organization is not using the GitHub App Integration. As a result you may experience degraded service beginning May 15th. Please install the Github App Integration for your organization. Read more.

@@             Coverage Diff             @@
##             main      #29       +/-   ##
===========================================
- Coverage   92.85%   73.28%   -19.57%     
===========================================
  Files           2        4        +2     
  Lines         112      307      +195     
===========================================
+ Hits          104      225      +121     
- Misses          8       82       +74     
Impacted Files Coverage Δ
dask_deltatable/_schema.py 60.86% <60.86%> (ø)
dask_deltatable/write.py 63.75% <63.75%> (ø)
dask_deltatable/core.py 92.52% <100.00%> (ø)

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Contributor Author

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test coverage is obviously still very bad but I would love to see this getting in soon before we cover all cases

cc @jrbourbeau @MrPowers

@rjzamora I assume you built the dask vanilla-parquet writer. This may interest you as well

Comment on lines +176 to +178
# TODO: This is applying a potentially stricted schema control than what
# Delta requires but if this passes, it should be good to go
schema = validate_compatible(schemas)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took this code from https://github.com/data-engineering-collective/plateau where I've built a similar thing before. The challenge here is to get a schema from all the partitions that represents the transaction the best. this is particularly a struggle for partitions will null columns because pyarrow then does not give a proper schema. This method effectively looks at all written schemas, deduplicates it and merges them into a super-schema, i.e. it fills null columns with a proper type.

It also raises if there are incompatible schemas detected. Incompatible in this sense means, for example that the same column has an integer and a float in two different partitions. This may be stricter than what delta requires and a lot of this is already covered by dask but I used this regardless since we need the scheme deduplication.

I'm very open to throwing this out again down the road but this gets us started quickly since this is battle-proven code.

def test_roundtrip(tmpdir, with_index):
dtypes = {
"str": object,
# FIXME: Categorical data does not work
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a couple of problems around dtypes. We can open dedicated issues for this once the PR is merged

@fjetter fjetter marked this pull request as ready for review June 28, 2023 14:35
@fjetter fjetter changed the title WIP Write operation Write operation Jun 28, 2023
raise NotImplementedError()

written = df.map_partitions(
_write_partition,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we use write_deltalake from deltatable for writing a partition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to separate the writing of the parquet files from the commit to the log. The entire to_deltalake is supposed to be one transaction. Using the deltatable.write_deltalake would create a commit per partition. This is not only not what a transaction is supposed to be but this would almost guarantee that we'd have write conflicts due to concurrent writes.

raise DeltaProtocolError(
"This table's min_writer_version is "
f"{table.protocol().min_writer_version}, "
"but this method only supports version 2."
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"but this method only supports version 2."
f"but this method only supports version {MAX_SUPPORTED_WRITER_VERSION}."

partitioning = None
if mode == "overwrite":
# FIXME: There are a couple of checks that are not migrated yet
raise NotImplementedError()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise NotImplementedError()
raise NotImplementedError("mode='overwrite' is not implemented")

format="parquet",
partitioning=partitioning,
# It will not accept a schema if using a RBR
schema=schema if not isinstance(data, RecordBatchReader) else None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would data be a RecordBatchReader if you just created it as pa.Table.from_pandas on line 218?

Copy link
Collaborator

@j-bennet j-bennet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! Currently, there are a couple of cases that are not covered by this PR. I would either open issues to fix those cases later, or add tests that are xfailed, so they serve as a reminder.

@fjetter fjetter merged commit 08cc5b4 into dask-contrib:main Jul 10, 2023
13 checks passed
@fjetter fjetter deleted the dask_writer branch July 10, 2023 14:45
@mrocklin mrocklin mentioned this pull request Jul 10, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Create Dask Delta writer
3 participants