4.2 复制优化

优化 1:Batch

Batch 队列

从客户端调用 apply 接口提交 Task 到日志成功复制,并回调用户状态机的 on_apply 接口,日志依次经过了 Apply QueueDisk QueueApply Task Queue 这 3 个队列。这些队列都是 brpc 的 ExecutionQueue 实现,其消费函数都做了 batch 优化:

  • ApplyQueue: 用户提交的 Task 会进入该队列,在该队列的消费函数中会将这些 Task 对应的日志进行打包,并调用 LogManagerappend_entries 函数进行追加日志,打包的日志既用于持久化,也用于复制。默认一次打包 32 条日志。

  • DiskQueue: LogManager 在接收到这批日志后,需对其进行持久化处理,故会往该队列中提交一个持久化任务(一个任务对应一批日志);在该队列的消费函数中会将这些任务进行打包,将这批任务对应的所有日志写入磁盘。默认一次最多打包 256 个持久化任务,而每个任务最多包含 32 条日志,所以其一次 Bacth Write 最多会写入 256 * 32 = 8192 条日志对应的数据。当然其也受字节数限制,默认每次 Batch Write 最多写入 256KB

  • ApplyTaskQueue:当日志的复制数(包含持久化)达到 Quorum 后,会调用 on_committedApplyTaskQueue 中提交一个 ApplyTask(每个 ApplyTask 对应一批已提交的日志);在该队列的消费函数中会将这些 ApplyTask 打包成 Iterator,并作为参数回调用户状态机的 on_apply 函数。 默认一次最多打包 512 个 ApplyTask,而每个 ApplyTask 最多包含 32 条日志,所以每一次 on_apply 参数中的 Iterator 最多包含 512 * 32 = 16384 条日志。

从以上看出,日志的复制、持久化、应用,全链路都是经过 Batch 优化的。

Follower 的 Batch

以上讨论的是节点作为 Leader 时的 Batch 优化,当节点为 Follower 时,其优化也是一样的,因为其用的是相同的代码逻辑,唯一的区别在于:

  • ApplyQueue: Follower 不会接受用户提交的日志(Task),其批量日志来源于 Leader 的复制

  • DiskQueue: 日志批量落盘的逻辑是一样的,Follower 在接收到 Leader 的一批日志之后也是直接调用 LogManagerappend_entries 函数

  • ApplyTaskQueue: 批量 on_apply 的逻辑是一样的,区别在于 Follower 的 commitIndex 来源于 Leader 在 RPC 中携带的 commitIndex,并非通过自身的 Quorum 计算

相关配置

当然,框架也提供了一些配置项来调整这些 Batch 大小:

作用队列配置项默认值说明

ApplyQueue

raft_apply_batch

32

该打包大小影响复制、持久化以及on_apply;上限为 512

DiskQueue

raft_max_append_buffer_size

256 KB

每次 Batch Write 最大的字节数

ApplyTaskQueue

raft_fsm_caller_commit_batch

512

每次 on_apply 的最大日志数为 raft_apply_batch * raft_fsm_caller_commit_batch

另外,配置 raft_max_entries_size 也会影响每次复制(AppendEntries 请求)携带的日志数量,其默认值为 1024

具体实现

ApplyQueue 消费函数:

int NodeImpl::execute_applying_tasks(
        void* meta, bthread::TaskIterator<LogEntryAndClosure>& iter) {
    ...
    // (1) FLAGS_raft_apply_batch 默认为 32
    // TODO: the batch size should limited by both task size and the total log size
    const size_t batch_size = FLAGS_raft_apply_batch;
    // (2) 定义一个 tasks 数组,数组大小 = min(batch_size, 256)
    DEFINE_SMALL_ARRAY(LogEntryAndClosure, tasks, batch_size, 256);
    size_t cur_size = 0;
    NodeImpl* m = (NodeImpl*)meta;
    for (; iter; ++iter) {
        if (cur_size == batch_size) {
            m->apply(tasks, cur_size);  // (4) 调用批量 apply 函数
            cur_size = 0;
        }
        tasks[cur_size++] = *iter;  // (3) batch 打包
    }
    ...
    return 0;
}

DiskQueue 消费函数:

int LogManager::disk_thread(void* meta,
                            bthread::TaskIterator<StableClosure*>& iter) {
    ...
    // (1) 定义 AppendBatcher 的 Batch 大小为 256
    StableClosure* storage[256];
    AppendBatcher ab(storage, ARRAY_SIZE(storage), &last_id, log_manager);

    for (; iter; ++iter) {
                // ^^^ Must iterate to the end to release to corresponding
                //     even if some error has occurred
        StableClosure* done = *iter;
        ...
        if (!done->_entries.empty()) {
            ab.append(done);  // (2) batch 打包持久化任务
        } else {
            ab.flush();  // (3) 批量持久化
            ...
        }
    }
    ...
    return 0;
}

