storm源码分析
你已经看到实现有且只有一次被执行的语义时的复杂性。Trident这样做的好处把所有容错想过的逻辑都放在了State里面 -- 作为一个用户,你并不需要自己去处理复杂的txid,存储多余的信息到数据库中,或者是任何其他类似的事情。你只需要写如下这样简单的code:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count")) .parallelismHint(6);所有管理opaque transactional state所需的逻辑都在MemcachedState.opaque方法的调用中被涵盖了,除此之外,数据库的更新会自动以batch的形式来进行以避免多次访问数据库。State的基本接口只包含下面两个方法:
public interface State { void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream void commit(Long txid); }当一个State更新开始时,以及当一个State更新结束时你都会被告知,并且会告诉你该次的txid。Trident并没有对你的state的工作方式有任何的假定。
假定你自己搭了一套数据库来存储用户位置信息,并且你想要在Trident中去访问它。则在State的实现中应该有用户信息的set、get方法:
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocation(long userId, String location) { // code to access database and set location } public String getLocation(long userId) { // code to get location from database } }然后你还需要提供给Trident一个StateFactory来在Trident的task中创建你的State对象。LocationDB 的 StateFactory可能会如下所示:
public class LocationDBFactory implements StateFactory { public State makeState(Map conf, int partitionIndex, int numPartitions) { return new LocationDB(); } }Trident提供了一个QueryFunction接口用来实现Trident中在一个state source上查询的功能。同时还提供了一个StateUpdater来实现Trident中更新statesource的功能。比如说,让我们写一个查询地址的操作,这个操作会查询LocationDB来找到用户的地址。下面以以怎样在topology中使用该功能开始,假定这个topology会接受一个用户id作为输入数据流:
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStaticState(new LocationDBFactory()); topology.newStream("myspout", spout) .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/71431.html