Skip to content

Commit

Permalink
protocols/flow: add array inference to protocol (#1787)
Browse files Browse the repository at this point in the history
  • Loading branch information
williamhbaker authored Dec 11, 2024
1 parent 3f92db5 commit 552f6c0
Show file tree
Hide file tree
Showing 30 changed files with 1,790 additions and 822 deletions.
5 changes: 4 additions & 1 deletion crates/assemble/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub fn inference(shape: &Shape, exists: Exists) -> flow::Inference {
} else {
None
},
array: None,
}
}

Expand Down Expand Up @@ -120,7 +121,9 @@ pub fn partition_template(
let compression_codec = compression_codec(codec.unwrap_or(models::CompressionCodec::Gzip));

// If an explicit flush interval isn't provided, default to 24 hours
let flush_interval = flush_interval.unwrap_or(std::time::Duration::from_secs(24 * 3600)).into();
let flush_interval = flush_interval
.unwrap_or(std::time::Duration::from_secs(24 * 3600))
.into();

// If a fragment length isn't set, default and then map MB to bytes.
let length = (length.unwrap_or(512) as i64) << 20;
Expand Down
3 changes: 3 additions & 0 deletions crates/assemble/src/snapshots/assemble__test__inference.snap
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ expression: "&[out1, out2, out3]"
secret: true,
exists: Must,
numeric: None,
array: None,
},
Inference {
types: [
Expand All @@ -34,6 +35,7 @@ expression: "&[out1, out2, out3]"
secret: true,
exists: May,
numeric: None,
array: None,
},
Inference {
types: [
Expand Down Expand Up @@ -61,5 +63,6 @@ expression: "&[out1, out2, out3]"
maximum: 1000.0,
},
),
array: None,
},
]
24 changes: 24 additions & 0 deletions crates/proto-flow/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pub struct Inference {
pub exists: i32,
#[prost(message, optional, tag = "9")]
pub numeric: ::core::option::Option<inference::Numeric>,
#[prost(message, optional, tag = "10")]
pub array: ::core::option::Option<inference::Array>,
}
/// Nested message and enum types in `Inference`.
pub mod inference {
Expand Down Expand Up @@ -130,6 +132,28 @@ pub mod inference {
#[prost(double, tag = "4")]
pub maximum: f64,
}
/// Array type-specific inferences. Will be nil if types doesn't include
/// "array", or if the specification was built prior to array inference
/// existing in the protocol.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Array {
/// Minimum number of items the array must contain.
#[prost(uint32, tag = "1")]
pub min_items: u32,
/// True if there is an inferred maximum allowed number of items the array
/// may contain, otherwise False.
#[prost(bool, tag = "2")]
pub has_max_items: bool,
/// Maximum number of items the array may contain.
#[prost(uint32, tag = "3")]
pub max_items: u32,
/// The possible types of items contained in this array.
/// Subset of ["null", "boolean", "object", "array", "integer", "numeric",
/// "string"].
#[prost(string, repeated, tag = "4")]
pub item_types: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
/// Exists enumerates the possible states of existence for a location.
#[derive(
Clone,
Expand Down
167 changes: 167 additions & 0 deletions crates/proto-flow/src/flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2573,6 +2573,9 @@ impl serde::Serialize for Inference {
if self.numeric.is_some() {
len += 1;
}
if self.array.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.Inference", len)?;
if !self.types.is_empty() {
struct_ser.serialize_field("types", &self.types)?;
Expand Down Expand Up @@ -2600,6 +2603,9 @@ impl serde::Serialize for Inference {
if let Some(v) = self.numeric.as_ref() {
struct_ser.serialize_field("numeric", v)?;
}
if let Some(v) = self.array.as_ref() {
struct_ser.serialize_field("array", v)?;
}
struct_ser.end()
}
}
Expand All @@ -2619,6 +2625,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
"secret",
"exists",
"numeric",
"array",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -2631,6 +2638,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
Secret,
Exists,
Numeric,
Array,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -2660,6 +2668,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
"secret" => Ok(GeneratedField::Secret),
"exists" => Ok(GeneratedField::Exists),
"numeric" => Ok(GeneratedField::Numeric),
"array" => Ok(GeneratedField::Array),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand Down Expand Up @@ -2687,6 +2696,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
let mut secret__ = None;
let mut exists__ = None;
let mut numeric__ = None;
let mut array__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Types => {
Expand Down Expand Up @@ -2737,6 +2747,12 @@ impl<'de> serde::Deserialize<'de> for Inference {
}
numeric__ = map_.next_value()?;
}
GeneratedField::Array => {
if array__.is_some() {
return Err(serde::de::Error::duplicate_field("array"));
}
array__ = map_.next_value()?;
}
}
}
Ok(Inference {
Expand All @@ -2748,12 +2764,163 @@ impl<'de> serde::Deserialize<'de> for Inference {
secret: secret__.unwrap_or_default(),
exists: exists__.unwrap_or_default(),
numeric: numeric__,
array: array__,
})
}
}
deserializer.deserialize_struct("flow.Inference", FIELDS, GeneratedVisitor)
}
}
impl serde::Serialize for inference::Array {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut len = 0;
if self.min_items != 0 {
len += 1;
}
if self.has_max_items {
len += 1;
}
if self.max_items != 0 {
len += 1;
}
if !self.item_types.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.Inference.Array", len)?;
if self.min_items != 0 {
struct_ser.serialize_field("minItems", &self.min_items)?;
}
if self.has_max_items {
struct_ser.serialize_field("hasMaxItems", &self.has_max_items)?;
}
if self.max_items != 0 {
struct_ser.serialize_field("maxItems", &self.max_items)?;
}
if !self.item_types.is_empty() {
struct_ser.serialize_field("itemTypes", &self.item_types)?;
}
struct_ser.end()
}
}
impl<'de> serde::Deserialize<'de> for inference::Array {
#[allow(deprecated)]
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
const FIELDS: &[&str] = &[
"min_items",
"minItems",
"has_max_items",
"hasMaxItems",
"max_items",
"maxItems",
"item_types",
"itemTypes",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
MinItems,
HasMaxItems,
MaxItems,
ItemTypes,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
where
D: serde::Deserializer<'de>,
{
struct GeneratedVisitor;

impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = GeneratedField;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(formatter, "expected one of: {:?}", &FIELDS)
}

#[allow(unused_variables)]
fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
where
E: serde::de::Error,
{
match value {
"minItems" | "min_items" => Ok(GeneratedField::MinItems),
"hasMaxItems" | "has_max_items" => Ok(GeneratedField::HasMaxItems),
"maxItems" | "max_items" => Ok(GeneratedField::MaxItems),
"itemTypes" | "item_types" => Ok(GeneratedField::ItemTypes),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
}
deserializer.deserialize_identifier(GeneratedVisitor)
}
}
struct GeneratedVisitor;
impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = inference::Array;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("struct flow.Inference.Array")
}

