HDFS-17381. Distcp of EC files should not be limited to DFS. (apache#6551)


Contributed by Sadanand Shenoy
sadanand48 authored Sep 25, 2024
1 parent 21ec686 commit 49a4958
Showing 9 changed files with 271 additions and 36 deletions.
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -2108,4 +2108,23 @@ public static void maybeIgnoreMissingDirectory(FileSystem fs,
LOG.info("Ignoring missing directory {}", path);
LOG.debug("Directory missing", e);
}

/**
* Return true if the FS implements {@link WithErasureCoding} and
* supports the EC_POLICY option in {@link Options.OpenFileOptions}.
* A warning is logged when the filesystem does not support erasure coding.
* @param fs filesystem
* @param path path
* @return true if the filesystem supports EC
* @throws IOException if the hasPathCapability call fails
*/
public static boolean checkFSSupportsEC(FileSystem fs, Path path) throws IOException {
if (fs instanceof WithErasureCoding &&
fs.hasPathCapability(path, Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY)) {
return true;
}
LOG.warn("Filesystem with scheme {} does not support Erasure Coding" +
" at path {}", fs.getScheme(), path);
return false;
}
}
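For illustration, a minimal sketch of how a tool might consult this helper before attempting EC preservation; the URIs below are hypothetical, and any FileSystem implementation can stand on either side:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class EcSupportProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical source and target paths.
    Path source = new Path("hdfs://nn1/data/file");
    Path target = new Path("ofs://ozone1/vol/bucket/file");
    FileSystem srcFs = source.getFileSystem(conf);
    FileSystem dstFs = target.getFileSystem(conf);
    // EC preservation only makes sense when both sides support it;
    // checkFSSupportsEC logs a warning and returns false otherwise.
    boolean canPreserve = FileUtil.checkFSSupportsEC(srcFs, source)
        && FileUtil.checkFSSupportsEC(dstFs, target);
    System.out.println("EC preservation possible: " + canPreserve);
  }
}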
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -704,5 +704,10 @@ private OpenFileOptions() {
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.collect(Collectors.toSet()));

/**
* EC policy to be set on the file to be created: {@value}.
*/
public static final String FS_OPTION_OPENFILE_EC_POLICY =
FS_OPTION_OPENFILE + "ec.policy";
}
}
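As a sketch of how this option is meant to be consumed (mirroring the builder change in RetriableFileCopyCommand further down; the path and policy name here are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY;

public class CreateFileWithEcPolicy {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path target = new Path("/tmp/ec-file");  // placeholder path
    // "RS-6-3-1024k" is one of the built-in HDFS system policies;
    // substitute whatever policy the target filesystem supports.
    try (FSDataOutputStream out = fs.createFile(target)
        .overwrite(true)
        .recursive()
        .opt(FS_OPTION_OPENFILE_EC_POLICY, "RS-6-3-1024k")
        .build()) {
      out.writeUTF("hello");
    }
  }
}

Because the key is passed through opt() rather than must(), a filesystem that does not understand it simply ignores the hint and creates a regular file.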
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WithErasureCoding.java
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs;

import java.io.IOException;

/**
* Filesystems that support EC can implement this interface.
*/
public interface WithErasureCoding {

/**
* Get the EC policy name from the given file's FileStatus.
* If the file is not erasure coded, this shall return null.
* Callers should ensure the FileStatus was obtained from a
* filesystem that implements this interface.
* If the call fails due to some error, this shall return null.
* @param fileStatus FileStatus of the file whose EC policy is to be obtained.
* @return the EC policy name, or null if absent or on failure.
*/
String getErasureCodingPolicyName(FileStatus fileStatus);

/**
* Set the given EC policy on the path.
* The path and ecPolicyName must be valid: non-null, non-empty, and
* the implementing FS must support the supplied policy.
* Implementations may throw IOException if these conditions are not met.
* @param path the path on which the EC policy is to be set.
* @param ecPolicyName the name of the EC policy.
* @throws IOException if there is an error during the set operation.
*/
void setErasureCodingPolicy(Path path, String ecPolicyName) throws
IOException;
}
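To make the contract concrete, here is a toy implementation backed by an in-memory map; everything in it is invented for illustration, and a real filesystem would consult its own metadata instead:

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.WithErasureCoding;

/** Toy EC bookkeeping in memory; purely illustrative. */
class InMemoryEcSupport implements WithErasureCoding {
  private final Map<Path, String> policies = new ConcurrentHashMap<>();

  @Override
  public String getErasureCodingPolicyName(FileStatus fileStatus) {
    // Contract: null when the file is not erasure coded or on failure.
    return policies.get(fileStatus.getPath());
  }

  @Override
  public void setErasureCodingPolicy(Path path, String ecPolicyName)
      throws IOException {
    if (path == null || ecPolicyName == null || ecPolicyName.isEmpty()) {
      throw new IOException("path and ecPolicyName must be set");
    }
    policies.put(path, ecPolicyName);
  }
}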
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -74,6 +74,7 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.WithErasureCoding;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.client.DfsPathCapabilities;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -146,7 +147,8 @@
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
@InterfaceStability.Unstable
public class DistributedFileSystem extends FileSystem
implements KeyProviderTokenIssuer, BatchListingOperations, LeaseRecoverable, SafeMode {
implements KeyProviderTokenIssuer, BatchListingOperations, LeaseRecoverable, SafeMode,
WithErasureCoding {
private Path workingDir;
private URI uri;

@@ -376,6 +378,14 @@ public FSDataInputStream open(PathHandle fd, int bufferSize)
return dfs.createWrappedInputStream(dfsis);
}

@Override
public String getErasureCodingPolicyName(FileStatus fileStatus) {
if (!(fileStatus instanceof HdfsFileStatus)) {
return null;
}
ErasureCodingPolicy ecPolicy =
    ((HdfsFileStatus) fileStatus).getErasureCodingPolicy();
// Non-EC files have a null policy; return null per the interface contract.
return ecPolicy == null ? null : ecPolicy.getName();
}

/**
* Create a handle to an HDFS file.
* @param st HdfsFileStatus instance from NameNode
@@ -3862,6 +3872,10 @@ protected EnumSet<CreateFlag> getFlags() {
*/
@Override
public FSDataOutputStream build() throws IOException {
String ecPolicy = getOptions().get(Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY, "");
if (!ecPolicy.isEmpty()) {
ecPolicyName(ecPolicy);
}
if (getFlags().contains(CreateFlag.CREATE) ||
getFlags().contains(CreateFlag.OVERWRITE)) {
if (isRecursive()) {
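A small sketch of how a client could query the policy name through the new interface once these changes are in place (the path is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.WithErasureCoding;

public class ShowEcPolicy {
  public static void main(String[] args) throws Exception {
    Path p = new Path("hdfs://nn1/data/file");  // hypothetical path
    FileSystem fs = p.getFileSystem(new Configuration());
    FileStatus st = fs.getFileStatus(p);
    if (fs instanceof WithErasureCoding) {
      // Returns null when the file is not erasure coded.
      String name = ((WithErasureCoding) fs).getErasureCodingPolicyName(st);
      System.out.println("EC policy: " + name);
    }
  }
}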
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java
@@ -21,6 +21,7 @@
import java.util.Optional;

import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

@@ -54,6 +55,7 @@ public static Optional<Boolean> hasPathCapability(final Path path,
case CommonPathCapabilities.FS_STORAGEPOLICY:
case CommonPathCapabilities.FS_XATTRS:
case CommonPathCapabilities.FS_TRUNCATE:
case Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY:
return Optional.of(true);
case CommonPathCapabilities.FS_SYMLINKS:
return Optional.of(FileSystem.areSymlinksEnabled());
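With the capability wired up here, a probe like the following (hypothetical URI) should report true against HDFS and false against filesystems that have not declared it:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY;

public class ProbeEcCapability {
  public static void main(String[] args) throws Exception {
    Path p = new Path("hdfs://nn1/data");  // hypothetical URI
    FileSystem fs = p.getFileSystem(new Configuration());
    // True on HDFS after this change; false on filesystems
    // that do not advertise the capability.
    System.out.println(
        fs.hasPathCapability(p, FS_OPTION_OPENFILE_EC_POLICY));
  }
}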
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -205,7 +205,8 @@ public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
}

if (sourceCurrStatus.isDirectory()) {
createTargetDirsWithRetry(description, target, context, sourceStatus);
createTargetDirsWithRetry(description, target, context, sourceStatus,
sourceFS);
return;
}

@@ -295,10 +296,10 @@ private void copyFileWithRetry(String description,
}

private void createTargetDirsWithRetry(String description, Path target,
Context context, FileStatus sourceStatus) throws IOException {
Context context, FileStatus sourceStatus, FileSystem sourceFS) throws IOException {
try {
new RetriableDirectoryCreateCommand(description).execute(target,
context, sourceStatus);
new RetriableDirectoryCreateCommand(description).execute(target, context,
sourceStatus, sourceFS);
} catch (Exception e) {
throw new IOException("mkdir failed for " + target, e);
}
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java
@@ -18,16 +18,20 @@

package org.apache.hadoop.tools.mapred;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.WithErasureCoding;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.util.RetriableCommand;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Mapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.FileUtil.checkFSSupportsEC;
import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;

/**
@@ -36,6 +40,9 @@
*/
public class RetriableDirectoryCreateCommand extends RetriableCommand {

private static final Logger LOG =
LoggerFactory.getLogger(RetriableDirectoryCreateCommand.class);

/**
* Constructor, taking a description of the action.
* @param description Verbose description of the copy operation.
@@ -53,10 +60,11 @@ public RetriableDirectoryCreateCommand(String description) {
*/
@Override
protected Object doExecute(Object... arguments) throws Exception {
assert arguments.length == 3 : "Unexpected argument list.";
assert arguments.length == 4 : "Unexpected argument list.";
Path target = (Path)arguments[0];
Mapper.Context context = (Mapper.Context)arguments[1];
FileStatus sourceStatus = (FileStatus)arguments[2];
FileSystem sourceFs = (FileSystem)arguments[3];

FileSystem targetFS = target.getFileSystem(context.getConfiguration());
if(!targetFS.mkdirs(target)) {
@@ -66,11 +74,16 @@ protected Object doExecute(Object... arguments) throws Exception {
boolean preserveEC = getFileAttributeSettings(context)
.contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY);
if (preserveEC && sourceStatus.isErasureCoded()
&& targetFS instanceof DistributedFileSystem) {
ErasureCodingPolicy ecPolicy =
((HdfsFileStatus) sourceStatus).getErasureCodingPolicy();
DistributedFileSystem dfs = (DistributedFileSystem) targetFS;
dfs.setErasureCodingPolicy(target, ecPolicy.getName());
&& checkFSSupportsEC(sourceFs, sourceStatus.getPath())
&& checkFSSupportsEC(targetFS, target)) {
ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByName(
((WithErasureCoding) sourceFs).getErasureCodingPolicyName(
sourceStatus));
LOG.debug("EC Policy for source path is {}", ecPolicy);
WithErasureCoding ecFs = (WithErasureCoding) targetFS;
if (ecPolicy != null) {
ecFs.setErasureCodingPolicy(target, ecPolicy.getName());
}
}
return true;
}
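Distilled from doExecute() above, the directory-level preservation logic amounts to the following sketch; srcFs, dstFs and the paths are stand-ins, and the DistCp retry and context plumbing is omitted:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.WithErasureCoding;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;

import static org.apache.hadoop.fs.FileUtil.checkFSSupportsEC;

final class PreserveDirEcPolicy {
  static void preserve(FileSystem srcFs, FileStatus srcStatus,
      FileSystem dstFs, Path target) throws Exception {
    if (!srcStatus.isErasureCoded()
        || !checkFSSupportsEC(srcFs, srcStatus.getPath())
        || !checkFSSupportsEC(dstFs, target)) {
      return;  // nothing to preserve, or one side lacks EC support
    }
    String name =
        ((WithErasureCoding) srcFs).getErasureCodingPolicyName(srcStatus);
    if (name == null) {
      return;  // source reported no policy after all
    }
    // Resolve against the built-in system policies; unknown names map to null.
    ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByName(name);
    if (policy != null) {
      ((WithErasureCoding) dstFs).setErasureCodingPolicy(
          target, policy.getName());
    }
  }
}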
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -24,9 +24,6 @@
import java.util.EnumSet;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,9 +33,11 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.WithErasureCoding;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.tools.CopyListingFileStatus;
@@ -52,8 +51,10 @@

import org.apache.hadoop.classification.VisibleForTesting;

import static org.apache.hadoop.fs.FileUtil.checkFSSupportsEC;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY;
import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

@@ -151,8 +152,8 @@ private long doCopy(CopyListingFileStatus source, Path target,

long offset = (action == FileAction.APPEND) ?
targetFS.getFileStatus(target).getLen() : source.getChunkOffset();
long bytesRead = copyToFile(targetPath, targetFS, source,
offset, context, fileAttributes, sourceChecksum, sourceStatus);
long bytesRead = copyToFile(targetPath, targetFS, source, offset, context,
fileAttributes, sourceChecksum, sourceStatus, sourceFS);

if (!source.isSplit()) {
DistCpUtils.compareFileLengthsAndChecksums(source.getLen(), sourceFS,
@@ -195,7 +196,7 @@ private ChecksumOpt getChecksumOpt(EnumSet<FileAttribute> fileAttributes,
private long copyToFile(Path targetPath, FileSystem targetFS,
CopyListingFileStatus source, long sourceOffset, Mapper.Context context,
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum,
FileStatus sourceStatus)
FileStatus sourceStatus, FileSystem sourceFS)
throws IOException {
FsPermission permission = FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(targetFS.getConf()));
@@ -205,11 +206,11 @@ private long copyToFile(Path targetPath, FileSystem targetFS,
boolean preserveEC = getFileAttributeSettings(context)
.contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY);

ErasureCodingPolicy ecPolicy = null;
String ecPolicyName = null;
if (preserveEC && sourceStatus.isErasureCoded()
&& sourceStatus instanceof HdfsFileStatus
&& targetFS instanceof DistributedFileSystem) {
ecPolicy = ((HdfsFileStatus) sourceStatus).getErasureCodingPolicy();
&& checkFSSupportsEC(sourceFS, sourceStatus.getPath())
&& checkFSSupportsEC(targetFS, targetPath)) {
ecPolicyName = ((WithErasureCoding) sourceFS).getErasureCodingPolicyName(sourceStatus);
}
final OutputStream outStream;
if (action == FileAction.OVERWRITE) {
@@ -222,21 +223,21 @@ private long copyToFile(Path targetPath, FileSystem targetFS,
targetFS, targetPath);
FSDataOutputStream out;
ChecksumOpt checksumOpt = getChecksumOpt(fileAttributes, sourceChecksum);
if (!preserveEC || ecPolicy == null) {
if (!preserveEC || ecPolicyName == null) {
out = targetFS.create(targetPath, permission,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), copyBufferSize,
repl, blockSize, context, checksumOpt);
} else {
DistributedFileSystem dfs = (DistributedFileSystem) targetFS;
DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
dfs.createFile(targetPath).permission(permission).create()
.overwrite(true).bufferSize(copyBufferSize).replication(repl)
.blockSize(blockSize).progress(context).recursive()
.ecPolicyName(ecPolicy.getName());
if (checksumOpt != null) {
builder.checksumOpt(checksumOpt);
}
out = builder.build();
FSDataOutputStreamBuilder builder = targetFS.createFile(targetPath)
.permission(permission)
.overwrite(true)
.bufferSize(copyBufferSize)
.replication(repl)
.blockSize(blockSize)
.progress(context)
.recursive();
builder.opt(FS_OPTION_OPENFILE_EC_POLICY, ecPolicyName);
out = builder.build();
}
outStream = new BufferedOutputStream(out);
} else {
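Taken together, these changes let an invocation along the lines of the following (hypothetical URIs; -pe asks DistCp to preserve the erasure coding policy) carry EC policies between any two filesystems that implement WithErasureCoding, rather than only HDFS to HDFS:

hadoop distcp -pe hdfs://nn1/src ofs://ozone1/vol/bucket/dst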