Skip to content

Commit

Permalink
Merge pull request #83 from r-lib/write-data-page-v2
Browse files Browse the repository at this point in the history
Support writing v2 data pages
  • Loading branch information
gaborcsardi authored Aug 12, 2024
2 parents baf7cbb + c5a8712 commit 6d2823d
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 38 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
- `list` of `raw` vectors to `BYTE_ARRAY`,
- `list` of `raw` vectors to `FIXED_LEN_BYTE_ARRAY`.

* `write_parquet()` can now write version 2 data pages. The default is
still version 1, but it might change in the future.

* `write_parquet(file = ":raw:")` now works correctly for larger data
frames (#77).

Expand Down
14 changes: 12 additions & 2 deletions R/options.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#' to tell which without using the Arrow metadata.
#' @param write_arrow_metadata Whether to add the Apache Arrow types as
#' metadata to the file [write_parquet()].
#' @param write_data_page_version Data page version to write by default.
#' Possible values are 1 and 2. Default is 1.
#'
#' @return List of nanoparquet options.
#'
Expand All @@ -36,15 +38,23 @@
parquet_options <- function(
# Every argument defaults to the matching `nanoparquet.*` global option,
# read with getOption(), falling back to the literal shown here.
class = getOption("nanoparquet.class", "tbl"),
use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE),
write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE)
write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE),
write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L)
) {
# Validate the arguments; stopifnot() signals an error on the first failure.
stopifnot(is.character(class))
stopifnot(is_flag(use_arrow_metadata))
stopifnot(is_flag(write_arrow_metadata))
# The page version may be supplied as a double (1, 2) or as an integer
# (1L, 2L). identical() distinguishes the two types, so all four accepted
# values are listed explicitly.
stopifnot(
identical(write_data_page_version, 1) ||
identical(write_data_page_version, 2) ||
identical(write_data_page_version, 1L) ||
identical(write_data_page_version, 2L)
)

# Return the options as a named list. The page version is coerced with
# as.integer() so it is stored as an integer whichever form was supplied.
list(
class = class,
use_arrow_metadata = use_arrow_metadata,
write_arrow_metadata = write_arrow_metadata
write_arrow_metadata = write_arrow_metadata,
write_data_page_version = as.integer(write_data_page_version)
)
}
6 changes: 5 additions & 1 deletion man/parquet_options.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

