启动 NameNode 进程
启动脚本为 sbin/start-dfs.sh
其中的核心代码如下:
# Excerpt from sbin/start-dfs.sh: start the NameNode daemon(s).
# Ask `hdfs getconf -namenodes` for the NameNode host list (stderr is discarded).
NAMENODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -namenodes 2>/dev/null)
# If the query produced nothing, fall back to the local host name.
if [[ -z "${NAMENODES}" ]]; then
NAMENODES=$(hostname)
fi
echo "Starting namenodes on [${NAMENODES}]"
# Start `hdfs namenode` as a daemon (--daemon start) on the hosts listed in
# ${NAMENODES} (--workers/--hostnames), using ${HADOOP_CONF_DIR}.
# NOTE(review): hadoop_uservar_su is defined in Hadoop's shell function
# library (not shown); presumably it runs the command as the configured HDFS
# user. ${nameStartOpt} presumably carries extra NameNode startup options.
# No comments may be placed inside the backslash-continued command below.
hadoop_uservar_su hdfs namenode "${HADOOP_HDFS_HOME}/bin/hdfs" \
--workers \
--config "${HADOOP_CONF_DIR}" \
--hostnames "${NAMENODES}" \
--daemon start \
namenode ${nameStartOpt}
# Capture the exit status of the launch; must directly follow the command.
HADOOP_JUMBO_RETCOUNTER=$?
该命令最终启动的是 NameNode 类的 main 方法:
/**
 * Entry point of the NameNode process.
 *
 * <p>If {@code DFSUtil.parseHelpArgument} (given the NameNode usage text and
 * {@code System.out}) reports a help request, the process exits with status
 * 0. Otherwise the startup/shutdown message is logged, the NameNode is
 * created and, when one was created, the main thread waits on
 * {@code join()}. Any failure is logged as fatal and the process is
 * terminated with status 1.
 *
 * @param argv command-line arguments
 * @throws Exception as declared by the original entry point
 */
public static void main(String[] argv) throws Exception {
  final boolean helpRequested =
      DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true);
  if (helpRequested) {
    System.exit(0);
  }

  try {
    StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
    // Build the NameNode; with no instance there is nothing to wait on.
    final NameNode node = createNameNode(argv, null);
    if (node == null) {
      return;
    }
    node.join();
  } catch (Throwable failure) {
    LOG.fatal("Failed to start namenode.", failure);
    terminate(1, failure);
  }
}
由于启动时没有传入其它启动参数,createNameNode 会走 switch 语句的 default 分支来创建 NameNode(下面省略了其余分支):
/**
 * Creates a NameNode from the command-line arguments.
 * Abridged excerpt: the argument handling and the other switch cases are
 * omitted; only the default case is shown.
 */
public static NameNode createNameNode(String argv[], Configuration conf)
throws IOException {
......// N lines omitted here
// NOTE(review): default case of an omitted switch, presumably over the
// parsed startup option; it is the path taken for a plain start.
default: {
// Initialize the metrics system under the name "NameNode" so the daemon can be monitored.
DefaultMetricsSystem.initialize("NameNode");
// Construct the NameNode itself.
return new NameNode(conf);
}
}
}
从以上代码可以看到,先执行 DefaultMetricsSystem.initialize("NameNode") 初始化指标(Metrics)系统,以便对 NameNode 进行监控(关于什么是指标系统,详见 https://github.com/ColZer/DigAndBuried/blob/master/hadoop/metric-learn.md);然后再创建 NameNode。其中 new NameNode(conf) 会以默认角色 NamenodeRole.NAMENODE 调用下面这个两参数的构造方法:
/**
 * Creates a NameNode with the given configuration and role.
 *
 * <p>Resolves the client-facing NameNode address, the nameservice ID and the
 * HA NameNode ID, determines whether HA is enabled, creates the initial HA
 * state (from the configured startup option) and the HA context, then
 * initializes the node and moves it into that initial state under the HA
 * context's write lock. If this fails with an {@link IOException} or a
 * {@link HadoopIllegalArgumentException}, the node is stopped and the
 * exception is rethrown.
 *
 * @param conf configuration to read
 * @param role role of this node
 * @throws IOException if initialization fails
 */
protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  this.conf = conf;
  this.role = role;
  // Client-facing NameNode address. NOTE(review): per the original note it
  // defaults to the fs.defaultFS value (e.g. hdfs://localhost:9000) — confirm.
  setClientNamenodeAddress(conf);
  // Nameservice ID and, for HA, this NameNode's ID within it.
  String nsId = getNameServiceId(conf);
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  // Initial HA state, chosen from the configured startup option.
  state = createHAState(getStartupOption(conf));
  // Whether reads are allowed in standby state. Per the original note this
  // reads dfs.ha.allow.stale.reads (default false).
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    // Per the original note: for federation/HA, configures the NameNodes
    // grouped under one logical nsId (nameservice/namenode-specific keys).
    initializeGenericKeys(conf, nsId, namenodeId);
    // Main initialization (metrics, HTTP server, namesystem load from disk,
    // RPC server, common services).
    initialize(conf);
    // Acquire the lock BEFORE entering the try: the finally block must only
    // release a lock that is actually held. Otherwise a failure in
    // writeLock() would be followed by an unlock of an unheld lock, whose
    // error may mask the original exception.
    haContext.writeLock();
    try {
      state.prepareToEnterState(haContext);
      // Enter the initial state (e.g. active, standby or backup).
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    this.stop();
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stop();
    throw e;
  }
}
下面我们再讲讲NameNode.java的initialize(conf);
/**
 * Initializes this NameNode.
 *
 * <p>Steps, in order:
 * <ul>
 *   <li>propagate the percentile-interval metrics setting;</li>
 *   <li>configure security and log in;</li>
 *   <li>initialize NameNode metrics and register the startup-progress metrics;</li>
 *   <li>start the HTTP server (NAMENODE role only);</li>
 *   <li>load the namesystem from disk;</li>
 *   <li>create the RPC server;</li>
 *   <li>give the HTTP server the NameNode address and FSImage (NAMENODE role only);</li>
 *   <li>start the JVM pause monitor;</li>
 *   <li>start the services shared by the active and standby states.</li>
 * </ul>
 *
 * @param conf configuration to read; may be updated with the percentile
 *             intervals key (first step below)
 * @throws IOException if any step fails
 */
