Submitting a MapReduce job to a cluster without installing the MapR client


Author: Sanjamala Nayeem

Original Publication Date: May 10, 2015

Env:

  • MapR v3.1.1+patch (mapr-patch-3.1.1.26113.GA-31296 onwards)

Goal:

To submit a MapReduce job from an application to an unsecured cluster without installing the MapR client.

Solution:

To achieve this, MapR provides the following configuration parameters, which set the cluster details at run time and allow the application to connect to that cluster.

conf.set("dfs.nameservices", clusterName); 
conf.set("dfs.ha.namenodes." + clusterName, "cldb1,cldb2,cldb3");
conf.set("dfs.namenode.rpc-address." + clusterName + ".cldb1",cldb1node + ":7222");
  • In the above snippet, set the cluster name as the value of dfs.nameservices.
  • If HA is implemented for the CLDB, list all CLDB aliases against dfs.ha.namenodes.<clustername>; in this example there are three CLDBs, configured as cldb1,cldb2,cldb3.
  • Set the hostname or IP address of each CLDB, with port 7222, against dfs.namenode.rpc-address.<clustername>.<cldbN>, as shown in the sketch below.
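
For example, here is a minimal sketch of these three properties for a hypothetical single-CLDB cluster; the cluster name "mycluster" and host "cldbhost.example.com" are placeholders, not values from this article (Configuration is org.apache.hadoop.conf.Configuration):

Configuration conf = new Configuration();
// Cluster name goes into dfs.nameservices.
conf.set("dfs.nameservices", "mycluster");
// One alias per CLDB; only a single CLDB in this sketch.
conf.set("dfs.ha.namenodes.mycluster", "cldb1");
// Hostname (or IP address) and port 7222 for that CLDB.
conf.set("dfs.namenode.rpc-address.mycluster.cldb1", "cldbhost.example.com:7222");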

 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SampleMapReduce {

    public static void main(String[] args) {
        System.out.println("Check for MR job!!");
        MapReduceApp app = new MapReduceApp();
        try {
            app.initializeJob(new Path("/mapr/gauth/input"),
                    new Path("/mapr/gauth/output" + System.currentTimeMillis()));
            // Submit the job to the cluster and block until it completes.
            app.job.waitForCompletion(true);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

class MapReduceApp {

    public Job job = null;

    // Mapper: splits each line on spaces and emits (word, 1) for every token.
    public static class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split(" ");
            for (String st : str) {
                word.set(st.trim());
                context.write(word, one);
            }
        }
    }

    // Reducer (also used as the combiner): sums the counts emitted for each word.
    public static class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable val : values) {
                count += val.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public void initializeJob(Path inputPath, Path outputPath) throws Exception {
        Configuration conf = new Configuration();
        String clusterName = "gauth";
        String cldb1node = "10.10.70.148";
        String cldb2node = "10.10.70.149";
        String cldb3node = "10.10.70.150";

        // Needed when the cluster details are supplied at run time
        // rather than by a MapR client install.
        conf.setBoolean("fs.mapr.impl.clustername.unique", false);

        // Cluster details set at run time: cluster name, CLDB aliases,
        // and each CLDB's host:port (the CLDB listens on port 7222).
        conf.set("dfs.nameservices", clusterName);
        conf.set("dfs.ha.namenodes." + clusterName, "cldb1,cldb2,cldb3");
        conf.set("dfs.namenode.rpc-address." + clusterName + ".cldb1", cldb1node + ":7222");
        conf.set("dfs.namenode.rpc-address." + clusterName + ".cldb2", cldb2node + ":7222");
        conf.set("dfs.namenode.rpc-address." + clusterName + ".cldb3", cldb3node + ":7222");

        job = Job.getInstance(conf);
        job.setJobName("MapReduceApp");
        job.setJarByClass(MapReduceApp.class);

        job.setMapperClass(Mapper1.class);
        job.setCombinerClass(Reducer1.class);
        job.setReducerClass(Reducer1.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(1);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
    }
}

  • In the above sample, a word-count MapReduce job is implemented in the class MapReduceApp.
  • The job is submitted to the cluster at app.job.waitForCompletion(true);.
  • In this sample the input and output paths are hard-coded to /mapr/gauth/input and /mapr/gauth/output<timestamp> (see the sketch below for a variant that takes them from the command line).
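
As a variant, here is a minimal sketch (not part of the original sample) of a main method that reads the two paths from the command line instead of hard-coding them; it assumes the same MapReduceApp class shown above:

public static void main(String[] args) throws Exception {
    // Expect exactly two arguments: the input path and the output path.
    if (args.length != 2) {
        System.err.println("Usage: SampleMapReduce <input path> <output path>");
        System.exit(1);
    }
    MapReduceApp app = new MapReduceApp();
    app.initializeJob(new Path(args[0]), new Path(args[1]));
    // Exit with 0 on success, 1 on failure.
    System.exit(app.job.waitForCompletion(true) ? 0 : 1);
}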
