优化 1:Batch
Batch 队列
从客户端调用 apply
接口提交 Task
到日志成功复制,并回调用户状态机的 on_apply
接口,日志依次经过了 Apply Queue
、Disk Queue
、Apply Task Queue
这 3 个队列。这些队列都是 brpc 的 ExecutionQueue 实现,其消费函数都做了 batch
优化:
ApplyQueue : 用户提交的 Task
会进入该队列,在该队列的消费函数中会将这些 Task
对应的日志进行打包,并调用 LogManager
的 append_entries
函数进行追加日志,打包的日志既用于持久化,也用于复制。默认一次打包 32 条日志。
DiskQueue : LogManager
在接收到这批日志后,需对其进行持久化处理,故会往该队列中提交一个持久化任务(一个任务对应一批日志);在该队列的消费函数中会将这些任务进行打包,将这批任务对应的所有日志写入磁盘。默认一次最多打包 256 个持久化任务,而每个任务最多包含 32 条日志,所以其一次 Bacth Write
最多会写入 256 * 32 = 8192
条日志对应的数据。当然其也受字节数限制,默认每次 Batch Write
最多写入 256KB
。
ApplyTaskQueue :当日志的复制数(包含持久化)达到 Quorum
后,会调用 on_committed
往 ApplyTaskQueue
中提交一个 ApplyTask
(每个 ApplyTask
对应一批已提交的日志);在该队列的消费函数中会将这些 ApplyTask
打包成 Iterator
,并作为参数回调用户状态机的 on_apply
函数。 默认一次最多打包 512 个 ApplyTask
,而每个 ApplyTask
最多包含 32 条日志,所以每一次 on_apply
参数中的 Iterator
最多包含 512 * 32 = 16384
条日志。
从以上看出,日志的复制、持久化、应用,全链路都是经过 Batch
优化的。
Follower 的 Batch
以上讨论的是节点作为 Leader 时的 Batch
优化,当节点为 Follower 时,其优化也是一样的,因为其用的是相同的代码逻辑,唯一的区别在于:
ApplyQueue : Follower 不会接受用户提交的日志(Task
),其批量日志来源于 Leader 的复制
DiskQueue : 日志批量落盘的逻辑是一样的,Follower 在接收到 Leader 的一批日志之后也是直接调用 LogManager
的 append_entries
函数
ApplyTaskQueue : 批量 on_apply
的逻辑是一样的,区别在于 Follower 的 commitIndex
来源于 Leader 在 RPC 中携带的 commitIndex
,并非通过自身的 Quorum
计算
相关配置
当然,框架也提供了一些配置项来调整这些 Batch
大小:
该打包大小影响复制、持久化以及on_apply
;上限为 512
raft_max_append_buffer_size
raft_fsm_caller_commit_batch
每次 on_apply
的最大日志数为 raft_apply_batch * raft_fsm_caller_commit_batch
另外,配置 raft_max_entries_size
也会影响每次复制(AppendEntries
请求)携带的日志数量,其默认值为 1024
具体实现
ApplyQueue
消费函数:
Copy int NodeImpl::execute_applying_tasks(
void* meta, bthread::TaskIterator<LogEntryAndClosure>& iter) {
...
// (1) FLAGS_raft_apply_batch 默认为 32
// TODO: the batch size should limited by both task size and the total log size
const size_t batch_size = FLAGS_raft_apply_batch;
// (2) 定义一个 tasks 数组,数组大小 = min(batch_size, 256)
DEFINE_SMALL_ARRAY(LogEntryAndClosure, tasks, batch_size, 256);
size_t cur_size = 0;
NodeImpl* m = (NodeImpl*)meta;
for (; iter; ++iter) {
if (cur_size == batch_size) {
m->apply(tasks, cur_size); // (4) 调用批量 apply 函数
cur_size = 0;
}
tasks[cur_size++] = *iter; // (3) batch 打包
}
...
return 0;
}
DiskQueue
消费函数:
Copy int LogManager::disk_thread(void* meta,
bthread::TaskIterator<StableClosure*>& iter) {
...
// (1) 定义 AppendBatcher 的 Batch 大小为 256
StableClosure* storage[256];
AppendBatcher ab(storage, ARRAY_SIZE(storage), &last_id, log_manager);
for (; iter; ++iter) {
// ^^^ Must iterate to the end to release to corresponding
// even if some error has occurred
StableClosure* done = *iter;
...
if (!done->_entries.empty()) {
ab.append(done); // (2) batch 打包持久化任务
} else {
ab.flush(); // (3) 批量持久化
...
}
}
...
return 0;
}
class AppendBatcher {
public:
...
void flush() {
if (_size > 0) {
// 写入持久化存储
_lm->append_to_storage(&_to_append, _last_id, &metric);
...
}
...
}
void append(LogManager::StableClosure* done) {
// FLAGS_raft_max_append_buffer_size 默认为 256KB
// 一次 Batch Write 写入的字节数不能超过该配置值
if (_size == _cap ||
_buffer_size >= (size_t)FLAGS_raft_max_append_buffer_size) {
flush();
}
_storage[_size++] = done;
_to_append.insert(_to_append.end(),
done->_entries.begin(), done->_entries.end());
for (size_t i = 0; i < done->_entries.size(); ++i) {
_buffer_size += done->_entries[i]->data.length();
}
}
}
ApplyTaskQueue
消费函数:
Copy int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
FSMCaller* caller = (FSMCaller*)meta;
...
int64_t max_committed_index = -1;
int64_t counter = 0;
// FLAGS_raft_fsm_caller_commit_batch 默认为 512
size_t batch_size = FLAGS_raft_fsm_caller_commit_batch;
for (; iter; ++iter) {
if (iter->type == COMMITTED && counter < batch_size) {
if (iter->committed_index > max_committed_index) {
max_committed_index = iter->committed_index;
counter++;
}
} else {
if (max_committed_index >= 0) {
caller->_cur_task = COMMITTED;
caller->do_committed(max_committed_index);
max_committed_index = -1;
counter = 0;
batch_size = FLAGS_raft_fsm_caller_commit_batch;
}
...
}
}
...
return 0;
}
// 生成 Iterator,调用用户状态机的 `on_apply`
void FSMCaller::do_committed(int64_t committed_index) {
...
IteratorImpl iter_impl(_fsm, _log_manager, &closure, first_closure_index,
last_applied_index, committed_index, &_applying_index);
for (; iter_impl.is_good();) {
...
Iterator iter(&iter_impl);
_fsm->on_apply(iter);
...
// Try move to next in case that we pass the same log twice.
iter.next();
}
...
}
优化 2:并行持久化日志
持久化与复制
在 Raft 的实现中,Leader 需要先在本地持久化日志,再向所有的 Follower 复制日志,显然这样的实现具有较高的时延。特别地客户端的写都要经过 Leader,导致 Leader 的压力会变大,从而导致 IO
延迟变高成为慢节点,而本地持久化也会阻塞后续的 Follower 复制。
所以在 braft 中,Leader 本地持久化和 Follower 复制是并行的,即 Leader 会先将日志写入内存,同时异步地进行持久化和向 Follower 复制。并且只要大多数节点写入成功就可以应用日志,不必等待 Leader 本地持久化成功。
具体实现
具体的实现我们已经在上一节中详细介绍过了,参见 <4.1 复制流程> 。
优化 3:流水线复制
pipeline
Raft 默认是串行复制日志,需要等待一个 AppendEntries
发送成功后再发送下一个,显然这样不是最高效的。可以采用 Pipeline
的复制方式,即 Leader 发送完 AppendEntries
请求后不必等待其响应,立马发送一下批日志。当然,这样的实现对于接受端(Follower)来说,可能会带来乱序、空洞等问题,为此,braft 在 Follower 端引入了日志缓存,将不是顺序的日志先缓存起来,待其前面的日志都接受到后再写入该日志,以达到日志连续的目的。
特别需要注意的是,pipeline
优化默认是关闭的,需要用户通过以下 2 个配置项开启:
Copy DEFINE_bool(raft_enable_append_entries_cache, false,
"enable cache for out-of-order append entries requests, should used when "
"pipeline replication is enabled (raft_max_parallel_append_entries_rpc_num > 1).");
DEFINE_int32(raft_max_parallel_append_entries_rpc_num, 1,
"The max number of parallel AppendEntries requests");
具体实现
Leader 端 pipeline
的流程如下:
(1) 发送 AppendEntries
请求时会判断并行的请求数(flying
)是否达到并行上限
(1.1) 如果是的话,直接返回;等待收到响应后回调 _on_rpc_returned
将 flying
计数减一
(1.2) 否则继续发送 AppendEntries
请求,并将 flying
计数加一
(2) 发送完后,继续判断 flying
数是否已达限,是的话如同步骤(1.1)
(3) 否则,判断是否还有日志可以发送:
(3.2) 否则注册 waiter
等待新日志到来
具体流程见下面代码解析:
Copy void Replicator::_send_entries() {
// (1) 如果已经发送的 AppendEntries 请求数大于等于 `raft_max_parallel_append_entries_rpc_num`,
// 则返回等待 `_on_rpc_returned`
if (_flying_append_entries_size >= FLAGS_raft_max_entries_size ||
_append_entries_in_fly.size() >= (size_t)FLAGS_raft_max_parallel_append_entries_rpc_num ||
_st.st == BLOCKING) {
...
return;
}
...
// (2) 否则发送 AppendEntries 请求
google::protobuf::Closure* done = brpc::NewCallback(
_on_rpc_returned, _id.value, cntl.get(),
request.get(), response.get(), butil::monotonic_time_ms());
RaftService_Stub stub(&_sending_channel);
stub.append_entries(cntl.release(), request.release(),
response.release(), done);
// (3) 发送完后调用 `_wait_more_entries`
_wait_more_entries();
}
// (4) `_wait_more_entries` 首先判断已经并行的 `AppendEntries` 请求数是否已达上限
// (4.1) 如果已达上限,则直接返回,等待收到响应后回调 `_on_rpc_returned` 即可
// (4.2) 否则调用 LogManager::wait 判断是否还有日志没同步
// 如果还有日志,则继续发送;否则创建 waiter 在后台等待新日志到来
void Replicator::_wait_more_entries() {
if (_wait_id == 0 && FLAGS_raft_max_entries_size > _flying_append_entries_size &&
(size_t)FLAGS_raft_max_parallel_append_entries_rpc_num > _append_entries_in_fly.size()) {
_wait_id = _options.log_manager->wait(
_next_index - 1, _continue_sending, (void*)_id.value);
...
}
...
}
// (5) LogManager::wait 会调用 LogManager::notify_on_new_log
LogManager::WaitId LogManager::wait(
int64_t expected_last_log_index,
int (*on_new_log)(void *arg, int error_code), void *arg) {
...
return notify_on_new_log(expected_last_log_index, wm);
}
// (6) 判断是否还有日志没同步:
LogManager::WaitId LogManager::notify_on_new_log(
int64_t expected_last_log_index, WaitMeta* wm) {
// (6.1) 如果有的话,调用 run_on_new_log 继续发送
if (expected_last_log_index != _last_log_index || _stopped) {
...
if (bthread_start_urgent(&tid, NULL, run_on_new_log, wm) != 0) {
...
}
return 0; // Not pushed into _wait_map
}
// (6.2) 没有可发送的日志,创建 waiter 在后台等待新日志
if (_next_wait_id == 0) { // skip 0
++_next_wait_id;
}
const int wait_id = _next_wait_id++;
_wait_map[wait_id] = wm;
return wait_id;
}
// (7) run_on_new_log 会调用 `_continue_sending`
void* LogManager::run_on_new_log(void *arg) {
WaitMeta* wm = (WaitMeta*)arg;
wm->on_new_log(wm->arg, wm->error_code); // on_new_log: _continue_sending
...
return NULL;
}
// (8) 而 _continue_sending 则调用 `_send_entries` 继续发送日志
int Replicator::_continue_sending(void* arg, int error_code) {
...
r->_send_entries();
...
}
Follower 接收到 AppendEntries
请求后,其处理逻辑如下:
对于到来的日志,如果其前面的日志没有接受到,先将其缓存起来,并要求 Leader 重发前面的日志
待前面的日志都达到后,再将缓存中在其之后的顺序的日志一起写入磁盘
Copy void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
const AppendEntriesRequest* request,
AppendEntriesResponse* response,
google::protobuf::Closure* done,
bool from_append_entries_cache) {
// (1) 当到来的日志的前面的日志没有接受到时,先将其缓存起来
if (local_prev_log_term != prev_log_term) {
int64_t last_index = _log_manager->last_log_index();
int64_t saved_term = request->term();
int saved_entries_size = request->entries_size();
std::string rpc_server_id = request->server_id();
// (1.1) 调用 handle_out_of_order_append_entries 将日志缓存起来
if (!from_append_entries_cache &&
handle_out_of_order_append_entries(
cntl, request, response, done, last_index)) {
// It's not safe to touch cntl/request/response/done after this point,
// since the ownership is tranfered to the cache.
...
return;
}
// (1.2) 并将响应设为 False, 要求 Leader 重发前面的日志
response->set_success(false);
response->set_term(_current_term);
response->set_last_log_index(last_index); // follower 的 lastLogIndex
...
return;
}
// (2) 当前的日志前面的日志都已达到(即日志是顺序的)
// 这时候查看缓存中是否还有连续的日志,如果有的话,
// 将以缓存的日志再次调用 handle_append_entries_request,
// 并将这些日志一并写入磁盘
// check out-of-order cache
check_append_entries_cache(index);
// (3) 将日志持久化
FollowerStableClosure* c = new FollowerStableClosure(
cntl, request, response, done_guard.release(),
this, _current_term);
_log_manager->append_entries(&entries, c);
}
优化 4:raft sync
sync 配置
日志每次 Batch Write
是先将日志写入 Page Cache
,最后再调用一次 fsync
操作。显然 sync
操作会增加时延,为此 braft 提供了一些配置项来控制日志的 sync
行为。用户可以根据业务数据丢失的容忍度高低,灵活调整这些配置以达到性能和可靠性之间的权衡。例如对于数据丢失容忍度较高的业务,可以选择将配置项 raft_sync
设置为 Flase
,这将有助于提升性能。
以下是控制日志 sync
的相关参数,前三项针对的是每次的 Batch Write
,最后两项针对的是 Segment
:
每次 Batch Write
后是否需要 sync
对于每次 Batch Write
后的 sync
策略,有立即 sync
(RAFT_SYNC_IMMEDIATELY
)和按字节数 sync
(RAFT_SYNC_BY_BYTES
)
在 RAFT_SYNC_BY_BYTES
的策略下,每多少字节 sync
一次
每当日志写满一个 Segment
需要切换时是否需要 sync
,每个 Segment
默认存储 8MB
的日志
具体实现
Batch Write
写入 Page Cache
后会调用 Segment::sync
进行 sync
操作:
Copy int Segment::sync(bool will_sync, bool has_conf) {
...
if (will_sync) {
// (1) 如果 `raft_sync` 为 False,则直接返回
if (!FLAGS_raft_sync) {
return 0;
}
// (2) 如果是按字节 `sync` 并且当前未 `sync` 的字节小于配置值,则直接返回
if (FLAGS_raft_sync_policy == RaftSyncPolicy::RAFT_SYNC_BY_BYTES
&& FLAGS_raft_sync_per_bytes > _unsynced_bytes
&& !has_conf) {
return 0;
}
_unsynced_bytes = 0;
// (3) 调用 fsync(fd)
return raft_fsync(_fd);
}
return 0;
}
每当一个 Segment
文件写完后,都会调用 Segment::close
将其转换成 closed segment
:
Copy int Segment::close(bool will_sync) {
...
if (FLAGS_raft_sync_segments && will_sync) {
ret = raft_fsync(_fd);
}
...
}
优化 5:异步 Apply
Copy void on_apply(braft::Iterator& iter) {
// A batch of tasks are committed, which must be processed through
// |iter|
for (; iter.valid(); iter.next()) {
// This guard helps invoke iter.done()->Run() asynchronously to
// avoid that callback blocks the StateMachine.
braft::AsyncClosureGuard closure_guard(iter.done());
// Parse operation from iter.data() and execute this operation
// op = parse(iter.data());
// result = process(op)
// The purpose of following logs is to help you understand the way
// this StateMachine works.
// Remove these logs in performance-sensitive servers.
LOG_IF(INFO, FLAGS_log_applied_task)
<< "Exeucted operation " << op
<< " and the result is " << result
<< " at log_index=" << iter.index();
}
}
当日志被提交时,框架会串行调用用户状态机的 on_apply
,虽然这里做了 Batch
优化,但是对于那些不支持批量更新的状态机来说,仍然是低效的。为此,用户可以将 on_apply
函数异步执行,让那些可以并行的操作尽可能并行起来。
需要注意的是,当 on_apply
异步后,需要处理好节点成为 Leader 时日志回放的问题。当节点刚成为 Leader 时,需要回放(on_apply
)之前任期的日志,这时候需要将这些日志全部应用到状态机后才能处理读取操作,不然可能会违背线性一致性。因为这些日志在之前的任期可能被提交了,客户端能读取到,而在新任期回放的时候,由于这些日志是异步执行的(on_apply
返回了,但还在异步执行中),可能还没应用到状态机,这时候客户端去读取可能是读取不到的。
参考