Skip to content

Commit

Permalink
Fix (de-) serialization performance of ForwardToWorkerMessage (#3663)
Browse files Browse the repository at this point in the history
* removes pre-serialization of ForwardToWorker as that was a tool to avoid timing issues with NsIdReferences that are no longer relevant.
* dont add empty CBlocksJobJobs
  • Loading branch information
awildturtok authored Feb 6, 2025
1 parent 6de5a95 commit 5daa113
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void disconnectFromCluster() {

@NotNull
private NioSocketConnector getClusterConnector() {
ObjectMapper om = internalMapperFactory.createShardCommunicationMapper();
ObjectMapper om = internalMapperFactory.createShardCommunicationMapper(workers);

return config.getCluster().getClusterConnector(om, this, "Shard");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.bakdata.conquery.mode.cluster;

import jakarta.validation.Validator;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
import com.bakdata.conquery.io.jackson.View;
Expand All @@ -9,15 +11,17 @@
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.identifiable.ids.IIdInterner;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.ShardWorkers;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import jakarta.validation.Validator;

public record InternalMapperFactory(ConqueryConfig config, Validator validator) {

public ObjectMapper createShardCommunicationMapper() {
return createInternalObjectMapper(View.InternalCommunication.class);
public ObjectMapper createShardCommunicationMapper(ShardWorkers workers) {
ObjectMapper objectMapper = createInternalObjectMapper(View.InternalCommunication.class);
workers.injectInto(objectMapper);
return objectMapper;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public void addBucket(Bucket bucket) {
.filter(connector -> !hasCBlock(new CBlockId(bucket.getId(), connector.getId())))
.forEach(connector -> job.addCBlock(bucket, (ConceptTreeConnector) connector));

if (job.isEmpty()){
return;
}

jobManager.addSlowJob(job);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
package com.bakdata.conquery.models.messages.network.specific;

import java.io.IOException;
import java.util.Objects;

import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
import com.bakdata.conquery.models.jobs.SimpleJob;
import com.bakdata.conquery.models.messages.SlowMessage;
import com.bakdata.conquery.models.messages.namespaces.WorkerMessage;
import com.bakdata.conquery.models.messages.network.MessageToShardNode;
import com.bakdata.conquery.models.messages.network.NetworkMessage;
import com.bakdata.conquery.models.messages.network.NetworkMessageContext.ShardNodeNetworkContext;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.bakdata.conquery.util.progressreporter.ProgressReporter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.ToString;

/**
Expand All @@ -31,51 +25,29 @@
@CPSType(id = "FORWARD_TO_WORKER", base = NetworkMessage.class)
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@ToString(of = {"workerId", "text"})
@ToString(of = {"workerId", "message"})
public class ForwardToWorker extends MessageToShardNode implements SlowMessage {

private final WorkerId workerId;
private final byte[] messageRaw;
// We cache these on the sender side.
private final WorkerMessage message;

@Getter(onMethod_ = @JsonIgnore(false))
private final boolean slowMessage;
private final String text;

@JsonIgnore
@Setter
private ProgressReporter progressReporter;

public static ForwardToWorker create(WorkerId worker, WorkerMessage message, ObjectWriter writer) {
return new ForwardToWorker(
worker,
serializeMessage(message, writer),
true,
message.toString()
);
}

@SneakyThrows(IOException.class)
private static byte[] serializeMessage(WorkerMessage message, ObjectWriter writer) {
return writer.writeValueAsBytes(message);
}

private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper mapper) throws java.io.IOException {
return mapper.readerFor(WorkerMessage.class).readValue(messageRaw);
public static ForwardToWorker create(WorkerId worker, WorkerMessage message) {
return new ForwardToWorker(worker, message, true);
}

@Override
public void react(ShardNodeNetworkContext context) throws Exception {
final Worker worker = Objects.requireNonNull(context.getWorkers().getWorker(workerId));
ConqueryMDC.setLocation(worker.toString());


// Jobception: this is to ensure that no subsequent message is deserialized before one message is processed
worker.getJobManager().addSlowJob(new SimpleJob("Process %s".formatted(getText()), () -> {

final WorkerMessage message = deserializeMessage(messageRaw, worker.getCommunicationMapper());

message.setProgressReporter(progressReporter);
message.react(worker);
}));
getMessage().setProgressReporter(progressReporter);
getMessage().react(worker);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.bakdata.conquery.models.worker;

import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

import com.bakdata.conquery.io.mina.MessageSender;
import com.bakdata.conquery.models.identifiable.NamedImpl;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
Expand All @@ -10,8 +13,6 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectWriter;
import it.unimi.dsi.fastutil.ints.IntArraySet;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -54,6 +55,6 @@ public ShardNodeInformation getMessageParent() {

@Override
public MessageToShardNode transform(WorkerMessage message) {
return ForwardToWorker.create(getId(), message, communicationWriter);
return ForwardToWorker.create(getId(), message);
}
}

0 comments on commit 5daa113

Please sign in to comment.