比较完整的Hadoop应用jar包,包括入口程序,任务和帮助类3部分就够了 '''1. 我们在项目的包的顶级放一个入口程序[[BR]]''' 如 cn.pconline.ad.analytics3.Analyser,入口程序里面有一个ProgramDriver,把任务类加在里面就可以通过命令'' hadoop jar Analyser3.0.jar SumPvClick'' 执行 {{{ public static void main(String argv[]) throws Exception { int exitCode = -1; ProgramDriver pgd = new ProgramDriver(); try { pgd.addClass("CombineADRawFile", CombineADRawFile.class, ""); pgd.addClass("SumPvClick", SumPvClick.class, ""); .... }}} '''2. hadoop任务是程序包的核心'''[[BR]] 我们的做法是写一个实现Tool接口的基类,其他的任务继承这个基类,有些公共的方法就不用每个重复 {{{ public abstract class JobBase implements Tool }}} 在每个job里增加main函数,使用 ToolRunner.run 方法调用,这样有些系统的参数如map数量,系统就会先保存到conf里面[[BR]] 传给run的arg都是经过GenericOptionsParser过虑的,在run的方法里面我们可以配置任务各部分需要的类 {{{ public static int main(String[] args) throws Exception { return ToolRunner.run(new Configuration(), new RefererUniq(), args); } @Override public int run(String[] args) throws Exception { Job job = new Job(conf); job.setJarByClass(this.getClass()); job.setJobName(jobName); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); 。。。。 FileOutputFormat.setOutputPath(job, new Path(userArgs[ARGNAME.WORK.ordinal()])); job.setMapperClass(MyMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); boolean success = job.waitForCompletion(true); } }}} 提醒一下新的hadoop mapreduce类在包 org.apache.hadoop.mapreduce.*下[[BR]] 每个任务的核心是map和reduce,有些类可以使用系统现有的,有些可以项目公用,不过更多的需要每个job里面自己定制,借网上的两张图说明一下map和reduce的流程 [[Image(map.jpg)]] [[Image(reduce.jpg)]] InputFormat做Map前的预处理,主要负责以下工作: 验证输入的格式是否符合Configuration的输入定义,这个在实现Map和构建Conf的时候就会知道,不定义可以是Writable的任意子类。[[BR]] 将input的文件切分为逻辑上的输入InputSplit,其实这就是在上面提到的在分布式文件系统中blocksize是有大小限制的,因此大文件会被划分为多个block。 通过RecordReader来再次处理inputsplit为一组records,输出给Map。[[BR]] RecordReader处理后的结果作为Map的输入,Map执行定义的Map逻辑,输出处理后的key和value对应到临时中间文件。[[BR]] 更多的输入输例子可以看[/wiki/inputouput/ Hadoop常用输入输出][[BR]] Combiner可选择配置,主要作用是在每一个Map执行完分析以后,在本地优先作Reduce的工作,减少在Reduce过程中的数据传输量。[[BR]] 我们在广告统计pv的时候使用了,这样可以减少总体排序的数量和网络上数据传输量[[BR]] CombinerClass可以和reduce类相同也可以不同 {{{ job.setMapperClass(MyMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); }}} Partitioner可选择配置,主要作用是在多个Reduce的情况下,指定Map的结果由某一个Reduce处理,每一个Reduce都会有单独的输出文件。[[BR]] 主要使用场景是想一次读文件,得到多样的结果,比如读广告的原始文件,希望一次得到所有唯一的ip和唯一的uv可以使用这种[[BR]] 但如果搞得太复杂会影响程序的可读性,增加判断肯定会影响job的速度,有时候我宁愿多读一次输入。[[BR]] Reduce执行具体的业务逻辑,并且将处理结果输出给OutputFormat。[[BR]] 一个简单的reduce里面,要注意的是reduce的迭代器只能读取一次Iterable values,如果你要遍历所有values,要注意会不会OOM。 {{{ /** * 累计value的reducer类 * @param */ public static class IntSumReducer extends Reducer { private IntWritable result = new IntWritable(); @Override public void reduce(K key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } }}} OutputFormat的职责是,验证输出目录是否已经存在,同时验证输出结果类型是否如Config中配置,最后输出Reduce汇总后的结果。[[BR]] 除非你要改变hadoop的输出规则,否则建议不需要自己写这部分,使用系统的就可以了,我们曾经试图把输出写到同一个文件上,这样hadoop只能使用单个的reduce 完成job的时间也是极慢的。 更多的输入输例子可以看[/wiki/inputouput/ Hadoop常用输入输出][[BR]]