Skip to content

Commit

Permalink
Implement pipelined begin for executable query (#724)
Browse files Browse the repository at this point in the history
* Implement pipelined begin

* Add unit tests
  • Loading branch information
thelonelyvulpes authored Aug 18, 2023
1 parent d024d70 commit 7c93291
Show file tree
Hide file tree
Showing 16 changed files with 235 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ static SupportedFeatures()
//"Feature:TLS:1.3",
//"Optimization:ConnectionReuse",
"Optimization:EagerTransactionBegin",
"Optimization:ExecuteQueryPipelining",
//"Optimization:ImplicitDefaultArguments",
//"Optimization:MinimalResets",
"Optimization:AuthPipelining",
Expand Down
77 changes: 59 additions & 18 deletions Neo4j.Driver/Neo4j.Driver.Tests/AsyncSessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public async Task ShouldDelegateToProtocolRunAutoCommitTxAsync(bool reactive)
await session.RunAsync("lalalal");

mockConn.Verify(
x => x.RunInAutoCommitTransactionAsync(It.IsAny<AutoCommitParams>(),
x => x.RunInAutoCommitTransactionAsync(
It.IsAny<AutoCommitParams>(),
It.IsAny<INotificationsConfig>()),
Times.Once);
}
Expand Down Expand Up @@ -160,6 +161,27 @@ public async void ShouldNotBeAbleToUseSessionWhileOngoingTransaction()
error.Should().BeOfType<TransactionNestingException>();
}

[Fact]
public async void ShouldDefaultToBlockingTransactionStart()
{
var mockProtocol = new Mock<IBoltProtocol>();
var mockConn = new Mock<IConnection>();
mockConn.Setup(x => x.IsOpen).Returns(true);
mockConn
.SetupGet(x => x.BoltProtocol)
.Returns(mockProtocol.Object);

var session = NewSession(mockConn.Object);
var tx = await session.BeginTransactionAsync();

mockProtocol.Verify(
x =>
x.BeginTransactionAsync(
It.IsAny<IConnection>(),
It.Is<BeginProtocolParams>(y => y.AwaitBeginResult == true)),
Times.Once);
}

[Fact]
public async void ShouldBeAbleToUseSessionAgainWhenTransactionIsClosed()
{
Expand Down Expand Up @@ -205,11 +227,7 @@ public async void ShouldCloseConnectionOnRunIfBeginTxFailed()
x =>
x.BeginTransactionAsync(
It.IsAny<IConnection>(),
It.IsAny<string>(),
It.IsAny<Bookmarks>(),
It.IsAny<TransactionConfig>(),
It.IsAny<Driver.SessionConfig>(),
It.IsAny<INotificationsConfig>()))
It.IsAny<BeginProtocolParams>()))
.Throws(new IOException("Triggered an error when beginTx"));

var session = NewSession(mockConn.Object);
Expand All @@ -234,11 +252,7 @@ public async void ShouldCloseConnectionOnNewBeginTxIfBeginTxFailed()
x =>
x.BeginTransactionAsync(
It.IsAny<IConnection>(),
It.IsAny<string>(),
It.IsAny<Bookmarks>(),
It.IsAny<TransactionConfig>(),
It.IsAny<Driver.SessionConfig>(),
It.IsAny<INotificationsConfig>()))
It.IsAny<BeginProtocolParams>()))
.Returns(Task.CompletedTask)
.Callback(
() =>
Expand All @@ -263,6 +277,38 @@ public async void ShouldCloseConnectionOnNewBeginTxIfBeginTxFailed()
}
}

public class PipelinedRunTransactionMethod
{
[Fact]
public async void PipelinedShouldBeginWithoutBlocking()
{
var mockProtocol = new Mock<IBoltProtocol>();
var mockConn = new Mock<IConnection>();
mockConn.Setup(x => x.IsOpen).Returns(true);

mockConn
.SetupGet(x => x.BoltProtocol)
.Returns(mockProtocol.Object);

var session = new AsyncSession(
new TestConnectionProvider(mockConn.Object),
null,
new AsyncRetryLogic(TimeSpan.Zero, null),
0,
new Driver.SessionConfig(),
false);

await session.PipelinedExecuteReadAsync(_ => Task.FromResult(null as EagerResult<IRecord[]>));

mockProtocol.Verify(
x =>
x.BeginTransactionAsync(
It.IsAny<IConnection>(),
It.Is<BeginProtocolParams>(y => y.AwaitBeginResult == false)),
Times.Once);
}
}

