Add ingest schema case sensitivity test
Closes: #899
sergiimk committed Oct 13, 2024
1 parent f060d0a commit ed8d6de
Showing 2 changed files with 180 additions and 1 deletion.
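This commit pins down the column-name case sensitivity behavior tracked in apache/datafusion#7460: DataFusion normalizes unquoted SQL identifiers to lowercase, so a column ingested as UPPER can only be referenced by quoting it. A minimal standalone sketch of that behavior, assuming the datafusion and tokio crates (the table and file names are placeholders; this snippet is illustrative and not part of the commit):

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical CSV with a mixed-case header: date,UPPER,lower
    ctx.register_csv("t", "data.csv", CsvReadOptions::new().has_header(true))
        .await?;
    // Unquoted identifiers are folded to lowercase, so `SELECT UPPER FROM t`
    // would look for a column named `upper`. Quoting preserves the case:
    ctx.sql(r#"SELECT "UPPER" FROM t"#).await?.show().await?;
    Ok(())
}
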
4 changes: 3 additions & 1 deletion src/infra/core/Cargo.toml
@@ -101,7 +101,9 @@ itertools = "0.13"
 libc = "0.2" # Signal names
 like = { version = "0.3", default-features = false }
 pin-project = "1"
-petgraph = { version = "0.6", default-features = false, features = ["stable_graph"] }
+petgraph = { version = "0.6", default-features = false, features = [
+    "stable_graph",
+] }
 rand = "0.8"
 regex = "1"
 tempfile = "3"
177 changes: 177 additions & 0 deletions src/infra/core/tests/tests/ingest/test_polling_ingest.rs
@@ -840,6 +840,183 @@ async fn test_ingest_polling_bad_column_names_rename() {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// See: https://github.com/apache/datafusion/issues/7460
// See: https://github.com/kamu-data/kamu-cli/issues/899
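// Verifies that mixed-case column names (UPPER vs. lower) survive the CSV
// read step, ledger merge, and watermark computation without being folded
// to lowercase.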
#[test_group::group(engine, ingest, datafusion)]
#[test_log::test(tokio::test)]
async fn test_ingest_polling_schema_case_sensitivity() {
let harness = IngestTestHarness::new();

let src_path = harness.temp_dir.path().join("data.csv");

let dataset_snapshot = MetadataFactory::dataset_snapshot()
.name("foo.bar")
.kind(DatasetKind::Root)
.push_event(
MetadataFactory::set_polling_source()
.fetch(FetchStep::Url(FetchStepUrl {
url: url::Url::from_file_path(&src_path)
.unwrap()
.as_str()
.to_owned(),
event_time: None,
cache: None,
headers: None,
}))
.read(ReadStep::Csv(ReadStepCsv {
header: Some(true),
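// Deliberately mixed-case column names exercise case handling
// throughout the ingest path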
schema: Some(vec![
"date TIMESTAMP".to_string(),
"UPPER STRING".to_string(),
"lower BIGINT".to_string(),
]),
..ReadStepCsv::default()
}))
.merge(MergeStrategyLedger {
primary_key: vec!["date".to_string()],
})
.build(),
)
.push_event(SetVocab {
event_time_column: Some("date".to_string()),
..Default::default()
})
.build();

let dataset_alias = dataset_snapshot.name.clone();

harness.create_dataset(dataset_snapshot).await;
let data_helper = harness.dataset_data_helper(&dataset_alias).await;

// Round 1
std::fs::write(
&src_path,
indoc!(
"
date,UPPER,lower
2020-01-01,A,1000
2020-01-02,B,2000
2020-01-03,C,3000
"
),
)
.unwrap();

harness.ingest(&dataset_alias).await.unwrap();

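// The resulting Parquet schema must retain the original UPPER casing
// rather than a lowercased alias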
data_helper
.assert_last_data_eq(
indoc!(
r#"
message arrow_schema {
REQUIRED INT64 offset;
REQUIRED INT32 op;
REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
OPTIONAL INT64 date (TIMESTAMP(MILLIS,true));
OPTIONAL BYTE_ARRAY UPPER (STRING);
OPTIONAL INT64 lower;
}
"#
),
indoc!(
r#"
+--------+----+----------------------+----------------------+-------+-------+
| offset | op | system_time | date | UPPER | lower |
+--------+----+----------------------+----------------------+-------+-------+
| 0 | 0 | 2050-01-01T12:00:00Z | 2020-01-01T00:00:00Z | A | 1000 |
| 1 | 0 | 2050-01-01T12:00:00Z | 2020-01-02T00:00:00Z | B | 2000 |
| 2 | 0 | 2050-01-01T12:00:00Z | 2020-01-03T00:00:00Z | C | 3000 |
+--------+----+----------------------+----------------------+-------+-------+
"#
),
)
.await;

assert_eq!(
data_helper
.get_last_data_block()
.await
.event
.new_watermark
.map(|dt| dt.to_rfc3339()),
Some("2020-01-03T00:00:00+00:00".to_string())
);

// Round 2
std::fs::write(
&src_path,
indoc!(
"
date,UPPER,lower
2020-01-01,A,1000
2020-01-02,B,2000
2020-01-03,C,3000
2020-01-04,D,4000
"
),
)
.unwrap();

harness
.time_source
.set(Utc.with_ymd_and_hms(2050, 1, 2, 12, 0, 0).unwrap());

harness.ingest(&dataset_alias).await.unwrap();

data_helper
.assert_last_data_records_eq(indoc!(
r#"
+--------+----+----------------------+----------------------+-------+-------+
| offset | op | system_time | date | UPPER | lower |
+--------+----+----------------------+----------------------+-------+-------+
| 3 | 0 | 2050-01-02T12:00:00Z | 2020-01-04T00:00:00Z | D | 4000 |
+--------+----+----------------------+----------------------+-------+-------+
"#
))
.await;

assert_eq!(
data_helper
.get_last_data_block()
.await
.event
.new_watermark
.map(|dt| dt.to_rfc3339()),
Some("2020-01-04T00:00:00+00:00".to_string())
);

// Round 3 (no-op)
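// Re-ingesting identical data should add no records while still recording
// source state and leaving the watermark unchanged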
std::fs::write(
&src_path,
indoc!(
"
date,UPPER,lower
2020-01-01,A,1000
2020-01-02,B,2000
2020-01-03,C,3000
2020-01-04,D,4000
"
),
)
.unwrap();

harness
.time_source
.set(Utc.with_ymd_and_hms(2050, 1, 3, 12, 0, 0).unwrap());

harness.ingest(&dataset_alias).await.unwrap();
let event = data_helper.get_last_block_typed::<AddData>().await.event;

assert_eq!(event.new_data, None);
assert_eq!(
event.new_watermark.map(|dt| dt.to_rfc3339()),
Some("2020-01-04T00:00:00+00:00".to_string())
);
assert!(event.new_source_state.is_some());
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(containerized, engine, ingest, spark)]
#[test_log::test(tokio::test)]
async fn test_ingest_polling_preprocess_with_spark() {
