流程详解
流程概览
当 Follower 落后太多或新节点加入集群时,就会触发安装快照,其流程如下:
Leader 向 Follower 发送 InstallSnapshot
请求,并停止向 Follower 同步日志
Follower 根据请求中的 URI
逐一向 Leader 下载快照对应的文件集:
2.2 发送 GetFile
请求获取 Leader 快照的元数据
2.3 创建 temp
目录用于保存下载的临时快照
2.3 根据元数据中的文件列表对比本地快照与远程快照的差异,获得下载文件列表
2.4 根据文件列表,逐一发送 GetFile
请求下载文件并保存至临时快照
2.5 待快照下载完成后,删除本地快照,并将临时快照 rename()
成正式快照
Follower 回调用户状态机的 on_snapshot_load
加载快照
等待快照加载完毕后:
4.1 更新 applyIndex
为快照元数据中的 lastIncludedIndex
4.2 删除 logIndex<=lastIncludedIndex
的日志(即全部日志)
Follower 向 Leader 发送成功的 InstallSnapshot
响应
Leader 收到成功响应后更新 Follower 的 nextIndex
为快照的 lastIncludedIndex
+ 1
Leader 从 nextIndex
开始继续向 Follower 发送日志
大文件下载
Copy message GetFileRequest {
required int64 reader_id = 1 ;
required string filename = 2 ;
required int64 count = 3 ; // 分片下载 length
required int64 offset = 4 ; // 分片下载 offset
optional bool read_partly = 5 ;
}
Follower 通过发送 GetFileRequest
从 Leader 下载文件,而当快照的文件较大时(超过单个分片大小),这时候就会开启分片下载。每次通过设置 GetFileRequest
中的 count
和 offset
来实现分片下载,默认每个分片为 128KB
,其受配置项 raft_max_byte_count_per_rpc
控制:
Copy DEFINE_int32 (raft_max_byte_count_per_rpc , 1024 * 128 /*128K*/ ,
"Maximum of block size per RPC" );
BRPC_VALIDATE_GFLAG (raft_max_byte_count_per_rpc , brpc :: PositiveInteger);
断点续传
Follwer 从 Leader 下载的快照文件会保存在临时快照 temp
目录中,如果 Follower 下载了一部分后 Crash,在重启后重新接收 InstallSnapshot
开始下载快照时,其不会删除 temp
目录,而是对比本地临时快照和远程快照的元数据,对于那些本地已经存在且 CRC
一样的文件,则无需重复下载:
进一步地,对于本地正式快照已经存在的文件也无需重复下载。之所以要对比本地正式快照,是因为该快照可能也来自于 Leader 之前的快照:
总的来说,为了减少网络的传输,只要本地存在的文件,其文件名和 CRC 和 Leader 的一样就无需重复下载,详见以下过滤下载列表 。
快照限流
当一个 Raft 进程挂掉一段时间后重启,其可能会从 Leader 下载快照。特别地,当一个进程上跑着大量的 Raft Group
,而每一个 Node
都需要从 Leader 下载快照,这时候下载的数据量将是庞大的,可能会占满 Leader 和 Follower 的网卡和磁盘带宽,影响正常的 IO
。为此,braft 提供了相应的快照限流特性。
限流作用于以下两个维度:
任务个数:节点每开启一个安装快照任务,任务计数将加一(该限制仅作用于 Follower)
带宽:Leader 读取本地快照、Follower 通过网络从 Leader 下载文件(文件会写入临时快照),带宽计数将增加对应的字节数。总的来说,其作用的是磁盘带宽和网络带宽
快照限流默认是关闭的,用户需要实现 SnapshotThrottle ,并在构建 Node
时将其通过 NodeOptions
传递给 braft:
Copy struct NodeOptions {
...
// If non-null, we will pass this snapshot_throttle to SnapshotExecutor
// Default: NULL
scoped_refptr < SnapshotThrottle >* snapshot_throttle;
...
};
当然了,框架也提供了默认的 SnapshotThrottle
实现,即 ThroughputSnapshotThrottle ,具体算法实现见限流算法 ,用户构造时可控制带宽大小:
Copy class ThroughputSnapshotThrottle : public SnapshotThrottle {
public :
ThroughputSnapshotThrottle ( int64_t throttle_throughput_bytes , int64_t check_cycle);
...
private :
// user defined throughput threshold for raft, bytes per second
int64_t _throttle_throughput_bytes;
// user defined check cycles of throughput per second
int64_t _check_cycle;
...
};
此外,ThroughputSnapshotThrottle 还提供了一些动态配置项来控制并发任务个数以及带宽大小:
Copy // used to increase throttle threshold dynamically when user-defined
// threshold is too small in extreme cases.
// notice that this flag does not distinguish disk types(sata or ssd, and so on)
DEFINE_bool (raft_enable_throttle_when_install_snapshot , true ,
"enable throttle when install snapshot, for both leader and follower" );
::brpc :: PassValidate);
DEFINE_int64 (raft_minimal_throttle_threshold_mb , 0 ,
"minimal throttle throughput threshold per second" );
brpc :: NonNegativeInteger);
DEFINE_int32 (raft_max_install_snapshot_tasks_num , 1000 ,
"Max num of install_snapshot tasks per disk at the same time" );
brpc :: PositiveInteger);
由于传递的限流对象是一个指针,用户可以通过指定哪些 Node
共用同一个限流对象,来实现各个级别的限流策略,如针对每块盘或每个 Raft Group
。
相关 RPC
安装快照 RPC:
Copy message SnapshotMeta {
required int64 last_included_index = 1 ;
required int64 last_included_term = 2 ;
repeated string peers = 3 ;
repeated string old_peers = 4 ;
}
message InstallSnapshotRequest {
required string group_id = 1 ;
required string server_id = 2 ;
required string peer_id = 3 ;
required int64 term = 4 ;
required SnapshotMeta meta = 5 ;
required string uri = 6 ;
};
message InstallSnapshotResponse {
required int64 term = 1 ;
required bool success = 2 ;
};
service RaftService {
rpc install_snapshot (InstallSnapshotRequest) returns (InstallSnapshotResponse);
};
下载文件 RPC:
Copy message GetFileRequest {
required int64 reader_id = 1 ;
required string filename = 2 ;
required int64 count = 3 ;
required int64 offset = 4 ;
optional bool read_partly = 5 ;
}
message GetFileResponse {
// Data is in attachment
required bool eof = 1 ;
optional int64 read_size = 2 ;
}
service FileService {
rpc get_file (GetFileRequest) returns (GetFileResponse);
}
阶段一:Leader 下发指令
触发安装快照
Leader 在给 Follower 同步日志时,发现 Follower 需要的日志已经被压缩掉了,就会调用 _install_snapshot
向 Follower 下发安装快照的指令:
Copy // (1) 正常同步日志会调用 `_send_entries`
void Replicator :: _send_entries () {
// (2) 调用 _fill_common_fields 判断 Follower 需要的日志是否还存在
// `_next_index` 为下一条需要同步给 Follower 的日志
if ( _fill_common_fields ( request . get () , _next_index - 1 , false ) != 0 ) {
...
// (4) 调用 `_install_snapshot` 下发安装快照指令
return _install_snapshot ();
}
...
}
int Replicator :: _fill_common_fields ( AppendEntriesRequest * request ,
int64_t prev_log_index ,
bool is_heartbeat) {
// (3) 获取 index 日志对应的 term
const int64_t prev_log_term = _options . log_manager -> get_term (prev_log_index);
// (4) 如果 term 不存在则代表该日志已经被压缩,返回 -1
if (prev_log_term == 0 && prev_log_index != 0 ) {
if ( ! is_heartbeat) {
...
return - 1 ;
} else {
}
}
...
return 0 ;
}
发送请求
_install_snapshot
会向 Follower 发送 InstallSnapshot
请求,详见以下注释:
Copy void Replicator :: _install_snapshot () {
...
// (1) 打开本地最新快照,并获得快照元数据
_reader = _options . snapshot_storage -> open ();
...
std :: string uri = _reader -> generate_uri_for_copy ();
...
SnapshotMeta meta;
// report error on failure
if ( _reader -> load_meta ( & meta) != 0 ) {
...
}
// (2) 设置 `InstallSnapshot` 中的 meta 和 URI 等字段
brpc :: Controller * cntl = new brpc :: Controller;
InstallSnapshotRequest * request = new InstallSnapshotRequest ();
InstallSnapshotResponse * response = new InstallSnapshotResponse ();
...
request -> mutable_meta () -> CopyFrom (meta);
request -> set_uri (uri);
// (3) 发送 `InstallSnapshot` 请求
RaftService_Stub stub ( & _sending_channel);
stub . install_snapshot (cntl , request , response , done);
CHECK_EQ ( 0 , bthread_id_unlock (_id)) << "Fail to unlock " << _id;
}
generate_uri_for_copy
会返回下载的 URI
,其格式为 remote://ip:port/reader_id
:
Copy std :: string LocalSnapshotReader :: generate_uri_for_copy () {
if (_addr == butil :: EndPoint ()) {
LOG (ERROR) << "Address is not specified, path:" << _path;
return std :: string ();
}
if (_reader_id == 0 ) {
// TODO: handler referenced files
scoped_refptr < SnapshotFileReader > reader (
new SnapshotFileReader ( _fs . get () , _path , _snapshot_throttle . get ()));
reader -> set_meta_table (_meta_table);
if ( ! reader -> open ()) {
...
return std :: string ();
}
if ( file_service_add ( reader . get () , & _reader_id) != 0 ) {
...
return std :: string ();
}
}
oss << "remote://" << _addr << "/" << _reader_id;
return oss . str ();
}
处理请求
Follower 在接收到 InstallSnapshot
请求后,会调用 handle_install_snapshot_request
处理,而该处理函数最终会调用 SnapshotExecutor::install_snapshot
来安装快照:
Copy void NodeImpl :: handle_install_snapshot_request (brpc :: Controller * cntl ,
const InstallSnapshotRequest * request ,
InstallSnapshotResponse * response ,
google :: protobuf :: Closure * done) {
...
return _snapshot_executor -> install_snapshot (
cntl , request , response , done_guard . release ());
}
install_snapshot
主要执行以下几件事:
Copy void SnapshotExecutor :: install_snapshot (brpc :: Controller * cntl ,
const InstallSnapshotRequest * request ,
InstallSnapshotResponse * response ,
google :: protobuf :: Closure * done) {
...
std :: unique_ptr < DownloadingSnapshot > ds ( new DownloadingSnapshot);
...
// (1) 调用 register_downloading_snapshot 从 Leader 下载快照
ret = register_downloading_snapshot ( ds . get ());
...
// (2) 等待快照下载完成;下载快照为单独的 bthread
_cur_copier -> join ();
...
// (3) 调用 load_downloading_snapshot 加载快照
return load_downloading_snapshot ( ds . release () , meta);
}
阶段二:Follower 下载快照
准备下载快照
register_downloading_snapshot
会创建客户端,并启动客户端来下载快照:
Copy int SnapshotExecutor :: register_downloading_snapshot ( DownloadingSnapshot * ds) {
...
// (1) 如果当前正在创建快照,返回 `EBUSY`
if (_saving_snapshot) {
...
ds -> cntl -> SetFailed (EBUSY , "Is saving snapshot" );
return - 1 ;
}
...
DownloadingSnapshot * m = _downloading_snapshot . load (
butil :: memory_order_relaxed);
if ( ! m) {
_downloading_snapshot . store (ds , butil :: memory_order_relaxed);
// (2) 调用 `LocalSnapshotStorage::start_to_copy_from` 开始下载快照
// _cur_copier: LocalSnapshotCopier
_cur_copier = _snapshot_storage -> start_to_copy_from ( ds -> request -> uri ());
}
}
SnapshotCopier * LocalSnapshotStorage :: start_to_copy_from ( const std :: string & uri) {
// (3) 调用 LocalSnapshotCopier::init 初始化客户端
LocalSnapshotCopier * copier = new LocalSnapshotCopier (_copy_file);
if ( copier -> init (uri) != 0 ) { // copier: LocalSnapshotCopier
}
...
// (4) 调用 `LocalSnapshotCopier::start` 启动客户端,开始下载
copier -> start ();
return copier;
}
初始化客户端
LocalSnapshotCopier::init
会初始化一个 RPC Client 用于下载快照:
Copy int LocalSnapshotCopier :: init ( const std :: string & uri) {
return _copier . init (uri , _fs , _throttle); // _copier: RemoteFileCopier
}
int RemoteFileCopier :: init ( const std :: string & uri , FileSystemAdaptor * fs ,
SnapshotThrottle * throttle) {
// (1) 根据 InstallSnapshotRequest 中 uri,解析出 Leader 的 RPC 服务地址
// Parse uri format: remote://ip:port/reader_id
static const size_t prefix_size = strlen ( "remote://" );
butil :: StringPiece uri_str (uri);
... // 这里忽略解析过程
butil :: StringPiece ip_and_port = uri_str . substr ( 0 , slash_pos);
...
// (2) 根据解析出来的 Leader 的 RPC 服务地址,初始化连接其的 `channel`
// 该 channel 用于从 Leader 下载快照文件
brpc :: ChannelOptions channel_opt;
...
if ( _channel . Init ( ip_and_port . as_string () . c_str () , & channel_opt) != 0 ) {
...
return - 1 ;
}
...
return 0 ;
}
启动客户端
LocalSnapshotCopier::start
会创建单独的 bthread
来运行 LocalSnapshotCopier::copy
,而该函数就正式开始下载快照:
Copy void LocalSnapshotCopier :: start () {
if ( bthread_start_background (
& _tid , NULL , start_copy , this ) != 0 ) {
...
}
}
void * LocalSnapshotCopier :: start_copy ( void* arg) {
LocalSnapshotCopier * c = (LocalSnapshotCopier * )arg;
c -> copy ();
return NULL ;
}
下载快照流程
copy
函数中描述的是整个下载流程,待全部下载完毕后,会调用 LocalSnapshotStorage::close
将临时快照转为正式快照,其中的每一个步骤,我们都将在下面一一介绍:
Copy void LocalSnapshotCopier :: copy () {
// (1) 下载步骤
do {
// (1.1) 首先下载快照元数据,保存在 _remote_snapshot
load_meta_table ();
...
// (1.2) 过滤掉没必要下载的文件
filter ();
...
// (1.3) 根据元数据中的文件列表,调用 `copy_file` 逐一下载保存至本地临时快照
std :: vector < std :: string > files;
_remote_snapshot . list_files ( & files);
for ( size_t i = 0 ; i < files . size () && ok (); ++ i) {
copy_file ( files [i]);
}
} while ( 0 );
...
// (2) 下载完成后,调用 `LocalSnapshotStorage::close` 将临时快照转为正式快照
// 详见以下 <阶段三:转为正式快照>
if (_writer) {
if ( _storage -> close (_writer , _filter_before_copy_remote) != 0 && ok ()) {
...
}
}
...
}
下载快照元数据
调用 RemoteFileCopier::start_to_copy_to_iobuf
下载快照元数据,等待其下载完成后,保存至 _remote_snapshot
:
Copy void LocalSnapshotCopier :: load_meta_table () {
butil :: IOBuf meta_buf;
...
// (1) 开始下载快照元数据
// BRAFT_SNAPSHOT_META_FILE: __raft_snapshot_meta
scoped_refptr < RemoteFileCopier :: Session > session
= _copier . start_to_copy_to_iobuf (BRAFT_SNAPSHOT_META_FILE ,
& meta_buf , NULL );
...
// (2) 等待下载完成
_cur_session = session . get ();
session -> join ();
_cur_session = NULL ;
// (3) 将元数据保存在 `_remote_snapshot`
if ( _remote_snapshot . _meta_table . load_from_iobuf_as_remote (meta_buf) != 0 ) {
..
..
return ;
}
}
安装快照流程中的所有下载操作都会调用 start_to_copy_to_iobuf
,该函数接受一个 source
,向 Leader 发送 GetFile
请求,将下载的内容保存在 dest_buf
中。特别地,针对大文件,会采用分片的方式进行传输:
Copy scoped_refptr < RemoteFileCopier :: Session >
RemoteFileCopier :: start_to_copy_to_iobuf (
const std :: string & source ,
butil :: IOBuf * dest_buf ,
const CopyOptions * options) {
...
// (1) 准备好请求
scoped_refptr < Session > session ( new Session ());
...
session -> _buf = dest_buf;
session -> _request . set_filename (source);
...
session -> _channel = & _channel;
...
// (2) 发送 `GetFile` 请求
session -> send_next_rpc ();
return session;
}
/*
* message GetFileRequest {
* required int64 reader_id = 1;
* required string filename = 2;
* required int64 count = 3;
* required int64 offset = 4;
* optional bool read_partly = 5;
* }
*/
void RemoteFileCopier :: Session :: send_next_rpc () {
// (3) 设置 offset
// _request.offset() 是上一次发送的 offset
// Not clear request as we need some fields of the previous RPC
off_t offset = _request . offset () + _request . count ();
// (4) FLAGS_raft_max_byte_count_per_rpc 默认是 128KB
// 如果文件大小超过 128KB,则重新计算 count
const size_t max_count =
( ! _buf) ? FLAGS_raft_max_byte_count_per_rpc : UINT_MAX;
_cntl . set_timeout_ms ( _options . timeout_ms);
_request . set_offset (offset);
...
size_t new_max_count = max_count;
...
_request . set_count (new_max_count);
// (5) 发送 `GetFile` 请求
_rpc_call = _cntl . call_id ();
FileService_Stub stub (_channel);
AddRef (); // Release in on_rpc_returned
return stub . get_file ( & _cntl , & _request , & _response , & _done);
}
过滤下载列表
在正式下载文件前,我们会过滤掉本地拥有的文件,具体规则详见以上断点续传 。
Copy void LocalSnapshotCopier :: filter () {
_writer = (LocalSnapshotWriter * ) _storage -> create ( ! _filter_before_copy_remote);
...
if (_filter_before_copy_remote) { // true
SnapshotReader * reader = _storage -> open ();
if ( filter_before_copy (_writer , reader) != 0 ) {
...
}
...
}
_writer -> save_meta ( _remote_snapshot . _meta_table . meta ());
if ( _writer -> sync () != 0 ) {
...
return ;
}
}
Copy int LocalSnapshotCopier :: filter_before_copy ( LocalSnapshotWriter * writer ,
SnapshotReader * last_snapshot) {
// (1) 首先获取本地临时快照 temp 目录中的文件列表
std :: vector < std :: string > existing_files;
writer -> list_files ( & existing_files);
std :: vector < std :: string > to_remove;
for ( size_t i = 0 ; i < existing_files . size (); ++ i) {
// (2) 如果远程(Leader)快照不存在该文件,则将该文件从 temp 目录中移除
if ( _remote_snapshot . get_file_meta ( existing_files [i] , NULL ) != 0 ) {
to_remove . push_back ( existing_files [i]);
writer -> remove_file ( existing_files [i]); // 将文件名从元数据表中移除
}
}
// (3) 从元数据中获取远程快照的文件列表
std :: vector < std :: string > remote_files;
_remote_snapshot . list_files ( & remote_files);
for ( size_t i = 0 ; i < remote_files . size (); ++ i) {
const std :: string & filename = remote_files [i];
LocalFileMeta remote_meta;
if ( ! remote_meta . has_checksum ()) {
// Redownload file if this file doen't have checksum
writer -> remove_file (filename);
to_remove . push_back (filename);
continue ;
}
// (3.2) temp 目录已经存在
LocalFileMeta local_meta;
if ( writer -> get_file_meta (filename , & local_meta) == 0 ) { // temp 目录有
if ( local_meta . has_checksum () &&
local_meta . checksum () == remote_meta . checksum ()) {
...
continue ;
}
// Remove files from writer so that the file is to be copied from
// remote_snapshot or last_snapshot
writer -> remove_file (filename);
to_remove . push_back (filename);
}
// (3.3) 再在本地的上一个快照中找
// Try find files in last_snapshot
if ( ! last_snapshot) {
continue ;
}
if ( last_snapshot -> get_file_meta (filename , & local_meta) != 0 ) {
continue ;
}
if ( ! local_meta . has_checksum () || local_meta . checksum () != remote_meta . checksum ()) {
continue ;
}
...
if ( local_meta . source () == braft :: FILE_SOURCE_LOCAL) {
std :: string source_path = last_snapshot -> get_path () + '/'
+ filename;
std :: string dest_path = writer -> get_path () + '/'
+ filename;
_fs -> delete_file (dest_path , false );
if ( ! _fs -> link (source_path , dest_path)) {
continue ;
}
// Don't delete linked file
if ( ! to_remove . empty () && to_remove . back () == filename) {
to_remove . pop_back ();
}
}
// Copy file from last_snapshot
writer -> add_file (filename , & local_meta);
}
if ( writer -> sync () != 0 ) {
LOG (ERROR) << "Fail to sync writer on path=" << writer -> get_path ();
return - 1 ;
}
// 删除临时快照中需要删除的文件
for ( size_t i = 0 ; i < to_remove . size (); ++ i) {
std :: string file_path = writer -> get_path () + "/" + to_remove [i];
_fs -> delete_file (file_path , false );
}
return 0 ;
}
创建临时目录
首先会调用 LocalSnapshotStorage::create
创建一个 temp
目录用来保存下载的临时快照,并返回 SnapshotWriter
。注意,我们在创建本地快照时,也有一样的创建流程。唯一的区别在于用于保存下载快照的 temp
目录即使事先存在也不会删除,主要是为了实现我们上面提到的断点续传功能:
Copy SnapshotWriter * LocalSnapshotStorage :: create ( bool from_empty) {
LocalSnapshotWriter * writer = NULL ;
do {
std :: string snapshot_path (_path); // _path 为用户配置的快照存储目录
snapshot_path . append ( "/" );
snapshot_path . append (_s_temp_path); // e.g. data/temp
// (1) 因为 `from_empty` 为 False,所以有 `temp` 目录的话
// 将不会删除,而是直接返回 `SnapshotWriter`
// delete temp
// TODO: Notify watcher before deleting
if ( _fs -> path_exists (snapshot_path) && from_empty) {
if ( destroy_snapshot (snapshot_path) != 0 ) {
break ;
}
}
// (2) 如果不存在的话,则调用 `LocalSnapshotWriter::init` 创建 temp 目录
writer = new LocalSnapshotWriter (snapshot_path , _fs . get ());
if ( writer -> init () != 0 ) {
...
break ;
}
} while ( 0 );
return writer;
}
遍历用户指定的快照目录,保存最近的一个快照,删除其余全部快照:
Copy int LocalSnapshotWriter :: init () {
butil :: File :: Error e;
// (3) 创建 temp 目录
if ( ! _fs -> create_directory (_path , & e , false )) {
...
return EIO;
}
...
return 0 ;
}
逐一下载文件
在上面的下载快照流程 copy
函数中,我们已经介绍过,会根据快照元数据中的文件列表,逐一从 Leader 下载快照文件:
Copy void LocalSnapshotCopier :: copy () {
do {
...
// (1) 获取文件列表
std :: vector < std :: string > files;
_remote_snapshot . list_files ( & files);
// (2) 逐一下载
for ( size_t i = 0 ; i < files . size () && ok (); ++ i) {
copy_file ( files [i]);
}
} while ( 0 );
...
}
调用 list_files
获取快照元数据中的文件列表:
Copy void LocalSnapshot :: list_files (std :: vector <std :: string > * files) {
return _meta_table . list_files (files); // _meta_table: LocalSnapshotMetaTable
}
void LocalSnapshotMetaTable :: list_files (std :: vector <std :: string > * files) const {
...
for (Map :: const_iterator
iter = _file_map . begin (); iter != _file_map . end (); ++ iter) {
files -> push_back ( iter -> first);
}
}
调用 copy_file
下载指定文件,参见以下详情注释:
Copy void LocalSnapshotCopier :: copy_file ( const std :: string & filename) {
...
// (1) 如果路径中还有目录,则在本地创建子目录
std :: string file_path = _writer -> get_path () + '/' + filename;
butil :: FilePath sub_path (filename);
if (sub_path != sub_path . DirName () && sub_path . DirName () . value () != "." ) {
...
if (FLAGS_raft_create_parent_directories) {
butil :: FilePath sub_dir =
butil :: FilePath ( _writer -> get_path ()) . Append ( sub_path . DirName ());
rc = _fs -> create_directory ( sub_dir . value () , & e , true );
}
...
}
// (2) 调用 `RemoteFileCopier::start_to_copy_to_file` 从 Leader 下载文件存至临时快照
scoped_refptr < RemoteFileCopier :: Session > session
= _copier . start_to_copy_to_file (filename , file_path , NULL );
...
_cur_session = session . get ();
..
session -> join ();
...
// (3) 每成功下载一个文件,则将其添加至临时快照元数据中
if ( _writer -> add_file (filename , & meta) != 0 ) {
...
return ;
}
// (4) 每成功下载一个文件,则将临时快照元数据中持久化,
// 也就是说每次都会覆盖快照元数据,这么做主要是为了不重复下载文件
if ( _writer -> sync () != 0 ) {
...
return ;
}
}
阶段三:转为正式快照
在上面的下载快照流程 copy
函数中,我们已经介绍过,当快照中的所有文件都下载完毕后,就会调用 LocalSnapshotStorage::close
将下载的临时快照转为正式快照:
Copy void LocalSnapshotCopier :: copy () {
// (1) 各种下载流程
...
// (2) 调用 `LocalSnapshotStorage::close`
if (_writer) {
// set_error for copier only when failed to close writer and copier was
// ok before this moment
if ( _storage -> close (_writer , _filter_before_copy_remote) != 0 && ok ()) {
...
}
...
}
...
}
写入元数据
rename 成正式快照
在 close
函数中主要以下做四件事:
(2) 通过 rename()
将临时快照变为正式快照
Copy int LocalSnapshotStorage :: close ( SnapshotWriter * writer_base ,
bool keep_data_on_error) {
LocalSnapshotWriter * writer = dynamic_cast< LocalSnapshotWriter *> (writer_base);
do {
...
// (1) 将快照元数据写入文件
ret = writer -> sync ();
...
//
int old_index = _last_snapshot_index;
int64_t new_index = writer -> snapshot_index ();
// (2) 将临时快照 rename 成正式快照
// rename temp to new
std :: string temp_path (_path);
temp_path . append ( "/" );
temp_path . append (_s_temp_path);
std :: string new_path (_path);
butil :: string_appendf ( & new_path , "/" BRAFT_SNAPSHOT_PATTERN , new_index);
if ( ! _fs -> delete_file (new_path , true )) {
...
}
...
if ( ! _fs -> rename (temp_path , new_path)) {
...
}
ref (new_index);
{
BAIDU_SCOPED_LOCK (_mutex);
CHECK_EQ (old_index , _last_snapshot_index);
_last_snapshot_index = new_index;
}
// unref old_index, ref new_index
// (3) 删除上一个快照。
// 需要注意的是,这里有一个引用判断
// 因为当前节点可能是 Leader,而该快照可能正用于下载给 Follower
unref (old_index);
} while ( 0 );
...
}
sync
会调用 save_to_file
将元数据填充到 proto
(LocalSnapshotPbMeta
)中,并将其序列化,最终持久化到文件:
Copy int LocalSnapshotWriter :: sync () {
// BRAFT_SNAPSHOT_META_FILE: __raft_snapshot_meta
const int rc = _meta_table . save_to_file (_fs , _path + "/" BRAFT_SNAPSHOT_META_FILE);
...
return rc;
}
int LocalSnapshotMetaTable :: save_to_file ( FileSystemAdaptor * fs , const std :: string & path) const {
// (1) _meta 中保存的是 lastIncludeIndex,lastIncludedTerm 以及集群配置
LocalSnapshotPbMeta pb_meta;
if ( _meta . IsInitialized ()) {
* pb_meta . mutable_meta () = _meta;
}
// (2) 将所有文件列表加入到 proto
for (Map :: const_iterator
iter = _file_map . begin (); iter != _file_map . end (); ++ iter) {
LocalSnapshotPbMeta :: File * f = pb_meta . add_files ();
f -> set_name ( iter -> first);
* f -> mutable_meta () = iter -> second;
}
// (3) 序列化并持久化到文件
ProtoBufFile pb_file (path , fs);
int ret = pb_file . save ( & pb_meta , raft_sync_meta ());
...
return ret;
}
删除上一个快照
调用 unref
删除上一个快照。需要注意的是,当前节点可能是 Leader,而该快照可能正用于下载给其他 Follower,所以需要判断其引用计数,若引用计数为 0,则删除其目录:
Copy // (1) index 为上一个快照的 lastIncludeIndex
void LocalSnapshotStorage :: unref ( const int64_t index) {
std :: unique_lock < raft_mutex_t > lck (_mutex);
std :: map< int64_t , int > :: iterator it = _ref_map . find (index);
// (2) 找到上一个快照
if (it != _ref_map . end ()) {
// (3) 将其引用计数减一
it -> second -- ;
// (4) 如果减到 0,则将其删除
if ( it -> second == 0 ) {
_ref_map . erase (it);
std :: string old_path (_path);
butil :: string_appendf ( & old_path , "/" BRAFT_SNAPSHOT_PATTERN , index);
destroy_snapshot (old_path);
}
}
}
// (5) 删除上一个快照的目录
int LocalSnapshotStorage :: destroy_snapshot ( const std :: string & path) {
if ( ! _fs -> delete_file (path , true )) {
...
return - 1 ;
}
return 0 ;
}
阶段四:Follower 加载快照
在将下载来的临时快照转为正式快照后,节点就开始加载快照。加载快照的流程我们在<5.3 加载快照> 已有详细介绍,在这里就不重复介绍了。
当快照加载完毕后,运行 Closure
时会发送 InstallSnapshot
响应给 Leader。
阶段五:Leader 处理响应
Leader 在收到 InstallSnapshot
响应后,会调用 _on_install_snapshot_returned
进行处理。在该函数中会根据响应的结果,做出不同的决策:
Copy void Replicator :: _on_install_snapshot_returned (
ReplicatorId id , brpc :: Controller * cntl ,
InstallSnapshotRequest * request ,
InstallSnapshotResponse * response) {
...
do {
if ( cntl -> Failed ()) {
...
succ = false ;
break ;
}
if ( ! response -> success ()) {
succ = false ;
...
break ;
}
// (1) 如果安装快照成功,则将 Follower 的 nextIndex 设置为快照的 lastIncludedIndex + 1
// Success
r -> _next_index = request -> meta () . last_included_index () + 1 ;
...
} while ( 0 );
// (2) 如果失败,则先阻塞当前 Replicator 一会儿,最终仍会再次安装快照
// We don't retry installing the snapshot explicitly.
// dummy_id is unlock in _send_entries
if ( ! succ) {
return r -> _block (butil :: gettimeofday_us () , cntl -> ErrorCode ());
}
// (1.1) 如果成功的话,则继续向 Follower 同步日志
// dummy_id is unlock in _send_entries
return r -> _send_entries ();
}