Skip to content

Commit

Permalink
Merge pull request #88 from r-lib/feature/write-row-groups
Browse files Browse the repository at this point in the history
Write multiple row groups
  • Loading branch information
gaborcsardi committed Sep 14, 2024
2 parents c1305e5 + 4256242 commit 9295026
Show file tree
Hide file tree
Showing 18 changed files with 614 additions and 195 deletions.
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
`write_parquet()`, to specify how the columns of a data frame should
be mapped to Parquet types.

- `write_parquet()` can now write multiple row groups. By default it puts
at most 10 million rows into a single row group. You can choose the
row groups manually with the `row_groups` argument.

- Newly supported type conversions in `write_parquet()` via the
schema argument:

Expand Down
9 changes: 9 additions & 0 deletions R/options.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#' in [read_parquet()]. By default nanoparquet adds the `"tbl"` class,
#' so data frames are printed differently if the pillar package is
#' loaded.
#' @param num_rows_per_row_group The number of rows to put into a row
#' group, if row groups are not specified explicitly. It should be
#' an integer scalar. Defaults to 10 million.
#' @param use_arrow_metadata `TRUE` or `FALSE`. If `TRUE`, then
#' [read_parquet()] and [read_parquet_schema()] will make use of the Apache
#' Arrow metadata to assign R classes to Parquet columns.
Expand Down Expand Up @@ -37,6 +40,7 @@

parquet_options <- function(
class = getOption("nanoparquet.class", "tbl"),
num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L),
use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE),
write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE),
write_data_page_version = getOption("nanoparquet.write_data_page_version", 1L)
Expand All @@ -50,9 +54,14 @@ parquet_options <- function(
identical(write_data_page_version, 1L) ||
identical(write_data_page_version, 2L)
)
num_rows_per_row_group <- as_count(
num_rows_per_row_group,
"num_rows_per_row_group"
)

