From 4562c1c6a117090e3e5b109bbc0bdafca130d2d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 08:47:32 +0200 Subject: [PATCH 01/22] feat: Implement query cancellation --- src/statement.cpp | 83 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 66 insertions(+), 17 deletions(-) diff --git a/src/statement.cpp b/src/statement.cpp index b4170c825..62f101bb2 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -1,17 +1,22 @@ -#include "rapi.hpp" -#include "typesr.hpp" - -#include - +#include "cpp11/function.hpp" +#include "cpp11/protect.hpp" #include "duckdb/common/arrow/arrow.hpp" #include "duckdb/common/arrow/arrow_converter.hpp" #include "duckdb/common/arrow/arrow_util.hpp" -#include "duckdb/common/types/timestamp.hpp" #include "duckdb/common/arrow/arrow_wrapper.hpp" #include "duckdb/common/arrow/result_arrow_wrapper.hpp" +#include "duckdb/common/enums/pending_execution_result.hpp" +#include "duckdb/common/exception.hpp" +#include "duckdb/common/shared_ptr.hpp" +#include "duckdb/common/types/timestamp.hpp" #include "duckdb/main/chunk_scan_state/query_result.hpp" - +#include "duckdb/main/client_context.hpp" #include "duckdb/parser/statement/relation_statement.hpp" +#include "rapi.hpp" +#include "typesr.hpp" + +#include +#include using namespace duckdb; using namespace cpp11::literals; @@ -334,21 +339,65 @@ bool FetchArrowChunk(ChunkScanState &scan_state, ClientProperties options, Appen return cpp11::safe[Rf_eval](record_batch_reader, arrow_namespace); } +class LocalSignalHandler { +private: + shared_ptr context; + bool interrupted = false; + + // oldhandler stores the old signal handler + // so that it can be restored when the object is destroyed + sig_t oldhandler; + + static LocalSignalHandler *instance; + +public: + LocalSignalHandler(shared_ptr context_) { + if (instance != nullptr) { + throw("Only one instance of LocalSignalHandler is allowed"); + } + instance = this; + oldhandler = signal(SIGINT, signal_handler); + context = context_; + } + + ~LocalSignalHandler() { + signal(SIGINT, oldhandler); + instance = nullptr; + } + + bool WasInterrupted() const { + return interrupted; + } + +private: + static void signal_handler(int signum) { + if (signum == SIGINT) { + instance->context->Interrupt(); + instance->interrupted = true; + } + } +}; + +LocalSignalHandler* LocalSignalHandler::instance = nullptr; + +extern "C" { + extern int R_interrupts_pending; +} + [[cpp11::register]] SEXP rapi_execute(duckdb::stmt_eptr_t stmt, bool arrow, bool integer64) { if (!stmt || !stmt.get() || !stmt->stmt) { cpp11::stop("rapi_execute: Invalid statement"); } - auto pending_query = stmt->stmt->PendingQuery(stmt->parameters, arrow); - duckdb::PendingExecutionResult execution_result; - do { - execution_result = pending_query->ExecuteTask(); - R_CheckUserInterrupt(); - } while (!PendingQueryResult::IsResultReady(execution_result)); - if (execution_result == PendingExecutionResult::EXECUTION_ERROR) { - cpp11::stop("rapi_execute: Failed to run query\nError: %s", pending_query->GetError().c_str()); - } - auto generic_result = pending_query->Execute(); + + LocalSignalHandler signal_handler(stmt->stmt->context); + + auto generic_result = stmt->stmt->Execute(stmt->parameters, false); + if (generic_result->HasError()) { + if (signal_handler.WasInterrupted()) { + R_interrupts_pending = 1; + return R_NilValue; + } cpp11::stop("rapi_execute: Failed to run query\nError: %s", generic_result->GetError().c_str()); } From 9afb6762c859051b133e7703c4a44210b3a02a9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 13:01:45 +0200 Subject: [PATCH 02/22] ifndef --- inst/include/cpp11/R.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/inst/include/cpp11/R.hpp b/inst/include/cpp11/R.hpp index 4d48b9a7f..5579a8c94 100644 --- a/inst/include/cpp11/R.hpp +++ b/inst/include/cpp11/R.hpp @@ -10,8 +10,12 @@ #endif #endif +#ifndef R_NO_REMAP #define R_NO_REMAP +#endif +#ifndef STRICT_R_HEADERS #define STRICT_R_HEADERS +#endif #include "R_ext/Boolean.h" #include "Rinternals.h" #include "Rversion.h" From db1fe79eaf52d287777d4fe5f736309e9d6ac8e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 13:01:47 +0200 Subject: [PATCH 03/22] Include --- src/statement.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/statement.cpp b/src/statement.cpp index 62f101bb2..9642e0133 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -18,6 +18,8 @@ #include #include +#include + using namespace duckdb; using namespace cpp11::literals; From 26fbdb0d80bf8b3483c17a59f22fcbdd07a8bb2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 13:35:31 +0200 Subject: [PATCH 04/22] Rf_onintr() --- src/statement.cpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/statement.cpp b/src/statement.cpp index 9642e0133..012989363 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -16,6 +16,7 @@ #include "typesr.hpp" #include +#include #include #include @@ -380,11 +381,7 @@ class LocalSignalHandler { } }; -LocalSignalHandler* LocalSignalHandler::instance = nullptr; - -extern "C" { - extern int R_interrupts_pending; -} +LocalSignalHandler *LocalSignalHandler::instance = nullptr; [[cpp11::register]] SEXP rapi_execute(duckdb::stmt_eptr_t stmt, bool arrow, bool integer64) { if (!stmt || !stmt.get() || !stmt->stmt) { @@ -397,7 +394,8 @@ extern "C" { if (generic_result->HasError()) { if (signal_handler.WasInterrupted()) { - R_interrupts_pending = 1; + cpp11::safe[Rf_onintr](); + // FIXME: Is the following better? cpp11::safe[Rf_onintrNoResume](); return R_NilValue; } cpp11::stop("rapi_execute: Failed to run query\nError: %s", generic_result->GetError().c_str()); From 3c1e209e33d95bf4431eb5f728bfbc37aebce553 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 13:44:12 +0200 Subject: [PATCH 05/22] Move to file --- src/Makevars | 2 +- src/Makevars.in | 2 +- src/Makevars.win | 2 +- src/include/signal.hpp | 35 ++++++++++++++++++++++ src/signal.cpp | 28 ++++++++++++++++++ src/statement.cpp | 66 ++++++------------------------------------ 6 files changed, 75 insertions(+), 60 deletions(-) create mode 100644 src/include/signal.hpp create mode 100644 src/signal.cpp diff --git a/src/Makevars b/src/Makevars index 5e6b4b1bd..08c2ae4ae 100644 --- a/src/Makevars +++ b/src/Makevars @@ -16,4 +16,4 @@ include Makevars.duckdb CXX_STD = CXX17 PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -DBROTLI_ENCODER_CLEANUP_ON_OOM -Iduckdb/src/include -Iduckdb/third_party/concurrentqueue -Iduckdb/third_party/fast_float -Iduckdb/third_party/fastpforlib -Iduckdb/third_party/fmt/include -Iduckdb/third_party/fsst -Iduckdb/third_party/httplib -Iduckdb/third_party/hyperloglog -Iduckdb/third_party/jaro_winkler -Iduckdb/third_party/jaro_winkler/details -Iduckdb/third_party/libpg_query -Iduckdb/third_party/libpg_query/include -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/mbedtls/library -Iduckdb/third_party/miniz -Iduckdb/third_party/pcg -Iduckdb/third_party/re2 -Iduckdb/third_party/skiplist -Iduckdb/third_party/tdigest -Iduckdb/third_party/utf8proc -Iduckdb/third_party/utf8proc/include -Iduckdb/third_party/yyjson/include -Iduckdb/extension/parquet/include -Iduckdb/third_party/parquet -Iduckdb/third_party/thrift -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/snappy -Iduckdb/third_party/zstd/include -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -I../inst/include -Iduckdb -DDUCKDB_EXTENSION_PARQUET_LINKED -DDUCKDB_BUILD_LIBRARY -OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES) +OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES) diff --git a/src/Makevars.in b/src/Makevars.in index aedc76359..51cfe4de3 100644 --- a/src/Makevars.in +++ b/src/Makevars.in @@ -16,5 +16,5 @@ include Makevars.duckdb CXX_STD = CXX17 PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -DBROTLI_ENCODER_CLEANUP_ON_OOM {{ INCLUDES }} -OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES) +OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES) PKG_LIBS={{ LINK_FLAGS }} diff --git a/src/Makevars.win b/src/Makevars.win index a03bde361..2b183aa87 100644 --- a/src/Makevars.win +++ b/src/Makevars.win @@ -16,5 +16,5 @@ include Makevars.duckdb CXX_STD = CXX17 PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -DBROTLI_ENCODER_CLEANUP_ON_OOM -Iduckdb/src/include -Iduckdb/third_party/concurrentqueue -Iduckdb/third_party/fast_float -Iduckdb/third_party/fastpforlib -Iduckdb/third_party/fmt/include -Iduckdb/third_party/fsst -Iduckdb/third_party/httplib -Iduckdb/third_party/hyperloglog -Iduckdb/third_party/jaro_winkler -Iduckdb/third_party/jaro_winkler/details -Iduckdb/third_party/libpg_query -Iduckdb/third_party/libpg_query/include -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/mbedtls/library -Iduckdb/third_party/miniz -Iduckdb/third_party/pcg -Iduckdb/third_party/re2 -Iduckdb/third_party/skiplist -Iduckdb/third_party/tdigest -Iduckdb/third_party/utf8proc -Iduckdb/third_party/utf8proc/include -Iduckdb/third_party/yyjson/include -Iduckdb/extension/parquet/include -Iduckdb/third_party/parquet -Iduckdb/third_party/thrift -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/snappy -Iduckdb/third_party/zstd/include -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -I../inst/include -Iduckdb -DDUCKDB_EXTENSION_PARQUET_LINKED -DDUCKDB_BUILD_LIBRARY -DDUCKDB_PLATFORM_RTOOLS=1 -OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES) +OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES) PKG_LIBS=-lws2_32 -L. -lrstrtmgr diff --git a/src/include/signal.hpp b/src/include/signal.hpp new file mode 100644 index 000000000..902fda542 --- /dev/null +++ b/src/include/signal.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include + +#include "duckdb/common/shared_ptr.hpp" +#include "duckdb/main/client_context.hpp" + +// For Rf_onintr() needed by users of this class +#include "cpp11/R.hpp" +#include + +namespace duckdb { + +class ScopedInterruptHandler { +private: + shared_ptr context; + bool interrupted = false; + + // oldhandler stores the old signal handler + // so that it can be restored when the object is destroyed + sig_t oldhandler; + + static ScopedInterruptHandler *instance; + +public: + ScopedInterruptHandler(shared_ptr context_); + ~ScopedInterruptHandler(); + + bool WasInterrupted() const; + +private: + static void signal_handler(int signum); +}; + +}; diff --git a/src/signal.cpp b/src/signal.cpp new file mode 100644 index 000000000..1defc1094 --- /dev/null +++ b/src/signal.cpp @@ -0,0 +1,28 @@ +#include "signal.hpp" + +namespace duckdb { + +ScopedInterruptHandler *ScopedInterruptHandler::instance = nullptr; + +ScopedInterruptHandler::ScopedInterruptHandler(shared_ptr context_) : context(context_) { + instance = this; + oldhandler = std::signal(SIGINT, ScopedInterruptHandler::signal_handler); +} + +ScopedInterruptHandler::~ScopedInterruptHandler() { + std::signal(SIGINT, oldhandler); + instance = nullptr; +} + +bool ScopedInterruptHandler::WasInterrupted() const { + return interrupted; +} + +void ScopedInterruptHandler::signal_handler(int signum) { + if (instance) { + instance->interrupted = true; + instance->context->Interrupt(); + } +} + +}; diff --git a/src/statement.cpp b/src/statement.cpp index 012989363..29784889c 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -1,25 +1,18 @@ -#include "cpp11/function.hpp" -#include "cpp11/protect.hpp" +#include "rapi.hpp" +#include "typesr.hpp" +#include "signal.hpp" + +#include + #include "duckdb/common/arrow/arrow.hpp" #include "duckdb/common/arrow/arrow_converter.hpp" #include "duckdb/common/arrow/arrow_util.hpp" +#include "duckdb/common/types/timestamp.hpp" #include "duckdb/common/arrow/arrow_wrapper.hpp" #include "duckdb/common/arrow/result_arrow_wrapper.hpp" -#include "duckdb/common/enums/pending_execution_result.hpp" -#include "duckdb/common/exception.hpp" -#include "duckdb/common/shared_ptr.hpp" -#include "duckdb/common/types/timestamp.hpp" #include "duckdb/main/chunk_scan_state/query_result.hpp" -#include "duckdb/main/client_context.hpp" -#include "duckdb/parser/statement/relation_statement.hpp" -#include "rapi.hpp" -#include "typesr.hpp" -#include -#include -#include - -#include +#include "duckdb/parser/statement/relation_statement.hpp" using namespace duckdb; using namespace cpp11::literals; @@ -342,53 +335,12 @@ bool FetchArrowChunk(ChunkScanState &scan_state, ClientProperties options, Appen return cpp11::safe[Rf_eval](record_batch_reader, arrow_namespace); } -class LocalSignalHandler { -private: - shared_ptr context; - bool interrupted = false; - - // oldhandler stores the old signal handler - // so that it can be restored when the object is destroyed - sig_t oldhandler; - - static LocalSignalHandler *instance; - -public: - LocalSignalHandler(shared_ptr context_) { - if (instance != nullptr) { - throw("Only one instance of LocalSignalHandler is allowed"); - } - instance = this; - oldhandler = signal(SIGINT, signal_handler); - context = context_; - } - - ~LocalSignalHandler() { - signal(SIGINT, oldhandler); - instance = nullptr; - } - - bool WasInterrupted() const { - return interrupted; - } - -private: - static void signal_handler(int signum) { - if (signum == SIGINT) { - instance->context->Interrupt(); - instance->interrupted = true; - } - } -}; - -LocalSignalHandler *LocalSignalHandler::instance = nullptr; - [[cpp11::register]] SEXP rapi_execute(duckdb::stmt_eptr_t stmt, bool arrow, bool integer64) { if (!stmt || !stmt.get() || !stmt->stmt) { cpp11::stop("rapi_execute: Invalid statement"); } - LocalSignalHandler signal_handler(stmt->stmt->context); + ScopedInterruptHandler signal_handler(stmt->stmt->context); auto generic_result = stmt->stmt->Execute(stmt->parameters, false); From b392b4546344ab47cc025156a251e1274c1b79f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 14:25:01 +0200 Subject: [PATCH 06/22] Local typedef --- src/include/signal.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/include/signal.hpp b/src/include/signal.hpp index 902fda542..d39afcc11 100644 --- a/src/include/signal.hpp +++ b/src/include/signal.hpp @@ -18,6 +18,7 @@ class ScopedInterruptHandler { // oldhandler stores the old signal handler // so that it can be restored when the object is destroyed + typedef void (*sig_t)(int); sig_t oldhandler; static ScopedInterruptHandler *instance; From 396f2943da24e586ab28a95ee44faf88199cf881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 15:26:45 +0200 Subject: [PATCH 07/22] Bump --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 44691486d..767ec87fb 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: duckdb Title: DBI Package for the DuckDB Database Management System -Version: 1.1.1.9000 +Version: 1.1.1.9001 Authors@R: c( person("Hannes", "Mühleisen", , "hannes@cwi.nl", role = "aut", comment = c(ORCID = "0000-0001-8552-0029")), From 4967ec9581597a37adb1332abd8adc82de3f6753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 15:26:57 +0200 Subject: [PATCH 08/22] Add test --- tests/testthat/test-signal.R | 40 ++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 tests/testthat/test-signal.R diff --git a/tests/testthat/test-signal.R b/tests/testthat/test-signal.R new file mode 100644 index 000000000..0b1661110 --- /dev/null +++ b/tests/testthat/test-signal.R @@ -0,0 +1,40 @@ +test_that("long-running queries can be cancelled", { + skip_if_not_installed("callr") + + r_session <- callr::r_session$new() + + r_session$run(function() { + .GlobalEnv$con <- DBI::dbConnect(duckdb::duckdb()) + DBI::dbExecute(.GlobalEnv$con, "CREATE TABLE data AS SELECT unnest(generate_series(1, 100000)) AS a") + }) + + r_session$call(function() { + .GlobalEnv$interrupted <- FALSE + tryCatch( + DBI::dbGetQuery(.GlobalEnv$con, "SELECT COUNT(*) FROM data JOIN data AS data2 ON data.a != data2.a"), + interrupt = function(e) { + .GlobalEnv$interrupted <- TRUE + } + ) + }) + + start_time <- Sys.time() + + Sys.sleep(0.2) + expect_equal(r_session$get_state(), "busy") + polled <- r_session$poll_process(200) + expect_equal(polled, "timeout") + r_session$signal(2) + polled <- r_session$poll_process(200) + expect_equal(polled, "ready") + expect_equal(r_session$read()$code, 200) + expect_equal(r_session$get_state(), "idle") + + expect_true(r_session$run(function() .GlobalEnv$interrupted)) + + end_time <- Sys.time() + expect_lt(end_time - start_time, 1) + + r_session$run(function() DBI::dbDisconnect(.GlobalEnv$con)) + r_session$close() +}) From 531e105609e8e598e9767e02e47f7506233a96f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 15:35:24 +0200 Subject: [PATCH 09/22] Add backref --- src/include/signal.hpp | 2 ++ src/signal.cpp | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/include/signal.hpp b/src/include/signal.hpp index d39afcc11..0d5c0601b 100644 --- a/src/include/signal.hpp +++ b/src/include/signal.hpp @@ -9,6 +9,8 @@ #include "cpp11/R.hpp" #include +// Toy repo: https://github.com/krlmlr/cancel.test + namespace duckdb { class ScopedInterruptHandler { diff --git a/src/signal.cpp b/src/signal.cpp index 1defc1094..b2b6abadf 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -1,5 +1,7 @@ #include "signal.hpp" +// Toy repo: https://github.com/krlmlr/cancel.test + namespace duckdb { ScopedInterruptHandler *ScopedInterruptHandler::instance = nullptr; From d2cea0ec24d74139b03c4b406681ad7208724cd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 15:42:00 +0200 Subject: [PATCH 10/22] Fix Windows test --- tests/testthat/test-signal.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testthat/test-signal.R b/tests/testthat/test-signal.R index 0b1661110..b1eb62706 100644 --- a/tests/testthat/test-signal.R +++ b/tests/testthat/test-signal.R @@ -24,7 +24,7 @@ test_that("long-running queries can be cancelled", { expect_equal(r_session$get_state(), "busy") polled <- r_session$poll_process(200) expect_equal(polled, "timeout") - r_session$signal(2) + r_session$interrupt() polled <- r_session$poll_process(200) expect_equal(polled, "ready") expect_equal(r_session$read()$code, 200) From 37468f2a4a6739ad9a5e8062bb0f3ff66f01c422 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 15:46:28 +0200 Subject: [PATCH 11/22] Move responsibilities --- src/include/signal.hpp | 6 +----- src/signal.cpp | 17 ++++++++++++++--- src/statement.cpp | 18 +++++++----------- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/include/signal.hpp b/src/include/signal.hpp index 0d5c0601b..eaaab9a46 100644 --- a/src/include/signal.hpp +++ b/src/include/signal.hpp @@ -5,10 +5,6 @@ #include "duckdb/common/shared_ptr.hpp" #include "duckdb/main/client_context.hpp" -// For Rf_onintr() needed by users of this class -#include "cpp11/R.hpp" -#include - // Toy repo: https://github.com/krlmlr/cancel.test namespace duckdb { @@ -29,7 +25,7 @@ class ScopedInterruptHandler { ScopedInterruptHandler(shared_ptr context_); ~ScopedInterruptHandler(); - bool WasInterrupted() const; + bool HandleInterrupt() const; private: static void signal_handler(int signum); diff --git a/src/signal.cpp b/src/signal.cpp index b2b6abadf..2da442d55 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -1,5 +1,9 @@ #include "signal.hpp" +#include "cpp11/R.hpp" + +#include + // Toy repo: https://github.com/krlmlr/cancel.test namespace duckdb { @@ -16,8 +20,15 @@ ScopedInterruptHandler::~ScopedInterruptHandler() { instance = nullptr; } -bool ScopedInterruptHandler::WasInterrupted() const { - return interrupted; +bool ScopedInterruptHandler::HandleInterrupt() const { + if (!interrupted) { + return false; + } + + // We're presumably still blocking interrupts here in the R session, + // so this is likely equivalent to cpp11::safe[Rf_onintrNoResume]() + cpp11::safe[Rf_onintr](); + return true; } void ScopedInterruptHandler::signal_handler(int signum) { @@ -27,4 +38,4 @@ void ScopedInterruptHandler::signal_handler(int signum) { } } -}; +}; // namespace duckdb diff --git a/src/statement.cpp b/src/statement.cpp index 29784889c..7c2d7e078 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -1,18 +1,16 @@ -#include "rapi.hpp" -#include "typesr.hpp" -#include "signal.hpp" - -#include - #include "duckdb/common/arrow/arrow.hpp" #include "duckdb/common/arrow/arrow_converter.hpp" #include "duckdb/common/arrow/arrow_util.hpp" -#include "duckdb/common/types/timestamp.hpp" #include "duckdb/common/arrow/arrow_wrapper.hpp" #include "duckdb/common/arrow/result_arrow_wrapper.hpp" +#include "duckdb/common/types/timestamp.hpp" #include "duckdb/main/chunk_scan_state/query_result.hpp" - #include "duckdb/parser/statement/relation_statement.hpp" +#include "rapi.hpp" +#include "signal.hpp" +#include "typesr.hpp" + +#include using namespace duckdb; using namespace cpp11::literals; @@ -345,9 +343,7 @@ bool FetchArrowChunk(ChunkScanState &scan_state, ClientProperties options, Appen auto generic_result = stmt->stmt->Execute(stmt->parameters, false); if (generic_result->HasError()) { - if (signal_handler.WasInterrupted()) { - cpp11::safe[Rf_onintr](); - // FIXME: Is the following better? cpp11::safe[Rf_onintrNoResume](); + if (signal_handler.HandleInterrupt()) { return R_NilValue; } cpp11::stop("rapi_execute: Failed to run query\nError: %s", generic_result->GetError().c_str()); From 68363ef994cf24b1063dfad8250db16748deab91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 17:37:38 +0200 Subject: [PATCH 12/22] Include --- src/signal.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/signal.cpp b/src/signal.cpp index 2da442d55..d0cabebe8 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -1,6 +1,7 @@ #include "signal.hpp" #include "cpp11/R.hpp" +#include "cpp11/protect.hpp" // for safe #include From 5cb2613278c31059c73f1b469668c5f9e58c2eb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 17:41:12 +0200 Subject: [PATCH 13/22] Assertion --- src/signal.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/signal.cpp b/src/signal.cpp index d0cabebe8..bb476c407 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -2,6 +2,7 @@ #include "cpp11/R.hpp" #include "cpp11/protect.hpp" // for safe +#include "duckdb/common/exception.hpp" #include @@ -12,6 +13,9 @@ namespace duckdb { ScopedInterruptHandler *ScopedInterruptHandler::instance = nullptr; ScopedInterruptHandler::ScopedInterruptHandler(shared_ptr context_) : context(context_) { + if (instance) { + throw InternalException("ScopedInterruptHandler already active"); + } instance = this; oldhandler = std::signal(SIGINT, ScopedInterruptHandler::signal_handler); } From 77a8979946f5f5227c15f6fcd4eeb10dacd853b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 17:41:32 +0200 Subject: [PATCH 14/22] Support no-op with nullptr in context --- src/signal.cpp | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/signal.cpp b/src/signal.cpp index bb476c407..798932731 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -16,18 +16,25 @@ ScopedInterruptHandler::ScopedInterruptHandler(shared_ptr context if (instance) { throw InternalException("ScopedInterruptHandler already active"); } - instance = this; - oldhandler = std::signal(SIGINT, ScopedInterruptHandler::signal_handler); + if (context) { + instance = this; + oldhandler = std::signal(SIGINT, ScopedInterruptHandler::signal_handler); + } } ScopedInterruptHandler::~ScopedInterruptHandler() { - std::signal(SIGINT, oldhandler); - instance = nullptr; + if (context) { + std::signal(SIGINT, oldhandler); + instance = nullptr; + } } bool ScopedInterruptHandler::HandleInterrupt() const { + // Never interrupted without context if (!interrupted) { return false; + } else { + D_ASSERT(context); } // We're presumably still blocking interrupts here in the R session, From 13569855c3d0a3236f5b19fe540ff9ac32766e36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 17:42:55 +0200 Subject: [PATCH 15/22] Delete other constructors --- src/include/signal.hpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/include/signal.hpp b/src/include/signal.hpp index eaaab9a46..15fa5481f 100644 --- a/src/include/signal.hpp +++ b/src/include/signal.hpp @@ -21,6 +21,12 @@ class ScopedInterruptHandler { static ScopedInterruptHandler *instance; +private: + ScopedInterruptHandler() = delete; + ScopedInterruptHandler(const ScopedInterruptHandler &) = delete; + ScopedInterruptHandler &operator=(const ScopedInterruptHandler &) = delete; + ScopedInterruptHandler(ScopedInterruptHandler &&) = delete; + public: ScopedInterruptHandler(shared_ptr context_); ~ScopedInterruptHandler(); From 3d6c93ca57146d344ab6796e1c8b34cc3943753b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 17:49:06 +0200 Subject: [PATCH 16/22] Disable when done --- src/include/signal.hpp | 1 + src/signal.cpp | 13 +++++++++---- src/statement.cpp | 2 ++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/include/signal.hpp b/src/include/signal.hpp index 15fa5481f..2e3a02444 100644 --- a/src/include/signal.hpp +++ b/src/include/signal.hpp @@ -32,6 +32,7 @@ class ScopedInterruptHandler { ~ScopedInterruptHandler(); bool HandleInterrupt() const; + void Disable(); private: static void signal_handler(int signum); diff --git a/src/signal.cpp b/src/signal.cpp index 798932731..f1f4f9cff 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -23,10 +23,8 @@ ScopedInterruptHandler::ScopedInterruptHandler(shared_ptr context } ScopedInterruptHandler::~ScopedInterruptHandler() { - if (context) { - std::signal(SIGINT, oldhandler); - instance = nullptr; - } + Disable(); + instance = nullptr; } bool ScopedInterruptHandler::HandleInterrupt() const { @@ -43,6 +41,13 @@ bool ScopedInterruptHandler::HandleInterrupt() const { return true; } +void ScopedInterruptHandler::Disable() { + if (context) { + std::signal(SIGINT, oldhandler); + context.reset(); + } +} + void ScopedInterruptHandler::signal_handler(int signum) { if (instance) { instance->interrupted = true; diff --git a/src/statement.cpp b/src/statement.cpp index 7c2d7e078..75a92bb63 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -349,6 +349,8 @@ bool FetchArrowChunk(ChunkScanState &scan_state, ClientProperties options, Appen cpp11::stop("rapi_execute: Failed to run query\nError: %s", generic_result->GetError().c_str()); } + signal_handler.Disable(); + if (arrow) { auto query_result = new RQueryResult(); query_result->result = std::move(generic_result); From 1f04dae18dca306c0222932877a4a5bce274710b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 17:50:57 +0200 Subject: [PATCH 17/22] Extract variable --- src/relational.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/relational.cpp b/src/relational.cpp index f682cc534..939c0b3df 100644 --- a/src/relational.cpp +++ b/src/relational.cpp @@ -431,7 +431,9 @@ static SEXP result_to_df(duckdb::unique_ptr res) { } [[cpp11::register]] SEXP rapi_rel_to_df(duckdb::rel_extptr_t rel) { - return result_to_df(rel->rel->Execute()); + auto res = rel->rel->Execute(); + + return result_to_df(std::move(res)); } [[cpp11::register]] std::string rapi_rel_tostring(duckdb::rel_extptr_t rel) { From 1521f361aa1693c2c5c2d120161bcb2667912ca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 18:09:56 +0200 Subject: [PATCH 18/22] Interruptible rapi_rel_to_df() --- src/relational.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/relational.cpp b/src/relational.cpp index 939c0b3df..8a829b45b 100644 --- a/src/relational.cpp +++ b/src/relational.cpp @@ -1,5 +1,6 @@ #include "cpp11.hpp" #include "duckdb.hpp" +#include "signal.hpp" #include "typesr.hpp" #include "rapi.hpp" @@ -431,8 +432,16 @@ static SEXP result_to_df(duckdb::unique_ptr res) { } [[cpp11::register]] SEXP rapi_rel_to_df(duckdb::rel_extptr_t rel) { + ScopedInterruptHandler signal_handler(rel->rel->context.GetContext()); + auto res = rel->rel->Execute(); + if (signal_handler.HandleInterrupt()) { + return R_NilValue; + } + + signal_handler.Disable(); + return result_to_df(std::move(res)); } From 1b9dd12537a8fbf76c280eb93d3a780ee2c6b1fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 18:10:30 +0200 Subject: [PATCH 19/22] Interruptible ALTREP --- src/reltoaltrep.cpp | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/reltoaltrep.cpp b/src/reltoaltrep.cpp index eff66620d..583bcaf3d 100644 --- a/src/reltoaltrep.cpp +++ b/src/reltoaltrep.cpp @@ -1,8 +1,10 @@ #define __STDC_FORMAT_MACROS +#include "httplib.hpp" #include "rapi.hpp" #include "typesr.hpp" #include "reltoaltrep.hpp" +#include "signal.hpp" #include "cpp11/declarations.hpp" #include @@ -97,16 +99,31 @@ struct AltrepRelationWrapper { Rprintf("materializing:\n%s\n", rel->ToString().c_str()); } + ScopedInterruptHandler signal_handler(rel->context.GetContext()); + // We need to temporarily allow a deeper execution stack // https://github.com/duckdb/duckdb-r/issues/101 auto old_depth = rel->context.GetContext()->config.max_expression_depth; rel->context.GetContext()->config.max_expression_depth = old_depth * 2; + duckdb_httplib::detail::scope_exit reset_max_expression_depth([&]() { + rel->context.GetContext()->config.max_expression_depth = old_depth; + }); + res = rel->Execute(); + + // FIXME: Use std::experimental::scope_exit if (rel->context.GetContext()->config.max_expression_depth != old_depth * 2) { Rprintf("Internal error: max_expression_depth was changed from %" PRIu64 " to %" PRIu64 "\n", old_depth * 2, rel->context.GetContext()->config.max_expression_depth); } rel->context.GetContext()->config.max_expression_depth = old_depth; + reset_max_expression_depth.release(); + + if (signal_handler.HandleInterrupt()) { + cpp11::stop("Query execution was interrupted"); + } + + signal_handler.Disable(); if (res->HasError()) { cpp11::stop("Error evaluating duckdb query: %s", res->GetError().c_str()); From ba85d69666bab2626166a773467346abfbb6f435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 18:10:41 +0200 Subject: [PATCH 20/22] Harmonize rapi_execute() --- src/statement.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/statement.cpp b/src/statement.cpp index 75a92bb63..bec1dfd14 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -342,15 +342,16 @@ bool FetchArrowChunk(ChunkScanState &scan_state, ClientProperties options, Appen auto generic_result = stmt->stmt->Execute(stmt->parameters, false); - if (generic_result->HasError()) { - if (signal_handler.HandleInterrupt()) { - return R_NilValue; - } - cpp11::stop("rapi_execute: Failed to run query\nError: %s", generic_result->GetError().c_str()); + if (signal_handler.HandleInterrupt()) { + return R_NilValue; } signal_handler.Disable(); + if (generic_result->HasError()) { + cpp11::stop("rapi_execute: Failed to run query\nError: %s", generic_result->GetError().c_str()); + } + if (arrow) { auto query_result = new RQueryResult(); query_result->result = std::move(generic_result); From ee05d1573362b2ff8b72aabe3be407e7f2121d7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 20 Oct 2024 19:57:57 +0200 Subject: [PATCH 21/22] Skip test on old Windows for now --- tests/testthat/test-signal.R | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/testthat/test-signal.R b/tests/testthat/test-signal.R index b1eb62706..d350094b8 100644 --- a/tests/testthat/test-signal.R +++ b/tests/testthat/test-signal.R @@ -1,5 +1,8 @@ test_that("long-running queries can be cancelled", { skip_if_not_installed("callr") + # Skip on Windows for R < 4.4, the signal doesn't seem to make it through + # (but works for the toy repository) + skip_if(getRversion() < "4.4.0" && .Platform$OS.type == "windows") r_session <- callr::r_session$new() From 69aeedc9dacc9b0714f4d0e10d7505d1137b6cb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Mon, 21 Oct 2024 05:18:50 +0200 Subject: [PATCH 22/22] Message --- src/signal.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/signal.cpp b/src/signal.cpp index f1f4f9cff..fb4af557f 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -1,6 +1,7 @@ #include "signal.hpp" #include "cpp11/R.hpp" +#include "cpp11/function.hpp" #include "cpp11/protect.hpp" // for safe #include "duckdb/common/exception.hpp" @@ -35,8 +36,12 @@ bool ScopedInterruptHandler::HandleInterrupt() const { D_ASSERT(context); } - // We're presumably still blocking interrupts here in the R session, - // so this is likely equivalent to cpp11::safe[Rf_onintrNoResume]() + // This seems necessary to work around a specificity with the RStudio IDE on Windows. + // Without the message, the interrupt is not available as a catchable condition. + // https://github.com/krlmlr/cancel.test/issues/1 + cpp11::message(""); + + // FIXME: Is this equivalent to cpp11::safe[Rf_onintrNoResume](), or worse? cpp11::safe[Rf_onintr](); return true; }