Skip to content

Commit

Permalink
fix: index and use of full logs
Browse files Browse the repository at this point in the history
  • Loading branch information
tukan authored and 00nktk committed Nov 13, 2024
1 parent 0233706 commit 2140825
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 50 additions & 1 deletion common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ mod tx_envelope_ext {
}
}

#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash)]
#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(into = "U256", from = "U256")]
pub struct TxHash([u8; 32]);

impl TxHash {
Expand Down Expand Up @@ -135,6 +136,19 @@ impl std::fmt::Debug for TxHash {
}
}

impl From<TxHash> for U256 {
fn from(tx_hash: TxHash) -> Self {
U256::from_be_bytes(tx_hash.0)
}
}

impl From<U256> for TxHash {
fn from(value: U256) -> Self {
let bytes: [u8; 32] = value.to_be_bytes();
TxHash(bytes)
}
}

/// Solana block info.
#[derive(Debug, Clone)]
pub struct SolanaBlock {
Expand Down Expand Up @@ -210,6 +224,21 @@ impl Default for SolTxCuInfo {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// [`EventLog`] with additional block and transaction data
pub struct RichLog {
pub blockhash: Hash,
pub slot: u64,
pub timestamp: i64,
pub tx_idx: u64,
pub tx_hash: TxHash,
pub sol_signature: Signature,
pub sol_ix_idx: u64,
pub sol_ix_inner_idx: Option<u64>,
#[serde(flatten)]
pub event: EventLog,
}

/// Neon Transaction. Not yet sure if represents a single contract invocation or a completed transaction.
/// Inserted into `neon_transactions` table. Lacks `Clone` due to `evm-loader` implementation.
#[derive(Debug)]
Expand All @@ -223,6 +252,7 @@ pub struct NeonTxInfo {
pub contract: Option<Address>,
pub transaction: Transaction, // TODO: Clone
pub events: Vec<EventLog>,
pub rich_logs: Vec<RichLog>,

pub gas_used: U256,
pub sum_gas_used: U256, // TODO: What is this?
Expand All @@ -248,6 +278,25 @@ pub struct NeonTxInfo {
pub sol_tx_cu_info: SolTxCuInfo,
}

impl NeonTxInfo {
pub fn generate_rich_logs(&self, blockhash: Hash) -> Vec<RichLog> {
self.events
.iter()
.map(|event| RichLog {
blockhash,
slot: self.sol_slot,
timestamp: 0,
tx_idx: self.tx_idx,
tx_hash: self.neon_signature,
sol_signature: self.sol_signature,
sol_ix_idx: self.sol_ix_idx,
sol_ix_inner_idx: self.sol_ix_inner_idx,
event: event.clone(),
})
.collect()
}
}

#[derive(Debug)]
pub struct CanceledNeonTxInfo {
pub neon_signature: TxHash,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ futures-util.workspace = true
num-bigint = "0.4"
num-traits = "0.2.19"
serde_json = "1.0.125"
serde = { version = "1.0.208", features = ["derive"] }
2 changes: 1 addition & 1 deletion db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use block::{BlockBy, BlockRepo};
pub use reliable_empty_slot::ReliableEmptySlotRepo;
pub use solana_neon_transactions::SolanaNeonTransactionRepo;
pub use sqlx::PgPool;
pub use transaction::{RichLog, RichLogBy, TransactionBy, TransactionRepo, WithBlockhash};
pub use transaction::{RichLogBy, TransactionBy, TransactionRepo, WithBlockhash};

pub async fn connect(url: &str) -> Result<PgPool, sqlx::Error> {
tracing::info!(%url, "connecting to database");
Expand Down
115 changes: 90 additions & 25 deletions db/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,19 @@ use common::evm_loader::types::vector::{VectorVecExt, VectorVecSlowExt};
use common::evm_loader::types::{AccessListTx, Address, LegacyTx, Transaction, TransactionPayload};
use common::solana_sdk::hash::Hash;
use common::solana_sdk::pubkey::Pubkey;
use common::solana_sdk::signature::Signature;
use common::types::{
CanceledNeonTxInfo, EventKind, EventLog, NeonTxInfo, TxHash, SOLANA_MAX_HEAP_SIZE,
CanceledNeonTxInfo, EventKind, EventLog, NeonTxInfo, RichLog, TxHash, SOLANA_MAX_HEAP_SIZE,
};

use crate::{u256_to_bytes, PgAddress, PgPubkey, PgSolanaBlockHash, PgU256};

use super::Error;
use crate::{u256_to_bytes, PgAddress, PgPubkey, PgSolanaBlockHash, PgU256};

#[derive(Debug, Clone)]
struct EventFilter<'a> {
address: &'a [Address],
topics: [&'a [Vec<u8>]; 4],
}

#[derive(Debug, Clone)]
/// [`EventLog`] with additional block and transaction data
pub struct RichLog {
pub blockhash: Hash,
pub slot: u64,
pub timestamp: i64,
pub tx_idx: u64,
pub tx_hash: TxHash,
pub sol_signature: Signature,
pub sol_ix_idx: u64,
pub sol_ix_inner_idx: Option<u64>,
pub event: EventLog,
}

#[derive(Debug, Clone, Copy)]
pub enum RichLogBy {
Hash([u8; 32]),
Expand Down Expand Up @@ -185,6 +169,7 @@ impl TransactionRepo {

pub async fn insert(
&self,
block_hash: Hash,
tx: &NeonTxInfo,
txn: &mut sqlx::Transaction<'_, Postgres>,
) -> Result<(), sqlx::Error> {
Expand Down Expand Up @@ -359,14 +344,32 @@ impl TransactionRepo {
.execute(&mut **txn)
.await?;

let logs_data = match serde_json::to_vec(&tx.events) {
Ok(logs_data) => logs_data,
Err(err) => {
tracing::warn!(?err, ?tx, "failed to serialize logs");
vec![]
let mut parsed_logs = match sqlx::query!(
r#"
SELECT logs
FROM neon_transactions
WHERE neon_sig = $1
"#,
tx_hash.as_slice()
)
.fetch_optional(&mut **txn)
.await?
{
None => Vec::new(),
Some(existing_logs_raw) => {
serde_json::from_slice::<Vec<RichLog>>(&existing_logs_raw.logs)
.inspect_err(|err| {
tracing::warn!(?err, ?tx, "failed to deserialize existing logs")
})
.unwrap_or_default()
}
};

parsed_logs.extend(tx.generate_rich_logs(block_hash));
let logs_data = serde_json::to_vec(&parsed_logs)
.inspect_err(|err| tracing::warn!(?err, ?tx, "failed to serialize logs"))
.unwrap_or_default();

sqlx::query!(
r#"
INSERT INTO neon_transactions
Expand Down Expand Up @@ -452,7 +455,54 @@ impl TransactionRepo {
self.fetch_with_events_inner(by, None, true)
}

/// Do not filters out incomplete transactions
pub async fn fetch_neon_tx_info(
&self,
tx_hash: TxHash,
) -> Result<Option<WithBlockhash<NeonTxInfo>>, Error> {
tracing::info!(%tx_hash, "fetching transactions with raw events");
let row = sqlx::query_as::<_, NeonTransactionRow>(
r#"
SELECT
neon_sig,
tx_type,
from_addr,
sol_sig,
sol_ix_idx,
sol_ix_inner_idx,
block_slot,
tx_idx,
nonce,
gas_price,
gas_limit,
value,
gas_used,
sum_gas_used,
to_addr,
contract,
status,
is_canceled,
is_completed,
v,
r,
s,
chain_id,
calldata,
logs,
neon_step_cnt,
NULL as block_hash
FROM neon_transactions T
WHERE neon_sig = $1
"#,
)
.bind(*tx_hash.as_array())
.fetch_optional(&self.pool)
.await?;

let tx_info = row.map(|r| r.neon_tx_info_with_default_cu()).transpose()?;
Ok(tx_info)
}

/// Does not filter out incomplete transactions
pub fn fetch_with_events_maybe_incomplete(
&self,
by: TransactionBy,
Expand Down Expand Up @@ -508,6 +558,7 @@ impl TransactionRepo {
v, r, s, chain_id,
calldata, neon_step_cnt,
B.block_hash,
NULL as logs,
L.address, L.tx_log_idx,
(row_number() OVER (
Expand Down Expand Up @@ -662,7 +713,7 @@ impl TransactionRepo {
}

#[derive(Debug, Clone, sqlx::FromRow)]
struct NeonTransactionRow {
pub struct NeonTransactionRow {
neon_sig: Vec<u8>,
tx_type: i32,
from_addr: PgAddress,
Expand All @@ -687,6 +738,8 @@ struct NeonTransactionRow {
is_canceled: bool,
is_completed: bool,

logs: Option<Vec<u8>>,

v: PgU256,
r: PgU256,
s: PgU256,
Expand Down Expand Up @@ -808,6 +861,17 @@ impl NeonTransactionRow {
})
}

fn parse_logs(&self) -> Vec<RichLog> {
serde_json::from_slice(self.logs.as_deref().unwrap_or_default()).unwrap_or_default()
}

fn neon_tx_info_with_default_cu(self) -> anyhow::Result<WithBlockhash<NeonTxInfo>> {
let rich_logs = self.parse_logs();
let mut info = self.neon_tx_info_with_empty_logs_and_default_cu()?;
info.inner.rich_logs = rich_logs;
Ok(info)
}

fn neon_tx_info_with_empty_logs_and_default_cu(
self,
) -> anyhow::Result<WithBlockhash<NeonTxInfo>> {
Expand All @@ -828,6 +892,7 @@ impl NeonTransactionRow {
sol_signer: Pubkey::default(),
transaction,
events: Vec::new(),
rich_logs: Vec::new(),
gas_used: U256::from(self.gas_used),
sum_gas_used: U256::from(self.sum_gas_used),
sol_signature: common::solana_sdk::signature::Signature::from(
Expand Down
2 changes: 1 addition & 1 deletion indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Indexer {
let slot = block.block.slot;
for tx in &block.txs {
self.tx_repo
.insert(tx, &mut txn)
.insert(block.block.hash, tx, &mut txn)
.await
.context("failed to save neon transaction")?;
metrics().neon_transactions_saved.inc();
Expand Down
1 change: 1 addition & 0 deletions parse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ fn add_log_and_meta(
contract,
transaction: tx,
events: log_info.event_list.clone(), // TODO
rich_logs: Vec::new(),
gas_used,
sum_gas_used: Default::default(), /* set later */
sol_signature: Signature::default(), // TODO: should be in input?
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use common::convert::ToNeon;
use common::evm_loader::types::{Address as NeonAddress, TransactionPayload};
use common::neon_lib::types::TxParams;
use common::solana_sdk::hash::Hash;
use common::types::{EventLog, NeonTxInfo, SolanaBlock};
use db::{RichLog, RichLogBy};
use common::types::{EventLog, NeonTxInfo, RichLog, SolanaBlock};
use db::RichLogBy;

use crate::rpc::NeonLog;

Expand Down
Loading

0 comments on commit 2140825

Please sign in to comment.