Skip to content

Commit

Permalink
update to datafusion 40
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmmease committed Aug 11, 2024
1 parent fd72c88 commit 035de06
Show file tree
Hide file tree
Showing 20 changed files with 193 additions and 171 deletions.
134 changes: 58 additions & 76 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ members = [
]

[workspace.dependencies]
arrow = { version = "52.0.0", default-features = false }
arrow = { version = "52.1.0", default-features = false }
sqlparser = { version = "0.47.0" }
chrono = { version = "0.4.35", default-features = false }
chrono-tz = {version = "0.9.0", features=["case-insensitive", "filter-by-regex"] }
Expand All @@ -27,34 +27,34 @@ prost-types = { version = "0.12.3" }
object_store = { version= "0.10.1" }

[workspace.dependencies.datafusion]
version = "39.0.0"
version = "40.0.0"

[workspace.dependencies.datafusion-common]
version = "39.0.0"
version = "40.0.0"

[workspace.dependencies.datafusion-expr]
version = "39.0.0"
version = "40.0.0"

[workspace.dependencies.datafusion-proto]
version = "39.0.0"
version = "40.0.0"

[workspace.dependencies.datafusion-proto-common]
version = "39.0.0"
version = "40.0.0"

[workspace.dependencies.datafusion-physical-expr]
version = "39.0.0"
version = "40.0.0"

[workspace.dependencies.datafusion-optimizer]
version = "39.0.0"
version = "40.0.0"

[workspace.dependencies.datafusion-functions]
version = "39.0.0"
version = "40.0.0"

[workspace.dependencies.datafusion-functions-array]
version = "39.0.0"
version = "40.0.0"

[workspace.dependencies.datafusion-functions-aggregate]
version = "39.0.0"
version = "40.0.0"

[profile.release]
## Tell `rustc` to use highest performance optimization and perform Link Time Optimization
Expand Down
4 changes: 2 additions & 2 deletions vegafusion-common/src/data/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ impl ScalarValueHelpers for ScalarValue {
}
Value::Array(elements) => {
let array: ListArray = if elements.is_empty() {
array_into_list_array(Arc::new(new_empty_array(&DataType::Float64)))
array_into_list_array(Arc::new(new_empty_array(&DataType::Float64)), true)
} else {
let elements: Vec<_> = elements
.iter()
.map(ScalarValue::from_json)
.collect::<Result<Vec<ScalarValue>>>()?;

array_into_list_array(ScalarValue::iter_to_array(elements)?)
array_into_list_array(ScalarValue::iter_to_array(elements)?, true)
};

ScalarValue::List(Arc::new(array))
Expand Down
8 changes: 6 additions & 2 deletions vegafusion-common/src/data/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,14 @@ impl VegaFusionTable {
if self.num_rows() == 0 {
// Return empty list with (arbitrary) Float64 type
let array = Arc::new(new_empty_array(&DataType::Float64));
return Ok(ScalarValue::List(Arc::new(array_into_list_array(array))));
return Ok(ScalarValue::List(Arc::new(array_into_list_array(
array, true,
))));
}
let array = Arc::new(StructArray::from(self.to_record_batch()?)) as ArrayRef;
Ok(ScalarValue::List(Arc::new(array_into_list_array(array))))
Ok(ScalarValue::List(Arc::new(array_into_list_array(
array, true,
))))
}

#[cfg(feature = "json")]
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-datafusion-udfs/src/udafs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) struct PercentileContAccumulator {

impl Accumulator for PercentileContAccumulator {
fn state(&mut self) -> Result<Vec<ScalarValue>, DataFusionError> {
let state = ScalarValue::new_list(self.all_values.as_slice(), &self.data_type);
let state = ScalarValue::new_list(self.all_values.as_slice(), &self.data_type, true);
Ok(vec![ScalarValue::List(state)])
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ pub fn vl_selection_resolve_fn(
// Turn values into a scalar list
let values = ScalarValue::List(Arc::new(array_into_list_array(
ScalarValue::iter_to_array(values)?,
true,
)));
Ok((name, values))
})
Expand Down
36 changes: 21 additions & 15 deletions vegafusion-runtime/src/expression/compiler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,10 @@ mod test_compile {
// Check evaluated value
let result_value = result_expr.eval_to_scalar().unwrap();

let expected_value = ScalarValue::List(Arc::new(array_into_list_array(Arc::new(
Float64Array::from(vec![1.0, 2.0, 3.0]),
))));
let expected_value = ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
true,
)));

