
Writing Spark Streaming output to MapR-DB

Question asked by dhinakaran on Feb 12, 2018
Latest reply on Feb 20, 2018 by vmeghraj

I am trying to write a sample Spark Streaming application that continuously monitors a MapR-FS path for new files and loads their data into a MapR-DB table.

I have followed the connector configuration steps described in "Configuring the MapR-DB Binary Connector for Apache Spark".

Below is my code, followed by the error I am getting:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

case class DataCount(msisdn: String, cost: String) extends Serializable

object DataCount extends Serializable {
  final val tableName = "/hbase/table1"
  final val cfDataBytes = Bytes.toBytes("data")
  final val colCostBytes = Bytes.toBytes("cost")

  // Parse one comma-separated input line into a DataCount record.
  def parseData(str: String): DataCount = {
    val p = str.split(",")
    DataCount(p(0), p(1))
  }

  // Convert a DataCount into the (key, Put) pair that TableOutputFormat expects.
  def convertToPut(dataCount: DataCount): (ImmutableBytesWritable, Put) = {
    val rowkey = dataCount.msisdn
    val put = new Put(Bytes.toBytes(rowkey))
    put.addColumn(cfDataBytes, colCostBytes, Bytes.toBytes(dataCount.cost))
    (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingMapRDB")
      .setMaster("local")

    // Hadoop/HBase job configuration pointing the output at the MapR-DB table.
    val conf1 = HBaseConfiguration.create()
    conf1.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val jobConfig: JobConf = new JobConf(conf1, this.getClass)
    jobConfig.set("mapreduce.output.fileoutputformat.outputdir", "/var/test")
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // Watch the directory for new files, one micro-batch per second.
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.textFileStream("/var/test/")
    val output = lines.map(DataCount.parseData)

    // Write each batch to the table via the old Hadoop output format API.
    output.foreachRDD { rdd =>
      rdd.map(DataCount.convertToPut).saveAsHadoopDataset(jobConfig)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
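
For reference, parseData assumes each line of the incoming files is a two-field CSV record: an MSISDN followed by a cost. A file dropped into /var/test/ would look something like this (the values are made up for illustration):

9876543210,4.50
9123456789,12.75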


ERROR:


18/02/12 15:24:23 ERROR Inode: getFamilyId() on '/hbase/table1' failed with error: No such file or directory (2).
18/02/12 15:24:23 ERROR Utils: Aborting task
java.io.InterruptedIOException: Cannot put to this mapr table. Reason: java.io.InterruptedIOException: com.mapr.fs.PathNotFoundException: getFamilyId() on '/hbase/table1' failed with error: No such file or directory (2).
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doMutate(BufferedMutatorImpl.java:192)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:155)
at org.apache.hadoop.hbase.mapred.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:70)
at org.apache.hadoop.hbase.mapred.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:54)
at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:95)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1212)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1218)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
at org.apache.spark.scheduler.Task.run(Task.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/02/12 15:24:23 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.InterruptedIOException: Cannot put to this mapr table. Reason: java.io.InterruptedIOException: com.mapr.fs.PathNotFoundException: getFamilyId() on '/hbase/table1' failed with error: No such file or directory (2).
(stack trace identical to the one above)
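
The PathNotFoundException in the trace means the MapR-DB client could not find a binary table at /hbase/table1, so the table (with its data column family) most likely needs to be created before the job writes to it. It can be created up front with maprcli (maprcli table create -path /hbase/table1, then maprcli table cf create -path /hbase/table1 -cfname data) or from the HBase shell. Below is a minimal Scala sketch using the standard HBase 1.x Admin API; it assumes the MapR-patched HBase client is on the classpath, since that is what allows a MapR-FS path to be used as a table name:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    try {
      // With the MapR client, this path resolves to a MapR-DB binary table.
      val table = TableName.valueOf("/hbase/table1")
      if (!admin.tableExists(table)) {
        val desc = new HTableDescriptor(table)
        desc.addFamily(new HColumnDescriptor("data")) // the family the Puts write to
        admin.createTable(desc)
      }
    } finally {
      admin.close()
      connection.close()
    }
  }
}

Once the table and its data column family exist, the saveAsHadoopDataset call above should no longer fail with getFamilyId() errors.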
