| Version 8 (modified by liaojiaohe, 13 years ago) (diff) |
|---|
输入输出主要有下面种类:
文件类型输入:
TextInputFormat?
用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置(偏移量,LongWritable类型),value是每一行的内容,Text类型。
这个在我们的项目中比较常用,不做说明hadoop会使用这个作为输入,只要往里面添加文件路径就可以了
FileInputFormat.addInputPath(job, path);
KeyValueTextInputFormat?
同样用于读取文件,如果行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分为value;如果没有分隔符,整行作为 key,value为空
SequenceFileInputFormat? (这种方式性能会比较好)
用于读取sequence file。 sequence file是Hadoop用于存储数据自定义格式的binary文件。
它有两个子类:SequenceFileAsBinaryInputFormat?,将 key和value以BytesWritable的类型读出;
SequenceFileAsTextInputFormat?,将key和value以 Text的类型读出
SequenceFileInputFilter?
根据filter从sequence文件中取得部分满足条件的数据,通过setFilterClass指定Filter,内置了三种 Filter,RegexFilter取key值满足指定的正则表达式的记录;
PercentFilter通过指定参数f,取记录行数%f==0的记录;MD5Filter通过指定参数f,取MD5(key)%f==0的记录。
NLineInputFormat
可以将文件以行为单位进行split,比如文件的每一行对应一个map。得到的key是每一行的位置(偏移量,LongWritable类型),value是每一行的内容,Text类型。
CompositeInputFormat?,用于多个数据源的join。(可以参考hadoop例子join)
static String compose(String op, Class<? extends InputFormat> inf, Path... path)
参数op表示进行连接的类型:外连接还是内连接。paths是数据源文件,这是一个String数组或Path数组。
另外还是指定用哪种InputFormat来读取这些数据源文件--这要求所有的数据源文件可以用同一种方式来解析key和value
ZipFileInputFormat?
zip文件作为输入,每个zip文件对应一个map,hadoop对很多的压缩方式是透明的,但zip文件里面可能会有目录比较复杂
用关系数据库:
DBInputFormat
对mysql支持比较好,1.0.3版对oracle时候split有问题,具体是
OracleDBRecordReader 这个类 84行 if (split.getLength() > 0 && split.getStart() > 0){ 这个判断有问题,第一个split start值就是为0,要去掉
使用DBInputformat要给一个读数据的sql和读总数的sql过去,用户map分割
DBInputFormat.setInput(job, DataRecord.class, sql, countSql);
另外要增加驱动,jobtracker 机器上的hadoop/lib目录要放驱动,不用重启,
其他的机器使用下面语句增加到class path上
DistributedCache.addFileToClassPath(new Path("/lib/ojdbc14.jar"), conf, FileSystem.get(conf));
HBASE作为输入
使用hbase输入比较简单,hadoop会根据表的region数定义map的数量
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
TableMapReduceUtil.initTableMapperJob(
T_APP_DEVICE, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
null, // mapper output key
null, // mapper output value
job);
输出到关系数据库
DBOuputFormat
在实际项目中没有使用过,如果要写数据库我们的做法是生成入库的sql,使用另外的程序单线程执行,这样会比较好控制。
HBASE作为输出
对一个表输出比较简单
TableMapReduceUtil.initTableReducerJob(
Constant.TABLE_LAUNCH_REPORT, // output table
null, // reducer class
job);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
在reduce里面输出就可以了
Put put = new Put(row);
put.add(Constant.CF_BASE, qualifier, Bytes.toBytes(sum));
context.write(null, put);
如果要对其他表的操作,或者执行递增操作可以在reduce的时候新建htable对象
public void setup(Context context) throws IOException {
String table = context.getConfiguration().get(TableOutputFormat.OUTPUT_TABLE);
hTable = new HTable(context.getConfiguration(), table);
}
在reduce的时候使用hTable操作hbase
row = HBaseUtil.getLaunchRowByte(items[2], items[1], null);
qualifier = Bytes.toBytes(StringUtil.join(UNDERLINE_PARTITION,
"c" + items[0], items[3], items[4]));
hTable.incrementColumnValue(row, Constant.CF_AREA, qualifier, result.get());
使用文件做输出
![(please configure the [header_logo] section in trac.ini)](http://www1.pconline.com.cn/hr/2009/global/images/logo.gif)