Skip to content

Commit

Permalink
More types in read_parquet()
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborcsardi committed Aug 13, 2024
1 parent 25d3ae8 commit e00c562
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 30 deletions.
18 changes: 18 additions & 0 deletions R/read.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,23 @@ read_parquet2 <- function(file, options = parquet_options()) {
res <- apply_arrow_schema2(res, file, dicts, types)
}

# convert hms from milliseconds to seconds, also integer -> double
hmss <- which(vapply(res, "inherits", "hms", FUN.VALUE = logical(1)))
for (idx in hmss) {
res[[idx]] <- structure(
unclass(res[[idx]]) / 1000,
class = class(res[[idx]])
)
}

# convert POSIXct from milliseconds to seconds
posixcts <- which(vapply(res, "inherits", "POSIXct", FUN.VALUE = logical(1)))
for (idx in posixcts) {
res[[idx]][] <- structure(
unclass(res[[idx]]) / 1000,
class = class(res[[idx]])
)
}

res
}
67 changes: 51 additions & 16 deletions src/RParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ RParquetReader::RParquetReader(std::string filename)
if (rt.type != rt.tmptype && rt.tmptype != NILSXP) {
tmpdata[i].resize(metadata.num_rows * rt.elsize);
}
INTEGER(types)[idx] = file_meta_data_.schema[i].type;
idx++;
}
}
Expand Down Expand Up @@ -1039,22 +1040,6 @@ void convert_column_to_r_int96(postprocess *pp, uint32_t cl) {
} else if (hasdict0 && hasmiss0) {
convert_column_to_r_int96_dict_miss(pp, cl);
}

// TODO: make this conversion configurable
SEXP x = VECTOR_ELT(pp->columns, pp->leaf_cols[cl]);
SEXP cls = PROTECT(Rf_allocVector(STRSXP, 2));
SET_STRING_ELT(cls, 0, Rf_mkChar("POSIXct"));
SET_STRING_ELT(cls, 1, Rf_mkChar("POSIXt"));
Rf_setAttrib(x, Rf_install("tzone"), Rf_mkString("UTC"));
SET_CLASS(x, cls);
UNPROTECT(1);

R_xlen_t len = Rf_xlength(x);
double *ptr = REAL(x);
double *end = ptr + len;
for (; ptr < end; ptr++) {
*ptr = *ptr / 1000;
}
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1544,6 +1529,56 @@ void convert_columns_to_r_(postprocess *pp) {
default:
break;

Check warning on line 1530 in src/RParquetReader.cpp

View check run for this annotation

Codecov / codecov/patch

src/RParquetReader.cpp#L1526-L1530

Added lines #L1526 - L1530 were not covered by tests
}

// add classes, if any
size_t nc = rt.classes.size();
if (nc > 0) {
SEXP x = VECTOR_ELT(pp->columns, pp->leaf_cols[cl]);
SEXP cls = PROTECT(Rf_allocVector(STRSXP, nc));
for (size_t i = 0; i < nc; i++) {
SET_STRING_ELT(cls, i, Rf_mkCharCE(rt.classes[i].c_str(), CE_UTF8));
}
SET_CLASS(x, cls);
UNPROTECT(1);
}

// add time zone attribute, if any
if (rt.tzone != "") {
SEXP x = VECTOR_ELT(pp->columns, pp->leaf_cols[cl]);
Rf_setAttrib(x, Rf_install("tzone"), Rf_mkString(rt.tzone.c_str()));
}

// add unit
size_t nu = rt.units.size();
if (nu > 0) {
SEXP x = VECTOR_ELT(pp->columns, pp->leaf_cols[cl]);
SEXP units = PROTECT(Rf_allocVector(STRSXP, nu));
for (size_t i = 0; i < nu; i++) {
SET_STRING_ELT(units, i, Rf_mkCharCE(rt.units[i].c_str(), CE_UTF8));
}
Rf_setAttrib(x, Rf_install("units"), units);
UNPROTECT(1);
}

