fix(rust): Refactor decompression checks and add support for decompressing JSON (#18536)

Co-authored-by: Ohan Fillbach <ofillbach@vectra.ai>
ohanf and Ohan Fillbach authored Sep 11, 2024
1 parent eac567f commit 79b91c3
Showing 12 changed files with 130 additions and 73 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/parser.rs
@@ -14,7 +14,7 @@ use super::options::{CommentPrefix, NullValuesCompiled};
 use super::splitfields::SplitFields;
 use super::utils::get_file_chunks;
 use crate::path_utils::is_cloud_url;
-use crate::utils::maybe_decompress_bytes;
+use crate::utils::compression::maybe_decompress_bytes;
 
 /// Read the number of rows without parsing columns
 /// useful for count(*) queries
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/read/read_impl.rs
@@ -24,7 +24,7 @@ use super::utils::get_file_chunks;
 use crate::mmap::ReaderBytes;
 use crate::predicates::PhysicalIoExpr;
 #[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
-use crate::utils::is_compressed;
+use crate::utils::compression::SupportedCompression;
 use crate::utils::update_row_counts;
 use crate::RowIndex;

@@ -179,7 +179,7 @@ impl<'a> CoreReader<'a> {
         let mut reader_bytes = reader_bytes;
 
         #[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
-        if is_compressed(&reader_bytes) {
+        if SupportedCompression::check(&reader_bytes).is_some() {
             polars_bail!(
                 ComputeError: "cannot read compressed CSV file; \
                 compile with feature 'decompress' or 'decompress-fast'"
27 changes: 17 additions & 10 deletions crates/polars-io/src/csv/read/utils.rs
@@ -129,16 +129,23 @@ pub(crate) fn decompress(
     quote_char: Option<u8>,
     eol_char: u8,
 ) -> Option<Vec<u8>> {
-    use crate::utils::compression::magic::*;
-    if bytes.starts_with(&GZIP) {
-        let mut decoder = flate2::read::MultiGzDecoder::new(bytes);
-        decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
-    } else if bytes.starts_with(&ZLIB0) || bytes.starts_with(&ZLIB1) || bytes.starts_with(&ZLIB2) {
-        let mut decoder = flate2::read::ZlibDecoder::new(bytes);
-        decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
-    } else if bytes.starts_with(&ZSTD) {
-        let mut decoder = zstd::Decoder::new(bytes).ok()?;
-        decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
+    use crate::utils::compression::SupportedCompression;
+
+    if let Some(algo) = SupportedCompression::check(bytes) {
+        match algo {
+            SupportedCompression::GZIP => {
+                let mut decoder = flate2::read::MultiGzDecoder::new(bytes);
+                decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
+            },
+            SupportedCompression::ZLIB => {
+                let mut decoder = flate2::read::ZlibDecoder::new(bytes);
+                decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
+            },
+            SupportedCompression::ZSTD => {
+                let mut decoder = zstd::Decoder::new(bytes).ok()?;
+                decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
+            },
+        }
     } else {
         None
     }
11 changes: 9 additions & 2 deletions crates/polars-io/src/json/mod.rs
@@ -272,8 +272,15 @@ where
             JsonFormat::Json => {
                 polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
                 let mut bytes = rb.deref().to_vec();
-                let json_value =
-                    simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?;
+                let owned = &mut vec![];
+                compression::maybe_decompress_bytes(&bytes, owned)?;
+                // the easiest way to avoid ownership issues is by implicitly figuring out if
+                // decompression happened (owned is only populated on decompress), then pick which bytes to parse
+                let json_value = if owned.is_empty() {
+                    simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
+                } else {
+                    simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
+                };
 
                 // struct type
                 let dtype = if let Some(mut schema) = self.schema {
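
Note on the hunk above: simd_json requires mutable input, so the reader cannot simply parse the immutable slice that maybe_decompress_bytes returns; it instead checks which buffer was populated. With a parser that accepts an immutable slice, the returned slice alone suffices. A minimal sketch of that simpler shape, assuming the maybe_decompress_bytes introduced in this commit and using serde_json purely as an illustrative stand-in:

use polars_error::{to_compute_err, PolarsResult};
use polars_io::utils::compression::maybe_decompress_bytes;

fn parse_maybe_compressed(bytes: &[u8]) -> PolarsResult<serde_json::Value> {
    // `owned` is only populated if decompression actually happens.
    let mut owned = vec![];
    // Borrows `bytes` when the input is uncompressed, `owned` otherwise.
    let data = maybe_decompress_bytes(bytes, &mut owned)?;
    serde_json::from_slice(data).map_err(to_compute_err)
}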
77 changes: 62 additions & 15 deletions crates/polars-io/src/utils/compression.rs
@@ -1,19 +1,66 @@
-// magic numbers
-pub mod magic {
-    pub const GZIP: [u8; 2] = [31, 139];
-    pub const ZLIB0: [u8; 2] = [0x78, 0x01];
-    pub const ZLIB1: [u8; 2] = [0x78, 0x9C];
-    pub const ZLIB2: [u8; 2] = [0x78, 0xDA];
-    pub const ZSTD: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
+use std::io::Read;
+
+use polars_core::prelude::*;
+use polars_error::to_compute_err;
+
+/// Represents the compression algorithms that we have decoders for
+pub enum SupportedCompression {
+    GZIP,
+    ZLIB,
+    ZSTD,
 }
 
-/// check if csv file is compressed
-pub fn is_compressed(bytes: &[u8]) -> bool {
-    use magic::*;
+impl SupportedCompression {
+    /// If the given byte slice starts with the "magic" bytes for a supported compression family, return
+    /// that family; for unsupported/uncompressed slices, return None
+    pub fn check(bytes: &[u8]) -> Option<Self> {
+        if bytes.len() < 4 {
+            // not enough bytes to perform prefix checks
+            return None;
+        }
+        match bytes[..4] {
+            [31, 139, _, _] => Some(Self::GZIP),
+            [0x78, 0x01, _, _] | // ZLIB0
+            [0x78, 0x9C, _, _] | // ZLIB1
+            [0x78, 0xDA, _, _]   // ZLIB2
+                => Some(Self::ZLIB),
+            [0x28, 0xB5, 0x2F, 0xFD] => Some(Self::ZSTD),
+            _ => None,
+        }
+    }
+}
+
+/// Decompress `bytes` if compression is detected, otherwise simply return it.
+/// An `out` vec must be given for ownership of the decompressed data.
+pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
+    assert!(out.is_empty());
+
+    if let Some(algo) = SupportedCompression::check(bytes) {
+        #[cfg(any(feature = "decompress", feature = "decompress-fast"))]
+        {
+            match algo {
+                SupportedCompression::GZIP => {
+                    flate2::read::MultiGzDecoder::new(bytes)
+                        .read_to_end(out)
+                        .map_err(to_compute_err)?;
+                },
+                SupportedCompression::ZLIB => {
+                    flate2::read::ZlibDecoder::new(bytes)
+                        .read_to_end(out)
+                        .map_err(to_compute_err)?;
+                },
+                SupportedCompression::ZSTD => {
+                    zstd::Decoder::new(bytes)?.read_to_end(out)?;
+                },
+            }
 
-    bytes.starts_with(&ZLIB0)
-        || bytes.starts_with(&ZLIB1)
-        || bytes.starts_with(&ZLIB2)
-        || bytes.starts_with(&GZIP)
-        || bytes.starts_with(&ZSTD)
+            Ok(out)
+        }
+        #[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
+        {
+            panic!("cannot decompress without 'decompress' or 'decompress-fast' feature")
+        }
+    } else {
+        Ok(bytes)
+    }
 }
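
For orientation, a hedged usage sketch of the new module's two entry points (the 'decompress' feature is assumed enabled; the byte slice is any caller-supplied buffer):

use polars_error::PolarsResult;
use polars_io::utils::compression::{maybe_decompress_bytes, SupportedCompression};

fn describe(bytes: &[u8]) -> PolarsResult<()> {
    // check() only sniffs the 4-byte magic prefix; it never allocates.
    let label = match SupportedCompression::check(bytes) {
        Some(SupportedCompression::GZIP) => "gzip",
        Some(SupportedCompression::ZLIB) => "zlib",
        Some(SupportedCompression::ZSTD) => "zstd",
        None => "uncompressed or unsupported",
    };
    // `out` stays empty when the input needs no decompression.
    let mut out = vec![];
    let plain = maybe_decompress_bytes(bytes, &mut out)?;
    println!("{label}: {} plain bytes", plain.len());
    Ok(())
}

Folding the length guard into check() is what lets call sites such as read_impl.rs drop the separate bytes.len() >= 4 test that the old is_compressed callers needed.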
1 change: 0 additions & 1 deletion crates/polars-io/src/utils/mod.rs
@@ -1,7 +1,6 @@
 pub mod compression;
 mod other;
 
-pub use compression::is_compressed;
 pub use other::*;
 #[cfg(feature = "cloud")]
 pub mod byte_source;
41 changes: 0 additions & 41 deletions crates/polars-io/src/utils/other.rs
@@ -6,7 +6,6 @@ use once_cell::sync::Lazy;
 use polars_core::prelude::*;
 #[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
 use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
-use polars_error::to_compute_err;
 use polars_utils::mmap::MMapSemaphore;
 use regex::{Regex, RegexBuilder};

@@ -46,46 +45,6 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
     }
 }
 
-/// Decompress `bytes` if compression is detected, otherwise simply return it.
-/// An `out` vec must be given for ownership of the decompressed data.
-pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
-    assert!(out.is_empty());
-    use crate::prelude::is_compressed;
-    let is_compressed = bytes.len() >= 4 && is_compressed(bytes);
-
-    if is_compressed {
-        #[cfg(any(feature = "decompress", feature = "decompress-fast"))]
-        {
-            use crate::utils::compression::magic::*;
-
-            if bytes.starts_with(&GZIP) {
-                flate2::read::MultiGzDecoder::new(bytes)
-                    .read_to_end(out)
-                    .map_err(to_compute_err)?;
-            } else if bytes.starts_with(&ZLIB0)
-                || bytes.starts_with(&ZLIB1)
-                || bytes.starts_with(&ZLIB2)
-            {
-                flate2::read::ZlibDecoder::new(bytes)
-                    .read_to_end(out)
-                    .map_err(to_compute_err)?;
-            } else if bytes.starts_with(&ZSTD) {
-                zstd::Decoder::new(bytes)?.read_to_end(out)?;
-            } else {
-                polars_bail!(ComputeError: "unimplemented compression format")
-            }
-
-            Ok(out)
-        }
-        #[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
-        {
-            panic!("cannot decompress without 'decompress' or 'decompress-fast' feature")
-        }
-    } else {
-        Ok(bytes)
-    }
-}
-
 #[cfg(any(
     feature = "ipc",
     feature = "ipc_streaming",
1 change: 1 addition & 0 deletions crates/polars-mem-engine/src/executors/scan/csv.rs
@@ -4,6 +4,7 @@ use polars_core::config;
 use polars_core::utils::{
     accumulate_dataframes_vertical, accumulate_dataframes_vertical_unchecked,
 };
+use polars_io::utils::compression::maybe_decompress_bytes;
 
 use super::*;
 
1 change: 1 addition & 0 deletions crates/polars-mem-engine/src/executors/scan/ndjson.rs
@@ -1,5 +1,6 @@
 use polars_core::config;
 use polars_core::utils::accumulate_dataframes_vertical;
+use polars_io::utils::compression::maybe_decompress_bytes;
 
 use super::*;
 
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/conversion/scans.rs
@@ -3,6 +3,7 @@ use polars_io::path_utils::is_cloud_url;
 #[cfg(feature = "cloud")]
 use polars_io::pl_async::get_runtime;
 use polars_io::prelude::*;
+use polars_io::utils::compression::maybe_decompress_bytes;
 use polars_io::RowIndex;
 
 use super::*;
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/functions/count.rs
@@ -219,7 +219,7 @@ pub(super) fn count_rows_ndjson(
     cloud_options: Option<&CloudOptions>,
 ) -> PolarsResult<usize> {
     use polars_core::config;
-    use polars_io::utils::maybe_decompress_bytes;
+    use polars_io::utils::compression::maybe_decompress_bytes;
 
     if sources.is_empty() {
        return Ok(0);
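
The relocated helper keeps row counting over possibly-compressed NDJSON simple. A hedged sketch (not the actual count_rows_ndjson body) of the core idea, counting newline-delimited rows after transparent decompression:

use polars_error::PolarsResult;
use polars_io::utils::compression::maybe_decompress_bytes;

fn count_ndjson_rows(bytes: &[u8]) -> PolarsResult<usize> {
    let mut owned = vec![];
    // Decompresses into `owned` only when a supported magic prefix is found.
    let bytes = maybe_decompress_bytes(bytes, &mut owned)?;
    // Each NDJSON row is one newline-terminated line.
    Ok(bytes.iter().filter(|&&b| b == b'\n').count())
}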
35 changes: 35 additions & 0 deletions py-polars/tests/unit/io/test_json.py
@@ -1,13 +1,17 @@
 from __future__ import annotations
 
+import gzip
 import io
 import json
 import typing
+import zlib
 from collections import OrderedDict
 from decimal import Decimal as D
 from io import BytesIO
 from typing import TYPE_CHECKING
 
+import zstandard
+
 if TYPE_CHECKING:
     from pathlib import Path

@@ -385,3 +389,34 @@ def test_empty_json() -> None:
     df = pl.read_json(b'{"j":{}}')
     assert df.dtypes == [pl.Struct([])]
     assert df.shape == (0, 1)
+
+
+def test_compressed_json() -> None:
+    # shared setup
+    json_obj = [
+        {"id": 1, "name": "Alice", "trusted": True},
+        {"id": 2, "name": "Bob", "trusted": True},
+        {"id": 3, "name": "Carol", "trusted": False},
+    ]
+    expected = pl.DataFrame(json_obj, orient="row")
+    json_bytes = json.dumps(json_obj).encode()
+
+    # gzip
+    compressed_bytes = gzip.compress(json_bytes)
+    out = pl.read_json(compressed_bytes)
+    assert_frame_equal(out, expected)
+
+    # zlib
+    compressed_bytes = zlib.compress(json_bytes)
+    out = pl.read_json(compressed_bytes)
+    assert_frame_equal(out, expected)
+
+    # zstd
+    compressed_bytes = zstandard.compress(json_bytes)
+    out = pl.read_json(compressed_bytes)
+    assert_frame_equal(out, expected)
+
+    # no compression
+    uncompressed = io.BytesIO(json_bytes)
+    out = pl.read_json(uncompressed)
+    assert_frame_equal(out, expected)
