Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19256 Integrate PutIfNotExist functionality into S3A createFile #7329

Open
wants to merge 9 commits into
base: HADOOP-19256-s3-conditional-writes
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,13 @@ private Constants() {
*/
public static final String FS_S3A_CREATE_PERFORMANCE = "fs.s3a.create.performance";

/**
 * Flag for commit if none match.
 * This can be set in the {@code createFile()} builder.
 * Value {@value}.
 */
public static final String FS_S3A_CONDITIONAL_FILE_CREATE = "fs.s3a.conditional.file.create";

/**
* Default value for create performance in an S3A FS.
* Value {@value}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*;
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
Expand Down Expand Up @@ -148,6 +149,7 @@ class S3ABlockOutputStream extends OutputStream implements
* the blocks themselves are closed: 15 seconds.
*/
private static final Duration TIME_TO_AWAIT_CANCEL_COMPLETION = Duration.ofSeconds(15);
public static final String IF_NONE_MATCH_HEADER = "If-None-Match";

/** Object being uploaded. */
private final String key;
Expand Down Expand Up @@ -692,20 +694,28 @@ private long putObject() throws IOException {
final S3ADataBlocks.DataBlock block = getActiveBlock();
final long size = block.dataSize();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final PutObjectRequest putObjectRequest =
PutObjectRequest putObjectRequest =
writeOperationHelper.createPutObjectRequest(
key,
uploadData.getSize(),
builder.putOptions);
clearActiveBlock();

PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder();
Map<String, String> optionHeaders = builder.putOptions.getHeaders();
if (builder.isConditionalPutEnabled){
maybeModifiedPutIfAbsentRequest.overrideConfiguration(
override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH)));
}
final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build();

