From 317104606827067be7692412e0b612da24dfb641 Mon Sep 17 00:00:00 2001 From: devesh-b1 <132320178+devesh-b1@users.noreply.github.com> Date: Tue, 3 Sep 2024 20:29:04 +0800 Subject: [PATCH] LibraryTimeout publish sessions (#519) --- .../artio/messages/message-schema.xml | 5 ++- .../artio/engine/framer/Framer.java | 5 ++- .../artio/protocol/GatewayPublication.java | 36 ++++++++++++------- .../artio/engine/framer/FramerTest.java | 9 ++++- 4 files changed, 38 insertions(+), 17 deletions(-) diff --git a/artio-codecs/src/main/resources/uk/co/real_logic/artio/messages/message-schema.xml b/artio-codecs/src/main/resources/uk/co/real_logic/artio/messages/message-schema.xml index 77d20f555c..1cbcc21bc9 100644 --- a/artio-codecs/src/main/resources/uk/co/real_logic/artio/messages/message-schema.xml +++ b/artio-codecs/src/main/resources/uk/co/real_logic/artio/messages/message-schema.xml @@ -2,7 +2,7 @@ @@ -499,6 +499,9 @@ description="notifies library instances that they have been timed out, added for monitoring purposes"> + + + inboundPublication.saveLibraryTimeout(libraryId, 0)); - schedule(() -> outboundPublication.saveLibraryTimeout(libraryId, 0)); + schedule(() -> inboundPublication.saveLibraryTimeout(library, 0)); + schedule(() -> outboundPublication.saveLibraryTimeout(library, 0)); } private void acquireLibrarySessions(final LiveLibraryInfo library) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java b/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java index 8fb9b1a60c..79d3393619 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java @@ -27,8 +27,10 @@ import org.agrona.concurrent.status.AtomicCounter; import uk.co.real_logic.artio.DebugLogger; import uk.co.real_logic.artio.dictionary.FixDictionary; +import uk.co.real_logic.artio.engine.ConnectedSessionInfo; import uk.co.real_logic.artio.engine.RecordingCoordinator; import uk.co.real_logic.artio.engine.SessionInfo; +import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.messages.*; import uk.co.real_logic.artio.messages.ControlNotificationEncoder.DisconnectedSessionsEncoder; import uk.co.real_logic.artio.messages.ControlNotificationEncoder.SessionsEncoder; @@ -125,6 +127,8 @@ public class GatewayPublication extends ClaimablePublication private static final int THROTTLE_CONFIGURATION_REPLY_LENGTH = HEADER_LENGTH + ThrottleConfigurationReplyEncoder.BLOCK_LENGTH; private static final int SEQ_INDEX_SYNC_LENGTH = HEADER_LENGTH + SeqIndexSyncEncoder.BLOCK_LENGTH; + private static final int LIBRARY_TIMEOUT_LENGTH = HEADER_LENGTH + LibraryTimeoutEncoder.BLOCK_LENGTH + + GroupSizeEncodingEncoder.ENCODED_LENGTH; private static final boolean APPLICATION_HEARTBEAT_ATTEMPT_ENABLED = isEnabled(APPLICATION_HEARTBEAT_ATTEMPT); private static final boolean APPLICATION_HEARTBEAT_ENABLED = isEnabled(APPLICATION_HEARTBEAT); @@ -966,25 +970,33 @@ public long saveRequestSessionReply(final int libraryId, final SessionReplyStatu return position; } - public long saveLibraryTimeout(final int libraryId, final long connectCorrelationId) + public long saveLibraryTimeout(final LibraryInfo libraryInfo, final long connectCorrelationId) { - final long position = claim(LibraryTimeoutEncoder.BLOCK_LENGTH + HEADER_LENGTH); - if (position < 0) - { - return position; - } + final List connectedSessionInfos = libraryInfo.sessions(); + final int sessionsCount = connectedSessionInfos.size(); - final MutableDirectBuffer buffer = bufferClaim.buffer(); - final int offset = bufferClaim.offset(); + final int framedLength = LIBRARY_TIMEOUT_LENGTH + sessionsCount * + LibraryTimeoutEncoder.SessionsEncoder.sbeBlockLength(); + final ExpandableArrayBuffer buffer = buffer(framedLength); libraryTimeout - .wrapAndApplyHeader(buffer, offset, header) - .libraryId(libraryId) + .wrapAndApplyHeader(buffer, 0, header) + .libraryId(libraryInfo.libraryId()) .connectCorrelationId(connectCorrelationId); - bufferClaim.commit(); + final LibraryTimeoutEncoder.SessionsEncoder sessionsEncoder = libraryTimeout.sessionsCount(sessionsCount); + for (int i = 0; i < sessionsCount; i++) + { + final SessionInfo session = connectedSessionInfos.get(i); + sessionsEncoder.next().sessionId(session.sessionId()); + } - logSbeMessage(GATEWAY_MESSAGE, libraryTimeout); + final long position = dataPublication.offer(buffer, 0, framedLength); + + if (position > 0) + { + logSbeMessage(GATEWAY_MESSAGE, libraryTimeout); + } return position; } diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java index a2086409f5..cd79161ce2 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java @@ -37,6 +37,7 @@ import org.mockito.verification.VerificationMode; import uk.co.real_logic.artio.CloseChecker; import uk.co.real_logic.artio.FixCounters; +import uk.co.real_logic.artio.LivenessDetector; import uk.co.real_logic.artio.Timing; import uk.co.real_logic.artio.dictionary.FixDictionary; import uk.co.real_logic.artio.engine.*; @@ -139,6 +140,12 @@ public class FramerTest private final MutableLong connectionId = new MutableLong(NO_CONNECTION_ID); private final ErrorHandler errorHandler = mock(ErrorHandler.class); + private final LivenessDetector livenessDetector = mock(LivenessDetector.class); + + private final LiveLibraryInfo libraryInfo = new LiveLibraryInfo( + errorHandler, + LIBRARY_ID, LIBRARY_NAME, livenessDetector, 1, + false); @Before @SuppressWarnings("unchecked") @@ -1003,7 +1010,7 @@ private void verifyEndpointsCreated() private void verifyLibraryTimeout() { - verify(inboundPublication).saveLibraryTimeout(LIBRARY_ID, 0); + verify(inboundPublication).saveLibraryTimeout(libraryInfo, 0); } private void libraryHasAcceptedClient() throws IOException