diff --git a/examples/firehose/Cargo.toml b/examples/firehose/Cargo.toml index 70bab61d..b51f470f 100644 --- a/examples/firehose/Cargo.toml +++ b/examples/firehose/Cargo.toml @@ -7,12 +7,16 @@ edition = "2021" [dependencies] anyhow = "1.0.80" -atrium-api = { version = "0.18.1", features = ["dag-cbor"] } +atrium-api = { version = "0.24.8" } chrono = "0.4.34" +cid_old = { package = "cid", version = "0.10.1" } +cid = { package = "cid", version = "0.11.1" } futures = "0.3.30" ipld-core = { version = "0.4.0", default-features = false, features = ["std"] } rs-car = "0.4.1" -serde_ipld_dagcbor = { version = "0.6.0", default-features = false, features = ["std"] } +serde_ipld_dagcbor = { version = "0.6.0", default-features = false, features = [ + "std", +] } tokio = { version = "1.36.0", features = ["full"] } tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] } trait-variant = "0.1.1" diff --git a/examples/firehose/src/cid_compat.rs b/examples/firehose/src/cid_compat.rs new file mode 100644 index 00000000..1e1be3f4 --- /dev/null +++ b/examples/firehose/src/cid_compat.rs @@ -0,0 +1,24 @@ +use cid::{multihash::Multihash, Cid}; + +pub struct CidOld(cid_old::Cid); + +impl From for CidOld { + fn from(value: cid_old::Cid) -> Self { + Self(value) + } +} +impl TryFrom for Cid { + type Error = cid::Error; + fn try_from(value: CidOld) -> Result { + let version = match value.0.version() { + cid_old::Version::V0 => cid::Version::V0, + cid_old::Version::V1 => cid::Version::V1, + }; + + let codec = value.0.codec(); + let hash = value.0.hash(); + let hash = Multihash::from_bytes(&hash.to_bytes())?; + + Self::new(version, codec, hash) + } +} diff --git a/examples/firehose/src/lib.rs b/examples/firehose/src/lib.rs index b4e04262..420c55d2 100644 --- a/examples/firehose/src/lib.rs +++ b/examples/firehose/src/lib.rs @@ -1,2 +1,3 @@ +pub mod cid_compat; pub mod stream; pub mod subscription; diff --git a/examples/firehose/src/main.rs b/examples/firehose/src/main.rs index e70a237e..558c8d34 100644 --- a/examples/firehose/src/main.rs +++ b/examples/firehose/src/main.rs @@ -3,6 +3,7 @@ use atrium_api::app::bsky::feed::post::Record; use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID}; use atrium_api::types::{CidLink, Collection}; use chrono::Local; +use firehose::cid_compat::CidOld; use firehose::stream::frames::Frame; use firehose::subscription::{CommitHandler, Subscription}; use futures::StreamExt; @@ -54,7 +55,12 @@ impl CommitHandler for Firehose { continue; } let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; - if let Some((_, item)) = items.iter().find(|(cid, _)| Some(CidLink(*cid)) == op.cid) { + if let Some((_, item)) = items.iter().find(|(cid, _)| { + // + // convert cid from v0.10.1 to v0.11.1 + let cid = CidOld::from(*cid).try_into().expect("couldn't convert old to new cid"); + Some(CidLink(cid)) == op.cid + }) { let record = serde_ipld_dagcbor::from_reader::(&mut item.as_slice())?; println!( "{} - {}", @@ -78,8 +84,5 @@ impl CommitHandler for Firehose { #[tokio::main] async fn main() -> Result<(), Box> { - RepoSubscription::new("bsky.network") - .await? - .run(Firehose) - .await + RepoSubscription::new("bsky.network").await?.run(Firehose).await }