| | 1 | 转: HBase bulkload源代码阅读笔记 |
| | 2 | |
| | 3 | 1.LoadIncrementalHFiles.doBulkLoad(Path hfofDir, HTable table) |
| | 4 | |
| | 5 | 首先用discoverLoadQueue方法扫描出hfofDir下有哪些hfile文件,再循环调用tryLoad方法把每个文件逐个load进去,这是一个串行的过程。 |
| | 6 | |
| | 7 | {{{ |
| | 8 | Deque<LoadQueueItem> queue = null; |
| | 9 | queue = discoverLoadQueue(hfofDir); |
| | 10 | while (!queue.isEmpty()) { |
| | 11 | LoadQueueItem item = queue.remove(); |
| | 12 | tryLoad(item, conn, table.getTableName(), queue); |
| | 13 | } |
| | 14 | |
| | 15 | }}} |
| | 16 | |
| | 17 | 2.LoadIncrementalHFiles.discoverLoadQueue(Path hfofDir) |
| | 18 | |
| | 19 | hfofDir下是两层目录结构family-->hfile,因此用二重循环遍历每个hfile文件,加到Deque里返回。hfofDir下以"_"开头的目录(如_logs)不是family目录,会被跳过;family目录下以"_"开头的文件同样会被跳过。 |
| | 20 | LoadQueueItem是用于记录family和hfile路径的数据结构: |
| | 21 | |
| | 22 | {{{ |
| | 23 | final byte[] family; |
| | 24 | final Path hfilePath; |
| | 25 | |
| | 26 | FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);//第一层目录 |
| | 27 | Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>(); |
| | 28 | for (FileStatus stat : familyDirStatuses) { |
| | 29 | Path familyDir = stat.getPath(); |
| | 30 | // Skip _logs, etc |
| | 31 | if (familyDir.getName().startsWith("_")) continue;//以"_"开头的不是family目录。 |
| | 32 | byte[] family = familyDir.getName().getBytes(); |
| | 33 | Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));//第二层目录 |
| | 34 | for (Path hfile : hfiles) { |
| | 35 | if (hfile.getName().startsWith("_")) continue; |
| | 36 | ret.add(new LoadQueueItem(family, hfile)); |
| | 37 | } |
| | 38 | } |
| | 39 | }}} |
| | 40 | |
| | 41 | |
| | 42 | 3.LoadIncrementalHFiles.tryLoad(final LoadQueueItem item, |
| | 43 | HConnection conn, final byte[] table, |
| | 44 | final Deque<LoadQueueItem> queue) |
| | 45 | |
| | 46 | 首先检查当前hfile的key范围是否仍落在单个region内,即该region是否已经发生分裂。如果已分裂,则以当前region的endkey为分割点,把hfile切分成两个hfile,并将这两个hfile放回deque的队首;如果没有发生分裂,则调用region所在server的bulkLoadHFile方法将hfile导入。重点是以下几句: |
| | 47 | |
| | 48 | |
| | 49 | {{{ |
| | 50 | if (!hri.containsRange(first, last)) {//判断包含firstkey的当前region是否包含hfile的startkey和endkey,如果不包含说明当前region是分裂过的 |
| | 51 | LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + |
| | 52 | "region. Splitting..."); |
| | 53 | |
| | 54 | HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family); |
| | 55 | Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom"); |
| | 56 | Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top"); |
| | 57 | splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(), |
| | 58 | botOut, topOut);//以当前region的endkey为中值分裂hfile为两个,文件存为.bottom和.top |
| | 59 | |
| | 60 | // Add these back at the *front* of the queue, so there's a lower |
| | 61 | // chance that the region will just split again before we get there. |
| | 62 | //.bottom和.top重新放回queue |
| | 63 | queue.addFirst(new LoadQueueItem(item.family, botOut)); |
| | 64 | queue.addFirst(new LoadQueueItem(item.family, topOut)); |
| | 65 | LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); |
| | 66 | return null; |
| | 67 | } |
| | 68 | |
| | 69 | byte[] regionName = location.getRegionInfo().getRegionName(); |
| | 70 | server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);//如果包含,直接调用region所在server的bulkLoadHFile方法将hfile导入 |
| | 71 | return null; |
| | 72 | }}} |
| | 73 | |
| | 74 | |
| | 75 | 4.LoadIncrementalHFiles.splitStoreFile( |
| | 76 | Configuration conf, Path inFile, |
| | 77 | HColumnDescriptor familyDesc, byte[] splitKey, |
| | 78 | Path bottomOut, Path topOut) |
| | 79 | |
| | 80 | |
| | 81 | {{{ |
| | 82 | //以splitKey为中值,将inFile拷贝分裂为bottomOut和topOut两个文件 |
| | 83 | Reference topReference = new Reference(splitKey, Range.top); |
| | 84 | Reference bottomReference = new Reference(splitKey, Range.bottom); |
| | 85 | |
| | 86 | copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); |
| | 87 | copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); |
| | 88 | }}} |
| | 89 | |
| | 90 | |
| | 91 | 5.HRegionServer.bulkLoadHFile(String hfilePath, byte[] regionName, |
| | 92 | byte[] familyName) |
| | 93 | |
| | 94 | 这是HRegionInterface接口中的一个远程调用(RPC),在regionserver端执行。 |
| | 95 | |
| | 96 | checkOpen();//检查region是否已停,已经停了便不再导 |
| | 97 | HRegion region = getRegion(regionName);//从regionserver中拿到region |
| | 98 | region.bulkLoadHFile(hfilePath, familyName);//这步才开始导 |
| | 99 | |
| | 100 | 6.HRegion.bulkLoadHFile(String hfilePath, byte[] familyName) |
| | 101 | {{{ |
| | 102 | HRegion.bulkLoadHFile(String hfilePath, byte[] familyName) |
| | 103 | throws IOException { |
| | 104 | startRegionOperation();//上读锁 |
| | 105 | try { |
| | 106 | Store store = getStore(familyName); |
| | 107 | if (store == null) { |
| | 108 | throw new DoNotRetryIOException( |
| | 109 | "No such column family " + Bytes.toStringBinary(familyName)); |
| | 110 | } |
| | 111 | store.bulkLoadHFile(hfilePath);//调store的同名方法 |
| | 112 | } finally { |
| | 113 | closeRegionOperation();//解读锁 |
| | 114 | } |
| | 115 | |
| | 116 | } |
| | 117 | }}} |
| | 118 | |
| | 119 | |
| | 120 | 7.Store.bulkLoadHFile(String srcPathStr) |
| | 121 | |
| | 122 | 共三步:首先再一次检查hfile的firstkey和lastkey是否落在region的startkey和endkey范围内;其次将hfile重命名(挪)到store的目录下;最后将hfile包装成StoreFile对象,装载到Store的storefile列表里以提供在线服务。 |
| | 123 | |
| | 124 | |
| | 125 | {{{ |
| | 126 | //再次检查是否匹配region |
| | 127 | HRegionInfo hri = region.getRegionInfo(); |
| | 128 | if (!hri.containsRange(firstKey, lastKey)) { |
| | 129 | throw new WrongRegionException( |
| | 130 | "Bulk load file " + srcPathStr + " does not fit inside region " |
| | 131 | + this.region); |
| | 132 | } |
| | 133 | |
| | 134 | //挪文件 |
| | 135 | Path srcPath = new Path(srcPathStr); |
| | 136 | Path dstPath = StoreFile.getRandomFilename(fs, homedir); |
| | 137 | LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath); |
| | 138 | StoreFile.rename(fs, srcPath, dstPath); |
| | 139 | |
| | 140 | |
| | 141 | //装载文件以提供在线服务 |
| | 142 | // Append the new storefile into the list |
| | 143 | this.lock.writeLock().lock();//加store的写锁 |
| | 144 | try { |
| | 145 | ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles); |
| | 146 | newFiles.add(sf); |
| | 147 | this.storefiles = sortAndClone(newFiles); |
| | 148 | notifyChangedReadersObservers(); |
| | 149 | } finally { |
| | 150 | this.lock.writeLock().unlock();//解store的写锁 |
| | 151 | } |
| | 152 | }}} |