Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement DataPlaneSignalingAPI #3951

Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
maven/mavencentral/com.apicatalog/carbon-did/0.0.2, Apache-2.0, approved, #9239

Check warning on line 1 in DEPENDENCIES

View workflow job for this annotation

GitHub Actions / check / Dash-Verify-Licenses

Restricted Dependencies found

Some dependencies are marked 'restricted' - please review them
maven/mavencentral/com.apicatalog/iron-ed25519-cryptosuite-2020/0.8.1, Apache-2.0, approved, #11157
maven/mavencentral/com.apicatalog/iron-verifiable-credentials/0.8.1, Apache-2.0, approved, #9234
maven/mavencentral/com.apicatalog/titanium-json-ld/1.0.0, Apache-2.0, approved, clearlydefined
Expand Down Expand Up @@ -182,9 +182,10 @@
maven/mavencentral/joda-time/joda-time/2.10.5, Apache-2.0, approved, clearlydefined
maven/mavencentral/junit/junit/4.13.2, EPL-2.0, approved, CQ23636
maven/mavencentral/net.bytebuddy/byte-buddy-agent/1.14.1, Apache-2.0, approved, #7164
maven/mavencentral/net.bytebuddy/byte-buddy-agent/1.14.11, Apache-2.0, approved, #7164
maven/mavencentral/net.bytebuddy/byte-buddy-agent/1.14.12, Apache-2.0, approved, #7164
maven/mavencentral/net.bytebuddy/byte-buddy/1.14.1, Apache-2.0 AND BSD-3-Clause, approved, #7163
maven/mavencentral/net.bytebuddy/byte-buddy/1.14.11, Apache-2.0 AND BSD-3-Clause, approved, #7163
maven/mavencentral/net.bytebuddy/byte-buddy/1.14.12, Apache-2.0 AND BSD-3-Clause, approved, #7163
maven/mavencentral/net.java.dev.jna/jna/5.13.0, Apache-2.0 AND LGPL-2.1-or-later, approved, #6709
maven/mavencentral/net.javacrumbs.json-unit/json-unit-core/2.36.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/net.minidev/accessors-smart/2.4.7, Apache-2.0, approved, #7515
Expand Down Expand Up @@ -321,8 +322,8 @@
maven/mavencentral/org.mock-server/mockserver-client-java/5.15.0, Apache-2.0 AND LGPL-3.0-only, approved, #9324
maven/mavencentral/org.mock-server/mockserver-core/5.15.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/org.mock-server/mockserver-netty/5.15.0, Apache-2.0, approved, #9276
maven/mavencentral/org.mockito/mockito-core/5.11.0, MIT AND (Apache-2.0 AND MIT) AND Apache-2.0, approved, #13505
maven/mavencentral/org.mockito/mockito-core/5.2.0, MIT AND (Apache-2.0 AND MIT) AND Apache-2.0, approved, #7401
maven/mavencentral/org.mockito/mockito-core/5.9.0, MIT AND (Apache-2.0 AND MIT) AND Apache-2.0, approved, #12774
maven/mavencentral/org.mockito/mockito-inline/5.2.0, MIT, approved, clearlydefined
maven/mavencentral/org.mozilla/rhino/1.7.7.2, MPL-2.0 AND BSD-3-Clause AND ISC, approved, CQ16320
maven/mavencentral/org.objenesis/objenesis/3.3, Apache-2.0, approved, clearlydefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.dataplane.spi.DataFlow;
import org.eclipse.edc.connector.dataplane.spi.DataFlowStates;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.spi.entity.StatefulEntity;
Expand All @@ -29,6 +30,7 @@
import org.eclipse.edc.statemachine.Processor;
import org.eclipse.edc.statemachine.ProcessorImpl;
import org.eclipse.edc.statemachine.StateMachineManager;
import org.jetbrains.annotations.Nullable;

import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -79,32 +81,36 @@ public void initiate(DataFlowStartMessage dataRequest) {
}

@Override
public DataFlowStates transferState(String processId) {
public DataFlowStates getTransferState(String processId) {
return Optional.ofNullable(store.findById(processId)).map(StatefulEntity::getState)
.map(DataFlowStates::from).orElse(null);
}

@Override
public StatusResult<Void> terminate(String dataFlowId) {
public StatusResult<Void> terminate(String dataFlowId, @Nullable String reason) {
var result = store.findByIdAndLease(dataFlowId);
if (result.succeeded()) {
var dataFlow = result.getContent();
var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest());
if (result.failed()) {
return StatusResult.from(result).map(it -> null);
}

if (transferService == null) {
return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId));
}
var dataFlow = result.getContent();
var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest());

