diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 7d8fe7ddd0..45fba25f8c 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -48,7 +48,7 @@ jobs: uses: actions/setup-java@v4 with: distribution: 'temurin' - java-version: '17' + java-version: '21' overwrite-settings: false # Initializes the CodeQL tools for scanning. diff --git a/.github/workflows/release_backend.yml b/.github/workflows/release_backend.yml index 05b7e45261..d26bd51634 100644 --- a/.github/workflows/release_backend.yml +++ b/.github/workflows/release_backend.yml @@ -22,7 +22,7 @@ jobs: uses: actions/setup-java@v2 with: distribution: 'temurin' - java-version: '17' + java-version: '21' overwrite-settings: false - name: Cache local Maven repository uses: actions/cache@v2 diff --git a/.github/workflows/run_autodoc.yml b/.github/workflows/run_autodoc.yml index ba1ce0abb9..e8b84d02bc 100644 --- a/.github/workflows/run_autodoc.yml +++ b/.github/workflows/run_autodoc.yml @@ -31,7 +31,7 @@ jobs: - name: Set up JDK uses: actions/setup-java@v1 with: - java-version: 17 + java-version: 21 - name: Build Backend run: mvn install -T 1C -DskipTests -pl '!executable' - name: Run AutoDoc diff --git a/.github/workflows/test_backend.yml b/.github/workflows/test_backend.yml index f250dfcc49..e9090f1823 100644 --- a/.github/workflows/test_backend.yml +++ b/.github/workflows/test_backend.yml @@ -30,7 +30,7 @@ jobs: uses: actions/setup-java@v2 with: distribution: 'temurin' - java-version: '17' + java-version: '21' overwrite-settings: false - name: Build Backend run: mvn -T 1C install -pl backend -DskipTests -am diff --git a/.github/workflows/test_cypress.yml b/.github/workflows/test_cypress.yml index 8990da08ab..c8d2e6525f 100644 --- a/.github/workflows/test_cypress.yml +++ b/.github/workflows/test_cypress.yml @@ -37,7 +37,7 @@ jobs: uses: actions/setup-java@v4 with: distribution: 'temurin' - java-version: '17' + java-version: '21' 
overwrite-settings: false - name: Build Backend diff --git a/backend.Dockerfile b/backend.Dockerfile index c1d50f95aa..91e000d374 100644 --- a/backend.Dockerfile +++ b/backend.Dockerfile @@ -7,7 +7,7 @@ COPY .git . RUN git describe --tags | sed 's/^v//' > git_describe.txt # Builder -FROM maven:3.8-openjdk-17-slim AS builder +FROM maven:3.9-eclipse-temurin-21 AS builder WORKDIR /app @@ -30,7 +30,7 @@ RUN ./scripts/build_backend_version.sh `cat git_describe.txt` # Runner -FROM eclipse-temurin:17-jre-alpine AS runner +FROM eclipse-temurin:21-jre-alpine AS runner ## Apache POI needs some extra libs to auto-size columns RUN apk add --no-cache fontconfig ttf-dejavu diff --git a/backend/pom.xml b/backend/pom.xml index ab20155260..198455f9b4 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -163,10 +163,9 @@ 2.9.1 - org.codehaus.groovy + org.apache.groovy groovy - 3.0.13 - indy + 4.0.24 org.apache.commons @@ -220,18 +219,14 @@ org.apache.mina mina-core - 2.1.5 + 2.2.4 org.apache.shiro shiro-core - 1.13.0 + 2.0.2 - - org.apache.shiro - shiro-cache - org.apache.shiro shiro-config-ogdl @@ -252,14 +247,6 @@ org.apache.shiro shiro-config-core - - org.apache.shiro - shiro-event - - - org.apache.shiro - shiro-lang - diff --git a/backend/src/main/java/com/bakdata/conquery/Conquery.java b/backend/src/main/java/com/bakdata/conquery/Conquery.java index 6051d23a9c..4fbbdecfd9 100644 --- a/backend/src/main/java/com/bakdata/conquery/Conquery.java +++ b/backend/src/main/java/com/bakdata/conquery/Conquery.java @@ -3,7 +3,12 @@ import jakarta.validation.Validator; import ch.qos.logback.classic.Level; -import com.bakdata.conquery.commands.*; +import com.bakdata.conquery.commands.DistributedStandaloneCommand; +import com.bakdata.conquery.commands.ManagerNode; +import com.bakdata.conquery.commands.MigrateCommand; +import com.bakdata.conquery.commands.PreprocessorCommand; +import com.bakdata.conquery.commands.RecodeStoreCommand; +import com.bakdata.conquery.commands.ShardCommand; import 
com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.jackson.MutableInjectableValues; import com.bakdata.conquery.metrics.prometheus.PrometheusBundle; @@ -21,7 +26,6 @@ import io.dropwizard.core.setup.Environment; import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.text.StringSubstitutor; import org.glassfish.jersey.internal.inject.AbstractBinder; @@ -32,13 +36,15 @@ public class Conquery extends Application { private final String name; - @Setter - private ManagerNode managerNode; public Conquery() { this("Conquery"); } + public static void main(String... args) throws Exception { + new Conquery().run(args); + } + @Override public void initialize(Bootstrap bootstrap) { final ObjectMapper confMapper = bootstrap.getObjectMapper(); @@ -49,7 +55,7 @@ public void initialize(Bootstrap bootstrap) { bootstrap.addCommand(new ShardCommand()); bootstrap.addCommand(new PreprocessorCommand()); - bootstrap.addCommand(new DistributedStandaloneCommand(this)); + bootstrap.addCommand(new DistributedStandaloneCommand()); bootstrap.addCommand(new RecodeStoreCommand()); bootstrap.addCommand(new MigrateCommand()); @@ -92,17 +98,10 @@ protected Level bootstrapLogLevel() { public void run(ConqueryConfig configuration, Environment environment) throws Exception { ManagerProvider provider = configuration.getSqlConnectorConfig().isEnabled() ? new LocalManagerProvider() : new ClusterManagerProvider(); - run(provider.provideManager(configuration, environment)); - } + Manager manager = provider.provideManager(configuration, environment); - public void run(Manager manager) throws InterruptedException { - if (managerNode == null) { - managerNode = new ManagerNode(); - } - managerNode.run(manager); - } + ManagerNode managerNode = new ManagerNode(); - public static void main(String... 
args) throws Exception { - new Conquery().run(args); + managerNode.run(manager); } } diff --git a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java index 311c72cf5f..97b67aa1f1 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java @@ -4,14 +4,13 @@ import java.util.List; import java.util.Vector; -import com.bakdata.conquery.Conquery; import com.bakdata.conquery.mode.cluster.ClusterManager; import com.bakdata.conquery.mode.cluster.ClusterManagerProvider; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.config.XodusStoreFactory; +import com.bakdata.conquery.util.commands.NoOpConquery; import com.bakdata.conquery.util.io.ConqueryMDC; import io.dropwizard.core.cli.ServerCommand; -import io.dropwizard.core.setup.Bootstrap; import io.dropwizard.core.setup.Environment; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -21,56 +20,30 @@ @Getter public class DistributedStandaloneCommand extends ServerCommand implements StandaloneCommand { - private final Conquery conquery; - private ClusterManager manager; private final ManagerNode managerNode = new ManagerNode(); private final List shardNodes = new Vector<>(); + private ClusterManager manager; - // TODO clean up the command structure, so we can use the Environment from EnvironmentCommand - private Environment environment; - - public DistributedStandaloneCommand(Conquery conquery) { - super(conquery, "standalone", "starts a server and a client at the same time."); - this.conquery = conquery; + public DistributedStandaloneCommand() { + super(new NoOpConquery(), "standalone", "starts a manager node and shard node(s) at the same time in a single JVM."); } - // this must be overridden so that @Override - public void 
run(Bootstrap bootstrap, Namespace namespace, ConqueryConfig configuration) throws Exception { - environment = new Environment( - bootstrap.getApplication().getName(), - bootstrap.getObjectMapper(), - bootstrap.getValidatorFactory(), - bootstrap.getMetricRegistry(), - bootstrap.getClassLoader(), - bootstrap.getHealthCheckRegistry(), - configuration - ); - configuration.getMetricsFactory().configure(environment.lifecycle(), bootstrap.getMetricRegistry()); - configuration.getServerFactory().configure(environment); - - bootstrap.run(configuration, environment); - startStandalone(environment, namespace, configuration); - } + protected void run(Environment environment, Namespace namespace, ConqueryConfig configuration) throws Exception { - public void startStandalone(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception { - // start ManagerNode - ConqueryMDC.setLocation("ManagerNode"); - log.debug("Starting ManagerNode"); - ConqueryConfig managerConfig = config; + ConqueryConfig managerConfig = configuration; - if (config.getStorage() instanceof XodusStoreFactory) { - final Path managerDir = ((XodusStoreFactory) config.getStorage()).getDirectory().resolve("manager"); - managerConfig = config.withStorage(((XodusStoreFactory) config.getStorage()).withDirectory(managerDir)); + if (configuration.getStorage() instanceof XodusStoreFactory) { + final Path managerDir = ((XodusStoreFactory) configuration.getStorage()).getDirectory().resolve("manager"); + managerConfig = configuration.withStorage(((XodusStoreFactory) configuration.getStorage()).withDirectory(managerDir)); } manager = new ClusterManagerProvider().provideManager(managerConfig, environment); - conquery.setManagerNode(managerNode); - conquery.run(manager); + managerNode.run(manager); - for (int id = 0; id < config.getStandalone().getNumberOfShardNodes(); id++) { + for (int id = 0; id < configuration.getStandalone().getNumberOfShardNodes(); id++) { ShardNode sc = new 
ShardNode(ShardNode.DEFAULT_NAME + id); @@ -78,11 +51,11 @@ public void startStandalone(Environment environment, Namespace namespace, Conque ConqueryMDC.setLocation(sc.getName()); - ConqueryConfig clone = config; + ConqueryConfig clone = configuration; - if (config.getStorage() instanceof XodusStoreFactory) { - final Path managerDir = ((XodusStoreFactory) config.getStorage()).getDirectory().resolve("shard-node" + id); - clone = config.withStorage(((XodusStoreFactory) config.getStorage()).withDirectory(managerDir)); + if (configuration.getStorage() instanceof XodusStoreFactory) { + final Path managerDir = ((XodusStoreFactory) configuration.getStorage()).getDirectory().resolve("shard-node" + id); + clone = configuration.withStorage(((XodusStoreFactory) configuration.getStorage()).withDirectory(managerDir)); } sc.run(clone, environment); @@ -93,6 +66,6 @@ public void startStandalone(Environment environment, Namespace namespace, Conque ConqueryMDC.setLocation("ManagerNode"); log.debug("Starting REST Server"); ConqueryMDC.setLocation(null); - super.run(environment, namespace, config); + super.run(environment, namespace, configuration); } } diff --git a/backend/src/main/java/com/bakdata/conquery/commands/StandaloneCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/StandaloneCommand.java index abf7ec7fe3..7545564291 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/StandaloneCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/StandaloneCommand.java @@ -2,25 +2,15 @@ import java.util.List; -import com.bakdata.conquery.Conquery; import com.bakdata.conquery.mode.Manager; -import com.bakdata.conquery.models.config.ConqueryConfig; -import io.dropwizard.core.setup.Bootstrap; import io.dropwizard.core.setup.Environment; -import net.sourceforge.argparse4j.inf.Namespace; public interface StandaloneCommand { - void startStandalone(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception; - Manager 
getManager(); List getShardNodes(); - void run(Bootstrap bootstrap, Namespace namespace, ConqueryConfig configuration) throws Exception; - - Conquery getConquery(); - ManagerNode getManagerNode(); Environment getEnvironment(); diff --git a/backend/src/main/java/com/bakdata/conquery/io/jackson/Jackson.java b/backend/src/main/java/com/bakdata/conquery/io/jackson/Jackson.java index 5ba5177574..e54b5c3c3b 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/jackson/Jackson.java +++ b/backend/src/main/java/com/bakdata/conquery/io/jackson/Jackson.java @@ -2,7 +2,8 @@ import java.util.Locale; -import com.bakdata.conquery.io.jackson.serializer.Object2IntMapMixin; +import com.bakdata.conquery.io.jackson.mixin.DefaultSocketSessionConfigMixIn; +import com.bakdata.conquery.io.jackson.mixin.Object2IntMapMixIn; import com.bakdata.conquery.models.auth.permissions.ConqueryPermission; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.core.JsonGenerator; @@ -19,6 +20,7 @@ import com.fasterxml.jackson.module.blackbird.BlackbirdModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.mina.transport.socket.DefaultSocketSessionConfig; import org.apache.shiro.authz.Permission; public class Jackson { @@ -72,7 +74,8 @@ public static T configure(T objectMapper) { //.setAnnotationIntrospector(new RestrictingAnnotationIntrospector()) .setInjectableValues(new MutableInjectableValues()) .addMixIn(Permission.class, ConqueryPermission.class) - .addMixIn(Object2IntMap.class, Object2IntMapMixin.class); + .addMixIn(Object2IntMap.class, Object2IntMapMixIn.class) + .addMixIn(DefaultSocketSessionConfig.class, DefaultSocketSessionConfigMixIn.class); return objectMapper; } diff --git a/backend/src/main/java/com/bakdata/conquery/io/jackson/JacksonUtil.java b/backend/src/main/java/com/bakdata/conquery/io/jackson/JacksonUtil.java index 56ddadc71c..a914394572 100644 --- 
a/backend/src/main/java/com/bakdata/conquery/io/jackson/JacksonUtil.java +++ b/backend/src/main/java/com/bakdata/conquery/io/jackson/JacksonUtil.java @@ -1,17 +1,11 @@ package com.bakdata.conquery.io.jackson; -import java.io.ByteArrayInputStream; import java.io.InputStream; -import java.io.SequenceInputStream; -import com.bakdata.conquery.io.mina.ChunkedMessage; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonMappingException; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.IteratorUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.translate.JavaUnicodeEscaper; import org.apache.mina.core.buffer.IoBuffer; @@ -19,14 +13,6 @@ @Slf4j @UtilityClass public class JacksonUtil { - public static String toJsonDebug(byte[] bytes) { - return toJsonDebug(IoBuffer.wrap(bytes)); - } - - public static String toJsonDebug(IoBuffer buffer) { - return toJsonDebug(stream(buffer)); - } - /** * Partially read and parse InputStream as Json, directly storing it into String, just for debugging purposes. 
*/ @@ -72,7 +58,7 @@ public static String toJsonDebug(InputStream is) { sb.append('"').append(value).append("\","); break; default: - sb.append(t.toString()); + sb.append(t); log.warn("I don't know how to handle {}", t); break; } @@ -85,33 +71,4 @@ public static String toJsonDebug(InputStream is) { return sb.toString(); } } - - public static InputStream stream(IoBuffer buffer) { - return new ByteArrayInputStream( - buffer.array(), - buffer.position() + buffer.arrayOffset(), - buffer.remaining() - ); - } - - public static InputStream stream(Iterable list) { - return new SequenceInputStream( - IteratorUtils.asEnumeration( - IteratorUtils.transformedIterator( - list.iterator(), - JacksonUtil::stream - ) - ) - ); - } - - public static String toJsonDebug(ChunkedMessage msg) { - return toJsonDebug(msg.createInputStream()); - } - - public static void expect(Class parseTargetType, DeserializationContext ctxt, JsonToken token, JsonToken expected) throws JsonMappingException { - if (token != expected) { - ctxt.reportInputMismatch(parseTargetType, "Expected " + expected + " but found " + token); - } - } } diff --git a/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/DefaultSocketSessionConfigMixIn.java b/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/DefaultSocketSessionConfigMixIn.java new file mode 100644 index 0000000000..0c86bf275d --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/DefaultSocketSessionConfigMixIn.java @@ -0,0 +1,17 @@ +package com.bakdata.conquery.io.jackson.mixin; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.mina.transport.socket.DefaultSocketSessionConfig; + +/** + * MixIn to suppress artificial properties of {@link DefaultSocketSessionConfig}. 
+ */ +@JsonIgnoreProperties(value = { + "throughputCalculationIntervalInMillis", + "readerIdleTimeInMillis", + "writeTimeoutInMillis", + "writerIdleTimeInMillis", + "bothIdleTimeInMillis" +}) +public class DefaultSocketSessionConfigMixIn extends DefaultSocketSessionConfig { +} diff --git a/backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/Object2IntMapMixin.java b/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/Object2IntMapMixIn.java similarity index 95% rename from backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/Object2IntMapMixin.java rename to backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/Object2IntMapMixIn.java index 76b660ffbd..eac1ca8346 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/Object2IntMapMixin.java +++ b/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/Object2IntMapMixIn.java @@ -1,4 +1,4 @@ -package com.bakdata.conquery.io.jackson.serializer; +package com.bakdata.conquery.io.jackson.mixin; import java.io.IOException; import java.util.Map; @@ -25,9 +25,9 @@ /** * (De-)Serialization Mixin for {@link Object2IntMap}. 
*/ -@JsonDeserialize(using = Object2IntMapMixin.Deserializer.class) +@JsonDeserialize(using = Object2IntMapMixIn.Deserializer.class) @Slf4j -public class Object2IntMapMixin { +public class Object2IntMapMixIn { public static class Deserializer extends StdDeserializer> implements ContextualDeserializer { diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/BinaryJacksonCoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/BinaryJacksonCoder.java deleted file mode 100644 index a60d742f96..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/BinaryJacksonCoder.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import jakarta.validation.Validator; - -import com.bakdata.conquery.models.exceptions.ValidatorHelper; -import com.bakdata.conquery.models.identifiable.NamespacedStorageProvider; -import com.bakdata.conquery.models.messages.network.NetworkMessage; -import com.bakdata.conquery.util.io.EndCheckableInputStream; -import com.fasterxml.jackson.core.JsonParser.Feature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.ObjectWriter; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class BinaryJacksonCoder implements CQCoder> { - - private final Validator validator; - private final ObjectWriter writer; - private final ObjectReader reader; - - public BinaryJacksonCoder(NamespacedStorageProvider namespacedStorageProvider, Validator validator, ObjectMapper objectMapper) { - this.validator = validator; - writer = objectMapper.writerFor(NetworkMessage.class); - reader = namespacedStorageProvider.injectIntoNew(objectMapper.readerFor(NetworkMessage.class)).without(Feature.AUTO_CLOSE_SOURCE); - } - - @Override - public NetworkMessage decode(ChunkedMessage message) throws Exception { - try (EndCheckableInputStream is = message.createInputStream()) { - final Object obj = reader.readValue(is); - if (!is.isAtEnd()) { - throw 
new IllegalStateException("After reading the JSON message " + obj + " the buffer has still bytes available"); - } - ValidatorHelper.failOnError(log, validator.validate(obj)); - return (NetworkMessage) obj; - } - } - - @Override - public Chunkable encode(NetworkMessage message) throws Exception { - ValidatorHelper.failOnError(log, validator.validate(message)); - - return new Chunkable(message.getMessageId(), writer, message); - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/CQCoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/CQCoder.java deleted file mode 100644 index aa76894d6f..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/CQCoder.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.bakdata.conquery.io.mina; - -public interface CQCoder { - - OUT decode(ChunkedMessage message) throws Exception; - - Chunkable encode(OUT message) throws Exception; -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/CQProtocolCodecFilter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/CQProtocolCodecFilter.java deleted file mode 100644 index f38b3b31a6..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/CQProtocolCodecFilter.java +++ /dev/null @@ -1,532 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.net.SocketAddress; -import java.util.Queue; - -import org.apache.mina.core.buffer.IoBuffer; -import org.apache.mina.core.file.FileRegion; -import org.apache.mina.core.filterchain.IoFilterAdapter; -import org.apache.mina.core.filterchain.IoFilterChain; -import org.apache.mina.core.future.DefaultWriteFuture; -import org.apache.mina.core.future.WriteFuture; -import org.apache.mina.core.session.AbstractIoSession; -import org.apache.mina.core.session.AttributeKey; -import org.apache.mina.core.session.IoSession; -import org.apache.mina.core.write.DefaultWriteRequest; -import org.apache.mina.core.write.NothingWrittenException; -import org.apache.mina.core.write.WriteRequest; -import 
org.apache.mina.filter.codec.AbstractProtocolDecoderOutput; -import org.apache.mina.filter.codec.AbstractProtocolEncoderOutput; -import org.apache.mina.filter.codec.ProtocolCodecFactory; -import org.apache.mina.filter.codec.ProtocolDecoder; -import org.apache.mina.filter.codec.ProtocolDecoderException; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; -import org.apache.mina.filter.codec.ProtocolEncoder; -import org.apache.mina.filter.codec.ProtocolEncoderException; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; -import org.apache.mina.filter.codec.RecoverableProtocolDecoderException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -//see #167 this class is a copy of the one in the mina project -//it is used because mina on default dumps even very large hex values -public class CQProtocolCodecFilter extends IoFilterAdapter { - /** A logger for this class */ - private static final Logger LOGGER = LoggerFactory.getLogger(CQProtocolCodecFilter.class); - - private static final Class[] EMPTY_PARAMS = new Class[0]; - - private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]); - - private static final AttributeKey ENCODER = new AttributeKey(CQProtocolCodecFilter.class, "encoder"); - - private static final AttributeKey DECODER = new AttributeKey(CQProtocolCodecFilter.class, "decoder"); - - private static final AttributeKey DECODER_OUT = new AttributeKey(CQProtocolCodecFilter.class, "decoderOut"); - - private static final AttributeKey ENCODER_OUT = new AttributeKey(CQProtocolCodecFilter.class, "encoderOut"); - - /** The factory responsible for creating the encoder and decoder */ - private final ProtocolCodecFactory factory; - - /** - * Creates a new instance of ProtocolCodecFilter, associating a factory - * for the creation of the encoder and decoder. 
- * - * @param factory The associated factory - */ - public CQProtocolCodecFilter(ProtocolCodecFactory factory) { - if (factory == null) { - throw new IllegalArgumentException("factory"); - } - - this.factory = factory; - } - - /** - * Creates a new instance of ProtocolCodecFilter, without any factory. - * The encoder/decoder factory will be created as an inner class, using - * the two parameters (encoder and decoder). - * - * @param encoder The class responsible for encoding the message - * @param decoder The class responsible for decoding the message - */ - public CQProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) { - if (encoder == null) { - throw new IllegalArgumentException("encoder"); - } - if (decoder == null) { - throw new IllegalArgumentException("decoder"); - } - - // Create the inner Factory based on the two parameters - this.factory = new ProtocolCodecFactory() { - /** - * {@inheritDoc} - */ - @Override - public ProtocolEncoder getEncoder(IoSession session) { - return encoder; - } - - /** - * {@inheritDoc} - */ - @Override - public ProtocolDecoder getDecoder(IoSession session) { - return decoder; - } - }; - } - - /** - * Creates a new instance of ProtocolCodecFilter, without any factory. - * The encoder/decoder factory will be created as an inner class, using - * the two parameters (encoder and decoder), which are class names. Instances - * for those classes will be created in this constructor. 
- * - * @param encoderClass The class responsible for encoding the message - * @param decoderClass The class responsible for decoding the message - */ - public CQProtocolCodecFilter(final Class encoderClass, - final Class decoderClass) { - if (encoderClass == null) { - throw new IllegalArgumentException("encoderClass"); - } - if (decoderClass == null) { - throw new IllegalArgumentException("decoderClass"); - } - if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) { - throw new IllegalArgumentException("encoderClass: " + encoderClass.getName()); - } - if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) { - throw new IllegalArgumentException("decoderClass: " + decoderClass.getName()); - } - try { - encoderClass.getConstructor(EMPTY_PARAMS); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("encoderClass doesn't have a public default constructor."); - } - try { - decoderClass.getConstructor(EMPTY_PARAMS); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("decoderClass doesn't have a public default constructor."); - } - - final ProtocolEncoder encoder; - - try { - encoder = encoderClass.newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("encoderClass cannot be initialized"); - } - - final ProtocolDecoder decoder; - - try { - decoder = decoderClass.newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("decoderClass cannot be initialized"); - } - - // Create the inner factory based on the two parameters. 
- this.factory = new ProtocolCodecFactory() { - /** - * {@inheritDoc} - */ - @Override - public ProtocolEncoder getEncoder(IoSession session) throws Exception { - return encoder; - } - - /** - * {@inheritDoc} - */ - @Override - public ProtocolDecoder getDecoder(IoSession session) throws Exception { - return decoder; - } - }; - } - - /** - * {@inheritDoc} - */ - @Override - public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { - if (parent.contains(this)) { - throw new IllegalArgumentException( - "You can't add the same filter instance more than once. Create another instance and add it."); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { - // Clean everything - disposeCodec(parent.getSession()); - } - - /** - * Process the incoming message, calling the session decoder. As the incoming - * buffer might contains more than one messages, we have to loop until the decoder - * throws an exception. 
- * - * while ( buffer not empty ) - * try - * decode ( buffer ) - * catch - * break; - * - */ - @Override - public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { - LOGGER.trace("Processing a MESSAGE_RECEIVED for session {}", session.getId()); - - if (!(message instanceof IoBuffer)) { - nextFilter.messageReceived(session, message); - return; - } - - IoBuffer in = (IoBuffer) message; - ProtocolDecoder decoder = factory.getDecoder(session); - ProtocolDecoderOutput decoderOut = getDecoderOut(session); - - // Loop until we don't have anymore byte in the buffer, - // or until the decoder throws an unrecoverable exception or - // can't decoder a message, because there are not enough - // data in the buffer - while (in.hasRemaining()) { - int oldPos = in.position(); - try { - synchronized (session) { - // Call the decoder with the read bytes - decoder.decode(session, in, decoderOut); - } - // Finish decoding if no exception was thrown. - decoderOut.flush(nextFilter, session); - } catch (Exception e) { - ProtocolDecoderException pde; - if (e instanceof ProtocolDecoderException) { - pde = (ProtocolDecoderException) e; - } else { - pde = new ProtocolDecoderException(e); - } - if (pde.getHexdump() == null) { - // Generate a message hex dump - int curPos = in.position(); - in.position(oldPos); - pde.setHexdump(in.getHexDump(300)); - in.position(curPos); - } - // Fire the exceptionCaught event. - decoderOut.flush(nextFilter, session); - nextFilter.exceptionCaught(session, pde); - // Retry only if the type of the caught exception is - // recoverable and the buffer position has changed. - // We check buffer position additionally to prevent an - // infinite loop. 
- if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) { - break; - } - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { - if (writeRequest instanceof EncodedWriteRequest) { - return; - } - - if (writeRequest instanceof MessageWriteRequest wrappedRequest) { - nextFilter.messageSent(session, wrappedRequest.getOriginalRequest()); - } else { - nextFilter.messageSent(session, writeRequest); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { - Object message = writeRequest.getMessage(); - - // Bypass the encoding if the message is contained in a IoBuffer, - // as it has already been encoded before - if ((message instanceof IoBuffer) || (message instanceof FileRegion)) { - nextFilter.filterWrite(session, writeRequest); - return; - } - - // Get the encoder in the session - ProtocolEncoder encoder = factory.getEncoder(session); - - ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest); - - if (encoder == null) { - throw new ProtocolEncoderException("The encoder is null for the session " + session); - } - - try { - // The following encodes the message, chunks the message AND also flushes the chunks to the processor - // See ChunkWriter::finishBuffer and ProtocolEncoderOutputImpl::flush - encoder.encode(session, message, encoderOut); - - // Call the next filter - nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest)); - } catch (Exception e) { - ProtocolEncoderException pee; - - // Generate the correct exception - if (e instanceof ProtocolEncoderException) { - pee = (ProtocolEncoderException) e; - } else { - pee = new ProtocolEncoderException(e); - } - - throw pee; - } - } - - /** - * {@inheritDoc} - */ - @Override - public void sessionClosed(NextFilter nextFilter, 
IoSession session) throws Exception { - // Call finishDecode() first when a connection is closed. - ProtocolDecoder decoder = factory.getDecoder(session); - ProtocolDecoderOutput decoderOut = getDecoderOut(session); - - try { - decoder.finishDecode(session, decoderOut); - } catch (Exception e) { - ProtocolDecoderException pde; - if (e instanceof ProtocolDecoderException) { - pde = (ProtocolDecoderException) e; - } else { - pde = new ProtocolDecoderException(e); - } - throw pde; - } finally { - // Dispose everything - disposeCodec(session); - decoderOut.flush(nextFilter, session); - } - - // Call the next filter - nextFilter.sessionClosed(session); - } - - private static class EncodedWriteRequest extends DefaultWriteRequest { - public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) { - super(encodedMessage, future, destination); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isEncoded() { - return true; - } - } - - /** - * Wrapper for write request that where filtered by {@link CQProtocolCodecFilter} to recognize the request - * when it's events are bubbled downstream through the filterchain. 
- * - */ - private static class MessageWriteRequest extends DefaultWriteRequest { - public MessageWriteRequest(WriteRequest writeRequest) { - super(writeRequest, writeRequest.getFuture()); - } - - @Override - public Object getMessage() { - return EMPTY_BUFFER; - } - - @Override - public String toString() { - return "MessageWriteRequest, parent : " + super.toString(); - } - } - - private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput { - public ProtocolDecoderOutputImpl() { - // Do nothing - } - - /** - * {@inheritDoc} - */ - @Override - public void flush(NextFilter nextFilter, IoSession session) { - Queue messageQueue = getMessageQueue(); - - while (!messageQueue.isEmpty()) { - nextFilter.messageReceived(session, messageQueue.poll()); - } - } - } - - private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput { - private final IoSession session; - - private final NextFilter nextFilter; - - /** The WriteRequest destination */ - private final SocketAddress destination; - - public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { - this.session = session; - this.nextFilter = nextFilter; - - // Only store the destination, not the full WriteRequest. - destination = writeRequest.getDestination(); - } - - /** - * {@inheritDoc} - */ - @Override - public WriteFuture flush() { - Queue bufferQueue = getMessageQueue(); - WriteFuture future = null; - - while (!bufferQueue.isEmpty()) { - Object encodedMessage = bufferQueue.poll(); - - if (encodedMessage == null) { - break; - } - - // Flush only when the buffer has remaining. 
- if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { - future = new DefaultWriteFuture(session); - nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination)); - } - } - - if (future == null) { - // Creates an empty writeRequest containing the destination - future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST)); - } - - return future; - } - } - - //----------- Helper methods --------------------------------------------- - /** - * Dispose the encoder, decoder, and the callback for the decoded - * messages. - */ - private void disposeCodec(IoSession session) { - // We just remove the two instances of encoder/decoder to release resources - // from the session - disposeEncoder(session); - disposeDecoder(session); - - // We also remove the callback - disposeDecoderOut(session); - } - - /** - * Dispose the encoder, removing its instance from the - * session's attributes, and calling the associated - * dispose method. - */ - private void disposeEncoder(IoSession session) { - ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER); - if (encoder == null) { - return; - } - - try { - encoder.dispose(session); - } catch (Exception e) { - LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')'); - } - } - - /** - * Dispose the decoder, removing its instance from the - * session's attributes, and calling the associated - * dispose method. - */ - private void disposeDecoder(IoSession session) { - ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER); - if (decoder == null) { - return; - } - - try { - decoder.dispose(session); - } catch (Exception e) { - LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')'); - } - } - - /** - * Return a reference to the decoder callback. 
If it's not already created - * and stored into the session, we create a new instance. - */ - private ProtocolDecoderOutput getDecoderOut(IoSession session) { - ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT); - - if (out == null) { - // Create a new instance, and stores it into the session - out = new ProtocolDecoderOutputImpl(); - session.setAttribute(DECODER_OUT, out); - } - - return out; - } - - private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { - ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT); - - if (out == null) { - // Create a new instance, and stores it into the session - out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest); - session.setAttribute(ENCODER_OUT, out); - } - - return out; - } - - /** - * Remove the decoder callback from the session's attributes. - */ - private void disposeDecoderOut(IoSession session) { - session.removeAttribute(DECODER_OUT); - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java deleted file mode 100644 index a5fd805263..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java +++ /dev/null @@ -1,157 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import com.bakdata.conquery.io.jackson.JacksonUtil; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import 
lombok.extern.slf4j.Slf4j; -import org.apache.mina.core.buffer.IoBuffer; -import org.apache.mina.core.session.AttributeKey; -import org.apache.mina.core.session.IoSession; -import org.apache.mina.filter.codec.CumulativeProtocolDecoder; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; - -@Slf4j @RequiredArgsConstructor -public class ChunkReader extends CumulativeProtocolDecoder { - - private static final AttributeKey MESSAGE_MANAGER = new AttributeKey(BinaryJacksonCoder.class, "messageManager"); - - private final CQCoder coder; - private final ObjectMapper mapper; - - @Override - protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) { - in.mark(); - if (in.remaining() < ChunkWriter.HEADER_SIZE) { - return false; - } - - boolean last = in.get() == ChunkWriter.LAST_MESSAGE; - int length = in.getInt(); - if(length<0) { - throw new IllegalStateException("Read message length "+length); - } - UUID id = new UUID(in.getLong(), in.getLong()); - - if (in.remaining() < length) { - in.reset(); - return false; - } - - MessageManager messageManager = getMessageManager(session); - - if (last) { - ChunkedMessage chunkedMessage = messageManager.finalBuffer(id, in, length); - - try { - out.write(coder.decode(chunkedMessage)); - } catch (Exception e) { - log.error( - "Failed while deserializing the message {}: `{}` (Trying to create a dump as {}.json", - chunkedMessage, - JacksonUtil.toJsonDebug(chunkedMessage), - id, - e - ); - - dumpFailed(id, chunkedMessage.createInputStream()); - } - } - //if not the last part of the message we just store it - else { - messageManager.addBuffer(id, in, length); - } - - return true; - } - - private void dumpFailed(UUID id, InputStream inputStream) { - Path dumps = Path.of("dumps"); - final File dumpFile = dumps.resolve("reading_" + id + "_" + Math.random() + ".json").toFile(); - - try (InputStream is = inputStream) { - Files.createDirectories(dumps); - - JsonNode tree = mapper.readTree(is); - 
try(OutputStream os = new FileOutputStream(dumpFile)) { - mapper.copy().enable(SerializationFeature.INDENT_OUTPUT).writeValue(os, tree); - } - } catch (Exception exception) { - log.error("Failed to write the error json dump {}.json", id, exception); - } - } - - private MessageManager getMessageManager(IoSession session) { - MessageManager messageManager = (MessageManager) session.getAttribute(MESSAGE_MANAGER); - - if (messageManager == null) { - messageManager = new MessageManager(); - session.setAttribute(MESSAGE_MANAGER, messageManager); - } - return messageManager; - } - - @Getter @RequiredArgsConstructor - public static class MessageManager { - - private final ConcurrentMap messages = new ConcurrentHashMap<>(); - private UUID lastId = null; - private ChunkedMessage.List lastMessage = null; - - public ChunkedMessage finalBuffer(UUID id, IoBuffer in, int length) { - if(Objects.equals(lastId, id) || messages.containsKey(id)) { - IoBuffer copy = IoBuffer.allocate(length); - copy.put(in.array(), in.arrayOffset() + in.position(), length); - copy.flip(); - in.skip(length); - ChunkedMessage.List chunkedMessage = getChunkedMessage(id); - remove(id); - chunkedMessage.addBuffer(copy); - return chunkedMessage; - } - return new ChunkedMessage.Singleton(in.getSlice(length)); - } - - public ChunkedMessage addBuffer(UUID id, IoBuffer in, int length) { - IoBuffer copy = IoBuffer.allocate(length); - copy.put(in.array(), in.arrayOffset() + in.position(), length); - copy.flip(); - in.skip(length); - ChunkedMessage.List chunkedMessage = getChunkedMessage(id); - chunkedMessage.addBuffer(copy); - return chunkedMessage; - } - - private ChunkedMessage.List getChunkedMessage(UUID id) { - if(id.equals(lastId)) { - return lastMessage; - } - ChunkedMessage.List msg = messages.computeIfAbsent(id, a->new ChunkedMessage.List()); - lastId = id; - lastMessage = msg; - return msg; - } - - private void remove(UUID id) { - if(lastId == id) { - lastId = null; - lastMessage = null; - } - 
messages.remove(id); - } - - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java deleted file mode 100644 index c5e38d316f..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java +++ /dev/null @@ -1,123 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.UUID; - -import com.bakdata.conquery.models.config.ClusterConfig; -import com.bakdata.conquery.util.SoftPool; -import com.google.common.primitives.Ints; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.mina.core.buffer.IoBuffer; -import org.apache.mina.core.session.IoSession; -import org.apache.mina.filter.codec.ProtocolEncoderAdapter; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; - -@Slf4j -@RequiredArgsConstructor -public class ChunkWriter extends ProtocolEncoderAdapter { - - public static final int HEADER_SIZE = Integer.BYTES + Byte.BYTES + 2 * Long.BYTES; - public static final byte LAST_MESSAGE = 1; - public static final byte CONTINUED_MESSAGE = 0; - @SuppressWarnings("rawtypes") - private final CQCoder coder; - private final SoftPool bufferPool; - - public ChunkWriter(ClusterConfig config, CQCoder coder) { - this.coder = coder; - int bufferSize = Ints.checkedCast(config.getMessageChunkSize().toBytes()); - bufferPool = new SoftPool<>(config, () -> IoBuffer.allocate(bufferSize)); - } - - @SuppressWarnings("unchecked") - @Override - public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { - Chunkable ch = coder.encode(message); - try (ChunkOutputStream cos = new ChunkOutputStream(ch.getId(), out)) { - ch.writeMessage(cos); - } - } - - @RequiredArgsConstructor - private class ChunkOutputStream extends OutputStream { - private final UUID id; - private final ProtocolEncoderOutput out; - private IoBuffer buffer = 
null; - private boolean closed = false; - - @Override - public void write(int b) throws IOException { - if (closed) { - throw new IllegalStateException(); - } - newBuffer(1); - buffer.put((byte) b); - } - - private void newBuffer(int required) { - if (buffer == null || buffer.remaining() < required) { - if (buffer != null) { - finishBuffer(false); - } - buffer = bufferPool.borrow(); - buffer.position(HEADER_SIZE); - } - } - - private void finishBuffer(boolean end) { - buffer.flip(); - if (buffer.remaining() < HEADER_SIZE) { - throw new IllegalStateException("Buffer of size %s is too small for header of length %s".formatted(buffer.remaining(), HEADER_SIZE)); - } - buffer.put(0, end ? LAST_MESSAGE : CONTINUED_MESSAGE); - buffer.putInt(Byte.BYTES, buffer.remaining() - HEADER_SIZE); - buffer.putLong(Byte.BYTES + Integer.BYTES, id.getMostSignificantBits()); - buffer.putLong(Byte.BYTES + Integer.BYTES + Long.BYTES, id.getLeastSignificantBits()); - out.write(buffer); - final IoBuffer currentBuffer = buffer; - out.flush().addListener(future -> { - currentBuffer.clear(); - bufferPool.offer(currentBuffer); - }); - buffer = null; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (closed) { - throw new IllegalStateException(); - } - if (b == null) { - throw new NullPointerException(); - } - else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } - else if (len == 0) { - return; - } - - while (len > 0) { - if (buffer == null || !buffer.hasRemaining()) { - newBuffer(len); - } - - int write = Math.min(len, buffer.remaining()); - buffer.put(b, off, write); - len -= write; - off += write; - } - } - - @Override - public void close() throws IOException { - if (!closed) { - newBuffer(0); - finishBuffer(true); - closed = true; - } - } - } -} \ No newline at end of file diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/Chunkable.java 
b/backend/src/main/java/com/bakdata/conquery/io/mina/Chunkable.java deleted file mode 100644 index c5c3ae65f2..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/Chunkable.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.io.OutputStream; -import java.util.UUID; - -import com.fasterxml.jackson.databind.ObjectWriter; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@RequiredArgsConstructor -@Getter -@Slf4j -public class Chunkable { - - private final UUID id; - private final ObjectWriter writer; - private final Object message; - - public void writeMessage(OutputStream out) { - try (OutputStream os = out) { - writer.writeValue(os, message); - } - catch (Exception e) { - log.error("Failed to write message {}: {}", id, message, e); - } - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkedMessage.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkedMessage.java deleted file mode 100644 index ee645c4c70..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkedMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.util.ArrayList; -import java.util.stream.Collectors; - -import com.bakdata.conquery.io.jackson.JacksonUtil; -import com.bakdata.conquery.util.io.EndCheckableInputStream; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import org.apache.mina.core.buffer.IoBuffer; - -public interface ChunkedMessage { - - long size(); - EndCheckableInputStream createInputStream(); - - @Getter @RequiredArgsConstructor - class Singleton implements ChunkedMessage { - - private final IoBuffer buffer; - - @Override - public EndCheckableInputStream createInputStream() { - return new EndCheckableInputStream(JacksonUtil.stream(buffer)); - } - - @Override - public long size() { - return buffer.remaining(); - } - - @Override - public String toString() { - return 
"ChunkedMessage [buffers=" + buffer.limit() + "]"; - } - } - - @Getter @RequiredArgsConstructor - class List implements ChunkedMessage { - - private final java.util.List buffers = new ArrayList<>(); - - @Override - public EndCheckableInputStream createInputStream() { - return new EndCheckableInputStream(JacksonUtil.stream(buffers)); - } - - public void addBuffer(IoBuffer copy) { - buffers.add(copy); - } - - @Override - public long size() { - long size = 0; - for(IoBuffer b:buffers) { - size+=b.remaining(); - } - return size; - } - - @Override - public String toString() { - return "ChunkedMessage [buffers=" + buffers.stream().map(b->b.limit()).collect(Collectors.toList()) + "]"; - } - } -} \ No newline at end of file diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java new file mode 100644 index 0000000000..e460c5b354 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java @@ -0,0 +1,113 @@ +package com.bakdata.conquery.io.mina; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.dropwizard.util.DataSize; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.filterchain.IoFilterAdapter; +import org.apache.mina.core.future.DefaultWriteFuture; +import org.apache.mina.core.future.IoFuture; +import org.apache.mina.core.future.IoFutureListener; +import org.apache.mina.core.future.WriteFuture; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.DefaultWriteRequest; +import org.apache.mina.core.write.WriteRequest; +import org.jetbrains.annotations.NotNull; + +/** + * Chunks messages to fit them in the socket send buffer size, iff they are larger than the socket send buffer. + * The given ioBuffer is simply sliced into smaller ioBuffers up to the size of {@link ChunkingFilter#socketSendBufferSize}. 
+ */ +@RequiredArgsConstructor +@Slf4j +public class ChunkingFilter extends IoFilterAdapter { + + private final int socketSendBufferSize; + + + + + @Override + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { + if (!(writeRequest.getMessage() instanceof IoBuffer ioBuffer)) { + throw new IllegalStateException("Filter was added at the wrong place in the filter chain. Only expecting IoBuffers here. Got: " + writeRequest.getMessage()); + } + + // The first 4 bytes hold the object length in bytes + int objectLength = ioBuffer.getInt(ioBuffer.position()); + + + if (objectLength < socketSendBufferSize) { + // IoBuffer is shorter than socket buffer, we can just send it. + log.trace("Sending buffer without chunking: {} (limit = {})", DataSize.bytes(objectLength), DataSize.bytes(socketSendBufferSize)); + super.filterWrite(nextFilter, session, writeRequest); + return; + } + + // Split buffers + final int totalSize = ioBuffer.remaining(); + + // TODO unsure if Atomic is needed here + final AtomicInteger writtenChunks = new AtomicInteger(); + final int totalChunks = divideAndRoundUp(totalSize, socketSendBufferSize); + + // Send the first resized (original) buffer + int chunkCount = 0; + + IoFutureListener listener = handleWrittenChunk(writeRequest, writtenChunks, totalChunks); + + DefaultWriteFuture future; + + int position = ioBuffer.position(); + int remainingBytes = totalSize; + + do { + // Size a new Buffer + int nextBufSize = Math.min(remainingBytes, socketSendBufferSize); + IoBuffer slice = ioBuffer.getSlice(position, nextBufSize); + + // Write chunked buffer + chunkCount++; + log.trace("Sending {}. 
chunk: {} byte", chunkCount, nextBufSize); + future = new DefaultWriteFuture(session); + + nextFilter.filterWrite(session, new DefaultWriteRequest(slice, future)); + + future.addListener(listener); + + // Recalculate for next iteration + position += nextBufSize; + remainingBytes -= nextBufSize; + + } while(remainingBytes > 0); + } + + @NotNull + private static IoFutureListener handleWrittenChunk(WriteRequest writeRequest, AtomicInteger writtenChunks, int totalChunks) { + return f -> { + // Count written chunk and notify original writeRequest on error or success + + WriteFuture chunkFuture = (WriteFuture) f; + WriteFuture originalFuture = writeRequest.getFuture(); + if (!chunkFuture.isWritten()) { + log.warn("Failed to write chunk"); + if (!originalFuture.isDone()) { + originalFuture.setException(new IllegalStateException("Failed to write a chunked ioBuffer", chunkFuture.getException())); + } + return; + } + int writtenChunk = writtenChunks.incrementAndGet(); + if (writtenChunk >= totalChunks) { + log.trace("Sent all {} chunks", writtenChunk); + originalFuture.setWritten(); + } + }; + } + + public static int divideAndRoundUp(int num, int divisor) { + // only for positive values + return (num + divisor - 1) / divisor; + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolDecoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolDecoder.java new file mode 100644 index 0000000000..9431afe8ae --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolDecoder.java @@ -0,0 +1,70 @@ +package com.bakdata.conquery.io.mina; + +import java.io.IOException; + +import com.bakdata.conquery.io.jackson.JacksonUtil; +import com.fasterxml.jackson.databind.ObjectReader; +import com.google.common.base.Stopwatch; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.buffer.BufferDataException; +import org.apache.mina.core.buffer.IoBuffer; +import 
org.apache.mina.core.session.IoSession; +import org.apache.mina.filter.codec.CumulativeProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; + +@Slf4j +@RequiredArgsConstructor +public class JacksonProtocolDecoder extends CumulativeProtocolDecoder { + + private final ObjectReader objectReader; + + @Override + protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) { + if (!in.prefixedDataAvailable(4, Integer.MAX_VALUE)) { + // Not enough data available, cumulate more + return false; + } + + int length = in.getInt(); + if (length <= 4) { + throw new BufferDataException("Object length should be greater than 4: " + length); + } + + // Resize limit to frame only the object that we want to read now + int oldLimit = in.limit(); + int beforeReadPosition = in.position(); + in.limit(in.position() + length); + int objectEndLimit = in.limit(); + + try { + // Read the object we are interested in + Stopwatch stopwatch = Stopwatch.createStarted(); + log.trace("BEGIN Decoding message"); + Object o = objectReader.readValue(in.asInputStream()); + log.trace("FINISHED Decoding message in {}", stopwatch); + + out.write(o); + } + catch (IOException e) { // Includes JacksonException + String debuggedMessage = "enable TRACE for Message"; + if (log.isTraceEnabled()) { + // Rewind ordinary read attempt + in.position(beforeReadPosition); + + debuggedMessage = JacksonUtil.toJsonDebug(in.asInputStream()); + + } + log.error("Failed to decode message: {}", debuggedMessage , e); + } + finally { + // Set back the old limit, as the in buffer might already have data for a new object + in.limit(oldLimit); + + // If the debugging decoder did not read all bytes: forward the position to this object's supposed end + in.position(objectEndLimit); + } + + return true; + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java new file 
mode 100644 index 0000000000..9d2144b716 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java @@ -0,0 +1,53 @@ +package com.bakdata.conquery.io.mina; + +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.base.Stopwatch; +import io.dropwizard.util.DataSize; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.filter.codec.serialization.ObjectSerializationEncoder; + +@Slf4j +@RequiredArgsConstructor +public class JacksonProtocolEncoder extends ObjectSerializationEncoder { + + private final int SIZE_PREFIX_LENGTH = Integer.BYTES; + + private final ObjectWriter objectWriter; + + @Getter + @Setter + private int initialBufferCapacityBytes = 64; + + @Override + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { + final IoBuffer buf = IoBuffer.allocate(initialBufferCapacityBytes, false); + buf.setAutoExpand(true); + + buf.skip(SIZE_PREFIX_LENGTH); // Make a room for the length field. + + final Stopwatch stopwatch = Stopwatch.createStarted(); + log.trace("BEGIN Encoding message"); + + objectWriter.writeValue(buf.asOutputStream(), message); + + final int objectSize = buf.position() - SIZE_PREFIX_LENGTH; + + if (objectSize > getMaxObjectSize()) { + throw new IllegalArgumentException("The encoded object is too big: " + objectSize + " (> " + getMaxObjectSize() + ')'); + } + + // Fill the length field + buf.putInt(0, objectSize); + + buf.flip(); + log.trace("FINISHED Encoding message in {}. Buffer size: {}. 
Message: {}", stopwatch, DataSize.bytes(buf.remaining()), message); + + out.write(buf); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/MdcFilter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/MdcFilter.java index f79dc4d820..a817b47110 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/MdcFilter.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/MdcFilter.java @@ -10,7 +10,7 @@ public class MdcFilter extends CommonEventFilter { private final ThreadLocal callDepth = ThreadLocal.withInitial(() -> 0); - private final String locationFmt; + private final String location; /** * Adapted from {@link org.apache.mina.filter.logging.MdcInjectionFilter} @@ -25,7 +25,7 @@ protected void filter(IoFilterEvent event) throws Exception { if (currentCallDepth == 0) { /* copy context to the MDC when necessary. */ - ConqueryMDC.setLocation(String.format(locationFmt, event.getSession().getLocalAddress().toString())); + ConqueryMDC.setLocation(location + String.format("[%s]", event.getSession().getLocalAddress().toString())); } try { diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java index 94d04e5e3d..1496bed828 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java @@ -353,8 +353,8 @@ private T read(ObjectReader reader, ByteIterable obj) { } } - private static byte[] debugUnGzip(byte[] bytes) throws IOException { - return new GZIPInputStream(new ByteArrayInputStream(bytes)).readAllBytes(); + private static InputStream debugUnGzip(byte[] bytes) throws IOException { + return new GZIPInputStream(new ByteArrayInputStream(bytes)); } /** diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java 
b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java index c9828afb38..3bd94f6d4e 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java @@ -1,14 +1,8 @@ package com.bakdata.conquery.mode.cluster; import java.io.IOException; -import java.net.InetSocketAddress; import jakarta.validation.Validator; -import com.bakdata.conquery.io.mina.BinaryJacksonCoder; -import com.bakdata.conquery.io.mina.CQProtocolCodecFilter; -import com.bakdata.conquery.io.mina.ChunkReader; -import com.bakdata.conquery.io.mina.ChunkWriter; -import com.bakdata.conquery.io.mina.MdcFilter; import com.bakdata.conquery.io.mina.MinaAttributes; import com.bakdata.conquery.io.mina.NetworkSession; import com.bakdata.conquery.models.config.ConqueryConfig; @@ -29,7 +23,6 @@ import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; -import org.apache.mina.transport.socket.nio.NioSocketAcceptor; /** * Manager of the connection from the manager to the ConQuery shards. 
@@ -105,16 +98,10 @@ else if (toManagerNode instanceof SlowMessage slowMessage) { } public void start() throws IOException { - acceptor = new NioSocketAcceptor(); - acceptor.getFilterChain().addFirst("mdc", new MdcFilter("Manager[%s]")); - final ObjectMapper om = internalMapperFactory.createManagerCommunicationMapper(datasetRegistry); - final BinaryJacksonCoder coder = new BinaryJacksonCoder(datasetRegistry, validator, om); - acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om))); - acceptor.setHandler(this); - acceptor.getSessionConfig().setAll(config.getCluster().getMina()); - acceptor.bind(new InetSocketAddress(config.getCluster().getPort())); + acceptor = config.getCluster().getClusterAcceptor(om, this, "Manager"); + log.info("Started ManagerNode @ {}", acceptor.getLocalAddress()); } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java index 4d108b330a..0b3e4ac643 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java @@ -5,11 +5,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import com.bakdata.conquery.io.mina.BinaryJacksonCoder; -import com.bakdata.conquery.io.mina.CQProtocolCodecFilter; -import com.bakdata.conquery.io.mina.ChunkReader; -import com.bakdata.conquery.io.mina.ChunkWriter; -import com.bakdata.conquery.io.mina.MdcFilter; import com.bakdata.conquery.io.mina.NetworkSession; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.jobs.JobManager; @@ -119,7 +114,7 @@ private void connectToCluster() { disconnectFromCluster(); - connector = getClusterConnector(workers); + connector = getClusterConnector(); while (true) { 
try { @@ -166,17 +161,10 @@ private void disconnectFromCluster() { } @NotNull - private NioSocketConnector getClusterConnector(ShardWorkers workers) { + private NioSocketConnector getClusterConnector() { ObjectMapper om = internalMapperFactory.createShardCommunicationMapper(); - final NioSocketConnector connector = new NioSocketConnector(); - - final BinaryJacksonCoder coder = new BinaryJacksonCoder(workers, environment.getValidator(), om); - connector.getFilterChain().addFirst("mdc", new MdcFilter("Shard[%s]")); - connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om))); - connector.setHandler(this); - connector.getSessionConfig().setAll(config.getCluster().getMina()); - return connector; + return config.getCluster().getClusterConnector(om, this, "Shard"); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java index d7af7d5e0d..1a539812bd 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java @@ -18,10 +18,10 @@ import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.Connector; import com.bakdata.conquery.models.events.Bucket; +import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; import com.bakdata.conquery.models.identifiable.ids.specific.TableId; -import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; import com.bakdata.conquery.models.messages.namespaces.specific.AddImport; import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; import 
com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; @@ -35,6 +35,7 @@ import lombok.AllArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.future.WriteFuture; /** * Handler of {@link Import} requests that realizes them both on the manager and the cluster's shards. @@ -48,10 +49,10 @@ public class ClusterImportHandler implements ImportHandler { @SneakyThrows @Override public void updateImport(Namespace namespace, InputStream inputStream) { - handleImport(namespace, inputStream, true, datasetRegistry); + handleImport(namespace, inputStream, true); } - private static void handleImport(Namespace namespace, InputStream inputStream, boolean update, DatasetRegistry datasetRegistry) throws IOException { + private static void handleImport(Namespace namespace, InputStream inputStream, boolean update) throws IOException { try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) { // We parse semi-manually as the incoming file consist of multiple documents we read progressively: // 1) the header to check metadata @@ -61,7 +62,7 @@ private static void handleImport(Namespace namespace, InputStream inputStream, b final Table table = validateImportable(((DistributedNamespace) namespace), header, update); - readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser, datasetRegistry); + readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser); clearDependentConcepts(namespace.getStorage().getAllConcepts(), table.getId()); } @@ -108,7 +109,7 @@ else if (processedImport != null) { return table; } - private static void readAndDistributeImport(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader, DatasetRegistry datasetRegistry) { + private static void readAndDistributeImport(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) 
{ final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable()); final ImportId importId = new ImportId(tableId, header.getName()); @@ -131,14 +132,20 @@ private static void readAndDistributeImport(DistributedNamespace namespace, Tabl final Bucket bucket = Bucket.fromPreprocessed(table, container, imp); - log.trace("DONE reading bucket `{}`, contains {} entities.", bucket.getId(), bucket.entities().size()); + final BucketId bucketId = bucket.getId(); + log.trace("DONE reading bucket `{}`, contains {} entities.", bucketId, bucket.entities().size()); - final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucket.getId()); + final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucketId); - sendBucket(bucket, responsibleWorker); + sendBucket(bucket, responsibleWorker).addListener((f) -> { + if(((WriteFuture)f).isWritten()) { + log.trace("Sent Bucket {}", bucketId); + return; + } + log.warn("Failed to send Bucket {}", bucketId); + }); // NOTE: I want the bucket to be GC'd as early as possible, so I just store the part(s) I need later. - collectedEntities.put(bucket.getBucket(), bucket.entities()); } @@ -161,20 +168,19 @@ private static void clearDependentConcepts(Stream> allConcepts, Table /** * select, then send buckets. 
*/ - public static WorkerId sendBucket(Bucket bucket, WorkerInformation responsibleWorker) { + public static WriteFuture sendBucket(Bucket bucket, WorkerInformation responsibleWorker) { responsibleWorker.awaitFreeJobQueue(); log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId()); - responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket)); + return responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket)); - return responsibleWorker.getId(); } @SneakyThrows @Override public void addImport(Namespace namespace, InputStream inputStream) { - handleImport(namespace, inputStream, false, datasetRegistry); + handleImport(namespace, inputStream, false); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/AuthorizationController.java b/backend/src/main/java/com/bakdata/conquery/models/auth/AuthorizationController.java index 655e93da5b..72b3f186b2 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/auth/AuthorizationController.java +++ b/backend/src/main/java/com/bakdata/conquery/models/auth/AuthorizationController.java @@ -34,11 +34,12 @@ import org.apache.shiro.SecurityUtils; import org.apache.shiro.authc.pam.FirstSuccessfulStrategy; import org.apache.shiro.authc.pam.ModularRealmAuthenticator; +import org.apache.shiro.lang.util.LifecycleUtils; import org.apache.shiro.mgt.DefaultSecurityManager; import org.apache.shiro.realm.AuthorizingRealm; import org.apache.shiro.realm.Realm; -import org.apache.shiro.util.LifecycleUtils; import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.jetbrains.annotations.NotNull; /** * The central class for the initialization of authorization and authentication. 
@@ -78,7 +79,7 @@ public final class AuthorizationController implements Managed { @Getter private DropwizardResourceConfig unprotectedAuthAdmin; - public AuthorizationController(MetaStorage storage, ConqueryConfig config, Environment environment, AdminServlet adminServlet) { + public AuthorizationController(@NotNull MetaStorage storage, @NotNull ConqueryConfig config, @NotNull Environment environment, AdminServlet adminServlet) { this.storage = storage; this.config = config; this.environment = environment; diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java b/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java index ca4759999b..e015226e9b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java @@ -41,8 +41,8 @@ import org.apache.shiro.authc.AuthenticationToken; import org.apache.shiro.authc.CredentialsException; import org.apache.shiro.authc.IncorrectCredentialsException; +import org.apache.shiro.lang.util.Destroyable; import org.apache.shiro.realm.AuthenticatingRealm; -import org.apache.shiro.util.Destroyable; /** * This realm stores credentials in a local database ({@link XodusStore}). Upon @@ -52,7 +52,7 @@ * authorization related user information that is saved in the * {@link MetaStorage}. So adding or removing a user in this realm does * not change the {@link MetaStorage}. {@link Conquery} interacts with - * this realm using the Shiro frame work. However, endusers can interface it + * this realm using the Shiro framework. However, endusers can interface it * through specific endpoints that are registerd by this realm. 
*/ @Slf4j diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java index f404aa1a48..e94c1ee3ce 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java @@ -1,16 +1,30 @@ package com.bakdata.conquery.models.config; +import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import jakarta.validation.Valid; +import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; +import com.bakdata.conquery.io.mina.ChunkingFilter; +import com.bakdata.conquery.io.mina.JacksonProtocolDecoder; +import com.bakdata.conquery.io.mina.JacksonProtocolEncoder; +import com.bakdata.conquery.io.mina.MdcFilter; +import com.bakdata.conquery.models.messages.network.NetworkMessage; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.ObjectMapper; import io.dropwizard.core.Configuration; -import io.dropwizard.util.DataSize; import io.dropwizard.util.Duration; import io.dropwizard.validation.PortRange; import lombok.Getter; import lombok.Setter; +import org.apache.mina.core.service.IoHandler; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.transport.socket.DefaultSocketSessionConfig; +import org.apache.mina.transport.socket.nio.NioSocketAcceptor; +import org.apache.mina.transport.socket.nio.NioSocketConnector; @Getter @Setter @@ -22,7 +36,7 @@ public class ClusterConfig extends Configuration { private InetAddress managerURL = InetAddress.getLoopbackAddress(); @Valid @NotNull - private MinaConfig mina = new MinaConfig(); + private DefaultSocketSessionConfig mina = new DefaultSocketSessionConfig(); @Min(1) private int entityBucketSize = 1000; @@ -30,6 +44,24 @@ public class ClusterConfig extends 
Configuration { private Duration heartbeatTimeout = Duration.minutes(1); private Duration connectRetryTimeout = Duration.seconds(30); + /** + * Defines the maximum buffer size, including 4 bytes for the header. Objects larger than this size cannot be sent over the cluster. + *

+ Only modify this for testing purposes. + */ + @Max(Integer.MAX_VALUE - 4) + @Min(64) // not practical + private int maxIoBufferSizeBytes = Integer.MAX_VALUE - 4; + + /** + * Defines the starting buffer allocation size. Larger values can reduce reallocations, but can cause a greater memory demand. + *

+ Only modify this for testing purposes. + */ + @Max(Integer.MAX_VALUE - 4) + @Min(64) // Mina's default + private int initialIoBufferSizeBytes = 8192; // 8kb + /** * @see com.bakdata.conquery.models.messages.namespaces.specific.CollectColumnValuesJob * @@ -44,27 +76,6 @@ public class ClusterConfig extends Configuration { */ private int networkSessionMaxQueueLength = 5; - /** - * {@link org.apache.mina.core.buffer.IoBuffer} size, that mina allocates. - * We assume a pagesize of 4096 bytes == 4 kibibytes - */ - @NotNull - @Valid - private DataSize messageChunkSize = DataSize.kibibytes(4); - - /** - * How long the soft pool cleaner waits before reducing the pool size down to softPoolBaselineSize. - */ - @NotNull - @Valid - private Duration softPoolCleanerPause = Duration.seconds(10); - - /** - * The number of soft references the soft pool should retain after cleaning. - * The actual number of {@link org.apache.mina.core.buffer.IoBuffer} - */ - private long softPoolBaselineSize = 100; - - /** * Amount of backpressure before jobs can volunteer to block to send messages to their shards. *

@@ -72,4 +83,56 @@ public class ClusterConfig extends Configuration { */ @Min(0) private int backpressure = 1500; + + @JsonIgnore + public NioSocketConnector getClusterConnector(ObjectMapper om, IoHandler ioHandler, String mdcLocation) { + + final NioSocketConnector connector = new NioSocketConnector(); + + JacksonProtocolEncoder encoder = new JacksonProtocolEncoder(om.writerFor(NetworkMessage.class)); + encoder.setMaxObjectSize(maxIoBufferSizeBytes); + encoder.setInitialBufferCapacityBytes(initialIoBufferSizeBytes); + + ProtocolCodecFilter codecFilter = new ProtocolCodecFilter( + encoder, + new JacksonProtocolDecoder(om.readerFor(NetworkMessage.class)) + ); + connector.getFilterChain().addFirst("mdc", new MdcFilter(mdcLocation)); + if (mina.getSendBufferSize() > 0) { + connector.getFilterChain().addLast("chunk", new ChunkingFilter(mina.getSendBufferSize())); + } + connector.getFilterChain().addLast("codec", codecFilter); + + connector.setHandler(ioHandler); + connector.getSessionConfig().setAll(getMina()); + + return connector; + } + + @JsonIgnore + public NioSocketAcceptor getClusterAcceptor(ObjectMapper om, IoHandler ioHandler, String mdcLocation) throws IOException { + NioSocketAcceptor acceptor = new NioSocketAcceptor(); + + + JacksonProtocolEncoder encoder = new JacksonProtocolEncoder(om.writerFor(NetworkMessage.class)); + encoder.setMaxObjectSize(maxIoBufferSizeBytes); + encoder.setInitialBufferCapacityBytes(initialIoBufferSizeBytes); + + ProtocolCodecFilter codecFilter = new ProtocolCodecFilter( + encoder, + new JacksonProtocolDecoder(om.readerFor(NetworkMessage.class)) + ); + + acceptor.getFilterChain().addFirst("mdc", new MdcFilter(mdcLocation)); + if (mina.getSendBufferSize() > 0) { + acceptor.getFilterChain().addLast("chunk", new ChunkingFilter(mina.getSendBufferSize())); + } + acceptor.getFilterChain().addLast("codec", codecFilter); + + acceptor.setHandler(ioHandler); + acceptor.getSessionConfig().setAll(getMina()); + acceptor.bind(new 
InetSocketAddress(getPort())); + + return acceptor; + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java index 4488c38e11..236cf62019 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java @@ -221,7 +221,7 @@ public SingletonStore createIdMappingStore(String pathName, ObjectM openStoresInEnv.put(bigStore.getDataXodusStore().getEnvironment(), bigStore.getDataXodusStore()); openStoresInEnv.put(bigStore.getMetaXodusStore().getEnvironment(), bigStore.getMetaXodusStore()); - return new SingletonStore<>(bigStore); + return new SingletonStore<>(new CachedStore<>(bigStore)); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/util/DateContext.java b/backend/src/main/java/com/bakdata/conquery/models/forms/util/DateContext.java index 1a1c1c2f82..96270441f3 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/util/DateContext.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/util/DateContext.java @@ -96,7 +96,7 @@ public static Function> getDateRangeSubdivider(Align int alignedPerResolution = resolution.getAmountForAlignment(alignment).orElseThrow(() -> new ConqueryError.ExecutionCreationPlanDateContextError(alignment, resolution)); if (alignedPerResolution == 1) { - // When the alignment fits the resolution we can use the the alignment subdivision directly + // When the alignment fits the resolution we can use the alignment subdivision directly return (dateRange) -> alignment.getSubdivider().apply(dateRange); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java index 95ca03ecf4..526be8e7b5 100644 --- 
a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java @@ -36,6 +36,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.mina.core.future.WriteFuture; /** @@ -67,7 +68,8 @@ public void react(Worker context) throws Exception { final Map> table2Buckets = context.getStorage().getAllBuckets() .collect(Collectors.groupingBy(Bucket::getTable)); - final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS)); + BasicThreadFactory threadFactory = (new BasicThreadFactory.Builder()).namingPattern(this.getClass().getSimpleName() + "-Worker-%d").build(); + final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS, threadFactory)); final AtomicInteger done = new AtomicInteger(); @@ -84,7 +86,7 @@ public void react(Worker context) throws Exception { .flatMap(bucket -> ((StringStore) bucket.getStore(column)).streamValues()) .collect(Collectors.toSet()); - log.trace("Finished collections values for column {} as number {}", column, done.incrementAndGet()); + log.trace("Finished collecting {} values for column {}", values.size(), column); // Chunk values, to produce smaller messages Iterable> partition = Iterables.partition(values, columValueChunkSize); @@ -93,14 +95,18 @@ public void react(Worker context) throws Exception { column.getId(), values.size(), columValueChunkSize ); + int i = 0; for (List chunk : partition) { // Send values to manager RegisterColumnValues message = new RegisterColumnValues(getMessageId(), context.getInfo().getId(), column.getId(), chunk); WriteFuture send = context.send(message); - send.awaitUninterruptibly(); + 
log.trace("Finished sending chunk {} for column '{}'", i++, column.getId()); } + + getProgressReporter().report(1); + log.trace("Finished collections values for column {} as number {}", column, done.incrementAndGet()); }); } ) @@ -124,6 +130,7 @@ public void react(Worker context) throws Exception { // We may do this, because we own this specific ExecutorService. jobsExecutorService.shutdown(); + getProgressReporter().done(); log.info("Finished collecting values from these columns: {}", Arrays.toString(columns.toArray())); context.send(new FinalizeReactionMessage(getMessageId(), context.getInfo().getId())); diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java index 94af290f66..c59b181ea2 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java @@ -80,7 +80,6 @@ public void execute() throws Exception { calculateConceptMatches(resolved, matchingStats, worker); final WriteFuture writeFuture = worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats)); - writeFuture.awaitUninterruptibly(); progressReporter.report(1); }, worker.getJobsExecutorService()) diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java index 4984115649..5404597fa7 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java @@ -27,7 +27,7 @@ public FormShardResult(ManagedExecutionId formId, ManagedExecutionId subQueryId, * Distribute the result to a sub query. 
*/ @Override - public void addResult(DistributedExecutionManager executionManager) { + protected void addResult(DistributedExecutionManager executionManager) { final ManagedInternalForm managedInternalForm = (ManagedInternalForm) executionManager.getExecution(getFormId()); final ManagedQuery subQuery = managedInternalForm.getSubQuery(getQueryId()); diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java b/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java index 8c5a192593..4c827857d6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java @@ -82,7 +82,7 @@ public synchronized void finish(@NonNull List results, Optional> nodeProvider; - public void addRoles(List roles) { - - for (Role role : roles) { - try { - addRole(role); - } - catch (Exception e) { - log.error(String.format("Failed to add Role: %s", role), e); - } - } - } - public synchronized void addRole(Role role) throws JSONException { ValidatorHelper.failOnError(log, validator.validate(role)); log.trace("New role:\tLabel: {}\tName: {}\tId: {} ", role.getLabel(), role.getName(), role.getId()); @@ -296,7 +284,7 @@ public Object executeScript(String script) { groovy.setProperty("managerNode", getManagerNode()); groovy.setProperty("datasetRegistry", getDatasetRegistry()); groovy.setProperty("jobManager", getJobManager()); - groovy.setProperty("config", getConfig()); + groovy.setProperty("conqueryConfig", getConfig()); groovy.setProperty("storage", getStorage()); try { diff --git a/backend/src/main/java/com/bakdata/conquery/tasks/PermissionCleanupTask.java b/backend/src/main/java/com/bakdata/conquery/tasks/PermissionCleanupTask.java index 98c00a29ad..784e8f9620 100644 --- a/backend/src/main/java/com/bakdata/conquery/tasks/PermissionCleanupTask.java +++ 
b/backend/src/main/java/com/bakdata/conquery/tasks/PermissionCleanupTask.java @@ -133,7 +133,7 @@ public static & Owned, ID extends Id> int del if (wpermission.getInstances().size() != 1) { log.trace("Skipping permission {} because it refers to multiple instances.", wpermission); } - ID executionId = null; + ID executionId; try { executionId = idParser.parse(wpermission.getInstances().iterator().next()); } @@ -144,14 +144,17 @@ public static & Owned, ID extends Id> int del E execution = instanceStorageExtractor.apply(executionId); if (execution == null) { - log.trace("The execution referenced in permission {} does not exist. Skipping permission"); + log.trace("The execution referenced in permission {} does not exist. Skipping permission", wpermission); continue; } if (!user.isOwner(execution)) { - log.trace("The user is not owner of the instance. Keeping the permission. User: {}, Owner: {}, Instance: {}, Permission: {}", user.getId(), execution - .getOwner(), execution - .getId(), wpermission); + log.trace("The user is not owner of the instance. Keeping the permission. 
User: {}, Owner: {}, Instance: {}, Permission: {}", + user.getId(), + execution.getOwner(), + execution.getId(), + wpermission + ); continue; } diff --git a/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java b/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java deleted file mode 100644 index 9f58bd4d72..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java +++ /dev/null @@ -1,93 +0,0 @@ -package com.bakdata.conquery.util; - -import java.lang.ref.SoftReference; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; - -import com.bakdata.conquery.models.config.ClusterConfig; -import com.google.common.util.concurrent.Uninterruptibles; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class SoftPool { - - private final ConcurrentLinkedDeque> pool = new ConcurrentLinkedDeque<>(); - private final AtomicLong poolSize = new AtomicLong(0); - private final Supplier supplier; - private final Thread poolCleaner; - private final long softPoolBaselineSize; - private final long cleanerPauseSeconds; - - public SoftPool(ClusterConfig config, Supplier supplier) { - this.supplier = supplier; - - softPoolBaselineSize = config.getSoftPoolBaselineSize(); - cleanerPauseSeconds = config.getSoftPoolCleanerPause().toSeconds(); - - if (softPoolBaselineSize <= 0 || cleanerPauseSeconds <= 0) { - log.debug("Not creating a Cleaner."); - poolCleaner = null; - return; - } - - poolCleaner = new Thread(this::cleanPool, "SoftPool Cleaner"); - // Should not prevent the JVM shutdown -> daemon - poolCleaner.setDaemon(true); - poolCleaner.start(); - } - - /** - * Offer/return a reusable object to the pool. - * - * @param v the object to return to the pool. 
- */ - public void offer(T v) { - pool.addLast(new SoftReference<>(v)); - - final long currentPoolSize = poolSize.incrementAndGet(); - - log.trace("Pool size: {} (offer)", currentPoolSize); - } - - /** - * Returns a reusable element from the pool if available or - * returns a new element from the provided supplier. - */ - public T borrow() { - SoftReference result; - - // First check the pool for available/returned elements - while ((result = pool.poll()) != null) { - final long currentPoolSize = poolSize.decrementAndGet(); - - log.trace("Pool size: {} (borrow)", currentPoolSize); - - // The pool had an element, inspect if it is still valid - final T elem = result.get(); - if (elem != null) { - // Return valid element - return elem; - } - // Referenced element was already garbage collected. Poll further - } - // Pool was empty -- request a new element - return supplier.get(); - } - - /** - * Trims the pool in a custom interval so that soft references get purged earlier - */ - private void cleanPool() { - while (true) { - Uninterruptibles.sleepUninterruptibly(cleanerPauseSeconds, TimeUnit.SECONDS); - - log.trace("Running pool cleaner"); - while (poolSize.get() > softPoolBaselineSize) { - // Poll until we reached the baseline - borrow(); - } - } - } -} diff --git a/backend/src/test/java/com/bakdata/conquery/integration/sql/SqlStandaloneCommand.java b/backend/src/test/java/com/bakdata/conquery/integration/sql/SqlStandaloneCommand.java index 6504e804a2..5b21e5491b 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/sql/SqlStandaloneCommand.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/sql/SqlStandaloneCommand.java @@ -3,7 +3,6 @@ import java.util.Collections; import java.util.List; -import com.bakdata.conquery.Conquery; import com.bakdata.conquery.commands.ManagerNode; import com.bakdata.conquery.commands.ShardNode; import com.bakdata.conquery.commands.StandaloneCommand; @@ -12,9 +11,9 @@ import 
com.bakdata.conquery.mode.local.LocalManagerProvider; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.worker.LocalNamespace; +import com.bakdata.conquery.util.commands.NoOpConquery; import com.bakdata.conquery.util.io.ConqueryMDC; import io.dropwizard.core.cli.ServerCommand; -import io.dropwizard.core.setup.Bootstrap; import io.dropwizard.core.setup.Environment; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -24,27 +23,11 @@ @Getter public class SqlStandaloneCommand extends ServerCommand implements StandaloneCommand { - private final Conquery conquery; - private ManagerNode managerNode = new ManagerNode(); + private final ManagerNode managerNode = new ManagerNode(); private DelegateManager manager; - private Environment environment; - public SqlStandaloneCommand(Conquery conquery) { - super(conquery, "standalone", "starts a sql server and a client at the same time."); - this.conquery = conquery; - } - - @Override - public void startStandalone(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception { - ConqueryMDC.setLocation("ManagerNode"); - log.debug("Starting ManagerNode"); - this.manager = new LocalManagerProvider(new TestSqlDialectFactory()).provideManager(config, environment); - this.conquery.setManagerNode(managerNode); - this.conquery.run(manager); - // starts the Jersey Server - log.debug("Starting REST Server"); - ConqueryMDC.setLocation(null); - super.run(environment, namespace, config); + public SqlStandaloneCommand() { + super(new NoOpConquery(), "standalone", "starts a sql server and a client at the same time."); } @Override @@ -53,23 +36,14 @@ public List getShardNodes() { } @Override - public void run(Bootstrap bootstrap, Namespace namespace, ConqueryConfig configuration) throws Exception { - environment = new Environment( - bootstrap.getApplication().getName(), - bootstrap.getObjectMapper(), - bootstrap.getValidatorFactory(), - bootstrap.getMetricRegistry(), - 
bootstrap.getClassLoader(), - bootstrap.getHealthCheckRegistry(), - configuration - ); - configuration.getMetricsFactory().configure(environment.lifecycle(), bootstrap.getMetricRegistry()); - configuration.getServerFactory().configure(environment); - - bootstrap.run(configuration, environment); - startStandalone(environment, namespace, configuration); + protected void run(Environment environment, Namespace namespace, ConqueryConfig configuration) throws Exception { + ConqueryMDC.setLocation("ManagerNode"); + log.debug("Starting ManagerNode"); + this.manager = new LocalManagerProvider(new TestSqlDialectFactory()).provideManager(configuration, environment); + managerNode.run(manager); + // starts the Jersey Server + log.debug("Starting REST Server"); + ConqueryMDC.setLocation(null); + super.run(environment, namespace, configuration); } - - - } diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/ScriptEndTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/ScriptEndTest.java new file mode 100644 index 0000000000..2aa4ff5f99 --- /dev/null +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/ScriptEndTest.java @@ -0,0 +1,30 @@ +package com.bakdata.conquery.integration.tests; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.URI; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +import com.bakdata.conquery.integration.IntegrationTest; +import com.bakdata.conquery.io.storage.MetaStorage; +import com.bakdata.conquery.resources.admin.rest.AdminResource; +import com.bakdata.conquery.resources.hierarchies.HierarchyHelper; +import com.bakdata.conquery.util.support.StandaloneSupport; + +public class ScriptEndTest extends IntegrationTest.Simple implements ProgrammaticIntegrationTest { + @Override + public void execute(StandaloneSupport conquery) throws Exception { + final URI scriptUri = 
HierarchyHelper.hierarchicalPath(conquery.defaultAdminURIBuilder() + , AdminResource.class, "executeScript") + .build(); + + try(Response resp = conquery.getClient().target(scriptUri).request(MediaType.TEXT_PLAIN_TYPE).post(Entity.entity("storage", MediaType.TEXT_PLAIN_TYPE))){ + assertThat(resp.getStatusInfo().getFamily()).isEqualTo(Response.Status.Family.SUCCESSFUL); + + assertThat(resp.readEntity(String.class)) + .contains(MetaStorage.class.getSimpleName()); + } + } +} diff --git a/backend/src/test/java/com/bakdata/conquery/io/mina/MinaStackTest.java b/backend/src/test/java/com/bakdata/conquery/io/mina/MinaStackTest.java new file mode 100644 index 0000000000..4c7f35048e --- /dev/null +++ b/backend/src/test/java/com/bakdata/conquery/io/mina/MinaStackTest.java @@ -0,0 +1,280 @@ +package com.bakdata.conquery.io.mina; + +import static java.lang.Math.toIntExact; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import com.bakdata.conquery.io.cps.CPSType; +import com.bakdata.conquery.io.jackson.Jackson; +import com.bakdata.conquery.models.config.ClusterConfig; +import com.bakdata.conquery.models.messages.network.NetworkMessage; +import com.bakdata.conquery.models.messages.network.NetworkMessageContext; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dropwizard.util.DataSize; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import 
org.apache.commons.lang3.RandomStringUtils; +import org.apache.mina.core.future.ConnectFuture; +import org.apache.mina.core.future.WriteFuture; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.transport.socket.nio.NioSocketAcceptor; +import org.apache.mina.transport.socket.nio.NioSocketConnector; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +@Slf4j +public class MinaStackTest { + + private static final ClusterConfig CLUSTER_CONFIG = new ClusterConfig(); + private static final ObjectMapper OM = Jackson.BINARY_MAPPER.copy(); + private static final ConcurrentLinkedQueue> SERVER_RECEIVED_MESSAGES = new ConcurrentLinkedQueue<>(); + + private static NioSocketAcceptor SERVER; + + @BeforeAll + public static void beforeAll() throws IOException { + + CLUSTER_CONFIG.setPort(0); + CLUSTER_CONFIG.setMaxIoBufferSizeBytes(toIntExact(DataSize.mebibytes(10).toBytes())); + + // This enables the Chunking filter, which triggers for messages > 1 MebiByte + CLUSTER_CONFIG.getMina().setSendBufferSize(toIntExact(DataSize.mebibytes(1).toBytes())); + + // Server + SERVER = CLUSTER_CONFIG.getClusterAcceptor(OM, new IoHandlerAdapter() { + @Override + public void sessionOpened(IoSession session) { + log.info("Session to {} established", session.getRemoteAddress()); + } + + @Override + public void messageReceived(IoSession session, Object message) { + SERVER_RECEIVED_MESSAGES.add((NetworkMessage) message); + log.trace("Received {} messages", SERVER_RECEIVED_MESSAGES.size()); + } + + @Override + public void exceptionCaught(IoSession session, Throwable cause) { + fail("Server caught an Exception", cause); + 
} + }, "Server"); + + } + + @BeforeEach + public void beforeEach() { + SERVER_RECEIVED_MESSAGES.clear(); + } + + @Test + void smokeTest() { + + NioSocketConnector client = CLUSTER_CONFIG.getClusterConnector(OM, new IoHandlerAdapter() { + @Override + public void sessionOpened(IoSession session) { + log.info("Session to {} established", session.getRemoteAddress()); + } + }, "Client"); + + try { + + ConnectFuture connect = client.connect(SERVER.getLocalAddress()); + + connect.awaitUninterruptibly(); + IoSession clientSession = connect.getSession(); + + NetworkMessage input = new TestMessage(RandomStringUtils.randomAscii(1000)); + + WriteFuture write = clientSession.write(input); + + write.awaitUninterruptibly(); + + await().atMost(1, TimeUnit.SECONDS).until(() -> !SERVER_RECEIVED_MESSAGES.isEmpty()); + assertThat(SERVER_RECEIVED_MESSAGES).containsExactlyInAnyOrder(input); + + clientSession.closeNow().awaitUninterruptibly(); + } + finally { + client.dispose(); + + } + } + + /** + * This test requires a little RAM because we hold the messages twice to compare sender and receiver payloads. 
+ */ + @Test + void concurrentWriting(){ + final int clientCount = 20; + final int messagesPerClient = 500; + final int minMessageLength = toIntExact(DataSize.kibibytes(1).toBytes()); + final int maxMessageLength = toIntExact(DataSize.kibibytes(100).toBytes()); + + final ConcurrentLinkedQueue> messagesWritten = new ConcurrentLinkedQueue<>(); + final List> clientThreads = new ArrayList<>(); + + ExecutorService executorService = Executors.newFixedThreadPool(10); + try { + for (int clientI = 0; clientI < clientCount; clientI++) { + final int clientNumber = clientI; + CompletableFuture clientThread = CompletableFuture.runAsync(() -> { + NioSocketConnector client = CLUSTER_CONFIG.getClusterConnector(OM, new IoHandlerAdapter() { + @Override + public void sessionOpened(IoSession session) { + log.info("Session to {} established", session.getRemoteAddress()); + } + + @Override + public void messageSent(IoSession session, Object message) { + log.trace("Message written: {} bytes", ((TestMessage)message).data.getBytes().length); + } + + @Override + public void exceptionCaught(IoSession session, Throwable cause) { + fail("Client[%d] caught an Exception".formatted(clientNumber), cause); + } + }, "Client"); + try { + // Connect + ConnectFuture connect = client.connect(SERVER.getLocalAddress()); + connect.awaitUninterruptibly(); + IoSession clientSession = connect.getSession(); + + for (int i = 0; i < messagesPerClient; i++) { + NetworkMessage input = new TestMessage(RandomStringUtils.randomAscii(minMessageLength, maxMessageLength)); + + WriteFuture writeFuture = clientSession.write(input); + writeFuture.addListener((f) -> { + if (!((WriteFuture) f).isWritten()) { + fail("Failed to write a message"); + } + messagesWritten.add(input); + }); + writeFuture.awaitUninterruptibly(); + } + } + finally { + client.dispose(); + } + }, executorService); + clientThreads.add(clientThread); + } + + // Wait until all clients completed writing + CompletableFuture.allOf(clientThreads.toArray(new 
CompletableFuture[0])).join(); + + log.info("Waiting to receive all send messages"); + // Wait until all messages are received + await().atMost(10,TimeUnit.SECONDS).alias("Send and received same amount of messages").until(() -> SERVER_RECEIVED_MESSAGES.size() == messagesWritten.size()); + + // Check that the messages are correct + assertThat(SERVER_RECEIVED_MESSAGES).containsExactlyInAnyOrderElementsOf(messagesWritten); + + } + finally { + executorService.shutdownNow(); + } + + } + + private static Stream dataSizes() { + return Stream.of( + Arguments.of(DataSize.bytes(10), true), + Arguments.of(DataSize.kibibytes(10), true), + Arguments.of(DataSize.mebibytes(9), true), // Uses chunking + Arguments.of(DataSize.mebibytes(10), false) // Is too large for jackson encoder + ); + } + + @ParameterizedTest + @MethodSource("dataSizes") + void messageSizes(DataSize dataSize, boolean shouldPass) { + NioSocketConnector client = CLUSTER_CONFIG.getClusterConnector(OM, new IoHandlerAdapter() { + @Override + public void sessionOpened(IoSession session) { + log.info("Session to {} established", session.getRemoteAddress()); + } + + @Override + public void exceptionCaught(IoSession session, Throwable cause) { + log.trace("Failed to write message (probably expected)",cause); + } + }, "Client"); + + try { + + ConnectFuture connect = client.connect(SERVER.getLocalAddress()); + + connect.awaitUninterruptibly(); + IoSession clientSession = connect.getSession(); + + NetworkMessage input = new TestMessage(RandomStringUtils.randomAscii(toIntExact(dataSize.toBytes()))); + + WriteFuture write = clientSession.write(input); + + write.awaitUninterruptibly(); + + assertThat(write.isWritten()) + .describedAs(() -> write.getException().getMessage()) + .isEqualTo(shouldPass); + + Assertions.setMaxStackTraceElementsDisplayed(200); + if (!shouldPass) { + assertThat(write.getException()).hasCauseInstanceOf(IllegalArgumentException.class); + } + + clientSession.closeNow().awaitUninterruptibly(); + } + 
finally { + client.dispose(); + + } + } + + @AfterAll + public static void afterAll() { + SERVER.dispose(); + } + + public static class TestNetworkMessageContext extends NetworkMessageContext { + + public TestNetworkMessageContext(NetworkSession session) { + super(session, 0); + } + } + + @CPSType(id = "TEST_MSG", base = NetworkMessage.class) + @RequiredArgsConstructor(onConstructor_ = @JsonCreator) + @Getter + @EqualsAndHashCode(callSuper = false) + public static class TestMessage extends NetworkMessage { + + private final String data; + + @Override + public void react(TestNetworkMessageContext context) { + // Do nothing + } + } +} diff --git a/backend/src/test/java/com/bakdata/conquery/models/auth/LocalAuthRealmTest.java b/backend/src/test/java/com/bakdata/conquery/models/auth/LocalAuthRealmTest.java index fa560e3c01..1c51ac9d9b 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/auth/LocalAuthRealmTest.java +++ b/backend/src/test/java/com/bakdata/conquery/models/auth/LocalAuthRealmTest.java @@ -21,7 +21,7 @@ import org.apache.shiro.authc.BearerToken; import org.apache.shiro.authc.CredentialsException; import org.apache.shiro.authc.IncorrectCredentialsException; -import org.apache.shiro.util.LifecycleUtils; +import org.apache.shiro.lang.util.LifecycleUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; diff --git a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java index 90b6f61be1..955cf7300a 100644 --- a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java +++ b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java @@ -16,7 +16,6 @@ import jakarta.ws.rs.client.Client; import jakarta.ws.rs.core.UriBuilder; -import com.bakdata.conquery.Conquery; import com.bakdata.conquery.commands.DistributedStandaloneCommand; import 
com.bakdata.conquery.commands.ShardNode; import com.bakdata.conquery.commands.StandaloneCommand; @@ -177,12 +176,12 @@ public void waitUntilWorkDone() { if (Duration.ofNanos(System.nanoTime() - started).toSeconds() > 10) { started = System.nanoTime(); - log.warn("waiting for done work for a long time", new Exception()); + log.warn("Waiting for done work for a long time", new Exception("This Exception marks the stacktrace, to show where we are waiting.")); } } while (true); } - log.trace("all jobs finished"); + log.trace("All jobs finished"); } public UriBuilder defaultAdminURIBuilder() { @@ -230,10 +229,10 @@ public void beforeAll() throws Exception { // define server dropwizard = new DropwizardTestSupport<>(TestBootstrappingConquery.class, config, app -> { if (config.getSqlConnectorConfig().isEnabled()) { - standaloneCommand = new SqlStandaloneCommand((Conquery) app); + standaloneCommand = new SqlStandaloneCommand(); } else { - standaloneCommand = new DistributedStandaloneCommand((Conquery) app); + standaloneCommand = new DistributedStandaloneCommand(); } return (Command) standaloneCommand; }); diff --git a/pom.xml b/pom.xml index 9329496399..935b4718d3 100644 --- a/pom.xml +++ b/pom.xml @@ -17,12 +17,12 @@ - 17 - 1.18.24 + 21 + 1.18.36 UTF-8 ${java.required} ${java.required} - 3.3.0 + 3.8.1 yyyy-MM-dd'T'HH:mm:ssXXX ${maven.build.timestamp} 0.0.0-SNAPSHOT