https://www.jianshu.com/p/7321716b3227
之前的章节说过 FSDirectory 中记录了所有的文件节点信息,而具体的文件内容则被分布式的存储在各个 DataNode 上。尽管通过 FSDirectory 我们能够知道每个节点对应的路径的真实路径,但假如我们需要对整个存储块信息进行统一归属,那么 FSDirectory 由于是树状结构的形式,每次查找信息都需要遍历树中的每一个节点,效率太低,因此产生了 BlockManager 负责统一调度 DataNode 的存储消息。

如果把 Hdfs 比喻成一个人的躯干,那么 NameNode 就是他的大脑,了解每一个节点的状态信息,控制并管理四肢( DataNode )的运作。而 BlockManager 则是整个躯干中的心脏,他源源不断的接收来自四肢的血液( BlockInfo), 再反将血液( Command )传输回四肢中,让其能够正常运作。
BlockManager

BlockManager 主要的成员有以下这些:

blocksMap: 虽然已经有了 FSDirectory 类负责维护整个文件系统的树状结构,但在树状结构中进行数据查找的效率较低,在 BlockManager 内部在 blocksMap 中维护了一个定长的数组,在 Block 类中通过重载 hashcode() 函数,实现 blockId 和 hash 码的一一对应,确保不会出现多个 block 对应同一个 hash 码的情况,使得从 blocksMap 中取数据的时间复杂度为 O(1) 。
DatanodeManager: BlockManager 负责接收管理来自 DataNode 的消息,具体的管理操作由 DatanodeManager 接管,他负责监控 DataNode 节点的状态变化以及消费 Block 信息变化指令。
DocommissionManager: 管理需要退役或检修的节点信息,在确保这些节点上的数据都被成功转移后,才将节点置为退役和检修状态,避免直接设置导致的数据丢失。

DataNode 的三板斧 register, heartbeat & reportBlock

DataNode 同 NameNode 之间是一个单向通信模型。NameNode 为了保证自身的运行效率,不会主动向 DataNode 发起通信请求,因此所有通信行为都由 DataNode 主动触发。

DataNode 中针对每个 NameNode 节点会单独启动一个 BPServiceActor 的线程对象,这个对象负责同 NameNode 建立通信链接,并定时发送心跳和存储块信息。
BPServiceActor

如图所示,启动 BPServiceActor 后,首先向 NameNode 进行 register, 之后会进行一次完整的节点存储 block 信息上报,然后进入心跳流程,定时通过 sendHeartbeat 告知 NameNode 当前节点存活,如果有发生块信息变动,则在发送心跳之后,会尝试 sendIBRs (increment block report) 发送增量的块信息变动情况。
DatanodeManager 节点管理
DatanodeManager
Register

当接收到来自 DataNode 的 register 请求后,会依据传递过来的 DatanodeRegistration 构造出一个 DatanodeDescriptor 对象,并放入 HeartbeatManager 中的数组中,此时我们认为 DataNode 节点已经在集群中注册完毕,但这个节点中究竟有哪些 Block 信息仍然是未知的。
blockReport

DataNode 通过 blockReport 之后,在 BlockManager::processReport 对上报的 Block 信息进行消费

public boolean processReport(final DatanodeID nodeID,

  final DatanodeStorage storage,
  final BlockListAsLongs newReport,
DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());

if (storageInfo == null) {
    // We handle this for backwards compatibility.
    storageInfo = node.updateStorage(storage);
}

if (storageInfo.getBlockReportCount() == 0) {
    processFirstBlockReport(storageInfo, newReport);
} else {
    invalidatedBlocks = processReport(storageInfo, newReport, context);
}
storageInfo.receivedBlockReport();

}

DataNode 节点上可能有多个存储位置用于存放 Block 数据,对于每个存储位置,都有一个 DataStorage 对象和他对应。在 BlockManager::processReport 会以 DataStorage 为单位进行消费。

private Collection processReport(

  final DatanodeStorageInfo storageInfo,
  final BlockListAsLongs report,
  BlockReportContext context) throws IOException {
Iterable<BlockReportReplica> sortedReport;
if (!sorted) {
    Set<BlockReportReplica> set = new FoldedTreeSet<>();
    for (BlockReportReplica iblk : report) {
        set.add(new BlockReportReplica(iblk));
    }
    sortedReport = set;
} else {
    sortedReport = report;
}
// 筛选出需要进行额外处理的 Block
reportDiffSorted(storageInfo, sortedReport,
                 toAdd, toRemove, toInvalidate, toCorrupt, toUC);

}