public class CloseAsyncMethod
{
[Fact]
Expand All @@ -274,11 +320,7 @@ public async void ShouldCloseConnectionIfBeginTxFailed()
x =>
x.BeginTransactionAsync(
It.IsAny<IConnection>(),
It.IsAny<string>(),
It.IsAny<Bookmarks>(),
It.IsAny<TransactionConfig>(),
It.IsAny<Driver.SessionConfig>(),
It.IsAny<INotificationsConfig>()))
It.IsAny<BeginProtocolParams>()))
.Throws(new IOException("Triggered an error when beginTx"));

var session = NewSession(mockConn.Object);
Expand All @@ -295,7 +337,7 @@ public async void ShouldCloseTxOnCloseAsync()
var mockProtocol = new Mock<IBoltProtocol>();
var mockConn = NewMockedConnection(mockProtocol);
var session = NewSession(mockConn.Object);
var _ = await session.BeginTransactionAsync();
await session.BeginTransactionAsync();
await session.CloseAsync();

mockProtocol.Verify(x => x.RollbackTransactionAsync(It.IsAny<IConnection>()), Times.Once);
Expand Down Expand Up @@ -393,7 +435,6 @@ public Task<IServerInfo> VerifyConnectivityAndGetInfoAsync()
AuthTokenManagers.Static(AuthTokens.None),
Config.Default);


public Task<bool> SupportsMultiDbAsync()
{
return Task.FromResult(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,11 +838,13 @@ public async Task ShouldThrowWhenUsingImpersonatedUserWithBoltVersionLessThan44(
var exception = await Record.ExceptionAsync(
() => BoltProtocol.Instance.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
"db",
Bookmarks.Empty,
TransactionConfig.Default,
new SessionConfig("Douglas Fir"),
null));
null,
true)));

exception.Should().BeOfType<ArgumentException>();
}
Expand All @@ -861,11 +863,13 @@ public async Task ShouldThrowWhenNotificationsWithBoltVersionLessThan52(int majo
var exception = await Record.ExceptionAsync(
() => BoltProtocol.Instance.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
"db",
null,
TransactionConfig.Default,
null,
new NotificationsDisabledConfig()));
new NotificationsDisabledConfig(),
true)));

exception.Should().BeOfType<ArgumentOutOfRangeException>();
}
Expand All @@ -882,11 +886,13 @@ public async Task ShouldNotThrowWhenNotificationsWithBoltVersionGreaterThan51(in
var exception = await Record.ExceptionAsync(
() => BoltProtocol.Instance.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
"db",
null,
TransactionConfig.Default,
null,
new NotificationsDisabledConfig()));
new NotificationsDisabledConfig(),
true)));

exception.Should().BeNull();
}
Expand All @@ -904,11 +910,13 @@ public async Task ShouldNotThrowWhenImpersonatingUserWithBoltVersionGreaterThan4
var exception = await Record.ExceptionAsync(
() => BoltProtocol.Instance.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
"db",
Bookmarks.Empty,
TransactionConfig.Default,
new SessionConfig("Douglas Fir"),
null));
null,
true)));

exception.Should().BeNull();
}
Expand All @@ -928,25 +936,21 @@ public async Task ShouldDelegateLogicToV3()

await protocol.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
"db",
bookmarks,
config,
sessionConfig,
null);
null,
true));

mockV3.Verify(
x =>
x.BeginTransactionAsync(mockConn.Object, "db", bookmarks, config, sessionConfig, null),
x => x.BeginTransactionAsync(mockConn.Object,
new BeginProtocolParams("db", bookmarks, config, sessionConfig, null, true)),
Times.Once);

mockConn.Verify(
x =>
x.BeginTransactionAsync(
It.IsAny<string>(),
It.IsAny<Bookmarks>(),
It.IsAny<TransactionConfig>(),
It.IsAny<SessionConfig>(),
It.IsAny<INotificationsConfig>()),
x => x.BeginTransactionAsync(It.IsAny<BeginProtocolParams>()),
Times.Never);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,13 @@ public async Task ShouldThrowIfImpersonatedUserIsNotNull()
var exception = await Record.ExceptionAsync(
() => BoltProtocolV3.Instance.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
null,
null,
TransactionConfig.Default,
null,
null));
null,
true)));

