Skip to content
This repository was archived by the owner on Sep 26, 2024. It is now read-only.

Commit

Permalink
KAFKA-14759: Move Mock, Schema, and Verifiable connectors to new test…
Browse files Browse the repository at this point in the history
…-plugins module (apache#13302)

Reviewers: Hector Geraldino <[email protected]>, Chris Egerton <[email protected]>
  • Loading branch information
gharris1727 authored Aug 16, 2023
1 parent f970ddf commit a9efca0
Show file tree
Hide file tree
Showing 18 changed files with 48 additions and 58 deletions.
16 changes: 15 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ def connectPkgs = [
'connect:file',
'connect:json',
'connect:runtime',
'connect:test-plugins',
'connect:transforms',
'connect:mirror',
'connect:mirror-client'
Expand Down Expand Up @@ -2768,7 +2769,6 @@ project(':connect:runtime') {
implementation libs.reflections
implementation libs.mavenArtifact
implementation libs.swaggerAnnotations
implementation project(':server-common')

// We use this library to generate OpenAPI docs for the REST API, but we don't want or need it at compile
// or run time. So, we add it to a separate configuration, which we use later on during docs generation
Expand All @@ -2778,6 +2778,8 @@ project(':connect:runtime') {
testImplementation project(':core')
testImplementation project(':metadata')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
testImplementation project(':connect:test-plugins')

testImplementation libs.easymock
testImplementation libs.junitJupiterApi
Expand Down Expand Up @@ -3075,6 +3077,18 @@ project(':connect:mirror-client') {
}
}

project(':connect:test-plugins') {
archivesBaseName = "connect-test-plugins"

dependencies {
api project(':connect:api')

implementation project(':server-common')
implementation libs.slf4jApi
implementation libs.jacksonDatabind
}
}

task aggregatedJavadoc(type: Javadoc, dependsOn: compileJava) {
def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.PluginInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.tools.MockSinkConnector;
import org.apache.kafka.connect.tools.MockSourceConnector;
import org.apache.kafka.connect.tools.SchemaSourceConnector;
import org.apache.kafka.connect.tools.VerifiableSinkConnector;
import org.apache.kafka.connect.tools.VerifiableSourceConnector;
import org.apache.kafka.connect.util.FutureCallback;

import javax.ws.rs.BadRequestException;
Expand All @@ -47,7 +40,6 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -66,34 +58,22 @@ public class ConnectorPluginsResource implements ConnectResource {
private final List<PluginInfo> connectorPlugins;
private long requestTimeoutMs;

static final List<Class<? extends SinkConnector>> SINK_CONNECTOR_EXCLUDES = Arrays.asList(
VerifiableSinkConnector.class,
MockSinkConnector.class
);

static final List<Class<? extends SourceConnector>> SOURCE_CONNECTOR_EXCLUDES = Arrays.asList(
VerifiableSourceConnector.class,
MockSourceConnector.class,
SchemaSourceConnector.class
);

public ConnectorPluginsResource(Herder herder) {
this.herder = herder;
this.connectorPlugins = new ArrayList<>();
this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;

// TODO: improve once plugins are allowed to be added/removed during runtime.
addConnectorPlugins(herder.plugins().sinkConnectors(), SINK_CONNECTOR_EXCLUDES);
addConnectorPlugins(herder.plugins().sourceConnectors(), SOURCE_CONNECTOR_EXCLUDES);
addConnectorPlugins(herder.plugins().transformations(), Collections.emptySet());
addConnectorPlugins(herder.plugins().predicates(), Collections.emptySet());
addConnectorPlugins(herder.plugins().converters(), Collections.emptySet());
addConnectorPlugins(herder.plugins().headerConverters(), Collections.emptySet());
addConnectorPlugins(herder.plugins().sinkConnectors());
addConnectorPlugins(herder.plugins().sourceConnectors());
addConnectorPlugins(herder.plugins().transformations());
addConnectorPlugins(herder.plugins().predicates());
addConnectorPlugins(herder.plugins().converters());
addConnectorPlugins(herder.plugins().headerConverters());
}

private <T> void addConnectorPlugins(Collection<PluginDesc<T>> plugins, Collection<Class<? extends T>> excludes) {
private <T> void addConnectorPlugins(Collection<PluginDesc<T>> plugins) {
plugins.stream()
.filter(p -> !excludes.contains(p.pluginClass()))
.map(PluginInfo::new)
.forEach(connectorPlugins::add);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,8 @@ public void testValidateConfigWithNonExistentAlias() {

@Test
public void testListConnectorPlugins() {
Set<Class<?>> excludes = Stream.of(ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES, ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
Set<PluginInfo> expectedConnectorPlugins = Stream.of(SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS)
.flatMap(Collection::stream)
.filter(p -> !excludes.contains(p.pluginClass()))
.map(PluginInfo::new)
.collect(Collectors.toSet());
Set<PluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(true));
Expand Down Expand Up @@ -390,11 +386,6 @@ public void testConnectorPluginsIncludesClassTypeAndVersionInformation() throws

@Test
public void testListAllPlugins() {
Set<Class<?>> excludes = Stream.of(
ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES,
ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES
).flatMap(Collection::stream)
.collect(Collectors.toSet());
Set<PluginInfo> expectedConnectorPlugins = Stream.of(
SINK_CONNECTOR_PLUGINS,
SOURCE_CONNECTOR_PLUGINS,
Expand All @@ -403,7 +394,6 @@ public void testListAllPlugins() {
TRANSFORMATION_PLUGINS,
PREDICATE_PLUGINS
).flatMap(Collection::stream)
.filter(p -> !excludes.contains(p.pluginClass()))
.map(PluginInfo::new)
.collect(Collectors.toSet());
Set<PluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ include 'clients',
'connect:mirror',
'connect:mirror-client',
'connect:runtime',
'connect:test-plugins',
'connect:transforms',
'core',
'examples',
Expand Down
45 changes: 25 additions & 20 deletions tests/kafkatest/services/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,26 +285,31 @@ def append_to_environment_variable(self, envvar, value):
env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
self.environment[envvar] = env_opts

def append_filestream_connectors_to_classpath(self):
def maybe_append_filestream_connectors_to_classpath(self):
if self.include_filestream_connectors:
cwd = os.getcwd()
self.logger.info("Including filestream connectors when starting Connect. "
"Looking for jar locally in: %s" % cwd)
relative_path = "/connect/file/build/libs/"
local_dir = cwd + relative_path
lib_dir = self.path.home() + relative_path
for pwd, dirs, files in os.walk(local_dir):
for file in files:
if file.startswith("connect-file") and file.endswith(".jar"):
# Use the expected directory on the node instead of the path in the driver node
file_path = lib_dir + file
self.logger.debug("Appending %s to Connect worker's CLASSPATH" % file_path)
return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
self.logger.info("Jar with filestream connectors was not found under %s" % lib_dir)
return self.append_module_to_classpath("file")
else:
self.logger.info("Starting Connect without filestream connectors in the CLASSPATH")
return ""

return None
def append_test_plugins_to_classpath(self):
return self.append_module_to_classpath("test-plugins")

def append_module_to_classpath(self, module):
cwd = os.getcwd()
relative_path = "/connect/" + module + "/build/libs/"
local_dir = cwd + relative_path
lib_dir = self.path.home() + relative_path
for pwd, dirs, files in os.walk(local_dir):
for file in files:
if file.endswith(".jar"):
# Use the expected directory on the node instead of the path in the driver node
file_path = lib_dir + file
self.logger.info("Appending %s to Connect worker's CLASSPATH" % file_path)
return "export CLASSPATH=${CLASSPATH}:%s; " % file_path

self.logger.info("Jar not found within %s" % local_dir)
return ""


class ConnectStandaloneService(ConnectServiceBase):
Expand All @@ -327,8 +332,8 @@ def start_cmd(self, node, connector_configs):

cmd += fix_opts_for_new_jvm(node)
cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
classpath = self.append_filestream_connectors_to_classpath()
cmd += classpath if classpath else ""
cmd += self.append_test_plugins_to_classpath()
cmd += self.maybe_append_filestream_connectors_to_classpath()

for envvar in self.environment:
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
Expand Down Expand Up @@ -388,8 +393,8 @@ def start_cmd(self, node, connector_configs):
for envvar in self.environment:
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))

classpath = self.append_filestream_connectors_to_classpath()
cmd += classpath if classpath else ""
cmd += self.maybe_append_filestream_connectors_to_classpath()
cmd += self.append_test_plugins_to_classpath()
cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
return cmd
Expand Down

0 comments on commit a9efca0

Please sign in to comment.