前两篇文章 (这里这里) 描述了 es 集群的组成、主选举算法、主更新元过程, 并分析了选举和元更新的一致性问题。本文分析了 es 中的数据流, 包括其写入过程、太平洋 a 算法模型、序列数、检查点, 并比较了 es 实现与标准 pacifica 算法的异同。我们将涵盖:

  1. 当前问题。
  2. 数据写入过程。
  3. 和平 a 算法。
  4. 序列号、检查点和故障恢复。
  5. 比较 es 和太平洋,
  6. 总结。

当前问题

任何曾经使用过 es 的人都知道, 每个 es 指数被分成多个分片。碎片分布在不同的节点上, 以启用分布式存储和查询, 并支持大规模数据集。每个分片都有多个副本, 其中一个是主节点, 其他是副本节点。首先将数据写入主节点, 然后与主节点中的副本节点同步。读取数据时, 为了提高读取能力, 主节点和副本节点都接受读取请求。

ES Cluster

使用此模型, 我们可以看到 es 具有以下一些特征:

  1. 高数据可靠性: 数据具有多个副本。
  2. 高服务可用性: 如果主节点崩溃, 可以从副本节点中选择一个新的主节点以继续提供服务。
  3. 扩展读取功能: 主节点和副本节点可以接受读取请求。
  4. 故障恢复功能: 如果主节点或副本节点崩溃, 则没有足够的副本。通过从新的主节点复制数据, 可以生成新的副本。

可能会想到一些问题, 例如:

  1. 如何将数据从主节点复制到副本节点?
  2. 它需要写入所有副本才能成功吗?
  3. 主节点崩溃是否会导致数据丢失?
  4. 从副本节点读取时, 是否始终读取最新数据?
  5. 执行故障恢复时, 是否需要复制所有碎片数据?

正如您所看到的, 尽管我们可以很容易地理解 es 数据一致性的一般原则, 但许多细节仍不清楚。本文重点介绍 es 的编写过程、所使用的一致性算法、序列 id 和检查点设计等方面, 以描述 es 的工作原理并解决上述问题。需要注意的是, 本文中的分析基于 es 6.2 版。大部分内容不适用于以前的 es 版本, 如版本2。x 版本。

数据写入过程

首先, 让我们来看看数据写入过程。

从复制的角度来看: 主 > 副本

从宏的角度来看, es 写入过程首先涉及将数据写入主节点, 然后将其同时写入副本节点, 最后将其返回到客户端。该过程如下所示:

检查活动页片计数。

String activeShardCountFailure = checkActiveShardCount();

给主。

primaryResult = primary.perform(request);

同时启动对所有复制的写入请求。

performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());

在所有复制都返回或失败后, 它们将返回到客户端得到 () > 0: “对于请求 [” + 请求 + “]”, 挂起的操作计数低于 0;
如果 (挂起行动) 获取 () = = 0) {完成 ();
}
}

上面的过程是 ReplicationOperation 类的执行函数, 完整的代码如下所示:

    public void execute() throws Exception {
        final String activeShardCountFailure = checkActiveShardCount();
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (activeShardCountFailure ! = null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
            return;
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
        primaryResult = primary.perform(request);
        primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest ! = null) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
            }

            // We must obtain the replication group after successfully indexing into the primary to follow recovery semantics.
            // We must make sure that every operation indexed into the primary after recovery start is also replicated
            // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
            // We also must make sure to obtain the global checkpoint before the replication group to ensure that the global checkpoint
            // is valid for this replication group. If we sampled in the reverse direction, the global checkpoint might be based on a subset
            // of the sampled replication group and advanced further than what the given replication group would allow.
            // This would mean that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
            final long globalCheckpoint = primary.globalCheckpoint();
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
        }

        successfulShards.incrementAndGet();  // mark primary as successful
        decPendingAndFinishIfNeeded();
    }

接下来, 我们将分析有关此过程的一些问题:

1. 为什么必须在第一步中检查活动分片计数?

有一个在 wait_for_active_shards es 中调用的参数。它是一个索引设置, 可以附加到请求。此参数指示分片在每次写入操作之前应具有的活动副本的最小数量。假设我们有一个索引, 其中每个分片有三个副本节点, 总计四个副本 (加上主节点)。如果 wait_for_active_shards 配置为 3, 则最多允许一个副本节点崩溃; 如果两个副本节点崩溃, 则活动副本的数量小于三个, 此时不允许写入操作。

默认情况下, 此参数设置为 1, 这意味着如果主节点存在, 则允许写入操作, 这意味着此时不使用此参数。如果将其设置为大于1的数字, 则可以具有保护作用, 从而确保写入数据具有更高的可靠性

2. 写入主节点完成后, 为什么在所有副本节点响应 (或连接失败) 之前不返回它?

在早期版本的 es 中, 允许在主节点和副本节点之间进行异步复制, 这意味着在写入成功后返回主节点。但是, 在此模式下, 如果主节点崩溃, 则存在数据丢失的风险, 并且很难保证从副本节点读取的数据是最新的。因此, es 停止使用异步模式。现在, 在返回副本节点之前, 主节点不会返回到客户端。

