From f46f75750bf3e20a9513ef7f07e7eaccc3236659 Mon Sep 17 00:00:00 2001
From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com>
Date: Thu, 23 Jan 2025 10:54:21 +0100
Subject: [PATCH] don't deserialize every bucket upon adding a cblock
---
.../models/config/XodusStoreFactory.java | 16 ++++++++++++----
.../conquery/models/events/BucketManager.java | 10 +++++-----
.../bakdata/conquery/models/events/CBlock.java | 6 ++++--
.../models/jobs/CalculateCBlocksJob.java | 16 ++++++++--------
4 files changed, 29 insertions(+), 19 deletions(-)
diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java
index 178b2f1da3..36ccea46b2 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java
@@ -167,10 +167,13 @@ public class XodusStoreFactory implements StoreFactory {
private boolean loadEnvironmentWithMissingStores;
/**
- * See CaffeinSpec
+ * Cache spec for deserialized values.
+ * Conquery currently depends on softValues to avoid data race conditions.
+ * So a specification must include this option.
+ * See CaffeineSpec
*/
@NotEmpty
- @ValidCaffeineSpec
+ @ValidCaffeineSpec(softValue = true)
private String caffeineSpec = "softValues";
private boolean loadStoresOnStart = false;
@@ -198,6 +201,11 @@ public class XodusStoreFactory implements StoreFactory {
@JacksonInject(useInput = OptBoolean.FALSE)
private transient MetricRegistry metricRegistry;
+ @JsonIgnore
+ private CaffeineSpec getCaffeineSpecParsed() {
+ return CaffeineSpec.parse(getCaffeineSpec());
+ }
+
@Override
public Collection<NamespaceStorage> discoverNamespaceStorages() {
return loadNamespacedStores("dataset_", (storePath) -> new NamespaceStorage(this, storePath), NAMESPACE_STORES);
@@ -301,7 +309,7 @@ public Store createStore(Environment environment, Valid
getUnreadableDataDumpDirectory(),
getReaderExecutorService()
),
- CaffeineSpec.parse(getCaffeineSpec()),
+ getCaffeineSpecParsed(),
metricRegistry
);
}
@@ -429,7 +437,7 @@ public SingletonStore createIdMappingStore(String pathName, ObjectM
openStoresInEnv.put(bigStore.getDataXodusStore().getEnvironment(), bigStore.getDataXodusStore());
openStoresInEnv.put(bigStore.getMetaXodusStore().getEnvironment(), bigStore.getMetaXodusStore());
- return new SingletonStore<>(bigStore);
+ return new SingletonStore<>(new CachedStore<>(bigStore, getCaffeineSpecParsed(), getMetricRegistry()));
}
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java
index 3b9c0edba0..4729463568 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java
@@ -167,7 +167,7 @@ public void fullUpdate() {
}
log.warn("CBlock[{}] missing in Storage. Queuing recalculation", cBlockId);
- job.addCBlock(bucketId.resolve(), connector);
+ job.addCBlock(bucketId, connector);
}));
}
@@ -205,7 +205,7 @@ public void addBucket(Bucket bucket) {
.flatMap(concept -> concept.getConnectors().stream())
.filter(connector -> connector.getResolvedTableId().equals(bucket.getTable()))
.filter(connector -> !hasCBlock(new CBlockId(bucket.getId(), connector.getId())))
- .forEach(connector -> job.addCBlock(bucket, (ConceptTreeConnector) connector));
+ .forEach(connector -> job.addCBlock(bucket.getId(), (ConceptTreeConnector) connector));
}
jobManager.addSlowJob(job);
@@ -363,10 +363,10 @@ public void addConcept(Concept<?> concept) {
for (ConceptTreeConnector connector : ((TreeConcept) concept).getConnectors()) {
- try(Stream<Bucket> allBuckets = storage.getAllBuckets()) {
+ try(Stream<BucketId> allBuckets = storage.getAllBucketIds()) {
allBuckets
- .filter(bucket -> bucket.getTable().equals(connector.getResolvedTableId()))
- .filter(bucket -> !hasCBlock(new CBlockId(bucket.getId(), connector.getId())))
+ .filter(bucketId -> bucketId.getImp().getTable().equals(connector.getResolvedTableId()))
+ .filter(bucketId -> !hasCBlock(new CBlockId(bucketId, connector.getId())))
.forEach(bucket -> job.addCBlock(bucket, connector));
}
}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java b/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java
index 692c8a743d..d0d64cbd7e 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java
@@ -91,9 +91,11 @@ public static long estimateMemoryBytes(long entities, long entries, double depth
);
}
- public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket, BucketManager bucketManager) {
+ public static CBlock createCBlock(ConceptTreeConnector connector, BucketId bucketId, BucketManager bucketManager) {
final int bucketSize = bucketManager.getEntityBucketSize();
- final int root = bucket.getBucket() * bucketSize;
+ final int root = bucketId.getBucket() * bucketSize;
+
+ Bucket bucket = bucketId.resolve();
final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector, bucketManager);
//TODO Object2LongMap
diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java
index 270230a145..3ae9fe79a6 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java
@@ -10,9 +10,9 @@
import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector;
-import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.BucketManager;
import com.bakdata.conquery.models.events.CBlock;
+import com.bakdata.conquery.models.identifiable.ids.specific.BucketId;
import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -40,12 +40,12 @@ public class CalculateCBlocksJob extends Job {
private final BucketManager bucketManager;
private final ExecutorService executorService;
- public void addCBlock(Bucket bucket, ConceptTreeConnector connector) {
- tasks.add(createInformationProcessor(connector, bucket));
+ public void addCBlock(BucketId bucketId, ConceptTreeConnector connector) {
+ tasks.add(createInformationProcessor(connector, bucketId));
}
- private CalculationInformationProcessor createInformationProcessor(ConceptTreeConnector connector, Bucket bucket) {
- return new CalculationInformationProcessor(connector, bucket, bucketManager, storage);
+ private CalculationInformationProcessor createInformationProcessor(ConceptTreeConnector connector, BucketId bucketId) {
+ return new CalculationInformationProcessor(connector, bucketId, bucketManager, storage);
}
@Override
@@ -105,7 +105,7 @@ public boolean isEmpty() {
@ToString(onlyExplicitlyIncluded = true)
private static class CalculationInformationProcessor implements Runnable {
private final ConceptTreeConnector connector;
- private final Bucket bucket;
+ private final BucketId bucketId;
private final BucketManager bucketManager;
private final WorkerStorage storage;
@@ -120,7 +120,7 @@ public void run() {
log.trace("BEGIN calculating CBlock for {}", getCBlockId());
- final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager);
+ final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucketId(), bucketManager);
log.trace("DONE calculating CBlock for {}", getCBlockId());
@@ -134,7 +134,7 @@ public void run() {
@ToString.Include
public CBlockId getCBlockId() {
- return new CBlockId(getBucket().getId(), getConnector().getId());
+ return new CBlockId(getBucketId(), getConnector().getId());
}
}