Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fetch in batches instead of using Query.max() #1110

Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
release:
types: [ published ]
pull_request:
branches: [ main ]
branches: [ main, 2025-01-29-eclipse-edc-0.7.2 ]

env:
REGISTRY_URL: ghcr.io
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ please see [changelog_updates.md](docs/dev/changelog_updates.md).

- Refactoring: Config as Java Code ([#1051](https://github.com/sovity/edc-ce/pull/1051))
- Fix issues with the Create Data Offer Endpoint ([PR#1055](https://github.com/sovity/edc-ce/pull/1055))
- Fix issue when the number of items is greater than 5000

### Deployment Migration Notes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,7 @@ public static WrapperExtensionContext buildContext(
var cxDidConfigService = new CxDidConfigService(config);
var dapsConfigService = new DapsConfigService(config);
var dashboardDataFetcher = new DashboardDataFetcher(
contractNegotiationStore,
transferProcessService,
assetIndex,
policyDefinitionService,
contractDefinitionService
transferProcessService
);
var dashboardApiService = new DashboardPageApiService(
dashboardDataFetcher,
Expand Down Expand Up @@ -315,7 +311,8 @@ public static WrapperExtensionContext buildContext(
var useCaseResource = new UseCaseResourceImpl(
kpiApiService,
supportedPolicyApiService,
useCaseCatalogApiService
useCaseCatalogApiService,
dslContextFactory
);
val placeholderEndpointController = new PlaceholderEndpointController();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class UiResourceImpl implements UiResource {

@Override
public DashboardPage getDashboardPage() {
return dashboardPageApiService.dashboardPage();
return dslContextFactory.transactionResult(dashboardPageApiService::dashboardPage);
}

@Override
Expand Down Expand Up @@ -159,13 +159,13 @@ public UiContractNegotiation getContractNegotiation(String contractNegotiationId
@Override
public ContractAgreementPage getContractAgreementPage(@Nullable ContractAgreementPageQuery contractAgreementPageQuery) {
return dslContextFactory.transactionResult(dsl ->
contractAgreementApiService.contractAgreementPage(dsl, contractAgreementPageQuery));
contractAgreementApiService.contractAgreementPage(dsl, contractAgreementPageQuery));
}

@Override
public ContractAgreementCard getContractAgreementCard(String contractAgreementId) {
return dslContextFactory.transactionResult(dsl ->
contractAgreementApiService.contractAgreement(dsl, contractAgreementId));
contractAgreementApiService.contractAgreement(dsl, contractAgreementId));
}

@Override
Expand All @@ -180,12 +180,12 @@ public IdResponseDto initiateCustomTransfer(InitiateCustomTransferRequest reques

@Override
public IdResponseDto terminateContractAgreement(
String contractAgreementId,
ContractTerminationRequest contractTerminationRequest
String contractAgreementId,
ContractTerminationRequest contractTerminationRequest
) {
validate(contractTerminationRequest);
return dslContextFactory.transactionResult(dsl ->
contractAgreementTerminationApiService.terminate(dsl, contractAgreementId, contractTerminationRequest));
contractAgreementTerminationApiService.terminate(dsl, contractAgreementId, contractTerminationRequest));
}

@Override
Expand All @@ -201,18 +201,18 @@ public UiAsset getTransferProcessAsset(String transferProcessId) {
@Override
public IdAvailabilityResponse isPolicyIdAvailable(String policyId) {
return dslContextFactory.transactionResult(dsl ->
dataOfferPageApiService.checkIfPolicyIdAvailable(dsl, policyId));
dataOfferPageApiService.checkIfPolicyIdAvailable(dsl, policyId));
}

@Override
public IdAvailabilityResponse isAssetIdAvailable(String assetId) {
return dslContextFactory.transactionResult(dsl ->
dataOfferPageApiService.checkIfAssetIdAvailable(dsl, assetId));
dataOfferPageApiService.checkIfAssetIdAvailable(dsl, assetId));
}

@Override
public IdAvailabilityResponse isContractDefinitionIdAvailable(String contractDefinitionId) {
return dslContextFactory.transactionResult(dsl ->
dataOfferPageApiService.checkIfContractDefinitionIdAvailable(dsl, contractDefinitionId));
dataOfferPageApiService.checkIfContractDefinitionIdAvailable(dsl, contractDefinitionId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,20 @@

package de.sovity.edc.ext.wrapper.api.ui.pages.asset;

import de.sovity.edc.ext.db.jooq.Tables;
import de.sovity.edc.ext.wrapper.api.ServiceException;
import de.sovity.edc.ext.wrapper.api.common.mappers.AssetMapper;
import de.sovity.edc.ext.wrapper.api.common.model.UiAsset;
import de.sovity.edc.ext.wrapper.api.common.model.UiAssetCreateRequest;
import de.sovity.edc.ext.wrapper.api.common.model.UiAssetEditRequest;
import de.sovity.edc.ext.wrapper.api.ui.model.IdResponseDto;
import de.sovity.edc.ext.wrapper.api.ui.pages.dashboard.services.SelfDescriptionService;
import de.sovity.edc.ext.wrapper.utils.QueryUtils;
import lombok.RequiredArgsConstructor;
import lombok.val;
import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.spi.query.QuerySpec;
import org.jetbrains.annotations.NotNull;
import org.jooq.DSLContext;

import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -75,14 +74,10 @@ public IdResponseDto deleteAsset(String assetId) {
}

private List<Asset> getAllAssets() {
return assetService.query(QuerySpec.max()).orElseThrow(ServiceException::new).toList();
}

public boolean assetExists(DSLContext dsl, String assetId) {
val a = Tables.EDC_ASSET;
return dsl.selectCount()
.from(a)
.where(a.ASSET_ID.eq(assetId))
.fetchSingleInto(Integer.class) > 0;
return QueryUtils.fetchInBatches((offset, size) ->
assetService.search(
QuerySpec.Builder.newInstance().offset(offset).limit(size).build()
).orElseThrow(ServiceException::new)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import de.sovity.edc.ext.db.jooq.tables.records.SovityContractTerminationRecord;
import de.sovity.edc.ext.wrapper.api.ServiceException;
import de.sovity.edc.ext.wrapper.utils.MapUtils;
import de.sovity.edc.ext.wrapper.utils.QueryUtils;
import lombok.RequiredArgsConstructor;
import lombok.val;
import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
Expand All @@ -31,6 +31,7 @@
import org.jetbrains.annotations.NotNull;
import org.jooq.DSLContext;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -55,7 +56,7 @@ public class ContractAgreementDataFetcher {
@NotNull
public List<ContractAgreementData> getContractAgreements(DSLContext dsl) {
var agreements = getAllContractAgreements();
var assets = MapUtils.associateBy(getAllAssets(), Asset::getId);
var assets = new HashMap<String, Asset>();

var negotiations = getAllContractNegotiations().stream()
.filter(it -> it.getContractAgreement() != null)
Expand All @@ -73,7 +74,12 @@ public List<ContractAgreementData> getContractAgreements(DSLContext dsl) {
.flatMap(agreement -> negotiations.getOrDefault(agreement.getId(), List.of())
.stream()
.map(negotiation -> {
var asset = getAsset(agreement, negotiation, assets::get);
var asset = getAsset(
agreement.getAssetId(),
negotiation.getType(),
(id) -> assets.computeIfAbsent(id, assetIndex::findById)
);

var contractTransfers = transfers.getOrDefault(agreement.getId(), List.of());
return new ContractAgreementData(agreement, negotiation, asset, contractTransfers, terminations.get(agreement.getId()));
}))
Expand All @@ -84,8 +90,12 @@ public List<ContractAgreementData> getContractAgreements(DSLContext dsl) {
public ContractAgreementData getContractAgreement(DSLContext dsl, String contractAgreementId) {
val agreement = getContractAgreementById(contractAgreementId);

val negotiationQuery = QuerySpec.max();
val negotiation = contractNegotiationStore.queryNegotiations(negotiationQuery)
val negotiation = QueryUtils.fetchInBatches((offset, limit) ->
contractNegotiationStore.queryNegotiations(
QuerySpec.Builder.newInstance().offset(offset).limit(limit).build())
.toList()
)
.stream()
.filter(it -> it.getContractAgreement().getId().equals(contractAgreementId))
.findFirst()
.orElseThrow(
Expand All @@ -95,7 +105,7 @@ public ContractAgreementData getContractAgreement(DSLContext dsl, String contrac

val terminations = fetchTerminations(dsl, agreement.getId());

val asset = getAsset(agreement, negotiation, (it) -> assetIndex.findById(agreement.getAssetId()));
val asset = getAsset(agreement.getAssetId(), negotiation.getType(), (it) -> assetIndex.findById(agreement.getAssetId()));

return new ContractAgreementData(
agreement,
Expand Down Expand Up @@ -128,10 +138,8 @@ private Map<String, SovityContractTerminationRecord> fetchTerminations(DSLContex
.collect(toMap(SovityContractTerminationRecord::getContractAgreementId, identity()));
}

private Asset getAsset(ContractAgreement agreement, ContractNegotiation negotiation, Function<String, Asset> selector) {
var assetId = agreement.getAssetId();

if (negotiation.getType() == ContractNegotiation.Type.CONSUMER) {
private Asset getAsset(String assetId, ContractNegotiation.Type negotiationType, Function<String, Asset> selector) {
if (negotiationType == ContractNegotiation.Type.CONSUMER) {
return dummyAsset(assetId);
}

Expand All @@ -143,22 +151,30 @@ private Asset dummyAsset(String assetId) {
return Asset.Builder.newInstance().id(assetId).build();
}

private List<Asset> getAllAssets() {
return assetIndex.queryAssets(QuerySpec.max()).toList();
}

@NotNull
private List<ContractNegotiation> getAllContractNegotiations() {
return contractNegotiationStore.queryNegotiations(QuerySpec.max()).toList();
return QueryUtils.fetchInBatches(
(offset, batchSize) -> contractNegotiationStore.queryNegotiations(
QuerySpec.Builder.newInstance().offset(offset).limit(batchSize).build()
).toList()
);
}

@NotNull
private List<ContractAgreement> getAllContractAgreements() {
return contractAgreementService.query(QuerySpec.max()).orElseThrow(ServiceException::new).toList();
return QueryUtils.fetchInBatches(
(offset, batchSize) -> contractAgreementService.search(
QuerySpec.Builder.newInstance().offset(offset).limit(batchSize).build()
).orElseThrow(ServiceException::new)
);
}

@NotNull
private List<TransferProcess> getAllTransferProcesses() {
return transferProcessService.query(QuerySpec.max()).orElseThrow(ServiceException::new).toList();
return QueryUtils.fetchInBatches(
(offset, batchSize) -> transferProcessService.search(
QuerySpec.Builder.newInstance().offset(offset).limit(batchSize).build()
).orElseThrow(ServiceException::new)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
import de.sovity.edc.ext.wrapper.api.ui.model.ContractDefinitionEntry;
import de.sovity.edc.ext.wrapper.api.ui.model.ContractDefinitionRequest;
import de.sovity.edc.ext.wrapper.api.ui.model.IdResponseDto;
import de.sovity.edc.ext.wrapper.utils.QueryUtils;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.services.spi.contractdefinition.ContractDefinitionService;
import org.eclipse.edc.spi.entity.Entity;
import org.eclipse.edc.spi.query.QuerySpec;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.FileWriter;
import java.util.Comparator;
import java.util.List;

Expand All @@ -43,9 +41,9 @@ public List<ContractDefinitionEntry> getContractDefinitions() {
var definitions = getAllContractDefinitions();

return definitions.stream()
.sorted(Comparator.comparing(ContractDefinition::getCreatedAt).reversed())
.map(this::buildContractDefinitionEntry)
.toList();
.sorted(Comparator.comparing(ContractDefinition::getCreatedAt).reversed())
.map(this::buildContractDefinitionEntry)
.toList();
}

@NotNull
Expand All @@ -72,6 +70,13 @@ public IdResponseDto deleteContractDefinition(String contractDefinitionId) {
}

private List<ContractDefinition> getAllContractDefinitions() {
return contractDefinitionService.query(QuerySpec.max()).orElseThrow(ServiceException::new).toList();
return QueryUtils.fetchInBatches((offset, limit) ->
contractDefinitionService.search(
QuerySpec.Builder.newInstance()
.offset(offset)
.limit(limit)
.build()
).orElseThrow(ServiceException::new)
);
}
}
Loading
Loading