132 changes: 97 additions & 35 deletions src/lib/ParquetOutFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,30 +380,36 @@ size_t ParquetOutFile::compress(
parquet::CompressionCodec::type codec,
ByteBuffer &src,
uint32_t src_size,
ByteBuffer &tgt) {
ByteBuffer &tgt,
uint32_t skip) {

if (codec == CompressionCodec::SNAPPY) {
size_t tgt_size_est = snappy::MaxCompressedLength(src_size);
tgt.reset(tgt_size_est);
size_t tgt_size_est = snappy::MaxCompressedLength(src_size - skip);
tgt.reset(tgt_size_est + skip);
if (skip > 0) memcpy(tgt.ptr, src.ptr, skip);
size_t tgt_size;
snappy::RawCompress(src.ptr, src_size, tgt.ptr, &tgt_size);
return tgt_size;
snappy::RawCompress(src.ptr + skip, src_size - skip, tgt.ptr + skip, &tgt_size);
return tgt_size + skip;

} else if (codec == CompressionCodec::GZIP) {
miniz::MiniZStream mzs;
size_t tgt_size_est = mzs.MaxCompressedLength(src_size);
tgt.reset(tgt_size_est);
size_t tgt_size_est = mzs.MaxCompressedLength(src_size - skip);
tgt.reset(tgt_size_est + skip);
if (skip > 0) memcpy(tgt.ptr, src.ptr, skip);
size_t tgt_size = tgt_size_est;
// throws on error
mzs.Compress(src.ptr, src_size, tgt.ptr, &tgt_size);
return tgt_size;
mzs.Compress(src.ptr + skip, src_size - skip, tgt.ptr + skip, &tgt_size);
return tgt_size + skip;

} else if (codec == CompressionCodec::ZSTD) {
size_t tgt_size_est = zstd::ZSTD_compressBound(src_size);
size_t tgt_size_est = zstd::ZSTD_compressBound(src_size - skip);
tgt.reset(tgt_size_est);
if (skip > 0) memcpy(tgt.ptr, src.ptr, skip);
size_t tgt_size = zstd::ZSTD_compress(
tgt.ptr,
tgt.ptr + skip,
tgt_size_est,
src.ptr,
src_size,
src.ptr + skip,
src_size - skip,
ZSTD_CLEVEL_DEFAULT
);
if (zstd::ZSTD_isError(tgt_size)) {
Expand All @@ -412,7 +418,8 @@ size_t ParquetOutFile::compress(
<< __LINE__;
throw runtime_error(ss.str());
}
return tgt_size;
return tgt_size + skip;

} else {
std::stringstream ss;
ss << "Unsupported Parquet compression codec: " << codec;
Expand Down Expand Up @@ -605,14 +612,31 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
ColumnMetaData *cmd = &(column_meta_data[idx]);
SchemaElement se = schemas[idx + 1];
PageHeader ph;
ph.__set_type(PageType::DATA_PAGE);
DataPageHeader dph;
dph.__set_num_values(until - from);
dph.__set_encoding(encodings[idx]);
if (se.repetition_type == FieldRepetitionType::OPTIONAL) {
dph.__set_definition_level_encoding(Encoding::RLE);
DataPageHeaderV2 dph2;
if (data_page_version == 1) {
ph.__set_type(PageType::DATA_PAGE);
DataPageHeader dph;
dph.__set_num_values(until - from);
dph.__set_encoding(encodings[idx]);
if (se.repetition_type == FieldRepetitionType::OPTIONAL) {
dph.__set_definition_level_encoding(Encoding::RLE);
}
// for version 1 we can set it here
ph.__set_data_page_header(dph);
} else if (data_page_version == 2) {
ph.__set_type(PageType::DATA_PAGE_V2);
dph2.__set_num_values(until - from);
dph2.__set_num_rows(until - from);
dph2.__set_encoding(encodings[idx]);
// these might be overwritten later if there are NAs
dph2.__set_num_nulls(0);
dph2.__set_definition_levels_byte_length(0);
dph2.__set_repetition_levels_byte_length(0);
// for version 2 we need to set the header after we
// set the remaining fields
} else {
throw runtime_error("Invalid data page version");
}
ph.__set_data_page_header(dph);

if (se.repetition_type == FieldRepetitionType::REQUIRED &&
encodings[idx] == Encoding::PLAIN &&
Expand All @@ -624,6 +648,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
);
ph.__set_uncompressed_page_size(data_size);
ph.__set_compressed_page_size(data_size);
if (data_page_version == 2) {
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
write_data_(pfile, idx, data_size, from, until);

Expand All @@ -646,6 +673,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,

// 3. write buf_com to file
ph.__set_compressed_page_size(cdata_size);
if (data_page_version == 2) {
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char *) buf_com.ptr, cdata_size);

Expand Down Expand Up @@ -674,6 +704,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
// 3. write buf_com to file
ph.__set_uncompressed_page_size(rle_size);
ph.__set_compressed_page_size(rle_size);
if (data_page_version == 2) {
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_com.ptr, rle_size);
cmd->__set_total_uncompressed_size(
Expand Down Expand Up @@ -710,6 +743,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
// 4. write buf_unc to file
ph.__set_uncompressed_page_size(rle_size);
ph.__set_compressed_page_size(crle_size);
if (data_page_version == 2) {
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_unc.ptr, crle_size);
cmd->__set_total_uncompressed_size(
Expand All @@ -734,10 +770,18 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
uint32_t data_size = calculate_column_data_size(
idx, num_present, from, until
);
ph.__set_uncompressed_page_size(data_size + rle_size + 4);
ph.__set_compressed_page_size(data_size + rle_size + 4);
int prep_length = data_page_version == 1 ? 4 : 0;
ph.__set_uncompressed_page_size(data_size + rle_size + prep_length);
ph.__set_compressed_page_size(data_size + rle_size + prep_length);
if (data_page_version == 2) {
dph2.__set_num_nulls(until - from - num_present);
dph2.__set_definition_levels_byte_length(rle_size);
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) &rle_size, 4);
if (data_page_version == 1) {
pfile.write((const char*) &rle_size, 4);
}
pfile.write((const char*) buf_com.ptr, rle_size);
cmd->__set_total_uncompressed_size(
cmd->total_uncompressed_size + rle_size + 4
Expand All @@ -762,9 +806,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
buf_unc,
until - from,
buf_com,
1, // bit_width
false, // add_bit_width
true // add_size
1, // bit_width
false, // add_bit_width
data_page_version == 1 // add_size
);

// 3. Append data to buf_com
Expand All @@ -778,11 +822,18 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
write_present_data_(*os1, idx, data_size, num_present, from, until);

// 4. compress buf_com to buf_unc
size_t comp_size = compress(cmd->codec, buf_com, rle_size + data_size, buf_unc);
// for data page v2, the def levels are not compressed!
uint32_t skip = data_page_version == 1 ? 0 : rle_size;
size_t comp_size = compress(cmd->codec, buf_com, rle_size + data_size, buf_unc, skip);

// 5. write buf_unc to file
ph.__set_uncompressed_page_size(rle_size + data_size);
ph.__set_compressed_page_size(comp_size);
if (data_page_version == 2) {
dph2.__set_num_nulls(until - from - num_present);
dph2.__set_definition_levels_byte_length(rle_size);
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_unc.ptr, comp_size);
cmd->__set_total_uncompressed_size(
Expand All @@ -805,9 +856,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
buf_unc,
until - from,
buf_com,
1, // bit_width
false, // add_bit_width
true // add_size
1, // bit_width
false, // add_bit_width
data_page_version == 1 // add_size
);

// 3. write dictionary indices to buf_unc
Expand All @@ -833,6 +884,11 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
// 5. write buf_com to file
ph.__set_uncompressed_page_size(rle2_size);
ph.__set_compressed_page_size(rle2_size);
if (data_page_version == 2) {
dph2.__set_num_nulls(until - from - num_present);
dph2.__set_definition_levels_byte_length(rle_size);
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_com.ptr, rle2_size);
cmd->__set_total_uncompressed_size(
Expand All @@ -855,9 +911,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
buf_unc,
until - from,
buf_com,
1, // bit_width
false, // add_bit_width
true // add_size
1, // bit_width
false, // add_bit_width
data_page_version == 1 // add_size
);

// 3. write dictionary indices to buf_unc
Expand All @@ -881,11 +937,17 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
);

// 5. compress buf_com to buf_unc
size_t crle2_size = compress(cmd->codec, buf_com, rle2_size, buf_unc);
uint32_t skip = data_page_version == 1 ? 0 : rle_size;
size_t crle2_size = compress(cmd->codec, buf_com, rle2_size, buf_unc, skip);

// 6. write buf_unc to file
ph.__set_uncompressed_page_size(rle2_size);
ph.__set_compressed_page_size(crle2_size);
if (data_page_version == 2) {
dph2.__set_num_nulls(until - from - num_present);
dph2.__set_definition_levels_byte_length(rle_size);
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_unc.ptr, crle2_size);
cmd->__set_total_uncompressed_size(
Expand Down
Binary file modified src/lib/ParquetOutFile.h
Binary file not shown.
18 changes: 18 additions & 0 deletions src/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2327,6 +2327,20 @@ void RParquetOutFile::write(

extern "C" {

/**
 * Look up an element of an R list by name.
 *
 * Scans the `names` attribute of `list` from the front and returns the
 * first element whose name compares equal to `str` (byte-wise, via strcmp).
 *
 * @param list R list (generic vector) to search.
 * @param str  NUL-terminated name to look for.
 * @return The matching element, or R_NilValue if `list` has no names
 *         attribute or no element is called `str`.
 */
static SEXP get_list_element(SEXP list, const char *str) {
  SEXP elmt = R_NilValue;
  SEXP names = PROTECT(Rf_getAttrib(list, R_NamesSymbol));

  // An unnamed list has a NULL names attribute; STRING_ELT() must not be
  // applied to it, so only search when the names are a character vector.
  if (TYPEOF(names) == STRSXP) {
    const R_xlen_t len = Rf_xlength(list);
    for (R_xlen_t i = 0; i < len; i++) {
      if (strcmp(CHAR(STRING_ELT(names, i)), str) == 0) {
        elmt = VECTOR_ELT(list, i);
        break;
      }
    }
  }

  UNPROTECT(1);
  return elmt;
}

SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
SEXP metadata, SEXP required, SEXP options,
SEXP schema, SEXP encoding) {
Expand Down Expand Up @@ -2357,18 +2371,22 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
break;
}

int dp_ver = INTEGER(get_list_element(options, "write_data_page_version"))[0];

std::string fname = (char *)CHAR(STRING_ELT(filesxp, 0));
if (fname == ":raw:") {
MemStream ms;
std::ostream &os = ms.stream();
RParquetOutFile of(os, codec);
of.data_page_version = dp_ver;
of.write(dfsxp, dim, metadata, required, options, schema, encoding);
R_xlen_t bufsize = ms.size();
SEXP res = Rf_allocVector(RAWSXP, bufsize);
ms.copy(RAW(res), bufsize);
return res;
} else {
RParquetOutFile of(fname, codec);
of.data_page_version = dp_ver;
of.write(dfsxp, dim, metadata, required, options, schema, encoding);
return R_NilValue;
}
Expand Down
Loading

0 comments on commit 6d2823d

Please sign in to comment.