当任务被执行时,会创建一个 temp
目录用来保存临时快照,并返回一个 SnapshotWriter
以 SnapshotWriter
和 Closure
作为参数调用用户状态机的 on_snapshot_save
用户需通过 SnapshotWriter 写入快照数据:
5.1 调用 SnapshotWriter::get_path 获取快照目录的路径
5.2 调用 SnapshotWriter::add_file 将文件加入快照
待数据写入完成,用户需回调 Closure
6.2 通过 rename() 将临时快照转为正式快照
流程整体分为以下 3 个阶段:创建临时快照(1-3),用户写入数据(4-5),转为正式快照(6-7)
正常情况下用户指定的快照存储目录下只会有一个快照目录。当创建快照时,会创建一个 temp
目录来保存临时快照,待创建完成后,通过 rename 将其转为正式快照
正式快照目录以当时创建快照时的 applyIndex
命名,如 snapshot_00000000000000002000
快照目录下除了用户写入的快照文件集合外,还有一个框架写入的元数据文件 __raft_snapshot_meta
,元数据主要保存以下 2 部分信息:
快照中包含的最后日志的 index
和 term
参见以下元数据 proto
// Excerpt of the snapshot metadata proto definitions.
// NOTE(review): closing braces and the FileSource enum values are not shown in this listing.
Copy enum FileSource {
// Metadata attached to a single snapshot file.
message LocalFileMeta {
optional bytes user_meta = 1 ;
optional FileSource source = 2 ;
optional string checksum = 3 ;
// Snapshot-level metadata: index/term of the last included log entry plus peer lists.
message SnapshotMeta {
required int64 last_included_index = 1 ;
required int64 last_included_term = 2 ;
repeated string peers = 3 ;
repeated string old_peers = 4 ;
// Message that combines the snapshot metadata with the per-file entries.
message LocalSnapshotPbMeta {
message File {
required string name = 1 ;
optional LocalFileMeta meta = 2 ;
optional SnapshotMeta meta = 1 ; // (1) snapshot metadata
repeated File files = 2 ; // (2) list of the user's snapshot files
// Excerpt of the Node interface: the manual snapshot entry point.
Copy class Node {
public :
// Start a snapshot immediately if possible. done->Run() would be invoked
// when the snapshot finishes, describing the detailed result.
void snapshot ( Closure * done);
// Excerpt of the user state machine interface: the snapshot-save hook.
Copy class StateMachine {
public :
// user defined snapshot generate function, this method will block on_apply.
// user can make snapshot async when fsm can be cow(copy-on-write).
// call done->Run() when snapshot finished.
// success return 0, fail return errno
// Default: Save nothing and returns error.
// NOTE(review): the declaration returns void, so the "return 0 / errno" line
// above presumably refers to the status reported through |done| -- confirm.
virtual void on_snapshot_save (::braft :: SnapshotWriter * writer ,
::braft :: Closure * done);
// Excerpt of the writer interface used to register/unregister snapshot files.
Copy class SnapshotWriter : public Snapshot {
public :
// Add a file to the snapshot.
// |file_meta| is an implementation-defined protobuf message
// All the implementation must handle the case that |file_meta| is NULL and
// no error can be raised.
// Note that whether the file will be created onto the backing storage is
// implementation-defined.
// NOTE(review): only the single-argument overload is shown in this listing;
// the |file_meta| parameter belongs to an overload that is not shown here.
virtual int add_file ( const std :: string & filename);
// Remove a file from the snapshot
// Note that whether the file will be removed from the backing storage is
// implementation-defined.
virtual int remove_file ( const std :: string & filename) = 0 ;
// Base snapshot interface; it derives from butil::Status.
class Snapshot : public butil :: Status {
public :
Snapshot () {}
virtual ~Snapshot () {}
// Get the path of the Snapshot
virtual std :: string get_path () = 0 ;
// List all the existing files in the Snapshot currently
virtual void list_files (std :: vector <std :: string > * files) = 0 ;
// Get the implementation-defined file_meta
// The default implementation ignores |filename| and clears |file_meta| when
// it is non-NULL.
virtual int get_file_meta ( const std :: string & filename ,
::google :: protobuf :: Message * file_meta) {
// |filename| is intentionally unused by this default implementation.
( void )filename;
if (file_meta != NULL ) {
file_meta -> Clear ();
return 0 ;
节点有以下 2 种方式触发快照任务,其最终都是调用 NodeImpl::do_snapshot
// Timer-triggered path: the snapshot timer forwards to the node's timeout handler.
Copy void SnapshotTimer :: run () {
_node -> handle_snapshot_timeout ();
// The timeout handler starts a snapshot with no user closure (NULL).
void NodeImpl :: handle_snapshot_timeout () {
do_snapshot ( NULL );
// Manually-triggered path: Node::snapshot forwards the user's closure to the implementation.
Copy void Node :: snapshot ( Closure * done) {
_impl -> snapshot (done);
// The implementation passes |done| straight through to do_snapshot.
void NodeImpl :: snapshot ( Closure * done) {
do_snapshot (done);
会调用 SnapshotExecutor::do_snapshot
// Delegates the snapshot request to the snapshot executor when one exists.
// NOTE(review): the branch for a missing executor is not shown in this listing.
Copy void NodeImpl :: do_snapshot ( Closure * done) {
if (_snapshot_executor) {
_snapshot_executor -> do_snapshot (done);
// Starts a snapshot-save task: runs three guard checks, creates the temporary
// snapshot writer, and enqueues the save task for the FSM caller.
// NOTE(review): this listing is an excerpt; closing braces and the error
// handling inside each early-return branch are not shown.
void SnapshotExecutor :: do_snapshot ( Closure * done) {
// (1) Check 1: a snapshot is currently being downloaded from the Leader
// check snapshot install/load
if ( _downloading_snapshot . load (butil :: memory_order_relaxed)) {
return ;
// (2) Check 2: a snapshot-save task is already running
// check snapshot saving?
if (_saving_snapshot) {
return ;
// (3) Check 3: give up this task if very few logs have been applied since
// the last snapshot was created.
// FLAGS_raft_do_snapshot_min_index_gap defaults to 1
int64_t saved_fsm_applied_index = _fsm_caller -> last_applied_index ();
if (saved_fsm_applied_index - _last_snapshot_index <
FLAGS_raft_do_snapshot_min_index_gap) {
// There might be false positive as the last_applied_index() is being
// updated. But it's fine since we will do next snapshot saving in a
// predictable time.
_log_manager -> clear_bufferred_logs (); // TODO(Wine93)
return ;
// (4) Call LocalSnapshotStorage::create to create the temp directory that
// holds the temporary snapshot, and get back a SnapshotWriter
SnapshotWriter * writer = _snapshot_storage -> create ();
_saving_snapshot = true ;
// This Closure is called back once the user has finished saving the
// snapshot (see phase three)
SaveSnapshotDone * snapshot_save_done = new SaveSnapshotDone ( this , writer , done);
// (5) Put the snapshot-save task into the ApplyTaskQueue to wait for execution
if ( _fsm_caller -> on_snapshot_save (snapshot_save_done) != 0 ) {
return ;
会创建 temp
目录,若事先已存在 temp
目录,则先将其删除后再创建,并返回 SnapshotWriter
// Creates a writer for a fresh temporary snapshot (from_empty = true).
Copy SnapshotWriter * LocalSnapshotStorage :: create () {
return create ( true );
// Builds the temp snapshot path, optionally removes an existing temp directory,
// then constructs and initializes a LocalSnapshotWriter for it.
// NOTE(review): closing braces and the cleanup on the `break` paths are not
// shown in this listing.
SnapshotWriter * LocalSnapshotStorage :: create ( bool from_empty) {
LocalSnapshotWriter * writer = NULL ;
do {
std :: string snapshot_path (_path); // _path is the user-configured snapshot storage directory
snapshot_path . append ( "/" );
snapshot_path . append (_s_temp_path); // e.g. data/temp
// (1) Delete the temp directory
// delete temp
// TODO: Notify watcher before deleting
if ( _fs -> path_exists (snapshot_path) && from_empty) {
if ( destroy_snapshot (snapshot_path) != 0 ) {
break ;
// (2) Call LocalSnapshotWriter::init to create the temp directory
writer = new LocalSnapshotWriter (snapshot_path , _fs . get ());
if ( writer -> init () != 0 ) {
break ;
} while ( 0 );
return writer;
// Creates the writer's directory; returns EIO when creation fails, 0 otherwise.
int LocalSnapshotWriter :: init () {
butil :: File :: Error e;
// (3) Create the temp directory
if ( ! _fs -> create_directory (_path , & e , false )) {
return EIO;
return 0 ;
创建好 temp
目录后,会调用 FSMCaller::on_snapshot_save
将快照任务放入 ApplyTaskQueue ,等待其被执行:
// Wraps |done| in a SNAPSHOT_SAVE task and submits it to the execution queue;
// returns the result of the enqueue call.
Copy int FSMCaller :: on_snapshot_save ( SaveSnapshotClosure * done) {
ApplyTask task;
task . type = SNAPSHOT_SAVE;
task . done = done;
return bthread :: execution_queue_execute (_queue_id , task);
队列消费函数会调用 FSMCaller::do_snapshot_save
// Queue consumer: iterates over the queued tasks and dispatches on the task type.
// NOTE(review): this listing is an excerpt; the `case` labels, the definition of
// `caller` and the other task types are not shown.
Copy int FSMCaller :: run ( void* meta , bthread :: TaskIterator < ApplyTask > & iter) {
for (; iter; ++ iter) {
switch ( iter -> type) {
caller -> _cur_task = SNAPSHOT_SAVE;
// Only run the save when pass_by_status() accepts the closure.
if ( caller -> pass_by_status ( iter -> done)) {
caller -> do_snapshot_save ((SaveSnapshotClosure * ) iter -> done);
break ;
在 FSMCaller::do_snapshot_save
函数中主要做以下 2 件事:
将上述创建的 SnapshotWriter
和 Closure
作为参数调用用户状态机的 on_snapshot_save
// Builds the snapshot metadata (last applied index/term and the configuration
// at that index), hands it to the closure, then invokes the user state
// machine's on_snapshot_save.
// NOTE(review): closing braces of the loops are not shown in this listing.
Copy void FSMCaller :: do_snapshot_save ( SaveSnapshotClosure * done) {
int64_t last_applied_index = _last_applied_index . load (butil :: memory_order_relaxed); // applyIndex
SnapshotMeta meta;
// (1.1) index of the last applied log entry
meta . set_last_included_index (last_applied_index);
// (1.2) term of the last applied log entry
meta . set_last_included_term (_last_applied_term);
// (1.3) configuration of the current node
// If old_peers is non-empty, this is the joint configuration C{old,new}
// of a configuration change; otherwise it is the current cluster
// configuration, i.e. C{new}
ConfigurationEntry conf_entry;
_log_manager -> get_configuration (last_applied_index , & conf_entry);
for (Configuration :: const_iterator
iter = conf_entry . conf . begin ();
iter != conf_entry . conf . end (); ++ iter) {
* meta . add_peers () = iter -> to_string ();
for (Configuration :: const_iterator
iter = conf_entry . old_conf . begin ();
iter != conf_entry . old_conf . end (); ++ iter) {
* meta . add_old_peers () = iter -> to_string ();
// (1.4) Store the metadata in the Closure
SnapshotWriter * writer = done -> start (meta);
// (2) Call the user state machine's on_snapshot_save
_fsm -> on_snapshot_save (writer , done);
return ;
用户需要实现状态机的 on_snapshot_save
函数,在该函数中用户需要做以下 3 件事:
调用 writer->get_path
调用 writer->add_file
待以上全部完成后,调用 done->Run
// Excerpt of the user state machine interface: the snapshot-save hook.
Copy class StateMachine {
public :
// user defined snapshot generate function, this method will block on_apply.
// user can make snapshot async when fsm can be cow(copy-on-write).
// call done->Run() when snapshot finished.
// success return 0, fail return errno
// Default: Save nothing and returns error.
// NOTE(review): the declaration returns void, so the "return 0 / errno" line
// above presumably refers to the status reported through |done| -- confirm.
virtual void on_snapshot_save (::braft :: SnapshotWriter * writer ,
::braft :: Closure * done);
// Excerpt of the local-filesystem SnapshotWriter: get_path() returns the
// directory path stored in _path.
Copy class LocalSnapshotWriter : public SnapshotWriter {
public :
virtual std :: string get_path () { return _path; }
private :
// Users shouldn't create LocalSnapshotWriter Directly
LocalSnapshotWriter ( const std :: string & path ,
FileSystemAdaptor * fs);
std :: string _path;
// Excerpt of the writer interface: the two add_file overloads.
Copy class SnapshotWriter : public Snapshot {
public :
// Add a file to the snapshot.
// |file_meta| is an implementation-defined protobuf message
// All the implementation must handle the case that |file_meta| is NULL and
// no error can be raised.
// Note that whether the file will be created onto the backing storage is
// implementation-defined.
// Convenience overload: forwards to the two-argument form with a NULL |file_meta|.
virtual int add_file ( const std :: string & filename) {
return add_file (filename , NULL );
virtual int add_file ( const std :: string & filename ,
const ::google :: protobuf :: Message * file_meta) = 0 ;
// Registers |filename| in the writer's meta table.
// NOTE(review): `meta` is not defined in this excerpt; presumably it is built
// from |file_meta| in lines that are not shown -- confirm against the full source.
Copy int LocalSnapshotWriter :: add_file (
const std :: string & filename ,
const ::google :: protobuf :: Message * file_meta) {
return _meta_table . add_file (filename , meta);
// Inserts (filename, meta) into _file_map.
// Returns 0 when the entry was inserted, -1 when the insert did not take place
// (the `second` member of the insert result is false).
int LocalSnapshotMetaTable :: add_file ( const std :: string & filename ,
const LocalFileMeta & meta) {
Map :: value_type value (filename , meta);
std :: pair < Map :: iterator , bool> ret = _file_map . insert (value);
return ret . second ? 0 : - 1 ;
调用 Closure
用户完成快照的创建后,需调用 Closure
,即 SaveSnapshotDone::Run()
,而该函数会将临时快照 rename 成正式快照
用户调用 SaveSnapshotDone::Run
,该函数主要执行以下 2 件事:
// Starts a bthread that executes continue_run with this closure as its argument.
// NOTE(review): the handling of a failed bthread start is not shown in this listing.
Copy void SaveSnapshotDone :: Run () {
bthread_t tid;
if ( bthread_start_urgent ( & tid , NULL , continue_run , this ) != 0 ) {
// bthread entry point: finishes the snapshot through the executor, then runs
// the user's closure if one was supplied. Always returns NULL.
void* SaveSnapshotDone :: continue_run ( void* arg) {
SaveSnapshotDone * self = (SaveSnapshotDone * )arg;
// Must call on_snapshot_save_done to clear _saving_snapshot
// (1) Call SnapshotExecutor::on_snapshot_save_done to perform the actual
// finishing work
int ret = self -> _se -> on_snapshot_save_done (
self -> status () , self -> _meta , self -> _writer);
// (2) If the snapshot was triggered manually by the user, the Closure
// passed in by the user is called back here
if ( self -> _done) {
run_closure_in_bthread ( self -> _done , true );
return NULL ;
// Finishing step of a snapshot save: records the metadata, closes the writer,
// updates the last-snapshot index/term, and clears the saving flag.
// NOTE(review): this listing is an excerpt; the declaration of `ret`, the
// bodies of the error branches and the closing braces are not shown.
Copy int SnapshotExecutor :: on_snapshot_save_done (
const butil :: Status & st , const SnapshotMeta & meta , SnapshotWriter * writer) {
// (1.1) Store the snapshot metadata in LocalSnapshotMetaTable, pending persistence
if ( writer -> save_meta (meta)) {
// (1.2) Call LocalSnapshotStorage::close to write the metadata, rename the
// snapshot to a formal snapshot, and so on
if ( _snapshot_storage -> close (writer) != 0 ) {
// (1.3) Call LogManager::set_snapshot to delete the logs covered by the
// previous snapshot
if (ret == 0 ) {
_last_snapshot_index = meta . last_included_index ();
_last_snapshot_term = meta . last_included_term ();
_log_manager -> set_snapshot ( & meta);
_saving_snapshot = false ;
return ret;
会将元数据保存到 LocalSnapshotMetaTable
// Stores |meta| in the writer's meta table; always returns 0.
Copy int LocalSnapshotWriter :: save_meta ( const SnapshotMeta & meta) {
_meta_table . set_meta (meta);
return 0 ;
// Excerpt of the meta table: holds the snapshot metadata in _meta.
class LocalSnapshotMetaTable {
public :
// Replaces the stored snapshot metadata with a copy of |meta|.
void set_meta ( const SnapshotMeta & meta) { _meta = meta; }
private :
SnapshotMeta _meta;
rename 成正式快照
在 close
(2) 通过 rename() 将临时快照转为正式快照
// Finalizes a temporary snapshot: syncs its metadata to file, renames the temp
// directory to the formal snapshot directory, and releases the previous snapshot.
// NOTE(review): this listing is an excerpt; the declaration of `ret`, the
// bodies of the error branches and the closing braces are not shown.
Copy int LocalSnapshotStorage :: close ( SnapshotWriter * writer_base ,
bool keep_data_on_error) {
LocalSnapshotWriter * writer = dynamic_cast< LocalSnapshotWriter *> (writer_base);
do {
// (1) Call LocalSnapshotWriter::sync to write the snapshot metadata to file
ret = writer -> sync ();
// Snapshot indexes are 64-bit (new_index below is int64_t and unref() takes
// an int64_t), so old_index must be int64_t too; a plain int would truncate
// large indexes.
int64_t old_index = _last_snapshot_index;
int64_t new_index = writer -> snapshot_index ();
// (2) Rename the temporary snapshot to the formal snapshot
// rename temp to new
std :: string temp_path (_path);
temp_path . append ( "/" );
temp_path . append (_s_temp_path);
std :: string new_path (_path);
butil :: string_appendf ( & new_path , "/" BRAFT_SNAPSHOT_PATTERN , new_index);
if ( ! _fs -> delete_file (new_path , true )) {
if ( ! _fs -> rename (temp_path , new_path)) {
ref (new_index);
CHECK_EQ (old_index , _last_snapshot_index);
_last_snapshot_index = new_index;
// unref old_index, ref new_index
// (3) Delete the previous snapshot.
// Note that this goes through a reference check, because the current
// node may be the Leader and that snapshot may currently be in use for
// a download by a Follower
unref (old_index);
} while ( 0 );
会调用 save_to_file
将元数据填充到 proto
// Saves the meta table to the meta file inside the writer's directory and
// returns the result of that save.
Copy int LocalSnapshotWriter :: sync () {
// BRAFT_SNAPSHOT_META_FILE: __raft_snapshot_meta
const int rc = _meta_table . save_to_file (_fs , _path + "/" BRAFT_SNAPSHOT_META_FILE);
return rc;
// Fills a LocalSnapshotPbMeta from the in-memory metadata and file map, then
// saves it to |path| through ProtoBufFile; returns the result of the save.
// NOTE(review): closing braces are not shown in this listing.
int LocalSnapshotMetaTable :: save_to_file ( FileSystemAdaptor * fs , const std :: string & path) const {
// (1.1) _meta holds last_included_index, last_included_term and the
// cluster configuration
LocalSnapshotPbMeta pb_meta;
if ( _meta . IsInitialized ()) {
* pb_meta . mutable_meta () = _meta;
// (1.2) Add every entry of the file list to the proto
for (Map :: const_iterator
iter = _file_map . begin (); iter != _file_map . end (); ++ iter) {
LocalSnapshotPbMeta :: File * f = pb_meta . add_files ();
f -> set_name ( iter -> first);
* f -> mutable_meta () = iter -> second;
// (1.3) Serialize and persist to file
ProtoBufFile pb_file (path , fs);
int ret = pb_file . save ( & pb_meta , raft_sync_meta ());
return ret;
调用 unref
删除上一个快照。需要注意的是,当前节点可能是 Leader,而该快照可能正用于下载给其他 Follower,所以需要判断其引用计数,若引用计数为 0,则删除其目录:
// Drops one reference on the snapshot identified by |index| and destroys its
// directory once the reference count reaches zero.
// NOTE(review): closing braces (and any unlock before destroy_snapshot) are
// not shown in this listing.
Copy // (1) index is the last_included_index of the previous snapshot
void LocalSnapshotStorage :: unref ( const int64_t index) {
std :: unique_lock < raft_mutex_t > lck (_mutex);
std :: map< int64_t , int > :: iterator it = _ref_map . find (index);
// (2) The previous snapshot was found
if (it != _ref_map . end ()) {
// (3) Decrement its reference count
it -> second -- ;
// (4) If the count drops to 0, delete it
if ( it -> second == 0 ) {
_ref_map . erase (it);
std :: string old_path (_path);
butil :: string_appendf ( & old_path , "/" BRAFT_SNAPSHOT_PATTERN , index);
destroy_snapshot (old_path);
// (5) Delete the directory of the previous snapshot
// Returns -1 when delete_file reports failure, 0 otherwise.
int LocalSnapshotStorage :: destroy_snapshot ( const std :: string & path) {
if ( ! _fs -> delete_file (path , true )) {
return - 1 ;
return 0 ;
在 Closure
中最后会调用 set_snapshot
删除上一个快照的日志,之所以只删除上一个快照的日志,而不立即删除当前快照的日志,主要考虑到有些 Follower 还没有同步完日志,如果删除了当前的日志,哪怕只差几条日志也只能发送快照进行同步,见以下注释:
// Truncates the log prefix up to the snapshot taken before the latest one.
// NOTE(review): this listing is an excerpt; the lock `lck`, the update of
// _last_snapshot_id and the closing braces are not shown.
Copy void LogManager :: set_snapshot ( const SnapshotMeta * meta) {
// (1) last_but_one_snapshot_id is the id of the previous snapshot
const LogId last_but_one_snapshot_id = _last_snapshot_id;
// Truncating log to the index of the last snapshot.
// We don't truncate log before the latest snapshot immediately since
// some log around last_snapshot_index is probably needed by some
// followers
if ( last_but_one_snapshot_id . index > 0 ) {
// We have last snapshot index
_virtual_first_log_id = last_but_one_snapshot_id;
// (2) Delete the logs covered by the previous snapshot
truncate_prefix ( last_but_one_snapshot_id . index + 1 , lck);
return ;
