This article lists a few of the Spark SQL tuning parameters that can be set before running complex Spark SQL queries.
Spark SQL can cache tables using an in-memory columnar format by calling:
spark.catalog.cacheTable("tableName") or dataFrame.cache()
Spark SQL will then scan only the required columns and will automatically tune compression to minimize memory usage and GC pressure.
You can call spark.catalog.uncacheTable("tableName") to remove the table from memory.
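The caching and uncaching calls above can be sketched as follows. This is a minimal illustration, assuming a running SparkSession named `spark` and a temporary view registered as "tableName"; the view name is hypothetical.

```python
# Assumes an existing SparkSession `spark` and a registered table/view
# named "tableName" (illustrative name, not from the original text).
spark.catalog.cacheTable("tableName")    # cache using the in-memory columnar format

# ... run the queries that benefit from the cached columnar scans ...

spark.catalog.uncacheTable("tableName")  # release the cached table from memory
```

dataFrame.cache() achieves the same effect for a DataFrame held in a variable, while cacheTable/uncacheTable operate on tables registered in the catalog.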
The size of the batches used for columnar caching can be controlled via spark.sql.inMemoryColumnarStorage.batchSize. Larger batch sizes can improve memory utilization and compression, but risk OutOfMemoryErrors when caching data.
The number of partitions used when shuffling data for joins or aggregations is controlled by spark.sql.shuffle.partitions; the number of reduce tasks equals the number of shuffle partitions, so setting this value well directly affects performance. If the reduce side performs resource-intensive operations, increasing the shuffle partitions increases parallelism, improves resource utilization, and reduces the load per task.
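A minimal sketch of setting the shuffle partition count, assuming a SparkSession named `spark`; the value 400 is an example only and should be tuned to the cluster size and data volume (the default is 200).

```python
# Illustrative setting of the shuffle partition count; assumes a
# SparkSession `spark`. Default is 200; 400 is an example value only.
spark.conf.set("spark.sql.shuffle.partitions", "400")
```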
Broadcast join is similar to the Map Join in Hive: the smaller table is loaded into the distributed cache and the join can be performed as a map-only operation. Broadcast join is turned on by default in Spark SQL.
Broadcast join can be turned off as below:
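Setting spark.sql.autoBroadcastJoinThreshold to -1 disables broadcast joins; a minimal sketch, assuming a SparkSession named `spark`:

```python
# Disable broadcast joins by setting the threshold to -1; assumes a
# SparkSession `spark`.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```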
The same property, spark.sql.autoBroadcastJoinThreshold, also controls the maximum size of a table that will be broadcast when performing a join. The default value is 10 MB, and the property is expressed in bytes. The example below sets the maximum size to 50 MB.
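A sketch of raising the broadcast threshold to 50 MB, assuming a SparkSession named `spark` (50 MB = 52428800 bytes):

```python
# Raise the broadcast join threshold to 50 MB, expressed in bytes;
# assumes a SparkSession `spark`.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))
```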