启动NameNode的进程

sbin/start-dfs.sh

里面的核心代码如下

#通过命令查询处所有的NameNode
NAMENODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -namenodes 2>/dev/null)
if [[ -z "${NAMENODES}" ]]; then
  NAMENODES=$(hostname)
fi
echo "Starting namenodes on [${NAMENODES}]"
#hdfs namenode来启动NameNode
hadoop_uservar_su hdfs namenode "${HADOOP_HDFS_HOME}/bin/hdfs" \
    --workers \
    --config "${HADOOP_CONF_DIR}" \
    --hostnames "${NAMENODES}" \
    --daemon start \
    namenode ${nameStartOpt}
HADOOP_JUMBO_RETCOUNTER=$?

启动的是NameNode类的main方法

 public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
     //创建一个NameNode
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.fatal("Failed to start namenode.", e);
      terminate(1, e);
    }
  }

由于没有传其它参数,所以创建NameNode 是用的默认的方法。

public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException {
  ......//次数省略N行代码
 //执行下列方法
      default: {
      //初始化 Metric系统的NameNode
        DefaultMetricsSystem.initialize("NameNode");
        //执行创建NameNode
        return new NameNode(conf);
      }
    }
  }

以上代码可以看到先执行DefaultMetricsSystem.initialize("NameNode"); 来给指标系统注册,以便对NameNode进行监控,具体什么是指标系统 详看(https://github.com/ColZer/DigAndBuried/blob/master/hadoop/metric-learn.md)。然后再执行创建NameNode

//创建NameNode的方法
protected NameNode(Configuration conf, NamenodeRole role) 
      throws IOException { 
    this.conf = conf;
    this.role = role;
    //设置NameNode地址 默认的就是fs.defaultFS配置对应的值hdfs://localhost:9000
    setClientNamenodeAddress(conf);
    //获取服务器ID
    String nsId = getNameServiceId(conf);
    //获取HA的NameNodeID
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    //是否启用了HA
    this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
    //更新状态
    state = createHAState(getStartupOption(conf));
    //持否允许读 读取dfs.ha.allow.stale.reads,设置namenode在备用状态时是否允许读操作,默认为false
    this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
    this.haContext = createHAContext();
    try {
      //联邦环境下,使用该方法配置一系列使用一个逻辑上的nsId组合在一起的namenode
      initializeGenericKeys(conf, nsId, namenodeId);
      //重要的环节
      //namenode初始化
      initialize(conf);
      try {
        haContext.writeLock();
        state.prepareToEnterState(haContext);
        //namenode进入相应状态:active state/backup state/standby state
        state.enterState(haContext);
      } finally {
        haContext.writeUnlock();
      }
    } catch (IOException e) {
      this.stop();
      throw e;
    } catch (HadoopIllegalArgumentException e) {
      this.stop();
      throw e;
    }
  }

下面我们再讲讲NameNode.java的initialize(conf);

protected void initialize(Configuration conf) throws IOException {
    //监控时间间隔 用逗号分隔的整数集表示在Namenode和Datanode上对百分比延迟指标的期望的滚动间隔(以秒为单位)。默认情况下,百分比延迟指标是禁用的。
    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
      if (intervals != null) {
        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
      }
    }
   ////设置权限,根据hadoop.security.authentication获取认证方式及规则
    UserGroupInformation.setConfiguration(conf);
    //登录:如果认证方式为simple则退出该方法
    //否则调用UserGroupInformation.loginUserFromKeytab进行登陆,登陆使用dfs.namenode.kerberos.principal作为用户名
//初始化登录认证,如果HADOOP开启了Kerberos认证,则进行认证
//认证的配置信息来自hdfs-site.xml
//Dfs.namenode.keytab.file                #keytab文件
//Dfs.namenode.kerberos.principal           #kerberos认证个体
//最后调用接口进行认证 UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename);
    loginAsNameNodeUser(conf);

    //初始化度量系统,用于度量namenode服务状态
    NameNode.initMetrics(conf, this.getRole());
    StartupProgressMetrics.register(startupProgress);

    if (NamenodeRole.NAMENODE == role) {
      //启动http服务器
      startHttpServer(conf);
    }

    this.spanReceiverHost = SpanReceiverHost.getInstance(conf);

    //根据命令对命名空间进行操作,如:前文所述启动时加载本地命名空间镜像和应用编辑日志,在内存中建立命名空间的映像
    loadNamesystem(conf);

    //创建RPC服务器
    rpcServer = createRpcServer(conf);
    if (clientNamenodeAddress == null) {
      // This is expected for MiniDFSCluster. Set it now using 
      // the RPC server's bind address.
      clientNamenodeAddress = 
          NetUtils.getHostPortString(rpcServer.getRpcAddress());
      LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
          + " this namenode/service.");
    }
    if (NamenodeRole.NAMENODE == role) {
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }
    
    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
    //启动活动状态和备用状态的公共服务:RPC服务和namenode的插件程序启动
    startCommonServices(conf);
  }
  