需要留意的是 FoldedTreeSet 是一个基于红黑树构造的 BlockReportReplica 遍历器,如果发现在 DataNode 端没有预先对 report 进行排序,则在这里会对 report 进行再次排序,方便在 reportDiffSorted 中对上报的 Block 信息进行筛选。

在 reportDiffSorted 中会对当前上报的 Block 进行分类,分别拆到不同的 List 中,其中:

toAdd: 被认为是正式数据,需要同 BlockInfo 进行关联的数据
toRemove: replica 的 blockId 比节点中的 blockId 更大,认为是无效数据
toInvalidate: 已经被 NameNode 移除的节点的 Replica 文件,需要通知 DataNode 移除数据
toCorrupt: 与其对应的 Block 对应的节点存在,但数据和节点中的描述数据存在差异,被认为是无效数据
toUC: 数据正处于写入过程中,等待后续写入完毕

对上报的 Block 进行解析处理之后,会根据其具体类型作出对应的处理操作,所有数据处理完毕之后,整个 Storage 的 block 信息对于 NameNode 就是已知的。
heartbeat

每次 DataNode 发送心跳后,都会在 HeartbeatManager 中更新其最近的访问时间,用以识别该节点仍旧存活。

在 DatanodeDescriptor 中有多个集合,以 invalidateBlocks, replicateBlocks 为例,他们分别代表着已经废弃的 Block 和 需要向其他节点进行备份的 Block。 在更新完毕心跳信息后,会根据集合信息建立返回指令,告诉 DataNode 在本地节点需要做的行为操作,例如删除无效的 Block 以及向其他节点进行备份操作保证数据冗余度。

同时在 HeartbeatManager 中还有一个 HeartbeatManager.Monitor 线程,通过轮询 heartCheck() 检测是否有 DataNode 过期。一旦发现有节点过期,则将其从 DatanodeManager 中移除,同时也会将其存储空间中的 Block 信息从对应节点中移除,避免无效访问。
Block 容灾备份

为了避免由于节点异常导致的数据丢失, Hdfs 采取多地备份的策略,同时在多个 DataNode 中持有同一份数据。

在 Hdfs 中多副本容灾的策略由两部分组成,首先是在文件首次创建时,通过数据管道同时在多个 DataNode 中创建 Block ,保证创建时就拥有多个副本;其次是在节点因为断开链接或则文件异常损坏的情况下,通过复制现有的副本文件到另一个节点中,保证副本的冗余度。

第一点的数据管道在前一篇文章中已经做过介绍,在这里不再多说。第二点的副本复制策略是通过 RedundancyMonitor 线程进行实现的。

public void run() {

while (namesystem.isRunning()) {
    computeDatanodeWork();
    processPendingReconstructions();
    rescanPostponedMisreplicatedBlocks();
}

}

在 RedundancyMonitor 的 while 循环中,主要执行了 computeDatanodeWork, processPendingReconstructions, rescanPostonedMisreplicatedBlocks 三个方法
computeDatanodeWork

int computeDatanodeWork() {

int workFound = this.computeBlockReconstructionWork(blocksToProcess);
workFound += this.computeInvalidateWork(nodesToProcess);

}

BlockManager 中有一个 LowRedundancyBlocks 对象负责记录那些副本数量较低的 Block 信息,在 LowRedundancyBlocks 中有一个 priorityQueues 的优先级队列,队列按照当前副本数量进行排列,急需备份的 Block 会被优先取出来中进行消费。

这里我们需要知道,当前节点异常或某个 Storage 上保存的文件异常时,会对受到影响的 BlockInfo 进行分析,如果发现副本数量不足,则会加入 priorityQueues 队列中,等待节点进行备份。

computeDatanodeWork 会先从 priorityQueues 取出指定个数的高优任务,将其加入 DatanodeDescriptor 的 replicateBlocks 集合中。然后再从 BlockManager.invalidateBlocks 中拿到被移除的无效文件的 Block 信息,加入 DatanodeDescriptor 的 invalidateBlocks 集合中。

当 DataNode 发送心跳信息到 NameNode 时,会从对应的 DatanodeDescriptor 中取出 replicateBlocks 以及 invalidateBlocks,分别对应需要进行再次备份和已经无效的 Block 块,将其作为返回信息,返回给 DataNode 进行内部处理
processPendingReconstructions

private void processPendingReconstructions() {

BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
for (int i = 0; i < timedOutItems.length; i++) {
    if (isNeededReconstruction(bi, num)) {
        neededReconstruction.add(bi, num.liveReplicas(),
            num.readOnlyReplicas(), num.outOfServiceReplicas(),
            getExpectedRedundancyNum(bi));
    }
}

}

