Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write multiple row groups #88

Merged
merged 10 commits into from
Sep 14, 2024
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
`write_parquet()`, to specify how the columns of a data frame should
be mapped to Parquet types.

- `write_parquet()` can now write multiple row groups. By default it puts
at most 10 million rows into a single row group. You can choose the
row groups manually with the `row_groups` argument.

- Newly supported type conversions in `write_parquet()` via the
schema argument:

Expand Down
9 changes: 9 additions & 0 deletions R/options.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#' in [read_parquet()]. By default nanoparquet adds the `"tbl"` class,
#' so data frames are printed differently if the pillar package is
#' loaded.
#' @param num_rows_per_row_group The number of rows to put into a row
#' group, if row groups are not specified explicitly. It should be
#' an integer scalar. Defaults to 10 million.
#' @param use_arrow_metadata `TRUE` or `FALSE`. If `TRUE`, then
#' [read_parquet()] and [read_parquet_schema()] will make use of the Apache
#' Arrow metadata to assign R classes to Parquet columns.
Expand Down Expand Up @@ -37,6 +40,7 @@

parquet_options <- function(
class = getOption("nanoparquet.class", "tbl"),
num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L),
use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE),
write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE),
write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L)
Expand All @@ -50,9 +54,14 @@ parquet_options <- function(
identical(write_data_page_version, 1L) ||
identical(write_data_page_version, 2L)
)
num_rows_per_row_group <- as_count(
num_rows_per_row_group,
"num_rows_per_row_group"
)

list(
class = class,
num_rows_per_row_group = num_rows_per_row_group,
use_arrow_metadata = use_arrow_metadata,
write_arrow_metadata = write_arrow_metadata,
write_data_page_version = as.integer(write_data_page_version)
Expand Down
4 changes: 2 additions & 2 deletions R/porcelain.R
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@
.Call(nanoparquet_create_dict, x, n)
}

