Changes between Version 4 and Version 5 of hadoop_tools


Ignore:
Timestamp:
09/24/2012 10:55:59 AM (14 years ago)
Author:
liaojiaohe
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • hadoop_tools

    v4 v5  
    44 
    55 
     6== 创建ProgramDriver == 
     7把其他命令类增加进ProgramDriver,ScanTable为hbase查询工具,Head为hdfs文件查看工具 
     8 
     9{{{ 
     10    ProgramDriver pgd = new ProgramDriver(); 
     11    pgd.addClass(ScanTable.NAME, ScanTable.class, 
     12      "Search table in HBase table use filter"); 
     13    pgd.addClass(Head.NAME, Head.class, "read dfs file top n"); 
     14 
     15    pgd.driver(args); 
     16}}} 
     17 
     18== ScanTable == 
     19ScanTable是一个map任务,不需要reduce,main函数先创建Configuration,处理参数如下代码 
    620 
    721{{{ 
     
    2640             separated archives to be unarchived on the compute machines.v 
    2741}}} 
     42 
     43采用TableMapReduceUtil.initTableMapperJob 设置输入表格, 
     44{{{ 
     45      Job job = new Job(conf, ScanTable.NAME); 
     46        job.setJarByClass(ScanTable.class); 
     47        job.setMapperClass(MapClass.class); 
     48 
     49 
     50        Scan scan = new Scan(); 
     51        scan.setCaching(500); 
     52        scan.setCacheBlocks(false); 
     53 
     54        FilterList flist = new FilterList(); 
     55        Class mapClass = null; 
     56 
     57        if (otherArgs.length > 3) { 
     58            flist.addFilter(new ParseFilter().parseFilterString(otherArgs[3])); 
     59        } 
     60        
     61       //根据参数设置输出,row表示输出rowkey,count表述输出条数,cf:colum1,cf:colum2 输出字段信息 
     62        if (otherArgs[2].equalsIgnoreCase("row")) { 
     63            flist.addFilter(new FirstKeyOnlyFilter()); 
     64            mapClass = RowClass.class; 
     65 
     66        } else if (otherArgs[2].equalsIgnoreCase("count")) { 
     67            flist.addFilter(new FirstKeyOnlyFilter()); 
     68            mapClass = CountClass.class; 
     69 
     70        } else { 
     71            job.getConfiguration().setStrings(OUTPUT_COLUMNS, otherArgs[2]); 
     72            mapClass = MapClass.class; 
     73        } 
     74 
     75        if (flist.getFilters().size() > 0) { 
     76            scan.setFilter(flist); 
     77        } 
     78 
     79        TableMapReduceUtil.initTableMapperJob( 
     80                otherArgs[0], // input table 
     81                scan, // Scan instance to control CF and attribute selection 
     82                mapClass, // mapper class 
     83                null, // mapper output key 
     84                null, // mapper output value 
     85                job); 
     86 
     87        job.setNumReduceTasks(0); 
     88}}} 
     89 
     90 
     91写3个map类对应3种输出 
     92 
     93{{{ 
     94    public static class RowClass extends TableMapper<Text, Text> { 
     95 
     96        @Override 
     97        public void map(ImmutableBytesWritable row, Result value, Context context) 
     98                throws IOException, InterruptedException { 
     99            context.write(new Text(new String(row.get(), "UTF-8")), new Text("")); 
     100        } 
     101    } 
     102}}} 
     103