Skip to content

Commit

Permalink
improve logging around network filter and message toString
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed Feb 7, 2025
1 parent 7a2903f commit d4dc983
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.bakdata.conquery.io.mina;

import com.bakdata.conquery.models.messages.network.NetworkMessage;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Stopwatch;
import io.dropwizard.util.DataSize;
Expand All @@ -26,15 +27,19 @@ public class JacksonProtocolEncoder extends ObjectSerializationEncoder {

@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
if (!(message instanceof NetworkMessage<?> networkMessage)) {
throw new IllegalArgumentException("Message must be of type %s (was %s)".formatted(NetworkMessage.class, message.getClass()));
}

final IoBuffer buf = IoBuffer.allocate(initialBufferCapacityBytes, false);
buf.setAutoExpand(true);

buf.skip(SIZE_PREFIX_LENGTH); // Make a room for the length field.

final Stopwatch stopwatch = Stopwatch.createStarted();
log.trace("BEGIN Encoding message");
log.trace("BEGIN Encoding message: {}", networkMessage);

objectWriter.writeValue(buf.asOutputStream(), message);
objectWriter.writeValue(buf.asOutputStream(), networkMessage);

final int objectSize = buf.position() - SIZE_PREFIX_LENGTH;

Expand All @@ -46,7 +51,7 @@ public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
buf.putInt(0, objectSize);

buf.flip();
log.trace("FINISHED Encoding message in {}. Buffer size: {}. Message: {}", stopwatch, DataSize.bytes(buf.remaining()), message);
log.trace("FINISHED Encoding message in {}. Buffer size: {}. Message: {}", stopwatch, DataSize.bytes(buf.remaining()), networkMessage);

out.write(buf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import com.bakdata.conquery.util.progressreporter.ProgressReporter;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

@CPSType(id = "FORWARD_TO_NAMESPACE", base = NetworkMessage.class)
@RequiredArgsConstructor
@Getter
@ToString(callSuper = true)
public class ForwardToNamespace extends MessageToManagerNode implements SlowMessage {

private final DatasetId datasetId;
Expand All @@ -38,11 +40,6 @@ public void react(ManagerNodeNetworkContext context) throws Exception {
}
}

@Override
public String toString() {
return message.toString() + " for dataset " + datasetId;
}

@Override
public ProgressReporter getProgressReporter() {
return ((SlowMessage) message).getProgressReporter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@CPSType(id = "FORWARD_TO_WORKER", base = NetworkMessage.class)
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@ToString(of = {"workerId", "text"})
@ToString(of = {"workerId", "text"}, callSuper = true)
public class ForwardToWorker extends MessageToShardNode implements SlowMessage {

private final WorkerId workerId;
Expand Down

0 comments on commit d4dc983

Please sign in to comment.