dict_encode_idx <- function(x) {
.Call(nanoparquet_create_dict_idx, x, sys.call())
dict_encode_idx <- function(x, from = 1L, until = length(x) + 1L ) {
.Call(nanoparquet_create_dict_idx, x, from - 1L, until - 1L, sys.call())

Check warning on line 245 in R/porcelain.R

View check run for this annotation

Codecov / codecov/patch

R/porcelain.R#L245

Added line #L245 was not covered by tests
}

lgl_avg_run_length <- function(x, n = length(x)) {
Expand Down
15 changes: 15 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ is_string <- function(x) {
is.character(x) && length(x) == 1 && !is.na(x)
}

# Is `x` a count stored as an integer, i.e. a single, non-missing
# integer value that is at least 1?
is_icount <- function(x) {
  # Anything that is not a length-one integer vector is rejected up front.
  if (!is.integer(x) || length(x) != 1) {
    return(FALSE)
  }
  !is.na(x) && x >= 1L
}

# Is `x` a count stored as a double, i.e. a single, non-missing,
# whole-number double that is at least 1 and can be represented as an
# R integer?
#
# Always returns `TRUE` or `FALSE`. The range check comes before the
# whole-number check so that values that do not fit into an integer
# (including `Inf`) are rejected cleanly, instead of going through
# `as.integer()`, which would produce `NA` and a coercion warning.
is_dcount <- function(x) {
  is.double(x) && length(x) == 1 && !is.na(x) &&
    x >= 1 && x <= .Machine$integer.max && x == trunc(x)
}

# Coerce `x` to an integer count, or fail.
#
# `x` is returned unchanged if it already is an integer count, and it is
# converted with `as.integer()` if it is a double count. Anything else
# is an error; `name` is used in the error message to refer to the
# offending argument.
as_count <- function(x, name = "x") {
  if (is_icount(x)) {
    x
  } else if (is_dcount(x)) {
    as.integer(x)
  } else {
    stop(name, " must be a count, i.e. an integer scalar")
  }
}

is_uint32 <- function(x) {
is.numeric(x) && length(x) == 1 && !is.na(x) &&
round(x) == x && x >= 0 && x <= 4294967295
Expand Down
44 changes: 44 additions & 0 deletions R/write-parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
#' @param metadata Additional key-value metadata to add to the file.
#' This must be a named character vector, or a data frame with
#' character columns called `key` and `value`.
#' @param row_groups Row groups of the Parquet file. If `NULL`, and `x`
#' is a grouped data frame, then the groups are used as row groups.
#' The rows will be reordered to match groups. If `NULL`, and `x` is
#' not a grouped data frame, then the `num_rows_per_row_group` option is
#' used from the `options` argument, see [parquet_options()]. Otherwise
#' it must be an integer vector, specifying the starts of the row groups.
#' @param options Nanoparquet options, see [parquet_options()].
#' @return `NULL`, unless `file` is `":raw:"`, in which case the Parquet
#' file is returned as a raw vector.
Expand All @@ -62,6 +68,7 @@ write_parquet <- function(
compression = c("snappy", "gzip", "zstd", "uncompressed"),
encoding = NULL,
metadata = NULL,
row_groups = NULL,
options = parquet_options()) {

file <- path.expand(file)
Expand Down Expand Up @@ -148,6 +155,14 @@ write_parquet <- function(

encoding <- parse_encoding(encoding, x)

row_groups <- row_groups %||%
attr(x, "groups") %||%
default_row_groups(x, schema, compression, encoding, options)

row_group_starts <- parse_row_groups(x, row_groups)
x <- row_group_starts[[1]]
row_group_starts <- row_group_starts[[2]]

res <- .Call(
nanoparquet_write,
x,
Expand All @@ -159,6 +174,7 @@ res <- .Call(
options,
schema,
encodings[encoding],
row_group_starts,
sys.call()
)

Expand Down Expand Up @@ -201,3 +217,31 @@ parse_encoding <- function(encoding, x) {
structure(encoding, names = names(x))
}
}

# we should refine this later
#
# Default row group starts: a new row group begins at every
# `num_rows_per_row_group`-th row of `x`, starting at row 1. `schema`,
# `compression` and `encoding` are currently unused.
#
# Returns the (1-based) start rows of the row groups.
default_row_groups <- function(x, schema, compression, encoding, options) {
  n <- options[["num_rows_per_row_group"]]
  nr <- nrow(x)
  if (nr == 0L) {
    # `seq(1L, 0L, by = n)` would fail with "wrong sign in 'by' argument".
    # A single row group starting at row 1 is the only value that
    # satisfies the checks in parse_row_groups().
    # NOTE(review): assumes the writer accepts one (empty) row group for
    # a zero-row data frame -- confirm against the C++ side.
    return(1L)
  }
  seq(1L, nr, by = n)
}

# Validate and normalize the row group specification.
#
# `rg` is either
# * a data frame with a `.rows` list column, each element holding the
#   row indices of one group (this is what write_parquet() finds in the
#   `groups` attribute of `x`), or
# * an integer vector of (1-based) row group start rows.
#
# Returns a list with
# * `x`: the data frame, reordered if the groups are not already
#   stored in consecutive rows,
# * `row_groups`: integer vector of the start rows of the row groups.
parse_row_groups <- function(x, rg) {
  if (is.data.frame(rg)) {
    rows <- rg[[".rows"]]
    idx <- unlist(rows)
    if (any(diff(idx) != 1)) {
      message("Ordering data frame according to row groups.")
      # drop = FALSE keeps a single-column base data frame a data frame
      x <- x[idx, , drop = FALSE]
    }
    # cumsum() of the group sizes is the *last* row of each group, so
    # the next group starts one row later. The final element would be
    # the start of a group after the last one, so it is dropped.
    rg <- c(1L, cumsum(lengths(rows)) + 1L)
    rg <- rg[-length(rg)]
  } else {
    # The length check must come before `rg[1]`, otherwise an empty
    # vector makes the condition NA and `if` fails with a cryptic error.
    valid <- is.integer(rg) && length(rg) > 0L && !anyNA(rg) &&
      all(rg > 0L) && all(diff(rg) > 0L) && rg[1] == 1L
    if (!valid) {
      stop(
        "Row groups must be specified as a growing positive integer ",
        "vector, starting with 1."
      )
    }
  }
  list(x = x, row_groups = rg)
}
5 changes: 5 additions & 0 deletions man/parquet_options.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions man/write_parquet.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 26 additions & 10 deletions src/dictionary-encoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,28 @@
return Rf_ScalarInteger(dictlen);
}

SEXP nanoparquet_create_dict_idx_(SEXP x) {
R_xlen_t dictlen, len = Rf_xlength(x);
SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until) {
int64_t cfrom = INTEGER(from)[0];
int64_t cuntil = INTEGER(until)[0];
int64_t len = cuntil - cfrom;
R_xlen_t dictlen;

SEXP idx = PROTECT(Rf_allocVector(INTSXP, len));
SEXP dict = PROTECT(Rf_allocVector(INTSXP, len));
int *idict = INTEGER(dict);
int *iidx = INTEGER(idx);
switch (TYPEOF(x)) {
case LGLSXP:
dictlen = create_dict_idx<int>(LOGICAL(x), iidx, idict, len, NA_LOGICAL);
dictlen = create_dict_idx<int>(LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL);
break;
case INTSXP:
dictlen = create_dict_idx<int>(INTEGER(x), idict, iidx, len, NA_INTEGER);
dictlen = create_dict_idx<int>(INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER);
break;
case REALSXP:
dictlen = create_dict_real_idx(REAL(x), idict, iidx, len);
dictlen = create_dict_real_idx(REAL(x) + cfrom, idict, iidx, len);
break;
case STRSXP: {
dictlen = create_dict_ptr_idx((void**)STRING_PTR_RO(x),idict, iidx, len, (void*) NA_STRING);
dictlen = create_dict_ptr_idx((void**)(STRING_PTR_RO(x) + cfrom), idict, iidx, len, (void*) NA_STRING);
break;
}
default:
Expand All @@ -213,19 +216,32 @@
return res;
}

// Argument bundle for nanoparquet_create_dict_idx_wrapper(). The wrapper
// receives a single `void*`, so the three arguments of
// nanoparquet_create_dict_idx_() are packed into this struct.
struct nanoparquet_create_dict_idx_data {
// the R vector to dictionary-encode
SEXP data;
// integer scalar, zero-based index of the first element to process
SEXP from;
// integer scalar, zero-based end of the range (exclusive): the number
// of processed elements is `until - from`
SEXP until;
};

inline SEXP nanoparquet_create_dict_idx_wrapper(void *data) {
SEXP x = (SEXP) data;
return nanoparquet_create_dict_idx_(x);
struct nanoparquet_create_dict_idx_data *rdata =

Check warning on line 226 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L226

Added line #L226 was not covered by tests
(struct nanoparquet_create_dict_idx_data*) data;
return nanoparquet_create_dict_idx_(

Check warning on line 228 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L228

Added line #L228 was not covered by tests
rdata->data,
rdata->from,
rdata->until
);

Check warning on line 232 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L232

Added line #L232 was not covered by tests
}

SEXP nanoparquet_create_dict_idx(SEXP x, SEXP call) {
SEXP nanoparquet_create_dict_idx(SEXP x, SEXP from, SEXP until, SEXP call) {

Check warning on line 235 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L235

Added line #L235 was not covered by tests

struct nanoparquet_create_dict_idx_data data = { x, from, until };

Check warning on line 237 in src/dictionary-encoding.cpp

View check run for this annotation

Codecov / codecov/patch

src/dictionary-encoding.cpp#L237

Added line #L237 was not covered by tests

SEXP uwt = PROTECT(R_MakeUnwindCont());
R_API_START(call);

SEXP ret = R_UnwindProtect(
nanoparquet_create_dict_idx_wrapper,
(void*) x,
&data,
throw_error,
&uwt,
uwt
Expand Down
Loading
Loading