Skip to content

Commit

Permalink
Arrow Flight Server bootstrap logic
Browse files Browse the repository at this point in the history
* new plugin for StreamManager implementation
* integration with server module
* support for SslContext in Flight server and client
* ClientManager for creating a pool of flight clients for data nodes
* custom event loop group and thread pool for server and client channel
  • Loading branch information
rishabhmaurya committed Jan 6, 2025
1 parent c0f7806 commit 53ad56f
Show file tree
Hide file tree
Showing 104 changed files with 10,101 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public class OpenSearchNode implements TestClusterConfiguration {
private boolean isWorkingDirConfigured = false;
private String httpPort = "0";
private String transportPort = "0";
private String streamPort = "0";
private Path confPathData;
private String keystorePassword = "";
private boolean preserveDataDir = false;
Expand Down Expand Up @@ -1175,6 +1176,8 @@ private void createConfiguration() {
baseConfig.put("node.portsfile", "true");
baseConfig.put("http.port", httpPort);
baseConfig.put("transport.port", transportPort);
baseConfig.put("node.attr.transport.stream.port", streamPort);

// Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space
baseConfig.put("cluster.routing.allocation.disk.watermark.low", "1b");
baseConfig.put("cluster.routing.allocation.disk.watermark.high", "1b");
Expand Down Expand Up @@ -1447,6 +1450,10 @@ void setTransportPort(String transportPort) {
this.transportPort = transportPort;
}

void setStreamPort(String streamPort) {
this.streamPort = streamPort;
}

void setDataPath(Path dataPath) {
this.confPathData = dataPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class RunTask extends DefaultTestClustersTask {
public static final String CUSTOM_SETTINGS_PREFIX = "tests.opensearch.";
private static final int DEFAULT_HTTP_PORT = 9200;
private static final int DEFAULT_TRANSPORT_PORT = 9300;
private static final int DEFAULT_STREAM_PORT = 9880;
private static final int DEFAULT_DEBUG_PORT = 5005;
public static final String LOCALHOST_ADDRESS_PREFIX = "127.0.0.1:";

Expand Down Expand Up @@ -140,6 +141,8 @@ public void beforeStart() {
int debugPort = DEFAULT_DEBUG_PORT;
int httpPort = DEFAULT_HTTP_PORT;
int transportPort = DEFAULT_TRANSPORT_PORT;
int streamPort = DEFAULT_STREAM_PORT;

Map<String, String> additionalSettings = System.getProperties()
.entrySet()
.stream()
Expand All @@ -164,15 +167,19 @@ public void beforeStart() {
firstNode.setHttpPort(String.valueOf(httpPort));
httpPort++;
firstNode.setTransportPort(String.valueOf(transportPort));
firstNode.setStreamPort(String.valueOf(streamPort));
transportPort++;
streamPort++;
firstNode.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
cluster.setPreserveDataDir(preserveData);
for (OpenSearchNode node : cluster.getNodes()) {
if (node != firstNode) {
node.setHttpPort(String.valueOf(httpPort));
httpPort++;
node.setTransportPort(String.valueOf(transportPort));
node.setStreamPort(String.valueOf(streamPort));
transportPort++;
streamPort++;
node.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
}
additionalSettings.forEach(node::setting);
Expand Down
4 changes: 2 additions & 2 deletions distribution/archives/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

import org.opensearch.gradle.JavaPackageType
import org.opensearch.gradle.JavaPackageType

apply plugin: 'opensearch.internal-distribution-archive-setup'

Expand Down Expand Up @@ -190,7 +190,7 @@ distribution_archives {
}
}


linuxPpc64leTar {
archiveClassifier = 'linux-ppc64le'
content {
Expand Down
1 change: 1 addition & 0 deletions distribution/src/config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,4 @@ ${error.file}
# See please https://bugs.openjdk.org/browse/JDK-8341127 (openjdk/jdk#21283)
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.setAsTypeCache
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.asTypeUncached
--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ opentelemetry = "1.41.0"
opentelemetrysemconv = "1.27.0-alpha"

# arrow dependencies
arrow = "17.0.0"
arrow = "18.1.0"
flatbuffers = "2.0.0"

[libraries]
Expand Down
259 changes: 259 additions & 0 deletions modules/arrow-flight-rpc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.publish'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'Arrow flight based Stream implementation'
classname 'org.opensearch.arrow.flight.FlightStreamPlugin'
}

dependencies {
implementation project(':libs:opensearch-arrow-spi')

implementation "io.netty:netty-buffer:${versions.netty}"
implementation "io.netty:netty-codec:${versions.netty}"
implementation "io.netty:netty-codec-http:${versions.netty}"
implementation "io.netty:netty-codec-http2:${versions.netty}"
implementation "io.netty:netty-common:${versions.netty}"
implementation "io.netty:netty-handler:${versions.netty}"
implementation "io.netty:netty-resolver:${versions.netty}"
implementation "io.netty:netty-transport:${versions.netty}"
implementation "io.netty:netty-transport-native-unix-common:${versions.netty}"
implementation "io.netty:netty-transport-classes-epoll:${versions.netty}"

implementation 'org.checkerframework:checker-qual:3.44.0'
implementation "org.apache.arrow:arrow-memory-core:${versions.arrow}"
implementation "org.apache.arrow:arrow-memory-netty-buffer-patch:${versions.arrow}"
implementation "org.apache.arrow:arrow-memory-netty:${versions.arrow}"

runtimeOnly group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'
compileOnly 'org.immutables:value:2.10.1'
annotationProcessor 'org.immutables:value:2.10.1'
implementation "org.apache.arrow:arrow-flight:${versions.arrow}"
implementation "org.apache.arrow:flight-core:${versions.arrow}"
implementation "io.grpc:grpc-api:${versions.grpc}"
implementation "io.grpc:grpc-netty:${versions.grpc}"
runtimeOnly "io.grpc:grpc-core:${versions.grpc}"
implementation "io.grpc:grpc-stub:${versions.grpc}"
runtimeOnly "io.grpc:grpc-all:${versions.grpc}"
runtimeOnly "io.grpc:grpc-protobuf:${versions.grpc}"
runtimeOnly "io.grpc:grpc-protobuf-lite:${versions.grpc}"
runtimeOnly 'io.perfmark:perfmark-api:0.27.0'
runtimeOnly "com.google.guava:failureaccess:1.0.1"
compileOnly "com.google.errorprone:error_prone_annotations:2.31.0"
runtimeOnly('com.google.guava:guava:33.3.1-jre') {
attributes {
attribute(Attribute.of('org.gradle.jvm.environment', String), 'standard-jvm')
}
}
runtimeOnly 'org.apache.parquet:parquet-arrow:1.13.1'
}

tasks.named('test').configure {
jacoco {
excludes = ['org/apache/arrow/flight/**']
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'

excludes = [
'org/apache/arrow/flight/OSFlightServer$Builder.class',
'org/apache/arrow/flight/OSFlightClient$Builder.class',
'org/opensearch/flight/bootstrap/server/ServerConfig$Netty4Configs.class',
'org/opensearch/flight/bootstrap/server/ServerConfig.class',
'org/opensearch/flight/bootstrap/tls/DefaultSslContextProvider.class',
'org/apache/arrow/flight/OpenSearchFlightClient$Builder.class'
]
}

tasks.named('thirdPartyAudit').configure {
ignoreMissingClasses(
'com.google.gson.stream.JsonReader',
'com.google.gson.stream.JsonToken',
'com.google.protobuf.util.Timestamps',
'com.google.rpc.Status',
'com.google.rpc.Status$Builder',
'org.apache.parquet.schema.GroupType',
// Parquet Schema classes
'org.apache.parquet.schema.LogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$DateLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$DecimalLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$IntLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$IntervalLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$ListLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$LogicalTypeAnnotationVisitor',
'org.apache.parquet.schema.LogicalTypeAnnotation$StringLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$TimeLogicalTypeAnnotation',
'org.apache.parquet.schema.LogicalTypeAnnotation$TimeUnit',
'org.apache.parquet.schema.LogicalTypeAnnotation$TimestampLogicalTypeAnnotation',
'org.apache.parquet.schema.MessageType',
'org.apache.parquet.schema.OriginalType',
'org.apache.parquet.schema.PrimitiveType',
'org.apache.parquet.schema.PrimitiveType$PrimitiveTypeName',
'org.apache.parquet.schema.PrimitiveType$PrimitiveTypeNameConverter',
'org.apache.parquet.schema.Type',
'org.apache.parquet.schema.Type$Repetition',
'org.apache.parquet.schema.Types',
'org.apache.parquet.schema.Types$BaseListBuilder',
'org.apache.parquet.schema.Types$GroupBuilder',
'org.apache.parquet.schema.Types$ListBuilder',
'org.apache.parquet.schema.Types$PrimitiveBuilder',

'com.aayushatharva.brotli4j.Brotli4jLoader',
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Status',
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Wrapper',
'com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel',
'com.aayushatharva.brotli4j.encoder.Encoder$Mode',
'com.aayushatharva.brotli4j.encoder.Encoder$Parameters',
// classes are missing

// from io.netty.logging.CommonsLoggerFactory (netty)
'org.apache.commons.logging.Log',
'org.apache.commons.logging.LogFactory',

// from Log4j (deliberate, Netty will fallback to Log4j 2)
'org.apache.log4j.Level',
'org.apache.log4j.Logger',

// from io.netty.handler.ssl.OpenSslEngine (netty)
'io.netty.internal.tcnative.Buffer',
'io.netty.internal.tcnative.CertificateCompressionAlgo',
'io.netty.internal.tcnative.Library',
'io.netty.internal.tcnative.SSL',
'io.netty.internal.tcnative.SSLContext',
'io.netty.internal.tcnative.SSLPrivateKeyMethod',

// from io.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty)
'org.bouncycastle.cert.X509v3CertificateBuilder',
'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter',
'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder',
'org.bouncycastle.openssl.PEMEncryptedKeyPair',
'org.bouncycastle.openssl.PEMParser',
'org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter',
'org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder',
'org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder',
'org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo',

// from io.netty.handler.ssl.JettyNpnSslEngine (netty)
'org.eclipse.jetty.npn.NextProtoNego$ClientProvider',
'org.eclipse.jetty.npn.NextProtoNego$ServerProvider',
'org.eclipse.jetty.npn.NextProtoNego',

// from io.netty.handler.codec.marshalling.ChannelBufferByteInput (netty)
'org.jboss.marshalling.ByteInput',

// from io.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty)
'org.jboss.marshalling.ByteOutput',

// from io.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty)
'org.jboss.marshalling.Marshaller',

// from io.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty)
'org.jboss.marshalling.MarshallerFactory',
'org.jboss.marshalling.MarshallingConfiguration',
'org.jboss.marshalling.Unmarshaller',

'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',
'com.ning.compress.BufferRecycler',
'com.ning.compress.lzf.ChunkDecoder',
'com.ning.compress.lzf.ChunkEncoder',
'com.ning.compress.lzf.LZFChunk',
'com.ning.compress.lzf.LZFEncoder',
'com.ning.compress.lzf.util.ChunkDecoderFactory',
'com.ning.compress.lzf.util.ChunkEncoderFactory',
'lzma.sdk.lzma.Encoder',
'net.jpountz.lz4.LZ4Compressor',
'net.jpountz.lz4.LZ4Factory',
'net.jpountz.lz4.LZ4FastDecompressor',
'net.jpountz.xxhash.XXHash32',
'net.jpountz.xxhash.XXHashFactory',
'io.netty.internal.tcnative.AsyncSSLPrivateKeyMethod',
'io.netty.internal.tcnative.AsyncTask',
'io.netty.internal.tcnative.CertificateCallback',
'io.netty.internal.tcnative.CertificateVerifier',
'io.netty.internal.tcnative.ResultCallback',
'io.netty.internal.tcnative.SessionTicketKey',
'io.netty.internal.tcnative.SniHostNameMatcher',
'io.netty.internal.tcnative.SSL',
'io.netty.internal.tcnative.SSLSession',
'io.netty.internal.tcnative.SSLSessionCache',
'org.eclipse.jetty.alpn.ALPN$ClientProvider',
'org.eclipse.jetty.alpn.ALPN$ServerProvider',
'org.eclipse.jetty.alpn.ALPN',

'org.conscrypt.AllocatedBuffer',
'org.conscrypt.BufferAllocator',
'org.conscrypt.Conscrypt',
'org.conscrypt.HandshakeListener',

'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration'
)
ignoreViolations(
// Guava internal classes
'com.google.common.cache.Striped64',
'com.google.common.cache.Striped64$1',
'com.google.common.cache.Striped64$Cell',
'com.google.common.hash.LittleEndianByteArray$UnsafeByteArray',
'com.google.common.hash.LittleEndianByteArray$UnsafeByteArray$1',
'com.google.common.hash.LittleEndianByteArray$UnsafeByteArray$2',
'com.google.common.hash.Striped64',
'com.google.common.hash.Striped64$1',
'com.google.common.hash.Striped64$Cell',
'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator',
'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1',
'com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper',
'com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper$1',

// Arrow memory classes
'org.apache.arrow.memory.util.MemoryUtil',
'org.apache.arrow.memory.util.MemoryUtil$1',

'io.netty.util.internal.PlatformDependent0',
'io.netty.util.internal.PlatformDependent0$1',
'io.netty.util.internal.PlatformDependent0$2',
'io.netty.util.internal.PlatformDependent0$3',
'io.netty.util.internal.PlatformDependent0$4',
'io.netty.util.internal.PlatformDependent0$6',
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueConsumerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueProducerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.LinkedQueueNode',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5'
)
}

tasks.named("dependencyLicenses").configure {
mapping from: /netty-.*/, to: 'netty'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1dcf1de382a0bf95a3d8b0849546c88bac1292c9
Loading

0 comments on commit 53ad56f

Please sign in to comment.