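We currently use the Capacity Scheduler, which has a problem in the following scenario:
When large jobs (the nightly targeted-advertising jobs, or the data team's multi-dataset queries) occupy the main queue (default), short scheduled jobs get blocked behind them.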
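Proposed approaches
- Investigate the Capacity Scheduler parameters
Setting mapred.capacity-scheduler.queue.*.user-limit-factor to a value greater than 1 lets a single user take more than 100% of the capacity configured for its queue.
Adding -D mapred.job.priority=HIGH at submission makes the job run first whenever the queue has free map and reduce slots. It does not help when each map or reduce task of the blocking job runs for a long time, because no slot frees up in the meantime.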
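The limitation of the priority flag can be illustrated with a small Hadoop-free sketch. It assumes running tasks are never preempted, so a high-priority job can only start once a slot is free. The class and method names are hypothetical and are not part of Hadoop.

```java
import java.util.Arrays;

// Hypothetical model, not Hadoop code: how long a newly submitted
// high-priority job waits for its first slot when nothing is preempted.
final class PriorityWaitSketch {

    /**
     * Returns the seconds until the first slot becomes available.
     * Assumption: running tasks are never killed to make room.
     */
    static long secondsUntilFirstSlot(int freeSlots, long[] remainingSecondsOfRunningTasks) {
        if (freeSlots > 0) {
            return 0L; // a free slot exists, so priority takes effect at once
        }
        // Otherwise the job waits for the running task that finishes soonest.
        return Arrays.stream(remainingSecondsOfRunningTasks)
                .min()
                .orElseThrow(() -> new IllegalArgumentException(
                        "no free slots and no running tasks"));
    }

    public static void main(String[] args) {
        // Free slots: the high-priority job starts immediately.
        System.out.println(secondsUntilFirstSlot(2, new long[] {600, 1800}));
        // Queue full of long tasks: the job waits for the shortest one.
        System.out.println(secondsUntilFirstSlot(0, new long[] {600, 1800, 7200}));
    }
}
```

Under these assumptions, raising the priority changes the order of waiting jobs but not the time at which a slot frees up.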
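- If the Capacity Scheduler cannot meet the requirement, modify the code so that the system assigns jobs to different queues
Suppose there are two queues, default and highpriority. When default is full and highpriority is relatively idle,
jobs whose priority is HIGH (or VERY_HIGH) are assigned to the highpriority queue.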
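The routing rule just described can be sketched without any Hadoop dependency. This is a simplified model under stated assumptions, not the scheduler's code: the class, enum, and method names are hypothetical, and a queue counts as full when either its map or its reduce slots have reached the configured capacity.

```java
// Hypothetical, Hadoop-free model of the proposed routing rule.
final class QueueRoutingSketch {

    enum Priority { VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH }

    static final String HIGH_PRIORITY_QUEUE = "highpriority";

    /** True when either slot type has used up its configured capacity. */
    static boolean isSaturated(int occupiedMap, int capacityMap,
                               int occupiedReduce, int capacityReduce) {
        // A comparison rather than a ratio, so a capacity of 0
        // cannot cause a division by zero.
        return occupiedMap >= capacityMap || occupiedReduce >= capacityReduce;
    }

    /** Picks the queue a job should wait in. */
    static String chooseQueue(String submittedQueue, Priority priority,
                              boolean submittedQueueSaturated,
                              boolean highPriorityQueueExists) {
        boolean urgent = priority == Priority.HIGH || priority == Priority.VERY_HIGH;
        if (urgent && submittedQueueSaturated && highPriorityQueueExists) {
            return HIGH_PRIORITY_QUEUE;
        }
        // Everything else stays in the queue it was submitted to.
        return submittedQueue;
    }

    public static void main(String[] args) {
        boolean full = isSaturated(40, 40, 3, 10); // map slots exhausted
        System.out.println(chooseQueue("default", Priority.HIGH, full, true));   // highpriority
        System.out.println(chooseQueue("default", Priority.NORMAL, full, true)); // default
        System.out.println(chooseQueue("default", Priority.HIGH, false, true));  // default
    }
}
```

The job falls back to its submitted queue whenever the highpriority queue is not configured, so a missing queue never causes a job to be dropped.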
Modify the jobAdded function of JobQueuesManager
Before the change
@Override
public void jobAdded(JobInProgress job) throws IOException {
  LOG.info("Job " + job.getJobID() + " submitted to queue " +
      job.getProfile().getQueueName());
  // add job to the right queue
  CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
  if (null == queue) {
    // job was submitted to a queue we're not aware of
    LOG.warn("Invalid queue " + job.getProfile().getQueueName() +
        " specified for job" + job.getProfile().getJobID() +
        ". Ignoring job.");
    return;
  }
  // add job to waiting queue. It will end up in the right place,
  // based on priority.
  queue.addWaitingJob(job);
  // let scheduler know.
  scheduler.jobAdded(job);
}
After the change
// Map recording which queue each job was actually assigned to;
// populated in jobAdded, cleared in jobRemoved.
private Map<String, String> jobQueueMap = new HashMap<String, String>();

public String getJobQueue(JobInProgress job) {
  String assignedQueue = jobQueueMap.get(job.getJobID().toString());
  // Fall back to the queue the job was submitted to if it was not rerouted.
  return assignedQueue != null ? assignedQueue : job.getProfile().getQueueName();
}

public Map<String, String> getJobQueueMap() {
  return jobQueueMap;
}
@Override
public void jobAdded(JobInProgress job) throws IOException {
  LOG.info("Job " + job.getJobID() + " submitted to queue " +
      job.getProfile().getQueueName());
  // add job to the right queue
  CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
  if (null == queue) {
    // job was submitted to a queue we're not aware of
    LOG.warn("Invalid queue " + job.getProfile().getQueueName() +
        " specified for job" + job.getProfile().getJobID() +
        ". Ignoring job.");
    return;
  }
  // The queue is full when map or reduce slots have reached capacity.
  // Compare directly instead of dividing, which avoids dividing by a capacity of 0.
  boolean queueFull =
      queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.MAP)
          >= queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.MAP)
      || queue.getNumSlotsOccupied(org.apache.hadoop.mapreduce.TaskType.REDUCE)
          >= queue.getCapacity(org.apache.hadoop.mapreduce.TaskType.REDUCE);
  if (queueFull
      && (job.getPriority() == JobPriority.HIGH || job.getPriority() == JobPriority.VERY_HIGH)) {
    // Jobs with above-normal priority are usually light scheduled jobs that
    // must finish quickly, so run them on a separate queue.
    CapacitySchedulerQueue highPriorityQueue = getQueue("highpriority");
    if (highPriorityQueue != null) {
      highPriorityQueue.addWaitingJob(job);
      jobQueueMap.put(job.getJobID().toString(), "highpriority");
    } else {
      // highpriority queue is not configured; stay in the submitted queue
      queue.addWaitingJob(job);
    }
  } else {
    // add job to waiting queue. It will end up in the right place,
    // based on priority.
    queue.addWaitingJob(job);
  }
  // let scheduler know.
  scheduler.jobAdded(job);
}
@Override
public void jobRemoved(JobInProgress job) {
  jobQueueMap.remove(job.getJobID().toString());
}
Replace the other places that look up a job's queue as follows:
job.getProfile().getQueueName() => jobQueuesManager.getJobQueue(job)
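Why the call sites need this substitution can be shown with a small Hadoop-free sketch of the lookup. The class and method names, and the job ID in the example, are hypothetical. It uses a ConcurrentHashMap on the assumption that several threads may read and write the map, which the patch itself does not state.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the "actual queue" lookup with a fallback
// to the queue named at submission time.
final class JobQueueLookupSketch {

    // job ID -> queue the job was rerouted to (only rerouted jobs appear here)
    private final Map<String, String> overrides = new ConcurrentHashMap<>();

    void recordReroute(String jobId, String actualQueue) {
        overrides.put(jobId, actualQueue);
    }

    void forget(String jobId) {
        overrides.remove(jobId);
    }

    /** Returns the rerouted queue if one was recorded, else the submitted queue. */
    String effectiveQueue(String jobId, String submittedQueue) {
        return overrides.getOrDefault(jobId, submittedQueue);
    }

    public static void main(String[] args) {
        JobQueueLookupSketch lookup = new JobQueueLookupSketch();
        String jobId = "job_example_0001"; // hypothetical ID

        System.out.println(lookup.effectiveQueue(jobId, "default")); // default
        lookup.recordReroute(jobId, "highpriority");
        System.out.println(lookup.effectiveQueue(jobId, "default")); // highpriority
        lookup.forget(jobId);
        System.out.println(lookup.effectiveQueue(jobId, "default")); // default
    }
}
```

A call site that still reads the submitted queue name would look for a rerouted job in default, where it is no longer waiting, which is why every lookup has to go through the map.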
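Testing shows that when the default queue is blocked, high-priority jobs are switched to the highpriority queue. The code and jar are in the attachments; deploying the update requires restarting Hadoop.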
Attachments
- src.zip (38.5 KB), added by liaojiaohe 13 years ago.
- hadoop-capacity-scheduler-1.0.3.jar (63.9 KB), added by liaojiaohe 13 years ago.