protected void initialize(Configuration conf) throws IOException {
// If HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS is unset, copy the value
// of DFS_METRICS_PERCENTILES_INTERVALS_KEY into it (when that one is set).
// Per the original note, this is a comma-separated list of rollover
// intervals, in seconds, for percentile latency metrics on the NameNode and
// DataNode; percentile latency metrics are disabled by default.
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);
}
}
// Configure UserGroupInformation from conf. Per the original note, this sets
// the authentication method and rules from hadoop.security.authentication.
UserGroupInformation.setConfiguration(conf);
// Log in as the NameNode user. Per the original notes (the implementation is
// not shown here), nothing is done for "simple" authentication. With
// Kerberos it calls UserGroupInformation.loginUserFromKeytab with the
// principal from dfs.namenode.kerberos.principal and the keytab from
// dfs.namenode.keytab.file (both configured in hdfs-site.xml).
loginAsNameNodeUser(conf);
// Initialize the NameNode metrics for this node's role, used to monitor it.
NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);
if (NamenodeRole.NAMENODE == role) {
// Only the NAMENODE role starts the HTTP server here.
startHttpServer(conf);
}
this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
// Load the namespace from local storage. The fsimage is read and the edit
// log replayed to rebuild the namespace in memory.
loadNamesystem(conf);
// Create the RPC server.
rpcServer = createRpcServer(conf);
if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server's bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(rpcServer.getRpcAddress());
LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
+ " this namenode/service.");
}
if (NamenodeRole.NAMENODE == role) {
// Give the HTTP server this NameNode's address and FSImage.
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
}
// Start a JVM pause monitor and attach it to the JVM metrics.
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
// Start the services common to the active and standby states. Per the
// original note, these are the RPC server and the NameNode plugins.
startCommonServices(conf);
}
再看看NameNode.java的loadNamesystem(conf);
/**
 * Loads the namespace from local storage into this NameNode.
 *
 * <p>Reading the fsimage and replaying the edit log both happen inside
 * {@link FSNamesystem#loadFromDisk}; the resulting namesystem is stored on
 * this NameNode.
 *
 * @param conf configuration naming the storage directories
 * @throws IOException if loading fails
 */
