
Querying MapR Streams messages to detect duplicate records in the producer

Question asked by mahdi62b on Nov 13, 2016
Latest reply on Dec 2, 2016 by iandow

Hi,

 

I am trying to avoid sending duplicate messages by querying the Kafka (MapR Streams) messages beforehand in my producer.

To query messages in MapR Streams, I use the code below in my Spark Streaming application:

 

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.ojai.store.QueryCondition.Op
import org.ojai.util.Values

import com.mapr.db.MapRDB

val ggsnArray = ... // DStream built from a file stream

ggsnArray.foreachRDD( rdd => {
  //val documents = com.mapr.streams.Streams.getMessageStore("/sample-stream", "ggsn")

  System.out.println("# events = " + rdd.count())

  rdd.foreachPartition( partition => {
    // Print statements in this section are shown in the executor's stdout logs
    val topic = "/sample-stream:ggsn"

    val properties = new Properties()
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    //properties.put("client.id", "testgroup")

    val producer = new KafkaProducer[String, String](properties)

    partition.foreach( record => {
      val data = record.toString
      //data = data.concat(",STREAM-GGSN")

      // As a debugging technique, records can be written to DBFS to verify
      // that they are being written out:
      // dbutils.fs.put("/tmp/test_kafka_output", data, true)
      val message = new ProducerRecord[String, String](topic, data)

      // Query the stream's message store for an existing message with the same value
      val condition = MapRDB.newCondition().is("value", Op.EQUAL, Values.parseBinary(message.value())).close().build()
      val documents = com.mapr.streams.Streams.getMessageStore("/sample-stream").find(condition)
      println(documents)

      producer.send(message)
      producer.flush()
    })

    producer.close()
  })
})

The above code throws the following exception:

Job aborted due to stage failure: Task 4 in stage 3.0 failed 4 times, most recent failure: Lost task 4.3 in stage 3.0 (TID 32, cluster-1): java.lang.IllegalArgumentException: java.io.IOException: Unrecognized character: ,
      at com.google.common.io.BaseEncoding.decode(BaseEncoding.java:237)
      at org.ojai.util.Values.parseBinary(Values.java:149)
      at RecoverableQueringProducer$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3.apply(RecoverableQueryingProducer.scala:189)
      at RecoverableQueringProducer$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3.apply(RecoverableQueryingProducer.scala:164)
      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
      at RecoverableQueringProducer$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(RecoverableQueryingProducer.scala:164)
      at RecoverableQueringProducer$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(RecoverableQueryingProducer.scala:146)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)

The exception is raised at the line where the condition is created:

val condition = MapRDB.newCondition().is("value", Op.EQUAL, Values.parseBinary(message.value())).close().build()


I am running the above code on MapR 5.2 in cluster mode.

I have added this line to my sbt file:
"com.mapr.streams" % "mapr-streams" % "5.2.0-mapr",

I get the above error for characters like ',' and '.', and I don't know how to resolve this problem.
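
From the stack trace, Values.parseBinary apparently base64-decodes its argument (the failure comes from Guava's BaseEncoding.decode), so raw CSV text is rejected because characters like ',' and '.' are not in the base64 alphabet. Below is a minimal sketch of what I mean; the base64 encoding step is my own assumption for illustration, not a confirmed fix, and the query would only match if the stored "value" field were encoded the same way:

import java.util.Base64
import org.ojai.util.Values

val data = "a,b.c"            // raw record text containing ',' and '.'

// Values.parseBinary(data)   // throws java.io.IOException: Unrecognized character: ,

// Encoding the text first gives parseBinary valid base64 input to decode.
val encoded = Base64.getEncoder.encodeToString(data.getBytes("UTF-8"))
val binary = Values.parseBinary(encoded)
println(binary)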
