-
Notifications
You must be signed in to change notification settings - Fork 842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Hook up Avro Decoder #6820
Hook up Avro Decoder #6820
Conversation
2e9ab6a
to
893813f
Compare
&self.schema | ||
} | ||
|
||
/// Decode `count` records from `buf` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This separation of decode and flush allows reading records from multiple blocks into the same RecordBatch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would help to add in the documentation that the user should call flush
to generate record batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a good step to me -- I think testing list/struct decoding is important, ideally could be done before merge.
Otherwise 🚀
arrow-avro/src/reader/cursor.rs
Outdated
self.buf = &self.buf[1..]; | ||
in_progress |= ((byte & 0x7F) as u32) << shift; | ||
shift += 7; | ||
if byte & 0x80 == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it might help to ensure the loop runs a bounded number of times (as it is bad input might run many times / overflow)
It running more than 4 times is ok, a comment about why would be nice
arrow-avro/src/reader/cursor.rs
Outdated
let mut in_progress = 0; | ||
let mut shift = 0; | ||
|
||
while let Some(byte) = self.buf.first().copied() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment about the loop
]; | ||
|
||
let expected = RecordBatch::try_from_iter_with_nullable([ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!
&self.schema | ||
} | ||
|
||
/// Decode `count` records from `buf` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would help to add in the documentation that the user should call flush
to generate record batches.
} | ||
|
||
impl RecordDecoder { | ||
pub fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might help to document what types are supported and which are not (e.g. interval / fixed)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll make a note to include this in the documentation for the public reader once created
} | ||
Codec::Fixed(_) => return nyi("decoding fixed"), | ||
Codec::Interval => return nyi("decoding interval"), | ||
Codec::List(item) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I didn't see any tests for list / struct decoding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I wanted to keep the diff smaller, definitely more testing is needed before we make any of this public (currently all the module is private)
Thank you @tustvold |
/// Returns the current cursor position | ||
#[inline] | ||
pub(crate) fn position(&self) -> usize { | ||
self.start_len - self.buf.len() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is where start_len is used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see -- thank you
} | ||
Codec::Fixed(_) => return nyi("decoding fixed"), | ||
Codec::Interval => return nyi("decoding interval"), | ||
Codec::List(item) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I wanted to keep the diff smaller, definitely more testing is needed before we make any of this public (currently all the module is private)
Which issue does this PR close?
Part of #4886
Rationale for this change
This hooks up the codec logic into a decoder, the rest of #4886 is then just a plumbing and testing exercise.
What changes are included in this PR?
Are there any user-facing changes?
No, none of these types are public yet