Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19354. S3A: S3AInputStream to be created by factory under S3AStore #7214

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -123,4 +124,39 @@ public static <E extends Enum<E>> Map<String, E> mapEnumNamesToValues(
return mapping;
}

/**
* Look up an enum from the configuration option and map it to
* a value in the supplied enum class.
* If no value is supplied or there is no match for the supplied value,
* the fallback function is invoked, passing in the trimmed and possibly
* empty string of the value.
* Extends {link {@link Configuration#getEnum(String, Enum)}}
* by adding case independence and a lambda expression for fallback,
* rather than a default value.
* @param conf configuration
* @param name property name
* @param enumClass classname to resolve
* @param fallback fallback supplier
* @return an enum value
* @param <E> enumeration type.
* @throws IllegalArgumentException If mapping is illegal for the type provided
*/
public static <E extends Enum<E>> E resolveEnum(
Configuration conf,
String name,
Class<E> enumClass,
Function<String, E> fallback) {

final String val = conf.getTrimmed(name, "");

// build a map of lower case string to enum values.
final Map<String, E> mapping = mapEnumNamesToValues("", enumClass);
final E mapped = mapping.get(val.toLowerCase(Locale.ROOT));
if (mapped != null) {
return mapped;
} else {
// fallback handles it
return fallback.apply(val);
}
}
}
2 changes: 1 addition & 1 deletion hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
</Match>
<!-- we are using completable futures, so ignore the Future which submit() returns -->
<Match>
<Class name="org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl" />
<Class name="org.apache.hadoop.fs.s3a.impl.InputStreamCallbacksImpl" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

import java.time.Duration;
Expand Down Expand Up @@ -1580,14 +1581,60 @@ private Constants() {
*/
public static final String AWS_AUTH_CLASS_PREFIX = "com.amazonaws.auth";

/**
* Input stream type: {@value}.
*/
public static final String INPUT_STREAM_TYPE = "fs.s3a.input.stream.type";

/**
* The classic input stream: {@value}.
*/
public static final String INPUT_STREAM_TYPE_CLASSIC =
StreamIntegration.CLASSIC;

/**
* The prefetching input stream: {@value}.
*/
public static final String INPUT_STREAM_TYPE_PREFETCH = StreamIntegration.PREFETCH;

/**
* The analytics input stream: {@value}.
*/
public static final String INPUT_STREAM_TYPE_ANALYTICS =
StreamIntegration.ANALYTICS;

/**
* Request the default input stream,
* whatever it is for this release: {@value}.
*/
public static final String INPUT_STREAM_TYPE_DEFAULT = StreamIntegration.DEFAULT;

/**
* The custom input stream type: {@value}".
* If set, the classname is loaded from
* {@link #INPUT_STREAM_CUSTOM_FACTORY}.
* <p>
* This option is primarily for testing as it can
* be used to generated failures.
*/
public static final String INPUT_STREAM_TYPE_CUSTOM =
StreamIntegration.CUSTOM;

/**
* Classname of the factory to instantiate for custom streams: {@value}.
*/
public static final String INPUT_STREAM_CUSTOM_FACTORY = "fs.s3a.input.stream.custom.factory";

/**
* Controls whether the prefetching input stream is enabled.
*/
@Deprecated
public static final String PREFETCH_ENABLED_KEY = "fs.s3a.prefetch.enabled";

/**
* Default option as to whether the prefetching input stream is enabled.
*/
@Deprecated
public static final boolean PREFETCH_ENABLED_DEFAULT = false;

// If the default values are used, each file opened for reading will consume
Expand Down
Loading