
Syntax of option -files in hadoop script

Question asked by vladdv on Nov 2, 2012
Latest reply on Feb 13, 2013 by gera
I wrote this Hadoop program:

package org.myorg;

import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ParallelIndexation {

    // The native methods are declared static so that they can be called
    // from the static Map and Reduce classes below.
    public static native long Traveser(String path);
    public static native void Configure(String path);

    static {
        // Loads libnativelib.so, which must be on java.library.path
        // on every task node.
        System.loadLibrary("nativelib");
    }
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable zero = new IntWritable(0);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            int CountComputers;
            FileInputStream fstream = new FileInputStream("/usr/countcomputers.txt"); // path to the file
            BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
            String result = br.readLine(); // read the line as a string
            CountComputers = Integer.parseInt(result); // convert the string to a number
            br.close(); // also closes the underlying stream
            ArrayList<String> paths = new ArrayList<String>();
            StringTokenizer tokenizer = new StringTokenizer(line, "\n");
            while (tokenizer.hasMoreTokens()) {
                paths.add(tokenizer.nextToken());
            }
            // Split the paths into CountComputers groups and join each group
            // into a single newline-separated string.
            String[] ConcatPaths = new String[CountComputers];
            int NumberOfElementConcatPaths = 0;
            if (paths.size() % CountComputers == 0) {
                for (int i = 0; i < CountComputers; i++) {
                    ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
                    NumberOfElementConcatPaths += paths.size() / CountComputers;
                    for (int j = 1; j < paths.size() / CountComputers; j++) {
                        ConcatPaths[i] += "\n" + paths.get(i * paths.size() / CountComputers + j);
                    }
                }
            } else {
                NumberOfElementConcatPaths = 0;
                // The first paths.size() % CountComputers groups get one extra path.
                for (int i = 0; i < paths.size() % CountComputers; i++) {
                    ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
                    NumberOfElementConcatPaths += paths.size() / CountComputers + 1;
                    for (int j = 1; j < paths.size() / CountComputers + 1; j++) {
                        ConcatPaths[i] += "\n" + paths.get(i * (paths.size() / CountComputers + 1) + j);
                    }
                }
                for (int k = paths.size() % CountComputers; k < CountComputers; k++) {
                    ConcatPaths[k] = paths.get(NumberOfElementConcatPaths);
                    NumberOfElementConcatPaths += paths.size() / CountComputers;
                    for (int j = 1; j < paths.size() / CountComputers; j++) {
                        ConcatPaths[k] += "\n" + paths.get((k - paths.size() % CountComputers) * paths.size() / CountComputers + paths.size() % CountComputers * (paths.size() / CountComputers + 1) + j);
                    }
                }
            }
            for (int i = 0; i < ConcatPaths.length; i++) {
                word.set(ConcatPaths[i]);
                output.collect(word, zero);
            }
        }
    }



    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // The key holds one newline-separated group of paths emitted by the mapper.
            long count = 0;
            String line = key.toString();
            ArrayList<String> ProcessedPaths = new ArrayList<String>();
            StringTokenizer tokenizer = new StringTokenizer(line, "\n");
            while (tokenizer.hasMoreTokens()) {
                ProcessedPaths.add(tokenizer.nextToken());
            }
            Configure("/etc/nsindexer.conf");
            for (int i = 0; i < ProcessedPaths.size(); i++) {
                count += Traveser(ProcessedPaths.get(i)); // accumulate the native traversal result
            }
            output.collect(key, new IntWritable((int) count));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ParallelIndexation.class);
        conf.setJobName("parallelindexation");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        // No combiner: the reducer indexes paths as a side effect and must
        // run exactly once per group of paths.
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
As the code shows, the map and reduce functions use the text files /usr/countcomputers.txt and /etc/nsindexer.conf. I want these files to be present at the same locations on the local file system of every cluster node. Given that, what should the command that launches the Hadoop program with the -files option look like (./hadoop jar /export/hadoop-1.0.1/bin/ParallelIndexation.jar org.myorg.ParallelIndexation input output ......)?
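For reference, here is a minimal sketch of how such an invocation is usually written with the stock Hadoop 1.x generic options; the file list mirrors the paths used in the code above, and the jar location is taken from the question:

./hadoop jar /export/hadoop-1.0.1/bin/ParallelIndexation.jar org.myorg.ParallelIndexation -files /usr/countcomputers.txt,/etc/nsindexer.conf input output

Note that -files is handled by GenericOptionsParser, so it must precede the job's own arguments, and the listed files are copied into each task's working directory under their base names (countcomputers.txt, nsindexer.conf) rather than to absolute paths such as /usr/countcomputers.txt, so the code would then need to open them by those relative names.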

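Also note that -files only takes effect if the job's main class passes its arguments through GenericOptionsParser; the main() above does not. A driver along the lines of the standard Tool/ToolRunner pattern would be needed. The sketch below uses only the stock Hadoop 1.x API, but wiring it into ParallelIndexation through a separate hypothetical driver class is an assumption:

package org.myorg;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver class; the job setup mirrors main() above.
public class ParallelIndexationDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // getConf() already reflects generic options such as -files.
        JobConf conf = new JobConf(getConf(), ParallelIndexation.class);
        conf.setJobName("parallelindexation");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(ParallelIndexation.Map.class);
        conf.setReducerClass(ParallelIndexation.Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses and strips -files (and the other generic
        // options) before handing the remaining arguments to run().
        System.exit(ToolRunner.run(new ParallelIndexationDriver(), args));
    }
}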