| | 41 | |
| | 42 | 修改后 |
| | 43 | |
| | 44 | {{{ |
| | 45 | /** Queues a newly submitted job, then notifies the scheduler. A HIGH or VERY_HIGH priority job whose queue is |
| | 46 |  * saturated for MAP or REDUCE is placed on the "highpriority" queue instead, when such a queue exists. */ |
| | 47 | @Override |
| | 48 | public void jobAdded(JobInProgress job) throws IOException { |
| | 49 |   String queueName = job.getProfile().getQueueName(); |
| | 50 |   LOG.info("Job " + job.getJobID() + " submitted to queue " + queueName); |
| | 51 |   CapacitySchedulerQueue queue = getQueue(queueName); |
| | 52 |   if (null == queue) { |
| | 53 |     // job was submitted to a queue we're not aware of |
| | 54 |     LOG.warn("Invalid queue " + queueName + |
| | 55 |         " specified for job" + job.getProfile().getJobID() + |
| | 56 |         ". Ignoring job."); |
| | 57 |     return; |
| | 58 |   } |
| | 59 | |
| | 60 |   // By default the job waits in the queue it was submitted to; it ends up in the right place based on priority. |
| | 61 |   CapacitySchedulerQueue target = queue; |
| | 62 |   if ((isSaturated(queue, org.apache.hadoop.mapreduce.TaskType.MAP) |
| | 63 |       || isSaturated(queue, org.apache.hadoop.mapreduce.TaskType.REDUCE)) |
| | 64 |       && (job.getPriority() == JobPriority.HIGH || job.getPriority() == JobPriority.VERY_HIGH)) { |
| | 65 |     // Jobs above normal priority are usually light scheduled tasks that must finish quickly, so they are |
| | 66 |     // run on a separate queue. NOTE(review): the job profile still names the original queue - confirm that |
| | 67 |     // later lookups by queue name (job updates/completion) still find a rerouted job. |
| | 68 |     CapacitySchedulerQueue highPriorityQueue = getQueue("highpriority"); |
| | 69 |     if (highPriorityQueue != null) { |
| | 70 |       target = highPriorityQueue; |
| | 71 |     } |
| | 72 |   } |
| | 73 |   target.addWaitingJob(job); |
| | 74 |   scheduler.jobAdded(job); // let scheduler know. |
| | 75 | } |
| | 76 | |
| | 77 | /** Returns true when the queue's capacity for {@code type} is positive and its occupied slots have reached it. |
| | 78 |  * Compares instead of dividing occupied by capacity, so a capacity of 0 cannot cause a division by zero. */ |
| | 79 | private boolean isSaturated(CapacitySchedulerQueue queue, org.apache.hadoop.mapreduce.TaskType type) { |
| | 80 |   return queue.getCapacity(type) > 0 && queue.getNumSlotsOccupied(type) >= queue.getCapacity(type); |
| | 81 | } |
| | 82 | |
| | 83 | }}} |