Skip to content

Commit

Permalink
Filter out expired tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman committed Nov 17, 2024
1 parent 99b689e commit a206c54
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 6 deletions.
4 changes: 3 additions & 1 deletion rust/src/security/sasl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl SaslReader {

let mut bytes = buf.freeze();
let rpc_response = RpcResponseHeaderProto::decode_length_delimited(&mut bytes)?;
debug!("{:?}", rpc_response);
debug!("RPC response: {:?}", rpc_response);

match RpcStatusProto::try_from(rpc_response.status).unwrap() {
RpcStatusProto::Error => {
Expand Down Expand Up @@ -339,6 +339,8 @@ impl SaslWriter {
}

async fn send_sasl_message(&mut self, message: &RpcSaslProto) -> io::Result<()> {
debug!("Sending SASL message {:?}", message);

let header_buf = Self::create_request_header().encode_length_delimited_to_vec();
let message_buf = message.encode_length_delimited_to_vec();
let size = (header_buf.len() + message_buf.len()) as u32;
Expand Down
63 changes: 58 additions & 5 deletions rust/src/security/user.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::{Buf, Bytes};
use chrono::Utc;
use log::debug;
use prost::Message;
use std::env;
Expand All @@ -13,6 +14,7 @@ use crate::proto::common::TokenProto;
use crate::proto::hdfs::AccessModeProto;
use crate::proto::hdfs::BlockTokenSecretProto;
use crate::proto::hdfs::StorageTypeProto;
use crate::HdfsError;
use crate::Result;

const HADOOP_USER_NAME: &str = "HADOOP_USER_NAME";
Expand Down Expand Up @@ -123,6 +125,48 @@ impl BlockTokenIdentifier {
}
}

#[derive(Debug)]
#[allow(dead_code)]
struct TokenIdentifier {
owner: String,
renewer: String,
real_user: String,
issue_date: i64,
max_date: i64,
sequence_number: i32,
master_key_id: i32,
}

impl TryFrom<Vec<u8>> for TokenIdentifier {
type Error = HdfsError;

fn try_from(value: Vec<u8>) -> std::result::Result<Self, Self::Error> {
let mut buf = Bytes::from(value);
let version = buf.get_u8();
if version != 0 {
panic!();
}

let owner = parse_vint_string(&mut buf)?;
let renewer = parse_vint_string(&mut buf)?;
let real_user = parse_vint_string(&mut buf)?;
let issue_date = parse_vlong(&mut buf);
let max_date = parse_vlong(&mut buf);
let sequence_number = parse_vint(&mut buf);
let master_key_id = parse_vint(&mut buf);

Ok(TokenIdentifier {
owner,
renewer,
real_user,
issue_date,
max_date,
sequence_number,
master_key_id,
})
}
}

#[derive(Debug)]
#[allow(dead_code)]
pub struct Token {
Expand Down Expand Up @@ -320,7 +364,13 @@ impl User {
pub(crate) fn get_token(&self, kind: &str, service: &str) -> Option<&Token> {
self.tokens
.iter()
.find(|t| t.kind == kind && t.service == service)
.filter(|t| t.kind == kind && t.service == service)
.find(|t| {
// Ignore any tokens that are set to expire in the next 60 seconds
let token_identifier: TokenIdentifier = t.identifier.clone().try_into().unwrap();
debug!("Token Identifier: {:?}", token_identifier);
token_identifier.max_date > Utc::now().timestamp_millis() + 60000
})
}

pub(crate) fn get_user_info_from_principal(principal: &str) -> UserInfo {
Expand Down Expand Up @@ -385,7 +435,9 @@ mod tests {
assert_eq!(tokens.len(), 1);
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");
assert_eq!(tokens[0].service, "127.0.0.1:9000");
tokens.iter().for_each(|t| println!("{:?}", t));

let token_identifier: TokenIdentifier = tokens[0].identifier.clone().try_into().unwrap();
assert_eq!(token_identifier.max_date, 1690672432660)
}

#[test]
Expand Down Expand Up @@ -413,11 +465,13 @@ mod tests {
assert_eq!(tokens.len(), 1);
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");
assert_eq!(tokens[0].service, "127.0.0.1:9000");
tokens.iter().for_each(|t| println!("{:?}", t));

let token_identifier: TokenIdentifier = tokens[0].identifier.clone().try_into().unwrap();
assert_eq!(token_identifier.max_date, 1686955057021)
}

#[test]
fn test_load_token_identifier() {
fn test_load_block_token_identifier() {
let token = [
138u8, 1, 142, 89, 190, 30, 189, 140, 100, 197, 210, 104, 0, 0, 0, 4, 104, 100, 102,
115, 0, 0, 0, 40, 66, 80, 45, 57, 55, 51, 52, 55, 55, 51, 54, 48, 45, 49, 57, 50, 46,
Expand All @@ -433,7 +487,6 @@ mod tests {
];

let token_identifier = BlockTokenIdentifier::from_identifier(&token).unwrap();
println!("{:?}", token_identifier);
assert_eq!(token_identifier.user_id, "hdfs");
assert_eq!(
token_identifier.block_pool_id,
Expand Down

0 comments on commit a206c54

Please sign in to comment.