Reduce Merge Performance issues

Question asked by jerdavis on Oct 3, 2012
Latest reply on Oct 11, 2012 by gera
Hello,
I'm having performance issues with the reduce-side merge phase and wonder if someone could take a look.
I have a 6 GB text dataset, evenly distributed across the cluster. The dataset has two keys, which I GroupBy into two reducers (I'm using Cascading), so each reducer gets about 3 GB of data.
I give each reducer 12 GB of memory, but I'm still seeing a 20-minute merge phase.
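Here is some context on how two keys end up on two reducers. Hadoop's stock `HashPartitioner` computes `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The plain-Java sketch below reproduces only that arithmetic. It assumes that Cascading's `GroupingPartitioner` (the `mapred.partitioner.class` in the job configuration) behaves similarly on the grouping tuple, which is an assumption here. The key strings are hypothetical placeholders. With only two distinct keys, whether they land on separate reducers depends entirely on their hash values.

```java
// Sketch of hash partitioning using the formula of Hadoop's stock HashPartitioner.
// Assumption: Cascading's GroupingPartitioner behaves similarly on the grouping tuple.
public class PartitionSketch {

    // Returns a reducer index in [0, numReducers). Masking with Integer.MAX_VALUE
    // clears the sign bit, so a negative hashCode() still yields a non-negative index.
    public static int partitionFor(Object key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 2; // mapred.reduce.tasks in the job configuration
        // Hypothetical placeholders for the dataset's two grouping keys.
        String[] keys = {"keyA", "keyB"};
        for (String key : keys) {
            System.out.println(key + " -> reducer " + partitionFor(key, numReducers));
        }
    }
}
```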

Two questions: Shouldn't this merge be done entirely in memory, given 12 GB of heap? And even without an in-memory merge, 20 minutes seems far too long to merge 3 GB, especially with 12 disks (JBOD) and 12 cores per node. Am I perhaps writing the intermediate merge data to the wrong place (HDFS vs. local)?
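To put the 20 minutes in perspective, the sketch below computes the back-of-the-envelope throughput implied by the figures above: 3 GB per reducer, roughly 20 minutes, and 12 disks per node. It makes a few simplifying assumptions:

- 1 GB is taken as 1024 MB.
- The load is assumed to spread evenly across the disks.
- The 3 GB is counted only once, so any data re-read and re-written by intermediate merge passes is ignored.

```java
// Back-of-the-envelope merge throughput from the figures quoted in the post:
// 3 GB per reducer, about 20 minutes of merging, 12 JBOD disks per node.
public class MergeThroughput {

    // Megabytes per second for a volume in GB (1 GB = 1024 MB) over a duration in minutes.
    public static double mbPerSecond(double gigabytes, double minutes) {
        return (gigabytes * 1024.0) / (minutes * 60.0);
    }

    public static void main(String[] args) {
        double aggregate = mbPerSecond(3.0, 20.0);
        // Idealized assumption: the load is spread evenly over all 12 disks.
        double perDisk = aggregate / 12.0;
        System.out.printf("aggregate: %.2f MB/s, per disk: %.2f MB/s%n", aggregate, perDisk);
    }
}
```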

The MAPRFS_BYTES_READ and MAPRFS_BYTES_WRITTEN counters are interesting.
The initial dataset is 6 GB (as shown in the Map column), but sorting somehow inflates it to 17 GB, which seems odd.
Then, in the reduce phase, it reads 23 GB from MapR-FS and writes 17 GB.
Should the reduce-phase merge data be written to MapR-FS or to the local FS?
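The ratios behind these observations can be computed directly from the job counters. The values below are copied from the Map and Reduce columns of the counter table. This is only arithmetic on the reported numbers and says nothing about why the sizes grow.

```java
// Ratios between byte counters reported for the job (values copied from the counter table).
public class CounterRatios {

    public static double ratio(long numerator, long denominator) {
        return (double) numerator / denominator;
    }

    public static void main(String[] args) {
        long mapInputBytes = 6_644_947_675L;          // Map input bytes (Map)
        long mapOutputBytes = 16_540_701_046L;        // Map output bytes (Map)
        long reduceShuffleBytes = 16_980_659_887L;    // Reduce shuffle bytes (Reduce)
        long reduceMaprfsRead = 21_721_014_791L;      // MAPRFS_BYTES_READ (Reduce)
        long reduceMaprfsWritten = 17_044_701_398L;   // MAPRFS_BYTES_WRITTEN (Reduce)

        System.out.printf("map output / map input:           %.2f%n",
                ratio(mapOutputBytes, mapInputBytes));
        System.out.printf("reduce MapR-FS read / shuffle:    %.2f%n",
                ratio(reduceMaprfsRead, reduceShuffleBytes));
        System.out.printf("reduce MapR-FS written / shuffle: %.2f%n",
                ratio(reduceMaprfsWritten, reduceShuffleBytes));
    }
}
```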


--------------------
<pre>
Counter                                     Map                  Reduce               Total
Job Counters
  Aggregate execution time of mappers(ms)   0                    0                    29,887,359
  Launched reduce tasks                     0                    0                    2
  Rack-local map tasks                      0                    0                    4
  Launched map tasks                        0                    0                    353
  Data-local map tasks                      0                    0                    311
cascading.flow.SliceCounters
  Read_Duration                             329,399              366,004              695,403
  Tuples_Read                               252,000,000          67,896,295           319,896,295
  Tuples_Written                            252,000,000          0                    252,000,000
  Process_End_Time                          476,294,761,317,139  0                    476,294,761,317,139
  Write_Duration                            2,713,840            0                    2,713,840
  Process_Begin_Time                        476,294,753,764,176  2,698,557,228,678    478,993,310,992,854
FileSystemCounters
  MAPRFS_BYTES_READ                         6,651,978,400        21,721,014,791       28,372,993,191
  MAPRFS_BYTES_WRITTEN                      17,044,716,578       17,044,701,398       34,089,417,976
  FILE_BYTES_WRITTEN                        19,046,005           107,748              19,153,753
Map-Reduce Framework
  Map input records                         252,000,000          0                    252,000,000
  Reduce shuffle bytes                      0                    16,980,659,887       16,980,659,887
  Spilled Records                           252,000,000          0                    252,000,000
  Map output bytes                          16,540,701,046       0                    16,540,701,046
  CPU_MILLISECONDS                          18,861,020           7,640,360            26,501,380
  Map input bytes                           6,644,947,675        0                    6,644,947,675
  Combine input records                     0                    0                    0
  SPLIT_RAW_BYTES                           97,428               0                    97,428
  Reduce input records                      0                    67,896,295           67,896,295
  Reduce input groups                       0                    2                    2
  Combine output records                    0                    0                    0
  PHYSICAL_MEMORY_BYTES                     324,852,019,200      15,041,486,848       339,893,506,048
  Reduce output records                     0                    0                    0
  VIRTUAL_MEMORY_BYTES                      626,863,038,464      26,729,230,336       653,592,268,800
  Map output records                        252,000,000          0                    252,000,000
  GC time elapsed (ms)                      1,568,523            76,636               1,645,159
cascading.flow.StepCounters
  Tuples_Read                               252,000,000          0                    252,000,000
</pre>
----------------
<pre>
name value
fs.s3n.impl org.apache.hadoop.fs.s3native.NativeS3FileSystem
mapreduce.heartbeat.100 1000
mapred.task.cache.levels 2
hadoop.tmp.dir /tmp/hadoop-${user.name}
hadoop.native.lib true
map.sort.class org.apache.hadoop.util.QuickSort
mapreduce.jobtracker.recovery.dir /var/mapr/cluster/mapred/jobTracker/recovery
mapreduce.heartbeat.1000 10000
ipc.client.idlethreshold 4000
mapred.system.dir /var/mapr/cluster/mapred/jobTracker/system
mapreduce.cluster.reduce.userlog.retain-size 10485760
mapred.job.tracker.persist.jobstatus.hours 0
io.skip.checksum.errors false
fs.default.name maprfs:///
mapred.cluster.reduce.memory.mb -1
mapred.child.tmp ./tmp
fs.har.impl.disable.cache true
mapred.jobtracker.jobhistory.lru.cache.size 5
mapred.skip.reduce.max.skip.groups 0
cascading.flow.step.num 1
mapred.jobtracker.instrumentation org.apache.hadoop.mapred.JobTrackerMetricsInst
mapr.localvolumes.path /var/mapr/local
mapred.tasktracker.dns.nameserver default
io.sort.factor 50
mapred.output.value.groupfn.class cascading.tuple.hadoop.util.GroupingComparator
mapreduce.use.maprfs true
mapred.task.timeout 600000
mapred.max.tracker.failures 4
hadoop.rpc.socket.factory.class.default org.apache.hadoop.net.StandardSocketFactory
mapred.mapoutput.key.class cascading.tuple.io.TuplePair
fs.hdfs.impl org.apache.hadoop.hdfs.DistributedFileSystem
mapred.queue.default.acl-administer-jobs *
mapred.output.key.class org.apache.hadoop.io.Text
mapred.skip.map.auto.incr.proc.count true
mapred.map.runner.class cascading.flow.hadoop.FlowMapper
mapreduce.job.complete.cancel.delegation.tokens true
mapreduce.tasktracker.heapbased.memory.management false
io.mapfile.bloom.size 1048576
tasktracker.http.threads 2
mapred.job.shuffle.merge.percent 0.70
cascading.flow.id 853276BF02049D394C31880B08C9E6CC
mapred.child.renice 10
fs.ftp.impl org.apache.hadoop.fs.ftp.FTPFileSystem
user.name jdavis
mapred.fairscheduler.smalljob.max.inputsize 10737418240
mapred.output.compress false
io.bytes.per.checksum 512
mapred.healthChecker.script.timeout 600000
topology.node.switch.mapping.impl org.apache.hadoop.net.ScriptBasedMapping
mapred.reduce.slowstart.completed.maps 0.95
mapred.reduce.max.attempts 4
fs.ramfs.impl org.apache.hadoop.fs.InMemoryFileSystem
mapr.localoutput.dir output
mapred.skip.map.max.skip.records 0
mapred.jobtracker.port 9001
mapred.cluster.map.memory.mb -1
mapreduce.tasktracker.prefetch.maptasks 1.0
hadoop.security.group.mapping org.apache.hadoop.security.ShellBasedUnixGroupsMapping
mapreduce.tasktracker.task.slowlaunch false
mapred.job.tracker.persist.jobstatus.dir /var/mapr/cluster/mapred/jobTracker/jobsInfo
mapred.jar /var/mapr/cluster/mapred/jobTracker/staging/jdavis/.staging/job_201210022148_0086/job.jar
fs.s3.buffer.dir ${hadoop.tmp.dir}/s3
job.end.retry.attempts 0
fs.file.impl org.apache.hadoop.fs.LocalFileSystem
cascading.app.name omeg
mapred.local.dir.minspacestart 0
mapred.output.compression.type RECORD
fs.mapr.working.dir /user/$USERNAME/
fs.maprfs.impl com.mapr.fs.MapRFileSystem
fs.https.impl cascading.tap.hadoop.io.HttpFileSystem
topology.script.number.args 100
io.mapfile.bloom.error.rate 0.005
mapred.cluster.max.reduce.memory.mb -1
mapred.max.tracker.blacklists 4
mapred.task.profile.maps 0-2
mapred.userlog.retain.hours 24
mapred.job.tracker.persist.jobstatus.active false
hadoop.security.authorization false
local.cache.size 10737418240
mapred.min.split.size 0
mapred.map.tasks 353
mapred.tasktracker.task-controller.config.overwrite true
cascading.app.appjar.path /home/jdavis/tmp/omeg.jar
mapred.output.value.class org.apache.hadoop.io.Text
mapred.partitioner.class cascading.tuple.hadoop.util.GroupingPartitioner
mapreduce.maprfs.use.compression true
mapred.job.queue.name default
mapreduce.tasktracker.reserved.physicalmemory.mb.low 0.90
cascading.group.comparator.size 3
ipc.server.listen.queue.size 128
group.name common
mapred.inmem.merge.threshold 0
job.end.retry.interval 30000
mapred.fairscheduler.smalljob.max.maps 10
mapred.skip.attempts.to.start.skipping 2
fs.checkpoint.dir ${hadoop.tmp.dir}/dfs/namesecondary
mapred.reduce.tasks 2
mapred.merge.recordsBeforeProgress 10000
mapred.userlog.limit.kb 0
mapred.job.reduce.memory.mb -1
webinterface.private.actions true
io.sort.spill.percent 0.99
mapred.job.shuffle.input.buffer.percent 0.80
mapred.job.name [853276BF02049D394C31880B08C9E6CC/DCB7B555F1FC65C767B8E2CD716607AA] copyr/(1/1) /user/jdavis/ctest/end
mapred.map.tasks.speculative.execution false
hadoop.util.hash.type murmur
mapred.map.max.attempts 4
mapreduce.job.acl-view-job
mapred.job.tracker.handler.count 10
mapred.input.format.class cascading.tap.hadoop.io.MultiInputFormat
mapred.tasktracker.expiry.interval 600000
mapred.jobtracker.maxtasks.per.job -1
mapred.jobtracker.job.history.block.size 3145728
keep.failed.task.files false
mapred.output.format.class org.apache.hadoop.mapred.TextOutputFormat
ipc.client.tcpnodelay false
mapred.task.profile.reduces 0-2
mapred.output.compression.codec org.apache.hadoop.io.compress.DefaultCodec
io.map.index.skip 0
mapred.working.dir /user/jdavis
ipc.server.tcpnodelay false
hadoop.proxyuser.root.hosts *
mapred.reducer.class cascading.flow.hadoop.FlowReducer
cascading.app.id A593B4669179BB6F06771249E7ADFA48
mapred.used.genericoptionsparser true
jobclient.progress.monitor.poll.interval 1000
mapreduce.tasktracker.jvm.idle.time 10000
mapred.job.map.memory.mb -1
hadoop.logfile.size 10000000
mapred.reduce.tasks.speculative.execution false
mapreduce.job.dir maprfs:/var/mapr/cluster/mapred/jobTracker/staging/jdavis/.staging/job_201210022148_0086
mapreduce.tasktracker.outofband.heartbeat true
mapreduce.reduce.input.limit -1
mapred.tasktracker.ephemeral.tasks.ulimit 4294967296
fs.s3n.block.size 67108864
fs.inmemory.size.mb 200
mapred.fairscheduler.smalljob.max.reducers 10
hadoop.security.authentication simple
fs.checkpoint.period 3600
cascading.flow.step.id DCB7B555F1FC65C767B8E2CD716607AA
mapred.job.reuse.jvm.num.tasks -1
mapred.jobtracker.completeuserjobs.maximum 5
mapreduce.cluster.map.userlog.retain-size 10485760
mapred.task.tracker.task-controller org.apache.hadoop.mapred.LinuxTaskController
mapred.output.key.comparator.class cascading.tuple.hadoop.util.GroupingSortingComparator
fs.s3.maxRetries 4
mapred.cluster.max.map.memory.mb -1
mapred.mapoutput.value.class cascading.tuple.Tuple
mapred.map.child.java.opts -XX:ErrorFile=/opt/cores/mapreduce_java_error%p.log
mapred.job.tracker.history.completed.location /var/mapr/cluster/mapred/jobTracker/history/done
mapred.local.dir /tmp/mapr-hadoop/mapred/local
fs.hftp.impl org.apache.hadoop.hdfs.HftpFileSystem
fs.trash.interval 0
fs.s3.sleepTimeSeconds 10
mapred.submit.replication 10
fs.har.impl org.apache.hadoop.fs.HarFileSystem
mapreduce.heartbeat.10 300
cascading.version Concurrent, Inc - Cascading 2.0.5
mapred.map.output.compression.codec org.apache.hadoop.io.compress.DefaultCodec
mapred.tasktracker.dns.interface default
hadoop.proxyuser.root.groups root
mapred.job.tracker maprfs:///
mapreduce.job.submithost c10-m001.wowrack.upstream.priv
mapreduce.tasktracker.cache.local.numberdirectories 10000
io.seqfile.sorter.recordlimit 1000000
mapreduce.heartbeat.10000 100000
mapred.line.input.format.linespermap 1
mapred.jobtracker.taskScheduler org.apache.hadoop.mapred.FairScheduler
mapred.tasktracker.instrumentation org.apache.hadoop.mapred.TaskTrackerMetricsInst
mapred.tasktracker.taskmemorymanager.killtask.maxRSS false
mapred.child.taskset true
jobclient.completion.poll.interval 5000
mapred.fairscheduler.smalljob.max.reducer.inputsize 1073741824
mapred.local.dir.minspacekill 0
io.sort.record.percent 0.28
mapr.localspill.dir spill
io.compression.codec.lzo.class com.hadoop.compression.lzo.LzoCodec
fs.kfs.impl org.apache.hadoop.fs.kfs.KosmosFileSystem
mapred.tasktracker.reduce.tasks.maximum (CPUS > 2) ? (CPUS * 0.70): 1
mapred.temp.dir ${hadoop.tmp.dir}/mapred/temp
mapred.tasktracker.ephemeral.tasks.maximum 1
fs.checkpoint.edits.dir ${fs.checkpoint.dir}
mapred.tasktracker.tasks.sleeptime-before-sigkill 5000
mapred.job.reduce.input.buffer.percent 0.0
mapred.tasktracker.indexcache.mb 10
mapreduce.task.classpath.user.precedence false
mapreduce.job.split.metainfo.maxsize -1
hadoop.logfile.count 10
fs.automatic.close true
mapred.skip.reduce.auto.incr.proc.count true
mapreduce.job.submithostaddress 10.100.0.99
mapred.child.oom_adj 10
io.seqfile.compress.blocksize 1000000
fs.s3.block.size 67108864
mapred.tasktracker.taskmemorymanager.monitoring-interval 3000
mapreduce.tasktracker.volume.healthcheck.interval 60000
mapred.cluster.ephemeral.tasks.memory.limit.mb 200
mapreduce.jobtracker.staging.root.dir /var/mapr/cluster/mapred/jobTracker/staging
mapred.acls.enabled false
mapred.queue.default.state RUNNING
mapred.fairscheduler.smalljob.schedule.enable false
mapred.queue.names default
fs.hsftp.impl org.apache.hadoop.hdfs.HsftpFileSystem
mapred.fairscheduler.eventlog.enabled false
mapreduce.jobtracker.recovery.maxtime 480
mapred.task.tracker.http.address 0.0.0.0:50060
mapreduce.jobtracker.inline.setup.cleanup false
mapred.reduce.parallel.copies 40
io.seqfile.lazydecompress true
mapred.tasktracker.ephemeral.tasks.timeout 10000
mapred.output.dir maprfs:/user/jdavis/ctest/end
mapreduce.tasktracker.group root
hadoop.workaround.non.threadsafe.getpwuid false
io.sort.mb 512
mapred.reduce.child.java.opts -Xmx12000m
ipc.client.connection.maxidletime 10000
mapred.compress.map.output false
hadoop.security.uid.cache.secs 14400
mapred.task.tracker.report.address 127.0.0.1:0
mapred.healthChecker.interval 60000
ipc.client.kill.max 10
ipc.client.connect.max.retries 10
fs.http.impl cascading.tap.hadoop.io.HttpFileSystem
fs.s3.impl org.apache.hadoop.fs.s3.S3FileSystem
mapred.fairscheduler.assignmultiple true
mapred.user.jobconf.limit 5242880
mapred.input.dir maprfs:/user/jdavis/ctest/mid
mapred.job.tracker.http.address 0.0.0.0:50030
io.file.buffer.size 131072
mapred.jobtracker.restart.recover true
io.serializations cascading.tuple.hadoop.TupleSerialization,org.apache.hadoop.io.serializer.WritableSerialization
mapreduce.use.fastreduce false
mapred.reduce.copy.backoff 300
mapred.task.profile false
mapred.jobtracker.retiredjobs.cache.size 300
jobclient.output.filter FAILED
mapred.tasktracker.map.tasks.maximum (CPUS > 2) ? (CPUS * 0.80) : 1
io.compression.codecs org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec
fs.checkpoint.size 67108864
cascading.sort.comparator.size 3
</pre>
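For the in-memory question, the sketch below works out the nominal reduce-side memory budget implied by this configuration. It follows the property descriptions in stock Hadoop 1.x `mapred-default.xml`, under the assumption that the MapR build uses the same semantics. It treats the `-Xmx12000m` value as the heap size. The JVM's usable maximum may be somewhat lower, and the real shuffle code may apply further limits that are not modeled here.

```java
// Nominal reduce-side memory budget implied by the job configuration above.
// Formulas follow the Hadoop 1.x mapred-default.xml property descriptions (assumed to
// apply to this MapR build); -Xmx is used as the heap size, so real values will be lower.
public class ShuffleBudget {

    // A budget expressed as a fraction of the heap.
    public static double budgetMb(double heapMb, double fraction) {
        return heapMb * fraction;
    }

    public static void main(String[] args) {
        double heapMb = 12000;            // mapred.reduce.child.java.opts -Xmx12000m
        double shuffleInputPct = 0.80;    // mapred.job.shuffle.input.buffer.percent
        double shuffleMergePct = 0.70;    // mapred.job.shuffle.merge.percent (of the shuffle buffer)
        double reduceInputPct = 0.0;      // mapred.job.reduce.input.buffer.percent

        double shuffleBuffer = budgetMb(heapMb, shuffleInputPct);
        double mergeTrigger = shuffleBuffer * shuffleMergePct;
        double retainedForReduce = budgetMb(heapMb, reduceInputPct);

        System.out.printf("shuffle buffer:                          %.0f MB%n", shuffleBuffer);
        System.out.printf("in-memory merge triggered at:            %.0f MB%n", mergeTrigger);
        System.out.printf("map output kept in memory during reduce: %.0f MB%n", retainedForReduce);
    }
}
```

With `mapred.job.reduce.input.buffer.percent` at 0.0, that description implies that no map output stays in memory once the reduce itself begins.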
-------------------
<pre>
2012-10-02 19:30:50,676 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=SHUFFLE, sessionId=
2012-10-02 19:30:50,737 INFO org.apache.hadoop.mapreduce.util.ProcessTree: setsid exited with exit code 0
2012-10-02 19:30:50,742 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: /proc/<pid>/status does not have information about swap space used(VmSwap). Can not track swap usage of a task.
2012-10-02 19:30:50,742 INFO org.apache.hadoop.mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin@27b62aab
2012-10-02 19:30:50,903 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 9115 may have finished in the interim.
2012-10-02 19:31:01,663 INFO org.apache.hadoop.mapred.Merger: Merging 37 sorted segments
2012-10-02 19:31:01,672 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 36 segments left of total size: 1204882102 bytes
2012-10-02 19:31:03,079 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 7596 may have finished in the interim.
2012-10-02 19:31:15,487 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 4803 may have finished in the interim.
2012-10-02 19:31:15,489 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 11069 may have finished in the interim.
2012-10-02 19:33:37,821 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 20846 may have finished in the interim.
2012-10-02 19:33:59,274 INFO org.apache.hadoop.mapred.Merger: Merging 35 sorted segments
2012-10-02 19:33:59,275 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 35 segments left of total size: 1176895576 bytes
2012-10-02 19:34:02,131 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 21791 may have finished in the interim.
2012-10-02 19:34:29,927 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 22847 may have finished in the interim.
2012-10-02 19:36:32,181 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 30438 may have finished in the interim.
2012-10-02 19:37:18,243 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 3852 may have finished in the interim.
2012-10-02 19:37:26,292 INFO org.apache.hadoop.mapred.Merger: Merging 37 sorted segments
2012-10-02 19:37:26,293 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 37 segments left of total size: 1233203028 bytes
2012-10-02 19:39:07,695 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 9813 may have finished in the interim.
2012-10-02 19:39:10,764 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 10045 may have finished in the interim.
2012-10-02 19:39:56,829 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 17383 may have finished in the interim.
2012-10-02 19:40:18,295 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 19584 may have finished in the interim.
2012-10-02 19:40:32,307 INFO org.apache.hadoop.mapred.Merger: Merging 58 sorted segments
2012-10-02 19:40:32,308 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 44 segments left of total size: 1206978885 bytes
2012-10-02 19:41:35,154 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 26361 may have finished in the interim.
2012-10-02 19:43:53,644 INFO org.apache.hadoop.mapred.Merger: Merging 56 sorted segments
2012-10-02 19:43:53,645 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 56 segments left of total size: 1217287352 bytes
2012-10-02 19:46:55,246 INFO org.apache.hadoop.mapred.Merger: Merging 44 sorted segments
2012-10-02 19:46:55,246 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 44 segments left of total size: 1221163604 bytes
2012-10-02 19:49:57,894 INFO org.apache.hadoop.mapred.Merger: Merging 85 sorted segments
2012-10-02 19:49:57,895 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 62 segments left of total size: 1229975233 bytes
2012-10-02 19:52:09,914 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 25247 may have finished in the interim.
2012-10-02 19:52:52,620 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
2012-10-02 19:52:52,620 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 32065409 bytes
2012-10-02 19:52:53,327 INFO org.apache.hadoop.mapred.Merger: Merging 8 sorted segments
2012-10-02 19:52:53,345 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 8 segments left of total size: 8522450575 bytes
2012-10-02 19:52:53,366 INFO cascading.flow.hadoop.FlowReducer: cascading version: Concurrent, Inc - Cascading 2.0.5
</pre>
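The sketch below quantifies the merge activity in the log above. It scans the `Down to the last merge-pass` lines written by `org.apache.hadoop.mapred.Merger` and:

- counts them,
- sums the reported total sizes, and
- measures the time between the first and last such line.

The regular expression is written against the exact line format shown here and may need adjusting for other Hadoop versions. Data merged in one pass can show up again in a later pass, so the byte total counts data once per pass it appears in. It is not the distinct input size.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Summarizes the "Down to the last merge-pass" lines of a reduce task log.
public class MergeLogSummary {

    // Group 1: timestamp without millis, group 2: segments left, group 3: total size in bytes.
    private static final Pattern LAST_PASS = Pattern.compile(
        "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}),\\d{3} .*"
        + "Down to the last merge-pass, with (\\d+) segments left of total size: (\\d+) bytes");

    private static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    public static final class Summary {
        public final int passes;
        public final long totalBytes;
        public final Duration elapsed;

        Summary(int passes, long totalBytes, Duration elapsed) {
            this.passes = passes;
            this.totalBytes = totalBytes;
            this.elapsed = elapsed;
        }
    }

    public static Summary summarize(List<String> lines) {
        int passes = 0;
        long totalBytes = 0;
        LocalDateTime first = null;
        LocalDateTime last = null;
        for (String line : lines) {
            Matcher m = LAST_PASS.matcher(line);
            if (!m.find()) {
                continue; // not a merge-pass line
            }
            LocalDateTime t = LocalDateTime.parse(m.group(1), TS);
            if (first == null) {
                first = t;
            }
            last = t;
            passes++;
            totalBytes += Long.parseLong(m.group(3));
        }
        Duration elapsed = (first == null) ? Duration.ZERO : Duration.between(first, last);
        return new Summary(passes, totalBytes, elapsed);
    }

    public static void main(String[] args) {
        // Three lines copied from the log above; in practice, feed in the whole task log.
        List<String> sample = Arrays.asList(
            "2012-10-02 19:31:01,672 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 36 segments left of total size: 1204882102 bytes",
            "2012-10-02 19:33:59,275 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 35 segments left of total size: 1176895576 bytes",
            "2012-10-02 19:52:53,345 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 8 segments left of total size: 8522450575 bytes");
        Summary s = summarize(sample);
        System.out.println("passes=" + s.passes + " bytes=" + s.totalBytes
            + " elapsed=" + s.elapsed.getSeconds() + "s");
    }
}
```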