package cn.pconline.hadooptools;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/*
 * Copyright 1997-2012
 * http://www.pconline.com.cn
 */
/**
 * Map-only MapReduce job that scans an HBase table and writes text output.
 *
 * <p>Usage: {@code ScanTable [generic options] <tablename> <outputDir> <outputColumn> [filter]}
 *
 * <p>The {@code <outputColumn>} argument selects the mode (matched case-insensitively):
 * <ul>
 *   <li>{@code count} - increments the {@link Counters#ROWS} job counter once per row and
 *       writes no records;</li>
 *   <li>{@code row} - writes the row key of every row;</li>
 *   <li>anything else - treated as a comma-separated list of {@code family:qualifier}
 *       columns whose values are written tab-separated, one line per row.</li>
 * </ul>
 *
 * <p>The optional {@code [filter]} argument is parsed with {@link ParseFilter} and applied to
 * the scan.
 *
 * @author larry <larryliao@pconline.com.cn>
 */
public class ScanTable {

    private static final Log LOG =
            LogFactory.getLog(ScanTable.class.getName());

    /** Name given to the submitted job. */
    static final String NAME = "ScanTable";

    /** Job counters published by the mappers of this tool. */
    static enum Counters {

        /** Number of rows seen by {@link CountClass}. */
        ROWS
    }

    /** Configuration key carrying the comma-separated output column list to {@link MapClass}. */
    static final String OUTPUT_COLUMNS = "OUTPUT_COLUMNS";

    /**
     * Parses the command line, configures the scan and runs the job.
     *
     * <p>Exits with status {@code -1} on bad arguments, {@code 0} when the job succeeds and
     * {@code 1} when it fails.
     *
     * @param args generic Hadoop options followed by
     *             {@code <tablename> <outputDir> <outputColumn> [filter]}
     * @throws Exception if the filter string cannot be parsed or the job cannot be set up or run
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            // Report the number of positional arguments, which is what is actually checked;
            // args.length would also count the generic -conf/-D options.
            System.err.println("ERROR: Wrong number of parameters: " + otherArgs.length);
            System.err.println("Usage: ScanTable -conf <configuration file> -D <property=value>"
                    + " <tablename> <outputDir> <outputColumn> [filter]");
            System.err.println(" <outputColumn> parameter can use count or row or cf:colum1,cf:colum2 ");
            System.err.println(" [filter] parameter can send a filter ");
            System.exit(-1);
        }

        String tableName = otherArgs[0];
        String outputDir = otherArgs[1];
        String outputColumn = otherArgs[2];

        Job job = new Job(conf, ScanTable.NAME);
        job.setJarByClass(ScanTable.class);

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        FilterList flist = new FilterList();
        if (otherArgs.length > 3) {
            flist.addFilter(new ParseFilter().parseFilterString(otherArgs[3]));
        }

        Class<? extends TableMapper<?, ?>> mapClass;
        if ("row".equalsIgnoreCase(outputColumn)) {
            // Only the row key is needed, so one cell per row is enough.
            flist.addFilter(new FirstKeyOnlyFilter());
            mapClass = RowClass.class;
        } else if ("count".equalsIgnoreCase(outputColumn)) {
            flist.addFilter(new FirstKeyOnlyFilter());
            mapClass = CountClass.class;
        } else {
            // Validate the column list here so a malformed spec is reported before the job
            // is submitted instead of failing every map task.
            try {
                parseColumns(outputColumn);
            } catch (IllegalArgumentException e) {
                System.err.println("ERROR: " + e.getMessage());
                System.exit(-1);
            }
            // The scan is deliberately not restricted to these columns: a user-supplied
            // filter may need to inspect other columns of the row.
            job.getConfiguration().set(OUTPUT_COLUMNS, outputColumn);
            mapClass = MapClass.class;
        }

        if (!flist.getFilters().isEmpty()) {
            scan.setFilter(flist);
        }

        TableMapReduceUtil.initTableMapperJob(
                tableName, // input table
                scan, // Scan instance to control CF and attribute selection
                mapClass, // mapper class
                null, // mapper output key
                null, // mapper output value
                job);

        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, new Path(outputDir));
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    /**
     * Parses a comma-separated list of {@code family:qualifier} entries.
     *
     * <p>Each entry is split at its first {@code ':'}; the qualifier may therefore be empty
     * or contain further colons.
     *
     * @param spec the column list, e.g. {@code cf:a,cf:b}
     * @return one {@code {family, qualifier}} pair of byte arrays per entry, in input order
     * @throws IllegalArgumentException if {@code spec} is {@code null}, contains no entries,
     *         or an entry has no non-empty family followed by {@code ':'}
     */
    static byte[][][] parseColumns(String spec) {
        if (spec == null) {
            throw new IllegalArgumentException(
                    "No output columns configured (" + OUTPUT_COLUMNS + ")");
        }
        String[] entries = spec.split(",");
        if (entries.length == 0) {
            throw new IllegalArgumentException("Empty output column list: '" + spec + "'");
        }
        byte[][][] parsed = new byte[entries.length][][];
        for (int i = 0; i < entries.length; i++) {
            String entry = entries[i];
            int sep = entry.indexOf(':');
            if (sep <= 0) {
                throw new IllegalArgumentException(
                        "Invalid output column '" + entry + "', expected family:qualifier");
            }
            parsed[i] = new byte[][] {
                Bytes.toBytes(entry.substring(0, sep)),
                Bytes.toBytes(entry.substring(sep + 1))
            };
        }
        return parsed;
    }

