优化 1:Batch
Batch 队列
从客户端调用 apply
接口提交 Task
到日志成功复制,并回调用户状态机的 on_apply
接口,日志依次经过了 Apply Queue
、Disk Queue
、Apply Task Queue
这 3 个队列。这些队列都是 brpc 的 ExecutionQueue 实现,其消费函数都做了 batch
ApplyQueue : 用户提交的 Task
会进入该队列,在该队列的消费函数中会将这些 Task
对应的日志进行打包,并调用 LogManager
的 append_entries
函数进行追加日志,打包的日志既用于持久化,也用于复制。默认一次打包 32 条日志。
DiskQueue : LogManager
在接收到这批日志后,需对其进行持久化处理,故会往该队列中提交一个持久化任务(一个任务对应一批日志);在该队列的消费函数中会将这些任务进行打包,将这批任务对应的所有日志写入磁盘。默认一次最多打包 256 个持久化任务,而每个任务最多包含 32 条日志,所以其一次 Bacth Write
最多会写入 256 * 32 = 8192
条日志对应的数据。当然其也受字节数限制,默认每次 Batch Write
最多写入 256KB
ApplyTaskQueue :当日志的复制数(包含持久化)达到 Quorum
后,会调用 on_committed
往 ApplyTaskQueue
中提交一个 ApplyTask
(每个 ApplyTask
对应一批已提交的日志);在该队列的消费函数中会将这些 ApplyTask
打包成 Iterator
,并作为参数回调用户状态机的 on_apply
函数。 默认一次最多打包 512 个 ApplyTask
,而每个 ApplyTask
最多包含 32 条日志,所以每一次 on_apply
参数中的 Iterator
最多包含 512 * 32 = 16384
从以上看出,日志的复制、持久化、应用,全链路都是经过 Batch
Follower 的 Batch
以上讨论的是节点作为 Leader 时的 Batch
优化,当节点为 Follower 时,其优化也是一样的,因为其用的是相同的代码逻辑,唯一的区别在于:
ApplyQueue : Follower 不会接受用户提交的日志(Task
),其批量日志来源于 Leader 的复制
DiskQueue : 日志批量落盘的逻辑是一样的,Follower 在接收到 Leader 的一批日志之后也是直接调用 LogManager
的 append_entries
ApplyTaskQueue : 批量 on_apply
的逻辑是一样的,区别在于 Follower 的 commitIndex
来源于 Leader 在 RPC 中携带的 commitIndex
,并非通过自身的 Quorum
当然,框架也提供了一些配置项来调整这些 Batch
;上限为 512
每次 on_apply
的最大日志数为 raft_apply_batch * raft_fsm_caller_commit_batch
另外,配置 raft_max_entries_size
请求)携带的日志数量,其默认值为 1024
Copy int NodeImpl :: execute_applying_tasks (
void* meta , bthread :: TaskIterator < LogEntryAndClosure > & iter) {
// (1) FLAGS_raft_apply_batch 默认为 32
// TODO: the batch size should limited by both task size and the total log size
const size_t batch_size = FLAGS_raft_apply_batch;
// (2) 定义一个 tasks 数组,数组大小 = min(batch_size, 256)
DEFINE_SMALL_ARRAY (LogEntryAndClosure , tasks , batch_size , 256 );
size_t cur_size = 0 ;
NodeImpl * m = (NodeImpl * )meta;
for (; iter; ++ iter) {
if (cur_size == batch_size) {
m -> apply (tasks , cur_size); // (4) 调用批量 apply 函数
cur_size = 0 ;
tasks [cur_size ++ ] = * iter; // (3) batch 打包
return 0 ;
Copy int LogManager :: disk_thread ( void* meta ,
bthread :: TaskIterator < StableClosure * > & iter) {
// (1) 定义 AppendBatcher 的 Batch 大小为 256
StableClosure * storage [ 256 ];
AppendBatcher ab (storage , ARRAY_SIZE (storage) , & last_id , log_manager);
for (; iter; ++ iter) {
// ^^^ Must iterate to the end to release to corresponding
// even if some error has occurred
StableClosure * done = * iter;
if ( ! done -> _entries . empty ()) {
ab . append (done); // (2) batch 打包持久化任务
} else {
ab . flush (); // (3) 批量持久化
return 0 ;
class AppendBatcher {
public :
void flush () {
if (_size > 0 ) {
// 写入持久化存储
_lm -> append_to_storage ( & _to_append , _last_id , & metric);
void append (LogManager :: StableClosure * done) {
// FLAGS_raft_max_append_buffer_size 默认为 256KB
// 一次 Batch Write 写入的字节数不能超过该配置值
if (_size == _cap ||
_buffer_size >= ( size_t )FLAGS_raft_max_append_buffer_size) {
flush ();
_storage [_size ++ ] = done;
_to_append . insert ( _to_append . end () ,
done -> _entries . begin () , done -> _entries . end ());
for ( size_t i = 0 ; i < done -> _entries . size (); ++ i) {
_buffer_size += done -> _entries[i] -> data . length ();
Copy int FSMCaller :: run ( void* meta , bthread :: TaskIterator < ApplyTask > & iter) {
FSMCaller * caller = (FSMCaller * )meta;
int64_t max_committed_index = - 1 ;
int64_t counter = 0 ;
// FLAGS_raft_fsm_caller_commit_batch 默认为 512
size_t batch_size = FLAGS_raft_fsm_caller_commit_batch;
for (; iter; ++ iter) {
if ( iter -> type == COMMITTED && counter < batch_size) {
if ( iter -> committed_index > max_committed_index) {
max_committed_index = iter -> committed_index;
counter ++ ;
} else {
if (max_committed_index >= 0 ) {
caller -> _cur_task = COMMITTED;
caller -> do_committed (max_committed_index);
max_committed_index = - 1 ;
counter = 0 ;
batch_size = FLAGS_raft_fsm_caller_commit_batch;
return 0 ;
// 生成 Iterator,调用用户状态机的 `on_apply`
void FSMCaller :: do_committed ( int64_t committed_index) {
IteratorImpl iter_impl (_fsm , _log_manager , & closure , first_closure_index ,
last_applied_index , committed_index , & _applying_index);
for (; iter_impl . is_good ();) {
Iterator iter ( & iter_impl);
_fsm -> on_apply (iter);
// Try move to next in case that we pass the same log twice.
iter . next ();
优化 2:并行持久化日志
在 Raft 的实现中,Leader 需要先在本地持久化日志,再向所有的 Follower 复制日志,显然这样的实现具有较高的时延。特别地客户端的写都要经过 Leader,导致 Leader 的压力会变大,从而导致 IO
延迟变高成为慢节点,而本地持久化也会阻塞后续的 Follower 复制。
所以在 braft 中,Leader 本地持久化和 Follower 复制是并行的,即 Leader 会先将日志写入内存,同时异步地进行持久化和向 Follower 复制。并且只要大多数节点写入成功就可以应用日志,不必等待 Leader 本地持久化成功。
具体的实现我们已经在上一节中详细介绍过了,参见 <4.1 复制流程> 。
优化 3:流水线复制
Raft 默认是串行复制日志,需要等待一个 AppendEntries
发送成功后再发送下一个,显然这样不是最高效的。可以采用 Pipeline
的复制方式,即 Leader 发送完 AppendEntries
请求后不必等待其响应,立马发送一下批日志。当然,这样的实现对于接受端(Follower)来说,可能会带来乱序、空洞等问题,为此,braft 在 Follower 端引入了日志缓存,将不是顺序的日志先缓存起来,待其前面的日志都接受到后再写入该日志,以达到日志连续的目的。
优化默认是关闭的,需要用户通过以下 2 个配置项开启:
Copy DEFINE_bool (raft_enable_append_entries_cache , false ,
"enable cache for out-of-order append entries requests, should used when "
"pipeline replication is enabled (raft_max_parallel_append_entries_rpc_num > 1)." );
DEFINE_int32 (raft_max_parallel_append_entries_rpc_num , 1 ,
"The max number of parallel AppendEntries requests" );
Leader 端 pipeline
(1) 发送 AppendEntries
(1.1) 如果是的话,直接返回;等待收到响应后回调 _on_rpc_returned
将 flying
(1.2) 否则继续发送 AppendEntries
请求,并将 flying
(2) 发送完后,继续判断 flying
(3) 否则,判断是否还有日志可以发送:
(3.2) 否则注册 waiter
Copy void Replicator :: _send_entries () {
// (1) 如果已经发送的 AppendEntries 请求数大于等于 `raft_max_parallel_append_entries_rpc_num`,
// 则返回等待 `_on_rpc_returned`
if (_flying_append_entries_size >= FLAGS_raft_max_entries_size ||
_append_entries_in_fly . size () >= ( size_t )FLAGS_raft_max_parallel_append_entries_rpc_num ||
_st . st == BLOCKING) {
return ;
// (2) 否则发送 AppendEntries 请求
google :: protobuf :: Closure * done = brpc :: NewCallback (
_on_rpc_returned , _id . value , cntl . get () ,
request . get () , response . get () , butil :: monotonic_time_ms ());
RaftService_Stub stub ( & _sending_channel);
stub . append_entries ( cntl . release () , request . release () ,
response . release () , done);
// (3) 发送完后调用 `_wait_more_entries`
_wait_more_entries ();
// (4) `_wait_more_entries` 首先判断已经并行的 `AppendEntries` 请求数是否已达上限
// (4.1) 如果已达上限,则直接返回,等待收到响应后回调 `_on_rpc_returned` 即可
// (4.2) 否则调用 LogManager::wait 判断是否还有日志没同步
// 如果还有日志,则继续发送;否则创建 waiter 在后台等待新日志到来
void Replicator :: _wait_more_entries () {
if (_wait_id == 0 && FLAGS_raft_max_entries_size > _flying_append_entries_size &&
( size_t )FLAGS_raft_max_parallel_append_entries_rpc_num > _append_entries_in_fly . size ()) {
_wait_id = _options . log_manager -> wait (
_next_index - 1 , _continue_sending , ( void* ) _id . value);
// (5) LogManager::wait 会调用 LogManager::notify_on_new_log
LogManager :: WaitId LogManager :: wait (
int64_t expected_last_log_index ,
int (*on_new_log)( void * arg , int error_code) , void * arg) {
return notify_on_new_log (expected_last_log_index , wm);
// (6) 判断是否还有日志没同步:
LogManager :: WaitId LogManager :: notify_on_new_log (
int64_t expected_last_log_index , WaitMeta * wm) {
// (6.1) 如果有的话,调用 run_on_new_log 继续发送
if (expected_last_log_index != _last_log_index || _stopped) {
if ( bthread_start_urgent ( & tid , NULL , run_on_new_log , wm) != 0 ) {
return 0 ; // Not pushed into _wait_map
// (6.2) 没有可发送的日志,创建 waiter 在后台等待新日志
if (_next_wait_id == 0 ) { // skip 0
++ _next_wait_id;
const int wait_id = _next_wait_id ++ ;
_wait_map [wait_id] = wm;
return wait_id;
// (7) run_on_new_log 会调用 `_continue_sending`
void* LogManager :: run_on_new_log ( void * arg) {
WaitMeta * wm = (WaitMeta * )arg;
wm -> on_new_log ( wm -> arg , wm -> error_code); // on_new_log: _continue_sending
return NULL ;
// (8) 而 _continue_sending 则调用 `_send_entries` 继续发送日志
int Replicator :: _continue_sending ( void* arg , int error_code) {
r -> _send_entries ();
Follower 接收到 AppendEntries
对于到来的日志,如果其前面的日志没有接受到,先将其缓存起来,并要求 Leader 重发前面的日志
Copy void NodeImpl :: handle_append_entries_request (brpc :: Controller * cntl ,
const AppendEntriesRequest * request ,
AppendEntriesResponse * response ,
google :: protobuf :: Closure * done ,
bool from_append_entries_cache) {
// (1) 当到来的日志的前面的日志没有接受到时,先将其缓存起来
if (local_prev_log_term != prev_log_term) {
int64_t last_index = _log_manager -> last_log_index ();
int64_t saved_term = request -> term ();
int saved_entries_size = request -> entries_size ();
std :: string rpc_server_id = request -> server_id ();
// (1.1) 调用 handle_out_of_order_append_entries 将日志缓存起来
if ( ! from_append_entries_cache &&
handle_out_of_order_append_entries (
cntl , request , response , done , last_index)) {
// It's not safe to touch cntl/request/response/done after this point,
// since the ownership is tranfered to the cache.
return ;
// (1.2) 并将响应设为 False, 要求 Leader 重发前面的日志
response -> set_success ( false );
response -> set_term (_current_term);
response -> set_last_log_index (last_index); // follower 的 lastLogIndex
return ;
// (2) 当前的日志前面的日志都已达到(即日志是顺序的)
// 这时候查看缓存中是否还有连续的日志,如果有的话,
// 将以缓存的日志再次调用 handle_append_entries_request,
// 并将这些日志一并写入磁盘
// check out-of-order cache
check_append_entries_cache (index);
// (3) 将日志持久化
FollowerStableClosure * c = new FollowerStableClosure (
cntl , request , response , done_guard . release () ,
this , _current_term);
_log_manager -> append_entries ( & entries , c);
优化 4:raft sync
sync 配置
日志每次 Batch Write
是先将日志写入 Page Cache
,最后再调用一次 fsync
操作。显然 sync
操作会增加时延,为此 braft 提供了一些配置项来控制日志的 sync
行为。用户可以根据业务数据丢失的容忍度高低,灵活调整这些配置以达到性能和可靠性之间的权衡。例如对于数据丢失容忍度较高的业务,可以选择将配置项 raft_sync
设置为 Flase
以下是控制日志 sync
的相关参数,前三项针对的是每次的 Batch Write
,最后两项针对的是 Segment
每次 Batch Write
后是否需要 sync
对于每次 Batch Write
后的 sync
策略,有立即 sync
)和按字节数 sync
的策略下,每多少字节 sync
每当日志写满一个 Segment
需要切换时是否需要 sync
,每个 Segment
默认存储 8MB
Batch Write
写入 Page Cache
后会调用 Segment::sync
进行 sync
Copy int Segment :: sync ( bool will_sync , bool has_conf) {
if (will_sync) {
// (1) 如果 `raft_sync` 为 False,则直接返回
if ( ! FLAGS_raft_sync) {
return 0 ;
// (2) 如果是按字节 `sync` 并且当前未 `sync` 的字节小于配置值,则直接返回
if (FLAGS_raft_sync_policy == RaftSyncPolicy :: RAFT_SYNC_BY_BYTES
&& FLAGS_raft_sync_per_bytes > _unsynced_bytes
&& ! has_conf) {
return 0 ;
_unsynced_bytes = 0 ;
// (3) 调用 fsync(fd)
return raft_fsync (_fd);
return 0 ;
每当一个 Segment
文件写完后,都会调用 Segment::close
将其转换成 closed segment
Copy int Segment :: close ( bool will_sync) {
if (FLAGS_raft_sync_segments && will_sync) {
ret = raft_fsync (_fd);
优化 5:异步 Apply
Copy void on_apply (braft :: Iterator & iter) {
// A batch of tasks are committed, which must be processed through
// |iter|
for (; iter . valid (); iter . next ()) {
// This guard helps invoke iter.done()->Run() asynchronously to
// avoid that callback blocks the StateMachine.
braft :: AsyncClosureGuard closure_guard ( iter . done ());
// Parse operation from iter.data() and execute this operation
// op = parse(iter.data());
// result = process(op)
// The purpose of following logs is to help you understand the way
// this StateMachine works.
// Remove these logs in performance-sensitive servers.
LOG_IF (INFO , FLAGS_log_applied_task)
<< "Exeucted operation " << op
<< " and the result is " << result
<< " at log_index=" << iter . index ();
当日志被提交时,框架会串行调用用户状态机的 on_apply
,虽然这里做了 Batch
优化,但是对于那些不支持批量更新的状态机来说,仍然是低效的。为此,用户可以将 on_apply
需要注意的是,当 on_apply
异步后,需要处理好节点成为 Leader 时日志回放的问题。当节点刚成为 Leader 时,需要回放(on_apply