再看看NameNode.java的loadNamesystem(conf);

 /**
   * namenode启动时从本地文件系统加载镜像并重做编辑日志,都在此方法中实现。
   * @param conf
   * @throws IOException
   */
  protected void loadNamesystem(Configuration conf) throws IOException {
    this.namesystem = FSNamesystem.loadFromDisk(conf);
  }

从这里看到调用的是FSNamesystem的loadFromDisk的方法

/**
   * Instantiates an FSNamesystem loaded from the image and edits
   * directories specified in the passed Configuration.
   * 实例化从映像中加载的FSNamesystem,并编辑已通过的配置中指定的目录。
   *
   *namenode启动时从本地文件系统加载镜像并重做编辑日志,都在此方法中实现。
   * @param conf the Configuration which specifies the storage directories
   *             from which to load
   * @return an FSNamesystem which contains the loaded namespace
   * @throws IOException if loading fails
   */
  static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
   //必须的编辑日志目录检查
    checkConfiguration(conf);
    //设在NNStorage,并初始化编辑日志目录。NNStorage主要功能是管理namenode使用的存储目录
    FSImage fsImage = new FSImage(conf,
        FSNamesystem.getNamespaceDirs(conf),
        FSNamesystem.getNamespaceEditsDirs(conf));
    //根据指定的镜像创建FSNamesystem对象
    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
    StartupOption startOpt = NameNode.getStartupOption(conf);
    if (startOpt == StartupOption.RECOVER) {
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    }

    long loadStart = now();
    try {
      //加载镜像、重做编辑日志,并打开一个新编辑文件都在此方法中
      namesystem.loadFSImage(startOpt);
    } catch (IOException ioe) {
      LOG.warn("Encountered exception loading fsimage", ioe);
      fsImage.close();
      throw ioe;
    }
    long timeTakenToLoadFSImage = now() - loadStart;
    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
    NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
    if (nnMetrics != null) {
      nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
    }
    return namesystem;
  }

再看看FSNamesystem.java的loadFSImage

