Skip to content

Commit

Permalink
Merge pull request #92 from r-lib/feature/write-stats
Browse files Browse the repository at this point in the history
Write row group statistics
  • Loading branch information
gaborcsardi committed Sep 22, 2024
2 parents d087949 + e43fc37 commit 495c0c1
Show file tree
Hide file tree
Showing 18 changed files with 2,269 additions and 169 deletions.
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
`read_parquet_schema()` or the new `infer_parquet_schema()` function
instead.

* Other improvements:

- The new `parquet_schema()` function creates a Parquet schema from
scratch. You can use this schema as the new `schema` argument of
`write_parquet()`, to specify how the columns of a data frame should
Expand All @@ -25,6 +27,10 @@
at most 10 million rows into a single row group. You can choose the
row groups manually with the `row_groups` argument.

- `write_parquet()` now writes minimum and maximum values per row group
for most types. See `?parquet_options()` for turning this off. It also
writes out the number of non-missing values.

- Newly supported type conversions in `write_parquet()` via the
schema argument:

Expand Down
14 changes: 12 additions & 2 deletions R/options.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
#' metadata to the file [write_parquet()].
#' @param write_data_page_version Data version to write by default.
#' Possible values are 1 and 2. Default is 1.
#' @param write_minmax_values Whether to write minimum and maximum values
#' per row group, for data types that support this in [write_parquet()].
#' However, nanoparquet currently does not support minimum and maximum
#' values for the `DECIMAL`, `UUID` and `FLOAT16` logical types and the
#' `BOOLEAN`, `BYTE_ARRAY` and `FIXED_LEN_BYTE_ARRAY` primitive types
#' if they are written without a logical type. Currently the default
#' is `TRUE`.
#'
#' @return List of nanoparquet options.
#'
Expand All @@ -55,7 +62,8 @@ parquet_options <- function(
num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L),
use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE),
write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE),
write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L)
write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L),
write_minmax_values = getOption("nanoparquet.write_minmax_values", TRUE)
) {
stopifnot(is.character(class))
stopifnot(is_flag(use_arrow_metadata))
Expand All @@ -66,6 +74,7 @@ parquet_options <- function(
identical(write_data_page_version, 1L) ||
identical(write_data_page_version, 2L)
)
stopifnot(is_flag(write_minmax_values))
num_rows_per_row_group <- as_count(
num_rows_per_row_group,
"num_rows_per_row_group"
Expand All @@ -86,6 +95,7 @@ parquet_options <- function(
num_rows_per_row_group = num_rows_per_row_group,
use_arrow_metadata = use_arrow_metadata,
write_arrow_metadata = write_arrow_metadata,
write_data_page_version = as.integer(write_data_page_version)
write_data_page_version = as.integer(write_data_page_version),
write_minmax_values = write_minmax_values
)
}
29 changes: 29 additions & 0 deletions R/parquet-metadata.R
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,21 @@ format_schema_result <- function(mtd, sch, options) {
#' integer for the root node, and `NA` for a leaf node.
# -------------------------------------------------------------------------
#' * `$row_groups`: a data frame, information about the row groups.
#' Some important columns:
#' - `file_name`: file name.
#' - `id`: row group id, integer from zero to number of row groups
#' minus one.
#' - `total_byte_size`: total uncompressed size of all column data.
#' - `num_rows`: number of rows.
#' - `file_offset`: where the row group starts in the file. This is
#' optional, so it might be `NA`.
#' - `total_compressed_size`: total byte size of all compressed
#' (and potentially encrypted) column data in this row group.
#' This is optional, so it might be `NA`.
#' - `ordinal`: ordinal position of the row group in the file, starting
#' from zero. This is optional, so it might be `NA`. If `NA`, then
#' the order of the row groups is as they appear in the metadata.
# -------------------------------------------------------------------------
#' * `$column_chunks`: a data frame, information about all column chunks,
#' across all row groups. Some important columns:
#' - `file_name`: file name.
Expand Down Expand Up @@ -155,6 +170,18 @@ format_schema_result <- function(mtd, sch, options) {
#' - `dictionary_page_offset`: absolute position of the first
#' dictionary page of the column chunk in the file, or `NA` if there
#' are no dictionary pages.
#' - `null_count`: the number of missing values in the column chunk.
#' It may be `NA`.
#' - `min_value`: list column of raw vectors, the minimum value of the
#' column, in binary. If `NULL`, then it is not specified.
#' This column is experimental.
#' - `max_value`: list column of raw vectors, the maximum value of the
#' column, in binary. If `NULL`, then it is not specified.
#' This column is experimental.
#' - `is_min_value_exact`: whether the minimum value is an actual
#' value of a column, or a bound. It may be `NA`.
#' - `is_max_value_exact`: whether the maximum value is an actual
#' value of a column, or a bound. It may be `NA`.
#'
#' @export
#' @seealso [read_parquet_info()] for a much shorter summary.
Expand Down Expand Up @@ -191,6 +218,8 @@ read_parquet_metadata <- function(file, options = parquet_options()) {
res$column_chunks$codec <- names(codecs)[res$column_chunks$codec + 1L]
res$column_chunks$encodings <- I(res$column_chunks$encodings)
res$column_chunks$path_in_schema <- I(res$column_chunks$path_in_schema)
res$column_chunks$min_value <- I(res$column_chunks$min_value)
res$column_chunks$max_value <- I(res$column_chunks$max_value)
res$column_chunks <- as.data.frame(res$column_chunks)
class(res$column_chunks) <- c("tbl", class(res$column_chunks))

Expand Down
2 changes: 2 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Null-default operator: returns `l` unless it is `NULL`, in which case
# `r` is returned. `r` is only evaluated when `l` is `NULL`.
`%||%` <- function(l, r) {
  if (!is.null(l)) {
    l
  } else {
    r
  }
}

# Null-propagating operator: returns `NULL` when `l` is `NULL`, otherwise
# returns `r`. `r` is only evaluated when `l` is not `NULL`.
`%&&%` <- function(l, r) {
  if (is.null(l)) {
    return(NULL)
  }
  r
}

is_rcmd_check <- function() {
if (identical(Sys.getenv("NOT_CRAN"), "true")) {
FALSE
Expand Down
11 changes: 10 additions & 1 deletion man/parquet_options.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions man/read_parquet_metadata.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 86 additions & 11 deletions src/dictionary-encoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstdint>
#include <cmath>
#include <iostream>
#include <cstring>

#include <Rdefines.h>
#include "protect.h"
Expand Down Expand Up @@ -68,21 +69,48 @@ uint64_t create_dict(T* values, uint64_t len, T naval) {
return n;
}

uint64_t create_dict_ptr_idx(void** values, int *dict, int *idx,
uint64_t len, void *naval) {
// Returns true if the string in `sc` sorts strictly before the string in
// `set`. The comparison is byte-wise (memcmp) over the common prefix; when
// the common prefix is equal, the shorter string is the smaller one. An
// empty string is therefore smaller than any non-empty string.
static inline bool STR_LESS(SEXP sc, SEXP set) {
  const char *lhs = CHAR(sc);
  const char *rhs = CHAR(set);
  const size_t lhs_len = strlen(lhs);
  const size_t rhs_len = strlen(rhs);
  const size_t common = lhs_len < rhs_len ? lhs_len : rhs_len;
  if (common > 0) {
    const int cmp = memcmp(lhs, rhs, common);
    if (cmp != 0) return cmp < 0;
  }
  // Equal common prefix (or one side is empty): length decides.
  return lhs_len < rhs_len;
}

// Returns true if the string in `sc` sorts strictly after the string in
// `set`. The comparison is byte-wise (memcmp) over the common prefix; when
// the common prefix is equal, the longer string is the larger one. An
// empty string is therefore never larger than anything.
static inline bool STR_MORE(SEXP sc, SEXP set) {
  const char *lhs = CHAR(sc);
  const char *rhs = CHAR(set);
  const size_t lhs_len = strlen(lhs);
  const size_t rhs_len = strlen(rhs);
  const size_t common = lhs_len < rhs_len ? lhs_len : rhs_len;
  if (common > 0) {
    const int cmp = memcmp(lhs, rhs, common);
    if (cmp != 0) return cmp > 0;
  }
  // Equal common prefix (or one side is empty): length decides.
  return lhs_len > rhs_len;
}

uint64_t create_dict_str_idx(const SEXP* values, int *dict, int *idx,
uint64_t len, SEXP naval, SEXP &minval,
SEXP &maxval, bool &hasminmax) {
std::unordered_map<void*, int, void_ptr_hash> mm;
mm.reserve(len * 2);
void **begin = values;
void **end = begin + len;
SEXP *begin = (SEXP*) values;
SEXP *end = (SEXP*) begin + len;
int n = 0;

hasminmax = false;

for (int i = 0; begin < end; begin++, i++) {
if (*begin == naval) {
idx[i] = NA_INTEGER;
continue;
}
if (!hasminmax) {
hasminmax = true;
minval = maxval = *begin;
}
auto it = mm.find(*begin);
if (it == mm.end()) {
if (STR_LESS(*begin, minval)) minval = *begin;
if (STR_MORE(*begin, maxval)) maxval = *begin;
mm.insert(std::make_pair(*begin, n));
idx[i] = n;
dict[n] = i;
Expand All @@ -95,20 +123,30 @@ uint64_t create_dict_ptr_idx(void** values, int *dict, int *idx,
return n;
}

uint64_t create_dict_real_idx(double* values, int *dict, int *idx, uint64_t len) {
uint64_t create_dict_real_idx(double* values, int *dict, int *idx,
uint64_t len, double &minval,
double &maxval, bool &hasminmax) {
std::unordered_map<double, int> mm;
mm.reserve(len * 2);
double *begin = values;
double *end = begin + len;
int n = 0;

hasminmax = false;

for (int i = 0; begin < end; begin++, i++) {
if (R_IsNA(*begin)) {
idx[i] = NA_INTEGER;
continue;
}
if (!hasminmax) {
hasminmax = true;
minval = maxval = *begin;
}
auto it = mm.find(*begin);
if (it == mm.end()) {
if (*begin < minval) minval = *begin;
if (*begin > maxval) maxval = *begin;
mm.insert(std::make_pair(*begin, n));
idx[i] = n;
dict[n] = i;
Expand All @@ -122,20 +160,29 @@ uint64_t create_dict_real_idx(double* values, int *dict, int *idx, uint64_t len)
}

template <typename T>
uint64_t create_dict_idx(T* values, int *dict, int *idx, uint64_t len, T naval) {
uint64_t create_dict_idx(T* values, int *dict, int *idx, uint64_t len,
T naval, T &minval, T &maxval, bool &hasminmax) {
std::unordered_map<T, int> mm;
mm.reserve(len * 2);
T *begin = values;
T *end = begin + len;
int n = 0;

hasminmax = false;

for (int i = 0; begin < end; begin++, i++) {
if (*begin == naval) {
idx[i] = NA_INTEGER;
continue;
}
if (!hasminmax) {
hasminmax = true;
minval = maxval = *begin;
}
auto it = mm.find(*begin);
if (it == mm.end()) {
if (*begin < minval) minval = *begin;
if (*begin > maxval) maxval = *begin;
mm.insert(std::make_pair(*begin, n));
idx[i] = n;
dict[n] = i;
Expand Down Expand Up @@ -185,28 +232,56 @@ SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until) {
SEXP dict = PROTECT(Rf_allocVector(INTSXP, len));
int *idict = INTEGER(dict);
int *iidx = INTEGER(idx);
int imin, imax;
double dmin, dmax;
SEXP smin = R_NilValue, smax = R_NilValue;
bool hasminmax = false;
switch (TYPEOF(x)) {
case LGLSXP:
dictlen = create_dict_idx<int>(LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL);
dictlen = create_dict_idx<int>(
LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL,
imin, imax, hasminmax
);
break;
case INTSXP:
dictlen = create_dict_idx<int>(INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER);
dictlen = create_dict_idx<int>(
INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER,
imin, imax, hasminmax
);
break;
case REALSXP:
dictlen = create_dict_real_idx(REAL(x) + cfrom, idict, iidx, len);
dictlen = create_dict_real_idx(
REAL(x) + cfrom, idict, iidx, len,
dmin, dmax, hasminmax
);
break;
case STRSXP: {
dictlen = create_dict_ptr_idx((void**)(STRING_PTR_RO(x) + cfrom), idict, iidx, len, (void*) NA_STRING);
dictlen = create_dict_str_idx(
STRING_PTR_RO(x) + cfrom, idict, iidx, len, NA_STRING,
smin, smax, hasminmax
);
break;
}
default:
Rf_error("Cannot create dictionary for this type");
break;
}

SEXP res = PROTECT(Rf_allocVector(VECSXP, 2));
SEXP res = PROTECT(Rf_allocVector(VECSXP, hasminmax ? 4 : 2));
SET_VECTOR_ELT(res, 0, dict);
SET_VECTOR_ELT(res, 1, idx);
if (hasminmax) {
if (TYPEOF(x) == INTSXP) {
SET_VECTOR_ELT(res, 2, Rf_ScalarInteger(imin));
SET_VECTOR_ELT(res, 3, Rf_ScalarInteger(imax));
} else if (TYPEOF(x) == REALSXP) {
SET_VECTOR_ELT(res, 2, Rf_ScalarReal(dmin));
SET_VECTOR_ELT(res, 3, Rf_ScalarReal(dmax));
} else if (TYPEOF(x) == STRSXP) {
SET_VECTOR_ELT(res, 2, smin);
SET_VECTOR_ELT(res, 3, smax);
}
}

if (dictlen < len) {
SET_VECTOR_ELT(res, 0, Rf_xlengthgets(dict, dictlen));
Expand Down
Loading

0 comments on commit 495c0c1

Please sign in to comment.