Skip to content

Commit

Permalink
HADOOP-19354. Support stream factory callbacks
Browse files Browse the repository at this point in the history
- Add callbacks from stream factories to creator.
- Initial operation is to ask for an async client.
- Callbacks and wiring up done in S3AStoreImpl.

Change-Id: I544f05da15e3b57e9a538d337b972e4e07dc8877
  • Loading branch information
steveloughran committed Jan 27, 2025
1 parent 67b14ea commit 537805c
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.functional.Tuples;

Expand Down Expand Up @@ -230,22 +231,28 @@ public class S3AStoreImpl
@Override
protected void serviceInit(final Configuration conf) throws Exception {

// create and register the stream factory, which will
// then follow the service lifecycle
objectInputStreamFactory = createStreamFactory(conf);
addService(objectInputStreamFactory);

// init all child services
// init all child services, including the stream factory
super.serviceInit(conf);

// pass down extra information to the stream factory.
finishStreamFactoryInit();
}



@Override
protected void serviceStart() throws Exception {
super.serviceStart();
initLocalDirAllocator();
}


/**
* Return the store capabilities.
* Return the store path capabilities.
* If the object stream factory is non-null, hands off the
* query to that factory if not handled here.
* @param path path to query the capability of.
Expand All @@ -262,6 +269,7 @@ public boolean hasPathCapability(final Path path, final String capability) {
}
}


/**
* Return the capabilities of input streams created
* through the store.
Expand Down Expand Up @@ -931,6 +939,25 @@ public File createTemporaryFileForWriting(String pathStr,
return File.createTempFile(prefix, null, dir);
}

/*
=============== BEGIN ObjectInputStreamFactory ===============
*/

/**
* All stream factory initialization required after {@code Service.init()},
* after all other services have themselves been initialized.
*/
private void finishStreamFactoryInit() {
// must be on be invoked during service initialization
Preconditions.checkState(isInState(STATE.INITED),
"Store is in wrong state: %s", getServiceState());
Preconditions.checkState(clientManager.isInState(STATE.INITED),
"Client Manager is in wrong state: %s", clientManager.getServiceState());

// finish initialization and pass down callbacks to self
objectInputStreamFactory.bind(new FactoryCallbacks());
}

@Override /* ObjectInputStreamFactory */
public ObjectInputStream readObject(ObjectReadParameters parameters)
throws IOException {
Expand All @@ -942,4 +969,32 @@ public ObjectInputStream readObject(ObjectReadParameters parameters)
public StreamThreadOptions threadRequirements() {
return objectInputStreamFactory.threadRequirements();
}

/**
* This operation is not implemented, as
* is this class which invokes it on the actual factory.
* @param callbacks factory callbacks.
* @throws UnsupportedOperationException always
*/
@Override /* ObjectInputStreamFactory */
public void bind(final StreamFactoryCallbacks callbacks) {
throw new UnsupportedOperationException("Not supported");
}

/**
* Callbacks from {@link ObjectInputStreamFactory} instances.
*/
private class FactoryCallbacks implements StreamFactoryCallbacks {

@Override
public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
// Needs support of the CRT before the requireCRT can be used
LOG.debug("Stream factory requested async client");
return clientManager().getOrCreateAsyncClient();
}
}

/*
=============== END ObjectInputStreamFactory ===============
*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.Preconditions;

import static org.apache.hadoop.util.StringUtils.toLowerCase;

Expand All @@ -34,6 +35,33 @@ protected AbstractObjectInputStreamFactory(final String name) {
super(name);
}

/**
* Callbacks.
*/
private StreamFactoryCallbacks callbacks;

/**
* Bind to the callbacks.
* <p>
* The base class checks service state then stores
* the callback interface.
* @param factoryCallbacks callbacks needed by the factories.
*/
@Override
public void bind(final StreamFactoryCallbacks factoryCallbacks) {
// must be on be invoked during service initialization
Preconditions.checkState(isInState(STATE.INITED),
"Input Stream factory %s is in wrong state: %s",
this, getServiceState());
this.callbacks = factoryCallbacks;
}

/**
* Return base capabilities of all stream factories,
* defined what the base ObjectInputStream class does.
* @param capability string to query the stream support for.
* @return true if implemented
*/
@Override
public boolean hasCapability(final String capability) {
switch (toLowerCase(capability)) {
Expand All @@ -45,4 +73,11 @@ public boolean hasCapability(final String capability) {
}
}

/**
* Get the factory callbacks.
* @return callbacks.
*/
public StreamFactoryCallbacks callbacks() {
return callbacks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.io.IOException;

import software.amazon.awssdk.services.s3.S3AsyncClient;

import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.service.Service;

Expand All @@ -38,6 +40,14 @@
public interface ObjectInputStreamFactory
extends Service, StreamCapabilities {

/**
* Set extra initialization parameters.
* This MUST ONLY be invoked between {@code init()}
* and {@code start()}.
* @param callbacks extra initialization parameters
*/
void bind(StreamFactoryCallbacks callbacks);

/**
* Create a new input stream.
* There is no requirement to actually contact the store; this is generally done
Expand All @@ -55,5 +65,18 @@ ObjectInputStream readObject(ObjectReadParameters parameters)
*/
StreamThreadOptions threadRequirements();

/**
* Callbacks for stream factories.
*/
interface StreamFactoryCallbacks {

/**
* Get the Async S3Client, raising a failure to create as an IOException.
* @param requireCRT is the CRT required.
* @return the Async S3 client
* @throws IOException failure to create the client.
*/
S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,20 @@ public final class StreamIntegration {
*/
public static ObjectInputStreamFactory createStreamFactory(final Configuration conf) {
// choose the default input stream type

// work out the default stream; this includes looking at the
// deprecated prefetch enabled key to see if it is set.
InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;
if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {

// prefetch enabled, warn (once) then change it to be the default.
WARN_PREFETCH_KEY.info("Using {} is deprecated: choose the appropriate stream in {}",
PREFETCH_ENABLED_KEY, INPUT_STREAM_TYPE);
defaultStream = InputStreamType.Prefetch;
}

// retrieve the enum value, returning the configured value or
// the default...then instantiate it.
return conf.getEnum(INPUT_STREAM_TYPE, defaultStream)
.factory()
.apply(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected void serviceInit(final Configuration conf) throws Exception {
long prefetchBlockSizeLong =
longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, 1);
checkState(prefetchBlockSizeLong < Integer.MAX_VALUE,
"S3A prefatch block size exceeds int limit");
"S3A prefetch block size exceeds int limit");
prefetchBlockSize = (int) prefetchBlockSizeLong;
prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
Expand All @@ -87,4 +87,5 @@ public ObjectInputStream readObject(final ObjectReadParameters parameters) throw
public StreamThreadOptions threadRequirements() {
return new StreamThreadOptions(prefetchBlockCount, 0, true, false);
}

}

0 comments on commit 537805c

Please sign in to comment.