Skip to content

Commit

Permalink
Break out SQL Writer
Browse files Browse the repository at this point in the history
  • Loading branch information
backkem committed Mar 8, 2024
1 parent 277fafa commit 7f5f890
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"examples",
"sources/sql",
"sources/flight-sql",
"sql-writer",
]

[patch.crates-io]
Expand Down
4 changes: 3 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ datafusion-federation-flight-sql.path = "../sources/flight-sql"
connectorx = { git = "https://github.com/sfu-db/connector-x.git", rev = "fa0fc7bc", features = [
"dst_arrow",
"src_sqlite",
"src_postgres",
] }
tonic = "0.10.2"

[dependencies]
async-std = "1.12.0"

[features]
postgres = ["connectorx/src_postgres"]
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ cargo run --example sqlite

- [sqlite](./examples/sqlite.rs): federate an entire query to a SQLite database.
- [sqlite-partial](./examples/sqlite-partial.rs): federate parts of a query to two separate SQLite database instances.
- [postgres-partial](./examples/postgres-partial.rs): federate parts of a query to two separate PostgreSQL database instances. To run this example pass `--features postgres` to the `cargo run` command.
1 change: 1 addition & 0 deletions sources/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ connectorx = { git = "https://github.com/sfu-db/connector-x.git", rev = "fa0fc7b
] }
datafusion.workspace = true
datafusion-federation.path = "../../datafusion-federation"
datafusion-sql-writer.path = "../../sql-writer"
# derive_builder = "0.13.0"
futures = "0.3.30"
tokio = "1.35.1"
8 changes: 2 additions & 6 deletions sources/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use datafusion::{
use datafusion_federation::{FederatedPlanNode, FederationPlanner, FederationProvider};

mod schema;
use datafusion_sql_writer::from_df_plan;
pub use schema::*;

pub mod connectorx;
Expand All @@ -24,11 +25,6 @@ pub use executor::*;
// #[macro_use]
// extern crate derive_builder;

mod producer;
use producer::query_to_sql;

mod ast_builder;

// SQLFederationProvider provides federation to SQL DMBSs.
pub struct SQLFederationProvider {
analyzer: Arc<Analyzer>,
Expand Down Expand Up @@ -168,7 +164,7 @@ impl ExecutionPlan for VirtualExecutionPlan {
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let ast = query_to_sql(&self.plan, self.executor.dialect())?;
let ast = from_df_plan(&self.plan, self.executor.dialect())?;
let query = format!("{ast}");

self.executor.execute(query.as_str(), self.schema())
Expand Down
17 changes: 17 additions & 0 deletions sql-writer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "datafusion-sql-writer"
version.workspace = true
edition.workspace = true
license.workspace = true
readme.workspace = true

[lib]
name = "datafusion_sql_writer"
path = "src/lib.rs"

[dependencies]
datafusion.workspace = true
# derive_builder = "0.13.0"

[dev-dependencies]
tokio = "1.35.1"
35 changes: 35 additions & 0 deletions sql-writer/examples/expr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::sync::Arc;

use datafusion::{
common::Column,
logical_expr::{BinaryExpr, Operator},
prelude::Expr,
sql::{sqlparser::dialect::GenericDialect, TableReference},
};
use datafusion_sql_writer::from_df_epr;

fn main() -> Result<(), Box<dyn std::error::Error>> {
// Example expression
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: Some(TableReference::bare("table_a")),
name: "id".to_string(),
})),
op: Operator::Gt,
right: Box::new(Expr::Column(Column {
relation: Some(TableReference::bare("table_b")),
name: "b".to_string(),
})),
});

// datafusion::Expr -> sqlparser::ast
let dialect = Arc::new(GenericDialect {});
let ast = from_df_epr(&expr, dialect)?;

// Get SQL string by formatting the AST
let sql = format!("{}", ast);

println!("{sql}");

Ok(())
}
41 changes: 41 additions & 0 deletions sql-writer/examples/plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::sync::Arc;

use datafusion::{
execution::context::SessionContext, sql::sqlparser::dialect::GenericDialect,
test_util::TestTableFactory,
};
use datafusion_sql_writer::from_df_plan;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Example query
let query = "select ta.id, tb.value from table_a ta join table_b tb on ta.id = tb.id;";

// Create the DataFusion plan
let dialect = Arc::new(GenericDialect {});
let ctx = mock_ctx().await;
let plan = ctx.sql(query).await.unwrap().into_unoptimized_plan();

// datafusion::LogicalPlan -> sqlparser::ast
let ast = from_df_plan(&plan, dialect)?;

// Get SQL string by formatting the AST
let sql = format!("{}", ast);

println!("{sql}");
Ok(())
}

async fn mock_ctx() -> SessionContext {
let mut state = SessionContext::new().state();
state
.table_factories_mut()
.insert("MOCKTABLE".to_string(), Arc::new(TestTableFactory {}));
let ctx = SessionContext::new_with_state(state);

ctx.sql("CREATE EXTERNAL TABLE table_a (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap();
ctx.sql("CREATE EXTERNAL TABLE table_b (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap();
ctx.sql("CREATE EXTERNAL TABLE table_c (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap();

ctx
}
File renamed without changes.
14 changes: 12 additions & 2 deletions sources/sql/src/producer.rs → sql-writer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use datafusion::{
sql::sqlparser::ast::{self, Expr as SQLExpr},
};

use datafusion::common::not_impl_err;
use datafusion::common::{not_impl_err, DFSchema};
use datafusion::common::{Column, DFSchemaRef};
#[allow(unused_imports)]
use datafusion::logical_expr::aggregate_function;
Expand All @@ -21,12 +21,22 @@ use datafusion::sql::sqlparser::dialect::{
Dialect, GenericDialect, PostgreSqlDialect, SQLiteDialect,
};

mod ast_builder;
use crate::ast_builder::{
BuilderError, QueryBuilder, RelationBuilder, SelectBuilder, TableRelationBuilder,
TableWithJoinsBuilder,
};

pub fn query_to_sql(plan: &LogicalPlan, dialect: Arc<dyn Dialect>) -> Result<ast::Statement> {
pub fn from_df_plan(plan: &LogicalPlan, dialect: Arc<dyn Dialect>) -> Result<ast::Statement> {
query_to_sql(plan, dialect)
}

pub fn from_df_epr(expr: &Expr, dialect: Arc<dyn Dialect>) -> Result<SQLExpr> {
let schema = DFSchema::empty();
expr_to_sql(expr, &Arc::new(schema), 0, dialect)
}

fn query_to_sql(plan: &LogicalPlan, dialect: Arc<dyn Dialect>) -> Result<ast::Statement> {
match plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_)
Expand Down

0 comments on commit 7f5f890

Please sign in to comment.