Scala: How to write to Kafka from Spark Streaming


I'm using Spark Streaming to process data between two Kafka queues, but I can't seem to find a good way to write to Kafka from Spark. I've tried this:

    import java.util.HashMap
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    input.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>
        partition.foreach {
          case x: String => {
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")

            println(x)
            // A new producer is created for every single message
            val producer = new KafkaProducer[String, String](props)
            val message = new ProducerRecord[String, String]("output", null, x)
            producer.send(message)
          }
        }
      )
    )

It works as intended, but instantiating a new KafkaProducer for every message is clearly unfeasible in a real context, and I'm trying to work around it.

KafkaProducer is not serializable, obviously.

I would like to keep a reference to a single instance for every process and access it when I need to send a message. How can I do that?

My first advice is to try creating a new instance in foreachPartition and measure whether that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
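A minimal sketch of the per-partition approach, using only the Scala standard library. `MessageSink` and `PerPartitionWriter` are hypothetical names, and the sink stands in for a `KafkaProducer[String, String]` so the example compiles without kafka-clients. In a real job you would call `writePartition` inside `rdd.foreachPartition`, passing a factory that builds the producer from the properties in the question.

```scala
// Hypothetical stand-in for a Kafka producer so the sketch has no dependencies;
// in a real job this would wrap KafkaProducer[String, String].
trait MessageSink {
  def send(topic: String, value: String): Unit
  def close(): Unit
}

object PerPartitionWriter {
  // Meant to be called from rdd.foreachPartition { records => ... }:
  // one sink per partition instead of one per record.
  def writePartition(records: Iterator[String], topic: String)(create: () => MessageSink): Unit = {
    val sink = create() // instantiated once for the whole partition
    try records.foreach(r => sink.send(topic, r))
    finally sink.close() // released even if a send throws
  }
}
```

With a real `KafkaProducer`, `close()` waits for previously sent records to complete, so each partition is flushed before the task ends. The remaining cost is one producer per partition per batch, which is what you would measure.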

Another option is to use an object pool, as illustrated in this example:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/pooledkafkaproducerappfactory.scala

However, I found it hard to implement when using checkpointing.
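To show the pooling idea itself, here is a minimal, unbounded pool sketch. It is not the kafka-storm-starter implementation linked above. `SimplePool` is a hypothetical name, and `A` would be `KafkaProducer[String, String]` in a real job.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical minimal object pool: objects are created on demand, handed back
// after use and reused later, so a producer is not rebuilt for every batch.
class SimplePool[A <: AnyRef](create: () => A) {
  private val idle = new ConcurrentLinkedQueue[A]()
  private val created = new AtomicInteger(0)

  // Reuse an idle object if one exists, otherwise build a new one
  def borrow(): A = {
    val pooled = idle.poll() // null when no idle object is available
    if (pooled != null) pooled
    else {
      created.incrementAndGet()
      create()
    }
  }

  // Make the object available to the next borrower
  def giveBack(a: A): Unit = {
    idle.offer(a)
    ()
  }

  // Borrow, run `f`, and always return the object, even if `f` throws
  def withObject[B](f: A => B): B = {
    val a = borrow()
    try f(a) finally giveBack(a)
  }

  def createdSoFar: Int = created.get
}
```

To get one pool per executor JVM, the pool would typically live in a Scala `object` (initialized lazily, once per JVM) and be used as `pool.withObject { p => records.foreach(...) }` inside `foreachPartition`. Note that `KafkaProducer` is documented as thread-safe, so sharing a single instance per JVM is often enough instead of a full pool.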

Another version that is working for me is the factory described in the following blog post. You just have to check whether it provides enough parallelism for your needs (see the comments section):

http://allegro.tech/2015/08/spark-kafka-integration.html
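The core of the factory idea can be sketched as a serializable wrapper that ships only a function to the executors and builds the producer lazily on first use. `LazySink` and its parameters are hypothetical names, and the generic `P` stands in for `KafkaProducer[String, String]` so the sketch compiles on its own. In real code `createProducer` would be something like `() => new KafkaProducer[String, String](props)` and `sendFn` would call `p.send(new ProducerRecord[String, String](topic, value))`.

```scala
// Serializable wrapper in the spirit of the factory from the linked post
// (names are hypothetical). Only the factory function is serialized;
// the producer itself is created lazily, on first use, on the executor.
class LazySink[P](createProducer: () => P, sendFn: (P, String, String) => Unit)
    extends Serializable {

  // @transient: never shipped over the wire; rebuilt after deserialization
  @transient private lazy val producer: P = createProducer()

  def send(topic: String, value: String): Unit = sendFn(producer, topic, value)
}
```

On the driver you would broadcast it once, e.g. `val sink = ssc.sparkContext.broadcast(new LazySink(...))`, and call `sink.value.send("output", x)` inside `foreachRDD`. Because a broadcast value is typically deserialized once per executor, each executor ends up with a single producer. Closing that producer (for example from a JVM shutdown hook) is left out of this sketch.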

