Add support for datanode sasl negotiation (#81)
* Add support for datanode sasl negotiation
Kimahriman authored Mar 21, 2024
1 parent cf2c49e commit 8e28d8e
Showing 13 changed files with 380 additions and 86 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -25,7 +25,7 @@ Here is a list of currently supported and unsupported but possible future features
- [x] Kerberos authentication (GSSAPI SASL support)
- [x] Token authentication (DIGEST-MD5 SASL support, no encryption support)
- [x] NameNode SASL connection
-- [ ] DataNode SASL connection
+- [x] DataNode SASL connection
- [ ] DataNode data transfer encryption
- [ ] Encryption at rest (KMS support)

2 changes: 1 addition & 1 deletion crates/hdfs-native/Cargo.toml
@@ -22,7 +22,7 @@ libc = "0.2"
libgssapi = { version = "0.7", default-features = false, optional = true }
log = "0.4"
num-traits = "0.2"
once_cell = "1.19.0"
once_cell = "1"
prost = "0.12"
prost-types = "0.12"
roxmltree = "0.18"
9 changes: 6 additions & 3 deletions crates/hdfs-native/minidfs/src/main/java/main/Main.java
@@ -41,6 +41,9 @@ public static void main(String args[]) throws Exception {
}
MiniKdc kdc = null;

+// If a token file already exists, make sure to delete it
+new File("target/test/delegation_token").delete();
+
Configuration conf = new Configuration();
if (flags.contains("security")) {
kdc = new MiniKdc(MiniKdc.createConf(), new File("target/test/kdc"));
@@ -60,8 +63,10 @@ public static void main(String args[]) throws Exception {
conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, "target/test/hdfs.keytab");
conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "hdfs/localhost@" + kdc.getRealm());
conf.set(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, "true");
-// conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
conf.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true");
+if (flags.contains("data_transfer_security")) {
+    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+}
}

HdfsConfiguration hdfsConf = new HdfsConfiguration(conf);
@@ -155,8 +160,6 @@ public static void main(String args[]) throws Exception {
DataOutputStream os = new DataOutputStream(new FileOutputStream("target/test/delegation_token"));
creds.writeTokenStorageToStream(os, SerializedFormat.WRITABLE);
os.close();
-} else {
-new File("target/test/delegation_token").delete();
}
}

20 changes: 6 additions & 14 deletions crates/hdfs-native/src/hdfs/block_reader.rs
@@ -37,15 +37,15 @@ pub(crate) fn get_block_stream(

/// Connects to a DataNode to do a read, attempting to use cached connections.
async fn connect_and_send(
-url: &str,
+datanode_id: &hdfs::DatanodeIdProto,
block: &hdfs::ExtendedBlockProto,
token: common::TokenProto,
offset: u64,
len: u64,
) -> Result<(DatanodeConnection, BlockOpResponseProto)> {
let mut remaining_attempts = 2;
while remaining_attempts > 0 {
-if let Some(mut conn) = DATANODE_CACHE.get(url) {
+if let Some(mut conn) = DATANODE_CACHE.get(datanode_id) {
let message = hdfs::OpReadBlockProto {
header: conn.build_header(block, Some(token.clone())),
offset,
@@ -68,7 +68,7 @@ async fn connect_and_send(
}
remaining_attempts -= 1;
}
-let mut conn = DatanodeConnection::connect(url).await?;
+let mut conn = DatanodeConnection::connect(datanode_id, &token).await?;

let message = hdfs::OpReadBlockProto {
header: conn.build_header(block, Some(token)),
@@ -117,10 +117,9 @@ impl ReplicatedBlockStream {
}

let datanode = &self.block.locs[self.current_replica].id;
-let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);

let (connection, response) = connect_and_send(
-&datanode_url,
+datanode,
&self.block.b,
self.block.block_token.clone(),
self.offset as u64,
@@ -389,15 +388,8 @@ impl StripedBlockStream {
return Ok(());
}

-let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
-let (mut connection, response) = connect_and_send(
-    &datanode_url,
-    block,
-    token.clone(),
-    offset as u64,
-    len as u64,
-)
-.await?;
+let (mut connection, response) =
+    connect_and_send(datanode, block, token.clone(), offset as u64, len as u64).await?;

if response.status() != hdfs::Status::Success {
return Err(HdfsError::DataTransferError(response.message().to_string()));
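The retry logic in connect_and_send above follows a simple pattern: try a cached connection, and if the request on it fails (the cached socket may have gone stale), retry up to a fixed limit before opening a fresh connection. Below is a condensed, self-contained sketch of that shape; Conn, cache_get, fresh_connect, and try_request are illustrative stand-ins, not the crate's real API.

// Illustrative stand-ins for DatanodeConnection, DATANODE_CACHE.get,
// DatanodeConnection::connect, and sending the read request.
struct Conn;

fn cache_get(_key: &str) -> Option<Conn> {
    None
}

fn fresh_connect(_key: &str) -> Result<Conn, String> {
    Ok(Conn)
}

fn try_request(conn: Conn) -> Result<Conn, String> {
    Ok(conn)
}

// Cached connections may be stale, so cap the attempts against the cache
// and fall back to a brand-new connection if none of them succeeds.
fn connect_with_cache(key: &str) -> Result<Conn, String> {
    let mut remaining_attempts = 2;
    while remaining_attempts > 0 {
        match cache_get(key) {
            Some(conn) => match try_request(conn) {
                Ok(conn) => return Ok(conn), // cached connection still good
                Err(_) => remaining_attempts -= 1, // stale, try another
            },
            None => break, // nothing cached; connect fresh
        }
    }
    fresh_connect(key)
}

fn main() {
    // 9866 is the default DataNode transfer port, used here for illustration.
    let conn = connect_with_cache("127.0.0.1:9866");
    assert!(conn.is_ok());
}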
4 changes: 1 addition & 3 deletions crates/hdfs-native/src/hdfs/block_writer.rs
@@ -100,9 +100,7 @@ impl ReplicatedBlockWriter {
server_defaults: hdfs::FsServerDefaultsProto,
) -> Result<Self> {
let datanode = &block.locs[0].id;
-let mut connection =
-    DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
-        .await?;
+let mut connection = DatanodeConnection::connect(datanode, &block.block_token).await?;

let checksum = hdfs::ChecksumProto {
r#type: hdfs::ChecksumTypeProto::ChecksumCrc32c as i32,
28 changes: 23 additions & 5 deletions crates/hdfs-native/src/hdfs/connection.rs
@@ -25,11 +25,16 @@ use tokio::{
use uuid::Uuid;

use crate::proto::common::rpc_response_header_proto::RpcStatusProto;
+use crate::proto::common::TokenProto;
+use crate::proto::hdfs::DatanodeIdProto;
use crate::proto::{common, hdfs};
use crate::security::sasl::{SaslReader, SaslRpcClient, SaslWriter};
use crate::security::user::UserInfo;
use crate::{HdfsError, Result};

+#[cfg(feature = "token")]
+use crate::security::sasl::SaslDatanodeConnection;
+
const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
const DATA_TRANSFER_VERSION: u16 = 28;
const MAX_PACKET_HEADER_SIZE: usize = 33;
@@ -520,12 +525,24 @@ pub(crate) struct DatanodeConnection {
}

impl DatanodeConnection {
-pub(crate) async fn connect(url: &str) -> Result<Self> {
-    let stream = BufStream::new(connect(url).await?);
+#[allow(unused_variables)]
+pub(crate) async fn connect(datanode_id: &DatanodeIdProto, token: &TokenProto) -> Result<Self> {
+    let url = format!("{}:{}", datanode_id.ip_addr, datanode_id.xfer_port);
+    let stream = connect(&url).await?;
+
+    // If the token has an identifier, we can do SASL negotiation
+    #[cfg(feature = "token")]
+    let stream = if token.identifier.is_empty() {
+        stream
+    } else {
+        debug!("{:?}", token);
+        let sasl_connection = SaslDatanodeConnection::create(stream);
+        sasl_connection.negotiate(datanode_id, token).await?
+    };

let conn = DatanodeConnection {
client_name: Uuid::new_v4().to_string(),
-stream,
+stream: BufStream::new(stream),
url: url.to_string(),
};
Ok(conn)
@@ -704,15 +721,16 @@ impl DatanodeConnectionCache {
}
}

-pub(crate) fn get(&self, url: &str) -> Option<DatanodeConnection> {
+pub(crate) fn get(&self, datanode_id: &hdfs::DatanodeIdProto) -> Option<DatanodeConnection> {
// Keep things simple and just expire cache entries when checking the cache. We could
// move this to its own task but that would add a little more complexity.
self.remove_expired();

+let url = format!("{}:{}", datanode_id.ip_addr, datanode_id.xfer_port);
let mut cache = self.cache.lock().unwrap();

cache
-.get_mut(url)
+.get_mut(&url)
.iter_mut()
.flat_map(|conns| conns.pop_front())
.map(|(_, conn)| conn)
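The core decision introduced by this commit lives in DatanodeConnection::connect: SASL negotiation on the data transfer connection happens only when the crate is built with the `token` feature and the block access token actually carries an identifier. A simplified sketch of that branch follows; DatanodeId and Token are stand-ins for the crate's DatanodeIdProto and TokenProto, and the Sasl variant elides the real handshake that SaslDatanodeConnection::negotiate performs.

use std::net::TcpStream;

// Stand-ins for the crate's DatanodeIdProto and TokenProto.
struct DatanodeId {
    ip_addr: String,
    xfer_port: u32,
}

struct Token {
    identifier: Vec<u8>,
}

// The stream is either plain TCP or wrapped after SASL negotiation.
enum DataStream {
    Plain(TcpStream),
    Sasl(TcpStream), // the real crate wraps this in a SASL session
}

fn connect_datanode(id: &DatanodeId, token: &Token) -> std::io::Result<DataStream> {
    let stream = TcpStream::connect(format!("{}:{}", id.ip_addr, id.xfer_port))?;

    // An empty identifier means no block token was issued, i.e. the
    // cluster isn't enforcing data transfer protection, so skip SASL.
    if token.identifier.is_empty() {
        Ok(DataStream::Plain(stream))
    } else {
        // The real implementation hands the socket to
        // SaslDatanodeConnection::create(stream).negotiate(id, token).
        Ok(DataStream::Sasl(stream))
    }
}

fn main() {
    let id = DatanodeId {
        ip_addr: "127.0.0.1".to_string(),
        xfer_port: 9866, // default DataNode transfer port, for illustration
    };
    let token = Token { identifier: vec![] };
    // This only succeeds if a DataNode is actually listening.
    let _ = connect_datanode(&id, &token);
}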
36 changes: 22 additions & 14 deletions crates/hdfs-native/src/minidfs.rs
@@ -9,11 +9,12 @@ use which::which;

#[derive(PartialEq, Eq, Hash, Debug)]
pub enum DfsFeatures {
-SECURITY,
-TOKEN,
-PRIVACY,
+Security,
+DataTransferSecurity,
+Token,
+Privacy,
HA,
-VIEWFS,
+ViewFS,
EC,
RBF,
}
@@ -23,10 +24,11 @@ impl DfsFeatures {
match self {
DfsFeatures::EC => "ec",
DfsFeatures::HA => "ha",
-DfsFeatures::VIEWFS => "viewfs",
-DfsFeatures::PRIVACY => "privacy",
-DfsFeatures::SECURITY => "security",
-DfsFeatures::TOKEN => "token",
+DfsFeatures::ViewFS => "viewfs",
+DfsFeatures::Privacy => "privacy",
+DfsFeatures::Security => "security",
+DfsFeatures::DataTransferSecurity => "data_transfer_security",
+DfsFeatures::Token => "token",
DfsFeatures::RBF => "rbf",
}
}
@@ -35,9 +37,9 @@
match value {
"ec" => Some(DfsFeatures::EC),
"ha" => Some(DfsFeatures::HA),
"privacy" => Some(DfsFeatures::PRIVACY),
"security" => Some(DfsFeatures::SECURITY),
"token" => Some(DfsFeatures::TOKEN),
"privacy" => Some(DfsFeatures::Privacy),
"security" => Some(DfsFeatures::Security),
"token" => Some(DfsFeatures::Token),
_ => None,
}
}
@@ -56,6 +58,12 @@ impl MiniDfs {
for feature in features.iter() {
feature_args.push(feature.as_str());
}
+// If the `token` feature is enabled, we need to force the data transfer protection
+#[cfg(feature = "token")]
+if !features.contains(&DfsFeatures::DataTransferSecurity) {
+    feature_args.push(DfsFeatures::DataTransferSecurity.as_str());
+}
+
let mut child = Command::new(mvn_exec)
.args([
"-f",
@@ -86,7 +94,7 @@ impl MiniDfs {
// Make sure this doesn't carry over from a token test to a non-token test
env::remove_var("HADOOP_TOKEN_FILE_LOCATION");

-if features.contains(&DfsFeatures::SECURITY) {
+if features.contains(&DfsFeatures::Security) {
let krb_conf = output.next().unwrap().unwrap();
let kdestroy_exec = which("kdestroy").expect("Failed to find kdestroy executable");
Command::new(kdestroy_exec).spawn().unwrap().wait().unwrap();
@@ -106,7 +114,7 @@ impl MiniDfs {
);

// If we're testing token auth, set the path to the file and make sure we don't have an old kinit, otherwise kinit
-if features.contains(&DfsFeatures::TOKEN) {
+if features.contains(&DfsFeatures::Token) {
env::set_var("HADOOP_TOKEN_FILE_LOCATION", "target/test/delegation_token");
} else {
let kinit_exec = which("kinit").expect("Failed to find kinit executable");
@@ -120,7 +128,7 @@
}
}

-let url = if features.contains(&DfsFeatures::VIEWFS) {
+let url = if features.contains(&DfsFeatures::ViewFS) {
"viewfs://minidfs-viewfs"
} else if features.contains(&DfsFeatures::RBF) {
"hdfs://fed"
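For test authors, the renamed variants read naturally at the call site. A hypothetical usage sketch follows, hedged because this hunk only shows the feature plumbing inside impl MiniDfs: the module path and the constructor name with_features are assumptions, not shown in this diff.

// Assumed import path; the minidfs module may be gated to test builds.
use hdfs_native::minidfs::{DfsFeatures, MiniDfs};
use std::collections::HashSet;

fn start_secure_cluster() {
    let mut features = HashSet::new();
    features.insert(DfsFeatures::Security);
    // With the crate's `token` feature enabled, DataTransferSecurity is
    // forced on by the code above, but it can also be requested explicitly.
    features.insert(DfsFeatures::DataTransferSecurity);

    // Assumed constructor; spins up the Java MiniDfs with the chosen flags.
    let dfs = MiniDfs::with_features(&features);
    let _ = dfs; // dfs.url would point at a cluster requiring SASL on data transfer
}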