We currently use the Capacity Scheduler, which has a problem in the following scenario:[[BR]]
When large jobs (the nightly targeted-advertising jobs, or multi-dataset queries from the data team) occupy the main queue (default), short scheduled jobs are blocked behind them.[[BR]]
Proposed approaches:[[BR]]
 * Study the Capacity Scheduler parameters.[[BR]]
   Setting mapred.capacity-scheduler.queue.*.user-limit-factor above 1 lets a single user take more than 100% of the capacity configured for that queue.[[BR]]
   Submitting a job with -D mapred.job.priority=HIGH makes it run first whenever the queue has free map or reduce slots. This does not help when the individual map and reduce tasks of the blocking job run for a long time, because a slot only frees up when a running task finishes.[[BR]]
 * If the Capacity Scheduler parameters cannot meet the requirement, modify the code so that the system assigns jobs to different queues.[[BR]]
   Assume there are two queues, default and highpriority. When default is full and highpriority is relatively idle, jobs with priority HIGH or VERY_HIGH are assigned to the highpriority queue.[[BR]]

Modify the jobAdded function of JobQueuesManager.

Before the change:
{{{
@Override
public void jobAdded(JobInProgress job) throws IOException {
  LOG.info("Job " + job.getJobID() + " submitted to queue "
      + job.getProfile().getQueueName());

  // add job to the right queue
  CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
  if (null == queue) {
    // job was submitted to a queue we're not aware of
    LOG.warn("Invalid queue " + job.getProfile().getQueueName()
        + " specified for job" + job.getProfile().getJobID()
        + ". Ignoring job.");
    return;
  }
  // add job to waiting queue. It will end up in the right place,
  // based on priority.
  queue.addWaitingJob(job);
  // let scheduler know.
  scheduler.jobAdded(job);
}
}}}

After the change:
{{{
// Records the queue each job was actually assigned to.
// Entries are added in jobAdded and cleared in jobRemoved.
private Map<String, String> jobQueueMap = new HashMap<String, String>();

public String getJobQueue(JobInProgress job) {
  String assigned = jobQueueMap.get(job.getJobID().toString());
  // Fall back to the queue the job was submitted to.
  return assigned != null ? assigned : job.getProfile().getQueueName();
}

public Map<String, String> getJobQueueMap() {
  return jobQueueMap;
}

@Override
public void jobAdded(JobInProgress job) throws IOException {
  LOG.info("Job " + job.getJobID() + " submitted to queue "
      + job.getProfile().getQueueName());

  // add job to the right queue
  CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
  if (null == queue) {
    // job was submitted to a queue we're not aware of
    LOG.warn("Invalid queue " + job.getProfile().getQueueName()
        + " specified for job" + job.getProfile().getJobID()
        + ". Ignoring job.");
    return;
  }

  // Compare slot counts directly; this avoids integer division
  // and a division by zero when a capacity is 0.
  boolean mapsFull =
      queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.MAP)
          >= queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.MAP);
  boolean reducesFull =
      queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.REDUCE)
          >= queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.REDUCE);
  boolean highPriority = job.getPriority() == JobPriority.HIGH
      || job.getPriority() == JobPriority.VERY_HIGH;

  if ((mapsFull || reducesFull) && highPriority) {
    // Jobs above normal priority are usually light scheduled jobs that
    // must finish quickly, so run them on the other queue.
    CapacitySchedulerQueue highPriorityQueue = getQueue("highpriority");
    if (highPriorityQueue != null) {
      highPriorityQueue.addWaitingJob(job);
      jobQueueMap.put(job.getJobID().toString(), "highpriority");
    } else {
      queue.addWaitingJob(job);
    }
  } else {
    // add job to waiting queue. It will end up in the right place,
    // based on priority.
    queue.addWaitingJob(job);
  }
  // let scheduler know.
  scheduler.jobAdded(job);
}

@Override
public void jobRemoved(JobInProgress job) {
  jobQueueMap.remove(job.getJobID().toString());
}
}}}

Everywhere else that looks up a job's queue, replace job.getProfile().getQueueName() with jobQueuesManager.getJobQueue(job).[[BR]]
Testing shows that when the default queue is blocked, high-priority jobs switch to the highpriority queue. The code and jar are attached; deploying the update requires restarting Hadoop.
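The routing rule in the modified jobAdded can also be tried outside Hadoop. The following is a minimal standalone sketch, not the attached patch: QueueRouter, Priority, and route are hypothetical names, and the slot counts are passed in as plain integers instead of being read from CapacitySchedulerQueue.

```java
import java.util.HashMap;
import java.util.Map;

/** Standalone sketch of the routing rule; it uses no Hadoop classes. */
final class QueueRouter {
    /** Hypothetical stand-in for Hadoop's JobPriority. */
    enum Priority { VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH }

    static final String OVERFLOW_QUEUE = "highpriority";

    // jobId -> queue the job was actually placed in (only rerouted jobs are recorded)
    private final Map<String, String> jobQueueMap = new HashMap<String, String>();

    /** A slot type counts as full once occupied slots reach the configured capacity. */
    static boolean isFull(int occupied, int capacity) {
        return occupied >= capacity;
    }

    /** Returns the queue the job should wait in and records any rerouting. */
    String route(String jobId, String submittedQueue, Priority priority,
                 int mapOccupied, int mapCapacity,
                 int reduceOccupied, int reduceCapacity,
                 boolean overflowQueueExists) {
        boolean saturated = isFull(mapOccupied, mapCapacity)
                || isFull(reduceOccupied, reduceCapacity);
        boolean urgent = priority == Priority.HIGH || priority == Priority.VERY_HIGH;
        if (saturated && urgent && overflowQueueExists) {
            jobQueueMap.put(jobId, OVERFLOW_QUEUE);
            return OVERFLOW_QUEUE;
        }
        // Otherwise the job stays in the queue it was submitted to.
        return submittedQueue;
    }

    /** Looks up the effective queue, falling back to the submitted queue. */
    String getJobQueue(String jobId, String submittedQueue) {
        String assigned = jobQueueMap.get(jobId);
        return assigned != null ? assigned : submittedQueue;
    }

    /** Drops the bookkeeping entry when a job leaves the system. */
    void jobRemoved(String jobId) {
        jobQueueMap.remove(jobId);
    }
}
```

For example, route("job_1", "default", Priority.HIGH, 10, 10, 0, 5, true) returns highpriority because the map slots of default are full, while the same call with Priority.NORMAL returns default.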