Skip to content

Commit

Permalink
properly index raw logs (rename)
Browse files Browse the repository at this point in the history
  • Loading branch information
tukan committed Nov 13, 2024
1 parent a31d3f2 commit d3789ec
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 58 additions & 1 deletion common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::fmt::{Display, Formatter};

use ethnum::U256;
use evm_loader::types::{Address, Transaction};
use serde::{Deserialize, Serialize};
use reth_primitives::B256;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use solana_sdk::clock::UnixTimestamp;
use solana_sdk::entrypoint::HEAP_LENGTH;
use solana_sdk::hash::Hash;
Expand Down Expand Up @@ -135,6 +136,27 @@ impl std::fmt::Debug for TxHash {
}
}

impl Serialize for TxHash {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let b256: B256 = B256::from(self.as_array());
b256.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for TxHash {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let b256 = B256::deserialize(deserializer)?;
let array = <[u8; 32]>::try_from(b256.as_ref()).map_err(serde::de::Error::custom)?;
Ok(TxHash(array))
}
}

/// Solana block info.
#[derive(Debug, Clone)]
pub struct SolanaBlock {
Expand Down Expand Up @@ -210,6 +232,21 @@ impl Default for SolTxCuInfo {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// [`EventLog`] with additional block and transaction data
pub struct RichLog {
pub blockhash: Hash,
pub slot: u64,
pub timestamp: i64,
pub tx_idx: u64,
pub tx_hash: TxHash,
pub sol_signature: Signature,
pub sol_ix_idx: u64,
pub sol_ix_inner_idx: Option<u64>,
#[serde(flatten)]
pub event: EventLog,
}

/// Neon Transaction. Not yet sure if represents a single contract invocation or a completed transaction.
/// Inserted into `neon_transactions` table. Lacks `Clone` due to `evm-loader` implementation.
#[derive(Debug)]
Expand All @@ -223,6 +260,7 @@ pub struct NeonTxInfo {
pub contract: Option<Address>,
pub transaction: Transaction, // TODO: Clone
pub events: Vec<EventLog>,
pub rich_logs: Vec<RichLog>,

pub gas_used: U256,
pub sum_gas_used: U256, // TODO: What is this?
Expand All @@ -248,6 +286,25 @@ pub struct NeonTxInfo {
pub sol_tx_cu_info: SolTxCuInfo,
}

impl NeonTxInfo {
pub fn generate_rich_logs(&self, blockhash: Hash) -> Vec<RichLog> {
self.events
.iter()
.map(|event| RichLog {
blockhash,
slot: self.sol_slot,
timestamp: 0,
tx_idx: self.tx_idx,
tx_hash: self.neon_signature,
sol_signature: self.sol_signature,
sol_ix_idx: self.sol_ix_idx,
sol_ix_inner_idx: self.sol_ix_inner_idx,
event: event.clone(),
})
.collect()
}
}

#[derive(Debug)]
pub struct CanceledNeonTxInfo {
pub neon_signature: TxHash,
Expand Down
1 change: 1 addition & 0 deletions db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ futures-util.workspace = true
num-bigint = "0.4"
num-traits = "0.2.19"
serde_json = "1.0.125"
serde = { version = "1.0.208", features = ["derive"] }
2 changes: 1 addition & 1 deletion db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use block::{BlockBy, BlockRepo};
pub use reliable_empty_slot::ReliableEmptySlotRepo;
pub use solana_neon_transactions::SolanaNeonTransactionRepo;
pub use sqlx::PgPool;
pub use transaction::{RichLog, RichLogBy, TransactionBy, TransactionRepo, WithBlockhash};
pub use transaction::{RichLogBy, TransactionBy, TransactionRepo, WithBlockhash};

pub async fn connect(url: &str) -> Result<PgPool, sqlx::Error> {
tracing::info!(%url, "connecting to database");
Expand Down
120 changes: 95 additions & 25 deletions db/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,19 @@ use common::evm_loader::types::vector::{VectorVecExt, VectorVecSlowExt};
use common::evm_loader::types::{AccessListTx, Address, LegacyTx, Transaction, TransactionPayload};
use common::solana_sdk::hash::Hash;
use common::solana_sdk::pubkey::Pubkey;
use common::solana_sdk::signature::Signature;
use common::types::{
CanceledNeonTxInfo, EventKind, EventLog, NeonTxInfo, TxHash, SOLANA_MAX_HEAP_SIZE,
CanceledNeonTxInfo, EventKind, EventLog, NeonTxInfo, RichLog, TxHash, SOLANA_MAX_HEAP_SIZE,
};

use crate::{u256_to_bytes, PgAddress, PgPubkey, PgSolanaBlockHash, PgU256};

use super::Error;
use crate::{u256_to_bytes, PgAddress, PgPubkey, PgSolanaBlockHash, PgU256};

#[derive(Debug, Clone)]
struct EventFilter<'a> {
address: &'a [Address],
topics: [&'a [Vec<u8>]; 4],
}

#[derive(Debug, Clone)]
/// [`EventLog`] with additional block and transaction data
pub struct RichLog {
pub blockhash: Hash,
pub slot: u64,
pub timestamp: i64,
pub tx_idx: u64,
pub tx_hash: TxHash,
pub sol_signature: Signature,
pub sol_ix_idx: u64,
pub sol_ix_inner_idx: Option<u64>,
pub event: EventLog,
}

#[derive(Debug, Clone, Copy)]
pub enum RichLogBy {
Hash([u8; 32]),
Expand Down Expand Up @@ -185,6 +169,7 @@ impl TransactionRepo {

pub async fn insert(
&self,
block_hash: Hash,
tx: &NeonTxInfo,
txn: &mut sqlx::Transaction<'_, Postgres>,
) -> Result<(), sqlx::Error> {
Expand Down Expand Up @@ -359,11 +344,36 @@ impl TransactionRepo {
.execute(&mut **txn)
.await?;

let logs_data = match serde_json::to_vec(&tx.events) {
Ok(logs_data) => logs_data,
Err(err) => {
tracing::warn!(?err, ?tx, "failed to serialize logs");
vec![]
let logs_data = match sqlx::query!(
r#"
SELECT logs
FROM neon_transactions
WHERE neon_sig = $1
"#,
tx_hash.as_slice()
)
.fetch_optional(&mut **txn)
.await?
{
None => Vec::new(),
Some(existing_logs_raw) => {
let mut full_logs =
match serde_json::from_slice::<Vec<RichLog>>(&existing_logs_raw.logs) {
Ok(existing_logs) => existing_logs,
Err(err) => {
tracing::warn!(?err, ?tx, "failed to deserialize existing logs");
vec![]
}
};

full_logs.extend(tx.generate_rich_logs(block_hash));
match serde_json::to_vec(&full_logs) {
Ok(logs_data) => logs_data,
Err(err) => {
tracing::warn!(?err, ?tx, "failed to serialize logs");
vec![]
}
}
}
};

Expand Down Expand Up @@ -452,7 +462,53 @@ impl TransactionRepo {
self.fetch_with_events_inner(by, None, true)
}

/// Do not filters out incomplete transactions
pub async fn fetch_neon_tx_info(
&self,
tx_hash: TxHash,
) -> Result<Option<WithBlockhash<NeonTxInfo>>, Error> {
tracing::info!(%tx_hash, "fetching transactions with raw events");
let row = sqlx::query_as::<_, NeonTransactionRow>(
r#"
SELECT
neon_sig,
tx_type,
from_addr,
sol_sig,
sol_ix_idx,
sol_ix_inner_idx,
block_slot,
tx_idx,
nonce,
gas_price,
gas_limit,
value,
gas_used,
sum_gas_used,
to_addr,
contract,
status,
is_canceled,
is_completed,
v,
r,
s,
chain_id,
calldata,
logs,
neon_step_cnt
FROM neon_transactions T
WHERE neon_sig = $1
"#,
)
.bind(*tx_hash.as_array())
.fetch_optional(&self.pool)
.await?;

let tx_info = row.map(|r| r.neon_tx_info_with_default_cu()).transpose()?;
Ok(tx_info)
}

/// Does not filter out incomplete transactions
pub fn fetch_with_events_maybe_incomplete(
&self,
by: TransactionBy,
Expand Down Expand Up @@ -662,7 +718,7 @@ impl TransactionRepo {
}

#[derive(Debug, Clone, sqlx::FromRow)]
struct NeonTransactionRow {
pub struct NeonTransactionRow {
neon_sig: Vec<u8>,
tx_type: i32,
from_addr: PgAddress,
Expand All @@ -687,6 +743,8 @@ struct NeonTransactionRow {
is_canceled: bool,
is_completed: bool,

logs: Option<String>,

v: PgU256,
r: PgU256,
s: PgU256,
Expand Down Expand Up @@ -808,6 +866,17 @@ impl NeonTransactionRow {
})
}

fn parse_logs(&self) -> Vec<RichLog> {
serde_json::from_str(self.logs.as_deref().unwrap_or_default()).unwrap_or_default()
}

fn neon_tx_info_with_default_cu(self) -> anyhow::Result<WithBlockhash<NeonTxInfo>> {
let rich_logs = self.parse_logs();
let mut info = self.neon_tx_info_with_empty_logs_and_default_cu()?;
info.inner.rich_logs = rich_logs;
Ok(info)
}

fn neon_tx_info_with_empty_logs_and_default_cu(
self,
) -> anyhow::Result<WithBlockhash<NeonTxInfo>> {
Expand All @@ -828,6 +897,7 @@ impl NeonTransactionRow {
sol_signer: Pubkey::default(),
transaction,
events: Vec::new(),
rich_logs: Vec::new(),
gas_used: U256::from(self.gas_used),
sum_gas_used: U256::from(self.sum_gas_used),
sol_signature: common::solana_sdk::signature::Signature::from(
Expand Down
2 changes: 1 addition & 1 deletion indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Indexer {
let slot = block.block.slot;
for tx in &block.txs {
self.tx_repo
.insert(tx, &mut txn)
.insert(block.block.hash, tx, &mut txn)
.await
.context("failed to save neon transaction")?;
metrics().neon_transactions_saved.inc();
Expand Down
1 change: 1 addition & 0 deletions parse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ fn add_log_and_meta(
contract,
transaction: tx,
events: log_info.event_list.clone(), // TODO
rich_logs: Vec::new(),
gas_used,
sum_gas_used: Default::default(), /* set later */
sol_signature: Signature::default(), // TODO: should be in input?
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use common::convert::ToNeon;
use common::evm_loader::types::{Address as NeonAddress, TransactionPayload};
use common::neon_lib::types::TxParams;
use common::solana_sdk::hash::Hash;
use common::types::{EventLog, NeonTxInfo, SolanaBlock};
use db::{RichLog, RichLogBy};
use common::types::{EventLog, NeonTxInfo, RichLog, SolanaBlock};
use db::RichLogBy;

use crate::rpc::NeonLog;

Expand Down
26 changes: 16 additions & 10 deletions proxy/src/rpc/neon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use common::types::EventKind;
use mempool::GasPriceModel;

use crate::convert::{
neon_to_eth, neon_to_eth_receipt, to_neon_receipt_v2, NeonTransactionReceiptV2,
convert_rich_log, neon_to_eth, neon_to_eth_receipt, to_neon_receipt_v2,
NeonTransactionReceiptV2,
};
use crate::error::{internal_error, Error};
use crate::rpc::EthApiImpl;
Expand Down Expand Up @@ -509,8 +510,10 @@ impl NeonCustomApiServer for EthApiImpl {
tracing::info!(%hash, ?detail, "transaction_receipt"); // TODO: downgrade to debug

let Some(tx_info) = self
.get_transaction(db::TransactionBy::Hash(hash.0.into()))
.await?
.transactions
.fetch_neon_tx_info(hash.0.into())
.await
.map_err(Error::from)?
else {
return Ok(None);
};
Expand Down Expand Up @@ -568,16 +571,19 @@ impl NeonCustomApiServer for EthApiImpl {
.map_err(|_| internal_error("failed to parse tx signature"))?,
);

let sol_ix_logs = logs
let sol_ix_logs = tx_info
.inner
.rich_logs
.iter()
.filter(|log| {
log.solana_transaction_signature == sol_tx_ix_sig
&& log.solana_instruction_index == sol_tx_ix.transaction.idx as u64
&& log.solana_inner_instruction_index
.filter(|rich_log| {
rich_log.sol_signature == sol_tx_ix_sig
&& rich_log.sol_ix_idx == sol_tx_ix.transaction.idx as u64
&& rich_log.sol_ix_inner_idx
== sol_tx_ix.transaction.inner_idx.map(|v| v as u64)
})
.cloned()
.collect::<Vec<_>>();
.map(|rich_log| convert_rich_log(rich_log.clone()))
.collect::<Result<Vec<_>, _>>()
.map_err(Error::from)?;

tracing::info!(?sol_ix_logs, total_logs = logs.len(), "instruction logs"); // TODO: remove

Expand Down

0 comments on commit d3789ec

Please sign in to comment.