Upgrade to latest datafusion
sergiimk committed Feb 3, 2024
1 parent 36a15c2 commit 1b12451
Showing 38 changed files with 1,368 additions and 519 deletions.
465 changes: 268 additions & 197 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -151,3 +151,4 @@ debug = 1
 # See: https://doc.rust-lang.org/cargo/reference/overriding-dependencies.html
 [patch.crates-io]
 # datafusion = { git = 'https://github.com/apache/arrow-datafusion.git', tag = '33.0.0-rc1' }
+datafusion = { git = 'https://github.com/apache/arrow-datafusion.git', branch = 'main' }
1 change: 1 addition & 0 deletions deny.toml
@@ -32,6 +32,7 @@ deny = [
     { name = "clap", deny-multiple-versions = true },
     { name = "datafusion", deny-multiple-versions = true },
     { name = "object_store", deny-multiple-versions = true },
+    { name = "parquet", deny-multiple-versions = true },
     { name = "prost", deny-multiple-versions = true },
     # { name = "rustls", deny-multiple-versions = true },
     { name = "tokio", deny-multiple-versions = true },
4 changes: 2 additions & 2 deletions src/adapter/flight-sql/Cargo.toml
@@ -22,11 +22,11 @@ doctest = false


 [dependencies]
-arrow-flight = { version = "48", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "50", features = ["flight-sql-experimental"] }
 async-trait = { version = "0.1", default-features = false }
 base64 = { version = "0.21", default-features = false }
 dashmap = { version = "5", default-features = false }
-datafusion = "33"
+datafusion = { version = "35", default-features = false }
 futures = "0.3"
 like = { version = "0.3", default-features = false }
 prost = { version = "0.12", default-features = false }
4 changes: 2 additions & 2 deletions src/adapter/graphql/Cargo.toml
@@ -36,10 +36,10 @@ async-graphql = { version = "6", features = ["chrono", "url", "apollo_tracing"]
 async-trait = { version = "0.1", default-features = false }
 cron = { version = "0.12.0", default-features = false }
 chrono = "0.4"
-datafusion = "33" # TODO: Currently needed for type conversions but ideally should be encapsulated by kamu-core
+datafusion = "35" # TODO: Currently needed for type conversions but ideally should be encapsulated by kamu-core
 dill = "0.8"
 futures = "0.3"
-indoc = "2"
+indoc = "1.0.6"
 serde = "1"
 serde_json = "1"
 tokio = { version = "1", default-features = false, features = [] }
2 changes: 1 addition & 1 deletion src/adapter/graphql/src/scalars/data_query.rs
@@ -96,7 +96,7 @@ impl From<QueryError> for DataQueryResult {
 impl From<DataFusionError> for DataQueryResult {
     fn from(e: DataFusionError) -> Self {
         match e {
-            DataFusionError::SQL(e) => DataQueryResult::invalid_sql(e.to_string()),
+            DataFusionError::SQL(e, _backtrace) => DataQueryResult::invalid_sql(e.to_string()),
             DataFusionError::Plan(e) => DataQueryResult::invalid_sql(e),
             _ => DataQueryResult::internal(e.to_string()),
         }
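Note: the extra _backtrace binding above tracks an upstream change — newer DataFusion attaches an optional backtrace to the SQL error variant (assumed here to be SQL(ParserError, Option<String>)). A minimal sketch of matching the new shape:

use datafusion::error::DataFusionError;

fn describe(e: DataFusionError) -> String {
    match e {
        // The second field is the optional backtrace; it is ignored here.
        DataFusionError::SQL(parser_err, _backtrace) => format!("invalid SQL: {parser_err}"),
        DataFusionError::Plan(msg) => format!("invalid plan: {msg}"),
        other => format!("internal error: {other}"),
    }
}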
4 changes: 2 additions & 2 deletions src/app/cli/Cargo.toml
@@ -67,7 +67,7 @@ read_input = "0.8" # Basic user input
 webbrowser = "0.8" # For opening URLs in default system browser

 # APIs
-arrow-flight = { version = "48", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "50", features = ["flight-sql-experimental"] }
 async-graphql = { version = "6", features = ["chrono", "url", "apollo_tracing"] }
 async-graphql-axum = "6"
 axum = { version = "0.6", features = ["ws"] }
@@ -104,7 +104,7 @@ tracing-bunyan-formatter = "0.3"
 async-trait = "0.1"
 chrono = "0.4"
 cfg-if = "1" # Conditional compilation
-datafusion = "33"
+datafusion = { version = "35", default-features = false, features = ["crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
 dill = "0.8"
 dirs = "5"
 fs_extra = "1.3"
1 change: 1 addition & 0 deletions src/app/cli/src/commands/sql_shell_command.rs
@@ -134,6 +134,7 @@ impl SqlShellCommand {
         let mut print_options = PrintOptions {
             format: PrintFormat::Table,
             quiet: false,
+            color: true,
             maxrows: MaxRows::Limited(DEFAULT_MAX_ROWS_FOR_OUTPUT),
         };

4 changes: 2 additions & 2 deletions src/domain/core/Cargo.toml
@@ -42,8 +42,8 @@ tracing = { version = "0.1", default-features = false }
 url = { version = "2", default-features = false, features = ["serde"] }

 # TODO: Avoid this dependency or depend on sub-crates
-datafusion = { version = "33", default-features = false }
-object_store = { version = "0.7", default-features = false }
+datafusion = { version = "35", default-features = false }
+object_store = { version = "0.9", default-features = false }

 # TODO: Make serde optional
 serde = { version = "1", default-features = false, features = ["derive"] }
2 changes: 1 addition & 1 deletion src/domain/flow-system/Cargo.toml
@@ -42,4 +42,4 @@ serde = { version = "1", default-features = false, features = ["derive"] }
 serde_with = { version = "3", default-features = false }

 [dev-dependencies]
-datafusion = "33"
+datafusion = { version = "35", default-features = false }
2 changes: 1 addition & 1 deletion src/domain/opendatafabric/Cargo.toml
@@ -59,4 +59,4 @@ prost = "0.12"
 tonic = "0.10"

 # Optional
-arrow = { optional = true, version = "48", default-features = false, features = ["ipc"] }
+arrow = { optional = true, version = "50", default-features = false, features = ["ipc"] }
4 changes: 2 additions & 2 deletions src/infra/core/Cargo.toml
@@ -57,8 +57,8 @@ tar = "0.4" # Checkpoint archival
 zip = "0.6"

 # Data
-datafusion = "33"
-object_store = { version = "0.7", features = ["aws"] }
+datafusion = { version = "35", default-features = false }
+object_store = { version = "0.9", features = ["aws"] }
 digest = "0.10"
 sha3 = "0.10"

1 change: 0 additions & 1 deletion src/infra/core/src/query_service_impl.rs
@@ -532,7 +532,6 @@ impl SchemaProvider for KamuSchema {
                 table_partition_cols: Vec::new(),
                 parquet_pruning: None,
                 skip_metadata: None,
-                insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error,
             },
         )
         .await
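The same one-line deletion repeats in the readers and writer further down: newer DataFusion dropped ListingTableInsertMode, so the insert_mode field disappears from the read-options structs. A hedged sketch of the resulting construction (assuming ParquetReadOptions keeps a Default impl for the untouched fields):

use datafusion::prelude::ParquetReadOptions;

fn read_options<'a>() -> ParquetReadOptions<'a> {
    ParquetReadOptions {
        // insert_mode is gone after the upgrade; the remaining fields are unchanged.
        table_partition_cols: Vec::new(),
        parquet_pruning: None,
        skip_metadata: None,
        ..Default::default()
    }
}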
6 changes: 3 additions & 3 deletions src/infra/core/tests/tests/ingest/test_polling_ingest.rs
@@ -122,7 +122,7 @@ async fn test_ingest_polling_snapshot() {
               OPTIONAL INT64 offset;
               REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
-              REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
+              OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true));
               OPTIONAL BYTE_ARRAY city (STRING);
               OPTIONAL INT64 population;
             }
@@ -907,7 +907,7 @@ async fn test_ingest_polling_preprocess_with_spark() {
               OPTIONAL INT64 offset;
               REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
-              REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
+              OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true));
               OPTIONAL BYTE_ARRAY city (STRING);
               OPTIONAL INT64 population;
             }
@@ -999,7 +999,7 @@ async fn test_ingest_polling_preprocess_with_flink() {
               OPTIONAL INT64 offset;
               REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
-              REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
+              OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true));
               OPTIONAL BYTE_ARRAY city (STRING);
               OPTIONAL INT64 population;
             }
40 changes: 7 additions & 33 deletions src/infra/core/tests/tests/ingest/test_writer.rs
@@ -12,7 +12,7 @@ use std::path::PathBuf;
 use std::sync::Arc;

 use chrono::{DateTime, TimeZone, Utc};
-use datafusion::arrow::datatypes::{Field, Schema, SchemaRef};
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::prelude::*;
 use dill::Component;
 use event_bus::EventBus;
@@ -32,28 +32,6 @@ use opendatafabric as odf;
 // crate.
 ///////////////////////////////////////////////////////////////

-fn assert_schemas_equal(lhs: &SchemaRef, rhs: &SchemaRef, ignore_nullability: bool) {
-    let map_field = |f: &Arc<Field>| -> Arc<Field> {
-        if ignore_nullability {
-            Arc::new(f.as_ref().clone().with_nullable(true))
-        } else {
-            f.clone()
-        }
-    };
-
-    let lhs = Schema::new_with_metadata(
-        lhs.fields().iter().map(map_field).collect::<Vec<_>>(),
-        lhs.metadata().clone(),
-    );
-    let rhs = Schema::new_with_metadata(
-        rhs.fields().iter().map(map_field).collect::<Vec<_>>(),
-        rhs.metadata().clone(),
-    );
-    assert_eq!(lhs, rhs);
-}
-
-/////////////////////////////////////////////////////////////////////////////////////////
-
 #[test_group::group(engine, ingest, datafusion)]
 #[test_log::test(tokio::test)]
 async fn test_data_writer_happy_path() {
@@ -92,7 +70,7 @@ async fn test_data_writer_happy_path() {
               OPTIONAL INT64 offset;
               REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
-              REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
+              OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true));
               OPTIONAL BYTE_ARRAY city (STRING);
               OPTIONAL INT64 population;
             }
@@ -125,11 +103,7 @@ async fn test_data_writer_happy_path() {
     let (schema_block_hash, schema_block) = harness.get_last_schema_block().await;
     let schema_in_block = schema_block.event.schema_as_arrow().unwrap();
     let schema_in_data = SchemaRef::new(df.schema().into());
-    // TODO: DataFusion issue where the schema of the
-    // DataFrame being saved into parquet and those read out of it differs in
-    // nullability
-    assert_ne!(schema_in_block, schema_in_data);
-    assert_schemas_equal(&schema_in_block, &schema_in_data, true);
+    assert_eq!(schema_in_block, schema_in_data);

     // Round 2
     harness.set_system_time(Utc.with_ymd_and_hms(2010, 1, 2, 12, 0, 0).unwrap());
@@ -280,7 +254,7 @@ async fn test_data_writer_rejects_incompatible_schema() {
               OPTIONAL INT64 offset;
               REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
-              REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
+              OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true));
               OPTIONAL BYTE_ARRAY city (STRING);
               OPTIONAL INT64 population;
             }
@@ -331,7 +305,7 @@ async fn test_data_writer_rejects_incompatible_schema() {
               OPTIONAL INT64 offset;
               REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
-              REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
+              OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true));
               OPTIONAL BYTE_ARRAY city (STRING);
               OPTIONAL INT64 population;
             }
@@ -412,7 +386,7 @@ async fn test_data_writer_rejects_incompatible_schema() {
               OPTIONAL INT64 offset;
               REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
-              REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
+              OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true));
               OPTIONAL BYTE_ARRAY city (STRING);
               OPTIONAL INT64 population;
             }
@@ -545,7 +519,7 @@ async fn test_data_writer_snapshot_orders_by_pk_and_operation_type() {
               OPTIONAL INT64 offset;
               REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
-              REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
+              OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true));
               OPTIONAL BYTE_ARRAY city (STRING);
               OPTIONAL INT64 population;
             }
2 changes: 1 addition & 1 deletion src/infra/core/tests/tests/test_query_service_impl.rs
@@ -434,7 +434,7 @@ async fn test_dataset_sql_unauthorized_common(catalog: dill::Catalog, tempdir: &
         result,
         Err(QueryError::DataFusionError(
             datafusion::common::DataFusionError::Plan(s)
-        )) if s.eq("table 'kamu.kamu.foo' not found")
+        )) if s.contains("table 'kamu.kamu.foo' not found")
     );
 }

4 changes: 2 additions & 2 deletions src/infra/ingest-datafusion/Cargo.toml
@@ -27,12 +27,12 @@ opendatafabric = { workspace = true, features = ["arrow"] }
 kamu-core = { workspace = true }
 kamu-data-utils = { workspace = true }

-datafusion = "33"
+datafusion = { version = "35", default-features = false }
 digest = "0.10"
 geo-types = { version = "0.7", default-features = false, features = [] }
 geojson ={ version = "0.24", default-features = false, features = ["geo-types"] }
 glob = "0.3"
-object_store = { version = "0.7", features = ["aws"] }
+object_store = { version = "0.9", features = ["aws"] }
 serde = { version = "1" }
 serde_json = "1"
 sha3 = "0.10"
4 changes: 2 additions & 2 deletions src/infra/ingest-datafusion/src/merge_strategies/snapshot.rs
@@ -91,13 +91,13 @@ impl MergeStrategySnapshot {
         let state = ledger
             .window(vec![Expr::WindowFunction(
                 datafusion::logical_expr::expr::WindowFunction {
-                    fun: datafusion::logical_expr::WindowFunction::BuiltInWindowFunction(
+                    fun: datafusion::logical_expr::WindowFunctionDefinition::BuiltInWindowFunction(
                         datafusion::logical_expr::BuiltInWindowFunction::RowNumber,
                     ),
                     args: Vec::new(),
                     partition_by: self.primary_key.iter().map(col).collect(),
                     order_by: vec![col(&self.vocab.offset_column).sort(false, false)],
-                    window_frame: datafusion::logical_expr::WindowFrame::new(true),
+                    window_frame: datafusion::logical_expr::WindowFrame::new(Some(false)),
                 },
             )
             .alias(rank_col)])
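Two renames meet in this hunk: the fun discriminant is now a WindowFunctionDefinition, and WindowFrame::new takes an Option<bool> instead of a plain flag. A sketch mirroring the migrated construction (the Option<bool> reading — None for no ORDER BY, the bool flagging a causal ordering — is an assumption, not confirmed by this diff):

use datafusion::logical_expr::expr::WindowFunction;
use datafusion::logical_expr::{BuiltInWindowFunction, WindowFrame, WindowFunctionDefinition};
use datafusion::prelude::*;

// ROW_NUMBER() OVER (PARTITION BY <pk> ORDER BY <offset> DESC)
fn row_number_expr(primary_key: &[&str], offset_column: &str) -> Expr {
    Expr::WindowFunction(WindowFunction {
        fun: WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::RowNumber),
        args: Vec::new(),
        partition_by: primary_key.iter().map(|c| col(*c)).collect(),
        order_by: vec![col(offset_column).sort(false, false)],
        window_frame: WindowFrame::new(Some(false)),
    })
}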
2 changes: 0 additions & 2 deletions src/infra/ingest-datafusion/src/readers/csv.rs
@@ -120,8 +120,6 @@ impl Reader for ReaderCsv {
             // re-compress it.
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             file_sort_order: Vec::new(),
-            insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error,
-            infinite: false,
         };

         let df = self
1 change: 0 additions & 1 deletion src/infra/ingest-datafusion/src/readers/ndjson.rs
@@ -72,7 +72,6 @@ impl Reader for ReaderNdJson {
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             file_sort_order: Vec::new(),
             infinite: false,
-            insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error,
         };

         let df = self
1 change: 0 additions & 1 deletion src/infra/ingest-datafusion/src/readers/parquet.rs
@@ -49,7 +49,6 @@ impl Reader for ReaderParquet {
             parquet_pruning: None,
             skip_metadata: None,
             file_sort_order: Vec::new(),
-            insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error,
         };

         let df = self
6 changes: 2 additions & 4 deletions src/infra/ingest-datafusion/src/writer.rs
@@ -244,7 +244,6 @@ impl DataWriterDataFusion {
                 table_partition_cols: Vec::new(),
                 parquet_pruning: None,
                 skip_metadata: None,
-                insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error,
             },
         )
         .await
@@ -330,13 +329,13 @@ impl DataWriterDataFusion {
             .with_column(
                 &self.meta.vocab.offset_column,
                 Expr::WindowFunction(WindowFunction {
-                    fun: expr::WindowFunction::BuiltInWindowFunction(
+                    fun: expr::WindowFunctionDefinition::BuiltInWindowFunction(
                         expr::BuiltInWindowFunction::RowNumber,
                     ),
                     args: vec![],
                     partition_by: vec![],
                     order_by: self.merge_strategy.sort_order(),
-                    window_frame: expr::WindowFrame::new(false),
+                    window_frame: expr::WindowFrame::new(Some(false)),
                 }),
             )
             .int_err()?;
@@ -478,7 +477,6 @@ impl DataWriterDataFusion {
                 table_partition_cols: Vec::new(),
                 parquet_pruning: None,
                 skip_metadata: None,
-                insert_mode: datafusion::datasource::listing::ListingTableInsertMode::Error,
             },
         )
         .await
@@ -231,7 +231,8 @@ async fn test_read_ndjson_format_timestamp_parse_failed() {
             assert_matches!(
                 res.unwrap().collect().await,
                 Err(DataFusionError::ArrowError(
-                    ::datafusion::arrow::error::ArrowError::JsonError(_)
+                    ::datafusion::arrow::error::ArrowError::JsonError(_),
+                    _
                 ))
             );
         },
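As with the SQL variant earlier, DataFusionError::ArrowError now appears to carry a second field (assumed to be optional context), hence the trailing _ in the pattern. A small sketch under that assumption:

use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;

fn is_json_error(e: &DataFusionError) -> bool {
    // The second field (assumed optional context/backtrace) is ignored.
    matches!(e, DataFusionError::ArrowError(ArrowError::JsonError(_), _))
}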
4 changes: 2 additions & 2 deletions src/utils/data-utils/Cargo.toml
@@ -24,8 +24,8 @@ doctest = false
 [dependencies]
 opendatafabric = { workspace = true }

-arrow-digest = { version = "48", default-features = false }
-datafusion = { version = "33", default-features = false }
+arrow-digest = { version = "50", default-features = false }
+datafusion = { version = "35", default-features = false, features = ["parquet"] }
 tracing = { version = "0.1", default-features = false }

 async-trait = "0.1"
33 changes: 33 additions & 0 deletions src/utils/data-utils/src/schema/cmp.rs
@@ -0,0 +1,33 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::sync::Arc;
+
+use datafusion::arrow::datatypes::{Field, Schema};
+
+/// Compare schemas optionally performing some normalization
+pub fn assert_schemas_equal(lhs: &Schema, rhs: &Schema, ignore_nullability: bool) {
+    let map_field = |f: &Arc<Field>| -> Arc<Field> {
+        if ignore_nullability {
+            Arc::new(f.as_ref().clone().with_nullable(true))
+        } else {
+            f.clone()
+        }
+    };
+
+    let lhs = Schema::new_with_metadata(
+        lhs.fields().iter().map(map_field).collect::<Vec<_>>(),
+        lhs.metadata().clone(),
+    );
+    let rhs = Schema::new_with_metadata(
+        rhs.fields().iter().map(map_field).collect::<Vec<_>>(),
+        rhs.metadata().clone(),
+    );
+    assert_eq!(lhs, rhs);
+}
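The new helper replaces the test-local copy deleted from test_writer.rs above. A usage sketch (the module path kamu_data_utils::schema::cmp is assumed from the file location):

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use kamu_data_utils::schema::cmp::assert_schemas_equal;

#[test]
fn nullability_is_normalized() {
    let written = Schema::new(vec![Field::new("event_time", DataType::Int64, false)]);
    let read_back = Schema::new(vec![Field::new("event_time", DataType::Int64, true)]);
    // Passes: with ignore_nullability = true both sides are normalized to nullable.
    assert_schemas_equal(&written, &read_back, true);
}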
1 change: 1 addition & 0 deletions src/utils/data-utils/src/schema/format.rs
@@ -146,6 +146,7 @@ impl<'a> ParquetJsonSchemaWriter<'a> {
     ) -> String {
         match logical_type {
             Some(logical_type) => match logical_type {
+                LogicalType::Float16 => "FLOAT16".to_string(),
                 LogicalType::Integer {
                     bit_width,
                     is_signed,