4.1 复制流程
流程详解
流程概览
前情回顾:当节点成为 Leader 时会为每个 Follower 创建
Replicator,并通过发送空的AppendEntries请求确认各 Follower 的nextIndex客户端通过
Node::apply接口向 Leader 提交操作日志Leader 向本地追加日志:
3.1 为日志分配
index,并将其追加到内存存储中3.2 异步将内存中的日志持久化到磁盘
3.3 唤醒所有
Replicator准备向 Follower 同步日志
Leader 的
Replicator将内存中的日志通过AppendEntries请求并行地复制给所有 FollowerFollower 收到
AppenEntries请求,将日志持久化到本地后返回成功响应Leader 若收到大多数确定,则提交日志,更新
commitIndex并应用日志Leader 调用用户状态机的
on_apply应用日志待
on_apply返回后,更新applyIndex,并删除内存中的日志
备注:文章的结尾有一张整体流程图
复制模型

日志复制是树形复制的模型,采用本地写入和复制同时进行的方式,并且只要大多数写入成功就可以应用日志,不必等待 Leader 本地写入成功。
实现利用异步+并行,全程流水线式处理,并且全链路采用了 Batch,整体十分高效:
Batch:全链路 Batch,包括持久化、复制以及应用
异步:Leader 和 Follower 的日志持久化都是异步的
并行:Leader 本地持久化和复制是并行的,并且开启
pipeline后可以保证复制和复制之间也是并行的
关于性能优化的详情,可参考<4.2 复制优化>。
Replicator
节点在刚成为 Leader 时会为每个 Follower 创建一个 Replicator,其运行在单独的 bthread 上,主要有以下几个作用:
记录 Follower 的一些状态,如
nextIndex、flyingAppendEntriesSize等作为 RPC Client,所有从 Leader 发往 Follower 的 RPC 请求都由它发送,包括心跳、
AppendEntriesRequest、InstallSnapshotRequest同步日志:
Replicator会不断地向 Follower 同步日志,直到 Follower 成功复制了 Leader 的所有日志,之后将在后台等待新日志的到来
nextIndex
nextIndex 是 Leader 记录下一个要发往 Follower 的日志 index,只有确认了 nextIndex 才能给 Follower 发送日志,不然不知道要给 Follower 发送哪些日志。
节点刚成为 Leader 时是不知道每个 Follower 的 nextIndex 的,需要通过发送一次或多次探测信息来确认,其探测算法如下:
(1)
matchIndex为最后一条 Leader 与 Follower 匹配的日志,而nextIndex=matchIndex+1(2) 初始化
matchIndex为当前 Leader 的最后一条日志的index(3) Leader 发送探测请求,携带
matchIndex以及matchIndex日志对应的term(4) Follower 接收请求后,根据自身日志获取
matchIndex对应的term:若和请求中的
term相等,则代表日志匹配,返回成功响应;若日志不存在或者
term不匹配,则返回失败响应;不管成功失败,响应中都携自身的
lastLogIndex
(5) Leader 接收到成功响应,则表示探测成功;否则回退
matchIndex并重复步骤 (3):若 Follower 的
lastLogIndex<matchIndex,则回退matchIndex为lastLogIndex否则回退
matchIndex为matchIndex-1
下图展示的是一次探测过程,图中的数字代表的是日志的
term

