diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java index 13b2843c07..40cc9426ce 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java @@ -47,6 +47,7 @@ import org.jetbrains.annotations.Nullable; import java.io.*; +import java.lang.ref.WeakReference; import java.security.SecureRandom; import java.text.ParseException; import java.time.ZoneId; @@ -110,7 +111,7 @@ private final TimeProvider time; @NotNull private final BiFunction storeFactory; - private final Set closers = Collections.newSetFromMap(new IdentityHashMap<>()); + private final Set> closers = Collections.newSetFromMap(new IdentityHashMap<>()); private final boolean readOnly; @NotNull private final CycleCalculator cycleCalculator; @@ -716,8 +717,11 @@ public NavigableSet listCyclesBetween(int lowerCycle, int upperCycle) { public void addCloseListener(Closeable key) { synchronized (closers) { if (!closers.isEmpty()) - closers.removeIf(Closeable::isClosed); - closers.add(key); + closers.removeIf(wrc -> { + final Closeable closeable = wrc.get(); + return closeable != null || closeable.isClosed(); + }); + closers.add(new WeakReference<>(key)); } } @@ -725,7 +729,7 @@ public void addCloseListener(Closeable key) { @Override protected void performClose() { synchronized (closers) { - metaStoreMap.values().forEach(Closeable::closeQuietly); + Closeable.closeQuietly(metaStoreMap.values()); metaStoreMap.clear(); closers.forEach(Closeable::closeQuietly); closers.clear(); @@ -746,7 +750,7 @@ protected void performClose() { // close it if we created it. if (eventLoop instanceof OnDemandEventLoop) - eventLoop.close(); + Closeable.closeQuietly(eventLoop); } @Override @@ -911,9 +915,14 @@ private ToIntFunction fileNameToCycleFunction() { return name -> dateCache.parseCount(name.substring(0, name.length() - SUFFIX.length())); } + @Deprecated(/* to be removed in x.25 */) void removeCloseListener(final StoreTailer storeTailer) { + removeCloseListener((java.io.Closeable) storeTailer); + } + + void removeCloseListener(final java.io.Closeable closeable) { synchronized (closers) { - closers.remove(storeTailer); + closers.removeIf(wrc -> wrc.get() == closeable); } } diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreAppender.java b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreAppender.java index 2421f78d8b..395f7841c5 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreAppender.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreAppender.java @@ -225,6 +225,7 @@ public void writeBytes(@NotNull final WriteBytesMarshallable marshallable) { @Override protected void performClose() { +// queue.removeCloseListener(this); releaseBytesFor(wireForIndex); releaseBytesFor(wire); releaseBytesFor(bufferWire); diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java index 318df458f4..8ef282503e 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java @@ -150,6 +150,7 @@ public DocumentContext readingDocument() { @Override protected void performClose() { +// queue.removeCloseListener((java.io.Closeable) this); Closeable.closeQuietly(indexUpdater); // the wire ref count will be released here by setting it to null context.wire(null); diff --git a/src/test/java/net/openhft/chronicle/queue/SingleChroniclePerfMainTest.java b/src/test/java/net/openhft/chronicle/queue/SingleChroniclePerfMainTest.java index 84b3f471a0..9e2fbb5d05 100644 --- a/src/test/java/net/openhft/chronicle/queue/SingleChroniclePerfMainTest.java +++ b/src/test/java/net/openhft/chronicle/queue/SingleChroniclePerfMainTest.java @@ -66,7 +66,8 @@ static void doPerfTest(TestWriter> writer, TestReader> reader, Histogram readHdr = new Histogram(30, 7); String file = OS.getTarget() + "/deleteme-" + Time.uniqueId(); try (ChronicleQueue chronicle = single(file).blockSize(64 << 20).build(); - ExcerptAppender appender = chronicle.createAppender()) { + ExcerptAppender appender = chronicle.createAppender(); + ExcerptTailer tailer = chronicle.createTailer()) { UncheckedBytes bytes = new UncheckedBytes(BytesStore.empty().bytesForRead()); for (int i = 0; i < count; i++) { long start = System.nanoTime(); @@ -82,7 +83,6 @@ static void doPerfTest(TestWriter> writer, TestReader> reader, writeHdr.sample(time); } - ExcerptTailer tailer = chronicle.createTailer(); for (int i = 0; i < count; i++) { long start2 = System.nanoTime(); try (DocumentContext dc = tailer.readingDocument()) { diff --git a/src/test/java/net/openhft/chronicle/queue/StridingAQueueTest.java b/src/test/java/net/openhft/chronicle/queue/StridingAQueueTest.java index edac522177..e6a6f94584 100644 --- a/src/test/java/net/openhft/chronicle/queue/StridingAQueueTest.java +++ b/src/test/java/net/openhft/chronicle/queue/StridingAQueueTest.java @@ -46,22 +46,23 @@ public void testStriding() { assertEquals(getExpected(), queue.dump().replaceAll("(?m)^#.+$\\n", "")); StringWriter sw = new StringWriter(); - ExcerptTailer tailer = queue.createTailer().direction(TailerDirection.BACKWARD).toEnd().striding(true); - MethodReader reader = tailer.methodReader(Mocker.logging(SAQMessage.class, "", sw)); - while (reader.readOne()) ; - assertEquals("hi[4, 9]\n" + - "hi[4, 8]\n" + - "hi[4, 4]\n" + - "hi[4, 0]\n" + - "hi[3, 8]\n" + - "hi[3, 4]\n" + - "hi[3, 0]\n" + - "hi[2, 7]\n" + - "hi[2, 5]\n" + - "hi[2, 1]\n" + - "hi[1, 4]\n" + - "hi[1, 0]\n", - sw.toString().replace("\r", "")); + try (ExcerptTailer tailer = queue.createTailer().direction(TailerDirection.BACKWARD).toEnd().striding(true)) { + MethodReader reader = tailer.methodReader(Mocker.logging(SAQMessage.class, "", sw)); + while (reader.readOne()) ; + assertEquals("hi[4, 9]\n" + + "hi[4, 8]\n" + + "hi[4, 4]\n" + + "hi[4, 0]\n" + + "hi[3, 8]\n" + + "hi[3, 4]\n" + + "hi[3, 0]\n" + + "hi[2, 7]\n" + + "hi[2, 5]\n" + + "hi[2, 1]\n" + + "hi[1, 4]\n" + + "hi[1, 0]\n", + sw.toString().replace("\r", "")); + } } } diff --git a/src/test/java/net/openhft/chronicle/queue/WriteReadTextTest.java b/src/test/java/net/openhft/chronicle/queue/WriteReadTextTest.java index e384e6895a..d03bd0e2f3 100644 --- a/src/test/java/net/openhft/chronicle/queue/WriteReadTextTest.java +++ b/src/test/java/net/openhft/chronicle/queue/WriteReadTextTest.java @@ -81,9 +81,8 @@ private void doTest(@NotNull String... problematic) { .blockSize(Maths.nextPower2(EXTREMELY_LARGE.length() * 4, 256 << 10)) // .testBlockSize() not suitable as large message sizes. .build(); - ExcerptAppender appender = theQueue.createAppender()) { - - ExcerptTailer tailer = theQueue.createTailer(); + ExcerptAppender appender = theQueue.createAppender(); + ExcerptTailer tailer = theQueue.createTailer()) { StringBuilder tmpReadback = new StringBuilder();