Skip to content

Commit

Permalink
Merge pull request #174 from bosch-io/bugfix/thread-leak
Browse files Browse the repository at this point in the history
Shutdown executor after stream cancellation
  • Loading branch information
yufei-cai authored Oct 26, 2021
2 parents d95f1bf + dc980ef commit 2314743
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@
package org.eclipse.ditto.client.streaming;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.client.internal.bus.AdaptableBus;
import org.eclipse.ditto.client.internal.bus.Classification;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CancelSubscription;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.RequestFromSubscription;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionComplete;
Expand Down Expand Up @@ -111,7 +112,10 @@ public void request(final long n) {
// called by subscriber
@Override
public void cancel() {
singleThreadedExecutorService.submit(this::doCancel);
if (!singleThreadedExecutorService.isShutdown() && !singleThreadedExecutorService.isTerminated()) {
CompletableFuture.runAsync(this::doCancel, singleThreadedExecutorService)
.whenComplete((result, error) -> singleThreadedExecutorService.shutdownNow());
}
}

private void doCancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.client.messaging.internal.MockMessagingProvider;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.InvalidOptionException;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CancelSubscription;
Expand All @@ -47,7 +47,7 @@ public final class ThingSearchPublisherVerificationTest extends PublisherVerific

public ThingSearchPublisherVerificationTest() {
// use new TestEnvironment(true) to debug
super(new TestEnvironment(false), 1000L);
super(new TestEnvironment(1000L, 1000L, false), 1000L);
}

@Override
Expand Down

0 comments on commit 2314743

Please sign in to comment.