当前位置:首页 > Windows程序 > 正文

Storm高级原语(四)Trident API 综述

2021-05-26 Windows程序

Stream”是Trident中的核心数据模型,它被当做一系列的batch来处理。在Storm集群的节点之间,一个stream被划分成很多partition(分区),对流的操作(operation)是在每个partition上并行进行的。

注:

①“Stream”是Trident中的核心数据模型:有些地方也说是TridentTuple,没有个标准的说法。

②一个stream被划分成很多partition:partition是stream的一个子集,里面可能有多个batch,一个batch也可能位于不同的partition上

Trident有五类操作(operation):

1、Partition-local operations,对每个partition的局部操作,不产生网络传输

2、Repartitioning operations:对数据流的重新划分(仅仅是划分,但不改变内容),产生网络传输

3、Aggregation operations:聚合操作

4、Operations on grouped streams:作用在分组流上的操作

5、Merge、Join操作

Partition-localoperations

对每个partition的局部操作包括:function、filter、partitionAggregate、stateQuery、partitionPersist、project等。

Functions

一个function收到一个输入tuple后可以输出0或多个tuple,输出tuple的字段被追加到接收到的输入tuple后面。如果对某个tuple执行function后没有输出tuple,则该tuple被过滤(filter),否则,就会为每个输出tuple复制一份输入tuple的副本。假设有如下的function:

  

public class  MyFunction extends BaseFunction {

  

public void execute(TridentTuple tuple,  TridentCollector collector) {

  

for(int i=0; i <  tuple.getInteger(0); i++) {

  

collector.emit(new Values(i));

  

}

  

}

  

}

  
 

假设有个叫“mystream”的流(stream),该流中有如下tuple( tuple的字段为["a", "b","c"] ),

[1, 2, 3]

[4, 1, 6]

[3, 0, 8]

运行下面的代码:

  

mystream.each(new  Fields("b"), new MyFunction(), new Fields("d")))

  
 

则输出tuple中的字段为["a", "b","c", "d"],,如下所示

[1, 2, 3, 0]

[1, 2, 3, 1]

[4, 1, 6, 0]

Filters

fileter收到一个输入tuple后可以决定是否留着这个tuple,看下面的filter:

  

public class  MyFilter extends BaseFunction {

  

public boolean isKeep(TridentTuple tuple)  {

  

return tuple.getInteger(0) == 1  && tuple.getInteger(1) == 2;

  

}

  

}

  
 

假设你有如下这些tuple(包含的字段为["a","b", "c"]):

[1, 2, 3]

[2, 1, 1]

[2, 3, 4]

运行下面的代码:

  

mystream.each(new  Fields("b", "a"), new MyFilter())

  
 

则得到的输出tuple为:

[2, 1, 1]

partitionAggregate

partitionAggregate对每个partition执行一个function操作(实际上是聚合操作),但它又不同于上面的functions操作,partitionAggregate的输出tuple将会取代收到的输入tuple,如下面的例子:

  

mystream.partitionAggregate(new  Fields("b"), new Sum(), new Fields("sum"))

  
 

假设输入流包括字段 ["a","b"] ,并有下面的partitions:

Partition 0:

["a", 1]

["b", 2]

Partition 1:

["a", 3]

["c", 8]

Partition 2:

["e", 1]

["d", 9]

["d", 10]

则这段代码的输出流包含如下tuple,且只有一个”sum”的字段:

Partition 0:

[3]

Partition 1:

[11]

Partition 2:

[20]

上面代码中的new Sum()实际上是一个聚合器(aggregator),定义一个聚合器有三种不同的接口:CombinerAggregator、ReducerAggregator 和 Aggregator。

下面是CombinerAggregator接口:

  

public interface  CombinerAggregator<T> extends Serializable {

  

T init(TridentTuple tuple);

  

T combine(T val1, T val2);

  

T zero();

  

}

  
 

一个CombinerAggregator仅输出一个tuple(该tuple也只有一个字段)。每收到一个输入tuple,CombinerAggregator就会执行init()方法(该方法返回一个初始值),并且用combine()方法汇总这些值,直到剩下一个值为止(聚合值)。如果partition中没有tuple,CombinerAggregator会发送zero()的返回值。下面是聚合器Count的实现:

  

public class Count  implements CombinerAggregator<Long> {

  

public Long init(TridentTuple tuple) {

  

return 1L;

  

}

    

public Long combine(Long val1, Long val2)  {

  

return val1 + val2;

  

}

    

public Long zero() {

  

return 0L;

  

}

  

}

  
 

温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/71554.html