输入输出主要有下面种类: '''文件类型输入:''' TextInputFormat [[BR]] 用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置(偏移量,LongWritable类型),value是每一行的内容,Text类型。[[BR]] 这个在我们的项目中比较常用,不做说明hadoop会使用这个作为输入,只要往里面添加文件路径就可以了 {{{ FileInputFormat.addInputPath(job, path); }}} KeyValueTextInputFormat [[BR]] 同样用于读取文件,如果行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分为value;如果没有分隔符,整行作为 key,value为空[[BR]] SequenceFileInputFormat (这种方式性能会比较好)[[BR]] 用于读取sequence file。 sequence file是Hadoop用于存储数据自定义格式的binary文件。[[BR]] 它有两个子类:SequenceFileAsBinaryInputFormat,将 key和value以BytesWritable的类型读出;[[BR]] SequenceFileAsTextInputFormat,将key和value以 Text的类型读出[[BR]] SequenceFileInputFilter[[BR]] 根据filter从sequence文件中取得部分满足条件的数据,通过setFilterClass指定Filter,内置了三种 Filter,RegexFilter取key值满足指定的正则表达式的记录;[[BR]] PercentFilter通过指定参数f,取记录行数%f==0的记录;MD5Filter通过指定参数f,取MD5(key)%f==0的记录。[[BR]] NLineInputFormat[[BR]] 可以将文件以行为单位进行split,比如文件的每一行对应一个map。得到的key是每一行的位置(偏移量,LongWritable类型),value是每一行的内容,Text类型。 CompositeInputFormat,用于多个数据源的join。(可以参考hadoop例子join)[[BR]] {{{ static String compose(String op, Class inf, Path... path) }}} 参数op表示进行连接的类型:外连接还是内连接。paths是数据源文件,这是一个String数组或Path数组。[[BR]] 另外还是指定用哪种InputFormat来读取这些数据源文件--这要求所有的数据源文件可以用同一种方式来解析key和value[[BR]] ZipFileInputFormat[[BR]] zip文件作为输入,每个zip文件对应一个map,hadoop对很多的压缩方式是透明的,但zip文件里面可能会有目录比较复杂 '''用关系数据库:'''[[BR]] DBInputFormat[[BR]] 对mysql支持比较好,1.0.3版对oracle时候split有问题,具体是[[BR]] OracleDBRecordReader 这个类 84行 if (split.getLength() > 0 && split.getStart() > 0){ 这个判断有问题,第一个split start值就是为0,要去掉[[BR]] 使用DBInputformat要给一个读数据的sql和读总数的sql过去,用户map分割 {{{ DBInputFormat.setInput(job, DataRecord.class, sql, countSql); }}} 另外要增加驱动,jobtracker 机器上的hadoop/lib目录要放驱动,不用重启,[[BR]] 其他的机器使用下面语句增加到class path上 {{{ DistributedCache.addFileToClassPath(new Path("/lib/ojdbc14.jar"), conf, FileSystem.get(conf)); }}} HBASE作为输入[[BR]] 使用hbase输入比较简单,hadoop会根据表的region数定义map的数量 {{{ Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob( T_APP_DEVICE, // input table scan, // Scan instance to control CF and attribute selection MyMapper.class, // mapper class null, // mapper output key null, // mapper output value job); }}} 输出到关系数据库[[BR]] DBOuputFormat[[BR]] 在实际项目中没有使用过,如果要写数据库我们的做法是生成入库的sql,使用另外的程序单线程执行,这样会比较好控制。 HBASE作为输出[[BR]] 对一个表输出比较简单 {{{ TableMapReduceUtil.initTableReducerJob( Constant.TABLE_LAUNCH_REPORT, // output table null, // reducer class job); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); 在reduce里面输出就可以了 Put put = new Put(row); put.add(Constant.CF_BASE, qualifier, Bytes.toBytes(sum)); context.write(null, put); }}} 如果要对其他表的操作,或者执行递增操作可以在reduce的时候新建htable对象 {{{ public void setup(Context context) throws IOException { String table = context.getConfiguration().get(TableOutputFormat.OUTPUT_TABLE); hTable = new HTable(context.getConfiguration(), table); } 在reduce的时候使用hTable操作hbase row = HBaseUtil.getLaunchRowByte(items[2], items[1], null); qualifier = Bytes.toBytes(StringUtil.join(UNDERLINE_PARTITION, "c" + items[0], items[3], items[4])); hTable.incrementColumnValue(row, Constant.CF_AREA, qualifier, result.get()); }}} 使用文件做输出