diff --git a/DEPENDENCIES b/DEPENDENCIES index 3d67be566ab..0c734565950 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -3,7 +3,7 @@ maven/mavencentral/com.apicatalog/iron-ed25519-cryptosuite-2020/0.8.1, Apache-2. maven/mavencentral/com.apicatalog/iron-verifiable-credentials/0.8.1, Apache-2.0, approved, #9234 maven/mavencentral/com.apicatalog/titanium-json-ld/1.0.0, Apache-2.0, approved, clearlydefined maven/mavencentral/com.apicatalog/titanium-json-ld/1.3.1, Apache-2.0, approved, #8912 -maven/mavencentral/com.apicatalog/titanium-json-ld/1.4.0, , restricted, clearlydefined +maven/mavencentral/com.apicatalog/titanium-json-ld/1.4.0, Apache-2.0, approved, #13683 maven/mavencentral/com.atomikos/atomikos-util/6.0.0, Apache-2.0, approved, #9326 maven/mavencentral/com.atomikos/transactions-api/6.0.0, Apache-2.0, approved, #10351 maven/mavencentral/com.atomikos/transactions-jdbc/6.0.0, Apache-2.0, approved, #9273 diff --git a/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/EndpointDataReferenceStoreDefaultServicesExtension.java b/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/EndpointDataReferenceStoreDefaultServicesExtension.java index adc902b6aff..104c7ffa389 100644 --- a/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/EndpointDataReferenceStoreDefaultServicesExtension.java +++ b/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/EndpointDataReferenceStoreDefaultServicesExtension.java @@ -37,12 +37,7 @@ public class EndpointDataReferenceStoreDefaultServicesExtension implements Servi @Setting(value = "Directory/Path where to store EDRs in the vault for vaults that supports hierarchical structuring.", defaultValue = DEFAULT_EDR_VAULT_PATH) public static final String EDC_EDR_VAULT_PATH = "edc.edr.vault.path"; protected static final String NAME = "Endpoint Data Reference Core Default Services Extension"; - @Inject - private EndpointDataReferenceEntryIndex edrEntryStore; - - @Inject - private EndpointDataReferenceCache edrCache; - + @Inject private CriterionOperatorRegistry criterionOperatorRegistry; diff --git a/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/defaults/InMemoryEndpointDataReferenceEntryIndex.java b/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/defaults/InMemoryEndpointDataReferenceEntryIndex.java index e4db5251ba1..1e2ef1a344b 100644 --- a/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/defaults/InMemoryEndpointDataReferenceEntryIndex.java +++ b/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/defaults/InMemoryEndpointDataReferenceEntryIndex.java @@ -21,6 +21,7 @@ import org.eclipse.edc.spi.query.QueryResolver; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.StoreResult; +import org.jetbrains.annotations.Nullable; import java.util.List; import java.util.Map; @@ -40,6 +41,11 @@ public InMemoryEndpointDataReferenceEntryIndex(CriterionOperatorRegistry criteri queryResolver = new ReflectionBasedQueryResolver<>(EndpointDataReferenceEntry.class, criterionOperatorRegistry); } + @Override + public @Nullable EndpointDataReferenceEntry findById(String transferProcessId) { + return cache.get(transferProcessId); + } + @Override public StoreResult> query(QuerySpec spec) { return StoreResult.success(queryResolver.query(cache.values().stream(), spec).collect(Collectors.toList())); diff --git a/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/store/EndpointDataReferenceStoreImpl.java b/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/store/EndpointDataReferenceStoreImpl.java index 6184c1a62df..aeb2e2a7440 100644 --- a/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/store/EndpointDataReferenceStoreImpl.java +++ b/core/common/edr-store-core/src/main/java/org/eclipse/edc/core/edr/store/EndpointDataReferenceStoreImpl.java @@ -22,6 +22,7 @@ import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.transaction.spi.TransactionContext; +import org.jetbrains.annotations.Nullable; import java.util.List; @@ -47,6 +48,11 @@ public StoreResult resolveByTransferProcess(String transferProcessI return transactionalContext.execute(() -> dataReferenceCache.get(transferProcessId)); } + @Override + public @Nullable EndpointDataReferenceEntry findById(String transferProcessId) { + return transactionalContext.execute(() -> dataReferenceEntryIndex.findById(transferProcessId)); + } + @Override public StoreResult> query(QuerySpec querySpec) { return transactionalContext.execute(() -> dataReferenceEntryIndex.query(querySpec)); diff --git a/core/common/edr-store-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/core/common/edr-store-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension index ef2ddebe16f..a48d397f742 100644 --- a/core/common/edr-store-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension +++ b/core/common/edr-store-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -13,4 +13,5 @@ # -org.eclipse.edc.core.edr.EndpointDataReferenceStoreExtension \ No newline at end of file +org.eclipse.edc.core.edr.EndpointDataReferenceStoreExtension +org.eclipse.edc.core.edr.EndpointDataReferenceStoreDefaultServicesExtension \ No newline at end of file diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/listener/TransferProcessEventListener.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/listener/TransferProcessEventListener.java index 3964cfc89f8..bcd39524afc 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/listener/TransferProcessEventListener.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/listener/TransferProcessEventListener.java @@ -46,9 +46,7 @@ public TransferProcessEventListener(EventRouter eventRouter, Clock clock) { @Override public void initiated(TransferProcess process) { - var event = TransferProcessInitiated.Builder.newInstance() - .transferProcessId(process.getId()) - .callbackAddresses(process.getCallbackAddresses()) + var event = withBaseProperties(TransferProcessInitiated.Builder.newInstance(), process) .build(); publish(event); @@ -56,9 +54,7 @@ public void initiated(TransferProcess process) { @Override public void provisioningRequested(TransferProcess process) { - var event = TransferProcessProvisioningRequested.Builder.newInstance() - .transferProcessId(process.getId()) - .callbackAddresses(process.getCallbackAddresses()) + var event = withBaseProperties(TransferProcessProvisioningRequested.Builder.newInstance(), process) .build(); publish(event); @@ -66,9 +62,7 @@ public void provisioningRequested(TransferProcess process) { @Override public void provisioned(TransferProcess process) { - var event = TransferProcessProvisioned.Builder.newInstance() - .transferProcessId(process.getId()) - .callbackAddresses(process.getCallbackAddresses()) + var event = withBaseProperties(TransferProcessProvisioned.Builder.newInstance(), process) .build(); publish(event); @@ -76,9 +70,7 @@ public void provisioned(TransferProcess process) { @Override public void requested(TransferProcess process) { - var event = TransferProcessRequested.Builder.newInstance() - .transferProcessId(process.getId()) - .callbackAddresses(process.getCallbackAddresses()) + var event = withBaseProperties(TransferProcessRequested.Builder.newInstance(), process) .build(); publish(event); @@ -86,12 +78,8 @@ public void requested(TransferProcess process) { @Override public void started(TransferProcess process, TransferProcessStartedData additionalData) { - var event = TransferProcessStarted.Builder.newInstance() - .transferProcessId(process.getId()) + var event = withBaseProperties(TransferProcessStarted.Builder.newInstance(), process) .dataAddress(additionalData.getDataAddress()) - .callbackAddresses(process.getCallbackAddresses()) - .contractId(process.getContractId()) - .type(process.getType().name()) .build(); publish(event); @@ -99,9 +87,7 @@ public void started(TransferProcess process, TransferProcessStartedData addition @Override public void completed(TransferProcess process) { - var event = TransferProcessCompleted.Builder.newInstance() - .transferProcessId(process.getId()) - .callbackAddresses(process.getCallbackAddresses()) + var event = withBaseProperties(TransferProcessCompleted.Builder.newInstance(), process) .build(); publish(event); @@ -109,10 +95,8 @@ public void completed(TransferProcess process) { @Override public void terminated(TransferProcess process) { - var event = TransferProcessTerminated.Builder.newInstance() + var event = withBaseProperties(TransferProcessTerminated.Builder.newInstance(), process) .reason(process.getErrorDetail()) - .transferProcessId(process.getId()) - .callbackAddresses(process.getCallbackAddresses()) .build(); publish(event); @@ -120,9 +104,7 @@ public void terminated(TransferProcess process) { @Override public void deprovisioningRequested(TransferProcess process) { - var event = TransferProcessDeprovisioningRequested.Builder.newInstance() - .transferProcessId(process.getId()) - .callbackAddresses(process.getCallbackAddresses()) + var event = withBaseProperties(TransferProcessDeprovisioningRequested.Builder.newInstance(), process) .build(); publish(event); @@ -130,14 +112,21 @@ public void deprovisioningRequested(TransferProcess process) { @Override public void deprovisioned(TransferProcess process) { - var event = TransferProcessDeprovisioned.Builder.newInstance() - .transferProcessId(process.getId()) - .callbackAddresses(process.getCallbackAddresses()) + var event = withBaseProperties(TransferProcessDeprovisioned.Builder.newInstance(), process) .build(); publish(event); } + private > B withBaseProperties(B builder, TransferProcess process) { + return builder.transferProcessId(process.getId()) + .contractId(process.getContractId()) + .assetId(process.getAssetId()) + .type(process.getType().name()) + .callbackAddresses(process.getCallbackAddresses()); + } + + @SuppressWarnings("unchecked") private void publish(TransferProcessEvent event) { var envelope = EventEnvelope.Builder.newInstance() .payload(event) diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java index 75c07e63a60..47e6f14934e 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java @@ -31,6 +31,7 @@ import java.util.UUID; import static java.util.stream.Collectors.toMap; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; import static org.eclipse.edc.spi.result.Result.success; public class DataPlaneAuthorizationServiceImpl implements DataPlaneAuthorizationService { @@ -85,10 +86,10 @@ public Result authorize(String token, Map requestDa private Result createDataAddress(TokenRepresentation tokenRepresentation, Endpoint publicEndpoint) { var address = DataAddress.Builder.newInstance() .type(publicEndpoint.endpointType()) - .property("endpoint", publicEndpoint.endpoint()) - .property("endpointType", publicEndpoint.endpointType()) //this is duplicated in the type() field, but will make serialization easier + .property(EDC_NAMESPACE + "endpoint", publicEndpoint.endpoint()) + .property(EDC_NAMESPACE + "endpointType", publicEndpoint.endpointType()) //this is duplicated in the type() field, but will make serialization easier .properties(tokenRepresentation.getAdditional()) // would contain the "authType = bearer" entry - .property("authorization", tokenRepresentation.getToken()) + .property(EDC_NAMESPACE + "authorization", tokenRepresentation.getToken()) .build(); return success(address); diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImpl.java index ac6c54f45b6..bd4a3a5900b 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImpl.java @@ -39,6 +39,8 @@ import java.util.function.Supplier; import java.util.stream.Stream; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; + /** * This implementation of the {@link DataPlaneAccessTokenService} uses a backing storage ({@link AccessTokenDataStore}) to keep a record of all * tokens it has issued. Tokens are in JWT format. @@ -93,7 +95,7 @@ public Result obtainToken(TokenParameters parameters, DataA var allDecorators = new ArrayList<>(Stream.concat(claimDecorators, headerDecorators).toList()); var keyIdDecorator = new KeyIdDecorator(publicKeyIdSupplier.get()); allDecorators.add(keyIdDecorator); - + // if there is no "jti" header on the token params, we'll assign a random one, and add it back to the decorators if (id == null) { monitor.info("No '%s' claim found on TokenParameters. Will generate a random one.".formatted(TOKEN_ID)); @@ -113,7 +115,7 @@ public Result obtainToken(TokenParameters parameters, DataA var storeResult = accessTokenDataStore.store(accessTokenData); var content = tokenResult.getContent(); - content.getAdditional().put("authType", "bearer"); + content.getAdditional().put(EDC_NAMESPACE + "authType", "bearer"); return storeResult.succeeded() ? Result.success(content) : Result.failure(storeResult.getFailureMessages()); } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java index 07f631b16c4..d4c19f1b29b 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java @@ -72,8 +72,8 @@ void createEndpointDataReference() { assertThat(result).isSucceeded() .satisfies(da -> { assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP"); - assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com"); - assertThat(da.getProperties().get("endpointType")).isEqualTo(da.getType()); + assertThat(da.getStringProperty("endpoint")).isEqualTo("http://example.com"); + assertThat(da.getStringProperty("endpointType")).isEqualTo(da.getType()); assertThat(da.getStringProperty("authorization")).isEqualTo("footoken"); }); @@ -105,7 +105,7 @@ void createEndpointDataReference_withAuthType() { assertThat(result).isSucceeded() .satisfies(da -> { assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP"); - assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com"); + assertThat(da.getStringProperty("endpoint")).isEqualTo("http://example.com"); assertThat(da.getStringProperty("authorization")).isEqualTo("footoken"); assertThat(da.getStringProperty("authType")).isEqualTo("bearer"); assertThat(da.getStringProperty("fizz")).isEqualTo("buzz"); diff --git a/extensions/control-plane/edr/edr-store-receiver/build.gradle.kts b/extensions/control-plane/edr/edr-store-receiver/build.gradle.kts new file mode 100644 index 00000000000..ae7e76797b3 --- /dev/null +++ b/extensions/control-plane/edr/edr-store-receiver/build.gradle.kts @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +plugins { + `java-library` +} + +dependencies { + api(project(":spi:common:core-spi")) + api(project(":spi:control-plane:transfer-spi")) + api(project(":spi:control-plane:control-plane-spi")) + api(project(":spi:control-plane:policy-spi")) + api(project(":spi:common:edr-store-spi")) + + testImplementation(project(":core:common:junit")) + +} \ No newline at end of file diff --git a/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiver.java b/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiver.java new file mode 100644 index 00000000000..b7f6547954a --- /dev/null +++ b/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiver.java @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.edr.store.receiver; + +import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation; +import org.eclipse.edc.connector.policy.spi.store.PolicyArchive; +import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceStore; +import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.event.Event; +import org.eclipse.edc.spi.event.EventEnvelope; +import org.eclipse.edc.spi.event.EventSubscriber; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.transaction.spi.TransactionContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +/** + * An implementation of {@link EventSubscriber} which listen to {@link TransferProcessEvent} specifically + * the {@link TransferProcessStarted}, {@link TransferProcessCompleted}, {@link TransferProcessTerminated} and + * update the {@link EndpointDataReferenceStore} accordingly + */ +public class EndpointDataReferenceStoreReceiver implements EventSubscriber { + + private final EndpointDataReferenceStore dataReferenceStore; + private final PolicyArchive policyArchive; + private final ContractAgreementService contractAgreementService; + private final TransactionContext transactionContext; + private final Monitor monitor; + private final Map, Function>> handlers = new HashMap<>(); + + public EndpointDataReferenceStoreReceiver(EndpointDataReferenceStore dataReferenceStore, PolicyArchive policyArchive, ContractAgreementService contractAgreementService, TransactionContext transactionContext, Monitor monitor) { + this.dataReferenceStore = dataReferenceStore; + this.policyArchive = policyArchive; + this.contractAgreementService = contractAgreementService; + this.transactionContext = transactionContext; + this.monitor = monitor; + registerHandler(TransferProcessStarted.class, this::handleTransferStarted); + registerHandler(TransferProcessTerminated.class, this::handleTransferTerminated); + registerHandler(TransferProcessCompleted.class, this::handleTransferCompleted); + } + + @SuppressWarnings("unchecked") + @Override + public void on(EventEnvelope event) { + if (event.getPayload() instanceof TransferProcessEvent transferProcessEvent && transferProcessEvent.getType().equals(TransferProcess.Type.CONSUMER.name())) { + var handler = (Function>) handlers.get(transferProcessEvent.getClass()); + if (handler != null) { + transactionContext.execute(() -> { + handler.apply(event.getPayload()) + .onFailure(failure -> monitor.severe("Failed to process event %s: %s".formatted(event.getPayload().getClass().getSimpleName(), failure.getFailureDetail()))) + .orElseThrow(failure -> new EdcException("Failed to process event %s: %s".formatted(event.getPayload().getClass().getSimpleName(), failure.getFailureDetail()))); + }); + } + } + } + + private void registerHandler(Class klass, Function> function) { + handlers.put(klass, function); + } + + private Result handleTransferStarted(TransferProcessStarted transferStarted) { + + if (transferStarted.getDataAddress() != null) { + var contractNegotiationId = Optional.ofNullable(contractAgreementService.findNegotiation(transferStarted.getContractId())) + .map(ContractNegotiation::getId) + .orElse(null); + + if (contractNegotiationId == null) { + var msg = "Contract Negotiation for transfer process %s not found. The EDR cached entry will not have an associated contract negotiation id"; + monitor.debug(msg.formatted(transferStarted.getTransferProcessId())); + } + + var policy = policyArchive.findPolicyForContract(transferStarted.getContractId()); + + if (policy == null) { + var msg = "Policy associated to the transfer process %s and contract agreement %s not found"; + return Result.failure(msg.formatted(transferStarted.getTransferProcessId(), transferStarted.getContractId())); + } + + var result = dataReferenceStore.save(toEndpointDataReferenceEntry(transferStarted, policy.getAssigner(), contractNegotiationId), transferStarted.getDataAddress()); + + if (result.failed()) { + return Result.failure(result.getFailureDetail()); + } + } + return Result.success(); + } + + private EndpointDataReferenceEntry toEndpointDataReferenceEntry(TransferProcessStarted transferProcessStarted, String providerId, String contractNegotiationId) { + return EndpointDataReferenceEntry.Builder.newInstance() + .id(transferProcessStarted.getContractId()) + .transferProcessId(transferProcessStarted.getTransferProcessId()) + .assetId(transferProcessStarted.getAssetId()) + .contractNegotiationId(contractNegotiationId) + .providerId(providerId) + .agreementId(transferProcessStarted.getContractId()) + .build(); + } + + private Result handleTransferTerminated(TransferProcessTerminated transferProcessTerminated) { + return removeCachedEdr(transferProcessTerminated.getTransferProcessId()); + } + + private Result handleTransferCompleted(TransferProcessCompleted transferProcessCompleted) { + return removeCachedEdr(transferProcessCompleted.getTransferProcessId()); + } + + private Result removeCachedEdr(String transferProcessId) { + if (dataReferenceStore.findById(transferProcessId) != null) { + var result = dataReferenceStore.delete(transferProcessId); + if (result.failed()) { + return Result.failure(result.getFailureDetail()); + } + } + return Result.success(); + } +} diff --git a/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverExtension.java b/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverExtension.java new file mode 100644 index 00000000000..41366dda9c2 --- /dev/null +++ b/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverExtension.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.edr.store.receiver; + +import org.eclipse.edc.connector.policy.spi.store.PolicyArchive; +import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceStore; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.event.EventRouter; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.transaction.spi.TransactionContext; + +import static org.eclipse.edc.connector.edr.store.receiver.EndpointDataReferenceStoreReceiverExtension.NAME; + +@Extension(NAME) +public class EndpointDataReferenceStoreReceiverExtension implements ServiceExtension { + + public static final String NAME = "Endpoint Data Reference Store Receiver Extension"; + private static final String DEFAULT_SYNC_LISTENER = "false"; + @Setting(value = "If true the EDR receiver will be registered as synchronous listener", defaultValue = DEFAULT_SYNC_LISTENER) + private static final String EDC_EDR_RECEIVER_SYNC = "edc.edr.receiver.sync"; + @Inject + private EventRouter router; + + @Inject + private EndpointDataReferenceStore dataReferenceStore; + + @Inject + private Monitor monitor; + + @Inject + private ContractAgreementService agreementService; + + @Inject + private PolicyArchive policyArchive; + + @Inject + private TransactionContext transactionContext; + + @Override + public String name() { + return NAME; + } + + @Override + public void initialize(ServiceExtensionContext context) { + var isSyncMode = context.getSetting(EDC_EDR_RECEIVER_SYNC, Boolean.parseBoolean(EDC_EDR_RECEIVER_SYNC)); + var receiver = new EndpointDataReferenceStoreReceiver(dataReferenceStore, policyArchive, agreementService, transactionContext, monitor.withPrefix("EDR Receiver")); + if (isSyncMode) { + router.registerSync(TransferProcessEvent.class, receiver); + } else { + router.register(TransferProcessEvent.class, receiver); + } + } +} diff --git a/extensions/control-plane/edr/edr-store-receiver/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/extensions/control-plane/edr/edr-store-receiver/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000000..6ded9239390 --- /dev/null +++ b/extensions/control-plane/edr/edr-store-receiver/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,15 @@ +# +# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# +# Contributors: +# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation +# +# + +org.eclipse.edc.connector.edr.store.receiver.EndpointDataReferenceStoreReceiverExtension \ No newline at end of file diff --git a/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverExtensionTest.java b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverExtensionTest.java new file mode 100644 index 00000000000..b8111e8fee6 --- /dev/null +++ b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverExtensionTest.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.edr.store.receiver; + +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; +import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; +import org.eclipse.edc.spi.event.EventRouter; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(DependencyInjectionExtension.class) +public class EndpointDataReferenceStoreReceiverExtensionTest { + + private final EventRouter eventRouter = mock(); + + @BeforeEach + void setup(ServiceExtensionContext context) { + context.registerService(EventRouter.class, eventRouter); + } + + @Test + void initialize(ServiceExtensionContext context, EndpointDataReferenceStoreReceiverExtension extension) { + extension.initialize(context); + verify(eventRouter).register(eq(TransferProcessEvent.class), isA(EndpointDataReferenceStoreReceiver.class)); + verify(eventRouter, never()).registerSync(any(), any()); + } + + @Test + void initialize_withSyncConfig(ServiceExtensionContext context, EndpointDataReferenceStoreReceiverExtension extension) { + when(context.getSetting("edc.edr.receiver.sync", false)).thenReturn(true); + extension.initialize(context); + verify(eventRouter).registerSync(eq(TransferProcessEvent.class), isA(EndpointDataReferenceStoreReceiver.class)); + verify(eventRouter, never()).register(any(), any()); + } +} diff --git a/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java new file mode 100644 index 00000000000..db917cffefa --- /dev/null +++ b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.edr.store.receiver; + +import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation; +import org.eclipse.edc.connector.policy.spi.store.PolicyArchive; +import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessDeprovisioned; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessDeprovisioningRequested; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessInitiated; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessProvisioned; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessProvisioningRequested; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessRequested; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated; +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceStore; +import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.event.EventEnvelope; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.result.StoreResult; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.transaction.spi.NoopTransactionContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.mockito.ArgumentCaptor; + +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.eclipse.edc.connector.edr.store.receiver.TestFunctions.baseBuilder; +import static org.eclipse.edc.connector.edr.store.receiver.TestFunctions.envelopeFor; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +public class EndpointDataReferenceStoreReceiverTest { + + private final PolicyArchive policyArchive = mock(); + private final ContractAgreementService contractAgreementService = mock(); + private final EndpointDataReferenceStore dataReferenceStore = mock(); + + private final Monitor monitor = mock(); + + private EndpointDataReferenceStoreReceiver receiver; + + @BeforeEach + void setup() { + receiver = new EndpointDataReferenceStoreReceiver(dataReferenceStore, policyArchive, contractAgreementService, new NoopTransactionContext(), monitor); + } + + @Test + void transferStarted_shouldStoreTheEdr() { + var dataAddress = DataAddress.Builder.newInstance().type("type").build(); + var event = baseBuilder(TransferProcessStarted.Builder.newInstance()).dataAddress(dataAddress).build(); + + var policy = mock(Policy.class); + var contractNegotiation = mock(ContractNegotiation.class); + + when(policyArchive.findPolicyForContract(event.getContractId())).thenReturn(policy); + when(policy.getAssigner()).thenReturn("providerId"); + when(contractAgreementService.findNegotiation(event.getContractId())).thenReturn(contractNegotiation); + when(contractNegotiation.getId()).thenReturn("contractNegotiationId"); + when(dataReferenceStore.save(any(), eq(dataAddress))).thenReturn(StoreResult.success()); + + receiver.on(envelopeFor(event)); + + var capturedEntry = ArgumentCaptor.forClass(EndpointDataReferenceEntry.class); + verify(dataReferenceStore).save(capturedEntry.capture(), eq(dataAddress)); + + var entry = capturedEntry.getValue(); + assertThat(entry.getTransferProcessId()).isEqualTo(event.getTransferProcessId()); + assertThat(entry.getAgreementId()).isEqualTo(event.getContractId()); + assertThat(entry.getAssetId()).isEqualTo(event.getAssetId()); + assertThat(entry.getProviderId()).isEqualTo("providerId"); + assertThat(entry.getContractNegotiationId()).isEqualTo("contractNegotiationId"); + + } + + @Test + void transferStarted_shouldStoreTheEdr_whenContractNegotiationNotFound() { + var dataAddress = DataAddress.Builder.newInstance().type("type").build(); + var event = baseBuilder(TransferProcessStarted.Builder.newInstance()).dataAddress(dataAddress).build(); + + var policy = mock(Policy.class); + + when(policyArchive.findPolicyForContract(event.getContractId())).thenReturn(policy); + when(policy.getAssigner()).thenReturn("providerId"); + when(contractAgreementService.findNegotiation(event.getContractId())).thenReturn(null); + when(dataReferenceStore.save(any(), eq(dataAddress))).thenReturn(StoreResult.success()); + + receiver.on(envelopeFor(event)); + + var capturedEntry = ArgumentCaptor.forClass(EndpointDataReferenceEntry.class); + verify(dataReferenceStore).save(capturedEntry.capture(), eq(dataAddress)); + + var entry = capturedEntry.getValue(); + assertThat(entry.getTransferProcessId()).isEqualTo(event.getTransferProcessId()); + assertThat(entry.getAgreementId()).isEqualTo(event.getContractId()); + assertThat(entry.getAssetId()).isEqualTo(event.getAssetId()); + assertThat(entry.getProviderId()).isEqualTo("providerId"); + assertThat(entry.getContractNegotiationId()).isNull(); + + } + + @Test + void transferStarted_shouldFail_whenPolicyNotFound() { + var dataAddress = DataAddress.Builder.newInstance().type("type").build(); + var event = baseBuilder(TransferProcessStarted.Builder.newInstance()).dataAddress(dataAddress).build(); + + when(policyArchive.findPolicyForContract(event.getContractId())).thenReturn(null); + when(contractAgreementService.findNegotiation(event.getContractId())).thenReturn(null); + + assertThatThrownBy(() -> receiver.on(envelopeFor(event))).isInstanceOf(EdcException.class); + + verify(monitor).severe(startsWith("Failed to process event %s".formatted(TransferProcessStarted.class.getSimpleName()))); + verifyNoInteractions(dataReferenceStore); + + } + + @Test + void transferStarted_shouldFail_whenEdrStoreFails() { + var dataAddress = DataAddress.Builder.newInstance().type("type").build(); + var event = baseBuilder(TransferProcessStarted.Builder.newInstance()).dataAddress(dataAddress).build(); + + var policy = mock(Policy.class); + + when(policyArchive.findPolicyForContract(event.getContractId())).thenReturn(policy); + when(policy.getAssigner()).thenReturn("providerId"); + when(contractAgreementService.findNegotiation(event.getContractId())).thenReturn(null); + when(dataReferenceStore.save(any(), eq(dataAddress))).thenReturn(StoreResult.generalError("error")); + + assertThatThrownBy(() -> receiver.on(envelopeFor(event))).isInstanceOf(EdcException.class); + + verify(monitor).severe(startsWith("Failed to process event %s".formatted(TransferProcessStarted.class.getSimpleName()))); + verify(dataReferenceStore).save(any(), any()); + + } + + @ParameterizedTest + @ArgumentsSource(PurgeEvents.class) + void transferClosed_shouldRemoveCachedEdr(EventEnvelope event) { + + when(dataReferenceStore.findById(event.getPayload().getTransferProcessId())).thenReturn(mock()); + when(dataReferenceStore.delete(event.getPayload().getTransferProcessId())).thenReturn(StoreResult.success(mock())); + + receiver.on(event); + + verify(dataReferenceStore).findById(event.getPayload().getTransferProcessId()); + verify(dataReferenceStore).delete(event.getPayload().getTransferProcessId()); + + } + + @ParameterizedTest + @ArgumentsSource(PurgeEvents.class) + void transferClosed_shouldThrow_whenDeleteFails(EventEnvelope event) { + + when(dataReferenceStore.findById(event.getPayload().getTransferProcessId())).thenReturn(mock()); + when(dataReferenceStore.delete(event.getPayload().getTransferProcessId())).thenReturn(StoreResult.generalError("error")); + + assertThatThrownBy(() -> receiver.on(event)).isInstanceOf(EdcException.class); + + verify(dataReferenceStore).findById(event.getPayload().getTransferProcessId()); + verify(dataReferenceStore).delete(event.getPayload().getTransferProcessId()); + + } + + @ParameterizedTest + @ArgumentsSource(PurgeEvents.class) + void transferClosed_shouldNotThrow_whenEdrItsNotCached(EventEnvelope event) { + when(dataReferenceStore.findById(event.getPayload().getTransferProcessId())).thenReturn(null); + + receiver.on(event); + + verify(dataReferenceStore).findById(event.getPayload().getTransferProcessId()); + verify(dataReferenceStore, never()).delete(event.getPayload().getTransferProcessId()); + + } + + @ParameterizedTest + @ArgumentsSource(UnsupportedEvents.class) + void transfer_shouldNotRemoveCachedEdr_whenUnsupportedEvents(EventEnvelope event) { + + receiver.on(event); + verifyNoInteractions(dataReferenceStore); + } + + private static class PurgeEvents implements ArgumentsProvider { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public Stream provideArguments(ExtensionContext context) { + + var eventBuilders = Stream.of( + TransferProcessTerminated.Builder.newInstance(), + TransferProcessCompleted.Builder.newInstance() + ); + + return eventBuilders + .map(it -> baseBuilder((TransferProcessEvent.Builder) it).build()) + .map(TestFunctions::envelopeFor) + .map(Arguments::of); + } + + } + + private static class UnsupportedEvents implements ArgumentsProvider { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public Stream provideArguments(ExtensionContext context) { + + var eventBuilders = Stream.of( + TransferProcessDeprovisioned.Builder.newInstance(), + TransferProcessDeprovisioningRequested.Builder.newInstance(), + TransferProcessInitiated.Builder.newInstance(), + TransferProcessProvisioned.Builder.newInstance(), + TransferProcessProvisioningRequested.Builder.newInstance(), + TransferProcessRequested.Builder.newInstance().transferProcessId("id") + ); + + return eventBuilders + .map(it -> baseBuilder((TransferProcessEvent.Builder) it).build()) + .map(TestFunctions::envelopeFor) + .map(Arguments::of); + } + + } +} diff --git a/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/TestFunctions.java b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/TestFunctions.java new file mode 100644 index 00000000000..2223cfb33c1 --- /dev/null +++ b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/TestFunctions.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.edr.store.receiver; + +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.spi.event.Event; +import org.eclipse.edc.spi.event.EventEnvelope; +import org.eclipse.edc.spi.types.domain.callback.CallbackAddress; + +import java.util.List; +import java.util.Set; +import java.util.UUID; + +public class TestFunctions { + + @SuppressWarnings("unchecked") + public static EventEnvelope envelopeFor(T event) { + + return EventEnvelope.Builder + .newInstance() + .id(UUID.randomUUID().toString()) + .at(System.currentTimeMillis()) + .payload(event) + .build(); + } + + public static > B baseBuilder(B builder) { + var callbacks = List.of(CallbackAddress.Builder.newInstance().uri("http://local").events(Set.of("test")).build()); + return builder.transferProcessId("id") + .assetId("assetId") + .type(TransferProcess.Type.CONSUMER.name()) + .contractId("agreementId") + .callbackAddresses(callbacks); + } + +} diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java index 197c062ae4b..05eaac22045 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java @@ -41,6 +41,8 @@ import java.util.Map; import static org.eclipse.edc.connector.api.signaling.configuration.SignalingApiConfigurationExtension.NAME; +import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX; +import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA; import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX; import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA; import static org.eclipse.edc.spi.CoreConstants.JSON_LD; @@ -89,6 +91,8 @@ public void initialize(ServiceExtensionContext context) { context.registerService(SignalingApiConfiguration.class, new SignalingApiConfiguration(webServiceConfiguration)); jsonLd.registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, SIGNALING_SCOPE); + jsonLd.registerNamespace(DSPACE_PREFIX, DSPACE_SCHEMA, SIGNALING_SCOPE); + var jsonLdMapper = getJsonLdMapper(); webService.registerResource(webServiceConfiguration.getContextAlias(), new ObjectMapperProvider(jsonLdMapper)); webService.registerResource(webServiceConfiguration.getContextAlias(), new JerseyJsonLdInterceptor(jsonLd, jsonLdMapper, SIGNALING_SCOPE)); diff --git a/settings.gradle.kts b/settings.gradle.kts index 68e231ecc07..14f1553c366 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -170,6 +170,7 @@ include(":extensions:control-plane:store:sql:transfer-process-store-sql") include(":extensions:control-plane:callback:callback-event-dispatcher") include(":extensions:control-plane:callback:callback-http-dispatcher") include(":extensions:control-plane:callback:callback-static-endpoint") +include(":extensions:control-plane:edr:edr-store-receiver") include(":extensions:data-plane:data-plane-client") diff --git a/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceEntryIndex.java b/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceEntryIndex.java index 859740b2953..4a8d3a64924 100644 --- a/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceEntryIndex.java +++ b/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceEntryIndex.java @@ -19,6 +19,7 @@ import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.jetbrains.annotations.Nullable; import java.util.List; @@ -28,6 +29,15 @@ @ExtensionPoint public interface EndpointDataReferenceEntryIndex { + /** + * Return a {@link EndpointDataReferenceEntry} associated with the transferProcessId in input + * + * @param transferProcessId The transferProcessId + * @return The result containing the EDR entry {@link EndpointDataReferenceEntry} + */ + @Nullable + EndpointDataReferenceEntry findById(String transferProcessId); + /** * Returns all the EDR entries in the store that are covered by a given {@link QuerySpec}. * diff --git a/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceStore.java b/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceStore.java index 5752c6e307d..b9c4d4a3fa1 100644 --- a/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceStore.java +++ b/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/store/EndpointDataReferenceStore.java @@ -19,6 +19,7 @@ import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.jetbrains.annotations.Nullable; import java.util.List; @@ -39,6 +40,15 @@ public interface EndpointDataReferenceStore { */ StoreResult resolveByTransferProcess(String transferProcessId); + /** + * Return a {@link EndpointDataReferenceEntry} associated with the transferProcessId in input + * + * @param transferProcessId The transferProcessId + * @return The result containing the EDR entry {@link EndpointDataReferenceEntry} + */ + @Nullable + EndpointDataReferenceEntry findById(String transferProcessId); + /** * Returns all the EDR entries in the store that are covered by a given {@link QuerySpec}. * diff --git a/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/types/EndpointDataReferenceEntry.java b/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/types/EndpointDataReferenceEntry.java index 0dd02ff7914..2a2fd8b34bf 100644 --- a/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/types/EndpointDataReferenceEntry.java +++ b/spi/common/edr-store-spi/src/main/java/org/eclipse/edc/edr/spi/types/EndpointDataReferenceEntry.java @@ -118,7 +118,6 @@ public EndpointDataReferenceEntry build() { requireNonNull(entity.assetId, ASSET_ID); requireNonNull(entity.agreementId, AGREEMENT_ID); requireNonNull(entity.transferProcessId, TRANSFER_PROCESS_ID); - requireNonNull(entity.contractNegotiationId, TRANSFER_PROCESS_ID); requireNonNull(entity.providerId, TRANSFER_PROCESS_ID); // The id is always equals to transfer process id entity.id = entity.transferProcessId; diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessEvent.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessEvent.java index 839a16440fa..232c88a25dc 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessEvent.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessEvent.java @@ -28,13 +28,27 @@ public abstract class TransferProcessEvent extends Event { protected String transferProcessId; - protected List callbackAddresses = new ArrayList<>(); + protected String assetId; + protected String type; + protected String contractId; public String getTransferProcessId() { return transferProcessId; } + public String getAssetId() { + return assetId; + } + + public String getType() { + return type; + } + + public String getContractId() { + return contractId; + } + @Override public List getCallbackAddresses() { return callbackAddresses; @@ -53,15 +67,30 @@ public B transferProcessId(String transferProcessId) { return (B) this; } + public B assetId(String assetId) { + event.assetId = assetId; + return (B) this; + } + public B callbackAddresses(List callbackAddresses) { event.callbackAddresses = callbackAddresses; return self(); } + public B type(String type) { + event.type = type; + return self(); + } + + public B contractId(String contractId) { + event.contractId = contractId; + return self(); + } + public abstract B self(); public T build() { - Objects.requireNonNull(event.transferProcessId); + Objects.requireNonNull(event.transferProcessId, "transferProcess id can't be null"); return event; } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessStarted.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessStarted.java index db9162e798f..654f55d9999 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessStarted.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessStarted.java @@ -24,10 +24,7 @@ */ @JsonDeserialize(builder = TransferProcessStarted.Builder.class) public class TransferProcessStarted extends TransferProcessEvent { - private DataAddress dataAddress; - private String contractId; - private String type; private TransferProcessStarted() { } @@ -41,14 +38,6 @@ public String name() { return "transfer.process.started"; } - public String getContractId() { - return contractId; - } - - public String getType() { - return type; - } - @JsonPOJOBuilder(withPrefix = "") public static class Builder extends TransferProcessEvent.Builder { @@ -65,17 +54,7 @@ public Builder dataAddress(DataAddress dataAddress) { event.dataAddress = dataAddress; return this; } - - public Builder contractId(String contractId) { - event.contractId = contractId; - return this; - } - - public Builder type(String type) { - event.type = type; - return this; - } - + @Override public Builder self() { return this; diff --git a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessEventTest.java b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessEventTest.java index 986d7e8189b..be056224806 100644 --- a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessEventTest.java +++ b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/transfer/spi/event/TransferProcessEventTest.java @@ -15,8 +15,10 @@ package org.eclipse.edc.connector.transfer.spi.event; import com.fasterxml.jackson.databind.jsontype.NamedType; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.callback.CallbackAddress; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; @@ -55,26 +57,39 @@ void serdes(EventEnvelope event) { private static class EventInstances implements ArgumentsProvider { + @SuppressWarnings("unchecked") @Override public Stream provideArguments(ExtensionContext context) { - var callbacks = List.of(CallbackAddress.Builder.newInstance().uri("http://local").events(Set.of("test")).build()); var eventBuilders = Stream.of( - TransferProcessCompleted.Builder.newInstance().transferProcessId("id").callbackAddresses(callbacks).build(), - TransferProcessDeprovisioned.Builder.newInstance().transferProcessId("id").callbackAddresses(callbacks).build(), - TransferProcessTerminated.Builder.newInstance().transferProcessId("id").callbackAddresses(callbacks).reason("any reason").build(), - TransferProcessInitiated.Builder.newInstance().transferProcessId("id").callbackAddresses(callbacks).build(), - TransferProcessProvisioned.Builder.newInstance().transferProcessId("id").callbackAddresses(callbacks).build(), - TransferProcessRequested.Builder.newInstance().transferProcessId("id").callbackAddresses(callbacks).build() + TransferProcessCompleted.Builder.newInstance(), + TransferProcessDeprovisioned.Builder.newInstance(), + TransferProcessDeprovisioningRequested.Builder.newInstance(), + TransferProcessStarted.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("type").build()), + TransferProcessTerminated.Builder.newInstance().reason("any reason"), + TransferProcessInitiated.Builder.newInstance(), + TransferProcessProvisioned.Builder.newInstance(), + TransferProcessProvisioningRequested.Builder.newInstance(), + TransferProcessRequested.Builder.newInstance().transferProcessId("id") ); return eventBuilders + .map(it -> baseProperties(it).build()) .map(it -> EventEnvelope.Builder.newInstance() .at(Clock.systemUTC().millis()) .id(UUID.randomUUID().toString()).payload(it) .build()) .map(Arguments::of); } + + private > TransferProcessEvent.Builder baseProperties(TransferProcessEvent.Builder builder) { + var callbacks = List.of(CallbackAddress.Builder.newInstance().uri("http://local").events(Set.of("test")).build()); + return builder.transferProcessId("id") + .assetId("assetId") + .type(TransferProcess.Type.CONSUMER.name()) + .contractId("agreementId") + .callbackAddresses(callbacks); + } } } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java index a27e1b67a9f..bae1e1f1b6f 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java @@ -101,10 +101,9 @@ void startTransfer() throws JsonProcessingException { // verify basic shape of the DSPACE data address (=EDR token) assertThat(dataAddress).isNotNull(); assertThat(dataAddress.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP"); - assertThat(dataAddress.getProperties()) - .containsKey("authorization") - .containsEntry("endpoint", DATAPLANE_PUBLIC_ENDPOINT_URL) - .containsEntry("authType", "bearer"); + assertThat(dataAddress.getStringProperty("endpoint")).isEqualTo(DATAPLANE_PUBLIC_ENDPOINT_URL); + assertThat(dataAddress.getStringProperty("authorization")).isNotNull(); + assertThat(dataAddress.getStringProperty("authType")).isEqualTo("bearer"); // verify that the data flow was created var store = runtime.getService(DataPlaneStore.class).findById(processId); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/BaseEndToEndParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/BaseEndToEndParticipant.java new file mode 100644 index 00000000000..f8ddacb1ef4 --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/BaseEndToEndParticipant.java @@ -0,0 +1,306 @@ +/* + * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e.participant; + +import io.restassured.common.mapper.TypeRef; +import jakarta.json.Json; +import jakarta.json.JsonObject; +import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.eclipse.edc.test.system.utils.Participant; +import org.hamcrest.Matcher; +import org.jetbrains.annotations.NotNull; + +import java.net.URI; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static io.restassured.RestAssured.given; +import static io.restassured.http.ContentType.JSON; +import static jakarta.json.Json.createArrayBuilder; +import static jakarta.json.Json.createObjectBuilder; +import static java.io.File.separator; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID; +import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.spi.CoreConstants.EDC_PREFIX; +import static org.eclipse.edc.spi.system.ServiceExtensionContext.PARTICIPANT_ID; +import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.defaultDatasourceConfiguration; + +public class BaseEndToEndParticipant extends Participant { + + private final Duration timeout = Duration.ofSeconds(30); + + private final URI controlPlaneDefault = URI.create("http://localhost:" + getFreePort()); + private final URI controlPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); + private final URI dataPlaneDefault = URI.create("http://localhost:" + getFreePort()); + private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); + private final URI dataPlaneSignaling = URI.create("http://localhost:" + getFreePort() + "/signaling"); + private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public"); + private final URI backendService = URI.create("http://localhost:" + getFreePort()); + + protected BaseEndToEndParticipant() { + super(); + } + + /** + * Get private properties to configure a dynamic http receiver for EDR. + * + * @return the receiver properties. + */ + public JsonObject dynamicReceiverPrivateProperties() { + return Json.createObjectBuilder() + .add("receiverHttpEndpoint", backendService + "/api/consumer/dataReference") + .build(); + } + + /** + * Get the latest EDR received by the backend service. + * + * @param id EDR id + * @return endpoint data reference. + */ + public EndpointDataReference getDataReference(String id) { + var dataReference = new AtomicReference(); + + await().atMost(timeout).untilAsserted(() -> { + var result = given() + .baseUri(backendService.toString()) + .when() + .get("/api/consumer/dataReference/{id}", id) + .then() + .statusCode(200) + .extract() + .body() + .as(EndpointDataReference.class); + dataReference.set(result); + }); + + return dataReference.get(); + } + + /** + * Get all EDR received by the backend service. + * + * @param id transfer process id. + * @return list of endpoint data references. + */ + public List getAllDataReferences(String id) { + var dataReference = new AtomicReference>(); + + var listType = new TypeRef>() { + }; + + await().atMost(timeout).untilAsserted(() -> { + var result = given() + .baseUri(backendService.toString()) + .when() + .get("/api/consumer/dataReference/{id}/all", id) + .then() + .statusCode(200) + .extract() + .body() + .as(listType); + dataReference.set(result); + }); + + return dataReference.get(); + } + + /** + * Pull data from provider using EDR. + * + * @param edr endpoint data reference + * @param queryParams query parameters + * @param bodyMatcher matcher for response body + */ + public void pullData(EndpointDataReference edr, Map queryParams, Matcher bodyMatcher) { + given() + .baseUri(edr.getEndpoint()) + .header(edr.getAuthKey(), edr.getAuthCode()) + .queryParams(queryParams) + .when() + .get() + .then() + .log().ifError() + .statusCode(200) + .body("message", bodyMatcher); + } + + public URI backendService() { + return backendService; + } + + public URI publicDataPlane() { + return dataPlanePublic; + } + + /** + * Register a data plane using the old data plane control API URL and no transfer types + */ + public void registerDataPlane() { + registerDataPlane(dataPlaneControl + "/transfer", Set.of()); + } + + /** + * Register a data plane using with input transfer type using the data plane signaling API url + */ + public void registerDataPlane(Set transferTypes) { + registerDataPlane(dataPlaneSignaling + "/v1/dataflows", Set.of("HttpData", "HttpProvision", "Kafka"), Set.of("HttpData", "HttpProvision", "HttpProxy", "Kafka"), transferTypes); + } + + /** + * Register a data plane + * + * @param url The data plane url + * @param transferTypes supported transfer types + */ + public void registerDataPlane(String url, Set transferTypes) { + registerDataPlane(url, Set.of("HttpData", "HttpProvision", "Kafka"), Set.of("HttpData", "HttpProvision", "HttpProxy", "Kafka"), transferTypes); + } + + /** + * Register a data plane with the old data plane control API url + * + * @param sources The allowed source types + * @param destinations The allowed destination types + */ + public void registerDataPlane(Set sources, Set destinations) { + registerDataPlane(dataPlaneControl + "/transfer", sources, destinations, Set.of()); + } + + /** + * Register a data plane + * + * @param url The url of the data plane + * @param sources The allowed source types + * @param destinations The allowed destination types + * @param transferTypes The allowed transfer types + */ + public void registerDataPlane(String url, Set sources, Set destinations, Set transferTypes) { + var jsonObject = Json.createObjectBuilder() + .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) + .add(ID, UUID.randomUUID().toString()) + .add(EDC_NAMESPACE + "url", url) + .add(EDC_NAMESPACE + "allowedSourceTypes", createArrayBuilder(sources)) + .add(EDC_NAMESPACE + "allowedDestTypes", createArrayBuilder(destinations)) + .add(EDC_NAMESPACE + "allowedTransferTypes", createArrayBuilder(transferTypes)) + .add(EDC_NAMESPACE + "properties", createObjectBuilder().add("publicApiUrl", dataPlanePublic.toString())) + .build(); + + managementEndpoint.baseRequest() + .contentType(JSON) + .body(jsonObject.toString()) + .when() + .post("/v2/dataplanes") + .then() + .statusCode(200); + } + + public Map controlPlaneConfiguration() { + return new HashMap<>() { + { + put(PARTICIPANT_ID, id); + put("web.http.port", String.valueOf(controlPlaneDefault.getPort())); + put("web.http.path", "/api"); + put("web.http.protocol.port", String.valueOf(protocolEndpoint.getUrl().getPort())); + put("web.http.protocol.path", protocolEndpoint.getUrl().getPath()); + put("web.http.management.port", String.valueOf(managementEndpoint.getUrl().getPort())); + put("web.http.management.path", managementEndpoint.getUrl().getPath()); + put("web.http.control.port", String.valueOf(controlPlaneControl.getPort())); + put("web.http.control.path", controlPlaneControl.getPath()); + put("edc.dsp.callback.address", protocolEndpoint.getUrl().toString()); + put("edc.vault", resourceAbsolutePath(getName() + "-vault.properties")); + put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); + put("edc.keystore.password", "123456"); + put("edc.receiver.http.endpoint", backendService + "/api/consumer/dataReference"); + put("edc.transfer.proxy.token.signer.privatekey.alias", "1"); + put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key"); + put("edc.transfer.proxy.endpoint", dataPlanePublic.toString()); + put("edc.transfer.send.retry.limit", "1"); + put("edc.transfer.send.retry.base-delay.ms", "100"); + put("edc.negotiation.consumer.send.retry.limit", "1"); + put("edc.negotiation.provider.send.retry.limit", "1"); + put("edc.negotiation.consumer.send.retry.base-delay.ms", "100"); + put("edc.negotiation.provider.send.retry.base-delay.ms", "100"); + + put("provisioner.http.entries.default.provisioner.type", "provider"); + put("provisioner.http.entries.default.endpoint", backendService + "/api/provision"); + put("provisioner.http.entries.default.data.address.type", "HttpProvision"); + } + }; + } + + public Map controlPlanePostgresConfiguration() { + var baseConfiguration = controlPlaneConfiguration(); + baseConfiguration.putAll(defaultDatasourceConfiguration(getName())); + return baseConfiguration; + } + + public Map dataPlaneConfiguration() { + return new HashMap<>() { + { + put("web.http.port", String.valueOf(dataPlaneDefault.getPort())); + put("web.http.path", "/api"); + put("web.http.public.port", String.valueOf(dataPlanePublic.getPort())); + put("web.http.public.path", "/public"); + put("web.http.control.port", String.valueOf(dataPlaneControl.getPort())); + put("web.http.control.path", dataPlaneControl.getPath()); + put("web.http.signaling.port", String.valueOf(dataPlaneSignaling.getPort())); + put("web.http.signaling.path", dataPlaneSignaling.getPath()); + put("edc.vault", resourceAbsolutePath(getName() + "-vault.properties")); + put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); + put("edc.keystore.password", "123456"); + put("edc.dataplane.api.public.baseurl", dataPlanePublic + "/v2/"); + put("edc.dataplane.token.validation.endpoint", controlPlaneControl + "/token"); + put("edc.transfer.proxy.token.signer.privatekey.alias", "1"); + put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key"); + put("edc.dataplane.http.sink.partition.size", "1"); + } + }; + } + + public Map dataPlanePostgresConfiguration() { + var baseConfiguration = dataPlaneConfiguration(); + baseConfiguration.putAll(defaultDatasourceConfiguration(getName())); + return baseConfiguration; + } + + @NotNull + private String resourceAbsolutePath(String filename) { + return System.getProperty("user.dir") + separator + "build" + separator + "resources" + separator + "test" + separator + filename; + } + + public static class Builder

