Skip to content

Commit

Permalink
STORM-3429: closure: fix all checkstyle warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
krichter722 committed Jun 27, 2019
1 parent b4f9eb1 commit b203ae3
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 172 deletions.
2 changes: 1 addition & 1 deletion storm-clojure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>173</maxAllowedViolations>
<maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.clojure;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import clojure.lang.RT;
import clojure.lang.Symbol;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -31,47 +38,43 @@
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import clojure.lang.RT;
import clojure.lang.Symbol;

public class ClojureBolt implements IRichBolt, FinishedCallback {
Map<String, StreamInfo> _fields;
List<String> _fnSpec;
List<String> _confSpec;
List<Object> _params;
Map<String, StreamInfo> fields;
List<String> fnSpec;
List<String> confSpec;
List<Object> params;

IBolt _bolt;
IBolt bolt;

public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
_fnSpec = fnSpec;
_confSpec = confSpec;
_params = params;
_fields = fields;
this.fnSpec = fnSpec;
this.confSpec = confSpec;
this.params = params;
this.fields = fields;
}

@Override
public void prepare(final Map<String, Object> topoConf, final TopologyContext context, final OutputCollector collector) {
IFn hof = ClojureUtil.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
IFn hof = ClojureUtil.loadClojureFn(fnSpec.get(0), fnSpec.get(1));
try {
IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
IFn preparer = (IFn) hof.applyTo(RT.seq(params));
final Map<Keyword,Object> collectorMap = new PersistentArrayMap(new Object[] {
Keyword.intern(Symbol.create("output-collector")), collector,
Keyword.intern(Symbol.create("context")), context});
List<Object> args = new ArrayList<Object>() {{
add(topoConf);
add(context);
add(collectorMap);
}};
List<Object> args = new ArrayList<Object>() {
{
add(topoConf);
add(context);
add(collectorMap);
}
};

_bolt = (IBolt) preparer.applyTo(RT.seq(args));
bolt = (IBolt) preparer.applyTo(RT.seq(args));
//this is kind of unnecessary for clojure
try {
_bolt.prepare(topoConf, context, collector);
} catch(AbstractMethodError ame) {

bolt.prepare(topoConf, context, collector);
} catch (AbstractMethodError ame) {
//ignore
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -80,38 +83,38 @@ public void prepare(final Map<String, Object> topoConf, final TopologyContext co

@Override
public void execute(Tuple input) {
_bolt.execute(new ClojureTuple(input));
bolt.execute(new ClojureTuple(input));
}

@Override
public void cleanup() {
try {
_bolt.cleanup();
} catch(AbstractMethodError ame) {

bolt.cleanup();
} catch (AbstractMethodError ame) {
//ignore
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for(String stream: _fields.keySet()) {
StreamInfo info = _fields.get(stream);
for (String stream: fields.keySet()) {
StreamInfo info = fields.get(stream);
declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
}
}

@Override
public void finishedId(Object id) {
if(_bolt instanceof FinishedCallback) {
((FinishedCallback) _bolt).finishedId(id);
if (bolt instanceof FinishedCallback) {
((FinishedCallback) bolt).finishedId(id);
}
}

@Override
public Map<String, Object> getComponentConfiguration() {
IFn hof = ClojureUtil.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
IFn hof = ClojureUtil.loadClojureFn(confSpec.get(0), confSpec.get(1));
try {
return (Map) hof.applyTo(RT.seq(_params));
return (Map) hof.applyTo(RT.seq(params));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.clojure;

import org.apache.storm.serialization.SerializationRegister;
import carbonite.JavaBridge;

import com.esotericsoftware.kryo.Kryo;

import carbonite.JavaBridge;
import org.apache.storm.serialization.SerializationRegister;

public class ClojureSerializationRegister implements SerializationRegister {

Expand Down
101 changes: 52 additions & 49 deletions storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.clojure;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import clojure.lang.RT;
import clojure.lang.Symbol;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -29,48 +36,44 @@
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import clojure.lang.RT;
import clojure.lang.Symbol;

public class ClojureSpout implements IRichSpout {
Map<String, StreamInfo> _fields;
List<String> _fnSpec;
List<String> _confSpec;
List<Object> _params;
Map<String, StreamInfo> fields;
List<String> fnSpec;
List<String> confSpec;
List<Object> params;

ISpout _spout;
ISpout spout;

public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
_fnSpec = fnSpec;
_confSpec = confSpec;
_params = params;
_fields = fields;
this.fnSpec = fnSpec;
this.confSpec = confSpec;
this.params = params;
this.fields = fields;
}


@Override
public void open(final Map<String, Object> conf, final TopologyContext context, final SpoutOutputCollector collector) {
IFn hof = ClojureUtil.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
IFn hof = ClojureUtil.loadClojureFn(fnSpec.get(0), fnSpec.get(1));
try {
IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
IFn preparer = (IFn) hof.applyTo(RT.seq(params));
final Map<Keyword,Object> collectorMap = new PersistentArrayMap(new Object[] {
Keyword.intern(Symbol.create("output-collector")), collector,
Keyword.intern(Symbol.create("context")), context});
List<Object> args = new ArrayList<Object>() {{
add(conf);
add(context);
add(collectorMap);
}};
List<Object> args = new ArrayList<Object>() {
{
add(conf);
add(context);
add(collectorMap);
}
};

_spout = (ISpout) preparer.applyTo(RT.seq(args));
spout = (ISpout) preparer.applyTo(RT.seq(args));
//this is kind of unnecessary for clojure
try {
_spout.open(conf, context, collector);
} catch(AbstractMethodError ame) {

spout.open(conf, context, collector);
} catch (AbstractMethodError ame) {
//ignore
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -80,55 +83,55 @@ public void open(final Map<String, Object> conf, final TopologyContext context,
@Override
public void close() {
try {
_spout.close();
} catch(AbstractMethodError ame) {
spout.close();
} catch (AbstractMethodError ame) {
//ignore
}
}

@Override
public void nextTuple() {
try {
_spout.nextTuple();
} catch(AbstractMethodError ame) {
spout.nextTuple();
} catch (AbstractMethodError ame) {
//ignore
}

}

@Override
public void ack(Object msgId) {
try {
_spout.ack(msgId);
} catch(AbstractMethodError ame) {
spout.ack(msgId);
} catch (AbstractMethodError ame) {
//ignore
}

}

@Override
public void fail(Object msgId) {
try {
_spout.fail(msgId);
} catch(AbstractMethodError ame) {
spout.fail(msgId);
} catch (AbstractMethodError ame) {
//ignore
}

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for(String stream: _fields.keySet()) {
StreamInfo info = _fields.get(stream);
for (String stream: fields.keySet()) {
StreamInfo info = fields.get(stream);
declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
}
}

@Override
public Map<String, Object> getComponentConfiguration() {
IFn hof = ClojureUtil.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
IFn hof = ClojureUtil.loadClojureFn(confSpec.get(0), confSpec.get(1));
try {
return (Map) hof.applyTo(RT.seq(_params));
return (Map) hof.applyTo(RT.seq(params));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -137,18 +140,18 @@ public Map<String, Object> getComponentConfiguration() {
@Override
public void activate() {
try {
_spout.activate();
} catch(AbstractMethodError ame) {
spout.activate();
} catch (AbstractMethodError ame) {
//ignore
}
}

@Override
public void deactivate() {
try {
_spout.deactivate();
} catch(AbstractMethodError ame) {
spout.deactivate();
} catch (AbstractMethodError ame) {
//ignore
}
}
}
Loading

0 comments on commit b203ae3

Please sign in to comment.