
Creating a Spark RDD from a MapR-DB table succeeds, but returns no rows

Question asked by imichaeldotorg on Dec 29, 2014
Latest reply on May 24, 2016 by maprcommunity
I've got Spark 1.1.0 running on my MapR 4.0.1 cluster, using Scala 2.10.4 and Java 1.7.0_71.

I'm trying to load a MapR-DB table into a Spark RDD, using the example code from https://www.mapr.com/developercentral/code/loading-hbase-tables-spark

When I run the script below in spark-shell, everything succeeds (including creating the MapR-DB table and inserting data), but the RDD returns a .count() of zero. The same thing happens with existing MapR-DB tables that were not created by this script.

(Note that "sc" is set automatically when starting spark shell via /opt/mapr/spark/spark-1.1.0/bin/spark-shell)

    import org.apache.spark._
    import org.apache.spark.rdd.NewHadoopRDD
    import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
    import org.apache.hadoop.hbase.client.HBaseAdmin
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.HColumnDescriptor
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.client.HTable
   
    val conf = HBaseConfiguration.create()
    
    val tableName = "/user/mapr/mmtest19"
    
    conf.addResource(new Path("file:///opt/mapr/hbase/hbase-0.94.21/conf/hbase-site.xml"))
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    
    // create m7 table with column family
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      print("Creating M7 Table")
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor("cf1".getBytes()))
      admin.createTable(tableDesc)
    } else {
      print("Table already exists!!")
      val columnDesc = new HColumnDescriptor("cf1")
      admin.disableTable(Bytes.toBytes(tableName))
      admin.addColumn(tableName, columnDesc)
      admin.enableTable(Bytes.toBytes(tableName))
    }
    
    //put data into table
    val myTable = new HTable(conf, tableName)
    for (i <- 0 to 500) {
      val p = new Put(("row" + i).getBytes())
      p.add("cf1".getBytes(), "column-1".getBytes(), ("value " + i).getBytes())
      myTable.put(p)
    }
    myTable.flushCommits()
    
    //create rdd
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    
    //get the row count
    val count = hBaseRDD.count()
    print("HBase RDD count:"+count)

**Output**: HBase RDD count:0
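
For what it's worth, one way to check that the rows are actually in the table, independent of the Spark input-format path, is a direct client-side scan. This is a minimal sketch using the same HBase 0.94 client API as the script above, run in the same shell session so that conf and tableName are already defined:

    import org.apache.hadoop.hbase.client.Scan

    // Scan the table directly with the HBase client, bypassing Spark entirely,
    // to confirm whether the puts above actually landed in MapR-DB.
    val scanTable = new HTable(conf, tableName)
    val scanner = scanTable.getScanner(new Scan())
    var rows = 0
    var r = scanner.next()
    while (r != null) {
      rows += 1
      r = scanner.next()
    }
    scanner.close()
    print("Direct scan row count: " + rows)

If the scan sees the 501 rows while the RDD still counts zero, the problem presumably sits in how TableInputFormat picks up the configuration rather than in the writes themselves.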

Any thoughts? My spark-env.sh does include the MapR HBase jars on the classpath:

    SPARK_DAEMON_CLASSPATH=$SPARK_DAEMON_CLASSPATH::/opt/mapr/hadoop/hadoop-0.20.2/conf:/opt/mapr/hadoop/hadoop-0.20.2/lib/commons-configuration-1.6.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/commons-logging-1.0.4.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/guava-13.0.1.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/hadoop-0.20.2-dev-core.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/hadoop-auth-2.4.1-mapr-1408.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/mapr-hbase-4.0.1-mapr.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/zookeeper-3.4.5-mapr-1406.jar:/opt/mapr/hbase/hbase-0.94.21/conf:/opt/mapr/hbase/hbase-0.94.21/hbase-0.94.21-mapr-1409-tests.jar:/opt/mapr/lib/baseutils-4.0.1-mapr.jar:/opt/mapr/lib/commons-collections-3.2.1.jar:/opt/mapr/lib/commons-lang-2.5.jar:/opt/mapr/lib/hadoop-common-2.4.1.jar:/opt/mapr/lib/json-20080701.jar:/opt/mapr/lib/libprotodefs-4.0.1-mapr.jar:/opt/mapr/lib/maprfs-4.0.1-mapr.jar:/opt/mapr/lib/maprutil-4.0.1-mapr.jar:/opt/mapr/lib/protobuf-java-2.5.0.jar::/opt/mapr/hbase/hbase-0.94.21/hbase-0.94.21-mapr-1409.jar:/opt/mapr/hbase/hbase-0.94.21/conf/
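
And in case the classpath itself is the culprit, here is a quick check of which jar TableInputFormat is actually loaded from (a minimal sketch; as I understand it, on a MapR cluster this should resolve to the MapR HBase build rather than a stock Apache one):

    // Print the jar that TableInputFormat was loaded from, to rule out
    // accidentally picking up a different HBase build on the classpath.
    // getCodeSource can be null for bootstrap classes, hence the Option.
    val loc = Option(classOf[TableInputFormat].getProtectionDomain.getCodeSource)
      .map(_.getLocation.toString)
      .getOrElse("(unknown)")
    print("TableInputFormat loaded from: " + loc)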
