From e1268dff5c2151575d2f5680ba3f56a6e280d0fd Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Fri, 23 Aug 2024 15:24:02 -0700 Subject: [PATCH 01/12] Integrate PutIfNotExist functionality into S3A --- .../org/apache/hadoop/fs/s3a/Constants.java | 7 ++ .../hadoop/fs/s3a/S3ABlockOutputStream.java | 13 +- .../hadoop/fs/s3a/WriteOperationHelper.java | 3 +- .../hadoop/fs/s3a/api/RequestFactory.java | 9 +- .../hadoop/fs/s3a/impl/CreateFileBuilder.java | 7 +- .../hadoop/fs/s3a/impl/InternalConstants.java | 3 +- .../fs/s3a/impl/RequestFactoryImpl.java | 16 ++- .../fs/s3a/impl/ITestS3APutIfMatch.java | 113 ++++++++++++++++++ .../fs/s3a/impl/TestCreateFileBuilder.java | 4 +- .../fs/s3a/impl/TestRequestFactory.java | 3 +- 10 files changed, 162 insertions(+), 16 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index e695e918c953d..44b7e91ce2ee4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1502,6 +1502,13 @@ private Constants() { */ public static final String FS_S3A_CREATE_PERFORMANCE = "fs.s3a.create.performance"; + /** + * Flag for commit if none match. + * This can be set in the {code createFile()} builder. + * Value {@value}. + */ + public static final String FS_S3A_CREATE_IF_NONE_MATCH = "fs.s3a.create.header.If-None-Match"; + /** * Default value for create performance in an S3A FS. * Value {@value}. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 7b249a11c07d4..cba99e573c5ed 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -148,6 +148,7 @@ class S3ABlockOutputStream extends OutputStream implements * the blocks themselves are closed: 15 seconds. */ private static final Duration TIME_TO_AWAIT_CANCEL_COMPLETION = Duration.ofSeconds(15); + public static final String IF_NONE_MATCH_HEADER = "If-None-Match"; /** Object being uploaded. */ private final String key; @@ -692,13 +693,23 @@ private long putObject() throws IOException { final S3ADataBlocks.DataBlock block = getActiveBlock(); final long size = block.dataSize(); final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); - final PutObjectRequest putObjectRequest = + PutObjectRequest putObjectRequest = writeOperationHelper.createPutObjectRequest( key, uploadData.getSize(), builder.putOptions); clearActiveBlock(); + PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); + Map optionHeaders = builder.putOptions.getHeaders(); + + if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH_HEADER)) { + maybeModifiedPutIfAbsentRequest.overrideConfiguration( + override -> override.putHeader(IF_NONE_MATCH_HEADER, optionHeaders.get(IF_NONE_MATCH_HEADER))); + } + + final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); + BlockUploadProgress progressCallback = new BlockUploadProgress(block, progressListener, now()); statistics.blockUploadQueued(size); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 969c1023d7347..364f780863a01 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -318,8 +318,7 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload( retrying, () -> { final CompleteMultipartUploadRequest.Builder requestBuilder = - getRequestFactory().newCompleteMultipartUploadRequestBuilder( - destKey, uploadId, partETags); + getRequestFactory().newCompleteMultipartUploadRequestBuilder(destKey, uploadId, partETags, putOptions); return writeOperationHelperCallbacks.completeMultipartUpload(requestBuilder.build()); }); return uploadResult; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index c69e3394c3dd3..1696e00b51883 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -165,15 +165,16 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( /** * Complete a multipart upload. - * @param destKey destination object key - * @param uploadId ID of initiated upload - * @param partETags ordered list of etags + * + * @param destKey destination object key + * @param uploadId ID of initiated upload + * @param partETags ordered list of etags * @return the request builder. */ CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( String destKey, String uploadId, - List partETags); + List partETags, PutObjectOptions putOptions); /** * Create a HEAD object request builder. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java index ae2945989ddd3..b50ac6aa948d8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java @@ -224,10 +224,11 @@ public static final class CreateFileOptions { private final Map headers; /** - * @param flags creation flags - * @param recursive create parent dirs? + * @param flags creation flags + * @param recursive create parent dirs? * @param performance performance flag - * @param headers nullable header map. + * @param + * @param headers nullable header map. */ public CreateFileOptions( final EnumSet flags, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 5bb64ddc28920..d261ea7b5cd6a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -40,6 +40,7 @@ import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2; @@ -260,7 +261,7 @@ private InternalConstants() { */ public static final Set CREATE_FILE_KEYS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE))); + new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CREATE_IF_NONE_MATCH))); /** * Dynamic Path capabilities to be evaluated diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 00d9368aa58f1..adc606d3456d7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -533,12 +533,22 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( String destKey, String uploadId, - List partETags) { + List partETags, + PutObjectOptions putOptions) { + // a copy of the list is required, so that the AWS SDK doesn't // attempt to sort an unmodifiable list. - CompleteMultipartUploadRequest.Builder requestBuilder = - CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) + CompleteMultipartUploadRequest.Builder requestBuilder; + Map optionHeaders = putOptions.getHeaders(); + + if (optionHeaders != null && optionHeaders.containsKey("If-None-Match")) { + requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) + .overrideConfiguration(override ->override.putHeader("If-None-Match", optionHeaders.get("If-None-Match"))) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); + } else { + requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) + .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); + } return prepareRequest(requestBuilder); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java new file mode 100644 index 0000000000000..2b8d98c9a224b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -0,0 +1,113 @@ +package org.apache.hadoop.fs.s3a.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.RemoteFileChangedException; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.io.IOUtils; + +import org.junit.Assert; +import org.junit.Test; +import software.amazon.awssdk.services.s3.model.S3Exception; + +import java.io.IOException; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; +import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; +import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; +import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; + + +public class ITestS3APutIfMatch extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + removeBaseAndBucketOverrides(conf, + MULTIPART_SIZE, + UPLOAD_PART_COUNT_LIMIT); + conf.setLong(MULTIPART_SIZE, MPU_SIZE); + conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); + conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); + conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); + conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); + return conf; + } + + protected String getBlockOutputBufferName() { + return FAST_UPLOAD_BUFFER_ARRAY; + } + + /** + * Create a file using the PutIfMatch feature from S3 + * @param fs filesystem + * @param path path to write + * @param data source dataset. Can be null + * @throws IOException on any problem + */ + private static void createFileWithIfNoneMatchFlag(FileSystem fs, + Path path, + byte[] data, + String ifMatchTag) throws Exception { + FSDataOutputStreamBuilder builder = fs.createFile(path); + builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag); + FSDataOutputStream stream = builder.create().build(); + if (data != null && data.length > 0) { + stream.write(data); + } + stream.close(); + IOUtils.closeStream(stream); + } + + @Test + public void testPutIfAbsentConflict() throws IOException { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + fs.mkdirs(testFile.getParent()); + byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255); + + try { + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + } catch (Exception e) { + Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); + + S3Exception s3Exception = (S3Exception) e.getCause(); + Assert.assertEquals(s3Exception.statusCode(), 412); + } + } + + + @Test + public void testPutIfAbsentLargeFileConflict() throws IOException { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + fs.mkdirs(testFile.getParent()); + // enough bytes for Multipart Upload + byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); + + try { + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + } catch (Exception e) { + Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); + + // Error gets caught here: + S3Exception s3Exception = (S3Exception) e.getCause(); + Assert.assertEquals(s3Exception.statusCode(), 412); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java index 65d7aa6192dd8..a582a9ce6fc50 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java @@ -89,11 +89,13 @@ public void testPerformanceSupport() throws Throwable { public void testHeaderOptions() throws Throwable { final CreateFileBuilder builder = mkBuilder().create() .must(FS_S3A_CREATE_HEADER + ".retention", "permanent") + .must(FS_S3A_CREATE_HEADER + ".If-None-Match", "*") .opt(FS_S3A_CREATE_HEADER + ".owner", "engineering"); final Map headers = build(builder).getHeaders(); Assertions.assertThat(headers) .containsEntry("retention", "permanent") - .containsEntry("owner", "engineering"); + .containsEntry("owner", "engineering") + .containsEntry("If-None-Match", "*"); } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 15e8a4485d558..4079079c5321b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.atomic.AtomicLong; import software.amazon.awssdk.awscore.AwsRequest; @@ -166,7 +167,7 @@ private void createFactoryObjects(RequestFactory factory) throws IOException { String id = "1"; a(factory.newAbortMultipartUploadRequestBuilder(path, id)); a(factory.newCompleteMultipartUploadRequestBuilder(path, id, - new ArrayList<>())); + new ArrayList<>(), new PutObjectOptions(false, "some class", Collections.emptyMap()))); a(factory.newCopyObjectRequestBuilder(path, path2, HeadObjectResponse.builder().build())); a(factory.newDeleteObjectRequestBuilder(path)); From 634806c47f98df9cc97a8568ea0e7d49905c9a84 Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Thu, 26 Sep 2024 21:31:07 -0700 Subject: [PATCH 02/12] addressing PR feedback --- .../org/apache/hadoop/fs/s3a/Constants.java | 2 +- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 16 ++++++- .../hadoop/fs/s3a/api/RequestFactory.java | 11 ++--- .../apache/hadoop/fs/s3a/impl/AWSHeaders.java | 1 + .../hadoop/fs/s3a/impl/CreateFileBuilder.java | 30 +++++++++---- .../hadoop/fs/s3a/impl/InternalConstants.java | 4 +- .../fs/s3a/impl/RequestFactoryImpl.java | 11 +++-- .../fs/s3a/impl/ITestS3APutIfMatch.java | 45 +++++++++++++++---- .../fs/s3a/impl/TestCreateFileBuilder.java | 5 ++- 9 files changed, 91 insertions(+), 34 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 44b7e91ce2ee4..535de12a0544a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1507,7 +1507,7 @@ private Constants() { * This can be set in the {code createFile()} builder. * Value {@value}. */ - public static final String FS_S3A_CREATE_IF_NONE_MATCH = "fs.s3a.create.header.If-None-Match"; + public static final String FS_S3A_CONDITIONAL_FILE_CREATE = "fs.s3a.conditional.file.create"; /** * Default value for create performance in an S3A FS. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index cba99e573c5ed..169acfbc5f27a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -81,6 +81,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM; import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; @@ -703,9 +704,9 @@ private long putObject() throws IOException { PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); Map optionHeaders = builder.putOptions.getHeaders(); - if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH_HEADER)) { + if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) { maybeModifiedPutIfAbsentRequest.overrideConfiguration( - override -> override.putHeader(IF_NONE_MATCH_HEADER, optionHeaders.get(IF_NONE_MATCH_HEADER))); + override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); @@ -1410,6 +1411,11 @@ public static final class BlockOutputStreamBuilder { */ private boolean isMultipartUploadEnabled; + /** + * Is conditional create enables. + */ + private boolean isConditionalEnabled; + private BlockOutputStreamBuilder() { } @@ -1571,5 +1577,11 @@ public BlockOutputStreamBuilder withMultipartEnabled( isMultipartUploadEnabled = value; return this; } + + public BlockOutputStreamBuilder withConditionalEnabled( + final boolean value){ + isConditionalEnabled = value; + return this; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index 1696e00b51883..294f0a08f9257 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -165,16 +165,17 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( /** * Complete a multipart upload. - * - * @param destKey destination object key - * @param uploadId ID of initiated upload - * @param partETags ordered list of etags + * @param destKey destination object key + * @param uploadId ID of initiated upload + * @param partETags ordered list of etags + * @param putOptions options for the request * @return the request builder. */ CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( String destKey, String uploadId, - List partETags, PutObjectOptions putOptions); + List partETags, + PutObjectOptions putOptions); /** * Create a HEAD object request builder. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java index fe7b45147986f..bd9cb0487b0d8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java @@ -38,6 +38,7 @@ public interface AWSHeaders { String DATE = "Date"; String ETAG = "ETag"; String LAST_MODIFIED = "Last-Modified"; + String IF_NONE_MATCH = "If-None-Match"; /* * Amazon HTTP Headers used by S3A. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java index b50ac6aa948d8..2421bfc17488a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java @@ -63,19 +63,19 @@ public class CreateFileBuilder extends * Classic create file option set: overwriting. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_OVERWRITE = - new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, null); + new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, false, null); /** * Classic create file option set: no overwrite. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE = - new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, null); + new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, false, null); /** * Performance create options. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE = - new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, null); + new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, false, null); /** * Callback interface. @@ -144,10 +144,12 @@ public FSDataOutputStream build() throws IOException { final boolean performance = options.getBoolean(Constants.FS_S3A_CREATE_PERFORMANCE, false); + final boolean conditionalCreate = + options.getBoolean(Constants.FS_S3A_CONDITIONAL_FILE_CREATE, false); return callbacks.createFileFromBuilder( path, getProgress(), - new CreateFileOptions(flags, isRecursive(), performance, headers)); + new CreateFileOptions(flags, isRecursive(), performance, conditionalCreate, headers)); } @@ -218,26 +220,33 @@ public static final class CreateFileOptions { */ private final boolean performance; + /** + * conditional flag. + */ + private final boolean conditionalCreate; + /** * Headers; may be null. */ private final Map headers; /** - * @param flags creation flags - * @param recursive create parent dirs? + * @param flags creation flags + * @param recursive create parent dirs? * @param performance performance flag - * @param - * @param headers nullable header map. + * @param conditionalCreate conditional flag + * @param headers nullable header map. */ public CreateFileOptions( final EnumSet flags, final boolean recursive, final boolean performance, + final boolean conditionalCreate, final Map headers) { this.flags = flags; this.recursive = recursive; this.performance = performance; + this.conditionalCreate = conditionalCreate; this.headers = headers; } @@ -247,6 +256,7 @@ public String toString() { "flags=" + flags + ", recursive=" + recursive + ", performance=" + performance + + ", conditionalCreate=" + conditionalCreate + ", headers=" + headers + '}'; } @@ -263,6 +273,10 @@ public boolean isPerformance() { return performance; } + public boolean isConditionalCreate() { + return conditionalCreate; + } + public Map getHeaders() { return headers; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index d261ea7b5cd6a..6c9db6dfd6a9f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -40,7 +40,7 @@ import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; -import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2; @@ -261,7 +261,7 @@ private InternalConstants() { */ public static final Set CREATE_FILE_KEYS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CREATE_IF_NONE_MATCH))); + new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CONDITIONAL_FILE_CREATE))); /** * Dynamic Path capabilities to be evaluated diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index adc606d3456d7..631ca33facbed 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -62,6 +62,7 @@ import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM; import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; @@ -541,13 +542,11 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB CompleteMultipartUploadRequest.Builder requestBuilder; Map optionHeaders = putOptions.getHeaders(); - if (optionHeaders != null && optionHeaders.containsKey("If-None-Match")) { - requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) - .overrideConfiguration(override ->override.putHeader("If-None-Match", optionHeaders.get("If-None-Match"))) - .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); - } else { - requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) + requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); + if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) { + requestBuilder = CompleteMultipartUploadRequest.builder().overrideConfiguration( + override ->override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } return prepareRequest(requestBuilder); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index 2b8d98c9a224b..279ec9cdf88da 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.fs.s3a.impl; import org.apache.hadoop.conf.Configuration; @@ -5,11 +23,12 @@ import org.apache.hadoop.fs.FSDataOutputStreamBuilder; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.s3a.RemoteFileChangedException; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.io.IOUtils; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; import software.amazon.awssdk.services.s3.model.S3Exception; @@ -18,20 +37,23 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; -import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE; import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; -public class ITestS3APutIfMatch extends AbstractS3ATestBase { +public class ITestS3APutIfMatch extends AbstractS3ACostTest { + + private Configuration conf; @Override - protected Configuration createConfiguration() { + public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); S3ATestUtils.disableFilesystemCaching(conf); removeBaseAndBucketOverrides(conf, @@ -45,6 +67,14 @@ protected Configuration createConfiguration() { return conf; } + @Override + public void setup() throws Exception { + super.setup(); + conf = createConfiguration(); + skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE, + "Skipping IfNoneMatch tests"); + } + protected String getBlockOutputBufferName() { return FAST_UPLOAD_BUFFER_ARRAY; } @@ -61,7 +91,7 @@ private static void createFileWithIfNoneMatchFlag(FileSystem fs, byte[] data, String ifMatchTag) throws Exception { FSDataOutputStreamBuilder builder = fs.createFile(path); - builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag); + builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag); FSDataOutputStream stream = builder.create().build(); if (data != null && data.length > 0) { stream.write(data); @@ -85,7 +115,7 @@ public void testPutIfAbsentConflict() throws IOException { Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); S3Exception s3Exception = (S3Exception) e.getCause(); - Assert.assertEquals(s3Exception.statusCode(), 412); + Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); } } @@ -95,7 +125,6 @@ public void testPutIfAbsentLargeFileConflict() throws IOException { FileSystem fs = getFileSystem(); Path testFile = methodPath(); - fs.mkdirs(testFile.getParent()); // enough bytes for Multipart Upload byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); @@ -107,7 +136,7 @@ public void testPutIfAbsentLargeFileConflict() throws IOException { // Error gets caught here: S3Exception s3Exception = (S3Exception) e.getCause(); - Assert.assertEquals(s3Exception.statusCode(), 412); + Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java index a582a9ce6fc50..9759cf610e8f0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java @@ -37,6 +37,7 @@ import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; /** * Unit test of {@link CreateFileBuilder}. @@ -89,13 +90,13 @@ public void testPerformanceSupport() throws Throwable { public void testHeaderOptions() throws Throwable { final CreateFileBuilder builder = mkBuilder().create() .must(FS_S3A_CREATE_HEADER + ".retention", "permanent") - .must(FS_S3A_CREATE_HEADER + ".If-None-Match", "*") + .must(FS_S3A_CREATE_HEADER + ".".concat(IF_NONE_MATCH), "*") .opt(FS_S3A_CREATE_HEADER + ".owner", "engineering"); final Map headers = build(builder).getHeaders(); Assertions.assertThat(headers) .containsEntry("retention", "permanent") .containsEntry("owner", "engineering") - .containsEntry("If-None-Match", "*"); + .containsEntry(IF_NONE_MATCH, "*"); } @Test From c66e7193106e51544a3529f2262bc106166748a6 Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Wed, 9 Oct 2024 23:44:04 -0700 Subject: [PATCH 03/12] addressing review comments, added flag to PutObjectOptions --- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 19 +++--- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 6 +- .../commit/magic/S3MagicCommitTracker.java | 2 +- .../hadoop/fs/s3a/impl/PutObjectOptions.java | 34 ++++++++++ .../fs/s3a/impl/RequestFactoryImpl.java | 6 +- .../fs/s3a/impl/ITestS3APutIfMatch.java | 64 +++++++++++-------- .../fs/s3a/impl/TestCreateFileBuilder.java | 2 +- .../fs/s3a/impl/TestRequestFactory.java | 2 +- 8 files changed, 89 insertions(+), 46 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 169acfbc5f27a..d9a5dc90f544a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -79,9 +79,9 @@ import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; import static org.apache.hadoop.fs.s3a.Statistic.*; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM; import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*; -import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; @@ -702,13 +702,10 @@ private long putObject() throws IOException { clearActiveBlock(); PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); - Map optionHeaders = builder.putOptions.getHeaders(); - - if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) { + if (builder.isConditionalPutEnabled){ maybeModifiedPutIfAbsentRequest.overrideConfiguration( - override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); + override -> override.putHeader(IF_NONE_MATCH, "*")); } - final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); BlockUploadProgress progressCallback = @@ -717,7 +714,7 @@ private long putObject() throws IOException { try { progressCallback.progressChanged(PUT_STARTED_EVENT); // the putObject call automatically closes the upload data - writeOperationHelper.putObject(putObjectRequest, + writeOperationHelper.putObject(finalizedRequest, builder.putOptions, uploadData, statistics); @@ -1412,9 +1409,9 @@ public static final class BlockOutputStreamBuilder { private boolean isMultipartUploadEnabled; /** - * Is conditional create enables. + * Is conditional create enabled. */ - private boolean isConditionalEnabled; + private boolean isConditionalPutEnabled; private BlockOutputStreamBuilder() { } @@ -1578,9 +1575,9 @@ public BlockOutputStreamBuilder withMultipartEnabled( return this; } - public BlockOutputStreamBuilder withConditionalEnabled( + public BlockOutputStreamBuilder withConditionalPutEnabled( final boolean value){ - isConditionalEnabled = value; + isConditionalPutEnabled = value; return this; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 14031ed007ef8..d93acbeb0ebb3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -2171,8 +2171,9 @@ private FSDataOutputStream innerCreateFile( String destKey = putTracker.getDestKey(); // put options are derived from the option builder. + boolean conditionalCreate = options.isConditionalCreate(); final PutObjectOptions putOptions = - new PutObjectOptions(null, options.getHeaders()); + new PutObjectOptions(false, false, null, options.getHeaders()); validateOutputStreamConfiguration(path, getConf()); @@ -2200,7 +2201,8 @@ private FSDataOutputStream innerCreateFile( .withPutOptions(putOptions) .withIOStatisticsAggregator( IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) - .withMultipartEnabled(isMultipartUploadEnabled); + .withMultipartEnabled(isMultipartUploadEnabled) + .withConditionalPutEnabled(conditionalCreate); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java index ecc3496ce8f3a..1ebcb1c83ae91 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java @@ -79,7 +79,7 @@ public boolean aboutToComplete(String uploadId, PutObjectRequest originalDestPut = getWriter().createPutObjectRequest( getOriginalDestKey(), 0, - new PutObjectOptions(null, headers)); + new PutObjectOptions(true,false, null, headers)); upload(originalDestPut, EMPTY); // build the commit summary diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java index 1ca502c44cde6..0788f94c77bb1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java @@ -26,6 +26,16 @@ */ public final class PutObjectOptions { + /** + * Can the PUT operation skip marker deletion? + */ + private final boolean keepMarkers; + + /** + * Is this a conditional PUT operation + */ + private final boolean conditionalPutEnabled; + /** * Storage class, if not null. */ @@ -39,15 +49,36 @@ public final class PutObjectOptions { /** * Constructor. * @param storageClass Storage class, if not null. + * @param conditionalPutEnabled Is this a conditional Put? * @param headers Headers; may be null. */ public PutObjectOptions( + final boolean keepMarkers, + final boolean conditionalPutEnabled, @Nullable final String storageClass, @Nullable final Map headers) { + this.keepMarkers = keepMarkers; + this.conditionalPutEnabled = conditionalPutEnabled; this.storageClass = storageClass; this.headers = headers; } + /** + * Get the marker retention flag. + * @return true if markers are to be retained. + */ + public boolean isKeepMarkers() { + return keepMarkers; + } + + /** + * Get the conditional put flag. + * @return true if it's a conditional put + */ + public boolean isconditionalPutEnabled() { + return conditionalPutEnabled; + } + /** * Headers for the put/post request. * @return headers or null. @@ -67,6 +98,9 @@ public String toString() { * Empty options. */ private static final PutObjectOptions EMPTY_OPTIONS = new PutObjectOptions( + private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true, false, + null, null); + private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false, false, null, null); /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 631ca33facbed..1c45b1cdd1f78 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -540,13 +540,13 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB // a copy of the list is required, so that the AWS SDK doesn't // attempt to sort an unmodifiable list. CompleteMultipartUploadRequest.Builder requestBuilder; - Map optionHeaders = putOptions.getHeaders(); requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); - if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) { + if (putOptions.isconditionalPutEnabled()){ requestBuilder = CompleteMultipartUploadRequest.builder().overrideConfiguration( - override ->override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); + override ->override.putHeader(IF_NONE_MATCH, "*")); + } return prepareRequest(requestBuilder); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index 279ec9cdf88da..9a10ac544cec1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -34,6 +34,8 @@ import software.amazon.awssdk.services.s3.model.S3Exception; import java.io.IOException; +import java.nio.file.AccessDeniedException; + import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; @@ -46,6 +48,7 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; public class ITestS3APutIfMatch extends AbstractS3ACostTest { @@ -56,14 +59,16 @@ public class ITestS3APutIfMatch extends AbstractS3ACostTest { public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); S3ATestUtils.disableFilesystemCaching(conf); - removeBaseAndBucketOverrides(conf, - MULTIPART_SIZE, - UPLOAD_PART_COUNT_LIMIT); + removeBaseAndBucketOverrides( + conf, + MULTIPART_SIZE, + UPLOAD_PART_COUNT_LIMIT, + MIN_MULTIPART_THRESHOLD, + MULTIPART_SIZE); conf.setLong(MULTIPART_SIZE, MPU_SIZE); conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); - conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); return conf; } @@ -75,6 +80,14 @@ public void setup() throws Exception { "Skipping IfNoneMatch tests"); } + private static void assertS3ExceptionStatusCode(int code, Exception ex) { + S3Exception s3Exception = (S3Exception) ex.getCause(); + + if (s3Exception.statusCode() != code) { + throw new AssertionError("Expected status code " + code + " from " + ex, ex); + } + } + protected String getBlockOutputBufferName() { return FAST_UPLOAD_BUFFER_ARRAY; } @@ -86,10 +99,11 @@ protected String getBlockOutputBufferName() { * @param data source dataset. Can be null * @throws IOException on any problem */ - private static void createFileWithIfNoneMatchFlag(FileSystem fs, - Path path, - byte[] data, - String ifMatchTag) throws Exception { + private static void createFileWithIfNoneMatchFlag( + FileSystem fs, + Path path, + byte[] data, + String ifMatchTag) throws Exception { FSDataOutputStreamBuilder builder = fs.createFile(path); builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag); FSDataOutputStream stream = builder.create().build(); @@ -101,42 +115,38 @@ private static void createFileWithIfNoneMatchFlag(FileSystem fs, } @Test - public void testPutIfAbsentConflict() throws IOException { + public void testPutIfAbsentConflict() throws Throwable { FileSystem fs = getFileSystem(); Path testFile = methodPath(); fs.mkdirs(testFile.getParent()); byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255); - try { - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); - } catch (Exception e) { - Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); + RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, + () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + assertS3ExceptionStatusCode(412, firstException); - S3Exception s3Exception = (S3Exception) e.getCause(); - Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); - } + RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, + () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + assertS3ExceptionStatusCode(412, secondException); } @Test - public void testPutIfAbsentLargeFileConflict() throws IOException { + public void testPutIfAbsentLargeFileConflict() throws Throwable { FileSystem fs = getFileSystem(); Path testFile = methodPath(); // enough bytes for Multipart Upload byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); - try { - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); - } catch (Exception e) { - Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); + RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, + () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + assertS3ExceptionStatusCode(412, firstException); + + RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, + () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + assertS3ExceptionStatusCode(412, secondException); - // Error gets caught here: - S3Exception s3Exception = (S3Exception) e.getCause(); - Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); - } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java index 9759cf610e8f0..c9682abab3e7f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java @@ -90,7 +90,7 @@ public void testPerformanceSupport() throws Throwable { public void testHeaderOptions() throws Throwable { final CreateFileBuilder builder = mkBuilder().create() .must(FS_S3A_CREATE_HEADER + ".retention", "permanent") - .must(FS_S3A_CREATE_HEADER + ".".concat(IF_NONE_MATCH), "*") + .must(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, "*") .opt(FS_S3A_CREATE_HEADER + ".owner", "engineering"); final Map headers = build(builder).getHeaders(); Assertions.assertThat(headers) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 4079079c5321b..c0fa66cc74515 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -167,7 +167,7 @@ private void createFactoryObjects(RequestFactory factory) throws IOException { String id = "1"; a(factory.newAbortMultipartUploadRequestBuilder(path, id)); a(factory.newCompleteMultipartUploadRequestBuilder(path, id, - new ArrayList<>(), new PutObjectOptions(false, "some class", Collections.emptyMap()))); + new ArrayList<>(), new PutObjectOptions(false, false,"some class", Collections.emptyMap()))); a(factory.newCopyObjectRequestBuilder(path, path2, HeadObjectResponse.builder().build())); a(factory.newDeleteObjectRequestBuilder(path)); From 52549e650844b30ecd476726caae2e871555edaa Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Tue, 15 Oct 2024 00:44:21 -0700 Subject: [PATCH 04/12] reverting to using optionheaders --- .../java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 3 ++- .../org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index d9a5dc90f544a..613188a9ac69b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -702,9 +702,10 @@ private long putObject() throws IOException { clearActiveBlock(); PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); + Map optionHeaders = builder.putOptions.getHeaders(); if (builder.isConditionalPutEnabled){ maybeModifiedPutIfAbsentRequest.overrideConfiguration( - override -> override.putHeader(IF_NONE_MATCH, "*")); + override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 1c45b1cdd1f78..c0d3b58b893a7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -540,12 +540,12 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB // a copy of the list is required, so that the AWS SDK doesn't // attempt to sort an unmodifiable list. CompleteMultipartUploadRequest.Builder requestBuilder; - + Map optionHeaders = putOptions.getHeaders(); requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); if (putOptions.isconditionalPutEnabled()){ requestBuilder = CompleteMultipartUploadRequest.builder().overrideConfiguration( - override ->override.putHeader(IF_NONE_MATCH, "*")); + override ->override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } From 8a87a7748558edb4b295a0b6c4f4234157bc4762 Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Tue, 15 Oct 2024 00:44:21 -0700 Subject: [PATCH 05/12] reverting to using optionheaders From cd663bd67234f56d9f573a7a7ae199678db7c816 Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Tue, 15 Oct 2024 10:54:22 -0700 Subject: [PATCH 06/12] redundant set for multipart_size --- .../org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index 9a10ac544cec1..46b5a6c35de7f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -46,7 +46,6 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; -import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -63,9 +62,7 @@ public Configuration createConfiguration() { conf, MULTIPART_SIZE, UPLOAD_PART_COUNT_LIMIT, - MIN_MULTIPART_THRESHOLD, - MULTIPART_SIZE); - conf.setLong(MULTIPART_SIZE, MPU_SIZE); + MIN_MULTIPART_THRESHOLD); conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); From 4c1658de6a1878ed6ab7a36ef15a961d4fd5349f Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Tue, 15 Oct 2024 10:54:22 -0700 Subject: [PATCH 07/12] redundant set for multipart_size From 29ff04acf62fa646368f04fd519d44f662996635 Mon Sep 17 00:00:00 2001 From: Saikat Roy Date: Thu, 16 Jan 2025 20:27:06 +0530 Subject: [PATCH 08/12] fixing merge conflict --- .../java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java index 0788f94c77bb1..ca43b3b7f58bd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java @@ -97,7 +97,8 @@ public String toString() { /** * Empty options. */ - private static final PutObjectOptions EMPTY_OPTIONS = new PutObjectOptions( + private static final PutObjectOptions EMPTY_OPTIONS = new PutObjectOptions(false, false, + null, null); private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true, false, null, null); private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false, false, From aa1defccf49717b7be391ebde37d0346fd8bb921 Mon Sep 17 00:00:00 2001 From: Saikat Roy Date: Mon, 27 Jan 2025 20:29:08 +0530 Subject: [PATCH 09/12] Fixing tests: testPutIfAbsentConflict, testPutIfAbsentLargeFileConflict --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 2 +- .../hadoop/fs/s3a/impl/CreateFileBuilder.java | 2 -- .../fs/s3a/impl/RequestFactoryImpl.java | 7 ++--- .../fs/s3a/impl/ITestS3APutIfMatch.java | 29 ++++++++++--------- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index d93acbeb0ebb3..a4b90fd86ee89 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -2173,7 +2173,7 @@ private FSDataOutputStream innerCreateFile( // put options are derived from the option builder. boolean conditionalCreate = options.isConditionalCreate(); final PutObjectOptions putOptions = - new PutObjectOptions(false, false, null, options.getHeaders()); + new PutObjectOptions(false, conditionalCreate, null, options.getHeaders()); validateOutputStreamConfiguration(path, getConf()); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java index 2421bfc17488a..fecc2459c8f54 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java @@ -130,7 +130,6 @@ public FSDataOutputStream build() throws IOException { .filter(key -> key.startsWith(headerPrefix) && key.length() > prefixLen) .forEach(key -> headers.put(key.substring(prefixLen), options.get(key))); - EnumSet flags = getFlags(); if (flags.contains(CreateFlag.APPEND)) { throw new UnsupportedOperationException("Append is not supported"); @@ -150,7 +149,6 @@ public FSDataOutputStream build() throws IOException { path, getProgress(), new CreateFileOptions(flags, isRecursive(), performance, conditionalCreate, headers)); - } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index c0d3b58b893a7..69c8303dde7d1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -543,10 +543,9 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB Map optionHeaders = putOptions.getHeaders(); requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); - if (putOptions.isconditionalPutEnabled()){ - requestBuilder = CompleteMultipartUploadRequest.builder().overrideConfiguration( - override ->override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); - + if (putOptions.isconditionalPutEnabled()) { + requestBuilder.overrideConfiguration( + override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } return prepareRequest(requestBuilder); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index 46b5a6c35de7f..80935a459847e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -28,23 +28,21 @@ import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.io.IOUtils; -import org.assertj.core.api.Assertions; -import org.junit.Assert; import org.junit.Test; import software.amazon.awssdk.services.s3.model.S3Exception; import java.io.IOException; -import java.nio.file.AccessDeniedException; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; -import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER; import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -101,24 +99,26 @@ private static void createFileWithIfNoneMatchFlag( Path path, byte[] data, String ifMatchTag) throws Exception { - FSDataOutputStreamBuilder builder = fs.createFile(path); - builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag); - FSDataOutputStream stream = builder.create().build(); - if (data != null && data.length > 0) { - stream.write(data); - } - stream.close(); - IOUtils.closeStream(stream); + FSDataOutputStreamBuilder builder = fs.createFile(path); + builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true"); + builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag); + FSDataOutputStream stream = builder.create().build(); + if (data != null && data.length > 0) { + stream.write(data); + } + stream.close(); + IOUtils.closeStream(stream); } @Test public void testPutIfAbsentConflict() throws Throwable { FileSystem fs = getFileSystem(); Path testFile = methodPath(); - fs.mkdirs(testFile.getParent()); byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255); + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); assertS3ExceptionStatusCode(412, firstException); @@ -137,6 +137,8 @@ public void testPutIfAbsentLargeFileConflict() throws Throwable { // enough bytes for Multipart Upload byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); assertS3ExceptionStatusCode(412, firstException); @@ -144,6 +146,5 @@ public void testPutIfAbsentLargeFileConflict() throws Throwable { RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); assertS3ExceptionStatusCode(412, secondException); - } } From b19da55cc3664c6145d8285e2e8643c938036118 Mon Sep 17 00:00:00 2001 From: Saikat Roy Date: Thu, 6 Feb 2025 16:36:35 +0530 Subject: [PATCH 10/12] Changes based on review comments --- .../org/apache/hadoop/fs/s3a/Constants.java | 8 +++ .../hadoop/fs/s3a/S3ABlockOutputStream.java | 2 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 3 +- .../commit/magic/S3MagicCommitTracker.java | 2 +- .../hadoop/fs/s3a/impl/PutObjectOptions.java | 53 +++++++++---------- .../fs/s3a/impl/RequestFactoryImpl.java | 9 ++-- .../fs/s3a/impl/ITestS3APutIfMatch.java | 25 ++++----- .../fs/s3a/impl/TestRequestFactory.java | 2 +- 8 files changed, 53 insertions(+), 51 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 535de12a0544a..b047547640e3e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1787,4 +1787,12 @@ private Constants() { * Value: {@value}. */ public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit"; + + /** + * Value for the {@code If-None-Match} HTTP header in S3 requests. + * Value: {@value}. + * More information: + * AWS S3 PutObject API Documentation + */ + public static final String IF_NONE_MATCH_STAR = "*"; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 613188a9ac69b..f1c8ded23e15d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -703,7 +703,7 @@ private long putObject() throws IOException { PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); Map optionHeaders = builder.putOptions.getHeaders(); - if (builder.isConditionalPutEnabled){ + if (builder.isConditionalPutEnabled) { maybeModifiedPutIfAbsentRequest.overrideConfiguration( override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a4b90fd86ee89..5efa1ee3a1087 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -2173,7 +2173,8 @@ private FSDataOutputStream innerCreateFile( // put options are derived from the option builder. boolean conditionalCreate = options.isConditionalCreate(); final PutObjectOptions putOptions = - new PutObjectOptions(false, conditionalCreate, null, options.getHeaders()); + new PutObjectOptions(null, options.getHeaders(), false, null); + validateOutputStreamConfiguration(path, getConf()); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java index 1ebcb1c83ae91..bd0b58f480d6d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java @@ -79,7 +79,7 @@ public boolean aboutToComplete(String uploadId, PutObjectRequest originalDestPut = getWriter().createPutObjectRequest( getOriginalDestKey(), 0, - new PutObjectOptions(true,false, null, headers)); + new PutObjectOptions(null, headers, false, null)); upload(originalDestPut, EMPTY); // build the commit summary diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java index ca43b3b7f58bd..c25d9a623fc94 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java @@ -27,56 +27,56 @@ public final class PutObjectOptions { /** - * Can the PUT operation skip marker deletion? + * Storage class, if not null. */ - private final boolean keepMarkers; + private final String storageClass; /** - * Is this a conditional PUT operation + * Headers; may be null. */ - private final boolean conditionalPutEnabled; + private final Map headers; /** - * Storage class, if not null. + * Prevent overwriting an existing object? */ - private final String storageClass; + private final boolean noObjectOverwrite; /** - * Headers; may be null. + * If set, allows overwriting an object only if the object's ETag matches this value. */ - private final Map headers; + private final String etagOverwrite; /** * Constructor. * @param storageClass Storage class, if not null. - * @param conditionalPutEnabled Is this a conditional Put? + * @param noObjectOverwrite Prevent overwriting existing object? * @param headers Headers; may be null. */ public PutObjectOptions( - final boolean keepMarkers, - final boolean conditionalPutEnabled, @Nullable final String storageClass, - @Nullable final Map headers) { - this.keepMarkers = keepMarkers; - this.conditionalPutEnabled = conditionalPutEnabled; + @Nullable final Map headers, + final boolean noObjectOverwrite, + final String etagOverwrite) { + this.noObjectOverwrite = noObjectOverwrite; + this.etagOverwrite = etagOverwrite; this.storageClass = storageClass; this.headers = headers; } /** - * Get the marker retention flag. - * @return true if markers are to be retained. + * Get the noObjectOverwrite flag. + * @return true if object override not allowed. */ - public boolean isKeepMarkers() { - return keepMarkers; + public boolean isNoObjectOverwrite() { + return noObjectOverwrite; } /** - * Get the conditional put flag. - * @return true if it's a conditional put + * Get the ETag that must match for an overwrite operation to proceed. + * @return The ETag required for overwrite, or {@code null} if no ETag match is required. */ - public boolean isconditionalPutEnabled() { - return conditionalPutEnabled; + public String getEtagOverwrite() { + return etagOverwrite; } /** @@ -97,14 +97,9 @@ public String toString() { /** * Empty options. */ - private static final PutObjectOptions EMPTY_OPTIONS = new PutObjectOptions(false, false, - null, null); - private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true, false, - null, null); - private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false, false, - null, null); + private static final PutObjectOptions EMPTY_OPTIONS = new PutObjectOptions(null, null, false, null); - /** + /** * Get the default options. * @return an instance with no storage class or headers. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 69c8303dde7d1..f1de885d9b5fe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -62,9 +62,10 @@ import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT; -import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; +import static org.apache.hadoop.fs.s3a.Constants.IF_NONE_MATCH_STAR; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM; import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.util.Preconditions.checkArgument; import static org.apache.hadoop.util.Preconditions.checkNotNull; @@ -540,12 +541,12 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB // a copy of the list is required, so that the AWS SDK doesn't // attempt to sort an unmodifiable list. CompleteMultipartUploadRequest.Builder requestBuilder; - Map optionHeaders = putOptions.getHeaders(); requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); - if (putOptions.isconditionalPutEnabled()) { + if (putOptions.isNoObjectOverwrite()) { requestBuilder.overrideConfiguration( - override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); + override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR)); + // TODO: add etag } return prepareRequest(requestBuilder); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index 80935a459847e..db289fc5791f2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.s3a.RemoteFileChangedException; import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.apache.hadoop.io.IOUtils; import org.junit.Test; import software.amazon.awssdk.services.s3.model.S3Exception; @@ -43,6 +42,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -50,8 +50,6 @@ public class ITestS3APutIfMatch extends AbstractS3ACostTest { - private Configuration conf; - @Override public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); @@ -70,7 +68,7 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - conf = createConfiguration(); + Configuration conf = getConfiguration(); skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE, "Skipping IfNoneMatch tests"); } @@ -102,12 +100,12 @@ private static void createFileWithIfNoneMatchFlag( FSDataOutputStreamBuilder builder = fs.createFile(path); builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true"); builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag); - FSDataOutputStream stream = builder.create().build(); - if (data != null && data.length > 0) { - stream.write(data); + + try (FSDataOutputStream stream = builder.create().build()) { + if (data != null && data.length > 0) { + stream.write(data); + } } - stream.close(); - IOUtils.closeStream(stream); } @Test @@ -121,14 +119,13 @@ public void testPutIfAbsentConflict() throws Throwable { RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); - assertS3ExceptionStatusCode(412, firstException); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); - assertS3ExceptionStatusCode(412, secondException); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); } - @Test public void testPutIfAbsentLargeFileConflict() throws Throwable { FileSystem fs = getFileSystem(); @@ -141,10 +138,10 @@ public void testPutIfAbsentLargeFileConflict() throws Throwable { RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); - assertS3ExceptionStatusCode(412, firstException); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); - assertS3ExceptionStatusCode(412, secondException); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index c0fa66cc74515..6f0c8bf2b6f5d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -167,7 +167,7 @@ private void createFactoryObjects(RequestFactory factory) throws IOException { String id = "1"; a(factory.newAbortMultipartUploadRequestBuilder(path, id)); a(factory.newCompleteMultipartUploadRequestBuilder(path, id, - new ArrayList<>(), new PutObjectOptions(false, false,"some class", Collections.emptyMap()))); + new ArrayList<>(), new PutObjectOptions("some class", Collections.emptyMap(), false, null))); a(factory.newCopyObjectRequestBuilder(path, path2, HeadObjectResponse.builder().build())); a(factory.newDeleteObjectRequestBuilder(path)); From 75544d0b9615683098f7822533c0a5cc4f7c8527 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 6 Feb 2025 12:15:59 +0000 Subject: [PATCH 11/12] HADOOP-19256. review of conditional write + hadoop common and s3a create file options wired up to s3a create file builder, and nowhere else. Change-Id: Ieaafa9ecdfd68306deb41ecf40c416374a310859 --- .../java/org/apache/hadoop/fs/Options.java | 51 ++++++ .../org/apache/hadoop/fs/s3a/Constants.java | 24 ++- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 21 ++- .../hadoop/fs/s3a/WriteOperationHelper.java | 2 +- .../hadoop/fs/s3a/api/RequestFactory.java | 4 +- .../hadoop/fs/s3a/impl/CreateFileBuilder.java | 173 +++++++++++++----- .../hadoop/fs/s3a/impl/InternalConstants.java | 15 +- .../fs/s3a/impl/RequestFactoryImpl.java | 19 +- .../fs/s3a/ITestS3AClientSideEncryption.java | 2 +- .../hadoop/fs/s3a/ITestS3AMiscOperations.java | 2 +- .../fs/s3a/impl/ITestS3APutIfMatch.java | 64 ++++++- .../fs/s3a/impl/TestRequestFactory.java | 6 +- .../scale/ITestS3ADirectoryPerformance.java | 2 +- 13 files changed, 312 insertions(+), 73 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java index f473e9427ba5d..9e1fdf0650283 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java @@ -710,4 +710,55 @@ private OpenFileOptions() { public static final String FS_OPTION_OPENFILE_EC_POLICY = FS_OPTION_OPENFILE + "ec.policy"; } + + /** + * The standard {@code createFile()} options. + *

+ * If an option is not supported during file creation and it is considered + * part of a commit protocol, then, when supplied in a must() option, + * it MUST be rejected. + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public interface CreateFileOptionKeys { + + /** + * {code createFile()} option to write a file iff there is nothing at the destination. + * This may happen during create() or in the close. + *

+ * Explicitly set {@link #FS_OPTION_CREATE_IN_CLOSE} if you want to force the end of file + * creation. + * + * Value {@value}. + *

+ * This can be set in the builder. + *

+ * It should be exported as a path capability for all stores where + * the feature is available *and* enabled. + */ + String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE = "fs.option.create.conditional.overwrite"; + + /** + * Overwrite a file only if there is an Etag match. This option takes a string. + * Value {@value}. + */ + String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG = + "fs.option.create.conditional.overwrite.etag"; + + /** + * String to define the content filetype. + * Value {@value}. + */ + String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type"; + + /** + * A flag which requires the filesystem to create files/objects in close(), + * rather than create/createFile. + *

+ * Object stores with this behavior should also export it as a path capability. + * + * Value {@value}. + */ + String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close"; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index b047547640e3e..4d79ba86806df 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1502,13 +1502,6 @@ private Constants() { */ public static final String FS_S3A_CREATE_PERFORMANCE = "fs.s3a.create.performance"; - /** - * Flag for commit if none match. - * This can be set in the {code createFile()} builder. - * Value {@value}. - */ - public static final String FS_S3A_CONDITIONAL_FILE_CREATE = "fs.s3a.conditional.file.create"; - /** * Default value for create performance in an S3A FS. * Value {@value}. @@ -1528,6 +1521,23 @@ private Constants() { */ public static final String FS_S3A_PERFORMANCE_FLAGS = "fs.s3a.performance.flags"; + + + /** + * Is the create overwrite feature enabled or not? + * A configuration option and a path status probe. + * Value {@value}. + */ + public static final String FS_S3A_CREATE_OVERWRITE_SUPPORTED = "fs.s3a.create.overwrite.supported"; + + /** + * Create a multipart file, always: {@value}. + *

+ * This is inefficient and will not work on a store which doesn't support that feature, + * so is primarily for testing. + */ + public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart"; + /** * Prefix for adding a header to the object when created. * The actual value must have a "." suffix and then the actual header. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 5efa1ee3a1087..3b5188d00542f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -223,6 +223,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_IN_CLOSE; import static org.apache.hadoop.fs.impl.FlagSet.buildFlagSet; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -2171,9 +2175,9 @@ private FSDataOutputStream innerCreateFile( String destKey = putTracker.getDestKey(); // put options are derived from the option builder. - boolean conditionalCreate = options.isConditionalCreate(); + boolean conditionalCreate = options.isConditionalOverwrite(); final PutObjectOptions putOptions = - new PutObjectOptions(null, options.getHeaders(), false, null); + new PutObjectOptions(null, options.getHeaders(), conditionalCreate, null); validateOutputStreamConfiguration(path, getConf()); @@ -3248,7 +3252,8 @@ private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest) public PutObjectRequest.Builder newPutObjectRequestBuilder(String key, long length, boolean isDirectoryMarker) { - return requestFactory.newPutObjectRequestBuilder(key, null, length, isDirectoryMarker); + return requestFactory.newPutObjectRequestBuilder(key, null, length, isDirectoryMarker, + PutObjectOptions.defaultOptions()); } /** @@ -5416,11 +5421,19 @@ public boolean hasPathCapability(final Path path, final String capability) case STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED: return isMultipartUploadEnabled(); - // create file options + // create file options which are always true + + case FS_OPTION_CREATE_IN_CLOSE: + case FS_OPTION_CREATE_CONTENT_TYPE: case FS_S3A_CREATE_PERFORMANCE: case FS_S3A_CREATE_HEADER: return true; + case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE: + case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG: + // TODO HADOOP-19256. conditional on enablement + return true; + // is the FS configured for create file performance case FS_S3A_CREATE_PERFORMANCE_ENABLED: return performanceFlags.enabled(PerformanceFlagEnum.Create); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 364f780863a01..df0e58635f9d7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -245,7 +245,7 @@ public PutObjectRequest createPutObjectRequest(String destKey, activateAuditSpan(); return getRequestFactory() - .newPutObjectRequestBuilder(destKey, options, length, false) + .newPutObjectRequestBuilder(destKey, options, length, false, options) .build(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index 294f0a08f9257..c74866c18c467 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -118,12 +118,14 @@ CopyObjectRequest.Builder newCopyObjectRequestBuilder(String srcKey, * @param options options for the request * @param length length of object to be uploaded * @param isDirectoryMarker true if object to be uploaded is a directory marker + * @param putOptions * @return the request builder */ PutObjectRequest.Builder newPutObjectRequestBuilder(String key, PutObjectOptions options, long length, - boolean isDirectoryMarker); + boolean isDirectoryMarker, + PutObjectOptions putOptions); /** * Create a {@link PutObjectRequest} request for creating diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java index fecc2459c8f54..6e0a82ef8433d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java @@ -21,9 +21,9 @@ import java.io.IOException; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.apache.hadoop.conf.Configuration; @@ -33,10 +33,21 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; -import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.util.Progressable; +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CONTENT_TYPE; +import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.ExtractedCreateFileSwitch.ConditionalOverwrite; +import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.ExtractedCreateFileSwitch.ConditionalOverwriteEtag; +import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.ExtractedCreateFileSwitch.CreateMultipart; +import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.ExtractedCreateFileSwitch.Performance; +import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.ExtractedCreateFileSwitch.Recursive; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CREATE_FILE_KEYS; /** @@ -63,19 +74,25 @@ public class CreateFileBuilder extends * Classic create file option set: overwriting. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_OVERWRITE = - new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, false, null); + new CreateFileOptions(CREATE_OVERWRITE_FLAGS, + EnumSet.of(Recursive), + null, null); /** * Classic create file option set: no overwrite. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE = - new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, false, null); + new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, + EnumSet.of(Recursive), + null, null); /** * Performance create options. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE = - new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, false, null); + new CreateFileOptions(CREATE_OVERWRITE_FLAGS, + EnumSet.of(Performance,Recursive), + null, null); /** * Callback interface. @@ -109,26 +126,36 @@ public FSDataOutputStream build() throws IOException { final Configuration options = getOptions(); final Map headers = new HashMap<>(); final Set mandatoryKeys = getMandatoryKeys(); - final Set keysToValidate = new HashSet<>(); + final EnumSet createFileSwitches = EnumSet.noneOf( + ExtractedCreateFileSwitch.class); // pick up all headers from the mandatory list and strip them before // validating the keys + + // merge the config lists + String headerPrefix = FS_S3A_CREATE_HEADER + "."; final int prefixLen = headerPrefix.length(); - mandatoryKeys.stream().forEach(key -> { - if (key.startsWith(headerPrefix) && key.length() > prefixLen) { - headers.put(key.substring(prefixLen), options.get(key)); - } else { - keysToValidate.add(key); - } - }); + + final Set keysToValidate = mandatoryKeys.stream() + .filter(key -> !key.startsWith(headerPrefix)) + .collect(Collectors.toSet()); rejectUnknownMandatoryKeys(keysToValidate, CREATE_FILE_KEYS, "for " + path); - // and add any optional headers - getOptionalKeys().stream() - .filter(key -> key.startsWith(headerPrefix) && key.length() > prefixLen) - .forEach(key -> headers.put(key.substring(prefixLen), options.get(key))); + // look for headers + + for (Map.Entry option : options) { + String key = option.getKey(); + if (key.startsWith(headerPrefix) && key.length() > prefixLen) { + headers.put(key.substring(prefixLen), option.getValue()); + } + } + + // and add the mimetype + if (options.get(FS_OPTION_CREATE_CONTENT_TYPE, null) != null) { + headers.put(CONTENT_TYPE, options.get(FS_OPTION_CREATE_CONTENT_TYPE, null)); + } EnumSet flags = getFlags(); if (flags.contains(CreateFlag.APPEND)) { @@ -141,14 +168,32 @@ public FSDataOutputStream build() throws IOException { "Must specify either create or overwrite"); } - final boolean performance = - options.getBoolean(Constants.FS_S3A_CREATE_PERFORMANCE, false); - final boolean conditionalCreate = - options.getBoolean(Constants.FS_S3A_CONDITIONAL_FILE_CREATE, false); + // build the other switches + if (isRecursive()) { + createFileSwitches.add(Recursive); + } + if (Performance.isEnabled(options)) { + createFileSwitches.add(Performance); + } + if (CreateMultipart.isEnabled(options)) { + createFileSwitches.add(CreateMultipart); + } + if (ConditionalOverwrite.isEnabled(options)) { + createFileSwitches.add(ConditionalOverwrite); + } + // etag is a string so is checked for then extracted. + final String etag = options.get(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, null); + if (etag != null) { + createFileSwitches.add(ConditionalOverwriteEtag); + } + return callbacks.createFileFromBuilder( path, getProgress(), - new CreateFileOptions(flags, isRecursive(), performance, conditionalCreate, headers)); + new CreateFileOptions(flags, + createFileSwitches, + etag, + headers)); } /** @@ -209,19 +254,14 @@ public static final class CreateFileOptions { private final EnumSet flags; /** - * create parent dirs? + * Create File switches. */ - private final boolean recursive; + private final EnumSet createFileSwitches; /** - * performance flag. + * Etag. Only used if the create file switches enable it. */ - private final boolean performance; - - /** - * conditional flag. - */ - private final boolean conditionalCreate; + private final String etag; /** * Headers; may be null. @@ -230,21 +270,19 @@ public static final class CreateFileOptions { /** * @param flags creation flags - * @param recursive create parent dirs? - * @param performance performance flag - * @param conditionalCreate conditional flag * @param headers nullable header map. */ public CreateFileOptions( final EnumSet flags, - final boolean recursive, - final boolean performance, - final boolean conditionalCreate, + final EnumSet createFileSwitches, + final String etag, final Map headers) { - this.flags = flags; - this.recursive = recursive; - this.performance = performance; - this.conditionalCreate = conditionalCreate; + this.flags = requireNonNull(flags); + this.createFileSwitches = requireNonNull(createFileSwitches); + if (createFileSwitches().contains(ConditionalOverwriteEtag)) { + requireNonNull(etag); + } + this.etag = etag; this.headers = headers; } @@ -252,9 +290,7 @@ public CreateFileOptions( public String toString() { return "CreateFileOptions{" + "flags=" + flags + - ", recursive=" + recursive + - ", performance=" + performance + - ", conditionalCreate=" + conditionalCreate + + ", switches" + createFileSwitches + ", headers=" + headers + '}'; } @@ -264,20 +300,61 @@ public EnumSet getFlags() { } public boolean isRecursive() { - return recursive; + return isSet(Recursive); } public boolean isPerformance() { - return performance; + return isSet(Performance); + } + + public boolean isConditionalOverwrite() { + return isSet(ConditionalOverwrite); + } + + public boolean isConditionalOverwriteEtag() { + return isSet(ConditionalOverwriteEtag); } - public boolean isConditionalCreate() { - return conditionalCreate; + public boolean isSet(ExtractedCreateFileSwitch val) { + return createFileSwitches().contains(val); } public Map getHeaders() { return headers; } + + public String etag() { + return etag; + } + + public EnumSet createFileSwitches() { + return createFileSwitches; + } } + /** + * Create File switches extracted from create options. + */ + public enum ExtractedCreateFileSwitch { + CreateMultipart(FS_S3A_CREATE_MULTIPART), + ConditionalOverwrite(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE), + ConditionalOverwriteEtag(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG), + Performance(FS_S3A_CREATE_PERFORMANCE), + Recursive(""); + + private String key; + + ExtractedCreateFileSwitch(final String key) { + this.key = key; + } + + /** + * does the configuration contain this option as a boolean? + * @param options options to scan + * @return true if this is defined as a boolean + */ + boolean isEnabled(Configuration options) { + return options.getBoolean(key, false); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 6c9db6dfd6a9f..8cf435f7ca603 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -35,12 +35,16 @@ import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE; import static org.apache.hadoop.fs.CommonPathCapabilities.FS_CHECKSUMS; import static org.apache.hadoop.fs.CommonPathCapabilities.FS_MULTIPART_UPLOADER; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_IN_CLOSE; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; -import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2; @@ -261,7 +265,14 @@ private InternalConstants() { */ public static final Set CREATE_FILE_KEYS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CONDITIONAL_FILE_CREATE))); + new HashSet<>(Arrays.asList( + FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, + FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, + FS_OPTION_CREATE_IN_CLOSE, + FS_OPTION_CREATE_CONTENT_TYPE, + FS_S3A_CREATE_PERFORMANCE, + FS_S3A_CREATE_MULTIPART + ))); /** * Dynamic Path capabilities to be evaluated diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index f1de885d9b5fe..2eb0999a461dd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -60,6 +60,7 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; +import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.IF_NONE_MATCH_STAR; @@ -333,13 +334,14 @@ protected void copyEncryptionParameters(HeadObjectResponse srcom, * @param options options for the request, including headers * @param length length of object to be uploaded * @param isDirectoryMarker true if object to be uploaded is a directory marker + * @param putOptions * @return the request builder */ @Override public PutObjectRequest.Builder newPutObjectRequestBuilder(String key, final PutObjectOptions options, long length, - boolean isDirectoryMarker) { + boolean isDirectoryMarker, final PutObjectOptions putOptions) { Preconditions.checkArgument(isNotEmpty(key), "Null/empty key"); @@ -362,6 +364,17 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key, setRequestTimeout(putObjectRequestBuilder, partUploadTimeout); } + if (putOptions.isNoObjectOverwrite()) { + LOG.debug("setting if none-match"); + putObjectRequestBuilder.overrideConfiguration( + override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR)); + } + if (!isEmpty(putOptions.getEtagOverwrite())) { + // TODO: add etag + LOG.warn("etag match not yet supported"); + } + + return prepareRequest(putObjectRequestBuilder); } @@ -544,9 +557,13 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); if (putOptions.isNoObjectOverwrite()) { + LOG.debug("setting if none-match"); requestBuilder.overrideConfiguration( override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR)); + } + if (!isEmpty(putOptions.getEtagOverwrite())) { // TODO: add etag + LOG.warn("etag match not yet supported"); } return prepareRequest(requestBuilder); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java index 508e1a38356ec..c013f40abaa6a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java @@ -332,7 +332,7 @@ public void testSizeOfEncryptedObjectFromHeaderWithV1Compatibility() throws Exce .build(); PutObjectRequest.Builder putObjectRequestBuilder = factory.newPutObjectRequestBuilder(key, - null, SMALL_FILE_SIZE, false); + null, SMALL_FILE_SIZE, false, null); putObjectRequestBuilder.contentLength(Long.parseLong(String.valueOf(SMALL_FILE_SIZE))); putObjectRequestBuilder.metadata(metadata); fs.putObjectDirect(putObjectRequestBuilder.build(), diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java index ecda6fd2acee6..abdf08a75c5b7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java @@ -107,7 +107,7 @@ public void testPutObjectDirect() throws Throwable { .build(); Path path = path("putDirect"); PutObjectRequest.Builder putObjectRequestBuilder = - factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false); + factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false, null); putObjectRequestBuilder.contentLength(-1L); LambdaTestUtils.intercept(IllegalStateException.class, () -> fs.putObjectDirect( diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index db289fc5791f2..77a8a7be2c534 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -34,8 +34,9 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; -import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE; +import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_OVERWRITE_SUPPORTED; import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; @@ -53,6 +54,7 @@ public class ITestS3APutIfMatch extends AbstractS3ACostTest { @Override public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); removeBaseAndBucketOverrides( conf, @@ -69,8 +71,10 @@ public Configuration createConfiguration() { public void setup() throws Exception { super.setup(); Configuration conf = getConfiguration(); - skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE, + skipIfNotEnabled(conf, FS_S3A_CREATE_OVERWRITE_SUPPORTED, "Skipping IfNoneMatch tests"); + + // for large files, skip if filesystem.hasPathCapability() does not support multipart } private static void assertS3ExceptionStatusCode(int code, Exception ex) { @@ -98,7 +102,7 @@ private static void createFileWithIfNoneMatchFlag( byte[] data, String ifMatchTag) throws Exception { FSDataOutputStreamBuilder builder = fs.createFile(path); - builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, "true"); + builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, "true"); builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag); try (FSDataOutputStream stream = builder.create().build()) { @@ -110,20 +114,26 @@ private static void createFileWithIfNoneMatchFlag( @Test public void testPutIfAbsentConflict() throws Throwable { + describe("generate conflict on overwrites"); FileSystem fs = getFileSystem(); Path testFile = methodPath(); fs.mkdirs(testFile.getParent()); byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255); + // create a file over an empty path: all good createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + // attempted overwrite fails RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); + // second attempt also fails RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); + + // TODO: delete file and verify an overwrite works again } @Test @@ -131,6 +141,7 @@ public void testPutIfAbsentLargeFileConflict() throws Throwable { FileSystem fs = getFileSystem(); Path testFile = methodPath(); + // enough bytes for Multipart Upload byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); @@ -143,5 +154,52 @@ public void testPutIfAbsentLargeFileConflict() throws Throwable { RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); + } + + @Test + public void testMultipartFileWithRaceCondition() throws Throwable { + /* + - f1 = createFile() no overwrite, but do not close it + - f2 = create small file, write. close file + - close (f1), expect different exception back. we will need to map this to RemoteFileChangedException + */ + } + + @Test + public void testTwoMultipartFileWithRaceCondition() throws Throwable { +/* + - f1 = createFile() no overwrite, but do not close it + - f2 = create multipart file, write. + - close (f1), expect different exception back. we will need to map this to RemoteFileChangedException + - close (f2) expect a failure? + */ + } + + @Test + public void testOverwriteWithEmptyFile() throws Throwable { + /* + - create a non empty file + -overwrite with zero byte file: expect an error + */ + } + + + @Test + public void testOverwriteEmptyFileWithFile() throws Throwable { + /* + - create an empty file + -overwrite: expect an error + */ + } + + @Test + public void testOverwriteEmptyWithEmptyFile() throws Throwable { + /* + - create an empty file + - overwrite with zero byte file: expect an error + */ + } + + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 6f0c8bf2b6f5d..1e7a0ce8a69ff 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -96,7 +96,7 @@ public void testRequestFactoryWithCannedACL() throws Throwable { String path2 = "path2"; HeadObjectResponse md = HeadObjectResponse.builder().contentLength(128L).build(); - Assertions.assertThat(factory.newPutObjectRequestBuilder(path, null, 128, false) + Assertions.assertThat(factory.newPutObjectRequestBuilder(path, null, 128, false, null) .build() .acl() .toString()) @@ -180,7 +180,7 @@ private void createFactoryObjects(RequestFactory factory) throws IOException { a(factory.newListObjectsV2RequestBuilder(path, "/", 1)); a(factory.newMultipartUploadRequestBuilder(path, null)); a(factory.newPutObjectRequestBuilder(path, - PutObjectOptions.defaultOptions(), -1, true)); + PutObjectOptions.defaultOptions(), -1, true, null)); } /** @@ -264,7 +264,7 @@ public void testUploadTimeouts() throws Throwable { // A simple PUT final PutObjectRequest put = factory.newPutObjectRequestBuilder(path, - PutObjectOptions.defaultOptions(), 1024, false).build(); + PutObjectOptions.defaultOptions(), 1024, false, null).build(); assertApiTimeouts(partDuration, put); // multipart part diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java index 8addbbe304959..743312ed706fc 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java @@ -250,7 +250,7 @@ public void testMultiPagesListingPerformanceAndCorrectness() originalListOfFiles.add(file.toString()); PutObjectRequest.Builder putObjectRequestBuilder = requestFactory .newPutObjectRequestBuilder(fs.pathToKey(file), - null, 0, false); + null, 0, false, null); futures.add(submit(executorService, () -> writeOperationHelper.putObject(putObjectRequestBuilder.build(), PutObjectOptions.defaultOptions(), From 26811e69a3212941067479a45daec69fc9ccf85a Mon Sep 17 00:00:00 2001 From: Saikat Roy Date: Fri, 7 Feb 2025 17:05:38 +0530 Subject: [PATCH 12/12] HADOOP-19256: add tests --- .../fs/s3a/impl/ITestS3APutIfMatch.java | 195 +++++++++++------- 1 file changed, 126 insertions(+), 69 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index 77a8a7be2c534..8376206db80c1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -27,30 +27,35 @@ import org.apache.hadoop.fs.s3a.RemoteFileChangedException; import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.junit.Assume; import org.junit.Test; import software.amazon.awssdk.services.s3.model.S3Exception; -import java.io.IOException; - import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_OVERWRITE_SUPPORTED; +import static org.apache.hadoop.fs.s3a.Constants.IF_NONE_MATCH_STAR; import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; -import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; -import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; +import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB; import static org.apache.hadoop.test.LambdaTestUtils.intercept; public class ITestS3APutIfMatch extends AbstractS3ACostTest { + private static final int UPDATED_MULTIPART_THRESHOLD = 100 * _1KB; + + private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 255); + private static final byte[] MULTIPART_FILE_BYTES = dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a'); + @Override public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); @@ -62,8 +67,8 @@ public Configuration createConfiguration() { UPLOAD_PART_COUNT_LIMIT, MIN_MULTIPART_THRESHOLD); conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); - conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); - conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); + conf.setLong(MIN_MULTIPART_THRESHOLD, UPDATED_MULTIPART_THRESHOLD); + conf.setInt(MULTIPART_SIZE, UPDATED_MULTIPART_THRESHOLD); return conf; } @@ -73,8 +78,6 @@ public void setup() throws Exception { Configuration conf = getConfiguration(); skipIfNotEnabled(conf, FS_S3A_CREATE_OVERWRITE_SUPPORTED, "Skipping IfNoneMatch tests"); - - // for large files, skip if filesystem.hasPathCapability() does not support multipart } private static void assertS3ExceptionStatusCode(int code, Exception ex) { @@ -90,50 +93,60 @@ protected String getBlockOutputBufferName() { } /** - * Create a file using the PutIfMatch feature from S3 + * Create a file using the If-None-Match feature from S3 * @param fs filesystem * @param path path to write * @param data source dataset. Can be null - * @throws IOException on any problem + * @throws Exception on any problem */ private static void createFileWithIfNoneMatchFlag( FileSystem fs, Path path, - byte[] data, - String ifMatchTag) throws Exception { + byte[] data) throws Exception { + FSDataOutputStream stream = getStreamWithIfNoneMatchFlag(fs, path); + if (data != null && data.length > 0) { + stream.write(data); + } + stream.close(); + } + + /** + * Creates an {@link FSDataOutputStream} for writing a file with an If-None-Match + * @param fs filesystem + * @param path path to write + */ + private static FSDataOutputStream getStreamWithIfNoneMatchFlag( + FileSystem fs, + Path path) throws Exception { FSDataOutputStreamBuilder builder = fs.createFile(path); builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, "true"); - builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, ifMatchTag); - - try (FSDataOutputStream stream = builder.create().build()) { - if (data != null && data.length > 0) { - stream.write(data); - } - } + builder.opt(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, IF_NONE_MATCH_STAR); + return builder.create().build(); } @Test public void testPutIfAbsentConflict() throws Throwable { - describe("generate conflict on overwrites"); + describe("generate conflict on overwrites"); FileSystem fs = getFileSystem(); Path testFile = methodPath(); fs.mkdirs(testFile.getParent()); - byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255); // create a file over an empty path: all good - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + createFileWithIfNoneMatchFlag(fs, testFile, SMALL_FILE_BYTES); // attempted overwrite fails RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, - () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + () -> createFileWithIfNoneMatchFlag(fs, testFile, SMALL_FILE_BYTES)); assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); // second attempt also fails RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, - () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + () -> createFileWithIfNoneMatchFlag(fs, testFile, SMALL_FILE_BYTES)); assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); - // TODO: delete file and verify an overwrite works again + // Delete file and verify an overwrite works again + fs.delete(testFile, false); + createFileWithIfNoneMatchFlag(fs, testFile, SMALL_FILE_BYTES); } @Test @@ -141,65 +154,109 @@ public void testPutIfAbsentLargeFileConflict() throws Throwable { FileSystem fs = getFileSystem(); Path testFile = methodPath(); + // Skip if multipart upload not supported + Assume.assumeTrue("Skipping as multipart upload not supported", + fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED)); - // enough bytes for Multipart Upload - byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); - - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + createFileWithIfNoneMatchFlag(fs, testFile, MULTIPART_FILE_BYTES); RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, - () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + () -> createFileWithIfNoneMatchFlag(fs, testFile, MULTIPART_FILE_BYTES)); assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, - () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + () -> createFileWithIfNoneMatchFlag(fs, testFile, MULTIPART_FILE_BYTES)); assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); + } + + @Test + public void testMultipartFileWithRaceCondition() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + // Skip test if multipart uploads are not supported + Assume.assumeTrue("Skipping test as multipart uploads are not supported", + fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED)); + // Create a file with multipart upload but do not close the stream + FSDataOutputStream stream = getStreamWithIfNoneMatchFlag(fs, testFile); + stream.write(MULTIPART_FILE_BYTES); + + // create and close another small file in parallel + createFileWithIfNoneMatchFlag(fs, testFile, SMALL_FILE_BYTES); + + // Closing the first stream should throw RemoteFileChangedException + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); } - @Test - public void testMultipartFileWithRaceCondition() throws Throwable { - /* - - f1 = createFile() no overwrite, but do not close it - - f2 = create small file, write. close file - - close (f1), expect different exception back. we will need to map this to RemoteFileChangedException - */ - } + @Test + public void testTwoMultipartFileWithRaceCondition() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); - @Test - public void testTwoMultipartFileWithRaceCondition() throws Throwable { -/* - - f1 = createFile() no overwrite, but do not close it - - f2 = create multipart file, write. - - close (f1), expect different exception back. we will need to map this to RemoteFileChangedException - - close (f2) expect a failure? - */ - } + // Skip test if multipart uploads are not supported + Assume.assumeTrue("Skipping test as multipart uploads are not supported", + fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED)); - @Test - public void testOverwriteWithEmptyFile() throws Throwable { - /* - - create a non empty file - -overwrite with zero byte file: expect an error - */ - } + // Create a file with multipart upload but do not close the stream + FSDataOutputStream stream = getStreamWithIfNoneMatchFlag(fs, testFile); + stream.write(MULTIPART_FILE_BYTES); + // create and close another multipart file in parallel + createFileWithIfNoneMatchFlag(fs, testFile, MULTIPART_FILE_BYTES); - @Test - public void testOverwriteEmptyFileWithFile() throws Throwable { - /* - - create an empty file - -overwrite: expect an error - */ - } + // Closing the first stream should throw RemoteFileChangedException + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } - @Test - public void testOverwriteEmptyWithEmptyFile() throws Throwable { - /* - - create an empty file - - overwrite with zero byte file: expect an error - */ - } + @Test + public void testOverwriteWithEmptyFile() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + // create a non-empty file + createFileWithIfNoneMatchFlag(fs, testFile, SMALL_FILE_BYTES); + // overwrite with zero-byte file (no write) + FSDataOutputStream stream = getStreamWithIfNoneMatchFlag(fs, testFile); + + // close the stream, should throw RemoteFileChangedException + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } + + @Test + public void testOverwriteEmptyFileWithFile() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // create an empty file (no write) + FSDataOutputStream stream = getStreamWithIfNoneMatchFlag(fs, testFile); + stream.close(); + + // overwrite with non-empty file, should throw RemoteFileChangedException + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, + () -> createFileWithIfNoneMatchFlag(fs, testFile, SMALL_FILE_BYTES)); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } + + @Test + public void testOverwriteEmptyWithEmptyFile() throws Throwable { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + fs.mkdirs(testFile.getParent()); + + // create an empty file (no write) + FSDataOutputStream stream1 = getStreamWithIfNoneMatchFlag(fs, testFile); + stream1.close(); + + // overwrite with another empty file, should throw RemoteFileChangedException + FSDataOutputStream stream2 = getStreamWithIfNoneMatchFlag(fs, testFile); + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close); + assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + } }