forked from AdaptiveConsulting/Aeron.NET
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPing.cs
128 lines (107 loc) · 5.44 KB
/
Ping.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*
* Copyright 2014 - 2017 Adaptive Financial Consulting Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Diagnostics;
using System.Threading;
using Adaptive.Aeron.LogBuffer;
using Adaptive.Aeron.Samples.Common;
using Adaptive.Agrona;
using Adaptive.Agrona.Concurrent;
using Adaptive.Agrona.Util;
using HdrHistogram;
namespace Adaptive.Aeron.Samples.Ping
{
public class Ping
{
private static readonly string PingChannel = SampleConfiguration.PING_CHANNEL;
private static readonly string PongChannel = SampleConfiguration.PONG_CHANNEL;
private static readonly int PingStreamID = SampleConfiguration.PING_STREAM_ID;
private static readonly int PongStreamID = SampleConfiguration.PONG_STREAM_ID;
private static readonly int NumberOfMessages = SampleConfiguration.NUMBER_OF_MESSAGES;
private static readonly int WarmupNumberOfMessages = SampleConfiguration.WARMUP_NUMBER_OF_MESSAGES;
private static readonly int WarmupNumberOfIterations = SampleConfiguration.WARMUP_NUMBER_OF_ITERATIONS;
private static readonly int MessageLength = SampleConfiguration.MESSAGE_LENGTH;
private static readonly int FragmentCountLimit = SampleConfiguration.FRAGMENT_COUNT_LIMIT;
private static readonly LongHistogram Histogram = new LongHistogram(NanoUtil.FromSeconds(10), 3);
private static readonly CountdownEvent Latch = new CountdownEvent(1);
private static readonly IIdleStrategy PollingIdleStrategy = new BusySpinIdleStrategy();
public static void Main()
{
var ctx = new Aeron.Context()
.AvailableImageHandler(AvailablePongImageHandler);
var fragmentAssembler = new FragmentAssembler(PongHandler);
Console.WriteLine("Publishing Ping at " + PingChannel + " on stream Id " + PingStreamID);
Console.WriteLine("Subscribing Pong at " + PongChannel + " on stream Id " + PongStreamID);
Console.WriteLine("Message length of " + MessageLength + " bytes");
using (var aeron = Aeron.Connect(ctx))
{
Console.WriteLine("Warming up... " + WarmupNumberOfIterations + " iterations of " + WarmupNumberOfMessages + " messages");
using (var publication = aeron.AddPublication(PingChannel, PingStreamID))
using (var subscription = aeron.AddSubscription(PongChannel, PongStreamID))
using (var byteBuffer = BufferUtil.AllocateDirectAligned(MessageLength, BitUtil.CACHE_LINE_LENGTH))
using (var atomicBuffer = new UnsafeBuffer(byteBuffer))
{
Latch.Wait();
for (var i = 0; i < WarmupNumberOfIterations; i++)
{
RoundTripMessages(atomicBuffer, fragmentAssembler.OnFragment, publication, subscription, WarmupNumberOfMessages);
}
Thread.Sleep(100);
do
{
Histogram.Reset();
Console.WriteLine("Pinging " + NumberOfMessages + " messages");
RoundTripMessages(atomicBuffer, fragmentAssembler.OnFragment, publication, subscription, NumberOfMessages);
Console.WriteLine("Histogram of RTT latencies in microseconds.");
Histogram.OutputPercentileDistribution(Console.Out, outputValueUnitScalingRatio: 1000);
} while (Console.Read() == 'y');
}
}
}
private static void RoundTripMessages(UnsafeBuffer buffer,
FragmentHandler fragmentHandler, Publication publication, Subscription subscription, int count)
{
for (var i = 0; i < count; i++)
{
do
{
buffer.PutLong(0, Stopwatch.GetTimestamp());
} while (publication.Offer(buffer, 0, MessageLength) < 0L);
PollingIdleStrategy.Reset();
while (subscription.Poll(fragmentHandler, FragmentCountLimit) <= 0)
{
PollingIdleStrategy.Idle();
}
}
}
private static void PongHandler(IDirectBuffer buffer, int offset, int length, Header header)
{
var pingTimestamp = buffer.GetLong(offset);
var rttNs = Stopwatch.GetTimestamp() - pingTimestamp;
var b = rttNs*1000*1000*1000d/Stopwatch.Frequency;
Histogram.RecordValue((long) b);
}
private static void AvailablePongImageHandler(Image image)
{
var subscription = image.Subscription;
Console.WriteLine($"Available image: channel={subscription.Channel} streamId={subscription.StreamId} session={image.SessionId}");
if (PongStreamID == subscription.StreamId && PongChannel.Equals(subscription.Channel))
{
Latch.Signal();
}
}
}
}