wiki:schedule

We currently use the Capacity Scheduler, which has a problem in the following scenario:

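When large jobs (the nightly targeted-advertising jobs, or the data team's multi-dataset queries) occupy the main queue (default), some short scheduled jobs get blocked behind them.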

Proposed solutions

  • Investigate the Capacity Scheduler parameters

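Setting mapred.capacity-scheduler.queue.*.user-limit-factor to a value greater than 1 allows a single user to use more than 100% of the capacity configured for the queue, as long as the cluster has idle slots.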

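Adding -D mapred.job.priority=HIGH when submitting a job makes it run first whenever the queue has free map and reduce slots. This does not help if the individual map and reduce tasks of the blocking job run for a long time, because the high-priority job still has to wait for those tasks to finish and release their slots.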
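The limitation of raising the priority can be illustrated with a small stand-alone model. This is only a sketch under simplifying assumptions, not Hadoop code: the class `PriorityWithoutPreemption` and the method `waitForFirstSlot` are hypothetical names, and the model assumes that running tasks are never interrupted and that the new job is first in line.

```java
import java.util.Arrays;

/** Hypothetical stand-alone model; not Hadoop code. */
class PriorityWithoutPreemption {

    /**
     * Seconds a newly submitted top-priority job waits for its first slot.
     * remainingTaskSeconds holds the time left on each currently running task.
     */
    static long waitForFirstSlot(int totalSlots, long[] remainingTaskSeconds) {
        if (remainingTaskSeconds.length < totalSlots) {
            return 0; // a slot is already free
        }
        // Assumption: no preemption, so the job waits for the task that finishes soonest.
        return Arrays.stream(remainingTaskSeconds).min().orElse(0);
    }

    public static void main(String[] args) {
        long[] longTasks = {1800, 2400, 3000, 3600};  // blocking job with long tasks
        long[] shortTasks = {20, 45, 60, 90};         // blocking job with short tasks
        System.out.println(waitForFirstSlot(4, longTasks));  // prints 1800
        System.out.println(waitForFirstSlot(4, shortTasks)); // prints 20
    }
}
```

With short tasks the high priority takes effect almost immediately. With long tasks the job waits half an hour for its first slot regardless of its priority.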

  • If the Capacity Scheduler parameters are not enough, modify the code so that the system assigns jobs to different queues

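Suppose there are two queues, default and highpriority, and default is fully occupied while highpriority is relatively idle.

Jobs whose priority is HIGH (or VERY_HIGH) are then assigned to the highpriority queue.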
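The routing rule described here can be sketched as a pure function before looking at the actual scheduler change. This is a simplified, stand-alone model: `QueueRoutingSketch`, `isFull` and `chooseQueue` are hypothetical names, the `Priority` enum only mirrors the priority levels for illustration, and the queue name `highpriority` is the one used in the modified code further down.

```java
/** Hypothetical stand-alone model of the routing rule; not Hadoop code. */
class QueueRoutingSketch {
    enum Priority { VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH }

    static final String HIGH_PRIORITY_QUEUE = "highpriority";

    /** A queue counts as full once its occupied slots reach its capacity. */
    static boolean isFull(int occupiedSlots, int capacitySlots) {
        return occupiedSlots >= capacitySlots;
    }

    /** Returns the queue the job should actually be placed in. */
    static String chooseQueue(String submittedQueue, boolean submittedQueueFull,
                              Priority priority, boolean highPriorityQueueExists) {
        boolean urgent = priority == Priority.HIGH || priority == Priority.VERY_HIGH;
        if (submittedQueueFull && urgent && highPriorityQueueExists) {
            return HIGH_PRIORITY_QUEUE;
        }
        // Otherwise keep the job in the queue it was submitted to.
        return submittedQueue;
    }

    public static void main(String[] args) {
        // Full if either the map slots or the reduce slots are exhausted.
        boolean full = isFull(40, 40) || isFull(3, 10);
        System.out.println(chooseQueue("default", full, Priority.HIGH, true));   // prints highpriority
        System.out.println(chooseQueue("default", full, Priority.NORMAL, true)); // prints default
    }
}
```

Keeping the decision free of scheduler state makes it easy to check the edge cases, for example that a job stays in its submitted queue when the high-priority queue is not configured.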

Modify the jobAdded method of JobQueuesManager.

Before the change

  @Override
  public void jobAdded(JobInProgress job) throws IOException {
    LOG.info("Job " + job.getJobID() + " submitted to queue " + 
        job.getProfile().getQueueName());
    
    // add job to the right queue
    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
    if (null == queue) {
      // job was submitted to a queue we're not aware of
      LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
          " specified for job" + job.getProfile().getJobID() + 
          ". Ignoring job.");
      return;
    }
    // add job to waiting queue. It will end up in the right place, 
    // based on priority. 
    queue.addWaitingJob(job);
    // let scheduler know. 
    scheduler.jobAdded(job);
  }

After the change

  // Records the queue each rerouted job was actually assigned to;
  // entries are added in jobAdded and removed in jobRemoved.
  private Map<String,String> jobQueueMap = new HashMap<String, String>();
  public String getJobQueue(JobInProgress job) {
    // Fall back to the queue the job was submitted to if it was not rerouted.
    String actualQueue = jobQueueMap.get(job.getJobID().toString());
    return actualQueue != null ? actualQueue : job.getProfile().getQueueName();
  }

  public Map<String, String> getJobQueueMap() {
        return jobQueueMap;
  }

  @Override
  public void jobAdded(JobInProgress job) throws IOException {
    LOG.info("Job " + job.getJobID() + " submitted to queue " + 
        job.getProfile().getQueueName());
    
    // add job to the right queue
    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
    if (null == queue) {
      // job was submitted to a queue we're not aware of
      LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
          " specified for job" + job.getProfile().getJobID() + 
          ". Ignoring job.");
      return;
    }
    
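    // Compare occupied slots with capacity directly; this avoids dividing
    // by a capacity of 0.
    boolean queueFull =
        queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.MAP)
            >= queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.MAP)
        || queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.REDUCE)
            >= queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.REDUCE);
    boolean highPriority = job.getPriority() == JobPriority.HIGH
        || job.getPriority() == JobPriority.VERY_HIGH;
    if (queueFull && highPriority) {
        // Jobs with above-normal priority are usually lightweight scheduled
        // jobs that must finish quickly, so run them on a separate queue.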
        
        CapacitySchedulerQueue highPriorityQueue = getQueue("highpriority");
        if (highPriorityQueue != null ){
            highPriorityQueue.addWaitingJob(job);
            jobQueueMap.put(job.getJobID().toString(), "highpriority");
        }else{
            queue.addWaitingJob(job);
        }
            
    }else{
            // add job to waiting queue. It will end up in the right place, 
            // based on priority. 
        queue.addWaitingJob(job);
    }
    

    // let scheduler know. 
    scheduler.jobAdded(job);
  }

  @Override
  public void jobRemoved(JobInProgress job) {
    jobQueueMap.remove(job.getJobID().toString());
  }

Everywhere else that looks up a job's queue, replace the call as follows:

job.getProfile().getQueueName() => jobQueuesManager.getJobQueue(job)
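The reason for this replacement is that the job profile still reports the queue the job was submitted to, while the map holds the queue it actually runs in. A minimal stand-alone sketch of that bookkeeping follows. It is not the patched class: `JobQueueRegistry` and its method names are hypothetical, the job ID is made up, and `ConcurrentHashMap` is used here as an assumption about thread safety rather than something the change itself does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-alone model of the job-to-queue bookkeeping; not Hadoop code. */
class JobQueueRegistry {
    // Job ID -> queue the job was actually placed in (only rerouted jobs appear here).
    private final Map<String, String> actualQueueByJobId = new ConcurrentHashMap<>();

    /** Called when a job is rerouted to another queue. */
    void recordReroute(String jobId, String queue) {
        actualQueueByJobId.put(jobId, queue);
    }

    /** Called when a job is removed, so the map does not grow without bound. */
    void forget(String jobId) {
        actualQueueByJobId.remove(jobId);
    }

    /** Returns the actual queue, falling back to the submitted queue. */
    String queueOf(String jobId, String submittedQueue) {
        return actualQueueByJobId.getOrDefault(jobId, submittedQueue);
    }

    public static void main(String[] args) {
        JobQueueRegistry registry = new JobQueueRegistry();
        String jobId = "job_0001"; // made-up ID for illustration
        System.out.println(registry.queueOf(jobId, "default")); // prints default
        registry.recordReroute(jobId, "highpriority");
        System.out.println(registry.queueOf(jobId, "default")); // prints highpriority
        registry.forget(jobId);
        System.out.println(registry.queueOf(jobId, "default")); // prints default
    }
}
```

Any caller that keeps reading the submitted queue name directly would look for a rerouted job in the wrong queue, which is why every lookup has to go through the same accessor.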

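In testing, when the default queue was blocked, high-priority jobs were switched to the highpriority queue. The code and the jar are in the attachments; deploying the update requires restarting Hadoop.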

Attachments