Storm - Conditionally consuming stream from Kafka spout?
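I have a scenario where I am posting JSON to a Kafka instance and using the Kafka spout to emit the stream to a bolt.
Now I would like to add an additional field (call it x) to my JSON message. If x is a, the message should be consumed by boltA; if x is b, it should be consumed by boltB.
Is there a way to direct the stream to the proper bolt depending on the stream's contents?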
The simplest way is to add a SplitBolt that consumes the KafkaSpout's output, evaluates field x, and forwards each tuple to a different output stream:
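```java
// Storm 1.x+ package names (older releases use backtype.storm.*)
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        Object x = ...; // extract field x from input
        String streamId;
        // Compare with equals(), not ==; assumes x holds the string "a" or "b"
        if ("a".equals(x)) {
            streamId = "stream-xa";
        } else { // x == b
            streamId = "stream-xb";
        }
        // Re-emit the input's values on the chosen stream, anchored to the input tuple
        collector.emit(streamId, input, input.getValues());
        collector.ack(input); // BaseRichBolt does not ack automatically
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields(...); // same fields as the spout's output
        declarer.declareStream("stream-xa", schema);
        declarer.declareStream("stream-xb", schema);
    }
}
```

When building the topology, subscribe boltA to "stream-xa" and boltB to "stream-xb":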
```java
TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
// Each bolt subscribes only to its own named stream of "split"
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
```

As an alternative, it should also be possible to subclass KafkaSpout and emit two different streams directly. However, that code is trickier to get right.
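Whichever component does the routing, the decision itself is a plain function of the message, so it can be pulled out of execute() and tested without a running topology. The sketch below assumes the spout emits each message as a raw JSON string, that x is a top-level string field whose values are "a" or "b", and Java 9+ (for Map.of). StreamRouter, extractX and streamFor are hypothetical names, and the regex is only a naive stand-in for a real JSON parser.

```java
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: the routing decision a bolt like SplitBolt makes,
 * as a plain function that can be tested without Storm.
 */
public class StreamRouter {
    // Naive pattern for a top-level string field "x"; assumes flat JSON
    // without escaped quotes. Use a real JSON parser in production.
    private static final Pattern X_FIELD =
            Pattern.compile("\"x\"\\s*:\\s*\"([^\"]*)\"");

    // Assumed values of x mapped to output stream ids.
    private static final Map<String, String> ROUTES = Map.of(
            "a", "stream-xa",
            "b", "stream-xb");

    /** Extracts the string value of field x, if present. */
    public static Optional<String> extractX(String json) {
        Matcher m = X_FIELD.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Stream id for a message, or empty if x is missing or not a known value. */
    public static Optional<String> streamFor(String json) {
        // Optional.map turns a null lookup result into Optional.empty()
        return extractX(json).map(ROUTES::get);
    }

    public static void main(String[] args) {
        System.out.println(streamFor("{\"id\": 1, \"x\": \"a\"}")); // Optional[stream-xa]
        System.out.println(streamFor("{\"id\": 2, \"x\": \"b\"}")); // Optional[stream-xb]
        System.out.println(streamFor("{\"id\": 3}"));              // Optional.empty
    }
}
```

Unlike the else branch in SplitBolt, which sends every message that is not a to "stream-xb", streamFor returns an empty Optional for a missing or unexpected x, so the bolt can decide explicitly what to do with such messages (for example, ack and drop them, or emit them on an additional stream).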