
Hadoop: java.io.IOException: Pass a Delete or a Put

Question asked by ans4175 on Jan 5, 2015
Latest reply on Jan 6, 2015 by ans4175
I got this error log on the console:

    java.io.IOException: Pass a Delete or a Put
        at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
        at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:586)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
    15/01/06 14:13:34 INFO mapred.JobClient: Job complete: job_local259887539_0001
    15/01/06 14:13:34 INFO mapred.JobClient: Counters: 19
    15/01/06 14:13:34 INFO mapred.JobClient:   File Input Format Counters
    15/01/06 14:13:34 INFO mapred.JobClient:     Bytes Read=0
    15/01/06 14:13:34 INFO mapred.JobClient:   FileSystemCounters
    15/01/06 14:13:34 INFO mapred.JobClient:     FILE_BYTES_READ=12384691
    15/01/06 14:13:34 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=12567287
    15/01/06 14:13:34 INFO mapred.JobClient:   Map-Reduce Framework
    15/01/06 14:13:34 INFO mapred.JobClient:     Reduce input groups=0
    15/01/06 14:13:34 INFO mapred.JobClient:     Map output materialized bytes=8188
    15/01/06 14:13:34 INFO mapred.JobClient:     Combine output records=0
    15/01/06 14:13:34 INFO mapred.JobClient:     Map input records=285
    15/01/06 14:13:34 INFO mapred.JobClient:     Reduce shuffle bytes=0
    15/01/06 14:13:34 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
    15/01/06 14:13:34 INFO mapred.JobClient:     Reduce output records=0
    15/01/06 14:13:34 INFO mapred.JobClient:     Spilled Records=285
    15/01/06 14:13:34 INFO mapred.JobClient:     Map output bytes=7612
    15/01/06 14:13:34 INFO mapred.JobClient:     Total committed heap usage (bytes)=1029046272
    15/01/06 14:13:34 INFO mapred.JobClient:     CPU time spent (ms)=0
    15/01/06 14:13:34 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
    15/01/06 14:13:34 INFO mapred.JobClient:     SPLIT_RAW_BYTES=77
    15/01/06 14:13:34 INFO mapred.JobClient:     Map output records=285
    15/01/06 14:13:34 INFO mapred.JobClient:     Combine input records=0
    15/01/06 14:13:34 INFO mapred.JobClient:     Reduce input records=0


I get this when trying to implement CopyTable in Scala, based on http://hbase.apache.org/book/mapreduce.example.html#mapreduce.example.readwrite.
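
As far as I can tell, the exception comes from the record writer in TableOutputFormat, which only accepts Put or Delete values. The snippet below is only my paraphrase of the check the stack trace points at, not the literal HBase code (the object and method names are made up for illustration):

    import java.io.IOException
    import org.apache.hadoop.hbase.client.{Delete, HTable, Put}

    // Illustration only: rough paraphrase of what TableOutputFormat's record
    // writer does with each output value, as I read the HBase source the stack
    // trace points at. Anything that is neither a Put nor a Delete fails with
    // "Pass a Delete or a Put".
    object WriteCheckSketch {
      def writeToTable(table: HTable, value: Any): Unit = value match {
        case p: Put    => table.put(p)
        case d: Delete => table.delete(d)
        case _         => throw new IOException("Pass a Delete or a Put")
      }
    }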

Here is my example code. Is there a better way to do this?

    

    package com.example
     
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.HBaseAdmin
    import org.apache.hadoop.hbase.client.HTable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.client.Get
    import java.io.IOException
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.io._
    import org.apache.hadoop.hbase.mapreduce._
    import org.apache.hadoop.io._
    import org.apache.hadoop.mapreduce._
    import scala.collection.JavaConversions._
     
    case class HString(name: String) {
            lazy val bytes = name.getBytes
            override def toString = name
    }
    object HString {
            import scala.language.implicitConversions
            implicit def hstring2String(src: HString): String = src.name
            implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
    }
     
    object Families {
            val stream = HString("stream")
            val identity = HString("identity")
    }
    object Qualifiers {
            val title = HString("title")
            val url = HString("url")
            val media = HString("media")
            val media_source = HString("media_source")
            val content = HString("content")
            val nolimitid_timestamp = HString("nolimitid.timestamp")
            val original_id = HString("original_id")
            val timestamp = HString("timestamp")
            val date_created = HString("date_created")
            val count = HString("count")
    }
    object Tables {
            val rawstream100 = HString("raw_stream_1.0.0")
            val rawstream = HString("rawstream")
    }
     
    class tmapper extends TableMapper[ImmutableBytesWritable, Put] {
      def map(row: ImmutableBytesWritable, value: Result, context: Context) {
        // copy every KeyValue of the input row into a Put with the same row key
        val put = new Put(row.get())
        for (kv <- value.raw()) {
          put.add(kv)
        }
        context.write(row, put)
      }
    }
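    // (TableMapper[KEYOUT, VALUEOUT] fixes the map input types to
    //  ImmutableBytesWritable and Result; the two type parameters above are the
    //  mapper's output key and value types.)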
     
    object Hello {
      val hbaseMaster = "127.0.0.1:60000"
      val hbaseZookeper = "127.0.0.1"
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.master", hbaseMaster)
        conf.set("hbase.zookeeper.quorum", hbaseZookeper)
        val hbaseAdmin = new HBaseAdmin(conf)

        val job = Job.getInstance(conf, "CopyTable")
        job.setJarByClass(classOf[Hello])
        job.setMapperClass(classOf[tmapper])
        job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setMapOutputValueClass(classOf[Result])
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])

        val scan = new Scan()
        scan.setCaching(500)         // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false)   // don't set to true for MR jobs

        TableMapReduceUtil.initTableMapperJob(
          Tables.rawstream100.bytes, // input HBase table name
          scan,                      // Scan instance to control CF and attribute selection
          classOf[tmapper],          // mapper class
          null,                      // mapper output key class (left null, following the book example)
          null,                      // mapper output value class (left null, following the book example)
          job
        )

        TableMapReduceUtil.initTableReducerJob(
          Tables.rawstream,          // output table name
          null,                      // reducer class (null -> IdentityTableReducer)
          job
        )

        val b = job.waitForCompletion(true)
        if (!b) {
          throw new IOException("error with job!")
        }
      }
    }
     
    class Hello {}

Thank you again
