diff --git a/NEWS.md b/NEWS.md
index 52b3ca4..1c07a5d 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -21,6 +21,10 @@
   `write_parquet()`, to specify how the columns of a data frame should be
   mapped to Parquet types.

+- `write_parquet()` can now write multiple row groups. By default it puts
+  at most 10 million rows into a single row group. You can choose the
+  row groups manually with the `row_groups` argument.
+
 - Newly supported type conversions in `write_parquet()` via the schema
   argument:
diff --git a/src/lib/ParquetOutFile.cpp b/src/lib/ParquetOutFile.cpp
index c6b4e4b..13499e0 100644
--- a/src/lib/ParquetOutFile.cpp
+++ b/src/lib/ParquetOutFile.cpp
@@ -349,11 +349,13 @@ void ParquetOutFile::write_dictionary_(
   std::ostream &file,
   uint32_t idx,
   uint32_t size,
-  parquet::SchemaElement &sel) {
+  parquet::SchemaElement &sel,
+  int64_t from,
+  int64_t until) {

   ColumnMetaData *cmd = &(column_meta_data[idx]);
   uint32_t start = file.tellp();
-  write_dictionary(file, idx, sel);
+  write_dictionary(file, idx, sel, from, until);
   uint32_t end = file.tellp();
   if (end - start != size) {
     throw runtime_error(
@@ -523,7 +525,7 @@ void ParquetOutFile::write_column(uint32_t idx, int64_t from, int64_t until) {
   cmd->__set_total_uncompressed_size(0);
   if (encodings[idx] == Encoding::RLE_DICTIONARY) {
     uint32_t dictionary_page_offset = pfile.tellp();
-    write_dictionary_page(idx);
+    write_dictionary_page(idx, from, until);
     cmd->__set_dictionary_page_offset(dictionary_page_offset);
   }
   uint32_t data_offset = pfile.tellp();
@@ -549,13 +551,14 @@ void ParquetOutFile::write_page_header(uint32_t idx, PageHeader &ph) {

 // Currently only for byte arrays, more later

-void ParquetOutFile::write_dictionary_page(uint32_t idx) {
+void ParquetOutFile::write_dictionary_page(uint32_t idx, int64_t from,
+                                           int64_t until) {
   ColumnMetaData *cmd = &(column_meta_data[idx]);
   SchemaElement se = schemas[idx + 1];
   // Uncompresed size of the dictionary in bytes
-  uint32_t dict_size = get_size_dictionary(idx, se);
+  uint32_t dict_size = get_size_dictionary(idx, se, from, until);
   // Number of entries in the dicitonary
-  uint32_t num_dict_values = get_num_values_dictionary(idx);
+  uint32_t num_dict_values = get_num_values_dictionary(idx, from, until);

   // Init page header
   PageHeader ph;
@@ -572,7 +575,7 @@ void ParquetOutFile::write_dictionary_page(uint32_t idx) {
     // then the data directly to the file
     ph.__set_compressed_page_size(dict_size);
     write_page_header(idx, ph);
-    write_dictionary_(pfile, idx, dict_size, se);
+    write_dictionary_(pfile, idx, dict_size, se, from, until);

   } else {
     // With compression we need two temporary buffers
@@ -580,7 +583,7 @@ void ParquetOutFile::write_dictionary_page(uint32_t idx) {
     buf_unc.reset(dict_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_dictionary_(*os0, idx, dict_size, se);
+    write_dictionary_(*os0, idx, dict_size, se, from, until);

     // 2. compress buf_unc to buf_com
     size_t cdict_size = compress(cmd->codec, buf_unc, dict_size, buf_com);
@@ -603,7 +606,7 @@ void ParquetOutFile::write_data_pages(uint32_t idx, int64_t from,
     total_size = calculate_column_data_size(idx, rg_num_rows, from, until);
   } else {
     // estimate the max RLE length
-    uint32_t num_values = get_num_values_dictionary(idx);
+    uint32_t num_values = get_num_values_dictionary(idx, from, until);
     uint8_t bit_width = ceil(log2((double) num_values));
     total_size = MaxRleBpSizeSimple(rg_num_rows, bit_width);
   }
@@ -638,20 +641,22 @@ void ParquetOutFile::write_data_pages(uint32_t idx, int64_t from,
     if (page_until > until) {
       page_until = until;
     }
-    write_data_page(idx, page_from, page_until);
+    write_data_page(idx, from, until, page_from, page_until);
   }
 }

-void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
-                                     uint64_t until) {
+void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from,
+                                     int64_t rg_until, uint64_t page_from,
+                                     uint64_t page_until) {
   ColumnMetaData *cmd = &(column_meta_data[idx]);
   SchemaElement se = schemas[idx + 1];
   PageHeader ph;
   DataPageHeaderV2 dph2;
+  uint32_t page_num_values = page_until - page_from;
   if (data_page_version == 1) {
     ph.__set_type(PageType::DATA_PAGE);
     DataPageHeader dph;
-    dph.__set_num_values(until - from);
+    dph.__set_num_values(page_num_values);
     dph.__set_encoding(encodings[idx]);
     if (se.repetition_type == FieldRepetitionType::OPTIONAL) {
       dph.__set_definition_level_encoding(Encoding::RLE);
@@ -660,8 +665,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     ph.__set_data_page_header(dph);
   } else if (data_page_version == 2) {
     ph.__set_type(PageType::DATA_PAGE_V2);
-    dph2.__set_num_values(until - from);
-    dph2.__set_num_rows(until - from);
+    dph2.__set_num_values(page_num_values);
+    dph2.__set_num_rows(page_num_values);
     dph2.__set_encoding(encodings[idx]);
     // these might be overwritten later if there are NAs
     dph2.__set_num_nulls(0);
@@ -679,7 +684,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     // CASE 1: REQ, PLAIN, UNC
     // 1. write directly to file
     uint32_t data_size = calculate_column_data_size(
-      idx, until - from, from, until
+      idx, page_num_values, page_from, page_until
     );
     ph.__set_uncompressed_page_size(data_size);
     ph.__set_compressed_page_size(data_size);
       ph.__set_data_page_header_v2(dph2);
     }
     write_page_header(idx, ph);
-    write_data_(pfile, idx, data_size, from, until);
+    write_data_(pfile, idx, data_size, page_from, page_until);

   } else if (se.repetition_type == FieldRepetitionType::REQUIRED &&
              encodings[idx] == Encoding::PLAIN &&
@@ -695,13 +700,13 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     // CASE 2: REQ, PLAIN, COMP
     // 1. write data to buf_unc
     uint32_t data_size = calculate_column_data_size(
-      idx, until - from, from, until
+      idx, page_num_values, page_from, page_until
     );
     ph.__set_uncompressed_page_size(data_size);
     buf_unc.reset(data_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_data_(*os0, idx, data_size, from, until);
+    write_data_(*os0, idx, data_size, page_from, page_until);

     // 2. compress buf_unc to buf_com
     size_t cdata_size = compress(cmd->codec, buf_unc, data_size, buf_com);
@@ -719,18 +724,18 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
              cmd->codec == CompressionCodec::UNCOMPRESSED) {
     // CASE 3: REQ, RLE_DICT, UNC
     // 1. write dictionary indices to buf_unc
-    uint32_t data_size = (until - from) * sizeof(int);
+    uint32_t data_size = (page_num_values) * sizeof(int);
     buf_unc.reset(data_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_dictionary_indices_(*os0, idx, data_size, from, until);
+    write_dictionary_indices_(*os0, idx, data_size, page_from, page_until);

     // 2. RLE encode buf_unc to buf_com
-    uint32_t num_dict_values = get_num_values_dictionary(idx);
+    uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
     uint8_t bit_width = ceil(log2((double) num_dict_values));
     uint32_t rle_size = rle_encode(
       buf_unc,
-      until - from,
+      page_num_values,
       buf_com,
       bit_width,
       true
@@ -753,19 +758,19 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
              cmd->codec != CompressionCodec::UNCOMPRESSED) {
     // CASE 4: REQ, RLE_DICT, COMP
     // 1. write dictionary indices to buf_unc
-    uint32_t data_size = (until - from) * sizeof(int);
+    uint32_t data_size = page_num_values * sizeof(int);
     ph.__set_uncompressed_page_size(data_size);
     buf_unc.reset(data_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_dictionary_indices_(*os0, idx, data_size, from, until);
+    write_dictionary_indices_(*os0, idx, data_size, page_from, page_until);

     // 2. RLE encode buf_unc to buf_com
-    uint32_t num_dict_values = get_num_values_dictionary(idx);
+    uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
     uint8_t bit_width = ceil(log2((double) num_dict_values));
     uint32_t rle_size = rle_encode(
       buf_unc,
-      until - from,
+      page_num_values,
       buf_com,
       bit_width,
       true, // add_bit_width
@@ -792,24 +797,24 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
              cmd->codec == CompressionCodec::UNCOMPRESSED) {
     // CASE 5: OPT, PLAIN, UNC
     // 1. write definition levels to buf_unc
-    uint32_t miss_size = (until - from) * sizeof(int);
+    uint32_t miss_size = page_num_values * sizeof(int);
     buf_unc.reset(miss_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    uint32_t num_present = write_present(*os0, idx, from, until);
+    uint32_t num_present = write_present(*os0, idx, page_from, page_until);

     // 2. RLE buf_unc to buf_com
-    uint32_t rle_size = rle_encode(buf_unc, until - from, buf_com, 1, false);
+    uint32_t rle_size = rle_encode(buf_unc, page_num_values, buf_com, 1, false);

     // 3. Write buf_unc to file
     uint32_t data_size = calculate_column_data_size(
-      idx, num_present, from, until
+      idx, num_present, page_from, page_until
     );
     int prep_length = data_page_version == 1 ? 4 : 0;
     ph.__set_uncompressed_page_size(data_size + rle_size + prep_length);
     ph.__set_compressed_page_size(data_size + rle_size + prep_length);
     if (data_page_version == 2) {
-      dph2.__set_num_nulls(until - from - num_present);
+      dph2.__set_num_nulls(page_num_values - num_present);
       dph2.__set_definition_levels_byte_length(rle_size);
       ph.__set_data_page_header_v2(dph2);
     }
@@ -823,23 +828,23 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     );

     // 4. write data to file
-    write_present_data_(pfile, idx, data_size, num_present, from, until);
+    write_present_data_(pfile, idx, data_size, num_present, page_from, page_until);

   } else if (se.repetition_type == FieldRepetitionType::OPTIONAL &&
              encodings[idx] == Encoding::PLAIN &&
              cmd->codec != CompressionCodec::UNCOMPRESSED) {
     // CASE 6: OPT, PLAIN, COMP
     // 1. write definition levels to buf_unc
-    uint32_t miss_size = (until - from) * sizeof(int);
+    uint32_t miss_size = page_num_values * sizeof(int);
     buf_unc.reset(miss_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    uint32_t num_present = write_present(*os0, idx, from, until);
+    uint32_t num_present = write_present(*os0, idx, page_from, page_until);

     // 2. RLE buf_unc to buf_com
     uint32_t rle_size = rle_encode(
       buf_unc,
-      until - from,
+      page_num_values,
       buf_com,
       1, // bit_width
       false, // add_bit_width
@@ -848,13 +853,13 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,

     // 3. Append data to buf_com
     uint32_t data_size = calculate_column_data_size(
-      idx, num_present, from, until
+      idx, num_present, page_from, page_until
     );
     buf_com.resize(rle_size + data_size, true);
     std::unique_ptr<std::ostream> os1 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_com));
     buf_com.skip(rle_size);
-    write_present_data_(*os1, idx, data_size, num_present, from, until);
+    write_present_data_(*os1, idx, data_size, num_present, page_from, page_until);

     // 4. compress buf_com to buf_unc
     // for data page v2, the def levels are not compressed!
@@ -865,7 +870,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     ph.__set_uncompressed_page_size(rle_size + data_size);
     ph.__set_compressed_page_size(comp_size);
     if (data_page_version == 2) {
-      dph2.__set_num_nulls(until - from - num_present);
+      dph2.__set_num_nulls(page_num_values - num_present);
       dph2.__set_definition_levels_byte_length(rle_size);
       ph.__set_data_page_header_v2(dph2);
     }
@@ -880,16 +885,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
              cmd->codec == CompressionCodec::UNCOMPRESSED) {
     // CASE 7: OPT RLE_DICT UNC
     // 1. write definition levels to buf_unc
-    uint32_t miss_size = (until - from) * sizeof(int);
+    uint32_t miss_size = page_num_values * sizeof(int);
     buf_unc.reset(miss_size);
     std::unique_ptr<std::ostream> os1 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    uint32_t num_present = write_present(*os1, idx, from, until);
+    uint32_t num_present = write_present(*os1, idx, page_from, page_until);

     // 2. RLE buf_unc to buf_com
     uint32_t rle_size = rle_encode(
       buf_unc,
-      until - from,
+      page_num_values,
       buf_com,
       1, // bit_width
       false, // add_bit_width
@@ -901,10 +906,10 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     buf_unc.reset(data_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_dictionary_indices_(*os0, idx, data_size, from, until);
+    write_dictionary_indices_(*os0, idx, data_size, page_from, page_until);

     // 4. append RLE buf_unc to buf_com
-    uint32_t num_dict_values = get_num_values_dictionary(idx);
+    uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
     uint8_t bit_width = ceil(log2((double) num_dict_values));
     uint32_t rle2_size = rle_encode(
       buf_unc,
@@ -920,7 +925,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     ph.__set_uncompressed_page_size(rle2_size);
     ph.__set_compressed_page_size(rle2_size);
     if (data_page_version == 2) {
-      dph2.__set_num_nulls(until - from - num_present);
+      dph2.__set_num_nulls(page_num_values - num_present);
       dph2.__set_definition_levels_byte_length(rle_size);
       ph.__set_data_page_header_v2(dph2);
     }
@@ -935,16 +940,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
              cmd->codec != CompressionCodec::UNCOMPRESSED) {
     // CASE 8: OPT RLE_DICT COM
     // 1. write definition levels to buf_unc
-    uint32_t miss_size = (until - from) * sizeof(int);
+    uint32_t miss_size = page_num_values * sizeof(int);
     buf_unc.reset(miss_size);
     std::unique_ptr<std::ostream> os1 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    uint32_t num_present = write_present(*os1, idx, from, until);
+    uint32_t num_present = write_present(*os1, idx, page_from, page_until);

     // 2. RLE buf_unc to buf_com
     uint32_t rle_size = rle_encode(
       buf_unc,
-      until - from,
+      page_num_values,
       buf_com,
       1, // bit_width
       false, // add_bit_width
@@ -956,10 +961,10 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     buf_unc.reset(data_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_dictionary_indices_(*os0, idx, data_size, from, until);
+    write_dictionary_indices_(*os0, idx, data_size, page_from, page_until);

     // 4. append RLE buf_unc to buf_com
-    uint32_t num_dict_values = get_num_values_dictionary(idx);
+    uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
     uint8_t bit_width = ceil(log2((double) num_dict_values));
     uint32_t rle2_size = rle_encode(
       buf_unc,
@@ -979,7 +984,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     ph.__set_uncompressed_page_size(rle2_size);
     ph.__set_compressed_page_size(crle2_size);
     if (data_page_version == 2) {
-      dph2.__set_num_nulls(until - from - num_present);
+      dph2.__set_num_nulls(page_num_values - num_present);
       dph2.__set_definition_levels_byte_length(rle_size);
       ph.__set_data_page_header_v2(dph2);
     }
@@ -994,16 +999,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
              cmd->codec == CompressionCodec::UNCOMPRESSED) {
     // CASE 9: REQ, RLE, UNCOMP
     // 1. write logicals into buf_unc
-    uint32_t data_size = (until - from) * sizeof(int);
+    uint32_t data_size = page_num_values * sizeof(int);
     buf_unc.reset(data_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_boolean_as_int(*os0, idx, from, until);
+    write_boolean_as_int(*os0, idx, page_from, page_until);

     // 2. RLE encode buf_unc to buf_com
     uint32_t rle_size = rle_encode(
       buf_unc,
-      until - from,
+      page_num_values,
       buf_com,
       1, // bit_width
       false, // add_bit_width
@@ -1024,16 +1029,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
              cmd->codec != CompressionCodec::UNCOMPRESSED) {
     // CASE 10: REQ, RLE, COMP
     // 1. write logicals into buf_unc
-    uint32_t data_size = (until - from) * sizeof(int);
+    uint32_t data_size = page_num_values * sizeof(int);
     buf_unc.reset(data_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_boolean_as_int(*os0, idx, from, until);
+    write_boolean_as_int(*os0, idx, page_from, page_until);

     // 2. RLE encode buf_unc to buf_com
     uint32_t rle_size = rle_encode(
       buf_unc,
-      until - from,
+      page_num_values,
       buf_com,
       1, // bit_width
       false, // add_bit_width
@@ -1057,16 +1062,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
              cmd->codec == CompressionCodec::UNCOMPRESSED) {
     // CASE 11: OPT RLE UNC
     // 1. write definition levels to buf_unc
-    uint32_t miss_size = (until - from) * sizeof(int);
+    uint32_t miss_size = page_num_values * sizeof(int);
     buf_unc.reset(miss_size);
     std::unique_ptr<std::ostream> os1 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    uint32_t num_present = write_present(*os1, idx, from, until);
+    uint32_t num_present = write_present(*os1, idx, page_from, page_until);

     // 2. RLE buf_unc to buf_com
     uint32_t rle_size = rle_encode(
       buf_unc,
-      until - from,
+      page_num_values,
       buf_com,
       1, // bit_width
       false, // add_bit_width
@@ -1078,7 +1083,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     buf_unc.reset(data_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_present_boolean_as_int(*os0, idx, num_present, from, until);
+    write_present_boolean_as_int(*os0, idx, num_present, page_from, page_until);

     // 4. append RLE buf_unc to buf_com
     uint32_t rle2_size = rle_encode(
@@ -1105,16 +1110,16 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
              cmd->codec != CompressionCodec::UNCOMPRESSED) {
     // CASE 12: OPT RLE COMP
     // 1. write definition levels to buf_unc
-    uint32_t miss_size = (until - from) * sizeof(int);
+    uint32_t miss_size = page_num_values * sizeof(int);
     buf_unc.reset(miss_size);
     std::unique_ptr<std::ostream> os1 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    uint32_t num_present = write_present(*os1, idx, from, until);
+    uint32_t num_present = write_present(*os1, idx, page_from, page_until);

     // 2. RLE buf_unc to buf_com
     uint32_t rle_size = rle_encode(
       buf_unc,
-      until - from,
+      page_num_values,
       buf_com,
       1, // bit_width
       false, // add_bit_width
@@ -1126,7 +1131,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
     buf_unc.reset(data_size);
     std::unique_ptr<std::ostream> os0 =
       std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
-    write_present_boolean_as_int(*os0, idx, num_present, from, until);
+    write_present_boolean_as_int(*os0, idx, num_present, page_from, page_until);

     // 4. append RLE buf_unc to buf_com
     uint32_t rle2_size = rle_encode(
diff --git a/src/lib/ParquetOutFile.h b/src/lib/ParquetOutFile.h
index a2416ef..873aca6 100644
--- a/src/lib/ParquetOutFile.h
+++ b/src/lib/ParquetOutFile.h
@@ -70,12 +70,15 @@ class ParquetOutFile {
   virtual uint32_t get_size_byte_array(uint32_t idx, uint32_t num_present,
                                        uint64_t from, uint64_t until) = 0;
-  virtual uint32_t get_num_values_dictionary(uint32_t idx) = 0;
+  virtual uint32_t get_num_values_dictionary(uint32_t idx, int64_t from,
+                                             int64_t until) = 0;
   virtual uint32_t get_size_dictionary(uint32_t idx,
-                                       parquet::SchemaElement &sel) = 0;
+                                       parquet::SchemaElement &sel,
+                                       int64_t from, int64_t until) = 0;
   virtual void write_dictionary(std::ostream &file, uint32_t idx,
-                                parquet::SchemaElement &sel) = 0;
+                                parquet::SchemaElement &sel, int64_t from,
+                                int64_t until) = 0;
   // Needs to write indices as int32_t
   virtual void write_dictionary_indices(std::ostream &file, uint32_t idx,
                                         uint64_t from, uint64_t until) = 0;
@@ -107,9 +110,10 @@ class ParquetOutFile {
   // return total size
   int64_t write_columns(int64_t from, int64_t until);
   void write_column(uint32_t idx, int64_t from, int64_t until);
-  void write_dictionary_page(uint32_t idx);
+  void write_dictionary_page(uint32_t idx, int64_t from, int64_t until);
   void write_data_pages(uint32_t idx, int64_t from, int64_t until);
-  void write_data_page(uint32_t idx, uint64_t from, uint64_t until);
+  void write_data_page(uint32_t idx, int64_t rg_from, int64_t rg_until,
+                       uint64_t from, uint64_t until);
   void write_page_header(uint32_t idx, parquet::PageHeader &ph);
   void write_footer();
@@ -119,7 +123,8 @@ class ParquetOutFile {
                            uint32_t size, uint32_t num_present,
                            uint64_t from, uint64_t until);
   void write_dictionary_(std::ostream &file, uint32_t idx, uint32_t size,
-                         parquet::SchemaElement &sel);
+                         parquet::SchemaElement &sel, int64_t from,
+                         int64_t until);
   void write_dictionary_indices_(std::ostream &file, uint32_t idx,
                                  uint32_t size,
                                  uint64_t from, uint64_t until);
diff --git a/src/write.cpp b/src/write.cpp
index 3b99ff4..8af4c9f 100644
--- a/src/write.cpp
+++ b/src/write.cpp
@@ -135,10 +135,13 @@ class RParquetOutFile : public ParquetOutFile {
                         uint64_t until);

   // for dictionaries
-  uint32_t get_num_values_dictionary(uint32_t idx);
-  uint32_t get_size_dictionary(uint32_t idx, parquet::SchemaElement &type);
+  uint32_t get_num_values_dictionary(uint32_t idx, int64_t form,
+                                     int64_t until);
+  uint32_t get_size_dictionary(uint32_t idx, parquet::SchemaElement &type,
+                               int64_t from, int64_t until);
   void write_dictionary(std::ostream &file, uint32_t idx,
-                        parquet::SchemaElement &sel);
+                        parquet::SchemaElement &sel, int64_t from,
+                        int64_t until);
   void write_dictionary_indices(std::ostream &file, uint32_t idx,
                                 uint64_t from, uint64_t until);
@@ -1537,7 +1540,9 @@ void RParquetOutFile::write_present_boolean(
 }

 uint32_t RParquetOutFile::get_num_values_dictionary(
-  uint32_t idx) {
+  uint32_t idx,
+  int64_t from,
+  int64_t until) {
   SEXP col = VECTOR_ELT(df, idx);
   if (Rf_inherits(col, "factor")) {
     return Rf_nlevels(col);
@@ -1549,7 +1554,9 @@ uint32_t RParquetOutFile::get_num_values_dictionary(

 uint32_t RParquetOutFile::get_size_dictionary(
   uint32_t idx,
-  parquet::SchemaElement &sel) {
+  parquet::SchemaElement &sel,
+  int64_t from,
+  int64_t until) {
   SEXP col = VECTOR_ELT(df, idx);

   parquet::Type::type type = sel.type;
@@ -1646,7 +1653,9 @@ uint32_t RParquetOutFile::get_size_dictionary(

 void RParquetOutFile::write_dictionary(
   std::ostream &file,
   uint32_t idx,
-  parquet::SchemaElement &sel) {
+  parquet::SchemaElement &sel,
+  int64_t from,
+  int64_t until) {

   parquet::Type::type type = sel.type;
diff --git a/tests/testthat/test-write-parquet-row-groups.R b/tests/testthat/test-write-parquet-row-groups.R
index 2eb28a7..90d7d48 100644
--- a/tests/testthat/test-write-parquet-row-groups.R
+++ b/tests/testthat/test-write-parquet-row-groups.R
@@ -48,7 +48,20 @@ test_that("grouped df", {
   )

   tmp <- tempfile(fileext = ".parquet")
+  on.exit(unlink(tmp), add = TRUE)
   expect_snapshot(write_parquet(df, tmp))
   expect_equal(nrow(read_parquet_metadata(tmp)[["row_groups"]]), 3L)
   expect_snapshot(as.data.frame(read_parquet(tmp)[, c("nam", "cyl")]))
 })
+
+test_that("factors & factor levels", {
+  tmp <- tempfile(fileext = ".parquet")
+  on.exit(unlink(tmp), add = TRUE)
+
+  df <- data.frame(
+    f = factor(c(rep("a", 100), rep("b", 100), rep("c", 100)))
+  )
+  withr::local_options(nanoparquet.num_rows_per_row_group = 50L)
+  write_parquet(df, tmp)
+  expect_equal(as.data.frame(read_parquet(tmp)), df)
+})
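
To make the user-visible effect of the NEWS entry above concrete, here is a small, illustrative R sketch (not part of the diff). It only leans on what this patch already shows: the `nanoparquet.num_rows_per_row_group` option that the new test uses to shrink the row group limit, `read_parquet_metadata()` to count row groups, and a factor column to check that dictionary-encoded data still round-trips when dictionary pages are written per row group. The `row_groups` argument mentioned in NEWS is deliberately not used here, since its exact interface is not spelled out in this diff.

library(nanoparquet)

# Shrink the row group limit so a small data frame is split into several
# row groups; the default limit is 10 million rows per row group.
old <- options(nanoparquet.num_rows_per_row_group = 100L)

df <- data.frame(
  f = factor(c(rep("a", 100), rep("b", 100), rep("c", 100)))
)

tmp <- tempfile(fileext = ".parquet")
write_parquet(df, tmp)

# One metadata row per row group; 300 rows with a 100-row limit should
# give three row groups here.
nrow(read_parquet_metadata(tmp)[["row_groups"]])

# The factor column, including its levels, survives the round trip even
# though its dictionary is now written once per row group.
all.equal(as.data.frame(read_parquet(tmp)), df)

options(old)
unlink(tmp)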