/**
   *加载fsimage的方法 这个是从 loadFromDisk 方法找到这里的
   * @param startOpt
   * @throws IOException
   */
  private void loadFSImage(StartupOption startOpt) throws IOException {
    final FSImage fsImage = getFSImage();

    // format before starting up if requested 如果需要,请先格式化。
    if (startOpt == StartupOption.FORMAT) {
      
      fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id

      startOpt = StartupOption.REGULAR;
    }
    boolean success = false;
    //添加锁子
    writeLock();

    try {
      // We shouldn't be calling saveNamespace if we've come up in standby state.
      // 如果我们处于待机状态,我们不应该调用savespace。
      //正在进行的NameNode元数据恢复过程的上下文数据。
      MetaRecoveryContext recovery = startOpt.createRecoveryContext();

      //真正加载fsimage的方法
      //其中fsImage.recoverTransitionRead(startOpt, this, recovery)会调用到FsImage.loadFSImage()函数
      final boolean staleImage
          = fsImage.recoverTransitionRead(startOpt, this, recovery);

      if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) ||
          RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
        rollingUpgradeInfo = null;
      }
      final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade(); 
      LOG.info("Need to save fs image? " + needToSave
          + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
          + ", isRollingUpgrade=" + isRollingUpgrade() + ")");
      if (needToSave) {
        fsImage.saveNamespace(this);
      } else {
        updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(),
            startOpt);
        // No need to save, so mark the phase done.
        StartupProgress prog = NameNode.getStartupProgress();
        prog.beginPhase(Phase.SAVING_CHECKPOINT);
        prog.endPhase(Phase.SAVING_CHECKPOINT);
      }
      // This will start a new log segment and write to the seen_txid file, so
      // we shouldn't do it when coming up in standby state
      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
          || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
        fsImage.openEditLogForWrite();
      }
      success = true;
    } finally {
      if (!success) {
        fsImage.close();
      }
      writeUnlock();
    }
    imageLoadComplete();
  }

最后再看看FSImage的这个方法

/**
   * 恢复转换读取
   * Analyze storage directories.
   * Recover from previous transitions if required. 
   * Perform fs state transition if necessary depending on the namespace info.
   * Read storage info.
   * 分析存储目录。 如果需要,从以前的转换中恢复。 根据命名空间信息,在必要时执行fs状态转换。 阅读存储信息。
   *
   * @throws IOException
   * @return true if the image needs to be saved or false otherwise
   */
  boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
      MetaRecoveryContext recovery)
      throws IOException {
    //在读取图像之前应该先执行NameNode格式话
    assert startOpt != StartupOption.FORMAT : 
      "NameNode formatting should be performed before reading the image";

    //fsimage和editlog目录
    Collection<URI> imageDirs = storage.getImageDirectories();
    Collection<URI> editsDirs = editLog.getEditURIs();

    // none of the data dirs exist
    // 所有数据都不存在。 如若 fsimage和editlog都不存在 则抛出异常
    if((imageDirs.size() == 0 || editsDirs.size() == 0) 
                             && startOpt != StartupOption.IMPORT)  
      throw new IOException(
          "All specified directories are not accessible or do not exist.");//所有指定的目录都不可访问或不存在。
    
    // 1. For each data directory calculate its state and 
    // check whether all is consistent before transitioning.
    //对于每个数据目录,计算它的状态,并检查在转换之前是否所有数据都是一致的。
    Map<StorageDirectory, StorageState> dataDirStates = 
             new HashMap<StorageDirectory, StorageState>();
    boolean isFormatted = recoverStorageDirs(startOpt, dataDirStates);

    if (LOG.isTraceEnabled()) {
      LOG.trace("Data dir states:\n  " +
        Joiner.on("\n  ").withKeyValueSeparator(": ")
        .join(dataDirStates));
    }

    /**
     * 判断 NameNode不是格式化的 不是的话 抛异常
     */
    if (!isFormatted && startOpt != StartupOption.ROLLBACK 
                     && startOpt != StartupOption.IMPORT) {
      throw new IOException("NameNode is not formatted.");      
    }


    /**
     * 获取最新的版本
     */
    int layoutVersion = storage.getLayoutVersion();
    //验证已配置的目录是否存在,然后打印软件和图像的元数据版本。
    if (startOpt == StartupOption.METADATAVERSION) {
      System.out.println("HDFS Image Version: " + layoutVersion);
      System.out.println("Software format version: " +
        HdfsConstants.NAMENODE_LAYOUT_VERSION);
      return false;
    }

    //假如获取的最新版本 没有 最后的布局版本大 则升级
    if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
      NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
    }
    //文件系统映像包含一个旧的布局版本xx。
    //需要升级到版本xx。
    //如果滚动升级已经启动,请使用xx选项重新启动NameNode;
    //或者使用xx选项重启NameNode以启动新的升级。
    if (startOpt != StartupOption.UPGRADE
        && startOpt != StartupOption.UPGRADEONLY
        && !RollingUpgradeStartupOption.STARTED.matches(startOpt)
        && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
        && layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
      throw new IOException(
          "\nFile system image contains an old layout version " 
          + storage.getLayoutVersion() + ".\nAn upgrade to version "
          + HdfsConstants.NAMENODE_LAYOUT_VERSION + " is required.\n"
          + "Please restart NameNode with the \""
          + RollingUpgradeStartupOption.STARTED.getOptionString()
          + "\" option if a rolling upgrade is already started;"
          + " or restart NameNode with the \""
          + StartupOption.UPGRADE.getName() + "\" option to start"
          + " a new upgrade.");
    }

    //处理用于升级的clusterid和blockpoolid的启动选项。 查看用不用升级
    storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);

    // 2. Format unformatted dirs. 格式非格式化dirs
    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
      StorageDirectory sd = it.next();
      StorageState curState = dataDirStates.get(sd);
      switch(curState) {
      case NON_EXISTENT:
        throw new IOException(StorageState.NON_EXISTENT + 
                              " state cannot be here");
      case NOT_FORMATTED:
        //存储目录xxx(根目录)没有格式化
        LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
        LOG.info("Formatting ...");
        //创建一个空的目录
        sd.clearDirectory(); // create empty currrent dir
        break;
      default:
        break;
      }
    }

    // 3. Do transitions
    switch(startOpt) {
    case UPGRADE:
    case UPGRADEONLY:
      doUpgrade(target);
      return false; // upgrade saved image already //保存图像已经升级
    case IMPORT:
      doImportCheckpoint(target);
      return false; // import checkpoint saved image already //导入检查点已保存图像。
    case ROLLBACK:
      throw new AssertionError("Rollback is now a standalone command, "
          + "NameNode should not be starting with this option."); //
      //Rollback现在是一个独立的命令,NameNode不应该从这个选项开始。
    case REGULAR:
    default:
      // just load the image
    }

    /**
     * 最下头进行了调用 加载fsimage
     */
    return loadFSImage(target, startOpt, recovery);
  }
  

