Skip to content

Commit

Permalink
Merge pull request #98 from bosch-io/feature/declared-acks
Browse files Browse the repository at this point in the history
Allow declaration of acknowledgements for Ditto Java Client
  • Loading branch information
thjaeckle authored Oct 23, 2020
2 parents 066c9c0 + c973112 commit 52f6bc7
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;

/**
Expand Down Expand Up @@ -49,6 +52,14 @@ public interface MessagingConfiguration {
*/
URI getEndpointUri();

/**
* Returns the labels of all acknowledgements that are declared to be provided by this connection.
*
* @return the acknowledgment labels.
* @since 1.4.0
*/
Set<AcknowledgementLabel> getDeclaredAcknowledgements();

/**
* @return {@code true} if client should try to reconnect when connection is lost.
*/
Expand Down Expand Up @@ -114,6 +125,15 @@ interface Builder {
*/
Builder endpoint(String endpoint);

/**
* Sets the labels of all acknowledgements that are declared to be provided by this client session/connection.
*
* @param acknowledgementLabels the acknowledgement labels
* @return this builder.
* @since 1.4.0
*/
Builder declaredAcknowledgements(Collection<AcknowledgementLabel> acknowledgementLabels);

/**
* Sets if {@code reconnectEnabled}.
* <p> Default is enabled. If a connection was established once, the client tries to reconnect <em>every 5
Expand Down Expand Up @@ -157,7 +177,7 @@ interface Builder {
* @param handler the handler that will be called with the cause of the connection error.
* @since 1.2.0
*/
Builder connectionErrorHandler(final Consumer<Throwable> handler);
Builder connectionErrorHandler(@Nullable final Consumer<Throwable> handler);

/**
* Creates a new instance of {@code MessagingConfiguration}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;

/**
Expand All @@ -44,6 +49,7 @@ public final class WebSocketMessagingConfiguration implements MessagingConfigura
@Nullable private final ProxyConfiguration proxyConfiguration;
@Nullable private final TrustStoreConfiguration trustStoreConfiguration;
@Nullable private final Consumer<Throwable> connectionErrorHandler;
private final Set<AcknowledgementLabel> declaredAcknowledgements;

public WebSocketMessagingConfiguration(final WebSocketMessagingConfigurationBuilder builder,
final URI endpointUri) {
Expand All @@ -55,6 +61,7 @@ public WebSocketMessagingConfiguration(final WebSocketMessagingConfigurationBuil
trustStoreConfiguration = builder.trustStoreConfiguration;
connectionErrorHandler = builder.connectionErrorHandler;
this.timeout = builder.timeout;
this.declaredAcknowledgements = Collections.unmodifiableSet(builder.declaredAcknowledgements);
this.endpointUri = endpointUri;
}

Expand All @@ -77,6 +84,11 @@ public URI getEndpointUri() {
return endpointUri;
}

@Override
public Set<AcknowledgementLabel> getDeclaredAcknowledgements() {
return declaredAcknowledgements;
}

@Override
public boolean isReconnectEnabled() {
return reconnectEnabled;
Expand Down Expand Up @@ -116,6 +128,7 @@ private static final class WebSocketMessagingConfigurationBuilder implements Mes
@Nullable private ProxyConfiguration proxyConfiguration;
private TrustStoreConfiguration trustStoreConfiguration;
@Nullable private Consumer<Throwable> connectionErrorHandler;
private final Set<AcknowledgementLabel> declaredAcknowledgements = new HashSet<>();

private WebSocketMessagingConfigurationBuilder() {
jsonSchemaVersion = JsonSchemaVersion.LATEST;
Expand Down Expand Up @@ -150,6 +163,13 @@ public MessagingConfiguration.Builder endpoint(final String endpoint) {
return this;
}

@Override
public Builder declaredAcknowledgements(final Collection<AcknowledgementLabel> acknowledgementLabels) {
this.declaredAcknowledgements.clear();
this.declaredAcknowledgements.addAll(acknowledgementLabels);
return this;
}

@Override
public MessagingConfiguration.Builder reconnectEnabled(final boolean reconnectEnabled) {
this.reconnectEnabled = reconnectEnabled;
Expand Down Expand Up @@ -177,7 +197,7 @@ public MessagingConfiguration.Builder trustStoreConfiguration(
}

@Override
public Builder connectionErrorHandler(final Consumer<Throwable> handler) {
public Builder connectionErrorHandler(@Nullable final Consumer<Throwable> handler) {
this.connectionErrorHandler = handler;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.client.internal;

import java.text.MessageFormat;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
Expand All @@ -24,7 +25,9 @@
import org.eclipse.ditto.client.changes.internal.ImmutableFeatureChange;
import org.eclipse.ditto.client.changes.internal.ImmutableFeaturesChange;
import org.eclipse.ditto.client.changes.internal.ImmutableThingChange;
import org.eclipse.ditto.client.internal.bus.AdaptableBus;
import org.eclipse.ditto.client.internal.bus.BusFactory;
import org.eclipse.ditto.client.internal.bus.Classification;
import org.eclipse.ditto.client.internal.bus.JsonPointerSelectors;
import org.eclipse.ditto.client.internal.bus.PointerBus;
import org.eclipse.ditto.client.internal.bus.SelectorUtil;
Expand All @@ -37,6 +40,8 @@
import org.eclipse.ditto.client.twin.Twin;
import org.eclipse.ditto.client.twin.internal.TwinImpl;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotDeclaredException;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
Expand All @@ -47,9 +52,11 @@
import org.eclipse.ditto.protocoladapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.protocoladapter.ProtocolAdapter;
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.events.things.AclEntryCreated;
import org.eclipse.ditto.signals.events.things.AclEntryDeleted;
import org.eclipse.ditto.signals.events.things.AclEntryModified;
Expand Down Expand Up @@ -107,6 +114,7 @@ private DefaultDittoClient(final TwinImpl twin, final LiveImpl live, final Polic
this.live = live;
this.policies = policies;
logVersionInformation();
handleSpontaneousErrors();
}

/**
Expand Down Expand Up @@ -587,4 +595,48 @@ public CompletionStage<DittoClient> connect() {
.thenCompose(result -> policies.messagingProvider.initializeAsync())
.thenApply(result -> this);
}

private void handleSpontaneousErrors() {
handleSpontaneousErrors(twin.messagingProvider);
if (live.messagingProvider != twin.messagingProvider) {
handleSpontaneousErrors(live.messagingProvider);
}
if (policies.messagingProvider != twin.messagingProvider) {
handleSpontaneousErrors(policies.messagingProvider);
}
}

/**
* Handle {@code DittoRuntimeException}s from the back-end that are not replies of anything.
*
* @param provider the messaging provider.
*/
private static void handleSpontaneousErrors(final MessagingProvider provider) {
final Optional<Consumer<Throwable>> connectionErrorHandler =
provider.getMessagingConfiguration().getConnectionErrorHandler();
if (connectionErrorHandler.isPresent()) {
final AdaptableBus adaptableBus = provider.getAdaptableBus();
final Consumer<Throwable> consumer = connectionErrorHandler.get();

final Classification ackLabelNotUnique =
Classification.forErrorCode(AcknowledgementLabelNotUniqueException.ERROR_CODE);
final Classification ackLabelNotDeclared =
Classification.forErrorCode(AcknowledgementLabelNotDeclaredException.ERROR_CODE);

adaptableBus.subscribeForAdaptableExclusively(ackLabelNotUnique,
adaptable -> consumer.accept(asDittoRuntimeException(adaptable)));
adaptableBus.subscribeForAdaptableExclusively(ackLabelNotDeclared,
adaptable -> consumer.accept(asDittoRuntimeException(adaptable)));
}
}

private static Throwable asDittoRuntimeException(final Adaptable adaptable) {
final Signal<?> signal = AbstractHandle.PROTOCOL_ADAPTER.fromAdaptable(adaptable);
if (signal instanceof ErrorResponse) {
return ((ErrorResponse<?>) signal).getDittoRuntimeException();
} else {
return new ClassCastException("Expect an error response, got: " +
ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable).toJsonString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -83,6 +84,16 @@ public interface AdaptableBus {
*/
SubscriptionId subscribeForAdaptable(Classification tag, Consumer<Adaptable> adaptableConsumer);

/**
* Add a persistent subscriber for an adaptable message and remove all other subscribers.
* Only effective if no one-time string or adaptable subscriber matches.
*
* @param tag the adaptable classification.
* @param adaptableConsumer the consumer of the adaptable message.
* @return the subscription ID.
*/
SubscriptionId subscribeForAdaptableExclusively(Classification tag, Consumer<Adaptable> adaptableConsumer);

/**
* Add a persistent subscriber for an adaptable message that are removed after a timeout.
* If tag requires sequentialization, take care that all consumer and predicate parameters are fast,
Expand All @@ -109,6 +120,11 @@ SubscriptionId subscribeForAdaptableWithTimeout(Classification tag,
*/
boolean unsubscribe(@Nullable SubscriptionId subscriptionId);

/**
* @return the scheduled executor service of this adaptable bus.
*/
ScheduledExecutorService getScheduledExecutor();

/**
* Closes the executor of the adaptable bus .
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static AdaptableBus createAdaptableBus() {
.addStringClassifier(Classifiers.identity())
.addAdaptableClassifier(Classifiers.correlationId())
.addAdaptableClassifier(Classifiers.streamingType())
.addAdaptableClassifier(Classifiers.thingsSearch());
.addAdaptableClassifier(Classifiers.thingsSearch())
.addAdaptableClassifier(Classifiers.errorCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ static Classification forThingsSearch(final String searchSubscriptionId) {
return new SearchSubscriptionId(searchSubscriptionId);
}

/**
* Create an error-code classification key.
*
* @param errorCode the error code.
* @return the key.
*/
static Classification forErrorCode(final String errorCode) {
return new ErrorCode(errorCode);
}

/**
* Check whether subscribers for this classification requires sequential dispatching.
*
Expand Down Expand Up @@ -192,4 +202,11 @@ static <T> Optional<Classification> of(final T value) {
return Optional.of(new Identity<>(value));
}
}

final class ErrorCode extends Literal<String> {

private ErrorCode(final String errorCode) {
super(errorCode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Optional;

import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.signals.events.thingsearch.SubscriptionEvent;
Expand Down Expand Up @@ -66,6 +67,15 @@ public static Classifier<Adaptable> thingsSearch() {
return Instances.THINGS_SEARCH_CLASSIFIER;
}

/**
* Classify Ditto protocol errors by error codes.
*
* @return the error code classifier.
*/
public static Classifier<Adaptable> errorCode() {
return Instances.ERROR_CODE_CLASSIFIER;
}

private static final class StreamingTypeClassifier implements Classifier<Adaptable> {

@Override
Expand Down Expand Up @@ -116,6 +126,22 @@ public Optional<Classification> classify(final Adaptable message) {
}
}

private static final class ErrorCodeClassifier implements Classifier<Adaptable> {

@Override
public Optional<Classification> classify(final Adaptable message) {
if (message.getTopicPath().getCriterion() == TopicPath.Criterion.ERRORS) {
return message.getPayload()
.getValue()
.filter(JsonValue::isObject)
.flatMap(value -> value.asObject().getValue(DittoRuntimeException.JsonFields.ERROR_CODE))
.map(Classification::forErrorCode);
} else {
return Optional.empty();
}
}
}

private static final class Instances {

private static final Classifier<Adaptable> CORRELATION_ID_CLASSIFIER = adaptable ->
Expand All @@ -126,5 +152,7 @@ private static final class Instances {
private static final Classifier<Adaptable> STREAMING_TYPE_CLASSIFIER = new StreamingTypeClassifier();

private static final Classifier<Adaptable> THINGS_SEARCH_CLASSIFIER = new ThingsSearchClassifier();

private static final Classifier<Adaptable> ERROR_CODE_CLASSIFIER = new ErrorCodeClassifier();
}
}
Loading

0 comments on commit 52f6bc7

Please sign in to comment.