Skip to content

Commit

Permalink
Impl traits for parquet crate
Browse files Browse the repository at this point in the history
  • Loading branch information
pka committed Sep 18, 2023
1 parent 933d9da commit a93fb37
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 5 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ default = ["reqwest-async", "reqwest-sync"]
reqwest-async = ["reqwest"]
reqwest-sync = ["reqwest/blocking"]
ureq-sync = ["ureq"]
parquet = ["dep:parquet"]

[dependencies]
async-trait = "0.1.51"
byteorder = "1.4.2"
bytes = "1.0.1"
parquet = { version = "46.0.0", optional = true }
read-logger = "0.2.0"
reqwest = { version = "0.11.0", default-features = false, features = ["default-tls"], optional = true }
thiserror = "1.0"
Expand All @@ -29,5 +31,8 @@ ureq = { version = "2.7.1", optional = true }
env_logger = "0.10.0"
tokio = { version = "1.0.2", default-features = false, features = ["rt-multi-thread", "macros"] }

[patch.crates-io]
read-logger = { path = "../read-logger" }

[package.metadata.docs.rs]
all-features = true
12 changes: 7 additions & 5 deletions src/buffered_range_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::cmp::{max, min};
use std::str::{self, FromStr};

/// Buffer for Range request reader (https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests)
#[derive(Clone)]
struct HttpRangeBuffer {
buf: BytesMut,
min_req_size: usize,
Expand Down Expand Up @@ -146,14 +147,15 @@ pub(crate) mod sync {
use std::io::{Read, Seek, SeekFrom};

/// HTTP client adapter for HTTP Range requests with a buffer optimized for sequential reading
pub struct SyncBufferedHttpRangeClient<T: SyncHttpRangeClient> {
#[derive(Clone)]
pub struct SyncBufferedHttpRangeClient<T: SyncHttpRangeClient + Clone> {
http_client: T,
url: String,
buffer: HttpRangeBuffer,
length_info: Option<Option<u64>>,
pub(crate) length_info: Option<Option<u64>>,
}

impl<T: SyncHttpRangeClient> SyncBufferedHttpRangeClient<T> {
impl<T: SyncHttpRangeClient + Clone> SyncBufferedHttpRangeClient<T> {
pub fn with(http_client: T, url: &str) -> SyncBufferedHttpRangeClient<T> {
SyncBufferedHttpRangeClient {
http_client,
Expand Down Expand Up @@ -230,7 +232,7 @@ pub(crate) mod sync {
}
}

impl<T: SyncHttpRangeClient> Read for SyncBufferedHttpRangeClient<T> {
impl<T: SyncHttpRangeClient + Clone> Read for SyncBufferedHttpRangeClient<T> {
fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
let length = buf.len();
let mut bytes = self.get_bytes(length).map_err(|e| match e {
Expand All @@ -244,7 +246,7 @@ pub(crate) mod sync {
}
}

impl<T: SyncHttpRangeClient> Seek for SyncBufferedHttpRangeClient<T> {
impl<T: SyncHttpRangeClient + Clone> Seek for SyncBufferedHttpRangeClient<T> {
fn seek(&mut self, pos: SeekFrom) -> std::result::Result<u64, std::io::Error> {
match pos {
SeekFrom::Start(p) => {
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

mod buffered_range_client;
mod error;
#[cfg(feature = "parquet")]
mod parquet;
mod range_client;
#[cfg(any(feature = "reqwest-async", feature = "reqwest-sync"))]
mod reqwest_client;
Expand Down
25 changes: 25 additions & 0 deletions src/parquet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::{SyncBufferedHttpRangeClient, SyncHttpRangeClient};
use bytes::Bytes;
use parquet::errors::Result;
use parquet::file::reader::{ChunkReader, Length};
use std::io::{Seek, SeekFrom};

impl<T: SyncHttpRangeClient + Clone> Length for SyncBufferedHttpRangeClient<T> {
fn len(&self) -> u64 {
self.length_info.unwrap_or(Some(0)).unwrap_or(0)
}
}

impl<T: SyncHttpRangeClient + Clone + Send + Sync> ChunkReader for SyncBufferedHttpRangeClient<T> {
type T = SyncBufferedHttpRangeClient<T>;
fn get_read(&self, start: u64) -> Result<Self::T> {
let mut client = (*self).clone();
client.seek(SeekFrom::Start(start)).unwrap();
Ok(client)
}
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
let mut client = self.clone();
let bytes = client.get_range(start as usize, length).unwrap();
Ok(Bytes::copy_from_slice(bytes))
}
}

0 comments on commit a93fb37

Please sign in to comment.