From 2960f9c429791359aabbf9af1a24d5becaa6600c Mon Sep 17 00:00:00 2001
From: Adam Binford <adamq43@gmail.com>
Date: Sun, 14 Jan 2024 16:29:38 -0500
Subject: [PATCH] Abort heartbeat when closing

---
 crates/hdfs-native/src/file.rs              | 17 ++++++++++++++---
 crates/hdfs-native/src/hdfs/block_writer.rs | 11 ++++++++---
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/crates/hdfs-native/src/file.rs b/crates/hdfs-native/src/file.rs
index 7df9071..0fda17f 100644
--- a/crates/hdfs-native/src/file.rs
+++ b/crates/hdfs-native/src/file.rs
@@ -4,6 +4,7 @@ use std::time::Duration;
 use bytes::{BufMut, Bytes, BytesMut};
 use futures::stream::BoxStream;
 use futures::{stream, Stream, StreamExt};
+use log::warn;
 
 use crate::ec::{resolve_ec_policy, EcSchema};
 use crate::hdfs::block_reader::get_block_stream;
@@ -272,9 +273,19 @@ impl FileWriter {
                 retry_delay *= 2;
                 retries += 1;
             }
+            Err(HdfsError::OperationFailed(
+                "Failed to complete file in time".to_string(),
+            ))
+        } else {
+            Ok(())
+        }
+    }
+}
+
+impl Drop for FileWriter {
+    fn drop(&mut self) {
+        if !self.closed {
+            warn!("FileWriter dropped without being closed. File content may not have saved or may not be complete");
         }
-        Err(HdfsError::OperationFailed(
-            "Failed to complete file in time".to_string(),
-        ))
     }
 }
diff --git a/crates/hdfs-native/src/hdfs/block_writer.rs b/crates/hdfs-native/src/hdfs/block_writer.rs
index 7c030a7..5205d49 100644
--- a/crates/hdfs-native/src/hdfs/block_writer.rs
+++ b/crates/hdfs-native/src/hdfs/block_writer.rs
@@ -88,6 +88,8 @@ pub(crate) struct ReplicatedBlockWriter {
     ack_listener_handle: JoinHandle<Result<()>>,
     // Tracks the state of packet sender. Set to Err if any error occurs during writing packets,
     packet_sender_handle: JoinHandle<Result<DatanodeWriter>>,
+    // Tracks the heartbeat task so we can abort it when we close
+    heartbeat_handle: JoinHandle<()>,
 
     ack_queue: mpsc::Sender<(i64, bool)>,
     packet_sender: mpsc::Sender<Packet>,
@@ -147,7 +149,7 @@ impl ReplicatedBlockWriter {
 
         let ack_listener_handle = Self::listen_for_acks(reader, ack_queue_receiever);
         let packet_sender_handle = Self::start_packet_sender(writer, packet_receiver);
-        Self::start_heartbeat_sender(packet_sender.clone());
+        let heartbeat_handle = Self::start_heartbeat_sender(packet_sender.clone());
 
         let bytes_per_checksum = server_defaults.bytes_per_checksum;
         let write_packet_size = server_defaults.write_packet_size;
@@ -177,6 +179,7 @@ impl ReplicatedBlockWriter {
 
             ack_listener_handle,
             packet_sender_handle,
+            heartbeat_handle,
 
             ack_queue: ack_queue_sender,
             packet_sender,
@@ -284,6 +287,8 @@ impl ReplicatedBlockWriter {
         self.current_packet.set_last_packet();
         self.send_current_packet().await?;
 
+        self.heartbeat_handle.abort();
+
         // Wait for all packets to be sent
         self.packet_sender_handle.await.map_err(|_| {
             HdfsError::DataTransferError(
@@ -361,7 +366,7 @@ impl ReplicatedBlockWriter {
         })
     }
 
-    fn start_heartbeat_sender(packet_sender: mpsc::Sender<Packet>) {
+    fn start_heartbeat_sender(packet_sender: mpsc::Sender<Packet>) -> JoinHandle<()> {
         tokio::spawn(async move {
             loop {
                 tokio::time::sleep(Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS)).await;
@@ -371,7 +376,7 @@ impl ReplicatedBlockWriter {
                     break;
                 }
             }
-        });
+        })
     }
 }