我们可以看到最后调用了loadFSImage的方法

 /**
   * Choose latest image from one of the directories,
   * load it and merge with the edits.
   * 
   * Saving and loading fsimage should never trigger symlink resolution. 
   * The paths that are persisted do not have *intermediate* symlinks 
   * because intermediate symlinks are resolved at the time files, 
   * directories, and symlinks are created. All paths accessed while 
   * loading or saving fsimage should therefore only see symlinks as 
   * the final path component, and the functions called below do not
   * resolve symlinks that are the final path component.
   * 从一个目录中选择最新的镜像 加载它并和编辑日志进行合并。
   *
   *  保存和加载fsimage绝不应该触发符号链接解析
   * 持久化的路径不具有* intermediate *符号链接,因为在创建文件,目录和符号链接时会解析中间符号链接。
   * 因此,加载或保存fsimage时访问的所有路径只能将符号链接看作最终路径组件,
   * 而下面调用的函数不能解析作为最终路径组件的符号链接。
   *
   * @return whether the image should be saved
   * @throws IOException
   */
  private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
      MetaRecoveryContext recovery)
      throws IOException {

    // 看启动项是否是 滚动升级的回滚模式
    final boolean rollingRollback
        = RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);
    final EnumSet<NameNodeFile> nnfs;
    if (rollingRollback) {
      //如果是滚动升级的回滚,只需要从回滚映像加载。
      // if it is rollback of rolling upgrade, only load from the rollback image
      nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
    } else {
      // otherwise we can load from both IMAGE and IMAGE_ROLLBACK
      //否则,我们可以从图像和IMAGE_ROLLBACK加载。
      nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
    }
    //遍历所有的存储dirs,读取它们的内容以确定它们的布局版本。返回已检查每个目录的FSImageStorageInspector。
    final FSImageStorageInspector inspector = storage
        .readAndInspectDirs(nnfs, startOpt);

    //如果任何存储目录有未完成的升级,则为false。
    isUpgradeFinalized = inspector.isUpgradeFinalized();
    //获取应该加载到文件系统中的图像文件。
    List<FSImageFile> imageFiles = inspector.getLatestImages();

    //用于报告namenode启动进度的对象
    StartupProgress prog = NameNode.getStartupProgress();
    //**********跟进进度 namenode正在将fsimage文件加载到内存中
    prog.beginPhase(Phase.LOADING_FSIMAGE);
    File phaseFile = imageFiles.get(0).getFile();
    //路径和长度也写到StartupProgress中
    prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath());
    prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length());
    //如果目录处于这样的状态,则图像应该被重新保存。
    boolean needToSave = inspector.needToSave();

    Iterable<EditLogInputStream> editStreams = null;

    //初始化Editlog
    initEditLog(startOpt);

    //NN存储中的文件名是基于事务id的。
    //如果给定的布局版本中支持给定的特性,则返回true。
    if (NameNodeLayoutVersion.supports(
        LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
      // If we're open for write, we're either non-HA or we're the active NN, so
      // we better be able to load all the edits. If we're the standby NN, it's
      // OK to not be able to read all of edits right now.
      // In the meanwhile, for HA upgrade, we will still write editlog thus need
      // this toAtLeastTxId to be set to the max-seen txid
      // For rollback in rolling upgrade, we need to set the toAtLeastTxId to
      // the txid right before the upgrade marker.
      //如果我们对写开放,我们要么是非ha,要么是活动的NN,所以我们最好能够加载所有的编辑。
      // 如果我们是备用的NN,现在还不能读取所有的编辑。与此同时,对于HA升级,我们仍然会编写editlog,
      // 因此需要将这个toattrixid设置为max-seen txid。
     // 对于滚动升级的回滚,我们需要在升级标记之前将toat最少txid设置为txid。
      long toAtLeastTxId = editLog.isOpenForWrite() ? inspector
          .getMaxSeenTxId() : 0;
      if (rollingRollback) {
        // note that the first image in imageFiles is the special checkpoint
        // for the rolling upgrade
        //请注意,imageFiles中的第一个映像是滚动升级的特殊检查点。
        toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() + 2;
      }
      editStreams = editLog.selectInputStreams(
          imageFiles.get(0).getCheckpointTxId() + 1,
          toAtLeastTxId, recovery, false);
    } else {
      editStreams = FSImagePreTransactionalStorageInspector
        .getEditLogStreams(storage);
    }
    //返回我们将用于输入的最大操作码大小。
    int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
        DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
    //给editlog输入流设置最大的操作值
    for (EditLogInputStream elis : editStreams) {
      elis.setMaxOpSize(maxOpSize);
    }
 
    for (EditLogInputStream l : editStreams) {
      LOG.debug("Planning to load edit log stream: " + l);
    }
    if (!editStreams.iterator().hasNext()) {
      //没有选择编辑日志流。
      LOG.info("No edit log streams selected.");
    }
    
    FSImageFile imageFile = null;
    for (int i = 0; i < imageFiles.size(); i++) {
      try {
        imageFile = imageFiles.get(i);
        //**************************************************//
        //重中之重加载fsimage
        loadFSImageFile(target, recovery, imageFile, startOpt);
        //**************************************************//
        break;
      } catch (IOException ioe) {
        LOG.error("Failed to load image from " + imageFile, ioe);
        target.clear();
        imageFile = null;
      }
    }
    // Failed to load any images, error out
    if (imageFile == null) {
      FSEditLog.closeAllStreams(editStreams);
      throw new IOException("Failed to load an FSImage file!");
    }
    prog.endPhase(Phase.LOADING_FSIMAGE);
    
    if (!rollingRollback) {
      long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
      needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
          txnsAdvanced);
      if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
        // rename rollback image if it is downgrade
        renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
      }
    } else {
      // Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
      // to the last txid in rollback fsimage.
      rollingRollback(lastAppliedTxId + 1, imageFiles.get(0).getCheckpointTxId());
      needToSave = false;
    }
    editLog.setNextTxId(lastAppliedTxId + 1);
    return needToSave;
  }

