Skip to content

Commit

Permalink
Fixes for writing min/max for integers
Browse files Browse the repository at this point in the history
- fix double -> int32 conversion
- fixes for NA values
  • Loading branch information
gaborcsardi committed Sep 21, 2024
1 parent d46c91c commit 6262342
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 74 deletions.
2 changes: 2 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
`%||%` <- function(l, r) if (is.null(l)) r else l

`%&&%` <- function(l, r) if (is.null(l)) NULL else r

is_rcmd_check <- function() {
if (identical(Sys.getenv("NOT_CRAN"), "true")) {
FALSE
Expand Down
58 changes: 44 additions & 14 deletions src/dictionary-encoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,30 @@ uint64_t create_dict_ptr_idx(void** values, int *dict, int *idx,
return n;
}

uint64_t create_dict_real_idx(double* values, int *dict, int *idx, uint64_t len) {
uint64_t create_dict_real_idx(double* values, int *dict, int *idx,
uint64_t len, double &minval,
double &maxval, bool &hasminmax) {
std::unordered_map<double, int> mm;
mm.reserve(len * 2);
double *begin = values;
double *end = begin + len;
int n = 0;

hasminmax = false;

for (int i = 0; begin < end; begin++, i++) {
if (R_IsNA(*begin)) {
idx[i] = NA_INTEGER;
continue;
}
if (!hasminmax) {
hasminmax = true;
minval = maxval = *begin;
}
auto it = mm.find(*begin);
if (it == mm.end()) {
if (*begin < minval) minval = *begin;
if (*begin > maxval) maxval = *begin;
mm.insert(std::make_pair(*begin, n));
idx[i] = n;
dict[n] = i;
Expand All @@ -123,23 +133,24 @@ uint64_t create_dict_real_idx(double* values, int *dict, int *idx, uint64_t len)

template <typename T>
uint64_t create_dict_idx(T* values, int *dict, int *idx, uint64_t len,
T naval, T &minval, T &maxval) {
T naval, T &minval, T &maxval, bool &hasminmax) {
std::unordered_map<T, int> mm;
mm.reserve(len * 2);
T *begin = values;
T *end = begin + len;
int n = 0;

if (begin < end) {
minval = *begin;
maxval = *begin;
}
hasminmax = false;

for (int i = 0; begin < end; begin++, i++) {
if (*begin == naval) {
idx[i] = NA_INTEGER;
continue;
}
if (!hasminmax) {
hasminmax = true;
minval = maxval = *begin;
}
auto it = mm.find(*begin);
if (it == mm.end()) {
if (*begin < minval) minval = *begin;
Expand Down Expand Up @@ -194,31 +205,50 @@ SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until) {
int *idict = INTEGER(dict);
int *iidx = INTEGER(idx);
int imin, imax;
double dmin, dmax;
bool hasminmax = false;
switch (TYPEOF(x)) {
case LGLSXP:
dictlen = create_dict_idx<int>(LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL, imin, imax);
dictlen = create_dict_idx<int>(
LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL,
imin, imax, hasminmax
);
break;
case INTSXP:
dictlen = create_dict_idx<int>(INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER, imin, imax);
dictlen = create_dict_idx<int>(
INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER,
imin, imax, hasminmax
);
break;
case REALSXP:
dictlen = create_dict_real_idx(REAL(x) + cfrom, idict, iidx, len);
dictlen = create_dict_real_idx(
REAL(x) + cfrom, idict, iidx, len,
dmin, dmax, hasminmax
);
break;
case STRSXP: {
dictlen = create_dict_ptr_idx((void**)(STRING_PTR_RO(x) + cfrom), idict, iidx, len, (void*) NA_STRING);
dictlen = create_dict_ptr_idx(
(void**)(STRING_PTR_RO(x) + cfrom), idict, iidx, len,
(void*) NA_STRING
);
break;
}
default:
Rf_error("Cannot create dictionary for this type");
break;
}

SEXP res = PROTECT(Rf_allocVector(VECSXP, TYPEOF(x) == INTSXP ? 4 : 2));
SEXP res = PROTECT(Rf_allocVector(VECSXP, hasminmax ? 4 : 2));
SET_VECTOR_ELT(res, 0, dict);
SET_VECTOR_ELT(res, 1, idx);
if (TYPEOF(x) == INTSXP) {
SET_VECTOR_ELT(res, 2, Rf_ScalarInteger(imin));
SET_VECTOR_ELT(res, 3, Rf_ScalarInteger(imax));
if (hasminmax) {
if (TYPEOF(x) == INTSXP) {
SET_VECTOR_ELT(res, 2, Rf_ScalarInteger(imin));
SET_VECTOR_ELT(res, 3, Rf_ScalarInteger(imax));
} else if (TYPEOF(x) == REALSXP) {
SET_VECTOR_ELT(res, 2, Rf_ScalarReal(dmin));
SET_VECTOR_ELT(res, 3, Rf_ScalarReal(dmax));
}
}

if (dictlen < len) {
Expand Down
12 changes: 6 additions & 6 deletions src/lib/ParquetOutFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ void ParquetOutFile::write_dictionary_page(uint32_t idx, int64_t from,
// Uncompresed size of the dictionary in bytes
uint32_t dict_size = get_size_dictionary(idx, se, from, until);
// Number of entries in the dicitonary
uint32_t num_dict_values = get_num_values_dictionary(idx, from, until);
uint32_t num_dict_values = get_num_values_dictionary(idx, se, from, until);

// Init page header
PageHeader ph;
Expand Down Expand Up @@ -650,7 +650,7 @@ void ParquetOutFile::write_data_pages(uint32_t idx, uint32_t group,
total_size = calculate_column_data_size(idx, rg_num_rows, from, until);
} else {
// estimate the max RLE length
uint32_t num_values = get_num_values_dictionary(idx, from, until);
uint32_t num_values = get_num_values_dictionary(idx, se, from, until);
uint8_t bit_width = ceil(log2((double) num_values));
total_size = MaxRleBpSizeSimple(rg_num_rows, bit_width);
}
Expand Down Expand Up @@ -778,7 +778,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint32_t group,
page_from, page_until);

// 2. RLE encode buf_unc to buf_com
uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
uint32_t num_dict_values = get_num_values_dictionary(idx, se, rg_from, rg_until);
uint8_t bit_width = ceil(log2((double) num_dict_values));
uint32_t rle_size = rle_encode(
buf_unc,
Expand Down Expand Up @@ -814,7 +814,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint32_t group,
page_from, page_until);

// 2. RLE encode buf_unc to buf_com
uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
uint32_t num_dict_values = get_num_values_dictionary(idx, se, rg_from, rg_until);
uint8_t bit_width = ceil(log2((double) num_dict_values));
uint32_t rle_size = rle_encode(
buf_unc,
Expand Down Expand Up @@ -962,7 +962,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint32_t group,
page_from, page_until);

// 4. append RLE buf_unc to buf_com
uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
uint32_t num_dict_values = get_num_values_dictionary(idx, se, rg_from, rg_until);
uint8_t bit_width = ceil(log2((double) num_dict_values));
uint32_t rle2_size = rle_encode(
buf_unc,
Expand Down Expand Up @@ -1019,7 +1019,7 @@ void ParquetOutFile::write_data_page(uint32_t idx, uint32_t group,
page_from, page_until);

// 4. append RLE buf_unc to buf_com
uint32_t num_dict_values = get_num_values_dictionary(idx, rg_from, rg_until);
uint32_t num_dict_values = get_num_values_dictionary(idx, se, rg_from, rg_until);
uint8_t bit_width = ceil(log2((double) num_dict_values));
uint32_t rle2_size = rle_encode(
buf_unc,
Expand Down
4 changes: 3 additions & 1 deletion src/lib/ParquetOutFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ class ParquetOutFile {
virtual uint32_t get_size_byte_array(uint32_t idx,
uint32_t num_present,
uint64_t from, uint64_t until) = 0;
virtual uint32_t get_num_values_dictionary(uint32_t idx, int64_t from,
virtual uint32_t get_num_values_dictionary(uint32_t idx,
parquet::SchemaElement &sel,
int64_t from,
int64_t until) = 0;
virtual uint32_t get_size_dictionary(uint32_t idx,
parquet::SchemaElement &sel,
Expand Down
80 changes: 64 additions & 16 deletions src/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ class RParquetOutFile : public ParquetOutFile {
uint64_t until);

// for dictionaries
uint32_t get_num_values_dictionary(uint32_t idx, int64_t form,
int64_t until);
uint32_t get_num_values_dictionary(uint32_t idx,
parquet::SchemaElement &sel,
int64_t form, int64_t until);
uint32_t get_size_dictionary(uint32_t idx, parquet::SchemaElement &type,
int64_t from, int64_t until);
void write_dictionary(std::ostream &file, uint32_t idx,
Expand Down Expand Up @@ -185,7 +186,8 @@ class RParquetOutFile : public ParquetOutFile {
std::vector<std::string> max_values;
std::vector<bool> has_minmax_value;

void create_dictionary(uint32_t idx, int64_t from, int64_t until);
void create_dictionary(uint32_t idx, int64_t from, int64_t until,
parquet::SchemaElement &sel);
// for LGLSXP this mean RLE encoding
bool should_use_dict_encoding(uint32_t idx);
parquet::Encoding::type
Expand All @@ -197,6 +199,9 @@ class RParquetOutFile : public ParquetOutFile {
void write_double_int32_time(std::ostream &file, SEXP col, uint32_t idx,
uint64_t from, uint64_t until,
parquet::SchemaElement &sel, double factor);
void write_double_int32(std::ostream &file, SEXP col, uint32_t idx,
uint64_t from, uint64_t until,
parquet::SchemaElement &sel);
};

RParquetOutFile::RParquetOutFile(
Expand All @@ -216,7 +221,8 @@ RParquetOutFile::RParquetOutFile(
}

void RParquetOutFile::create_dictionary(uint32_t idx, int64_t from,
int64_t until) {
int64_t until,
parquet::SchemaElement &sel) {
if (!Rf_isNull(VECTOR_ELT(dicts, idx)) &&
INTEGER(dicts_from)[idx] == from) {
return;
Expand All @@ -233,8 +239,20 @@ void RParquetOutFile::create_dictionary(uint32_t idx, int64_t from,
is_minmax_supported[idx] && Rf_xlength(col) > 0 &&
!Rf_isNull(VECTOR_ELT(d, 2)) && !Rf_isNull(VECTOR_ELT(d, 3))) {
has_minmax_value[idx] = true;
min_values[idx] = std::string((const char*) INTEGER(VECTOR_ELT(d, 2)), sizeof(int32_t));
max_values[idx] = std::string((const char*) INTEGER(VECTOR_ELT(d, 3)), sizeof(int32_t));
if (TYPEOF(VECTOR_ELT(d, 2)) == INTSXP) {
min_values[idx] = std::string((const char*) INTEGER(VECTOR_ELT(d, 2)), sizeof(int32_t));
max_values[idx] = std::string((const char*) INTEGER(VECTOR_ELT(d, 3)), sizeof(int32_t));
} else if (TYPEOF(VECTOR_ELT(d, 2)) == REALSXP) {
if (sel.type == parquet::Type::INT32) {
int32_t min = REAL(VECTOR_ELT(d, 2))[0];
int32_t max = REAL(VECTOR_ELT(d, 3))[0];
min_values[idx] = std::string((const char*) &min, sizeof(int32_t));
max_values[idx] = std::string((const char*) &max, sizeof(int32_t));
} else if (sel.type == parquet::Type::DOUBLE) {
min_values[idx] = std::string((const char*) REAL(VECTOR_ELT(d, 2)), sizeof(double));
max_values[idx] = std::string((const char*) REAL(VECTOR_ELT(d, 3)), sizeof(double));

Check warning on line 253 in src/write.cpp

View check run for this annotation

Codecov / codecov/patch

src/write.cpp#L252-L253

Added lines #L252 - L253 were not covered by tests
}
}
}
}

Expand Down Expand Up @@ -761,16 +779,24 @@ void RParquetOutFile::write_double_int32_time(std::ostream &file, SEXP col,
has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0;
}

void write_double_int32(std::ostream &file, SEXP col, uint32_t idx,
uint64_t from, uint64_t until,
parquet::SchemaElement &sel) {
void RParquetOutFile::write_double_int32(std::ostream &file, SEXP col,
uint32_t idx, uint64_t from,
uint64_t until,
parquet::SchemaElement &sel) {
bool is_signed = TRUE;
int bit_width = 32;
if (sel.__isset.logicalType && sel.logicalType.__isset.INTEGER) {
is_signed = sel.logicalType.INTEGER.isSigned;
bit_width = sel.logicalType.INTEGER.bitWidth;
}
if (is_signed) {
int32_t *min_value = 0, *max_value = 0;
bool minmax = write_minmax_values && is_minmax_supported[idx];
if (minmax && has_minmax_value[idx]) {
min_value = GRAB_MIN(idx, int32_t);
max_value = GRAB_MAX(idx, int32_t);

Check warning on line 797 in src/write.cpp

View check run for this annotation

Codecov / codecov/patch

src/write.cpp#L796-L797

Added lines #L796 - L797 were not covered by tests
}

int32_t min, max;
switch (bit_width) {
case 8:
Expand Down Expand Up @@ -799,9 +825,23 @@ void write_double_int32(std::ostream &file, SEXP col, uint32_t idx,
);
}
int32_t ival = val;
if (minmax && (min_value == 0 || ival < *min_value)) {
SAVE_MIN(idx, ival, int32_t);
}
if (minmax && (max_value == 0 || ival > *max_value)) {
SAVE_MAX(idx, ival, int32_t);
}
file.write((const char *)&ival, sizeof(int32_t));
}
has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0;
} else {
uint32_t *min_value = 0, *max_value = 0;
bool minmax = write_minmax_values && is_minmax_supported[idx];
if (minmax && has_minmax_value[idx]) {
min_value = GRAB_MIN(idx, uint32_t);
max_value = GRAB_MAX(idx, uint32_t);

Check warning on line 842 in src/write.cpp

View check run for this annotation

Codecov / codecov/patch

src/write.cpp#L841-L842

Added lines #L841 - L842 were not covered by tests
}

uint32_t max;
switch (bit_width) {
case 8:
Expand Down Expand Up @@ -838,9 +878,16 @@ void write_double_int32(std::ostream &file, SEXP col, uint32_t idx,
val, idx + 1, i + 1
);
}
int32_t ival = val;
file.write((const char *)&ival, sizeof(int32_t));
uint32_t uival = val;
if (minmax && (min_value == 0 || uival < *min_value)) {
SAVE_MIN(idx, uival, uint32_t);
}
if (minmax && (max_value == 0 || uival > *max_value)) {
SAVE_MAX(idx, uival, uint32_t);
}
file.write((const char *)&uival, sizeof(uint32_t));
}
has_minmax_value[idx] = has_minmax_value[idx] || min_value != 0;
}
}

Expand Down Expand Up @@ -1642,13 +1689,14 @@ void RParquetOutFile::write_present_boolean(

uint32_t RParquetOutFile::get_num_values_dictionary(
uint32_t idx,
parquet::SchemaElement &sel,
int64_t from,
int64_t until) {
SEXP col = VECTOR_ELT(df, idx);
if (Rf_inherits(col, "factor")) {
return Rf_nlevels(col);
} else {
create_dictionary(idx, from, until);
create_dictionary(idx, from, until, sel);
return Rf_length(VECTOR_ELT(VECTOR_ELT(dicts, idx), 0));
}
}
Expand All @@ -1675,7 +1723,7 @@ uint32_t RParquetOutFile::get_size_dictionary(
UNPROTECT(1);
return size;
} else {
create_dictionary(idx, from, until);
create_dictionary(idx, from, until, sel);
SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0);
if (type == parquet::Type::INT32) {
return Rf_xlength(dictidx) * sizeof(int);
Expand All @@ -1694,7 +1742,7 @@ uint32_t RParquetOutFile::get_size_dictionary(
break;
}
case REALSXP: {
create_dictionary(idx, from, until);
create_dictionary(idx, from, until, sel);
SEXP dict = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0);
if (type == parquet::Type::DOUBLE) {
return Rf_xlength(dict) * sizeof(double);
Expand All @@ -1719,7 +1767,7 @@ uint32_t RParquetOutFile::get_size_dictionary(
}
case STRSXP: {
// need to count the length of the stings that are indexed in dict
create_dictionary(idx, from, until);
create_dictionary(idx, from, until, sel);
SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0);
R_xlen_t len = Rf_xlength(dictidx);
bool is_uuid = sel.__isset.logicalType && sel.logicalType.__isset.UUID;
Expand All @@ -1740,7 +1788,7 @@ uint32_t RParquetOutFile::get_size_dictionary(
}
case LGLSXP: {
// this does not happen, no dictionaries for BOOLEAN, makes no sense
create_dictionary(idx, from, until); // # nocov
create_dictionary(idx, from, until, sel); // # nocov
SEXP dictidx = VECTOR_ELT(VECTOR_ELT(dicts, idx), 0); // # nocov
R_xlen_t l = Rf_xlength(dictidx); // # nocov
return l / 8 + (l % 8 > 0); // # nocov
Expand Down
Loading

0 comments on commit 6262342

Please sign in to comment.