Changes between Version 12 and Version 13 of schedule


Ignore:
Timestamp:
10/19/2012 11:32:15 AM (14 years ago)
Author:
liaojiaohe
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • schedule

    v12 v13  
    1616修改JobQueueManager,jobAdded 函数 
    1717 
    18  
     18修改前 
    1919{{{ 
    2020  @Override 
     
    3939  } 
    4040}}} 
     41 
     42修改后 
     43 
     44{{{ 
     45 @Override 
     46  public void jobAdded(JobInProgress job) throws IOException { 
     47    LOG.info("Job " + job.getJobID() + " submitted to queue " +  
     48        job.getProfile().getQueueName()); 
     49     
     50    // add job to the right queue 
     51    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName()); 
     52    if (null == queue) { 
     53      // job was submitted to a queue we're not aware of 
     54      LOG.warn("Invalid queue " + job.getProfile().getQueueName() +  
     55          " specified for job" + job.getProfile().getJobID() +  
     56          ". Ignoring job."); 
     57      return; 
     58    } 
     59     
     60    if ((queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.MAP)/queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.MAP) >= 1 
     61            || queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.REDUCE)/queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.REDUCE) >= 1) 
     62            && (job.getPriority() == JobPriority.HIGH || job.getPriority() == JobPriority.VERY_HIGH )){ 
     63        //如果任务的级别比一般的高,一般是要求短时间完成的压力比较少的定时任务,分配到另外一条queue上面执行 
     64         
     65        CapacitySchedulerQueue highPriorityQueue = getQueue("highpriority"); 
     66        if (highPriorityQueue != null ){ 
     67            highPriorityQueue.addWaitingJob(job); 
     68        }else{ 
     69            queue.addWaitingJob(job); 
     70        } 
     71             
     72    }else{ 
     73            // add job to waiting queue. It will end up in the right place,  
     74            // based on priority.  
     75        queue.addWaitingJob(job); 
     76    } 
     77     
     78 
     79    // let scheduler know.  
     80    scheduler.jobAdded(job); 
     81  } 
     82 
     83}}}