转: HBase bulkload源代码阅读笔记 1.LoadIncrementalHFiles.doBulkLoad(Path hfofDir, HTable table) 首先用discoverLoadQueue方法扫描出hfofDir下有哪些hfile文件,再循环调用tryLoad方法把每个文件load进去,这是一个串行的过程。 {{{ Deque queue = null; queue = discoverLoadQueue(hfofDir); while (!queue.isEmpty()) { LoadQueueItem item = queue.remove(); tryLoad(item, conn, table.getTableName(), queue); } }}} 2.LoadIncrementalHFiles.discoverLoadQueue(Path hfofDir) hfofDir下是两层目录结构family-->hfile,因此二重循环遍历每个hfile文件,加到Deque里返回,hfofDir下以"_"开头的不是family目录。 LoadQueueItem的数据结构用于记录family和hfile {{{ final byte[] family; final Path hfilePath; FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);//第一层目录 Deque ret = new LinkedList(); for (FileStatus stat : familyDirStatuses) { Path familyDir = stat.getPath(); // Skip _logs, etc if (familyDir.getName().startsWith("_")) continue;//以"_"开头的不是family目录。 byte[] family = familyDir.getName().getBytes(); Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));//第二层目录 for (Path hfile : hfiles) { if (hfile.getName().startsWith("_")) continue; ret.add(new LoadQueueItem(family, hfile)); } } }}} 3.LoadIncrementalHFiles.tryLoad(final LoadQueueItem item, HConnection conn, final byte[] table, final Deque queue) 首先检查当前的hfile所属的region是否已经发生分裂,如果发生分裂,则将hfile分裂成匹配新region的两个hfile,并将这两个hfile放入deque;如果没有发生分裂,则调用region所在server的bulkLoadHFile方法将hfile导入。重点是以下几句 {{{ if (!hri.containsRange(first, last)) {//判断包含firstkey的当前region是否包含hfile的startkey和endkey,如果不包含说明当前region是分裂过的 LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. 
Splitting..."); HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family); Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom"); Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top"); splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(), botOut, topOut);//以当前region的endkey为中值分裂hfile为两个,文件存为.bottom和.top // Add these back at the *front* of the queue, so there's a lower // chance that the region will just split again before we get there. //.bottom和.top重新放回queue queue.addFirst(new LoadQueueItem(item.family, botOut)); queue.addFirst(new LoadQueueItem(item.family, topOut)); LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); return null; } byte[] regionName = location.getRegionInfo().getRegionName(); server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);//如果包含,直接调用region所在server的bulkLoadHFile方法将hfile导入 return null; }}} 4.LoadIncrementalHFiles.splitStoreFile( Configuration conf, Path inFile, HColumnDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut) {{{ //以splitKey为中值,将inFile拷贝分裂为bottomOut和topOut两个文件 Reference topReference = new Reference(splitKey, Range.top); Reference bottomReference = new Reference(splitKey, Range.bottom); copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); }}} 5.HRegionServer.bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName) 这是个HRegionInterface下的远程调用,是在regionserver中执行的。 checkOpen();//检查region是否已停,已经停了便不再导 HRegion region = getRegion(regionName);//从regionserver中拿到region region.bulkLoadHFile(hfilePath, familyName);//这步才开始导 6. 
{{{ HRegion.bulkLoadHFile(String hfilePath, byte[] familyName) throws IOException { startRegionOperation();//上读锁 try { Store store = getStore(familyName); if (store == null) { throw new DoNotRetryIOException( "No such column family " + Bytes.toStringBinary(familyName)); } store.bulkLoadHFile(hfilePath);//调store的同名方法 } finally { closeRegionOperation();//解读锁 } } }}} 7.Store.bulkLoadHFile(String srcPathStr) 就三步:首先再一次检查region的startkey和endkey是否跟hfile的匹配;其次将hfile重命名到store的目录下;最后将hfile包装成StoreFile对象装载到Store的列表里 {{{ //再次检查是否匹配region HRegionInfo hri = region.getRegionInfo(); if (!hri.containsRange(firstKey, lastKey)) { throw new WrongRegionException( "Bulk load file " + srcPathStr + " does not fit inside region " + this.region); } //挪文件 Path srcPath = new Path(srcPathStr); Path dstPath = StoreFile.getRandomFilename(fs, homedir); LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath); StoreFile.rename(fs, srcPath, dstPath); //装载文件以提供在线服务 // Append the new storefile into the list this.lock.writeLock().lock();//加store的写锁 try { ArrayList newFiles = new ArrayList(storefiles); newFiles.add(sf); this.storefiles = sortAndClone(newFiles); notifyChangedReadersObservers(); } finally { this.lock.writeLock().unlock();//解store的写锁 } }}}