diff --git a/DEPENDENCIES b/DEPENDENCIES index b5db0adf4d..cd89b97a18 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -182,9 +182,10 @@ maven/mavencentral/javax.ws.rs/javax.ws.rs-api/2.1, (CDDL-1.1 OR GPL-2.0 WITH Cl maven/mavencentral/joda-time/joda-time/2.10.5, Apache-2.0, approved, clearlydefined maven/mavencentral/junit/junit/4.13.2, EPL-2.0, approved, CQ23636 maven/mavencentral/net.bytebuddy/byte-buddy-agent/1.14.1, Apache-2.0, approved, #7164 -maven/mavencentral/net.bytebuddy/byte-buddy-agent/1.14.11, Apache-2.0, approved, #7164 +maven/mavencentral/net.bytebuddy/byte-buddy-agent/1.14.12, Apache-2.0, approved, #7164 maven/mavencentral/net.bytebuddy/byte-buddy/1.14.1, Apache-2.0 AND BSD-3-Clause, approved, #7163 maven/mavencentral/net.bytebuddy/byte-buddy/1.14.11, Apache-2.0 AND BSD-3-Clause, approved, #7163 +maven/mavencentral/net.bytebuddy/byte-buddy/1.14.12, Apache-2.0 AND BSD-3-Clause, approved, #7163 maven/mavencentral/net.java.dev.jna/jna/5.13.0, Apache-2.0 AND LGPL-2.1-or-later, approved, #6709 maven/mavencentral/net.javacrumbs.json-unit/json-unit-core/2.36.0, Apache-2.0, approved, clearlydefined maven/mavencentral/net.minidev/accessors-smart/2.4.7, Apache-2.0, approved, #7515 @@ -321,8 +322,8 @@ maven/mavencentral/org.lz4/lz4-java/1.8.0, Apache-2.0, approved, clearlydefined maven/mavencentral/org.mock-server/mockserver-client-java/5.15.0, Apache-2.0 AND LGPL-3.0-only, approved, #9324 maven/mavencentral/org.mock-server/mockserver-core/5.15.0, Apache-2.0, approved, clearlydefined maven/mavencentral/org.mock-server/mockserver-netty/5.15.0, Apache-2.0, approved, #9276 +maven/mavencentral/org.mockito/mockito-core/5.11.0, MIT AND (Apache-2.0 AND MIT) AND Apache-2.0, approved, #13505 maven/mavencentral/org.mockito/mockito-core/5.2.0, MIT AND (Apache-2.0 AND MIT) AND Apache-2.0, approved, #7401 -maven/mavencentral/org.mockito/mockito-core/5.9.0, MIT AND (Apache-2.0 AND MIT) AND Apache-2.0, approved, #12774 maven/mavencentral/org.mockito/mockito-inline/5.2.0, MIT, approved, clearlydefined maven/mavencentral/org.mozilla/rhino/1.7.7.2, MPL-2.0 AND BSD-3-Clause AND ISC, approved, CQ16320 maven/mavencentral/org.objenesis/objenesis/3.3, Apache-2.0, approved, clearlydefined diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index 275d019b18..be48d2adba 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -19,6 +19,7 @@ import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.DataFlowStates; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure; import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; import org.eclipse.edc.spi.entity.StatefulEntity; @@ -29,6 +30,7 @@ import org.eclipse.edc.statemachine.Processor; import org.eclipse.edc.statemachine.ProcessorImpl; import org.eclipse.edc.statemachine.StateMachineManager; +import org.jetbrains.annotations.Nullable; import java.util.Objects; import java.util.Optional; @@ -79,32 +81,36 @@ public void initiate(DataFlowStartMessage dataRequest) { } @Override - public DataFlowStates transferState(String processId) { + public DataFlowStates getTransferState(String processId) { return Optional.ofNullable(store.findById(processId)).map(StatefulEntity::getState) .map(DataFlowStates::from).orElse(null); } @Override - public StatusResult terminate(String dataFlowId) { + public StatusResult terminate(String dataFlowId, @Nullable String reason) { var result = store.findByIdAndLease(dataFlowId); - if (result.succeeded()) { - var dataFlow = result.getContent(); - var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest()); + if (result.failed()) { + return StatusResult.from(result).map(it -> null); + } - if (transferService == null) { - return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId)); - } + var dataFlow = result.getContent(); + var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest()); - var terminateResult = transferService.terminate(dataFlow); - if (terminateResult.failed()) { + if (transferService == null) { + return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId)); + } + + var terminateResult = transferService.terminate(dataFlow); + if (terminateResult.failed()) { + if (terminateResult.reason().equals(StreamFailure.Reason.NOT_FOUND)) { + monitor.warning("No source was found for DataFlow '%s'. This may indicate an inconsistent state.".formatted(dataFlowId)); + } else { return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail())); } - dataFlow.transitToTerminated(); - store.save(dataFlow); - return StatusResult.success(); - } else { - return StatusResult.from(result).map(it -> null); } + dataFlow.transitToTerminated(reason); + store.save(dataFlow); + return StatusResult.success(); } @Override diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImplTest.java index 5c77672baa..ac96773410 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImplTest.java @@ -18,8 +18,6 @@ import org.eclipse.edc.spi.types.domain.DataAddress; import org.junit.jupiter.api.Test; -import java.util.Map; - import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; @@ -29,7 +27,7 @@ class PublicEndpointGeneratorServiceImplTest { @Test void generateFor() { - var endpoint = new Endpoint(Map.of("fizz", "buzz"), "bar-type"); + var endpoint = new Endpoint("fizz", "bar-type"); generatorService.addGeneratorFunction("testtype", dataAddress -> endpoint); assertThat(generatorService.generateFor(DataAddress.Builder.newInstance().type("testtype").build())).isSucceeded() @@ -43,5 +41,5 @@ void generateFor_noFunction() { .detail() .isEqualTo("No Endpoint generator function registered for source data type 'testtype'"); } - + } \ No newline at end of file diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java index 36c981a41f..ad00dda022 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java @@ -58,7 +58,7 @@ class DataPlaneAuthorizationServiceImplTest { @BeforeEach void setup() { - when(endpointGenerator.generateFor(any())).thenReturn(Result.success(new Endpoint(Map.of("url", "http://example.com"), "https://w3id.org/idsa/v4.1/HTTP"))); + when(endpointGenerator.generateFor(any())).thenReturn(Result.success(Endpoint.url("http://example.com"))); } @Test @@ -70,7 +70,7 @@ void createEndpointDataReference() { assertThat(result).isSucceeded() .satisfies(da -> { assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP"); - assertThat(da.getProperties().get("endpoint")).isEqualTo(Map.of("url", "http://example.com")); + assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com"); assertThat(da.getProperties().get("endpointType")).isEqualTo(da.getType()); assertThat(da.getStringProperty("authorization")).isEqualTo("footoken"); }); @@ -98,7 +98,7 @@ void createEndpointDataReference_withAuthType() { assertThat(result).isSucceeded() .satisfies(da -> { assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP"); - assertThat(da.getProperties().get("endpoint")).isEqualTo(Map.of("url", "http://example.com")); + assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com"); assertThat(da.getStringProperty("authorization")).isEqualTo("footoken"); assertThat(da.getStringProperty("authType")).isEqualTo("bearer"); assertThat(da.getStringProperty("fizz")).isEqualTo("buzz"); @@ -165,4 +165,4 @@ private DataFlowStartMessage.Builder createStartMessage() { .destinationDataAddress(DataAddress.Builder.newInstance().type("test-dest").build()) .properties(Map.of("foo", "bar", "fizz", "buzz")); } -} \ No newline at end of file +} diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index a9b5f9bb0a..ca4fa63e26 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -41,6 +41,7 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.dataplane.spi.DataFlow.TERMINATION_REASON; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.NOTIFIED; @@ -124,6 +125,20 @@ void terminate_shouldTerminateDataFlow() { verify(transferService).terminate(dataFlow); } + @Test + void terminate_shouldTerminateDataFlow_withReason() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.terminate(any())).thenReturn(StreamResult.success()); + + var result = manager.terminate("dataFlowId", "test-reason"); + + assertThat(result).isSucceeded(); + verify(store).save(argThat(d -> d.getState() == TERMINATED.code() && d.getProperties().get(TERMINATION_REASON).equals("test-reason"))); + verify(transferService).terminate(dataFlow); + } + @Test void terminate_shouldReturnFatalError_whenDataFlowDoesNotExist() { when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.notFound("not found")); @@ -172,6 +187,19 @@ void terminate_shouldReturnFatalError_whenDataFlowCannotBeTerminated() { verify(store, never()).save(any()); } + @Test + void terminate_shouldStillTerminate_whenDataFlowHasNoSource() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.terminate(any())).thenReturn(StreamResult.notFound()); + + var result = manager.terminate("dataFlowId", "test-reason"); + + assertThat(result).isSucceeded(); + verify(store).save(argThat(f -> f.getProperties().containsKey(TERMINATION_REASON))); + } + @Test void received_shouldStartTransferTransitionAndTransitionToStarted() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); diff --git a/extensions/data-plane/data-plane-control-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java b/extensions/data-plane/data-plane-control-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java index e0d2043f09..fc3686d055 100644 --- a/extensions/data-plane/data-plane-control-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java +++ b/extensions/data-plane/data-plane-control-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java @@ -36,8 +36,8 @@ import static java.lang.String.format; @Path("/transfer") -@Consumes({MediaType.APPLICATION_JSON}) -@Produces({MediaType.APPLICATION_JSON}) +@Consumes({ MediaType.APPLICATION_JSON }) +@Produces({ MediaType.APPLICATION_JSON }) public class DataPlaneControlApiController implements DataPlaneControlApi { private final DataPlaneManager dataPlaneManager; @@ -65,7 +65,7 @@ public void initiateTransfer(DataFlowStartMessage request, @Suspended AsyncRespo @Override @Path("/{transferProcessId}") public DataFlowStates getTransferState(@PathParam("transferProcessId") String transferProcessId) { - return dataPlaneManager.transferState(transferProcessId); + return dataPlaneManager.getTransferState(transferProcessId); } @DELETE diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java index 7ae368a008..369c00009d 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java @@ -14,11 +14,14 @@ package org.eclipse.edc.connector.api.signaling.configuration; +import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.json.Json; import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry; import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistryImpl; +import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataAddressTransformer; import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowResponseMessageTransformer; import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataAddressTransformer; +import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowStartMessageTransformer; import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowSuspendMessageTransformer; import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowTerminateMessageTransformer; import org.eclipse.edc.jsonld.spi.JsonLd; @@ -36,6 +39,7 @@ import org.eclipse.edc.web.spi.WebService; import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer; import org.eclipse.edc.web.spi.configuration.WebServiceSettings; +import org.jetbrains.annotations.NotNull; import java.util.Map; @@ -87,7 +91,7 @@ public void initialize(ServiceExtensionContext context) { context.registerService(SignalingApiConfiguration.class, new SignalingApiConfiguration(webServiceConfiguration)); jsonLd.registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, SIGNALING_SCOPE); - var jsonLdMapper = typeManager.getMapper(JSON_LD); + var jsonLdMapper = getJsonLdMapper(); webService.registerResource(webServiceConfiguration.getContextAlias(), new ObjectMapperProvider(jsonLdMapper)); webService.registerResource(webServiceConfiguration.getContextAlias(), new JerseyJsonLdInterceptor(jsonLd, jsonLdMapper, SIGNALING_SCOPE)); } @@ -97,10 +101,17 @@ public SignalingApiTransformerRegistry managementApiTypeTransformerRegistry() { var factory = Json.createBuilderFactory(Map.of()); var registry = new SignalingApiTransformerRegistryImpl(this.transformerRegistry); + registry.register(new JsonObjectToDataFlowStartMessageTransformer()); registry.register(new JsonObjectToDataFlowSuspendMessageTransformer()); registry.register(new JsonObjectToDataFlowTerminateMessageTransformer()); registry.register(new JsonObjectToDataAddressTransformer()); registry.register(new JsonObjectFromDataFlowResponseMessageTransformer(factory)); + registry.register(new JsonObjectFromDataAddressTransformer(factory, getJsonLdMapper())); return registry; } + + @NotNull + private ObjectMapper getJsonLdMapper() { + return typeManager.getMapper(JSON_LD); + } } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/build.gradle.kts b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/build.gradle.kts index 71c78059d6..b729617334 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/build.gradle.kts +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/build.gradle.kts @@ -24,8 +24,11 @@ dependencies { api(project(":spi:data-plane:data-plane-spi")) implementation(project(":extensions:common:api:control-api-configuration")) + implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform")) implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-api-configuration")) implementation(libs.jakarta.rsApi) + testImplementation(libs.restAssured) + testImplementation(testFixtures(project(":extensions:common:http:jersey-core"))) } edcBuild { diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneSignalingApiExtension.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneSignalingApiExtension.java index dee9d65afe..0cdc573252 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneSignalingApiExtension.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneSignalingApiExtension.java @@ -15,7 +15,10 @@ package org.eclipse.edc.connector.dataplane.api; import org.eclipse.edc.connector.api.signaling.configuration.SignalingApiConfiguration; +import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry; import org.eclipse.edc.connector.dataplane.api.controller.v1.DataPlaneSignalingApiController; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.spi.system.ServiceExtension; @@ -34,6 +37,13 @@ public class DataPlaneSignalingApiExtension implements ServiceExtension { @Inject private SignalingApiConfiguration controlApiConfiguration; + @Inject + private SignalingApiTransformerRegistry transformerRegistry; + @Inject + private DataPlaneAuthorizationService authorizationService; + @Inject + private DataPlaneManager dataPlaneManager; + @Override public String name() { return NAME; @@ -41,6 +51,7 @@ public String name() { @Override public void initialize(ServiceExtensionContext context) { - webService.registerResource(controlApiConfiguration.getContextAlias(), new DataPlaneSignalingApiController()); + var controller = new DataPlaneSignalingApiController(transformerRegistry, authorizationService, dataPlaneManager, context.getMonitor().withPrefix("SignalingAPI")); + webService.registerResource(controlApiConfiguration.getContextAlias(), controller); } } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApi.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApi.java index 18be3164af..088b372e1b 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApi.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApi.java @@ -23,7 +23,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.json.JsonObject; -import jakarta.ws.rs.container.AsyncResponse; import org.eclipse.edc.connector.dataplane.api.model.DataFlowState; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; @@ -51,7 +50,7 @@ public interface DataPlaneSignalingApi { content = @Content(schema = @Schema(implementation = DataFlowResponseMessageSchema.class))), } ) - JsonObject start(JsonObject dataFlowStartMessage, AsyncResponse response); + JsonObject start(JsonObject dataFlowStartMessage); @Operation(description = "Get the current state of a data transfer.", responses = { diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java index 1a86492359..a34ab0c1b4 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java @@ -14,6 +14,7 @@ package org.eclipse.edc.connector.dataplane.api.controller.v1; +import jakarta.json.Json; import jakarta.json.JsonObject; import jakarta.ws.rs.Consumes; import jakarta.ws.rs.GET; @@ -21,39 +22,98 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; -import jakarta.ws.rs.container.AsyncResponse; -import jakarta.ws.rs.container.Suspended; import jakarta.ws.rs.core.MediaType; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowSuspendMessage; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; +import org.eclipse.edc.transform.spi.TypeTransformerRegistry; +import org.eclipse.edc.web.spi.exception.InvalidRequestException; + +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON }) @Path("/v1/dataflows") public class DataPlaneSignalingApiController implements DataPlaneSignalingApi { + private final TypeTransformerRegistry typeTransformerRegistry; + private final DataPlaneAuthorizationService dataPlaneAuthorizationService; + private final DataPlaneManager dataPlaneManager; + private final Monitor monitor; + + public DataPlaneSignalingApiController(TypeTransformerRegistry typeTransformerRegistry, DataPlaneAuthorizationService dataPlaneAuthorizationService, DataPlaneManager dataPlaneManager, Monitor monitor) { + this.typeTransformerRegistry = typeTransformerRegistry; + this.dataPlaneAuthorizationService = dataPlaneAuthorizationService; + this.dataPlaneManager = dataPlaneManager; + this.monitor = monitor; + } + @POST @Override - public JsonObject start(JsonObject dataFlowStartMessage, @Suspended AsyncResponse response) { - throw new UnsupportedOperationException(); + public JsonObject start(JsonObject dataFlowStartMessage) { + var startMsg = typeTransformerRegistry.transform(dataFlowStartMessage, DataFlowStartMessage.class) + .onFailure(f -> monitor.warning("Error transforming %s: %s".formatted(DataFlowStartMessage.class, f.getFailureDetail()))) + .orElseThrow(InvalidRequestException::new); + + dataPlaneManager.validate(startMsg) + .onFailure(f -> monitor.warning("Failed to validate request: %s".formatted(f.getFailureDetail()))) + .orElseThrow(f -> f.getMessages().isEmpty() ? + new InvalidRequestException("Failed to validate request: %s".formatted(startMsg.getId())) : + new InvalidRequestException(f.getMessages())); + + monitor.debug("Create EDR"); + var dataAddress = dataPlaneAuthorizationService.createEndpointDataReference(startMsg) + .onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail()))) + .orElseThrow(InvalidRequestException::new); + + dataPlaneManager.initiate(startMsg); + + return typeTransformerRegistry.transform(dataAddress, JsonObject.class) + .onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail()))) + .orElseThrow(f -> new EdcException(f.getFailureDetail())); } @GET @Path("/{id}/state") @Override public JsonObject getTransferState(@PathParam("id") String transferProcessId) { - throw new UnsupportedOperationException(); + var state = dataPlaneManager.getTransferState(transferProcessId); + // not really worth to create a dedicated transformer for this simple object + + return Json.createObjectBuilder() + .add(TYPE, "DataFlowState") + .add(EDC_NAMESPACE + "state", state.toString()) + .build(); } @POST @Path("/{id}/terminate") @Override - public void terminate(@PathParam("id") String transferProcessId, JsonObject terminationMessage) { - throw new UnsupportedOperationException(); + public void terminate(@PathParam("id") String dataFlowId, JsonObject terminationMessage) { + + var msg = typeTransformerRegistry.transform(terminationMessage, DataFlowTerminateMessage.class) + .onFailure(f -> monitor.warning("Error transforming %s: %s".formatted(DataFlowTerminateMessage.class, f.getFailureDetail()))) + .orElseThrow(InvalidRequestException::new); + + dataPlaneManager.terminate(dataFlowId, msg.getReason()) + .orElseThrow(InvalidRequestException::new); } @POST @Path("/{id}/suspend") @Override public void suspend(@PathParam("id") String id, JsonObject suspendMessage) { - throw new UnsupportedOperationException(); + var msg = typeTransformerRegistry.transform(suspendMessage, DataFlowSuspendMessage.class) + .onFailure(f -> monitor.warning("Error transforming %s: %s".formatted(DataFlowSuspendMessage.class, f.getFailureDetail()))) + .orElseThrow(InvalidRequestException::new); + + monitor.warning(" >>> A valid DataFlowSuspendMessage was provided, but suspension messages are not yet supported, " + + "and will depend on https://github.com/eclipse-edc/Connector/issues/3350."); + throw new UnsupportedOperationException("Not yet implemented"); } } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java new file mode 100644 index 0000000000..077550c4ab --- /dev/null +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller.v1; + +import io.restassured.http.ContentType; +import io.restassured.specification.RequestSpecification; +import jakarta.json.Json; +import jakarta.json.JsonObject; +import jakarta.json.JsonString; +import org.eclipse.edc.connector.dataplane.spi.DataFlowStates; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; +import org.eclipse.edc.spi.response.ResponseStatus; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowSuspendMessage; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.transform.spi.TypeTransformerRegistry; +import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.net.URI; + +import static io.restassured.RestAssured.given; +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.spi.result.Result.failure; +import static org.eclipse.edc.spi.result.Result.success; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +class DataPlaneSignalingApiControllerTest extends RestControllerTestBase { + + private final TypeTransformerRegistry transformerRegistry = mock(); + private final DataPlaneAuthorizationService authService = mock(); + private final DataPlaneManager dataplaneManager = mock(); + + @DisplayName("Expect HTTP 200 and the correct EDR when a data flow is started") + @Test + void start() { + var flowStartMessage = createFlowStartMessage(); + when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowStartMessage.class))) + .thenReturn(success(flowStartMessage)); + when(dataplaneManager.validate(any())).thenReturn(success(true)); + when(authService.createEndpointDataReference(any())) + .thenReturn(success(DataAddress.Builder.newInstance().type("test-edr").build())); + + when(transformerRegistry.transform(isA(DataAddress.class), eq(JsonObject.class))) + .thenReturn(success(Json.createObjectBuilder().add("foo", "bar").build())); + + var jsonObject = Json.createObjectBuilder().build(); + var result = baseRequest() + .contentType(ContentType.JSON) + .body(jsonObject) + .post("/v1/dataflows") + .then() + .statusCode(200) + .extract().body().as(JsonObject.class); + + assertThat(result).hasEntrySatisfying("foo", val -> assertThat(((JsonString) val).getString()).isEqualTo("bar")); + verify(dataplaneManager).initiate(eq(flowStartMessage)); + } + + @DisplayName("Expect HTTP 400 when DataFlowStartMessage is invalid") + @Test + void start_whenInvalidMessage() { + when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowStartMessage.class))) + .thenReturn(success(createFlowStartMessage())); + when(dataplaneManager.validate(any())).thenReturn(Result.failure("foobar")); + + var jsonObject = Json.createObjectBuilder().build(); + baseRequest() + .contentType(ContentType.JSON) + .body(jsonObject) + .post("/v1/dataflows") + .then() + .statusCode(400); + + verify(transformerRegistry).transform(isA(JsonObject.class), eq(DataFlowStartMessage.class)); + verify(dataplaneManager).validate(any(DataFlowStartMessage.class)); + verifyNoMoreInteractions(transformerRegistry, dataplaneManager, authService); + + } + + @DisplayName("Expect HTTP 400 when DataFlowStartMessage cannot be deserialized") + @Test + void start_whenTransformationFails() { + when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowStartMessage.class))).thenReturn(Result.failure("foo-bar")); + + var jsonObject = Json.createObjectBuilder().build(); + baseRequest() + .contentType(ContentType.JSON) + .body(jsonObject) + .post("/v1/dataflows") + .then() + .statusCode(400); + + verify(transformerRegistry).transform(isA(JsonObject.class), eq(DataFlowStartMessage.class)); + verifyNoMoreInteractions(transformerRegistry, dataplaneManager, authService); + } + + @DisplayName("Expect HTTP 400 when an EDR cannot be created") + @Test + void start_whenCreateEdrFails() { + when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowStartMessage.class))) + .thenReturn(success(createFlowStartMessage())); + when(dataplaneManager.validate(any())).thenReturn(success(true)); + when(authService.createEndpointDataReference(any())) + .thenReturn(Result.failure("test-failure")); + + var jsonObject = Json.createObjectBuilder().build(); + baseRequest() + .contentType(ContentType.JSON) + .body(jsonObject) + .post("/v1/dataflows") + .then() + .statusCode(400); + + verify(transformerRegistry).transform(isA(JsonObject.class), eq(DataFlowStartMessage.class)); + verify(dataplaneManager).validate(any(DataFlowStartMessage.class)); + verify(authService).createEndpointDataReference(any()); + verifyNoMoreInteractions(transformerRegistry, dataplaneManager, authService); + } + + @DisplayName("Expect HTTP 500 when the DataAddress cannot be serialized on egress") + @Test + void start_whenDataAddressTransformationFails() { + var flowStartMessage = createFlowStartMessage(); + when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowStartMessage.class))) + .thenReturn(success(flowStartMessage)); + when(dataplaneManager.validate(any())).thenReturn(success(true)); + when(authService.createEndpointDataReference(any())) + .thenReturn(success(DataAddress.Builder.newInstance().type("test-edr").build())); + + when(transformerRegistry.transform(isA(DataAddress.class), eq(JsonObject.class))) + .thenReturn(failure("test-failure")); + + var jsonObject = Json.createObjectBuilder().build(); + var result = baseRequest() + .contentType(ContentType.JSON) + .body(jsonObject) + .post("/v1/dataflows") + .then() + .statusCode(500); + + verify(dataplaneManager).initiate(eq(flowStartMessage)); + } + + @DisplayName("Expect HTTP 200 and the correct response when getting the state") + @Test + void getTransferState() { + var flowId = "test-id"; + when(dataplaneManager.getTransferState(eq(flowId))).thenReturn(DataFlowStates.RECEIVED); + + var state = baseRequest() + .get("/v1/dataflows/%s/state".formatted(flowId)) + .then() + .statusCode(200) + .extract().as(JsonObject.class); + assertThat(state.getString(EDC_NAMESPACE + "state")).isEqualTo("RECEIVED"); + } + + @DisplayName("Expect HTTP 204 when DataFlow is terminated successfully") + @Test + void terminate() { + when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowTerminateMessage.class))) + .thenReturn(success(DataFlowTerminateMessage.Builder.newInstance().reason("test-reason").build())); + var flowId = "test-id"; + when(dataplaneManager.terminate(eq(flowId), any())).thenReturn(StatusResult.success()); + + var jsonObject = Json.createObjectBuilder().build(); + baseRequest() + .contentType(ContentType.JSON) + .body(jsonObject) + .post("/v1/dataflows/%s/terminate".formatted(flowId)) + .then() + .statusCode(204); + } + + @DisplayName("Expect HTTP 400 when DataFlow is cannot be terminated") + @Test + void terminate_whenCannotTerminate() { + when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowTerminateMessage.class))) + .thenReturn(success(DataFlowTerminateMessage.Builder.newInstance().reason("test-reason").build())); + var flowId = "test-id"; + when(dataplaneManager.terminate(eq(flowId), any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR)); + + var jsonObject = Json.createObjectBuilder().build(); + baseRequest() + .contentType(ContentType.JSON) + .body(jsonObject) + .post("/v1/dataflows/%s/terminate".formatted(flowId)) + .then() + .statusCode(400); + } + + @DisplayName("Expect HTTP 501 because suspension messages are not yet supported") + @Test + void suspend() { + var flowId = "test-id"; + when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowSuspendMessage.class))).thenReturn(success(DataFlowSuspendMessage.Builder.newInstance() + .reason("foo-reaset") + .build())); + + var jsonObject = Json.createObjectBuilder().build(); + baseRequest() + .contentType(ContentType.JSON) + .body(jsonObject) + .post("/v1/dataflows/%s/suspend".formatted(flowId)) + .then() + .statusCode(501) + .body(Matchers.containsString("Not Implemented")); + + } + + @Override + protected Object controller() { + return new DataPlaneSignalingApiController(transformerRegistry, authService, dataplaneManager, mock()); + } + + private DataFlowStartMessage createFlowStartMessage() { + return DataFlowStartMessage.Builder.newInstance() + .processId("processId") + .assetId("assetId") + .agreementId("agreementId") + .participantId("participantId") + .flowType(FlowType.PUSH) + .callbackAddress(URI.create("http://localhost")) + .sourceDataAddress(DataAddress.Builder.newInstance().type("sourceType").build()) + .destinationDataAddress(DataAddress.Builder.newInstance().type("destType").build()) + .build(); + } + + private RequestSpecification baseRequest() { + return given() + .baseUri("http://localhost:" + port) + .when(); + } +} \ No newline at end of file diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataAddressTransformer.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataAddressTransformer.java index fac0b9dd6c..f87e95e40b 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataAddressTransformer.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataAddressTransformer.java @@ -40,7 +40,7 @@ public class JsonObjectFromDataAddressTransformer extends AbstractJsonLdTransfor private final JsonBuilderFactory jsonFactory; private final ObjectMapper mapper; - protected JsonObjectFromDataAddressTransformer(JsonBuilderFactory jsonFactory, ObjectMapper mapper) { + public JsonObjectFromDataAddressTransformer(JsonBuilderFactory jsonFactory, ObjectMapper mapper) { super(DataAddress.class, JsonObject.class); this.jsonFactory = jsonFactory; this.mapper = mapper; @@ -58,7 +58,8 @@ protected JsonObjectFromDataAddressTransformer(JsonBuilderFactory jsonFactory, O var objectBuilder = jsonFactory.createObjectBuilder() .add(TYPE, DSPACE_DATAADDRESS_TYPE) .add(ENDPOINT_TYPE_PROPERTY, dataAddress.getType()) - .add(ENDPOINT_PROPERTY, dataAddress.getStringProperty("endpoint")); + .add(ENDPOINT_PROPERTY, dataAddress.getProperties().getOrDefault("endpoint", "https://example.com").toString()); + objectBuilder.add(ENDPOINT_PROPERTIES_PROPERTY, propsBuilder.build()); diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataAddressTransformer.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataAddressTransformer.java index ed2a42db2a..965d375a8b 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataAddressTransformer.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataAddressTransformer.java @@ -16,7 +16,6 @@ import jakarta.json.JsonArray; import jakarta.json.JsonObject; -import jakarta.json.JsonString; import jakarta.json.JsonValue; import org.eclipse.edc.connector.api.signaling.transform.DspaceDataAddressSerialization; import org.eclipse.edc.jsonld.spi.transformer.AbstractJsonLdTransformer; @@ -30,7 +29,6 @@ import static org.eclipse.edc.connector.api.signaling.transform.DspaceDataAddressSerialization.ENDPOINT_PROPERTY_NAME_PROPERTY; import static org.eclipse.edc.connector.api.signaling.transform.DspaceDataAddressSerialization.ENDPOINT_PROPERTY_VALUE_PROPERTY; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VALUE; /** * Transforms a {@link JsonObject} into a DataAddress using the DSPACE-serialization format. @@ -56,7 +54,7 @@ private void transformProperties(String key, JsonValue jsonValue, DataAddress.Bu builder.type(endpointType); } case DspaceDataAddressSerialization.ENDPOINT_PROPERTIES_PROPERTY -> - transformEndpointProperties(jsonValue, ep -> builder.property(ep.name(), ep.value())); + transformEndpointProperties(jsonValue, ep -> builder.property(ep.name(), ep.value()), context); default -> throw new IllegalArgumentException("Unexpected value: " + key); } } @@ -67,23 +65,24 @@ private void transformProperties(String key, JsonValue jsonValue, DataAddress.Bu * * @param jsonValue The endpointProperties JsonArray * @param consumer A consumer that takes the {@link DspaceEndpointProperty} and processes it. + * @param context the transformer context, to which this method delegates when transforming strings. */ - private void transformEndpointProperties(JsonValue jsonValue, Consumer consumer) { + private void transformEndpointProperties(JsonValue jsonValue, Consumer consumer, TransformerContext context) { Function converter = (jo) -> { - var name = jo.getJsonArray(ENDPOINT_PROPERTY_NAME_PROPERTY).get(0).asJsonObject().get(VALUE); - var value = jo.getJsonArray(ENDPOINT_PROPERTY_VALUE_PROPERTY).get(0).asJsonObject().get(VALUE); - return new DspaceEndpointProperty(((JsonString) name).getString(), ((JsonString) value).getString()); + var name = transformString(jo.get(ENDPOINT_PROPERTY_NAME_PROPERTY), context); + var value = transformString(jo.get(ENDPOINT_PROPERTY_VALUE_PROPERTY), context); + return new DspaceEndpointProperty(name, value); }; if (jsonValue instanceof JsonObject object) { consumer.accept(converter.apply(object)); } if (jsonValue instanceof JsonArray array) { // invoke the method recursively for every dspace:EndpointProperty entry - array.forEach(jv -> transformEndpointProperties(jv, consumer)); + array.forEach(jv -> transformEndpointProperties(jv, consumer, context)); } } - //container to hold endpoint property objects + //container to hold endpoint property objects private record DspaceEndpointProperty(String name, String value) { } } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java index 47673c2546..f2551f8fca 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java @@ -19,8 +19,11 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.jetbrains.annotations.Nullable; import java.net.URI; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -37,10 +40,11 @@ */ public class DataFlow extends StatefulEntity { + public static final String TERMINATION_REASON = "terminationReason"; private DataAddress source; private DataAddress destination; private URI callbackAddress; - private Map properties = Map.of(); + private Map properties = new HashMap<>(); @Override public DataFlow copy() { @@ -71,7 +75,7 @@ public URI getCallbackAddress() { } public Map getProperties() { - return properties; + return Collections.unmodifiableMap(properties); } public DataFlowStartMessage toRequest() { @@ -103,8 +107,11 @@ public void transitToNotified() { transitionTo(NOTIFIED.code()); } - public void transitToTerminated() { + public void transitToTerminated(@Nullable String reason) { transitionTo(TERMINATED.code()); + if (reason != null) { + properties.put(TERMINATION_REASON, reason); + } } public void transitionToStarted() { @@ -155,7 +162,7 @@ public Builder callbackAddress(URI callbackAddress) { } public Builder properties(Map properties) { - entity.properties = properties; + entity.properties = new HashMap<>(properties); return this; } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/Endpoint.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/Endpoint.java index c8a455e4bc..1f02bd29fe 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/Endpoint.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/Endpoint.java @@ -14,8 +14,6 @@ package org.eclipse.edc.connector.dataplane.spi; -import java.util.Map; - /** * Representation of a publicly accessible data egress point. This indicates to consumers the type of data (denoted by the {@code endpointType}) * and an object containing fields describing the endpoint. @@ -25,7 +23,7 @@ * @param endpoint An object describing the endpoint * @param endpointType A string uniquely identifying the type of endpoint */ -public record Endpoint(Map endpoint, String endpointType) { +public record Endpoint(String endpoint, String endpointType) { /** * Convenience factory method to create a HTTP endpoint. @@ -34,7 +32,7 @@ public record Endpoint(Map endpoint, String endpointType) { * @return the endpoint. */ public static Endpoint url(String url) { - return new Endpoint(Map.of("url", url), "https://w3id.org/idsa/v4.1/HTTP"); + return new Endpoint(url, "https://w3id.org/idsa/v4.1/HTTP"); } } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java index 68427d5bf5..c5a8a00557 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java @@ -20,6 +20,7 @@ import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.jetbrains.annotations.Nullable; /** * Manages the execution of data plane requests. @@ -40,7 +41,7 @@ public interface DataPlaneManager extends StateEntityManager { /** * Returns the transfer state for the process. */ - DataFlowStates transferState(String processId); + DataFlowStates getTransferState(String processId); /** * Terminate the data flow. @@ -48,5 +49,16 @@ public interface DataPlaneManager extends StateEntityManager { * @param dataFlowId the data flow id. * @return success if data flow is terminated, failed otherwise. */ - StatusResult terminate(String dataFlowId); + default StatusResult terminate(String dataFlowId) { + return terminate(dataFlowId, null); + } + + /** + * Terminate the data flow and specifies a reason. + * + * @param dataFlowId the data flow id. + * @param reason the reason for the termination. May be null. + * @return success if data flow is terminated, failed otherwise. + */ + StatusResult terminate(String dataFlowId, @Nullable String reason); } diff --git a/system-tests/e2e-dataplane-tests/tests/build.gradle.kts b/system-tests/e2e-dataplane-tests/tests/build.gradle.kts index e410d9007b..369e6e7aaf 100644 --- a/system-tests/e2e-dataplane-tests/tests/build.gradle.kts +++ b/system-tests/e2e-dataplane-tests/tests/build.gradle.kts @@ -33,6 +33,9 @@ dependencies { testImplementation(libs.junit.jupiter.api) testImplementation(libs.mockserver.netty) testImplementation(libs.mockserver.client) + testImplementation(project(":core:common:transform-core")) // for the transformer registry impl + + implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform")) testCompileOnly(project(":system-tests:e2e-dataplane-tests:runtimes:data-plane")) } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java new file mode 100644 index 0000000000..16339d97af --- /dev/null +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.restassured.http.ContentType; +import jakarta.json.Json; +import jakarta.json.JsonObject; +import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry; +import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistryImpl; +import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataAddressTransformer; +import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowStartMessageTransformer; +import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataAddressTransformer; +import org.eclipse.edc.connector.dataplane.spi.DataFlow; +import org.eclipse.edc.connector.dataplane.spi.DataFlowStates; +import org.eclipse.edc.connector.dataplane.spi.Endpoint; +import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService; +import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; +import org.eclipse.edc.core.transform.TypeTransformerRegistryImpl; +import org.eclipse.edc.jsonld.util.JacksonJsonLd; +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.eclipse.edc.spi.result.Failure; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.test.e2e.participant.DataPlaneParticipant; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.net.URI; +import java.time.Duration; +import java.util.Map; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage.DATA_FLOW_TERMINATE_MESSAGE_REASON; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +@EndToEndTest +public class DataPlaneSignalingApiEndToEndTest { + + public static final String DATAPLANE_PUBLIC_ENDPOINT_URL = "http://fizz.buzz/bar"; + protected static final DataPlaneParticipant DATAPLANE = DataPlaneParticipant.Builder.newInstance() + .name("provider") + .id("urn:connector:provider") + .build(); + @RegisterExtension + static EdcRuntimeExtension runtime = + new EdcRuntimeExtension( + ":system-tests:e2e-dataplane-tests:runtimes:data-plane", + "data-plane", + DATAPLANE.dataPlaneConfiguration() + ); + + protected final Duration timeout = Duration.ofSeconds(60); + private ObjectMapper mapper; + private SignalingApiTransformerRegistry registry; + + @BeforeEach + void setup() { + // this registry is entirely separate from the one that is included in the runtime + registry = new SignalingApiTransformerRegistryImpl(new TypeTransformerRegistryImpl()); + var builderFactory = Json.createBuilderFactory(Map.of()); + mapper = JacksonJsonLd.createObjectMapper(); + registry.register(new JsonObjectFromDataFlowStartMessageTransformer(builderFactory, mapper)); + registry.register(new JsonObjectFromDataAddressTransformer(builderFactory, mapper)); + registry.register(new JsonObjectToDataAddressTransformer()); + } + + @DisplayName("Verify the POST /v1/dataflows endpoint returns the correct EDR") + @Test + void startTransfer() throws JsonProcessingException { + var generator = runtime.getContext().getService(PublicEndpointGeneratorService.class); + generator.addGeneratorFunction("HttpData", dataAddress -> Endpoint.url(DATAPLANE_PUBLIC_ENDPOINT_URL)); + + var processId = "test-processId"; + var flowMessage = createStartMessage(processId); + var jo = registry.transform(flowMessage, JsonObject.class).orElseThrow(failTest()); + + var resultJson = DATAPLANE.initiateTransfer(jo); + var dataAddress = registry.transform(mapper.readValue(resultJson, JsonObject.class), DataAddress.class) + .orElseThrow(failTest()); + + // verify basic shape of the DSPACE data address (=EDR token) + assertThat(dataAddress).isNotNull(); + assertThat(dataAddress.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP"); + assertThat(dataAddress.getProperties()) + .containsKey("authorization") + .containsEntry("endpoint", DATAPLANE_PUBLIC_ENDPOINT_URL) + .containsEntry("authType", "bearer"); + + // verify that the data flow was created + var store = runtime.getService(DataPlaneStore.class).findById(processId); + assertThat(store).isNotNull(); + } + + @DisplayName("Verify that GET /v1/dataflows/{id}/state returns the correct state") + @Test + void getState() { + var dataFlowId = "test-flowId"; + + var flow = DataFlow.Builder.newInstance() + .id(dataFlowId) + .state(DataFlowStates.STARTED.code()) + .build(); + runtime.getService(DataPlaneStore.class).save(flow); + + var resultJson = DATAPLANE.getDataPlaneSignalingApi() + .baseRequest() + .contentType(ContentType.JSON) + .get("/v1/dataflows/%s/state".formatted(dataFlowId)) + .then() + .body(notNullValue()) + .statusCode(200) + .extract().response().jsonPath(); + + assertThat(resultJson.getString("state")).isEqualToIgnoringCase("STARTED"); + } + + @DisplayName("Verify that POST /v1/dataflows/{id}/terminate terminates the flow, with an optional message") + @Test + void terminate() { + var dataFlowId = "test-flowId"; + + var flow = DataFlow.Builder.newInstance() + .id(dataFlowId) + .source(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://foo.bar/").build()) + .destination(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://fizz.buzz").build()) + .traceContext(Map.of()) + .state(DataFlowStates.STARTED.code()) + .build(); + var store = runtime.getService(DataPlaneStore.class); + store.save(flow); + + var terminateMessage = Json.createObjectBuilder() + .add(TYPE, DataFlowTerminateMessage.DATA_FLOW_TERMINATE_MESSAGE_TYPE) + .add(DATA_FLOW_TERMINATE_MESSAGE_REASON, "test-reason") + .build(); + + DATAPLANE.getDataPlaneSignalingApi() + .baseRequest() + .body(terminateMessage) + .contentType(ContentType.JSON) + .post("/v1/dataflows/%s/terminate".formatted(dataFlowId)) + .then() + .log().ifError() + .statusCode(anyOf(equalTo(200), equalTo(204))); + + + assertThat(store.findById(dataFlowId)).isNotNull() + .extracting(DataFlow::getState) + .isEqualTo(DataFlowStates.TERMINATED.code()); + + } + + private DataFlowStartMessage createStartMessage(String processId) { + return DataFlowStartMessage.Builder.newInstance() + .processId(processId) + .sourceDataAddress(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://foo.bar/").build()) + .destinationDataAddress(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://fizz.buzz").build()) + .flowType(FlowType.PULL) + .participantId("some-participantId") + .assetId("test-asset") + .callbackAddress(URI.create("https://foo.bar/callback")) + .agreementId("test-agreement") + .build(); + } + + @NotNull + private Function failTest() { + return f -> new AssertionError(f.getFailureDetail()); + } +} diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataplaneEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataplaneEndToEndTest.java deleted file mode 100644 index b8edad0a10..0000000000 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataplaneEndToEndTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e; - -import org.eclipse.edc.connector.dataplane.spi.Endpoint; -import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService; -import org.eclipse.edc.junit.annotations.EndToEndTest; -import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; -import org.eclipse.edc.spi.types.domain.DataAddress; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; -import org.eclipse.edc.spi.types.domain.transfer.FlowType; -import org.eclipse.edc.test.e2e.participant.DataPlaneParticipant; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import java.time.Duration; - -import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; - -@EndToEndTest -public class DataplaneEndToEndTest { - - protected static final DataPlaneParticipant DATAPLANE = DataPlaneParticipant.Builder.newInstance() - .name("provider") - .id("urn:connector:provider") - .build(); - @RegisterExtension - static EdcRuntimeExtension runtime = - new EdcRuntimeExtension( - ":system-tests:e2e-dataplane-tests:runtimes:data-plane", - "data-plane", - DATAPLANE.dataPlaneConfiguration() - ); - - protected final Duration timeout = Duration.ofSeconds(60); - - @Test - void startTransfer_httpPull() { - var generator = runtime.getContext().getService(PublicEndpointGeneratorService.class); - generator.addGeneratorFunction("HttpData", dataAddress -> Endpoint.url("http//fizz.buzz.com/bar")); - - var flowMessage = DataFlowStartMessage.Builder.newInstance() - .processId("test-processId") - .sourceDataAddress(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://foo.bar/").build()) - .destinationDataAddress(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://fizz.buzz").build()) - .flowType(FlowType.PULL) - .assetId("test-asset") - .agreementId("test-agreement") - .build(); - } -} diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java index 7fc4e78220..714ce0cda0 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java @@ -16,7 +16,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import io.restassured.http.ContentType; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import jakarta.json.JsonObject; import org.eclipse.edc.test.system.utils.Participant; import org.hamcrest.Matchers; import org.jetbrains.annotations.NotNull; @@ -41,6 +41,10 @@ private DataPlaneParticipant() { super(); } + public Endpoint getDataPlaneSignalingApi() { + return dataPlaneSignalingApi; + } + public Map dataPlaneConfiguration() { return new HashMap<>() { { @@ -65,7 +69,7 @@ public Map dataPlaneConfiguration() { /** * Uses the data plane's control API to initiate a transfer */ - public String initiateTransfer(DataFlowStartMessage startMessage) { + public String initiateTransfer(JsonObject startMessage) { return dataPlaneSignalingApi.baseRequest() .contentType(ContentType.JSON) .body(startMessage)