-
-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rust): ParquetCloudSink to allow streaming pipelines into remote ObjectStores #10060
Conversation
…an be used directly
…re is enabled even without enabling any of the `aws`/`gcp`/`azure` features.
...to create the appropriate ObjectStore from the given URI.
…it is Sync by itself already
About the feature flags: I propose a new |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Qqwy I have left some comments, but I think we are almost there.
I've marked this PR now as 'ready'. Still missing are tests (see notes above about questions I have regarding these), and a similar implementation for cloud-synching IPC files, but I am all up for introducing these two things in separate PRs to make the reviewing work easier. |
pub fn sink_parquet_cloud( | ||
mut self, | ||
uri: String, | ||
cloud_options: Option<polars_core::cloud::CloudOptions>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides the cloud_options
parameter, this looks exactly the same as the sink_parquet
method. Isn't there a possibility that you can just pass in the uri and the write/sink methods can determine if it's an uri or a local path?
That would make it much more easy to maintain the code, and open up all methods to natively read & write to local/cloud based sources/targets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While possible, there are some drawbacks of doing so:
- It is a backwards-incompatible change. Polars is still pre-1.0 but nonetheless breaking existing code is not something to do lightly.
- It makes
sink_parquet
more difficult to use for anyone not using the cloud feature, since for writing to a normal file you'd now need to pass in an extraNone
every time. - It complicates how to deal with feature flags. Currently, the feature flag
cloud_write
enables/disablessink_parquet_cloud
. Attempting to use it without the feature flag enabled results in a compile-time error. If it would alter the internals ofsink_parquet
, the error would only be caught at runtime.
That is why I decided to keep it separate in this PR. Unifying the APIs is definitely interesting and maybe this refactor might be worth considering in a future version, but I did not want to rush this for these reasons.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I keep forgetting Rust doesn't have default parameters... Pity because here it would be so much more helpful!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this PR enable us to write LazyFrames to cloud storage using sink_parquet? If so, how do I so this? I can't figure it out.
@Qqwy can you do a rebase? Then we can merge this work. Please ping me when done, so we don't get conflicts. :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this PR however I can't shake the feeling that it could be easier (and easier for other sinks to use this cloud storage as well).
Let me try to put my gut-feeling into words:
- Right now you created a new
sink_parquet_cloud
method that besides all thesink_parquet*
options receives acloud_options
argument (note to self: nope, rust has still no overloading). - This method fires off and saves the file onto the cloud
- Wouldn't it be easier if we can pass in a path or object_store into the sink_parquet method directly?
- There is done something similar in the read_parquet within the python library that does something like "check if the file allows for reading" or "check if it's a string to a path". Finally out of that method comes a readable bytestream.
I'm guessing this should be possible here as well?
That would allow for passing in a path or writable bytestream, leaving the cloud-options at the side of the client, and not something that needs to be considered by polars. The only thing that polars need to do is check if it can write to it, or if it's a string to a path. - Allowing for that last one makes it a breeze to implement the same for the other sink-methods (parquet/ipc/csv (and in feat: Implement
LazyFrame.sink_ndjson
#10786 json/json lines). They would all work right out of the box with that.
Hopefully I made sense, but I would love to see it work in such a way (and sadly my rust knowledge is still lacking greatly)
pub fn sink_parquet_cloud( | ||
mut self, | ||
uri: String, | ||
cloud_options: Option<polars_core::cloud::CloudOptions>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I keep forgetting Rust doesn't have default parameters... Pity because here it would be so much more helpful!
throughout Polars
… Windows Thank you, @svaningelgem!
@svaningelgem There are definitely ways to improve the ergonomics of the new features. But I think it's better to discuss these in a separate issue after this PR is merged because the PR has already grown much larger than originally intended. Separate small 'refactor' PRs will be much easier to make and review afterwards. @ritchie46 conflicts are resolved. Ready for merge! 🚀 |
Alright. Here we go. |
This PR implements #9976
Progress:
LogicalPlan
andALogicalPlan
FileSink
and newly introducedCloudSink
with singleSink
enum.