Skip to content

Commit

Permalink
Add support for decoding Timestamp as Decimal128 (#96)
Browse files Browse the repository at this point in the history
* Add support for decoding Timestamp as Decimal128

This allows support for the full range of ORC timestamp, with full precision.

* Avoid overflows

* Fix wording in comment
  • Loading branch information
progval authored Jun 20, 2024
1 parent 47bf8c3 commit f93314a
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 85 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,20 @@ The following table lists how ORC data types are read into Arrow data types:
| Binary | Binary | |
| Decimal | Decimal128 | |
| Date | Date32 | |
| Timestamp | Timestamp(Nanosecond, None) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Timestamp instant | Timestamp(Nanosecond, UTC) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Timestamp | Timestamp(Nanosecond, None) | ¹ |
| Timestamp instant | Timestamp(Nanosecond, UTC) | ¹ |
| Struct | Struct | |
| List | List | |
| Map | Map | |
| Union | Union(_, Sparse)* | |
| Union | Union(_, Sparse) | ² |

*Currently only supports a maximum of 127 variants
¹: `ArrowReaderBuilder::with_schema` allows configuring different time units or decoding to
`Decimal128(38, 9)` (i128 of non-leap nanoseconds since UNIX epoch).
Overflows may happen while decoding to a non-Seconds time unit, and results in `OrcError`.
Loss of precision may happen while decoding to a non-Nanosecond time unit, and results in `OrcError`.
`Decimal128(38, 9)` avoids both overflows and loss of precision.

²: Currently only supports a maximum of 127 variants

## Contributing

Expand Down
215 changes: 143 additions & 72 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ use crate::{
};
use arrow::array::ArrayRef;
use arrow::datatypes::{
ArrowTimestampType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
ArrowTimestampType, Decimal128Type, DecimalType, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use chrono::offset::TimeZone;
use chrono_tz::Tz;
use chrono::TimeDelta;
use chrono_tz::{Tz, UTC};
use snafu::ensure;

use super::{ArrayBatchDecoder, PrimitiveArrayDecoder};
use super::{ArrayBatchDecoder, DecimalArrayDecoder, PrimitiveArrayDecoder};
use crate::error::UnsupportedTypeVariantSnafu;

const NANOSECONDS_IN_SECOND: i128 = 1_000_000_000;
const NANOSECOND_DIGITS: i8 = 9;

/// Statically dispatches to the right ArrowTimestampType based on the value of $time_unit
/// to create a $decoder_type with that type as type parameter and $iter/$present as value
/// parameters, then applies $f to it and $tz.
Expand All @@ -40,7 +44,7 @@ macro_rules! decoder_for_time_unit {

match $time_unit {
TimeUnit::Second => {
let iter = Box::new(TimestampIterator::<TimestampSecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampSecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -51,7 +55,7 @@ macro_rules! decoder_for_time_unit {
)))
}
TimeUnit::Millisecond => {
let iter = Box::new(TimestampIterator::<TimestampMillisecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampMillisecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -62,7 +66,7 @@ macro_rules! decoder_for_time_unit {
)))
}
TimeUnit::Microsecond => {
let iter = Box::new(TimestampIterator::<TimestampMicrosecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampMicrosecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -73,7 +77,7 @@ macro_rules! decoder_for_time_unit {
)))
}
TimeUnit::Nanosecond => {
let iter = Box::new(TimestampIterator::<TimestampNanosecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampNanosecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -87,6 +91,56 @@ macro_rules! decoder_for_time_unit {
}};
}

fn decimal128_decoder(
column: &Column,
stripe: &Stripe,
seconds_since_unix_epoch: i64,
writer_tz: Option<Tz>,
) -> Result<DecimalArrayDecoder> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let iter = TimestampIterator::<TimestampNanosecondType, i128>::new(
seconds_since_unix_epoch,
data,
secondary,
);

let iter: Box<dyn Iterator<Item = _> + Send> = match writer_tz {
Some(UTC) => Box::new(iter), // Avoid overflow-able operations below
Some(writer_tz) => Box::new(iter.map(move |ts| {
let ts = ts?;
let seconds = ts.div_euclid(NANOSECONDS_IN_SECOND);
let nanoseconds = ts.rem_euclid(NANOSECONDS_IN_SECOND);

// The addition may panic, because chrono stores dates in an i32,
// which can be overflowed with an i64 of seconds.
let dt = (writer_tz.timestamp_nanos(0)
+ TimeDelta::new(seconds as i64, nanoseconds as u32)
.expect("TimeDelta duration out of bound"))
.naive_local()
.and_utc();

Ok((dt.timestamp() as i128) * NANOSECONDS_IN_SECOND
+ (dt.timestamp_subsec_nanos() as i128))
})),
None => Box::new(iter),
};

Ok(DecimalArrayDecoder::new(
Decimal128Type::MAX_PRECISION,
NANOSECOND_DIGITS,
iter,
present,
))
}

