diff --git a/Cargo.toml b/Cargo.toml index fcd13f7..310515d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,11 +15,13 @@ default = ["reqwest-async", "reqwest-sync"] reqwest-async = ["reqwest"] reqwest-sync = ["reqwest/blocking"] ureq-sync = ["ureq"] +parquet = ["dep:parquet"] [dependencies] async-trait = "0.1.51" byteorder = "1.4.2" bytes = "1.0.1" +parquet = { version = "46.0.0", optional = true } read-logger = "0.2.0" reqwest = { version = "0.11.0", default-features = false, features = ["default-tls"], optional = true } thiserror = "1.0" diff --git a/src/buffered_range_client.rs b/src/buffered_range_client.rs index 7250b31..112b2be 100644 --- a/src/buffered_range_client.rs +++ b/src/buffered_range_client.rs @@ -5,6 +5,7 @@ use std::cmp::{max, min}; use std::str::{self, FromStr}; /// Buffer for Range request reader (https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests) +#[derive(Clone)] struct HttpRangeBuffer { buf: BytesMut, min_req_size: usize, @@ -146,14 +147,15 @@ pub(crate) mod sync { use std::io::{Read, Seek, SeekFrom}; /// HTTP client adapter for HTTP Range requests with a buffer optimized for sequential reading - pub struct SyncBufferedHttpRangeClient { + #[derive(Clone)] + pub struct SyncBufferedHttpRangeClient { http_client: T, url: String, buffer: HttpRangeBuffer, - length_info: Option>, + pub(crate) length_info: Option>, } - impl SyncBufferedHttpRangeClient { + impl SyncBufferedHttpRangeClient { pub fn with(http_client: T, url: &str) -> SyncBufferedHttpRangeClient { SyncBufferedHttpRangeClient { http_client, @@ -230,7 +232,7 @@ pub(crate) mod sync { } } - impl Read for SyncBufferedHttpRangeClient { + impl Read for SyncBufferedHttpRangeClient { fn read(&mut self, buf: &mut [u8]) -> std::result::Result { let length = buf.len(); let mut bytes = self.get_bytes(length).map_err(|e| match e { @@ -244,7 +246,7 @@ pub(crate) mod sync { } } - impl Seek for SyncBufferedHttpRangeClient { + impl Seek for SyncBufferedHttpRangeClient { fn seek(&mut self, pos: SeekFrom) -> std::result::Result { match pos { SeekFrom::Start(p) => { diff --git a/src/lib.rs b/src/lib.rs index b0871b8..21e116f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,8 @@ mod buffered_range_client; mod error; +#[cfg(feature = "parquet")] +mod parquet; mod range_client; #[cfg(any(feature = "reqwest-async", feature = "reqwest-sync"))] mod reqwest_client; diff --git a/src/parquet.rs b/src/parquet.rs new file mode 100644 index 0000000..9b8f28d --- /dev/null +++ b/src/parquet.rs @@ -0,0 +1,26 @@ +use crate::{SyncBufferedHttpRangeClient, SyncHttpRangeClient}; +use bytes::Bytes; +use parquet::errors::Result; +use parquet::file::reader::{ChunkReader, Length}; +use std::io::{Seek, SeekFrom}; + +impl Length for SyncBufferedHttpRangeClient { + fn len(&self) -> u64 { + self.length_info.unwrap_or(Some(0)).unwrap_or(0) + } +} + +impl ChunkReader for SyncBufferedHttpRangeClient { + type T = SyncBufferedHttpRangeClient; + fn get_read(&self, start: u64) -> Result { + let mut client = (*self).clone(); + client.seek(SeekFrom::Start(start)).unwrap(); + Ok(client) + } + fn get_bytes(&self, start: u64, length: usize) -> Result { + // let mut client = self.clone(); + // let bytes = client.get_range(start as usize, length).unwrap(); + // Ok(Bytes::from(bytes)) + todo!() + } +}