Answered (Assumed Answered)

Adding filename as column to each record of dataframe

Question asked by charanthota on May 5, 2016
Latest reply on May 7, 2016 by charanthota


How do I add the source filename as a column to each record of a DataFrame or RDD?

 

/opt/mapr/spark/spark-1.5.2/bin/spark-shell --packages com.databricks:spark-xml_2.10:0.3.3

 

val df = sqlContext.read.format("xml").option("rootTag","CardFee").option("rowTag","CardFee").option("attributePrefix","").load("/poc/GPS")

When loading the files one at a time, I tried

df.withColumn("columnName", lit("ColumnValue")), which worked for me. The problem with the file-by-file approach is that unionAll of the resulting DataFrames fails when they do not all have the same set of columns.

 

I tried getting the filename as follows:

 

import java.io.Serializable;

import org.apache.spark.rdd.NewHadoopPartition;

import org.apache.spark.Partition;

public class DeriveFileName implements Serializable{

   public String getFileName(Partition partition)

   {

       NewHadoopPartition npp = (NewHadoopPartition) partition;

       String filePath=npp.serializableHadoopSplit().value().toString();

      return filePath;

    }

}

 

scala> val df = sqlContext.read.format("xml").option("rootTag","CardFee").option("rowTag","CardFee").option("attributePrefix","").load("/poc/GPS")

df: org.apache.spark.sql.DataFrame = [Account: struct<#VALUE:string,No:bigint,Type:bigint>, Amt: struct<#VALUE:string,currency:bigint,direction:string,value:double>, Card: struct<#VALUE:string,PAN:bigint,ProgramID:string,branchcode:bigint,product:string>, CardFeeId: bigint, Desc: string, FeeAmt: struct<#VALUE:string,currency:bigint,direction:string,value:double>, FeeClass: struct<#VALUE:string,code:bigint,interchangeTransaction:string,type:bigint>, LoadUnloadId: bigint, LocalDate: bigint, ReasonCode: bigint, SettlementDate: bigint, TxId: bigint, xsi: string]

 

 

scala> val obj = new DeriveFileName

obj: DeriveFileName = DeriveFileName@3e1f6318

 

 

scala> for(part <- df.rdd.partitions){ println(obj.getFileName(part)) }

maprfs:///poc/GPS/GPS-OPTtxnexp20160303.XML:0+44048810

maprfs:///poc/GPS/GPS-OPTtxnexp20160228.XML:0+107356738

maprfs:///poc/GPS/GPS-OPTtxnexp20160304.XML:0+23584987

maprfs:///poc/GPS/GPS-OPTtxnexp20160229.XML:0+46557726

maprfs:///poc/GPS/GPS-OPTtxnexp20160305.XML:0+22444948

maprfs:///poc/GPS/GPS-OPTtxnexp20160227.XML:0+26510311

maprfs:///poc/GPS/GPS-OPTtxnexp20160301.XML:0+42816879

maprfs:///poc/GPS/GPS-OPTtxnexp20160302.XML:0+43808281

 

 

Is there a way to get the filename for each partition, then iterate over each row in that partition and add the filename as a column?

Outcomes