Skip to content

Commit

Permalink
Support router-based federation with context tracking (#40)
Browse files Browse the repository at this point in the history
* Add RBF state support
  • Loading branch information
Kimahriman authored Dec 4, 2023
1 parent 5a96327 commit dd0d913
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 95 deletions.
17 changes: 14 additions & 3 deletions rust/minidfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,23 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>3.3.5</version>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>3.3.5</version>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-rbf</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-rbf</artifactId>
<version>3.3.6</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand All @@ -40,7 +51,7 @@
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<mainClass>Main</mainClass>
<mainClass>main.Main</mainClass>
</configuration>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
package main;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
Expand All @@ -15,6 +17,10 @@
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
Expand Down Expand Up @@ -58,83 +64,116 @@ public static void main(String args[]) throws Exception {
conf.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true");
}

MiniDFSNNTopology nnTopology = generateTopology(flags, conf);
HdfsConfiguration hdfsConf = new HdfsConfiguration(conf);

int numDataNodes = 1;
if (flags.contains("ec")) {
// Enough for the largest EC policy
numDataNodes = 14;
}
MiniDFSCluster dfs = null;
StateStoreDFSCluster routerDfs = null;
if (flags.contains("rbf")) {
routerDfs = new StateStoreDFSCluster(false, 2);

HdfsConfiguration hdfsConf = new HdfsConfiguration(conf);
MiniDFSCluster dfs = new MiniDFSCluster.Builder(hdfsConf)
.nameNodePort(9000)
.nameNodeHttpPort(9870)
.nnTopology(nnTopology)
.numDataNodes(numDataNodes)
.build();
Configuration routerOverrides = new RouterConfigBuilder()
.stateStore()
.rpc()
.build();

if (flags.contains("viewfs")) {
hdfsConf.set(FS_DEFAULT_NAME_KEY, "viewfs://minidfs-viewfs");
} else if (flags.contains("ha")) {
hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://minidfs-ns");
} else {
hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:9000");
}
routerDfs.addRouterOverrides(routerOverrides);
routerDfs.startCluster(hdfsConf);
routerDfs.startRouters();

hdfsConf.writeXml(new FileOutputStream("target/test/core-site.xml"));
dfs.waitActive();
RouterContext routerContext = routerDfs.getRandomRouter();
StateStoreService stateStore = routerContext.getRouter().getStateStore();
routerDfs.createTestMountTable(stateStore);

int activeNamenode = 0;
if (flags.contains("viewfs")) {
// Each name service has two namenodes
dfs.transitionToActive(0);
dfs.transitionToActive(2);
} else if (flags.contains("ha")) {
activeNamenode = 2;
// dfs.transitionToObserver(1);
dfs.transitionToActive(activeNamenode);
}
routerDfs.waitClusterUp();

if (flags.contains("ec")) {
DistributedFileSystem fs = dfs.getFileSystem(activeNamenode);
fs.enableErasureCodingPolicy("RS-3-2-1024k");
fs.enableErasureCodingPolicy("RS-10-4-1024k");
fs.mkdirs(new Path("/ec-3-2"), new FsPermission("755"));
fs.mkdirs(new Path("/ec-6-3"), new FsPermission("755"));
fs.mkdirs(new Path("/ec-10-4"), new FsPermission("755"));
fs.setErasureCodingPolicy(new Path("/ec-3-2"), "RS-3-2-1024k");
fs.setErasureCodingPolicy(new Path("/ec-6-3"), "RS-6-3-1024k");
fs.setErasureCodingPolicy(new Path("/ec-10-4"), "RS-10-4-1024k");
}
hdfsConf.addResource(routerDfs.generateClientConfiguration());
hdfsConf.addResource(routerDfs.getRouterClientConf());
hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://fed");
} else {
MiniDFSNNTopology nnTopology = generateTopology(flags, hdfsConf);

if (flags.contains("token")) {
Credentials creds = new Credentials();
if (flags.contains("ha")) {
System.err.println("Getting token from namenode! " + dfs.getNameNode(2).getTokenServiceName());
Token<DelegationTokenIdentifier> token = dfs.getNameNodeRpc(2).getDelegationToken(null);
token.setService(new Text("ha-hdfs:minidfs-ns"));
creds.addToken(new Text("ha-hdfs:minidfs-ns"), token);
int numDataNodes = 1;
if (flags.contains("ec")) {
// Enough for the largest EC policy
numDataNodes = 14;
}

dfs = new MiniDFSCluster.Builder(hdfsConf)
.nameNodePort(9000)
.nameNodeHttpPort(9870)
.nnTopology(nnTopology)
.numDataNodes(numDataNodes)
.build();

if (flags.contains("viewfs")) {
hdfsConf.set(FS_DEFAULT_NAME_KEY, "viewfs://minidfs-viewfs");
} else if (flags.contains("ha")) {
hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://minidfs-ns");
} else {
System.err.println("Getting token from namenode! " + dfs.getNameNode().getTokenServiceName());
Token<DelegationTokenIdentifier> token = dfs.getNameNodeRpc().getDelegationToken(null);
token.setService(new Text(dfs.getNameNode().getTokenServiceName()));
creds.addToken(new Text(dfs.getNameNode().getTokenServiceName()), token);
hdfsConf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:9000");
}

dfs.waitActive();

int activeNamenode = 0;
if (flags.contains("viewfs")) {
// Each name service has two namenodes
dfs.transitionToActive(0);
dfs.transitionToActive(2);
} else if (flags.contains("ha")) {
activeNamenode = 2;
// dfs.transitionToObserver(1);
dfs.transitionToActive(activeNamenode);
}

if (flags.contains("ec")) {
DistributedFileSystem fs = dfs.getFileSystem(activeNamenode);
fs.enableErasureCodingPolicy("RS-3-2-1024k");
fs.enableErasureCodingPolicy("RS-10-4-1024k");
fs.mkdirs(new Path("/ec-3-2"), new FsPermission("755"));
fs.mkdirs(new Path("/ec-6-3"), new FsPermission("755"));
fs.mkdirs(new Path("/ec-10-4"), new FsPermission("755"));
fs.setErasureCodingPolicy(new Path("/ec-3-2"), "RS-3-2-1024k");
fs.setErasureCodingPolicy(new Path("/ec-6-3"), "RS-6-3-1024k");
fs.setErasureCodingPolicy(new Path("/ec-10-4"), "RS-10-4-1024k");
}

if (flags.contains("token")) {
Credentials creds = new Credentials();
if (flags.contains("ha")) {
System.err.println("Getting token from namenode! " + dfs.getNameNode(2).getTokenServiceName());
Token<DelegationTokenIdentifier> token = dfs.getNameNodeRpc(2).getDelegationToken(null);
token.setService(new Text("ha-hdfs:minidfs-ns"));
creds.addToken(new Text("ha-hdfs:minidfs-ns"), token);
} else {
System.err.println("Getting token from namenode! " + dfs.getNameNode().getTokenServiceName());
Token<DelegationTokenIdentifier> token = dfs.getNameNodeRpc().getDelegationToken(null);
token.setService(new Text(dfs.getNameNode().getTokenServiceName()));
creds.addToken(new Text(dfs.getNameNode().getTokenServiceName()), token);
}

DataOutputStream os = new DataOutputStream(new FileOutputStream("target/test/delegation_token"));
creds.writeTokenStorageToStream(os, SerializedFormat.WRITABLE);
os.close();
}

DataOutputStream os = new DataOutputStream(new FileOutputStream("target/test/delegation_token"));
creds.writeTokenStorageToStream(os, SerializedFormat.WRITABLE);
os.close();
}

hdfsConf.writeXml(new FileOutputStream("target/test/core-site.xml"));

System.out.println("Ready!");
if (flags.contains("security")) {
System.out.println(kdc.getKrb5conf().toPath().toString());
}

BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
reader.readLine();
dfs.close();

if (dfs != null) {
dfs.close();
}
if (routerDfs != null) {
routerDfs.shutdown();
}

if (flags.contains("security")) {
kdc.stop();
Expand All @@ -149,13 +188,15 @@ public static MiniDFSNNTopology generateTopology(Set<String> flags, Configuratio
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + ".ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set(DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, "true");
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, "true");
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
conf.set(CONFIG_VIEWFS_PREFIX + ".minidfs-viewfs.link./mount1", "hdfs://ns0/nested");
conf.set(CONFIG_VIEWFS_PREFIX + ".minidfs-viewfs.linkFallback", "hdfs://ns1/nested");
} else if (flags.contains("ha")) {
nnTopology = MiniDFSNNTopology.simpleHATopology(3);
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + ".minidfs-ns", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set(DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, "true");
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, "true");
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
}
return nnTopology;
}
Expand Down
Loading

0 comments on commit dd0d913

Please sign in to comment.