classStateMachine{public:// user defined snapshot load function// get and load snapshot// success return 0, fail return errno// Default: Load nothing and returns error.virtualinton_snapshot_load(::braft::SnapshotReader*reader);virtualvoidon_configuration_committed(const ::braft::Configuration&conf,int64_tindex);};
// Read-side view of a snapshot: exposes the snapshot meta and a URI that other
// peers can use to copy the snapshot.
class SnapshotReader : public Snapshot {
public:
    // Virtual so that a concrete reader can be destroyed safely through a
    // SnapshotReader pointer.
    virtual ~SnapshotReader() {}

    // Load the snapshot meta into |meta|.
    // Returns 0 on success, non-zero on failure.
    virtual int load_meta(SnapshotMeta* meta) = 0;

    // Generate uri for other peers to copy this snapshot.
    // Return an empty string if some error has occurred.
    virtual std::string generate_uri_for_copy() = 0;
};
class Snapshot : public butil::Status {
public:
// Get the path of the Snapshot
virtual std::string get_path() = 0;
// List all the existing files in the Snapshot currently
virtual void list_files(std::vector<std::string> *files) = 0;
// Get the implementation-defined file_meta
virtual int get_file_meta(const std::string& filename,
::google::protobuf::Message* file_meta) {
(void)filename;
if (file_meta != NULL) {
file_meta->Clear();
}
return 0;
}
};
// Walks the batch of ApplyTasks exposed by |iter| and dispatches each task
// according to its type. The visible path returns 0.
// NOTE: parts of the body are elided ("...") in this excerpt; `caller` is
// declared in the elided portion (presumably recovered from |meta| -- confirm).
int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
...
// Handle every task currently available from the iterator.
for (; iter; ++iter) {
switch (iter->type) {
...
case SNAPSHOT_LOAD:
// Record the kind of task being processed.
caller->_cur_task = SNAPSHOT_LOAD;
// Only load the snapshot when pass_by_status() accepts the closure.
if (caller->pass_by_status(iter->done)) {
caller->do_snapshot_load((LoadSnapshotClosure*)iter->done);
}
break;
...
};
}
...
return 0;
}
// Loads a snapshot into the user state machine and then advances the applied
// index/term to the position recorded in the snapshot meta. |done| supplies
// the SnapshotReader (via start()) and is run to report the outcome.
// NOTE: parts of the body are elided ("...") in this excerpt.
void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) {
...
SnapshotReader* reader = done->start();
// (1) Fetch the snapshot's metadata
SnapshotMeta meta;
int ret = reader->load_meta(&meta);
if (0 != ret) {
...
return;
}
// (2) Call the user state machine's on_snapshot_load to load the snapshot
ret = _fsm->on_snapshot_load(reader);
if (ret != 0) {
// Report the failure through the closure's status before running it.
done->status().set_error(ret, "StateMachine on_snapshot_load failed");
done->Run();
...
return;
}
// (3) Once the snapshot has been loaded,
// take the peer configuration from the snapshot metadata
// and invoke the user state machine's on_configuration_committed with it
if (meta.old_peers_size() == 0) {
// Joint stage is not supposed to be noticeable by end users.
Configuration conf;
for (int i = 0; i < meta.peers_size(); ++i) {
conf.add_peer(meta.peers(i));
}
_fsm->on_configuration_committed(conf, meta.last_included_index());
}
// (4) Set the applied index to the last_included_index from the snapshot metadata
_last_applied_index.store(meta.last_included_index(),
butil::memory_order_release);
_last_applied_term = meta.last_included_term();
// (5) Run the closure set up earlier, i.e. FirstSnapshotLoadDone or InstallSnapshotDone
done->Run();
}
// User-implemented state machine interface (excerpt: snapshot-load hook only).
class StateMachine {
public:
    // Snapshot load hook supplied by the user.
    // The implementation obtains the snapshot through |reader| and loads it.
    // Returns 0 when loading succeeds, otherwise an errno value.
    // The default implementation loads nothing and reports an error.
    virtual int on_snapshot_load(::braft::SnapshotReader* reader);
};
// SnapshotReader implementation that reports a stored path; LocalSnapshotStorage
// is granted friend access to its private members.
// NOTE: this is an excerpt -- other members are elided ("...") and the closing
// brace of the class is not shown.
class LocalSnapshotReader : public SnapshotReader {
friend class LocalSnapshotStorage;
public:
...
// Get the path of the Snapshot
virtual std::string get_path() { return _path; }
private:
...
// Value returned by get_path().
std::string _path;
...
// Copies the snapshot meta held by the meta table into |meta|.
// Returns 0 when the table has a meta, -1 (leaving |meta| untouched) otherwise.
int LocalSnapshotReader::load_meta(SnapshotMeta* meta) {
    if (_meta_table.has_meta()) {
        *meta = _meta_table.meta();
        return 0;
    }
    return -1;
}
// Fills |files| with the files of this snapshot by delegating to the meta table.
void LocalSnapshotReader::list_files(std::vector<std::string> *files) {
    _meta_table.list_files(files);
}
// Replaces the contents of |files| with the keys of _file_map, in the map's
// iteration order.
// NOTE: the leading statements are elided ("...") in this excerpt.
void LocalSnapshotMetaTable::list_files(std::vector<std::string>* files) const {
...
// Drop any previous contents and reserve room for one entry per map element.
files->clear();
files->reserve(_file_map.size());
for (Map::const_iterator
iter = _file_map.begin(); iter != _file_map.end(); ++iter) {
// Only the key is reported (presumably the file name -- confirm).
files->push_back(iter->first);
}
}
// Excerpt of do_snapshot_load showing only the configuration notification;
// the surrounding statements (including the declaration of `meta`) are elided.
void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) {
...
// Notify the user state machine only when the meta carries no old peers.
if (meta.old_peers_size() == 0) {
// Joint stage is not supposed to be noticeable by end users.
Configuration conf;
// Rebuild the configuration from the peers recorded in the snapshot meta.
for (int i = 0; i < meta.peers_size(); ++i) {
conf.add_peer(meta.peers(i));
}
_fsm->on_configuration_committed(conf, meta.last_included_index());
}
...
}