Skip to content

Commit

Permalink
feat: return Result from poll_next method for Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
fbozic committed Jun 18, 2024
1 parent a91d81d commit 411ff93
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 39 deletions.
60 changes: 33 additions & 27 deletions examples/chat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,16 @@ impl Node {

async fn handle_stream(&mut self, mut stream: network::stream::Stream) -> eyre::Result<()> {
let application_id = match stream.next().await {
Some(message) => match serde_json::from_slice(&message.data)? {
types::CatchupStreamMessage::Request(req) => {
types::ApplicationId::from(req.application_id)
}
message => {
eyre::bail!("Unexpected message: {:?}", message)
}
Some(message) => match message {
Ok(message) => match serde_json::from_slice(&message.data)? {
types::CatchupStreamMessage::Request(req) => {
types::ApplicationId::from(req.application_id)
}
message => {
eyre::bail!("Unexpected message: {:?}", message)
}
},
Err(err) => eyre::bail!(err),
},
None => {
eyre::bail!("Stream closed unexpectedly")
Expand Down Expand Up @@ -302,27 +305,30 @@ impl Node {
info!("Sent catchup request to peer: {:?}", choosen_peer);

while let Some(message) = stream.next().await {
match serde_json::from_slice(&message.data)? {
types::CatchupStreamMessage::Response(response) => {
for message in response.messages {
let text = String::from_utf8_lossy(&message.data);
println!(
"{LINE_START} Received cacthup message: {:?}, original from: {:?}",
text, message.source
);

self.store
.add_message(
types::ApplicationId::from(topic.clone().into_string()),
message,
)
.await;
match message {
Ok(message) => match serde_json::from_slice(&message.data)? {
types::CatchupStreamMessage::Response(response) => {
for message in response.messages {
let text = String::from_utf8_lossy(&message.data);
println!(
"{LINE_START} Received cacthup message: {:?}, original from: {:?}",
text, message.source
);

self.store
.add_message(
types::ApplicationId::from(topic.clone().into_string()),
message,
)
.await;
}
}
}
event => {
warn!(?event, "Unexpected event");
}
};
event => {
warn!(?event, "Unexpected event");
}
},
Err(err) => eyre::bail!(err),
}
}

info!("Closed stream to peer: {:?}", choosen_peer);
Expand Down
14 changes: 2 additions & 12 deletions examples/chat/src/network/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use libp2p::PeerId;
use tokio::io::BufStream;
use tokio_util::codec::Framed;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
use tracing::error;

use super::{types, EventLoop};

Expand All @@ -31,20 +30,11 @@ impl Stream {
}

impl FuturesStream for Stream {
type Item = Message;
type Item = Result<Message, CodecError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = Pin::new(&mut self.get_mut().inner);
match inner.poll_next(cx) {
Poll::Ready(Some(Ok(message))) => Poll::Ready(Some(message)),
Poll::Ready(Some(Err(err))) => {
error!(%err, "Error while polling the inner stream");
cx.waker().wake_by_ref();
Poll::Pending
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
inner.poll_next(cx)
}
}

Expand Down

0 comments on commit 411ff93

Please sign in to comment.