
Spark MapR-DB Connector saveToMapRDB expecting sorted doc_ids

Question asked by dalbhidebipin on Jun 21, 2018
Latest reply on Jun 26, 2018 by dalbhidebipin

Hi 

I am running a Spark job using the MapR-DB API and storing the result into a MapR-DB JSON table.

While writing the data from a Spark RDD to the MapR-DB JSON table, we are seeing unexpected behavior: MapR-DB appears to expect the doc_ids in sorted order. Do we need to sort the RDD before storing the values in the DB?

My document class is as below.

import com.fasterxml.jackson.annotation.JsonProperty
case class keyChain(@JsonProperty("_id") _id: String, @JsonProperty("EID") EID: String, @JsonProperty("doc_crt_ts") doc_crt_ts: String)
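
For context, a minimal sketch of how such an RDD might look with unsorted _id values (sc is an existing SparkContext; the two _id values echo the ones in the error message below, the other values are made up for illustration):

// Two documents whose String _id values are NOT in ascending order:
val keyChainRDD = sc.parallelize(Seq(
  keyChain("12339911138713", "EID-A", "2018-06-18 12:19:18.074"),
  keyChain("12339911138712", "EID-B", "2018-06-18 12:19:18.074")  // smaller key after a larger one
))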

My RDD is a collection of keyChain objects. I saved the RDD using the saveToMapRDB method as below.

import com.mapr.db.spark._   // MapR-DB OJAI connector implicits that provide saveToMapRDB
keyChainRDD.saveToMapRDB("/outputpath/keychaintoeid_dummy2", true, true)
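
For readers, here is the same call with named parameters. The names createTable and bulkInsert are my reading of the connector's saveToMapRDB signature, so treat them as an assumption:

keyChainRDD.saveToMapRDB(
  "/outputpath/keychaintoeid_dummy2",
  createTable = true,   // create the JSON table if it does not exist
  bulkInsert  = true    // bulk-load path, which goes through BulkLoadRecordWriter
)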


I am unable to save the RDD when the _id values are not in sorted order; I see the error below.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, dbslp1494.uhc.com, executor 5): java.io.IOException: Received unsorted key-value: prev "12339911138713" cur "12339911138712"
    at com.mapr.db.mapreduce.BulkLoadRecordWriter.write(BulkLoadRecordWriter.java:187)
    at com.mapr.db.spark.writers.BulkTableWriter.write(BulkTableWriter.scala:20)
    at com.mapr.db.spark.writers.BaseOJAIValue$$anon$2.write(OJAIKeyWriterHelper.scala:28)
    at com.mapr.db.spark.RDD.OJAIDocumentRDDFunctions$$anonfun$saveToMapRDB$5.apply(DocumentRDDFunctions.scala:55)
    at com.mapr.db.spark.RDD.OJAIDocumentRDDFunctions$$anonfun$saveToMapRDB$5.apply(DocumentRDDFunctions.scala:51)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
    at org.apache.spark.scheduler.Task.run(Task.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1436)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1424)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1651)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1606)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1595)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
    at com.mapr.db.spark.RDD.OJAIDocumentRDDFunctions.saveToMapRDB(DocumentRDDFunctions.scala:51)
    at com.optum.edh.mdmdataparsers.IMDMRecordParser$.processIMDMGoldenSnapshot(IMDMRecordParser.scala:155)
    at com.optum.edh.mdmdataparsers.IMDMRecordParser$.main(IMDMRecordParser.scala:82)
    at com.optum.edh.mdmdataparsers.IMDMRecordParser.main(IMDMRecordParser.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:733)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:177)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:202)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:116)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Received unsorted key-value: prev "12339911138713" cur "12339911138712"
    at com.mapr.db.mapreduce.BulkLoadRecordWriter.write(BulkLoadRecordWriter.java:187)
    at com.mapr.db.spark.writers.BulkTableWriter.write(BulkTableWriter.scala:20)
    at com.mapr.db.spark.writers.BaseOJAIValue$$anon$2.write(OJAIKeyWriterHelper.scala:28)
    at com.mapr.db.spark.RDD.OJAIDocumentRDDFunctions$$anonfun$saveToMapRDB$5.apply(DocumentRDDFunctions.scala:55)
    at com.mapr.db.spark.RDD.OJAIDocumentRDDFunctions$$anonfun$saveToMapRDB$5.apply(DocumentRDDFunctions.scala:51)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
    at org.apache.spark.scheduler.Task.run(Task.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
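
The failing frame is com.mapr.db.mapreduce.BulkLoadRecordWriter, so I suspect it is the bulk-load path that enforces key order. The workaround I am considering, assuming the bulk loader needs _id values in ascending order, is to sort the RDD before saving. A sketch (sortBy range-partitions the RDD, so each partition holds a sorted, non-overlapping key range):

keyChainRDD
  .sortBy(_._id)   // lexicographic sort, since _id is a String
  .saveToMapRDB("/outputpath/keychaintoeid_dummy2", true, true)

Alternatively, if only the bulk-load path requires ordering, passing false for the bulkInsert flag should fall back to regular inserts, presumably at the cost of load throughput.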


But when I swap the values between the _id and EID columns, i.e., I place the EID values in the _id column and vice versa, I am able to store the data in MapR-DB. Since my EID values are in sorted order, the write to the MapR-DB JSON table succeeds. Please see the table output below.

MapR-DB Shell

maprdb bdalbhid:> find '/outputpath/keychaintoeid_dummy2'
2018-06-21 11:24:30,8967 ERROR Cidcache fs/client/fileclient/cc/cidcache.cc:1899 Thread: 2069 Lookup of volume datalake failed, error Read-only file system(30), CLDB: 10.106.33.32:7222 backing off ...
2018-06-21 11:24:30,8999 ERROR Cidcache fs/client/fileclient/cc/cidcache.cc:1899 Thread: 2069 Lookup of volume uhclake failed, error Read-only file system(30), CLDB: 10.106.33.32:7222 backing off ...
2018-06-21 11:24:30,9028 ERROR Cidcache fs/client/fileclient/cc/cidcache.cc:1899 Thread: 2069 Lookup of volume uhclake_developer failed, error Read-only file system(30), CLDB: 10.106.33.32:7222 backing off ...
{"_id":"1","EID":"12339911138712","doc_crt_ts":"2018-06-18 12:19:18.074"}
{"_id":"10","EID":"123201045983987","doc_crt_ts":"2016-12-22 02:08:55"}
{"_id":"100","EID":"123940582033789","doc_crt_ts":"2017-03-16 20:00:10"}
{"_id":"1000","EID":"12398312461987","doc_crt_ts":"2017-03-12 21:48:33"}
{"_id":"10000","EID":"123973715817454","doc_crt_ts":"2017-11-13 10:27:46.661"}
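
Note the order of the rows that find returns: since _id is a String, the keys sort lexicographically (byte-wise), not numerically, which is why "10000" comes before "2" would. So if I pre-sort the RDD, I presumably need to match that byte-wise order rather than a numeric one. A quick plain-Scala illustration of the difference:

val ids = Seq("1", "2", "10", "100", "1000")
println(ids.sorted)            // List(1, 10, 100, 1000, 2)  <- lexicographic order
println(ids.sortBy(_.toLong))  // List(1, 2, 10, 100, 1000)  <- numeric order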

