diff --git a/NEWS.md b/NEWS.md index 1c07a5d..af0f771 100644 --- a/NEWS.md +++ b/NEWS.md @@ -53,6 +53,8 @@ * `write_parquet(file = ":raw:")` now works correctly for larger data frames (#77). +* `read_parquet()` can now read from an R connection (#71). + * `read_parquet()` now reads `DECIMAL` values correctly from `INT32` and `INT64` columns if their `scale` is not zero. diff --git a/R/read-parquet.R b/R/read-parquet.R index 35514ef..9da621f 100644 --- a/R/read-parquet.R +++ b/R/read-parquet.R @@ -2,7 +2,12 @@ #' #' Converts the contents of the named Parquet file to a R data frame. #' -#' @param file Path to a Parquet file. +#' @param file Path to a Parquet file. It may also be an R connection, +#' in which case it first reads all data from the connection, writes +#' it into a temporary file, then reads the temporary file, and +#' deletes it. The connection might be open, in which case it must be +#' a binary connection. If it is not open, then `read_parquet()` will +#' open it and also close it in the end. #' @param options Nanoparquet options, see [parquet_options()]. #' @return A `data.frame` with the file's contents. 
# Copy everything that can be read from a connection into a file.
#
# @param con An R connection. If it is not open yet, it is opened in
#   binary read mode (`"rb"`) and closed again when this function exits.
#   A connection that is already open is left open.
# @param path Path of the file to write. It is opened in `"wb"` mode.
# @return `NULL`, invisibly. Called for its side effect of writing `path`.
#
# Data is copied in chunks of `getOption("nanoparquet.con_buffer_size")`
# bytes (10 MB if the option is not set).
dump_connection <- function(con, path) {
  if (!isOpen(con)) {
    on.exit(close(con), add = TRUE)
    open(con, "rb")
  }
  ocon <- file(path, open = "wb")
  # Close the output even if reading or writing fails half way.
  on.exit(close(ocon), add = TRUE)
  # 10 MB buffer by default
  bs <- getOption("nanoparquet.con_buffer_size", 1024L * 1024L * 10)
  repeat {
    buf <- readBin(con, what = "raw", n = bs)
    if (length(buf) == 0) {
      break
    }
    # Write to the open connection, not to `path`: writeBin() on a file
    # name reopens (and truncates) the file for every chunk, so only the
    # last chunk would be kept.
    writeBin(buf, ocon)
  }
  invisible(NULL)
}
# Reading through a connection that has not been opened yet: read_parquet()
# has to open it itself and close it again afterwards.
test_that("not open", {
  parquet_path <- test_path("data/factor.parquet")
  unopened <- file(parquet_path)
  from_connection <- read_parquet(unopened)
  from_path <- read_parquet(parquet_path)
  expect_equal(from_connection, from_path)
  # Once closed, the connection is invalid in R, so querying it errors
  expect_error(isOpen(unopened))
})

# Reading through a file connection the caller opened in binary mode:
# the connection must still be open when read_parquet() returns.
test_that("open", {
  parquet_path <- test_path("data/factor.parquet")
  opened <- file(parquet_path, open = "rb")
  on.exit(close(opened), add = TRUE)
  from_connection <- read_parquet(opened)
  from_path <- read_parquet(parquet_path)
  expect_equal(from_connection, from_path)
  expect_true(isOpen(opened))
})

# Same as above, but the bytes come from an in-memory raw connection.
test_that("raw, opened", {
  parquet_path <- test_path("data/factor.parquet")
  n_bytes <- file.size(parquet_path)
  file_bytes <- readBin(parquet_path, what = "raw", n = n_bytes)
  raw_con <- rawConnection(file_bytes, open = "rb")
  on.exit(close(raw_con), add = TRUE)
  from_connection <- read_parquet(raw_con)
  from_path <- read_parquet(parquet_path)
  expect_equal(from_connection, from_path)
  expect_true(isOpen(raw_con))
})