From 3ca8f5f562a12985a0b9481536b008528e42e33f Mon Sep 17 00:00:00 2001 From: Martin Grigorov Date: Tue, 15 Aug 2023 14:32:22 +0300 Subject: [PATCH] AVRO-3814: Fix schema resolution for records in union types (#2441) * AVRO-3814: Add a minimal test-case to reproduce Signed-off-by: Martin Tzvetanov Grigorov * AVRO-3814: Fix schema resolution for records in union types The logic for validation records in Value::validate_internal() would be too strict when resolving union types containing a record. This could lead to a situation where schema resolution would fail because the correct schema to use for a union type could not be identified. This commit fixes this by passing a boolean `schema_resolution` to `Value::validate_internal()` which governs whether schema_resolution rules should be applied. * AVRO-3814: Ensure to validate the deserialized value against the schema * AVRO-3814: Extend test case for validate_record * AVRO-3814: Revert whitespace changes * AVRO-3814: Remove confusing comments * AVRO-3786: Add test-cases and fix for AVRO-3786 * AVRO-3786: Revert change to UnionSchema::find_schema_with_known_schemata * AVRO-3814: [Rust] Use types::Value::resolve_internal() instead of validate_internal() ... when looking for the matching schema in an union Signed-off-by: Martin Tzvetanov Grigorov * AVRO-3814: Revert changes to validate_internal() Signed-off-by: Martin Tzvetanov Grigorov * AVRO-3814: Remove obsolete rustdoc for arguments Signed-off-by: Martin Tzvetanov Grigorov --------- Signed-off-by: Martin Tzvetanov Grigorov Co-authored-by: Rik Heijdens --- avro/src/schema.rs | 130 ++++++- avro/src/types.rs | 4 +- avro/tests/avro-3786.rs | 727 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 858 insertions(+), 3 deletions(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index 26ed04c..d688473 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -837,9 +837,11 @@ impl UnionSchema { // extend known schemas with just resolved names collected_names.extend(resolved_names); let namespace = &schema.namespace().or_else(|| enclosing_namespace.clone()); + value - .validate_internal(schema, &collected_names, namespace) - .is_none() + .clone() + .resolve_internal(schema, &collected_names, namespace, &None) + .is_ok() }) } } @@ -5209,4 +5211,128 @@ mod tests { Ok(()) } + + #[test] + fn test_avro_3814_schema_resolution_failure() -> TestResult { + // Define a reader schema: a nested record with an optional field. + let reader_schema = json!( + { + "type": "record", + "name": "MyOuterRecord", + "fields": [ + { + "name": "inner_record", + "type": [ + "null", + { + "type": "record", + "name": "MyRecord", + "fields": [ + {"name": "a", "type": "string"} + ] + } + ], + "default": null + } + ] + } + ); + + // Define a writer schema: a nested record with an optional field, which + // may optionally contain an enum. + let writer_schema = json!( + { + "type": "record", + "name": "MyOuterRecord", + "fields": [ + { + "name": "inner_record", + "type": [ + "null", + { + "type": "record", + "name": "MyRecord", + "fields": [ + {"name": "a", "type": "string"}, + { + "name": "b", + "type": [ + "null", + { + "type": "enum", + "name": "MyEnum", + "symbols": ["A", "B", "C"], + "default": "C" + } + ], + "default": null + }, + ] + } + ] + } + ], + "default": null + } + ); + + // Use different structs to represent the "Reader" and the "Writer" + // to mimic two different versions of a producer & consumer application. + #[derive(Serialize, Deserialize, Debug)] + struct MyInnerRecordReader { + a: String, + } + + #[derive(Serialize, Deserialize, Debug)] + struct MyRecordReader { + inner_record: Option, + } + + #[derive(Serialize, Deserialize, Debug)] + enum MyEnum { + A, + B, + C, + } + + #[derive(Serialize, Deserialize, Debug)] + struct MyInnerRecordWriter { + a: String, + b: Option, + } + + #[derive(Serialize, Deserialize, Debug)] + struct MyRecordWriter { + inner_record: Option, + } + + let s = MyRecordWriter { + inner_record: Some(MyInnerRecordWriter { + a: "foo".to_string(), + b: None, + }), + }; + + // Serialize using the writer schema. + let writer_schema = Schema::parse(&writer_schema)?; + let avro_value = crate::to_value(s)?; + assert!( + avro_value.validate(&writer_schema), + "value is valid for schema", + ); + let datum = crate::to_avro_datum(&writer_schema, avro_value)?; + + // Now, attempt to deserialize using the reader schema. + let reader_schema = Schema::parse(&reader_schema)?; + let mut x = &datum[..]; + + // Deserialization should succeed and we should be able to resolve the schema. + let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?; + assert!(deser_value.validate(&reader_schema)); + + // Verify that we can read a field from the record. + let d: MyRecordReader = crate::from_value(&deser_value)?; + assert_eq!(d.inner_record.unwrap().a, "foo".to_string()); + Ok(()) + } } diff --git a/avro/src/types.rs b/avro/src/types.rs index fbc4fa0..07a51c3 100644 --- a/avro/src/types.rs +++ b/avro/src/types.rs @@ -377,6 +377,7 @@ impl Value { } } + /// Validates the value against the provided schema. pub(crate) fn validate_internal + Debug>( &self, schema: &Schema, @@ -516,6 +517,7 @@ impl Value { let non_nullable_fields_count = fields.iter().filter(|&rf| !rf.is_nullable()).count(); + // If the record contains fewer fields as required fields by the schema, it is invalid. if record_fields.len() < non_nullable_fields_count { return Some(format!( "The value's records length ({}) doesn't match the schema ({} non-nullable fields)", @@ -603,7 +605,7 @@ impl Value { self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None) } - fn resolve_internal( + pub(crate) fn resolve_internal( mut self, schema: &Schema, names: &NamesRef, diff --git a/avro/tests/avro-3786.rs b/avro/tests/avro-3786.rs index f36b163..d27e0c4 100644 --- a/avro/tests/avro-3786.rs +++ b/avro/tests/avro-3786.rs @@ -157,3 +157,730 @@ fn avro_3786_deserialize_union_with_different_enum_order() -> TestResult { } Ok(()) } + +#[test] +fn avro_3786_deserialize_union_with_different_enum_order_defined_in_record() -> TestResult { + #[derive( + Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize, + )] + pub enum Bar { + #[serde(rename = "bar0")] + Bar0, + #[serde(rename = "bar1")] + Bar1, + #[serde(rename = "bar2")] + Bar2, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct BarParent { + pub bar: Bar, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct Foo { + #[serde(rename = "barParent")] + pub bar_parent: Option, + } + let writer_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "com.rallyhealth.devices.canonical.avro.model.v6_0", + "fields": + [ + { + "name": "barParent", + "type": [ + "null", + { + "type": "record", + "name": "BarParent", + "fields": [ + { + "name": "bar", + "type": { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar0", + "bar1", + "bar2" + ], + "default": "bar0" + } + } + ] + } + ] + } + ] + }"#; + let reader_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "com.rallyhealth.devices.canonical.avro.model.v6_0", + "fields": + [ + { + "name": "barParent", + "type": [ + "null", + { + "type": "record", + "name": "BarParent", + "fields": [ + { + "name": "bar", + "type": { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar0", + "bar2" + ], + "default": "bar0" + } + } + ] + } + ] + } + ] + }"#; + let writer_schema = Schema::parse_str(writer_schema)?; + let foo1 = Foo { + bar_parent: Some(BarParent { bar: Bar::Bar0 }), + }; + let avro_value = crate::to_value(foo1)?; + assert!( + avro_value.validate(&writer_schema), + "value is valid for schema", + ); + let datum = crate::to_avro_datum(&writer_schema, avro_value)?; + let mut x = &datum[..]; + let reader_schema = Schema::parse_str(reader_schema)?; + let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?; + match deser_value { + types::Value::Record(fields) => { + assert_eq!(fields.len(), 1); + assert_eq!(fields[0].0, "barParent"); + // TODO: better validation + } + _ => panic!("Expected Value::Record"), + } + Ok(()) +} + +#[test] +fn test_avro_3786_deserialize_union_with_different_enum_order_defined_in_record_v1() -> TestResult { + #[derive( + Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize, + )] + pub enum Bar { + #[serde(rename = "bar0")] + Bar0, + #[serde(rename = "bar1")] + Bar1, + #[serde(rename = "bar2")] + Bar2, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct BarParent { + pub bar: Bar, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct Foo { + #[serde(rename = "barParent")] + pub bar_parent: Option, + } + let writer_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "com.rallyhealth.devices.canonical.avro.model.v6_0", + "fields": + [ + { + "name": "barParent", + "type": [ + "null", + { + "type": "record", + "name": "BarParent", + "fields": [ + { + "name": "bar", + "type": { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar0", + "bar1", + "bar2" + ], + "default": "bar0" + } + } + ] + } + ] + } + ] + }"#; + let reader_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "com.rallyhealth.devices.canonical.avro.model.v6_0", + "fields": + [ + { + "name": "barParent", + "type": [ + "null", + { + "type": "record", + "name": "BarParent", + "fields": [ + { + "name": "bar", + "type": { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar0", + "bar2" + ], + "default": "bar0" + } + } + ] + } + ] + } + ] + }"#; + let writer_schema = Schema::parse_str(writer_schema)?; + let foo1 = Foo { + bar_parent: Some(BarParent { bar: Bar::Bar1 }), + }; + let avro_value = crate::to_value(foo1)?; + assert!( + avro_value.validate(&writer_schema), + "value is valid for schema", + ); + let datum = crate::to_avro_datum(&writer_schema, avro_value)?; + let mut x = &datum[..]; + let reader_schema = Schema::parse_str(reader_schema)?; + let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?; + match deser_value { + types::Value::Record(fields) => { + assert_eq!(fields.len(), 1); + assert_eq!(fields[0].0, "barParent"); + // TODO: better validation + } + _ => panic!("Expected Value::Record"), + } + Ok(()) +} + +#[test] +fn test_avro_3786_deserialize_union_with_different_enum_order_defined_in_record_v2() -> TestResult { + #[derive( + Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize, + )] + pub enum Bar { + #[serde(rename = "bar0")] + Bar0, + #[serde(rename = "bar1")] + Bar1, + #[serde(rename = "bar2")] + Bar2, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct BarParent { + pub bar: Bar, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct Foo { + #[serde(rename = "barParent")] + pub bar_parent: Option, + } + let writer_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "com.rallyhealth.devices.canonical.avro.model.v6_0", + "fields": + [ + { + "name": "barParent", + "type": [ + "null", + { + "type": "record", + "name": "BarParent", + "fields": [ + { + "name": "bar", + "type": { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar0", + "bar1", + "bar2" + ], + "default": "bar2" + } + } + ] + } + ] + } + ] + }"#; + let reader_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "com.rallyhealth.devices.canonical.avro.model.v6_0", + "fields": + [ + { + "name": "barParent", + "type": [ + "null", + { + "type": "record", + "name": "BarParent", + "fields": [ + { + "name": "bar", + "type": { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar1", + "bar2" + ], + "default": "bar2" + } + } + ] + } + ] + } + ] + }"#; + let writer_schema = Schema::parse_str(writer_schema)?; + let foo1 = Foo { + bar_parent: Some(BarParent { bar: Bar::Bar1 }), + }; + let avro_value = crate::to_value(foo1)?; + assert!( + avro_value.validate(&writer_schema), + "value is valid for schema", + ); + let datum = crate::to_avro_datum(&writer_schema, avro_value)?; + let mut x = &datum[..]; + let reader_schema = Schema::parse_str(reader_schema)?; + let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?; + match deser_value { + types::Value::Record(fields) => { + assert_eq!(fields.len(), 1); + assert_eq!(fields[0].0, "barParent"); + // TODO: better validation + } + _ => panic!("Expected Value::Record"), + } + Ok(()) +} + +#[test] +fn deserialize_union_with_different_enum_order_defined_in_record() -> TestResult { + #[derive( + Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize, + )] + pub enum Bar { + #[serde(rename = "bar0")] + Bar0, + #[serde(rename = "bar1")] + Bar1, + #[serde(rename = "bar2")] + Bar2, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct BarParent { + pub bar: Bar, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct Foo { + #[serde(rename = "barParent")] + pub bar_parent: Option, + } + let writer_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "com.rallyhealth.devices.canonical.avro.model.v6_0", + "fields": + [ + { + "name": "barParent", + "type": [ + "null", + { + "type": "record", + "name": "BarParent", + "fields": [ + { + "name": "bar", + "type": { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar0", + "bar1", + "bar2" + ], + "default": "bar0" + } + } + ] + } + ] + } + ] + }"#; + let reader_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "com.rallyhealth.devices.canonical.avro.model.v6_0", + "fields": + [ + { + "name": "barParent", + "type": [ + "null", + { + "type": "record", + "name": "BarParent", + "fields": [ + { + "name": "bar", + "type": { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar0", + "bar2" + ], + "default": "bar0" + } + } + ] + } + ] + } + ] + }"#; + let writer_schema = Schema::parse_str(writer_schema)?; + let foo1 = Foo { + bar_parent: Some(BarParent { bar: Bar::Bar2 }), + }; + let avro_value = crate::to_value(foo1)?; + assert!( + avro_value.validate(&writer_schema), + "value is valid for schema", + ); + let datum = crate::to_avro_datum(&writer_schema, avro_value)?; + let mut x = &datum[..]; + let reader_schema = Schema::parse_str(reader_schema)?; + let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?; + match deser_value { + types::Value::Record(fields) => { + assert_eq!(fields.len(), 1); + assert_eq!(fields[0].0, "barParent"); + // TODO: better validation + } + _ => panic!("Expected Value::Record"), + } + Ok(()) +} + +#[test] +fn deserialize_union_with_record_with_enum_defined_inline_reader_has_different_indices( +) -> TestResult { + #[derive( + Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize, + )] + pub enum DefinedInRecord { + #[serde(rename = "val0")] + Val0, + #[serde(rename = "val1")] + Val1, + #[serde(rename = "val2")] + Val2, + #[serde(rename = "UNKNOWN")] + Unknown, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct Parent { + pub date: i64, + #[serde(rename = "barUse")] + pub bar_use: Bar, + #[serde(rename = "bazUse")] + pub baz_use: Option>, + #[serde(rename = "definedInRecord")] + pub defined_in_record: DefinedInRecord, + #[serde(rename = "optionalString")] + pub optional_string: Option, + } + #[derive( + Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize, + )] + pub enum Baz { + #[serde(rename = "baz0")] + Baz0, + #[serde(rename = "baz1")] + Baz1, + #[serde(rename = "baz2")] + Baz2, + } + #[derive( + Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize, + )] + pub enum Bar { + #[serde(rename = "bar0")] + Bar0, + #[serde(rename = "bar1")] + Bar1, + #[serde(rename = "bar2")] + Bar2, + } + #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] + pub struct Foo { + #[serde(rename = "barInit")] + pub bar_init: Bar, + pub baz: Baz, + pub parent: Option, + } + let writer_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "fake", + "fields": + [ + { + "name": "barInit", + "type": + { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar0", + "bar1", + "bar2" + ], + "default": "bar0" + } + }, + { + "name": "baz", + "type": + { + "type": "enum", + "name": "Baz", + "symbols": + [ + "baz0", + "baz1", + "baz2" + ], + "default": "baz0" + } + }, + { + "name": "parent", + "type": [ + "null", + { + "type": "record", + "name": "Parent", + "fields": [ + { + "name": "date", + "type": { + "type": "long", + "avro.java.long": "Long" + } + }, + { + "name": "barUse", + "type": "Bar" + }, + { + "name": "bazUse", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "Baz" + } + } + ] + }, + { + "name": "definedInRecord", + "type": { + "name": "DefinedInRecord", + "type": "enum", + "symbols": [ + "val0", + "val1", + "val2", + "UNKNOWN" + ], + "default": "UNKNOWN" + } + }, + { + "name": "optionalString", + "type": [ + "null", + "string" + ] + } + ] + } + ] + } + ] + }"#; + let reader_schema = r#"{ + "type": "record", + "name": "Foo", + "namespace": "fake", + "fields": + [ + { + "name": "barInit", + "type": + { + "type": "enum", + "name": "Bar", + "symbols": + [ + "bar0", + "bar2" + ], + "default": "bar0" + } + }, + { + "name": "baz", + "type": + { + "type": "enum", + "name": "Baz", + "symbols": + [ + "baz0", + "baz2" + ], + "default": "baz0" + } + }, + { + "name": "parent", + "type": [ + "null", + { + "type": "record", + "name": "Parent", + "fields": [ + { + "name": "date", + "type": { + "type": "long", + "avro.java.long": "Long" + } + }, + { + "name": "barUse", + "type": "Bar" + }, + { + "name": "bazUse", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "Baz" + } + } + ] + }, + { + "name": "definedInRecord", + "type": { + "name": "DefinedInRecord", + "type": "enum", + "symbols": [ + "val1", + "val2", + "UNKNOWN" + ], + "default": "UNKNOWN" + } + }, + { + "name": "optionalString", + "type": [ + "null", + "string" + ] + } + ] + } + ] + } + ] + }"#; + let writer_schema = Schema::parse_str(writer_schema)?; + let foo1 = Foo { + bar_init: Bar::Bar0, + baz: Baz::Baz0, + parent: Some(Parent { + bar_use: Bar::Bar0, + baz_use: Some(vec![Baz::Baz0]), + optional_string: Some("test".to_string()), + date: 1689197893, + defined_in_record: DefinedInRecord::Val1, + }), + }; + let avro_value = crate::to_value(foo1)?; + assert!( + avro_value.validate(&writer_schema), + "value is valid for schema", + ); + let datum = crate::to_avro_datum(&writer_schema, avro_value)?; + let mut x = &datum[..]; + let reader_schema = Schema::parse_str(reader_schema)?; + let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?; + match deser_value { + types::Value::Record(fields) => { + assert_eq!(fields.len(), 3); + assert_eq!(fields[0].0, "barInit"); + assert_eq!(fields[0].1, types::Value::Enum(0, "bar0".to_string())); + // TODO: better validation + } + _ => panic!("Expected Value::Record"), + } + Ok(()) +}