Skip to content

Commit

Permalink
Logs, log levels (#797)
Browse files Browse the repository at this point in the history
* more logs, log levels
  • Loading branch information
volovyks authored Aug 8, 2024
1 parent d37cd5f commit 805c9be
Show file tree
Hide file tree
Showing 16 changed files with 230 additions and 88 deletions.
44 changes: 36 additions & 8 deletions chain-signatures/node/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl SecretManagerService {
data: Some(data), ..
}) if data.len() > 1 => Ok(Some(data)),
_ => {
tracing::info!("failed to load existing key share, presuming it is missing");
tracing::error!("failed to load existing key share, presuming it is missing");
Ok(None)
}
}
Expand All @@ -70,7 +70,11 @@ impl SecretManagerService {
&format!("projects/{}/secrets/{}", self.project_id, name.as_ref()),
)
.doit()
.await?;
.await
.map_err(|e| {
tracing::error!(%e, "failed to store secret");
e
})?;
Ok(())
}
}
Expand Down Expand Up @@ -122,7 +126,11 @@ impl DatastoreService {
.projects()
.lookup(request, &self.project_id)
.doit()
.await?;
.await
.map_err(|e| {
tracing::error!(%e, "failed to lookup entity in data store");
e
})?;
match response
.found
.and_then(|mut results| results.pop())
Expand Down Expand Up @@ -166,7 +174,11 @@ impl DatastoreService {
.projects()
.commit(request, &self.project_id)
.doit()
.await?;
.await
.map_err(|e| {
tracing::error!(%e, "failed to insert entity to data store");
e
})?;
Ok(())
}

Expand Down Expand Up @@ -203,7 +215,11 @@ impl DatastoreService {
.projects()
.commit(request, &self.project_id)
.doit()
.await?;
.await
.map_err(|e| {
tracing::error!(%e, "failed to update entity in data store");
e
})?;

Ok(())
}
Expand Down Expand Up @@ -241,7 +257,11 @@ impl DatastoreService {
.projects()
.commit(request, &self.project_id)
.doit()
.await?;
.await
.map_err(|e| {
tracing::error!(%e, "failed to upsert entity in data store");
e
})?;

Ok(())
}
Expand Down Expand Up @@ -273,7 +293,11 @@ impl DatastoreService {
.projects()
.run_query(req, &self.project_id)
.doit()
.await?;
.await
.map_err(|e| {
tracing::error!(%e, "failed to fetch entities from data store");
e
})?;
let batch = query_resp.batch.ok_or_else(|| {
DatastoreStorageError::FetchEntitiesError(
"Could not retrieve batch while fetching entities".to_string(),
Expand Down Expand Up @@ -322,7 +346,11 @@ impl DatastoreService {
.projects()
.commit(request, &self.project_id)
.doit()
.await?;
.await
.map_err(|e| {
tracing::error!(%e, "failed to delete entities in data store");
e
})?;
Ok(())
}
}
Expand Down
5 changes: 4 additions & 1 deletion chain-signatures/node/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn send_encrypted<U: IntoUrl>(
if status.is_success() {
Ok(())
} else {
tracing::error!(
tracing::warn!(
"failed to send a message to {} with code {}: {}",
url,
status,
Expand Down Expand Up @@ -186,6 +186,9 @@ impl MessageQueue {

// Add back the failed attempts for next time.
self.deque = failed;
if !errors.is_empty() {
tracing::warn!("got errors when sending encrypted messages: {errors:?}");
}
errors
}
}
Expand Down
31 changes: 25 additions & 6 deletions chain-signatures/node/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ pub struct Indexer {

impl Indexer {
fn new(latest_block_height: LatestBlockHeight, options: &Options) -> Self {
tracing::info!(
"creating new indexer, latest block height: {}",
latest_block_height.block_height
);
Self {
latest_block_height: Arc::new(RwLock::new(latest_block_height)),
last_updated_timestamp: Arc::new(RwLock::new(Instant::now())),
Expand Down Expand Up @@ -143,6 +147,7 @@ impl Indexer {
block_height: BlockHeight,
gcp: &GcpService,
) -> Result<(), DatastoreStorageError> {
tracing::trace!(block_height, "update_block_height");
*self.last_updated_timestamp.write().await = Instant::now();
self.latest_block_height
.write()
Expand All @@ -166,9 +171,11 @@ async fn handle_block(
mut block: near_lake_primitives::block::Block,
ctx: &Context,
) -> anyhow::Result<()> {
tracing::trace!(block_height = block.block_height(), "handle_block");
let mut pending_requests = Vec::new();
for action in block.actions().cloned().collect::<Vec<_>>() {
if action.receiver_id() == ctx.mpc_contract_id {
tracing::trace!("got action targeting {}", ctx.mpc_contract_id);
let Some(receipt) = block.receipt_by_id(&action.receipt_id()) else {
let err = format!(
"indexer unable to find block for receipt_id={}",
Expand All @@ -184,6 +191,7 @@ async fn handle_block(
continue;
};
if function_call.method_name() == "sign" {
tracing::trace!("found `sign` function call");
let arguments =
match serde_json::from_slice::<'_, SignArguments>(function_call.args()) {
Ok(arguments) => arguments,
Expand All @@ -206,10 +214,13 @@ async fn handle_block(
continue;
};

let Ok(entropy) = serde_json::from_str::<'_, [u8; 32]>(&receipt.logs()[1]) else {
let entropy_log_index = 1;
let Ok(entropy) =
serde_json::from_str::<'_, [u8; 32]>(&receipt.logs()[entropy_log_index])
else {
tracing::warn!(
"`sign` did not produce entropy correctly: {:?}",
receipt.logs()[0]
receipt.logs()[entropy_log_index]
);
continue;
};
Expand Down Expand Up @@ -261,9 +272,15 @@ async fn handle_block(
}
drop(queue);

if block.block_height() % 1000 == 0 {
tracing::info!(block_height = block.block_height(), "indexed block");
let log_indexing_interval = 1000;
if block.block_height() % log_indexing_interval == 0 {
tracing::info!(
"indexed another {} blocks, latest: {}",
log_indexing_interval,
block.block_height()
);
}

Ok(())
}

Expand Down Expand Up @@ -316,14 +333,14 @@ pub fn run(
let mut i = 0;
loop {
if i > 0 {
tracing::info!("restarting indexer after failure: restart count={i}");
tracing::warn!("restarting indexer after failure: restart count={i}");
}
i += 1;

let Ok(lake) = rt.block_on(async {
let latest = context.indexer.latest_block_height().await;
if i > 0 {
tracing::info!("indexer latest height {latest}, restart count={i}");
tracing::warn!("indexer latest height {latest}, restart count={i}");
}
let mut lake_builder = LakeBuilder::default()
.s3_bucket_name(&options.s3_bucket)
Expand Down Expand Up @@ -355,6 +372,7 @@ pub fn run(
let outcome = rt.block_on(async {
if i > 0 {
// give it some time to catch up
tracing::trace!("giving indexer some time to catch up");
backoff(i, 10, 300);
}
// while running, we will keep the task spinning, and check every so often if
Expand All @@ -368,6 +386,7 @@ pub fn run(

// Abort the indexer task if it's still running.
if !join_handle.is_finished() {
tracing::trace!("aborting indexer task");
join_handle.abort();
}

Expand Down
4 changes: 4 additions & 0 deletions chain-signatures/node/src/mesh/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl Pool {

async fn set_potential_participants(&self, participants: &Participants) {
*self.potential_connections.write().await = participants.clone();
tracing::debug!(
"Pool set potential participants to {:?}",
self.potential_connections.read().await.keys_vec()
);
}

pub async fn potential_participants(&self) -> Participants {
Expand Down
4 changes: 2 additions & 2 deletions chain-signatures/node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ impl Mesh {
self.ping().await;

tracing::debug!(
active = ?self.active_participants.keys_vec(),
active_potential = ?self.active_potential_participants.keys_vec(),
active = ?self.active_participants.account_ids(),
active_potential = ?self.active_potential_participants.account_ids(),
"mesh pinging",
);
}
Expand Down
16 changes: 2 additions & 14 deletions chain-signatures/node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,7 @@ impl ConsensusProtocol for WaitingForConsensusState {
&public_key,
)
.await
.map_err(|err| {
tracing::error!(
?public_key,
?err,
"failed to vote for the generated public key"
);
ConsensusError::CannotVote(format!("{err:?}"))
})?;
.map_err(|err| ConsensusError::CannotVote(format!("{err:?}")))?;
}
Ok(NodeState::WaitingForConsensus(self))
}
Expand Down Expand Up @@ -454,11 +447,6 @@ impl ConsensusProtocol for WaitingForConsensusState {
)
.await
.map_err(|err| {
tracing::error!(
epoch = self.epoch,
?err,
"failed to vote for resharing"
);
ConsensusError::CannotVote(format!("{err:?}"))
})?;
} else {
Expand Down Expand Up @@ -504,7 +492,7 @@ impl ConsensusProtocol for RunningState {
}
Ordering::Less => Err(ConsensusError::EpochRollback),
Ordering::Equal => {
tracing::debug!("running(running): continuing to run as normal");
tracing::trace!("running(running): continuing to run as normal");
if contract_state.participants != self.participants {
return Err(ConsensusError::MismatchedParticipants);
}
Expand Down
1 change: 1 addition & 0 deletions chain-signatures/node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ impl CryptographicProtocol for RunningState {
// block height is up to date, such that they too can process signature requests. If they cannot
// then they are considered unstable and should not be a part of signature generation this round.
let stable = ctx.mesh().stable_participants().await;
tracing::info!(?stable, "stable participants");

let mut sign_queue = self.sign_queue.write().await;
crate::metrics::SIGN_QUEUE_SIZE
Expand Down
11 changes: 9 additions & 2 deletions chain-signatures/node/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl MessageHandler for ResharingState {
_ctx: C,
queue: &mut MpcMessageQueue,
) -> Result<(), MessageHandleError> {
tracing::debug!("handling {} resharing messages", queue.resharing_bins.len());
let q = queue.resharing_bins.entry(self.old_epoch).or_default();
let mut protocol = self.protocol.write().await;
while let Some(msg) = q.pop_front() {
Expand Down Expand Up @@ -525,7 +526,10 @@ where
let msg = serde_json::to_vec(&msg)?;
let ciphered = cipher_pk
.encrypt(&msg, SignedMessage::<T>::ASSOCIATED_DATA)
.map_err(|e| CryptographicError::Encryption(e.to_string()))?;
.map_err(|e| {
tracing::error!(error = ?e, "failed to encrypt message");
CryptographicError::Encryption(e.to_string())
})?;
Ok(ciphered)
}
}
Expand All @@ -541,7 +545,10 @@ where
) -> Result<T, CryptographicError> {
let message = cipher_sk
.decrypt(&encrypted, SignedMessage::<T>::ASSOCIATED_DATA)
.map_err(|err| CryptographicError::Encryption(err.to_string()))?;
.map_err(|err| {
tracing::error!(error = ?err, "failed to decrypt message");
CryptographicError::Encryption(err.to_string())
})?;
let SignedMessage::<Vec<u8>> { msg, sig, from } = serde_json::from_slice(&message)?;
if !sig.verify(
&msg,
Expand Down
24 changes: 17 additions & 7 deletions chain-signatures/node/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,21 @@ impl MpcSignProtocol {
triple_storage: LockTripleNodeStorageBox,
cfg: Config,
) -> (Self, Arc<RwLock<NodeState>>) {
let my_address = my_address.into_url().unwrap();
let rpc_url = rpc_client.rpc_addr();
let signer_account_id: AccountId = signer.clone().account_id;
tracing::info!(
?my_address,
?mpc_contract_id,
?account_id,
?rpc_url,
?signer_account_id,
?cfg,
"initializing protocol with parameters"
);
let state = Arc::new(RwLock::new(NodeState::Starting));
let ctx = Ctx {
my_address: my_address.into_url().unwrap(),
my_address,
account_id,
mpc_contract_id,
rpc_client,
Expand Down Expand Up @@ -209,12 +221,12 @@ impl MpcSignProtocol {
.fetch_inplace(&self.ctx.rpc_client, &self.ctx.mpc_contract_id)
.await
{
tracing::warn!("could not fetch contract's config on startup: {err:?}");
tracing::error!("could not fetch contract's config on startup: {err:?}");
}

loop {
let protocol_time = Instant::now();
tracing::debug!("trying to advance chain signatures protocol");
tracing::trace!("trying to advance chain signatures protocol");
loop {
let msg_result = self.receiver.try_recv();
match msg_result {
Expand All @@ -241,13 +253,11 @@ impl MpcSignProtocol {
.await
{
Ok(contract_state) => contract_state,
Err(e) => {
tracing::error!("could not fetch contract's state: {e}");
Err(_) => {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
tracing::debug!(?contract_state);

// Establish the participants for this current iteration of the protocol loop. This will
// set which participants are currently active in the protocol and determines who will be
Expand Down Expand Up @@ -286,7 +296,7 @@ impl MpcSignProtocol {
let crypto_time = Instant::now();
let mut state = match state.progress(&mut self).await {
Ok(state) => {
tracing::debug!("progress ok: {state}");
tracing::trace!("progress ok: {state}");
state
}
Err(err) => {
Expand Down
Loading

0 comments on commit 805c9be

Please sign in to comment.