class AppendBatcher {
public:
    ...
    void flush() {
        if (_size > 0) {
            // 写入持久化存储
            _lm->append_to_storage(&_to_append, _last_id, &metric);
            ...
        }
        ...
    }

    void append(LogManager::StableClosure* done) {
        // FLAGS_raft_max_append_buffer_size 默认为 256KB
        // 一次 Batch Write 写入的字节数不能超过该配置值
        if (_size == _cap ||
                _buffer_size >= (size_t)FLAGS_raft_max_append_buffer_size) {
            flush();
        }
        _storage[_size++] = done;
        _to_append.insert(_to_append.end(),
                         done->_entries.begin(), done->_entries.end());
        for (size_t i = 0; i < done->_entries.size(); ++i) {
            _buffer_size += done->_entries[i]->data.length();
        }
    }
}

ApplyTaskQueue 消费函数:

int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
    FSMCaller* caller = (FSMCaller*)meta;
    ...
    int64_t max_committed_index = -1;
    int64_t counter = 0;
    // FLAGS_raft_fsm_caller_commit_batch 默认为 512
    size_t  batch_size = FLAGS_raft_fsm_caller_commit_batch;
    for (; iter; ++iter) {
        if (iter->type == COMMITTED && counter < batch_size) {
            if (iter->committed_index > max_committed_index) {
                max_committed_index = iter->committed_index;
                counter++;
            }
        } else {
            if (max_committed_index >= 0) {
                caller->_cur_task = COMMITTED;
                caller->do_committed(max_committed_index);
                max_committed_index = -1;
                counter = 0;
                batch_size = FLAGS_raft_fsm_caller_commit_batch;
            }
            ...
        }
    }
    ...
    return 0;
}

// 生成 Iterator,调用用户状态机的 `on_apply`
void FSMCaller::do_committed(int64_t committed_index) {
    ...
    IteratorImpl iter_impl(_fsm, _log_manager, &closure, first_closure_index,
                 last_applied_index, committed_index, &_applying_index);
    for (; iter_impl.is_good();) {
        ...
        Iterator iter(&iter_impl);
        _fsm->on_apply(iter);
        ...
        // Try move to next in case that we pass the same log twice.
        iter.next();
    }
    ...
}

优化 2:并行持久化日志

持久化与复制

在 Raft 的实现中,Leader 需要先在本地持久化日志,再向所有的 Follower 复制日志,显然这样的实现具有较高的时延。特别地客户端的写都要经过 Leader,导致 Leader 的压力会变大,从而导致 IO 延迟变高成为慢节点,而本地持久化也会阻塞后续的 Follower 复制。

所以在 braft 中,Leader 本地持久化和 Follower 复制是并行的,即 Leader 会先将日志写入内存,同时异步地进行持久化和向 Follower 复制。并且只要大多数节点写入成功就可以应用日志,不必等待 Leader 本地持久化成功。

具体实现

具体的实现我们已经在上一节中详细介绍过了,参见 <4.1 复制流程>

优化 3:流水线复制

pipeline

Raft 默认是串行复制日志,需要等待一个 AppendEntries 发送成功后再发送下一个,显然这样不是最高效的。可以采用 Pipeline 的复制方式,即 Leader 发送完 AppendEntries 请求后不必等待其响应,立马发送一下批日志。当然,这样的实现对于接受端(Follower)来说,可能会带来乱序、空洞等问题,为此,braft 在 Follower 端引入了日志缓存,将不是顺序的日志先缓存起来,待其前面的日志都接受到后再写入该日志,以达到日志连续的目的。

特别需要注意的是,pipeline 优化默认是关闭的,需要用户通过以下 2 个配置项开启:

DEFINE_bool(raft_enable_append_entries_cache, false,
            "enable cache for out-of-order append entries requests, should used when "
            "pipeline replication is enabled (raft_max_parallel_append_entries_rpc_num > 1).");

DEFINE_int32(raft_max_parallel_append_entries_rpc_num, 1,
             "The max number of parallel AppendEntries requests");

具体实现

Leader 端 pipeline 的流程如下:

  • (1) 发送 AppendEntries 请求时会判断并行的请求数(flying)是否达到并行上限

    • (1.1) 如果是的话,直接返回;等待收到响应后回调 _on_rpc_returnedflying 计数减一

    • (1.2) 否则继续发送 AppendEntries 请求,并将 flying 计数加一

  • (2) 发送完后,继续判断 flying 数是否已达限,是的话如同步骤(1.1)

  • (3) 否则,判断是否还有日志可以发送:

    • (3.1) 有的话,重复步骤(1)

    • (3.2) 否则注册 waiter 等待新日志到来

