diff --git a/.github/workflows/rust-test.yml b/.github/workflows/rust-test.yml
index 09f2a06..87547f3 100644
--- a/.github/workflows/rust-test.yml
+++ b/.github/workflows/rust-test.yml
@@ -47,8 +47,8 @@ jobs:
       - name: Install native libs
         run: sudo apt-get install -y libkrb5-dev libgsasl-dev
 
-      # - name: build and lint with clippy
-      #   run: cargo clippy --tests
+      - name: build and lint with clippy
+        run: cargo clippy --tests --features kerberos,token,integration-test,rs
 
       - name: Check docs
         run: cargo doc
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 270db84..332454a 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -87,7 +87,7 @@ impl MountTable {
         let path = Path::new(src);
         for link in self.mounts.iter() {
             if let Some(resolved) = link.resolve(path) {
-                return (&link, resolved.to_string_lossy().into());
+                return (link, resolved.to_string_lossy().into());
             }
         }
         (
@@ -112,19 +112,7 @@ impl Client {
     /// host is treated as a name service that will be resolved using the HDFS config.
     pub fn new(url: &str) -> Result<Self> {
         let parsed_url = Url::parse(url)?;
-        Ok(Self::with_config(&parsed_url, Configuration::new()?)?)
-    }
-
-    /// Creates a new HDFS Client based on the fs.defaultFs setting.
-    pub fn default() -> Result<Self> {
-        let config = Configuration::new()?;
-        let url = config
-            .get(config::DEFAULT_FS)
-            .ok_or(HdfsError::InvalidArgument(format!(
-                "No {} setting found",
-                config::DEFAULT_FS
-            )))?;
-        Ok(Self::with_config(&Url::parse(&url)?, config)?)
+        Self::with_config(&parsed_url, Configuration::new()?)
     }
 
     fn with_config(url: &Url, config: Configuration) -> Result<Self> {
@@ -170,7 +158,7 @@ impl Client {
                 "Only hdfs mounts are supported for viewfs".to_string(),
             ));
         }
-        let proxy = NameServiceProxy::new(&url, &config);
+        let proxy = NameServiceProxy::new(&url, config);
         let protocol = Arc::new(NamenodeProtocol::new(proxy));
 
         if let Some(prefix) = viewfs_path {
@@ -202,7 +190,7 @@ impl Client {
     pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
         let (link, resolved_path) = self.mount_table.resolve(path);
         match link.protocol.get_file_info(&resolved_path).await?.fs {
-            Some(status) => Ok(FileStatus::from(status, &path)),
+            Some(status) => Ok(FileStatus::from(status, path)),
             None => Err(HdfsError::FileNotFound(path.to_string())),
         }
     }
@@ -340,6 +328,26 @@ impl Client {
     }
 }
 
+impl Default for Client {
+    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
+    /// no defaultFS is defined, or the defaultFS is invalid.
+    fn default() -> Self {
+        let config = Configuration::new().expect("Failed to load configuration");
+        let url = config
+            .get(config::DEFAULT_FS)
+            .ok_or(HdfsError::InvalidArgument(format!(
+                "No {} setting found",
+                config::DEFAULT_FS
+            )))
+            .expect("No fs.defaultFS config defined");
+        Self::with_config(
+            &Url::parse(&url).expect("Failed to parse fs.defaultFS"),
+            config,
+        )
+        .expect("Failed to create default client")
+    }
+}
+
 pub(crate) struct DirListingIterator {
     path: String,
     resolved_path: String,
@@ -386,14 +394,14 @@ impl DirListingIterator {
                 .into_iter()
                 .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
                 .collect();
-            Ok(self.partial_listing.len() > 0)
+            Ok(!self.partial_listing.is_empty())
         } else {
             Err(HdfsError::FileNotFound(self.path.clone()))
         }
     }
 
     pub async fn next(&mut self) -> Option<Result<FileStatus>> {
-        if self.partial_listing.len() == 0 && self.remaining > 0 {
+        if self.partial_listing.is_empty() && self.remaining > 0 {
             if let Err(error) = self.get_next_batch().await {
                 self.remaining = 0;
                 return Some(Err(error));
@@ -482,7 +490,7 @@ impl FileStatus {
     fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
         let mut path = PathBuf::from(base_path);
         if let Ok(relative_path) = std::str::from_utf8(&value.path) {
-            if relative_path.len() > 0 {
+            if !relative_path.is_empty() {
                 path.push(relative_path)
             }
         }
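As a usage note on the `Default` change above: construction is now infallible-but-panicking via `Default`, while `Client::new` remains the fallible path. A minimal sketch (the URL and function names are illustrative, not from this diff):

```rust
use hdfs_native::{Client, Result};

// Fallible path: explicit URL, returns Err on a bad URL or bad config.
fn client_from_url() -> Result<Client> {
    Client::new("hdfs://ns0")
}

// Infallible path: resolves fs.defaultFS from core-site.xml/hdfs-site.xml,
// panicking if the config fails to load, defines no fs.defaultFS, or
// contains an unparseable URL (per the Default impl above).
fn client_from_config() -> Client {
    Client::default()
}
```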
diff --git a/rust/src/common/config.rs b/rust/src/common/config.rs
index 46476a0..9895bc2 100644
--- a/rust/src/common/config.rs
+++ b/rust/src/common/config.rs
@@ -25,20 +25,17 @@ impl Configuration {
     pub fn new() -> io::Result<Self> {
         let mut map: HashMap<String, String> = HashMap::new();
 
-        match Self::get_conf_dir() {
-            Some(conf_dir) => {
-                for file in ["core-site.xml", "hdfs-site.xml"] {
-                    let config_path = conf_dir.join(file);
-                    if config_path.as_path().exists() {
-                        Self::read_from_file(config_path.as_path())?
-                            .into_iter()
-                            .for_each(|(key, value)| {
-                                map.insert(key, value);
-                            })
-                    }
+        if let Some(conf_dir) = Self::get_conf_dir() {
+            for file in ["core-site.xml", "hdfs-site.xml"] {
+                let config_path = conf_dir.join(file);
+                if config_path.as_path().exists() {
+                    Self::read_from_file(config_path.as_path())?
+                        .into_iter()
+                        .for_each(|(key, value)| {
+                            map.insert(key, value);
+                        })
                 }
             }
-            None => (),
         }
 
         Ok(Configuration { map })
@@ -46,7 +43,7 @@ impl Configuration {
 
     /// Get a value from the config, returning None if the key wasn't defined.
     pub fn get(&self, key: &str) -> Option<String> {
-        self.map.get(key).map(|v| v.clone())
+        self.map.get(key).cloned()
     }
 
     pub(crate) fn get_urls_for_nameservice(&self, nameservice: &str) -> Vec<String> {
@@ -54,7 +51,7 @@
             .get(&format!("{}.{}", HA_NAMENODES_PREFIX, nameservice))
             .into_iter()
             .flat_map(|namenodes| {
-                namenodes.split(",").flat_map(|namenode_id| {
+                namenodes.split(',').flat_map(|namenode_id| {
                     self.map
                         .get(&format!(
                             "{}.{}.{}",
diff --git a/rust/src/file.rs b/rust/src/file.rs
index e07aa02..c4b10de 100644
--- a/rust/src/file.rs
+++ b/rust/src/file.rs
@@ -51,8 +51,7 @@ impl FileReader {
         } else {
             let offset = self.position;
             self.position = usize::min(self.position + len, self.file_length());
-            self.read_range(offset, self.position - offset as usize)
-                .await
+            self.read_range(offset, self.position - offset).await
         }
     }
 
@@ -174,12 +173,12 @@ impl FileWriter {
         )
         .await?;
 
-        Ok(BlockWriter::new(
+        BlockWriter::new(
             new_block.block,
             self.status.blocksize() as usize,
             self.server_defaults.clone(),
         )
-        .await?)
+        .await
     }
 
     async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
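One behavior the rewritten loop preserves is worth spelling out: files are read in order, and `HashMap::insert` replaces existing entries, so a key defined in `hdfs-site.xml` overrides the same key from `core-site.xml`. A tiny sketch of that merge semantics (keys and values illustrative):

```rust
use std::collections::HashMap;

fn main() {
    let mut map: HashMap<String, String> = HashMap::new();
    // core-site.xml is read first...
    map.insert("fs.defaultFS".into(), "hdfs://old-ns".into());
    // ...then hdfs-site.xml; insert() replaces the earlier value.
    map.insert("fs.defaultFS".into(), "hdfs://ns0".into());
    assert_eq!(map["fs.defaultFS"], "hdfs://ns0");
}
```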
diff --git a/rust/src/hdfs/connection.rs b/rust/src/hdfs/connection.rs
index e5e264c..059d5d5 100644
--- a/rust/src/hdfs/connection.rs
+++ b/rust/src/hdfs/connection.rs
@@ -28,7 +28,7 @@ use crate::security::sasl::{SaslReader, SaslRpcClient, SaslWriter};
 use crate::security::user::UserInfo;
 use crate::{HdfsError, Result};
 
-const PROTOCOL: &'static str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
 const DATA_TRANSFER_VERSION: u16 = 28;
 const MAX_PACKET_HEADER_SIZE: usize = 33;
 
@@ -160,30 +160,34 @@ impl RpcConnection {
         call_id: i32,
         retry_count: i32,
     ) -> common::RpcRequestHeaderProto {
-        let mut request_header = common::RpcRequestHeaderProto::default();
-        request_header.rpc_kind = Some(common::RpcKindProto::RpcProtocolBuffer as i32);
-        // RPC_FINAL_PACKET
-        request_header.rpc_op = Some(0);
-        request_header.call_id = call_id;
-        request_header.client_id = self.client_id.clone();
-        request_header.retry_count = Some(retry_count);
-        request_header.state_id = Some(self.alignment_context.state_id.load(Ordering::SeqCst));
-        request_header.router_federated_state = self
-            .alignment_context
-            .router_federated_state
-            .as_ref()
-            .map(|state| state.lock().unwrap().clone());
-        request_header
+        common::RpcRequestHeaderProto {
+            rpc_kind: Some(common::RpcKindProto::RpcProtocolBuffer as i32),
+            // RPC_FINAL_PACKET
+            rpc_op: Some(0),
+            call_id,
+            client_id: self.client_id.clone(),
+            retry_count: Some(retry_count),
+            state_id: Some(self.alignment_context.state_id.load(Ordering::SeqCst)),
+            router_federated_state: self
+                .alignment_context
+                .router_federated_state
+                .as_ref()
+                .map(|state| state.lock().unwrap().clone()),
+            ..Default::default()
+        }
     }
 
     fn get_connection_context(&self) -> common::IpcConnectionContextProto {
-        let mut context = common::IpcConnectionContextProto::default();
-        context.protocol = Some(PROTOCOL.to_string());
+        let user_info = common::UserInformationProto {
+            effective_user: self.user_info.effective_user.clone(),
+            real_user: self.user_info.real_user.clone(),
+        };
+
+        let context = common::IpcConnectionContextProto {
+            protocol: Some(PROTOCOL.to_string()),
+            user_info: Some(user_info),
+        };
 
-        let mut user_info = common::UserInformationProto::default();
-        user_info.effective_user = self.user_info.effective_user.clone();
-        user_info.real_user = self.user_info.real_user.clone();
-        context.user_info = Some(user_info);
         debug!("Connection context: {:?}", context);
         context
     }
@@ -220,11 +224,11 @@ impl RpcConnection {
 
         let conn_header_buf = conn_header.encode_length_delimited_to_vec();
 
-        let mut msg_header = common::RequestHeaderProto::default();
-        msg_header.method_name = method_name.to_string();
-        msg_header.declaring_class_protocol_name = PROTOCOL.to_string();
-        msg_header.client_protocol_version = 1;
-
+        let msg_header = common::RequestHeaderProto {
+            method_name: method_name.to_string(),
+            declaring_class_protocol_name: PROTOCOL.to_string(),
+            client_protocol_version: 1,
+        };
         debug!("RPC request header: {:?}", msg_header);
 
         let header_buf = msg_header.encode_length_delimited_to_vec();
@@ -370,9 +374,11 @@ impl Packet {
         bytes_per_checksum: u32,
         max_packet_size: u32,
     ) -> Self {
-        let mut header = hdfs::PacketHeaderProto::default();
-        header.offset_in_block = offset;
-        header.seqno = seqno;
+        let header = hdfs::PacketHeaderProto {
+            offset_in_block: offset,
+            seqno,
+            ..Default::default()
+        };
 
         let num_chunks = Self::max_packet_chunks(bytes_per_checksum, max_packet_size);
@@ -394,8 +400,8 @@ impl Packet {
     fn max_packet_chunks(bytes_per_checksum: u32, max_packet_size: u32) -> usize {
         let data_size = max_packet_size as usize - MAX_PACKET_HEADER_SIZE;
         let chunk_size = bytes_per_checksum as usize + CHECKSUM_BYTES;
-        let chunks = data_size / chunk_size;
-        chunks
+
+        data_size / chunk_size
     }
 
     pub(crate) fn write(&mut self, buf: &mut Bytes) {
@@ -470,7 +476,7 @@ pub(crate) struct DatanodeConnection {
 
 impl DatanodeConnection {
     pub(crate) async fn connect(url: &str) -> Result<Self> {
-        let stream = connect(&url).await?;
+        let stream = connect(url).await?;
 
         let (reader, writer) = stream.into_split();
 
@@ -499,14 +505,16 @@ impl DatanodeConnection {
         block: &hdfs::ExtendedBlockProto,
         token: Option<common::TokenProto>,
     ) -> hdfs::ClientOperationHeaderProto {
-        let mut base_header = hdfs::BaseHeaderProto::default();
-        base_header.block = block.clone();
-        base_header.token = token;
+        let base_header = hdfs::BaseHeaderProto {
+            block: block.clone(),
+            token,
+            ..Default::default()
+        };
 
-        let mut header = hdfs::ClientOperationHeaderProto::default();
-        header.base_header = base_header;
-        header.client_name = self.client_name.clone();
-        header
+        hdfs::ClientOperationHeaderProto {
+            base_header,
+            client_name: self.client_name.clone(),
+        }
     }
 
     pub(crate) async fn read_block_op_response(&mut self) -> Result<hdfs::BlockOpResponseProto> {
@@ -624,13 +632,10 @@ mod test {
 
     #[test]
     fn test_max_packet_header_size() {
         // Create a dummy header to get its size
-        let mut header = hdfs::PacketHeaderProto::default();
-        header.offset_in_block = 0;
-        header.seqno = 0;
-        header.data_len = 0;
-        header.last_packet_in_block = false;
-        header.sync_block = Some(false);
-
+        let header = hdfs::PacketHeaderProto {
+            sync_block: Some(false),
+            ..Default::default()
+        };
         // Add 4 bytes for size of whole packet and 2 bytes for size of header
         assert_eq!(MAX_PACKET_HEADER_SIZE, header.encoded_len() + 4 + 2);
     }
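For reference, the packet-sizing math that the tidied `max_packet_chunks` implements, with Hadoop's usual defaults plugged in. This standalone re-statement assumes `CHECKSUM_BYTES` is 4 (one CRC32C per chunk); the diff itself doesn't show that constant's value:

```rust
const MAX_PACKET_HEADER_SIZE: usize = 33; // from the constant above
const CHECKSUM_BYTES: usize = 4; // assumption: 4-byte CRC32C per chunk

fn max_packet_chunks(bytes_per_checksum: u32, max_packet_size: u32) -> usize {
    let data_size = max_packet_size as usize - MAX_PACKET_HEADER_SIZE;
    let chunk_size = bytes_per_checksum as usize + CHECKSUM_BYTES;
    data_size / chunk_size
}

fn main() {
    // (65536 - 33) / (512 + 4) = 65503 / 516 = 126 chunks per packet
    assert_eq!(max_packet_chunks(512, 64 * 1024), 126);
}
```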
diff --git a/rust/src/hdfs/datanode.rs b/rust/src/hdfs/datanode.rs
index 07269f9..464860c 100644
--- a/rust/src/hdfs/datanode.rs
+++ b/rust/src/hdfs/datanode.rs
@@ -73,12 +73,13 @@ impl ReplicatedBlockStream {
             DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
                 .await?;
 
-        let mut message = hdfs::OpReadBlockProto::default();
-        message.header =
-            connection.build_header(&self.block.b, Some(self.block.block_token.clone()));
-        message.offset = self.offset as u64;
-        message.len = self.len as u64;
-        message.send_checksums = Some(true);
+        let message = hdfs::OpReadBlockProto {
+            header: connection.build_header(&self.block.b, Some(self.block.block_token.clone())),
+            offset: self.offset as u64,
+            len: self.len as u64,
+            send_checksums: Some(true),
+            ..Default::default()
+        };
 
         debug!("Block read op request {:?}", &message);
 
@@ -203,7 +204,7 @@ impl StripedBlockStream {
             .block
             .block_indices()
             .iter()
-            .map(|i| *i)
+            .copied()
             .zip(self.block.locs.iter())
             .collect();
 
@@ -241,7 +242,7 @@ impl StripedBlockStream {
                 .read_vertical_stripe(
                     &self.ec_schema,
                     block_index,
-                    Some(&&datanode_info),
+                    Some(datanode_info),
                     block_start,
                     block_read_len,
                 )
@@ -315,17 +316,10 @@ impl StripedBlockStream {
                 .block_indices()
                 .iter()
                 .position(|x| *x == index)
-                .unwrap() as usize];
-
-            self.read_from_datanode(
-                &datanode_info.id,
-                &block,
-                &token,
-                offset,
-                read_len,
-                &mut buf,
-            )
-            .await?;
+                .unwrap()];
+
+            self.read_from_datanode(&datanode_info.id, &block, token, offset, read_len, &mut buf)
+                .await?;
         }
 
         Ok(buf)
@@ -346,12 +340,13 @@ impl StripedBlockStream {
             DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
                 .await?;
 
-        let mut message = hdfs::OpReadBlockProto::default();
-        message.header = conn.build_header(&block, Some(token.clone()));
-        message.offset = offset as u64;
-        message.len = len as u64;
-        message.send_checksums = Some(true);
-
+        let message = hdfs::OpReadBlockProto {
+            header: conn.build_header(block, Some(token.clone())),
+            offset: offset as u64,
+            len: len as u64,
+            send_checksums: Some(true),
+            ..Default::default()
+        };
         debug!("Block read op request {:?}", &message);
 
         conn.send(Op::ReadBlock, &message).await?;
@@ -418,23 +413,24 @@ impl BlockWriter {
             DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
                 .await?;
 
-        let mut message = hdfs::OpWriteBlockProto::default();
-        message.header = connection.build_header(&block.b, Some(block.block_token.clone()));
-        message.stage =
-            hdfs::op_write_block_proto::BlockConstructionStage::PipelineSetupCreate as i32;
-        message.targets = block.locs[1..].to_vec();
-        message.pipeline_size = block.locs.len() as u32;
-        message.latest_generation_stamp = 0; //block.b.generation_stamp;
-
-        let mut checksum = hdfs::ChecksumProto::default();
-        checksum.r#type = hdfs::ChecksumTypeProto::ChecksumCrc32c as i32;
-        checksum.bytes_per_checksum = server_defaults.bytes_per_checksum;
-        message.requested_checksum = checksum;
+        let checksum = hdfs::ChecksumProto {
+            r#type: hdfs::ChecksumTypeProto::ChecksumCrc32c as i32,
+            bytes_per_checksum: server_defaults.bytes_per_checksum,
+        };
 
-        message.storage_type = Some(block.storage_types[0].clone());
-        message.target_storage_types = block.storage_types[1..].to_vec();
-        message.storage_id = Some(block.storage_i_ds[0].clone());
-        message.target_storage_ids = block.storage_i_ds[1..].to_vec();
+        let message = hdfs::OpWriteBlockProto {
+            header: connection.build_header(&block.b, Some(block.block_token.clone())),
+            stage: hdfs::op_write_block_proto::BlockConstructionStage::PipelineSetupCreate as i32,
+            targets: block.locs[1..].to_vec(),
+            pipeline_size: block.locs.len() as u32,
+            latest_generation_stamp: 0,
+            requested_checksum: checksum,
+            storage_type: Some(block.storage_types[0]),
+            target_storage_types: block.storage_types[1..].to_vec(),
+            storage_id: Some(block.storage_i_ds[0].clone()),
+            target_storage_ids: block.storage_i_ds[1..].to_vec(),
+            ..Default::default()
+        };
 
         debug!("Block write request: {:?}", &message);
 
@@ -571,7 +567,7 @@ impl BlockWriter {
                     "Status channel closed while waiting for final ack".to_string(),
                 )
             })?;
-            let _ = result?;
+            result?;
         } else {
            return Err(HdfsError::DataTransferError(
                "Block already closed".to_string(),
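The recurring refactor in this file (and in connection.rs and protocol.rs) trades field-by-field mutation of a prost-generated message for a single struct literal, using functional update syntax to fill the remaining fields. A self-contained sketch of the pattern with a stand-in type, not the real proto:

```rust
// Stand-in for a prost-generated message; real messages have many more fields.
#[derive(Debug, Default)]
struct OpReadBlock {
    offset: u64,
    len: u64,
    send_checksums: Option<bool>,
    caching_strategy: Option<u32>, // illustrative extra field
}

fn main() {
    // Before: let mut m = OpReadBlock::default(); m.offset = ...; m.len = ...;
    // After: one expression; `..Default::default()` covers everything else.
    let message = OpReadBlock {
        offset: 0,
        len: 4096,
        send_checksums: Some(true),
        ..Default::default()
    };
    println!("{:?}", message);
}
```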
diff --git a/rust/src/hdfs/ec.rs b/rust/src/hdfs/ec.rs
index c7f1bf3..c574158 100644
--- a/rust/src/hdfs/ec.rs
+++ b/rust/src/hdfs/ec.rs
@@ -50,12 +50,12 @@ impl EcSchema {
 
         let remaining_block_bytes = block_size - full_row_bytes;
 
-        let bytes_in_last_row: usize = if remaining_block_bytes < index as usize * self.cell_size {
+        let bytes_in_last_row: usize = if remaining_block_bytes < index * self.cell_size {
             0
-        } else if remaining_block_bytes > (index + 1) as usize * self.cell_size {
+        } else if remaining_block_bytes > (index + 1) * self.cell_size {
             self.cell_size
         } else {
-            remaining_block_bytes - index as usize * self.cell_size
+            remaining_block_bytes - index * self.cell_size
         };
         full_rows * self.cell_size + bytes_in_last_row
     }
@@ -88,14 +88,8 @@ impl EcSchema {
         }
 
         while vertical_stripes[0].as_ref().is_some_and(|b| !b.is_empty()) {
-            for index in 0..self.data_units {
-                cells.push(
-                    vertical_stripes[index as usize]
-                        .as_mut()
-                        .unwrap()
-                        .split_to(self.cell_size)
-                        .freeze(),
-                )
+            for stripe in vertical_stripes.iter_mut().take(self.data_units) {
+                cells.push(stripe.as_mut().unwrap().split_to(self.cell_size).freeze())
             }
         }
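A worked example of the vertical-stripe arithmetic the first hunk touches. The `full_rows`/`full_row_bytes` derivation below is inferred from context (the hunk only shows `remaining_block_bytes` onward), and RS striping over 6 data units with 1 MiB cells is just an illustrative choice:

```rust
const MIB: usize = 1024 * 1024;

// Bytes stored in internal block `index` of a block group holding
// `block_size` data bytes, striped cell-by-cell across `data_units` blocks.
fn vertical_stripe_len(block_size: usize, index: usize, cell_size: usize, data_units: usize) -> usize {
    let full_rows = block_size / (cell_size * data_units); // inferred, not in the hunk
    let full_row_bytes = full_rows * cell_size * data_units;
    let remaining_block_bytes = block_size - full_row_bytes;
    let bytes_in_last_row = if remaining_block_bytes < index * cell_size {
        0
    } else if remaining_block_bytes > (index + 1) * cell_size {
        cell_size
    } else {
        remaining_block_bytes - index * cell_size
    };
    full_rows * cell_size + bytes_in_last_row
}

fn main() {
    // 13 MiB over 6 data units: two full rows (12 MiB) everywhere, and the
    // final 1 MiB lands entirely in the first cell of the last row.
    assert_eq!(vertical_stripe_len(13 * MIB, 0, MIB, 6), 3 * MIB);
    assert_eq!(vertical_stripe_len(13 * MIB, 1, MIB, 6), 2 * MIB);
}
```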
diff --git a/rust/src/hdfs/protocol.rs b/rust/src/hdfs/protocol.rs
index eb914fc..58ff676 100644
--- a/rust/src/hdfs/protocol.rs
+++ b/rust/src/hdfs/protocol.rs
@@ -15,16 +15,14 @@ pub(crate) struct NamenodeProtocol {
 
 impl NamenodeProtocol {
     pub(crate) fn new(proxy: NameServiceProxy) -> Self {
-        let client_name = format!(
-            "hdfs_native_client-{}",
-            Uuid::new_v4().as_hyphenated().to_string()
-        );
+        let client_name = format!("hdfs_native_client-{}", Uuid::new_v4().as_hyphenated());
         NamenodeProtocol { proxy, client_name }
     }
 
     pub(crate) async fn get_file_info(&self, src: &str) -> Result<hdfs::GetFileInfoResponseProto> {
-        let mut message = hdfs::GetFileInfoRequestProto::default();
-        message.src = src.to_string();
+        let message = hdfs::GetFileInfoRequestProto {
+            src: src.to_string(),
+        };
         debug!("get_file_info request: {:?}", &message);
 
         let response = self
@@ -44,10 +42,11 @@ impl NamenodeProtocol {
         start_after: Vec<u8>,
         need_location: bool,
     ) -> Result<hdfs::GetListingResponseProto> {
-        let mut message = hdfs::GetListingRequestProto::default();
-        message.src = src.to_string();
-        message.start_after = start_after;
-        message.need_location = need_location;
+        let message = hdfs::GetListingRequestProto {
+            src: src.to_string(),
+            start_after,
+            need_location,
+        };
         debug!("get_listing request: {:?}", &message);
 
         let response = self
@@ -64,9 +63,10 @@ impl NamenodeProtocol {
         &self,
         src: &str,
     ) -> Result<hdfs::GetLocatedFileInfoResponseProto> {
-        let mut message = hdfs::GetLocatedFileInfoRequestProto::default();
-        message.src = Some(src.to_string());
-        message.need_block_token = Some(true);
+        let message = hdfs::GetLocatedFileInfoRequestProto {
+            src: Some(src.to_string()),
+            need_block_token: Some(true),
+        };
         debug!("getLocatedFileInfo request: {:?}", &message);
 
         let response = self
@@ -107,21 +107,22 @@ impl NamenodeProtocol {
         replication: u32,
         block_size: u64,
     ) -> Result<hdfs::CreateResponseProto> {
-        let mut masked = hdfs::FsPermissionProto::default();
-        masked.perm = permission;
-
-        let mut message = hdfs::CreateRequestProto::default();
-        message.src = src.to_string();
-        message.masked = masked;
-        message.client_name = self.client_name.clone();
-        if overwrite {
-            message.create_flag = hdfs::CreateFlagProto::Overwrite as u32;
-        } else {
-            message.create_flag = hdfs::CreateFlagProto::Create as u32;
-        }
-        message.create_parent = create_parent;
-        message.replication = replication;
-        message.block_size = block_size;
+        let masked = hdfs::FsPermissionProto { perm: permission };
+
+        let message = hdfs::CreateRequestProto {
+            src: src.to_string(),
+            masked,
+            client_name: self.client_name.clone(),
+            create_parent,
+            replication,
+            block_size,
+            create_flag: if overwrite {
+                hdfs::CreateFlagProto::Overwrite
+            } else {
+                hdfs::CreateFlagProto::Create
+            } as u32,
+            ..Default::default()
+        };
 
         debug!("create request: {:?}", &message);
 
@@ -141,11 +142,13 @@ impl NamenodeProtocol {
         previous: Option<hdfs::ExtendedBlockProto>,
         file_id: Option<u64>,
     ) -> Result<hdfs::AddBlockResponseProto> {
-        let mut message = hdfs::AddBlockRequestProto::default();
-        message.src = src.to_string();
-        message.client_name = self.client_name.clone();
-        message.previous = previous;
-        message.file_id = file_id;
+        let message = hdfs::AddBlockRequestProto {
+            src: src.to_string(),
+            client_name: self.client_name.clone(),
+            previous,
+            file_id,
+            ..Default::default()
+        };
 
         debug!("add_block request: {:?}", &message);
 
@@ -165,12 +168,12 @@ impl NamenodeProtocol {
         last: Option<hdfs::ExtendedBlockProto>,
         file_id: Option<u64>,
     ) -> Result<hdfs::CompleteResponseProto> {
-        let mut message = hdfs::CompleteRequestProto::default();
-        message.src = src.to_string();
-        message.client_name = self.client_name.clone();
-        message.last = last;
-        message.file_id = file_id;
-
+        let message = hdfs::CompleteRequestProto {
+            src: src.to_string(),
+            client_name: self.client_name.clone(),
+            last,
+            file_id,
+        };
         debug!("complete request: {:?}", &message);
 
         let response = self
@@ -189,14 +192,14 @@ impl NamenodeProtocol {
         permission: u32,
         create_parent: bool,
     ) -> Result<hdfs::MkdirsResponseProto> {
-        let mut masked = hdfs::FsPermissionProto::default();
-        masked.perm = permission;
-
-        let mut message = hdfs::MkdirsRequestProto::default();
-        message.src = src.to_string();
-        message.masked = masked;
-        message.create_parent = create_parent;
-
+        let masked = hdfs::FsPermissionProto { perm: permission };
+
+        let message = hdfs::MkdirsRequestProto {
+            src: src.to_string(),
+            masked,
+            create_parent,
+            ..Default::default()
+        };
         debug!("mkdirs request: {:?}", &message);
 
         let response = self
@@ -215,11 +218,12 @@ impl NamenodeProtocol {
         dst: &str,
         overwrite: bool,
     ) -> Result<hdfs::Rename2ResponseProto> {
-        let mut message = hdfs::Rename2RequestProto::default();
-        message.src = src.to_string();
-        message.dst = dst.to_string();
-        message.overwrite_dest = overwrite;
-
+        let message = hdfs::Rename2RequestProto {
+            src: src.to_string(),
+            dst: dst.to_string(),
+            overwrite_dest: overwrite,
+            ..Default::default()
+        };
         debug!("rename request: {:?}", &message);
 
         let response = self
@@ -237,9 +241,10 @@ impl NamenodeProtocol {
         src: &str,
         recursive: bool,
     ) -> Result<hdfs::DeleteResponseProto> {
-        let mut message = hdfs::DeleteRequestProto::default();
-        message.src = src.to_string();
-        message.recursive = recursive;
+        let message = hdfs::DeleteRequestProto {
+            src: src.to_string(),
+            recursive,
+        };
         debug!("delete request: {:?}", &message);
 
         let response = self
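One subtlety in the `create` rewrite above: both arms of the `if`/`else` evaluate to the same enum, so a single trailing `as u32` casts whichever variant was selected. A stand-in sketch (the discriminants mirror Hadoop's CreateFlagProto values of 1 and 2; the types here are illustrative, not the generated protos):

```rust
enum CreateFlag {
    Create = 0x01,
    Overwrite = 0x02,
}

struct CreateRequest {
    create_flag: u32,
}

fn build_request(overwrite: bool) -> CreateRequest {
    CreateRequest {
        // The cast applies to the whole if/else expression.
        create_flag: if overwrite {
            CreateFlag::Overwrite
        } else {
            CreateFlag::Create
        } as u32,
    }
}

fn main() {
    assert_eq!(build_request(false).create_flag, 0x01);
    assert_eq!(build_request(true).create_flag, 0x02);
}
```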
diff --git a/rust/src/hdfs/proxy.rs b/rust/src/hdfs/proxy.rs
index 858cadb..a8763b5 100644
--- a/rust/src/hdfs/proxy.rs
+++ b/rust/src/hdfs/proxy.rs
@@ -50,7 +50,7 @@ impl ProxyConnection {
                 RpcConnection::connect(
                     &self.url,
                     self.alignment_context.clone(),
-                    self.nameservice.as_ref().map(|ns| ns.as_str()),
+                    self.nameservice.as_deref(),
                 )
                 .await?,
             );
@@ -135,7 +135,7 @@ impl NameServiceProxy {
             let result = self.proxy_connections[proxy_index]
                 .lock()
                 .await
-                .call(&method_name, &message)
+                .call(method_name, &message)
                 .await;
 
             match result {
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 6d3ace8..88d688d 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -1,3 +1,4 @@
+// #![warn(missing_docs)]
 //! Native HDFS client implementation in Rust
 //!
 //! # Usage
diff --git a/rust/src/minidfs.rs b/rust/src/minidfs.rs
index b38beda..88c7fbf 100644
--- a/rust/src/minidfs.rs
+++ b/rust/src/minidfs.rs
@@ -75,7 +75,7 @@ impl MiniDfs {
         if ready != "Ready!" {
             println!("Failed to start minidfs");
             println!("{}", ready);
-            while let Some(line) = output.next() {
+            for line in output.by_ref() {
                 println!("{}", line.unwrap());
             }
             panic!();
@@ -100,7 +100,7 @@ impl MiniDfs {
         env::set_var("KRB5_CONFIG", &krb_conf);
         env::set_var(
             "HADOOP_OPTS",
-            &format!("-Djava.security.krb5.conf={}", &krb_conf),
+            format!("-Djava.security.krb5.conf={}", &krb_conf),
         );
 
         // If we testing token auth, set the path to the file and make sure we don't have an old kinit, otherwise kinit
diff --git a/rust/src/proto/mod.rs b/rust/src/proto/mod.rs
index 37a06f7..c438f06 100644
--- a/rust/src/proto/mod.rs
+++ b/rust/src/proto/mod.rs
@@ -1,3 +1,4 @@
+#[allow(clippy::all)]
 pub mod common {
     #[cfg(feature = "generate-protobuf")]
     include!(concat!(env!("OUT_DIR"), "/hadoop.common.rs"));
@@ -5,6 +6,7 @@ pub mod common {
     include!(concat!("hadoop.common.rs"));
 }
 
+#[allow(clippy::all)]
 pub mod hdfs {
     #[cfg(feature = "generate-protobuf")]
     include!(concat!(env!("OUT_DIR"), "/hadoop.hdfs.rs"));
diff --git a/rust/src/security/gssapi.rs b/rust/src/security/gssapi.rs
index 26ae7c7..f3a5df7 100644
--- a/rust/src/security/gssapi.rs
+++ b/rust/src/security/gssapi.rs
@@ -11,7 +11,7 @@ use super::user::User;
 
 #[repr(u8)]
 enum SecurityLayer {
-    NoSecurityLayer = 1,
+    None = 1,
     Integrity = 2,
     Confidentiality = 4,
 }
@@ -103,11 +103,8 @@ impl SaslSession for GssapiSession {
                 [SecurityLayer::Integrity as u8, 0xFF, 0xFF, 0xFF],
                 Some(false),
             )
-        } else if supported_sec & SecurityLayer::NoSecurityLayer as u8 > 0 {
-            (
-                [SecurityLayer::NoSecurityLayer as u8, 0x00, 0x00, 0x00],
-                None,
-            )
+        } else if supported_sec & SecurityLayer::None as u8 > 0 {
+            ([SecurityLayer::None as u8, 0x00, 0x00, 0x00], None)
         } else {
             return Err(HdfsError::SASLError(
                 "No supported security layer found".to_string(),
diff --git a/rust/src/security/sasl.rs b/rust/src/security/sasl.rs
index 087f863..0eaf55a 100644
--- a/rust/src/security/sasl.rs
+++ b/rust/src/security/sasl.rs
@@ -35,16 +35,16 @@ const SASL_CALL_ID: i32 = -33;
 const HDFS_DELEGATION_TOKEN: &str = "HDFS_DELEGATION_TOKEN";
 
 pub(crate) enum AuthMethod {
-    SIMPLE,
-    KERBEROS,
-    TOKEN,
+    Simple,
+    Kerberos,
+    Token,
 }
 
 impl AuthMethod {
     fn parse(method: &str) -> Option<Self> {
         match method {
-            "SIMPLE" => Some(Self::SIMPLE),
-            "KERBEROS" => Some(Self::KERBEROS),
-            "TOKEN" => Some(Self::TOKEN),
+            "SIMPLE" => Some(Self::Simple),
+            "KERBEROS" => Some(Self::Kerberos),
+            "TOKEN" => Some(Self::Token),
             _ => None,
         }
     }
@@ -81,8 +81,10 @@ impl SaslRpcClient {
     /// Service should be the connection host:port for a single NameNode connection, or the
     /// name service name when connecting to HA NameNodes.
     pub(crate) async fn negotiate(&mut self, service: &str) -> Result<UserInfo> {
-        let mut rpc_sasl = RpcSaslProto::default();
-        rpc_sasl.state = SaslState::Negotiate as i32;
+        let rpc_sasl = RpcSaslProto {
+            state: SaslState::Negotiate as i32,
+            ..Default::default()
+        };
 
         self.writer.send_sasl_message(&rpc_sasl).await?;
 
@@ -115,10 +117,12 @@ impl SaslRpcClient {
                     // Response shouldn't contain the challenge
                     selected_auth.challenge = None;
 
-                    let mut r = RpcSaslProto::default();
-                    r.state = SaslState::Initiate as i32;
-                    r.auths = Vec::from([selected_auth]);
-                    r.token = token.or(Some(Vec::new()));
+                    let r = RpcSaslProto {
+                        state: SaslState::Initiate as i32,
+                        auths: Vec::from([selected_auth]),
+                        token: token.or(Some(Vec::new())),
+                        ..Default::default()
+                    };
                     response = Some(r);
                 }
                 SaslState::Challenge => {
@@ -127,9 +131,11 @@ impl SaslRpcClient {
                         .unwrap()
                         .step(message.token.as_ref().map(|t| &t[..]))?;
 
-                    let mut r = RpcSaslProto::default();
-                    r.state = SaslState::Response as i32;
-                    r.token = Some(token);
+                    let r = RpcSaslProto {
+                        state: SaslState::Response as i32,
+                        token: Some(token),
+                        ..Default::default()
+                    };
                     response = Some(r);
                 }
                 SaslState::Success => {
@@ -169,7 +175,7 @@ impl SaslRpcClient {
 
     fn select_method(
         &mut self,
-        auths: &Vec<SaslAuth>,
+        auths: &[SaslAuth],
         service: &str,
    ) -> Result<(SaslAuth, Option<Box<dyn SaslSession>>)> {
         let user = User::get();
@@ -178,16 +184,16 @@ impl SaslRpcClient {
                 AuthMethod::parse(&auth.method),
                 user.get_token(HDFS_DELEGATION_TOKEN, service),
             ) {
-                (Some(AuthMethod::SIMPLE), _) => {
+                (Some(AuthMethod::Simple), _) => {
                     return Ok((auth.clone(), None));
                 }
                 #[cfg(feature = "kerberos")]
-                (Some(AuthMethod::KERBEROS), _) => {
+                (Some(AuthMethod::Kerberos), _) => {
                     let session = GssapiSession::new(auth.protocol(), auth.server_id())?;
                     return Ok((auth.clone(), Some(Box::new(session))));
                 }
                 #[cfg(feature = "token")]
-                (Some(AuthMethod::TOKEN), Some(token)) => {
+                (Some(AuthMethod::Token), Some(token)) => {
                     debug!("Using token {:?}", token);
                     let session =
                         GSASLSession::new(auth.protocol(), auth.server_id(), token)?;
@@ -332,14 +338,15 @@ impl SaslWriter {
     }
 
     fn create_request_header() -> RpcRequestHeaderProto {
-        let mut request_header = RpcRequestHeaderProto::default();
-        request_header.rpc_kind = Some(RpcKindProto::RpcProtocolBuffer as i32);
-        // RPC_FINAL_PACKET
-        request_header.rpc_op = Some(0);
-        request_header.call_id = SASL_CALL_ID;
-        request_header.client_id = Vec::new();
-        request_header.retry_count = Some(-1);
-        request_header
+        RpcRequestHeaderProto {
+            rpc_kind: Some(RpcKindProto::RpcProtocolBuffer as i32),
+            // RPC_FINAL_PACKET
+            rpc_op: Some(0),
+            call_id: SASL_CALL_ID,
+            client_id: Vec::new(),
+            retry_count: Some(-1),
+            ..Default::default()
+        }
     }
 
     async fn send_sasl_message(&mut self, message: &RpcSaslProto) -> io::Result<()> {
@@ -357,8 +364,10 @@ impl SaslWriter {
 
     pub(crate) async fn write(&mut self, buf: &[u8]) -> io::Result<()> {
         if self.session.is_some() {
-            let mut rpc_sasl = RpcSaslProto::default();
-            rpc_sasl.state = SaslState::Wrap as i32;
+            let mut rpc_sasl = RpcSaslProto {
+                state: SaslState::Wrap as i32,
+                ..Default::default()
+            };
 
             // let mut writer = Vec::with_capacity(buf.len()).writer();
             let encoded = self
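Context for the `SecurityLayer` rename above: the server advertises its supported SASL security layers as a bitmask in the first byte of its final token, and the client tests flags against it exactly as in `supported_sec & SecurityLayer::None as u8 > 0` (note that `as` binds tighter than `&`). A sketch of that selection; the strongest-first preference order here is illustrative:

```rust
#[repr(u8)]
#[derive(Debug)]
enum SecurityLayer {
    None = 1,
    Integrity = 2,
    Confidentiality = 4,
}

fn pick_layer(supported: u8) -> Option<SecurityLayer> {
    if supported & SecurityLayer::Confidentiality as u8 > 0 {
        Some(SecurityLayer::Confidentiality)
    } else if supported & SecurityLayer::Integrity as u8 > 0 {
        Some(SecurityLayer::Integrity)
    } else if supported & SecurityLayer::None as u8 > 0 {
        Some(SecurityLayer::None)
    } else {
        None // mirrors the "No supported security layer found" error branch
    }
}

fn main() {
    // 0b101 advertises None (1) and Confidentiality (4).
    assert!(matches!(pick_layer(0b101), Some(SecurityLayer::Confidentiality)));
    assert!(pick_layer(0).is_none());
}
```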
diff --git a/rust/src/security/user.rs b/rust/src/security/user.rs
index f16ec61..1285718 100644
--- a/rust/src/security/user.rs
+++ b/rust/src/security/user.rs
@@ -16,14 +16,6 @@ const HADOOP_PROXY_USER: &str = "HADOOP_PROXY_USER";
 const HADOOP_TOKEN_FILE_LOCATION: &str = "HADOOP_TOKEN_FILE_LOCATION";
 const TOKEN_STORAGE_MAGIC: &[u8] = "HDTS".as_bytes();
 
-#[derive(Clone)]
-#[allow(dead_code)]
-pub(crate) enum LoginMethod {
-    SIMPLE,
-    KERBEROS,
-    TOKEN,
-}
-
 #[derive(Debug)]
 pub struct Token {
     pub alias: String,
@@ -36,7 +28,7 @@ pub struct Token {
 impl Token {
     fn load_tokens() -> Vec<Token> {
         match env::var(HADOOP_TOKEN_FILE_LOCATION).map(PathBuf::from) {
-            Ok(path) if path.exists() => Self::read_token_file(path).ok().unwrap_or_else(Vec::new),
+            Ok(path) if path.exists() => Self::read_token_file(path).ok().unwrap_or_default(),
             _ => Vec::new(),
         }
     }
@@ -152,11 +144,11 @@ fn parse_vlong(reader: &mut impl Buf) -> i64 {
     let mut i = 0i64;
     for _ in 0..length - 1 {
         let b = reader.get_u8();
-        i = i << 8;
-        i = i | (b & 0xFF) as i64;
+        i <<= 8;
+        i |= b as i64;
     }
 
-    let is_negative = first_byte < -120 || (first_byte >= -112 && first_byte < 0);
+    let is_negative = first_byte < -120 || (-112..0).contains(&first_byte);
 
     if is_negative {
         i ^ -1
@@ -216,9 +208,9 @@ impl User {
     #[cfg(feature = "kerberos")]
     pub(crate) fn get_user_from_principal(principal: &str) -> String {
         // If there's a /, take the part before it.
-        if let Some(index) = principal.find("/") {
+        if let Some(index) = principal.find('/') {
             principal[0..index].to_string()
-        } else if let Some(index) = principal.find("@") {
+        } else if let Some(index) = principal.find('@') {
             principal[0..index].to_string()
         } else {
             principal.to_string()
@@ -240,7 +232,7 @@ mod tests {
         let b64_token = "SERUUwABDjEyNy4wLjAuMTo5MDAwLgAaaGRmcy9sb2NhbGhvc3RARVhBTVBMRS5DT00AAIoBiX/hghSKAYmj7gYUAQIUadF4ni3ObKqU8niv40WBFsGhFm4VSERGU19ERUxFR0FUSU9OX1RPS0VODjEyNy4wLjAuMTo5MDAwAA==";
         let mut token_file = NamedTempFile::new().unwrap();
         token_file
-            .write(
+            .write_all(
                 general_purpose::STANDARD
                     .decode(b64_token)
                     .unwrap()
@@ -268,7 +260,7 @@ mod tests {
         let b64_token = "SERUUwGBAQp/Cg5sb2NhbGhvc3Q6OTAwMBJtCi4AGmhkZnMvbG9jYWxob3N0QEVYQU1QTEUuQ09NAACKAYiiTtt9igGIxltffQECEhQoROcYNFMxMuoK9UHlAna6ZmhQSBoVSERGU19ERUxFR0FUSU9OX1RPS0VOIg4xMjcuMC4wLjE6OTAwMA==";
         let mut token_file = NamedTempFile::new().unwrap();
         token_file
-            .write(
+            .write_all(
                 general_purpose::STANDARD
                     .decode(b64_token)
                     .unwrap()
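For readers unfamiliar with the Hadoop vlong wire format that `parse_vlong` decodes: the first byte either is the value itself (-112..=127) or encodes the sign and byte count of what follows. The helper below is a re-statement for illustration, not the crate's API; the single-byte and length handling live above the hunk and are inferred from Hadoop's WritableUtils:

```rust
use bytes::Buf;

fn parse_vlong(reader: &mut impl Buf) -> i64 {
    let first_byte = reader.get_i8();
    if first_byte >= -112 {
        return first_byte as i64; // single-byte values encode themselves
    }
    // -113..=-120 marks a positive vlong, -121..=-128 a negative one;
    // `length` counts the first byte plus the magnitude bytes that follow.
    let length = if first_byte < -120 {
        -119 - first_byte as i64
    } else {
        -111 - first_byte as i64
    };
    let mut i = 0i64;
    for _ in 0..length - 1 {
        i <<= 8;
        i |= reader.get_u8() as i64;
    }
    let is_negative = first_byte < -120 || (-112..0).contains(&first_byte);
    if is_negative {
        i ^ -1
    } else {
        i
    }
}

fn main() {
    // -300 encodes as [0x86, 0x01, 0x2B]: 0x86 (-122) marks a negative vlong
    // with two magnitude bytes, and 0x012B ^ -1 == -300.
    let mut buf: &[u8] = &[0x86, 0x01, 0x2B];
    assert_eq!(parse_vlong(&mut buf), -300);
}
```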
diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs
index 5c3d4b7..9969a89 100644
--- a/rust/tests/common/mod.rs
+++ b/rust/tests/common/mod.rs
@@ -1,6 +1,7 @@
+#![allow(dead_code)]
 use std::collections::HashSet;
 use std::io::{BufWriter, Write};
-use std::process::{Command, Stdio};
+use std::process::Command;
 
 use tempfile::NamedTempFile;
 use which::which;
@@ -18,7 +19,7 @@ pub fn setup(features: &HashSet<DfsFeatures>) -> MiniDfs {
         let mut writer = BufWriter::new(file.as_file_mut());
         for i in 0..TEST_FILE_INTS as i32 {
             let bytes = i.to_be_bytes();
-            writer.write(&bytes).unwrap();
+            writer.write_all(&bytes).unwrap();
         }
         writer.flush().unwrap();
     }
diff --git a/rust/tests/test_ec.rs b/rust/tests/test_ec.rs
index bf18fe8..03505a4 100644
--- a/rust/tests/test_ec.rs
+++ b/rust/tests/test_ec.rs
@@ -23,7 +23,7 @@ mod test {
         let mut writer = BufWriter::new(file.as_file_mut());
         for i in 0..num_ints as u32 {
             let bytes = i.to_be_bytes();
-            writer.write(&bytes)?;
+            writer.write_all(&bytes)?;
         }
         writer.flush()?;
     }
@@ -63,7 +63,7 @@ mod test {
         let dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::EC, DfsFeatures::SECURITY]));
         #[cfg(not(feature = "kerberos"))]
         let dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::EC]));
-        let client = Client::default()?;
+        let client = Client::default();
 
         // Test each of Hadoop's built-in RS policies
         for (data, parity) in [(3usize, 2usize), (6, 3), (10, 4)] {
@@ -91,7 +91,7 @@ mod test {
         for faults in 0..parity {
             let _ = EC_FAULT_INJECTOR.lock().unwrap().insert(EcFaultInjection {
-                fail_blocks: (0..faults).into_iter().collect(),
+                fail_blocks: (0..faults).collect(),
             });
             let data = reader.read_range(0, reader.file_length()).await?;
             verify_read(data, file_size);
@@ -99,7 +99,7 @@ mod test {
 
         // Fail more than the number of parity shards, read should fail
         let _ = EC_FAULT_INJECTOR.lock().unwrap().insert(EcFaultInjection {
-            fail_blocks: (0..=parity).into_iter().collect(),
+            fail_blocks: (0..=parity).collect(),
         });
 
         assert!(reader.read_range(0, reader.file_length()).await.is_err());
diff --git a/rust/tests/test_integration.rs b/rust/tests/test_integration.rs
index 1bb4205..23c80c6 100644
--- a/rust/tests/test_integration.rs
+++ b/rust/tests/test_integration.rs
@@ -97,7 +97,7 @@ mod test {
         let _ = env_logger::builder().is_test(true).try_init();
         let _dfs = setup(features);
-        let client = Client::default()?;
+        let client = Client::default();
 
         test_file_info(&client).await?;
         test_listing(&client).await?;
@@ -137,9 +137,7 @@ mod test {
         }
 
         // Read a single integer from the file
-        let mut buf = reader
-            .read_range(TEST_FILE_INTS as usize / 2 * 4, 4)
-            .await?;
+        let mut buf = reader.read_range(TEST_FILE_INTS / 2 * 4, 4).await?;
         assert_eq!(buf.get_i32(), TEST_FILE_INTS as i32 / 2);
 
         // Read the whole file in 1 MiB chunks
@@ -175,7 +173,7 @@ mod test {
         assert!(client
             .list_status("/testdir", false)
             .await
-            .is_ok_and(|s| s.len() == 0));
+            .is_ok_and(|s| s.is_empty()));
 
         client.delete("/testdir", false).await?;
         assert!(client.list_status("/testdir", false).await.is_err());
diff --git a/rust/tests/test_viewfs.rs b/rust/tests/test_viewfs.rs
index 37166fd..790e42f 100644
--- a/rust/tests/test_viewfs.rs
+++ b/rust/tests/test_viewfs.rs
@@ -30,7 +30,7 @@ mod test {
         // /mount1 resolves to hdfs://ns0/nested
         // fallback resolves to hdfs://ns1/nested
         let _dfs = MiniDfs::with_features(&features);
-        let viewfs = Client::default().unwrap();
+        let viewfs = Client::default();
         let hdfs1 = Client::new("hdfs://ns0").unwrap();
         let hdfs2 = Client::new("hdfs://ns1").unwrap();