diff --git a/rust/minidfs/pom.xml b/rust/minidfs/pom.xml
index c95ded0..3f984db 100644
--- a/rust/minidfs/pom.xml
+++ b/rust/minidfs/pom.xml
@@ -11,12 +11,23 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
- <version>3.3.5</version>
+ <version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
- <version>3.3.5</version>
+ <version>3.3.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-rbf</artifactId>
+ <version>3.3.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-rbf</artifactId>
+ <version>3.3.6</version>
+ <type>test-jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -40,7 +51,7 @@
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
- <mainClass>Main</mainClass>
+ <mainClass>main.Main</mainClass>
diff --git a/rust/minidfs/src/main/java/Main.java b/rust/minidfs/src/main/java/main/Main.java
similarity index 50%
rename from rust/minidfs/src/main/java/Main.java
rename to rust/minidfs/src/main/java/main/Main.java
index ec7dc33..2201167 100644
--- a/rust/minidfs/src/main/java/Main.java
+++ b/rust/minidfs/src/main/java/main/Main.java
@@ -1,3 +1,5 @@
+package main;
+
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
@@ -15,6 +17,10 @@
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
@@ -58,75 +64,102 @@ public static void main(String args[]) throws Exception {
conf.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true");
}
- MiniDFSNNTopology nnTopology = generateTopology(flags, conf);
+ HdfsConfiguration hdfsConf = new HdfsConfiguration(conf);
- int numDataNodes = 1;
- if (flags.contains("ec")) {
- // Enough for the largest EC policy
- numDataNodes = 14;
- }
+ MiniDFSCluster dfs = null;
+ StateStoreDFSCluster routerDfs = null;
+ if (flags.contains("rbf")) {
+ routerDfs = new StateStoreDFSCluster(false, 2);
- HdfsConfiguration hdfsConf = new HdfsConfiguration(conf);
- MiniDFSCluster dfs = new MiniDFSCluster.Builder(hdfsConf)
- .nameNodePort(9000)
- .nameNodeHttpPort(9870)
- .nnTopology(nnTopology)
- .numDataNodes(numDataNodes)
- .build();
+ Configuration routerOverrides = new RouterConfigBuilder()
+ .stateStore()
+ .rpc()
+ .build();
- if (flags.contains("viewfs")) {
- hdfsConf.set(FS_DEFAULT_NAME_KEY, "viewfs://minidfs-viewfs");
- } else if (flags.contains("ha")) {
- hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://minidfs-ns");
- } else {
- hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:9000");
- }
+ routerDfs.addRouterOverrides(routerOverrides);
+ routerDfs.startCluster(hdfsConf);
+ routerDfs.startRouters();
- hdfsConf.writeXml(new FileOutputStream("target/test/core-site.xml"));
- dfs.waitActive();
+ RouterContext routerContext = routerDfs.getRandomRouter();
+ StateStoreService stateStore = routerContext.getRouter().getStateStore();
+ routerDfs.createTestMountTable(stateStore);
- int activeNamenode = 0;
- if (flags.contains("viewfs")) {
- // Each name services has two namenodes
- dfs.transitionToActive(0);
- dfs.transitionToActive(2);
- } else if (flags.contains("ha")) {
- activeNamenode = 2;
- // dfs.transitionToObserver(1);
- dfs.transitionToActive(activeNamenode);
- }
+ routerDfs.waitClusterUp();
- if (flags.contains("ec")) {
- DistributedFileSystem fs = dfs.getFileSystem(activeNamenode);
- fs.enableErasureCodingPolicy("RS-3-2-1024k");
- fs.enableErasureCodingPolicy("RS-10-4-1024k");
- fs.mkdirs(new Path("/ec-3-2"), new FsPermission("755"));
- fs.mkdirs(new Path("/ec-6-3"), new FsPermission("755"));
- fs.mkdirs(new Path("/ec-10-4"), new FsPermission("755"));
- fs.setErasureCodingPolicy(new Path("/ec-3-2"), "RS-3-2-1024k");
- fs.setErasureCodingPolicy(new Path("/ec-6-3"), "RS-6-3-1024k");
- fs.setErasureCodingPolicy(new Path("/ec-10-4"), "RS-10-4-1024k");
- }
+ hdfsConf.addResource(routerDfs.generateClientConfiguration());
+ hdfsConf.addResource(routerDfs.getRouterClientConf());
+ hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://fed");
+ } else {
+ MiniDFSNNTopology nnTopology = generateTopology(flags, hdfsConf);
- if (flags.contains("token")) {
- Credentials creds = new Credentials();
- if (flags.contains("ha")) {
- System.err.println("Getting token from namenode! " + dfs.getNameNode(2).getTokenServiceName());
- Token<DelegationTokenIdentifier> token = dfs.getNameNodeRpc(2).getDelegationToken(null);
- token.setService(new Text("ha-hdfs:minidfs-ns"));
- creds.addToken(new Text("ha-hdfs:minidfs-ns"), token);
+ int numDataNodes = 1;
+ if (flags.contains("ec")) {
+ // Enough for the largest EC policy
+ numDataNodes = 14;
+ }
+
+ dfs = new MiniDFSCluster.Builder(hdfsConf)
+ .nameNodePort(9000)
+ .nameNodeHttpPort(9870)
+ .nnTopology(nnTopology)
+ .numDataNodes(numDataNodes)
+ .build();
+
+ if (flags.contains("viewfs")) {
+ hdfsConf.set(FS_DEFAULT_NAME_KEY, "viewfs://minidfs-viewfs");
+ } else if (flags.contains("ha")) {
+ hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://minidfs-ns");
} else {
- System.err.println("Getting token from namenode! " + dfs.getNameNode().getTokenServiceName());
- Token<DelegationTokenIdentifier> token = dfs.getNameNodeRpc().getDelegationToken(null);
- token.setService(new Text(dfs.getNameNode().getTokenServiceName()));
- creds.addToken(new Text(dfs.getNameNode().getTokenServiceName()), token);
+ hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:9000");
+ }
+
+ dfs.waitActive();
+
+ int activeNamenode = 0;
+ if (flags.contains("viewfs")) {
+ // Each name service has two namenodes
+ dfs.transitionToActive(0);
+ dfs.transitionToActive(2);
+ } else if (flags.contains("ha")) {
+ activeNamenode = 2;
+ // dfs.transitionToObserver(1);
+ dfs.transitionToActive(activeNamenode);
+ }
+
+ if (flags.contains("ec")) {
+ DistributedFileSystem fs = dfs.getFileSystem(activeNamenode);
+ fs.enableErasureCodingPolicy("RS-3-2-1024k");
+ fs.enableErasureCodingPolicy("RS-10-4-1024k");
+ fs.mkdirs(new Path("/ec-3-2"), new FsPermission("755"));
+ fs.mkdirs(new Path("/ec-6-3"), new FsPermission("755"));
+ fs.mkdirs(new Path("/ec-10-4"), new FsPermission("755"));
+ fs.setErasureCodingPolicy(new Path("/ec-3-2"), "RS-3-2-1024k");
+ fs.setErasureCodingPolicy(new Path("/ec-6-3"), "RS-6-3-1024k");
+ fs.setErasureCodingPolicy(new Path("/ec-10-4"), "RS-10-4-1024k");
+ }
+
+ if (flags.contains("token")) {
+ Credentials creds = new Credentials();
+ if (flags.contains("ha")) {
+ System.err.println("Getting token from namenode! " + dfs.getNameNode(2).getTokenServiceName());
+ Token<DelegationTokenIdentifier> token = dfs.getNameNodeRpc(2).getDelegationToken(null);
+ token.setService(new Text("ha-hdfs:minidfs-ns"));
+ creds.addToken(new Text("ha-hdfs:minidfs-ns"), token);
+ } else {
+ System.err.println("Getting token from namenode! " + dfs.getNameNode().getTokenServiceName());
+ Token<DelegationTokenIdentifier> token = dfs.getNameNodeRpc().getDelegationToken(null);
+ token.setService(new Text(dfs.getNameNode().getTokenServiceName()));
+ creds.addToken(new Text(dfs.getNameNode().getTokenServiceName()), token);
+ }
+
+ DataOutputStream os = new DataOutputStream(new FileOutputStream("target/test/delegation_token"));
+ creds.writeTokenStorageToStream(os, SerializedFormat.WRITABLE);
+ os.close();
}
-
- DataOutputStream os = new DataOutputStream(new FileOutputStream("target/test/delegation_token"));
- creds.writeTokenStorageToStream(os, SerializedFormat.WRITABLE);
- os.close();
}
+ hdfsConf.writeXml(new FileOutputStream("target/test/core-site.xml"));
+
System.out.println("Ready!");
if (flags.contains("security")) {
System.out.println(kdc.getKrb5conf().toPath().toString());
@@ -134,7 +167,13 @@ public static void main(String args[]) throws Exception {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
reader.readLine();
- dfs.close();
+
+ if (dfs != null) {
+ dfs.close();
+ }
+ if (routerDfs != null) {
+ routerDfs.shutdown();
+ }
if (flags.contains("security")) {
kdc.stop();
@@ -149,6 +188,7 @@ public static MiniDFSNNTopology generateTopology(Set<String> flags, Configuratio
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + ".ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set(DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, "true");
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, "true");
+ conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
conf.set(CONFIG_VIEWFS_PREFIX + ".minidfs-viewfs.link./mount1", "hdfs://ns0/nested");
conf.set(CONFIG_VIEWFS_PREFIX + ".minidfs-viewfs.linkFallback", "hdfs://ns1/nested");
} else if (flags.contains("ha")) {
@@ -156,6 +196,7 @@ public static MiniDFSNNTopology generateTopology(Set<String> flags, Configuratio
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + ".minidfs-ns", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set(DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, "true");
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, "true");
+ conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
}
return nnTopology;
}
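
With the "rbf" flag, the harness above starts a StateStoreDFSCluster behind HDFS routers, installs the test mount table, and writes a core-site.xml whose fs.defaultFS is hdfs://fed. A rough sketch of how a client is then expected to reach the federated namespace once "Ready!" is printed; this is illustrative only and assumes the generated target/test/core-site.xml is visible to the client configuration (for example through HADOOP_CONF_DIR) and that Client::new and list_status behave as they are used elsewhere in this patch:

    use hdfs_native::{Client, Result};

    // Illustrative sketch: list the root of the federated namespace through the routers.
    async fn list_root_via_router() -> Result<()> {
        // "hdfs://fed" is the fs.defaultFS the harness writes for the rbf case.
        let client = Client::new("hdfs://fed")?;
        for status in client.list_status("/", false).await? {
            println!("{}\tdir={}", status.path, status.isdir);
        }
        Ok(())
    }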
diff --git a/rust/src/hdfs/connection.rs b/rust/src/hdfs/connection.rs
index 059d5d5..0bc73d4 100644
--- a/rust/src/hdfs/connection.rs
+++ b/rust/src/hdfs/connection.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::default::Default;
use std::io::ErrorKind;
-use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
+use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
@@ -49,14 +49,56 @@ async fn connect(addr: &str) -> Result<TcpStream> {
#[derive(Debug)]
pub(crate) struct AlignmentContext {
- state_id: AtomicI64,
- router_federated_state: Option<Arc<Mutex<Vec<u8>>>>,
+ state_id: i64,
+ router_federated_state: Option<HashMap<String, i64>>,
+}
+
+impl AlignmentContext {
+ fn update(
+ &mut self,
+ state_id: Option<i64>,
+ router_federated_state: Option<Vec<u8>>,
+ ) -> Result<()> {
+ if let Some(new_state_id) = state_id {
+ self.state_id = new_state_id
+ }
+
+ if let Some(new_router_state) = router_federated_state {
+ let new_map = hdfs::RouterFederatedStateProto::decode(Bytes::from(new_router_state))?
+ .namespace_state_ids;
+
+ let current_map = if let Some(cur) = self.router_federated_state.as_mut() {
+ cur
+ } else {
+ self.router_federated_state = Some(HashMap::new());
+ self.router_federated_state.as_mut().unwrap()
+ };
+
+ for (key, value) in new_map.into_iter() {
+ current_map.insert(
+ key.clone(),
+ i64::max(value, *current_map.get(&key).unwrap_or(&i64::MIN)),
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ fn encode_router_state(&self) -> Option<Vec<u8>> {
+ self.router_federated_state.as_ref().map(|state| {
+ hdfs::RouterFederatedStateProto {
+ namespace_state_ids: state.clone(),
+ }
+ .encode_to_vec()
+ })
+ }
}
impl Default for AlignmentContext {
fn default() -> Self {
Self {
- state_id: AtomicI64::new(i64::MIN),
+ state_id: i64::MIN,
router_federated_state: None,
}
}
@@ -69,7 +111,7 @@ pub(crate) struct RpcConnection {
client_id: Vec<u8>,
user_info: UserInfo,
next_call_id: AtomicI32,
- alignment_context: Arc<AlignmentContext>,
+ alignment_context: Arc<Mutex<AlignmentContext>>,
call_map: Arc>>,
sender: mpsc::Sender<Vec<u8>>,
listener: Option<JoinHandle<()>>,
@@ -78,7 +120,7 @@ pub(crate) struct RpcConnection {
impl RpcConnection {
pub(crate) async fn connect(
url: &str,
- alignment_context: Arc<AlignmentContext>,
+ alignment_context: Arc<Mutex<AlignmentContext>>,
nameservice: Option<&str>,
) -> Result<Self> {
let client_id = Uuid::new_v4().to_bytes_le().to_vec();
@@ -160,6 +202,8 @@ impl RpcConnection {
call_id: i32,
retry_count: i32,
) -> common::RpcRequestHeaderProto {
+ let context = self.alignment_context.lock().unwrap();
+
common::RpcRequestHeaderProto {
rpc_kind: Some(common::RpcKindProto::RpcProtocolBuffer as i32),
// RPC_FINAL_PACKET
@@ -167,12 +211,8 @@ impl RpcConnection {
call_id,
client_id: self.client_id.clone(),
retry_count: Some(retry_count),
- state_id: Some(self.alignment_context.state_id.load(Ordering::SeqCst)),
- router_federated_state: self
- .alignment_context
- .router_federated_state
- .as_ref()
- .map(|state| state.lock().unwrap().clone()),
+ state_id: Some(context.state_id),
+ router_federated_state: context.encode_router_state(),
..Default::default()
}
}
@@ -248,14 +288,14 @@ struct RpcListener {
call_map: Arc>>,
reader: SaslReader,
alive: bool,
- alignment_context: Arc<AlignmentContext>,
+ alignment_context: Arc<Mutex<AlignmentContext>>,
}
impl RpcListener {
fn new(
call_map: Arc>>,
reader: SaslReader,
- alignment_context: Arc<AlignmentContext>,
+ alignment_context: Arc<Mutex<AlignmentContext>>,
) -> Self {
RpcListener {
call_map,
@@ -299,14 +339,10 @@ impl RpcListener {
if let Some(call) = call {
match rpc_response.status() {
RpcStatusProto::Success => {
- if let Some(state_id) = rpc_response.state_id {
- self.alignment_context
- .state_id
- .fetch_max(state_id, Ordering::SeqCst);
- }
- if let Some(_router_federation_state) = rpc_response.router_federated_state {
- todo!();
- }
+ self.alignment_context
+ .lock()
+ .unwrap()
+ .update(rpc_response.state_id, rpc_response.router_federated_state)?;
let _ = call.send(Ok(bytes));
}
RpcStatusProto::Error => {
@@ -625,10 +661,14 @@ impl DatanodeConnection {
#[cfg(test)]
mod test {
+ use std::collections::HashMap;
+
use prost::Message;
use crate::{hdfs::connection::MAX_PACKET_HEADER_SIZE, proto::hdfs};
+ use super::AlignmentContext;
+
#[test]
fn test_max_packet_header_size() {
// Create a dummy header to get its size
@@ -639,4 +679,45 @@ mod test {
// Add 4 bytes for size of whole packet and 2 bytes for size of header
assert_eq!(MAX_PACKET_HEADER_SIZE, header.encoded_len() + 4 + 2);
}
+
+ fn encode_router_state(map: &HashMap<String, i64>) -> Vec<u8> {
+ hdfs::RouterFederatedStateProto {
+ namespace_state_ids: map.clone(),
+ }
+ .encode_to_vec()
+ }
+
+ #[test]
+ fn test_router_federated_state() {
+ let mut alignment_context = AlignmentContext::default();
+
+ assert!(alignment_context.router_federated_state.is_none());
+
+ let mut state_map = HashMap::<String, i64>::new();
+ state_map.insert("ns-1".to_string(), 3);
+
+ alignment_context
+ .update(None, Some(encode_router_state(&state_map)))
+ .unwrap();
+
+ assert!(alignment_context.router_federated_state.is_some());
+
+ let router_state = alignment_context.router_federated_state.as_ref().unwrap();
+
+ assert_eq!(router_state.len(), 1);
+ assert_eq!(*router_state.get("ns-1").unwrap(), 3);
+
+ state_map.insert("ns-1".to_string(), 5);
+ state_map.insert("ns-2".to_string(), 7);
+
+ alignment_context
+ .update(None, Some(encode_router_state(&state_map)))
+ .unwrap();
+
+ let router_state = alignment_context.router_federated_state.as_ref().unwrap();
+
+ assert_eq!(router_state.len(), 2);
+ assert_eq!(*router_state.get("ns-1").unwrap(), 5);
+ assert_eq!(*router_state.get("ns-2").unwrap(), 7);
+ }
}
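
For reference, the merge rule implemented by AlignmentContext::update above keeps, for each namespace a router reports, the largest state id seen so far. A minimal standalone sketch of just that rule, using plain HashMaps and none of the crate's protobuf types:

    use std::collections::HashMap;

    // Per-namespace max-merge, mirroring the loop in AlignmentContext::update.
    fn merge(current: &mut HashMap<String, i64>, incoming: HashMap<String, i64>) {
        for (ns, id) in incoming {
            let entry = current.entry(ns).or_insert(i64::MIN);
            *entry = (*entry).max(id);
        }
    }

    fn main() {
        let mut state = HashMap::new();
        merge(&mut state, HashMap::from([("ns-1".to_string(), 3)]));
        merge(
            &mut state,
            HashMap::from([("ns-1".to_string(), 5), ("ns-2".to_string(), 7)]),
        );
        assert_eq!(state["ns-1"], 5);
        assert_eq!(state["ns-2"], 7);
    }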
diff --git a/rust/src/hdfs/proxy.rs b/rust/src/hdfs/proxy.rs
index a8763b5..9d7479e 100644
--- a/rust/src/hdfs/proxy.rs
+++ b/rust/src/hdfs/proxy.rs
@@ -1,12 +1,11 @@
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
- Arc,
+ Arc, Mutex,
};
use bytes::Bytes;
use log::warn;
use prost::Message;
-use tokio::sync::Mutex;
use url::Url;
use crate::{
@@ -26,14 +25,14 @@ const OBSERVER_RETRY_EXCEPTION: &str = "org.apache.hadoop.ipc.ObserverRetryOnAct
struct ProxyConnection {
url: String,
inner: Option<RpcConnection>,
- alignment_context: Arc<AlignmentContext>,
+ alignment_context: Arc<Mutex<AlignmentContext>>,
nameservice: Option<String>,
}
impl ProxyConnection {
fn new(
url: String,
- alignment_context: Arc<AlignmentContext>,
+ alignment_context: Arc<Mutex<AlignmentContext>>,
nameservice: Option<String>,
) -> Self {
ProxyConnection {
@@ -68,7 +67,7 @@ impl ProxyConnection {
#[derive(Debug)]
pub(crate) struct NameServiceProxy {
- proxy_connections: Vec<Arc<Mutex<ProxyConnection>>>,
+ proxy_connections: Vec<Arc<tokio::sync::Mutex<ProxyConnection>>>,
current_index: AtomicUsize,
msycned: AtomicBool,
}
@@ -77,11 +76,11 @@ impl NameServiceProxy {
/// Creates a new proxy for a name service. If the URL contains a port,
/// it is assumed to be for a single NameNode.
pub(crate) fn new(nameservice: &Url, config: &Configuration) -> Self {
- let alignment_context = Arc::new(AlignmentContext::default());
+ let alignment_context = Arc::new(Mutex::new(AlignmentContext::default()));
let proxy_connections = if let Some(port) = nameservice.port() {
let url = format!("{}:{}", nameservice.host_str().unwrap(), port);
- vec![Arc::new(Mutex::new(ProxyConnection::new(
+ vec![Arc::new(tokio::sync::Mutex::new(ProxyConnection::new(
url,
alignment_context.clone(),
None,
@@ -92,7 +91,7 @@ impl NameServiceProxy {
.get_urls_for_nameservice(host)
.into_iter()
.map(|url| {
- Arc::new(Mutex::new(ProxyConnection::new(
+ Arc::new(tokio::sync::Mutex::new(ProxyConnection::new(
url,
alignment_context.clone(),
Some(host.to_string()),
@@ -114,7 +113,16 @@ impl NameServiceProxy {
if !self.msycned.fetch_or(true, Ordering::SeqCst) {
let msync_msg = hdfs::MsyncRequestProto::default();
self.call_inner("msync", msync_msg.encode_length_delimited_to_vec())
- .await?;
+ .await
+ .map(|_| ())
+ .or_else(|err| match err {
+ HdfsError::RPCError(class, _)
+ if class == "java.lang.UnsupportedOperationException" =>
+ {
+ Ok(())
+ }
+ _ => Err(err),
+ })?;
}
Ok(())
}
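
The msync change above is needed because not every endpoint implements msync; the routers used here presumably answer with java.lang.UnsupportedOperationException, which is now treated as a successful no-op so the first real call still proceeds. The same pattern in isolation, as a sketch that assumes hdfs_native::HdfsError is the crate's publicly exported error type with the RPCError(class, message) variant shown in the hunk:

    use hdfs_native::HdfsError;

    // Sketch: swallow exactly one server-side exception class, propagate everything else.
    fn ignore_unsupported<T>(result: Result<T, HdfsError>) -> Result<(), HdfsError> {
        match result {
            Ok(_) => Ok(()),
            Err(HdfsError::RPCError(class, _))
                if class == "java.lang.UnsupportedOperationException" =>
            {
                Ok(())
            }
            Err(other) => Err(other),
        }
    }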
diff --git a/rust/src/minidfs.rs b/rust/src/minidfs.rs
index 88c7fbf..2a4decd 100644
--- a/rust/src/minidfs.rs
+++ b/rust/src/minidfs.rs
@@ -15,6 +15,7 @@ pub enum DfsFeatures {
HA,
VIEWFS,
EC,
+ RBF,
}
impl DfsFeatures {
@@ -26,6 +27,7 @@ impl DfsFeatures {
DfsFeatures::PRIVACY => "privacy",
DfsFeatures::SECURITY => "security",
DfsFeatures::TOKEN => "token",
+ DfsFeatures::RBF => "rbf",
}
}
@@ -120,6 +122,8 @@ impl MiniDfs {
let url = if features.contains(&DfsFeatures::VIEWFS) {
"viewfs://minidfs-viewfs"
+ } else if features.contains(&DfsFeatures::RBF) {
+ "hdfs://fed"
} else if features.contains(&DfsFeatures::HA) {
"hdfs://minidfs-ns"
} else {
diff --git a/rust/tests/test_integration.rs b/rust/tests/test_integration.rs
index 23c80c6..6d4bec8 100644
--- a/rust/tests/test_integration.rs
+++ b/rust/tests/test_integration.rs
@@ -5,7 +5,7 @@ mod common;
mod test {
use crate::common::{setup, TEST_FILE_INTS};
use bytes::{Buf, BufMut, BytesMut};
- use hdfs_native::{minidfs::DfsFeatures, Client, Result, WriteOptions};
+ use hdfs_native::{client::FileStatus, minidfs::DfsFeatures, Client, Result, WriteOptions};
use serial_test::serial;
use std::collections::HashSet;
@@ -93,6 +93,14 @@ mod test {
.unwrap();
}
+ #[tokio::test]
+ #[serial]
+ async fn test_rbf() {
+ test_with_features(&HashSet::from([DfsFeatures::RBF]))
+ .await
+ .unwrap();
+ }
+
pub async fn test_with_features(features: &HashSet<DfsFeatures>) -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();
@@ -120,7 +128,13 @@ mod test {
}
async fn test_listing(client: &Client) -> Result<()> {
- let statuses = client.list_status("/", false).await?;
+ let statuses: Vec<FileStatus> = client
+ .list_status("/", false)
+ .await?
+ .into_iter()
+ // Only include files, since federation things could result in folders being created
+ .filter(|s| !s.isdir)
+ .collect();
assert_eq!(statuses.len(), 1);
let status = &statuses[0];
assert_eq!(status.path, "/testfile");