scala - How to write to Kafka from Spark Streaming
I'm using Spark Streaming to process data between two Kafka queues, but I can't seem to find a good way to write to Kafka from Spark. I've tried this:
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        // A new producer is created for every single record.
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)
and it works as intended, but instantiating a new KafkaProducer for every message is clearly unfeasible in a real context, and I'm trying to work around it.
KafkaProducer is not serializable, obviously.
I would like to keep a reference to a single instance for every process and access it when I need to send a message. How can I do that?
My first advice would be to try creating a new instance in foreachPartition and measure whether that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
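As a rough sketch of that approach, assuming input is a DStream[String] and brokers holds the bootstrap servers string, as in the question's code:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One producer per partition (i.e. per task), instead of one per record.
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    partition.foreach { x =>
      producer.send(new ProducerRecord[String, String]("output", null, x))
    }

    // Flush buffered records and release connections before the task ends.
    producer.close()
  }
}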
Another option would be to use an object pool as illustrated in this example:
I found it hard to implement, however, when using checkpointing.
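The pooling code is not reproduced above, but the core idea (reusing producers across tasks inside the same executor JVM instead of creating one per record) can be sketched with a simple lazily initialized holder object. This is a simplified stand-in for a real pool, and the broker address below is a placeholder:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Hypothetical holder: one lazily created producer per executor JVM.
// A real pool (e.g. Apache Commons Pool) would also bound and recycle instances.
object ProducerHolder {
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092") // placeholder brokers
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val p = new KafkaProducer[String, String](props)
    // Close (and flush) the producer when the executor JVM shuts down.
    sys.addShutdownHook(p.close())
    p
  }
}

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { x =>
      ProducerHolder.producer.send(new ProducerRecord[String, String]("output", null, x))
    }
  }
}

Because Scala objects are not serialized with the closure, the producer is initialized on the executor the first time it is used there and then reused for all subsequent batches.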
Another version that is working for me is a factory as described in the following blog post; you just have to check whether it provides enough parallelism for your needs (check the comments section):
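The pattern described there is, roughly, a small serializable wrapper that ships only a factory function to the executors and creates the real KafkaProducer lazily, once per executor. A sketch under that assumption (the KafkaSink name and exact shape are illustrative, not necessarily the blog's exact code):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // Created lazily, on first use, on whichever JVM the sink ends up on.
  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink = {
    val createProducer = () => {
      val producer = new KafkaProducer[String, String](config)
      // Close the producer when the executor JVM exits, flushing pending records.
      sys.addShutdownHook(producer.close())
      producer
    }
    new KafkaSink(createProducer)
  }
}

It would typically be broadcast once and reused inside the output operation, for example:

val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(producerConfig))
input.foreachRDD { rdd =>
  rdd.foreach(x => kafkaSink.value.send("output", x))
}

where producerConfig is assumed to be a Properties instance that already contains the bootstrap servers and the String serializers.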