Support DataNode connection reuse (#80)
Kimahriman authored Feb 22, 2024
1 parent 4b16e3a commit cf2c49e
Showing 8 changed files with 183 additions and 72 deletions.
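This change routes DataNode block reads through a process-wide connection cache, so repeated reads against the same DataNode can reuse an established socket instead of paying connection setup per block operation. The block reader below depends on a `DATANODE_CACHE` exported from `hdfs/connection.rs` (one of the files whose diff is not rendered here); only its `get(url)` and `release(conn)` signatures are visible at the call sites. A minimal sketch of what such a cache might look like — the two method signatures are taken from the call sites, the pool internals are assumptions:

// Hypothetical sketch; the real DATANODE_CACHE lives in
// crates/hdfs-native/src/hdfs/connection.rs and is not shown in this diff.
// Only the get/release signatures are taken from the call sites below.
use std::collections::HashMap;
use std::sync::Mutex;

use once_cell::sync::Lazy; // once_cell is added as a dependency below

pub(crate) struct DatanodeConnection {
    url: String, // the "ip:xfer_port" address this connection was dialed to
    // ... socket and protocol state elided ...
}

#[derive(Default)]
pub(crate) struct DatanodeConnectionCache {
    // Idle connections, pooled per DataNode transfer address.
    pools: Mutex<HashMap<String, Vec<DatanodeConnection>>>,
}

impl DatanodeConnectionCache {
    /// Pop an idle connection for this address, if one is cached.
    pub(crate) fn get(&self, url: &str) -> Option<DatanodeConnection> {
        self.pools.lock().unwrap().get_mut(url)?.pop()
    }

    /// Return a connection that finished its read cleanly for later reuse.
    pub(crate) fn release(&self, conn: DatanodeConnection) {
        let key = conn.url.clone();
        self.pools.lock().unwrap().entry(key).or_default().push(conn);
    }
}

pub(crate) static DATANODE_CACHE: Lazy<DatanodeConnectionCache> =
    Lazy::new(DatanodeConnectionCache::default);

Note that `connect_and_send` in block_reader.rs tries at most two cached connections before dialing fresh: a pooled socket may have been closed by the DataNode since it was released, in which case the failed attempt is logged with a warning and discarded rather than surfaced to the caller.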
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
@@ -7,5 +7,9 @@ resolver = "2"

[workspace.dependencies]
bytes = "1"
chrono = "0.4"
futures = "0.3"
tokio = "1"
tokio = "1"

[profile.bench]
debug = true
2 changes: 1 addition & 1 deletion crates/hdfs-native-object-store/Cargo.toml
@@ -13,7 +13,7 @@ license = "Apache-2.0"
[dependencies]
async-trait = { version = "0.1" }
bytes = { workspace = true }
chrono = { version = "0.4" }
chrono = { workspace = true }
futures = { workspace = true }
hdfs-native = { path = "../hdfs-native", version = "0.7" }
object_store = { version = "0.9", features = ["cloud"] }
4 changes: 3 additions & 1 deletion crates/hdfs-native/Cargo.toml
@@ -13,14 +13,16 @@ license = "Apache-2.0"
[dependencies]
base64 = "0.21"
bytes = { workspace = true }
chrono = { workspace = true }
crc = "3.1.0-beta.1"
futures = { workspace = true }
g2p = "1"
gsasl-sys = { version = "0.2", default-features = false, optional = true }
libc = "0.2"
libgssapi = { version = "0.6", default-features = false, optional = true }
libgssapi = { version = "0.7", default-features = false, optional = true }
log = "0.4"
num-traits = "0.2"
once_cell = "1.19.0"
prost = "0.12"
prost-types = "0.12"
roxmltree = "0.18"
126 changes: 87 additions & 39 deletions crates/hdfs-native/src/hdfs/block_reader.rs
@@ -6,12 +6,15 @@ use futures::{
stream::{self, BoxStream},
Stream, StreamExt,
};
use log::debug;
use log::{debug, warn};

use crate::{
ec::EcSchema,
hdfs::connection::{DatanodeConnection, Op},
proto::{common, hdfs},
hdfs::connection::{DatanodeConnection, Op, DATANODE_CACHE},
proto::{
common,
hdfs::{self, BlockOpResponseProto},
},
HdfsError, Result,
};

@@ -32,6 +35,55 @@ pub(crate) fn get_block_stream(
}
}

/// Connects to a DataNode to do a read, attempting to use cached connections.
async fn connect_and_send(
url: &str,
block: &hdfs::ExtendedBlockProto,
token: common::TokenProto,
offset: u64,
len: u64,
) -> Result<(DatanodeConnection, BlockOpResponseProto)> {
let mut remaining_attempts = 2;
while remaining_attempts > 0 {
if let Some(mut conn) = DATANODE_CACHE.get(url) {
let message = hdfs::OpReadBlockProto {
header: conn.build_header(block, Some(token.clone())),
offset,
len,
send_checksums: Some(true),
..Default::default()
};
debug!("Block read op request {:?}", &message);
match conn.send(Op::ReadBlock, &message).await {
Ok(response) => {
debug!("Block read op response {:?}", response);
return Ok((conn, response));
}
Err(e) => {
warn!("Failed to use cached connection: {:?}", e);
}
}
} else {
break;
}
remaining_attempts -= 1;
}
let mut conn = DatanodeConnection::connect(url).await?;

let message = hdfs::OpReadBlockProto {
header: conn.build_header(block, Some(token)),
offset,
len,
send_checksums: Some(true),
..Default::default()
};

debug!("Block read op request {:?}", &message);
let response = conn.send(Op::ReadBlock, &message).await?;
debug!("Block read op response {:?}", response);
Ok((conn, response))
}

struct ReplicatedBlockStream {
block: hdfs::LocatedBlockProto,
offset: usize,
@@ -63,24 +115,18 @@ impl ReplicatedBlockStream {
));
}
}
let datanode = &self.block.locs[self.current_replica].id;
let mut connection =
DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
.await?;

let message = hdfs::OpReadBlockProto {
header: connection.build_header(&self.block.b, Some(self.block.block_token.clone())),
offset: self.offset as u64,
len: self.len as u64,
send_checksums: Some(true),
..Default::default()
};

debug!("Block read op request {:?}", &message);
let datanode = &self.block.locs[self.current_replica].id;
let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);

connection.send(Op::ReadBlock, &message).await?;
let response = connection.read_block_op_response().await?;
debug!("Block read op response {:?}", response);
let (connection, response) = connect_and_send(
&datanode_url,
&self.block.b,
self.block.block_token.clone(),
self.offset as u64,
self.len as u64,
)
.await?;

if response.status() != hdfs::Status::Success {
return Err(HdfsError::DataTransferError(response.message().to_string()));
@@ -96,13 +142,20 @@
if self.connection.is_none() {
self.select_next_datanode().await?;
}
let conn = self.connection.as_mut().unwrap();

if self.len == 0 {
let mut conn = self.connection.take().unwrap();

// Read the final empty packet
conn.read_packet().await?;

conn.send_read_success().await?;
DATANODE_CACHE.release(conn);
return Ok(None);
}

let conn = self.connection.as_mut().unwrap();

let packet = conn.read_packet().await?;

let packet_offset = if self.offset > packet.header.offset_in_block as usize {
@@ -336,29 +389,22 @@ impl StripedBlockStream {
return Ok(());
}

let mut conn =
DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
.await?;

let message = hdfs::OpReadBlockProto {
header: conn.build_header(block, Some(token.clone())),
offset: offset as u64,
len: len as u64,
send_checksums: Some(true),
..Default::default()
};
debug!("Block read op request {:?}", &message);

conn.send(Op::ReadBlock, &message).await?;
let response = conn.read_block_op_response().await?;
debug!("Block read op response {:?}", response);
let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
let (mut connection, response) = connect_and_send(
&datanode_url,
block,
token.clone(),
offset as u64,
len as u64,
)
.await?;

if response.status() != hdfs::Status::Success {
return Err(HdfsError::DataTransferError(response.message().to_string()));
}

// First handle the offset into the first packet
let mut packet = conn.read_packet().await?;
let mut packet = connection.read_packet().await?;
let packet_offset = offset - packet.header.offset_in_block as usize;
let data_len = packet.header.data_len as usize - packet_offset;
let data_to_read = usize::min(data_len, len);
@@ -368,7 +414,7 @@
buf.put(packet_data.slice(packet_offset..(packet_offset + data_to_read)));

while data_left > 0 {
packet = conn.read_packet().await?;
packet = connection.read_packet().await?;
// TODO: Error checking
let data_to_read = usize::min(data_left, packet.header.data_len as usize);
buf.put(
@@ -380,7 +426,9 @@
}

// There should be one last empty packet after we are done
conn.read_packet().await?;
connection.read_packet().await?;
connection.send_read_success().await?;
DATANODE_CACHE.release(connection);

Ok(())
}
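Both read paths above now end the same way before recycling a socket: drain the trailing empty packet the DataNode sends after the data, acknowledge the read with `send_read_success`, and only then hand the connection back to the cache. A connection that fails mid-read is simply dropped rather than released. Gathered into a hypothetical helper for illustration (the helper name is invented; the individual calls are the ones the diff makes at both call sites):

// Illustrative only; the diff inlines this sequence in both read paths.
async fn finish_and_recycle(mut conn: DatanodeConnection) -> Result<()> {
    // Consume the final empty packet so the stream sits at a clean
    // protocol boundary.
    conn.read_packet().await?;
    // Acknowledge the read back to the DataNode.
    conn.send_read_success().await?;
    // Only a cleanly drained, acknowledged connection is safe to reuse.
    DATANODE_CACHE.release(conn);
    Ok(())
}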
18 changes: 8 additions & 10 deletions crates/hdfs-native/src/hdfs/block_writer.rs
@@ -7,13 +7,11 @@ use tokio::{sync::mpsc, task::JoinHandle};

use crate::{
ec::{gf256::Coder, EcSchema},
hdfs::connection::{DatanodeConnection, Op},
hdfs::connection::{DatanodeConnection, DatanodeReader, DatanodeWriter, Op, Packet},
proto::hdfs,
HdfsError, Result,
};

use super::connection::{DatanodeReader, DatanodeWriter, Packet};

const HEART_BEAT_SEQNO: i64 = -1;
const UNKNOWN_SEQNO: i64 = -2;

@@ -87,7 +85,7 @@ pub(crate) struct ReplicatedBlockWriter {
// acknowledgements. Set to Ok(()) when the last acknowledgement is received.
ack_listener_handle: JoinHandle<Result<()>>,
// Tracks the state of packet sender. Set to Err if any error occurs during writing packets,
packet_sender_handle: JoinHandle<Result<DatanodeWriter>>,
packet_sender_handle: JoinHandle<Result<()>>,
// Tracks the heartbeat task so we can abort it when we close
heartbeat_handle: JoinHandle<()>,

@@ -136,9 +134,7 @@
};

debug!("Block write request: {:?}", &message);

connection.send(Op::WriteBlock, &message).await?;
let response = connection.read_block_op_response().await?;
let response = connection.send(Op::WriteBlock, &message).await?;
debug!("Block write response: {:?}", response);

let (reader, writer) = connection.split();
@@ -301,7 +297,9 @@ impl ReplicatedBlockWriter {
HdfsError::DataTransferError(
"Ack status channel closed while waiting for final ack".to_string(),
)
})?
})??;

Ok(())
}

fn listen_for_acks(
@@ -353,7 +351,7 @@ impl ReplicatedBlockWriter {
fn start_packet_sender(
mut writer: DatanodeWriter,
mut packet_receiver: mpsc::Receiver<Packet>,
) -> JoinHandle<Result<DatanodeWriter>> {
) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
while let Some(mut packet) = packet_receiver.recv().await {
writer.write_packet(&mut packet).await?;
@@ -362,7 +360,7 @@
break;
}
}
Ok(writer)
Ok(())
})
}
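One subtlety in the writer changes: awaiting the ack listener's `JoinHandle` produces two nested `Result`s — the outer from tokio's join (a panicked or cancelled task, mapped into an `HdfsError` above), the inner from the task body itself — which is why the wait for the final ack now ends in `})??;`. A standalone illustration of the pattern, assuming nothing beyond tokio:

use tokio::task::JoinHandle;

async fn wait_for_task() -> Result<(), Box<dyn std::error::Error>> {
    // The spawned task returns its own Result, so awaiting the handle
    // yields Result<Result<(), std::io::Error>, JoinError>.
    let handle: JoinHandle<Result<(), std::io::Error>> =
        tokio::spawn(async { Ok(()) });
    // The first `?` surfaces a join failure; the second surfaces the
    // error the task itself returned.
    handle.await??;
    Ok(())
}

The packet sender's switch from `Result<DatanodeWriter>` to `Result<()>` fits the same cleanup: nothing downstream recovers the writer once the task completes, so there is no longer a reason to return it.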

(Diffs for the remaining changed files are not rendered.)
