diff --git a/sql/storm-sql-core/pom.xml b/sql/storm-sql-core/pom.xml index b2b37774f2b..73636f9f58e 100644 --- a/sql/storm-sql-core/pom.xml +++ b/sql/storm-sql-core/pom.xml @@ -207,7 +207,7 @@ maven-checkstyle-plugin - 59 + 0 diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java index e57ffd867c7..e4715f88869 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java @@ -19,8 +19,8 @@ /** * The StormSql class provides standalone, interactive interfaces to execute * SQL statements over streaming data. - *

- * The StormSql class is stateless. The user needs to submit the data + * + *

The StormSql class is stateless. The user needs to submit the data * definition language (DDL) statements and the query statements in the same * batch. */ diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java index fff801daa77..2f3006395e0 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java @@ -78,14 +78,13 @@ public void submit( @Override public void explain(Iterable statements) throws Exception { for (String sql : statements) { - StormParser parser = new StormParser(sql); - SqlNode node = parser.impl().parseSqlStmtEof(); - System.out.println("==========================================================="); System.out.println("query>"); System.out.println(sql); System.out.println("-----------------------------------------------------------"); + StormParser parser = new StormParser(sql); + SqlNode node = parser.impl().parseSqlStmtEof(); if (node instanceof SqlCreateTable) { sqlContext.interpretCreateTable((SqlCreateTable) node); System.out.println("No plan presented on DDL"); diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java index 3ff5394eb43..edcc46d2d18 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java @@ -56,8 +56,8 @@ public static void main(String[] args) throws Exception { SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE); sql.submit(topoName, stmts, conf, submitOptions, null, null); } else { - printUsageAndExit(options, "Either " + OPTION_SQL_TOPOLOGY_NAME_LONG + " or " + OPTION_SQL_EXPLAIN_LONG + - " must be presented"); + printUsageAndExit(options, "Either " + OPTION_SQL_TOPOLOGY_NAME_LONG + + " or " + OPTION_SQL_EXPLAIN_LONG + " must be presented"); } } diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java index 3cb0f9640b3..a91199c4fa6 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java @@ -12,9 +12,16 @@ package org.apache.storm.sql.compiler; +import static org.apache.calcite.rel.RelFieldCollation.Direction; +import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING; +import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING; +import static org.apache.calcite.rel.RelFieldCollation.NullDirection; +import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; + import java.util.ArrayList; import org.apache.calcite.config.CalciteConnectionConfig; @@ -33,17 +40,11 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.SqlMonotonicity; import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Util; + import org.apache.storm.sql.calcite.ParallelStreamableTable; import org.apache.storm.sql.calcite.ParallelTable; import org.apache.storm.sql.parser.ColumnConstraint; -import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING; -import static 
org.apache.calcite.rel.RelFieldCollation.Direction; -import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING; -import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING; -import static org.apache.calcite.rel.RelFieldCollation.NullDirection; - public class CompilerUtil { public static class TableBuilderInfo { diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java index 589154a2501..99e557f18a2 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java @@ -52,9 +52,10 @@ /** * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) to Java source code String. - *

- * This code is inspired by JaninoRexCompiler in Calcite, but while it is returning {@link org.apache.calcite.interpreter.Scalar} which is - * executable, we need to pass the source code to compile and serialize instance so that it can be executed on worker efficiently. + * + *

This code is inspired by JaninoRexCompiler in Calcite, but while it is returning + * {@link org.apache.calcite.interpreter.Scalar} which is executable, we need to pass the source code to compile and + * serialize instance so that it can be executed on worker efficiently. */ public class RexNodeToJavaCodeCompiler { private final RexBuilder rexBuilder; @@ -64,8 +65,8 @@ public RexNodeToJavaCodeCompiler(RexBuilder rexBuilder) { } /** - * Given a method that implements {@link ExecutableExpression#execute(Context, Object[])}, adds a bridge method that implements {@link - * ExecutableExpression#execute(Context)}, and compiles. + * Given a method that implements {@link ExecutableExpression#execute(Context, Object[])}, adds a bridge method that + * implements {@link ExecutableExpression#execute(Context)}, and compiles. */ static String baz(ParameterExpression context, ParameterExpression outputValues, BlockStatement block, String className) { @@ -121,41 +122,21 @@ public BlockStatement compileToBlock(final RexProgram program) { return compileToBlock(program, context_, outputValues_).toBlock(); } - public String compile(List nodes, RelDataType inputRowType, String className) { - final RexProgramBuilder programBuilder = - new RexProgramBuilder(inputRowType, rexBuilder); - for (RexNode node : nodes) { - programBuilder.addProject(node, null); - } - - return compile(programBuilder.getProgram(), className); - } - - public String compile(final RexProgram program, String className) { - final ParameterExpression context_ = - Expressions.parameter(Context.class, "context"); - final ParameterExpression outputValues_ = - Expressions.parameter(Object[].class, "outputValues"); - - BlockBuilder builder = compileToBlock(program, context_, outputValues_); - return baz(context_, outputValues_, builder.toBlock(), className); - } - private BlockBuilder compileToBlock(final RexProgram program, ParameterExpression context, - ParameterExpression outputValues) { + ParameterExpression outputValues) { RelDataType inputRowType = program.getInputRowType(); final BlockBuilder builder = new BlockBuilder(); final JavaTypeFactoryImpl javaTypeFactory = - new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem()); + new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem()); final RexToLixTranslator.InputGetter inputGetter = - new RexToLixTranslator.InputGetterImpl( - ImmutableList.of( - Pair.of( - Expressions.field(context, - BuiltInMethod.CONTEXT_VALUES.field), - PhysTypeImpl.of(javaTypeFactory, inputRowType, - JavaRowFormat.ARRAY, false)))); + new RexToLixTranslator.InputGetterImpl( + ImmutableList.of( + Pair.of( + Expressions.field(context, + BuiltInMethod.CONTEXT_VALUES.field), + PhysTypeImpl.of(javaTypeFactory, inputRowType, + JavaRowFormat.ARRAY, false)))); final Function1 correlates = new Function1() { @Override @@ -164,22 +145,42 @@ public RexToLixTranslator.InputGetter apply(String a0) { } }; final Expression root = - Expressions.field(context, BuiltInMethod.CONTEXT_ROOT.field); + Expressions.field(context, BuiltInMethod.CONTEXT_ROOT.field); final List list = - RexToLixTranslator.translateProjects(program, javaTypeFactory, builder, - null, root, inputGetter, correlates); + RexToLixTranslator.translateProjects(program, javaTypeFactory, builder, + null, root, inputGetter, correlates); for (int i = 0; i < list.size(); i++) { builder.add( - Expressions.statement( - Expressions.assign( - Expressions.arrayIndex(outputValues, - Expressions.constant(i)), - list.get(i)))); + Expressions.statement( + 
Expressions.assign( + Expressions.arrayIndex(outputValues, + Expressions.constant(i)), + list.get(i)))); } return builder; } + public String compile(List nodes, RelDataType inputRowType, String className) { + final RexProgramBuilder programBuilder = + new RexProgramBuilder(inputRowType, rexBuilder); + for (RexNode node : nodes) { + programBuilder.addProject(node, null); + } + + return compile(programBuilder.getProgram(), className); + } + + public String compile(final RexProgram program, String className) { + final ParameterExpression context_ = + Expressions.parameter(Context.class, "context"); + final ParameterExpression outputValues_ = + Expressions.parameter(Object[].class, "outputValues"); + + BlockBuilder builder = compileToBlock(program, context_, outputValues_); + return baz(context_, outputValues_, builder.toBlock(), className); + } + enum StormBuiltInMethod { EXPR_EXECUTE1(ExecutableExpression.class, "execute", Context.class), EXPR_EXECUTE2(ExecutableExpression.class, "execute", Context.class, Object[].class); diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java index 8e377d7dba8..aaf4bf77413 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java @@ -125,7 +125,8 @@ public Class findClass(String name) throws ClassNotFoundException { } /** - * @return Whether compilation was successful. + * Compiles source code to byte code. + * @return indicates whether compilation was successful */ private boolean compileSourceCodeToByteCode( String className, String sourceCode, DiagnosticListener diagnosticListener) { @@ -195,7 +196,7 @@ public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOExcept /** * Provides an in-memory representation of JavaFileManager abstraction, so we do not need to write any files to disk. * - * When files are written to, rather than putting the bytes on disk, they are appended to buffers in byteCodeForClasses. + *

When files are written to, rather than putting the bytes on disk, they are appended to buffers in byteCodeForClasses. * * @see javax.tools.JavaFileManager */ diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java index b283a175252..f3cc2b80e00 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java @@ -28,8 +28,12 @@ public class StormRelUtils { private static final AtomicInteger classSequence = new AtomicInteger(0); public static String getClassName(StreamsRel relNode) { - return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + - classSequence.getAndIncrement(); + return "Generated_" + + relNode.getClass().getSimpleName().toUpperCase() + + "_" + + relNode.getId() + + "_" + + classSequence.getAndIncrement(); } public static StreamsRel getStormRelInput(RelNode input) { diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java index 95ddc00066b..0fd2ae1d1fe 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java @@ -19,10 +19,9 @@ package org.apache.storm.sql.planner.streams; -import java.util.Iterator; - import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import java.util.Iterator; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.rel.rules.CalcMergeRule; import org.apache.calcite.rel.rules.FilterCalcMergeRule; diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java index f4c2c7d0c40..9cdda935243 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java @@ -19,10 +19,9 @@ package org.apache.storm.sql.planner.streams.rel; +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; - -import com.google.common.collect.Lists; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; @@ -54,13 +53,9 @@ public void streamsPlan(StreamsPlanCreator planCreator) throws Exception { // SingleRel RelNode input = getInput(); StormRelUtils.getStormRelInput(input).streamsPlan(planCreator); - Stream inputStream = planCreator.pop(); RelDataType inputRowType = getInput(0).getRowType(); - List outputFieldNames = getRowType().getFieldNames(); - int outputCount = outputFieldNames.size(); - // filter ExecutableExpression filterInstance = null; RexLocalRef condition = program.getCondition(); @@ -88,7 +83,10 @@ public void streamsPlan(StreamsPlanCreator planCreator) throws Exception { throw new IllegalStateException("Either projection or condition, or both should be provided."); } + List outputFieldNames = getRowType().getFieldNames(); + int outputCount = outputFieldNames.size(); EvaluationCalc evalCalc = new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext()); + final Stream inputStream = 
planCreator.pop(); final Stream finalStream = inputStream.flatMap(evalCalc); planCreator.addStream(finalStream); diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java index f3b8994d62e..50c488df0ba 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java @@ -19,10 +19,11 @@ package org.apache.storm.sql.planner.streams.rel; -import java.util.List; - import com.google.common.base.Joiner; import com.google.common.base.Preconditions; + +import java.util.List; + import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java index 563ea238a6c..6ecf9b844d4 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java @@ -19,10 +19,9 @@ package org.apache.storm.sql.planner.streams.rel; +import com.google.common.base.Joiner; import java.util.List; import java.util.Map; - -import com.google.common.base.Joiner; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet;