Skip to content

Commit

Permalink
read_parquet() can read from a connection now
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborcsardi committed Sep 14, 2024
1 parent 9295026 commit e97f7a5
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 2 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
* `write_parquet(file = ":raw:")` now works correctly for larger data
frames (#77).

* `read_parquet()` can now read from an R connection (#71).

* `read_parquet()` now reads `DECIMAL` values correctly from `INT32`
and `INT64` columns if their `scale` is not zero.

Expand Down
31 changes: 30 additions & 1 deletion R/read-parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
#'
#' Converts the contents of the named Parquet file to a R data frame.
#'
#' @param file Path to a Parquet file.
#' @param file Path to a Parquet file. It may also be an R connection,
#' in which case it first reads all data from the connection, writes
#' it into a temporary file, then reads the temporary file, and
#' deletes it. The connection might be open, it which case it must be
#' a binary connection. If it is not open, then `read_parquet()` will
#' open it and also close it in the end.
#' @param options Nanoparquet options, see [parquet_options()].
#' @return A `data.frame` with the file's contents.
#' @export
Expand All @@ -17,6 +22,11 @@
#' print(str(parquet_df))

read_parquet <- function(file, options = parquet_options()) {
if (inherits(file, "connection")) {
tmp <- tempfile(fileext = ".parquet")
dump_connection(file, tmp)
file <- tmp
}
file <- path.expand(file)
res <- .Call(nanoparquet_read2, file, options, sys.call())
dicts <- res[[2]]
Expand Down Expand Up @@ -46,3 +56,22 @@ read_parquet <- function(file, options = parquet_options()) {

res
}

# dump the contents of a connection to path
dump_connection <- function(con, path) {
if (!isOpen(con)) {
on.exit(close(con), add = TRUE)
open(con, "rb")
}
ocon <- file(path, open = "wb")
# 10 MB buffer by default
bs <- getOption("nanoparquet.con_buffer_size", 1024L * 1024L * 10)
while (TRUE) {
buf <- readBin(con, what = "raw", n = bs)
if (length(buf) == 0) {
break
}
writeBin(buf, path)
}
close(ocon)
}
7 changes: 6 additions & 1 deletion man/read_parquet.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions tests/testthat/test-read-parquet-connection.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
test_that("not open", {
pf <- test_path("data/factor.parquet")
con <- file(pf)
expect_equal(
read_parquet(con),
read_parquet(pf)
)
# A closed (=invalid in R) connection will error here
expect_error(isOpen(con))
})

test_that("open", {
pf <- test_path("data/factor.parquet")
con <- file(pf, open = "rb")
on.exit(close(con), add = TRUE)
expect_equal(
read_parquet(con),
read_parquet(pf)
)
expect_true(isOpen(con))
})

test_that("raw, opened", {
pf <- test_path("data/factor.parquet")
bts <- readBin(pf, what = "raw", n = file.size(pf))
con <- rawConnection(bts, open = "rb")
on.exit(close(con), add = TRUE)
expect_equal(
read_parquet(con),
read_parquet(pf)
)
expect_true(isOpen(con))
})

0 comments on commit e97f7a5

Please sign in to comment.