fn visit_map<V>(self, mut map_: V) -> std::result::Result<inference::Array, V::Error>
where
V: serde::de::MapAccess<'de>,
{
let mut min_items__ = None;
let mut has_max_items__ = None;
let mut max_items__ = None;
let mut item_types__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::MinItems => {
if min_items__.is_some() {
return Err(serde::de::Error::duplicate_field("minItems"));
}
min_items__ =
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::HasMaxItems => {
if has_max_items__.is_some() {
return Err(serde::de::Error::duplicate_field("hasMaxItems"));
}
has_max_items__ = Some(map_.next_value()?);
}
GeneratedField::MaxItems => {
if max_items__.is_some() {
return Err(serde::de::Error::duplicate_field("maxItems"));
}
max_items__ =
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::ItemTypes => {
if item_types__.is_some() {
return Err(serde::de::Error::duplicate_field("itemTypes"));
}
item_types__ = Some(map_.next_value()?);
}
}
}
Ok(inference::Array {
min_items: min_items__.unwrap_or_default(),
has_max_items: has_max_items__.unwrap_or_default(),
max_items: max_items__.unwrap_or_default(),
item_types: item_types__.unwrap_or_default(),
})
}
}
deserializer.deserialize_struct("flow.Inference.Array", FIELDS, GeneratedVisitor)
}
}
impl serde::Serialize for inference::Exists {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
Expand Down
6 changes: 6 additions & 0 deletions crates/proto-flow/tests/regression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ fn ex_projections() -> Vec<flow::Projection> {
has_maximum: false,
maximum: 0.0,
}),
array: Some(inference::Array {
min_items: 10,
has_max_items: true,
max_items: 20,
item_types: vec!["null".to_string(), "integer".to_string()],
}),
}),
}]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ expression: json_test(msg)
"numeric": {
"hasMinimum": true,
"minimum": -1000.0
},
"array": {
"minItems": 10,
"hasMaxItems": true,
"maxItems": 20,
"itemTypes": [
"null",
"integer"
]
}
}
}
Expand Down Expand Up @@ -141,6 +150,15 @@ expression: json_test(msg)
"numeric": {
"hasMinimum": true,
"minimum": -1000.0
},
"array": {
"minItems": 10,
"hasMaxItems": true,
"maxItems": 20,
"itemTypes": [
"null",
"integer"
]
}
}
}
Expand Down Expand Up @@ -276,6 +294,15 @@ expression: json_test(msg)
"numeric": {
"hasMinimum": true,
"minimum": -1000.0
},
"array": {
"minItems": 10,
"hasMaxItems": true,
"maxItems": 20,
"itemTypes": [
"null",
"integer"
]
}
}
}
Expand Down
Loading

0 comments on commit 552f6c0

Please sign in to comment.