diff --git a/NEWS.md b/NEWS.md index 52b3ca4..1c07a5d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -21,6 +21,10 @@ `write_parquet()`, to specify how the columns of a data frame should be mapped to Parquet types. + - `write_parquet()` can now write multiple row groups. By default it puts + at most 10 million rows into a single row group. You can choose the + row groups manually with the `row_groups` argument. + - Newly supported type conversions in `write_parquet()` via the schema argument: diff --git a/R/options.R b/R/options.R index b0f23dd..bff9c57 100644 --- a/R/options.R +++ b/R/options.R @@ -5,6 +5,9 @@ #' in [read_parquet()]. By default nanoparquet adds the `"tbl"` class, #' so data frames are printed differently if the pillar package is #' loaded. +#' @param num_rows_per_row_group The number of rows to put into a row +#' group, if row groups are not specified explicitly. It should be +#' an integer scalar. Defaults to 10 million. #' @param use_arrow_metadata `TRUE` or `FALSE`. If `TRUE`, then #' [read_parquet()] and [read_parquet_schema()] will make use of the Apache #' Arrow metadata to assign R classes to Parquet columns. @@ -37,6 +40,7 @@ parquet_options <- function( class = getOption("nanoparquet.class", "tbl"), + num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L), use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE), write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE), write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L) @@ -50,9 +54,14 @@ parquet_options <- function( identical(write_data_page_version, 1L) || identical(write_data_page_version, 2L) ) + num_rows_per_row_group <- as_count( + num_rows_per_row_group, + "num_rows_per_row_group" + ) list( class = class, + num_rows_per_row_group = num_rows_per_row_group, use_arrow_metadata = use_arrow_metadata, write_arrow_metadata = write_arrow_metadata, write_data_page_version = as.integer(write_data_page_version) diff --git a/R/porcelain.R b/R/porcelain.R index 0dcecf5..d951a93 100644 --- a/R/porcelain.R +++ b/R/porcelain.R @@ -241,8 +241,8 @@ dict_encode <- function(x, n = length(x)) { .Call(nanoparquet_create_dict, x, n) } -dict_encode_idx <- function(x) { - .Call(nanoparquet_create_dict_idx, x, sys.call()) +dict_encode_idx <- function(x, from = 1L, until = length(x) + 1L ) { + .Call(nanoparquet_create_dict_idx, x, from - 1L, until - 1L, sys.call()) } lgl_avg_run_length <- function(x, n = length(x)) { diff --git a/R/utils.R b/R/utils.R index 8bb3aad..6b7bb20 100644 --- a/R/utils.R +++ b/R/utils.R @@ -33,6 +33,21 @@ is_string <- function(x) { is.character(x) && length(x) == 1 && !is.na(x) } +is_icount <- function(x) { + is.integer(x) && length(x) == 1 && !is.na(x) && x >= 1L +} + +is_dcount <- function(x) { + is.double(x) && length(x) == 1 && !is.na(x) && as.integer(x) == x && + x >= 1 +} + +as_count <- function(x, name = "x") { + if (is_icount(x)) return(x) + if (is_dcount(x)) return(as.integer(x)) + stop(name, " must be a count, i.e. an integer scalar") +} + is_uint32 <- function(x) { is.numeric(x) && length(x) == 1 && !is.na(x) && round(x) == x && x >= 0 && x <= 4294967295 diff --git a/R/write-parquet.R b/R/write-parquet.R index fbcb1b6..7410656 100644 --- a/R/write-parquet.R +++ b/R/write-parquet.R @@ -44,6 +44,12 @@ #' @param metadata Additional key-value metadata to add to the file. #' This must be a named character vector, or a data frame with columns #' character columns called `key` and `value`. +#' @param row_groups Row groups of the Parquet file. If `NULL`, and `x` +#' is a grouped data frame, then the groups are used as row groups. +#' The rows will be reordered to match groups. If `NULL`, and `x` is +#' not a grouped data frame, then the `num_rows_per_row_group` option is +#' used from the `options` argument, see [parquet_options()]. Otherwise +#' it must be an integer vector, specifying the starts of the row groups. #' @param options Nanoparquet options, see [parquet_options()]. #' @return `NULL`, unless `file` is `":raw:"`, in which case the Parquet #' file is returned as a raw vector. @@ -62,6 +68,7 @@ write_parquet <- function( compression = c("snappy", "gzip", "zstd", "uncompressed"), encoding = NULL, metadata = NULL, + row_groups = NULL, options = parquet_options()) { file <- path.expand(file) @@ -148,6 +155,14 @@ write_parquet <- function( encoding <- parse_encoding(encoding, x) +row_groups <- row_groups %||% + attr(x, "groups") %||% + default_row_groups(x, schema, compression, encoding, options) + +row_group_starts <- parse_row_groups(x, row_groups) +x <- row_group_starts[[1]] +row_group_starts <- row_group_starts[[2]] + res <- .Call( nanoparquet_write, x, @@ -159,6 +174,7 @@ res <- .Call( options, schema, encodings[encoding], + row_group_starts, sys.call() ) @@ -201,3 +217,31 @@ parse_encoding <- function(encoding, x) { structure(encoding, names = names(x)) } } + +# we should refine this later +default_row_groups <- function(x, schema, compression, encoding, options) { + n <- options[["num_rows_per_row_group"]] + seq(1L, nrow(x), by = n) +} + +parse_row_groups <- function(x, rg) { + if (is.data.frame(rg)) { + rg <- rg[[".rows"]] + urg <- unlist(rg) + if (any(diff(urg) != 1)) { + message("Ordering data frame according to row groups.") + x <- x[urg, ] + } + rg <- c(1L, cumsum(lengths(rg))) + rg <- rg[-length(rg)] + } else { + if (!is.integer(rg) || anyNA(rg) || any(rg <= 0) || + any(diff(rg) <= 0) || rg[1] != 1L) { + stop( + "Row groups must be specified as a growing positive integer ", + "vector, starting with 1." + ) + } + } + list(x = x, row_groups = rg) +} diff --git a/man/parquet_options.Rd b/man/parquet_options.Rd index 79ad82a..f79a011 100644 --- a/man/parquet_options.Rd +++ b/man/parquet_options.Rd @@ -6,6 +6,7 @@ \usage{ parquet_options( class = getOption("nanoparquet.class", "tbl"), + num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L), use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE), write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE), write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L) @@ -17,6 +18,10 @@ in \code{\link[=read_parquet]{read_parquet()}}. By default nanoparquet adds the so data frames are printed differently if the pillar package is loaded.} +\item{num_rows_per_row_group}{The number of rows to put into a row +group, if row groups are not specified explicitly. It should be +an integer scalar. Defaults to 10 million.} + \item{use_arrow_metadata}{\code{TRUE} or \code{FALSE}. If \code{TRUE}, then \code{\link[=read_parquet]{read_parquet()}} and \code{\link[=read_parquet_schema]{read_parquet_schema()}} will make use of the Apache Arrow metadata to assign R classes to Parquet columns. diff --git a/man/write_parquet.Rd b/man/write_parquet.Rd index ff11067..fb3ec62 100644 --- a/man/write_parquet.Rd +++ b/man/write_parquet.Rd @@ -11,6 +11,7 @@ write_parquet( compression = c("snappy", "gzip", "zstd", "uncompressed"), encoding = NULL, metadata = NULL, + row_groups = NULL, options = parquet_options() ) } @@ -62,6 +63,13 @@ See \link{parquet-encodings} for more about encodings.} This must be a named character vector, or a data frame with columns character columns called \code{key} and \code{value}.} +\item{row_groups}{Row groups of the Parquet file. If \code{NULL}, and \code{x} +is a grouped data frame, then the groups are used as row groups. +The rows will be reordered to match groups. If \code{NULL}, and \code{x} is +not a grouped data frame, then the \code{num_rows_per_row_group} option is +used from the \code{options} argument, see \code{\link[=parquet_options]{parquet_options()}}. Otherwise +it must be an integer vector, specifying the starts of the row groups.} + \item{options}{Nanoparquet options, see \code{\link[=parquet_options]{parquet_options()}}.} } \value{ diff --git a/src/dictionary-encoding.cpp b/src/dictionary-encoding.cpp index fc355be..e2edb03 100644 --- a/src/dictionary-encoding.cpp +++ b/src/dictionary-encoding.cpp @@ -175,8 +175,11 @@ SEXP nanoparquet_create_dict(SEXP x, SEXP rlen) { return Rf_ScalarInteger(dictlen); } -SEXP nanoparquet_create_dict_idx_(SEXP x) { - R_xlen_t dictlen, len = Rf_xlength(x); +SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until) { + int64_t cfrom = INTEGER(from)[0]; + int64_t cuntil = INTEGER(until)[0]; + int64_t len = cuntil - cfrom; + R_xlen_t dictlen; SEXP idx = PROTECT(Rf_allocVector(INTSXP, len)); SEXP dict = PROTECT(Rf_allocVector(INTSXP, len)); @@ -184,16 +187,16 @@ SEXP nanoparquet_create_dict_idx_(SEXP x) { int *iidx = INTEGER(idx); switch (TYPEOF(x)) { case LGLSXP: - dictlen = create_dict_idx(LOGICAL(x), iidx, idict, len, NA_LOGICAL); + dictlen = create_dict_idx(LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL); break; case INTSXP: - dictlen = create_dict_idx(INTEGER(x), idict, iidx, len, NA_INTEGER); + dictlen = create_dict_idx(INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER); break; case REALSXP: - dictlen = create_dict_real_idx(REAL(x), idict, iidx, len); + dictlen = create_dict_real_idx(REAL(x) + cfrom, idict, iidx, len); break; case STRSXP: { - dictlen = create_dict_ptr_idx((void**)STRING_PTR_RO(x),idict, iidx, len, (void*) NA_STRING); + dictlen = create_dict_ptr_idx((void**)(STRING_PTR_RO(x) + cfrom), idict, iidx, len, (void*) NA_STRING); break; } default: @@ -213,19 +216,32 @@ SEXP nanoparquet_create_dict_idx_(SEXP x) { return res; } +struct nanoparquet_create_dict_idx_data { + SEXP data; + SEXP from; + SEXP until; +}; + inline SEXP nanoparquet_create_dict_idx_wrapper(void *data) { - SEXP x = (SEXP) data; - return nanoparquet_create_dict_idx_(x); + struct nanoparquet_create_dict_idx_data *rdata = + (struct nanoparquet_create_dict_idx_data*) data; + return nanoparquet_create_dict_idx_( + rdata->data, + rdata->from, + rdata->until + ); } -SEXP nanoparquet_create_dict_idx(SEXP x, SEXP call) { +SEXP nanoparquet_create_dict_idx(SEXP x, SEXP from, SEXP until, SEXP call) { + + struct nanoparquet_create_dict_idx_data data = { x, from, until }; SEXP uwt = PROTECT(R_MakeUnwindCont()); R_API_START(call); SEXP ret = R_UnwindProtect( nanoparquet_create_dict_idx_wrapper, - (void*) x, + &data, throw_error, &uwt, uwt diff --git a/src/lib/ParquetOutFile.cpp b/src/lib/ParquetOutFile.cpp index cff7b8e..be9006e 100644 --- a/src/lib/ParquetOutFile.cpp +++ b/src/lib/ParquetOutFile.cpp @@ -34,9 +34,11 @@ static string type_to_string(Type::type t) { ParquetOutFile::ParquetOutFile( std::string filename, - parquet::CompressionCodec::type codec) : + parquet::CompressionCodec::type codec, + vector &row_group_starts) : pfile(pfile_), num_rows(0), num_cols(0), num_rows_set(false), - codec(codec), mem_buffer(new TMemoryBuffer(1024 * 1024)), // 1MB, what if not enough? + codec(codec), row_group_starts(row_group_starts), + mem_buffer(new TMemoryBuffer(1024 * 1024)), // 1MB, what if not enough? tproto(tproto_factory.getProtocol(mem_buffer)) { // open file @@ -51,9 +53,11 @@ ParquetOutFile::ParquetOutFile( ParquetOutFile::ParquetOutFile( std::ostream &stream, - parquet::CompressionCodec::type codec) : + parquet::CompressionCodec::type codec, + vector &row_group_starts) : pfile(stream), num_rows(0), num_cols(0), num_rows_set(false), - codec(codec), mem_buffer(new TMemoryBuffer(1024 * 1024)), // 1MB, what if not enough? + codec(codec), row_group_starts(row_group_starts), + mem_buffer(new TMemoryBuffer(1024 * 1024)), // 1MB, what if not enough? tproto(tproto_factory.getProtocol(mem_buffer)) { // root schema element @@ -72,38 +76,44 @@ void ParquetOutFile::schema_add_column(parquet::SchemaElement &sel, parquet::Encoding::type encoding) { schemas.push_back(sel); schemas[0].__set_num_children(schemas[0].num_children + 1); - - ColumnMetaData cmd; - cmd.__set_type(sel.type); - vector encs; - if (sel.repetition_type != parquet::FieldRepetitionType::REQUIRED && - encoding != Encoding::RLE) { - // def levels, but do not duplicate - encs.push_back(Encoding::RLE); - } - if (encoding == Encoding::RLE_DICTIONARY || - encoding == Encoding::PLAIN_DICTIONARY) { - // dictionary values - encs.push_back(Encoding::PLAIN); - } - encs.push_back(encoding); encodings.push_back(encoding); - - cmd.__set_encodings(encs); - vector paths; - paths.push_back(sel.name); - cmd.__set_path_in_schema(paths); - cmd.__set_codec(codec); - // num_values set later - // total_uncompressed_size set later - // total_compressed_size set later - // data_page_offset set later - // dictionary_page_offset set later when we have dictionaries - column_meta_data.push_back(cmd); - num_cols++; } +void ParquetOutFile::init_column_meta_data() { + column_meta_data.clear(); + for (uint32_t cl = 0; cl < schemas.size() - 1; cl++) { + parquet::SchemaElement &sel = schemas[cl + 1]; + parquet::Encoding::type encoding = encodings[cl]; + ColumnMetaData cmd; + cmd.__set_type(sel.type); + vector encs; + if (sel.repetition_type != parquet::FieldRepetitionType::REQUIRED && + encoding != Encoding::RLE) { + // def levels, but do not duplicate + encs.push_back(Encoding::RLE); + } + if (encoding == Encoding::RLE_DICTIONARY || + encoding == Encoding::PLAIN_DICTIONARY) { + // dictionary values + encs.push_back(Encoding::PLAIN); + } + encs.push_back(encoding); + + cmd.__set_encodings(encs); + vector paths; + paths.push_back(sel.name); + cmd.__set_path_in_schema(paths); + cmd.__set_codec(codec); + // num_values set later + // total_uncompressed_size set later + // total_compressed_size set later + // data_page_offset set later + // dictionary_page_offset set later when we have dictionaries + column_meta_data.push_back(cmd); + } +} + void ParquetOutFile::add_key_value_metadata( std::string key, std::string value) { KeyValue kv0; @@ -339,11 +349,13 @@ void ParquetOutFile::write_dictionary_( std::ostream &file, uint32_t idx, uint32_t size, - parquet::SchemaElement &sel) { + parquet::SchemaElement &sel, + int64_t from, + int64_t until) { ColumnMetaData *cmd = &(column_meta_data[idx]); uint32_t start = file.tellp(); - write_dictionary(file, idx, sel); + write_dictionary(file, idx, sel, from, until); uint32_t end = file.tellp(); if (end - start != size) { throw runtime_error( @@ -360,11 +372,14 @@ void ParquetOutFile::write_dictionary_indices_( std::ostream &file, uint32_t idx, uint32_t size, - uint64_t from, - uint64_t until) { + int64_t rg_from, + int64_t rg_until, + uint64_t page_from, + uint64_t page_until) { streampos start = file.tellp(); - write_dictionary_indices(file, idx, from, until); + write_dictionary_indices(file, idx, rg_from, rg_until, + page_from, page_until); streampos end = file.tellp(); if (end - start != size) { throw runtime_error( @@ -467,22 +482,45 @@ void ParquetOutFile::write() { throw runtime_error("Need to set the number of rows before writing"); // # nocov } pfile.write("PAR1", 4); - write_columns(); + for (int idx = 0; idx < row_group_starts.size(); idx++) { + // init for row group + init_column_meta_data(); + + // write + int64_t from = row_group_starts[idx]; + int64_t until = idx < row_group_starts.size() - 1 ? row_group_starts[idx + 1] : num_rows; + int64_t total_size = write_columns(from, until); + + // row group metadata + vector ccs; + for (uint32_t idx = 0; idx < num_cols; idx++) { + ColumnChunk cc; + cc.__set_file_offset(column_meta_data[idx].data_page_offset); + cc.__set_meta_data(column_meta_data[idx]); + ccs.push_back(cc); + } + RowGroup rg; + rg.__set_num_rows(until - from); + rg.__set_total_byte_size(total_size); + rg.__set_columns(ccs); + row_groups.push_back(rg); + } write_footer(); pfile.write("PAR1", 4); pfile_.close(); } -void ParquetOutFile::write_columns() { +int64_t ParquetOutFile::write_columns(int64_t from, int64_t until) { uint32_t start = pfile.tellp(); for (uint32_t idx = 0; idx < num_cols; idx++) { - write_column(idx); + write_column(idx, from, until); } uint32_t end = pfile.tellp(); - total_size = end - start; + // return total size + return end - start; } -void ParquetOutFile::write_column(uint32_t idx) { +void ParquetOutFile::write_column(uint32_t idx, int64_t from, int64_t until) { ColumnMetaData *cmd = &(column_meta_data[idx]); SchemaElement se = schemas[idx + 1]; uint32_t col_start = pfile.tellp(); @@ -490,13 +528,13 @@ void ParquetOutFile::write_column(uint32_t idx) { cmd->__set_total_uncompressed_size(0); if (encodings[idx] == Encoding::RLE_DICTIONARY) { uint32_t dictionary_page_offset = pfile.tellp(); - write_dictionary_page(idx); + write_dictionary_page(idx, from, until); cmd->__set_dictionary_page_offset(dictionary_page_offset); } uint32_t data_offset = pfile.tellp(); - write_data_pages(idx); + write_data_pages(idx, from, until); int32_t column_bytes = ((int32_t) pfile.tellp()) - col_start; - cmd->__set_num_values(num_rows); + cmd->__set_num_values(until - from); cmd->__set_total_compressed_size(column_bytes); cmd->__set_data_page_offset(data_offset); } @@ -516,13 +554,14 @@ void ParquetOutFile::write_page_header(uint32_t idx, PageHeader &ph) { // Currently only for byte arrays, more later -void ParquetOutFile::write_dictionary_page(uint32_t idx) { +void ParquetOutFile::write_dictionary_page(uint32_t idx, int64_t from, + int64_t until) { ColumnMetaData *cmd = &(column_meta_data[idx]); SchemaElement se = schemas[idx + 1]; // Uncompresed size of the dictionary in bytes - uint32_t dict_size = get_size_dictionary(idx, se); + uint32_t dict_size = get_size_dictionary(idx, se, from, until); // Number of entries in the dicitonary - uint32_t num_dict_values = get_num_values_dictionary(idx); + uint32_t num_dict_values = get_num_values_dictionary(idx, from, until); // Init page header PageHeader ph; @@ -539,7 +578,7 @@ void ParquetOutFile::write_dictionary_page(uint32_t idx) { // then the data directly to the file ph.__set_compressed_page_size(dict_size); write_page_header(idx, ph); - write_dictionary_(pfile, idx, dict_size, se); + write_dictionary_(pfile, idx, dict_size, se, from, until); } else { // With compression we need two temporary buffers @@ -547,7 +586,7 @@ void ParquetOutFile::write_dictionary_page(uint32_t idx) { buf_unc.reset(dict_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_dictionary_(*os0, idx, dict_size, se); + write_dictionary_(*os0, idx, dict_size, se, from, until); // 2. compress buf_unc to buf_com size_t cdict_size = compress(cmd->codec, buf_unc, dict_size, buf_com); @@ -559,18 +598,20 @@ void ParquetOutFile::write_dictionary_page(uint32_t idx) { } } -void ParquetOutFile::write_data_pages(uint32_t idx) { +void ParquetOutFile::write_data_pages(uint32_t idx, int64_t from, + int64_t until) { SchemaElement se = schemas[idx + 1]; + int64_t rg_num_rows = until - from; // guess total size and decide on number of pages uint64_t total_size; if (encodings[idx] == Encoding::PLAIN) { - total_size = calculate_column_data_size(idx, num_rows, 0, num_rows); + total_size = calculate_column_data_size(idx, rg_num_rows, from, until); } else { // estimate the max RLE length - uint32_t num_values = get_num_values_dictionary(idx); + uint32_t num_values = get_num_values_dictionary(idx, from, until); uint8_t bit_width = ceil(log2((double) num_values)); - total_size = MaxRleBpSizeSimple(num_rows, bit_width); + total_size = MaxRleBpSizeSimple(rg_num_rows, bit_width); } uint32_t page_size = 1024 * 1024; @@ -592,31 +633,33 @@ void ParquetOutFile::write_data_pages(uint32_t idx) { num_pages = 1; } - uint32_t rows_per_page = num_rows / num_pages + (num_rows % num_pages ? 1 : 0); + uint32_t rows_per_page = rg_num_rows / num_pages + (rg_num_rows % num_pages ? 1 : 0); if (rows_per_page == 0) { rows_per_page = 1; } for (auto i = 0; i < num_pages; i++) { - uint64_t from = i * rows_per_page; - uint64_t until = (i + 1) * rows_per_page; - if (until > num_rows) { - until = num_rows; + uint64_t page_from = from + i * rows_per_page; + uint64_t page_until = from + (i + 1) * rows_per_page; + if (page_until > until) { + page_until = until; } - write_data_page(idx, from, until); + write_data_page(idx, from, until, page_from, page_until); } } -void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, - uint64_t until) { +void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, + int64_t rg_until, uint64_t page_from, + uint64_t page_until) { ColumnMetaData *cmd = &(column_meta_data[idx]); SchemaElement se = schemas[idx + 1]; PageHeader ph; DataPageHeaderV2 dph2; + uint32_t page_num_values = page_until - page_from; if (data_page_version == 1) { ph.__set_type(PageType::DATA_PAGE); DataPageHeader dph; - dph.__set_num_values(until - from); + dph.__set_num_values(page_num_values); dph.__set_encoding(encodings[idx]); if (se.repetition_type == FieldRepetitionType::OPTIONAL) { dph.__set_definition_level_encoding(Encoding::RLE); @@ -625,8 +668,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, ph.__set_data_page_header(dph); } else if (data_page_version == 2) { ph.__set_type(PageType::DATA_PAGE_V2); - dph2.__set_num_values(until - from); - dph2.__set_num_rows(until - from); + dph2.__set_num_values(page_num_values); + dph2.__set_num_rows(page_num_values); dph2.__set_encoding(encodings[idx]); // these might be overwritten later if there are NAs dph2.__set_num_nulls(0); @@ -644,7 +687,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, // CASE 1: REQ, PLAIN, UNC // 1. write directly to file uint32_t data_size = calculate_column_data_size( - idx, until - from, from, until + idx, page_num_values, page_from, page_until ); ph.__set_uncompressed_page_size(data_size); ph.__set_compressed_page_size(data_size); @@ -652,7 +695,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, ph.__set_data_page_header_v2(dph2); } write_page_header(idx, ph); - write_data_(pfile, idx, data_size, from, until); + write_data_(pfile, idx, data_size, page_from, page_until); } else if (se.repetition_type == FieldRepetitionType::REQUIRED && encodings[idx] == Encoding::PLAIN && @@ -660,13 +703,13 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, // CASE 2: REQ, PLAIN, COMP // 1. write data to buf_unc uint32_t data_size = calculate_column_data_size( - idx, until - from, from, until + idx, page_num_values, page_from, page_until ); ph.__set_uncompressed_page_size(data_size); buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_data_(*os0, idx, data_size, from, until); + write_data_(*os0, idx, data_size, page_from, page_until); // 2. compress buf_unc to buf_com size_t cdata_size = compress(cmd->codec, buf_unc, data_size, buf_com); @@ -684,18 +727,19 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, cmd->codec == CompressionCodec::UNCOMPRESSED) { // CASE 3: REQ, RLE_DICT, UNC // 1. write dictionary indices to buf_unc - uint32_t data_size = (until - from) * sizeof(int); + uint32_t data_size = (page_num_values) * sizeof(int); buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_dictionary_indices_(*os0, idx, data_size, from, until); + write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until, + page_from, page_until); // 2. RLE encode buf_unc to buf_com - uint32_t num_dict_values = get_num_values_dictionary(idx); + uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); uint8_t bit_width = ceil(log2((double) num_dict_values)); uint32_t rle_size = rle_encode( buf_unc, - until - from, + page_num_values, buf_com, bit_width, true @@ -718,19 +762,20 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, cmd->codec != CompressionCodec::UNCOMPRESSED) { // CASE 4: REQ, RLE_DICT, COMP // 1. write dictionary indices to buf_unc - uint32_t data_size = (until - from) * sizeof(int); + uint32_t data_size = page_num_values * sizeof(int); ph.__set_uncompressed_page_size(data_size); buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_dictionary_indices_(*os0, idx, data_size, from, until); + write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until, + page_from, page_until); // 2. RLE encode buf_unc to buf_com - uint32_t num_dict_values = get_num_values_dictionary(idx); + uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); uint8_t bit_width = ceil(log2((double) num_dict_values)); uint32_t rle_size = rle_encode( buf_unc, - until - from, + page_num_values, buf_com, bit_width, true, // add_bit_width @@ -757,24 +802,24 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, cmd->codec == CompressionCodec::UNCOMPRESSED) { // CASE 5: OPT, PLAIN, UNC // 1. write definition levels to buf_unc - uint32_t miss_size = (until - from) * sizeof(int); + uint32_t miss_size = page_num_values * sizeof(int); buf_unc.reset(miss_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - uint32_t num_present = write_present(*os0, idx, from, until); + uint32_t num_present = write_present(*os0, idx, page_from, page_until); // 2. RLE buf_unc to buf_com - uint32_t rle_size = rle_encode(buf_unc, until - from, buf_com, 1, false); + uint32_t rle_size = rle_encode(buf_unc, page_num_values, buf_com, 1, false); // 3. Write buf_unc to file uint32_t data_size = calculate_column_data_size( - idx, num_present, from, until + idx, num_present, page_from, page_until ); int prep_length = data_page_version == 1 ? 4 : 0; ph.__set_uncompressed_page_size(data_size + rle_size + prep_length); ph.__set_compressed_page_size(data_size + rle_size + prep_length); if (data_page_version == 2) { - dph2.__set_num_nulls(until - from - num_present); + dph2.__set_num_nulls(page_num_values - num_present); dph2.__set_definition_levels_byte_length(rle_size); ph.__set_data_page_header_v2(dph2); } @@ -788,23 +833,23 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, ); // 4. write data to file - write_present_data_(pfile, idx, data_size, num_present, from, until); + write_present_data_(pfile, idx, data_size, num_present, page_from, page_until); } else if (se.repetition_type == FieldRepetitionType::OPTIONAL && encodings[idx] == Encoding::PLAIN && cmd->codec != CompressionCodec::UNCOMPRESSED) { // CASE 6: OPT, PLAIN, COMP // 1. write definition levels to buf_unc - uint32_t miss_size = (until - from) * sizeof(int); + uint32_t miss_size = page_num_values * sizeof(int); buf_unc.reset(miss_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - uint32_t num_present = write_present(*os0, idx, from, until); + uint32_t num_present = write_present(*os0, idx, page_from, page_until); // 2. RLE buf_unc to buf_com uint32_t rle_size = rle_encode( buf_unc, - until - from, + page_num_values, buf_com, 1, // bit_width false, // add_bit_width @@ -813,13 +858,13 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, // 3. Append data to buf_com uint32_t data_size = calculate_column_data_size( - idx, num_present, from, until + idx, num_present, page_from, page_until ); buf_com.resize(rle_size + data_size, true); std::unique_ptr os1 = std::unique_ptr(new std::ostream(&buf_com)); buf_com.skip(rle_size); - write_present_data_(*os1, idx, data_size, num_present, from, until); + write_present_data_(*os1, idx, data_size, num_present, page_from, page_until); // 4. compress buf_com to buf_unc // for data page v2, the def levels are not compressed! @@ -830,7 +875,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, ph.__set_uncompressed_page_size(rle_size + data_size); ph.__set_compressed_page_size(comp_size); if (data_page_version == 2) { - dph2.__set_num_nulls(until - from - num_present); + dph2.__set_num_nulls(page_num_values - num_present); dph2.__set_definition_levels_byte_length(rle_size); ph.__set_data_page_header_v2(dph2); } @@ -845,16 +890,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, cmd->codec == CompressionCodec::UNCOMPRESSED) { // CASE 7: OPT RLE_DICT UNC // 1. write definition levels to buf_unc - uint32_t miss_size = (until - from) * sizeof(int); + uint32_t miss_size = page_num_values * sizeof(int); buf_unc.reset(miss_size); std::unique_ptr os1 = std::unique_ptr(new std::ostream(&buf_unc)); - uint32_t num_present = write_present(*os1, idx, from, until); + uint32_t num_present = write_present(*os1, idx, page_from, page_until); // 2. RLE buf_unc to buf_com uint32_t rle_size = rle_encode( buf_unc, - until - from, + page_num_values, buf_com, 1, // bit_width false, // add_bit_width @@ -866,10 +911,11 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_dictionary_indices_(*os0, idx, data_size, from, until); + write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until, + page_from, page_until); // 4. append RLE buf_unc to buf_com - uint32_t num_dict_values = get_num_values_dictionary(idx); + uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); uint8_t bit_width = ceil(log2((double) num_dict_values)); uint32_t rle2_size = rle_encode( buf_unc, @@ -885,7 +931,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, ph.__set_uncompressed_page_size(rle2_size); ph.__set_compressed_page_size(rle2_size); if (data_page_version == 2) { - dph2.__set_num_nulls(until - from - num_present); + dph2.__set_num_nulls(page_num_values - num_present); dph2.__set_definition_levels_byte_length(rle_size); ph.__set_data_page_header_v2(dph2); } @@ -900,16 +946,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, cmd->codec != CompressionCodec::UNCOMPRESSED) { // CASE 8: OPT RLE_DICT COM // 1. write definition levels to buf_unc - uint32_t miss_size = (until - from) * sizeof(int); + uint32_t miss_size = page_num_values * sizeof(int); buf_unc.reset(miss_size); std::unique_ptr os1 = std::unique_ptr(new std::ostream(&buf_unc)); - uint32_t num_present = write_present(*os1, idx, from, until); + uint32_t num_present = write_present(*os1, idx, page_from, page_until); // 2. RLE buf_unc to buf_com uint32_t rle_size = rle_encode( buf_unc, - until - from, + page_num_values, buf_com, 1, // bit_width false, // add_bit_width @@ -921,10 +967,11 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_dictionary_indices_(*os0, idx, data_size, from, until); + write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until, + page_from, page_until); // 4. append RLE buf_unc to buf_com - uint32_t num_dict_values = get_num_values_dictionary(idx); + uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); uint8_t bit_width = ceil(log2((double) num_dict_values)); uint32_t rle2_size = rle_encode( buf_unc, @@ -944,7 +991,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, ph.__set_uncompressed_page_size(rle2_size); ph.__set_compressed_page_size(crle2_size); if (data_page_version == 2) { - dph2.__set_num_nulls(until - from - num_present); + dph2.__set_num_nulls(page_num_values - num_present); dph2.__set_definition_levels_byte_length(rle_size); ph.__set_data_page_header_v2(dph2); } @@ -959,16 +1006,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, cmd->codec == CompressionCodec::UNCOMPRESSED) { // CASE 9: REQ, RLE, UNCOMP // 1. write logicals into buf_unc - uint32_t data_size = (until - from) * sizeof(int); + uint32_t data_size = page_num_values * sizeof(int); buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_boolean_as_int(*os0, idx, from, until); + write_boolean_as_int(*os0, idx, page_from, page_until); // 2. RLE encode buf_unc to buf_com uint32_t rle_size = rle_encode( buf_unc, - until - from, + page_num_values, buf_com, 1, // bit_width false, // add_bit_width @@ -989,16 +1036,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, cmd->codec != CompressionCodec::UNCOMPRESSED) { // CASE 10: REQ, RLE, COMP // 1. write logicals into buf_unc - uint32_t data_size = (until - from) * sizeof(int); + uint32_t data_size = page_num_values * sizeof(int); buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_boolean_as_int(*os0, idx, from, until); + write_boolean_as_int(*os0, idx, page_from, page_until); // 2. RLE encode buf_unc to buf_com uint32_t rle_size = rle_encode( buf_unc, - until - from, + page_num_values, buf_com, 1, // bit_width false, // add_bit_width @@ -1022,16 +1069,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, cmd->codec == CompressionCodec::UNCOMPRESSED) { // CASE 11: OPT RLE UNC // 1. write definition levels to buf_unc - uint32_t miss_size = (until - from) * sizeof(int); + uint32_t miss_size = page_num_values * sizeof(int); buf_unc.reset(miss_size); std::unique_ptr os1 = std::unique_ptr(new std::ostream(&buf_unc)); - uint32_t num_present = write_present(*os1, idx, from, until); + uint32_t num_present = write_present(*os1, idx, page_from, page_until); // 2. RLE buf_unc to buf_com uint32_t rle_size = rle_encode( buf_unc, - until - from, + page_num_values, buf_com, 1, // bit_width false, // add_bit_width @@ -1043,7 +1090,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_present_boolean_as_int(*os0, idx, num_present, from, until); + write_present_boolean_as_int(*os0, idx, num_present, page_from, page_until); // 4. append RLE buf_unc to buf_com uint32_t rle2_size = rle_encode( @@ -1070,16 +1117,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, cmd->codec != CompressionCodec::UNCOMPRESSED) { // CASE 12: OPT RLE COMP // 1. write definition levels to buf_unc - uint32_t miss_size = (until - from) * sizeof(int); + uint32_t miss_size = page_num_values * sizeof(int); buf_unc.reset(miss_size); std::unique_ptr os1 = std::unique_ptr(new std::ostream(&buf_unc)); - uint32_t num_present = write_present(*os1, idx, from, until); + uint32_t num_present = write_present(*os1, idx, page_from, page_until); // 2. RLE buf_unc to buf_com uint32_t rle_size = rle_encode( buf_unc, - until - from, + page_num_values, buf_com, 1, // bit_width false, // add_bit_width @@ -1091,7 +1138,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_present_boolean_as_int(*os0, idx, num_present, from, until); + write_present_boolean_as_int(*os0, idx, num_present, page_from, page_until); // 4. append RLE buf_unc to buf_com uint32_t rle2_size = rle_encode( @@ -1158,26 +1205,11 @@ uint64_t ParquetOutFile::calculate_column_data_size(uint32_t idx, } void ParquetOutFile::write_footer() { - vector ccs; - for (uint32_t idx = 0; idx < num_cols; idx++) { - ColumnChunk cc; - cc.__set_file_offset(column_meta_data[idx].data_page_offset); - cc.__set_meta_data(column_meta_data[idx]); - ccs.push_back(cc); - } - - vector rgs; - RowGroup rg; - rg.__set_num_rows(num_rows); - rg.__set_total_byte_size(total_size); - rg.__set_columns(ccs); - rgs.push_back(rg); - FileMetaData fmd; fmd.__set_version(1); fmd.__set_schema(schemas); fmd.__set_num_rows(num_rows); - fmd.__set_row_groups(rgs); + fmd.__set_row_groups(row_groups); fmd.__set_key_value_metadata(kv); fmd.__set_created_by("https://github.com/gaborcsardi/nanoparquet"); fmd.write(tproto.get()); diff --git a/src/lib/ParquetOutFile.h b/src/lib/ParquetOutFile.h index aef0e4c..494bc6f 100644 Binary files a/src/lib/ParquetOutFile.h and b/src/lib/ParquetOutFile.h differ diff --git a/src/rwrapper.cpp b/src/rwrapper.cpp index f3d72ef..4e4943b 100644 --- a/src/rwrapper.cpp +++ b/src/rwrapper.cpp @@ -15,6 +15,7 @@ SEXP nanoparquet_write( SEXP options, SEXP schema, SEXP encoding, + SEXP row_group_starts, SEXP mycall ); SEXP nanoparquet_map_to_parquet_types(SEXP df, SEXP options); @@ -40,7 +41,7 @@ SEXP nanoparquet_unpack_bits_int32(SEXP x, SEXP bit_width, SEXP n); SEXP nanoparquet_pack_bits_int32(SEXP x, SEXP bit_width); SEXP nanoparquet_create_dict(SEXP x, SEXP l); -SEXP nanoparquet_create_dict_idx(SEXP x, SEXP call); +SEXP nanoparquet_create_dict_idx(SEXP x, SEXP from, SEXP until, SEXP call); SEXP nanoparquet_avg_run_length(SEXP x, SEXP len); SEXP nanoparquet_base64_decode(SEXP x); @@ -89,7 +90,7 @@ SEXP is_ubsan_() { static const R_CallMethodDef R_CallDef[] = { CALLDEF(nanoparquet_read2, 3), - CALLDEF(nanoparquet_write, 10), + CALLDEF(nanoparquet_write, 11), CALLDEF(nanoparquet_map_to_parquet_types, 2), CALLDEF(nanoparquet_logical_to_converted, 1), CALLDEF(nanoparquet_read_metadata, 1), @@ -109,7 +110,7 @@ static const R_CallMethodDef R_CallDef[] = { CALLDEF(nanoparquet_unpack_bits_int32, 3), CALLDEF(nanoparquet_pack_bits_int32, 2), CALLDEF(nanoparquet_create_dict, 2), - CALLDEF(nanoparquet_create_dict_idx, 2), + CALLDEF(nanoparquet_create_dict_idx, 4), CALLDEF(nanoparquet_avg_run_length, 2), CALLDEF(nanoparquet_base64_decode, 1), CALLDEF(nanoparquet_base64_encode, 1), diff --git a/src/write.cpp b/src/write.cpp index 36bc13d..ce3616f 100644 --- a/src/write.cpp +++ b/src/write.cpp @@ -87,7 +87,7 @@ static uint16_t double_to_float16(double x) { extern "C" { SEXP nanoparquet_create_dict(SEXP x, SEXP rlen); -SEXP nanoparquet_create_dict_idx_(SEXP x); +SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until); SEXP nanoparquet_avg_run_length(SEXP x, SEXP rlen); } @@ -95,11 +95,13 @@ class RParquetOutFile : public ParquetOutFile { public: RParquetOutFile( std::string filename, - parquet::CompressionCodec::type codec + parquet::CompressionCodec::type codec, + std::vector &row_groups ); RParquetOutFile( std::ostream &stream, - parquet::CompressionCodec::type codec + parquet::CompressionCodec::type codec, + std::vector &row_groups ); void write_int32(std::ostream &file, uint32_t idx, uint64_t from, uint64_t until, parquet::SchemaElement &sel); @@ -133,12 +135,16 @@ class RParquetOutFile : public ParquetOutFile { uint64_t until); // for dictionaries - uint32_t get_num_values_dictionary(uint32_t idx); - uint32_t get_size_dictionary(uint32_t idx, parquet::SchemaElement &type); + uint32_t get_num_values_dictionary(uint32_t idx, int64_t form, + int64_t until); + uint32_t get_size_dictionary(uint32_t idx, parquet::SchemaElement &type, + int64_t from, int64_t until); void write_dictionary(std::ostream &file, uint32_t idx, - parquet::SchemaElement &sel); + parquet::SchemaElement &sel, int64_t from, + int64_t until); void write_dictionary_indices(std::ostream &file, uint32_t idx, - uint64_t from, uint64_t until); + int64_t rg_from, int64_t rg_until, + uint64_t page_from, uint64_t page_until); void write( SEXP dfsxp, @@ -154,9 +160,10 @@ class RParquetOutFile : public ParquetOutFile { SEXP df = R_NilValue; SEXP required = R_NilValue; SEXP dicts = R_NilValue; + SEXP dicts_from = R_NilValue; ByteBuffer present; - void create_dictionary(uint32_t idx); + void create_dictionary(uint32_t idx, int64_t from, int64_t until); // for LGLSXP this mean RLE encoding bool should_use_dict_encoding(uint32_t idx); parquet::Encoding::type @@ -165,26 +172,32 @@ class RParquetOutFile : public ParquetOutFile { RParquetOutFile::RParquetOutFile( std::string filename, - parquet::CompressionCodec::type codec) : - ParquetOutFile(filename, codec) { + parquet::CompressionCodec::type codec, + std::vector &row_groups) : + ParquetOutFile(filename, codec, row_groups) { } RParquetOutFile::RParquetOutFile( std::ostream &stream, - parquet::CompressionCodec::type codec -) : ParquetOutFile(stream, codec) { + parquet::CompressionCodec::type codec, + std::vector &row_groups) : + ParquetOutFile(stream, codec, row_groups) { } -void RParquetOutFile::create_dictionary(uint32_t idx) { - // olny do it once - if (!Rf_isNull(VECTOR_ELT(dicts, idx))) { +void RParquetOutFile::create_dictionary(uint32_t idx, int64_t from, + int64_t until) { + if (!Rf_isNull(VECTOR_ELT(dicts, idx)) && + INTEGER(dicts_from)[idx] == from) { return; } SEXP col = VECTOR_ELT(df, idx); - SEXP d = PROTECT(nanoparquet_create_dict_idx_(col)); + SEXP sfrom = PROTECT(Rf_ScalarInteger(from)); + SEXP suntil = PROTECT(Rf_ScalarInteger(until)); + SEXP d = PROTECT(nanoparquet_create_dict_idx_(col, sfrom, suntil)); SET_VECTOR_ELT(dicts, idx, d); - UNPROTECT(1); + INTEGER(dicts_from)[idx] = from; + UNPROTECT(3); } static const char *enc_[] = { @@ -1533,19 +1546,23 @@ void RParquetOutFile::write_present_boolean( } uint32_t RParquetOutFile::get_num_values_dictionary( - uint32_t idx) { + uint32_t idx, + int64_t from, + int64_t until) { SEXP col = VECTOR_ELT(df, idx); if (Rf_inherits(col, "factor")) { return Rf_nlevels(col); } else { - create_dictionary(idx); + create_dictionary(idx, from, until); return Rf_length(VECTOR_ELT(VECTOR_ELT(dicts, idx), 0)); } } uint32_t RParquetOutFile::get_size_dictionary( uint32_t idx, - parquet::SchemaElement &sel) { + parquet::SchemaElement &sel, + int64_t from, + int64_t until) { SEXP col = VECTOR_ELT(df, idx); parquet::Type::type type = sel.type; @@ -1563,7 +1580,7 @@ uint32_t RParquetOutFile::get_size_dictionary( UNPROTECT(1); return size; } else { - create_dictionary(idx); + create_dictionary(idx, from, until); SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); if (type == parquet::Type::INT32) { return Rf_xlength(dictidx) * sizeof(int); @@ -1582,7 +1599,7 @@ uint32_t RParquetOutFile::get_size_dictionary( break; } case REALSXP: { - create_dictionary(idx); + create_dictionary(idx, from, until); SEXP dict = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); if (type == parquet::Type::DOUBLE) { return Rf_xlength(dict) * sizeof(double); @@ -1607,7 +1624,7 @@ uint32_t RParquetOutFile::get_size_dictionary( } case STRSXP: { // need to count the length of the stings that are indexed in dict - create_dictionary(idx); + create_dictionary(idx, from, until); SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); R_xlen_t len = Rf_xlength(dictidx); bool is_uuid = sel.__isset.logicalType && sel.logicalType.__isset.UUID; @@ -1628,7 +1645,7 @@ uint32_t RParquetOutFile::get_size_dictionary( } case LGLSXP: { // this does not happen, no dictionaries for BOOLEAN, makes no sense - create_dictionary(idx); // # nocov + create_dictionary(idx, from, until); // # nocov SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); // # nocov R_xlen_t l = Rf_xlength(dictidx); // # nocov return l / 8 + (l % 8 > 0); // # nocov @@ -1642,7 +1659,9 @@ uint32_t RParquetOutFile::get_size_dictionary( void RParquetOutFile::write_dictionary( std::ostream &file, uint32_t idx, - parquet::SchemaElement &sel) { + parquet::SchemaElement &sel, + int64_t from, + int64_t until) { parquet::Type::type type = sel.type; @@ -1671,7 +1690,7 @@ void RParquetOutFile::write_dictionary( } else { SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); R_xlen_t len = Rf_xlength(dictidx); - int *icol = INTEGER(col); + int *icol = INTEGER(col) + from; int *iidx = INTEGER(dictidx); switch (type) { case parquet::Type::INT32: { @@ -1773,7 +1792,7 @@ void RParquetOutFile::write_dictionary( case REALSXP: { SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); R_xlen_t len = Rf_xlength(dictidx); - double *icol = REAL(col); + double *icol = REAL(col) + from; int *iidx = INTEGER(dictidx); if (Rf_inherits(col, "POSIXct")) { if (type != parquet::Type::INT64) { @@ -2036,7 +2055,7 @@ void RParquetOutFile::write_dictionary( R_xlen_t len = Rf_xlength(dictidx); int *iidx = INTEGER(dictidx); for (uint64_t i = 0; i < len; i++) { - const char *c = CHAR(STRING_ELT(col, iidx[i])); + const char *c = CHAR(STRING_ELT(col, from + iidx[i])); uint32_t len1 = strlen(c); file.write((const char *)&len1, 4); file.write(c, len1); @@ -2051,7 +2070,7 @@ void RParquetOutFile::write_dictionary( R_xlen_t len = Rf_xlength(dictidx); int *iidx = INTEGER(dictidx); for (uint64_t i = 0; i < len; i++) { - const char *c = CHAR(STRING_ELT(col, iidx[i])); + const char *c = CHAR(STRING_ELT(col, from + iidx[i])); if (!parse_uuid(c, u, tmp)) { Rf_errorcall( nanoparquet_call, @@ -2065,7 +2084,7 @@ void RParquetOutFile::write_dictionary( R_xlen_t len = Rf_xlength(dictidx); int *iidx = INTEGER(dictidx); for (uint64_t i = 0; i < len; i++) { - const char *c = CHAR(STRING_ELT(col, iidx[i])); + const char *c = CHAR(STRING_ELT(col, from + iidx[i])); uint32_t len1 = strlen(c); if (len1 != sel.type_length) { Rf_errorcall( @@ -2100,7 +2119,7 @@ void RParquetOutFile::write_dictionary( SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); R_xlen_t len = Rf_xlength(dictidx); SEXP dict = PROTECT(Rf_allocVector(LGLSXP, len)); - int *icol = LOGICAL(col); + int *icol = LOGICAL(col) + from; int *iidx = INTEGER(dictidx); int *idict = LOGICAL(dict); for (auto i = 0; i < len; i++) { @@ -2118,18 +2137,18 @@ void RParquetOutFile::write_dictionary( void RParquetOutFile::write_dictionary_indices( std::ostream &file, uint32_t idx, - uint64_t from, - uint64_t until) { + int64_t rg_from, + int64_t rg_until, + uint64_t page_from, + uint64_t page_until) { + + // Both all of rg_* and page_* are in absolute coordinates SEXP col = VECTOR_ELT(df, idx); - if (until > Rf_xlength(col)) { - Rf_errorcall( // # nocov - nanoparquet_call, // # nocov - "Internal nanoparquet error, row index too large" - ); - } if (TYPEOF(col) == INTSXP && Rf_inherits(col, "factor")) { - for (uint64_t i = from; i < until; i++) { + // there is a single dict for a factor, which is used in all row + // groups, so we use absolute coordinates here + for (uint64_t i = page_from; i < page_until; i++) { int el = INTEGER(col)[i]; if (el != NA_INTEGER) { el--; @@ -2138,7 +2157,10 @@ void RParquetOutFile::write_dictionary_indices( } } else { SEXP dictmap = VECTOR_ELT(VECTOR_ELT(dicts, idx), 1); - for (uint64_t i = from; i < until; i++) { + // there is a separate dict for each row group, so we need to convert + // the absolute page_* coordinates to relative coordinates, starting + // at rg_from + for (uint64_t i = page_from - rg_from; i < page_until - rg_from; i++) { int el = INTEGER(dictmap)[i]; if (el != NA_INTEGER) { file.write((const char *) &el, sizeof(int)); @@ -2379,6 +2401,7 @@ void RParquetOutFile::write( df = dfsxp; required = rrequired; dicts = PROTECT(Rf_allocVector(VECSXP, Rf_length(df))); + dicts_from = PROTECT(Rf_allocVector(INTSXP, Rf_length(df))); SEXP nms = PROTECT(Rf_getAttrib(dfsxp, R_NamesSymbol)); int *type = INTEGER(VECTOR_ELT(schema, 3)); int *type_length = INTEGER(VECTOR_ELT(schema, 4)); @@ -2445,7 +2468,7 @@ void RParquetOutFile::write( ParquetOutFile::write(); - UNPROTECT(2); + UNPROTECT(3); } extern "C" { @@ -2466,7 +2489,8 @@ static SEXP get_list_element(SEXP list, const char *str) { SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression, SEXP metadata, SEXP required, SEXP options, - SEXP schema, SEXP encoding) { + SEXP schema, SEXP encoding, + SEXP row_group_starts) { if (TYPEOF(filesxp) != STRSXP || LENGTH(filesxp) != 1) { Rf_errorcall(nanoparquet_call, @@ -2496,11 +2520,18 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression, int dp_ver = INTEGER(get_list_element(options, "write_data_page_version"))[0]; + R_xlen_t nrg = Rf_xlength(row_group_starts); + std::vector row_groups(nrg); + for (R_xlen_t i = 0; i < nrg; i++) { + // convert to zero-based + row_groups[i] = INTEGER(row_group_starts)[i] - 1; + } + std::string fname = (char *)CHAR(STRING_ELT(filesxp, 0)); if (fname == ":raw:") { MemStream ms; std::ostream &os = ms.stream(); - RParquetOutFile of(os, codec); + RParquetOutFile of(os, codec, row_groups); of.data_page_version = dp_ver; of.write(dfsxp, dim, metadata, required, options, schema, encoding); R_xlen_t bufsize = ms.size(); @@ -2508,7 +2539,7 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression, ms.copy(RAW(res), bufsize); return res; } else { - RParquetOutFile of(fname, codec); + RParquetOutFile of(fname, codec, row_groups); of.data_page_version = dp_ver; of.write(dfsxp, dim, metadata, required, options, schema, encoding); return R_NilValue; @@ -2525,6 +2556,7 @@ struct nanoparquet_write_data { SEXP options; SEXP schema; SEXP encoding; + SEXP row_group_starts; }; SEXP nanoparquet_write_wrapped(void *data) { @@ -2539,9 +2571,11 @@ SEXP nanoparquet_write_wrapped(void *data) { SEXP options = rdata->options; SEXP schema = rdata->schema; SEXP encoding = rdata->encoding; + SEXP row_group_starts = rdata->row_group_starts; return nanoparquet_write_(dfsxp, filesxp, dim, compression, metadata, - required, options, schema, encoding); + required, options, schema, encoding, + row_group_starts); } SEXP nanoparquet_write( @@ -2554,11 +2588,12 @@ SEXP nanoparquet_write( SEXP options, SEXP schema, SEXP encoding, + SEXP row_group_starts, SEXP call) { struct nanoparquet_write_data data = { dfsxp, filesxp, dim, compression, metadata, required, options, schema, - encoding + encoding, row_group_starts }; SEXP uwt = PROTECT(R_MakeUnwindCont()); diff --git a/tests/testthat/_snaps/utils.md b/tests/testthat/_snaps/utils.md new file mode 100644 index 0000000..81f617a --- /dev/null +++ b/tests/testthat/_snaps/utils.md @@ -0,0 +1,28 @@ +# as_count + + Code + as_count(1:2) + Condition + Error in `as_count()`: + ! x must be a count, i.e. an integer scalar + Code + as_count(0) + Condition + Error in `as_count()`: + ! x must be a count, i.e. an integer scalar + Code + as_count(NA_real_) + Condition + Error in `as_count()`: + ! x must be a count, i.e. an integer scalar + Code + as_count(-100) + Condition + Error in `as_count()`: + ! x must be a count, i.e. an integer scalar + Code + as_count(-100L) + Condition + Error in `as_count()`: + ! x must be a count, i.e. an integer scalar + diff --git a/tests/testthat/_snaps/write-parquet-4.md b/tests/testthat/_snaps/write-parquet-4.md index 5e3ecb5..8c63e9e 100644 --- a/tests/testthat/_snaps/write-parquet-4.md +++ b/tests/testthat/_snaps/write-parquet-4.md @@ -3,7 +3,7 @@ Code .Call(nanoparquet_write, mtcars, tempfile(), dim(mtcars), 0L, list(character(), character()), rep(FALSE, ncol(mtcars)), options, map_schema_to_df(NULL, mtcars), - rep(10L, ncol(mtcars)), sys.call()) + rep(10L, ncol(mtcars)), 1L, sys.call()) Condition Error: ! Unknown Praquet encoding code: 10 diff --git a/tests/testthat/_snaps/write-parquet-row-groups.md b/tests/testthat/_snaps/write-parquet-row-groups.md new file mode 100644 index 0000000..696e283 --- /dev/null +++ b/tests/testthat/_snaps/write-parquet-row-groups.md @@ -0,0 +1,88 @@ +# errors + + Code + parquet_options(num_rows_per_row_group = "foobar") + Condition + Error in `as_count()`: + ! num_rows_per_row_group must be a count, i.e. an integer scalar + +--- + + Code + write_parquet(df, tmp, row_groups = "foobar") + Condition + Error in `parse_row_groups()`: + ! Row groups must be specified as a growing positive integer vector, starting with 1. + Code + write_parquet(df, tmp, row_groups = c(100L, 1L)) + Condition + Error in `parse_row_groups()`: + ! Row groups must be specified as a growing positive integer vector, starting with 1. + Code + write_parquet(df, tmp, row_groups = c(1L, 100L)) + Condition + Error in `write_parquet()`: + ! Internal nanoparquet error, row index too large + +# grouped df + + Code + write_parquet(df, tmp) + Message + Ordering data frame according to row groups. + +--- + + Code + as.data.frame(read_parquet(tmp)[, c("nam", "cyl")]) + Output + nam cyl + 1 Datsun 710 4 + 2 Merc 240D 4 + 3 Merc 230 4 + 4 Fiat 128 4 + 5 Honda Civic 4 + 6 Toyota Corolla 4 + 7 Toyota Corona 4 + 8 Fiat X1-9 4 + 9 Porsche 914-2 4 + 10 Lotus Europa 4 + 11 Volvo 142E 4 + 12 Mazda RX4 6 + 13 Mazda RX4 Wag 6 + 14 Hornet 4 Drive 6 + 15 Valiant 6 + 16 Merc 280 6 + 17 Merc 280C 6 + 18 Ferrari Dino 6 + 19 Hornet Sportabout 8 + 20 Duster 360 8 + 21 Merc 450SE 8 + 22 Merc 450SL 8 + 23 Merc 450SLC 8 + 24 Cadillac Fleetwood 8 + 25 Lincoln Continental 8 + 26 Chrysler Imperial 8 + 27 Dodge Challenger 8 + 28 AMC Javelin 8 + 29 Camaro Z28 8 + 30 Pontiac Firebird 8 + 31 Ford Pantera L 8 + 32 Maserati Bora 8 + +# non-factors write local dictionary + + Code + for (do in dict_ofs) { + print(read_parquet_page(tmp, do)[["data"]]) + } + Output + [1] 01 00 00 00 61 + [1] 01 00 00 00 61 + [1] 01 00 00 00 61 01 00 00 00 62 + [1] 01 00 00 00 62 + [1] 01 00 00 00 62 + [1] 01 00 00 00 63 + [1] 01 00 00 00 63 + [1] 01 00 00 00 63 + diff --git a/tests/testthat/test-utils.R b/tests/testthat/test-utils.R index f9067c6..b15ab03 100644 --- a/tests/testthat/test-utils.R +++ b/tests/testthat/test-utils.R @@ -51,3 +51,39 @@ test_that("is_uint32", { expect_false(is_uint32(NA_real_)) expect_false(is_uint32("foo")) }) + +test_that("is_icount", { + expect_true(is_icount(1L)) + expect_true(is_icount(100L)) + expect_true(is_icount(2147483647L)) + + expect_false(is_icount(NA_integer_)) + expect_false(is_icount(1:2)) + expect_false(is_icount(1)) + expect_false(is_icount(0L)) + expect_false(is_icount(-100L)) +}) + +test_that("is_dcount", { + expect_true(is_dcount(1)) + expect_true(is_dcount(100)) + expect_true(is_dcount(2147483647)) + + expect_false(is_dcount(NA_real_)) + expect_false(is_dcount(1:2)) + expect_false(is_dcount(1L)) + expect_false(is_dcount(0)) + expect_false(is_dcount(-100)) +}) + +test_that("as_count", { + expect_equal(as_count(1), 1L) + expect_equal(as_count(100), 100L) + expect_snapshot(error = TRUE, { + as_count(1:2) + as_count(0) + as_count(NA_real_) + as_count(-100) + as_count(-100L) + }) +}) diff --git a/tests/testthat/test-write-parquet-4.R b/tests/testthat/test-write-parquet-4.R index 140c87e..eb68e2a 100644 --- a/tests/testthat/test-write-parquet-4.R +++ b/tests/testthat/test-write-parquet-4.R @@ -4,7 +4,7 @@ test_that("errors", { .Call( nanoparquet_write, mtcars, tempfile(), dim(mtcars), 0L, list(character(), character()), rep(FALSE, ncol(mtcars)), - options, map_schema_to_df(NULL, mtcars), rep(10L, ncol(mtcars)), + options, map_schema_to_df(NULL, mtcars), rep(10L, ncol(mtcars)), 1L, sys.call() ) }) diff --git a/tests/testthat/test-write-parquet-row-groups.R b/tests/testthat/test-write-parquet-row-groups.R new file mode 100644 index 0000000..4c8d302 --- /dev/null +++ b/tests/testthat/test-write-parquet-row-groups.R @@ -0,0 +1,98 @@ +test_that("errors", { + expect_snapshot(error = TRUE, { + parquet_options(num_rows_per_row_group = "foobar") + }) + + df <- test_df() + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + expect_snapshot(error = TRUE, { + write_parquet(df, tmp, row_groups = "foobar") + write_parquet(df, tmp, row_groups = c(100L, 1L)) + write_parquet(df, tmp, row_groups = c(1L, 100L)) + }) +}) + +test_that("row groups", { + tmp1 <- tempfile(fileext = ".parquet") + tmp2 <- tempfile(fileext = ".parquet") + on.exit(unlink(c(tmp1, tmp2)), add = TRUE) + + df <- test_df() + write_parquet(df, tmp1, row_groups = 1L) + write_parquet(df, tmp2, row_groups = c(1L, 16L)) + expect_equal(read_parquet(tmp1), read_parquet(tmp2)) + expect_equal(nrow(read_parquet_metadata(tmp2)[["row_groups"]]), 2L) + + unlink(tmp2) + write_parquet(df, tmp2, row_groups = seq_len(nrow(df))) + expect_equal(read_parquet(tmp1), read_parquet(tmp2)) + expect_equal(nrow(read_parquet_metadata(tmp2)[["row_groups"]]), nrow(df)) + + unlink(tmp2) + withr::local_options(nanoparquet.num_rows_per_row_group = 10L) + write_parquet(df, tmp2) + expect_equal(read_parquet(tmp1), read_parquet(tmp2)) + expect_equal(nrow(read_parquet_metadata(tmp2)[["row_groups"]]), 4L) +}) + +test_that("grouped df", { + df <- test_df() + attr(df, "groups") <- data.frame( + cyl = c(4L, 6L, 8L), + .rows = I(list( + c(3L, 8L, 9L, 18L, 19L, 20L, 21L, 26L, 27L, 28L, 32L), + c(1L, 2L, 4L, 6L, 10L, 11L, 30L), + c(5L, 7L, 12L, 13L, 14L, 15L, 16L, 17L, 22L, 23L, 24L, 25L, 29L, 31L) + )) + ) + + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + expect_snapshot(write_parquet(df, tmp)) + expect_equal(nrow(read_parquet_metadata(tmp)[["row_groups"]]), 3L) + expect_snapshot(as.data.frame(read_parquet(tmp)[, c("nam", "cyl")])) +}) + +test_that("factors & factor levels", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + + df <- data.frame( + f = factor(c(rep("a", 100), rep("b", 100), rep("c", 100))) + ) + withr::local_options(nanoparquet.num_rows_per_row_group = 50L) + write_parquet(df, tmp) + expect_equal(as.data.frame(read_parquet(tmp)), df) + # the same dict is written into every dicitonary page + pgs <- read_parquet_pages(tmp) + dict_ofs <- pgs[["page_header_offset"]][ + pgs[["page_type"]] == "DICTIONARY_PAGE" + ] + dict_data <- read_parquet_page(tmp, dict_ofs[1])[["data"]] + for (do in dict_ofs) { + expect_equal(dict_data, read_parquet_page(tmp, do)[["data"]]) + } +}) + +test_that("non-factors write local dictionary", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + + df <- data.frame( + stringsAsFactors = FALSE, + f = c(rep("a", 100), rep("b", 100), rep("c", 100)) + ) + withr::local_options(nanoparquet.num_rows_per_row_group = 40L) + write_parquet(df, tmp) + expect_equal(as.data.frame(read_parquet(tmp)), df) + pgs <- read_parquet_pages(tmp) + dict_ofs <- pgs[["page_header_offset"]][ + pgs[["page_type"]] == "DICTIONARY_PAGE" + ] + expect_snapshot({ + for (do in dict_ofs) { + print(read_parquet_page(tmp, do)[["data"]]) + } + }) +})