
ObjectStore: Failed to get database default, returning NoSuchObjectException

Question asked by karthikSpark on Jan 17, 2017
Latest reply on Jan 18, 2017 by karthikSpark

Hi MapR Community,

I'm trying to persist a DataFrame as a Hive table from a Spark Streaming application, and I'm getting the exceptions below.

 

./bin/spark-submit --class "HiveGenerator" --master yarn --deploy-mode client --executor-memory 2G /home/mapr/Hivegenerator/target/scala-2.10/HiveGenerator.jar
17/01/17 03:47:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/01/17 03:48:40 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/01/17 03:48:40 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException


root
|-- action: string (nullable = true)
|-- device_os_ver: string (nullable = true)
|-- device_type: string (nullable = true)
|-- event_name: string (nullable = true)
|-- item_name: string (nullable = true)
|-- lat: double (nullable = true)
|-- lon: double (nullable = true)
|-- memberid: long (nullable = true)
|-- productUpccd: long (nullable = true)
|-- tenantid: long (nullable = true)

17/01/17 03:48:53 ERROR JobScheduler: Error running job streaming job 1484642910000 ms.0
java.lang.RuntimeException: Partition column Timestamp_val not found in schema StructType(StructField(action,StringType,true), StructField(device_os_ver,StringType,true), StructField(device_type,StringType,true), StructField(event_name,StringType,true), StructField(item_name,StringType,true), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true), StructField(memberid,LongType,true), StructField(productUpccd,LongType,true), StructField(tenantid,LongType,true))
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$partitionColumnsSchema$1$$anonfun$apply$7.apply(ResolvedDataSource.scala:195)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$partitionColumnsSchema$1$$anonfun$apply$7.apply(ResolvedDataSource.scala:195)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$partitionColumnsSchema$1.apply(ResolvedDataSource.scala:194)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$partitionColumnsSchema$1.apply(ResolvedDataSource.scala:193)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.partitionColumnsSchema(ResolvedDataSource.scala:193)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.validatePartitionColumnDataTypes(PartitioningUtils.scala:333)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:194)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:111)
at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:109)
at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:105)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:218)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:39)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:38)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:43)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:43)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:251)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
at HiveGenerator$$anonfun$main$1.apply(HiveGenerator.scala:85)
at HiveGenerator$$anonfun$main$1.apply(HiveGenerator.scala:73)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" java.lang.RuntimeException: Partition column Timestamp_val not found in schema StructType(StructField(action,StringType,true), StructField(device_os_ver,StringType,true), StructField(device_type,StringType,true), StructField(event_name,StringType,true), StructField(item_name,StringType,true), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true), StructField(memberid,LongType,true), StructField(productUpccd,LongType,true), StructField(tenantid,LongType,true))
        ... (same stack trace as above)
17/01/17 03:48:53 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver

Here is my Spark app:

 

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf()
    .setAppName("HiveGenerator")
    .setMaster("yarn-client")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.speculation", "false")
  sparkConf.set("spark.cores.max", "2")
  sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
  sparkConf.set("spark.sql.tungsten.enabled", "true")
  sparkConf.set("spark.eventLog.enabled", "true")
  sparkConf.set("spark.app.id", "HiveGenerator")
  sparkConf.set("spark.io.compression.codec", "snappy")
  sparkConf.set("spark.rdd.compress", "true")
  sparkConf.set("spark.streaming.backpressure.enabled", "true")
  sparkConf.set("spark.sql.avro.compression.codec", "snappy")
  sparkConf.set("spark.sql.avro.mergeSchema", "true")
  sparkConf.set("spark.sql.avro.binaryAsString", "true")
  sparkConf.set("spark.hadoop.yarn.resourcemanager.address", "localhost:8032")
  sparkConf.set("spark.sql.hive.thriftServer.singleSession", "true")

  val sc = new SparkContext(sparkConf)
  sc.hadoopConfiguration.set("avro.enable.summary-metadata", "false")
  sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
  sc.addFile("/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/core-site.xml")
  sc.addFile("/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/hdfs-site.xml")

  val ssc = new StreamingContext(sc, Seconds(30))
  val kafkaConf = Map[String, String](
    "metadata.broker.list" -> "localhost:9092",
    "zookeeper.connect" -> "localhost:2181",
    "group.id" -> "KafkaConsumer",
    "zookeeper.connection.timeout.ms" -> "100000")
  val topicMaps = Map("topic" -> 1)
  val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

  messages.foreachRDD(rdd => {
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import sqlContext.implicits._
    val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
    import org.apache.spark.sql.functions._
    dataframe.withColumn("Timestamp_val", current_timestamp())
    dataframe.show()
    dataframe.printSchema()
    sqlContext.setConf("hive.metastore.warehouse.dir", "maprfs:///localhost/user/hive/warehouse")
    import org.apache.spark.sql.SaveMode
    val tablepath = Map("path" -> "maprfs:///user/hive/warehouse/path/of/external/table")
    dataframe.write.format("parquet").partitionBy("Timestamp_val")
      .options(tablepath).mode(SaveMode.Append).saveAsTable("table_name")
  })

  ssc.start()
  ssc.stop()
}
}
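
One thing I am not sure about: withColumn returns a new DataFrame instead of modifying dataframe in place, so the DataFrame I actually write may simply not contain Timestamp_val. A minimal sketch of what I mean (same names as in my code above; I have not confirmed this is the root cause):

    // Sketch only: capture the DataFrame returned by withColumn so that
    // Timestamp_val is actually part of the schema being written.
    val withTs = dataframe.withColumn("Timestamp_val", current_timestamp())
    withTs.printSchema() // Timestamp_val should now appear in the schema
    withTs.write
      .format("parquet")
      .partitionBy("Timestamp_val")
      .options(tablepath)
      .mode(SaveMode.Append)
      .saveAsTable("table_name")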

Here is how my external table is created:

create external table if not exists mdl_events1 (
  action string,
  device_os_ver string,
  device_type string,
  event_name string,
  item_name string,
  lat double,
  lon double,
  memberid bigint,
  productUpccd bigint,
  tenantid bigint
)
partitioned by (Timestamp_val timestamp)
stored as parquet
location 'maprfs:///user/hive/warehouse/path/of/external/table';
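
Since the log also shows "Failed to get database default", here is roughly how I check what the HiveContext can see in the metastore (sketch only, run from the same sqlContext as above; table name taken from the DDL):

    // Sketch only: confirm the HiveContext is talking to the metastore that
    // actually holds the default database and the mdl_events1 table.
    sqlContext.sql("SHOW DATABASES").show()
    sqlContext.sql("DESCRIBE FORMATTED mdl_events1").show(100, false)
    sqlContext.sql("SHOW PARTITIONS mdl_events1").show(false)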

Can anyone help me understand why I am getting this exception?
