| | 24 | |
| | 25 | 在每个job里增加main函数,使用 ToolRunner.run 方法调用,这样有些系统的参数如map数量,系统就会先保存到conf里面[[BR]] |
| | 26 | 传给run的arg都是经过GenericOptionsParser过虑的,在run的方法里面我们可以配置任务各部分需要的类 |
| | 27 | {{{ |
| | 28 | |
| | 29 | public static int main(String[] args) throws Exception { |
| | 30 | return ToolRunner.run(new Configuration(), new RefererUniq(), args); |
| | 31 | } |
| | 32 | |
| | 33 | @Override |
| | 34 | public int run(String[] args) throws Exception { |
| | 35 | Job job = new Job(conf); |
| | 36 | job.setJarByClass(this.getClass()); |
| | 37 | job.setJobName(jobName); |
| | 38 | |
| | 39 | job.setOutputKeyClass(Text.class); |
| | 40 | job.setOutputValueClass(IntWritable.class); |
| | 41 | 。。。。 |
| | 42 | |
| | 43 | FileOutputFormat.setOutputPath(job, new Path(userArgs[ARGNAME.WORK.ordinal()])); |
| | 44 | |
| | 45 | job.setMapperClass(MyMapper.class); |
| | 46 | job.setCombinerClass(IntSumReducer.class); |
| | 47 | job.setReducerClass(IntSumReducer.class); |
| | 48 | boolean success = job.waitForCompletion(true); |
| | 49 | } |
| | 50 | }}} |
| | 51 | 提醒一下新的hadoop mapreduce类在包 org.apache.hadoop.mapreduce.*下[[BR]] |
| | 52 | |
| | 57 | |
| | 58 | InputFormat做Map前的预处理,主要负责以下工作: |
| | 59 | 验证输入的格式是否符合Configuration的输入定义,这个在实现Map和构建Conf的时候就会知道,不定义可以是Writable的任意子类。[[BR]] |
| | 60 | |
| | 61 | 将input的文件切分为逻辑上的输入InputSplit,其实这就是在上面提到的在分布式文件系统中blocksize是有大小限制的,因此大文件会被划分为多个block。 |
| | 62 | 通过RecordReader来再次处理inputsplit为一组records,输出给Map。[[BR]] |
| | 63 | RecordReader处理后的结果作为Map的输入,Map执行定义的Map逻辑,输出处理后的key和value对应到临时中间文件。[[BR]] |
| | 64 | 更多的输入输例子可以看[/wiki/inputouput/ Hadoop常用输入输出][[BR]] |
| | 65 | |
| | 66 | |
| | 67 | Combiner可选择配置,主要作用是在每一个Map执行完分析以后,在本地优先作Reduce的工作,减少在Reduce过程中的数据传输量。[[BR]] |
| | 68 | 我们在广告统计pv的时候使用了,这样可以减少总体排序的数量和网络上数据传输量[[BR]] |
| | 69 | CombinerClass可以和reduce类相同也可以不同 |
| | 70 | {{{ |
| | 71 | job.setMapperClass(MyMapper.class); |
| | 72 | job.setCombinerClass(IntSumReducer.class); |
| | 73 | job.setReducerClass(IntSumReducer.class); |
| | 74 | }}} |
| | 75 | |
| | 76 | |
| | 77 | Partitioner可选择配置,主要作用是在多个Reduce的情况下,指定Map的结果由某一个Reduce处理,每一个Reduce都会有单独的输出文件。[[BR]] |
| | 78 | 主要使用场景是想一次读文件,得到多样的结果,比如读广告的原始文件,希望一次得到所有唯一的ip和唯一的uv可以使用这种[[BR]] |
| | 79 | 但如果搞得太复杂会影响程序的可读性,增加判断肯定会影响job的速度,有时候我宁愿多读一次输入。[[BR]] |
| | 80 | |
| | 81 | |
| | 82 | Reduce执行具体的业务逻辑,并且将处理结果输出给OutputFormat。[[BR]] |
| | 83 | 一个简单的reduce里面,要注意的是reduce的迭代器只能读取一次Iterable<IntWritable> values,如果你要遍历所有values,要注意会不会OOM。 |
| | 84 | {{{ |
| | 85 | /** |
| | 86 | * 累计value的reducer类 |
| | 87 | * @param <K> |
| | 88 | */ |
| | 89 | public static class IntSumReducer<K> |
| | 90 | extends Reducer<K, IntWritable, K, IntWritable> { |
| | 91 | |
| | 92 | private IntWritable result = new IntWritable(); |
| | 93 | |
| | 94 | @Override |
| | 95 | public void reduce(K key, Iterable<IntWritable> values, |
| | 96 | Context context) throws IOException, InterruptedException { |
| | 97 | int sum = 0; |
| | 98 | for (IntWritable val : values) { |
| | 99 | sum += val.get(); |
| | 100 | } |
| | 101 | result.set(sum); |
| | 102 | context.write(key, result); |
| | 103 | } |
| | 104 | } |
| | 105 | }}} |
| | 106 | |
| | 107 | |
| | 108 | OutputFormat的职责是,验证输出目录是否已经存在,同时验证输出结果类型是否如Config中配置,最后输出Reduce汇总后的结果。[[BR]] |
| | 109 | 除非你要改变hadoop的输出规则,否则建议不需要自己写这部分,使用系统的就可以了,我们曾经试图把输出写到同一个文件上,这样hadoop只能使用单个的reduce |
| | 110 | 完成job的时间也是极慢的。 |
| | 111 | 更多的输入输例子可以看[/wiki/inputouput/ Hadoop常用输入输出][[BR]] |