From f72ab9256a469f0098d7e06e5f067cb2f0b38d64 Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Thu, 5 Dec 2024 07:22:09 -0500
Subject: [PATCH] Handle DataNode failures in replicated writes (#164)

---
 Cargo.lock                                |   4 +-
 rust/minidfs/src/main/java/main/Main.java |   2 +-
 rust/src/file.rs                          |   8 +-
 rust/src/hdfs/block_writer.rs             | 622 ++++++++++++++--------
 rust/src/hdfs/connection.rs               | 127 ++---
 rust/src/hdfs/protocol.rs                 |  19 +-
 rust/src/test.rs                          |   4 +-
 rust/tests/test_write_resiliency.rs       | 115 +++-
 8 files changed, 599 insertions(+), 302 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index beb4568..0184ffd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1646,9 +1646,9 @@ dependencies = [
 
 [[package]]
 name = "scc"
-version = "2.2.4"
+version = "2.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8d25269dd3a12467afe2e510f69fb0b46b698e5afb296b59f2145259deaf8e8"
+checksum = "66b202022bb57c049555430e11fc22fea12909276a80a4c3d368da36ac1d88ed"
 dependencies = [
  "sdd",
 ]
diff --git a/rust/minidfs/src/main/java/main/Main.java b/rust/minidfs/src/main/java/main/Main.java
index 24cd194..934605a 100644
--- a/rust/minidfs/src/main/java/main/Main.java
+++ b/rust/minidfs/src/main/java/main/Main.java
@@ -106,7 +106,7 @@ public static void main(String args[]) throws Exception {
         } else {
             MiniDFSNNTopology nnTopology = generateTopology(flags, hdfsConf);
 
-            int numDataNodes = 1;
+            int numDataNodes = 3;
             if (flags.contains("ec")) {
                 // Enough for the largest EC policy
                 numDataNodes = 14;
diff --git a/rust/src/file.rs b/rust/src/file.rs
index a785a04..1d2f68c 100644
--- a/rust/src/file.rs
+++ b/rust/src/file.rs
@@ -208,9 +208,7 @@ impl FileWriter {
         // Not appending to an existing block, just create a new one
         // If there's an existing block writer, close it first
         let extended_block = if let Some(block_writer) = self.block_writer.take() {
-            let extended_block = block_writer.get_extended_block();
-            block_writer.close().await?;
-            Some(extended_block)
+            Some(block_writer.close().await?)
         } else {
            None
        };
@@ -265,9 +263,7 @@ impl FileWriter {
     pub async fn close(&mut self) -> Result<()> {
         if !self.closed {
             let extended_block = if let Some(block_writer) = self.block_writer.take() {
-                let extended_block = block_writer.get_extended_block();
-                block_writer.close().await?;
-                Some(extended_block)
+                Some(block_writer.close().await?)
             } else {
                 None
             };
diff --git a/rust/src/hdfs/block_writer.rs b/rust/src/hdfs/block_writer.rs
index ea32c54..8a33e04 100644
--- a/rust/src/hdfs/block_writer.rs
+++ b/rust/src/hdfs/block_writer.rs
@@ -2,18 +2,19 @@ use std::{sync::Arc, time::Duration};
 
 use bytes::{BufMut, Bytes, BytesMut};
 use futures::future::join_all;
-use log::debug;
+use log::{debug, warn};
 use tokio::{sync::mpsc, task::JoinHandle};
 
 use crate::{
     ec::{gf256::Coder, EcSchema},
-    hdfs::connection::{DatanodeConnection, DatanodeReader, DatanodeWriter, Op, Packet},
+    hdfs::{
+        connection::{DatanodeConnection, DatanodeReader, DatanodeWriter, Op, WritePacket},
+        protocol::NamenodeProtocol,
+    },
     proto::hdfs,
     HdfsError, Result,
 };
 
-use super::protocol::NamenodeProtocol;
-
 const HEART_BEAT_SEQNO: i64 = -1;
 const UNKNOWN_SEQNO: i64 = -2;
 
@@ -47,7 +48,7 @@ impl BlockWriter {
             ))
         } else {
             Self::Replicated(
-                ReplicatedBlockWriter::new(&protocol, block, block_size, server_defaults).await?,
+                ReplicatedBlockWriter::new(protocol, block, block_size, server_defaults).await?,
             )
         };
         Ok(block_writer)
@@ -67,14 +68,7 @@ impl BlockWriter {
         }
     }
 
-    pub(crate) fn get_extended_block(&self) -> hdfs::ExtendedBlockProto {
-        match self {
-            Self::Replicated(writer) => writer.get_extended_block(),
-            Self::Striped(writer) => writer.get_extended_block(),
-        }
-    }
-
-    pub(crate) async fn close(self) -> Result<()> {
+    pub(crate) async fn close(self) -> Result<hdfs::ExtendedBlockProto> {
         match self {
             Self::Replicated(writer) => writer.close().await,
             Self::Striped(writer) => writer.close().await,
@@ -82,33 +76,352 @@ impl BlockWriter {
         }
     }
 }
 
-pub(crate) struct ReplicatedBlockWriter {
-    block: hdfs::LocatedBlockProto,
-    block_size: usize,
-    server_defaults: hdfs::FsServerDefaultsProto,
-
-    next_seqno: i64,
-    current_packet: Packet,
+enum WriteStatus {
+    Success,
+    Recover(Vec<usize>, Vec<WritePacket>),
+}
 
+struct Pipeline {
+    packet_sender: mpsc::Sender<WritePacket>,
     // Tracks the state of acknowledgements. Set to an Err if any error occurs during receiving
-    // acknowledgements. Set to Ok(()) when the last acknowledgement is received.
-    ack_listener_handle: JoinHandle<Result<()>>,
+    // acknowledgements. Set to Ok(WriteStatus::Success) when the last acknowledgement is received.
+    ack_listener_handle: JoinHandle<Result<WriteStatus>>,
     // Tracks the state of packet sender. Set to Err if any error occurs during writing packets,
-    packet_sender_handle: JoinHandle<Result<()>>,
+    packet_sender_handle: JoinHandle<Vec<WritePacket>>,
     // Tracks the heartbeat task so we can abort it when we close
     heartbeat_handle: JoinHandle<()>,
+}
+
+impl Pipeline {
+    fn new(connection: DatanodeConnection) -> Self {
+        let (reader, writer) = connection.split();
+
+        // Channel for tracking packets that need to be acked
+        let (ack_queue_sender, ack_queue_receiever) =
+            mpsc::channel::<WritePacket>(WRITE_PACKET_BUFFER_LEN);
+        let (packet_sender, packet_receiver) =
+            mpsc::channel::<WritePacket>(WRITE_PACKET_BUFFER_LEN);
+
+        let ack_listener_handle = Self::listen_for_acks(reader, ack_queue_receiever);
+        let packet_sender_handle =
+            Self::start_packet_sender(writer, packet_receiver, ack_queue_sender);
+        let heartbeat_handle = Self::start_heartbeat_sender(packet_sender.clone());
+
+        Self {
+            packet_sender,
+            ack_listener_handle,
+            packet_sender_handle,
+            heartbeat_handle,
+        }
+    }
+
+    async fn send_packet(&self, packet: WritePacket) -> std::result::Result<(), WritePacket> {
+        self.packet_sender.send(packet).await.map_err(|e| e.0)
+    }
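+
+    // Waits for the sender and ack tasks to finish. On failure, packets that were
+    // never acked are combined with packets that were never sent so they can be replayed.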
+    async fn shutdown(self) -> Result<WriteStatus> {
+        self.heartbeat_handle.abort();
+
+        let (failed_nodes, unacked_packets) = match self.ack_listener_handle.await.unwrap()? {
+            WriteStatus::Success => {
+                if !self.packet_sender_handle.await.unwrap().is_empty() {
+                    return Err(HdfsError::DataTransferError(
+                        "Failed to send all packets to DataNode".to_string(),
+                    ));
+                }
+                return Ok(WriteStatus::Success);
+            }
+            WriteStatus::Recover(failed_nodes, unacked_packets) => (failed_nodes, unacked_packets),
+        };
+
+        let packets_to_send = self.packet_sender_handle.await.unwrap();
+
+        let packets_to_replay = unacked_packets.into_iter().chain(packets_to_send).collect();
+
+        Ok(WriteStatus::Recover(failed_nodes, packets_to_replay))
+    }
+
+    async fn drain_queue(mut queue: mpsc::Receiver<WritePacket>) -> Vec<WritePacket> {
+        queue.close();
+
+        let mut packets = Vec::with_capacity(queue.len());
+        while let Some(packet) = queue.recv().await {
+            packets.push(packet);
+        }
+        packets
+    }
+
+    fn start_packet_sender(
+        mut writer: DatanodeWriter,
+        mut packet_receiver: mpsc::Receiver<WritePacket>,
+        ack_queue: mpsc::Sender<WritePacket>,
+    ) -> JoinHandle<Vec<WritePacket>> {
+        tokio::spawn(async move {
+            while let Some(mut packet) = packet_receiver.recv().await {
+                // Simulate node we are writing to failing
+                #[cfg(feature = "integration-test")]
+                if crate::test::WRITE_CONNECTION_FAULT_INJECTOR
+                    .swap(false, std::sync::atomic::Ordering::SeqCst)
+                {
+                    debug!("Failing write to active node");
+                    return [packet]
+                        .into_iter()
+                        .chain(Self::drain_queue(packet_receiver).await)
+                        .collect();
+                }
+
+                if let Err(e) = writer.write_packet(&mut packet).await {
+                    warn!("Failed to send packet to DataNode: {:?}", e);
+                    return [packet]
+                        .into_iter()
+                        .chain(Self::drain_queue(packet_receiver).await)
+                        .collect();
+                }
+
+                if packet.header.seqno == HEART_BEAT_SEQNO {
+                    continue;
+                }
+
+                let last_packet = packet.header.last_packet_in_block;
+
+                if let Err(err) = ack_queue.send(packet).await {
+                    // Ack listener failed, so it will have a failed node
+                    return [err.0]
+                        .into_iter()
+                        .chain(Self::drain_queue(packet_receiver).await)
+                        .collect();
+                };
+
+                if last_packet {
+                    break;
+                }
+            }
+            vec![]
+        })
+    }
+
+    fn start_heartbeat_sender(packet_sender: mpsc::Sender<WritePacket>) -> JoinHandle<()> {
+        tokio::spawn(async move {
+            loop {
+                tokio::time::sleep(Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS)).await;
+                let heartbeat_packet = WritePacket::empty(0, HEART_BEAT_SEQNO, 0, 0);
+                // If this fails, sending any more data packets will generate an error as well
+                if packet_sender.send(heartbeat_packet).await.is_err() {
+                    break;
+                }
+            }
+        })
+    }
+
+    fn listen_for_acks(
+        mut reader: DatanodeReader,
+        mut ack_queue: mpsc::Receiver<WritePacket>,
+    ) -> JoinHandle<Result<WriteStatus>> {
+        tokio::spawn(async move {
+            loop {
+                let next_ack = match reader.read_ack().await {
+                    Ok(next_ack) => next_ack,
+                    Err(e) => {
+                        warn!("Failed to read ack from DataNode: {}", e);
+                        return Ok(WriteStatus::Recover(
+                            vec![0],
+                            Self::drain_queue(ack_queue).await,
+                        ));
+                    }
+                };
+
+                let mut failed_nodes: Vec<usize> = vec![];
+
+                for (i, reply) in next_ack.reply().enumerate() {
+                    // Simulate node we are replicating to failing
+                    #[cfg(feature = "integration-test")]
+                    if crate::test::WRITE_REPLY_FAULT_INJECTOR
+                        .lock()
+                        .unwrap()
+                        .take()
+                        .is_some_and(|j| i == j)
+                    {
+                        debug!("Failing write to replica node");
+                        failed_nodes.push(i);
+                    }
+
+                    if reply != hdfs::Status::Success {
+                        failed_nodes.push(i);
+                    }
+                }
+
+                if !failed_nodes.is_empty() {
+                    // TODO: we need a way to make sure the packet sender task dies here
+                    return Ok(WriteStatus::Recover(
+                        failed_nodes,
+                        Self::drain_queue(ack_queue).await,
+                    ));
+                }
+
+                if next_ack.seqno == HEART_BEAT_SEQNO {
+                    continue;
+                }
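+
+                // An ack with an unknown seqno cannot be matched to a packet we sent,
+                // so treat it as an unrecoverable stream error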
"Received unknown seqno for successful ack".to_string(), + )); + } - ack_queue: mpsc::Sender<(i64, bool)>, - packet_sender: mpsc::Sender, + if let Some(packet) = ack_queue.recv().await { + debug!("Next: {}, packet: {}", next_ack.seqno, packet.header.seqno); + if next_ack.seqno != packet.header.seqno { + return Err(HdfsError::DataTransferError( + "Received acknowledgement does not match expected sequence number" + .to_string(), + )); + } + + if packet.header.last_packet_in_block { + return Ok(WriteStatus::Success); + } + } else { + // Error occurred in the packet sender, which would only happen on errors + // communicating with the DataNode + return Ok(WriteStatus::Recover( + vec![0], + Self::drain_queue(ack_queue).await, + )); + } + } + }) + } +} + +pub(crate) struct ReplicatedBlockWriter { + protocol: Arc, + block: hdfs::LocatedBlockProto, + block_size: usize, + server_defaults: hdfs::FsServerDefaultsProto, + + current_packet: WritePacket, + pipeline: Option, } impl ReplicatedBlockWriter { async fn new( - protocol: &Arc, + protocol: Arc, block: hdfs::LocatedBlockProto, block_size: usize, server_defaults: hdfs::FsServerDefaultsProto, ) -> Result { + let pipeline = + Self::setup_pipeline(&protocol, &block, &server_defaults, None, None).await?; + + let bytes_in_last_chunk = block.b.num_bytes() % server_defaults.bytes_per_checksum as u64; + let current_packet = if bytes_in_last_chunk > 0 { + // When appending, we want to first send a packet with a single chunk of the data required + // to get the block to a multiple of bytes_per_checksum. After that, things work the same + // as create. + WritePacket::empty( + block.b.num_bytes() as i64, + 0, + server_defaults.bytes_per_checksum - bytes_in_last_chunk as u32, + 0, + ) + } else { + WritePacket::empty( + block.b.num_bytes() as i64, + 0, + server_defaults.bytes_per_checksum, + server_defaults.write_packet_size, + ) + }; + + let this = Self { + protocol, + block, + block_size, + server_defaults, + current_packet, + + pipeline: Some(pipeline), + }; + + Ok(this) + } + + async fn recover( + &mut self, + failed_nodes: Vec, + packets_to_replay: Vec, + ) -> Result<()> { + debug!( + "Failed nodes: {:?}, block locs: {:?}", + failed_nodes, self.block.locs + ); + if failed_nodes.len() >= self.block.locs.len() { + return Err(HdfsError::DataTransferError( + "All nodes failed for write".to_string(), + )); + } + + debug!("Recovering block writer"); + + let mut new_block = self.block.clone(); + for failed_node in failed_nodes { + new_block.locs.remove(failed_node); + new_block.storage_i_ds.remove(failed_node); + new_block.storage_types.remove(failed_node); + } + + let mut bytes_acked = new_block.b.num_bytes(); + for packet in packets_to_replay.iter() { + bytes_acked -= packet.data.len() as u64; + } + + let old_block = std::mem::replace(&mut self.block, new_block); + + let updated_block = self + .protocol + .update_block_for_pipeline(self.block.b.clone()) + .await?; + + // self.block.b.generation_stamp = updated_block.block.b.generation_stamp; + self.block.block_token = updated_block.block.block_token; + + let pipeline = Self::setup_pipeline( + &self.protocol, + &self.block, + &self.server_defaults, + Some(updated_block.block.b.generation_stamp), + Some(bytes_acked), + ) + .await?; + + self.block.b.generation_stamp = updated_block.block.b.generation_stamp; + + for packet in packets_to_replay { + pipeline.send_packet(packet).await.map_err(|_| { + HdfsError::DataTransferError("Failed to replay packets during recovery".to_string()) + })?; + } + + self.protocol + 
+        self.protocol
+            .update_pipeline(
+                old_block.b,
+                self.block.b.clone(),
+                self.block.locs.iter().map(|l| l.id.clone()).collect(),
+                self.block.storage_i_ds.clone(),
+            )
+            .await?;
+
+        self.pipeline = Some(pipeline);
+
+        Ok(())
+    }
+
+    async fn setup_pipeline(
+        protocol: &Arc<NamenodeProtocol>,
+        block: &hdfs::LocatedBlockProto,
+        server_defaults: &hdfs::FsServerDefaultsProto,
+        new_gs: Option<u64>,
+        bytes_acked: Option<u64>,
+    ) -> Result<Pipeline> {
         let datanode = &block.locs[0].id;
         let mut connection = DatanodeConnection::connect(
             datanode,
@@ -122,9 +435,10 @@ impl ReplicatedBlockWriter {
             bytes_per_checksum: server_defaults.bytes_per_checksum,
         };
 
-        let append = block.b.num_bytes() > 0;
-
-        let stage = if append {
+        let stage = if new_gs.is_some() {
+            hdfs::op_write_block_proto::BlockConstructionStage::PipelineSetupStreamingRecovery
+                as i32
+        } else if block.b.num_bytes() > 0 {
             hdfs::op_write_block_proto::BlockConstructionStage::PipelineSetupAppend as i32
         } else {
             hdfs::op_write_block_proto::BlockConstructionStage::PipelineSetupCreate as i32
@@ -135,8 +449,8 @@
             stage,
             targets: block.locs[1..].to_vec(),
             pipeline_size: block.locs.len() as u32,
-            latest_generation_stamp: block.b.generation_stamp,
-            min_bytes_rcvd: block.b.num_bytes(),
+            latest_generation_stamp: new_gs.unwrap_or(block.b.generation_stamp),
+            min_bytes_rcvd: bytes_acked.unwrap_or(block.b.num_bytes()),
             max_bytes_rcvd: block.b.num_bytes(),
             requested_checksum: checksum,
             storage_type: Some(block.storage_types[0]),
@@ -150,118 +464,55 @@ impl ReplicatedBlockWriter {
         let response = connection.send(Op::WriteBlock, &message).await?;
         debug!("Block write response: {:?}", response);
 
-        let (reader, writer) = connection.split();
-
-        // Channel for tracking packets that need to be acked
-        let (ack_queue_sender, ack_queue_receiever) =
-            mpsc::channel::<(i64, bool)>(WRITE_PACKET_BUFFER_LEN);
-        let (packet_sender, packet_receiver) = mpsc::channel::<Packet>(WRITE_PACKET_BUFFER_LEN);
-
-        let ack_listener_handle = Self::listen_for_acks(reader, ack_queue_receiever);
-        let packet_sender_handle = Self::start_packet_sender(writer, packet_receiver);
-        let heartbeat_handle = Self::start_heartbeat_sender(packet_sender.clone());
-
-        let bytes_per_checksum = server_defaults.bytes_per_checksum;
-        let write_packet_size = server_defaults.write_packet_size;
-
-        let bytes_left_in_chunk = server_defaults.bytes_per_checksum
-            - (block.b.num_bytes() % server_defaults.bytes_per_checksum as u64) as u32;
-        let current_packet = if append && bytes_left_in_chunk > 0 {
-            // When appending, we want to first send a packet with a single chunk of the data required
-            // to get the block to a multiple of bytes_per_checksum. After that, things work the same
-            // as create.
-            Packet::empty(block.b.num_bytes() as i64, 0, bytes_left_in_chunk, 0)
-        } else {
-            Packet::empty(
-                block.b.num_bytes() as i64,
-                0,
-                bytes_per_checksum,
-                write_packet_size,
-            )
-        };
-
-        let this = Self {
-            block,
-            block_size,
-            server_defaults,
-            next_seqno: 1,
-            current_packet,
-
-            ack_listener_handle,
-            packet_sender_handle,
-            heartbeat_handle,
-
-            ack_queue: ack_queue_sender,
-            packet_sender,
-        };
-
-        Ok(this)
+        Ok(Pipeline::new(connection))
     }
 
     // Create the next packet and return the current packet
-    fn create_next_packet(&mut self) -> Packet {
-        let next_packet = Packet::empty(
+    fn create_next_packet(&mut self) -> WritePacket {
+        let next_packet = WritePacket::empty(
             self.block.b.num_bytes() as i64,
-            self.next_seqno,
+            self.current_packet.header.seqno + 1,
             self.server_defaults.bytes_per_checksum,
             self.server_defaults.write_packet_size,
         );
-        self.next_seqno += 1;
 
         std::mem::replace(&mut self.current_packet, next_packet)
     }
 
-    async fn queue_ack(&self) -> Result<()> {
-        self.ack_queue
-            .send((
-                self.current_packet.header.seqno,
-                self.current_packet.header.last_packet_in_block,
-            ))
-            .await
-            .map_err(|_| HdfsError::DataTransferError("Failed to send to ack queue".to_string()))
-    }
-
     async fn send_current_packet(&mut self) -> Result<()> {
-        // Queue up the sequence number for acknowledgement
-        self.queue_ack().await?;
-
-        // Create a fresh packet
-        let current_packet = self.create_next_packet();
-
-        // Send the packet
-        // TODO: handle error
-        let _ = self.packet_sender.send(current_packet).await;
-
-        Ok(())
-    }
-
-    fn check_error(&mut self) -> Result<()> {
-        // If either task is finished, something went wrong
-        if self.ack_listener_handle.is_finished() {
-            return Err(HdfsError::DataTransferError(
-                "Ack listener finished prematurely".to_string(),
-            ));
-        }
-
-        if self.packet_sender_handle.is_finished() {
-            return Err(HdfsError::DataTransferError(
-                "Packet sender finished prematurely".to_string(),
-            ));
+        let mut current_packet = self.create_next_packet();
+
+        loop {
+            let pipeline = self.pipeline.take().ok_or(HdfsError::DataTransferError(
+                "Block writer is closed".to_string(),
+            ))?;
+
+            if let Err(packet) = pipeline.send_packet(current_packet).await {
+                // Shutdown the pipeline and try to recover
+                current_packet = packet;
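+                // The unsent packet comes back in the send error so it can be replayed
+                // on the rebuilt pipeline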
+                match pipeline.shutdown().await? {
+                    WriteStatus::Success => {
+                        return Err(HdfsError::DataTransferError(
+                            "Pipeline succeeded but failure was expected".to_string(),
+                        ))
+                    }
+                    WriteStatus::Recover(failed_nodes, packets_to_replay) => {
+                        self.recover(failed_nodes, packets_to_replay).await?;
+                    }
+                }
+            } else {
+                self.pipeline = Some(pipeline);
+                return Ok(());
+            }
+            // Send the packet
         }
     }
 
     fn is_full(&self) -> bool {
         self.block.b.num_bytes() == self.block_size as u64
     }
 
-    fn get_extended_block(&self) -> hdfs::ExtendedBlockProto {
-        self.block.b.clone()
-    }
-
     async fn write(&mut self, buf: &mut Bytes) -> Result<()> {
-        self.check_error()?;
-
         // Only write up to what's left in this block
         let bytes_to_write = usize::min(
             buf.len(),
@@ -285,9 +536,7 @@ impl ReplicatedBlockWriter {
     }
 
     /// Send a packet with any remaining data and then send a last packet
-    async fn close(mut self) -> Result<()> {
-        self.check_error()?;
-
+    async fn close(mut self) -> Result<hdfs::ExtendedBlockProto> {
         // Send a packet with any remaining data
         if !self.current_packet.is_empty() {
             self.send_current_packet().await?;
@@ -297,98 +546,22 @@ impl ReplicatedBlockWriter {
         self.current_packet.set_last_packet();
         self.send_current_packet().await?;
 
-        self.heartbeat_handle.abort();
-
-        // Wait for all packets to be sent
-        self.packet_sender_handle.await.map_err(|_| {
-            HdfsError::DataTransferError(
-                "Packet sender task err while waiting for packets to send".to_string(),
-            )
-        })??;
-
-        // Wait for the channel to close, meaning all acks have been received or an error occurred
-        self.ack_listener_handle.await.map_err(|_| {
-            HdfsError::DataTransferError(
-                "Ack status channel closed while waiting for final ack".to_string(),
-            )
-        })??;
-
-        Ok(())
-    }
-
-    fn listen_for_acks(
-        mut reader: DatanodeReader,
-        mut ack_queue: mpsc::Receiver<(i64, bool)>,
-    ) -> JoinHandle<Result<()>> {
-        tokio::spawn(async move {
-            loop {
-                let next_ack = reader.read_ack().await?;
-
-                for reply in next_ack.reply.iter() {
-                    if *reply != hdfs::Status::Success as i32 {
-                        return Err(HdfsError::DataTransferError(format!(
-                            "Received non-success status in datanode ack: {:?}",
-                            hdfs::Status::try_from(*reply)
-                        )));
-                    }
-                }
-
-                if next_ack.seqno == HEART_BEAT_SEQNO {
-                    continue;
-                }
-                if next_ack.seqno == UNKNOWN_SEQNO {
-                    return Err(HdfsError::DataTransferError(
-                        "Received unknown seqno for successful ack".to_string(),
-                    ));
-                }
-
-                if let Some((seqno, last_packet)) = ack_queue.recv().await {
-                    if next_ack.seqno != seqno {
-                        return Err(HdfsError::DataTransferError(
-                            "Received acknowledgement does not match expected sequence number"
-                                .to_string(),
-                        ));
-                    }
-
-                    if last_packet {
-                        return Ok(());
-                    }
-                } else {
-                    return Err(HdfsError::DataTransferError(
-                        "Channel closed while getting next seqno to acknowledge".to_string(),
-                    ));
+        loop {
+            match self
+                .pipeline
+                .take()
+                .ok_or(HdfsError::DataTransferError(
+                    "Block writer closed prematurely".to_string(),
+                ))?
+                .shutdown()
+                .await?
+            {
+                WriteStatus::Success => return Ok(self.block.b),
+                WriteStatus::Recover(failed_nodes, packets_to_replay) => {
+                    self.recover(failed_nodes, packets_to_replay).await?
+                }
+            }
-        })
-    }
-
-    fn start_packet_sender(
-        mut writer: DatanodeWriter,
-        mut packet_receiver: mpsc::Receiver<Packet>,
-    ) -> JoinHandle<Result<()>> {
-        tokio::spawn(async move {
-            while let Some(mut packet) = packet_receiver.recv().await {
-                writer.write_packet(&mut packet).await?;
-
-                if packet.header.last_packet_in_block {
-                    break;
-                }
-            }
-            Ok(())
-        })
-    }
-
-    fn start_heartbeat_sender(packet_sender: mpsc::Sender<Packet>) -> JoinHandle<()> {
-        tokio::spawn(async move {
-            loop {
-                tokio::time::sleep(Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS)).await;
-                let heartbeat_packet = Packet::empty(0, HEART_BEAT_SEQNO, 0, 0);
-                // If this fails, sending any more data packets will generate an error as well
-                if packet_sender.send(heartbeat_packet).await.is_err() {
-                    break;
-                }
-            }
-        })
+        }
     }
 }
 
@@ -538,7 +711,7 @@ impl StripedBlockWriter {
 
         *writer = Some(
             ReplicatedBlockWriter::new(
-                &self.protocol,
+                Arc::clone(&self.protocol),
                 cloned,
                 self.block_size,
                 self.server_defaults.clone(),
@@ -575,7 +748,7 @@ impl StripedBlockWriter {
         Ok(())
     }
 
-    async fn close(mut self) -> Result<()> {
+    async fn close(mut self) -> Result<hdfs::ExtendedBlockProto> {
         if !self.cell_buffer.is_empty() {
             self.write_cells().await?;
         }
@@ -590,7 +763,11 @@ impl StripedBlockWriter {
             close_result?;
         }
 
-        Ok(())
+        let mut extended_block = self.block.b;
+
+        extended_block.num_bytes = Some(self.bytes_written as u64);
+
+        Ok(extended_block)
     }
 
     fn is_full(&self) -> bool {
         self.block_writers
             .iter()
             .all(|writer| writer.as_ref().is_some_and(|w| w.is_full()))
     }
-
-    fn get_extended_block(&self) -> hdfs::ExtendedBlockProto {
-        let mut extended_block = self.block.b.clone();
-
-        extended_block.num_bytes = Some(self.bytes_written as u64);
-        extended_block
-    }
 }
diff --git a/rust/src/hdfs/connection.rs b/rust/src/hdfs/connection.rs
index 926381f..197357f 100644
--- a/rust/src/hdfs/connection.rs
+++ b/rust/src/hdfs/connection.rs
@@ -392,25 +392,59 @@ impl Op {
 
 const CHECKSUM_BYTES: usize = 4;
 
-pub(crate) struct Packet {
+pub(crate) struct ReadPacket {
     pub header: hdfs::PacketHeaderProto,
-    checksum: BytesMut,
-    data: BytesMut,
-    bytes_per_checksum: usize,
-    max_data_size: usize,
+    checksum: Bytes,
+    data: Bytes,
 }
 
-impl Packet {
-    fn new(header: hdfs::PacketHeaderProto, checksum: BytesMut, data: BytesMut) -> Self {
+impl ReadPacket {
+    fn new(header: hdfs::PacketHeaderProto, checksum: Bytes, data: Bytes) -> Self {
         Self {
             header,
             checksum,
             data,
-            bytes_per_checksum: 0,
-            max_data_size: 0,
         }
     }
 
+    pub(crate) fn get_data(
+        mut self,
+        checksum_info: &Option<hdfs::ReadOpChecksumInfoProto>,
+    ) -> Result<Bytes> {
+        // Verify the checksums if they were requested
+        if let Some(info) = checksum_info {
+            let algorithm = match info.checksum.r#type() {
+                hdfs::ChecksumTypeProto::ChecksumCrc32 => Some(&CRC32),
+                hdfs::ChecksumTypeProto::ChecksumCrc32c => Some(&CRC32C),
+                hdfs::ChecksumTypeProto::ChecksumNull => None,
+            };
+
+            if let Some(algorithm) = algorithm {
+                // Create a new Bytes view over the data that we can consume
+                let mut checksum_data = self.data.clone();
+                while !checksum_data.is_empty() {
+                    let chunk_checksum = algorithm.checksum(&checksum_data.split_to(usize::min(
+                        info.checksum.bytes_per_checksum as usize,
+                        checksum_data.len(),
+                    )));
+                    if chunk_checksum != self.checksum.get_u32() {
+                        return Err(HdfsError::ChecksumError);
+                    }
+                }
+            }
+        }
+        Ok(self.data)
+    }
+}
+
+pub(crate) struct WritePacket {
+    pub header: hdfs::PacketHeaderProto,
+    pub data: BytesMut,
+    bytes_per_checksum: usize,
+    max_data_size: usize,
+}
+
+impl WritePacket {
     pub(crate) fn empty(
         offset: i64,
         seqno: i64,
@@ -427,7 +461,6 @@ impl Packet {
 
         Self {
             header,
-            checksum: BytesMut::with_capacity(num_chunks * CHECKSUM_BYTES),
             data: BytesMut::with_capacity(num_chunks * bytes_per_checksum as usize),
             bytes_per_checksum: bytes_per_checksum as usize,
             max_data_size: num_chunks * bytes_per_checksum as usize,
@@ -454,6 +487,7 @@ impl Packet {
     pub(crate) fn write(&mut self, buf: &mut Bytes) {
         self.data
             .put(buf.split_to(usize::min(self.max_data_size - self.data.len(), buf.len())));
+        self.header.data_len = self.data.len() as i32;
     }
 
     pub(crate) fn is_full(&self) -> bool {
@@ -464,53 +498,22 @@ impl Packet {
         self.data.is_empty()
     }
 
-    fn finalize(&mut self) -> (hdfs::PacketHeaderProto, Bytes, Bytes) {
-        let data = self.data.split().freeze();
+    fn calculate_checksum(&mut self) -> Bytes {
+        if self.data.is_empty() || self.bytes_per_checksum == 0 {
+            return Bytes::new();
+        }
+
+        let mut checksum = BytesMut::with_capacity(self.data.len() / self.bytes_per_checksum);
 
         let mut chunk_start = 0;
-        while chunk_start < data.len() {
-            let chunk_end = usize::min(chunk_start + self.bytes_per_checksum, data.len());
-            let chunk_checksum = CRC32C.checksum(&data[chunk_start..chunk_end]);
-            self.checksum.put_u32(chunk_checksum);
+        while chunk_start < self.data.len() {
+            let chunk_end = usize::min(chunk_start + self.bytes_per_checksum, self.data.len());
+            let chunk_checksum = CRC32C.checksum(&self.data[chunk_start..chunk_end]);
+            checksum.put_u32(chunk_checksum);
             chunk_start += self.bytes_per_checksum;
         }
 
-        let checksum = self.checksum.split().freeze();
-
-        self.header.data_len = data.len() as i32;
-
-        (self.header, checksum, data)
-    }
-
-    pub(crate) fn get_data(
-        self,
-        checksum_info: &Option<hdfs::ReadOpChecksumInfoProto>,
-    ) -> Result<Bytes> {
-        // Verify the checksums if they were requested
-        let mut checksums = self.checksum.freeze();
-        let data = self.data.freeze();
-        if let Some(info) = checksum_info {
-            let algorithm = match info.checksum.r#type() {
-                hdfs::ChecksumTypeProto::ChecksumCrc32 => Some(&CRC32),
-                hdfs::ChecksumTypeProto::ChecksumCrc32c => Some(&CRC32C),
-                hdfs::ChecksumTypeProto::ChecksumNull => None,
-            };
-
-            if let Some(algorithm) = algorithm {
-                // Create a new Bytes view over the data that we can consume
-                let mut checksum_data = data.clone();
-                while !checksum_data.is_empty() {
-                    let chunk_checksum = algorithm.checksum(&checksum_data.split_to(usize::min(
-                        info.checksum.bytes_per_checksum as usize,
-                        checksum_data.len(),
-                    )));
-                    if chunk_checksum != checksums.get_u32() {
-                        return Err(HdfsError::ChecksumError);
-                    }
-                }
-            }
-        }
-        Ok(data)
+        checksum.freeze()
     }
 }
 
@@ -581,7 +584,7 @@ impl DatanodeConnection {
         }
     }
 
-    pub(crate) async fn read_packet(&mut self) -> Result<Packet> {
+    pub(crate) async fn read_packet(&mut self) -> Result<ReadPacket> {
         let mut payload_len_buf = [0u8; 4];
         let mut header_len_buf = [0u8; 2];
         self.reader.read_exact(&mut payload_len_buf).await?;
@@ -597,10 +600,10 @@ impl DatanodeConnection {
             hdfs::PacketHeaderProto::decode(remaining_buf.split_to(header_length).freeze())?;
 
         let checksum_length = payload_length - 4 - header.data_len as usize;
-        let checksum = remaining_buf.split_to(checksum_length);
-        let data = remaining_buf;
+        let checksum = remaining_buf.split_to(checksum_length).freeze();
+        let data = remaining_buf.freeze();
 
-        Ok(Packet::new(header, checksum, data))
+        Ok(ReadPacket::new(header, checksum, data))
     }
 
     pub(crate) async fn send_read_success(&mut self) -> Result<()> {
@@ -649,19 +652,19 @@ pub(crate) struct DatanodeWriter {
 
 impl DatanodeWriter {
     /// Create a buffer to send to the datanode
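+    /// Writes the 4-byte payload length, 2-byte header length, encoded header,
+    /// chunk checksums, and packet data to the socket, then flushes it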
-    pub(crate) async fn write_packet(&mut self, packet: &mut Packet) -> Result<()> {
-        let (header, checksum, data) = packet.finalize();
+    pub(crate) async fn write_packet(&mut self, packet: &mut WritePacket) -> Result<()> {
+        let checksum = packet.calculate_checksum();
 
-        let payload_len = (checksum.len() + data.len() + 4) as u32;
-        let header_encoded = header.encode_to_vec();
+        let payload_len = (checksum.len() + packet.data.len() + 4) as u32;
+        let header_encoded = packet.header.encode_to_vec();
 
         self.writer.write_all(&payload_len.to_be_bytes()).await?;
         self.writer
             .write_all(&(header_encoded.len() as u16).to_be_bytes())
             .await?;
-        self.writer.write_all(&header.encode_to_vec()).await?;
+        self.writer.write_all(&header_encoded).await?;
         self.writer.write_all(&checksum).await?;
-        self.writer.write_all(&data).await?;
+        self.writer.write_all(&packet.data).await?;
         self.writer.flush().await?;
 
         Ok(())
diff --git a/rust/src/hdfs/protocol.rs b/rust/src/hdfs/protocol.rs
index a7f371c..e665c85 100644
--- a/rust/src/hdfs/protocol.rs
+++ b/rust/src/hdfs/protocol.rs
@@ -217,7 +217,6 @@ impl NamenodeProtocol {
         self.call("addBlock", message, true).await
     }
 
-    #[allow(dead_code)]
     pub(crate) async fn update_block_for_pipeline(
         &self,
         block: hdfs::ExtendedBlockProto,
@@ -230,6 +229,24 @@ impl NamenodeProtocol {
         self.call("updateBlockForPipeline", message, true).await
     }
 
+    pub(crate) async fn update_pipeline(
+        &self,
+        old_block: hdfs::ExtendedBlockProto,
+        new_block: hdfs::ExtendedBlockProto,
+        new_nodes: Vec<hdfs::DatanodeIdProto>,
+        storage_i_ds: Vec<String>,
+    ) -> Result<hdfs::UpdatePipelineResponseProto> {
+        let message = hdfs::UpdatePipelineRequestProto {
+            client_name: self.client_name.clone(),
+            old_block,
+            new_block,
+            new_nodes,
+            storage_i_ds,
+        };
+
+        self.call("updatePipeline", message, true).await
+    }
+
     pub(crate) async fn complete(
         &self,
         src: &str,
diff --git a/rust/src/test.rs b/rust/src/test.rs
index 0ee25fc..7179f12 100644
--- a/rust/src/test.rs
+++ b/rust/src/test.rs
@@ -1,6 +1,8 @@
-use std::sync::Mutex;
+use std::sync::{atomic::AtomicBool, Mutex};
 
 pub static EC_FAULT_INJECTOR: Mutex<Option<EcFaultInjection>> = Mutex::new(None);
+pub static WRITE_CONNECTION_FAULT_INJECTOR: AtomicBool = AtomicBool::new(false);
+pub static WRITE_REPLY_FAULT_INJECTOR: Mutex<Option<usize>> = Mutex::new(None);
 
 pub struct EcFaultInjection {
     pub fail_blocks: Vec<usize>,
diff --git a/rust/tests/test_write_resiliency.rs b/rust/tests/test_write_resiliency.rs
index 03ac782..ee1f54f 100644
--- a/rust/tests/test_write_resiliency.rs
+++ b/rust/tests/test_write_resiliency.rs
@@ -1,18 +1,20 @@
 #[cfg(feature = "integration-test")]
 mod test {
-    use std::{collections::HashSet, time::Duration};
+    use std::{collections::HashSet, sync::atomic::Ordering, time::Duration};
 
-    use bytes::Bytes;
+    use bytes::{Buf, BufMut, Bytes, BytesMut};
     use hdfs_native::{
+        file::FileReader,
         minidfs::{DfsFeatures, MiniDfs},
+        test::{WRITE_CONNECTION_FAULT_INJECTOR, WRITE_REPLY_FAULT_INJECTOR},
         Client, Result, WriteOptions,
     };
     use serial_test::serial;
 
     #[tokio::test]
     #[serial]
-    async fn test_write_resiliency() -> Result<()> {
+    async fn test_lease_renewal() -> Result<()> {
         let _ = env_logger::builder().is_test(true).try_init();
 
         let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA]));
@@ -44,4 +46,111 @@ mod test {
 
         Ok(())
     }
+
+    #[tokio::test]
+    #[serial]
+    async fn test_write_failures_basic() -> Result<()> {
+        let _ = env_logger::builder().is_test(true).try_init();
+
+        let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA]));
+        test_write_failures().await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[serial]
+    async fn test_write_failures_security() -> Result<()> {
+        let _ = env_logger::builder().is_test(true).try_init();
+
+        let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA, DfsFeatures::Security]));
+        test_write_failures().await?;
+        Ok(())
+    }
+
+    async fn test_write_failures() -> Result<()> {
+        let client = Client::default();
+
+        let file = "/testfile";
+        let bytes_to_write = 2usize * 1024 * 1024;
+
+        let mut data = BytesMut::with_capacity(bytes_to_write);
+        for i in 0..(bytes_to_write / 4) {
+            data.put_i32(i as i32);
+        }
+
+        // Test connection failure before writing data
+        let mut writer = client
+            .create(file, WriteOptions::default().replication(3))
+            .await?;
+
+        WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);
+
+        let data = data.freeze();
+        writer.write(data.clone()).await?;
+        writer.close().await?;
+
+        let reader = client.read("/testfile").await?;
+        check_file_content(&reader, data.clone()).await?;
+
+        // Test connection failure after data has been written
+        let mut writer = client
+            .create(file, WriteOptions::default().replication(3).overwrite(true))
+            .await?;
+
+        writer.write(data.slice(..bytes_to_write / 2)).await?;
+
+        // Give a little time for the packets to send
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);
+
+        writer.write(data.slice(bytes_to_write / 2..)).await?;
+        writer.close().await?;
+
+        let reader = client.read("/testfile").await?;
+        check_file_content(&reader, data.clone()).await?;
+
+        // Test failure from the ack status before any data is written
+        let mut writer = client
+            .create(file, WriteOptions::default().replication(3).overwrite(true))
+            .await?;
+
+        *WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2);
+
+        writer.write(data.clone()).await?;
+        writer.close().await?;
+
+        let reader = client.read("/testfile").await?;
+        check_file_content(&reader, data.clone()).await?;
+
+        // Test failure from the ack status after some data has been written
+        let mut writer = client
+            .create(file, WriteOptions::default().replication(3).overwrite(true))
+            .await?;
+
+        writer.write(data.slice(..bytes_to_write / 2)).await?;
+
+        // Give a little time for the packets to send
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        *WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2);
+
+        writer.write(data.slice(bytes_to_write / 2..)).await?;
+        writer.close().await?;
+
+        let reader = client.read("/testfile").await?;
+        check_file_content(&reader, data.clone()).await?;
+
+        Ok(())
+    }
+
+    async fn check_file_content(reader: &FileReader, mut expected: Bytes) -> Result<()> {
+        assert_eq!(reader.file_length(), expected.len());
+
+        let mut file_data = reader.read_range(0, reader.file_length()).await?;
+        for _ in 0..expected.len() / 4 {
+            assert_eq!(file_data.get_i32(), expected.get_i32());
+        }
+        Ok(())
+    }
 }
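
For illustration, a minimal sketch (not part of the patch) of the client-side write path this change hardens. It uses only the public API exercised by the tests above; the path, the data size, and the use of Client::default() (which assumes a reachable NameNode configured in the environment, as in the MiniDFS test setup) are illustrative assumptions.

use bytes::Bytes;
use hdfs_native::{Client, Result, WriteOptions};

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::default();

    // Request replication 3 so the write pipeline spans three DataNodes
    let mut writer = client
        .create("/example", WriteOptions::default().replication(3))
        .await?;

    // If a DataNode fails mid-write, the client now rebuilds the pipeline from the
    // remaining nodes, bumps the block's generation stamp, and replays any packets
    // that were never acknowledged before continuing the write
    writer.write(Bytes::from(vec![42u8; 1024 * 1024])).await?;

    // close() sends the final packet and returns once every packet is acknowledged
    writer.close().await?;

    Ok(())
}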