Skip to content

Commit

Permalink
Add duty cycle tracking to replayer/gapfiller
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-adaptive committed Dec 19, 2024
1 parent 729ba8e commit d6037d5
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ private Replayer newReplayer(
configuration.maxConcurrentSessionReplays(),
clock,
configuration.supportedFixPProtocolType(),
configuration);
configuration,
fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs()));
}

private void newIndexers()
Expand Down Expand Up @@ -372,19 +373,14 @@ private void newArchivingAgent()
senderSequenceNumbers,
replayerCommandQueue,
new FixSessionCodecsFactory(clock, configuration.sessionEpochFractionFormat()),
clock);
clock,
fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs()));
}

final Agent dutyCycleTrackingAgent = new IndexerDutyCycleTracker(
configuration.agentNamePrefix(),
clock,
fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs()));

final List<Agent> agents = new ArrayList<>();
agents.add(inboundIndexer);
agents.add(outboundIndexer);
agents.add(replayer);
agents.add(dutyCycleTrackingAgent);

indexingAgent = new CompositeAgent(agents);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package uk.co.real_logic.artio.engine.logger;

import io.aeron.ExclusivePublication;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.ControlledFragmentHandler;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochNanoClock;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.Pressure;
Expand Down Expand Up @@ -69,16 +71,23 @@ abstract class AbstractReplayer implements Agent, ControlledFragmentHandler

boolean sendStartReplay = true;

protected final EpochNanoClock clock;
private final DutyCycleTracker dutyCycleTracker;

AbstractReplayer(
final ExclusivePublication publication,
final FixSessionCodecsFactory fixSessionCodecsFactory,
final BufferClaim bufferClaim,
final SenderSequenceNumbers senderSequenceNumbers)
final SenderSequenceNumbers senderSequenceNumbers,
final EpochNanoClock clock,
final DutyCycleTracker dutyCycleTracker)
{
this.publication = publication;
this.fixSessionCodecsFactory = fixSessionCodecsFactory;
this.bufferClaim = bufferClaim;
this.senderSequenceNumbers = senderSequenceNumbers;
this.clock = clock;
this.dutyCycleTracker = dutyCycleTracker;
}

boolean trySendStartReplay(final long sessionId, final long connectionId, final long correlationId)
Expand Down Expand Up @@ -108,6 +117,16 @@ boolean trySendStartReplay(final long sessionId, final long connectionId, final
return false;
}

public void onStart()
{
dutyCycleTracker.update(clock.nanoTime());
}

protected void trackDutyCycleTime(final long timeInNs)
{
dutyCycleTracker.measureAndUpdate(timeInNs);
}

public void onClose()
{
publication.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.artio.engine.logger;

import io.aeron.Subscription;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.Header;
import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -63,9 +64,11 @@ public GapFiller(
final SenderSequenceNumbers senderSequenceNumbers,
final ReplayerCommandQueue replayerCommandQueue,
final FixSessionCodecsFactory fixSessionCodecsFactory,
final EpochNanoClock clock)
final EpochNanoClock clock,
final DutyCycleTracker dutyCycleTracker)
{
super(publication.dataPublication(), fixSessionCodecsFactory, new BufferClaim(), senderSequenceNumbers);
super(publication.dataPublication(), fixSessionCodecsFactory, new BufferClaim(), senderSequenceNumbers,
clock, dutyCycleTracker);
this.inboundSubscription = inboundSubscription;
this.publication = publication;
this.agentNamePrefix = agentNamePrefix;
Expand All @@ -76,7 +79,10 @@ public GapFiller(

public int doWork()
{
timestamper.sendTimestampMessage();
final long timeInNs = clock.nanoTime();

trackDutyCycleTime(timeInNs);
timestamper.sendTimestampMessage(timeInNs);

return replayerCommandQueue.poll() + inboundSubscription.controlledPoll(this, POLL_LIMIT);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ class ReplayTimestamper
replayerTimestampEncoder.wrapAndApplyHeader(timestampBuffer, 0, messageHeaderEncoder);
}

void sendTimestampMessage()
void sendTimestampMessage(final long timeInNs)
{
final long timeInNs = clock.nanoTime();
if (timeInNs > nextTimestampMessageInNs)
{
replayerTimestampEncoder.timestamp(timeInNs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.aeron.ExclusivePublication;
import io.aeron.Subscription;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.Header;
import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -106,7 +107,6 @@ public class Replayer extends AbstractReplayer
private final ReplayerCommandQueue replayerCommandQueue;
private final AtomicCounter currentReplayCount;
private final int maxConcurrentSessionReplays;
private final EpochNanoClock clock;
private final EngineConfiguration configuration;
private final ReplayQuery outboundReplayQuery;
private final IdleStrategy idleStrategy;
Expand Down Expand Up @@ -140,9 +140,10 @@ public Replayer(
final int maxConcurrentSessionReplays,
final EpochNanoClock clock,
final FixPProtocolType fixPProtocolType,
final EngineConfiguration configuration)
final EngineConfiguration configuration,
final DutyCycleTracker dutyCycleTracker)
{
super(publication, fixSessionCodecsFactory, bufferClaim, senderSequenceNumbers);
super(publication, fixSessionCodecsFactory, bufferClaim, senderSequenceNumbers, clock, dutyCycleTracker);
this.outboundReplayQuery = outboundReplayQuery;
this.idleStrategy = idleStrategy;
this.errorHandler = errorHandler;
Expand All @@ -157,7 +158,6 @@ public Replayer(
this.replayerCommandQueue = replayerCommandQueue;
this.currentReplayCount = currentReplayCount;
this.maxConcurrentSessionReplays = maxConcurrentSessionReplays;
this.clock = clock;
this.configuration = configuration;

gapFillMessageTypes = packAllMessageTypes(gapfillOnReplayMessageTypes);
Expand Down Expand Up @@ -468,7 +468,10 @@ private FixReplayerSession processFixResendRequest(

public int doWork()
{
timestamper.sendTimestampMessage();
final long timeInNs = clock.nanoTime();

trackDutyCycleTime(timeInNs);
timestamper.sendTimestampMessage(timeInNs);

int work = replayerCommandQueue.poll();
work += pollReplayerChannels();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.aeron.Subscription;
import io.aeron.driver.Configuration;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.ControlledFragmentHandler.Action;
import io.aeron.logbuffer.Header;
Expand Down Expand Up @@ -138,7 +139,8 @@ public void setUp()
DEFAULT_MAX_CONCURRENT_SESSION_REPLAYS,
clock,
FixPProtocolType.ILINK_3,
mock(EngineConfiguration.class));
mock(EngineConfiguration.class),
mock(DutyCycleTracker.class));
}

private void setReplayedMessages(final int replayedMessages)
Expand Down

0 comments on commit d6037d5

Please sign in to comment.