Skip to content

Commit

Permalink
Fix validator mode
Browse files Browse the repository at this point in the history
  • Loading branch information
DogLooksGood committed Jan 11, 2024
1 parent 735e864 commit 67dbf4b
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 132 deletions.
4 changes: 4 additions & 0 deletions core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ mod transport_params;
mod transactor_params;
mod common;
mod accounts;
mod broadcast_frame;
mod tx_state;

pub use transport_params::*;
pub use transactor_params::*;
pub use common::*;
pub use accounts::*;
pub use broadcast_frame::*;
pub use tx_state::*;
2 changes: 1 addition & 1 deletion core/src/types/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl GameAccount {
}
}

pub fn derive_rollbacked_init_account(&self) -> InitAccount {
pub fn derive_checkpoint_init_account(&self) -> InitAccount {
let game_account = self.to_owned();
let Self {
players,
Expand Down
73 changes: 73 additions & 0 deletions core/src/types/broadcast_frame.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use crate::types::PlayerJoin;
use borsh::{BorshDeserialize, BorshSerialize};
use race_api::event::{Event, Message};
use race_api::types::ServerJoin;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::fmt::Display;

use super::TxState;

#[derive(Default, Debug, Clone, PartialEq, Eq, BorshDeserialize, BorshSerialize)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub struct BroadcastSync {
pub new_players: Vec<PlayerJoin>,
pub new_servers: Vec<ServerJoin>,
pub transactor_addr: String,
pub access_version: u64,
}

impl BroadcastSync {
pub fn merge(&mut self, other: &Self) {
self.new_players.append(&mut other.new_players.clone());
self.new_servers.append(&mut other.new_servers.clone());
self.access_version = u64::max(self.access_version, other.access_version);
self.transactor_addr = other.transactor_addr.clone();
}
}

#[derive(Debug, Clone, PartialEq, Eq, BorshDeserialize, BorshSerialize)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub enum BroadcastFrame {
// Game event
Event {
game_addr: String,
event: Event,
timestamp: u64,
is_history: bool,
},
// Arbitrary message
Message {
game_addr: String,
message: Message,
},
// Transaction state updates
TxState {
tx_state: TxState,
},
// Node state updates
Sync {
sync: BroadcastSync,
},
}

impl Display for BroadcastFrame {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BroadcastFrame::Event { event, .. } => {
write!(f, "BroadcastFrame::Event: {}", event)
}
BroadcastFrame::Message { message, .. } => {
write!(f, "BroadcastFrame::Message: {}", message.sender)
}
BroadcastFrame::TxState { tx_state } => {
write!(f, "BroadcastFrame::TxState: {:?}", tx_state)
}
BroadcastFrame::Sync { sync } => {
write!(f, "BroadcastFrame::Sync: access_version {}", sync.access_version)
}
}
}
}
90 changes: 1 addition & 89 deletions core/src/types/transactor_params.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,12 @@
//! Parameters for interacting with transactor
use crate::encryptor::NodePublicKeyRaw;
use crate::types::PlayerJoin;
use borsh::{BorshDeserialize, BorshSerialize};
use race_api::event::{Event, Message};
use race_api::types::ServerJoin;
use race_api::event::Event;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::fmt::Display;

#[derive(Debug, Clone, PartialEq, Eq, BorshDeserialize, BorshSerialize)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub struct ConfirmingPlayer {
id: u64,
addr: String,
position: u16,
balance: u64,
}

impl From<PlayerJoin> for ConfirmingPlayer {
fn from(value: PlayerJoin) -> Self {
Self {
id: value.access_version,
addr: value.addr,
position: value.position,
balance: value.balance,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, BorshDeserialize, BorshSerialize)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub enum TxState {
PlayerConfirming {
confirm_players: Vec<ConfirmingPlayer>,
access_version: u64,
},

PlayerConfirmingFailed(u64),

SettleSucceed {
settle_version: u64,
signature: Option<String>,
},
}

#[derive(Debug, Clone, PartialEq, Eq, BorshDeserialize, BorshSerialize)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AttachGameParams {
Expand Down Expand Up @@ -111,51 +71,3 @@ impl Display for SubscribeEventParams {
write!(f, "SubscribeEventParams")
}
}

#[derive(Debug, Clone, PartialEq, Eq, BorshDeserialize, BorshSerialize)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub enum BroadcastFrame {
// Game event
Event {
game_addr: String,
event: Event,
timestamp: u64,
is_history: bool,
},
// Arbitrary message
Message {
game_addr: String,
message: Message,
},
// Transaction state updates
TxState {
tx_state: TxState,
},
// Node state updates
Sync {
new_players: Vec<PlayerJoin>,
new_servers: Vec<ServerJoin>,
transactor_addr: String,
access_version: u64,
},
}

impl Display for BroadcastFrame {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BroadcastFrame::Event { event, .. } => {
write!(f, "BroadcastFrame::Event: {}", event)
}
BroadcastFrame::Message { message, .. } => {
write!(f, "BroadcastFrame::Message: {}", message.sender)
}
BroadcastFrame::TxState { tx_state } => {
write!(f, "BroadcastFrame::TxState: {:?}", tx_state)
}
BroadcastFrame::Sync { access_version, .. } => {
write!(f, "BroadcastFrame::Sync: access_version {}", access_version)
}
}
}
}
42 changes: 42 additions & 0 deletions core/src/types/tx_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::types::PlayerJoin;
use borsh::{BorshDeserialize, BorshSerialize};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, BorshDeserialize, BorshSerialize)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub struct ConfirmingPlayer {
id: u64,
addr: String,
position: u16,
balance: u64,
}

