diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 20a419ceb5..d76aefca85 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,7 @@ name: Continuous Integration on: + workflow_call: push: branches: - master @@ -10,7 +11,7 @@ on: - master concurrency: - group: ${{ github.workflow }}-${{ github.ref }} + group: ci-${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true env: @@ -27,8 +28,8 @@ jobs: strategy: fail-fast: false matrix: - java: [ '8', '11', '17', '21' ] - os: [ 'ubuntu-22.04', 'windows-latest' ] + java: [ '17', '21' ] + os: [ 'ubuntu-24.04', 'windows-latest' ] steps: - name: Checkout code uses: actions/checkout@v4 @@ -56,11 +57,11 @@ jobs: java -Xinternalversion echo "BUILD_JAVA_HOME=${JAVA_HOME}" >> $GITHUB_ENV echo "BUILD_JAVA_VERSION=${{ matrix.java }}" >> $GITHUB_ENV - - name: Setup java 8 to run the Gradle script + - name: Setup java 17 to run the Gradle script uses: actions/setup-java@v4 with: distribution: 'zulu' - java-version: 8 + java-version: 17 - name: Build with Gradle run: ./gradlew - name: Copy test logs diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 2926e37390..26cd0d926f 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -1,6 +1,7 @@ name: "CodeQL" on: + workflow_call: push: branches: - master @@ -10,7 +11,7 @@ on: - master concurrency: - group: ${{ github.workflow }}-${{ github.ref }} + group: codeql-${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true jobs: @@ -34,15 +35,15 @@ jobs: ref: ${{ github.sha }} - name: Initialize CodeQL - uses: github/codeql-action/init@v2 + uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} config-file: ./.github/codeql/codeql-config.yml - name: Autobuild - uses: github/codeql-action/autobuild@v2 + uses: github/codeql-action/autobuild@v3 - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 + uses: github/codeql-action/analyze@v3 with: category: "/language:${{ matrix.language }}" diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000000..5e62641cb7 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,60 @@ +name: Release + +on: + workflow_dispatch: + push: + tags: + - '*' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: false + +env: + GRADLE_OPTS: '-Dorg.gradle.daemon=false -Dorg.gradle.java.installations.auto-detect=false' + +jobs: + ci: + uses: ./.github/workflows/ci.yml + permissions: + contents: read + + codeql: + uses: ./.github/workflows/codeql.yml + permissions: + actions: read + contents: read + security-events: write + + release: + name: Release java artifacts + permissions: + contents: write + packages: write + needs: [ ci, codeql ] + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + ref: ${{ github.ref }} + - name: Create Release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ github.ref }} + release_name: Release ${{ github.ref }} + draft: true + - name: Setup java + uses: actions/setup-java@v4 + with: + distribution: 'zulu' + java-version: 17 + - name: Publish with Gradle to Open Source + run: ./gradlew publishAllPublicationsToOssRepository + env: + ORG_GRADLE_PROJECT_repoUsername: ${{ secrets.SONATYPE_CENTRAL_USERNAME }} + ORG_GRADLE_PROJECT_repoPassword: ${{ secrets.SONATYPE_CENTRAL_PASSWORD }} + ORG_GRADLE_PROJECT_signingKey: ${{ secrets.GPG_RSA_SIGN_KEY }} + ORG_GRADLE_PROJECT_signingPassword: ${{ secrets.GPG_RSA_SIGN_KEYPASS }} diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000..8f32d942c4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +# the idea of this dockerfile is to allow running artio build in multiple containers in order to increase the chances of a failing test locally +FROM alpine/java:21-jdk as artio-image +ENV GRADLE_OPTS="-Dorg.gradle.daemon=false -Dfix.core.debug=STATE_CLEANUP,FIX_MESSAGE,REPLAY,FIXP_SESSION,FIXP_BUSINESS -Dfix.core.ci=true" +ADD . artio-src +WORKDIR artio-src +ENTRYPOINT ./gradlew clean test diff --git a/artio-binary-entrypoint-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/BinaryEntryPointClient.java b/artio-binary-entrypoint-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/BinaryEntryPointClient.java index 0a5abe7844..f2935dee3d 100644 --- a/artio-binary-entrypoint-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/BinaryEntryPointClient.java +++ b/artio-binary-entrypoint-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/BinaryEntryPointClient.java @@ -28,6 +28,7 @@ import uk.co.real_logic.artio.DebugLogger; import uk.co.real_logic.artio.binary_entrypoint.BinaryEntryPointProtocol; import uk.co.real_logic.sbe.json.JsonPrinter; +import uk.co.real_logic.sbe.otf.OtfHeaderDecoder; import java.io.IOException; import java.net.InetSocketAddress; @@ -89,6 +90,8 @@ public final class BinaryEntryPointClient implements AutoCloseable private long keepAliveIntervalInMs = KEEP_ALIVE_INTERVAL_IN_MS; private CancelOnDisconnectType cancelOnDisconnectType = DO_NOT_CANCEL_ON_DISCONNECT_OR_TERMINATE; private long codTimeoutWindow = DeltaInMillisEncoder.timeNullValue(); + private static final OtfHeaderDecoder OTF_HEADER_DECODER = new OtfHeaderDecoder( + BinaryEntryPointProtocol.loadSbeIr().headerStructure()); public BinaryEntryPointClient(final int port, final TestSystem testSystem, final long serverAliveIntervalInMs) throws IOException @@ -331,9 +334,15 @@ private void print(final UnsafeBuffer unsafeReadBuffer, final String prefixStrin { if (DebugLogger.isEnabled(FIX_TEST)) { - final StringBuilder sb = new StringBuilder(); - jsonPrinter.print(sb, unsafeReadBuffer, SOFH_LENGTH); - DebugLogger.log(FIX_TEST, prefixString, sb.toString()); + // when templateId == 1000 the call to jsonPrinter.print throws an exception as it does not recognize + // this as a valid templateId + final int templateId = OTF_HEADER_DECODER.getTemplateId(unsafeReadBuffer, SOFH_LENGTH); + if (templateId != OUT_OF_RANGE_TEMPLATE_ID) + { + final StringBuilder sb = new StringBuilder(); + jsonPrinter.print(sb, unsafeReadBuffer, SOFH_LENGTH); + DebugLogger.log(FIX_TEST, prefixString, sb.toString()); + } } } @@ -492,6 +501,7 @@ public void writeNewOrderSingle(final int clOrdId) newOrderSingle .clOrdID(clOrdId) + .selfTradePreventionInstruction(SelfTradePreventionInstruction.CANCEL_BOTH_ORDERS) .securityID(SECURITY_ID) .price().mantissa(3); newOrderSingle diff --git a/artio-binary-entrypoint-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/CancelOnDisconnectBinaryEntrypointSystemTest.java b/artio-binary-entrypoint-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/CancelOnDisconnectBinaryEntrypointSystemTest.java index 03c6e67e65..744b1f7b51 100644 --- a/artio-binary-entrypoint-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/CancelOnDisconnectBinaryEntrypointSystemTest.java +++ b/artio-binary-entrypoint-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/CancelOnDisconnectBinaryEntrypointSystemTest.java @@ -216,7 +216,7 @@ private void assertTriggersCancelOnDisconnect(final long logoutTimeInNs) assertEquals(onlySession.key(), result.context.key()); final long timeoutTakenInNs = result.timeInNs - logoutTimeInNs; assertThat(timeoutTakenInNs, greaterThanOrEqualTo(codTimeoutInNs)); - assertEquals(1, timeoutHandler.invokeCount()); + testSystem.await("timeoutHandler.invokeCount() is not 1", () -> 1 == timeoutHandler.invokeCount()); } class FakeTimeoutHandler implements FixPCancelOnDisconnectTimeoutHandler diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java index e49867b68f..54fc32a812 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java @@ -416,21 +416,28 @@ protected String resetGroup(final Entry entry) " {\n" + " for (final %2$s %6$s : %5$s.iterator())\n" + " {\n" + - " %6$s.reset();\n" + " if (%6$s.next() == null)\n" + " {\n" + + " %6$s.reset();\n" + " break;\n" + " }\n" + + " else\n" + + " {\n" + + " %6$s.reset();\n" + + " }\n" + " }\n" + " %3$s = MISSING_INT;\n" + " has%4$s = false;\n" + + " %7$s = null;\n" + " }\n\n", resetMethod, decoderClassName(name), formatPropertyName(numberField.name()), numberField.name(), iteratorFieldName(group), - formatPropertyName(decoderClassName(name))); + formatPropertyName(decoderClassName(name)), + formatPropertyName(name) + ); } } @@ -484,6 +491,9 @@ private String additionalReset(final boolean isGroup) { return " buffer = null;\n" + + (isGroup ? + " next = null;\n" : "" + ) + " if (" + CODEC_VALIDATION_ENABLED + ")\n" + " {\n" + " invalidTagId = Decoder.NO_ERROR;\n" + @@ -1925,7 +1935,12 @@ private String decodeGroup(final Entry entry) " {\n" + " invalidTagId = tag;\n" + " rejectReason = %6$s;\n" + - " return position;\n" + + " while (%1$sCurrent != null) \n" + + " {\n" + + " position += %1$sCurrent.decode(buffer, position, end - position);\n" + + " %1$sCurrent = %1$sCurrent.next();\n" + + " }\n" + + " return position - offset;\n" + " }\n" + " }\n" + " }\n", diff --git a/artio-codecs/src/main/resources/uk/co/real_logic/artio/messages/message-schema.xml b/artio-codecs/src/main/resources/uk/co/real_logic/artio/messages/message-schema.xml index 77d20f555c..1cbcc21bc9 100644 --- a/artio-codecs/src/main/resources/uk/co/real_logic/artio/messages/message-schema.xml +++ b/artio-codecs/src/main/resources/uk/co/real_logic/artio/messages/message-schema.xml @@ -2,7 +2,7 @@ @@ -499,6 +499,9 @@ description="notifies library instances that they have been timed out, added for monitoring purposes"> + + + + assertInvalid(decoder1, + INCORRECT_NUMINGROUP_COUNT_FOR_REPEATING_GROUP, + 120), REPEATING_GROUP_MESSAGE_WITH_TOO_HIGH_NUMBER_FIELD); + decodeHeartbeatWithRejectingUnknownFields( + decoder, this::assertValid, REPEATING_GROUP_MESSAGE_WITH_THREE); + } + + @Test + public void shouldSupportGroupNumbersGreaterThanTheNumberOfElementsInTheNestedGroup() throws Exception + { + final Decoder decoder = decodeHeartbeatWithRejectingUnknownFields( + NESTED_REPEATING_GROUP_MESSAGE_WITH_TOO_HIGH_NUMBER_FIELD); + + assertInvalid(decoder, INCORRECT_NUMINGROUP_COUNT_FOR_REPEATING_GROUP, 122); + } + + @Test + public void shouldReasonablyValidateGroupNumbersLessThanTheNumberOfElementsInTheNestedGroup() throws Exception + { + final Decoder decoder = decodeHeartbeatWithRejectingUnknownFields( + NESTED_REPEATING_GROUP_MESSAGE_WITH_TOO_LOW_NUMBER_FIELD); + + assertInvalid(decoder, INCORRECT_NUMINGROUP_COUNT_FOR_REPEATING_GROUP, 122); + } + @Test public void shouldLeaveDecoderInUsableIfUnknownFieldForRepeatingGroupReachedAndRejectingOn() throws Exception { @@ -1139,15 +1175,29 @@ public void shouldResetAllNestedRepeatingGroupEntries() throws Exception decoder.reset(); - decode(MULTI_ENTRY_NESTED_GROUP_MESSAGE_WITHOUT_NESTED_FIELDS, decoder); + decode(MULTI_ENTRY_EG_GROUP_MESSAGE_WITHOUT_NESTED_GROUPS, decoder); assertEquals(2, getNoEgGroupGroupCounter(decoder)); group = getEgGroup(decoder); + assertNull(getNestedGroup(group)); + + group = next(group); + assertNull(getNestedGroup(group)); + } + + @Test + public void shouldHaveAllNestedRepeatingGroupEntriesNullifiedWhenMessageDoesNotHaveIt() throws Exception + { + Object group; + + final Decoder decoder = decodeHeartbeat(MULTI_ENTRY_EG_GROUP_MESSAGE_WITHOUT_NESTED_GROUPS); + assertEquals(2, getNoEgGroupGroupCounter(decoder)); - assertNestedRepeating(group, 1, CodecUtil.MISSING_INT, CodecUtil.MISSING_INT); + group = getEgGroup(decoder); + assertNull(getNestedGroup(group)); group = next(group); - assertNestedRepeating(group, 2, CodecUtil.MISSING_INT, CodecUtil.MISSING_INT); + assertNull(getNestedGroup(group)); } @Test @@ -1861,6 +1911,15 @@ Decoder decodeHeartbeatWithoutValidation(final String example) throws Exception return decoder; } + private Decoder decodeHeartbeatWithRejectingUnknownFields(final Decoder decoder, + final Consumer consumer, + final String example) + { + decode(example, decoder); + consumer.accept(decoder); + return decoder; + } + private Decoder decodeHeartbeatWithRejectingUnknownFields(final String example) throws Exception { final Decoder decoder = (Decoder)heartbeatWithRejectingUnknownFields.getConstructor().newInstance(); @@ -1871,6 +1930,7 @@ private Decoder decodeHeartbeatWithRejectingUnknownFields(final String example) void decode(final String example, final Decoder decoder) { buffer.putAscii(1, example); + decoder.reset(); decoder.decode(buffer, 1, example.length()); } diff --git a/artio-codecs/src/test/java/uk/co/real_logic/artio/util/TestMessages.java b/artio-codecs/src/test/java/uk/co/real_logic/artio/util/TestMessages.java index 17c8a9df2e..98cdb0f340 100644 --- a/artio-codecs/src/test/java/uk/co/real_logic/artio/util/TestMessages.java +++ b/artio-codecs/src/test/java/uk/co/real_logic/artio/util/TestMessages.java @@ -22,8 +22,8 @@ public final class TestMessages { // BUY 100 CVS MKT DAY public static final byte[] EG_MESSAGE = toAscii("8=FIX.4.2\0019=146\00135=D\00134=4\00149=ABC_DEFG01\001" + - "52=20090323-15:40:29\00156=CCG\001115=XYZ\00111=NF 0542/03232009\00154=1\00138=100\00155=CVS\00140=1" + - "\00159=0\00147=A\00160=20090323-15:40:29\00121=1\001207=N\00110=195\001"); + "52=20090323-15:40:29\00156=CCG\001115=XYZ\00111=NF 0542/03232009\00154=1\00138=100\00155=CVS\00140=1" + + "\00159=0\00147=A\00160=20090323-15:40:29\00121=1\001207=N\00110=195\001"); public static final byte[] LOGON_MESSAGE = toAscii("8=FIX.4.2\0019=64\00135=A\00134=1\00149=ABC_DEFG01" + "\00152=20090323-15:40:29\00156=CCG\00198=0\001108=30\00110=161\001"); @@ -36,55 +36,55 @@ public final class TestMessages "\00159=0\00147=A\00160=20090323-15:40:29\00121=1\001207=N\00110=146\001"); public static final byte[] INVALID_CHECKSUM_MSG = toAscii("8=FIX.4.2\0019=133\00135=D\00134=4\00149=ABC_DEFG01\001" + - "52=20090323-15:40:29\00156=CCG\001115=XYZ\00111=NF 0542/03232009\00155=CVS\00140=1\00159=0\00147=A" + - "60=20090323-15:40:29\00121=1\001207=N\00110=155\001"); + "52=20090323-15:40:29\00156=CCG\001115=XYZ\00111=NF 0542/03232009\00155=CVS\00140=1\00159=0\00147=A" + + "60=20090323-15:40:29\00121=1\001207=N\00110=155\001"); public static final int INVALID_CHECKSUM_LEN = INVALID_CHECKSUM_MSG.length; public static final byte[] INVALID_MESSAGE = toAscii("8=FIX.4.2\0019=145\00135=D\00134=4\00149=ABC_DEFG01\001" + - "52=\\\\\20156=CCG\001115=XYZ\00111=NF 0542/03232009\001\001\001\001\001\00154=1\00138=55140=" + - "\00159=0\00147=A\00160=20090323-15:40:29\00121=1\001207=N\00110=194\001"); + "52=\\\\\20156=CCG\001115=XYZ\00111=NF 0542/03232009\001\001\001\001\001\00154=1\00138=55140=" + + "\00159=0\00147=A\00160=20090323-15:40:29\00121=1\001207=N\00110=194\001"); public static final int INVALID_LEN = INVALID_MESSAGE.length; public static final byte[] EXECUTION_REPORT = toAscii("8=FIX.4.2\0019=378\00135=8\001128=XYZ\00134=5\00149=CCG" + - "\00156=ABC_DEFG01\00152=20090323-" + - "15:40:35\00155=CVS\00137=NF 0542/03232009\00111=NF 0542/03232009\00117=NF 0542/03232009" + - "001001001\00120=0\00139=2\001150=2\00154=1\00138=100\00140=1\00159=0\00131=25.4800\00132=100\001" + - "14=0\0016=0\001151=0\00160=20090323-15:40:30\00158=Fill\00130=N\00176=0034\001207=N\00147=A\001" + - "9430=NX\0019483=000008\0019578=1\001382=1\001375=TOD\001337=0000\001437=100\001438=1243\001" + - "9579=0000100001\0019426=2/2\0019433=0034\00129=1\00163=0\0019440=001001001\00110=080\001"); + "\00156=ABC_DEFG01\00152=20090323-" + + "15:40:35\00155=CVS\00137=NF 0542/03232009\00111=NF 0542/03232009\00117=NF 0542/03232009" + + "001001001\00120=0\00139=2\001150=2\00154=1\00138=100\00140=1\00159=0\00131=25.4800\00132=100\001" + + "14=0\0016=0\001151=0\00160=20090323-15:40:30\00158=Fill\00130=N\00176=0034\001207=N\00147=A\001" + + "9430=NX\0019483=000008\0019578=1\001382=1\001375=TOD\001337=0000\001437=100\001438=1243\001" + + "9579=0000100001\0019426=2/2\0019433=0034\00129=1\00163=0\0019440=001001001\00110=080\001"); public static final byte[] ZERO_REPEATING_GROUP = toAscii("8=FIX.4.2\0019=378\00135=8\001128=XYZ\00134=5\00149=CCG" + - "\00156=ABC_DEFG01\00152=20090323-" + - "15:40:35\00155=CVS\00137=NF 0542/03232009\00111=NF 0542/03232009\00117=NF 0542/03232009" + - "001001001\00120=0\00139=2\001150=2\00154=1\00138=100\00140=1\00159=0\00131=25.4800\00132=100\001" + - "14=0\0016=0\001151=0\00160=20090323-15:40:30\00158=Fill\00130=N\00176=0034\001207=N\00147=A\001" + - "9430=NX\0019483=000008\0019578=1\001382=0\001" + - "9579=0000100001\0019426=2/2\0019433=0034\00129=1\00163=0\0019440=001001001\00110=080\001"); + "\00156=ABC_DEFG01\00152=20090323-" + + "15:40:35\00155=CVS\00137=NF 0542/03232009\00111=NF 0542/03232009\00117=NF 0542/03232009" + + "001001001\00120=0\00139=2\001150=2\00154=1\00138=100\00140=1\00159=0\00131=25.4800\00132=100\001" + + "14=0\0016=0\001151=0\00160=20090323-15:40:30\00158=Fill\00130=N\00176=0034\001207=N\00147=A\001" + + "9430=NX\0019483=000008\0019578=1\001382=0\001" + + "9579=0000100001\0019426=2/2\0019433=0034\00129=1\00163=0\0019440=001001001\00110=080\001"); public static final byte[] REPEATING_GROUP = toAscii("8=FIX.4.2\0019=190\00135=E\00149=INST\00156=BROK\001" + - "52=20050908-15:51:22\00134=200\00166=14\001394=1\00168=2\001" + - "73=2\001" + - "11=order-1\00167=1\00155=IBM\00154=2\00138=2000\00140=1\001" + - "11=order-2\00167=2\00155=AOL\00154=2\00138=1000\00140=1\001"); + "52=20050908-15:51:22\00134=200\00166=14\001394=1\00168=2\001" + + "73=2\001" + + "11=order-1\00167=1\00155=IBM\00154=2\00138=2000\00140=1\001" + + "11=order-2\00167=2\00155=AOL\00154=2\00138=1000\00140=1\001"); // See http://fixwiki.org/fixwiki/FPL:Tag_Value_Syntax#Example_of_nested_repeating_group for details public static final byte[] NESTED_REPEATING_GROUP = toAscii("8=FIX.4.2\0019=190\00135=E\00149=INST\00156=BROK\001" + - "52=20050908-15:51:22\00134=200\00166=14\001394=1\00168=2\001" + - "73=2\001" + // NoOrders Group - "11=order-1\00167=1\00155=IBM\00154=2\00138=2000\00140=1\001" + - "78=2\001" + // NoAllocs nested group - "79=bob\001467=10\001366=4\001" + - "79=sally\001467=11\001366=5\001" + - "11=order-2\00167=2\00155=AOL\00154=2\00138=1000\00140=1\001"); + "52=20050908-15:51:22\00134=200\00166=14\001394=1\00168=2\001" + + "73=2\001" + // NoOrders Group + "11=order-1\00167=1\00155=IBM\00154=2\00138=2000\00140=1\001" + + "78=2\001" + // NoAllocs nested group + "79=bob\001467=10\001366=4\001" + + "79=sally\001467=11\001366=5\001" + + "11=order-2\00167=2\00155=AOL\00154=2\00138=1000\00140=1\001"); public static final byte[] INVALID_LENGTH_MESSAGE = toAscii( "8=FIX.4.4\0019=5\00135=A\00134=1\00149=TW\00152=20150604-12:46:54\00156=ISLD\00198=0\00110=000\001"); public static final byte[] ZERO_CHECKSUM_MESSAGE = toAscii( "8=FIX.4.4\0019=0067\00135=0\00149=acceptor\00156=initiator\00134=2" + - "\00152=20160415-12:50:23.294\001112=hi\00110=000\001"); + "\00152=20160415-12:50:23.294\001112=hi\00110=000\001"); public static final int NO_ORDERS = 73; public static final int NO_ALLOCS = 78; diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/LogTag.java b/artio-core/src/main/java/uk/co/real_logic/artio/LogTag.java index a98c1caa48..855229c416 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/LogTag.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/LogTag.java @@ -30,7 +30,7 @@ public enum LogTag * This logs direct reads and writes of messages from TCP sockets. These lines are labelled "Read" and * "Written". * - * NB: TCP reads may read multiple messages at the same time. This isn't a bug. + * NB: TCP reads may read partial messages or multiple messages at the same time. This isn't a bug. */ FIX_MESSAGE_TCP, /** diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/LowResourceEngineScheduler.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/LowResourceEngineScheduler.java index 260d3f5720..580c6a789d 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/LowResourceEngineScheduler.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/LowResourceEngineScheduler.java @@ -117,7 +117,7 @@ public void configure(final Aeron.Context aeronContext) /** * Adapt a recording coordinator to the Agent interface to enable it to be shutdown in order. */ - private class RecordingCoordinatorAgent implements Agent + private final class RecordingCoordinatorAgent implements Agent { @Override public int doWork() diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java index c87bbbd775..0c60b025d1 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java @@ -4,6 +4,7 @@ public final class EngineStreamInfo { private final long inboundIndexSubscriptionRegistrationId; private final long outboundIndexSubscriptionRegistrationId; + private final long librarySubscriptionRegistrationId; private final int inboundPublicationSessionId; private final long inboundPublicationPosition; private final int outboundPublicationSessionId; @@ -12,6 +13,7 @@ public final class EngineStreamInfo EngineStreamInfo( final long inboundIndexSubscriptionRegistrationId, final long outboundIndexSubscriptionRegistrationId, + final long librarySubscriptionRegistrationId, final int inboundPublicationSessionId, final long inboundPublicationPosition, final int outboundPublicationSessionId, @@ -19,6 +21,7 @@ public final class EngineStreamInfo { this.inboundIndexSubscriptionRegistrationId = inboundIndexSubscriptionRegistrationId; this.outboundIndexSubscriptionRegistrationId = outboundIndexSubscriptionRegistrationId; + this.librarySubscriptionRegistrationId = librarySubscriptionRegistrationId; this.inboundPublicationSessionId = inboundPublicationSessionId; this.inboundPublicationPosition = inboundPublicationPosition; this.outboundPublicationSessionId = outboundPublicationSessionId; @@ -35,6 +38,11 @@ public long outboundIndexSubscriptionRegistrationId() return outboundIndexSubscriptionRegistrationId; } + public long librarySubscriptionRegistrationId() + { + return librarySubscriptionRegistrationId; + } + public int inboundPublicationSessionId() { return inboundPublicationSessionId; @@ -60,6 +68,7 @@ public String toString() return "EngineStreamInfo{" + "inboundIndexSubscriptionRegistrationId=" + inboundIndexSubscriptionRegistrationId + ", outboundIndexSubscriptionRegistrationId=" + outboundIndexSubscriptionRegistrationId + + ", librarySubscriptionRegistrationId=" + librarySubscriptionRegistrationId + ", inboundPublicationSessionId=" + inboundPublicationSessionId + ", inboundPublicationPosition=" + inboundPublicationPosition + ", outboundPublicationSessionId=" + outboundPublicationSessionId + diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java index 76a07bc1d1..e2a9a2a0ef 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java @@ -559,9 +559,8 @@ private boolean receivedIndexedPosition(final int aeronSessionId, final long pos private void saveLibraryTimeout(final LibraryInfo library) { - final int libraryId = library.libraryId(); - schedule(() -> inboundPublication.saveLibraryTimeout(libraryId, 0)); - schedule(() -> outboundPublication.saveLibraryTimeout(libraryId, 0)); + schedule(() -> inboundPublication.saveLibraryTimeout(library, 0)); + schedule(() -> outboundPublication.saveLibraryTimeout(library, 0)); } private void acquireLibrarySessions(final LiveLibraryInfo library) @@ -1716,24 +1715,17 @@ public Action onValidResendRequest( private void checkOfflineSequenceReset(final long sessionId, final long messageType, final int sequenceIndex) { - if (messageType == LOGON_MESSAGE_TYPE) - { - // Always a sequence reset - final Map.Entry entry = fixContexts.lookupById(sessionId); - if (entry != null) - { - final SessionContext context = entry.getValue(); - context.onSequenceReset(clock.nanoTime()); - } - } - else if (messageType == SEQUENCE_RESET_MESSAGE_TYPE) + if (messageType == LOGON_MESSAGE_TYPE || messageType == SEQUENCE_RESET_MESSAGE_TYPE) { - // If it's not a gap-fill it's a sequence reset final Map.Entry entry = fixContexts.lookupById(sessionId); if (entry != null) { final SessionContext context = entry.getValue(); - context.onSequenceIndex(clock.nanoTime(), sequenceIndex); + final int currentSequenceIndex = context.sequenceIndex(); + if (sequenceIndex > currentSequenceIndex) + { + context.onSequenceIndex(clock.nanoTime(), sequenceIndex); + } } } } @@ -3503,6 +3495,7 @@ public void onEngineStreamInfoRequest(final EngineStreamInfoRequestCommand comma command.complete(new EngineStreamInfo( inboundIndexRegistrationId, outboundIndexRegistrationId, + librarySubscription.registrationId(), inboundPublication.sessionId(), inboundPublication.position(), outboundPublication.sessionId(), diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/NioSelectedKeySet.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/NioSelectedKeySet.java new file mode 100644 index 0000000000..e4dc2be210 --- /dev/null +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/NioSelectedKeySet.java @@ -0,0 +1,247 @@ +/* + * Copyright 2014-2024 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.co.real_logic.artio.engine.framer; + +import org.agrona.collections.ArrayUtil; + +import java.nio.channels.SelectionKey; +import java.util.AbstractSet; +import java.util.Arrays; +import java.util.Iterator; +import java.util.function.ToIntFunction; + +/** + * Try to fix handling of HashSet for {@link java.nio.channels.Selector} to avoid excessive allocation. + * Assumes single threaded usage. + */ +final class NioSelectedKeySet extends AbstractSet +{ + private static final int INITIAL_CAPACITY = 10; + + private SelectionKey[] keys; + private int size = 0; + + /** + * Construct a key set with default capacity + */ + NioSelectedKeySet() + { + keys = new SelectionKey[INITIAL_CAPACITY]; + } + + /** + * Construct a key set with the given capacity. + * + * @param initialCapacity for the key set + */ + NioSelectedKeySet(final int initialCapacity) + { + if (initialCapacity < 0 || initialCapacity > ArrayUtil.MAX_CAPACITY) + { + throw new IllegalArgumentException("invalid initial capacity: " + initialCapacity); + } + + keys = new SelectionKey[Math.max(initialCapacity, INITIAL_CAPACITY)]; + } + + /** + * {@inheritDoc} + */ + public int size() + { + return size; + } + + /** + * Capacity of the current set. + * + * @return capacity of the set. + */ + public int capacity() + { + return keys.length; + } + + /** + * {@inheritDoc} + */ + public boolean isEmpty() + { + return 0 == size; + } + + /** + * {@inheritDoc} + */ + public boolean add(final SelectionKey selectionKey) + { + if (null == selectionKey) + { + return false; + } + + ensureCapacity(size + 1); + keys[size++] = selectionKey; + + return true; + } + + /** + * {@inheritDoc} + */ + public boolean remove(final Object o) + { + for (int i = 0; i < size; i++) + { + final SelectionKey key = keys[i]; + if (key.equals(o)) + { + keys[i] = keys[--size]; + keys[size] = null; + return true; + } + } + + return false; + } + + /** + * {@inheritDoc} + */ + public boolean contains(final Object o) + { + for (int i = 0; i < size; i++) + { + final SelectionKey key = keys[i]; + if (key.equals(o)) + { + return true; + } + } + + return false; + } + + /** + * Return selected keys for direct processing which is valid up to {@link #size()} index. + * + * @return selected keys for direct processing which is valid up to {@link #size()} index. + */ + public SelectionKey[] keys() + { + return keys; + } + + /** + * Reset for next iteration. + */ + public void reset() + { + size = 0; + } + + /** + * Null out the keys and set size to 0. + */ + public void clear() + { + Arrays.fill(keys, null); + size = 0; + } + + /** + * Reset for next iteration, having only processed a subset of the selection keys. + *