// use multiplier, if any
if (rt.time_fct != 1.0) {
SEXP x = VECTOR_ELT(pp->columns, pp->leaf_cols[cl]);
if (TYPEOF(x) == INTSXP) {
int32_t *ptr = INTEGER(x);
int32_t *end = ptr + Rf_xlength(x);
for (; ptr < end; ptr++) {
*ptr /= rt.time_fct;

Check warning on line 1570 in src/RParquetReader.cpp

View check run for this annotation

Codecov / codecov/patch

src/RParquetReader.cpp#L1567-L1570

Added lines #L1567 - L1570 were not covered by tests
}
} else if (TYPEOF(x) == REALSXP) {
double *ptr = REAL(x);
double *end = ptr + Rf_xlength(x);
for (; ptr < end; ptr++) {
*ptr /= rt.time_fct;
}
} else {
Rf_error("Internal nanoparquet error, cannot multiply non-numeric");

Check warning on line 1579 in src/RParquetReader.cpp

View check run for this annotation

Codecov / codecov/patch

src/RParquetReader.cpp#L1579

Added line #L1579 was not covered by tests
}
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions tests/testthat/_snaps/read-parquet.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
# read hms in MICROS

Code
as.data.frame(read_parquet(pf))
as.data.frame(read_parquet2(pf))
Output
tt
1 14:30:00
Expand All @@ -56,7 +56,7 @@
# read GZIP compressed files

Code
as.data.frame(read_parquet(pf))
as.data.frame(read_parquet2(pf))
Output
nam mpg cyl disp hp drat wt qsec vs am gear carb
1 <NA> 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
Expand Down Expand Up @@ -128,7 +128,7 @@
# V2 data pages

Code
as.data.frame(read_parquet(pf))
as.data.frame(read_parquet2(pf))
Output
FirstName Data
1 John 48, 65, 6c, 6c, 6f, 20, 57, 6f, 72, 6c, 64
Expand Down
22 changes: 11 additions & 11 deletions tests/testthat/test-read-parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ test_that("read Date", {
)
write_parquet(d, tmp)

d2 <- read_parquet(tmp)
d2 <- read_parquet2(tmp)
expect_s3_class(d2$d, "Date")
expect_equal(d$d, d2$d)
})
Expand All @@ -175,15 +175,15 @@ test_that("read hms", {
)
write_parquet(d, tmp)

d2 <- read_parquet(tmp)
d2 <- read_parquet2(tmp)
expect_s3_class(d2$h, "hms")
expect_equal(d$h, d2$h)
})

test_that("read hms in MICROS", {
pf <- test_path("data/timetz.parquet")
expect_snapshot({
as.data.frame(read_parquet(pf))
as.data.frame(read_parquet2(pf))
})
})

Expand All @@ -196,7 +196,7 @@ test_that("read POSIXct", {
)
write_parquet(d, tmp)

d2 <- read_parquet(tmp)
d2 <- read_parquet2(tmp)
expect_s3_class(d$h, "POSIXct")
expect_equal(d$h, d2$h)
})
Expand All @@ -206,7 +206,7 @@ test_that("read POSIXct in MILLIS", {
# This file has UTC = FALSE, so the exact result depends on the current
# time zone. But it should match Arrow.
pf <- test_path("data/timestamp-ms.parquet")
d1 <- read_parquet(pf)
d1 <- read_parquet2(pf)
d2 <- arrow::read_parquet(pf)
expect_equal(
as.data.frame(d1),
Expand All @@ -224,7 +224,7 @@ test_that("read difftime", {
)
write_parquet(d, tmp)

d2 <- read_parquet(tmp)
d2 <- read_parquet2(tmp)
expect_s3_class(d2$h, "difftime")
expect_equal(d$h, d2$h)

Expand All @@ -233,7 +233,7 @@ test_that("read difftime", {
h = as.difftime(10, units = "mins")
)
write_parquet(d, tmp)
d2 <- read_parquet(tmp)
d2 <- read_parquet2(tmp)
expect_snapshot({
as.data.frame(d2)
})
Expand Down Expand Up @@ -283,22 +283,22 @@ test_that("RLE BOOLEAN", {
test_that("read GZIP compressed files", {
pf <- test_path("data/gzip.parquet")
expect_snapshot({
as.data.frame(read_parquet(pf))
as.data.frame(read_parquet2(pf))
})
})

test_that("V2 data pages", {
pf <- test_path("data/parquet_go.parquet")
expect_snapshot({
as.data.frame(read_parquet(pf))
as.data.frame(read_parquet2(pf))
})
})

test_that("V2 data page with missing values", {
skip_on_cran()
pf <- test_path("data/duckdb-bug1589.parquet")
expect_equal(
as.data.frame(read_parquet(pf)),
as.data.frame(read_parquet2(pf)),
as.data.frame(arrow::read_parquet(pf))
)
})
Expand All @@ -316,7 +316,7 @@ test_that("zstd", {
pf <- test_path("data/zstd.parquet")
expect_true(all(read_parquet_metadata(pf)$column_chunks$codec == "ZSTD"))
pf2 <- test_path("data/gzip.parquet")
expect_equal(read_parquet(pf), read_parquet(pf2))
expect_equal(read_parquet2(pf), read_parquet2(pf2))
})

test_that("zstd with data page v2", {
Expand Down

0 comments on commit e00c562

Please sign in to comment.