exception.Should().BeOfType<ArgumentException>();
}
Expand All @@ -416,11 +418,13 @@ public async Task ShouldThrowWhenNotificationsWithBoltVersionLessThan52(int majo
var exception = await Record.ExceptionAsync(
() => BoltProtocolV3.Instance.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
null,
null,
TransactionConfig.Default,
null,
new NotificationsDisabledConfig()));
new NotificationsDisabledConfig(),
true)));

exception.Should().BeOfType<ArgumentOutOfRangeException>();
}
Expand All @@ -437,11 +441,13 @@ public async Task ShouldNotThrowWhenNotificationsWithBoltVersionGreaterThan51(in
var exception = await Record.ExceptionAsync(
() => BoltProtocolV3.Instance.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
null,
null,
TransactionConfig.Default,
null,
new NotificationsDisabledConfig()));
new NotificationsDisabledConfig(),
true)));

exception.Should().BeNull();
}
Expand All @@ -455,11 +461,13 @@ public async Task ShouldThrowIfDatabaseIsNotNull()
var exception = await Record.ExceptionAsync(
() => BoltProtocolV3.Instance.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
"db",
null,
TransactionConfig.Default,
null,
null));
null,
true)));

exception.Should().BeOfType<ClientException>();
}
Expand All @@ -474,11 +482,13 @@ public async Task ShouldThrowIfConnectionModeIsNull()
var exception = await Record.ExceptionAsync(
() => BoltProtocolV3.Instance.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
null,
null,
TransactionConfig.Default,
null,
null));
null,
true)));

exception.Should().BeOfType<InvalidOperationException>();
}
Expand Down Expand Up @@ -508,11 +518,13 @@ public async Task ShouldSyncBeginMessage()
var bookmarks = new InternalBookmarks();
await protocol.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
null,
bookmarks,
tc,
null,
null);
null,
true));

msgFactory.Verify(
x => x.NewBeginMessage(mockConn.Object, null, bookmarks, tc, AccessMode.Write, null),
Expand All @@ -525,6 +537,51 @@ await protocol.BeginTransactionAsync(
mockConn.Verify(x => x.SyncAsync(), Times.Once);
mockConn.Verify(x => x.SendAsync(), Times.Never);
}

[Fact]
public async Task ShouldNotSyncBeginMessageWhenPipelining()
{
var mockConn = new Mock<IConnection>();
mockConn.SetupGet(x => x.Version).Returns(BoltProtocolVersion.V3_0);
mockConn.SetupGet(x => x.Mode).Returns(AccessMode.Write);

var fakeMessage = new BeginMessage(null, null, null, null, AccessMode.Write, null, null);
var msgFactory = new Mock<IBoltProtocolMessageFactory>();
msgFactory.Setup(
x => x.NewBeginMessage(
It.IsAny<IConnection>(),
It.IsAny<string>(),
It.IsAny<Bookmarks>(),
It.IsAny<TransactionConfig>(),
It.IsAny<AccessMode>(),
It.IsAny<INotificationsConfig>()))
.Returns(fakeMessage);

var protocol = new BoltProtocolV3(msgFactory.Object);

var tc = new TransactionConfig();
var bookmarks = new InternalBookmarks();
await protocol.BeginTransactionAsync(
mockConn.Object,
new BeginProtocolParams(
null,
bookmarks,
tc,
null,
null,
false));

msgFactory.Verify(
x => x.NewBeginMessage(mockConn.Object, null, bookmarks, tc, AccessMode.Write, null),
Times.Once);

mockConn.Verify(
x => x.EnqueueAsync(fakeMessage, NoOpResponseHandler.Instance),
Times.Once);

mockConn.Verify(x => x.SyncAsync(), Times.Never);
mockConn.Verify(x => x.SendAsync(), Times.Never);
}
}

public class RunInExplicitTransactionAsync
Expand Down
Loading

0 comments on commit 7c93291

Please sign in to comment.