From 38a78d8bb0323dfcc8a4502988a30fc642ad2e51 Mon Sep 17 00:00:00 2001
From: zwang28 <70626450+zwang28@users.noreply.github.com>
Date: Thu, 23 Nov 2023 11:50:52 +0800
Subject: [PATCH] feat(meta): add event log service (#13392)

---
 Cargo.lock                                    |   5 +
 Makefile.toml                                 |   2 +
 proto/meta.proto                              |  39 ++++
 src/common/src/config.rs                      |  14 ++
 src/config/example.toml                       |   2 +
 .../src/catalog/system_catalog/mod.rs         |   1 +
 .../catalog/system_catalog/rw_catalog/mod.rs  |   2 +
 .../rw_catalog/rw_event_logs.rs               |  67 ++++++
 src/frontend/src/meta_client.rs               |   6 +
 src/frontend/src/test_utils.rs                |   6 +-
 src/meta/Cargo.toml                           |   1 +
 src/meta/node/Cargo.toml                      |   2 +
 src/meta/node/src/lib.rs                      |   2 +
 src/meta/node/src/server.rs                   |  17 ++
 src/meta/service/src/event_log_service.rs     |  39 ++++
 src/meta/service/src/lib.rs                   |   1 +
 src/meta/src/manager/catalog/mod.rs           |  31 ++-
 src/meta/src/manager/env.rs                   |  21 +-
 src/meta/src/manager/event_log.rs             | 172 ++++++++++++++
 src/meta/src/manager/mod.rs                   |   1 +
 src/meta/src/rpc/ddl_controller.rs            |  18 +-
 src/rpc_client/src/meta_client.rs             |  13 +-
 src/stream/Cargo.toml                         |   1 +
 src/stream/src/executor/actor.rs              |   5 +
 src/tests/simulation/Cargo.toml               |   1 +
 .../integration_tests/recovery/event_log.rs   | 219 ++++++++++++++++++
 .../tests/integration_tests/recovery/mod.rs   |   1 +
 27 files changed, 681 insertions(+), 8 deletions(-)
 create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs
 create mode 100644 src/meta/service/src/event_log_service.rs
 create mode 100644 src/meta/src/manager/event_log.rs
 create mode 100644 src/tests/simulation/tests/integration_tests/recovery/event_log.rs

diff --git a/Cargo.lock b/Cargo.lock
index 136d91c75c71d..0b98ab4bfa4e7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8112,6 +8112,7 @@ dependencies = [
  "serde_json",
  "sync-point",
  "thiserror",
+ "thiserror-ext",
  "tokio-retry",
  "tokio-stream",
  "tower",
@@ -8165,6 +8166,8 @@ dependencies = [
  "risingwave_pb",
  "risingwave_rpc_client",
  "sea-orm",
+ "serde",
+ "serde_json",
  "tracing",
  "workspace-hack",
 ]
@@ -8346,6 +8349,7 @@ dependencies = [
  "cfg-or-panic",
  "clap",
  "console",
+ "fail",
  "futures",
  "glob",
  "itertools 0.12.0",
@@ -8564,6 +8568,7 @@ dependencies = [
  "either",
  "enum-as-inner",
  "expect-test",
+ "fail",
  "futures",
  "futures-async-stream",
  "governor",
diff --git a/Makefile.toml b/Makefile.toml
index 4b44f73774d33..d32aae0330a13 100644
--- a/Makefile.toml
+++ b/Makefile.toml
@@ -965,6 +965,7 @@
 set -e
 cargo nextest run \
     --config "target.'cfg(all())'.rustflags = ['--cfg=madsim']" \
+    --features fail/failpoints \
     -p risingwave_simulation \
     "$@"
 """
@@ -980,6 +981,7 @@
 set -e
 cargo nextest archive \
     --config "target.'cfg(all())'.rustflags = ['--cfg=madsim']" \
+    --features fail/failpoints \
     -p risingwave_simulation \
     --archive-file simulation-it-test.tar.zst \
     "$@"
diff --git a/proto/meta.proto b/proto/meta.proto
index 0425ff81276f5..19fb29c260736 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -565,3 +565,42 @@ message GetServingVnodeMappingsResponse {
 service ServingService {
   rpc GetServingVnodeMappings(GetServingVnodeMappingsRequest) returns (GetServingVnodeMappingsResponse);
 }
+
+message EventLog {
+  message EventMetaNodeStart {
+    string advertise_addr = 1;
+    string listen_addr = 2;
+    string opts = 3;
+  }
+  message EventCreateStreamJobFail {
+    uint32 id = 1;
+    string name = 2;
+    string definition = 3;
+    string error = 4;
+  }
+  message EventDirtyStreamJobClear {
+    uint32 id = 1;
+    string name = 2;
+    string definition = 3;
+    string error = 4;
+  }
+  // Event log identifier, which should be populated by the event log service.
+  optional string unique_id = 1;
+  // Processing time, which should be populated by the event log service.
+  optional uint64 timestamp = 2;
+  oneof event {
+    EventCreateStreamJobFail create_stream_job_fail = 3;
+    EventDirtyStreamJobClear dirty_stream_job_clear = 4;
+    EventMetaNodeStart meta_node_start = 5;
+  }
+}
+
+message ListEventLogRequest {}
+
+message ListEventLogResponse {
+  repeated EventLog event_logs = 1;
+}
+
+service EventLogService {
+  rpc ListEventLog(ListEventLogRequest) returns (ListEventLogResponse);
+}
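The `oneof event` above is what all downstream code in this patch matches on: prost generates a Rust enum with one variant per case, nested in a module named after the message. A hand-written approximation of the generated shape (a sketch, not the generated code verbatim):

```rust
// Approximate shape of the prost output for the EventLog message above.
pub struct EventLog {
    pub unique_id: Option<String>, // populated by the event log service
    pub timestamp: Option<u64>,    // processing time, epoch milliseconds
    pub event: Option<event_log::Event>, // the oneof
}

pub mod event_log {
    pub struct EventMetaNodeStart { /* advertise_addr, listen_addr, opts */ }
    pub struct EventCreateStreamJobFail { /* id, name, definition, error */ }
    pub struct EventDirtyStreamJobClear { /* id, name, definition, error */ }

    // Matching on this enum is how rw_event_logs derives its event_type column.
    pub enum Event {
        CreateStreamJobFail(EventCreateStreamJobFail),
        DirtyStreamJobClear(EventDirtyStreamJobClear),
        MetaNodeStart(EventMetaNodeStart),
    }
}
```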
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index f80afbebf0ee0..daef793bece92 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -287,6 +287,12 @@ pub struct MetaConfig {
 
     #[serde(default)]
     pub compaction_config: CompactionConfig,
+
+    #[serde(default = "default::meta::event_log_enabled")]
+    pub event_log_enabled: bool,
+    /// Keeps the latest N events per channel.
+    #[serde(default = "default::meta::event_log_channel_max_size")]
+    pub event_log_channel_max_size: u32,
 }
 
 #[derive(Clone, Debug, Default)]
@@ -975,6 +981,14 @@ pub mod default {
         pub fn compaction_task_max_heartbeat_interval_secs() -> u64 {
             60 // 1min
         }
+
+        pub fn event_log_enabled() -> bool {
+            true
+        }
+
+        pub fn event_log_channel_max_size() -> u32 {
+            10
+        }
     }
 
     pub mod server {
diff --git a/src/config/example.toml b/src/config/example.toml
index 301386d2ddb90..6db5134b40a45 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -41,6 +41,8 @@ partition_vnode_count = 64
 table_write_throughput_threshold = 16777216
 min_table_split_write_throughput = 4194304
 compaction_task_max_heartbeat_interval_secs = 60
+event_log_enabled = true
+event_log_channel_max_size = 10
 
 [meta.compaction_config]
 max_bytes_for_level_base = 536870912
diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs
index 897171cfd38b4..04bd4354f1c83 100644
--- a/src/frontend/src/catalog/system_catalog/mod.rs
+++ b/src/frontend/src/catalog/system_catalog/mod.rs
@@ -405,6 +405,7 @@ prepare_sys_catalog! {
     { BuiltinCatalog::Table(&RW_HUMMOCK_BRANCHED_OBJECTS), read_hummock_branched_objects await },
     { BuiltinCatalog::Table(&RW_HUMMOCK_COMPACTION_GROUP_CONFIGS), read_hummock_compaction_group_configs await },
     { BuiltinCatalog::Table(&RW_HUMMOCK_META_CONFIGS), read_hummock_meta_configs await},
+    { BuiltinCatalog::Table(&RW_EVENT_LOGS), read_event_logs await},
     { BuiltinCatalog::Table(&RW_HUMMOCK_COMPACT_TASK_ASSIGNMEN), read_hummock_compaction_status await },
     { BuiltinCatalog::Table(&RW_DESCRIPTION), read_rw_description },
 }
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
index 6073c54ebcdd6..7f888f16f0ea0 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
@@ -18,6 +18,7 @@ mod rw_connections;
 mod rw_databases;
 mod rw_ddl_progress;
 mod rw_description;
+mod rw_event_logs;
 mod rw_fragments;
 mod rw_functions;
 mod rw_hummock_branched_objects;
@@ -54,6 +55,7 @@ pub use rw_connections::*;
 pub use rw_databases::*;
 pub use rw_ddl_progress::*;
 pub use rw_description::*;
+pub use rw_event_logs::*;
 pub use rw_fragments::*;
 pub use rw_functions::*;
 pub use rw_hummock_branched_objects::*;
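Both knobs land in `MetaConfig` with serde defaults, so a TOML file that omits them behaves exactly like example.toml above. A minimal sketch of that fallback, assuming the `toml` crate is available and that every `MetaConfig` field carries a serde default (which the generated example.toml workflow suggests):

```rust
use risingwave_common::config::MetaConfig;

// Sketch: keys absent from the TOML fall back to the default::meta::* functions.
fn event_log_defaults_apply() {
    let cfg: MetaConfig = toml::from_str("").unwrap();
    assert!(cfg.event_log_enabled); // default::meta::event_log_enabled()
    assert_eq!(cfg.event_log_channel_max_size, 10); // default::meta::event_log_channel_max_size()
}
```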
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs
new file mode 100644
index 0000000000000..ab012eefd1241
--- /dev/null
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs
@@ -0,0 +1,67 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use itertools::Itertools;
+use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
+use risingwave_common::error::Result;
+use risingwave_common::row::OwnedRow;
+use risingwave_common::types::{DataType, ScalarImpl, Timestamptz};
+use risingwave_pb::meta::event_log::Event;
+use serde_json::json;
+
+use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};
+
+pub const RW_EVENT_LOGS: BuiltinTable = BuiltinTable {
+    name: "rw_event_logs",
+    schema: RW_CATALOG_SCHEMA_NAME,
+    columns: &[
+        (DataType::Varchar, "unique_id"),
+        (DataType::Timestamptz, "timestamp"),
+        (DataType::Varchar, "event_type"),
+        (DataType::Jsonb, "info"),
+    ],
+    pk: &[0],
+};
+
+impl SysCatalogReaderImpl {
+    pub async fn read_event_logs(&self) -> Result<Vec<OwnedRow>> {
+        let configs = self
+            .meta_client
+            .list_event_log()
+            .await?
+            .into_iter()
+            .sorted_by(|a, b| a.timestamp.cmp(&b.timestamp))
+            .map(|e| {
+                let ts = Timestamptz::from_millis(e.timestamp.unwrap() as i64).unwrap();
+                let event_type = event_type(e.event.as_ref().unwrap());
+                OwnedRow::new(vec![
+                    Some(ScalarImpl::Utf8(e.unique_id.to_owned().unwrap().into())),
+                    Some(ScalarImpl::Timestamptz(ts)),
+                    Some(ScalarImpl::Utf8(event_type.into())),
+                    Some(ScalarImpl::Jsonb(json!(e).into())),
+                ])
+            })
+            .collect_vec();
+        Ok(configs)
+    }
+}
+
+fn event_type(e: &Event) -> String {
+    match e {
+        Event::CreateStreamJobFail(_) => "CREATE_STREAM_JOB_FAIL",
+        Event::DirtyStreamJobClear(_) => "DIRTY_STREAM_JOB_CLEAR",
+        Event::MetaNodeStart(_) => "META_NODE_START",
+    }
+    .into()
+}
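With the rows mapped as above, `rw_event_logs` is queryable like any other system table. A sketch of the intended usage, reusing the simulation test harness's `Session` from the integration test at the end of this patch (and assuming `Session::run` returns `anyhow::Result<String>`, as that test suggests):

```rust
use risingwave_simulation::cluster::Session;

// Sketch: inspect the most recent stream-job creation failures via SQL.
async fn recent_failures(mut session: Session) -> anyhow::Result<String> {
    session
        .run(
            "select timestamp, info from rw_event_logs \
             where event_type = 'CREATE_STREAM_JOB_FAIL' \
             order by timestamp desc limit 10",
        )
        .await
}
```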
diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs
index 239e637665e79..2ee5ef6e322f4 100644
--- a/src/frontend/src/meta_client.rs
+++ b/src/frontend/src/meta_client.rs
@@ -30,6 +30,7 @@
 use risingwave_pb::meta::list_actor_states_response::ActorState;
 use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
 use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
 use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
+use risingwave_pb::meta::EventLog;
 use risingwave_rpc_client::error::Result;
 use risingwave_rpc_client::{HummockMetaClient, MetaClient};
@@ -99,6 +100,7 @@ pub trait FrontendMetaClient: Send + Sync {
     async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
 
+    async fn list_event_log(&self) -> Result<Vec<EventLog>>;
     async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
 
     async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
 }
@@ -246,6 +248,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
         self.0.list_hummock_meta_config().await
     }
 
+    async fn list_event_log(&self) -> Result<Vec<EventLog>> {
+        self.0.list_event_log().await
+    }
+
     async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
         self.0.rise_ctl_list_compact_task_assignment().await
     }
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index a1481a52cba21..ec5a1c03d64b2 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -51,7 +51,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState;
 use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
 use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
 use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
-use risingwave_pb::meta::SystemParams;
+use risingwave_pb::meta::{EventLog, SystemParams};
 use risingwave_pb::stream_plan::StreamFragmentGraph;
 use risingwave_pb::user::update_user_request::UpdateField;
 use risingwave_pb::user::{GrantPrivilege, UserInfo};
@@ -924,6 +924,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
         unimplemented!()
     }
 
+    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
+        unimplemented!()
+    }
+
     async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
         unimplemented!()
     }
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index 9acbf84ff121a..32b5ea9a5a3fa 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -67,6 +67,7 @@ serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 sync-point = { path = "../utils/sync-point" }
 thiserror = "1"
+thiserror-ext = { workspace = true }
 tokio = { version = "0.2", package = "madsim-tokio", features = [
     "rt",
     "rt-multi-thread",
diff --git a/src/meta/node/Cargo.toml b/src/meta/node/Cargo.toml
index 04fbea0705a87..9d6ec577d6c70 100644
--- a/src/meta/node/Cargo.toml
+++ b/src/meta/node/Cargo.toml
@@ -38,6 +38,8 @@ sea-orm = { version = "0.12.0", features = [
     "runtime-tokio-native-tls",
     "macros",
 ] }
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
 tokio = { version = "0.2", package = "madsim-tokio", features = [
     "rt",
     "rt-multi-thread",
diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs
index 5e12448bf27ad..7f4ee1a63623a 100644
--- a/src/meta/node/src/lib.rs
+++ b/src/meta/node/src/lib.rs
@@ -307,6 +307,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
                     .meta
                     .compaction_task_max_heartbeat_interval_secs,
                 compaction_config: Some(config.meta.compaction_config),
+                event_log_enabled: config.meta.event_log_enabled,
+                event_log_channel_max_size: config.meta.event_log_channel_max_size,
                 advertise_addr: opts.advertise_addr,
             },
             config.system.into_init_system_params(),
diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index ce17c42ee8e72..b8ef739872b05 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -100,7 +100,9 @@ pub struct MetaStoreSqlBackend {
 }
 
 use risingwave_meta::MetaStoreBackend;
+use risingwave_meta_service::event_log_service::EventLogServiceImpl;
 use risingwave_meta_service::AddressInfo;
+use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
 
 pub async fn rpc_serve(
     address_info: AddressInfo,
@@ -619,6 +621,7 @@ pub async fn start_service_as_election_leader(
     let serving_srv =
         ServingServiceImpl::new(serving_vnode_mapping.clone(), fragment_manager.clone());
     let cloud_srv = CloudServiceImpl::new(catalog_manager.clone(), aws_cli);
+    let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
 
     if let Some(prometheus_addr) = address_info.prometheus_addr {
         MetricsManager::boot_metrics_service(prometheus_addr.to_string())
@@ -706,6 +709,10 @@ pub async fn start_service_as_election_leader(
         tracing::info!("Telemetry didn't start due to meta backend or config");
     }
 
+    if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
+        sub_tasks.push(pair);
+    }
+
     let shutdown_all = async move {
         let mut handles = Vec::with_capacity(sub_tasks.len());
 
@@ -743,6 +750,15 @@ pub async fn start_service_as_election_leader(
     tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
     tracing::info!("Starting meta services");
 
+    let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
+        advertise_addr: address_info.advertise_addr,
+        listen_addr: address_info.listen_addr.to_string(),
+        opts: serde_json::to_string(&env.opts).unwrap(),
+    };
+    env.event_log_manager_ref().add_event_logs(vec![
+        risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
+    ]);
+
     tonic::transport::Server::builder()
         .layer(MetricsMiddlewareLayer::new(meta_metrics))
         .layer(TracingExtractLayer::new())
@@ -764,6 +780,7 @@ pub async fn start_service_as_election_leader(
         .add_service(ServingServiceServer::new(serving_srv))
         .add_service(CloudServiceServer::new(cloud_srv))
         .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
+        .add_service(EventLogServiceServer::new(event_log_srv))
        .monitored_serve_with_shutdown(
             address_info.listen_addr,
             "grpc-meta-leader-service",
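The pair pushed into `sub_tasks` above follows the meta node's existing sub-task convention: a `(JoinHandle, shutdown sender)` tuple that `shutdown_all` drains on leader exit. In isolation, the contract looks roughly like this (a sketch of the assumed protocol, not code from this patch):

```rust
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

// Sketch: signal the worker to stop, then wait for it to drain and exit.
async fn shutdown_sub_task(pair: (JoinHandle<()>, oneshot::Sender<()>)) {
    let (join_handle, shutdown_tx) = pair;
    let _ = shutdown_tx.send(()); // Err only if the worker already exited.
    let _ = join_handle.await;
}
```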
diff --git a/src/meta/service/src/event_log_service.rs b/src/meta/service/src/event_log_service.rs
new file mode 100644
index 0000000000000..bc71a07677498
--- /dev/null
+++ b/src/meta/service/src/event_log_service.rs
@@ -0,0 +1,39 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_meta::manager::event_log::EventLogMangerRef;
+use risingwave_pb::meta::event_log_service_server::EventLogService;
+use risingwave_pb::meta::{ListEventLogRequest, ListEventLogResponse};
+use tonic::{Request, Response, Status};
+
+pub struct EventLogServiceImpl {
+    event_log_manager: EventLogMangerRef,
+}
+
+impl EventLogServiceImpl {
+    pub fn new(event_log_manager: EventLogMangerRef) -> Self {
+        Self { event_log_manager }
+    }
+}
+
+#[async_trait::async_trait]
+impl EventLogService for EventLogServiceImpl {
+    async fn list_event_log(
+        &self,
+        _request: Request<ListEventLogRequest>,
+    ) -> Result<Response<ListEventLogResponse>, Status> {
+        let event_logs = self.event_log_manager.list_event_logs();
+        Ok(Response::new(ListEventLogResponse { event_logs }))
+    }
+}
diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs
index 6c8cc11f8971c..4d028a1bb5f79 100644
--- a/src/meta/service/src/lib.rs
+++ b/src/meta/service/src/lib.rs
@@ -24,6 +24,7 @@ pub mod backup_service;
 pub mod cloud_service;
 pub mod cluster_service;
 pub mod ddl_service;
+pub mod event_log_service;
 pub mod health_service;
 pub mod heartbeat_service;
 pub mod hummock_service;
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 78745be72003a..8a27bf279802c 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -862,6 +862,29 @@ impl CatalogManager {
         }
         commit_meta!(self, tables)?;
 
+        // Note that `tables_to_clean` doesn't include sink/index/table_with_source creation,
+        // because their states are not persisted in the first place, see `start_create_stream_job_procedure`.
+        let event_logs = tables_to_clean
+            .iter()
+            .filter_map(|t| {
+                if t.table_type == TableType::Internal as i32 {
+                    return None;
+                }
+                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
+                    id: t.id,
+                    name: t.name.to_owned(),
+                    definition: t.definition.to_owned(),
+                    error: "clear during recovery".to_string(),
+                };
+                Some(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
+                    event,
+                ))
+            })
+            .collect_vec();
+        if !event_logs.is_empty() {
+            self.env.event_log_manager_ref().add_event_logs(event_logs);
+        }
+
         database_core.clear_creating_stream_jobs();
         let user_core = &mut core.user;
         for table in &tables_to_clean {
@@ -926,11 +949,13 @@ impl CatalogManager {
     /// It is required because failure may not necessarily happen in barrier,
     /// e.g. when cordon nodes.
     /// and we still need some way to cleanup the state.
+    ///
+    /// Returns false if `table_id` is not found.
     pub async fn cancel_create_table_procedure(
         &self,
         table_id: TableId,
         internal_table_ids: Vec<TableId>,
-    ) -> MetaResult<()> {
+    ) -> MetaResult<bool> {
         let table = {
             let core = &mut self.core.lock().await;
             let database_core = &mut core.database;
@@ -940,7 +965,7 @@ impl CatalogManager {
                     "table_id {} missing when attempting to cancel job, could be cleaned on recovery",
                     table_id
                 );
-                return Ok(());
+                return Ok(false);
             };
             // `Unspecified` maps to Created state, due to backwards compatibility.
             // `Created` states should not be cancelled.
@@ -984,7 +1009,7 @@ impl CatalogManager {
             }
         }
 
-        Ok(())
+        Ok(true)
     }
 
     /// return id of streaming jobs in the database which need to be dropped by stream manager.
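Producers only need the manager handle and a typed event; IDs, timestamps, and retention are all handled inside the event log worker. A minimal sketch of the producer side, mirroring the `DirtyStreamJobClear` case above (the field values here are hypothetical):

```rust
use risingwave_meta::manager::event_log::EventLogMangerRef;
use risingwave_pb::meta::event_log::{Event, EventDirtyStreamJobClear};

// Sketch: report a dirty stream job cleared during recovery.
fn report_dirty_job_cleared(mgr: &EventLogMangerRef) {
    let event = EventDirtyStreamJobClear {
        id: 42, // hypothetical table id
        name: "mv_example".into(),
        definition: "create materialized view mv_example as ...".into(),
        error: "clear during recovery".into(),
    };
    mgr.add_event_logs(vec![Event::DirtyStreamJobClear(event)]);
}
```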
diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs
index b5b9c4d35776a..81bfcfc8d482b 100644
--- a/src/meta/src/manager/env.rs
+++ b/src/meta/src/manager/env.rs
@@ -24,6 +24,7 @@ use sea_orm::EntityTrait;
 use super::{SystemParamsManager, SystemParamsManagerRef};
 use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
 use crate::controller::SqlMetaStore;
+use crate::manager::event_log::{start_event_log_manager, EventLogManger, EventLogMangerRef};
 use crate::manager::{
     IdGeneratorManager, IdGeneratorManagerRef, IdleManager, IdleManagerRef, NotificationManager,
     NotificationManagerRef,
@@ -56,6 +57,8 @@ pub struct MetaSrvEnv {
     /// idle status manager.
     idle_manager: IdleManagerRef,
 
+    event_log_manager: EventLogMangerRef,
+
     /// system param manager.
     system_params_manager: SystemParamsManagerRef,
 
@@ -76,7 +79,7 @@ pub struct MetaSrvEnv {
 }
 
 /// Options shared by all meta service instances
-#[derive(Clone)]
+#[derive(Clone, serde::Serialize)]
 pub struct MetaOpts {
     /// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on
     /// abnormal cases.
@@ -172,6 +175,8 @@ pub struct MetaOpts {
     pub compaction_task_max_heartbeat_interval_secs: u64,
     pub compaction_config: Option<CompactionConfig>,
 
+    pub event_log_enabled: bool,
+    pub event_log_channel_max_size: u32,
     pub advertise_addr: String,
 }
 
@@ -214,6 +219,8 @@ impl MetaOpts {
             partition_vnode_count: 32,
             compaction_task_max_heartbeat_interval_secs: 0,
             compaction_config: None,
+            event_log_enabled: false,
+            event_log_channel_max_size: 1,
             advertise_addr: "".to_string(),
         }
     }
@@ -268,6 +275,10 @@ impl MetaSrvEnv {
         };
         let connector_client = ConnectorClient::try_new(opts.connector_rpc_endpoint.as_ref()).await;
+        let event_log_manager = Arc::new(start_event_log_manager(
+            opts.event_log_enabled,
+            opts.event_log_channel_max_size,
+        ));
 
         Ok(Self {
             id_gen_manager,
             meta_store,
@@ -276,6 +287,7 @@ impl MetaSrvEnv {
             notification_manager,
             stream_client_pool,
             idle_manager,
+            event_log_manager,
             system_params_manager,
             system_params_controller,
             cluster_id,
@@ -356,6 +368,10 @@ impl MetaSrvEnv {
     pub fn connector_client(&self) -> Option<ConnectorClient> {
         self.connector_client.clone()
     }
+
+    pub fn event_log_manager_ref(&self) -> EventLogMangerRef {
+        self.event_log_manager.clone()
+    }
 }
 
 #[cfg(any(test, feature = "test"))]
@@ -402,6 +418,8 @@ impl MetaSrvEnv {
             None
         };
 
+        let event_log_manager = Arc::new(EventLogManger::for_test());
+
         Self {
             id_gen_manager,
             meta_store,
@@ -409,6 +427,7 @@ impl MetaSrvEnv {
             notification_manager,
             stream_client_pool,
             idle_manager,
+            event_log_manager,
             system_params_manager,
             system_params_controller,
             cluster_id,
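Two details of this wiring are easy to miss: `MetaOpts` gains `serde::Serialize` solely so the `MetaNodeStart` event in server.rs can embed the full option set as a JSON string, and the test environment builds a disabled manager (`EventLogManger::for_test`) so unit tests pay no cost. A sketch of the production construction path, which requires a tokio runtime because `start_event_log_manager` spawns the worker task:

```rust
use std::sync::Arc;

use risingwave_meta::manager::event_log::{start_event_log_manager, EventLogMangerRef};

// Sketch: enabled = true, keep the latest 10 events per channel,
// matching the defaults introduced in config.rs above.
fn build_event_log_manager() -> EventLogMangerRef {
    Arc::new(start_event_log_manager(true, 10))
}
```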
diff --git a/src/meta/src/manager/event_log.rs b/src/meta/src/manager/event_log.rs
new file mode 100644
index 0000000000000..df7bf3daa14cf
--- /dev/null
+++ b/src/meta/src/manager/event_log.rs
@@ -0,0 +1,172 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::time::SystemTime;
+
+use parking_lot::RwLock;
+use risingwave_pb::meta::event_log::{Event as PbEvent, Event};
+use risingwave_pb::meta::EventLog as PbEventLog;
+use tokio::task::JoinHandle;
+
+pub type EventLogMangerRef = Arc<EventLogManger>;
+type EventLogSender = tokio::sync::mpsc::Sender<EventLog>;
+type ShutdownSender = tokio::sync::oneshot::Sender<()>;
+
+/// The channel determines the expiration strategy.
+///
+/// Currently all channels apply the same strategy: keep the latest N events.
+///
+/// Currently each event type has its own channel.
+type ChannelId = u32;
+type Channel = VecDeque<EventLog>;
+type EventStoreRef = Arc<RwLock<HashMap<ChannelId, Channel>>>;
+
+/// Spawns a task that's responsible for event log insertion and expiration.
+pub fn start_event_log_manager(enabled: bool, event_log_channel_max_size: u32) -> EventLogManger {
+    use futures::FutureExt;
+    const BUFFER_SIZE: usize = 1024;
+    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<EventLog>(BUFFER_SIZE);
+    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
+    let shutdown_rx_shared = shutdown_rx.shared();
+    let event_logs: EventStoreRef = Arc::new(Default::default());
+    let event_logs_shared = event_logs.clone();
+    let worker_loop = async move {
+        if !enabled {
+            return;
+        }
+        loop {
+            futures::select_biased! {
+                _ = shutdown_rx_shared.clone().fuse() => {
+                    tracing::info!("event log worker is stopped");
+                    return;
+                },
+                event_log = event_rx.recv().fuse() => {
+                    let Some(event_log) = event_log else {
+                        tracing::info!("event log worker is stopped");
+                        return;
+                    };
+                    let mut write = event_logs_shared.write();
+                    let channel_id: ChannelId = (&event_log).into();
+                    let channel = write.entry(channel_id).or_default();
+                    channel.push_back(event_log);
+                    // Apply expiration strategies.
+                    keep_latest_n(channel, event_log_channel_max_size as _);
+                },
+            }
+        }
+    };
+    let worker_join_handle = tokio::spawn(worker_loop);
+    EventLogManger::new(
+        event_tx,
+        (worker_join_handle, shutdown_tx),
+        enabled,
+        event_logs,
+    )
+}
+
+struct EventLog {
+    payload: PbEventLog,
+}
+
+pub struct EventLogManger {
+    event_tx: EventLogSender,
+    worker_join_handle: RwLock<Option<(JoinHandle<()>, ShutdownSender)>>,
+    enabled: bool,
+    event_logs: EventStoreRef,
+}
+
+impl EventLogManger {
+    fn new(
+        event_tx: EventLogSender,
+        worker_join_handle: (JoinHandle<()>, ShutdownSender),
+        enabled: bool,
+        event_logs: EventStoreRef,
+    ) -> Self {
+        if !enabled {
+            tracing::info!("event log is disabled");
+        }
+        Self {
+            event_tx,
+            worker_join_handle: RwLock::new(Some(worker_join_handle)),
+            enabled,
+            event_logs,
+        }
+    }
+
+    #[cfg(any(test, feature = "test"))]
+    pub fn for_test() -> Self {
+        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
+        Self {
+            event_tx,
+            worker_join_handle: Default::default(),
+            enabled: false,
+            event_logs: Arc::new(Default::default()),
+        }
+    }
+
+    pub fn take_join_handle(&self) -> Option<(JoinHandle<()>, ShutdownSender)> {
+        self.worker_join_handle.write().take()
+    }
+
+    pub fn add_event_logs(&self, events: Vec<PbEvent>) {
+        if !self.enabled {
+            return;
+        }
+        let processing_ts = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as u64;
+        for event in events {
+            let event_log = EventLog {
+                payload: PbEventLog {
+                    unique_id: Some(uuid::Uuid::new_v4().to_string()),
+                    timestamp: Some(processing_ts),
+                    event: Some(event),
+                },
+            };
+            // Intentionally drop event logs if the channel is full or closed.
+            if self.event_tx.try_send(event_log).is_err() {
+                tracing::warn!("some event logs have been dropped");
+                break;
+            }
+        }
+    }
+
+    pub fn list_event_logs(&self) -> Vec<PbEventLog> {
+        self.event_logs
+            .read()
+            .values()
+            .flat_map(|v| v.iter().map(|e| e.payload.to_owned()))
+            .collect()
+    }
+}
+
+fn keep_latest_n(channel: &mut Channel, max_n: usize) {
+    while channel.len() > max_n {
+        channel.pop_front();
+    }
+}
+
+// TODO: avoid manual implementation
+impl From<&EventLog> for ChannelId {
+    fn from(value: &EventLog) -> Self {
+        match value.payload.event.as_ref().unwrap() {
+            Event::CreateStreamJobFail(_) => 1,
+            Event::DirtyStreamJobClear(_) => 2,
+            Event::MetaNodeStart(_) => 3,
+        }
+    }
+}
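The retention policy above is deliberately trivial: each channel is a `VecDeque` trimmed after every insert, so memory stays bounded by `channels × max_n` regardless of event volume. A standalone illustration of the same keep-latest-N behavior:

```rust
use std::collections::VecDeque;

// Push, then trim from the front so only the newest max_n items survive.
fn push_keep_latest_n<T>(q: &mut VecDeque<T>, item: T, max_n: usize) {
    q.push_back(item);
    while q.len() > max_n {
        q.pop_front();
    }
}

fn main() {
    let mut q = VecDeque::new();
    for i in 0..5 {
        push_keep_latest_n(&mut q, i, 3);
    }
    assert_eq!(Vec::from_iter(q), vec![2, 3, 4]); // the two oldest entries were evicted
}
```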
diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs
index e7e5208856bc3..96886eaad0e0e 100644
--- a/src/meta/src/manager/mod.rs
+++ b/src/meta/src/manager/mod.rs
@@ -15,6 +15,7 @@
 mod catalog;
 mod cluster;
 mod env;
+pub mod event_log;
 mod id;
 mod idle;
 mod notification;
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 776c39b7131fb..f421b7cadc31b 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -486,7 +486,8 @@ impl DdlController {
         let (ctx, table_fragments) = match result {
             Ok(r) => r,
             Err(e) => {
-                self.cancel_stream_job(&stream_job, internal_tables).await?;
+                self.cancel_stream_job(&stream_job, internal_tables, Some(&e))
+                    .await?;
                 return Err(e);
             }
         };
@@ -545,7 +546,8 @@ impl DdlController {
                     tracing::error!(id = stream_job.id(), error = ?e, "finish stream job failed")
                 }
                 _ => {
-                    self.cancel_stream_job(&stream_job, internal_tables).await?;
+                    self.cancel_stream_job(&stream_job, internal_tables, Some(&e))
+                        .await?;
                 }
             }
             return Err(e);
@@ -767,7 +769,19 @@ impl DdlController {
         &self,
         stream_job: &StreamingJob,
         internal_tables: Vec<Table>,
+        error: Option<&impl ToString>,
     ) -> MetaResult<()> {
+        let error = error.map(ToString::to_string).unwrap_or_default();
+        let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
+            id: stream_job.id(),
+            name: stream_job.name(),
+            definition: stream_job.definition(),
+            error,
+        };
+        self.env.event_log_manager_ref().add_event_logs(vec![
+            risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
+        ]);
+
         let mut creating_internal_table_ids =
             internal_tables.into_iter().map(|t| t.id).collect_vec();
         // 1. cancel create procedure.
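One ergonomic wrinkle of the new `error: Option<&impl ToString>` parameter: `impl Trait` in argument position cannot be inferred from a bare `None`, so a caller with no error to report must name a concrete type. A self-contained demonstration of both call shapes:

```rust
// Mirrors the parameter shape of cancel_stream_job above.
fn optional_error_to_string(error: Option<&impl ToString>) -> String {
    error.map(ToString::to_string).unwrap_or_default()
}

fn main() {
    // With an error, as both call sites in this patch do via Some(&e):
    assert_eq!(optional_error_to_string(Some(&"boom")), "boom");
    // Without one, a bare None cannot infer the impl type; name it explicitly:
    assert_eq!(optional_error_to_string(None::<&String>), "");
}
```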
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index cb3f2a8807657..693df50f7e7d0 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -64,6 +64,7 @@ use risingwave_pb::hummock::*;
 use risingwave_pb::meta::add_worker_node_request::Property;
 use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
 use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
+use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
 use risingwave_pb::meta::get_reschedule_plan_request::PbPolicy;
 use risingwave_pb::meta::heartbeat_request::{extra_info, ExtraInfo};
 use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
@@ -1189,6 +1190,12 @@ impl MetaClient {
         self.inner.core.read().await.sink_coordinate_client.clone()
     }
 
+    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
+        let req = ListEventLogRequest::default();
+        let resp = self.inner.list_event_log(req).await?;
+        Ok(resp.event_logs)
+    }
+
     pub async fn rise_ctl_list_compact_task_assignment(
         &self,
     ) -> Result<Vec<CompactTaskAssignment>> {
@@ -1392,6 +1399,7 @@ struct GrpcMetaClientCore {
     serving_client: ServingServiceClient<Channel>,
     cloud_client: CloudServiceClient<Channel>,
     sink_coordinate_client: SinkCoordinationRpcClient,
+    event_log_client: EventLogServiceClient<Channel>,
 }
 
 impl GrpcMetaClientCore {
@@ -1416,7 +1424,8 @@ impl GrpcMetaClientCore {
         let system_params_client = SystemParamsServiceClient::new(channel.clone());
         let serving_client = ServingServiceClient::new(channel.clone());
         let cloud_client = CloudServiceClient::new(channel.clone());
-        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel);
+        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
+        let event_log_client = EventLogServiceClient::new(channel);
 
         GrpcMetaClientCore {
             cluster_client,
@@ -1434,6 +1443,7 @@ impl GrpcMetaClientCore {
             serving_client,
             cloud_client,
             sink_coordinate_client,
+            event_log_client,
         }
     }
 }
@@ -1868,6 +1878,7 @@ macro_rules! for_all_meta_rpc {
             ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
             ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
             ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
+            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
         }
     };
 }
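End to end, any rpc_client consumer can now fetch the buffered logs with a single unary call. A sketch of the consumer side:

```rust
use risingwave_rpc_client::MetaClient;

// Sketch: dump all buffered event logs through the new RPC wrapper.
async fn dump_event_logs(meta_client: &MetaClient) -> anyhow::Result<()> {
    for event_log in meta_client.list_event_log().await? {
        println!(
            "{:?} {:?} {:?}",
            event_log.unique_id, event_log.timestamp, event_log.event
        );
    }
    Ok(())
}
```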
diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml
index 4cdb6ad02b364..6d81b4af85e70 100644
--- a/src/stream/Cargo.toml
+++ b/src/stream/Cargo.toml
@@ -25,6 +25,7 @@ bytes = "1"
 educe = "0.4"
 either = "1"
 enum-as-inner = "0.6"
+fail = "0.5"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 futures-async-stream = { workspace = true }
 governor = { version = "0.6", default-features = false, features = [
diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs
index bc7c7f62c00e9..dc8f71eb6ce9f 100644
--- a/src/stream/src/executor/actor.rs
+++ b/src/stream/src/executor/actor.rs
@@ -161,6 +161,11 @@ where
     }
 
     async fn run_consumer(self) -> StreamResult<()> {
+        fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
+            "intentional start_actors_err"
+        )
+        .into()));
+
         let id = self.actor_context.id;
 
         let span_name = format!("Actor {id}");
diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml
index eceab4c434eb4..1f420d371622f 100644
--- a/src/tests/simulation/Cargo.toml
+++ b/src/tests/simulation/Cargo.toml
@@ -18,6 +18,7 @@ cfg-or-panic = "0.2"
 clap = { version = "4", features = ["derive"] }
 console = "0.15"
 etcd-client = { workspace = true }
+fail = { version = "0.5" }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 glob = "0.3"
 itertools = "0.12"
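The failpoint above is inert unless the crate is compiled with `--features fail/failpoints` (wired into the simulation targets in Makefile.toml earlier in this patch) and then armed at runtime. The arm/disarm cycle the integration test below relies on, shown in isolation:

```rust
// Sketch of the failpoint lifecycle used by the test below.
fn with_start_actors_failure(run_ddl: impl FnOnce()) {
    fail::cfg("start_actors_err", "return").unwrap(); // arm: run_consumer now errors
    run_ddl(); // the DDL hits the injected failure and the cluster recovers
    fail::remove("start_actors_err"); // disarm for subsequent statements
}
```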
diff --git a/src/tests/simulation/tests/integration_tests/recovery/event_log.rs b/src/tests/simulation/tests/integration_tests/recovery/event_log.rs
new file mode 100644
index 0000000000000..f2f929c823908
--- /dev/null
+++ b/src/tests/simulation/tests/integration_tests/recovery/event_log.rs
@@ -0,0 +1,219 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![cfg(madsim)]
+
+use std::io::Write;
+use std::time::Duration;
+
+use anyhow::Result;
+use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration, Session};
+
+fn cluster_config() -> Configuration {
+    let config_path = {
+        let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file");
+        file.write_all(
+            "\
+[server]
+telemetry_enabled = false
+metrics_level = \"Disabled\"
+[meta]
+event_log_flush_interval_ms = 10\
+"
+            .as_bytes(),
+        )
+        .expect("failed to write config file");
+        file.into_temp_path()
+    };
+
+    Configuration {
+        config_path: ConfigPath::Temp(config_path.into()),
+        frontend_nodes: 1,
+        compute_nodes: 1,
+        meta_nodes: 1,
+        compactor_nodes: 0,
+        compute_node_cores: 2,
+        etcd_timeout_rate: 0.0,
+        etcd_data_path: None,
+    }
+}
+
+async fn assert_event_count(mut session: Session, expected_count: u64) {
+    let event_type = "CREATE_STREAM_JOB_FAIL";
+    let count = session
+        .run(format!(
+            "select count(*) from rw_event_logs where event_type='{}'",
+            event_type
+        ))
+        .await
+        .unwrap();
+    assert_eq!(count, expected_count.to_string());
+}
+
+async fn assert_latest_event(
+    mut session: Session,
+    name: impl ToString,
+    definition: impl ToString,
+    error: impl ToString,
+) {
+    let event_type = "CREATE_STREAM_JOB_FAIL";
+    let info = session
+        .run(format!(
+            "select info from rw_event_logs where event_type='{}' order by timestamp desc limit 1",
+            event_type
+        ))
+        .await
+        .unwrap();
+    let json = serde_json::from_str::<serde_json::Value>(&info).unwrap();
+    let inner = json.get("createStreamJobFail").unwrap();
+    // The event log shows the details of the failed creation.
+    assert_eq!(
+        inner.get("name").unwrap().as_str().unwrap(),
+        name.to_string(),
+    );
+    assert_eq!(
+        inner
+            .get("definition")
+            .unwrap()
+            .as_str()
+            .unwrap()
+            .to_lowercase(),
+        definition.to_string().to_lowercase()
+    );
+    assert!(inner
+        .get("error")
+        .unwrap()
+        .as_str()
+        .unwrap()
+        .to_lowercase()
+        .find(&error.to_string().to_lowercase())
+        .is_some());
+}
+
+async fn test_create_succ(
+    session: &mut Session,
+    expect_failure_count: &mut u64,
+    create_should_succ: &str,
+) {
+    session.run(create_should_succ).await.unwrap();
+    assert_event_count(session.clone(), *expect_failure_count).await;
+}
+
+async fn test_create_fail(
+    session: &mut Session,
+    expect_failure_count: &mut u64,
+    create_should_fail: &str,
+    name: &str,
+    error: &str,
+) {
+    let fp_start_actors_err = "start_actors_err";
+    fail::cfg(fp_start_actors_err, "return").unwrap();
+    session.run(create_should_fail).await.unwrap_err();
+    fail::remove(fp_start_actors_err);
+    *expect_failure_count += 1;
+    assert_event_count(session.clone(), *expect_failure_count).await;
+    assert_latest_event(session.clone(), name, create_should_fail, error).await;
+}
+
+/// Tests that the event log records info of stream job creation failures, for CREATE TABLE/MV/INDEX/SINK.
+#[tokio::test]
+async fn test_create_stream_job_fail() -> Result<()> {
+    let mut cluster = Cluster::start(cluster_config()).await.unwrap();
+    let mut session = cluster.start_session();
+    const WAIT_RECOVERY_SEC: u64 = 5;
+    let mut expect_failure_count = 0;
+
+    // create table succeeds.
+    test_create_succ(
+        &mut session,
+        &mut expect_failure_count,
+        "create table t1 (c int primary key)",
+    )
+    .await;
+    // create table fails due to injected failure and subsequent recovery.
+    test_create_fail(
+        &mut session,
+        &mut expect_failure_count,
+        "create table t2 (c int primary key)",
+        "t2",
+        "intentional start_actors_err",
+    )
+    .await;
+
+    // wait for cluster recovery.
+    tokio::time::sleep(Duration::from_secs(WAIT_RECOVERY_SEC)).await;
+    // create table with source succeeds.
+    test_create_succ(
+        &mut session,
+        &mut expect_failure_count,
+        "create table ts1 (c int primary key) with (connector = 'datagen', datagen.rows.per.second = '1') format native encode native",
+    ).await;
+    test_create_fail(
+        &mut session,
+        &mut expect_failure_count,
+        "create table ts2 (c int primary key) with (connector = 'datagen', datagen.rows.per.second = '1') format native encode native",
+        "ts2",
+        "intentional start_actors_err",
+    ).await;
+
+    tokio::time::sleep(Duration::from_secs(WAIT_RECOVERY_SEC)).await;
+    // create mv succeeds.
+    test_create_succ(
+        &mut session,
+        &mut expect_failure_count,
+        "create materialized view mv1 as select * from t1",
+    )
+    .await;
+    test_create_fail(
+        &mut session,
+        &mut expect_failure_count,
+        "create materialized view mv2 as select * from t1",
+        "mv2",
+        "intentional start_actors_err",
+    )
+    .await;
+
+    tokio::time::sleep(Duration::from_secs(WAIT_RECOVERY_SEC)).await;
+    // create sink succeeds.
+    test_create_succ(
+        &mut session,
+        &mut expect_failure_count,
+        "create sink s1 as select * from t1 with (connector = 'blackhole', type = 'append-only', force_append_only = 'true')",
+    ).await;
+    test_create_fail(
+        &mut session,
+        &mut expect_failure_count,
+        "create sink s2 as select * from t1 with (connector = 'blackhole', type = 'append-only', force_append_only = 'true')",
+        "s2",
+        "intentional start_actors_err",
+    ).await;
+
+    tokio::time::sleep(Duration::from_secs(WAIT_RECOVERY_SEC)).await;
+    // create index succeeds.
+    test_create_succ(
+        &mut session,
+        &mut expect_failure_count,
+        "create index i1 on t1(c)",
+    )
+    .await;
+    test_create_fail(
+        &mut session,
+        &mut expect_failure_count,
+        "create index i2 on t1(c)",
+        "i2",
+        "intentional start_actors_err",
+    )
+    .await;
+
+    Ok(())
+}
diff --git a/src/tests/simulation/tests/integration_tests/recovery/mod.rs b/src/tests/simulation/tests/integration_tests/recovery/mod.rs
index d3c5572c8dea0..186c9e2f7345f 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/mod.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/mod.rs
@@ -14,6 +14,7 @@
 
 mod backfill;
 mod background_ddl;
+mod event_log;
 mod nexmark_recovery;
 mod pause_on_bootstrap;
 mod scale_in_when_recovery;
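For reference, the `info` JSONB asserted by the test serializes the whole `EventLog`, with the `oneof` flattened into a camelCase key, which is why `assert_latest_event` indexes `createStreamJobFail` directly. An illustrative value (the `uniqueId` and `timestamp` contents are made up; the camelCase rendering of `unique_id` is an assumption based on the test's key naming):

```rust
// Illustrative shape only, matching what the assertions above read back.
fn example_info() -> serde_json::Value {
    serde_json::json!({
        "uniqueId": "4c1d…",          // hypothetical uuid
        "timestamp": 1700000000000u64, // epoch millis
        "createStreamJobFail": {
            "id": 1002,
            "name": "t2",
            "definition": "create table t2 (c int primary key)",
            "error": "intentional start_actors_err"
        }
    })
}
```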