
Correctness of reading files and recording their contents

Question asked by vladdv on Apr 11, 2013
Latest reply on Apr 17, 2013 by peterconrad
I wrote the following Hadoop program:

package org.myorg;

import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ParallelIndexation {
  //public static native long Traveser(String Path);

  //public static native void Configure(String Path);

  //static {
  // System.loadLibrary("nativelib");
  //}
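  // Mapper: reads paths.txt and countcomputers.txt from HDFS, splits the list of paths
  // into CountComputers groups, and emits each newline-separated group as a key with value 0.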
  public static class Map extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, LongWritable> {
   private final static LongWritable zero = new LongWritable(0);
   private Text word = new Text();

   public void map(LongWritable key, Text value,
     OutputCollector<Text, LongWritable> output, Reporter reporter)
     throws IOException {
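    // Copy the two input files from the local file system into HDFS if they are not already there.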
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path localPath = new Path("/export/hadoop-1.0.1/bin/input/paths.txt");
    Path hdfsPath=new Path("hdfs://192.168.1.8:7000/user/hadoop/paths.txt");
    Path localPath1 = new Path("/usr/countcomputers.txt");              
    Path hdfsPath1=new Path("hdfs://192.168.1.8:7000/user/hadoop/countcomputers.txt");
    if (!fs.exists(hdfsPath))
    {
     fs.copyFromLocalFile(localPath, hdfsPath);
    }
    if (!fs.exists(hdfsPath1))
    {
     fs.copyFromLocalFile(localPath1, hdfsPath1);
    }
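    // Read the '|'-separated list of paths and the number of computers from the two HDFS files.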
    FSDataInputStream in = fs.open(hdfsPath);
    BufferedReader br = new BufferedReader(new InputStreamReader(in));
    String line = br.readLine();
    // String line = value.toString();
    BufferedReader br1=new BufferedReader(new InputStreamReader(fs.open(hdfsPath1)));
    int CountComputers;
    /* FileInputStream fstream = new FileInputStream(
      "/usr/countcomputers.txt");
    DataInputStream input = new DataInputStream(fstream);
    BufferedReader br = new BufferedReader(new InputStreamReader(input)); */
    String result=br1.readLine();
    CountComputers=Integer.parseInt(result);
    // in.close();
    // fstream.close();
    ArrayList<String> paths = new ArrayList<String>();
    StringTokenizer tokenizer = new StringTokenizer(line, "|");
    while (tokenizer.hasMoreTokens()) {
     paths.add(tokenizer.nextToken());
    }
    for (int i=0; i<paths.size(); i++)
     {
      System.out.println(paths.get(i));
     }
    System.out.println(CountComputers);
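    // Distribute the paths evenly across CountComputers groups; the paths within a group are joined with '\n'.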
    String[] ConcatPaths = new String[CountComputers];
    int NumberOfElementConcatPaths = 0;
    if (paths.size() % CountComputers == 0) {
     for (int i = 0; i < CountComputers; i++) {
      ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
      NumberOfElementConcatPaths += paths.size() / CountComputers;
      for (int j = 1; j < paths.size() / CountComputers; j++) {
       ConcatPaths[i] += "\n"
         + paths.get(i * paths.size() / CountComputers
           + j);
      }
     }
    } else {
     NumberOfElementConcatPaths = 0;
     for (int i = 0; i < paths.size() % CountComputers; i++) {
      ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
      NumberOfElementConcatPaths += paths.size() / CountComputers
        + 1;
      for (int j = 1; j < paths.size() / CountComputers + 1; j++) {
       ConcatPaths[i] += "\n"
         + paths.get(i
           * (paths.size() / CountComputers + 1)
           + j);
      }
     }
     for (int k = paths.size() % CountComputers; k < CountComputers; k++) {
      ConcatPaths[k] = paths.get(NumberOfElementConcatPaths);
      NumberOfElementConcatPaths += paths.size() / CountComputers;
      for (int j = 1; j < paths.size() / CountComputers; j++) {
       ConcatPaths[k] += "\n"
         + paths.get((k - paths.size() % CountComputers)
           * paths.size() / CountComputers
           + paths.size() % CountComputers
           * (paths.size() / CountComputers + 1)
           + j);
      }
     }
    }
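    // Emit each group of concatenated paths as a key with a zero value.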
    for (int i = 0; i < ConcatPaths.length; i++) {
     word.set(ConcatPaths[i]);
     output.collect(word, zero);
    }
    in.close();
   }
  }

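  // Reducer: splits the key back into individual paths and indexes each of them through native code.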
  public static class Reduce extends MapReduceBase implements
    Reducer<Text, LongWritable, Text, LongWritable> {
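   // Native JNI methods from libnativelib: Configure loads a configuration file,
   // Traveser processes one path and returns a count.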
   public native long Traveser(String Path);

   public native void Configure(String Path);

   public void reduce(Text key, Iterator<LongWritable> value,
     OutputCollector<Text, LongWritable> output, Reporter reporter)
     throws IOException {
    long count=0;
    String line = key.toString();
    ArrayList<String> ProcessedPaths = new ArrayList<String>();
    StringTokenizer tokenizer = new StringTokenizer(line, "\n");
    while (tokenizer.hasMoreTokens()) {
     ProcessedPaths.add(tokenizer.nextToken());
    }
    Configure("/export/hadoop-1.0.1/bin/nsindexer.conf");
    for (int i = 0; i < ProcessedPaths.size(); i++) {
     count = Traveser(ProcessedPaths.get(i));
    }
    output.collect(key, new LongWritable(count));
   }

   static {
    System.loadLibrary("nativelib");
   }
  }

  public static void main(String[] args) throws Exception {
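   // Configure and submit the job (old org.apache.hadoop.mapred API);
   // args[0] is the input directory, args[1] the output directory.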
   System.out.println("args[0]="+args[0]);
   JobConf conf = new JobConf(ParallelIndexation.class);
   conf.setJobName("parallelindexation");
   conf.setOutputKeyClass(Text.class);
   conf.setOutputValueClass(LongWritable.class);
   conf.setMapperClass(Map.class);
   conf.setCombinerClass(Reduce.class);
   conf.setReducerClass(Reduce.class);
   conf.setInputFormat(TextInputFormat.class);
   conf.setOutputFormat(TextOutputFormat.class);
   FileInputFormat.setInputPaths(conf, new Path(args[0]));
   FileOutputFormat.setOutputPath(conf, new Path(args[1]));
   JobClient.runJob(conf);
  }
}
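For reference, the path-grouping performed inside map() above boils down to the following shorter sketch (the sample paths and the computer count are hypothetical, and, like the original code, it assumes there are at least as many paths as computers):

import java.util.Arrays;
import java.util.List;

public class GroupingSketch {
    public static void main(String[] args) {
        // Hypothetical sample data: 5 paths to be spread over 2 computers.
        List<String> paths = Arrays.asList("/a", "/b", "/c", "/d", "/e");
        int countComputers = 2;

        String[] concatPaths = new String[countComputers];
        int base = paths.size() / countComputers;   // minimum number of paths per group
        int extra = paths.size() % countComputers;  // the first 'extra' groups get one path more
        int next = 0;
        for (int g = 0; g < countComputers; g++) {
            int groupSize = base + (g < extra ? 1 : 0);
            StringBuilder sb = new StringBuilder();
            for (int j = 0; j < groupSize; j++) {
                if (j > 0) sb.append('\n');
                sb.append(paths.get(next++));
            }
            concatPaths[g] = sb.toString();
        }

        // Prints the two groups "/a|/b|/c" and "/d|/e" (shown with '|' instead of '\n'),
        // which is the same contiguous grouping the mapper emits as keys.
        for (String group : concatPaths) {
            System.out.println(group.replace('\n', '|'));
        }
    }
}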
On the `Map` step, the local files `/export/hadoop-1.0.1/bin/input/paths.txt` (which contains the paths to the files, separated by the `|` character) and `/usr/countcomputers.txt` (which contains the number of computers on which the `TaskTracker` and `DataNode` daemons run) are copied into the `hdfs` file system:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path localPath = new Path("/export/hadoop-1.0.1/bin/input/paths.txt");
Path hdfsPath=new Path("hdfs://192.168.1.8:7000/user/hadoop/paths.txt");
Path localPath1 = new Path("/usr/countcomputers.txt");              
Path hdfsPath1=new Path("hdfs://192.168.1.8:7000/user/hadoop/countcomputers.txt");
if (!fs.exists(hdfsPath))
{
  fs.copyFromLocalFile(localPath, hdfsPath);
}
if (!fs.exists(hdfsPath1))
{
  fs.copyFromLocalFile(localPath1, hdfsPath1);
}
Then these files are read into the `CountComputers` variable and the `paths` array:

FSDataInputStream in = fs.open(hdfsPath);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line = br.readLine();
// String line = value.toString();
BufferedReader br1=new BufferedReader(new InputStreamReader(fs.open(hdfsPath1)));
int CountComputers;
/* FileInputStream fstream = new FileInputStream(
"/usr/countcomputers.txt");
DataInputStream input = new DataInputStream(fstream);
BufferedReader br = new BufferedReader(new InputStreamReader(input)); */
String result=br1.readLine();
CountComputers=Integer.parseInt(result);
// in.close();
// fstream.close();
ArrayList<String> paths = new ArrayList<String>();
StringTokenizer tokenizer = new StringTokenizer(line, "|");
while (tokenizer.hasMoreTokens()) {
  paths.add(tokenizer.nextToken());
}
Then, based on this variable and array, the paths are concatenated into groups and the concatenated groups are passed to the input of the `Reduce` function. Since I did not find any results from the `Reduce` function (the words from the files at the given paths should have been loaded into a database), I assumed that the cause of the error is that on the `Map` step the values of the `CountComputers` variable and the `paths` array were not written correctly. I therefore added output of this variable and array:

for (int i=0; i<paths.size(); i++)
{
  System.out.println(paths.get(i));
}
System.out.println(CountComputers);
When I ran the program, I got the following log:

args[0]=/export/hadoop-1.0.1/bin/input
13/04/12 10:35:30 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/04/12 10:35:30 INFO mapred.FileInputFormat: Total input paths to process : 0
13/04/12 10:35:30 INFO mapred.JobClient: Running job: job_201304121018_0003
13/04/12 10:35:31 INFO mapred.JobClient:  map 0% reduce 0%
13/04/12 10:35:45 INFO mapred.JobClient:  map 0% reduce 100%
13/04/12 10:35:50 INFO mapred.JobClient: Job complete: job_201304121018_0003
13/04/12 10:35:50 INFO mapred.JobClient: Counters: 15
13/04/12 10:35:50 INFO mapred.JobClient:   Job Counters
13/04/12 10:35:50 INFO mapred.JobClient:     Launched reduce tasks=1
13/04/12 10:35:50 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=9323
13/04/12 10:35:50 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/04/12 10:35:50 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/04/12 10:35:50 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=4459
13/04/12 10:35:50 INFO mapred.JobClient:   File Output Format Counters
13/04/12 10:35:50 INFO mapred.JobClient:     Bytes Written=0
13/04/12 10:35:50 INFO mapred.JobClient:   FileSystemCounters
13/04/12 10:35:50 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=21536
13/04/12 10:35:50 INFO mapred.JobClient:   Map-Reduce Framework
13/04/12 10:35:50 INFO mapred.JobClient:     Reduce input groups=0
13/04/12 10:35:50 INFO mapred.JobClient:     Combine output records=0
13/04/12 10:35:50 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/04/12 10:35:50 INFO mapred.JobClient:     Reduce output records=0
13/04/12 10:35:50 INFO mapred.JobClient:     Spilled Records=0
13/04/12 10:35:50 INFO mapred.JobClient:     Total committed heap usage (bytes)=16252928
13/04/12 10:35:50 INFO mapred.JobClient:     Combine input records=0
13/04/12 10:35:50 INFO mapred.JobClient:     Reduce input records=0
Why were the values of this variable and array not printed? And, in general, is the reading from the files and the writing of the read values into the variable and the array done correctly?
