Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
DogLooksGood committed Dec 23, 2023
1 parent 8931209 commit cae1e6c
Show file tree
Hide file tree
Showing 16 changed files with 218 additions and 110 deletions.
2 changes: 1 addition & 1 deletion api/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct ActionTimeout {
pub timeout: u64,
}

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)]
#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq, Clone)]
pub struct LaunchSubGame {
pub id: usize,
pub bundle_addr: String,
Expand Down
71 changes: 46 additions & 25 deletions core/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::HashMap;

use crate::types::{GameAccount, SettleTransferCheckpoint, SubGameSpec };
use crate::types::{GameAccount, SubGameSpec};
use borsh::{BorshDeserialize, BorshSerialize};
use race_api::decision::DecisionState;
use race_api::effect::{Ask, Assign, Effect, Release, Reveal};
use race_api::effect::{Ask, Assign, Effect, LaunchSubGame, Release, Reveal};
use race_api::engine::GameHandler;
use race_api::error::{Error, Result};
use race_api::event::{CustomEvent, Event};
Expand Down Expand Up @@ -92,6 +92,18 @@ impl DispatchEvent {
}
}

/// The effects of an event.
///
/// If the `checkpoint` is Some, a settlement should be sent. If the
/// `launch_sub_games` is not empty, sub games should be launched.
#[derive(Debug)]
pub struct EventEffects {
pub settles: Vec<Settle>,
pub transfers: Vec<Transfer>,
pub checkpoint: Option<Vec<u8>>,
pub launch_sub_games: Vec<LaunchSubGame>,
}

/// The context for public data.
///
/// This information is not transmitted over the network, instead it's
Expand Down Expand Up @@ -145,6 +157,8 @@ pub struct GameContext {
pub(crate) transfers: Option<Vec<Transfer>>,
/// The latest checkpoint state
pub(crate) checkpoint: Option<Vec<u8>>,
/// The sub games to launch
pub(crate) launch_sub_games: Vec<LaunchSubGame>,
}

impl GameContext {
Expand Down Expand Up @@ -191,6 +205,7 @@ impl GameContext {
transfers: None,
handler_state: "".into(),
checkpoint: None,
launch_sub_games: vec![],
})
}

