Skip to content

Commit

Permalink
feat(operator-pool): add try_recover_stuck_holder
Browse files Browse the repository at this point in the history
  • Loading branch information
tukan committed Nov 20, 2024
1 parent 7789ddc commit c674dbd
Showing 1 changed file with 64 additions and 6 deletions.
70 changes: 64 additions & 6 deletions operator-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,22 @@ use dashmap::DashMap;
use futures_util::StreamExt;
use humantime::format_duration;
use reth_primitives::Address;
use solana_api::solana_api::SolanaApi;
use solana_cli_config::CONFIG_FILE;
use solana_sdk::account_info::IntoAccountInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use thiserror::Error;
use tokio::sync::oneshot;
use tokio::time;
use tokio_util::time::DelayQueue;

use executor::{Executor, HOLDER_SIZE};
use executor::{
parse_owner, Executor, HolderMeta, HolderState, RecoveredHolder, RecoveredStuckHolder,
HOLDER_SIZE,
};
use neon_api::NeonApi;
use operator::Operator;
use solana_api::solana_api::SolanaApi;

const RELOAD_INTERVAL: Duration = Duration::from_secs(60);
const RECHECK_INTERVAL: Duration = Duration::from_secs(20);
Expand Down Expand Up @@ -146,6 +150,7 @@ impl OperatorHealth {
#[derive(Debug)]
pub struct OperatorPool {
map: DashMap<Address, PoolEntry>,
pubkey_map: DashMap<Pubkey, Address>,
queue: (Sender<Address>, Receiver<Address>),
bootstrap: Bootstrap,
deactivated: Sender<Address>,
Expand Down Expand Up @@ -181,22 +186,26 @@ impl OperatorPool {
};
let (deactivated, deactivated_rx) = async_channel::bounded(32);
let map = DashMap::new();
let pubkey_map = DashMap::new();
let queue = async_channel::unbounded();
let mut operator_order = Vec::new();
for operator in operators {
let operator = Arc::new(operator);
let address = operator.address();
let pubkey = operator.pubkey();
let entry = bootstrap.start_executor(operator).await?;
tracing::info!(operator = ?entry.operator, "loaded operator");
operator_order.push(entry.operator.address());
map.insert(entry.operator.address(), entry);
pubkey_map.insert(pubkey, address.clone());
queue.0.send(address).await.expect("never closed");
}

tracing::info!(order = ?operator_order, "loaded operators");

let this = Arc::new(Self {
map,
pubkey_map,
queue,
bootstrap,
deactivated,
Expand Down Expand Up @@ -232,6 +241,7 @@ impl OperatorPool {
self.map.iter().map(|ref_| *ref_.key()).collect()
}

// TODO: update pubkey_map here as well
async fn reload(&self) -> Result<()> {
let mut to_add = load_operators(&self.path, self.prefix.as_deref())?;

Expand All @@ -246,14 +256,13 @@ impl OperatorPool {
tracing::debug!("operator set did not change, nothing to reload");
} else {
tracing::info!(?to_add, ?to_remove, "reloading operators");
let mut map = HashMap::new();
let mut operator_order = Vec::new();
for address in to_remove {
let Some((_, entry)) = self.map.remove(&address) else {
tracing::warn!("cannot remove operator, not found in map");
continue;
};
let operator = &entry.operator;
self.pubkey_map.remove(&operator.pubkey());
tracing::info!(?operator, "deactivating operator");
continue;
}
Expand All @@ -263,8 +272,9 @@ impl OperatorPool {
let address = operator.address();
let entry = self.bootstrap.start_executor(operator).await?;
tracing::info!(operator = ?entry.operator, "loaded operator");
operator_order.push(entry.operator.address());
map.insert(entry.operator.address(), entry);
self.pubkey_map
.insert(entry.operator.pubkey(), entry.operator.address());
self.map.insert(entry.operator.address(), entry);
self.queue.0.send(address).await.expect("never closed");
}

Expand Down Expand Up @@ -313,6 +323,54 @@ impl OperatorPool {
}
}

async fn try_recover_stuck_holder(
&self,
holder_pkey: Pubkey,
) -> anyhow::Result<Option<RecoveredStuckHolder>> {
let program_id = self.bootstrap.neon_pubkey;
let Some(mut account) = self.bootstrap.solana_api.get_account(&holder_pkey).await? else {
return Ok(None); // TODO:
};
let account_info = (&holder_pkey, &mut account).into_account_info();
let owner = parse_owner(&account_info)?;
let state = HolderState::try_from_account_info(&program_id, account_info)?;
let meta = HolderMeta::try_from_operator(&program_id, &owner, self.bootstrap.max_holders)?;
Ok(Some(RecoveredStuckHolder {
recovered: RecoveredHolder {
state,
meta,
recreate: false,
},
operator: owner,
}))
}

async fn continue_from_recovered_holder(
&self,
holder: RecoveredStuckHolder,
) -> anyhow::Result<()> {
if !matches!(holder.recovered.state, HolderState::Pending(_)) {
return Ok(()); // TODO:
};

let Some(operator_address) = self.pubkey_map.get(&holder.operator) else {
return Ok(()); // TODO:
};
let Some(executor) = self
.map
.get(operator_address.value())
.map(|ref_| ref_.executor.clone())
else {
return Ok(()); // TODO:
};

executor
.continue_from_recovered_holder(holder.recovered)
.await?;

Ok(())
}

fn run(
self: Arc<Self>,
deactivate: Receiver<Address>,
Expand Down

0 comments on commit c674dbd

Please sign in to comment.