
Using DirectKafka APIs in MapR Streams

Question asked by mahdi62b on Oct 18, 2016


Hi, I am trying to use the direct Kafka API with MapR Streams. The application below produces messages to a stream from a fileStream:

 

import java.util.Properties

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

val sparkConf = new SparkConf()
  .setAppName("StreamProducer")
  .set("spark.streaming.fileStream.minRememberDuration", "2000000h")
  .set("spark.shuffle.blockTransferService", "nio")
  .registerKryoClasses(Array(
    classOf[org.apache.hadoop.io.LongWritable],
    classOf[SIMPLE_KEY_JOINS], classOf[SIMPLE_GGSN], classOf[SIMPLE_GGSN_2],
    classOf[SIMPLE_CCN], classOf[SIMPLE_CCN_2],
    classOf[(SIMPLE_KEY_JOINS, String)], classOf[(LongWritable, Text)])) //.setMaster("local[7]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val ggsnArray = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "/mapr/cellos-mapr/user/mbazarganigilani/SparkStreaming1/src/main/GGSN", filterF, false)
  .repartition(20)
  .map(x => x._2.toString())
  .filter(x => funcGSSNFilterHeader(x))
ggsnArray.foreachRDD( rdd => {
  System.out.println("# events = " + rdd.count())
  rdd.foreachPartition( partition => {
    // Print statements in this section are shown in the executor's stdout logs
    val topic = "/sample-stream:ggsn"
    val properties = new Properties()
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](properties)
    partition.foreach( record => {
      val data = record.toString
      // As a debugging technique, you can write records out to a file to verify they are being produced
      // dbutils.fs.put("/tmp/test_kafka_output",data,true)
      val message = new ProducerRecord[String, String](topic, data)
      producer.send(message)
    })
    producer.close()
  })
})
val topics = "/sample-stream:ccn,/sample-stream:ggsn"
val kafkaBrokers = "172.16.50.64:9092,172.16.50.65:9093"
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBrokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()

 


I can produce messages successfully, but I am not sure how to use createDirectStream at the line below:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, topicsSet)

I am using the above parameters on a MapR cluster but am getting

java.nio.channels.ClosedChannelException

which suggests that the broker parameters are incorrect.
Can someone guide me on how to use the direct Kafka API properly with MapR Streams?
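
From the MapR documentation I have seen, MapR Streams does not use Kafka broker addresses at all (the /stream:topic path itself locates the data), so metadata.broker.list has nothing to connect to, which may be what causes the ClosedChannelException. Below is a minimal sketch of what I think the consumer side should look like instead, assuming the MapR-provided spark-streaming-kafka v09 package is on the classpath; the group.id value is just a placeholder and I have not verified this:

import org.apache.spark.streaming.kafka.v09.KafkaUtils

// New-consumer style configuration: deserializer classes instead of
// StringDecoder type parameters, and no broker list, since MapR Streams
// resolves topics from the stream path.
val consumerParams = Map[String, String](
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "spark-direct-consumer", // placeholder consumer group id
  "auto.offset.reset" -> "earliest")

// Only the key and value types are supplied as type parameters here.
val messages = KafkaUtils.createDirectStream[String, String](
  ssc, consumerParams, topicsSet)

// The stream yields (key, value) pairs, so the word count works unchanged.
val lines = messages.map(_._2)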
