From 1731b082933079a7caeaa4eeaa6736664eacd8bf Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Fri, 22 Nov 2024 10:42:34 +0000 Subject: [PATCH] Introduce `bolt+unix` scheme support The new `bolt+unix` scheme allows connecting to Neo4j server over a Unix socket. Example: ```java try (var driver = GraphDatabase.driver("bolt+unix:///var/run/neo4j.sock")) { // use the driver var result = driver.executableQuery("SHOW DATABASES") .withConfig(QueryConfig.builder().withDatabase("system").build()) .execute(); result.records().forEach(System.out::println); } ``` --- .../neo4j/driver/internal/DriverFactory.java | 25 +++++- .../internal/InternalServerAddress.java | 41 --------- .../org/neo4j/driver/internal/Scheme.java | 2 + .../internal/bolt/api/BoltServerAddress.java | 16 +++- .../bolt/basicimpl/ConnectionProvider.java | 48 ----------- .../bolt/basicimpl/ConnectionProviders.java | 34 -------- .../NettyBoltConnectionProvider.java | 4 +- .../basicimpl/NettyConnectionProvider.java | 60 ++++++++----- .../driver/internal/DriverFactoryTest.java | 2 +- .../internal/InternalServerAddressTest.java | 84 +++++++++++++++++++ .../backend/messages/requests/NewDriver.java | 7 +- 11 files changed, 168 insertions(+), 155 deletions(-) delete mode 100644 driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/ConnectionProvider.java delete mode 100644 driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/ConnectionProviders.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/InternalServerAddressTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 9f664d7d8a..4900eefaca 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -24,6 +24,8 @@ import io.netty.channel.local.LocalAddress; import io.netty.util.concurrent.EventExecutorGroup; import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Clock; import java.util.LinkedHashSet; import java.util.Set; @@ -111,7 +113,6 @@ public final Driver newInstance( ownsEventLoopGroup = false; } - var address = new InternalServerAddress(uri); var routingSettings = new RoutingSettings(config.routingTablePurgeDelayMillis(), new RoutingContext(uri)); EventExecutorGroup eventExecutorGroup = bootstrap.config().group(); @@ -122,7 +123,6 @@ public final Driver newInstance( return createDriver( uri, securityPlanManager, - address, bootstrap.group(), routingSettings, retryLogic, @@ -149,7 +149,6 @@ protected static MetricsProvider getOrCreateMetricsProvider(Config config, Clock private InternalDriver createDriver( URI uri, BoltSecurityPlanManager securityPlanManager, - ServerAddress address, EventLoopGroup eventLoopGroup, RoutingSettings routingSettings, RetryLogic retryLogic, @@ -159,11 +158,29 @@ private InternalDriver createDriver( boolean ownsEventLoopGroup, Supplier rediscoverySupplier) { BoltConnectionProvider boltConnectionProvider = null; + BoltServerAddress address; + if (Scheme.BOLT_UNIX_URI_SCHEME.equals(uri.getScheme())) { + var path = Path.of(uri.getPath()); + if (!Files.exists(path)) { + throw new IllegalArgumentException(String.format("%s does not exist", path)); + } + address = new BoltServerAddress(path); + } else { + var port = uri.getPort(); + if (port == -1) { + port = InternalServerAddress.DEFAULT_PORT; + } + if (port >= 0 && port <= 65_535) { + address = new BoltServerAddress(uri.getHost(), port); + } else { + throw new IllegalArgumentException("Illegal port: " + port); + } + } try { boltConnectionProvider = createBoltConnectionProvider(uri, config, eventLoopGroup, routingSettings, rediscoverySupplier); boltConnectionProvider.init( - new BoltServerAddress(address.host(), address.port()), + address, new RoutingContext(uri), DriverInfoUtil.boltAgent(), config.userAgent(), diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalServerAddress.java b/driver/src/main/java/org/neo4j/driver/internal/InternalServerAddress.java index df9ea9dd23..1e2b167b43 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalServerAddress.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalServerAddress.java @@ -35,10 +35,6 @@ private static void requireValidPort(int port) { throw new IllegalArgumentException("Illegal port: " + port); } - public InternalServerAddress(String address) { - this(uriFrom(address)); - } - public InternalServerAddress(URI uri) { this(hostFrom(uri), portFrom(uri)); } @@ -64,43 +60,6 @@ private static RuntimeException invalidAddressFormat(String address) { return new IllegalArgumentException("Invalid address format `" + address + "`"); } - @SuppressWarnings("DuplicatedCode") - private static URI uriFrom(String address) { - String scheme; - String hostPort; - - var schemeSplit = address.split("://"); - if (schemeSplit.length == 1) { - // URI can't parse addresses without scheme, prepend fake "bolt://" to reuse the parsing facility - scheme = "bolt://"; - hostPort = hostPortFrom(schemeSplit[0]); - } else if (schemeSplit.length == 2) { - scheme = schemeSplit[0] + "://"; - hostPort = hostPortFrom(schemeSplit[1]); - } else { - throw invalidAddressFormat(address); - } - - return URI.create(scheme + hostPort); - } - - private static String hostPortFrom(String address) { - if (address.startsWith("[")) { - // expected to be an IPv6 address like [::1] or [::1]:7687 - return address; - } - - var containsSingleColon = address.indexOf(":") == address.lastIndexOf(":"); - if (containsSingleColon) { - // expected to be an IPv4 address with or without port like 127.0.0.1 or 127.0.0.1:7687 - return address; - } - - // address contains multiple colons and does not start with '[' - // expected to be an IPv6 address without brackets - return "[" + address + "]"; - } - @Override public String toString() { return String.format("%s:%d", host, port); diff --git a/driver/src/main/java/org/neo4j/driver/internal/Scheme.java b/driver/src/main/java/org/neo4j/driver/internal/Scheme.java index 3917e54297..0ed4a6d467 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/Scheme.java +++ b/driver/src/main/java/org/neo4j/driver/internal/Scheme.java @@ -22,6 +22,7 @@ public class Scheme { public static final String BOLT_URI_SCHEME = "bolt"; public static final String BOLT_HIGH_TRUST_URI_SCHEME = "bolt+s"; public static final String BOLT_LOW_TRUST_URI_SCHEME = "bolt+ssc"; + public static final String BOLT_UNIX_URI_SCHEME = "bolt+unix"; public static final String NEO4J_URI_SCHEME = "neo4j"; public static final String NEO4J_HIGH_TRUST_URI_SCHEME = "neo4j+s"; public static final String NEO4J_LOW_TRUST_URI_SCHEME = "neo4j+ssc"; @@ -34,6 +35,7 @@ public static void validateScheme(String scheme) { case BOLT_URI_SCHEME, BOLT_LOW_TRUST_URI_SCHEME, BOLT_HIGH_TRUST_URI_SCHEME, + BOLT_UNIX_URI_SCHEME, NEO4J_URI_SCHEME, NEO4J_LOW_TRUST_URI_SCHEME, NEO4J_HIGH_TRUST_URI_SCHEME -> {} diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/api/BoltServerAddress.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/api/BoltServerAddress.java index c345307e90..73385f12b4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/api/BoltServerAddress.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/api/BoltServerAddress.java @@ -19,6 +19,7 @@ import static java.util.Objects.requireNonNull; import java.net.URI; +import java.nio.file.Path; import java.util.Objects; import java.util.stream.Stream; @@ -35,6 +36,7 @@ public class BoltServerAddress { // resolved IP address. protected final int port; private final String stringValue; + private final Path path; public BoltServerAddress(String address) { this(uriFrom(address)); @@ -55,6 +57,15 @@ public BoltServerAddress(String host, String connectionHost, int port) { this.stringValue = host.equals(connectionHost) ? String.format("%s:%d", host, port) : String.format("%s(%s):%d", host, connectionHost, port); + this.path = null; + } + + public BoltServerAddress(Path path) { + this.host = path.toString(); + this.connectionHost = this.host; + this.port = -1; + this.stringValue = this.host; + this.path = path; } @Override @@ -91,6 +102,10 @@ public String connectionHost() { return connectionHost; } + public Path path() { + return path; + } + /** * Create a stream of unicast addresses. *

@@ -115,7 +130,6 @@ private static int portFrom(URI uri) { return port == -1 ? DEFAULT_PORT : port; } - @SuppressWarnings("DuplicatedCode") private static URI uriFrom(String address) { String scheme; String hostPort; diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/ConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/ConnectionProvider.java deleted file mode 100644 index 683755f6d8..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/ConnectionProvider.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [https://neo4j.com] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.bolt.basicimpl; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import org.neo4j.driver.Value; -import org.neo4j.driver.internal.bolt.api.AccessMode; -import org.neo4j.driver.internal.bolt.api.BoltAgent; -import org.neo4j.driver.internal.bolt.api.BoltServerAddress; -import org.neo4j.driver.internal.bolt.api.MetricsListener; -import org.neo4j.driver.internal.bolt.api.NotificationConfig; -import org.neo4j.driver.internal.bolt.api.RoutingContext; -import org.neo4j.driver.internal.bolt.api.SecurityPlan; -import org.neo4j.driver.internal.bolt.basicimpl.spi.Connection; - -public interface ConnectionProvider { - - CompletionStage acquireConnection( - BoltServerAddress address, - SecurityPlan securityPlan, - RoutingContext routingContext, - String databaseName, - Map authMap, - BoltAgent boltAgent, - String userAgent, - AccessMode mode, - int connectTimeoutMillis, - String impersonatedUser, - CompletableFuture latestAuthMillisFuture, - NotificationConfig notificationConfig, - MetricsListener metricsListener); -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/ConnectionProviders.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/ConnectionProviders.java deleted file mode 100644 index b572452af6..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/ConnectionProviders.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [https://neo4j.com] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.bolt.basicimpl; - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.local.LocalAddress; -import java.time.Clock; -import org.neo4j.driver.internal.bolt.api.DomainNameResolver; -import org.neo4j.driver.internal.bolt.api.LoggingProvider; - -public class ConnectionProviders { - static ConnectionProvider netty( - EventLoopGroup group, - Clock clock, - DomainNameResolver domainNameResolver, - LocalAddress localAddress, - LoggingProvider logging) { - return new NettyConnectionProvider(group, clock, domainNameResolver, localAddress, logging); - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java index d78bb52444..9d519342fe 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java @@ -53,7 +53,7 @@ public final class NettyBoltConnectionProvider implements BoltConnectionProvider private final LoggingProvider logging; private final System.Logger log; - private final ConnectionProvider connectionProvider; + private final NettyConnectionProvider connectionProvider; private BoltServerAddress address; @@ -76,7 +76,7 @@ public NettyBoltConnectionProvider( this.logging = Objects.requireNonNull(logging); this.log = logging.getLog(getClass()); this.connectionProvider = - ConnectionProviders.netty(eventLoopGroup, clock, domainNameResolver, localAddress, logging); + new NettyConnectionProvider(eventLoopGroup, clock, domainNameResolver, localAddress, logging); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java index e5fcf7ae3a..15685da2b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java @@ -25,10 +25,12 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; +import io.netty.channel.socket.nio.NioDomainSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.resolver.AddressResolverGroup; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.UnixDomainSocketAddress; import java.time.Clock; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -52,7 +54,7 @@ import org.neo4j.driver.internal.bolt.basicimpl.messaging.BoltProtocol; import org.neo4j.driver.internal.bolt.basicimpl.spi.Connection; -public final class NettyConnectionProvider implements ConnectionProvider { +public final class NettyConnectionProvider { private final EventLoopGroup eventLoopGroup; private final Clock clock; private final DomainNameResolver domainNameResolver; @@ -75,7 +77,6 @@ public NettyConnectionProvider( this.logging = logging; } - @Override public CompletionStage acquireConnection( BoltServerAddress address, SecurityPlan securityPlan, @@ -90,27 +91,9 @@ public CompletionStage acquireConnection( CompletableFuture latestAuthMillisFuture, NotificationConfig notificationConfig, MetricsListener metricsListener) { - var bootstrap = new Bootstrap(); - bootstrap - .group(this.eventLoopGroup) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis) - .channel(localAddress != null ? LocalChannel.class : NioSocketChannel.class) - .resolver(addressResolverGroup) - .handler(new NettyChannelInitializer(address, securityPlan, connectTimeoutMillis, clock, logging)); - - SocketAddress socketAddress; - if (localAddress == null) { - try { - socketAddress = - new InetSocketAddress(domainNameResolver.resolve(address.connectionHost())[0], address.port()); - } catch (Throwable t) { - socketAddress = InetSocketAddress.createUnresolved(address.connectionHost(), address.port()); - } - } else { - socketAddress = localAddress; - } - return installChannelConnectedListeners(address, bootstrap.connect(socketAddress), connectTimeoutMillis) + return installChannelConnectedListeners( + address, connect(address, securityPlan, connectTimeoutMillis), connectTimeoutMillis) .thenCompose(channel -> BoltProtocol.forChannel(channel) .initializeChannel( channel, @@ -124,6 +107,39 @@ public CompletionStage acquireConnection( .thenApply(channel -> new NetworkConnection(channel, logging)); } + private ChannelFuture connect(BoltServerAddress address, SecurityPlan securityPlan, int connectTimeoutMillis) { + Class channelClass; + SocketAddress socketAddress; + + if (localAddress != null) { + channelClass = LocalChannel.class; + socketAddress = localAddress; + } else { + if (address.path() != null) { + channelClass = NioDomainSocketChannel.class; + socketAddress = UnixDomainSocketAddress.of(address.path()); + } else { + channelClass = NioSocketChannel.class; + try { + socketAddress = new InetSocketAddress( + domainNameResolver.resolve(address.connectionHost())[0], address.port()); + } catch (Throwable t) { + socketAddress = InetSocketAddress.createUnresolved(address.connectionHost(), address.port()); + } + } + } + + var bootstrap = new Bootstrap(); + bootstrap + .group(this.eventLoopGroup) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis) + .channel(channelClass) + .resolver(addressResolverGroup) + .handler(new NettyChannelInitializer(address, securityPlan, connectTimeoutMillis, clock, logging)); + + return bootstrap.connect(socketAddress); + } + private CompletionStage installChannelConnectedListeners( BoltServerAddress address, ChannelFuture channelConnected, int connectTimeoutMillis) { var pipeline = channelConnected.channel().pipeline(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java index 63ed0c1361..cefd7bf7e1 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java @@ -55,7 +55,7 @@ class DriverFactoryTest { private static Stream testUris() { - return Stream.of("bolt://localhost:7687", "neo4j://localhost:7687"); + return Stream.of("bolt://localhost:7687", "bolt+unix://localhost:7687", "neo4j://localhost:7687"); } @ParameterizedTest diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalServerAddressTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalServerAddressTest.java new file mode 100644 index 0000000000..5587b49e33 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalServerAddressTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.neo4j.driver.internal.bolt.api.BoltServerAddress.DEFAULT_PORT; + +import java.net.URI; +import java.net.URISyntaxException; +import org.junit.jupiter.api.Test; + +class InternalServerAddressTest { + @Test + void defaultPortShouldBe7687() { + assertThat(DEFAULT_PORT, equalTo(7687)); + } + + @Test + void portShouldUseDefaultIfNotSupplied() throws URISyntaxException { + assertThat( + new InternalServerAddress(new URI("neo4j://localhost")).port(), + equalTo(InternalServerAddress.DEFAULT_PORT)); + } + + @Test + void shouldHaveCorrectToString() { + assertEquals("localhost:4242", new InternalServerAddress("localhost", 4242).toString()); + assertEquals("127.0.0.1:8888", new InternalServerAddress("127.0.0.1", 8888).toString()); + } + + @Test + void shouldVerifyHost() { + assertThrows(NullPointerException.class, () -> new InternalServerAddress(null, 0)); + } + + @Test + void shouldVerifyPort() { + assertThrows(IllegalArgumentException.class, () -> new InternalServerAddress("localhost", -1)); + assertThrows(IllegalArgumentException.class, () -> new InternalServerAddress("localhost", -42)); + assertThrows(IllegalArgumentException.class, () -> new InternalServerAddress("localhost", 65_536)); + assertThrows(IllegalArgumentException.class, () -> new InternalServerAddress("localhost", 99_999)); + } + + @Test + void shouldUseUriWithHostButWithoutPort() { + var uri = URI.create("bolt://neo4j.com"); + var address = new InternalServerAddress(uri); + + assertEquals("neo4j.com", address.host()); + assertEquals(DEFAULT_PORT, address.port()); + } + + @Test + void shouldUseUriWithHostAndPort() { + var uri = URI.create("bolt://neo4j.com:12345"); + var address = new InternalServerAddress(uri); + + assertEquals("neo4j.com", address.host()); + assertEquals(12345, address.port()); + } + + @Test + void shouldIncludeHostAndPortInToString() { + var address = new InternalServerAddress("localhost", 8081); + assertThat(address.toString(), equalTo("localhost:8081")); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index 49696cb5db..f24595d591 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -52,13 +52,13 @@ import org.neo4j.driver.NotificationClassification; import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.InternalNotificationSeverity; -import org.neo4j.driver.internal.InternalServerAddress; import org.neo4j.driver.internal.SecuritySettings; import org.neo4j.driver.internal.bolt.api.DefaultDomainNameResolver; import org.neo4j.driver.internal.bolt.api.DomainNameResolver; import org.neo4j.driver.internal.security.BoltSecurityPlanManager; import org.neo4j.driver.internal.security.SecurityPlans; import org.neo4j.driver.internal.security.StaticAuthTokenManager; +import org.neo4j.driver.net.ServerAddress; import org.neo4j.driver.net.ServerAddressResolver; import reactor.core.publisher.Mono; @@ -176,7 +176,10 @@ private ServerAddressResolver callbackResolver(TestkitState testkitState) { throw new RuntimeException(e); } return resolutionCompleted.getData().getAddresses().stream() - .map(InternalServerAddress::new) + .map(value -> { + var tokens = value.split(":"); + return ServerAddress.of(tokens[0], Integer.parseInt(tokens[1])); + }) .collect(Collectors.toCollection(LinkedHashSet::new)); }; }