在上面的方法中看到调用了FSImage的loadFSImageFile方法

/**
   * 重点 fsimage数据加载
   * @param target
   * @param recovery
   * @param imageFile
   * @param startupOption
   * @throws IOException
   */
  void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
      FSImageFile imageFile, StartupOption startupOption) throws IOException {
    //准备加载fsimage
    LOG.debug("Planning to load image :\n" + imageFile);
    StorageDirectory sdForProperties = imageFile.sd;
    //赋予一些其它属性信息
    storage.readProperties(sdForProperties, startupOption);

    /**
     * 以下是对fsimage数据的读取 有三个判断 看符合哪一个 重重重
     */
    if (NameNodeLayoutVersion.supports(
        LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
      //如果 NN存储中的文件名是基于事务id的 走这个方法

      // For txid-based layout, we should have a .md5 file
      // next to the image file
      //对于基于txid的布局,我们应该在图像文件旁边有一个.md5文件。
      boolean isRollingRollback = RollingUpgradeStartupOption.ROLLBACK
          .matches(startupOption);
      loadFSImage(imageFile.getFile(), target, recovery, isRollingRollback);
    } else if (NameNodeLayoutVersion.supports(
        LayoutVersion.Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) {
      //支持校验和fsimage

      // In 0.22, we have the checksum stored in the VERSION file.
      //在0.22中,我们将校验和存储在版本文件中。
      String md5 = storage.getDeprecatedProperty(
          NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY);
      if (md5 == null) {
        throw new InconsistentFSStateException(sdForProperties.getRoot(),
            "Message digest property " +
            NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
            " not set for storage directory " + sdForProperties.getRoot());
      }
      loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery,
          false);
    } else {
      // We don't have any record of the md5sum
      // 我们没有md5和的记录。
      loadFSImage(imageFile.getFile(), null, target, recovery, false);
    }
  }

然后可以看到上面会调用到不同FSImage.java的loadFSImage的方法

/**
   * Load in the filesystem image from file. It's a big list of
   * filenames and blocks.
   * 从文件加载文件系统映像。 这是一个很大的文件名和块列表。
   */
  private void loadFSImage(File curFile, MD5Hash expectedMd5,
      FSNamesystem target, MetaRecoveryContext recovery,
      boolean requireSameLayoutVersion) throws IOException {
    // BlockPoolId is required when the FsImageLoader loads the rolling upgrade
    // information. Make sure the ID is properly set
    // 当FsImageLoader加载滚动升级信息时,需要BlockPoolId。 确保ID已正确设置。.
    target.setBlockPoolId(this.getBlockPoolID());
    //加载
    FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
    loader.load(curFile, requireSameLayoutVersion);

    // Check that the image digest we loaded matches up with what
    // we expected
    //检查我们加载的镜像摘要是否符合我们的预期
    MD5Hash readImageMd5 = loader.getLoadedImageMd5();
    if (expectedMd5 != null &&
        !expectedMd5.equals(readImageMd5)) {
      throw new IOException("Image file " + curFile +
          " is corrupt with MD5 checksum of " + readImageMd5 +
          " but expecting " + expectedMd5);
    }

    long txId = loader.getLoadedImageTxId();
    LOG.info("Loaded image for txid " + txId + " from " + curFile);
    lastAppliedTxId = txId;
    //设置最后一个检查点的事务ID和时间
    storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());
  }

