本系列属个人原创,转载请注明!
原文地址:http://blog.csdn.net/xeseo/article/details/18219183
本系列源码地址: https://github.com/EdisonXu/storm-samples
https://github.com/baijian/storm-java
https://github.com/ashrithr/storm-helloworld
根据前文介绍,我们知道,storm的任务是包装在topology类中,由nimbus提交分配到整个cluster。
Topology有两种大类提交部署方式:
- 提交到本地模式,一般用于调试。该模式下由于是起在同一个JVM进程下,所以不要让其负载太高。
- 提交到集群模式。
提交到本地模式
- public class LocalRunningTopology extends ExclaimBasicTopo {
- public static void main(String[] args) throws Exception {
- LocalRunningTopology topo = new LocalRunningTopology();
- Config conf = new Config();
- conf.setDebug(true);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, topo.buildTopology());
- Utils.sleep(100000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
提交到集群
- public class ClusterRunningTopology extends ExclaimBasicTopo {
- public static void main(String[] args) throws Exception {
- String topoName = "test";
- ClusterRunningTopology topo = new ClusterRunningTopology();
- Config conf = new Config();
- conf.setDebug(true);
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(topoName, conf, topo.buildTopology());
- }
- }
实际开发时常用提交模式
- public static void main(String[] args) throws Exception {
- ExclaimBasicTopo topo = new ExclaimBasicTopo();
- Config conf = new Config();
- conf.setDebug(false);
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, topo.buildTopology());
- } else {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, topo.buildTopology());
- Utils.sleep(100000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
有人又说了,这样还不是很方便,我能不能直接在IDE里面提交到storm集群?
可以。
IDE直接提交至集群
修改上面提交集群的代码如下:
- public static void main(String[] args) throws Exception {
- String topoName = "test";
- ExclaimBasicTopo topo = new ExclaimBasicTopo();
- Config conf = new Config();
- conf.setDebug(false);
- File jarFile = EJob.createTempJar(RemoteRunningTopology.class.getClassLoader().getResource("").getPath());
- ClassLoader classLoader = EJob.getClassLoader();
- Thread.currentThread().setContextClassLoader(classLoader);
- //System.setProperty("storm.jar", Class.forName("com.edi.storm.topos.RemoteRunningTopology").getProtectionDomain().getCodeSource().getLocation().getPath());
- System.setProperty("storm.jar", jarFile.toString());
- conf.setNumWorkers(5);
- conf.setDebug(false);
- conf.put(Config.NIMBUS_HOST, "10.1.110.24");
- //conf.put(Config.NIMBUS_THRIFT_PORT, 8889);
- StormSubmitter.submitTopology(topoName, conf, topo.buildTopology());
- }
起作用的部分主要有三点:
1. 设置系统变量"storm.jar"。这个变量的值代表要部署的Topology的jar包地址。
这个地址必须是文件,所以,我们就可以写完代码后自己打个jar包放在某个固定位置,然后IDE直接运行该topology去集群提交部署。
当然,也可以在代码中打jar,所以我这里的代码中加入了一个打包的Utilities类,EJob。
2. 设置参数Config.NIMBUS_HOST,其值为nimbus的hostname或ip地址。
3. 设置参数Config.NIMBUS_THRIFT_PORT,其值为nimbus上Thrift接口的地址,也就是nimbus的conf/storm.yaml中参数nimbus.thrift.port的值,前提是你配了。如果没配,可以不设。
这样就可以直接在IDE里面运行提交上去了。
Topology提交原理
Topology提交后发生了什么呢?这个原理要放在这里讲了。因为这直接关系到对Strom运行概念的理解。
1. Nimbus$Iface的beginFileUpload,uploadChunk以及finishFileUpload方法将运行的包上传至其数据目录(storm.yaml中storm.local.dir对应的目录)下的inbox目录。
- /{storm.local.dir}
- |
- | - /nimbus
- |
- | - /inbox
- |
- | - /stormjar-{uuid}.jar
不论上传的包名字是什么,最终会变成stormjar-{uuid}.jar。
2. Nimbus$Iface的submitTopology方法会负责对这个topology进行处理,首先是对Storm本身及topology进行一些校验:
a. 检查Storm状态是否active
b. 检查是否有同名topology在运行
c. 检查是否有同id的spout和bolt,以及其id是否合法。任何一个id都不能以"_"开头,这种命名方式是系统保留的。
3. 建立topology的本地目录
- /{storm.local.dir}
- |
- | - /nimbus
- |
- | - /inbox
- | - /stormdist
- |
- | - /{topology-id}
- |
- | - /stormjar.jar -- 包含这个topology所有代码的jar包(从nimbus/inbox挪过来)
- |
- | -/stormcode.ser -- 这个topology对象的序列化
- | -/stormconf.ser -- 运行这个topology的配置
4. 建立该topology在zookeeper上的心跳目录
nimbus老兄是个有责任心的人,它虽然最终会把任务分成一个个task让supervisor去做,但是它时刻在关注着大家的情况,所以它要求每个task每隔一定时间就要给它打个招呼(心跳信息),让它知道事情还在正常发展。如果有task超时不打招呼,nimbus会人为这个task不行了,然后进行重新分配。zookeeper上的心跳目录:
- /<span style="font-family: Consolas, 'Liberation Mono', Courier, monospace;">{storm.zookeeper.root}</span>
- |
- | - /workerbeats
- |
- | - {topology-id}
- |
- | - /{task-id} -- task的心跳信息,包括心跳的时间,task运行时间以及一些统计信息
5. 计算topology的工作量
nimbus会根据topology中给的parallelism hint参数,来给spout/bolt设定task数目,并分配相应的task-id,然后把分配号的task信息写到zookeeper上去:
- /{storm.zookeeper.root}
- |
- | - /assignments
- |
- | - /{topology-id}
6. 保存toplogy信息到zookeeper
- /{storm.zookeeper.root}
- |
- | - /storms
- |
- | - /{topology-id}
7. supervisor因为监听了zookeeper上的目录,所以当它发现有topology时,会先把所有的topology的信息如jar等下到本地,并删除不再运行的topology的本地信息
- /{storm.local.dir}
- |
- | - /supervisor
- |
- | - stormdist
- |
- | - {topology-id}
- |
- | - stormcode.ser
- | - stormconf.ser
- | - stormjar.jar
8. supervisor根据分配的任务,去启动worker去处理assignment
9. worker启动后,会去zookeeper上找其对应的task。同时根据task的outbound信息建立对外的socket连接,将来发送tuple就是从这些socket连接发出去的。
到这里,一个topology就已经完全部署和运转起来了。
相关推荐
Storm分布式实时计算模式由Apache Storm 项目核心贡献者吉奥兹、奥尼尔亲笔撰 写,融合了作者丰富的Storm实战经验,通过大量示例,全面而系统地讲解使用Storm进行分布式实 时计算的核心概念及应用,并针对不同的应用...
写第一个Storm应用--数单词数量(一个spout读取文本,第一个bolt用来标准化单词,第二个bolt为单词计数) 一、Storm运行模式: 1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地...
Storm作为开源的分布式实时计算系统在业界得到了广泛应用,针对Storm自带调度策略忽略了Topology组件任务间的逻辑耦合性,从而引起大量tuple传输产生较大网络时延问题,结合进程代数将Topology等效简化为具有明显...
STORM的TOPOLOGY在线上运行时,随着数据量的增加,在一定的服务器性能及集群规模下,会渐渐达到一个极限,到达极限后,服务器的load、io、cpu、mem等可能会出现耗尽,系统很卡,storm吞吐量骤降的情况。本文档中截图...
大家都知道,要提交StormTopology到Cluster,需要运行如下命令:bin目录下storm是一个Python文件,我们可以看一下Python脚本的main方法首先解析args参数,解析完了之后,把所有的参数传递给COMMANDS,由COMMANDS调用...
storm提交topology的过程共1页.pdf.zip
并细分为20个章节,其中“基础知识”6章、“安装与部署”4章、“研发与维护”4章、“进阶知识”5章、“企业应用”1章,分别介绍了Storm的基本原理、Topology组件、Spout组件、Bolt组件、ZooKeeper集群、Storm的安装...
内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法、Storm与Hadoop的集成方法、实时机器学习、持续交付和如何在AWS上部署Storm。此外,Storm实时...
02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API 简单开发测试 08.storm-kafka...
Chapter 3 Topology design Chapter 4 Creating robust topologies Chapter 5 Moving from local to remote topologies Chapter 6 Tuning in Storm Chapter 7 Resource contention Chapter 8 Storm internals
内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法、Storm与Hadoop的集成方法、实时机器学习、持续交付和如何在AWS上部署Storm。此外,《大数据...
本书涵盖搭建基于Storm的开发环境和测试实时系统的许多实用方法与实战用例,以及如何应用交付最佳实践来将系统部署至云端。 通过阅读本书,你将学到如何构建包含统计面板和可视化的实时日志处理系统。通过集成Storm...
阿里巴巴集团数据平台事业部商家数据业务部正是最早使用Storm的技术团队之一。 《Storm实战:构建大数据实时计算》是一本系统并且具有实践指导意义的Storm工具书和参考书,对Storm整个技术体系进行了全面的讲解,...
Twitter将Storm正式开源了,这是一个分布式的、容错的实时计算系统,它被...有个名为storm-deploy的子项目,可以在AWS上一键部署Storm集群。关于详细的步骤,可以阅读Storm Wiki上的《Setting up a Storm cluster》。
import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import ...
storm概念、基本概念、构建Topology、安装部署、消息的可靠处理
java开发的基于kafka、xlog的web日志实时分析storm topology.zip
Java Transaction Service(Java事务服务)拓扑套件,是一种能够利用清楚精确的模型和强大的几何算法来实现一套核心空间数据操作的JAVA 应用编程接口(API)。它提供一种详细说明2-D线性几何图形(Geometry)的完善...
风暴拓扑示例 概述: 该项目提供了有关使用各种Apache Storm拓扑的示例集合... cd /tmp/storm-topology-examples && bash -x bin/install_mongodb.sh 如果使用HiveBolt,则创建表(您可能要修改ddl) cd /tmp/storm