diff --git a/Cargo.lock b/Cargo.lock index 62b9783df1a..feef1009e9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -169,6 +169,12 @@ dependencies = [ "serde", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "ansi_term" version = "0.12.1" @@ -307,7 +313,7 @@ dependencies = [ "bitflags", "cexpr", "clang-sys", - "clap", + "clap 2.34.0", "env_logger 0.9.0", "lazy_static", "lazycell", @@ -389,7 +395,6 @@ dependencies = [ "lazy_static", "memchr", "regex-automata", - "serde", ] [[package]] @@ -450,12 +455,9 @@ dependencies = [ [[package]] name = "cast" -version = "0.2.7" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a" -dependencies = [ - "rustc_version 0.4.0", -] +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" @@ -501,6 +503,33 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ciborium" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c137568cc60b904a7724001b35ce2630fd00d5d84805fbb608ab89509d788f" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "346de753af073cc87b52b2083a506b38ac176a44cfb05497b622e27be899b369" + +[[package]] +name = "ciborium-ll" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213030a2b5a4e0c0892b6652260cf6ccac84827b83a85a534e178e3906c4cf1b" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clang-sys" version = "1.3.3" @@ -523,11 +552,32 @@ dependencies = [ "bitflags", "strsim", "term_size", - "textwrap", + "textwrap 0.11.0", "unicode-width", "vec_map", ] +[[package]] +name = "clap" +version = "3.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" +dependencies = [ + "bitflags", + "clap_lex", + "indexmap", + "textwrap 0.16.0", +] + +[[package]] +name = "clap_lex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +dependencies = [ + "os_str_bytes", +] + [[package]] name = "clear_on_drop" version = "0.2.5" @@ -667,15 +717,16 @@ dependencies = [ [[package]] name = "criterion" -version = "0.3.5" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1604dafd25fba2fe2d5895a9da139f8dc9b319a5fe5354ca137cbbce4e178d10" +checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" dependencies = [ + "anes", "atty", "cast", - "clap", + "ciborium", + "clap 3.2.23", "criterion-plot", - "csv", "itertools 0.10.3", "lazy_static", "num-traits 0.2.15", @@ -684,7 +735,6 @@ dependencies = [ "rayon", "regex", "serde", - "serde_cbor", "serde_derive", "serde_json", "tinytemplate", @@ -693,9 +743,9 @@ dependencies = [ [[package]] name = "criterion-plot" -version = "0.4.4" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" dependencies = [ "cast", "itertools 0.10.3", @@ -823,28 +873,6 @@ dependencies = [ "subtle 1.0.0", ] -[[package]] -name = "csv" -version = "1.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" -dependencies = [ - "bstr", - "csv-core", - "itoa 0.4.8", - "ryu", - "serde", -] - -[[package]] -name = "csv-core" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" -dependencies = [ - "memchr", -] - [[package]] name = "curve25519-dalek" version = "1.2.6" @@ -2427,6 +2455,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_str_bytes" +version = "6.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" + [[package]] name = "owning_ref" version = "0.4.1" @@ -3150,7 +3184,7 @@ name = "relay" version = "23.1.0" dependencies = [ "anyhow", - "clap", + "clap 2.34.0", "console 0.10.3", "dialoguer", "futures 0.1.31", @@ -3434,6 +3468,7 @@ name = "relay-replays" version = "23.1.0" dependencies = [ "assert-json-diff", + "criterion", "flate2", "insta", "rand 0.8.5", @@ -3474,7 +3509,7 @@ dependencies = [ "brotli2", "bytes 0.4.12", "chrono", - "clap", + "clap 2.34.0", "data-encoding", "failure", "flate2", @@ -3977,16 +4012,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde_cbor" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" -dependencies = [ - "half", - "serde", -] - [[package]] name = "serde_derive" version = "1.0.137" @@ -4242,7 +4267,7 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" dependencies = [ - "clap", + "clap 2.34.0", "lazy_static", "paw", "structopt-derive", @@ -4412,6 +4437,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "textwrap" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" + [[package]] name = "thiserror" version = "1.0.37" diff --git a/relay-general/Cargo.toml b/relay-general/Cargo.toml index e718b64263f..4acb25ca226 100644 --- a/relay-general/Cargo.toml +++ b/relay-general/Cargo.toml @@ -42,7 +42,7 @@ utf16string = "0.2.0" uuid = { version = "0.8.1", features = ["v4", "serde"] } [dev-dependencies] -criterion = "0.3" +criterion = "0.4" insta = { version = "1.19.0", features = ["json", "redactions", "ron", "yaml"] } pretty-hex = "0.2.0" similar-asserts = "1.4.2" diff --git a/relay-metrics/Cargo.toml b/relay-metrics/Cargo.toml index fad598d4c3b..9a2d29e272e 100644 --- a/relay-metrics/Cargo.toml +++ b/relay-metrics/Cargo.toml @@ -23,7 +23,7 @@ thiserror = "1.0.20" tokio = { version = "1.0", features = ["macros", "time"] } [dev-dependencies] -criterion = "0.3" +criterion = "0.4" insta = "1.19.0" relay-statsd = { path = "../relay-statsd", features = ["test"] } relay-test = { path = "../relay-test" } diff --git a/relay-replays/Cargo.toml b/relay-replays/Cargo.toml index 4913ff0c8c7..5d0981f3cfd 100644 --- a/relay-replays/Cargo.toml +++ b/relay-replays/Cargo.toml @@ -22,5 +22,10 @@ unicase = "2.6.0" flate2 = "1.0.19" [dev-dependencies] +criterion = "0.4" insta = { version = "1.1.0", features = ["ron"] } assert-json-diff = "2.0.2" + +[[bench]] +name = "benchmarks" +harness = false diff --git a/relay-replays/benches/benchmarks.rs b/relay-replays/benches/benchmarks.rs new file mode 100644 index 00000000000..de7514fb3d5 --- /dev/null +++ b/relay-replays/benches/benchmarks.rs @@ -0,0 +1,14 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; + +use relay_replays::recording::_deserialize_event; + +fn bench_recording(c: &mut Criterion) { + let payload = include_bytes!("../tests/fixtures/rrweb.json"); + + c.bench_with_input(BenchmarkId::new("rrweb", 1), &payload, |b, &_| { + b.iter(|| _deserialize_event(payload)); + }); +} + +criterion_group!(benches, bench_recording); +criterion_main!(benches); diff --git a/relay-replays/src/recording.rs b/relay-replays/src/recording/mod.rs similarity index 76% rename from relay-replays/src/recording.rs rename to relay-replays/src/recording/mod.rs index d8d122dd418..00d48da348a 100644 --- a/relay-replays/src/recording.rs +++ b/relay-replays/src/recording/mod.rs @@ -10,9 +10,11 @@ use relay_general::processor::{ use relay_general::types::{Meta, ProcessingAction}; use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression}; -use serde::{de::Error as DError, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use serde_json::Value; +mod serialization; + /// Parses compressed replay recording payloads and applies data scrubbers. /// /// `limit` controls the maximum size in bytes during decompression. This function returns an `Err` @@ -158,9 +160,10 @@ impl RecordingProcessor<'_> { } } NodeVariant::T2(element) => self.recurse_element(element)?, - NodeVariant::Rest(text) => { + NodeVariant::T3(text) | NodeVariant::T4(text) | NodeVariant::T5(text) => { self.strip_pii(&mut text.text_content)?; } + _ => {} } @@ -232,56 +235,19 @@ impl RecordingProcessor<'_> { /// -> CUSTOM = 5 /// -> PLUGIN = 6 -#[derive(Debug, Serialize)] -#[serde(untagged)] +#[derive(Debug)] enum Event { - T2(FullSnapshotEvent), - T3(IncrementalSnapshotEvent), - T4(MetaEvent), - T5(CustomEvent), - Default(Value), - // 0: DOMContentLoadedEvent, - // 1: LoadEvent, - // 6: PluginEvent, -} - -impl<'de> serde::Deserialize<'de> for Event { - fn deserialize>(d: D) -> Result { - let value = Value::deserialize(d)?; - - match value.get("type") { - Some(val) => match Value::as_u64(val) { - Some(v) => match v { - 2 => match FullSnapshotEvent::deserialize(value) { - Ok(event) => Ok(Event::T2(event)), - Err(_) => Err(DError::custom("could not parse snapshot event")), - }, - 3 => match IncrementalSnapshotEvent::deserialize(value) { - Ok(event) => Ok(Event::T3(event)), - Err(_) => Err(DError::custom("could not parse incremental snapshot event")), - }, - 4 => match MetaEvent::deserialize(value) { - Ok(event) => Ok(Event::T4(event)), - Err(_) => Err(DError::custom("could not parse meta event")), - }, - 5 => match CustomEvent::deserialize(value) { - Ok(event) => Ok(Event::T5(event)), - Err(e) => Err(DError::custom(e.to_string())), - }, - 0 | 1 | 6 => Ok(Event::Default(value)), - _ => Err(DError::custom("invalid type value")), - }, - None => Err(DError::custom("type field must be an integer")), - }, - None => Err(DError::missing_field("type")), - } - } + T0(Value), // 0: DOMContentLoadedEvent, + T1(Value), // 1: LoadEvent, + T2(Box), + T3(Box), + T4(Box), + T5(Box), + T6(Value), // 6: PluginEvent, } #[derive(Debug, Serialize, Deserialize)] struct FullSnapshotEvent { - #[serde(rename = "type")] - ty: u8, timestamp: u64, data: FullSnapshotEventData, } @@ -295,24 +261,18 @@ struct FullSnapshotEventData { #[derive(Debug, Serialize, Deserialize)] struct IncrementalSnapshotEvent { - #[serde(rename = "type")] - ty: u8, timestamp: u64, data: IncrementalSourceDataVariant, } #[derive(Debug, Serialize, Deserialize)] struct MetaEvent { - #[serde(rename = "type")] - ty: u8, timestamp: u64, data: Value, } #[derive(Debug, Serialize, Deserialize)] struct CustomEvent { - #[serde(rename = "type")] - ty: u8, timestamp: f64, data: CustomEventDataVariant, } @@ -321,9 +281,9 @@ struct CustomEvent { #[serde(untagged)] enum CustomEventDataVariant { #[serde(rename = "breadcrumb")] - Breadcrumb(Breadcrumb), + Breadcrumb(Box), #[serde(rename = "performanceSpan")] - PerformanceSpan(PerformanceSpan), + PerformanceSpan(Box), } #[derive(Debug, Serialize, Deserialize)] @@ -391,52 +351,20 @@ struct Node { variant: NodeVariant, } -#[derive(Debug, Serialize)] -#[serde(untagged)] +#[derive(Debug)] + enum NodeVariant { - T0(DocumentNode), - T1(DocumentTypeNode), - T2(ElementNode), - Rest(TextNode), // types 3 (text), 4 (cdata), 5 (comment) -} - -impl<'de> serde::Deserialize<'de> for NodeVariant { - fn deserialize>(d: D) -> Result { - let value = Value::deserialize(d)?; - - match value.get("type") { - Some(val) => match Value::as_u64(val) { - Some(v) => match v { - 0 => match DocumentNode::deserialize(value) { - Ok(document) => Ok(NodeVariant::T0(document)), - Err(_) => Err(DError::custom("could not parse document object.")), - }, - 1 => match DocumentTypeNode::deserialize(value) { - Ok(document_type) => Ok(NodeVariant::T1(document_type)), - Err(_) => Err(DError::custom("could not parse document-type object")), - }, - 2 => match ElementNode::deserialize(value) { - Ok(element) => Ok(NodeVariant::T2(element)), - Err(_) => Err(DError::custom("could not parse element object")), - }, - 3 | 4 | 5 => match TextNode::deserialize(value) { - Ok(text) => Ok(NodeVariant::Rest(text)), - Err(_) => Err(DError::custom("could not parse text object")), - }, - _ => Err(DError::custom("invalid type value")), - }, - None => Err(DError::custom("type field must be an integer")), - }, - None => Err(DError::missing_field("type")), - } - } + T0(Box), + T1(Box), + T2(Box), + T3(Box), // text + T4(Box), // cdata + T5(Box), // comment } #[derive(Debug, Serialize, Deserialize)] struct DocumentNode { id: i32, - #[serde(rename = "type")] - ty: u8, #[serde(rename = "childNodes")] child_nodes: Vec, } @@ -444,8 +372,6 @@ struct DocumentNode { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct DocumentTypeNode { - #[serde(rename = "type")] - ty: u8, id: Value, public_id: Value, system_id: Value, @@ -456,8 +382,6 @@ struct DocumentTypeNode { #[serde(rename_all = "camelCase")] struct ElementNode { id: Value, - #[serde(rename = "type")] - ty: u8, attributes: HashMap, tag_name: String, child_nodes: Vec, @@ -471,8 +395,6 @@ struct ElementNode { #[serde(rename_all = "camelCase")] struct TextNode { id: Value, - #[serde(rename = "type")] - ty: u8, text_content: String, #[serde(skip_serializing_if = "Option::is_none")] is_style: Option, @@ -499,42 +421,16 @@ struct TextNode { /// -> DRAG = 12 /// -> STYLEDECLARATION = 13 -#[derive(Debug, Serialize)] -#[serde(untagged)] +#[derive(Debug)] enum IncrementalSourceDataVariant { - Mutation(MutationIncrementalSourceData), - Input(InputIncrementalSourceData), - Default(Value), -} - -impl<'de> serde::Deserialize<'de> for IncrementalSourceDataVariant { - fn deserialize>(d: D) -> Result { - let value = Value::deserialize(d)?; - - match value.get("source") { - Some(val) => match Value::as_u64(val) { - Some(v) => match v { - 0 => match MutationIncrementalSourceData::deserialize(value) { - Ok(document) => Ok(IncrementalSourceDataVariant::Mutation(document)), - Err(_) => Err(DError::custom("could not parse mutation object.")), - }, - 5 => match InputIncrementalSourceData::deserialize(value) { - Ok(document_type) => Ok(IncrementalSourceDataVariant::Input(document_type)), - Err(_) => Err(DError::custom("could not parse input object")), - }, - _ => Ok(IncrementalSourceDataVariant::Default(value)), - }, - None => Err(DError::custom("type field must be an integer")), - }, - None => Err(DError::missing_field("type")), - } - } + Mutation(Box), + Input(Box), + Default(Box), } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct InputIncrementalSourceData { - source: u8, id: i32, text: String, is_checked: Value, @@ -546,7 +442,6 @@ struct InputIncrementalSourceData { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct MutationIncrementalSourceData { - source: u8, texts: Vec, attributes: Vec, removes: Vec, @@ -555,6 +450,12 @@ struct MutationIncrementalSourceData { is_attach_iframe: Option, } +#[derive(Debug)] +struct DefaultIncrementalSourceData { + pub source: u8, + pub value: Value, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct MutationAdditionIncrementalSourceData { @@ -658,7 +559,7 @@ mod tests { #[test] fn test_pii_credit_card_removal() { - let payload = include_bytes!("../tests/fixtures/rrweb-pii.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-pii.json"); let mut events: Vec = serde_json::from_slice(payload).unwrap(); recording::strip_pii(&mut events).unwrap(); @@ -669,7 +570,7 @@ mod tests { let dd = cc.adds.pop().unwrap(); if let recording::NodeVariant::T2(mut ee) = dd.node.variant { let ff = ee.child_nodes.pop().unwrap(); - if let recording::NodeVariant::Rest(gg) = ff.variant { + if let recording::NodeVariant::T3(gg) = ff.variant { assert_eq!(gg.text_content, "[creditcard]"); return; } @@ -681,7 +582,7 @@ mod tests { #[test] fn test_scrub_pii_navigation() { - let payload = include_bytes!("../tests/fixtures/rrweb-performance-navigation.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-performance-navigation.json"); let mut events: Vec = serde_json::from_slice(payload).unwrap(); recording::strip_pii(&mut events).unwrap(); @@ -702,7 +603,7 @@ mod tests { #[test] fn test_scrub_pii_resource() { - let payload = include_bytes!("../tests/fixtures/rrweb-performance-resource.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-performance-resource.json"); let mut events: Vec = serde_json::from_slice(payload).unwrap(); recording::strip_pii(&mut events).unwrap(); @@ -723,7 +624,7 @@ mod tests { #[test] fn test_pii_ip_address_removal() { - let payload = include_bytes!("../tests/fixtures/rrweb-pii-ip-address.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-pii-ip-address.json"); let mut events: Vec = serde_json::from_slice(payload).unwrap(); recording::strip_pii(&mut events).unwrap(); @@ -734,7 +635,7 @@ mod tests { let dd = cc.adds.pop().unwrap(); if let recording::NodeVariant::T2(mut ee) = dd.node.variant { let ff = ee.child_nodes.pop().unwrap(); - if let recording::NodeVariant::Rest(gg) = ff.variant { + if let recording::NodeVariant::T3(gg) = ff.variant { assert_eq!(gg.text_content, "[ip]"); return; } @@ -746,7 +647,7 @@ mod tests { #[test] fn test_rrweb_snapshot_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb.json"); let input_parsed = loads(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); @@ -755,7 +656,7 @@ mod tests { #[test] fn test_rrweb_incremental_source_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-diff.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-diff.json"); let input_parsed = loads(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); @@ -765,7 +666,7 @@ mod tests { // Node coverage #[test] fn test_rrweb_node_2_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-node-2.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-node-2.json"); let input_parsed: recording::NodeVariant = serde_json::from_slice(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); @@ -774,18 +675,19 @@ mod tests { #[test] fn test_rrweb_node_2_style_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-node-2-style.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-node-2-style.json"); let input_parsed: recording::NodeVariant = serde_json::from_slice(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); - assert_json_eq!(input_parsed, input_raw) + serde_json::to_string_pretty(&input_parsed).unwrap(); + assert_json_eq!(input_parsed, input_raw); } // Event coverage #[test] fn test_rrweb_event_3_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-event-3.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-event-3.json"); let input_parsed: recording::Event = serde_json::from_slice(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); @@ -794,10 +696,16 @@ mod tests { #[test] fn test_rrweb_event_5_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-event-5.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-event-5.json"); let input_parsed: Vec = serde_json::from_slice(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); assert_json_eq!(input_parsed, input_raw); } } + +#[doc(hidden)] +/// Only used in benchmarks. +pub fn _deserialize_event(payload: &[u8]) { + let _: Vec = serde_json::from_slice(payload).unwrap(); +} diff --git a/relay-replays/src/recording/serialization.rs b/relay-replays/src/recording/serialization.rs new file mode 100644 index 00000000000..b42d9559076 --- /dev/null +++ b/relay-replays/src/recording/serialization.rs @@ -0,0 +1,246 @@ +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; + +use crate::recording::*; + +/// Implementation tweaked from serde's `derive(Deserialize)` for internally tagged enums, +/// in order to work with integer tags. +impl<'de> Deserialize<'de> for Event { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let tagged = match Deserializer::deserialize_any( + d, + // NOTE: Use of this private API is discouraged by serde, but we need it for + // efficient deserialization of these large, recursive structures into + // internally tagged enums with integer tags. + // Ideally, we would write our own `derive` for this, or contribute to serde + // to support integer tags out of the box. + serde::__private::de::TaggedContentVisitor::::new( + "type", + "internally tagged enum Event", + ), + ) { + Ok(val) => val, + Err(err) => return Err(err), + }; + let content_deserializer = + serde::__private::de::ContentDeserializer::::new(tagged.content); + match tagged.tag { + 0 => Value::deserialize(content_deserializer).map(Event::T0), + 1 => Value::deserialize(content_deserializer).map(Event::T1), + 2 => Box::::deserialize(content_deserializer).map(Event::T2), + 3 => Box::::deserialize(content_deserializer).map(Event::T3), + 4 => Box::::deserialize(content_deserializer).map(Event::T4), + 5 => Box::::deserialize(content_deserializer).map(Event::T5), + 6 => Value::deserialize(content_deserializer).map(Event::T6), + value => Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Unsigned(value as u64), + &"type id 0 <= i < 7", + )), + } + } +} + +/// Helper for [`Event`] serialization. +#[derive(Serialize)] +#[serde(untagged)] +enum InnerEvent<'a> { + T0(&'a Value), // 0: DOMContentLoadedEvent, + T1(&'a Value), // 1: LoadEvent, + T2(&'a FullSnapshotEvent), + T3(&'a IncrementalSnapshotEvent), + T4(&'a MetaEvent), + T5(&'a CustomEvent), + T6(&'a Value), // 6: PluginEvent, +} + +/// Helper for [`Event`] serialization. +#[derive(Serialize)] +struct OuterEvent<'a> { + #[serde(rename = "type")] + ty: u8, + #[serde(flatten)] + inner: InnerEvent<'a>, +} + +impl<'a> OuterEvent<'a> { + fn new(ty: u8, inner: InnerEvent<'a>) -> Self { + Self { ty, inner } + } +} + +impl Serialize for Event { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + match self { + Event::T0(c) => OuterEvent::new(0, InnerEvent::T0(c)), + Event::T1(c) => OuterEvent::new(1, InnerEvent::T1(c)), + Event::T2(c) => OuterEvent::new(2, InnerEvent::T2(c)), + Event::T3(c) => OuterEvent::new(3, InnerEvent::T3(c)), + Event::T4(c) => OuterEvent::new(4, InnerEvent::T4(c)), + Event::T5(c) => OuterEvent::new(5, InnerEvent::T5(c)), + Event::T6(c) => OuterEvent::new(6, InnerEvent::T6(c)), + } + .serialize(s) + } +} + +/// Implementation tweaked from serde's `derive(Deserialize)` for internally tagged enums, +/// in order to work with integer tags. +impl<'de> Deserialize<'de> for NodeVariant { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let tagged = match Deserializer::deserialize_any( + d, + serde::__private::de::TaggedContentVisitor::::new( + "type", + "internally tagged enum NodeVariant", + ), + ) { + Ok(val) => val, + Err(err) => return Err(err), + }; + + let content_deserializer = + serde::__private::de::ContentDeserializer::::new(tagged.content); + match tagged.tag { + 0 => Box::::deserialize(content_deserializer).map(NodeVariant::T0), + 1 => Box::::deserialize(content_deserializer).map(NodeVariant::T1), + 2 => Box::::deserialize(content_deserializer).map(NodeVariant::T2), + 3 => Box::::deserialize(content_deserializer).map(NodeVariant::T3), + 4 => Box::::deserialize(content_deserializer).map(NodeVariant::T4), + 5 => Box::::deserialize(content_deserializer).map(NodeVariant::T5), + value => Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Unsigned(value as u64), + &"type id 0 <= i < 6", + )), + } + } +} + +/// Helper for [`NodeVariant`] serialization. +#[derive(Serialize)] +#[serde(untagged)] +enum InnerNodeVariant<'a> { + T0(&'a DocumentNode), + T1(&'a DocumentTypeNode), + T2(&'a ElementNode), + T3(&'a TextNode), // text + T4(&'a TextNode), // cdata + T5(&'a TextNode), // comment +} + +/// Helper for [`NodeVariant`] serialization. +#[derive(Serialize)] +struct OuterNodeVariant<'a> { + #[serde(rename = "type")] + ty: u8, + #[serde(flatten)] + inner: InnerNodeVariant<'a>, +} + +impl<'a> OuterNodeVariant<'a> { + fn new(ty: u8, inner: InnerNodeVariant<'a>) -> Self { + Self { ty, inner } + } +} + +impl Serialize for NodeVariant { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + match self { + NodeVariant::T0(c) => OuterNodeVariant::new(0, InnerNodeVariant::T0(c)), + NodeVariant::T1(c) => OuterNodeVariant::new(1, InnerNodeVariant::T1(c)), + NodeVariant::T2(c) => OuterNodeVariant::new(2, InnerNodeVariant::T2(c)), + NodeVariant::T3(c) => OuterNodeVariant::new(3, InnerNodeVariant::T3(c)), + NodeVariant::T4(c) => OuterNodeVariant::new(4, InnerNodeVariant::T4(c)), + NodeVariant::T5(c) => OuterNodeVariant::new(5, InnerNodeVariant::T5(c)), + } + .serialize(s) + } +} + +/// Implementation tweaked from serde's `derive(Deserialize)` for internally tagged enums, +/// in order to work with integer tags. +impl<'de> Deserialize<'de> for IncrementalSourceDataVariant { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let tagged = match Deserializer::deserialize_any( + d, + serde::__private::de::TaggedContentVisitor::::new( + "source", + "internally tagged enum IncrementalSourceDataVariant", + ), + ) { + Ok(val) => val, + Err(err) => return Err(err), + }; + let content_deserializer = + serde::__private::de::ContentDeserializer::::new(tagged.content); + match tagged.tag { + 0 => Box::::deserialize(content_deserializer) + .map(IncrementalSourceDataVariant::Mutation), + 5 => Box::::deserialize(content_deserializer) + .map(IncrementalSourceDataVariant::Input), + source => Value::deserialize(content_deserializer).map(|value| { + IncrementalSourceDataVariant::Default(Box::new(DefaultIncrementalSourceData { + source, + value, + })) + }), + } + } +} + +/// Helper for [`IncrementalSourceDataVariant`] serialization. +#[derive(Serialize)] +#[serde(untagged)] +enum InnerISDV<'a> { + Mutation(&'a MutationIncrementalSourceData), + Input(&'a InputIncrementalSourceData), + Default(&'a Value), +} + +/// Helper for [`IncrementalSourceDataVariant`] serialization. +#[derive(Serialize)] +struct OuterISDV<'a> { + source: u8, + #[serde(flatten)] + inner: InnerISDV<'a>, +} + +impl<'a> OuterISDV<'a> { + fn new(source: u8, inner: InnerISDV<'a>) -> Self { + Self { source, inner } + } +} + +impl Serialize for IncrementalSourceDataVariant { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + match self { + IncrementalSourceDataVariant::Mutation(m) => { + OuterISDV::new(0, InnerISDV::Mutation(m.as_ref())) + } + IncrementalSourceDataVariant::Input(i) => { + OuterISDV::new(5, InnerISDV::Input(i.as_ref())) + } + IncrementalSourceDataVariant::Default(v) => { + OuterISDV::new(v.source, InnerISDV::Default(&v.value)) + } + } + .serialize(s) + } +} diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index 020d117f1d4..aac5fbbe542 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -1093,9 +1093,7 @@ impl EnvelopeProcessorService { } } ItemType::ReplayRecording => { - // XXX: Temporarily, only the Sentry org will be allowed to parse replays while - // we measure the impact of this change. - if replays_enabled && state.project_state.organization_id == Some(1) { + if replays_enabled { // Limit expansion of recordings to the max replay size. The payload is // decompressed temporarily and then immediately re-compressed. However, to // limit memory pressure, we use the replay limit as a good overall limit for diff --git a/tests/integration/test_replay_recordings.py b/tests/integration/test_replay_recordings.py index 3aca9ebc681..72c6f4775b3 100644 --- a/tests/integration/test_replay_recordings.py +++ b/tests/integration/test_replay_recordings.py @@ -1,4 +1,4 @@ -import time +import zlib from sentry_sdk.envelope import Envelope, Item, PayloadRef @@ -92,8 +92,6 @@ def test_chunked_replay_recordings_processing( assert replay_recording["received"] assert type(replay_recording["received"]) == int - outcomes_consumer.assert_empty() - def test_nonchunked_replay_recordings_processing( mini_sentry, relay_with_processing, replay_recordings_consumer, outcomes_consumer @@ -117,7 +115,8 @@ def test_nonchunked_replay_recordings_processing( ["attachment_type", "replay_recording"], ] ) - envelope.add_item(Item(payload=PayloadRef(bytes=b"test"), type="replay_recording")) + payload = recording_payload(b"[]") + envelope.add_item(Item(payload=PayloadRef(bytes=payload), type="replay_recording")) relay.send_envelope(project_id, envelope) @@ -129,7 +128,12 @@ def test_nonchunked_replay_recordings_processing( assert replay_recording["org_id"] == org_id assert type(replay_recording["received"]) == int assert replay_recording["retention_days"] == 90 - assert replay_recording["payload"] == b"test" + assert replay_recording["payload"] == payload assert replay_recording["type"] == "replay_recording_not_chunked" outcomes_consumer.assert_empty() + + +def recording_payload(bits: bytes): + compressed_payload = zlib.compress(bits) + return b'{"segment_id": 0}\n' + compressed_payload