diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java index 39dac46dd544e..ebb3763f7a32a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java @@ -2108,4 +2108,23 @@ public static void maybeIgnoreMissingDirectory(FileSystem fs, LOG.info("Ignoring missing directory {}", path); LOG.debug("Directory missing", e); } + + /** + * Return true if the FS implements {@link WithErasureCoding} and + * supports EC_POLICY option in {@link Options.OpenFileOptions}. + * A message is logged when the filesystem does not support Erasure coding. + * @param fs filesystem + * @param path path + * @return true if the Filesystem supports EC + * @throws IOException if there is a failure in hasPathCapability call + */ + public static boolean checkFSSupportsEC(FileSystem fs, Path path) throws IOException { + if (fs instanceof WithErasureCoding && + fs.hasPathCapability(path, Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY)) { + return true; + } + LOG.warn("Filesystem with scheme {} does not support Erasure Coding" + + " at path {}", fs.getScheme(), path); + return false; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java index b59d2f3be1526..f473e9427ba5d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java @@ -704,5 +704,10 @@ private OpenFileOptions() { FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) .collect(Collectors.toSet())); + /** + * EC policy to be set on the file that needs to be created : {@value}. + */ + public static final String FS_OPTION_OPENFILE_EC_POLICY = + FS_OPTION_OPENFILE + "ec.policy"; } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WithErasureCoding.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WithErasureCoding.java new file mode 100644 index 0000000000000..5f8a7fbad6ea3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WithErasureCoding.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; + +/** + * Filesystems that support EC can implement this interface. + */ +public interface WithErasureCoding { + + /** + * Get the EC Policy name of the given file's fileStatus. + * If the file is not erasure coded, this shall return null. + * Callers will make sure to check if fileStatus isInstance of + * an FS that implements this interface. + * If the call fails due to some error, this shall return null. + * @param fileStatus object of the file whose ecPolicy needs to be obtained. + * @return the ec Policy name + */ + String getErasureCodingPolicyName(FileStatus fileStatus); + + /** + * Set the given ecPolicy on the path. + * The path and ecPolicyName should be valid (not null/empty, the + * implementing FS shall support the supplied ecPolicy). + * implementations can throw IOException if these conditions are not met. + * @param path on which the EC policy needs to be set. + * @param ecPolicyName the EC policy. + * @throws IOException if there is an error during the set op. + */ + void setErasureCodingPolicy(Path path, String ecPolicyName) throws + IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 17c39f6c55b75..dac205158d0f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -74,6 +74,7 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.WithErasureCoding; import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.hdfs.client.DfsPathCapabilities; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -146,7 +147,8 @@ @InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" }) @InterfaceStability.Unstable public class DistributedFileSystem extends FileSystem - implements KeyProviderTokenIssuer, BatchListingOperations, LeaseRecoverable, SafeMode { + implements KeyProviderTokenIssuer, BatchListingOperations, LeaseRecoverable, SafeMode, + WithErasureCoding { private Path workingDir; private URI uri; @@ -376,6 +378,14 @@ public FSDataInputStream open(PathHandle fd, int bufferSize) return dfs.createWrappedInputStream(dfsis); } + @Override + public String getErasureCodingPolicyName(FileStatus fileStatus) { + if (!(fileStatus instanceof HdfsFileStatus)) { + return null; + } + return ((HdfsFileStatus) fileStatus).getErasureCodingPolicy().getName(); + } + /** * Create a handle to an HDFS file. * @param st HdfsFileStatus instance from NameNode @@ -3862,6 +3872,10 @@ protected EnumSet getFlags() { */ @Override public FSDataOutputStream build() throws IOException { + String ecPolicy = getOptions().get(Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY, ""); + if (!ecPolicy.isEmpty()) { + ecPolicyName(ecPolicy); + } if (getFlags().contains(CreateFlag.CREATE) || getFlags().contains(CreateFlag.OVERWRITE)) { if (isRecursive()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java index 612a977630327..b779e42014f1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java @@ -21,6 +21,7 @@ import java.util.Optional; import org.apache.hadoop.fs.CommonPathCapabilities; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -54,6 +55,7 @@ public static Optional hasPathCapability(final Path path, case CommonPathCapabilities.FS_STORAGEPOLICY: case CommonPathCapabilities.FS_XATTRS: case CommonPathCapabilities.FS_TRUNCATE: + case Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY: return Optional.of(true); case CommonPathCapabilities.FS_SYMLINKS: return Optional.of(FileSystem.areSymlinksEnabled()); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index ad17e574ca9b8..904b297c2b09a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -205,7 +205,8 @@ public void map(Text relPath, CopyListingFileStatus sourceFileStatus, } if (sourceCurrStatus.isDirectory()) { - createTargetDirsWithRetry(description, target, context, sourceStatus); + createTargetDirsWithRetry(description, target, context, sourceStatus, + sourceFS); return; } @@ -295,10 +296,10 @@ private void copyFileWithRetry(String description, } private void createTargetDirsWithRetry(String description, Path target, - Context context, FileStatus sourceStatus) throws IOException { + Context context, FileStatus sourceStatus, FileSystem sourceFS) throws IOException { try { - new RetriableDirectoryCreateCommand(description).execute(target, - context, sourceStatus); + new RetriableDirectoryCreateCommand(description).execute(target, context, + sourceStatus, sourceFS); } catch (Exception e) { throw new IOException("mkdir failed for " + target, e); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java index f5d9246e6a3bf..2b50d63e2f0cf 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java @@ -18,16 +18,20 @@ package org.apache.hadoop.tools.mapred; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.WithErasureCoding; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.util.RetriableCommand; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapreduce.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.FileUtil.checkFSSupportsEC; import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings; /** @@ -36,6 +40,9 @@ */ public class RetriableDirectoryCreateCommand extends RetriableCommand { + private static final Logger LOG = + LoggerFactory.getLogger(RetriableDirectoryCreateCommand.class); + /** * Constructor, taking a description of the action. * @param description Verbose description of the copy operation. @@ -53,10 +60,11 @@ public RetriableDirectoryCreateCommand(String description) { */ @Override protected Object doExecute(Object... arguments) throws Exception { - assert arguments.length == 3 : "Unexpected argument list."; + assert arguments.length == 4 : "Unexpected argument list."; Path target = (Path)arguments[0]; Mapper.Context context = (Mapper.Context)arguments[1]; FileStatus sourceStatus = (FileStatus)arguments[2]; + FileSystem sourceFs = (FileSystem)arguments[3]; FileSystem targetFS = target.getFileSystem(context.getConfiguration()); if(!targetFS.mkdirs(target)) { @@ -66,11 +74,16 @@ protected Object doExecute(Object... arguments) throws Exception { boolean preserveEC = getFileAttributeSettings(context) .contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY); if (preserveEC && sourceStatus.isErasureCoded() - && targetFS instanceof DistributedFileSystem) { - ErasureCodingPolicy ecPolicy = - ((HdfsFileStatus) sourceStatus).getErasureCodingPolicy(); - DistributedFileSystem dfs = (DistributedFileSystem) targetFS; - dfs.setErasureCodingPolicy(target, ecPolicy.getName()); + && checkFSSupportsEC(sourceFs, sourceStatus.getPath()) + && checkFSSupportsEC(targetFS, target)) { + ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByName( + ((WithErasureCoding) sourceFs).getErasureCodingPolicyName( + sourceStatus)); + LOG.debug("EC Policy for source path is {}", ecPolicy); + WithErasureCoding ecFs = (WithErasureCoding) targetFS; + if (ecPolicy != null) { + ecFs.setErasureCodingPolicy(target, ecPolicy.getName()); + } } return true; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 84bb001008637..fc3109ee2cecd 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -24,9 +24,6 @@ import java.util.EnumSet; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.tools.DistCpOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,9 +33,11 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.WithErasureCoding; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.tools.CopyListingFileStatus; @@ -52,8 +51,10 @@ import org.apache.hadoop.classification.VisibleForTesting; +import static org.apache.hadoop.fs.FileUtil.checkFSSupportsEC; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY; import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; @@ -151,8 +152,8 @@ private long doCopy(CopyListingFileStatus source, Path target, long offset = (action == FileAction.APPEND) ? targetFS.getFileStatus(target).getLen() : source.getChunkOffset(); - long bytesRead = copyToFile(targetPath, targetFS, source, - offset, context, fileAttributes, sourceChecksum, sourceStatus); + long bytesRead = copyToFile(targetPath, targetFS, source, offset, context, + fileAttributes, sourceChecksum, sourceStatus, sourceFS); if (!source.isSplit()) { DistCpUtils.compareFileLengthsAndChecksums(source.getLen(), sourceFS, @@ -195,7 +196,7 @@ private ChecksumOpt getChecksumOpt(EnumSet fileAttributes, private long copyToFile(Path targetPath, FileSystem targetFS, CopyListingFileStatus source, long sourceOffset, Mapper.Context context, EnumSet fileAttributes, final FileChecksum sourceChecksum, - FileStatus sourceStatus) + FileStatus sourceStatus,FileSystem sourceFS) throws IOException { FsPermission permission = FsPermission.getFileDefault().applyUMask( FsPermission.getUMask(targetFS.getConf())); @@ -205,11 +206,11 @@ private long copyToFile(Path targetPath, FileSystem targetFS, boolean preserveEC = getFileAttributeSettings(context) .contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY); - ErasureCodingPolicy ecPolicy = null; + String ecPolicyName = null; if (preserveEC && sourceStatus.isErasureCoded() - && sourceStatus instanceof HdfsFileStatus - && targetFS instanceof DistributedFileSystem) { - ecPolicy = ((HdfsFileStatus) sourceStatus).getErasureCodingPolicy(); + && checkFSSupportsEC(sourceFS, sourceStatus.getPath()) + && checkFSSupportsEC(targetFS, targetPath)) { + ecPolicyName = ((WithErasureCoding) sourceFS).getErasureCodingPolicyName(sourceStatus); } final OutputStream outStream; if (action == FileAction.OVERWRITE) { @@ -222,21 +223,21 @@ private long copyToFile(Path targetPath, FileSystem targetFS, targetFS, targetPath); FSDataOutputStream out; ChecksumOpt checksumOpt = getChecksumOpt(fileAttributes, sourceChecksum); - if (!preserveEC || ecPolicy == null) { + if (!preserveEC || ecPolicyName == null) { out = targetFS.create(targetPath, permission, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), copyBufferSize, repl, blockSize, context, checksumOpt); } else { - DistributedFileSystem dfs = (DistributedFileSystem) targetFS; - DistributedFileSystem.HdfsDataOutputStreamBuilder builder = - dfs.createFile(targetPath).permission(permission).create() - .overwrite(true).bufferSize(copyBufferSize).replication(repl) - .blockSize(blockSize).progress(context).recursive() - .ecPolicyName(ecPolicy.getName()); - if (checksumOpt != null) { - builder.checksumOpt(checksumOpt); - } - out = builder.build(); + FSDataOutputStreamBuilder builder = targetFS.createFile(targetPath) + .permission(permission) + .overwrite(true) + .bufferSize(copyBufferSize) + .replication(repl) + .blockSize(blockSize) + .progress(context) + .recursive(); + builder.opt(FS_OPTION_OPENFILE_EC_POLICY, ecPolicyName); + out = builder.build(); } outStream = new BufferedOutputStream(out); } else { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java index 841501869b5e4..6c6e5e78b9021 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java @@ -18,12 +18,21 @@ package org.apache.hadoop.tools; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.WithErasureCoding; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -41,8 +50,10 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; +import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; /** @@ -68,12 +79,17 @@ public class TestDistCpWithRawXAttrs { private static final String rootedSrcName = "/src"; private static final String rawDestName = "/.reserved/raw/dest"; private static final String rawSrcName = "/.reserved/raw/src"; + private static final File base = + GenericTestUtils.getTestDir("work-dir/localfs"); + + private static final String TEST_ROOT_DIR = base.getAbsolutePath(); @BeforeClass public static void init() throws Exception { conf = new Configuration(); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true); conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2); + conf.setClass("fs.file.impl", DummyEcFs.class, FileSystem.class); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true) .build(); cluster.waitActive(); @@ -240,6 +256,120 @@ public void testPreserveAndNoPreserveEC() throws Exception { dfs.unsetErasureCodingPolicy(dir1); } + + @Test + public void testPreserveECAcrossFilesystems() throws Exception{ + // set EC policy on source (HDFS) + String[] args = {"-setPolicy", "-path", dir1.toString(), + "-policy", "XOR-2-1-1024k"}; + fs.delete(new Path("/dest"), true); + fs.mkdirs(subDir1); + DistributedFileSystem dfs = (DistributedFileSystem) fs; + dfs.enableErasureCodingPolicy("XOR-2-1-1024k"); + dfs.setErasureCodingPolicy(dir1, "XOR-2-1-1024k"); + fs.create(file1).close(); + int res = ToolRunner.run(conf, new ECAdmin(conf), args); + assertEquals("Unable to set EC policy on " + subDir1.toString(), 0, res); + String src = "/src/*"; + Path dest = new Path(TEST_ROOT_DIR, "dest"); + final Path dest2Dir1 = new Path(dest, "dir1"); + final Path dest2SubDir1 = new Path(dest2Dir1, "subdir1"); + + // copy source(HDFS) to target(DummyECFS) with preserveEC + + try (DummyEcFs dummyEcFs = (DummyEcFs)FileSystem.get(URI.create("file:///"), conf)) { + Path target = dummyEcFs.makeQualified(dest); + DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src, target.toString(), + "-pe", conf); + try { + FileStatus destDir1Status = dummyEcFs.getFileStatus(dest2Dir1); + FileStatus destSubDir1Status = dummyEcFs.getFileStatus(dest2SubDir1); + assertNotNull("FileStatus for path: " + dest2Dir1 + " is null", destDir1Status); + assertNotNull("FileStatus for path: " + dest2SubDir1 + " is null", destSubDir1Status); + // check if target paths are erasure coded. + assertTrue("Path is not erasure coded : " + dest2Dir1, + dummyEcFs.isPathErasureCoded(destDir1Status.getPath())); + assertTrue("Path is not erasure coded : " + dest2SubDir1, + dummyEcFs.isPathErasureCoded(destSubDir1Status.getPath())); + + // copy source(DummyECFS) to target (HDFS) + String dfsTarget = "/dest"; + DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, + target.toString(), dfsTarget, "-pe", conf); + Path dfsTargetPath = new Path(dfsTarget); + Path dfsTargetDir1 = new Path(dfsTarget, "dir1"); + ContractTestUtils.assertPathExists(fs, + "Path doesn't exist:" + dfsTargetPath, dfsTargetPath); + ContractTestUtils.assertPathExists(fs, + "Path doesn't exist:" + dfsTargetDir1, dfsTargetDir1); + FileStatus targetDir1Status = fs.getFileStatus(dfsTargetDir1); + assertTrue("Path is not erasure coded : " + targetDir1Status, + targetDir1Status.isErasureCoded()); + fs.delete(dfsTargetPath, true); + } finally { + dummyEcFs.delete(new Path(base.getAbsolutePath()),true); + } + } + + } + + /** + * Dummy/Fake FS implementation that supports Erasure Coding. + */ + public static class DummyEcFs extends LocalFileSystem implements WithErasureCoding { + + private Set erasureCodedPaths; + public DummyEcFs() { + super(); + this.erasureCodedPaths = new HashSet<>(); + } + + public boolean isPathErasureCoded(Path p){ + return erasureCodedPaths.contains(p); + } + + + @Override + public boolean hasPathCapability(Path path, String capability) + throws IOException { + switch (validatePathCapabilityArgs(makeQualified(path), capability)) { + case Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY: + return true; + default: + return super.hasPathCapability(path, capability); + } + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + FileStatus fileStatus = super.getFileStatus(f); + if (!erasureCodedPaths.contains(f)) { + return fileStatus; + } + Set attrSet = new HashSet<>(); + attrSet.add(FileStatus.AttrFlags.HAS_EC); + return new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(), + fileStatus.getReplication(), fileStatus.getBlockSize(), + fileStatus.getModificationTime(), fileStatus.getAccessTime(), + fileStatus.getPermission(), fileStatus.getOwner(), + fileStatus.getGroup(), + fileStatus.isSymlink() ? fileStatus.getSymlink() : null, + fileStatus.getPath(), + attrSet); + } + + @Override + public String getErasureCodingPolicyName(FileStatus fileStatus) { + return "XOR-2-1-1024k"; + } + + @Override + public void setErasureCodingPolicy(Path path, String ecPolicyName) + throws IOException { + erasureCodedPaths.add(path); + } + } + @Test public void testUseIterator() throws Exception {