
Simple Spark Tips #1

Blog Post created by MichaelSegel on Oct 19, 2017

Many developers are switching over to using Spark and Spark SQL as a way to ingest and use data. As an example, you could be asked to take a .csv file and convert it into a Parquet file, or even a Hive or MapR-DB table.

 

With Spark, it's very easy to do this... you just load the file into a DataFrame/Dataset and then write it back out as a Parquet file, and you're done. The code to create the DataFrame:

val used_car_databaseDF = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load(used_cars_databaseURL);

where used_cars_databaseURL is a String containing the path to the file, which I had defined earlier in my code.
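The write step is just as short. As a quick sketch (the output path parquetOutputPath is a placeholder, not something defined earlier in this post):

val parquetOutputPath = "/path/to/output/used_cars.parquet" // placeholder path

used_car_databaseDF.write
        .mode("overwrite") // replace the output from any previous run
        .parquet(parquetOutputPath)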

 

But suppose you want to work with the data as a SQL table? Spark allows you to create temporary tables/views of the data, registering the DataFrame (DF) under a table 'alias'. 

used_car_databaseDF.createOrReplaceTempView("used_cars");

Here I've created a table/view named used_cars, which I can now use in a spark.sql command.

spark.sql("SELECT COUNT(*)  FROM used_cars ").show();

Obviously this is a simple example, just to show that you can run a query and see its output.

If you're working with a lot of different tables, it's easy to lose track of the tables/views that you've created.

 

But Spark does have a couple of commands that let you view the list of tables you have already set up for use: the spark.catalog. Below is some sample code pulled from my notebook, where I have been experimenting with Spark and MapR.

import org.apache.spark.sql.catalog
import org.apache.spark.sql.SparkSession

spark.catalog
spark.catalog.listTables.show
//Note: We need to look at listing columns for each table...
spark.catalog.listColumns("creditcard").show // Another test table

// Now lets try to run thru catalog
println("Testing walking thru the table catalog...")
val tableDS = spark.catalog.listTables
println("There are "+ tableDS.count + " rows in the catalog...")

tableDS.printSchema // Prints the structure of the objects in the dataSet

tableDS.collect.foreach{
    e => println(e.name) }

// Now trying a different way...

spark.catalog.listTables.collect.foreach{
    e =>
    val n = e.name
    println("Table Name: "+n)
    spark.catalog.listColumns(n).collect.foreach{ e => println("\t"+e.name+"\t"+e.dataType) }
}
   

Note: While I am not sure if I needed to pull in org.apache.spark.sql.SparkSession, it doesn't seem to hurt.

 

In the sample code, I use the show() method, which formats the output and displays it. However, by default show() only displays the first 20 rows of output, regardless of the source. This can be problematic, especially if you have more than 20 temp tables in your session, or if a table you inspect has more than 20 columns.
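If 20 rows isn't enough, show() also accepts an explicit row count, plus a second flag to turn off column truncation. A couple of quick examples:

spark.catalog.listTables.show(100)                        // up to 100 rows
spark.catalog.listColumns("creditcard").show(100, false)  // 100 rows, columns not truncated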

For more information on the spark catalog, please see:

https://spark.apache.org/docs/2.1.1/api/scala/index.html#org.apache.spark.sql.catalog.Catalog 

listTables() is a method that returns a Dataset of Table objects describing the tables and temporary views that I have created.
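Because it returns a Dataset, the usual Dataset operations apply. For example, here's a small sketch that keeps only the temporary views before printing their names:

// Keep only temporary views; isTemporary is a field of the Table objects (see the schema below)
val tempViewsDS = spark.catalog.listTables.filter(_.isTemporary)
tempViewsDS.collect.foreach{ t => println(t.name) }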

 

As I said earlier, while show() will give you nicely formatted output, by default it limits you to the first 20 rows.

So let's instead print out our own list of tables. I used the printSchema() method to identify the elements of the Table object. The output looks like this:

root
|-- name: string (nullable = true)
|-- database: string (nullable = true)
|-- description: string (nullable = true)
|-- tableType: string (nullable = true)
|-- isTemporary: boolean (nullable = false)

For my first example, I'm walking through the list of tables and printing out each table's name.

 

In my second example, for each table I print out the table's schema (in this example, only the column name and its data type). This works well when you have more than 20 tables, or more than 20 columns in a table.

 

If we put this all together, we can load a file, apply filters, and then store the data. Even without knowing the schema up front, it's still possible to determine the data set's schema and then use that information to dynamically create a Hive table or to put the data into a MapR-DB table.
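As a rough sketch of that idea (the column name "price" and the table name "used_cars_filtered" are placeholders, and I'm assuming the file is read with inferSchema so the column types are meaningful):

// Re-read the file with schema inference so the column types are usable
val carsDF = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("mode", "DROPMALFORMED")
        .load(used_cars_databaseURL)

// Inspect the inferred schema programmatically
carsDF.schema.fields.foreach{ f => println(f.name + "\t" + f.dataType) }

// Apply a filter and store the result as a table in the metastore
carsDF.filter("price > 1000")
      .write
      .mode("overwrite")
      .saveAsTable("used_cars_filtered")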
