Skip to content

Commit

Permalink
[Java] Upgrade to Aeron 1.44.0.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Apr 5, 2024
1 parent 142ea17 commit 027469a
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ public static File recordingIdsFile(final EngineConfiguration configuration)
if (configuration.logAnyMessages())
{
counters = this.aeron.countersReader();
framerInboundLookup = new RecordingIdLookup(archiverIdleStrategy, counters);
framerOutboundLookup = new RecordingIdLookup(archiverIdleStrategy, counters);
indexerInboundLookup = new RecordingIdLookup(archiverIdleStrategy, counters);
indexerOutboundLookup = new RecordingIdLookup(archiverIdleStrategy, counters);
framerInboundLookup = new RecordingIdLookup(archive.archiveId(), archiverIdleStrategy, counters);
framerOutboundLookup = new RecordingIdLookup(archive.archiveId(), archiverIdleStrategy, counters);
indexerInboundLookup = new RecordingIdLookup(archive.archiveId(), archiverIdleStrategy, counters);
indexerOutboundLookup = new RecordingIdLookup(archive.archiveId(), archiverIdleStrategy, counters);
}
else
{
Expand Down Expand Up @@ -522,7 +522,7 @@ private boolean startRecording(

private boolean recordingAlreadyStarted(final int sessionId)
{
return RecordingPos.findCounterIdBySession(counters, sessionId) != NULL_VALUE;
return RecordingPos.findCounterIdBySession(counters, sessionId, archive.archiveId()) != NULL_VALUE;
}

// awaits the recording start and saves the file
Expand Down Expand Up @@ -603,7 +603,7 @@ private void awaitRecordingsCompletion(
final List<CompletingRecording> completingRecordings = new ArrayList<>();
aeronSessionIdToCompletionPosition.forEachLong((sessionId, completionPosition) ->
{
final int counterId = RecordingPos.findCounterIdBySession(counters, (int)sessionId);
final int counterId = RecordingPos.findCounterIdBySession(counters, (int)sessionId, archive.archiveId());
// Recording has completed
if (counterId != NULL_COUNTER_ID)
{
Expand All @@ -630,7 +630,7 @@ private void shutdownArchiver()
it.next();
final long registrationId = it.getLongKey();
final long recordingId = it.getLongValue();
final int counterId = RecordingPos.findCounterIdByRecording(counters, recordingId);
final int counterId = RecordingPos.findCounterIdByRecording(counters, recordingId, archive.archiveId());
archive.stopRecording(registrationId);
if (counterId != NULL_COUNTER_ID)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package uk.co.real_logic.artio.engine.logger;

import io.aeron.Aeron;
import io.aeron.archive.status.RecordingPos;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.concurrent.IdleStrategy;
Expand All @@ -26,13 +27,30 @@
public class RecordingIdLookup
{
private final Long2LongHashMap aeronSessionIdToRecordingId = new Long2LongHashMap(NULL_RECORDING_ID);
private final long archiveId;
private final IdleStrategy archiverIdleStrategy;
private final CountersReader counters;

/**
*
* @param archiverIdleStrategy idle strategy.
* @param counters reader.
* @deprecated Use {@link #RecordingIdLookup(long, IdleStrategy, CountersReader)} instead.
*/
@Deprecated
public RecordingIdLookup(
final IdleStrategy archiverIdleStrategy,
final CountersReader counters)
{
this(Aeron.NULL_VALUE, archiverIdleStrategy, counters);
}

public RecordingIdLookup(
final long archiveId,
final IdleStrategy archiverIdleStrategy,
final CountersReader counters)
{
this.archiveId = archiveId;
this.archiverIdleStrategy = archiverIdleStrategy;
this.counters = counters;
}
Expand Down Expand Up @@ -67,7 +85,7 @@ long findRecordingId(final int aeronSessionId)

private long checkRecordingId(final int aeronSessionId)
{
final int counterId = RecordingPos.findCounterIdBySession(counters, aeronSessionId);
final int counterId = RecordingPos.findCounterIdBySession(counters, aeronSessionId, archiveId);
if (counterId == NULL_COUNTER_ID)
{
return NULL_RECORDING_ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ int replayedMessages()

private boolean archivingNotComplete(final long endPosition, final long recordingId)
{
final int counterId = RecordingPos.findCounterIdByRecording(countersReader, recordingId);
final int counterId =
RecordingPos.findCounterIdByRecording(countersReader, recordingId, aeronArchive.archiveId());

// wait if the recording is active - otherwise assume that the recording has complete.
if (counterId != CountersReader.NULL_COUNTER_ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public void setUp()
mediaDriver = TestFixtures.launchMediaDriver();
aeronArchive = AeronArchive.connect(aeronArchiveContext());

recordingIdLookup = new RecordingIdLookup(new YieldingIdleStrategy(), aeron().countersReader());
recordingIdLookup =
new RecordingIdLookup(aeronArchive.archiveId(), new YieldingIdleStrategy(), aeron().countersReader());

aeronArchive.startRecording(CHANNEL, STREAM_ID, SourceLocation.LOCAL);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public void setUp()

deleteFiles();

recordingIdLookup = new RecordingIdLookup(YieldingIdleStrategy.INSTANCE, aeron.countersReader());
recordingIdLookup =
new RecordingIdLookup(aeronArchive.archiveId(), YieldingIdleStrategy.INSTANCE, aeron.countersReader());
writer = newWriter(inMemoryBuffer);
reader = new SequenceNumberIndexReader(inMemoryBuffer, errorHandler, recordingIdLookup, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public void setUp()
mediaDriver = TestFixtures.launchMediaDriver();
aeronArchive = AeronArchive.connect(aeronArchiveContext());

recordingIdLookup = new RecordingIdLookup(new YieldingIdleStrategy(), aeron().countersReader());
recordingIdLookup =
new RecordingIdLookup(aeronArchive.archiveId(), new YieldingIdleStrategy(), aeron().countersReader());

aeronArchive.startRecording(CHANNEL, STREAM_ID, SourceLocation.LOCAL);

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def byteBuddyVersion = '1.14.13'

def agronaVersion = '1.21.1'
def sbeVersion = '1.31.0'
def aeronVersion = '1.43.0'
def aeronVersion = '1.44.0'
def artioGroup = 'uk.co.real-logic'
def iLink3Enabled = false

Expand Down

0 comments on commit 027469a

Please sign in to comment.