相关 RPC
需要注意的是,探测 nextIndex、心跳、复制日志都是用的 append_entries 方法,区别在于其请求中携带的参数不同:
探测
空
0
心跳
空
当前 Leader 的 commitIndex
复制日志
携带日志
当前 Leader 的 commitIndex
除以上 2 个参数不同外,其余的参数都是一样的
相关接口
用户提交任务:
应用日志:
前置阶段:确定 nextIndex
Replicator 在创建时会通过发送空的 AppendEntries 请求来探测 Follower 的 nextIndex,只有确定了 nextIndex 才能正式向 Follower 发送日志。具体的匹配算法我们已经在 nextIndex 中详细介绍过了。
发送 AE 请求
AE 请求Replicator 调用 _send_empty_entries 向 Follower 发送空的 AppendEntries 请求,并设置响应回调函数为 _on_rpc_returned:
处理 AE 请求
AE 请求Follower 收到 AppendEntries 请求后,会调用 handle_append_entries_request 处理:
处理 AE 响应
AE 响应Leader 收到 AppendEntries 响应后,调用 on_rpc_returned 处理响应:
等待新日志
确定 nextIndex 后,Replicator 就会调用 _send_entries 向 Follower 同步日志,直至 Follower 成功复制了 Leader 的所有日志后,将会注册一个 waiter,在后台等待新日志的到来,其整体流程如下:
阶段一:追加日志
提交任务
客户端需要将操作序列化成 IOBuf,并构建一个 Task 向 braft::Node 提交。
放入队列
Node 在收到 Task 后,会将其转换成 LogEntryAndClosure 并放入 ApplyQueue 中。至此,客户端的 apply 就完成返回了。ApplyQueue 也是个串行队列,由 BRPC ExecutionQueue 实现:
任务批处理
ApplyQueue 的消费函数是 execute_applying_tasks,在该函数中会将 LogEntryAndClosure 进行 bacth 打包处理,并调用批量 apply 接口来处理:
批量 apply 接口在接收到这些 tasks 后,主要做以下几件事情:
阶段二:持久化日志
追加日志
LogManager 在接收到这些日志(LogEntry)后会做以下几件事情:
分配 logIndex
logIndexLogManager 调用 check_and_resolve_conflict 为每一个 LogEntry 分配 index:
持久化日志
DiskQueue 的消费函数是 disk_thread,在该函数中,会利用 AppendBatcher 对持久化任务做 Batch 打包处理,最终调用 AppendBatcher::flush 将日志写入磁盘:
AppendBatcher 负责将数据写入磁盘,并调用最终的回调函数,具体的存储写入我们将在 <4.3 日志存储> 中详细介绍:
待持久化成功后,会调用之前设置的回调函数 LeaderStableClosure::Run,该函数会调用 BallotBox::commit_at 将对应日志的计数加一,详见以下小节 <提交日志>:
唤醒 Replicator
LogManager 调用 wakeup_all_waiter 唤醒所有 Replicator 向 Follower 同步日志,Replicator 唤醒会调用 _continue_sending 继续发送 AppendEntries 请求:
阶段三:复制日志
发送 AE 请求
AE 请求Replicator 被唤醒后,会调用 _send_entries 发送 AppendEntries 请求:
在 _send_entries 函数中主要做以下几件事情:
_fill_common_fields 函数负责填充 AppendEntries 请求中除 entries 的其余字段:
处理 AE 请求
AE 请求Follower 接收到 AppendEntries 请求后,会调用 handle_append_entries_request 处理请求。其实 Follower 持久化逻辑和 Leader 是一样的,都是调用 LogManager::append_entries 函数进行持久化,只不过在持久化成功后各自的回调函数不一样:
Leader:回调函数是
LeaderStableClosure;该回调函数主要是将Quorum计数加一Follower:回调函数是
FollowerStableClosure;该回调函数主要是发送AppendEntries响应,并根据请求中携带的 LeadercommitIndex更新自身的commitIndex并应用日志
还有一个小的不同点是,Leader 端日志的 index 是自己生成的,而 Follower 中的日志完全来自于 Leader。
持久化成功后将运行回调函数 FollowerStableClosure::Run:
调用 set_last_committed_index 更新 commitIndex 并应用日志:
处理 AE 响应
AE 响应Leader 在收到 AppendEntries 响应后,会根据响应的不同类型对相应的处理:
RPC 失败:调用
_block阻塞当前Replicator一段时间(默认 100 毫秒),超时后调用_continue_sending重新发送当前AppendEntries请求。出现这种情况一般是对应的 Follower Crash 了,需要不断重试直到其恢复正常或被剔除集群响应失败:这里又细分为 2 种情况
Follower 的
term比leader高:调用increase_term_to将自己step_down成 Follower,并以错误状态调用所有日志的回调函数,通知用户apply失败了日志不匹配:重新探测
nextIndex,待其确认后重新发送日志
响应成功:调用
BallotBox::commit_at对复制计算加一
increase_term_to 函数处理失败的任务,调用 step_down 将自己降级成 Follower:
在 step_down 中会调用 BallotBox::clear_pending_tasks,该函数将以失败状态调用所有用户任务的 Closure:
阶段四:提交日志
Leader 本地持久化成功或每成功复制日志给一个 Follower,都会调用 BallotBox::commit_at 将对应日志的复制计数加一,如果达到 Quorum,则更新 commitIndex,并将其应用:
阶段五:应用日志
放入队列
当日志复制数已达到 Quorum,则调用 FSMCaller::on_committed 应用日志,该函数会将应用任务放如串行队列 ApplyTaskQueue 当中。ApplyTaskQueue 也是个串行队列,由 BRPC ExecutionQueue 实现:
批处理
ApplyTaskQueue 的消费函数是 FSMCaller::run,在该函数中会对应用任务进行 Bacth 打包,并调用 FSMCaller::do_committed 进行批处理:
on_apply
在 do_committed 函数中会将这批应用任务生成迭代器 braft::iterator,并将其作为参数调用用户状态机的 on_apply,待这批日志全部被 on_apply 后,再更新 applyIndex。最后调用 LogManager::set_applied_id 删除内存中的日志:
删除内存日志
调用 set_applied_id 删除内存中的日志。注意,不删除 Leader 未持久化的日志,即使其已被 apply:
删除内存中的日志,并释放其内存:
结尾

Last updated