
Issue when saving RDD and DataFrame to MapR-DB

Question asked by Kashivishwanath on May 3, 2018
Latest reply on May 16, 2018 by maprcommunity

Hello All, 

I am trying to stream data from a MapR Streams topic into MapR-DB. I am using the MapR Sandbox 5.2.1. I am attaching the code snippets and the errors I am getting; please let me know if I am missing anything, and help me fix this issue.

 

CODE SNIPPET 1 (saving RDD to MapR-DB):

// (tail of the kafkaParams map; bootstrap servers, deserializers, group id, etc. are set above)
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true")
//def saveToMapRDB(tablename: String, createTable: Boolean = true, bulkInsert: Boolean = false, idFieldPath: String = "username"): Unit = {}
val topic = "/TEST:SUPPLIER"
val topicSet = topic.split(",").toSet
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)
val lines = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
val tmp = lines.map(_.value())
tmp.print()

tmp.foreachRDD { rdd =>
  rdd.saveToMapRDB("/tmp/TEST", createTable = true)
}

ERROR: 
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"SUPPLIER_CODE"},{"type":"string","optional":true,"field":"SUPPLIER_NAME"},{"type":"string","optional":true,"field":"SUPPLIER_DESC"}],"optional":false},"payload":{"SUPPLIER_CODE":3,"SUPPLIER_NAME":"Wallmart","SUPPLIER_DESC":"Wallmart "}}

 

18/05/03 14:46:42 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
org.ojai.exceptions.DecodingException: Failed to convert the java bean to Document
at com.mapr.db.spark.codec.BeanCodec$.decode(BeanCodec.scala:23)
at com.mapr.db.spark.writers.BaseOJAIValue$$anon$2.getValue(OJAIKeyWriterHelper.scala:27)
at com.mapr.db.spark.RDD.OJAIDocumentRDDFunctions$$anonfun$saveToMapRDB$5.apply(DocumentRDDFunctions.scala:55)
at com.mapr.db.spark.RDD.OJAIDocumentRDDFunctions$$anonfun$saveToMapRDB$5.apply(DocumentRDDFunctions.scala:51)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Mismatch in writeContext. Expected ARRAY but found null
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:141)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2383)
at com.mapr.db.spark.codec.BeanCodec$.decode(BeanCodec.scala:18)
... 13 more
Caused by: java.lang.IllegalStateException: Mismatch in writeContext. Expected ARRAY but found null
at com.google.common.base.Preconditions.checkState(Preconditions.java:176)
at com.mapr.db.ojai.DBDocumentBuilder.checkContext(DBDocumentBuilder.java:290)
at com.mapr.db.ojai.DBDocumentBuilder.prepareAdd(DBDocumentBuilder.java:280)
at com.mapr.db.ojai.DBDocumentBuilder.addElementToList(DBDocumentBuilder.java:296)
at com.mapr.db.ojai.DBDocumentBuilder.add(DBDocumentBuilder.java:327)
at com.mapr.db.ojai.DBDocumentBuilder.add(DBDocumentBuilder.java:34)
at org.ojai.beans.jackson.DocumentGenerator.writeString(DocumentGenerator.java:289)
at com.fasterxml.jackson.databind.ser.std.StringSerializer.serialize(StringSerializer.java:49)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
... 15 more  
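
For reference, here is a minimal sketch of what I think the RDD write would need to look like if the plain message string has to be wrapped as an OJAI document first (my understanding is that otherwise the connector tries to treat the String as a java bean, which seems to be where the DecodingException comes from). The payload and _id handling here are assumptions on my part, not verified:

import com.mapr.db.spark._            // implicit saveToMapRDB on RDD[OJAIDocument]
import com.mapr.db.spark.MapRDBSpark

tmp.foreachRDD { rdd =>
  // Wrap each raw JSON string from the topic as an OJAI document before saving.
  // Note: the printed records carry a Kafka Connect style {"schema":..., "payload":...}
  // envelope, so the payload probably still needs to be extracted, and each document
  // needs an _id (or idFieldPath must point at an existing field).
  val docs = rdd.map(json => MapRDBSpark.newDocument(json))
  docs.saveToMapRDB("/tmp/TEST", createTable = true)
}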

 

CODE SNIPPET 2 (saving DataFrame to MapR-DB):

// (tail of the kafkaParams map; same settings as in snippet 1)
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true")
//def saveToMapRDB(tablename: String, createTable: Boolean = true, bulkInsert: Boolean = false, idFieldPath: String = "ID"): Unit = {}

val topic = "/TEST:SUPPLIER"
val topicSet = topic.split(",").toSet
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)
val lines = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
val tmp = lines.map(_.value())
tmp.print()

tmp.foreachRDD { rdd =>
  // spark is the SparkSession from the shell; the SQLContext created below is not actually used
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import spark.implicits._
  val dataframe = spark.read.json(parseAvroRdd(rdd))
  dataframe.saveToMapRDB("/tmp/Hartest533", createTable = true)
}
ssc.start()
}

 

ERROR: 
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"SUPPLIER_CODE"},{"type":"string","optional":true,"field":"SUPPLIER_NAME"},{"type":"string","optional":true,"field":"SUPPLIER_DESC"}],"optional":false},"payload":{"SUPPLIER_CODE":3,"SUPPLIER_NAME":"Wallmart","SUPPLIER_DESC":"Wallmart "}}

 

Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.mapr.db.spark.utils.MapRSpark$.save(Lorg/apache/spark/sql/Dataset;Ljava/lang/String;Ljava/lang/String;ZZ)V
at com.mapr.db.spark.sql.MapRDBDataFrameFunctions.saveToMapRDB(MapRDBDataFrameFunctions.scala:18)
at $line91.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$STREAMHANA$$anonfun$main$1.apply(<console>:132)
at $line91.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$STREAMHANA$$anonfun$main$1.apply(<console>:128)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
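
In case it is relevant: my reading of the NoSuchMethodError is that the maprdb-spark connector jar on the classpath does not match the version the saveToMapRDB call was compiled against. Below is a rough sketch of how I would expect the dependency to be declared so that it lines up with the cluster's Spark/MEP release; the artifact coordinates and version strings are placeholders/assumptions, not something I have verified against the Sandbox:

// build.sbt (sketch; group/artifact ids and versions are placeholders and must
// match the Spark and MEP versions actually installed on the Sandbox)
resolvers += "MapR Releases" at "http://repository.mapr.com/maven/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.1.0"           % "provided",
  "com.mapr.db"      %  "maprdb-spark"    % "2.1.0-mapr-1710"  // MapR-DB OJAI connector (placeholder version)
)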
