
Java API - Reducer receives different values for the same key multiple times when using Combiner class

Question asked by gianmario on Nov 20, 2013
I wrote a MapReduce job using the Hadoop Java API (1.0.3), running on a cluster of MapR nodes.
The job consists of summing all the values of a particular field (X) in my data and creating a weighted distribution of the other fields.

INPUT:

    1 field1_1 field2_1 field3_1 ... fieldX_1
    2 field1_2 field2_2 field3_2 ... fieldX_2
    3 field1_3 field2_3 field3_3 ... fieldX_3

Since one pair is emitted for every line in my data, and since I need to use a single reducer (other reasons forced me into this choice), I decided to set the same Reduce class as the Combiner.
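For reference, the wiring is simply `job.setCombinerClass(MyReduce.class)` next to `job.setReducerClass(MyReduce.class)`. Since summing is commutative and associative, reusing the reduce logic as a combiner should be semantically safe. The following plain-Java sketch (no Hadoop dependency; class and method names are illustrative, not my actual code) shows the behavior I expect: combining arbitrary slices of the map output first and then reducing the partial sums should yield the same total as a single direct sum.

```java
import java.util.Arrays;
import java.util.List;

public class CombinerSemantics {
    // The same reduce logic used for both combine and reduce: sum all values for a key.
    static float sum(List<Float> values) {
        float total = 0f;
        for (float v : values) total += v;
        return total;
    }

    public static void main(String[] args) {
        // Map output for key "X", as in the example data below.
        List<Float> mapOutput = Arrays.asList(10f, 20f, 30f, 10f, 40f, 20f);

        // Without a combiner: one reduce call over all values.
        float direct = sum(mapOutput);

        // With a combiner: the framework may combine arbitrary slices of the
        // map output first; the reducer then sums the partial results.
        float partial1 = sum(mapOutput.subList(0, 2));  // 10 + 20
        float partial2 = sum(mapOutput.subList(2, 6));  // 30 + 10 + 40 + 20
        float combined = sum(Arrays.asList(partial1, partial2));

        // Both paths should produce the same total for key "X".
        System.out.println(direct + " " + combined);  // 130.0 130.0
    }
}
```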

For each of the following two problems I used pairs of type <Text, FloatWritable> for both input and output.

**Total X summing:**

MAP OUTPUT:

    X fieldX_1
    X fieldX_2
    X fieldX_3
    X ...

REDUCE OUTPUT:

    X fieldX_1+fieldX_2+fieldX_3+...

The weird thing is that the combiner/reducer receives the same key multiple times:

    X [fieldX_1 fieldX_1 fieldX_1 ... fieldX_1]
    X [fieldX_2 fieldX_2 fieldX_2 ...]
    X [fieldX_3 fieldX_3 fieldX_3 ...]
    X ...

I am sure about this because, for debugging, I log to stderr all the <key, values> pairs passed to each invocation of the reduce method.
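The logging itself is trivial; roughly like the following sketch (logger and method names are illustrative, not my exact code), using `java.util.logging`, whose default handler writes to stderr in the format shown in the log excerpt further down:

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

public class ReduceLogging {
    private static final Logger LOG = Logger.getLogger("MYCLASS");

    // Format one reduce input as it appears in the logs: key,[v1, v2, ...]
    static String formatInput(String key, List<Float> values) {
        return "REDUCE-INPUT: " + key + "," + values;
    }

    // Format one reduce output as it appears in the logs: key,sum
    static String formatOutput(String key, float sum) {
        return "REDUCE-OUTPUT: " + key + "," + sum;
    }

    public static void main(String[] args) {
        List<Float> values = Arrays.asList(25.865f, 25.865f);
        LOG.warning(formatInput("X", values));    // goes to stderr by default
        LOG.warning(formatOutput("X", 51.73f));
    }
}
```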

I want to add a more concrete example:

Data:

    1 field1_1 field2_1 field3_1 ... 10
    2 field1_2 field2_2 field3_2 ... 20
    3 field1_3 field2_3 field3_3 ... 30
    4 field1_1 field2_1 field3_1 ... 10
    5 field1_2 field2_2 field3_2 ... 40
    6 field1_3 field2_3 field3_3 ... 20
    ...

Map output:

    X 10
    X 20
    X 30
    X 10
    X 40
    X 20

Reduce input (with combiner):

    X [10 10 10 10]
    X [20 20 20]
    X [30 30 30 30 30 30 30]
    X [40 40]

Reduce output (with combiner):

    X 40
    X 60
    X 210
    X 80

X is a constant label (field name).
Note that the reducer is called multiple times with the same key X, each time with a collection of identical values, e.g. [10 10 10 ...] or [30 30 30 ...].
Each sum is output separately. The summing itself works, but at this stage an additional reduce step would be needed to merge the duplicate outputs.

Real log example:

    Nov 06, 2013 8:50:12 AM MYCLASS logInputError
    WARNING: REDUCE-INPUT: X,[10.0]
    Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
    WARNING: REDUCE-OUTPUT: X,10.0
    Nov 06, 2013 8:50:12 AM MYCLASS logInputError
    WARNING: REDUCE-INPUT: X,[25.865, 25.865]
    Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
    WARNING: REDUCE-OUTPUT: X,51.73
    Nov 06, 2013 8:50:12 AM MYCLASS logInputError
    WARNING: REDUCE-INPUT: X,[1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4]
    Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
    WARNING: REDUCE-OUTPUT: X,2.0289798E7
    Nov 06, 2013 8:50:12 AM MYCLASS logInputError
    WARNING: REDUCE-INPUT: X,[514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53]
    Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
    WARNING: REDUCE-OUTPUT: X,6694929.0
    Nov 06, 2013 8:50:12 AM MYCLASS logInputError
    WARNING: REDUCE-INPUT: X,[1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5]
    Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
    WARNING: REDUCE-OUTPUT: X,1.8702654E7

If I remove the Combiner, everything works fine.
I understand that the Combiner may be invoked 0, 1 or multiple times, but what about the Reducer? It should be invoked exactly once per key, shouldn't it?

Even weirder: I repeat a similar procedure for the field distributions, and this happens only for the X summing problem...

**Weighted Field Distribution**

MAP OUTPUT (example field1):

    field1_1 X_1
    field1_2 X_2
    field1_3 X_3
    ...

REDUCE OUTPUT:

    field1(class1) fieldX(class1)+fieldX(class1)+fieldX(class1)+...
    field1(class2) fieldX(class2)+fieldX(class2)+fieldX(class2)+...
    field1(class3) fieldX(class3)+fieldX(class3)+fieldX(class3)+...
    ...

Essentially, for each value of field1 I am summing up all the associated values of fieldX and repeating the same procedure for several fields (field1, field2, field3...).

For those emitted pairs, the reducer receives the single key (field1(class1)) and the full array of values ([fieldX(class1)...]), as it normally should.
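To make the expected behavior concrete, here is a plain-Java sketch (class labels and values are illustrative) of the per-class weighted sum that one reduce call per key should compute for each field:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WeightedDistribution {
    // Group <classLabel, X> pairs by class label and sum the X values,
    // mirroring one reduce invocation per distinct key.
    static Map<String, Float> sumByClass(String[][] pairs) {
        Map<String, Float> sums = new LinkedHashMap<>();
        for (String[] p : pairs) {
            sums.merge(p[0], Float.parseFloat(p[1]), Float::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Emitted <field1 class, X> pairs, as produced by the mapper.
        String[][] pairs = {
            {"class1", "10"}, {"class2", "20"}, {"class1", "30"}, {"class3", "40"}
        };
        System.out.println(sumByClass(pairs));  // {class1=40.0, class2=20.0, class3=40.0}
    }
}
```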


**Observations**

One consideration: for the X summing problem, the single key (X) maps to as many values as there are lines in the data. For the weighted field distribution, by contrast, the values are spread across the several class labels of each field and presumably never fill the in-memory buffer to the point where the combiner is invoked.

Is this a bug in my code, or is there some procedural detail of Hadoop that I am not considering?

According to the MapReduce paradigm, a Reducer should receive all the values for a particular key at once, not split across multiple invocations.

Hope to receive good feedback.
