wiki:hadoop_tools

Features implemented by the hadooptools package:

  • Distributed HBase queries: the user supplies a table name, a filter, an output directory, and other options, and the tool writes out the matching rows
  • Viewing the first few lines of a compressed file on HDFS

Creating the ProgramDriver

Register the command classes with the ProgramDriver: ScanTable is the HBase query tool, Head is the HDFS file viewer.

    ProgramDriver pgd = new ProgramDriver();
    pgd.addClass(ScanTable.NAME, ScanTable.class,
      "Scan an HBase table using a filter");
    pgd.addClass(Head.NAME, Head.class, "print the first n lines of an HDFS file");

    pgd.driver(args);
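
ProgramDriver is essentially a name-to-tool registry that dispatches on the first argument. A minimal plain-Java sketch of that dispatch idea (this is an illustration, not Hadoop's actual ProgramDriver source; the class and method names here are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative registry: map tool names to runnables and dispatch on args[0].
public class ToolRegistry {
    private final Map<String, Runnable> tools = new LinkedHashMap<>();

    public void addTool(String name, Runnable tool) {
        tools.put(name, tool);
    }

    // Returns 0 on success, -1 for an unknown tool name (where the real
    // ProgramDriver would print its usage listing instead).
    public int run(String[] args) {
        if (args.length == 0 || !tools.containsKey(args[0])) {
            return -1;
        }
        tools.get(args[0]).run();
        return 0;
    }
}
```

This is why the example invocations below pass `ScanTable` or `Head` as the first argument after the jar name.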

ScanTable

ScanTable is a map-only job (no reducer). Its main function first creates a Configuration and parses the command-line arguments:

   public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();  // create the Configuration
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  // parse the input arguments

Arguments follow the convention: hadoop jar xxxx.jar program [genericOptions] [commandOptions]
The generic options are:

     -conf <configuration file>     specify a configuration file
     -D <property=value>            use value for given property
     -fs <local|namenode:port>      specify a namenode
     -jt <local|jobtracker:port>    specify a job tracker
     -files <comma separated list of files>    specify comma separated
                            files to be copied to the map reduce cluster
     -libjars <comma separated list of jars>   specify comma separated
                            jar files to include in the classpath.
     -archives <comma separated list of archives>    specify comma
             separated archives to be unarchived on the compute machines.
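
GenericOptionsParser consumes the generic options (each flag together with its value) and hands everything else back via getRemainingArgs(). A simplified sketch of that split in plain Java (a hypothetical helper, not the real Hadoop implementation, which also applies the options to the Configuration):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified: separate [genericOptions] from [commandOptions] by consuming
// each known generic flag together with its single value argument.
public class ArgSplitter {
    private static final List<String> GENERIC =
        List.of("-conf", "-D", "-fs", "-jt", "-files", "-libjars", "-archives");

    public static String[] remainingArgs(String[] args) {
        List<String> remaining = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            if (GENERIC.contains(args[i]) && i + 1 < args.length) {
                i++; // skip the flag's value as well
            } else {
                remaining.add(args[i]);
            }
        }
        return remaining.toArray(new String[0]);
    }
}
```

So in the ScanTable example below, `-conf hbase-site.xml` is consumed as a generic option, and otherArgs[0..3] become the table name, output directory, output mode, and filter string.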

Use TableMapReduceUtil.initTableMapperJob to configure the input table:

      Job job = new Job(conf, ScanTable.NAME);
        job.setJarByClass(ScanTable.class);
        job.setMapperClass(MapClass.class);


        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        FilterList flist = new FilterList();
        Class mapClass = null;

        if (otherArgs.length > 3) {
            flist.addFilter(new ParseFilter().parseFilterString(otherArgs[3]));
        }
       
       // Choose the output by argument: "row" prints row keys, "count" prints the
       // number of rows, and "cf:column1,cf:column2" prints the listed columns
        if (otherArgs[2].equalsIgnoreCase("row")) {
            flist.addFilter(new FirstKeyOnlyFilter());
            mapClass = RowClass.class;

        } else if (otherArgs[2].equalsIgnoreCase("count")) {
            flist.addFilter(new FirstKeyOnlyFilter());
            mapClass = CountClass.class;

        } else {
            job.getConfiguration().setStrings(OUTPUT_COLUMNS, otherArgs[2]);
            mapClass = MapClass.class;
        }

        if (flist.getFilters().size() > 0) {
            scan.setFilter(flist);
        }

        TableMapReduceUtil.initTableMapperJob(
                otherArgs[0], // input table
                scan, // Scan instance to control CF and attribute selection
                mapClass, // mapper class
                null, // mapper output key
                null, // mapper output value
                job);

        job.setNumReduceTasks(0);
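
The mode selection above can be isolated as a small rule: "row" and "count" only need row keys (hence the FirstKeyOnlyFilter), and anything else is treated as a column list. A sketch of that rule as a plain function (the enum and method names here are illustrative, not part of the tool):

```java
// Illustrative: map the third command argument to an output mode.
public class OutputMode {
    public enum Mode { ROW, COUNT, COLUMNS }

    public static Mode fromArg(String arg) {
        if (arg.equalsIgnoreCase("row")) return Mode.ROW;     // row keys only
        if (arg.equalsIgnoreCase("count")) return Mode.COUNT; // row count only
        return Mode.COLUMNS;                                  // e.g. "cf:column1,cf:column2"
    }
}
```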

Three mapper classes implement the three output modes; RowClass, for example, emits only the row key:

    public static class RowClass extends TableMapper<Text, Text> {

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // Use the writable's offset/length: the backing array may be shared
            context.write(new Text(Bytes.toString(row.get(), row.getOffset(), row.getLength())), new Text(""));
        }
    }

Example invocation

 hadoop jar hadooptools.jar ScanTable -conf /data/hbase-0.94.1/conf/hbase-site.xml mofang_device /temp1 row "SingleColumnValueFilter('base','model',=,'binaryprefix:iPhone',true,false)"

Head

Like ScanTable, Head first creates a Configuration and parses the arguments, then opens the file on HDFS:

      FileSystem hdfs = FileSystem.get(conf); // get the HDFS filesystem object

        Path inputDir = new Path(otherArgs[0]);
        if (otherArgs.length > 1) {
            int n = Integer.parseInt(otherArgs[1]);  // requested line count
            MaxLineNumber = n > 0 ? n : 10;          // fall back to 10 lines
        }

        byte[] buffer = new byte[4096];
        FSDataInputStream inputStream = hdfs.open(inputDir);

        GzipCodec codec = new GzipCodec();
        codec.setConf(conf);
        CompressionInputStream input = codec.createInputStream(inputStream);
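
The rest of Head reads decompressed lines until MaxLineNumber is reached. The same head-of-a-gzip-stream logic can be sketched with the plain JDK instead of the Hadoop codec classes (helper names here are illustrative; the real tool reads from the CompressionInputStream above):

```java
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipHead {
    // Wrap the compressed stream, read line by line, stop after maxLines.
    public static List<String> head(InputStream gzipped, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(gzipped), StandardCharsets.UTF_8));
        String line;
        while (lines.size() < maxLines && (line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }

    // Helper for testing: gzip some text in memory.
    public static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }
}
```

Stopping after maxLines matters: the tool never decompresses the whole file, which is the point of a head command on large HDFS archives.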

Example invocation

hadoop jar hadooptools.jar Head /dingxiang/warehouse/third/20120923/third_00031_20120923.gz 8
Warning: $HADOOP_HOME is deprecated.

12/09/24 10:37:46 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/24 10:37:46 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
01hbicu2021100.2000.0630.0000.0000.077113305719100.0000.3330.000
01hbicu202340.2000.0630.0000.0000.077113341274270.0000.3330.000
01hbicu209710.2000.0770.0000.0000.231313306680920.0001.0000.000
01hbicu202200.2000.0770.0000.0000.231313306680920.0001.0000.000
01hbicu209750.2000.0630.0000.0000.077113341274270.0000.3330.000
01hbicu2091100.2000.0630.0000.0000.077113305719100.0000.3330.000
01vteic102209280.2000.0660.0000.0000.125113305950700.0000.5000.000
01vteic1081030.2000.0660.0000.0000.125113260775180.0000.5000.000
