From ae80f198f78113c81f34ffc17810f994e47962ff Mon Sep 17 00:00:00 2001 From: Juan Pablo Royo Sales Date: Wed, 25 Sep 2024 14:46:00 +0200 Subject: [PATCH 1/4] feat: removing origin-demuxer --- core/fetcher/src/router.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/fetcher/src/router.rs b/core/fetcher/src/router.rs index 3ff2f692d..9a9109da8 100644 --- a/core/fetcher/src/router.rs +++ b/core/fetcher/src/router.rs @@ -1,5 +1,5 @@ use lightning_interfaces::types::{Blake3Hash, ImmutablePointer, OriginProvider}; -use lightning_interfaces::NodeComponents; +use lightning_interfaces::Collection; use lightning_origin_b3fs::B3FSOrigin; use lightning_origin_http::HttpOrigin; use lightning_origin_ipfs::IPFSOrigin; From 6d5b48651a3e8dbff4517d6730b76a089ce516b2 Mon Sep 17 00:00:00 2001 From: Juan Pablo Royo Sales Date: Mon, 23 Dec 2024 17:41:04 +0100 Subject: [PATCH 2/4] fix: try --- .../src/blockstore_server.rs | 128 +++++++++++++----- 1 file changed, 93 insertions(+), 35 deletions(-) diff --git a/core/blockstore-server/src/blockstore_server.rs b/core/blockstore-server/src/blockstore_server.rs index 3b992a281..48e59bd9a 100644 --- a/core/blockstore-server/src/blockstore_server.rs +++ b/core/blockstore-server/src/blockstore_server.rs @@ -536,7 +536,7 @@ async fn handle_request( peer: NodeIndex, peer_request: PeerRequest, blockstore: C::BlockstoreInterface, - request: ::Request, + mut request: ::Request, num_responses: Arc, rep_reporter: c!(C::ReputationAggregatorInterface::ReputationReporter), ) { @@ -544,16 +544,23 @@ async fn handle_request( let num_blocks = tree.blocks(); if tree.is_file() { if let Some(file) = tree.into_file() { - send_file::( + match send_file::( file, - request, + &mut request, num_blocks, &num_responses, blockstore, rep_reporter, peer, ) - .await; + .await + { + Ok(_) => (), + Err(e) => { + error!("{e:?}"); + return request.reject(RejectReason::ContentNotFound); + }, + } } else { error!( "Content Header detected a File type but it could not be converted into File" @@ -561,7 +568,23 @@ async fn handle_request( request.reject(RejectReason::Other); } } else if let Some(dir) = tree.into_dir() { - send_dir::(dir, request, num_blocks, &num_responses, rep_reporter, peer).await; + match send_dir::( + dir, + &mut request, + blockstore, + num_blocks, + &num_responses, + rep_reporter, + peer, + ) + .await + { + Ok(_) => (), + Err(e) => { + error!("{e:?}"); + return request.reject(RejectReason::ContentNotFound); + }, + } } else { error!( "Content Header detected a Directory type but it could not be converted into Dir" @@ -577,20 +600,19 @@ async fn handle_request( async fn send_file( file: b3fs::bucket::file::reader::B3File, - mut request: ::Request, + request: &mut ::Request, num_blocks: u32, num_responses: &Arc, blockstore: ::BlockstoreInterface, rep_reporter: c!(C::ReputationAggregatorInterface::ReputationReporter), peer: u32, -) { +) -> anyhow::Result<()> { let mut num_bytes = 0; let instant = Instant::now(); let mut reader = match file.hashtree().await { Ok(reader) => reader, Err(e) => { - error!("Failed to get Async HashTree {}", e); - return request.reject(RejectReason::ContentNotFound); + return Err(anyhow!("Failed to get Async HashTree {}", e)); }, }; for block in 0..num_blocks { @@ -598,16 +620,14 @@ async fn send_file( Ok(Some(hash)) => hash, Ok(_) => break, Err(e) => { - error!("Failed to read hash from block {} - {}", block, e); - return request.reject(RejectReason::Other); + return Err(anyhow!("Failed to read hash from block {} - {}", block, e)); }, }; let proof = match reader.generate_proof(block).await { Ok(p) => p, Err(e) => { - error!("Failed to generate proof {}", e); - return request.reject(RejectReason::Other); + return Err(anyhow!("Failed to generate proof {}", e)); }, }; @@ -618,14 +638,14 @@ async fn send_file( ))))) .await { - error!("Failed to send proof: {e:?}"); num_responses.fetch_sub(1, Ordering::Release); - return; + return Err(anyhow!("Failed to send proof: {e:?}")); } let chunk = match blockstore.get_bucket().get_block_content(&hash).await { Ok(Some(chunk)) => chunk, - _ => return request.reject(RejectReason::ContentNotFound), + Ok(None) => return Err(anyhow!("Cannot get block content")), + Err(e) => return Err(anyhow!("Cannot get block content {e:?}")), }; num_bytes += chunk.len(); @@ -635,39 +655,39 @@ async fn send_file( Frame::File(FileFrame::Chunk(Cow::Borrowed(&chunk))) }; if let Err(e) = request.send(Bytes::from(frame)).await { - error!("Failed to send chunk: {e:?}"); num_responses.fetch_sub(1, Ordering::Release); + return Err(anyhow!("Failed to send chunk: {e:?}")); } } if let Err(e) = request.send(Bytes::from(Frame::File(FileFrame::Eos))).await { - error!("Failed to send eos: {e:?}"); + return Err(anyhow!("Failed to send eos: {e:?}")); } else { rep_reporter.report_bytes_sent(peer, num_bytes as u64, Some(instant.elapsed())); } + Ok(()) } async fn send_dir( mut dir: b3fs::bucket::dir::reader::B3Dir, - mut request: ::Request, + request: &mut ::Request, + blockstore: ::BlockstoreInterface, num_blocks: u32, num_responses: &Arc, rep_reporter: c!(C::ReputationAggregatorInterface::ReputationReporter), peer: u32, -) { +) -> anyhow::Result<()> { let mut num_bytes = 0; let instant = Instant::now(); let mut reader = match dir.hashtree().await { Ok(reader) => reader, Err(e) => { - error!("Failed to get Async HashTree {}", e); - return request.reject(RejectReason::ContentNotFound); + return Err(anyhow!("Failed to get Async HashTree {}", e)); }, }; let mut entries_reader = match dir.entries().await { Ok(entries) => entries, Err(e) => { - error!("Error trying to obtain Entries Iterator {}", e); - return request.reject(RejectReason::ContentNotFound); + return Err(anyhow!("Error trying to obtain Entries Iterator {}", e)); }, }; @@ -675,7 +695,7 @@ async fn send_dir( .send(Bytes::from(Frame::Dir(DirFrame::Prelude(num_blocks)))) .await { - error!("Failed to send prelude: {e:?}"); + return Err(anyhow!("Failed to send prelude: {e:?}")); } else { rep_reporter.report_bytes_sent(peer, num_bytes as u64, Some(instant.elapsed())); } @@ -684,8 +704,7 @@ async fn send_dir( let proof = match reader.generate_proof(block).await { Ok(p) => p, Err(e) => { - error!("Failed to generate proof {}", e); - return request.reject(RejectReason::Other); + return Err(anyhow!("Failed to generate proof {}", e)); }, }; @@ -696,14 +715,54 @@ async fn send_dir( ))))) .await { - error!("Failed to send proof: {e:?}"); num_responses.fetch_sub(1, Ordering::Release); - return; + return Err(anyhow!("Failed to send proof: {e:?}")); } if let Some(ent) = entries_reader.next().await { match ent { Ok(entry) => { + match entry.link { + OwnedLink::Content(content) => { + let content_header = blockstore.get_bucket().get(&content).await; + match content_header { + Ok(cont_head) => { + if cont_head.is_dir() { + let dir_reader = cont_head.into_dir().unwrap(); + let result = Box::pin( + send_dir::( + dir_reader, + request, + blockstore.clone(), + num_blocks, + num_responses, + rep_reporter.clone(), + peer, + ) + .await, + ); + result.unwrap()?; + } else { + let file_data = cont_head.into_file().unwrap(); + send_file::( + file_data, + request, + num_blocks, + num_responses, + blockstore.clone(), + rep_reporter.clone(), + peer, + ) + .await?; + } + }, + Err(err) => { + return Err(anyhow!("Error getting entry - {}", err)); + }, + }; + }, + OwnedLink::Link(_) => (), + }; let dir_frame: DirFrame<'_> = DirFrame::from_entry(entry, num_blocks == block + 1); num_bytes += dir_frame.len(); @@ -715,22 +774,21 @@ async fn send_dir( } }, Err(e) => { - error!("Error getting entry - {}", e); - return request.reject(RejectReason::ContentNotFound); + return Err(anyhow!("Error getting entry - {}", e)); }, } } else { - error!( + return Err(anyhow!( "There should be a next entry but it not found. Inconsistency between stream entries and hashes" - ); - return request.reject(RejectReason::ContentNotFound); + )); } } if let Err(e) = request.send(Bytes::from(Frame::Dir(DirFrame::Eos))).await { - error!("Failed to send eos: {e:?}"); + return Err(anyhow!("Failed to send eos: {e:?}")); } else { rep_reporter.report_bytes_sent(peer, num_bytes as u64, Some(instant.elapsed())); } + Ok(()) } async fn send_request( From 8ee54f4bbed3cc15ffa77d7bceeba7ec1cbe5618 Mon Sep 17 00:00:00 2001 From: Juan Pablo Royo Sales Date: Fri, 27 Dec 2024 16:40:41 +0100 Subject: [PATCH 3/4] feat: second attempt --- .../src/blockstore_server.rs | 190 ++++++++++-------- 1 file changed, 101 insertions(+), 89 deletions(-) diff --git a/core/blockstore-server/src/blockstore_server.rs b/core/blockstore-server/src/blockstore_server.rs index 48e59bd9a..832e73e09 100644 --- a/core/blockstore-server/src/blockstore_server.rs +++ b/core/blockstore-server/src/blockstore_server.rs @@ -602,7 +602,7 @@ async fn send_file( file: b3fs::bucket::file::reader::B3File, request: &mut ::Request, num_blocks: u32, - num_responses: &Arc, + num_responses: &AtomicUsize, blockstore: ::BlockstoreInterface, rep_reporter: c!(C::ReputationAggregatorInterface::ReputationReporter), peer: u32, @@ -672,122 +672,134 @@ async fn send_dir( request: &mut ::Request, blockstore: ::BlockstoreInterface, num_blocks: u32, - num_responses: &Arc, + num_responses: &AtomicUsize, rep_reporter: c!(C::ReputationAggregatorInterface::ReputationReporter), peer: u32, ) -> anyhow::Result<()> { let mut num_bytes = 0; let instant = Instant::now(); - let mut reader = match dir.hashtree().await { + + let reader = match dir.hashtree().await { Ok(reader) => reader, Err(e) => { return Err(anyhow!("Failed to get Async HashTree {}", e)); }, }; - let mut entries_reader = match dir.entries().await { + let entries_reader = match dir.entries().await { Ok(entries) => entries, Err(e) => { return Err(anyhow!("Error trying to obtain Entries Iterator {}", e)); }, }; + let mut stack_reader = VecDeque::new(); + stack_reader.push_back((reader, entries_reader, false)); - if let Err(e) = request - .send(Bytes::from(Frame::Dir(DirFrame::Prelude(num_blocks)))) - .await - { - return Err(anyhow!("Failed to send prelude: {e:?}")); - } else { - rep_reporter.report_bytes_sent(peer, num_bytes as u64, Some(instant.elapsed())); - } - - for block in 0..num_blocks { - let proof = match reader.generate_proof(block).await { - Ok(p) => p, - Err(e) => { - return Err(anyhow!("Failed to generate proof {}", e)); - }, - }; - - num_bytes += proof.len(); - if let Err(e) = request - .send(Bytes::from(Frame::Dir(DirFrame::Proof(Cow::Borrowed( - proof.as_slice(), - ))))) - .await + 'dir_reader: loop { + if let Some((ref mut reader, ref mut entries_reader, ref mut prelude_sent)) = + stack_reader.pop_front() { - num_responses.fetch_sub(1, Ordering::Release); - return Err(anyhow!("Failed to send proof: {e:?}")); - } + if !*prelude_sent { + if let Err(e) = request + .send(Bytes::from(Frame::Dir(DirFrame::Prelude(num_blocks)))) + .await + { + return Err(anyhow!("Failed to send prelude: {e:?}")); + } else { + rep_reporter.report_bytes_sent(peer, num_bytes as u64, Some(instant.elapsed())); + *prelude_sent = true; + } + } - if let Some(ent) = entries_reader.next().await { - match ent { - Ok(entry) => { - match entry.link { - OwnedLink::Content(content) => { - let content_header = blockstore.get_bucket().get(&content).await; - match content_header { - Ok(cont_head) => { - if cont_head.is_dir() { - let dir_reader = cont_head.into_dir().unwrap(); - let result = Box::pin( - send_dir::( - dir_reader, + let mut block = 0; + 'entries: while let Some(ent) = entries_reader.next().await { + match ent { + Ok(ref entry) => { + let proof = match reader.generate_proof(block).await { + Ok(p) => p, + Err(e) => { + return Err(anyhow!("Failed to generate proof {}", e)); + }, + }; + num_bytes += proof.len(); + if let Err(e) = request + .send(Bytes::from(Frame::Dir(DirFrame::Proof(Cow::Borrowed( + proof.as_slice(), + ))))) + .await + { + num_responses.fetch_sub(1, Ordering::Release); + return Err(anyhow!("Failed to send proof: {e:?}")); + } + let dir_frame: DirFrame<'_> = + DirFrame::from_entry(entry.clone(), num_blocks == block + 1); + num_bytes += dir_frame.len(); + let frame = Frame::Dir(dir_frame); + + if let Err(e) = request.send(Bytes::from(frame)).await { + error!("Failed to send chunk: {e:?}"); + num_responses.fetch_sub(1, Ordering::Release); + } + block += 1; + match entry.link { + OwnedLink::Content(content) => { + let content_header = blockstore.get_bucket().get(&content).await; + match content_header { + Ok(cont_head) => { + if cont_head.is_dir() { + if let Some(ref mut dir_reader) = cont_head.into_dir() { + let reader = dir_reader.hashtree().await?; + let entries_reader = dir_reader.entries().await?; + stack_reader.push_back(( + reader, + entries_reader, + false, + )); + continue 'entries; + } else { + return Err(anyhow!( + "Content was detect as DIR but it cannot be converted into DirReader" + )); + } + } else if let Some(file_data) = cont_head.into_file() { + send_file::( + file_data, request, - blockstore.clone(), num_blocks, num_responses, + blockstore.clone(), rep_reporter.clone(), peer, ) - .await, - ); - result.unwrap()?; - } else { - let file_data = cont_head.into_file().unwrap(); - send_file::( - file_data, - request, - num_blocks, - num_responses, - blockstore.clone(), - rep_reporter.clone(), - peer, - ) - .await?; - } - }, - Err(err) => { - return Err(anyhow!("Error getting entry - {}", err)); - }, - }; - }, - OwnedLink::Link(_) => (), - }; - let dir_frame: DirFrame<'_> = - DirFrame::from_entry(entry, num_blocks == block + 1); - num_bytes += dir_frame.len(); - let frame = Frame::Dir(dir_frame); - - if let Err(e) = request.send(Bytes::from(frame)).await { - error!("Failed to send chunk: {e:?}"); - num_responses.fetch_sub(1, Ordering::Release); - } - }, - Err(e) => { - return Err(anyhow!("Error getting entry - {}", e)); - }, + .await?; + continue 'entries; + } else { + return Err(anyhow!( + "File Reader cannot be obtained and it should be present" + )); + } + }, + Err(err) => { + return Err(anyhow!("Error getting entry - {}", err)); + }, + }; + }, + OwnedLink::Link(_) => (), + }; + }, + Err(e) => { + return Err(anyhow!("Error getting entry - {}", e)); + }, + } + } + if let Err(e) = request.send(Bytes::from(Frame::Dir(DirFrame::Eos))).await { + return Err(anyhow!("Failed to send eos: {e:?}")); + } else { + rep_reporter.report_bytes_sent(peer, num_bytes as u64, Some(instant.elapsed())); } } else { - return Err(anyhow!( - "There should be a next entry but it not found. Inconsistency between stream entries and hashes" - )); + break 'dir_reader; } } - if let Err(e) = request.send(Bytes::from(Frame::Dir(DirFrame::Eos))).await { - return Err(anyhow!("Failed to send eos: {e:?}")); - } else { - rep_reporter.report_bytes_sent(peer, num_bytes as u64, Some(instant.elapsed())); - } Ok(()) } From 2a9af10942817a8c57e119170f808ffc921d6dfd Mon Sep 17 00:00:00 2001 From: Juan Pablo Royo Sales Date: Thu, 2 Jan 2025 09:15:02 +0100 Subject: [PATCH 4/4] feat: third attempt --- .../src/blockstore_server.rs | 162 ++++++++++++------ core/blockstore-server/src/tests.rs | 10 +- 2 files changed, 112 insertions(+), 60 deletions(-) diff --git a/core/blockstore-server/src/blockstore_server.rs b/core/blockstore-server/src/blockstore_server.rs index 832e73e09..e6af3dcd2 100644 --- a/core/blockstore-server/src/blockstore_server.rs +++ b/core/blockstore-server/src/blockstore_server.rs @@ -11,6 +11,7 @@ use std::time::{Duration, Instant}; use affair::{Socket, Task}; use anyhow::{anyhow, Result}; +use b3fs::bucket::dir::reader::B3Dir; use b3fs::entry::{BorrowedEntry, InlineVec, OwnedEntry, OwnedLink}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use lightning_interfaces::prelude::*; @@ -304,6 +305,7 @@ pub enum Frame<'a> { #[derive(Debug)] pub enum FileFrame<'a> { + Prelude([u8; 32]), Proof(Cow<'a, [u8]>), Chunk(Cow<'a, [u8]>), LastChunk(Cow<'a, [u8]>), @@ -312,7 +314,7 @@ pub enum FileFrame<'a> { #[derive(Debug)] pub enum DirFrame<'a> { - Prelude(u32), + Prelude((u32, [u8; 32])), Proof(Cow<'a, [u8]>), Chunk(Cow<'a, OwnedEntry>), LastChunk(Cow<'a, OwnedEntry>), @@ -331,7 +333,7 @@ impl<'a> DirFrame<'a> { pub fn len(&self) -> usize { match &self { - Self::Prelude(c) => c.to_le_bytes().len(), + Self::Prelude((entries, hash)) => hash.len() + entries.to_le_bytes().len(), Self::Proof(c) => c.len(), Self::Chunk(entry) => Self::entry_len(entry), Self::LastChunk(entry) => Self::entry_len(entry), @@ -353,33 +355,38 @@ impl<'a> From> for Bytes { let mut b = BytesMut::new(); match value { Frame::File(file) => match file { - FileFrame::Proof(proof) => { + FileFrame::Prelude(hash) => { b.put_u8(0x00); + b.put_slice(&hash); + }, + FileFrame::Proof(proof) => { + b.put_u8(0x01); b.put_slice(&proof); }, FileFrame::Chunk(chunk) => { - b.put_u8(0x01); + b.put_u8(0x02); b.put_slice(&chunk); }, FileFrame::LastChunk(chunk) => { - b.put_u8(0x02); + b.put_u8(0x03); b.put_slice(&chunk); }, FileFrame::Eos => { - b.put_u8(0x03); + b.put_u8(0x04); }, }, Frame::Dir(dir) => match dir { - DirFrame::Prelude(num_entries) => { - b.put_u8(0x09); + DirFrame::Prelude((num_entries, hash)) => { + b.put_u8(0x10); b.put_u32_le(num_entries); + b.put_slice(&hash); }, DirFrame::Proof(proof) => { - b.put_u8(0x10); + b.put_u8(0x11); b.put_slice(&proof); }, DirFrame::Chunk(chunk) => { - b.put_u8(0x11); + b.put_u8(0x12); b.put_slice(&chunk.name); b.put_u8(0x00); match &chunk.link { @@ -394,7 +401,7 @@ impl<'a> From> for Bytes { } }, DirFrame::LastChunk(chunk) => { - b.put_u8(0x12); + b.put_u8(0x13); b.put_slice(&chunk.name); b.put_u8(0x00); match &chunk.link { @@ -409,7 +416,7 @@ impl<'a> From> for Bytes { } }, DirFrame::Eos => { - b.put_u8(0x13); + b.put_u8(0x14); }, }, } @@ -423,15 +430,25 @@ impl TryFrom for Frame<'static> { fn try_from(mut value: Bytes) -> Result { let val_frame = value.get_u8(); match val_frame { - 0x00 => Ok(Frame::File(FileFrame::Proof(Cow::Owned(value.to_vec())))), - 0x01 => Ok(Frame::File(FileFrame::Chunk(Cow::Owned(value.to_vec())))), - 0x02 => Ok(Frame::File(FileFrame::LastChunk(Cow::Owned( + 0x00 => { + let mut content: [u8; 32] = [0; 32]; + content.copy_from_slice(&value); + Ok(Frame::File(FileFrame::Prelude(content))) + }, + 0x01 => Ok(Frame::File(FileFrame::Proof(Cow::Owned(value.to_vec())))), + 0x02 => Ok(Frame::File(FileFrame::Chunk(Cow::Owned(value.to_vec())))), + 0x03 => Ok(Frame::File(FileFrame::LastChunk(Cow::Owned( value.to_vec(), )))), - 0x03 => Ok(Frame::File(FileFrame::Eos)), - 0x09 => Ok(Frame::Dir(DirFrame::Prelude(value.get_u32_le()))), - 0x10 => Ok(Frame::Dir(DirFrame::Proof(Cow::Owned(value.to_vec())))), - 0x11 => { + 0x04 => Ok(Frame::File(FileFrame::Eos)), + 0x10 => { + let num_entries = value.get_u32_le(); + let mut content: [u8; 32] = [0; 32]; + content.copy_from_slice(&value); + Ok(Frame::Dir(DirFrame::Prelude((num_entries, content)))) + }, + 0x11 => Ok(Frame::Dir(DirFrame::Proof(Cow::Owned(value.to_vec())))), + 0x12 => { let bytes: &[u8] = if let Some(bs) = value.iter().position(|p| *p == 0x00) { &value.split_to(bs) } else { @@ -472,7 +489,7 @@ impl TryFrom for Frame<'static> { _ => Err(anyhow!("Unknown magic byte for OwnedEntry type")), } }, - 0x12 => { + 0x13 => { let bytes: &[u8] = if let Some(bs) = value.iter().position(|p| *p == 0x00) { &value.split_to(bs) } else { @@ -514,7 +531,7 @@ impl TryFrom for Frame<'static> { _ => Err(anyhow!("Unknown magic byte for OwnedEntry type")), } }, - 0x13 => Ok(Frame::Dir(DirFrame::Eos)), + 0x14 => Ok(Frame::Dir(DirFrame::Eos)), i => Err(anyhow!("Unknown magic byte {i}")), } } @@ -546,6 +563,7 @@ async fn handle_request( if let Some(file) = tree.into_file() { match send_file::( file, + peer_request.hash, &mut request, num_blocks, &num_responses, @@ -570,6 +588,7 @@ async fn handle_request( } else if let Some(dir) = tree.into_dir() { match send_dir::( dir, + peer_request.hash, &mut request, blockstore, num_blocks, @@ -600,6 +619,7 @@ async fn handle_request( async fn send_file( file: b3fs::bucket::file::reader::B3File, + hash: [u8; 32], request: &mut ::Request, num_blocks: u32, num_responses: &AtomicUsize, @@ -615,6 +635,13 @@ async fn send_file( return Err(anyhow!("Failed to get Async HashTree {}", e)); }, }; + if let Err(e) = request + .send(Bytes::from(Frame::File(FileFrame::Prelude(hash)))) + .await + { + num_responses.fetch_sub(1, Ordering::Release); + return Err(anyhow!("Failed to send prelude file: {e:?}")); + } for block in 0..num_blocks { let hash = match reader.get_hash(block).await { Ok(Some(hash)) => hash, @@ -667,8 +694,27 @@ async fn send_file( Ok(()) } +struct QueueDir { + dir: B3Dir, + num_blocks: u32, + hash: [u8; 32], + prelude_sent: bool, +} + +impl QueueDir { + pub fn new(dir: B3Dir, hash: [u8; 32], num_blocks: u32) -> Self { + QueueDir { + dir, + num_blocks, + hash, + prelude_sent: false, + } + } +} + async fn send_dir( - mut dir: b3fs::bucket::dir::reader::B3Dir, + dir: b3fs::bucket::dir::reader::B3Dir, + hash: [u8; 32], request: &mut ::Request, blockstore: ::BlockstoreInterface, num_blocks: u32, @@ -679,39 +725,42 @@ async fn send_dir( let mut num_bytes = 0; let instant = Instant::now(); - let reader = match dir.hashtree().await { - Ok(reader) => reader, - Err(e) => { - return Err(anyhow!("Failed to get Async HashTree {}", e)); - }, - }; - let entries_reader = match dir.entries().await { - Ok(entries) => entries, - Err(e) => { - return Err(anyhow!("Error trying to obtain Entries Iterator {}", e)); - }, - }; let mut stack_reader = VecDeque::new(); - stack_reader.push_back((reader, entries_reader, false)); + stack_reader.push_back(QueueDir::new(dir, hash, num_blocks)); 'dir_reader: loop { - if let Some((ref mut reader, ref mut entries_reader, ref mut prelude_sent)) = - stack_reader.pop_front() - { - if !*prelude_sent { + if let Some(ref mut queue_dir) = stack_reader.pop_front() { + dbg!("New dir"); + let mut reader = match queue_dir.dir.hashtree().await { + Ok(reader) => reader, + Err(e) => { + return Err(anyhow!("Failed to get Async HashTree {}", e)); + }, + }; + let mut entries_reader = match queue_dir.dir.entries().await { + Ok(entries) => entries, + Err(e) => { + return Err(anyhow!("Error trying to obtain Entries Iterator {}", e)); + }, + }; + if !queue_dir.prelude_sent { if let Err(e) = request - .send(Bytes::from(Frame::Dir(DirFrame::Prelude(num_blocks)))) + .send(Bytes::from(Frame::Dir(DirFrame::Prelude(( + queue_dir.num_blocks, + queue_dir.hash, + ))))) .await { return Err(anyhow!("Failed to send prelude: {e:?}")); } else { rep_reporter.report_bytes_sent(peer, num_bytes as u64, Some(instant.elapsed())); - *prelude_sent = true; + queue_dir.prelude_sent = true; } } let mut block = 0; 'entries: while let Some(ent) = entries_reader.next().await { + dbg!("new entry"); match ent { Ok(ref entry) => { let proof = match reader.generate_proof(block).await { @@ -745,14 +794,13 @@ async fn send_dir( let content_header = blockstore.get_bucket().get(&content).await; match content_header { Ok(cont_head) => { + let num_blocks_new_content = cont_head.blocks(); if cont_head.is_dir() { - if let Some(ref mut dir_reader) = cont_head.into_dir() { - let reader = dir_reader.hashtree().await?; - let entries_reader = dir_reader.entries().await?; - stack_reader.push_back(( - reader, - entries_reader, - false, + if let Some(dir_reader) = cont_head.into_dir() { + stack_reader.push_back(QueueDir::new( + dir_reader, + content, + num_blocks_new_content, )); continue 'entries; } else { @@ -761,10 +809,12 @@ async fn send_dir( )); } } else if let Some(file_data) = cont_head.into_file() { + dbg!("sending file"); send_file::( file_data, + content, request, - num_blocks, + num_blocks_new_content, num_responses, blockstore.clone(), rep_reporter.clone(), @@ -849,14 +899,11 @@ async fn send_request( }; match frame { Frame::File(file) => { - if writer.is_none() { + if let FileFrame::Prelude(hash) = file { writer = Some(RwLock::new( - blockstore - .file_untrusted_writer(request.hash) - .await - .unwrap(), + blockstore.file_untrusted_writer(hash).await.unwrap(), )); - } + }; if let Some(ref file_writer) = writer { match handle_send_request_file::(file_writer, file).await { Ok(RespSendRequest::Continue) => (), @@ -884,10 +931,10 @@ async fn send_request( } }, Frame::Dir(dir) => { - if let DirFrame::Prelude(num_entries) = dir { + if let DirFrame::Prelude((num_entries, hash)) = dir { dir_writer = Some(RwLock::new( blockstore - .dir_untrusted_writer(request.hash, num_entries as usize) + .dir_untrusted_writer(hash, num_entries as usize) .await .unwrap(), )); @@ -951,6 +998,7 @@ async fn handle_send_request_file( file: FileFrame<'_>, ) -> Result { match file { + FileFrame::Prelude(_) => Ok(RespSendRequest::Continue), FileFrame::Proof(proof) => writer .write() .await diff --git a/core/blockstore-server/src/tests.rs b/core/blockstore-server/src/tests.rs index 30a05f96d..d2e5ee9e8 100644 --- a/core/blockstore-server/src/tests.rs +++ b/core/blockstore-server/src/tests.rs @@ -208,6 +208,7 @@ async fn test_stream_verified_content() { let mut putter = UntrustedFileWriter::new(&bucket, root_hash).await.unwrap(); while let Some(frame) = network_wire.pop_front() { match frame { + Frame::File(FileFrame::Prelude(_root_hash)) => (), Frame::File(FileFrame::Proof(proof)) => putter.feed_proof(&proof).await.unwrap(), Frame::File(FileFrame::LastChunk(chunk)) => { putter.write(&chunk, true).await.unwrap(); @@ -336,7 +337,10 @@ async fn test_dir_stream_verified_content() { let mut tree_dir = tree.into_dir().unwrap(); let mut reader = tree_dir.hashtree().await.unwrap(); let mut entries_reader = tree_dir.entries().await.unwrap(); - network_wire.push_back(Frame::Dir(DirFrame::Prelude(num_blocks))); + network_wire.push_back(Frame::Dir(DirFrame::Prelude(( + num_blocks, + root_hash_testdir, + )))); let mut block = 0; while let Some(ent) = entries_reader.next().await { let entry = ent.unwrap(); @@ -362,10 +366,10 @@ async fn test_dir_stream_verified_content() { let mut putter = None; while let Some(ref frame) = network_wire.pop_front() { match frame { - Frame::Dir(DirFrame::Prelude(num_entries)) => { + Frame::Dir(DirFrame::Prelude((num_entries, hash))) => { putter = Some(RwLock::new( blockstore_peer1 - .dir_untrusted_writer(root_hash_testdir, *num_entries as usize) + .dir_untrusted_writer(*hash, *num_entries as usize) .await .unwrap(), ));