list(
class = class,
num_rows_per_row_group = num_rows_per_row_group,
use_arrow_metadata = use_arrow_metadata,
write_arrow_metadata = write_arrow_metadata,
write_data_page_version = as.integer(write_data_page_version)
Expand Down
4 changes: 2 additions & 2 deletions R/porcelain.R
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ dict_encode <- function(x, n = length(x)) {
.Call(nanoparquet_create_dict, x, n)
}

dict_encode_idx <- function(x) {
.Call(nanoparquet_create_dict_idx, x, sys.call())
dict_encode_idx <- function(x, from = 1L, until = length(x) + 1L ) {
.Call(nanoparquet_create_dict_idx, x, from - 1L, until - 1L, sys.call())
}

lgl_avg_run_length <- function(x, n = length(x)) {
Expand Down
15 changes: 15 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ is_string <- function(x) {
is.character(x) && length(x) == 1 && !is.na(x)
}

is_icount <- function(x) {
is.integer(x) && length(x) == 1 && !is.na(x) && x >= 1L
}

is_dcount <- function(x) {
is.double(x) && length(x) == 1 && !is.na(x) && as.integer(x) == x &&
x >= 1
}

as_count <- function(x, name = "x") {
if (is_icount(x)) return(x)
if (is_dcount(x)) return(as.integer(x))
stop(name, " must be a count, i.e. an integer scalar")
}

is_uint32 <- function(x) {
is.numeric(x) && length(x) == 1 && !is.na(x) &&
round(x) == x && x >= 0 && x <= 4294967295
Expand Down
44 changes: 44 additions & 0 deletions R/write-parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
#' @param metadata Additional key-value metadata to add to the file.
#' This must be a named character vector, or a data frame with columns
#' character columns called `key` and `value`.
#' @param row_groups Row groups of the Parquet file. If `NULL`, and `x`
#' is a grouped data frame, then the groups are used as row groups.
#' The rows will be reordered to match groups. If `NULL`, and `x` is
#' not a grouped data frame, then the `num_rows_per_row_group` option is
#' used from the `options` argument, see [parquet_options()]. Otherwise
#' it must be an integer vector, specifying the starts of the row groups.
#' @param options Nanoparquet options, see [parquet_options()].
#' @return `NULL`, unless `file` is `":raw:"`, in which case the Parquet
#' file is returned as a raw vector.
Expand All @@ -62,6 +68,7 @@ write_parquet <- function(
compression = c("snappy", "gzip", "zstd", "uncompressed"),
encoding = NULL,
metadata = NULL,
row_groups = NULL,
options = parquet_options()) {

file <- path.expand(file)
Expand Down Expand Up @@ -148,6 +155,14 @@ write_parquet <- function(

encoding <- parse_encoding(encoding, x)

row_groups <- row_groups %||%
attr(x, "groups") %||%
default_row_groups(x, schema, compression, encoding, options)

row_group_starts <- parse_row_groups(x, row_groups)
x <- row_group_starts[[1]]
row_group_starts <- row_group_starts[[2]]

res <- .Call(
nanoparquet_write,
x,
Expand All @@ -159,6 +174,7 @@ res <- .Call(
options,
schema,
encodings[encoding],
row_group_starts,
sys.call()
)

Expand Down Expand Up @@ -201,3 +217,31 @@ parse_encoding <- function(encoding, x) {
structure(encoding, names = names(x))
}
}

# we should refine this later
default_row_groups <- function(x, schema, compression, encoding, options) {
n <- options[["num_rows_per_row_group"]]
seq(1L, nrow(x), by = n)
}

parse_row_groups <- function(x, rg) {
if (is.data.frame(rg)) {
rg <- rg[[".rows"]]
urg <- unlist(rg)
if (any(diff(urg) != 1)) {
message("Ordering data frame according to row groups.")
x <- x[urg, ]
}
rg <- c(1L, cumsum(lengths(rg)))
rg <- rg[-length(rg)]
} else {
if (!is.integer(rg) || anyNA(rg) || any(rg <= 0) ||
any(diff(rg) <= 0) || rg[1] != 1L) {
stop(
"Row groups must be specified as a growing positive integer ",
"vector, starting with 1."
)
}
}
list(x = x, row_groups = rg)
}
5 changes: 5 additions & 0 deletions man/parquet_options.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions man/write_parquet.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 26 additions & 10 deletions src/dictionary-encoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,28 @@ SEXP nanoparquet_create_dict(SEXP x, SEXP rlen) {
return Rf_ScalarInteger(dictlen);
}

SEXP nanoparquet_create_dict_idx_(SEXP x) {
R_xlen_t dictlen, len = Rf_xlength(x);
SEXP nanoparquet_create_dict_idx_(SEXP x, SEXP from, SEXP until) {
int64_t cfrom = INTEGER(from)[0];
int64_t cuntil = INTEGER(until)[0];
int64_t len = cuntil - cfrom;
R_xlen_t dictlen;

SEXP idx = PROTECT(Rf_allocVector(INTSXP, len));
SEXP dict = PROTECT(Rf_allocVector(INTSXP, len));
int *idict = INTEGER(dict);
int *iidx = INTEGER(idx);
switch (TYPEOF(x)) {
case LGLSXP:
dictlen = create_dict_idx<int>(LOGICAL(x), iidx, idict, len, NA_LOGICAL);
dictlen = create_dict_idx<int>(LOGICAL(x) + cfrom, iidx, idict, len, NA_LOGICAL);
break;
case INTSXP:
dictlen = create_dict_idx<int>(INTEGER(x), idict, iidx, len, NA_INTEGER);
dictlen = create_dict_idx<int>(INTEGER(x) + cfrom, idict, iidx, len, NA_INTEGER);
break;
case REALSXP:
dictlen = create_dict_real_idx(REAL(x), idict, iidx, len);
dictlen = create_dict_real_idx(REAL(x) + cfrom, idict, iidx, len);
break;
case STRSXP: {
dictlen = create_dict_ptr_idx((void**)STRING_PTR_RO(x),idict, iidx, len, (void*) NA_STRING);
dictlen = create_dict_ptr_idx((void**)(STRING_PTR_RO(x) + cfrom), idict, iidx, len, (void*) NA_STRING);
break;
}
default:
Expand All @@ -213,19 +216,32 @@ SEXP nanoparquet_create_dict_idx_(SEXP x) {
return res;
}

struct nanoparquet_create_dict_idx_data {
SEXP data;
SEXP from;
SEXP until;
};

inline SEXP nanoparquet_create_dict_idx_wrapper(void *data) {
SEXP x = (SEXP) data;
return nanoparquet_create_dict_idx_(x);
struct nanoparquet_create_dict_idx_data *rdata =
(struct nanoparquet_create_dict_idx_data*) data;
return nanoparquet_create_dict_idx_(
rdata->data,
rdata->from,
rdata->until
);
}

SEXP nanoparquet_create_dict_idx(SEXP x, SEXP call) {
SEXP nanoparquet_create_dict_idx(SEXP x, SEXP from, SEXP until, SEXP call) {

struct nanoparquet_create_dict_idx_data data = { x, from, until };

SEXP uwt = PROTECT(R_MakeUnwindCont());
R_API_START(call);

SEXP ret = R_UnwindProtect(
nanoparquet_create_dict_idx_wrapper,
(void*) x,
&data,
throw_error,
&uwt,
uwt
Expand Down
Loading

0 comments on commit 9295026

Please sign in to comment.