
Using DirectKafka APIs in MapR Streams

Question asked by mahdi62b on Oct 18, 2016

Hi, I am trying to use the DirectKafka API to read from MapR Streams. The messages are produced into the stream from a file stream.


val sparkConf = new SparkConf().setAppName("StreamProducer")
  .set("spark.streaming.fileStream.minRememberDuration", "2000000h")
  .set("spark.shuffle.blockTransferService", "nio")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val ggsnArray = ssc.fileStream[LongWritable, Text, TextInputFormat]("/mapr/cellos-mapr/user/mbazarganigilani/SparkStreaming1/src/main/GGSN", filterF, false).repartition(20)
  .map(x => x._2.toString()).filter(x => funcGSSNFilterHeader(x))
ggsnArray.foreachRDD( rdd => {
  System.out.println("# events = " + rdd.count())
  rdd.foreachPartition( partition => {
    // Print statements in this section are shown in the executor's stdout logs
    val topic = "/sample-stream:ggsn"
    val properties = new Properties()
    // One producer per partition, created on the executor
    val producer = new KafkaProducer[String, String](properties)
    partition.foreach( record => {
      val data = record.toString
      // As a debugging technique, users can write to DBFS to verify that records are being written out
      // dbutils.fs.put("/tmp/test_kafka_output",data,true)
      val message = new ProducerRecord[String, String](topic, data)
      producer.send(message)
    })
    producer.close()
  })
})
val topics="/sample-stream:ccn,/sample-streams:ggsn"
val kafkaBrokers = ","
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("" -> kafkaBrokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams , topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()

I can produce messages successfully, but I am not sure how to use the direct stream in the line below:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, topicsSet)
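
MapR Streams topics are addressed as /stream-path:topic, so one quick sanity check before calling createDirectStream is to split the topic string and look at the stream path of each entry. The following is only a minimal sketch in plain Scala: TopicCheck, StreamTopic and parseTopic are hypothetical names made up for illustration and are not part of any MapR or Spark API.

```scala
object TopicCheck {
  // Hypothetical holder for the two halves of a "/stream-path:topic" name.
  final case class StreamTopic(streamPath: String, topic: String)

  // Splits a full topic name at the last ':' and reports malformed names.
  def parseTopic(full: String): Either[String, StreamTopic] = {
    val idx = full.lastIndexOf(':')
    if (!full.startsWith("/"))
      Left(s"'$full' does not start with a stream path")
    else if (idx <= 0 || idx == full.length - 1)
      Left(s"'$full' is not of the form /stream-path:topic")
    else
      Right(StreamTopic(full.substring(0, idx), full.substring(idx + 1)))
  }

  def main(args: Array[String]): Unit = {
    // Same topic string as in the post above.
    val topics = "/sample-stream:ccn,/sample-streams:ggsn"
    val topicsSet = topics.split(",").map(_.trim).toSet

    // Collect the distinct stream paths that the subscription refers to.
    val streamPaths = topicsSet.toList
      .map(parseTopic)
      .collect { case Right(t) => t.streamPath }
      .toSet
    streamPaths.foreach(println)
  }
}
```

Run against the topic string from the post, this lists two different stream paths, /sample-stream and /sample-streams, while the producer code writes to /sample-stream:ggsn. It may be worth checking whether both spellings are intended.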

I am using the above parameters on a MapR cluster, but I am getting an error which indicates that the broker parameters are incorrect.
Can someone guide me on how to properly use the DirectKafka API with MapR Streams?
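
Because the error points at the parameters, it can help to check the parameter map for blank names or values before it reaches createDirectStream. The sketch below is plain Scala with no Spark dependency. The property names are the standard Kafka 0.9 consumer keys, but whether the Spark integration shipped with a given MapR release reads each of them is an assumption to verify against that release. ParamCheck, consumerParams and problems are hypothetical names used only for illustration.

```scala
object ParamCheck {
  // Standard Kafka 0.9 consumer property names (assumption: the MapR-provided
  // Spark integration in use accepts new-consumer style settings).
  def consumerParams(groupId: String): Map[String, String] = Map(
    "group.id" -> groupId,
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "auto.offset.reset" -> "earliest"
  )

  // Reports entries whose name is blank or whose comma-separated value
  // contains no non-blank item.
  def problems(params: Map[String, String]): Seq[String] =
    params.toSeq.flatMap { case (k, v) =>
      val noName =
        if (k.trim.isEmpty) Seq("a parameter has an empty name") else Seq.empty
      val noValue =
        if (v.split(",").forall(_.trim.isEmpty)) Seq(s"parameter '$k' has no usable value")
        else Seq.empty
      noName ++ noValue
    }

  def main(args: Array[String]): Unit = {
    // The map as it appears in the post: empty name, "," as value.
    val fromPost = Map[String, String]("" -> ",")
    problems(fromPost).foreach(println)

    // A map built from the standard consumer keys passes the check.
    println(problems(consumerParams("ggsn-consumer")).isEmpty)
  }
}
```

The group id "ggsn-consumer" is a placeholder. The check only catches blank entries, so it says nothing about whether the values are right for a particular cluster.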