Skip to content

Commit

Permalink
Merge pull request #175 from bosch-io/feature/smart-channel
Browse files Browse the repository at this point in the history
Added option for live channel condition.
  • Loading branch information
thjaeckle authored Jan 3, 2022
2 parents 4669791 + 5cc7a35 commit e6fc99b
Show file tree
Hide file tree
Showing 17 changed files with 912 additions and 411 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand All @@ -39,19 +39,25 @@ final class OptionsToDittoHeaders {
EntityTagMatchers.fromList(Collections.singletonList(EntityTagMatcher.asterisk()));

private final JsonSchemaVersion jsonSchemaVersion;
private final Set<? extends OptionName> allowedOptions;
private final SortedSet<? extends OptionName> allowedOptions;
private final OptionsEvaluator.Global globalOptionsEvaluator;
private final OptionsEvaluator.Modify modifyOptionsEvaluator;

private final DittoHeadersBuilder<?, ?> headersBuilder;

private OptionsToDittoHeaders(final JsonSchemaVersion jsonSchemaVersion,
final Collection<? extends OptionName> allowedOptions,
final Collection<? extends OptionName> explicitlyAllowedOptions,
final OptionsEvaluator.Global globalOptionsEvaluator,
final OptionsEvaluator.Modify modifyOptionsEvaluator) {

this.jsonSchemaVersion = jsonSchemaVersion;
this.allowedOptions = Collections.unmodifiableSet(new HashSet<>(allowedOptions));

final SortedSet<OptionName> allAllowedOptions = new TreeSet<>(Comparator.comparing(Object::toString));
allAllowedOptions.addAll(explicitlyAllowedOptions);
allAllowedOptions.add(OptionName.Global.DITTO_HEADERS);
allAllowedOptions.add(OptionName.Modify.RESPONSE_REQUIRED);
allowedOptions = Collections.unmodifiableSortedSet(allAllowedOptions);

this.globalOptionsEvaluator = globalOptionsEvaluator;
this.modifyOptionsEvaluator = modifyOptionsEvaluator;

Expand All @@ -62,18 +68,20 @@ private OptionsToDittoHeaders(final JsonSchemaVersion jsonSchemaVersion,
* Returns {@link DittoHeaders} that are based on the specified arguments.
*
* @param schemaVersion the JSON schema version to be used.
* @param allowedOptions the options that are allowed for a particular outgoing message.
* @param explicitlyAllowedOptions the options that are explicitly allowed for a particular outgoing message.
* @param options the user provided options.
* @return the {@code DittoHeaders}.
* @throws NullPointerException if any argument is {@code null}.
* @throws IllegalArgumentException if {@code options} contains items that are not within
* {@code explicitlyAllowedOptions} or that are not implicitly allowed.
*/
static DittoHeaders getDittoHeaders(final JsonSchemaVersion schemaVersion,
final Collection<? extends OptionName> allowedOptions,
final Collection<? extends OptionName> explicitlyAllowedOptions,
final Option<?>[] options) {

final OptionsToDittoHeaders optionsToDittoHeaders = new OptionsToDittoHeaders(
ConditionChecker.checkNotNull(schemaVersion, "schemaVersion"),
ConditionChecker.checkNotNull(allowedOptions, "allowedOptions"),
ConditionChecker.checkNotNull(explicitlyAllowedOptions, "explicitlyAllowedOptions"),
OptionsEvaluator.forGlobalOptions(options),
OptionsEvaluator.forModifyOptions(options)
);
Expand All @@ -88,6 +96,7 @@ private DittoHeaders getDittoHeaders() {
setResponseRequired();
setEntityTagMatchers();
setCondition();
setLiveChannelCondition();
return buildDittoHeaders();
}

Expand All @@ -96,7 +105,22 @@ private void putAdditionalHeaders(final DittoHeaders additionalHeaders) {
}

private DittoHeaders getAdditionalHeaders() {
return globalOptionsEvaluator.getDittoHeaders().orElseGet(DittoHeaders::empty);
final DittoHeaders result;
final Optional<DittoHeaders> dittoHeadersOptional = globalOptionsEvaluator.getDittoHeaders();
if (dittoHeadersOptional.isPresent()) {
validateIfOptionIsAllowed(OptionName.Global.DITTO_HEADERS);
result = dittoHeadersOptional.get();
} else {
result = DittoHeaders.empty();
}
return result;
}

private void validateIfOptionIsAllowed(final OptionName optionName) {
if (!allowedOptions.contains(optionName)) {
final String pattern = "Option ''{0}'' is not allowed. This operation only allows {1}.";
throw new IllegalArgumentException(MessageFormat.format(pattern, optionName, allowedOptions));
}
}

private void setRandomCorrelationIdIfMissing(final DittoHeaders additionalHeaders) {
Expand Down Expand Up @@ -126,14 +150,6 @@ private void setEntityTagMatchers() {
});
}

private void validateIfOptionIsAllowed(final OptionName option) {
if (!allowedOptions.contains(option)) {
final String pattern = "Option ''{0}'' is not allowed for this operation.";
final String lowerCaseOptionName = option.toString().toLowerCase(Locale.ENGLISH);
throw new IllegalArgumentException(MessageFormat.format(pattern, lowerCaseOptionName));
}
}

private void setCondition() {
globalOptionsEvaluator.condition()
.ifPresent(condition -> {
Expand All @@ -142,6 +158,14 @@ private void setCondition() {
});
}

private void setLiveChannelCondition() {
globalOptionsEvaluator.getLiveChannelCondition()
.ifPresent(liveChannelCondition -> {
validateIfOptionIsAllowed(OptionName.Global.LIVE_CHANNEL_CONDITION);
headersBuilder.liveChannelCondition(liveChannelCondition);
});
}

private DittoHeaders buildDittoHeaders() {
return headersBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.client.options.OptionName.Global.CONDITION;
import static org.eclipse.ditto.client.options.OptionName.Global.LIVE_CHANNEL_CONDITION;
import static org.eclipse.ditto.client.options.OptionName.Modify.EXISTS;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
Expand All @@ -27,7 +27,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.annotation.Nullable;

Expand All @@ -37,6 +38,7 @@
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.client.live.messages.MessageSerializationException;
import org.eclipse.ditto.client.live.messages.MessageSerializer;
import org.eclipse.ditto.client.live.messages.MessageSerializerKey;
import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry;
import org.eclipse.ditto.client.live.messages.MessageSerializers;
import org.eclipse.ditto.client.options.Option;
Expand Down Expand Up @@ -212,34 +214,31 @@ MergeThing mergeThing(final ThingId thingId, final Thing thing, final Option<?>[
}

public RetrieveThing retrieveThing(final ThingId thingId, final Option<?>... options) {
return RetrieveThing.of(thingId, buildDittoHeaders(EnumSet.of(CONDITION), options));
return RetrieveThing.of(thingId, buildDittoHeaders(EnumSet.of(CONDITION, LIVE_CHANNEL_CONDITION), options));
}

public RetrieveThing retrieveThing(final ThingId thingId,
final Iterable<JsonPointer> fields,
final Option<?>... options) {

return RetrieveThing.getBuilder(thingId, buildDittoHeaders(EnumSet.of(CONDITION), options))
return RetrieveThing.getBuilder(thingId,
buildDittoHeaders(EnumSet.of(CONDITION, LIVE_CHANNEL_CONDITION), options))
.withSelectedFields(JsonFactory.newFieldSelector(fields))
.build();
}

public RetrieveThings retrieveThings(final Iterable<ThingId> thingIds) {
return RetrieveThings.getBuilder(makeList(thingIds))
return RetrieveThings.getBuilder(getAsList(thingIds))
.dittoHeaders(buildDittoHeaders(Collections.emptySet()))
.build();
}

private static <E> List<E> makeList(final Iterable<E> iter) {
final List<E> list = new ArrayList<>();
for (final E item : iter) {
list.add(item);
}
return list;
private static <E> List<E> getAsList(final Iterable<E> iterable) {
return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
}

public RetrieveThings retrieveThings(final Iterable<ThingId> thingIds, final Iterable<JsonPointer> fields) {
return RetrieveThings.getBuilder(makeList(thingIds))
return RetrieveThings.getBuilder(getAsList(thingIds))
.selectedFields(JsonFactory.newFieldSelector(fields))
.dittoHeaders(buildDittoHeaders(Collections.emptySet()))
.build();
Expand Down Expand Up @@ -391,15 +390,20 @@ public MergeThing mergePolicyId(final ThingId thingId, final PolicyId policyId,
}

public RetrieveFeature retrieveFeature(final ThingId thingId, final String featureId, final Option<?>... options) {
return RetrieveFeature.of(thingId, featureId, buildDittoHeaders(EnumSet.of(CONDITION), options));
return RetrieveFeature.of(thingId,
featureId,
buildDittoHeaders(EnumSet.of(CONDITION, LIVE_CHANNEL_CONDITION), options));
}

public RetrieveFeature retrieveFeature(final ThingId thingId,
final String featureId,
final JsonFieldSelector fieldSelector,
final Option<?>... options) {

return RetrieveFeature.of(thingId, featureId, fieldSelector, buildDittoHeaders(EnumSet.of(CONDITION), options));
return RetrieveFeature.of(thingId,
featureId,
fieldSelector,
buildDittoHeaders(EnumSet.of(CONDITION, LIVE_CHANNEL_CONDITION), options));
}

public DeleteFeature deleteFeature(final ThingId thingId, final String featureId, final Option<?>... options) {
Expand Down Expand Up @@ -555,14 +559,14 @@ public <T> Message<T> sendMessage(final MessageSerializerRegistry registry, fina
// if no content-type was explicitly set, but a payload
if (!msgContentType.isPresent()) {
// find out the content-type by the payload java-type:
final String implicitContentType = registry.findKeyFor(payloadType, subject)
.orElseThrow(
() -> new MessageSerializationException(
"No content-type could be determined for payload of type '" +
payloadType + "'. "
+
"Ensure that a a MessageSerializer for that payload-type is registered"))
.getContentType();
final MessageSerializerKey<T> serializerKey = registry.findKeyFor(payloadType, subject)
.orElseThrow(() -> new MessageSerializationException(
"No content-type could be determined for payload of type '"
+ payloadType + "'."
+ " Ensure that a MessageSerializer for that payload-type is"
+ " registered"
));
final String implicitContentType = serializerKey.getContentType();

final MessageHeaders adjustedHeaders =
messageHeaders.toBuilder().contentType(implicitContentType).build();
Expand Down Expand Up @@ -630,20 +634,17 @@ private void validateOptions(@Nullable final JsonObject initialPolicy, final Opt
}

private Optional<String> getPolicyIdOrPlaceholder(final Option<?>... options) {
final Optional<String> result;
final OptionsEvaluator.Modify optionsEvaluator = OptionsEvaluator.forModifyOptions(options);
final Optional<String> copyPolicy = optionsEvaluator.copyPolicy().map(PolicyId::toString);

return ifPresentOrElse(copyPolicy,
() -> optionsEvaluator.copyPolicyFromThingId()
.map(ThingId::toString)
.map(thingId -> "{{ ref:things/" + thingId + "/policyId }}"));
}

private static <T> Optional<T> ifPresentOrElse(final Optional<T> optional, final Supplier<Optional<T>> otherwise) {
if (optional.isPresent()) {
return optional;
final Optional<PolicyId> copyPolicy = optionsEvaluator.copyPolicy();
if (copyPolicy.isPresent()) {
result = copyPolicy.map(PolicyId::toString);
} else {
result = optionsEvaluator.copyPolicyFromThingId()
.map(ThingId::toString)
.map(thingId -> "{{ ref:things/" + thingId + "/policyId }}");
}
return otherwise.get();
return result;
}

}
Loading

0 comments on commit e6fc99b

Please sign in to comment.