/// Seconds from ORC epoch of 1 January 2015, which serves as the 0
/// point for all timestamp values, to the UNIX epoch of 1 January 1970.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;
Expand All @@ -99,100 +153,117 @@ pub fn new_timestamp_decoder(
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let ArrowDataType::Timestamp(time_unit, None) = field_type else {
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};

match stripe.writer_tz() {
let seconds_since_unix_epoch = match stripe.writer_tz() {
Some(writer_tz) => {
// If writer timezone exists then we must take the ORC epoch according
// to that timezone, and find seconds since UTC UNIX epoch for the base.
let seconds_since_unix_epoch = writer_tz
writer_tz
.with_ymd_and_hms(2015, 1, 1, 0, 0, 0)
.unwrap()
.timestamp();
.timestamp()
}
None => {
// No writer timezone, we can assume UTC, so we can use known fixed value
// for the base offset.
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH
}
};

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
writer_tz: Tz,
) -> TimestampOffsetArrayDecoder<T> {
TimestampOffsetArrayDecoder {
inner: decoder,
match field_type {
ArrowDataType::Timestamp(time_unit, None) => match stripe.writer_tz() {
Some(writer_tz) => {
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
writer_tz: Tz,
) -> TimestampOffsetArrayDecoder<T> {
TimestampOffsetArrayDecoder {
inner: decoder,
writer_tz,
}
}
decoder_for_time_unit!(
column,
time_unit,
seconds_since_unix_epoch,
stripe,
writer_tz,
f,
)
}
None => {
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> PrimitiveArrayDecoder<T> {
decoder
}

decoder_for_time_unit!(column, time_unit, seconds_since_unix_epoch, stripe, (), f,)
}
decoder_for_time_unit!(
},
ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
Ok(Box::new(decimal128_decoder(
column,
time_unit,
seconds_since_unix_epoch,
stripe,
writer_tz,
f,
)
seconds_since_unix_epoch,
stripe.writer_tz(),
)?))
}
None => {
_ => MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail(),
}
}

/// Decodes a TIMESTAMP_INSTANT column stripe into batches of
/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone.
pub fn new_timestamp_instant_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
match field_type {
ArrowDataType::Timestamp(time_unit, Some(tz)) => {
ensure!(
tz.as_ref() == "UTC",
UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps"
}
);

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> PrimitiveArrayDecoder<T> {
decoder
) -> TimestampInstantArrayDecoder<T> {
TimestampInstantArrayDecoder(decoder)
}

decoder_for_time_unit!(
column,
time_unit,
// No writer timezone, we can assume UTC, so we can use known fixed value
// for the base offset.
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
stripe,
(),
f,
)
}
}
}

/// Decodes a TIMESTAMP_INSTANT column stripe into batches of
/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone.
pub fn new_timestamp_instant_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let ArrowDataType::Timestamp(time_unit, Some(tz)) = field_type else {
MismatchedSchemaSnafu {
ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
Ok(Box::new(decimal128_decoder(
column,
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
None,
)?))
}
_ => MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};
ensure!(
tz.as_ref() == "UTC",
UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps"
}
);

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> TimestampInstantArrayDecoder<T> {
TimestampInstantArrayDecoder(decoder)
.fail()?,
}

decoder_for_time_unit!(
column,
time_unit,
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
stripe,
(),
f,
)
}

/// Wrapper around PrimitiveArrayDecoder to decode timestamps which are encoded in
Expand Down
17 changes: 9 additions & 8 deletions src/reader/decode/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use crate::error::{DecodeTimestampSnafu, Result};

const NANOSECONDS_IN_SECOND: i64 = 1_000_000_000;

pub struct TimestampIterator<T: ArrowTimestampType> {
pub struct TimestampIterator<T: ArrowTimestampType, Item: TryFrom<i128>> {
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<u64>> + Send>,
_marker: PhantomData<T>,
_marker: PhantomData<(T, Item)>,
}

impl<T: ArrowTimestampType> TimestampIterator<T> {
impl<T: ArrowTimestampType, Item: TryFrom<i128>> TimestampIterator<T, Item> {
pub fn new(
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
Expand All @@ -29,22 +29,23 @@ impl<T: ArrowTimestampType> TimestampIterator<T> {
}
}

impl<T: ArrowTimestampType> Iterator for TimestampIterator<T> {
type Item = Result<i64>;
impl<T: ArrowTimestampType, Item: TryFrom<i128>> Iterator for TimestampIterator<T, Item> {
type Item = Result<Item>;

fn next(&mut self) -> Option<Self::Item> {
// TODO: throw error for mismatched stream lengths?
let (seconds_since_orc_base, nanoseconds) =
self.data.by_ref().zip(self.secondary.by_ref()).next()?;
decode_timestamp::<T>(self.base_from_epoch, seconds_since_orc_base, nanoseconds).transpose()
decode_timestamp::<T, _>(self.base_from_epoch, seconds_since_orc_base, nanoseconds)
.transpose()
}
}

fn decode_timestamp<T: ArrowTimestampType>(
fn decode_timestamp<T: ArrowTimestampType, Ret: TryFrom<i128>>(
base: i64,
seconds_since_orc_base: Result<i64>,
nanoseconds: Result<u64>,
) -> Result<Option<i64>> {
) -> Result<Option<Ret>> {
let data = seconds_since_orc_base?;
let mut nanoseconds = nanoseconds?;
// Last 3 bits indicate how many trailing zeros were truncated
Expand Down
Loading

0 comments on commit f93314a

Please sign in to comment.