From 761c05911b663ce99f51fcd0f00881c919ef5e5a Mon Sep 17 00:00:00 2001 From: Wojciech Lukowicz Date: Fri, 31 May 2024 16:06:38 +0100 Subject: [PATCH] fix FixReplayerSession state scope Static state won't work if there are multiple Artio instances in a single JVM. --- .../engine/logger/FixReplayerSession.java | 46 +++++++++---------- .../artio/engine/logger/Replayer.java | 4 ++ 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/FixReplayerSession.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/FixReplayerSession.java index 8e12dffb11..01938f4428 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/FixReplayerSession.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/FixReplayerSession.java @@ -63,14 +63,7 @@ private enum State CLOSING } - // Safe to share between multiple instances due to single threaded nature of the replayer - private static final FixMessageEncoder FIX_MESSAGE_ENCODER = new FixMessageEncoder(); - private static final FixMessageDecoder FIX_MESSAGE = new FixMessageDecoder(); - private static final ThrottleRejectDecoder THROTTLE_REJECT = new ThrottleRejectDecoder(); - private static final AsciiBuffer ASCII_BUFFER = new MutableAsciiBuffer(); - private final GapFillEncoder gapFillEncoder; - private final PossDupEnabler possDupEnabler; private final EpochNanoClock clock; private final String message; @@ -147,7 +140,7 @@ MessageTracker messageTracker() private void onPreCommit(final MutableDirectBuffer buffer, final int offset) { final int frameOffset = offset + MessageHeaderEncoder.ENCODED_LENGTH; - FIX_MESSAGE_ENCODER + replayer.fixMessageEncoder .wrap(buffer, frameOffset) .connection(connectionId) .sequenceNumber(headerSeqNum); @@ -199,31 +192,33 @@ private Action onFixMessage( final int offset, final int version) { - FIX_MESSAGE.wrap( + final FixMessageDecoder fixMessageDecoder = replayer.fixMessageDecoder; + fixMessageDecoder.wrap( srcBuffer, offset, actingBlockLength, version); - if (FIX_MESSAGE.status() == MessageStatus.OK) + if (fixMessageDecoder.status() == MessageStatus.OK) { final int metaDataAdjustment = version >= metaDataSinceVersion() ? - metaDataHeaderLength() + FIX_MESSAGE.metaDataLength() : 0; + metaDataHeaderLength() + fixMessageDecoder.metaDataLength() : 0; final int messageFrameBlockLength = MESSAGE_FRAME_BLOCK_LENGTH + metaDataAdjustment; final int messageOffset = srcOffset + messageFrameBlockLength; final int messageLength = srcLength - messageFrameBlockLength; final int msgSeqNum = sequenceNumberExtractor.extract(srcBuffer, messageOffset, messageLength); - final long messageType = MessageTypeExtractor.getMessageType(FIX_MESSAGE); + final long messageType = MessageTypeExtractor.getMessageType(fixMessageDecoder); - ASCII_BUFFER.wrap(srcBuffer); + final AsciiBuffer asciiBuffer = replayer.sessionAsciiBuffer; + asciiBuffer.wrap(srcBuffer); replayHandler.onReplayedMessage( - ASCII_BUFFER, + asciiBuffer, messageOffset, messageLength, - FIX_MESSAGE.libraryId(), - FIX_MESSAGE.session(), - FIX_MESSAGE.sequenceIndex(), + fixMessageDecoder.libraryId(), + fixMessageDecoder.session(), + fixMessageDecoder.sequenceIndex(), messageType); if (gapFillMessageTypes.contains(messageType)) @@ -271,12 +266,13 @@ else if (msgSeqNum > lastSeqNo + 1) private Action onThrottleReject( final DirectBuffer srcBuffer, final int actingBlockLength, final int offset, final int version) { - THROTTLE_REJECT.wrap( + final ThrottleRejectDecoder throttleRejectDecoder = replayer.throttleRejectDecoder; + throttleRejectDecoder.wrap( srcBuffer, offset, actingBlockLength, version); - final int msgSeqNum = THROTTLE_REJECT.sequenceNumber(); + final int msgSeqNum = throttleRejectDecoder.sequenceNumber(); if (gapFillMessageTypes.contains(BUSINESS_MESSAGE_REJECT_MESSAGE_TYPE)) { @@ -299,15 +295,15 @@ else if (msgSeqNum > lastSeqNo + 1) sendGapFill(lastSeqNo, msgSeqNum, false); } - final int businessRejectRefIDOffset = THROTTLE_REJECT.limit() + + final int businessRejectRefIDOffset = throttleRejectDecoder.limit() + ThrottleNotificationDecoder.businessRejectRefIDHeaderLength(); throttleRejectBuilder.build( - THROTTLE_REJECT.refMsgType(), - THROTTLE_REJECT.refSeqNum(), - THROTTLE_REJECT.sequenceNumber(), + throttleRejectDecoder.refMsgType(), + throttleRejectDecoder.refSeqNum(), + throttleRejectDecoder.sequenceNumber(), srcBuffer, businessRejectRefIDOffset, - THROTTLE_REJECT.businessRejectRefIDLength(), + throttleRejectDecoder.businessRejectRefIDLength(), true); final Action action = sendFixMessage( @@ -351,7 +347,7 @@ private Action sendFixMessage( final int destOffset = bufferClaim.offset(); final MutableDirectBuffer destBuffer = bufferClaim.buffer(); - FIX_MESSAGE_ENCODER + replayer.fixMessageEncoder .wrapAndApplyHeader(destBuffer, destOffset, replayer.messageHeaderEncoder) .session(this.sessionId) .connection(this.connectionId) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java index 8f81de3684..c6f3c46156 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java @@ -78,6 +78,10 @@ public class Replayer extends AbstractReplayer final CharFormatter completeNotRecentFormatter = new CharFormatter( "ReplayerSession: completeReplay-!upToMostRecent replayedMessages=%s " + "endSeqNo=%s beginSeqNo=%s expectedCount=%s connId=%s"); + final FixMessageEncoder fixMessageEncoder = new FixMessageEncoder(); + final FixMessageDecoder fixMessageDecoder = new FixMessageDecoder(); + final ThrottleRejectDecoder throttleRejectDecoder = new ThrottleRejectDecoder(); + final AsciiBuffer sessionAsciiBuffer = new MutableAsciiBuffer(); // Binary FIXP specific state private final IntHashSet gapfillOnRetransmitILinkTemplateIds;