Storm - Conditionally consuming stream from Kafka spout?
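I have a scenario where I am posting JSON to a Kafka instance. I am using the Kafka spout to emit the stream to a bolt.
But now I would like to add an additional field (call it x) to the JSON message. If x is A, the message should be consumed by BoltA; if x is B, it should be consumed by BoltB.
Is there a way to direct the stream to the proper bolt depending on the stream's contents?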
The simplest way is to add a SplitBolt that consumes the KafkaSpout's output, evaluates field x, and forwards each tuple to a different output stream:
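    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;

    public class SplitBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            Object x = ...; // extract field x from input
            String streamId;
            if (x.equals(A)) { // A and B stand for the two possible values of x
                streamId = "stream-xa";
            } else { // x equals B
                streamId = "stream-xb";
            }
            // Anchor the new tuple to the input so a failure downstream is replayed by the spout.
            collector.emit(streamId, input, input.getValues());
            // BaseRichBolt does not ack automatically; without this, tuples time out and are replayed.
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            Fields schema = new Fields(...);
            declarer.declareStream("stream-xa", schema);
            declarer.declareStream("stream-xb", schema);
        }
    }

When building the topology, connect BoltA to "stream-xa" and BoltB to "stream-xb":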
    TopologyBuilder b = new TopologyBuilder();
    b.setSpout("spout", new KafkaSpout(...));
    b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
    b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
    b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");

As an alternative, it should be possible to inherit from KafkaSpout and emit two different streams directly. However, that code is trickier to get right.
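To make the routing idea concrete without a running cluster, here is a minimal plain-Java sketch. It is a toy model, not Storm code: the class `SplitRoutingSketch`, its methods, the fallback stream `stream-unknown`, and the assumption that x arrives as the strings "A" and "B" are all hypothetical. `streamFor` plays the role of SplitBolt's `execute`, and `route` mimics the topology wiring, where each bolt only receives the tuples emitted on the stream it subscribed to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Storm-free model of the split pattern: a router picks a stream ID
 * from the value of x, and each downstream "bolt" sees only its own stream.
 * None of these names are part of Storm's API.
 */
public class SplitRoutingSketch {

    // Stream IDs from the answer; the fallback stream is an assumption for unexpected values.
    public static final String STREAM_XA = "stream-xa";
    public static final String STREAM_XB = "stream-xb";
    public static final String STREAM_UNKNOWN = "stream-unknown";

    /** Mirrors SplitBolt.execute: map the value of x to an output stream ID. */
    public static String streamFor(String x) {
        if ("A".equals(x)) {
            return STREAM_XA;
        } else if ("B".equals(x)) {
            return STREAM_XB;
        }
        return STREAM_UNKNOWN;
    }

    /**
     * Mirrors the topology wiring: each entry is (value of x, payload), and the result
     * maps a stream ID to the payloads a bolt subscribed to that stream would receive.
     */
    public static Map<String, List<String>> route(List<Map.Entry<String, String>> messages) {
        Map<String, List<String>> received = new HashMap<>();
        for (Map.Entry<String, String> msg : messages) {
            received.computeIfAbsent(streamFor(msg.getKey()), k -> new ArrayList<>())
                    .add(msg.getValue());
        }
        return received;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> messages = List.of(
                Map.entry("A", "first"),
                Map.entry("B", "second"),
                Map.entry("A", "third"),
                Map.entry("C", "fourth"));
        Map<String, List<String>> received = route(messages);
        System.out.println("boltA <- " + received.get(STREAM_XA));    // [first, third]
        System.out.println("boltB <- " + received.get(STREAM_XB));    // [second]
        System.out.println("unrouted <- " + received.get(STREAM_UNKNOWN)); // [fourth]
    }
}
```

One design choice worth noting: the answer's `else` branch sends every value that is not A to "stream-xb". Routing unexpected values to a separate stream (as the sketch does) keeps malformed messages from silently reaching BoltB; in a real topology that extra stream would also have to be declared in `declareOutputFields`.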