wiki:schedule

Version 14 (modified by liaojiaohe, 14 years ago)


We currently use the Capacity Scheduler, which causes problems in the following scenario:

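When large jobs (the nightly targeted-advertising jobs, the data team's many data queries) occupy the main queue (default), short scheduled jobs get blocked behind them.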

Proposed solutions

  • Investigate the Capacity Scheduler parameters

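Setting mapred.capacity-scheduler.queue.*.user-limit-factor to a value greater than 1 lets a single user's jobs in a queue use more than 100% of that queue's configured capacity when other slots are idle.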
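A simplified model of what the factor does. It assumes, following the Hadoop 1.x Capacity Scheduler documentation, that one user's share is capped at the queue capacity multiplied by user-limit-factor (so the default of 1 never lets a single user exceed the queue's capacity) and that the queue as a whole stays bounded by its maximum-capacity. It ignores minimum-user-limit-percent and the number of active users; the class and method names are hypothetical.

```java
// Hypothetical, simplified model of the per-user slot cap in a Capacity
// Scheduler queue. Ignores minimum-user-limit-percent and the number of
// concurrently active users.
public class UserLimitSketch {

    /**
     * @param queueCapacitySlots slots guaranteed to the queue
     * @param userLimitFactor    value of ...user-limit-factor (default 1)
     * @param maxCapacitySlots   upper bound for the queue (its maximum-capacity,
     *                           or the whole cluster if none is configured)
     * @return the most slots a single user's jobs may occupy
     */
    static int maxSlotsForOneUser(int queueCapacitySlots, double userLimitFactor,
                                  int maxCapacitySlots) {
        int byFactor = (int) Math.ceil(queueCapacitySlots * userLimitFactor);
        return Math.min(byFactor, maxCapacitySlots);
    }

    public static void main(String[] args) {
        // A queue guaranteed 40 slots on a 100-slot cluster.
        System.out.println(maxSlotsForOneUser(40, 1.0, 100)); // 40: never above capacity
        System.out.println(maxSlotsForOneUser(40, 2.0, 100)); // 80: may borrow idle slots
        System.out.println(maxSlotsForOneUser(40, 3.0, 100)); // 100: capped by the bound
    }
}
```

Raising the factor only helps when other queues leave slots idle; it does not free slots already held by a running job.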

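Adding -D mapred.job.priority=HIGH when submitting a job makes it run first as soon as map and reduce slots in the queue become free. However, this does not help when the individual map and reduce tasks of the blocking job take a long time to finish.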
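The limitation can be seen in a small simulation. It assumes that priority only reorders the waiting list (highest priority first, then earliest submission) and that running tasks are never preempted; in Hadoop 1.x priorities are honored only when mapred.capacity-scheduler.queue.<queue-name>.supports-priority is true. The class, record, job names and task durations are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model: waiting jobs are ordered by priority, then by submit
// time, and running tasks are never preempted.
public class PrioritySketch {

    enum Priority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW } // highest first

    record WaitingJob(String name, Priority priority, long submitTime) {}

    // Highest priority first; ties broken by earlier submission (FIFO).
    static final Comparator<WaitingJob> ORDER =
        Comparator.comparing(WaitingJob::priority)
                  .thenComparingLong(WaitingJob::submitTime);

    // Without preemption, the head of the waiting list starts only when the
    // first currently running task finishes and releases its slot.
    static long waitForFirstFreeSlot(long[] remainingTaskSeconds) {
        long min = Long.MAX_VALUE;
        for (long r : remainingTaskSeconds) {
            min = Math.min(min, r);
        }
        return min;
    }

    public static void main(String[] args) {
        List<WaitingJob> waiting = new ArrayList<>(List.of(
            new WaitingJob("adhoc-query", Priority.NORMAL, 1),
            new WaitingJob("hourly-report", Priority.HIGH, 5)));
        waiting.sort(ORDER);
        System.out.println(waiting.get(0).name()); // hourly-report jumps the line

        // Every slot is held by a task of a big job with 30-50 minutes left.
        long[] remaining = {1800, 2400, 3000};
        System.out.println(waitForFirstFreeSlot(remaining)); // 1800 seconds
    }
}
```

The HIGH job moves to the front of the line, yet still waits half an hour for a slot, which is exactly the case this parameter cannot solve.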

  • If the Capacity Scheduler cannot meet these needs, modify the code so that the system assigns jobs to different queues

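Suppose there are two queues, default and highpriority. When default is full while highpriority is relatively idle, jobs with priority HIGH (or VERY_HIGH) are assigned to the highpriority queue instead.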
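The routing rule can be stated as a small pure function, independent of the Hadoop classes. This is a hypothetical sketch: the method name and plain-int parameters are illustrative, "default" and "highpriority" are the queue names used on this page, and a queue counts as full when its occupied slots reach its capacity for either maps or reduces.

```java
// Hypothetical, Hadoop-independent model of the queue-routing rule.
public class QueueRoutingSketch {

    enum Priority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }

    static final String HIGH_PRIORITY_QUEUE = "highpriority";

    /**
     * Returns the queue a newly submitted job should wait in. A queue is
     * "full" when its occupied map or reduce slots reach its capacity.
     */
    static String chooseQueue(String submittedQueue,
                              int occupiedMaps, int mapCapacity,
                              int occupiedReduces, int reduceCapacity,
                              Priority priority, boolean highPriorityQueueExists) {
        boolean full = occupiedMaps >= mapCapacity || occupiedReduces >= reduceCapacity;
        boolean urgent = priority == Priority.HIGH || priority == Priority.VERY_HIGH;
        if (full && urgent && highPriorityQueueExists) {
            return HIGH_PRIORITY_QUEUE;
        }
        return submittedQueue; // otherwise stay in the queue named at submission
    }

    public static void main(String[] args) {
        // default is full on maps: a HIGH job is diverted.
        System.out.println(chooseQueue("default", 60, 60, 10, 20, Priority.HIGH, true));   // highpriority
        // Same load, NORMAL job: stays in default.
        System.out.println(chooseQueue("default", 60, 60, 10, 20, Priority.NORMAL, true)); // default
        // default still has free slots: no need to divert.
        System.out.println(chooseQueue("default", 30, 60, 10, 20, Priority.HIGH, true));   // default
        // highpriority not configured: fall back to default.
        System.out.println(chooseQueue("default", 60, 60, 10, 20, Priority.HIGH, false));  // default
    }
}
```

Comparing occupied slots with capacity directly, rather than dividing, also behaves sensibly for a queue configured with zero slots.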

Modify the jobAdded method of JobQueuesManager.

Before the change:

  @Override
  public void jobAdded(JobInProgress job) throws IOException {
    LOG.info("Job " + job.getJobID() + " submitted to queue " + 
        job.getProfile().getQueueName());
    
    // add job to the right queue
    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
    if (null == queue) {
      // job was submitted to a queue we're not aware of
      LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
          " specified for job" + job.getProfile().getJobID() + 
          ". Ignoring job.");
      return;
    }
    // add job to waiting queue. It will end up in the right place, 
    // based on priority. 
    queue.addWaitingJob(job);
    // let scheduler know. 
    scheduler.jobAdded(job);
  }

After the change:

  @Override
  public void jobAdded(JobInProgress job) throws IOException {
    LOG.info("Job " + job.getJobID() + " submitted to queue " + 
        job.getProfile().getQueueName());
    
    // add job to the right queue
    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
    if (null == queue) {
      // job was submitted to a queue we're not aware of
      LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
          " specified for job" + job.getProfile().getJobID() + 
          ". Ignoring job.");
      return;
    }

    // The queue is full when occupied map or reduce slots reach its capacity.
    // Comparing directly (instead of occupied / capacity >= 1) avoids integer
    // division and a divide-by-zero when a capacity is 0.
    boolean queueFull =
        queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.MAP)
            >= queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.MAP)
        || queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.REDUCE)
            >= queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.REDUCE);
    boolean highPriority = job.getPriority() == JobPriority.HIGH
        || job.getPriority() == JobPriority.VERY_HIGH;

    // Jobs above normal priority are usually short, lightweight scheduled jobs
    // that must finish quickly, so run them on a separate queue when this one is full.
    CapacitySchedulerQueue highPriorityQueue = getQueue("highpriority");
    if (queueFull && highPriority && highPriorityQueue != null) {
      // Caveat: the job's profile still names the submitted queue; check that
      // later callbacks which look the queue up by that name still find the job.
      highPriorityQueue.addWaitingJob(job);
    } else {
      // add job to waiting queue. It will end up in the right place, 
      // based on priority. 
      queue.addWaitingJob(job);
    }

    // let scheduler know. 
    scheduler.jobAdded(job);
  }
