4.1 复制流程

流程详解

流程概览

  1. 前情回顾:当节点成为 Leader 时会为每个 Follower 创建 Replicator,并通过发送空的 AppendEntries 请求确认各 Follower 的 nextIndex

  2. 客户端通过 Node::apply 接口向 Leader 提交操作日志

  3. Leader 向本地追加日志:

    • 3.1 为日志分配 index,并将其追加到内存存储中

    • 3.2 异步将内存中的日志持久化到磁盘

    • 3.3 唤醒所有 Replicator 准备向 Follower 同步日志

  4. Leader 的 Replicator 将内存中的日志通过 AppendEntries 请求并行地复制给所有 Follower

  5. Follower 收到 AppenEntries 请求,将日志持久化到本地后返回成功响应

  6. Leader 若收到大多数确定,则提交日志,更新 commitIndex 并应用日志

  7. Leader 调用用户状态机的 on_apply 应用日志

  8. on_apply 返回后,更新 applyIndex,并删除内存中的日志

备注:文章的结尾有一张整体流程图

复制模型

图 4.1 复制模型

日志复制是树形复制的模型,采用本地写入和复制同时进行的方式,并且只要大多数写入成功就可以应用日志,不必等待 Leader 本地写入成功。

实现利用异步+并行,全程流水线式处理,并且全链路采用了 Batch,整体十分高效:

  • Batch:全链路 Batch,包括持久化、复制以及应用

  • 异步:Leader 和 Follower 的日志持久化都是异步的

  • 并行:Leader 本地持久化和复制是并行的,并且开启 pipeline 后可以保证复制和复制之间也是并行的

关于性能优化的详情,可参考<4.2 复制优化>

Replicator

节点在刚成为 Leader 时会为每个 Follower 创建一个 Replicator,其运行在单独的 bthread 上,主要有以下几个作用:

  • 记录 Follower 的一些状态,如 nextIndexflyingAppendEntriesSize

  • 作为 RPC Client,所有从 Leader 发往 Follower 的 RPC 请求都由它发送,包括心跳、AppendEntriesRequestInstallSnapshotRequest

  • 同步日志:Replicator 会不断地向 Follower 同步日志,直到 Follower 成功复制了 Leader 的所有日志,之后将在后台等待新日志的到来

nextIndex

nextIndex 是 Leader 记录下一个要发往 Follower 的日志 index,只有确认了 nextIndex 才能给 Follower 发送日志,不然不知道要给 Follower 发送哪些日志。

节点刚成为 Leader 时是不知道每个 Follower 的 nextIndex 的,需要通过发送一次或多次探测信息来确认,其探测算法如下:

  • (1) matchIndex 为最后一条 Leader 与 Follower 匹配的日志,而 nextIndex=matchIndex+1

  • (2) 初始化 matchIndex 为当前 Leader 的最后一条日志的 index

  • (3) Leader 发送探测请求,携带 matchIndex 以及 matchIndex 日志对应的 term

  • (4) Follower 接收请求后,根据自身日志获取 matchIndex 对应的 term

    • 若和请求中的 term 相等,则代表日志匹配,返回成功响应;

    • 若日志不存在或者 term 不匹配,则返回失败响应;

    • 不管成功失败,响应中都携自身的 lastLogIndex

  • (5) Leader 接收到成功响应,则表示探测成功;否则回退 matchIndex 并重复步骤 (3):

    • 若 Follower 的 lastLogIndex<matchIndex,则回退 matchIndexlastLogIndex

    • 否则回退 matchIndexmatchIndex-1

下图展示的是一次探测过程,图中的数字代表的是日志的 term

图 4.2 探测过程

相关 RPC

需要注意的是,探测 nextIndex、心跳、复制日志都是用的 append_entries 方法,区别在于其请求中携带的参数不同:

用途
entries
committed_index

探测

0

心跳

当前 Leader 的 commitIndex

复制日志

携带日志

当前 Leader 的 commitIndex

除以上 2 个参数不同外,其余的参数都是一样的

相关接口

用户提交任务:

应用日志:

前置阶段:确定 nextIndex

Replicator 在创建时会通过发送空的 AppendEntries 请求来探测 Follower 的 nextIndex,只有确定了 nextIndex 才能正式向 Follower 发送日志。具体的匹配算法我们已经在 nextIndex 中详细介绍过了。

发送 AE 请求

Replicator 调用 _send_empty_entries 向 Follower 发送空的 AppendEntries 请求,并设置响应回调函数为 _on_rpc_returned

处理 AE 请求

Follower 收到 AppendEntries 请求后,会调用 handle_append_entries_request 处理:

处理 AE 响应

Leader 收到 AppendEntries 响应后,调用 on_rpc_returned 处理响应:

等待新日志

确定 nextIndex 后,Replicator 就会调用 _send_entries 向 Follower 同步日志,直至 Follower 成功复制了 Leader 的所有日志后,将会注册一个 waiter,在后台等待新日志的到来,其整体流程如下:

阶段一:追加日志

提交任务