protected void loadNamesystem(Configuration conf) throws IOException {
  final FSNamesystem loaded = FSNamesystem.loadFromDisk(conf);
  this.namesystem = loaded;
}
从这里看到调用的是FSNamesystem的loadFromDisk的方法
/**
 * Instantiates an FSNamesystem loaded from the image and edits
 * directories specified in the passed Configuration.
 *
 * <p>This is where NameNode startup reads the fsimage from local storage and
 * replays the edit log. The load duration (milliseconds) is logged and, when
 * NameNode metrics exist, recorded as the fsimage load time.
 *
 * @param conf the Configuration which specifies the storage directories
 * from which to load
 * @return an FSNamesystem which contains the loaded namespace
 * @throws IOException if loading fails
 */
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
// Validate the configuration. Per the original note, these are the required
// edits-directory checks.
checkConfiguration(conf);
// Create the FSImage over the configured namespace (image) and edits
// directories. Per the original note, this sets up NNStorage (which manages
// the NameNode's storage directories) and the edit log directories.
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
// Create the namesystem on top of that image.
FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
// In RECOVER mode, enter safe mode before loading.
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}
long loadStart = now();
try {
// Load the image, replay the edits and, when appropriate, open the edit log for writing.
namesystem.loadFSImage(startOpt);
} catch (IOException ioe) {
LOG.warn("Encountered exception loading fsimage", ioe);
// Release the image before propagating the failure.
// NOTE(review): FSNamesystem.loadFSImage also closes its FSImage on
// failure, so this may be a second close — confirm close() is idempotent.
fsImage.close();
throw ioe;
}
long timeTakenToLoadFSImage = now() - loadStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
// Publish the load time when NameNode metrics are available.
NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
if (nnMetrics != null) {
nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
}
return namesystem;
}
再看看FSNamesystem.java的loadFSImage
/**
 * Loads the fsimage during startup (called from loadFromDisk).
 *
 * <p>Steps:
 * <ul>
 *   <li>If FORMAT was requested, formats first (reusing the current cluster
 *       ID) and then continues as a REGULAR start.</li>
 *   <li>Under the namesystem write lock, reads the storage and loads the
 *       image and edits.</li>
 *   <li>Saves a new image only when the loaded one is stale, HA is disabled
 *       and no rolling upgrade is in progress.</li>
 *   <li>Opens the edit log for writing, except when HA is enabled and the
 *       start is neither UPGRADE nor UPGRADEONLY.</li>
 * </ul>
 * The FSImage is closed if anything fails.
 *
 * @param startOpt startup option
 * @throws IOException if formatting, loading or saving fails
 */
