diff --git a/docs/user-manual/modules/ROOT/pages/camel-jbang-k.adoc b/docs/user-manual/modules/ROOT/pages/camel-jbang-k.adoc index 2eeff9fc6b741..574a6e9943a52 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-jbang-k.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-jbang-k.adoc @@ -339,6 +339,121 @@ This Pipe explicitly defines Camel endpoint URIs that act as a source and sink. NOTE: You can also specify endpoint parameters directly on the source/sink like `--source timer:tick?period=5000` +=== Binding to Knative broker + +You can reference Knative eventing resources as source or sink in a Pipe resource. +The reference to the Knative resource is identified by the apiVersion, kind and resource name. Users may add properties to the object reference as usual. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: my-pipe + annotations: + camel.apache.org/operator.id: camel-k +spec: + source: # <1> + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: "Camel rocks!" + sink: # <2> + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event.my-event # <3> +---- +<1> Reference to the source that provides data +<2> Reference to the Knative broker where data should be sent to +<3> The CloudEvents event type that is used for the events + +NOTE: Knative eventing uses CloudEvents data format by default. Camel provides the concept of data types that is able to transform from many different component data formats to CloudEvents data type. The data type transformation will set proper event properties such as ce-type, ce-source or ce-subject. + +When referencing a Knative broker as a source the `type` property is mandatory in order to filter the event stream. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: my-pipe + annotations: + camel.apache.org/operator.id: camel-k +spec: + source: # <1> + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event.my-event # <2> + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink +---- +<1> Reference to the source Knative broker that provides the events +<2> Filter the event stream for events with the given CloudEvents event type + +=== Binding to Knative channels + +Knative eventing provides the channel resource for a subscription consumer model. +Camel K is able to automatically manage the subscription when referencing Knative eventing channels as a source or sink in a Pipe. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: my-pipe + annotations: + camel.apache.org/operator.id: camel-k +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: "Camel rocks!" + sink: # <1> + ref: + kind: InMemoryChannel + apiVersion: messaging.knative.dev/v1 + name: my-messages +---- +<1> Reference to the Knative message channel that receives the events + +The same approach can be used to subscribe to a message chanel as a consumer to act as a source in a Pipe. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: my-pipe + annotations: + camel.apache.org/operator.id: camel-k +spec: + source: # <1> + ref: + kind: InMemoryChannel + apiVersion: messaging.knative.dev/v1 + name: my-messages + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink +---- +<1> Reference to the source Knative message channel that provides the events + === Error handling You can configure an error handler in order to specify what to do when some event ends up with failure. diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Bind.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Bind.java deleted file mode 100644 index 61ab92c85acf4..0000000000000 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Bind.java +++ /dev/null @@ -1,513 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.dsl.jbang.core.commands; - -import java.io.FileOutputStream; -import java.io.InputStream; -import java.net.URL; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.Stack; - -import org.apache.camel.dsl.jbang.core.common.JSonHelper; -import org.apache.camel.dsl.jbang.core.common.YamlHelper; -import org.apache.camel.github.GitHubResourceResolver; -import org.apache.camel.impl.engine.DefaultResourceResolvers; -import org.apache.camel.spi.Resource; -import org.apache.camel.spi.ResourceResolver; -import org.apache.camel.util.FileUtil; -import org.apache.camel.util.IOHelper; -import org.apache.camel.util.StringHelper; -import org.apache.camel.util.URISupport; -import org.apache.camel.util.json.Jsoner; -import org.snakeyaml.engine.v2.api.LoadSettings; -import org.snakeyaml.engine.v2.api.YamlUnicodeReader; -import org.snakeyaml.engine.v2.composer.Composer; -import org.snakeyaml.engine.v2.nodes.Node; -import org.snakeyaml.engine.v2.parser.Parser; -import org.snakeyaml.engine.v2.parser.ParserImpl; -import org.snakeyaml.engine.v2.scanner.StreamReader; -import picocli.CommandLine; -import picocli.CommandLine.Command; - -import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.asStringSet; -import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.asText; -import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.nodeAt; - -@Command(name = "bind", description = "Bind source and sink Kamelets as a new Camel integration", - sortOptions = false) -public class Bind extends CamelCommand { - - @CommandLine.Parameters(description = "Name of binding file to be saved", arity = "1", - paramLabel = "", parameterConsumer = FileConsumer.class) - Path filePath; // Defined only for file path completion; the field never used - String file; - - @CommandLine.Option(names = { "--source" }, description = "Source (from) such as a Kamelet or Camel endpoint uri", - required = true) - String source; - - @CommandLine.Option(names = { "--step" }, description = "Optional steps such as a Kamelet or Camel endpoint uri") - String[] steps; - - @CommandLine.Option(names = { "--sink" }, description = "Sink (to) such as a Kamelet or Camel endpoint uri", - required = true) - String sink; - - @CommandLine.Option(names = { "--error-handler" }, - description = "Add error handler (none|log|sink:). Sink endpoints are expected in the format \"[[apigroup/]version:]kind:[namespace/]name\", plain Camel URIs or Kamelet name.") - String errorHandler; - - @CommandLine.Option(names = { "--property" }, - description = "Adds a pipe property in the form of [source|sink|error-handler|step-].= where is the step number starting from 1", - arity = "0") - String[] properties; - - @CommandLine.Option(names = { "--output" }, - defaultValue = "file", - description = "Output format generated by this command (supports: file, yaml or json).") - String output; - - private final TemplateProvider templateProvider; - - public Bind(CamelJBangMain main) { - this(main, new TemplateProvider() { - }); - } - - public Bind(CamelJBangMain main, TemplateProvider templateProvider) { - super(main); - this.templateProvider = templateProvider; - } - - /** - * Helper class provides access to the templates that construct the Pipe resource. Subclasses may overwrite the - * provider to inject their own templates. - */ - public interface TemplateProvider { - default InputStream getPipeTemplate(String in, String out) { - return Bind.class.getClassLoader().getResourceAsStream("templates/pipe-" + in + "-" + out + ".yaml.tmpl"); - } - - default InputStream getStepTemplate(String stepType) { - return Bind.class.getClassLoader().getResourceAsStream("templates/step-%s.yaml.tmpl".formatted(stepType)); - } - - default InputStream getErrorHandlerTemplate(String type) { - return Bind.class.getClassLoader() - .getResourceAsStream("templates/error-handler-%s.yaml.tmpl".formatted(type)); - } - } - - @Override - public Integer doCall() throws Exception { - String pipe = constructPipe(); - - if (pipe.isEmpty()) { - printer().println("Failed to construct Pipe resource"); - return -1; - } - - return dumpPipe(pipe); - } - - public String constructPipe() throws Exception { - // the pipe source and sink can either be a kamelet or an uri - String in = "kamelet"; - String out = "kamelet"; - - String sourceEndpoint = source; - String sinkEndpoint = sink; - Map sourceUriProperties = new HashMap<>(); - Map sinkUriProperties = new HashMap<>(); - if (source.contains(":")) { - in = "uri"; - if (source.contains("?")) { - sourceEndpoint = StringHelper.before(source, "?"); - String query = StringHelper.after(source, "?"); - if (query != null) { - sourceUriProperties = URISupport.parseQuery(query, true); - } - } - } - if (sink.contains(":")) { - out = "uri"; - if (sink.contains("?")) { - sinkEndpoint = StringHelper.before(sink, "?"); - String query = StringHelper.after(sink, "?"); - if (query != null) { - sinkUriProperties = URISupport.parseQuery(query, true); - } - } - } - - InputStream is = templateProvider.getPipeTemplate(in, out); - String context = IOHelper.loadText(is); - IOHelper.close(is); - - String stepsContext = ""; - if (steps != null) { - StringBuilder sb = new StringBuilder("\n steps:\n"); - for (int i = 0; i < steps.length; i++) { - String step = steps[i]; - boolean uri = step.contains(":"); - String text; - String stepType; - Map stepProperties = getProperties("step-%d".formatted(i + 1)); - if (uri) { - stepType = "uri"; - if (step.contains("?")) { - String query = StringHelper.after(step, "?"); - step = StringHelper.before(step, "?"); - if (query != null) { - stepProperties.putAll(URISupport.parseQuery(query, true)); - } - } - } else { - stepType = "kamelet"; - stepProperties = kameletProperties(step, stepProperties); - } - - is = templateProvider.getStepTemplate(stepType); - text = IOHelper.loadText(is); - IOHelper.close(is); - text = text.replaceFirst("\\{\\{ \\.Name }}", step); - - if (i == steps.length - 1) { - text = text.replaceFirst("\\{\\{ \\.StepProperties }}\n", asEndpointProperties(stepProperties)); - } else { - text = text.replaceFirst("\\{\\{ \\.StepProperties }}", asEndpointProperties(stepProperties)); - } - sb.append(text); - } - stepsContext = sb.toString(); - } - - String errorHandlerContext = ""; - if (errorHandler != null) { - StringBuilder sb = new StringBuilder("\n errorHandler:\n"); - - Map errorHandlerParameters = getProperties("error-handler"); - - String[] errorHandlerTokens = errorHandler.split(":", 2); - String errorHandlerType = errorHandlerTokens[0]; - - String errorHandlerSpec; - switch (errorHandlerType) { - case "sink": - if (errorHandlerTokens.length != 2) { - printer().println( - "Invalid error handler syntax. Type 'sink' needs an endpoint configuration (ie sink:endpointUri)"); - // Error abort Pipe construction - return ""; - } - String endpoint = errorHandlerTokens[1]; - - String sinkType; - Map errorHandlerSinkProperties = getProperties("error-handler.sink"); - - // remove sink properties from error handler parameters - errorHandlerSinkProperties.keySet().stream() - .map(key -> "sink." + key) - .filter(errorHandlerParameters::containsKey) - .forEach(errorHandlerParameters::remove); - - if (endpoint.contains(":")) { - sinkType = "uri"; - if (endpoint.contains("?")) { - String query = StringHelper.after(endpoint, "?"); - endpoint = StringHelper.before(endpoint, "?"); - if (query != null) { - errorHandlerSinkProperties.putAll(URISupport.parseQuery(query, true)); - } - } - } else { - sinkType = "kamelet"; - errorHandlerSinkProperties = kameletProperties(endpoint, errorHandlerSinkProperties); - } - - is = templateProvider.getErrorHandlerTemplate("sink-" + sinkType); - errorHandlerSpec = IOHelper.loadText(is); - IOHelper.close(is); - errorHandlerSpec = errorHandlerSpec.replaceFirst("\\{\\{ \\.Name }}", endpoint); - errorHandlerSpec = errorHandlerSpec.replaceFirst("\\{\\{ \\.ErrorHandlerProperties }}", - asEndpointProperties(errorHandlerSinkProperties, 4)); - errorHandlerSpec = errorHandlerSpec.replaceFirst("\\{\\{ \\.ErrorHandlerParameter }}", - asErrorHandlerParameters(errorHandlerParameters)); - break; - case "log": - is = templateProvider.getErrorHandlerTemplate("log"); - errorHandlerSpec = IOHelper.loadText(is); - IOHelper.close(is); - errorHandlerSpec = errorHandlerSpec.replaceFirst("\\{\\{ \\.ErrorHandlerParameter }}", - asErrorHandlerParameters(errorHandlerParameters)); - break; - default: - errorHandlerSpec = " none: {}"; - } - sb.append(errorHandlerSpec); - errorHandlerContext = sb.toString(); - } - - String name = FileUtil.onlyName(file, false); - context = context.replaceFirst("\\{\\{ \\.Name }}", name); - context = context.replaceFirst("\\{\\{ \\.Source }}", sourceEndpoint); - context = context.replaceFirst("\\{\\{ \\.Sink }}", sinkEndpoint); - context = context.replaceFirst("\\{\\{ \\.Steps }}", stepsContext); - context = context.replaceFirst("\\{\\{ \\.ErrorHandler }}", errorHandlerContext); - - Map sourceProperties = getProperties("source"); - if ("kamelet".equals(in)) { - sourceProperties = kameletProperties(sourceEndpoint, sourceProperties); - } else { - sourceProperties.putAll(sourceUriProperties); - } - context = context.replaceFirst("\\{\\{ \\.SourceProperties }}\n", asEndpointProperties(sourceProperties)); - - Map sinkProperties = getProperties("sink"); - if ("kamelet".equals(out)) { - sinkProperties = kameletProperties(sinkEndpoint, sinkProperties); - } else { - sinkProperties.putAll(sinkUriProperties); - } - context = context.replaceFirst("\\{\\{ \\.SinkProperties }}\n", asEndpointProperties(sinkProperties)); - return context; - } - - public int dumpPipe(String pipe) throws Exception { - switch (output) { - case "file": - if (file.endsWith(".yaml")) { - IOHelper.writeText(pipe, new FileOutputStream(file, false)); - } else if (file.endsWith(".json")) { - IOHelper.writeText(Jsoner.serialize(YamlHelper.yaml().loadAs(pipe, Map.class)), - new FileOutputStream(file, false)); - } else { - IOHelper.writeText(pipe, new FileOutputStream(file + ".yaml", false)); - } - break; - case "yaml": - printer().println(pipe); - break; - case "json": - printer().println(JSonHelper.prettyPrint(Jsoner.serialize(YamlHelper.yaml().loadAs(pipe, Map.class)), 2) - .replaceAll("\\\\/", "/")); - break; - default: - printer().printf("Unsupported output format '%s' (supported: file, yaml, json)%n", output); - return -1; - } - return 0; - } - - /** - * Creates YAML snippet representing the error handler parameters section. - * - * @param props the properties to set as error handler parameters. - */ - private String asErrorHandlerParameters(Map props) { - if (props.isEmpty()) { - return "parameters: {}"; - } - - StringBuilder sb = new StringBuilder(); - sb.append("parameters:\n"); - for (Map.Entry propertyEntry : props.entrySet()) { - sb.append(" ").append(propertyEntry.getKey()).append(": ").append(propertyEntry.getValue()).append("\n"); - } - return sb.toString().trim(); - } - - /** - * Creates YAML snippet representing the endpoint properties section. - * - * @param props the properties to set as endpoint properties. - * @return - */ - private String asEndpointProperties(Map props) { - return asEndpointProperties(props, 0); - } - - /** - * Creates YAML snippet representing the endpoint properties section. - * - * @param props the properties to set as endpoint properties. - * @param additionalIndent optional number of additional spaces used as indentation. - * @return - */ - private String asEndpointProperties(Map props, int additionalIndent) { - StringBuilder sb = new StringBuilder(); - if (props.isEmpty()) { - // create a dummy placeholder, so it is easier to add new properties manually - return sb.append("#properties:\n ").append(" ".repeat(additionalIndent)).append("#key: \"value\"").toString(); - } - - sb.append("properties:\n"); - for (Map.Entry propertyEntry : props.entrySet()) { - sb.append(" ").append(" ".repeat(additionalIndent)).append(propertyEntry.getKey()).append(": ") - .append(propertyEntry.getValue()).append("\n"); - } - return sb.toString().trim(); - } - - /** - * Extracts properties from given property arguments. Filter properties by given prefix. This way each component in - * pipe (source, sink, errorHandler, step[1-n]) can have its individual properties. - * - * @param keyPrefix - * @return - */ - private Map getProperties(String keyPrefix) { - Map props = new HashMap<>(); - if (properties != null) { - for (String propertyExpression : properties) { - if (propertyExpression.startsWith(keyPrefix + ".")) { - String[] keyValue = propertyExpression.split("=", 2); - if (keyValue.length != 2) { - printer().printf( - "property '%s' does not follow format [source|sink|error-handler|step-].=%n", - propertyExpression); - continue; - } - - props.put(keyValue[0].substring(keyPrefix.length() + 1), keyValue[1]); - } - } - } - - return props; - } - - /** - * Get required properties from Kamelet specification and add those to the given user properties if not already set. - * In case a required property is not present in the provided user properties the value is either set to the example - * coming from the Kamelet specification or to a placeholder value for users to fill in manually. Property values do - * already have quotes when the type is String. - * - * @param kamelet - * @return - * @throws Exception - */ - protected Map kameletProperties(String kamelet, Map userProperties) throws Exception { - Map endpointProperties = new HashMap<>(); - InputStream is; - String loc; - Resource res; - - // try local disk first before GitHub - ResourceResolver resolver = new DefaultResourceResolvers.FileResolver(); - try { - res = resolver.resolve("file:" + kamelet + ".kamelet.yaml"); - } finally { - resolver.close(); - } - if (res.exists()) { - is = res.getInputStream(); - loc = res.getLocation(); - } else { - resolver = new GitHubResourceResolver(); - try { - res = resolver.resolve( - "github:apache:camel-kamelets:main:kamelets/" + kamelet + ".kamelet.yaml"); - } finally { - resolver.close(); - } - loc = res.getLocation(); - URL u = new URL(loc); - is = u.openStream(); - } - if (is != null) { - try { - LoadSettings local = LoadSettings.builder().setLabel(loc).build(); - final StreamReader reader = new StreamReader(local, new YamlUnicodeReader(is)); - final Parser parser = new ParserImpl(local, reader); - final Composer composer = new Composer(local, parser); - Node root = composer.getSingleNode().orElse(null); - if (root != null) { - Set required = asStringSet(nodeAt(root, "/spec/definition/required")); - if (required != null && !required.isEmpty()) { - for (String req : required) { - if (!userProperties.containsKey(req)) { - String type = asText(nodeAt(root, "/spec/definition/properties/" + req + "/type")); - String example = asText(nodeAt(root, "/spec/definition/properties/" + req + "/example")); - StringBuilder vb = new StringBuilder(); - if (example != null) { - if ("string".equals(type)) { - vb.append("\""); - } - vb.append(example); - if ("string".equals(type)) { - vb.append("\""); - } - } else { - vb.append("\"value\""); - } - endpointProperties.put(req, vb.toString()); - } - } - } - } - IOHelper.close(is); - } catch (Exception e) { - System.err.println("Error parsing Kamelet: " + loc + " due to: " + e.getMessage()); - } - } else { - System.err.println("Kamelet not found on github: " + kamelet); - } - - endpointProperties.putAll(userProperties); - - return endpointProperties; - } - - static class FileConsumer extends ParameterConsumer { - @Override - protected void doConsumeParameters(Stack args, Bind cmd) { - cmd.file = args.pop(); - } - } - - public void setFile(String file) { - this.file = file; - } - - public void setSource(String source) { - this.source = source; - } - - public void setSink(String sink) { - this.sink = sink; - } - - public void setSteps(String[] steps) { - this.steps = steps; - } - - public void setProperties(String[] properties) { - this.properties = properties; - } - - public void setErrorHandler(String errorHandler) { - this.errorHandler = errorHandler; - } - - public void setOutput(String output) { - this.output = output; - } -} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java index 89ef4cdd0fe60..934898d56f008 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java @@ -21,6 +21,7 @@ import org.apache.camel.catalog.CamelCatalog; import org.apache.camel.catalog.DefaultCamelCatalog; import org.apache.camel.dsl.jbang.core.commands.action.*; +import org.apache.camel.dsl.jbang.core.commands.bind.Bind; import org.apache.camel.dsl.jbang.core.commands.catalog.CatalogCommand; import org.apache.camel.dsl.jbang.core.commands.catalog.CatalogComponent; import org.apache.camel.dsl.jbang.core.commands.catalog.CatalogDataFormat; diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/Bind.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/Bind.java new file mode 100644 index 0000000000000..cb503b9491056 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/Bind.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dsl.jbang.core.commands.bind; + +import java.io.FileOutputStream; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Stack; +import java.util.stream.Collectors; + +import org.apache.camel.CamelException; +import org.apache.camel.dsl.jbang.core.commands.CamelCommand; +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.dsl.jbang.core.common.JSonHelper; +import org.apache.camel.dsl.jbang.core.common.YamlHelper; +import org.apache.camel.util.FileUtil; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.json.Jsoner; +import picocli.CommandLine; +import picocli.CommandLine.Command; + +@Command(name = "bind", description = "Bind source and sink Kamelets as a new Camel integration", + sortOptions = false) +public class Bind extends CamelCommand { + + @CommandLine.Parameters(description = "Name of binding file to be saved", arity = "1", + paramLabel = "", parameterConsumer = FileConsumer.class) + Path filePath; // Defined only for file path completion; the field never used + String file; + + @CommandLine.Option(names = { "--source" }, description = "Source (from) such as a Kamelet or Camel endpoint uri", + required = true) + String source; + + @CommandLine.Option(names = { "--step" }, description = "Optional steps such as a Kamelet or Camel endpoint uri") + String[] steps; + + @CommandLine.Option(names = { "--sink" }, description = "Sink (to) such as a Kamelet or Camel endpoint uri", + required = true) + String sink; + + @CommandLine.Option(names = { "--error-handler" }, + description = "Add error handler (none|log|sink:). Sink endpoints are expected in the format \"[[apigroup/]version:]kind:[namespace/]name\", plain Camel URIs or Kamelet name.") + String errorHandler; + + @CommandLine.Option(names = { "--property" }, + description = "Adds a pipe property in the form of [source|sink|error-handler|step-].= where is the step number starting from 1", + arity = "0") + String[] properties; + + @CommandLine.Option(names = { "--output" }, + defaultValue = "file", + description = "Output format generated by this command (supports: file, yaml or json).") + String output; + + private final TemplateProvider templateProvider; + + // Available binding providers, order in array is important! + private final BindingProvider[] bindingProviders = new BindingProvider[] { + new KameletBindingProvider(), + new KnativeBrokerBindingProvider(), + new KnativeChannelBindingProvider(), + new ObjectReferenceBindingProvider(), + new UriBindingProvider() + }; + + public Bind(CamelJBangMain main) { + this(main, new TemplateProvider() { + }); + } + + public Bind(CamelJBangMain main, TemplateProvider templateProvider) { + super(main); + this.templateProvider = templateProvider; + } + + @Override + public Integer doCall() throws Exception { + String pipe = constructPipe(); + + if (pipe.isEmpty()) { + printer().println("Failed to construct Pipe resource"); + return -1; + } + + return dumpPipe(pipe); + } + + public String constructPipe() throws Exception { + try { + String sourceEndpoint = resolveEndpoint(BindingProvider.EndpointType.SOURCE, source, getProperties("source")); + String sinkEndpoint = resolveEndpoint(BindingProvider.EndpointType.SINK, sink, getProperties("sink")); + + InputStream is = templateProvider.getPipeTemplate(); + String context = IOHelper.loadText(is); + IOHelper.close(is); + + String stepsContext = ""; + if (steps != null) { + StringBuilder sb = new StringBuilder("\n steps:\n"); + for (int i = 0; i < steps.length; i++) { + sb.append(resolveEndpoint(BindingProvider.EndpointType.STEP, steps[i], + getProperties("step-%d".formatted(i + 1)))); + + if (i < steps.length - 1) { + sb.append("\n"); + } + } + stepsContext = sb.toString(); + } + + String errorHandlerContext = ""; + if (errorHandler != null) { + StringBuilder sb = new StringBuilder("\n errorHandler:\n"); + + Map errorHandlerParameters = getProperties("error-handler"); + + String[] errorHandlerTokens = errorHandler.split(":", 2); + String errorHandlerType = errorHandlerTokens[0]; + + String errorHandlerSpec; + switch (errorHandlerType) { + case "sink": + if (errorHandlerTokens.length != 2) { + printer().println( + "Invalid error handler syntax. Type 'sink' needs an endpoint configuration (ie sink:endpointUri)"); + // Error abort Pipe construction + return ""; + } + String endpointUri = errorHandlerTokens[1]; + Map errorHandlerSinkProperties = getProperties("error-handler.sink"); + + // remove sink properties from error handler parameters + errorHandlerSinkProperties.keySet().stream() + .map(key -> "sink." + key) + .filter(errorHandlerParameters::containsKey) + .forEach(errorHandlerParameters::remove); + + String endpoint = resolveEndpoint(BindingProvider.EndpointType.ERROR_HANDLER, endpointUri, + errorHandlerSinkProperties); + + is = templateProvider.getErrorHandlerTemplate("sink"); + errorHandlerSpec = IOHelper.loadText(is); + IOHelper.close(is); + errorHandlerSpec = errorHandlerSpec.replaceFirst("\\{\\{ \\.Endpoint }}", endpoint); + errorHandlerSpec = errorHandlerSpec.replaceFirst("\\{\\{ \\.ErrorHandlerParameter }}", + templateProvider.asErrorHandlerParameters(errorHandlerParameters)); + break; + case "log": + is = templateProvider.getErrorHandlerTemplate("log"); + errorHandlerSpec = IOHelper.loadText(is); + IOHelper.close(is); + errorHandlerSpec = errorHandlerSpec.replaceFirst("\\{\\{ \\.ErrorHandlerParameter }}", + templateProvider.asErrorHandlerParameters(errorHandlerParameters)); + break; + default: + errorHandlerSpec = " none: {}"; + } + sb.append(errorHandlerSpec); + errorHandlerContext = sb.toString(); + } + + String name = FileUtil.onlyName(file, false); + context = context.replaceFirst("\\{\\{ \\.Name }}", name); + context = context.replaceFirst("\\{\\{ \\.Source }}\n", sourceEndpoint); + context = context.replaceFirst("\\{\\{ \\.Sink }}\n", sinkEndpoint); + context = context.replaceFirst("\\{\\{ \\.Steps }}", stepsContext); + context = context.replaceFirst("\\{\\{ \\.ErrorHandler }}", errorHandlerContext); + return context; + } catch (Exception e) { + printer().printErr(e); + } + + return ""; + } + + private String resolveEndpoint( + BindingProvider.EndpointType endpointType, String uriExpression, Map endpointProperties) + throws Exception { + for (BindingProvider provider : bindingProviders) { + if (provider.canHandle(uriExpression)) { + String context + = provider.getEndpoint(endpointType, uriExpression, endpointProperties, templateProvider); + + int additionalIndent = templateProvider.getAdditionalIndent(endpointType); + if (additionalIndent > 0) { + context = Arrays.stream(context.split("\n")) + .map(line -> " ".repeat(additionalIndent) + line) + .collect(Collectors.joining("\n")); + } + + return context; + } + } + + throw new CamelException( + "Failed to resolve endpoint URI expression %s - no matching binding provider found" + .formatted(uriExpression)); + } + + public int dumpPipe(String pipe) throws Exception { + switch (output) { + case "file": + if (file.endsWith(".yaml")) { + IOHelper.writeText(pipe, new FileOutputStream(file, false)); + } else if (file.endsWith(".json")) { + IOHelper.writeText(Jsoner.serialize(YamlHelper.yaml().loadAs(pipe, Map.class)), + new FileOutputStream(file, false)); + } else { + IOHelper.writeText(pipe, new FileOutputStream(file + ".yaml", false)); + } + break; + case "yaml": + printer().println(pipe); + break; + case "json": + printer().println(JSonHelper.prettyPrint(Jsoner.serialize(YamlHelper.yaml().loadAs(pipe, Map.class)), 2) + .replaceAll("\\\\/", "/")); + break; + default: + printer().printf("Unsupported output format '%s' (supported: file, yaml, json)%n", output); + return -1; + } + return 0; + } + + /** + * Extracts properties from given property arguments. Filter properties by given prefix. This way each component in + * pipe (source, sink, errorHandler, step[1-n]) can have its individual properties. + * + * @param keyPrefix + * @return + */ + private Map getProperties(String keyPrefix) { + Map props = new HashMap<>(); + if (properties != null) { + for (String propertyExpression : properties) { + if (propertyExpression.startsWith(keyPrefix + ".")) { + String[] keyValue = propertyExpression.split("=", 2); + if (keyValue.length != 2) { + printer().printf( + "property '%s' does not follow format [source|sink|error-handler|step-].=%n", + propertyExpression); + continue; + } + + props.put(keyValue[0].substring(keyPrefix.length() + 1), keyValue[1]); + } + } + } + + return props; + } + + static class FileConsumer extends ParameterConsumer { + @Override + protected void doConsumeParameters(Stack args, Bind cmd) { + cmd.file = args.pop(); + } + } + + public void setFile(String file) { + this.file = file; + } + + public void setSource(String source) { + this.source = source; + } + + public void setSink(String sink) { + this.sink = sink; + } + + public void setSteps(String[] steps) { + this.steps = steps; + } + + public void setProperties(String[] properties) { + this.properties = properties; + } + + public void setErrorHandler(String errorHandler) { + this.errorHandler = errorHandler; + } + + public void setOutput(String output) { + this.output = output; + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/BindingProvider.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/BindingProvider.java new file mode 100644 index 0000000000000..64df50ced4057 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/BindingProvider.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import java.util.Map; + +/** + * Binding provider able to create an endpoint that can be used as a source/sink/step in a Pipe specification. Endpoints + * may represent a Kamelet, Camel endpoint URI or a Kubernetes object reference such as a reference to a Knative broker + * for instance. Implementations must not hold any state as the binding provider instance is used for multiple calls. + */ +public interface BindingProvider { + + String getEndpoint( + EndpointType type, String uriExpression, Map endpointProperties, TemplateProvider templateProvider) + throws Exception; + + boolean canHandle(String uriExpression); + + enum EndpointType { + SOURCE, + SINK, + STEP, + ERROR_HANDLER + } + +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/KameletBindingProvider.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/KameletBindingProvider.java new file mode 100644 index 0000000000000..1e65af00fef67 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/KameletBindingProvider.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.camel.github.GitHubResourceResolver; +import org.apache.camel.impl.engine.DefaultResourceResolvers; +import org.apache.camel.spi.Resource; +import org.apache.camel.spi.ResourceResolver; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.StringHelper; +import org.snakeyaml.engine.v2.api.LoadSettings; +import org.snakeyaml.engine.v2.api.YamlUnicodeReader; +import org.snakeyaml.engine.v2.composer.Composer; +import org.snakeyaml.engine.v2.nodes.Node; +import org.snakeyaml.engine.v2.parser.Parser; +import org.snakeyaml.engine.v2.parser.ParserImpl; +import org.snakeyaml.engine.v2.scanner.StreamReader; + +import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.asStringSet; +import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.asText; +import static org.apache.camel.dsl.yaml.common.YamlDeserializerSupport.nodeAt; + +/** + * Binding to Kamelets as Kubernetes object references. Automatically resolves Kamelet from catalog and reads required + * properties. Adds required properties as placeholder to the object reference when not set already by the user. + */ +public class KameletBindingProvider extends ObjectReferenceBindingProvider { + + private static final String prefix = "kamelet:"; + + public KameletBindingProvider() { + super("camel.apache.org/v1", "Kamelet"); + } + + @Override + public String getEndpoint( + EndpointType type, String uriExpression, Map endpointProperties, TemplateProvider templateProvider) + throws Exception { + if (uriExpression.startsWith(prefix)) { + return super.getEndpoint(type, StringHelper.after(uriExpression, prefix), endpointProperties, templateProvider); + } + + return super.getEndpoint(type, uriExpression, endpointProperties, templateProvider); + } + + @Override + protected Map getEndpointUriProperties( + EndpointType type, String objectName, String uriExpression, Map endpointProperties) + throws Exception { + return kameletProperties(objectName, + super.getEndpointUriProperties(type, objectName, uriExpression, endpointProperties)); + } + + /** + * Get required properties from Kamelet specification and add those to the given user properties if not already set. + * In case a required property is not present in the provided user properties the value is either set to the example + * coming from the Kamelet specification or to a placeholder value for users to fill in manually. Property values do + * already have quotes when the type is String. + * + * @param kamelet + * @return + * @throws Exception + */ + protected Map kameletProperties(String kamelet, Map userProperties) throws Exception { + Map endpointProperties = new HashMap<>(); + InputStream is; + String loc; + Resource res; + + // try local disk first before GitHub + ResourceResolver resolver = new DefaultResourceResolvers.FileResolver(); + try { + res = resolver.resolve("file:" + kamelet + ".kamelet.yaml"); + } finally { + resolver.close(); + } + if (res.exists()) { + is = res.getInputStream(); + loc = res.getLocation(); + } else { + resolver = new GitHubResourceResolver(); + try { + res = resolver.resolve( + "github:apache:camel-kamelets:main:kamelets/" + kamelet + ".kamelet.yaml"); + } finally { + resolver.close(); + } + loc = res.getLocation(); + URL u = new URL(loc); + is = u.openStream(); + } + if (is != null) { + try { + LoadSettings local = LoadSettings.builder().setLabel(loc).build(); + final StreamReader reader = new StreamReader(local, new YamlUnicodeReader(is)); + final Parser parser = new ParserImpl(local, reader); + final Composer composer = new Composer(local, parser); + Node root = composer.getSingleNode().orElse(null); + if (root != null) { + Set required = asStringSet(nodeAt(root, "/spec/definition/required")); + if (required != null && !required.isEmpty()) { + for (String req : required) { + if (!userProperties.containsKey(req)) { + String type = asText(nodeAt(root, "/spec/definition/properties/" + req + "/type")); + String example = asText(nodeAt(root, "/spec/definition/properties/" + req + "/example")); + StringBuilder vb = new StringBuilder(); + if (example != null) { + if ("string".equals(type)) { + vb.append("\""); + } + vb.append(example); + if ("string".equals(type)) { + vb.append("\""); + } + } else { + vb.append("\"value\""); + } + endpointProperties.put(req, vb.toString()); + } + } + } + } + IOHelper.close(is); + } catch (Exception e) { + System.err.println("Error parsing Kamelet: " + loc + " due to: " + e.getMessage()); + } + } else { + System.err.println("Kamelet not found on github: " + kamelet); + } + + endpointProperties.putAll(userProperties); + + return endpointProperties; + } + + @Override + public boolean canHandle(String uriExpression) { + return uriExpression.startsWith(prefix) || !uriExpression.contains(":"); + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/KnativeBrokerBindingProvider.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/KnativeBrokerBindingProvider.java new file mode 100644 index 0000000000000..8413e1479a1ac --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/KnativeBrokerBindingProvider.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import java.util.Map; + +import org.apache.camel.util.StringHelper; + +/** + * Binding to a Knative broker resource. + */ +public class KnativeBrokerBindingProvider extends ObjectReferenceBindingProvider { + + private static final String prefix = "knative:broker:"; + + public KnativeBrokerBindingProvider() { + super("eventing.knative.dev/v1", "Broker"); + } + + @Override + public String getEndpoint( + EndpointType type, String uriExpression, Map endpointProperties, TemplateProvider templateProvider) + throws Exception { + if (uriExpression.startsWith(prefix)) { + return super.getEndpoint(type, StringHelper.after(uriExpression, prefix), endpointProperties, templateProvider); + } + + return super.getEndpoint(type, uriExpression, endpointProperties, templateProvider); + } + + @Override + protected Map getEndpointUriProperties( + EndpointType type, String objectName, String uriExpression, Map endpointProperties) + throws Exception { + Map props = super.getEndpointUriProperties(type, objectName, uriExpression, endpointProperties); + + if (type == EndpointType.SOURCE && !props.containsKey("type")) { + // When acting as a source the type property is added by default in order to filter the event stream. + props.put("type", "org.apache.camel.event"); + } + + return props; + } + + @Override + public boolean canHandle(String uriExpression) { + return uriExpression.startsWith(prefix); + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/KnativeChannelBindingProvider.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/KnativeChannelBindingProvider.java new file mode 100644 index 0000000000000..b134633dd7abd --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/KnativeChannelBindingProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import java.util.Map; + +import org.apache.camel.util.StringHelper; + +public class KnativeChannelBindingProvider extends ObjectReferenceBindingProvider { + + private static final String prefix = "knative:channel:"; + + public KnativeChannelBindingProvider() { + super("messaging.knative.dev/v1", "Channel"); + } + + @Override + public String getEndpoint( + EndpointType type, String uriExpression, Map endpointProperties, TemplateProvider templateProvider) + throws Exception { + if (uriExpression.startsWith(prefix)) { + return super.getEndpoint(type, StringHelper.after(uriExpression, prefix), endpointProperties, templateProvider); + } + + return super.getEndpoint(type, uriExpression, endpointProperties, templateProvider); + } + + @Override + public boolean canHandle(String uriExpression) { + return uriExpression.startsWith(prefix); + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/ObjectReferenceBindingProvider.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/ObjectReferenceBindingProvider.java new file mode 100644 index 0000000000000..f93db17eea858 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/ObjectReferenceBindingProvider.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.camel.CamelException; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.StringHelper; +import org.apache.camel.util.URISupport; + +/** + * Binding provider creates an object reference, usually to a Kubernetes resource. An object is identified by its fully + * qualified reference with Kind, ApiVersion, name and optional namespace. In addition to that the reference is able to + * specify resource properties. Subclasses may add logic for a very specific Kubernetes resource such as Kamelets or + * Knative brokers. + */ +public class ObjectReferenceBindingProvider implements BindingProvider { + + private static final Pattern OBJECT_REFERENCE_URI_PATTERN + = Pattern.compile("^([a-z.]+/[alphbetv0-9]+):([A-Z][a-z]+):([a-z][a-z-]*/?[a-z][a-z-]*)\\??[^?]*", Pattern.DOTALL); + + private final String apiVersion; + private final String kind; + + public ObjectReferenceBindingProvider() { + this("", ""); + } + + protected ObjectReferenceBindingProvider(String apiVersion, String kind) { + if (ObjectHelper.isNotEmpty(kind) && ObjectHelper.isEmpty(apiVersion)) { + throw new IllegalArgumentException( + "Object reference provider with static kind '%s' requires apiVersion to be set.".formatted(kind)); + } + + this.apiVersion = apiVersion; + this.kind = kind; + } + + @Override + public String getEndpoint( + EndpointType type, String uriExpression, Map endpointProperties, TemplateProvider templateProvider) + throws Exception { + + String apiVersionValue; + String kindValue; + String namespace; + String objectName; + if (ObjectHelper.isEmpty(kind)) { + Matcher objectRef = OBJECT_REFERENCE_URI_PATTERN.matcher(uriExpression); + if (objectRef.matches()) { + apiVersionValue = objectRef.group(1); + kindValue = objectRef.group(2); + + String namespacedName = objectRef.group(3); + objectName = getObjectName(namespacedName); + namespace = getNamespace(namespacedName); + } else { + throw new CamelException("Unsupported object reference: %s".formatted(uriExpression)); + } + } else { + apiVersionValue = apiVersion; + kindValue = kind; + objectName = getObjectName(uriExpression); + namespace = getNamespace(uriExpression); + } + + Map endpointUriProperties + = getEndpointUriProperties(type, objectName, uriExpression, endpointProperties); + + InputStream is; + if (type == EndpointType.STEP) { + is = templateProvider.getStepTemplate("ref"); + } else { + is = templateProvider.getEndpointTemplate("ref"); + } + + String context = IOHelper.loadText(is); + IOHelper.close(is); + + String namespaceContext = ""; + if (namespace != null) { + namespaceContext = " namespace: " + namespace + "\n"; + } + + context = context.replaceFirst("\\{\\{ \\.ApiVersion }}", apiVersionValue); + context = context.replaceFirst("\\{\\{ \\.Kind }}", kindValue); + context = context.replaceFirst("\\{\\{ \\.Name }}", objectName); + context = context.replaceFirst("\\{\\{ \\.Namespace }}\n", namespaceContext); + context = context.replaceFirst("\\{\\{ \\.EndpointProperties }}\n", + templateProvider.asEndpointProperties(endpointUriProperties)); + + return context; + } + + protected String getObjectName(String uriExpression) { + String namespacedName = uriExpression; + if (uriExpression.contains("?")) { + namespacedName = StringHelper.before(uriExpression, "?"); + } + + if (namespacedName.contains("/")) { + return namespacedName.split("/", 2)[1]; + } + + return namespacedName; + } + + protected String getNamespace(String uriExpression) { + String namespacedName = uriExpression; + if (uriExpression.contains("?")) { + namespacedName = StringHelper.before(uriExpression, "?"); + } + + if (namespacedName.contains("/")) { + return namespacedName.split("/", 2)[0]; + } + + return null; + } + + protected Map getEndpointUriProperties( + EndpointType type, String objectName, String uriExpression, Map endpointProperties) + throws Exception { + Map endpointUriProperties = new HashMap<>(endpointProperties); + if (uriExpression.contains("?")) { + String query = StringHelper.after(uriExpression, "?"); + if (query != null) { + endpointUriProperties = URISupport.parseQuery(query, true); + } + } + + return endpointUriProperties; + } + + @Override + public boolean canHandle(String uriExpression) { + if (ObjectHelper.isNotEmpty(kind)) { + return uriExpression.startsWith(kind.toLowerCase(Locale.US) + ":"); + } + + return OBJECT_REFERENCE_URI_PATTERN.matcher(uriExpression).matches(); + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/TemplateProvider.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/TemplateProvider.java new file mode 100644 index 0000000000000..7c39e61be098d --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/TemplateProvider.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import java.io.InputStream; +import java.util.Map; + +/** + * Helper class provides access to the templates that construct the Pipe resource. Subclasses may overwrite the provider + * to inject their own templates. + */ +public interface TemplateProvider { + default InputStream getPipeTemplate() { + return TemplateProvider.class.getClassLoader().getResourceAsStream("templates/pipe.yaml.tmpl"); + } + + default InputStream getStepTemplate(String stepType) { + return TemplateProvider.class.getClassLoader().getResourceAsStream("templates/step-%s.yaml.tmpl".formatted(stepType)); + } + + default InputStream getEndpointTemplate(String endpointType) { + return TemplateProvider.class.getClassLoader() + .getResourceAsStream("templates/endpoint-%s.yaml.tmpl".formatted(endpointType)); + } + + default InputStream getErrorHandlerTemplate(String type) { + return TemplateProvider.class.getClassLoader() + .getResourceAsStream("templates/error-handler-%s.yaml.tmpl".formatted(type)); + } + + /** + * Creates YAML snippet representing the endpoint properties section. + * + * @param props the properties to set as endpoint properties. + * @return + */ + default String asEndpointProperties(Map props) { + StringBuilder sb = new StringBuilder(); + if (props.isEmpty()) { + // create a dummy placeholder, so it is easier to add new properties manually + return sb.append("#properties:\n ").append("#key: \"value\"").toString(); + } + + sb.append("properties:\n"); + for (Map.Entry propertyEntry : props.entrySet()) { + sb.append(" ").append(propertyEntry.getKey()).append(": ") + .append(propertyEntry.getValue()).append("\n"); + } + return sb.toString().trim(); + } + + /** + * Creates YAML snippet representing the error handler parameters section. + * + * @param props the properties to set as error handler parameters. + */ + default String asErrorHandlerParameters(Map props) { + if (props.isEmpty()) { + return "parameters: {}"; + } + + StringBuilder sb = new StringBuilder(); + sb.append("parameters:\n"); + for (Map.Entry propertyEntry : props.entrySet()) { + sb.append(" ").append(propertyEntry.getKey()).append(": ").append(propertyEntry.getValue()).append("\n"); + } + return sb.toString().trim(); + } + + /** + * Get additional indent that should be applied to endpoint templates. + * + * @param type the endpoint type. + * @return + */ + default int getAdditionalIndent(BindingProvider.EndpointType type) { + if (type == BindingProvider.EndpointType.ERROR_HANDLER) { + return 4; + } + + return 0; + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/UriBindingProvider.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/UriBindingProvider.java new file mode 100644 index 0000000000000..746952e6db548 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/bind/UriBindingProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.StringHelper; +import org.apache.camel.util.URISupport; + +public class UriBindingProvider implements BindingProvider { + + private static final Pattern CAMEL_ENDPOINT_URI_PATTERN = Pattern.compile("^[a-z0-9+][a-zA-Z0-9-+]*:.*$"); + + @Override + public String getEndpoint( + EndpointType type, String uriExpression, Map endpointProperties, TemplateProvider templateProvider) + throws Exception { + String endpointUri = uriExpression; + Map endpointUriProperties = new HashMap<>(); + if (uriExpression.contains("?")) { + endpointUri = StringHelper.before(uriExpression, "?"); + String query = StringHelper.after(uriExpression, "?"); + if (query != null) { + endpointUriProperties = URISupport.parseQuery(query, true); + } + } + + endpointProperties.putAll(endpointUriProperties); + + InputStream is; + if (type == EndpointType.STEP) { + is = templateProvider.getStepTemplate("uri"); + } else { + is = templateProvider.getEndpointTemplate("uri"); + } + + String context = IOHelper.loadText(is); + IOHelper.close(is); + + context = context.replaceFirst("\\{\\{ \\.URI }}", endpointUri); + context = context.replaceFirst("\\{\\{ \\.EndpointProperties }}\n", + templateProvider.asEndpointProperties(endpointProperties)); + + return context; + } + + @Override + public boolean canHandle(String uriExpression) { + return CAMEL_ENDPOINT_URI_PATTERN.matcher(uriExpression).matches(); + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/Printer.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/Printer.java index 2aba6387a94ab..9d07be3ed1483 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/Printer.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/Printer.java @@ -39,6 +39,10 @@ default void printf(String format, Object... args) { System.out.printf(format, args); } + default void printErr(Exception e) { + printf("Error: %s%n", e.getMessage()); + } + /** * Default printer uses System out print stream. */ diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/endpoint-ref.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/endpoint-ref.yaml.tmpl new file mode 100644 index 0000000000000..40227006c13cf --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/endpoint-ref.yaml.tmpl @@ -0,0 +1,6 @@ + ref: + kind: {{ .Kind }} + apiVersion: {{ .ApiVersion }} + name: {{ .Name }} +{{ .Namespace }} + {{ .EndpointProperties }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/endpoint-uri.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/endpoint-uri.yaml.tmpl new file mode 100644 index 0000000000000..6af1eef795ed2 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/endpoint-uri.yaml.tmpl @@ -0,0 +1,2 @@ + uri: {{ .URI }} + {{ .EndpointProperties }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/error-handler-sink-kamelet.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/error-handler-sink-kamelet.yaml.tmpl deleted file mode 100644 index 2b418ebb2f23a..0000000000000 --- a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/error-handler-sink-kamelet.yaml.tmpl +++ /dev/null @@ -1,8 +0,0 @@ - sink: - endpoint: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Name }} - {{ .ErrorHandlerProperties }} - {{ .ErrorHandlerParameter }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/error-handler-sink-uri.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/error-handler-sink-uri.yaml.tmpl deleted file mode 100644 index ff67827b25a18..0000000000000 --- a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/error-handler-sink-uri.yaml.tmpl +++ /dev/null @@ -1,5 +0,0 @@ - sink: - endpoint: - uri: {{ .Name }} - {{ .ErrorHandlerProperties }} - {{ .ErrorHandlerParameter }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/error-handler-sink.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/error-handler-sink.yaml.tmpl new file mode 100644 index 0000000000000..127ed0687664f --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/error-handler-sink.yaml.tmpl @@ -0,0 +1,4 @@ + sink: + endpoint: +{{ .Endpoint }} + {{ .ErrorHandlerParameter }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-kamelet-kamelet.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-kamelet-kamelet.yaml.tmpl deleted file mode 100644 index b2536e7267021..0000000000000 --- a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-kamelet-kamelet.yaml.tmpl +++ /dev/null @@ -1,19 +0,0 @@ -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: {{ .Name }} -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Source }} - {{ .SourceProperties }} -{{ .Steps }} - sink: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Sink }} - {{ .SinkProperties }} -{{ .ErrorHandler }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-kamelet-uri.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-kamelet-uri.yaml.tmpl deleted file mode 100644 index 560840b183871..0000000000000 --- a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-kamelet-uri.yaml.tmpl +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: {{ .Name }} -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Source }} - {{ .SourceProperties }} -{{ .Steps }} - sink: - uri: {{ .Sink }} - {{ .SinkProperties }} -{{ .ErrorHandler }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-uri-kamelet.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-uri-kamelet.yaml.tmpl deleted file mode 100644 index 44b3bfc296c4b..0000000000000 --- a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-uri-kamelet.yaml.tmpl +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: {{ .Name }} -spec: - source: - uri: {{ .Source }} - {{ .SourceProperties }} -{{ .Steps }} - sink: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Sink }} - {{ .SinkProperties }} -{{ .ErrorHandler }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-uri-uri.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe.yaml.tmpl similarity index 57% rename from dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-uri-uri.yaml.tmpl rename to dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe.yaml.tmpl index 2e0050262fae0..a354cc3fc55b5 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe-uri-uri.yaml.tmpl +++ b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/pipe.yaml.tmpl @@ -4,10 +4,8 @@ metadata: name: {{ .Name }} spec: source: - uri: {{ .Source }} - {{ .SourceProperties }} +{{ .Source }} {{ .Steps }} sink: - uri: {{ .Sink }} - {{ .SinkProperties }} +{{ .Sink }} {{ .ErrorHandler }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-kamelet.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-kamelet.yaml.tmpl deleted file mode 100644 index 38f3214fbff22..0000000000000 --- a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-kamelet.yaml.tmpl +++ /dev/null @@ -1,5 +0,0 @@ - - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Name }} - {{ .StepProperties }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-ref.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-ref.yaml.tmpl new file mode 100644 index 0000000000000..e0dada131762b --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-ref.yaml.tmpl @@ -0,0 +1,5 @@ + - ref: + kind: {{ .Kind }} + apiVersion: {{ .ApiVersion }} + name: {{ .Name }} + {{ .EndpointProperties }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-uri.yaml.tmpl b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-uri.yaml.tmpl index dc1c371c06a65..014caf06d15d4 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-uri.yaml.tmpl +++ b/dsl/camel-jbang/camel-jbang-core/src/main/resources/templates/step-uri.yaml.tmpl @@ -1,2 +1,2 @@ - - uri: {{ .Name }} - {{ .StepProperties }} + - uri: {{ .URI }} + {{ .EndpointProperties }} diff --git a/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindKnativeBrokerTest.java b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindKnativeBrokerTest.java new file mode 100644 index 0000000000000..7ea3c92a61f46 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindKnativeBrokerTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import org.apache.camel.dsl.jbang.core.commands.CamelCommandBaseTest; +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.util.StringHelper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BindKnativeBrokerTest extends CamelCommandBaseTest { + + @Test + public void shouldBindToKnativeBroker() throws Exception { + Bind command = createCommand("timer-source", "knative:broker:default"); + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-broker-default + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: "hello world" + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + #properties: + #key: "value" + """.trim(), output); + } + + @Test + public void shouldBindToNamespacedKnativeBroker() throws Exception { + Bind command = createCommand("timer-source", "knative:broker:default"); + + command.sink = "knative:broker:my-namespace/default"; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-broker-default + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: "hello world" + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + namespace: my-namespace + #properties: + #key: "value" + """.trim(), output); + } + + @Test + public void shouldBindToKnativeBrokerWithProperties() throws Exception { + Bind command = createCommand("timer-source", "knative:broker:default"); + + command.properties = new String[] { + "source.message=Hello", + "sink.type=my-event", + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-broker-default + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: Hello + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: my-event + """.trim(), output); + } + + @Test + public void shouldBindToKnativeBrokerWithUriProperties() throws Exception { + Bind command = createCommand("timer-source", "knative:broker:default?type=my-event&source=camel"); + + command.properties = new String[] { + "source.message=Hello", + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-broker-default + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: Hello + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: my-event + source: camel + """.trim(), output); + } + + @Test + public void shouldBindKnativeBrokerSource() throws Exception { + Bind command = createCommand("knative:broker:default", "log-sink"); + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: broker-default-to-log + spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + #properties: + #key: "value" + """.trim(), output); + } + + @Test + public void shouldBindNamespacedKnativeBrokerSource() throws Exception { + Bind command = createCommand("knative:broker:default", "log-sink"); + + command.source = "knative:broker:my-namespace/default"; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: broker-default-to-log + spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + namespace: my-namespace + properties: + type: org.apache.camel.event + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + #properties: + #key: "value" + """.trim(), output); + } + + @Test + public void shouldBindKnativeBrokerSourceWithProperties() throws Exception { + Bind command = createCommand("knative:broker:default", "log-sink"); + + command.properties = new String[] { + "source.type=my-event", + "sink.showHeaders=true", + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: broker-default-to-log + spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: my-event + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + showHeaders: true + """.trim(), output); + } + + @Test + public void shouldBindKnativeBrokerSourceWithUriProperties() throws Exception { + Bind command = createCommand("knative:broker:default?type=my-event&source=camel", "log-sink"); + + command.properties = new String[] { + "sink.showHeaders=true", + + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: broker-default-to-log + spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: my-event + source: camel + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + showHeaders: true + """.trim(), output); + } + + private Bind createCommand(String source, String sink) { + Bind command = new Bind(new CamelJBangMain().withPrinter(printer)); + + String sourceName; + if (source.startsWith("knative:")) { + sourceName = StringHelper.after(source, "knative:").replaceAll(":", "-"); + if (sourceName.contains("?")) { + sourceName = StringHelper.before(sourceName, "?"); + } + } else { + sourceName = StringHelper.before(source, "-source"); + } + + String sinkName; + if (sink.startsWith("knative:")) { + sinkName = StringHelper.after(sink, "knative:").replaceAll(":", "-"); + if (sinkName.contains("?")) { + sinkName = StringHelper.before(sinkName, "?"); + } + } else { + sinkName = StringHelper.before(sink, "-sink"); + } + + command.file = sourceName + "-to-" + sinkName + ".yaml"; + command.source = source; + command.sink = sink; + command.output = "yaml"; + + return command; + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindKnativeChannelTest.java b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindKnativeChannelTest.java new file mode 100644 index 0000000000000..f88ff8ce2b9a5 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindKnativeChannelTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import org.apache.camel.dsl.jbang.core.commands.CamelCommandBaseTest; +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.util.StringHelper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BindKnativeChannelTest extends CamelCommandBaseTest { + + @Test + public void shouldBindToKnativeChannel() throws Exception { + Bind command = createCommand("timer-source", "knative:channel:my-channel"); + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-channel-my-channel + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: "hello world" + sink: + ref: + kind: Channel + apiVersion: messaging.knative.dev/v1 + name: my-channel + #properties: + #key: "value" + """.trim(), output); + } + + @Test + public void shouldBindToKnativeChannelWithProperties() throws Exception { + Bind command = createCommand("timer-source", "knative:channel:my-channel"); + + command.properties = new String[] { + "source.message=Hello", + "sink.type=my-event", + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-channel-my-channel + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: Hello + sink: + ref: + kind: Channel + apiVersion: messaging.knative.dev/v1 + name: my-channel + properties: + type: my-event + """.trim(), output); + } + + @Test + public void shouldBindToKnativeChannelWithUriProperties() throws Exception { + Bind command = createCommand("timer-source", "knative:channel:my-channel?type=my-event&source=camel"); + + command.properties = new String[] { + "source.message=Hello", + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-channel-my-channel + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: Hello + sink: + ref: + kind: Channel + apiVersion: messaging.knative.dev/v1 + name: my-channel + properties: + type: my-event + source: camel + """.trim(), output); + } + + @Test + public void shouldBindKnativeChannelSource() throws Exception { + Bind command = createCommand("knative:channel:my-channel", "log-sink"); + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: channel-my-channel-to-log + spec: + source: + ref: + kind: Channel + apiVersion: messaging.knative.dev/v1 + name: my-channel + #properties: + #key: "value" + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + #properties: + #key: "value" + """.trim(), output); + } + + @Test + public void shouldBindKnativeChannelSourceWithProperties() throws Exception { + Bind command = createCommand("knative:channel:my-channel", "log-sink"); + + command.properties = new String[] { + "source.type=my-event", + "sink.showHeaders=true", + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: channel-my-channel-to-log + spec: + source: + ref: + kind: Channel + apiVersion: messaging.knative.dev/v1 + name: my-channel + properties: + type: my-event + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + showHeaders: true + """.trim(), output); + } + + @Test + public void shouldBindKnativeChannelSourceWithUriProperties() throws Exception { + Bind command = createCommand("knative:channel:my-channel?type=my-event&source=camel", "log-sink"); + + command.properties = new String[] { + "sink.showHeaders=true", + + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: channel-my-channel-to-log + spec: + source: + ref: + kind: Channel + apiVersion: messaging.knative.dev/v1 + name: my-channel + properties: + type: my-event + source: camel + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + showHeaders: true + """.trim(), output); + } + + private Bind createCommand(String source, String sink) { + Bind command = new Bind(new CamelJBangMain().withPrinter(printer)); + + String sourceName; + if (source.startsWith("knative:")) { + sourceName = StringHelper.after(source, "knative:").replaceAll(":", "-"); + if (sourceName.contains("?")) { + sourceName = StringHelper.before(sourceName, "?"); + } + } else { + sourceName = StringHelper.before(source, "-source"); + } + + String sinkName; + if (sink.startsWith("knative:")) { + sinkName = StringHelper.after(sink, "knative:").replaceAll(":", "-"); + if (sinkName.contains("?")) { + sinkName = StringHelper.before(sinkName, "?"); + } + } else { + sinkName = StringHelper.before(sink, "-sink"); + } + + command.file = sourceName + "-to-" + sinkName + ".yaml"; + command.source = source; + command.sink = sink; + command.output = "yaml"; + + return command; + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindObjectReferenceTest.java b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindObjectReferenceTest.java new file mode 100644 index 0000000000000..78a733c3eafe8 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindObjectReferenceTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.dsl.jbang.core.commands.bind; + +import org.apache.camel.dsl.jbang.core.commands.CamelCommandBaseTest; +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.util.StringHelper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BindObjectReferenceTest extends CamelCommandBaseTest { + + @Test + public void shouldBindToObjectReference() throws Exception { + Bind command = createCommand("timer", "foo"); + + command.sink = "sandbox.camel.apache.org/v1:Foo:bar"; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-foo + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: "hello world" + sink: + ref: + kind: Foo + apiVersion: sandbox.camel.apache.org/v1 + name: bar + #properties: + #key: "value" + """.trim(), output); + } + + @Test + public void shouldBindToNamespacedObjectReference() throws Exception { + Bind command = createCommand("timer", "foo"); + + command.sink = "sandbox.camel.apache.org/v1alpha1:Foo:my-namespace/bar"; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-foo + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: "hello world" + sink: + ref: + kind: Foo + apiVersion: sandbox.camel.apache.org/v1alpha1 + name: bar + namespace: my-namespace + #properties: + #key: "value" + """.trim(), output); + } + + @Test + public void shouldBindToObjectReferenceWithProperties() throws Exception { + Bind command = createCommand("timer", "foo"); + + command.sink = "sandbox.camel.apache.org/v1:Foo:bar"; + command.properties = new String[] { + "source.message=Hello", + "sink.foo=bar", + "sink.bar=baz", + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-foo + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: Hello + sink: + ref: + kind: Foo + apiVersion: sandbox.camel.apache.org/v1 + name: bar + properties: + bar: baz + foo: bar + """.trim(), output); + } + + @Test + public void shouldBindToObjectReferenceWithUriProperties() throws Exception { + Bind command = createCommand("timer", "foo"); + + command.sink = "sandbox.camel.apache.org/v1:Foo:bar?bar=baz&foo=bar"; + command.properties = new String[] { + "source.message=Hello", + }; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-foo + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: Hello + sink: + ref: + kind: Foo + apiVersion: sandbox.camel.apache.org/v1 + name: bar + properties: + bar: baz + foo: bar + """.trim(), output); + } + + @Test + public void shouldHandleInvalidObjectReference() throws Exception { + Bind command = createCommand("timer", "foo"); + + command.sink = "sandbox.camel.apache.org:Foo:bar"; // missing api version + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals( + """ + Error: Failed to resolve endpoint URI expression sandbox.camel.apache.org:Foo:bar - no matching binding provider found + Failed to construct Pipe resource + """ + .trim(), + output); + } + + private Bind createCommand(String source, String sink) { + Bind command = new Bind(new CamelJBangMain().withPrinter(printer)); + + String sourceName; + String sourceUri; + if (source.contains(":")) { + sourceName = StringHelper.before(source, ":"); + sourceUri = source; + } else { + sourceName = source; + sourceUri = source + "-source"; + } + + String sinkName; + String sinkUri; + if (sink.contains(":")) { + sinkName = StringHelper.before(sink, ":"); + sinkUri = sink; + } else { + sinkName = sink; + sinkUri = sink + "-sink"; + } + + command.file = sourceName + "-to-" + sinkName + ".yaml"; + command.source = sourceUri; + command.sink = sinkUri; + command.output = "yaml"; + + return command; + } +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/BindTest.java b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindTest.java similarity index 90% rename from dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/BindTest.java rename to dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindTest.java index bdde0d78c43db..8f961258e745b 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/BindTest.java +++ b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/bind/BindTest.java @@ -15,8 +15,10 @@ * limitations under the License. */ -package org.apache.camel.dsl.jbang.core.commands; +package org.apache.camel.dsl.jbang.core.commands.bind; +import org.apache.camel.dsl.jbang.core.commands.CamelCommandBaseTest; +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; import org.apache.camel.util.StringHelper; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -52,6 +54,72 @@ public void shouldBindKameletSourceToKameletSink() throws Exception { """.trim(), output); } + @Test + public void shouldBindNamespacedKamelets() throws Exception { + Bind command = createCommand("timer", "log"); + command.source = "my-namespace/timer-source"; + command.sink = "my-namespace/log-sink"; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-log + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + namespace: my-namespace + properties: + message: "hello world" + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + namespace: my-namespace + #properties: + #key: "value" + """.trim(), output); + } + + @Test + public void shouldBindKameletsExplicitPrefix() throws Exception { + Bind command = createCommand("timer", "log"); + command.source = "kamelet:timer-source"; + command.sink = "kamelet:log-sink"; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-log + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: "hello world" + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + #properties: + #key: "value" + """.trim(), output); + } + @Test public void shouldBindKameletSourceToKameletSinkWithProperties() throws Exception { Bind command = createCommand("timer", "log"); @@ -89,6 +157,38 @@ public void shouldBindKameletSourceToKameletSinkWithProperties() throws Exceptio """.trim(), output); } + @Test + public void shouldBindKameletsWithUriProperties() throws Exception { + Bind command = createCommand("timer", "log"); + command.source = "timer-source?message=Hi"; + command.sink = "log-sink?showHeaders=true"; + + command.doCall(); + + String output = printer.getOutput(); + Assertions.assertEquals(""" + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-to-log + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: Hi + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + showHeaders: true + """.trim(), output); + } + @Test public void shouldBindWithSteps() throws Exception { Bind command = createCommand("timer", "http"); diff --git a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/java/org/apache/camel/dsl/jbang/core/commands/k/Bind.java b/dsl/camel-jbang/camel-jbang-plugin-k/src/main/java/org/apache/camel/dsl/jbang/core/commands/k/Bind.java index c5531d1add460..a1e05446c0445 100644 --- a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/java/org/apache/camel/dsl/jbang/core/commands/k/Bind.java +++ b/dsl/camel-jbang/camel-jbang-plugin-k/src/main/java/org/apache/camel/dsl/jbang/core/commands/k/Bind.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.dsl.jbang.core.commands.bind.TemplateProvider; import org.apache.camel.util.ObjectHelper; import org.apache.camel.v1.Pipe; import org.apache.camel.v1.integrationspec.Traits; @@ -37,7 +38,7 @@ sortOptions = false) public class Bind extends KubeBaseCommand { - private final org.apache.camel.dsl.jbang.core.commands.Bind delegate; + private final org.apache.camel.dsl.jbang.core.commands.bind.Bind delegate; @CommandLine.Parameters(description = "The name of the Pipe resource created on the cluster.", arity = "1", paramLabel = "", parameterConsumer = NameConsumer.class) @@ -96,12 +97,12 @@ public class Bind extends KubeBaseCommand { public Bind(CamelJBangMain main) { super(main); - delegate = new org.apache.camel.dsl.jbang.core.commands.Bind( - main, new org.apache.camel.dsl.jbang.core.commands.Bind.TemplateProvider() { + delegate = new org.apache.camel.dsl.jbang.core.commands.bind.Bind( + main, new TemplateProvider() { @Override - public InputStream getPipeTemplate(String in, String out) { + public InputStream getPipeTemplate() { return Bind.class.getClassLoader() - .getResourceAsStream("templates/pipe-" + in + "-" + out + ".yaml.tmpl"); + .getResourceAsStream("templates/pipe.yaml.tmpl"); } }); } diff --git a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-kamelet-kamelet.yaml.tmpl b/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-kamelet-kamelet.yaml.tmpl deleted file mode 100644 index 400ae37bedf6a..0000000000000 --- a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-kamelet-kamelet.yaml.tmpl +++ /dev/null @@ -1,21 +0,0 @@ -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: {{ .Name }} -{{ .Annotations }} -spec: -{{ .IntegrationSpec }} - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Source }} - {{ .SourceProperties }} -{{ .Steps }} - sink: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Sink }} - {{ .SinkProperties }} -{{ .ErrorHandler }} diff --git a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-kamelet-uri.yaml.tmpl b/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-kamelet-uri.yaml.tmpl deleted file mode 100644 index 1ac52571ecea8..0000000000000 --- a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-kamelet-uri.yaml.tmpl +++ /dev/null @@ -1,18 +0,0 @@ -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: {{ .Name }} -{{ .Annotations }} -spec: -{{ .IntegrationSpec }} - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Source }} - {{ .SourceProperties }} -{{ .Steps }} - sink: - uri: {{ .Sink }} - {{ .SinkProperties }} -{{ .ErrorHandler }} diff --git a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-uri-kamelet.yaml.tmpl b/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-uri-kamelet.yaml.tmpl deleted file mode 100644 index e6c33789d943a..0000000000000 --- a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-uri-kamelet.yaml.tmpl +++ /dev/null @@ -1,18 +0,0 @@ -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: {{ .Name }} -{{ .Annotations }} -spec: -{{ .IntegrationSpec }} - source: - uri: {{ .Source }} - {{ .SourceProperties }} -{{ .Steps }} - sink: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: {{ .Sink }} - {{ .SinkProperties }} -{{ .ErrorHandler }} diff --git a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-uri-uri.yaml.tmpl b/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe.yaml.tmpl similarity index 63% rename from dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-uri-uri.yaml.tmpl rename to dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe.yaml.tmpl index 2a181b6ece522..72351a2555733 100644 --- a/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe-uri-uri.yaml.tmpl +++ b/dsl/camel-jbang/camel-jbang-plugin-k/src/main/resources/templates/pipe.yaml.tmpl @@ -6,10 +6,8 @@ metadata: spec: {{ .IntegrationSpec }} source: - uri: {{ .Source }} - {{ .SourceProperties }} +{{ .Source }} {{ .Steps }} sink: - uri: {{ .Sink }} - {{ .SinkProperties }} +{{ .Sink }} {{ .ErrorHandler }}