Hadooptools工作包实现的功能:[[BR]] * HBASE分布式查询,用户输入表名,filter,输出目录等信息查询相应的结果 * 查看HDFS压缩文件文件里面的前几行数据 == 创建ProgramDriver == 把其他命令类增加进ProgramDriver,ScanTable为hbase查询工具,Head为hdfs文件查看工具 {{{ ProgramDriver pgd = new ProgramDriver(); pgd.addClass(ScanTable.NAME, ScanTable.class, "Search table in HBase table use filter"); pgd.addClass(Head.NAME, Head.class, "read dfs file top n"); pgd.driver(args); }}} == ScanTable == ScanTable是一个map任务,不需要reduce,main函数先创建Configuration,处理参数如下代码 {{{ public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); //建立Configuration String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); //输入参数parser }}} 参数传递的规则:hadoop jar xxxx.jar program [genericOptions] [commandOptions][[BR]] genericOptions: {{{ -conf specify a configuration file -D use value for given property -fs specify a namenode -jt specify a job tracker -files specify comma separated files to be copied to the map reduce cluster -libjars specify comma separated jar files to include in the classpath. -archives specify comma separated archives to be unarchived on the compute machines.v }}} 采用TableMapReduceUtil.initTableMapperJob 设置输入表格, {{{ Job job = new Job(conf, ScanTable.NAME); job.setJarByClass(ScanTable.class); job.setMapperClass(MapClass.class); Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); FilterList flist = new FilterList(); Class mapClass = null; if (otherArgs.length > 3) { flist.addFilter(new ParseFilter().parseFilterString(otherArgs[3])); } //根据参数设置输出,row表示输出rowkey,count表述输出条数,cf:colum1,cf:colum2 输出字段信息 if (otherArgs[2].equalsIgnoreCase("row")) { flist.addFilter(new FirstKeyOnlyFilter()); mapClass = RowClass.class; } else if (otherArgs[2].equalsIgnoreCase("count")) { flist.addFilter(new FirstKeyOnlyFilter()); mapClass = CountClass.class; } else { job.getConfiguration().setStrings(OUTPUT_COLUMNS, otherArgs[2]); mapClass = MapClass.class; } if (flist.getFilters().size() > 0) { scan.setFilter(flist); } TableMapReduceUtil.initTableMapperJob( otherArgs[0], // input table scan, // Scan instance to control CF and attribute selection mapClass, // mapper class null, // mapper output key null, // mapper output value job); job.setNumReduceTasks(0); }}} 写3个map类对应3种输出 {{{ public static class RowClass extends TableMapper { @Override public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { context.write(new Text(new String(row.get(), "UTF-8")), new Text("")); } } }}} 执行例子 {{{ hadoop jar hadooptools.jar ScanTable -conf /data/hbase-0.94.1/conf/hbase-site.xml mofang_device /temp1 row "SingleColumnValueFilter('base','model',=,'binaryprefix:iPhone',true,false) }}} == Head == 也是要先生成Configuration和处理参数 {{{ FileSystem hdfs = FileSystem.get(conf); //获得HDFS文件系统的对象 Path inputDir = new Path(otherArgs[0]); if (otherArgs.length > 1) { MaxLineNumber = Integer.parseInt(otherArgs[1]) > 0 ? Integer.parseInt(otherArgs[1]) : 10; } byte[] buffer = new byte[4096]; FSDataInputStream inputStream = hdfs.open(inputDir); GzipCodec codec = new GzipCodec(); codec.setConf(conf); CompressionInputStream input = codec.createInputStream(inputStream); }}} 执行例子 {{{ hadoop jar hadooptools.jar Head /dingxiang/warehouse/third/20120923/third_00031_20120923.gz 8 Warning: $HADOOP_HOME is deprecated. 12/09/24 10:37:46 INFO util.NativeCodeLoader: Loaded the native-hadoop library 12/09/24 10:37:46 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 01hbicu2021100.2000.0630.0000.0000.077113305719100.0000.3330.000 01hbicu202340.2000.0630.0000.0000.077113341274270.0000.3330.000 01hbicu209710.2000.0770.0000.0000.231313306680920.0001.0000.000 01hbicu202200.2000.0770.0000.0000.231313306680920.0001.0000.000 01hbicu209750.2000.0630.0000.0000.077113341274270.0000.3330.000 01hbicu2091100.2000.0630.0000.0000.077113305719100.0000.3330.000 01vteic102209280.2000.0660.0000.0000.125113305950700.0000.5000.000 01vteic1081030.2000.0660.0000.0000.125113260775180.0000.5000.000 }}}