当前位置:首页 > Windows程序 > 正文

storm源码分析

2021-05-26 Windows程序

你已经看到实现有且只有一次被执行的语义时的复杂性。Trident这样做的好处把所有容错想过的逻辑都放在了State里面 -- 作为一个用户,你并不需要自己去处理复杂的txid,存储多余的信息到数据库中,或者是任何其他类似的事情。你只需要写如下这样简单的code

  TridentTopology topology = new  TridentTopology();          TridentState wordCounts =         topology.newStream("spout1", spout)           .each(new Fields("sentence"), new Split(), new  Fields("word"))           .groupBy(new Fields("word"))           .persistentAggregate(MemcachedState.opaque(serverLocations), new  Count(), new Fields("count"))                           .parallelismHint(6);    

所有管理opaque transactional state所需的逻辑都在MemcachedState.opaque方法的调用中被涵盖了,除此之外,数据库的更新会自动以batch的形式来进行以避免多次访问数据库。State的基本接口只包含下面两个方法:

  public interface State {       void beginCommit(Long txid); // can be null for things like  partitionPersist occurring off a DRPC stream       void commit(Long txid);  }    

当一个State更新开始时,以及当一个State更新结束时你都会被告知,并且会告诉你该次的txid。Trident并没有对你的state的工作方式有任何的假定。

假定你自己搭了一套数据库来存储用户位置信息,并且你想要在Trident中去访问它。则在State的实现中应该有用户信息的set、get方法:

  public class LocationDB implements State {       public void beginCommit(Long txid) {            }          public void commit(Long txid) {            }          public void setLocation(long userId, String location) {         // code to access database and set location       }          public String getLocation(long userId) {         // code to get location from database       }  }    

然后你还需要提供给Trident一个StateFactory来在Trident的task中创建你的State对象。LocationDB 的 StateFactory可能会如下所示:

  public class LocationDBFactory implements  StateFactory {      public State makeState(Map conf, int partitionIndex, int  numPartitions) {         return new LocationDB();     }    }    

Trident提供了一个QueryFunction接口用来实现Trident中在一个state source上查询的功能。同时还提供了一个StateUpdater来实现Trident中更新statesource的功能。比如说,让我们写一个查询地址的操作,这个操作会查询LocationDB来找到用户的地址。下面以以怎样在topology中使用该功能开始,假定这个topology会接受一个用户id作为输入数据流:

  TridentTopology topology = new  TridentTopology();  TridentState locations =  topology.newStaticState(new LocationDBFactory());  topology.newStream("myspout",  spout)           .stateQuery(locations, new Fields("userid"), new  QueryLocation(), new Fields("location"))    

温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/71431.html