From ead7f3754258b00d929444762dc44db7dba3de92 Mon Sep 17 00:00:00 2001 From: Titus <8200809+nismotie@users.noreply.github.com> Date: Wed, 20 Dec 2023 17:02:23 +0000 Subject: [PATCH] Implement Serialize for lambda telemetry (#759) * Add serialize testing mod and macro * Use and derive serialize * Skip serialize if None on Options * Add unit tests for serialization --- lambda-extension/src/telemetry.rs | 231 ++++++++++++++++++++++++++++-- 1 file changed, 216 insertions(+), 15 deletions(-) diff --git a/lambda-extension/src/telemetry.rs b/lambda-extension/src/telemetry.rs index 1e83ee8e..cfb4dde2 100644 --- a/lambda-extension/src/telemetry.rs +++ b/lambda-extension/src/telemetry.rs @@ -3,14 +3,14 @@ use http::{Request, Response}; use http_body_util::BodyExt; use hyper::body::Incoming; use lambda_runtime_api_client::body::Body; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{boxed::Box, fmt, sync::Arc}; use tokio::sync::Mutex; use tower::Service; use tracing::{error, trace}; /// Payload received from the Telemetry API -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub struct LambdaTelemetry { /// Time when the telemetry was generated pub time: DateTime, @@ -20,7 +20,7 @@ pub struct LambdaTelemetry { } /// Record in a LambdaTelemetry entry -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] #[serde(tag = "type", content = "record", rename_all = "lowercase")] pub enum LambdaTelemetryRecord { /// Function log records @@ -37,8 +37,10 @@ pub enum LambdaTelemetryRecord { /// Phase of initialisation phase: InitPhase, /// Lambda runtime version + #[serde(skip_serializing_if = "Option::is_none")] runtime_version: Option, /// Lambda runtime version ARN + #[serde(skip_serializing_if = "Option::is_none")] runtime_version_arn: Option, }, /// Platform init runtime done record @@ -47,10 +49,12 @@ pub enum LambdaTelemetryRecord { /// Type of initialization initialization_type: InitType, /// Phase of initialisation + #[serde(skip_serializing_if = "Option::is_none")] phase: Option, /// Status of initalization status: Status, /// When the status = failure, the error_type describes what kind of error occurred + #[serde(skip_serializing_if = "Option::is_none")] error_type: Option, /// Spans #[serde(default)] @@ -75,8 +79,10 @@ pub enum LambdaTelemetryRecord { /// Request identifier request_id: String, /// Version of the Lambda function + #[serde(skip_serializing_if = "Option::is_none")] version: Option, /// Trace Context + #[serde(skip_serializing_if = "Option::is_none")] tracing: Option, }, /// Record marking the completion of an invocation @@ -87,13 +93,16 @@ pub enum LambdaTelemetryRecord { /// Status of the invocation status: Status, /// When unsuccessful, the error_type describes what kind of error occurred + #[serde(skip_serializing_if = "Option::is_none")] error_type: Option, /// Metrics corresponding to the runtime + #[serde(skip_serializing_if = "Option::is_none")] metrics: Option, /// Spans #[serde(default)] spans: Vec, /// Trace Context + #[serde(skip_serializing_if = "Option::is_none")] tracing: Option, }, /// Platfor report record @@ -104,6 +113,7 @@ pub enum LambdaTelemetryRecord { /// Status of the invocation status: Status, /// When unsuccessful, the error_type describes what kind of error occurred + #[serde(skip_serializing_if = "Option::is_none")] error_type: Option, /// Metrics metrics: ReportMetrics, @@ -111,6 +121,7 @@ pub enum LambdaTelemetryRecord { #[serde(default)] spans: Vec, /// Trace Context + #[serde(skip_serializing_if = "Option::is_none")] tracing: Option, }, @@ -147,7 +158,7 @@ pub enum LambdaTelemetryRecord { } /// Type of Initialization -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] #[serde(rename_all = "kebab-case")] pub enum InitType { /// Initialised on demand @@ -159,7 +170,7 @@ pub enum InitType { } /// Phase in which initialization occurs -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] #[serde(rename_all = "kebab-case")] pub enum InitPhase { /// Initialization phase @@ -169,7 +180,7 @@ pub enum InitPhase { } /// Status of invocation/initialization -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] #[serde(rename_all = "kebab-case")] pub enum Status { /// Success @@ -183,7 +194,7 @@ pub enum Status { } /// Span -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] #[serde(rename_all = "camelCase")] pub struct Span { /// Duration of the span @@ -195,7 +206,7 @@ pub struct Span { } /// Tracing Context -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] #[serde(rename_all = "camelCase")] pub struct TraceContext { /// Span ID @@ -207,7 +218,7 @@ pub struct TraceContext { } /// Type of tracing -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] pub enum TracingType { /// Amazon trace type #[serde(rename = "X-Amzn-Trace-Id")] @@ -215,7 +226,7 @@ pub enum TracingType { } ///Init report metrics -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] #[serde(rename_all = "camelCase")] pub struct InitReportMetrics { /// Duration of initialization @@ -223,7 +234,7 @@ pub struct InitReportMetrics { } /// Report metrics -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] #[serde(rename_all = "camelCase")] pub struct ReportMetrics { /// Duration in milliseconds @@ -237,15 +248,15 @@ pub struct ReportMetrics { #[serde(rename = "maxMemoryUsedMB")] pub max_memory_used_mb: u64, /// Init duration in case of a cold start - #[serde(default = "Option::default")] + #[serde(default = "Option::default", skip_serializing_if = "Option::is_none")] pub init_duration_ms: Option, /// Restore duration in milliseconds - #[serde(default = "Option::default")] + #[serde(default = "Option::default", skip_serializing_if = "Option::is_none")] pub restore_duration_ms: Option, } /// Runtime done metrics -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] #[serde(rename_all = "camelCase")] pub struct RuntimeDoneMetrics { /// Duration in milliseconds @@ -303,7 +314,7 @@ where } #[cfg(test)] -mod tests { +mod deserialization_tests { use super::*; use chrono::{Duration, TimeZone}; @@ -459,3 +470,193 @@ mod tests { ), } } + +#[cfg(test)] +mod serialization_tests { + use chrono::{Duration, TimeZone}; + + use super::*; + macro_rules! serialize_tests { + ($($name:ident: $value:expr,)*) => { + $( + #[test] + fn $name() { + let (input, expected) = $value; + let actual = serde_json::to_string(&input).expect("unable to serialize"); + println!("Input: {:?}\n", input); + println!("Expected:\n {:?}\n", expected); + println!("Actual:\n {:?}\n", actual); + + assert!(actual == expected); + } + )* + } + } + + serialize_tests! { + // function + function: ( + LambdaTelemetry { + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::Function("hello world".to_string()), + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":"hello world"}"#, + ), + // extension + extension: ( + LambdaTelemetry { + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::Extension("hello world".to_string()), + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":"hello world"}"#, + ), + //platform.Start + platform_start: ( + LambdaTelemetry{ + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::PlatformStart { + request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(), + version: Some("$LATEST".to_string()), + tracing: Some(TraceContext{ + span_id: Some("24cd7d670fa455f0".to_string()), + r#type: TracingType::AmznTraceId, + value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(), + }), + } + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#, + ), + // platform.initStart + platform_init_start: ( + LambdaTelemetry{ + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::PlatformInitStart { + initialization_type: InitType::OnDemand, + phase: InitPhase::Init, + runtime_version: None, + runtime_version_arn: None, + }, + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initStart","record":{"initializationType":"on-demand","phase":"init"}}"#, + ), + // platform.runtimeDone + platform_runtime_done: ( + LambdaTelemetry{ + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::PlatformRuntimeDone { + request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(), + status: Status::Success, + error_type: None, + metrics: Some(RuntimeDoneMetrics { + duration_ms: 2599.0, + produced_bytes: Some(8), + }), + spans: vec!( + Span { + name:"responseLatency".to_string(), + start: Utc + .with_ymd_and_hms(2022, 10, 21, 14, 5, 3) + .unwrap() + .checked_add_signed(Duration::milliseconds(165)) + .unwrap(), + duration_ms: 2598.0 + }, + Span { + name:"responseDuration".to_string(), + start: Utc + .with_ymd_and_hms(2022, 10, 21, 14, 5, 5) + .unwrap() + .checked_add_signed(Duration::milliseconds(763)) + .unwrap(), + duration_ms: 0.0 + }, + ), + tracing: Some(TraceContext{ + span_id: Some("24cd7d670fa455f0".to_string()), + r#type: TracingType::AmznTraceId, + value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(), + }), + }, + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"platform.runtimeDone","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","metrics":{"durationMs":2599.0,"producedBytes":8},"spans":[{"durationMs":2598.0,"name":"responseLatency","start":"2022-10-21T14:05:03.165Z"},{"durationMs":0.0,"name":"responseDuration","start":"2022-10-21T14:05:05.763Z"}],"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#, + ), + // platform.report + platform_report: ( + LambdaTelemetry{ + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::PlatformReport { + request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(), + status: Status::Success, + error_type: None, + metrics: ReportMetrics { + duration_ms: 2599.4, + billed_duration_ms: 2600, + memory_size_mb:128, + max_memory_used_mb:94, + init_duration_ms: Some(549.04), + restore_duration_ms: None, + }, + spans: Vec::new(), + tracing: Some(TraceContext { + span_id: Some("24cd7d670fa455f0".to_string()), + r#type: TracingType::AmznTraceId, + value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(), + }), + }, + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"platform.report","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","metrics":{"durationMs":2599.4,"billedDurationMs":2600,"memorySizeMB":128,"maxMemoryUsedMB":94,"initDurationMs":549.04},"spans":[],"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#, + ), + // platform.telemetrySubscription + platform_telemetry_subscription: ( + LambdaTelemetry{ + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::PlatformTelemetrySubscription { + name: "my-extension".to_string(), + state: "Subscribed".to_string(), + types: vec!("platform".to_string(), "function".to_string()), + }, + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"platform.telemetrySubscription","record":{"name":"my-extension","state":"Subscribed","types":["platform","function"]}}"#, + ), + // platform.initRuntimeDone + platform_init_runtime_done: ( + LambdaTelemetry{ + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::PlatformInitRuntimeDone { + initialization_type: InitType::OnDemand, + status: Status::Success, + phase: None, + error_type: None, + spans: Vec::new(), + }, + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initRuntimeDone","record":{"initializationType":"on-demand","status":"success","spans":[]}}"#, + ), + // platform.extension + platform_extension: ( + LambdaTelemetry { + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::PlatformExtension { + name: "my-extension".to_string(), + state: "Ready".to_string(), + events: vec!("SHUTDOWN".to_string(), "INVOKE".to_string()), + }, + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"platform.extension","record":{"name":"my-extension","state":"Ready","events":["SHUTDOWN","INVOKE"]}}"#, + ), + // platform.initReport + platform_init_report: ( + LambdaTelemetry { + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::PlatformInitReport { + initialization_type: InitType::OnDemand, + phase: InitPhase::Init, + metrics: InitReportMetrics { duration_ms: 500.0 }, + spans: Vec::new(), + }, + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initReport","record":{"initializationType":"on-demand","phase":"init","metrics":{"durationMs":500.0},"spans":[]}}"#, + ), + + } +}