From 1b124519872692d9fc7536143983235cb24f1ced Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Fri, 2 Feb 2024 16:25:13 -0800 Subject: [PATCH] Upgrade to latest datafusion --- Cargo.lock | 465 +++++++------ Cargo.toml | 1 + deny.toml | 1 + src/adapter/flight-sql/Cargo.toml | 4 +- src/adapter/graphql/Cargo.toml | 4 +- src/adapter/graphql/src/scalars/data_query.rs | 2 +- src/app/cli/Cargo.toml | 4 +- src/app/cli/src/commands/sql_shell_command.rs | 1 + src/domain/core/Cargo.toml | 4 +- src/domain/flow-system/Cargo.toml | 2 +- src/domain/opendatafabric/Cargo.toml | 2 +- src/infra/core/Cargo.toml | 4 +- src/infra/core/src/query_service_impl.rs | 1 - .../tests/tests/ingest/test_polling_ingest.rs | 6 +- .../core/tests/tests/ingest/test_writer.rs | 40 +- .../tests/tests/test_query_service_impl.rs | 2 +- src/infra/ingest-datafusion/Cargo.toml | 4 +- .../src/merge_strategies/snapshot.rs | 4 +- .../ingest-datafusion/src/readers/csv.rs | 2 - .../ingest-datafusion/src/readers/ndjson.rs | 1 - .../ingest-datafusion/src/readers/parquet.rs | 1 - src/infra/ingest-datafusion/src/writer.rs | 6 +- .../tests/tests/test_reader_ndjson.rs | 3 +- src/utils/data-utils/Cargo.toml | 4 +- src/utils/data-utils/src/schema/cmp.rs | 33 + src/utils/data-utils/src/schema/format.rs | 1 + src/utils/data-utils/src/schema/mod.rs | 1 + src/utils/datafusion-cli/Cargo.toml | 9 +- src/utils/datafusion-cli/src/catalog.rs | 8 +- src/utils/datafusion-cli/src/command.rs | 2 +- src/utils/datafusion-cli/src/exec.rs | 153 +++-- src/utils/datafusion-cli/src/functions.rs | 246 ++++++- src/utils/datafusion-cli/src/helper.rs | 25 +- src/utils/datafusion-cli/src/highlighter.rs | 125 ++++ src/utils/datafusion-cli/src/lib.rs | 1 + .../datafusion-cli/src/object_storage.rs | 41 +- src/utils/datafusion-cli/src/print_format.rs | 610 +++++++++++++----- src/utils/datafusion-cli/src/print_options.rs | 64 +- 38 files changed, 1368 insertions(+), 519 deletions(-) create mode 100644 src/utils/data-utils/src/schema/cmp.rs create mode 100644 src/utils/datafusion-cli/src/highlighter.rs diff --git a/Cargo.lock b/Cargo.lock index e3f6594b80..8b644d1829 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -125,9 +125,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" +checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" [[package]] name = "anstyle-parse" @@ -186,11 +186,10 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8919668503a4f2d8b6da96fa7c16e93046bfb3412ffcfa1e5dc7d2e3adcb378" +checksum = "aa285343fba4d829d49985bdc541e3789cf6000ed0e84be7c039438df4a4e78c" dependencies = [ - "ahash", "arrow-arith", "arrow-array", "arrow-buffer", @@ -208,9 +207,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef983914f477d4278b068f13b3224b7d19eb2b807ac9048544d3bfebdf2554c4" +checksum = "753abd0a5290c1bcade7c6623a556f7d1659c5f4148b140b5b63ce7bd1a45705" dependencies = [ "arrow-array", "arrow-buffer", @@ -223,9 +222,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "d6eaf89041fa5937940ae390294ece29e1db584f46d995608d6e5fe65a2e0e9b" +checksum = "d390feeb7f21b78ec997a4081a025baef1e2e0d6069e181939b61864c9779609" dependencies = [ "ahash", "arrow-buffer", @@ -240,9 +239,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55512d988c6fbd76e514fd3ff537ac50b0a675da5a245e4fdad77ecfd654205f" +checksum = "69615b061701bcdffbc62756bc7e85c827d5290b472b580c972ebbbf690f5aa4" dependencies = [ "bytes", "half", @@ -251,15 +250,16 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "655ee51a2156ba5375931ce21c1b2494b1d9260e6dcdc6d4db9060c37dc3325b" +checksum = "e448e5dd2f4113bf5b74a1f26531708f5edcacc77335b7066f9398f4bcf4cdef" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", + "base64 0.21.7", "chrono", "comfy-table", "half", @@ -269,9 +269,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "258bb689997ad5b6660b3ce3638bd6b383d668ec555ed41ad7c6559cbb2e4f91" +checksum = "46af72211f0712612f5b18325530b9ad1bfbdc87290d5fbfd32a7da128983781" dependencies = [ "arrow-array", "arrow-buffer", @@ -288,9 +288,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dc2b9fec74763427e2e5575b8cc31ce96ba4c9b4eb05ce40e0616d9fad12461" +checksum = "67d644b91a162f3ad3135ce1184d0a31c28b816a581e08f29e8e9277a574c64e" dependencies = [ "arrow-buffer", "arrow-schema", @@ -300,9 +300,9 @@ dependencies = [ [[package]] name = "arrow-digest" -version = "48.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd2a23669fa1f16b76f2ff9f29de529d1f82b0f8e23b1913e1220ea2d7e20f06" +checksum = "b9a0e5a10b0f1a37847d296787a9073c5fa6c21f60dc12bf341a74e7caf99aed" dependencies = [ "arrow", "digest", @@ -310,9 +310,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9169108dbae9e48517a71910e51b24682c6096dbd61cbd35df296c6683a85b35" +checksum = "1d7f215461ad6346f2e4cc853e377d4e076d533e1ed78d327debe83023e3601f" dependencies = [ "arrow-arith", "arrow-array", @@ -337,9 +337,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eaa6ab203cc6d89b7eaa1ac781c1dfeef325454c5d5a0419017f95e6bafc03c" +checksum = "03dea5e79b48de6c2e04f03f62b0afea7105be7b77d134f6c5414868feefb80d" dependencies = [ "arrow-array", "arrow-buffer", @@ -347,13 +347,14 @@ dependencies = [ "arrow-data", "arrow-schema", "flatbuffers", + "lz4_flex", ] [[package]] name = "arrow-json" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb64e30d9b73f66fdc5c52d5f4cf69bbf03d62f64ffeafa0715590a5320baed7" +checksum = "8950719280397a47d37ac01492e3506a8a724b3fb81001900b866637a829ee0f" dependencies = [ "arrow-array", "arrow-buffer", @@ -362,7 +363,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap 2.1.0", + "indexmap 2.2.2", "lexical-core", "num", "serde", @@ -371,9 +372,9 @@ 
dependencies = [ [[package]] name = "arrow-ord" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a818951c0d11c428dda03e908175969c262629dd20bd0850bd6c7a8c3bfe48" +checksum = "1ed9630979034077982d8e74a942b7ac228f33dd93a93b615b4d02ad60c260be" dependencies = [ "arrow-array", "arrow-buffer", @@ -386,9 +387,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d664318bc05f930559fc088888f0f7174d3c5bc888c0f4f9ae8f23aa398ba3" +checksum = "007035e17ae09c4e8993e4cb8b5b96edf0afb927cd38e2dff27189b274d83dcf" dependencies = [ "ahash", "arrow-array", @@ -401,15 +402,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf4d737bba93da59f16129bec21e087aed0be84ff840e74146d4703879436cb" +checksum = "0ff3e9c01f7cd169379d269f926892d0e622a704960350d09d331be3ec9e0029" [[package]] name = "arrow-select" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374c4c3b812ecc2118727b892252a4a4308f87a8aca1dbf09f3ce4bc578e668a" +checksum = "1ce20973c1912de6514348e064829e50947e35977bb9d7fb637dc99ea9ffd78c" dependencies = [ "ahash", "arrow-array", @@ -421,9 +422,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15aed5624bb23da09142f58502b59c23f5bea607393298bb81dab1ce60fc769" +checksum = "00f3b37f2aeece31a2636d1b037dabb69ef590e03bdc7eb68519b51ec86932a7" dependencies = [ "arrow-array", "arrow-buffer", @@ -503,7 +504,7 @@ dependencies = [ "futures-util", "handlebars", "http", - "indexmap 2.1.0", + "indexmap 2.2.2", "mime", "multer", "num-traits", @@ -549,7 +550,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "strum", + "strum 0.25.0", "syn 2.0.48", "thiserror", ] @@ -573,7 +574,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "323a5143f5bdd2030f45e3f2e0c821c9b1d36e79cf382129c64299c50a7f3750" dependencies = [ "bytes", - "indexmap 2.1.0", + "indexmap 2.2.2", "serde", "serde_json", ] @@ -652,7 +653,7 @@ dependencies = [ "hex", "http", "hyper", - "ring 0.17.7", + "ring", "time", "tokio", "tracing", @@ -827,7 +828,7 @@ dependencies = [ "p256", "percent-encoding", "regex", - "ring 0.17.7", + "ring", "sha2", "time", "tracing", @@ -1443,9 +1444,9 @@ dependencies = [ [[package]] name = "clap_complete" -version = "4.4.9" +version = "4.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df631ae429f6613fcd3a7c1adbdb65f637271e561b03680adaa6573015dfb106" +checksum = "abb745187d7f4d76267b37485a65e0149edd0e91a4cfcdd3f27524ad86cee9f3" dependencies = [ "clap", ] @@ -1501,7 +1502,7 @@ version = "7.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686" dependencies = [ - "strum", + "strum 0.25.0", "strum_macros 0.25.3", "unicode-width", ] @@ -1815,9 +1816,9 @@ dependencies = [ [[package]] name = "curl-sys" -version = "0.4.70+curl-8.5.0" +version = "0.4.71+curl-8.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c0333d8849afe78a4c8102a429a446bfdd055832af071945520e835ae2d841e" +checksum = 
"c7b12a7ab780395666cb576203dc3ed6e01513754939a600b85196ccf5356bc5" dependencies = [ "cc", "libc", @@ -1864,9 +1865,9 @@ checksum = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9" [[package]] name = "darling" -version = "0.20.3" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +checksum = "fc5d6b04b3fd0ba9926f945895de7d806260a2d7431ba82e7edaecb043c4c6b8" dependencies = [ "darling_core", "darling_macro", @@ -1874,9 +1875,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.3" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +checksum = "04e48a959bcd5c761246f5d090ebc2fbf7b9cd527a492b07a67510c108f1e7e3" dependencies = [ "fnv", "ident_case", @@ -1888,9 +1889,9 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.3" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +checksum = "1d1545d67a2149e1d93b7e5c7752dce5a7426eb5d1357ddcfd89336b94444f77" dependencies = [ "darling_core", "quote", @@ -1918,13 +1919,13 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" [[package]] name = "datafusion" -version = "33.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "676796427e638d85e9eadf13765705212be60b34f8fc5d3934d95184c63ca1b4" +version = "35.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=main#7641a3228156aab0e48c4bab5a6834b44f722d89" dependencies = [ "ahash", "arrow", "arrow-array", + "arrow-ipc", "arrow-schema", "async-compression", "async-trait", @@ -1932,9 +1933,10 @@ dependencies = [ "bzip2", "chrono", "dashmap", - "datafusion-common", + "datafusion-common 35.0.0 (git+https://github.com/apache/arrow-datafusion.git?branch=main)", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-plan", @@ -1944,8 +1946,8 @@ dependencies = [ "glob", "half", "hashbrown 0.14.3", - "indexmap 2.1.0", - "itertools 0.11.0", + "indexmap 2.2.2", + "itertools 0.12.1", "log", "num_cpus", "object_store", @@ -1953,7 +1955,7 @@ dependencies = [ "parquet", "pin-project-lite", "rand", - "sqlparser", + "sqlparser 0.43.1", "tempfile", "tokio", "tokio-util", @@ -1965,9 +1967,26 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "33.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e23b3d21a6531259d291bd20ce59282ea794bda1018b0a1e278c13cd52e50c" +checksum = "d29a7752143b446db4a2cccd9a6517293c6b97e8c39e520ca43ccd07135a4f7e" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-schema", + "chrono", + "half", + "libc", + "num_cpus", + "sqlparser 0.41.0", +] + +[[package]] +name = "datafusion-common" +version = "35.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=main#7641a3228156aab0e48c4bab5a6834b44f722d89" dependencies = [ "ahash", "arrow", @@ -1976,22 +1995,22 @@ dependencies = [ "arrow-schema", "chrono", "half", + "libc", "num_cpus", "object_store", "parquet", - "sqlparser", + "sqlparser 0.43.1", ] [[package]] name = "datafusion-execution" -version = "33.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"4de1fd0d8db0f2b8e4f4121bfa1c7c09d3a5c08a0a65c2229cd849eb65cff855" +version = "35.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=main#7641a3228156aab0e48c4bab5a6834b44f722d89" dependencies = [ "arrow", "chrono", "dashmap", - "datafusion-common", + "datafusion-common 35.0.0 (git+https://github.com/apache/arrow-datafusion.git?branch=main)", "datafusion-expr", "futures", "hashbrown 0.14.3", @@ -2005,42 +2024,54 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "33.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18e227fe88bf6730cab378d0cd8fc4c6b2ea42bc7e414a8ea9feba7225932735" +version = "35.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=main#7641a3228156aab0e48c4bab5a6834b44f722d89" dependencies = [ "ahash", "arrow", "arrow-array", - "datafusion-common", - "sqlparser", - "strum", - "strum_macros 0.25.3", + "datafusion-common 35.0.0 (git+https://github.com/apache/arrow-datafusion.git?branch=main)", + "paste", + "sqlparser 0.43.1", + "strum 0.26.1", + "strum_macros 0.26.1", +] + +[[package]] +name = "datafusion-functions" +version = "35.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=main#7641a3228156aab0e48c4bab5a6834b44f722d89" +dependencies = [ + "arrow", + "base64 0.21.7", + "datafusion-common 35.0.0 (git+https://github.com/apache/arrow-datafusion.git?branch=main)", + "datafusion-execution", + "datafusion-expr", + "hex", + "log", ] [[package]] name = "datafusion-optimizer" -version = "33.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6648e62ea7605b9bfcd87fdc9d67e579c3b9ac563a87734ae5fe6d79ee4547" +version = "35.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=main#7641a3228156aab0e48c4bab5a6834b44f722d89" dependencies = [ "arrow", "async-trait", "chrono", - "datafusion-common", + "datafusion-common 35.0.0 (git+https://github.com/apache/arrow-datafusion.git?branch=main)", "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.3", - "itertools 0.11.0", + "itertools 0.12.1", "log", "regex-syntax 0.8.2", ] [[package]] name = "datafusion-physical-expr" -version = "33.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f32b8574add16a32411a9b3fb3844ac1fc09ab4e7be289f86fd56d620e4f2508" +version = "35.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=main#7641a3228156aab0e48c4bab5a6834b44f722d89" dependencies = [ "ahash", "arrow", @@ -2052,14 +2083,14 @@ dependencies = [ "blake2", "blake3", "chrono", - "datafusion-common", + "datafusion-common 35.0.0 (git+https://github.com/apache/arrow-datafusion.git?branch=main)", + "datafusion-execution", "datafusion-expr", "half", "hashbrown 0.14.3", "hex", - "indexmap 2.1.0", - "itertools 0.11.0", - "libc", + "indexmap 2.2.2", + "itertools 0.12.1", "log", "md-5", "paste", @@ -2073,9 +2104,8 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "33.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "796abd77d5bfecd9e5275a99daf0ec45f5b3a793ec431349ce8211a67826fd22" +version = "35.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=main#7641a3228156aab0e48c4bab5a6834b44f722d89" dependencies = [ "ahash", "arrow", @@ -2084,15 +2114,15 @@ dependencies = [ "arrow-schema", "async-trait", "chrono", - "datafusion-common", + "datafusion-common 35.0.0 (git+https://github.com/apache/arrow-datafusion.git?branch=main)", "datafusion-execution", 
"datafusion-expr", "datafusion-physical-expr", "futures", "half", "hashbrown 0.14.3", - "indexmap 2.1.0", - "itertools 0.11.0", + "indexmap 2.2.2", + "itertools 0.12.1", "log", "once_cell", "parking_lot", @@ -2104,16 +2134,15 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "33.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ced70b8a5648ba7b95c61fc512183c33287ffe2c9f22ffe22700619d7d48c84f" +version = "35.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?branch=main#7641a3228156aab0e48c4bab5a6834b44f722d89" dependencies = [ "arrow", "arrow-schema", - "datafusion-common", + "datafusion-common 35.0.0 (git+https://github.com/apache/arrow-datafusion.git?branch=main)", "datafusion-expr", "log", - "sqlparser", + "sqlparser 0.43.1", ] [[package]] @@ -2755,7 +2784,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.1.0", + "indexmap 2.2.2", "slab", "tokio", "tokio-util", @@ -3088,9 +3117,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -3201,6 +3230,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -3256,7 +3294,7 @@ dependencies = [ "base64 0.21.7", "js-sys", "pem", - "ring 0.17.7", + "ring", "serde", "serde_json", "simple_asn1", @@ -3320,7 +3358,7 @@ dependencies = [ "serde_with", "serde_yaml", "sha3", - "strum", + "strum 0.25.0", "strum_macros 0.25.3", "tar", "tempfile", @@ -3400,7 +3438,7 @@ dependencies = [ "event-bus", "event-sourcing", "futures", - "indoc 2.0.4", + "indoc 1.0.9", "internal-error", "kamu", "kamu-core", @@ -3634,6 +3672,7 @@ dependencies = [ "aws-credential-types", "clap", "datafusion", + "datafusion-common 35.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures", "object_store", "parking_lot", @@ -3903,9 +3942,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.152" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libflate" @@ -3946,9 +3985,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.14" +version = "1.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "295c17e837573c8c821dbaeb3cceb3d745ad082f7572191409e69cbc1b3fd050" +checksum = "037731f5d3aaa87a5675e895b63ddff1a87624bc29f77004ea829809654e48f6" dependencies = [ "cc", "libc", @@ -4192,7 +4231,7 @@ dependencies = [ "log", "memchr", "mime", - "spin 0.9.8", + "spin", "version_check", ] @@ -4332,6 +4371,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-format" version = "0.4.4" @@ -4430,9 +4475,9 @@ dependencies = [ [[package]] name = "object_store" -version = 
"0.7.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4" +checksum = "d139f545f64630e2e3688fd9f81c470888ab01edeb72d13b4e86c566f1130000" dependencies = [ "async-trait", "base64 0.21.7", @@ -4441,14 +4486,14 @@ dependencies = [ "futures", "humantime", "hyper", - "itertools 0.11.0", + "itertools 0.12.1", "parking_lot", "percent-encoding", "quick-xml", "rand", "reqwest", - "ring 0.16.20", - "rustls-pemfile", + "ring", + "rustls-pemfile 2.0.0", "serde", "serde_json", "snafu", @@ -4510,9 +4555,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "300.2.1+3.2.0" +version = "300.2.2+3.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fe476c29791a5ca0d1273c697e96085bbabbbea2ef7afd5617e78a4b40332d3" +checksum = "8bbfad0063610ac26ee79f7484739e2b07555a75c42453b89263830b5c8103bc" dependencies = [ "cc", ] @@ -4618,9 +4663,9 @@ dependencies = [ [[package]] name = "parquet" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bfe55df96e3f02f11bf197ae37d91bb79801631f82f6195dd196ef521df3597" +checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750" dependencies = [ "ahash", "arrow-array", @@ -4636,6 +4681,7 @@ dependencies = [ "chrono", "flate2", "futures", + "half", "hashbrown 0.14.3", "lz4_flex", "num", @@ -4762,7 +4808,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.1.0", + "indexmap 2.2.2", ] [[package]] @@ -5089,9 +5135,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quick-xml" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" dependencies = [ "memchr", "serde", @@ -5244,9 +5290,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "async-compression", "base64 0.21.7", @@ -5267,10 +5313,12 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls", - "rustls-pemfile", + "rustls-native-certs", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-rustls", @@ -5306,21 +5354,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "ring" -version = "0.16.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" -dependencies = [ - "cc", - "libc", - "once_cell", - "spin 0.5.2", - "untrusted 0.7.1", - "web-sys", - "winapi", -] - [[package]] name = "ring" version = "0.17.7" @@ -5330,8 +5363,8 @@ dependencies = [ "cc", "getrandom", "libc", - "spin 0.9.8", - "untrusted 0.9.0", + "spin", + "untrusted", "windows-sys 0.48.0", ] @@ -5403,9 +5436,9 @@ dependencies = [ [[package]] name = "rustix" 
-version = "0.38.30" +version = "0.38.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" dependencies = [ "bitflags 2.4.2", "errno", @@ -5421,7 +5454,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", - "ring 0.17.7", + "ring", "rustls-webpki", "sct", ] @@ -5433,7 +5466,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -5447,14 +5480,30 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +dependencies = [ + "base64 0.21.7", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf" + [[package]] name = "rustls-webpki" version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring 0.17.7", - "untrusted 0.9.0", + "ring", + "untrusted", ] [[package]] @@ -5522,8 +5571,8 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ - "ring 0.17.7", - "untrusted 0.9.0", + "ring", + "untrusted", ] [[package]] @@ -5580,18 +5629,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" dependencies = [ "proc-macro2", "quote", @@ -5600,9 +5649,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.111" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ "itoa", "ryu", @@ -5642,15 +5691,15 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.5.1" +version = "3.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5c9fdb6b00a489875b22efd4b78fe2b363b72265cc5f6eb2e2b9ee270e6140c" +checksum = "1b0ed1662c5a68664f45b76d18deb0e234aff37207086803165c961eb695e981" dependencies = [ "base64 0.21.7", "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.1.0", + "indexmap 2.2.2", "serde", "serde_json", 
"serde_with_macros", @@ -5659,9 +5708,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.5.1" +version = "3.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbff351eb4b33600a2e138dfa0b10b65a238ea8ff8fb2387c422c5022a3e8298" +checksum = "568577ff0ef47b879f736cd66740e022f3672788cdf002a05a4e609ea5a6fb15" dependencies = [ "darling", "proc-macro2", @@ -5671,11 +5720,11 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.30" +version = "0.9.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38" +checksum = "adf8a49373e98a4c5f0ceb5d05aa7c648d75f63774981ed95b7c7443bbd50c6e" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.2", "itoa", "ryu", "serde", @@ -5879,12 +5928,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "spin" version = "0.9.8" @@ -5913,9 +5956,19 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.39.0" +version = "0.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964" +dependencies = [ + "log", + "sqlparser_derive", +] + +[[package]] +name = "sqlparser" +version = "0.43.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "743b4dc2cbde11890ccb254a8fc9d537fa41b36da00de2a1c5e9848c9bc42bd7" +checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4" dependencies = [ "log", "sqlparser_derive", @@ -5923,13 +5976,13 @@ dependencies = [ [[package]] name = "sqlparser_derive" -version = "0.1.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" +checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.48", ] [[package]] @@ -5972,6 +6025,15 @@ dependencies = [ "strum_macros 0.25.3", ] +[[package]] +name = "strum" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f" +dependencies = [ + "strum_macros 0.26.1", +] + [[package]] name = "strum_macros" version = "0.24.3" @@ -5998,6 +6060,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "strum_macros" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.48", +] + [[package]] name = "subtle" version = "2.5.0" @@ -6190,13 +6265,14 @@ dependencies = [ [[package]] name = "time" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" +checksum = "fe80ced77cbfb4cb91a94bf72b378b4b6791a0d9b7f09d0be747d1bdff4e68bd" dependencies = [ "deranged", "itoa", "libc", + "num-conv", "num_threads", "powerfmt", "serde", @@ -6212,10 +6288,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.16" +version = "0.2.17" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" dependencies = [ + "num-conv", "time-core", ] @@ -6255,9 +6332,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -6343,14 +6420,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a195ec8c9da26928f773888e0742ca3ca1040c6cd859c919c9f59c1954ab35" +checksum = "c6a4b9e8023eb94392d3dca65d717c53abc5dad49c07cb65bb8fcd87115fa325" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.21.0", + "toml_edit 0.21.1", ] [[package]] @@ -6368,18 +6445,18 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.2", "toml_datetime", "winnow", ] [[package]] name = "toml_edit" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.2", "serde", "serde_spanned", "toml_datetime", @@ -6741,12 +6818,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" -[[package]] -name = "untrusted" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" - [[package]] name = "untrusted" version = "0.9.0" @@ -6933,9 +7004,9 @@ checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" [[package]] name = "wasm-streams" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" dependencies = [ "futures-util", "js-sys", @@ -6973,9 +7044,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.3" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "whoami" @@ -7233,9 +7304,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.34" +version = "0.5.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7cf47b659b318dccbd69cc4797a39ae128f533dce7902a1096044d1967b9c16" +checksum = "a7cad8365489051ae9f054164e459304af2e7e9bb407c958076c8bf4aef52da5" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 3475c1e917..664e510fc6 100644 --- a/Cargo.toml +++ 
b/Cargo.toml @@ -151,3 +151,4 @@ debug = 1 # See: https://doc.rust-lang.org/cargo/reference/overriding-dependencies.html [patch.crates-io] # datafusion = { git = 'https://github.com/apache/arrow-datafusion.git', tag = '33.0.0-rc1' } +datafusion = { git = 'https://github.com/apache/arrow-datafusion.git', branch = 'main' } diff --git a/deny.toml b/deny.toml index 0f05d77774..211d2d6715 100644 --- a/deny.toml +++ b/deny.toml @@ -32,6 +32,7 @@ deny = [ { name = "clap", deny-multiple-versions = true }, { name = "datafusion", deny-multiple-versions = true }, { name = "object_store", deny-multiple-versions = true }, + { name = "parquet", deny-multiple-versions = true }, { name = "prost", deny-multiple-versions = true }, # { name = "rustls", deny-multiple-versions = true }, { name = "tokio", deny-multiple-versions = true }, diff --git a/src/adapter/flight-sql/Cargo.toml b/src/adapter/flight-sql/Cargo.toml index 985145cc1b..f0595eae52 100644 --- a/src/adapter/flight-sql/Cargo.toml +++ b/src/adapter/flight-sql/Cargo.toml @@ -22,11 +22,11 @@ doctest = false [dependencies] -arrow-flight = { version = "48", features = ["flight-sql-experimental"] } +arrow-flight = { version = "50", features = ["flight-sql-experimental"] } async-trait = { version = "0.1", default-features = false } base64 = { version = "0.21", default-features = false } dashmap = { version = "5", default-features = false } -datafusion = "33" +datafusion = { version = "35", default-features = false } futures = "0.3" like = { version = "0.3", default-features = false } prost = { version = "0.12", default-features = false } diff --git a/src/adapter/graphql/Cargo.toml b/src/adapter/graphql/Cargo.toml index c6bd4afe34..a12f69d2e1 100644 --- a/src/adapter/graphql/Cargo.toml +++ b/src/adapter/graphql/Cargo.toml @@ -36,10 +36,10 @@ async-graphql = { version = "6", features = ["chrono", "url", "apollo_tracing"] async-trait = { version = "0.1", default-features = false } cron = { version = "0.12.0", default-features = false } chrono = "0.4" -datafusion = "33" # TODO: Currently needed for type conversions but ideally should be encapsulated by kamu-core +datafusion = "35" # TODO: Currently needed for type conversions but ideally should be encapsulated by kamu-core dill = "0.8" futures = "0.3" -indoc = "2" +indoc = "1.0.6" serde = "1" serde_json = "1" tokio = { version = "1", default-features = false, features = [] } diff --git a/src/adapter/graphql/src/scalars/data_query.rs b/src/adapter/graphql/src/scalars/data_query.rs index 7989335ec9..d27492bce5 100644 --- a/src/adapter/graphql/src/scalars/data_query.rs +++ b/src/adapter/graphql/src/scalars/data_query.rs @@ -96,7 +96,7 @@ impl From for DataQueryResult { impl From<DataFusionError> for DataQueryResult { fn from(e: DataFusionError) -> Self { match e { - DataFusionError::SQL(e) => DataQueryResult::invalid_sql(e.to_string()), + DataFusionError::SQL(e, _backtrace) => DataQueryResult::invalid_sql(e.to_string()), DataFusionError::Plan(e) => DataQueryResult::invalid_sql(e), _ => DataQueryResult::internal(e.to_string()), } diff --git a/src/app/cli/Cargo.toml b/src/app/cli/Cargo.toml index 5fe19f78cf..0d999a2704 100644 --- a/src/app/cli/Cargo.toml +++ b/src/app/cli/Cargo.toml @@ -67,7 +67,7 @@ read_input = "0.8" # Basic user input webbrowser = "0.8" # For opening URLs in default system browser # APIs -arrow-flight = { version = "48", features = ["flight-sql-experimental"] } +arrow-flight = { version = "50", features = ["flight-sql-experimental"] } async-graphql = { version = "6", features = ["chrono", "url",
"apollo_tracing"] } async-graphql-axum = "6" axum = { version = "0.6", features = ["ws"] } @@ -104,7 +104,7 @@ tracing-bunyan-formatter = "0.3" async-trait = "0.1" chrono = "0.4" cfg-if = "1" # Conditional compilation -datafusion = "33" +datafusion = { version = "35", default-features = false, features = ["crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] } dill = "0.8" dirs = "5" fs_extra = "1.3" diff --git a/src/app/cli/src/commands/sql_shell_command.rs b/src/app/cli/src/commands/sql_shell_command.rs index 5c358555aa..e710a5e86e 100644 --- a/src/app/cli/src/commands/sql_shell_command.rs +++ b/src/app/cli/src/commands/sql_shell_command.rs @@ -134,6 +134,7 @@ impl SqlShellCommand { let mut print_options = PrintOptions { format: PrintFormat::Table, quiet: false, + color: true, maxrows: MaxRows::Limited(DEFAULT_MAX_ROWS_FOR_OUTPUT), }; diff --git a/src/domain/core/Cargo.toml b/src/domain/core/Cargo.toml index 712f820ab3..4478a5bc26 100644 --- a/src/domain/core/Cargo.toml +++ b/src/domain/core/Cargo.toml @@ -42,8 +42,8 @@ tracing = { version = "0.1", default-features = false } url = { version = "2", default-features = false, features = ["serde"] } # TODO: Avoid this dependency or depend on sub-crates -datafusion = { version = "33", default-features = false } -object_store = { version = "0.7", default-features = false } +datafusion = { version = "35", default-features = false } +object_store = { version = "0.9", default-features = false } # TODO: Make serde optional serde = { version = "1", default-features = false, features = ["derive"] } diff --git a/src/domain/flow-system/Cargo.toml b/src/domain/flow-system/Cargo.toml index 95d012980b..3bf292beac 100644 --- a/src/domain/flow-system/Cargo.toml +++ b/src/domain/flow-system/Cargo.toml @@ -42,4 +42,4 @@ serde = { version = "1", default-features = false, features = ["derive"] } serde_with = { version = "3", default-features = false } [dev-dependencies] -datafusion = "33" +datafusion = { version = "35", default-features = false } diff --git a/src/domain/opendatafabric/Cargo.toml b/src/domain/opendatafabric/Cargo.toml index 9b9c6693c1..7bb9eb7f23 100644 --- a/src/domain/opendatafabric/Cargo.toml +++ b/src/domain/opendatafabric/Cargo.toml @@ -59,4 +59,4 @@ prost = "0.12" tonic = "0.10" # Optional -arrow = { optional = true, version = "48", default-features = false, features = ["ipc"] } +arrow = { optional = true, version = "50", default-features = false, features = ["ipc"] } diff --git a/src/infra/core/Cargo.toml b/src/infra/core/Cargo.toml index f1b7702b91..ac43cf4073 100644 --- a/src/infra/core/Cargo.toml +++ b/src/infra/core/Cargo.toml @@ -57,8 +57,8 @@ tar = "0.4" # Checkpoint archival zip = "0.6" # Data -datafusion = "33" -object_store = { version = "0.7", features = ["aws"] } +datafusion = { version = "35", default-features = false } +object_store = { version = "0.9", features = ["aws"] } digest = "0.10" sha3 = "0.10" diff --git a/src/infra/core/src/query_service_impl.rs b/src/infra/core/src/query_service_impl.rs index 60f983eee8..518c2c1c1a 100644 --- a/src/infra/core/src/query_service_impl.rs +++ b/src/infra/core/src/query_service_impl.rs @@ -532,7 +532,6 @@ impl SchemaProvider for KamuSchema { table_partition_cols: Vec::new(), parquet_pruning: None, skip_metadata: None, - insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error, }, ) .await diff --git a/src/infra/core/tests/tests/ingest/test_polling_ingest.rs 
b/src/infra/core/tests/tests/ingest/test_polling_ingest.rs index aade2147b7..91f6427a4b 100644 --- a/src/infra/core/tests/tests/ingest/test_polling_ingest.rs +++ b/src/infra/core/tests/tests/ingest/test_polling_ingest.rs @@ -122,7 +122,7 @@ async fn test_ingest_polling_snapshot() { OPTIONAL INT64 offset; REQUIRED INT32 op; REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); - REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true)); OPTIONAL BYTE_ARRAY city (STRING); OPTIONAL INT64 population; } @@ -907,7 +907,7 @@ async fn test_ingest_polling_preprocess_with_spark() { OPTIONAL INT64 offset; REQUIRED INT32 op; REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); - REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true)); OPTIONAL BYTE_ARRAY city (STRING); OPTIONAL INT64 population; } @@ -999,7 +999,7 @@ async fn test_ingest_polling_preprocess_with_flink() { OPTIONAL INT64 offset; REQUIRED INT32 op; REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); - REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true)); OPTIONAL BYTE_ARRAY city (STRING); OPTIONAL INT64 population; } diff --git a/src/infra/core/tests/tests/ingest/test_writer.rs b/src/infra/core/tests/tests/ingest/test_writer.rs index ea23f980d9..efa4f69eb6 100644 --- a/src/infra/core/tests/tests/ingest/test_writer.rs +++ b/src/infra/core/tests/tests/ingest/test_writer.rs @@ -12,7 +12,7 @@ use std::path::PathBuf; use std::sync::Arc; use chrono::{DateTime, TimeZone, Utc}; -use datafusion::arrow::datatypes::{Field, Schema, SchemaRef}; +use datafusion::arrow::datatypes::SchemaRef; use datafusion::prelude::*; use dill::Component; use event_bus::EventBus; @@ -32,28 +32,6 @@ use opendatafabric as odf; // crate. 
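The expected schemas in these tests flip event_time from REQUIRED to OPTIONAL, consistent with the new writer carrying the DataFrame's nullable schema through the parquet round-trip instead of tightening it. That same round-trip fidelity is what permits the removal just below, as a before/after sketch of the assertion shows (names taken from this test file); the helper itself is promoted into kamu-data-utils, see the new cmp.rs module and the usage sketch further down:

    // DataFusion 33 / Arrow 48: nullability changed across the parquet
    // round-trip, hence the workaround:
    assert_ne!(schema_in_block, schema_in_data);
    assert_schemas_equal(&schema_in_block, &schema_in_data, true);

    // DataFusion 35 / Arrow 50: the schema survives the round-trip exactly:
    assert_eq!(schema_in_block, schema_in_data);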
/////////////////////////////////////////////////////////////// -fn assert_schemas_equal(lhs: &SchemaRef, rhs: &SchemaRef, ignore_nullability: bool) { - let map_field = |f: &Arc<Field>| -> Arc<Field> { - if ignore_nullability { - Arc::new(f.as_ref().clone().with_nullable(true)) - } else { - f.clone() - } - }; - - let lhs = Schema::new_with_metadata( - lhs.fields().iter().map(map_field).collect::<Vec<_>>(), - lhs.metadata().clone(), - ); - let rhs = Schema::new_with_metadata( - rhs.fields().iter().map(map_field).collect::<Vec<_>>(), - rhs.metadata().clone(), - ); - assert_eq!(lhs, rhs); -} - -///////////////////////////////////////////////////////////////////////////////////////// - #[test_group::group(engine, ingest, datafusion)] #[test_log::test(tokio::test)] async fn test_data_writer_happy_path() { @@ -92,7 +70,7 @@ async fn test_data_writer_happy_path() { OPTIONAL INT64 offset; REQUIRED INT32 op; REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); - REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true)); OPTIONAL BYTE_ARRAY city (STRING); OPTIONAL INT64 population; } @@ -125,11 +103,7 @@ async fn test_data_writer_happy_path() { let (schema_block_hash, schema_block) = harness.get_last_schema_block().await; let schema_in_block = schema_block.event.schema_as_arrow().unwrap(); let schema_in_data = SchemaRef::new(df.schema().into()); - // TODO: DataFusion issue where the schema of the - // DataFrame being saved into parquet and those read out of it differs in - // nullability - assert_ne!(schema_in_block, schema_in_data); - assert_schemas_equal(&schema_in_block, &schema_in_data, true); + assert_eq!(schema_in_block, schema_in_data); // Round 2 harness.set_system_time(Utc.with_ymd_and_hms(2010, 1, 2, 12, 0, 0).unwrap()); @@ -280,7 +254,7 @@ async fn test_data_writer_rejects_incompatible_schema() { OPTIONAL INT64 offset; REQUIRED INT32 op; REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); - REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true)); OPTIONAL BYTE_ARRAY city (STRING); OPTIONAL INT64 population; } @@ -331,7 +305,7 @@ async fn test_data_writer_rejects_incompatible_schema() { OPTIONAL INT64 offset; REQUIRED INT32 op; REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); - REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true)); OPTIONAL BYTE_ARRAY city (STRING); OPTIONAL INT64 population; } @@ -412,7 +386,7 @@ async fn test_data_writer_rejects_incompatible_schema() { OPTIONAL INT64 offset; REQUIRED INT32 op; REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); - REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true)); OPTIONAL BYTE_ARRAY city (STRING); OPTIONAL INT64 population; } @@ -545,7 +519,7 @@ async fn test_data_writer_snapshot_orders_by_pk_and_operation_type() { OPTIONAL INT64 offset; REQUIRED INT32 op; REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); - REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true)); OPTIONAL BYTE_ARRAY city (STRING); OPTIONAL INT64 population; } diff --git a/src/infra/core/tests/tests/test_query_service_impl.rs b/src/infra/core/tests/tests/test_query_service_impl.rs index 84fcae5e3d..4ae92ea718 100644 --- a/src/infra/core/tests/tests/test_query_service_impl.rs +++ b/src/infra/core/tests/tests/test_query_service_impl.rs @@ -434,7 +434,7 @@ async fn test_dataset_sql_unauthorized_common(catalog: dill::Catalog, tempdir: & result,
Err(QueryError::DataFusionError( datafusion::common::DataFusionError::Plan(s) - )) if s.eq("table 'kamu.kamu.foo' not found") + )) if s.contains("table 'kamu.kamu.foo' not found") ); } diff --git a/src/infra/ingest-datafusion/Cargo.toml b/src/infra/ingest-datafusion/Cargo.toml index 006c8726f8..b0c752732a 100644 --- a/src/infra/ingest-datafusion/Cargo.toml +++ b/src/infra/ingest-datafusion/Cargo.toml @@ -27,12 +27,12 @@ opendatafabric = { workspace = true, features = ["arrow"] } kamu-core = { workspace = true } kamu-data-utils = { workspace = true } -datafusion = "33" +datafusion = { version = "35", default-features = false } digest = "0.10" geo-types = { version = "0.7", default-features = false, features = [] } geojson ={ version = "0.24", default-features = false, features = ["geo-types"] } glob = "0.3" -object_store = { version = "0.7", features = ["aws"] } +object_store = { version = "0.9", features = ["aws"] } serde = { version = "1" } serde_json = "1" sha3 = "0.10" diff --git a/src/infra/ingest-datafusion/src/merge_strategies/snapshot.rs b/src/infra/ingest-datafusion/src/merge_strategies/snapshot.rs index f78ef0862f..a73d3de4a7 100644 --- a/src/infra/ingest-datafusion/src/merge_strategies/snapshot.rs +++ b/src/infra/ingest-datafusion/src/merge_strategies/snapshot.rs @@ -91,13 +91,13 @@ impl MergeStrategySnapshot { let state = ledger .window(vec![Expr::WindowFunction( datafusion::logical_expr::expr::WindowFunction { - fun: datafusion::logical_expr::WindowFunction::BuiltInWindowFunction( + fun: datafusion::logical_expr::WindowFunctionDefinition::BuiltInWindowFunction( datafusion::logical_expr::BuiltInWindowFunction::RowNumber, ), args: Vec::new(), partition_by: self.primary_key.iter().map(col).collect(), order_by: vec![col(&self.vocab.offset_column).sort(false, false)], - window_frame: datafusion::logical_expr::WindowFrame::new(true), + window_frame: datafusion::logical_expr::WindowFrame::new(Some(false)), }, ) .alias(rank_col)]) diff --git a/src/infra/ingest-datafusion/src/readers/csv.rs b/src/infra/ingest-datafusion/src/readers/csv.rs index bf34f74ce0..b259f5dbb1 100644 --- a/src/infra/ingest-datafusion/src/readers/csv.rs +++ b/src/infra/ingest-datafusion/src/readers/csv.rs @@ -120,8 +120,6 @@ impl Reader for ReaderCsv { // re-compress it. 
file_compression_type: FileCompressionType::UNCOMPRESSED, file_sort_order: Vec::new(), - insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error, - infinite: false, }; let df = self diff --git a/src/infra/ingest-datafusion/src/readers/ndjson.rs b/src/infra/ingest-datafusion/src/readers/ndjson.rs index f537ef2e45..08ee5a7427 100644 --- a/src/infra/ingest-datafusion/src/readers/ndjson.rs +++ b/src/infra/ingest-datafusion/src/readers/ndjson.rs @@ -72,7 +72,6 @@ impl Reader for ReaderNdJson { file_compression_type: FileCompressionType::UNCOMPRESSED, file_sort_order: Vec::new(), infinite: false, - insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error, }; let df = self diff --git a/src/infra/ingest-datafusion/src/readers/parquet.rs b/src/infra/ingest-datafusion/src/readers/parquet.rs index 82f5e55a3d..35a37c0cde 100644 --- a/src/infra/ingest-datafusion/src/readers/parquet.rs +++ b/src/infra/ingest-datafusion/src/readers/parquet.rs @@ -49,7 +49,6 @@ impl Reader for ReaderParquet { parquet_pruning: None, skip_metadata: None, file_sort_order: Vec::new(), - insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error, }; let df = self diff --git a/src/infra/ingest-datafusion/src/writer.rs b/src/infra/ingest-datafusion/src/writer.rs index 533ea2500c..c981aa788f 100644 --- a/src/infra/ingest-datafusion/src/writer.rs +++ b/src/infra/ingest-datafusion/src/writer.rs @@ -244,7 +244,6 @@ impl DataWriterDataFusion { table_partition_cols: Vec::new(), parquet_pruning: None, skip_metadata: None, - insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error, }, ) .await @@ -330,13 +329,13 @@ impl DataWriterDataFusion { .with_column( &self.meta.vocab.offset_column, Expr::WindowFunction(WindowFunction { - fun: expr::WindowFunction::BuiltInWindowFunction( + fun: expr::WindowFunctionDefinition::BuiltInWindowFunction( expr::BuiltInWindowFunction::RowNumber, ), args: vec![], partition_by: vec![], order_by: self.merge_strategy.sort_order(), - window_frame: expr::WindowFrame::new(false), + window_frame: expr::WindowFrame::new(Some(false)), }), ) .int_err()?; @@ -478,7 +477,6 @@ impl DataWriterDataFusion { table_partition_cols: Vec::new(), parquet_pruning: None, skip_metadata: None, - insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error, }, ) .await diff --git a/src/infra/ingest-datafusion/tests/tests/test_reader_ndjson.rs b/src/infra/ingest-datafusion/tests/tests/test_reader_ndjson.rs index ed1554ecae..2f3b242dc2 100644 --- a/src/infra/ingest-datafusion/tests/tests/test_reader_ndjson.rs +++ b/src/infra/ingest-datafusion/tests/tests/test_reader_ndjson.rs @@ -231,7 +231,8 @@ async fn test_read_ndjson_format_timestamp_parse_failed() { assert_matches!( res.unwrap().collect().await, Err(DataFusionError::ArrowError( - ::datafusion::arrow::error::ArrowError::JsonError(_) + ::datafusion::arrow::error::ArrowError::JsonError(_), + _ )) ); }, diff --git a/src/utils/data-utils/Cargo.toml b/src/utils/data-utils/Cargo.toml index c4a3cec291..bec9651aae 100644 --- a/src/utils/data-utils/Cargo.toml +++ b/src/utils/data-utils/Cargo.toml @@ -24,8 +24,8 @@ doctest = false [dependencies] opendatafabric = { workspace = true } -arrow-digest = { version = "48", default-features = false } -datafusion = { version = "33", default-features = false } +arrow-digest = { version = "50", default-features = false } +datafusion = { version = "35", default-features = false, features = ["parquet"] } tracing = { version = "0.1", default-features = false } 
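Both window-expression call sites in this patch (the snapshot merge strategy and the offset column in writer.rs above) adapt to the same DataFusion 35 change: the `fun` field now holds a `WindowFunctionDefinition`, and `WindowFrame::new` takes an `Option<bool>` where it previously took a plain `bool`. A self-contained sketch of the new shape, with placeholder column names:

    use datafusion::logical_expr::expr::WindowFunction;
    use datafusion::logical_expr::{BuiltInWindowFunction, WindowFrame, WindowFunctionDefinition};
    use datafusion::prelude::*;

    // ROW_NUMBER over a partition, mirroring the merge-strategy usage above
    // ("city" and "offset" are hypothetical column names):
    let rank = Expr::WindowFunction(WindowFunction {
        fun: WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::RowNumber),
        args: vec![],
        partition_by: vec![col("city")],
        order_by: vec![col("offset").sort(false, false)],
        window_frame: WindowFrame::new(Some(false)), // was `WindowFrame::new(true)` on DF 33
    })
    .alias("row_num");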
async-trait = "0.1" diff --git a/src/utils/data-utils/src/schema/cmp.rs b/src/utils/data-utils/src/schema/cmp.rs new file mode 100644 index 0000000000..eb1be181e5 --- /dev/null +++ b/src/utils/data-utils/src/schema/cmp.rs @@ -0,0 +1,33 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Arc; + +use datafusion::arrow::datatypes::{Field, Schema}; + +/// Compare schemas optionally performing some normalization +pub fn assert_schemas_equal(lhs: &Schema, rhs: &Schema, ignore_nullability: bool) { + let map_field = |f: &Arc| -> Arc { + if ignore_nullability { + Arc::new(f.as_ref().clone().with_nullable(true)) + } else { + f.clone() + } + }; + + let lhs = Schema::new_with_metadata( + lhs.fields().iter().map(map_field).collect::>(), + lhs.metadata().clone(), + ); + let rhs = Schema::new_with_metadata( + rhs.fields().iter().map(map_field).collect::>(), + rhs.metadata().clone(), + ); + assert_eq!(lhs, rhs); +} diff --git a/src/utils/data-utils/src/schema/format.rs b/src/utils/data-utils/src/schema/format.rs index 65a9ae505c..b09bbd1234 100644 --- a/src/utils/data-utils/src/schema/format.rs +++ b/src/utils/data-utils/src/schema/format.rs @@ -146,6 +146,7 @@ impl<'a> ParquetJsonSchemaWriter<'a> { ) -> String { match logical_type { Some(logical_type) => match logical_type { + LogicalType::Float16 => "FLOAT16".to_string(), LogicalType::Integer { bit_width, is_signed, diff --git a/src/utils/data-utils/src/schema/mod.rs b/src/utils/data-utils/src/schema/mod.rs index 95c0c83549..bf334737c5 100644 --- a/src/utils/data-utils/src/schema/mod.rs +++ b/src/utils/data-utils/src/schema/mod.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+pub mod cmp; pub mod convert; pub mod format; pub mod parse; diff --git a/src/utils/datafusion-cli/Cargo.toml b/src/utils/datafusion-cli/Cargo.toml index 92030d0d62..4f6842af28 100644 --- a/src/utils/datafusion-cli/Cargo.toml +++ b/src/utils/datafusion-cli/Cargo.toml @@ -30,16 +30,17 @@ edition = { workspace = true } publish = { workspace = true } [dependencies] -arrow = "48.0.0" +arrow = "50.0.0" async-trait = "0.1" aws-config = "0.57" aws-credential-types = "0.57" clap = { version = "4", features = ["derive"] } -datafusion = { version = "33", features = ["crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] } +datafusion-common = { version = "35", default-features = false } +datafusion = { version = "35", default-features = false, features = ["crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] } futures = "0.3" -object_store = { version = "0.7", features = ["aws", "gcp"] } +object_store = { version = "0.9", features = ["aws", "gcp"] } parking_lot = { version = "0.12" } -parquet = { version = "48.0.0", default-features = false } +parquet = { version = "50", default-features = false } regex = "1" rustyline = "11" tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot", "signal"] } diff --git a/src/utils/datafusion-cli/src/catalog.rs b/src/utils/datafusion-cli/src/catalog.rs index 93af8061c0..eaeb357d6f 100644 --- a/src/utils/datafusion-cli/src/catalog.rs +++ b/src/utils/datafusion-cli/src/catalog.rs @@ -20,7 +20,7 @@ use std::sync::{Arc, Weak}; use async_trait::async_trait; use datafusion::catalog::schema::SchemaProvider; -use datafusion::catalog::{CatalogList, CatalogProvider}; +use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; use datafusion::datasource::TableProvider; use datafusion::error::Result; @@ -30,17 +30,17 @@ use parking_lot::RwLock; /// Wraps another catalog, automatically creating table providers /// for local files if needed pub struct DynamicFileCatalog { - inner: Arc<dyn CatalogList>, + inner: Arc<dyn CatalogProviderList>, state: Weak<RwLock<SessionState>>, } impl DynamicFileCatalog { - pub fn new(inner: Arc<dyn CatalogList>, state: Weak<RwLock<SessionState>>) -> Self { + pub fn new(inner: Arc<dyn CatalogProviderList>, state: Weak<RwLock<SessionState>>) -> Self { Self { inner, state } } } -impl CatalogList for DynamicFileCatalog { +impl CatalogProviderList for DynamicFileCatalog { fn as_any(&self) -> &dyn Any { self } diff --git a/src/utils/datafusion-cli/src/command.rs b/src/utils/datafusion-cli/src/command.rs index c13ea4c2d6..e7660a2360 100644 --- a/src/utils/datafusion-cli/src/command.rs +++ b/src/utils/datafusion-cli/src/command.rs @@ -78,7 +78,7 @@ impl Command { let file = File::open(filename).map_err(|e| { DataFusionError::Execution(format!("Error opening {:?} {}", filename, e)) })?; - exec_from_lines(ctx, &mut BufReader::new(file), print_options).await; + exec_from_lines(ctx, &mut BufReader::new(file), print_options).await?; Ok(()) } else { exec_err!("Required filename argument is missing") } diff --git a/src/utils/datafusion-cli/src/exec.rs b/src/utils/datafusion-cli/src/exec.rs index 039f21b2eb..5389848a6b 100644 --- a/src/utils/datafusion-cli/src/exec.rs +++ b/src/utils/datafusion-cli/src/exec.rs @@ -23,16 +23,19 @@ use std::io::BufReader; use std::sync::Arc; use std::time::Instant; -use datafusion::common::plan_datafusion_err; +use datafusion::common::{exec_datafusion_err, plan_datafusion_err}; use
datafusion::datasource::listing::ListingTableUrl; +use datafusion::datasource::physical_plan::is_plan_streaming; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan}; +use datafusion::physical_plan::{collect, execute_stream}; use datafusion::prelude::SessionContext; use datafusion::sql::parser::DFParser; use datafusion::sql::sqlparser::dialect::dialect_from_str; use object_store::ObjectStore; use rustyline::error::ReadlineError; use rustyline::Editor; +use tokio::signal; use url::Url; use crate::command::{Command, OutputFormat}; @@ -42,21 +45,21 @@ use crate::object_storage::{ get_oss_object_store_builder, get_s3_object_store_builder, }; +use crate::print_format::PrintFormat; use crate::print_options::{MaxRows, PrintOptions}; /// run and execute SQL statements and commands, against a context with the /// given print options pub async fn exec_from_commands( ctx: &mut SessionContext, - print_options: &PrintOptions, commands: Vec<String>, -) { + print_options: &PrintOptions, +) -> Result<()> { for sql in commands { - match exec_and_print(ctx, print_options, sql).await { - Ok(_) => {} - Err(err) => println!("{err}"), - } + exec_and_print(ctx, print_options, sql).await?; } + + Ok(()) } /// run and execute SQL statements and commands from a file, against a context @@ -65,7 +68,7 @@ pub async fn exec_from_lines( ctx: &mut SessionContext, reader: &mut BufReader<File>, print_options: &PrintOptions, -) { +) -> Result<()> { let mut query = "".to_owned(); for line in reader.lines() { @@ -95,26 +98,28 @@ pub async fn exec_from_lines( // run the left over query if the last statement doesn't contain ';' // ignore if it only consists of '\n' if query.contains(|c| c != '\n') { - match exec_and_print(ctx, print_options, query).await { - Ok(_) => {} - Err(err) => println!("{err}"), - } + exec_and_print(ctx, print_options, query).await?; } + + Ok(()) } pub async fn exec_from_files( - files: Vec<String>, ctx: &mut SessionContext, + files: Vec<String>, print_options: &PrintOptions, -) { +) -> Result<()> { let files = files .into_iter() .map(|file_path| File::open(file_path).unwrap()) .collect::<Vec<_>>(); + for file in files { let mut reader = BufReader::new(file); - exec_from_lines(ctx, &mut reader, print_options).await; + exec_from_lines(ctx, &mut reader, print_options).await?; } + + Ok(()) } /// run and execute SQL statements and commands against a context with the given @@ -126,11 +131,10 @@ pub async fn exec_from_repl( let mut rl = Editor::new()?; rl.set_helper(Some(CliHelper::new( &ctx.task_ctx().session_config().options().sql_parser.dialect, + print_options.color, ))); rl.load_history(".history").ok(); - let mut print_options = print_options.clone(); - loop { match rl.readline("❯ ") { Ok(line) if line.starts_with('\\') => { @@ -142,7 +146,7 @@ pub async fn exec_from_repl( Command::OutputFormat(subcommand) => { if let Some(subcommand) = subcommand { if let Ok(command) = subcommand.parse::<OutputFormat>() { - if let Err(e) = command.execute(&mut print_options).await { + if let Err(e) = command.execute(print_options).await { eprintln!("{e}") } } else { @@ -153,7 +157,7 @@ pub async fn exec_from_repl( } } _ => { - if let Err(e) = cmd.execute(ctx, &mut print_options).await { + if let Err(e) = cmd.execute(ctx, print_options).await { eprintln!("{e}") } } @@ -164,9 +168,15 @@ pub async fn exec_from_repl( } Ok(line) => { rl.add_history_entry(line.trim_end())?; - match exec_and_print(ctx, &print_options, line).await { - Ok(_) => {} - Err(err) => eprintln!("{err}"), + tokio::select!
{ + res = exec_and_print(ctx, print_options, line) => match res { + Ok(_) => {} + Err(err) => eprintln!("{err}"), + }, + _ = signal::ctrl_c() => { + println!("^C"); + continue + }, } // dialect might have changed rl.helper_mut() @@ -197,7 +207,6 @@ async fn exec_and_print( sql: String, ) -> Result<()> { let now = Instant::now(); - let sql = unescape_input(&sql)?; let task_ctx = ctx.task_ctx(); let dialect = &task_ctx.session_config().options().sql_parser.dialect; @@ -207,9 +216,10 @@ async fn exec_and_print( Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi." ) })?; + let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?; for statement in statements { - let plan = ctx.state().statement_to_plan(statement).await?; + let mut plan = ctx.state().statement_to_plan(statement).await?; // For plans like `Explain` ignore `MaxRows` option and always display all rows let should_ignore_maxrows = matches!( @@ -217,31 +227,36 @@ plan, LogicalPlan::Explain(_) | LogicalPlan::DescribeTable(_) | LogicalPlan::Analyze(_) ); - let df = match &plan { - LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => { - create_external_table(ctx, cmd).await?; - ctx.execute_logical_plan(plan).await? - } - _ => ctx.execute_logical_plan(plan).await?, - }; + // Note that cmd is a mutable reference so that the create_external_table function + // can remove all datafusion-cli specific options before passing through + // to datafusion. Otherwise, datafusion will raise Configuration errors. + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { + create_external_table(ctx, cmd).await?; + } - let results = df.collect().await?; + let df = ctx.execute_logical_plan(plan).await?; + let physical_plan = df.create_physical_plan().await?; - let print_options = if should_ignore_maxrows { - PrintOptions { - maxrows: MaxRows::Unlimited, - ..print_options.clone() - } + if is_plan_streaming(&physical_plan)? { + let stream = execute_stream(physical_plan, task_ctx.clone())?; + print_options.print_stream(stream, now).await?; } else { - print_options.clone() - }; - print_options.print_batches(&results, now)?; + let mut print_options = print_options.clone(); + if should_ignore_maxrows { + print_options.maxrows = MaxRows::Unlimited; + } + if print_options.format == PrintFormat::Automatic { + print_options.format = PrintFormat::Table; + } + let results = collect(physical_plan, task_ctx.clone()).await?; + print_options.print_batches(&results, now)?; + } } Ok(()) } -async fn create_external_table(ctx: &SessionContext, cmd: &CreateExternalTable) -> Result<()> { +async fn create_external_table(ctx: &SessionContext, cmd: &mut CreateExternalTable) -> Result<()> { let table_path = ListingTableUrl::parse(&cmd.location)?; let scheme = table_path.scheme(); let url: &Url = table_path.as_ref(); @@ -265,12 +280,7 @@ async fn create_external_table(ctx: &SessionContext, cmd: &CreateExternalTable) ctx.runtime_env() .object_store_registry .get_store(url) - .map_err(|_| { - DataFusionError::Execution(format!( - "Unsupported object store scheme: {}", - scheme - )) - })? + .map_err(|_| exec_datafusion_err!("Unsupported object store scheme: {}", scheme))?
} }; @@ -281,16 +291,32 @@ async fn create_external_table(ctx: &SessionContext, cmd: &CreateExternalTable) #[cfg(test)] mod tests { - use datafusion::common::plan_err; + use std::str::FromStr; + + use datafusion::common::file_options::StatementOptions; + use datafusion::common::{plan_err, FileTypeWriterOptions}; use super::*; async fn create_external_table_test(location: &str, sql: &str) -> Result<()> { let ctx = SessionContext::new(); - let plan = ctx.state().create_logical_plan(sql).await?; + let mut plan = ctx.state().create_logical_plan(sql).await?; - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { create_external_table(&ctx, cmd).await?; + let options: Vec<_> = cmd + .options + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let statement_options = StatementOptions::new(options); + let file_type = datafusion::common::FileType::from_str(cmd.file_type.as_str())?; + + let _file_type_writer_options = FileTypeWriterOptions::build( + &file_type, + ctx.state().config_options(), + &statement_options, + )?; } else { return plan_err!("LogicalPlan is not a CreateExternalTable"); } @@ -309,16 +335,13 @@ mod tests { let session_token = "fake_session_token"; let location = "s3://bucket/path/file.parquet"; - // Missing region + // Missing region, use object_store defaults let sql = format!( "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}') \ LOCATION '{location}'" ); - let err = create_external_table_test(location, &sql) - .await - .unwrap_err(); - assert!(err.to_string().contains("Missing region")); + create_external_table_test(location, &sql).await?; // Should be OK let sql = format!( @@ -352,8 +375,9 @@ mod tests { #[tokio::test] async fn create_object_store_table_gcs() -> Result<()> { let service_account_path = "fake_service_account_path"; - let service_account_key = - "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}"; + let service_account_key = "{\"private_key\": \ + \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"\ + , \"private_key_id\":\"id\"}"; let application_credentials_path = "fake_application_credentials_path"; let location = "gcs://bucket/path/file.parquet"; @@ -374,8 +398,9 @@ mod tests { ); let err = create_external_table_test(location, &sql) .await - .unwrap_err(); - assert!(err.to_string().contains("No RSA key found in pem file")); + .unwrap_err() + .to_string(); + assert!(err.contains("No RSA key found in pem file"), "{err}"); // for application_credentials_path let sql = format!( @@ -397,15 +422,7 @@ mod tests { // Ensure that local files are also registered let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'"); - let err = create_external_table_test(location, &sql) - .await - .unwrap_err(); - - if let DataFusionError::IoError(e) = err { - assert_eq!(e.kind(), std::io::ErrorKind::NotFound); - } else { - return Err(err); - } + create_external_table_test(location, &sql).await.unwrap(); Ok(()) } diff --git a/src/utils/datafusion-cli/src/functions.rs b/src/utils/datafusion-cli/src/functions.rs index 5fb4fcfb1b..0f7c216cfe 100644 --- a/src/utils/datafusion-cli/src/functions.rs +++ b/src/utils/datafusion-cli/src/functions.rs @@ -17,14 +17,28 @@ //! 
Functions that are query-able and searchable via the `\h` command use std::fmt; +use std::fs::File; use std::str::FromStr; use std::sync::Arc; -use arrow::array::StringArray; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::array::{Int64Array, StringArray}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; +use async_trait::async_trait; +use datafusion::common::{plan_err, Column, DataFusionError}; +use datafusion::datasource::function::TableFunctionImpl; +use datafusion::datasource::TableProvider; use datafusion::error::Result; +use datafusion::execution::context::SessionState; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::memory::MemoryExec; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::scalar::ScalarValue; +use parquet::basic::ConvertedType; +use parquet::file::reader::FileReader; +use parquet::file::serialized_reader::SerializedFileReader; +use parquet::file::statistics::Statistics; #[derive(Debug)] pub enum Function { @@ -197,3 +211,231 @@ pub fn display_all_functions() -> Result<()> { println!("{}", pretty_format_batches(&[batch]).unwrap()); Ok(()) } + +/// PARQUET_META table function +struct ParquetMetadataTable { + schema: SchemaRef, + batch: RecordBatch, +} + +#[async_trait] +impl TableProvider for ParquetMetadataTable { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> datafusion::logical_expr::TableType { + datafusion::logical_expr::TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec<usize>>, + _filters: &[Expr], + _limit: Option<usize>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(MemoryExec::try_new( + &[vec![self.batch.clone()]], + TableProvider::schema(self), + projection.cloned(), + )?)) + } +} + +fn convert_parquet_statistics( + value: &Statistics, + converted_type: ConvertedType, +) -> (String, String) { + match (value, converted_type) { + (Statistics::Boolean(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Int32(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Int64(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Int96(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Float(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Double(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::ByteArray(val), ConvertedType::UTF8) => { + let min_bytes = val.min(); + let max_bytes = val.max(); + let min = min_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| min_bytes.to_string()); + + let max = max_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| max_bytes.to_string()); + (min, max) + } + (Statistics::ByteArray(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => { + let min_bytes = val.min(); + let max_bytes = val.max(); + let min = min_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| min_bytes.to_string()); + + let max = max_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| max_bytes.to_string()); + (min, max) + } + (Statistics::FixedLenByteArray(val), _) => (val.min().to_string(), val.max().to_string()), + } +} + +pub struct ParquetMetadataFunc {} + +impl TableFunctionImpl for ParquetMetadataFunc { + fn call(&self,
exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> { + let filename = match exprs.first() { + // single quote: + // parquet_metadata('x.parquet') + Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, + // double quote: + // parquet_metadata("x.parquet") + Some(Expr::Column(Column { name, .. })) => name, + _ => { + return plan_err!("parquet_metadata requires string argument as its input"); + } + }; + + let file = File::open(filename.clone())?; + let reader = SerializedFileReader::new(file)?; + let metadata = reader.metadata(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("filename", DataType::Utf8, true), + Field::new("row_group_id", DataType::Int64, true), + Field::new("row_group_num_rows", DataType::Int64, true), + Field::new("row_group_num_columns", DataType::Int64, true), + Field::new("row_group_bytes", DataType::Int64, true), + Field::new("column_id", DataType::Int64, true), + Field::new("file_offset", DataType::Int64, true), + Field::new("num_values", DataType::Int64, true), + Field::new("path_in_schema", DataType::Utf8, true), + Field::new("type", DataType::Utf8, true), + Field::new("stats_min", DataType::Utf8, true), + Field::new("stats_max", DataType::Utf8, true), + Field::new("stats_null_count", DataType::Int64, true), + Field::new("stats_distinct_count", DataType::Int64, true), + Field::new("stats_min_value", DataType::Utf8, true), + Field::new("stats_max_value", DataType::Utf8, true), + Field::new("compression", DataType::Utf8, true), + Field::new("encodings", DataType::Utf8, true), + Field::new("index_page_offset", DataType::Int64, true), + Field::new("dictionary_page_offset", DataType::Int64, true), + Field::new("data_page_offset", DataType::Int64, true), + Field::new("total_compressed_size", DataType::Int64, true), + Field::new("total_uncompressed_size", DataType::Int64, true), + ])); + + // construct recordbatch from metadata + let mut filename_arr = vec![]; + let mut row_group_id_arr = vec![]; + let mut row_group_num_rows_arr = vec![]; + let mut row_group_num_columns_arr = vec![]; + let mut row_group_bytes_arr = vec![]; + let mut column_id_arr = vec![]; + let mut file_offset_arr = vec![]; + let mut num_values_arr = vec![]; + let mut path_in_schema_arr = vec![]; + let mut type_arr = vec![]; + let mut stats_min_arr = vec![]; + let mut stats_max_arr = vec![]; + let mut stats_null_count_arr = vec![]; + let mut stats_distinct_count_arr = vec![]; + let mut stats_min_value_arr = vec![]; + let mut stats_max_value_arr = vec![]; + let mut compression_arr = vec![]; + let mut encodings_arr = vec![]; + let mut index_page_offset_arr = vec![]; + let mut dictionary_page_offset_arr = vec![]; + let mut data_page_offset_arr = vec![]; + let mut total_compressed_size_arr = vec![]; + let mut total_uncompressed_size_arr = vec![]; + for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() { + for (col_idx, column) in row_group.columns().iter().enumerate() { + filename_arr.push(filename.clone()); + row_group_id_arr.push(rg_idx as i64); + row_group_num_rows_arr.push(row_group.num_rows()); + row_group_num_columns_arr.push(row_group.num_columns() as i64); + row_group_bytes_arr.push(row_group.total_byte_size()); + column_id_arr.push(col_idx as i64); + file_offset_arr.push(column.file_offset()); + num_values_arr.push(column.num_values()); + path_in_schema_arr.push(column.column_path().to_string()); + type_arr.push(column.column_type().to_string()); + let converted_type = column.column_descr().converted_type(); + + if let Some(s) = column.statistics() { + let (min_val, max_val) = if s.has_min_max_set()
{ + let (min_val, max_val) = convert_parquet_statistics(s, converted_type); + (Some(min_val), Some(max_val)) + } else { + (None, None) + }; + stats_min_arr.push(min_val.clone()); + stats_max_arr.push(max_val.clone()); + stats_null_count_arr.push(Some(s.null_count() as i64)); + stats_distinct_count_arr.push(s.distinct_count().map(|c| c as i64)); + stats_min_value_arr.push(min_val); + stats_max_value_arr.push(max_val); + } else { + stats_min_arr.push(None); + stats_max_arr.push(None); + stats_null_count_arr.push(None); + stats_distinct_count_arr.push(None); + stats_min_value_arr.push(None); + stats_max_value_arr.push(None); + }; + compression_arr.push(format!("{:?}", column.compression())); + encodings_arr.push(format!("{:?}", column.encodings())); + index_page_offset_arr.push(column.index_page_offset()); + dictionary_page_offset_arr.push(column.dictionary_page_offset()); + data_page_offset_arr.push(column.data_page_offset()); + total_compressed_size_arr.push(column.compressed_size()); + total_uncompressed_size_arr.push(column.uncompressed_size()); + } + } + + let rb = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(filename_arr)), + Arc::new(Int64Array::from(row_group_id_arr)), + Arc::new(Int64Array::from(row_group_num_rows_arr)), + Arc::new(Int64Array::from(row_group_num_columns_arr)), + Arc::new(Int64Array::from(row_group_bytes_arr)), + Arc::new(Int64Array::from(column_id_arr)), + Arc::new(Int64Array::from(file_offset_arr)), + Arc::new(Int64Array::from(num_values_arr)), + Arc::new(StringArray::from(path_in_schema_arr)), + Arc::new(StringArray::from(type_arr)), + Arc::new(StringArray::from(stats_min_arr)), + Arc::new(StringArray::from(stats_max_arr)), + Arc::new(Int64Array::from(stats_null_count_arr)), + Arc::new(Int64Array::from(stats_distinct_count_arr)), + Arc::new(StringArray::from(stats_min_value_arr)), + Arc::new(StringArray::from(stats_max_value_arr)), + Arc::new(StringArray::from(compression_arr)), + Arc::new(StringArray::from(encodings_arr)), + Arc::new(Int64Array::from(index_page_offset_arr)), + Arc::new(Int64Array::from(dictionary_page_offset_arr)), + Arc::new(Int64Array::from(data_page_offset_arr)), + Arc::new(Int64Array::from(total_compressed_size_arr)), + Arc::new(Int64Array::from(total_uncompressed_size_arr)), + ], + )?; + + let parquet_metadata = ParquetMetadataTable { schema, batch: rb }; + Ok(Arc::new(parquet_metadata)) + } +} diff --git a/src/utils/datafusion-cli/src/helper.rs b/src/utils/datafusion-cli/src/helper.rs index 235482f771..1d972bb801 100644 --- a/src/utils/datafusion-cli/src/helper.rs +++ b/src/utils/datafusion-cli/src/helper.rs @@ -19,6 +19,8 @@ //! validation, and auto-completion for file name during creating external //! table. 
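The ParquetMetadataFunc added to functions.rs above is a DataFusion table function: once registered on a SessionContext it can be called straight from SQL. A sketch of how it might be wired up and queried, assuming registration goes through SessionContext::register_udtf (DataFusion 35's table-function API), that ParquetMetadataFunc is in scope, and using a hypothetical file path:

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    async fn show_parquet_metadata() -> Result<()> {
        let ctx = SessionContext::new();
        // Register under the name the CLI exposes to users
        ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
        // Yields one row per column chunk per row group, with stats and offsets
        ctx.sql("SELECT row_group_id, path_in_schema, stats_min, stats_max FROM parquet_metadata('data.parquet')")
            .await?
            .show()
            .await?;
        Ok(())
    }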
+use std::borrow::Cow; + use datafusion::common::sql_err; use datafusion::error::DataFusionError; use datafusion::sql::parser::{DFParser, Statement}; @@ -31,16 +33,25 @@ use rustyline::hint::Hinter; use rustyline::validate::{ValidationContext, ValidationResult, Validator}; use rustyline::{Context, Helper, Result}; +use crate::highlighter::{NoSyntaxHighlighter, SyntaxHighlighter}; + pub struct CliHelper { completer: FilenameCompleter, dialect: String, + highlighter: Box<dyn Highlighter>, } impl CliHelper { - pub fn new(dialect: &str) -> Self { + pub fn new(dialect: &str, color: bool) -> Self { + let highlighter: Box<dyn Highlighter> = if !color { + Box::new(NoSyntaxHighlighter {}) + } else { + Box::new(SyntaxHighlighter::new(dialect)) + }; Self { completer: FilenameCompleter::new(), dialect: dialect.into(), + highlighter, } } @@ -91,11 +102,19 @@ impl CliHelper { impl Default for CliHelper { fn default() -> Self { - Self::new("generic") + Self::new("generic", false) } } -impl Highlighter for CliHelper {} +impl Highlighter for CliHelper { + fn highlight<'l>(&self, line: &'l str, pos: usize) -> Cow<'l, str> { + self.highlighter.highlight(line, pos) + } + + fn highlight_char(&self, line: &str, pos: usize) -> bool { + self.highlighter.highlight_char(line, pos) + } +} impl Hinter for CliHelper { type Hint = String; diff --git a/src/utils/datafusion-cli/src/highlighter.rs b/src/utils/datafusion-cli/src/highlighter.rs new file mode 100644 index 0000000000..f7e16c880d --- /dev/null +++ b/src/utils/datafusion-cli/src/highlighter.rs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The syntax highlighter. + +use std::borrow::Cow::{self, Borrowed}; +use std::fmt::Display; + +use datafusion::sql::sqlparser::dialect::{dialect_from_str, Dialect, GenericDialect}; +use datafusion::sql::sqlparser::keywords::Keyword; +use datafusion::sql::sqlparser::tokenizer::{Token, Tokenizer}; +use rustyline::highlight::Highlighter; + +/// The syntax highlighter. +#[derive(Debug)] +pub struct SyntaxHighlighter { + dialect: Box<dyn Dialect>, +} + +impl SyntaxHighlighter { + pub fn new(dialect: &str) -> Self { + let dialect = dialect_from_str(dialect).unwrap_or(Box::new(GenericDialect {})); + Self { dialect } + } +} + +pub struct NoSyntaxHighlighter {} + +impl Highlighter for NoSyntaxHighlighter {} + +impl Highlighter for SyntaxHighlighter { + fn highlight<'l>(&self, line: &'l str, _: usize) -> Cow<'l, str> { + let mut out_line = String::new(); + + // `with_unescape(false)` since we want to rebuild the original string.
+ let mut tokenizer = Tokenizer::new(self.dialect.as_ref(), line).with_unescape(false); + let tokens = tokenizer.tokenize(); + match tokens { + Ok(tokens) => { + for token in tokens.iter() { + match token { + Token::Word(w) if w.keyword != Keyword::NoKeyword => { + out_line.push_str(&Color::red(token)); + } + Token::SingleQuotedString(_) => { + out_line.push_str(&Color::green(token)); + } + other => out_line.push_str(&format!("{other}")), + } + } + out_line.into() + } + Err(_) => Borrowed(line), + } + } + + fn highlight_char(&self, line: &str, _: usize) -> bool { + !line.is_empty() + } +} + +/// Convenient utility to return strings with [ANSI color](https://gist.github.com/JBlond/2fea43a3049b38287e5e9cefc87b2124). + struct Color {} + +impl Color { + fn green(s: impl Display) -> String { + format!("\x1b[92m{s}\x1b[0m") + } + + fn red(s: impl Display) -> String { + format!("\x1b[91m{s}\x1b[0m") + } +} + +#[cfg(test)] +mod tests { + use rustyline::highlight::Highlighter; + + use super::SyntaxHighlighter; + + #[test] + fn highlighter_valid() { + let s = "SElect col_a from tab_1;"; + let highlighter = SyntaxHighlighter::new("generic"); + let out = highlighter.highlight(s, s.len()); + assert_eq!( + "\u{1b}[91mSElect\u{1b}[0m col_a \u{1b}[91mfrom\u{1b}[0m tab_1;", + out + ); + } + + #[test] + fn highlighter_valid_with_new_line() { + let s = "SElect col_a from tab_1\n WHERE col_b = 'なにか';"; + let highlighter = SyntaxHighlighter::new("generic"); + let out = highlighter.highlight(s, s.len()); + assert_eq!( + "\u{1b}[91mSElect\u{1b}[0m col_a \u{1b}[91mfrom\u{1b}[0m tab_1\n \ + \u{1b}[91mWHERE\u{1b}[0m col_b = \u{1b}[92m'なにか'\u{1b}[0m;", + out + ); + } + + #[test] + fn highlighter_invalid() { + let s = "SElect col_a from tab_1 WHERE col_b = ';"; + let highlighter = SyntaxHighlighter::new("generic"); + let out = highlighter.highlight(s, s.len()); + assert_eq!("SElect col_a from tab_1 WHERE col_b = ';", out); + } +} diff --git a/src/utils/datafusion-cli/src/lib.rs b/src/utils/datafusion-cli/src/lib.rs index 7eb3cb51c1..139a60b8cf 100644 --- a/src/utils/datafusion-cli/src/lib.rs +++ b/src/utils/datafusion-cli/src/lib.rs @@ -23,6 +23,7 @@ pub mod command; pub mod exec; pub mod functions; pub mod helper; +pub mod highlighter; pub mod object_storage; pub mod print_format; pub mod print_options; diff --git a/src/utils/datafusion-cli/src/object_storage.rs b/src/utils/datafusion-cli/src/object_storage.rs index a9a889de00..7b7b05f82e 100644 --- a/src/utils/datafusion-cli/src/object_storage.rs +++ b/src/utils/datafusion-cli/src/object_storage.rs @@ -28,20 +28,23 @@ use url::Url; pub async fn get_s3_object_store_builder( url: &Url, - cmd: &CreateExternalTable, + cmd: &mut CreateExternalTable, ) -> Result<AmazonS3Builder> { let bucket_name = get_bucket_name(url)?; let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name); if let (Some(access_key_id), Some(secret_access_key)) = ( - cmd.options.get("access_key_id"), - cmd.options.get("secret_access_key"), + // These options are datafusion-cli specific and must be removed before passing through to + // datafusion. Otherwise, a Configuration error will be raised.
+ cmd.options.remove("access_key_id"), + cmd.options.remove("secret_access_key"), ) { builder = builder .with_access_key_id(access_key_id) .with_secret_access_key(secret_access_key); - if let Some(session_token) = cmd.options.get("session_token") { + if let Some(session_token) = cmd.options.remove("session_token") { builder = builder.with_token(session_token); } } else { @@ -64,7 +67,7 @@ pub async fn get_s3_object_store_builder( builder = builder.with_credentials(credentials); } - if let Some(region) = cmd.options.get("region") { + if let Some(region) = cmd.options.remove("region") { builder = builder.with_region(region); } @@ -97,7 +100,7 @@ impl CredentialProvider for S3CredentialProvider { pub fn get_oss_object_store_builder( url: &Url, - cmd: &CreateExternalTable, + cmd: &mut CreateExternalTable, ) -> Result<AmazonS3Builder> { let bucket_name = get_bucket_name(url)?; let mut builder = AmazonS3Builder::from_env() @@ -107,15 +110,15 @@ pub fn get_oss_object_store_builder( .with_region("do_not_care"); if let (Some(access_key_id), Some(secret_access_key)) = ( - cmd.options.get("access_key_id"), - cmd.options.get("secret_access_key"), + cmd.options.remove("access_key_id"), + cmd.options.remove("secret_access_key"), ) { builder = builder .with_access_key_id(access_key_id) .with_secret_access_key(secret_access_key); } - if let Some(endpoint) = cmd.options.get("endpoint") { + if let Some(endpoint) = cmd.options.remove("endpoint") { builder = builder.with_endpoint(endpoint); } @@ -124,20 +127,20 @@ pub fn get_gcs_object_store_builder( url: &Url, - cmd: &CreateExternalTable, + cmd: &mut CreateExternalTable, ) -> Result<GoogleCloudStorageBuilder> { let bucket_name = get_bucket_name(url)?; let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name); - if let Some(service_account_path) = cmd.options.get("service_account_path") { + if let Some(service_account_path) = cmd.options.remove("service_account_path") { builder = builder.with_service_account_path(service_account_path); } - if let Some(service_account_key) = cmd.options.get("service_account_key") { + if let Some(service_account_key) = cmd.options.remove("service_account_key") { builder = builder.with_service_account_key(service_account_key); } - if let Some(application_credentials_path) = cmd.options.get("application_credentials_path") { + if let Some(application_credentials_path) = cmd.options.remove("application_credentials_path") { builder = builder.with_application_credentials(application_credentials_path); } @@ -180,9 +183,9 @@ mod tests { ); let ctx = SessionContext::new(); - let plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx.state().create_logical_plan(&sql).await?; - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { let builder = get_s3_object_store_builder(table_url.as_ref(), cmd).await?; // get the actual configuration information, then assert_eq! let config = [ @@ -216,9 +219,9 @@ ); let ctx = SessionContext::new(); - let plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx.state().create_logical_plan(&sql).await?; - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { let builder = get_oss_object_store_builder(table_url.as_ref(), cmd)?; // get the actual configuration information, then assert_eq!
let config = [ @@ -253,9 +256,9 @@ ); let ctx = SessionContext::new(); - let plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx.state().create_logical_plan(&sql).await?; - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { let builder = get_gcs_object_store_builder(table_url.as_ref(), cmd)?; // get the actual configuration information, then assert_eq! let config = [ diff --git a/src/utils/datafusion-cli/src/print_format.rs b/src/utils/datafusion-cli/src/print_format.rs index a97e91ac82..0f5c053992 100644 --- a/src/utils/datafusion-cli/src/print_format.rs +++ b/src/utils/datafusion-cli/src/print_format.rs @@ -16,25 +16,27 @@ // under the License. //! Print format variants + use std::str::FromStr; use arrow::csv::writer::WriterBuilder; use arrow::json::{ArrayWriter, LineDelimitedWriter}; +use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches_with_options; -use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::format::DEFAULT_FORMAT_OPTIONS; -use datafusion::error::{DataFusionError, Result}; +use datafusion::error::Result; use crate::print_options::MaxRows; /// Allow records to be printed in different formats -#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone)] +#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)] pub enum PrintFormat { Csv, Tsv, Table, Json, NdJson, + Automatic, } impl FromStr for PrintFormat { @@ -46,30 +48,44 @@ impl FromStr for PrintFormat { } macro_rules! batches_to_json { - ($WRITER: ident, $batches: expr) => {{ - let mut bytes = vec![]; + ($WRITER: ident, $writer: expr, $batches: expr) => {{ { - let mut writer = $WRITER::new(&mut bytes); - $batches.iter().try_for_each(|batch| writer.write(batch))?; - writer.finish()?; + if !$batches.is_empty() { + let mut json_writer = $WRITER::new(&mut *$writer); + for batch in $batches { + json_writer.write(batch)?; + } + json_writer.finish()?; + json_finish!($WRITER, $writer); + } } - String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))? + Ok(()) as Result<()> }}; } -fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<String> { - let mut bytes = vec![]; - { - let builder = WriterBuilder::new() - .with_header(true) - .with_delimiter(delimiter); - let mut writer = builder.build(&mut bytes); - for batch in batches { - writer.write(batch)?; - } +macro_rules!
json_finish { + (ArrayWriter, $writer: expr) => {{ + writeln!($writer)?; + }}; + (LineDelimitedWriter, $writer: expr) => {{}}; +} + +fn print_batches_with_sep<W: std::io::Write>( + writer: &mut W, + batches: &[RecordBatch], + delimiter: u8, + with_header: bool, +) -> Result<()> { + let builder = WriterBuilder::new() + .with_header(with_header) + .with_delimiter(delimiter); + let mut csv_writer = builder.build(writer); + + for batch in batches { + csv_writer.write(batch)?; } - let formatted = String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?; - Ok(formatted) + + Ok(()) } fn keep_only_maxrows(s: &str, maxrows: usize) -> String { @@ -89,73 +105,83 @@ fn keep_only_maxrows(s: &str, maxrows: usize) -> String { result.join("\n") } -fn format_batches_with_maxrows(batches: &[RecordBatch], maxrows: MaxRows) -> Result<String> { +fn format_batches_with_maxrows<W: std::io::Write>( + writer: &mut W, + batches: &[RecordBatch], + maxrows: MaxRows, +) -> Result<()> { match maxrows { MaxRows::Limited(maxrows) => { - // Only format enough batches for maxrows + // Filter batches to meet the maxrows condition let mut filtered_batches = Vec::new(); - let mut batches = batches; - let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); - if row_count > maxrows { - let mut accumulated_rows = 0; - - for batch in batches { + let mut row_count: usize = 0; + let mut over_limit = false; + for batch in batches { + if row_count + batch.num_rows() > maxrows { + // If adding this batch exceeds maxrows, slice the batch + let limit = maxrows - row_count; + let sliced_batch = batch.slice(0, limit); + filtered_batches.push(sliced_batch); + over_limit = true; + break; + } else { filtered_batches.push(batch.clone()); - if accumulated_rows + batch.num_rows() > maxrows { - break; - } - accumulated_rows += batch.num_rows(); + row_count += batch.num_rows(); } - - batches = &filtered_batches; } - let mut formatted = format!( - "{}", - pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?, - ); - - if row_count > maxrows { - formatted = keep_only_maxrows(&formatted, maxrows); + let formatted = + pretty_format_batches_with_options(&filtered_batches, &DEFAULT_FORMAT_OPTIONS)?; + if over_limit { + let mut formatted_str = format!("{}", formatted); + formatted_str = keep_only_maxrows(&formatted_str, maxrows); + writeln!(writer, "{}", formatted_str)?; + } else { + writeln!(writer, "{}", formatted)?; } - - Ok(formatted) } MaxRows::Unlimited => { - // maxrows not specified, print all rows - Ok(format!( - "{}", - pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?, - )) + let formatted = pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?; + writeln!(writer, "{}", formatted)?; } } + + Ok(()) } impl PrintFormat { - /// print the batches to stdout using the specified format - /// `maxrows` option is only used for `Table` format: - /// If `maxrows` is Some(n), then at most n rows will be displayed - /// If `maxrows` is None, then every row will be displayed - pub fn print_batches(&self, batches: &[RecordBatch], maxrows: MaxRows) -> Result<()> { + /// Print the batches to a writer using the specified format + pub fn print_batches<W: std::io::Write>( + &self, + writer: &mut W, + batches: &[RecordBatch], + maxrows: MaxRows, + with_header: bool, + ) -> Result<()> { + // filter out any empty batches + let batches: Vec<_> = batches + .iter() + .filter(|b| b.num_rows() > 0) + .cloned() + .collect(); if batches.is_empty() { return Ok(()); } match self { - Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), + Self::Csv | Self::Automatic => { + print_batches_with_sep(writer, &batches, b',', with_header) + } + Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header), Self::Table => { if maxrows == MaxRows::Limited(0) { return Ok(()); } - println!("{}", format_batches_with_maxrows(batches, maxrows)?,) - } - Self::Json => println!("{}", batches_to_json!(ArrayWriter, batches)), - Self::NdJson => { - println!("{}", batches_to_json!(LineDelimitedWriter, batches)) + format_batches_with_maxrows(writer, &batches, maxrows) } + Self::Json => batches_to_json!(ArrayWriter, writer, &batches), + Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches), } - Ok(()) } } @@ -163,86 +189,185 @@ impl PrintFormat { mod tests { use std::sync::Arc; - use arrow::array::Int32Array; + use arrow::array::{ArrayRef, Int32Array}; use arrow::datatypes::{DataType, Field, Schema}; use super::*; #[test] - fn test_print_batches_with_sep() { - let batches = vec![]; - assert_eq!("", print_batches_with_sep(&batches, b',').unwrap()); - - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - Field::new("c", DataType::Int32, false), - ])); - - let batch = RecordBatch::try_new( - schema, - vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(Int32Array::from(vec![4, 5, 6])), - Arc::new(Int32Array::from(vec![7, 8, 9])), - ], - ) - .unwrap(); + fn print_empty() { + for format in [ + PrintFormat::Csv, + PrintFormat::Tsv, + PrintFormat::Table, + PrintFormat::Json, + PrintFormat::NdJson, + PrintFormat::Automatic, + ] { + // no output for empty batches, even with header set + PrintBatchesTest::new() + .with_format(format) + .with_batches(vec![]) + .with_expected(&[""]) + .run(); + } + } - let batches = vec![batch]; - let r = print_batches_with_sep(&batches, b',').unwrap(); - assert_eq!("a,b,c\n1,4,7\n2,5,8\n3,6,9\n", r); + #[test] + fn print_csv_no_header() { + #[rustfmt::skip] + let expected = &[ + "1,4,7", + "2,5,8", + "3,6,9", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Csv) + .with_batches(split_batch(three_column_batch())) + .with_header(WithHeader::No) + .with_expected(expected) + .run(); } #[test] - fn test_print_batches_to_json_empty() -> Result<()> { - let batches = vec![]; - let r = batches_to_json!(ArrayWriter, &batches); - assert_eq!("", r); + fn print_csv_with_header() { + #[rustfmt::skip] + let expected = &[ + "a,b,c", + "1,4,7", + "2,5,8", + "3,6,9", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Csv) + .with_batches(split_batch(three_column_batch())) + .with_header(WithHeader::Yes) + .with_expected(expected) + .run(); + } - let r = batches_to_json!(LineDelimitedWriter, &batches); - assert_eq!("", r); + #[test] + fn print_tsv_no_header() { + #[rustfmt::skip] + let expected = &[ + "1\t4\t7", + "2\t5\t8", + "3\t6\t9", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Tsv) + .with_batches(split_batch(three_column_batch())) + .with_header(WithHeader::No) + .with_expected(expected) + .run(); + } - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - Field::new("c", DataType::Int32, false), - ])); + #[test] + fn print_tsv_with_header() { + #[rustfmt::skip] + let expected = &[ + "a\tb\tc", + "1\t4\t7", + "2\t5\t8", + "3\t6\t9", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Tsv) + 
.with_batches(split_batch(three_column_batch())) + .with_header(WithHeader::Yes) + .with_expected(expected) + .run(); + } - let batch = RecordBatch::try_new( - schema, - vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(Int32Array::from(vec![4, 5, 6])), - Arc::new(Int32Array::from(vec![7, 8, 9])), - ], - ) - .unwrap(); - - let batches = vec![batch]; - let r = batches_to_json!(ArrayWriter, &batches); - assert_eq!( - "[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]", - r - ); - - let r = batches_to_json!(LineDelimitedWriter, &batches); - assert_eq!( - "{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n", - r - ); - Ok(()) + #[test] + fn print_table() { + let expected = &[ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + "+---+---+---+", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Table) + .with_batches(split_batch(three_column_batch())) + .with_header(WithHeader::Ignored) + .with_expected(expected) + .run(); + } + #[test] + fn print_json() { + let expected = &[r#"[{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]"#]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Json) + .with_batches(split_batch(three_column_batch())) + .with_header(WithHeader::Ignored) + .with_expected(expected) + .run(); } #[test] - fn test_format_batches_with_maxrows() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + fn print_ndjson() { + let expected = &[ + r#"{"a":1,"b":4,"c":7}"#, + r#"{"a":2,"b":5,"c":8}"#, + r#"{"a":3,"b":6,"c":9}"#, + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::NdJson) + .with_batches(split_batch(three_column_batch())) + .with_header(WithHeader::Ignored) + .with_expected(expected) + .run(); + } - let batch = - RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap(); + #[test] + fn print_automatic_no_header() { + #[rustfmt::skip] + let expected = &[ + "1,4,7", + "2,5,8", + "3,6,9", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Automatic) + .with_batches(split_batch(three_column_batch())) + .with_header(WithHeader::No) + .with_expected(expected) + .run(); + } + #[test] + fn print_automatic_with_header() { + #[rustfmt::skip] + let expected = &[ + "a,b,c", + "1,4,7", + "2,5,8", + "3,6,9", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Automatic) + .with_batches(split_batch(three_column_batch())) + .with_header(WithHeader::Yes) + .with_expected(expected) + .run(); + } + #[test] + fn print_maxrows_unlimited() { #[rustfmt::skip] - let all_rows_expected = [ + let expected = &[ "+---+", "| a |", "+---+", @@ -250,10 +375,24 @@ mod tests { "| 2 |", "| 3 |", "+---+", - ].join("\n"); + ]; + + // should print out entire output with no truncation if unlimited or + // limit greater than number of batches or equal to the number of batches + for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] { + PrintBatchesTest::new() + .with_format(PrintFormat::Table) + .with_batches(vec![one_column_batch()]) + .with_maxrows(max_rows) + .with_expected(expected) + .run(); + } + } + #[test] + fn print_maxrows_limited_one_batch() { #[rustfmt::skip] - let one_row_expected = [ + let expected = &[ "+---+", "| a |", "+---+", @@ -262,10 +401,20 @@ mod tests { "| . |", "| . 
|", "+---+", - ].join("\n"); + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Table) + .with_batches(vec![ + one_column_batch(), + one_column_batch(), + one_column_batch(), + ]) + .with_maxrows(MaxRows::Limited(5)) + .with_expected(expected) + .run(); + } + + #[test] + fn test_print_batches_empty_batches() { + let batch = one_column_batch(); + let empty_batch = RecordBatch::new_empty(batch.schema()); + + #[rustfmt::skip] + let expected =&[ + "+---+", + "| a |", + "+---+", + "| 1 |", + "| 2 |", + "| 3 |", + "+---+", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Table) + .with_batches(vec![empty_batch.clone(), batch, empty_batch]) + .with_expected(expected) + .run(); + } + + #[test] + fn test_print_batches_empty_batches_no_header() { + let empty_batch = RecordBatch::new_empty(one_column_batch().schema()); + + // empty batches should not print a header + let expected = &[""]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Table) + .with_batches(vec![empty_batch]) + .with_header(WithHeader::Yes) + .with_expected(expected) + .run(); + } + + #[derive(Debug)] + struct PrintBatchesTest { + format: PrintFormat, + batches: Vec<RecordBatch>, + maxrows: MaxRows, + with_header: WithHeader, + expected: Vec<&'static str>, + } + + /// How to test with_header + #[derive(Debug, Clone)] + enum WithHeader { + Yes, + No, + /// output should be the same with or without header + Ignored, + } + + impl PrintBatchesTest { + fn new() -> Self { + Self { + format: PrintFormat::Table, + batches: vec![], + maxrows: MaxRows::Unlimited, + with_header: WithHeader::Ignored, + expected: vec![], + } + } + + /// set the format + fn with_format(mut self, format: PrintFormat) -> Self { + self.format = format; + self + } + + /// set the batches to convert + fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self { + self.batches = batches; + self + } + + /// set maxrows + fn with_maxrows(mut self, maxrows: MaxRows) -> Self { + self.maxrows = maxrows; + self + } + + /// set with_header + fn with_header(mut self, with_header: WithHeader) -> Self { + self.with_header = with_header; + self + } + + /// set expected output + fn with_expected(mut self, expected: &[&'static str]) -> Self { + self.expected = expected.to_vec(); + self + } + + /// run the test + fn run(self) { + let actual = self.output(); + let actual:
Vec<_> = actual.trim_end().split('\n').collect(); + let expected = self.expected; + assert_eq!( + actual, expected, + "\n\nactual:\n{actual:#?}\n\nexpected:\n{expected:#?}" + ); + } + + /// formats batches using parameters and returns the resulting output + fn output(&self) -> String { + match self.with_header { + WithHeader::Yes => self.output_with_header(true), + WithHeader::No => self.output_with_header(false), + WithHeader::Ignored => { + let output = self.output_with_header(true); + // ensure the output is the same without header + let output_without_header = self.output_with_header(false); + assert_eq!( + output, output_without_header, + "Expected output to be the same with or without header" + ); + output + } + } + } + + fn output_with_header(&self, with_header: bool) -> String { + let mut buffer: Vec<u8> = vec![]; + self.format + .print_batches(&mut buffer, &self.batches, self.maxrows, with_header) + .unwrap(); + String::from_utf8(buffer).unwrap() + } + } + + /// Return a batch with three columns and three rows + fn three_column_batch() -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 9])), + ], + ) + .unwrap() + } + + /// return a batch with one column and three rows + fn one_column_batch() -> RecordBatch { + RecordBatch::try_from_iter(vec![( + "a", + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + )]) + .unwrap() + } + + /// Slice the record batch into 2 batches + fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> { + assert!(batch.num_rows() > 1); + let split = batch.num_rows() / 2; + vec![ + batch.slice(0, split), + batch.slice(split, batch.num_rows() - split), + ] } } diff --git a/src/utils/datafusion-cli/src/print_options.rs b/src/utils/datafusion-cli/src/print_options.rs index 87c6517a03..7feba4aa47 100644 --- a/src/utils/datafusion-cli/src/print_options.rs +++ b/src/utils/datafusion-cli/src/print_options.rs @@ -16,11 +16,16 @@ // under the License.
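The reworked print_batches in print_format.rs now renders into any std::io::Write implementor instead of printing straight to stdout, which is what makes the buffer-based tests above possible. A small sketch of reusing it to render CSV into an in-memory buffer (the function name here is illustrative):

    use arrow::record_batch::RecordBatch;
    use datafusion::error::Result;

    fn render_csv(batches: &[RecordBatch]) -> Result<String> {
        let mut buf: Vec<u8> = Vec::new();
        // `true` requests a header row; maxrows only applies to the Table format
        PrintFormat::Csv.print_batches(&mut buf, batches, MaxRows::Unlimited, true)?;
        Ok(String::from_utf8(buf).expect("CSV output is valid UTF-8"))
    }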
use std::fmt::{Display, Formatter}; +use std::io::Write; +use std::pin::Pin; use std::str::FromStr; use std::time::Instant; -use datafusion::arrow::record_batch::RecordBatch; +use arrow::record_batch::RecordBatch; +use datafusion::common::DataFusionError; use datafusion::error::Result; +use datafusion::physical_plan::RecordBatchStream; +use futures::StreamExt; use crate::print_format::PrintFormat; @@ -68,6 +73,7 @@ pub struct PrintOptions { pub format: PrintFormat, pub quiet: bool, pub maxrows: MaxRows, + pub color: bool, } fn get_timing_info_str(row_count: usize, maxrows: MaxRows, query_start_time: Instant) -> String { @@ -87,16 +93,62 @@ fn get_timing_info_str(row_count: usize, maxrows: MaxRows, query_start_time: Ins } impl PrintOptions { - /// print the batches to stdout using the specified format + /// Print the batches to stdout using the specified format pub fn print_batches(&self, batches: &[RecordBatch], query_start_time: Instant) -> Result<()> { + let stdout = std::io::stdout(); + let mut writer = stdout.lock(); + + self.format + .print_batches(&mut writer, batches, self.maxrows, true)?; + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); - // Elapsed time should not count time for printing batches - let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); + let timing_info = get_timing_info_str( + row_count, + if self.format == PrintFormat::Table { + self.maxrows + } else { + MaxRows::Unlimited + }, + query_start_time, + ); + + if !self.quiet { + writeln!(writer, "{timing_info}")?; + } + + Ok(()) + } + + /// Print the stream to stdout using the specified format + pub async fn print_stream( + &self, + mut stream: Pin<Box<dyn RecordBatchStream>>, + query_start_time: Instant, + ) -> Result<()> { + if self.format == PrintFormat::Table { + return Err(DataFusionError::External( + "PrintFormat::Table is not implemented".to_string().into(), + )); + }; + + let stdout = std::io::stdout(); + let mut writer = stdout.lock(); + + let mut row_count = 0_usize; + let mut with_header = true; + + while let Some(maybe_batch) = stream.next().await { + let batch = maybe_batch?; + row_count += batch.num_rows(); + self.format + .print_batches(&mut writer, &[batch], MaxRows::Unlimited, with_header)?; + with_header = false; + } - self.format.print_batches(batches, self.maxrows)?; + let timing_info = get_timing_info_str(row_count, MaxRows::Unlimited, query_start_time); if !self.quiet { - println!("{timing_info}"); + writeln!(writer, "{timing_info}")?; } Ok(())
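Note that print_stream deliberately rejects PrintFormat::Table: a pretty-printed table needs the complete result set to size its columns, which would defeat incremental output. A sketch of driving it directly, assuming an async context where a physical_plan and ctx are already in hand (execute_stream is the same DataFusion helper exec.rs uses above):

    use std::time::Instant;
    use datafusion::physical_plan::execute_stream;

    let stream = execute_stream(physical_plan, ctx.task_ctx())?;
    let print_options = PrintOptions {
        format: PrintFormat::NdJson, // any format except Table works for streams
        quiet: false,
        maxrows: MaxRows::Unlimited,
        color: true,
    };
    print_options.print_stream(stream, Instant::now()).await?;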