客户端需要将操作序列化成 IOBuf,并构建一个 Taskbraft::Node 提交。

放入队列

Node 在收到 Task 后,会将其转换成 LogEntryAndClosure 并放入 ApplyQueue 中。至此,客户端的 apply 就完成返回了。ApplyQueue 也是个串行队列,由 BRPC ExecutionQueue 实现:

任务批处理

ApplyQueue 的消费函数是 execute_applying_tasks,在该函数中会将 LogEntryAndClosure 进行 bacth 打包处理,并调用批量 apply 接口来处理:

批量 apply 接口在接收到这些 tasks 后,主要做以下几件事情:

阶段二:持久化日志

追加日志

LogManager 在接收到这些日志(LogEntry)后会做以下几件事情:

分配 logIndex

LogManager 调用 check_and_resolve_conflict 为每一个 LogEntry 分配 index:

持久化日志

DiskQueue 的消费函数是 disk_thread,在该函数中,会利用 AppendBatcher 对持久化任务做 Batch 打包处理,最终调用 AppendBatcher::flush 将日志写入磁盘:

AppendBatcher 负责将数据写入磁盘,并调用最终的回调函数,具体的存储写入我们将在 <4.3 日志存储> 中详细介绍:

待持久化成功后,会调用之前设置的回调函数 LeaderStableClosure::Run,该函数会调用 BallotBox::commit_at 将对应日志的计数加一,详见以下小节 <提交日志>:

唤醒 Replicator

LogManager 调用 wakeup_all_waiter 唤醒所有 Replicator 向 Follower 同步日志,Replicator 唤醒会调用 _continue_sending 继续发送 AppendEntries 请求:

阶段三:复制日志

发送 AE 请求

Replicator 被唤醒后,会调用 _send_entries 发送 AppendEntries 请求:

_send_entries 函数中主要做以下几件事情:

_fill_common_fields 函数负责填充 AppendEntries 请求中除 entries 的其余字段:

处理 AE 请求

Follower 接收到 AppendEntries 请求后,会调用 handle_append_entries_request 处理请求。其实 Follower 持久化逻辑和 Leader 是一样的,都是调用 LogManager::append_entries 函数进行持久化,只不过在持久化成功后各自的回调函数不一样:

  • Leader:回调函数是 LeaderStableClosure;该回调函数主要是将 Quorum 计数加一

  • Follower:回调函数是 FollowerStableClosure;该回调函数主要是发送 AppendEntries 响应,并根据请求中携带的 Leader commitIndex 更新自身的 commitIndex 并应用日志

还有一个小的不同点是,Leader 端日志的 index 是自己生成的,而 Follower 中的日志完全来自于 Leader。

持久化成功后将运行回调函数 FollowerStableClosure::Run

调用 set_last_committed_index 更新 commitIndex 并应用日志:

处理 AE 响应

Leader 在收到 AppendEntries 响应后,会根据响应的不同类型对相应的处理:

  • RPC 失败:调用 _block 阻塞当前 Replicator 一段时间(默认 100 毫秒),超时后调用 _continue_sending 重新发送当前 AppendEntries 请求。出现这种情况一般是对应的 Follower Crash 了,需要不断重试直到其恢复正常或被剔除集群

  • 响应失败:这里又细分为 2 种情况

    • Follower 的 termleader 高:调用 increase_term_to 将自己 step_down 成 Follower,并以错误状态调用所有日志的回调函数,通知用户 apply 失败了

    • 日志不匹配:重新探测 nextIndex,待其确认后重新发送日志

  • 响应成功:调用 BallotBox::commit_at 对复制计算加一

increase_term_to 函数处理失败的任务,调用 step_down 将自己降级成 Follower:

step_down 中会调用 BallotBox::clear_pending_tasks,该函数将以失败状态调用所有用户任务的 Closure

阶段四:提交日志

Leader 本地持久化成功或每成功复制日志给一个 Follower,都会调用 BallotBox::commit_at 将对应日志的复制计数加一,如果达到 Quorum,则更新 commitIndex,并将其应用:

阶段五:应用日志

放入队列

当日志复制数已达到 Quorum,则调用 FSMCaller::on_committed 应用日志,该函数会将应用任务放如串行队列 ApplyTaskQueue 当中。ApplyTaskQueue 也是个串行队列,由 BRPC ExecutionQueue 实现:

批处理

ApplyTaskQueue 的消费函数是 FSMCaller::run,在该函数中会对应用任务进行 Bacth 打包,并调用 FSMCaller::do_committed 进行批处理:

on_apply

do_committed 函数中会将这批应用任务生成迭代器 braft::iterator,并将其作为参数调用用户状态机的 on_apply,待这批日志全部被 on_apply 后,再更新 applyIndex。最后调用 LogManager::set_applied_id 删除内存中的日志:

删除内存日志

调用 set_applied_id 删除内存中的日志。注意,不删除 Leader 未持久化的日志,即使其已被 apply

删除内存中的日志,并释放其内存:

结尾

图 4.3 日志复制整体流程图

Last updated