> extends Participant.Builder { + + protected Builder(P participant) { + super(participant); + } + + @Override + public BaseEndToEndParticipant build() { + super.managementEndpoint(new Endpoint(URI.create("http://localhost:" + getFreePort() + "/api/management"))); + super.protocolEndpoint(new Endpoint(URI.create("http://localhost:" + getFreePort() + "/protocol"))); + super.build(); + return participant; + } + } +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java index f3cbd1aadc6..67348ec12ef 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java @@ -15,304 +15,14 @@ package org.eclipse.edc.test.e2e.participant; import com.fasterxml.jackson.annotation.JsonCreator; -import io.restassured.common.mapper.TypeRef; -import jakarta.json.Json; -import jakarta.json.JsonObject; -import org.eclipse.edc.spi.types.domain.DataAddress; -import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; -import org.eclipse.edc.test.system.utils.Participant; -import org.hamcrest.Matcher; -import org.jetbrains.annotations.NotNull; import java.net.URI; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; -import static io.restassured.RestAssured.given; -import static io.restassured.http.ContentType.JSON; -import static jakarta.json.Json.createArrayBuilder; -import static jakarta.json.Json.createObjectBuilder; -import static java.io.File.separator; -import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID; import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; -import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; -import static org.eclipse.edc.spi.CoreConstants.EDC_PREFIX; -import static org.eclipse.edc.spi.system.ServiceExtensionContext.PARTICIPANT_ID; -import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.defaultDatasourceConfiguration; -public class EndToEndTransferParticipant extends Participant { +public class EndToEndTransferParticipant extends BaseEndToEndParticipant { - private final Duration timeout = Duration.ofSeconds(30); - - private final URI controlPlaneDefault = URI.create("http://localhost:" + getFreePort()); - private final URI controlPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); - private final URI dataPlaneDefault = URI.create("http://localhost:" + getFreePort()); - private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); - private final URI dataPlaneSignaling = URI.create("http://localhost:" + getFreePort() + "/signaling"); - private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public"); - private final URI backendService = URI.create("http://localhost:" + getFreePort()); - - private EndToEndTransferParticipant() { - super(); - } - - /** - * Get private properties to configure a dynamic http receiver for EDR. - * - * @return the receiver properties. - */ - public JsonObject dynamicReceiverPrivateProperties() { - return Json.createObjectBuilder() - .add("receiverHttpEndpoint", backendService + "/api/consumer/dataReference") - .build(); - } - - /** - * Get the latest EDR received by the backend service. - * - * @param id EDR id - * @return endpoint data reference. - */ - public EndpointDataReference getDataReference(String id) { - var dataReference = new AtomicReference(); - - await().atMost(timeout).untilAsserted(() -> { - var result = given() - .baseUri(backendService.toString()) - .when() - .get("/api/consumer/dataReference/{id}", id) - .then() - .statusCode(200) - .extract() - .body() - .as(EndpointDataReference.class); - dataReference.set(result); - }); - - return dataReference.get(); - } - - /** - * Get all EDR received by the backend service. - * - * @param id transfer process id. - * @return list of endpoint data references. - */ - public List getAllDataReferences(String id) { - var dataReference = new AtomicReference>(); - - var listType = new TypeRef>() { - }; - - await().atMost(timeout).untilAsserted(() -> { - var result = given() - .baseUri(backendService.toString()) - .when() - .get("/api/consumer/dataReference/{id}/all", id) - .then() - .statusCode(200) - .extract() - .body() - .as(listType); - dataReference.set(result); - }); - - return dataReference.get(); - } - - /** - * Pull data from provider using EDR. - * - * @param edr endpoint data reference - * @param queryParams query parameters - * @param bodyMatcher matcher for response body - */ - public void pullData(EndpointDataReference edr, Map queryParams, Matcher bodyMatcher) { - given() - .baseUri(edr.getEndpoint()) - .header(edr.getAuthKey(), edr.getAuthCode()) - .queryParams(queryParams) - .when() - .get() - .then() - .log().ifError() - .statusCode(200) - .body("message", bodyMatcher); - } - - - /** - * Pull data from provider using EDR. - * - * @param edr endpoint data reference - * @param queryParams query parameters - * @param bodyMatcher matcher for response body - */ - public void pullData(DataAddress edr, Map queryParams, Matcher bodyMatcher) { - given() - .baseUri(edr.getStringProperty("endpoint")) - .header("Authorization", edr.getStringProperty("authorization")) - .queryParams(queryParams) - .when() - .get() - .then() - .log().ifError() - .statusCode(200) - .body("message", bodyMatcher); - } - - public URI backendService() { - return backendService; - } - - public URI publicDataPlane() { - return dataPlanePublic; - } - - /** - * Register a data plane using the old data plane control API URL and no transfer types - */ - public void registerDataPlane() { - registerDataPlane(dataPlaneControl + "/transfer", Set.of()); - } - - /** - * Register a data plane using with input transfer type using the data plane signaling API url - */ - public void registerDataPlane(Set transferTypes) { - registerDataPlane(dataPlaneSignaling + "/v1/dataflows", Set.of("HttpData", "HttpProvision", "Kafka"), Set.of("HttpData", "HttpProvision", "HttpProxy", "Kafka"), transferTypes); - } - - /** - * Register a data plane - * - * @param url The data plane url - * @param transferTypes supported transfer types - */ - public void registerDataPlane(String url, Set transferTypes) { - registerDataPlane(url, Set.of("HttpData", "HttpProvision", "Kafka"), Set.of("HttpData", "HttpProvision", "HttpProxy", "Kafka"), transferTypes); - } - - /** - * Register a data plane with the old data plane control API url - * - * @param sources The allowed source types - * @param destinations The allowed destination types - */ - public void registerDataPlane(Set sources, Set destinations) { - registerDataPlane(dataPlaneControl + "/transfer", sources, destinations, Set.of()); - } - - /** - * Register a data plane - * - * @param url The url of the data plane - * @param sources The allowed source types - * @param destinations The allowed destination types - * @param transferTypes The allowed transfer types - */ - public void registerDataPlane(String url, Set sources, Set destinations, Set transferTypes) { - var jsonObject = Json.createObjectBuilder() - .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) - .add(ID, UUID.randomUUID().toString()) - .add(EDC_NAMESPACE + "url", url) - .add(EDC_NAMESPACE + "allowedSourceTypes", createArrayBuilder(sources)) - .add(EDC_NAMESPACE + "allowedDestTypes", createArrayBuilder(destinations)) - .add(EDC_NAMESPACE + "allowedTransferTypes", createArrayBuilder(transferTypes)) - .add(EDC_NAMESPACE + "properties", createObjectBuilder().add("publicApiUrl", dataPlanePublic.toString())) - .build(); - - managementEndpoint.baseRequest() - .contentType(JSON) - .body(jsonObject.toString()) - .when() - .post("/v2/dataplanes") - .then() - .statusCode(200); - } - - public Map controlPlaneConfiguration() { - return new HashMap<>() { - { - put(PARTICIPANT_ID, id); - put("web.http.port", String.valueOf(controlPlaneDefault.getPort())); - put("web.http.path", "/api"); - put("web.http.protocol.port", String.valueOf(protocolEndpoint.getUrl().getPort())); - put("web.http.protocol.path", protocolEndpoint.getUrl().getPath()); - put("web.http.management.port", String.valueOf(managementEndpoint.getUrl().getPort())); - put("web.http.management.path", managementEndpoint.getUrl().getPath()); - put("web.http.control.port", String.valueOf(controlPlaneControl.getPort())); - put("web.http.control.path", controlPlaneControl.getPath()); - put("edc.dsp.callback.address", protocolEndpoint.getUrl().toString()); - put("edc.vault", resourceAbsolutePath(getName() + "-vault.properties")); - put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); - put("edc.keystore.password", "123456"); - put("edc.receiver.http.endpoint", backendService + "/api/consumer/dataReference"); - put("edc.transfer.proxy.token.signer.privatekey.alias", "1"); - put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key"); - put("edc.transfer.proxy.endpoint", dataPlanePublic.toString()); - put("edc.transfer.send.retry.limit", "1"); - put("edc.transfer.send.retry.base-delay.ms", "100"); - put("edc.negotiation.consumer.send.retry.limit", "1"); - put("edc.negotiation.provider.send.retry.limit", "1"); - put("edc.negotiation.consumer.send.retry.base-delay.ms", "100"); - put("edc.negotiation.provider.send.retry.base-delay.ms", "100"); - - put("provisioner.http.entries.default.provisioner.type", "provider"); - put("provisioner.http.entries.default.endpoint", backendService + "/api/provision"); - put("provisioner.http.entries.default.data.address.type", "HttpProvision"); - } - }; - } - - public Map controlPlanePostgresConfiguration() { - var baseConfiguration = controlPlaneConfiguration(); - baseConfiguration.putAll(defaultDatasourceConfiguration(getName())); - return baseConfiguration; - } - - public Map dataPlaneConfiguration() { - return new HashMap<>() { - { - put("web.http.port", String.valueOf(dataPlaneDefault.getPort())); - put("web.http.path", "/api"); - put("web.http.public.port", String.valueOf(dataPlanePublic.getPort())); - put("web.http.public.path", "/public"); - put("web.http.control.port", String.valueOf(dataPlaneControl.getPort())); - put("web.http.control.path", dataPlaneControl.getPath()); - put("web.http.signaling.port", String.valueOf(dataPlaneSignaling.getPort())); - put("web.http.signaling.path", dataPlaneSignaling.getPath()); - put("edc.vault", resourceAbsolutePath(getName() + "-vault.properties")); - put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); - put("edc.keystore.password", "123456"); - put("edc.dataplane.api.public.baseurl", dataPlanePublic + "/v2/"); - put("edc.dataplane.token.validation.endpoint", controlPlaneControl + "/token"); - put("edc.transfer.proxy.token.signer.privatekey.alias", "1"); - put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key"); - put("edc.dataplane.http.sink.partition.size", "1"); - } - }; - } - - public Map dataPlanePostgresConfiguration() { - var baseConfiguration = dataPlaneConfiguration(); - baseConfiguration.putAll(defaultDatasourceConfiguration(getName())); - return baseConfiguration; - } - - @NotNull - private String resourceAbsolutePath(String filename) { - return System.getProperty("user.dir") + separator + "build" + separator + "resources" + separator + "test" + separator + filename; - } - - public static final class Builder extends Participant.Builder { + public static final class Builder extends BaseEndToEndParticipant.Builder { private Builder() { super(new EndToEndTransferParticipant()); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/SignalingParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/SignalingParticipant.java new file mode 100644 index 00000000000..f79ed8efb01 --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/SignalingParticipant.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e.participant; + +import com.fasterxml.jackson.annotation.JsonCreator; +import io.restassured.common.mapper.TypeRef; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.hamcrest.Matcher; + +import java.util.Map; + +import static io.restassured.RestAssured.given; +import static io.restassured.http.ContentType.JSON; + +public class SignalingParticipant extends BaseEndToEndParticipant { + + /** + * Get the EDR from the EDR cache by transfer process id. + * + * @param transferProcessId The transfer process id + * @return The cached {@link DataAddress} + */ + public DataAddress getEdr(String transferProcessId) { + var dataAddressRaw = managementEndpoint.baseRequest() + .contentType(JSON) + .when() + .get("/v1/edrs/{id}/dataaddress", transferProcessId) + .then() + .log().ifError() + .statusCode(200) + .contentType(JSON) + .extract().body().as(new TypeRef>() { + }); + + + var builder = DataAddress.Builder.newInstance(); + dataAddressRaw.forEach(builder::property); + return builder.build(); + + } + + /** + * Pull data from provider using EDR. + * + * @param edr endpoint data reference + * @param queryParams query parameters + * @param bodyMatcher matcher for response body + */ + public void pullData(DataAddress edr, Map queryParams, Matcher bodyMatcher) { + given() + .baseUri(edr.getStringProperty("endpoint")) + .header("Authorization", edr.getStringProperty("authorization")) + .queryParams(queryParams) + .when() + .get() + .then() + .log().ifError() + .statusCode(200) + .body("message", bodyMatcher); + } + + public static class Builder extends BaseEndToEndParticipant.Builder { + + protected Builder() { + super(new SignalingParticipant()); + } + + @JsonCreator + public static Builder newInstance() { + return new Builder(); + } + + @Override + public SignalingParticipant build() { + super.build(); + return participant; + } + + } +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/AbstractSignalingTransfer.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/AbstractSignalingTransfer.java deleted file mode 100644 index 82f3b3ec01e..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/AbstractSignalingTransfer.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.signaling; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpMethod; -import jakarta.json.Json; -import jakarta.json.JsonArrayBuilder; -import jakarta.json.JsonObject; -import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; -import org.eclipse.edc.spi.event.EventEnvelope; -import org.eclipse.edc.test.e2e.participant.EndToEndTransferParticipant; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockserver.integration.ClientAndServer; -import org.mockserver.model.HttpRequest; -import org.mockserver.model.HttpResponse; -import org.mockserver.model.HttpStatusCode; -import org.mockserver.model.MediaType; - -import java.time.Duration; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import static io.restassured.RestAssured.given; -import static jakarta.json.Json.createObjectBuilder; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; -import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; -import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; -import static org.eclipse.edc.test.system.utils.PolicyFixtures.noConstraintPolicy; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.mockserver.integration.ClientAndServer.startClientAndServer; -import static org.mockserver.model.HttpRequest.request; -import static org.mockserver.model.HttpResponse.response; -import static org.mockserver.stop.Stop.stopQuietly; - -public abstract class AbstractSignalingTransfer { - - protected static final EndToEndTransferParticipant CONSUMER = EndToEndTransferParticipant.Builder.newInstance() - .name("consumer") - .id("urn:connector:consumer") - .build(); - protected static final EndToEndTransferParticipant PROVIDER = EndToEndTransferParticipant.Builder.newInstance() - .name("provider") - .id("urn:connector:provider") - .build(); - private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final String CALLBACK_PATH = "hooks"; - private static final int CALLBACK_PORT = getFreePort(); - private static ClientAndServer callbacksEndpoint; - protected final Duration timeout = Duration.ofSeconds(60); - - public static JsonObject createCallback(String url, boolean transactional, Set events) { - return Json.createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "CallbackAddress") - .add(EDC_NAMESPACE + "transactional", transactional) - .add(EDC_NAMESPACE + "uri", url) - .add(EDC_NAMESPACE + "events", events - .stream() - .collect(Json::createArrayBuilder, JsonArrayBuilder::add, JsonArrayBuilder::add) - .build()) - .build(); - } - - @BeforeEach - void beforeEach() { - callbacksEndpoint = startClientAndServer(CALLBACK_PORT); - } - - @AfterEach - void tearDown() { - stopQuietly(callbacksEndpoint); - } - - @Test - void httpPull_dataTransfer() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); - var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); - - var callbacks = Json.createArrayBuilder() - .add(createCallback(callbackUrl(), true, Set.of("transfer.process.started"))) - .build(); - - var request = request().withPath("/" + CALLBACK_PATH) - .withMethod(HttpMethod.POST.name()); - - var events = new ConcurrentHashMap(); - - callbacksEndpoint.when(request).respond(req -> this.cacheEdr(req, events)); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL", callbacks); - - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); - }); - - await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull()); - - var event = events.get(transferProcessId); - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), equalTo(msg))); - - } - - @Test - void httpPushDataTransfer() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); - var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(COMPLETED.name()); - - given() - .baseUri(CONSUMER.backendService().toString()) - .when() - .get("/api/consumer/data") - .then() - .statusCode(anyOf(is(200), is(204))) - .body(is(notNullValue())); - }); - } - - private JsonObject httpDataAddress(String baseUrl) { - return createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "HttpData") - .add(EDC_NAMESPACE + "properties", createObjectBuilder() - .add(EDC_NAMESPACE + "baseUrl", baseUrl) - .build()) - .build(); - } - - private JsonObject syncDataAddress() { - return createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "HttpProxy") - .build(); - } - - @NotNull - private Map httpDataAddressProperties() { - return Map.of( - "name", "transfer-test", - "baseUrl", PROVIDER.backendService() + "/api/provider/data", - "type", "HttpData", - "proxyQueryParams", "true" - ); - } - - private void registerDataPlanes() { - PROVIDER.registerDataPlane(Set.of("HttpData-PUSH", "HttpData-PULL")); - } - - private void createResourcesOnProvider(String assetId, JsonObject contractPolicy, Map dataAddressProperties) { - PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); - var accessPolicyId = PROVIDER.createPolicyDefinition(noConstraintPolicy()); - var contractPolicyId = PROVIDER.createPolicyDefinition(contractPolicy); - PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), accessPolicyId, contractPolicyId); - } - - private HttpResponse cacheEdr(HttpRequest request, Map events) { - - try { - var event = MAPPER.readValue(request.getBody().toString(), new TypeReference>() { - }); - events.put(event.getPayload().getTransferProcessId(), event.getPayload()); - return response() - .withStatusCode(HttpStatusCode.OK_200.code()) - .withHeader(HttpHeaderNames.CONTENT_TYPE.toString(), MediaType.PLAIN_TEXT_UTF_8.toString()) - .withBody("{}"); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - } - - private String callbackUrl() { - return String.format("http://localhost:%d/%s", callbacksEndpoint.getLocalPort(), CALLBACK_PATH); - } - - private JsonObject noPrivateProperty() { - return Json.createObjectBuilder().build(); - } -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedSignalingTransferInMemoryTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedDataPlaneSignalingRuntimes.java similarity index 79% rename from system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedSignalingTransferInMemoryTest.java rename to system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedDataPlaneSignalingRuntimes.java index eba3008aaf3..4f21ca417fd 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedSignalingTransferInMemoryTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedDataPlaneSignalingRuntimes.java @@ -14,7 +14,6 @@ package org.eclipse.edc.test.e2e.signaling; -import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.junit.jupiter.api.extension.RegisterExtension; @@ -22,27 +21,32 @@ import java.util.HashMap; import java.util.Map; -@EndToEndTest -class EmbeddedSignalingTransferInMemoryTest extends AbstractSignalingTransfer { +import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.CONSUMER; +import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.PROVIDER; - static String[] providerPlaneModules = new String[]{ +public interface EmbeddedDataPlaneSignalingRuntimes { + + String[] PROVIDER_MODULES = new String[]{ ":system-tests:e2e-transfer-test:control-plane", ":extensions:control-plane:transfer:transfer-data-plane-signaling", ":system-tests:e2e-transfer-test:data-plane", ":extensions:data-plane:data-plane-public-api-v2" }; - static String[] consumerPlaneModules = new String[]{ + String[] CONSUMER_MODULES = new String[]{ ":system-tests:e2e-transfer-test:control-plane", + ":core:common:edr-store-core", + ":extensions:control-plane:api:management-api:edr-cache-api", + ":extensions:control-plane:edr:edr-store-receiver", ":extensions:control-plane:callback:callback-event-dispatcher", ":extensions:control-plane:callback:callback-http-dispatcher" }; @RegisterExtension - static EdcClassRuntimesExtension runtimes = new EdcClassRuntimesExtension( + EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( new EdcRuntimeExtension( "consumer-control-plane", CONSUMER.controlPlaneConfiguration(), - consumerPlaneModules + CONSUMER_MODULES ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", @@ -56,7 +60,7 @@ class EmbeddedSignalingTransferInMemoryTest extends AbstractSignalingTransfer { new EdcRuntimeExtension( "provider-control-plane", providerConfig(), - providerPlaneModules + PROVIDER_MODULES ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", @@ -69,7 +73,6 @@ class EmbeddedSignalingTransferInMemoryTest extends AbstractSignalingTransfer { ) ); - private static Map providerConfig() { var cfg = PROVIDER.dataPlaneConfiguration(); cfg.putAll(PROVIDER.controlPlaneConfiguration()); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/InMemorySignalingRuntimes.java similarity index 74% rename from system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java rename to system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/InMemorySignalingRuntimes.java index 2377e494ff1..0432ed3edb6 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/InMemorySignalingRuntimes.java @@ -14,41 +14,45 @@ package org.eclipse.edc.test.e2e.signaling; -import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.junit.jupiter.api.extension.RegisterExtension; import java.util.HashMap; -@EndToEndTest -class SignalingTransferInMemoryTest extends AbstractSignalingTransfer { +import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.CONSUMER; +import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.PROVIDER; - static String[] controlPlaneModules = new String[]{ +public interface InMemorySignalingRuntimes { + + String[] CONTROL_PLANE_MODULES = new String[]{ ":system-tests:e2e-transfer-test:control-plane", + ":core:common:edr-store-core", ":extensions:control-plane:transfer:transfer-data-plane-signaling", + ":extensions:control-plane:api:management-api:edr-cache-api", + ":extensions:control-plane:edr:edr-store-receiver", + ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client", ":extensions:control-plane:callback:callback-event-dispatcher", - ":extensions:control-plane:callback:callback-http-dispatcher", - ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client" + ":extensions:control-plane:callback:callback-http-dispatcher" }; - static String[] dataPlanePostgresqlModules = new String[]{ + String[] DATA_PLANE_MODULES = new String[]{ ":system-tests:e2e-transfer-test:data-plane", ":extensions:data-plane:data-plane-public-api-v2" }; - static EdcRuntimeExtension dataPlane = new EdcRuntimeExtension( + EdcRuntimeExtension DATA_PLANE = new EdcRuntimeExtension( "provider-data-plane", PROVIDER.dataPlaneConfiguration(), - dataPlanePostgresqlModules + DATA_PLANE_MODULES ); @RegisterExtension - static EdcClassRuntimesExtension runtimes = new EdcClassRuntimesExtension( + EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( new EdcRuntimeExtension( "consumer-control-plane", CONSUMER.controlPlaneConfiguration(), - controlPlaneModules + CONTROL_PLANE_MODULES ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", @@ -59,11 +63,11 @@ class SignalingTransferInMemoryTest extends AbstractSignalingTransfer { } } ), - dataPlane, + DATA_PLANE, new EdcRuntimeExtension( "provider-control-plane", PROVIDER.controlPlaneConfiguration(), - controlPlaneModules + CONTROL_PLANE_MODULES ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", @@ -75,5 +79,6 @@ class SignalingTransferInMemoryTest extends AbstractSignalingTransfer { } ) ); - + + } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTestBase.java new file mode 100644 index 00000000000..3fe55b5b4cc --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTestBase.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e.signaling; + +import jakarta.json.JsonObject; +import org.eclipse.edc.test.e2e.participant.SignalingParticipant; + +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.eclipse.edc.test.system.utils.PolicyFixtures.noConstraintPolicy; + +public abstract class SignalingEndToEndTestBase { + + protected static final SignalingParticipant CONSUMER = SignalingParticipant.Builder.newInstance() + .name("consumer") + .id("urn:connector:consumer") + .build(); + protected static final SignalingParticipant PROVIDER = SignalingParticipant.Builder.newInstance() + .name("provider") + .id("urn:connector:provider") + .build(); + + protected void createResourcesOnProvider(String assetId, JsonObject contractPolicy, Map dataAddressProperties) { + PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); + var accessPolicyId = PROVIDER.createPolicyDefinition(noConstraintPolicy()); + var contractPolicyId = PROVIDER.createPolicyDefinition(contractPolicy); + PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), accessPolicyId, contractPolicyId); + } + + protected void registerDataPlanes() { + PROVIDER.registerDataPlane(Set.of("HttpData-PUSH", "HttpData-PULL")); + } + +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java new file mode 100644 index 00000000000..ac7b5a684e9 --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e.signaling; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import jakarta.json.Json; +import jakarta.json.JsonArrayBuilder; +import jakarta.json.JsonObject; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.spi.event.EventEnvelope; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.model.HttpStatusCode; +import org.mockserver.model.MediaType; + +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import static io.restassured.RestAssured.given; +import static jakarta.json.Json.createObjectBuilder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.test.system.utils.PolicyFixtures.inForceDatePolicy; +import static org.eclipse.edc.test.system.utils.PolicyFixtures.noConstraintPolicy; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; +import static org.mockserver.stop.Stop.stopQuietly; + + +class SignalingEndToEndTransferTest { + + + abstract static class Tests extends SignalingEndToEndTestBase { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String CALLBACK_PATH = "hooks"; + private static final int CALLBACK_PORT = getFreePort(); + private static ClientAndServer callbacksEndpoint; + protected final Duration timeout = Duration.ofSeconds(60); + + public static JsonObject createCallback(String url, boolean transactional, Set events) { + return Json.createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "CallbackAddress") + .add(EDC_NAMESPACE + "transactional", transactional) + .add(EDC_NAMESPACE + "uri", url) + .add(EDC_NAMESPACE + "events", events + .stream() + .collect(Json::createArrayBuilder, JsonArrayBuilder::add, JsonArrayBuilder::add) + .build()) + .build(); + } + + private static JsonObject contractExpiresInTenSeconds() { + return inForceDatePolicy("gteq", "contractAgreement+0s", "lteq", "contractAgreement+10s"); + } + + @BeforeEach + void beforeEach() { + registerDataPlanes(); + callbacksEndpoint = startClientAndServer(CALLBACK_PORT); + } + + @AfterEach + void tearDown() { + stopQuietly(callbacksEndpoint); + } + + @Test + void httpPull_dataTransfer_withCallbacks() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); + var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); + + var callbacks = Json.createArrayBuilder() + .add(createCallback(callbackUrl(), true, Set.of("transfer.process.started"))) + .build(); + + var request = request().withPath("/" + CALLBACK_PATH) + .withMethod(HttpMethod.POST.name()); + + var events = new ConcurrentHashMap(); + + callbacksEndpoint.when(request).respond(req -> this.cacheEdr(req, events)); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL", callbacks); + + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); + }); + + await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull()); + + var event = events.get(transferProcessId); + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), equalTo(msg))); + + } + + @Test + void httpPull_dataTransfer_withEdrCache() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, contractExpiresInTenSeconds(), httpDataAddressProperties()); + var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); + + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL"); + + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); + }); + + var edr = new AtomicReference(); + + // fetch the EDR from the cache + await().atMost(timeout).untilAsserted(() -> { + var returnedEdr = CONSUMER.getEdr(transferProcessId); + assertThat(returnedEdr).isNotNull(); + edr.set(returnedEdr); + }); + + // Do the transfer + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr.get(), Map.of("message", msg), equalTo(msg))); + + // checks that the EDR is gone once the contract expires + await().atMost(timeout).untilAsserted(() -> { + assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId)); + }); + } + + @Test + void httpPushDataTransfer() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); + var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(COMPLETED.name()); + + given() + .baseUri(CONSUMER.backendService().toString()) + .when() + .get("/api/consumer/data") + .then() + .statusCode(anyOf(is(200), is(204))) + .body(is(notNullValue())); + }); + } + + private JsonObject httpDataAddress(String baseUrl) { + return createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "HttpData") + .add(EDC_NAMESPACE + "properties", createObjectBuilder() + .add(EDC_NAMESPACE + "baseUrl", baseUrl) + .build()) + .build(); + } + + private JsonObject syncDataAddress() { + return createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "HttpProxy") + .build(); + } + + @NotNull + private Map httpDataAddressProperties() { + return Map.of( + "name", "transfer-test", + "baseUrl", PROVIDER.backendService() + "/api/provider/data", + "type", "HttpData", + "proxyQueryParams", "true" + ); + } + + private HttpResponse cacheEdr(HttpRequest request, Map events) { + + try { + var event = MAPPER.readValue(request.getBody().toString(), new TypeReference>() { + }); + events.put(event.getPayload().getTransferProcessId(), event.getPayload()); + return response() + .withStatusCode(HttpStatusCode.OK_200.code()) + .withHeader(HttpHeaderNames.CONTENT_TYPE.toString(), MediaType.PLAIN_TEXT_UTF_8.toString()) + .withBody("{}"); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } + + private String callbackUrl() { + return String.format("http://localhost:%d/%s", callbacksEndpoint.getLocalPort(), CALLBACK_PATH); + } + + private JsonObject noPrivateProperty() { + return Json.createObjectBuilder().build(); + } + } + + @Nested + @EndToEndTest + class InMemory extends Tests implements InMemorySignalingRuntimes { + + } + + @Nested + @EndToEndTest + class EmbeddedDataPlane extends Tests implements EmbeddedDataPlaneSignalingRuntimes { + + } +}