Changes between Initial Version and Version 1 of loaddata/bulkload_code


Ignore:
Timestamp:
08/21/2012 04:06:40 PM (14 years ago)
Author:
liaojiaohe
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • loaddata/bulkload_code

    v1 v1  
     1转: Hbase bulkload源代码阅读笔记 
     2 
     31.LoadIncrementalHFiles.doBulkLoad(Path hfofDir, HTable table)  
     4 
     5首先用discoverLoadQueue方法扫描出hfofDir下有哪些fhile文件,再循环调用tryLoad方法把每个文件load进去,这是一个串行的过程。  
     6 
     7{{{ 
     8      Deque<LoadQueueItem> queue = null;  
     9      queue = discoverLoadQueue(hfofDir);  
     10      while (!queue.isEmpty()) {  
     11        LoadQueueItem item = queue.remove();  
     12        tryLoad(item, conn, table.getTableName(), queue);  
     13      }  
     14 
     15}}} 
     16 
     172.LoadIncrementalHFiles.discoverLoadQueue(Path hfofDir)  
     18 
     19hfofDir下是两层目录结构family-->hfile,因此二重循环遍历每个hfile文件,加到Deque里返回,hfofDir下以"_"开头的不是family目录。  
     20LoadQueueItem的数据结构用于记录family和hfile  
     21     
     22{{{ 
     23final byte[] family;  
     24    final Path hfilePath;  
     25 
     26    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);//第一层目录  
     27    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();  
     28    for (FileStatus stat : familyDirStatuses) {  
     29      Path familyDir = stat.getPath();  
     30      // Skip _logs, etc  
     31      if (familyDir.getName().startsWith("_")) continue;//以"_"开头的不是family目录。  
     32      byte[] family = familyDir.getName().getBytes();  
     33      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));//第二层目录  
     34      for (Path hfile : hfiles) {  
     35        if (hfile.getName().startsWith("_")) continue;  
     36        ret.add(new LoadQueueItem(family, hfile));  
     37      }  
     38    } 
     39}}} 
     40  
     41 
     423.LoadIncrementalHFiles.tryLoad(final LoadQueueItem item,  
     43      HConnection conn, final byte[] table,  
     44      final Deque<LoadQueueItem> queue)  
     45 
     46首先检查当前的hfile所属的region是否已经发生分裂,如果发生分裂,则将hfile分裂成匹配新region的两个hfile,并将这两个hfile放入deque;哪果没有发生分裂,则调用region所在server的bulkLoadHFile方法将hfile导入。重点是以下几句  
     47 
     48 
     49{{{ 
     50          if (!hri.containsRange(first, last)) {//判断包含firstkey的当前region是否包含hfile的startkey和endkey,如果不包含说明当前region是分裂过的  
     51            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +  
     52                "region. Splitting...");  
     53 
     54            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);  
     55            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");  
     56            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");  
     57            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),  
     58                botOut, topOut);//以当前region的endkey为中值分裂hfile为两个,文件存为.bottom和.top  
     59 
     60            // Add these back at the *front* of the queue, so there's a lower  
     61            // chance that the region will just split again before we get there.  
     62            //.bottom和.top重新放回queue  
     63            queue.addFirst(new LoadQueueItem(item.family, botOut));  
     64            queue.addFirst(new LoadQueueItem(item.family, topOut));  
     65            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);  
     66            return null;  
     67          }  
     68 
     69          byte[] regionName = location.getRegionInfo().getRegionName();  
     70          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);//如果包含,直接调用region所在server的bulkLoadHFile方法将hfile导入  
     71          return null; 
     72}}} 
     73  
     74 
     754.LoadIncrementalHFiles.splitStoreFile(  
     76      Configuration conf, Path inFile,  
     77      HColumnDescriptor familyDesc, byte[] splitKey,  
     78      Path bottomOut, Path topOut)  
     79 
     80 
     81{{{ 
     82//以splitKey为中值,将inFile拷贝分裂为bottomOut和topOut两个文件  
     83    Reference topReference = new Reference(splitKey, Range.top);  
     84    Reference bottomReference = new Reference(splitKey, Range.bottom);  
     85 
     86    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);  
     87    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); 
     88}}} 
     89  
     90 
     915.HRegionServer.bulkLoadHFile(String hfilePath, byte[] regionName,  
     92      byte[] familyName)  
     93 
     94这是个HRegionInterface下的远程调用,是在regionserver中执行的。  
     95 
     96    checkOpen();//检查region是否已停,已经停了便不再导     
     97    HRegion region = getRegion(regionName);//从regionserver中拿到region  
     98    region.bulkLoadHFile(hfilePath, familyName);//这步才开始导  
     99 
     1006. 
     101{{{ 
     102HRegion.bulkLoadHFile(String hfilePath, byte[] familyName)  
     103  throws IOException {  
     104    startRegionOperation();//上读锁  
     105    try {  
     106      Store store = getStore(familyName);  
     107      if (store == null) {  
     108        throw new DoNotRetryIOException(  
     109            "No such column family " + Bytes.toStringBinary(familyName));  
     110      }  
     111      store.bulkLoadHFile(hfilePath);//调store的同名方法  
     112    } finally {  
     113      closeRegionOperation();//解读锁  
     114    }  
     115 
     116  } 
     117}}} 
     118  
     119 
     1207.Store.bulkLoadHFile(String srcPathStr)  
     121 
     122就三步:首先将hfile重命名到store的目录下;其次将hfile包装成StoreFile对象装载到Store的列表里。在这两步之前是再一次检查region的startkey和endkey是否跟hfile的匹配  
     123 
     124 
     125{{{ 
     126//再次检查是否匹配region  
     127      HRegionInfo hri = region.getRegionInfo();  
     128      if (!hri.containsRange(firstKey, lastKey)) {  
     129        throw new WrongRegionException(  
     130            "Bulk load file " + srcPathStr + " does not fit inside region "  
     131            + this.region);  
     132      }  
     133 
     134//挪文件  
     135    Path srcPath = new Path(srcPathStr);  
     136    Path dstPath = StoreFile.getRandomFilename(fs, homedir);  
     137    LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);  
     138    StoreFile.rename(fs, srcPath, dstPath);  
     139 
     140 
     141//装载文件以提供在线服务  
     142    // Append the new storefile into the list  
     143    this.lock.writeLock().lock();//加store的写锁  
     144    try {  
     145      ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);  
     146      newFiles.add(sf);  
     147      this.storefiles = sortAndClone(newFiles);  
     148      notifyChangedReadersObservers();  
     149    } finally {  
     150      this.lock.writeLock().unlock();//解store的写锁  
     151    } 
     152}}}