Skip to content

Commit

Permalink
Fix writing dict pages into multiple row groups
Browse files Browse the repository at this point in the history
Write local dicitonaries, expect for factors.
  • Loading branch information
gaborcsardi committed Sep 14, 2024
1 parent f81aea6 commit 4256242
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 54 deletions.
4 changes: 2 additions & 2 deletions R/porcelain.R
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ dict_encode <- function(x, n = length(x)) {
.Call(nanoparquet_create_dict, x, n)
}

dict_encode_idx <- function(x) {
.Call(nanoparquet_create_dict_idx, x, sys.call())
dict_encode_idx <- function(x, from = 1L, until = length(x) + 1L ) {
.Call(nanoparquet_create_dict_idx, x, from - 1L, until - 1L, sys.call())

Check warning on line 245 in R/porcelain.R

View check run for this annotation

Codecov / codecov/patch

R/porcelain.R#L245

Added line #L245 was not covered by tests
}

lgl_avg_run_length <- function(x, n = length(x)) {
Expand Down
36 changes: 26 additions & 10 deletions src/dictionary-encoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,28 @@ SEXP nanoparquet_create_dict(SEXP x, SEXP rlen) {
return Rf_ScalarInteger(dictlen);
}

SEXP nanoparquet_create_dict_idx_(SEXP x) {
R_xlen_t dictlen, len = Rf_xlength(x);
SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until) {
int64_t cfrom = INTEGER(from)[0];
int64_t cuntil = INTEGER(until)[0];
int64_t len = cuntil - cfrom;
R_xlen_t dictlen;

SEXP idx = PROTECT(Rf_allocVector(INTSXP, len));
SEXP dict = PROTECT(Rf_allocVector(INTSXP, len));
int *idict = INTEGER(dict);
int *iidx = INTEGER(idx);
switch (TYPEOF(x)) {
case LGLSXP:
dictlen = create_dict_idx<int>(LOGICAL(x), iidx, idict, len, NA_LOGICAL);
dictlen = create_dict_idx<int>(LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL);
break;
case INTSXP:
dictlen = create_dict_idx<int>(INTEGER(x), idict, iidx, len, NA_INTEGER);
dictlen = create_dict_idx<int>(INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER);
break;
case REALSXP:
dictlen = create_dict_real_idx(REAL(x), idict, iidx, len);
dictlen = create_dict_real_idx(REAL(x) + cfrom, idict, iidx, len);
break;
case STRSXP: {
dictlen = create_dict_ptr_idx((void**)STRING_PTR_RO(x),idict, iidx, len, (void*) NA_STRING);
dictlen = create_dict_ptr_idx((void**)(STRING_PTR_RO(x) + cfrom), idict, iidx, len, (void*) NA_STRING);
break;
}
default:
Expand All @@ -213,19 +216,32 @@ SEXP nanoparquet_create_dict_idx_(SEXP x) {
return res;
}

struct nanoparquet_create_dict_idx_data {
SEXP data;
SEXP from;
SEXP until;
};

inline SEXP nanoparquet_create_dict_idx_wrapper(void *data) {
SEXP x = (SEXP) data;
return nanoparquet_create_dict_idx_(x);
struct nanoparquet_create_dict_idx_data *rdata =

Check warning on line 226 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L226

Added line #L226 was not covered by tests
(struct nanoparquet_create_dict_idx_data*) data;
return nanoparquet_create_dict_idx_(

Check warning on line 228 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L228

Added line #L228 was not covered by tests
rdata->data,
rdata->from,
rdata->until
);

Check warning on line 232 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L232

Added line #L232 was not covered by tests
}

SEXP nanoparquet_create_dict_idx(SEXP x, SEXP call) {
SEXP nanoparquet_create_dict_idx(SEXP x, SEXP from, SEXP until, SEXP call) {

Check warning on line 235 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L235

Added line #L235 was not covered by tests

struct nanoparquet_create_dict_idx_data data = { x, from, until };

Check warning on line 237 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L237

Added line #L237 was not covered by tests

SEXP uwt = PROTECT(R_MakeUnwindCont());
R_API_START(call);

SEXP ret = R_UnwindProtect(
nanoparquet_create_dict_idx_wrapper,
(void*) x,
&data,
throw_error,
&uwt,
uwt
Expand Down
21 changes: 14 additions & 7 deletions src/lib/ParquetOutFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,14 @@ void ParquetOutFile::write_dictionary_indices_(
std::ostream &file,
uint32_t idx,
uint32_t size,
uint64_t from,
uint64_t until) {
int64_t rg_from,
int64_t rg_until,
uint64_t page_from,
uint64_t page_until) {

streampos start = file.tellp();
write_dictionary_indices(file, idx, from, until);
write_dictionary_indices(file, idx, rg_from, rg_until,
page_from, page_until);
streampos end = file.tellp();
if (end - start != size) {
throw runtime_error(
Expand Down Expand Up @@ -728,7 +731,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from,
buf_unc.reset(data_size);
std::unique_ptr<std::ostream> os0 =
std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
write_dictionary_indices_(*os0, idx, data_size, page_from, page_until);
write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until,
page_from, page_until);

// 2. RLE encode buf_unc to buf_com
uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
Expand Down Expand Up @@ -763,7 +767,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from,
buf_unc.reset(data_size);
std::unique_ptr<std::ostream> os0 =
std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
write_dictionary_indices_(*os0, idx, data_size, page_from, page_until);
write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until,
page_from, page_until);

// 2. RLE encode buf_unc to buf_com
uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
Expand Down Expand Up @@ -906,7 +911,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from,
buf_unc.reset(data_size);
std::unique_ptr<std::ostream> os0 =
std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
write_dictionary_indices_(*os0, idx, data_size, page_from, page_until);
write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until,
page_from, page_until);

// 4. append RLE buf_unc to buf_com
uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
Expand Down Expand Up @@ -961,7 +967,8 @@ void ParquetOutFile::write_data_page(uint32_t idx, int64_t rg_from,
buf_unc.reset(data_size);
std::unique_ptr<std::ostream> os0 =
std::unique_ptr<std::ostream>(new std::ostream(&buf_unc));
write_dictionary_indices_(*os0, idx, data_size, page_from, page_until);
write_dictionary_indices_(*os0, idx, data_size, rg_from, rg_until,
page_from, page_until);

// 4. append RLE buf_unc to buf_com
uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
Expand Down
9 changes: 6 additions & 3 deletions src/lib/ParquetOutFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ class ParquetOutFile {
int64_t until) = 0;
// Needs to write indices as int32_t
virtual void write_dictionary_indices(std::ostream &file, uint32_t idx,
uint64_t from, uint64_t until) = 0;
int64_t rg_from, int64_t rg_until,
uint64_t page_from,
uint64_t page_until) = 0;

int data_page_version = 1;

Expand Down Expand Up @@ -126,8 +128,9 @@ class ParquetOutFile {
parquet::SchemaElement &sel, int64_t from,
int64_t until);
void write_dictionary_indices_(std::ostream &file, uint32_t idx,
uint32_t size, uint64_t from,
uint64_t until);
uint32_t size, int64_t rg_from,
int64_t rg_ti, uint64_t page_from,
uint64_t page_until);

size_t compress(parquet::CompressionCodec::type codec,
ByteBuffer &src, uint32_t src_size, ByteBuffer &tgt,
Expand Down
4 changes: 2 additions & 2 deletions src/rwrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ SEXP nanoparquet_unpack_bits_int32(SEXP x, SEXP bit_width, SEXP n);
SEXP nanoparquet_pack_bits_int32(SEXP x, SEXP bit_width);

SEXP nanoparquet_create_dict(SEXP x, SEXP l);
SEXP nanoparquet_create_dict_idx(SEXP x, SEXP call);
SEXP nanoparquet_create_dict_idx(SEXP x, SEXP from, SEXP until, SEXP call);
SEXP nanoparquet_avg_run_length(SEXP x, SEXP len);

SEXP nanoparquet_base64_decode(SEXP x);
Expand Down Expand Up @@ -110,7 +110,7 @@ static const R_CallMethodDef R_CallDef[] = {
CALLDEF(nanoparquet_unpack_bits_int32, 3),
CALLDEF(nanoparquet_pack_bits_int32, 2),
CALLDEF(nanoparquet_create_dict, 2),
CALLDEF(nanoparquet_create_dict_idx, 2),
CALLDEF(nanoparquet_create_dict_idx, 4),
CALLDEF(nanoparquet_avg_run_length, 2),
CALLDEF(nanoparquet_base64_decode, 1),
CALLDEF(nanoparquet_base64_encode, 1),
Expand Down
70 changes: 40 additions & 30 deletions src/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ static uint16_t double_to_float16(double x) {

extern "C" {
SEXP nanoparquet_create_dict(SEXP x, SEXP rlen);
SEXP nanoparquet_create_dict_idx_(SEXP x);
SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until);
SEXP nanoparquet_avg_run_length(SEXP x, SEXP rlen);
}

Expand Down Expand Up @@ -143,7 +143,8 @@ class RParquetOutFile : public ParquetOutFile {
parquet::SchemaElement &sel, int64_t from,
int64_t until);
void write_dictionary_indices(std::ostream &file, uint32_t idx,
uint64_t from, uint64_t until);
int64_t rg_from, int64_t rg_until,
uint64_t page_from, uint64_t page_until);

void write(
SEXP dfsxp,
Expand All @@ -159,9 +160,10 @@ class RParquetOutFile : public ParquetOutFile {
SEXP df = R_NilValue;
SEXP required = R_NilValue;
SEXP dicts = R_NilValue;
SEXP dicts_from = R_NilValue;
ByteBuffer present;

void create_dictionary(uint32_t idx);
void create_dictionary(uint32_t idx, int64_t from, int64_t until);
// for LGLSXP this mean RLE encoding
bool should_use_dict_encoding(uint32_t idx);
parquet::Encoding::type
Expand All @@ -182,16 +184,20 @@ RParquetOutFile::RParquetOutFile(
ParquetOutFile(stream, codec, row_groups) {
}

void RParquetOutFile::create_dictionary(uint32_t idx) {
// olny do it once
if (!Rf_isNull(VECTOR_ELT(dicts, idx))) {
void RParquetOutFile::create_dictionary(uint32_t idx, int64_t from,
int64_t until) {
if (!Rf_isNull(VECTOR_ELT(dicts, idx)) &&
INTEGER(dicts_from)[idx] == from) {
return;
}

SEXP col = VECTOR_ELT(df, idx);
SEXP d = PROTECT(nanoparquet_create_dict_idx_(col));
SEXP sfrom = PROTECT(Rf_ScalarInteger(from));
SEXP suntil = PROTECT(Rf_ScalarInteger(until));
SEXP d = PROTECT(nanoparquet_create_dict_idx_(col, sfrom, suntil));
SET_VECTOR_ELT(dicts, idx, d);
UNPROTECT(1);
INTEGER(dicts_from)[idx] = from;
UNPROTECT(3);
}

static const char *enc_[] = {
Expand Down Expand Up @@ -1547,7 +1553,7 @@ uint32_t RParquetOutFile::get_num_values_dictionary(
if (Rf_inherits(col, "factor")) {
return Rf_nlevels(col);
} else {
create_dictionary(idx);
create_dictionary(idx, from, until);
return Rf_length(VECTOR_ELT(VECTOR_ELT(dicts, idx), 0));
}
}
Expand All @@ -1574,7 +1580,7 @@ uint32_t RParquetOutFile::get_size_dictionary(
UNPROTECT(1);
return size;
} else {
create_dictionary(idx);
create_dictionary(idx, from, until);
SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0);
if (type == parquet::Type::INT32) {
return Rf_xlength(dictidx) * sizeof(int);
Expand All @@ -1593,7 +1599,7 @@ uint32_t RParquetOutFile::get_size_dictionary(
break;
}
case REALSXP: {
create_dictionary(idx);
create_dictionary(idx, from, until);
SEXP dict = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0);
if (type == parquet::Type::DOUBLE) {
return Rf_xlength(dict) * sizeof(double);
Expand All @@ -1618,7 +1624,7 @@ uint32_t RParquetOutFile::get_size_dictionary(
}
case STRSXP: {
// need to count the length of the stings that are indexed in dict
create_dictionary(idx);
create_dictionary(idx, from, until);
SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0);
R_xlen_t len = Rf_xlength(dictidx);
bool is_uuid = sel.__isset.logicalType && sel.logicalType.__isset.UUID;
Expand All @@ -1639,7 +1645,7 @@ uint32_t RParquetOutFile::get_size_dictionary(
}
case LGLSXP: {
// this does not happen, no dictionaries for BOOLEAN, makes no sense
create_dictionary(idx); // # nocov
create_dictionary(idx, from, until); // # nocov
SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); // # nocov
R_xlen_t l = Rf_xlength(dictidx); // # nocov
return l / 8 + (l % 8 > 0); // # nocov
Expand Down Expand Up @@ -1684,7 +1690,7 @@ void RParquetOutFile::write_dictionary(
} else {
SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0);
R_xlen_t len = Rf_xlength(dictidx);
int *icol = INTEGER(col);
int *icol = INTEGER(col) + from;
int *iidx = INTEGER(dictidx);
switch (type) {
case parquet::Type::INT32: {
Expand Down Expand Up @@ -1786,7 +1792,7 @@ void RParquetOutFile::write_dictionary(
case REALSXP: {
SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0);
R_xlen_t len = Rf_xlength(dictidx);
double *icol = REAL(col);
double *icol = REAL(col) + from;
int *iidx = INTEGER(dictidx);
if (Rf_inherits(col, "POSIXct")) {
if (type != parquet::Type::INT64) {
Expand Down Expand Up @@ -2049,7 +2055,7 @@ void RParquetOutFile::write_dictionary(
R_xlen_t len = Rf_xlength(dictidx);
int *iidx = INTEGER(dictidx);
for (uint64_t i = 0; i < len; i++) {
const char *c = CHAR(STRING_ELT(col, iidx[i]));
const char *c = CHAR(STRING_ELT(col, from + iidx[i]));
uint32_t len1 = strlen(c);
file.write((const char *)&len1, 4);
file.write(c, len1);
Expand All @@ -2064,7 +2070,7 @@ void RParquetOutFile::write_dictionary(
R_xlen_t len = Rf_xlength(dictidx);
int *iidx = INTEGER(dictidx);
for (uint64_t i = 0; i < len; i++) {
const char *c = CHAR(STRING_ELT(col, iidx[i]));
const char *c = CHAR(STRING_ELT(col, from + iidx[i]));
if (!parse_uuid(c, u, tmp)) {
Rf_errorcall(
nanoparquet_call,
Expand All @@ -2078,7 +2084,7 @@ void RParquetOutFile::write_dictionary(
R_xlen_t len = Rf_xlength(dictidx);
int *iidx = INTEGER(dictidx);
for (uint64_t i = 0; i < len; i++) {
const char *c = CHAR(STRING_ELT(col, iidx[i]));
const char *c = CHAR(STRING_ELT(col, from + iidx[i]));
uint32_t len1 = strlen(c);
if (len1 != sel.type_length) {
Rf_errorcall(
Expand Down Expand Up @@ -2113,7 +2119,7 @@ void RParquetOutFile::write_dictionary(
SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0);
R_xlen_t len = Rf_xlength(dictidx);
SEXP dict = PROTECT(Rf_allocVector(LGLSXP, len));
int *icol = LOGICAL(col);
int *icol = LOGICAL(col) + from;
int *iidx = INTEGER(dictidx);
int *idict = LOGICAL(dict);
for (auto i = 0; i < len; i++) {
Expand All @@ -2131,18 +2137,18 @@ void RParquetOutFile::write_dictionary(
void RParquetOutFile::write_dictionary_indices(
std::ostream &file,
uint32_t idx,
uint64_t from,
uint64_t until) {
int64_t rg_from,
int64_t rg_until,
uint64_t page_from,
uint64_t page_until) {

// Both all of rg_* and page_* are in absolute coordinates

SEXP col = VECTOR_ELT(df, idx);
if (until > Rf_xlength(col)) {
Rf_errorcall( // # nocov
nanoparquet_call, // # nocov
"Internal nanoparquet error, row index too large"
);
}
if (TYPEOF(col) == INTSXP && Rf_inherits(col, "factor")) {
for (uint64_t i = from; i < until; i++) {
// there is a single dict for a factor, which is used in all row
// groups, so we use absolute coordinates here
for (uint64_t i = page_from; i < page_until; i++) {
int el = INTEGER(col)[i];
if (el != NA_INTEGER) {
el--;
Expand All @@ -2151,7 +2157,10 @@ void RParquetOutFile::write_dictionary_indices(
}
} else {
SEXP dictmap = VECTOR_ELT(VECTOR_ELT(dicts, idx), 1);
for (uint64_t i = from; i < until; i++) {
// there is a separate dict for each row group, so we need to convert
// the absolute page_* coordinates to relative coordinates, starting
// at rg_from
for (uint64_t i = page_from - rg_from; i < page_until - rg_from; i++) {
int el = INTEGER(dictmap)[i];
if (el != NA_INTEGER) {
file.write((const char *) &el, sizeof(int));
Expand Down Expand Up @@ -2392,6 +2401,7 @@ void RParquetOutFile::write(
df = dfsxp;
required = rrequired;
dicts = PROTECT(Rf_allocVector(VECSXP, Rf_length(df)));
dicts_from = PROTECT(Rf_allocVector(INTSXP, Rf_length(df)));
SEXP nms = PROTECT(Rf_getAttrib(dfsxp, R_NamesSymbol));
int *type = INTEGER(VECTOR_ELT(schema, 3));
int *type_length = INTEGER(VECTOR_ELT(schema, 4));
Expand Down Expand Up @@ -2458,7 +2468,7 @@ void RParquetOutFile::write(

ParquetOutFile::write();

UNPROTECT(2);
UNPROTECT(3);
}

extern "C" {
Expand Down
Loading

0 comments on commit 4256242

Please sign in to comment.