Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make Services return Lists and not Streams #3685

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void setUp() {
}

@Test
void query_shouldReturnOneDatasetPerAsset() {
void search_shouldReturnOneDatasetPerAsset() {
var dataService = createDataService();
var contractDefinition = contractDefinitionBuilder("definitionId").contractPolicyId("contractPolicyId").build();
var contractPolicy = Policy.Builder.newInstance().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.eclipse.edc.validator.spi.DataAddressValidatorRegistry;

import java.util.List;
import java.util.stream.Stream;

import static java.lang.String.format;

Expand Down Expand Up @@ -59,14 +58,12 @@ public Asset findById(String assetId) {
}

@Override
public ServiceResult<Stream<Asset>> query(QuerySpec query) {
var result = queryValidator.validate(query);

if (result.failed()) {
return ServiceResult.badRequest(result.getFailureMessages());
}

return ServiceResult.success(transactionContext.execute(() -> index.queryAssets(query)));
public ServiceResult<List<Asset>> search(QuerySpec query) {
return queryValidator.validate(query)
.flatMap(validation -> validation.failed()
? ServiceResult.badRequest(validation.getFailureMessages())
: ServiceResult.success(queryAssets(query))
);
}

@Override
Expand Down Expand Up @@ -128,4 +125,12 @@ public ServiceResult<Asset> update(Asset asset) {
});
}

private List<Asset> queryAssets(QuerySpec query) {
return transactionContext.execute(() -> {
try (var stream = index.queryAssets(query)) {
return stream.toList();
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.util.stream.Stream;
import java.util.List;

import static java.lang.String.format;
import static org.eclipse.edc.spi.query.Criterion.criterion;
Expand All @@ -45,14 +45,12 @@ public ContractAgreement findById(String contractAgreementId) {
}

@Override
public ServiceResult<Stream<ContractAgreement>> query(QuerySpec query) {
var result = queryValidator.validate(query);

if (result.failed()) {
return ServiceResult.badRequest(format("Error validating schema: %s", result.getFailureDetail()));
}

return ServiceResult.success(transactionContext.execute(() -> store.queryAgreements(query)));
public ServiceResult<List<ContractAgreement>> search(QuerySpec query) {
return queryValidator.validate(query)
.flatMap(validation -> validation.failed()
? ServiceResult.badRequest(format("Error validating schema: %s", validation.getFailureDetail()))
: ServiceResult.success(queryAgreements(query))
);
}

@Override
Expand All @@ -61,4 +59,12 @@ public ContractNegotiation findNegotiation(String contractAgreementId) {
var query = QuerySpec.Builder.newInstance().filter(criterion).build();
return transactionContext.execute(() -> store.queryNegotiations(query).findFirst().orElse(null));
}

private List<ContractAgreement> queryAgreements(QuerySpec query) {
return transactionContext.execute(() -> {
try (var stream = store.queryAgreements(query)) {
return stream.toList();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.eclipse.edc.spi.result.ServiceResult;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.util.stream.Stream;
import java.util.List;

import static java.lang.String.format;

Expand All @@ -46,13 +46,12 @@ public ContractDefinition findById(String contractDefinitionId) {
}

@Override
public ServiceResult<Stream<ContractDefinition>> query(QuerySpec query) {
var result = queryValidator.validate(query);

if (result.failed()) {
return ServiceResult.badRequest(format("Error validating schema: %s", result.getFailureDetail()));
}
return ServiceResult.success(transactionContext.execute(() -> store.findAll(query)));
public ServiceResult<List<ContractDefinition>> search(QuerySpec query) {
return queryValidator.validate(query)
.flatMap(validation -> validation.failed()
? ServiceResult.badRequest(format("Error validating schema: %s", validation.getFailureDetail()))
: ServiceResult.success(queryContractDefinitions(query))
);
}

@Override
Expand Down Expand Up @@ -88,4 +87,12 @@ public ServiceResult<ContractDefinition> delete(String contractDefinitionId) {
return serviceResult;
});
}

private List<ContractDefinition> queryContractDefinitions(QuerySpec query) {
return transactionContext.execute(() -> {
try (var stream = store.findAll(query)) {
return stream.toList();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import static java.lang.String.format;
import static java.util.Optional.ofNullable;
Expand Down Expand Up @@ -57,13 +57,12 @@ public ContractNegotiation findbyId(String contractNegotiationId) {
}

@Override
public ServiceResult<Stream<ContractNegotiation>> query(QuerySpec query) {
var result = queryValidator.validate(query);

if (result.failed()) {
return ServiceResult.badRequest(format("Error validating schema: %s", result.getFailureDetail()));
}
return ServiceResult.success(transactionContext.execute(() -> store.queryNegotiations(query)));
public ServiceResult<List<ContractNegotiation>> search(QuerySpec query) {
return queryValidator.validate(query)
.flatMap(validation -> validation.failed()
? ServiceResult.badRequest(format("Error validating schema: %s", validation.getFailureDetail()))
: ServiceResult.success(queryNegotiations(query))
);
}

@Override
Expand Down Expand Up @@ -92,4 +91,12 @@ public ServiceResult<Void> terminate(TerminateNegotiationCommand command) {
return transactionContext.execute(() -> commandHandlerRegistry.execute(command).flatMap(ServiceResult::from));
}

private List<ContractNegotiation> queryNegotiations(QuerySpec query) {
return transactionContext.execute(() -> {
try (var stream = store.queryNegotiations(query)) {
return stream.toList();
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.eclipse.edc.spi.query.Criterion.criterion;
Expand Down Expand Up @@ -64,16 +63,14 @@ public PolicyDefinition findById(String policyId) {
}

@Override
public ServiceResult<Stream<PolicyDefinition>> query(QuerySpec query) {
var result = queryValidator.validate(query);

if (result.failed()) {
return ServiceResult.badRequest(format("Error validating schema: %s", result.getFailureDetail()));
}
return ServiceResult.success(transactionContext.execute(() -> policyStore.findAll(query)));
public ServiceResult<List<PolicyDefinition>> search(QuerySpec query) {
return queryValidator.validate(query)
.flatMap(validation -> validation.failed()
? ServiceResult.badRequest(format("Error validating schema: %s", validation.getFailureDetail()))
: ServiceResult.success(queryPolicyDefinitions(query))
);
}


@Override
public @NotNull ServiceResult<PolicyDefinition> deleteById(String policyId) {
return transactionContext.execute(() -> {
Expand Down Expand Up @@ -120,6 +117,14 @@ public ServiceResult<PolicyDefinition> update(PolicyDefinition policyDefinition)
});
}

private List<PolicyDefinition> queryPolicyDefinitions(QuerySpec query) {
return transactionContext.execute(() -> {
try (var stream = policyStore.findAll(query)) {
return stream.toList();
}
});
}

private Map<Class<?>, List<Class<?>>> getSubtypeMap() {
return Map.of(
Constraint.class, List.of(MultiplicityConstraint.class, AtomicConstraint.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import static java.lang.String.format;

Expand Down Expand Up @@ -74,13 +73,12 @@ public TransferProcessServiceImpl(TransferProcessStore transferProcessStore, Tra
}

@Override
public ServiceResult<Stream<TransferProcess>> query(QuerySpec query) {
var result = queryValidator.validate(query);

if (result.failed()) {
return ServiceResult.badRequest(format("Error validating schema: %s", result.getFailureDetail()));
}
return ServiceResult.success(transactionContext.execute(() -> transferProcessStore.findAll(query)));
public ServiceResult<List<TransferProcess>> search(QuerySpec query) {
return queryValidator.validate(query)
.flatMap(validation -> validation.failed()
? ServiceResult.badRequest(format("Error validating schema: %s", validation.getFailureDetail()))
: ServiceResult.success(queryTransferProcesses(query))
);
}

@Override
Expand Down Expand Up @@ -137,6 +135,14 @@ public ServiceResult<Void> addProvisionedResource(String transferProcessId, Prov
return transactionContext.execute(() -> commandHandlerRegistry.execute(command).flatMap(ServiceResult::from));
}

private List<TransferProcess> queryTransferProcesses(QuerySpec query) {
return transactionContext.execute(() -> {
try (var stream = transferProcessStore.findAll(query)) {
return stream.toList();
}
});
}

private Map<Class<?>, List<Class<?>>> getSubtypes() {
return Map.of(
ProvisionedResource.class, List.of(ProvisionedDataAddressResource.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,32 +90,32 @@ void findById_shouldRelyOnAssetIndex() {
}

@Test
void query_shouldRelyOnAssetIndex() {
void search_shouldRelyOnAssetIndex() {
var asset = createAsset("assetId");
when(index.queryAssets(any(QuerySpec.class))).thenReturn(Stream.of(asset));

var assets = service.query(QuerySpec.none());
var assets = service.search(QuerySpec.none());

assertThat(assets.succeeded()).isTrue();
assertThat(assets.getContent()).hasSize(1).first().matches(hasId("assetId"));
}

@ParameterizedTest
@ValueSource(strings = { Asset.PROPERTY_ID, Asset.PROPERTY_NAME, Asset.PROPERTY_DESCRIPTION, Asset.PROPERTY_VERSION, Asset.PROPERTY_CONTENT_TYPE })
void query_validFilter(String filter) {
void search_validFilter(String filter) {
var query = QuerySpec.Builder.newInstance().filter(criterion(filter, "=", "somevalue")).build();

service.query(query);
service.search(query);

verify(index).queryAssets(query);
}

@ParameterizedTest
@ArgumentsSource(InvalidFilters.class)
void query_invalidFilter(Criterion filter) {
void search_invalidFilter(Criterion filter) {
var query = QuerySpec.Builder.newInstance().filter(filter).build();

var result = service.query(query);
var result = service.search(query);

assertThat(result).isFailed().extracting(Failure::getMessages).asList().hasSize(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
Expand All @@ -40,9 +41,9 @@

class ContractAgreementServiceImplTest {

private final ContractNegotiationStore store = mock(ContractNegotiationStore.class);
private final ContractNegotiationStore store = mock();
private final TransactionContext transactionContext = new NoopTransactionContext();
private final ContractAgreementServiceImpl service = new ContractAgreementServiceImpl(store, transactionContext);
private final ContractAgreementService service = new ContractAgreementServiceImpl(store, transactionContext);

@Test
void findById_filtersById() {
Expand All @@ -64,11 +65,11 @@ void findById_returnsNullIfNotFound() {
}

@Test
void query_filtersBySpec() {
void search_filtersBySpec() {
var agreement = createContractAgreement("agreementId");
when(store.queryAgreements(isA(QuerySpec.class))).thenReturn(Stream.of(agreement));

var result = service.query(QuerySpec.none());
var result = service.search(QuerySpec.none());

assertThat(result.succeeded()).isTrue();
assertThat(result.getContent()).hasSize(1).first().matches(it -> it.getId().equals("agreementId"));
Expand Down
Loading
Loading