private void loadFSImage(StartupOption startOpt) throws IOException {
final FSImage fsImage = getFSImage();
// format before starting up if requested
if (startOpt == StartupOption.FORMAT) {
fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id
startOpt = StartupOption.REGULAR;
}
boolean success = false;
// Hold the namesystem write lock while loading.
writeLock();
try {
// We shouldn't be calling saveNamespace if we've come up in standby state.
// Context for an in-progress NameNode metadata recovery, derived from the startup option.
MetaRecoveryContext recovery = startOpt.createRecoveryContext();
// Analyze the storage directories and load the image and edits (this is
// where FSImage.loadFSImage is eventually reached). The result is true when
// the loaded image should be re-saved.
final boolean staleImage
= fsImage.recoverTransitionRead(startOpt, this, recovery);
// A rolling-upgrade rollback or downgrade discards the rolling upgrade info.
if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) ||
RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
rollingUpgradeInfo = null;
}
// Re-save only for a stale image, without HA, outside a rolling upgrade.
final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();
LOG.info("Need to save fs image? " + needToSave
+ " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
+ ", isRollingUpgrade=" + isRollingUpgrade() + ")");
if (needToSave) {
fsImage.saveNamespace(this);
} else {
updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(),
startOpt);
// No need to save, so mark the phase done.
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAVING_CHECKPOINT);
prog.endPhase(Phase.SAVING_CHECKPOINT);
}
// This will start a new log segment and write to the seen_txid file, so
// we shouldn't do it when coming up in standby state
// (Equivalent to: !haEnabled || startOpt == UPGRADE || startOpt == UPGRADEONLY;
// the "haEnabled &&" terms are redundant.)
if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
|| (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
fsImage.openEditLogForWrite();
}
success = true;
} finally {
// On failure, close the image before releasing the lock.
if (!success) {
fsImage.close();
}
writeUnlock();
}
imageLoadComplete();
}
最后再看看 FSImage 的 recoverTransitionRead 方法(即上面 fsImage.recoverTransitionRead(startOpt, this, recovery) 所调用的方法):
/**
 * Analyze storage directories.
 * Recover from previous transitions if required.
 * Perform fs state transition if necessary depending on the namespace info.
 * Read storage info.
 *
 * @param startOpt startup option; must not be FORMAT, because formatting is
 *                 done before the image is read
 * @param target namesystem the image is loaded into
 * @param recovery metadata recovery context passed on to the image load
 * @throws IOException if the image or edits directories are missing, storage
 *         is not formatted, an upgrade option is required, or loading fails
 * @return true if the image needs to be saved or false otherwise
 */
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
MetaRecoveryContext recovery)
throws IOException {
// Formatting must already have happened before the image is read.
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
// Configured fsimage and edit log directories.
Collection<URI> imageDirs = storage.getImageDirectories();
Collection<URI> editsDirs = editLog.getEditURIs();
// none of the data dirs exist
// Note: unless importing, this fails when EITHER the image directories or
// the edits directories are empty, not only when both are.
if((imageDirs.size() == 0 || editsDirs.size() == 0)
&& startOpt != StartupOption.IMPORT)
throw new IOException(
"All specified directories are not accessible or do not exist.");
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
Map<StorageDirectory, StorageState> dataDirStates =
new HashMap<StorageDirectory, StorageState>();
boolean isFormatted = recoverStorageDirs(startOpt, dataDirStates);
if (LOG.isTraceEnabled()) {
LOG.trace("Data dir states:\n " +
Joiner.on("\n ").withKeyValueSeparator(": ")
.join(dataDirStates));
}
// Unless rolling back or importing, the storage must already be formatted.
if (!isFormatted && startOpt != StartupOption.ROLLBACK
&& startOpt != StartupOption.IMPORT) {
throw new IOException("NameNode is not formatted.");
}
// Layout version currently recorded in storage.
int layoutVersion = storage.getLayoutVersion();
// METADATAVERSION: print the image's and the software's layout versions and
// stop without loading anything.
if (startOpt == StartupOption.METADATAVERSION) {
System.out.println("HDFS Image Version: " + layoutVersion);
System.out.println("Software format version: " +
HdfsConstants.NAMENODE_LAYOUT_VERSION);
return false;
}
// For a stored layout version below LAST_PRE_UPGRADE_LAYOUT_VERSION, check
// that it is upgradable by this software. This only checks; it does not
// upgrade. NOTE(review): HDFS layout versions are negative numbers that
// decrease as the format evolves, so "<" here means "newer than". Confirm
// against the Storage constants.
if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
}
// Refuse to start unless the start option is UPGRADE, UPGRADEONLY or an
// already-started rolling upgrade, when the stored layout version is below
// LAST_PRE_UPGRADE_LAYOUT_VERSION and differs from the software's
// NAMENODE_LAYOUT_VERSION. The message tells the operator which option to use.
if (startOpt != StartupOption.UPGRADE
&& startOpt != StartupOption.UPGRADEONLY
&& !RollingUpgradeStartupOption.STARTED.matches(startOpt)
&& layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
&& layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
throw new IOException(
"\nFile system image contains an old layout version "
+ storage.getLayoutVersion() + ".\nAn upgrade to version "
+ HdfsConstants.NAMENODE_LAYOUT_VERSION + " is required.\n"
+ "Please restart NameNode with the \""
+ RollingUpgradeStartupOption.STARTED.getOptionString()
+ "\" option if a rolling upgrade is already started;"
+ " or restart NameNode with the \""
+ StartupOption.UPGRADE.getName() + "\" option to start"
+ " a new upgrade.");
}
// Process the cluster ID / block pool ID startup options used for upgrade.
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
// 2. Format unformatted dirs.
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState = dataDirStates.get(sd);
switch(curState) {
case NON_EXISTENT:
throw new IOException(StorageState.NON_EXISTENT +
" state cannot be here");
case NOT_FORMATTED:
// Storage directory is not formatted: log it and start it empty.
LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
LOG.info("Formatting ...");
sd.clearDirectory(); // create empty current dir
break;
default:
break;
}
}
// 3. Do transitions
switch(startOpt) {
case UPGRADE:
case UPGRADEONLY:
doUpgrade(target);
return false; // upgrade saved image already
case IMPORT:
doImportCheckpoint(target);
return false; // import checkpoint saved image already
case ROLLBACK:
throw new AssertionError("Rollback is now a standalone command, "
+ "NameNode should not be starting with this option.");
case REGULAR:
default:
// just load the image
}
// Regular startup: choose the latest image, load it and merge in the edits.
return loadFSImage(target, startOpt, recovery);
}
可以看到,该方法最后调用了 FSImage 的 loadFSImage(target, startOpt, recovery) 方法:
/**
 * Choose latest image from one of the directories,
 * load it and merge with the edits.
 *
 * Saving and loading fsimage should never trigger symlink resolution.
 * The paths that are persisted do not have *intermediate* symlinks
 * because intermediate symlinks are resolved at the time files,
 * directories, and symlinks are created. All paths accessed while
 * loading or saving fsimage should therefore only see symlinks as
 * the final path component, and the functions called below do not
 * resolve symlinks that are the final path component.
 *
 * <p>Candidate images are tried in the order returned by the storage
 * inspector and the first that loads is used. If none loads, the selected
 * edit streams are closed and an IOException is thrown. For the rollback of
 * a rolling upgrade, only rollback images are considered and the rollback is
 * triggered instead of loading the edits.
 *
 * @return whether the image should be saved
 * @throws IOException if no image file can be loaded or loading fails
 */
