Skip to content

Commit

Permalink
Merge pull request #260 from zhenlineo/1.5-purge-improve
Browse files Browse the repository at this point in the history
Do not purge inUse connections
  • Loading branch information
zhenlineo authored Nov 23, 2017
2 parents 8772b73 + 25dea0f commit f0fbf34
Show file tree
Hide file tree
Showing 15 changed files with 1,034 additions and 319 deletions.
727 changes: 591 additions & 136 deletions Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void ShouldEnsureInitialRouter()
var pool = new ClusterConnectionPool(connSettings, poolSettings, bufferSettings, uris, null);

pool.ToString().Should().Be(
"[{bolt://123:456/ : _availableConnections: {[]}, _inUseConnections: {[]}}]");
"[{bolt://123:456/ : _idleConnections: {[]}, _inUseConnections: {[]}}]");
}
}

Expand Down Expand Up @@ -139,7 +139,7 @@ public void ShouldAddNewConnectionPoolIfDoesNotExist()
var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict);

// When
pool.Update(new[] {ServerUri});
pool.Update(new[] {ServerUri}, new Uri[0]);

// Then
connectionPoolDict.Count.Should().Be(1);
Expand All @@ -148,7 +148,7 @@ public void ShouldAddNewConnectionPoolIfDoesNotExist()
}

[Fact]
public void ShouldRemoveNewlyCreatedPoolnIfDisposeAlreadyCalled()
public void ShouldRemoveNewlyCreatedPoolIfDisposeAlreadyCalled()
{
// Given
var mockedConnectionPool = new Mock<IConnectionPool>();
Expand All @@ -157,7 +157,7 @@ public void ShouldRemoveNewlyCreatedPoolnIfDisposeAlreadyCalled()

// When
pool.Dispose();
var exception = Record.Exception(() => pool.Update(new[] {ServerUri}));
var exception = Record.Exception(() => pool.Update(new[] {ServerUri}, new Uri[0]));

// Then
mockedConnectionPool.Verify(x => x.Close());
Expand All @@ -173,20 +173,40 @@ public void ShouldRemoveServerPoolIfNotPresentInNewServers()
var mockedConnectionPool = new Mock<IConnectionPool>();
var connectionPoolDict = new ConcurrentDictionary<Uri, IConnectionPool>();
connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object);
mockedConnectionPool.Setup(x => x.NumberOfInUseConnections).Returns(0); // no need to explicitly config this
var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict);

// When
pool.Update(new Uri[0]);
pool.Update(new Uri[0], new[] {ServerUri});

// Then
mockedConnectionPool.Verify(x => x.Deactivate(), Times.Once); // first deactivate then remove
connectionPoolDict.Count.Should().Be(0);
}

[Fact]
public void ShouldDeactivateServerPoolIfNotPresentInNewServersButHasInUseConnections()
{
// Given
var mockedConnectionPool = new Mock<IConnectionPool>();
var connectionPoolDict = new ConcurrentDictionary<Uri, IConnectionPool>();
connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object);
mockedConnectionPool.Setup(x => x.NumberOfInUseConnections).Returns(10); // non-zero number
var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict);

// When
pool.Update(new Uri[0], new[] {ServerUri});

// Then
mockedConnectionPool.Verify(x => x.Deactivate(), Times.Once);
connectionPoolDict.Count.Should().Be(1);
}
}

public class PurgeMethod
public class AddMethod
{
[Fact]
public void ShouldRemovedIfExist()
public void ShouldActivateIfExist()
{
// Given
var mockedConnectionPool = new Mock<IConnectionPool>();
Expand All @@ -196,24 +216,64 @@ public void ShouldRemovedIfExist()
var pool = new ClusterConnectionPool(null, connectionPoolDict);

// When
pool.Purge(ServerUri);
pool.Add(new []{ServerUri});

// Then
mockedConnectionPool.Verify(x => x.Close(), Times.Once);
connectionPoolDict.Count.Should().Be(0);
connectionPoolDict.ContainsKey(ServerUri).Should().BeFalse();
mockedConnectionPool.Verify(x => x.Activate(), Times.Once);
connectionPoolDict.Count.Should().Be(1);
connectionPoolDict.ContainsKey(ServerUri).Should().BeTrue();
}

[Fact]
public void ShouldAddIfNotFound()
{
// Given
var connectionPoolDict = new ConcurrentDictionary<Uri, IConnectionPool>();
var fakePoolMock = new Mock<IConnectionPool>();

var pool = new ClusterConnectionPool(fakePoolMock.Object, connectionPoolDict);

// When
pool.Add(new[] {ServerUri});

// Then
connectionPoolDict.Count.Should().Be(1);
connectionPoolDict.ContainsKey(ServerUri).Should().BeTrue();
connectionPoolDict[ServerUri].Should().Be(fakePoolMock.Object);
}
}

