Skip to content

Commit

Permalink
don't deserialize every bucket upon adding a cblock
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed Jan 23, 2025
1 parent 0a5c550 commit f46f757
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,13 @@ public class XodusStoreFactory implements StoreFactory {
private boolean loadEnvironmentWithMissingStores;

/**
* See <a href="https://github.com/ben-manes/caffeine/wiki/Specification">CaffeinSpec</a>
* Cache spec for deserialized values.
* Conquery depends currently on <code>softValues</code> to avoid data race conditions.
* So a specification must include this option.
* See <a href="https://github.com/ben-manes/caffeine/wiki/Specification">CaffeineSpec</a>
*/
@NotEmpty
@ValidCaffeineSpec
@ValidCaffeineSpec(softValue = true)
private String caffeineSpec = "softValues";

private boolean loadStoresOnStart = false;
Expand Down Expand Up @@ -198,6 +201,11 @@ public class XodusStoreFactory implements StoreFactory {
@JacksonInject(useInput = OptBoolean.FALSE)
private transient MetricRegistry metricRegistry;

/**
 * Parses the configured {@code caffeineSpec} string into a {@code CaffeineSpec}.
 * <p>
 * Centralizes the parse so both {@code createStore} and {@code createIdMappingStore}
 * build their caches from the same validated spec string.
 * Annotated {@code @JsonIgnore} so Jackson does not treat this accessor as a
 * serializable property alongside the raw {@code caffeineSpec} field.
 * <p>
 * NOTE(review): parses on every call — presumably cheap/infrequent (store creation
 * only); confirm if this ever lands on a hot path.
 */
@JsonIgnore
private CaffeineSpec getCaffeineSpecParsed() {
return CaffeineSpec.parse(getCaffeineSpec());
}

@Override
public Collection<NamespaceStorage> discoverNamespaceStorages() {
return loadNamespacedStores("dataset_", (storePath) -> new NamespaceStorage(this, storePath), NAMESPACE_STORES);
Expand Down Expand Up @@ -301,7 +309,7 @@ public <KEY, VALUE> Store<KEY, VALUE> createStore(Environment environment, Valid
getUnreadableDataDumpDirectory(),
getReaderExecutorService()
),
CaffeineSpec.parse(getCaffeineSpec()),
getCaffeineSpecParsed(),
metricRegistry
);
}
Expand Down Expand Up @@ -429,7 +437,7 @@ public SingletonStore<EntityIdMap> createIdMappingStore(String pathName, ObjectM

openStoresInEnv.put(bigStore.getDataXodusStore().getEnvironment(), bigStore.getDataXodusStore());
openStoresInEnv.put(bigStore.getMetaXodusStore().getEnvironment(), bigStore.getMetaXodusStore());
return new SingletonStore<>(bigStore);
return new SingletonStore<>(new CachedStore<>(bigStore, getCaffeineSpecParsed(), getMetricRegistry()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void fullUpdate() {
}

log.warn("CBlock[{}] missing in Storage. Queuing recalculation", cBlockId);
job.addCBlock(bucketId.resolve(), connector);
job.addCBlock(bucketId, connector);
}));

}
Expand Down Expand Up @@ -205,7 +205,7 @@ public void addBucket(Bucket bucket) {
.flatMap(concept -> concept.getConnectors().stream())
.filter(connector -> connector.getResolvedTableId().equals(bucket.getTable()))
.filter(connector -> !hasCBlock(new CBlockId(bucket.getId(), connector.getId())))
.forEach(connector -> job.addCBlock(bucket, (ConceptTreeConnector) connector));
.forEach(connector -> job.addCBlock(bucket.getId(), (ConceptTreeConnector) connector));
}

jobManager.addSlowJob(job);
Expand Down Expand Up @@ -363,10 +363,10 @@ public void addConcept(Concept<?> concept) {

for (ConceptTreeConnector connector : ((TreeConcept) concept).getConnectors()) {

try(Stream<Bucket> allBuckets = storage.getAllBuckets()) {
try(Stream<BucketId> allBuckets = storage.getAllBucketIds()) {
allBuckets
.filter(bucket -> bucket.getTable().equals(connector.getResolvedTableId()))
.filter(bucket -> !hasCBlock(new CBlockId(bucket.getId(), connector.getId())))
.filter(bucketId -> bucketId.getImp().getTable().equals(connector.getResolvedTableId()))
.filter(bucketId -> !hasCBlock(new CBlockId(bucketId, connector.getId())))
.forEach(bucket -> job.addCBlock(bucket, connector));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ public static long estimateMemoryBytes(long entities, long entries, double depth
);
}

public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket, BucketManager bucketManager) {
public static CBlock createCBlock(ConceptTreeConnector connector, BucketId bucketId, BucketManager bucketManager) {
final int bucketSize = bucketManager.getEntityBucketSize();
final int root = bucket.getBucket() * bucketSize;
final int root = bucketId.getBucket() * bucketSize;

Bucket bucket = bucketId.resolve();

final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector, bucketManager);
//TODO Object2LongMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.BucketManager;
import com.bakdata.conquery.models.events.CBlock;
import com.bakdata.conquery.models.identifiable.ids.specific.BucketId;
import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -40,12 +40,12 @@ public class CalculateCBlocksJob extends Job {
private final BucketManager bucketManager;
private final ExecutorService executorService;

public void addCBlock(Bucket bucket, ConceptTreeConnector connector) {
tasks.add(createInformationProcessor(connector, bucket));
public void addCBlock(BucketId bucketId, ConceptTreeConnector connector) {
tasks.add(createInformationProcessor(connector, bucketId));
}

private CalculationInformationProcessor createInformationProcessor(ConceptTreeConnector connector, Bucket bucket) {
return new CalculationInformationProcessor(connector, bucket, bucketManager, storage);
private CalculationInformationProcessor createInformationProcessor(ConceptTreeConnector connector, BucketId bucketId) {
return new CalculationInformationProcessor(connector, bucketId, bucketManager, storage);
}

@Override
Expand Down Expand Up @@ -105,7 +105,7 @@ public boolean isEmpty() {
@ToString(onlyExplicitlyIncluded = true)
private static class CalculationInformationProcessor implements Runnable {
private final ConceptTreeConnector connector;
private final Bucket bucket;
private final BucketId bucketId;

private final BucketManager bucketManager;
private final WorkerStorage storage;
Expand All @@ -120,7 +120,7 @@ public void run() {

log.trace("BEGIN calculating CBlock for {}", getCBlockId());

final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager);
final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucketId(), bucketManager);

log.trace("DONE calculating CBlock for {}", getCBlockId());

Expand All @@ -134,7 +134,7 @@ public void run() {

@ToString.Include
public CBlockId getCBlockId() {
return new CBlockId(getBucket().getId(), getConnector().getId());
return new CBlockId(getBucketId(), getConnector().getId());
}

}
Expand Down

0 comments on commit f46f757

Please sign in to comment.