From 1d74c05fff2cdbf1194a50de8aa5f3068af55a91 Mon Sep 17 00:00:00 2001 From: Andrii Yurkevych Date: Thu, 28 Nov 2024 08:31:49 +0000 Subject: [PATCH] fix: Fatal error while resume PULL transfer from provider (#4647) * fix: DOS-1413 error while resume PULL transfer from provider * fix: DOS-1413 error while resume PULL transfer from provider tests improvement * fix: DOS-1413 error while resume PULL transfer from provider tests improvement * fix: unit test shouldResumeTransfer_whenDataPlaneRestarts --- .../test/system/utils/Participant.java | 5 ++ .../transfer/spi/types/TransferProcess.java | 2 +- .../spi/types/TransferProcessTest.java | 22 ++++++- .../test/e2e/TransferEndToEndTestBase.java | 11 ---- .../test/e2e/TransferPullEndToEndTest.java | 59 ++++++++++++++++--- .../test/e2e/TransferPushEndToEndTest.java | 4 +- .../e2e/TransferStreamingEndToEndTest.java | 10 ++-- 7 files changed, 83 insertions(+), 30 deletions(-) diff --git a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java index f7f336ecb9f..d05c6e59bde 100644 --- a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java +++ b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java @@ -23,6 +23,7 @@ import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset; import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition; import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.jsonld.TitaniumJsonLd; import org.eclipse.edc.jsonld.spi.JsonLd; import org.eclipse.edc.jsonld.util.JacksonJsonLd; @@ -498,6 +499,10 @@ public String getContractNegotiationState(String id) { return getContractNegotiationField(id, "state"); } + public void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) { + await().atMost(timeout).until(() -> getTransferProcessState(transferProcessId), it -> Objects.equals(it, state.name())); + } + protected String getContractNegotiationField(String negotiationId, String fieldName) { return managementEndpoint.baseRequest() .contentType(JSON) diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java index 189013c00c8..55ada0187d4 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java @@ -288,7 +288,7 @@ public void transitionStarting() { } public boolean canBeStartedConsumer() { - return currentStateIsOneOf(STARTED, REQUESTED, STARTING, RESUMED); + return currentStateIsOneOf(STARTED, REQUESTED, STARTING, RESUMED, SUSPENDED); } public void transitionStarted(String dataPlaneId) { diff --git a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java index 7871d8a3d96..66388ff9bc2 100644 --- a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java +++ b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java @@ -31,6 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.CONSUMER; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -98,8 +99,11 @@ void verifyConsumerTransitions() { assertThrows(IllegalStateException.class, process::transitionStarting, "STARTING is not a valid state for consumer"); process.transitionStarted("dataPlaneId"); - // should not set the data plane id - assertThat(process.getDataPlaneId()).isNull(); + + process.transitionSuspending("suspension"); + process.transitionSuspended(); + + process.transitionStarted("dataPlaneId"); process.transitionCompleting(); process.transitionCompleted(); @@ -108,6 +112,20 @@ void verifyConsumerTransitions() { process.transitionDeprovisioned(); } + @ParameterizedTest + @EnumSource(value = TransferProcessStates.class, mode = INCLUDE, names = { "STARTING", "SUSPENDED" }) + void shouldNotSetDataPlaneIdOnStart_whenTransferIsConsumer(TransferProcessStates fromState) { + var process = TransferProcess.Builder.newInstance() + .id(UUID.randomUUID().toString()).type(CONSUMER) + .state(fromState.code()) + .build(); + + process.transitionStarted("dataPlaneId"); + + assertThat(process.stateAsString()).isEqualTo(STARTED.name()); + assertThat(process.getDataPlaneId()).isNull(); + } + @Test void verifyProviderTransitions() { var process = TransferProcess.Builder.newInstance().id(UUID.randomUUID().toString()).type(TransferProcess.Type.PROVIDER).build(); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java index e3cdf7c161e..3036541641b 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java @@ -15,17 +15,14 @@ package org.eclipse.edc.test.e2e; import jakarta.json.JsonObject; -import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.spi.security.Vault; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import java.time.Duration; import java.util.Map; -import java.util.Objects; import java.util.UUID; -import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; import static org.eclipse.edc.junit.testfixtures.TestUtils.getResourceFileContentAsString; @@ -70,12 +67,4 @@ protected void createResourcesOnProvider(String assetId, Map dat PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), noConstraintPolicyId, noConstraintPolicyId); } - - protected void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) { - await().atMost(timeout).until( - () -> CONSUMER.getTransferProcessState(transferProcessId), - it -> Objects.equals(it, state.name()) - ); - } - } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 4178e1ae2f3..413bf892066 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -127,7 +127,7 @@ void httpPull_dataTransfer_withCallbacks() { .withCallbacks(callbacks) .execute(); - awaitTransferToBeInState(transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull()); @@ -150,7 +150,7 @@ void httpPull_dataTransfer_withEdrCache() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); @@ -168,7 +168,7 @@ void httpPull_dataTransfer_withEdrCache() { } @Test - void suspendAndResume_httpPull_dataTransfer_withEdrCache() { + void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, httpSourceDataAddress()); @@ -177,7 +177,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); @@ -186,7 +186,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() { CONSUMER.suspendTransfer(transferProcessId, "supension"); - awaitTransferToBeInState(transferProcessId, SUSPENDED); + CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED); // checks that the EDR is gone once the transfer has been suspended await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); @@ -196,7 +196,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() { CONSUMER.resumeTransfer(transferProcessId); // check that transfer is available again - awaitTransferToBeInState(transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); var secondMessage = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); @@ -204,6 +204,47 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() { providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); } + @Test + void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { + providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, httpSourceDataAddress()); + + var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + .withTransferType("HttpData-PULL") + .execute(); + + CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); + + var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); + + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); + + var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() + .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) + .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); + + PROVIDER.suspendTransfer(providerTransferProcessId, "supension"); + + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED); + + // checks that the EDR is gone once the transfer has been suspended + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId))); + // checks that transfer fails + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); + + PROVIDER.resumeTransfer(providerTransferProcessId); + + // check that transfer is available again + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); + var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); + var secondMessage = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); + + providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); + } + @Test void pullFromHttp_httpProvision() { providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); @@ -222,7 +263,7 @@ void pullFromHttp_httpProvision() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); await().atMost(timeout).untilAsserted(() -> { var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); @@ -246,7 +287,7 @@ void shouldTerminateTransfer_whenContractExpires_fixedInForcePeriod() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, TERMINATED); + CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED); } @Test @@ -261,7 +302,7 @@ void shouldTerminateTransfer_whenContractExpires_durationInForcePeriod() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, TERMINATED); + CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED); } public JsonObject createCallback(String url, boolean transactional, Set events) { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java index 4ee160a7223..ae400fe707d 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java @@ -82,7 +82,7 @@ void httpPushDataTransfer() { .withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination")) .withTransferType("HttpData-PUSH").execute(); - awaitTransferToBeInState(transferProcessId, COMPLETED); + CONSUMER.awaitTransferToBeInState(transferProcessId, COMPLETED); providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); consumerDataDestination.verify(HttpRequest.request("/destination").withBody(BinaryBody.binary("data".getBytes()))); @@ -109,7 +109,7 @@ void httpToHttp_oauth2Provisioning() { .withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination")) .withTransferType("HttpData-PUSH").execute(); - awaitTransferToBeInState(transferProcessId, COMPLETED); + CONSUMER.awaitTransferToBeInState(transferProcessId, COMPLETED); oauth2server.verify(HttpRequest.request("/token").withBody("grant_type=client_credentials&client_secret=supersecret&client_id=clientId")); providerDataSource.verify(HttpRequest.request("/source").withMethod("GET").withHeader("Authorization", "Bearer token")); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index b057f1bfdfe..e169dfb6093 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -146,7 +146,7 @@ void shouldResumeTransfer_whenDataPlaneRestarts() { PROVIDER_DATA_PLANE_RUNTIME.boot(false); - awaitTransferToBeInState(transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); assertMessagesAreSentTo(consumer); } } @@ -189,7 +189,7 @@ void kafkaToHttpTransfer() { destinationServer.verify(request, atLeast(1)); }); - awaitTransferToBeInState(transferProcessId, TERMINATED); + CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED); destinationServer.clear(request) .when(request).respond(response()); @@ -218,7 +218,7 @@ void kafkaToKafkaTransfer() { .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); - awaitTransferToBeInState(transferProcessId, TERMINATED); + CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED); assertNoMoreMessagesAreSentTo(consumer); } } @@ -236,11 +236,11 @@ void shouldSuspendAndResumeTransfer() { assertMessagesAreSentTo(consumer); CONSUMER.suspendTransfer(transferProcessId, "any kind of reason"); - awaitTransferToBeInState(transferProcessId, SUSPENDED); + CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED); assertNoMoreMessagesAreSentTo(consumer); CONSUMER.resumeTransfer(transferProcessId); - awaitTransferToBeInState(transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); assertMessagesAreSentTo(consumer); } }