From b48f0ba0e81469d428d9a2c809e24fc2224c1d24 Mon Sep 17 00:00:00 2001 From: Mikhail Sozin Date: Wed, 30 Oct 2024 04:38:34 +0100 Subject: [PATCH] Timestamp and cleanup --- core/src/provider.rs | 43 ++++++++++--------- .../src/rindexer_lib/typings/networks.rs | 2 +- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/core/src/provider.rs b/core/src/provider.rs index 0eaec67f..6d1c6f5d 100644 --- a/core/src/provider.rs +++ b/core/src/provider.rs @@ -42,7 +42,7 @@ pub trait ProviderInterface: Send + Sync + Debug { async fn get_block_number(&self) -> Result; - async fn get_logs(&self, filter: &RindexerEventFilter) -> Result, ProviderError>; + async fn get_logs(&self, filter: &RindexerEventFilter) -> Result, ProviderError>; fn max_block_range(&self) -> Option; @@ -190,6 +190,7 @@ impl ProviderInterface for HyperSyncProvider { let all_log_fields: BTreeSet = hypersync_schema::log().fields.iter().map(|x| x.name.clone()).collect(); + let block_fields = BTreeSet::from(["timestamp".to_string(), "number".to_string()]); let mut query = match raw_filter.block_option { FilterBlockOption::Range { from_block, to_block } => { @@ -201,7 +202,7 @@ impl ProviderInterface for HyperSyncProvider { Query { from_block: from_block.as_u64(), to_block: to_block.map(|n| n.as_u64() + 1), - field_selection: FieldSelection { log: all_log_fields, ..Default::default() }, + field_selection: FieldSelection { log: all_log_fields, block: block_fields, ..Default::default() }, ..Default::default() } } @@ -212,8 +213,7 @@ impl ProviderInterface for HyperSyncProvider { hash: vec![block_hash.into()], ..Default::default() }], - field_selection: FieldSelection { log: all_log_fields, ..Default::default() }, - join_mode: JoinMode::JoinAll, + field_selection: FieldSelection { log: all_log_fields, block: block_fields ,..Default::default() }, ..Default::default() }, }; @@ -246,7 +246,7 @@ impl ProviderInterface for HyperSyncProvider { topics: hypersync_topics, }]; - query.join_mode = JoinMode::JoinNothing; + query.join_mode = JoinMode::JoinAll; let resp = self .provider @@ -255,17 +255,24 @@ impl ProviderInterface for HyperSyncProvider { .await .map_err(|err| ProviderError::CustomError(err.to_string()))?; - Ok(resp - .data - .logs - .into_iter() - .flatten() - .filter_map(|log| { + let mut blocks = resp.data.blocks.into_iter().flatten(); + let block = blocks.next(); + if let None = block { + return Ok(vec![]); + } + let mut block = block.unwrap(); + Ok(resp.data.logs.into_iter().flatten().filter_map(|log| { + if block.number.unwrap() != u64::from(log.block_number.unwrap()) { + block = blocks.next().unwrap(); + }; + if block.number.unwrap() != u64::from(log.block_number.unwrap()) { let log = log.try_into().ok()?; Some(WrappedLog{inner: log, block_timestamp: None}) - }) - .collect::>()) - + } else { + let log = log.try_into().ok()?; + Some(WrappedLog{inner: log, block_timestamp: block.timestamp.clone().and_then(|ts| ts.try_into().ok())}) + } + }).collect::>()) } fn max_block_range(&self) -> Option { @@ -300,12 +307,8 @@ pub fn create_client( RetryClientError::HttpProviderCantBeCreated(rpc_url.to_string(), e.to_string()) })?; match kind { - ProviderType::Rpc => { - create_jsonrpc_client(url, compute_units_per_second, max_block_range, custom_headers) - .and_then(|client| Ok(client as Arc)) - } - ProviderType::Hypersync => create_hypersync_client(url, max_block_range) - .and_then(|client| Ok(client as Arc)), + ProviderType::Rpc => create_jsonrpc_client(url, compute_units_per_second, max_block_range, custom_headers).map(|client| client as Arc), + ProviderType::Hypersync => create_hypersync_client(url, max_block_range).map(|client| client as Arc), } } diff --git a/rindexer_rust_playground/src/rindexer_lib/typings/networks.rs b/rindexer_rust_playground/src/rindexer_lib/typings/networks.rs index 2d121448..e876f334 100644 --- a/rindexer_rust_playground/src/rindexer_lib/typings/networks.rs +++ b/rindexer_rust_playground/src/rindexer_lib/typings/networks.rs @@ -29,7 +29,7 @@ fn create_shadow_client( max_block_range, header, ) - .and_then(|client| Ok(client as Arc)) + .map(|client| client as Arc) } lazy_static! {