private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
MetaRecoveryContext recovery)
throws IOException {
// Is this the rollback of a rolling upgrade?
final boolean rollingRollback
= RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);
final EnumSet<NameNodeFile> nnfs;
if (rollingRollback) {
// if it is rollback of rolling upgrade, only load from the rollback image
nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
} else {
// otherwise we can load from both IMAGE and IMAGE_ROLLBACK
nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
}
// Read and inspect every storage directory for images of the selected types.
// Per the original note, this reads their contents to determine the layout version.
final FSImageStorageInspector inspector = storage
.readAndInspectDirs(nnfs, startOpt);
// Per the original note: false if any storage directory has an upgrade that
// is not yet finalized.
isUpgradeFinalized = inspector.isUpgradeFinalized();
// Image files that should be loaded; element 0 is treated below as the
// primary candidate.
List<FSImageFile> imageFiles = inspector.getLatestImages();
// Startup progress reporting: the LOADING_FSIMAGE phase starts here.
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.LOADING_FSIMAGE);
// NOTE(review): assumes imageFiles is non-empty; presumably
// getLatestImages() throws when no image exists. Confirm.
File phaseFile = imageFiles.get(0).getFile();
// Record the image path and size in the startup progress.
prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath());
prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length());
// True if the directories are in a state that requires re-saving the image.
boolean needToSave = inspector.needToSave();
Iterable<EditLogInputStream> editStreams = null;
initEditLog(startOpt);
// With TXID_BASED_LAYOUT, file names in NN storage are based on transaction IDs.
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
// If we're open for write, we're either non-HA or we're the active NN, so
// we better be able to load all the edits. If we're the standby NN, it's
// OK to not be able to read all of edits right now.
// In the meanwhile, for HA upgrade, we will still write editlog thus need
// this toAtLeastTxId to be set to the max-seen txid
// For rollback in rolling upgrade, we need to set the toAtLeastTxId to
// the txid right before the upgrade marker.
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector
.getMaxSeenTxId() : 0;
if (rollingRollback) {
// note that the first image in imageFiles is the special checkpoint
// for the rolling upgrade
toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() + 2;
}
// Select the edit streams that follow the first image's checkpoint txid.
editStreams = editLog.selectInputStreams(
imageFiles.get(0).getCheckpointTxId() + 1,
toAtLeastTxId, recovery, false);
} else {
// Pre-transactional layout: locate the edit log streams in storage.
editStreams = FSImagePreTransactionalStorageInspector
.getEditLogStreams(storage);
}
// Apply the configured maximum edit log op size to every selected stream.
int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
for (EditLogInputStream elis : editStreams) {
elis.setMaxOpSize(maxOpSize);
}
for (EditLogInputStream l : editStreams) {
LOG.debug("Planning to load edit log stream: " + l);
}
if (!editStreams.iterator().hasNext()) {
LOG.info("No edit log streams selected.");
}
FSImageFile imageFile = null;
// Try the candidate images in order and stop at the first that loads. After
// a failure, clear the target (presumably the partially loaded namespace)
// and try the next candidate.
for (int i = 0; i < imageFiles.size(); i++) {
try {
imageFile = imageFiles.get(i);
// The actual fsimage load.
loadFSImageFile(target, recovery, imageFile, startOpt);
break;
} catch (IOException ioe) {
LOG.error("Failed to load image from " + imageFile, ioe);
target.clear();
imageFile = null;
}
}
// Failed to load any images, error out
if (imageFile == null) {
FSEditLog.closeAllStreams(editStreams);
throw new IOException("Failed to load an FSImage file!");
}
prog.endPhase(Phase.LOADING_FSIMAGE);
if (!rollingRollback) {
// Apply the edits on top of the image. Presumably the image also needs
// re-saving when its checkpoint is stale given how many transactions were
// applied; see needsResaveBasedOnStaleCheckpoint.
long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced);
if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
// rename rollback image if it is downgrade
renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
}
} else {
// Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
// to the last txid in rollback fsimage.
rollingRollback(lastAppliedTxId + 1, imageFiles.get(0).getCheckpointTxId());
needToSave = false;
}
// The next edit log transaction ID follows the last applied one.
editLog.setNextTxId(lastAppliedTxId + 1);
return needToSave;
}
在上面的方法中看到调用了FSImage的loadFSImageFile方法
/**
 * Loads one fsimage file into the namesystem.
 *
 * <p>Reads the storage properties of the image's directory first, then
 * picks the load path by layout version:
 * <ul>
 *   <li>txid-based layout: delegate to the overload that takes only a
 *       rolling-rollback flag (such images have a .md5 file next to them);</li>
 *   <li>checksum-aware layout (0.22): the expected MD5 comes from the
 *       deprecated digest property in the VERSION file and must be set;</li>
 *   <li>older layouts: no MD5 is recorded, so none is checked.</li>
 * </ul>
 *
 * @param target namesystem to load into
 * @param recovery metadata recovery context
 * @param imageFile image file (with its storage directory) to load
 * @param startupOption startup option in effect
 * @throws IOException if loading fails, including an
 *         InconsistentFSStateException when a checksum-aware layout lacks
 *         the digest property
 */
