Skip to content

Commit

Permalink
Merge pull request #105 from bosch-io/bugfix/handle-consumption-error
Browse files Browse the repository at this point in the history
Ditto-java-client bugfix: When starting consumption with invalid filter, wrongly timeout exception is propagate to the user
  • Loading branch information
dguggemos authored Dec 1, 2020
2 parents f724b82 + 6ac4dfb commit 55d5ca2
Show file tree
Hide file tree
Showing 14 changed files with 596 additions and 73 deletions.
14 changes: 8 additions & 6 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
<equals-verifier.version>3.0.3</equals-verifier.version>
<mockito.version>3.1.0</mockito.version>
<jsonassert.version>1.2.3</jsonassert.version>
<awaitility.version>4.0.3</awaitility.version>

<!-- reactive streams versions -->
<reactive-streams.version>1.0.3</reactive-streams.version>
Expand Down Expand Up @@ -332,12 +333,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M4</version>
<configuration>
<systemProperties>
<kamon.auto-start>true</kamon.auto-start>
</systemProperties>
</configuration>
<version>3.0.0-M5</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -920,6 +916,12 @@
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mutabilitydetector</groupId>
<artifactId>MutabilityDetector</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -53,6 +55,9 @@
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.messages.Message;
import org.eclipse.ditto.model.messages.MessageDirection;
import org.eclipse.ditto.model.messages.MessageHeaders;
Expand All @@ -67,6 +72,7 @@
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithOptionalEntity;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.commands.things.modify.CreateThing;
import org.eclipse.ditto.signals.commands.things.modify.DeleteThing;
import org.eclipse.ditto.signals.commands.things.modify.ModifyThing;
Expand All @@ -93,6 +99,7 @@ public abstract class CommonManagementImpl<T extends ThingHandle<F>, F extends F

protected final OutgoingMessageFactory outgoingMessageFactory;

private final AtomicBoolean subscriptionRequestPending = new AtomicBoolean(false);
private final HandlerRegistry<T, F> handlerRegistry;
private final PointerBus bus;

