
java.lang.RuntimeException: java.lang.ClassNotFoundException:

Question asked by abhinav on Apr 3, 2013
Latest reply on Apr 3, 2013 by Ted Dunning
I have the following driver code, which produces a ClassNotFoundException on all task trackers. All the code tries to do is create the number of map tasks specified by the user and assign those tasks to the active task trackers in round-robin fashion.

    import java.io.*;
    import java.util.*;
    import java.net.*;
    
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.util.*;
    import org.apache.hadoop.conf.*;
    import org.apache.commons.logging.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.ClusterStatus;
    import org.apache.hadoop.filecache.*;
    import org.apache.hadoop.mapreduce.lib.output.*;
    
    public class ComputeIntensiveDemo
    {
     private static final Log log = LogFactory.getLog(ComputeIntensiveDemo.class);
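     // Mapper for a map-only job: each map() call re-reads the entire file shipped via the DistributedCache and writes every line out with the incoming key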
     public static class ReadingMapper extends Mapper<LongWritable,NullWritable,LongWritable,Text>
     {
      protected Configuration conf;
      private RandomAccessFile fis;
      private long lenOfFile;
      Path[] localPath;
      Log logger = LogFactory.getLog(ReadingMapper.class);
      public void map(LongWritable key, NullWritable value, Context context) throws IOException,InterruptedException
      {
       //Read line from the file and print it out to context
       Text curLine;
       fis.seek(0);
       while(fis.getFilePointer() < lenOfFile)
       {
        curLine = new Text(fis.readLine());
        logger.debug("Read line " + curLine.toString());
        logger.debug("Line position " + fis.getFilePointer());
        context.write(key,curLine);
       }
      }
    
      protected void setup(Context ctx) throws IOException, InterruptedException
      {
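       // Locate the file that was shipped to this node through the DistributedCache and open it for reading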
       Configuration conf = ctx.getConfiguration();
       localPath = DistributedCache.getLocalCacheFiles(conf);
       if(localPath == null || localPath.length == 0)
        throw new FileNotFoundException("There are no files in distributed cache");
       fis = new RandomAccessFile(localPath[0].toString(),"r");
       lenOfFile=fis.length();
       logger.debug("Opened file " + localPath[0].toString());
      }
     
      protected void cleanup(Context ctx) throws  IOException, InterruptedException
      {
       fis.close();
       logger.debug("Closed file " + localPath[0].toString());
      }
     }
    
     static class NullInputFormat extends InputFormat<LongWritable,NullWritable>
     {
      private static final Log log = LogFactory.getLog(NullInputFormat.class);
      private int posServerList;
    
      public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException
      { return new NullRecordReader(); }
    
      public List<InputSplit> getSplits(JobContext job)
      {
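       // Create one dummy split per requested map task and pin it to an active task tracker, cycling through the tracker list in round-robin order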
       ArrayList<String> listServers = getActiveServerList(job);
       int numServers = listServers.size();
       int numSplits = ((JobConf) job.getConfiguration()).getNumMapTasks();
       log.info("Generating "+ numSplits +" splits");
       List<InputSplit> splits = new ArrayList<InputSplit>();
       for(int split=0, posServerList=0; split < numSplits; split++, posServerList=(posServerList+1)%numServers)
       {
        String[] servers = new String[1];
        servers[0] = listServers.get(posServerList);
        splits.add(new NullInputSplit(split,1,servers));
        log.debug("Found split "+split+" on "+posServerList+" "+servers[0]);
       }
       return splits;
      }
     
      private ArrayList<String> getActiveServerList(JobContext ctx)
      {
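       // Ask the JobTracker for the active task trackers and keep only the host portion of each name (the text between the first '_' and the first ':')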
       ArrayList<String> trackers =  new ArrayList<String>();
       try
       {
        JobClient jc = new JobClient((JobConf)ctx.getConfiguration());
        ClusterStatus status = jc.getClusterStatus(true);
        Collection<String> activeTrackers = status.getActiveTrackerNames();
        for(String tracker : activeTrackers)
        {
         String currentTracker = tracker.substring(tracker.indexOf("_")+1,tracker.indexOf(":"));
         trackers.add(currentTracker);
         log.debug(currentTracker);
        }
       }
       catch (Exception e) { e.printStackTrace(); }
       return trackers;
      }
     }
    
     static class NullInputSplit extends InputSplit implements Writable
     {
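      // Minimal Writable split: it carries only a starting row, a row count and the preferred host(s)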
      long firstRow,rowCount;
      String[] locations;
      public NullInputSplit() {}
      public NullInputSplit(long offset, long length,String[] hosts)
      {
       firstRow = offset;
       rowCount = length;
       locations = hosts;
      }
    
      public long getLength() throws IOException { return 0l; }
      public String[] getLocations() throws IOException { return locations; }
      public void readFields(DataInput in) throws IOException
      {
       firstRow = WritableUtils.readVLong(in);
       rowCount = WritableUtils.readVLong(in);
      }
      public void write(DataOutput out) throws IOException
      {
       WritableUtils.writeVLong(out, firstRow);
       WritableUtils.writeVLong(out, rowCount);
      }
     }
    
     static class NullRecordReader extends RecordReader<LongWritable,NullWritable>
     {
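      // Emits rowCount consecutive LongWritable keys starting at firstRow, each paired with a NullWritable value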
      long startRow,finishedRows,totalRows;
      LongWritable key = null;
    
      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
      {
       startRow = ((NullInputSplit)split).firstRow;
       finishedRows = 0;
       totalRows = ((NullInputSplit)split).rowCount;
      }
      public void close() throws IOException { }
      public LongWritable getCurrentKey() { return key; }
      public NullWritable getCurrentValue() { return NullWritable.get(); }
      public float getProgress() throws IOException { return finishedRows / (float) totalRows; }
      public boolean nextKeyValue()
      {
       if(key == null)
        key = new LongWritable();
       if (finishedRows < totalRows)
       {
        key.set(startRow + finishedRows);
        finishedRows += 1;
        return true;
       }
       else
        return false;
      }
     }
    
     public static void main(String[] args) throws Exception
     {
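      // Driver: set the requested number of map tasks, put the input file in the DistributedCache and submit a map-only job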
      Configuration conf = new Configuration();
      String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
      if(otherArgs.length != 3)
      {
       System.out.println("ComputeIntensiveDemo <input file> <output path> <num mappers>");
       System.exit(1);
      }
      conf.set("mapred.map.tasks",otherArgs[2]);
      conf.set("FILE_IN_MFS_CACHE",otherArgs[0]);
      DistributedCache.addCacheFile(new URI(otherArgs[0]),conf);
      Job job = new Job(conf,"splitter test");
      FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
      job.setJarByClass(ComputeIntensiveDemo.class);
      job.setInputFormatClass(NullInputFormat.class);
      job.setMapperClass(ReadingMapper.class);
      job.setNumReduceTasks(0); //Map only job
      job.setOutputKeyClass(LongWritable.class);
      job.setOutputValueClass(Text.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      job.waitForCompletion(true);
     }
    }
    
Here is the output from a sample job run:

    hadoop jar compute.jar ComputeIntensiveDemo /user/root/intsource.txt /user/root/wcout3 10
    13/04/03 14:07:06 INFO fs.JobTrackerWatcher: Current running JobTracker is: mapr-centos1.abhinav.prv/10.10.80.231:9001
    13/04/03 14:07:06 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
    13/04/03 14:07:06 INFO fs.JobTrackerWatcher: Current running JobTracker is: mapr-centos1.abhinav.prv/10.10.80.231:9001
    13/04/03 14:07:06 INFO ComputeIntensiveDemo$NullInputFormat: Generating 10 splits
    13/04/03 14:07:06 INFO mapred.JobClient: Creating job's output directory at /user/root/wcout3
    13/04/03 14:07:06 INFO mapred.JobClient: Creating job's user history location directory at /user/root/wcout3/_logs
    13/04/03 14:07:06 INFO mapred.JobClient: Running job: job_201303291030_0029
    13/04/03 14:07:07 INFO mapred.JobClient:  map 0% reduce 0%
    13/04/03 14:07:15 INFO mapred.JobClient: Task Id : attempt_201303291030_0029_m_000000_0, Status : FAILED on node mapr-centos1.abhinav.prv
    java.lang.RuntimeException: java.lang.ClassNotFoundException: ComputeIntensiveDemo$ReadingMapper
            at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1092)
            at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:212)
            at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:636)
            at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
            at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:396)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
            at org.apache.hadoop.mapred.Child.main(Child.java:264)
    Caused by: java.lang.ClassNotFoundException: ComputeIntensiveDemo$ReadingMapper
            at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
            at java.security.AccessController.doPrivileged(Native Method)
            at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
            at java.lang.Class.forName0(Native Method)
            at java.lang.Class.forName(Class.java:247)
            at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1039)
            at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1090)
            ... 8 more
