Skip to content

Commit

Permalink
feat: allow bi-directional data transfers (#4579)
Browse files Browse the repository at this point in the history
* feat: dataplane can generate responseChannel EDRs

* fix tests

* add decision-record

* minor fixes

* make response channel optional

* format, doc

* fix tests

* avoid '/', use '-' instead

* update DR
  • Loading branch information
paullatzelsperger authored Nov 4, 2024
1 parent 504cda1 commit 28b2133
Show file tree
Hide file tree
Showing 22 changed files with 552 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@

public class TransferTypeParserImpl implements TransferTypeParser {

/**
* Parses a compose transfer type string into a {@link TransferType}:
* {@code DESTTYPE-{PUSH|PULL}(-RESPONSETYPE)}, for example {@code HttpData-PULL/Websocket}
*
* @param transferType the transfer type string representation.
* @return a {@link TransferType}
*/
@Override
public Result<TransferType> parse(String transferType) {
Optional<Result<TransferType>> parsed = Optional.ofNullable(transferType)
.map(type -> type.split("-"))
.filter(tokens -> tokens.length == 2)
.map(tokens -> parseFlowType(tokens[1]).map(flowType -> new TransferType(tokens[0], flowType)));
.filter(tokens -> tokens.length >= 2)
.map(tokens -> parseFlowType(tokens[1]).map(flowType -> new TransferType(tokens[0], flowType, tokens.length > 2 ? tokens[2] : null)));

return parsed.orElse(Result.failure("Failed to extract flow type from transferType %s".formatted(transferType)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,10 @@ void shouldReturnFatalError_whenFormatIsNotCorrect() {
assertThat(result).isFailed();
}

@Test
void shouldParseReturnChannel() {
var result = parser.parse("DestinationType-PUSH-BackChannelType");
assertThat(result).isSucceeded().satisfies(type -> assertThat(type.responseChannelType()).isEqualTo("BackChannelType"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

import static java.util.Optional.ofNullable;

class PublicEndpointGeneratorServiceImpl implements PublicEndpointGeneratorService {
private final Map<String, Function<DataAddress, Endpoint>> generatorFunctions = new ConcurrentHashMap<>();
private final Map<String, Supplier<Endpoint>> responseChannelFunctions = new ConcurrentHashMap<>();

@Override
public Result<Endpoint> generateFor(String destinationType, DataAddress sourceDataAddress) {
Expand All @@ -38,13 +42,30 @@ public Result<Endpoint> generateFor(String destinationType, DataAddress sourceDa
return Result.success(endpoint);
}

@Override
public Result<Endpoint> generateResponseFor(String responseChannelType) {
var function = responseChannelFunctions.get(responseChannelType);
return ofNullable(function).map(Supplier::get).map(Result::success)
.orElseGet(() -> Result.failure("No Response Channel Endpoint generator function registered for response channel type '%s'".formatted(responseChannelType)));
}

@Override
public void addGeneratorFunction(String destinationType, Function<DataAddress, Endpoint> generatorFunction) {
generatorFunctions.put(destinationType, generatorFunction);
}

@Override
public void addGeneratorFunction(String responseChannelType, Supplier<Endpoint> generatorFunction) {
responseChannelFunctions.put(responseChannelType, generatorFunction);
}

@Override
public Set<String> supportedDestinationTypes() {
return generatorFunctions.keySet();
}

@Override
public Set<String> supportedResponseTypes() {
return responseChannelFunctions.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.STARTED;
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;
import static org.eclipse.edc.spi.result.Result.success;

/**
* Default data manager implementation.
Expand All @@ -65,7 +66,7 @@ public Result<Boolean> validate(DataFlowStartMessage dataRequest) {
// TODO for now no validation for pull scenario, since the transfer service registry
// is not applicable here. Probably validation only on the source part required.
if (FlowType.PULL.equals(dataRequest.getFlowType())) {
return Result.success(true);
return success(true);
} else {
var transferService = transferServiceRegistry.resolveTransferService(dataRequest);
return transferService != null ?
Expand All @@ -88,7 +89,7 @@ public Result<DataFlowResponseMessage> start(DataFlowStartMessage startMessage)

var response = switch (startMessage.getFlowType()) {
case PULL -> handlePull(startMessage, dataFlowBuilder);
case PUSH -> handlePush(dataFlowBuilder);
case PUSH -> handlePush(startMessage, dataFlowBuilder);
};

return response.onSuccess(m -> update(dataFlowBuilder.build()));
Expand Down Expand Up @@ -174,10 +175,19 @@ private Result<DataFlowResponseMessage> handlePull(DataFlowStartMessage startMes
.build());
}

private Result<DataFlowResponseMessage> handlePush(DataFlow.Builder dataFlowBuilder) {
private Result<DataFlowResponseMessage> handlePush(DataFlowStartMessage startMessage, DataFlow.Builder dataFlowBuilder) {
dataFlowBuilder.state(RECEIVED.code());

return Result.success(DataFlowResponseMessage.Builder.newInstance()
var responseChannelType = startMessage.getTransferType().responseChannelType();
if (responseChannelType != null) {
monitor.debug("PUSH dataflow with responseChannel '%s' received. Will generate data address".formatted(responseChannelType));
var result = authorizationService.createEndpointDataReference(startMessage);

return result.map(da -> DataFlowResponseMessage.Builder.newInstance()
.dataAddress(da)
.build());
}
return success(DataFlowResponseMessage.Builder.newInstance()
.dataAddress(null)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ class PublicEndpointGeneratorServiceImplTest {

private final PublicEndpointGeneratorService generatorService = new PublicEndpointGeneratorServiceImpl();

@Test
void supportedTypes() {
generatorService.addGeneratorFunction("type", dataAddress -> new Endpoint("any", "any"));

var result = generatorService.supportedDestinationTypes();

assertThat(result).containsOnly("type");
}

@Nested
class GenerateFor {

Expand All @@ -54,12 +63,25 @@ void shouldFail_whenFunctionIsNotRegistered() {

}

@Test
void supportedTypes() {
generatorService.addGeneratorFunction("type", dataAddress -> new Endpoint("any", "any"));
@Nested
class GenerateResponseFor {

var result = generatorService.supportedDestinationTypes();
@Test
void shouldGenerateEndpointBasedOnDestinationType() {
var endpoint = new Endpoint("fizz", "bar-type");

generatorService.addGeneratorFunction("destinationType", () -> endpoint);

var result = generatorService.generateResponseFor("destinationType");

assertThat(result).isSucceeded().isEqualTo(endpoint);
}

@Test
void shouldFail_whenFunctionIsNotRegistered() {
var result = generatorService.generateResponseFor("any");
assertThat(result).isFailed();
}

assertThat(result).containsOnly("type");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,66 @@ void terminate_shouldStillTerminate_whenDataFlowHasNoSource() {
verify(store).save(argThat(f -> f.getProperties().containsKey(TERMINATION_REASON)));
}

@Test
void completed_shouldNotifyResultToControlPlane() {
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(transferProcessApiClient.completed(any())).thenReturn(Result.success());

manager.start();

await().untilAsserted(() -> {
verify(transferProcessApiClient).completed(any());
verify(store).save(argThat(it -> it.getState() == NOTIFIED.code()));
});
}

@Test
void completed_shouldNotTransitionToNotified() {
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(transferProcessApiClient.completed(any())).thenReturn(Result.failure(""));

manager.start();

await().untilAsserted(() -> {
verify(transferProcessApiClient).completed(any());
verify(store).save(argThat(it -> it.getState() == COMPLETED.code()));
});
}

@Test
void failed_shouldNotifyResultToControlPlane() {
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);

when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(Result.success());

manager.start();

await().untilAsserted(() -> {
verify(transferProcessApiClient).failed(any(), eq("an error"));
verify(store).save(argThat(it -> it.getState() == NOTIFIED.code()));
});
}

@Test
void failed_shouldNotTransitionToNotified() {
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);

when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(Result.failure("an error"));

manager.start();

await().untilAsserted(() -> {
verify(transferProcessApiClient).failed(any(), eq("an error"));
verify(store).save(argThat(it -> it.getState() == FAILED.code()));
});
}

@Nested
class Received {
@Test
Expand Down Expand Up @@ -412,88 +472,6 @@ void shouldTransitToFailedIfNoTransferServiceCanHandleStarted() {
}
}

@Test
void completed_shouldNotifyResultToControlPlane() {
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(transferProcessApiClient.completed(any())).thenReturn(Result.success());

manager.start();

await().untilAsserted(() -> {
verify(transferProcessApiClient).completed(any());
verify(store).save(argThat(it -> it.getState() == NOTIFIED.code()));
});
}

@Test
void completed_shouldNotTransitionToNotified() {
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(transferProcessApiClient.completed(any())).thenReturn(Result.failure(""));

manager.start();

await().untilAsserted(() -> {
verify(transferProcessApiClient).completed(any());
verify(store).save(argThat(it -> it.getState() == COMPLETED.code()));
});
}

@Test
void failed_shouldNotifyResultToControlPlane() {
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);

when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(Result.success());

manager.start();

await().untilAsserted(() -> {
verify(transferProcessApiClient).failed(any(), eq("an error"));
verify(store).save(argThat(it -> it.getState() == NOTIFIED.code()));
});
}

@Test
void failed_shouldNotTransitionToNotified() {
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);

when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(Result.failure("an error"));

manager.start();

await().untilAsserted(() -> {
verify(transferProcessApiClient).failed(any(), eq("an error"));
verify(store).save(argThat(it -> it.getState() == FAILED.code()));
});
}

private DataFlow.Builder dataFlowBuilder() {
return DataFlow.Builder.newInstance()
.source(DataAddress.Builder.newInstance().type("source").build())
.destination(DataAddress.Builder.newInstance().type("destination").build())
.callbackAddress(URI.create("http://any"))
.transferType(new TransferType("DestinationType", FlowType.PUSH))
.properties(Map.of("key", "value"));
}

private Criterion[] stateIs(int state) {
return aryEq(new Criterion[]{ hasState(state) });
}

private DataFlowStartMessage createRequest() {
return DataFlowStartMessage.Builder.newInstance()
.id("1")
.processId("1")
.sourceDataAddress(DataAddress.Builder.newInstance().type("type").build())
.destinationDataAddress(DataAddress.Builder.newInstance().type("type").build())
.build();
}

@Nested
class Suspend {

Expand Down Expand Up @@ -587,4 +565,26 @@ void shouldStillSuspend_whenDataFlowHasNoSource() {
}
}

private DataFlow.Builder dataFlowBuilder() {
return DataFlow.Builder.newInstance()
.source(DataAddress.Builder.newInstance().type("source").build())
.destination(DataAddress.Builder.newInstance().type("destination").build())
.callbackAddress(URI.create("http://any"))
.transferType(new TransferType("DestinationType", FlowType.PUSH))
.properties(Map.of("key", "value"));
}

private Criterion[] stateIs(int state) {
return aryEq(new Criterion[]{ hasState(state) });
}

private DataFlowStartMessage createRequest() {
return DataFlowStartMessage.Builder.newInstance()
.id("1")
.processId("1")
.sourceDataAddress(DataAddress.Builder.newInstance().type("type").build())
.destinationDataAddress(DataAddress.Builder.newInstance().type("type").build())
.build();
}

}
Loading

0 comments on commit 28b2133

Please sign in to comment.