From 4256242fa0b6d0beb7d1546d6a567ff6ddd8e749 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sat, 14 Sep 2024 09:55:08 +0200 Subject: [PATCH] Fix writing dict pages into multiple row groups Write local dicitonaries, expect for factors. --- R/porcelain.R | 4 +- src/dictionary-encoding.cpp | 36 +++++++--- src/lib/ParquetOutFile.cpp | 21 ++++-- src/lib/ParquetOutFile.h | 9 ++- src/rwrapper.cpp | 4 +- src/write.cpp | 70 +++++++++++-------- .../_snaps/write-parquet-row-groups.md | 16 +++++ .../testthat/test-write-parquet-row-groups.R | 31 ++++++++ 8 files changed, 137 insertions(+), 54 deletions(-) diff --git a/R/porcelain.R b/R/porcelain.R index 0dcecf5..d951a93 100644 --- a/R/porcelain.R +++ b/R/porcelain.R @@ -241,8 +241,8 @@ dict_encode <- function(x, n = length(x)) { .Call(nanoparquet_create_dict, x, n) } -dict_encode_idx <- function(x) { - .Call(nanoparquet_create_dict_idx, x, sys.call()) +dict_encode_idx <- function(x, from = 1L, until = length(x) + 1L ) { + .Call(nanoparquet_create_dict_idx, x, from - 1L, until - 1L, sys.call()) } lgl_avg_run_length <- function(x, n = length(x)) { diff --git a/src/dictionary-encoding.cpp b/src/dictionary-encoding.cpp index fc355be..e2edb03 100644 --- a/src/dictionary-encoding.cpp +++ b/src/dictionary-encoding.cpp @@ -175,8 +175,11 @@ SEXP nanoparquet_create_dict(SEXP x, SEXP rlen) { return Rf_ScalarInteger(dictlen); } -SEXP nanoparquet_create_dict_idx_(SEXP x) { - R_xlen_t dictlen, len = Rf_xlength(x); +SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until) { + int64_t cfrom = INTEGER(from)[0]; + int64_t cuntil = INTEGER(until)[0]; + int64_t len = cuntil - cfrom; + R_xlen_t dictlen; SEXP idx = PROTECT(Rf_allocVector(INTSXP, len)); SEXP dict = PROTECT(Rf_allocVector(INTSXP, len)); @@ -184,16 +187,16 @@ SEXP nanoparquet_create_dict_idx_(SEXP x) { int *iidx = INTEGER(idx); switch (TYPEOF(x)) { case LGLSXP: - dictlen = create_dict_idx(LOGICAL(x), iidx, idict, len, NA_LOGICAL); + dictlen = create_dict_idx(LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL); break; case INTSXP: - dictlen = create_dict_idx(INTEGER(x), idict, iidx, len, NA_INTEGER); + dictlen = create_dict_idx(INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER); break; case REALSXP: - dictlen = create_dict_real_idx(REAL(x), idict, iidx, len); + dictlen = create_dict_real_idx(REAL(x) + cfrom, idict, iidx, len); break; case STRSXP: { - dictlen = create_dict_ptr_idx((void**)STRING_PTR_RO(x),idict, iidx, len, (void*) NA_STRING); + dictlen = create_dict_ptr_idx((void**)(STRING_PTR_RO(x) + cfrom), idict, iidx, len, (void*) NA_STRING); break; } default: @@ -213,19 +216,32 @@ SEXP nanoparquet_create_dict_idx_(SEXP x) { return res; } +struct nanoparquet_create_dict_idx_data { + SEXP data; + SEXP from; + SEXP until; +}; + inline SEXP nanoparquet_create_dict_idx_wrapper(void *data) { - SEXP x = (SEXP) data; - return nanoparquet_create_dict_idx_(x); + struct nanoparquet_create_dict_idx_data *rdata = + (struct nanoparquet_create_dict_idx_data*) data; + return nanoparquet_create_dict_idx_( + rdata->data, + rdata->from, + rdata->until + ); } -SEXP nanoparquet_create_dict_idx(SEXP x, SEXP call) { +SEXP nanoparquet_create_dict_idx(SEXP x, SEXP from, SEXP until, SEXP call) { + + struct nanoparquet_create_dict_idx_data data = { x, from, until }; SEXP uwt = PROTECT(R_MakeUnwindCont()); R_API_START(call); SEXP ret = R_UnwindProtect( nanoparquet_create_dict_idx_wrapper, - (void*) x, + &data, throw_error, &uwt, uwt diff --git a/src/lib/ParquetOutFile.cpp b/src/lib/ParquetOutFile.cpp index 13499e0..be9006e 100644 --- a/src/lib/ParquetOutFile.cpp +++ b/src/lib/ParquetOutFile.cpp @@ -372,11 +372,14 @@ void ParquetOutFile::write_dictionary_indices_( std::ostream &file, uint32_t idx, uint32_t size, - uint64_t from, - uint64_t until) { + int64_t rg_from, + int64_t rg_until, + uint64_t page_from, + uint64_t page_until) { streampos start = file.tellp(); - write_dictionary_indices(file, idx, from, until); + write_dictionary_indices(file, idx, rg_from, rg_until, + page_from, page_until); streampos end = file.tellp(); if (end - start != size) { throw runtime_error( @@ -728,7 +731,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_dictionary_indices_(*os0, idx, data_size, page_from, page_until); + write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until, + page_from, page_until); // 2. RLE encode buf_unc to buf_com uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); @@ -763,7 +767,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_dictionary_indices_(*os0, idx, data_size, page_from, page_until); + write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until, + page_from, page_until); // 2. RLE encode buf_unc to buf_com uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); @@ -906,7 +911,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_dictionary_indices_(*os0, idx, data_size, page_from, page_until); + write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until, + page_from, page_until); // 4. append RLE buf_unc to buf_com uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); @@ -961,7 +967,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from, buf_unc.reset(data_size); std::unique_ptr os0 = std::unique_ptr(new std::ostream(&buf_unc)); - write_dictionary_indices_(*os0, idx, data_size, page_from, page_until); + write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until, + page_from, page_until); // 4. append RLE buf_unc to buf_com uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until); diff --git a/src/lib/ParquetOutFile.h b/src/lib/ParquetOutFile.h index 873aca6..494bc6f 100644 --- a/src/lib/ParquetOutFile.h +++ b/src/lib/ParquetOutFile.h @@ -81,7 +81,9 @@ class ParquetOutFile { int64_t until) = 0; // Needs to write indices as int32_t virtual void write_dictionary_indices(std::ostream &file, uint32_t idx, - uint64_t from, uint64_t until) = 0; + int64_t rg_from, int64_t rg_until, + uint64_t page_from, + uint64_t page_until) = 0; int data_page_version = 1; @@ -126,8 +128,9 @@ class ParquetOutFile { parquet::SchemaElement &sel, int64_t from, int64_t until); void write_dictionary_indices_(std::ostream &file, uint32_t idx, - uint32_t size, uint64_t from, - uint64_t until); + uint32_t size, int64_t rg_from, + int64_t rg_ti, uint64_t page_from, + uint64_t page_until); size_t compress(parquet::CompressionCodec::type codec, ByteBuffer &src, uint32_t src_size, ByteBuffer &tgt, diff --git a/src/rwrapper.cpp b/src/rwrapper.cpp index 54b5dd9..4e4943b 100644 --- a/src/rwrapper.cpp +++ b/src/rwrapper.cpp @@ -41,7 +41,7 @@ SEXP nanoparquet_unpack_bits_int32(SEXP x, SEXP bit_width, SEXP n); SEXP nanoparquet_pack_bits_int32(SEXP x, SEXP bit_width); SEXP nanoparquet_create_dict(SEXP x, SEXP l); -SEXP nanoparquet_create_dict_idx(SEXP x, SEXP call); +SEXP nanoparquet_create_dict_idx(SEXP x, SEXP from, SEXP until, SEXP call); SEXP nanoparquet_avg_run_length(SEXP x, SEXP len); SEXP nanoparquet_base64_decode(SEXP x); @@ -110,7 +110,7 @@ static const R_CallMethodDef R_CallDef[] = { CALLDEF(nanoparquet_unpack_bits_int32, 3), CALLDEF(nanoparquet_pack_bits_int32, 2), CALLDEF(nanoparquet_create_dict, 2), - CALLDEF(nanoparquet_create_dict_idx, 2), + CALLDEF(nanoparquet_create_dict_idx, 4), CALLDEF(nanoparquet_avg_run_length, 2), CALLDEF(nanoparquet_base64_decode, 1), CALLDEF(nanoparquet_base64_encode, 1), diff --git a/src/write.cpp b/src/write.cpp index 8af4c9f..ce3616f 100644 --- a/src/write.cpp +++ b/src/write.cpp @@ -87,7 +87,7 @@ static uint16_t double_to_float16(double x) { extern "C" { SEXP nanoparquet_create_dict(SEXP x, SEXP rlen); -SEXP nanoparquet_create_dict_idx_(SEXP x); +SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until); SEXP nanoparquet_avg_run_length(SEXP x, SEXP rlen); } @@ -143,7 +143,8 @@ class RParquetOutFile : public ParquetOutFile { parquet::SchemaElement &sel, int64_t from, int64_t until); void write_dictionary_indices(std::ostream &file, uint32_t idx, - uint64_t from, uint64_t until); + int64_t rg_from, int64_t rg_until, + uint64_t page_from, uint64_t page_until); void write( SEXP dfsxp, @@ -159,9 +160,10 @@ class RParquetOutFile : public ParquetOutFile { SEXP df = R_NilValue; SEXP required = R_NilValue; SEXP dicts = R_NilValue; + SEXP dicts_from = R_NilValue; ByteBuffer present; - void create_dictionary(uint32_t idx); + void create_dictionary(uint32_t idx, int64_t from, int64_t until); // for LGLSXP this mean RLE encoding bool should_use_dict_encoding(uint32_t idx); parquet::Encoding::type @@ -182,16 +184,20 @@ RParquetOutFile::RParquetOutFile( ParquetOutFile(stream, codec, row_groups) { } -void RParquetOutFile::create_dictionary(uint32_t idx) { - // olny do it once - if (!Rf_isNull(VECTOR_ELT(dicts, idx))) { +void RParquetOutFile::create_dictionary(uint32_t idx, int64_t from, + int64_t until) { + if (!Rf_isNull(VECTOR_ELT(dicts, idx)) && + INTEGER(dicts_from)[idx] == from) { return; } SEXP col = VECTOR_ELT(df, idx); - SEXP d = PROTECT(nanoparquet_create_dict_idx_(col)); + SEXP sfrom = PROTECT(Rf_ScalarInteger(from)); + SEXP suntil = PROTECT(Rf_ScalarInteger(until)); + SEXP d = PROTECT(nanoparquet_create_dict_idx_(col, sfrom, suntil)); SET_VECTOR_ELT(dicts, idx, d); - UNPROTECT(1); + INTEGER(dicts_from)[idx] = from; + UNPROTECT(3); } static const char *enc_[] = { @@ -1547,7 +1553,7 @@ uint32_t RParquetOutFile::get_num_values_dictionary( if (Rf_inherits(col, "factor")) { return Rf_nlevels(col); } else { - create_dictionary(idx); + create_dictionary(idx, from, until); return Rf_length(VECTOR_ELT(VECTOR_ELT(dicts, idx), 0)); } } @@ -1574,7 +1580,7 @@ uint32_t RParquetOutFile::get_size_dictionary( UNPROTECT(1); return size; } else { - create_dictionary(idx); + create_dictionary(idx, from, until); SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); if (type == parquet::Type::INT32) { return Rf_xlength(dictidx) * sizeof(int); @@ -1593,7 +1599,7 @@ uint32_t RParquetOutFile::get_size_dictionary( break; } case REALSXP: { - create_dictionary(idx); + create_dictionary(idx, from, until); SEXP dict = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); if (type == parquet::Type::DOUBLE) { return Rf_xlength(dict) * sizeof(double); @@ -1618,7 +1624,7 @@ uint32_t RParquetOutFile::get_size_dictionary( } case STRSXP: { // need to count the length of the stings that are indexed in dict - create_dictionary(idx); + create_dictionary(idx, from, until); SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); R_xlen_t len = Rf_xlength(dictidx); bool is_uuid = sel.__isset.logicalType && sel.logicalType.__isset.UUID; @@ -1639,7 +1645,7 @@ uint32_t RParquetOutFile::get_size_dictionary( } case LGLSXP: { // this does not happen, no dictionaries for BOOLEAN, makes no sense - create_dictionary(idx); // # nocov + create_dictionary(idx, from, until); // # nocov SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); // # nocov R_xlen_t l = Rf_xlength(dictidx); // # nocov return l / 8 + (l % 8 > 0); // # nocov @@ -1684,7 +1690,7 @@ void RParquetOutFile::write_dictionary( } else { SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); R_xlen_t len = Rf_xlength(dictidx); - int *icol = INTEGER(col); + int *icol = INTEGER(col) + from; int *iidx = INTEGER(dictidx); switch (type) { case parquet::Type::INT32: { @@ -1786,7 +1792,7 @@ void RParquetOutFile::write_dictionary( case REALSXP: { SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); R_xlen_t len = Rf_xlength(dictidx); - double *icol = REAL(col); + double *icol = REAL(col) + from; int *iidx = INTEGER(dictidx); if (Rf_inherits(col, "POSIXct")) { if (type != parquet::Type::INT64) { @@ -2049,7 +2055,7 @@ void RParquetOutFile::write_dictionary( R_xlen_t len = Rf_xlength(dictidx); int *iidx = INTEGER(dictidx); for (uint64_t i = 0; i < len; i++) { - const char *c = CHAR(STRING_ELT(col, iidx[i])); + const char *c = CHAR(STRING_ELT(col, from + iidx[i])); uint32_t len1 = strlen(c); file.write((const char *)&len1, 4); file.write(c, len1); @@ -2064,7 +2070,7 @@ void RParquetOutFile::write_dictionary( R_xlen_t len = Rf_xlength(dictidx); int *iidx = INTEGER(dictidx); for (uint64_t i = 0; i < len; i++) { - const char *c = CHAR(STRING_ELT(col, iidx[i])); + const char *c = CHAR(STRING_ELT(col, from + iidx[i])); if (!parse_uuid(c, u, tmp)) { Rf_errorcall( nanoparquet_call, @@ -2078,7 +2084,7 @@ void RParquetOutFile::write_dictionary( R_xlen_t len = Rf_xlength(dictidx); int *iidx = INTEGER(dictidx); for (uint64_t i = 0; i < len; i++) { - const char *c = CHAR(STRING_ELT(col, iidx[i])); + const char *c = CHAR(STRING_ELT(col, from + iidx[i])); uint32_t len1 = strlen(c); if (len1 != sel.type_length) { Rf_errorcall( @@ -2113,7 +2119,7 @@ void RParquetOutFile::write_dictionary( SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); R_xlen_t len = Rf_xlength(dictidx); SEXP dict = PROTECT(Rf_allocVector(LGLSXP, len)); - int *icol = LOGICAL(col); + int *icol = LOGICAL(col) + from; int *iidx = INTEGER(dictidx); int *idict = LOGICAL(dict); for (auto i = 0; i < len; i++) { @@ -2131,18 +2137,18 @@ void RParquetOutFile::write_dictionary( void RParquetOutFile::write_dictionary_indices( std::ostream &file, uint32_t idx, - uint64_t from, - uint64_t until) { + int64_t rg_from, + int64_t rg_until, + uint64_t page_from, + uint64_t page_until) { + + // Both all of rg_* and page_* are in absolute coordinates SEXP col = VECTOR_ELT(df, idx); - if (until > Rf_xlength(col)) { - Rf_errorcall( // # nocov - nanoparquet_call, // # nocov - "Internal nanoparquet error, row index too large" - ); - } if (TYPEOF(col) == INTSXP && Rf_inherits(col, "factor")) { - for (uint64_t i = from; i < until; i++) { + // there is a single dict for a factor, which is used in all row + // groups, so we use absolute coordinates here + for (uint64_t i = page_from; i < page_until; i++) { int el = INTEGER(col)[i]; if (el != NA_INTEGER) { el--; @@ -2151,7 +2157,10 @@ void RParquetOutFile::write_dictionary_indices( } } else { SEXP dictmap = VECTOR_ELT(VECTOR_ELT(dicts, idx), 1); - for (uint64_t i = from; i < until; i++) { + // there is a separate dict for each row group, so we need to convert + // the absolute page_* coordinates to relative coordinates, starting + // at rg_from + for (uint64_t i = page_from - rg_from; i < page_until - rg_from; i++) { int el = INTEGER(dictmap)[i]; if (el != NA_INTEGER) { file.write((const char *) &el, sizeof(int)); @@ -2392,6 +2401,7 @@ void RParquetOutFile::write( df = dfsxp; required = rrequired; dicts = PROTECT(Rf_allocVector(VECSXP, Rf_length(df))); + dicts_from = PROTECT(Rf_allocVector(INTSXP, Rf_length(df))); SEXP nms = PROTECT(Rf_getAttrib(dfsxp, R_NamesSymbol)); int *type = INTEGER(VECTOR_ELT(schema, 3)); int *type_length = INTEGER(VECTOR_ELT(schema, 4)); @@ -2458,7 +2468,7 @@ void RParquetOutFile::write( ParquetOutFile::write(); - UNPROTECT(2); + UNPROTECT(3); } extern "C" { diff --git a/tests/testthat/_snaps/write-parquet-row-groups.md b/tests/testthat/_snaps/write-parquet-row-groups.md index ef8989d..696e283 100644 --- a/tests/testthat/_snaps/write-parquet-row-groups.md +++ b/tests/testthat/_snaps/write-parquet-row-groups.md @@ -70,3 +70,19 @@ 31 Ford Pantera L 8 32 Maserati Bora 8 +# non-factors write local dictionary + + Code + for (do in dict_ofs) { + print(read_parquet_page(tmp, do)[["data"]]) + } + Output + [1] 01 00 00 00 61 + [1] 01 00 00 00 61 + [1] 01 00 00 00 61 01 00 00 00 62 + [1] 01 00 00 00 62 + [1] 01 00 00 00 62 + [1] 01 00 00 00 63 + [1] 01 00 00 00 63 + [1] 01 00 00 00 63 + diff --git a/tests/testthat/test-write-parquet-row-groups.R b/tests/testthat/test-write-parquet-row-groups.R index 90d7d48..4c8d302 100644 --- a/tests/testthat/test-write-parquet-row-groups.R +++ b/tests/testthat/test-write-parquet-row-groups.R @@ -64,4 +64,35 @@ test_that("factors & factor levels", { withr::local_options(nanoparquet.num_rows_per_row_group = 50L) write_parquet(df, tmp) expect_equal(as.data.frame(read_parquet(tmp)), df) + # the same dict is written into every dicitonary page + pgs <- read_parquet_pages(tmp) + dict_ofs <- pgs[["page_header_offset"]][ + pgs[["page_type"]] == "DICTIONARY_PAGE" + ] + dict_data <- read_parquet_page(tmp, dict_ofs[1])[["data"]] + for (do in dict_ofs) { + expect_equal(dict_data, read_parquet_page(tmp, do)[["data"]]) + } +}) + +test_that("non-factors write local dictionary", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + + df <- data.frame( + stringsAsFactors = FALSE, + f = c(rep("a", 100), rep("b", 100), rep("c", 100)) + ) + withr::local_options(nanoparquet.num_rows_per_row_group = 40L) + write_parquet(df, tmp) + expect_equal(as.data.frame(read_parquet(tmp)), df) + pgs <- read_parquet_pages(tmp) + dict_ofs <- pgs[["page_header_offset"]][ + pgs[["page_type"]] == "DICTIONARY_PAGE" + ] + expect_snapshot({ + for (do in dict_ofs) { + print(read_parquet_page(tmp, do)[["data"]]) + } + }) })