    /** Mapper for {@code count} mode: counts rows through a job counter and emits nothing. */
    public static class CountClass extends TableMapper<ImmutableBytesWritable, Result> {

        /**
         * Increments {@link Counters#ROWS} by one.
         *
         * @param row     the row key (unused)
         * @param value   the row contents (unused)
         * @param context the task context providing the counter
         */
        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            context.getCounter(Counters.ROWS).increment(1);
        }
    }

    /** Mapper for {@code row} mode: emits every row key as text with an empty value. */
    public static class RowClass extends TableMapper<Text, Text> {

        private final Text outKey = new Text();
        private final Text outValue = new Text("");

        /**
         * Writes the row key, decoded as UTF-8, with an empty value.
         *
         * @param row     the row key
         * @param value   the row contents (unused)
         * @param context the task context to write to
         */
        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // Honour offset/length: get() exposes the whole backing array, which may be
            // larger than the key itself.
            outKey.set(Bytes.toString(row.get(), row.getOffset(), row.getLength()));
            context.write(outKey, outValue);
        }
    }

    /**
     * Mapper for column mode: emits the configured columns of each row, tab-separated.
     *
     * <p>The column list is read from the {@link #OUTPUT_COLUMNS} configuration property.
     */
    public static class MapClass extends TableMapper<Text, Text> {

        // Per-task state is kept in instance fields so mapper instances never share it.
        private byte[][] families;
        private byte[][] qualifiers;
        private final Text outKey = new Text();
        private final Text outValue = new Text("");

        /**
         * Reads and parses the output column list from the job configuration.
         *
         * @throws IllegalArgumentException if the property is missing or malformed
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            byte[][][] columns = parseColumns(context.getConfiguration().get(OUTPUT_COLUMNS));
            families = new byte[columns.length][];
            qualifiers = new byte[columns.length][];
            for (int i = 0; i < columns.length; i++) {
                families[i] = columns[i][0];
                qualifiers[i] = columns[i][1];
            }
        }

        /**
         * Writes one line holding the UTF-8 decoded values of the configured columns,
         * separated by tabs; a missing column contributes an empty field. Rows for which
         * every configured column is missing or empty are skipped.
         *
         * @param row     the row key (unused)
         * @param value   the row contents
         * @param context the task context to write to
         */
        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            StringBuilder line = new StringBuilder();
            for (int i = 0; i < families.length; i++) {
                if (i > 0) {
                    line.append('\t');
                }
                byte[] cell = value.getValue(families[i], qualifiers[i]);
                if (cell != null) {
                    line.append(Bytes.toString(cell));
                }
            }

            // A line consisting solely of the separators means no column had any content.
            if (line.length() != families.length - 1) {
                outKey.set(line.toString());
                context.write(outKey, outValue);
            }
        }
    }
}