println!("value: {result_value:?}");
assert_eq!(result_value, expected_value);
Expand All @@ -406,9 +407,10 @@ mod test_compile {
assert_eq!(result_expr, expected_expr);

let result_value = result_expr.eval_to_scalar().unwrap();
let expected_value = ScalarValue::List(Arc::new(array_into_list_array(new_empty_array(
&DataType::Int64,
))));
let expected_value = ScalarValue::List(Arc::new(array_into_list_array(
new_empty_array(&DataType::Int64),
true,
)));

println!("value: {result_value:?}");
assert_eq!(result_value, expected_value);
Expand All @@ -432,17 +434,21 @@ mod test_compile {
let result_value = result_expr.eval_to_scalar().unwrap();
let expected_value = ScalarValue::List(Arc::new(array_into_list_array(
ScalarValue::iter_to_array(vec![
ScalarValue::List(Arc::new(array_into_list_array(Arc::new(
Float64Array::from(vec![1.0, 2.0]),
)))),
ScalarValue::List(Arc::new(array_into_list_array(Arc::new(
Float64Array::from(vec![3.0, 4.0]),
)))),
ScalarValue::List(Arc::new(array_into_list_array(Arc::new(
Float64Array::from(vec![5.0, 6.0]),
)))),
ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(Float64Array::from(vec![1.0, 2.0])),
true,
))),
ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(Float64Array::from(vec![3.0, 4.0])),
true,
))),
ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(Float64Array::from(vec![5.0, 6.0])),
true,
))),
])
.unwrap(),
true,
)));

println!("value: {result_value:?}");
Expand Down
20 changes: 8 additions & 12 deletions vegafusion-runtime/src/transform/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use crate::expression::compiler::config::CompilationConfig;
use crate::transform::TransformTrait;

use datafusion_expr::{avg, count, count_distinct, lit, max, min, sum, Expr};
use datafusion_expr::{lit, max, min, Expr};
use datafusion_functions_aggregate::median::median_udaf;
use datafusion_functions_aggregate::variance::var_samp_udaf;
use datafusion_functions_aggregate::variance::{var_pop_udaf, var_samp_udaf};
use sqlparser::ast::NullTreatment;
use std::collections::HashMap;

