索引恢复流程

索引恢复是 ES 数据恢复过程。待恢复的数据是客户端写入成功,但未执行刷盘(flush)的 Lucene 分段。例如,当节点异常重启时,写入磁盘的数据先到文件系统的缓忡,未必来得及刷盘,如果不通过某种方式将未刷盘的数据找回来,则会丢失-些数据,这是保持数据完整性的体现。另一方面,由于写入操作在多个分片副本上没有来得及全部执行,副分片需要同步成和主分片完全一致,这是数据副本一致性的体现。

根据数据分片性质,索引恢复过程可分为主分片恢复流程和副分片恢复流程:

  • 主分片从 translog 中自我恢复,尚未执行 flush 到磁盘的 Lucene 分段可以从 translog 重建。

  • 副分片需要从主分片中拉取 Lucene 分段和 translog 进行恢复但是有机会跳过拉取 Lucene 分段的过程。

主分片恢复流程

INIT 阶段

一个分片的恢复流程中,从开始执行恢复的那一刻起,被标记为 INIT 阶段,恢复流程在新的线程池中开始执行,开始阶段主要是一些验证工作,例如,校验当前分片是否为主分片,分片状态是否异常等。

INDEX 阶段

本阶段从 Lucene 读取最后一次提交的分段信息,获取其中的版本号,更新当前索引版本。

VERIFY_INDEX 阶段

VERIFY INDEX 中的 INDEX 指 Lucene index,因此本阶段的作用是验证当前分片是否损坏,是否进行本项检查取决于配置项:index.shard.check_on_startup。在索引的数据量较大时,分片检查会消耗更多的时间,默认配置为不执行验证索引,进入最重要的 TRANSLOG 阶段。

TRANSLOG 阶段

一个 Lucene 索引由许多分段组成,每次搜索时遍历所有分段。内部维护了一个称为“提交点”的信息,其描述了当前 Lucene 索引都包括哪些分段,这些分段已经被 fsync 系统调用,从操作系统的 cache 刷入磁盘。每次提交操作都会将分段刷入磁盘实现持久化。

本阶段需要重放事务日志中尚未刷入磁盘的信息,因此,根据最后一次提交的信息做快照,来确定事务日志中哪些数据需要重放。重放完毕后将新生成的 Lucene 数据刷入磁盘。

FINALIZE 阶段

本阶段执行刷新( refresh )操作,将缓冲的数据写入文件,但不刷盘,数据在操作系统的cache中。

DONE 阶段

DONE 阶段是恢复工作的最后一个阶段, 进入 DONE 段之前再次执行 refresh 然后更新分片状态。至此,主分片恢复完毕,对恢复结果进行处理。

副分片恢复流程

流程概述

副分片恢复的核心思想是从主分片拉取 Lucene 分段和 translog 进行恢复,按数据传递的方向,分片节点称为 Source,副分片节点称为 Target。

为什么需要拉取主分片 translog?因为在副分片恢复期间允许新的写操作,从Lucene分段的那一刻开始,所恢复的副分片数据不包括新增的内容,而这些内容存在于主分片的translog中,因此副分片需要从主分片节点拉取 translog 进行重放,以获取新增内容,这就需要主分片节 translog 不被清理。为了防止主分片节点 translog 清理,这方面的实现机制历了多次选代。这里只讲述6.0版本后的处理。

从6.0 版本开始,引入 TranslogDeletionPolicy 的概念,负责维护活跃的 translog 文件。这个类的实现非常简单,它将 translog 做一个快照来保持 translog 不被清理,这样使用者只需创建一个快照,无须担心视图之类。恢复流程实际上确实需要视图,现在可以通过获取一个简单的保留锁来防止清理translog,这消除了视图概念的需求。

在保证 translog 不被清理后,恢复核心处理过程由两个内部阶段(phase)组成:

  • phase1:在主分片所在节点,获取translog保留锁,从获取保留锁开始,会保留translog不受其刷盘清空的影响。然后调用Lucene接口把 shard 做快照,快照含有 shard 中己刷到磁盘的文件引用,把这些 shard 数据复制到副本节点。在 phase1 结束前,会向副分片节点发送告知对方启动 Engine,在 phase2 开始之前,副分片就可以正常处理写请求了。

  • phase2:对 translog 做快照,这个快照里包含从 phase1 开始,到执行 translog 快照期间的新增索引,将这些 translog 发送到副分片所在节点进行重放。

