文档写入流程

Index/Bulk 基本流程

新建、索引(这里的索引是动词,指写入操作,将文档添加到 Lucene 的过程称为索引文档)和删除请求都是写操作。写操作必须先在主分片执行成功后才能复制到相关的副分片。下面是基本步骤:

  1. 客户端向 NODE1 发送写请求。

  2. NODE1 使用文档 ID 来确定文档属于分片0,通过集群状态中的内容路由表信息获知分片0的主分片位于NODE3,因此请求被转发到NODE3上。

  3. NODE3 上的主分片执行写操作,如果写入成功,则它将请求并行转发到 NODE1 和 NODE2 的副分片上,等待返回结果。当所有的副分片都报告成功,NODE3 将向协调节点报告成功,协调节点再向客户端报告成功。

在客户端收到成功响应 ,意味着写操作已经在主分片和所有副分片都执行完成。

写一致性的默认策略是 quorum,即多数的分片(其中分片副本可以是主分片或副分片)在写入操作时处于可用状态。

quorum = int( (primary + number_of_replicas) / 2 ) + 1

Index/Bulk 详细流程

以不同角色节点执行的任务整理流程如下图所示:

avatar

协调节点流程

协调节点负责创建索引、转发请求到主分片节点、等待响应、回复客户端。

  1. 参数检查:如同我们平常设计的任何一个对外服务的接口处理一样,收到用户请求后首先检测请求的合法性,把检查操作放在处理流程的第一步,有问题就直接拒绝,对异常请求的处理代价是最小的。

  2. 处理 pipeline 请求:数据预处理(ingest)工作通过定义 pipeline和processors 实现。pipeline 是一系列 processors的定义,processors 按照声明的顺序执行。(比如为添加的数据动态的添加一个定义好的固定属性等等类似工作)如果 Index Bulk 请求中指定了 pipeline 参数,先使用相应的 pipline 进行处理。如果本节点不具备预处理资格,则将请求随机转发到其他具备预处理资格的节点。

  3. 自动创建索引:如果配置为允许自动创建索引(默认允许〉,则计算请求中涉及的索引,可能有多个,其中有哪些索引是不存在的,然后创建它。如果部分索引创建失败,则涉及创建失败索引的请求被标记为失败。其他索引正常执行写流程。

  4. 对请求的预先处理:这里不同于对数据的预处理,对请求的预先处理只是检查参数、自动生成 id 、处理 routing 等。如果 id 不存在,则生成一个 UUID 作为文档 id。

  5. 检测集群状态:协调节点在开始处理时会先检测集群状态,若集群异常则取消写入。例如,Master 节点不存在,会阻塞等待 Master 点直至超时。

  6. 内容路由,构建基于 shard 的请求:将用户的 bulkRequest 重新组织 shard 的请求列表。例如,原始用户请求可能有10个写操作,如果这些文档的主分片都属于同1个,则写请求被合并为1个。所以这里本质上是合并请求的过程 此处尚未确定主分片节点。

  7. 路由算法:路由算法就是根据 routing 和文档 id 计算目标 shard id 的过程。一般情况下,路由计算方式的公式:shard_num = hash(_routing) % num_primary_shards。

  8. 转发请求井等待晌应:主要是根据集群状态中的内容路由表确定主分片所在节点,转发请求并等待响应。

主分片节点流程

主分片所在节点负责在本地写主分片,写成功后,转发写副本片请求,等待响应,回复协调节点。

  1. 检查请求:主分片所在节点收到协调节点发来的请求后。是先做了校验工作,主要检测要写的是否是主分片,Allocationid 是否符合预期,索引是否处于关闭状态等。

  2. 是否延迟执行:判断请求是否需要延迟执行,如果需要延迟则放入队列,否则继续下面的流程。

  3. 判断主分片是否已经发生迁移:如果己经发生迁移,则转发请求到迁移的节点。

  4. 检测写一致性:在开始写之前,检测本次写操作涉及的 shard ,活跃 shard 数量是否足够,不足则不执行写入。默认为1,只要主分片可用就执行写入。

  5. 写 Lucene 和事务日志:遍历请求,处理动态更新宇段映射,然后逐条对 doc 进行索引。Engine 封装了 Lucene translog 的调用,对外提供读写接口。在写入 Lucene 之前,先生成 Sequence Number和Version。这些都是在 InternalEngine中实现的。 Sequence Number 每次递增 1, Version 根据 当前 doc 最大版本加 1。索引过程为先写 Lucene 后写 translog。因为 Lucene 写入时对数据有检查,写操作可能会失败。如果先写 translog ,写入 Lucene 失败, 则还需要对 translog 回滚处理。写Lucene这一步开始会对文档uid加锁,然后判断uid对应的version v2和之前update转换时的versoin v1是否一致,不一致则返回第二步重新执行。 如果version一致,如果同id的doc已经存在,则调用lucene的updateDocument接口,如果是新文档则调用lucene的addDoucument. 这里有个问题,如何保证Delete-Then-Add的原子性,ES是通过在Delete之前会加上已refresh锁,禁止被refresh,只有等待Add完成后释放了Refresh Lock, 这样就保证了这个操作的原子性。

  6. flush translog:根据配置的 trans lo flush 策略进行刷盘控制 定时或立即刷盘。

  7. 写副分片:现在己经为要写的副本 shard 准备了一个列表,循环处理每个shard,跳过 unassigned 状态的shard,向目标节点发送请求,等待响应。这个过程是异步并行的。在等待Response的过程中,本节点发出了多少个Request就要等待多少个Response。无论这些 Response 是成功的还是失败的,直到超时。收集到全部的 Response 后,执行 finish(),给协调节点返回消息,告知其哪些成功、哪些失败了。

  8. 处理副分片写失败情况:主分片所在节点将发送一个shardFailed请求给 Master,然后 Master 会更新集群状态,在新的集群状态中,这个shard将从in_sync_allocations 中删除;在routing_table shard 表中将 state由STARTED 更改为 UNASSIGNED;添加到 routingNodes 的 unassignedShards 列表。

副分片节点流程

执行与主分片基本相同的写doc过程,写完毕后回复主分片节点。在副分片的写入过程中,参数检查的实现与主分片略有不同,最终都调用 IndexShardOperationPermits#acquire 断是否需要 delay,继续后面的写流程。

异常处理

  1. 如果请求在协调节点的路由阶段失败,则会等待集群状态更新,拿到更新后,进行重试,如果再次失败,则仍旧等集群状态更新,直到超时1分钟为止。超时后仍失败则进行整体请求失败处理。

  2. 在主分片写入过程中,写入是阻塞的。只有写入成功,才会发起写副本请求。如果主shard 写失败,则整个请求被认为处理失败 如果有部分副本写失败,则整个请求被认为处理成功。

  3. 无论主分片还是副分片,当写一个 doc 失败时,集群不会重试,而是关闭本地 shard然后向 Master 汇报,删除是以 shard 为单位的。

ES系统特性

ES 本身也是一个分布式存储系统,如同其他分布式系统一样,我们经常关注的一些特性如下:

  • 数据可靠性:通过分片副本和事务日志机制保障数据安全性。

  • 服务可用性:在可用性和一致性的取舍方面,默认情况下 ES 更倾向于可用性,只要主分片可用即可执行写入操作。

  • 一致性:笔者认为是弱一致性。只要主分片写成功,数据就可能被读取。因此读取操作在主分片和副分片上可能会得到不同结果。

  • 原子性:索引的读写、别名更新是原子操作,不会出现中间状态。但 bulk 不是原子操作,不能用来实现事务。

  • 扩展性:主副分片都可以承担读请求,分担系统负载。

GET 流程

ES 的读取分为GET和Search两种操作,这两种读取操作有较大差异,GET/MGET必须指定三元组:_index、_type、_id。也就是说,根据文档id从正排索引中获取内容。而Search不指定_id,根据关键词从倒排索引中获取内容。(GET是根据三元信息一次获取一个文档,MGET是根据三元信息一次获取多个文档)

GET 基本流程

搜索和读取文档都属于读操作,可以从主分片或副分片中读取数据。读取单个文档的流程:

  1. 客户端向 NODE1 发送读请求。

  2. NODEl 使用文档 ID 来确定文档属于分片,通过集群状态中的内容路由表信息获知分片0有三个副本数据,位于所有的三个节点中,此时它可以将请求发送到任意节点,这里它将请求转发到 NODE2。

  3. NODE2 将文档返回给 NODE1, NODEl 将文档返回给客户端。

NODEl 作为协调节点,会将客户端请求轮询发送到集群的所有副本来实现负载均衡。

在读取时,文档可能己经存在于主分片上,但还没有复制到副分片。在这种情况下,读请求命中副分片时可能会报告文档不存在,但是命中主分片可能成功返回文挡。 一旦写请求成功返回给客户端,则意味着文档在主分片和副分片都是可用的。

GET 详细流程

GET/MGET 流程涉及两个节点:协调节点和数据节点,流程如下图所示:

avatar

协调节点

用来处理存在于一个单个(主或副)分片上的读请求。将请求转发到目标节点,如果请求执行失败,则尝试转发到其他节点读取。在收到读请求后,处理过程如下。

内容路由
  1. 准备集群状态、节点表等信息。

  2. 根据内容路由算法计算目标shardid,也就是文档应该落在哪个分片上。

  3. 计算出目标shardid后,结合请求参数中指定的优先级和集群状态确定目标节点,由于分片可能存在多个副本,因此计算出是一个列表。

转发请求

作为协调节点,向目标节点转发请求,或者目标是本地节点,直接读取数据。如果发送到网络,则请求被异步发送,等待数据节点的回复,如果数据节点处理成功,返回给客户端;如果数据节点处理失败,则进行重试。内容路由结束时构造了目标节点列表的迭代器,重试发送时,目标节点选择迭代器的下一个。

数据节点

读取数据并组织成 Response ,给客户端 channel 返回。

读取及过滤
  1. 通过indexShard.get()获取Engine.GetResult()。 Engine.GetResult()类与 innerGet 返回的GetResult是同名的类,但实现不同。IndexShard.get()最终调用 Intema!Engin get 读取数据。

  2. 调用 ShardGetService#innerGetLoadFromStoredFields(),根据 type、id、DocumentMapp等信息从刚刚获取的信息中获取数据,对指定的 field、source 进行过滤(source 过滤只支持对字段),把结果存于 GetResult 对象中。

Intema!Engine#get 过程会加读锁。处理 realtime 选项,如果为 true ,则先判断是否有数据可以刷盘,然后调用 Searcher 进行读取。Searcher 是对 IndexSearcher 的封装。在早期的 ES 版本中,如果开启(默认)realtime,则会尝试从 translog 读取,刚写入不久的数据可以从 translog 中读取。5.x 开始不会从 translog 中读取,只从 Lucene 读。 realtime 的实现机制变成依靠 refresh 实现。

注意:GET API 默认是实时的,不受索引刷新(refresh)频率设置的影响。如果文档己经更新,但还没有刷新,则 GET API 将会发出一次刷新调用,使文档可见。

MGET 详细流程

avatar

主要流程:

  1. 遍历请求,计算出每个 doc 的路由信息,得到由 shardid key 组成的 request map。这个过程没有在 TransportSingleShardAction 中实现,是因为如果在那里实现,shardid 就会重复,这也是合并为基于分片的请求的过程。

  2. 循环处理组织好的每个 shard 级请求,调用处理 GET 请求时使用 TransportSingleShardAction#AsyncSingleAction 处理单个 doc 的流程。

  3. 收集 Response ,全部 Response 返回后执行 finishHim(),给客户端返回结果。

回复的消息中文档顺序与请求的顺序一致。如果部分文档读取失败,则不影响其他结果,检索失败的 doc 会在回复信息中标出。

注意:update 操作需要先 GET 再写,为了保证一致性,update调用 GET 时将 realtime 选项设置为 true,并且不可配置,因此update操作可能会导致 refresh 生成新的 Lucene 分段。

Search 流程

索引和搜索

ES 中的数据可以分为两类:精确值和全文。

  • 精确值,比如日期和用户 id、IP 地址等。

  • 全文,指文本内容,比如一条日志,或者邮件的内容。

这两种类型的数据在查询时是不同的:对精确值的比较是二进制的,查询要么匹配,要么不匹配。全文内容的查询无法给出 有 还是 没有 的结果,它只能找到结果是“看起来像”你要查询的东西,因此把查询结果按相似度排序,评分越高,相似度越大。

对数据建立索引和执行搜索的原理如下图所示:

avatar

建立索引

如果是全文数据,则对文本内容进行分析,这项工作在 ES 中由分析器实现。分析器实现如下功能:

  • 字符过滤器:主要是对字符串进行预处理,例如,去掉 HTML ,将&转换成and等。

  • 分词器(Tokenizer):将字符串分割为单个词条,例如,根据空格和标点符号分割,输出的词条称为词元(Token)。

  • Token 过滤器:根据停止词(Stop word)删除词元,例如, and the 等无用词,或者根据同义词表增加词条,例如,jump 和 leap。

  • 语言处理:对上一步得到的 Token 做一些和语言相关的处理,例如,转为小写,以及将单词转换为词根的形式。语言处理组件输出的结果称为词(Term)。

分析完毕后,将分析器输出的词(Term)传递给索引组件,生成倒排和正排索引,再存储到文件系统中。

文档 -> 字符过滤器 -> 分词器 -> 词条过滤器 -> 语言处理 -> 文档写入(正排索引 既 doc values) -> 倒排索引

执行搜索

搜索调用 Lucene 完成,如果是全文检索,则:

  1. 对检索字段使用建立索引时相同的分析器进行分析,产生 Token 列表;

  2. 根据查询语句的语法规则转换成一颗语法树;

  3. 查找符合语法树的文档。

  4. 对匹配到的文档列表进行相关性评分,评分策略一般使用 TF/IDF;(新版现在都是BM25)

5.根据评分结果进行排序。

分布式搜索过程

一个搜索请求必须询问请求的索引中所有分片的某个副本来进行匹配。一次搜索请求只会命中所有分片副本中的一个。当搜索任务执行在分布式系统上时, 整体流程如下图所示:

avatar

协调节点流程

协调节点有两个流程,Query阶段 和 FETCH阶段。

Query阶段

QUERY_THEN_FETCH 搜索类型的查询阶段步骤如下:

  1. 客户端发送 search 请求到 NODE3。

  2. Node3 将查询请求转发到索引的每个主分片或副分片中。

  3. 每个分片在本地执行查询,并使用本地的 Term/Document Frequency 信息进行打分,添加结果到大小为 from + size 的本地有序优先队列中。

  4. 每个分片返回各自优先队列中所有文档的 ID 和排序值给协调节点,协调节点合并这些值到自己的优先队列中,产生一个全局排序后的列表。

协调节点广播查询请求到所有相关分片时,可以是主分片或副分片,协调节点将在之后的请求中轮询所有的分片副本来分摊负载。查询阶段并不会对搜索请求的内容进行解析,无论搜索什么内容,只看本次搜索需要命中哪些 shard,然后针对每个特定 shard 选择一个副本,转发搜索请求。

Query阶段详细流程
  1. 解析请求:将请求体解析为 SearchRequest 数据结构。

  2. 构造目的 shard 列表:将请求涉及的本集群 shard 列表和远程集群的 shard 列表(远程集群用于跨集群访)合并。

  3. 遍历所有 shard 发送请求:请求是基于 shard 遍历的,如果列表中有 shard 位于同一个节点,则向其发送N次请求,并不会把请求合并为一个。

  4. 收集返回结果:对收集到的结果进行合并。

FETCH阶段

Query 阶段知道了要取哪些数据,但是并没有取具体的数据,这就是 Fetch 阶段要做的。

Fetch阶段由以下步骤构成:

  1. 协调节点向相关 NODE 发送 GET 请求。

  2. 分片所在节点向协调节点返回数据。

  3. 协调节点等待所有文档被取得,然后返回给客户端。

分片所在节点在返回文档数据时,处理有可能出现的 _source 字段和高亮参数。

协调节点首先决定哪些文档确实需要被取回,例如,如果查询指定了 { from:90, size:10 },则只有从第91个开始的10个结果需要被取回。为了避免在协调节点中创建的 number_of_shards * (from + size)优先队列过大,应该尽量控制分页深度。

Fetch阶段详细流程

Fetch 阶段的目的是通过文档 ID 获取完整的文档内容。

  1. 发送 Fetch 请求。

  2. 收集结果。

  3. ExpandSearchPhase:取回阶段完成之后执行 ExpandSearchPhase#run, 主要判断是否启用字段折叠,根据需要实现字段折叠功能,如果没有实现字段折叠,直接返回给客户端。

  4. 回复客户端。

数据节点流程

晌应 Query 请求

主要过程就是执行查询,然后发送 Response。查询时,先看是否允许 cache ,由以下配置决定: index.requests.cache.enable 。

默认为 true,会把查询结果放到 cache 中, 查询 优先从 cache 中取。这 cache 由节点的所有分片共享,基于 LRU 算法实现:空间满的时候删除最近最少使用的数据。cache 并不缓存全部检索结果。

注意:慢查询 Query 日志的统计时间在于本阶段的处理时间。聚合操作在本阶段实现,在 Lucene 检索后完成。

响应 Fetch 请求

主要过程是执行 Fetch 然后发送 Response。慢查 Fetch 日志的统计时间在于本阶段的处理时间。

小结

  • 聚合是在 ES 实现的,而非 Lucene。

  • Query Fetch 请求之间是无状态的,除非是 scroll 方式。

  • 分页搜索不会单独“cache”, cache和分页没有关系。

  • 每次分页的请求都是一次重新搜索的过程,而不是从第一次搜索的结果中获取。看上去不太符合常规的做法,事实上互联网的的搜索引擎都是重新执行了搜索过程:人们基本只看前几页,很少深度分页;重新执行一次搜索很快;如果缓存第一次搜索结果等待翻页命中,则这种缓存的代价较大,意义却不大,因此不如重新执行一次搜索。

  • 搜索需要遍历分片所有的 ucen 分段,因此合并 Lucene 分段对搜索性能有好处。

  • es 默认采用的分页方式是 from+ size 的形式,在深度分页的情况下,这种使用方式效率是非常低的,比如我们执行如下查询:

1
2
3
4
5
6
7
8
GET /student/student/_search
{
"query":{
"match_all": {}
},
"from":5000,
"size":10
}

意味着 es 需要在各个分片上匹配排序并得到5010条数据,协调节点拿到这些数据再进行排序等处理,然后结果集中取最后10条数据返回。我们会发现这样的深度分页将会使得效率非常低,因为我只需要查询10条数据,而es则需要执行from+size条数据然后处理后返回。

其次:es为了性能,限制了我们分页的深度,es目前支持的最大的 max_result_window = 10000;也就是说我们不能分页到10000条数据以上。

在es中如果我们分页要请求大数据集或者一次请求要获取较大的数据集,scroll都是一个非常好的解决方案。

使用scroll滚动搜索,可以先搜索一批数据,然后下次再搜索一批数据,以此类推,直到搜索出全部的数据来scroll搜索会在第一次搜索的时候,保存一个当时的视图快照,之后只会基于该旧的视图快照提供数据搜索,如果这个期间数据变更,是不会让用户看到的。每次发送scroll请求,我们还需要指定一个scroll参数,指定一个时间窗口,每次搜索请求只要在这个时间窗口内能完成就可以了。

最后更新: 2021年04月14日 09:15

原始链接: https://jjw-story.github.io/2020/01/01/Elasticsearch-核心技术二/

× 请我吃糖~
打赏二维码