+ * The {@link NioSelectedKeySet} will still contain the keys representing IO events after + * the skip Count have been removed, the remaining events can be processed in a future iteration. + * + * @param skipCount the number of keys to be skipped over that have already been processed. + */ + public void reset(final int skipCount) + { + if (skipCount > size) + { + throw new IllegalArgumentException("skipCount " + skipCount + " > size " + size); + } + + if (0 != size) + { + final SelectionKey[] keys = this.keys; + final int newSize = size - skipCount; + + System.arraycopy(keys, skipCount, keys, 0, newSize); + + size = newSize; + } + } + + /** + * Iterate over the key set and apply a given function. + * + * @param function to apply to each {@link java.nio.channels.SelectionKey} + * @return number of handled frames. + */ + @SuppressWarnings("overloads") + public int forEach(final ToIntFunction function) + { + int handledFrames = 0; + final SelectionKey[] keys = this.keys; + + for (int i = size - 1; i >= 0; i--) + { + handledFrames += function.applyAsInt(keys[i]); + } + + size = 0; + + return handledFrames; + } + + /** + * {@inheritDoc} + */ + public Iterator iterator() + { + throw new UnsupportedOperationException(); + } + + private void ensureCapacity(final int requiredCapacity) + { + if (requiredCapacity < 0) + { + throw new IllegalStateException( + "insufficient capacity: length=" + keys.length + " required=" + requiredCapacity); + } + + final int currentCapacity = keys.length; + if (requiredCapacity > currentCapacity) + { + int newCapacity = currentCapacity + (currentCapacity >> 1); + + if (newCapacity < 0 || newCapacity > ArrayUtil.MAX_CAPACITY) + { + if (currentCapacity == ArrayUtil.MAX_CAPACITY) + { + throw new IllegalStateException("max capacity reached: " + ArrayUtil.MAX_CAPACITY); + } + + newCapacity = ArrayUtil.MAX_CAPACITY; + } + + keys = Arrays.copyOf(keys, newCapacity); + } + } +} diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/PasswordCleaner.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/PasswordCleaner.java index a05b8c1027..8ce32bf30c 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/PasswordCleaner.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/PasswordCleaner.java @@ -165,7 +165,7 @@ public int cleanedLength() return cleanedLength; } - private class FieldScanner implements OtfMessageAcceptor + private final class FieldScanner implements OtfMessageAcceptor { public MessageControl onNext() { diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/PruneOperation.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/PruneOperation.java index 2481492a60..9e7393606f 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/PruneOperation.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/PruneOperation.java @@ -53,6 +53,8 @@ public static class Formatters private final CharFormatter filteredRecordingFormatter = new CharFormatter( "PruneOperation: filtered recordingId=%s,segmentStartPosition=%s,lowerBoundPrunePosition=%s" + ",segmentFileLength=%s,requestedNewStartPosition=%s,startPosition=%s"); + private final CharFormatter purgedSegmentsFormatter = new CharFormatter( + "PruneOperation: purged recordingId=%s,oldStartPosition=%s,newStartPosition=%s,durationNs=%s"); } private final Formatters formatters; @@ -235,7 +237,19 @@ private void prune() } else { + final long start = System.nanoTime(); aeronArchive.purgeSegments(recordingId, segmentStartPosition); + if (STATE_CLEANUP_ENABLED) + { + final long durationNs = System.nanoTime() - start; + formatters.purgedSegmentsFormatter + .clear() + .with(recordingId) + .with(stashedStartPosition) + .with(segmentStartPosition) + .with(durationNs); + DebugLogger.log(STATE_CLEANUP, formatters.purgedSegmentsFormatter); + } recordingIdToNewStartPosition.put(recordingId, segmentStartPosition); } } diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/ReceiverEndPoints.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/ReceiverEndPoints.java index 0a95f065c1..8b0ffe4ad6 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/ReceiverEndPoints.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/ReceiverEndPoints.java @@ -22,7 +22,9 @@ import uk.co.real_logic.artio.messages.DisconnectReason; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.util.Arrays; import java.util.function.LongConsumer; import java.util.stream.Stream; @@ -37,6 +39,41 @@ class ReceiverEndPoints extends TransportPoller public static final int ARTIO_ITERATION_THRESHOLD = Integer.getInteger( ARTIO_ITERATION_THRESHOLD_PROP_NAME, ITERATION_THRESHOLD_DEFAULT); + // FIXME: >> A temporary workaround to the recursive poll problem + private static final Field SELECTED_KEYS_FIELD; + private static final Field PUBLIC_SELECTED_KEYS_FIELD; + static + { + Field selectKeysField = null; + Field publicSelectKeysField = null; + + try (Selector selector = Selector.open()) + { + final Class clazz = Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader()); + + if (clazz.isAssignableFrom(selector.getClass())) + { + selectKeysField = clazz.getDeclaredField("selectedKeys"); + selectKeysField.setAccessible(true); + + publicSelectKeysField = clazz.getDeclaredField("publicSelectedKeys"); + publicSelectKeysField.setAccessible(true); + } + } + catch (final Exception ex) + { + LangUtil.rethrowUnchecked(ex); + } + finally + { + SELECTED_KEYS_FIELD = selectKeysField; + PUBLIC_SELECTED_KEYS_FIELD = publicSelectKeysField; + } + } + + private final NioSelectedKeySet selectedKeySet = new NioSelectedKeySet(); + // FIXME: << temporary workaround + private final ErrorHandler errorHandler; // Authentication flow requires periodic polling of the receiver end points until the authentication is @@ -50,6 +87,17 @@ class ReceiverEndPoints extends TransportPoller ReceiverEndPoints(final ErrorHandler errorHandler) { this.errorHandler = errorHandler; + + // FIXME: A temporary workaround using legacy Selector hacks + try + { + SELECTED_KEYS_FIELD.set(selector, selectedKeySet); + PUBLIC_SELECTED_KEYS_FIELD.set(selector, selectedKeySet); + } + catch (final Exception ex) + { + throw new RuntimeException(ex); + } } void add(final ReceiverEndPoint endPoint) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java index c6f3c46156..e1dce081eb 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java @@ -384,7 +384,9 @@ else if (fixPConnectionIds.contains(connectionId)) return session; } - throw new IllegalStateException("Unknown session: sessionId=" + sessionId + ",connectionId=" + connectionId); + // ManageSession and ValidResendRequest might race each other (different sessions), so it's possible we see + // VRR before MS and sessionCodecs is null, simply wait in this case for the other image to catch up. + return null; } private FixReplayerSession processFixResendRequest( diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/library/FixLibrary.java b/artio-core/src/main/java/uk/co/real_logic/artio/library/FixLibrary.java index f426cb5c37..a6a885bb34 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/library/FixLibrary.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/library/FixLibrary.java @@ -397,7 +397,7 @@ public SessionWriter sessionWriter( * This allows offline sessions to be created using this method and messages sent via either the SessionWriter * or the Session itself. * - * For the FIX version see {@link #followerFixPSession(FixPContext, long)}. + * For the FIXP version see {@link #followerFixPSession(FixPContext, long)}. * * @param headerEncoder the message header that contains fields that identify the Session. You could set the * senderCompId and targetCompId on this header for example if those are the fields used to diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryPoller.java b/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryPoller.java index aa52297339..b568fa8bd6 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryPoller.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryPoller.java @@ -1294,6 +1294,17 @@ public Action onDisconnect( } } + final WeakReference sessionWriterRef = + sessionIdToFollowerSessionWriter.get(session.id()); + if (sessionWriterRef != null) + { + final SessionWriter sessionWriter = sessionWriterRef.get(); + if (sessionWriter != null) + { + InternalSession.disconnectWriter(sessionWriter); + } + } + connectionIdToSession.remove(connectionId); if (isEngineOwned) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryTransport.java b/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryTransport.java index 2d2a564974..c8fe1e0245 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryTransport.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryTransport.java @@ -30,7 +30,7 @@ class LibraryTransport { - private static final String OUTBOUND_PUBLICATION = "outboundPublication"; + private static final String OUTBOUND_PUBLICATION = "library outboundPublication"; private final LibraryConfiguration configuration; private final FixCounters fixCounters; @@ -87,7 +87,7 @@ void initStreams(final String aeronChannel) idleStrategy, outboundDataPublication(aeronChannel)); final ExclusivePublication publication = aeron.addExclusivePublication(aeronChannel, inboundLibraryStream); - StreamInformation.print("inboundPublication", publication, printAeronStreamIdentifiers); + StreamInformation.print("library inboundPublication", publication, printAeronStreamIdentifiers); inboundPublication = new GatewayPublication( publication, fixCounters.failedInboundPublications(), diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/library/SessionHandler.java b/artio-core/src/main/java/uk/co/real_logic/artio/library/SessionHandler.java index 514220873c..2fa88ef973 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/library/SessionHandler.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/library/SessionHandler.java @@ -35,7 +35,7 @@ public interface SessionHandler * @param libraryId the id of library which has received this message. * @param session the session which has received this message. * @param sequenceIndex the sequence index of this message. - * @param messageType the FIX msgType field, encoded as an int. + * @param messageType the FIX msgType field, encoded as a long. * @param timestampInNs the time of the message in nanoseconds. * @param position the position in the Aeron stream at the end of the message. * @param messageInfo additional information about the message. @@ -56,7 +56,8 @@ Action onMessage( /** * This session has timed out on this library. It is still connected, but will * be managed by the gateway. - * @param libraryId the id of library which the session used to owned by. + * + * @param libraryId the id of library which the session used to owned by. * @param session the session that has timed out. */ void onTimeout(int libraryId, Session session); @@ -67,7 +68,7 @@ Action onMessage( * the wiki * for details on what a slow consumer is. * - * @param libraryId the id of library which the session used to owned by. + * @param libraryId the id of library which the session used to owned by. * @param session the session that has become slow. * @param hasBecomeSlow true iff the session has been detected as slow, false if it is no longer slow. */ @@ -84,7 +85,7 @@ Action onMessage( Action onDisconnect(int libraryId, Session session, DisconnectReason reason); /** - * Invoked When a client resets a session to the initial sequence number via a logon whilst still connected. + * Invoked when a client resets a session to the initial sequence number via a logon whilst still connected. * * @param session The session that has just started. */ diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java b/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java index 8fb9b1a60c..f2e868f165 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/protocol/GatewayPublication.java @@ -27,8 +27,10 @@ import org.agrona.concurrent.status.AtomicCounter; import uk.co.real_logic.artio.DebugLogger; import uk.co.real_logic.artio.dictionary.FixDictionary; +import uk.co.real_logic.artio.engine.ConnectedSessionInfo; import uk.co.real_logic.artio.engine.RecordingCoordinator; import uk.co.real_logic.artio.engine.SessionInfo; +import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.messages.*; import uk.co.real_logic.artio.messages.ControlNotificationEncoder.DisconnectedSessionsEncoder; import uk.co.real_logic.artio.messages.ControlNotificationEncoder.SessionsEncoder; @@ -36,12 +38,11 @@ import java.util.List; -import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT; +import static io.aeron.logbuffer.LogBufferDescriptor.computeFragmentedFrameLength; import static io.aeron.protocol.DataHeaderFlyweight.BEGIN_FLAG; import static io.aeron.protocol.DataHeaderFlyweight.END_FLAG; import static java.nio.ByteOrder.LITTLE_ENDIAN; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.agrona.BitUtil.align; import static uk.co.real_logic.artio.DebugLogger.*; import static uk.co.real_logic.artio.LogTag.*; import static uk.co.real_logic.artio.messages.ErrorDecoder.messageHeaderLength; @@ -125,6 +126,8 @@ public class GatewayPublication extends ClaimablePublication private static final int THROTTLE_CONFIGURATION_REPLY_LENGTH = HEADER_LENGTH + ThrottleConfigurationReplyEncoder.BLOCK_LENGTH; private static final int SEQ_INDEX_SYNC_LENGTH = HEADER_LENGTH + SeqIndexSyncEncoder.BLOCK_LENGTH; + private static final int LIBRARY_TIMEOUT_LENGTH = HEADER_LENGTH + LibraryTimeoutEncoder.BLOCK_LENGTH + + GroupSizeEncodingEncoder.ENCODED_LENGTH; private static final boolean APPLICATION_HEARTBEAT_ATTEMPT_ENABLED = isEnabled(APPLICATION_HEARTBEAT_ATTEMPT); private static final boolean APPLICATION_HEARTBEAT_ENABLED = isEnabled(APPLICATION_HEARTBEAT); @@ -307,12 +310,7 @@ public long saveMessage( if (fragmented) { // Add a padding message at the end of the term buffer if needed. - final int length = framedLength; - final int numMaxPayloads = length / maxPayloadLength; - final int remainingPayload = length % maxPayloadLength; - final int lastFrameLength = remainingPayload > 0 ? - align(remainingPayload + HEADER_LENGTH, FRAME_ALIGNMENT) : 0; - final int requiredLength = (numMaxPayloads * (maxPayloadLength + HEADER_LENGTH)) + lastFrameLength; + final int requiredLength = computeFragmentedFrameLength(framedLength, maxPayloadLength); final int termLength = dataPublication.termBufferLength(); final int termOffset = dataPublication.termOffset(); final int resultingOffset = termOffset + requiredLength; @@ -966,25 +964,33 @@ public long saveRequestSessionReply(final int libraryId, final SessionReplyStatu return position; } - public long saveLibraryTimeout(final int libraryId, final long connectCorrelationId) + public long saveLibraryTimeout(final LibraryInfo libraryInfo, final long connectCorrelationId) { - final long position = claim(LibraryTimeoutEncoder.BLOCK_LENGTH + HEADER_LENGTH); - if (position < 0) - { - return position; - } + final List connectedSessionInfos = libraryInfo.sessions(); + final int sessionsCount = connectedSessionInfos.size(); - final MutableDirectBuffer buffer = bufferClaim.buffer(); - final int offset = bufferClaim.offset(); + final int framedLength = LIBRARY_TIMEOUT_LENGTH + sessionsCount * + LibraryTimeoutEncoder.SessionsEncoder.sbeBlockLength(); + final ExpandableArrayBuffer buffer = buffer(framedLength); libraryTimeout - .wrapAndApplyHeader(buffer, offset, header) - .libraryId(libraryId) + .wrapAndApplyHeader(buffer, 0, header) + .libraryId(libraryInfo.libraryId()) .connectCorrelationId(connectCorrelationId); - bufferClaim.commit(); + final LibraryTimeoutEncoder.SessionsEncoder sessionsEncoder = libraryTimeout.sessionsCount(sessionsCount); + for (int i = 0; i < sessionsCount; i++) + { + final SessionInfo session = connectedSessionInfos.get(i); + sessionsEncoder.next().sessionId(session.sessionId()); + } - logSbeMessage(GATEWAY_MESSAGE, libraryTimeout); + final long position = dataPublication.offer(buffer, 0, framedLength); + + if (position > 0) + { + logSbeMessage(GATEWAY_MESSAGE, libraryTimeout); + } return position; } diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/InternalSession.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/InternalSession.java index c6cb5f01e1..413dc5793d 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/InternalSession.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/InternalSession.java @@ -254,6 +254,11 @@ public void linkTo(final SessionWriter sessionWriter) sessionWriter.linkTo(this); } + public static void disconnectWriter(final SessionWriter writer) + { + writer.onDisconnect(); + } + public static void closeWriter(final SessionWriter writer) { writer.close(); diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionWriter.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionWriter.java index 7ee87c822a..d1752b9508 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionWriter.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionWriter.java @@ -17,12 +17,15 @@ import org.agrona.DirectBuffer; import uk.co.real_logic.artio.CommonConfiguration; +import uk.co.real_logic.artio.DebugLogger; import uk.co.real_logic.artio.builder.Encoder; import uk.co.real_logic.artio.dictionary.SessionConstants; import uk.co.real_logic.artio.messages.DisconnectReason; import uk.co.real_logic.artio.protocol.GatewayPublication; import uk.co.real_logic.artio.util.MutableAsciiBuffer; +import static uk.co.real_logic.artio.GatewayProcess.NO_CONNECTION_ID; +import static uk.co.real_logic.artio.LogTag.FIX_MESSAGE; import static uk.co.real_logic.artio.messages.MessageStatus.OK; /** @@ -82,7 +85,7 @@ public void sequenceIndex(final int sequenceIndex) } /** - * Sets the current sequence index. + * Gets the current sequence index. * * @return the current sequence index */ @@ -157,6 +160,8 @@ public long send( if (position > 0) { + DebugLogger.logFixMessage(FIX_MESSAGE, messageType, "Sent ", messageBuffer, offset, length); + final InternalSession session = this.session; if (session != null) { @@ -207,6 +212,11 @@ void linkTo(final InternalSession session) this.connectionId = session.connectionId(); } + void onDisconnect() + { + connectionId = NO_CONNECTION_ID; + } + void checkState() { if (closed) diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/PossDupFinderTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/PossDupFinderTest.java index 08e167d92b..d4c8711f38 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/PossDupFinderTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/PossDupFinderTest.java @@ -13,11 +13,11 @@ public class PossDupFinderTest { private static final byte[] FIRST_MESSAGE = ("8=FIX.4.4\0019=0065\00135=5\00149=initiator\00156=acceptor\00134=2\001" + - "52=20161206-11:04:51.461\00110=088\001").getBytes(US_ASCII); + "52=20161206-11:04:51.461\00110=088\001").getBytes(US_ASCII); private static final byte[] SECOND_MESSAGE = ("8=FIX.4.4\0019=0065\00135=5\00149=initiator\00156=acceptor\00134=2\001" + - "52=20161206-11:04:51.461\00143=Y\00110=088\001").getBytes(US_ASCII); + "52=20161206-11:04:51.461\00143=Y\00110=088\001").getBytes(US_ASCII); private final PossDupFinder possDupFinder = new PossDupFinder(); private final OtfParser parser = new OtfParser(possDupFinder, new LongDictionary()); diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java index 103539100d..cd79161ce2 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java @@ -37,6 +37,7 @@ import org.mockito.verification.VerificationMode; import uk.co.real_logic.artio.CloseChecker; import uk.co.real_logic.artio.FixCounters; +import uk.co.real_logic.artio.LivenessDetector; import uk.co.real_logic.artio.Timing; import uk.co.real_logic.artio.dictionary.FixDictionary; import uk.co.real_logic.artio.engine.*; @@ -117,7 +118,6 @@ public class FramerTest private final InternalSession session = mock(InternalSession.class); private final Subscription outboundLibrarySubscription = mock(Subscription.class); private final Image replayImage = mock(Image.class); - private final Image peekImage = mock(Image.class); private final Image normalImage = mock(Image.class); private final CompositeKey sessionKey = SessionIdStrategy .senderAndTarget() @@ -140,6 +140,12 @@ public class FramerTest private final MutableLong connectionId = new MutableLong(NO_CONNECTION_ID); private final ErrorHandler errorHandler = mock(ErrorHandler.class); + private final LivenessDetector livenessDetector = mock(LivenessDetector.class); + + private final LiveLibraryInfo libraryInfo = new LiveLibraryInfo( + errorHandler, + LIBRARY_ID, LIBRARY_NAME, livenessDetector, 1, + false); @Before @SuppressWarnings("unchecked") @@ -1004,7 +1010,7 @@ private void verifyEndpointsCreated() private void verifyLibraryTimeout() { - verify(inboundPublication).saveLibraryTimeout(LIBRARY_ID, 0); + verify(inboundPublication).saveLibraryTimeout(libraryInfo, 0); } private void libraryHasAcceptedClient() throws IOException diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java index 6c3874020d..39506e9deb 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java @@ -79,7 +79,7 @@ public class ReplayerTest extends AbstractLogTest public static final byte[] MESSAGE_REQUIRING_LONGER_BODY_LENGTH = ("8=FIX.4.4\0019=99\00135=1\00134=1\00149=LEH_LZJ02\00152=" + ORIGINAL_SENDING_TIME + "\00156=CCG\001" + - "112=a12345678910123456789101234567891012345\00110=005\001").getBytes(US_ASCII); + "112=a12345678910123456789101234567891012345\00110=005\001").getBytes(US_ASCII); private static final int MAX_CLAIM_ATTEMPTS = 100; private static final long CORRELATION_ID = 2; diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/protocol/GatewayPublicationTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/protocol/GatewayPublicationTest.java new file mode 100644 index 0000000000..1082e3cd49 --- /dev/null +++ b/artio-core/src/test/java/uk/co/real_logic/artio/protocol/GatewayPublicationTest.java @@ -0,0 +1,199 @@ +package uk.co.real_logic.artio.protocol; + +import io.aeron.*; +import io.aeron.driver.MediaDriver; +import io.aeron.logbuffer.ControlledFragmentHandler; +import io.aeron.logbuffer.FragmentHandler; +import io.aeron.logbuffer.Header; +import io.aeron.protocol.DataHeaderFlyweight; +import org.agrona.DirectBuffer; +import org.agrona.collections.MutableLong; +import org.agrona.concurrent.NoOpIdleStrategy; +import org.agrona.concurrent.SystemEpochNanoClock; +import org.agrona.concurrent.UnsafeBuffer; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import uk.co.real_logic.artio.messages.DisconnectReason; +import uk.co.real_logic.artio.messages.MessageStatus; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; + +import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT; +import static org.junit.jupiter.api.Assertions.*; +import static uk.co.real_logic.artio.TestFixtures.mediaDriverContext; +import static uk.co.real_logic.artio.protocol.GatewayPublication.FRAMED_MESSAGE_SIZE; + +class GatewayPublicationTest +{ + private static final int MAX_UNFRAGMENTED_BODY_LENGTH = 1305; + + static IntStream bodyLengthRange() + { + return IntStream.rangeClosed( + MAX_UNFRAGMENTED_BODY_LENGTH, + MAX_UNFRAGMENTED_BODY_LENGTH + DataHeaderFlyweight.HEADER_LENGTH + ); + } + + @ParameterizedTest + @MethodSource("bodyLengthRange") + void testSavingMessagesOverTermBoundary(final int bodyLength) + { + final int termBufferLength = 64 * 1024; + try ( + MediaDriver driver = MediaDriver.launch(mediaDriverContext(termBufferLength, true)); + Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(driver.aeronDirectoryName()))) + { + final String channel = CommonContext.IPC_CHANNEL; + final int streamId = 1000; + + final Subscription subscription = aeron.addSubscription(channel, streamId); + final ExclusivePublication publication = aeron.addExclusivePublication(channel, streamId); + final Counter fails = aeron.addCounter(1001, "fails"); + + // first one won't be fragmented, but subsequent ones will + assertEquals(MAX_UNFRAGMENTED_BODY_LENGTH, publication.maxPayloadLength() - FRAMED_MESSAGE_SIZE); + + final GatewayPublication gatewayPublication = new GatewayPublication( + publication, + fails, + NoOpIdleStrategy.INSTANCE, + new SystemEpochNanoClock(), + 5 + ); + + // leave enough space in the term for just over one full frame + final int startingPosition = (termBufferLength - driver.context().ipcMtuLength() - 1) & + -DataHeaderFlyweight.HEADER_LENGTH; // align down to min frame length + advanceToPosition(startingPosition, publication, subscription); + + final byte[] body = new byte[bodyLength]; + ThreadLocalRandom.current().nextBytes(body); + final DirectBuffer srcBuffer = new UnsafeBuffer(body); + + int attempt = 1; + do + { + final long result = gatewayPublication.saveMessage( + srcBuffer, + 0, + body.length, + 5000, + 68, + 1, + 0, + 1234, + MessageStatus.OK, + 42 + ); + if (result > 0) + { + break; + } + if (attempt >= 2) + { + fail("failed to save message: " + result); + } + attempt++; + } + while (true); + + final MessageCapturingProtocolHandler protocolHandler = new MessageCapturingProtocolHandler(); + final ProtocolSubscription protocolSubscription = ProtocolSubscription.of(protocolHandler); + final ControlledFragmentHandler fragmentHandler = new ControlledFragmentAssembler(protocolSubscription); + + subscription.controlledPoll(fragmentHandler, 5); + subscription.controlledPoll(fragmentHandler, 5); + + final CapturedMessage capturedMessage = protocolHandler.capturedMessages.get(0); + assertArrayEquals(body, capturedMessage.body()); + assertEquals(68, capturedMessage.messageType()); + assertEquals(42, capturedMessage.sequenceNumber()); + } + } + + private void advanceToPosition( + final long position, + final ExclusivePublication publication, + final Subscription subscription) + { + if (position % FRAME_ALIGNMENT != 0) + { + fail("position is not frame aligned: " + position); + } + + long lastPubPos = 0; + final MutableLong lastSubPos = new MutableLong(); + final FragmentHandler fragmentHandler = (buffer1, offset, length, header) -> lastSubPos.set(header.position()); + final DirectBuffer buffer = new UnsafeBuffer(); + + while (lastPubPos < position || lastSubPos.get() < position) + { + if (lastPubPos < position) + { + final long result = publication.offer(buffer); + if (result > 0) + { + lastPubPos = result; + } + } + + subscription.poll(fragmentHandler, 5); + } + } + + private record CapturedMessage(byte[] body, long messageType, int sequenceNumber) + { + } + + private static final class MessageCapturingProtocolHandler implements ProtocolHandler + { + private final List capturedMessages = new ArrayList<>(); + + public ControlledFragmentHandler.Action onMessage( + final DirectBuffer buffer, + final int offset, + final int length, + final int libraryId, + final long connectionId, + final long sessionId, + final int sequenceIndex, + final long messageType, + final long timestamp, + final MessageStatus status, + final int sequenceNumber, + final Header header, + final int metaDataLength) + { + final byte[] body = new byte[length]; + buffer.getBytes(offset, body); + + capturedMessages.add(new CapturedMessage( + body, + messageType, + sequenceNumber + )); + + return ControlledFragmentHandler.Action.CONTINUE; + } + + public ControlledFragmentHandler.Action onDisconnect( + final int libraryId, + final long connectionId, + final DisconnectReason reason) + { + throw new IllegalStateException(); + } + + public ControlledFragmentHandler.Action onFixPMessage( + final long connectionId, + final DirectBuffer buffer, + final int offset) + { + throw new IllegalStateException(); + } + } +} diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/client/SampleClient.java b/artio-samples/src/main/java/uk/co/real_logic/artio/client/SampleClient.java index 7dd6bfd93f..5732334385 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/client/SampleClient.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/client/SampleClient.java @@ -74,6 +74,7 @@ public static void main(final String[] args) archiveContext .controlChannel(CONTROL_REQUEST_CHANNEL) + .replicationChannel(REPLICATION_CHANNEL) .recordingEventsChannel(RECORDING_EVENTS_CHANNEL); try (ArchivingMediaDriver driver = ArchivingMediaDriver.launch(context, archiveContext)) @@ -153,4 +154,5 @@ private static SessionHandler onConnect(final Session session) private static final String CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:7010"; private static final String CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:7020"; private static final String RECORDING_EVENTS_CHANNEL = "aeron:udp?control-mode=dynamic|control=localhost:7030"; + private static final String REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0"; } diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/example_buyer/BuyerAgent.java b/artio-samples/src/main/java/uk/co/real_logic/artio/example_buyer/BuyerAgent.java index 38e300d0cc..a537e35a00 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/example_buyer/BuyerAgent.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/example_buyer/BuyerAgent.java @@ -7,8 +7,11 @@ import uk.co.real_logic.artio.library.FixLibrary; import uk.co.real_logic.artio.library.LibraryConfiguration; +import java.io.File; + import static io.aeron.CommonContext.IPC_CHANNEL; import static java.util.Collections.singletonList; +import static uk.co.real_logic.artio.CommonConfiguration.optimalTmpDirName; import static uk.co.real_logic.artio.example_buyer.BuyerApplication.AERON_DIRECTORY_NAME; import static uk.co.real_logic.artio.example_buyer.BuyerApplication.RECORDING_EVENTS_CHANNEL; import static uk.co.real_logic.artio.example_exchange.ExchangeApplication.cleanupOldLogFileDir; @@ -23,7 +26,9 @@ public class BuyerAgent implements Agent public void onStart() { final EngineConfiguration engineConfiguration = new EngineConfiguration() - .libraryAeronChannel(IPC_CHANNEL); + .libraryAeronChannel(IPC_CHANNEL) + .monitoringFile(optimalTmpDirName() + File.separator + "fix-buyer" + File.separator + "engineCounters") + .logFileDir("buyer-logs"); engineConfiguration .aeronContext() diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/example_buyer/BuyerApplication.java b/artio-samples/src/main/java/uk/co/real_logic/artio/example_buyer/BuyerApplication.java index 20b5deb96a..279e4843ad 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/example_buyer/BuyerApplication.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/example_buyer/BuyerApplication.java @@ -21,6 +21,7 @@ import io.aeron.driver.MediaDriver; import uk.co.real_logic.artio.SampleUtil; +import static io.aeron.archive.Archive.Configuration.REPLICATION_CHANNEL_PROP_NAME; import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_CHANNEL_PROP_NAME; import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_RESPONSE_CHANNEL_PROP_NAME; import static io.aeron.driver.ThreadingMode.SHARED; @@ -37,6 +38,7 @@ public static void main(final String[] args) throws InterruptedException { System.setProperty(CONTROL_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:9010"); System.setProperty(CONTROL_RESPONSE_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:9020"); + System.setProperty(REPLICATION_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:0"); final MediaDriver.Context context = new MediaDriver.Context() .threadingMode(SHARED) diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/example_exchange/ExchangeApplication.java b/artio-samples/src/main/java/uk/co/real_logic/artio/example_exchange/ExchangeApplication.java index f421d98e42..8cd70c0d53 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/example_exchange/ExchangeApplication.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/example_exchange/ExchangeApplication.java @@ -31,6 +31,9 @@ import java.util.Collections; import static io.aeron.CommonContext.IPC_CHANNEL; +import static io.aeron.archive.Archive.Configuration.REPLICATION_CHANNEL_PROP_NAME; +import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_CHANNEL_PROP_NAME; +import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_RESPONSE_CHANNEL_PROP_NAME; import static io.aeron.driver.ThreadingMode.SHARED; import static uk.co.real_logic.artio.CommonConfiguration.backoffIdleStrategy; @@ -41,6 +44,10 @@ public final class ExchangeApplication public static void main(final String[] args) throws Exception { + System.setProperty(CONTROL_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:10010"); + System.setProperty(CONTROL_RESPONSE_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:10020"); + System.setProperty(REPLICATION_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:0"); + final MessageValidationStrategy validationStrategy = MessageValidationStrategy.targetCompId(ACCEPTOR_COMP_ID) .and(MessageValidationStrategy.senderCompId(Collections.singletonList(INITIATOR_COMP_ID))); diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/example_fixp_exchange/FixPExchangeApplication.java b/artio-samples/src/main/java/uk/co/real_logic/artio/example_fixp_exchange/FixPExchangeApplication.java index 0832aa7c8e..500709566e 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/example_fixp_exchange/FixPExchangeApplication.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/example_fixp_exchange/FixPExchangeApplication.java @@ -29,6 +29,9 @@ import java.io.File; import static io.aeron.CommonContext.IPC_CHANNEL; +import static io.aeron.archive.Archive.Configuration.REPLICATION_CHANNEL_PROP_NAME; +import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_CHANNEL_PROP_NAME; +import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_RESPONSE_CHANNEL_PROP_NAME; import static io.aeron.driver.ThreadingMode.SHARED; import static uk.co.real_logic.artio.CommonConfiguration.backoffIdleStrategy; @@ -40,6 +43,10 @@ public final class FixPExchangeApplication { public static void main(final String[] args) throws Exception { + System.setProperty(CONTROL_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:10010"); + System.setProperty(CONTROL_RESPONSE_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:10020"); + System.setProperty(REPLICATION_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:0"); + // Static configuration lasts the duration of a FIX-Gateway instance final EngineConfiguration configuration = new EngineConfiguration() .bindTo("localhost", 9999) diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/server/SampleServer.java b/artio-samples/src/main/java/uk/co/real_logic/artio/server/SampleServer.java index f96893fc82..3fe5adbe4c 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/server/SampleServer.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/server/SampleServer.java @@ -63,6 +63,10 @@ public static void main(final String[] args) .libraryAeronChannel(aeronChannel); configuration.authenticationStrategy(authenticationStrategy); + configuration.aeronArchiveContext() + .controlRequestChannel(CONTROL_REQUEST_CHANNEL) + .controlResponseChannel(CONTROL_RESPONSE_CHANNEL); + cleanupOldLogFileDir(configuration); final Context context = new Context() @@ -71,6 +75,8 @@ public static void main(final String[] args) final Archive.Context archiveContext = new Archive.Context() .threadingMode(ArchiveThreadingMode.SHARED) + .controlChannel(CONTROL_REQUEST_CHANNEL) + .replicationChannel(REPLICATION_CHANNEL) .deleteArchiveOnStart(true); try (ArchivingMediaDriver driver = ArchivingMediaDriver.launch(context, archiveContext); @@ -121,4 +127,8 @@ private static SessionHandler onConnect(final Session session) return new SampleSessionHandler(session); } + + private static final String CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:8010"; + private static final String CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:8020"; + private static final String REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0"; } diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/ConcurrentConnections.java b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/ConcurrentConnections.java index 31ba377805..216a0757d5 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/ConcurrentConnections.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/ConcurrentConnections.java @@ -52,6 +52,10 @@ public static void main(final String[] args) throws Exception .logFileDir("stress-client-logs"); engineConfiguration.authenticationStrategy((logon) -> true); + engineConfiguration.aeronArchiveContext() + .controlRequestChannel(StressConfiguration.CONTROL_REQUEST_CHANNEL) + .controlResponseChannel(StressConfiguration.CLIENT_CONTROL_RESPONSE_CHANNEL); + System.out.println("Client Logs at " + engineConfiguration.logFileDir()); StressUtil.cleanupOldLogFileDir(engineConfiguration); diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SerialConnections.java b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SerialConnections.java index 20ac5833a7..f0839e8546 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SerialConnections.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SerialConnections.java @@ -33,6 +33,7 @@ import static org.agrona.SystemUtil.loadPropertiesFiles; import static uk.co.real_logic.artio.messages.SessionState.DISCONNECTED; import static uk.co.real_logic.artio.stress.StressConfiguration.*; +import static uk.co.real_logic.artio.validation.SessionPersistenceStrategy.alwaysPersistent; public final class SerialConnections { @@ -50,7 +51,11 @@ public static void main(final String[] args) final EngineConfiguration engineConfiguration = new EngineConfiguration() .libraryAeronChannel(aeronChannel) .logFileDir("stress-client-logs") - .bindTo("localhost", 10001); + .sessionPersistenceStrategy(alwaysPersistent()); + + engineConfiguration.aeronArchiveContext() + .controlRequestChannel(StressConfiguration.CONTROL_REQUEST_CHANNEL) + .controlResponseChannel(StressConfiguration.CLIENT_CONTROL_RESPONSE_CHANNEL); System.out.println("Client Logs at " + engineConfiguration.logFileDir()); @@ -73,6 +78,7 @@ public static void main(final String[] args) .address("localhost", StressConfiguration.PORT) .targetCompId(ACCEPTOR_ID) .senderCompId(INITIATOR_ID) + .sequenceNumbersPersistent(true) .build(); final LibraryConfiguration libraryConfiguration = new LibraryConfiguration() @@ -102,7 +108,7 @@ public static void main(final String[] args) } final Session session = reply.resultIfPresent(); - while (session.isActive()) + while (!session.isActive()) { idleStrategy.idle(library.poll(1)); } diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/Server.java b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/Server.java index f5ed505476..6d14b09280 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/Server.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/Server.java @@ -55,6 +55,10 @@ public Server() .authenticationStrategy(authenticationStrategy) .agentNamePrefix("server-"); + configuration.aeronArchiveContext() + .controlRequestChannel(StressConfiguration.CONTROL_REQUEST_CHANNEL) + .controlResponseChannel(StressConfiguration.SERVER_CONTROL_RESPONSE_CHANNEL); + System.out.println("Server Logs at " + configuration.logFileDir()); StressUtil.cleanupOldLogFileDir(configuration); @@ -65,6 +69,8 @@ public Server() final Archive.Context archiveContext = new Archive.Context() .threadingMode(ArchiveThreadingMode.SHARED) + .controlChannel(StressConfiguration.CONTROL_REQUEST_CHANNEL) + .replicationChannel(StressConfiguration.REPLICATION_CHANNEL) .deleteArchiveOnStart(true); mediaDriver = ArchivingMediaDriver.launch(context, archiveContext); diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SoleEngine.java b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SoleEngine.java index 805bb56f2c..1bea2a4fb8 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SoleEngine.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SoleEngine.java @@ -18,12 +18,18 @@ import uk.co.real_logic.artio.engine.EngineConfiguration; import uk.co.real_logic.artio.engine.FixEngine; +import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_CHANNEL_PROP_NAME; +import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_RESPONSE_CHANNEL_PROP_NAME; + public final class SoleEngine { static final String AERON_CHANNEL = "aeron:udp?endpoint=localhost:10000"; public static void main(final String[] args) { + System.setProperty(CONTROL_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:8010"); + System.setProperty(CONTROL_RESPONSE_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:8020"); + final EngineConfiguration configuration = new EngineConfiguration() .bindTo("localhost", StressConfiguration.PORT) .logFileDir("stress-server-logs") diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SoleMediaDriver.java b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SoleMediaDriver.java index c6d2f9431a..c1394f0051 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SoleMediaDriver.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/SoleMediaDriver.java @@ -20,12 +20,19 @@ import io.aeron.archive.ArchivingMediaDriver; import io.aeron.driver.MediaDriver; +import static io.aeron.archive.Archive.Configuration.REPLICATION_CHANNEL_PROP_NAME; +import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_CHANNEL_PROP_NAME; +import static io.aeron.archive.client.AeronArchive.Configuration.CONTROL_RESPONSE_CHANNEL_PROP_NAME; import static io.aeron.driver.ThreadingMode.SHARED; public final class SoleMediaDriver { public static void main(final String[] args) { + System.setProperty(CONTROL_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:8010"); + System.setProperty(CONTROL_RESPONSE_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:8020"); + System.setProperty(REPLICATION_CHANNEL_PROP_NAME, "aeron:udp?endpoint=localhost:0"); + final MediaDriver.Context context = new MediaDriver.Context() .threadingMode(SHARED) .dirDeleteOnStart(true); diff --git a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/StressConfiguration.java b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/StressConfiguration.java index 72a0fa39dc..968bbb30e1 100644 --- a/artio-samples/src/main/java/uk/co/real_logic/artio/stress/StressConfiguration.java +++ b/artio-samples/src/main/java/uk/co/real_logic/artio/stress/StressConfiguration.java @@ -35,4 +35,8 @@ static boolean printFailedSpints(final long failCount) return FAILED_SPINS_PRINT != DO_NOT_PRINT && failCount > FAILED_SPINS_PRINT; } + public static final String CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:8010"; + public static final String SERVER_CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:8020"; + public static final String CLIENT_CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:8030"; + public static final String REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0"; } diff --git a/artio-system-tests/src/perf/java/uk/co/real_logic/artio/system_benchmarks/FixBenchmarkServer.java b/artio-system-tests/src/perf/java/uk/co/real_logic/artio/system_benchmarks/FixBenchmarkServer.java index d638ed395c..a270ebcc18 100644 --- a/artio-system-tests/src/perf/java/uk/co/real_logic/artio/system_benchmarks/FixBenchmarkServer.java +++ b/artio-system-tests/src/perf/java/uk/co/real_logic/artio/system_benchmarks/FixBenchmarkServer.java @@ -125,7 +125,7 @@ public void onDisconnect(final FixLibrary library) }); } - private static class BenchmarkAuthenticationStrategy implements AuthenticationStrategy + private static final class BenchmarkAuthenticationStrategy implements AuthenticationStrategy { private static final byte[] INVALID_PASSWORD = "Invalid Password".getBytes(StandardCharsets.US_ASCII); private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/CancelOnDisconnectSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/CancelOnDisconnectSystemTest.java index 3c0d061fa0..3847f5621f 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/CancelOnDisconnectSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/CancelOnDisconnectSystemTest.java @@ -51,6 +51,7 @@ public class CancelOnDisconnectSystemTest extends AbstractGatewayToGatewaySystem public static final int COD_TEST_TIMEOUT_IN_MS = 500; public static final int LONG_COD_TEST_TIMEOUT_IN_MS = RUNNING_ON_WINDOWS ? 3_000 : COD_TEST_TIMEOUT_IN_MS; public static final Class FIX_DICTIONARY_WITHOUT_COD = FixDictionaryImpl.class; + private long now; private final FakeTimeoutHandler timeoutHandler = new FakeTimeoutHandler(); @@ -119,9 +120,10 @@ public void shouldTriggerCancelOnDisconnectTimeoutForLogout() launch(); setup(CANCEL_ON_LOGOUT_ONLY.representation(), COD_TEST_TIMEOUT_IN_MS); + now = nanoClock.nanoTime(); logoutSession(initiatingSession); - assertTriggersCancelOnDisconnect(CANCEL_ON_LOGOUT_ONLY); + assertTriggersCancelOnDisconnect(CANCEL_ON_LOGOUT_ONLY, now); } @Test(timeout = TEST_TIMEOUT_IN_MS) @@ -130,9 +132,10 @@ public void shouldTriggerCancelOnDisconnectTimeoutForDisconnect() launch(); setup(CANCEL_ON_DISCONNECT_ONLY.representation(), COD_TEST_TIMEOUT_IN_MS); + now = nanoClock.nanoTime(); testSystem.awaitRequestDisconnect(initiatingSession); - assertTriggersCancelOnDisconnect(CANCEL_ON_DISCONNECT_ONLY); + assertTriggersCancelOnDisconnect(CANCEL_ON_DISCONNECT_ONLY, now); } @Test(timeout = TEST_TIMEOUT_IN_MS) @@ -143,9 +146,10 @@ public void shouldTriggerCancelOnDisconnectTimeoutForLogoutLibrary() acquireAcceptingSession(); + now = nanoClock.nanoTime(); logoutSession(initiatingSession); - assertTriggersCancelOnDisconnect(CANCEL_ON_LOGOUT_ONLY); + assertTriggersCancelOnDisconnect(CANCEL_ON_LOGOUT_ONLY, now); } @Test(timeout = TEST_TIMEOUT_IN_MS) @@ -154,9 +158,10 @@ public void shouldTriggerCancelOnDisconnectTimeoutForDisconnectLibrary() launch(); setup(CANCEL_ON_DISCONNECT_ONLY.representation(), COD_TEST_TIMEOUT_IN_MS); + now = nanoClock.nanoTime(); testSystem.awaitRequestDisconnect(initiatingSession); - assertTriggersCancelOnDisconnect(CANCEL_ON_DISCONNECT_ONLY); + assertTriggersCancelOnDisconnect(CANCEL_ON_DISCONNECT_ONLY, now); } @Test(timeout = TEST_TIMEOUT_IN_MS) @@ -192,6 +197,7 @@ public void shouldTriggerCancelOnDisconnectTimeoutWithNoLogonOptionsButServerOpt acquireAcceptingSession(); + now = nanoClock.nanoTime(); disconnectSession(initiatingSession); assertTriggersCancelOnDisconnectFromDefaults(CancelOnDisconnectOption.CANCEL_ON_DISCONNECT_ONLY, 0); @@ -206,6 +212,7 @@ public void shouldTriggerCancelOnDisconnectTimeoutWithNoLogonOptionsButServerOpt acquireAcceptingSession(); + now = nanoClock.nanoTime(); disconnectSession(initiatingSession); assertTriggersCancelOnDisconnectFromDefaults(CancelOnDisconnectOption.CANCEL_ON_DISCONNECT_ONLY, @@ -221,6 +228,7 @@ public void shouldTriggerCancelOnDisconnectTimeoutForLogoutWithServerOption() acquireAcceptingSession(); + now = nanoClock.nanoTime(); logoutSession(initiatingSession); assertTriggersCancelOnDisconnectFromDefaults(CancelOnDisconnectOption.CANCEL_ON_LOGOUT_ONLY, 0); @@ -235,6 +243,7 @@ public void shouldTriggerCancelOnDisconnectTimeoutForLogoutWithServerOptionAndTi acquireAcceptingSession(); + now = nanoClock.nanoTime(); logoutSession(initiatingSession); assertTriggersCancelOnDisconnectFromDefaults(CancelOnDisconnectOption.CANCEL_ON_LOGOUT_ONLY, @@ -249,6 +258,7 @@ public void shouldTriggerCancelOnDisconnectTimeoutForLogoutWithOptionsInsteadOfS acquireAcceptingSession(); + now = nanoClock.nanoTime(); logoutSession(initiatingSession); assertTriggersCancelOnDisconnectFromDefaults(CancelOnDisconnectOption.CANCEL_ON_LOGOUT_ONLY, @@ -292,8 +302,9 @@ public void shouldTriggerCancelOnDisconnectFromGatewayAfterReacquiring() acquireAcceptingSession(); testSystem.awaitCompletedReply(acceptingLibrary.releaseToGateway(acceptingSession, 5_000)); + now = nanoClock.nanoTime(); testSystem.awaitRequestDisconnect(initiatingSession); - assertTriggersCancelOnDisconnect(CANCEL_ON_DISCONNECT_ONLY); + assertTriggersCancelOnDisconnect(CANCEL_ON_DISCONNECT_ONLY, now); } private void assertDisconnectWithHandlerNotInvoked() @@ -321,16 +332,18 @@ private void assertHandlerNotInvoked(final int codTestTimeoutInMs) assertEquals(0, timeoutHandler.invokeCount()); } - private void assertTriggersCancelOnDisconnect(final CancelOnDisconnectType type) + private void assertTriggersCancelOnDisconnect(final CancelOnDisconnectType type, final long initiatorLogoutTime) { - assertTriggersCancelOnDisconnect(type, COD_TEST_TIMEOUT_IN_MS); + assertTriggersCancelOnDisconnect(type, COD_TEST_TIMEOUT_IN_MS, initiatorLogoutTime); } - private void assertTriggersCancelOnDisconnect(final CancelOnDisconnectType type, final int codTestTimeoutInMs) + private void assertTriggersCancelOnDisconnect(final CancelOnDisconnectType type, + final int codTestTimeoutInMs, + final long initiatorLogoutTime) { final long codTimeoutInNs = MILLISECONDS.toNanos(codTestTimeoutInMs); - assertAcceptorCodTriggered(codTimeoutInNs); + assertAcceptorCodTriggered(codTimeoutInNs, initiatorLogoutTime); assertInitiatorCodState(type, codTimeoutInNs, codTestTimeoutInMs); } @@ -354,12 +367,11 @@ private void assertTriggersCancelOnDisconnectFromDefaults(final CancelOnDisconne assertEquals(initiatorOption, initiatingSession.cancelOnDisconnectOption()); assertEquals(initiatorCodTestTimeoutInNs, initiatingSession.cancelOnDisconnectTimeoutWindowInNs()); - assertAcceptorCodTriggered(acceptorCodTimeoutInNs); + assertAcceptorCodTriggered(acceptorCodTimeoutInNs, now); } - private void assertAcceptorCodTriggered(final long codTimeoutInNs) + private void assertAcceptorCodTriggered(final long codTimeoutInNs, final long initiatorLogoutTime) { - final long logoutTimeInNs = nanoClock.nanoTime(); assertSessionDisconnected(initiatingSession); testSystem.await("timeout not triggered", () -> timeoutHandler.result() != null); @@ -368,7 +380,7 @@ private void assertAcceptorCodTriggered(final long codTimeoutInNs) final TimeoutResult result = timeoutHandler.result(); assertEquals(onlySession.sessionId(), result.surrogateId); assertEquals(onlySession.sessionKey(), result.compositeId); - final long timeoutTakenInNs = result.timeInNs - logoutTimeInNs; + final long timeoutTakenInNs = result.timeInNs - initiatorLogoutTime; assertThat(timeoutTakenInNs, greaterThanOrEqualTo(codTimeoutInNs)); assertEquals(1, timeoutHandler.invokeCount()); } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/ExternallyControlledSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/ExternallyControlledSystemTest.java index 3a50389305..3c78e518e6 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/ExternallyControlledSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/ExternallyControlledSystemTest.java @@ -53,6 +53,7 @@ public class ExternallyControlledSystemTest extends AbstractGatewayToGatewaySystemTest { + private boolean awaitsNewOrderSingle = false; private final FakeSessionProxy fakeSessionProxy = new FakeSessionProxy(); private SessionWriter acceptingSessionWriter = null; private final FakeHandler acceptingHandler = new FakeHandler(acceptingOtfAcceptor) @@ -99,6 +100,11 @@ public void shouldRoundTripMessagesViaExternalSystem() assertNotNull(acceptingSessionWriter); + if (awaitsNewOrderSingle) + { + testSystem.awaitMessageOf(initiatingOtfAcceptor, "D"); + } + messagesCanBeExchanged(); assertEquals(1, sessionProxyRequests); @@ -135,6 +141,7 @@ public void shouldBeAbleToContinueProcessingAFollowersSession() fakeSessionProxy.sequenceNumberAdjustment = 1; + awaitsNewOrderSingle = true; shouldRoundTripMessagesViaExternalSystem(); assertEquals(acceptingSession.id(), writerSessionId); diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index a56413918a..a652417deb 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -23,12 +23,11 @@ import uk.co.real_logic.artio.builder.*; import uk.co.real_logic.artio.decoder.*; import uk.co.real_logic.artio.engine.SessionInfo; +import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.library.FixLibrary; -import uk.co.real_logic.artio.messages.DisconnectReason; -import uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner; -import uk.co.real_logic.artio.messages.SessionReplyStatus; -import uk.co.real_logic.artio.messages.ThrottleConfigurationStatus; +import uk.co.real_logic.artio.messages.*; import uk.co.real_logic.artio.session.Session; +import uk.co.real_logic.artio.session.SessionWriter; import uk.co.real_logic.artio.util.MutableAsciiBuffer; import java.io.IOException; @@ -55,8 +54,7 @@ import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.ENGINE; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; import static uk.co.real_logic.artio.messages.ThrottleConfigurationStatus.OK; -import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.LONG_TEST_TIMEOUT_IN_MS; -import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.TEST_TIMEOUT_IN_MS; +import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.*; import static uk.co.real_logic.artio.system_tests.FixConnection.BUFFER_SIZE; import static uk.co.real_logic.artio.system_tests.MessageBasedInitiatorSystemTest.assertConnectionDisconnects; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*; @@ -924,6 +922,72 @@ public void shouldDisconnectConnectionTryingToSendOversizedMessage() throws IOEx } } + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() + throws IOException + { + setup(false, true); + setupLibrary(); + + final List noSessionContext = engine.allSessions(); + assertEquals(0, noSessionContext.size()); + + final SessionWriter sessionWriter = createFollowerSession( + TEST_TIMEOUT_IN_MS, testSystem, library, INITIATOR_ID, ACCEPTOR_ID); + final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem); + assertEquals(SessionReplyStatus.OK, requestSessionReply); + + try (FixConnection connection = FixConnection.initiate(port)) + { + connection.logon(false); + Timing.assertEventuallyTrue("Library did not transition session to connected", + () -> + { + library.poll(1); + final List sessions = library.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; + } + ); + } + + Timing.assertEventuallyTrue("Fix connection was not disconnected", + () -> + { + final Reply> libraryReply = engine.libraries(); + while (!libraryReply.hasCompleted()) + { + sleep(100); + } + + final List allLibraryInfo = libraryReply.resultIfPresent(); + for (final LibraryInfo libraryInfo : allLibraryInfo) + { + if (libraryInfo.libraryId() == library.libraryId()) + { + return libraryInfo.sessions().isEmpty(); + } + } + return false; + } + ); + + Timing.assertEventuallyTrue("Library did not transition session to active", + () -> + { + library.poll(1); + final List sessions = library.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE; + } + ); + + assertEngineSubscriptionCaughtUpToLibraryPublication( + testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library); + + final List sessionContextAfterLogonNoSenderEndpoint = engine.allSessions(); + assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size()); + assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex()); + } + private void assertSell(final ExecutionReportDecoder executionReport) { assertEquals(executionReport.toString(), Side.SELL, executionReport.sideAsEnum()); @@ -1082,12 +1146,10 @@ private void sendInvalidMessage(final FixConnection connection, final Encoder en private void logonThenLogout() throws IOException { - final FixConnection connection = FixConnection.initiate(port); - - logon(connection); - - connection.logoutAndAwaitReply(); - - connection.close(); + try (FixConnection connection = FixConnection.initiate(port)) + { + logon(connection); + connection.logoutAndAwaitReply(); + } } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/OrderFactory.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/OrderFactory.java index 28938c5b80..d5402e4e22 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/OrderFactory.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/OrderFactory.java @@ -27,7 +27,7 @@ final class OrderFactory { - private static NewOrderSingleEncoder makeOrder() + public static NewOrderSingleEncoder makeOrder() { final NewOrderSingleEncoder newOrderSingle = new NewOrderSingleEncoder(); final DecimalFloat price = new DecimalFloat(100); diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java index 6e29aa2eec..5721ec7553 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java @@ -50,6 +50,7 @@ import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; import static uk.co.real_logic.artio.Constants.*; +import static uk.co.real_logic.artio.GatewayProcess.NO_CONNECTION_ID; import static uk.co.real_logic.artio.MonitoringAgentFactory.consumeDistinctErrors; import static uk.co.real_logic.artio.Reply.State.ERRORED; import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver; @@ -215,9 +216,8 @@ public void shouldCopeWithResendRequestOfMissingMessagesWithHighInitialSequenceN HIGH_INITIAL_SEQUENCE_NUMBER, 4, HIGH_INITIAL_SEQUENCE_NUMBER, - 5); - - receivesGapfill(acceptingOtfAcceptor, greaterThan(HIGH_INITIAL_SEQUENCE_NUMBER)); + 5, + greaterThan(HIGH_INITIAL_SEQUENCE_NUMBER)); // Test that we don't accidentally send another resend request // Reproduction of reported bug @@ -988,6 +988,27 @@ private void resetSomeSequenceNumbersOfOfflineSessions( assertAcceptingSessionHasSequenceIndex(retry ? 2 : 1); } + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void followerSessionShouldReturnCurrentConnectionId() + { + launch(this::nothing); + + final SessionWriter followerSession = createFollowerSession(TEST_REPLY_TIMEOUT_IN_MS); + assertEquals(NO_CONNECTION_ID, followerSession.connectionId()); + + for (int i = 0; i < 2; i++) + { + connectPersistingSessions(); + + assertNotEquals(NO_CONNECTION_ID, followerSession.connectionId()); + assertEquals(acceptingSession.connectionId(), followerSession.connectionId()); + + disconnectSessions(); + + assertEquals(NO_CONNECTION_ID, followerSession.connectionId()); + } + } + private void connectPersistingSessions() { connectPersistingSessions(AUTOMATIC_INITIAL_SEQUENCE_NUMBER, false); @@ -1072,6 +1093,20 @@ private void exchangeMessagesAroundARestart( final int initialReceivedSequenceNumber, final int expectedInitToAccSeqNum, final int expectedAccToInitSeqNum) + { + exchangeMessagesAroundARestart(initialSentSequenceNumber, + initialReceivedSequenceNumber, + expectedInitToAccSeqNum, + expectedAccToInitSeqNum, + null); + } + + private void exchangeMessagesAroundARestart( + final int initialSentSequenceNumber, + final int initialReceivedSequenceNumber, + final int expectedInitToAccSeqNum, + final int expectedAccToInitSeqNum, + final Matcher gapFillMatcher) { launch(this::nothing); connectPersistingSessions(AUTOMATIC_INITIAL_SEQUENCE_NUMBER, resetSequenceNumbersOnLogon); @@ -1112,6 +1147,17 @@ private void exchangeMessagesAroundARestart( assertSequenceFromInitToAcceptAt(expectedInitToAccSeqNum, expectedAccToInitSeqNum); } + // this means a resend request will be sent by the acceptor + if (initialReceivedSequenceNumber != AUTOMATIC_INITIAL_SEQUENCE_NUMBER && + initialReceivedSequenceNumber < expectedAccToInitSeqNum) + { + assertReceivedResendRequest(testSystem, initiatingOtfAcceptor, expectedAccToInitSeqNum); + } + if (gapFillMatcher != null) + { + receivesGapfill(acceptingOtfAcceptor, gapFillMatcher); + } + assertTestRequestSentAndReceived(initiatingSession, testSystem, acceptingOtfAcceptor); } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SlowConsumerTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SlowConsumerTest.java index c360496559..36b7879404 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SlowConsumerTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SlowConsumerTest.java @@ -25,6 +25,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; + import uk.co.real_logic.artio.Timing; import uk.co.real_logic.artio.builder.Encoder; import uk.co.real_logic.artio.builder.LogonEncoder; @@ -148,7 +149,7 @@ public void shouldQuarantineThenDisconnectASlowConsumer() throws IOException hasBecomeSlow = true; } - sendMessage(); + sendMessageWithRetry(); } testSystem.poll(); @@ -173,16 +174,21 @@ private void sessionBecomesSlow() assertTrue(session.isSlowConsumer()); } - private void sendMessage() + private void sendMessageWithRetry() + { + testSystem.awaitSend(this::sendMessage); + } + + private long sendMessage() { if (sendMetadata) { metadata.putInt(0, session.lastSentMsgSeqNum() + 1); - session.trySend(testRequest, metadata, 0); + return session.trySend(testRequest, metadata, 0); } else { - session.trySend(testRequest); + return session.trySend(testRequest); } } @@ -206,7 +212,7 @@ public void shouldRestoreConnectionFromSlowGroupWhenItCatchesUp() throws IOExcep } while (bytesRead > 0); - sendMessage(); + sendMessageWithRetry(); testSystem.poll(); } @@ -259,7 +265,7 @@ private ConnectedSessionInfo sessionBecomesSlow(final MessageTimingCaptor messag { for (int i = 0; i < 10; i++) { - sendMessage(); + sendMessageWithRetry(); } testSystem.poll(); @@ -368,4 +374,3 @@ private void setup(final int senderMaxBytesInBuffer, final MessageTimingCaptor m library = testSystem.connect(libraryConfiguration); } } - diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java index 24cfc80333..a634fcb7ef 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java @@ -132,7 +132,7 @@ public void shouldSupportUnreleasedOfflineSessionsInSoleLibraryMode() assertEquals(1, acceptingSession.lastSentMsgSeqNum()); } - @Test(timeout = TEST_TIMEOUT_IN_MS) + @Test(timeout = LONG_TEST_TIMEOUT_IN_MS) public void shouldSupportManySessionReconnections() { launch(false, true); diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java index d8b78853b4..31a362818b 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java @@ -538,6 +538,20 @@ static void assertReceivedSingleHeartbeat( }); } + static void assertReceivedResendRequest( + final TestSystem testSystem, final FakeOtfAcceptor acceptor, final int msgSeqNo) + { + assertEventuallyTrue("Failed to receive resend request", + () -> + { + testSystem.poll(); + + return acceptor + .receivedMessage("2") + .anyMatch((message) -> msgSeqNo == Integer.parseInt(message.get(Constants.MSG_SEQ_NUM))); + }); + } + static LibraryInfo gatewayLibraryInfo(final FixEngine engine) { return libraries(engine) @@ -707,4 +721,55 @@ static void awaitIndexerCaughtUp( () -> {}); } } + + static void assertEngineSubscriptionCaughtUpToLibraryPublication( + final TestSystem testSystem, + final String aeronDirectoryName, + final FixEngine engine, + final FixLibrary library) + { + final EngineStreamInfo engineStreamInfo = + testSystem.awaitCompletedReply(FixEngineInternals.engineStreamInfo(engine)).resultIfPresent(); + + final LibraryStreamInfo libraryStreamInfo = FixLibraryInternals.libraryStreamInfo(library); + + final Aeron.Context aeronCtx = new Aeron.Context().aeronDirectoryName(aeronDirectoryName); + try (Aeron aeron = Aeron.connect(aeronCtx)) + { + final CountersReader countersReader = aeron.countersReader(); + + final SubPosMatcher subPosMatcher = new SubPosMatcher( + countersReader, + engineStreamInfo.librarySubscriptionRegistrationId(), + libraryStreamInfo.outboundPublicationSessionId(), + libraryStreamInfo.outboundPublicationPosition()); + + countersReader.forEach((counterId, typeId, keyBuffer, label) -> + subPosMatcher.tryMatch(counterId, typeId, keyBuffer)); + + if (!subPosMatcher.hasCounterId()) + { + throw new IllegalStateException("did not match counter: " + subPosMatcher); + } + + assertEventuallyTrue( + () -> + { + final StringBuilder builder = new StringBuilder(); + builder.append("expected sub-pos counters:\n"); + builder.append(subPosMatcher).append('\n'); + builder.append("\nbut counters were:\n"); + countersReader.forEach((value, counterId, label) -> + builder.append(String.format("%d: %d - %s%n", counterId, value, label))); + return builder.toString(); + }, + () -> + { + testSystem.poll(); + return subPosMatcher.isCaughtUp(); + }, + DEFAULT_TIMEOUT_IN_MS, + () -> {}); + } + } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/TestSystem.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/TestSystem.java index 81c9a75f13..a5d4ca31b0 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/TestSystem.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/TestSystem.java @@ -53,6 +53,7 @@ public class TestSystem { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); public static final long LONG_AWAIT_TIMEOUT_IN_MS = 600_000_000; private final List libraries; @@ -80,15 +81,19 @@ public TestSystem awaitTimeoutInMs(final long awaitTimeoutInMs) return this; } - public void poll() + public int poll() { + int result = 0; if (scheduler != null) { - scheduler.invokeFramer(); - scheduler.invokeFramer(); + result = scheduler.invokeFramer(); + } + for (final FixLibrary library : libraries) + { + result += library.poll(LIBRARY_LIMIT); } - libraries.forEach((library) -> library.poll(LIBRARY_LIMIT)); operations.forEach(Runnable::run); + return result; } public void addOperation(final Runnable operation) @@ -311,48 +316,42 @@ public void awaitLongBlocking(final Runnable operation) public T awaitBlocking(final Callable operation) { - final ExecutorService executor = Executors.newSingleThreadExecutor(); - try - { - final Future future = executor.submit(operation); + final Future future = executor.submit(operation); - final long deadlineInMs = System.currentTimeMillis() + awaitTimeoutInMs; + final long deadlineInMs = System.currentTimeMillis() + awaitTimeoutInMs; - while (!future.isDone()) - { - poll(); - - Thread.yield(); - - if (System.currentTimeMillis() > deadlineInMs) - { - Exceptions.printStackTracesForAllThreads(); + while (!future.isDone()) + { + poll(); - throw new TimeoutException(operation + " failed: timed out"); - } - } + Thread.yield(); - try + if (System.currentTimeMillis() > deadlineInMs) { - return future.get(); - } - catch (final InterruptedException | ExecutionException e) - { - if (e.getCause() instanceof TimeoutException || - e.getCause() instanceof java.util.concurrent.TimeoutException) - { - Exceptions.printStackTracesForAllThreads(); - } + Exceptions.printStackTracesForAllThreads(); - LangUtil.rethrowUnchecked(e); + throw new TimeoutException(String.format(" %s failed: timed out after [%s]ms", + operation, + awaitTimeoutInMs)); } + } - return null; + try + { + return future.get(); } - finally + catch (final InterruptedException | ExecutionException e) { - executor.shutdown(); + if (e.getCause() instanceof TimeoutException || + e.getCause() instanceof java.util.concurrent.TimeoutException) + { + Exceptions.printStackTracesForAllThreads(); + } + + LangUtil.rethrowUnchecked(e); } + + return null; } public void awaitUnbind(final ILink3Connection session) diff --git a/build.gradle b/build.gradle index 0a24828e3f..06fad0947c 100644 --- a/build.gradle +++ b/build.gradle @@ -13,40 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - buildscript { ext { awsAccessKeyId = System.env.AWS_ACCESS_KEY_ID awsSecretAccessKey = System.env.AWS_SECRET_ACCESS_KEY } - - repositories { - mavenCentral() - } - - dependencies { - classpath 'org.ow2.asm:asm:9.7' - classpath 'org.ow2.asm:asm-commons:9.7' - } } plugins { - id 'com.github.johnrengelman.shadow' version '8.1.1' apply false - id 'net.researchgate.release' version '2.8.1' - id 'com.github.ben-manes.versions' version '0.51.0' + alias(libs.plugins.shadow).apply(false) + alias(libs.plugins.release) + alias(libs.plugins.versions) } -def checkstyleVersion = '9.3' -def hamcrestVersion = '2.2' -def junitVersion = '5.10.2' -def mockitoVersion = '4.11.0' -def hdrHistogramVersion = '2.1.12' -def jmhVersion = '1.37' -def byteBuddyVersion = '1.14.16' - -def agronaVersion = '1.21.2' -def sbeVersion = '1.31.1' -def aeronVersion = '1.44.1' def artioGroup = 'uk.co.real-logic' def iLink3Enabled = false @@ -103,6 +82,8 @@ def projectPom = { ext { group = artioGroup isReleaseVersion = !version.endsWith("SNAPSHOT") + repoUrl = 'https://oss.sonatype.org/service/local/staging/deploy/maven2/' + snapshotUrl = 'https://oss.sonatype.org/content/repositories/snapshots/' iLink3Enabled = Boolean.getBoolean("fix.core.iLink3Enabled") if (!project.hasProperty('repoUsername')) { @@ -113,18 +94,18 @@ ext { repoPassword = '' } - if (!project.hasProperty('repoUrl')) { - repoUrl = '' + if (!project.hasProperty('signingKey')) { + signingKey = null } - if (!project.hasProperty('snapshotUrl')) { - snapshotUrl = '' + if (!project.hasProperty('signingPassword')) { + signingPassword = null } } release { git { - requireBranch = '' + requireBranch.set('') } } @@ -140,9 +121,14 @@ allprojects { configurations.configureEach { resolutionStrategy { failOnVersionConflict() - force "org.agrona:agrona:${agronaVersion}", - "net.bytebuddy:byte-buddy:${byteBuddyVersion}", - "net.bytebuddy:byte-buddy-agent:${byteBuddyVersion}" + force "org.agrona:agrona:${libs.versions.agrona.get()}", + libs.byteBuddy, + libs.byteBuddy.agent, + // patching conflicting Checkstyle dependencies + "org.codehaus.plexus:plexus-utils:3.3.0", + "org.apache.commons:commons-lang3:3.8.1", + "org.apache.httpcomponents:httpcore:4.4.14", + "commons-codec:commons-codec:1.15" } } @@ -168,6 +154,7 @@ subprojects { toolchain { languageVersion = JavaLanguageVersion.of(buildJavaVersion) } + sourceCompatibility = JavaVersion.VERSION_17 } tasks.withType(JavaCompile).configureEach { @@ -176,23 +163,22 @@ subprojects { } dependencies { - testImplementation "org.hamcrest:hamcrest:${hamcrestVersion}" - testImplementation "org.mockito:mockito-core:${mockitoVersion}" - testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" - testImplementation "junit:junit:4.13.2" // FIXME: Remove after migration to JUnit 5 is complete + testImplementation libs.hamcrest + testImplementation libs.mockito + testImplementation platform("org.junit:junit-bom:${libs.versions.junit.get()}") + testImplementation "org.junit.jupiter:junit-jupiter" + testImplementation libs.junit4 testRuntimeOnly "org.junit.platform:junit-platform-launcher" - testRuntimeOnly "org.junit.vintage:junit-vintage-engine:${junitVersion}" + testRuntimeOnly "org.junit.vintage:junit-vintage-engine" } - checkstyle.toolVersion = "${checkstyleVersion}" + checkstyle.toolVersion = libs.versions.checkstyle.get() tasks.withType(Test).configureEach { useJUnitPlatform() - if (buildJavaVersion >= 9) { - jvmArgs('--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED') - jvmArgs('--add-opens', 'java.base/java.util.zip=ALL-UNNAMED') - } + jvmArgs('--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED') + jvmArgs('--add-opens', 'java.base/java.util.zip=ALL-UNNAMED') if (buildJavaVersion >= 21) { jvmArgs('-XX:+EnableDynamicAgentLoading') @@ -200,6 +186,11 @@ subprojects { testClassesDirs = testing.suites.test.sources.output.classesDirs classpath = testing.suites.test.sources.runtimeClasspath + maxHeapSize("1g") + jvmArgs("-XX:MaxDirectMemorySize=1g", + "-XX:+HeapDumpOnOutOfMemoryError", + "-XX:+CrashOnOutOfMemoryError", + "-XX:HeapDumpPath=" + project.rootDir + "/heap.hprof") testLogging { events 'skipped', 'failed' @@ -254,9 +245,7 @@ subprojects { options.encoding = 'UTF-8' options.docEncoding = 'UTF-8' options.charSet = 'UTF-8' - if (JavaVersion.current().isJava10Compatible()) { - options.addBooleanOption 'html5', true - } + options.addBooleanOption('html5', true) options.addStringOption('Xdoclint:all,-missing', '-quiet') } @@ -271,7 +260,7 @@ subprojects { tasks.register('testJar', Jar) { dependsOn testClasses archiveClassifier.set('tests') - archiveBaseName.set("test-${project.archivesBaseName}") + archiveBaseName.set("test-${base.archivesName}") from sourceSets.test.output } @@ -286,6 +275,9 @@ subprojects { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } required { isReleaseVersion && gradle.taskGraph.hasTask("uploadArchives") } sign configurations.archives } @@ -299,22 +291,21 @@ subprojects { def validationXsdPath = project(':artio-codecs').projectDir.toString() + '/src/main/resources/fpl/sbe.xsd' project(':artio-codecs') { - apply plugin: 'com.github.johnrengelman.shadow' + apply plugin: 'com.gradleup.shadow' configurations { codecGeneration } dependencies { - api "org.agrona:agrona:${agronaVersion}" - api "io.aeron:aeron-client:${aeronVersion}" - codecGeneration "uk.co.real-logic:sbe-tool:${sbeVersion}" - api "uk.co.real-logic:sbe-tool:${sbeVersion}" - api files('build/classes/java/generated') - testImplementation files('build/classes/java/generated') + api libs.agrona + api libs.aeron.client + api libs.sbe + api files("${layout.buildDirectory.get()}/classes/java/generated") + codecGeneration libs.sbe } - def generatedDir = file("${buildDir}/generated-src") + def generatedDir = file("${layout.buildDirectory.get()}/generated-src") sourceSets { generated { java.srcDir generatedDir @@ -364,27 +355,29 @@ project(':artio-codecs') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioCodecs } } project(':artio-ilink3-codecs') { - apply plugin: 'com.github.johnrengelman.shadow' + apply plugin: 'com.gradleup.shadow' configurations { codecGeneration } dependencies { - api "org.agrona:agrona:${agronaVersion}" - codecGeneration "uk.co.real-logic:sbe-tool:${sbeVersion}" - api files('build/classes/java/generated') - testImplementation files('build/classes/java/generated') + api libs.agrona + api files("${layout.buildDirectory.get()}/classes/java/generated") + codecGeneration libs.sbe } def outputClasses = 'out/production/classes' def sbePath = 'src/main/resources/uk/co/real_logic/artio/ilink/ilinkbinary.xml' - def generatedDir = file("${buildDir}/generated-src") + def generatedDir = file("${layout.buildDirectory.get()}/generated-src") sourceSets { generated { java.srcDir generatedDir @@ -444,27 +437,29 @@ project(':artio-ilink3-codecs') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioILink3Codecs } } project(':artio-binary-entrypoint-codecs') { - apply plugin: 'com.github.johnrengelman.shadow' + apply plugin: 'com.gradleup.shadow' configurations { codecGeneration } dependencies { - api "org.agrona:agrona:${agronaVersion}" - codecGeneration "uk.co.real-logic:sbe-tool:${sbeVersion}" - api files('build/classes/java/generated') - testImplementation files('build/classes/java/generated') + api libs.agrona + api files("${layout.buildDirectory.get()}/classes/java/generated") + codecGeneration libs.sbe } def outputClasses = 'out/production/classes' def sbePath = 'src/main/resources/uk/co/real_logic/artio/entrypoint/binary_entrypoint.xml' - def generatedDir = file("${buildDir}/generated-src") + def generatedDir = file("${layout.buildDirectory.get()}/generated-src") sourceSets { generated { java.srcDir generatedDir @@ -521,12 +516,15 @@ project(':artio-binary-entrypoint-codecs') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioBinaryEntrypointCodecs } } project(':artio-binary-entrypoint-impl') { - apply plugin: 'com.github.johnrengelman.shadow' + apply plugin: 'com.gradleup.shadow' dependencies { api project(':artio-binary-entrypoint-codecs') @@ -544,12 +542,15 @@ project(':artio-binary-entrypoint-impl') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioBinaryEntrypointImpl } } project(':artio-ilink3-impl') { - apply plugin: 'com.github.johnrengelman.shadow' + apply plugin: 'com.gradleup.shadow' dependencies { if (iLink3Enabled) { @@ -575,6 +576,9 @@ project(':artio-ilink3-impl') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioIlink3Impl } } @@ -586,11 +590,11 @@ project(':artio-session-codecs') { dependencies { api project(':artio-codecs') + api files("${layout.buildDirectory.get()}/classes/java/generated") codecGeneration project(':artio-codecs') - api files('build/classes/java/generated') } - def generatedDir = file("$buildDir/generated-src") + def generatedDir = file("${layout.buildDirectory.get()}/generated-src") sourceSets { generated { java.srcDir generatedDir @@ -604,7 +608,9 @@ project(':artio-session-codecs') { tasks.register('generateCodecs', JavaExec) { mainClass.set('uk.co.real_logic.artio.dictionary.CodecGenerationTool') classpath = configurations.codecGeneration - args = [generatedDir, 'src/main/resources/session_dictionary.xml'] + def dictionaryFile = 'src/main/resources/session_dictionary.xml' + inputs.file(dictionaryFile) + args = [generatedDir, dictionaryFile] outputs.dir generatedDir systemProperty("fix.codecs.flyweight", "true") } @@ -612,7 +618,9 @@ project(':artio-session-codecs') { tasks.register('generateOtherCodecs', JavaExec) { mainClass.set('uk.co.real_logic.artio.dictionary.CodecGenerationTool') classpath = configurations.codecGeneration - args = [generatedDir, 'src/main/resources/other_session_dictionary.xml'] + def dictionaryFile = 'src/main/resources/other_session_dictionary.xml' + inputs.file(dictionaryFile) + args = [generatedDir, dictionaryFile] outputs.dir generatedDir systemProperty("fix.codecs.flyweight", "true") systemProperty("fix.codecs.parent_package", "uk.co.real_logic.artio.other") @@ -641,6 +649,9 @@ project(':artio-session-codecs') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioSessionCodecs } } @@ -652,11 +663,11 @@ project(':artio-session-fixt-codecs') { dependencies { api project(path: ':artio-codecs') + api files("${layout.buildDirectory.get()}/classes/java/generated") codecGeneration project(':artio-codecs') - api files('build/classes/java/generated') } - def generatedDir = file("$buildDir/generated-src") + def generatedDir = file("${layout.buildDirectory.get()}/generated-src") sourceSets { generated { java.srcDir generatedDir @@ -702,21 +713,23 @@ project(':artio-session-fixt-codecs') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioSessionFixtCodecs } } project(':artio-core') { dependencies { + api project(':artio-codecs') + api libs.aeron.client + api libs.aeron.archive + api libs.hdrHistogram // FIXME: Api? + testImplementation project(path: ':artio-codecs', configuration: 'tests') testImplementation project(':artio-session-codecs') testImplementation project(':artio-session-fixt-codecs') - api project(':artio-codecs') - - api "io.aeron:aeron-client:${aeronVersion}" - api "io.aeron:aeron-driver:${aeronVersion}" - api "io.aeron:aeron-archive:${aeronVersion}" - api "org.hdrhistogram:HdrHistogram:${hdrHistogramVersion}" } test { @@ -734,34 +747,37 @@ project(':artio-core') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioCore } } project(':artio-system-tests') { - apply plugin: 'com.github.johnrengelman.shadow' + apply plugin: 'com.gradleup.shadow' sourceSets { perf } dependencies { + api project(':artio-session-codecs') + api project(':artio-session-fixt-codecs') + api project(path: ':artio-core') + testImplementation project(path: ':artio-core', configuration: 'tests') testImplementation project(path: ':artio-codecs', configuration: 'tests') testImplementation project(path: ':artio-session-codecs', configuration: 'tests') testImplementation project(path: ':artio-session-fixt-codecs', configuration: 'tests') - api project(':artio-session-codecs') - api project(':artio-session-fixt-codecs') - api project(path: ':artio-core') - perfImplementation project - perfImplementation "org.openjdk.jmh:jmh-core:${jmhVersion}" + perfImplementation libs.jmh.core // For IDEA: - implementation "org.openjdk.jmh:jmh-core:${jmhVersion}" + implementation libs.jmh.core - annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:${jmhVersion}" + annotationProcessor libs.jmh.generator.annprocess } test { @@ -795,12 +811,15 @@ project(':artio-system-tests') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioSystemTests } } project(':artio-ilink-system-tests') { - apply plugin: 'com.github.johnrengelman.shadow' + apply plugin: 'com.gradleup.shadow' dependencies { testImplementation project(path: ':artio-core', configuration: 'tests') @@ -830,12 +849,15 @@ project(':artio-ilink-system-tests') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioILink3SystemTests } } project(':artio-binary-entrypoint-system-tests') { - apply plugin: 'com.github.johnrengelman.shadow' + apply plugin: 'com.gradleup.shadow' dependencies { testImplementation project(path: ':artio-core', configuration: 'tests') @@ -860,12 +882,15 @@ project(':artio-binary-entrypoint-system-tests') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioBinaryEntrypointSystemTests } } project(':artio-samples') { - apply plugin: 'com.github.johnrengelman.shadow' + apply plugin: 'com.gradleup.shadow' dependencies { api project(':artio-core') @@ -892,6 +917,9 @@ project(':artio-samples') { } signing { + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + } sign publishing.publications.artioSamples } } @@ -926,7 +954,7 @@ tasks.withType(PublishToMavenRepository).configureEach { } wrapper { - gradleVersion = '8.1.1' + gradleVersion = libs.versions.gradle.get() distributionType = 'ALL' } @@ -943,3 +971,16 @@ tasks.register('copyTestLogs', Copy) { includeEmptyDirs = false } + +def isNonStable = { String version -> + def stableKeyword = ['RELEASE', 'FINAL', 'GA'].any { it -> version.toUpperCase().contains(it) } + def regex = /^[0-9,.v-]+(-r)?$/ + return !stableKeyword && !(version ==~ regex) +} + +tasks.named('dependencyUpdates').configure { + // Reject all non stable versions + rejectVersionIf { + isNonStable(it.candidate.version) + } +} diff --git a/gradle.properties b/gradle.properties index f215dd27f5..662684ff81 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,4 @@ +org.gradle.java.installations.auto-detect=false org.gradle.java.installations.auto-download=false org.gradle.java.installations.fromEnv=BUILD_JAVA_HOME @@ -7,6 +8,6 @@ org.gradle.caching=true org.gradle.logging.level=lifecycle org.gradle.warning.mode=all -version=0.154-SNAPSHOT +version=0.160-SNAPSHOT systemProp.org.gradle.internal.publish.checksums.insecure=true diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml new file mode 100644 index 0000000000..8e056409bb --- /dev/null +++ b/gradle/libs.versions.toml @@ -0,0 +1,27 @@ +[versions] +aeron = "1.46.7" +agrona = "1.23.1" +byteBuddy = "1.15.10" +checkstyle = "10.20.1" +junit = "5.11.3" +gradle = "8.10.2" +jmh = "1.37" + +[libraries] +agrona = { group = "org.agrona", name = "agrona", version = { strictly = "[1.23.1, 2.0[", require = "1.23.1" } } +aeron-client = { group = "io.aeron", name = "aeron-client", version.ref = "aeron" } +aeron-archive = { group = "io.aeron", name = "aeron-archive", version.ref = "aeron" } +sbe = { group = "uk.co.real-logic", name = "sbe-tool", version = "1.33.1" } +mockito = { group = "org.mockito", name = "mockito-core", version = "5.14.2" } +hamcrest = { group = "org.hamcrest", name = "hamcrest", version = "3.0" } +junit4 = { group = "junit", name = "junit", version = "4.13.2" } +byteBuddy = { group = "net.bytebuddy", name = "byte-buddy", version.ref = "byteBuddy" } +byteBuddy-agent = { group = "net.bytebuddy", name = "byte-buddy-agent", version.ref = "byteBuddy" } +hdrHistogram = { group = "org.hdrhistogram", name = "HdrHistogram", version = "2.2.2" } +jmh-core = { group = "org.openjdk.jmh", name = "jmh-core", version.ref = "jmh" } +jmh-generator-annprocess = { group = "org.openjdk.jmh", name = "jmh-generator-annprocess", version.ref = "jmh" } + +[plugins] +versions = { id = "com.github.ben-manes.versions", version = "0.51.0" } +release = { id = "net.researchgate.release", version = "3.0.2" } +shadow = { id = "com.gradleup.shadow", version = "8.3.5" } diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index c1962a79e2..a4b76b9530 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 8707e8b506..79eb9d003f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-all.zip networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index aeb74cbb43..f5feea6d6b 100755 --- a/gradlew +++ b/gradlew @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# SPDX-License-Identifier: Apache-2.0 +# ############################################################################## # @@ -55,7 +57,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -83,7 +85,9 @@ done # This is normally unused # shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s +' "$PWD" ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -130,10 +134,13 @@ location of your Java installation." fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. @@ -141,7 +148,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac @@ -149,7 +156,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then '' | soft) :;; #( *) # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -198,11 +205,11 @@ fi # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' -# Collect all arguments for the java command; -# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of -# shell script including quotes and variable substitutions, so put them in -# double quotes to make sure that they get re-expanded; and -# * put everything else in single quotes, so that it's not re-expanded. +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ diff --git a/gradlew.bat b/gradlew.bat index 93e3f59f13..9d21a21834 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -13,6 +13,8 @@ @rem See the License for the specific language governing permissions and @rem limitations under the License. @rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem @if "%DEBUG%"=="" @echo off @rem ########################################################################## @@ -43,11 +45,11 @@ set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 if %ERRORLEVEL% equ 0 goto execute -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail @@ -57,11 +59,11 @@ set JAVA_EXE=%JAVA_HOME%/bin/java.exe if exist "%JAVA_EXE%" goto execute -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail