流程详解
流程概览
用户通过 add_peer
/remove_peer
/change_peer
接口向 Leader 提交配置变更
若当前 Leader 有配置变更正在进行,则返回 EBUSY
;若新老配置相同,则直接返回成功
进入日志追赶(CaughtUp
)阶段:
3.1 将新老配置做 diff
获得新加入的节点列表;若没有新增节点,则直接进入下一阶段
3.2 为每个新节点创建 Replicator
,并对所有新节点定期广播心跳
3.3 为每个新节点安装最新的快照,每当一个节点完成快照的安装,则进行步骤 3.4 的判断
3.4 判断该节点日志与当前 Leader 之间的差距:
3.4.1 若小于一定值(默认 1000 条日志),则将其从追赶列表中移除
3.4.2 若追赶列表为空,即所有新节点都追赶上了 Leader 的日志,则进入下一阶段
3.5 向每个新节点同步日志,每当一个节点成功同步一部分日志,重复步骤 3.4
3.6 若追赶阶段任意节点失败,则本次配置变更标记为失败
进入联合共识(Joint-Consensus
)阶段:
4.1 Leader 应用联合配置(即 C{old,new}
)为当前配置,以该配置视角进行选举和日志复制:
4.1.1 在该阶段,选举需要同时在新老集群都 达到 Qourum
4.1.2 在该阶段,日志复制需要同时在新老集群都 达到 Qourum
才能提交
4.2 Leader 将联合配置日志(即 C{old,new}
)同时复制给新老集群;当然也复制其他日志
4.3 待联合配置日志在新老集群都达到 Qourum
,则提交并应用该日志:
4.3.1 调用该日志的 Closure
,进入下一阶段
进入同步新配置(Stable
)阶段:
5.1 Leader 应用新配置(即 C{new}
)为当前配置,以该配置视角进行选举和日志复制:
5.1.1 在该阶段,选举只需 在新集群达到 Qourum
5.1.2 在该阶段,日志只需 在新集群达到 Qourum
即可提交
5.1.3 在该阶段,日志仍会被复制给老集群,因为老集群节点的 Replicator
还未停止
5.2 Leader 将新配置日志(即 C{new}
)同时复制给新老集群;当然也复制其他日志
5.3 待新配置日志在新集群中达到 Qourum
,则提交并应用该日志:
5.3.1 以新配置作为参数,调用用户状态机的 on_configuration_committed
5.3.2 调用该配置的 Closure
,进入下一阶段
进行清理工作:
6.1 Leader 移除不在集群中的 Replicator
,不再向它们发送心跳和日志
6.3 若当前 Leader 不在新配置中,则 step_down
变为 Follower:
6.3.1 调用用户状态机的 on_leader_stop
6.3.2 向拥有最长日志的节点发送 TimeoutNow
请求,让其立马进行选举
对于不在集群中的节点:
7.1 若其拥有了新配置日志,则其发现自身不在集群中,则不会发起选举
7.2 若其未拥有新配置日志,则会超时发起选举,但由于日志落后,其不会通过 PreVote
干扰集群
上图来自 Raft 作者的博士论文 4.2.3 Disruptive servers
,描述的是一个被移除的节点干扰集群,造成集群重新选举的场景:
由节点 S1,S2,S3,S4
组成的集群中,S4
为 Leader
用户执行配置变更,调用 remove_peer
将节点 S1
移除集群
当 S4
同步新配置时,已将新配置持久化到本地,但未复制给 S2,S3
由于 S1
不再接收 S4
的心跳,触发其选举,并在 PreVote
阶段获得 S1,S2,S3
的支持
S1
增加自身 term
进行 RequestVote
,S4
发现其 term
比自己高,则会 step down
在<3.2 选举优化> 中提到过,这种场景 PreVote
无法阻止,但是可以利用 Check Quorum
解决。但是其实在目前 braft 的实现中,这种场景是不会出现的,因为 Leader 是将新配置(即 C{new}
)复制到新集群的大多数集群后,才开始移除不在新集群中的 Replicator
,并停止广播心跳。
相关接口
Copy class Node {
public :
// list peers of this raft group, only leader retruns ok
// [NOTE] when list_peers concurrency with add_peer/remove_peer, maybe return peers is staled.
// because add_peer/remove_peer immediately modify configuration in memory
butil :: Status list_peers (std :: vector < PeerId > * peers);
// Add a new peer to the raft group. done->Run() would be invoked after this
// operation finishes, describing the detailed result.
void add_peer ( const PeerId & peer , Closure * done);
// Remove the peer from the raft group. done->Run() would be invoked after
// this operation finishes, describing the detailed result.
void remove_peer ( const PeerId & peer , Closure * done);
// Change the configuration of the raft group to |new_peers| , done->Run()
// would be invoked after this operation finishes, describing the detailed
// result.
void change_peers ( const Configuration & new_peers , Closure * done);
};
阶段一:追赶日志
调用接口
用户调用各接口要求 Leader 进行配置变更。在这些接口中,会生成新配置和老配置,最终都会调用 on_configuration_committed
执行变更:
Copy void NodeImpl :: add_peer ( const PeerId & peer , Closure * done) {
...
Configuration new_conf = _conf . conf;
new_conf . add_peer (peer);
return unsafe_register_conf_change ( _conf . conf , new_conf , done);
}
void NodeImpl :: remove_peer ( const PeerId & peer , Closure * done) {
...
Configuration new_conf = _conf . conf;
new_conf . remove_peer (peer);
return unsafe_register_conf_change ( _conf . conf , new_conf , done);
}
void NodeImpl :: change_peers ( const Configuration & new_peers , Closure * done) {
...
return unsafe_register_conf_change ( _conf . conf , new_peers , done);
}
开始变更
而在 unsafe_register_conf_change
函数中首先做进行一些判断,决定是否要执行变更。若要进行变更,则调用 ConfigurationCtx::start
正式开始变更:
Copy void NodeImpl :: unsafe_register_conf_change ( const Configuration & old_conf ,
const Configuration & new_conf ,
Closure * done) {
...
// (1) 如果当前 Leader 已经有配置变更在进行,则返回 EBUSY
if ( _conf_ctx . is_busy ()) {
...
if (done) {
done -> status () . set_error (EBUSY , "Doing another configuration change" );
run_closure_in_bthread (done);
}
return ;
}
...
// (2) 如果新老配置一样,则直接返回
if ( _conf . conf . equals (new_conf)) {
run_closure_in_bthread (done);
return ;
}
...
// (3) 调用 ConfigurationCtx::start 开始变更
return _conf_ctx . start (old_conf , new_conf , done);
}
ConfigurationCtx::start
的主要流程详见以下注释:
Copy void NodeImpl :: ConfigurationCtx :: start ( const Configuration & old_conf ,
const Configuration & new_conf ,
Closure * done) {
// (1) 保存接口的 Closure,将当前阶段设为 `STAGE_CATCHING_UP`
_done = done;
_stage = STAGE_CATCHING_UP;
// (2) 将新老配置做 diff,获得新增节点列表
old_conf . list_peers ( & _old_peers);
new_conf . list_peers ( & _new_peers);
Configuration adding;
Configuration removing;
new_conf . diffs (old_conf , & adding , & removing);
_nchanges = adding . size () + removing . size ();
// (3) 如果没有新增节点,则直接进入下一阶段(联合共识)
if ( adding . empty ()) {
...
return next_stage ();
}
adding . list_peers ( & _adding_peers);
for (std :: set< PeerId > :: const_iterator iter
= _adding_peers . begin (); iter != _adding_peers . end (); ++ iter) {
// (4) 为每个新增节点创建 Replicator,详见以下 <创建 Replicator>
if ( _node -> _replicator_group . add_replicator ( * iter) != 0 ) {
...
return on_caughtup (_version , * iter , false );
}
// (5) 为每个新增节点保存 CatchupClosure 用来判断追赶进度,详见以下 <保存 Closure>
OnCaughtUp * caught_up = new OnCaughtUp (
_node , _node -> _current_term , * iter , _version);
...
if ( _node -> _replicator_group . wait_caughtup (
* iter , _node -> _options . catchup_margin , & due_time , caught_up) != 0 ) {
...
}
}
创建 Replicator
ReplicatorGroup::add_replicator
会为每个节点创建 Replicator
,并将其启动,Replicator
负责发送心跳和同步日志。
关于 Replicator
的创建,我们已经在<3.1 选举流程>中详细介绍过了,见创建 Replicator :
Copy int ReplicatorGroup :: add_replicator ( const PeerId & peer) {
...
options . replicator_status = new ReplicatorStatus;
...
if (Replicator :: start (options , & rid) != 0 ) {
...
return - 1 ;
}
_rmap [peer] = { rid , options . replicator_status };
return 0 ;
}
保存 Closure
在<开始变更>的主干函数中会调用 ReplicatorGroup::wait_caughtup
为每个新增节点保存 OnCaughtUp
,其会在安装快照或同步日志后被调用,来判断追赶进度是否可以进入下一阶段。具体流程见以下注释:
Copy int ReplicatorGroup :: wait_caughtup ( const PeerId & peer ,
int64_t max_margin , const timespec * due_time ,
CatchupClosure * done) {
...
Replicator :: wait_for_caught_up (rid , max_margin , due_time , done);
return ;
}
void Replicator :: wait_for_caught_up ( ReplicatorId id ,
int64_t max_margin ,
const timespec * due_time ,
CatchupClosure * done) {
...
// 保存 CatchupClosure 为 `OnCaughtUp`
r -> _catchup_closure = done;
}
安装快照
当 Replicator
被创建后,其就会调用 _send_entries
开始复制日志。由于新加入的节点需要的日志很大概率已经被快照压缩了,所以需要向其安装快照。安装快照的流程我们已经在<5.2 安装快照> 中详细介绍过来,这里只阐述相关流程:
Copy void Replicator :: _send_entries () {
...
// (1) 如果日志已经被压缩了,则安装快照
if ( _fill_common_fields ( request . get () , _next_index - 1 , false ) != 0 ) {
...
// (2) 开始安装快照
return _install_snapshot ();
}
...
}
// (3) 待快照安装完成,收到节点的 `InstallSnapshot` 响应后,
// 会调用 _on_install_snapshot_returned
void Replicator :: _on_install_snapshot_returned (
ReplicatorId id , brpc :: Controller * cntl ,
InstallSnapshotRequest * request ,
InstallSnapshotResponse * response) {
...
r -> _has_succeeded = true ;
// (4) 调用 Replicator::_notify_on_caught_up 判断日志差距
// 来决定继续同步日志,还是进入下一阶段,
// 详见以下<判断日志差距>
r -> _notify_on_caught_up ( 0 , false );
...
return r -> _send_entries ();
}
同步日志
如果日志差距仍大于配置值,则先继续调用 _send_entries
同步日志:
Copy void Replicator :: _send_entries () {
...
// (1) 发送 AppendEntries 请求同步日志,并设置回调函数为 `_on_rpc_returned`
google :: protobuf :: Closure * done = brpc :: NewCallback (
_on_rpc_returned , _id . value , cntl . get () ,
request . get () , response . get () , butil :: monotonic_time_ms ());
RaftService_Stub stub ( & _sending_channel);
stub . append_entries ( cntl . release () , request . release () ,
response . release () , done);
...
}
// (2) 收到 AppendEntries 响应,会调用 _on_rpc_returned
void Replicator :: _on_rpc_returned ( ReplicatorId id , brpc :: Controller * cntl ,
AppendEntriesRequest * request ,
AppendEntriesResponse * response ,
int64_t rpc_send_time) {
...
r -> _has_succeeded = true ;
r -> _notify_on_caught_up ( 0 , false );
...
// (4) 调用 Replicator::_notify_on_caught_up 判断日志差距
// 来决定继续同步日志,还是进入下一阶段,
// 详见以下<判断日志差距>
r -> _send_entries ();
return ;
}
判断日志差距
成功安装快照或每次成功同步一批日之后,都会调用 _notify_on_caught_up
判断新加入节点日志与当前 Leader 的差距。若差距小于一定值(默认为 1000),
Copy // (1) 调用 _notify_on_caught_up 判断日志差距
void Replicator :: _notify_on_caught_up ( int error_code , bool before_destroy) {
// (2) 判断新节点日志与当前 Leader 的差距是否小于一定值(默认为 1000,用户可通过 NodeOption 配置)
// 若不满足条件,则继续同步日志
if ( ! _is_catchup ( _catchup_closure -> _max_margin)) {
return ;
}
...
// (3) 否则,调用 之前保存的 CatchupClosure,即 OnCaughtUp
Closure * saved_catchup_closure = _catchup_closure;
...
return run_closure_in_bthread (saved_catchup_closure);
}
class OnCaughtUp : public CatchupClosure {
public :
...
virtual void Run () {
// (4) 调用 NodeImpl::on_caughtup
_node -> on_caughtup (_peer , _term , _version , status ());
...
};
...
};
void NodeImpl :: on_caughtup ( const PeerId & peer , int64_t term ,
int64_t version , const butil :: Status & st) {
...
if ( st . ok ()) { // Caught up successfully
// (5) 调用 NodeImpl::ConfigurationCtx::on_caughtup
_conf_ctx . on_caughtup (version , peer , true );
return ;
}
...
}
void NodeImpl :: ConfigurationCtx :: on_caughtup (
int64_t version , const PeerId & peer_id , bool succ) {
...
// (6) 将已经满足条件的节点从 _adding_peers 中移除
// 如果所有节点均已追赶成功,则调用 next_stage 进入下一阶段(联合共识)
if (succ) {
_adding_peers . erase (peer_id);
if ( _adding_peers . empty ()) {
return next_stage ();
}
return ;
}
...
}
阶段二:联合共识
进入新阶段
当所有新增节点都追赶上了 Leader 的日志,则调用 next_stage
进入下一阶段,详见以下注释:
Copy void NodeImpl :: ConfigurationCtx :: next_stage () {
CHECK ( is_busy ());
switch (_stage) {
case STAGE_CATCHING_UP : // (1) 执行该分支
if (_nchanges > 1 ) {
_stage = STAGE_JOINT; // (2) 进入联合共识阶段
Configuration old_conf (_old_peers);
return _node -> unsafe_apply_configuration ( // (3) 调用 unsafe_apply_configuration 开始变更
Configuration (_new_peers) , & old_conf , false );
}
// Skip joint consensus since only one peer has been changed here. Make
// it a one-stage change to be compitible with the legacy
// implementation.
case STAGE_JOINT :
_stage = STAGE_STABLE;
return _node -> unsafe_apply_configuration (
Configuration (_new_peers) , NULL , false );
case STAGE_STABLE :
{
bool should_step_down =
_new_peers . find ( _node -> _server_id) == _new_peers . end ();
butil :: Status st = butil :: Status :: OK ();
reset ( & st);
if (should_step_down) {
_node -> step_down ( _node -> _current_term , true ,
butil :: Status (ELEADERREMOVED , "This node was removed" ));
}
return ;
}
case STAGE_NONE :
CHECK ( false ) << "Can't reach here" ;
return ;
}
}
开始变更
unsafe_apply_configuration
具体会执行以下这些工作,详见以下注释,其中的每一项工作我们将在之后逐一介绍:
Copy void NodeImpl :: unsafe_apply_configuration ( const Configuration & new_conf ,
const Configuration * old_conf ,
bool leader_start) {
// (1) 生产配置日志,根据 new_conf 和 old_conf 生成联合配置日志或是新配置日志
LogEntry * entry = new LogEntry ();
entry -> AddRef ();
entry -> id . term = _current_term;
entry -> type = ENTRY_TYPE_CONFIGURATION;
entry -> peers = new std :: vector < PeerId > ;
new_conf . list_peers ( entry -> peers);
if (old_conf) {
entry -> old_peers = new std :: vector < PeerId > ;
old_conf -> list_peers ( entry -> old_peers);
}
// (2) 设置日志应用后的回调函数,
// 即该配置日志被复制并成功应用后,就会调用该回调函数
ConfigurationChangeDone * configuration_change_done =
new ConfigurationChangeDone ( this , _current_term , leader_start , _leader_lease . lease_epoch ());
// (3) 为这个配置日志初始化投票规则,这是实现在新老集群同时达到 Quorum 才能提交
// 还是只需要在新集群达到 Quorum 就可以提交的关键
// Use the new_conf to deal the quorum of this very log
_ballot_box -> append_pending_task (new_conf , old_conf , configuration_change_done);
...
// (4) 将配置日志追加到 LogManager,其会唤醒 Replicator 向 Follower 同步日志
_log_manager -> append_entries ( & entries ,
new LeaderStableClosure (
NodeId (_group_id , _server_id) ,
1 u , _ballot_box));
// (5) 将配置(联合配置或新配置)设为当前节点配置
_log_manager -> check_and_set_configuration ( & _conf);
}
初始化投票规则
Copy int BallotBox :: append_pending_task ( const Configuration & conf , const Configuration * old_conf ,
Closure * closure) {
// (1) 调用 Ballot::init
Ballot bl;
if ( bl . init (conf , old_conf) != 0 ) {
return - 1 ;
}
...
}
int Ballot :: init ( const Configuration & conf , const Configuration * old_conf) {
_peers . clear ();
_old_peers . clear ();
// (1)初始化 quorum 和 old_quorum 为 0
_quorum = 0 ;
_old_quorum = 0 ;
// (2) 如果是新配置,只设置 _quorum
_peers . reserve ( conf . size ());
for (Configuration :: const_iterator
iter = conf . begin (); iter != conf . end (); ++ iter) {
_peers . push_back ( * iter);
}
_quorum = _peers . size () / 2 + 1 ;
// (3) 如果是联合配置,_old_quorum 也将被设置
if ( ! old_conf) {
return 0 ;
}
_old_peers . reserve ( old_conf -> size ());
for (Configuration :: const_iterator
iter = old_conf -> begin (); iter != old_conf -> end (); ++ iter) {
_old_peers . push_back ( * iter);
}
_old_quorum = _old_peers . size () / 2 + 1 ;
return 0 ;
}
应用联合配置
Leader 调用 check_and_set_configuration
将当前配置变为联合配置,即 C{old,new}
:
Copy bool LogManager :: check_and_set_configuration ( ConfigurationEntry * current) {
...
// (1) 从 _config_manager 中获取最新的配置
const ConfigurationEntry & last_conf = _config_manager -> last_configuration ();
// (2) 将其变为当前节点配置
if ( current -> id != last_conf . id) {
* current = last_conf;
return true ;
}
return false ;
}
// (3) 而 _config_manager 的配置是在之前往 LogManager 追加配置日志时,保存在 _config_manager 中的
void LogManager :: append_entries (
std :: vector < LogEntry * > * entries , StableClosure * done) {
...
for ( size_t i = 0 ; i < entries -> size (); ++ i) {
...
if (( * entries)[i] -> type == ENTRY_TYPE_CONFIGURATION) {
ConfigurationEntry conf_entry ( * (( * entries)[i]));
_config_manager -> add (conf_entry);
}
}
...
}
复制日志
调用 LogManager::append_entries
追加配置日志(即 C{old,new}
) 后,Replicator
就会被唤醒向 Follower 同步日志。
Leader 会将普通日志和配置日志(即 C{old,new}
)复制给新老集群,每收到一个节点的成功响应,都会调用 BallotBox::commit_at
将对应日志的计算加一,如果其达到了 Quorum
则提交并应用该日志。具体的日志复制流程我们在之前的章节都详细介绍过了,详见<4.1 复制流程> 。
提交日志
Leader 本地持久化成功或每成功复制日志给一个 Follower,都会调用 BallotBox::commit_at 将对应日志的复制计数加一,如果达到 Quorum,则更新 commitIndex,并将其应用:
Copy int BallotBox :: commit_at (
int64_t first_log_index , int64_t last_log_index , const PeerId & peer) {
...
// (1) 将 index 在 [first_log_index, last_log_index] 之间的日志计数加一
int64_t last_committed_index = 0 ;
const int64_t start_at = std :: max (_pending_index , first_log_index);
Ballot :: PosHint pos_hint;
for ( int64_t log_index = start_at; log_index <= last_log_index; ++ log_index) {
Ballot & bl = _pending_meta_queue [log_index - _pending_index];
pos_hint = bl . grant (peer , pos_hint);
if ( bl . granted ()) { // (2) 该日志在新老集群都达到 Quorum,则提交该日志,见以下的 granted 函数
last_committed_index = log_index;
}
}
if (last_committed_index == 0 ) {
return 0 ;
}
...
_pending_index = last_committed_index + 1 ;
// (3) 更新 commitIndex
_last_committed_index . store (last_committed_index , butil :: memory_order_relaxed);
// (4) 调用 FSMCaller::on_committed 开始应用日志
// The order doesn't matter
_waiter -> on_committed (last_committed_index);
return 0 ;
}
granted
中的 _quorum
和 _old_quorum
都在以上的<初始化投票规则>中设为相应的值了:
Copy class Ballot {
public :
...
bool granted () const { return _quorum <= 0 && _old_quorum <= 0 ; }
...
}
应用日志
FSMCaller::on_committed
最终会调用 FSMCaller::do_committed
开始应用日志,具体见以下注释:
Copy void FSMCaller :: do_committed ( int64_t committed_index) {
...
IteratorImpl iter_impl (_fsm , _log_manager , & closure , first_closure_index ,
last_applied_index , committed_index , & _applying_index);
for (; iter_impl . is_good ();) {
...
// (1) 如果是新配置日志(即 C{new})被提交,则会调用状态机的 on_configuration_committed
// 如果是联合配置日志,则不会
if ( iter_impl . entry () -> type == ENTRY_TYPE_CONFIGURATION) {
if ( iter_impl . entry () -> old_peers == NULL ) {
// Joint stage is not supposed to be noticeable by end users.
_fsm -> on_configuration_committed (
Configuration ( * iter_impl . entry () -> peers) ,
iter_impl . entry () -> id . index);
}
...
// (2) 不管是那种配置日志被提交,都会调用 Closure
// 即 ConfigurationChangeDone
if ( iter_impl . done ()) {
iter_impl . done () -> Run ();
}
iter_impl . next ();
continue ;
}
Iterator iter ( & iter_impl);
_fsm -> on_apply (iter);
...
iter . next ();
}
...
}
回调 Closure
当配置日志被应用后,其会调用 ConfigurationChangeDone::Run
,而该函数最终会调用 ConfigurationCtx::next_stage
进入下一阶段:
Copy class ConfigurationChangeDone : public Closure {
public :
void Run () {
if ( status () . ok ()) {
_node -> on_configuration_change_done (_term);
...
}
...
}
...
};
void NodeImpl :: on_configuration_change_done ( int64_t term) {
...
_conf_ctx . next_stage ();
}
阶段三:同步新配置
进入新阶段
Copy void NodeImpl :: ConfigurationCtx :: next_stage () {
CHECK ( is_busy ());
switch (_stage) {
case STAGE_CATCHING_UP :
if (_nchanges > 1 ) {
_stage = STAGE_JOINT;
Configuration old_conf (_old_peers);
return _node -> unsafe_apply_configuration (
Configuration (_new_peers) , & old_conf , false );
}
// Skip joint consensus since only one peer has been changed here. Make
// it a one-stage change to be compitible with the legacy
// implementation.
case STAGE_JOINT : // (1) 执行该分支
_stage = STAGE_STABLE; // (2) 进入 Stable 阶段
return _node -> unsafe_apply_configuration ( // (3) 同样调用 unsafe_apply_configuration 开始变更
Configuration (_new_peers) , NULL , false );
case STAGE_STABLE :
{
bool should_step_down =
_new_peers . find ( _node -> _server_id) == _new_peers . end ();
butil :: Status st = butil :: Status :: OK ();
reset ( & st);
if (should_step_down) {
_node -> step_down ( _node -> _current_term , true ,
butil :: Status (ELEADERREMOVED , "This node was removed" ));
}
return ;
}
case STAGE_NONE :
CHECK ( false ) << "Can't reach here" ;
return ;
}
}
开始变更
应用新配置
复制日志
提交日志
应用日志
回调 Closure
以上这些步骤跟<阶段二:联合共识>的步骤是一样的,你可以从<开始变更> 开始重走一遍逻辑。
阶段四:清理工作
进入新阶段
待新配置被提交后,会调用 next_stage
进入清理阶段:
Copy void NodeImpl :: ConfigurationCtx :: next_stage () {
CHECK ( is_busy ());
switch (_stage) {
case STAGE_CATCHING_UP :
if (_nchanges > 1 ) {
_stage = STAGE_JOINT;
Configuration old_conf (_old_peers);
return _node -> unsafe_apply_configuration (
Configuration (_new_peers) , & old_conf , false );
}
// Skip joint consensus since only one peer has been changed here. Make
// it a one-stage change to be compitible with the legacy
// implementation.
case STAGE_JOINT :
_stage = STAGE_STABLE;
return _node -> unsafe_apply_configuration (
Configuration (_new_peers) , NULL , false );
case STAGE_STABLE : // (1) 执行该分支
{
bool should_step_down =
_new_peers . find ( _node -> _server_id) == _new_peers . end ();
// (2) 以成功状态码调用 reset 进行一些清理工作,
// 以及回调用户通过接口传递的 Closure
butil :: Status st = butil :: Status :: OK ();
reset ( & st);
// (3) 如果当前 Leader 不在新配置中,则调用 step_down
if (should_step_down) {
_node -> step_down ( _node -> _current_term , true ,
butil :: Status (ELEADERREMOVED , "This node was removed" ));
}
return ;
}
case STAGE_NONE :
CHECK ( false ) << "Can't reach here" ;
return ;
}
}
停止 Replicator
回调 Closure
reset
函数会执行一些清理工作以及回调接口的 Closure
,详见以下注释:
Copy void NodeImpl :: ConfigurationCtx :: reset (butil :: Status * st) {
// (1) 调用 NodeImpl::stop_replicator 停止不在新配置中的 Replicator
if (st && st -> ok ()) {
_node -> stop_replicator (_new_peers , _old_peers);
} else {
...
}
// (2) 清空一些状态
_new_peers . clear ();
_old_peers . clear ();
_adding_peers . clear ();
++ _version;
_stage = STAGE_NONE;
_nchanges = 0 ;
...
// (3) 若用户在调用接口([add|remove|change]_peers)时有传入 Closure,
// 则以成功状态回调 Closure
if (_done) {
...
_done -> status () = * st;
...
run_closure_in_bthread (_done);
...
}
}
stop_replicator
函数会清理不在新配置中 Follower 对应的 Replicator
,具体流程见以下注释:
Copy void NodeImpl :: stop_replicator ( const std :: set < PeerId > & keep ,
const std :: set < PeerId > & drop) {
for (std :: set< PeerId > :: const_iterator
iter = drop . begin (); iter != drop . end (); ++ iter) {
if ( keep . find ( * iter) == keep . end () && * iter != _server_id) {
// (1) 如果节点不在新配置中,则调用 `ReplicatorGroup::stop_replicator` 停止其 Replicator
_replicator_group . stop_replicator ( * iter);
}
}
}
int ReplicatorGroup :: stop_replicator ( const PeerId & peer) {
// (2) 找到节点最用的 Replicator
std :: map< PeerId , ReplicatorIdAndStatus > :: iterator iter = _rmap . find (peer);
...
ReplicatorId rid = iter -> second . id;
// Calling ReplicatorId::stop might lead to calling stop_replicator again,
// erase iter first to avoid race condition
_rmap . erase (iter); // (3) 将其从 ReplicatorGroup 中删除
return Replicator :: stop (rid); // (4) 调用 Replicator::stop
}
int Replicator :: stop ( ReplicatorId id) {
bthread_id_t dummy_id = { id };
...
// (5) 向 Replicator 对应的 bthread 发送 ESTOP 状态码
return bthread_id_error (dummy_id , ESTOP);
}
int Replicator :: _on_error ( bthread_id_t id , void* arg , int error_code) {
Replicator * r = (Replicator * )arg;
// (6) 接受到状态码后,会调用 _on_error 处理
if (error_code == ESTOP) {
...
bthread_timer_del ( r -> _heartbeat_timer); // (6.1) 停止向其发送心跳
r -> _options . log_manager -> remove_waiter ( r -> _wait_id); // (6.2) 停止向其复制日志
...
r -> _wait_id = 0 ;
...
return 0 ;
}
...
}
step_down
在 step_down
函数中会向一个日志最长的节点发送一个 TimeoutNow
请求,让其立马进行选举,该优化我们曾在<3.2 选举优化> 中提到过。step_down
的具体流程见以下注释:
Copy void NodeImpl :: step_down ( const int64_t term , bool wakeup_a_candidate ,
const butil :: Status & status) {
...
// (1) 调用用户状态机的 on_leader_stop
if (_state == STATE_LEADER) {
_fsm_caller -> on_leader_stop (status);
}
...
// (2) 将自身转变为 Follower
// soft state in memory
_state = STATE_FOLLOWER;
// (3) 向日志最长的节点发送 `TimeoutNow` 请求,让其立马进行选举
// stop stagging new node
if (wakeup_a_candidate) {
_replicator_group . stop_all_and_find_the_next_candidate (
& _waking_candidate , _conf);
// FIXME: We issue the RPC in the critical section, which is fine now
// since the Node is going to quit when reaching the branch
Replicator :: send_timeout_now_and_stop (
_waking_candidate , _options . election_timeout_ms);
} else {
...
}
...
// (4) 启动选举超时定时器,待其超时后会触发该节点进行选举
_election_timer . start ();
}
其他:故障恢复
当执行配置变更的 Leader 中途 Crash 了,新当选的 Leader 继续执行配置变更:
Copy // (1) 节点当选为 Leader
void NodeImpl :: become_leader () {
...
// (2) 调用 ConfigurationCtx::flush
_conf_ctx . flush ( _conf . conf , _conf . old_conf);
...
}
void NodeImpl :: ConfigurationCtx :: flush ( const Configuration & conf ,
const Configuration & old_conf) {
...
conf . list_peers ( & _new_peers);
if ( old_conf . empty ()) {
_stage = STAGE_STABLE;
_old_peers = _new_peers;
} else {
_stage = STAGE_JOINT;
old_conf . list_peers ( & _old_peers);
}
// (3) 依旧是调用 unsafe_apply_configuration 开始变更,详见以上 <开始变更>
_node -> unsafe_apply_configuration (conf , old_conf . empty () ? NULL : & old_conf ,
true );
}