具体流程见下面代码解析:

void Replicator::_send_entries() {
    // (1) 如果已经发送的 AppendEntries 请求数大于等于 `raft_max_parallel_append_entries_rpc_num`,
    //     则返回等待 `_on_rpc_returned`
    if (_flying_append_entries_size >= FLAGS_raft_max_entries_size ||
        _append_entries_in_fly.size() >= (size_t)FLAGS_raft_max_parallel_append_entries_rpc_num ||
        _st.st == BLOCKING) {
        ...
        return;
    }
    ...
    // (2) 否则发送 AppendEntries 请求
    google::protobuf::Closure* done = brpc::NewCallback(
                _on_rpc_returned, _id.value, cntl.get(),
                request.get(), response.get(), butil::monotonic_time_ms());
    RaftService_Stub stub(&_sending_channel);
    stub.append_entries(cntl.release(), request.release(),
                        response.release(), done);
    // (3) 发送完后调用 `_wait_more_entries`
    _wait_more_entries();
}

// (4) `_wait_more_entries` 首先判断已经并行的 `AppendEntries` 请求数是否已达上限
//     (4.1) 如果已达上限,则直接返回,等待收到响应后回调 `_on_rpc_returned` 即可
//     (4.2) 否则调用 LogManager::wait 判断是否还有日志没同步
//           如果还有日志,则继续发送;否则创建 waiter 在后台等待新日志到来
void Replicator::_wait_more_entries() {
    if (_wait_id == 0 && FLAGS_raft_max_entries_size > _flying_append_entries_size &&
        (size_t)FLAGS_raft_max_parallel_append_entries_rpc_num > _append_entries_in_fly.size()) {
        _wait_id = _options.log_manager->wait(
                _next_index - 1, _continue_sending, (void*)_id.value);
        ...
    }
    ...
}

// (5) LogManager::wait 会调用 LogManager::notify_on_new_log
LogManager::WaitId LogManager::wait(
        int64_t expected_last_log_index,
        int (*on_new_log)(void *arg, int error_code), void *arg) {
    ...
    return notify_on_new_log(expected_last_log_index, wm);
}

// (6) 判断是否还有日志没同步:
LogManager::WaitId LogManager::notify_on_new_log(
        int64_t expected_last_log_index, WaitMeta* wm) {
    // (6.1) 如果有的话,调用 run_on_new_log 继续发送
    if (expected_last_log_index != _last_log_index || _stopped) {
        ...
        if (bthread_start_urgent(&tid, NULL, run_on_new_log, wm) != 0) {
            ...
        }
        return 0;  // Not pushed into _wait_map
    }
    // (6.2) 没有可发送的日志,创建 waiter 在后台等待新日志
    if (_next_wait_id == 0) {  // skip 0
        ++_next_wait_id;
    }
    const int wait_id = _next_wait_id++;
    _wait_map[wait_id] = wm;
    return wait_id;
}

// (7) run_on_new_log 会调用 `_continue_sending`
void* LogManager::run_on_new_log(void *arg) {
    WaitMeta* wm = (WaitMeta*)arg;
    wm->on_new_log(wm->arg, wm->error_code);  // on_new_log: _continue_sending
    ...
    return NULL;
}

// (8) 而 _continue_sending 则调用 `_send_entries` 继续发送日志
int Replicator::_continue_sending(void* arg, int error_code) {
    ...
    r->_send_entries();
    ...
}

Follower 接收到 AppendEntries 请求后,其处理逻辑如下:

  • 对于到来的日志,如果其前面的日志没有接受到,先将其缓存起来,并要求 Leader 重发前面的日志

  • 待前面的日志都达到后,再将缓存中在其之后的顺序的日志一起写入磁盘

void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
                                             const AppendEntriesRequest* request,
                                             AppendEntriesResponse* response,
                                             google::protobuf::Closure* done,
                                             bool from_append_entries_cache) {
    // (1) 当到来的日志的前面的日志没有接受到时,先将其缓存起来
    if (local_prev_log_term != prev_log_term) {
        int64_t last_index = _log_manager->last_log_index();
        int64_t saved_term = request->term();
        int     saved_entries_size = request->entries_size();
        std::string rpc_server_id = request->server_id();
        // (1.1) 调用 handle_out_of_order_append_entries 将日志缓存起来
        if (!from_append_entries_cache &&
            handle_out_of_order_append_entries(
                    cntl, request, response, done, last_index)) {
            // It's not safe to touch cntl/request/response/done after this point,
            // since the ownership is tranfered to the cache.
            ...
            return;
        }

        // (1.2) 并将响应设为 False, 要求 Leader 重发前面的日志
        response->set_success(false);
        response->set_term(_current_term);
        response->set_last_log_index(last_index);  // follower 的 lastLogIndex
        ...
        return;
    }

    // (2) 当前的日志前面的日志都已达到(即日志是顺序的)
    //     这时候查看缓存中是否还有连续的日志,如果有的话,
    //     将以缓存的日志再次调用 handle_append_entries_request,
    //     并将这些日志一并写入磁盘
    // check out-of-order cache
    check_append_entries_cache(index);

    // (3) 将日志持久化
    FollowerStableClosure* c = new FollowerStableClosure(
            cntl, request, response, done_guard.release(),
            this, _current_term);
    _log_manager->append_entries(&entries, c);
}

