Skip to content

Commit

Permalink
Merge pull request #218 from ali-ince/1.5-async-perf-improvement
Browse files Browse the repository at this point in the history
1.5 async performance improvement
  • Loading branch information
zhenlineo authored Aug 16, 2017
2 parents 106cd98 + 4b80d54 commit 6b42230
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 231 deletions.
107 changes: 5 additions & 102 deletions Neo4j.Driver/Neo4j.Driver.Tests/IO/ChunkReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void ShouldCleanupChunkBufferIfItExceedsMaxChunkBufferSize()
real.Should().HaveCount(ushort.MaxValue);
real.Should().Contain(0);

chunkBuffer.Length.Should().Be(2);
chunkBuffer.Length.Should().Be(0);
chunkBuffer.Position.Should().Be(0);
}

[Fact]
Expand All @@ -188,109 +189,11 @@ public async void ShouldCleanupChunkBufferIfItExceedsMaxChunkBufferSizeAsync()
real.Should().HaveCount(ushort.MaxValue);
real.Should().Contain(0);

chunkBuffer.Length.Should().Be(2);
chunkBuffer.Length.Should().Be(0);
chunkBuffer.Position.Should().Be(0);
}

}

//public class ReadBSyteMethod
//{
// [Theory]
// [InlineData(new byte[] {0x00, 0x01, 0x80, 0x00, 0x00}, sbyte.MinValue)]
// [InlineData(new byte[] {0x00, 0x01, 0x7F, 0x00, 0x00}, sbyte.MaxValue)]
// [InlineData(new byte[] {0x00, 0x01, 0x00, 0x00, 0x00}, 0)]
// [InlineData(new byte[] {0x00, 0x01, 0xFF, 0x00, 0x00}, -1)]
// public void ShouldReturnTheCorrectValue(byte[] response, sbyte correctValue)
// {
// var chunkedInput = IOExtensions.CreateChunkedPackStreamReaderFromBytes(response);
// var actual = chunkedInput.NextSByte();
// actual.Should().Be(correctValue); //, $"Got: {actual}, expected: {correctValue}");
// }
//}

//public class MultipleChunksTests
//{
// private readonly ITestOutputHelper _output;

// public MultipleChunksTests(ITestOutputHelper output)
// {
// _output = output;
// }

// [Theory]
// //-----------------------|---head1--|----|---head2---|-----------|--msg end--|
// [InlineData(new byte[] { 0x00, 0x01, 0x00, 0x00, 0x02, 0x01, 0x02, 0x00, 0x00 }, new byte[] { 0x00, 0x01, 0x02})]
// public void ShouldReadMessageAcrossChunks(byte[] input, byte[] correctValue)
// {
// var chunkedInput = IOExtensions.CreateChunkedPackStreamReaderFromBytes(input);

// byte[] actual = chunkedInput.ReadBytes(3);

// actual.Should().Equal(correctValue);
// }

// [Theory]
// //-----------------------|---head1--|----|---head2---|-----------|--msg end--|
// [InlineData(new byte[] { 0x00, 0x01, 0x00, 0x00, 0x02, 0x01, 0x02, 0x00, 0x00 }, new byte[] { 0x00, 0x01, 0x02 })]
// public void ShouldLogBytes(byte[] input, byte[] correctValue)
// {
// var loggerMock = new Mock<ILogger>();
// loggerMock.Setup(x => x.Trace(It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<int>(), It.IsAny<int>()))
// .Callback<string, object[]>((s, o) => _output.WriteLine(s + ((byte[])o[0]).ToHexString(showX: true)));

// var chunkedInput = IOExtensions.CreateChunkedPackStreamReaderFromBytes(input, loggerMock.Object);

// byte[] actual = chunkedInput.ReadBytes(3);
// actual.Should().Equal(correctValue);
// loggerMock.Verify(x => x.Trace("S: ", It.IsAny<byte[]>(), It.IsAny<int>(), It.IsAny<int>()), Times.AtLeastOnce);
// }

// [Theory]
// //-----------------------|---head1--|----|---head2---|-----------|--msg end--|
// [InlineData(new byte[] { 0x00, 0x01, 0x00, 0x00, 0x02, 0x01, 0x02, 0x00, 0x00 }, new byte[] { 0x00, 0x01, 0x02 })]
// public void ShouldReadMessageBiggerThanChunkSize(byte[] input, byte[] correctValue)
// {
// var chunkedInput = IOExtensions.CreateChunkedPackStreamReaderFromBytes(input);

// byte[] actual = chunkedInput.ReadBytes(3);
// actual.Should().Equal(correctValue);
// }
//}

//public class ChunkHeaderTests
//{
// private readonly Random _random = new Random();
// public byte Getbyte()
// {
// var num = _random.Next(0, 26); // 0 to 25
// byte letter = (byte)('a' + num);
// return letter;
// }

// [Fact]
// public void ShouldReadHeaderWithinUnsignedShortRange()
// {
// for (var i = 1; i <= UInt16.MaxValue; i = (i << 1) + 1) // i: [0x1, 0xFFFF]
// {
// ushort chunkHeaderSize = (ushort)(i & 0xFFFF);

// var input = new byte[chunkHeaderSize + 2 + 2]; // 0xXX, 0xXX, ..., 0x00, 0x00
// input[0] = (byte)((chunkHeaderSize & 0xFF00) >> 8);
// input[1] = (byte)(chunkHeaderSize & 0xFF);
// for (int j = 2; j < chunkHeaderSize + 2; j++)
// {
// input[j] = Getbyte();
// }

// var chunkedInput = IOExtensions.CreateChunkedPackStreamReaderFromBytes(input);
// byte[] actual = chunkedInput.ReadBytes(chunkHeaderSize);
// for (int j = 0; j < actual.Length; j++)
// {
// actual[j].Should().Be(input[2 + j]);
// }
// }
// }
//}


}
}
41 changes: 0 additions & 41 deletions Neo4j.Driver/Neo4j.Driver/Internal/Connector/IInputStream.cs

This file was deleted.

31 changes: 0 additions & 31 deletions Neo4j.Driver/Neo4j.Driver/Internal/Connector/IOutputStream.cs

This file was deleted.

14 changes: 11 additions & 3 deletions Neo4j.Driver/Neo4j.Driver/Internal/IO/BoltReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void Read(IMessageResponseHandler responseHandler)

_chunkReader.ReadNextMessage(_bufferStream);

ProcessMessage(responseHandler);
ConsumeMessages(responseHandler);
}

public Task ReadAsync(IMessageResponseHandler responseHandler)
Expand All @@ -81,14 +81,22 @@ public Task ReadAsync(IMessageResponseHandler responseHandler)
_chunkReader.ReadNextMessageAsync(_bufferStream)
.ContinueWith(t =>
{
ProcessMessage(responseHandler);
ConsumeMessages(responseHandler);
}, TaskContinuationOptions.ExecuteSynchronously);
}

private void ProcessMessage(IMessageResponseHandler responseHandler)
private void ConsumeMessages(IMessageResponseHandler responseHandler)
{
_bufferStream.Position = 0;

while (_bufferStream.Length > _bufferStream.Position)
{
ProcessMessage(responseHandler);
}
}

private void ProcessMessage(IMessageResponseHandler responseHandler)
{
var structure = (PackStreamStruct)_packStreamReader.Read();

switch (structure.Signature)
Expand Down
Loading

0 comments on commit 6b42230

Please sign in to comment.