var terminateResult = transferService.terminate(dataFlow);
if (terminateResult.failed()) {
if (transferService == null) {
return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId));
}

var terminateResult = transferService.terminate(dataFlow);
if (terminateResult.failed()) {
if (terminateResult.reason().equals(StreamFailure.Reason.NOT_FOUND)) {
monitor.warning("No source was found for DataFlow '%s'. This may indicate an inconsistent state.".formatted(dataFlowId));
} else {
return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail()));
}
dataFlow.transitToTerminated();
store.save(dataFlow);
return StatusResult.success();
} else {
return StatusResult.from(result).map(it -> null);
}
dataFlow.transitToTerminated(reason);
store.save(dataFlow);
return StatusResult.success();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;


Expand All @@ -29,7 +27,7 @@ class PublicEndpointGeneratorServiceImplTest {

@Test
void generateFor() {
var endpoint = new Endpoint(Map.of("fizz", "buzz"), "bar-type");
var endpoint = new Endpoint("fizz", "bar-type");
generatorService.addGeneratorFunction("testtype", dataAddress -> endpoint);

assertThat(generatorService.generateFor(DataAddress.Builder.newInstance().type("testtype").build())).isSucceeded()
Expand All @@ -43,5 +41,5 @@ void generateFor_noFunction() {
.detail()
.isEqualTo("No Endpoint generator function registered for source data type 'testtype'");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class DataPlaneAuthorizationServiceImplTest {

@BeforeEach
void setup() {
when(endpointGenerator.generateFor(any())).thenReturn(Result.success(new Endpoint(Map.of("url", "http://example.com"), "https://w3id.org/idsa/v4.1/HTTP")));
when(endpointGenerator.generateFor(any())).thenReturn(Result.success(Endpoint.url("http://example.com")));
}

@Test
Expand All @@ -70,7 +70,7 @@ void createEndpointDataReference() {
assertThat(result).isSucceeded()
.satisfies(da -> {
assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP");
assertThat(da.getProperties().get("endpoint")).isEqualTo(Map.of("url", "http://example.com"));
assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com");
assertThat(da.getProperties().get("endpointType")).isEqualTo(da.getType());
assertThat(da.getStringProperty("authorization")).isEqualTo("footoken");
});
Expand Down Expand Up @@ -98,7 +98,7 @@ void createEndpointDataReference_withAuthType() {
assertThat(result).isSucceeded()
.satisfies(da -> {
assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP");
assertThat(da.getProperties().get("endpoint")).isEqualTo(Map.of("url", "http://example.com"));
assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com");
assertThat(da.getStringProperty("authorization")).isEqualTo("footoken");
assertThat(da.getStringProperty("authType")).isEqualTo("bearer");
assertThat(da.getStringProperty("fizz")).isEqualTo("buzz");
Expand Down Expand Up @@ -165,4 +165,4 @@ private DataFlowStartMessage.Builder createStartMessage() {
.destinationDataAddress(DataAddress.Builder.newInstance().type("test-dest").build())
.properties(Map.of("foo", "bar", "fizz", "buzz"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.dataplane.spi.DataFlow.TERMINATION_REASON;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.NOTIFIED;
Expand Down Expand Up @@ -124,6 +125,20 @@ void terminate_shouldTerminateDataFlow() {
verify(transferService).terminate(dataFlow);
}

@Test
void terminate_shouldTerminateDataFlow_withReason() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.terminate(any())).thenReturn(StreamResult.success());

var result = manager.terminate("dataFlowId", "test-reason");

assertThat(result).isSucceeded();
verify(store).save(argThat(d -> d.getState() == TERMINATED.code() && d.getProperties().get(TERMINATION_REASON).equals("test-reason")));
verify(transferService).terminate(dataFlow);
}

@Test
void terminate_shouldReturnFatalError_whenDataFlowDoesNotExist() {
when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.notFound("not found"));
Expand Down Expand Up @@ -172,6 +187,19 @@ void terminate_shouldReturnFatalError_whenDataFlowCannotBeTerminated() {
verify(store, never()).save(any());
}

@Test
void terminate_shouldStillTerminate_whenDataFlowHasNoSource() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.terminate(any())).thenReturn(StreamResult.notFound());

var result = manager.terminate("dataFlowId", "test-reason");

assertThat(result).isSucceeded();
verify(store).save(argThat(f -> f.getProperties().containsKey(TERMINATION_REASON)));
}

@Test
void received_shouldStartTransferTransitionAndTransitionToStarted() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import static java.lang.String.format;

@Path("/transfer")
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
@Consumes({ MediaType.APPLICATION_JSON })
@Produces({ MediaType.APPLICATION_JSON })
public class DataPlaneControlApiController implements DataPlaneControlApi {
private final DataPlaneManager dataPlaneManager;

Expand Down Expand Up @@ -65,7 +65,7 @@ public void initiateTransfer(DataFlowStartMessage request, @Suspended AsyncRespo
@Override
@Path("/{transferProcessId}")
public DataFlowStates getTransferState(@PathParam("transferProcessId") String transferProcessId) {
return dataPlaneManager.transferState(transferProcessId);
return dataPlaneManager.getTransferState(transferProcessId);
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

package org.eclipse.edc.connector.api.signaling.configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.Json;
import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry;
import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistryImpl;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataAddressTransformer;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowResponseMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataAddressTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowStartMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowSuspendMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowTerminateMessageTransformer;
import org.eclipse.edc.jsonld.spi.JsonLd;
Expand All @@ -36,6 +39,7 @@
import org.eclipse.edc.web.spi.WebService;
import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer;
import org.eclipse.edc.web.spi.configuration.WebServiceSettings;
import org.jetbrains.annotations.NotNull;

import java.util.Map;

Expand Down Expand Up @@ -87,7 +91,7 @@ public void initialize(ServiceExtensionContext context) {
context.registerService(SignalingApiConfiguration.class, new SignalingApiConfiguration(webServiceConfiguration));

jsonLd.registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, SIGNALING_SCOPE);
var jsonLdMapper = typeManager.getMapper(JSON_LD);
var jsonLdMapper = getJsonLdMapper();
webService.registerResource(webServiceConfiguration.getContextAlias(), new ObjectMapperProvider(jsonLdMapper));
webService.registerResource(webServiceConfiguration.getContextAlias(), new JerseyJsonLdInterceptor(jsonLd, jsonLdMapper, SIGNALING_SCOPE));
}
Expand All @@ -97,10 +101,17 @@ public SignalingApiTransformerRegistry managementApiTypeTransformerRegistry() {
var factory = Json.createBuilderFactory(Map.of());

var registry = new SignalingApiTransformerRegistryImpl(this.transformerRegistry);
registry.register(new JsonObjectToDataFlowStartMessageTransformer());
registry.register(new JsonObjectToDataFlowSuspendMessageTransformer());
registry.register(new JsonObjectToDataFlowTerminateMessageTransformer());
registry.register(new JsonObjectToDataAddressTransformer());
registry.register(new JsonObjectFromDataFlowResponseMessageTransformer(factory));
registry.register(new JsonObjectFromDataAddressTransformer(factory, getJsonLdMapper()));
return registry;
}

@NotNull
private ObjectMapper getJsonLdMapper() {
return typeManager.getMapper(JSON_LD);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ dependencies {
api(project(":spi:data-plane:data-plane-spi"))

implementation(project(":extensions:common:api:control-api-configuration"))
implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform"))
implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-api-configuration"))
implementation(libs.jakarta.rsApi)
testImplementation(libs.restAssured)
testImplementation(testFixtures(project(":extensions:common:http:jersey-core")))

}
edcBuild {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
package org.eclipse.edc.connector.dataplane.api;

import org.eclipse.edc.connector.api.signaling.configuration.SignalingApiConfiguration;
import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry;
import org.eclipse.edc.connector.dataplane.api.controller.v1.DataPlaneSignalingApiController;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.system.ServiceExtension;
Expand All @@ -34,13 +37,21 @@ public class DataPlaneSignalingApiExtension implements ServiceExtension {
@Inject
private SignalingApiConfiguration controlApiConfiguration;

@Inject
private SignalingApiTransformerRegistry transformerRegistry;
@Inject
private DataPlaneAuthorizationService authorizationService;
@Inject
private DataPlaneManager dataPlaneManager;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
webService.registerResource(controlApiConfiguration.getContextAlias(), new DataPlaneSignalingApiController());
var controller = new DataPlaneSignalingApiController(transformerRegistry, authorizationService, dataPlaneManager, context.getMonitor().withPrefix("SignalingAPI"));
webService.registerResource(controlApiConfiguration.getContextAlias(), controller);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.json.JsonObject;
import jakarta.ws.rs.container.AsyncResponse;
import org.eclipse.edc.connector.dataplane.api.model.DataFlowState;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage;
Expand Down Expand Up @@ -51,7 +50,7 @@ public interface DataPlaneSignalingApi {
content = @Content(schema = @Schema(implementation = DataFlowResponseMessageSchema.class))),
}
)
JsonObject start(JsonObject dataFlowStartMessage, AsyncResponse response);
JsonObject start(JsonObject dataFlowStartMessage);

@Operation(description = "Get the current state of a data transfer.",
responses = {
Expand Down
Loading
Loading