优化 4:raft sync

sync 配置

日志每次 Batch Write 是先将日志写入 Page Cache,最后再调用一次 fsync 操作。显然 sync 操作会增加时延,为此 braft 提供了一些配置项来控制日志的 sync 行为。用户可以根据业务数据丢失的容忍度高低,灵活调整这些配置以达到性能和可靠性之间的权衡。例如对于数据丢失容忍度较高的业务,可以选择将配置项 raft_sync 设置为 Flase,这将有助于提升性能。

以下是控制日志 sync 的相关参数,前三项针对的是每次的 Batch Write,最后两项针对的是 Segment

配置项说明默认值

raft_sync

每次 Batch Write 后是否需要 sync

True

raft_sync_policy

对于每次 Batch Write 后的 sync 策略,有立即 syncRAFT_SYNC_IMMEDIATELY)和按字节数 sync (RAFT_SYNC_BY_BYTES)

RAFT_SYNC_IMMEDIATELY

raft_sync_per_bytes

RAFT_SYNC_BY_BYTES 的策略下,每多少字节 sync 一次

INT32_MAX

raft_sync_segments

每当日志写满一个 Segment 需要切换时是否需要 sync,每个 Segment 默认存储 8MB 的日志

False

raft_max_segment_size

单个日志 Segment 大小

8MB

具体实现

Batch Write 写入 Page Cache 后会调用 Segment::sync 进行 sync 操作:

int Segment::sync(bool will_sync, bool has_conf) {
    ...
    if (will_sync) {
        // (1) 如果 `raft_sync` 为 False,则直接返回
        if (!FLAGS_raft_sync) {
            return 0;
        }

        // (2) 如果是按字节 `sync` 并且当前未 `sync` 的字节小于配置值,则直接返回
        if (FLAGS_raft_sync_policy == RaftSyncPolicy::RAFT_SYNC_BY_BYTES
            && FLAGS_raft_sync_per_bytes > _unsynced_bytes
            && !has_conf) {
            return 0;
        }
        _unsynced_bytes = 0;

        // (3) 调用 fsync(fd)
        return raft_fsync(_fd);
    }
    return 0;
}

每当一个 Segment 文件写完后,都会调用 Segment::close 将其转换成 closed segment

int Segment::close(bool will_sync) {
    ...
    if (FLAGS_raft_sync_segments && will_sync) {
        ret = raft_fsync(_fd);
    }
    ...
}

优化 5:异步 Apply

void on_apply(braft::Iterator& iter) {
    // A batch of tasks are committed, which must be processed through
    // |iter|
    for (; iter.valid(); iter.next()) {
        // This guard helps invoke iter.done()->Run() asynchronously to
        // avoid that callback blocks the StateMachine.
        braft::AsyncClosureGuard closure_guard(iter.done());
        // Parse operation from iter.data() and execute this operation
        // op = parse(iter.data());
        // result = process(op)

        // The purpose of following logs is to help you understand the way
        // this StateMachine works.
        // Remove these logs in performance-sensitive servers.
        LOG_IF(INFO, FLAGS_log_applied_task)
                << "Exeucted operation " << op
                << " and the result is " << result
                << " at log_index=" << iter.index();
    }
}

当日志被提交时,框架会串行调用用户状态机的 on_apply,虽然这里做了 Batch 优化,但是对于那些不支持批量更新的状态机来说,仍然是低效的。为此,用户可以将 on_apply 函数异步执行,让那些可以并行的操作尽可能并行起来。

需要注意的是,当 on_apply 异步后,需要处理好节点成为 Leader 时日志回放的问题。当节点刚成为 Leader 时,需要回放(on_apply)之前任期的日志,这时候需要将这些日志全部应用到状态机后才能处理读取操作,不然可能会违背线性一致性。因为这些日志在之前的任期可能被提交了,客户端能读取到,而在新任期回放的时候,由于这些日志是异步执行的(on_apply 返回了,但还在异步执行中),可能还没应用到状态机,这时候客户端去读取可能是读取不到的。

参考

Last updated