
Speculative tasks make the default JobQueueTaskScheduler's scheduling unreasonable

Question asked by anlly on Sep 25, 2013

With speculative execution enabled, the default scheduler is prone to leaving (map, reduce) tasks pending even when resources are abundant.
Cluster configuration: 3 tasktrackers, 12 reduce slots per node.

 
The job queue contains only 2 jobs:
job_201309221020_0357's eleven reduce tasks are running, and job_201309221020_0358 has one reduce task in the pending state.
But my cluster has 36 reduce slots in total, so why does job_201309221020_0358's reduce task have to pend?
job_201309221020_0358 waited for 2 minutes and was finally scheduled only after job_201309221020_0357 had completed one of its reduce tasks.

Checking the job logs and the scheduling-algorithm source code, I found that the cause may be that speculative tasks make the default scheduling algorithm misbehave.


Task task_201309221020_0357_r_000006 actually started two attempts (attempt_201309221020_0357_r_000006_0 and attempt_201309221020_0357_r_000006_1). So although job_201309221020_0357 has only eleven reduce tasks, speculative execution makes it occupy twelve slots (four slots per node), so 12 slots are currently running.

According to the default scheduling algorithm, job_201309221020_0358's reduce task must wait for one of job_201309221020_0357's reduce tasks to complete; otherwise it will stay pending forever. So is the default scheduling algorithm unsuitable when speculative execution ("Speculative task") is enabled?

 
<code>
JobQueueTaskScheduler:

double reduceLoadFactor = (double) remainingReduceLoad / clusterReduceCapacity;
// remainingReduceLoad: job_201309221020_0357's 11 running reduces
//                      + job_201309221020_0358's 1 pending reduce = 12
// clusterReduceCapacity: 36
// reduceLoadFactor = 12 / 36 = 0.3333333333333333

final int trackerCurrentReduceCapacity =
    Math.min((int) Math.ceil(reduceLoadFactor * trackerReduceCapacity),
             trackerReduceCapacity);
// trackerReduceCapacity: 12 reduce slots per tasktracker
// trackerCurrentReduceCapacity = ceil(0.3333333333333333 * 12) = 4

final int availableReduceSlots =
    Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
// trackerRunningReduces: 4 per node (including the speculative attempt)
// availableReduceSlots = Math.min((4 - 4), 1) = 0

boolean exceededReducePadding = false;
if (availableReduceSlots > 0) {
  // While job_201309221020_0357's reduce tasks are running,
  // availableReduceSlots is always less than 1, so this branch is skipped.
  exceededReducePadding =
      exceededPadding(false, clusterStatus, trackerReduceCapacity);
  synchronized (jobQueue) {
    LOG.debug("try to assign 1 reduce task to TaskTracker["
        + taskTracker.trackerName + "]..");
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() != JobStatus.RUNNING ||
          job.numReduceTasks == 0) {
        continue;
      }
      ... ...
</code>
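To make the failure mode concrete, the per-heartbeat arithmetic above can be reproduced as a standalone sketch. The constants are plugged in from the cluster described in this post (3 tasktrackers, 12 reduce slots each, 11 running + 1 pending reduce, one extra speculative attempt); the class and method names are mine for illustration, not Hadoop's:

```java
// Hypothetical standalone reproduction of JobQueueTaskScheduler's
// per-heartbeat reduce-slot computation (names are illustrative).
public class ReduceSlotMath {
    // Values taken from the scenario in this post:
    static final int REMAINING_REDUCE_LOAD   = 12; // 11 running + 1 pending
    static final int CLUSTER_REDUCE_CAPACITY = 36; // 3 nodes * 12 slots
    static final int TRACKER_REDUCE_CAPACITY = 12; // slots on one tasktracker
    static final int TRACKER_RUNNING_REDUCES = 4;  // per node, incl. the speculative attempt

    static int availableReduceSlots() {
        double reduceLoadFactor =
            (double) REMAINING_REDUCE_LOAD / CLUSTER_REDUCE_CAPACITY;
        int trackerCurrentReduceCapacity = Math.min(
            (int) Math.ceil(reduceLoadFactor * TRACKER_REDUCE_CAPACITY),
            TRACKER_REDUCE_CAPACITY);                        // = 4
        // The scheduler offers at most one reduce per heartbeat.
        return Math.min(
            trackerCurrentReduceCapacity - TRACKER_RUNNING_REDUCES, 1);
    }

    public static void main(String[] args) {
        // 4 - 4 = 0: the tracker reports no free reduce slot, so
        // job_201309221020_0358's reduce task stays pending even though
        // 24 of the cluster's 36 slots are physically idle.
        System.out.println(availableReduceSlots());
    }
}
```

The speculative attempt inflates trackerRunningReduces to the same value as trackerCurrentReduceCapacity, which is why the difference is 0 on every node.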
