bulk_reshuffle

bulk_reshuffle #

描述 #

极限网关具有本地计算每个索引文档对应后端 Elasticsearch 集群的目标存放位置的能力,从而能够精准的进行请求定位,在一批 bulk 请求中,可能存在多个后端节点的数据,bulk_reshuffle 过滤器用来将正常的 bulk 请求打散,按照目标节点或者分片进行拆分重新组装,避免 Elasticsearch 节点收到请求之后再次进行请求分发, 从而降低 Elasticsearch 集群间的流量和负载,也能避免单个节点成为热点瓶颈,确保各个数据节点的处理均衡,从而提升集群总体的索引吞吐能力。

配置示例 #

一个简单的示例如下:

flow:
  - name: online_indexing_merge
    filter:
      - name: bulk_reshuffle
        parameters:
          elasticsearch: prod
          level: node
          mode: sync
      - name: elasticsearch
        parameters:
          elasticsearch: prod
          refresh:
            enabled: true
            interval: 30s

以上配置表示会将 bulk 请求拆分,按照索引文档所对应的目标节点,重新拆组装,然后分别同步提交到目标 Elasticsearch 节点。

节点级别的异步提交 #

默认的同步提交方式可能受目标 Elasticsearch 服务的性能影响,极限网关支持异步的提交方式,将数据线落地到本地磁盘队列,然后通过单独的任务来消费提交。

当极限网关处于开启异步模式的情况下,就算后端 Elasticsearch 集群出现故障也不会影响索引操作的正常进行,因为请求都是存放在网关本地的磁盘队列,从而解耦了前端索引和后端集群的依赖。因此就算后端 Elasticsearch 集群出现故障、进行重启、或是版本升级都不会影响正常的索引操作。

配置流程 #

首先定义一个异步的请求处理流程。

flow:
  - name: online_indexing_merge
    filter:
      - name: bulk_reshuffle
        parameters:
          elasticsearch: prod
          level: node
          mode: async
      - name: elasticsearch
        parameters:
          elasticsearch: prod
          refresh:
            enabled: true
            interval: 30s

极限网关会以目标节点为单位来将请求存放到本地磁盘。

配置管道 #

然后配置一个消费队列的管道,如下:

pipelines:
- name: bulk_request_ingest
  start:
    joint: bulk_indexing
    enabled: true
    parameters:
      elasticsearch: "prod"
      timeout: "60s"
      worker_size: 1
      bulk_size_in_mb: 1

这里使用了一个名为 bulk_request_ingest 的管道任务,并且设置目标的 Elasticsearch 集群为 prod,和前面的集群保持一致,也可以设置消费每个队列的 worker 大小和 bulk 提交的批次大小。

开启任务 #

最后,我们开启 pipeline 的任务,如下:

modules:
- name: pipeline
  enabled: true
  runners:
    - name: nodes_index
      enabled: true
      max_go_routine: 1
      threshold_in_ms: 0
      timeout_in_ms: 5000
      pipeline_id: bulk_request_ingest

这里定义了一个管道的 runner,也就是服务的执行对象,引用了我们前面定义的 bulk_request_ingest 任务,这样当极限网关收到的节点级别的请求会自动的发送到对应的 Elasticsearch 节点。

分片级别的异步提交 #

分片级别的异步提交比较适合单个索引数据量很大,需要单独处理的场景,通过将索引拆分到分片为单位,然后让 bulk 请求以分片为单位进行提交,进一步提高后端 Elasticsearch 处理的效率。

具体的配置如下:

定义流程 #

flow:
  - name: online_indexing_merge
    filter:
      - name: bulk_reshuffle
        parameters:
          elasticsearch: prod
          level: shard
          mode: async
      - name: elasticsearch
        parameters:
          elasticsearch: prod
          refresh:
            enabled: true
            interval: 30s

将拆装的级别设置为分片类型。

定义管道 #

pipelines:
- name: bulk_request_ingest
  start:
    joint: bulk_indexing
    enabled: true
    parameters:
      elasticsearch: "prod"
      index:
        - logs-repeat-test
        - logs100million
        - medcl4
      timeout: "60s"
      worker_size: 1
      bulk_size_in_mb: 1 #in MB

相比前面节点级别的配置,这里主要新增了一个 index 参数用来监听该索引下面的所有分片磁盘队列,这里需要主动添加要处理的索引,如果索引很多的话本地磁盘的开销会比较大,建议仅针对特定要优化吞吐的索引开启该模式。

开启任务 #

modules:
- name: pipeline
  enabled: true
  runners:
    - name: nodes_index
      enabled: true
      max_go_routine: 1
      threshold_in_ms: 0
      timeout_in_ms: 5000
      pipeline_id: bulk_request_ingest

此次定义和节点级别的管道任务定义一致,重启极限网关即可实现按分片为单位进行数据的异步索引提交了。

参数说明 #

名称类型说明
elasticsearchstringElasticsearch 集群实例名称
levelstring请求的 shuffle 级别,默认为 node,也就是节点级别,还可以设置为 shard 级别
modestring请求打散重新组装之后的发送模式,支持 sync 同步发送和 async 异步两种模式,如果是 async 异步模式,需要结合队列消费管道处理,默认为 sync 模式
fix_null_idbool如果 bulk 索引请求的文档里面没有指定文档 id,是否自动生成一个随机的 UUID,适合日志类型数据,默认 true
index_stats_analysisbool是否记录索引名称统计信息到请求日志,默认 true
action_stats_analysisbool是否记录批次操作统计信息到请求日志,默认 true
shardsarray设置哪些索引的分片将要被处理,默认所有分片,可以开启特定分片