
Loss of a key-value pair from the Map output

Question asked by vladdv on May 6, 2013
I wrote a Hadoop program. The mapper's input is the text file `hdfs://192.168.1.8:7000/export/hadoop-1.0.1/bin/input/paths.txt`, which contains local-file-system paths (identical on all machines of the cluster) written on a single line by the program `./readwritepaths` and separated by the `|` character. The mapper first reads the number of slave nodes in the cluster from the file `/usr/countcomputers.txt`; it equals 2 and, judging by the program's execution, it was read correctly. The contents of the input file then arrive as the value on the mapper's input, are converted to a string, split on the `|` separator, and the resulting paths are added to `ArrayList<String> paths`.

package org.myorg;

import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ParallelIndexation {
  public static class Map extends MapReduceBase implements
      Mapper<LongWritable, Text, Text, LongWritable> {
    private final static LongWritable zero = new LongWritable(0);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
        OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {
      String line = value.toString();
      // Read the number of slave nodes (2 on this cluster) from a local file.
      int CountComputers;
      FileInputStream fstream = new FileInputStream("/usr/countcomputers.txt");
      DataInputStream in = new DataInputStream(fstream);
      BufferedReader br = new BufferedReader(new InputStreamReader(in));
      String result = br.readLine();
      CountComputers = Integer.parseInt(result);
      in.close();
      fstream.close();
      System.out.println("CountComputers=" + CountComputers);
      // Split the input line on '|' to recover the individual file paths.
      ArrayList<String> paths = new ArrayList<String>();
      StringTokenizer tokenizer = new StringTokenizer(line, "|");
      while (tokenizer.hasMoreTokens()) {
        paths.add(tokenizer.nextToken());
      }
Then, as a check, I write the elements of `ArrayList<String> paths` to the file `/export/hadoop-1.0.1/bin/readpathsfromdatabase.txt`; its contents are given below and confirm that `ArrayList<String> paths` was filled correctly.

      PrintWriter zzz = null;
      try {
        zzz = new PrintWriter(new FileOutputStream("/export/hadoop-1.0.1/bin/readpathsfromdatabase.txt"));
      } catch (FileNotFoundException e) {
        System.out.println("Error");
        System.exit(0);
      }
      for (int i = 0; i < paths.size(); i++) {
        zzz.println("paths[" + i + "]=" + paths.get(i) + "\n");
      }
      zzz.close();
Then the paths are concatenated with the `\n` character and the joined results are written into the array `String[] ConcatPaths = new String[CountComputers]`.

      String[] ConcatPaths = new String[CountComputers];
      int NumberOfElementConcatPaths = 0;
      if (paths.size() % CountComputers == 0) {
        // Paths divide evenly: every node gets paths.size() / CountComputers paths.
        for (int i = 0; i < CountComputers; i++) {
          ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
          NumberOfElementConcatPaths += paths.size() / CountComputers;
          for (int j = 1; j < paths.size() / CountComputers; j++) {
            ConcatPaths[i] += "\n"
                + paths.get(i * paths.size() / CountComputers + j);
          }
        }
      } else {
        // Otherwise the first paths.size() % CountComputers nodes get one extra path.
        NumberOfElementConcatPaths = 0;
        for (int i = 0; i < paths.size() % CountComputers; i++) {
          ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
          NumberOfElementConcatPaths += paths.size() / CountComputers + 1;
          for (int j = 1; j < paths.size() / CountComputers + 1; j++) {
            ConcatPaths[i] += "\n"
                + paths.get(i * (paths.size() / CountComputers + 1) + j);
          }
        }
        for (int k = paths.size() % CountComputers; k < CountComputers; k++) {
          ConcatPaths[k] = paths.get(NumberOfElementConcatPaths);
          NumberOfElementConcatPaths += paths.size() / CountComputers;
          for (int j = 1; j < paths.size() / CountComputers; j++) {
            ConcatPaths[k] += "\n"
                + paths.get((k - paths.size() % CountComputers)
                    * paths.size() / CountComputers
                    + paths.size() % CountComputers
                        * (paths.size() / CountComputers + 1)
                    + j);
          }
        }
      }
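For reference, the same partitioning (CountComputers groups of near-equal size, each joined with `\n`) can be written more compactly. This is only an illustrative sketch, not the code that was run:

  // Illustrative alternative only: build the same CountComputers chunks,
  // giving the first paths.size() % CountComputers chunks one extra path.
  String[] concat = new String[CountComputers];
  int base = paths.size() / CountComputers;   // minimum chunk size
  int extra = paths.size() % CountComputers;  // chunks that get one more path
  int pos = 0;
  for (int i = 0; i < CountComputers; i++) {
    int len = base + (i < extra ? 1 : 0);
    StringBuilder sb = new StringBuilder();
    for (int j = 0; j < len; j++) {
      if (j > 0) sb.append('\n');
      sb.append(paths.get(pos + j));
    }
    concat[i] = sb.toString();
    pos += len;
  }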
I also write the cells of `String[] ConcatPaths` to the file `/export/hadoop-1.0.1/bin/concatpaths.txt` to check that the concatenation is correct. The contents of this file, given below, also confirm that the previous stages worked correctly.

      PrintWriter zzz1 = null;
      try {
        zzz1 = new PrintWriter(new FileOutputStream("/export/hadoop-1.0.1/bin/concatpaths.txt"));
      } catch (FileNotFoundException e) {
        System.out.println("Error");
        System.exit(0);
      }
      for (int i = 0; i < ConcatPaths.length; i++) {
        zzz1.println("ConcatPaths[" + i + "]=" + ConcatPaths[i] + "\n");
      }
      zzz1.close();
The cells of `String[] ConcatPaths`, that is, the concatenated paths, are emitted as the mapper's output.

      // Emit each concatenated group of paths as the map output key, with value 0.
      for (int i = 0; i < ConcatPaths.length; i++) {
        word.set(ConcatPaths[i]);
        output.collect(word, zero);
      }
    }
  }
In the reducer, the input key is split on the `\n` separator and the resulting paths are stored in `ArrayList<String> ProcessedPaths`.

  public static class Reduce extends MapReduceBase implements
      Reducer<Text, LongWritable, Text, LongWritable> {
    public native long Traveser(String Path);

    public native void Configure(String Path);

    public void reduce(Text key, Iterator<LongWritable> value,
        OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {
      long count = 0;
      String line = key.toString();
      // Split the concatenated key back into individual paths.
      ArrayList<String> ProcessedPaths = new ArrayList<String>();
      StringTokenizer tokenizer = new StringTokenizer(line, "\n");
      while (tokenizer.hasMoreTokens()) {
        ProcessedPaths.add(tokenizer.nextToken());
      }
Then, to validate the splitting, I write the elements of `ArrayList<String> ProcessedPaths` to the file `/export/hadoop-1.0.1/bin/ProcessedPaths.txt`. The contents of this file turned out to be identical on both slave nodes and contained the individual paths from the second concatenated key, even though the mapper emitted 2 different concatenated keys. Most surprisingly, the subsequent reducer code, which indexes the files at the received paths (that is, inserts words from those files into a database table), indexed only one file, `/export/hadoop-1.0.1/bin/error.txt`, which belongs to the first concatenated key.

      // Dump the split paths to a local file for verification.
      PrintWriter zzz2 = null;
      try {
        zzz2 = new PrintWriter(new FileOutputStream("/export/hadoop-1.0.1/bin/ProcessedPaths.txt"));
      } catch (FileNotFoundException e) {
        System.out.println("Error");
        System.exit(0);
      }
      for (int i = 0; i < ProcessedPaths.size(); i++) {
        zzz2.println("ProcessedPaths[" + i + "]=" + ProcessedPaths.get(i) + "\n");
      }
      zzz2.close();
      // Index the files at the received paths via the native methods.
      Configure("/etc/nsindexer.conf");
      for (int i = 0; i < ProcessedPaths.size(); i++) {
        count = Traveser(ProcessedPaths.get(i));
      }
      output.collect(key, new LongWritable(count));
    }
  }
}
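The post does not show where the native library that implements `Traveser` and `Configure` is loaded. Typically the `Reduce` class would contain a static initializer along these lines (the library name below is hypothetical; the asker's actual setup is not shown):

  static {
    // Hypothetical: load the JNI library that provides Traveser() and Configure().
    // The real library name used by the asker is not given in the post.
    System.loadLibrary("nsindexer");
  }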
The program was run with the following bash script:

#!/bin/bash
cd /export/hadoop-1.0.1/bin
./hadoop namenode -format
./start-all.sh
./hadoop fs -rmr hdfs://192.168.1.8:7000/export/hadoop-1.0.1/bin/output
./hadoop fs -rmr hdfs://192.168.1.8:7000/export/hadoop-1.0.1/bin/input
./hadoop fs -mkdir hdfs://192.168.1.8:7000/export/hadoop-1.0.1/input
./readwritepaths
sleep 120
./hadoop fs -put /export/hadoop-1.0.1/bin/input/paths.txt hdfs://192.168.1.8:7000/export/hadoop-1.0.1/bin/input/paths.txt 1> copyinhdfs.txt 2>&1
./hadoop jar /export/hadoop-1.0.1/bin/ParallelIndexation.jar org.myorg.ParallelIndexation /export/hadoop-1.0.1/bin/input /export/hadoop-1.0.1/bin/output -D mapred.map.tasks=1 -D mapred.reduce.tasks=2 1> resultofexecute.txt 2>&1
According to the last command there should be only one mapper. Nevertheless, the files `/export/hadoop-1.0.1/bin/readpathsfromdatabase.txt` and `/export/hadoop-1.0.1/bin/concatpaths.txt` appeared on both slave nodes. The contents of the files mentioned above are given below.
`hdfs://192.168.1.8:7000/export/hadoop-1.0.1/bin/input/paths.txt`

/export/hadoop-1.0.1/bin/error.txt|/root/nexenta_search/nsindexer.conf|/root/nexenta_search/traverser.c|/root/nexenta_search/buf_read.c|/root/nexenta_search/main.c|/root/nexenta_search/avl_tree.c|
`/export/hadoop-1.0.1/bin/readpathsfromdatabase.txt`

paths[0]=/export/hadoop-1.0.1/bin/error.txt

paths[1]=/root/nexenta_search/nsindexer.conf

paths[2]=/root/nexenta_search/traverser.c

paths[3]=/root/nexenta_search/buf_read.c

paths[4]=/root/nexenta_search/main.c

paths[5]=/root/nexenta_search/avl_tree.c
`/export/hadoop-1.0.1/bin/concatpaths.txt`

ConcatPaths[0]=/export/hadoop-1.0.1/bin/error.txt
/root/nexenta_search/nsindexer.conf
/root/nexenta_search/traverser.c

ConcatPaths[1]=/root/nexenta_search/buf_read.c
/root/nexenta_search/main.c
/root/nexenta_search/avl_tree.c
`/export/hadoop-1.0.1/bin/ProcessedPaths.txt`

ProcessedPaths[0]=/root/nexenta_search/buf_read.c

ProcessedPaths[1]=/root/nexenta_search/main.c

ProcessedPaths[2]=/root/nexenta_search/avl_tree.c
In connection with all this I want to ask 3 questions:

 1. Why were the contents of the `/export/hadoop-1.0.1/bin/ProcessedPaths.txt` file identical on both nodes, and why are they the ones shown above?
 2. Why was only one file, `/export/hadoop-1.0.1/bin/error.txt`, indexed in the end?
 3. Why was the mapper executed on both slave nodes?
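For completeness: the driver (`main` method) that the `hadoop jar` command above invokes is not shown in the post. With the old `org.apache.hadoop.mapred` API it would typically look roughly like the following sketch; the asker's actual driver, including how it handles the `-D` options and the input/output arguments, is not given:

  public static void main(String[] args) throws Exception {
    // Sketch only: a typical old-API driver for the Map and Reduce classes above.
    JobConf conf = new JobConf(ParallelIndexation.class);
    conf.setJobName("parallelindexation");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);
    conf.setMapperClass(Map.class);
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }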