pendingReconstruction 中存放着一些 NameNode 认为正在进行数据生成的 Block 信息,如果长时间等待之后,发现 Block 的数据还没有生成完毕,就将对应的 Block 信息再次放入 pendingReconstruction 中,等待其他 DataNode 进行数据生成。
rescanPostponedMisreplicatedBlocks

void rescanPostponedMisreplicatedBlocks() {

Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
    MisReplicationResult res = processMisReplicatedBlock(bi);
    if (res == MisReplicationResult.POSTPONE) {
        rescannedMisreplicatedBlocks.add(b);
    }
}

}

由于数据在多个节点中进行传输,无法实时在 NameNode 的内存中进行查看,因此有时候我们无法确定某些 Block 具体处于什么状态,此时就会返回一个 MisReplicationResult.POSTPONE 结果,将 Block 再次加入回等待队列,直到能够完全确定块状态,再在 processMisReplicatedBlock 中对其进行处理。
DataNode 节点状态变化
DataNode State

DataNode 注册到 NameNode 之后,默认是 Normal 状态。但是某些特定情况下,我们如果希望对节点信息操作,则可能将其变化成其他的状态。
Docommission 退役

有时候处于业务发展需求,部分机器可能无法满足性能要求,因此我们选择将其下线,但此时这些机器上可能正存放着一些关键的 Block 块,为了确保下线机器不会影响线上的正常服务。

我们会在 NameNode 的节点中配置一个 dfs.hosts.exclude ,用于标识那些需要废弃的节点。在 NameNode 运行过程中,我们也可以通过 DFSAdmin 实时通知集群需要下线的节点信息。

当发现有机器命中了废弃节点之后,首先检查立刻移除该节点是否会造成 Block 丢失,如果会,则进入 DECOMMISSION_INPROGRESS 状态。

当节点处于 DECOMMISSION_INPROGRESS 状态时,不会再有新的 Block 块被放到这个节点中,同时这个节点中那些需要保留下的 Block 会放入 LowRedundancyBlocks.priorityQueues 队列中等待被备份。

在 BlockManger 中还有一个 DecommissionManager.Monitor 线程不断的检查那些处于 DECOMMISSION_INPROGRESS 状态的节点,如果发现节点不再影响线上数据,则将其设置为 DECOMMISIONED 状态,此时节点可以正式从集群中移除。
Maintenance 维护

和 Docommission 比较类似, Maintenance 也是在 NameNode 的节点中进行注册的,用于对那些需要进行检修的节点进行声明。

为了避免在检修过程中,由于当前检修节点拥有唯一的 Replicate,导致 Block 无法访问,和 Docommission 一样,节点会先进入一个 ENTERING_MAINTENANCE 的状态,此时节点中那些唯一的 Block 会被逐步转移到其他 Node 中,直到节点中的 Block 已经被转移,能够确保每个 Block 在其他的 DataNode 中都能被找到,才能够进入下一个状态 IN_MAINTENANCE,此时可以放心对节点进行处理。

但是如果始终都无法转移 Block 完毕,导致检修超时,则会退出 MAINTENANCE 状态回到 NORMAL 状态。
BlockManager 中的其他工作线程

除了 HeartbeatManager.Monitor,RedundancyMonitor 和 DecommissionManager.Monitor 之外,BlockManager 中还有几个独立线程,负责周期性遍历数据情况,优化节点间的 Block 分布情况。
PendingReconstructionMonitor

当我们进行文件传输和副本备份的时候,会默认挑选出一些目标节点进行传输,此时我们认为这些目标节点之后会拥有特定的 Block,这时候我们会在 PendingReconstructionBlocks 的 pendingReconstructions 中存放 BlockInfo 和期待的储存位置之间的关系。

PendingReconstructionMonitor 中会定时检查 timeout 的 pendingConstruction 事件,放入 timedOutItems 中。

RedundancyMonitor 在进行周期检查的时候,会取出这些过期的数据,然后重新分配创建任务。
StorageInfoDefragmenter

最后一个独立的工作线程是 StorageInfoDefragmenter。这个类负责优化每个 Storage 中的 block 内存占用。

DataNode 中的每一个存储路径都会被抽象成为一个 StorageInfo 对象,在这个对象中会有一棵红黑树 FoldedTreeSet 用来保存存储路径中的节点信息。随着 DataNode 节点的不断运行,不断有新的 Replica 被创建,有旧的 Replica 被移除,如果长时间不对 FoldedTreeSet 的内存做优化,则其内部的数据占用空间会越来越来,影响节点性能。

为了解决这个问题,在 BlockManager 中会启动一个 StorageInfoDefragmenter 的线程,定期通过 scanAndCompactStorages() 方法找到红黑树中 fillRatio 占用比较低的树,然后通过 compat 对树的节点进行压缩,减少内存占用情况。