Skip to content

Commit

Permalink
feat(operator-pool): check operator balance
Browse files Browse the repository at this point in the history
  • Loading branch information
00nktk committed Nov 12, 2024
1 parent 3a276b7 commit f040a3f
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ impl Executor {
ExecutorBuilder::builder()
}

/// Returns the [`Pubkey`] reported by this executor's builder.
///
/// This is a plain delegation to `self.builder.pubkey()`; no state is modified.
// NOTE(review): presumably this is the operator's Solana account key — confirm
// against the builder's `pubkey()` implementation.
pub fn pubkey(&self) -> Pubkey {
self.builder.pubkey()
}

async fn initialize_and_start(
params: ExecutorBuilder,
) -> anyhow::Result<(Arc<Self>, impl Future<Output = anyhow::Result<()>>)> {
Expand Down
1 change: 1 addition & 0 deletions operator-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ futures-util.workspace = true
humantime = "2.1.0"
dashmap.workspace = true
async-channel = "2.3.1"
tokio-util = { version = "0.7.12", features = ["time"] }
132 changes: 123 additions & 9 deletions operator-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::Duration;

use clap::Args;
use dashmap::DashMap;
use futures_util::StreamExt;
use humantime::format_duration;
use reth_primitives::Address;
use solana_cli_config::CONFIG_FILE;
Expand All @@ -15,13 +16,15 @@ use solana_sdk::signature::Signature;
use thiserror::Error;
use tokio::sync::oneshot;
use tokio::time;
use tokio_util::time::DelayQueue;

use executor::Executor;
use neon_api::NeonApi;
use operator::Operator;
use solana_api::solana_api::SolanaApi;

const RELOAD_INTERVAL: Duration = Duration::from_secs(60);
const RECHECK_INTERVAL: Duration = Duration::from_secs(20);

type Result<T> = std::result::Result<T, Error>;

Expand Down Expand Up @@ -68,6 +71,14 @@ pub struct Config {
#[arg(long, default_value_t = false)]
/// Create all holders on service start
pub init_holders: bool,

#[arg(long, default_value_t = 9_000_000_000)]
/// Balance below which a warning is issued
pub operator_warn_balance: u64,

#[arg(long, default_value_t = 1_000_000_000)]
/// Minimum balance for operator to become activated
pub operator_minimum_balance: u64,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -110,13 +121,33 @@ impl Bootstrap {
type Sender<T> = async_channel::Sender<T>;
type Receiver<T> = async_channel::Receiver<T>;

/// Coarse classification of an operator's balance relative to the pool's
/// configured warning and minimum thresholds.
#[derive(Debug, Clone, Copy)]
enum OperatorHealth {
    /// Balance is at or above the warning threshold.
    Good,
    /// Balance is below the warning threshold but not below the minimum.
    Warn,
    /// Balance is below the minimum threshold.
    Bad,
}

impl OperatorHealth {
    /// Reports whether this value is the [`Bad`] variant.
    ///
    /// [`Bad`]: OperatorHealth::Bad
    #[must_use]
    fn is_bad(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Bad => true,
            Self::Good | Self::Warn => false,
        }
    }
}

/// Pool of operators keyed by address, together with the channels and
/// balance thresholds used to hand operators out and to sideline those whose
/// balance is too low.
#[derive(Debug)]
pub struct OperatorPool {
/// Pool entries keyed by operator address.
map: DashMap<Address, PoolEntry>,
/// Both halves of the channel of operator addresses; addresses are sent
/// back onto the sender half when an operator is (re)activated.
queue: (Sender<Address>, Receiver<Address>),
/// Shared construction parameters; also provides the `solana_api` handle
/// used for balance lookups.
bootstrap: Bootstrap,
/// Sender half of the channel on which addresses of operators with a
/// `Bad` balance are reported; the receiver is owned by the background
/// `run` task, which re-checks them later.
deactivated: Sender<Address>,
/// Copied from `Config::operator_keypair_path`.
path: OsString,
/// Copied from `Config::operator_keypair_prefix`.
prefix: Option<OsString>,
/// Copied from `Config::operator_warn_balance`: balances below this are
/// classified as `Warn`.
// NOTE(review): units are presumably lamports — confirm.
warn_balance: u64,
/// Copied from `Config::operator_minimum_balance`: balances below this are
/// classified as `Bad`.
min_balance: u64,
}

impl OperatorPool {
Expand All @@ -127,6 +158,7 @@ impl OperatorPool {
solana_api: SolanaApi,
pg_pool: db::PgPool,
) -> Result<Arc<Self>> {
assert!(config.operator_warn_balance > config.operator_minimum_balance);
let operators = load_operators(
&config.operator_keypair_path,
config.operator_keypair_prefix.as_deref(),
Expand All @@ -141,6 +173,7 @@ impl OperatorPool {
max_holders: config.max_holders,
init_holders: config.init_holders,
};
let (deactivated, deactivated_rx) = async_channel::bounded(32);
let map = DashMap::new();
let queue = async_channel::unbounded();
let mut operator_order = Vec::new();
Expand All @@ -160,11 +193,14 @@ impl OperatorPool {
map,
queue,
bootstrap,
deactivated,
path: config.operator_keypair_path,
prefix: config.operator_keypair_prefix,
warn_balance: config.operator_warn_balance,
min_balance: config.operator_minimum_balance,
});

tokio::spawn(this.clone().reload_task());
tokio::spawn(this.clone().run(deactivated_rx));

Ok(this)
}
Expand Down Expand Up @@ -231,25 +267,86 @@ impl OperatorPool {
Ok(())
}

fn reload_task(self: Arc<Self>) -> impl Future<Output = ()> + Send + 'static {
/// Classifies `balance` against the pool's configured thresholds.
///
/// * below `min_balance` -> [`OperatorHealth::Bad`]
/// * otherwise, below `warn_balance` -> [`OperatorHealth::Warn`]
/// * otherwise -> [`OperatorHealth::Good`]
///
/// The minimum is checked first, so it wins if both comparisons hold.
fn balance_to_health(&self, balance: u64) -> OperatorHealth {
    if balance < self.min_balance {
        OperatorHealth::Bad
    } else if balance < self.warn_balance {
        OperatorHealth::Warn
    } else {
        OperatorHealth::Good
    }
}

async fn get_operator_balance(&self, addr: &Address, key: &Pubkey) -> u64 {
let balance = self
.bootstrap
.solana_api
.get_balance(key)
.await
.inspect_err(
|error| tracing::warn!(%key, ?error, "error while checking operator balance"),
)
.unwrap_or(0);
tracing::debug!(%addr, %key, balance, "checking operator balance");
balance
}

/// Attempts to put a previously deactivated operator back into rotation.
///
/// Returns `true` when the caller should stop re-checking `address`: either
/// the operator is no longer in the map, or its balance is no longer `Bad`
/// and its address has been sent back onto the queue. Returns `false` when
/// the balance is still `Bad`, meaning a later re-check is worthwhile.
///
/// # Panics
///
/// Panics if sending on the queue channel fails.
async fn try_reactivate(&self, address: Address) -> bool {
    // The lookup result is consumed within this statement, so nothing borrowed
    // from the map is held across the awaits below.
    let pubkey = self.map.get(&address).map(|entry| entry.operator.pubkey());
    let Some(pubkey) = pubkey else {
        tracing::debug!(%address, "operator will not be reactivated, not present in map");
        return true;
    };

    let balance = self.get_operator_balance(&address, &pubkey).await;
    if self.balance_to_health(balance).is_bad() {
        return false;
    }

    self.queue.0.send(address).await.expect("never closed");
    tracing::info!(%address, "reactivated operator");
    true
}

fn run(
self: Arc<Self>,
deactivate: Receiver<Address>,
) -> impl Future<Output = ()> + Send + 'static {
let this = Arc::downgrade(&self);
drop(self);
let mut interval = time::interval(RELOAD_INTERVAL);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
let mut deactivated = DelayQueue::new();

async move {
tracing::info!(
interval = %format_duration(RELOAD_INTERVAL),
"starting operator pool reload task"
);
loop {
interval.tick().await;
let Some(this) = this.upgrade() else {
tracing::info!("stopping operator pool reload task");
break;
};
if let Err(error) = this.reload().await {
tracing::error!(?error, "could not reload operator pool");
tokio::select! {
_ = interval.tick() => {
let Some(this) = this.upgrade() else {
tracing::info!("stopping operator pool reload task");
break;
};
if let Err(error) = this.reload().await {
tracing::error!(?error, "could not reload operator pool");
}
}
Ok(addr) = deactivate.recv() => {
deactivated.insert(addr, RECHECK_INTERVAL);
},
Some(addr) = deactivated.next(), if !deactivated.is_empty() => {
let addr = addr.into_inner();
let Some(this) = this.upgrade() else {
tracing::info!("stopping operator pool reload task");
break;
};
if !this.try_reactivate(addr).await {
deactivated.insert(addr, RECHECK_INTERVAL);
}
}
else => break,
}
}
}
Expand All @@ -268,6 +365,23 @@ impl executor::Execute for OperatorPool {
// This is for removed and disabled operators
continue;
};
let balance = self
.get_operator_balance(&address, &executor.pubkey())
.await;
match self.balance_to_health(balance) {
OperatorHealth::Bad => {
tracing::info!(%address, pubkey = %executor.pubkey(), balance, "deactivating operator");
self.deactivated
.send(address)
.await
.expect("cannot deactivate operator");
continue;
}
OperatorHealth::Warn => {
tracing::warn!(%address, pubkey = %executor.pubkey(), balance, "operator balance is low");
}
OperatorHealth::Good => (),
}
self.queue
.0
.send(address)
Expand Down

0 comments on commit f040a3f

Please sign in to comment.