Expand Down Expand Up @@ -444,7 +459,8 @@ impl GameContext {

pub fn add_node(&mut self, node_addr: String, access_version: u64) {
if !self.nodes.iter().any(|n| n.addr.eq(&node_addr)) {
self.nodes.push(Node::new_pending(node_addr, access_version))
self.nodes
.push(Node::new_pending(node_addr, access_version))
}
}

Expand Down Expand Up @@ -594,32 +610,36 @@ impl GameContext {
self.settle_version += 1;
}

pub fn take_settles_and_transfers(&mut self) -> Result<Option<SettleTransferCheckpoint>> {
if let Some(checkpoint) = self.get_checkpoint() {
let mut settles = None;
std::mem::swap(&mut settles, &mut self.settles);
pub fn take_event_effects(&mut self) -> Result<EventEffects> {
let mut settles = vec![];
let mut transfers = vec![];

if let Some(settles) = settles.as_mut() {
settles.sort_by_key(|s| match s.op {
SettleOp::Add(_) => 0,
SettleOp::Sub(_) => 1,
SettleOp::Eject => 2,
SettleOp::AssignSlot(_) => 3,
})
}
if self.checkpoint.is_some() {
self.settles
.take()
.map(|mut s| settles = s.drain(..).collect());

let mut transfers = None;
std::mem::swap(&mut transfers, &mut self.transfers);
self.bump_settle_version();
settles.sort_by_key(|s| match s.op {
SettleOp::Add(_) => 0,
SettleOp::Sub(_) => 1,
SettleOp::Eject => 2,
SettleOp::AssignSlot(_) => 3,
});

Ok(Some((
settles.unwrap_or(vec![]),
transfers.unwrap_or(vec![]),
checkpoint,
)))
} else {
Ok(None)
self.transfers
.take()
.map(|mut t| transfers = t.drain(..).collect());
self.bump_settle_version();
}

let launch_sub_games = self.launch_sub_games.drain(..).collect();

Ok(EventEffects {
settles,
transfers,
checkpoint: self.get_checkpoint(),
launch_sub_games,
})
}

pub fn add_settle(&mut self, settle: Settle) {
Expand Down Expand Up @@ -828,6 +848,7 @@ impl Default for GameContext {
settles: None,
transfers: None,
checkpoint: None,
launch_sub_games: Vec::new(),
}
}
}
1 change: 0 additions & 1 deletion core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ pub fn general_handle_event(
Event::GameStart { access_version } => {
context.set_game_status(GameStatus::Running);
context.set_node_ready(*access_version);
println!("Nodes: {:?}", context.get_nodes());
Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion core/src/types/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub struct SubGameSpec {
pub sub_id: usize,
pub bundle_addr: String,
pub init_data: Vec<u8>,
pub players: Vec<GamePlayer>,
pub nodes: Vec<Node>,
pub transactor_addr: String,
}
2 changes: 0 additions & 2 deletions examples/draw-card/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ fn test() -> anyhow::Result<()> {
println!("Initialize handler state");
let mut ctx = GameContext::try_new(&game_account)?;
let mut handler = TestHandler::init_state(&mut ctx, &game_account)?;
assert_eq!(1, ctx.count_players());

// Start game
println!("Start game, without players");
Expand Down Expand Up @@ -66,7 +65,6 @@ fn test() -> anyhow::Result<()> {
handler.handle_event(&mut ctx, &sync_event)?;

{
assert_eq!(2, ctx.count_players());
assert_eq!(GameStatus::Uninit, ctx.get_status());
assert_eq!(
Some(DispatchEvent::new(
Expand Down
5 changes: 4 additions & 1 deletion js/sdk-core/src/game-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
Shutdown,
WaitingTimeout,
} from './events';
import { Effect, Settle, SettleAdd, SettleEject, SettleSub, Transfer } from './effect';
import { Effect, LaunchSubGame, Settle, SettleAdd, SettleEject, SettleSub, Transfer } from './effect';
import { GameAccount, PlayerJoin, ServerJoin } from './accounts';
import { Ciphertext, Digest, Id } from './types';

Expand Down Expand Up @@ -56,6 +56,7 @@ export class GameContext {
transfers: Transfer[] | undefined;
checkpoint: Uint8Array | undefined;
checkpointAccessVersion: bigint;
launchSubGames: LaunchSubGame[];

constructor(context: GameContext);
constructor(gameAccount: GameAccount);
Expand All @@ -78,6 +79,7 @@ export class GameContext {
this.transfers = context.transfers;
this.checkpoint = undefined;
this.checkpointAccessVersion = context.checkpointAccessVersion;
this.launchSubGames = context.launchSubGames;
} else {
const gameAccount = gameAccountOrContext;
const transactorAddr = gameAccount.transactorAddr;
Expand Down Expand Up @@ -110,6 +112,7 @@ export class GameContext {
this.handlerState = Uint8Array.of();
this.checkpoint = undefined;
this.checkpointAccessVersion = gameAccount.checkpointAccessVersion;
this.launchSubGames = [];
}
}

Expand Down
12 changes: 10 additions & 2 deletions transactor/src/component/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use super::event_bus::CloseReason;

/// An interface for a component that can be attached to the event bus.
pub trait Attachable {
fn id(&self) -> &str;

/// Return the input channel of current component.
/// Return `None` when the component does not accept input.
fn input(&mut self) -> Option<mpsc::Sender<EventFrame>>;
Expand All @@ -28,14 +30,16 @@ pub struct PortsHandleInner {
}

pub struct PortsHandle {
pub name: String,
input_tx: Option<mpsc::Sender<EventFrame>>,
output_rx: Option<mpsc::Receiver<EventFrame>>,
join_handle: JoinHandle<CloseReason>,
}

impl PortsHandle {
fn from_inner(value: PortsHandleInner, join_handle: JoinHandle<CloseReason>) -> Self {
fn from_inner<S: Into<String>>(name: S, value: PortsHandleInner, join_handle: JoinHandle<CloseReason>) -> Self {
Self {
name: name.into(),
input_tx: value.input_tx,
output_rx: value.output_rx,
join_handle,
Expand Down Expand Up @@ -68,6 +72,10 @@ impl PortsHandle {
}

impl Attachable for PortsHandle {
fn id(&self) -> &str {
self.name.as_str()
}

fn input(&mut self) -> Option<mpsc::Sender<EventFrame>> {
if self.input_tx.is_some() {
self.input_tx.clone()
Expand Down Expand Up @@ -212,7 +220,7 @@ where
info!("Starting component: {}", self.name());
let (ports, ports_handle_inner) = P::create();
let join_handle = tokio::spawn(async move { Self::run(ports, context).await });
PortsHandle::from_inner(ports_handle_inner, join_handle)
PortsHandle::from_inner(self.name(), ports_handle_inner, join_handle)
}
async fn run(ports: P, context: C) -> CloseReason;
}
4 changes: 4 additions & 0 deletions transactor/src/component/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl LocalConnection {
}

impl Attachable for LocalConnection {
fn id(&self) -> &str {
"LocalConnection"
}

fn input(&mut self) -> Option<mpsc::Sender<EventFrame>> {
None
}
Expand Down
71 changes: 48 additions & 23 deletions transactor/src/component/event_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
use async_trait::async_trait;
use tokio::sync::{broadcast, mpsc};

use crate::frame::EventFrame;
use crate::frame::{EventFrame, SignalFrame};

use super::{common::PipelinePorts, CloseReason, Component};

#[allow(dead_code)]
pub struct EventBridgeParentContext {
tx: broadcast::Sender<EventFrame>,
rx: mpsc::Receiver<EventFrame>,
signal_tx: mpsc::Sender<SignalFrame>,
}

#[derive(Clone, Debug)]
Expand All @@ -20,17 +21,17 @@ pub struct EventBridgeParent {
}

pub struct EventBridgeChildContext {
sub_id: usize,
pub sub_id: usize,
tx: mpsc::Sender<EventFrame>,
rx: broadcast::Receiver<EventFrame>,
}

pub struct EventBridgeChild {
sub_id: usize,
pub sub_id: usize,
}

impl EventBridgeParent {
pub fn init() -> (Self, EventBridgeParentContext) {
pub fn init(signal_tx: mpsc::Sender<SignalFrame>) -> (Self, EventBridgeParentContext) {
let (mpsc_tx, mpsc_rx) = mpsc::channel(10);
let (bc_tx, _bc_rx) = broadcast::channel(10);
(
Expand All @@ -41,6 +42,7 @@ impl EventBridgeParent {
EventBridgeParentContext {
tx: bc_tx,
rx: mpsc_rx,
signal_tx,
},
)
}
Expand All @@ -67,19 +69,21 @@ impl EventBridgeParent {
ports: &mut PipelinePorts,
rx: &mut mpsc::Receiver<EventFrame>,
) -> Option<(bool, EventFrame)> {
let (from_bridge, event_frame) = tokio::select! {
tokio::select! {
e = rx.recv() => {
(true, e)
if let Some(e) = e {
Some((true, e))
} else {
None
}
},
e = ports.recv() => {
(false, e)
if let Some(e) = e {
Some((false, e))
} else {
None
}
},
};

if let Some(event_frame) = event_frame {
Some((from_bridge, event_frame))
} else {
None
}
}
}
Expand All @@ -91,14 +95,30 @@ impl Component<PipelinePorts, EventBridgeParentContext> for EventBridgeParent {
}

async fn run(mut ports: PipelinePorts, mut ctx: EventBridgeParentContext) -> CloseReason {
while let Some((from_bridge, event_frame)) = Self::read_event(&mut ports, &mut ctx.rx).await {
while let Some((from_bridge, event_frame)) = Self::read_event(&mut ports, &mut ctx.rx).await
{
if from_bridge {
ports.send(event_frame).await;
} else if matches!(event_frame, EventFrame::Shutdown) {
break;
} else {
if let Err(e) = ctx.tx.send(event_frame) {
println!("Failed to send: {:?}", e);
match event_frame {
EventFrame::LaunchSubGame { spec } => {
let f = SignalFrame::LaunchSubGame { spec };
if let Err(e) = ctx.signal_tx.send(f).await {
println!("Failed to send: {}", e);
}
}
EventFrame::Shutdown => {
if let Err(e) = ctx.tx.send(event_frame) {
println!("Failed to send: {}", e);
}
break;
}
EventFrame::BridgeEvent { .. } | EventFrame::UpdateNodes { .. } => {
if let Err(e) = ctx.tx.send(event_frame) {
println!("Failed to send: {}", e);
}
}
_ => continue,
}
}
}
Expand Down Expand Up @@ -141,14 +161,19 @@ impl Component<PipelinePorts, EventBridgeChildContext> for EventBridgeChild {
}

async fn run(mut ports: PipelinePorts, mut ctx: EventBridgeChildContext) -> CloseReason {
while let Some((from_bridge, event_frame)) = Self::read_event(&mut ports, &mut ctx.rx).await {
while let Some((from_bridge, event_frame)) = Self::read_event(&mut ports, &mut ctx.rx).await
{
if from_bridge {
ports.send(event_frame).await;
} else if matches!(event_frame, EventFrame::Shutdown) {
break;
} else {
if let Err(e) = ctx.tx.send(event_frame).await {
println!("Failed to send: {:?}", e);
match event_frame {
EventFrame::Shutdown => break,
EventFrame::BridgeEvent { .. } => {
if let Err(e) = ctx.tx.send(event_frame).await {
println!("Failed to send: {:?}", e);
}
}
_ => continue,
}
}
}
Expand Down
Loading

0 comments on commit cae1e6c

Please sign in to comment.