From 8e28d8e0ef368f5fbfcdc2d4f6aa1c354da23df5 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 21 Mar 2024 06:21:04 -0400 Subject: [PATCH] Add support for datanode sasl negotiation (#81) * Add support for datanode sasl negotiation --- README.md | 2 +- crates/hdfs-native/Cargo.toml | 2 +- .../minidfs/src/main/java/main/Main.java | 9 +- crates/hdfs-native/src/hdfs/block_reader.rs | 20 +- crates/hdfs-native/src/hdfs/block_writer.rs | 4 +- crates/hdfs-native/src/hdfs/connection.rs | 28 ++- crates/hdfs-native/src/minidfs.rs | 36 ++-- crates/hdfs-native/src/security/sasl.rs | 127 ++++++++++-- crates/hdfs-native/src/security/user.rs | 196 ++++++++++++++++-- crates/hdfs-native/tests/test_ec.rs | 14 +- crates/hdfs-native/tests/test_integration.rs | 24 +-- crates/hdfs-native/tests/test_viewfs.rs | 2 +- .../tests/test_write_resiliency.rs | 2 +- 13 files changed, 380 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index 8a172c2..0de647a 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Here is a list of currently supported and unsupported but possible future featur es, - [x] Kerberos authentication (GSSAPI SASL support) - [x] Token authentication (DIGEST-MD5 SASL support, no encryption support) - [x] NameNode SASL connection -- [ ] DataNode SASL connection +- [x] DataNode SASL connection - [ ] DataNode data transfer encryption - [ ] Encryption at rest (KMS support) diff --git a/crates/hdfs-native/Cargo.toml b/crates/hdfs-native/Cargo.toml index 2e96d39..c97282c 100644 --- a/crates/hdfs-native/Cargo.toml +++ b/crates/hdfs-native/Cargo.toml @@ -22,7 +22,7 @@ libc = "0.2" libgssapi = { version = "0.7", default-features = false, optional = true } log = "0.4" num-traits = "0.2" -once_cell = "1.19.0" +once_cell = "1" prost = "0.12" prost-types = "0.12" roxmltree = "0.18" diff --git a/crates/hdfs-native/minidfs/src/main/java/main/Main.java b/crates/hdfs-native/minidfs/src/main/java/main/Main.java index 410577d..d301527 100644 --- a/crates/hdfs-native/minidfs/src/main/java/main/Main.java +++ b/crates/hdfs-native/minidfs/src/main/java/main/Main.java @@ -41,6 +41,9 @@ public static void main(String args[]) throws Exception { } MiniKdc kdc = null; + // If an existing token exists, make sure to delete it + new File("target/test/delegation_token").delete(); + Configuration conf = new Configuration(); if (flags.contains("security")) { kdc = new MiniKdc(MiniKdc.createConf(), new File("target/test/kdc")); @@ -60,8 +63,10 @@ public static void main(String args[]) throws Exception { conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, "target/test/hdfs.keytab"); conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "hdfs/localhost@" + kdc.getRealm()); conf.set(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, "true"); - // conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication"); conf.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true"); + if (flags.contains("data_transfer_security")) { + conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication"); + } } HdfsConfiguration hdfsConf = new HdfsConfiguration(conf); @@ -155,8 +160,6 @@ public static void main(String args[]) throws Exception { DataOutputStream os = new DataOutputStream(new FileOutputStream("target/test/delegation_token")); creds.writeTokenStorageToStream(os, SerializedFormat.WRITABLE); os.close(); - } else { - new File("target/test/delegation_token").delete(); } } diff --git a/crates/hdfs-native/src/hdfs/block_reader.rs b/crates/hdfs-native/src/hdfs/block_reader.rs index a720865..cb5fab4 100644 ---
a/crates/hdfs-native/src/hdfs/block_reader.rs +++ b/crates/hdfs-native/src/hdfs/block_reader.rs @@ -37,7 +37,7 @@ pub(crate) fn get_block_stream( /// Connects to a DataNode to do a read, attempting to used cached connections. async fn connect_and_send( - url: &str, + datanode_id: &hdfs::DatanodeIdProto, block: &hdfs::ExtendedBlockProto, token: common::TokenProto, offset: u64, @@ -45,7 +45,7 @@ async fn connect_and_send( ) -> Result<(DatanodeConnection, BlockOpResponseProto)> { let mut remaining_attempts = 2; while remaining_attempts > 0 { - if let Some(mut conn) = DATANODE_CACHE.get(url) { + if let Some(mut conn) = DATANODE_CACHE.get(datanode_id) { let message = hdfs::OpReadBlockProto { header: conn.build_header(block, Some(token.clone())), offset, @@ -68,7 +68,7 @@ async fn connect_and_send( } remaining_attempts -= 1; } - let mut conn = DatanodeConnection::connect(url).await?; + let mut conn = DatanodeConnection::connect(datanode_id, &token).await?; let message = hdfs::OpReadBlockProto { header: conn.build_header(block, Some(token)), @@ -117,10 +117,9 @@ impl ReplicatedBlockStream { } let datanode = &self.block.locs[self.current_replica].id; - let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port); let (connection, response) = connect_and_send( - &datanode_url, + datanode, &self.block.b, self.block.block_token.clone(), self.offset as u64, @@ -389,15 +388,8 @@ impl StripedBlockStream { return Ok(()); } - let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port); - let (mut connection, response) = connect_and_send( - &datanode_url, - block, - token.clone(), - offset as u64, - len as u64, - ) - .await?; + let (mut connection, response) = + connect_and_send(datanode, block, token.clone(), offset as u64, len as u64).await?; if response.status() != hdfs::Status::Success { return Err(HdfsError::DataTransferError(response.message().to_string())); diff --git a/crates/hdfs-native/src/hdfs/block_writer.rs b/crates/hdfs-native/src/hdfs/block_writer.rs index 31f2bc5..cabf6ca 100644 --- a/crates/hdfs-native/src/hdfs/block_writer.rs +++ b/crates/hdfs-native/src/hdfs/block_writer.rs @@ -100,9 +100,7 @@ impl ReplicatedBlockWriter { server_defaults: hdfs::FsServerDefaultsProto, ) -> Result { let datanode = &block.locs[0].id; - let mut connection = - DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port)) - .await?; + let mut connection = DatanodeConnection::connect(datanode, &block.block_token).await?; let checksum = hdfs::ChecksumProto { r#type: hdfs::ChecksumTypeProto::ChecksumCrc32c as i32, diff --git a/crates/hdfs-native/src/hdfs/connection.rs b/crates/hdfs-native/src/hdfs/connection.rs index 0fa7880..3f8036b 100644 --- a/crates/hdfs-native/src/hdfs/connection.rs +++ b/crates/hdfs-native/src/hdfs/connection.rs @@ -25,11 +25,16 @@ use tokio::{ use uuid::Uuid; use crate::proto::common::rpc_response_header_proto::RpcStatusProto; +use crate::proto::common::TokenProto; +use crate::proto::hdfs::DatanodeIdProto; use crate::proto::{common, hdfs}; use crate::security::sasl::{SaslReader, SaslRpcClient, SaslWriter}; use crate::security::user::UserInfo; use crate::{HdfsError, Result}; +#[cfg(feature = "token")] +use crate::security::sasl::SaslDatanodeConnection; + const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol"; const DATA_TRANSFER_VERSION: u16 = 28; const MAX_PACKET_HEADER_SIZE: usize = 33; @@ -520,12 +525,24 @@ pub(crate) struct DatanodeConnection { } impl DatanodeConnection { - pub(crate) async fn connect(url: &str) -> 
Result<Self> { - let stream = BufStream::new(connect(url).await?); + #[allow(unused_variables)] + pub(crate) async fn connect(datanode_id: &DatanodeIdProto, token: &TokenProto) -> Result<Self> { + let url = format!("{}:{}", datanode_id.ip_addr, datanode_id.xfer_port); + let stream = connect(&url).await?; + + // If the token has an identifier, we can do SASL negotiation + #[cfg(feature = "token")] + let stream = if token.identifier.is_empty() { + stream + } else { + debug!("{:?}", token); + let sasl_connection = SaslDatanodeConnection::create(stream); + sasl_connection.negotiate(datanode_id, token).await? + }; let conn = DatanodeConnection { client_name: Uuid::new_v4().to_string(), - stream, + stream: BufStream::new(stream), url: url.to_string(), }; Ok(conn) @@ -704,15 +721,16 @@ impl DatanodeConnectionCache { } } - pub(crate) fn get(&self, url: &str) -> Option<DatanodeConnection> { + pub(crate) fn get(&self, datanode_id: &hdfs::DatanodeIdProto) -> Option<DatanodeConnection> { // Keep things simple and just expire cache entries when checking the cache. We could // move this to its own task but that will add a little more complexity. self.remove_expired(); + let url = format!("{}:{}", datanode_id.ip_addr, datanode_id.xfer_port); let mut cache = self.cache.lock().unwrap(); cache - .get_mut(url) + .get_mut(&url) .iter_mut() .flat_map(|conns| conns.pop_front()) .map(|(_, conn)| conn) diff --git a/crates/hdfs-native/src/minidfs.rs b/crates/hdfs-native/src/minidfs.rs index 8bc0389..1eff66d 100644 --- a/crates/hdfs-native/src/minidfs.rs +++ b/crates/hdfs-native/src/minidfs.rs @@ -9,11 +9,12 @@ use which::which; #[derive(PartialEq, Eq, Hash, Debug)] pub enum DfsFeatures { - SECURITY, - TOKEN, - PRIVACY, + Security, + DataTransferSecurity, + Token, + Privacy, HA, - VIEWFS, + ViewFS, EC, RBF, } @@ -23,10 +24,11 @@ impl DfsFeatures { match self { DfsFeatures::EC => "ec", DfsFeatures::HA => "ha", - DfsFeatures::VIEWFS => "viewfs", - DfsFeatures::PRIVACY => "privacy", - DfsFeatures::SECURITY => "security", - DfsFeatures::TOKEN => "token", + DfsFeatures::ViewFS => "viewfs", + DfsFeatures::Privacy => "privacy", + DfsFeatures::Security => "security", + DfsFeatures::DataTransferSecurity => "data_transfer_security", + DfsFeatures::Token => "token", DfsFeatures::RBF => "rbf", } } @@ -35,9 +37,9 @@ impl DfsFeatures { match value { "ec" => Some(DfsFeatures::EC), "ha" => Some(DfsFeatures::HA), - "privacy" => Some(DfsFeatures::PRIVACY), - "security" => Some(DfsFeatures::SECURITY), - "token" => Some(DfsFeatures::TOKEN), + "privacy" => Some(DfsFeatures::Privacy), + "security" => Some(DfsFeatures::Security), + "token" => Some(DfsFeatures::Token), _ => None, } } @@ -56,6 +58,12 @@ impl MiniDfs { for feature in features.iter() { feature_args.push(feature.as_str()); } + // If the `token` feature is enabled, we need to force the data transfer protection + #[cfg(feature = "token")] + if !features.contains(&DfsFeatures::DataTransferSecurity) { + feature_args.push(DfsFeatures::DataTransferSecurity.as_str()); + } + let mut child = Command::new(mvn_exec) .args([ "-f", @@ -86,7 +94,7 @@ impl MiniDfs { // Make sure this doesn't carry over from a token test to a non-token test env::remove_var("HADOOP_TOKEN_FILE_LOCATION"); - if features.contains(&DfsFeatures::SECURITY) { + if features.contains(&DfsFeatures::Security) { let krb_conf = output.next().unwrap().unwrap(); let kdestroy_exec = which("kdestroy").expect("Failed to find kdestroy executable"); Command::new(kdestroy_exec).spawn().unwrap().wait().unwrap(); @@ -106,7 +114,7 @@ impl MiniDfs { ); // If we're testing token auth, set
the path to the file and make sure we don't have an old kinit, otherwise kinit - if features.contains(&DfsFeatures::TOKEN) { + if features.contains(&DfsFeatures::Token) { env::set_var("HADOOP_TOKEN_FILE_LOCATION", "target/test/delegation_token"); } else { let kinit_exec = which("kinit").expect("Failed to find kinit executable"); @@ -120,7 +128,7 @@ impl MiniDfs { } } - let url = if features.contains(&DfsFeatures::VIEWFS) { + let url = if features.contains(&DfsFeatures::ViewFS) { "viewfs://minidfs-viewfs" } else if features.contains(&DfsFeatures::RBF) { "hdfs://fed" diff --git a/crates/hdfs-native/src/security/sasl.rs b/crates/hdfs-native/src/security/sasl.rs index e703384..6aa31fa 100644 --- a/crates/hdfs-native/src/security/sasl.rs +++ b/crates/hdfs-native/src/security/sasl.rs @@ -15,10 +15,11 @@ use crate::proto::common::rpc_sasl_proto::{SaslAuth, SaslState}; use crate::proto::common::{ RpcKindProto, RpcRequestHeaderProto, RpcResponseHeaderProto, RpcSaslProto, }; + use crate::{HdfsError, Result}; #[cfg(feature = "token")] use { - super::user::Token, + super::user::{BlockTokenIdentifier, Token}, base64::{engine::general_purpose, Engine as _}, gsasl_sys as gsasl, libc::{c_char, c_void, memcpy}, @@ -26,12 +27,25 @@ use { std::ptr, std::sync::atomic::AtomicPtr, }; +#[cfg(feature = "token")] +use { + crate::proto::{ + common::TokenProto, + hdfs::{ + data_transfer_encryptor_message_proto::DataTransferEncryptorStatus, + DataTransferEncryptorMessageProto, DatanodeIdProto, HandshakeSecretProto, + }, + }, + tokio::io::{AsyncBufReadExt, BufStream}, +}; #[cfg(feature = "kerberos")] use super::gssapi::GssapiSession; use super::user::{User, UserInfo}; const SASL_CALL_ID: i32 = -33; +#[cfg(feature = "token")] +const SASL_TRANSFER_MAGIC_NUMBER: i32 = 0xDEADBEEFu32 as i32; const HDFS_DELEGATION_TOKEN: &str = "HDFS_DELEGATION_TOKEN"; pub(crate) enum AuthMethod { @@ -309,17 +323,6 @@ impl SaslReader { } } -// TODO: Can we implement this? 
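For context on the token handshake introduced below: Hadoop's SaslDataTransferClient derives the DIGEST-MD5 identity for a block token by base64-encoding the token's raw identifier (as the username) and password bytes, and the GSASLSession created in negotiate() is assumed to do the equivalent internally. A minimal sketch of just that derivation, using the base64 engine already imported above; the BlockToken struct and digest_md5_credentials name are illustrative stand-ins, not part of the crate:

use base64::{engine::general_purpose, Engine as _};

/// Stand-in for the identifier/password bytes carried by TokenProto.
struct BlockToken {
    identifier: Vec<u8>,
    password: Vec<u8>,
}

/// Derive DIGEST-MD5 credentials from a block token the way Hadoop's
/// SaslDataTransferClient does: both values are the base64 encoding of the
/// raw bytes, so the client and the DataNode agree on the shared secret
/// without ever sending the password itself over the wire.
fn digest_md5_credentials(token: &BlockToken) -> (String, String) {
    let username = general_purpose::STANDARD.encode(&token.identifier);
    let password = general_purpose::STANDARD.encode(&token.password);
    (username, password)
}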
-// impl AsyncRead for SaslReader { -// fn poll_read( -// self: Pin<&mut Self>, -// cx: &mut task::Context<'_>, -// buf: &mut ReadBuf<'_>, -// ) -> Poll> { -// todo!() -// } -// } - pub(crate) struct SaslWriter { stream: OwnedWriteHalf, session: Option>>>, @@ -547,3 +550,103 @@ impl Drop for GSASLSession { } } } + +#[cfg(feature = "token")] +pub(crate) struct SaslDatanodeConnection { + stream: BufStream<TcpStream>, +} + +#[cfg(feature = "token")] +impl SaslDatanodeConnection { + pub fn create(stream: TcpStream) -> Self { + Self { + stream: BufStream::new(stream), + } + } + + pub(crate) async fn negotiate( + mut self, + datanode_id: &DatanodeIdProto, + token: &TokenProto, + ) -> Result<TcpStream> { + // If it's a privileged port, don't do SASL negotiation + if datanode_id.xfer_port <= 1024 { + return Ok(self.stream.into_inner()); + } + + self.stream.write_i32(SASL_TRANSFER_MAGIC_NUMBER).await?; + self.stream.flush().await?; + + let mut session = GSASLSession::new("hdfs", "0", &token.clone().into())?; + + let token_identifier = BlockTokenIdentifier::from_identifier(&token.identifier)?; + + let handshake_secret = if !token_identifier.handshake_secret.is_empty() { + Some(HandshakeSecretProto { + bpid: token_identifier.block_pool_id.clone(), + secret: token_identifier.handshake_secret.clone(), + }) + } else { + None + }; + + let message = DataTransferEncryptorMessageProto { + handshake_secret, + status: DataTransferEncryptorStatus::Success as i32, + ..Default::default() + }; + + debug!("Sending data transfer encryptor message: {:?}", message); + + self.stream + .write_all(&message.encode_length_delimited_to_vec()) + .await?; + self.stream.flush().await?; + + let response = self.read_sasl_response().await?; + debug!("Data transfer encryptor response: {:?}", response); + + let (payload, finished) = session.step(response.payload.as_ref().map(|p| &p[..]))?; + assert!(!finished); + + let message = DataTransferEncryptorMessageProto { + status: DataTransferEncryptorStatus::Success as i32, + payload: Some(payload), + ..Default::default() + }; + + debug!("Sending data transfer encryptor message: {:?}", message); + + self.stream + .write_all(&message.encode_length_delimited_to_vec()) + .await?; + self.stream.flush().await?; + + let response = self.read_sasl_response().await?; + debug!("Data transfer encryptor response: {:?}", response); + + let (_, finished) = session.step(response.payload.as_ref().map(|p| &p[..]))?; + + assert!(finished); + + Ok(self.stream.into_inner()) + } + + async fn read_sasl_response(&mut self) -> Result<DataTransferEncryptorMessageProto> { + self.stream.fill_buf().await?; + + let buf = self.stream.fill_buf().await?; + if buf.is_empty() { + return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof))?; + } + let msg_length = prost::decode_length_delimiter(buf)?; + let total_size = msg_length + prost::length_delimiter_len(msg_length); + + let mut response_buf = BytesMut::zeroed(total_size); + self.stream.read_exact(&mut response_buf).await?; + + Ok(DataTransferEncryptorMessageProto::decode_length_delimited( + response_buf.freeze(), + )?)
+ } +} diff --git a/crates/hdfs-native/src/security/user.rs b/crates/hdfs-native/src/security/user.rs index 1285718..623b955 100644 --- a/crates/hdfs-native/src/security/user.rs +++ b/crates/hdfs-native/src/security/user.rs @@ -9,6 +9,11 @@ use std::path::PathBuf; use users::get_current_username; use crate::proto::common::CredentialsProto; +use crate::proto::common::TokenProto; +use crate::proto::hdfs::AccessModeProto; +use crate::proto::hdfs::BlockTokenSecretProto; +use crate::proto::hdfs::StorageTypeProto; +use crate::Result; const HADOOP_USER_NAME: &str = "HADOOP_USER_NAME"; #[cfg(feature = "kerberos")] @@ -16,6 +21,109 @@ const HADOOP_PROXY_USER: &str = "HADOOP_PROXY_USER"; const HADOOP_TOKEN_FILE_LOCATION: &str = "HADOOP_TOKEN_FILE_LOCATION"; const TOKEN_STORAGE_MAGIC: &[u8] = "HDTS".as_bytes(); +#[derive(Debug)] +#[allow(dead_code)] +pub(crate) struct BlockTokenIdentifier { + pub expiry_date: u64, + pub key_id: u32, + pub user_id: String, + pub block_pool_id: String, + pub block_id: u64, + pub modes: Vec, + pub storage_types: Vec, + pub storage_ids: Vec, + pub handshake_secret: Vec, +} + +#[allow(dead_code)] +impl BlockTokenIdentifier { + fn parse_writable(reader: &mut impl Buf) -> Result { + let expiry_date = parse_vlong(reader) as u64; + let key_id = parse_vint(reader) as u32; + let user_id = parse_int_string(reader)?.unwrap(); + let block_pool_id = parse_int_string(reader)?.unwrap(); + let block_id = parse_vlong(reader) as u64; + + let mut modes: Vec = Vec::new(); + let mut storage_types: Vec = Vec::new(); + let mut storage_ids: Vec = Vec::new(); + + // The rest of the fields may or may not be present depending on HDFS version + if reader.has_remaining() { + // Modes + for _ in 0..parse_vint(reader) { + if let Some(mode) = AccessModeProto::from_str_name(&parse_vint_string(reader)?) { + modes.push(mode as i32); + } + } + } + + if reader.has_remaining() { + // Storage Types + for _ in 0..parse_vint(reader) { + if let Some(storage_type) = + StorageTypeProto::from_str_name(&parse_vint_string(reader)?) + { + storage_types.push(storage_type as i32); + } + } + } + + if reader.has_remaining() { + // Storage IDs + for _ in 0..parse_vint(reader) { + if let Some(storage_id) = parse_int_string(reader)? 
{ + storage_ids.push(storage_id); + } + } + } + + let handshake_secret = if reader.has_remaining() { + let handshake_secret_len = parse_vint(reader) as usize; + reader.copy_to_bytes(handshake_secret_len).to_vec() + } else { + vec![] + }; + + Ok(BlockTokenIdentifier { + expiry_date, + key_id, + user_id, + block_pool_id, + block_id, + modes, + storage_types, + storage_ids, + handshake_secret, + }) + } + + fn parse_protobuf(identifier: &[u8]) -> Result { + let secret_proto = BlockTokenSecretProto::decode(identifier)?; + + Ok(BlockTokenIdentifier { + expiry_date: secret_proto.expiry_date(), + key_id: secret_proto.key_id(), + user_id: secret_proto.user_id().to_string(), + block_pool_id: secret_proto.block_pool_id().to_string(), + block_id: secret_proto.block_id(), + modes: secret_proto.modes.clone(), + storage_types: secret_proto.storage_types.clone(), + storage_ids: secret_proto.storage_ids.clone(), + handshake_secret: secret_proto.handshake_secret().to_vec(), + }) + } + + pub(crate) fn from_identifier(identifier: &[u8]) -> Result { + if identifier[0] == 0 || identifier[0] > 127 { + let mut content = Bytes::from(identifier.to_vec()); + Self::parse_writable(&mut content) + } else { + Self::parse_protobuf(identifier) + } + } +} + #[derive(Debug)] pub struct Token { pub alias: String, @@ -35,7 +143,6 @@ impl Token { fn read_token_file(path: PathBuf) -> std::io::Result> { let mut content = Bytes::from(fs::read(path)?); - // let mut reader = content.reader(); let magic = content.copy_to_bytes(4); @@ -78,17 +185,8 @@ impl Token { let password_length = parse_vlong(reader); let password = reader.copy_to_bytes(password_length as usize).to_vec(); - let kind_length = parse_vlong(reader); - let kind = String::from_utf8(reader.copy_to_bytes(kind_length as usize).to_vec()) - .map_err(|_| { - io::Error::new(io::ErrorKind::Other, "Failed to parse token".to_string()) - })?; - - let service_length = parse_vlong(reader); - let service = String::from_utf8(reader.copy_to_bytes(service_length as usize).to_vec()) - .map_err(|_| { - io::Error::new(io::ErrorKind::Other, "Failed to parse token".to_string()) - })?; + let kind = parse_vint_string(reader)?; + let service = parse_vint_string(reader)?; tokens.push(Token { alias, @@ -121,8 +219,18 @@ impl Token { Ok(tokens) } +} - // fn parse_writable +impl From for Token { + fn from(value: TokenProto) -> Self { + Self { + alias: String::new(), + identifier: value.identifier, + password: value.password, + kind: value.kind, + service: value.service, + } + } } /// Adapted from WritableUtils class in Hadoop @@ -157,6 +265,40 @@ fn parse_vlong(reader: &mut impl Buf) -> i64 { } } +fn parse_vint(reader: &mut impl Buf) -> i32 { + // Same method as a long, but it should just be in the int range + let n = parse_vlong(reader); + assert!(n > i32::MIN as i64 && n < i32::MAX as i64); + n as i32 +} + +fn parse_string(reader: &mut impl Buf, length: i32) -> io::Result { + String::from_utf8(reader.copy_to_bytes(length as usize).to_vec()).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "Failed to parse string from writable".to_string(), + ) + }) +} + +/// Parse a string prefixed with the length as an int +#[allow(dead_code)] +fn parse_int_string(reader: &mut impl Buf) -> io::Result> { + let length = reader.get_i32(); + let value = if length == -1 { + None + } else { + Some(parse_string(reader, length)?) 
+ }; + Ok(value) +} + +/// Parse a string prefixed with the length as a vint +fn parse_vint_string(reader: &mut impl Buf) -> io::Result { + let length = parse_vint(reader); + parse_string(reader, length) +} + #[derive(Debug)] pub(crate) struct UserInfo { pub(crate) real_user: Option, @@ -281,4 +423,32 @@ mod tests { assert_eq!(tokens[0].service, "127.0.0.1:9000"); tokens.iter().for_each(|t| println!("{:?}", t)); } + + #[test] + fn test_load_token_identifier() { + let token = [ + 138u8, 1, 142, 89, 190, 30, 189, 140, 100, 197, 210, 104, 0, 0, 0, 4, 104, 100, 102, + 115, 0, 0, 0, 40, 66, 80, 45, 57, 55, 51, 52, 55, 55, 51, 54, 48, 45, 49, 57, 50, 46, + 49, 54, 56, 46, 49, 46, 49, 56, 52, 45, 49, 55, 49, 48, 56, 54, 54, 54, 49, 49, 51, 50, + 53, 128, 127, 255, 255, 255, 255, 255, 255, 239, 1, 4, 82, 69, 65, 68, 3, 4, 68, 73, + 83, 75, 4, 68, 73, 83, 75, 4, 68, 73, 83, 75, 3, 0, 0, 0, 39, 68, 83, 45, 97, 50, 100, + 51, 55, 50, 98, 101, 45, 101, 99, 98, 55, 45, 52, 101, 101, 49, 45, 98, 48, 99, 51, 45, + 48, 57, 102, 49, 51, 100, 52, 49, 57, 101, 52, 102, 0, 0, 0, 39, 68, 83, 45, 53, 56, + 54, 55, 50, 99, 50, 50, 45, 51, 49, 57, 99, 45, 52, 99, 50, 53, 45, 56, 55, 50, 98, 45, + 97, 56, 48, 49, 98, 57, 99, 100, 53, 102, 51, 49, 0, 0, 0, 39, 68, 83, 45, 102, 49, + 102, 57, 57, 97, 52, 49, 45, 56, 54, 102, 51, 45, 52, 57, 102, 56, 45, 57, 48, 50, 55, + 45, 98, 101, 102, 102, 54, 100, 100, 52, 53, 54, 54, 100, + ]; + + let token_identifier = BlockTokenIdentifier::from_identifier(&token).unwrap(); + println!("{:?}", token_identifier); + assert_eq!(token_identifier.user_id, "hdfs"); + assert_eq!( + token_identifier.block_pool_id, + "BP-973477360-192.168.1.184-1710866611325" + ); + assert_eq!(token_identifier.block_id, 9223372036854775824); + assert_eq!(token_identifier.key_id, 1690686056); + assert!(token_identifier.handshake_secret.is_empty()); + } } diff --git a/crates/hdfs-native/tests/test_ec.rs b/crates/hdfs-native/tests/test_ec.rs index bc03d86..6249492 100644 --- a/crates/hdfs-native/tests/test_ec.rs +++ b/crates/hdfs-native/tests/test_ec.rs @@ -74,10 +74,11 @@ mod test { async fn test_erasure_coded_read() -> Result<()> { let _ = env_logger::builder().is_test(true).try_init(); + let mut dfs_features = HashSet::from([DfsFeatures::EC]); #[cfg(feature = "kerberos")] - let dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::EC, DfsFeatures::SECURITY])); - #[cfg(not(feature = "kerberos"))] - let dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::EC])); + dfs_features.insert(DfsFeatures::Security); + + let dfs = MiniDfs::with_features(&dfs_features); let client = Client::default(); // Test each of Hadoop's built-in RS policies @@ -117,10 +118,11 @@ mod test { async fn test_erasure_coded_write() -> Result<()> { let _ = env_logger::builder().is_test(true).try_init(); + let mut dfs_features = HashSet::from([DfsFeatures::EC]); #[cfg(feature = "kerberos")] - let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::EC, DfsFeatures::SECURITY])); - #[cfg(not(feature = "kerberos"))] - let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::EC])); + dfs_features.insert(DfsFeatures::Security); + + let _dfs = MiniDfs::with_features(&dfs_features); let client = Client::default(); // Test each of Hadoop's built-in RS policies diff --git a/crates/hdfs-native/tests/test_integration.rs b/crates/hdfs-native/tests/test_integration.rs index 8dadbbd..0fd2a3c 100644 --- a/crates/hdfs-native/tests/test_integration.rs +++ b/crates/hdfs-native/tests/test_integration.rs @@ -19,7 +19,7 @@ mod test { 
#[serial] #[cfg(feature = "kerberos")] async fn test_security_kerberos() { - test_with_features(&HashSet::from([DfsFeatures::SECURITY])) + test_with_features(&HashSet::from([DfsFeatures::Security])) .await .unwrap(); } @@ -28,7 +28,7 @@ mod test { #[serial] #[cfg(feature = "token")] async fn test_security_token() { - test_with_features(&HashSet::from([DfsFeatures::SECURITY, DfsFeatures::TOKEN])) + test_with_features(&HashSet::from([DfsFeatures::Security, DfsFeatures::Token])) .await .unwrap(); } @@ -39,9 +39,9 @@ mod test { #[cfg(feature = "token")] async fn test_privacy_token() { test_with_features(&HashSet::from([ - DfsFeatures::SECURITY, - DfsFeatures::TOKEN, - DfsFeatures::PRIVACY, + DfsFeatures::Security, + DfsFeatures::Token, + DfsFeatures::Privacy, ])) .await .unwrap(); @@ -52,8 +52,8 @@ mod test { #[cfg(feature = "kerberos")] async fn test_privacy_kerberos() { test_with_features(&HashSet::from([ - DfsFeatures::SECURITY, - DfsFeatures::PRIVACY, + DfsFeatures::Security, + DfsFeatures::Privacy, ])) .await .unwrap(); @@ -72,8 +72,8 @@ mod test { #[cfg(feature = "kerberos")] async fn test_security_privacy_ha() { test_with_features(&HashSet::from([ - DfsFeatures::SECURITY, - DfsFeatures::PRIVACY, + DfsFeatures::Security, + DfsFeatures::Privacy, DfsFeatures::HA, ])) .await @@ -85,8 +85,8 @@ mod test { #[cfg(feature = "token")] async fn test_security_token_ha() { test_with_features(&HashSet::from([ - DfsFeatures::SECURITY, - DfsFeatures::TOKEN, + DfsFeatures::Security, + DfsFeatures::Token, DfsFeatures::HA, ])) .await @@ -104,7 +104,7 @@ mod test { pub async fn test_with_features(features: &HashSet) -> Result<()> { let _ = env_logger::builder().is_test(true).try_init(); - let _dfs = setup(features); + let _dfs = setup(&features); let client = Client::default(); test_file_info(&client).await?; diff --git a/crates/hdfs-native/tests/test_viewfs.rs b/crates/hdfs-native/tests/test_viewfs.rs index 790e42f..01539c2 100644 --- a/crates/hdfs-native/tests/test_viewfs.rs +++ b/crates/hdfs-native/tests/test_viewfs.rs @@ -23,7 +23,7 @@ mod test { #[tokio::test] #[serial] async fn test_viewfs() { - let features = HashSet::from([DfsFeatures::VIEWFS]); + let features = HashSet::from([DfsFeatures::ViewFS]); let _ = env_logger::builder().is_test(true).try_init(); // VIEWFS feature creates one mount with a fallback diff --git a/crates/hdfs-native/tests/test_write_resiliency.rs b/crates/hdfs-native/tests/test_write_resiliency.rs index beae2a3..b42e67c 100644 --- a/crates/hdfs-native/tests/test_write_resiliency.rs +++ b/crates/hdfs-native/tests/test_write_resiliency.rs @@ -16,7 +16,7 @@ mod test { let _ = env_logger::builder().is_test(true).try_init(); #[cfg(feature = "kerberos")] - let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA, DfsFeatures::SECURITY])); + let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA, DfsFeatures::Security])); #[cfg(not(feature = "kerberos"))] let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA, DfsFeatures::RBF])); let client = Client::default();
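Taken together, the DataNode handshake added in this patch is plain framing on the existing transfer socket: the client writes 0xDEADBEEF as a big-endian i32, the two sides then exchange length-delimited DataTransferEncryptorMessageProto messages until the SASL session completes, and the whole exchange is skipped when the DataNode's xfer port is privileged (<= 1024). A rough sketch of the client-side framing for the first message only; start_sasl_handshake is an illustrative helper rather than part of the crate, and any prost message is framed the same way:

use prost::Message;
use tokio::io::{AsyncWrite, AsyncWriteExt};

// Announces a SASL-wrapped data transfer connection (same constant as in sasl.rs).
const SASL_TRANSFER_MAGIC_NUMBER: i32 = 0xDEADBEEFu32 as i32;

/// Write the handshake preamble and the first SASL message: a big-endian
/// magic number, then a varint length prefix followed by the encoded protobuf.
async fn start_sasl_handshake<M, W>(stream: &mut W, first_message: &M) -> std::io::Result<()>
where
    M: Message,
    W: AsyncWrite + Unpin,
{
    stream.write_i32(SASL_TRANSFER_MAGIC_NUMBER).await?;
    stream
        .write_all(&first_message.encode_length_delimited_to_vec())
        .await?;
    stream.flush().await?;
    Ok(())
}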