Skip to content

Commit

Permalink
Fix erasure coded reads when a block is missing (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman authored Apr 5, 2024
1 parent a8826fa commit f8183c3
Showing 1 changed file with 31 additions and 20 deletions.
51 changes: 31 additions & 20 deletions crates/hdfs-native/src/hdfs/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,19 @@ impl StripedBlockStream {
let block_read_len = block_end - block_start;

assert_eq!(self.block.block_indices().len(), self.block.locs.len());
let block_map: HashMap<u8, &hdfs::DatanodeInfoProto> = self
let datanode_infos: Vec<(&hdfs::DatanodeInfoProto, &common::TokenProto)> = self
.block
.locs
.iter()
.zip(self.block.block_tokens.iter())
.collect();

let block_map: HashMap<u8, (&hdfs::DatanodeInfoProto, &common::TokenProto)> = self
.block
.block_indices()
.iter()
.copied()
.zip(self.block.locs.iter())
.zip(datanode_infos.into_iter())
.collect();

let mut stripe_results: Vec<Option<Bytes>> =
Expand All @@ -264,10 +271,12 @@ impl StripedBlockStream {
let mut futures = Vec::new();

for index in 0..self.ec_schema.data_units as u8 {
let datanode_info = block_map.get(&index);
futures.push(self.read_vertical_stripe(
&self.ec_schema,
index,
block_map.get(&index),
datanode_info.map(|(datanode, _)| datanode),
datanode_info.map(|(_, token)| token),
block_start,
block_read_len,
));
Expand All @@ -287,12 +296,13 @@ impl StripedBlockStream {
let mut parity_unit = 0usize;
while blocks_needed > 0 && parity_unit < self.ec_schema.parity_units {
let block_index = (self.ec_schema.data_units + parity_unit) as u8;
let datanode_info = block_map.get(&block_index).unwrap();
let datanode_info = block_map.get(&block_index);
let result = self
.read_vertical_stripe(
&self.ec_schema,
block_index,
Some(datanode_info),
datanode_info.map(|(datanode, _)| datanode),
datanode_info.map(|(_, token)| token),
block_start,
block_read_len,
)
Expand Down Expand Up @@ -337,6 +347,7 @@ impl StripedBlockStream {
ec_schema: &EcSchema,
index: u8,
datanode: Option<&&hdfs::DatanodeInfoProto>,
token: Option<&&common::TokenProto>,
offset: usize,
len: usize,
) -> Result<Bytes> {
Expand All @@ -347,32 +358,32 @@ impl StripedBlockStream {
return Err(HdfsError::InternalError("Testing error".to_string()));
}
}
let max_block_offset =
ec_schema.max_offset(index as usize, self.block.b.num_bytes() as usize);

let mut buf = BytesMut::zeroed(len);
if let Some(datanode_info) = datanode {
let max_block_offset =
ec_schema.max_offset(index as usize, self.block.b.num_bytes() as usize);
let read_len = usize::min(len, max_block_offset - offset);

let read_len = usize::min(len, max_block_offset - offset);
if read_len == 0 {
// We're past the end of the file so there's nothign to read, just return a zeroed buffer
Ok(BytesMut::zeroed(len).freeze())
} else if let Some((datanode_info, token)) = datanode.zip(token) {
let mut buf = BytesMut::zeroed(len);

// Each vertical stripe has a block ID of the original located block ID + block index
// That was fun to figure out
let mut block = self.block.b.clone();
block.block_id += index as u64;

// The token of the first block is the main one, then all the rest are in the `block_tokens` list
let token = &self.block.block_tokens[self
.block
.block_indices()
.iter()
.position(|x| *x == index)
.unwrap()];

self.read_from_datanode(&datanode_info.id, &block, token, offset, read_len, &mut buf)
.await?;
}

Ok(buf.freeze())
Ok(buf.freeze())
} else {
// There should be data to read but we didn't get block info for this index, so this shard is missing
Err(HdfsError::ErasureCodingError(
"Shard is missing".to_string(),
))
}
}

async fn read_from_datanode(
Expand Down

0 comments on commit f8183c3

Please sign in to comment.