diff --git a/rust/src/security/sasl.rs b/rust/src/security/sasl.rs index 5e81722..46e752d 100644 --- a/rust/src/security/sasl.rs +++ b/rust/src/security/sasl.rs @@ -252,7 +252,7 @@ impl SaslReader { let mut bytes = buf.freeze(); let rpc_response = RpcResponseHeaderProto::decode_length_delimited(&mut bytes)?; - debug!("{:?}", rpc_response); + debug!("RPC response: {:?}", rpc_response); match RpcStatusProto::try_from(rpc_response.status).unwrap() { RpcStatusProto::Error => { @@ -339,6 +339,8 @@ impl SaslWriter { } async fn send_sasl_message(&mut self, message: &RpcSaslProto) -> io::Result<()> { + debug!("Sending SASL message {:?}", message); + let header_buf = Self::create_request_header().encode_length_delimited_to_vec(); let message_buf = message.encode_length_delimited_to_vec(); let size = (header_buf.len() + message_buf.len()) as u32; diff --git a/rust/src/security/user.rs b/rust/src/security/user.rs index 901405c..da10885 100644 --- a/rust/src/security/user.rs +++ b/rust/src/security/user.rs @@ -1,4 +1,5 @@ use bytes::{Buf, Bytes}; +use chrono::Utc; use log::debug; use prost::Message; use std::env; @@ -13,6 +14,7 @@ use crate::proto::common::TokenProto; use crate::proto::hdfs::AccessModeProto; use crate::proto::hdfs::BlockTokenSecretProto; use crate::proto::hdfs::StorageTypeProto; +use crate::HdfsError; use crate::Result; const HADOOP_USER_NAME: &str = "HADOOP_USER_NAME"; @@ -123,6 +125,48 @@ impl BlockTokenIdentifier { } } +#[derive(Debug)] +#[allow(dead_code)] +struct TokenIdentifier { + owner: String, + renewer: String, + real_user: String, + issue_date: i64, + max_date: i64, + sequence_number: i32, + master_key_id: i32, +} + +impl TryFrom> for TokenIdentifier { + type Error = HdfsError; + + fn try_from(value: Vec) -> std::result::Result { + let mut buf = Bytes::from(value); + let version = buf.get_u8(); + if version != 0 { + panic!(); + } + + let owner = parse_vint_string(&mut buf)?; + let renewer = parse_vint_string(&mut buf)?; + let real_user = parse_vint_string(&mut buf)?; + let issue_date = parse_vlong(&mut buf); + let max_date = parse_vlong(&mut buf); + let sequence_number = parse_vint(&mut buf); + let master_key_id = parse_vint(&mut buf); + + Ok(TokenIdentifier { + owner, + renewer, + real_user, + issue_date, + max_date, + sequence_number, + master_key_id, + }) + } +} + #[derive(Debug)] #[allow(dead_code)] pub struct Token { @@ -320,7 +364,13 @@ impl User { pub(crate) fn get_token(&self, kind: &str, service: &str) -> Option<&Token> { self.tokens .iter() - .find(|t| t.kind == kind && t.service == service) + .filter(|t| t.kind == kind && t.service == service) + .find(|t| { + // Ignore any tokens that are set to expire in the next 60 seconds + let token_identifier: TokenIdentifier = t.identifier.clone().try_into().unwrap(); + debug!("Token Identifier: {:?}", token_identifier); + token_identifier.max_date > Utc::now().timestamp_millis() + 60000 + }) } pub(crate) fn get_user_info_from_principal(principal: &str) -> UserInfo { @@ -385,7 +435,9 @@ mod tests { assert_eq!(tokens.len(), 1); assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN"); assert_eq!(tokens[0].service, "127.0.0.1:9000"); - tokens.iter().for_each(|t| println!("{:?}", t)); + + let token_identifier: TokenIdentifier = tokens[0].identifier.clone().try_into().unwrap(); + assert_eq!(token_identifier.max_date, 1690672432660) } #[test] @@ -413,11 +465,13 @@ mod tests { assert_eq!(tokens.len(), 1); assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN"); assert_eq!(tokens[0].service, "127.0.0.1:9000"); - tokens.iter().for_each(|t| println!("{:?}", t)); + + let token_identifier: TokenIdentifier = tokens[0].identifier.clone().try_into().unwrap(); + assert_eq!(token_identifier.max_date, 1686955057021) } #[test] - fn test_load_token_identifier() { + fn test_load_block_token_identifier() { let token = [ 138u8, 1, 142, 89, 190, 30, 189, 140, 100, 197, 210, 104, 0, 0, 0, 4, 104, 100, 102, 115, 0, 0, 0, 40, 66, 80, 45, 57, 55, 51, 52, 55, 55, 51, 54, 48, 45, 49, 57, 50, 46, @@ -433,7 +487,6 @@ mod tests { ]; let token_identifier = BlockTokenIdentifier::from_identifier(&token).unwrap(); - println!("{:?}", token_identifier); assert_eq!(token_identifier.user_id, "hdfs"); assert_eq!( token_identifier.block_pool_id,