impl From<PlayerJoin> for ConfirmingPlayer {
fn from(value: PlayerJoin) -> Self {
Self {
id: value.access_version,
addr: value.addr,
position: value.position,
balance: value.balance,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, BorshDeserialize, BorshSerialize)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub enum TxState {
PlayerConfirming {
confirm_players: Vec<ConfirmingPlayer>,
access_version: u64,
},

PlayerConfirmingFailed(u64),

SettleSucceed {
settle_version: u64,
signature: Option<String>,
},
}
9 changes: 6 additions & 3 deletions facade/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! It is supposed to be used for testing and developing.
use borsh::BorshSerialize;
use clap::{Command, arg};
use clap::{arg, Command};
use hyper::Method;
use jsonrpsee::server::{AllowHosts, ServerBuilder, ServerHandle};
use jsonrpsee::types::Params;
Expand Down Expand Up @@ -617,9 +617,12 @@ async fn serve(params: Params<'_>, context: Arc<Mutex<Context>>) -> RpcResult<()
.get_mut(&game_addr)
.ok_or(custom_error(Error::GameAccountNotFound))?;

let new_access_version = account.access_version + 1;

if account.transactor_addr.is_none() {
is_transactor = true;
account.transactor_addr = Some(server_addr.clone());
account.checkpoint_access_version = new_access_version;
}

let server_account = servers
Expand All @@ -642,11 +645,11 @@ async fn serve(params: Params<'_>, context: Arc<Mutex<Context>>) -> RpcResult<()
DEFAULT_MAX_SERVERS as _,
)));
} else {
account.access_version += 1;
account.access_version = new_access_version;
account.servers.push(ServerJoin::new(
server_addr.clone(),
server_account.endpoint.clone(),
account.access_version,
new_access_version,
verify_key,
));
}
Expand Down
1 change: 0 additions & 1 deletion js/sdk-core/src/app-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ export class AppClient extends BaseClient {
const client = new Client(playerAddr, encryptor, connection);
const handler = await Handler.initialize(gameBundle, encryptor, client, decryptionCache);
const gameContext = new GameContext(gameAccount);
gameContext.applyCheckpoint(gameContext.checkpointAccessVersion, gameContext.settleVersion);
const token = await transport.getToken(gameAccount.tokenAddr);
if (token === undefined) {
throw SdkError.tokenNotFound(gameAccount.tokenAddr);
Expand Down
29 changes: 19 additions & 10 deletions transactor/src/component/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use race_api::event::Event;
use race_core::types::{BroadcastFrame, TxState};
use race_core::types::{BroadcastFrame, BroadcastSync, TxState};
use tokio::sync::{broadcast, Mutex};
use tracing::{debug, error};

Expand Down Expand Up @@ -35,6 +35,7 @@ pub struct Checkpoint {

#[derive(Debug)]
pub struct EventBackupGroup {
pub sync: BroadcastSync,
pub events: LinkedList<EventBackup>,
pub checkpoint: Checkpoint,
pub settle_version: u64,
Expand Down Expand Up @@ -84,6 +85,9 @@ impl Broadcaster {

for group in event_backup_groups.iter() {
if group.settle_version >= settle_version {
histories.push(BroadcastFrame::Sync {
sync: group.sync.clone()
});
for event in group.events.iter() {
histories.push(BroadcastFrame::Event {
game_addr: self.id.clone(),
Expand Down Expand Up @@ -131,6 +135,7 @@ impl Component<ConsumerPorts, BroadcasterContext> for Broadcaster {
};

event_backup_groups.push_back(EventBackupGroup {
sync: Default::default(),
events: LinkedList::new(),
checkpoint,
access_version,
Expand All @@ -145,19 +150,15 @@ impl Component<ConsumerPorts, BroadcasterContext> for Broadcaster {
debug!("Failed to broadcast event: {:?}", e);
}
}
TxState::PlayerConfirming {
..
} => {
TxState::PlayerConfirming { .. } => {
let r = ctx.broadcast_tx.send(BroadcastFrame::TxState { tx_state });

if let Err(e) = r {
debug!("Failed to broadcast event: {:?}", e);
}
}
TxState::PlayerConfirmingFailed(_) => {
let r = ctx.broadcast_tx.send(BroadcastFrame::TxState {
tx_state,
});
let r = ctx.broadcast_tx.send(BroadcastFrame::TxState { tx_state });

if let Err(e) = r {
debug!("Failed to broadcast event: {:?}", e);
Expand Down Expand Up @@ -187,6 +188,7 @@ impl Component<ConsumerPorts, BroadcasterContext> for Broadcaster {
if event_backup_groups.len() > 10 {
event_backup_groups.pop_front();
}
drop(event_backup_groups);

let r = ctx.broadcast_tx.send(BroadcastFrame::Event {
game_addr: ctx.id.clone(),
Expand All @@ -203,16 +205,23 @@ impl Component<ConsumerPorts, BroadcasterContext> for Broadcaster {
EventFrame::Sync {
new_servers,
new_players,
transactor_addr,
access_version,
transactor_addr,
} => {
let broadcast_frame = BroadcastFrame::Sync {
let sync = BroadcastSync {
new_players,
new_servers,
transactor_addr,
access_version,
transactor_addr,
};

let mut event_backup_groups = ctx.event_backup_groups.lock().await;
if let Some(current) = event_backup_groups.back_mut() {
current.sync.merge(&sync);
}
drop(event_backup_groups);

let broadcast_frame = BroadcastFrame::Sync { sync };
let r = ctx.broadcast_tx.send(broadcast_frame);

if let Err(e) = r {
Expand Down
Loading

0 comments on commit 67dbf4b

Please sign in to comment.