fullNamesSet = new HashSet<>();
-
/**
* Constructs set of prefix by names and separators. Names can use wildcard only symbol "*".
* Syntax of wildcard is the same as the specifications of {@code java import ...} .
@@ -94,50 +86,40 @@ static StringPrefixSet valueOf(String names) {
static StringPrefixSet valueOf(Collection names) {
if (names == null)
return NOTHING_SET;
- boolean isAnything = false;
- Set fullNamesSet = new HashSet<>();
- TreeSet prefixTree = new TreeSet<>();
- int i = 0;
+ HashSet fullNamesSet = new HashSet<>();
+ TreeSet prefixSet = new TreeSet<>();
+ int i = -1;
for (String name : names) {
+ i++;
if (name.length() == 0)
continue;
int index = name.indexOf(ANYTHING_SYMBOL);
if (index == -1) {
fullNamesSet.add(name);
- continue;
+ } else {
+ if (index != name.length() - ANYTHING_SYMBOL.length())
+ throw new IllegalArgumentException("Name at index " + i + " has wrong format: " + name);
+ prefixSet.add(name.substring(0, index));
}
- if (index != name.length() - 1)
- throw new IllegalArgumentException("Name at number " + i + " has a wrong format: " + name);
- if (index == 0) {
- isAnything = true;
- continue;
- }
- prefixTree.add(name.substring(0, name.length() - 1));
- i++;
}
- if (isAnything)
- return ANYTHING_SET;
- return optimize(fullNamesSet, prefixTree);
+ return optimize(fullNamesSet, prefixSet);
}
- private static StringPrefixSet optimize(Set fullNamesSet, TreeSet prefixSet) {
- Iterator it = prefixSet.iterator();
- String cur;
- String last = null;
- if (it.hasNext())
- last = it.next();
- while (it.hasNext()) {
- cur = it.next();
- if (cur.startsWith(last))
- it.remove();
- else
- last = cur;
+ private static StringPrefixSet optimize(HashSet fullNamesSet, TreeSet prefixSet) {
+ if (prefixSet.size() > 1) {
+ Iterator it = prefixSet.iterator();
+ String last = it.next();
+ while (it.hasNext()) {
+ String cur = it.next();
+ if (cur.startsWith(last))
+ it.remove();
+ else
+ last = cur;
+ }
}
- it = fullNamesSet.iterator();
- String prefix;
- while (it.hasNext()) {
- cur = it.next();
- prefix = prefixSet.floor(cur);
+ for (Iterator it = fullNamesSet.iterator(); it.hasNext();) {
+ String cur = it.next();
+ String prefix = prefixSet.floor(cur);
if (prefix != null && cur.startsWith(prefix))
it.remove();
}
@@ -150,14 +132,13 @@ private static StringPrefixSet optimize(Set fullNamesSet, TreeSet fullNamesSet, @Nonnull TreeSet prefixSet) {
- this.fullNamesSet.addAll(fullNamesSet);
- this.prefixSet.addAll(prefixSet);
- }
- private StringPrefixSet(StringPrefixSet prefixSet) {
- this.fullNamesSet.addAll(prefixSet.fullNamesSet);
- this.prefixSet.addAll(prefixSet.prefixSet);
+ private final HashSet fullNamesSet;
+ private final TreeSet prefixSet;
+
+ private StringPrefixSet(HashSet fullNamesSet, TreeSet prefixSet) {
+ this.fullNamesSet = fullNamesSet;
+ this.prefixSet = prefixSet;
}
/**
@@ -165,7 +146,6 @@ private StringPrefixSet(StringPrefixSet prefixSet) {
*
* @param other set of prefix
* @return the union of two sets of prefix.
- * @throws IllegalArgumentException if the sets are different delimiter symbol between the prefix.
*/
StringPrefixSet add(StringPrefixSet other) {
if (this == ANYTHING_SET || other == ANYTHING_SET)
@@ -179,13 +159,13 @@ StringPrefixSet add(StringPrefixSet other) {
prefixSetUnion.addAll(prefixSet);
return optimize(fullNamesSetUnion, prefixSetUnion);
}
- return this == NOTHING_SET ? new StringPrefixSet(other) : new StringPrefixSet(this);
+ return this == NOTHING_SET ? other : this;
}
StringPrefixSet copy() {
if (this == ANYTHING_SET || this == NOTHING_SET)
return this;
- return new StringPrefixSet(this);
+ return new StringPrefixSet(fullNamesSet, prefixSet);
}
/**
@@ -195,9 +175,9 @@ StringPrefixSet copy() {
* @return {@code true} if name is contained in list.
*/
boolean accept(String name) {
- if (isAnything())
+ if (this == ANYTHING_SET)
return true;
- if (isNothing())
+ if (this == NOTHING_SET)
return false;
if (fullNamesSet.contains(name))
return true;
@@ -211,7 +191,7 @@ boolean accept(String name) {
* @return {@code true} if set contains any prefix name.
*/
boolean isAnything() {
- return fullNamesSet.isEmpty() && (prefixSet.size() == 1 && prefixSet.first().length() == 0);
+ return this == ANYTHING_SET;
}
/**
@@ -220,18 +200,17 @@ boolean isAnything() {
* @return {@code true} if set not contains any prefix name.
*/
boolean isNothing() {
- return fullNamesSet.isEmpty() && prefixSet.isEmpty();
+ return this == NOTHING_SET;
}
@Override
public boolean equals(Object o) {
- if (this == o)
+ if (o == this)
return true;
- if (o == null || !(o instanceof StringPrefixSet))
+ if (!(o instanceof StringPrefixSet))
return false;
StringPrefixSet other = (StringPrefixSet) o;
- return fullNamesSet.equals(other.fullNamesSet)
- && prefixSet.equals(other.prefixSet);
+ return fullNamesSet.equals(other.fullNamesSet) && prefixSet.equals(other.prefixSet);
}
@Override
@@ -241,24 +220,17 @@ public int hashCode() {
@Override
public String toString() {
- String className = "SerialClassList{";
- if (this == NOTHING_SET)
- return className + "NOTHING}";
if (this == ANYTHING_SET)
- return className + "ANYTHING}";
- return className + "prefixes=" + prefixSet
- + ", full names = " + new TreeSet<>(fullNamesSet)
- + "}";
+ return "StringPrefixSet{ANYTHING}";
+ if (this == NOTHING_SET)
+ return "StringPrefixSet{NOTHING}";
+ return "StringPrefixSet{prefixes=" + prefixSet + ", full names = " + new TreeSet<>(fullNamesSet) + "}";
}
public List getList() {
- List result = new ArrayList<>();
- if (fullNamesSet != null)
- result.addAll(fullNamesSet);
- if (prefixSet != null) {
- for (String prefix : prefixSet)
- result.add(prefix + ANYTHING_SYMBOL);
- }
+ List result = new ArrayList<>(fullNamesSet);
+ for (String prefix : prefixSet)
+ result.add(prefix + ANYTHING_SYMBOL);
return result;
}
}
diff --git a/dxlib/src/main/java/com/devexperts/util/TimePeriod.java b/dxlib/src/main/java/com/devexperts/util/TimePeriod.java
index 34db080b0..9c6b73b97 100644
--- a/dxlib/src/main/java/com/devexperts/util/TimePeriod.java
+++ b/dxlib/src/main/java/com/devexperts/util/TimePeriod.java
@@ -29,6 +29,11 @@ public class TimePeriod implements Serializable {
*/
public static final TimePeriod UNLIMITED = new TimePeriod(Long.MAX_VALUE);
+ /**
+ * Input representation of unlimited interval
+ */
+ private static final String UNLIMITED_STR = "inf";
+
/**
* Returns TimePeriod
with value milliseconds.
*
@@ -36,7 +41,7 @@ public class TimePeriod implements Serializable {
* @return TimePeriod
with value milliseconds.
*/
public static TimePeriod valueOf(long value) {
- return value == 0 ? ZERO : new TimePeriod(value);
+ return value == 0 ? ZERO : (value == Long.MAX_VALUE ? UNLIMITED : new TimePeriod(value));
}
/**
@@ -49,6 +54,7 @@ public static TimePeriod valueOf(long value) {
* Letter "S" can be also omitted. In this case last number will be supposed to be seconds.
* Number of seconds can be fractional. So it is possible to define duration accurate within milliseconds.
* Every part can be omitted. It is supposed that it's value is zero then.
+ * String "inf" recognized as unlimited period.
*
*
* @param value string representation
@@ -68,6 +74,8 @@ protected TimePeriod(long value) {
protected static long parse(String value) {
try {
+ if (UNLIMITED_STR.equalsIgnoreCase(value))
+ return UNLIMITED.getTime();
boolean metAnyPart = false;
value = value.toUpperCase() + '#';
long res = 0;
diff --git a/license/pom.xml b/license/pom.xml
index e9b45b1de..82ec9d071 100644
--- a/license/pom.xml
+++ b/license/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/mars-sample/pom.xml b/mars-sample/pom.xml
index 91b9c6c27..855a3d006 100644
--- a/mars-sample/pom.xml
+++ b/mars-sample/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/mars/pom.xml b/mars/pom.xml
index c7fee6f1b..16f21e32f 100644
--- a/mars/pom.xml
+++ b/mars/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/pom.xml b/pom.xml
index ea150d39f..520182631 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
com.devexperts.qd
QD
pom
- 3.293
+ 3.294
2002
diff --git a/proto-sample/pom.xml b/proto-sample/pom.xml
index f55d51251..3123570b2 100644
--- a/proto-sample/pom.xml
+++ b/proto-sample/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/proto-ssl/pom.xml b/proto-ssl/pom.xml
index 8961a2e5d..9f8c1e4ce 100644
--- a/proto-ssl/pom.xml
+++ b/proto-ssl/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/proto/pom.xml b/proto/pom.xml
index 49e708a5b..aeff5e672 100644
--- a/proto/pom.xml
+++ b/proto/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-core/pom.xml b/qd-core/pom.xml
index dfc743de4..8b8f22f49 100644
--- a/qd-core/pom.xml
+++ b/qd-core/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-core/src/main/java/com/devexperts/qd/qtp/AgentAdapter.java b/qd-core/src/main/java/com/devexperts/qd/qtp/AgentAdapter.java
index f84b59aea..1f0005fa7 100644
--- a/qd-core/src/main/java/com/devexperts/qd/qtp/AgentAdapter.java
+++ b/qd-core/src/main/java/com/devexperts/qd/qtp/AgentAdapter.java
@@ -26,6 +26,9 @@
import com.devexperts.qd.SubscriptionFilter;
import com.devexperts.qd.SubscriptionIterator;
import com.devexperts.qd.kit.CompositeFilters;
+import com.devexperts.qd.ng.EventFlag;
+import com.devexperts.qd.ng.RecordBuffer;
+import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordProvider;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.qtp.auth.BasicChannelShaperFactory;
@@ -43,6 +46,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
@@ -86,6 +90,14 @@ public static class Factory extends MessageAdapter.AbstractFactory {
*/
private Executor subscriptionExecutor;
+ /**
+ * Subscription keep-alive period.
+ * If more than zero, unsubscription requests will be delayed for a specified period to amortize
+ * fast unsub/sub sequences.
+ *
NOTE: For the moment only zero and unlimited periods are supported.
+ */
+ private TimePeriod subscriptionKeepAlive = TimePeriod.ZERO;
+
public Factory(QDTicker ticker, QDStream stream, QDHistory history, SubscriptionFilter filter) {
super(ticker, stream, history, filter);
}
@@ -165,6 +177,22 @@ public synchronized void setSubscriptionExecutor(Executor subscriptionExecutor)
rebuildChannels();
}
+ @Nonnull
+ public synchronized TimePeriod getSubscriptionKeepAlive() {
+ return subscriptionKeepAlive;
+ }
+
+ @Configurable(description = "subscription keep-alive period (0 or 'inf')")
+ public synchronized void setSubscriptionKeepAlive(TimePeriod keepAlive) {
+ Objects.requireNonNull(keepAlive);
+ if (subscriptionKeepAlive.equals(keepAlive))
+ return;
+ if (!TimePeriod.ZERO.equals(keepAlive) && !TimePeriod.UNLIMITED.equals(keepAlive))
+ throw new IllegalArgumentException("Only zero or infinite supported");
+ this.subscriptionKeepAlive = keepAlive;
+ rebuildChannels();
+ }
+
synchronized Executor getOrCreateSubscriptionExecutor() {
if (subscriptionExecutor != null)
return subscriptionExecutor;
@@ -202,6 +230,7 @@ public MessageAdapter createAdapter(QDStats stats) {
private final DataScheme scheme;
private final QDFilter filter; // @NotNull
private AgentAdapter.Factory factory;
+ private boolean skipRemoveSubscription = false; // current implementation supports only infinite keep-alive period
final QDFilter[] peerFilter = new QDFilter[N_CONTRACTS]; // filters received from remote peer in DESCRIBE PROTOCOL message
@@ -347,6 +376,7 @@ public void updateChannel(ChannelShaper shaper) {}
private void setAgentFactory(AgentAdapter.Factory factory) {
this.factory = factory;
+ skipRemoveSubscription = TimePeriod.UNLIMITED.equals(factory.getSubscriptionKeepAlive());
}
Factory getAgentFactory() {
@@ -366,27 +396,46 @@ protected void processSubscription(SubscriptionIterator iterator, MessageType me
throw new IllegalArgumentException(message.toString());
if (!isAlive()) {
reportIgnoredMessage("Adapter is " + getStatus(), message);
- } else {
- boolean hasContract = false;
- QDContract contract = message.getContract();
+ } else if (!skipRemoveSubscription || !message.isSubscriptionRemove()) {
RecordSource sub = LegacyAdapter.of(iterator);
- long initialPosition = sub.getPosition();
- for (int i = 0; i < shapers.length; i++) {
- ChannelShaper shaper = shapers[i];
- if (shaper.getContract() != contract)
- continue;
- hasContract = true;
- AgentChannel channel = getOrCreateChannelAt(i);
- sub.setPosition(initialPosition);
- channel.processSubscription(message, sub);
+ if (skipRemoveSubscription) {
+ RecordBuffer buf = skipRemoveSubscription(sub);
+ processSubscription(buf, message);
+ buf.release();
+ } else {
+ processSubscription(sub, message);
}
LegacyAdapter.release(iterator, sub);
- if (!hasContract)
- reportIgnoredMessage("Contract is not supported", message);
}
SubscriptionConsumer.VOID.processSubscription(iterator); // silently ignore all remaining data if it was not processed
}
+ private void processSubscription(RecordSource sub, MessageType message) {
+ QDContract contract = message.getContract();
+ boolean hasContract = false;
+ long initialPosition = sub.getPosition();
+ for (int i = 0; i < shapers.length; i++) {
+ ChannelShaper shaper = shapers[i];
+ if (shaper.getContract() != contract)
+ continue;
+ hasContract = true;
+ AgentChannel channel = getOrCreateChannelAt(i);
+ sub.setPosition(initialPosition);
+ channel.processSubscription(message, sub);
+ }
+ if (!hasContract)
+ reportIgnoredMessage("Contract is not supported", message);
+ }
+
+ private RecordBuffer skipRemoveSubscription(RecordSource sub) {
+ RecordBuffer buf = RecordBuffer.getInstance(sub.getMode());
+ for (RecordCursor cur; (cur = sub.next()) != null; ) {
+ if (!EventFlag.REMOVE_SYMBOL.in(cur.getEventFlags()))
+ buf.append(cur);
+ }
+ return buf;
+ }
+
private AgentChannel getOrCreateChannelAt(int i) {
AgentChannel channel = channels[i];
if (channel == null) {
diff --git a/qd-core/src/main/java/com/devexperts/qd/util/TimePeriod.java b/qd-core/src/main/java/com/devexperts/qd/util/TimePeriod.java
index 6789e4f12..c6a41ebb4 100644
--- a/qd-core/src/main/java/com/devexperts/qd/util/TimePeriod.java
+++ b/qd-core/src/main/java/com/devexperts/qd/util/TimePeriod.java
@@ -38,6 +38,7 @@ public static TimePeriod valueOf(long value) {
*
Letter "S" can be also omitted. In this case last number will be supposed to be seconds.
* Number of seconds can be fractional. So it is possible to define duration accurate within milliseconds.
* Every part can be omitted. It is supposed that it's value is zero then.
+ * String "inf" recognized as unlimited period.
*
*
* @param value string representation
diff --git a/qd-dataextractor/pom.xml b/qd-dataextractor/pom.xml
index a4708e62e..8178baaff 100644
--- a/qd-dataextractor/pom.xml
+++ b/qd-dataextractor/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
4.0.0
diff --git a/qd-logger/pom.xml b/qd-logger/pom.xml
index 5a2c9d826..338e6f507 100644
--- a/qd-logger/pom.xml
+++ b/qd-logger/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-nio/pom.xml b/qd-nio/pom.xml
index c285bfff5..141af5600 100644
--- a/qd-nio/pom.xml
+++ b/qd-nio/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-rmi/pom.xml b/qd-rmi/pom.xml
index 8a09fdd30..949348ae0 100644
--- a/qd-rmi/pom.xml
+++ b/qd-rmi/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-sample/pom.xml b/qd-sample/pom.xml
index 4e4c59df3..f3516c9aa 100644
--- a/qd-sample/pom.xml
+++ b/qd-sample/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-sample/src/test/java/com/devexperts/qd/qtp/test/SubscriptionKeepAliveTest.java b/qd-sample/src/test/java/com/devexperts/qd/qtp/test/SubscriptionKeepAliveTest.java
new file mode 100644
index 000000000..c94dc2c45
--- /dev/null
+++ b/qd-sample/src/test/java/com/devexperts/qd/qtp/test/SubscriptionKeepAliveTest.java
@@ -0,0 +1,163 @@
+/*
+ * !++
+ * QDS - Quick Data Signalling Library
+ * !-
+ * Copyright (C) 2002 - 2020 Devexperts LLC
+ * !-
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
+ * If a copy of the MPL was not distributed with this file, You can obtain one at
+ * http://mozilla.org/MPL/2.0/.
+ * !__
+ */
+package com.devexperts.qd.qtp.test;
+
+import com.devexperts.logging.Logging;
+import com.devexperts.qd.DataRecord;
+import com.devexperts.qd.QDAgent;
+import com.devexperts.qd.QDContract;
+import com.devexperts.qd.QDDistributor;
+import com.devexperts.qd.ng.RecordBuffer;
+import com.devexperts.qd.ng.RecordCursor;
+import com.devexperts.qd.ng.RecordMode;
+import com.devexperts.qd.ng.RecordProvider;
+import com.devexperts.qd.qtp.AgentAdapter;
+import com.devexperts.qd.qtp.DistributorAdapter;
+import com.devexperts.qd.qtp.MessageConnectors;
+import com.devexperts.qd.qtp.MessageType;
+import com.devexperts.qd.qtp.QDEndpoint;
+import com.devexperts.qd.qtp.socket.ServerSocketTestHelper;
+import com.devexperts.qd.stats.QDStats;
+import com.devexperts.qd.test.TestDataScheme;
+import com.devexperts.util.IndexedSet;
+import com.dxfeed.promise.Promise;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static com.devexperts.qd.qtp.MessageType.Flag.ADDSUB;
+import static com.devexperts.qd.qtp.MessageType.Flag.REMSUB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SubscriptionKeepAliveTest {
+
+ private static final Logging log = Logging.getLogging(SubscriptionKeepAliveTest.class);
+ private static final long TIMEOUT_MS = 30_000;
+
+ private static final TestDataScheme SCHEME = new TestDataScheme();
+ private static final DataRecord RECORD = SCHEME.getRecord(0);
+
+ private QDEndpoint server;
+ private QDEndpoint client;
+
+ @Before
+ public void setUp() {
+ server = createEndpoint();
+ client = createEndpoint();
+ }
+
+ @After
+ public void tearDown() {
+ if (client != null)
+ client.close();
+ if (server != null) {
+ server.close();
+ }
+ }
+
+ @Test
+ public void testWithoutKeepAlive() throws InterruptedException {
+ doTestKeepAlive(false);
+ }
+
+ @Test
+ public void testInfiniteKeepAlive() throws InterruptedException {
+ doTestKeepAlive(true);
+ }
+
+ protected void doTestKeepAlive(boolean keepAlive) throws InterruptedException {
+ String testID = UUID.randomUUID().toString();
+ Promise port = ServerSocketTestHelper.createPortPromise(testID);
+ server.addConnectors(MessageConnectors.createMessageConnectors(
+ new AgentAdapter.Factory(server, null),
+ ":0[name=" + testID + ",bindAddr=127.0.0.1,subscriptionKeepAlive=" + (keepAlive ? "inf" : "0") + "]",
+ QDStats.VOID));
+ server.startConnectors();
+
+ client.addConnectors(MessageConnectors.createMessageConnectors(
+ new DistributorAdapter.Factory(client, null),
+ "127.0.0.1:" + port.await(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+ QDStats.VOID));
+ client.startConnectors();
+
+ ArrayBlockingQueue subQueue = new ArrayBlockingQueue<>(10);
+ ArrayBlockingQueue unsubQueue = new ArrayBlockingQueue<>(10);
+
+ QDDistributor distributor = server.getTicker().distributorBuilder().build();
+ distributor.getAddedRecordProvider().setRecordListener(provider -> retrieveSub(provider, subQueue, ADDSUB));
+ distributor.getRemovedRecordProvider().setRecordListener(provider -> retrieveSub(provider, unsubQueue, REMSUB));
+
+ QDAgent agent = client.getTicker().agentBuilder().withVoidRecordListener(true).build();
+ setSubscription(agent, "A", "B", "C");
+
+ HashSet subSet = new HashSet<>();
+ consumeSub(subQueue, 3, subSet::add);
+ assertEquals(IndexedSet.of("A", "B", "C"), subSet);
+
+ setSubscription(agent, "D");
+ consumeSub(subQueue, 1, subSet::add);
+
+ if (!keepAlive) {
+ consumeSub(unsubQueue, 3, subSet::remove);
+ assertEquals(IndexedSet.of("D"), subSet);
+ }
+
+ Thread.sleep(100); // give a chance to receive UNSUB
+ assertTrue("No unsubscriptions expected", unsubQueue.isEmpty());
+ assertTrue("No subscriptions expected", subQueue.isEmpty());
+ }
+
+ protected void consumeSub(BlockingQueue queue, int num, Consumer sink) throws InterruptedException {
+ for (int i = 0; i < num; i++) {
+ String s = queue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ assertNotNull("Expected subscription", s);
+ sink.accept(s);
+ }
+ }
+
+ protected void retrieveSub(RecordProvider provider, BlockingQueue subQueue, MessageType.Flag flag) {
+ RecordBuffer sub = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION);
+ provider.retrieve(sub);
+ sub.rewind();
+ for (RecordCursor cur; (cur = sub.next()) != null; ) {
+ String s = cur.getDecodedSymbol();
+ log.debug("[" + flag + "]: " + s);
+ subQueue.add(s);
+ }
+ sub.release();
+ }
+
+ protected void setSubscription(QDAgent agent, String... symbols) {
+ RecordBuffer sub = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION);
+ Stream.of(symbols).forEach(s -> sub.add(RECORD, SCHEME.getCodec().encode(s), s));
+ agent.setSubscription(sub);
+ sub.release();
+ }
+
+ protected QDEndpoint createEndpoint() {
+ return QDEndpoint.newBuilder()
+ .withScheme(SCHEME)
+ .withCollectors(Collections.singletonList(QDContract.TICKER))
+ .build();
+ }
+}
diff --git a/qd-samplecert/pom.xml b/qd-samplecert/pom.xml
index 132f134e8..90fa66cbb 100644
--- a/qd-samplecert/pom.xml
+++ b/qd-samplecert/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-servlet/pom.xml b/qd-servlet/pom.xml
index 0db5dda18..fa305dc13 100644
--- a/qd-servlet/pom.xml
+++ b/qd-servlet/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-stripe/pom.xml b/qd-stripe/pom.xml
index 94253e7cf..e5f715981 100644
--- a/qd-stripe/pom.xml
+++ b/qd-stripe/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-tools/pom.xml b/qd-tools/pom.xml
index 8993b7a0f..2a55ba8c9 100644
--- a/qd-tools/pom.xml
+++ b/qd-tools/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qd-tools/src/main/java/com/devexperts/qd/tools/TimeSyncTracker.java b/qd-tools/src/main/java/com/devexperts/qd/tools/TimeSyncTracker.java
index ddc2f4ea9..df5c8bc6c 100644
--- a/qd-tools/src/main/java/com/devexperts/qd/tools/TimeSyncTracker.java
+++ b/qd-tools/src/main/java/com/devexperts/qd/tools/TimeSyncTracker.java
@@ -127,13 +127,9 @@ public synchronized void start() {
try {
socket.joinGroup(InetAddress.getByName(addr));
} catch (IOException e) {
- // Workaround for JDK-8178161 (see QD-1131)
- String javaVersion = System.getProperty("java.version");
+ // Workaround for JDK-8178161 (see QD-1131, QD-1262)
String osName = System.getProperty("os.name");
- if (e.getMessage().contains("assign requested address") &&
- osName.toLowerCase().startsWith("mac") &&
- (javaVersion.startsWith("1.8") || javaVersion.startsWith("9.")))
- {
+ if (e.getMessage().contains("assign requested address") && osName.toLowerCase().startsWith("mac")) {
QDLog.log.info("Time synchronization tracker initialization failed - unsupported on MacOS");
} else {
QDLog.log.error("Failed to join multicast group for time synchronization tracker", e);
diff --git a/qds-file/pom.xml b/qds-file/pom.xml
index fbeacbb70..253075c08 100644
--- a/qds-file/pom.xml
+++ b/qds-file/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qds-monitoring/pom.xml b/qds-monitoring/pom.xml
index ce50015cb..e7388162b 100644
--- a/qds-monitoring/pom.xml
+++ b/qds-monitoring/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qds-tools/pom.xml b/qds-tools/pom.xml
index 391328745..32fce7a5d 100644
--- a/qds-tools/pom.xml
+++ b/qds-tools/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/qds/pom.xml b/qds/pom.xml
index bebb55d7f..54c4378e7 100644
--- a/qds/pom.xml
+++ b/qds/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0
diff --git a/rt-api-builder/pom.xml b/rt-api-builder/pom.xml
index ca8f84278..052c795ef 100644
--- a/rt-api-builder/pom.xml
+++ b/rt-api-builder/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
4.0.0
diff --git a/teamcity-version/pom.xml b/teamcity-version/pom.xml
index cbddda2ba..3fcd8d7a3 100644
--- a/teamcity-version/pom.xml
+++ b/teamcity-version/pom.xml
@@ -14,7 +14,7 @@
QD
com.devexperts.qd
- 3.293
+ 3.294
../pom.xml
4.0.0