From eb214785ff8df677834e15e7697a6245822f551f Mon Sep 17 00:00:00 2001 From: Zhen Date: Mon, 20 Nov 2017 11:04:28 +0100 Subject: [PATCH 1/9] Delay purge connection that is current in use by sessions --- .../Neo4j.Driver.Tests/ConnectionPoolTests.cs | 198 +++++++++++++++++- .../Routing/ClusterConnectionPoolTests.cs | 10 +- .../Neo4j.Driver/Internal/ConnectionPool.cs | 129 ++++++++---- .../Neo4j.Driver/Internal/Extensions/Throw.cs | 14 +- .../Neo4j.Driver/Internal/IConnectionPool.cs | 4 +- .../Internal/Routing/ClusterConnectionPool.cs | 13 +- .../Routing/IClusterConnectionPool.cs | 4 +- .../Routing/IClusterConnectionPoolManager.cs | 4 +- .../Internal/Routing/LoadBalancer.cs | 8 +- .../Internal/Routing/RoutingTableManager.cs | 24 ++- 10 files changed, 329 insertions(+), 79 deletions(-) diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs index 3a3811238..9657edb58 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs @@ -35,11 +35,17 @@ namespace Neo4j.Driver.Tests { public class ConnectionPoolTests { - internal static ConnectionPool NewConnectionPoolWithNoConnectionTimeoutValidation( - IConnection connection, + internal static ConnectionPool NewConnectionPool( + IConnection connection = null, BlockingCollection availableConnections = null, ConcurrentSet inUseConnections = null) { + if(connection == null) + { + var connectionMock = new Mock(); + connectionMock.Setup(x => x.IsOpen).Returns(true); + connection = connectionMock.Object; + } var testConfigWithIdleTimeoutAndLifetimeCheckDisabled = new Config { MaxConnectionLifetime = Config.InfiniteInterval, @@ -48,6 +54,7 @@ internal static ConnectionPool NewConnectionPoolWithNoConnectionTimeoutValidatio return new ConnectionPool(connection, availableConnections, inUseConnections, poolSettings: new ConnectionPoolSettings(testConfigWithIdleTimeoutAndLifetimeCheckDisabled)); } + public class AcquireMethod { private readonly ITestOutputHelper _output; @@ -263,7 +270,7 @@ public void ShouldReuseOpenConnectionWhenOpenAndClosedConnectionsInQueue() conns.Add(unhealthyMock.Object); conns.Add(healthyMock.Object); - var pool = NewConnectionPoolWithNoConnectionTimeoutValidation(MockedConnection, conns); + var pool = NewConnectionPool(MockedConnection, conns); pool.NumberOfAvailableConnections.Should().Be(2); pool.NumberOfInUseConnections.Should().Be(0); @@ -373,7 +380,7 @@ public void ShouldAcquireNewWhenBeingUsedConcurrentlyBy(int numberOfThreads) mockConns.Enqueue(mock); } - var pool = NewConnectionPoolWithNoConnectionTimeoutValidation(MockedConnection, conns); + var pool = NewConnectionPool(MockedConnection, conns); pool.NumberOfAvailableConnections.Should().Be(numberOfThreads); pool.NumberOfInUseConnections.Should().Be(0); @@ -437,7 +444,7 @@ public void ShouldCloseAcquiredConnectionIfPoolDisposeStarted() // Given var conns = new BlockingCollection(); var healthyMock = new Mock(); - var pool = NewConnectionPoolWithNoConnectionTimeoutValidation(MockedConnection, conns); + var pool = NewConnectionPool(MockedConnection, conns); pool.NumberOfAvailableConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); @@ -1133,5 +1140,186 @@ public async void ShoulReportPoolSizeCorrectOnConcurrentRequestsAsync() } + public class PoolStateTests + { + // open + [Fact] + public void FromOpenViaAcquireToOpen() + { + var pool = NewConnectionPool(); + pool.Acquire(); + pool.Status.Should().Be(PoolStatus.Open); + } + + [Fact] + public void FromOpenViaReleaseToOpen() + { + var idleQueue = new BlockingCollection(); + var inUseConnections = new ConcurrentSet(); + var conn = new Mock().Object; + inUseConnections.TryAdd(conn); + var pool = NewConnectionPool(null, idleQueue, inUseConnections); + + pool.Release(conn); + + idleQueue.Count.Should().Be(1); + inUseConnections.Count.Should().Be(0); + pool.Status.Should().Be(PoolStatus.Open); + } + + [Fact] + public void FromOpenViaDisposeToClosed() + { + var pool = NewConnectionPool(); + pool.Dispose(); + pool.Status.Should().Be(PoolStatus.Closed); + } + + [Fact] + public void FromOpenViaActiveToOpen() + { + var pool = NewConnectionPool(); + pool.Active(); + pool.Status.Should().Be(PoolStatus.Open); + } + + [Fact] + public void FromOpenViaDeactiveToZombie() + { + var pool = NewConnectionPool(); + pool.Deactivate(); + pool.Status.Should().Be(PoolStatus.Zombie); + } + + // zombie + [Fact] + public void FromZombieViaAcquireThrowsError() + { + var pool = NewConnectionPool(); + pool.Status = PoolStatus.Zombie; + + var exception = Record.Exception(()=>pool.Acquire()); + + exception.Should().BeOfType(); + pool.Status.Should().Be(PoolStatus.Zombie); + } + + [Fact] + public void FromZombieViaReleaseToZombie() + { + var idleQueue = new BlockingCollection(); + var inUseConnections = new ConcurrentSet(); + + var conn = new Mock().Object; + inUseConnections.TryAdd(conn); + + var pool = NewConnectionPool(null, idleQueue, inUseConnections); + pool.Status = PoolStatus.Zombie; + + pool.Release(conn); + + inUseConnections.Count.Should().Be(0); + idleQueue.Count.Should().Be(0); + pool.Status.Should().Be(PoolStatus.Zombie); + } + + [Fact] + public void FromZombieViaDisposeToClosed() + { + var pool = NewConnectionPool(); + pool.Status = PoolStatus.Zombie; + + pool.Dispose(); + + pool.Status.Should().Be(PoolStatus.Closed); + } + + [Fact] + public void FromZombieViaActiveToOpen() + { + var pool = NewConnectionPool(); + pool.Status = PoolStatus.Zombie; + + pool.Active(); + + pool.Status.Should().Be(PoolStatus.Open); + } + + [Fact] + public void FromZombieViaDeactiveToZombie() + { + var pool = NewConnectionPool(); + pool.Status = PoolStatus.Zombie; + + pool.Deactivate(); + + pool.Status.Should().Be(PoolStatus.Zombie); + } + + //closed + [Fact] + public void FromClosedViaAcquireThrowsError() + { + var pool = NewConnectionPool(); + pool.Status = PoolStatus.Closed; + + var exception = Record.Exception(()=>pool.Acquire()); + + exception.Should().BeOfType(); + pool.Status.Should().Be(PoolStatus.Closed); + } + + [Fact] + public void FromClosedViaReleaseToClosed() + { + var idleQueue = new BlockingCollection(); + var inUseConnections = new ConcurrentSet(); + + var conn = new Mock().Object; + inUseConnections.TryAdd(conn); + + var pool = NewConnectionPool(null, idleQueue, inUseConnections); + pool.Status = PoolStatus.Closed; + + pool.Release(conn); + + inUseConnections.Count.Should().Be(1); + idleQueue.Count.Should().Be(0); + pool.Status.Should().Be(PoolStatus.Closed); + } + + [Fact] + public void FromClosedViaDisposeToClosed() + { + var pool = NewConnectionPool(); + pool.Status = PoolStatus.Closed; + + pool.Dispose(); + + pool.Status.Should().Be(PoolStatus.Closed); + } + + [Fact] + public void FromClosedViaActiveToClosed() + { + var pool = NewConnectionPool(); + pool.Status = PoolStatus.Closed; + + pool.Active(); + + pool.Status.Should().Be(PoolStatus.Closed); + } + + [Fact] + public void FromClosedViaDeactiveToClosed() + { + var pool = NewConnectionPool(); + pool.Status = PoolStatus.Closed; + + pool.Deactivate(); + + pool.Status.Should().Be(PoolStatus.Closed); + } + } } } diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs index 3d3105c3e..7f7f61592 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs @@ -139,7 +139,7 @@ public void ShouldAddNewConnectionPoolIfDoesNotExist() var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict); // When - pool.Update(new[] {ServerUri}); + pool.Update(new[] {ServerUri}, new Uri[0]); // Then connectionPoolDict.Count.Should().Be(1); @@ -148,7 +148,7 @@ public void ShouldAddNewConnectionPoolIfDoesNotExist() } [Fact] - public void ShouldRemoveNewlyCreatedPoolnIfDisposeAlreadyCalled() + public void ShouldRemoveNewlyCreatedPoolIfDisposeAlreadyCalled() { // Given var mockedConnectionPool = new Mock(); @@ -157,7 +157,7 @@ public void ShouldRemoveNewlyCreatedPoolnIfDisposeAlreadyCalled() // When pool.Dispose(); - var exception = Record.Exception(() => pool.Update(new[] {ServerUri})); + var exception = Record.Exception(() => pool.Update(new[] {ServerUri}, new Uri[0])); // Then mockedConnectionPool.Verify(x => x.Close()); @@ -176,7 +176,7 @@ public void ShouldRemoveServerPoolIfNotPresentInNewServers() var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict); // When - pool.Update(new Uri[0]); + pool.Update(new Uri[0], new[] {ServerUri}); // Then connectionPoolDict.Count.Should().Be(0); @@ -273,4 +273,4 @@ public void ShouldReturnCorrectCountForPresentAddress() } } } -} \ No newline at end of file +} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs index e67a8aa3b..8805c05c1 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs @@ -24,17 +24,27 @@ using System.Threading.Tasks; using Neo4j.Driver.Internal.Connector; using Neo4j.Driver.V1; -using static Neo4j.Driver.Internal.Throw.DriverDisposedException; +using static Neo4j.Driver.Internal.PoolStatus; +using static Neo4j.Driver.Internal.Throw.ObjectDisposedException; namespace Neo4j.Driver.Internal { + internal static class PoolStatus + { + public const int Open = 0; + public const int Closed = 1; + public const int Zombie = 2; + } + internal class ConnectionPool : LoggerBase, IConnectionPool { private const int SpinningWaitInterval = 500; private readonly Uri _uri; - private int _closedMarker = 0; + private int _poolStatus = Open; + private bool IsClosed => _poolStatus == Closed; + private bool IsZombieOrClosed => _poolStatus != Open; private int _poolSize = 0; private readonly int _maxPoolSize; @@ -45,22 +55,27 @@ internal class ConnectionPool : LoggerBase, IConnectionPool private readonly ConnectionSettings _connectionSettings; private readonly BufferSettings _bufferSettings; - private readonly BlockingCollection _availableConnections = new BlockingCollection(); + private readonly BlockingCollection _idleConnections = new BlockingCollection(); private readonly ConcurrentSet _inUseConnections = new ConcurrentSet(); - // for test only - private readonly IConnection _fakeConnection; - private IStatisticsCollector _statisticsCollector; private ConnectionPoolStatistics _statistics; public int NumberOfInUseConnections => _inUseConnections.Count; - internal int NumberOfAvailableConnections => _availableConnections.Count; + internal int NumberOfAvailableConnections => _idleConnections.Count; internal int PoolSize => _poolSize; + // for test only + private readonly IConnection _fakeConnection; internal bool DisposeCalled { - set => Interlocked.CompareExchange(ref _closedMarker, 1, 0); + set => Interlocked.Exchange(ref _poolStatus, Closed); + } + + internal int Status + { + get => _poolStatus; + set => Interlocked.Exchange(ref _poolStatus, value); } public ConnectionPool( @@ -97,13 +112,11 @@ internal ConnectionPool( bufferSettings ?? new BufferSettings(Config.DefaultConfig), logger) { _fakeConnection = connection; - _availableConnections = availableConnections ?? new BlockingCollection(); + _idleConnections = availableConnections ?? new BlockingCollection(); _inUseConnections = inUseConnections ?? new ConcurrentSet(); } - private bool IsClosed => _closedMarker > 0; - - public IPooledConnection CreateNewPooledConnection() + private IPooledConnection CreateNewPooledConnection() { PooledConnection conn = null; try @@ -245,12 +258,12 @@ private IPooledConnection Acquire(CancellationToken cancellationToken) { while (true) { - if (IsClosed) + if (IsZombieOrClosed) { ThrowObjectDisposedException(); } - if (!_availableConnections.TryTake(out connection)) + if (!_idleConnections.TryTake(out connection)) { do { @@ -263,7 +276,7 @@ private IPooledConnection Acquire(CancellationToken cancellationToken) } } - if (_availableConnections.TryTake(out connection, SpinningWaitInterval, cancellationToken)) + if (_idleConnections.TryTake(out connection, SpinningWaitInterval, cancellationToken)) { break; } @@ -289,7 +302,7 @@ private IPooledConnection Acquire(CancellationToken cancellationToken) } _inUseConnections.TryAdd(connection); - if (IsClosed) + if (IsZombieOrClosed) { if (_inUseConnections.TryRemove(connection)) { @@ -331,12 +344,12 @@ private Task AcquireAsync(CancellationToken cancellationToken) { while (true) { - if (IsClosed) + if (IsZombieOrClosed) { ThrowObjectDisposedException(); } - if (!_availableConnections.TryTake(out connection)) + if (!_idleConnections.TryTake(out connection)) { do { @@ -351,7 +364,7 @@ private Task AcquireAsync(CancellationToken cancellationToken) await Task.Delay(SpinningWaitInterval, cancellationToken).ConfigureAwait(false); - if (_availableConnections.TryTake(out connection)) + if (_idleConnections.TryTake(out connection)) { break; } @@ -377,7 +390,7 @@ private Task AcquireAsync(CancellationToken cancellationToken) } _inUseConnections.TryAdd(connection); - if (IsClosed) + if (IsZombieOrClosed) { if (_inUseConnections.TryRemove(connection)) { @@ -403,7 +416,7 @@ private bool IsConnectionPoolFull() private bool IsIdlePoolFull() { - return _idlePoolSize != Config.Infinite && _availableConnections.Count >= _idlePoolSize; + return _idlePoolSize != Config.Infinite && _idleConnections.Count >= _idlePoolSize; } public void Release(IPooledConnection connection) @@ -412,7 +425,7 @@ public void Release(IPooledConnection connection) { if (IsClosed) { - // pool already disposed. + // pool already disposed, and this connection is also already closed return; } if (!_inUseConnections.TryRemove(connection)) @@ -428,11 +441,11 @@ public void Release(IPooledConnection connection) } else { - _availableConnections.Add(connection); + _idleConnections.Add(connection); } // Just dequeue any one connection and close it will ensure that all connections in the pool will finally be closed - if (IsClosed && _availableConnections.TryTake(out connection)) + if (IsZombieOrClosed && _idleConnections.TryTake(out connection)) { DestroyConnection(connection); } @@ -468,11 +481,11 @@ public Task ReleaseAsync(IPooledConnection connection) } else { - _availableConnections.Add(connection); + _idleConnections.Add(connection); } // Just dequeue any one connection and close it will ensure that all connections in the pool will finally be closed - if (IsClosed && _availableConnections.TryTake(out connection)) + if (IsZombieOrClosed && _idleConnections.TryTake(out connection)) { await DestroyConnectionAsync(connection).ConfigureAwait(false); } @@ -502,7 +515,7 @@ protected override void Dispose(bool disposing) public void Close() { - if (Interlocked.CompareExchange(ref _closedMarker, 1, 0) == 0) + if (Interlocked.Exchange(ref _poolStatus, Closed) != Closed) { TryExecute(() => { @@ -515,12 +528,7 @@ public void Close() } } - while (_availableConnections.TryTake(out var connection)) - { - Logger?.Debug($"Disposing Available Connection {connection.Id}"); - DestroyConnection(connection); - } - + TerminateIdleConnections(); DisposeStatisticsProvider(); }); } @@ -528,7 +536,7 @@ public void Close() public Task CloseAsync() { - if (Interlocked.CompareExchange(ref _closedMarker, 1, 0) == 0) + if (Interlocked.Exchange(ref _poolStatus, Closed) != Closed) { var allCloseTasks = new List(); @@ -541,12 +549,7 @@ public Task CloseAsync() } } - while (_availableConnections.TryTake(out var connection)) - { - Logger?.Debug($"Disposing Available Connection {connection.Id}"); - allCloseTasks.Add(DestroyConnectionAsync(connection)); - } - + allCloseTasks.AddRange(TerminateIdleConnectionsAsync()); DisposeStatisticsProvider(); return Task.WhenAll(allCloseTasks); @@ -555,9 +558,51 @@ public Task CloseAsync() return TaskExtensions.GetCompletedTask(); } + public void Deactivate() + { + if (Interlocked.CompareExchange(ref _poolStatus, Zombie, Open) == Open) + { + TerminateIdleConnections(); + } + } + + public Task DeactivateAsync() + { + if (Interlocked.CompareExchange(ref _poolStatus, Zombie, Open) == Open) + { + return Task.WhenAll(TerminateIdleConnectionsAsync()); + } + return TaskExtensions.GetCompletedTask(); + } + + public void Active() + { + Interlocked.CompareExchange(ref _poolStatus, Open, Zombie); + } + + private void TerminateIdleConnections() + { + while (_idleConnections.TryTake(out var connection)) + { + Logger?.Debug($"Disposing Available Connection {connection.Id}"); + DestroyConnection(connection); + } + } + + private IEnumerable TerminateIdleConnectionsAsync() + { + var allCloseTasks = new List(); + while (_idleConnections.TryTake(out var connection)) + { + Logger?.Debug($"Disposing Available Connection {connection.Id}"); + allCloseTasks.Add(DestroyConnectionAsync(connection)); + } + return allCloseTasks; + } + private void ThrowObjectDisposedException() { - FailedToCreateConnection(this); + FailedToAcquireConnectionDueToPoolClosed(this); } private void SetupStatisticsProvider(IStatisticsCollector collector) @@ -581,7 +626,7 @@ private void DisposeStatisticsProvider() public override string ToString() { - return $"{nameof(_availableConnections)}: {{{_availableConnections.ValueToString()}}}, " + + return $"{nameof(_idleConnections)}: {{{_idleConnections.ValueToString()}}}, " + $"{nameof(_inUseConnections)}: {{{_inUseConnections}}}"; } } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Extensions/Throw.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Extensions/Throw.cs index 59f97f0a1..45c5fdc09 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Extensions/Throw.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Extensions/Throw.cs @@ -20,11 +20,17 @@ namespace Neo4j.Driver.Internal { internal static class Throw { - public static class DriverDisposedException + public static class ObjectDisposedException { - public static void FailedToCreateConnection(object obj) + public static void FailedToAcquireConnectionDueToPoolClosed(object obj) { - throw new ObjectDisposedException(obj.GetType().Name, "Failed to acquire a new connection as the driver has already been disposed."); + throw new System.ObjectDisposedException(obj.GetType().Name, $"Failed to acquire a new connection as the driver has already been disposed."); + } + + + public static void FailedToAcquireConnection(object obj) + { + throw new System.ObjectDisposedException(obj.GetType().Name, $"Failed to acquire a new connection as the driver has already been disposed."); } } @@ -92,4 +98,4 @@ public static void IfFalse(bool value, string nameofValue) } } } -} \ No newline at end of file +} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs index 8d0fa4ddc..654f7701c 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs @@ -20,5 +20,7 @@ namespace Neo4j.Driver.Internal internal interface IConnectionPool : IConnectionProvider, IConnectionReleaseManager { int NumberOfInUseConnections { get; } + void Deactivate(); + void Active(); } -} \ No newline at end of file +} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs index 07f393bf3..44f8f066e 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs @@ -120,16 +120,16 @@ public void Add(IEnumerable servers) } } - public void Update(IEnumerable servers) + public void Update(IEnumerable added, IEnumerable removed) { foreach (var uri in _pools.Keys) { - if (!servers.Contains(uri)) + if (!added.Contains(uri)) { Purge(uri); } } - foreach (var uri in servers) + foreach (var uri in added) { Add(uri); } @@ -144,7 +144,7 @@ public void Purge(Uri uri) } } - public Task PurgeAsync(Uri uri) + private Task PurgeAsync(Uri uri) { var removed = _pools.TryRemove(uri, out var toRemove); if (removed) @@ -157,8 +157,7 @@ public Task PurgeAsync(Uri uri) public int NumberOfInUseConnections(Uri uri) { - IConnectionPool pool; - if (_pools.TryGetValue(uri, out pool)) + if (_pools.TryGetValue(uri, out var pool)) { return pool.NumberOfInUseConnections; } @@ -223,4 +222,4 @@ public override string ToString() return _pools.ValueToString(); } } -} \ No newline at end of file +} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs index eb31a8e27..02b3b6605 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs @@ -29,7 +29,7 @@ internal interface IClusterConnectionPool : IDisposable // Add a set of uri to this pool void Add(IEnumerable uris); // Update the pool keys with the new server uris - void Update(IEnumerable uris); + void Update(IEnumerable added, IEnumerable removed); // Remove all the connection pool with the server specified by the uri void Purge(Uri uri); // Get number of in-use connections for the uri @@ -39,4 +39,4 @@ internal interface IClusterConnectionPool : IDisposable Task CloseAsync(); } -} \ No newline at end of file +} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs index 37dfa0b5b..63423a75b 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs @@ -24,8 +24,8 @@ namespace Neo4j.Driver.Internal.Routing internal interface IClusterConnectionPoolManager { void AddConnectionPool(IEnumerable uris); - void UpdateConnectionPool(IEnumerable uris); + void UpdateConnectionPool(IEnumerable added, IEnumerable removed); IConnection CreateClusterConnection(Uri uri); Task CreateClusterConnectionAsync(Uri uri); } -} \ No newline at end of file +} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs index 6b5647878..d2cb9218c 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs @@ -21,7 +21,7 @@ using System.Threading.Tasks; using Neo4j.Driver.Internal.Connector; using Neo4j.Driver.V1; -using static Neo4j.Driver.Internal.Throw.DriverDisposedException; +using static Neo4j.Driver.Internal.Throw.ObjectDisposedException; namespace Neo4j.Driver.Internal.Routing { @@ -114,9 +114,9 @@ public void AddConnectionPool(IEnumerable uris) _clusterConnectionPool.Add(uris); } - public void UpdateConnectionPool(IEnumerable uris) + public void UpdateConnectionPool(IEnumerable added, IEnumerable removed) { - _clusterConnectionPool.Update(uris); + _clusterConnectionPool.Update(added, removed); } public IConnection CreateClusterConnection(Uri uri) @@ -276,7 +276,7 @@ private async Task CreateClusterConnectionAsync(Uri uri, AccessMode private void ThrowObjectDisposedException() { - FailedToCreateConnection(this); + FailedToAcquireConnection(this); } public override string ToString() diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs index 7f1eea6db..10bfe717b 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs @@ -93,9 +93,8 @@ public void EnsureRoutingTableForMode(AccessMode mode) } var routingTable = UpdateRoutingTableWithInitialUriFallback(new HashSet { _seedUri }); - _poolManager.UpdateConnectionPool(routingTable.All()); - _routingTable = routingTable; - _logger?.Info($"Updated routingTable to be {_routingTable}"); + Update(routingTable); + } } @@ -118,9 +117,7 @@ public async Task EnsureRoutingTableForModeAsync(AccessMode mode) } var routingTable = await UpdateRoutingTableWithInitialUriFallbackAsync(new HashSet { _seedUri }).ConfigureAwait(false); - _poolManager.UpdateConnectionPool(routingTable.All()); - _routingTable = routingTable; - _logger?.Info($"Updated routingTable to be {_routingTable}"); + Update(routingTable); } finally { @@ -129,6 +126,19 @@ public async Task EnsureRoutingTableForModeAsync(AccessMode mode) } } + private void Update(IRoutingTable newTable) + { + var added = newTable.All(); + added.ExceptWith(_routingTable.All()); + var removed = _routingTable.All(); + removed.ExceptWith(newTable.All()); + + _poolManager.UpdateConnectionPool(added, removed); + _routingTable = newTable; + + _logger?.Info($"Updated routingTable to be {_routingTable}"); + } + private bool IsRoutingTableStale(IRoutingTable routingTable, AccessMode mode = AccessMode.Read) { switch (mode) @@ -338,4 +348,4 @@ public void Clear() _routingTable.Clear(); } } -} \ No newline at end of file +} From b48f3687a406112637957434b6000f06930f8ef7 Mon Sep 17 00:00:00 2001 From: Zhen Date: Mon, 20 Nov 2017 14:50:29 +0100 Subject: [PATCH 2/9] Added connection pool state tests and concurrent tests Simplified test logic to not worry about connection validation if not needed Renamed `AvailableConnections` to `IdleConnections` --- .../Neo4j.Driver.Tests/ConnectionPoolTests.cs | 570 +++++++++++++----- .../Neo4j.Driver/Internal/ConnectionPool.cs | 50 +- .../Internal/ConnectionPoolStatistics.cs | 2 +- .../Internal/ConnectionValidator.cs | 9 +- .../Neo4j.Driver/Internal/IConnectionPool.cs | 2 +- 5 files changed, 453 insertions(+), 180 deletions(-) diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs index 9657edb58..1818d3759 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs @@ -35,50 +35,77 @@ namespace Neo4j.Driver.Tests { public class ConnectionPoolTests { - internal static ConnectionPool NewConnectionPool( - IConnection connection = null, - BlockingCollection availableConnections = null, - ConcurrentSet inUseConnections = null) + internal class TestConnectionValidator : IConnectionValidator { - if(connection == null) + private readonly bool _isValid; + public TestConnectionValidator(bool isValid = true) { - var connectionMock = new Mock(); - connectionMock.Setup(x => x.IsOpen).Returns(true); - connection = connectionMock.Object; + _isValid = isValid; } - var testConfigWithIdleTimeoutAndLifetimeCheckDisabled = new Config + + public bool IsConnectionReusable(IPooledConnection connection) + { + return _isValid; + } + + public Task IsConnectionReusableAsync(IPooledConnection connection) { - MaxConnectionLifetime = Config.InfiniteInterval, - ConnectionIdleTimeout = Config.InfiniteInterval - }; - return new ConnectionPool(connection, availableConnections, inUseConnections, - poolSettings: new ConnectionPoolSettings(testConfigWithIdleTimeoutAndLifetimeCheckDisabled)); + return Task.FromResult(_isValid); + } + + public bool IsValid(IPooledConnection connection) + { + return _isValid; + } } - public class AcquireMethod + private static ConnectionPool NewConnectionPool( + BlockingCollection idleConnections = null, + ConcurrentSet inUseConnections = null, + ConnectionPoolSettings poolSettings = null, + bool isConnectionValid = true) { - private readonly ITestOutputHelper _output; - private IConnection MockedConnection + var connection = new Mock().Object; + return new ConnectionPool(connection, idleConnections, inUseConnections, + poolSettings: poolSettings, validator: new TestConnectionValidator(isConnectionValid)); + + } + + private static ConnectionPool NewConnectionPoolWithConnectionTimeoutCheckDisabled( + IConnection connection, + BlockingCollection idleConnections = null, + ConcurrentSet inUseConnections = null) + { + return new ConnectionPool(connection, idleConnections, inUseConnections, + validator: new ConnectionValidator(Config.InfiniteInterval, Config.InfiniteInterval)); + } + + private static IConnection ReusableConnection + { + get { - get - { - var mock = new Mock(); - mock.Setup(x => x.IsOpen).Returns(true); - mock.Setup(x => x.IdleTimer).Returns(MockedTimer); - mock.Setup(x => x.LifetimeTimer).Returns(MockedTimer); - return mock.Object; - } + var mock = new Mock(); + mock.Setup(x => x.IsOpen).Returns(true); + mock.Setup(x => x.IdleTimer).Returns(MockedTimer); + mock.Setup(x => x.LifetimeTimer).Returns(MockedTimer); + return mock.Object; } + } - private ITimer MockedTimer + private static ITimer MockedTimer + { + get { - get { - var mock = new Mock(); - mock.Setup(t => t.ElapsedMilliseconds).Returns(0); - return mock.Object; - } + var mock = new Mock(); + mock.Setup(t => t.ElapsedMilliseconds).Returns(0); + return mock.Object; } + } + + public class AcquireMethod + { + private readonly ITestOutputHelper _output; public AcquireMethod(ITestOutputHelper output) { @@ -90,8 +117,7 @@ public void ShouldCallConnInit() { // Given var mock = new Mock(); - mock.Setup(x => x.IsOpen).Returns(true); - var connectionPool = new ConnectionPool(mock.Object); + var connectionPool = new ConnectionPool(mock.Object, validator: new TestConnectionValidator()); // When connectionPool.Acquire(); @@ -103,10 +129,10 @@ public void ShouldCallConnInit() public void ShouldBlockWhenMaxPoolSizeReached() { var connectionPoolSettings = new ConnectionPoolSettings(new Config {MaxConnectionPoolSize = 2}); - var pool = new ConnectionPool(MockedConnection, poolSettings: connectionPoolSettings); + var pool = NewConnectionPool(poolSettings: connectionPoolSettings); var conn = pool.Acquire(); pool.Acquire(); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(2); var timer = new Stopwatch(); @@ -122,7 +148,7 @@ public void ShouldBlockWhenMaxPoolSizeReached() blockingAcquire.Wait(); timer.Stop(); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(2); timer.ElapsedMilliseconds.Should().BeGreaterOrEqualTo(1000); } @@ -136,10 +162,10 @@ public void ShouldThrowClientExceptionWhenFailedToAcquireWithinTimeout() MaxConnectionPoolSize = 2, ConnectionAcquisitionTimeout = TimeSpan.FromMilliseconds(0) }); - var pool = new ConnectionPool(MockedConnection, poolSettings: connectionPoolSettings); - var conn = pool.Acquire(); + var pool = NewConnectionPool(poolSettings: connectionPoolSettings); + pool.Acquire(); pool.Acquire(); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(2); var exception = Record.Exception(() => pool.Acquire()); @@ -151,48 +177,48 @@ public void ShouldThrowClientExceptionWhenFailedToAcquireWithinTimeout() public void ShouldNotExceedIdleLimit() { var connectionPoolSettings = new ConnectionPoolSettings(new Config {MaxIdleConnectionPoolSize = 2}); - var pool = new ConnectionPool(MockedConnection, poolSettings: connectionPoolSettings); + var pool = NewConnectionPool(poolSettings: connectionPoolSettings); var conns = new List(); for (var i = 0; i < 4; i++) { conns.Add(pool.Acquire()); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); } foreach (var conn in conns) { conn.Close(); - pool.NumberOfAvailableConnections.Should().BeLessOrEqualTo(2); + pool.NumberOfIdleConnections.Should().BeLessOrEqualTo(2); } - pool.NumberOfAvailableConnections.Should().Be(2); + pool.NumberOfIdleConnections.Should().Be(2); } [Fact] public void ShouldAcquireFromPoolIfAvailable() { var connectionPoolSettings = new ConnectionPoolSettings(new Config {MaxIdleConnectionPoolSize = 2}); - var pool = new ConnectionPool(MockedConnection, poolSettings: connectionPoolSettings); + var pool = NewConnectionPool(poolSettings: connectionPoolSettings); for (var i = 0; i < 4; i++) { var conn = pool.Acquire(); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); conn.Close(); - pool.NumberOfAvailableConnections.Should().Be(1); + pool.NumberOfIdleConnections.Should().Be(1); } - pool.NumberOfAvailableConnections.Should().Be(1); + pool.NumberOfIdleConnections.Should().Be(1); } [Fact] public void ShouldCreateNewWhenQueueIsEmpty() { - var pool = new ConnectionPool(MockedConnection); + var pool = NewConnectionPool(); pool.Acquire(); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); } @@ -206,7 +232,7 @@ public void ShouldCloseConnectionIfFailedToCreate() Record.Exception(() => pool.Acquire()); mockedConnection.Verify(x => x.Destroy(), Times.Once); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); } @@ -214,26 +240,24 @@ public void ShouldCloseConnectionIfFailedToCreate() public void ShouldCreateNewWhenQueueOnlyContainsClosedConnections() { var conns = new BlockingCollection(); - var closedId = Guid.NewGuid(); var closedMock = new Mock(); closedMock.Setup(x => x.IsOpen).Returns(false); - closedMock.Setup(x => x.Id).Returns(closedId); conns.Add(closedMock.Object); - var pool = new ConnectionPool(MockedConnection, conns); + var pool = new ConnectionPool(ReusableConnection, conns); - pool.NumberOfAvailableConnections.Should().Be(1); + pool.NumberOfIdleConnections.Should().Be(1); pool.NumberOfInUseConnections.Should().Be(0); var conn = pool.Acquire(); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); closedMock.Verify(x => x.IsOpen, Times.Once); closedMock.Verify(x => x.Destroy(), Times.Once); conn.Should().NotBeNull(); - conn.Id.Should().NotBe(closedId); + conn.Should().NotBe(closedMock.Object); } [Fact] @@ -246,14 +270,14 @@ public void ShouldReuseWhenOpenConnectionInQueue() mock.Setup(x => x.LifetimeTimer).Returns(MockedTimer); conns.Add(mock.Object); - var pool = new ConnectionPool(MockedConnection, conns); + var pool = new ConnectionPool(ReusableConnection, conns); - pool.NumberOfAvailableConnections.Should().Be(1); + pool.NumberOfIdleConnections.Should().Be(1); pool.NumberOfInUseConnections.Should().Be(0); var conn = pool.Acquire(); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); mock.Verify(x => x.IsOpen, Times.Once); conn.Should().Be(mock.Object); @@ -265,19 +289,20 @@ public void ShouldReuseOpenConnectionWhenOpenAndClosedConnectionsInQueue() var conns = new BlockingCollection(); var healthyMock = new Mock(); healthyMock.Setup(x => x.IsOpen).Returns(true); + healthyMock.Setup(x => x.LifetimeTimer).Returns(MockedTimer); var unhealthyMock = new Mock(); unhealthyMock.Setup(x => x.IsOpen).Returns(false); conns.Add(unhealthyMock.Object); conns.Add(healthyMock.Object); - var pool = NewConnectionPool(MockedConnection, conns); + var pool = new ConnectionPool(ReusableConnection, conns); - pool.NumberOfAvailableConnections.Should().Be(2); + pool.NumberOfIdleConnections.Should().Be(2); pool.NumberOfInUseConnections.Should().Be(0); var conn = pool.Acquire(); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); unhealthyMock.Verify(x => x.Destroy(), Times.Once); healthyMock.Verify(x => x.Destroy(), Times.Never); @@ -301,16 +326,16 @@ public void ShouldCloseIdleTooLongConn() var enableIdleTooLongTest = TimeSpan.FromMilliseconds(100); var poolSettings = new ConnectionPoolSettings( new Config {MaxIdleConnectionPoolSize = 2, ConnectionIdleTimeout = enableIdleTooLongTest}); - var pool = new ConnectionPool(MockedConnection, conns, poolSettings: poolSettings); + var pool = new ConnectionPool(ReusableConnection, conns, poolSettings: poolSettings); - pool.NumberOfAvailableConnections.Should().Be(1); + pool.NumberOfIdleConnections.Should().Be(1); pool.NumberOfInUseConnections.Should().Be(0); // When var conn = pool.Acquire(); // Then - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); mock.Verify(x => x.Destroy(), Times.Once); @@ -339,16 +364,16 @@ public void ShouldReuseIdleNotTooLongConn() ConnectionIdleTimeout = enableIdleTooLongTest, MaxConnectionLifetime = Config.InfiniteInterval, // disable life time check }); - var pool = new ConnectionPool(MockedConnection, conns, poolSettings: poolSettings); + var pool = new ConnectionPool(ReusableConnection, conns, poolSettings: poolSettings); - pool.NumberOfAvailableConnections.Should().Be(1); + pool.NumberOfIdleConnections.Should().Be(1); pool.NumberOfInUseConnections.Should().Be(0); // When var conn = pool.Acquire(); // Then - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); conn.Should().Be(mock.Object); @@ -380,9 +405,9 @@ public void ShouldAcquireNewWhenBeingUsedConcurrentlyBy(int numberOfThreads) mockConns.Enqueue(mock); } - var pool = NewConnectionPool(MockedConnection, conns); + var pool = NewConnectionPoolWithConnectionTimeoutCheckDisabled(ReusableConnection, conns); - pool.NumberOfAvailableConnections.Should().Be(numberOfThreads); + pool.NumberOfIdleConnections.Should().Be(numberOfThreads); pool.NumberOfInUseConnections.Should().Be(0); var receivedIds = new List(); @@ -399,7 +424,7 @@ public void ShouldAcquireNewWhenBeingUsedConcurrentlyBy(int numberOfThreads) Task.Delay(500); var conn = pool.Acquire(); lock (receivedIds) - receivedIds.Add(((IPooledConnection) conn).Id); + receivedIds.Add(conn.Id); } catch (Exception ex) { @@ -428,7 +453,7 @@ public void ShouldAcquireNewWhenBeingUsedConcurrentlyBy(int numberOfThreads) [Fact] public void ShouldThrowExceptionWhenAcquireCalledAfterDispose() { - var pool = new ConnectionPool(MockedConnection); + var pool = NewConnectionPool(); pool.Dispose(); var exception = Record.Exception(() => pool.Acquire()); @@ -442,24 +467,24 @@ public void ShouldThrowExceptionWhenAcquireCalledAfterDispose() public void ShouldCloseAcquiredConnectionIfPoolDisposeStarted() { // Given - var conns = new BlockingCollection(); + var idleConnections = new BlockingCollection(); var healthyMock = new Mock(); - var pool = NewConnectionPool(MockedConnection, conns); + var pool = NewConnectionPoolWithConnectionTimeoutCheckDisabled(ReusableConnection, idleConnections); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); // This is to simulate Acquire called first, // but before Acquire put a new conn into inUseConn, Dispose get called. // Note: Once dispose get called, it is forbiden to put anything into queue. healthyMock.Setup(x => x.IsOpen).Returns(true) - .Callback(() => pool.DisposeCalled = true); // Simulte Dispose get called at this time - conns.Add(healthyMock.Object); - pool.NumberOfAvailableConnections.Should().Be(1); + .Callback(() => pool.Close()); // Simulte Dispose get called at this time + idleConnections.Add(healthyMock.Object); + pool.NumberOfIdleConnections.Should().Be(1); // When var exception = Record.Exception(() => pool.Acquire()); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); healthyMock.Verify(x => x.IsOpen, Times.Once); healthyMock.Verify(x => x.Destroy(), Times.Once); @@ -473,14 +498,15 @@ public void ShouldTimeoutAfterAcquireTimeoutIfPoolIsFull() Config config = Config.Builder.WithConnectionAcquisitionTimeout(TimeSpan.FromSeconds(10)) .WithMaxConnectionPoolSize(5).WithMaxIdleConnectionPoolSize(0).ToConfig(); - var pool = new ConnectionPool(MockedConnection, null, null, null, new ConnectionPoolSettings(config), null); + var pool = NewConnectionPool(poolSettings: new ConnectionPoolSettings(config)); for (var i = 0; i < config.MaxConnectionPoolSize; i++) { pool.Acquire(); } - var stopWatch = new Stopwatch(); stopWatch.Start(); + var stopWatch = new Stopwatch(); + stopWatch.Start(); var exception = Record.Exception(() => pool.Acquire()); @@ -493,13 +519,13 @@ public void ShouldTimeoutAfterAcquireTimeoutIfPoolIsFull() public void ShouldTimeoutAfterAcquireTimeoutWhenConnectionIsNotValidated() { Config config = Config.Builder.WithConnectionAcquisitionTimeout(TimeSpan.FromSeconds(5)) - .WithConnectionTimeout(Config.InfiniteInterval) .ToConfig(); - var notValidConnection = new Mock(); - notValidConnection.Setup(x => x.IsOpen).Returns(false); + var closedConnectionMock = new Mock(); + closedConnectionMock.Setup(x => x.IsOpen).Returns(false); - var pool = new ConnectionPool(notValidConnection.Object, null, null, null, new ConnectionPoolSettings(config), null); + var pool = NewConnectionPool(poolSettings: new ConnectionPoolSettings(config), + isConnectionValid: false); var exception = Record.Exception(() => pool.Acquire()); @@ -513,7 +539,7 @@ public async void ShouldTimeoutAfterAcquireAsyncTimeoutIfPoolIsFull() Config config = Config.Builder.WithConnectionAcquisitionTimeout(TimeSpan.FromSeconds(10)) .WithMaxConnectionPoolSize(5).WithMaxIdleConnectionPoolSize(0).ToConfig(); - var pool = new ConnectionPool(MockedConnection, null, null, null, new ConnectionPoolSettings(config), null); + var pool = NewConnectionPool(poolSettings: new ConnectionPoolSettings(config)); for (var i = 0; i < config.MaxConnectionPoolSize; i++) { @@ -535,10 +561,8 @@ public async void ShouldTimeoutAfterAcquireAsyncTimeoutWhenConnectionIsNotValida Config config = Config.Builder.WithConnectionAcquisitionTimeout(TimeSpan.FromSeconds(5)) .ToConfig(); - var notValidConnection = new Mock(); - notValidConnection.Setup(x => x.IsOpen).Returns(false); - var pool = new ConnectionPool(notValidConnection.Object, null, null, null, new ConnectionPoolSettings(config), null); + var pool = NewConnectionPool(poolSettings: new ConnectionPoolSettings(config), isConnectionValid: false); var exception = await Record.ExceptionAsync(() => pool.AcquireAsync(AccessMode.Read)); @@ -553,19 +577,18 @@ public class ReleaseMethod [Fact] public void ShouldReturnToPoolWhenConnectionIsReusableAndPoolIsNotFull() { - var mock = new Mock(); - mock.Setup(x => x.IsOpen).Returns(true); + var conn = new Mock().Object; var inUseConns = new ConcurrentSet(); - inUseConns.TryAdd(mock.Object); - var pool = new ConnectionPool(null, null, inUseConns); + inUseConns.TryAdd(conn); + var pool = NewConnectionPool(inUseConnections: inUseConns); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); - pool.Release(mock.Object); + pool.Release(conn); - pool.NumberOfAvailableConnections.Should().Be(1); + pool.NumberOfIdleConnections.Should().Be(1); pool.NumberOfInUseConnections.Should().Be(0); } @@ -579,12 +602,12 @@ public void ShouldCloseConnectionWhenConnectionIsClosed() inUseConns.TryAdd(mock.Object); var pool = new ConnectionPool(null, null, inUseConns); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); pool.Release(mock.Object); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); mock.Verify(x => x.Destroy(), Times.Once); } @@ -600,12 +623,31 @@ public void ShouldCloseConnectionWhenConnectionIsOpenButNotResetable() inUseConns.TryAdd(mock.Object); var pool = new ConnectionPool(null, null, inUseConns); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); pool.Release(mock.Object); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); + pool.NumberOfInUseConnections.Should().Be(0); + mock.Verify(x => x.Destroy(), Times.Once); + } + + [Fact] + public void ShouldCloseConnectionWhenConnectionIsNotValid() + { + var mock = new Mock(); + + var inUseConns = new ConcurrentSet(); + inUseConns.TryAdd(mock.Object); + var pool = NewConnectionPool(inUseConnections: inUseConns, isConnectionValid: false); + + pool.NumberOfIdleConnections.Should().Be(0); + pool.NumberOfInUseConnections.Should().Be(1); + + pool.Release(mock.Object); + + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); mock.Verify(x => x.Destroy(), Times.Once); } @@ -614,7 +656,6 @@ public void ShouldCloseConnectionWhenConnectionIsOpenButNotResetable() public void ShouldCloseTheConnectionWhenConnectionIsReusableButThePoolIsFull() { var mock = new Mock(); - mock.Setup(x => x.IsOpen).Returns(true); var inUseConns = new ConcurrentSet(); inUseConns.TryAdd(mock.Object); @@ -629,14 +670,14 @@ public void ShouldCloseTheConnectionWhenConnectionIsReusableButThePoolIsFull() availableConns.Add(pooledConnMock.Object); } - var pool = new ConnectionPool(null, availableConns, inUseConns, poolSettings: poolSettings); + var pool = NewConnectionPool(availableConns, inUseConns, poolSettings); - pool.NumberOfAvailableConnections.Should().Be(10); + pool.NumberOfIdleConnections.Should().Be(10); pool.NumberOfInUseConnections.Should().Be(1); pool.Release(mock.Object); - pool.NumberOfAvailableConnections.Should().Be(10); + pool.NumberOfIdleConnections.Should().Be(10); pool.NumberOfInUseConnections.Should().Be(0); mock.Verify(x => x.Destroy(), Times.Once); } @@ -658,14 +699,14 @@ public void ShouldStartTimerBeforeReturnToPoolWhenIdleDetectionEnabled() ; var pool = new ConnectionPool(null, null, inUseConns, poolSettings: poolSettings); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); //When pool.Release(mock.Object); // Then - pool.NumberOfAvailableConnections.Should().Be(1); + pool.NumberOfIdleConnections.Should().Be(1); pool.NumberOfInUseConnections.Should().Be(0); timerMock.Verify(x => x.Start(), Times.Once); @@ -685,14 +726,14 @@ public void ShouldNotStartTimerBeforeReturnToPoolWhenIdleDetectionDisabled() // default pool setting have timer disabled var pool = new ConnectionPool(null, null, inUseConns); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); //When pool.Release(mock.Object); // Then - pool.NumberOfAvailableConnections.Should().Be(1); + pool.NumberOfIdleConnections.Should().Be(1); pool.NumberOfInUseConnections.Should().Be(0); timerMock.Verify(x => x.Start(), Times.Never); @@ -707,7 +748,7 @@ public void ShouldCloseConnectionIfPoolDisposeStarted() var inUseConns = new ConcurrentSet(); var pool = new ConnectionPool(null, null, inUseConns); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); var mock = new Mock(); @@ -719,11 +760,11 @@ public void ShouldCloseConnectionIfPoolDisposeStarted() // but before Release put a new conn into availConns, Dispose get called. // Note: Once dispose get called, it is forbiden to put anything into queue. mock.Setup(x => x.IsOpen).Returns(true) - .Callback(() => pool.DisposeCalled = true); // Simulte Dispose get called at this time + .Callback(() => pool.Close()); // Simulte Dispose get called at this time pool.Release(mock.Object); // Then - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); mock.Verify(x => x.Destroy(), Times.Once); } @@ -734,43 +775,41 @@ public class DisposeMethod [Fact] public void ShouldReleaseAll() { - var mock = new Mock(); - mock.Setup(x => x.IsOpen).Returns(true); var inUseConns = new ConcurrentSet(); + var mock = new Mock(); inUseConns.TryAdd(mock.Object); - var availableConns = new BlockingCollection(); + var idleConns = new BlockingCollection(); var mock1 = new Mock(); - mock1.Setup(x => x.IsOpen).Returns(true); - - availableConns.Add(mock1.Object); + idleConns.Add(mock1.Object); - var pool = new ConnectionPool(null, availableConns, inUseConns); - pool.NumberOfAvailableConnections.Should().Be(1); + var pool = NewConnectionPool(idleConns, inUseConns); + pool.NumberOfIdleConnections.Should().Be(1); pool.NumberOfInUseConnections.Should().Be(1); pool.Dispose(); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); + mock.Verify(x=>x.Destroy(), Times.Once); + mock1.Verify(x=>x.Destroy(), Times.Once); } [Fact] public void ShouldLogInUseAndAvailableConnectionIds() { var mockLogger = new Mock(); - var mock = new Mock(); - mock.Setup(x => x.IsOpen).Returns(true); + var inUseConns = new ConcurrentSet(); + var mock = new Mock(); inUseConns.TryAdd(mock.Object); var availableConns = new BlockingCollection(); var mock1 = new Mock(); - mock1.Setup(x => x.IsOpen).Returns(true); - availableConns.Add(mock1.Object); - var pool = new ConnectionPool(null, availableConns, inUseConns, mockLogger.Object); + var pool = new ConnectionPool(null, availableConns, inUseConns, mockLogger.Object, + validator: new TestConnectionValidator()); pool.Dispose(); @@ -787,7 +826,7 @@ public void ShouldReturnDirectlyWhenConnectionReleaseCalledAfterPoolDispose() var mock = new Mock(); var inUseConns = new ConcurrentSet(); inUseConns.TryAdd(mock.Object); - var pool = new ConnectionPool(null, null, inUseConns); + var pool = NewConnectionPool(inUseConnections: inUseConns); // When pool.Dispose(); @@ -804,7 +843,7 @@ public void ShouldNotThrowExceptionWhenDisposedTwice() var mock = new Mock(); var inUseConns = new ConcurrentSet(); inUseConns.TryAdd(mock.Object); - var pool = new ConnectionPool(null, null, inUseConns); + var pool = NewConnectionPool(inUseConnections: inUseConns); // When pool.Dispose(); @@ -812,7 +851,7 @@ public void ShouldNotThrowExceptionWhenDisposedTwice() // Then mock.Verify(x => x.Destroy(), Times.Once); - pool.NumberOfAvailableConnections.Should().Be(0); + pool.NumberOfIdleConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); } } @@ -838,7 +877,7 @@ public void ShouldReturnCorrectCountWhenOnlyInUseConnectionsPresent() { var connectionMock = new Mock(); // pool has no idle connections - var availableConnections = new BlockingCollection(); + var idleConnections = new BlockingCollection(); // pool has 3 in-use connections var inUseConnections = new ConcurrentSet(); @@ -848,7 +887,7 @@ public void ShouldReturnCorrectCountWhenOnlyInUseConnectionsPresent() var logger = new Mock().Object; - var pool = new ConnectionPool(connectionMock.Object, availableConnections, inUseConnections, logger); + var pool = new ConnectionPool(connectionMock.Object, idleConnections, inUseConnections, logger); pool.NumberOfInUseConnections.Should().Be(3); } @@ -859,15 +898,15 @@ public void ShouldReturnZeroWhenOnlyIdleConnectionsPresent() var connectionMock = new Mock(); // pool has 2 idle connections - var availableConnections = new BlockingCollection(); - availableConnections.TryAdd(new Mock().Object); - availableConnections.TryAdd(new Mock().Object); + var idleConnections = new BlockingCollection(); + idleConnections.TryAdd(new Mock().Object); + idleConnections.TryAdd(new Mock().Object); // pool has no in-use connections var inUseConnections = new ConcurrentSet(); var logger = new Mock().Object; - var pool = new ConnectionPool(connectionMock.Object, availableConnections, inUseConnections, logger); + var pool = new ConnectionPool(connectionMock.Object, idleConnections, inUseConnections, logger); pool.NumberOfInUseConnections.Should().Be(0); } @@ -878,10 +917,10 @@ public void ShouldReturnCorrectCountWhenBothIdleAndInUseConnectionsPresent() var connectionMock = new Mock(); // pool has 3 idle connections - var availableConnections = new BlockingCollection(); - availableConnections.TryAdd(new Mock().Object); - availableConnections.TryAdd(new Mock().Object); - availableConnections.TryAdd(new Mock().Object); + var idleConnections = new BlockingCollection(); + idleConnections.TryAdd(new Mock().Object); + idleConnections.TryAdd(new Mock().Object); + idleConnections.TryAdd(new Mock().Object); // pool has 2 in-use connections var inUseConnections = new ConcurrentSet(); @@ -890,7 +929,7 @@ public void ShouldReturnCorrectCountWhenBothIdleAndInUseConnectionsPresent() var logger = new Mock().Object; - var pool = new ConnectionPool(connectionMock.Object, availableConnections, inUseConnections, logger); + var pool = new ConnectionPool(connectionMock.Object, idleConnections, inUseConnections, logger); pool.NumberOfInUseConnections.Should().Be(2); } @@ -1140,7 +1179,7 @@ public async void ShoulReportPoolSizeCorrectOnConcurrentRequestsAsync() } - public class PoolStateTests + public class PoolState { // open [Fact] @@ -1158,7 +1197,7 @@ public void FromOpenViaReleaseToOpen() var inUseConnections = new ConcurrentSet(); var conn = new Mock().Object; inUseConnections.TryAdd(conn); - var pool = NewConnectionPool(null, idleQueue, inUseConnections); + var pool = NewConnectionPool(idleQueue, inUseConnections); pool.Release(conn); @@ -1176,15 +1215,15 @@ public void FromOpenViaDisposeToClosed() } [Fact] - public void FromOpenViaActiveToOpen() + public void FromOpenViaActivateToOpen() { var pool = NewConnectionPool(); - pool.Active(); + pool.Activate(); pool.Status.Should().Be(PoolStatus.Open); } [Fact] - public void FromOpenViaDeactiveToZombie() + public void FromOpenViaDeactiateToZombie() { var pool = NewConnectionPool(); pool.Deactivate(); @@ -1213,7 +1252,7 @@ public void FromZombieViaReleaseToZombie() var conn = new Mock().Object; inUseConnections.TryAdd(conn); - var pool = NewConnectionPool(null, idleQueue, inUseConnections); + var pool = NewConnectionPool(idleQueue, inUseConnections); pool.Status = PoolStatus.Zombie; pool.Release(conn); @@ -1235,18 +1274,18 @@ public void FromZombieViaDisposeToClosed() } [Fact] - public void FromZombieViaActiveToOpen() + public void FromZombieViaActivateToOpen() { var pool = NewConnectionPool(); pool.Status = PoolStatus.Zombie; - pool.Active(); + pool.Activate(); pool.Status.Should().Be(PoolStatus.Open); } [Fact] - public void FromZombieViaDeactiveToZombie() + public void FromZombieViaDeactiateToZombie() { var pool = NewConnectionPool(); pool.Status = PoolStatus.Zombie; @@ -1278,7 +1317,7 @@ public void FromClosedViaReleaseToClosed() var conn = new Mock().Object; inUseConnections.TryAdd(conn); - var pool = NewConnectionPool(null, idleQueue, inUseConnections); + var pool = NewConnectionPool(idleQueue, inUseConnections); pool.Status = PoolStatus.Closed; pool.Release(conn); @@ -1300,18 +1339,18 @@ public void FromClosedViaDisposeToClosed() } [Fact] - public void FromClosedViaActiveToClosed() + public void FromClosedViaActivateToClosed() { var pool = NewConnectionPool(); pool.Status = PoolStatus.Closed; - pool.Active(); + pool.Activate(); pool.Status.Should().Be(PoolStatus.Closed); } [Fact] - public void FromClosedViaDeactiveToClosed() + public void FromClosedViaDeactivateToClosed() { var pool = NewConnectionPool(); pool.Status = PoolStatus.Closed; @@ -1321,5 +1360,232 @@ public void FromClosedViaDeactiveToClosed() pool.Status.Should().Be(PoolStatus.Closed); } } + + public class DeactiviateMethod + { + private static List> FillIdleConnections( + BlockingCollection idleConnections, int count) + { + var idleMocks = new List>(); + for (var i = 0; i < count; i++) + { + var connMock = new Mock(); + idleMocks.Add(connMock); + idleConnections.Add(connMock.Object); + } + return idleMocks; + } + + private static List> FillInUseConnections( + ConcurrentSet inUseConnections, int count) + { + var inUseMocks = new List>(); + for (var i = 0; i < count; i++) + { + var connMock = new Mock(); + inUseMocks.Add(connMock); + inUseConnections.TryAdd(connMock.Object); + } + return inUseMocks; + } + + private static void VerifyDestroyCalledOnce(List> mocks) + { + foreach (var conn in mocks) + { + conn.Verify(x=>x.Destroy(), Times.Once); + } + } + + private static void VerifyDestroyAsyncCalledOnce(List> mocks) + { + foreach (var conn in mocks) + { + conn.Verify(x=>x.DestroyAsync(), Times.Once); + } + } + + [Fact] + public void ShouldCloseAllIdleConnectoins() + { + // Given + var idleConnections = new BlockingCollection (); + var idleMocks = FillIdleConnections(idleConnections, 10); + var pool = NewConnectionPool(idleConnections); + + // When + pool.Deactivate(); + + // Then + idleConnections.Count.Should().Be(0); + VerifyDestroyCalledOnce(idleMocks); + } + + [Fact] + public async Task ShouldCloseAllIdleConnectoinsAsync() + { + // Given + var idleConnections = new BlockingCollection (); + + var idleMocks = FillIdleConnections(idleConnections, 10); + + var pool = NewConnectionPool(idleConnections); + + // When + await pool.DeactivateAsync(); + + // Then + idleConnections.Count.Should().Be(0); + VerifyDestroyAsyncCalledOnce(idleMocks); + } + + // concurrent test + // concurrently close and deactive + [Fact] + public void DeactiviateAndThenCloseShouldCloseAllConnections() + { + var idleConnections = new BlockingCollection(); + var idleMocks = FillIdleConnections(idleConnections, 5); + + var inUseConnections = new ConcurrentSet(); + var inUseMocks = FillInUseConnections(inUseConnections, 10); + var pool = NewConnectionPool(idleConnections, inUseConnections); + + // When + pool.Deactivate(); + // Then + idleConnections.Count.Should().Be(0); + inUseConnections.Count.Should().Be(10); + VerifyDestroyCalledOnce(idleMocks); + // refill the idle connections + var newIdleMocks = FillIdleConnections(idleConnections, 5); + idleConnections.Count.Should().Be(5); + + // When + pool.Close(); + // Then + idleConnections.Count.Should().Be(0); + inUseConnections.Count.Should().Be(0); + + VerifyDestroyCalledOnce(newIdleMocks); + VerifyDestroyCalledOnce(inUseMocks); + } + + // cncurrent tests + // ConcurrentlyAcquireAndDeactivate + [Fact] + public void ReturnConnectionIfAcquiredValidConnectionBeforeZombified() + { + // Given + var idleConnections = new BlockingCollection(); + var openConnMock = new Mock(); + var pool = NewConnectionPoolWithConnectionTimeoutCheckDisabled(ReusableConnection, idleConnections); + + pool.NumberOfIdleConnections.Should().Be(0); + pool.NumberOfInUseConnections.Should().Be(0); + + // This is to simulate Acquire called first, + // but before Acquire put a new conn into inUseConn, Deactiviate get called. + openConnMock.Setup(x => x.IsOpen).Returns(true) + .Callback(() => pool.Deactivate()); + idleConnections.Add(openConnMock.Object); + pool.NumberOfIdleConnections.Should().Be(1); + // When + pool.Acquire(); + + pool.NumberOfIdleConnections.Should().Be(0); + pool.NumberOfInUseConnections.Should().Be(1); + openConnMock.Verify(x => x.IsOpen, Times.Once); + } + + [Fact] + public void ErrorIfAcquiredInvalidConnectionBeforeZombified() + { + // Given + var idleConnections = new BlockingCollection(); + var closedConnMock = new Mock(); + var pool = NewConnectionPoolWithConnectionTimeoutCheckDisabled(ReusableConnection, idleConnections); + + pool.NumberOfIdleConnections.Should().Be(0); + pool.NumberOfInUseConnections.Should().Be(0); + + // This is to simulate Acquire called first, + // but before Acquire put a new conn into inUseConn, Deactiviate get called. + // However here, this connection is not healthy and will be destoried directly + closedConnMock.Setup(x => x.IsOpen).Returns(false) + .Callback(() => pool.Deactivate()); + idleConnections.Add(closedConnMock.Object); + pool.NumberOfIdleConnections.Should().Be(1); + // When + var exception = Record.Exception(() => pool.Acquire()); + + pool.NumberOfIdleConnections.Should().Be(0); + pool.NumberOfInUseConnections.Should().Be(0); + closedConnMock.Verify(x => x.IsOpen, Times.Once); + closedConnMock.Verify(x => x.Destroy(), Times.Once); + exception.Should().BeOfType(); + exception.Message.Should().StartWith("Failed to acquire a new connection"); + } + + // concurrent test + // ConcurrentlyReleaseAndDeactiate + [Fact] + public void ShouldCloseConnectionReleasedDuringDiactiviation() + { + // Given + var idleConnections = new BlockingCollection(); + var idleMocks = new List>(); + idleMocks.AddRange(FillIdleConnections(idleConnections, 5)); + + var specialConn = new Mock(); + var releasedConn = new Mock(); + specialConn.Setup(x => x.Destroy()).Callback(() => { idleConnections.Add(releasedConn.Object); }); + + idleConnections.Add(specialConn.Object); + idleMocks.Add(specialConn); + idleMocks.Add(releasedConn); + idleMocks.Count.Should().Be(5 + 2); + + var pool = NewConnectionPool(idleConnections); + + // When + pool.Deactivate(); + + // Then + idleConnections.Count.Should().Be(0); + VerifyDestroyCalledOnce(idleMocks); + } + + // concurrent test + // ConcurrentlyActivateAndDeactivate + [Fact] + public void ShouldCloseAllIdleConnectionsRegardlessActivateCalled() + { + // Given + var idleConnections = new BlockingCollection(); + var idleMocks = new List>(); + idleMocks.AddRange(FillIdleConnections(idleConnections, 5)); + + var specialConn = new Mock(); + var pool = NewConnectionPool(idleConnections); + specialConn.Setup(x => x.Destroy()).Callback(() => + { + pool.Activate(); + }); + + idleConnections.Add(specialConn.Object); + idleMocks.Add(specialConn); + + idleMocks.AddRange(FillIdleConnections(idleConnections, 5)); + idleMocks.Count.Should().Be(5 + 1 + 5); + + // When + pool.Deactivate(); + + // Then + idleConnections.Count.Should().Be(0); + VerifyDestroyCalledOnce(idleMocks); + } + } } } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs index 8805c05c1..69627f526 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs @@ -51,7 +51,7 @@ internal class ConnectionPool : LoggerBase, IConnectionPool private readonly int _idlePoolSize; private readonly TimeSpan _connAcquisitionTimeout; - private readonly ConnectionValidator _connectionValidator; + private readonly IConnectionValidator _connectionValidator; private readonly ConnectionSettings _connectionSettings; private readonly BufferSettings _bufferSettings; @@ -62,15 +62,11 @@ internal class ConnectionPool : LoggerBase, IConnectionPool private ConnectionPoolStatistics _statistics; public int NumberOfInUseConnections => _inUseConnections.Count; - internal int NumberOfAvailableConnections => _idleConnections.Count; + internal int NumberOfIdleConnections => _idleConnections.Count; internal int PoolSize => _poolSize; // for test only private readonly IConnection _fakeConnection; - internal bool DisposeCalled - { - set => Interlocked.Exchange(ref _poolStatus, Closed); - } internal int Status { @@ -103,17 +99,22 @@ public ConnectionPool( internal ConnectionPool( IConnection connection, - BlockingCollection availableConnections = null, + BlockingCollection idleConnections = null, ConcurrentSet inUseConnections = null, ILogger logger = null, ConnectionPoolSettings poolSettings = null, - BufferSettings bufferSettings = null) + BufferSettings bufferSettings = null, + IConnectionValidator validator = null) : this(null, null, poolSettings ?? new ConnectionPoolSettings(Config.DefaultConfig), bufferSettings ?? new BufferSettings(Config.DefaultConfig), logger) { _fakeConnection = connection; - _idleConnections = availableConnections ?? new BlockingCollection(); + _idleConnections = idleConnections ?? new BlockingCollection(); _inUseConnections = inUseConnections ?? new ConcurrentSet(); + if (validator != null) + { + _connectionValidator = validator; + } } private IPooledConnection CreateNewPooledConnection() @@ -199,6 +200,18 @@ private void DestroyConnection(IPooledConnection conn) _statistics?.IncrementConnectionClosed(); } + private async Task DestroyConnectionAsync(IPooledConnection conn) + { + DecrementPoolSize(); + if (conn == null) + { + return; + } + + _statistics?.IncrementConnectionToClose(); + await conn.DestroyAsync().ConfigureAwait(false); + _statistics?.IncrementConnectionClosed(); + } /// /// Returns true if pool size is successfully increased, otherwise false. @@ -222,19 +235,6 @@ private void DecrementPoolSize() Interlocked.Decrement(ref _poolSize); } - private async Task DestroyConnectionAsync(IPooledConnection conn) - { - DecrementPoolSize(); - if (conn == null) - { - return; - } - - _statistics?.IncrementConnectionToClose(); - await conn.DestroyAsync().ConfigureAwait(false); - _statistics?.IncrementConnectionClosed(); - } - public IConnection Acquire(AccessMode mode) { return Acquire(); @@ -302,7 +302,7 @@ private IPooledConnection Acquire(CancellationToken cancellationToken) } _inUseConnections.TryAdd(connection); - if (IsZombieOrClosed) + if (IsClosed) { if (_inUseConnections.TryRemove(connection)) { @@ -390,7 +390,7 @@ private Task AcquireAsync(CancellationToken cancellationToken) } _inUseConnections.TryAdd(connection); - if (IsZombieOrClosed) + if (IsClosed) { if (_inUseConnections.TryRemove(connection)) { @@ -575,7 +575,7 @@ public Task DeactivateAsync() return TaskExtensions.GetCompletedTask(); } - public void Active() + public void Activate() { Interlocked.CompareExchange(ref _poolStatus, Open, Zombie); } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPoolStatistics.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPoolStatistics.cs index 9f16d0d65..0dabb46ef 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPoolStatistics.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPoolStatistics.cs @@ -8,7 +8,7 @@ namespace Neo4j.Driver.Internal internal class ConnectionPoolStatistics : IStatisticsProvider, IDisposable { public int InUseConns => _pool?.NumberOfInUseConnections ?? _inUseConns; - public int AvailableConns => _pool?.NumberOfAvailableConnections ?? _availableConns; + public int AvailableConns => _pool?.NumberOfIdleConnections ?? _availableConns; public long ConnCreated => _connCreated; public long ConnClosed => _connClosed; public long ConnToCreate => _connToCreate; diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionValidator.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionValidator.cs index 24e688ea8..69cb1bc07 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionValidator.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionValidator.cs @@ -3,7 +3,14 @@ namespace Neo4j.Driver.Internal { - internal class ConnectionValidator + internal interface IConnectionValidator + { + bool IsConnectionReusable(IPooledConnection connection); + Task IsConnectionReusableAsync(IPooledConnection connection); + bool IsValid(IPooledConnection connection); + } + + internal class ConnectionValidator : IConnectionValidator { private readonly TimeSpan _connIdleTimeout; private readonly TimeSpan _maxConnLifetime; diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs index 654f7701c..ea809d48e 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs @@ -21,6 +21,6 @@ internal interface IConnectionPool : IConnectionProvider, IConnectionReleaseMana { int NumberOfInUseConnections { get; } void Deactivate(); - void Active(); + void Activate(); } } From 63f961fa801344735e19a5beaa90925d67e1766f Mon Sep 17 00:00:00 2001 From: Zhen Date: Mon, 20 Nov 2017 16:50:20 +0100 Subject: [PATCH 3/9] Plug in routing table update routing table to not purge connections --- .../Neo4j.Driver/Internal/IConnectionPool.cs | 3 ++ .../Internal/Routing/ClusterConnectionPool.cs | 44 ++++++++++++++++--- .../Routing/IClusterConnectionPool.cs | 1 + .../Routing/IClusterConnectionPoolManager.cs | 1 + .../Internal/Routing/LoadBalancer.cs | 5 +++ .../Internal/Routing/RoutingTableManager.cs | 15 ++++++- 6 files changed, 61 insertions(+), 8 deletions(-) diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs index ea809d48e..fe01738ba 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/IConnectionPool.cs @@ -15,12 +15,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System.Threading.Tasks; + namespace Neo4j.Driver.Internal { internal interface IConnectionPool : IConnectionProvider, IConnectionReleaseManager { int NumberOfInUseConnections { get; } void Deactivate(); + Task DeactivateAsync(); void Activate(); } } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs index 44f8f066e..d023502f8 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs @@ -77,8 +77,7 @@ private IConnectionPool CreateNewConnectionPool(Uri uri) public IConnection Acquire(Uri uri) { - IConnectionPool pool; - if (!_pools.TryGetValue(uri, out pool)) + if (!_pools.TryGetValue(uri, out var pool)) { return null; } @@ -89,8 +88,7 @@ public IConnection Acquire(Uri uri) public Task AcquireAsync(Uri uri) { - IConnectionPool pool; - if (!_pools.TryGetValue(uri, out pool)) + if (!_pools.TryGetValue(uri, out var pool)) { return Task.FromResult((IConnection)null); } @@ -122,16 +120,48 @@ public void Add(IEnumerable servers) public void Update(IEnumerable added, IEnumerable removed) { - foreach (var uri in _pools.Keys) + foreach (var uri in added) + { + if (_pools.ContainsKey(uri)) + { + _pools[uri].Activate(); + } + else + { + Add(uri); + } + } + foreach (var uri in removed) { - if (!added.Contains(uri)) + _pools[uri].Deactivate(); + if (_pools[uri].NumberOfInUseConnections == 0) { Purge(uri); } } + } + + public async Task UpdateAsync(IEnumerable added, IEnumerable removed) + { foreach (var uri in added) { - Add(uri); + if (_pools.ContainsKey(uri)) + { + _pools[uri].Activate(); + } + else + { + Add(uri); + } + } + // TODO chain this part and use task.waitAll + foreach (var uri in removed) + { + await _pools[uri].DeactivateAsync().ConfigureAwait(false); + if (_pools[uri].NumberOfInUseConnections == 0) + { + await PurgeAsync(uri).ConfigureAwait(false); + } } } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs index 02b3b6605..533de6dfe 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs @@ -30,6 +30,7 @@ internal interface IClusterConnectionPool : IDisposable void Add(IEnumerable uris); // Update the pool keys with the new server uris void Update(IEnumerable added, IEnumerable removed); + Task UpdateAsync(IEnumerable added, IEnumerable removed); // Remove all the connection pool with the server specified by the uri void Purge(Uri uri); // Get number of in-use connections for the uri diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs index 63423a75b..9d0e1c552 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs @@ -25,6 +25,7 @@ internal interface IClusterConnectionPoolManager { void AddConnectionPool(IEnumerable uris); void UpdateConnectionPool(IEnumerable added, IEnumerable removed); + Task UpdateConnectionPoolAsync(IEnumerable added, IEnumerable removed); IConnection CreateClusterConnection(Uri uri); Task CreateClusterConnectionAsync(Uri uri); } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs index d2cb9218c..b75eaa42e 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs @@ -119,6 +119,11 @@ public void UpdateConnectionPool(IEnumerable added, IEnumerable remove _clusterConnectionPool.Update(added, removed); } + public Task UpdateConnectionPoolAsync(IEnumerable added, IEnumerable removed) + { + return _clusterConnectionPool.UpdateAsync(added, removed); + } + public IConnection CreateClusterConnection(Uri uri) { return CreateClusterConnection(uri, AccessMode.Write); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs index 10bfe717b..45db10acc 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs @@ -117,7 +117,7 @@ public async Task EnsureRoutingTableForModeAsync(AccessMode mode) } var routingTable = await UpdateRoutingTableWithInitialUriFallbackAsync(new HashSet { _seedUri }).ConfigureAwait(false); - Update(routingTable); + await UpdateAsync(routingTable).ConfigureAwait(false); } finally { @@ -139,6 +139,19 @@ private void Update(IRoutingTable newTable) _logger?.Info($"Updated routingTable to be {_routingTable}"); } + private async Task UpdateAsync(IRoutingTable newTable) + { + var added = newTable.All(); + added.ExceptWith(_routingTable.All()); + var removed = _routingTable.All(); + removed.ExceptWith(newTable.All()); + + await _poolManager.UpdateConnectionPoolAsync(added, removed).ConfigureAwait(false); + _routingTable = newTable; + + _logger?.Info($"Updated routingTable to be {_routingTable}"); + } + private bool IsRoutingTableStale(IRoutingTable routingTable, AccessMode mode = AccessMode.Read) { switch (mode) From 0de98ba2913b4c01cc4895d7dbff3daf67dab6d9 Mon Sep 17 00:00:00 2001 From: Zhen Date: Mon, 20 Nov 2017 17:23:04 +0100 Subject: [PATCH 4/9] Remove purge on connection errors but deactiviate Added tests for verify deactiviation --- .../Neo4j.Driver.Tests/ConnectionPoolTests.cs | 3 +- .../Routing/ClusterConnectionPoolTests.cs | 78 ++++++++++++++++--- .../Routing/LoadBalancerTests.cs | 12 +-- .../Internal/Routing/ClusterConnection.cs | 34 +++++++- .../Internal/Routing/ClusterConnectionPool.cs | 45 ++++++----- .../Routing/IClusterConnectionPool.cs | 5 +- .../Internal/Routing/IClusterErrorHandler.cs | 4 +- .../Internal/Routing/LoadBalancer.cs | 15 +++- .../Internal/Routing/RoutingTableManager.cs | 9 +-- 9 files changed, 152 insertions(+), 53 deletions(-) diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs index 1818d3759..3ea665840 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs @@ -546,7 +546,8 @@ public async void ShouldTimeoutAfterAcquireAsyncTimeoutIfPoolIsFull() pool.Acquire(); } - var stopWatch = new Stopwatch(); stopWatch.Start(); + var stopWatch = new Stopwatch(); + stopWatch.Start(); var exception = await Record.ExceptionAsync(() => pool.AcquireAsync(AccessMode.Read)); diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs index 7f7f61592..9abb91000 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs @@ -47,7 +47,7 @@ public void ShouldEnsureInitialRouter() var pool = new ClusterConnectionPool(connSettings, poolSettings, bufferSettings, uris, null); pool.ToString().Should().Be( - "[{bolt://123:456/ : _availableConnections: {[]}, _inUseConnections: {[]}}]"); + "[{bolt://123:456/ : _idleConnections: {[]}, _inUseConnections: {[]}}]"); } } @@ -173,20 +173,40 @@ public void ShouldRemoveServerPoolIfNotPresentInNewServers() var mockedConnectionPool = new Mock(); var connectionPoolDict = new ConcurrentDictionary(); connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object); + mockedConnectionPool.Setup(x => x.NumberOfInUseConnections).Returns(0); // no need to explicitly config this var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict); // When pool.Update(new Uri[0], new[] {ServerUri}); // Then + mockedConnectionPool.Verify(x => x.Deactivate(), Times.Once); // first deactiviate then remove connectionPoolDict.Count.Should().Be(0); } + + [Fact] + public void ShouldDeactiviateServerPoolIfNotPresentInNewServersButHasInUseConnections() + { + // Given + var mockedConnectionPool = new Mock(); + var connectionPoolDict = new ConcurrentDictionary(); + connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object); + mockedConnectionPool.Setup(x => x.NumberOfInUseConnections).Returns(10); // non-zero number + var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict); + + // When + pool.Update(new Uri[0], new[] {ServerUri}); + + // Then + mockedConnectionPool.Verify(x => x.Deactivate(), Times.Once); + connectionPoolDict.Count.Should().Be(1); + } } - public class PurgeMethod + public class AddMethod { [Fact] - public void ShouldRemovedIfExist() + public void ShouldActiviateIfExist() { // Given var mockedConnectionPool = new Mock(); @@ -196,16 +216,56 @@ public void ShouldRemovedIfExist() var pool = new ClusterConnectionPool(null, connectionPoolDict); // When - pool.Purge(ServerUri); + pool.Add(new []{ServerUri}); // Then - mockedConnectionPool.Verify(x => x.Close(), Times.Once); - connectionPoolDict.Count.Should().Be(0); - connectionPoolDict.ContainsKey(ServerUri).Should().BeFalse(); + mockedConnectionPool.Verify(x => x.Activate(), Times.Once); + connectionPoolDict.Count.Should().Be(1); + connectionPoolDict.ContainsKey(ServerUri).Should().BeTrue(); + } + + [Fact] + public void ShouldAddIfNotFound() + { + // Given + var connectionPoolDict = new ConcurrentDictionary(); + var fakePoolMock = new Mock(); + + var pool = new ClusterConnectionPool(fakePoolMock.Object, connectionPoolDict); + + // When + pool.Add(new[] {ServerUri}); + + // Then + connectionPoolDict.Count.Should().Be(1); + connectionPoolDict.ContainsKey(ServerUri).Should().BeTrue(); + connectionPoolDict[ServerUri].Should().Be(fakePoolMock.Object); + } + } + + public class DeactiviateMethod + { + [Fact] + public void ShouldDeactiviateIfExist() + { + // Given + var mockedConnectionPool = new Mock(); + var connectionPoolDict = new ConcurrentDictionary(); + connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object); + + var pool = new ClusterConnectionPool(null, connectionPoolDict); + + // When + pool.Deactiviate(ServerUri); + + // Then + mockedConnectionPool.Verify(x => x.Deactivate(), Times.Once); + connectionPoolDict.Count.Should().Be(1); + connectionPoolDict.ContainsKey(ServerUri).Should().BeTrue(); } [Fact] - public void ShouldRemoveNothingIfNotFound() + public void ShouldDeactiviateNothingIfNotFound() { // Given var connectionPoolDict = new ConcurrentDictionary(); @@ -213,7 +273,7 @@ public void ShouldRemoveNothingIfNotFound() var pool = new ClusterConnectionPool(null, connectionPoolDict); // When - pool.Purge(ServerUri); + pool.Deactiviate(ServerUri); // Then connectionPoolDict.Count.Should().Be(0); diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs index 2b8b2eb6c..366df1a0b 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs @@ -44,7 +44,7 @@ public void ShouldRmoveFromLoadBalancer() var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object); loadBalancer.OnConnectionError(uri, new ClientException()); - clusterPoolMock.Verify(x => x.Purge(uri), Times.Once); + clusterPoolMock.Verify(x => x.Deactiviate(uri), Times.Once); routingTableMock.Verify(x => x.Remove(uri), Times.Once); routingTableMock.Verify(x => x.RemoveWriter(uri), Times.Never); } @@ -63,7 +63,7 @@ public void ShouldRemoveWriterFromRoutingTable() var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object); loadBalancer.OnWriteError(uri); - clusterPoolMock.Verify(x => x.Purge(uri), Times.Never); + clusterPoolMock.Verify(x => x.Deactiviate(uri), Times.Never); routingTableMock.Verify(x => x.Remove(uri), Times.Never); routingTableMock.Verify(x => x.RemoveWriter(uri), Times.Once); } @@ -141,7 +141,7 @@ public void ShouldForgetServerWhenFailedToEstablishConn(AccessMode mode) // should be removed routingTableMock.Verify(m => m.Remove(uri), Times.Once); - clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Once); + clusterConnPoolMock.Verify(m => m.Deactiviate(uri), Times.Once); } [Theory] @@ -170,7 +170,7 @@ public void ShouldThrowErrorDirectlyIfSecurityError(AccessMode mode) // while the server is not removed routingTableMock.Verify(m => m.Remove(uri), Times.Never); - clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never); + clusterConnPoolMock.Verify(m => m.Deactiviate(uri), Times.Never); } [Theory] @@ -199,7 +199,7 @@ public void ShouldThrowErrorDirectlyIfProtocolError(AccessMode mode) // while the server is not removed routingTableMock.Verify(m => m.Remove(uri), Times.Never); - clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never); + clusterConnPoolMock.Verify(m => m.Deactiviate(uri), Times.Never); } [Theory] @@ -253,4 +253,4 @@ private static IConnection NewConnectionMock(Uri uri) } } } -} \ No newline at end of file +} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnection.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnection.cs index 3f9e79079..c45506555 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnection.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnection.cs @@ -16,6 +16,7 @@ // limitations under the License. using System; +using System.Threading.Tasks; using Neo4j.Driver.Internal.Connector; using Neo4j.Driver.V1; @@ -47,7 +48,35 @@ public override void OnError(Exception error) { _errorHandler.OnConnectionError(_uri, error); } - else if (error.IsClusterError()) + else + { + HandleClusterError(error); + } + throw error; + } + + public async Task OnErrorAsync(Exception error) + { + if (error is ServiceUnavailableException) + { + await _errorHandler.OnConnectionErrorAsync(_uri, error).ConfigureAwait(false); + throw new SessionExpiredException( + $"Server at {_uri} is no longer available due to error: {error.Message}.", error); + } + else if (error.IsDatabaseUnavailableError()) + { + await _errorHandler.OnConnectionErrorAsync(_uri, error).ConfigureAwait(false); + } + else + { + HandleClusterError(error); + } + throw error; + } + + private void HandleClusterError(Exception error) + { + if (error.IsClusterError()) { switch (_mode) { @@ -65,7 +94,6 @@ public override void OnError(Exception error) throw new ArgumentOutOfRangeException($"Unsupported mode type {_mode}"); } } - throw error; } } -} \ No newline at end of file +} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs index d023502f8..82315d1b0 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs @@ -19,6 +19,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Neo4j.Driver.Internal.Connector; @@ -113,14 +114,6 @@ private void Add(Uri uri) public void Add(IEnumerable servers) { foreach (var uri in servers) - { - Add(uri); - } - } - - public void Update(IEnumerable added, IEnumerable removed) - { - foreach (var uri in added) { if (_pools.ContainsKey(uri)) { @@ -131,6 +124,11 @@ public void Update(IEnumerable added, IEnumerable removed) Add(uri); } } + } + + public void Update(IEnumerable added, IEnumerable removed) + { + Add(added); foreach (var uri in removed) { _pools[uri].Deactivate(); @@ -143,17 +141,7 @@ public void Update(IEnumerable added, IEnumerable removed) public async Task UpdateAsync(IEnumerable added, IEnumerable removed) { - foreach (var uri in added) - { - if (_pools.ContainsKey(uri)) - { - _pools[uri].Activate(); - } - else - { - Add(uri); - } - } + Add(added); // TODO chain this part and use task.waitAll foreach (var uri in removed) { @@ -165,7 +153,24 @@ public async Task UpdateAsync(IEnumerable added, IEnumerable removed) } } - public void Purge(Uri uri) + public void Deactiviate(Uri uri) + { + if (_pools.TryGetValue(uri, out var pool)) + { + pool.Deactivate(); + } + } + + public Task DeactiviateAsync(Uri uri) + { + if (_pools.TryGetValue(uri, out var pool)) + { + return pool.DeactivateAsync(); + } + return TaskExtensions.GetCompletedTask(); + } + + private void Purge(Uri uri) { var removed = _pools.TryRemove(uri, out var toRemove); if (removed) diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs index 533de6dfe..f913129e4 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs @@ -31,8 +31,9 @@ internal interface IClusterConnectionPool : IDisposable // Update the pool keys with the new server uris void Update(IEnumerable added, IEnumerable removed); Task UpdateAsync(IEnumerable added, IEnumerable removed); - // Remove all the connection pool with the server specified by the uri - void Purge(Uri uri); + // Deactiviate all the connection pool with the server specified by the uri + void Deactiviate(Uri uri); + Task DeactiviateAsync(Uri uri); // Get number of in-use connections for the uri int NumberOfInUseConnections(Uri uri); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterErrorHandler.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterErrorHandler.cs index 81227b836..e939cf246 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterErrorHandler.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterErrorHandler.cs @@ -15,12 +15,14 @@ // See the License for the specific language governing permissions and // limitations under the License. using System; +using System.Threading.Tasks; namespace Neo4j.Driver.Internal.Routing { internal interface IClusterErrorHandler { void OnConnectionError(Uri uri, Exception e); + Task OnConnectionErrorAsync(Uri uri, Exception e); void OnWriteError(Uri uri); } -} \ No newline at end of file +} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs index b75eaa42e..173bff7b5 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs @@ -101,7 +101,14 @@ public void OnConnectionError(Uri uri, Exception e) { _logger?.Info($"Server at {uri} is no longer available due to error: {e.Message}."); _routingTableManager.RoutingTable.Remove(uri); - _clusterConnectionPool.Purge(uri); + _clusterConnectionPool.Deactiviate(uri); + } + + public Task OnConnectionErrorAsync(Uri uri, Exception e) + { + _logger?.Info($"Server at {uri} is no longer available due to error: {e.Message}."); + _routingTableManager.RoutingTable.Remove(uri); + return _clusterConnectionPool.DeactiviateAsync(uri); } public void OnWriteError(Uri uri) @@ -268,13 +275,13 @@ private async Task CreateClusterConnectionAsync(Uri uri, AccessMode { return new ClusterConnection(conn, uri, mode, this); } - OnConnectionError(uri, new ArgumentException( + await OnConnectionErrorAsync(uri, new ArgumentException( $"Routing table {_routingTableManager.RoutingTable} contains a server {uri} " + - $"that is not known to cluster connection pool {_clusterConnectionPool}.")); + $"that is not known to cluster connection pool {_clusterConnectionPool}.")).ConfigureAwait(false); } catch (ServiceUnavailableException e) { - OnConnectionError(uri, e); + await OnConnectionErrorAsync(uri, e).ConfigureAwait(false); } return null; } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs index 45db10acc..2a5fdd63f 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs @@ -126,7 +126,7 @@ public async Task EnsureRoutingTableForModeAsync(AccessMode mode) } } - private void Update(IRoutingTable newTable) + internal void Update(IRoutingTable newTable) { var added = newTable.All(); added.ExceptWith(_routingTable.All()); @@ -139,7 +139,7 @@ private void Update(IRoutingTable newTable) _logger?.Info($"Updated routingTable to be {_routingTable}"); } - private async Task UpdateAsync(IRoutingTable newTable) + internal async Task UpdateAsync(IRoutingTable newTable) { var added = newTable.All(); added.ExceptWith(_routingTable.All()); @@ -355,10 +355,5 @@ private async Task RediscoveryAsync(IConnection conn) return new RoutingTable(discoveryManager.Routers, discoveryManager.Readers, discoveryManager.Writers, discoveryManager.ExpireAfterSeconds); } - - public void Clear() - { - _routingTable.Clear(); - } } } From afa64938a50a41e810d9517a234b81cdae70559f Mon Sep 17 00:00:00 2001 From: Zhen Date: Tue, 21 Nov 2017 12:02:02 +0100 Subject: [PATCH 5/9] I miss-spelled `deactivate` --- .../Neo4j.Driver.Tests/ConnectionPoolTests.cs | 8 ++++---- .../Routing/ClusterConnectionPoolTests.cs | 16 ++++++++-------- .../Routing/LoadBalancerTests.cs | 10 +++++----- .../Internal/Routing/ClusterConnectionPool.cs | 4 ++-- .../Internal/Routing/IClusterConnectionPool.cs | 6 +++--- .../Internal/Routing/LoadBalancer.cs | 4 ++-- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs index 3ea665840..e7edbc3eb 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs @@ -1362,7 +1362,7 @@ public void FromClosedViaDeactivateToClosed() } } - public class DeactiviateMethod + public class DeactivateMethod { private static List> FillIdleConnections( BlockingCollection idleConnections, int count) @@ -1443,7 +1443,7 @@ public async Task ShouldCloseAllIdleConnectoinsAsync() // concurrent test // concurrently close and deactive [Fact] - public void DeactiviateAndThenCloseShouldCloseAllConnections() + public void DeactivateAndThenCloseShouldCloseAllConnections() { var idleConnections = new BlockingCollection(); var idleMocks = FillIdleConnections(idleConnections, 5); @@ -1486,7 +1486,7 @@ public void ReturnConnectionIfAcquiredValidConnectionBeforeZombified() pool.NumberOfInUseConnections.Should().Be(0); // This is to simulate Acquire called first, - // but before Acquire put a new conn into inUseConn, Deactiviate get called. + // but before Acquire put a new conn into inUseConn, Deactivate get called. openConnMock.Setup(x => x.IsOpen).Returns(true) .Callback(() => pool.Deactivate()); idleConnections.Add(openConnMock.Object); @@ -1511,7 +1511,7 @@ public void ErrorIfAcquiredInvalidConnectionBeforeZombified() pool.NumberOfInUseConnections.Should().Be(0); // This is to simulate Acquire called first, - // but before Acquire put a new conn into inUseConn, Deactiviate get called. + // but before Acquire put a new conn into inUseConn, Deactivate get called. // However here, this connection is not healthy and will be destoried directly closedConnMock.Setup(x => x.IsOpen).Returns(false) .Callback(() => pool.Deactivate()); diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs index 9abb91000..7d5660b28 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs @@ -180,12 +180,12 @@ public void ShouldRemoveServerPoolIfNotPresentInNewServers() pool.Update(new Uri[0], new[] {ServerUri}); // Then - mockedConnectionPool.Verify(x => x.Deactivate(), Times.Once); // first deactiviate then remove + mockedConnectionPool.Verify(x => x.Deactivate(), Times.Once); // first deactivate then remove connectionPoolDict.Count.Should().Be(0); } [Fact] - public void ShouldDeactiviateServerPoolIfNotPresentInNewServersButHasInUseConnections() + public void ShouldDeactivateServerPoolIfNotPresentInNewServersButHasInUseConnections() { // Given var mockedConnectionPool = new Mock(); @@ -206,7 +206,7 @@ public void ShouldDeactiviateServerPoolIfNotPresentInNewServersButHasInUseConnec public class AddMethod { [Fact] - public void ShouldActiviateIfExist() + public void ShouldActivateIfExist() { // Given var mockedConnectionPool = new Mock(); @@ -243,10 +243,10 @@ public void ShouldAddIfNotFound() } } - public class DeactiviateMethod + public class DeactivateMethod { [Fact] - public void ShouldDeactiviateIfExist() + public void ShouldDeactivateIfExist() { // Given var mockedConnectionPool = new Mock(); @@ -256,7 +256,7 @@ public void ShouldDeactiviateIfExist() var pool = new ClusterConnectionPool(null, connectionPoolDict); // When - pool.Deactiviate(ServerUri); + pool.Deactivate(ServerUri); // Then mockedConnectionPool.Verify(x => x.Deactivate(), Times.Once); @@ -265,7 +265,7 @@ public void ShouldDeactiviateIfExist() } [Fact] - public void ShouldDeactiviateNothingIfNotFound() + public void ShouldDeactivateNothingIfNotFound() { // Given var connectionPoolDict = new ConcurrentDictionary(); @@ -273,7 +273,7 @@ public void ShouldDeactiviateNothingIfNotFound() var pool = new ClusterConnectionPool(null, connectionPoolDict); // When - pool.Deactiviate(ServerUri); + pool.Deactivate(ServerUri); // Then connectionPoolDict.Count.Should().Be(0); diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs index 366df1a0b..094acc0b8 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs @@ -44,7 +44,7 @@ public void ShouldRmoveFromLoadBalancer() var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object); loadBalancer.OnConnectionError(uri, new ClientException()); - clusterPoolMock.Verify(x => x.Deactiviate(uri), Times.Once); + clusterPoolMock.Verify(x => x.Deactivate(uri), Times.Once); routingTableMock.Verify(x => x.Remove(uri), Times.Once); routingTableMock.Verify(x => x.RemoveWriter(uri), Times.Never); } @@ -63,7 +63,7 @@ public void ShouldRemoveWriterFromRoutingTable() var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object); loadBalancer.OnWriteError(uri); - clusterPoolMock.Verify(x => x.Deactiviate(uri), Times.Never); + clusterPoolMock.Verify(x => x.Deactivate(uri), Times.Never); routingTableMock.Verify(x => x.Remove(uri), Times.Never); routingTableMock.Verify(x => x.RemoveWriter(uri), Times.Once); } @@ -141,7 +141,7 @@ public void ShouldForgetServerWhenFailedToEstablishConn(AccessMode mode) // should be removed routingTableMock.Verify(m => m.Remove(uri), Times.Once); - clusterConnPoolMock.Verify(m => m.Deactiviate(uri), Times.Once); + clusterConnPoolMock.Verify(m => m.Deactivate(uri), Times.Once); } [Theory] @@ -170,7 +170,7 @@ public void ShouldThrowErrorDirectlyIfSecurityError(AccessMode mode) // while the server is not removed routingTableMock.Verify(m => m.Remove(uri), Times.Never); - clusterConnPoolMock.Verify(m => m.Deactiviate(uri), Times.Never); + clusterConnPoolMock.Verify(m => m.Deactivate(uri), Times.Never); } [Theory] @@ -199,7 +199,7 @@ public void ShouldThrowErrorDirectlyIfProtocolError(AccessMode mode) // while the server is not removed routingTableMock.Verify(m => m.Remove(uri), Times.Never); - clusterConnPoolMock.Verify(m => m.Deactiviate(uri), Times.Never); + clusterConnPoolMock.Verify(m => m.Deactivate(uri), Times.Never); } [Theory] diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs index 82315d1b0..5107fb1ed 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs @@ -153,7 +153,7 @@ public async Task UpdateAsync(IEnumerable added, IEnumerable removed) } } - public void Deactiviate(Uri uri) + public void Deactivate(Uri uri) { if (_pools.TryGetValue(uri, out var pool)) { @@ -161,7 +161,7 @@ public void Deactiviate(Uri uri) } } - public Task DeactiviateAsync(Uri uri) + public Task DeactivateAsync(Uri uri) { if (_pools.TryGetValue(uri, out var pool)) { diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs index f913129e4..b87103d67 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs @@ -31,9 +31,9 @@ internal interface IClusterConnectionPool : IDisposable // Update the pool keys with the new server uris void Update(IEnumerable added, IEnumerable removed); Task UpdateAsync(IEnumerable added, IEnumerable removed); - // Deactiviate all the connection pool with the server specified by the uri - void Deactiviate(Uri uri); - Task DeactiviateAsync(Uri uri); + // Deactivate all the connection pool with the server specified by the uri + void Deactivate(Uri uri); + Task DeactivateAsync(Uri uri); // Get number of in-use connections for the uri int NumberOfInUseConnections(Uri uri); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs index 173bff7b5..54be349d3 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs @@ -101,14 +101,14 @@ public void OnConnectionError(Uri uri, Exception e) { _logger?.Info($"Server at {uri} is no longer available due to error: {e.Message}."); _routingTableManager.RoutingTable.Remove(uri); - _clusterConnectionPool.Deactiviate(uri); + _clusterConnectionPool.Deactivate(uri); } public Task OnConnectionErrorAsync(Uri uri, Exception e) { _logger?.Info($"Server at {uri} is no longer available due to error: {e.Message}."); _routingTableManager.RoutingTable.Remove(uri); - return _clusterConnectionPool.DeactiviateAsync(uri); + return _clusterConnectionPool.DeactivateAsync(uri); } public void OnWriteError(Uri uri) From cc4202273f8fef71535427d6830137b91a983a18 Mon Sep 17 00:00:00 2001 From: Zhen Date: Tue, 21 Nov 2017 17:14:00 +0100 Subject: [PATCH 6/9] Made cluster pool to handle concurrent update (add, delete, modify) properly --- .../Neo4j.Driver/Internal/ConnectionPool.cs | 57 ++++------ .../Internal/Routing/ClusterConnectionPool.cs | 104 ++++++++++-------- .../Routing/IClusterConnectionPool.cs | 1 + .../Routing/IClusterConnectionPoolManager.cs | 1 + .../Internal/Routing/LoadBalancer.cs | 5 + .../Internal/Routing/RoutingTableManager.cs | 10 +- 6 files changed, 97 insertions(+), 81 deletions(-) diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs index 69627f526..103a10b7c 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs @@ -428,31 +428,26 @@ public void Release(IPooledConnection connection) // pool already disposed, and this connection is also already closed return; } + + // Remove from inUse if (!_inUseConnections.TryRemove(connection)) { // pool already disposed. return; } - if (_connectionValidator.IsConnectionReusable(connection)) - { - if (IsIdlePoolFull()) - { - DestroyConnection(connection); - } - else - { - _idleConnections.Add(connection); - } - // Just dequeue any one connection and close it will ensure that all connections in the pool will finally be closed - if (IsZombieOrClosed && _idleConnections.TryTake(out connection)) - { - DestroyConnection(connection); - } + if (!_connectionValidator.IsConnectionReusable(connection) || IsIdlePoolFull() || IsZombieOrClosed) + { + // cannot return to the idle pool + DestroyConnection(connection); + return; } - else + + // Add back to the idle pool + _idleConnections.Add(connection); + // Just dequeue any one connection and close it will ensure that all connections in the pool will finally be closed + if (IsZombieOrClosed && _idleConnections.TryTake(out connection)) { - //release resources by connection DestroyConnection(connection); } }); @@ -467,32 +462,26 @@ public Task ReleaseAsync(IPooledConnection connection) // pool already disposed return; } + // Remove from idle if (!_inUseConnections.TryRemove(connection)) { // pool already disposed return; } - if (await _connectionValidator.IsConnectionReusableAsync(connection).ConfigureAwait(false)) + if (!await _connectionValidator.IsConnectionReusableAsync(connection).ConfigureAwait(false) + || IsIdlePoolFull() || IsZombieOrClosed) { - if (IsIdlePoolFull()) - { - await DestroyConnectionAsync(connection).ConfigureAwait(false); - } - else - { - _idleConnections.Add(connection); - } - - // Just dequeue any one connection and close it will ensure that all connections in the pool will finally be closed - if (IsZombieOrClosed && _idleConnections.TryTake(out connection)) - { - await DestroyConnectionAsync(connection).ConfigureAwait(false); - } + // cannot return to idle pool + await DestroyConnectionAsync(connection).ConfigureAwait(false); + return; } - else + + // Add back to idle pool + _idleConnections.Add(connection); + // Just dequeue any one connection and close it will ensure that all connections in the pool will finally be closed + if (IsZombieOrClosed && _idleConnections.TryTake(out connection)) { - //release resources by connection await DestroyConnectionAsync(connection).ConfigureAwait(false); } }); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs index 5107fb1ed..076d510fa 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs @@ -71,11 +71,6 @@ internal ClusterConnectionPool( private bool IsClosed => _closedMarker > 0; - private IConnectionPool CreateNewConnectionPool(Uri uri) - { - return _fakePool ?? new ConnectionPool(uri, _connectionSettings, _poolSettings, _bufferSettings, Logger); - } - public IConnection Acquire(Uri uri) { if (!_pools.TryGetValue(uri, out var pool)) @@ -98,31 +93,33 @@ public Task AcquireAsync(Uri uri) return pool.AcquireAsync(ignored); } - // This is the ultimate method to add a pool - private void Add(Uri uri) + public void Add(IEnumerable servers) { - _pools.GetOrAdd(uri, CreateNewConnectionPool); + foreach (var uri in servers) + { + _pools.AddOrUpdate(uri, CreateNewConnectionPool, ActivateConnectionPool); + } if (IsClosed) { // Anything added after dispose should be directly cleaned. Clear(); throw new ObjectDisposedException(GetType().Name, - $"Failed to create connections with server {uri} as the driver has already started to dispose."); + $"Failed to create connections with servers {servers.ToContentString()} as the driver has already started to dispose."); } } - public void Add(IEnumerable servers) + public async Task AddAsync(IEnumerable servers) { foreach (var uri in servers) { - if (_pools.ContainsKey(uri)) - { - _pools[uri].Activate(); - } - else - { - Add(uri); - } + _pools.AddOrUpdate(uri, CreateNewConnectionPool, ActivateConnectionPool); + } + if (IsClosed) + { + // Anything added after dispose should be directly cleaned. + await ClearAsync().ConfigureAwait(false); + throw new ObjectDisposedException(GetType().Name, + $"Failed to create connections with servers {servers.ToContentString()} as the driver has already started to dispose."); } } @@ -131,24 +128,30 @@ public void Update(IEnumerable added, IEnumerable removed) Add(added); foreach (var uri in removed) { - _pools[uri].Deactivate(); - if (_pools[uri].NumberOfInUseConnections == 0) + if (_pools.TryGetValue(uri, out var pool)) { - Purge(uri); + pool.Deactivate(); + if (pool.NumberOfInUseConnections == 0) + { + Purge(uri); + } } } } public async Task UpdateAsync(IEnumerable added, IEnumerable removed) { - Add(added); + await AddAsync(added).ConfigureAwait(false); // TODO chain this part and use task.waitAll foreach (var uri in removed) { - await _pools[uri].DeactivateAsync().ConfigureAwait(false); - if (_pools[uri].NumberOfInUseConnections == 0) + if (_pools.TryGetValue(uri, out var pool)) { - await PurgeAsync(uri).ConfigureAwait(false); + await pool.DeactivateAsync().ConfigureAwait(false); + if (pool.NumberOfInUseConnections == 0) + { + await PurgeAsync(uri).ConfigureAwait(false); + } } } } @@ -170,33 +173,31 @@ public Task DeactivateAsync(Uri uri) return TaskExtensions.GetCompletedTask(); } - private void Purge(Uri uri) + public int NumberOfInUseConnections(Uri uri) { - var removed = _pools.TryRemove(uri, out var toRemove); - if (removed) + if (_pools.TryGetValue(uri, out var pool)) { - toRemove.Close(); + return pool.NumberOfInUseConnections; } + return 0; } - private Task PurgeAsync(Uri uri) + public void Close() { - var removed = _pools.TryRemove(uri, out var toRemove); - if (removed) + if (Interlocked.CompareExchange(ref _closedMarker, 1, 0) == 0) { - return toRemove.CloseAsync(); + Clear(); } - - return TaskExtensions.GetCompletedTask(); } - public int NumberOfInUseConnections(Uri uri) + public Task CloseAsync() { - if (_pools.TryGetValue(uri, out var pool)) + if (Interlocked.CompareExchange(ref _closedMarker, 1, 0) == 0) { - return pool.NumberOfInUseConnections; + return ClearAsync(); } - return 0; + + return TaskExtensions.GetCompletedTask(); } private void Clear() @@ -221,19 +222,21 @@ private Task ClearAsync() return Task.WhenAll(clearTasks); } - public void Close() + private void Purge(Uri uri) { - if (Interlocked.CompareExchange(ref _closedMarker, 1, 0) == 0) + var removed = _pools.TryRemove(uri, out var toRemove); + if (removed) { - Clear(); + toRemove.Close(); } } - public Task CloseAsync() + private Task PurgeAsync(Uri uri) { - if (Interlocked.CompareExchange(ref _closedMarker, 1, 0) == 0) + var removed = _pools.TryRemove(uri, out var toRemove); + if (removed) { - return ClearAsync(); + return toRemove.CloseAsync(); } return TaskExtensions.GetCompletedTask(); @@ -256,5 +259,16 @@ public override string ToString() { return _pools.ValueToString(); } + + private IConnectionPool CreateNewConnectionPool(Uri uri) + { + return _fakePool ?? new ConnectionPool(uri, _connectionSettings, _poolSettings, _bufferSettings, Logger); + } + + private IConnectionPool ActivateConnectionPool(Uri uri, IConnectionPool pool) + { + pool.Activate(); + return pool; + } } } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs index b87103d67..c95cbbfa3 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs @@ -28,6 +28,7 @@ internal interface IClusterConnectionPool : IDisposable Task AcquireAsync(Uri uri); // Add a set of uri to this pool void Add(IEnumerable uris); + Task AddAsync(IEnumerable uris); // Update the pool keys with the new server uris void Update(IEnumerable added, IEnumerable removed); Task UpdateAsync(IEnumerable added, IEnumerable removed); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs index 9d0e1c552..79c17f47b 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs @@ -24,6 +24,7 @@ namespace Neo4j.Driver.Internal.Routing internal interface IClusterConnectionPoolManager { void AddConnectionPool(IEnumerable uris); + Task AddConnectionPoolAsync(IEnumerable uris); void UpdateConnectionPool(IEnumerable added, IEnumerable removed); Task UpdateConnectionPoolAsync(IEnumerable added, IEnumerable removed); IConnection CreateClusterConnection(Uri uri); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs index 54be349d3..5d8153b1b 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs @@ -121,6 +121,11 @@ public void AddConnectionPool(IEnumerable uris) _clusterConnectionPool.Add(uris); } + public Task AddConnectionPoolAsync(IEnumerable uris) + { + return _clusterConnectionPool.AddAsync(uris); + } + public void UpdateConnectionPool(IEnumerable added, IEnumerable removed) { _clusterConnectionPool.Update(added, removed); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs index 2a5fdd63f..9890fc514 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs @@ -176,6 +176,12 @@ private void PrependRouters(ISet uris) _poolManager.AddConnectionPool(uris); } + private Task PrependRoutersAsync(ISet uris) + { + _routingTable.PrependRouters(uris); + return _poolManager.AddConnectionPoolAsync(uris); + } + internal IRoutingTable UpdateRoutingTableWithInitialUriFallback(ISet initialUriSet, Func, IRoutingTable> updateRoutingTableFunc = null) { @@ -224,7 +230,7 @@ internal async Task UpdateRoutingTableWithInitialUriFallbackAsync var hasPrependedInitialRouters = false; if (_isReadingInAbsenceOfWriter) { - PrependRouters(initialUriSet); + await PrependRoutersAsync(initialUriSet).ConfigureAwait(false); hasPrependedInitialRouters = true; } @@ -241,7 +247,7 @@ internal async Task UpdateRoutingTableWithInitialUriFallbackAsync uris.ExceptWith(triedUris); if (uris.Count != 0) { - PrependRouters(uris); + await PrependRoutersAsync(uris).ConfigureAwait(false); routingTable = await updateRoutingTableFunc(null).ConfigureAwait(false); if (routingTable != null) { From eccd0b9c02b414143c767d3c5b83e3ab2af29f9b Mon Sep 17 00:00:00 2001 From: Zhen Date: Wed, 22 Nov 2017 13:02:13 +0100 Subject: [PATCH 7/9] Fixed reading is not atomic problem --- .../Neo4j.Driver.Tests/ConnectionPoolTests.cs | 2 +- .../Neo4j.Driver/Internal/ConnectionPool.cs | 34 ++++++++++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs index e7edbc3eb..5dfa1f789 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs @@ -1240,7 +1240,7 @@ public void FromZombieViaAcquireThrowsError() var exception = Record.Exception(()=>pool.Acquire()); - exception.Should().BeOfType(); + exception.Should().BeOfType(); pool.Status.Should().Be(PoolStatus.Zombie); } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs index 103a10b7c..fee88ba43 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs @@ -43,8 +43,9 @@ internal class ConnectionPool : LoggerBase, IConnectionPool private readonly Uri _uri; private int _poolStatus = Open; - private bool IsClosed => _poolStatus == Closed; - private bool IsZombieOrClosed => _poolStatus != Open; + private bool IsClosed => AtomicRead(ref _poolStatus) == Closed; + private bool IsZombie => AtomicRead(ref _poolStatus) == Zombie; + private bool IsZombieOrClosed => AtomicRead(ref _poolStatus) != Open; private int _poolSize = 0; private readonly int _maxPoolSize; @@ -70,7 +71,7 @@ internal class ConnectionPool : LoggerBase, IConnectionPool internal int Status { - get => _poolStatus; + get => AtomicRead(ref _poolStatus); set => Interlocked.Exchange(ref _poolStatus, value); } @@ -258,10 +259,14 @@ private IPooledConnection Acquire(CancellationToken cancellationToken) { while (true) { - if (IsZombieOrClosed) + if (IsClosed) { ThrowObjectDisposedException(); } + else if (IsZombie) + { + ThrowClientExceptionDueToZombified(); + } if (!_idleConnections.TryTake(out connection)) { @@ -344,10 +349,14 @@ private Task AcquireAsync(CancellationToken cancellationToken) { while (true) { - if (IsZombieOrClosed) + if (IsClosed) { ThrowObjectDisposedException(); } + else if (IsZombie) + { + ThrowClientExceptionDueToZombified(); + } if (!_idleConnections.TryTake(out connection)) { @@ -594,6 +603,15 @@ private void ThrowObjectDisposedException() FailedToAcquireConnectionDueToPoolClosed(this); } + private void ThrowClientExceptionDueToZombified() + { + throw new ClientException( + $"Failed to obtain a connection from connection pool for server with URI `{_uri}` " + + "as this server has already been removed from routing table. " + + "Please retry your statement again and you should be routed with a different server from the new routing table. " + + "You should not see this error persistenly."); + } + private void SetupStatisticsProvider(IStatisticsCollector collector) { _statisticsCollector = collector; @@ -613,6 +631,12 @@ private void DisposeStatisticsProvider() } } + private static int AtomicRead(ref int value) + { + return Interlocked.CompareExchange(ref value, 0, 0); // change to 0 if the value was 0, + // a.k.a. do nothing but return the original value + } + public override string ToString() { return $"{nameof(_idleConnections)}: {{{_idleConnections.ValueToString()}}}, " + From 1e78b6b38ec44552306c9dca26dee12e27020b2a Mon Sep 17 00:00:00 2001 From: Zhen Date: Wed, 22 Nov 2017 13:11:56 +0100 Subject: [PATCH 8/9] Renamed Open -> Active Zombie -> Inactive --- .../Neo4j.Driver.Tests/ConnectionPoolTests.cs | 50 +++++++++---------- .../Neo4j.Driver/Internal/ConnectionPool.cs | 28 +++++------ 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs index 5dfa1f789..52b6051b0 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs @@ -1182,17 +1182,17 @@ public async void ShoulReportPoolSizeCorrectOnConcurrentRequestsAsync() public class PoolState { - // open + // active [Fact] - public void FromOpenViaAcquireToOpen() + public void FromActiveViaAcquireToActive() { var pool = NewConnectionPool(); pool.Acquire(); - pool.Status.Should().Be(PoolStatus.Open); + pool.Status.Should().Be(PoolStatus.Active); } [Fact] - public void FromOpenViaReleaseToOpen() + public void FromActiveViaReleaseToActive() { var idleQueue = new BlockingCollection(); var inUseConnections = new ConcurrentSet(); @@ -1204,11 +1204,11 @@ public void FromOpenViaReleaseToOpen() idleQueue.Count.Should().Be(1); inUseConnections.Count.Should().Be(0); - pool.Status.Should().Be(PoolStatus.Open); + pool.Status.Should().Be(PoolStatus.Active); } [Fact] - public void FromOpenViaDisposeToClosed() + public void FromActiveViaDisposeToClosed() { var pool = NewConnectionPool(); pool.Dispose(); @@ -1216,36 +1216,36 @@ public void FromOpenViaDisposeToClosed() } [Fact] - public void FromOpenViaActivateToOpen() + public void FromActiveViaActivateToActive() { var pool = NewConnectionPool(); pool.Activate(); - pool.Status.Should().Be(PoolStatus.Open); + pool.Status.Should().Be(PoolStatus.Active); } [Fact] - public void FromOpenViaDeactiateToZombie() + public void FromActiveViaDeactiateToInactive() { var pool = NewConnectionPool(); pool.Deactivate(); - pool.Status.Should().Be(PoolStatus.Zombie); + pool.Status.Should().Be(PoolStatus.Inactive); } - // zombie + // inactive [Fact] - public void FromZombieViaAcquireThrowsError() + public void FromInactiveViaAcquireThrowsError() { var pool = NewConnectionPool(); - pool.Status = PoolStatus.Zombie; + pool.Status = PoolStatus.Inactive; var exception = Record.Exception(()=>pool.Acquire()); exception.Should().BeOfType(); - pool.Status.Should().Be(PoolStatus.Zombie); + pool.Status.Should().Be(PoolStatus.Inactive); } [Fact] - public void FromZombieViaReleaseToZombie() + public void FromInactiveViaReleaseToInactive() { var idleQueue = new BlockingCollection(); var inUseConnections = new ConcurrentSet(); @@ -1254,20 +1254,20 @@ public void FromZombieViaReleaseToZombie() inUseConnections.TryAdd(conn); var pool = NewConnectionPool(idleQueue, inUseConnections); - pool.Status = PoolStatus.Zombie; + pool.Status = PoolStatus.Inactive; pool.Release(conn); inUseConnections.Count.Should().Be(0); idleQueue.Count.Should().Be(0); - pool.Status.Should().Be(PoolStatus.Zombie); + pool.Status.Should().Be(PoolStatus.Inactive); } [Fact] - public void FromZombieViaDisposeToClosed() + public void FromInactiveViaDisposeToClosed() { var pool = NewConnectionPool(); - pool.Status = PoolStatus.Zombie; + pool.Status = PoolStatus.Inactive; pool.Dispose(); @@ -1275,25 +1275,25 @@ public void FromZombieViaDisposeToClosed() } [Fact] - public void FromZombieViaActivateToOpen() + public void FromInactiveViaActivateToActive() { var pool = NewConnectionPool(); - pool.Status = PoolStatus.Zombie; + pool.Status = PoolStatus.Inactive; pool.Activate(); - pool.Status.Should().Be(PoolStatus.Open); + pool.Status.Should().Be(PoolStatus.Active); } [Fact] - public void FromZombieViaDeactiateToZombie() + public void FromInactiveViaDeactiateToInactive() { var pool = NewConnectionPool(); - pool.Status = PoolStatus.Zombie; + pool.Status = PoolStatus.Inactive; pool.Deactivate(); - pool.Status.Should().Be(PoolStatus.Zombie); + pool.Status.Should().Be(PoolStatus.Inactive); } //closed diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs index fee88ba43..86c004dee 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs @@ -31,9 +31,9 @@ namespace Neo4j.Driver.Internal { internal static class PoolStatus { - public const int Open = 0; + public const int Active = 0; public const int Closed = 1; - public const int Zombie = 2; + public const int Inactive = 2; } internal class ConnectionPool : LoggerBase, IConnectionPool @@ -42,10 +42,10 @@ internal class ConnectionPool : LoggerBase, IConnectionPool private readonly Uri _uri; - private int _poolStatus = Open; + private int _poolStatus = Active; private bool IsClosed => AtomicRead(ref _poolStatus) == Closed; - private bool IsZombie => AtomicRead(ref _poolStatus) == Zombie; - private bool IsZombieOrClosed => AtomicRead(ref _poolStatus) != Open; + private bool IsInactive => AtomicRead(ref _poolStatus) == Inactive; + private bool IsInactiveOrClosed => AtomicRead(ref _poolStatus) != Active; private int _poolSize = 0; private readonly int _maxPoolSize; @@ -263,7 +263,7 @@ private IPooledConnection Acquire(CancellationToken cancellationToken) { ThrowObjectDisposedException(); } - else if (IsZombie) + else if (IsInactive) { ThrowClientExceptionDueToZombified(); } @@ -353,7 +353,7 @@ private Task AcquireAsync(CancellationToken cancellationToken) { ThrowObjectDisposedException(); } - else if (IsZombie) + else if (IsInactive) { ThrowClientExceptionDueToZombified(); } @@ -445,7 +445,7 @@ public void Release(IPooledConnection connection) return; } - if (!_connectionValidator.IsConnectionReusable(connection) || IsIdlePoolFull() || IsZombieOrClosed) + if (!_connectionValidator.IsConnectionReusable(connection) || IsIdlePoolFull() || IsInactiveOrClosed) { // cannot return to the idle pool DestroyConnection(connection); @@ -455,7 +455,7 @@ public void Release(IPooledConnection connection) // Add back to the idle pool _idleConnections.Add(connection); // Just dequeue any one connection and close it will ensure that all connections in the pool will finally be closed - if (IsZombieOrClosed && _idleConnections.TryTake(out connection)) + if (IsInactiveOrClosed && _idleConnections.TryTake(out connection)) { DestroyConnection(connection); } @@ -479,7 +479,7 @@ public Task ReleaseAsync(IPooledConnection connection) } if (!await _connectionValidator.IsConnectionReusableAsync(connection).ConfigureAwait(false) - || IsIdlePoolFull() || IsZombieOrClosed) + || IsIdlePoolFull() || IsInactiveOrClosed) { // cannot return to idle pool await DestroyConnectionAsync(connection).ConfigureAwait(false); @@ -489,7 +489,7 @@ public Task ReleaseAsync(IPooledConnection connection) // Add back to idle pool _idleConnections.Add(connection); // Just dequeue any one connection and close it will ensure that all connections in the pool will finally be closed - if (IsZombieOrClosed && _idleConnections.TryTake(out connection)) + if (IsInactiveOrClosed && _idleConnections.TryTake(out connection)) { await DestroyConnectionAsync(connection).ConfigureAwait(false); } @@ -558,7 +558,7 @@ public Task CloseAsync() public void Deactivate() { - if (Interlocked.CompareExchange(ref _poolStatus, Zombie, Open) == Open) + if (Interlocked.CompareExchange(ref _poolStatus, Inactive, Active) == Active) { TerminateIdleConnections(); } @@ -566,7 +566,7 @@ public void Deactivate() public Task DeactivateAsync() { - if (Interlocked.CompareExchange(ref _poolStatus, Zombie, Open) == Open) + if (Interlocked.CompareExchange(ref _poolStatus, Inactive, Active) == Active) { return Task.WhenAll(TerminateIdleConnectionsAsync()); } @@ -575,7 +575,7 @@ public Task DeactivateAsync() public void Activate() { - Interlocked.CompareExchange(ref _poolStatus, Open, Zombie); + Interlocked.CompareExchange(ref _poolStatus, Active, Inactive); } private void TerminateIdleConnections() From 25dea0fcc0f3f1a26cd3ba2a975bb1659899dfa8 Mon Sep 17 00:00:00 2001 From: Zhen Date: Wed, 22 Nov 2017 14:37:56 +0100 Subject: [PATCH 9/9] Fix failing test --- Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs | 8 ++++---- Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs index 52b6051b0..85794e44c 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs @@ -1475,7 +1475,7 @@ public void DeactivateAndThenCloseShouldCloseAllConnections() // cncurrent tests // ConcurrentlyAcquireAndDeactivate [Fact] - public void ReturnConnectionIfAcquiredValidConnectionBeforeZombified() + public void ReturnConnectionIfAcquiredValidConnectionBeforeInactivation() { // Given var idleConnections = new BlockingCollection(); @@ -1500,7 +1500,7 @@ public void ReturnConnectionIfAcquiredValidConnectionBeforeZombified() } [Fact] - public void ErrorIfAcquiredInvalidConnectionBeforeZombified() + public void ErrorIfAcquiredInvalidConnectionBeforeInactivation() { // Given var idleConnections = new BlockingCollection(); @@ -1524,8 +1524,8 @@ public void ErrorIfAcquiredInvalidConnectionBeforeZombified() pool.NumberOfInUseConnections.Should().Be(0); closedConnMock.Verify(x => x.IsOpen, Times.Once); closedConnMock.Verify(x => x.Destroy(), Times.Once); - exception.Should().BeOfType(); - exception.Message.Should().StartWith("Failed to acquire a new connection"); + exception.Should().BeOfType(); + exception.Message.Should().StartWith("Failed to acquire a connection"); } // concurrent test diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs index 86c004dee..e2154b2c0 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs @@ -606,7 +606,7 @@ private void ThrowObjectDisposedException() private void ThrowClientExceptionDueToZombified() { throw new ClientException( - $"Failed to obtain a connection from connection pool for server with URI `{_uri}` " + + $"Failed to acquire a connection from connection pool for server with URI `{_uri}` " + "as this server has already been removed from routing table. " + "Please retry your statement again and you should be routed with a different server from the new routing table. " + "You should not see this error persistenly.");