由于在返回所有副本节点之前, 主节点不会返回到客户端, 因此延迟会受到最慢的副本节点的影响, 这显然是当前 es 体系结构的一个缺点。最初, 我们认为一旦写入 wait_for_active_shards 副本成功, 结果就会返回, 但是, 后来, 在阅读了源代码之后, 我们意识到, 直到返回所有副本节点, 才返回结果。

如果写入副本节点失败, es 将执行重试逻辑; 如果写入副本节点失败, es 将执行重试逻辑。但是, 没有显式指定需要成功写入的节点数。返回的结果包括数据写入成功或失败的分片数:

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    }
}

3. 如果写入副本节点连续失败, 用户查找是否会看到旧数据?

换句话说, 假设写入副本节点连续失败, 副本节点中的数据可能比主节点中的数据早很多。我们知道, 在 es 中, 副本也可以处理读取请求, 因此用户是否读取此副本节点中的旧数据?

答案是, 如果写入副本节点失败, 主节点将问题报告给主节点, 然后主节点 InSyncAllocations 更新元中索引的配置并删除副本节点。之后, 它将不再处理读取请求。在元更新到达每个节点之前, 用户仍然可以读取此副本节点上的数据, 但在元更新完成后不会这样做。这个解决方案并不严格。考虑到 es 是一个近乎实时的系统, 在写入数据后, 需要刷新才能显示。因此, 一般来说, 可以在短时间内读取遗留数据是可以接受的。

ReplicationOperation.java, OnFailure function for failure to write to Replica nodes:

            public void onFailure(Exception replicaException) {
                logger.trace(
                    (org.apache.logging.log4j.util.Supplier<? >) () -> new ParameterizedMessage(
                        "[{}] failure while performing [{}] on replica {}, request [{}]",
                        shard.shardId(),
                        opType,
                        shard,
                        replicaRequest),
                    replicaException);
                if (TransportActions.isShardNotAvailableException(replicaException)) {
                    decPendingAndFinishIfNeeded();
                } else {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                    replicasProxy.failShardIfNeeded(shard, message,
                            replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                            ReplicationOperation

warn((org.apache.logging.log4j.util.Supplier< >) () ()-> 新的参数化消息 (“{}] {}”, 副本. shardid (), 消息), 例外);
(副本. shardid), 副本。
将请求发送给主, 执行副本的 shard失败逻辑, 并从 “入侵分配” 中删除 “碎片”。

公共空空白 shardFailed (shad路由故障处理器, unassignedInfo unifidnidinfo) {如果 (故障说明. 活动 () & & un宁愿 dde info. get治号 ()! = UnassignedInfo.Reason.NODE _ 滞后) {停锁 (故障分配页);

如果 (故障保护. 主 ()) {更新更新 = 更改 (故障管理器. shardid ());
如果 (更新. first. dereprim = null) {//多个主服务器可能会失败 (由于批处理, 主服务器可能会失败, 副本会升级, 然后失败…)
更新。
}}} 如果 (失败 shard. 活动 () & & 失败 shard. prim () {增长主期限 (失败 shard. shardid ());
}
}

InSyncAllocation 在 es 中维护使用 pacfica 算法, 下一篇文章对此进行了详细介绍。

从小学的角度来看

从主服务器的角度来看, 写入请求在写入转换之前会写入 lucene。

1. 为什么需要传输日志写入?

translog 类似于数据库或双日志中的提交日志。传输日志写入成功并刷新后, 数据将直接刷新到磁盘, 从而保证数据安全, 以便以后可以将段刷新到磁盘。因为转换是使用追加编写的, 所以写入性能比使用随机写入要好。

此外, 由于传输日志记录每个数据更改和数据更改的顺序, 因此可用于数据恢复。数据恢复由两部分组成: 第一, 在节点重新启动后, 从传输日志中恢复未刷新到磁盘的段数据;其次, 它用于主节点和新副本节点之间的数据同步, 这是副本尝试跟上主数据的过程。

2. 为什么 lucene 在记录写入之前需要写入?

lucene 写入将数据写入内存。写入操作完成后, 可以在刷新时立即读取数据;传输日志写入将数据刷新到磁盘以实现数据持久性和恢复。通常, 在分布式系统中, commitLog 首先为数据持久性编写, 然后将此更改应用于内存。那么, 为什么 es 的工作方式完全相反呢?这很可能是主要原因是, 当写入 lucene 时, lucene 运行各种数据检查, lucene 写入操作可能会失败。如果首先写入传输日志, 则可能需要在传输日志写入操作成功时处理 lucene 写入连续失败的问题。因此, es 首先采用了给 lucene 写信的过程。

这都是第1部分!在第2部分中, 我们将介绍 pacfica 算法、序列号、检查点和故障发现。明天再调整一下再看看!

Comments are closed.