Skip to content

Commit

Permalink
Integrate analytics-accelerator with factory (#7332)
Browse files Browse the repository at this point in the history
  • Loading branch information
rajdchak authored Jan 28, 2025
1 parent 4601be6 commit 71ff480
Show file tree
Hide file tree
Showing 19 changed files with 160 additions and 164 deletions.
5 changes: 0 additions & 5 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -478,11 +478,6 @@
<version>0.0.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.29.10</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1818,30 +1818,4 @@ private Constants() {
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
"fs.s3a.analytics.accelerator";

/**
* Config to enable Analytics Accelerator Library for Amazon S3.
* https://github.com/awslabs/analytics-accelerator-s3
*/
public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";

/**
* Config to enable usage of crt client with Analytics Accelerator Library.
* It is by default true.
*/
public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED =
"fs.s3a.analytics.accelerator.crt.client";

/**
* Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY }
* Value {@value}.
*/
public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;

/**
* Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED }
* Value {@value}.
*/
public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
Expand Down Expand Up @@ -85,11 +84,6 @@
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;

import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -316,13 +310,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private S3Client s3Client;

/**
* CRT-Based S3Client created of analytics accelerator library is enabled
* and managed by the S3AStoreImpl. Analytics accelerator library can be
* enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}
*/
private S3AsyncClient s3AsyncClient;

// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
Expand Down Expand Up @@ -352,8 +339,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
private boolean analyticsAcceleratorEnabled;

private boolean analyticsAcceleratorCRTEnabled;

private int executorCapacity;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
Expand Down Expand Up @@ -522,11 +507,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean s3AccessGrantsEnabled;

/**
* Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true.
*/
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -673,16 +653,12 @@ public void initialize(URI name, Configuration originalConf)
dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
s3ExpressStore);

this.analyticsAcceleratorEnabled =
conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
this.analyticsAcceleratorCRTEnabled =
conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
this.analyticsAcceleratorEnabled = conf.getEnum(INPUT_STREAM_TYPE, InputStreamType.DEFAULT_STREAM_TYPE) == InputStreamType.Analytics;

this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);

if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
if(this.analyticsAcceleratorEnabled) {
// Temp change: Analytics Accelerator with S3AsyncClient do not support Multi-part upload.
this.isMultipartUploadEnabled = false;
}
Expand Down Expand Up @@ -803,27 +779,6 @@ public void initialize(URI name, Configuration originalConf)
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
// now create and initialize the store
store = createS3AStore(clientManager, rateLimitCapacity);

if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
if(this.analyticsAcceleratorCRTEnabled) {
LOG.info("Using S3 CRT client for analytics accelerator S3");
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
} else {
LOG.info("Using S3 async client for analytics accelerator S3");
this.s3AsyncClient = store.getOrCreateAsyncClient();
}

ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.s3SeekableInputStreamFactory =
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(this.s3AsyncClient),
seekableInputStreamConfiguration);
}

// the s3 client is created through the store, rather than
// directly through the client manager.
// this is to aid mocking.
Expand Down Expand Up @@ -1909,7 +1864,7 @@ private FSDataInputStream executeOpen(
final S3AFileStatus fileStatus =
trackDuration(inputStreamStats,
ACTION_FILE_OPENED.getSymbol(), () ->
extractOrFetchSimpleFileStatus(path, fileInformation));
extractOrFetchSimpleFileStatus(path, fileInformation));
S3AReadOpContext readContext = createReadContext(
fileStatus,
auditSpan);
Expand All @@ -1933,15 +1888,15 @@ private FSDataInputStream executeOpen(
true,
inputStreamStats);

// do not validate() the parameters as the store
// do not validate() the parameters as the store
// completes this.
ObjectReadParameters parameters = new ObjectReadParameters()
.withBoundedThreadPool(pool)
.withCallbacks(createInputStreamCallbacks(auditSpan))
.withContext(readContext.build())
.withObjectAttributes(createObjectAttributes(path, fileStatus))
.withStreamStatistics(inputStreamStats);
return new FSDataInputStream(getStore().readObject(parameters));
return new FSDataInputStream(getStore().readObject(parameters));

}

