Features implemented by the Hadooptools package:[[BR]]
 * Distributed HBase query: the user supplies a table name, a filter, an output directory, and other options, and the tool writes out the matching results
 * Viewing the first few lines of a compressed file on HDFS

== Creating the ProgramDriver ==
Register the command classes with a ProgramDriver. ScanTable is the HBase query tool and Head is the HDFS file viewer:
{{{
ProgramDriver pgd = new ProgramDriver();
pgd.addClass(ScanTable.NAME, ScanTable.class,
    "Search an HBase table using a filter");
pgd.addClass(Head.NAME, Head.class, "Read the top n lines of a DFS file");
pgd.driver(args);
}}}

== ScanTable ==
ScanTable is a map-only job (it needs no reduce phase). The main method first creates a Configuration and parses the command-line arguments:
{{{
public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();  // create the Configuration
    String[] otherArgs =
        new GenericOptionsParser(conf, args).getRemainingArgs();  // parse generic options
}}}
Argument convention: hadoop jar xxxx.jar program [genericOptions] [commandOptions][[BR]]
genericOptions:
{{{
-conf <configuration file>     specify a configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|jobtracker:port>    specify a job tracker
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>   specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.
}}}
Use TableMapReduceUtil.initTableMapperJob to set up the input table:
{{{
Job job = new Job(conf, ScanTable.NAME);
job.setJarByClass(ScanTable.class);
job.setMapperClass(MapClass.class);

Scan scan = new Scan();
scan.setCaching(500);        // larger scanner caching for MapReduce scans
scan.setCacheBlocks(false);  // don't pollute the block cache with scan data

FilterList flist = new FilterList();
Class<? extends TableMapper> mapClass = null;
if (otherArgs.length > 3) {
    flist.addFilter(new ParseFilter().parseFilterString(otherArgs[3]));
}
// Choose the output mode from the third argument: "row" outputs the rowkeys,
// "count" outputs the number of rows, and anything else is treated as a
// column list such as cf:column1,cf:column2 naming the fields to output.
if (otherArgs[2].equalsIgnoreCase("row")) {
    flist.addFilter(new FirstKeyOnlyFilter());
    mapClass = RowClass.class;
} else if (otherArgs[2].equalsIgnoreCase("count")) {
    flist.addFilter(new FirstKeyOnlyFilter());
    mapClass = CountClass.class;
} else {
    job.getConfiguration().setStrings(OUTPUT_COLUMNS, otherArgs[2]);
    mapClass = MapClass.class;
}
if (flist.getFilters().size() > 0) {
    scan.setFilter(flist);
}
TableMapReduceUtil.initTableMapperJob(
    otherArgs[0],  // input table
    scan,          // Scan instance to control CF and attribute selection
    mapClass,      // mapper class
    null,          // mapper output key
    null,          // mapper output value
    job);
job.setNumReduceTasks(0);  // map-only job
}}}
Three map classes implement the three output modes. For example, RowClass emits only the rowkey:
{{{
public static class RowClass extends TableMapper<Text, Text> {
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(new String(row.get(), "UTF-8")), new Text(""));
    }
}
}}}
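Only RowClass is shown above. As a standalone illustration of how the third argument selects among the three mappers, the dispatch can be sketched in plain Java; note that OutputModeDemo and selectMode are hypothetical names introduced here for the sketch, not part of the actual tool:

```java
// Minimal sketch of the output-mode dispatch described above.
// The real code assigns an HBase TableMapper class; here we just
// return the name of the mapper that would be chosen.
public class OutputModeDemo {

    // "row"   -> rowkey-only output (RowClass)
    // "count" -> row counting (CountClass)
    // anything else is treated as a column list such as "cf:col1,cf:col2"
    // and handled by the generic MapClass.
    static String selectMode(String arg) {
        if (arg.equalsIgnoreCase("row")) {
            return "RowClass";
        } else if (arg.equalsIgnoreCase("count")) {
            return "CountClass";
        } else {
            return "MapClass";
        }
    }

    public static void main(String[] args) {
        System.out.println(selectMode("row"));         // RowClass
        System.out.println(selectMode("COUNT"));       // CountClass
        System.out.println(selectMode("cf:c1,cf:c2")); // MapClass
    }
}
```

The comparison is case-insensitive, matching the equalsIgnoreCase calls in the job-setup code.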