Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): ParquetCloudSink to allow streaming pipelines into remote ObjectStores #10060

Merged
merged 68 commits into from
Sep 8, 2023

Conversation

Qqwy
Copy link
Contributor

@Qqwy Qqwy commented Jul 24, 2023

This PR implements #9976

Progress:

  • ParquetCloudSink type, part of LogicalPlan and ALogicalPlan
  • sink_parquet_cloud
  • An example program showing how it can be used.
  • Proper usage of feature flags.
  • Replace FileSink and newly introduced CloudSink with single Sink enum.
  • Unit Tests
  • Integration tests using MinIO or similar

@github-actions github-actions bot added enhancement New feature or an improvement of an existing feature rust Related to Rust Polars labels Jul 24, 2023
@Qqwy
Copy link
Contributor Author

Qqwy commented Jul 24, 2023

About the feature flags: I propose a new cloud flag. Until now, the async flag would toggle some cloud functions but not others. (They would only be toggled by the individual aws/gcp/azure flags). Having a single flag for the common functionality from aws/gcp/azure seems nicer to me. It also makes writing tests easier, since there's the in-memory and local file backends of ObjectStore which are already usable without enabling any cloud implementation in particular.

Copy link
Member

@ritchie46 ritchie46 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Qqwy I have left some comments, but I think we are almost there.

crates/Makefile Outdated Show resolved Hide resolved
crates/polars-lazy/src/frame/mod.rs Outdated Show resolved Hide resolved
crates/polars-lazy/src/physical_plan/planner/lp.rs Outdated Show resolved Hide resolved
@Qqwy Qqwy marked this pull request as ready for review August 22, 2023 10:06
@Qqwy Qqwy requested a review from orlp as a code owner August 22, 2023 10:06
@Qqwy
Copy link
Contributor Author

Qqwy commented Aug 22, 2023

I've marked this PR now as 'ready'. Still missing are tests (see notes above about questions I have regarding these), and a similar implementation for cloud-synching IPC files, but I am all up for introducing these two things in separate PRs to make the reviewing work easier.

@Qqwy Qqwy force-pushed the qqwy/cloud_sink branch from 39a9fe5 to ab968bd Compare August 22, 2023 10:17
pub fn sink_parquet_cloud(
mut self,
uri: String,
cloud_options: Option<polars_core::cloud::CloudOptions>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides the cloud_options parameter, this looks exactly the same as the sink_parquet method. Isn't there a possibility that you can just pass in the uri and the write/sink methods can determine if it's an uri or a local path?

That would make it much more easy to maintain the code, and open up all methods to natively read & write to local/cloud based sources/targets.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While possible, there are some drawbacks of doing so:

  • It is a backwards-incompatible change. Polars is still pre-1.0 but nonetheless breaking existing code is not something to do lightly.
  • It makes sink_parquet more difficult to use for anyone not using the cloud feature, since for writing to a normal file you'd now need to pass in an extra None every time.
  • It complicates how to deal with feature flags. Currently, the feature flag cloud_write enables/disables sink_parquet_cloud. Attempting to use it without the feature flag enabled results in a compile-time error. If it would alter the internals of sink_parquet, the error would only be caught at runtime.

That is why I decided to keep it separate in this PR. Unifying the APIs is definitely interesting and maybe this refactor might be worth considering in a future version, but I did not want to rush this for these reasons.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I keep forgetting Rust doesn't have default parameters... Pity because here it would be so much more helpful!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this PR enable us to write LazyFrames to cloud storage using sink_parquet? If so, how do I so this? I can't figure it out.

@Qqwy Qqwy requested a review from ritchie46 August 28, 2023 13:38
@ritchie46
Copy link
Member

@Qqwy can you do a rebase? Then we can merge this work. Please ping me when done, so we don't get conflicts. :)

Copy link
Contributor

@svaningelgem svaningelgem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this PR however I can't shake the feeling that it could be easier (and easier for other sinks to use this cloud storage as well).

Let me try to put my gut-feeling into words:

  • Right now you created a new sink_parquet_cloud method that besides all the sink_parquet* options receives a cloud_options argument (note to self: nope, rust has still no overloading).
  • This method fires off and saves the file onto the cloud
  • Wouldn't it be easier if we can pass in a path or object_store into the sink_parquet method directly?
  • There is done something similar in the read_parquet within the python library that does something like "check if the file allows for reading" or "check if it's a string to a path". Finally out of that method comes a readable bytestream.
    I'm guessing this should be possible here as well?
    That would allow for passing in a path or writable bytestream, leaving the cloud-options at the side of the client, and not something that needs to be considered by polars. The only thing that polars need to do is check if it can write to it, or if it's a string to a path.
  • Allowing for that last one makes it a breeze to implement the same for the other sink-methods (parquet/ipc/csv (and in feat: Implement LazyFrame.sink_ndjson #10786 json/json lines). They would all work right out of the box with that.

Hopefully I made sense, but I would love to see it work in such a way (and sadly my rust knowledge is still lacking greatly)

crates/polars-io/src/cloud/adaptors.rs Outdated Show resolved Hide resolved
pub fn sink_parquet_cloud(
mut self,
uri: String,
cloud_options: Option<polars_core::cloud::CloudOptions>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I keep forgetting Rust doesn't have default parameters... Pity because here it would be so much more helpful!

@Qqwy
Copy link
Contributor Author

Qqwy commented Sep 8, 2023

@svaningelgem There are definitely ways to improve the ergonomics of the new features. But I think it's better to discuss these in a separate issue after this PR is merged because the PR has already grown much larger than originally intended. Separate small 'refactor' PRs will be much easier to make and review afterwards.

@ritchie46 conflicts are resolved. Ready for merge! 🚀

@ritchie46
Copy link
Member

Alright. Here we go.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or an improvement of an existing feature rust Related to Rust Polars
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants