Skip to content

Commit

Permalink
Arrow Flight Server bootstrap logic
Browse files Browse the repository at this point in the history
* new plugin for StreamManager implementation
* integration with server module
* support for SslContext in Flight server and client
* ClientManager for creating a pool of flight clients for data nodes
* custom event loop group and thread pool for server and client channel

Signed-off-by: Rishabh Maurya <[email protected]>
  • Loading branch information
rishabhmaurya committed Jan 6, 2025
1 parent c0f7806 commit b38f302
Show file tree
Hide file tree
Showing 105 changed files with 10,100 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce a setting to disable download of full cluster state from remote on term mismatch([#16798](https://github.com/opensearch-project/OpenSearch/pull/16798/))
- Added ability to retrieve value from DocValues in a flat_object filed([#16802](https://github.com/opensearch-project/OpenSearch/pull/16802))
- Introduce framework for auxiliary transports and an experimental gRPC transport plugin ([#16534](https://github.com/opensearch-project/OpenSearch/pull/16534))
- Arrow Flight server bootstrap logic ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public class OpenSearchNode implements TestClusterConfiguration {
private boolean isWorkingDirConfigured = false;
private String httpPort = "0";
private String transportPort = "0";
private String streamPort = "0";
private Path confPathData;
private String keystorePassword = "";
private boolean preserveDataDir = false;
Expand Down Expand Up @@ -1175,6 +1176,8 @@ private void createConfiguration() {
baseConfig.put("node.portsfile", "true");
baseConfig.put("http.port", httpPort);
baseConfig.put("transport.port", transportPort);
baseConfig.put("node.attr.transport.stream.port", streamPort);

// Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space
baseConfig.put("cluster.routing.allocation.disk.watermark.low", "1b");
baseConfig.put("cluster.routing.allocation.disk.watermark.high", "1b");
Expand Down Expand Up @@ -1447,6 +1450,10 @@ void setTransportPort(String transportPort) {
this.transportPort = transportPort;
}

void setStreamPort(String streamPort) {
this.streamPort = streamPort;
}

void setDataPath(Path dataPath) {
this.confPathData = dataPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class RunTask extends DefaultTestClustersTask {
public static final String CUSTOM_SETTINGS_PREFIX = "tests.opensearch.";
private static final int DEFAULT_HTTP_PORT = 9200;
private static final int DEFAULT_TRANSPORT_PORT = 9300;
private static final int DEFAULT_STREAM_PORT = 9880;
private static final int DEFAULT_DEBUG_PORT = 5005;
public static final String LOCALHOST_ADDRESS_PREFIX = "127.0.0.1:";

Expand Down Expand Up @@ -140,6 +141,8 @@ public void beforeStart() {
int debugPort = DEFAULT_DEBUG_PORT;
int httpPort = DEFAULT_HTTP_PORT;
int transportPort = DEFAULT_TRANSPORT_PORT;
int streamPort = DEFAULT_STREAM_PORT;

Map<String, String> additionalSettings = System.getProperties()
.entrySet()
.stream()
Expand All @@ -164,15 +167,19 @@ public void beforeStart() {
firstNode.setHttpPort(String.valueOf(httpPort));
httpPort++;
firstNode.setTransportPort(String.valueOf(transportPort));
firstNode.setStreamPort(String.valueOf(streamPort));
transportPort++;
streamPort++;
firstNode.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
cluster.setPreserveDataDir(preserveData);
for (OpenSearchNode node : cluster.getNodes()) {
if (node != firstNode) {
node.setHttpPort(String.valueOf(httpPort));
httpPort++;
node.setTransportPort(String.valueOf(transportPort));
node.setStreamPort(String.valueOf(streamPort));
transportPort++;
streamPort++;
node.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
}
additionalSettings.forEach(node::setting);
Expand Down
4 changes: 2 additions & 2 deletions distribution/archives/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

import org.opensearch.gradle.JavaPackageType
import org.opensearch.gradle.JavaPackageType

apply plugin: 'opensearch.internal-distribution-archive-setup'

Expand Down Expand Up @@ -190,7 +190,7 @@ distribution_archives {
}
}


linuxPpc64leTar {
archiveClassifier = 'linux-ppc64le'
content {
Expand Down
1 change: 1 addition & 0 deletions distribution/src/config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,4 @@ ${error.file}
# See please https://bugs.openjdk.org/browse/JDK-8341127 (openjdk/jdk#21283)
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.setAsTypeCache
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.asTypeUncached
--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ opentelemetry = "1.41.0"
opentelemetrysemconv = "1.27.0-alpha"

# arrow dependencies
arrow = "17.0.0"
arrow = "18.1.0"
flatbuffers = "2.0.0"

[libraries]
Expand Down
259 changes: 259 additions & 0 deletions modules/arrow-flight-rpc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.publish'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'Arrow flight based Stream implementation'
classname 'org.opensearch.arrow.flight.FlightStreamPlugin'
}

dependencies {
implementation project(':libs:opensearch-arrow-spi')

implementation "io.netty:netty-buffer:${versions.netty}"
implementation "io.netty:netty-codec:${versions.netty}"
implementation "io.netty:netty-codec-http:${versions.netty}"
implementation "io.netty:netty-codec-http2:${versions.netty}"
implementation "io.netty:netty-common:${versions.netty}"
implementation "io.netty:netty-handler:${versions.netty}"
implementation "io.netty:netty-resolver:${versions.netty}"
implementation "io.netty:netty-transport:${versions.netty}"
implementation "io.netty:netty-transport-native-unix-common:${versions.netty}"
implementation "io.netty:netty-transport-classes-epoll:${versions.netty}"

implementation 'org.checkerframework:checker-qual:3.44.0'
implementation "org.apache.arrow:arrow-memory-core:${versions.arrow}"
implementation "org.apache.arrow:arrow-memory-netty-buffer-patch:${versions.arrow}"
implementation "org.apache.arrow:arrow-memory-netty:${versions.arrow}"

runtimeOnly group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'
compileOnly 'org.immutables:value:2.10.1'
annotationProcessor 'org.immutables:value:2.10.1'
implementation "org.apache.arrow:arrow-flight:${versions.arrow}"
implementation "org.apache.arrow:flight-core:${versions.arrow}"
implementation "io.grpc:grpc-api:${versions.grpc}"
implementation "io.grpc:grpc-netty:${versions.grpc}"
runtimeOnly "io.grpc:grpc-core:${versions.grpc}"
implementation "io.grpc:grpc-stub:${versions.grpc}"
runtimeOnly "io.grpc:grpc-all:${versions.grpc}"
runtimeOnly "io.grpc:grpc-protobuf:${versions.grpc}"
runtimeOnly "io.grpc:grpc-protobuf-lite:${versions.grpc}"
runtimeOnly 'io.perfmark:perfmark-api:0.27.0'
runtimeOnly "com.google.guava:failureaccess:1.0.1"
compileOnly "com.google.errorprone:error_prone_annotations:2.31.0"
runtimeOnly('com.google.guava:guava:33.3.1-jre') {
attributes {
attribute(Attribute.of('org.gradle.jvm.environment', String), 'standard-jvm')
}
}
runtimeOnly 'org.apache.parquet:parquet-arrow:1.13.1'
}

tasks.named('test').configure {
jacoco {
excludes = ['org/apache/arrow/flight/**']
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'

excludes = [
'org/apache/arrow/flight/OSFlightServer$Builder.class',
'org/apache/arrow/flight/OSFlightClient$Builder.class',
'org/opensearch/flight/bootstrap/server/ServerConfig$Netty4Configs.class',
'org/opensearch/flight/bootstrap/server/ServerConfig.class',
'org/opensearch/flight/bootstrap/tls/DefaultSslContextProvider.class',
'org/apache/arrow/flight/OpenSearchFlightClient$Builder.class'
]
}

tasks.named('thirdPartyAudit').configure {
ignoreMissingClasses(
'com.google.gson.stream.JsonReader',
'com.google.gson.stream.JsonToken',
'com.google.protobuf.util.Timestamps',
'com.google.rpc.Status',
'com.google.rpc.Status$Builder',
'org.apache.parquet.schema.GroupType',
// Parquet Schema classes
'org.apache.parquet.schema.LogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$DateLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$DecimalLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$IntLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$IntervalLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$ListLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$LogicalTypeAnnotationVisitor',
'org.apache.parquet.schema.LogicalTypeAnnotation$StringLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$TimeLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$TimeUnit',
'org.apache.parquet.schema.LogicalTypeAnnotation$TimestampLogicalTypeAnnotation',
'org.apache.parquet.schema.MessageType',
'org.apache.parquet.schema.OriginalType',
'org.apache.parquet.schema.PrimitiveType',
'org.apache.parquet.schema.PrimitiveType$PrimitiveTypeName',
'org.apache.parquet.schema.PrimitiveType$PrimitiveTypeNameConverter',
'org.apache.parquet.schema.Type',
'org.apache.parquet.schema.Type$Repetition',
'org.apache.parquet.schema.Types',
'org.apache.parquet.schema.Types$BaseListBuilder',
'org.apache.parquet.schema.Types$GroupBuilder',
'org.apache.parquet.schema.Types$ListBuilder',
'org.apache.parquet.schema.Types$PrimitiveBuilder',

'com.aayushatharva.brotli4j.Brotli4jLoader',
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Status',
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Wrapper',
'com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel',
'com.aayushatharva.brotli4j.encoder.Encoder$Mode',
'com.aayushatharva.brotli4j.encoder.Encoder$Parameters',
// classes are missing

// from io.netty.logging.CommonsLoggerFactory (netty)
'org.apache.commons.logging.Log',
'org.apache.commons.logging.LogFactory',

// from Log4j (deliberate, Netty will fallback to Log4j 2)
'org.apache.log4j.Level',
'org.apache.log4j.Logger',

// from io.netty.handler.ssl.OpenSslEngine (netty)
'io.netty.internal.tcnative.Buffer',
'io.netty.internal.tcnative.CertificateCompressionAlgo',
'io.netty.internal.tcnative.Library',
'io.netty.internal.tcnative.SSL',
'io.netty.internal.tcnative.SSLContext',
'io.netty.internal.tcnative.SSLPrivateKeyMethod',

// from io.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty)
'org.bouncycastle.cert.X509v3CertificateBuilder',
'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter',
'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder',
'org.bouncycastle.openssl.PEMEncryptedKeyPair',
'org.bouncycastle.openssl.PEMParser',
'org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter',
'org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder',
'org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder',
'org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo',

// from io.netty.handler.ssl.JettyNpnSslEngine (netty)
'org.eclipse.jetty.npn.NextProtoNego$ClientProvider',
'org.eclipse.jetty.npn.NextProtoNego$ServerProvider',
'org.eclipse.jetty.npn.NextProtoNego',

// from io.netty.handler.codec.marshalling.ChannelBufferByteInput (netty)
'org.jboss.marshalling.ByteInput',

// from io.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty)
'org.jboss.marshalling.ByteOutput',

// from io.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty)
'org.jboss.marshalling.Marshaller',

// from io.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty)
'org.jboss.marshalling.MarshallerFactory',
'org.jboss.marshalling.MarshallingConfiguration',
'org.jboss.marshalling.Unmarshaller',

'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',
'com.ning.compress.BufferRecycler',
'com.ning.compress.lzf.ChunkDecoder',
'com.ning.compress.lzf.ChunkEncoder',
'com.ning.compress.lzf.LZFChunk',
'com.ning.compress.lzf.LZFEncoder',
'com.ning.compress.lzf.util.ChunkDecoderFactory',
'com.ning.compress.lzf.util.ChunkEncoderFactory',
'lzma.sdk.lzma.Encoder',
'net.jpountz.lz4.LZ4Compressor',
'net.jpountz.lz4.LZ4Factory',
'net.jpountz.lz4.LZ4FastDecompressor',
'net.jpountz.xxhash.XXHash32',
'net.jpountz.xxhash.XXHashFactory',
'io.netty.internal.tcnative.AsyncSSLPrivateKeyMethod',
'io.netty.internal.tcnative.AsyncTask',
'io.netty.internal.tcnative.CertificateCallback',
'io.netty.internal.tcnative.CertificateVerifier',
'io.netty.internal.tcnative.ResultCallback',
'io.netty.internal.tcnative.SessionTicketKey',
'io.netty.internal.tcnative.SniHostNameMatcher',
'io.netty.internal.tcnative.SSL',
'io.netty.internal.tcnative.SSLSession',
'io.netty.internal.tcnative.SSLSessionCache',
'org.eclipse.jetty.alpn.ALPN$ClientProvider',
'org.eclipse.jetty.alpn.ALPN$ServerProvider',
'org.eclipse.jetty.alpn.ALPN',

'org.conscrypt.AllocatedBuffer',
'org.conscrypt.BufferAllocator',
'org.conscrypt.Conscrypt',
'org.conscrypt.HandshakeListener',

'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration'
)
ignoreViolations(
// Guava internal classes
'com.google.common.cache.Striped64',
'com.google.common.cache.Striped64$1',
'com.google.common.cache.Striped64$Cell',
'com.google.common.hash.LittleEndianByteArray$UnsafeByteArray',
'com.google.common.hash.LittleEndianByteArray$UnsafeByteArray$1',
'com.google.common.hash.LittleEndianByteArray$UnsafeByteArray$2',
'com.google.common.hash.Striped64',
'com.google.common.hash.Striped64$1',
'com.google.common.hash.Striped64$Cell',
'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator',
'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1',
'com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper',
'com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper$1',

// Arrow memory classes
'org.apache.arrow.memory.util.MemoryUtil',
'org.apache.arrow.memory.util.MemoryUtil$1',

'io.netty.util.internal.PlatformDependent0',
'io.netty.util.internal.PlatformDependent0$1',
'io.netty.util.internal.PlatformDependent0$2',
'io.netty.util.internal.PlatformDependent0$3',
'io.netty.util.internal.PlatformDependent0$4',
'io.netty.util.internal.PlatformDependent0$6',
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueConsumerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueProducerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.LinkedQueueNode',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5'
)
}

tasks.named("dependencyLicenses").configure {
mapping from: /netty-.*/, to: 'netty'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1dcf1de382a0bf95a3d8b0849546c88bac1292c9
Loading

0 comments on commit b38f302

Please sign in to comment.