Skip to content

Commit

Permalink
Timestamp and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikhail Sozin committed Oct 30, 2024
1 parent 2f34985 commit b48f0ba
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
43 changes: 23 additions & 20 deletions core/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub trait ProviderInterface: Send + Sync + Debug {

async fn get_block_number(&self) -> Result<U64, ProviderError>;

async fn get_logs(&self, filter: &RindexerEventFilter) -> Result<Vec<Log>, ProviderError>;
async fn get_logs(&self, filter: &RindexerEventFilter) -> Result<Vec<WrappedLog>, ProviderError>;

fn max_block_range(&self) -> Option<U64>;

Expand Down Expand Up @@ -190,6 +190,7 @@ impl ProviderInterface for HyperSyncProvider {

let all_log_fields: BTreeSet<String> =
hypersync_schema::log().fields.iter().map(|x| x.name.clone()).collect();
let block_fields = BTreeSet::from(["timestamp".to_string(), "number".to_string()]);

let mut query = match raw_filter.block_option {
FilterBlockOption::Range { from_block, to_block } => {
Expand All @@ -201,7 +202,7 @@ impl ProviderInterface for HyperSyncProvider {
Query {
from_block: from_block.as_u64(),
to_block: to_block.map(|n| n.as_u64() + 1),
field_selection: FieldSelection { log: all_log_fields, ..Default::default() },
field_selection: FieldSelection { log: all_log_fields, block: block_fields, ..Default::default() },
..Default::default()
}
}
Expand All @@ -212,8 +213,7 @@ impl ProviderInterface for HyperSyncProvider {
hash: vec![block_hash.into()],
..Default::default()
}],
field_selection: FieldSelection { log: all_log_fields, ..Default::default() },
join_mode: JoinMode::JoinAll,
field_selection: FieldSelection { log: all_log_fields, block: block_fields ,..Default::default() },
..Default::default()
},
};
Expand Down Expand Up @@ -246,7 +246,7 @@ impl ProviderInterface for HyperSyncProvider {
topics: hypersync_topics,
}];

query.join_mode = JoinMode::JoinNothing;
query.join_mode = JoinMode::JoinAll;

let resp = self
.provider
Expand All @@ -255,17 +255,24 @@ impl ProviderInterface for HyperSyncProvider {
.await
.map_err(|err| ProviderError::CustomError(err.to_string()))?;

Ok(resp
.data
.logs
.into_iter()
.flatten()
.filter_map(|log| {
let mut blocks = resp.data.blocks.into_iter().flatten();
let block = blocks.next();
if let None = block {
return Ok(vec![]);
}
let mut block = block.unwrap();
Ok(resp.data.logs.into_iter().flatten().filter_map(|log| {
if block.number.unwrap() != u64::from(log.block_number.unwrap()) {
block = blocks.next().unwrap();
};
if block.number.unwrap() != u64::from(log.block_number.unwrap()) {
let log = log.try_into().ok()?;
Some(WrappedLog{inner: log, block_timestamp: None})
})
.collect::<Vec<WrappedLog>>())

} else {
let log = log.try_into().ok()?;
Some(WrappedLog{inner: log, block_timestamp: block.timestamp.clone().and_then(|ts| ts.try_into().ok())})
}
}).collect::<Vec<WrappedLog>>())
}

fn max_block_range(&self) -> Option<U64> {
Expand Down Expand Up @@ -300,12 +307,8 @@ pub fn create_client(
RetryClientError::HttpProviderCantBeCreated(rpc_url.to_string(), e.to_string())
})?;
match kind {
ProviderType::Rpc => {
create_jsonrpc_client(url, compute_units_per_second, max_block_range, custom_headers)
.and_then(|client| Ok(client as Arc<dyn ProviderInterface>))
}
ProviderType::Hypersync => create_hypersync_client(url, max_block_range)
.and_then(|client| Ok(client as Arc<dyn ProviderInterface>)),
ProviderType::Rpc => create_jsonrpc_client(url, compute_units_per_second, max_block_range, custom_headers).map(|client| client as Arc<dyn ProviderInterface>),
ProviderType::Hypersync => create_hypersync_client(url, max_block_range).map(|client| client as Arc<dyn ProviderInterface>),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn create_shadow_client(
max_block_range,
header,
)
.and_then(|client| Ok(client as Arc<dyn ProviderInterface>))
.map(|client| client as Arc<dyn ProviderInterface>)
}

lazy_static! {
Expand Down

0 comments on commit b48f0ba

Please sign in to comment.