Storm - Conditionally consuming stream from Kafka spout?
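I have a scenario where I am posting JSON to a Kafka instance. I am then using a Kafka spout to emit the stream to a bolt.
Now I would like to add an additional field (call it x) to my JSON message. If x is a, I would like the message to be consumed by BoltA; if x is b, I would like it to be consumed by BoltB.
Is there a way to direct the stream to the proper bolt depending on the stream's contents?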
The simplest way should be to add a SplitBolt that consumes from the KafkaSpout, evaluates the field x, and forwards each tuple to one of two output streams:
    public class SplitBolt extends BaseRichBolt {
        OutputCollector collector;

        public void prepare(...) {
            this.collector = collector;
        }

        public void execute(Tuple input) {
            Object x = ... // get field x from input
            String streamId;

            if (x == a) {
                streamId = "stream-xa";
            } else { // x == b
                streamId = "stream-xb";
            }

            // Emit anchored to the input tuple, then ack it
            // (BaseRichBolt does not ack automatically).
            collector.emit(streamId, input, input.getValues());
            collector.ack(input);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            Fields schema = new Fields(...);
            // The declared stream ids must match the ids used in execute().
            declarer.declareStream("stream-xa", schema);
            declarer.declareStream("stream-xb", schema);
        }
    }
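The `x == a` check in the bolt is pseudocode. If x arrives as a String, Java's `==` compares references rather than contents, so `equals` is the safer comparison. Below is a minimal sketch of just the routing decision, written without any Storm dependency so it can be tried in isolation. The class name `StreamRouter`, the method `streamFor`, and the literal values "a" and "b" are assumptions made for illustration; they are not part of Storm or of the original answer.

```java
import java.util.Optional;

// Hypothetical helper (not a Storm class): maps the value of field x
// to the id of the output stream the tuple should be emitted on.
final class StreamRouter {
    static final String STREAM_XA = "stream-xa";
    static final String STREAM_XB = "stream-xb";

    private StreamRouter() {}

    // Returns the stream id for x, or Optional.empty() when x is neither
    // "a" nor "b" (assumed values), including when x is null.
    static Optional<String> streamFor(Object x) {
        if ("a".equals(x)) {
            return Optional.of(STREAM_XA);
        }
        if ("b".equals(x)) {
            return Optional.of(STREAM_XB);
        }
        return Optional.empty();
    }
}
```

Inside `execute`, the bolt could call `StreamRouter.streamFor(x)` and emit only when a stream id is present. What to do with an unexpected x (drop it, or send it to a third "error" stream) is a design choice the original answer leaves open; the `else` branch there treats everything that is not a as b.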
When building your topology, you connect BoltA to "stream-xa" and BoltB to "stream-xb":
    TopologyBuilder b = new TopologyBuilder();
    b.setSpout("spout", new KafkaSpout(...));
    b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
    b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
    b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
As an alternative, it should also be possible to inherit from KafkaSpout and emit to two different streams directly. However, that code is trickier to get right.