hadoop_tools: ScanTable.java

File ScanTable.java, 5.6 KB (added by liaojiaohe, 14 years ago)
Line 
1package cn.pconline.hadooptools;
2
3import java.io.IOException;
4import org.apache.commons.logging.Log;
5import org.apache.commons.logging.LogFactory;
6import org.apache.hadoop.conf.Configuration;
7import org.apache.hadoop.fs.Path;
8import org.apache.hadoop.hbase.HBaseConfiguration;
9import org.apache.hadoop.hbase.client.Result;
10import org.apache.hadoop.hbase.client.Scan;
11import org.apache.hadoop.hbase.filter.FilterList;
12import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
13import org.apache.hadoop.hbase.filter.ParseFilter;
14import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
16import org.apache.hadoop.hbase.mapreduce.TableMapper;
17import org.apache.hadoop.hbase.util.Bytes;
18import org.apache.hadoop.io.Text;
19import org.apache.hadoop.mapreduce.Job;
20import org.apache.hadoop.mapreduce.Mapper.Context;
21import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
22import org.apache.hadoop.util.GenericOptionsParser;
23
24/*
25 * Copyright 1997-2012
26 * http://www.pconline.com.cn
27 */
28/**
29 *
30 * @author larry <larryliao@pconline.com.cn>
31 */
32public class ScanTable {
33
34    private static final Log LOG =
35            LogFactory.getLog(ScanTable.class.getName());
36    static final String NAME = "ScanTable";
37
38    static enum Counters {
39
40        ROWS
41    }
42    static String OUTPUT_COLUMNS = "OUTPUT_COLUMNS";
43
44    public static void main(String[] args) throws Exception {
45        Configuration conf = HBaseConfiguration.create();
46        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
47        if (otherArgs.length < 3) {
48            System.err.println("ERROR: Wrong number of parameters: " + args.length);
49            System.err.println("Usage: ScanTable -conf <configuration file> -D <property=value>"
50                    + " <tablename> <outputDir> <outputColumn> [filter]");
51            System.err.println(" <outputColumn> parameter can use count or row or cf:colum1,cf:colum2 ");
52            System.err.println(" [filter] parameter can send a filter ");
53            System.exit(-1);
54        }
55
56        Job job = new Job(conf, ScanTable.NAME);
57        job.setJarByClass(ScanTable.class);
58        job.setMapperClass(MapClass.class);
59
60
61        Scan scan = new Scan();
62        scan.setCaching(500);
63        scan.setCacheBlocks(false);
64
65        FilterList flist = new FilterList();
66        Class mapClass = null;
67
68        if (otherArgs.length > 3) {
69            flist.addFilter(new ParseFilter().parseFilterString(otherArgs[3]));
70        }
71
72        if (otherArgs[2].equalsIgnoreCase("row")) {
73            flist.addFilter(new FirstKeyOnlyFilter());
74            mapClass = RowClass.class;
75
76        } else if (otherArgs[2].equalsIgnoreCase("count")) {
77            flist.addFilter(new FirstKeyOnlyFilter());
78            mapClass = CountClass.class;
79
80        } else {
81            job.getConfiguration().setStrings(OUTPUT_COLUMNS, otherArgs[2]);
82            mapClass = MapClass.class;
83        }
84
85        if (flist.getFilters().size() > 0) {
86            scan.setFilter(flist);
87        }
88
89        TableMapReduceUtil.initTableMapperJob(
90                otherArgs[0], // input table
91                scan, // Scan instance to control CF and attribute selection
92                mapClass, // mapper class
93                null, // mapper output key
94                null, // mapper output value
95                job);
96
97        job.setNumReduceTasks(0);
98
99        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
100        System.exit(job.waitForCompletion(true) ? 0 : 1);
101
102    }
103
104    public static class CountClass extends TableMapper<ImmutableBytesWritable, Result> {
105
106        @Override
107        public void map(ImmutableBytesWritable row, Result value, Context context)
108                throws IOException, InterruptedException {
109            context.getCounter(Counters.ROWS).increment(1);
110        }
111    }
112
113    public static class RowClass extends TableMapper<Text, Text> {
114
115        @Override
116        public void map(ImmutableBytesWritable row, Result value, Context context)
117                throws IOException, InterruptedException {
118            context.write(new Text(new String(row.get(), "UTF-8")), new Text(""));
119        }
120    }
121
122    public static class MapClass extends TableMapper<Text, Text> {
123
124        static byte[][] cfs;
125        static byte[][] cols;
126        static int length = 0;
127
128        @Override
129        protected void setup(Context context) throws IOException, InterruptedException {
130            String[] colums = context.getConfiguration().get(OUTPUT_COLUMNS).split(",");
131            cfs = new byte[colums.length][];
132            cols = new byte[colums.length][];
133            length = colums.length;
134            for (int i = 0; i < length; i++) {
135                cfs[i] = Bytes.toBytes(colums[i].split(":")[0]);
136                cols[i] = Bytes.toBytes(colums[i].split(":")[1]);
137            }
138            super.setup(context);
139
140        }
141        static byte[] columnFamily = Bytes.toBytes("base");
142
143        @Override
144        public void map(ImmutableBytesWritable row, Result value, Context context)
145                throws IOException, InterruptedException {
146            StringBuilder buf = null;
147
148            for (int i = 0; i < length; i++) {
149                byte[] v = value.getValue(cfs[i], cols[i]);
150                if (i == 0) {
151                    buf = new StringBuilder();
152                } else {
153                    buf.append("\t");
154                }
155                if (v != null) {
156                    buf.append(new String(v, "UTF-8"));
157                }
158            }
159
160            if (buf.toString().length() != length -1 ) { //only tab
161                context.write(new Text(buf.toString()), new Text(""));
162            }
163
164        }
165    }
166}