JStorm English Documentation
- Download release package from Downloads
- Deploy Zookeeper cluster
- Install Python 2.6
- Install Java
- Install zeromq
- Install Jzmq
- Configure $JSTORM_HOME/conf/storm.yaml
- Deploy web ui
- Start JStorm cluster
The Zookeeper installation procedure is not described here.
- Please refer to the "zookeeper installation steps" instructions.
- Please refer to the "zookeeper Configuration Description" instructions.
If the current system's Python version is 2.6 or 2.7, you do not need to install Python.
You can also use https://github.com/utahta/pythonbrew to install Python:
curl -kL http://xrl.us/pythonbrewinstall | bash
[[ -s $HOME/.pythonbrew/etc/bashrc ]] && source $HOME/.pythonbrew/etc/bashrc
pythonbrew install 2.6.7
pythonbrew switch 2.6.7
Note that if the current system is 64-bit, you need to download the 64-bit Java; if it is a 32-bit system, download the 32-bit Java.
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig
If you do not have root privileges, or the current user does not have sudo privileges, replace "./configure" with "./configure --prefix=/home/xxxx", where /home/xxxx is the installation target directory.
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
If you do not have root privileges, or the current user does not have sudo privileges, replace "./configure" with "./configure --prefix=/home/xxxx", where /home/xxxx is the installation target directory.
Take jstorm-0.9.1.zip as an example
unzip jstorm-0.9.1.zip
vi ~/.bashrc
export JSTORM_HOME=/XXXXX/XXXX
export PATH=$PATH:$JSTORM_HOME/bin
source ~/.bashrc
Configure $JSTORM_HOME/conf/storm.yaml
Configuration items (a minimal storm.yaml example follows this list):
storm.zookeeper.servers: the address list of the Zookeeper cluster.
nimbus.host: the address of the nimbus node.
storm.zookeeper.root: the root directory of JStorm in Zookeeper; when multiple JStorm clusters share one Zookeeper, you need to set this option. The default is "/jstorm".
storm.local.dir: the directory for JStorm temporary data; you need to ensure the JStorm process has write permission there.
java.library.path: the installation directories of the zeromq and java-zeromq libraries; the default is "/usr/local/lib:/opt/local/lib:/usr/lib".
supervisor.slots.ports: the list of ports the supervisor provides; be careful not to conflict with other ports. The default is 68xx, while Storm uses 67xx.
supervisor.disk.slot: the data directories the supervisor provides; when a machine has more than one disk, each disk can provide a read/write slot, which helps IO-heavy applications.
topology.enable.classloader: false; the classloader is disabled by default. If the jars the application depends on conflict with the jars JStorm depends on, for example the application uses thrift9 but JStorm uses thrift7, you need to enable this option.
nimbus.groupfile.path: if you need resource isolation, for example between the data warehouse, technology, and wireless departments, enable the grouping feature by setting this to the absolute path of a configuration file; refer to group_file.ini.
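For reference, here is a minimal storm.yaml sketch using the items above (the hostnames, directories, and port numbers are placeholders for illustration, not shipped defaults):
storm.zookeeper.servers:
    - "zk1.example.com"
    - "zk2.example.com"
storm.zookeeper.root: "/jstorm"
nimbus.host: "nimbus.example.com"
storm.local.dir: "/data/jstorm_data"
java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"
supervisor.slots.ports:
    - 6800
    - 6801
    - 6802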
On the node from which the jar is submitted, execute:
mkdir ~/.jstorm
cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm
Note: the web UI does not have to be deployed on the same node as nimbus. On the node where the web UI is deployed, execute:
mkdir ~/.jstorm
cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm
Download tomcat 7.x (take apache-tomcat-7.0.37 as an example)
tar -xzf apache-tomcat-7.0.37.tar.gz
cd apache-tomcat-7.0.37
cd webapps
cp $JSTORM_HOME/jstorm-ui-0.9.0.war ./
mv ROOT ROOT.old
ln -s jstorm-ui-0.9.0 ROOT
cd ../bin
./startup.sh
- Execute "nohup jstorm nimbus &" on the nimbus node, and view $JSTORM_HOME/logs/nimbus.log to check for errors.
- Execute "nohup jstorm supervisor &" on each supervisor node, and view $JSTORM_HOME/logs/supervisor.log to check for errors.
JStorm provides the abstraction of a stream, which is a continuous, unbounded sequence of tuples. Note that when modeling an event stream in JStorm, the events in the stream are abstracted as tuples; we will explain how to use them as we go through JStorm.
JStorm considers that every stream has a source, so sources are abstracted as spouts. A spout may connect to messaging middleware (MetaQ, Kafka, TBNotify, etc.) and continuously emit messages, or it may continuously read from a queue and emit tuples assembled from the queue's elements.
Given a spout there is a stream; how, then, are the tuples within it processed? Following the same idea, the intermediate processing steps are abstracted as bolts. A bolt can consume any number of input streams, as long as those streams are directed to it, and it can also emit new streams for other bolts to consume. So you simply open a particular spout (nozzle), direct the tuples flowing out of it to particular bolts, and let each bolt process them and direct the results to further bolts.
You can think of a spout as a faucet: each faucet pours out different water, and to get the kind of water we want we just turn on the corresponding faucet. We then use a pipeline to direct the faucet's water to a water processor (a bolt); the processor handles the water and pipes it on to another processor, or stores it in a container.
With the introduction above, the topology diagram is easy to understand: it is a DAG, which JStorm abstracts as a topology (indeed, the topological structure is directed and acyclic). The topology is the highest-level abstraction in JStorm; it can be submitted to a JStorm cluster for execution. A topology is a data stream transformation graph: each node represents a spout or a bolt, and each edge represents a bolt subscribing to a stream. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt subscribed to that stream (which means we do not need to lay the pipelines manually: as long as subscriptions are declared in advance, the spout's stream will be sent to the appropriate bolts). A note on how topologies are realized in JStorm: to do real-time computation, we design a topology graph and implement the processing details of the bolts. A topology is just a defined thrift structure, so other languages can also be used to create or submit topologies.
In JStorm, the data in a stream is abstracted as tuples. A tuple is a list of values; each value in the list has a name, and a value can be a primitive type, a string, a byte array, or any other serializable type. Each node in the topology must declare the field names of the tuples it emits; other nodes only need to subscribe to those names.
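As a small, self-contained illustration (the class and field names here are hypothetical; Fields and Values are the backtype.storm.tuple classes used by JStorm 0.9.x):
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class TupleFieldsDemo {
    public static void main(String[] args) {
        // The emitting node declares field names; each tuple's values line up with them by position.
        Fields fields = new Fields("user", "clicks");   // hypothetical field names
        Values values = new Values("alice", 42);        // one tuple's values, in the same order
        System.out.println(fields.get(0) + " = " + values.get(0)); // prints: user = alice
    }
}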
Workers and tasks are the execution units in JStorm: a worker represents a process, a task represents a thread, and a worker can run multiple tasks.
In JStorm, resources are divided into four dimensions: CPU, memory, disk, and ports, no longer confined to ports as in Storm. That is, a supervisor provides a certain number of CPU slots, memory slots, disk slots, and port slots.
- A worker consumes one port slot; by default a task consumes one CPU slot and one memory slot.
- When a task is computation-heavy, it can apply for more CPU slots.
- When a task needs more memory, it can apply for more memory slots.
- When a task is disk-IO-heavy, it can apply for a disk slot, which is then exclusive to that task.
This article helps readers quickly implement a JStorm example. The simplest JStorm example is divided into four steps:
Map conf = new HashMap();
//All custom configuration of the topology goes into this Map
TopologyBuilder builder = new TopologyBuilder();
//Create the topology builder
int spoutParal = get("spout.parallel", 1);
//Get the parallelism setting of the spout (get() is a helper defined in the example source)
SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
        new SequenceSpout(), spoutParal);
//Create the spout; new SequenceSpout() is the actual spout object, and
//SequenceTopologyDef.SEQUENCE_SPOUT_NAME is the spout's name. Note: the name must not contain spaces.
int boltParal = get("bolt.parallel", 1);
//Get the parallelism setting of the bolt
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
        boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
//Create the bolt; SequenceTopologyDef.TOTAL_BOLT_NAME is the bolt's name, TotalCount is the bolt
//object, and boltParal is the bolt's parallelism.
//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME) means this bolt receives the data
//emitted by SequenceTopologyDef.SEQUENCE_SPOUT_NAME in shuffle mode,
//i.e. each spout sends tuples to the next-level bolts in random round-robin fashion.
int ackerParal = get("acker.parallel", 1);
Config.setNumAckers(conf, ackerParal);
//Set the parallelism of the ackers
int workerNum = get("worker.num", 10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);
//Set how many workers the whole topology will use
conf.put(Config.STORM_CLUSTER_MODE, "distributed");
//Set the topology mode to distributed, so the topology can run on a JStorm cluster
StormSubmitter.submitTopology(streamName, conf,
        builder.createTopology());
//Submit the topology; streamName is the topology name
IRichSpout is the simplest spout interface:
IRichSpout {
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
}
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void nextTuple() {
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Note:
- The spout object must implement the Serializable interface, which requires all data structures within the spout to be serializable.
- A spout can have a constructor, but the constructor is executed only once, when the topology is submitted and the spout object is created. Initialization that must happen before tasks are assigned to specific workers can therefore be done there; once completed, the initialized contents are carried to every task (because at submit time the spout object is serialized to a file, and the spout object is deserialized from that file when the worker starts).
- open is the initialization action performed after the task starts up.
- close is the cleanup action performed when the task shuts down.
- activate is triggered after the task is activated.
- deactivate is triggered after the task is deactivated.
- nextTuple is the core of a spout; it is the logic you need to implement: take a message and emit it through the collector.
- ack is triggered after the task receives an ack message; see the ack mechanism for details.
- fail is triggered after the task receives a fail message; see the ack mechanism for details.
- declareOutputFields defines the meaning of each field emitted by the spout.
- getComponentConfiguration is the component-configuration interface of the spout. A minimal spout sketch follows these notes.
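Putting these notes together, here is a minimal spout sketch. It is hypothetical (the class name, field name, and emission logic are illustrative only), and it uses the backtype.storm interfaces shown in the skeleton above:
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class NumberSpout implements IRichSpout {
    private SpoutOutputCollector collector;
    private long sequence = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector; // keep the collector so nextTuple can emit
    }

    @Override
    public void nextTuple() {
        // Emit one tuple per call; the sequence number doubles as the message id used by acking.
        collector.emit(new Values(sequence), sequence);
        sequence++;
    }

    @Override
    public void ack(Object msgId) {
        // The tuple identified by msgId was fully processed downstream; nothing to retry.
    }

    @Override
    public void fail(Object msgId) {
        // A real spout would re-emit the tuple identified by msgId here.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number")); // the single field this spout emits
    }

    @Override
    public void close() {}

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}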
IRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void execute(Tuple input) {
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Note:
- The bolt object must implement Serializable, which requires all data structures within the bolt to be serializable.
- A bolt can have a constructor, but the constructor is executed only once, when the topology is submitted and the bolt object is created. Initialization that must happen before tasks are assigned to specific workers can therefore be done there; once completed, the initialized contents are carried to every task (because at submit time the bolt object is serialized to a file, and the bolt object is deserialized from that file when the worker starts).
- prepare is the initialization action performed after the task starts up.
- cleanup is the cleanup action performed when the task shuts down.
- execute is the core of a bolt; it is the logic you need to implement: take a message and, if needed, emit new messages through the collector. In execute, when the program has processed a message successfully you need to call collector.ack, and when it cannot process a message or hits an error you need to call collector.fail; see the ack mechanism for details.
- declareOutputFields defines the meaning of each field emitted by the bolt.
- getComponentConfiguration is the component-configuration interface of the bolt. A minimal bolt sketch follows these notes.
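Putting these notes together, here is a minimal bolt sketch. It is hypothetical (the class name and the running-total logic are illustrative only) and assumes its input tuples carry a numeric first field, such as those from the spout sketch above:
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class TotalBolt implements IRichBolt {
    private OutputCollector collector;
    private long total = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            total += input.getLong(0);         // read the first field of the tuple
            collector.emit(new Values(total)); // pass the running total downstream
            collector.ack(input);              // processed successfully: ack it
        } catch (Exception e) {
            collector.fail(input);             // could not process: fail it so the ack mechanism can replay
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("total"));
    }

    @Override
    public void cleanup() {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}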
Configuration in Maven
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client-extension</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
</dependency>
If you cannot find the jstorm-client and jstorm-client-extension packages, you can download the JStorm source code and compile it yourself; please refer to the source code compilation instructions. When packaging, you need to package all dependencies into a single jar:
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>storm.starter.SequenceTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
- xxxxxx.jar is the packaged jar.
- com.alibaba.xxxx.xx is the entry (main) class of the task.
- parameter is the argument list passed to the topology (a hypothetical invocation follows).
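For example, with the assembly produced by the pom above (the jar name and the trailing argument are hypothetical; the main class matches the mainClass configured in the pom):
jstorm jar sequence-topology-jar-with-dependencies.jar storm.starter.SequenceTopology SequenceTest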
Selection of JStorm/Storm/Flume/S4
JStorm vs Storm: please click here (JStorm 0.9.0 介绍.pptx).
JStorm is more stable, more powerful, and faster than Storm, and a program that runs on Storm can run on JStorm unchanged.
Flume is a mature system that focuses on pipelines: it can transfer data from one data source to another and provides a large number of ready-made pipeline plugins. You can certainly also do some computation and analysis with it, but developing plugins is not as convenient and fast as with JStorm.
S4 is a semi-finished product. Its robustness is OK, but its data accuracy is poor: it cannot guarantee that data will not be lost. This shortcoming greatly restricts S4 and is the reason why, although S4 has been open for many years, its development is still slow.
Akka is an actor-model framework and also a good system; based on the actor model you can do anything you want, but the problem is that you have to do more work yourself: how the topology is generated, how data is serialized, how data flows (randomly or grouped by key), and so on.
Spark is a lightweight in-memory MapReduce, with more emphasis on batch data processing.
Performance test of JStorm 0.9.0
JStorm 0.9.0 performance is very good: the maximum sending speed of a single worker is 110,000 QPS with netty and 120,000 QPS with zeromq.
Conclusion
- With netty, JStorm 0.9.0 is 10% faster than Storm 0.9.0, and JStorm's netty plugin is stable while Storm's netty plugin is unstable.
- With zeromq, JStorm 0.9.0 is 30% faster than Storm 0.9.0.
Reason
- One fewer memory copy with zeromq.
- Added a deserialization thread.
- Rewrote the sampling code, significantly reducing the sampling overhead.
- Optimized the ack code.
- Optimized the performance of the buffer map.
- Java is lower-level than Clojure.
Testing
Testing Example
The testing example is https://github.com/longdafeng/storm-examples
Testing Environment
Five physical machines, each with 16 cores and 98 GB of memory.
uname -a :
Linux dwcache1 2.6.32-220.23.1.tb735.el5.x86_64 #1 SMP Tue Aug 14 16:03:04 CST 2012 x86_64 x86_64 x86_64 GNU/Linux
Testing Results
JStorm with netty, Spout sending QPS is 110,000
Storm with netty, Spout sending QPS is 100,000 (the figure measured is the QPS of the upper-level application, not including the ack QPS; the Spout sending QPS is exactly two times the upper-level application QPS).
JStorm with zeromq, Spout sending QPS is 120,000
Storm with zeromq, Spout sending QPS is 90,000 (the figure measured is the QPS of the upper-level application, not including the ack QPS; the Spout sending QPS is exactly two times the upper-level application QPS).