void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
    FSImageFile imageFile, StartupOption startupOption) throws IOException {
  LOG.debug("Planning to load image :\n" + imageFile);
  final StorageDirectory propsDir = imageFile.sd;
  storage.readProperties(propsDir, startupOption);

  if (NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
    final boolean rollingRollback =
        RollingUpgradeStartupOption.ROLLBACK.matches(startupOption);
    loadFSImage(imageFile.getFile(), target, recovery, rollingRollback);
    return;
  }

  // Non-txid layouts: work out which MD5 (if any) the image must match.
  MD5Hash expectedDigest = null;
  if (NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) {
    final String storedMd5 = storage.getDeprecatedProperty(
        NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY);
    if (storedMd5 == null) {
      throw new InconsistentFSStateException(propsDir.getRoot(),
          "Message digest property " +
          NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
          " not set for storage directory " + propsDir.getRoot());
    }
    expectedDigest = new MD5Hash(storedMd5);
  }
  loadFSImage(imageFile.getFile(), expectedDigest, target, recovery, false);
}
然后可以看到,上面会根据布局版本的不同,调用 FSImage.java 中不同重载的 loadFSImage 方法。下面是带 MD5 校验参数的版本(基于 txid 的分支所调用的四参数版本,会先读取镜像文件旁边的 .md5 文件,再委托给这个方法):
/**
 * Load in the filesystem image from file. It's a big list of
 * filenames and blocks.
 *
 * <p>Steps:
 * <ul>
 *   <li>Set the namesystem's block pool ID first; the loader needs it for
 *       rolling upgrade information.</li>
 *   <li>Load the file.</li>
 *   <li>When {@code expectedMd5} is given, check it against the digest of
 *       what was read.</li>
 *   <li>Record the loaded transaction ID as the last applied one and as the
 *       most recent checkpoint, together with the file's modification time.</li>
 * </ul>
 *
 * @param curFile image file to read
 * @param expectedMd5 expected digest, or null to skip the check
 * @param target namesystem to populate
 * @param recovery metadata recovery context (not used directly here)
 * @param requireSameLayoutVersion passed through to the loader
 * @throws IOException if loading fails or the digest does not match
 */
private void loadFSImage(File curFile, MD5Hash expectedMd5,
    FSNamesystem target, MetaRecoveryContext recovery,
    boolean requireSameLayoutVersion) throws IOException {
  // BlockPoolId is required when the FsImageLoader loads the rolling upgrade
  // information. Make sure the ID is properly set.
  target.setBlockPoolId(this.getBlockPoolID());

  final FSImageFormat.LoaderDelegator imageLoader =
      FSImageFormat.newLoader(conf, target);
  imageLoader.load(curFile, requireSameLayoutVersion);

  // Check that the image digest we loaded matches up with what we expected.
  final MD5Hash actualMd5 = imageLoader.getLoadedImageMd5();
  final boolean digestMismatch =
      expectedMd5 != null && !expectedMd5.equals(actualMd5);
  if (digestMismatch) {
    throw new IOException("Image file " + curFile +
        " is corrupt with MD5 checksum of " + actualMd5 +
        " but expecting " + expectedMd5);
  }

  final long loadedTxId = imageLoader.getLoadedImageTxId();
  LOG.info("Loaded image for txid " + loadedTxId + " from " + curFile);
  lastAppliedTxId = loadedTxId;
  // Remember the transaction ID and time of the most recent checkpoint.
  storage.setMostRecentCheckpointInfo(loadedTxId, curFile.lastModified());
}
然后再接着看上面调用的 load 方法,即 FSImageFormat.LoaderDelegator 的 load 方法:
/**
 * Loads an fsimage file, choosing the loader from the file's leading bytes.
 *
 * <p>Reads as many bytes as {@code FSImageUtil.MAGIC_HEADER} is long. If
 * they match that header, the protobuf loader
 * ({@code FSImageFormatProtobuf.Loader}) is used; otherwise the legacy
 * {@code Loader} is used. The chosen loader is stored in {@code impl}
 * before it loads the file.
 *
 * @param file image file to load
 * @param requireSameLayoutVersion passed to the protobuf loader only
 * @throws IOException if the header cannot be read or loading fails
 * @throws IllegalStateException if an image was already loaded
 */
public void load(File file, boolean requireSameLayoutVersion)
    throws IOException {
  Preconditions.checkState(impl == null, "Image already loaded!");
  FileInputStream headerIn = null;
  try {
    headerIn = new FileInputStream(file);
    final byte[] header = new byte[FSImageUtil.MAGIC_HEADER.length];
    IOUtils.readFully(headerIn, header, 0, header.length);
    if (!Arrays.equals(header, FSImageUtil.MAGIC_HEADER)) {
      // No magic header: legacy image format.
      Loader legacyLoader = new Loader(conf, fsn);
      impl = legacyLoader;
      legacyLoader.load(file);
    } else {
      // Magic header present: protobuf image format.
      FSImageFormatProtobuf.Loader pbLoader = new FSImageFormatProtobuf.Loader(
          conf, fsn, requireSameLayoutVersion);
      impl = pbLoader;
      pbLoader.load(file);
    }
  } finally {
    IOUtils.cleanup(LOG, headerIn);
  }
}
如果文件开头的字节与 FSImageUtil.MAGIC_HEADER 一致(即 protobuf 格式的镜像),接下来调用的是 FSImageFormatProtobuf.Loader 的 load 方法:
/**
 * Loads a protobuf-format fsimage file.
 *
 * <p>Computes the file's MD5 digest (stored in {@code imgDigest}), then opens
 * the file twice and hands both handles to {@code loadInternal}: once as a
 * {@link RandomAccessFile} and once as a {@link FileInputStream}. The elapsed
 * time is logged in whole seconds. Both handles are closed on every path,
 * including when opening the second one fails or closing the first one
 * throws.
 *
 * @param file image file to load
 * @throws IOException if the file cannot be read or loading fails
 */
void load(File file) throws IOException {
  long start = Time.monotonicNow();
  imgDigest = MD5FileUtils.computeMd5ForFile(file);
  RandomAccessFile raFile = new RandomAccessFile(file, "r");
  try {
    // Opened inside the outer try so raFile is still closed if this fails.
    FileInputStream fin = new FileInputStream(file);
    try {
      loadInternal(raFile, fin);
      long end = Time.monotonicNow();
      LOG.info("Loaded FSImage in " + (end - start) / 1000 + " seconds.");
    } finally {
      fin.close();
    }
  } finally {
    // Runs even if fin.close() threw, so neither handle leaks.
    raFile.close();
  }
}
在这个方法中调用的是loadInternal(raFile, fin);
/**
 * Loads the sections of a protobuf fsimage.
 *
 * <p>Steps:
 * <ul>
 *   <li>Verify the file format and read the file summary.</li>
 *   <li>When {@code requireSameLayoutVersion} is set, reject an image whose
 *       layout version differs from the software's.</li>
 *   <li>Sort the sections by {@link SectionName} order.</li>
 *   <li>Load each section from its offset in the file, decompressing with
 *       the summary's codec.</li>
 * </ul>
 * Sections whose name is not a known {@code SectionName} are logged and
 * skipped. Startup progress steps are reported for inodes, delegation
 * tokens and cache pools.
 *
 * @param raFile random-access view of the image, used for the format check
 *               and the summary
 * @param fin stream over the same file; its channel is repositioned for
 *            each section
 * @throws IOException if the format is unrecognized, the layout version is
 *         rejected, or a section fails to load
 */
