diff --git a/NEWS.md b/NEWS.md index cd1148c..44c7c17 100644 --- a/NEWS.md +++ b/NEWS.md @@ -16,6 +16,8 @@ `read_parquet_schema()` or the new `infer_parquet_schema()` function instead. +* Other improvements: + - The new `parquet_schema()` function creates a Parquet schema from scratch. You can use this schema as the new `schema` argument of `write_parquet()`, to specify how the columns of a data frame should @@ -25,6 +27,10 @@ at most 10 million rows into a single row group. You can choose the row groups manually with the `row_groups` argument. + - `write_parquet()` now writes minimum and maximum values per row group + for most types. See `?parquet_options()` for turning this off. It also + writes out the number of non-missing values. + - Newly supported type conversions in `write_parquet()` via the schema argument: diff --git a/R/options.R b/R/options.R index 275d17c..2bb44fc 100644 --- a/R/options.R +++ b/R/options.R @@ -34,6 +34,13 @@ #' metadata to the file [write_parquet()]. #' @param write_data_page_version Data version to write by default. #' Possible values are 1 and 2. Default is 1. +#' @param write_minmax_values Whether to write minimum and maximum values +#' per row group, for data types that support this in [write_parquet()]. +#' However, nanoparquet currently does not support minimum and maximum +#' values for the `DECIMAL`, `UUID` and `FLOAT16` logical types and the +#' `BOOLEAN`, `BYTE_ARRAY` and `FIXED_LEN_BYTE_ARRAY` primitive types +#' if they are written without a logical type. Currently the default +#' is `TRUE`. #' #' @return List of nanoparquet options. #' @@ -55,7 +62,8 @@ parquet_options <- function( num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L), use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE), write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE), - write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L) + write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L), + write_minmax_values = getOption("nanoparquet.write_minmax_values", TRUE) ) { stopifnot(is.character(class)) stopifnot(is_flag(use_arrow_metadata)) @@ -66,6 +74,7 @@ parquet_options <- function( identical(write_data_page_version, 1L) || identical(write_data_page_version, 2L) ) + stopifnot(is_flag(write_minmax_values)) num_rows_per_row_group <- as_count( num_rows_per_row_group, "num_rows_per_row_group" ) @@ -86,6 +95,7 @@ parquet_options <- function( num_rows_per_row_group = num_rows_per_row_group, use_arrow_metadata = use_arrow_metadata, write_arrow_metadata = write_arrow_metadata, - write_data_page_version = as.integer(write_data_page_version) + write_data_page_version = as.integer(write_data_page_version), + write_minmax_values = write_minmax_values ) } diff --git a/R/parquet-metadata.R b/R/parquet-metadata.R index 50c5612..0f07466 100644 --- a/R/parquet-metadata.R +++ b/R/parquet-metadata.R @@ -126,6 +126,21 @@ format_schema_result <- function(mtd, sch, options) { #' integer for the root node, and `NA` for a leaf node. # ------------------------------------------------------------------------- #' * `$row_groups`: a data frame, information about the row groups. +#' Some important columns: +#' - `file_name`: file name. +#' - `id`: row group id, integer from zero to number of row groups +#' minus one. +#' - `total_byte_size`: total uncompressed size of all column data. +#' - `num_rows`: number of rows. +#' - `file_offset`: where the row group starts in the file. 
This is +#' optional, so it might be `NA`. +#' - `total_compressed_size`: total byte size of all compressed +#' (and potentially encrypted) column data in this row group. +#' This is optional, so it might be `NA`. +#' - `ordinal`: ordinal position of the row group in the file, starting +#' from zero. This is optional, so it might be `NA`. If `NA`, then +#' the order of the row groups is as they appear in the metadata. +# ------------------------------------------------------------------------- #' * `$column_chunks`: a data frame, information about all column chunks, #' across all row groups. Some important columns: #' - `file_name`: file name. @@ -155,6 +170,18 @@ format_schema_result <- function(mtd, sch, options) { #' - `dictionary_page_offset`: absolute position of the first #' dictionary page of the column chunk in the file, or `NA` if there #' are no dictionary pages. +#' - `null_count`: the number of missing values in the column chunk. +#' It may be `NA`. +#' - `min_value`: list column of raw vectors, the minimum value of the +#' column, in binary. If `NULL`, then it is not specified. +#' This column is experimental. +#' - `max_value`: list column of raw vectors, the maximum value of the +#' column, in binary. If `NULL`, then it is not specified. +#' This column is experimental. +#' - `is_min_value_exact`: whether the minimum value is an actual +#' value of a column, or a bound. It may be `NA`. +#' - `is_max_value_exact`: whether the maximum value is an actual +#' value of a column, or a bound. It may be `NA`. #' #' @export #' @seealso [read_parquet_info()] for a much shorter summary. @@ -191,6 +218,8 @@ read_parquet_metadata <- function(file, options = parquet_options()) { res$column_chunks$codec <- names(codecs)[res$column_chunks$codec + 1L] res$column_chunks$encodings <- I(res$column_chunks$encodings) res$column_chunks$path_in_schema <- I(res$column_chunks$path_in_schema) + res$column_chunks$min_value <- I(res$column_chunks$min_value) + res$column_chunks$max_value <- I(res$column_chunks$max_value) res$column_chunks <- as.data.frame(res$column_chunks) class(res$column_chunks) <- c("tbl", class(res$column_chunks)) diff --git a/R/utils.R b/R/utils.R index af2be45..7d273c0 100644 --- a/R/utils.R +++ b/R/utils.R @@ -1,5 +1,7 @@ `%||%` <- function(l, r) if (is.null(l)) r else l +`%&&%` <- function(l, r) if (is.null(l)) NULL else r + is_rcmd_check <- function() { if (identical(Sys.getenv("NOT_CRAN"), "true")) { FALSE diff --git a/man/parquet_options.Rd b/man/parquet_options.Rd index b0c0a1c..9c0863f 100644 --- a/man/parquet_options.Rd +++ b/man/parquet_options.Rd @@ -10,7 +10,8 @@ parquet_options( num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L), use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE), write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE), - write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L) + write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L), + write_minmax_values = getOption("nanoparquet.write_minmax_values", TRUE) ) } \arguments{ @@ -56,6 +57,14 @@ metadata to the file \code{\link[=write_parquet]{write_parquet()}}.} \item{write_data_page_version}{Data version to write by default. Possible values are 1 and 2. Default is 1.} + +\item{write_minmax_values}{Whether to write minimum and maximum values +per row group, for data types that support this in \code{\link[=write_parquet]{write_parquet()}}. 
+However, nanoparquet currently does not support minimum and maximum +values for the \code{DECIMAL}, \code{UUID} and \code{FLOAT16} logical types and the +\code{BOOLEAN}, \code{BYTE_ARRAY} and \code{FIXED_LEN_BYTE_ARRAY} primitive types +if they are written without a logical type. Currently the default +is \code{TRUE}.} } \value{ List of nanoparquet options. diff --git a/man/read_parquet_metadata.Rd b/man/read_parquet_metadata.Rd index 0034132..2a7320c 100644 --- a/man/read_parquet_metadata.Rd +++ b/man/read_parquet_metadata.Rd @@ -53,6 +53,22 @@ additional entries, e.g. \code{bit_width}, \code{is_signed}, etc. integer for the root node, and \code{NA} for a leaf node. } \item \verb{$row_groups}: a data frame, information about the row groups. +Some important columns: +\itemize{ +\item \code{file_name}: file name. +\item \code{id}: row group id, integer from zero to number of row groups +minus one. +\item \code{total_byte_size}: total uncompressed size of all column data. +\item \code{num_rows}: number of rows. +\item \code{file_offset}: where the row group starts in the file. This is +optional, so it might be \code{NA}. +\item \code{total_compressed_size}: total byte size of all compressed +(and potentially encrypted) column data in this row group. +This is optional, so it might be \code{NA}. +\item \code{ordinal}: ordinal position of the row group in the file, starting +from zero. This is optional, so it might be \code{NA}. If \code{NA}, then +the order of the row groups is as they appear in the metadata. +} \item \verb{$column_chunks}: a data frame, information about all column chunks, across all row groups. Some important columns: \itemize{ @@ -83,6 +99,18 @@ the column chunk in the file, or \code{NA} if there are no index pages. \item \code{dictionary_page_offset}: absolute position of the first dictionary page of the column chunk in the file, or \code{NA} if there are no dictionary pages. +\item \code{null_count}: the number of missing values in the column chunk. +It may be \code{NA}. +\item \code{min_value}: list column of raw vectors, the minimum value of the +column, in binary. If \code{NULL}, then it is not specified. +This column is experimental. +\item \code{max_value}: list column of raw vectors, the maximum value of the +column, in binary. If \code{NULL}, then it is not specified. +This column is experimental. +\item \code{is_min_value_exact}: whether the minimum value is an actual +value of a column, or a bound. It may be \code{NA}. +\item \code{is_max_value_exact}: whether the maximum value is an actual +value of a column, or a bound. It may be \code{NA}. } } } diff --git a/src/dictionary-encoding.cpp b/src/dictionary-encoding.cpp index e2edb03..bcbb366 100644 --- a/src/dictionary-encoding.cpp +++ b/src/dictionary-encoding.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include "protect.h" @@ -68,21 +69,48 @@ uint64_t create_dict(T* values, uint64_t len, T naval) { return n; } -uint64_t create_dict_ptr_idx(void** values, int *dict, int *idx, - uint64_t len, void *naval) { +static inline bool STR_LESS(SEXP sc, SEXP set) { + const char *c = CHAR(sc), *et = CHAR(set); + size_t l = strlen(c), el = strlen(et); + if (l == 0) return el > 0; + if (el == 0) return false; + int res = memcmp(c, et, l < el ? 
l : el); + return res < 0 || (res == 0 && l < el); +} + +static inline bool STR_MORE(SEXP sc, SEXP set) { + const char *c = CHAR(sc), *et = CHAR(set); + size_t l = strlen(c), el = strlen(et); + if (l == 0) return false; + if (el == 0) return true; + int res = memcmp(c, et, l < el ? l : el); + return res > 0 || (res == 0 && l > el); +} + +uint64_t create_dict_str_idx(const SEXP* values, int *dict, int *idx, + uint64_t len, SEXP naval, SEXP &minval, + SEXP &maxval, bool &hasminmax) { std::unordered_map mm; mm.reserve(len * 2); - void **begin = values; - void **end = begin + len; + SEXP *begin = (SEXP*) values; + SEXP *end = (SEXP*) begin + len; int n = 0; + hasminmax = false; + for (int i = 0; begin < end; begin++, i++) { if (*begin == naval) { idx[i] = NA_INTEGER; continue; } + if (!hasminmax) { + hasminmax = true; + minval = maxval = *begin; + } auto it = mm.find(*begin); if (it == mm.end()) { + if (STR_LESS(*begin, minval)) minval = *begin; + if (STR_MORE(*begin, maxval)) maxval = *begin; mm.insert(std::make_pair(*begin, n)); idx[i] = n; dict[n] = i; @@ -95,20 +123,30 @@ uint64_t create_dict_ptr_idx(void** values, int *dict, int *idx, return n; } -uint64_t create_dict_real_idx(double* values, int *dict, int *idx, uint64_t len) { +uint64_t create_dict_real_idx(double* values, int *dict, int *idx, + uint64_t len, double &minval, + double &maxval, bool &hasminmax) { std::unordered_map mm; mm.reserve(len * 2); double *begin = values; double *end = begin + len; int n = 0; + hasminmax = false; + for (int i = 0; begin < end; begin++, i++) { if (R_IsNA(*begin)) { idx[i] = NA_INTEGER; continue; } + if (!hasminmax) { + hasminmax = true; + minval = maxval = *begin; + } auto it = mm.find(*begin); if (it == mm.end()) { + if (*begin < minval) minval = *begin; + if (*begin > maxval) maxval = *begin; mm.insert(std::make_pair(*begin, n)); idx[i] = n; dict[n] = i; @@ -122,20 +160,29 @@ uint64_t create_dict_real_idx(double* values, int *dict, int *idx, uint64_t len) } template -uint64_t create_dict_idx(T* values, int *dict, int *idx, uint64_t len, T naval) { +uint64_t create_dict_idx(T* values, int *dict, int *idx, uint64_t len, + T naval, T &minval, T &maxval, bool &hasminmax) { std::unordered_map mm; mm.reserve(len * 2); T *begin = values; T *end = begin + len; int n = 0; + hasminmax = false; + for (int i = 0; begin < end; begin++, i++) { if (*begin == naval) { idx[i] = NA_INTEGER; continue; } + if (!hasminmax) { + hasminmax = true; + minval = maxval = *begin; + } auto it = mm.find(*begin); if (it == mm.end()) { + if (*begin < minval) minval = *begin; + if (*begin > maxval) maxval = *begin; mm.insert(std::make_pair(*begin, n)); idx[i] = n; dict[n] = i; @@ -185,18 +232,34 @@ SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until) { SEXP dict = PROTECT(Rf_allocVector(INTSXP, len)); int *idict = INTEGER(dict); int *iidx = INTEGER(idx); + int imin, imax; + double dmin, dmax; + SEXP smin = R_NilValue, smax = R_NilValue; + bool hasminmax = false; switch (TYPEOF(x)) { case LGLSXP: - dictlen = create_dict_idx(LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL); + dictlen = create_dict_idx( + LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL, + imin, imax, hasminmax + ); break; case INTSXP: - dictlen = create_dict_idx(INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER); + dictlen = create_dict_idx( + INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER, + imin, imax, hasminmax + ); break; case REALSXP: - dictlen = create_dict_real_idx(REAL(x) + cfrom, idict, iidx, len); + dictlen = create_dict_real_idx( + REAL(x) 
+ cfrom, idict, iidx, len, + dmin, dmax, hasminmax + ); break; case STRSXP: { - dictlen = create_dict_ptr_idx((void**)(STRING_PTR_RO(x) + cfrom), idict, iidx, len, (void*) NA_STRING); + dictlen = create_dict_str_idx( + STRING_PTR_RO(x) + cfrom, idict, iidx, len, NA_STRING, + smin, smax, hasminmax + ); break; } default: @@ -204,9 +267,21 @@ SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until) { break; } - SEXP res = PROTECT(Rf_allocVector(VECSXP, 2)); + SEXP res = PROTECT(Rf_allocVector(VECSXP, hasminmax ? 4 : 2)); SET_VECTOR_ELT(res, 0, dict); SET_VECTOR_ELT(res, 1, idx); + if (hasminmax) { + if (TYPEOF(x) == INTSXP) { + SET_VECTOR_ELT(res, 2, Rf_ScalarInteger(imin)); + SET_VECTOR_ELT(res, 3, Rf_ScalarInteger(imax)); + } else if (TYPEOF(x) == REALSXP) { + SET_VECTOR_ELT(res, 2, Rf_ScalarReal(dmin)); + SET_VECTOR_ELT(res, 3, Rf_ScalarReal(dmax)); + } else if (TYPEOF(x) == STRSXP) { + SET_VECTOR_ELT(res, 2, smin); + SET_VECTOR_ELT(res, 3, smax); + } + } if (dictlen < len) { SET_VECTOR_ELT(res, 0, Rf_xlengthgets(dict, dictlen)); diff --git a/src/lib/ParquetOutFile.cpp b/src/lib/ParquetOutFile.cpp index 51f9bb3..0da5337 100644 --- a/src/lib/ParquetOutFile.cpp +++ b/src/lib/ParquetOutFile.cpp @@ -246,6 +246,8 @@ void ParquetOutFile::write_data_( std::ostream &file, uint32_t idx, uint32_t size, + uint32_t group, + uint32_t page, uint64_t from, uint64_t until) { @@ -254,28 +256,28 @@ void ParquetOutFile::write_data_( parquet::Type::type type = se.type; switch (type) { case Type::INT32: - write_int32(file, idx, from, until, se); + write_int32(file, idx, group, page, from, until, se); break; case Type::INT64: - write_int64(file, idx, from, until, se); + write_int64(file, idx, group, page, from, until, se); break; case Type::INT96: - write_int96(file, idx, from, until, se); + write_int96(file, idx, group, page, from, until, se); break; case Type::FLOAT: - write_float(file, idx, from, until, se); + write_float(file, idx, group, page, from, until, se); break; case Type::DOUBLE: - write_double(file, idx, from, until, se); + write_double(file, idx, group, page, from, until, se); break; case Type::BYTE_ARRAY: - write_byte_array(file, idx, from, until, se); + write_byte_array(file, idx, group, page, from, until, se); break; case Type::FIXED_LEN_BYTE_ARRAY: - write_fixed_len_byte_array(file, idx, from, until, se); + write_fixed_len_byte_array(file, idx, group, page, from, until, se); break; case Type::BOOLEAN: - write_boolean(file, idx, from, until); + write_boolean(file, idx, group, page, from, until); break; default: throw runtime_error("Cannot write unknown column type"); // # nocov @@ -300,6 +302,8 @@ void ParquetOutFile::write_present_data_( uint32_t idx, uint32_t size, uint32_t num_present, + uint32_t group, + uint32_t page, uint64_t from, uint64_t until) { @@ -308,25 +312,25 @@ void ParquetOutFile::write_present_data_( parquet::Type::type type = se.type; switch (type) { case Type::INT32: - write_int32(file, idx, from, until, se); + write_int32(file, idx, group, page, from, until, se); break; case Type::INT64: - write_int64(file, idx, from, until, se); + write_int64(file, idx, group, page, from, until, se); break; case Type::INT96: - write_int96(file, idx, from, until, se); + write_int96(file, idx, group, page, from, until, se); break; case Type::FLOAT: - write_float(file, idx, from, until, se); + write_float(file, idx, group, page, from, until, se); break; case Type::DOUBLE: - write_double(file, idx, from, until, se); + write_double(file, idx, group, page, from, until, se); break; case 
Type::BYTE_ARRAY: - write_byte_array(file, idx, from, until, se); + write_byte_array(file, idx, group, page, from, until, se); break; case Type::FIXED_LEN_BYTE_ARRAY: - write_fixed_len_byte_array(file, idx, from, until, se); + write_fixed_len_byte_array(file, idx, group, page, from, until, se); break; case Type::BOOLEAN: write_present_boolean(file, idx, num_present, from, until); @@ -510,7 +514,8 @@ void ParquetOutFile::write() { // write int64_t from = row_group_starts[idx]; int64_t until = idx < row_group_starts.size() - 1 ? row_group_starts[idx + 1] : num_rows; - int64_t total_size = write_columns(from, until); + write_row_group(idx); + int64_t total_size = write_columns(idx, from, until); // row group metadata vector ccs; @@ -531,33 +536,48 @@ void ParquetOutFile::write() { pfile_.close(); } -int64_t ParquetOutFile::write_columns(int64_t from, int64_t until) { +int64_t ParquetOutFile::write_columns(uint32_t group, int64_t from, + int64_t until) { uint32_t start = pfile.tellp(); for (uint32_t idx = 0; idx < num_cols; idx++) { - write_column(idx, from, until); + write_column(idx, group, from, until); } uint32_t end = pfile.tellp(); // return total size return end - start; } -void ParquetOutFile::write_column(uint32_t idx, int64_t from, int64_t until) { +void ParquetOutFile::write_column(uint32_t idx, uint32_t group, + int64_t from, int64_t until) { ColumnMetaData *cmd = &(column_meta_data[idx]); SchemaElement se = schemas[idx + 1]; uint32_t col_start = pfile.tellp(); // we increase this as needed cmd->__set_total_uncompressed_size(0); + Statistics stat; + // we increase this as we write + stat.__set_null_count(0); + cmd->__set_statistics(stat); if (encodings[idx] == Encoding::RLE_DICTIONARY) { uint32_t dictionary_page_offset = pfile.tellp(); write_dictionary_page(idx, from, until); cmd->__set_dictionary_page_offset(dictionary_page_offset); } uint32_t data_offset = pfile.tellp(); - write_data_pages(idx, from, until); + write_data_pages(idx, group, from, until); int32_t column_bytes = ((int32_t) pfile.tellp()) - col_start; cmd->__set_num_values(until - from); cmd->__set_total_compressed_size(column_bytes); cmd->__set_data_page_offset(data_offset); + // min-max values + std::string min_value, max_value; + if (get_group_minmax_values(idx, group, se, min_value, max_value)) { + Statistics *stat = &cmd->statistics; + stat->__set_min_value(min_value); + stat->__set_max_value(max_value); + stat->__set_is_min_value_exact(true); + stat->__set_is_max_value_exact(true); + } } void ParquetOutFile::write_page_header(uint32_t idx, PageHeader &ph) { @@ -582,7 +602,7 @@ void ParquetOutFile::write_dictionary_page(uint32_t idx, int64_t from, // Uncompresed size of the dictionary in bytes uint32_t dict_size = get_size_dictionary(idx, se, from, until); // Number of entries in the dicitonary - uint32_t num_dict_values = get_num_values_dictionary(idx, from, until); + uint32_t num_dict_values = get_num_values_dictionary(idx, se, from, until); // Init page header PageHeader ph; @@ -619,8 +639,8 @@ void ParquetOutFile::write_dictionary_page(uint32_t idx, int64_t from, } } -void ParquetOutFile::write_data_pages(uint32_t idx, int64_t from, - int64_t until) { +void ParquetOutFile::write_data_pages(uint32_t idx, uint32_t group, + int64_t from, int64_t until) { SchemaElement se = schemas[idx + 1]; int64_t rg_num_rows = until - from; @@ -630,7 +650,7 @@ void ParquetOutFile::write_data_pages(uint32_t idx, int64_t from, total_size = calculate_column_data_size(idx, rg_num_rows, from, until); } else { // estimate the max 
RLE length - uint32_t num_values = get_num_values_dictionary(idx, from, until); + uint32_t num_values = get_num_values_dictionary(idx, se, from, until); uint8_t bit_width = ceil(log2((double) num_values)); total_size = MaxRleBpSizeSimple(rg_num_rows, bit_width); } @@ -665,14 +685,16 @@ void ParquetOutFile::write_data_pages(uint32_t idx, int64_t from, if (page_until > until) { page_until = until; } - write_data_page(idx, from, until, page_from, page_until); + write_data_page(idx, group, i, from, until, page_from, page_until); } } -void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, +void ParquetOutFile::write_data_page(uint32_t idx, uint32_t group, + uint32_t page, int64_t rg_from, int64_t rg_until, uint64_t page_from, uint64_t page_until) { ColumnMetaData *cmd = &(column_meta_data[idx]); + Statistics *stat = &(cmd->statistics); SchemaElement se = schemas[idx + 1]; PageHeader ph; DataPageHeaderV2 dph2; @@ -716,7 +738,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, ph.__set_data_page_header_v2(dph2); } write_page_header(idx, ph); - write_data_(pfile, idx, data_size, page_from, page_until); + write_data_(pfile, idx, data_size, group, page, page_from, page_until); } else if (se.repetition_type == FieldRepetitionType::REQUIRED && encodings[idx] == Encoding::PLAIN && @@ -730,7 +752,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_data_(*os0, idx, data_size, page_from, page_until); + write_data_(*os0, idx, data_size, group, page, page_from, page_until); // 2. compress buf_unc to buf_com size_t cdata_size = compress(cmd->codec, buf_unc, data_size, buf_com); @@ -756,7 +778,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, page_from, page_until); // 2. RLE encode buf_unc to buf_com - uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); + uint32_t num_dict_values = get_num_values_dictionary(idx, se, rg_from, rg_until); uint8_t bit_width = ceil(log2((double) num_dict_values)); uint32_t rle_size = rle_encode( buf_unc, @@ -792,7 +814,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, page_from, page_until); // 2. RLE encode buf_unc to buf_com - uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); + uint32_t num_dict_values = get_num_values_dictionary(idx, se, rg_from, rg_until); uint8_t bit_width = ceil(log2((double) num_dict_values)); uint32_t rle_size = rle_encode( buf_unc, @@ -852,9 +874,11 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, cmd->__set_total_uncompressed_size( cmd->total_uncompressed_size + rle_size + 4 ); + stat->__set_null_count(stat->null_count + page_num_values - num_present); // 4. write data to file - write_present_data_(pfile, idx, data_size, num_present, page_from, page_until); + write_present_data_(pfile, idx, data_size, num_present, group, page, + page_from, page_until); } else if (se.repetition_type == FieldRepetitionType::OPTIONAL && encodings[idx] == Encoding::PLAIN && @@ -885,7 +909,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, std::unique_ptr os1 = std::unique_ptr(new std::ostream(&buf_com)); buf_com.skip(rle_size); - write_present_data_(*os1, idx, data_size, num_present, page_from, page_until); + write_present_data_(*os1, idx, data_size, num_present, group, page, + page_from, page_until); // 4. 
compress buf_com to buf_unc // for data page v2, the def levels are not compressed! @@ -905,6 +930,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, cmd->__set_total_uncompressed_size( cmd->total_uncompressed_size + rle_size ); + stat->__set_null_count(stat->null_count + page_num_values - num_present); } else if (se.repetition_type == FieldRepetitionType::OPTIONAL && encodings[idx] == Encoding::RLE_DICTIONARY && @@ -936,7 +962,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, page_from, page_until); // 4. append RLE buf_unc to buf_com - uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); + uint32_t num_dict_values = get_num_values_dictionary(idx, se, rg_from, rg_until); uint8_t bit_width = ceil(log2((double) num_dict_values)); uint32_t rle2_size = rle_encode( buf_unc, @@ -961,6 +987,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, cmd->__set_total_uncompressed_size( cmd->total_uncompressed_size + rle2_size ); + stat->__set_null_count(stat->null_count + page_num_values - num_present); } else if (se.repetition_type == FieldRepetitionType::OPTIONAL && encodings[idx] == Encoding::RLE_DICTIONARY && @@ -992,7 +1019,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, page_from, page_until); // 4. append RLE buf_unc to buf_com - uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); + uint32_t num_dict_values = get_num_values_dictionary(idx, se, rg_from, rg_until); uint8_t bit_width = ceil(log2((double) num_dict_values)); uint32_t rle2_size = rle_encode( buf_unc, @@ -1021,6 +1048,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, cmd->__set_total_uncompressed_size( cmd->total_uncompressed_size + rle2_size ); + stat->__set_null_count(stat->null_count + page_num_values - num_present); } else if (se.repetition_type == FieldRepetitionType::REQUIRED && encodings[idx] == Encoding::RLE && @@ -1031,7 +1059,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_boolean_as_int(*os0, idx, page_from, page_until); + write_boolean_as_int(*os0, idx, group, page, page_from, page_until); // 2. RLE encode buf_unc to buf_com uint32_t rle_size = rle_encode( @@ -1061,7 +1089,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_boolean_as_int(*os0, idx, page_from, page_until); + write_boolean_as_int(*os0, idx, group, page, page_from, page_until); // 2. 
RLE encode buf_unc to buf_com uint32_t rle_size = rle_encode( @@ -1132,6 +1160,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, cmd->__set_total_uncompressed_size( cmd->total_uncompressed_size + rle2_size ); + stat->__set_null_count(stat->null_count + page_num_values - num_present); } else if (se.repetition_type == FieldRepetitionType::OPTIONAL && encodings[idx] == Encoding::RLE && @@ -1183,6 +1212,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, cmd->__set_total_uncompressed_size( cmd->total_uncompressed_size + rle2_size ); + stat->__set_null_count(stat->null_count + page_num_values - num_present); } } diff --git a/src/lib/ParquetOutFile.h b/src/lib/ParquetOutFile.h index 4adce01..34782d8 100644 --- a/src/lib/ParquetOutFile.h +++ b/src/lib/ParquetOutFile.h @@ -33,27 +33,40 @@ class ParquetOutFile { void add_key_value_metadata(std::string key, std::string value); void write(); + // This makes the write inherently sequential and we might remove it + // later. Currently, it makes it easier to keep track of minimum and + // maximum values per row group. + virtual void write_row_group(uint32_t group) = 0; + // write out various parquet types, these must be implemented in // the subclass - virtual void write_int32(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel) = 0; - virtual void write_int64(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel) = 0; - virtual void write_int96(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel) = 0; - virtual void write_float(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel) = 0; - virtual void write_double(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel) = 0; + virtual void write_int32(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel) = 0; + virtual void write_int64(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel) = 0; + virtual void write_int96(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel) = 0; + virtual void write_float(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel) = 0; + virtual void write_double(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel) = 0; virtual void write_byte_array(std::ostream &file, uint32_t idx, - uint64_t from, uint64_t until, + uint32_t group, uint32_t page, uint64_t from, + uint64_t until, parquet::SchemaElement &sel) = 0; virtual void write_fixed_len_byte_array(std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until, parquet::SchemaElement &sel) = 0; - virtual void write_boolean(std::ostream &file, uint32_t idx, - uint64_t from, uint64_t until) = 0; + virtual void write_boolean(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until) = 0; virtual void write_boolean_as_int(std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until) = 0; // callbacks for missing values @@ -72,7 +85,9 @@ class ParquetOutFile { virtual uint32_t 
get_size_byte_array(uint32_t idx, uint32_t num_present, uint64_t from, uint64_t until) = 0; - virtual uint32_t get_num_values_dictionary(uint32_t idx, int64_t from, + virtual uint32_t get_num_values_dictionary(uint32_t idx, + parquet::SchemaElement &sel, + int64_t from, int64_t until) = 0; virtual uint32_t get_size_dictionary(uint32_t idx, parquet::SchemaElement &sel, @@ -87,6 +102,11 @@ class ParquetOutFile { uint64_t page_from, uint64_t page_until) = 0; + virtual bool get_group_minmax_values(uint32_t idx, uint32_t group, + parquet::SchemaElement &sel, + std::string &min_value, + std::string &max_value) = 0; + int data_page_version = 1; private: @@ -113,19 +133,24 @@ class ParquetOutFile { void init_column_meta_data(); // return total size - int64_t write_columns(int64_t from, int64_t until); - void write_column(uint32_t idx, int64_t from, int64_t until); + int64_t write_columns(uint32_t group, int64_t from, int64_t until); + void write_column(uint32_t idx, uint32_t group, int64_t from, + int64_t until); void write_dictionary_page(uint32_t idx, int64_t from, int64_t until); - void write_data_pages(uint32_t idx, int64_t from, int64_t until); - void write_data_page(uint32_t idx, int64_t rg_from, int64_t rg_until, + void write_data_pages(uint32_t idx, uint32_t group, int64_t from, + int64_t until); + void write_data_page(uint32_t idx, uint32_t group, uint32_t page, + int64_t rg_from, int64_t rg_until, uint64_t from, uint64_t until); void write_page_header(uint32_t idx, parquet::PageHeader &ph); void write_footer(); void write_data_(std::ostream &file, uint32_t idx, uint32_t size, - uint64_t from, uint64_t until); + uint32_t group, uint32_t page, uint64_t from, + uint64_t until); void write_present_data_(std::ostream &file, uint32_t idx, uint32_t size, uint32_t num_present, + uint32_t group, uint32_t page, uint64_t from, uint64_t until); void write_dictionary_(std::ostream &file, uint32_t idx, uint32_t size, parquet::SchemaElement &sel, int64_t from, diff --git a/src/protect.cpp b/src/protect.cpp index 9d67d21..6a8ce6c 100644 --- a/src/protect.cpp +++ b/src/protect.cpp @@ -17,6 +17,11 @@ SEXP wrapped_intsxp(void *len) { return Rf_allocVector(INTSXP, *xlen); } +SEXP wrapped_lglsxp(void *len) { + R_xlen_t *xlen = (R_xlen_t*) len; + return Rf_allocVector(LGLSXP, *xlen); +} + SEXP wrapped_realsxp(void *len) { R_xlen_t *xlen = (R_xlen_t*) len; return Rf_allocVector(REALSXP, *xlen); diff --git a/src/protect.h b/src/protect.h index 9804c1a..bd8e7b6 100644 --- a/src/protect.h +++ b/src/protect.h @@ -46,6 +46,7 @@ void throw_error(void *err, Rboolean jump); SEXP wrapped_rawsxp(void *len); SEXP wrapped_intsxp(void *len); +SEXP wrapped_lglsxp(void *len); SEXP wrapped_realsxp(void *len); SEXP wrapped_strsxp(void *len); SEXP wrapped_vecsxp(void *len); @@ -66,6 +67,10 @@ inline SEXP safe_allocvector_int(R_xlen_t len, SEXP *uwt) { return R_UnwindProtect(wrapped_intsxp, &len, throw_error, uwt, *uwt); } +inline SEXP safe_allocvector_lgl(R_xlen_t len, SEXP *uwt) { + return R_UnwindProtect(wrapped_lglsxp, &len, throw_error, uwt, *uwt); +} + inline SEXP safe_allocvector_real(R_xlen_t len, SEXP *uwt) { return R_UnwindProtect(wrapped_realsxp, &len, throw_error, uwt, *uwt); } diff --git a/src/read-metadata.cpp b/src/read-metadata.cpp index 3b067e8..412729f 100644 --- a/src/read-metadata.cpp +++ b/src/read-metadata.cpp @@ -301,7 +301,12 @@ SEXP convert_column_chunks(const char *file_name, "data_page_offset", "index_page_offset", "dictionary_page_offset", - // TODO: statistics + "null_count", + "min_value", + 
"max_value", + "is_min_value_exact", + "is_max_value_exact", + // TODO: more statistics // TODO: encoding_stats "" }; @@ -334,6 +339,11 @@ SEXP convert_column_chunks(const char *file_name, SET_VECTOR_ELT(rccs, 16, safe_allocvector_real(nccs, &uwtoken)); // data_page_offset SET_VECTOR_ELT(rccs, 17, safe_allocvector_real(nccs, &uwtoken)); // index_page_offset SET_VECTOR_ELT(rccs, 18, safe_allocvector_real(nccs, &uwtoken)); // dictionary_page_offset + SET_VECTOR_ELT(rccs, 19, safe_allocvector_real(nccs, &uwtoken)); // statistics.null_count + SET_VECTOR_ELT(rccs, 20, safe_allocvector_vec(nccs, &uwtoken)); // statistics.min_value + SET_VECTOR_ELT(rccs, 21, safe_allocvector_vec(nccs, &uwtoken)); // statistics.max_value + SET_VECTOR_ELT(rccs, 22, safe_allocvector_lgl(nccs, &uwtoken)); // statistics.is_min_value_exact + SET_VECTOR_ELT(rccs, 23, safe_allocvector_lgl(nccs, &uwtoken)); // statistics.is_max_value_exact SEXP rfile_name = PROTECT(safe_mkchar(file_name, &uwtoken)); @@ -375,6 +385,25 @@ SEXP convert_column_chunks(const char *file_name, cmd.__isset.index_page_offset ? cmd.index_page_offset : NA_REAL; REAL(VECTOR_ELT(rccs, 18))[idx] = cmd.__isset.dictionary_page_offset ? cmd.dictionary_page_offset : NA_REAL; + REAL(VECTOR_ELT(rccs, 19))[idx] = + cmd.__isset.statistics && cmd.statistics.__isset.null_count ? + cmd.statistics.null_count : NA_REAL; + if (cmd.__isset.statistics && cmd.statistics.__isset.min_value) { + size_t vl = cmd.statistics.min_value.size(); + SET_VECTOR_ELT(VECTOR_ELT(rccs, 20), idx, safe_allocvector_raw(vl, &uwtoken)); + memcpy(RAW(VECTOR_ELT(VECTOR_ELT(rccs, 20), idx)), cmd.statistics.min_value.data(), vl); + } + if (cmd.__isset.statistics && cmd.statistics.__isset.max_value) { + size_t vl = cmd.statistics.max_value.size(); + SET_VECTOR_ELT(VECTOR_ELT(rccs, 21), idx, safe_allocvector_raw(vl, &uwtoken)); + memcpy(RAW(VECTOR_ELT(VECTOR_ELT(rccs, 21), idx)), cmd.statistics.max_value.data(), vl); + } + LOGICAL(VECTOR_ELT(rccs, 22))[idx] = + cmd.__isset.statistics && cmd.statistics.__isset.is_min_value_exact ? + cmd.statistics.is_min_value_exact : NA_LOGICAL; + LOGICAL(VECTOR_ELT(rccs, 23))[idx] = + cmd.__isset.statistics && cmd.statistics.__isset.is_max_value_exact ? 
+ cmd.statistics.is_max_value_exact : NA_LOGICAL; idx++; } diff --git a/src/rwrapper.cpp b/src/rwrapper.cpp index 4e4943b..e96f18c 100644 --- a/src/rwrapper.cpp +++ b/src/rwrapper.cpp @@ -1,3 +1,5 @@ +#include + #include extern "C" { @@ -56,6 +58,18 @@ SEXP zstd_uncompress_raw(SEXP x, SEXP ucl); SEXP test_memstream(); +SEXP read_float(SEXP x) { + float *f = (float*) RAW(x); + double d = *f; + return Rf_ScalarReal(d); +} + +SEXP read_int64(SEXP x) { + int64_t *f = (int64_t*) RAW(x); + double d = *f; + return Rf_ScalarReal(d); +} + SEXP is_asan_() { #if defined(__has_feature) # if __has_feature(address_sanitizer) // for clang @@ -122,6 +136,8 @@ static const R_CallMethodDef R_CallDef[] = { CALLDEF(zstd_uncompress_raw, 2), CALLDEF(test_memstream, 0), + CALLDEF(read_float, 1), + CALLDEF(read_int64, 1), CALLDEF(is_asan_, 0), CALLDEF(is_ubsan_, 0), diff --git a/src/write.cpp b/src/write.cpp index 169ddd0..461229e 100644 --- a/src/write.cpp +++ b/src/write.cpp @@ -89,6 +89,7 @@ extern "C" { SEXP nanoparquet_create_dict(SEXP x, SEXP rlen); SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until); SEXP nanoparquet_avg_run_length(SEXP x, SEXP rlen); +static SEXP get_list_element(SEXP list, const char *str); } class RParquetOutFile : public ParquetOutFile { @@ -105,27 +106,34 @@ class RParquetOutFile : public ParquetOutFile { int compsession_level, std::vector &row_groups ); - void write_int32(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel); - void write_int64(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel); - void write_int96(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel); - void write_float(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel); - void write_double(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until, parquet::SchemaElement &sel); - void write_byte_array(std::ostream &file, uint32_t id, uint64_t from, - uint64_t until, parquet::SchemaElement &sel); + void write_row_group(uint32_t group); + void write_int32(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel); + void write_int64(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel); + void write_int96(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel); + void write_float(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel); + void write_double(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel); + void write_byte_array(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until, + parquet::SchemaElement &sel); void write_fixed_len_byte_array(std::ostream &file, uint32_t id, - uint64_t from, uint64_t until, - parquet::SchemaElement &sel); + uint32_t group, uint32_t page, uint64_t from, + uint64_t until, parquet::SchemaElement &sel); uint32_t get_size_byte_array(uint32_t idx, uint32_t num_present, uint64_t from, uint64_t until); - void write_boolean(std::ostream &file, uint32_t idx, uint64_t from, - uint64_t until); - void write_boolean_as_int(std::ostream &file, uint32_t idx, - uint64_t from, uint64_t until); + void 
write_boolean(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until); + void write_boolean_as_int(std::ostream &file, uint32_t idx, uint32_t group, + uint32_t page, uint64_t from, uint64_t until); uint32_t write_present(std::ostream &file, uint32_t idx, uint64_t from, uint64_t until); @@ -137,8 +145,9 @@ class RParquetOutFile : public ParquetOutFile { uint64_t until); // for dictionaries - uint32_t get_num_values_dictionary(uint32_t idx, int64_t form, - int64_t until); + uint32_t get_num_values_dictionary(uint32_t idx, + parquet::SchemaElement &sel, + int64_t form, int64_t until); uint32_t get_size_dictionary(uint32_t idx, parquet::SchemaElement &type, int64_t from, int64_t until); void write_dictionary(std::ostream &file, uint32_t idx, @@ -148,6 +157,12 @@ class RParquetOutFile : public ParquetOutFile { int64_t rg_from, int64_t rg_until, uint64_t page_from, uint64_t page_until); + // statistics + bool get_group_minmax_values(uint32_t idx, uint32_t group, + parquet::SchemaElement &sel, + std::string &min_value, + std::string &max_value); + void write( SEXP dfsxp, SEXP dim, @@ -165,11 +180,36 @@ class RParquetOutFile : public ParquetOutFile { SEXP dicts_from = R_NilValue; ByteBuffer present; - void create_dictionary(uint32_t idx, int64_t from, int64_t until); + bool write_minmax_values; + std::vector is_minmax_supported; + std::vector min_values; + std::vector max_values; + std::vector has_minmax_value; + + void create_dictionary(uint32_t idx, int64_t from, int64_t until, + parquet::SchemaElement &sel); // for LGLSXP this mean RLE encoding bool should_use_dict_encoding(uint32_t idx); parquet::Encoding::type detect_encoding(uint32_t idx, parquet::SchemaElement &sel, int32_t renc); + + void write_integer_int32(std::ostream &file, SEXP col, uint32_t idx, + uint64_t from, uint64_t until, + parquet::SchemaElement &sel); + void write_double_int32_time(std::ostream &file, SEXP col, uint32_t idx, + uint64_t from, uint64_t until, + parquet::SchemaElement &sel, double factor); + void write_double_int32(std::ostream &file, SEXP col, uint32_t idx, + uint64_t from, uint64_t until, + parquet::SchemaElement &sel); + void write_integer_int64(std::ostream &file, SEXP col, uint32_t idx, + uint64_t from, uint64_t until); + void write_double_int64(std::ostream &file, SEXP col, uint32_t idx, + uint64_t from, uint64_t until, + parquet::SchemaElement &sel); + void write_double_int64_time(std::ostream &file, SEXP col, uint32_t idx, + uint64_t from, uint64_t until, + parquet::SchemaElement &sel, double factor); }; RParquetOutFile::RParquetOutFile( @@ -188,8 +228,33 @@ RParquetOutFile::RParquetOutFile( ParquetOutFile(stream, codec, compression_level, row_groups) { } +static bool is_time(parquet::SchemaElement &sel, double &factor) { + factor = 1.0; + if (sel.__isset.logicalType && sel.logicalType.__isset.TIME) { + auto unit = sel.logicalType.TIME.unit; + if (unit.__isset.MILLIS) { + factor = 1000; + } else if (unit.__isset.MICROS) { + factor = 1000 * 1000; + } else if (unit.__isset.NANOS) { + factor = 1000 * 1000 * 1000; + } + return true; + } else if (sel.__isset.converted_type) { + if (sel.converted_type == parquet::ConvertedType::TIME_MILLIS) { + factor = 1000; + return true; + } else if (sel.converted_type == parquet::ConvertedType::TIME_MICROS) { + factor = 1000 * 1000; + return true; + } + } + return false; +} + void RParquetOutFile::create_dictionary(uint32_t idx, int64_t from, - int64_t until) { + int64_t until, + parquet::SchemaElement &sel) { if 
(!Rf_isNull(VECTOR_ELT(dicts, idx)) && INTEGER(dicts_from)[idx] == from) { return; @@ -202,6 +267,75 @@ void RParquetOutFile::create_dictionary(uint32_t idx, int64_t from, SET_VECTOR_ELT(dicts, idx, d); INTEGER(dicts_from)[idx] = from; UNPROTECT(3); + if (write_minmax_values && Rf_length(d) == 4 && + is_minmax_supported[idx] && Rf_xlength(col) > 0 && + !Rf_isNull(VECTOR_ELT(d, 2)) && !Rf_isNull(VECTOR_ELT(d, 3))) { + has_minmax_value[idx] = true; + if (TYPEOF(VECTOR_ELT(d, 2)) == INTSXP) { + if (sel.type == parquet::Type::INT32) { + min_values[idx] = std::string((const char*) INTEGER(VECTOR_ELT(d, 2)), sizeof(int32_t)); + max_values[idx] = std::string((const char*) INTEGER(VECTOR_ELT(d, 3)), sizeof(int32_t)); + } else if (sel.type == parquet::Type::INT64) { + int64_t min = INTEGER(VECTOR_ELT(d, 2))[0]; + int64_t max = INTEGER(VECTOR_ELT(d, 3))[0]; + min_values[idx] = std::string((const char*) &min, sizeof(int64_t)); + max_values[idx] = std::string((const char*) &max, sizeof(int64_t)); + } else { + Rf_errorcall( + nanoparquet_call, + "Cannot convert an integer vector to Parquet type %s.", + parquet::_Type_VALUES_TO_NAMES.at(sel.type) + ); + } + } else if (TYPEOF(VECTOR_ELT(d, 2)) == REALSXP) { + double factor; + bool istime = is_time(sel, factor); + if (istime) { + if (sel.type == parquet::Type::INT32) { + int32_t min = REAL(VECTOR_ELT(d, 2))[0] * factor; + int32_t max = REAL(VECTOR_ELT(d, 3))[0] * factor; + min_values[idx] = std::string((const char*) &min, sizeof(int32_t)); + max_values[idx] = std::string((const char*) &max, sizeof(int32_t)); + } else { + int64_t min = REAL(VECTOR_ELT(d, 2))[0] * factor; + int64_t max = REAL(VECTOR_ELT(d, 3))[0] * factor; + min_values[idx] = std::string((const char*) &min, sizeof(int64_t)); + max_values[idx] = std::string((const char*) &max, sizeof(int64_t)); + } + } else if (sel.type == parquet::Type::INT32) { + int32_t min = REAL(VECTOR_ELT(d, 2))[0]; + int32_t max = REAL(VECTOR_ELT(d, 3))[0]; + min_values[idx] = std::string((const char*) &min, sizeof(int32_t)); + max_values[idx] = std::string((const char*) &max, sizeof(int32_t)); + } else if (sel.type == parquet::Type::DOUBLE) { + min_values[idx] = std::string((const char*) REAL(VECTOR_ELT(d, 2)), sizeof(double)); + max_values[idx] = std::string((const char*) REAL(VECTOR_ELT(d, 3)), sizeof(double)); + } else if (sel.type == parquet::Type::FLOAT) { + float min = REAL(VECTOR_ELT(d, 2))[0]; + float max = REAL(VECTOR_ELT(d, 3))[0]; + min_values[idx] = std::string((const char*) &min, sizeof(float)); + max_values[idx] = std::string((const char*) &max, sizeof(float)); + } else if (sel.type == parquet::Type::INT64) { + int64_t min = REAL(VECTOR_ELT(d, 2))[0]; + int64_t max = REAL(VECTOR_ELT(d, 3))[0]; + min_values[idx] = std::string((const char*) &min, sizeof(int64_t)); + max_values[idx] = std::string((const char*) &max, sizeof(int64_t)); + } else { + Rf_errorcall( + nanoparquet_call, + "Cannot convert a double vector to Parquet type %s.", + parquet::_Type_VALUES_TO_NAMES.at(sel.type) + ); + } + } else if (TYPEOF(VECTOR_ELT(d, 2)) == CHARSXP) { + const char *min = CHAR(VECTOR_ELT(d, 2)); + const char *max = CHAR(VECTOR_ELT(d, 3)); + min_values[idx] = std::string(min, strlen(min)); + max_values[idx] = std::string(max, strlen(max)); + } else { + Rf_error("Unknown R type when writing out min/max values, internal error"); + } + } } static const char *enc_[] = { @@ -492,6 +626,14 @@ static const char *type_names[] = { "an S4 object" }; +void RParquetOutFile::write_row_group(uint32_t group) { + if 
(write_minmax_values) { + std::fill(min_values.begin(), min_values.end(), std::string()); + std::fill(max_values.begin(), max_values.end(), std::string()); + std::fill(has_minmax_value.begin(), has_minmax_value.end(), false); + } +} + static bool is_decimal(parquet::SchemaElement &sel, int32_t &precision, int32_t &scale) { if (sel.__isset.logicalType && sel.logicalType.__isset.DECIMAL) { @@ -520,30 +662,6 @@ static bool is_decimal(parquet::SchemaElement &sel, int32_t &precision, } } -static bool is_time(parquet::SchemaElement &sel, double &factor) { - factor = 1.0; - if (sel.__isset.logicalType && sel.logicalType.__isset.TIME) { - auto unit = sel.logicalType.TIME.unit; - if (unit.__isset.MILLIS) { - factor = 1000; - } else if (unit.__isset.MICROS) { - factor = 1000 * 1000; - } else if (unit.__isset.NANOS) { - factor = 1000 * 1000 * 1000; - } - return true; - } else if (sel.__isset.converted_type) { - if (sel.converted_type == parquet::ConvertedType::TIME_MILLIS) { - factor = 1000; - return true; - } else if (sel.converted_type == parquet::ConvertedType::TIME_MICROS) { - factor = 1000 * 1000; - return true; - } - } - return false; -} - void write_integer_int32_dec(std::ostream & file, SEXP col, uint64_t from, uint64_t until, int32_t precision, int32_t scale) { @@ -577,23 +695,48 @@ void write_integer_int32_dec(std::ostream & file, SEXP col, uint64_t from, } } -void write_integer_int32(std::ostream &file, SEXP col, uint32_t idx, - uint64_t from, uint64_t until, - parquet::SchemaElement &sel) { +#define GRAB_MIN(idx, t) ((t*) min_values[idx].data()) +#define GRAB_MAX(idx, t) ((t*) max_values[idx].data()) +#define SAVE_MIN(idx, val, t) do { \ + min_values[idx] = std::string((const char*) &val, sizeof(t)); \ + min_value = (t*) min_values[idx].data(); } while (0) +#define SAVE_MAX(idx, val, t) do { \ + max_values[idx] = std::string((const char*) &val, sizeof(t)); \ + max_value = (t*) max_values[idx].data(); } while (0) + +void RParquetOutFile::write_integer_int32(std::ostream &file, SEXP col, + uint32_t idx, + uint64_t from, uint64_t until, + parquet::SchemaElement &sel) { bool is_signed = TRUE; int bit_width = 32; if (sel.__isset.logicalType && sel.logicalType.__isset.INTEGER) { is_signed = sel.logicalType.INTEGER.isSigned; bit_width = sel.logicalType.INTEGER.bitWidth; } + + bool minmax = write_minmax_values && is_minmax_supported[idx]; + int32_t *min_value = 0, *max_value = 0; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, int32_t); + max_value = GRAB_MAX(idx, int32_t); + } + if (bit_width == 32) { - if (sel.repetition_type == parquet::FieldRepetitionType::REQUIRED) { + if (!minmax && + sel.repetition_type == parquet::FieldRepetitionType::REQUIRED) { uint64_t len = until - from; file.write((const char *) (INTEGER(col) + from), sizeof(int) * len); } else { for (uint64_t i = from; i < until; i++) { int32_t val = INTEGER(col)[i]; if (val == NA_INTEGER) continue; + if (minmax && (min_value == 0 || val < *min_value)) { + SAVE_MIN(idx, val, int32_t); + } + if (minmax && (max_value == 0 || val > *max_value)) { + SAVE_MAX(idx, val, int32_t); + } file.write((const char*) &val, sizeof(int32_t)); } } @@ -622,9 +765,16 @@ void write_integer_int32(std::ostream &file, SEXP col, uint32_t idx, w, (is_signed ? 
"" : "U"), bit_width, val, idx + 1, i + 1 ); } + if (minmax && (min_value == 0 || val < *min_value)) { + SAVE_MIN(idx, val, int32_t); + } + if (minmax && (max_value == 0 || val > *max_value)) { + SAVE_MAX(idx, val, int32_t); + } file.write((const char *) &val, sizeof(int32_t)); } } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; } void write_double_int32_dec(std::ostream &file, SEXP col, uint64_t from, @@ -660,20 +810,37 @@ void write_double_int32_dec(std::ostream &file, SEXP col, uint64_t from, } } -void write_double_int32_time(std::ostream &file, SEXP col, uint32_t idx, - uint64_t from, uint64_t until, - parquet::SchemaElement &sel, double factor) { +void RParquetOutFile::write_double_int32_time(std::ostream &file, SEXP col, + uint32_t idx, uint64_t from, + uint64_t until, + parquet::SchemaElement &sel, + double factor) { + int32_t *min_value = 0, *max_value = 0; + bool minmax = write_minmax_values && is_minmax_supported[idx]; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, int32_t); + max_value = GRAB_MAX(idx, int32_t); + } + for (uint64_t i = from; i < until; i++) { double val = REAL(col)[i]; if (R_IsNA(val)) continue; int32_t ival = val * factor; + if (minmax && (min_value == 0 || ival < *min_value)) { + SAVE_MIN(idx, ival, int32_t); + } + if (minmax && (max_value == 0 || ival > *max_value)) { + SAVE_MAX(idx, ival, int32_t); + } file.write((const char *)&ival, sizeof(int32_t)); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; } -void write_double_int32(std::ostream &file, SEXP col, uint32_t idx, - uint64_t from, uint64_t until, - parquet::SchemaElement &sel) { +void RParquetOutFile::write_double_int32(std::ostream &file, SEXP col, + uint32_t idx, uint64_t from, + uint64_t until, + parquet::SchemaElement &sel) { bool is_signed = TRUE; int bit_width = 32; if (sel.__isset.logicalType && sel.logicalType.__isset.INTEGER) { @@ -681,6 +848,13 @@ void write_double_int32(std::ostream &file, SEXP col, uint32_t idx, bit_width = sel.logicalType.INTEGER.bitWidth; } if (is_signed) { + int32_t *min_value = 0, *max_value = 0; + bool minmax = write_minmax_values && is_minmax_supported[idx]; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, int32_t); + max_value = GRAB_MAX(idx, int32_t); + } + int32_t min, max; switch (bit_width) { case 8: @@ -709,9 +883,23 @@ void write_double_int32(std::ostream &file, SEXP col, uint32_t idx, ); } int32_t ival = val; + if (minmax && (min_value == 0 || ival < *min_value)) { + SAVE_MIN(idx, ival, int32_t); + } + if (minmax && (max_value == 0 || ival > *max_value)) { + SAVE_MAX(idx, ival, int32_t); + } file.write((const char *)&ival, sizeof(int32_t)); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; } else { + uint32_t *min_value = 0, *max_value = 0; + bool minmax = write_minmax_values && is_minmax_supported[idx]; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, uint32_t); + max_value = GRAB_MAX(idx, uint32_t); + } + uint32_t max; switch (bit_width) { case 8: @@ -748,13 +936,21 @@ void write_double_int32(std::ostream &file, SEXP col, uint32_t idx, val, idx + 1, i + 1 ); } - int32_t ival = val; - file.write((const char *)&ival, sizeof(int32_t)); + uint32_t uival = val; + if (minmax && (min_value == 0 || uival < *min_value)) { + SAVE_MIN(idx, uival, uint32_t); + } + if (minmax && (max_value == 0 || uival > *max_value)) { + SAVE_MAX(idx, uival, uint32_t); + } + file.write((const char *)&uival, sizeof(uint32_t)); } + has_minmax_value[idx] = 
has_minmax_value[idx] || min_value != 0; } } void RParquetOutFile::write_int32(std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until, parquet::SchemaElement &sel) { SEXP col = VECTOR_ELT(df, idx); @@ -831,16 +1027,31 @@ void write_integer_int64_dec(std::ostream &file, SEXP col, uint64_t from, } } -void write_integer_int64(std::ostream &file, SEXP col, uint64_t from, - uint64_t until) { +void RParquetOutFile::write_integer_int64(std::ostream &file, SEXP col, + uint32_t idx, + uint64_t from, uint64_t until) { + + int64_t *min_value = 0, *max_value = 0; + bool minmax = write_minmax_values && is_minmax_supported[idx]; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, int64_t); + max_value = GRAB_MAX(idx, int64_t); + } for (uint64_t i = from; i < until; i++) { int32_t val = INTEGER(col)[i]; if (val == NA_INTEGER) continue; int64_t el = val; + if (minmax && (min_value == 0 || el < *min_value)) { + SAVE_MIN(idx, el, int64_t); + } + if (minmax && (max_value == 0 || el > *max_value)) { + SAVE_MAX(idx, el, int64_t); + } file.write((const char*) &el, sizeof(int64_t)); } - } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; +} void write_double_int64_dec(std::ostream &file, SEXP col, uint64_t from, uint64_t until, int32_t precision, @@ -874,20 +1085,44 @@ void write_integer_int64(std::ostream &file, SEXP col, uint64_t from, } } -void write_double_int64_time(std::ostream &file, SEXP col, uint32_t idx, - uint64_t from, uint64_t until, - parquet::SchemaElement &sel, double factor) { +void RParquetOutFile::write_double_int64_time(std::ostream &file, SEXP col, + uint32_t idx, uint64_t from, + uint64_t until, + parquet::SchemaElement &sel, + double factor) { + int64_t *min_value = 0, *max_value = 0; + bool minmax = write_minmax_values && is_minmax_supported[idx]; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, int64_t); + max_value = GRAB_MAX(idx, int64_t); + } + for (uint64_t i = from; i < until; i++) { double val = REAL(col)[i]; if (R_IsNA(val)) continue; int64_t ival = val * factor; + if (minmax && (min_value == 0 || ival < *min_value)) { + SAVE_MIN(idx, ival, int64_t); + } + if (minmax && (max_value == 0 || ival > *max_value)) { + SAVE_MAX(idx, ival, int64_t); + } file.write((const char *)&ival, sizeof(int64_t)); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; } -void write_double_int64(std::ostream &file, SEXP col, uint32_t idx, - uint64_t from, uint64_t until, - parquet::SchemaElement &sel) { +void RParquetOutFile::write_double_int64(std::ostream &file, SEXP col, + uint32_t idx, uint64_t from, + uint64_t until, + parquet::SchemaElement &sel) { + int64_t *min_value = 0, *max_value = 0; + bool minmax = write_minmax_values && is_minmax_supported[idx]; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, int64_t); + max_value = GRAB_MAX(idx, int64_t); + } + if (Rf_inherits(col, "POSIXct")) { int64_t fact = 1; if (sel.__isset.logicalType && sel.logicalType.__isset.TIMESTAMP) { @@ -910,15 +1145,29 @@ void write_double_int64(std::ostream &file, SEXP col, uint32_t idx, double val = REAL(col)[i]; if (R_IsNA(val)) continue; int64_t el = val * fact; + if (minmax && (min_value == 0 || el < *min_value)) { + SAVE_MIN(idx, el, int64_t); + } + if (minmax && (max_value == 0 || el > *max_value)) { + SAVE_MAX(idx, el, int64_t); + } file.write((const char *)&el, sizeof(int64_t)); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; } else if (Rf_inherits(col, 
"difftime")) { for (uint64_t i = from; i < until; i++) { double val = REAL(col)[i]; if (R_IsNA(val)) continue; int64_t el = val * 1000 * 1000 * 1000; + if (minmax && (min_value == 0 || el < *min_value)) { + SAVE_MIN(idx, el, int64_t); + } + if (minmax && (max_value == 0 || el > *max_value)) { + SAVE_MAX(idx, el, int64_t); + } file.write((const char *)&el, sizeof(int64_t)); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; } else { bool is_signed = TRUE; int bit_width = 64; @@ -947,10 +1196,23 @@ void write_double_int64(std::ostream &file, SEXP col, uint32_t idx, ); } int64_t el = val; + if (minmax && (min_value == 0 || el < *min_value)) { + SAVE_MIN(idx, el, int64_t); + } + if (minmax && (max_value == 0 || el > *max_value)) { + SAVE_MAX(idx, el, int64_t); + } file.write((const char *)&el, sizeof(int64_t)); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; } else { double max = pow(2, 64) - 1; + uint64_t *min_value = 0, *max_value = 0; + bool minmax = write_minmax_values && is_minmax_supported[idx]; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, uint64_t); + max_value = GRAB_MAX(idx, uint64_t); + } for (uint64_t i = from; i < until; i++) { double val = REAL(col)[i]; if (R_IsNA(val)) continue; @@ -971,13 +1233,21 @@ void write_double_int64(std::ostream &file, SEXP col, uint32_t idx, ); } uint64_t el = val; + if (minmax && (min_value == 0 || el < *min_value)) { + SAVE_MIN(idx, el, uint64_t); + } + if (minmax && (max_value == 0 || el > *max_value)) { + SAVE_MAX(idx, el, uint64_t); + } file.write((const char *)&el, sizeof(uint64_t)); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; } } } void RParquetOutFile::write_int64(std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until, parquet::SchemaElement &sel) { // This is double in R, so we need to convert @@ -997,7 +1267,7 @@ void RParquetOutFile::write_int64(std::ostream &file, uint32_t idx, if (isdec) { write_integer_int64_dec(file, col, from, until, precision, scale); } else { - write_integer_int64(file, col, from, until); + write_integer_int64(file, col, idx, from, until); } break; case REALSXP: @@ -1019,6 +1289,7 @@ void RParquetOutFile::write_int64(std::ostream &file, uint32_t idx, } void RParquetOutFile::write_int96(std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until, parquet::SchemaElement &sel) { // This is double in R, so we need to convert @@ -1058,6 +1329,7 @@ void RParquetOutFile::write_int96(std::ostream &file, uint32_t idx, } void RParquetOutFile::write_float(std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until, parquet::SchemaElement &sel) { SEXP col = VECTOR_ELT(df, idx); @@ -1074,15 +1346,31 @@ void RParquetOutFile::write_float(std::ostream &file, uint32_t idx, "Internal nanoparquet error, row index too large" ); } + + bool minmax = write_minmax_values && is_minmax_supported[idx]; + float *min_value = 0, *max_value = 0; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, float); + max_value = GRAB_MAX(idx, float); + } + for (uint64_t i = from; i < until; i++) { double val = REAL(col)[i]; if (R_IsNA(val)) continue; float el = val; + if (minmax && (min_value == 0 || el< *min_value)) { + SAVE_MIN(idx, el, float); + } + if (minmax && (max_value == 0 || el > *max_value)) { + SAVE_MAX(idx, el, float); + } file.write((const char*) &el, sizeof(float)); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value 
!= 0; } void RParquetOutFile::write_double(std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until, parquet::SchemaElement &sel) { SEXP col = VECTOR_ELT(df, idx); @@ -1099,19 +1387,63 @@ void RParquetOutFile::write_double(std::ostream &file, uint32_t idx, "Internal nanoparquet error, row index too large" ); } - if (sel.repetition_type == parquet::FieldRepetitionType::REQUIRED) { + + bool minmax = write_minmax_values && is_minmax_supported[idx]; + double *min_value = 0, *max_value = 0; + if (minmax && has_minmax_value[idx]) { + min_value = GRAB_MIN(idx, double); + max_value = GRAB_MAX(idx, double); + } + + if (!minmax && + sel.repetition_type == parquet::FieldRepetitionType::REQUIRED) { uint64_t len = until - from; file.write((const char *) (REAL(col) + from), sizeof(double) * len); } else { for (uint64_t i = from; i < until; i++) { double val = REAL(col)[i]; if (R_IsNA(val)) continue; + if (minmax && (min_value == 0 || val < *min_value)) { + SAVE_MIN(idx, val, double); + } + if (minmax && (max_value == 0 || val > *max_value)) { + SAVE_MAX(idx, val, double); + } file.write((const char*) &val, sizeof(double)); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0; } } +static inline bool STR_LESS(const char *c, size_t l, std::string &etalon) { + size_t el = etalon.size(); + // "" is less than anything but "" + if (l == 0) return el > 0; + // otherwise anything is more than "" + if (el == 0) return false; + int res = memcmp(c, etalon.data(), l < el ? l : el); + return res < 0 || (res == 0 && l < el); +} + +static inline bool STR_MORE(const char *c, size_t l, std::string &etalon) { + size_t el = etalon.size(); + // "" is not more than anything + if (l == 0) return false; + // otherwise anything is more than "" + if (el == 0) return true; + int res = memcmp(c, etalon.data(), l < el ? 
l : el); + return res > 0 || (res == 0 && l > el); +} + +#define SAVE_MIN_STR(idx, c, l) do { \ + min_values[idx] = std::string((c), (l)); \ + min_value = &min_values[idx]; } while (0) +#define SAVE_MAX_STR(idx, c, l) do { \ + max_values[idx] = std::string((c), (l)); \ + max_value = &max_values[idx]; } while (0) + void RParquetOutFile::write_byte_array(std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until, parquet::SchemaElement &sel) { SEXP col = VECTOR_ELT(df, idx); @@ -1124,6 +1456,13 @@ void RParquetOutFile::write_byte_array(std::ostream &file, uint32_t idx, switch (TYPEOF(col)) { case STRSXP: { + bool minmax = write_minmax_values && is_minmax_supported[idx]; + std::string *min_value = nullptr, *max_value = nullptr; + if (minmax && has_minmax_value[idx]) { + min_value = &min_values[idx]; + max_value = &max_values[idx]; + } + for (uint64_t i = from; i < until; i++) { SEXP el = STRING_ELT(col, i); if (el == NA_STRING) { @@ -1131,9 +1470,16 @@ void RParquetOutFile::write_byte_array(std::ostream &file, uint32_t idx, } const char *c = CHAR(el); uint32_t len1 = strlen(c); + if (minmax && (min_value == nullptr || STR_LESS(c, len1, *min_value))) { + SAVE_MIN_STR(idx, c, len1); + } + if (minmax && (max_value == nullptr || STR_MORE(c, len1, *max_value))) { + SAVE_MAX_STR(idx, c, len1); + } file.write((const char *)&len1, 4); file.write(c, len1); } + has_minmax_value[idx] = has_minmax_value[idx] || min_value != nullptr; break; } case VECSXP: { @@ -1155,15 +1501,6 @@ void RParquetOutFile::write_byte_array(std::ostream &file, uint32_t idx, } break; } - case INTSXP: { - int32_t precision, scale; - bool isdec = is_decimal(sel, precision, scale); - - break; - } - case REALSXP: { - break; - } default: Rf_errorcall( // # nocov nanoparquet_call, // # nocov @@ -1259,6 +1596,7 @@ static bool parse_uuid(const char *c, char *u, char *t) { void RParquetOutFile::write_fixed_len_byte_array( std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until, parquet::SchemaElement &sel) { @@ -1399,6 +1737,7 @@ void write_boolean_impl(std::ostream &file, SEXP col, } void RParquetOutFile::write_boolean(std::ostream &file, uint32_t idx, + uint32_t group, uint32_t page, uint64_t from, uint64_t until) { SEXP col = VECTOR_ELT(df, idx); if (TYPEOF(col) != LGLSXP) { @@ -1413,6 +1752,8 @@ void RParquetOutFile::write_boolean(std::ostream &file, uint32_t idx, void RParquetOutFile::write_boolean_as_int(std::ostream &file, uint32_t idx, + uint32_t group, + uint32_t page, uint64_t from, uint64_t until) { SEXP col = VECTOR_ELT(df, idx); @@ -1551,13 +1892,14 @@ void RParquetOutFile::write_present_boolean( uint32_t RParquetOutFile::get_num_values_dictionary( uint32_t idx, + parquet::SchemaElement &sel, int64_t from, int64_t until) { SEXP col = VECTOR_ELT(df, idx); if (Rf_inherits(col, "factor")) { return Rf_nlevels(col); } else { - create_dictionary(idx, from, until); + create_dictionary(idx, from, until, sel); return Rf_length(VECTOR_ELT(VECTOR_ELT(dicts, idx), 0)); } } @@ -1584,7 +1926,7 @@ uint32_t RParquetOutFile::get_size_dictionary( UNPROTECT(1); return size; } else { - create_dictionary(idx, from, until); + create_dictionary(idx, from, until, sel); SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); if (type == parquet::Type::INT32) { return Rf_xlength(dictidx) * sizeof(int); @@ -1603,7 +1945,7 @@ uint32_t RParquetOutFile::get_size_dictionary( break; } case REALSXP: { - create_dictionary(idx, from, until); + create_dictionary(idx, from, until, 
sel); SEXP dict = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); if (type == parquet::Type::DOUBLE) { return Rf_xlength(dict) * sizeof(double); @@ -1628,7 +1970,7 @@ uint32_t RParquetOutFile::get_size_dictionary( } case STRSXP: { // need to count the length of the stings that are indexed in dict - create_dictionary(idx, from, until); + create_dictionary(idx, from, until, sel); SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); R_xlen_t len = Rf_xlength(dictidx); bool is_uuid = sel.__isset.logicalType && sel.logicalType.__isset.UUID; @@ -1649,7 +1991,7 @@ uint32_t RParquetOutFile::get_size_dictionary( } case LGLSXP: { // this does not happen, no dictionaries for BOOLEAN, makes no sense - create_dictionary(idx, from, until); // # nocov + create_dictionary(idx, from, until, sel); // # nocov SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); // # nocov R_xlen_t l = Rf_xlength(dictidx); // # nocov return l / 8 + (l % 8 > 0); // # nocov @@ -2173,6 +2515,23 @@ void RParquetOutFile::write_dictionary_indices( } } +bool RParquetOutFile::get_group_minmax_values(uint32_t idx, uint32_t group, + parquet::SchemaElement &sel, + std::string &min_value, + std::string &max_value) { + + if (!is_minmax_supported[idx]) { + return false; + } else if (!has_minmax_value[idx]) { + // maybe all values are missing + return false; + } else { + min_value = min_values[idx]; + max_value = max_values[idx]; + return true; + } +} + void nanoparquet_map_to_parquet_type( SEXP x, SEXP options, @@ -2417,6 +2776,13 @@ void RParquetOutFile::write( R_xlen_t nr = INTEGER(dim)[0]; set_num_rows(nr); R_xlen_t nc = INTEGER(dim)[1]; + + write_minmax_values = LOGICAL(get_list_element(options, "write_minmax_values"))[0]; + is_minmax_supported = std::vector(nc, false); + has_minmax_value.resize(nc); + min_values.resize(nc); + max_values.resize(nc); + for (R_xlen_t idx = 0; idx < nc; idx++) { SEXP col = VECTOR_ELT(dfsxp, idx); bool req = LOGICAL(required)[idx]; @@ -2453,6 +2819,33 @@ void RParquetOutFile::write( } } + if (!write_minmax_values) { + // nothing to do + } if (sel.__isset.logicalType) { + parquet::LogicalType < = sel.logicalType; + is_minmax_supported[idx] = lt.__isset.DATE || lt.__isset.INTEGER || + lt.__isset.TIME || lt.__isset.STRING || lt.__isset.ENUM || + lt.__isset.JSON || lt.__isset.BSON || lt.__isset.TIMESTAMP; + // TODO: support the rest + // is_minmax_supported[idx] = lt.__isset.UUID || + // lt.__isset.DECIMAL || lt.isset.FLOAT16; + } else { + switch(sel.type) { + // case parquet::Type::BOOLEAN: + case parquet::Type::INT32: + case parquet::Type::INT64: + case parquet::Type::FLOAT: + case parquet::Type::DOUBLE: + // case parquet::Type::BYTE_ARRAY; + // case parquet::Type::FIXED_LEN_BYTE_ARRAY; + is_minmax_supported[idx] = true; + break; + default: + is_minmax_supported[idx] = false; + break; + } + } + int32_t ienc = INTEGER(encoding)[idx]; parquet::Encoding::type enc = detect_encoding(idx, sel, ienc); schema_add_column(sel, enc); diff --git a/tests/testthat/_snaps/parquet-metadata.md b/tests/testthat/_snaps/parquet-metadata.md index e706280..b9dc1fe 100644 --- a/tests/testthat/_snaps/parquet-metadata.md +++ b/tests/testthat/_snaps/parquet-metadata.md @@ -48,7 +48,7 @@ ordinal 1 NA Code - as.data.frame(mtd$column_chunks) + as.data.frame(mtd$column_chunks[, 1:20]) Output file_name row_group column file_path file_offset offset_index_offset 1 test.parquet 0 0 4 NA @@ -106,20 +106,20 @@ 11 275 2879 NA 12 275 3154 NA 13 21 3429 NA - dictionary_page_offset - 1 NA - 2 NA - 3 NA - 4 NA - 5 NA - 6 NA - 7 NA - 8 NA - 9 NA - 10 NA - 
11 NA - 12 NA - 13 NA + dictionary_page_offset null_count + 1 NA 0 + 2 NA 0 + 3 NA 0 + 4 NA 0 + 5 NA 0 + 6 NA 0 + 7 NA 0 + 8 NA 0 + 9 NA 0 + 10 NA 0 + 11 NA 0 + 12 NA 0 + 13 NA 0 --- diff --git a/tests/testthat/_snaps/write-parquet-statistics.md b/tests/testthat/_snaps/write-parquet-statistics.md new file mode 100644 index 0000000..4048500 --- /dev/null +++ b/tests/testthat/_snaps/write-parquet-statistics.md @@ -0,0 +1,982 @@ +# null_count is written + + Code + as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]][, c("row_group", + "column", "null_count")]) + Output + row_group column null_count + 1 0 0 1 + 2 0 1 1 + 3 0 2 1 + 4 0 3 1 + 5 0 4 1 + 6 0 5 1 + 7 0 6 1 + 8 0 7 1 + 9 0 8 1 + 10 0 9 1 + 11 0 10 0 + 12 0 11 0 + 13 0 12 0 + 14 1 0 0 + 15 1 1 0 + 16 1 2 0 + 17 1 3 0 + 18 1 4 0 + 19 1 5 0 + 20 1 6 0 + 21 1 7 0 + 22 1 8 0 + 23 1 9 0 + 24 1 10 1 + 25 1 11 1 + 26 1 12 1 + 27 2 0 0 + 28 2 1 0 + 29 2 2 0 + 30 2 3 0 + 31 2 4 0 + 32 2 5 0 + 33 2 6 0 + 34 2 7 0 + 35 2 8 0 + 36 2 9 0 + 37 2 10 0 + 38 2 11 0 + 39 2 12 0 + 40 3 0 0 + 41 3 1 0 + 42 3 2 0 + 43 3 3 0 + 44 3 4 0 + 45 3 5 0 + 46 3 6 0 + 47 3 7 0 + 48 3 8 0 + 49 3 9 0 + 50 3 10 0 + 51 3 11 0 + 52 3 12 0 + +# min/max for integers + + Code + do(compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +# min/max for DATEs + + Code + do() + Output + [[1]] + name r_type type type_length repetition_type converted_type logical_type + 1 schema NA + 2 day Date INT32 NA OPTIONAL DATE DATE + 3 count integer INT32 NA REQUIRED INT_32 INT, 32,.... 
+ num_children scale precision field_id + 1 2 NA NA NA + 2 NA NA NA NA + 3 NA NA NA NA + + [[2]] + [1] "2024-09-06" "2024-09-08" "2024-09-10" "2024-09-12" "2024-09-14" + + [[3]] + [1] "2024-09-07" "2024-09-09" "2024-09-11" "2024-09-13" "2024-09-15" + + +# min/max for double -> signed integers + + Code + do(compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +# min/max for double -> unsigned integers + + Code + do(compression = "snappy") + Output + [[1]] + [1] 1 1 0 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] 1 1 0 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy") + Output + [[1]] + [1] 1 1 0 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed") + Output + [[1]] + [1] 1 1 0 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +# minmax for double -> INT32 TIME(MULLIS) + + Code + do(compression = "snappy") + Output + [[1]] + [1] 1000 -100000 -1000000 NA + + [[2]] + [1] 5000 100000 1000000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] 1000 -100000 -1000000 NA + + [[2]] + [1] 5000 100000 1000000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy") + Output + [[1]] + [1] 1000 -100000 -1000000 NA + + [[2]] + [1] 5000 100000 1000000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed") + Output + [[1]] + [1] 1000 -100000 -1000000 NA + + [[2]] + [1] 5000 100000 1000000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +# min/max for DOUBLE + + Code + do(compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + 
[[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +# min/max for FLOAT + + Code + do(compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +# min/max for integer -> INT64 + + Code + do(compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +# min/max for REALSXP -> INT64 + + Code + do(compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed") + Output + [[1]] + [1] 1 -100 -1000 NA + + [[2]] + [1] 5 100 1000 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +# min/max for STRING + + Code + do(compression = "snappy") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "snappy", type = "JSON") + Output + [[1]] + [1] "a" "!!!" "!" 
NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed", type = "JSON") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy", type = "JSON") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed", type = "JSON") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "snappy", type = "BSON") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed", type = "BSON") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy", type = "BSON") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed", type = "BSON") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "snappy", type = "ENUM") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed", type = "ENUM") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "snappy", type = "ENUM") + Output + [[1]] + [1] "a" "!!!" "!" NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(encoding = "RLE_DICTIONARY", compression = "uncompressed", type = "ENUM") + Output + [[1]] + [1] "a" "!!!" "!" 
NA + + [[2]] + [1] "e" "~~~" "~" NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +# min/max for REALSXP -> TIMESTAMP (INT64) + + Code + do(compression = "snappy") + Output + [[1]] + [1] 1e+06 -1e+08 -1e+09 NA + + [[2]] + [1] 5e+06 1e+08 1e+09 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + +--- + + Code + do(compression = "uncompressed") + Output + [[1]] + [1] 1e+06 -1e+08 -1e+09 NA + + [[2]] + [1] 5e+06 1e+08 1e+09 NA + + [[3]] + [1] TRUE TRUE TRUE NA + + [[4]] + [1] TRUE TRUE TRUE NA + + diff --git a/tests/testthat/test-parquet-metadata.R b/tests/testthat/test-parquet-metadata.R index 3c76c48..2c4d870 100644 --- a/tests/testthat/test-parquet-metadata.R +++ b/tests/testthat/test-parquet-metadata.R @@ -18,7 +18,7 @@ test_that("parquet_metadata", { as.data.frame(mtd$file_meta_data) as.data.frame(mtd$schema) as.data.frame(mtd$row_groups) - as.data.frame(mtd$column_chunks) + as.data.frame(mtd$column_chunks[, 1:20]) }) sch <- read_parquet_schema("test.parquet") diff --git a/tests/testthat/test-write-parquet-statistics.R b/tests/testthat/test-write-parquet-statistics.R new file mode 100644 index 0000000..55719ee --- /dev/null +++ b/tests/testthat/test-write-parquet-statistics.R @@ -0,0 +1,456 @@ +test_that("null_count is written", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + df <- test_df(missing = TRUE) + write_parquet( + df, tmp, + options = parquet_options(num_rows_per_row_group = 10) + ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + expect_snapshot( + as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]][ + , c("row_group", "column", "null_count") + ]) + ) +}) + +test_that("min/max for integers", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + df <- data.frame(x = c( + sample(1:5), + sample(c(1:3, -100L, 100L)), + sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)), + rep(NA_integer_, 3) + )) + + as_int <- function(x) { + sapply(x, function(xx) xx %&&% readBin(xx, what = "integer") %||% NA_integer_) + } + + do <- function(encoding = "PLAIN",...) { + write_parquet( + df, tmp, + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... + ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_int(mtd[["min_value"]]), + as_int(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) +}) + +test_that("min/max for DATEs", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + + do <- function(...) { + df <- data.frame( + day = rep(as.Date("2024-09-16") - 10:1, each = 10), + count = 1:100 + ) + df$day[c(1, 20, 25, 40)] <- as.Date(NA_character_) + write_parquet( + df, tmp, + options = parquet_options(num_rows_per_row_group = 20), + ... 
+ ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + minv <- mtd[mtd$column == 0, "min_value"] + maxv <- mtd[mtd$column == 0, "max_value"] + list( + as.data.frame(read_parquet_schema(tmp)[, -1]), + as.Date(map_int(minv, readBin, what = "integer", n = 1), origin = "1970-01-01"), + as.Date(map_int(maxv, readBin, what = "integer", n = 1), origin = "1970-01-01") + ) + } + + expect_snapshot(do()) +}) + +test_that("min/max for double -> signed integers", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + df <- data.frame(x = as.double(c( + sample(1:5), + sample(c(1:3, -100L, 100L)), + sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)), + rep(NA_integer_, 3) + ))) + + as_int <- function(x) { + sapply(x, function(xx) xx %&&% readBin(xx, what = "integer") %||% NA_integer_) + } + + do <- function(encoding = "PLAIN",...) { + write_parquet( + df, tmp, + schema = parquet_schema(x = "INT32"), + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... + ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_int(mtd[["min_value"]]), + as_int(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) +}) + +test_that("min/max for double -> unsigned integers", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + df <- data.frame(x = as.double(c( + sample(1:5), + sample(c(1:3, 1L, 100L)), + sample(c(0L, NA_integer_, 1000L, NA_integer_, NA_integer_)), + rep(NA_integer_, 3) + ))) + + as_int <- function(x) { + sapply(x, function(xx) xx %&&% readBin(xx, what = "integer") %||% NA_integer_) + } + + do <- function(encoding = "PLAIN",...) { + write_parquet( + df, tmp, + schema = parquet_schema(x = "UINT_32"), + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... + ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_int(mtd[["min_value"]]), + as_int(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) +}) + +test_that("minmax for double -> INT32 TIME(MULLIS)", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + # IDK what's the point of signed TIME, but it seems to be allowed, the + # sort order is signed + df <- data.frame(x = hms::as_hms(c( + sample(1:5), + sample(c(1:3, -100L, 100L)), + sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)), + rep(NA_integer_, 3) + ))) + + as_int <- function(x) { + sapply(x, function(xx) xx %&&% readBin(xx, what = "integer") %||% NA_integer_) + } + + do <- function(encoding = "PLAIN",...) 
{ + write_parquet( + df, tmp, + schema = parquet_schema(x = "TIME_MILLIS"), + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... + ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_int(mtd[["min_value"]]), + as_int(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) +}) + +test_that("min/max for DOUBLE", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + df <- data.frame(x = as.double(c( + sample(1:5), + sample(c(1:3, -100, 100)), + sample(c(-1000, NA_real_, 1000, NA_real_, NA_real_)), + rep(NA_real_, 3) + ))) + + as_dbl <- function(x) { + sapply(x, function(xx) xx %&&% readBin(xx, what = "double") %||% NA_real_) + } + + do <- function(encoding = "PLAIN",...) { + write_parquet( + df, tmp, + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... + ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_dbl(mtd[["min_value"]]), + as_dbl(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) +}) + +test_that("min/max for FLOAT", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + df <- data.frame(x = as.double(c( + sample(1:5), + sample(c(1:3, -100, 100)), + sample(c(-1000, NA_real_, 1000, NA_real_, NA_real_)), + rep(NA_real_, 3) + ))) + + as_flt <- function(x) { + sapply(x, function(xx) xx %&&% .Call(read_float, xx) %||% NA_real_) + } + + do <- function(encoding = "PLAIN",...) { + write_parquet( + df, tmp, + schema = parquet_schema(x = "FLOAT"), + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... + ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_flt(mtd[["min_value"]]), + as_flt(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) +}) + +test_that("min/max for integer -> INT64", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + df <- data.frame(x = c( + sample(1:5), + sample(c(1:3, -100L, 100L)), + sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)), + rep(NA_integer_, 3) + )) + + as_int64 <- function(x) { + sapply(x, function(xx) xx %&&% .Call(read_int64, xx) %||% NA_real_) + } + + do <- function(encoding = "PLAIN",...) { + write_parquet( + df, tmp, + schema = parquet_schema(x = "INT64"), + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... 
+ ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_int64(mtd[["min_value"]]), + as_int64(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) +}) + +test_that("min/max for REALSXP -> INT64", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + df <- data.frame(x = as.double(c( + sample(1:5), + sample(c(1:3, -100L, 100L)), + sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)), + rep(NA_integer_, 3) + ))) + + as_int64 <- function(x) { + sapply(x, function(xx) xx %&&% .Call(read_int64, xx) %||% NA_real_) + } + + do <- function(encoding = "PLAIN",...) { + write_parquet( + df, tmp, + schema = parquet_schema(x = "INT64"), + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... + ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_int64(mtd[["min_value"]]), + as_int64(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) +}) + +test_that("min/max for STRING", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + df <- data.frame(x = c( + sample(letters[1:5]), + sample(c(letters[1:3], "!!!", "~~~")), + sample(c("!", NA_character_, "~", NA_character_, NA_character_)), + rep(NA_character_, 3) + )) + + as_str <- function(x) { + sapply(x, function(xx) xx %&&% rawToChar(xx) %||% NA_character_) + } + + do <- function(encoding = "PLAIN", type = "STRING", ...) { + write_parquet( + df, tmp, + schema = parquet_schema(x = type), + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... 
+ ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + expect_equal(read_parquet_schema(tmp)$logical_type[[2]]$type, type) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_str(mtd[["min_value"]]), + as_str(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) + + expect_snapshot(do(compression = "snappy", type = "JSON")) + expect_snapshot(do(compression = "uncompressed", type = "JSON")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy", type = "JSON")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed", type = "JSON")) + + expect_snapshot(do(compression = "snappy", type = "BSON")) + expect_snapshot(do(compression = "uncompressed", type = "BSON")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy", type = "BSON")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed", type = "BSON")) + + expect_snapshot(do(compression = "snappy", type = "ENUM")) + expect_snapshot(do(compression = "uncompressed", type = "ENUM")) + + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy", type = "ENUM")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed", type = "ENUM")) +}) + +test_that("min/max for REALSXP -> TIMESTAMP (INT64)", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + now <- 0L + df <- data.frame(x = .POSIXct(as.double(c( + sample(now + 1:5), + sample(c(now + c(1:3, -100L, 100L))), + sample(c(now - 1000L, NA_integer_, now + 1000L, NA_integer_, NA_integer_)), + rep(NA_integer_, 3) + )), tz = "UTC")) + + as_int64 <- function(x) { + sapply(x, function(xx) xx %&&% .Call(read_int64, xx) %||% NA_real_) + } + + do <- function(encoding = "PLAIN",...) { + write_parquet( + df, tmp, + encoding = encoding, + options = parquet_options(num_rows_per_row_group = 5), + ... + ) + expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp))) + mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]]) + list( + as_int64(mtd[["min_value"]]), + as_int64(mtd[["max_value"]]), + mtd[["is_min_value_exact"]], + mtd[["is_max_value_exact"]] + ) + } + expect_snapshot(do(compression = "snappy")) + expect_snapshot(do(compression = "uncompressed")) +return() + # dictionary + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy")) + expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed")) +})
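
The snapshot tests above decode the raw `min_value`/`max_value` statistics by hand with `readBin()`. Below is a minimal sketch of the same idea from a user's point of view, assuming only the functions shown in this patch (`write_parquet()`, `read_parquet_metadata()`, `parquet_options()`); the example data and the `dec()` helper are illustrative, not part of the package.

    # Sketch: write a small integer column in two row groups, then decode
    # the per row group minimum/maximum values and null counts from the
    # column chunk metadata. INT32 statistics are 4-byte values, so
    # readBin(what = "integer") recovers them.
    library(nanoparquet)
    tmp <- tempfile(fileext = ".parquet")
    df <- data.frame(x = c(1L, 5L, 3L, NA, 10L, -2L))
    write_parquet(
      df, tmp,
      options = parquet_options(num_rows_per_row_group = 3)
    )
    ccs <- read_parquet_metadata(tmp)[["column_chunks"]]
    dec <- function(x) {
      # min_value / max_value are list columns of raw vectors, NULL if absent
      sapply(x, function(v) {
        if (is.null(v)) NA_integer_ else readBin(v, what = "integer")
      })
    }
    data.frame(
      row_group = ccs$row_group,
      null_count = ccs$null_count,
      min = dec(ccs$min_value),
      max = dec(ccs$max_value)
    )
    # Statistics can be turned off entirely:
    write_parquet(
      df, tmp,
      options = parquet_options(write_minmax_values = FALSE)
    )
    unlink(tmp)

The tests above wrap the same decoding into small `as_int()`, `as_dbl()` and `as_int64()` helpers, using `%&&%` to short-circuit on the `NULL` entries of the list columns.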