diff --git a/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/CoreServicesExtension.java b/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/CoreServicesExtension.java index 3c98660ae93..fa52b542df6 100644 --- a/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/CoreServicesExtension.java +++ b/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/CoreServicesExtension.java @@ -20,6 +20,7 @@ import org.eclipse.edc.connector.core.event.EventExecutorServiceContainer; import org.eclipse.edc.connector.core.event.EventRouterImpl; import org.eclipse.edc.connector.core.message.RemoteMessageDispatcherRegistryImpl; +import org.eclipse.edc.connector.core.protocol.ProtocolWebhookRegistryImpl; import org.eclipse.edc.connector.core.validator.DataAddressValidatorRegistryImpl; import org.eclipse.edc.connector.core.validator.JsonObjectValidatorRegistryImpl; import org.eclipse.edc.http.client.ControlApiHttpClientImpl; @@ -42,6 +43,7 @@ import org.eclipse.edc.spi.command.CommandHandlerRegistry; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.query.CriterionOperatorRegistry; import org.eclipse.edc.spi.system.Hostname; import org.eclipse.edc.spi.system.ServiceExtension; @@ -61,11 +63,8 @@ public class CoreServicesExtension implements ServiceExtension { @Setting(description = "The name of the claim key used to determine the participant identity", defaultValue = DEFAULT_IDENTITY_CLAIM_KEY) public static final String EDC_AGENT_IDENTITY_KEY = "edc.agent.identity.key"; - - private static final String DEFAULT_EDC_HOSTNAME = "localhost"; - public static final String EDC_HOSTNAME = "edc.hostname"; - + private static final String DEFAULT_EDC_HOSTNAME = "localhost"; @Setting(description = "Connector hostname, which e.g. is used in referer urls", defaultValue = DEFAULT_EDC_HOSTNAME, key = EDC_HOSTNAME, warnOnMissingConfig = true) public static String hostname; @@ -173,6 +172,11 @@ public CriterionOperatorRegistry criterionOperatorRegistry() { public ControlApiHttpClient controlApiHttpClient() { return new ControlApiHttpClientImpl(edcHttpClient, controlClientAuthenticationProvider); } + + @Provider + public ProtocolWebhookRegistry protocolWebhookRegistry() { + return new ProtocolWebhookRegistryImpl(); + } } diff --git a/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/protocol/ProtocolWebhookRegistryImpl.java b/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/protocol/ProtocolWebhookRegistryImpl.java new file mode 100644 index 00000000000..161e6f10901 --- /dev/null +++ b/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/protocol/ProtocolWebhookRegistryImpl.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.connector.core.protocol; + +import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; +import org.jetbrains.annotations.Nullable; + +import java.util.HashMap; +import java.util.Map; + +public class ProtocolWebhookRegistryImpl implements ProtocolWebhookRegistry { + private final Map webhooks = new HashMap<>(); + + @Override + public void registerWebhook(String protocol, ProtocolWebhook webhook) { + webhooks.put(protocol, webhook); + } + + @Override + public @Nullable ProtocolWebhook resolve(String protocol) { + return webhooks.get(protocol); + } +} diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/catalog/CatalogProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/catalog/CatalogProtocolServiceImpl.java index 82a3cd58b63..1185263615b 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/catalog/CatalogProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/catalog/CatalogProtocolServiceImpl.java @@ -55,8 +55,8 @@ public CatalogProtocolServiceImpl(DatasetResolver datasetResolver, public ServiceResult getCatalog(CatalogRequestMessage message, TokenRepresentation tokenRepresentation) { return transactionContext.execute(() -> protocolTokenValidator.verify(tokenRepresentation, RequestCatalogPolicyContext::new, message) .map(agent -> { - try (var datasets = datasetResolver.query(agent, message.getQuerySpec())) { - var dataServices = dataServiceRegistry.getDataServices(); + try (var datasets = datasetResolver.query(agent, message.getQuerySpec(), message.getProtocol())) { + var dataServices = dataServiceRegistry.getDataServices(message.getProtocol()); return Catalog.Builder.newInstance() .dataServices(dataServices) @@ -69,9 +69,9 @@ public ServiceResult getCatalog(CatalogRequestMessage message, TokenRep } @Override - public @NotNull ServiceResult getDataset(String datasetId, TokenRepresentation tokenRepresentation) { + public @NotNull ServiceResult getDataset(String datasetId, TokenRepresentation tokenRepresentation, String protocol) { return transactionContext.execute(() -> protocolTokenValidator.verify(tokenRepresentation, RequestCatalogPolicyContext::new) - .map(agent -> datasetResolver.getById(agent, datasetId)) + .map(agent -> datasetResolver.getById(agent, datasetId, protocol)) .compose(dataset -> { if (dataset == null) { return ServiceResult.notFound(format("Dataset %s does not exist", datasetId)); diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/catalog/CatalogProtocolServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/catalog/CatalogProtocolServiceImplTest.java index a3834ad22cb..bd489b0f114 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/catalog/CatalogProtocolServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/catalog/CatalogProtocolServiceImplTest.java @@ -87,8 +87,8 @@ void shouldReturnCatalogWithConnectorDataServiceAndItsDataset() { var dataService = DataService.Builder.newInstance().build(); when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); - when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService)); - when(datasetResolver.query(any(), any())).thenReturn(Stream.of(createDataset())); + when(dataServiceRegistry.getDataServices(any())).thenReturn(List.of(dataService)); + when(datasetResolver.query(any(), any(), any())).thenReturn(Stream.of(createDataset())); var result = service.getCatalog(message, tokenRepresentation); @@ -97,7 +97,7 @@ void shouldReturnCatalogWithConnectorDataServiceAndItsDataset() { assertThat(catalog.getDatasets()).hasSize(1); assertThat(catalog.getParticipantId()).isEqualTo("participantId"); }); - verify(datasetResolver).query(eq(participantAgent), eq(querySpec)); + verify(datasetResolver).query(eq(participantAgent), eq(querySpec), eq("protocol")); verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class)); } @@ -125,12 +125,12 @@ void shouldReturnDataset() { var dataset = createDataset(); when(protocolTokenValidator.verify(eq(tokenRepresentation), any())).thenReturn(ServiceResult.success(participantAgent)); - when(datasetResolver.getById(any(), any())).thenReturn(dataset); + when(datasetResolver.getById(any(), any(), any())).thenReturn(dataset); - var result = service.getDataset("datasetId", tokenRepresentation); + var result = service.getDataset("datasetId", tokenRepresentation, "protocol"); assertThat(result).isSucceeded().isEqualTo(dataset); - verify(datasetResolver).getById(participantAgent, "datasetId"); + verify(datasetResolver).getById(participantAgent, "datasetId", "protocol"); verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class)); } @@ -140,9 +140,9 @@ void shouldFail_whenDatasetIsNull() { var tokenRepresentation = createTokenRepresentation(); when(protocolTokenValidator.verify(eq(tokenRepresentation), any())).thenReturn(ServiceResult.success(participantAgent)); - when(datasetResolver.getById(any(), any())).thenReturn(null); + when(datasetResolver.getById(any(), any(), any())).thenReturn(null); - var result = service.getDataset("datasetId", tokenRepresentation); + var result = service.getDataset("datasetId", tokenRepresentation, "protocol"); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND); } @@ -153,7 +153,7 @@ void shouldFail_whenTokenValidationFails() { when(protocolTokenValidator.verify(eq(tokenRepresentation), any())).thenReturn(ServiceResult.unauthorized("unauthorized")); - var result = service.getDataset("datasetId", tokenRepresentation); + var result = service.getDataset("datasetId", tokenRepresentation, "protocol"); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(UNAUTHORIZED); } diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java index a574c2b5927..b0a1fa375de 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java @@ -51,11 +51,12 @@ import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.message.RemoteMessageDispatcher; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; @@ -84,6 +85,8 @@ public class TransferProcessEventDispatchTest { public static final Duration TIMEOUT = Duration.ofSeconds(30); + private static final ProtocolWebhookRegistry PROTOCOL_WEBHOOK_REGISTRY = mock(ProtocolWebhookRegistry.class); + @RegisterExtension static final RuntimeExtension RUNTIME = new RuntimePerClassExtension() .setConfiguration(Map.of( @@ -93,13 +96,20 @@ public class TransferProcessEventDispatchTest { .registerServiceMock(TransferWaitStrategy.class, () -> 1) .registerServiceMock(EventExecutorServiceContainer.class, new EventExecutorServiceContainer(newSingleThreadExecutor())) .registerServiceMock(IdentityService.class, mock()) - .registerServiceMock(ProtocolWebhook.class, () -> "http://dummy") + .registerServiceMock(ProtocolWebhookRegistry.class, PROTOCOL_WEBHOOK_REGISTRY) .registerServiceMock(PolicyArchive.class, mock()) .registerServiceMock(ContractNegotiationStore.class, mock()) .registerServiceMock(ParticipantAgentService.class, mock()) .registerServiceMock(DataPlaneClientFactory.class, mock()); + private final EventSubscriber eventSubscriber = mock(); + @BeforeEach + void setup() { + // setup + when(PROTOCOL_WEBHOOK_REGISTRY.resolve(any())).thenReturn(() -> "http://dummy"); + } + @Test void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService service, TransferProcessProtocolService protocolService, diff --git a/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DataServiceRegistryImpl.java b/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DataServiceRegistryImpl.java index e3bd1193fde..e0c4b6c8044 100644 --- a/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DataServiceRegistryImpl.java +++ b/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DataServiceRegistryImpl.java @@ -19,19 +19,21 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class DataServiceRegistryImpl implements DataServiceRegistry { - private final List dataServices = new ArrayList<>(); + private final Map> dataServices = new ConcurrentHashMap<>(); @Override - public void register(DataService dataService) { - dataServices.add(dataService); + public void register(String protocol, DataService dataService) { + dataServices.computeIfAbsent(protocol, k -> new ArrayList<>()).add(dataService); } @Override - public List getDataServices() { - return dataServices; + public List getDataServices(String protocol) { + return dataServices.computeIfAbsent(protocol, k -> new ArrayList<>()); } } diff --git a/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImpl.java b/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImpl.java index b98ba98ae93..17e46cb53df 100644 --- a/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImpl.java +++ b/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImpl.java @@ -63,32 +63,32 @@ public DatasetResolverImpl(ContractDefinitionResolver contractDefinitionResolver @Override @NotNull - public Stream query(ParticipantAgent agent, QuerySpec querySpec) { + public Stream query(ParticipantAgent agent, QuerySpec querySpec, String protocol) { var resolved = contractDefinitionResolver.resolveFor(agent); var contractDefinitions = resolved.contractDefinitions(); if (contractDefinitions.isEmpty()) { return Stream.empty(); } - + var assetsQuery = QuerySpec.Builder.newInstance().offset(0).limit(MAX_VALUE).filter(querySpec.getFilterExpression()).build(); return assetIndex.queryAssets(assetsQuery) - .map(asset -> toDataset(contractDefinitions, asset, resolved.policies())) + .map(asset -> toDataset(contractDefinitions, asset, resolved.policies(), protocol)) .filter(Dataset::hasOffers) .skip(querySpec.getOffset()) .limit(querySpec.getLimit()); } @Override - public Dataset getById(ParticipantAgent agent, String id) { + public Dataset getById(ParticipantAgent agent, String id, String protocol) { var resolved = contractDefinitionResolver.resolveFor(agent); var contractDefinitions = resolved.contractDefinitions(); if (contractDefinitions.isEmpty()) { return null; } - + return Optional.of(id) .map(assetIndex::findById) - .map(asset -> toDataset(contractDefinitions, asset, resolved.policies())) + .map(asset -> toDataset(contractDefinitions, asset, resolved.policies(), protocol)) .filter(Dataset::hasOffers) .orElse(null); } @@ -106,9 +106,9 @@ public Dataset getById(ParticipantAgent agent, String id) { .build()); } - private Dataset toDataset(List contractDefinitions, Asset asset, Map policies) { + private Dataset toDataset(List contractDefinitions, Asset asset, Map policies, String protocol) { - var distributions = distributionResolver.getDistributions(asset); + var distributions = distributionResolver.getDistributions(protocol, asset); var datasetBuilder = buildDataset(asset) .id(asset.getId()) .distributions(distributions) diff --git a/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DefaultDistributionResolver.java b/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DefaultDistributionResolver.java index d2f41d6d6ec..e2510162047 100644 --- a/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DefaultDistributionResolver.java +++ b/core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DefaultDistributionResolver.java @@ -36,7 +36,7 @@ public DefaultDistributionResolver(DataServiceRegistry dataServiceRegistry, Data } @Override - public List getDistributions(Asset asset) { + public List getDistributions(String protocol, Asset asset) { if (asset.isCatalog()) { return List.of(Distribution.Builder.newInstance() .format(asset.getDataAddress().getType()) @@ -45,12 +45,12 @@ public List getDistributions(Asset asset) { .build()) .build()); } - return dataFlowManager.transferTypesFor(asset).stream().map(this::createDistribution).toList(); + return dataFlowManager.transferTypesFor(asset).stream().map((format) -> createDistribution(protocol, format)).toList(); } - private Distribution createDistribution(String format) { + private Distribution createDistribution(String protocol, String format) { var builder = Distribution.Builder.newInstance().format(format); - dataServiceRegistry.getDataServices().forEach(builder::dataService); + dataServiceRegistry.getDataServices(protocol).forEach(builder::dataService); return builder.build(); } } diff --git a/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DataServiceRegistryImplTest.java b/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DataServiceRegistryImplTest.java index 61f0b91d76e..9a1e6a83ff1 100644 --- a/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DataServiceRegistryImplTest.java +++ b/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DataServiceRegistryImplTest.java @@ -27,10 +27,24 @@ class DataServiceRegistryImplTest { void shouldReturnRegisteredDataService() { var dataService = DataService.Builder.newInstance().build(); - registry.register(dataService); - var dataServices = registry.getDataServices(); + var protocol = "protocol"; + + registry.register(protocol, dataService); + var dataServices = registry.getDataServices(protocol); assertThat(dataServices).containsExactly(dataService); } + @Test + void shouldReturnEmptyDataServices() { + var dataService = DataService.Builder.newInstance().build(); + + var protocol = "protocol"; + + registry.register(protocol, dataService); + var dataServices = registry.getDataServices("unknownProtocol"); + + assertThat(dataServices).isEmpty(); + } + } diff --git a/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImplIntegrationTest.java b/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImplIntegrationTest.java index a4fd93d08fb..03fcbcda12f 100644 --- a/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImplIntegrationTest.java +++ b/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImplIntegrationTest.java @@ -102,7 +102,7 @@ void shouldLimitResult_withHeterogeneousChunks() { var to = 50; var querySpec = QuerySpec.Builder.newInstance().range(new Range(from, to)).build(); - var datasets = resolver.query(createAgent(), querySpec); + var datasets = resolver.query(createAgent(), querySpec, "protocol"); assertThat(datasets).hasSize(to - from); } @@ -126,7 +126,7 @@ void should_return_offers_subset_when_across_multiple_contract_definitions(int f when(contractDefinitionResolver.resolveFor(isA(ParticipantAgent.class))).thenReturn(new ResolvedContractDefinitions(List.of(contractDefinition1, contractDefinition2))); var querySpec = QuerySpec.Builder.newInstance().range(new Range(from, to)).build(); - var datasets = resolver.query(createAgent(), querySpec); + var datasets = resolver.query(createAgent(), querySpec, "protocol"); assertThat(datasets).hasSize(min(requestedRange, maximumRange)); } @@ -149,7 +149,7 @@ void shouldLimitResult_insufficientAssets() { var querySpec = QuerySpec.Builder.newInstance().range(new Range(from, to)).build(); // 4 definitions, 10 assets each = 40 offers total -> offset 20 ==> result = 20 - var dataset = resolver.query(createAgent(), querySpec); + var dataset = resolver.query(createAgent(), querySpec, "protocol"); assertThat(dataset).hasSize(4); } @@ -163,7 +163,7 @@ void shouldLimitResult_pageOffsetLargerThanNumAssets() { var to = 50; var querySpec = QuerySpec.Builder.newInstance().range(new Range(from, to)).build(); // 2 definitions, 10 assets each = 20 offers total -> offset of 25 is outside - var datasets = resolver.query(createAgent(), querySpec); + var datasets = resolver.query(createAgent(), querySpec, "protocol"); assertThat(datasets).isEmpty(); } diff --git a/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImplTest.java b/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImplTest.java index eb84332b321..7dfc133e80a 100644 --- a/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImplTest.java +++ b/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImplTest.java @@ -80,6 +80,30 @@ void setUp() { CriterionOperatorRegistryImpl.ofDefaults()); } + private ContractDefinition.Builder contractDefinitionBuilder(String id) { + return ContractDefinition.Builder.newInstance() + .id(id) + .accessPolicyId("access") + .contractPolicyId("contract"); + } + + private Asset.Builder createAsset(String id) { + return Asset.Builder.newInstance().id(id).name("test asset " + id); + } + + private ParticipantAgent createParticipantAgent() { + return new ParticipantAgent(emptyMap(), emptyMap()); + } + + private DataService createDataService() { + return DataService.Builder.newInstance().build(); + } + + @NotNull + private ThrowingExtractor getId() { + return it -> it.getProperty(Asset.PROPERTY_ID); + } + @Nested class Query { @Test @@ -91,9 +115,9 @@ void shouldReturnOneDatasetPerAsset() { when(definitionResolver.resolveFor(any())).thenReturn(new ResolvedContractDefinitions(List.of(contractDefinition))); when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(Stream.of(createAsset("assetId").property("key", "value").build())); when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); - when(distributionResolver.getDistributions(isA(Asset.class))).thenReturn(List.of(distribution)); + when(distributionResolver.getDistributions(any(), isA(Asset.class))).thenReturn(List.of(distribution)); - var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none()); + var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none(), "protocol"); assertThat(datasets).isNotNull().hasSize(1).first().satisfies(dataset -> { assertThat(dataset.getId()).isEqualTo("assetId"); @@ -111,7 +135,7 @@ void shouldReturnOneDatasetPerAsset() { void shouldNotQueryAssets_whenNoValidContractDefinition() { when(definitionResolver.resolveFor(any())).thenReturn(new ResolvedContractDefinitions(emptyList())); - var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none()); + var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none(), "protocol"); assertThat(datasets).isNotNull().isEmpty(); verify(assetIndex, never()).queryAssets(any()); @@ -124,7 +148,7 @@ void shouldReturnNoDataset_whenPolicyNotFound() { when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(Stream.of(createAsset("id").build())); when(policyStore.findById("contractPolicyId")).thenReturn(null); - var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none()); + var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none(), "protocol"); assertThat(datasets).isNotNull().isEmpty(); } @@ -141,7 +165,7 @@ void shouldReturnOneDataset_whenMultipleDefinitionsOnSameAsset() { when(policyStore.findById("policy1")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy1).build()); when(policyStore.findById("policy2")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy2).build()); - var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none()); + var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none(), "protocol"); assertThat(datasets).hasSize(1).first().satisfies(dataset -> { assertThat(dataset.getId()).isEqualTo("assetId"); @@ -170,7 +194,7 @@ void shouldFilterAssetsByPassedCriteria() { var additionalCriterion = new Criterion(EDC_NAMESPACE + "key", "=", "value"); var querySpec = QuerySpec.Builder.newInstance().filter(additionalCriterion).build(); - datasetResolver.query(createParticipantAgent(), querySpec); + datasetResolver.query(createParticipantAgent(), querySpec, "protocol"); verify(assetIndex).queryAssets(and( isA(QuerySpec.class), @@ -188,7 +212,7 @@ void shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_contained() { when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); var querySpec = QuerySpec.Builder.newInstance().range(new Range(2, 5)).build(); - var datasets = datasetResolver.query(createParticipantAgent(), querySpec); + var datasets = datasetResolver.query(createParticipantAgent(), querySpec, "protocol"); assertThat(datasets).hasSize(3).map(getId()).containsExactly("2", "3", "4"); } @@ -203,7 +227,7 @@ void shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_overflowing() { when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); var querySpec = QuerySpec.Builder.newInstance().range(new Range(7, 15)).build(); - var datasets = datasetResolver.query(createParticipantAgent(), querySpec); + var datasets = datasetResolver.query(createParticipantAgent(), querySpec, "protocol"); assertThat(datasets).hasSize(3).map(getId()).containsExactly("7", "8", "9"); } @@ -218,7 +242,7 @@ void shouldLimitDataset_whenMultipleDefinitionAndMultipleAssets_across() { when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); var querySpec = QuerySpec.Builder.newInstance().range(new Range(6, 14)).build(); - var datasets = datasetResolver.query(createParticipantAgent(), querySpec); + var datasets = datasetResolver.query(createParticipantAgent(), querySpec, "protocol"); assertThat(datasets).hasSize(8).map(getId()).containsExactly("6", "7", "8", "9", "10", "11", "12", "13"); } @@ -233,7 +257,7 @@ void shouldLimitDataset_whenMultipleDefinitionsWithSameAssets() { when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); var querySpec = QuerySpec.Builder.newInstance().range(new Range(6, 8)).build(); - var datasets = datasetResolver.query(createParticipantAgent(), querySpec); + var datasets = datasetResolver.query(createParticipantAgent(), querySpec, "protocol"); assertThat(datasets).hasSize(2) .allSatisfy(dataset -> assertThat(dataset.getOffers()).hasSize(2)) @@ -256,9 +280,9 @@ void shouldReturnCatalogWithinCatalog_whenAssetIsCatalogAsset() { .dataAddress(DataAddress.Builder.newInstance().type(HttpDataAddressSchema.HTTP_DATA_TYPE).build()) .build())); when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); - when(distributionResolver.getDistributions(isA(Asset.class))).thenReturn(List.of(distribution)); + when(distributionResolver.getDistributions(any(), isA(Asset.class))).thenReturn(List.of(distribution)); - var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none()); + var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none(), "protocol"); assertThat(datasets).isNotNull().hasSize(1).first().satisfies(dataset -> { assertThat(dataset).isInstanceOf(Catalog.class); @@ -279,9 +303,9 @@ void shouldNotFetchContractPolicy_whenIsSameAsAccessPolicy() { var cachedPolicies = new HashMap<>(Map.of("samePolicy", Policy.Builder.newInstance().build())); when(definitionResolver.resolveFor(any())).thenReturn(new ResolvedContractDefinitions(List.of(contractDefinition), cachedPolicies)); when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(Stream.of(createAsset("assetId").property("key", "value").build())); - when(distributionResolver.getDistributions(isA(Asset.class))).thenReturn(List.of(distribution)); + when(distributionResolver.getDistributions(any(), isA(Asset.class))).thenReturn(List.of(distribution)); - var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none()); + var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none(), "protocol"); assertThat(datasets).hasSize(1); verify(policyStore, never()).findById(any()); @@ -303,7 +327,7 @@ void shouldReturnDataset() { when(policyStore.findById("policy2")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy2).build()); var participantAgent = createParticipantAgent(); - var dataset = datasetResolver.getById(participantAgent, "datasetId"); + var dataset = datasetResolver.getById(participantAgent, "datasetId", "protocol"); assertThat(dataset).isNotNull(); assertThat(dataset.getId()).isEqualTo("datasetId"); @@ -327,7 +351,7 @@ void shouldReturnNull_whenAssetNotFound() { when(assetIndex.findById(any())).thenReturn(null); var participantAgent = createParticipantAgent(); - var dataset = datasetResolver.getById(participantAgent, "datasetId"); + var dataset = datasetResolver.getById(participantAgent, "datasetId", "protocol"); assertThat(dataset).isNull(); } @@ -338,7 +362,7 @@ void shouldReturnNull_whenNoValidContractDefinition() { when(definitionResolver.resolveFor(any())).thenReturn(new ResolvedContractDefinitions(emptyList())); - var dataset = datasetResolver.getById(participantAgent, "datasetId"); + var dataset = datasetResolver.getById(participantAgent, "datasetId", "protocol"); assertThat(dataset).isNull(); verify(assetIndex, never()).findById(any()); @@ -359,34 +383,10 @@ void shouldReturnNull_whenNoValidContractDefinitionForAsset() { when(definitionResolver.resolveFor(any())).thenReturn(new ResolvedContractDefinitions(List.of(contractDefinition))); when(assetIndex.findById(any())).thenReturn(createAsset(assetId).build()); - var dataset = datasetResolver.getById(participantAgent, assetId); + var dataset = datasetResolver.getById(participantAgent, assetId, "protocol"); assertThat(dataset).isNull(); } } - private ContractDefinition.Builder contractDefinitionBuilder(String id) { - return ContractDefinition.Builder.newInstance() - .id(id) - .accessPolicyId("access") - .contractPolicyId("contract"); - } - - private Asset.Builder createAsset(String id) { - return Asset.Builder.newInstance().id(id).name("test asset " + id); - } - - private ParticipantAgent createParticipantAgent() { - return new ParticipantAgent(emptyMap(), emptyMap()); - } - - private DataService createDataService() { - return DataService.Builder.newInstance().build(); - } - - @NotNull - private ThrowingExtractor getId() { - return it -> it.getProperty(Asset.PROPERTY_ID); - } - } diff --git a/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DefaultDistributionResolverTest.java b/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DefaultDistributionResolverTest.java index 8b39a601e99..ed8d7615021 100644 --- a/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DefaultDistributionResolverTest.java +++ b/core/control-plane/control-plane-catalog/src/test/java/org/eclipse/edc/connector/controlplane/catalog/DefaultDistributionResolverTest.java @@ -40,13 +40,13 @@ class DefaultDistributionResolverTest { @Test void shouldReturnDistributionsForEveryTransferType() { - when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService)); + when(dataServiceRegistry.getDataServices(any())).thenReturn(List.of(dataService)); when(dataFlowManager.transferTypesFor(any())).thenReturn(Set.of("type1", "type2")); var dataAddress = DataAddress.Builder.newInstance().type("any").build(); var asset = Asset.Builder.newInstance().dataAddress(dataAddress).build(); - var distributions = resolver.getDistributions(asset); + var distributions = resolver.getDistributions(any(), asset); assertThat(distributions).hasSize(2) .anySatisfy(distribution -> { @@ -61,7 +61,7 @@ void shouldReturnDistributionsForEveryTransferType() { @Test void shouldReturnDistribution_whenAssetIsCatalog() { - when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService)); + when(dataServiceRegistry.getDataServices(any())).thenReturn(List.of(dataService)); when(dataFlowManager.transferTypesFor(any())).thenReturn(Set.of("type1", "type2")); var dataAddress = DataAddress.Builder.newInstance() @@ -74,7 +74,7 @@ void shouldReturnDistribution_whenAssetIsCatalog() { .description("test description") .build(); - var distributions = resolver.getDistributions(asset); + var distributions = resolver.getDistributions(any(), asset); assertThat(distributions).hasSize(1) .anySatisfy(distribution -> { diff --git a/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/ContractCoreExtension.java b/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/ContractCoreExtension.java index 6b92be2a264..22cf49000fe 100644 --- a/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/ContractCoreExtension.java +++ b/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/ContractCoreExtension.java @@ -46,7 +46,7 @@ import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.monitor.Monitor; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.retry.ExponentialWaitStrategy; import org.eclipse.edc.spi.system.ExecutorInstrumentation; import org.eclipse.edc.spi.system.ServiceExtension; @@ -136,7 +136,7 @@ public class ContractCoreExtension implements ServiceExtension { private RuleBindingRegistry ruleBindingRegistry; @Inject - private ProtocolWebhook protocolWebhook; + private ProtocolWebhookRegistry protocolWebhookRegistry; @Inject private ContractNegotiationObservable observable; @@ -209,7 +209,7 @@ private void registerServices(ServiceExtensionContext context) { .policyStore(policyStore) .batchSize(consumerStateMachineBatchSize) .entityRetryProcessConfiguration(consumerEntityRetryProcessConfiguration()) - .protocolWebhook(protocolWebhook) + .protocolWebhookRegistry(protocolWebhookRegistry) .pendingGuard(pendingGuard) .build(); @@ -226,7 +226,7 @@ private void registerServices(ServiceExtensionContext context) { .policyStore(policyStore) .batchSize(providerStateMachineBatchSize) .entityRetryProcessConfiguration(providerEntityRetryProcessConfiguration()) - .protocolWebhook(protocolWebhook) + .protocolWebhookRegistry(protocolWebhookRegistry) .pendingGuard(pendingGuard) .build(); diff --git a/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/AbstractContractNegotiationManager.java b/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/AbstractContractNegotiationManager.java index 125766f9dac..06b56a191d6 100644 --- a/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/AbstractContractNegotiationManager.java +++ b/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/AbstractContractNegotiationManager.java @@ -25,7 +25,7 @@ import org.eclipse.edc.connector.controlplane.contract.spi.types.protocol.ContractNegotiationAck; import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.types.domain.message.ProcessRemoteMessage; import org.eclipse.edc.statemachine.AbstractStateEntityManager; @@ -47,7 +47,7 @@ public abstract class AbstractContractNegotiationManager extends AbstractStateEn protected RemoteMessageDispatcherRegistry dispatcherRegistry; protected ContractNegotiationObservable observable; protected PolicyDefinitionStore policyStore; - protected ProtocolWebhook protocolWebhook; + protected ProtocolWebhookRegistry protocolWebhookRegistry; protected ContractNegotiationPendingGuard pendingGuard = it -> false; abstract ContractNegotiation.Type type(); @@ -61,12 +61,6 @@ protected Processor processNegotiationsInState(ContractNegotiationStates state, .build(); } - private boolean setPending(ContractNegotiation contractNegotiation) { - contractNegotiation.setPending(true); - update(contractNegotiation); - return true; - } - /** * Processes {@link ContractNegotiation} in state TERMINATING. Tries to send a contract termination to the counter-party. * If this succeeds, the ContractNegotiation is transitioned to state TERMINATED. Else, it is transitioned @@ -89,7 +83,7 @@ protected boolean processTerminating(ContractNegotiation negotiation) { } protected AsyncStatusResultRetryProcess dispatch(ProcessRemoteMessage.Builder messageBuilder, - ContractNegotiation negotiation, Class responseType) { + ContractNegotiation negotiation, Class responseType) { messageBuilder.counterPartyAddress(negotiation.getCounterPartyAddress()) .counterPartyId(negotiation.getCounterPartyId()) .protocol(negotiation.getProtocol()) @@ -208,6 +202,12 @@ protected void transitionToTerminated(ContractNegotiation negotiation) { observable.invokeForEach(l -> l.terminated(negotiation)); } + private boolean setPending(ContractNegotiation contractNegotiation) { + contractNegotiation.setPending(true); + update(contractNegotiation); + return true; + } + public static class Builder extends AbstractStateEntityManager.Builder> { @@ -220,6 +220,17 @@ public Builder self() { return this; } + @Override + public T build() { + super.build(); + Objects.requireNonNull(manager.participantId, "participantId"); + Objects.requireNonNull(manager.dispatcherRegistry, "dispatcherRegistry"); + Objects.requireNonNull(manager.observable, "observable"); + + Objects.requireNonNull(manager.policyStore, "policyStore"); + return manager; + } + public Builder participantId(String id) { manager.participantId = id; return this; @@ -240,8 +251,8 @@ public Builder policyStore(PolicyDefinitionStore policyStore) { return this; } - public Builder protocolWebhook(ProtocolWebhook protocolWebhook) { - manager.protocolWebhook = protocolWebhook; + public Builder protocolWebhookRegistry(ProtocolWebhookRegistry protocolWebhookRegistry) { + manager.protocolWebhookRegistry = protocolWebhookRegistry; return this; } @@ -249,17 +260,6 @@ public Builder pendingGuard(ContractNegotiationPendingGuard pendingGuard) { manager.pendingGuard = pendingGuard; return this; } - - @Override - public T build() { - super.build(); - Objects.requireNonNull(manager.participantId, "participantId"); - Objects.requireNonNull(manager.dispatcherRegistry, "dispatcherRegistry"); - Objects.requireNonNull(manager.observable, "observable"); - - Objects.requireNonNull(manager.policyStore, "policyStore"); - return manager; - } } } diff --git a/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ConsumerContractNegotiationManagerImpl.java b/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ConsumerContractNegotiationManagerImpl.java index 08e465b77b4..57d6cbe63fa 100644 --- a/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ConsumerContractNegotiationManagerImpl.java +++ b/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ConsumerContractNegotiationManagerImpl.java @@ -49,17 +49,6 @@ public class ConsumerContractNegotiationManagerImpl extends AbstractContractNego private ConsumerContractNegotiationManagerImpl() { } - @Override - protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) { - return builder - .processor(processNegotiationsInState(INITIAL, this::processInitial)) - .processor(processNegotiationsInState(REQUESTING, this::processRequesting)) - .processor(processNegotiationsInState(ACCEPTING, this::processAccepting)) - .processor(processNegotiationsInState(AGREED, this::processAgreed)) - .processor(processNegotiationsInState(VERIFYING, this::processVerifying)) - .processor(processNegotiationsInState(TERMINATING, this::processTerminating)); - } - /** * Initiates a new {@link ContractNegotiation}. The ContractNegotiation is created and persisted, which moves it to * state REQUESTING. @@ -92,6 +81,17 @@ ContractNegotiation.Type type() { return CONSUMER; } + @Override + protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) { + return builder + .processor(processNegotiationsInState(INITIAL, this::processInitial)) + .processor(processNegotiationsInState(REQUESTING, this::processRequesting)) + .processor(processNegotiationsInState(ACCEPTING, this::processAccepting)) + .processor(processNegotiationsInState(AGREED, this::processAgreed)) + .processor(processNegotiationsInState(VERIFYING, this::processVerifying)) + .processor(processNegotiationsInState(TERMINATING, this::processTerminating)); + } + /** * Processes {@link ContractNegotiation} in state INITIAL. Transition ContractNegotiation to REQUESTING. * @@ -112,17 +112,26 @@ private boolean processInitial(ContractNegotiation negotiation) { */ @WithSpan private boolean processRequesting(ContractNegotiation negotiation) { - var messageBuilder = ContractRequestMessage.Builder.newInstance() - .contractOffer(negotiation.getLastContractOffer()) - .callbackAddress(protocolWebhook.url()) - .type(ContractRequestMessage.Type.INITIAL); - - return dispatch(messageBuilder, negotiation, ContractNegotiationAck.class) - .onSuccessResult(this::transitionToRequested) - .onFailure((n, throwable) -> transitionToRequesting(n)) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send request to provider: %s", throwable.getMessage()))) - .execute("[Consumer] send request"); + + var callbackAddress = protocolWebhookRegistry.resolve(negotiation.getProtocol()); + + if (callbackAddress != null) { + var messageBuilder = ContractRequestMessage.Builder.newInstance() + .contractOffer(negotiation.getLastContractOffer()) + .callbackAddress(callbackAddress.url()) + .type(ContractRequestMessage.Type.INITIAL); + + return dispatch(messageBuilder, negotiation, ContractNegotiationAck.class) + .onSuccessResult(this::transitionToRequested) + .onFailure((n, throwable) -> transitionToRequesting(n)) + .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) + .onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send request to provider: %s", throwable.getMessage()))) + .execute("[Consumer] send request"); + } else { + transitionToTerminated(negotiation, "No callback address found for protocol: %s".formatted(negotiation.getProtocol())); + return true; + } + } /** diff --git a/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ProviderContractNegotiationManagerImpl.java b/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ProviderContractNegotiationManagerImpl.java index e7839138b8f..00375faf58c 100644 --- a/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ProviderContractNegotiationManagerImpl.java +++ b/core/control-plane/control-plane-contract/src/main/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ProviderContractNegotiationManagerImpl.java @@ -75,16 +75,24 @@ protected ContractNegotiation.Type type() { */ @WithSpan private boolean processOffering(ContractNegotiation negotiation) { - var messageBuilder = ContractOfferMessage.Builder.newInstance() - .contractOffer(negotiation.getLastContractOffer()) - .callbackAddress(protocolWebhook.url()); - return dispatch(messageBuilder, negotiation, ContractNegotiationAck.class) - .onSuccessResult(this::transitionToOffered) - .onFailure((n, throwable) -> transitionToOffering(n)) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send offer to consumer: %s", throwable.getMessage()))) - .execute("[Provider] send offer"); + var callbackAddress = protocolWebhookRegistry.resolve(negotiation.getProtocol()); + + if (callbackAddress != null) { + var messageBuilder = ContractOfferMessage.Builder.newInstance() + .contractOffer(negotiation.getLastContractOffer()) + .callbackAddress(callbackAddress.url()); + + return dispatch(messageBuilder, negotiation, ContractNegotiationAck.class) + .onSuccessResult(this::transitionToOffered) + .onFailure((n, throwable) -> transitionToOffering(n)) + .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) + .onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send offer to consumer: %s", throwable.getMessage()))) + .execute("[Provider] send offer"); + } else { + transitionToTerminated(negotiation, "No callback address found for protocol: %s".formatted(negotiation.getProtocol())); + return true; + } } /** diff --git a/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ConsumerContractNegotiationManagerImplTest.java b/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ConsumerContractNegotiationManagerImplTest.java index 4da57db523a..fd964518a99 100644 --- a/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ConsumerContractNegotiationManagerImplTest.java +++ b/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ConsumerContractNegotiationManagerImplTest.java @@ -33,7 +33,7 @@ import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.monitor.Monitor; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.retry.ExponentialWaitStrategy; @@ -93,14 +93,13 @@ class ConsumerContractNegotiationManagerImplTest { private final RemoteMessageDispatcherRegistry dispatcherRegistry = mock(); private final PolicyDefinitionStore policyStore = mock(); private final ContractNegotiationListener listener = mock(); - private final ProtocolWebhook protocolWebhook = mock(); + private final ProtocolWebhookRegistry protocolWebhookRegistry = mock(); private final String protocolWebhookUrl = "http://protocol.webhook/url"; private final ContractNegotiationPendingGuard pendingGuard = mock(); private ConsumerContractNegotiationManagerImpl manager; @BeforeEach void setUp() { - when(protocolWebhook.url()).thenReturn(protocolWebhookUrl); var observable = new ContractNegotiationObservableImpl(); observable.registerListener(listener); @@ -113,7 +112,7 @@ void setUp() { .store(store) .policyStore(policyStore) .entityRetryProcessConfiguration(new EntityRetryProcessConfiguration(RETRY_LIMIT, () -> new ExponentialWaitStrategy(0L))) - .protocolWebhook(protocolWebhook) + .protocolWebhookRegistry(protocolWebhookRegistry) .pendingGuard(pendingGuard) .build(); } @@ -166,7 +165,8 @@ void requesting_shouldSendOfferAndTransitionRequested() { var ack = ContractNegotiationAck.Builder.newInstance().providerPid("providerPid").build(); when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); when(store.findById(negotiation.getId())).thenReturn(negotiation); - when(protocolWebhook.url()).thenReturn(protocolWebhookUrl); + when(protocolWebhookRegistry.resolve(any())).thenReturn(() -> protocolWebhookUrl); + manager.start(); @@ -188,7 +188,7 @@ void requesting_shouldSendMessageWithId_whenCorrelationIdIsNull_toSupportOldProt var ack = ContractNegotiationAck.Builder.newInstance().providerPid("providerPid").build(); when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); when(store.findById(negotiation.getId())).thenReturn(negotiation); - when(protocolWebhook.url()).thenReturn(protocolWebhookUrl); + when(protocolWebhookRegistry.resolve(any())).thenReturn(() -> protocolWebhookUrl); manager.start(); @@ -207,6 +207,25 @@ void requesting_shouldSendMessageWithId_whenCorrelationIdIsNull_toSupportOldProt }); } + @Test + void requesting_shouldTransitionToTerminated_whenProtocolNotResolved() { + var negotiation = contractNegotiationBuilder().correlationId("correlationId").state(REQUESTING.code()).contractOffer(contractOffer()).build(); + when(store.nextNotLeased(anyInt(), stateIs(REQUESTING.code()))).thenReturn(List.of(negotiation)).thenReturn(emptyList()); + var ack = ContractNegotiationAck.Builder.newInstance().providerPid("providerPid").build(); + when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); + when(store.findById(negotiation.getId())).thenReturn(negotiation); + when(protocolWebhookRegistry.resolve(any())).thenReturn(null); + + + manager.start(); + + await().untilAsserted(() -> { + verify(store).save(argThat(p -> p.getState() == TERMINATED.code())); + verifyNoInteractions(dispatcherRegistry); + verify(listener).terminated(any()); + }); + } + @Test void accepting_shouldSendAcceptedMessageAndTransitionToApproved() { var negotiation = contractNegotiationBuilder().state(ACCEPTING.code()).contractOffer(contractOffer()).build(); @@ -298,6 +317,7 @@ void dispatchException(ContractNegotiationStates starting, ContractNegotiationSt when(store.nextNotLeased(anyInt(), stateIs(starting.code()))).thenReturn(List.of(negotiation)).thenReturn(emptyList()); when(dispatcherRegistry.dispatch(any(), any())).thenReturn(result); when(store.findById(negotiation.getId())).thenReturn(negotiation); + when(protocolWebhookRegistry.resolve(negotiation.getProtocol())).thenReturn(() -> protocolWebhookUrl); manager.start(); diff --git a/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ContractNegotiationIntegrationTest.java b/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ContractNegotiationIntegrationTest.java index 9a9726ad167..6d5c9150069 100644 --- a/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ContractNegotiationIntegrationTest.java +++ b/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ContractNegotiationIntegrationTest.java @@ -48,6 +48,7 @@ import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.monitor.ConsoleMonitor; import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.query.CriterionOperatorRegistry; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; @@ -118,12 +119,12 @@ class ContractNegotiationIntegrationTest { private final RemoteMessageDispatcherRegistry consumerDispatcherRegistry = mock(); private final ProtocolTokenValidator protocolTokenValidator = mock(); private final ProtocolWebhook protocolWebhook = () -> "http://dummy"; + private final ProtocolWebhookRegistry protocolWebhookRegistry = mock(); + private final AtomicReference providerNegotiationId = new AtomicReference<>(null); + private final NoopTransactionContext transactionContext = new NoopTransactionContext(); protected ParticipantAgent participantAgent = new ParticipantAgent(Collections.emptyMap(), Collections.emptyMap()); protected TokenRepresentation tokenRepresentation = TokenRepresentation.Builder.newInstance().build(); private String consumerNegotiationId; - private final AtomicReference providerNegotiationId = new AtomicReference<>(null); - private final NoopTransactionContext transactionContext = new NoopTransactionContext(); - private ProviderContractNegotiationManagerImpl providerManager; private ConsumerContractNegotiationManagerImpl consumerManager; private ContractNegotiationProtocolService consumerService; @@ -132,6 +133,7 @@ class ContractNegotiationIntegrationTest { @BeforeEach void init() { var monitor = new ConsoleMonitor(); + when(protocolWebhookRegistry.resolve(any())).thenReturn(protocolWebhook); providerManager = ProviderContractNegotiationManagerImpl.Builder.newInstance() .participantId(PROVIDER_ID) @@ -141,7 +143,7 @@ void init() { .observable(mock()) .store(providerStore) .policyStore(mock()) - .protocolWebhook(protocolWebhook) + .protocolWebhookRegistry(protocolWebhookRegistry) .build(); consumerManager = ConsumerContractNegotiationManagerImpl.Builder.newInstance() @@ -151,7 +153,7 @@ void init() { .observable(mock()) .store(consumerStore) .policyStore(mock()) - .protocolWebhook(protocolWebhook) + .protocolWebhookRegistry(protocolWebhookRegistry) .build(); when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), any())).thenReturn(ServiceResult.success(participantAgent)); diff --git a/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ProviderContractNegotiationManagerImplTest.java b/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ProviderContractNegotiationManagerImplTest.java index 5f432d5285d..3880ebbd3e6 100644 --- a/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ProviderContractNegotiationManagerImplTest.java +++ b/core/control-plane/control-plane-contract/src/test/java/org/eclipse/edc/connector/controlplane/contract/negotiation/ProviderContractNegotiationManagerImplTest.java @@ -33,7 +33,7 @@ import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.retry.ExponentialWaitStrategy; @@ -95,7 +95,7 @@ class ProviderContractNegotiationManagerImplTest { private final PolicyDefinitionStore policyStore = mock(); private final ContractNegotiationListener listener = mock(); private final ContractNegotiationPendingGuard pendingGuard = mock(); - private final ProtocolWebhook protocolWebhook = mock(); + private final ProtocolWebhookRegistry protocolWebhookRegistry = mock(); private ProviderContractNegotiationManagerImpl manager; @BeforeEach @@ -111,7 +111,7 @@ void setUp() { .policyStore(policyStore) .entityRetryProcessConfiguration(new EntityRetryProcessConfiguration(RETRY_LIMIT, () -> new ExponentialWaitStrategy(0L))) .pendingGuard(pendingGuard) - .protocolWebhook(protocolWebhook) + .protocolWebhookRegistry(protocolWebhookRegistry) .build(); } @@ -122,7 +122,7 @@ void offering_shouldSendOfferAndTransitionToOffered() { var ack = ContractNegotiationAck.Builder.newInstance().consumerPid("consumerPid").build(); when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); when(store.findById(negotiation.getId())).thenReturn(negotiation); - when(protocolWebhook.url()).thenReturn("http://callback.address"); + when(protocolWebhookRegistry.resolve(negotiation.getProtocol())).thenReturn(() -> "http://callback.address"); manager.start(); @@ -140,6 +140,28 @@ void offering_shouldSendOfferAndTransitionToOffered() { }); } + @Test + void offering_shouldTransitionToTerminated_whenProtocolNotResolved() { + var negotiation = contractNegotiationBuilder().state(OFFERING.code()).contractOffer(contractOffer()).build(); + when(store.nextNotLeased(anyInt(), stateIs(OFFERING.code()))).thenReturn(List.of(negotiation)).thenReturn(emptyList()); + var ack = ContractNegotiationAck.Builder.newInstance().consumerPid("consumerPid").build(); + when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); + when(store.findById(negotiation.getId())).thenReturn(negotiation); + when(protocolWebhookRegistry.resolve(negotiation.getProtocol())).thenReturn(null); + + manager.start(); + + await().untilAsserted(() -> { + var captor = ArgumentCaptor.forClass(ContractNegotiation.class); + verify(store).save(captor.capture()); + var storedNegotiation = captor.getValue(); + assertThat(storedNegotiation.getState()).isEqualTo(TERMINATED.code()); + assertThat(storedNegotiation.getErrorDetail()).isNotNull(); + verifyNoInteractions(dispatcherRegistry); + verify(listener).terminated(any()); + }); + } + @Test void requested_shouldTransitionToAgreeing() { var negotiation = contractNegotiationBuilder().state(REQUESTED.code()).build(); @@ -265,7 +287,7 @@ void dispatchException(ContractNegotiationStates starting, ContractNegotiationSt when(store.nextNotLeased(anyInt(), stateIs(starting.code()))).thenReturn(List.of(negotiation)).thenReturn(emptyList()); when(dispatcherRegistry.dispatch(any(), any())).thenReturn(result); when(store.findById(negotiation.getId())).thenReturn(negotiation); - when(protocolWebhook.url()).thenReturn("http://callback.address"); + when(protocolWebhookRegistry.resolve(negotiation.getProtocol())).thenReturn(() -> "http://callback.address"); manager.start(); diff --git a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/TransferCoreExtension.java b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/TransferCoreExtension.java index 0b1a49c1bb5..02f24e7bdec 100644 --- a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/TransferCoreExtension.java +++ b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/TransferCoreExtension.java @@ -44,7 +44,7 @@ import org.eclipse.edc.spi.command.CommandHandlerRegistry; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.retry.ExponentialWaitStrategy; import org.eclipse.edc.spi.security.Vault; import org.eclipse.edc.spi.system.ExecutorInstrumentation; @@ -130,7 +130,7 @@ public class TransferCoreExtension implements ServiceExtension { private TypeTransformerRegistry typeTransformerRegistry; @Inject - private ProtocolWebhook protocolWebhook; + private ProtocolWebhookRegistry protocolWebhookRegistry; @Inject private TransferProcessPendingGuard pendingGuard; @@ -183,7 +183,7 @@ public void initialize(ServiceExtensionContext context) { .batchSize(stateMachineBatchSize) .addressResolver(addressResolver) .entityRetryProcessConfiguration(entityRetryProcessConfiguration) - .protocolWebhook(protocolWebhook) + .protocolWebhookRegistry(protocolWebhookRegistry) .provisionResponsesHandler(provisionResponsesHandler) .deprovisionResponsesHandler(deprovisionResponsesHandler) .pendingGuard(pendingGuard) diff --git a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java index 79194046765..2159e76ab06 100644 --- a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java @@ -45,7 +45,7 @@ import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferTerminationMessage; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; @@ -116,7 +116,7 @@ public class TransferProcessManagerImpl extends AbstractStateEntityManager false; @@ -286,25 +286,32 @@ private boolean processProvisioned(TransferProcess process) { @WithSpan private boolean processRequesting(TransferProcess process) { var originalDestination = process.getDataDestination(); + var callbackAddress = protocolWebhookRegistry.resolve(process.getProtocol()); + + if (callbackAddress != null) { + var dataDestination = Optional.ofNullable(originalDestination) + .map(DataAddress::getKeyName) + .map(key -> vault.resolveSecret(key)) + .map(secret -> DataAddress.Builder.newInstance().properties(originalDestination.getProperties()).property(EDC_DATA_ADDRESS_SECRET, secret).build()) + .orElse(originalDestination); + + var messageBuilder = TransferRequestMessage.Builder.newInstance() + .callbackAddress(callbackAddress.url()) + .dataDestination(dataDestination) + .transferType(process.getTransferType()) + .contractId(process.getContractId()); + + return dispatch(messageBuilder, process, policyArchive.findPolicyForContract(process.getContractId()), TransferProcessAck.class) + .onSuccessResult(this::transitionToRequested) + .onRetryExhausted(this::transitionToTerminated) + .onFailure((t, throwable) -> transitionToRequesting(t)) + .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) + .execute("send transfer request to " + process.getCounterPartyAddress()); - var dataDestination = Optional.ofNullable(originalDestination) - .map(DataAddress::getKeyName) - .map(key -> vault.resolveSecret(key)) - .map(secret -> DataAddress.Builder.newInstance().properties(originalDestination.getProperties()).property(EDC_DATA_ADDRESS_SECRET, secret).build()) - .orElse(originalDestination); - - var messageBuilder = TransferRequestMessage.Builder.newInstance() - .callbackAddress(protocolWebhook.url()) - .dataDestination(dataDestination) - .transferType(process.getTransferType()) - .contractId(process.getContractId()); - - return dispatch(messageBuilder, process, policyArchive.findPolicyForContract(process.getContractId()), TransferProcessAck.class) - .onSuccessResult(this::transitionToRequested) - .onRetryExhausted(this::transitionToTerminated) - .onFailure((t, throwable) -> transitionToRequesting(t)) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .execute("send transfer request to " + process.getCounterPartyAddress()); + } else { + transitionToTerminated(process, "No callback address found for protocol: " + process.getProtocol()); + return true; + } } /** @@ -744,8 +751,8 @@ public Builder addressResolver(DataAddressResolver addressResolver) { return this; } - public Builder protocolWebhook(ProtocolWebhook protocolWebhook) { - manager.protocolWebhook = protocolWebhook; + public Builder protocolWebhookRegistry(ProtocolWebhookRegistry protocolWebhookRegistry) { + manager.protocolWebhookRegistry = protocolWebhookRegistry; return this; } diff --git a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplIntegrationTest.java b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplIntegrationTest.java index 374ad3ca075..59866ec1554 100644 --- a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplIntegrationTest.java +++ b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplIntegrationTest.java @@ -41,6 +41,7 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.retry.ExponentialWaitStrategy; @@ -98,10 +99,12 @@ class TransferProcessManagerImplIntegrationTest { private final TransferProcessStore store = new InMemoryTransferProcessStore(clock, CriterionOperatorRegistryImpl.ofDefaults()); private final RemoteMessageDispatcherRegistry dispatcherRegistry = mock(); private final DataFlowManager dataFlowManager = mock(); + private final ProtocolWebhookRegistry protocolWebhookRegistry = mock(); private TransferProcessManagerImpl manager; @BeforeEach void setup() { + when(protocolWebhookRegistry.resolve(any())).thenReturn(() -> "any"); var resourceManifest = ResourceManifest.Builder.newInstance().definitions(List.of(new TestResourceDefinition())).build(); when(manifestGenerator.generateConsumerResourceManifest(any(TransferProcess.class), any(Policy.class))).thenReturn(Result.success(resourceManifest)); @@ -122,7 +125,7 @@ void setup() { .observable(mock()) .store(store) .policyArchive(policyArchive) - .protocolWebhook(() -> "any") + .protocolWebhookRegistry(protocolWebhookRegistry) .addressResolver(mock()) .provisionResponsesHandler(new ProvisionResponsesHandler(mock(), mock(), mock(), mock())) .deprovisionResponsesHandler(mock()) diff --git a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java index f4b9c3673ef..c0e71a11d55 100644 --- a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java +++ b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java @@ -48,7 +48,7 @@ import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; @@ -140,7 +140,7 @@ class TransferProcessManagerImplTest { private final Vault vault = mock(); private final Clock clock = Clock.systemUTC(); private final TransferProcessListener listener = mock(); - private final ProtocolWebhook protocolWebhook = mock(); + private final ProtocolWebhookRegistry protocolWebhookRegistry = mock(); private final DataAddressResolver addressResolver = mock(); private final ProvisionResponsesHandler provisionResponsesHandler = mock(); private final DeprovisionResponsesHandler deprovisionResponsesHandler = mock(); @@ -151,7 +151,7 @@ class TransferProcessManagerImplTest { @BeforeEach void setup() { - when(protocolWebhook.url()).thenReturn(protocolWebhookUrl); + when(protocolWebhookRegistry.resolve(any())).thenReturn(() -> protocolWebhookUrl); when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(createDataFlowResponse())); when(policyArchive.findPolicyForContract(any())).thenReturn(Policy.Builder.newInstance().build()); var observable = new TransferProcessObservableImpl(); @@ -172,191 +172,13 @@ void setup() { .vault(vault) .addressResolver(addressResolver) .entityRetryProcessConfiguration(entityRetryProcessConfiguration) - .protocolWebhook(protocolWebhook) + .protocolWebhookRegistry(protocolWebhookRegistry) .provisionResponsesHandler(provisionResponsesHandler) .deprovisionResponsesHandler(deprovisionResponsesHandler) .pendingGuard(pendingGuard) .build(); } - @Nested - class InitiateConsumerRequest { - @Test - void shouldStoreTransferProcess() { - when(policyArchive.findPolicyForContract(any())).thenReturn(Policy.Builder.newInstance().target("assetId").build()); - when(transferProcessStore.findForCorrelationId("1")).thenReturn(null); - var callback = CallbackAddress.Builder.newInstance().uri("local://test").events(Set.of("test")).build(); - - var transferRequest = TransferRequest.Builder.newInstance() - .id("1") - .dataDestination(DataAddress.Builder.newInstance().type("test").build()) - .callbackAddresses(List.of(callback)) - .build(); - - var captor = ArgumentCaptor.forClass(TransferProcess.class); - - var result = manager.initiateConsumerRequest(transferRequest); - - assertThat(result).isSucceeded().isNotNull(); - verify(transferProcessStore, times(RETRY_LIMIT)).save(captor.capture()); - var transferProcess = captor.getValue(); - assertThat(transferProcess.getId()).isEqualTo("1"); - assertThat(transferProcess.getCorrelationId()).isNull(); - assertThat(transferProcess.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().contains(callback); - assertThat(transferProcess.getAssetId()).isEqualTo("assetId"); - verify(listener).initiated(any()); - } - - @Test - void shouldFail_whenPolicyNotAvailable() { - when(policyArchive.findPolicyForContract(any())).thenReturn(null); - when(transferProcessStore.findForCorrelationId("1")).thenReturn(null); - - var transferRequest = TransferRequest.Builder.newInstance() - .id("1") - .contractId("contractId") - .dataDestination(DataAddress.Builder.newInstance().type("test").build()) - .build(); - - var result = manager.initiateConsumerRequest(transferRequest); - - assertThat(result).isFailed(); - } - } - - @Nested - class InitialConsumer { - @Test - void initial_consumer_shouldTransitionToProvisioning() { - var transferProcess = createTransferProcess(INITIAL); - when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))) - .thenReturn(List.of(transferProcess)) - .thenReturn(emptyList()); - var resourceManifest = ResourceManifest.Builder.newInstance().definitions(List.of(new TestResourceDefinition())).build(); - when(manifestGenerator.generateConsumerResourceManifest(any(TransferProcess.class), any(Policy.class))) - .thenReturn(Result.success(resourceManifest)); - - manager.start(); - - await().untilAsserted(() -> { - verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); - verifyNoInteractions(provisionManager); - verify(transferProcessStore).save(argThat(p -> p.getState() == PROVISIONING.code())); - }); - } - - @Test - void initial_consumer_manifestEvaluationFailed_shouldTransitionToTerminated() { - var transferProcess = createTransferProcess(INITIAL); - when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))) - .thenReturn(List.of(transferProcess)) - .thenReturn(emptyList()); - when(manifestGenerator.generateConsumerResourceManifest(any(TransferProcess.class), any(Policy.class))) - .thenReturn(Result.failure("error")); - - manager.start(); - - await().untilAsserted(() -> { - verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); - verifyNoInteractions(provisionManager); - verify(transferProcessStore).save(argThat(p -> p.getState() == TERMINATED.code())); - }); - } - - @Test - void initial_consumer_shouldTransitionToTerminated_whenNoPolicyFound() { - var transferProcess = createTransferProcess(INITIAL); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))) - .thenReturn(List.of(transferProcess)) - .thenReturn(emptyList()); - when(policyArchive.findPolicyForContract(anyString())).thenReturn(null); - - manager.start(); - - await().untilAsserted(() -> { - verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); - verifyNoInteractions(provisionManager); - verify(transferProcessStore).save(argThat(p -> p.getState() == TERMINATED.code())); - }); - } - } - - @Nested - class InitialProvider { - - private final TransferProcess.Builder builder = createTransferProcessBuilder(INITIAL).type(PROVIDER); - - @Test - void shouldTransitionToProvisioning() { - var transferProcess = builder.dataDestination(null).build(); - when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))).thenReturn(List.of(transferProcess)).thenReturn(emptyList()); - var contentDataAddress = DataAddress.Builder.newInstance().type("type").build(); - when(addressResolver.resolveForAsset(any())).thenReturn(contentDataAddress); - var resourceManifest = ResourceManifest.Builder.newInstance().definitions(List.of(new TestResourceDefinition())).build(); - when(manifestGenerator.generateProviderResourceManifest(any(TransferProcess.class), any(), any())) - .thenReturn(resourceManifest); - - manager.start(); - - await().untilAsserted(() -> { - verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); - var captor = ArgumentCaptor.forClass(TransferProcess.class); - verify(transferProcessStore).save(captor.capture()); - verify(manifestGenerator).generateProviderResourceManifest(any(), any(), any()); - verifyNoInteractions(provisionManager, vault); - var actualTransferProcess = captor.getValue(); - assertThat(actualTransferProcess.getState()).isEqualTo(PROVISIONING.code()); - assertThat(actualTransferProcess.getContentDataAddress()).isSameAs(contentDataAddress); - assertThat(actualTransferProcess.getResourceManifest()).isSameAs(resourceManifest); - }); - } - - @Test - void shouldStoreSecret_whenItIsFoundInTheDataAddress() { - var destinationDataAddress = DataAddress.Builder.newInstance() - .keyName("keyName") - .type("type") - .property(EDC_DATA_ADDRESS_SECRET, "secret") - .build(); - var transferProcess = builder.dataDestination(destinationDataAddress).build(); - when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))).thenReturn(List.of(transferProcess)).thenReturn(emptyList()); - when(addressResolver.resolveForAsset(any())).thenReturn(DataAddress.Builder.newInstance().type("type").build()); - var resourceManifest = ResourceManifest.Builder.newInstance().definitions(List.of(new TestResourceDefinition())).build(); - when(manifestGenerator.generateProviderResourceManifest(any(TransferProcess.class), any(), any())) - .thenReturn(resourceManifest); - - manager.start(); - - await().untilAsserted(() -> { - verify(vault).storeSecret("keyName", "secret"); - verify(transferProcessStore).save(any()); - }); - } - - @Test - void shouldTransitionToTerminating_whenAssetIsNotResolved() { - var transferProcess = builder.build(); - when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))).thenReturn(List.of(transferProcess)).thenReturn(emptyList()); - when(addressResolver.resolveForAsset(any())).thenReturn(null); - - manager.start(); - - await().untilAsserted(() -> { - verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); - var captor = ArgumentCaptor.forClass(TransferProcess.class); - verify(transferProcessStore).save(captor.capture()); - verifyNoInteractions(manifestGenerator, provisionManager); - var actualTransferProcess = captor.getValue(); - assertThat(actualTransferProcess.getState()).isEqualTo(TERMINATING.code()); - }); - } - } - @Test void provisioning_shouldInvokeProvisionResultHandler() { var process = createTransferProcess(PROVISIONING).toBuilder() @@ -475,177 +297,33 @@ void provisionedProvider_shouldTransitionToStarting() { }); } - @Nested - class Requesting { - @Test - void requesting_shouldSendMessageAndTransitionToRequested() { - var process = createTransferProcessBuilder(REQUESTING).dataDestination(null).build(); - process.setCorrelationId(null); - var ack = TransferProcessAck.Builder.newInstance().providerPid("providerPid").build(); - when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); - when(transferProcessStore.nextNotLeased(anyInt(), consumerStateIs(REQUESTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(REQUESTING.code()).build()); - when(vault.resolveSecret(any())).thenReturn(null); - - manager.start(); - - await().untilAsserted(() -> { - var storeCaptor = ArgumentCaptor.forClass(TransferProcess.class); - verify(transferProcessStore, times(1)).save(storeCaptor.capture()); - var storedTransferProcess = storeCaptor.getValue(); - assertThat(storedTransferProcess.getState()).isEqualTo(REQUESTED.code()); - assertThat(storedTransferProcess.getCorrelationId()).isEqualTo("providerPid"); - verify(listener).requested(process); - var captor = ArgumentCaptor.forClass(TransferRequestMessage.class); - verify(dispatcherRegistry).dispatch(eq(TransferProcessAck.class), captor.capture()); - var message = captor.getValue(); - assertThat(message.getProcessId()).isEqualTo(process.getId()); - assertThat(message.getConsumerPid()).isEqualTo(process.getId()); - assertThat(message.getProviderPid()).isEqualTo(null); - assertThat(message.getCallbackAddress()).isEqualTo(protocolWebhookUrl); - assertThat(message.getDataDestination()).isNull(); - }); - } - - @Test - void requesting_shouldAddSecretToDataAddress_whenItExists() { - var destination = DataAddress.Builder.newInstance().type("any").keyName("keyName").build(); - var process = createTransferProcessBuilder(REQUESTING).dataDestination(destination).build(); - var ack = TransferProcessAck.Builder.newInstance().providerPid("providerPid").build(); - when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); - when(transferProcessStore.nextNotLeased(anyInt(), consumerStateIs(REQUESTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(REQUESTING.code()).build()); - when(vault.resolveSecret(any())).thenReturn("secret"); + @Test + void completing_provider_shouldTransitionToDeprovisioning_whenSendingMessageSucceed() { + var process = createTransferProcessBuilder(COMPLETING).type(PROVIDER).correlationId("correlationId").build(); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); + when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); - manager.start(); + manager.start(); - await().untilAsserted(() -> { - var captor = ArgumentCaptor.forClass(TransferRequestMessage.class); - verify(dispatcherRegistry).dispatch(eq(TransferProcessAck.class), captor.capture()); - verify(transferProcessStore, times(1)).save(argThat(p -> p.getState() == REQUESTED.code())); - verify(listener).requested(process); - verify(vault).resolveSecret("keyName"); - var requestMessage = captor.getValue(); - assertThat(requestMessage.getDataDestination().getStringProperty(EDC_DATA_ADDRESS_SECRET)).isEqualTo("secret"); - }); - } + await().untilAsserted(() -> { + var captor = ArgumentCaptor.forClass(TransferCompletionMessage.class); + verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); + var message = captor.getValue(); + assertThat(message.getProviderPid()).isEqualTo(process.getId()); + assertThat(message.getConsumerPid()).isEqualTo("correlationId"); + assertThat(message.getProcessId()).isEqualTo("correlationId"); + verify(transferProcessStore, atLeastOnce()).save(argThat(p -> p.getState() == DEPROVISIONING.code())); + verify(listener).completed(process); + }); } - @Nested - class StartingProvider { - - @Test - void shouldStartDataTransferAndSendMessageToConsumer() { - var process = createTransferProcess(STARTING).toBuilder().type(PROVIDER).build(); - var dataFlowResponse = createDataFlowResponse(); - when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); - when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(transferProcessStore.findById(process.getId())).thenReturn(process); - when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(dataFlowResponse)); - when(dispatcherRegistry.dispatch(any(), isA(TransferStartMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); - - manager.start(); - - await().untilAsserted(() -> { - var captor = ArgumentCaptor.forClass(TransferStartMessage.class); - verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); - verify(dispatcherRegistry).dispatch(any(), captor.capture()); - verify(transferProcessStore).save(argThat(p -> p.getState() == STARTED.code())); - verify(listener).started(eq(process), any()); - var message = captor.getValue(); - assertThat(message.getProcessId()).isEqualTo(process.getCorrelationId()); - assertThat(message.getConsumerPid()).isEqualTo(process.getCorrelationId()); - assertThat(message.getProviderPid()).isEqualTo(process.getId()); - assertThat(message.getDataAddress()).usingRecursiveComparison().isEqualTo(dataFlowResponse.getDataAddress()); - }); - } - } - - @Nested - class ResumingProvider { - - @Test - void shouldStartDataTransferAndSendMessageToConsumer() { - var process = createTransferProcess(RESUMING).toBuilder().type(PROVIDER).build(); - var dataFlowResponse = createDataFlowResponse(); - when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); - when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(RESUMING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(transferProcessStore.findById(process.getId())).thenReturn(process); - when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(dataFlowResponse)); - when(dispatcherRegistry.dispatch(any(), isA(TransferStartMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); - - manager.start(); - - await().untilAsserted(() -> { - var captor = ArgumentCaptor.forClass(TransferStartMessage.class); - verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); - verify(dispatcherRegistry).dispatch(any(), captor.capture()); - var transferCaptor = ArgumentCaptor.forClass(TransferProcess.class); - verify(transferProcessStore).save(transferCaptor.capture()); - assertThat(transferCaptor.getValue().getState()).isEqualTo(STARTED.code()); - verify(listener).started(eq(process), any()); - var message = captor.getValue(); - assertThat(message.getProcessId()).isEqualTo(process.getCorrelationId()); - assertThat(message.getConsumerPid()).isEqualTo(process.getCorrelationId()); - assertThat(message.getProviderPid()).isEqualTo(process.getId()); - assertThat(message.getDataAddress()).usingRecursiveComparison().isEqualTo(dataFlowResponse.getDataAddress()); - }); - } - } - - @Nested - class ResumingConsumer { - - @Test - void shouldSendMessageToProviderAndTransitionToResumed() { - var process = createTransferProcess(RESUMING).toBuilder().type(CONSUMER).build(); - when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); - when(transferProcessStore.nextNotLeased(anyInt(), consumerStateIs(RESUMING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(transferProcessStore.findById(process.getId())).thenReturn(process); - when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); - - manager.start(); - - await().untilAsserted(() -> { - var captor = ArgumentCaptor.forClass(TransferStartMessage.class); - verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); - verify(dispatcherRegistry).dispatch(any(), captor.capture()); - verify(transferProcessStore).save(argThat(p -> p.getState() == RESUMED.code())); - var message = captor.getValue(); - assertThat(message.getProcessId()).isEqualTo(process.getCorrelationId()); - assertThat(message.getProviderPid()).isEqualTo(process.getCorrelationId()); - assertThat(message.getConsumerPid()).isEqualTo(process.getId()); - }); - } - } - - @Test - void completing_provider_shouldTransitionToDeprovisioning_whenSendingMessageSucceed() { - var process = createTransferProcessBuilder(COMPLETING).type(PROVIDER).correlationId("correlationId").build(); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); - when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); - - manager.start(); - - await().untilAsserted(() -> { - var captor = ArgumentCaptor.forClass(TransferCompletionMessage.class); - verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); - var message = captor.getValue(); - assertThat(message.getProviderPid()).isEqualTo(process.getId()); - assertThat(message.getConsumerPid()).isEqualTo("correlationId"); - assertThat(message.getProcessId()).isEqualTo("correlationId"); - verify(transferProcessStore, atLeastOnce()).save(argThat(p -> p.getState() == DEPROVISIONING.code())); - verify(listener).completed(process); - }); - } - - @Test - void completing_consumer_shouldTransitionToCompleted_whenSendingMessageSucceed() { - var process = createTransferProcessBuilder(COMPLETING).type(CONSUMER).correlationId("correlationId").build(); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); - when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); + @Test + void completing_consumer_shouldTransitionToCompleted_whenSendingMessageSucceed() { + var process = createTransferProcessBuilder(COMPLETING).type(CONSUMER).correlationId("correlationId").build(); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); + when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); manager.start(); @@ -765,56 +443,6 @@ void terminating_onFailureAndRetriesExhausted_transitToTerminated() { }); } - @Nested - class Suspending { - - @Test - void provider_shouldSuspendDataFlowAndTransitionToSuspended_whenMessageSentCorrectly() { - var process = createTransferProcessBuilder(SUSPENDING).type(PROVIDER).correlationId("counterPartyId").build(); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(SUSPENDING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(SUSPENDING.code()).build()); - when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); - when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success()); - - manager.start(); - - await().untilAsserted(() -> { - verify(dataFlowManager).suspend(process); - var captor = ArgumentCaptor.forClass(TransferSuspensionMessage.class); - verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); - var message = captor.getValue(); - assertThat(message.getProviderPid()).isEqualTo(process.getId()); - assertThat(message.getConsumerPid()).isEqualTo("counterPartyId"); - assertThat(message.getProcessId()).isEqualTo("counterPartyId"); - verify(transferProcessStore, atLeastOnce()).save(argThat(p -> p.getState() == SUSPENDED.code())); - verify(listener).suspended(process); - }); - } - - @Test - void consumer_shouldTransitionToSuspended_whenMessageSentCorrectly() { - var process = createTransferProcessBuilder(SUSPENDING).type(CONSUMER).correlationId("counterPartyId").build(); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(SUSPENDING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(SUSPENDING.code()).build()); - when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); - - manager.start(); - - await().untilAsserted(() -> { - verifyNoInteractions(dataFlowManager); - var captor = ArgumentCaptor.forClass(TransferSuspensionMessage.class); - verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); - var message = captor.getValue(); - assertThat(message.getProviderPid()).isEqualTo("counterPartyId"); - assertThat(message.getConsumerPid()).isEqualTo(process.getId()); - assertThat(message.getProcessId()).isEqualTo("counterPartyId"); - verify(transferProcessStore).save(argThat(p -> p.getState() == SUSPENDED.code())); - verify(listener).suspended(process); - }); - } - - } - @Test void deprovisioning_shouldTransitionToDeprovisioned() { var manifest = ResourceManifest.Builder.newInstance() @@ -998,4 +626,376 @@ public Stream provideArguments(ExtensionContext extensionCo } } + @Nested + class InitiateConsumerRequest { + @Test + void shouldStoreTransferProcess() { + when(policyArchive.findPolicyForContract(any())).thenReturn(Policy.Builder.newInstance().target("assetId").build()); + when(transferProcessStore.findForCorrelationId("1")).thenReturn(null); + var callback = CallbackAddress.Builder.newInstance().uri("local://test").events(Set.of("test")).build(); + + var transferRequest = TransferRequest.Builder.newInstance() + .id("1") + .dataDestination(DataAddress.Builder.newInstance().type("test").build()) + .callbackAddresses(List.of(callback)) + .build(); + + var captor = ArgumentCaptor.forClass(TransferProcess.class); + + var result = manager.initiateConsumerRequest(transferRequest); + + assertThat(result).isSucceeded().isNotNull(); + verify(transferProcessStore, times(RETRY_LIMIT)).save(captor.capture()); + var transferProcess = captor.getValue(); + assertThat(transferProcess.getId()).isEqualTo("1"); + assertThat(transferProcess.getCorrelationId()).isNull(); + assertThat(transferProcess.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().contains(callback); + assertThat(transferProcess.getAssetId()).isEqualTo("assetId"); + verify(listener).initiated(any()); + } + + @Test + void shouldFail_whenPolicyNotAvailable() { + when(policyArchive.findPolicyForContract(any())).thenReturn(null); + when(transferProcessStore.findForCorrelationId("1")).thenReturn(null); + + var transferRequest = TransferRequest.Builder.newInstance() + .id("1") + .contractId("contractId") + .dataDestination(DataAddress.Builder.newInstance().type("test").build()) + .build(); + + var result = manager.initiateConsumerRequest(transferRequest); + + assertThat(result).isFailed(); + } + } + + @Nested + class InitialConsumer { + @Test + void initial_consumer_shouldTransitionToProvisioning() { + var transferProcess = createTransferProcess(INITIAL); + when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))) + .thenReturn(List.of(transferProcess)) + .thenReturn(emptyList()); + var resourceManifest = ResourceManifest.Builder.newInstance().definitions(List.of(new TestResourceDefinition())).build(); + when(manifestGenerator.generateConsumerResourceManifest(any(TransferProcess.class), any(Policy.class))) + .thenReturn(Result.success(resourceManifest)); + + manager.start(); + + await().untilAsserted(() -> { + verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); + verifyNoInteractions(provisionManager); + verify(transferProcessStore).save(argThat(p -> p.getState() == PROVISIONING.code())); + }); + } + + @Test + void initial_consumer_manifestEvaluationFailed_shouldTransitionToTerminated() { + var transferProcess = createTransferProcess(INITIAL); + when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))) + .thenReturn(List.of(transferProcess)) + .thenReturn(emptyList()); + when(manifestGenerator.generateConsumerResourceManifest(any(TransferProcess.class), any(Policy.class))) + .thenReturn(Result.failure("error")); + + manager.start(); + + await().untilAsserted(() -> { + verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); + verifyNoInteractions(provisionManager); + verify(transferProcessStore).save(argThat(p -> p.getState() == TERMINATED.code())); + }); + } + + @Test + void initial_consumer_shouldTransitionToTerminated_whenNoPolicyFound() { + var transferProcess = createTransferProcess(INITIAL); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))) + .thenReturn(List.of(transferProcess)) + .thenReturn(emptyList()); + when(policyArchive.findPolicyForContract(anyString())).thenReturn(null); + + manager.start(); + + await().untilAsserted(() -> { + verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); + verifyNoInteractions(provisionManager); + verify(transferProcessStore).save(argThat(p -> p.getState() == TERMINATED.code())); + }); + } + } + + @Nested + class InitialProvider { + + private final TransferProcess.Builder builder = createTransferProcessBuilder(INITIAL).type(PROVIDER); + + @Test + void shouldTransitionToProvisioning() { + var transferProcess = builder.dataDestination(null).build(); + when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))).thenReturn(List.of(transferProcess)).thenReturn(emptyList()); + var contentDataAddress = DataAddress.Builder.newInstance().type("type").build(); + when(addressResolver.resolveForAsset(any())).thenReturn(contentDataAddress); + var resourceManifest = ResourceManifest.Builder.newInstance().definitions(List.of(new TestResourceDefinition())).build(); + when(manifestGenerator.generateProviderResourceManifest(any(TransferProcess.class), any(), any())) + .thenReturn(resourceManifest); + + manager.start(); + + await().untilAsserted(() -> { + verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); + var captor = ArgumentCaptor.forClass(TransferProcess.class); + verify(transferProcessStore).save(captor.capture()); + verify(manifestGenerator).generateProviderResourceManifest(any(), any(), any()); + verifyNoInteractions(provisionManager, vault); + var actualTransferProcess = captor.getValue(); + assertThat(actualTransferProcess.getState()).isEqualTo(PROVISIONING.code()); + assertThat(actualTransferProcess.getContentDataAddress()).isSameAs(contentDataAddress); + assertThat(actualTransferProcess.getResourceManifest()).isSameAs(resourceManifest); + }); + } + + @Test + void shouldStoreSecret_whenItIsFoundInTheDataAddress() { + var destinationDataAddress = DataAddress.Builder.newInstance() + .keyName("keyName") + .type("type") + .property(EDC_DATA_ADDRESS_SECRET, "secret") + .build(); + var transferProcess = builder.dataDestination(destinationDataAddress).build(); + when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))).thenReturn(List.of(transferProcess)).thenReturn(emptyList()); + when(addressResolver.resolveForAsset(any())).thenReturn(DataAddress.Builder.newInstance().type("type").build()); + var resourceManifest = ResourceManifest.Builder.newInstance().definitions(List.of(new TestResourceDefinition())).build(); + when(manifestGenerator.generateProviderResourceManifest(any(TransferProcess.class), any(), any())) + .thenReturn(resourceManifest); + + manager.start(); + + await().untilAsserted(() -> { + verify(vault).storeSecret("keyName", "secret"); + verify(transferProcessStore).save(any()); + }); + } + + @Test + void shouldTransitionToTerminating_whenAssetIsNotResolved() { + var transferProcess = builder.build(); + when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))).thenReturn(List.of(transferProcess)).thenReturn(emptyList()); + when(addressResolver.resolveForAsset(any())).thenReturn(null); + + manager.start(); + + await().untilAsserted(() -> { + verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); + var captor = ArgumentCaptor.forClass(TransferProcess.class); + verify(transferProcessStore).save(captor.capture()); + verifyNoInteractions(manifestGenerator, provisionManager); + var actualTransferProcess = captor.getValue(); + assertThat(actualTransferProcess.getState()).isEqualTo(TERMINATING.code()); + }); + } + } + + @Nested + class Requesting { + @Test + void requesting_shouldSendMessageAndTransitionToRequested() { + var process = createTransferProcessBuilder(REQUESTING).dataDestination(null).build(); + process.setCorrelationId(null); + var ack = TransferProcessAck.Builder.newInstance().providerPid("providerPid").build(); + when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); + when(transferProcessStore.nextNotLeased(anyInt(), consumerStateIs(REQUESTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(REQUESTING.code()).build()); + when(vault.resolveSecret(any())).thenReturn(null); + + manager.start(); + + await().untilAsserted(() -> { + var storeCaptor = ArgumentCaptor.forClass(TransferProcess.class); + verify(transferProcessStore, times(1)).save(storeCaptor.capture()); + var storedTransferProcess = storeCaptor.getValue(); + assertThat(storedTransferProcess.getState()).isEqualTo(REQUESTED.code()); + assertThat(storedTransferProcess.getCorrelationId()).isEqualTo("providerPid"); + verify(listener).requested(process); + var captor = ArgumentCaptor.forClass(TransferRequestMessage.class); + verify(dispatcherRegistry).dispatch(eq(TransferProcessAck.class), captor.capture()); + var message = captor.getValue(); + assertThat(message.getProcessId()).isEqualTo(process.getId()); + assertThat(message.getConsumerPid()).isEqualTo(process.getId()); + assertThat(message.getProviderPid()).isEqualTo(null); + assertThat(message.getCallbackAddress()).isEqualTo(protocolWebhookUrl); + assertThat(message.getDataDestination()).isNull(); + }); + } + + @Test + void requesting_shouldAddSecretToDataAddress_whenItExists() { + var destination = DataAddress.Builder.newInstance().type("any").keyName("keyName").build(); + var process = createTransferProcessBuilder(REQUESTING).dataDestination(destination).build(); + var ack = TransferProcessAck.Builder.newInstance().providerPid("providerPid").build(); + when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); + when(transferProcessStore.nextNotLeased(anyInt(), consumerStateIs(REQUESTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(REQUESTING.code()).build()); + when(vault.resolveSecret(any())).thenReturn("secret"); + + manager.start(); + + await().untilAsserted(() -> { + var captor = ArgumentCaptor.forClass(TransferRequestMessage.class); + verify(dispatcherRegistry).dispatch(eq(TransferProcessAck.class), captor.capture()); + verify(transferProcessStore, times(1)).save(argThat(p -> p.getState() == REQUESTED.code())); + verify(listener).requested(process); + verify(vault).resolveSecret("keyName"); + var requestMessage = captor.getValue(); + assertThat(requestMessage.getDataDestination().getStringProperty(EDC_DATA_ADDRESS_SECRET)).isEqualTo("secret"); + }); + } + } + + @Nested + class StartingProvider { + + @Test + void shouldStartDataTransferAndSendMessageToConsumer() { + var process = createTransferProcess(STARTING).toBuilder().type(PROVIDER).build(); + var dataFlowResponse = createDataFlowResponse(); + when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); + when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process); + when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(dataFlowResponse)); + when(dispatcherRegistry.dispatch(any(), isA(TransferStartMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); + + manager.start(); + + await().untilAsserted(() -> { + var captor = ArgumentCaptor.forClass(TransferStartMessage.class); + verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); + verify(dispatcherRegistry).dispatch(any(), captor.capture()); + verify(transferProcessStore).save(argThat(p -> p.getState() == STARTED.code())); + verify(listener).started(eq(process), any()); + var message = captor.getValue(); + assertThat(message.getProcessId()).isEqualTo(process.getCorrelationId()); + assertThat(message.getConsumerPid()).isEqualTo(process.getCorrelationId()); + assertThat(message.getProviderPid()).isEqualTo(process.getId()); + assertThat(message.getDataAddress()).usingRecursiveComparison().isEqualTo(dataFlowResponse.getDataAddress()); + }); + } + } + + @Nested + class ResumingProvider { + + @Test + void shouldStartDataTransferAndSendMessageToConsumer() { + var process = createTransferProcess(RESUMING).toBuilder().type(PROVIDER).build(); + var dataFlowResponse = createDataFlowResponse(); + when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); + when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(RESUMING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process); + when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(dataFlowResponse)); + when(dispatcherRegistry.dispatch(any(), isA(TransferStartMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); + + manager.start(); + + await().untilAsserted(() -> { + var captor = ArgumentCaptor.forClass(TransferStartMessage.class); + verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); + verify(dispatcherRegistry).dispatch(any(), captor.capture()); + var transferCaptor = ArgumentCaptor.forClass(TransferProcess.class); + verify(transferProcessStore).save(transferCaptor.capture()); + assertThat(transferCaptor.getValue().getState()).isEqualTo(STARTED.code()); + verify(listener).started(eq(process), any()); + var message = captor.getValue(); + assertThat(message.getProcessId()).isEqualTo(process.getCorrelationId()); + assertThat(message.getConsumerPid()).isEqualTo(process.getCorrelationId()); + assertThat(message.getProviderPid()).isEqualTo(process.getId()); + assertThat(message.getDataAddress()).usingRecursiveComparison().isEqualTo(dataFlowResponse.getDataAddress()); + }); + } + } + + @Nested + class ResumingConsumer { + + @Test + void shouldSendMessageToProviderAndTransitionToResumed() { + var process = createTransferProcess(RESUMING).toBuilder().type(CONSUMER).build(); + when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); + when(transferProcessStore.nextNotLeased(anyInt(), consumerStateIs(RESUMING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process); + when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); + + manager.start(); + + await().untilAsserted(() -> { + var captor = ArgumentCaptor.forClass(TransferStartMessage.class); + verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); + verify(dispatcherRegistry).dispatch(any(), captor.capture()); + verify(transferProcessStore).save(argThat(p -> p.getState() == RESUMED.code())); + var message = captor.getValue(); + assertThat(message.getProcessId()).isEqualTo(process.getCorrelationId()); + assertThat(message.getProviderPid()).isEqualTo(process.getCorrelationId()); + assertThat(message.getConsumerPid()).isEqualTo(process.getId()); + }); + } + } + + @Nested + class Suspending { + + @Test + void provider_shouldSuspendDataFlowAndTransitionToSuspended_whenMessageSentCorrectly() { + var process = createTransferProcessBuilder(SUSPENDING).type(PROVIDER).correlationId("counterPartyId").build(); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(SUSPENDING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(SUSPENDING.code()).build()); + when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); + when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success()); + + manager.start(); + + await().untilAsserted(() -> { + verify(dataFlowManager).suspend(process); + var captor = ArgumentCaptor.forClass(TransferSuspensionMessage.class); + verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); + var message = captor.getValue(); + assertThat(message.getProviderPid()).isEqualTo(process.getId()); + assertThat(message.getConsumerPid()).isEqualTo("counterPartyId"); + assertThat(message.getProcessId()).isEqualTo("counterPartyId"); + verify(transferProcessStore, atLeastOnce()).save(argThat(p -> p.getState() == SUSPENDED.code())); + verify(listener).suspended(process); + }); + } + + @Test + void consumer_shouldTransitionToSuspended_whenMessageSentCorrectly() { + var process = createTransferProcessBuilder(SUSPENDING).type(CONSUMER).correlationId("counterPartyId").build(); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(SUSPENDING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(SUSPENDING.code()).build()); + when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); + + manager.start(); + + await().untilAsserted(() -> { + verifyNoInteractions(dataFlowManager); + var captor = ArgumentCaptor.forClass(TransferSuspensionMessage.class); + verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); + var message = captor.getValue(); + assertThat(message.getProviderPid()).isEqualTo("counterPartyId"); + assertThat(message.getConsumerPid()).isEqualTo(process.getId()); + assertThat(message.getProcessId()).isEqualTo("counterPartyId"); + verify(transferProcessStore).save(argThat(p -> p.getState() == SUSPENDED.code())); + verify(listener).suspended(process); + }); + } + + } + } diff --git a/data-protocols/dsp/dsp-catalog/dsp-catalog-http-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/http/api/DspCatalogApiExtension.java b/data-protocols/dsp/dsp-catalog/dsp-catalog-http-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/http/api/DspCatalogApiExtension.java index 3f9037eed6e..600e8022bd1 100644 --- a/data-protocols/dsp/dsp-catalog/dsp-catalog-http-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/http/api/DspCatalogApiExtension.java +++ b/data-protocols/dsp/dsp-catalog/dsp-catalog-http-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/http/api/DspCatalogApiExtension.java @@ -30,7 +30,7 @@ import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.spi.monitor.Monitor; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.query.CriterionOperatorRegistry; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; @@ -41,6 +41,10 @@ import org.eclipse.edc.web.spi.WebService; import org.eclipse.edc.web.spi.configuration.ApiContext; +import java.util.Objects; + +import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP; +import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP_V_2024_1; import static org.eclipse.edc.protocol.dsp.spi.type.DspCatalogPropertyAndTypeNames.DSPACE_TYPE_CATALOG_REQUEST_MESSAGE_TERM; import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_NAMESPACE_V_08; import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_NAMESPACE_V_2024_1; @@ -63,7 +67,7 @@ public class DspCatalogApiExtension implements ServiceExtension { @Inject private WebService webService; @Inject - private ProtocolWebhook protocolWebhook; + private ProtocolWebhookRegistry protocolWebhookRegistry; @Inject private CatalogProtocolService service; @Inject @@ -100,15 +104,25 @@ public void initialize(ServiceExtensionContext context) { webService.registerDynamicResource(ApiContext.PROTOCOL, DspCatalogApiController.class, new JerseyJsonLdInterceptor(jsonLd, typeManager, JSON_LD, DSP_SCOPE_V_08)); webService.registerDynamicResource(ApiContext.PROTOCOL, DspCatalogApiController20241.class, new JerseyJsonLdInterceptor(jsonLd, typeManager, JSON_LD, DSP_SCOPE_V_2024_1)); - dataServiceRegistry.register(DataService.Builder.newInstance() - .endpointDescription("dspace:connector") - .endpointUrl(protocolWebhook.url()) - .build()); versionRegistry.register(V_2024_1); versionRegistry.register(V_08); } + @Override + public void prepare() { + registerDataService(DATASPACE_PROTOCOL_HTTP); + registerDataService(DATASPACE_PROTOCOL_HTTP_V_2024_1); + } + + private void registerDataService(String protocol) { + var endpointUrl = Objects.requireNonNull(protocolWebhookRegistry.resolve(protocol)).url(); + dataServiceRegistry.register(protocol, DataService.Builder.newInstance() + .endpointDescription("dspace:connector") + .endpointUrl(endpointUrl) + .build()); + } + private ContinuationTokenManager continuationTokenManager(Monitor monitor, String version, JsonLdNamespace namespace) { var continuationTokenSerDes = new Base64continuationTokenSerDes(transformerRegistry.forContext(version), jsonLd); return new ContinuationTokenManagerImpl(continuationTokenSerDes, namespace, monitor); diff --git a/data-protocols/dsp/dsp-catalog/lib/dsp-catalog-http-api-lib/src/main/java/org/eclipse/edc/protocol/dsp/catalog/http/api/controller/BaseDspCatalogApiController.java b/data-protocols/dsp/dsp-catalog/lib/dsp-catalog-http-api-lib/src/main/java/org/eclipse/edc/protocol/dsp/catalog/http/api/controller/BaseDspCatalogApiController.java index 33b2866b378..b05a72ae2b0 100644 --- a/data-protocols/dsp/dsp-catalog/lib/dsp-catalog-http-api-lib/src/main/java/org/eclipse/edc/protocol/dsp/catalog/http/api/controller/BaseDspCatalogApiController.java +++ b/data-protocols/dsp/dsp-catalog/lib/dsp-catalog-http-api-lib/src/main/java/org/eclipse/edc/protocol/dsp/catalog/http/api/controller/BaseDspCatalogApiController.java @@ -90,7 +90,7 @@ public Response getDataset(@PathParam("id") String id, @HeaderParam(AUTHORIZATIO var request = GetDspRequest.Builder.newInstance(Dataset.class, CatalogError.class) .token(token) .id(id) - .serviceCall(service::getDataset) + .serviceCall((datasetId, tokenRepresentation) -> service.getDataset(datasetId, tokenRepresentation, protocol)) .errorProvider(CatalogError.Builder::newInstance) .protocol(protocol) .build(); diff --git a/data-protocols/dsp/dsp-http-api-configuration/src/main/java/org/eclipse/edc/protocol/dsp/http/api/configuration/DspApiConfigurationExtension.java b/data-protocols/dsp/dsp-http-api-configuration/src/main/java/org/eclipse/edc/protocol/dsp/http/api/configuration/DspApiConfigurationExtension.java index ce13baa057f..b56822d6fe1 100644 --- a/data-protocols/dsp/dsp-http-api-configuration/src/main/java/org/eclipse/edc/protocol/dsp/http/api/configuration/DspApiConfigurationExtension.java +++ b/data-protocols/dsp/dsp-http-api-configuration/src/main/java/org/eclipse/edc/protocol/dsp/http/api/configuration/DspApiConfigurationExtension.java @@ -27,10 +27,9 @@ import org.eclipse.edc.runtime.metamodel.annotation.Configuration; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.runtime.metamodel.annotation.Provides; import org.eclipse.edc.runtime.metamodel.annotation.Setting; import org.eclipse.edc.runtime.metamodel.annotation.Settings; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.system.Hostname; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; @@ -62,6 +61,8 @@ import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX; import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX; import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA; +import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP; +import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP_V_2024_1; import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_NAMESPACE_V_08; import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_NAMESPACE_V_2024_1; import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_SCOPE_V_08; @@ -76,7 +77,6 @@ * Configure 'protocol' api context. */ @Extension(value = DspApiConfigurationExtension.NAME) -@Provides(ProtocolWebhook.class) public class DspApiConfigurationExtension implements ServiceExtension { public static final String NAME = "Dataspace Protocol API Configuration Extension"; @@ -104,6 +104,9 @@ public class DspApiConfigurationExtension implements ServiceExtension { @Inject private PortMappingRegistry portMappingRegistry; + @Inject + private ProtocolWebhookRegistry protocolWebhookRegistry; + @Override public String name() { return NAME; @@ -115,8 +118,9 @@ public void initialize(ServiceExtensionContext context) { portMappingRegistry.register(portMapping); var dspWebhookAddress = ofNullable(callbackAddress).orElseGet(() -> format("http://%s:%s%s", hostname.get(), portMapping.port(), portMapping.path())); - context.registerService(ProtocolWebhook.class, () -> dspWebhookAddress); + protocolWebhookRegistry.registerWebhook(DATASPACE_PROTOCOL_HTTP, () -> dspWebhookAddress); + protocolWebhookRegistry.registerWebhook(DATASPACE_PROTOCOL_HTTP_V_2024_1, () -> dspWebhookAddress); // registers ns for DSP scope registerNamespaces(DSP_SCOPE_V_08, DSP_NAMESPACE_V_08); diff --git a/data-protocols/dsp/dsp-http-api-configuration/src/test/java/org/eclipse/edc/protocol/dsp/http/api/configuration/DspApiConfigurationExtensionTest.java b/data-protocols/dsp/dsp-http-api-configuration/src/test/java/org/eclipse/edc/protocol/dsp/http/api/configuration/DspApiConfigurationExtensionTest.java index 308a808d077..dc86a08fa6a 100644 --- a/data-protocols/dsp/dsp-http-api-configuration/src/test/java/org/eclipse/edc/protocol/dsp/http/api/configuration/DspApiConfigurationExtensionTest.java +++ b/data-protocols/dsp/dsp-http-api-configuration/src/test/java/org/eclipse/edc/protocol/dsp/http/api/configuration/DspApiConfigurationExtensionTest.java @@ -17,7 +17,7 @@ import org.eclipse.edc.boot.system.injection.ObjectFactory; import org.eclipse.edc.jsonld.spi.JsonLd; import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; +import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.system.Hostname; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.system.configuration.ConfigFactory; @@ -36,7 +36,6 @@ import java.util.Map; -import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB; import static org.eclipse.edc.jsonld.spi.Namespaces.DCAT_PREFIX; import static org.eclipse.edc.jsonld.spi.Namespaces.DCAT_SCHEMA; @@ -46,11 +45,14 @@ import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA; import static org.eclipse.edc.protocol.dsp.http.api.configuration.DspApiConfigurationExtension.DEFAULT_PROTOCOL_PATH; import static org.eclipse.edc.protocol.dsp.http.api.configuration.DspApiConfigurationExtension.DEFAULT_PROTOCOL_PORT; +import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP; +import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP_V_2024_1; import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_SCOPE_V_08; import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_SCOPE_V_2024_1; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_PREFIX; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; @@ -64,7 +66,7 @@ class DspApiConfigurationExtensionTest { private final TypeManager typeManager = mock(); private final JsonLd jsonLd = mock(); private final PortMappingRegistry portMappingRegistry = mock(); - + private final ProtocolWebhookRegistry protocolWebhookRegistry = mock(); @BeforeEach void setUp(ServiceExtensionContext context) { @@ -73,6 +75,7 @@ void setUp(ServiceExtensionContext context) { context.registerService(TypeManager.class, typeManager); context.registerService(Hostname.class, () -> "hostname"); context.registerService(JsonLd.class, jsonLd); + context.registerService(ProtocolWebhookRegistry.class, protocolWebhookRegistry); TypeTransformerRegistry typeTransformerRegistry = mock(); when(typeTransformerRegistry.forContext(any())).thenReturn(mock()); context.registerService(TypeTransformerRegistry.class, typeTransformerRegistry); @@ -87,7 +90,10 @@ void shouldComposeProtocolWebhook_whenNotConfigured(DspApiConfigurationExtension extension.initialize(context); verify(portMappingRegistry).register(new PortMapping(ApiContext.PROTOCOL, DEFAULT_PROTOCOL_PORT, DEFAULT_PROTOCOL_PATH)); - assertThat(context.getService(ProtocolWebhook.class).url()).isEqualTo("http://hostname:%s%s".formatted(DEFAULT_PROTOCOL_PORT, DEFAULT_PROTOCOL_PATH)); + + var url = "http://hostname:%s%s".formatted(DEFAULT_PROTOCOL_PORT, DEFAULT_PROTOCOL_PATH); + verify(protocolWebhookRegistry).registerWebhook(eq(DATASPACE_PROTOCOL_HTTP), argThat(webhook -> webhook.url().equals(url))); + verify(protocolWebhookRegistry).registerWebhook(eq(DATASPACE_PROTOCOL_HTTP_V_2024_1), argThat(webhook -> webhook.url().equals(url))); } @Test @@ -103,7 +109,10 @@ void shouldUseConfiguredProtocolWebhook(ServiceExtensionContext context, ObjectF extension.initialize(context); verify(portMappingRegistry).register(new PortMapping(ApiContext.PROTOCOL, 1234, "/path")); - assertThat(context.getService(ProtocolWebhook.class).url()).isEqualTo("http://webhook"); + + verify(protocolWebhookRegistry).registerWebhook(eq(DATASPACE_PROTOCOL_HTTP), argThat(webhook -> webhook.url().equals(webhookAddress))); + verify(protocolWebhookRegistry).registerWebhook(eq(DATASPACE_PROTOCOL_HTTP_V_2024_1), argThat(webhook -> webhook.url().equals(webhookAddress))); + } @Test diff --git a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/DspHttpCoreExtension.java b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/DspHttpCoreExtension.java index 0e738ce34c6..1a0235cf44d 100644 --- a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/DspHttpCoreExtension.java +++ b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/DspHttpCoreExtension.java @@ -169,6 +169,7 @@ public DspProtocolParser dspProtocolParser() { return dspProtocolParser; } + private void registerNegotiationPolicyScopes(DspHttpRemoteMessageDispatcher dispatcher) { dispatcher.registerPolicyScope(ContractAgreementMessage.class, ContractRemoteMessage::getPolicy, RequestContractNegotiationPolicyContext::new); dispatcher.registerPolicyScope(ContractNegotiationEventMessage.class, ContractRemoteMessage::getPolicy, RequestContractNegotiationPolicyContext::new); diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/protocol/ProtocolWebhookRegistry.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/protocol/ProtocolWebhookRegistry.java new file mode 100644 index 00000000000..f86c2f38c81 --- /dev/null +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/protocol/ProtocolWebhookRegistry.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.spi.protocol; + +import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint; +import org.jetbrains.annotations.Nullable; + +/** + * A registry for protocol webhooks. + */ +@ExtensionPoint +public interface ProtocolWebhookRegistry { + + /** + * Register a webhook for a protocol. + * + * @param protocol The protocol + * @param webhook The webhook + */ + void registerWebhook(String protocol, ProtocolWebhook webhook); + + /** + * Resolve a webhook for a protocol. + * + * @param protocol The protocol + * @return The webhook for the protocol, or null if no webhook is registered for the protocol + */ + @Nullable + ProtocolWebhook resolve(String protocol); +} diff --git a/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DataServiceRegistry.java b/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DataServiceRegistry.java index f1714278724..4888c96b901 100644 --- a/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DataServiceRegistry.java +++ b/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DataServiceRegistry.java @@ -24,14 +24,16 @@ public interface DataServiceRegistry { /** * Register a {@link DataService} with its {@link DistributionResolver}. * + * @param protocol the protocol * @param dataService the Data Service */ - void register(DataService dataService); + void register(String protocol, DataService dataService); /** * Returns all the {@link DataService}s * + * @param protocol the protocol * @return a list of Data Services. Always not null */ - List getDataServices(); + List getDataServices(String protocol); } diff --git a/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DatasetResolver.java b/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DatasetResolver.java index 263efd9ca06..7691ef5181d 100644 --- a/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DatasetResolver.java +++ b/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DatasetResolver.java @@ -33,7 +33,7 @@ public interface DatasetResolver { * @return a stream of datasets. */ @NotNull - Stream query(ParticipantAgent agent, QuerySpec querySpec); + Stream query(ParticipantAgent agent, QuerySpec querySpec, String protocol); /** * Resolves a {@link Dataset} given its id @@ -42,5 +42,5 @@ public interface DatasetResolver { * @param id the dataset id. * @return the {@link Dataset} if found, null otherwise. */ - Dataset getById(ParticipantAgent participantAgent, String id); + Dataset getById(ParticipantAgent participantAgent, String id, String protocol); } diff --git a/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DistributionResolver.java b/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DistributionResolver.java index 6fb6515edb3..c0458ea7e37 100644 --- a/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DistributionResolver.java +++ b/spi/control-plane/catalog-spi/src/main/java/org/eclipse/edc/connector/controlplane/catalog/spi/DistributionResolver.java @@ -28,5 +28,5 @@ public interface DistributionResolver { * * @return a list of Distributions, always not null */ - List getDistributions(Asset asset); + List getDistributions(String protocol, Asset asset); } diff --git a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/catalog/CatalogProtocolService.java b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/catalog/CatalogProtocolService.java index a55e56b5310..d6b1361cd30 100644 --- a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/catalog/CatalogProtocolService.java +++ b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/catalog/CatalogProtocolService.java @@ -45,5 +45,5 @@ public interface CatalogProtocolService { * @return succeeded result with the {@link Dataset}, failed result otherwise. */ @NotNull - ServiceResult getDataset(String datasetId, TokenRepresentation tokenRepresentation); + ServiceResult getDataset(String datasetId, TokenRepresentation tokenRepresentation, String protocol); }