Skip to content

Commit

Permalink
feat(event-streaming): API-driven subscription management (#2172)
Browse files Browse the repository at this point in the history
Refactor event streaming to support dynamic client subscriptions over RPC using a unified StreamingManager. The StreamingManager now orchestrates background streamers by initializing a streamer when a client activates it via the RPC API and shutting it down when no longer needed. Legacy fee estimator endpoints have been replaced with streaming RPC methods, and new task manager RPCs for BCH and Tendermint have been added. Additionally, event stream configuration has been migrated from static JSON settings to runtime API initialization for improved flexibility.
  • Loading branch information
mariocynicys authored Feb 6, 2025
1 parent ff0eefc commit 927f84b
Show file tree
Hide file tree
Showing 136 changed files with 4,155 additions and 2,610 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ MM2.json

# mergetool
*.orig
# Dumpster (files not intended for tracking)
hidden

# Ignore containers runtime directories for dockerized tests
# This directory contains temporary data used by Docker containers during tests execution.
# It is recreated from container-state data each time test containers are started,
# and should not be tracked in version control.
.docker/container-runtime/
.docker/container-runtime/
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

106 changes: 32 additions & 74 deletions mm2src/coins/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,16 @@ use async_trait::async_trait;
use bitcrypto::{dhash160, keccak256, ripemd160, sha256};
use common::custom_futures::repeatable::{Ready, Retry, RetryOnError};
use common::custom_futures::timeout::FutureTimerExt;
use common::executor::{abortable_queue::AbortableQueue, AbortOnDropHandle, AbortSettings, AbortableSystem,
AbortedError, SpawnAbortable, Timer};
use common::executor::{abortable_queue::AbortableQueue, AbortSettings, AbortableSystem, AbortedError, SpawnAbortable,
Timer};
use common::log::{debug, error, info, warn};
use common::number_type_casting::SafeTypeCastingNumbers;
use common::{now_sec, small_rng, DEX_FEE_ADDR_RAW_PUBKEY};
use crypto::privkey::key_pair_from_secret;
use crypto::{Bip44Chain, CryptoCtx, CryptoCtxError, GlobalHDAccountArc, KeyPairPolicy};
use derive_more::Display;
use enum_derives::EnumFromStringify;

use ethabi::{Contract, Function, Token};
use ethcore_transaction::tx_builders::TxBuilderError;
use ethcore_transaction::{Action, TransactionWrapper, TransactionWrapperBuilder as UnSignedEthTxBuilder,
Expand All @@ -77,7 +78,6 @@ use futures01::Future;
use http::Uri;
use instant::Instant;
use mm2_core::mm_ctx::{MmArc, MmWeak};
use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus};
use mm2_number::bigdecimal_custom::CheckedDivision;
use mm2_number::{BigDecimal, BigUint, MmNumber};
#[cfg(test)] use mocktopus::macros::*;
Expand Down Expand Up @@ -109,30 +109,30 @@ cfg_wasm32! {
}

use super::{coin_conf, lp_coinfind_or_err, AsyncMutex, BalanceError, BalanceFut, CheckIfMyPaymentSentArgs,
CoinBalance, CoinFutSpawner, CoinProtocol, CoinTransportMetrics, CoinsContext, ConfirmPaymentInput,
EthValidateFeeArgs, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, IguanaPrivKey, MakerSwapTakerCoin,
MarketCoinOps, MmCoin, MmCoinEnum, MyAddressError, MyWalletAddress, NegotiateSwapContractAddrErr,
NumConversError, NumConversResult, PaymentInstructionArgs, PaymentInstructions, PaymentInstructionsErr,
PrivKeyBuildPolicy, PrivKeyPolicyNotAllowed, RawTransactionError, RawTransactionFut,
RawTransactionRequest, RawTransactionRes, RawTransactionResult, RefundError, RefundPaymentArgs,
RefundResult, RewardTarget, RpcClientType, RpcTransportEventHandler, RpcTransportEventHandlerShared,
SearchForSwapTxSpendInput, SendMakerPaymentSpendPreimageInput, SendPaymentArgs, SignEthTransactionParams,
SignRawTransactionEnum, SignRawTransactionRequest, SignatureError, SignatureResult, SpendPaymentArgs,
SwapOps, SwapTxFeePolicy, TakerSwapMakerCoin, TradeFee, TradePreimageError, TradePreimageFut,
TradePreimageResult, TradePreimageValue, Transaction, TransactionDetails, TransactionEnum, TransactionErr,
TransactionFut, TransactionType, TxMarshalingErr, UnexpectedDerivationMethod, ValidateAddressResult,
ValidateFeeArgs, ValidateInstructionsErr, ValidateOtherPubKeyErr, ValidatePaymentError,
ValidatePaymentFut, ValidatePaymentInput, VerificationError, VerificationResult, WaitForHTLCTxSpendArgs,
WatcherOps, WatcherReward, WatcherRewardError, WatcherSearchForSwapTxSpendInput,
WatcherValidatePaymentInput, WatcherValidateTakerFeeInput, WithdrawError, WithdrawFee, WithdrawFut,
WithdrawRequest, WithdrawResult, EARLY_CONFIRMATION_ERR_LOG, INVALID_CONTRACT_ADDRESS_ERR_LOG,
CoinBalance, CoinProtocol, CoinTransportMetrics, CoinsContext, ConfirmPaymentInput, EthValidateFeeArgs,
FeeApproxStage, FoundSwapTxSpend, HistorySyncState, IguanaPrivKey, MakerSwapTakerCoin, MarketCoinOps,
MmCoin, MmCoinEnum, MyAddressError, MyWalletAddress, NegotiateSwapContractAddrErr, NumConversError,
NumConversResult, PaymentInstructionArgs, PaymentInstructions, PaymentInstructionsErr, PrivKeyBuildPolicy,
PrivKeyPolicyNotAllowed, RawTransactionError, RawTransactionFut, RawTransactionRequest, RawTransactionRes,
RawTransactionResult, RefundError, RefundPaymentArgs, RefundResult, RewardTarget, RpcClientType,
RpcTransportEventHandler, RpcTransportEventHandlerShared, SearchForSwapTxSpendInput,
SendMakerPaymentSpendPreimageInput, SendPaymentArgs, SignEthTransactionParams, SignRawTransactionEnum,
SignRawTransactionRequest, SignatureError, SignatureResult, SpendPaymentArgs, SwapOps, SwapTxFeePolicy,
TakerSwapMakerCoin, TradeFee, TradePreimageError, TradePreimageFut, TradePreimageResult,
TradePreimageValue, Transaction, TransactionDetails, TransactionEnum, TransactionErr, TransactionFut,
TransactionType, TxMarshalingErr, UnexpectedDerivationMethod, ValidateAddressResult, ValidateFeeArgs,
ValidateInstructionsErr, ValidateOtherPubKeyErr, ValidatePaymentError, ValidatePaymentFut,
ValidatePaymentInput, VerificationError, VerificationResult, WaitForHTLCTxSpendArgs, WatcherOps,
WatcherReward, WatcherRewardError, WatcherSearchForSwapTxSpendInput, WatcherValidatePaymentInput,
WatcherValidateTakerFeeInput, WeakSpawner, WithdrawError, WithdrawFee, WithdrawFut, WithdrawRequest,
WithdrawResult, EARLY_CONFIRMATION_ERR_LOG, INVALID_CONTRACT_ADDRESS_ERR_LOG,
INVALID_PAYMENT_STATE_ERR_LOG, INVALID_RECEIVER_ERR_LOG, INVALID_SENDER_ERR_LOG, INVALID_SWAP_ID_ERR_LOG};
pub use rlp;
cfg_native! {
use std::path::PathBuf;
}

mod eth_balance_events;
pub mod eth_balance_events;
mod eth_rpc;
#[cfg(test)] mod eth_tests;
#[cfg(target_arch = "wasm32")] mod eth_wasm_tests;
Expand All @@ -153,14 +153,12 @@ use eth_withdraw::{EthWithdraw, InitEthWithdraw, StandardEthWithdraw};
mod nonce;
use nonce::ParityNonce;

mod eip1559_gas_fee;
pub(crate) use eip1559_gas_fee::FeePerGasEstimated;
use eip1559_gas_fee::{BlocknativeGasApiCaller, FeePerGasSimpleEstimator, GasApiConfig, GasApiProvider,
InfuraGasApiCaller};
pub mod fee_estimation;
use fee_estimation::eip1559::{block_native::BlocknativeGasApiCaller, infura::InfuraGasApiCaller,
simple::FeePerGasSimpleEstimator, FeePerGasEstimated, GasApiConfig, GasApiProvider};

pub mod erc20;
use erc20::get_token_decimals;

pub(crate) mod eth_swap_v2;
use eth_swap_v2::{EthPaymentType, PaymentMethod};

Expand Down Expand Up @@ -580,7 +578,8 @@ impl TryFrom<PayForGasParams> for PayForGasOption {

type GasDetails = (U256, PayForGasOption);

#[derive(Debug, Display, EnumFromStringify)]
#[derive(Debug, Display, EnumFromStringify, Serialize, SerializeErrorType)]
#[serde(tag = "error_type", content = "error_data")]
pub enum Web3RpcError {
#[display(fmt = "Transport: {}", _0)]
Transport(String),
Expand Down Expand Up @@ -814,29 +813,6 @@ impl From<PrivKeyBuildPolicy> for EthPrivKeyBuildPolicy {
}
}

/// Gas fee estimator loop context, runs a loop to estimate max fee and max priority fee per gas according to EIP-1559 for the next block
///
/// This FeeEstimatorContext handles rpc requests which start and stop gas fee estimation loop and handles the loop itself.
/// FeeEstimatorContext keeps the latest estimated gas fees to return them on rpc request
pub(crate) struct FeeEstimatorContext {
/// Latest estimated gas fee values
pub(crate) estimated_fees: Arc<AsyncMutex<FeePerGasEstimated>>,
/// Handler for estimator loop graceful shutdown
pub(crate) abort_handler: AsyncMutex<Option<AbortOnDropHandle>>,
}

/// Gas fee estimator creation state
pub(crate) enum FeeEstimatorState {
/// Gas fee estimation not supported for this coin
CoinNotSupported,
/// Platform coin required to be enabled for gas fee estimation for this coin
PlatformCoinRequired,
/// Fee estimator created, use simple internal estimator
Simple(AsyncMutex<FeeEstimatorContext>),
/// Fee estimator created, use provider or simple internal estimator (if provider fails)
Provider(AsyncMutex<FeeEstimatorContext>),
}

/// pImpl idiom.
pub struct EthCoinImpl {
ticker: String,
Expand Down Expand Up @@ -877,8 +853,6 @@ pub struct EthCoinImpl {
/// consisting of the token address and token ID, separated by a comma. This field is essential for tracking the NFT assets
/// information (chain & contract type, amount etc.), where ownership and amount, in ERC1155 case, might change over time.
pub nfts_infos: Arc<AsyncMutex<HashMap<String, NftInfo>>>,
/// Context for eth fee per gas estimator loop. Created if coin supports fee per gas estimation
pub(crate) platform_fee_estimator_state: Arc<FeeEstimatorState>,
/// Config provided gas limits for swap and send transactions
pub(crate) gas_limit: EthGasLimit,
/// Config provided gas limits v2 for swap v2 transactions
Expand Down Expand Up @@ -5359,14 +5333,14 @@ impl EthCoin {
}

/// Get gas base fee and suggest priority tip fees for the next block (see EIP-1559)
pub async fn get_eip1559_gas_fee(&self) -> Web3RpcResult<FeePerGasEstimated> {
pub async fn get_eip1559_gas_fee(&self, use_simple: bool) -> Web3RpcResult<FeePerGasEstimated> {
let coin = self.clone();
let history_estimator_fut = FeePerGasSimpleEstimator::estimate_fee_by_history(&coin);
let ctx =
MmArc::from_weak(&coin.ctx).ok_or_else(|| MmError::new(Web3RpcError::Internal("ctx is null".into())))?;

let gas_api_conf = ctx.conf["gas_api"].clone();
if gas_api_conf.is_null() {
debug!("No eth gas api provider config, using only history estimator");
if gas_api_conf.is_null() || use_simple {
return history_estimator_fut
.await
.map_err(|e| MmError::new(Web3RpcError::Internal(e.to_string())));
Expand Down Expand Up @@ -5403,7 +5377,7 @@ impl EthCoin {
Ok(PayForGasOption::Legacy(LegacyGasPrice { gas_price }))
},
SwapTxFeePolicy::Low | SwapTxFeePolicy::Medium | SwapTxFeePolicy::High => {
let fee_per_gas = coin.get_eip1559_gas_fee().await?;
let fee_per_gas = coin.get_eip1559_gas_fee(false).await?;
let pay_result = match swap_fee_policy {
SwapTxFeePolicy::Low => PayForGasOption::Eip1559(Eip1559FeePerGas {
max_fee_per_gas: fee_per_gas.low.max_fee_per_gas,
Expand Down Expand Up @@ -5559,16 +5533,6 @@ impl EthCoin {
Box::new(fut.boxed().compat())
}

async fn spawn_balance_stream_if_enabled(&self, ctx: &MmArc) -> Result<(), String> {
if let Some(stream_config) = &ctx.event_stream_configuration {
if let EventInitStatus::Failed(err) = EventBehaviour::spawn_if_active(self.clone(), stream_config).await {
return ERR!("Failed spawning balance events. Error: {}", err);
}
}

Ok(())
}

/// Requests the nonce from all available nodes and returns the highest nonce available with the list of nodes that returned the highest nonce.
/// Transactions will be sent using the nodes that returned the highest nonce.
pub fn get_addr_nonce(
Expand Down Expand Up @@ -5695,7 +5659,7 @@ impl EthTxFeeDetails {
impl MmCoin for EthCoin {
fn is_asset_chain(&self) -> bool { false }

fn spawner(&self) -> CoinFutSpawner { CoinFutSpawner::new(&self.abortable_system) }
fn spawner(&self) -> WeakSpawner { self.abortable_system.weak_spawner() }

fn get_raw_transaction(&self, req: RawTransactionRequest) -> RawTransactionFut {
Box::new(get_raw_transaction_impl(self.clone(), req).boxed().compat())
Expand Down Expand Up @@ -6569,7 +6533,6 @@ pub async fn eth_coin_from_conf_and_request(
// all spawned futures related to `ETH` coin will be aborted as well.
let abortable_system = try_s!(ctx.abortable_system.create_subsystem());

let platform_fee_estimator_state = FeeEstimatorState::init_fee_estimator(ctx, conf, &coin_type).await?;
let max_eth_tx_type = get_max_eth_tx_type_conf(ctx, conf, &coin_type).await?;
let gas_limit: EthGasLimit = extract_gas_limit_from_conf(conf)?;
let gas_limit_v2: EthGasLimitV2 = extract_gas_limit_from_conf(conf)?;
Expand Down Expand Up @@ -6597,16 +6560,12 @@ pub async fn eth_coin_from_conf_and_request(
address_nonce_locks,
erc20_tokens_infos: Default::default(),
nfts_infos: Default::default(),
platform_fee_estimator_state,
gas_limit,
gas_limit_v2,
abortable_system,
};

let coin = EthCoin(Arc::new(coin));
coin.spawn_balance_stream_if_enabled(ctx).await?;

Ok(coin)
Ok(EthCoin(Arc::new(coin)))
}

/// Displays the address in mixed-case checksum form
Expand Down Expand Up @@ -7415,7 +7374,6 @@ impl EthCoin {
address_nonce_locks: Arc::clone(&self.address_nonce_locks),
erc20_tokens_infos: Arc::clone(&self.erc20_tokens_infos),
nfts_infos: Arc::clone(&self.nfts_infos),
platform_fee_estimator_state: Arc::clone(&self.platform_fee_estimator_state),
gas_limit: EthGasLimit::default(),
gas_limit_v2: EthGasLimitV2::default(),
abortable_system: self.abortable_system.create_subsystem().unwrap(),
Expand Down
Loading

0 comments on commit 927f84b

Please sign in to comment.