
Spark code works in the shell but fails when submitted as a job

Question asked by charanthota on May 1, 2016
Latest reply on May 2, 2016 by charanthota

Hi,

 

I am trying to run a piece of code that flattens an XML file based on a JSON mapping config, using spark-xml to do the parsing.

When I step through the code in the Spark shell, I can see the data getting flattened into a table.
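For reference, the mapping lives in a MapR-DB JSON table called MyMappings, keyed by processor, with one block of field-to-column aliases per transaction type. The shape below is illustrative only (the real document is much larger; the aliases here are taken from the generated queries shown further down):

// Illustrative shape of the MyMappings row with _id "GPS" (assumed, simplified)
val gpsMappingExample = """{
  "_id": "GPS",
  "FieldMappings": {
    "CardFee": {
      "Account": { "No": "CARDHOLDERACCOUNTNUMBER", "Type": "ASSOCIATION" },
      "LocalDate": "TXNORIGINDATE"
    }
  }
}"""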

 

/opt/mapr/spark/spark-1.5.2/bin/spark-shell --packages com.databricks:spark-xml_2.10:0.3.2

import org.apache.spark.sql.types.StructType
import scala.reflect.runtime.universe
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.avg
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.fs.FileSystem
import java.io.File
import org.ojai.Document
import com.mapr.db.MapRDB
import com.mapr.db.Table

 

 

 

 

// Recursively walks the DataFrame schema and builds the projection list
// "<field> as <alias>,". The alias for each leaf field is looked up in the
// mapping document under fieldPath; if no mapping is found, the field name
// itself is used as the alias.
def formString(sf: StructType, my: StringBuilder, head: String, fieldPath: String, doc: Document): String = {
  var prefix = head
  if (head.equalsIgnoreCase("EMPTY")) prefix = "" else prefix = prefix + "."
  for (ss <- sf.seq) {
    val path = fieldPath + "." + ss.name
    if (ss.dataType.isInstanceOf[org.apache.spark.sql.types.StructType]) {
      // Nested struct: recurse with the extended column prefix and mapping path
      my.append(formString(ss.dataType.asInstanceOf[org.apache.spark.sql.types.StructType], new StringBuilder, prefix + ss.name, path, doc))
    } else if (!(ss.name.equalsIgnoreCase("xsi") || ss.name.equalsIgnoreCase("#VALUE"))) {
      var docVal = doc.getString(path)
      if (docVal == null) {
        println(path)
        println(docVal)
        docVal = ss.name
      }
      my.append(prefix).append(ss.name).append(" as ").append(docVal).append(",")
    }
  }
  my.toString
}
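To make the recursion concrete, here is a minimal sketch (schema and aliases hand-built for illustration, matching the generated query shown further down) of what formString returns for one nested struct:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical two-field schema: an Account struct with No and Type children.
val accountSchema = StructType(Seq(
  StructField("Account", StructType(Seq(
    StructField("No", StringType),
    StructField("Type", StringType))))))

// Assuming the mapping document has
//   FieldMappings.CardFee.Account.No   = "CARDHOLDERACCOUNTNUMBER"
//   FieldMappings.CardFee.Account.Type = "ASSOCIATION"
// the call
//   formString(accountSchema, new StringBuilder, "EMPTY", "FieldMappings.CardFee", record)
// returns "Account.No as CARDHOLDERACCOUNTNUMBER,Account.Type as ASSOCIATION,";
// the trailing comma is dropped later with dropRight(1).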

 

 

val processor = "GPS"

val txn = "CardFee"

val file = "/poc/"+processor+".xml"

val configFormat = "FieldMappings."+txn

val saveTable = "/projects/hbase/" + processor

val table = MapRDB.getTable("MyMappings")

val record = table.findById(processor)

val df = sqlContext.read.format("xml").option("rootTag",txn).option("rowTag",txn).option("attributePrefix","").load(file)

df.registerTempTable(txn)

val query = new StringBuilder

var genQuery = formString(df.schema, new StringBuilder, "EMPTY", configFormat , record)

query.append("select ").append(genQuery.dropRight(1)).append(" from ").append(txn)

val tdf = sqlContext.sql(query.toString)

tdf.show

 

This works perfectly in the shell, but when I run the same code through spark-submit it fails, throwing different errors for different XML tags.

 

/opt/mapr/spark/spark-1.5.2/bin/spark-submit --driver-class-path `hbase classpath` --master local[2] --jars spark-xml_2.10-0.3.2.jar --class examples.Test sparkstreamhbaseapp-1.0.jar GPS CardAuthorisation
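For reference, the driver in examples.Test runs the same steps as the shell session above; since the shell creates sc and sqlContext for me, the jar has to build them itself. A minimal sketch of that setup (class body trimmed; the app name and the use of a plain SQLContext are assumptions):

package examples

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Test {
  def main(args: Array[String]): Unit = {
    // args(0) = processor (e.g. "GPS"), args(1) = transaction tag (e.g. "CardAuthorisation")
    val Array(processor, txn) = args
    val conf = new SparkConf().setAppName("XmlFlattenJob") // app name assumed
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc) // assumed: plain SQLContext, not the HiveContext the 1.5 shell provides
    // ... same formString / read XML / registerTempTable / sql steps as above ...
  }
}

The first failure, for CardAuthorisation, is: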

 

Exception in thread "main" java.lang.RuntimeException: [1.1925] failure: ``*'' expected but `group' found

 

 

select Account.No as CARDHOLDERACCOUNTNUMBER,Account.Type as ASSOCIATION,ApprCode as APPROVALCODE,AuthId as TRANSACTIONID,AuthTxnID as ADDITIONALTRANSACTIONID,BillAmt.currency as BILLAMOUNTCURRENCY,BillAmt.rate as BILLAMOUNTRATE,BillAmt.value as BILLAMOUNT,Card.PAN as PAN,Card.branchcode as AGENTBRANCHCODE,Card.product as CARDPRODUCT,Card.programid as PROGRAMID,CashbackAmt.currency as CASHBACKAMOUNTCURRENCY,CashbackAmt.value as CASHBACKAMOUNT,Classification.MCC as MCC,Classification.RCC as RCC,CommissionAmt.currency as COMMISSIONAMOUNTCURRENCY,CommissionAmt.value as COMMISSIONAMOUNT,Fixed_Fee.value as FIXEDFEEAMOUNT,LocalDate as TXNORIGINDATE,MerchCode as MERCHANTCODE,MsgSource.domesticMaestro as DOMESTICMAESTROFLAG,MsgSource.value as MESSAGESOURCE,OrigTxnAmt.currency as ORIGINALTRANSACTIONAMOUNTCURRENCY,OrigTxnAmt.value as ORIGINALTRANSACTIONAMOUNT,PaddingAmt.currency as PADDINGAMOUNTCURRENCY,PaddingAmt.value as PADDINGAMOUNT,Rate_Fee.value as RATEFEEAMOUNT,RecType as RECORDTYPE,Response.Actioncode as ACTIONCODEAPPROVAL,Response.AdditionalDesc as RESPONSEADDITIONALDESCRIPTION,Response.Approved as RESPONSEAPPROVEDSTATUS,Response.Responsecode as ACTIONCODERESPONSE,ReversalReason as REVERSALREASON,Schema as SCHEMA,SettlementDate as SETTLEMENTDATE,Term.authcapability as CARDAUTHENTICATIONCAPABILITY,Term.city as TERMINALCITY,Term.code as TERMINALCODE,Term.country as TERMINALCOUNTRY,Term.inputcapability as CARDINPUTCAPABILITY,Term.location as TERMINALLOCATION,Term.street as TERMINALSTREET,Trace.Retrefno as RRN,Trace.auditno as STAN,Trace.origauditno as STANORIGINAL,Txn.cardauthentity as CARDHOLDERAUTHENTICATIONENTITY,Txn.cardauthmethod as CARDAUTHENTICATIONMETHOD,Txn.cardholderpresent as CARDHOLDERPRESENCE,Txn.cardinputmethod as CARDINPUTMETHOD,Txn.cardpresent as CARDPRESENCE,TxnAmt.currency as TRANSACTIONAMOUNTCURRENCY,TxnAmt.value as TRANSACTIONAMOUNT,TxnCode.FeeWaivedOff as FEEPENDING,TxnCode.Group as TRANSACTIONSUBCODE,TxnCode.Partial as FEEPARTIALCHARGE,TxnCode.ProcCode as TRANSACTIONPROCESSINGCODE,TxnCode.Type as TRANSACTIONCODE,TxnCode.direction as CREDITDEBITINDICATOR from CardAuthorisation

                                                                                                                                                                                                                                                                                                                             ^

        at scala.sys.package$.error(package.scala:27)

        at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)

        at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)

        at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:185)

        at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:185)

        at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:115)

        at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)

        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)

        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)

        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

        at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)

        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)

        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)

        at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)

        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)

        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)

        at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)

        at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)

        at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)

        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

        at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)

        at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)

        at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)

        at org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:182)

        at org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:182)

        at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:42)

        at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:205)

        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:741)

        at examples.Test$.main(Test.scala:61)

        at examples.Test.main(Test.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)

        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)

        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)

        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

2016-05-01 19:55:54,964 INFO  [Thread-3] spark.SparkContext: Invoking stop() from shutdown hook

-----------------------------------------------------------------------------------------------------------------------

Case 2:

============================================================================

select Account.No as CARDHOLDERACCOUNTNUMBER,Account.Type as ASSOCIATION,Amt.currency as FEEAMOUNTCURRENCY,Amt.direction as FEECREDITDEBITINDICATOR,Amt.value as FEEAMOUNT,Card.PAN as PAN,Card.ProgramID as PROGRAMID,Card.branchcode as AGENTBRANCHCODE,Card.product as CARDPRODUCT,CardFeeId as TRANSACTIONID,Desc as ADJUSTDESC,FeeAmt.currency as TRANSACTIONFEECURRENCY,FeeAmt.direction as TRANSACTIONFEECREDITDEBITINDICATOR,FeeAmt.value as TRANSACTIONFEEAMOUNT,FeeClass.code as CARDHOLDERFEETYPEINDICATOR,FeeClass.interchangeTransaction as INTERCHANGEFEEINDICATOR,FeeClass.type as FEETYPEINDICATOR,LoadUnloadId as IDLOADUNLOADTRANSACTION,LocalDate as TXNORIGINDATE,ReasonCode as REASONCODE,SettlementDate as SETTLEMENTDATE,TxId as SETTLEMENTPRESENTMENTID from CardFee

============================================================================

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'desc' given input columns xsi, LocalDate, Account, Desc, FeeClass, ReasonCode, FeeAmt, TxId, Card, LoadUnloadId, SettlementDate, Amt, CardFeeId;

        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56)

        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:53)

        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)

        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)

        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)

        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:292)

        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290)

        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290)

        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)

        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

        at scala.collection.Iterator$class.foreach(Iterator.scala:727)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

        at scala.collection.AbstractIterator.to(Iterator.scala:1157)

        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

        at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)

        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:290)

        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:107)

        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:117)

        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:121)

        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

        at scala.collection.immutable.List.foreach(List.scala:318)

        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

        at scala.collection.AbstractTraversable.map(Traversable.scala:105)

        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:121)

        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:125)

        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

        at scala.collection.Iterator$class.foreach(Iterator.scala:727)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

        at scala.collection.AbstractIterator.to(Iterator.scala:1157)

        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:125)

        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:53)

        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)

        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:103)

        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:49)

        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)

        at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:930)

        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)

        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)

        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:741)

        at examples.Test$.main(Test.scala:61)

        at examples.Test.main(Test.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)

        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)

        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)

        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

2016-05-01 19:57:05,748 INFO  [Thread-3] spark.SparkContext: Invoking stop() from shutdown hook

 

 

I am clueless as to what could be causing this odd behaviour. Could someone shed some light?
