What the Hadooptools package implements:
- Distributed HBase queries: the user supplies a table name, a filter, an output directory, and so on, and the tool writes out the matching rows
- Viewing the first few lines of a compressed file on HDFS

Creating the ProgramDriver
Register the command classes with the ProgramDriver: ScanTable is the HBase query tool, Head is the HDFS file viewer.
ProgramDriver pgd = new ProgramDriver();
pgd.addClass(ScanTable.NAME, ScanTable.class,
"Scan an HBase table with a filter");
pgd.addClass(Head.NAME, Head.class, "print the first n lines of an HDFS file");
pgd.driver(args);
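How ProgramDriver dispatches can be sketched with a plain-Java registry: the first argument selects a registered tool, the rest are passed through. This is an illustrative stand-in (class and method names are made up), not Hadoop's actual ProgramDriver implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class DriverDemo {
    private final Map<String, Consumer<String[]>> tools = new HashMap<>();

    // Register a tool under a name, analogous to ProgramDriver.addClass.
    void addTool(String name, Consumer<String[]> tool) {
        tools.put(name, tool);
    }

    // Dispatch on args[0] and pass the remaining args along,
    // analogous to ProgramDriver.driver(args).
    void run(String[] args) {
        Consumer<String[]> tool = tools.get(args[0]);
        if (tool == null) {
            throw new IllegalArgumentException("unknown tool: " + args[0]);
        }
        String[] rest = new String[args.length - 1];
        System.arraycopy(args, 1, rest, 0, rest.length);
        tool.accept(rest);
    }

    public static void main(String[] args) {
        DriverDemo driver = new DriverDemo();
        driver.addTool("ScanTable", a -> System.out.println("scan " + String.join(" ", a)));
        driver.addTool("Head", a -> System.out.println("head " + String.join(" ", a)));
        driver.run(new String[]{"Head", "/path/file.gz", "8"});
    }
}
```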
ScanTable
ScanTable is a map-only job (it needs no reduce step). Its main function first creates a Configuration and parses the arguments, as in the following code:
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create(); // create the Configuration
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // parse the input arguments
Argument-passing convention: hadoop jar xxxx.jar program [genericOptions] [commandOptions]
genericOptions:
-conf <configuration file> specify a configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|jobtracker:port> specify a job tracker
-files <comma separated list of files> specify comma separated
files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated
jar files to include in the classpath.
-archives <comma separated list of archives> specify comma
separated archives to be unarchived on the compute machines.
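The split between genericOptions and commandOptions can be illustrated with a simplified stand-in for GenericOptionsParser. The real parser also loads the -conf file and applies -D properties to the Configuration; this sketch (class and method names are hypothetical) only shows how the generic flags are consumed so that getRemainingArgs() returns just the command options:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ArgSplitDemo {
    // Generic flags that each consume one following value, mirroring the list above.
    private static final List<String> GENERIC_FLAGS = Arrays.asList(
            "-conf", "-D", "-fs", "-jt", "-files", "-libjars", "-archives");

    // Returns the arguments left over after generic options are consumed,
    // i.e. roughly what GenericOptionsParser.getRemainingArgs() hands back.
    public static String[] remainingArgs(String[] args) {
        List<String> remaining = new ArrayList<>();
        int i = 0;
        while (i < args.length) {
            if (GENERIC_FLAGS.contains(args[i]) && i + 1 < args.length) {
                i += 2; // skip the flag and its value
            } else {
                remaining.add(args[i]);
                i++;
            }
        }
        return remaining.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] in = {"-conf", "/data/hbase-0.94.1/conf/hbase-site.xml",
                       "mofang_device", "/temp1", "row"};
        System.out.println(String.join(" ", remainingArgs(in)));
    }
}
```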
Use TableMapReduceUtil.initTableMapperJob to set up the input table:
Job job = new Job(conf, ScanTable.NAME);
job.setJarByClass(ScanTable.class);
job.setMapperClass(MapClass.class);
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
FilterList flist = new FilterList();
Class mapClass = null;
if (otherArgs.length > 3) {
flist.addFilter(new ParseFilter().parseFilterString(otherArgs[3]));
}
// Configure the output from the arguments: "row" prints the row keys, "count" prints the number of matching rows, and a column list such as cf:colum1,cf:colum2 prints those columns
if (otherArgs[2].equalsIgnoreCase("row")) {
flist.addFilter(new FirstKeyOnlyFilter());
mapClass = RowClass.class;
} else if (otherArgs[2].equalsIgnoreCase("count")) {
flist.addFilter(new FirstKeyOnlyFilter());
mapClass = CountClass.class;
} else {
job.getConfiguration().setStrings(OUTPUT_COLUMNS, otherArgs[2]);
mapClass = MapClass.class;
}
if (flist.getFilters().size() > 0) {
scan.setFilter(flist);
}
TableMapReduceUtil.initTableMapperJob(
otherArgs[0], // input table
scan, // Scan instance to control CF and attribute selection
mapClass, // mapper class
null, // mapper output key
null, // mapper output value
job);
job.setNumReduceTasks(0);
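The mode-selection branching above can be boiled down to a small pure-Java function: "row" and "count" only need row keys, so a FirstKeyOnlyFilter is added to skip fetching column data; anything else is treated as a column list. This is an illustrative restatement of the logic (names are made up), not the tool's actual code:

```java
public class OutputModeDemo {
    enum Mode { ROW, COUNT, COLUMNS }

    // Mirrors the if/else chain over otherArgs[2] in ScanTable.main.
    static Mode chooseMode(String arg) {
        if (arg.equalsIgnoreCase("row")) return Mode.ROW;
        if (arg.equalsIgnoreCase("count")) return Mode.COUNT;
        return Mode.COLUMNS; // interpreted as a comma-separated column list
    }

    // ROW and COUNT only need row keys, so the scan adds a FirstKeyOnlyFilter.
    static boolean needsFirstKeyOnlyFilter(Mode m) {
        return m == Mode.ROW || m == Mode.COUNT;
    }

    public static void main(String[] args) {
        System.out.println(chooseMode("cf:colum1,cf:colum2"));
    }
}
```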
Write three map classes, one for each output mode:
public static class RowClass extends TableMapper<Text, Text> {
@Override
public void map(ImmutableBytesWritable row, Result value, Context context)
throws IOException, InterruptedException {
context.write(new Text(new String(row.get(), "UTF-8")), new Text(""));
}
}
Example run:
hadoop jar hadooptools.jar ScanTable -conf /data/hbase-0.94.1/conf/hbase-site.xml mofang_device /temp1 row "SingleColumnValueFilter('base','model',=,'binaryprefix:iPhone',true,false)"
Head
Head also begins by creating a Configuration and parsing the arguments:
FileSystem hdfs = FileSystem.get(conf); // get a handle on the HDFS file system
Path inputDir = new Path(otherArgs[0]);
if (otherArgs.length > 1) {
int n = Integer.parseInt(otherArgs[1]);
MaxLineNumber = n > 0 ? n : 10;
}
byte[] buffer = new byte[4096];
FSDataInputStream inputStream = hdfs.open(inputDir);
GzipCodec codec = new GzipCodec();
codec.setConf(conf);
CompressionInputStream input = codec.createInputStream(inputStream);
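The decompress-and-read-first-lines step can be sketched with the JDK's own GZIPInputStream standing in for Hadoop's GzipCodec; the stream-handling logic is the same, only the codec source differs. This is an illustrative sketch (class and method names are assumptions), not the tool's actual code:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipHeadDemo {
    // Read at most maxLines lines from a gzip-compressed stream,
    // like Head does with the codec's CompressionInputStream.
    static List<String> head(InputStream compressed, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(compressed), StandardCharsets.UTF_8))) {
            String line;
            while (lines.size() < maxLines && (line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Build a small gzip payload in memory to stand in for an HDFS file.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (Writer w = new OutputStreamWriter(new GZIPOutputStream(buf), StandardCharsets.UTF_8)) {
            w.write("line1\nline2\nline3\n");
        }
        List<String> top = head(new ByteArrayInputStream(buf.toByteArray()), 2);
        System.out.println(top); // prints [line1, line2]
    }
}
```

Reading line by line and stopping at maxLines means only the head of the compressed stream is ever decompressed, which is the whole point of the tool for large HDFS files.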
Example run:
hadoop jar hadooptools.jar Head /dingxiang/warehouse/third/20120923/third_00031_20120923.gz 8
Warning: $HADOOP_HOME is deprecated.
12/09/24 10:37:46 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/24 10:37:46 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
01hbicu2021100.2000.0630.0000.0000.077113305719100.0000.3330.000
01hbicu202340.2000.0630.0000.0000.077113341274270.0000.3330.000
01hbicu209710.2000.0770.0000.0000.231313306680920.0001.0000.000
01hbicu202200.2000.0770.0000.0000.231313306680920.0001.0000.000
01hbicu209750.2000.0630.0000.0000.077113341274270.0000.3330.000
01hbicu2091100.2000.0630.0000.0000.077113305719100.0000.3330.000
01vteic102209280.2000.0660.0000.0000.125113305950700.0000.5000.000
01vteic1081030.2000.0660.0000.0000.125113260775180.0000.5000.000
