HADOOP-19354. S3A: S3AInputStream to be created by factory under S3AStore (apache#7214)

S3 InputStreams are created by a factory class, with the
choice of factory dynamically chosen by the option

  fs.s3a.input.stream.type

Supported values: classic, prefetching, analytics, custom

Contributed by Steve Loughran
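
As a quick illustration (a sketch, not part of the commit), the stream type can be selected through the standard Hadoop Configuration API:

  import org.apache.hadoop.conf.Configuration;

  // Sketch: select the prefetching stream implementation;
  // valid values per this commit: classic, prefetching, analytics, custom.
  Configuration conf = new Configuration();
  conf.set("fs.s3a.input.stream.type", "prefetching");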
steveloughran authored Feb 20, 2025
1 parent ec6c08b commit 5067082
Showing 66 changed files with 3,652 additions and 807 deletions.
org/apache/hadoop/util/ConfigurationHelper.java
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.hadoop.classification.InterfaceAudience;
@@ -123,4 +124,39 @@ public static <E extends Enum<E>> Map<String, E> mapEnumNamesToValues(
return mapping;
}

/**
* Look up an enum from the configuration option and map it to
* a value in the supplied enum class.
* If no value is supplied or there is no match for the supplied value,
* the fallback function is invoked, passing in the trimmed and possibly
* empty string of the value.
* Extends {@link Configuration#getEnum(String, Enum)}
* by adding case independence and a lambda expression for fallback,
* rather than a default value.
* @param conf configuration
* @param name property name
* @param enumClass enum class to resolve the value against
* @param fallback fallback supplier
* @param <E> enumeration type.
* @return an enum value
* @throws IllegalArgumentException If mapping is illegal for the type provided
*/
public static <E extends Enum<E>> E resolveEnum(
Configuration conf,
String name,
Class<E> enumClass,
Function<String, E> fallback) {

final String val = conf.getTrimmed(name, "");

// build a map of lower case string to enum values.
final Map<String, E> mapping = mapEnumNamesToValues("", enumClass);
final E mapped = mapping.get(val.toLowerCase(Locale.ROOT));
if (mapped != null) {
return mapped;
} else {
// fallback handles it
return fallback.apply(val);
}
}
}
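
As a usage sketch (reusing the SimpleEnum type declared in the test class below; the key name is arbitrary), a caller might write:

  Configuration conf = new Configuration();
  conf.set("key", " B ");
  // Lookup is trimmed and case-insensitive, so " B " resolves to SimpleEnum.b;
  // the lambda supplies the result for empty or unmatched values.
  SimpleEnum result = resolveEnum(conf, "key", SimpleEnum.class,
      (value) -> SimpleEnum.a);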
org/apache/hadoop/util/TestConfigurationHelper.java
@@ -31,6 +31,7 @@
import static org.apache.hadoop.util.ConfigurationHelper.ERROR_MULTIPLE_ELEMENTS_MATCHING_TO_LOWER_CASE_VALUE;
import static org.apache.hadoop.util.ConfigurationHelper.mapEnumNamesToValues;
import static org.apache.hadoop.util.ConfigurationHelper.parseEnumSet;
import static org.apache.hadoop.util.ConfigurationHelper.resolveEnum;

/**
* Test for {@link ConfigurationHelper}.
@@ -43,6 +44,12 @@ public class TestConfigurationHelper extends AbstractHadoopTestBase {
*/
private enum SimpleEnum { a, b, c, i }

/**
* Upper case version of SimpleEnum.
* "i" is included for case tests, as it is special in turkey.
*/
private enum UppercaseEnum { A, B, C, I }


/**
* Special case: an enum with no values.
@@ -171,4 +178,65 @@ public void testDuplicateValues() {
.containsExactly(SimpleEnum.a, SimpleEnum.b, SimpleEnum.c);
}

@Test
public void testResolveEnumGood() throws Throwable {
assertEnumResolution("c", SimpleEnum.c);
}

@Test
public void testResolveEnumTrimmed() throws Throwable {
// strings are trimmed at each end
assertEnumResolution("\n i \n ", SimpleEnum.i);
}

@Test
public void testResolveEnumCaseConversion() throws Throwable {
assertEnumResolution("C", SimpleEnum.c);
}

@Test
public void testResolveEnumNoMatch() throws Throwable {
assertEnumResolution("other", null);
}

@Test
public void testResolveEnumEmpty() throws Throwable {
assertEnumResolution("", null);
}

@Test
public void testResolveEnumUpperCaseConversion() throws Throwable {
assertUpperEnumResolution("C", UppercaseEnum.C);
}

@Test
public void testResolveLowerToUpperCaseConversion() throws Throwable {
assertUpperEnumResolution("i", UppercaseEnum.I);
}

/**
* Assert that a string value in a configuration resolves to the expected
* value.
* @param value value to set
* @param expected expected outcome, set to null for no resolution.
*/
private void assertEnumResolution(final String value, final SimpleEnum expected) {
Assertions.assertThat(resolveEnum(confWithKey(value),
"key", SimpleEnum.class, (v) -> null))
.describedAs("Resolution of %s", value)
.isEqualTo(expected);
}

/**
* Equivalent of assertEnumResolution() for UppercaseEnum.
* @param value value to set
* @param expected expected outcome, set to null for no resolution.
*/
private void assertUpperEnumResolution(final String value, UppercaseEnum expected) {
Assertions.assertThat(resolveEnum(confWithKey(value),
"key", UppercaseEnum.class, (v) -> null))
.describedAs("Resolution of %s", value)
.isEqualTo(expected);
}

}
2 changes: 1 addition & 1 deletion hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -30,7 +30,7 @@
</Match>
<!-- we are using completable futures, so ignore the Future which submit() returns -->
<Match>
<Class name="org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl" />
<Class name="org.apache.hadoop.fs.s3a.impl.InputStreamCallbacksImpl" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match>

36 changes: 24 additions & 12 deletions hadoop-tools/hadoop-aws/pom.xml
@@ -48,9 +48,8 @@
<!-- Set a longer timeout for integration test (in milliseconds) -->
<test.integration.timeout>200000</test.integration.timeout>


<!-- Is prefetch enabled? -->
<fs.s3a.prefetch.enabled>unset</fs.s3a.prefetch.enabled>
<!-- stream type to use in tests; passed down in fs.s3a.input.stream.type -->
<stream>classic</stream>
<!-- Job ID; allows for parallel jobs on same bucket -->
<!-- job.id is used to build the path for tests; default is 00.-->
<job.id>00</job.id>
@@ -122,8 +121,8 @@
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
<!-- Prefetch -->
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
<!-- Stream Type -->
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
</systemPropertyVariables>
</configuration>
</plugin>
@@ -161,8 +160,8 @@
<fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
<test.default.timeout>${test.integration.timeout}</test.default.timeout>
<!-- Prefetch -->
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
<!-- Stream Type -->
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
<!-- are root tests enabled. Set to false when running parallel jobs on same bucket -->
<fs.s3a.root.tests.enabled>${root.tests.enabled}</fs.s3a.root.tests.enabled>

@@ -212,8 +211,8 @@
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
<!-- Prefetch -->
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
<!-- Stream Type -->
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
<!-- are root tests enabled. Set to false when running parallel jobs on same bucket -->
<fs.s3a.root.tests.enabled>${root.tests.enabled}</fs.s3a.root.tests.enabled>
<test.unique.fork.id>job-${job.id}</test.unique.fork.id>
@@ -273,8 +272,8 @@
<fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
<!-- Prefetch -->
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
<!-- Stream Type -->
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
<test.unique.fork.id>job-${job.id}</test.unique.fork.id>
</systemPropertyVariables>
<forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
@@ -308,7 +307,20 @@
</property>
</activation>
<properties>
<fs.s3a.prefetch.enabled>true</fs.s3a.prefetch.enabled>
<stream>prefetch</stream>
</properties>
</profile>

<!-- Switch to the analytics input stream-->
<profile>
<id>analytics</id>
<activation>
<property>
<name>analytics</name>
</property>
</activation>
<properties>
<stream>analytics</stream>
</properties>
</profile>
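
With these profiles, the stream type under test can be switched from the command line, e.g. mvn verify -Danalytics (assuming standard Maven property activation); the chosen value reaches the tests as fs.s3a.input.stream.type.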

org/apache/hadoop/fs/s3a/Constants.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

import java.time.Duration;
@@ -1580,14 +1581,60 @@
*/
public static final String AWS_AUTH_CLASS_PREFIX = "com.amazonaws.auth";

/**
* Input stream type: {@value}.
*/
public static final String INPUT_STREAM_TYPE = "fs.s3a.input.stream.type";

/**
* The classic input stream: {@value}.
*/
public static final String INPUT_STREAM_TYPE_CLASSIC =
StreamIntegration.CLASSIC;

/**
* The prefetching input stream: {@value}.
*/
public static final String INPUT_STREAM_TYPE_PREFETCH = StreamIntegration.PREFETCH;

/**
* The analytics input stream: {@value}.
*/
public static final String INPUT_STREAM_TYPE_ANALYTICS =
StreamIntegration.ANALYTICS;

/**
* Request the default input stream,
* whatever it is for this release: {@value}.
*/
public static final String INPUT_STREAM_TYPE_DEFAULT = StreamIntegration.DEFAULT;

/**
* The custom input stream type: {@value}.
* If set, the classname is loaded from
* {@link #INPUT_STREAM_CUSTOM_FACTORY}.
* <p>
* This option is primarily for testing as it can
* be used to generate failures.
*/
public static final String INPUT_STREAM_TYPE_CUSTOM =
StreamIntegration.CUSTOM;

/**
* Classname of the factory to instantiate for custom streams: {@value}.
*/
public static final String INPUT_STREAM_CUSTOM_FACTORY = "fs.s3a.input.stream.custom.factory";

/**
* Controls whether the prefetching input stream is enabled.
*/
@Deprecated
public static final String PREFETCH_ENABLED_KEY = "fs.s3a.prefetch.enabled";

/**
* Default option as to whether the prefetching input stream is enabled.
*/
@Deprecated
public static final boolean PREFETCH_ENABLED_DEFAULT = false;

// If the default values are used, each file opened for reading will consume
(Diff truncated; the remaining changed files are not shown.)
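
As a sketch of how the new constants compose (the factory classname below is invented for illustration):

  Configuration conf = new Configuration();
  // Select the custom stream type and name the factory class to load.
  conf.set(INPUT_STREAM_TYPE, INPUT_STREAM_TYPE_CUSTOM);
  conf.set(INPUT_STREAM_CUSTOM_FACTORY,
      "org.example.FailingStreamFactory");  // hypothetical test-only factory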
