Skip to content

Commit

Permalink
Abort heartbeat when closing
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman committed Jan 14, 2024
1 parent 6c8107a commit 2960f9c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
17 changes: 14 additions & 3 deletions crates/hdfs-native/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::{stream, Stream, StreamExt};
use log::warn;

use crate::ec::{resolve_ec_policy, EcSchema};
use crate::hdfs::block_reader::get_block_stream;
Expand Down Expand Up @@ -272,9 +273,19 @@ impl FileWriter {
retry_delay *= 2;
retries += 1;
}
Err(HdfsError::OperationFailed(
"Failed to complete file in time".to_string(),
))
} else {
Ok(())
}
}
}

impl Drop for FileWriter {
fn drop(&mut self) {
if !self.closed {
warn!("FileWriter dropped without being closed. File content may not have saved or may not be complete");
}
Err(HdfsError::OperationFailed(
"Failed to complete file in time".to_string(),
))
}
}
11 changes: 8 additions & 3 deletions crates/hdfs-native/src/hdfs/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ pub(crate) struct ReplicatedBlockWriter {
ack_listener_handle: JoinHandle<Result<()>>,
// Tracks the state of packet sender. Set to Err if any error occurs during writing packets,
packet_sender_handle: JoinHandle<Result<DatanodeWriter>>,
// Tracks the heartbeat task so we can abort it when we close
heartbeat_handle: JoinHandle<()>,

ack_queue: mpsc::Sender<(i64, bool)>,
packet_sender: mpsc::Sender<Packet>,
Expand Down Expand Up @@ -147,7 +149,7 @@ impl ReplicatedBlockWriter {

let ack_listener_handle = Self::listen_for_acks(reader, ack_queue_receiever);
let packet_sender_handle = Self::start_packet_sender(writer, packet_receiver);
Self::start_heartbeat_sender(packet_sender.clone());
let heartbeat_handle = Self::start_heartbeat_sender(packet_sender.clone());

let bytes_per_checksum = server_defaults.bytes_per_checksum;
let write_packet_size = server_defaults.write_packet_size;
Expand Down Expand Up @@ -177,6 +179,7 @@ impl ReplicatedBlockWriter {

ack_listener_handle,
packet_sender_handle,
heartbeat_handle,

ack_queue: ack_queue_sender,
packet_sender,
Expand Down Expand Up @@ -284,6 +287,8 @@ impl ReplicatedBlockWriter {
self.current_packet.set_last_packet();
self.send_current_packet().await?;

self.heartbeat_handle.abort();

// Wait for all packets to be sent
self.packet_sender_handle.await.map_err(|_| {
HdfsError::DataTransferError(
Expand Down Expand Up @@ -361,7 +366,7 @@ impl ReplicatedBlockWriter {
})
}

fn start_heartbeat_sender(packet_sender: mpsc::Sender<Packet>) {
fn start_heartbeat_sender(packet_sender: mpsc::Sender<Packet>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS)).await;
Expand All @@ -371,7 +376,7 @@ impl ReplicatedBlockWriter {
break;
}
}
});
})
}
}

Expand Down

0 comments on commit 2960f9c

Please sign in to comment.