BlockUploadProgress progressCallback =
new BlockUploadProgress(block, progressListener, now());
statistics.blockUploadQueued(size);
try {
progressCallback.progressChanged(PUT_STARTED_EVENT);
// the putObject call automatically closes the upload data
writeOperationHelper.putObject(putObjectRequest,
writeOperationHelper.putObject(finalizedRequest,
builder.putOptions,
uploadData,
statistics);
Expand Down Expand Up @@ -1399,6 +1409,11 @@ public static final class BlockOutputStreamBuilder {
*/
private boolean isMultipartUploadEnabled;

/**
* Is conditional create enabled.
*/
private boolean isConditionalPutEnabled;

private BlockOutputStreamBuilder() {
}

Expand Down Expand Up @@ -1560,5 +1575,11 @@ public BlockOutputStreamBuilder withMultipartEnabled(
isMultipartUploadEnabled = value;
return this;
}

/**
 * Set the conditional PUT flag.
 * @param value true to enable conditional PUT on upload completion
 * @return this builder
 */
public BlockOutputStreamBuilder withConditionalPutEnabled(
    final boolean value) {
  isConditionalPutEnabled = value;
  return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2171,8 +2171,9 @@ private FSDataOutputStream innerCreateFile(
String destKey = putTracker.getDestKey();

// put options are derived from the option builder.
boolean conditionalCreate = options.isConditionalCreate();
final PutObjectOptions putOptions =
new PutObjectOptions(null, options.getHeaders());
new PutObjectOptions(false, conditionalCreate, null, options.getHeaders());

validateOutputStreamConfiguration(path, getConf());

Expand Down Expand Up @@ -2200,7 +2201,8 @@ private FSDataOutputStream innerCreateFile(
.withPutOptions(putOptions)
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
.withMultipartEnabled(isMultipartUploadEnabled);
.withMultipartEnabled(isMultipartUploadEnabled)
.withConditionalPutEnabled(conditionalCreate);
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,7 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload(
retrying,
() -> {
final CompleteMultipartUploadRequest.Builder requestBuilder =
getRequestFactory().newCompleteMultipartUploadRequestBuilder(
destKey, uploadId, partETags);
getRequestFactory().newCompleteMultipartUploadRequestBuilder(destKey, uploadId, partETags, putOptions);
return writeOperationHelperCallbacks.completeMultipartUpload(requestBuilder.build());
});
return uploadResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,14 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(
* @param destKey destination object key
* @param uploadId ID of initiated upload
* @param partETags ordered list of etags
* @param putOptions options for the request
* @return the request builder.
*/
CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
List<CompletedPart> partETags);
List<CompletedPart> partETags,
PutObjectOptions putOptions);

/**
* Create a HEAD object request builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean aboutToComplete(String uploadId,
PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
getOriginalDestKey(),
0,
new PutObjectOptions(null, headers));
new PutObjectOptions(true,false, null, headers));
upload(originalDestPut, EMPTY);

// build the commit summary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public interface AWSHeaders {
String DATE = "Date";
String ETAG = "ETag";
String LAST_MODIFIED = "Last-Modified";
String IF_NONE_MATCH = "If-None-Match";

/*
* Amazon HTTP Headers used by S3A.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ public class CreateFileBuilder extends
* Classic create file option set: overwriting.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_OVERWRITE =
new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, null);
new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, false, null);

/**
* Classic create file option set: no overwrite.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE =
new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, null);
new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, false, null);

/**
* Performance create options.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE =
new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, null);
new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, false, null);

/**
* Callback interface.
Expand Down Expand Up @@ -130,7 +130,6 @@ public FSDataOutputStream build() throws IOException {
.filter(key -> key.startsWith(headerPrefix) && key.length() > prefixLen)
.forEach(key -> headers.put(key.substring(prefixLen), options.get(key)));


EnumSet<CreateFlag> flags = getFlags();
if (flags.contains(CreateFlag.APPEND)) {
throw new UnsupportedOperationException("Append is not supported");
Expand All @@ -144,11 +143,12 @@ public FSDataOutputStream build() throws IOException {

final boolean performance =
options.getBoolean(Constants.FS_S3A_CREATE_PERFORMANCE, false);
final boolean conditionalCreate =
options.getBoolean(Constants.FS_S3A_CONDITIONAL_FILE_CREATE, false);
return callbacks.createFileFromBuilder(
path,
getProgress(),
new CreateFileOptions(flags, isRecursive(), performance, headers));

new CreateFileOptions(flags, isRecursive(), performance, conditionalCreate, headers));
}

/**
Expand Down Expand Up @@ -218,6 +218,11 @@ public static final class CreateFileOptions {
*/
private final boolean performance;

/**
* conditional flag.
*/
private final boolean conditionalCreate;

/**
* Headers; may be null.
*/
Expand All @@ -227,16 +232,19 @@ public static final class CreateFileOptions {
* @param flags creation flags
* @param recursive create parent dirs?
* @param performance performance flag
* @param conditionalCreate conditional flag
* @param headers nullable header map.
*/
public CreateFileOptions(
    final EnumSet<CreateFlag> flags,
    final boolean recursive,
    final boolean performance,
    final boolean conditionalCreate,
    final Map<String, String> headers) {
  // all values are stored by reference: no defensive copies are taken,
  // so callers must not mutate flags/headers after construction.
  // NOTE(review): consider copying if external mutation is a concern.
  this.flags = flags;
  this.recursive = recursive;
  this.performance = performance;
  this.conditionalCreate = conditionalCreate;
  this.headers = headers;
}

Expand All @@ -246,6 +254,7 @@ public String toString() {
"flags=" + flags +
", recursive=" + recursive +
", performance=" + performance +
", conditionalCreate=" + conditionalCreate +
", headers=" + headers +
'}';
}
Expand All @@ -262,6 +271,10 @@ public boolean isPerformance() {
return performance;
}

/**
 * Get the conditional create flag.
 * @return true if conditional (commit-if-none-match) create was requested.
 */
public boolean isConditionalCreate() {
  return conditionalCreate;
}

/**
 * Get the headers to attach to the created object.
 * @return the header map; may be null.
 */
public Map<String, String> getHeaders() {
  return headers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
Expand Down Expand Up @@ -260,7 +261,7 @@ private InternalConstants() {
*/
public static final Set<String> CREATE_FILE_KEYS =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE)));
new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CONDITIONAL_FILE_CREATE)));

/**
* Dynamic Path capabilities to be evaluated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@
*/
public final class PutObjectOptions {

/**
* Can the PUT operation skip marker deletion?
*/
private final boolean keepMarkers;

/**
* Is this a conditional PUT operation
*/
private final boolean conditionalPutEnabled;

/**
* Storage class, if not null.
*/
Expand All @@ -39,15 +49,36 @@
/**
 * Constructor.
 * @param keepMarkers can the PUT operation skip marker deletion?
 * @param conditionalPutEnabled is this a conditional PUT?
 * @param storageClass storage class, if not null.
 * @param headers headers; may be null.
 */
public PutObjectOptions(
    final boolean keepMarkers,
    final boolean conditionalPutEnabled,
    @Nullable final String storageClass,
    @Nullable final Map<String, String> headers) {
  this.keepMarkers = keepMarkers;
  this.conditionalPutEnabled = conditionalPutEnabled;
  this.storageClass = storageClass;
  this.headers = headers;
}

/**
 * Get the marker retention flag.
 * @return true if the PUT operation may skip marker deletion.
 */
public boolean isKeepMarkers() {
  return keepMarkers;
}

/**
 * Get the conditional put flag.
 * @return true if it's a conditional put
 * @deprecated the method name violates the lowerCamelCase convention;
 * use {@link #isConditionalPutEnabled()} instead.
 */
@Deprecated
public boolean isconditionalPutEnabled() {
  return conditionalPutEnabled;
}

/**
 * Get the conditional put flag.
 * @return true if it's a conditional put
 */
public boolean isConditionalPutEnabled() {
  return conditionalPutEnabled;
}

/**
* Headers for the put/post request.
* @return headers or null.
Expand All @@ -66,7 +97,11 @@
/**
* Empty options.
*/
private static final PutObjectOptions EMPTY_OPTIONS = new PutObjectOptions(false, false,
    null, null);
/** Options retaining directory markers; no storage class or headers. */
private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true, false,
    null, null);
/** Options not retaining directory markers; no storage class or headers. */
private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false, false,
    null, null);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
Expand Down Expand Up @@ -533,12 +534,19 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(
public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
List<CompletedPart> partETags) {
List<CompletedPart> partETags,
PutObjectOptions putOptions) {

// a copy of the list is required, so that the AWS SDK doesn't
// attempt to sort an unmodifiable list.
CompleteMultipartUploadRequest.Builder requestBuilder =
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
CompleteMultipartUploadRequest.Builder requestBuilder;
Map<String, String> optionHeaders = putOptions.getHeaders();
requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
if (putOptions.isconditionalPutEnabled()) {
requestBuilder.overrideConfiguration(
override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH)));
}

return prepareRequest(requestBuilder);
}
Expand Down
Loading
Loading