Skip to content

Commit

Permalink
interim changes - integration with Auxiliary Transport
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Maurya <[email protected]>
  • Loading branch information
rishabhmaurya committed Jan 9, 2025
1 parent 69e9a97 commit d300467
Show file tree
Hide file tree
Showing 22 changed files with 1,058 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ public class OpenSearchNode implements TestClusterConfiguration {
private boolean isWorkingDirConfigured = false;
private String httpPort = "0";
private String transportPort = "0";
private String streamPort = "0";
private Path confPathData;
private String keystorePassword = "";
private boolean preserveDataDir = false;
Expand Down Expand Up @@ -1176,7 +1175,6 @@ private void createConfiguration() {
baseConfig.put("node.portsfile", "true");
baseConfig.put("http.port", httpPort);
baseConfig.put("transport.port", transportPort);
baseConfig.put("node.attr.transport.stream.port", streamPort);

// Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space
baseConfig.put("cluster.routing.allocation.disk.watermark.low", "1b");
Expand Down Expand Up @@ -1450,10 +1448,6 @@ void setTransportPort(String transportPort) {
this.transportPort = transportPort;
}

void setStreamPort(String streamPort) {
this.streamPort = streamPort;
}

void setDataPath(Path dataPath) {
this.confPathData = dataPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class RunTask extends DefaultTestClustersTask {
public static final String CUSTOM_SETTINGS_PREFIX = "tests.opensearch.";
private static final int DEFAULT_HTTP_PORT = 9200;
private static final int DEFAULT_TRANSPORT_PORT = 9300;
private static final int DEFAULT_STREAM_PORT = 9880;
private static final int DEFAULT_DEBUG_PORT = 5005;
public static final String LOCALHOST_ADDRESS_PREFIX = "127.0.0.1:";

Expand Down Expand Up @@ -141,7 +140,6 @@ public void beforeStart() {
int debugPort = DEFAULT_DEBUG_PORT;
int httpPort = DEFAULT_HTTP_PORT;
int transportPort = DEFAULT_TRANSPORT_PORT;
int streamPort = DEFAULT_STREAM_PORT;

Map<String, String> additionalSettings = System.getProperties()
.entrySet()
Expand All @@ -167,19 +165,15 @@ public void beforeStart() {
firstNode.setHttpPort(String.valueOf(httpPort));
httpPort++;
firstNode.setTransportPort(String.valueOf(transportPort));
firstNode.setStreamPort(String.valueOf(streamPort));
transportPort++;
streamPort++;
firstNode.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
cluster.setPreserveDataDir(preserveData);
for (OpenSearchNode node : cluster.getNodes()) {
if (node != firstNode) {
node.setHttpPort(String.valueOf(httpPort));
httpPort++;
node.setTransportPort(String.valueOf(transportPort));
node.setStreamPort(String.valueOf(streamPort));
transportPort++;
streamPort++;
node.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
}
additionalSettings.forEach(node::setting);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 3)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 5)
public class ArrowFlightServerIT extends OpenSearchIntegTestCase {

private FlightClientManager flightClientManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,29 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SecureTransportSettingsProvider;
import org.opensearch.plugins.StreamManagerPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ExecutorBuilder;
Expand All @@ -44,7 +51,7 @@
* BaseFlightStreamPlugin is a plugin that implements the StreamManagerPlugin interface.
* It provides the necessary components for handling flight streams in the OpenSearch cluster.
*/
public abstract class BaseFlightStreamPlugin extends Plugin implements StreamManagerPlugin, NetworkPlugin, ClusterPlugin {
public abstract class BaseFlightStreamPlugin extends Plugin implements StreamManagerPlugin, NetworkPlugin, ClusterPlugin, ActionPlugin {

/**
* Constructor for BaseFlightStreamPlugin.
Expand Down Expand Up @@ -107,6 +114,16 @@ public abstract Map<String, Supplier<Transport>> getSecureTransports(
Tracer tracer
);

@Override
public abstract Map<String, Supplier<AuxTransport>> getAuxTransports(
Settings settings,
ThreadPool threadPool,
CircuitBreakerService circuitBreakerService,
NetworkService networkService,
ClusterSettings clusterSettings,
Tracer tracer
);

/**
* Returns the StreamManager instance for managing flight streams.
*/
Expand All @@ -132,4 +149,18 @@ public abstract Map<String, Supplier<Transport>> getSecureTransports(
*/
@Override
public abstract void onNodeStarted(DiscoveryNode localNode);

@Override
public abstract List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
);

@Override
public abstract List<ActionHandler<?, ?>> getActions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
Expand All @@ -27,6 +31,8 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.SecureTransportSettingsProvider;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ExecutorBuilder;
Expand Down Expand Up @@ -90,6 +96,18 @@ public Map<String, Supplier<Transport>> getSecureTransports(
return Map.of();
}

@Override
public Map<String, Supplier<AuxTransport>> getAuxTransports(
Settings settings,
ThreadPool threadPool,
CircuitBreakerService circuitBreakerService,
NetworkService networkService,
ClusterSettings clusterSettings,
Tracer tracer
) {
return Map.of();
}

@Override
public Supplier<StreamManager> getStreamManager() {
return () -> null;
Expand All @@ -109,6 +127,24 @@ public List<Setting<?>> getSettings() {
public void onNodeStarted(DiscoveryNode localNode) {

}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of();
}

@Override
public List<ActionHandler<?, ?>> getActions() {
return List.of();
}
};
}
}
Expand Down Expand Up @@ -192,6 +228,44 @@ public Map<String, Supplier<Transport>> getSecureTransports(
);
}

@Override
public Map<String, Supplier<AuxTransport>> getAuxTransports(
Settings settings,
ThreadPool threadPool,
CircuitBreakerService circuitBreakerService,
NetworkService networkService,
ClusterSettings clusterSettings,
Tracer tracer
) {
return delegate.getAuxTransports(settings, threadPool, circuitBreakerService, networkService, clusterSettings, tracer);
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return delegate.getRestHandlers(
settings,
restController,
clusterSettings,
indexScopedSettings,
settingsFilter,
indexNameExpressionResolver,
nodesInCluster
);
}

@Override
public List<ActionHandler<?, ?>> getActions() {
return delegate.getActions();
}

/**
* Gets the StreamManager instance for managing flight streams.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.arrow.flight.api;

import org.opensearch.client.node.NodeClient;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.util.List;

import static org.opensearch.rest.RestRequest.Method.GET;

public class FlightServerInfoAction extends BaseRestHandler {

public FlightServerInfoAction() {}

@Override
public String getName() {
return "flight_server_info_action";
}

@Override
public List<Route> routes() {
return List.of(new Route(GET, "/_flight/info"), new Route(GET, "/_flight/info/{nodeId}"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
String nodeId = request.param("nodeId");
if (nodeId != null) {
// Query specific node
NodesFlightInfoRequest nodesRequest = new NodesFlightInfoRequest(nodeId);
return channel -> client.execute(NodesFlightInfoAction.INSTANCE, nodesRequest, new RestToXContentListener<>(channel));
} else {
NodesFlightInfoRequest nodesRequest = new NodesFlightInfoRequest();
return channel -> client.execute(NodesFlightInfoAction.INSTANCE, nodesRequest, new RestToXContentListener<>(channel));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow.flight.api;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.transport.BoundTransportAddress;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

public class NodeFlightInfo extends BaseNodeResponse implements ToXContentObject {
private final BoundTransportAddress boundAddress;

public NodeFlightInfo(StreamInput in) throws IOException {
super(in);
boundAddress = new BoundTransportAddress(in);
}

public NodeFlightInfo(DiscoveryNode node, BoundTransportAddress boundAddress) {
super(node);
this.boundAddress = boundAddress;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
boundAddress.writeTo(out);
}

public BoundTransportAddress getBoundAddress() {
return boundAddress;
}

public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.startObject("flight_server");

builder.startArray("bound_addresses");
for (TransportAddress address : boundAddress.boundAddresses()) {
builder.startObject();
builder.field("host", address.address().getHostString());
builder.field("port", address.address().getPort());
builder.endObject();
}
builder.endArray();

TransportAddress publishAddress = boundAddress.publishAddress();
builder.startObject("publish_address");
builder.field("host", publishAddress.address().getHostString());
builder.field("port", publishAddress.address().getPort());
builder.endObject();

builder.endObject();
builder.endObject();
return builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow.flight.api;

import org.opensearch.action.ActionType;

public class NodesFlightInfoAction extends ActionType<NodesFlightInfoResponse> {
public static final NodesFlightInfoAction INSTANCE = new NodesFlightInfoAction();
public static final String NAME = "cluster:admin/flight/info";

private NodesFlightInfoAction() {
super(NAME, NodesFlightInfoResponse::new);
}
}
Loading

0 comments on commit d300467

Please sign in to comment.