Skip to content

Commit

Permalink
HADOOP-19233: ABFS: [FnsOverBlob] Implementing Rename and Delete APIs…
Browse files Browse the repository at this point in the history
… over Blob Endpoint (apache#7265)

Contributed by Manish Bhatt.
Signed-off-by: Anuj Modi, Anmol Asrani
  • Loading branch information
bhattmanish98 authored Feb 3, 2025
1 parent 741bdd6 commit 6d20de1
Show file tree
Hide file tree
Showing 45 changed files with 5,035 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,24 @@
import java.io.IOException;
import java.lang.reflect.Field;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
Expand All @@ -65,16 +64,16 @@
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.ReflectionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
Expand Down Expand Up @@ -399,6 +398,34 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;

@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS, DefaultValue = DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS)
private long blobCopyProgressPollWaitMillis;

@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_COPY_MAX_WAIT_MILLIS, DefaultValue = DEFAULT_AZURE_BLOB_COPY_MAX_WAIT_MILLIS)
private long blobCopyProgressMaxWaitMillis;

@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION, DefaultValue = DEFAULT_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION)
private long blobAtomicRenameLeaseRefreshDuration;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
private int producerQueueMaxSize;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_CONSUMER_MAX_LAG, DefaultValue = DEFAULT_FS_AZURE_CONSUMER_MAX_LAG)
private int listingMaxConsumptionLag;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD)
private int blobRenameDirConsumptionParallelism;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD)
private int blobDeleteDirConsumptionParallelism;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
private int maxApacheHttpClientIoExceptionsRetries;
Expand Down Expand Up @@ -1522,4 +1549,32 @@ public boolean getIsChecksumValidationEnabled() {
/**
 * Sets the checksum-validation flag, overriding the configured value.
 *
 * @param isChecksumValidationEnabled whether checksum validation is enabled.
 */
public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) {
  this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}

/**
 * @return wait time (ms) between polls of server-side blob copy progress,
 * as configured by {@code FS_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS}.
 */
public long getBlobCopyProgressPollWaitMillis() {
  return blobCopyProgressPollWaitMillis;
}

/**
 * @return maximum total time (ms) to wait for a server-side blob copy,
 * as configured by {@code FS_AZURE_BLOB_COPY_MAX_WAIT_MILLIS}.
 */
public long getBlobCopyProgressMaxWaitMillis() {
  return blobCopyProgressMaxWaitMillis;
}

/**
 * @return lease refresh duration (ms) for atomic rename, as configured by
 * {@code FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION}.
 */
// NOTE(review): getter name drops the field's "blob" prefix
// (blobAtomicRenameLeaseRefreshDuration) — consider aligning for consistency.
public long getAtomicRenameLeaseRefreshDuration() {
  return blobAtomicRenameLeaseRefreshDuration;
}

/**
 * @return upper bound on the producer queue size, as configured by
 * {@code FS_AZURE_PRODUCER_QUEUE_MAX_SIZE}.
 */
public int getProducerQueueMaxSize() {
  return producerQueueMaxSize;
}

/**
 * @return maximum consumption lag allowed while listing, as configured by
 * {@code FS_AZURE_CONSUMER_MAX_LAG}.
 */
public int getListingMaxConsumptionLag() {
  return listingMaxConsumptionLag;
}

/**
 * @return thread parallelism for directory rename over Blob, as configured by
 * {@code FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD}.
 */
public int getBlobRenameDirConsumptionParallelism() {
  return blobRenameDirConsumptionParallelism;
}

/**
 * @return thread parallelism for directory delete over Blob, as configured by
 * {@code FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD}.
 */
public int getBlobDeleteDirConsumptionParallelism() {
  return blobDeleteDirConsumptionParallelism;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import java.net.URISyntaxException;
import java.nio.file.AccessDeniedException;
import java.time.Duration;
import java.util.Hashtable;
import java.util.List;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -43,23 +43,15 @@
import java.util.concurrent.Future;
import javax.annotation.Nullable;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonPathCapabilities;
Expand All @@ -71,27 +63,34 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
import org.apache.hadoop.fs.azurebfs.services.AbfsLocatedFileStatus;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.utils.Listener;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
Expand All @@ -102,14 +101,16 @@
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.RateLimitingFactory;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.RateLimitingFactory;
import org.apache.hadoop.util.functional.RemoteIterators;

import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
Expand Down Expand Up @@ -431,8 +432,12 @@ public FSDataOutputStream create(final Path f,

@Override
@SuppressWarnings("deprecation")
public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission,
final boolean overwrite, final int bufferSize, final short replication, final long blockSize,
public FSDataOutputStream createNonRecursive(final Path f,
final FsPermission permission,
final boolean overwrite,
final int bufferSize,
final short replication,
final long blockSize,
final Progressable progress) throws IOException {

statIncrement(CALL_CREATE_NON_RECURSIVE);
Expand All @@ -442,18 +447,21 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
ERR_CREATE_ON_ROOT,
null);
}
final Path parent = f.getParent();
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat,
listener);
final FileStatus parentFileStatus = tryGetFileStatus(parent, tracingContext);