private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
    throws IOException {
  // Verify the file's magic header.
  if (!FSImageUtil.checkFileFormat(raFile)) {
    throw new IOException("Unrecognized file format");
  }
  FileSummary summary = FSImageUtil.loadSummary(raFile);
  if (requireSameLayoutVersion && summary.getLayoutVersion() !=
      HdfsConstants.NAMENODE_LAYOUT_VERSION) {
    throw new IOException("Image version " + summary.getLayoutVersion() +
        " is not equal to the software version " +
        HdfsConstants.NAMENODE_LAYOUT_VERSION);
  }

  FileChannel channel = fin.getChannel();

  FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
      fsn, this);
  FSImageFormatPBSnapshot.Loader snapshotLoader = new FSImageFormatPBSnapshot.Loader(
      fsn, this);

  ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
      .getSectionsList());
  // Order sections by SectionName declaration order. Sections whose name is
  // not a known SectionName sort first. The comparator must be antisymmetric
  // (compare(a, b) == -compare(b, a)). Previously compare(known, unknown) and
  // compare(unknown, known) both returned -1, which breaks the Comparator
  // contract and can make the sort inconsistent or throw.
  Collections.sort(sections, new Comparator<FileSummary.Section>() {
    @Override
    public int compare(FileSummary.Section s1, FileSummary.Section s2) {
      SectionName n1 = SectionName.fromString(s1.getName());
      SectionName n2 = SectionName.fromString(s2.getName());
      if (n1 == null) {
        return n2 == null ? 0 : -1;
      } else if (n2 == null) {
        // Mirror of the branch above: known sections sort after unknown ones.
        return 1;
      } else {
        return n1.compareTo(n2);
      }
    }
  });

  StartupProgress prog = NameNode.getStartupProgress();
  // beginStep() and the endStep() calls do not match the boundary of the
  // sections. This is because that the current implementation only allows
  // a particular step to be started for once.
  Step currentStep = null;

  for (FileSummary.Section s : sections) {
    String n = s.getName();
    SectionName sectionName = SectionName.fromString(n);
    if (sectionName == null) {
      // Switching on a null enum throws NullPointerException, so report and
      // skip unrecognized sections here instead of relying on "default".
      LOG.warn("Unrecognized section " + n);
      continue;
    }
    channel.position(s.getOffset());
    // Expose only this section's bytes of the shared stream.
    InputStream in = new BufferedInputStream(new LimitInputStream(fin,
        s.getLength()));
    in = FSImageUtil.wrapInputStreamForCompression(conf,
        summary.getCodec(), in);

    switch (sectionName) {
    case NS_INFO:
      loadNameSystemSection(in);
      break;
    case STRING_TABLE:
      loadStringTableSection(in);
      break;
    case INODE: {
      // Attributes of every inode in the directory tree (per the original
      // notes: id, name, permissions, times, ... for files, directories and
      // symlinks). The INODES step stays open until SECRET_MANAGER.
      currentStep = new Step(StepType.INODES);
      prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
      inodeLoader.loadINodeSection(in);
    }
      break;
    case INODE_REFERENCE:
      snapshotLoader.loadINodeReferenceSection(in);
      break;
    case INODE_DIR:
      // Parent/child relationships; together with INODE this rebuilds the tree.
      inodeLoader.loadINodeDirectorySection(in);
      break;
    case FILES_UNDERCONSTRUCTION:
      inodeLoader.loadFilesUnderConstructionSection(in);
      break;
    case SNAPSHOT:
      snapshotLoader.loadSnapshotSection(in);
      break;
    case SNAPSHOT_DIFF:
      snapshotLoader.loadSnapshotDiffSection(in);
      break;
    case SECRET_MANAGER: {
      // Close the INODES step, if an INODE section opened one.
      if (currentStep != null) {
        prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
      }
      Step step = new Step(StepType.DELEGATION_TOKENS);
      prog.beginStep(Phase.LOADING_FSIMAGE, step);
      loadSecretManagerSection(in);
      prog.endStep(Phase.LOADING_FSIMAGE, step);
    }
      break;
    case CACHE_MANAGER: {
      Step step = new Step(StepType.CACHE_POOLS);
      prog.beginStep(Phase.LOADING_FSIMAGE, step);
      loadCacheManagerSection(in);
      prog.endStep(Phase.LOADING_FSIMAGE, step);
    }
      break;
    default:
      LOG.warn("Unrecognized section " + n);
      break;
    }
  }
}
loadInternal 中的 INODE 段和 INODE_DIR 段,分别由 FSImageFormatPBINode.Loader 的以下两个方法加载:
/**
 * Loads the INODE section: the attributes of every inode in the directory
 * tree.
 *
 * <p>Resets the namesystem's last allocated inode ID from the section
 * header, logs the inode count, then reads that many delimited inode
 * records. The root inode is applied through {@code loadRootINode}. Every
 * other inode is built with {@code loadINode} and added to the inode map.
 *
 * @param in stream positioned at the start of the section
 * @throws IOException if a record cannot be parsed
 */
void loadINodeSection(InputStream in) throws IOException {
  final INodeSection header = INodeSection.parseDelimitedFrom(in);
  fsn.resetLastInodeId(header.getLastInodeId());
  LOG.info("Loading " + header.getNumInodes() + " INodes.");
  for (int idx = 0; idx < header.getNumInodes(); idx++) {
    final INodeSection.INode record = INodeSection.INode.parseDelimitedFrom(in);
    final boolean isRoot = record.getId() == INodeId.ROOT_INODE_ID;
    if (!isRoot) {
      dir.addToInodeMap(loadINode(record));
      continue;
    }
    loadRootINode(record);
  }
}
/**
 * Loads the INODE_DIR section: the parent/child relationships that, together
 * with the INODE section, rebuild the directory tree.
 *
 * <p>Reads delimited directory entries until the length-limited stream is
 * exhausted. For each entry:
 * <ul>
 *   <li>every child inode ID is looked up in the inode map;</li>
 *   <li>every reference-child index is looked up in the loader context's
 *       reference list;</li>
 *   <li>each result is attached to the entry's parent directory.</li>
 * </ul>
 *
 * @param in stream limited to this section
 * @throws IOException if an entry cannot be parsed
 */
void loadINodeDirectorySection(InputStream in) throws IOException {
  final List<INodeReference> references = parent.getLoaderContext()
      .getRefList();
  // "in" is a LimitInputStream, so parseDelimitedFrom yields null once the
  // section's bytes are used up.
  for (INodeDirectorySection.DirEntry entry =
           INodeDirectorySection.DirEntry.parseDelimitedFrom(in);
       entry != null;
       entry = INodeDirectorySection.DirEntry.parseDelimitedFrom(in)) {
    final INodeDirectory parentDir =
        dir.getInode(entry.getParent()).asDirectory();
    for (long childId : entry.getChildrenList()) {
      final INode child = dir.getInode(childId);
      addToParent(parentDir, child);
    }
    for (int refIndex : entry.getRefChildrenList()) {
      final INodeReference refChild = references.get(refIndex);
      addToParent(parentDir, refChild);
    }
  }
}
.......
只有一条评论 (QwQ)
6666