然后再接着看上面调用的load方法,是FSImageFormat.java的load方法

   public void load(File file, boolean requireSameLayoutVersion)
        throws IOException {
      Preconditions.checkState(impl == null, "Image already loaded!");

      FileInputStream is = null;
      try {
        //fs image输入流
        is = new FileInputStream(file);
        //定义自己数组 自己数组大小为 HDFSIMG1 字符串自己大小 MAGIC_HEADER 神奇的头部只会这么理解了
        // 8个字节
        byte[] magic = new byte[FSImageUtil.MAGIC_HEADER.length];
        //读取输入流中的前8个字节 放入magic自己数组中
        IOUtils.readFully(is, magic, 0, magic.length);

        if (Arrays.equals(magic, FSImageUtil.MAGIC_HEADER)) {
          //判断读取的前8个字节是否是 HDFSIMFG1
          //如果是
          //生成一个
          FSImageFormatProtobuf.Loader loader = new FSImageFormatProtobuf.Loader(
              conf, fsn, requireSameLayoutVersion);
          impl = loader;
          loader.load(file);
        } else {
          Loader loader = new Loader(conf, fsn);
          impl = loader;
          loader.load(file);
        }
      } finally {
        IOUtils.cleanup(LOG, is);
      }
    }

然后调用的是FSImageFormatProtobuf.java的load方法

  /**
     * 加载的方法
     * @param file
     * @throws IOException
     */
    void load(File file) throws IOException {
      long start = Time.monotonicNow();
      imgDigest = MD5FileUtils.computeMd5ForFile(file);
      RandomAccessFile raFile = new RandomAccessFile(file, "r");
      FileInputStream fin = new FileInputStream(file);
      try {
        //内部加载
        loadInternal(raFile, fin);
        long end = Time.monotonicNow();
        LOG.info("Loaded FSImage in " + (end - start) / 1000 + " seconds.");
      } finally {
        fin.close();
        raFile.close();
      }
    }

