Repost: HBase bulkload source code reading notes

1.LoadIncrementalHFiles.doBulkLoad(Path hfofDir, HTable table)

First, discoverLoadQueue scans hfofDir to find the HFiles it contains; then tryLoad is called in a loop to load each file in turn. This is a serial process.

      Deque<LoadQueueItem> queue = null; 
      queue = discoverLoadQueue(hfofDir); 
      while (!queue.isEmpty()) { 
        LoadQueueItem item = queue.remove(); 
        tryLoad(item, conn, table.getTableName(), queue); 
      } 
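The loop above is a plain work queue drained one item at a time. A minimal Java sketch of the same pattern, with a hypothetical Handler interface standing in for tryLoad (not an HBase API), shows why items that tryLoad pushes onto the front of the queue are handled before the files that were already waiting:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the doBulkLoad loop: one item at a time is taken
// from the head of the deque, and the handler may push follow-up items back.
final class SerialLoadLoop {
    // Stands in for tryLoad; it may add items (e.g. split halves) to the queue.
    interface Handler {
        void tryLoad(String item, Deque<String> queue);
    }

    // Drains the queue serially and returns the order in which items were handled.
    static List<String> drain(Deque<String> queue, Handler handler) {
        List<String> handled = new ArrayList<>();
        while (!queue.isEmpty()) {
            String item = queue.remove(); // remove() takes the head of the deque
            handled.add(item);
            handler.tryLoad(item, queue);
        }
        return handled;
    }
}
```

With a queue of a, b and a handler that pushes a.bottom and then a.top onto the front when it sees a, the processing order is a, a.top, a.bottom, b.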

2.LoadIncrementalHFiles.discoverLoadQueue(Path hfofDir)

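hfofDir has a two-level directory layout, family --> hfile, so a nested loop visits every HFile and adds it to a Deque, which is returned. Entries directly under hfofDir whose names start with "_" are not family directories and are skipped (HFiles whose names start with "_" are skipped too). The LoadQueueItem data structure records the family and the HFile path: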

    final byte[] family;
    final Path hfilePath;

    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir); // first level: family directories
    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
    for (FileStatus stat : familyDirStatuses) {
      Path familyDir = stat.getPath();
      // Skip _logs, etc
      if (familyDir.getName().startsWith("_")) continue; // names starting with "_" are not family directories
      byte[] family = familyDir.getName().getBytes();
      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir)); // second level: HFiles
      for (Path hfile : hfiles) { 
        if (hfile.getName().startsWith("_")) continue; 
        ret.add(new LoadQueueItem(family, hfile)); 
      } 
    }
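For experimenting outside Hadoop, the same two-level walk can be sketched with java.nio.file in place of the Hadoop FileSystem API. This is an analogue under that assumption, not the HBase code: the class name LoadQueueDiscovery is made up, and the isDirectory guard is an extra check the original does not have. Directory listing order is not guaranteed, so the resulting queue order is unspecified.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Deque;

// Local-filesystem analogue of discoverLoadQueue (hypothetical, not HBase).
// Assumed layout: hfofDir/<family>/<hfile>.
final class LoadQueueDiscovery {
    // Mirrors LoadQueueItem: which family an HFile belongs to, and where it is.
    static final class LoadQueueItem {
        final byte[] family;
        final Path hfilePath;

        LoadQueueItem(byte[] family, Path hfilePath) {
            this.family = family;
            this.hfilePath = hfilePath;
        }
    }

    static Deque<LoadQueueItem> discover(Path hfofDir) throws IOException {
        Deque<LoadQueueItem> ret = new ArrayDeque<>();
        try (DirectoryStream<Path> families = Files.newDirectoryStream(hfofDir)) {
            for (Path familyDir : families) { // first level: family directories
                String name = familyDir.getFileName().toString();
                // Skip _logs etc.; the isDirectory guard is an addition of this sketch.
                if (name.startsWith("_") || !Files.isDirectory(familyDir)) continue;
                byte[] family = name.getBytes(StandardCharsets.UTF_8);
                try (DirectoryStream<Path> hfiles = Files.newDirectoryStream(familyDir)) {
                    for (Path hfile : hfiles) { // second level: HFiles
                        if (hfile.getFileName().toString().startsWith("_")) continue;
                        ret.add(new LoadQueueItem(family, hfile));
                    }
                }
            }
        }
        return ret;
    }
}
```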

3.LoadIncrementalHFiles.tryLoad(final LoadQueueItem item, HConnection conn, final byte[] table, final Deque<LoadQueueItem> queue)

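It first checks whether the region that the current HFile belongs to has split. If it has, the HFile is split into two HFiles that match the new regions, and both are put back into the deque. If it has not, tryLoad calls bulkLoadHFile on the server hosting the region to import the HFile. The key lines are: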

          if (!hri.containsRange(first, last)) { // does the region containing the first key also cover the HFile's first and last keys? If not, the region has split
            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + 
                "region. Splitting..."); 

            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family); 
            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom"); 
            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top"); 
            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
                botOut, topOut); // split the HFile in two at the region's end key, saved as .bottom and .top

            // Add these back at the *front* of the queue, so there's a lower 
            // chance that the region will just split again before we get there. 
            // put .bottom and .top back into the queue
            queue.addFirst(new LoadQueueItem(item.family, botOut)); 
            queue.addFirst(new LoadQueueItem(item.family, topOut)); 
            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); 
            return null; 
          } 

          byte[] regionName = location.getRegionInfo().getRegionName(); 
          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family); // the region covers the whole file: call bulkLoadHFile on the server hosting it to import the HFile
          return null;
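The decision in tryLoad hinges on containsRange. A simplified re-implementation, assuming HBase's usual conventions (row keys compared as unsigned bytes, a region covering [startKey, endKey), an empty end key meaning the last region of the table), could look like this; RegionRange is a hypothetical class, not HRegionInfo:

```java
import java.util.Arrays;

// Hypothetical, simplified version of the region range check used by tryLoad.
// A region covers rows in [startKey, endKey); an empty endKey means no upper bound.
final class RegionRange {
    final byte[] startKey;
    final byte[] endKey;

    RegionRange(byte[] startKey, byte[] endKey) {
        this.startKey = startKey;
        this.endKey = endKey;
    }

    // True if every row in the inclusive range [first, last] falls inside this region.
    boolean containsRange(byte[] first, byte[] last) {
        if (Arrays.compareUnsigned(first, last) > 0) {
            throw new IllegalArgumentException("first key is greater than last key");
        }
        boolean firstInRange = Arrays.compareUnsigned(first, startKey) >= 0;
        // The end key is exclusive, unless it is empty (last region).
        boolean lastInRange = endKey.length == 0 || Arrays.compareUnsigned(last, endKey) < 0;
        return firstInRange && lastInRange;
    }
}
```

Since the region is located by the file's first key, the check normally fails only at the upper end. tryLoad then splits at hri.getEndKey(): the bottom half fits the current region, and the top half goes back to the queue to be located (and, if necessary, split) again.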

4.LoadIncrementalHFiles.splitStoreFile(Configuration conf, Path inFile, HColumnDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut)

    // Using splitKey as the split point, copy inFile into two files, bottomOut and topOut
    Reference topReference = new Reference(splitKey, Range.top); 
    Reference bottomReference = new Reference(splitKey, Range.bottom); 

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); 
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
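Conceptually, the Range.bottom reference selects the rows before splitKey, the Range.top reference selects the rows from splitKey onward, and copyHFileHalf writes each selected half out as a new HFile. The in-memory sketch below models only that partitioning rule, with a sorted list of row keys standing in for the file; HalfSplit is a hypothetical name, and real HFiles are copied cell by cell rather than by row key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory analogy of splitStoreFile: rows below splitKey go to
// the bottom half, rows at or above splitKey go to the top half.
final class HalfSplit {
    // Returns [bottom, top]; input rows are assumed to be sorted already.
    static List<List<byte[]>> split(List<byte[]> sortedRows, byte[] splitKey) {
        List<byte[]> bottom = new ArrayList<>();
        List<byte[]> top = new ArrayList<>();
        for (byte[] row : sortedRows) {
            if (Arrays.compareUnsigned(row, splitKey) < 0) {
                bottom.add(row); // fits the region that ends at splitKey
            } else {
                top.add(row);    // belongs to the region that starts at splitKey
            }
        }
        return List.of(bottom, top);
    }
}
```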

5.HRegionServer.bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName)

This is a remote call declared in HRegionInterface; it executes on the region server.

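It performs three steps:

    checkOpen(); // check whether the region server has stopped; if it has, do not load
    HRegion region = getRegion(regionName); // get the region from the region server
    region.bulkLoadHFile(hfilePath, familyName); // the actual load starts here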
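The three calls follow a check, look up, delegate shape. The sketch below models it with hypothetical classes (MiniRegionServer and its nested Region interface are illustrative, not HBase types). Region names are Strings because byte[] cannot serve as a HashMap key, and plain IOExceptions stand in for the exceptions HBase actually throws.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of HRegionServer.bulkLoadHFile.
final class MiniRegionServer {
    interface Region {
        void bulkLoadHFile(String hfilePath, byte[] familyName) throws IOException;
    }

    private final Map<String, Region> onlineRegions = new HashMap<>();
    private volatile boolean stopped = false;

    void addRegion(String regionName, Region region) { onlineRegions.put(regionName, region); }

    void stop() { stopped = true; }

    // Analogue of checkOpen(): fail fast once the server is no longer running.
    private void checkOpen() throws IOException {
        if (stopped) throw new IOException("Server stopped");
    }

    // Analogue of getRegion(regionName): only regions hosted here can be loaded.
    private Region getRegion(String regionName) throws IOException {
        Region r = onlineRegions.get(regionName);
        if (r == null) throw new IOException("Region not online: " + regionName);
        return r;
    }

    void bulkLoadHFile(String hfilePath, String regionName, byte[] familyName) throws IOException {
        checkOpen();
        Region region = getRegion(regionName);
        region.bulkLoadHFile(hfilePath, familyName); // the load itself happens in the region
    }
}
```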

6.HRegion.bulkLoadHFile(String hfilePath, byte[] familyName)

  public void bulkLoadHFile(String hfilePath, byte[] familyName)
  throws IOException {
    startRegionOperation(); // acquire the region read lock
    try {
      Store store = getStore(familyName);
      if (store == null) {
        throw new DoNotRetryIOException(
            "No such column family " + Bytes.toStringBinary(familyName));
      }
      store.bulkLoadHFile(hfilePath); // call the Store method of the same name
    } finally {
      closeRegionOperation(); // release the region read lock
    }
  }
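The point of startRegionOperation here is that a bulk load only needs the region's shared (read) lock. Assuming, as the comments above say, that it takes a read lock, the pattern can be sketched with ReentrantReadWriteLock. MiniRegion is a hypothetical class, a plain IOException replaces DoNotRetryIOException, and a synchronized list stands in for the Store, which in HBase has its own lock (step 7).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the region-level locking around a bulk load.
final class MiniRegion {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, List<String>> stores = new HashMap<>();

    MiniRegion(String... families) {
        for (String f : families) stores.put(f, new ArrayList<>());
    }

    void bulkLoadHFile(String hfilePath, String family) throws IOException {
        lock.readLock().lock();       // like startRegionOperation(): shared lock
        try {
            List<String> store = stores.get(family);
            if (store == null) {
                throw new IOException("No such column family " + family);
            }
            // Several loads may hold the read lock at once, so guard the list itself.
            synchronized (store) { store.add(hfilePath); }
        } finally {
            lock.readLock().unlock(); // like closeRegionOperation(), runs even on failure
        }
    }

    int fileCount(String family) {
        List<String> store = stores.get(family);
        synchronized (store) { return store.size(); }
    }

    // True when no operation currently holds the read lock.
    boolean noOperationsInFlight() { return lock.getReadLockCount() == 0; }
}
```

Because the lock is released in finally, a failed load (for example an unknown family) leaves no read lock held, so operations that need the write lock are not blocked by it.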

7.Store.bulkLoadHFile(String srcPathStr)

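There are really only three steps. First, it checks once more that the HFile's first and last keys fall within the region's start key and end key. Next, it renames (moves) the HFile into the store's directory. Finally, it wraps the HFile in a StoreFile object and adds it to the Store's list of store files.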

      // check again that the file fits inside the region
      HRegionInfo hri = region.getRegionInfo(); 
      if (!hri.containsRange(firstKey, lastKey)) { 
        throw new WrongRegionException( 
            "Bulk load file " + srcPathStr + " does not fit inside region " 
            + this.region); 
      } 

    // move the file
    Path srcPath = new Path(srcPathStr); 
    Path dstPath = StoreFile.getRandomFilename(fs, homedir); 
    LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath); 
    StoreFile.rename(fs, srcPath, dstPath); 


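    // load the file so it can serve reads online (sf is the StoreFile opened on dstPath; that step is not shown in these notes)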
    // Append the new storefile into the list 
    this.lock.writeLock().lock(); // acquire the store's write lock
    try { 
      ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles); 
      newFiles.add(sf); 
      this.storefiles = sortAndClone(newFiles); 
      notifyChangedReadersObservers(); 
    } finally { 
      this.lock.writeLock().unlock(); // release the store's write lock
    }
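Putting step 7 together: the file is moved into the store directory, and then a new sorted list is built and swapped in under the write lock, so readers still holding the old list are not disturbed. The sketch below is a local-filesystem approximation under stated assumptions: Files.move with ATOMIC_MOVE stands in for StoreFile.rename, a UUID-based name stands in for StoreFile.getRandomFilename, sorting by path stands in for sortAndClone (which uses HBase's own ordering), and the key-range check and notifyChangedReadersObservers are left out. MiniStore is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical local-filesystem sketch of the move + copy-on-write publish steps.
final class MiniStore {
    private final Path homedir;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Readers grab this reference and iterate it without locking.
    private volatile List<Path> storefiles = Collections.emptyList();

    MiniStore(Path homedir) { this.homedir = homedir; }

    void bulkLoadHFile(Path srcPath) throws IOException {
        // Move the file into the store directory under a random name.
        Path dstPath = homedir.resolve(UUID.randomUUID().toString().replace("-", ""));
        Files.move(srcPath, dstPath, StandardCopyOption.ATOMIC_MOVE);

        lock.writeLock().lock();
        try {
            // Copy, modify, sort, then swap: the old list object is never mutated.
            List<Path> newFiles = new ArrayList<>(storefiles);
            newFiles.add(dstPath);
            Collections.sort(newFiles);
            storefiles = Collections.unmodifiableList(newFiles);
        } finally {
            lock.writeLock().unlock();
        }
    }

    List<Path> files() { return storefiles; }
}
```

A reader that fetched files() before the load keeps seeing its original (unchanged) list; the next call returns the list that includes the new file.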