Expand All @@ -1954,6 +1909,7 @@ private ObjectInputStreamCallbacks createInputStreamCallbacks(
return new InputStreamCallbacksImpl(auditSpan, getStore(), fsHandler, unboundedThreadPool);
}


/**
* Callbacks for WriteOperationHelper.
*/
Expand Down Expand Up @@ -4238,7 +4194,7 @@ PutObjectResponse executePut(
throws IOException {
String key = putObjectRequest.key();
ProgressableProgressListener listener =
new ProgressableProgressListener(store, key, progress);
new ProgressableProgressListener(getStore(), key, progress);
UploadInfo info = putObject(putObjectRequest, file, listener);
PutObjectResponse result = getStore().waitForUploadCompletion(key, info).response();
listener.uploadCompleted(info.getFileUpload());
Expand Down Expand Up @@ -4338,8 +4294,6 @@ protected synchronized void stopAllServices() {
closeAutocloseables(LOG, getStore());
store = null;
s3Client = null;
s3AsyncClient = null;
s3SeekableInputStreamFactory = null;

// At this point the S3A client is shut down,
// now the executor pools are closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@
import org.apache.hadoop.util.functional.Tuples;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.extractException;
import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength;
import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
Expand Down Expand Up @@ -947,7 +946,7 @@ public File createTemporaryFileForWriting(String pathStr,
* All stream factory initialization required after {@code Service.init()},
* after all other services have themselves been initialized.
*/
private void finishStreamFactoryInit() {
private void finishStreamFactoryInit() throws Exception {
// must be on be invoked during service initialization
Preconditions.checkState(isInState(STATE.INITED),
"Store is in wrong state: %s", getServiceState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected AbstractObjectInputStreamFactory(final String name) {
* @param factoryCallbacks callbacks needed by the factories.
*/
@Override
public void bind(final StreamFactoryCallbacks factoryCallbacks) {
public void bind(final StreamFactoryCallbacks factoryCallbacks) throws Exception {
// must be on be invoked during service initialization
Preconditions.checkState(isInState(STATE.INITED),
"Input Stream factory %s is in wrong state: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,34 @@
* under the License.
*/

package org.apache.hadoop.fs.s3a;
package org.apache.hadoop.fs.s3a.impl.streams;

import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSInputStream;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {

private S3SeekableInputStream inputStream;
private long lastReadCurrentPos = 0;
private final String key;
private volatile boolean closed;

public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);

public S3ASeekableStream(String bucket, String key,
S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
this.key = key;
public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
super(parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
}

/**
Expand Down Expand Up @@ -139,6 +138,24 @@ public int available() throws IOException {
return super.available();
}

@Override
protected boolean isStreamOpen() {
return !isClosed();
}

protected boolean isClosed() {
return inputStream == null;
}

@Override
protected void abortInFinalizer() {
try {
close();
} catch (IOException ignored) {

}
}

@Override
public synchronized void close() throws IOException {
if(!closed) {
Expand All @@ -148,7 +165,7 @@ public synchronized void close() throws IOException {
inputStream = null;
super.close();
} catch (IOException ioe) {
LOG.debug("Failure closing stream {}: ", key);
LOG.debug("Failure closing stream {}: ", getKey());
throw ioe;
}
}
Expand All @@ -165,19 +182,19 @@ private void onReadFailure(IOException ioe) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Got exception while trying to read from stream {}, " +
"not trying to recover:",
key, ioe);
getKey(), ioe);
} else {
LOG.info("Got exception while trying to read from stream {}, " +
"not trying to recover:",
key, ioe);
getKey(), ioe);
}
this.close();
}


protected void throwIfClosed() throws IOException {
if (closed) {
throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}
}
Loading

0 comments on commit 71ff480

Please sign in to comment.