
ORC predicate pushdown with Spark SQL

Question asked by sgudavalliR on Oct 23, 2017
Latest reply on Nov 1, 2017 by cathy

Hello,

I am working with Spark SQL to query a Hive managed table (in ORC format).

 

My data is organized by partitions, and I was asked to create a row index entry for every 50,000 rows by setting ('orc.row.index.stride'='50000').

Let's say that, after partition pruning, there are around 50 files in which the data is organized.

Each file contains data specific to one given "cat" value, and I have set up a bloom filter on cat.
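
Roughly, the table is defined along these lines (just a sketch run through the HiveContext; I am omitting most of the data columns, and the partition-key types are approximate, only the two ORC table properties and the cat column matter here):

// Sketch of the table layout described above; only the partition keys, the cat
// column and the two ORC table properties are relevant to this question.
sqlContext.sql("""
  CREATE TABLE auditlogsv5 (id STRING, cat INT)
  PARTITIONED BY (cdt INT, catpartkey STRING, usrpartkey STRING)
  STORED AS ORC
  TBLPROPERTIES (
    'orc.row.index.stride'='50000',
    'orc.bloom.filter.columns'='cat'
  )
""")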

 

My Spark SQL query looks like this:

 

select * from logs where cdt= 20171002 and catpartkey= others and usrpartkey= logUsers and cat = 24;

 

I have set the following property in my Spark SQL context, assuming it will push the filters down:

sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
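
With that property set, the way I run the query and look at the plans is roughly this (just a sketch restating the pieces above; df is simply my name for the result):

// Run the query from above; explain(true) prints the parsed, analyzed,
// optimized and physical plans.
val df = sqlContext.sql(
  "select * from logs where cdt = 20171002 and catpartkey = others " +
  "and usrpartkey = logUsers and cat = 24")
df.explain(true)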

 

But my filters are never pushed down; it looks like only partition pruning happens, and every file inside the selected partition is still read. I don't understand why, no matter what my query is, it triggers 50 tasks and reads all the files.

 

Here are my debug logs:

 

17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0, size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
17/10/23 17:26:43 DEBUG OrcInputFormat: No ORC pushdown predicate
17/10/23 17:26:43 INFO OrcRawRecordMerger: min key = null, max key = null
17/10/23 17:26:43 INFO ReaderImpl: Reading ORC rows from maprfs:///apps/spark/logs/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0 with {include: [true, true, false, false, false, false, true, false, false, false, false, false, false, false, false, false, false, false], offset: 0, length: 9223372036854775807}
17/10/23 17:26:43 DEBUG MapRClient: Open: path = /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0
17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0, size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
17/10/23 17:26:43 DEBUG RecordReaderImpl: chunks = [range start: 67684 end: 15790993, range start: 21131541 end: 21146035]
17/10/23 17:26:43 DEBUG RecordReaderImpl: merge = [data range [67684, 15790993), size: 15723309 type: array-backed, data range [21131541, 21146035), size: 14494 type: array-backed]
17/10/23 17:26:43 DEBUG Utilities: Hive Conf not found or Session not initiated, use thread based class loader instead
17/10/23 17:26:43 DEBUG HadoopTableReader: org.apache.hadoop.hive.ql.io.orc.OrcStruct$OrcStructInspector<org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector@e8220d5>
17/10/23 17:26:43 DEBUG GeneratePredicate: Generated predicate '(input[1, IntegerType] = 27)':

 

And here is my execution plan:

== Parsed Logical Plan ==
'Limit 1000
+- 'Sort ['id DESC], true
   +- 'Project [unresolvedalias('id)]
      +- 'Filter (((('cdt = 20171002) && ('catpartkey = others)) && ('usrpartkey = logUsers)) && ('cat = 27))
         +- 'UnresolvedRelation `auditlogsv5`, None

== Analyzed Logical Plan ==
id: string
Limit 1000
+- Sort [id#165 DESC], true
   +- Project [id#165]
      +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
         +- MetastoreRelation default, auditlogsv5, None

== Optimized Logical Plan ==
Limit 1000
+- Sort [id#165 DESC], true
   +- Project [id#165]
      +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
         +- MetastoreRelation default, auditlogsv5, None

== Physical Plan ==
TakeOrderedAndProject(limit=1000, orderBy=[id#165 DESC], output=[id#165])
+- ConvertToSafe
   +- Project [id#165]
      +- Filter (cat#170 = 27)
         +- HiveTableScan [id#165,cat#170], MetastoreRelation default, logs, None, [(cdt#162 = 20171002),(catpartkey#163 = others),(usrpartkey#164 = logUsers)]
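
For comparison, here is a sketch of reading the same partition directory directly with the ORC data source rather than through the Hive table scan. The path is the one from the debug log above, and my understanding (which may well be wrong) is that spark.sql.orc.filterPushdown is meant to apply on that code path:

// Sketch: read one partition directory with the ORC reader and apply the same
// filter, to compare the resulting plan against the HiveTableScan plan above.
val orcDf = sqlContext.read.orc(
  "maprfs:///apps/spark/logs/cdt=20171002/catpartkey=others/usrpartkey=logUsers")
orcDf.filter(orcDf("cat") === 27).select("id").explain(true)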

 

 

Am I missing something here? I am on MEP 1.1.0 (Spark 1.6.1 and Hive 1.2.0).

 

Please correct me. Thank you.
