Fix clippy warnings and enforce in CI #38

Merged · 13 commits · Oct 24, 2023
4 changes: 2 additions & 2 deletions .github/workflows/rust-test.yml
@@ -47,8 +47,8 @@ jobs:
- name: Install native libs
run: sudo apt-get install -y libkrb5-dev libgsasl-dev

-      # - name: build and lint with clippy
-      #   run: cargo clippy --tests
+      - name: build and lint with clippy
+        run: cargo clippy --tests --features kerberos,token,integration-test,rs

- name: Check docs
run: cargo doc
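Re-enabling this step makes every push build and lint the full feature set. As written, the step fails CI on clippy errors but lets plain warnings through; a common stricter variant (an assumption here, not something this PR adds) promotes all warnings to errors:

```yaml
# Hypothetical stricter step (not in this PR): -D warnings makes any
# clippy warning fail the build, so new lint regressions cannot merge.
- name: build and lint with clippy
  run: cargo clippy --tests --features kerberos,token,integration-test,rs -- -D warnings
```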
46 changes: 27 additions & 19 deletions rust/src/client.rs
@@ -87,7 +87,7 @@ impl MountTable {
let path = Path::new(src);
for link in self.mounts.iter() {
if let Some(resolved) = link.resolve(path) {
-                return (&link, resolved.to_string_lossy().into());
+                return (link, resolved.to_string_lossy().into());
}
}
(
@@ -112,19 +112,7 @@ impl Client {
/// host is treated as a name service that will be resolved using the HDFS config.
pub fn new(url: &str) -> Result<Self> {
let parsed_url = Url::parse(url)?;
-        Ok(Self::with_config(&parsed_url, Configuration::new()?)?)
-    }
-
-    /// Creates a new HDFS Client based on the fs.defaultFs setting.
-    pub fn default() -> Result<Self> {
-        let config = Configuration::new()?;
-        let url = config
-            .get(config::DEFAULT_FS)
-            .ok_or(HdfsError::InvalidArgument(format!(
-                "No {} setting found",
-                config::DEFAULT_FS
-            )))?;
-        Ok(Self::with_config(&Url::parse(&url)?, config)?)
+        Self::with_config(&parsed_url, Configuration::new()?)
}

fn with_config(url: &Url, config: Configuration) -> Result<Self> {
@@ -170,7 +158,7 @@ impl Client {
"Only hdfs mounts are supported for viewfs".to_string(),
));
}
-        let proxy = NameServiceProxy::new(&url, &config);
+        let proxy = NameServiceProxy::new(&url, config);
let protocol = Arc::new(NamenodeProtocol::new(proxy));

if let Some(prefix) = viewfs_path {
@@ -202,7 +190,7 @@ impl Client {
pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
let (link, resolved_path) = self.mount_table.resolve(path);
match link.protocol.get_file_info(&resolved_path).await?.fs {
-            Some(status) => Ok(FileStatus::from(status, &path)),
+            Some(status) => Ok(FileStatus::from(status, path)),
None => Err(HdfsError::FileNotFound(path.to_string())),
}
}
@@ -340,6 +328,26 @@ impl Client {
}
}

+impl Default for Client {
+    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
+    /// no defaultFS is defined, or the defaultFS is invalid.
+    fn default() -> Self {
+        let config = Configuration::new().expect("Failed to load configuration");
+        let url = config
+            .get(config::DEFAULT_FS)
+            .ok_or(HdfsError::InvalidArgument(format!(
+                "No {} setting found",
+                config::DEFAULT_FS
+            )))
+            .expect("No fs.defaultFS config defined");
+        Self::with_config(
+            &Url::parse(&url).expect("Failed to parse fs.defaultFS"),
+            config,
+        )
+        .expect("Failed to create default client")
+    }
+}
+
pub(crate) struct DirListingIterator {
path: String,
resolved_path: String,
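The new `impl Default for Client` above replaces the old inherent `default()` method because clippy's `should_implement_trait` lint fires on inherent methods that shadow standard trait names. Since `Default::default()` cannot return a `Result`, the fallible setup now panics via `expect`, which the doc comment calls out. A minimal sketch of the same move, using a hypothetical `Settings` type rather than the crate's `Client`:

```rust
struct Settings {
    endpoint: String,
}

impl Settings {
    // Fallible construction stays on an ordinary Result-returning method.
    fn from_env() -> Result<Self, String> {
        std::env::var("ENDPOINT")
            .map(|endpoint| Settings { endpoint })
            .map_err(|e| e.to_string())
    }
}

// Before: an inherent `pub fn default() -> Result<Self>`, which trips
// clippy::should_implement_trait. After: the std trait, panicking on failure.
impl Default for Settings {
    fn default() -> Self {
        Self::from_env().expect("Failed to load settings")
    }
}
```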
@@ -386,14 +394,14 @@ impl DirListingIterator {
.into_iter()
.filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
.collect();
-            Ok(self.partial_listing.len() > 0)
+            Ok(!self.partial_listing.is_empty())
} else {
Err(HdfsError::FileNotFound(self.path.clone()))
}
}

pub async fn next(&mut self) -> Option<Result<FileStatus>> {
-        if self.partial_listing.len() == 0 && self.remaining > 0 {
+        if self.partial_listing.is_empty() && self.remaining > 0 {
if let Err(error) = self.get_next_batch().await {
self.remaining = 0;
return Some(Err(error));
@@ -482,7 +490,7 @@ impl FileStatus {
fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
let mut path = PathBuf::from(base_path);
if let Ok(relative_path) = std::str::from_utf8(&value.path) {
-            if relative_path.len() > 0 {
+            if !relative_path.is_empty() {
path.push(relative_path)
}
}
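The remaining client.rs changes are mechanical clippy fixes: `len() > 0` and `len() == 0` become `!is_empty()` and `is_empty()` (clippy::len_zero), and arguments like `&path` where `path` is already a `&str` lose the extra borrow (clippy::needless_borrow). A small illustration with made-up values:

```rust
fn print_path(s: &str) {
    println!("{s}");
}

fn main() {
    let listing = vec!["a", "b"];
    // clippy::len_zero: is_empty() states the intent directly.
    assert!(!listing.is_empty()); // instead of `listing.len() > 0`

    let path: &str = "/user/test";
    // clippy::needless_borrow: `print_path(&path)` would pass a useless `&&str`.
    print_path(path);
}
```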
25 changes: 11 additions & 14 deletions rust/src/common/config.rs
@@ -25,36 +25,33 @@ impl Configuration {
pub fn new() -> io::Result<Self> {
let mut map: HashMap<String, String> = HashMap::new();

-        match Self::get_conf_dir() {
-            Some(conf_dir) => {
-                for file in ["core-site.xml", "hdfs-site.xml"] {
-                    let config_path = conf_dir.join(file);
-                    if config_path.as_path().exists() {
-                        Self::read_from_file(config_path.as_path())?
-                            .into_iter()
-                            .for_each(|(key, value)| {
-                                map.insert(key, value);
-                            })
-                    }
-                }
-            }
-            None => (),
-        }
+        if let Some(conf_dir) = Self::get_conf_dir() {
+            for file in ["core-site.xml", "hdfs-site.xml"] {
+                let config_path = conf_dir.join(file);
+                if config_path.as_path().exists() {
+                    Self::read_from_file(config_path.as_path())?
+                        .into_iter()
+                        .for_each(|(key, value)| {
+                            map.insert(key, value);
+                        })
+                }
+            }
+        }

Ok(Configuration { map })
}

/// Get a value from the config, returning None if the key wasn't defined.
pub fn get(&self, key: &str) -> Option<String> {
-        self.map.get(key).map(|v| v.clone())
+        self.map.get(key).cloned()
}

pub(crate) fn get_urls_for_nameservice(&self, nameservice: &str) -> Vec<String> {
self.map
.get(&format!("{}.{}", HA_NAMENODES_PREFIX, nameservice))
.into_iter()
.flat_map(|namenodes| {
namenodes.split(",").flat_map(|namenode_id| {
namenodes.split(',').flat_map(|namenode_id| {
self.map
.get(&format!(
"{}.{}.{}",
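Three lints drive the config.rs diff: clippy::single_match (a `match` whose only interesting arm is `Some`, with `None => ()`, reads better as `if let`), clippy::map_clone (`.map(|v| v.clone())` on an `Option<&T>` is just `.cloned()`), and clippy::single_char_pattern (splitting on a `char` literal avoids allocating intent behind a one-byte `&str` pattern). A compact sketch with hypothetical keys:

```rust
use std::collections::HashMap;

fn main() {
    let map = HashMap::from([("fs.defaultFS".to_string(), "hdfs://nn:9000".to_string())]);

    // clippy::single_match: one meaningful arm plus `None => ()` becomes if-let.
    if let Some(value) = map.get("fs.defaultFS") {
        println!("defaultFS = {value}");
    }

    // clippy::map_clone: Option<&String> -> Option<String> via cloned().
    let owned: Option<String> = map.get("fs.defaultFS").cloned();
    assert!(owned.is_some());

    // clippy::single_char_pattern: a char pattern is cheaper than "," as &str.
    let namenodes: Vec<&str> = "nn1,nn2".split(',').collect();
    assert_eq!(namenodes, ["nn1", "nn2"]);
}
```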
7 changes: 3 additions & 4 deletions rust/src/file.rs
@@ -51,8 +51,7 @@ impl FileReader {
} else {
let offset = self.position;
self.position = usize::min(self.position + len, self.file_length());
-            self.read_range(offset, self.position - offset as usize)
-                .await
+            self.read_range(offset, self.position - offset).await
}
}

@@ -174,12 +173,12 @@ impl FileWriter {
)
.await?;

-        Ok(BlockWriter::new(
+        BlockWriter::new(
new_block.block,
self.status.blocksize() as usize,
self.server_defaults.clone(),
)
-        .await?)
+        .await
}

async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
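The file.rs hunks fix a redundant cast and clippy::needless_question_mark. In the first, `offset` is already a `usize`, so `offset as usize` is a no-op (clippy::unnecessary_cast); in the second, wrapping a fallible expression as `Ok(expr.await?)` unwraps with `?` only to rewrap in `Ok`, which is equivalent to returning the expression directly. A minimal sketch of the latter with a hypothetical helper:

```rust
fn parse_port(s: &str) -> Result<u16, std::num::ParseIntError> {
    // Before (clippy::needless_question_mark): Ok(s.trim().parse()?)
    s.trim().parse()
}

fn main() {
    assert_eq!(parse_port(" 8020 "), Ok(8020));
}
```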
97 changes: 51 additions & 46 deletions rust/src/hdfs/connection.rs
@@ -28,7 +28,7 @@ use crate::security::sasl::{SaslReader, SaslRpcClient, SaslWriter};
use crate::security::user::UserInfo;
use crate::{HdfsError, Result};

-const PROTOCOL: &'static str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
const DATA_TRANSFER_VERSION: u16 = 28;

const MAX_PACKET_HEADER_SIZE: usize = 33;
@@ -160,30 +160,34 @@ impl RpcConnection {
call_id: i32,
retry_count: i32,
) -> common::RpcRequestHeaderProto {
-        let mut request_header = common::RpcRequestHeaderProto::default();
-        request_header.rpc_kind = Some(common::RpcKindProto::RpcProtocolBuffer as i32);
-        // RPC_FINAL_PACKET
-        request_header.rpc_op = Some(0);
-        request_header.call_id = call_id;
-        request_header.client_id = self.client_id.clone();
-        request_header.retry_count = Some(retry_count);
-        request_header.state_id = Some(self.alignment_context.state_id.load(Ordering::SeqCst));
-        request_header.router_federated_state = self
-            .alignment_context
-            .router_federated_state
-            .as_ref()
-            .map(|state| state.lock().unwrap().clone());
-        request_header
+        common::RpcRequestHeaderProto {
+            rpc_kind: Some(common::RpcKindProto::RpcProtocolBuffer as i32),
+            // RPC_FINAL_PACKET
+            rpc_op: Some(0),
+            call_id,
+            client_id: self.client_id.clone(),
+            retry_count: Some(retry_count),
+            state_id: Some(self.alignment_context.state_id.load(Ordering::SeqCst)),
+            router_federated_state: self
+                .alignment_context
+                .router_federated_state
+                .as_ref()
+                .map(|state| state.lock().unwrap().clone()),
+            ..Default::default()
+        }
}

fn get_connection_context(&self) -> common::IpcConnectionContextProto {
-        let mut context = common::IpcConnectionContextProto::default();
-        context.protocol = Some(PROTOCOL.to_string());
-        let mut user_info = common::UserInformationProto::default();
-        user_info.effective_user = self.user_info.effective_user.clone();
-        user_info.real_user = self.user_info.real_user.clone();
-        context.user_info = Some(user_info);
+        let user_info = common::UserInformationProto {
+            effective_user: self.user_info.effective_user.clone(),
+            real_user: self.user_info.real_user.clone(),
+        };
+
+        let context = common::IpcConnectionContextProto {
+            protocol: Some(PROTOCOL.to_string()),
+            user_info: Some(user_info),
+        };
+
debug!("Connection context: {:?}", context);
context
}
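The dominant pattern in this file is clippy::field_reassign_with_default: constructing a `Default` value as `mut` and then assigning fields one by one is replaced with struct literal syntax, using `..Default::default()` to fill whatever isn't named. Besides silencing the lint, the literal form drops the `mut` and makes the initialized fields visible at a glance. A sketch with an illustrative type rather than the crate's generated protobuf structs:

```rust
#[derive(Default, Debug)]
struct RequestHeader {
    call_id: i32,
    retry_count: Option<i32>,
    client_id: Vec<u8>,
    state_id: Option<i64>,
}

fn make_header(call_id: i32, client_id: Vec<u8>) -> RequestHeader {
    // Before (flagged by clippy::field_reassign_with_default):
    //     let mut h = RequestHeader::default();
    //     h.call_id = call_id;
    //     h.client_id = client_id;
    RequestHeader {
        call_id,
        client_id,
        // Fields not listed keep their Default values.
        ..Default::default()
    }
}

fn main() {
    println!("{:?}", make_header(7, vec![1, 2]));
}
```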
@@ -220,11 +224,11 @@ impl RpcConnection {

let conn_header_buf = conn_header.encode_length_delimited_to_vec();

-        let mut msg_header = common::RequestHeaderProto::default();
-        msg_header.method_name = method_name.to_string();
-        msg_header.declaring_class_protocol_name = PROTOCOL.to_string();
-        msg_header.client_protocol_version = 1;
-
+        let msg_header = common::RequestHeaderProto {
+            method_name: method_name.to_string(),
+            declaring_class_protocol_name: PROTOCOL.to_string(),
+            client_protocol_version: 1,
+        };
debug!("RPC request header: {:?}", msg_header);

let header_buf = msg_header.encode_length_delimited_to_vec();
@@ -370,9 +374,11 @@ impl Packet {
bytes_per_checksum: u32,
max_packet_size: u32,
) -> Self {
-        let mut header = hdfs::PacketHeaderProto::default();
-        header.offset_in_block = offset;
-        header.seqno = seqno;
+        let header = hdfs::PacketHeaderProto {
+            offset_in_block: offset,
+            seqno,
+            ..Default::default()
+        };

let num_chunks = Self::max_packet_chunks(bytes_per_checksum, max_packet_size);

@@ -394,8 +400,8 @@ fn max_packet_chunks(bytes_per_checksum: u32, max_packet_size: u32) -> usize {
fn max_packet_chunks(bytes_per_checksum: u32, max_packet_size: u32) -> usize {
let data_size = max_packet_size as usize - MAX_PACKET_HEADER_SIZE;
let chunk_size = bytes_per_checksum as usize + CHECKSUM_BYTES;
-        let chunks = data_size / chunk_size;
-        chunks
+
+        data_size / chunk_size
}
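`max_packet_chunks` also picks up the clippy::let_and_return fix: binding a value to a name only to return that name on the next line adds nothing, so the final expression is returned directly. In miniature:

```rust
fn max_chunks(data_size: usize, chunk_size: usize) -> usize {
    // Before (clippy::let_and_return):
    //     let chunks = data_size / chunk_size;
    //     chunks
    data_size / chunk_size
}

fn main() {
    // 65536-byte budget minus a 33-byte header, 520-byte chunks (made-up numbers).
    assert_eq!(max_chunks(65536 - 33, 512 + 8), 125);
}
```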

pub(crate) fn write(&mut self, buf: &mut Bytes) {
@@ -470,7 +476,7 @@ pub(crate) struct DatanodeConnection {

impl DatanodeConnection {
pub(crate) async fn connect(url: &str) -> Result<Self> {
-        let stream = connect(&url).await?;
+        let stream = connect(url).await?;

let (reader, writer) = stream.into_split();

@@ -499,14 +505,16 @@ impl DatanodeConnection {
block: &hdfs::ExtendedBlockProto,
token: Option<common::TokenProto>,
) -> hdfs::ClientOperationHeaderProto {
-        let mut base_header = hdfs::BaseHeaderProto::default();
-        base_header.block = block.clone();
-        base_header.token = token;
+        let base_header = hdfs::BaseHeaderProto {
+            block: block.clone(),
+            token,
+            ..Default::default()
+        };

-        let mut header = hdfs::ClientOperationHeaderProto::default();
-        header.base_header = base_header;
-        header.client_name = self.client_name.clone();
-        header
+        hdfs::ClientOperationHeaderProto {
+            base_header,
+            client_name: self.client_name.clone(),
+        }
}

pub(crate) async fn read_block_op_response(&mut self) -> Result<hdfs::BlockOpResponseProto> {
@@ -624,13 +632,10 @@ mod test {
#[test]
fn test_max_packet_header_size() {
// Create a dummy header to get its size
-        let mut header = hdfs::PacketHeaderProto::default();
-        header.offset_in_block = 0;
-        header.seqno = 0;
-        header.data_len = 0;
-        header.last_packet_in_block = false;
-        header.sync_block = Some(false);
-
+        let header = hdfs::PacketHeaderProto {
+            sync_block: Some(false),
+            ..Default::default()
+        };
// Add 4 bytes for size of whole packet and 2 bytes for size of header
assert_eq!(MAX_PACKET_HEADER_SIZE, header.encoded_len() + 4 + 2);
}
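One last fix worth noting from the top of this file is clippy::redundant_static_lifetimes: in a `const` (or `static`) item the `'static` lifetime is implied, so spelling it out is noise. The test hunk above is simply the `..Default::default()` pattern again, since only `sync_block` differs from the defaults.

```rust
// Before (clippy::redundant_static_lifetimes):
//     const PROTOCOL: &'static str = "...";
const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";

fn main() {
    assert_eq!(PROTOCOL.len(), 46);
}
```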