Storm - Conditionally consuming stream from Kafka spout?


I have a scenario where I am posting JSON messages to a Kafka instance, and I am using a Kafka spout to emit the stream to a bolt.

Now I want to add an additional field (call it X) to the JSON message. If X is A, the message should be consumed by BoltA; if X is B, it should be consumed by BoltB.

Is there a way to direct the stream to the proper bolt depending on the stream's contents?
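For example, a message with the new routing field might look like this (field names other than X are only illustrative):

{ "x": "A", "payload": "..." }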

The simplest way would be to add a SplitBolt that consumes from the KafkaSpout, evaluates field X, and forwards the tuple to different output streams:

public class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        Object x = ...; // extract field X from the input tuple
        String streamId;
        if ("A".equals(x)) {
            streamId = "stream-xa";
        } else { // X == "B"
            streamId = "stream-xb";
        }
        collector.emit(streamId, input, input.getValues()); // anchored to the input tuple
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields(...); // same schema for both output streams
        declarer.declareStream("stream-xa", schema);
        declarer.declareStream("stream-xb", schema);
    }
}
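The extraction of field X is left open above. As a sketch only: assuming the KafkaSpout uses the default StringScheme (which emits the raw message under a single field named "str") and that Jackson is on the classpath, the execute() body could parse the JSON like this (the field name "x" and the values "A"/"B" are taken from the question):

private final ObjectMapper mapper = new ObjectMapper(); // com.fasterxml.jackson.databind

@Override
public void execute(Tuple input) {
    try {
        String json = input.getStringByField("str");          // raw Kafka message from StringScheme
        String x = mapper.readTree(json).path("x").asText();  // routing field from the question
        String streamId = "A".equals(x) ? "stream-xa" : "stream-xb";
        collector.emit(streamId, input, input.getValues());   // anchored emit
        collector.ack(input);
    } catch (Exception e) {
        collector.fail(input);                                 // let Storm replay on parse failure
    }
}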

When building the topology, connect BoltA to "stream-xa" and BoltB to "stream-xb":

TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
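To try it out, the topology can be run in-process; a minimal sketch (the topology name is arbitrary, use StormSubmitter for a real cluster):

Config conf = new Config();
conf.setDebug(true);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("split-example", conf, b.createTopology());
Utils.sleep(60000);                    // let it run for a minute
cluster.killTopology("split-example");
cluster.shutdown();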

As an alternative, it should also be possible to inherit from KafkaSpout and emit the two different streams directly from the spout. However, that code is more tricky to get right.
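As a rough illustration of why it is trickier: a subclass would at least have to declare both streams itself, as sketched below, but the actual emit calls happen inside KafkaSpout's partition managers, so routing each message to the right stream means reaching into more of its internals (sketch only, this does not yet route anything):

// Declares two streams, but does NOT route messages to them.
public class SplittingKafkaSpout extends KafkaSpout {

    public SplittingKafkaSpout(SpoutConfig config) {
        super(config);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("str"); // matches the default StringScheme
        declarer.declareStream("stream-xa", schema);
        declarer.declareStream("stream-xb", schema);
    }
}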

