Skip to content

Commit

Permalink
HADOOP-19348. Add initial support for Analytics Accelerator Library f…
Browse files Browse the repository at this point in the history
…or Amazon S3 (#7192)
  • Loading branch information
fuatbasik authored and ahmarsuhail committed Jan 24, 2025
1 parent 053afb7 commit e18d0a4
Show file tree
Hide file tree
Showing 33 changed files with 620 additions and 32 deletions.
11 changes: 11 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,17 @@
<artifactId>amazon-s3-encryption-client-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
<artifactId>analyticsaccelerator-s3</artifactId>
<version>0.0.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.29.10</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1780,4 +1780,38 @@ private Constants() {
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";


/**
 * Prefix under which all Analytics Accelerator Library options are configured.
 * Value: {@value}.
 */
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
    "fs.s3a.analytics.accelerator";

/**
 * Config to enable Analytics Accelerator Library for Amazon S3.
 * https://github.com/awslabs/analytics-accelerator-s3
 * Value: {@value}.
 */
public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
    ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";

/**
 * Config to enable usage of crt client with Analytics Accelerator Library.
 * It is by default true, see {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT}.
 * Derived from {@link #ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX} so that the
 * key cannot drift from the other accelerator options.
 * Value: {@value}.
 */
public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED =
    ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".crt.client";

/**
 * Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY}.
 * Value: {@value}.
 */
public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;

/**
 * Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED}.
 * Value: {@value}.
 */
public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
Expand Down Expand Up @@ -86,6 +88,11 @@
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;

import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -313,6 +320,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private S3Client s3Client;

/**
* Async S3 client handed to the Analytics Accelerator Library.
* Only assigned when the library is enabled with
* {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}: it is then either a
* CRT-based client built during initialization or the async client
* obtained from the store, depending on
* {@link Constants#ANALYTICS_ACCELERATOR_CRT_ENABLED}.
*/
private S3AsyncClient s3AsyncClient;

// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
Expand Down Expand Up @@ -340,6 +354,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// If true, the prefetching input stream is used for reads.
private boolean prefetchEnabled;

// If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
private boolean analyticsAcceleratorEnabled;

// Only consulted when analyticsAcceleratorEnabled is true.
// If true, the analytics accelerator reads through a dedicated S3 CRT client;
// if false, it uses the async client obtained from the store.
private boolean analyticsAcceleratorCRTEnabled;

// Size in bytes of a single prefetch block.
private int prefetchBlockSize;

Expand Down Expand Up @@ -515,6 +534,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean s3AccessGrantsEnabled;

/**
* Factory to create S3SeekableInputStream instances; only set when
* {@link #analyticsAcceleratorEnabled} is true.
*/
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -670,8 +694,21 @@ public void initialize(URI name, Configuration originalConf)
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
this.prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);

this.analyticsAcceleratorEnabled =
conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
this.analyticsAcceleratorCRTEnabled =
conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);

this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
DEFAULT_MULTIPART_UPLOAD_ENABLED);

if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
// Temp change: Analytics Accelerator with S3AsyncClient do not support Multi-part upload.
this.isMultipartUploadEnabled = false;
}

// multipart copy and upload are the same; this just makes it explicit
this.isMultipartCopyEnabled = isMultipartUploadEnabled;

Expand Down Expand Up @@ -794,6 +831,27 @@ public void initialize(URI name, Configuration originalConf)
// directly through the client manager.
// this is to aid mocking.
s3Client = store.getOrCreateS3Client();

if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
if(this.analyticsAcceleratorCRTEnabled) {
LOG.info("Using S3 CRT client for analytics accelerator S3");
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
} else {
LOG.info("Using S3 async client for analytics accelerator S3");
this.s3AsyncClient = store.getOrCreateAsyncClient();
}

ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.s3SeekableInputStreamFactory =
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(this.s3AsyncClient),
seekableInputStreamConfiguration);
}

// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
Expand Down Expand Up @@ -1861,6 +1919,8 @@ private FSDataInputStream executeOpen(
final Path path,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {


// create the input stream statistics before opening
// the file so that the time to prepare to open the file is included.
S3AInputStreamStatistics inputStreamStats =
Expand All @@ -1877,6 +1937,14 @@ private FSDataInputStream executeOpen(
fileInformation.applyOptions(readContext);
LOG.debug("Opening '{}'", readContext);

if (this.analyticsAcceleratorEnabled) {
return new FSDataInputStream(
new S3ASeekableStream(
this.bucket,
pathToKey(path),
s3SeekableInputStreamFactory));
}

if (this.prefetchEnabled) {
Configuration configuration = getConf();
initLocalDirAllocatorIfNotInitialized(configuration);
Expand Down Expand Up @@ -4354,9 +4422,11 @@ public void close() throws IOException {
protected synchronized void stopAllServices() {
try {
trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
closeAutocloseables(LOG, store);
closeAutocloseables(LOG, store, s3SeekableInputStreamFactory);
store = null;
s3Client = null;
s3AsyncClient = null;
s3SeekableInputStreamFactory = null;

// At this point the S3A client is shut down,
// now the executor pools are closed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSInputStream;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

/**
 * Input stream which delegates all reads and seeks to an
 * {@link S3SeekableInputStream} created by the Analytics Accelerator
 * Library for Amazon S3.
 *
 * <p>There is no attempt to recover from a failed read: the stream is
 * closed and the original exception is rethrown to the caller.
 *
 * <p>Only {@link #getPos()} and {@link #close()} are synchronized; the
 * read and seek operations are not.
 */
public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {

  public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);

  /** Wrapped library stream; set to null once this stream is closed. */
  private S3SeekableInputStream inputStream;

  /** Position reported by {@link #getPos()} once the stream is closed. */
  private long lastReadCurrentPos = 0;

  /** Object key; used in log and exception messages. */
  private final String key;

  /** Set once {@link #close()} has been invoked. */
  private volatile boolean closed;

  /**
   * Create a stream over the object identified by bucket and key.
   *
   * @param bucket bucket containing the object.
   * @param key key of the object within the bucket.
   * @param s3SeekableInputStreamFactory factory used to create the wrapped stream.
   */
  public S3ASeekableStream(String bucket, String key,
      S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
    this.key = key;
  }

  /**
   * Indicates whether the given {@code capability} is supported by this stream.
   *
   * @param capability the capability to check.
   * @return true if the given {@code capability} is supported by this stream, false otherwise.
   */
  @Override
  public boolean hasCapability(String capability) {
    // no stream capabilities are declared by this stream
    return false;
  }

  /**
   * Read a single byte from the wrapped stream.
   *
   * @return the value returned by the wrapped stream's {@code read()}.
   * @throws IOException if the stream is closed or the read fails;
   * a failed read also closes this stream.
   */
  @Override
  public int read() throws IOException {
    throwIfClosed();
    int bytesRead;
    try {
      bytesRead = inputStream.read();
    } catch (IOException ioe) {
      onReadFailure(ioe);
      throw ioe;
    }
    return bytesRead;
  }

  /**
   * Seek the wrapped stream to the given position.
   *
   * @param pos target position; must not be negative.
   * @throws EOFException if {@code pos} is negative.
   * @throws IOException if the stream is closed or the seek fails.
   */
  @Override
  public void seek(long pos) throws IOException {
    throwIfClosed();
    if (pos < 0) {
      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
          + " " + pos);
    }
    inputStream.seek(pos);
  }

  /**
   * Get the current position.
   *
   * @return the position of the wrapped stream while open; after close,
   * the last position recorded before the stream was closed.
   */
  @Override
  public synchronized long getPos() {
    if (!closed) {
      lastReadCurrentPos = inputStream.getPos();
    }
    return lastReadCurrentPos;
  }

  /**
   * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
   * reached. Leaves the position of the stream unaltered.
   *
   * @param buf buffer to read data into
   * @param off start position in buffer at which data is written
   * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
   * @return the total number of bytes read into the buffer
   * @throws IOException if the stream is closed or an I/O error occurs;
   * a failed read also closes this stream.
   */
  public int readTail(byte[] buf, int off, int len) throws IOException {
    throwIfClosed();
    int bytesRead;
    try {
      bytesRead = inputStream.readTail(buf, off, len);
    } catch (IOException ioe) {
      onReadFailure(ioe);
      throw ioe;
    }
    return bytesRead;
  }

  /**
   * Read bytes from the wrapped stream into a buffer.
   *
   * @param buf buffer to read data into.
   * @param off start position in buffer at which data is written.
   * @param len maximum number of bytes to read.
   * @return the value returned by the wrapped stream's {@code read(buf, off, len)}.
   * @throws IOException if the stream is closed or the read fails;
   * a failed read also closes this stream.
   */
  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    throwIfClosed();
    int bytesRead;
    try {
      bytesRead = inputStream.read(buf, off, len);
    } catch (IOException ioe) {
      onReadFailure(ioe);
      throw ioe;
    }
    return bytesRead;
  }

  /**
   * Not supported: this stream never switches to a different source.
   *
   * @param l target position; ignored.
   * @return always false.
   */
  @Override
  public boolean seekToNewSource(long l) throws IOException {
    return false;
  }

  /**
   * Delegates to the superclass after checking that the stream is open.
   *
   * @return the value of {@code super.available()}.
   * @throws IOException if the stream is closed.
   */
  @Override
  public int available() throws IOException {
    throwIfClosed();
    return super.available();
  }

  /**
   * Close the wrapped stream. Calls after the first are no-ops.
   *
   * @throws IOException if closing the wrapped stream fails; this stream
   * is still marked as closed in that case.
   */
  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return;
    }
    closed = true;
    // Detach the reference up front so it is released even if close() fails.
    final S3SeekableInputStream stream = inputStream;
    inputStream = null;
    // Record the final position so that getPos() does not report a stale
    // value (or 0) when it was never called while the stream was open.
    lastReadCurrentPos = stream.getPos();
    try {
      stream.close();
      super.close();
    } catch (IOException ioe) {
      // pass the exception to the logger so the failure cause is not lost
      LOG.debug("Failure closing stream {}: ", key, ioe);
      throw ioe;
    }
  }

  /**
   * Close the stream on read failure.
   * No attempt to recover from failure.
   * A failure to close is attached to {@code ioe} as a suppressed exception
   * rather than thrown, so that callers rethrow the original read failure.
   *
   * @param ioe exception caught.
   */
  @Retries.OnceTranslated
  private void onReadFailure(IOException ioe) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Got exception while trying to read from stream {}, " +
          "not trying to recover:",
          key, ioe);
    } else {
      LOG.info("Got exception while trying to read from stream {}, " +
          "not trying to recover:",
          key, ioe);
    }
    try {
      this.close();
    } catch (IOException closeFailure) {
      // addSuppressed rejects self-suppression, so guard against it
      if (closeFailure != ioe) {
        ioe.addSuppressed(closeFailure);
      }
    }
  }

  /**
   * Verify that the stream is open.
   *
   * @throws IOException if the stream has been closed.
   */
  protected void throwIfClosed() throws IOException {
    if (closed) {
      throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
    }
  }
}
Loading

0 comments on commit e18d0a4

Please sign in to comment.