Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support writing v2 data pages #83

Merged
merged 5 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
- `list` of `raw` vectors to `BYTE_ARRAY`,
- `list` of `raw` vectors to `FIXED_LEN_BYTE_ARRAY`.

* `write_parquet()` can now write version 2 data pages. The default is
still version 1, but it might change in the future.

* `write_parquet(file = ":raw:")` now works correctly for larger data
frames (#77).

Expand Down
14 changes: 12 additions & 2 deletions R/options.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#' to tell which without using the Arrow metadata.
#' @param write_arrow_metadata Whether to add the Apache Arrow types as
#' metadata to the file [write_parquet()].
#' @param write_data_page_version Data page version to write by default.
#' Possible values are 1 and 2. Default is 1.
#'
#' @return List of nanoparquet options.
#'
Expand All @@ -36,15 +38,23 @@
parquet_options <- function(
class = getOption("nanoparquet.class", "tbl"),
use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE),
write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE)
write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE),
write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L)
) {
stopifnot(is.character(class))
stopifnot(is_flag(use_arrow_metadata))
stopifnot(is_flag(write_arrow_metadata))
stopifnot(
identical(write_data_page_version, 1) ||
identical(write_data_page_version, 2) ||
identical(write_data_page_version, 1L) ||
identical(write_data_page_version, 2L)
)

list(
class = class,
use_arrow_metadata = use_arrow_metadata,
write_arrow_metadata = write_arrow_metadata
write_arrow_metadata = write_arrow_metadata,
write_data_page_version = as.integer(write_data_page_version)
)
}
6 changes: 5 additions & 1 deletion man/parquet_options.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

132 changes: 97 additions & 35 deletions src/lib/ParquetOutFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,30 +380,36 @@ size_t ParquetOutFile::compress(
parquet::CompressionCodec::type codec,
ByteBuffer &src,
uint32_t src_size,
ByteBuffer &tgt) {
ByteBuffer &tgt,
uint32_t skip) {

if (codec == CompressionCodec::SNAPPY) {
size_t tgt_size_est = snappy::MaxCompressedLength(src_size);
tgt.reset(tgt_size_est);
size_t tgt_size_est = snappy::MaxCompressedLength(src_size - skip);
tgt.reset(tgt_size_est + skip);
if (skip > 0) memcpy(tgt.ptr, src.ptr, skip);
size_t tgt_size;
snappy::RawCompress(src.ptr, src_size, tgt.ptr, &tgt_size);
return tgt_size;
snappy::RawCompress(src.ptr + skip, src_size - skip, tgt.ptr + skip, &tgt_size);
return tgt_size + skip;

} else if (codec == CompressionCodec::GZIP) {
miniz::MiniZStream mzs;
size_t tgt_size_est = mzs.MaxCompressedLength(src_size);
tgt.reset(tgt_size_est);
size_t tgt_size_est = mzs.MaxCompressedLength(src_size - skip);
tgt.reset(tgt_size_est + skip);
if (skip > 0) memcpy(tgt.ptr, src.ptr, skip);
size_t tgt_size = tgt_size_est;
// throws on error
mzs.Compress(src.ptr, src_size, tgt.ptr, &tgt_size);
return tgt_size;
mzs.Compress(src.ptr + skip, src_size - skip, tgt.ptr + skip, &tgt_size);
return tgt_size + skip;

} else if (codec == CompressionCodec::ZSTD) {
size_t tgt_size_est = zstd::ZSTD_compressBound(src_size);
size_t tgt_size_est = zstd::ZSTD_compressBound(src_size - skip);
// The first `skip` bytes of `src` are copied to `tgt` uncompressed and
// the compressed payload is written after them, so the target buffer
// must hold `skip` extra bytes on top of the ZSTD bound (the SNAPPY and
// GZIP branches size their buffers the same way).
tgt.reset(tgt_size_est + skip);
if (skip > 0) memcpy(tgt.ptr, src.ptr, skip);
size_t tgt_size = zstd::ZSTD_compress(
tgt.ptr,
tgt.ptr + skip,
tgt_size_est,
src.ptr,
src_size,
src.ptr + skip,
src_size - skip,
ZSTD_CLEVEL_DEFAULT
);
if (zstd::ZSTD_isError(tgt_size)) {
Expand All @@ -412,7 +418,8 @@ size_t ParquetOutFile::compress(
<< __LINE__;
throw runtime_error(ss.str());
}
return tgt_size;
return tgt_size + skip;

} else {
std::stringstream ss;
ss << "Unsupported Parquet compression codec: " << codec;
Expand Down Expand Up @@ -605,14 +612,31 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
ColumnMetaData *cmd = &(column_meta_data[idx]);
SchemaElement se = schemas[idx + 1];
PageHeader ph;
ph.__set_type(PageType::DATA_PAGE);
DataPageHeader dph;
dph.__set_num_values(until - from);
dph.__set_encoding(encodings[idx]);
if (se.repetition_type == FieldRepetitionType::OPTIONAL) {
dph.__set_definition_level_encoding(Encoding::RLE);
DataPageHeaderV2 dph2;
if (data_page_version == 1) {
ph.__set_type(PageType::DATA_PAGE);
DataPageHeader dph;
dph.__set_num_values(until - from);
dph.__set_encoding(encodings[idx]);
if (se.repetition_type == FieldRepetitionType::OPTIONAL) {
dph.__set_definition_level_encoding(Encoding::RLE);
}
// for version 1 we can set it here
ph.__set_data_page_header(dph);
} else if (data_page_version == 2) {
ph.__set_type(PageType::DATA_PAGE_V2);
dph2.__set_num_values(until - from);
dph2.__set_num_rows(until - from);
dph2.__set_encoding(encodings[idx]);
// these might be overwritten later if there are NAs
dph2.__set_num_nulls(0);
dph2.__set_definition_levels_byte_length(0);
dph2.__set_repetition_levels_byte_length(0);
// for version 2 we need to set the header after we
// set the remaining fields
} else {
throw runtime_error("Invalid data page version");
}
ph.__set_data_page_header(dph);

if (se.repetition_type == FieldRepetitionType::REQUIRED &&
encodings[idx] == Encoding::PLAIN &&
Expand All @@ -624,6 +648,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
);
ph.__set_uncompressed_page_size(data_size);
ph.__set_compressed_page_size(data_size);
if (data_page_version == 2) {
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
write_data_(pfile, idx, data_size, from, until);

Expand All @@ -646,6 +673,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,

// 3. write buf_com to file
ph.__set_compressed_page_size(cdata_size);
if (data_page_version == 2) {
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char *) buf_com.ptr, cdata_size);

Expand Down Expand Up @@ -674,6 +704,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
// 3. write buf_com to file
ph.__set_uncompressed_page_size(rle_size);
ph.__set_compressed_page_size(rle_size);
if (data_page_version == 2) {
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_com.ptr, rle_size);
cmd->__set_total_uncompressed_size(
Expand Down Expand Up @@ -710,6 +743,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
// 4. write buf_unc to file
ph.__set_uncompressed_page_size(rle_size);
ph.__set_compressed_page_size(crle_size);
if (data_page_version == 2) {
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_unc.ptr, crle_size);
cmd->__set_total_uncompressed_size(
Expand All @@ -734,10 +770,18 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
uint32_t data_size = calculate_column_data_size(
idx, num_present, from, until
);
ph.__set_uncompressed_page_size(data_size + rle_size + 4);
ph.__set_compressed_page_size(data_size + rle_size + 4);
int prep_length = data_page_version == 1 ? 4 : 0;
ph.__set_uncompressed_page_size(data_size + rle_size + prep_length);
ph.__set_compressed_page_size(data_size + rle_size + prep_length);
if (data_page_version == 2) {
dph2.__set_num_nulls(until - from - num_present);
dph2.__set_definition_levels_byte_length(rle_size);
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) &rle_size, 4);
if (data_page_version == 1) {
pfile.write((const char*) &rle_size, 4);
}
pfile.write((const char*) buf_com.ptr, rle_size);
cmd->__set_total_uncompressed_size(
cmd->total_uncompressed_size + rle_size + 4
Expand All @@ -762,9 +806,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
buf_unc,
until - from,
buf_com,
1, // bit_width
false, // add_bit_width
true // add_size
1, // bit_width
false, // add_bit_width
data_page_version == 1 // add_size
);

// 3. Append data to buf_com
Expand All @@ -778,11 +822,18 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
write_present_data_(*os1, idx, data_size, num_present, from, until);

// 4. compress buf_com to buf_unc
size_t comp_size = compress(cmd->codec, buf_com, rle_size + data_size, buf_unc);
// for data page v2, the def levels are not compressed!
uint32_t skip = data_page_version == 1 ? 0 : rle_size;
size_t comp_size = compress(cmd->codec, buf_com, rle_size + data_size, buf_unc, skip);

// 5. write buf_unc to file
ph.__set_uncompressed_page_size(rle_size + data_size);
ph.__set_compressed_page_size(comp_size);
if (data_page_version == 2) {
dph2.__set_num_nulls(until - from - num_present);
dph2.__set_definition_levels_byte_length(rle_size);
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_unc.ptr, comp_size);
cmd->__set_total_uncompressed_size(
Expand All @@ -805,9 +856,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
buf_unc,
until - from,
buf_com,
1, // bit_width
false, // add_bit_width
true // add_size
1, // bit_width
false, // add_bit_width
data_page_version == 1 // add_size
);

// 3. write dictionary indices to buf_unc
Expand All @@ -833,6 +884,11 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
// 5. write buf_com to file
ph.__set_uncompressed_page_size(rle2_size);
ph.__set_compressed_page_size(rle2_size);
if (data_page_version == 2) {
dph2.__set_num_nulls(until - from - num_present);
dph2.__set_definition_levels_byte_length(rle_size);
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_com.ptr, rle2_size);
cmd->__set_total_uncompressed_size(
Expand All @@ -855,9 +911,9 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
buf_unc,
until - from,
buf_com,
1, // bit_width
false, // add_bit_width
true // add_size
1, // bit_width
false, // add_bit_width
data_page_version == 1 // add_size
);

// 3. write dictionary indices to buf_unc
Expand All @@ -881,11 +937,17 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint64_t from,
);

// 5. compress buf_com to buf_unc
size_t crle2_size = compress(cmd->codec, buf_com, rle2_size, buf_unc);
uint32_t skip = data_page_version == 1 ? 0 : rle_size;
size_t crle2_size = compress(cmd->codec, buf_com, rle2_size, buf_unc, skip);

// 6. write buf_unc to file
ph.__set_uncompressed_page_size(rle2_size);
ph.__set_compressed_page_size(crle2_size);
if (data_page_version == 2) {
dph2.__set_num_nulls(until - from - num_present);
dph2.__set_definition_levels_byte_length(rle_size);
ph.__set_data_page_header_v2(dph2);
}
write_page_header(idx, ph);
pfile.write((const char*) buf_unc.ptr, crle2_size);
cmd->__set_total_uncompressed_size(
Expand Down
Binary file modified src/lib/ParquetOutFile.h
Binary file not shown.
18 changes: 18 additions & 0 deletions src/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2327,6 +2327,20 @@ void RParquetOutFile::write(

extern "C" {

// Look up an element of an R list by name.
//
// @param list An R list (VECSXP), typically named.
// @param str  NUL-terminated name to search for.
// @return The first element of `list` whose name equals `str`, or
//   R_NilValue if there is no such element or if `list` has no
//   character `names` attribute. The result is not protected here.
static SEXP get_list_element(SEXP list, const char *str) {
  SEXP elmt = R_NilValue;
  SEXP names = PROTECT(Rf_getAttrib(list, R_NamesSymbol));

  // An unnamed list has a NULL `names` attribute, and STRING_ELT() must
  // only be applied to character vectors, so bail out with R_NilValue.
  if (TYPEOF(names) == STRSXP) {
    const R_xlen_t len = Rf_xlength(list);
    for (R_xlen_t i = 0; i < len; i++) {
      if (strcmp(CHAR(STRING_ELT(names, i)), str) == 0) {
        elmt = VECTOR_ELT(list, i);
        break;
      }
    }
  }

  UNPROTECT(1);
  return elmt;
}

SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
SEXP metadata, SEXP required, SEXP options,
SEXP schema, SEXP encoding) {
Expand Down Expand Up @@ -2357,18 +2371,22 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
break;
}

int dp_ver = INTEGER(get_list_element(options, "write_data_page_version"))[0];

std::string fname = (char *)CHAR(STRING_ELT(filesxp, 0));
if (fname == ":raw:") {
MemStream ms;
std::ostream &os = ms.stream();
RParquetOutFile of(os, codec);
of.data_page_version = dp_ver;
of.write(dfsxp, dim, metadata, required, options, schema, encoding);
R_xlen_t bufsize = ms.size();
SEXP res = Rf_allocVector(RAWSXP, bufsize);
ms.copy(RAW(res), bufsize);
return res;
} else {
RParquetOutFile of(fname, codec);
of.data_page_version = dp_ver;
of.write(dfsxp, dim, metadata, required, options, schema, encoding);
return R_NilValue;
}
Expand Down
Loading
Loading