public class DeactivateMethod
{
[Fact]
public void ShouldDeactivateIfExist()
{
// Given
var mockedConnectionPool = new Mock<IConnectionPool>();
var connectionPoolDict = new ConcurrentDictionary<Uri, IConnectionPool>();
connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object);

var pool = new ClusterConnectionPool(null, connectionPoolDict);

// When
pool.Deactivate(ServerUri);

// Then
mockedConnectionPool.Verify(x => x.Deactivate(), Times.Once);
connectionPoolDict.Count.Should().Be(1);
connectionPoolDict.ContainsKey(ServerUri).Should().BeTrue();
}

[Fact]
public void ShouldRemoveNothingIfNotFound()
public void ShouldDeactivateNothingIfNotFound()
{
// Given
var connectionPoolDict = new ConcurrentDictionary<Uri, IConnectionPool>();

var pool = new ClusterConnectionPool(null, connectionPoolDict);

// When
pool.Purge(ServerUri);
pool.Deactivate(ServerUri);

// Then
connectionPoolDict.Count.Should().Be(0);
Expand Down Expand Up @@ -273,4 +333,4 @@ public void ShouldReturnCorrectCountForPresentAddress()
}
}
}
}
}
12 changes: 6 additions & 6 deletions Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void ShouldRmoveFromLoadBalancer()
var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object);

loadBalancer.OnConnectionError(uri, new ClientException());
clusterPoolMock.Verify(x => x.Purge(uri), Times.Once);
clusterPoolMock.Verify(x => x.Deactivate(uri), Times.Once);
routingTableMock.Verify(x => x.Remove(uri), Times.Once);
routingTableMock.Verify(x => x.RemoveWriter(uri), Times.Never);
}
Expand All @@ -63,7 +63,7 @@ public void ShouldRemoveWriterFromRoutingTable()
var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object);

loadBalancer.OnWriteError(uri);
clusterPoolMock.Verify(x => x.Purge(uri), Times.Never);
clusterPoolMock.Verify(x => x.Deactivate(uri), Times.Never);
routingTableMock.Verify(x => x.Remove(uri), Times.Never);
routingTableMock.Verify(x => x.RemoveWriter(uri), Times.Once);
}
Expand Down Expand Up @@ -141,7 +141,7 @@ public void ShouldForgetServerWhenFailedToEstablishConn(AccessMode mode)

// should be removed
routingTableMock.Verify(m => m.Remove(uri), Times.Once);
clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Once);
clusterConnPoolMock.Verify(m => m.Deactivate(uri), Times.Once);
}

[Theory]
Expand Down Expand Up @@ -170,7 +170,7 @@ public void ShouldThrowErrorDirectlyIfSecurityError(AccessMode mode)

// while the server is not removed
routingTableMock.Verify(m => m.Remove(uri), Times.Never);
clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never);
clusterConnPoolMock.Verify(m => m.Deactivate(uri), Times.Never);
}

[Theory]
Expand Down Expand Up @@ -199,7 +199,7 @@ public void ShouldThrowErrorDirectlyIfProtocolError(AccessMode mode)

// while the server is not removed
routingTableMock.Verify(m => m.Remove(uri), Times.Never);
clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never);
clusterConnPoolMock.Verify(m => m.Deactivate(uri), Times.Never);
}

[Theory]
Expand Down Expand Up @@ -253,4 +253,4 @@ private static IConnection NewConnectionMock(Uri uri)
}
}
}
}
}
Loading

0 comments on commit f0fbf34

Please sign in to comment.