diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java index ca4b20cf9c..520c2233b9 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java @@ -140,6 +140,97 @@ public class LogBufferDescriptor */ public static final int LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH = CACHE_LINE_LENGTH * 2; + /** + * Offset within the log metadata where the sparse property is stored. + */ + public static final int LOG_SPARSE_OFFSET; + + /** + * Offset within the log metadata where the tether property is stored. + */ + public static final int LOG_TETHER_OFFSET; + + /** + * Offset within the log metadata where the rejoin property is stored. + */ + public static final int LOG_REJOIN_OFFSET; + + /** + * Offset within the log metadata where the reliable property is stored. + */ + public static final int LOG_RELIABLE_OFFSET; + + /** + * Offset within the log metadata where the socket receive buffer length is stored. + */ + public static final int LOG_SOCKET_RCVBUF_LENGTH_OFFSET; + + /** + * Offset within the log metadata where the socket send buffer length is stored. + */ + public static final int LOG_SOCKET_SNDBUF_LENGTH_OFFSET; + + /** + * Offset within the log metadata where the receiver window length is stored. + */ + public static final int LOG_RECEIVER_WINDOW_LENGTH_OFFSET; + + /** + * Offset within the log metadata where the publication window length is stored. + */ + public static final int LOG_PUBLICATION_WINDOW_LENGTH_OFFSET; + + /** + * Offset within the log metadata where the window limit timeout ns is stored. + */ + public static final int LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET; + + /** + * Offset within the log metadata where the untether resting timeout ns is stored. + */ + public static final int LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET; + + /** + * Offset within the log metadata where the max resend is stored. + */ + public static final int LOG_MAX_RESEND_OFFSET; + + /** + * Offset within the log metadata where the linger timeout ns is stored. + */ + public static final int LOG_LINGER_TIMEOUT_NS_OFFSET; + + /** + * Offset within the log metadata where the signal-eos is stored. + */ + public static final int LOG_SIGNAL_EOS_OFFSET; + + /** + * Offset within the log metadata where the spies-simulate-connection is stored. + */ + public static final int LOG_SPIES_SIMULATE_CONNECTION_OFFSET; + + /** + * Offset within the log metadata where the group is stored. + */ + public static final int LOG_GROUP_OFFSET; + + /** + * Offset within the log metadata where the entity tag is stored. + */ + public static final int LOG_ENTITY_TAG_OFFSET; + + /** + * Offset within the log metadata where the response correlation id is stored. + */ + public static final int LOG_RESPONSE_CORRELATION_ID_OFFSET; + + /** + * Offset within the log metadata where is-response is stored. + */ + public static final int LOG_IS_RESPONSE_OFFSET; + + /** * Total length of the log metadata buffer in bytes. *
@@ -183,16 +274,55 @@ public class LogBufferDescriptor
      *  +---------------------------------------------------------------+
      *  |                          Page Size                            |
      *  +---------------------------------------------------------------+
-     *  |                      Cache Line Padding                      ...
-     * ...                                                              |
+     *  |                    Publication Window Length                  |
+     *  +---------------------------------------------------------------+
+     *  |                      Receiver Window Length                   |
+     *  +---------------------------------------------------------------+
+     *  |                    Socket Send Buffer Length                  |
+     *  +---------------------------------------------------------------+
+     *  |                  Socket Receive Buffer Length                 |
+     *  +---------------------------------------------------------------+
+     *  |                        Maximum Resend                         |
+     *  +---------------------------------------------------------------+
+     *  |                           Entity tag                          |
+     *  |                                                               |
+     *  +---------------------------------------------------------------+
+     *  |                    Response correlation id                    |
+     *  |                                                               |
      *  +---------------------------------------------------------------+
      *  |                     Default Frame Header                     ...
      * ...                                                              |
      *  +---------------------------------------------------------------+
+     *  |                        Linger Timeout (ns)                    |
+     *  |                                                               |
+     *  +---------------------------------------------------------------+
+     *  |               Untethered Window Limit Timeout (ns)            |
+     *  |                                                               |
+     *  +---------------------------------------------------------------+
+     *  |                 Untethered Resting Timeout (ns)               |
+     *  |                                                               |
+     *  +---------------------------------------------------------------+
+     *  |                            Group                              |
+     *  +---------------------------------------------------------------+
+     *  |                          Is response                          |
+     *  +---------------------------------------------------------------+
+     *  |                            Rejoin                             |
+     *  +---------------------------------------------------------------+
+     *  |                           Reliable                            |
+     *  +---------------------------------------------------------------+
+     *  |                            Sparse                             |
+     *  +---------------------------------------------------------------+
+     *  |                         Signal EOS                            |
+     *  +---------------------------------------------------------------+
+     *  |                 Spies Simulate Connection                     |
+     *  +---------------------------------------------------------------+
+     *  |                          Tether                               |
+     *  +---------------------------------------------------------------+
      * 
*/ public static final int LOG_META_DATA_LENGTH; + static { int offset = 0; @@ -207,6 +337,7 @@ public class LogBufferDescriptor LOG_ACTIVE_TRANSPORT_COUNT = LOG_IS_CONNECTED_OFFSET + SIZE_OF_INT; offset += (CACHE_LINE_LENGTH * 2); + LOG_CORRELATION_ID_OFFSET = offset; LOG_INITIAL_TERM_ID_OFFSET = LOG_CORRELATION_ID_OFFSET + SIZE_OF_LONG; LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET = LOG_INITIAL_TERM_ID_OFFSET + SIZE_OF_INT; @@ -214,10 +345,53 @@ public class LogBufferDescriptor LOG_TERM_LENGTH_OFFSET = LOG_MTU_LENGTH_OFFSET + SIZE_OF_INT; LOG_PAGE_SIZE_OFFSET = LOG_TERM_LENGTH_OFFSET + SIZE_OF_INT; + LOG_PUBLICATION_WINDOW_LENGTH_OFFSET = LOG_PAGE_SIZE_OFFSET + SIZE_OF_INT; + LOG_RECEIVER_WINDOW_LENGTH_OFFSET = LOG_PUBLICATION_WINDOW_LENGTH_OFFSET + SIZE_OF_INT; + LOG_SOCKET_SNDBUF_LENGTH_OFFSET = LOG_RECEIVER_WINDOW_LENGTH_OFFSET + SIZE_OF_INT; + LOG_SOCKET_RCVBUF_LENGTH_OFFSET = LOG_SOCKET_SNDBUF_LENGTH_OFFSET + SIZE_OF_INT; + LOG_MAX_RESEND_OFFSET = LOG_SOCKET_RCVBUF_LENGTH_OFFSET + SIZE_OF_INT; + LOG_ENTITY_TAG_OFFSET = LOG_MAX_RESEND_OFFSET + SIZE_OF_INT; + LOG_RESPONSE_CORRELATION_ID_OFFSET = LOG_ENTITY_TAG_OFFSET + SIZE_OF_LONG; + offset += CACHE_LINE_LENGTH; + LOG_DEFAULT_FRAME_HEADER_OFFSET = offset; + offset += LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH; + + LOG_LINGER_TIMEOUT_NS_OFFSET = offset; + offset += SIZE_OF_LONG; + + LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET = offset; + offset += SIZE_OF_LONG; + + LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET = offset; + offset += SIZE_OF_LONG; + + LOG_GROUP_OFFSET = offset; + offset += SIZE_OF_BYTE; + + LOG_IS_RESPONSE_OFFSET = offset; + offset += SIZE_OF_BYTE; + + LOG_REJOIN_OFFSET = offset; + offset += SIZE_OF_BYTE; + + LOG_RELIABLE_OFFSET = offset; + offset += SIZE_OF_BYTE; + + LOG_SPARSE_OFFSET = offset; + offset += SIZE_OF_BYTE; - LOG_META_DATA_LENGTH = align(offset + LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH, PAGE_MIN_SIZE); + LOG_SIGNAL_EOS_OFFSET = offset; + offset += SIZE_OF_BYTE; + + LOG_SPIES_SIMULATE_CONNECTION_OFFSET = offset; + offset += SIZE_OF_BYTE; + + LOG_TETHER_OFFSET = offset; + offset += SIZE_OF_BYTE; + + LOG_META_DATA_LENGTH = align(offset, PAGE_MIN_SIZE); } /** @@ -878,4 +1052,312 @@ public static int computeAssembledFrameLength(final int length, final int maxPay return HEADER_LENGTH + (numMaxPayloads * maxPayloadSize) + remainingPayload; } + + /** + * Get whether the log is sparse from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if the log is sparse, otherwise false. + */ + public static boolean sparse(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_SPARSE_OFFSET) == 1; + } + + /** + * Set whether the log is sparse in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if the log is sparse, otherwise false. + */ + public static void sparse(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_SPARSE_OFFSET, (byte)(value ? 1 : 0)); + } + + /** + * Get whether the log is tethered from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if the log is tethered, otherwise false. + */ + public static boolean tether(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_TETHER_OFFSET) == 1; + } + + /** + * Set whether the log is tethered in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if the log is tethered, otherwise false. + */ + public static void tether(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_TETHER_OFFSET, (byte)(value ? 1 : 0)); + } + + /** + * Get whether the log is rejoining from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if the log is rejoining, otherwise false. + */ + public static boolean rejoin(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_REJOIN_OFFSET) == 1; + } + + /** + * Set whether the log is rejoining in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if the log is rejoining, otherwise false. + */ + public static void rejoin(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_REJOIN_OFFSET, (byte)(value ? 1 : 0)); + } + + /** + * Get whether the log is reliable from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if the log is reliable, otherwise false. + */ + public static boolean reliable(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_RELIABLE_OFFSET) == 1; + } + + /** + * Set whether the log is reliable in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if the log is reliable, otherwise false. + */ + public static void reliable(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_RELIABLE_OFFSET, (byte)(value ? 1 : 0)); + } + + /** + * Get the socket receive buffer length from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the socket receive buffer length. + */ + public static int socketRcvbufLength(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getInt(LOG_SOCKET_RCVBUF_LENGTH_OFFSET); + } + + /** + * Set the socket receive buffer length in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the socket receive buffer length to set. + */ + public static void socketRcvbufLength(final UnsafeBuffer metadataBuffer, final int value) + { + metadataBuffer.putInt(LOG_SOCKET_RCVBUF_LENGTH_OFFSET, value); + } + + /** + * Get the socket send buffer length from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the socket send buffer length. + */ + public static int socketSndbufLength(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getInt(LOG_SOCKET_SNDBUF_LENGTH_OFFSET); + } + + /** + * Set the socket send buffer length in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the socket send buffer length to set. + */ + public static void socketSndbufLength(final UnsafeBuffer metadataBuffer, final int value) + { + metadataBuffer.putInt(LOG_SOCKET_SNDBUF_LENGTH_OFFSET, value); + } + + /** + * Get the receiver window length from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the receiver window length. + */ + public static int receiverWindowLength(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getInt(LOG_RECEIVER_WINDOW_LENGTH_OFFSET); + } + + /** + * Set the receiver window length in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the receiver window length to set. + */ + public static void receiverWindowLength(final UnsafeBuffer metadataBuffer, final int value) + { + metadataBuffer.putInt(LOG_RECEIVER_WINDOW_LENGTH_OFFSET, value); + } + + /** + * Get the publication window length from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the publication window length. + */ + public static int publicationWindowLength(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getInt(LOG_PUBLICATION_WINDOW_LENGTH_OFFSET); + } + + /** + * Set the publication window length in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the publication window length to set. + */ + public static void publicationWindowLength(final UnsafeBuffer metadataBuffer, final int value) + { + metadataBuffer.putInt(LOG_PUBLICATION_WINDOW_LENGTH_OFFSET, value); + } + + /** + * Get the untethered window limit timeout in nanoseconds from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the untethered window limit timeout in nanoseconds. + */ + public static long untetheredWindowLimitTimeoutNs(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getLong(LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET); + } + + /** + * Set the untethered window limit timeout in nanoseconds in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the untethered window limit timeout to set. + */ + public static void untetheredWindowLimitTimeoutNs(final UnsafeBuffer metadataBuffer, final long value) + { + metadataBuffer.putLong(LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET, value); + } + + /** + * Get the untethered resting timeout in nanoseconds from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the untethered resting timeout in nanoseconds. + */ + public static long untetheredRestingTimeoutNs(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getLong(LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET); + } + + /** + * Set the untethered resting timeout in nanoseconds in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the untethered resting timeout to set. + */ + public static void untetheredRestingTimeoutNs(final UnsafeBuffer metadataBuffer, final long value) + { + metadataBuffer.putLong(LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET, value); + } + + /** + * Get the maximum resend count from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the maximum resend count. + */ + public static int maxResend(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getInt(LOG_MAX_RESEND_OFFSET); + } + + /** + * Set the maximum resend count in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the maximum resend count to set. + */ + public static void maxResend(final UnsafeBuffer metadataBuffer, final int value) + { + metadataBuffer.putInt(LOG_MAX_RESEND_OFFSET, value); + } + + /** + * Get the linger timeout in nanoseconds from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the linger timeout in nanoseconds. + */ + public static long lingerTimeoutNs(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getLong(LOG_LINGER_TIMEOUT_NS_OFFSET); + } + + /** + * Set the linger timeout in nanoseconds in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the linger timeout to set. + */ + public static void lingerTimeoutNs(final UnsafeBuffer metadataBuffer, final long value) + { + metadataBuffer.putLong(LOG_LINGER_TIMEOUT_NS_OFFSET, value); + } + + /** + * Get whether the signal EOS is enabled from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if signal EOS is enabled, otherwise false. + */ + public static boolean signalEos(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_SIGNAL_EOS_OFFSET) == 1; + } + + /** + * Set whether the signal EOS is enabled in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if signal EOS is enabled, otherwise false. + */ + public static void signalEos(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_SIGNAL_EOS_OFFSET, (byte)(value ? 1 : 0)); + } + + /** + * Get whether spies simulate connection from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if spies simulate connection, otherwise false. + */ + public static boolean spiesSimulateConnection(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_SPIES_SIMULATE_CONNECTION_OFFSET) == 1; + } + + /** + * Set whether spies simulate connection in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if spies simulate connection, otherwise false. + */ + public static void spiesSimulateConnection(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_SPIES_SIMULATE_CONNECTION_OFFSET, (byte)(value ? 1 : 0)); + } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 8b0e8b7e3b..ed02fe172e 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -309,6 +309,10 @@ void onCreatePublicationImage( termBufferLength, isOldestSubscriptionSparse(subscriberPositions), senderMtuLength, + subscriptionChannel.socketRcvbufLength(), + subscriptionChannel.socketSndbufLength(), + termOffset, + subscriptionParams, registrationId); congestionControl = ctx.congestionControlSupplier().newInstance( @@ -1724,8 +1728,9 @@ private NetworkPublication newNetworkPublication( params.initialTermId, params.termLength); - final RawLog rawLog = - newNetworkPublicationLog(params.sessionId, streamId, params.initialTermId, registrationId, params); + final int termOffset = params.termOffset; + final RawLog rawLog = newNetworkPublicationLog(params.sessionId, streamId, params.initialTermId, registrationId, + udpChannel.socketRcvbufLength(), udpChannel.socketSndbufLength(), termOffset, params); UnsafeBufferPosition publisherPos = null; UnsafeBufferPosition publisherLmt = null; UnsafeBufferPosition senderPos = null; @@ -1805,10 +1810,38 @@ private RawLog newNetworkPublicationLog( final int streamId, final int initialTermId, final long registrationId, + final int socketRcvBufLength, + final int socketSndBufLength, + final int termOffset, final PublicationParams params) { final RawLog rawLog = logFactory.newPublication(registrationId, params.termLength, params.isSparse); - initLogMetadata(sessionId, streamId, initialTermId, params.mtuLength, registrationId, rawLog); + final int receiverWindowLength = 0; + final boolean tether = false; + final boolean rejoin = false; + final boolean reliable = false; + initLogMetadata( + sessionId, + streamId, + initialTermId, + params.mtuLength, + registrationId, + socketRcvBufLength, + socketSndBufLength, + termOffset, + receiverWindowLength, + tether, + rejoin, + reliable, + params.isSparse, + params.publicationWindowLength, + params.untetheredWindowLimitTimeoutNs, + params.untetheredRestingTimeoutNs, + params.maxResend, + params.lingerTimeoutNs, + params.signalEos, + params.spiesSimulateConnection, + rawLog); initialisePositionCounters(initialTermId, params, rawLog.metaData()); return rawLog; @@ -1819,10 +1852,39 @@ private RawLog newIpcPublicationLog( final int streamId, final int initialTermId, final long registrationId, + final int termOffset, final PublicationParams params) { final RawLog rawLog = logFactory.newPublication(registrationId, params.termLength, params.isSparse); - initLogMetadata(sessionId, streamId, initialTermId, params.mtuLength, registrationId, rawLog); + + final int socketRcvBufLength = 0; + final int socketSndbufLength = 0; + final int receiverWindowLength = 0; + final boolean tether = false; + final boolean rejoin = false; + final boolean reliable = false; + initLogMetadata( + sessionId, + streamId, + initialTermId, + params.mtuLength, + registrationId, + socketRcvBufLength, + socketSndbufLength, + termOffset, + receiverWindowLength, + tether, + rejoin, + reliable, + params.isSparse, + params.publicationWindowLength, + params.untetheredWindowLimitTimeoutNs, + params.untetheredRestingTimeoutNs, + params.maxResend, + params.lingerTimeoutNs, + params.signalEos, + params.spiesSimulateConnection, + rawLog); initialisePositionCounters(initialTermId, params, rawLog.metaData()); return rawLog; @@ -1834,11 +1896,29 @@ private void initLogMetadata( final int initialTermId, final int mtuLength, final long registrationId, + final int socketRcvBufLength, + final int socketSndbufLength, + final int termOffset, + final int receiverWindowLength, + final boolean tether, + final boolean rejoin, + final boolean reliable, + final boolean sparse, + final int publicationWindowLength, + final long untetheredWindowLimitTimeoutNs, + final long untetheredRestingTimeoutNs, + final int maxResend, + final long lingerTimeoutNs, + final boolean signalEos, + final boolean spiesSimulateConnection, final RawLog rawLog) { final UnsafeBuffer logMetaData = rawLog.metaData(); - defaultDataHeader.sessionId(sessionId).streamId(streamId).termId(initialTermId); + defaultDataHeader.sessionId(sessionId) + .streamId(streamId) + .termId(initialTermId) + .termOffset(termOffset); storeDefaultFrameHeader(logMetaData, defaultDataHeader); initialTermId(logMetaData, initialTermId); @@ -1846,9 +1926,30 @@ private void initLogMetadata( termLength(logMetaData, rawLog.termLength()); pageSize(logMetaData, ctx.filePageSize()); correlationId(logMetaData, registrationId); + + socketRcvbufLength(logMetaData, socketRcvBufLength); + socketSndbufLength(logMetaData, socketSndbufLength); + receiverWindowLength(logMetaData, receiverWindowLength); + publicationWindowLength(logMetaData, publicationWindowLength); + maxResend(logMetaData, maxResend); + spiesSimulateConnection(logMetaData, spiesSimulateConnection); + + tether(logMetaData, tether); + rejoin(logMetaData, rejoin); + reliable(logMetaData, reliable); + sparse(logMetaData, sparse); + signalEos(logMetaData, signalEos); + + untetheredWindowLimitTimeoutNs(logMetaData, untetheredWindowLimitTimeoutNs); + untetheredRestingTimeoutNs(logMetaData, untetheredRestingTimeoutNs); + lingerTimeoutNs(logMetaData, lingerTimeoutNs); + + // Acts like a release fence; so this should be the last statement to ensure that all above writes + // are ordered before the eos-position. endOfStreamPosition(logMetaData, Long.MAX_VALUE); } + private static void initialisePositionCounters( final int initialTermId, final PublicationParams params, final UnsafeBuffer logMetaData) { @@ -1886,10 +1987,43 @@ private RawLog newPublicationImageLog( final int termBufferLength, final boolean isSparse, final int senderMtuLength, + final int socketRcvBufLength, + final int socketSndBufLength, + final int termOffset, + final SubscriptionParams params, final long correlationId) { final RawLog rawLog = logFactory.newImage(correlationId, termBufferLength, isSparse); - initLogMetadata(sessionId, streamId, initialTermId, senderMtuLength, correlationId, rawLog); + + final int publicationWindowLength = 0; + final long untetheredWindowLimitTimeoutNs = 0; + final long untetheredRestingTimeoutNs = 0; + final int maxResend = 0; + final long lingerTimeoutNs = 0; + final boolean signalEos = false; + final boolean spiesSimulateConnection = false; + initLogMetadata( + sessionId, + streamId, + initialTermId, + senderMtuLength, + correlationId, + socketRcvBufLength, + socketSndBufLength, + termOffset, + params.receiverWindowLength, + params.isTether, + params.isRejoin, + params.isReliable, + params.isSparse, + publicationWindowLength, + untetheredWindowLimitTimeoutNs, + untetheredRestingTimeoutNs, + maxResend, + lingerTimeoutNs, + signalEos, + spiesSimulateConnection, + rawLog); return rawLog; } @@ -2273,8 +2407,10 @@ private IpcPublication addIpcPublication( final boolean isExclusive, final PublicationParams params) { + final int termOffset = params.termOffset; + final RawLog rawLog = - newIpcPublicationLog(params.sessionId, streamId, params.initialTermId, registrationId, params); + newIpcPublicationLog(params.sessionId, streamId, params.initialTermId, registrationId, termOffset, params); UnsafeBufferPosition publisherPosition = null; UnsafeBufferPosition publisherLimit = null;