在这个方法中调用的是loadInternal(raFile, fin);

private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
        throws IOException {
      //校验头部信息是不是dfsimage1
      if (!FSImageUtil.checkFileFormat(raFile)) {
        throw new IOException("Unrecognized file format");
      }
      FileSummary summary = FSImageUtil.loadSummary(raFile);
      if (requireSameLayoutVersion && summary.getLayoutVersion() !=
          HdfsConstants.NAMENODE_LAYOUT_VERSION) {
        throw new IOException("Image version " + summary.getLayoutVersion() +
            " is not equal to the software version " +
            HdfsConstants.NAMENODE_LAYOUT_VERSION);
      }

      //打开FileChannel通道
      FileChannel channel = fin.getChannel();

      //声明inodeloader对象
      FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
          fsn, this);
      //声明snapshotLoader对象 快照对象
      FSImageFormatPBSnapshot.Loader snapshotLoader = new FSImageFormatPBSnapshot.Loader(
          fsn, this);

      //获取section 数组
      ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
          .getSectionsList());
      //按照名字排序
      Collections.sort(sections, new Comparator<FileSummary.Section>() {
        @Override
        public int compare(FileSummary.Section s1, FileSummary.Section s2) {
          SectionName n1 = SectionName.fromString(s1.getName());
          SectionName n2 = SectionName.fromString(s2.getName());
          if (n1 == null) {
            return n2 == null ? 0 : -1;
          } else if (n2 == null) {
            return -1;
          } else {
            return n1.ordinal() - n2.ordinal();
          }
        }
      });

      //获取namenode的启动进度对象
      StartupProgress prog = NameNode.getStartupProgress();
      /**
       * beginStep() and the endStep() calls do not match the boundary of the
       * sections. This is because that the current implementation only allows
       * a particular step to be started for once.
       * beginStep()和endStep()调用与部分的边界不匹配。 这是因为当前的实现只允许启动一次特定的步骤。
       */
      Step currentStep = null;

      for (FileSummary.Section s : sections) {
        channel.position(s.getOffset());
        //使用LimitInputStream 可以解决 将文件的一部分作为输入流
        InputStream in = new BufferedInputStream(new LimitInputStream(fin,
            s.getLength()));

        /**
         * 包装压缩输入流
         */
        in = FSImageUtil.wrapInputStreamForCompression(conf,
            summary.getCodec(), in);

        //获取section的名字
        String n = s.getName();

        switch (SectionName.fromString(n)) {
        case NS_INFO:
          loadNameSystemSection(in);
          break;
        case STRING_TABLE:
          loadStringTableSection(in);
          break;
        case INODE: {
          //整个目录树所有节点数据,包括INodeFile/INodeDirectory/INodeSymlink等所有类型节点的属性数据,
          // 其中记录了如节点id,节点名称,访问权限,创建和访问时间等等信息;
          currentStep = new Step(StepType.INODES);
          prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
          inodeLoader.loadINodeSection(in);
        }
          break;
        case INODE_REFERENCE:
          snapshotLoader.loadINodeReferenceSection(in);
          break;
        case INODE_DIR:
          //整个目录树中所有节点之间的父子关系,配合INODE可构建完整的目录树;
          inodeLoader.loadINodeDirectorySection(in);
          break;
        case FILES_UNDERCONSTRUCTION:
          inodeLoader.loadFilesUnderConstructionSection(in);
          break;
        case SNAPSHOT:
          snapshotLoader.loadSnapshotSection(in);
          break;
        case SNAPSHOT_DIFF:
          snapshotLoader.loadSnapshotDiffSection(in);
          break;
        case SECRET_MANAGER: {
          prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
          Step step = new Step(StepType.DELEGATION_TOKENS);
          prog.beginStep(Phase.LOADING_FSIMAGE, step);
          loadSecretManagerSection(in);
          prog.endStep(Phase.LOADING_FSIMAGE, step);
        }
          break;
        case CACHE_MANAGER: {
          Step step = new Step(StepType.CACHE_POOLS);
          prog.beginStep(Phase.LOADING_FSIMAGE, step);
          loadCacheManagerSection(in);
          prog.endStep(Phase.LOADING_FSIMAGE, step);
        }
          break;
        default:
          LOG.warn("Unrecognized section " + n);
          break;
        }
      }
    }

然后调用的就是

  /**
     * 整个目录树所有节点数据
     * @param in
     * @throws IOException
     */
    void loadINodeSection(InputStream in) throws IOException {
      //通过输入流 定义INodeSection
      INodeSection s = INodeSection.parseDelimitedFrom(in);
      //设置加载fsimage或editlog时最后分配的inode id
      fsn.resetLastInodeId(s.getLastInodeId());
      //打印日志 要加载的节点数
      LOG.info("Loading " + s.getNumInodes() + " INodes.");
      for (int i = 0; i < s.getNumInodes(); ++i) {
        INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
        if (p.getId() == INodeId.ROOT_INODE_ID) {
          //如果是根结点 则加载根结点
          loadRootINode(p);
        } else {
          INode n = loadINode(p);
          dir.addToInodeMap(n);
        }
      }
    }
       /**
     * 整个目录树中所有节点之间的父子关系
     * @param in
     * @throws IOException
     */
    void loadINodeDirectorySection(InputStream in) throws IOException {
      //加载全部快照
      final List<INodeReference> refList = parent.getLoaderContext()
          .getRefList();
      while (true) {
        INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
            .parseDelimitedFrom(in);
        // note that in is a LimitedInputStream
        //请注意,它是一个LimitedInputStream
        if (e == null) {
          break;
        }
        INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
        for (long id : e.getChildrenList()) {
          INode child = dir.getInode(id);
          //添加父节点
          addToParent(p, child);
        }
        for (int refId : e.getRefChildrenList()) {
          //如果是快照的话 添加快照节点
          INodeReference ref = refList.get(refId);
          addToParent(p, ref);
        }
      }
    } 
.......