Skip to content

storm example

kiddingbaby edited this page Nov 1, 2017 · 7 revisions

说明:通过 storm 读取 Kafka指定topic的消息, 并写入Storm-ui

准备工作:

1.创建测试需要的 kafka topic

kafka-topics.sh --create --topic test --zookeeper bgs-8p80-udp-01.bfdabc.com:2181,bgs-8p81-udp-02.bfdabc.com:2181,bgs-8p82-udp-03.bfdabc.com:2181,bgs-8p87-udp-08.bfdabc.com:2181,bgs-8p85-udp-06.bfdabc.com:2181 --replication-factor 1 --partitions 1

上述命令中, --topic test表示创建了一个名为test的topic, --zookeeper bgs-8p80-udp-01.bfdabc.com:2181,bgs-8p81-udp-02.bfdabc.com:2181,bgs-8p82-udp-03.bfdabc.com:2181,bgs-8p87-udp-08.bfdabc.com:2181,bgs-8p85-udp-06.bfdabc.com:2181表示zookeeper的集群地址, --replication-factor 1表示备份数为1, --partitions 1表示分区数为1

注意:

提示ERROR org.apache.kafka.common.errors.TopicExistsException: Topic "test" already exists., 则执行步骤2即可

zookeeper集群地址,可根据ambari上kafka服务的zookeeper.connect参数的值来配置

2.编译storm测试程序源码

在项目根目录下,执行,如下命令

mvn clean package -pl examples -am

在{udp_demo_home}/examples/target目录下,会有examples-{version}.jar文件,该jar内的com.baifendian.udp.example.storm.TestTopology为Storm的测试拓扑主类。 在{udp_demo_home}/examples/classes目录下,会有storm_demo.properties文件,该文件为storm测试程序的配置文件。

修改配置文件的kafka集群地址,topic名称(test)以及group id(任意):

bootstrap.servers=bgs-8p80-udp-01.bfdabc.com:6667,bgs-8p81-udp-02.bfdabc.com:6667,bgs-8p82-udp-03.bfdabc.com:6667,bgs-8p84-udp-05.bfdabc.com:6667,bgs-8p86-udp-07.bfdabc.com:6667
kafka.topic=test
group.id=test_group_1

注意:

bootstrap.servers这个参数,可以根据ambari上kafka的Kafka Broker hosts的值来确定,多个broker之间通过逗号分隔

kafka.topic为刚才创建的topic名称

group.id无要求,建议设置为topic名称_group_任意数字

3.UDP上传资源

在UDP系统的数据开发->资源管理 examples-{version}.jar文件上传到UDP系统内,资源名称为: storm_demo.jar storm_demo.properties文件上传到UDP系统内,资源名称为: storm_demo.properties

4.新建UDP storm流任务

在UDP系统的数据开发->流任务开发页面,新建Storm任务

固定参数: 参数说明:

  1. 流任务类型:storm
  2. 主jar:选择storm_demo.jar
  3. 运行主class: com.baifendian.udp.example.storm.TestTopology
  4. 任务名称: test_topology
  5. 主class参数:@resource_reference{storm_demo.properties}

5.模拟发送数据到kafka的topic 脚本如下: demo-send.sh

#!/usr/bin/env bash
# 配置kafka topic名称,同上配置为test
TOPIC=test
# 配置kafka broker地址,同上配置为bootstrap.servers参数的值
BROKER_LIST=bgs-8p80-udp-01.bfdabc.com:6667,bgs-8p81-udp-02.bfdabc.com:6667,bgs-8p82-udp-03.bfdabc.com:6667,bgs-8p84-udp-05.bfdabc.com:6667,bgs-8p86-udp-07.bfdabc.com:6667
# 配置最大生产者发送数量,这里表示发送10000条后自动停止发送
MAX_NUMBER=10000
# 运行命令
kafka-verifiable-producer.sh --topic ${TOPIC} --broker-list ${BROKER_LIST} --max-messages ${MAX_NUMBER}

编辑脚本参数, 将编辑好的demo-send.sh放到带有kafka客户端的机器上, 直接sh demo-send.sh运行等待结束即可。

6.查看Storm-ui 模拟发送数据成功后,进入Storm-ui, 选择test_topology这个拓扑, 再点击进入test_bolt, 点击任意Executors下的Port按钮, 查看实时日志即可,如图所示:

topology bolt port log

Clone this wiki locally