diff --git a/api/src/effect.rs b/api/src/effect.rs index a1f6f78d..178d4f88 100644 --- a/api/src/effect.rs +++ b/api/src/effect.rs @@ -289,6 +289,11 @@ impl Effect { self.timestamp } + /// Cancel current dispatched event. + pub fn cancel_dispatch(&mut self) { + self.cancel_dispatch = true; + } + /// Dispatch waiting timeout event after certain milliseconds. pub fn wait_timeout(&mut self, timeout: u64) { self.wait_timeout = Some(timeout); diff --git a/api/src/event.rs b/api/src/event.rs index 4afe2608..dc88bbe2 100644 --- a/api/src/event.rs +++ b/api/src/event.rs @@ -136,7 +136,7 @@ impl std::fmt::Display for Event { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Event::Custom { sender, raw } => write!(f, "Custom from {}, inner: {:?}", sender, raw), - Event::Bridge { dest, raw } => write!(f, "Bridge to {}, inner: {:?}", dest, raw), + Event::Bridge { dest, raw } => write!(f, "Bridge to {}, inner: [{}...]", dest, raw[0]), Event::Ready => write!(f, "Ready"), Event::ShareSecrets { sender, shares } => { let repr = shares diff --git a/core/src/types/broadcast_frame.rs b/core/src/types/broadcast_frame.rs index a57161b6..afa53829 100644 --- a/core/src/types/broadcast_frame.rs +++ b/core/src/types/broadcast_frame.rs @@ -36,7 +36,7 @@ pub enum BroadcastFrame { game_addr: String, event: Event, timestamp: u64, - is_history: bool, + remain: u16, }, // Arbitrary message Message { diff --git a/dev/scripts/create-mtt-game.sh b/dev/scripts/create-mtt-game.sh index 67e54d58..8f18a491 100644 --- a/dev/scripts/create-mtt-game.sh +++ b/dev/scripts/create-mtt-game.sh @@ -12,8 +12,7 @@ DATA=$(cd ./js/borsh; npx ts-node ./bin/cli.ts \ -u8 50 \ -u8 30 \ -u8 20 \ - -u8 1 \ - -s GvMiN5CouVeELHbvyzZXfAqa21TU2rMSRTdiHwCHUTz5) + -u8 0) echo "DATA is $DATA" JSON=$(cat < { console.log('Initialize state with', initAccount); await this.__handler.initState(this.__gameContext, initAccount); + console.log('State initialized'); await this.__invokeEventCallback(undefined, true); } async __getGameAccount(): Promise { let retries = 0; while (true) { + if (retries === MAX_RETRIES) { + throw new Error(`Game account not found, after ${retries} retries`); + } try { const gameAccount = await this.__transport.getGameAccount(this.gameAddr); - if (gameAccount === undefined) continue; + if (gameAccount === undefined) { + retries += 1; + continue; + } return gameAccount; } catch (e: any) { console.warn(e, 'Failed to fetch game account, will retry in 3s'); await new Promise(r => setTimeout(r, 3000)); - if (retries === MAX_RETRIES) { - throw new Error(`Game account not found, after ${retries} retries`); - } else { - retries += 1; - continue; - } + retries += 1; + continue; } } } @@ -285,23 +290,12 @@ export class BaseClient { this.__gameContext.prepareForNextEvent(timestamp); try { let context = new GameContext(this.__gameContext); - if (event instanceof Join) { - while (true) { - let gameAccount = await this.__transport.getGameAccount(this.__gameAddr); - if (gameAccount === undefined) { - console.warn('Failed to get game account, will retry'); - await new Promise(r => setTimeout(r, 3000)); - continue; - } - break; - } - } await this.__handler.handleEvent(context, event); this.__gameContext = context; } catch (err: any) { console.error(err); } - await this.__invokeEventCallback(event, frame.isHistory); + await this.__invokeEventCallback(event, frame.remain !== 0); } catch (e: any) { console.log("Game context in error:", this.__gameContext); throw e; diff --git a/js/sdk-core/src/connection.ts b/js/sdk-core/src/connection.ts index f94f6f1d..8116a4a9 100644 --- a/js/sdk-core/src/connection.ts +++ b/js/sdk-core/src/connection.ts @@ -1,7 +1,7 @@ import { IEncryptor, PublicKeyRaws } from './encryptor'; import { TxState } from './tx-state'; import { GameEvent } from './events'; -import { deserialize, array, enums, field, option, serialize, struct, variant } from '@race-foundation/borsh'; +import { deserialize, array, enums, field, serialize, struct, variant } from '@race-foundation/borsh'; import { arrayBufferToBase64, base64ToUint8Array } from './utils'; import { PlayerJoin, ServerJoin } from './accounts'; @@ -26,11 +26,6 @@ interface ISubmitMessageParams { content: string; } -interface ICheckTxStateParams { - newPlayers: string[]; - accessVersion: bigint; -} - export type ConnectionSubscriptionItem = BroadcastFrame | ConnectionState | undefined; export type ConnectionSubscription = AsyncGenerator; @@ -95,8 +90,8 @@ export class BroadcastFrameEvent extends BroadcastFrame { event!: GameEvent; @field('u64') timestamp!: bigint; - @field('bool') - isHistory!: boolean; + @field('u16') + remain!: number; constructor(fields: any) { super(); Object.assign(this, fields); @@ -222,7 +217,7 @@ export class Connection implements IConnection { } async connect(params: SubscribeEventParams) { - console.log('Establishing server connection, settle version:', params.settleVersion); + console.log(`Establishing server connection, target: ${this.target}, settle version: ${params.settleVersion}`) this.socket = new WebSocket(this.endpoint); this.clearCheckTimer(); diff --git a/js/sdk-core/src/game-context.ts b/js/sdk-core/src/game-context.ts index 531f0bbe..fee61c65 100644 --- a/js/sdk-core/src/game-context.ts +++ b/js/sdk-core/src/game-context.ts @@ -492,6 +492,6 @@ export class GameContext { } findSubGame(subId: number): LaunchSubGame | undefined { - return this.launchSubGames.find(g => g.subId === subId); + return this.launchSubGames.find(g => g.subId === Number(subId)); } } diff --git a/js/sdk-core/src/handler.ts b/js/sdk-core/src/handler.ts index 55e09067..9debbf2a 100644 --- a/js/sdk-core/src/handler.ts +++ b/js/sdk-core/src/handler.ts @@ -49,6 +49,7 @@ export class InitAccount { this.checkpoint = fields.checkpoint; this.entryType = fields.entryType; } + static createFromGameAccount( gameAccount: GameAccount, transactorAccessVersion: bigint, diff --git a/js/sdk-core/src/sub-client.ts b/js/sdk-core/src/sub-client.ts index 288858c4..694bae99 100644 --- a/js/sdk-core/src/sub-client.ts +++ b/js/sdk-core/src/sub-client.ts @@ -1,10 +1,10 @@ import { BaseClient } from './base-client'; import { Client } from './client'; -import { IConnection } from './connection'; +import { IConnection, SubscribeEventParams } from './connection'; import { DecryptionCache } from './decryption-cache'; import { IEncryptor } from './encryptor'; import { GameContext } from './game-context'; -import { Handler } from './handler'; +import { Handler, IInitAccount, InitAccount } from './handler'; import { ITransport } from './transport'; import { GameInfo, ConnectionStateCallbackFunction, EventCallbackFunction, MessageCallbackFunction, TxStateCallbackFunction } from './types'; import { IWallet } from './wallet'; @@ -25,21 +25,44 @@ export type SubClientCtorOpts = { encryptor: IEncryptor; info: GameInfo; decryptionCache: DecryptionCache; + initAccount: InitAccount; } export class SubClient extends BaseClient { - #subId: number; + __subId: number; + __initAccount: InitAccount; constructor(opts: SubClientCtorOpts) { super({ onLoadProfile: (_id: bigint, _addr: string) => {}, ...opts }) - this.#subId = opts.subId; + this.__subId = opts.subId; + this.__initAccount = opts.initAccount; } get subId(): number { - return this.#subId; + return this.__subId; + } + + /** + * Connect to the transactor and retrieve the event stream. + */ + async attachGame() { + console.groupCollapsed('Attach to game'); + let sub; + try { + await this.__client.attachGame(); + sub = this.__connection.subscribeEvents(); + await this.__connection.connect(new SubscribeEventParams({ settleVersion: this.__gameContext.settleVersion })); + await this.__initializeState(this.__initAccount); + } catch (e) { + console.error('Attaching game failed', e); + throw e; + } finally { + console.groupEnd(); + } + await this.__processSubscription(sub); } } diff --git a/transactor/src/component.rs b/transactor/src/component.rs index 80768131..299db106 100644 --- a/transactor/src/component.rs +++ b/transactor/src/component.rs @@ -16,6 +16,7 @@ pub use event_bus::CloseReason; pub use broadcaster::Broadcaster; pub use common::Component; pub use common::PortsHandle; +pub use common::ComponentEnv; pub use connection::{LocalConnection, RemoteConnection}; pub use event_bus::EventBus; pub use event_loop::EventLoop; diff --git a/transactor/src/component/broadcaster.rs b/transactor/src/component/broadcaster.rs index 1d68a826..32575ed1 100644 --- a/transactor/src/component/broadcaster.rs +++ b/transactor/src/component/broadcaster.rs @@ -13,7 +13,7 @@ use tracing::{debug, error}; use crate::component::common::{Component, ConsumerPorts}; use crate::frame::EventFrame; -use super::CloseReason; +use super::{CloseReason, ComponentEnv}; /// Backup events in memeory, for new connected clients. The /// `settle_version` and `access_version` are the values at the time @@ -88,12 +88,15 @@ impl Broadcaster { histories.push(BroadcastFrame::Sync { sync: group.sync.clone() }); + let cnt = group.events.len(); + let mut i = cnt as _; for event in group.events.iter() { + i -= 1; histories.push(BroadcastFrame::Event { game_addr: self.id.clone(), event: event.event.clone(), timestamp: event.timestamp, - is_history: true, + remain: i, }) } } @@ -105,11 +108,11 @@ impl Broadcaster { #[async_trait] impl Component for Broadcaster { - fn name(&self) -> &str { + fn name() -> &'static str { "Broadcaster" } - async fn run(mut ports: ConsumerPorts, ctx: BroadcasterContext) -> CloseReason { + async fn run(mut ports: ConsumerPorts, ctx: BroadcasterContext, env: ComponentEnv) -> CloseReason { while let Some(event) = ports.recv().await { match event { EventFrame::SendMessage { message } => { @@ -120,7 +123,7 @@ impl Component for Broadcaster { if let Err(e) = r { // Usually it means no receivers - debug!("Failed to broadcast event: {:?}", e); + debug!("{} Failed to broadcast event: {:?}", env.log_prefix, e); } } EventFrame::Checkpoint { @@ -147,21 +150,21 @@ impl Component for Broadcaster { let r = ctx.broadcast_tx.send(BroadcastFrame::TxState { tx_state }); if let Err(e) = r { - debug!("Failed to broadcast event: {:?}", e); + debug!("{} Failed to broadcast event: {:?}", env.log_prefix, e); } } TxState::PlayerConfirming { .. } => { let r = ctx.broadcast_tx.send(BroadcastFrame::TxState { tx_state }); if let Err(e) = r { - debug!("Failed to broadcast event: {:?}", e); + debug!("{} Failed to broadcast event: {:?}", env.log_prefix, e); } } TxState::PlayerConfirmingFailed(_) => { let r = ctx.broadcast_tx.send(BroadcastFrame::TxState { tx_state }); if let Err(e) = r { - debug!("Failed to broadcast event: {:?}", e); + debug!("{} Failed to broadcast event: {:?}", env.log_prefix, e); } } }, @@ -171,7 +174,7 @@ impl Component for Broadcaster { settle_version, timestamp, } => { - debug!("Broadcaster receive event: {}", event); + debug!("{} Broadcaster receive event: {}", env.log_prefix, event); let mut event_backup_groups = ctx.event_backup_groups.lock().await; if let Some(current) = event_backup_groups.back_mut() { @@ -182,7 +185,7 @@ impl Component for Broadcaster { timestamp, }); } else { - error!("Received event without checkpoint"); + error!("{} Received event without checkpoint", env.log_prefix); } // Keep 10 groups at most if event_backup_groups.len() > 10 { @@ -194,12 +197,12 @@ impl Component for Broadcaster { game_addr: ctx.id.clone(), event, timestamp, - is_history: false, + remain: 0, }); if let Err(e) = r { // Usually it means no receivers - debug!("Failed to broadcast event: {:?}", e); + debug!("{} Failed to broadcast event: {:?}", env.log_prefix, e); } } EventFrame::Sync { @@ -225,7 +228,7 @@ impl Component for Broadcaster { let r = ctx.broadcast_tx.send(broadcast_frame); if let Err(e) = r { - debug!("Failed to broadcast node updates: {:?}", e); + debug!("{} Failed to broadcast node updates: {:?}", env.log_prefix, e); } } EventFrame::Shutdown => { @@ -255,7 +258,7 @@ mod tests { .build(); let (broadcaster, ctx) = Broadcaster::init(game_account.addr.clone()); - let handle = broadcaster.start(ctx); + let handle = broadcaster.start("", ctx); let mut rx = broadcaster.get_broadcast_rx(); // BroadcastFrame::Event @@ -277,7 +280,7 @@ mod tests { sender: alice.id(), raw: "CUSTOM EVENT".into(), }, - is_history: false, + remain: 0, }; handle.send_unchecked(event_frame).await; @@ -294,7 +297,7 @@ mod tests { balance: 100, access_version: 10, verify_key: "alice".into(), - }], + }.into()], access_version: 10, }; let event_frame = EventFrame::TxState { diff --git a/transactor/src/component/common.rs b/transactor/src/component/common.rs index 5eacae2c..70e274aa 100644 --- a/transactor/src/component/common.rs +++ b/transactor/src/component/common.rs @@ -5,9 +5,9 @@ use tokio::{ }; use tracing::{info, warn}; -use crate::frame::EventFrame; +use crate::{frame::EventFrame, utils::addr_shorthand}; -use super::event_bus::CloseReason; +use super::{event_bus::CloseReason}; /// An interface for a component that can be attached to the event bus. pub trait Attachable { @@ -30,16 +30,16 @@ pub struct PortsHandleInner { } pub struct PortsHandle { - pub name: String, + pub id: String, input_tx: Option>, output_rx: Option>, join_handle: JoinHandle, } impl PortsHandle { - fn from_inner>(name: S, value: PortsHandleInner, join_handle: JoinHandle) -> Self { + fn from_inner>(id: S, value: PortsHandleInner, join_handle: JoinHandle) -> Self { Self { - name: name.into(), + id: id.into(), input_tx: value.input_tx, output_rx: value.output_rx, join_handle, @@ -73,7 +73,7 @@ impl PortsHandle { impl Attachable for PortsHandle { fn id(&self) -> &str { - self.name.as_str() + self.id.as_str() } fn input(&mut self) -> Option> { @@ -214,19 +214,31 @@ impl Ports for PipelinePorts { } } +pub struct ComponentEnv { + pub log_prefix: String, +} + +impl ComponentEnv { + pub fn new(addr: &str, component_name: &str) -> Self { + let addr_short = addr_shorthand(addr); + Self { log_prefix: format!("[{}|{}]", addr_short, component_name)} + } +} + #[async_trait] pub trait Component where P: Ports + 'static, C: Send + 'static, { - fn name(&self) -> &str; + fn name() -> &'static str; - fn start(&self, context: C) -> PortsHandle { - info!("Starting component: {}", self.name()); + fn start(&self, addr: &str, context: C) -> PortsHandle { + info!("Starting component: {}", Self::name()); let (ports, ports_handle_inner) = P::create(); - let join_handle = tokio::spawn(async move { Self::run(ports, context).await }); - PortsHandle::from_inner(self.name(), ports_handle_inner, join_handle) + let env = ComponentEnv::new(addr, Self::name()); + let join_handle = tokio::spawn(async move { Self::run(ports, context, env).await }); + PortsHandle::from_inner(Self::name(), ports_handle_inner, join_handle) } - async fn run(ports: P, context: C) -> CloseReason; + async fn run(ports: P, context: C, env: ComponentEnv) -> CloseReason; } diff --git a/transactor/src/component/connection.rs b/transactor/src/component/connection.rs index 1a10aecb..6821d679 100644 --- a/transactor/src/component/connection.rs +++ b/transactor/src/component/connection.rs @@ -20,15 +20,13 @@ use jsonrpsee::{ rpc_params, ws_client::{WsClient, WsClientBuilder}, }; +use race_api::error::{Error, Result}; +use race_core::types::{BroadcastFrame, SubscribeEventParams}; use race_core::{ connection::ConnectionT, encryptor::EncryptorT, types::{AttachGameParams, ExitGameParams, SubmitEventParams}, }; -use race_api::error::{Error, Result}; -use race_core::{ - types::{BroadcastFrame, SubscribeEventParams}, -}; use crate::frame::EventFrame; use crate::utils::base64_decode; diff --git a/transactor/src/component/event_bridge.rs b/transactor/src/component/event_bridge.rs index e5cc4e76..eff6fb13 100644 --- a/transactor/src/component/event_bridge.rs +++ b/transactor/src/component/event_bridge.rs @@ -1,11 +1,11 @@ +use crate::frame::{EventFrame, SignalFrame}; ///! The component to bridge two event buses, typically to be used ///! between the parent game and the sub games. use async_trait::async_trait; use tokio::sync::{broadcast, mpsc}; +use tracing::{info, log::error}; -use crate::frame::{EventFrame, SignalFrame}; - -use super::{common::PipelinePorts, CloseReason, Component}; +use super::{common::PipelinePorts, CloseReason, Component, ComponentEnv}; #[allow(dead_code)] pub struct EventBridgeParentContext { @@ -90,32 +90,56 @@ impl EventBridgeParent { #[async_trait] impl Component for EventBridgeParent { - fn name(&self) -> &str { + fn name() -> &'static str { "Event Bridge (Parent)" } - async fn run(mut ports: PipelinePorts, mut ctx: EventBridgeParentContext) -> CloseReason { + async fn run( + mut ports: PipelinePorts, + mut ctx: EventBridgeParentContext, + env: ComponentEnv, + ) -> CloseReason { while let Some((from_bridge, event_frame)) = Self::read_event(&mut ports, &mut ctx.rx).await { if from_bridge { - ports.send(event_frame).await; + match event_frame { + EventFrame::SendBridgeEvent { dest, event } => { + info!("{} Receives event: {}", env.log_prefix, event); + ports + .send(EventFrame::RecvBridgeEvent { dest, event }) + .await; + } + _ => (), + } } else { match event_frame { EventFrame::LaunchSubGame { spec } => { let f = SignalFrame::LaunchSubGame { spec: *spec }; if let Err(e) = ctx.signal_tx.send(f).await { - println!("Failed to send: {}", e); + error!("{} Failed to send: {}", env.log_prefix, e); } } EventFrame::Shutdown => { - if let Err(e) = ctx.tx.send(event_frame) { - println!("Failed to send: {}", e); + if !ctx.tx.is_empty() { + info!("{} Sends Shutdown", env.log_prefix); + if let Err(e) = ctx.tx.send(event_frame) { + error!("{} Failed to send: {}", env.log_prefix, e); + } } break; } - EventFrame::BridgeEvent { .. } | EventFrame::Sync { .. } => { + EventFrame::SendBridgeEvent { dest, .. } if dest != 0 => { + info!("{} Sends event: {}", env.log_prefix, event_frame); if let Err(e) = ctx.tx.send(event_frame) { - println!("Failed to send: {}", e); + error!("{} Failed to send: {}", env.log_prefix, e); + } + } + EventFrame::Sync { .. } => { + if !ctx.tx.is_empty() { + info!("{} Sends event: {}", env.log_prefix, event_frame); + if let Err(e) = ctx.tx.send(event_frame) { + error!("{} Failed to send: {}", env.log_prefix, e); + } } } _ => continue, @@ -156,21 +180,41 @@ impl EventBridgeChild { #[async_trait] impl Component for EventBridgeChild { - fn name(&self) -> &str { + fn name() -> &'static str { "Event Bridge (Child)" } - async fn run(mut ports: PipelinePorts, mut ctx: EventBridgeChildContext) -> CloseReason { + async fn run( + mut ports: PipelinePorts, + mut ctx: EventBridgeChildContext, + env: ComponentEnv, + ) -> CloseReason { while let Some((from_bridge, event_frame)) = Self::read_event(&mut ports, &mut ctx.rx).await { if from_bridge { - ports.send(event_frame).await; + match event_frame { + EventFrame::Shutdown => { + info!("{} Receives Shutdown, quit", env.log_prefix); + ports.send(event_frame).await; + break; + } + EventFrame::Sync { .. } => { + info!("{} Receives event: {}", env.log_prefix, event_frame); + ports.send(event_frame).await; + } + EventFrame::SendBridgeEvent { dest, event } if dest == ctx.sub_id => { + info!("{} Receives event: {}", env.log_prefix, event); + ports.send(EventFrame::RecvBridgeEvent { dest, event }).await; + } + _ => {} + } } else { match event_frame { EventFrame::Shutdown => break, - EventFrame::BridgeEvent { .. } => { + EventFrame::SendBridgeEvent { dest, .. } if dest != ctx.sub_id => { + info!("{} Sends event: {}", env.log_prefix, event_frame); if let Err(e) = ctx.tx.send(event_frame).await { - println!("Failed to send: {:?}", e); + error!("{} Failed to send: {}", env.log_prefix, e); } } _ => continue, diff --git a/transactor/src/component/event_bus.rs b/transactor/src/component/event_bus.rs index 0621f367..3ad17b8d 100644 --- a/transactor/src/component/event_bus.rs +++ b/transactor/src/component/event_bus.rs @@ -6,15 +6,52 @@ use tracing::{error, warn}; use crate::component::common::Attachable; use crate::frame::EventFrame; +use crate::utils::addr_shorthand; /// An event bus that passes the events between different components. pub struct EventBus { + #[allow(unused)] + addr: String, tx: mpsc::Sender, attached_txs: Arc)>>>, close_rx: watch::Receiver, } impl EventBus { + pub fn new(addr: String) -> Self { + let (close_tx, close_rx) = watch::channel(false); + let (tx, mut rx) = mpsc::channel::(32); + let txs: Arc)>>> = Arc::new(Mutex::new(vec![])); + let attached_txs = txs.clone(); + let addr_1 = addr_shorthand(&addr); + + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + let txs = attached_txs.lock().await; + for (id, t) in txs.iter() { + if t.send(msg.clone()).await.is_err() { + warn!( + "[{}] Failed to send message: {} to component: {}", + addr_1, + msg, + id + ); + } + } + if matches!(msg, EventFrame::Shutdown) { + close_tx.send(true).unwrap(); + break; + } + } + }); + Self { + addr, + tx, + attached_txs: txs, + close_rx, + } + } + pub async fn attach(&self, attachable: &mut T) where T: Attachable, @@ -64,30 +101,7 @@ impl EventBus { impl Default for EventBus { fn default() -> Self { - let (close_tx, close_rx) = watch::channel(false); - let (tx, mut rx) = mpsc::channel::(32); - let txs: Arc)>>> = Arc::new(Mutex::new(vec![])); - let attached_txs = txs.clone(); - - tokio::spawn(async move { - while let Some(msg) = rx.recv().await { - let txs = attached_txs.lock().await; - for (id, t) in txs.iter() { - if t.send(msg.clone()).await.is_err() { - warn!("Failed to send message: {} to component: {}", msg, id); - } - } - if matches!(msg, EventFrame::Shutdown) { - close_tx.send(true).unwrap(); - break; - } - } - }); - Self { - tx, - attached_txs: txs, - close_rx, - } + Self::new("".to_string()) } } diff --git a/transactor/src/component/event_loop.rs b/transactor/src/component/event_loop.rs index a515f81d..4463fa4c 100644 --- a/transactor/src/component/event_loop.rs +++ b/transactor/src/component/event_loop.rs @@ -11,9 +11,10 @@ use crate::component::common::{Component, PipelinePorts}; use crate::component::event_bus::CloseReason; use crate::component::wrapped_handler::WrappedHandler; use crate::frame::EventFrame; -use crate::utils::addr_shorthand; use race_core::types::{ClientMode, GameAccount, GamePlayer, SubGameSpec}; +use super::ComponentEnv; + fn log_execution_context(ctx: &GameContext, evt: &Event) { info!("Execution context"); info!("===== State ====="); @@ -43,12 +44,9 @@ async fn handle( event: Event, ports: &PipelinePorts, mode: ClientMode, + env: &ComponentEnv, ) -> Option { - info!( - "{} Handle event: {}", - addr_shorthand(game_context.get_game_addr()), - event - ); + info!("{} Handle event: {}", env.log_prefix, event); let access_version = game_context.get_access_version(); let settle_version = game_context.get_settle_version(); @@ -92,8 +90,7 @@ async fn handle( info!( "{} Send settlements, settle_version: {}", - addr_shorthand(game_context.get_game_addr()), - settle_version + env.log_prefix, settle_version ); ports @@ -125,7 +122,7 @@ async fn handle( // Emit bridge events if mode == ClientMode::Transactor { for be in effects.bridge_events { - let ef = EventFrame::BridgeEvent { + let ef = EventFrame::SendBridgeEvent { dest: be.dest, event: Event::Bridge { dest: be.dest, @@ -137,11 +134,7 @@ async fn handle( } } Err(e) => { - warn!( - "{} Handle event error: {}", - addr_shorthand(game_context.get_game_addr()), - e.to_string() - ); + warn!("{} Handle event error: {}", env.log_prefix, e.to_string()); log_execution_context(game_context, &event); match e { Error::WasmExecutionError(_) | Error::WasmMemoryOverflow => { @@ -190,11 +183,15 @@ async fn read_event( #[async_trait] impl Component for EventLoop { - fn name(&self) -> &str { + fn name() -> &'static str { "Event Loop" } - async fn run(mut ports: PipelinePorts, ctx: EventLoopContext) -> CloseReason { + async fn run( + mut ports: PipelinePorts, + ctx: EventLoopContext, + env: ComponentEnv, + ) -> CloseReason { let mut handler = ctx.handler; let mut game_context = ctx.game_context; @@ -209,22 +206,20 @@ impl Component for EventLoop { if let Err(e) = game_context .apply_checkpoint(init_account.access_version, init_account.settle_version) { - error!("Failed to apply checkpoint: {:?}", e); + error!("{} Failed to apply checkpoint: {:?}", env.log_prefix, e); ports.send(EventFrame::Shutdown).await; return CloseReason::Fault(e); } if let Err(e) = handler.init_state(&mut game_context, &init_account) { - error!("Failed to initialize state: {:?}", e); + error!("{} Failed to initialize state: {:?}", env.log_prefix, e); ports.send(EventFrame::Shutdown).await; return CloseReason::Fault(e); } info!( "{} Initialize game state, access_version = {}, settle_version = {}", - addr_shorthand(&init_account.addr), - init_account.access_version, - init_account.settle_version + env.log_prefix, init_account.access_version, init_account.settle_version ); game_context.dispatch_safe(Event::Ready, 0); @@ -241,7 +236,7 @@ impl Component for EventLoop { if ctx.mode == ClientMode::Transactor { let event = Event::GameStart; if let Some(close_reason) = - handle(&mut handler, &mut game_context, event, &ports, ctx.mode).await + handle(&mut handler, &mut game_context, event, &ports, ctx.mode, &env).await { ports.send(EventFrame::Shutdown).await; return close_reason; @@ -256,7 +251,8 @@ impl Component for EventLoop { transactor_addr, } => { info!( - "Event loop handle Sync, access_version: {:?}", + "{} handle Sync, access_version: {:?}", + env.log_prefix, access_version ); game_context.set_access_version(access_version); @@ -283,7 +279,7 @@ impl Component for EventLoop { players: new_players_1, }; if let Some(close_reason) = - handle(&mut handler, &mut game_context, event, &ports, ctx.mode).await + handle(&mut handler, &mut game_context, event, &ports, ctx.mode, &env).await { ports.send(EventFrame::Shutdown).await; return close_reason; @@ -294,18 +290,21 @@ impl Component for EventLoop { if let Ok(player_id) = game_context.addr_to_id(&player_addr) { let event = Event::Leave { player_id }; if let Some(close_reason) = - handle(&mut handler, &mut game_context, event, &ports, ctx.mode).await + handle(&mut handler, &mut game_context, event, &ports, ctx.mode, &env).await { ports.send(EventFrame::Shutdown).await; return close_reason; } } else { - error!("Ignore PlayerLeaving, due to can not map the address to id"); + error!( + "{} Ignore PlayerLeaving, due to can not map the address to id", + env.log_prefix + ); } } - EventFrame::SendEvent { event } => { + EventFrame::SendEvent { event } | EventFrame::RecvBridgeEvent { event, .. } => { if let Some(close_reason) = - handle(&mut handler, &mut game_context, event, &ports, ctx.mode).await + handle(&mut handler, &mut game_context, event, &ports, ctx.mode, &env).await { ports.send(EventFrame::Shutdown).await; return close_reason; @@ -317,14 +316,14 @@ impl Component for EventLoop { ports.send(EventFrame::Shutdown).await; return CloseReason::Complete; } else if let Some(close_reason) = - handle(&mut handler, &mut game_context, event, &ports, ctx.mode).await + handle(&mut handler, &mut game_context, event, &ports, ctx.mode, &env).await { ports.send(EventFrame::Shutdown).await; return close_reason; } } EventFrame::Shutdown => { - warn!("Shutdown event loop"); + warn!("{} Shutdown event loop", env.log_prefix); return CloseReason::Complete; } _ => (), diff --git a/transactor/src/component/submitter.rs b/transactor/src/component/submitter.rs index 2c98f8a0..f5ba8009 100644 --- a/transactor/src/component/submitter.rs +++ b/transactor/src/component/submitter.rs @@ -13,6 +13,7 @@ use crate::component::event_bus::CloseReason; use crate::frame::EventFrame; use race_core::transport::TransportT; +use super::ComponentEnv; use super::common::PipelinePorts; /// Squash two settles into one. @@ -94,11 +95,11 @@ impl Submitter { #[async_trait] impl Component for Submitter { - fn name(&self) -> &str { + fn name() -> &'static str { "Submitter" } - async fn run(mut ports: PipelinePorts, ctx: SubmitterContext) -> CloseReason { + async fn run(mut ports: PipelinePorts, ctx: SubmitterContext, env: ComponentEnv) -> CloseReason { let (queue_tx, mut queue_rx) = mpsc::channel::(32); let p = ports.clone_as_producer(); // Start a task to handle settlements @@ -152,7 +153,8 @@ impl Component for Submitter { .await; if let Err(e) = res { error!( - "Submitter failed to send settle to task queue: {}", + "{} Submitter failed to send settle to task queue: {}", + env.log_prefix, e.to_string() ); } diff --git a/transactor/src/component/subscriber.rs b/transactor/src/component/subscriber.rs index f2b7ed3e..cdd6a800 100644 --- a/transactor/src/component/subscriber.rs +++ b/transactor/src/component/subscriber.rs @@ -15,6 +15,7 @@ use tracing::warn; use crate::frame::EventFrame; +use super::ComponentEnv; use super::common::{Component, ProducerPorts}; use super::{event_bus::CloseReason, RemoteConnection}; @@ -53,11 +54,11 @@ impl Subscriber { #[async_trait] impl Component for Subscriber { - fn name(&self) -> &str { + fn name() -> &'static str { "Subscriber" } - async fn run(ports: ProducerPorts, ctx: SubscriberContext) -> CloseReason { + async fn run(ports: ProducerPorts, ctx: SubscriberContext, env: ComponentEnv) -> CloseReason { let SubscriberContext { game_addr, server_addr: _, @@ -77,8 +78,8 @@ impl Component for Subscriber { Err(e) => { if retries == 3 { error!( - "Failed to subscribe events: {}. Vote on the transactor {} has dropped", - e, transactor_addr + "{} Failed to subscribe events: {}. Vote on the transactor {} has dropped", + env.log_prefix, e, transactor_addr ); ports @@ -88,10 +89,10 @@ impl Component for Subscriber { }) .await; - warn!("Shutdown subscriber"); + warn!("{} Shutdown subscriber", env.log_prefix); return CloseReason::Complete; } else { - error!("Failed to subscribe events: {}, will retry", e); + error!("{} Failed to subscribe events: {}, will retry", env.log_prefix, e); retries += 1; continue; } @@ -99,14 +100,14 @@ impl Component for Subscriber { } }; - info!("Subscription established"); + info!("{} Subscription established", env.log_prefix); pin_mut!(sub); while let Some(frame) = sub.next().await { match frame { // Forward event to event bus BroadcastFrame::Event { event, .. } => { - info!("Receive event: {}", event); + info!("{} Receive event: {}", env.log_prefix, event); if let Err(e) = ports.try_send(EventFrame::SendServerEvent { event }).await { error!("Send server event error: {}", e); break; @@ -115,7 +116,7 @@ impl Component for Subscriber { BroadcastFrame::Sync { sync} => { let BroadcastSync { new_players, new_servers, access_version, transactor_addr } = sync; - info!("Receive Sync broadcast, new_players: {:?}, new_servers: {:?}", new_players, new_servers); + info!("{} Receive Sync broadcast, new_players: {:?}, new_servers: {:?}", env.log_prefix, new_players, new_servers); if let Err(e) = ports .try_send(EventFrame::Sync { new_players, @@ -125,7 +126,7 @@ impl Component for Subscriber { }) .await { - error!("Send update node error: {}", e); + error!("{} Send update node error: {}", env.log_prefix, e); break; } } @@ -138,7 +139,7 @@ impl Component for Subscriber { } } - warn!("Vote for disconnecting"); + warn!("{} Vote for disconnecting", env.log_prefix); ports .send(EventFrame::Vote { votee: transactor_addr, diff --git a/transactor/src/component/synchronizer.rs b/transactor/src/component/synchronizer.rs index 49a4462a..c99c4165 100644 --- a/transactor/src/component/synchronizer.rs +++ b/transactor/src/component/synchronizer.rs @@ -3,10 +3,10 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use tokio::time::sleep; -use crate::{frame::EventFrame, utils::addr_shorthand}; +use crate::frame::EventFrame; use race_core::{ transport::TransportT, - types::{GameAccount, PlayerJoin, QueryMode, ServerJoin, TxState, ConfirmingPlayer}, + types::{ConfirmingPlayer, GameAccount, PlayerJoin, QueryMode, ServerJoin, TxState}, }; use tracing::{info, warn}; @@ -15,6 +15,8 @@ use crate::component::{ event_bus::CloseReason, }; +use super::ComponentEnv; + pub struct GameSynchronizerContext { transport: Arc, access_version: u64, @@ -48,21 +50,22 @@ impl GameSynchronizer { #[allow(unused_assignments)] #[async_trait] impl Component for GameSynchronizer { - fn name(&self) -> &str { - "Game synchronizer" + fn name() -> &'static str { + "Game Synchronizer" } - async fn run(ports: ProducerPorts, ctx: GameSynchronizerContext) -> CloseReason { + async fn run( + ports: ProducerPorts, + ctx: GameSynchronizerContext, + env: ComponentEnv, + ) -> CloseReason { let mut prev_access_version = ctx.access_version; let mut access_version = ctx.access_version; let mut mode = ctx.mode; let mut num_of_retries = 0; loop { - let state = ctx - .transport - .get_game_account(&ctx.game_addr, mode) - .await; + let state = ctx.transport.get_game_account(&ctx.game_addr, mode).await; if ports.is_tx_closed() { return CloseReason::Complete; @@ -74,7 +77,7 @@ impl Component for GameSynchronizer { if access_version < state.access_version { info!( "{} Synchronizer found confirming state, access_version = {}, settle_version = {}", - addr_shorthand(&ctx.game_addr), + env.log_prefix, state.access_version, state.settle_version, ); @@ -126,7 +129,7 @@ impl Component for GameSynchronizer { if access_version <= state.access_version { info!( "{} Synchronizer found a finalized state, access_version = {}, settle_version = {}", - addr_shorthand(&ctx.game_addr), + env.log_prefix, state.access_version, state.settle_version, ); diff --git a/transactor/src/component/voter.rs b/transactor/src/component/voter.rs index b5889d10..75b36078 100644 --- a/transactor/src/component/voter.rs +++ b/transactor/src/component/voter.rs @@ -12,7 +12,7 @@ use race_core::{ }; use tracing::{info, warn}; -use super::common::{Component, PipelinePorts}; +use super::{common::{Component, PipelinePorts}, ComponentEnv}; use crate::frame::EventFrame; use super::event_bus::CloseReason; @@ -44,11 +44,11 @@ impl Voter { #[async_trait] impl Component for Voter { - fn name(&self) -> &str { + fn name() -> &'static str { "Voter" } - async fn run(mut ports: PipelinePorts, ctx: VoterContext) -> CloseReason { + async fn run(mut ports: PipelinePorts, ctx: VoterContext, env: ComponentEnv) -> CloseReason { while let Some(frame) = ports.recv().await { match frame { EventFrame::Vote { votee, vote_type } => { @@ -64,19 +64,19 @@ impl Component for Voter { let r = ctx.transport.vote(params.clone()).await; match r { Ok(_) | Err(Error::DuplicatedVote) => { - info!("Vote sent"); + info!("{} Vote sent", env.log_prefix); ports.send(EventFrame::Shutdown).await; break; } Err(e) => { - warn!("An error occurred in vote: {:?}, will retry.", e); + warn!("{} An error occurred in vote: {:?}, will retry.", env.log_prefix, e); tokio::time::sleep(Duration::from_secs(3)).await; } } } } EventFrame::Shutdown => { - warn!("Shutdown voter"); + warn!("{} Shutdown voter", env.log_prefix); break; } _ => (), diff --git a/transactor/src/component/wrapped_client.rs b/transactor/src/component/wrapped_client.rs index cfa654ee..a2da1743 100644 --- a/transactor/src/component/wrapped_client.rs +++ b/transactor/src/component/wrapped_client.rs @@ -18,6 +18,7 @@ use race_core::transport::TransportT; use race_core::types::ClientMode; use tracing::{error, warn}; +use super::ComponentEnv; use super::event_bus::CloseReason; pub struct WrappedClient {} @@ -56,11 +57,11 @@ impl WrappedClient { #[async_trait] impl Component for WrappedClient { - fn name(&self) -> &str { + fn name() -> &'static str { "Client" } - async fn run(mut ports: ConsumerPorts, ctx: ClientContext) -> CloseReason { + async fn run(mut ports: ConsumerPorts, ctx: ClientContext, env: ComponentEnv) -> CloseReason { let ClientContext { addr, game_addr, @@ -73,7 +74,7 @@ impl Component for WrappedClient { let mut client = Client::new(addr, game_addr, mode, transport, encryptor, connection); if let Err(e) = client.attach_game().await { - warn!("Failed to attach to game due to error: {:?}", e); + warn!("{} Failed to attach to game due to error: {:?}", env.log_prefix, e); } let mut res = Ok(()); @@ -93,7 +94,7 @@ impl Component for WrappedClient { } } Err(e) => { - error!("Client error: {:?}", e); + error!("{} Client error: {:?}", env.log_prefix, e); res = Err(e); break 'outer; } diff --git a/transactor/src/frame.rs b/transactor/src/frame.rs index fd62fc2b..f2302c36 100644 --- a/transactor/src/frame.rs +++ b/transactor/src/frame.rs @@ -72,7 +72,11 @@ pub enum EventFrame { vote_type: VoteType, }, Shutdown, - BridgeEvent { + SendBridgeEvent { + dest: usize, + event: Event, + }, + RecvBridgeEvent { dest: usize, event: Event, }, @@ -125,8 +129,11 @@ impl std::fmt::Display for EventFrame { EventFrame::Vote { votee, vote_type } => { write!(f, "Vote: to {} for {:?}", votee, vote_type) } - EventFrame::BridgeEvent { dest, event } => { - write!(f, "BridgeEvent: dest {}, event: {}", dest, event) + EventFrame::SendBridgeEvent { dest, event } => { + write!(f, "SendBridgeEvent: dest {}, event: {}", dest, event) + } + EventFrame::RecvBridgeEvent { dest, event } => { + write!(f, "RecvBridgeEvent: dest {}, event: {}", dest, event) } EventFrame::LaunchSubGame { spec } => { write!(f, "LaunchSubGame: {:?}", spec) diff --git a/transactor/src/game_manager.rs b/transactor/src/game_manager.rs index 616bb918..f8b612c0 100644 --- a/transactor/src/game_manager.rs +++ b/transactor/src/game_manager.rs @@ -71,6 +71,7 @@ impl GameManager { Ok(mut handle) => { let mut games = self.games.lock().await; let addr = format!("{}:{}", game_addr, sub_id); + info!("Launch sub game {}", addr); let join_handle = handle.wait(); games.insert(addr.clone(), handle); wait_and_unload(addr, join_handle, self.games.clone(), None); diff --git a/transactor/src/handle.rs b/transactor/src/handle.rs index 463474b8..7a298b8b 100644 --- a/transactor/src/handle.rs +++ b/transactor/src/handle.rs @@ -103,7 +103,7 @@ impl Handle { match self { Handle::Transactor(h) => Ok(&h.broadcaster), Handle::Validator(_) => Err(Error::NotSupportedInValidatorMode), - Handle::SubGame(_) => Err(Error::NotSupportedInSubGameMode), + Handle::SubGame(h) => Ok(&h.broadcaster), } } diff --git a/transactor/src/handle/subgame.rs b/transactor/src/handle/subgame.rs index 5b52c92d..7fb9e1e3 100644 --- a/transactor/src/handle/subgame.rs +++ b/transactor/src/handle/subgame.rs @@ -37,7 +37,7 @@ impl SubGameHandle { let game_addr = spec.game_addr.clone(); let sub_id = spec.sub_id.clone(); let addr = format!("{}:{}", game_addr, sub_id); - let event_bus = EventBus::default(); + let event_bus = EventBus::new(addr.to_string()); let bundle_account = transport .get_game_bundle(&spec.bundle_addr) @@ -53,14 +53,14 @@ impl SubGameHandle { let handler = WrappedHandler::load_by_bundle(&bundle_account, encryptor.clone()).await?; let (broadcaster, broadcaster_ctx) = Broadcaster::init(addr.clone()); - let mut broadcaster_handle = broadcaster.start(broadcaster_ctx); + let mut broadcaster_handle = broadcaster.start(&addr, broadcaster_ctx); let (bridge, bridge_ctx) = bridge_parent.derive_child(sub_id.clone()); - let mut bridge_handle = bridge.start(bridge_ctx); + let mut bridge_handle = bridge.start(&addr, bridge_ctx); let (event_loop, event_loop_ctx) = EventLoop::init(handler, game_context, ClientMode::Transactor); - let mut event_loop_handle = event_loop.start(event_loop_ctx); + let mut event_loop_handle = event_loop.start(&addr, event_loop_ctx); let mut connection = LocalConnection::new(encryptor.clone()); @@ -74,7 +74,7 @@ impl SubGameHandle { encryptor, Arc::new(connection), ); - let mut client_handle = client.start(client_ctx); + let mut client_handle = client.start(&addr, client_ctx); event_bus.attach(&mut client_handle).await; event_bus.attach(&mut bridge_handle).await; diff --git a/transactor/src/handle/transactor.rs b/transactor/src/handle/transactor.rs index ec616df8..129d5016 100644 --- a/transactor/src/handle/transactor.rs +++ b/transactor/src/handle/transactor.rs @@ -67,20 +67,20 @@ impl TransactorHandle { let game_context = GameContext::try_new(game_account)?; let handler = WrappedHandler::load_by_bundle(bundle_account, encryptor.clone()).await?; - let event_bus = EventBus::default(); + let event_bus = EventBus::new(game_account.addr.clone()); let (broadcaster, broadcaster_ctx) = Broadcaster::init(game_account.addr.clone()); - let mut broadcaster_handle = broadcaster.start(broadcaster_ctx); + let mut broadcaster_handle = broadcaster.start(&game_account.addr, broadcaster_ctx); let (bridge, bridge_ctx) = EventBridgeParent::init(signal_tx); - let mut bridge_handle = bridge.start(bridge_ctx); + let mut bridge_handle = bridge.start(&game_account.addr, bridge_ctx); let (event_loop, event_loop_ctx) = EventLoop::init(handler, game_context, ClientMode::Transactor); - let mut event_loop_handle = event_loop.start(event_loop_ctx); + let mut event_loop_handle = event_loop.start(&game_account.addr, event_loop_ctx); let (submitter, submitter_ctx) = Submitter::init(game_account, transport.clone()); - let mut submitter_handle = submitter.start(submitter_ctx); + let mut submitter_handle = submitter.start(&game_account.addr, submitter_ctx); let (synchronizer, synchronizer_ctx) = GameSynchronizer::init(transport.clone(), game_account); @@ -96,7 +96,7 @@ impl TransactorHandle { encryptor, Arc::new(connection), ); - let mut client_handle = client.start(client_ctx); + let mut client_handle = client.start(&game_account.addr, client_ctx); event_bus.attach(&mut broadcaster_handle).await; event_bus.attach(&mut bridge_handle).await; @@ -110,7 +110,7 @@ impl TransactorHandle { event_bus.send(EventFrame::InitState { init_account }).await; event_bus.send(create_init_sync(game_account)?).await; - let mut synchronizer_handle = synchronizer.start(synchronizer_ctx); + let mut synchronizer_handle = synchronizer.start(&game_account.addr, synchronizer_ctx); event_bus.attach(&mut synchronizer_handle).await; Ok(Self { diff --git a/transactor/src/handle/validator.rs b/transactor/src/handle/validator.rs index 43ade47d..d75641f5 100644 --- a/transactor/src/handle/validator.rs +++ b/transactor/src/handle/validator.rs @@ -47,14 +47,14 @@ impl ValidatorHandle { .ok_or(Error::CantFindTransactor)?; info!("Creating components"); - let event_bus = EventBus::default(); + let event_bus = EventBus::new(game_account.addr.clone()); let (bridge, bridge_ctx) = EventBridgeParent::init(signal_tx); - let mut bridge_handle = bridge.start(bridge_ctx); + let mut bridge_handle = bridge.start(&game_account.addr, bridge_ctx); let (event_loop, event_loop_ctx) = EventLoop::init(handler, game_context, ClientMode::Validator); - let mut event_loop_handle = event_loop.start(event_loop_ctx); + let mut event_loop_handle = event_loop.start(&game_account.addr, event_loop_ctx); let connection = Arc::new( RemoteConnection::try_new( @@ -66,7 +66,7 @@ impl ValidatorHandle { ); let (subscriber, subscriber_context) = Subscriber::init(game_account, server_account, connection.clone()); - let mut subscriber_handle = subscriber.start(subscriber_context); + let mut subscriber_handle = subscriber.start(&game_account.addr, subscriber_context); let (client, client_ctx) = WrappedClient::init( server_account.addr.clone(), @@ -76,10 +76,10 @@ impl ValidatorHandle { encryptor, connection, ); - let mut client_handle = client.start(client_ctx); + let mut client_handle = client.start(&game_account.addr, client_ctx); let (voter, voter_ctx) = Voter::init(game_account, server_account, transport.clone()); - let mut voter_handle = voter.start(voter_ctx); + let mut voter_handle = voter.start(&game_account.addr, voter_ctx); event_bus.attach(&mut bridge_handle).await; event_bus.attach(&mut event_loop_handle).await; diff --git a/transactor/src/main.rs b/transactor/src/main.rs index b7839b03..b3b0ebbe 100644 --- a/transactor/src/main.rs +++ b/transactor/src/main.rs @@ -27,6 +27,7 @@ fn cli() -> Command { #[tokio::main] pub async fn main() { let log_format = tracing_subscriber::fmt::format() + .without_time() .with_level(true) .with_target(false) .compact(); diff --git a/transactor/src/server.rs b/transactor/src/server.rs index 503e6c32..33fea729 100644 --- a/transactor/src/server.rs +++ b/transactor/src/server.rs @@ -129,6 +129,7 @@ fn subscribe_event( match context.get_broadcast(&game_addr, settle_version).await { Ok(x) => x, Err(e) => { + warn!("Game not found: {:?}", game_addr); sink.close(SubscriptionClosed::Failed( CallError::Failed(e.into()).into(), )); diff --git a/transactor/src/utils.rs b/transactor/src/utils.rs index d0a1a391..ab2c9244 100644 --- a/transactor/src/utils.rs +++ b/transactor/src/utils.rs @@ -14,5 +14,5 @@ pub fn base64_decode(data: &str) -> Result, race_api::error::Error> { pub fn addr_shorthand(addr: &str) -> String { let l = addr.len(); - format!("[{}]", &addr[(l - 6)..l]) + format!("{}", &addr[(l - 6)..l]) }