由于 phase1 需要通过网络复制大量数据,过程非常漫长,在 ES 6.x 中,有两个机会可以跳 phase1:

  1. 如果可以基于恢复请求中的 SequenceNumber 进行恢复,则跳过 phase1。

  2. 如果主副两分片有相同的 syncid 且 doc 数相同,则跳过 phase1。

synced flush 机制

为了解决副分片恢复过程第一阶段时间太漫长而引入了 synced flush,默认情况下5分钟没有写入操作,索引被标记为 inactive,执行 synced flush 生成一个唯一 syncid,写入分片的所有副本中。这个 syncid 是分片级,意味着拥有相同 syncid 的分片具有相同的 Lucene 索引。synced flush 本质上是一次普通的 flush 操作,只是在 Lucene commit 过程中多写了一个syncid。原则上,在没有数据写入的情况下,各分片在同一时间flush成功后,它们理应有相同的 Lucene 索引内容,无论 Lucene 分段是否一致,于是给分片分配一个 id 表示数据一致。

但是显然 synced flush 期间不能有新写入的内容,如果 sync flush 执行期间收到写请求,则ES 选择了写入可用性:让 synced flush 失败,让写操作成功。在没有执行 flush 的情况下己有 syncid不会失效。在某个分片上执行普通 flush 操作会删除己有syncid 因此,synced flush 操作是一个不可靠操作,只适用于冷索引。

副分片节点处理过程

副分片恢复的 VERIFY_INDEX、TRANSLOG、FINALIZE 三个阶段由主分片节点发送的RPC 调用触发。核心流程如下图所示:

avatar

INIT 阶段

本阶段在副分片节点执行。与主分片恢复的 INIT 阶段类似,恢复任务开始时被设置为 INIT,构建准备发往主分片 StartRecoveryRequest 请求,请求中包括将本次要恢复的shard信息,如shardid、metadataSnapshot等。metadataSnapshot包含 syncid。

INDEX 阶段

INDEX 阶段负责将主分片的Lucene数据复制到副分片节点,向主分片节点发送 action 为 intemal:index/shard/recovery/start_recovery的 RPC 请求,并阻当前线程,等待响应,直到对方处理完成,然后设置为DONE阶段。(概括来说,主分片节点收到请求后把 Lucene translog 发送给副分片)。在这期间主分片节点会发送几次RPC调用,通知副分片节点启 Engine,执行清理等操作。VERIFY_INDEX 和 TRANSLOG 阶段也是由主分片节点的 RPC 调用触发的。

VERIFY_INDEX 阶段

副分片的索引验证过程与主分片相同,是否进行验证取决于配置。默认为不执行索引验证。

TRANSLOG 阶段

TRANSLOG 阶段负责将主分片的 translog 复制到副分片节点进行重放。先创建新的 Engine,跳过 Engine 自身的 translog 恢复。此时主分片 phase2 尚未开始,接下来的 TRANSLOG 阶段就是等待主分片节点将 translog 发到副分片节点进行重放,也就是
phase2 的执行过程。

FINALIZE 阶段

主分片节点执行完 phase2,调用 finalizeRecovery 向副分片节点发送 action为internal:index/shard/recovery/finalize的 RPC 请求,副分片节点对此action处理为先更新全局检查点,然后执行与主分片相同的清理操作。

DONE 阶段

副分片节点等待 INDEX 阶段执行完成后进入 DONE 阶段,主要处理是调用 indexShard#postRecovery,与主分片的postRecovery处理过程相同,包括对恢复成功或失败的处理,也和主分片的处理过程相同。

主分片节点处理过程

核心流程如下图所示:

avatar

