Storm概念及组件
Nimbus:负责资源分配和任务调度。Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。Worker:运行具体处理组件逻辑的进程。Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应, 同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。Topology:storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。 为了在storm上面做实时计算, 你要去建立一些topologies。 一个topology就是一个计算节点所组成的图。 Topology里面的每个处理节点都包含处理逻辑, 而节点之间的连接则表示数据流动的方向。Spout:在一个topology中产生源数据流的组件。 通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。 Spout是一个主动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用此函数,用户只要在其中生成源数据即可。Bolt:在一个topology中接受数据然后执行处理的组件。 Bolt可以执行过滤、函数操作、合并、写数据库等任何操作。 Bolt是一个被动的角色,其接口中有个execute(Tuple input)函数,在接受到消息后会调用此函数,用户可以在其中执行自己想要的操作。 Tuple:Storm Spout、Bolt组件消息传递的基本单元(数据模型), Tuple是包含名称的列表,Storm支持所有原生类型,字节数组为Tuple字段传递,如果要传递自定义对象,需要实现接口serializer。 Stream:源源不断传递的tuple就组成了stream。
storm使用tuple来作为它的数据模型。
每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型, 在我的理解里面一个tuple可以看作一个没有方法的java对象。 总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类 型。 你也可以使用你自己定义的类型来作为值类型, 只要你实现对应的序列化器(serializer)。 一个Tuple代表数据流中的一个基本的处理单元,例如一条cookie日志,它可以包含多个Field,每个Field表示一个属性。或者文本文件的单词解析,一行可以看作一个Tuple,行中的单词可以看作Field Tuple本来应该是一个Key-Value的Map,由于各个组件间传递的tuple的字段名称已经事先定义好了,所以Tuple只需要按序填入各个Value,所以就是一个Value List。 一个没有边界的、源源不断的、连续的Tuple序列就组成了Stream。 topology里面的每个节点必须定义它要发射的tuple的每个字段。 比如下面这个bolt定义它所发射的tuple包含两个字段,类型分别是: double和triple。 @Override publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields("double","triple")); }一个简单的Topology
让我们来看一个简单的topology的例子, 我们看一下storm-starter里面的ExclamationTopology:TopologyBuilder builder =newTopologyBuilder();builder.setSpout(1,newTestWordSpout(),10);builder.setBolt(2,newExclamationBolt(),3).shuffleGrouping(1);builder.setBolt(3,newExclamationBolt(),2).shuffleGrouping(2);
这个Topology包含一个Spout和两个Bolt。
Spout发射单词, 每个bolt在每个单词后面加个”!!!”。 这三个节点被排成一条线: spout发射单词给第一个bolt, 第一个bolt然后把处理好的单词发射给第二个bolt。 如果spout发射的单词是["bob"]和["john"], 那么第二个bolt会发射["bolt!!!!!!"]和["john!!!!!!"]出来。我们使用setSpout和setBolt来定义Topology里面的节点。
这些方法接收我们指定的[id, 包含处理逻辑的对象(spout或者bolt), 所需要的并行度] 这个包含处理的对象如果是spout那么要实现IRichSpout的接口, 如果是bolt,那么就要实现IRichBolt接口. 最后一个指定并行度的参数是可选的。 它表示集群里面需要多少个thread来一起执行这个节点。如果你忽略它那么storm会分配一个线程来执行这个节点。setBolt方法返回一个InputDeclarer对象, 这个对象是用来定义Bolt的输入。
第一个Bolt声明它要读取spout所发射的所有的tuple — 使用shuffle grouping。 第二个bolt声明它读取第一个bolt所发射的tuple。 shuffle grouping表示所有的tuple会被随机的分发给bolt的所有task。 给task分发tuple的策略有很多种,后面会介绍。如果你想第二个bolt读取spout和第一个bolt所发射的所有的tuple, 那么你应该这样定义第二个bolt:
builder.setBolt(3,newExclamationBolt(),5).shuffleGrouping(1).shuffleGrouping(2);
让我们深入地看一下这个topology里面的spout和bolt是怎么实现的。
Spout负责发射新的tuple到这个topology里面来。 TestWordSpout从["nathan", "mike", "jackson", "golda", "bertels"]里面随机选择一个单词发射出来。 TestWordSpout里面的nextTuple()方法是这样定义的:publicvoidnextTuple() { Utils.sleep(100); final String[] words =new String[] {"nathan","mike", "jackson","golda","bertels"}; final Random rand =new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(newValues(word));}
ExclamationBolt把”!!!”拼接到输入tuple后面。我们来看下ExclamationBolt的完整实现。
public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { //ExclamationBolt获取tuple的第一个字段,加上”!!!”之后再发射出去。 _collector.emit(tuple,newValues(tuple.getString(0) +"!!!")); _collector.ack(tuple); } publicvoidcleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields("word")); }}
Bolt可以在任何时候发射tuple — 在prepare, execute或者cleanup方法里面, 或者甚至在另一个线程里面异步发射。
1)prepare prepare方法提供给bolt一个Outputcollector用来发射tuple。 prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法 使用。2)execute execute方法从bolt的一个输入接收tuple(一个bolt可能有多个输入源)。 ExclamationBolt获取tuple的第一个字段,加上”!!!”之后再发射出去。 如果一个bolt有多个输入源,你可以通过调用 Tuple#getSourceComponent方法来知道它是来自哪个输入源的。 execute方法里面还有其它一些事情值得一提:输入tuple被作为emit方法的第一个参数,并且输入tuple在最后一行被ack。 这些呢都是Storm可靠性API的一部分,后面会解释。3)clenup cleanup方法在bolt被关闭的时候调用, 它应该清理所有被打开的资源。 但是集群不保证这个方法一定会被执行。 比如执行task的机器down掉了,那么根本就没有办法来调用那个方法。 cleanup设计的时候是被用来在local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群), 并且你想在关闭一些topology的时候避免资源泄漏。4)declareOutputFields declareOutputFields定义一个叫做”word”的字段的tuple。 以local mode运行ExclamationTopology 让我们看看怎么以local mode运行ExclamationToplogy。 storm的运行有两种模式: 本地模式和分布式模式. 在本地模式中, storm用一个进程里面的线程来模拟所有的spout和bolt. 本地模式对开发和测试来说比较有用。 你运行storm-starter里面的topology的时候它们就是以本地模式运行的, 你可以看到topology里面的每一个组件在发射什么消息。 在分布式模式下, storm由一堆机器组成。当你提交topology给master的时候, 你同时也把topology的代码提交了。master负责分发你的代码并且负责给你的topolgoy分配工作进程。如果一个工作进程挂掉了, master节点会把认为重新分配到其它节点。关于如何在一个集群上面运行topology, 你可以看看Running topologies on a production cluster文章。 下面是以本地模式运行ExclamationTopology的代码:Config conf =newConfig();
conf.setDebug(true); conf.setNumWorkers(2);LocalCluster cluster =newLocalCluster();
cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown();首先, 这个代码定义通过定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集 群是一样的。通过调用submitTopology方法来提交topology, 它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology本身。
topology的名字是用来唯一区别一个topology的,这样你然后可以用这个名字来杀死这个topology的。前面已经说过了, 你必须显式的杀掉一个topology, 否则它会一直运行。 Conf对象可以配置很多东西, 下面两个是最常见的:TOPOLOGY_WORKERS(setNumWorkers) 定义你希望集群分配多少个工作进程给你来执行这个topology. topology里面的每个组件会被需要线程来执行。每个组件到底用多少个线程是通过setBolt和setSpout来指定的。这些线程都运行在工作进 程里面. 每一个工作进程包含一些节点的一些工作线程。比如, 如果你指定300个线程,60个进程, 那么每个工作进程里面要执行6个线程, 而这6个线程可能属于不同的组件(Spout, Bolt)。你可以通过调整每个组件的并行度以及这些线程所在的进程数量来调整topology的性能。
TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每个组件所发射的每条消息。这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。感兴趣的话可以去看看Conf对象的Javadoc去看看topology的所有配置。
可以看看创建一个新storm项目去看看怎么配置开发环境以使你能够以本地模式运行topology. 运行中的Topology主要由以下三个组件组成的: Worker processes(进程) Executors (threads)(线程) Tasksstorm的主要作用是进行流式的实时计算,对于一直产生的数据流处理是非常迅速的,然而大部分数据并不是均匀的数据流,而是时而多时而少。
对于这种情况下进行批处理是不合适的,因此引入了kafka作为消息队列,与storm完美配合,这样可以实现稳定的流式计算。