diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java index 0b3e4ac643..7c727207d1 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java @@ -162,7 +162,7 @@ private void disconnectFromCluster() { @NotNull private NioSocketConnector getClusterConnector() { - ObjectMapper om = internalMapperFactory.createShardCommunicationMapper(); + ObjectMapper om = internalMapperFactory.createShardCommunicationMapper(workers); return config.getCluster().getClusterConnector(om, this, "Shard"); } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java index ffb80ce5f1..846cf603e4 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java @@ -1,5 +1,7 @@ package com.bakdata.conquery.mode.cluster; +import jakarta.validation.Validator; + import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.jackson.MutableInjectableValues; import com.bakdata.conquery.io.jackson.View; @@ -9,15 +11,17 @@ import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.identifiable.ids.IIdInterner; import com.bakdata.conquery.models.worker.DatasetRegistry; +import com.bakdata.conquery.models.worker.ShardWorkers; import com.fasterxml.jackson.databind.DeserializationConfig; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationConfig; -import jakarta.validation.Validator; public record InternalMapperFactory(ConqueryConfig config, Validator validator) { - public ObjectMapper createShardCommunicationMapper() { - return createInternalObjectMapper(View.InternalCommunication.class); + public ObjectMapper createShardCommunicationMapper(ShardWorkers workers) { + ObjectMapper objectMapper = createInternalObjectMapper(View.InternalCommunication.class); + workers.injectInto(objectMapper); + return objectMapper; } /** diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java index 02b1e6a598..6bea91691a 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java @@ -175,6 +175,10 @@ public void addBucket(Bucket bucket) { .filter(connector -> !hasCBlock(new CBlockId(bucket.getId(), connector.getId()))) .forEach(connector -> job.addCBlock(bucket, (ConceptTreeConnector) connector)); + if (job.isEmpty()){ + return; + } + jobManager.addSlowJob(job); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java index c118778fb8..4220d5f7e9 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/ForwardToWorker.java @@ -1,27 +1,21 @@ package com.bakdata.conquery.models.messages.network.specific; -import java.io.IOException; import java.util.Objects; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; -import com.bakdata.conquery.models.jobs.SimpleJob; import com.bakdata.conquery.models.messages.SlowMessage; import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; import com.bakdata.conquery.models.messages.network.MessageToShardNode; import com.bakdata.conquery.models.messages.network.NetworkMessage; import com.bakdata.conquery.models.messages.network.NetworkMessageContext.ShardNodeNetworkContext; import com.bakdata.conquery.models.worker.Worker; -import com.bakdata.conquery.util.io.ConqueryMDC; import com.bakdata.conquery.util.progressreporter.ProgressReporter; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; -import lombok.SneakyThrows; import lombok.ToString; /** @@ -31,51 +25,29 @@ @CPSType(id = "FORWARD_TO_WORKER", base = NetworkMessage.class) @RequiredArgsConstructor(access = AccessLevel.PROTECTED) @Getter -@ToString(of = {"workerId", "text"}) +@ToString(of = {"workerId", "message"}) public class ForwardToWorker extends MessageToShardNode implements SlowMessage { private final WorkerId workerId; - private final byte[] messageRaw; - // We cache these on the sender side. + private final WorkerMessage message; + @Getter(onMethod_ = @JsonIgnore(false)) private final boolean slowMessage; - private final String text; + @JsonIgnore @Setter private ProgressReporter progressReporter; - public static ForwardToWorker create(WorkerId worker, WorkerMessage message, ObjectWriter writer) { - return new ForwardToWorker( - worker, - serializeMessage(message, writer), - true, - message.toString() - ); - } - - @SneakyThrows(IOException.class) - private static byte[] serializeMessage(WorkerMessage message, ObjectWriter writer) { - return writer.writeValueAsBytes(message); - } - - private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper mapper) throws java.io.IOException { - return mapper.readerFor(WorkerMessage.class).readValue(messageRaw); + public static ForwardToWorker create(WorkerId worker, WorkerMessage message) { + return new ForwardToWorker(worker, message, true); } @Override public void react(ShardNodeNetworkContext context) throws Exception { final Worker worker = Objects.requireNonNull(context.getWorkers().getWorker(workerId)); - ConqueryMDC.setLocation(worker.toString()); - - - // Jobception: this is to ensure that no subsequent message is deserialized before one message is processed - worker.getJobManager().addSlowJob(new SimpleJob("Process %s".formatted(getText()), () -> { - - final WorkerMessage message = deserializeMessage(messageRaw, worker.getCommunicationMapper()); - message.setProgressReporter(progressReporter); - message.react(worker); - })); + getMessage().setProgressReporter(progressReporter); + getMessage().react(worker); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java index 8eb023e427..a6b3b48491 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java @@ -1,5 +1,8 @@ package com.bakdata.conquery.models.worker; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; + import com.bakdata.conquery.io.mina.MessageSender; import com.bakdata.conquery.models.identifiable.NamedImpl; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; @@ -10,8 +13,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.ObjectWriter; import it.unimi.dsi.fastutil.ints.IntArraySet; -import jakarta.validation.constraints.Min; -import jakarta.validation.constraints.NotNull; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -54,6 +55,6 @@ public ShardNodeInformation getMessageParent() { @Override public MessageToShardNode transform(WorkerMessage message) { - return ForwardToWorker.create(getId(), message, communicationWriter); + return ForwardToWorker.create(getId(), message); } }