主分片节点收到副分片节点发送的恢复请求,执行恢 ,然后返回结果,这里也是阻塞处理的过程。

  1. 首先获取取一个保留锁,使 translog 不被清理。

  2. 判断是否可以从 SequenceNumber 恢复。(除了异常检测和版本号检测,主要在 isTranslogReadyForSequenceNumberBasedRecovery中判断请求的序列号号是否小于主分片节的 local Checkpoint,以及 translog 中的数据是否足以恢复(有可能因为 translog 数据太大或过期删除而无法恢复)。以请求的序列号作为最小值做一个快照,遍历这个值从开始到最新的数据之间的操作,检查序列号验证事务日志中的操作是否完整。

  3. 如果可以基于 SequenceNumber 恢复,则跳过 phasel 否则调用 Lucene 接口对分片做快照,执行 phasel。

  4. 等待 phasel 执行完毕,主分片节点通知副分片节点启动此分片的 Engine。该方法会阻塞处理,直到分片 Engine 启动完毕。待副分片启动 Engine 完毕,就可以正常接收写请求了。注意,此时 phase2 未开始,此分片的恢复流程尚未结束。

  5. 等待当前操作处理完成后,以startingSeqN为起始点,对 translog 做快照,开始执 phase2。(如果基于 SequenceNumber 恢复,则 sartingSeqNo 取值为恢复请求中的序列号,从请求的序列号开始快照 translog。否则取值为0,快照完整的 translog)

  6. 最后执行清理工作,该方法向副分片节点发送action为internal:index/shard/recovery/finalize的 RPC 请求告知对方执行清理,同时把全局检查点发送过去,等待对方执行成功,主分片更新全局检查点。

  • phase1 详解:

phase1 检查目标节点上的段文件,井对缺失的部分进行复制。只有具有相同大小和校验和的段才能被重用。但是由于分片副本执行各自的合并策略,所以合并出来的段文件相同的概率很低。在对比分段之前,先检查主副两分片是否都有 syncid,如果 syncid 相同,且 doc 数相同,跳过phase1,否则对比文件差异,发送文件。

  • phase2 详解:

phase2 将 translog 批量发送到副分片节点,发送时将待发送 translog 组合成一批来提高发送效率,默认的批量大小为 512KB,不支持配置。

recovery 速度优化

众所周知,索引恢复是集群启动过程中最缓慢的过程,集群完全重启,或者 Master 节点挂掉后,新选出的 Master 也有可能执行这个过程。下面归纳有哪些方法可以提升索引恢复速度:

  • 配置项cluster.routing.allocation.node_concurrent_recoveries 决定了单个节点执行副分片recovery时的最大并发数(进/出),默认为1,适当提高此值可增加 recovery 井发数。

  • 配置项 indices.recovery_max_bytes_per_sec 决定节点间复制数据时的限速,可以适当提此值或取消限速。

  • 配置项 cluster.routing.allocation.node_initial_primaries_recoveries 决定了单个节点执行主分片 recovery 时的最大并发数,默认为4。由于主分片的恢复不涉及在网络上复制数据,仅在本地磁盘读写,所以在节点配置了多个数据磁盘的情况下,可以适当提高此值。

  • 在重启集群之前,先停止写入端,执行 sync flush,让恢复过程有机会跳过 phase1。

  • 适当地多保留些translog,配置项 index.translog.retention.size 默认最大保留 512MB,index.translog.retention.age 默认为不超过 12 小时。调整这两个配置可让恢复过程有机会跳过 phase1。

  • 合并 Lucene 分段,对于冷索引甚至不再更新的索引执行_forcemerge,较少的 Lucene 分段可以提升恢复效率,例如,减少对比,降低文件传输请求数量。

gateway 模块 (元数据存储和恢复模块)

gateway 模块负责集群元信息的存储和集群重启时的恢复。

元数据

ES 中存储的数据有以下几种:

  • state 元数据信息;

  • index Lucene 生成的索引文件;

  • translog 事务日志。

元数据信息又有以下几种:

  • nodes/O/_state/*.st 集群层面元信息;

  • nodes/O/indices/{index_uuid}/_state/*.st 索引层面元信息;

  • nodes/O/indices/{index_uuid}/0/_state/*.st,分片层面元信息。

分别对应 ES 中的数据结构:

  • MetaData(集群层),主要是 clusterUUID、settings、templates。

  • IndexMetaData(索引层),主要是 numberOfShards、mappings。

  • ShardStateMetaData(分片层),主要是 version、indexUUID、primary 等。

上述信息被持久化到磁盘,需要注意的是:持久化的 state 不包括某个分片存在于哪个节点这种内容路由信息,集群完全重启时,依靠 gateway 的 recovery 过程重建 RoutingTable 。当读取某个文档时,根据路由算法确定目的分片后,从RoutingTable中查找分片位于哪个节点,然后将请求转发到目的节点。

元数据的持久化

只有具备 Master 资格的节点和数据节点可以持久化集群状态。当收到主节发布的集群状态时,节点判断元信息是否发生变化,如果发生变化将其持久化到磁盘中。

元数据的恢复

上述的三种元数据信息被持久化存储到集群的每个节点,当集群完全重启(full restart)时,由于分布式系统的复杂性,各个节点保存的元数据信息可能不同。此时需要选择正确的元数据作为权威元数据。

gateway 的 recovery 负责找到正确的元数据,应用到集群。

当集群完全重启,达到 recovery 条件时,进入元数据恢复流程,一般情况下,recovery件由以下三个配置控制:

  • gateway.expected_nodes:预期的节点数。加入集群的节点数(数据节点或具备 Master格的节点)达到这个数量后即开始 gateway 恢复。默认为0。

  • gateway.recover_after_time,如果没有达到预期的节点数量,则恢复过程将等待配置的时间,再尝试恢复。默认为 5min。

  • gateway.recover_after_nodes,只要配置数量的节点(数据节点或具备 Master资格的节点)加入集群就可以开始恢复。

当集群完全启动时,gateway 模块负责集群层和索引层的元数据恢复,分片层元数据恢复在allocation模块实现,但是由 gateway 模块在执行完上述两个层次恢复工作后触发。因此,三个层次的元数据恢复是 gateway 块和 allocation 模块共同完成的。

元数据恢复流程分析

  1. Master 选举成功之后,判断其持有的集群状态中是否存在 STATE_NOT_RECOVEBLOCK,如果不存在,则说明元数据已经恢复,跳过 gateway 过程,否则等待。

  2. Master 从各个节点主动获取元数据信息。

  3. 从获取的元数据信息中选择版本号最大的作为最新元数据,包括集群级、索引级。

  4. 两者确定之后,调用 allocation 模块的 reroute,对未分配的分片执行分配,主分片分配过程中会异步获取各个 shard 级别元数据,默认超时为 13s。

集群级和索引级元数据信息是根据存储在其中的版本号来选举的,而主分片位于哪个节点却是 allocation 模块动态计算出来的,先前主分片不一定还被选为新的主分片。

选举集群级和索引级别的元数据

  1. 判断是否满足进入 recovery 条件,当满足条件时,进入 recovery 主要流程。

  2. 首先向有 Master 资格的节点发起请求,获取它们存储的元数据。

  3. 等待回复 ,必须收到所有节点的回复,无论回复成功还是失败(节点通信失败异常会被捕获,作为失败处理),此处没有超时。

  4. 在收齐的这些恢复中,有效元信息的总数必须达到指定数量。异常情况下,例如,某个节点上元信息读取失败,则回复信息中元数据为空。

  5. 接下来就是通过版本号选取集群级和索引级元数据。

触发 allocation

当上述两个层次的元信息选举完毕,调用 clusterService.submitStateUpdateTask 提交一个集群任务,该任务在 masterService#updateTask 线程池中执行。主要工作是构建集群状态(ClusterState),其中的内容路由表依赖 allocation 模块协助完成,调用 allocationService.reroute 进入下一阶段:异步执行分片层元数据的恢复,以及分片分配。updateTask 线程结束。

至此,gateway 恢复流程结束,集群级和索引级元数据选举完毕,如果存在未分配的主分片,分片级元数据选举和分片分配正在进行中。

allocation 模块 (分片分配模块)

这里注意部署意识的概念

什么是 allocation

分片分配就是把一个分片指派到集群中某个节点的过程,分配决策由主节点完成,分配决策包含两方面:

  • 哪些分片应该分配给哪些节点;

  • 哪个分片作为主分片,哪些作为副分片。

对于新建索引和己有索引,分片分配过程也不尽相同。不过不管哪种场景,ES 都通过两个基础组件完成工作: allocators 和 deciders。 allocators 尝试寻找最优的节点来分配分片,deciders则负责判断并决定是否要进行这次分配。

allocators 负责为某个特定的分片分配目的节点。每个 allocator 的主要工作是根据某种逻辑得到一个节点列表,然后调用 deciders 去决策,根据决策结果选择一个目的 node。

分片分配重点

  • 对于新建索引,allocators 负责找出拥有分片数最少的节点,并按分片数量升序排序,因此分片较少的节点会被优先选择。所以对于新建索引,allocators 的目标就是以更均衡的方式把新索引的分片分配到集群的节点中,然后 deciders 依次遍历 allocators 给出的节点,并判断是否把分片分配到该节点。例如:如果分配过滤规则中禁止节点 A 持有索引 idx 中的任一分片,那么过滤器也阻止把索引 idx 分配到节点 A 中,即便A节点是 allocators 集群负载均衡角度选出的最优节点。需要注意的是,allocators 只关心每个节点上的分片数,而不管每个分片的具体大小。这恰好是 deciders 工作的一部分,既阻止把分片分配到超出节点磁盘容量阈值的节点上。

  • 对于己有索引,则要区分主分片还是副分片。对于主分片, allocators 只允许把主分片指定在己经拥有该分片完整数据的节点上。而对于副分片,allocators 是先判断其他节点上是否己有该分片的数据的副本(即便数据不是最新的)。如果有这样的节点,则allocators 优先把分片分配到其中一节点。因为副分片一旦分配,就需要从主分片中进行数据同步,所以当一个节点只拥分片中的部分数据时,也就意味着那些未拥有的数据必须从主节点中复制得到。这样可以明显地提高副分片的数据恢复速度。

分片分配触发时机

触发分片分配有以下几种情况:

  • index 增删。

  • node 增删。

  • 手工 reroute。

  • replica 数量改变。

  • 集群重启。

流程分析

gateway 阶段恢复集群状态中,我们己经知道集群一共有多少个索引,每个索引的主副分片各有多少个,但是不知道它们位于哪个节点,现在需要找到它们都位于哪个节点。集群完全重启的初始状态,所有分片都被标记为未分配状态,此处也被称作分片分配过程。因此分片分配的概念不仅仅是分配一个全新分片。

对于索引某个特定分片的分配过程中,先分配其主分片,后分配其副分片。

gatewayAllocator

gatewayAllocator 分为主分片和副分片分配器。

主分片分配器

  1. 遍历所有未分配的分片依次处理,通过 decider 决策分配,期间可能需要 fetchData 获取这个 shard 对应的元数据。如果决策结果为 YES,则将其初始化。

  2. 主副分片执行相同的函数,将分片的 unassigned 状态改为initialize 状态。(设置 RoutingNodes 己更新。更新的内容大约就是某个 shard 被分配到了某个节点,这个 shard 是主还是副,副的话会设置 recoverySource PEER,但只是一个类型,并没有告诉节点 recovery 的时候从哪个节点恢复,节点恢复时自己从集群状态中的路由表中查找。)

  3. Mast 把新的集群状态广播下去,当数据节点发现某个分片分配给自己,开始执行分片的 recovery。

  4. 主分片分配器返回指定的分片是否可以被分配,如果还没有这个分片的信息,则向集群的其他节点去请求该信息;如果己经有了,则根据 decider 进行决策。

首次进入函数时,还没有任何分片的元信息,发起向集群所有数据节点获取某个 shard 元信息的 fetchData 请求。之所以把请求发到所有节点,是因为它不知道哪个节点有这个 shard 的数据。集群启动的时候,遍历所有 shard ,再对每个 shard 向所有数据节点发 fetchData 请求,如果集群有 100 个节点、1000 个分片,则总计需要请求 100 * 1000 = 100000 。虽然是异步的,但仍然存在效率问。当 ES 集群规模比较大、分片数非常多的时候,这个请求的总量就会很大。

  1. 向各节点发起 fetchData 请求。(从所有数据节点异步获取某个特定分片的信息,没有超时设置)

  2. 数据节点的响应。(数据节点读取本地 shard 元数据返回请求方)

  3. 收集返回结果并处理。(进入主分片分配过程,依据这些元信息选择主分片)

  4. 主分片选举实现。(ES 之后的主分片选举与之前的版本机制是不一样的。 ES 5 之前的版本依据分片元数据的版本号对比实现,选择分片元信息中版本号高的分片来选举主分片, ES 5 及之后的版本依据allocation id 从 inSyncAllocationlds 中选择1个作为主分片。这种改变的主要原因是依据版本号无法保证版本号最高的分片一定被选为主分片。例如,当前只有1个活跃分片,那它被选为主分片,而拥有最新数据的分片尚未启动。)

副分片分配器

副分片决策过程中也需 fetchData, 只不过主分片分配节点已经 fetch 过,可以直接从结果中获取,但是在“fetchData”之前先运行一遍 allocation.deciders().canAllocate 来判断是否至少可以在一个node上分配,如果分配不了就省略后面的逻辑了,例如,其主分片尚未就绪等。

本章注意点:

不需要等所有主分片都分配完才执行副分片的分配。每个分片有自己的分配流程。

不需要等所有分片都分配完才执行 recovery 流程。

主分片不需要等副分片分配成功才进入主分片的 re co very 副分片有自己的 recovery流程。

略过模块

  1. Cluster 模块分析

  2. Transport 模块分析

  3. ThreadPool 模块分析

Shrink 原理分析

Shrink API ES 5.0 之后提供的新功能,其可以缩小主分片数量。但其并不对源索引直接进行缩小操作,而是使用与源索引相同的配置创建一个新索引,仅降低分片数。由于添加新文档时使用对分片数量取余获取目的分片的关系,新索引的主分片数必须是源索引主分片数的因数。例如,8个分片可以缩小到 4、2、1 个分片。如果源索引的分片数为素数,则目标索引的分片数只能为1。

Shrink 的工作原理

  • 以相同配置创建目标索引,但是降低主分片数量。

  • 从源索引的 Lucene 分段创建硬链接到目的索引。如果系统不支持硬链接,那么索引的所有分段都将复制到新索引,将会花费大量时间。

  • 对目标索引执行恢复操作,就像一个关闭的索引重新打开时一样。

创建新索引

使用旧索引的配置创建新索引,只是减少主分片的数量,所有副本都迁移到同一个节点。显然,创建硬链接时,源文件和目标文件必须在同一台主机。

创建硬链接

从源索引到目的索引创建硬链接。如果操作系统不支持硬链接,则复制 Lucene 分段。

为什么一定要硬链接,不使用软链接?

Linux 的文件系统由两部分组成(实际上任何文件系统的基本概念都相似):inode 和 block。block 用于存储用户数据,inode 用于记录元数据,系统通过 inode 定位唯一的文件。

硬链接:文件有相同的 inode 和 block。

软链接:文件有独立的 inode 和 block, block 内容为目的文件路径名。

那么为什么一定要硬链接过去呢?从本质上来说,我们需要保证 Shrink 之后,源索引和目的索引是完全独立的,读写和删除都不应该互相影响,如果软链接过去,删除濒索引,则目的索引的数据就会被删除,硬链接则不会。

使用硬链接,删除源索引,只是将文件的硬链接数量减 1,删除源索引和目的索引中的任何一个,都不影响另一个正常读写。

由于使用了硬链接,也因为硬链接的特性带来一些限制:不能交叉文件系统或分区进行硬链接的创建,因为不同分区和文件系统有自己的 inode。

不过,既然都是链接,shrink 完成后,修改源索引,目的索引会变吗?答案是不会。虽然链接到了源分段,shrink 期间索引只读,目标索引能看到的只有源索引的当前数据,Shrink完成后,由于 Lucene 中分段的不变性,“write once”机制保证每个文件都不会被更新。源索引新写入的数据随着 refresh 会生成新分段,而新分段没有链接,在目标索引中是看不到的。如果源索引进行 merge,对源分段执行删除时,只是硬链接数量减1,目标索引仍然不受影响 因此,shrink 完毕后最终的效果就是,两个索引的数据看起来是完全独立的。

经过链接过程之后,主分片己经就绪,副分片还是空的,通过 recovery 将主分片数据复制到副分片。

最后更新: 2021年04月14日 09:16

原始链接: https://jjw-story.github.io/2020/01/08/Elasticsearch-核心技术三/

× 请我吃糖~
打赏二维码