if (parentFileStatus == null) {
throw new FileNotFoundException("Cannot create file "
+ f.getName() + " because parent folder does not exist.");
Path qualifiedPath = makeQualified(f);
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat,
listener);
OutputStream outputStream = getAbfsStore().createNonRecursive(qualifiedPath, statistics,
overwrite,
permission == null ? FsPermission.getFileDefault() : permission,
FsPermission.getUMask(getConf()), tracingContext);
statIncrement(FILES_CREATED);
return new FSDataOutputStream(outputStream, statistics);
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex);
return null;
}

return create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
}

@Override
Expand All @@ -480,7 +488,10 @@ public FSDataOutputStream createNonRecursive(final Path f,
@Override
@SuppressWarnings("deprecation")
public FSDataOutputStream createNonRecursive(final Path f,
final boolean overwrite, final int bufferSize, final short replication, final long blockSize,
final boolean overwrite,
final int bufferSize,
final short replication,
final long blockSize,
final Progressable progress) throws IOException {
return this.createNonRecursive(f, FsPermission.getFileDefault(),
overwrite, bufferSize, replication, blockSize, progress);
Expand Down Expand Up @@ -530,45 +541,41 @@ public boolean rename(final Path src, final Path dst) throws IOException {
return tryGetFileStatus(qualifiedSrcPath, tracingContext) != null;
}

FileStatus dstFileStatus = null;
FileStatus dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
Path adjustedDst = dst;
if (qualifiedSrcPath.equals(qualifiedDstPath)) {
// rename to itself
// - if it doesn't exist, return false
// - if it is file, return true
// - if it is dir, return false.
dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
if (dstFileStatus == null) {
return false;
}
return dstFileStatus.isDirectory() ? false : true;
return !dstFileStatus.isDirectory();
}

// Non-HNS account need to check dst status on driver side.
if (!getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
// adjust the destination path in case of FNS account.
if (!getIsNamespaceEnabled(tracingContext) && dstFileStatus != null) {
// return false if the destination is a file.
if (!dstFileStatus.isDirectory()) {
return false;
}
String sourceFileName = src.getName();
adjustedDst = new Path(dst, sourceFileName);
}

try {
String sourceFileName = src.getName();
Path adjustedDst = dst;

if (dstFileStatus != null) {
if (!dstFileStatus.isDirectory()) {
return qualifiedSrcPath.equals(qualifiedDstPath);
}
adjustedDst = new Path(dst, sourceFileName);
}

qualifiedDstPath = makeQualified(adjustedDst);

abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
getAbfsStore().rename(qualifiedSrcPath, qualifiedDstPath, tracingContext,
null);
return true;
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Rename operation failed. ", ex);
checkException(
src,
ex,
AzureServiceErrorCode.PATH_ALREADY_EXISTS,
AzureServiceErrorCode.BLOB_ALREADY_EXISTS,
AzureServiceErrorCode.BLOB_PATH_NOT_FOUND,
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
Expand Down Expand Up @@ -641,7 +648,7 @@ public Pair<Boolean, Duration> commitSingleFileByRename(
final Duration waitTime = rateLimiting.acquire(1);

try {
final boolean recovered = abfsStore.rename(qualifiedSrcPath,
final boolean recovered = getAbfsStore().rename(qualifiedSrcPath,
qualifiedDstPath, tracingContext, sourceEtag);
return Pair.of(recovered, waitTime);
} catch (AzureBlobFileSystemException ex) {
Expand All @@ -655,9 +662,11 @@ public Pair<Boolean, Duration> commitSingleFileByRename(
}

@Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
public boolean delete(final Path f, final boolean recursive)
throws IOException {
LOG.debug(
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive);
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(),
recursive);
statIncrement(CALL_DELETE);
Path qualifiedPath = makeQualified(f);

Expand All @@ -673,10 +682,13 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.DELETE, tracingHeaderFormat,
listener);
abfsStore.delete(qualifiedPath, recursive, tracingContext);
getAbfsStore().delete(qualifiedPath, recursive, tracingContext);
return true;
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND);
checkException(f,
ex,
AzureServiceErrorCode.PATH_NOT_FOUND,
AzureServiceErrorCode.BLOB_PATH_NOT_FOUND);
return false;
}

Expand All @@ -693,7 +705,7 @@ public FileStatus[] listStatus(final Path f) throws IOException {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat,
listener);
FileStatus[] result = abfsStore.listStatus(qualifiedPath, tracingContext);
FileStatus[] result = getAbfsStore().listStatus(qualifiedPath, tracingContext);
return result;
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex);
Expand Down
Loading

0 comments on commit 6d20de1

Please sign in to comment.