use async_trait::async_trait;
use datafusion_expr::expr;
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::{aggregate_function, expr};
use datafusion_functions_aggregate::expr_fn::{avg, count, count_distinct, sum};
use datafusion_functions_aggregate::stddev::{stddev_pop_udaf, stddev_udaf};
use std::sync::Arc;
use vegafusion_common::column::{flat_col, unescaped_col};
use vegafusion_common::data::ORDER_COL;
Expand Down Expand Up @@ -182,29 +184,23 @@ pub fn make_agg_expr_for_col_expr(
null_treatment: Some(NullTreatment::IgnoreNulls),
}),
AggregateOp::Variancep => Expr::AggregateFunction(expr::AggregateFunction {
func_def: AggregateFunctionDefinition::BuiltIn(
aggregate_function::AggregateFunction::VariancePop,
),
func_def: AggregateFunctionDefinition::UDF(var_pop_udaf()),
distinct: false,
args: vec![numeric_column()?],
filter: None,
order_by: None,
null_treatment: Some(NullTreatment::IgnoreNulls),
}),
AggregateOp::Stdev => Expr::AggregateFunction(expr::AggregateFunction {
func_def: AggregateFunctionDefinition::BuiltIn(
aggregate_function::AggregateFunction::Stddev,
),
func_def: AggregateFunctionDefinition::UDF(stddev_udaf()),
distinct: false,
args: vec![numeric_column()?],
filter: None,
order_by: None,
null_treatment: Some(NullTreatment::IgnoreNulls),
}),
AggregateOp::Stdevp => Expr::AggregateFunction(expr::AggregateFunction {
func_def: AggregateFunctionDefinition::BuiltIn(
aggregate_function::AggregateFunction::StddevPop,
),
func_def: AggregateFunctionDefinition::UDF(stddev_pop_udaf()),
distinct: false,
args: vec![numeric_column()?],
filter: None,
Expand Down
1 change: 1 addition & 0 deletions vegafusion-runtime/src/transform/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ fn compute_output_value(bin_tx: &Bin, start: f64, stop: f64, step: f64) -> Optio
fname.insert_str(0, "bin_");
let fields = ScalarValue::List(Arc::new(array_into_list_array(
ScalarValue::iter_to_array(vec![ScalarValue::from(bin_tx.field.as_str())]).ok()?,
true,
)));

if bin_tx.signal.is_some() {
Expand Down
1 change: 1 addition & 0 deletions vegafusion-runtime/src/transform/extent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ fn extract_extent_list(table: &VegaFusionTable) -> Result<TaskValue> {
// Build two-element list of the extents
let extent_list = TaskValue::Scalar(ScalarValue::List(Arc::new(array_into_list_array(
ScalarValue::iter_to_array(vec![min_val_scalar, max_val_scalar])?,
true,
))));
Ok(extent_list)
}
30 changes: 11 additions & 19 deletions vegafusion-runtime/src/transform/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;

use datafusion_common::ScalarValue;
use datafusion_expr::{aggregate_function, expr, lit, Expr, WindowFrame, WindowFunctionDefinition};
use datafusion_functions_aggregate::variance::var_samp_udaf;
use datafusion_functions_aggregate::variance::{var_pop_udaf, var_samp_udaf};
use sqlparser::ast::NullTreatment;
use std::sync::Arc;
use vegafusion_core::error::Result;
Expand All @@ -13,7 +13,11 @@ use vegafusion_core::proto::gen::transforms::{
};
use vegafusion_core::task_graph::task_value::TaskValue;

use datafusion_expr::test::function_stub::count_udaf;
use datafusion_expr::{BuiltInWindowFunction, WindowFrameBound, WindowFrameUnits};
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::stddev::{stddev_pop_udaf, stddev_udaf};
use datafusion_functions_aggregate::sum::sum_udaf;
use vegafusion_common::column::{flat_col, unescaped_col};
use vegafusion_common::data::ORDER_COL;
use vegafusion_common::datatypes::to_numeric;
Expand Down Expand Up @@ -113,21 +117,15 @@ impl TransformTrait for Window {
use AggregateOp::*;
match op {
Count => (
WindowFunctionDefinition::AggregateFunction(
aggregate_function::AggregateFunction::Count,
),
WindowFunctionDefinition::AggregateUDF(count_udaf()),
vec![lit(true)],
),
Sum => (
WindowFunctionDefinition::AggregateFunction(
aggregate_function::AggregateFunction::Sum,
),
WindowFunctionDefinition::AggregateUDF(sum_udaf()),
vec![numeric_field()?],
),
Mean | Average => (
WindowFunctionDefinition::AggregateFunction(
aggregate_function::AggregateFunction::Avg,
),
WindowFunctionDefinition::AggregateUDF(avg_udaf()),
vec![numeric_field()?],
),
Min => (
Expand All @@ -147,21 +145,15 @@ impl TransformTrait for Window {
vec![numeric_field()?],
),
Variancep => (
WindowFunctionDefinition::AggregateFunction(
aggregate_function::AggregateFunction::VariancePop,
),
WindowFunctionDefinition::AggregateUDF(var_pop_udaf()),
vec![numeric_field()?],
),
Stdev => (
WindowFunctionDefinition::AggregateFunction(
aggregate_function::AggregateFunction::Stddev,
),
WindowFunctionDefinition::AggregateUDF(stddev_udaf()),
vec![numeric_field()?],
),
Stdevp => (
WindowFunctionDefinition::AggregateFunction(
aggregate_function::AggregateFunction::StddevPop,
),
WindowFunctionDefinition::AggregateUDF(stddev_pop_udaf()),
vec![numeric_field()?],
),
// ArrayAgg only available on master right now
Expand Down
3 changes: 2 additions & 1 deletion vegafusion-runtime/tests/test_vegajs_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ fn test_evaluate_filter_transform() {
assert_eq!(
result_signals,
vec![ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(Float64Array::from(vec![6.0, 10.0]))
Arc::new(Float64Array::from(vec![6.0, 10.0])),
true
)))]
);

Expand Down
3 changes: 3 additions & 0 deletions vegafusion-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ workspace = true
[dependencies.datafusion-functions]
workspace = true

[dependencies.datafusion-functions-aggregate]
workspace = true

[dependencies.datafusion]
workspace = true
optional = true
Expand Down
14 changes: 10 additions & 4 deletions vegafusion-sql/src/compile/function_arg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ impl ToSqlFunctionArg for Expr {
Expr::Wildcard { qualifier: None } => SqlFunctionArgExpr::Wildcard,
Expr::Wildcard {
qualifier: Some(qualifier),
} => SqlFunctionArgExpr::QualifiedWildcard(ObjectName(vec![Ident {
value: qualifier.clone(),
quote_style: None,
}])),
} => SqlFunctionArgExpr::QualifiedWildcard(ObjectName(
qualifier
.to_vec()
.into_iter()
.map(|value| Ident {
value,
quote_style: None,
})
.collect(),
)),
expr => SqlFunctionArgExpr::Expr(expr.to_sql(dialect, schema)?),
})
}
Expand Down
9 changes: 9 additions & 0 deletions vegafusion-sql/src/compile/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,15 @@ impl ToSqlScalar for ScalarValue {
ScalarValue::Union(_, _, _) => Err(VegaFusionError::internal(
"Union cannot be converted to SQL",
)),
ScalarValue::Utf8View(_) => Err(VegaFusionError::internal(
"Utf8View cannot be converted to SQL",
)),
ScalarValue::BinaryView(_) => Err(VegaFusionError::internal(
"BinaryView cannot be converted to SQL",
)),
ScalarValue::Map(_) => Err(VegaFusionError::internal(
"BinaryView cannot be converted to SQL",
)),
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions vegafusion-sql/src/connection/datafusion_py_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,8 @@ impl ExecutionPlan for PyDatasourceExec {
fn properties(&self) -> &PlanProperties {
&self.plan_properties
}

fn name(&self) -> &str {
"py_datasource"
}
}
Loading

0 comments on commit 035de06

Please sign in to comment.