Expand All @@ -111,12 +118,20 @@ protected CommonManagementImpl(

@Override
public CompletableFuture<Void> startConsumption() {
return doStartConsumption(Collections.emptyMap());
// do not call doStartConsumption directly
return startConsumption(new Option[]{});
}

@Override
public CompletableFuture<Void> startConsumption(final Option<?>... consumptionOptions) {

// concurrent consumption requests can have strange effects, so better avoid it
if (!subscriptionRequestPending.compareAndSet(false, true)) {
final CompletableFuture<Void> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new ConcurrentConsumptionRequestException());
return failedFuture;
}

// only accept "Consumption" related options here:
final Optional<Option<?>> unknownOptionIncluded = Arrays.stream(consumptionOptions)
.filter(option -> !option.getName().equals(OptionName.Consumption.NAMESPACES))
Expand All @@ -139,7 +154,8 @@ public CompletableFuture<Void> startConsumption(final Option<?>... consumptionOp
options.getExtraFields().ifPresent(extraFields ->
subscriptionConfig.put(CONSUMPTION_PARAM_EXTRA_FIELDS, extraFields.toString()));

return doStartConsumption(subscriptionConfig);
// make sure to reset the flag when consumption request completes
return doStartConsumption(subscriptionConfig).whenComplete((v, t) -> subscriptionRequestPending.set(false));
}

/**
Expand Down Expand Up @@ -639,8 +655,11 @@ protected AdaptableBus.SubscriptionId subscribeAndPublishMessage(
final CompletableFuture<Void> futureToCompleteOrFailAfterAck,
final Function<Adaptable, NotifyMessage> adaptableToNotifier) {

LOGGER.trace("Sending {} and waiting for {}", protocolCommand, protocolCommandAck);
final String correlationId = UUID.randomUUID().toString();
final String protocolCommandWithCorrelationId = appendCorrelationIdParameter(protocolCommand, correlationId);
LOGGER.trace("Sending {} and waiting for {}", protocolCommandWithCorrelationId, protocolCommandAck);
final AdaptableBus adaptableBus = messagingProvider.getAdaptableBus();

if (previousSubscriptionId != null) {
// remove previous subscription without going through back-end because subscription will be replaced
adaptableBus.unsubscribe(previousSubscriptionId);
Expand All @@ -649,11 +668,42 @@ protected AdaptableBus.SubscriptionId subscribeAndPublishMessage(
adaptableBus.subscribeForAdaptable(streamingType,
adaptable -> adaptableToNotifier.apply(adaptable).accept(getBus()));
final Classification tag = Classification.forString(protocolCommandAck);
adjoin(adaptableBus.subscribeOnceForString(tag, getTimeout()), futureToCompleteOrFailAfterAck);
messagingProvider.emit(protocolCommand);

// subscribe exclusively because we allow only one request at a time
final CompletionStage<String> ackStage = adaptableBus.subscribeOnceForStringExclusively(tag, getTimeout());
final CompletableFuture<String> ackFuture = ackStage.toCompletableFuture();

// subscribe for possible error responses by correlationId
final Classification correlationIdTag = Classification.forCorrelationId(correlationId);
adaptableBus.subscribeOnceForAdaptable(correlationIdTag, getTimeout())
.thenAccept(adaptable -> {
final Signal<?> signal = AbstractHandle.PROTOCOL_ADAPTER.fromAdaptable(adaptable);
if (signal instanceof ThingErrorResponse) {
ackFuture.completeExceptionally(((ThingErrorResponse) signal).getDittoRuntimeException());
} else {
ackFuture.completeExceptionally(getUnexpectedSignalException(signal));
}
});

adjoin(ackFuture, futureToCompleteOrFailAfterAck);
messagingProvider.emit(protocolCommandWithCorrelationId);

return subscriptionId;
}

private DittoRuntimeException getUnexpectedSignalException(final Signal<?> signal) {
return DittoRuntimeException
.newBuilder("signal.unexpected", HttpStatusCode.BAD_REQUEST)
.message(() -> String.format("Received unexpected response of type '%s'.", signal.getType()))
.build();
}

private String appendCorrelationIdParameter(final String protocolCommand, final String correlationId) {
final String separator = protocolCommand.contains("?") ? "&" : "?";
return String.format("%s%s%s=%s", protocolCommand, separator,
DittoHeaderDefinition.CORRELATION_ID.getKey(), correlationId);
}

/**
* Remove a subscription.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.client.internal;

/**
* This exception may be thrown if concurrent consumption requests are detected.
*/
public class ConcurrentConsumptionRequestException extends RuntimeException {

private static final long serialVersionUID = -565137801315595348L;
private static final String MESSAGE = "Concurrent consumption requests are not allowed on one channel.";

/**
* Constructs a new {@code UncompletedTwinConsumptionRequestException} object.
*/
public ConcurrentConsumptionRequestException() {
super(MESSAGE, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,21 @@ public interface AdaptableBus {
*
* @param tag the string classification, usually itself.
* @param timeout how long to wait for a match.
* @return a future adaptable matching the tag according to the classifiers, or a failed future
* if no adaptable is matched within the timeout.
* @return a future string matching the tag according to the classifiers, or a failed future
* if no string is matched within the timeout.
*/
CompletionStage<String> subscribeOnceForString(Classification tag, Duration timeout);

/**
* Add a one-time subscriber for a string message by replacing an existing with the same string value.
*
* @param tag the string classification, usually itself.
* @param timeout how long to wait for a match.
* @return a future string matching the tag according to the classifiers, or a failed future
* if no string is matched within the timeout.
*/
CompletionStage<String> subscribeOnceForStringExclusively(Classification tag, Duration timeout);

/**
* Add a one-time subscriber for an adaptable message. Only effective if no one-time string subscriber matches.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public CompletionStage<String> subscribeOnceForString(final Classification tag,
return subscribeOnce(oneTimeStringConsumers, tag, timeout);
}

@Override
public CompletionStage<String> subscribeOnceForStringExclusively(final Classification tag, final Duration timeout) {
return subscribeOnce(oneTimeStringConsumers, tag, timeout, true);
}

@Override
public CompletionStage<Adaptable> subscribeOnceForAdaptable(final Classification tag,
final Duration timeout) {
Expand Down Expand Up @@ -158,8 +163,14 @@ public void publish(final String message) {
@Override
public void shutdownExecutor() {
LOGGER.trace("Shutting down AdaptableBus Executors");
singleThreadedExecutorService.shutdownNow();
scheduledExecutorService.shutdownNow();
try {
singleThreadedExecutorService.shutdownNow();
scheduledExecutorService.shutdownNow();
singleThreadedExecutorService.awaitTermination(2, TimeUnit.SECONDS);
scheduledExecutorService.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.info("Waiting for termination was interrupted.");
}
}

// call this in a single-threaded executor so that ordering is preserved
Expand Down Expand Up @@ -206,9 +217,21 @@ private <T> CompletionStage<T> subscribeOnce(
final Map<Classification, Set<Entry<Consumer<T>>>> registry,
final Classification tag,
final Duration timeout) {
return subscribeOnce(registry, tag, timeout, false);
}

private <T> CompletionStage<T> subscribeOnce(
final Map<Classification, Set<Entry<Consumer<T>>>> registry,
final Classification tag,
final Duration timeout,
final boolean exclusively) {
final CompletableFuture<T> resultFuture = new CompletableFuture<>();
final Entry<Consumer<T>> subscriber = new Entry<>(tag, resultFuture::complete);
addEntry(registry, subscriber);
if (exclusively) {
replaceEntry(registry, subscriber);
} else {
addEntry(registry, subscriber);
}
removeAfter(registry, subscriber, timeout, resultFuture);
return resultFuture;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.client;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.ditto.client.internal.AbstractDittoClientTest;
import org.eclipse.ditto.client.internal.ConcurrentConsumptionRequestException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tests different sequences of failing/succeeding startConsumption invocations.
*/
abstract class AbstractConsumptionDittoClientTest extends AbstractDittoClientTest {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDittoClientTest.class);

@Test
public void concurrentStartConsumptionFails() {
try {
final CompletableFuture<Void> firstRequest = startConsumptionRequest();
final CompletableFuture<Void> concurrentRequest = startConsumptionRequest();

replyToConsumptionRequest();

assertCompletion(firstRequest);
concurrentRequest.get(1, TimeUnit.SECONDS);
} catch (final Exception e) {
assertThat(e)
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(ConcurrentConsumptionRequestException.class);
}
}

protected abstract CompletableFuture<Void> startConsumptionRequest();

protected abstract void replyToConsumptionRequest();

@Test
public void testStartConsumptionCombinations() {
// test all combinations of startConsumption invocations expecting success or failure of the given length
testStartConsumptionSequence(4, Collections.emptyList());
}

protected void testStartConsumptionSequence(int remaining, final List<Boolean> sequence) {
testSequence(sequence);
if (remaining > 0) {
testStartConsumptionSequence(remaining - 1, addElement(sequence, true));
testStartConsumptionSequence(remaining - 1, addElement(sequence, false));
}
}

private void testSequence(final List<Boolean> sequence) {
if (sequence.isEmpty()) {
return;
}
LOGGER.info("Testing startConsumption sequence: {}",
sequence.stream()
.map(success -> success ? "expect success" : "expect failure")
.collect(Collectors.joining(" -> ")));
sequence.forEach(expectSuccess -> {
messaging.clearEmitted();
if (expectSuccess) {
startConsumptionSucceeds();
} else {
startConsumptionAndExpectError();
}
});
}

private List<Boolean> addElement(final List<Boolean> sequence, final boolean b) {
final ArrayList<Boolean> success = new ArrayList<>(sequence);
success.add(b);
return success;
}

protected void assertFailedCompletion(final CompletableFuture<Void> future,
final Class<? extends Exception> exception) {
try {
future.get(1L, TimeUnit.SECONDS);
} catch (final Exception e) {
assertThat(e)
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(exception);
}
}

/**
* Tests a succeeding startConsumption invocation.
*/
protected abstract void startConsumptionSucceeds();

/**
* Tests a failing startConsumption invocation.
*/
protected abstract void startConsumptionAndExpectError();
}
Loading

0 comments on commit 55d5ca2

Please sign in to comment.