Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: support plugin loading in conifg #4974

3 changes: 2 additions & 1 deletion data-prepper-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
implementation libs.reflections.core
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need this anymore.

implementation libs.parquet.common
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation libs.commons.lang3
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-test-common')
testImplementation 'org.skyscreamer:jsonassert:1.5.3'
testImplementation libs.commons.io
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotates a field that uses Data Prepper plugin config as its value.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface UsesDataPrepperPlugin {
/**
* The class type for this plugin.
*
* @return The Java class
* @since 1.2
*/
Class<?> pluginType();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't need this if we support it on the target itself.

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,33 @@
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.deser.ContextualDeserializer;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
import org.reflections.Reflections;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Model class for a Plugin in Configuration YAML containing name of the Plugin and its associated settings
Expand All @@ -36,7 +47,7 @@
@JsonSerialize(using = PluginModel.PluginModelSerializer.class)
@JsonDeserialize(using = PluginModel.PluginModelDeserializer.class)
public class PluginModel {

static final String DEFAULT_PLUGINS_CLASSPATH = "org.opensearch.dataprepper.plugins";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this approach is the right one for a few reasons:

  1. This is leaking information on the plugins classpath out of core and into Data Prepper API.
  2. This class deals with basic serialization and this approach is coupling it with the plugin framework to some degree.
  3. This duplicates concerns and code from the plugin framework.
  4. The plugin framework supports loading plugins from other packages as well, and this does not honor that.

This class - PluginModel - should remain concerned only with the most basic form of a plugin.

Use the plugin framework to detect this metadata. Then, have the schema converter use the plugin framework to get the metadata out of there. It may necessitate new method method on PluginFactory (though I'm not sure on this yet).

private static final ObjectMapper SERIALIZER_OBJECT_MAPPER = new ObjectMapper();

private final String pluginName;
Expand Down Expand Up @@ -148,11 +159,15 @@ public PluginModelDeserializer() {
*
* @see SinkModel.SinkModelDeserializer
*/
abstract static class AbstractPluginModelDeserializer<T extends PluginModel, M extends InternalJsonModel> extends StdDeserializer<PluginModel> {
abstract static class AbstractPluginModelDeserializer<T extends PluginModel, M extends InternalJsonModel>
extends StdDeserializer<PluginModel> implements ContextualDeserializer {

private final Class<M> innerModelClass;
private final BiFunction<String, M, T> constructorFunction;
private final Supplier<M> emptyInnerModelConstructor;
private final Reflections reflections;

private UsesDataPrepperPlugin usesDataPrepperPlugin;

protected AbstractPluginModelDeserializer(
final Class<T> valueClass,
Expand All @@ -163,6 +178,18 @@ protected AbstractPluginModelDeserializer(
this.innerModelClass = innerModelClass;
this.constructorFunction = constructorFunction;
this.emptyInnerModelConstructor = emptyInnerModelConstructor;
this.reflections = new Reflections(new ConfigurationBuilder()
.setUrls(ClasspathHelper.forPackage(DEFAULT_PLUGINS_CLASSPATH))
.setScanners(Scanners.TypesAnnotated, Scanners.SubTypes));
}

@Override
public AbstractPluginModelDeserializer<?, ?> createContextual(
final DeserializationContext context, final BeanProperty property) {
if (Objects.nonNull(property)) {
usesDataPrepperPlugin = property.getAnnotation(UsesDataPrepperPlugin.class);
}
return this;
}

@Override
Expand All @@ -175,12 +202,33 @@ public PluginModel deserialize(final JsonParser jsonParser, final Deserializatio
final String pluginName = onlyField.getKey();
final JsonNode value = onlyField.getValue();

if (usesDataPrepperPlugin != null) {
final Set<String> pluginNames = scanForPluginNames(usesDataPrepperPlugin.pluginType());
if (!pluginNames.contains(pluginName)) {
throw new IOException(String.format("%s is not found on %s.",
pluginName, usesDataPrepperPlugin.pluginType()));
}
}

M innerModel = SERIALIZER_OBJECT_MAPPER.convertValue(value, innerModelClass);
if(innerModel == null)
innerModel = emptyInnerModelConstructor.get();

return constructorFunction.apply(pluginName, innerModel);
}

private Set<String> scanForPluginNames(final Class<?> pluginType) {
return reflections.getTypesAnnotatedWith(DataPrepperPlugin.class).stream()
.map(clazz -> clazz.getAnnotation(DataPrepperPlugin.class))
.filter(dataPrepperPlugin -> pluginType.equals(dataPrepperPlugin.pluginType()))
.flatMap(dataPrepperPlugin -> {
if (!dataPrepperPlugin.deprecatedName().isEmpty()) {
return Stream.of(dataPrepperPlugin.deprecatedName(), dataPrepperPlugin.name());
}
return Stream.of(dataPrepperPlugin.name());
})
.collect(Collectors.toSet());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
Expand All @@ -14,6 +15,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
import org.opensearch.dataprepper.model.processor.Processor;

import java.io.BufferedReader;
import java.io.IOException;
Expand All @@ -27,9 +30,11 @@
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.jupiter.api.Assertions.assertThrows;

class PluginModelTests {

Expand Down Expand Up @@ -90,6 +95,35 @@ final void deserialize_with_empty_inner(final String resourceName) throws IOExce
assertThat(pluginModel.getPluginSettings().size(), equalTo(0));
}

@Test
final void deserialize_PluginModel_attribute_matches_UsesDataPrepperPlugin_annotation() throws IOException {
final InputStream inputStream = PluginModelTests.class.getResourceAsStream(
"test_config_with_plugin_model_attribute_and_existing_plugin.yaml");

final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
final TestConfig testConfig = mapper.readValue(inputStream, TestConfig.class);
assertThat(testConfig.getPluginModel(), instanceOf(PluginModel.class));
assertThat(testConfig.getPluginModel().getPluginName(), equalTo("test_plugin"));
assertThat(testConfig.getPluginModel().getPluginSettings(), notNullValue());
assertThat(testConfig.getPluginModel().getPluginSettings().size(), equalTo(0));
assertThat(testConfig.getPluginModelDeprecated(), instanceOf(PluginModel.class));
assertThat(testConfig.getPluginModelDeprecated().getPluginName(), equalTo("test_plugin_deprecated"));
assertThat(testConfig.getPluginModelDeprecated().getPluginSettings(), notNullValue());
assertThat(testConfig.getPluginModelDeprecated().getPluginSettings().size(), equalTo(0));
}

@Test
final void deserialize_PluginModel_attribute_does_not_match_UsesDataPrepperPlugin_annotation() {
final InputStream inputStream = PluginModelTests.class.getResourceAsStream(
"test_config_with_plugin_model_attribute_but_non_existing_plugin.yaml");

final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
final IOException exception = assertThrows(
IOException.class, () -> mapper.readValue(inputStream, TestConfig.class));
assertThat(exception.getMessage(), equalTo(
"non-existing is not found on interface org.opensearch.dataprepper.model.processor.Processor."));
}

@Test
final void testUsingCustomSerializerWithPluginSettings_noExceptions() throws JsonGenerationException, JsonMappingException, IOException {
final PluginModel pluginModel = new PluginModel("customPlugin", validPluginSettings());
Expand Down Expand Up @@ -156,4 +190,21 @@ static String convertInputStreamToString(InputStream inputStream) throws IOExcep
return stringBuilder.toString();
}

static class TestConfig {
@JsonProperty("plugin_model")
@UsesDataPrepperPlugin(pluginType = Processor.class)
private PluginModel pluginModel;

@JsonProperty("plugin_model_deprecated")
@UsesDataPrepperPlugin(pluginType = Processor.class)
private PluginModel pluginModelDeprecated;

public PluginModel getPluginModel() {
return pluginModel;
}

public PluginModel getPluginModelDeprecated() {
return pluginModelDeprecated;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.processor.Processor;

@DataPrepperPlugin(name = "test_plugin", pluginType = Processor.class)
public class TestPluginConfig {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.processor.Processor;

@DataPrepperPlugin(name = "test_plugin_config_with_deprecated_name",
deprecatedName = "test_plugin_deprecated", pluginType = Processor.class)
public class TestPluginConfigWithDeprecatedName {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
plugin_model:
test_plugin: {}
plugin_model_deprecated:
test_plugin_deprecated: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
plugin_model:
non-existing: {}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void run() {
.setUrls(ClasspathHelper.forPackage(DEFAULT_PLUGINS_CLASSPATH))
.setScanners(Scanners.TypesAnnotated, Scanners.SubTypes));
final PluginConfigsJsonSchemaConverter pluginConfigsJsonSchemaConverter = new PluginConfigsJsonSchemaConverter(
reflections, new JsonSchemaConverter(modules), siteUrl, siteBaseUrl);
reflections, new JsonSchemaConverter(modules, reflections), siteUrl, siteBaseUrl);
final Class<?> pluginType = pluginConfigsJsonSchemaConverter.pluginTypeNameToPluginType(pluginTypeName);
final Map<String, String> pluginNameToJsonSchemaMap = pluginConfigsJsonSchemaConverter.convertPluginConfigsIntoJsonSchemas(
SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON, pluginType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,28 @@
import com.github.victools.jsonschema.generator.SchemaGeneratorConfig;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigPart;
import com.github.victools.jsonschema.generator.SchemaGeneratorGeneralConfigPart;
import com.github.victools.jsonschema.generator.SchemaVersion;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JsonSchemaConverter {
private static final Logger LOG = LoggerFactory.getLogger(JsonSchemaConverter.class);
static final String DEPRECATED_SINCE_KEY = "deprecated";
private final List<Module> jsonSchemaGeneratorModules;
private final Reflections reflections;

public JsonSchemaConverter(final List<Module> jsonSchemaGeneratorModules) {
public JsonSchemaConverter(final List<Module> jsonSchemaGeneratorModules, final Reflections reflections) {
this.jsonSchemaGeneratorModules = jsonSchemaGeneratorModules;
this.reflections = reflections;
}

public ObjectNode convertIntoJsonSchema(
Expand All @@ -30,7 +42,9 @@ public ObjectNode convertIntoJsonSchema(
loadJsonSchemaGeneratorModules(configBuilder);
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart = configBuilder.forFields();
overrideInstanceAttributeWithDeprecated(scopeSchemaGeneratorConfigPart);
overrideTargetTypeWithUsesDataPrepperPlugin(scopeSchemaGeneratorConfigPart);
resolveDefaultValueFromJsonProperty(scopeSchemaGeneratorConfigPart);
overrideDataPrepperPluginTypeAttribute(configBuilder.forTypesInGeneral(), schemaVersion, optionPreset);

final SchemaGeneratorConfig config = configBuilder.build();
final SchemaGenerator generator = new SchemaGenerator(config);
Expand All @@ -52,11 +66,49 @@ private void overrideInstanceAttributeWithDeprecated(
});
}

private void overrideTargetTypeWithUsesDataPrepperPlugin(
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart) {
scopeSchemaGeneratorConfigPart.withTargetTypeOverridesResolver(field -> Optional
.ofNullable(field.getAnnotationConsideringFieldAndGetterIfSupported(UsesDataPrepperPlugin.class))
.map(usesDataPrepperPlugin -> scanForPluginConfigs(usesDataPrepperPlugin.pluginType()))
.map(stream -> stream.map(specificSubtype -> field.getContext().resolve(specificSubtype)))
.map(stream -> stream.collect(Collectors.toList()))
.orElse(null));
}

private void overrideDataPrepperPluginTypeAttribute(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scanning for plugins should only be done in the plugin framework. Make modifications there and use those.

final SchemaGeneratorGeneralConfigPart schemaGeneratorGeneralConfigPart,
final SchemaVersion schemaVersion, final OptionPreset optionPreset) {
schemaGeneratorGeneralConfigPart.withTypeAttributeOverride((node, scope, context) -> {
final DataPrepperPlugin dataPrepperPlugin = scope.getType().getErasedType()
.getAnnotation(DataPrepperPlugin.class);
if (dataPrepperPlugin != null) {
final ObjectNode propertiesNode = node.putObject("properties");
try {
final ObjectNode schemaNode = this.convertIntoJsonSchema(
schemaVersion, optionPreset, dataPrepperPlugin.pluginConfigurationType());
propertiesNode.set(dataPrepperPlugin.name(), schemaNode);
} catch (JsonProcessingException e) {
LOG.error("Encountered error retrieving JSON schema for {}", dataPrepperPlugin.name(), e);
throw new RuntimeException(e);
}
}
});
}

private void resolveDefaultValueFromJsonProperty(
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart) {
scopeSchemaGeneratorConfigPart.withDefaultResolver(field -> {
final JsonProperty annotation = field.getAnnotationConsideringFieldAndGetter(JsonProperty.class);
return annotation == null || annotation.defaultValue().isEmpty() ? null : annotation.defaultValue();
});
}

private Stream<Class<?>> scanForPluginConfigs(final Class<?> pluginType) {
return reflections.getTypesAnnotatedWith(DataPrepperPlugin.class).stream()
.filter(clazz -> {
final DataPrepperPlugin dataPrepperPlugin = clazz.getAnnotation(DataPrepperPlugin.class);
return pluginType.equals(dataPrepperPlugin.pluginType());
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.dataprepper.schemas;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaVersion;
Expand Down Expand Up @@ -90,8 +89,8 @@ public Map<String, String> convertPluginConfigsIntoJsonSchemas(
addPluginName(jsonSchemaNode, pluginName);
addDocumentationLink(jsonSchemaNode, pluginName, pluginType);
value = jsonSchemaNode.toPrettyString();
} catch (JsonProcessingException e) {
LOG.error("Encountered error retrieving JSON schema for {}", pluginName);
} catch (final Exception e) {
LOG.error("Encountered error retrieving JSON schema for {}", pluginName, e);
return Stream.empty();
}
return Stream.of(Map.entry(entry.getKey(), value));
Expand Down
Loading
Loading