Combiner not working properly in Hadoop HBase scan

Question asked by sudarshan on Apr 12, 2017
Latest reply on Apr 13, 2017 by maprcommunity

Hi, I am running an application that reads records from HBase and writes them to text files.

I use a combiner and a custom partitioner in my application.

I have configured 41 reducers because I need to create 40 reducer output files that satisfy my condition.
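
For context, a partitioner decides which of the 41 reducers receives each map output record. The custom partitioner used in this job is not shown in the post, so the following is only a sketch of the general idea. It uses the same hash-and-modulo formula as Hadoop's default HashPartitioner. The class name, method name, and sample names are hypothetical, and the code is plain Java so that it runs without Hadoop.

```java
class ReducerIndexSketch {

    /** Maps an output name to a reducer index in [0, numReduceTasks). */
    static int partitionFor(String outputName, int numReduceTasks) {
        // Mask the sign bit so a negative hashCode cannot yield a negative index.
        return (outputName.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 41; // number of reducers mentioned in the post
        // Hypothetical output names, used only for illustration.
        for (String name : new String[] {"fileA", "fileB", "fileC"}) {
            System.out.println(name + " -> reducer " + partitionFor(name, numReduceTasks));
        }
    }
}
```

Records with the same name always map to the same reducer index, which is what lets one reducer collect everything that belongs in one output file.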

Everything works fine, but when I use the combiner, the job creates one map output file per region (that is, per mapper).

For example, I have 40 regions, so 40 mappers are started and 40 map-output files are created.
The reducer is not able to combine all the map output and generate the final output files.

The data in the files is correct, but the number of files has increased.

Any idea how I can get only the reducer output and not the mapper output?

// Combiner and reducer classes
job.setCombinerClass(CommonReducer.class); // combiner class
job.setReducerClass(CommonReducer.class); // reducer class


Below are my job details:


Submitted: Mon Apr 10 09:42:55 CDT 2017
Started: Mon Apr 10 09:43:03 CDT 2017
Finished: Mon Apr 10 10:11:20 CDT 2017
Elapsed: 28mins, 17sec
Diagnostics:
Average Map Time 6mins, 13sec
Average Shuffle Time 17mins, 56sec
Average Merge Time 0sec
Average Reduce Time 0sec


Here is my reducer logic:

import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class CommonCombiner extends Reducer<NullWritable, Text, NullWritable, Text> {

    private Logger logger = Logger.getLogger(CommonCombiner.class);
    private MultipleOutputs<NullWritable, Text> multipleOutputs;
    String strName = "";
    // Regex matching the literal field separator "|!|"
    private static final String DATA_SEPERATOR = "\\|\\!\\|";

    @Override
    public void setup(Context context) {
        logger.info("Inside Combiner.");
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    public void reduce(NullWritable Key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        for (Text value : values) {
            final String valueStr = value.toString();
            StringBuilder sb = new StringBuilder();
            // This block runs only while strName is still empty.
            if ("".equals(strName) && strName.length() == 0) {
                // The output name is the last "|^|"-separated token of the second field.
                String[] strArrFileName = valueStr.split(DATA_SEPERATOR);
                String strFullFileName[] = strArrFileName[1].split("\\|\\^\\|");

                strName = strFullFileName[strFullFileName.length - 1];

                String strArrvalueStr[] = valueStr.split(DATA_SEPERATOR);
                if (!strArrvalueStr[0].contains(HbaseBulkLoadMapperConstants.FF_ACTION)) {
                    sb.append(strArrvalueStr[0] + "|!|");
                }
                // Written through MultipleOutputs to a file named after strName.
                multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
                context.getCounter(Counters.FILE_DATA_COUNTER).increment(1);
            }
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
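
The name extraction inside reduce() can be tried on its own, outside Hadoop. The following is a minimal sketch, not the code from the job: the class RecordNameParser, its method names, and the sample record are made up for illustration. It assumes each record has the form <data>|!|<segments separated by |^|>, with the output name as the last segment.

```java
import java.util.regex.Pattern;

class RecordNameParser {

    // Same regexes as the posted code: the literal separators "|!|" and "|^|".
    private static final Pattern DATA_SEPARATOR = Pattern.compile("\\|\\!\\|");
    private static final Pattern NAME_SEPARATOR = Pattern.compile("\\|\\^\\|");

    /** Returns the last "|^|"-separated token of the second "|!|"-separated field. */
    static String outputName(String record) {
        String[] fields = DATA_SEPARATOR.split(record);
        if (fields.length < 2) {
            throw new IllegalArgumentException("record has no second field: " + record);
        }
        String[] nameParts = NAME_SEPARATOR.split(fields[1]);
        // split() drops trailing empty tokens, so guard against an empty result.
        return nameParts.length == 0 ? "" : nameParts[nameParts.length - 1];
    }

    /** Returns the first field followed by "|!|", as appended in the posted reduce(). */
    static String payload(String record) {
        return DATA_SEPARATOR.split(record)[0] + "|!|";
    }

    public static void main(String[] args) {
        // Hypothetical record, used only for illustration.
        String record = "row1|^|colA|!|dir|^|sub|^|fileA";
        System.out.println(outputName(record)); // prints "fileA"
        System.out.println(payload(record));    // prints "row1|^|colA|!|"
    }
}
```

Unlike the posted code, this sketch rejects a record that has no second field instead of failing with an ArrayIndexOutOfBoundsException.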
