From 7f5f8902fda8976d4493efd314089198e6eb21db Mon Sep 17 00:00:00 2001 From: Michiel De Backker Date: Fri, 8 Mar 2024 11:53:07 +0100 Subject: [PATCH] Break out SQL Writer --- Cargo.toml | 1 + examples/Cargo.toml | 4 +- examples/README.md | 1 + sources/sql/Cargo.toml | 1 + sources/sql/src/lib.rs | 8 +--- sql-writer/Cargo.toml | 17 ++++++++ sql-writer/examples/expr.rs | 35 ++++++++++++++++ sql-writer/examples/plan.rs | 41 +++++++++++++++++++ .../sql => sql-writer}/src/ast_builder.rs | 0 .../src/producer.rs => sql-writer/src/lib.rs | 14 ++++++- 10 files changed, 113 insertions(+), 9 deletions(-) create mode 100644 sql-writer/Cargo.toml create mode 100644 sql-writer/examples/expr.rs create mode 100644 sql-writer/examples/plan.rs rename {sources/sql => sql-writer}/src/ast_builder.rs (100%) rename sources/sql/src/producer.rs => sql-writer/src/lib.rs (98%) diff --git a/Cargo.toml b/Cargo.toml index a0e33b7..1bd9381 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "examples", "sources/sql", "sources/flight-sql", + "sql-writer", ] [patch.crates-io] diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 8babd31..87d7c8e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -16,9 +16,11 @@ datafusion-federation-flight-sql.path = "../sources/flight-sql" connectorx = { git = "https://github.com/sfu-db/connector-x.git", rev = "fa0fc7bc", features = [ "dst_arrow", "src_sqlite", - "src_postgres", ] } tonic = "0.10.2" [dependencies] async-std = "1.12.0" + +[features] +postgres = ["connectorx/src_postgres"] diff --git a/examples/README.md b/examples/README.md index 3eae546..8ecd490 100644 --- a/examples/README.md +++ b/examples/README.md @@ -9,3 +9,4 @@ cargo run --example sqlite - [sqlite](./examples/sqlite.rs): federate an entire query to a SQLite database. - [sqlite-partial](./examples/sqlite-partial.rs): federate parts of a query to two separate SQLite database instances. 
+- [postgres-partial](./examples/postgres-partial.rs): federate parts of a query to two separate PostgreSQL database instances. To run this example pass `--features postgres` to the `cargo run` command. diff --git a/sources/sql/Cargo.toml b/sources/sql/Cargo.toml index 448014e..8f966c8 100644 --- a/sources/sql/Cargo.toml +++ b/sources/sql/Cargo.toml @@ -18,6 +18,7 @@ connectorx = { git = "https://github.com/sfu-db/connector-x.git", rev = "fa0fc7b ] } datafusion.workspace = true datafusion-federation.path = "../../datafusion-federation" +datafusion-sql-writer.path = "../../sql-writer" # derive_builder = "0.13.0" futures = "0.3.30" tokio = "1.35.1" diff --git a/sources/sql/src/lib.rs b/sources/sql/src/lib.rs index ed0f1ca..1d997e5 100644 --- a/sources/sql/src/lib.rs +++ b/sources/sql/src/lib.rs @@ -15,6 +15,7 @@ use datafusion::{ use datafusion_federation::{FederatedPlanNode, FederationPlanner, FederationProvider}; mod schema; +use datafusion_sql_writer::from_df_plan; pub use schema::*; pub mod connectorx; @@ -24,11 +25,6 @@ pub use executor::*; // #[macro_use] // extern crate derive_builder; -mod producer; -use producer::query_to_sql; - -mod ast_builder; - // SQLFederationProvider provides federation to SQL DMBSs. 
pub struct SQLFederationProvider { analyzer: Arc, @@ -168,7 +164,7 @@ impl ExecutionPlan for VirtualExecutionPlan { _partition: usize, _context: Arc, ) -> Result { - let ast = query_to_sql(&self.plan, self.executor.dialect())?; + let ast = from_df_plan(&self.plan, self.executor.dialect())?; let query = format!("{ast}"); self.executor.execute(query.as_str(), self.schema()) diff --git a/sql-writer/Cargo.toml b/sql-writer/Cargo.toml new file mode 100644 index 0000000..6b716b3 --- /dev/null +++ b/sql-writer/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "datafusion-sql-writer" +version.workspace = true +edition.workspace = true +license.workspace = true +readme.workspace = true + +[lib] +name = "datafusion_sql_writer" +path = "src/lib.rs" + +[dependencies] +datafusion.workspace = true +# derive_builder = "0.13.0" + +[dev-dependencies] +tokio = "1.35.1" diff --git a/sql-writer/examples/expr.rs b/sql-writer/examples/expr.rs new file mode 100644 index 0000000..4dc6dfa --- /dev/null +++ b/sql-writer/examples/expr.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; + +use datafusion::{ + common::Column, + logical_expr::{BinaryExpr, Operator}, + prelude::Expr, + sql::{sqlparser::dialect::GenericDialect, TableReference}, +}; +use datafusion_sql_writer::from_df_epr; + +fn main() -> Result<(), Box> { + // Example expression + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: Some(TableReference::bare("table_a")), + name: "id".to_string(), + })), + op: Operator::Gt, + right: Box::new(Expr::Column(Column { + relation: Some(TableReference::bare("table_b")), + name: "b".to_string(), + })), + }); + + // datafusion::Expr -> sqlparser::ast + let dialect = Arc::new(GenericDialect {}); + let ast = from_df_epr(&expr, dialect)?; + + // Get SQL string by formatting the AST + let sql = format!("{}", ast); + + println!("{sql}"); + + Ok(()) +} diff --git a/sql-writer/examples/plan.rs b/sql-writer/examples/plan.rs new file mode 100644 index 0000000..3f9a5c8 --- 
/dev/null +++ b/sql-writer/examples/plan.rs @@ -0,0 +1,41 @@ +use std::sync::Arc; + +use datafusion::{ + execution::context::SessionContext, sql::sqlparser::dialect::GenericDialect, + test_util::TestTableFactory, +}; +use datafusion_sql_writer::from_df_plan; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + // Example query + let query = "select ta.id, tb.value from table_a ta join table_b tb on ta.id = tb.id;"; + + // Create the DataFusion plan + let dialect = Arc::new(GenericDialect {}); + let ctx = mock_ctx().await; + let plan = ctx.sql(query).await.unwrap().into_unoptimized_plan(); + + // datafusion::LogicalPlan -> sqlparser::ast + let ast = from_df_plan(&plan, dialect)?; + + // Get SQL string by formatting the AST + let sql = format!("{}", ast); + + println!("{sql}"); + Ok(()) +} + +async fn mock_ctx() -> SessionContext { + let mut state = SessionContext::new().state(); + state + .table_factories_mut() + .insert("MOCKTABLE".to_string(), Arc::new(TestTableFactory {})); + let ctx = SessionContext::new_with_state(state); + + ctx.sql("CREATE EXTERNAL TABLE table_a (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap(); + ctx.sql("CREATE EXTERNAL TABLE table_b (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap(); + ctx.sql("CREATE EXTERNAL TABLE table_c (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap(); + + ctx +} diff --git a/sources/sql/src/ast_builder.rs b/sql-writer/src/ast_builder.rs similarity index 100% rename from sources/sql/src/ast_builder.rs rename to sql-writer/src/ast_builder.rs diff --git a/sources/sql/src/producer.rs b/sql-writer/src/lib.rs similarity index 98% rename from sources/sql/src/producer.rs rename to sql-writer/src/lib.rs index 0c4ef45..27020da 100644 --- a/sources/sql/src/producer.rs +++ b/sql-writer/src/lib.rs @@ -8,7 +8,7 @@ use datafusion::{ sql::sqlparser::ast::{self, Expr as SQLExpr}, }; -use 
datafusion::common::not_impl_err; +use datafusion::common::{not_impl_err, DFSchema}; use datafusion::common::{Column, DFSchemaRef}; #[allow(unused_imports)] use datafusion::logical_expr::aggregate_function; @@ -21,12 +21,22 @@ use datafusion::sql::sqlparser::dialect::{ Dialect, GenericDialect, PostgreSqlDialect, SQLiteDialect, }; +mod ast_builder; use crate::ast_builder::{ BuilderError, QueryBuilder, RelationBuilder, SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder, }; -pub fn query_to_sql(plan: &LogicalPlan, dialect: Arc<dyn Dialect>) -> Result<ast::Statement> { +pub fn from_df_plan(plan: &LogicalPlan, dialect: Arc<dyn Dialect>) -> Result<ast::Statement> { + query_to_sql(plan, dialect) +} + +pub fn from_df_epr(expr: &Expr, dialect: Arc<dyn Dialect>) -> Result<SQLExpr> { + let schema = DFSchema::empty(); + expr_to_sql(expr, &Arc::new(schema), 0, dialect) +} + +fn query_to_sql(plan: &LogicalPlan, dialect: Arc<dyn Dialect>) -> Result<ast::Statement> { match plan { LogicalPlan::Projection(_) | LogicalPlan::Filter(_)