diff --git a/Cargo.lock b/Cargo.lock index 25bb8190c7a..902d8ebd9bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1318,6 +1318,12 @@ dependencies = [ "slab", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -3471,7 +3477,6 @@ dependencies = [ "criterion", "flate2", "insta", - "once_cell", "rand 0.8.5", "rand_pcg 0.3.1", "relay-common", @@ -3479,7 +3484,6 @@ dependencies = [ "relay-general", "relay-log", "serde", - "serde-transcode", "serde_json", "unicase", ] @@ -3556,6 +3560,7 @@ dependencies = [ "symbolic-unreal", "take_mut", "thiserror", + "tikv-jemallocator", "tokio 1.19.2", "tokio-timer", "url 2.2.2", @@ -4014,15 +4019,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde-transcode" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "590c0e25c2a5bb6e85bf5c1bce768ceb86b316e7a01bdf07d2cb4ec2271990e2" -dependencies = [ - "serde", -] - [[package]] name = "serde_derive" version = "1.0.137" @@ -4474,6 +4470,27 @@ dependencies = [ "syn 1.0.96", ] +[[package]] +name = "tikv-jemalloc-sys" +version = "0.5.2+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec45c14da997d0925c7835883e4d5c181f196fa142f8c19d7643d1e9af2592c3" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20612db8a13a6c06d57ec83953694185a367e16945f66565e8028d2c0bd76979" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.1.44" diff --git a/Dockerfile b/Dockerfile index df139474137..19a31edaaa7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ ARG DOCKER_ARCH=amd64 ### Deps stage ### ################## -FROM getsentry/sentry-cli:1 AS sentry-cli +FROM getsentry/sentry-cli:2 AS sentry-cli FROM $DOCKER_ARCH/centos:7 AS relay-deps ARG DOCKER_ARCH @@ -47,6 +47,7 @@ FROM relay-deps AS relay-builder ARG RELAY_FEATURES=ssl,processing,crash-handler ENV RELAY_FEATURES=${RELAY_FEATURES} +ENV JEMALLOC_SYS_WITH_MALLOC_CONF=background_thread:true,abort_conf:true COPY . . diff --git a/relay-replays/Cargo.toml b/relay-replays/Cargo.toml index 4cf6713c6b6..acc4ed30116 100644 --- a/relay-replays/Cargo.toml +++ b/relay-replays/Cargo.toml @@ -10,18 +10,16 @@ license-file = "../LICENSE" publish = false [dependencies] -flate2 = "1.0.19" -once_cell = "1.13.1" -rand = "0.8.5" -rand_pcg = "0.3.1" relay-common = { path = "../relay-common" } -relay-filter = { path = "../relay-filter" } relay-general = { path = "../relay-general" } relay-log = { path = "../relay-log" } serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.55" -serde-transcode = "1.1.1" +relay-filter = { path = "../relay-filter" } +rand = "0.8.5" +rand_pcg = "0.3.1" unicase = "2.6.0" +flate2 = "1.0.19" [dev-dependencies] criterion = "0.4" diff --git a/relay-replays/benches/benchmarks.rs b/relay-replays/benches/benchmarks.rs index 091ba4e3fea..de7514fb3d5 100644 --- a/relay-replays/benches/benchmarks.rs +++ b/relay-replays/benches/benchmarks.rs @@ -1,34 +1,12 @@ -use std::io::Read; - use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use flate2::{bufread::ZlibEncoder, Compression}; -use relay_general::pii::DataScrubbingConfig; -use relay_replays::recording::ReplayScrubber; +use relay_replays::recording::_deserialize_event; fn bench_recording(c: &mut Criterion) { let payload = include_bytes!("../tests/fixtures/rrweb.json"); - // Compress the payload to mimic real-world behavior. The replay processor can also handle - // uncompressed payloads, but those happen infrequently. - let mut compressed = Vec::new(); - let mut encoder = ZlibEncoder::new(payload.as_slice(), Compression::default()); - encoder.read_to_end(&mut compressed).unwrap(); - - let mut scrubbing_config = DataScrubbingConfig::default(); - scrubbing_config.scrub_data = true; - scrubbing_config.scrub_defaults = true; - scrubbing_config.scrub_ip_addresses = true; - let pii_config = scrubbing_config.pii_config_uncached().unwrap().unwrap(); - - let mut scrubber = ReplayScrubber::new(usize::MAX, Some(&pii_config), None); - - c.bench_with_input(BenchmarkId::new("rrweb", 1), &compressed, |b, &_| { - b.iter(|| { - let mut buf = Vec::new(); - scrubber.transcode_replay(&compressed, &mut buf).ok(); - buf - }); + c.bench_with_input(BenchmarkId::new("rrweb", 1), &payload, |b, &_| { + b.iter(|| _deserialize_event(payload)); }); } diff --git a/relay-replays/src/lib.rs b/relay-replays/src/lib.rs index 7f3fbf258ad..9a606f4919e 100644 --- a/relay-replays/src/lib.rs +++ b/relay-replays/src/lib.rs @@ -1,2 +1 @@ pub mod recording; -mod transform; diff --git a/relay-replays/src/recording.rs b/relay-replays/src/recording.rs deleted file mode 100644 index 1a53361a402..00000000000 --- a/relay-replays/src/recording.rs +++ /dev/null @@ -1,382 +0,0 @@ -use std::borrow::Cow; -use std::fmt; -use std::io::{BufReader, Read}; - -use flate2::{bufread::ZlibDecoder, write::ZlibEncoder, Compression}; -use once_cell::sync::Lazy; - -use relay_general::pii::{PiiConfig, PiiProcessor}; -use relay_general::processor::{FieldAttrs, Pii, ProcessingState, Processor, ValueType}; -use relay_general::types::Meta; - -use crate::transform::{self, Transform}; - -#[derive(Debug)] -pub enum ParseRecordingError { - Json(serde_json::Error), - Message(&'static str), -} - -impl fmt::Display for ParseRecordingError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ParseRecordingError::Json(serde_error) => write!(f, "{serde_error}"), - ParseRecordingError::Message(message) => write!(f, "{message}"), - } - } -} - -impl std::error::Error for ParseRecordingError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - ParseRecordingError::Json(e) => Some(e), - ParseRecordingError::Message(_) => None, - } - } -} - -impl From for ParseRecordingError { - fn from(err: serde_json::Error) -> Self { - ParseRecordingError::Json(err) - } -} - -static STRING_STATE: Lazy = Lazy::new(|| { - ProcessingState::root().enter_static( - "", - Some(Cow::Owned(FieldAttrs::new().pii(Pii::True))), - Some(ValueType::String), - ) -}); - -/// A utility that performs data scrubbing on Replay payloads. -pub struct ReplayScrubber<'a> { - limit: usize, - processor1: Option>, - processor2: Option>, -} - -impl<'a> ReplayScrubber<'a> { - /// Creates a new `ReplayScrubber` from PII configs. - /// - /// `limit` controls the maximum size in bytes during decompression. This function returns an - /// `Err` if decompressed contents exceed the limit. The two optional configs to be passed here - /// are from data scrubbing settings and from the dedicated PII config. - pub fn new( - limit: usize, - config1: Option<&'a PiiConfig>, - config2: Option<&'a PiiConfig>, - ) -> Self { - Self { - limit, - processor1: config1.map(|c| PiiProcessor::new(c.compiled())), - processor2: config2.map(|c| PiiProcessor::new(c.compiled())), - } - } - - /// Returns `true` if both configs are empty and no scrubbing would occur. - pub fn is_empty(&self) -> bool { - self.processor1.is_none() && self.processor2.is_none() - } - - fn scrub_replay(&mut self, read: R, write: W) -> Result<(), ParseRecordingError> - where - R: std::io::Read, - W: std::io::Write, - { - let mut deserializer = serde_json::Deserializer::from_reader(read); - let mut serializer = serde_json::Serializer::new(write); - - let transformer = transform::Deserializer::new(&mut deserializer, self); - serde_transcode::transcode(transformer, &mut serializer)?; - - Ok(()) - } - - #[doc(hidden)] // Public for benchmarks. - pub fn transcode_replay( - &mut self, - body: &[u8], - output: &mut Vec, - ) -> Result<(), ParseRecordingError> { - let encoder = ZlibEncoder::new(output, Compression::default()); - - if body.first() == Some(&b'[') { - self.scrub_replay(body, encoder) - } else { - let decoder = ZlibDecoder::new(body).take(self.limit as u64); - self.scrub_replay(BufReader::new(decoder), encoder) - } - } - - /// Parses compressed replay recording payloads and applies data scrubbers. - /// - /// To avoid redundant parsing, check [`is_empty`](Self::is_empty) first. - pub fn process_recording(&mut self, bytes: &[u8]) -> Result, ParseRecordingError> { - // Check for null byte condition. - if bytes.is_empty() { - return Err(ParseRecordingError::Message("no data found")); - } - - let mut split = bytes.splitn(2, |b| b == &b'\n'); - let header = split - .next() - .ok_or(ParseRecordingError::Message("no headers found"))?; - - let body = match split.next() { - Some(b"") | None => return Err(ParseRecordingError::Message("no body found")), - Some(body) => body, - }; - - let mut output = header.to_owned(); - output.push(b'\n'); - // Data scrubbing usually does not change the size of the output by much. We can preallocate - // enough space for the scrubbed output to avoid resizing the output buffer serveral times. - // Benchmarks have NOT shown a big difference, however. - output.reserve(body.len()); - self.transcode_replay(body, &mut output)?; - - Ok(output) - } -} - -impl Transform for &'_ mut ReplayScrubber<'_> { - fn transform_str<'a>(&mut self, v: &'a str) -> Cow<'a, str> { - self.transform_string(v.to_owned()) - } - - fn transform_string(&mut self, mut value: String) -> Cow<'static, str> { - if let Some(ref mut processor) = self.processor1 { - if processor - .process_string(&mut value, &mut Meta::default(), &STRING_STATE) - .is_err() - { - return Cow::Borrowed(""); - } - } - - if let Some(ref mut processor) = self.processor2 { - if processor - .process_string(&mut value, &mut Meta::default(), &STRING_STATE) - .is_err() - { - return Cow::Borrowed(""); - } - } - - Cow::Owned(value) - } -} - -#[cfg(test)] -mod tests { - // End to end test coverage. - - use relay_general::pii::{DataScrubbingConfig, PiiConfig}; - - use super::ReplayScrubber; - - fn default_pii_config() -> PiiConfig { - let mut scrubbing_config = DataScrubbingConfig::default(); - scrubbing_config.scrub_data = true; - scrubbing_config.scrub_defaults = true; - scrubbing_config.scrub_ip_addresses = true; - scrubbing_config.pii_config_uncached().unwrap().unwrap() - } - - fn scrubber(config: &PiiConfig) -> ReplayScrubber { - ReplayScrubber::new(usize::MAX, Some(config), None) - } - - #[test] - fn test_process_recording_end_to_end() { - // Valid compressed rrweb payload. Contains a 16 byte header followed by a new line - // character and concludes with a gzipped rrweb payload. - let payload: &[u8] = &[ - 123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 120, - 156, 149, 144, 91, 106, 196, 32, 20, 64, 247, 114, 191, 237, 160, 241, 145, 234, 38, - 102, 1, 195, 124, 152, 104, 6, 33, 169, 193, 40, 52, 4, 247, 94, 91, 103, 40, 20, 108, - 59, 191, 247, 30, 207, 225, 122, 57, 32, 238, 171, 5, 69, 17, 24, 29, 53, 168, 3, 54, - 159, 194, 88, 70, 4, 193, 234, 55, 23, 157, 127, 219, 64, 93, 14, 120, 7, 37, 100, 1, - 119, 80, 29, 102, 8, 156, 1, 213, 11, 4, 209, 45, 246, 60, 77, 155, 141, 160, 94, 232, - 43, 206, 232, 206, 118, 127, 176, 132, 177, 7, 203, 42, 75, 36, 175, 44, 231, 63, 88, - 217, 229, 107, 174, 179, 45, 234, 101, 45, 172, 232, 49, 163, 84, 22, 191, 232, 63, 61, - 207, 93, 130, 229, 189, 216, 53, 138, 84, 182, 139, 178, 199, 191, 22, 139, 179, 238, - 196, 227, 244, 134, 137, 240, 158, 60, 101, 34, 255, 18, 241, 6, 116, 42, 212, 119, 35, - 234, 27, 40, 24, 130, 213, 102, 12, 105, 25, 160, 252, 147, 222, 103, 175, 205, 215, - 182, 45, 168, 17, 48, 118, 210, 105, 142, 229, 217, 168, 163, 189, 249, 80, 254, 19, - 146, 59, 13, 115, 10, 144, 115, 190, 126, 0, 2, 68, 180, 16, - ]; - - let config = default_pii_config(); - let result = scrubber(&config).process_recording(payload); - assert!(!result.unwrap().is_empty()); - } - - #[test] - fn test_process_recording_no_body_data() { - // Empty bodies can not be decompressed and fail. - let payload: &[u8] = &[ - 123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, - ]; - - let config = default_pii_config(); - let result = scrubber(&config).process_recording(payload); - assert!(matches!( - result.unwrap_err(), - super::ParseRecordingError::Message("no body found"), - )); - } - - #[test] - fn test_process_recording_bad_body_data() { - // Invalid gzip body contents. Can not deflate. - let payload: &[u8] = &[ - 123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 22, - ]; - - let config = default_pii_config(); - let result = scrubber(&config).process_recording(payload); - assert!(matches!( - result.unwrap_err(), - super::ParseRecordingError::Json(_), - )); - } - - #[test] - fn test_process_recording_no_headers() { - // No header delimiter. Entire payload is consumed as headers. The empty body fails. - let payload: &[u8] = &[ - 123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, - ]; - - let config = default_pii_config(); - let result = scrubber(&config).process_recording(payload); - assert!(matches!( - result.unwrap_err(), - super::ParseRecordingError::Message("no body found"), - )); - } - - #[test] - fn test_process_recording_no_contents() { - // Empty payload can not be decompressed. Header check never fails. - let payload: &[u8] = &[]; - - let config = default_pii_config(); - let result = scrubber(&config).process_recording(payload); - assert!(matches!( - result.unwrap_err(), - super::ParseRecordingError::Message("no data found"), - )); - } - - // RRWeb Payload Coverage - - #[test] - fn test_pii_credit_card_removal() { - let payload = include_bytes!("../tests/fixtures/rrweb-pii.json"); - - let mut transcoded = Vec::new(); - let config = default_pii_config(); - scrubber(&config) - .scrub_replay(payload.as_slice(), &mut transcoded) - .unwrap(); - - let parsed = std::str::from_utf8(&transcoded).unwrap(); - assert!(parsed.contains(r#"{"type":3,"textContent":"[Filtered]","id":284}"#)); - } - - #[test] - fn test_scrub_pii_navigation() { - let payload = include_bytes!("../tests/fixtures/rrweb-performance-navigation.json"); - - let mut transcoded = Vec::new(); - let config = default_pii_config(); - scrubber(&config) - .scrub_replay(payload.as_slice(), &mut transcoded) - .unwrap(); - - let parsed = std::str::from_utf8(&transcoded).unwrap(); - assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]")); - } - - #[test] - fn test_scrub_pii_resource() { - let payload = include_bytes!("../tests/fixtures/rrweb-performance-resource.json"); - - let mut transcoded = Vec::new(); - let config = default_pii_config(); - scrubber(&config) - .scrub_replay(payload.as_slice(), &mut transcoded) - .unwrap(); - - let parsed = std::str::from_utf8(&transcoded).unwrap(); - assert!(parsed.contains("https://sentry.io?credit-card=[Filtered]")); - } - - #[test] - fn test_pii_ip_address_removal() { - let payload = include_bytes!("../tests/fixtures/rrweb-pii-ip-address.json"); - - let mut transcoded = Vec::new(); - let config = default_pii_config(); - scrubber(&config) - .scrub_replay(payload.as_slice(), &mut transcoded) - .unwrap(); - - let parsed = std::str::from_utf8(&transcoded).unwrap(); - assert!(parsed.contains("\"value\":\"[ip]\"")); // Assert texts were mutated. - assert!(parsed.contains("\"textContent\":\"[ip]\"")) // Assert text node was mutated. - } - - // Event Parsing and Scrubbing. - - #[test] - fn test_scrub_pii_full_snapshot_event() { - let payload = include_bytes!("../tests/fixtures/rrweb-event-2.json"); - - let mut transcoded = Vec::new(); - let config = default_pii_config(); - scrubber(&config) - .scrub_replay(payload.as_slice(), &mut transcoded) - .unwrap(); - - let scrubbed_result = std::str::from_utf8(&transcoded).unwrap(); - // NOTE: The normalization below was removed - // assert!(scrubbed_result.contains("\"attributes\":{\"src\":\"#\"}")); - assert!(scrubbed_result.contains("\"textContent\":\"my ssn is [Filtered]\"")); - } - - #[test] - fn test_scrub_pii_incremental_snapshot_event() { - let payload = include_bytes!("../tests/fixtures/rrweb-event-3.json"); - - let mut transcoded = Vec::new(); - let config = default_pii_config(); - scrubber(&config) - .scrub_replay(payload.as_slice(), &mut transcoded) - .unwrap(); - - let scrubbed_result = std::str::from_utf8(&transcoded).unwrap(); - assert!(scrubbed_result.contains("\"textContent\":\"[Filtered]\"")); - assert!(scrubbed_result.contains("\"value\":\"[Filtered]\"")); - } - - #[test] - fn test_scrub_pii_custom_event() { - let payload = include_bytes!("../tests/fixtures/rrweb-event-5.json"); - - let mut transcoded = Vec::new(); - let config = default_pii_config(); - scrubber(&config) - .scrub_replay(payload.as_slice(), &mut transcoded) - .unwrap(); - - let scrubbed_result = std::str::from_utf8(&transcoded).unwrap(); - assert!(scrubbed_result.contains("\"description\":\"[Filtered]\"")); - assert!(scrubbed_result.contains("\"description\":\"https://sentry.io?ip-address=[ip]\"")); - // NOTE: default scrubbers do not remove email address - // assert!(scrubbed_result.contains("\"message\":\"[email]\"")); - } -} diff --git a/relay-replays/src/recording/mod.rs b/relay-replays/src/recording/mod.rs new file mode 100644 index 00000000000..1868b6975e3 --- /dev/null +++ b/relay-replays/src/recording/mod.rs @@ -0,0 +1,723 @@ +use std::borrow::Cow; +use std::collections::{BTreeMap, HashMap}; +use std::fmt::Display; +use std::io::{Read, Write}; + +use relay_general::pii::{PiiConfig, PiiProcessor}; +use relay_general::processor::{ + FieldAttrs, Pii, ProcessingState, Processor, SelectorSpec, ValueType, +}; +use relay_general::types::{Meta, ProcessingAction}; + +use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +mod serialization; + +/// Parses compressed replay recording payloads and applies data scrubbers. +/// +/// `limit` controls the maximum size in bytes during decompression. This function returns an `Err` +/// if decompressed contents exceed the limit. +pub fn process_recording(bytes: &[u8], limit: usize) -> Result, RecordingParseError> { + // Check for null byte condition. + if bytes.is_empty() { + return Err(RecordingParseError::Message("no data found")); + } + + let mut split = bytes.splitn(2, |b| b == &b'\n'); + let header = split + .next() + .ok_or(RecordingParseError::Message("no headers found"))?; + + let body = match split.next() { + Some(b"") | None => return Err(RecordingParseError::Message("no body found")), + Some(body) => body, + }; + + let mut events = deserialize_compressed(body, limit)?; + strip_pii(&mut events).map_err(RecordingParseError::ProcessingAction)?; + let out_bytes = serialize_compressed(events)?; + Ok([header.into(), vec![b'\n'], out_bytes].concat()) +} + +fn deserialize_compressed( + zipped_input: &[u8], + limit: usize, +) -> Result, RecordingParseError> { + let decoder = ZlibDecoder::new(zipped_input); + + let mut buffer = Vec::new(); + decoder.take(limit as u64).read_to_end(&mut buffer)?; + + Ok(serde_json::from_slice(&buffer)?) +} + +fn serialize_compressed(rrweb: Vec) -> Result, RecordingParseError> { + let buffer = serde_json::to_vec(&rrweb)?; + + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&buffer)?; + let result = encoder.finish()?; + Ok(result) +} + +fn strip_pii(events: &mut Vec) -> Result<(), ProcessingAction> { + let mut pii_config = PiiConfig::default(); + pii_config.applications = + BTreeMap::from([(SelectorSpec::And(vec![]), vec!["@common".to_string()])]); + + let pii_processor = PiiProcessor::new(pii_config.compiled()); + let mut processor = RecordingProcessor::new(pii_processor); + processor.mask_pii(events)?; + + Ok(()) +} + +// Error + +#[derive(Debug)] +pub enum RecordingParseError { + Json(serde_json::Error), + Compression(std::io::Error), + Message(&'static str), + ProcessingAction(ProcessingAction), +} + +impl Display for RecordingParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RecordingParseError::Json(serde_error) => write!(f, "{serde_error}"), + RecordingParseError::Compression(io_error) => write!(f, "{io_error}"), + RecordingParseError::Message(message) => write!(f, "{message}"), + RecordingParseError::ProcessingAction(action) => write!(f, "{action}"), + } + } +} + +impl std::error::Error for RecordingParseError {} + +impl From for RecordingParseError { + fn from(err: serde_json::Error) -> Self { + RecordingParseError::Json(err) + } +} + +impl From for RecordingParseError { + fn from(err: std::io::Error) -> Self { + RecordingParseError::Compression(err) + } +} + +// Recording Processor + +struct RecordingProcessor<'a> { + pii_processor: PiiProcessor<'a>, +} + +impl RecordingProcessor<'_> { + fn new(pii_processor: PiiProcessor) -> RecordingProcessor { + RecordingProcessor { pii_processor } + } + + fn mask_pii(&mut self, events: &mut Vec) -> Result<(), ProcessingAction> { + for event in events { + match event { + Event::T2(variant) => self.recurse_snapshot_node(&mut variant.data.node)?, + Event::T3(variant) => self.recurse_incremental_source(&mut variant.data)?, + Event::T5(variant) => self.recurse_custom_event(variant)?, + _ => {} + }; + } + Ok(()) + } + + fn recurse_incremental_source( + &mut self, + variant: &mut IncrementalSourceDataVariant, + ) -> Result<(), ProcessingAction> { + match variant { + IncrementalSourceDataVariant::Mutation(mutation) => { + for text in &mut mutation.texts { + self.strip_pii(&mut text.value)? + } + for addition in &mut mutation.adds { + self.recurse_snapshot_node(&mut addition.node)? + } + } + IncrementalSourceDataVariant::Input(input) => self.strip_pii(&mut input.text)?, + _ => {} + } + + Ok(()) + } + + fn recurse_snapshot_node(&mut self, node: &mut Node) -> Result<(), ProcessingAction> { + match &mut node.variant { + NodeVariant::T0(document) => { + for node in &mut document.child_nodes { + self.recurse_snapshot_node(node)? + } + } + NodeVariant::T2(element) => self.recurse_element(element)?, + NodeVariant::T3(text) | NodeVariant::T4(text) | NodeVariant::T5(text) => { + self.strip_pii(&mut text.text_content)?; + } + + _ => {} + } + + Ok(()) + } + + fn recurse_custom_event(&mut self, event: &mut CustomEvent) -> Result<(), ProcessingAction> { + match &mut event.data { + CustomEventDataVariant::Breadcrumb(breadcrumb) => match &mut breadcrumb.payload.message + { + Some(message) => self.strip_pii(message)?, + None => {} + }, + CustomEventDataVariant::PerformanceSpan(span) => { + self.strip_pii(&mut span.payload.description)?; + } + } + + Ok(()) + } + + fn recurse_element(&mut self, element: &mut ElementNode) -> Result<(), ProcessingAction> { + match element.tag_name.as_str() { + "script" | "style" => {} + "img" | "source" => { + let attrs = &mut element.attributes; + attrs.insert("src".to_string(), Value::String("#".to_string())); + self.recurse_element_children(element)? + } + _ => self.recurse_element_children(element)?, + } + + Ok(()) + } + + fn recurse_element_children( + &mut self, + element: &mut ElementNode, + ) -> Result<(), ProcessingAction> { + for node in &mut element.child_nodes { + self.recurse_snapshot_node(node)? + } + + Ok(()) + } + + fn strip_pii(&mut self, value: &mut String) -> Result<(), ProcessingAction> { + let field_attrs = Cow::Owned(FieldAttrs::new().pii(Pii::True)); + let processing_state = + ProcessingState::root().enter_static("", Some(field_attrs), Some(ValueType::String)); + self.pii_processor + .process_string(value, &mut Meta::default(), &processing_state)?; + + Ok(()) + } +} + +/// Event Type Parser +/// +/// Events have an internally tagged variant on their "type" field. The type must be one of seven +/// values. There are no default types for this variation. Because the "type" field's values are +/// integers we must define custom deserailization behavior. +/// +/// -> DOMCONTENTLOADED = 0 +/// -> LOAD = 1 +/// -> FULLSNAPSHOT = 2 +/// -> INCREMENTALSNAPSHOT = 3 +/// -> META = 4 +/// -> CUSTOM = 5 +/// -> PLUGIN = 6 + +#[derive(Debug)] +enum Event { + T0(Value), // 0: DOMContentLoadedEvent, + T1(Value), // 1: LoadEvent, + T2(Box), + T3(Box), + T4(Box), + T5(Box), + T6(Value), // 6: PluginEvent, +} + +#[derive(Debug, Serialize, Deserialize)] +struct FullSnapshotEvent { + timestamp: u64, + data: FullSnapshotEventData, +} + +#[derive(Debug, Serialize, Deserialize)] +struct FullSnapshotEventData { + node: Node, + #[serde(rename = "initialOffset")] + initial_offset: Value, +} + +#[derive(Debug, Serialize, Deserialize)] +struct IncrementalSnapshotEvent { + timestamp: u64, + data: IncrementalSourceDataVariant, +} + +#[derive(Debug, Serialize, Deserialize)] +struct MetaEvent { + timestamp: u64, + data: Value, +} + +#[derive(Debug, Serialize, Deserialize)] +struct CustomEvent { + timestamp: f64, + data: CustomEventDataVariant, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +enum CustomEventDataVariant { + #[serde(rename = "breadcrumb")] + Breadcrumb(Box), + #[serde(rename = "performanceSpan")] + PerformanceSpan(Box), +} + +#[derive(Debug, Serialize, Deserialize)] +struct Breadcrumb { + tag: String, + payload: BreadcrumbPayload, +} + +#[derive(Debug, Serialize, Deserialize)] +struct BreadcrumbPayload { + #[serde(rename = "type")] + ty: String, + timestamp: f64, + #[serde(skip_serializing_if = "Option::is_none")] + category: Option, + #[serde(skip_serializing_if = "Option::is_none")] + level: Option, + #[serde(skip_serializing_if = "Option::is_none")] + message: Option, + #[serde(skip_serializing_if = "Option::is_none")] + data: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct PerformanceSpan { + tag: String, + payload: PerformanceSpanPayload, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PerformanceSpanPayload { + op: String, + description: String, + start_timestamp: f64, + end_timestamp: f64, + #[serde(skip_serializing_if = "Option::is_none")] + data: Option, +} + +/// Node Type Parser +/// +/// Nodes have an internally tagged variant on their "type" field. The type must be one of six +/// values. There are no default types for this variation. Because the "type" field's values are +/// integers we must define custom deserailization behavior. +/// +/// -> DOCUMENT = 0 +/// -> DOCUMENTTYPE = 1 +/// -> ELEMENT = 2 +/// -> TEXT = 3 +/// -> CDATA = 4 +/// -> COMMENT = 5 + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +struct Node { + #[serde(skip_serializing_if = "Option::is_none")] + root_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + is_shadow_host: Option, + #[serde(skip_serializing_if = "Option::is_none")] + is_shadow: Option, + #[serde(skip_serializing_if = "Option::is_none")] + compat_mode: Option, + #[serde(flatten)] + variant: NodeVariant, +} + +#[derive(Debug)] + +enum NodeVariant { + T0(Box), + T1(Box), + T2(Box), + T3(Box), // text + T4(Box), // cdata + T5(Box), // comment +} + +#[derive(Debug, Serialize, Deserialize)] +struct DocumentNode { + id: i32, + #[serde(rename = "childNodes")] + child_nodes: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct DocumentTypeNode { + id: Value, + public_id: Value, + system_id: Value, + name: Value, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ElementNode { + id: Value, + attributes: HashMap, + tag_name: String, + child_nodes: Vec, + #[serde(rename = "isSVG", skip_serializing_if = "Option::is_none")] + is_svg: Option, + #[serde(skip_serializing_if = "Option::is_none")] + need_block: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct TextNode { + id: Value, + text_content: String, + #[serde(skip_serializing_if = "Option::is_none")] + is_style: Option, +} + +/// Incremental Source Parser +/// +/// Sources have an internally tagged variant on their "source" field. The type must be one of +/// fourteen values. Because the "type" field's values are integers we must define custom +/// deserailization behavior. +/// +/// -> MUTATION = 0 +/// -> MOUSEMOVE = 1 +/// -> MOUSEINTERACTION = 2 +/// -> SCROLL = 3 +/// -> VIEWPORTRESIZE = 4 +/// -> INPUT = 5 +/// -> TOUCHMOVE = 6 +/// -> MEDIAINTERACTION = 7 +/// -> STYLESHEETRULE = 8 +/// -> CANVASMUTATION = 9 +/// -> FONT = 10 +/// -> LOG = 11 +/// -> DRAG = 12 +/// -> STYLEDECLARATION = 13 + +#[derive(Debug)] +enum IncrementalSourceDataVariant { + Mutation(Box), + Input(Box), + Default(Box), +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct InputIncrementalSourceData { + id: i32, + text: String, + is_checked: Value, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + user_triggered: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct MutationIncrementalSourceData { + texts: Vec, + attributes: Vec, + removes: Vec, + adds: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + is_attach_iframe: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct Text { + id: i32, + value: String, +} + +#[derive(Debug)] +struct DefaultIncrementalSourceData { + pub source: u8, + pub value: Value, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct MutationAdditionIncrementalSourceData { + parent_id: Value, + next_id: Value, + node: Node, +} + +#[cfg(test)] +mod tests { + use assert_json_diff::assert_json_eq; + use serde_json::{Error, Value}; + + use crate::recording::{self, Event}; + + fn loads(bytes: &[u8]) -> Result, Error> { + serde_json::from_slice(bytes) + } + + // End to end test coverage. + + #[test] + fn test_process_recording_end_to_end() { + // Valid compressed rrweb payload. Contains a 16 byte header followed by a new line + // character and concludes with a gzipped rrweb payload. + let payload: &[u8] = &[ + 123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 120, + 156, 149, 144, 91, 106, 196, 32, 20, 64, 247, 114, 191, 237, 160, 241, 145, 234, 38, + 102, 1, 195, 124, 152, 104, 6, 33, 169, 193, 40, 52, 4, 247, 94, 91, 103, 40, 20, 108, + 59, 191, 247, 30, 207, 225, 122, 57, 32, 238, 171, 5, 69, 17, 24, 29, 53, 168, 3, 54, + 159, 194, 88, 70, 4, 193, 234, 55, 23, 157, 127, 219, 64, 93, 14, 120, 7, 37, 100, 1, + 119, 80, 29, 102, 8, 156, 1, 213, 11, 4, 209, 45, 246, 60, 77, 155, 141, 160, 94, 232, + 43, 206, 232, 206, 118, 127, 176, 132, 177, 7, 203, 42, 75, 36, 175, 44, 231, 63, 88, + 217, 229, 107, 174, 179, 45, 234, 101, 45, 172, 232, 49, 163, 84, 22, 191, 232, 63, 61, + 207, 93, 130, 229, 189, 216, 53, 138, 84, 182, 139, 178, 199, 191, 22, 139, 179, 238, + 196, 227, 244, 134, 137, 240, 158, 60, 101, 34, 255, 18, 241, 6, 116, 42, 212, 119, 35, + 234, 27, 40, 24, 130, 213, 102, 12, 105, 25, 160, 252, 147, 222, 103, 175, 205, 215, + 182, 45, 168, 17, 48, 118, 210, 105, 142, 229, 217, 168, 163, 189, 249, 80, 254, 19, + 146, 59, 13, 115, 10, 144, 115, 190, 126, 0, 2, 68, 180, 16, + ]; + + let result = recording::process_recording(payload, 1000); + assert!(!result.unwrap().is_empty()); + } + + #[test] + fn test_process_recording_no_body_data() { + // Empty bodies can not be decompressed and fail. + let payload: &[u8] = &[ + 123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, + ]; + + let result = recording::process_recording(payload, 1000); + assert!(matches!( + result.unwrap_err(), + recording::RecordingParseError::Message("no body found"), + )); + } + + #[test] + fn test_process_recording_bad_body_data() { + // Invalid gzip body contents. Can not deflate. + let payload: &[u8] = &[ + 123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 22, + ]; + + let result = recording::process_recording(payload, 1000); + assert!(matches!( + result.unwrap_err(), + recording::RecordingParseError::Compression(_), + )); + } + + #[test] + fn test_process_recording_no_headers() { + // No header delimiter. Entire payload is consumed as headers. The empty body fails. + let payload: &[u8] = &[ + 123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, + ]; + + let result = recording::process_recording(payload, 1000); + assert!(matches!( + result.unwrap_err(), + recording::RecordingParseError::Message("no body found"), + )); + } + + #[test] + fn test_process_recording_no_contents() { + // Empty payload can not be decompressed. Header check never fails. + let payload: &[u8] = &[]; + + let result = recording::process_recording(payload, 1000); + assert!(matches!( + result.unwrap_err(), + recording::RecordingParseError::Message("no data found"), + )); + } + + // RRWeb Payload Coverage + + #[test] + fn test_pii_credit_card_removal() { + let payload = include_bytes!("../../tests/fixtures/rrweb-pii.json"); + let mut events: Vec = serde_json::from_slice(payload).unwrap(); + + recording::strip_pii(&mut events).unwrap(); + + let aa = events.pop().unwrap(); + if let recording::Event::T3(bb) = aa { + if let recording::IncrementalSourceDataVariant::Mutation(mut cc) = bb.data { + let dd = cc.adds.pop().unwrap(); + if let recording::NodeVariant::T2(mut ee) = dd.node.variant { + let ff = ee.child_nodes.pop().unwrap(); + if let recording::NodeVariant::T3(gg) = ff.variant { + assert_eq!(gg.text_content, "[creditcard]"); + return; + } + } + } + } + unreachable!(); + } + + #[test] + fn test_scrub_pii_navigation() { + let payload = include_bytes!("../../tests/fixtures/rrweb-performance-navigation.json"); + let mut events: Vec = serde_json::from_slice(payload).unwrap(); + + recording::strip_pii(&mut events).unwrap(); + + let event = events.pop().unwrap(); + if let recording::Event::T5(custom) = &event { + if let recording::CustomEventDataVariant::PerformanceSpan(span) = &custom.data { + assert_eq!( + &span.payload.description, + "https://sentry.io?credit-card=[creditcard]" + ); + return; + } + } + + unreachable!(); + } + + #[test] + fn test_scrub_pii_resource() { + let payload = include_bytes!("../../tests/fixtures/rrweb-performance-resource.json"); + let mut events: Vec = serde_json::from_slice(payload).unwrap(); + + recording::strip_pii(&mut events).unwrap(); + + let event = events.pop().unwrap(); + if let recording::Event::T5(custom) = &event { + if let recording::CustomEventDataVariant::PerformanceSpan(span) = &custom.data { + assert_eq!( + &span.payload.description, + "https://sentry.io?credit-card=[creditcard]" + ); + return; + } + } + + unreachable!(); + } + + #[test] + fn test_pii_ip_address_removal() { + let payload = include_bytes!("../../tests/fixtures/rrweb-pii-ip-address.json"); + let mut events: Vec = serde_json::from_slice(payload).unwrap(); + + recording::strip_pii(&mut events).unwrap(); + + let parsed = serde_json::to_string(&events).unwrap(); + assert!(parsed.contains("\"value\":\"[ip]\"")); // Assert texts were mutated. + assert!(parsed.contains("\"textContent\":\"[ip]\"")) // Assert text node was mutated. + } + + #[test] + fn test_rrweb_snapshot_parsing() { + let payload = include_bytes!("../../tests/fixtures/rrweb.json"); + + let input_parsed = loads(payload).unwrap(); + let input_raw: Value = serde_json::from_slice(payload).unwrap(); + assert_json_eq!(input_parsed, input_raw) + } + + #[test] + fn test_rrweb_incremental_source_parsing() { + let payload = include_bytes!("../../tests/fixtures/rrweb-diff.json"); + + let input_parsed = loads(payload).unwrap(); + let input_raw: Value = serde_json::from_slice(payload).unwrap(); + assert_json_eq!(input_parsed, input_raw) + } + + // Node coverage + #[test] + fn test_rrweb_node_2_parsing() { + let payload = include_bytes!("../../tests/fixtures/rrweb-node-2.json"); + + let input_parsed: recording::NodeVariant = serde_json::from_slice(payload).unwrap(); + let input_raw: Value = serde_json::from_slice(payload).unwrap(); + assert_json_eq!(input_parsed, input_raw) + } + + #[test] + fn test_rrweb_node_2_style_parsing() { + let payload = include_bytes!("../../tests/fixtures/rrweb-node-2-style.json"); + + let input_parsed: recording::NodeVariant = serde_json::from_slice(payload).unwrap(); + let input_raw: Value = serde_json::from_slice(payload).unwrap(); + serde_json::to_string_pretty(&input_parsed).unwrap(); + assert_json_eq!(input_parsed, input_raw); + } + + // Event Parsing and Scrubbing. + + #[test] + fn test_scrub_pii_full_snapshot_event() { + let payload = include_bytes!("../../tests/fixtures/rrweb-event-2.json"); + let mut events: Vec = serde_json::from_slice(payload).unwrap(); + recording::strip_pii(&mut events).unwrap(); + + let scrubbed_result = serde_json::to_string(&events).unwrap(); + assert!(scrubbed_result.contains("\"attributes\":{\"src\":\"#\"}")); + assert!(scrubbed_result.contains("\"textContent\":\"my ssn is ***********\"")); + } + + #[test] + fn test_scrub_pii_incremental_snapshot_event() { + let payload = include_bytes!("../../tests/fixtures/rrweb-event-3.json"); + let mut events: Vec = serde_json::from_slice(payload).unwrap(); + recording::strip_pii(&mut events).unwrap(); + + let scrubbed_result = serde_json::to_string(&events).unwrap(); + assert!(scrubbed_result.contains("\"textContent\":\"[creditcard]\"")); + assert!(scrubbed_result.contains("\"value\":\"***********\"")); + } + + #[test] + fn test_scrub_pii_custom_event() { + let payload = include_bytes!("../../tests/fixtures/rrweb-event-5.json"); + let mut events: Vec = serde_json::from_slice(payload).unwrap(); + recording::strip_pii(&mut events).unwrap(); + + let scrubbed_result = serde_json::to_string(&events).unwrap(); + assert!(scrubbed_result.contains("\"description\":\"[creditcard]\"")); + assert!(scrubbed_result.contains("\"description\":\"https://sentry.io?ip-address=[ip]\"")); + assert!(scrubbed_result.contains("\"message\":\"[email]\"")); + } +} + +#[doc(hidden)] +/// Only used in benchmarks. +pub fn _deserialize_event(payload: &[u8]) { + let _: Vec = serde_json::from_slice(payload).unwrap(); +} diff --git a/relay-replays/src/recording/serialization.rs b/relay-replays/src/recording/serialization.rs new file mode 100644 index 00000000000..b42d9559076 --- /dev/null +++ b/relay-replays/src/recording/serialization.rs @@ -0,0 +1,246 @@ +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; + +use crate::recording::*; + +/// Implementation tweaked from serde's `derive(Deserialize)` for internally tagged enums, +/// in order to work with integer tags. +impl<'de> Deserialize<'de> for Event { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let tagged = match Deserializer::deserialize_any( + d, + // NOTE: Use of this private API is discouraged by serde, but we need it for + // efficient deserialization of these large, recursive structures into + // internally tagged enums with integer tags. + // Ideally, we would write our own `derive` for this, or contribute to serde + // to support integer tags out of the box. + serde::__private::de::TaggedContentVisitor::::new( + "type", + "internally tagged enum Event", + ), + ) { + Ok(val) => val, + Err(err) => return Err(err), + }; + let content_deserializer = + serde::__private::de::ContentDeserializer::::new(tagged.content); + match tagged.tag { + 0 => Value::deserialize(content_deserializer).map(Event::T0), + 1 => Value::deserialize(content_deserializer).map(Event::T1), + 2 => Box::::deserialize(content_deserializer).map(Event::T2), + 3 => Box::::deserialize(content_deserializer).map(Event::T3), + 4 => Box::::deserialize(content_deserializer).map(Event::T4), + 5 => Box::::deserialize(content_deserializer).map(Event::T5), + 6 => Value::deserialize(content_deserializer).map(Event::T6), + value => Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Unsigned(value as u64), + &"type id 0 <= i < 7", + )), + } + } +} + +/// Helper for [`Event`] serialization. +#[derive(Serialize)] +#[serde(untagged)] +enum InnerEvent<'a> { + T0(&'a Value), // 0: DOMContentLoadedEvent, + T1(&'a Value), // 1: LoadEvent, + T2(&'a FullSnapshotEvent), + T3(&'a IncrementalSnapshotEvent), + T4(&'a MetaEvent), + T5(&'a CustomEvent), + T6(&'a Value), // 6: PluginEvent, +} + +/// Helper for [`Event`] serialization. +#[derive(Serialize)] +struct OuterEvent<'a> { + #[serde(rename = "type")] + ty: u8, + #[serde(flatten)] + inner: InnerEvent<'a>, +} + +impl<'a> OuterEvent<'a> { + fn new(ty: u8, inner: InnerEvent<'a>) -> Self { + Self { ty, inner } + } +} + +impl Serialize for Event { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + match self { + Event::T0(c) => OuterEvent::new(0, InnerEvent::T0(c)), + Event::T1(c) => OuterEvent::new(1, InnerEvent::T1(c)), + Event::T2(c) => OuterEvent::new(2, InnerEvent::T2(c)), + Event::T3(c) => OuterEvent::new(3, InnerEvent::T3(c)), + Event::T4(c) => OuterEvent::new(4, InnerEvent::T4(c)), + Event::T5(c) => OuterEvent::new(5, InnerEvent::T5(c)), + Event::T6(c) => OuterEvent::new(6, InnerEvent::T6(c)), + } + .serialize(s) + } +} + +/// Implementation tweaked from serde's `derive(Deserialize)` for internally tagged enums, +/// in order to work with integer tags. +impl<'de> Deserialize<'de> for NodeVariant { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let tagged = match Deserializer::deserialize_any( + d, + serde::__private::de::TaggedContentVisitor::::new( + "type", + "internally tagged enum NodeVariant", + ), + ) { + Ok(val) => val, + Err(err) => return Err(err), + }; + + let content_deserializer = + serde::__private::de::ContentDeserializer::::new(tagged.content); + match tagged.tag { + 0 => Box::::deserialize(content_deserializer).map(NodeVariant::T0), + 1 => Box::::deserialize(content_deserializer).map(NodeVariant::T1), + 2 => Box::::deserialize(content_deserializer).map(NodeVariant::T2), + 3 => Box::::deserialize(content_deserializer).map(NodeVariant::T3), + 4 => Box::::deserialize(content_deserializer).map(NodeVariant::T4), + 5 => Box::::deserialize(content_deserializer).map(NodeVariant::T5), + value => Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Unsigned(value as u64), + &"type id 0 <= i < 6", + )), + } + } +} + +/// Helper for [`NodeVariant`] serialization. +#[derive(Serialize)] +#[serde(untagged)] +enum InnerNodeVariant<'a> { + T0(&'a DocumentNode), + T1(&'a DocumentTypeNode), + T2(&'a ElementNode), + T3(&'a TextNode), // text + T4(&'a TextNode), // cdata + T5(&'a TextNode), // comment +} + +/// Helper for [`NodeVariant`] serialization. +#[derive(Serialize)] +struct OuterNodeVariant<'a> { + #[serde(rename = "type")] + ty: u8, + #[serde(flatten)] + inner: InnerNodeVariant<'a>, +} + +impl<'a> OuterNodeVariant<'a> { + fn new(ty: u8, inner: InnerNodeVariant<'a>) -> Self { + Self { ty, inner } + } +} + +impl Serialize for NodeVariant { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + match self { + NodeVariant::T0(c) => OuterNodeVariant::new(0, InnerNodeVariant::T0(c)), + NodeVariant::T1(c) => OuterNodeVariant::new(1, InnerNodeVariant::T1(c)), + NodeVariant::T2(c) => OuterNodeVariant::new(2, InnerNodeVariant::T2(c)), + NodeVariant::T3(c) => OuterNodeVariant::new(3, InnerNodeVariant::T3(c)), + NodeVariant::T4(c) => OuterNodeVariant::new(4, InnerNodeVariant::T4(c)), + NodeVariant::T5(c) => OuterNodeVariant::new(5, InnerNodeVariant::T5(c)), + } + .serialize(s) + } +} + +/// Implementation tweaked from serde's `derive(Deserialize)` for internally tagged enums, +/// in order to work with integer tags. +impl<'de> Deserialize<'de> for IncrementalSourceDataVariant { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let tagged = match Deserializer::deserialize_any( + d, + serde::__private::de::TaggedContentVisitor::::new( + "source", + "internally tagged enum IncrementalSourceDataVariant", + ), + ) { + Ok(val) => val, + Err(err) => return Err(err), + }; + let content_deserializer = + serde::__private::de::ContentDeserializer::::new(tagged.content); + match tagged.tag { + 0 => Box::::deserialize(content_deserializer) + .map(IncrementalSourceDataVariant::Mutation), + 5 => Box::::deserialize(content_deserializer) + .map(IncrementalSourceDataVariant::Input), + source => Value::deserialize(content_deserializer).map(|value| { + IncrementalSourceDataVariant::Default(Box::new(DefaultIncrementalSourceData { + source, + value, + })) + }), + } + } +} + +/// Helper for [`IncrementalSourceDataVariant`] serialization. +#[derive(Serialize)] +#[serde(untagged)] +enum InnerISDV<'a> { + Mutation(&'a MutationIncrementalSourceData), + Input(&'a InputIncrementalSourceData), + Default(&'a Value), +} + +/// Helper for [`IncrementalSourceDataVariant`] serialization. +#[derive(Serialize)] +struct OuterISDV<'a> { + source: u8, + #[serde(flatten)] + inner: InnerISDV<'a>, +} + +impl<'a> OuterISDV<'a> { + fn new(source: u8, inner: InnerISDV<'a>) -> Self { + Self { source, inner } + } +} + +impl Serialize for IncrementalSourceDataVariant { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + match self { + IncrementalSourceDataVariant::Mutation(m) => { + OuterISDV::new(0, InnerISDV::Mutation(m.as_ref())) + } + IncrementalSourceDataVariant::Input(i) => { + OuterISDV::new(5, InnerISDV::Input(i.as_ref())) + } + IncrementalSourceDataVariant::Default(v) => { + OuterISDV::new(v.source, InnerISDV::Default(&v.value)) + } + } + .serialize(s) + } +} diff --git a/relay-replays/src/transform.rs b/relay-replays/src/transform.rs deleted file mode 100644 index ba2d081f708..00000000000 --- a/relay-replays/src/transform.rs +++ /dev/null @@ -1,708 +0,0 @@ -//! A transforming `serde` Deserializer`. - -use std::borrow::Cow; -use std::fmt; - -use serde::de; - -/// A transform for deserialized values. -/// -/// This transformer defines callbacks that will be called by a [`Deserializer`] during -/// deserialization to map values inline. The default for every transform callback is the identity -/// function, which will not change the value. -/// -/// There is a default implementation for all functions with a matching signature, for example -/// `FnMut(&str) -> Cow`. -/// -/// # Strings and Bytes -/// -/// When implementing a transform for strings or bytes, **always implement both** the owned and -/// borrowed version: -/// -/// - `transform_str` and `transform_string` for strings -/// - `transform_bytes` and `transform_byte_buf` for bytes. -/// -/// # Numbers -/// -/// If the deserializer is used on a format that supports all numeric types, the default of each -/// transform function is the identity. To override this, all of `transform_i*` and `transform_u*` -/// have to be implemented. -/// -/// # Example -/// -/// ```ignore -/// struct StringDefault(&'static str); -/// -/// impl Transform for StringDefault { -/// fn transform_str<'a>(&mut self, v: &'a str) -> Cow<'a, str> { -/// match v { -/// "" => Cow::Borrowed(self.0), -/// other => Cow::Borrowed(other), -/// } -/// } -/// -/// fn transform_string(&mut self, v: String) -> Cow<'a, str> { -/// match v.as_str() { -/// "" => Cow::Borrowed(self.0), -/// _ => Cow::Owned(v), -/// } -/// } -/// } -/// ``` -pub trait Transform { - fn transform_bool(&mut self, v: bool) -> bool { - v - } - - fn transform_i8(&mut self, v: i8) -> i8 { - v - } - - fn transform_i16(&mut self, v: i16) -> i16 { - v - } - - fn transform_i32(&mut self, v: i32) -> i32 { - v - } - - fn transform_i64(&mut self, v: i64) -> i64 { - v - } - - fn transform_u8(&mut self, v: u8) -> u8 { - v - } - - fn transform_u16(&mut self, v: u16) -> u16 { - v - } - - fn transform_u32(&mut self, v: u32) -> u32 { - v - } - - fn transform_u64(&mut self, v: u64) -> u64 { - v - } - - serde::serde_if_integer128! { - fn transform_i128(&mut self, v: i128) -> i128 { - v - } - - fn transform_u128(&mut self, v: u128) -> u128 { - v - } - } - - fn transform_f32(&mut self, v: f32) -> f32 { - v - } - - fn transform_f64(&mut self, v: f64) -> f64 { - v - } - - fn transform_char(&mut self, v: char) -> char { - v - } - - fn transform_str<'a>(&mut self, v: &'a str) -> Cow<'a, str> { - Cow::Borrowed(v) - } - - fn transform_string(&mut self, v: String) -> Cow<'static, str> { - Cow::Owned(v) - } - - fn transform_bytes<'a>(&mut self, v: &'a [u8]) -> Cow<'a, [u8]> { - Cow::Borrowed(v) - } - - fn transform_byte_buf(&mut self, v: Vec) -> Cow<'static, [u8]> { - Cow::Owned(v) - } -} - -enum Mut<'a, T> { - Owned(T), - Borrowed(&'a mut T), -} - -impl Mut<'_, T> { - fn as_mut(&mut self) -> &mut T { - match self { - Self::Owned(ref mut t) => t, - Self::Borrowed(t) => t, - } - } -} - -/// A [`Deserializer`](de::Deserializer) that maps all values through a [`Transform`]. -/// -/// This deserializer wraps another deserializer. Values are transformed inline during -/// deserialization and passed directly to the `Deserialize` implementation. All errors are passed -/// through without modification. -/// -/// # Lifetime -/// -/// The lifetime parameter is an implementation detail. [`new`](Self::new) returns a transforming -/// deserializer with static lifetime. -/// -/// # Example -/// -/// ```ignore -/// struct Identity; -/// -/// let json = "42"; -/// let json_deserializer = &mut serde_json::Deserializer::from_str(&json); -/// let deserializer = Deserializer::new(json_deserializer, Identity); -/// -/// let number = deserializer.deserialize_u32(deserializer).unwrap(); -/// assert_eq!(number, 42); -/// ``` -pub struct Deserializer<'a, D, T>(D, Mut<'a, T>); - -impl<'de, 'a, D, T> Deserializer<'a, D, T> -where - D: de::Deserializer<'de>, - T: Transform, -{ - /// Creates a new `Deserializer`. - pub fn new(deserializer: D, transformer: T) -> Self { - Self(deserializer, Mut::Owned(transformer)) - } - - fn borrowed(deserializer: D, transformer: &'a mut T) -> Self { - Self(deserializer, Mut::Borrowed(transformer)) - } -} - -impl<'de, 'a, D, T> de::Deserializer<'de> for Deserializer<'a, D, T> -where - D: de::Deserializer<'de>, - T: Transform, -{ - type Error = D::Error; - - fn deserialize_any(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_any(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_bool(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_bool(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_i8(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_i8(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_i16(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_i16(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_i32(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_i32(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_i64(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_i64(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_u8(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_u8(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_u16(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_u16(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_u32(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_u32(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_u64(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_u64(Visitor(visitor, self.1.as_mut())) - } - - serde::serde_if_integer128! { - fn deserialize_i128(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_i128(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_u128(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_u128(Visitor(visitor, self.1.as_mut())) - } - } - - fn deserialize_f32(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_f32(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_f64(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_f64(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_char(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_char(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_str(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_str(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_string(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_string(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_bytes(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_bytes(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_byte_buf(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0 - .deserialize_byte_buf(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_option(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_option(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_unit(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_unit(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_unit_struct( - mut self, - name: &'static str, - visitor: V, - ) -> Result - where - V: de::Visitor<'de>, - { - self.0 - .deserialize_unit_struct(name, Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_newtype_struct( - mut self, - name: &'static str, - visitor: V, - ) -> Result - where - V: de::Visitor<'de>, - { - self.0 - .deserialize_newtype_struct(name, Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_seq(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_seq(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_tuple(mut self, len: usize, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0 - .deserialize_tuple(len, Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_tuple_struct( - mut self, - name: &'static str, - len: usize, - visitor: V, - ) -> Result - where - V: de::Visitor<'de>, - { - self.0 - .deserialize_tuple_struct(name, len, Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_map(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0.deserialize_map(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_struct( - mut self, - name: &'static str, - fields: &'static [&'static str], - visitor: V, - ) -> Result - where - V: de::Visitor<'de>, - { - self.0 - .deserialize_struct(name, fields, Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_enum( - mut self, - name: &'static str, - variants: &'static [&'static str], - visitor: V, - ) -> Result - where - V: de::Visitor<'de>, - { - self.0 - .deserialize_enum(name, variants, Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_identifier(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0 - .deserialize_identifier(Visitor(visitor, self.1.as_mut())) - } - - fn deserialize_ignored_any(mut self, visitor: V) -> Result - where - V: de::Visitor<'de>, - { - self.0 - .deserialize_ignored_any(Visitor(visitor, self.1.as_mut())) - } -} - -struct Visitor<'a, V, T>(V, &'a mut T); - -impl<'de, 'a, V, T> de::Visitor<'de> for Visitor<'a, V, T> -where - V: de::Visitor<'de>, - T: Transform, -{ - type Value = V::Value; - - fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "any value") - } - - fn visit_bool(self, v: bool) -> Result - where - E: de::Error, - { - self.0.visit_bool(self.1.transform_bool(v)) - } - - fn visit_i8(self, v: i8) -> Result - where - E: de::Error, - { - self.0.visit_i8(self.1.transform_i8(v)) - } - - fn visit_i16(self, v: i16) -> Result - where - E: de::Error, - { - self.0.visit_i16(self.1.transform_i16(v)) - } - - fn visit_i32(self, v: i32) -> Result - where - E: de::Error, - { - self.0.visit_i32(self.1.transform_i32(v)) - } - - fn visit_i64(self, v: i64) -> Result - where - E: de::Error, - { - self.0.visit_i64(self.1.transform_i64(v)) - } - - fn visit_u8(self, v: u8) -> Result - where - E: de::Error, - { - self.0.visit_u8(self.1.transform_u8(v)) - } - - fn visit_u16(self, v: u16) -> Result - where - E: de::Error, - { - self.0.visit_u16(self.1.transform_u16(v)) - } - - fn visit_u32(self, v: u32) -> Result - where - E: de::Error, - { - self.0.visit_u32(self.1.transform_u32(v)) - } - - fn visit_u64(self, v: u64) -> Result - where - E: de::Error, - { - self.0.visit_u64(self.1.transform_u64(v)) - } - - serde::serde_if_integer128! { - fn visit_i128(self, v: i128) -> Result - where E: de::Error - { - self.0.visit_i128(self.1.transform_i128(v)) - } - - fn visit_u128(self, v: u128) -> Result - where E: de::Error - { - self.0.visit_u128(self.1.transform_u128(v)) - } - } - - fn visit_f32(self, v: f32) -> Result - where - E: de::Error, - { - self.0.visit_f32(self.1.transform_f32(v)) - } - - fn visit_f64(self, v: f64) -> Result - where - E: de::Error, - { - self.0.visit_f64(self.1.transform_f64(v)) - } - - fn visit_char(self, v: char) -> Result - where - E: de::Error, - { - self.0.visit_char(self.1.transform_char(v)) - } - - fn visit_borrowed_str(self, v: &'de str) -> Result - where - E: de::Error, - { - match self.1.transform_str(v) { - Cow::Borrowed(v) => self.0.visit_borrowed_str(v), - Cow::Owned(v) => self.0.visit_string(v), - } - } - - fn visit_str(self, v: &str) -> Result - where - E: de::Error, - { - match self.1.transform_str(v) { - Cow::Borrowed(v) => self.0.visit_str(v), - Cow::Owned(v) => self.0.visit_string(v), - } - } - - fn visit_string(self, v: String) -> Result - where - E: de::Error, - { - match self.1.transform_string(v) { - Cow::Borrowed(v) => self.0.visit_borrowed_str(v), - Cow::Owned(v) => self.0.visit_string(v), - } - } - - fn visit_unit(self) -> Result - where - E: de::Error, - { - self.0.visit_unit() - } - - fn visit_none(self) -> Result - where - E: de::Error, - { - self.0.visit_none() - } - - fn visit_some(self, d: D) -> Result - where - D: de::Deserializer<'de>, - { - self.0.visit_some(Deserializer::borrowed(d, self.1)) - } - - fn visit_newtype_struct(self, d: D) -> Result - where - D: de::Deserializer<'de>, - { - self.0 - .visit_newtype_struct(Deserializer::borrowed(d, self.1)) - } - - fn visit_seq(self, v: A) -> Result - where - A: de::SeqAccess<'de>, - { - self.0.visit_seq(SeqAccess(v, self.1)) - } - - fn visit_map(self, v: A) -> Result - where - A: de::MapAccess<'de>, - { - self.0.visit_map(MapAccess(v, self.1)) - } - - fn visit_bytes(self, v: &[u8]) -> Result - where - E: de::Error, - { - match self.1.transform_bytes(v) { - Cow::Borrowed(v) => self.0.visit_bytes(v), - Cow::Owned(v) => self.0.visit_byte_buf(v), - } - } - - fn visit_byte_buf(self, v: Vec) -> Result - where - E: de::Error, - { - match self.1.transform_byte_buf(v) { - Cow::Borrowed(v) => self.0.visit_bytes(v), - Cow::Owned(v) => self.0.visit_byte_buf(v), - } - } -} - -struct SeqAccess<'a, A, T>(A, &'a mut T); - -impl<'de, 'a, A, T> de::SeqAccess<'de> for SeqAccess<'a, A, T> -where - A: de::SeqAccess<'de>, - T: Transform, -{ - type Error = A::Error; - - fn size_hint(&self) -> Option { - self.0.size_hint() - } - - fn next_element_seed(&mut self, seed: S) -> Result, Self::Error> - where - S: de::DeserializeSeed<'de>, - { - self.0.next_element_seed(DeserializeSeed(seed, self.1)) - } -} - -struct MapAccess<'a, A, T>(A, &'a mut T); - -impl<'de, 'a, A, T> de::MapAccess<'de> for MapAccess<'a, A, T> -where - A: de::MapAccess<'de>, - T: Transform, -{ - type Error = A::Error; - - fn next_key_seed(&mut self, seed: K) -> Result, Self::Error> - where - K: de::DeserializeSeed<'de>, - { - // NOTE: No transform on keys. - self.0.next_key_seed(seed) - } - - fn next_value_seed(&mut self, seed: V) -> Result - where - V: de::DeserializeSeed<'de>, - { - self.0.next_value_seed(DeserializeSeed(seed, self.1)) - } -} - -struct DeserializeSeed<'a, D, T>(D, &'a mut T); - -impl<'de, 'a, D, T> de::DeserializeSeed<'de> for DeserializeSeed<'a, D, T> -where - D: de::DeserializeSeed<'de>, - T: Transform, -{ - type Value = D::Value; - - fn deserialize(self, deserializer: X) -> Result - where - X: serde::Deserializer<'de>, - { - self.0 - .deserialize(Deserializer::borrowed(deserializer, self.1)) - } -} diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index ae22f2e9f5e..af9612f959b 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -72,6 +72,7 @@ zstd = { version = "0.11.2"} [target."cfg(not(windows))".dependencies] libc = "0.2.71" +tikv-jemallocator = { version = "0.5", features = ["stats"] } [dev-dependencies] insta = { version = "1.19.0", features = ["json"] } diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index fc22c067c5c..c2353b10c07 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -1,5 +1,4 @@ use bytes::Bytes; -use relay_replays::recording::ReplayScrubber; use std::collections::BTreeMap; use std::convert::TryFrom; use std::io::Write; @@ -1058,7 +1057,7 @@ impl EnvelopeProcessorService { fn process_replays(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { let project_state = &mut state.project_state; let replays_enabled = project_state.has_feature(Feature::SessionReplay); - let scrubbing_enabled = project_state.has_feature(Feature::SessionReplayRecordingScrubbing); + // let scrubbing_enabled = project_state.has_feature(Feature::SessionReplayRecordingScrubbing); let context = &state.envelope_context; let meta = state.envelope.meta().clone(); @@ -1066,15 +1065,13 @@ impl EnvelopeProcessorService { let user_agent = meta.user_agent(); let event_id = state.envelope.event_id(); - let limit = self.config.max_replay_size(); + // let limit = self.config.max_replay_size(); let config = project_state.config(); - let datascrubbing_config = config - .datascrubbing_settings - .pii_config() - .map_err(|e| ProcessingError::PiiConfigError(e.clone()))? - .as_ref(); - let mut scrubber = - ReplayScrubber::new(limit, config.pii_config.as_ref(), datascrubbing_config); + // let datascrubbing_config = config + // .datascrubbing_settings + // .pii_config() + // .map_err(|e| ProcessingError::PiiConfigError(e.clone()))? + // .as_ref(); state.envelope.retain_items(|item| match item.ty() { ItemType::ReplayEvent => { @@ -1136,41 +1133,35 @@ impl EnvelopeProcessorService { } } ItemType::ReplayRecording => { - if !replays_enabled { - return false; - } - - // XXX: Processing is there just for data scrubbing. Skip the entire expensive - // processing step if we do not need to scrub. - if !scrubbing_enabled || scrubber.is_empty() { - return true; - } - - // Limit expansion of recordings to the max replay size. The payload is - // decompressed temporarily and then immediately re-compressed. However, to - // limit memory pressure, we use the replay limit as a good overall limit for - // allocations. - let parsed_recording = metric!(timer(RelayTimers::ReplayRecordingProcessing), { - scrubber.process_recording(&item.payload()) - }); - - match parsed_recording { - Ok(recording) => { - item.set_payload(ContentType::OctetStream, recording.as_slice()); - } - Err(e) => { - relay_log::warn!("replay-recording-event: {e} {event_id:?}"); - context.track_outcome( - Outcome::Invalid(DiscardReason::InvalidReplayRecordingEvent), - DataCategory::Replay, - 1, - ); + if replays_enabled { + // Limit expansion of recordings to the max replay size. The payload is + // decompressed temporarily and then immediately re-compressed. However, to + // limit memory pressure, we use the replay limit as a good overall limit for + // allocations. + let limit = self.config.max_replay_size(); + let parsed_recording = + metric!(timer(RelayTimers::ReplayRecordingProcessing), { + relay_replays::recording::process_recording(&item.payload(), limit) + }); + + match parsed_recording { + Ok(recording) => { + item.set_payload(ContentType::OctetStream, recording.as_slice()); + } + Err(e) => { + relay_log::warn!("replay-recording-event: {e} {event_id:?}"); + context.track_outcome( + Outcome::Invalid(DiscardReason::InvalidReplayRecordingEvent), + DataCategory::Replay, + 1, + ); + } } } // XXX: For now replays that could not be parsed are still accepted while we // determine the impact of the recording parser. - true + replays_enabled } _ => true, }); diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 123704d1da9..79dded0ef25 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -253,6 +253,13 @@ )] #![allow(clippy::derive_partial_eq_without_eq)] +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + mod actors; mod body; mod constants;