From a1744452ac76b88d9c5d609301b8d11ce6dad1d2 Mon Sep 17 00:00:00 2001 From: Warren He Date: Wed, 9 Mar 2022 16:54:17 -0800 Subject: [PATCH 01/10] dispatcher: decode incoming messages --- runtime-sdk/src/context.rs | 20 ++++++ runtime-sdk/src/dispatcher.rs | 103 ++++++++++++++++++++++++++-- runtime-sdk/src/modules/core/mod.rs | 10 ++- runtime-sdk/src/types/in_msg.rs | 39 +++++++++++ runtime-sdk/src/types/mod.rs | 1 + 5 files changed, 166 insertions(+), 7 deletions(-) create mode 100644 runtime-sdk/src/types/in_msg.rs diff --git a/runtime-sdk/src/context.rs b/runtime-sdk/src/context.rs index 1885b9c31c..41d8f54342 100644 --- a/runtime-sdk/src/context.rs +++ b/runtime-sdk/src/context.rs @@ -453,6 +453,8 @@ pub struct RuntimeBatchContext<'a, R: runtime::Runtime, S: NestedStore> { max_messages: u32, /// Emitted messages. messages: Vec<(roothash::Message, MessageEventHookInvocation)>, + /// Number of processed incoming messages. + in_msgs_processed: usize, /// Per-context values. values: BTreeMap<&'static str, Box>, @@ -490,6 +492,7 @@ impl<'a, R: runtime::Runtime, S: NestedStore> RuntimeBatchContext<'a, R, S> { block_etags: EventTags::new(), max_messages, messages: Vec::new(), + in_msgs_processed: 0, values: BTreeMap::new(), _runtime: PhantomData, } @@ -521,10 +524,25 @@ impl<'a, R: runtime::Runtime, S: NestedStore> RuntimeBatchContext<'a, R, S> { block_etags: EventTags::new(), max_messages: ctx.max_messages, messages: Vec::new(), + in_msgs_processed: 0, values: BTreeMap::new(), _runtime: PhantomData, } } + + // Load how many roothash messages that this runtime handled in this round. This becomes valid + // after the dispatcher finishes executing incoming blocks, and it doesn't get updated during + // the execution of those incoming messages. The dispatcher calls this in order to report back + // to the node how many messages it got through. + pub fn get_in_msgs_processed(&self) -> usize { + self.in_msgs_processed + } + + // Save how many roothash incoming messages that this runtime handled in this round. This is + // for the dispatcher to call, and modules shouldn't need to use this. + pub fn set_in_msgs_processed(&mut self, count: usize) { + self.in_msgs_processed = count; + } } impl<'a, R: runtime::Runtime, S: NestedStore> Context for RuntimeBatchContext<'a, R, S> { @@ -649,6 +667,7 @@ impl<'a, R: runtime::Runtime, S: NestedStore> Context for RuntimeBatchContext<'a _ => remaining_messages, }, messages: Vec::new(), + in_msgs_processed: self.in_msgs_processed, values: BTreeMap::new(), _runtime: PhantomData, }; @@ -893,6 +912,7 @@ impl<'round, 'store, R: runtime::Runtime, S: Store> Context _ => remaining_messages, }, messages: Vec::new(), + in_msgs_processed: 0, values: BTreeMap::new(), _runtime: PhantomData, }; diff --git a/runtime-sdk/src/dispatcher.rs b/runtime-sdk/src/dispatcher.rs index d48a7a1fff..2a64a15f5a 100644 --- a/runtime-sdk/src/dispatcher.rs +++ b/runtime-sdk/src/dispatcher.rs @@ -7,7 +7,7 @@ use std::{ }; use anyhow::anyhow; -use slog::error; +use slog::{error, warn}; use thiserror::Error; use oasis_core_runtime::{ @@ -38,7 +38,10 @@ use crate::{ schedule_control::ScheduleControlHost, storage::{self, NestedStore, Prefix}, types, - types::transaction::{AuthProof, Transaction}, + types::{ + in_msg::IncomingMessageData, + transaction::{AuthProof, Transaction}, + }, }; /// Unique module name. @@ -169,6 +172,16 @@ impl Dispatcher { } } + /// Decode a roothash incoming message's data field. + pub fn decode_in_msg( + in_msg: &roothash::IncomingMessage, + ) -> Result { + let data: types::in_msg::IncomingMessageData = cbor::from_slice(&in_msg.data) + .map_err(|e| modules::core::Error::MalformedIncomingMessageData(in_msg.id, e.into()))?; + data.validate_basic()?; + Ok(data) + } + /// Run the dispatch steps inside a transaction context. This includes the before call hooks, /// the call itself and after call hooks. The after call hooks are called regardless if the call /// succeeds or not. @@ -406,6 +419,49 @@ impl Dispatcher { } } + /// Execute the given roothash incoming message. This includes executing the embedded + /// transaction if there is one. + pub fn execute_in_msg( + ctx: &mut C, + in_msg: &roothash::IncomingMessage, + data: &IncomingMessageData, + tx: &Option, + ) -> Result<(), RuntimeError> { + if let Some(tx) = tx { + let tx_size = match data + .tx + .as_ref() + .unwrap_or_else(|| panic!("incoming message {} has tx but no data.tx", in_msg.id)) + .len() + .try_into() + { + Ok(tx_size) => tx_size, + Err(err) => { + warn!(ctx.get_logger("dispatcher"), "incoming message transaction too large"; "id" => in_msg.id, "err" => ?err); + return Ok(()); + } + }; + // Use the ID as index. + let index = in_msg.id.try_into().unwrap(); + Self::execute_tx(ctx, tx_size, tx.clone(), index)?; + } + Ok(()) + } + + /// Prefetch prefixes for the given roothash incoming message. This includes prefetching the + /// prefixes for the embedded transaction if there is one. + pub fn prefetch_in_msg( + prefixes: &mut BTreeSet, + _in_msg: &roothash::IncomingMessage, + _data: &IncomingMessageData, + tx: &Option, + ) -> Result<(), RuntimeError> { + if let Some(tx) = tx { + Self::prefetch_tx(prefixes, tx.clone())?; + } + Ok(()) + } + fn handle_last_round_messages(ctx: &mut C) -> Result<(), modules::core::Error> { let message_events = ctx.runtime_round_results().messages.clone(); @@ -521,6 +577,8 @@ impl Dispatcher { // Run end block hooks. R::Modules::end_block(&mut ctx); + let in_msgs_count = ctx.get_in_msgs_processed(); + // Commit the context and retrieve the emitted messages. let (block_tags, messages) = ctx.commit(); let (messages, handlers) = messages.into_iter().unzip(); @@ -534,7 +592,7 @@ impl Dispatcher { block_tags: block_tags.into_tags(), batch_weight_limits: None, tx_reject_hashes: vec![], - in_msgs_count: 0, // TODO: Support processing incoming messages. + in_msgs_count, }) } } @@ -544,7 +602,7 @@ impl transaction::dispatcher::Dispatcher for Dispatche &self, rt_ctx: transaction::Context<'_>, batch: &TxnBatch, - _in_msgs: &[roothash::IncomingMessage], + in_msgs: &[roothash::IncomingMessage], ) -> Result { self.execute_batch_common( rt_ctx, @@ -552,8 +610,22 @@ impl transaction::dispatcher::Dispatcher for Dispatche // If prefetch limit is set enable prefetch. let prefetch_enabled = R::PREFETCH_LIMIT > 0; - let mut txs = Vec::with_capacity(batch.len()); let mut prefixes: BTreeSet = BTreeSet::new(); + let mut in_msgs_parsed = Vec::with_capacity(in_msgs.len()); + for in_msg in in_msgs { + let data = Self::decode_in_msg(in_msg).unwrap_or_else(|err| { + warn!(ctx.get_logger("dispatcher"), "incoming message data malformed"; "id" => in_msg.id, "err" => ?err); + IncomingMessageData::noop() + }); + let tx = data.tx.as_ref().and_then(|tx| Self::decode_tx(ctx, tx).map_err(|err| { + warn!(ctx.get_logger("dispatcher"), "incoming message transaction malformed"; "id" => in_msg.id, "err" => ?err); + }).ok()); + if prefetch_enabled { + Self::prefetch_in_msg(&mut prefixes, in_msg, &data, &tx)?; + } + in_msgs_parsed.push((in_msg, data, tx)); + } + let mut txs = Vec::with_capacity(batch.len()); for tx in batch.iter() { let tx_size = tx.len().try_into().map_err(|_| { Error::MalformedTransactionInBatch(anyhow!("transaction too large")) @@ -576,6 +648,12 @@ impl transaction::dispatcher::Dispatcher for Dispatche .prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT); } + // Execute incoming messages. + for (in_msg, data, tx) in in_msgs_parsed { + Self::execute_in_msg(ctx, in_msg, &data, &tx)?; + } + ctx.set_in_msgs_processed(in_msgs.len()); + // Execute the batch. let mut results = Vec::with_capacity(batch.len()); for (index, (tx_size, tx)) in txs.into_iter().enumerate() { @@ -591,7 +669,7 @@ impl transaction::dispatcher::Dispatcher for Dispatche &self, rt_ctx: transaction::Context<'_>, batch: &mut TxnBatch, - _in_msgs: &[roothash::IncomingMessage], + in_msgs: &[roothash::IncomingMessage], ) -> Result { let cfg = R::SCHEDULE_CONTROL; let mut tx_reject_hashes = Vec::new(); @@ -599,6 +677,19 @@ impl transaction::dispatcher::Dispatcher for Dispatche let mut result = self.execute_batch_common( rt_ctx, |ctx| -> Result, RuntimeError> { + // Execute incoming messages. + for in_msg in in_msgs { + let data = Self::decode_in_msg(in_msg).unwrap_or_else(|err| { + warn!(ctx.get_logger("dispatcher"), "incoming message data malformed"; "id" => in_msg.id, "err" => ?err); + IncomingMessageData::noop() + }); + let tx = data.tx.as_ref().and_then(|tx| Self::decode_tx(ctx, tx).map_err(|err| { + warn!(ctx.get_logger("dispatcher"), "incoming message transaction malformed"; "id" => in_msg.id, "err" => ?err); + }).ok()); + Self::execute_in_msg(ctx, in_msg, &data, &tx)?; + } + ctx.set_in_msgs_processed(in_msgs.len()); + // Schedule and execute the batch. // // The idea is to keep scheduling transactions as long as we have some space diff --git a/runtime-sdk/src/modules/core/mod.rs b/runtime-sdk/src/modules/core/mod.rs index e6fd012630..8d43853a9f 100644 --- a/runtime-sdk/src/modules/core/mod.rs +++ b/runtime-sdk/src/modules/core/mod.rs @@ -21,7 +21,7 @@ use crate::{ ModuleInfoHandler as _, }, types::{ - token, + in_msg, token, transaction::{self, AddressSpec, AuthProof, Call, CallFormat, UnverifiedTransaction}, }, Runtime, @@ -131,6 +131,14 @@ pub enum Error { #[sdk_error(code = 25)] ReadOnlyTransaction, + #[error("malformed incoming message: {0}")] + #[sdk_error(code = 26)] + MalformedIncomingMessageData(u64, #[source] anyhow::Error), + + #[error("invalid incoming message: {0}")] + #[sdk_error(code = 27)] + InvalidIncomingMessage(#[from] in_msg::Error), + #[error("{0}")] #[sdk_error(transparent)] TxSimulationFailed(#[from] TxSimulationFailure), diff --git a/runtime-sdk/src/types/in_msg.rs b/runtime-sdk/src/types/in_msg.rs new file mode 100644 index 0000000000..390d63fee9 --- /dev/null +++ b/runtime-sdk/src/types/in_msg.rs @@ -0,0 +1,39 @@ +use thiserror::Error; + +/// The latest incoming message format version. +pub const LATEST_INCOMING_MESSAGE_VERSION: u16 = 1; + +/// Error. +#[derive(Debug, Error)] +pub enum Error { + #[error("unsupported version")] + UnsupportedVersion, + #[error("malformed incoming message data: {0}")] + MalformedTransaction(anyhow::Error), +} + +/// Roothash incoming message data. +#[derive(Clone, Debug, cbor::Encode, cbor::Decode)] +pub struct IncomingMessageData { + #[cbor(rename = "v")] + pub version: u16, + /// An embedded transaction (UnverifiedTransaction in runtimes using this SDK). + /// The transaction doesn't need to be from the same account that sent the message. + pub tx: Option>, +} + +impl IncomingMessageData { + pub fn noop() -> Self { + Self { + version: LATEST_INCOMING_MESSAGE_VERSION, + tx: None, + } + } + + pub fn validate_basic(&self) -> Result<(), Error> { + if self.version != LATEST_INCOMING_MESSAGE_VERSION { + return Err(Error::UnsupportedVersion); + } + Ok(()) + } +} diff --git a/runtime-sdk/src/types/mod.rs b/runtime-sdk/src/types/mod.rs index e2eec6deab..1fd25354de 100644 --- a/runtime-sdk/src/types/mod.rs +++ b/runtime-sdk/src/types/mod.rs @@ -2,6 +2,7 @@ pub mod address; pub mod callformat; +pub mod in_msg; pub mod message; pub mod token; pub mod transaction; From 1fb5a8a06fef164ced75949186473999ccc657d7 Mon Sep 17 00:00:00 2001 From: Warren He Date: Fri, 11 Mar 2022 11:22:35 -0800 Subject: [PATCH 02/10] module: add incoming message handler --- runtime-sdk/modules/contracts/src/lib.rs | 1 + runtime-sdk/modules/evm/src/lib.rs | 2 + runtime-sdk/src/dispatcher.rs | 8 +-- runtime-sdk/src/module.rs | 51 +++++++++++++++++++ runtime-sdk/src/modules/accounts/mod.rs | 2 + runtime-sdk/src/modules/consensus/mod.rs | 2 + .../src/modules/consensus_accounts/mod.rs | 5 ++ runtime-sdk/src/modules/core/mod.rs | 2 + runtime-sdk/src/modules/core/test.rs | 1 + runtime-sdk/src/modules/rewards/mod.rs | 2 + runtime-sdk/src/runtime.rs | 5 +- runtime-sdk/src/types/address.rs | 6 +++ .../runtimes/benchmarking/src/runtime/mod.rs | 2 + .../runtimes/simple-keyvalue/src/keyvalue.rs | 1 + 14 files changed, 85 insertions(+), 5 deletions(-) diff --git a/runtime-sdk/modules/contracts/src/lib.rs b/runtime-sdk/modules/contracts/src/lib.rs index 0fd372b506..a6507c57f6 100644 --- a/runtime-sdk/modules/contracts/src/lib.rs +++ b/runtime-sdk/modules/contracts/src/lib.rs @@ -793,5 +793,6 @@ impl module::MigrationHandler for Module { } impl module::TransactionHandler for Module {} +impl module::IncomingMessageHandler for Module {} impl module::BlockHandler for Module {} impl module::InvariantHandler for Module {} diff --git a/runtime-sdk/modules/evm/src/lib.rs b/runtime-sdk/modules/evm/src/lib.rs index 4be7e31d07..357cd9f0ea 100644 --- a/runtime-sdk/modules/evm/src/lib.rs +++ b/runtime-sdk/modules/evm/src/lib.rs @@ -770,6 +770,8 @@ impl module::TransactionHandler for Module { } } +impl module::IncomingMessageHandler for Module {} + impl module::BlockHandler for Module { fn end_block(ctx: &mut C) { // Update the list of historic block hashes. diff --git a/runtime-sdk/src/dispatcher.rs b/runtime-sdk/src/dispatcher.rs index 2a64a15f5a..779cabc81a 100644 --- a/runtime-sdk/src/dispatcher.rs +++ b/runtime-sdk/src/dispatcher.rs @@ -31,7 +31,7 @@ use crate::{ error::{Error as _, RuntimeError}, event::IntoTags, keymanager::{KeyManagerClient, KeyManagerError}, - module::{self, BlockHandler, MethodHandler, TransactionHandler}, + module::{self, BlockHandler, IncomingMessageHandler, MethodHandler, TransactionHandler}, modules, modules::core::API as _, runtime::Runtime, @@ -427,6 +427,7 @@ impl Dispatcher { data: &IncomingMessageData, tx: &Option, ) -> Result<(), RuntimeError> { + R::Modules::execute_in_msg(ctx, in_msg, data, tx)?; if let Some(tx) = tx { let tx_size = match data .tx @@ -452,10 +453,11 @@ impl Dispatcher { /// prefixes for the embedded transaction if there is one. pub fn prefetch_in_msg( prefixes: &mut BTreeSet, - _in_msg: &roothash::IncomingMessage, - _data: &IncomingMessageData, + in_msg: &roothash::IncomingMessage, + data: &IncomingMessageData, tx: &Option, ) -> Result<(), RuntimeError> { + R::Modules::prefetch_in_msg(prefixes, in_msg, data, tx)?; if let Some(tx) = tx { Self::prefetch_tx(prefixes, tx.clone())?; } diff --git a/runtime-sdk/src/module.rs b/runtime-sdk/src/module.rs index 6e9eecfa0e..00c61ffcfd 100644 --- a/runtime-sdk/src/module.rs +++ b/runtime-sdk/src/module.rs @@ -7,6 +7,8 @@ use std::{ use cbor::Encode as _; use impl_trait_for_tuples::impl_for_tuples; +use oasis_core_runtime::consensus::roothash; + use crate::{ context::{Context, TxContext}, dispatcher, error, @@ -16,6 +18,7 @@ use crate::{ storage, storage::{Prefix, Store}, types::{ + in_msg::IncomingMessageData, message::MessageResult, transaction::{self, AuthInfo, Call, Transaction, UnverifiedTransaction}, }, @@ -451,6 +454,54 @@ impl TransactionHandler for Tuple { } } +/// Roothash incoming message handler. +pub trait IncomingMessageHandler { + /// Add storage prefixes to prefetch, except for the prefixes for the embedded transaction. The + /// dispatcher will invoke the method handler for the embedded transaction separately. + fn prefetch_in_msg( + _prefixes: &mut BTreeSet, + _in_msg: &roothash::IncomingMessage, + _data: &IncomingMessageData, + _tx: &Option, + ) -> Result<(), error::RuntimeError> { + Ok(()) + } + + /// Execute an incoming message, except for the embedded transaction. The dispatcher will + /// invoke the transaction and method handlers for the embedded transaction separately. + fn execute_in_msg( + _ctx: &mut C, + _in_msg: &roothash::IncomingMessage, + _data: &IncomingMessageData, + _tx: &Option, + ) -> Result<(), error::RuntimeError> { + Ok(()) + } +} + +#[impl_for_tuples(30)] +impl IncomingMessageHandler for Tuple { + fn prefetch_in_msg( + prefixes: &mut BTreeSet, + in_msg: &roothash::IncomingMessage, + data: &IncomingMessageData, + tx: &Option, + ) -> Result<(), error::RuntimeError> { + for_tuples!( #( Tuple::prefetch_in_msg(prefixes, in_msg, data, tx)?; )* ); + Ok(()) + } + + fn execute_in_msg( + ctx: &mut C, + in_msg: &roothash::IncomingMessage, + data: &IncomingMessageData, + tx: &Option, + ) -> Result<(), error::RuntimeError> { + for_tuples!( #( Tuple::execute_in_msg(ctx, in_msg, data, tx)?; )* ); + Ok(()) + } +} + /// Migration handler. pub trait MigrationHandler { /// Genesis state type. diff --git a/runtime-sdk/src/modules/accounts/mod.rs b/runtime-sdk/src/modules/accounts/mod.rs index b5ee7edf2f..8d6e52df46 100644 --- a/runtime-sdk/src/modules/accounts/mod.rs +++ b/runtime-sdk/src/modules/accounts/mod.rs @@ -935,6 +935,8 @@ impl module::TransactionHandler for Module { } } +impl module::IncomingMessageHandler for Module {} + impl module::BlockHandler for Module { fn end_block(ctx: &mut C) { // Determine the fees that are available for disbursement from the last block. diff --git a/runtime-sdk/src/modules/consensus/mod.rs b/runtime-sdk/src/modules/consensus/mod.rs index eff9b3d0f3..3d57aa88df 100644 --- a/runtime-sdk/src/modules/consensus/mod.rs +++ b/runtime-sdk/src/modules/consensus/mod.rs @@ -372,6 +372,8 @@ impl module::MigrationHandler for Module { impl module::TransactionHandler for Module {} +impl module::IncomingMessageHandler for Module {} + impl module::BlockHandler for Module {} impl module::InvariantHandler for Module {} diff --git a/runtime-sdk/src/modules/consensus_accounts/mod.rs b/runtime-sdk/src/modules/consensus_accounts/mod.rs index f16ce0c711..2ab466a827 100644 --- a/runtime-sdk/src/modules/consensus_accounts/mod.rs +++ b/runtime-sdk/src/modules/consensus_accounts/mod.rs @@ -416,6 +416,11 @@ impl { } +impl + module::IncomingMessageHandler for Module +{ +} + impl module::BlockHandler for Module { diff --git a/runtime-sdk/src/modules/core/mod.rs b/runtime-sdk/src/modules/core/mod.rs index 8d43853a9f..b02137cf10 100644 --- a/runtime-sdk/src/modules/core/mod.rs +++ b/runtime-sdk/src/modules/core/mod.rs @@ -945,6 +945,8 @@ impl module::TransactionHandler for Module { } } +impl module::IncomingMessageHandler for Module {} + impl module::MigrationHandler for Module { type Genesis = Genesis; diff --git a/runtime-sdk/src/modules/core/test.rs b/runtime-sdk/src/modules/core/test.rs index c9afaa7f36..645b1e6629 100644 --- a/runtime-sdk/src/modules/core/test.rs +++ b/runtime-sdk/src/modules/core/test.rs @@ -267,6 +267,7 @@ impl GasWasterModule { } } +impl module::IncomingMessageHandler for GasWasterModule {} impl module::BlockHandler for GasWasterModule {} impl module::TransactionHandler for GasWasterModule {} impl module::MigrationHandler for GasWasterModule { diff --git a/runtime-sdk/src/modules/rewards/mod.rs b/runtime-sdk/src/modules/rewards/mod.rs index 2f6a07e3fd..d1b0dc72bd 100644 --- a/runtime-sdk/src/modules/rewards/mod.rs +++ b/runtime-sdk/src/modules/rewards/mod.rs @@ -138,6 +138,8 @@ impl module::MigrationHandler for Module module::TransactionHandler for Module {} +impl module::IncomingMessageHandler for Module {} + impl module::BlockHandler for Module { fn end_block(ctx: &mut C) { let epoch = ctx.epoch(); diff --git a/runtime-sdk/src/runtime.rs b/runtime-sdk/src/runtime.rs index 8b91baad54..9097d9ec35 100644 --- a/runtime-sdk/src/runtime.rs +++ b/runtime-sdk/src/runtime.rs @@ -17,8 +17,8 @@ use crate::{ crypto, dispatcher, keymanager::{KeyManagerClient, TrustedPolicySigners}, module::{ - BlockHandler, InvariantHandler, MethodHandler, MigrationHandler, ModuleInfoHandler, - TransactionHandler, + BlockHandler, IncomingMessageHandler, InvariantHandler, MethodHandler, MigrationHandler, + ModuleInfoHandler, TransactionHandler, }, modules, storage, }; @@ -43,6 +43,7 @@ pub trait Runtime { type Modules: TransactionHandler + MigrationHandler + MethodHandler + + IncomingMessageHandler + BlockHandler + InvariantHandler + ModuleInfoHandler; diff --git a/runtime-sdk/src/types/address.rs b/runtime-sdk/src/types/address.rs index d4ff0a340f..09e6be284b 100644 --- a/runtime-sdk/src/types/address.rs +++ b/runtime-sdk/src/types/address.rs @@ -250,6 +250,12 @@ impl From
for ConsensusAddress { } } +impl From<&ConsensusAddress> for Address { + fn from(addr: &ConsensusAddress) -> Address { + Address::from_bytes(addr.as_ref()).unwrap() + } +} + #[cfg(test)] mod test { use super::*; diff --git a/tests/runtimes/benchmarking/src/runtime/mod.rs b/tests/runtimes/benchmarking/src/runtime/mod.rs index a0cc649f07..76fe530e9b 100644 --- a/tests/runtimes/benchmarking/src/runtime/mod.rs +++ b/tests/runtimes/benchmarking/src/runtime/mod.rs @@ -203,6 +203,8 @@ impl module::MigrationHandler for Module module::IncomingMessageHandler for Module {} + impl module::TransactionHandler for Module {} impl module::BlockHandler for Module {} diff --git a/tests/runtimes/simple-keyvalue/src/keyvalue.rs b/tests/runtimes/simple-keyvalue/src/keyvalue.rs index 25bb5671f3..54d98f6734 100644 --- a/tests/runtimes/simple-keyvalue/src/keyvalue.rs +++ b/tests/runtimes/simple-keyvalue/src/keyvalue.rs @@ -155,6 +155,7 @@ impl sdk::module::TransactionHandler for Module { } } +impl sdk::module::IncomingMessageHandler for Module {} impl sdk::module::BlockHandler for Module {} impl sdk::module::InvariantHandler for Module {} From 41fd21bb89e37fa200af83e2960897882696fbee Mon Sep 17 00:00:00 2001 From: Warren He Date: Fri, 18 Mar 2022 15:35:02 -0700 Subject: [PATCH 03/10] modules/accounts: mint_into_fee_accumulator --- runtime-sdk/src/modules/accounts/mod.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/runtime-sdk/src/modules/accounts/mod.rs b/runtime-sdk/src/modules/accounts/mod.rs index 8d6e52df46..2b4556d8f8 100644 --- a/runtime-sdk/src/modules/accounts/mod.rs +++ b/runtime-sdk/src/modules/accounts/mod.rs @@ -226,6 +226,12 @@ pub trait API { amount: &token::BaseUnits, ) -> Result<(), modules::core::Error>; + /// Mint new tokens, directly crediting the fee accumulator. + fn mint_into_fee_accumulator( + ctx: &mut C, + amount: &token::BaseUnits, + ) -> Result<(), Error>; + /// Move amount from fee accumulator into address. fn move_from_fee_accumulator( ctx: &mut C, @@ -608,6 +614,24 @@ impl API for Module { Ok(()) } + fn mint_into_fee_accumulator( + ctx: &mut C, + amount: &token::BaseUnits, + ) -> Result<(), Error> { + if ctx.is_simulation() { + return Ok(()); + } + + ctx.value::(CONTEXT_KEY_FEE_ACCUMULATOR) + .or_default() + .add(amount); + + // Increase total supply. + Self::inc_total_supply(ctx.runtime_state(), amount)?; + + Ok(()) + } + fn move_from_fee_accumulator( ctx: &mut C, to: Address, From d39c92c5205003b3500862c880bf15296745ef28 Mon Sep 17 00:00:00 2001 From: Warren He Date: Fri, 11 Mar 2022 12:56:30 -0800 Subject: [PATCH 04/10] modules/consensus_accounts: handle incoming message tokens --- .../src/modules/consensus_accounts/mod.rs | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/runtime-sdk/src/modules/consensus_accounts/mod.rs b/runtime-sdk/src/modules/consensus_accounts/mod.rs index 2ab466a827..1f46f57775 100644 --- a/runtime-sdk/src/modules/consensus_accounts/mod.rs +++ b/runtime-sdk/src/modules/consensus_accounts/mod.rs @@ -4,10 +4,11 @@ //! while keeping track of amount deposited per account. use std::{collections::BTreeSet, convert::TryInto}; +use num_traits::Zero; use once_cell::sync::Lazy; use thiserror::Error; -use oasis_core_runtime::consensus::staking::Account as ConsensusAccount; +use oasis_core_runtime::consensus::{roothash, staking::Account as ConsensusAccount}; use oasis_runtime_sdk_macros::{handler, sdk_derive}; use crate::{ @@ -20,9 +21,10 @@ use crate::{ storage::Prefix, types::{ address::Address, + in_msg::IncomingMessageData, message::{MessageEvent, MessageEventHookInvocation}, token, - transaction::AuthInfo, + transaction::{AuthInfo, Transaction}, }, }; @@ -419,6 +421,40 @@ impl impl module::IncomingMessageHandler for Module { + fn prefetch_in_msg( + _prefixes: &mut BTreeSet, + _in_msg: &roothash::IncomingMessage, + _data: &IncomingMessageData, + _tx: &Option, + ) -> Result<(), error::RuntimeError> { + // todo: their account + Ok(()) + } + + fn execute_in_msg( + ctx: &mut C, + in_msg: &roothash::IncomingMessage, + _data: &IncomingMessageData, + _tx: &Option, + ) -> Result<(), error::RuntimeError> { + if !in_msg.fee.is_zero() { + let amount = token::BaseUnits( + Consensus::amount_from_consensus(ctx, (&in_msg.fee).try_into().unwrap()).unwrap(), + Consensus::consensus_denomination(ctx).unwrap(), + ); + Accounts::mint_into_fee_accumulator(ctx, &amount).unwrap(); + // TODO: Emit event that fee has been paid. + } + if !in_msg.tokens.is_zero() { + let amount = token::BaseUnits( + Consensus::amount_from_consensus(ctx, (&in_msg.tokens).try_into().unwrap()) + .unwrap(), + Consensus::consensus_denomination(ctx).unwrap(), + ); + Accounts::mint(ctx, (&in_msg.caller).into(), &amount).unwrap(); + } + Ok(()) + } } impl module::BlockHandler From 1c718c321eceecc5b853a3444dd6f1bfad5968d2 Mon Sep 17 00:00:00 2001 From: Warren He Date: Wed, 16 Mar 2022 13:33:32 -0700 Subject: [PATCH 05/10] modules/core: add incoming messages gas limit parameter --- runtime-sdk/src/modules/core/mod.rs | 10 ++++++++++ runtime-sdk/src/modules/core/test.rs | 6 ++++++ tests/runtimes/benchmarking/src/lib.rs | 1 + tests/runtimes/simple-consensus/src/lib.rs | 1 + tests/runtimes/simple-contracts/src/lib.rs | 1 + tests/runtimes/simple-evm/src/lib.rs | 1 + tests/runtimes/simple-keyvalue/src/lib.rs | 1 + tests/runtimes/simple-keyvalue/src/test.rs | 1 + 8 files changed, 22 insertions(+) diff --git a/runtime-sdk/src/modules/core/mod.rs b/runtime-sdk/src/modules/core/mod.rs index b02137cf10..5055173533 100644 --- a/runtime-sdk/src/modules/core/mod.rs +++ b/runtime-sdk/src/modules/core/mod.rs @@ -230,6 +230,7 @@ pub struct GasCosts { #[derive(Clone, Debug, Default, cbor::Encode, cbor::Decode)] pub struct Parameters { pub max_batch_gas: u64, + pub max_in_msgs_gas: u64, pub max_tx_size: u32, pub max_tx_signers: u32, pub max_multisig_signers: u32, @@ -258,6 +259,9 @@ pub trait API { /// Returns the remaining batch-wide gas. fn remaining_batch_gas(ctx: &mut C) -> u64; + /// Returns the remaining batch-wide gas that can be used for roothash incoming messages. + fn remaining_in_msgs_gas(ctx: &mut C) -> u64; + /// Return the remaining tx-wide gas. fn remaining_tx_gas(ctx: &mut C) -> u64; @@ -417,6 +421,12 @@ impl API for Module { batch_gas_limit.saturating_sub(*batch_gas_used) } + fn remaining_in_msgs_gas(ctx: &mut C) -> u64 { + let in_msgs_gas_limit = Self::params(ctx.runtime_state()).max_in_msgs_gas; + let batch_gas_used = ctx.value::(CONTEXT_KEY_GAS_USED).or_default(); + in_msgs_gas_limit.saturating_sub(*batch_gas_used) + } + fn remaining_tx_gas(ctx: &mut C) -> u64 { let gas_limit = ctx.tx_auth_info().fee.gas; let gas_used = ctx.tx_value::(CONTEXT_KEY_GAS_USED).or_default(); diff --git a/runtime-sdk/src/modules/core/test.rs b/runtime-sdk/src/modules/core/test.rs index 645b1e6629..55b477417c 100644 --- a/runtime-sdk/src/modules/core/test.rs +++ b/runtime-sdk/src/modules/core/test.rs @@ -28,6 +28,7 @@ fn test_use_gas() { ctx.runtime_state(), Parameters { max_batch_gas: BLOCK_MAX_GAS, + max_in_msgs_gas: BLOCK_MAX_GAS / 4, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, @@ -127,6 +128,7 @@ fn test_query_min_gas_price() { ctx.runtime_state(), Parameters { max_batch_gas: 10000, + max_in_msgs_gas: 2500, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, @@ -299,6 +301,7 @@ impl Runtime for GasWasterRuntime { super::Genesis { parameters: Parameters { max_batch_gas: u64::MAX, + max_in_msgs_gas: u64::MAX, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, @@ -660,6 +663,7 @@ fn test_approve_unverified_tx() { ctx.runtime_state(), Parameters { max_batch_gas: u64::MAX, + max_in_msgs_gas: u64::MAX, max_tx_size: 32 * 1024, max_tx_signers: 2, max_multisig_signers: 2, @@ -759,6 +763,7 @@ fn test_min_gas_price() { ctx.runtime_state(), Parameters { max_batch_gas: u64::MAX, + max_in_msgs_gas: u64::MAX, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, @@ -941,6 +946,7 @@ fn test_gas_used_events() { ctx.runtime_state(), Parameters { max_batch_gas: 1_000_000, + max_in_msgs_gas: 250_000, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/benchmarking/src/lib.rs b/tests/runtimes/benchmarking/src/lib.rs index a43c1a3e80..9ee1ed835e 100644 --- a/tests/runtimes/benchmarking/src/lib.rs +++ b/tests/runtimes/benchmarking/src/lib.rs @@ -47,6 +47,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 10_000_000, + max_in_msgs_gas: 2_500_000, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/simple-consensus/src/lib.rs b/tests/runtimes/simple-consensus/src/lib.rs index ddb8687557..d45b9c5595 100644 --- a/tests/runtimes/simple-consensus/src/lib.rs +++ b/tests/runtimes/simple-consensus/src/lib.rs @@ -64,6 +64,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 10_000, + max_in_msgs_gas: 2_500, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/simple-contracts/src/lib.rs b/tests/runtimes/simple-contracts/src/lib.rs index b3d2a1c190..94a7b5210b 100644 --- a/tests/runtimes/simple-contracts/src/lib.rs +++ b/tests/runtimes/simple-contracts/src/lib.rs @@ -72,6 +72,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 10_000_000, + max_in_msgs_gas: 2_500_000, max_tx_size: 512 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/simple-evm/src/lib.rs b/tests/runtimes/simple-evm/src/lib.rs index fbceacb3f0..06012b3a92 100644 --- a/tests/runtimes/simple-evm/src/lib.rs +++ b/tests/runtimes/simple-evm/src/lib.rs @@ -77,6 +77,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 1_000_000, + max_in_msgs_gas: 250_000, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/simple-keyvalue/src/lib.rs b/tests/runtimes/simple-keyvalue/src/lib.rs index 59739b489e..45069deb9b 100644 --- a/tests/runtimes/simple-keyvalue/src/lib.rs +++ b/tests/runtimes/simple-keyvalue/src/lib.rs @@ -126,6 +126,7 @@ impl sdk::Runtime for Runtime { modules::core::Genesis { parameters: modules::core::Parameters { max_batch_gas: 2_000, + max_in_msgs_gas: 500, max_tx_size: 32 * 1024, max_tx_signers: 8, max_multisig_signers: 8, diff --git a/tests/runtimes/simple-keyvalue/src/test.rs b/tests/runtimes/simple-keyvalue/src/test.rs index f6be5ebd67..6c5e4478d6 100644 --- a/tests/runtimes/simple-keyvalue/src/test.rs +++ b/tests/runtimes/simple-keyvalue/src/test.rs @@ -16,6 +16,7 @@ fn test_impl_for_tuple() { ctx.runtime_state(), core::Parameters { max_batch_gas: u64::MAX, + max_in_msgs_gas: u64::MAX, max_tx_size: 32 * 1024, max_tx_signers: 1, max_multisig_signers: 1, From 01ce63c2a6f772ea984ebf7aa46786de3d2f9008 Mon Sep 17 00:00:00 2001 From: Warren He Date: Wed, 16 Mar 2022 15:11:28 -0700 Subject: [PATCH 06/10] dispatcher: watch incoming messages gas --- runtime-sdk/src/dispatcher.rs | 48 ++++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/runtime-sdk/src/dispatcher.rs b/runtime-sdk/src/dispatcher.rs index 779cabc81a..75c0a9ab66 100644 --- a/runtime-sdk/src/dispatcher.rs +++ b/runtime-sdk/src/dispatcher.rs @@ -680,17 +680,57 @@ impl transaction::dispatcher::Dispatcher for Dispatche rt_ctx, |ctx| -> Result, RuntimeError> { // Execute incoming messages. + let in_msgs_gas_limit = R::Core::remaining_in_msgs_gas(ctx); + let mut in_msgs_processed = 0usize; for in_msg in in_msgs { let data = Self::decode_in_msg(in_msg).unwrap_or_else(|err| { warn!(ctx.get_logger("dispatcher"), "incoming message data malformed"; "id" => in_msg.id, "err" => ?err); IncomingMessageData::noop() }); - let tx = data.tx.as_ref().and_then(|tx| Self::decode_tx(ctx, tx).map_err(|err| { - warn!(ctx.get_logger("dispatcher"), "incoming message transaction malformed"; "id" => in_msg.id, "err" => ?err); - }).ok()); + let tx = match data.tx.as_ref() { + Some(tx) => { + match Self::decode_tx(ctx, tx) { + Ok(tx) => { + let remaining_gas = R::Core::remaining_in_msgs_gas(ctx); + if remaining_gas < cfg.min_remaining_gas { + // This next message has a transaction, but we won't have + // enough gas to execute it, so leave it for the next + // round and stop. + break; + } else if tx.auth_info.fee.gas > in_msgs_gas_limit { + // The transaction is too large to execute under our + // current parameters, so skip over it. + warn!(ctx.get_logger("dispatcher"), "incoming message transaction fee gas exceeds round gas limit"; + "id" => in_msg.id, + "tx_gas" => tx.auth_info.fee.gas, + "in_msgs_gas_limit" => in_msgs_gas_limit, + ); + // Actually don't skip the message entirely, just don't + // execute the transaction. + None + } else if tx.auth_info.fee.gas > remaining_gas { + // The transaction is too large to execute in this round, + // so leave it for the next round and stop. + break; + } else { + Some(tx) + } + } + Err(err) => { + warn!(ctx.get_logger("dispatcher"), "incoming message transaction malformed"; + "id" => in_msg.id, + "err" => ?err, + ); + None + } + } + } + None => None, + }; Self::execute_in_msg(ctx, in_msg, &data, &tx)?; + in_msgs_processed += 1; } - ctx.set_in_msgs_processed(in_msgs.len()); + ctx.set_in_msgs_processed(in_msgs_processed); // Schedule and execute the batch. // From 57624df487b512769f3dd53c3ae3dca68cc5bcdf Mon Sep 17 00:00:00 2001 From: Warren He Date: Fri, 25 Mar 2022 16:07:33 -0700 Subject: [PATCH 07/10] dispatcher: add note about incoming messages events --- runtime-sdk/src/dispatcher.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime-sdk/src/dispatcher.rs b/runtime-sdk/src/dispatcher.rs index 75c0a9ab66..c5b04c5f2c 100644 --- a/runtime-sdk/src/dispatcher.rs +++ b/runtime-sdk/src/dispatcher.rs @@ -444,6 +444,7 @@ impl Dispatcher { }; // Use the ID as index. let index = in_msg.id.try_into().unwrap(); + // todo: put result tags in block Self::execute_tx(ctx, tx_size, tx.clone(), index)?; } Ok(()) From 6ea7b510c277569a660aaf4888578ba55c5d0f88 Mon Sep 17 00:00:00 2001 From: Warren He Date: Wed, 27 Apr 2022 15:55:36 -0700 Subject: [PATCH 08/10] f gas --- runtime-sdk/src/dispatcher.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/runtime-sdk/src/dispatcher.rs b/runtime-sdk/src/dispatcher.rs index c5b04c5f2c..88a65e528b 100644 --- a/runtime-sdk/src/dispatcher.rs +++ b/runtime-sdk/src/dispatcher.rs @@ -690,15 +690,16 @@ impl transaction::dispatcher::Dispatcher for Dispatche }); let tx = match data.tx.as_ref() { Some(tx) => { + let remaining_gas = R::Core::remaining_in_msgs_gas(ctx); + if remaining_gas < cfg.min_remaining_gas { + // This next message has a transaction, but we won't have + // enough gas to execute it, so leave it for the next + // round and stop. + break; + } match Self::decode_tx(ctx, tx) { Ok(tx) => { - let remaining_gas = R::Core::remaining_in_msgs_gas(ctx); - if remaining_gas < cfg.min_remaining_gas { - // This next message has a transaction, but we won't have - // enough gas to execute it, so leave it for the next - // round and stop. - break; - } else if tx.auth_info.fee.gas > in_msgs_gas_limit { + if tx.auth_info.fee.gas > in_msgs_gas_limit { // The transaction is too large to execute under our // current parameters, so skip over it. warn!(ctx.get_logger("dispatcher"), "incoming message transaction fee gas exceeds round gas limit"; From 5688603151b8456c75038033223a4c4c69c65924 Mon Sep 17 00:00:00 2001 From: Warren He Date: Thu, 5 May 2022 11:44:28 -0700 Subject: [PATCH 09/10] (unfinished) as part of txs --- runtime-sdk/src/dispatcher.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/runtime-sdk/src/dispatcher.rs b/runtime-sdk/src/dispatcher.rs index 88a65e528b..1c74b7f16a 100644 --- a/runtime-sdk/src/dispatcher.rs +++ b/runtime-sdk/src/dispatcher.rs @@ -680,6 +680,8 @@ impl transaction::dispatcher::Dispatcher for Dispatche let mut result = self.execute_batch_common( rt_ctx, |ctx| -> Result, RuntimeError> { + let mut new_batch = Vec::new(); + // Execute incoming messages. let in_msgs_gas_limit = R::Core::remaining_in_msgs_gas(ctx); let mut in_msgs_processed = 0usize; @@ -690,6 +692,12 @@ impl transaction::dispatcher::Dispatcher for Dispatche }); let tx = match data.tx.as_ref() { Some(tx) => { + if new_batch.len() >= cfg.max_tx_count { + // This next message has a transaction, but we'll exceed the + // maximum transaction count, so leave it for the next round and + // stop. + break; + } let remaining_gas = R::Core::remaining_in_msgs_gas(ctx); if remaining_gas < cfg.min_remaining_gas { // This next message has a transaction, but we won't have @@ -738,7 +746,6 @@ impl transaction::dispatcher::Dispatcher for Dispatche // // The idea is to keep scheduling transactions as long as we have some space // available in the block as determined by gas use. - let mut new_batch = Vec::new(); let mut results = Vec::with_capacity(batch.len()); let mut requested_batch_len = cfg.initial_batch_size; 'batch: loop { From 5c2f73c569887c00c4962c1697cbbf83801e9641 Mon Sep 17 00:00:00 2001 From: Warren He Date: Thu, 5 May 2022 11:44:38 -0700 Subject: [PATCH 10/10] (junk) dedup --- runtime-sdk/src/dispatcher.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runtime-sdk/src/dispatcher.rs b/runtime-sdk/src/dispatcher.rs index 1c74b7f16a..b93d69594f 100644 --- a/runtime-sdk/src/dispatcher.rs +++ b/runtime-sdk/src/dispatcher.rs @@ -683,6 +683,7 @@ impl transaction::dispatcher::Dispatcher for Dispatche let mut new_batch = Vec::new(); // Execute incoming messages. + let in_msg_txs = Vec::new(); // todo: more efficient way to do this let in_msgs_gas_limit = R::Core::remaining_in_msgs_gas(ctx); let mut in_msgs_processed = 0usize; for in_msg in in_msgs { @@ -754,6 +755,7 @@ impl transaction::dispatcher::Dispatcher for Dispatche let last_batch_tx_hash = batch.last().map(|raw_tx| Hash::digest_bytes(raw_tx)); for raw_tx in batch.drain(..) { + // todo: skip copies of incoming message txs // If we don't have enough gas for processing even the cheapest transaction // we are done. Same if we reached the runtime-imposed maximum tx count. let remaining_gas = R::Core::remaining_batch_gas(ctx);