
NameError: name 'SQLContext' is not defined

Question asked by ihijazi on Jul 28, 2016
Latest reply on Jul 28, 2016 by namato

I used to submit this job and have it execute happily. Then suddenly, submitting the exact same job in the exact same way, it started throwing the following error:

 

Error:

  File "/tmp/spark/Hive__Spark_Hive.py", line 2, in <module>

    from pyspark import SparkContext, SparkConf

  File "/opt/mapr/spark/spark-1.6.1/python/lib/pyspark.zip/pyspark/__init__.py", line 68, in <module>

  File "/opt/mapr/spark/spark-1.6.1/python/lib/pyspark.zip/pyspark/sql/__init__.py", line 47, in <module>

  File "/opt/mapr/spark/spark-1.6.1/python/lib/pyspark.zip/pyspark/sql/types.py", line 23, in <module>

    pass

  File "/tmp/spark/json.py", line 6, in <module>

    sqlContext = SQLContext(sc)

NameError: name 'SQLContext' is not defined
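
Worth noting what the trace itself says: the failure starts at the "from pyspark import SparkContext, SparkConf" line, walks through pyspark/sql/types.py inside pyspark.zip, and then lands in /tmp/spark/json.py. In other words, an import performed inside PySpark resolved to the local file json.py rather than the standard library's json module, since Python searches the submitted script's directory first. A self-contained sketch of that shadowing pattern (hypothetical temp-dir paths standing in for /tmp/spark; not from the original post):

# A local module named json.py shadows the standard library for every
# subsequent "import json" in the same interpreter.
import os, subprocess, tempfile

workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, 'json.py'), 'w') as f:
    f.write('sqlContext = SQLContext(sc)\n')   # mirrors /tmp/spark/json.py line 6
with open(os.path.join(workdir, 'main.py'), 'w') as f:
    f.write('import json\n')                   # resolves to the local json.py

# Prints the same failure as the job:
#   NameError: name 'SQLContext' is not defined
subprocess.call(['python', os.path.join(workdir, 'main.py')])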

 

Code:

# -*- coding: utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.sql import *

config = SparkConf().setAppName("json").setMaster("yarn-client")
sc = SparkContext(conf=config)
sqlContext = SQLContext(sc)
sparkVersion = reduce(lambda sum, elem: sum * 10 + elem, map(lambda x: int(x) if x.isdigit() else 0, sc.version.strip().split('.')), 0)

import sys
from datetime import *

hiveCtx = HiveContext(sc)

def convertRowToDict(row):
    ret = {}
    for num in range(0, len(row.__FIELDS__)):
        ret[row.__FIELDS__[num]] = row[num]
    return ret

import json

def getValue(type, value, format='%Y-%m-%d'):
    try:
        if type is date:
            return datetime.strptime(value, format).date()
        else:
            return type(value)
    except ValueError:
        return None

def getScaledValue(scale, value):
    try:
        return '' if value is None else ('%0.' + str(scale) + 'f') % float(value)
    except ValueError:
        return ''

def getStrValue(value, format='%Y-%m-%d'):
    if value is None: return ''
    if isinstance(value, date): return value.strftime(format)
    if isinstance(value, str): return unicode(value, 'utf-8')
    if isinstance(value, unicode): return value
    try:
        return unicode(value)
    except UnicodeError:
        return ''

def safeExpr(expr, default):
    try:
        return expr()
    except Exception:
        return default

strLoc   = 'hdfs://maprdemo:7222/user/mapr/sample_json/products.json'
protocol = 'file://' if (strLoc.startswith('/') or strLoc.startswith('\\')) else ''

# 1.1 template
CHI_1 = sqlContext.jsonFile(protocol + strLoc, samplingRatio=0.1)
CHI_1 = CHI_1.map(lambda x: x)  # workaround for SPARK-4533
CHI_1 = CHI_1.filter(lambda CHI_1: isinstance(CHI_1.dimensions, CHI_1.warehouseLocation, (list, tuple)))
FLATTEN = CHI_1.flatMap(lambda CHI_1:
    map(lambda CLX_TPE: Row(**{"id": CHI_1.id, "name": CHI_1.name, "price": CHI_1.price, "tags": CHI_1.tags, "length": CLX_TPE.length, "width": CLX_TPE.width, "height": CLX_TPE.height, "latitude": CLX_TPE.latitude, "longitude": CLX_TPE.longitude, "lantitude": CLX_TPE.lantitude}), CHI_1.dimensions, CHI_1.warehouseLocation))

hiveCtx.sql('CREATE TABLE IF NOT EXISTS products ( id STRING, name STRING, price STRING, tags STRING, length STRING, width STRING, height STRING, lantitude STRING, longitude STRING )')

FLATTEN = FLATTEN.map(lambda FLATTEN: {'id': FLATTEN.id, 'name': FLATTEN.name, 'price': FLATTEN.price, 'tags': FLATTEN.tags, 'length': FLATTEN.length, 'width': FLATTEN.width, 'height': FLATTEN.height, 'lantitude': FLATTEN.lantitude, 'longitude': FLATTEN.longitude})

(sqlTypeName, inferSchemaMethod) = ('DataFrame', 'createDataFrame') if sparkVersion >= 130 else ('SchemaRDD', 'inferSchema')

if sqlTypeName not in type(FLATTEN).__name__:
    FLATTEN = FLATTEN.map(lambda row: Row(**row) if isinstance(row, dict) else row)
    if FLATTEN.take(1).__len__() > 0:
        FLATTEN = getattr(hiveCtx, inferSchemaMethod)(FLATTEN)

if sqlTypeName in type(FLATTEN).__name__:
    # Spark 1.3 bug: FLATTEN.saveAsTable(products, mode='append')
    FLATTEN.registerTempTable('HIVE_TMP_377')
    hiveCtx.sql('INSERT INTO TABLE products \
        SELECT id, name, price, tags, length, width, height, lantitude, longitude FROM HIVE_TMP_377')
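
For reference, the reduce over sc.version packs '1.6.1' into the integer 161, so the sparkVersion >= 130 comparison is a check for Spark 1.3.0 or later, where createDataFrame replaced inferSchema. Separately, sqlContext.jsonFile has been deprecated since Spark 1.4 in favour of the DataFrameReader API; a hedged equivalent of the load above (treating samplingRatio as a data source option is an assumption here, so verify against the 1.6 docs):

# Hypothetical DataFrameReader form of the jsonFile() call above.
CHI_1 = sqlContext.read.option('samplingRatio', '0.1').json(protocol + strLoc)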

 

Command used to submit:

/opt/mapr/spark/spark-1.6.1/bin/spark-submit --master yarn-client --py-files  /tmp/spark/pyspark_ext.py --executor-memory 1G --driver-memory 512M --executor-cores 1 --driver-cores 1 --num-executors 2 --queue default /tmp/spark/Hive__Spark_Hive.py
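
Since the trace fires at line 2 of the job, before a SparkContext ever exists, none of the YARN or resource flags above are involved; the same failure should be reproducible with the bare interpreter. A quick hypothetical check, run from /tmp/spark with the same Python that spark-submit uses:

# Diagnostic sketch (not from the original post): shows which json module wins.
import sys
print(sys.path[0])    # the script's directory is searched before the stdlib
import json           # if this raises the NameError, the local json.py won
print(json.__file__)  # otherwise this points into the standard library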

 

 

Any idea what's wrong?
