# 5.2 安装快照

## 流程详解

### 流程概览

当 Follower 落后太多或新节点加入集群时，就会触发安装快照，其流程如下：

1. Leader 向 Follower 发送 `InstallSnapshot` 请求，并停止向 Follower 同步日志
2. Follower 根据请求中的 `URI` 逐一向 Leader 下载快照对应的文件集：
   * 2.1 创建连接 Leader 的客户端
   * 2.2 发送 `GetFile` 请求获取 Leader 快照的元数据
   * 2.3 创建 `temp` 目录用于保存下载的临时快照
   * 2.3 根据元数据中的文件列表对比本地快照与远程快照的差异，获得下载文件列表
   * 2.4 根据文件列表，逐一发送 `GetFile` 请求下载文件并保存至临时快照
   * 2.5 待快照下载完成后，删除本地快照，并将临时快照 `rename()` 成正式快照
3. Follower 回调用户状态机的 `on_snapshot_load` 加载快照
4. 等待快照加载完毕后：
   * 4.1 更新 `applyIndex` 为快照元数据中的 `lastIncludedIndex`
   * 4.2 删除 `logIndex<=lastIncludedIndex` 的日志（即全部日志）
   * 4.3 将快照元数据中的节点配置设为当前节点配置
5. Follower 向 Leader 发送成功的 `InstallSnapshot` 响应
6. Leader 收到成功响应后更新 Follower 的 `nextIndex` 为快照的 `lastIncludedIndex` + 1
7. Leader 从 `nextIndex` 开始继续向 Follower 发送日志

![图 5.3  安装快照 RPC 交互](https://4080527122-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F2OrmypuAcAZcD1NfaAJQ%2Fuploads%2Fgit-blob-9b8973f2727c6d40b2b9e4a6b21ac61786f81697%2Finstall_snapshot.png?alt=media)

### 大文件下载

```proto
message GetFileRequest {
    required int64 reader_id = 1;
    required string filename = 2;
    required int64 count = 3;   // 分片下载 length
    required int64 offset = 4;  // 分片下载 offset
    optional bool read_partly = 5;
}
```

Follower 通过发送 `GetFileRequest` 从 Leader 下载文件，而当快照的文件较大时（超过单个分片大小），这时候就会开启分片下载。每次通过设置 `GetFileRequest` 中的 `count` 和 `offset` 来实现分片下载，默认每个分片为 `128KB`，其受配置项 `raft_max_byte_count_per_rpc` 控制：

```cpp
DEFINE_int32(raft_max_byte_count_per_rpc, 1024 * 128 /*128K*/,
             "Maximum of block size per RPC");
BRPC_VALIDATE_GFLAG(raft_max_byte_count_per_rpc, brpc::PositiveInteger);
```

### 断点续传

Follwer 从 Leader 下载的快照文件会保存在临时快照 `temp` 目录中，如果 Follower 下载了一部分后 Crash，在重启后重新接收 `InstallSnapshot` 开始下载快照时，其不会删除 `temp` 目录，而是对比本地临时快照和远程快照的元数据，对于那些本地已经存在且 `CRC` 一样的文件，则无需重复下载:

![图 5.3  断点续传](https://4080527122-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F2OrmypuAcAZcD1NfaAJQ%2Fuploads%2Fgit-blob-4857c62cd0431b5a20b25b4e13fc673c514d30db%2Fdiff_1.png?alt=media)

进一步地，对于本地正式快照已经存在的文件也无需重复下载。之所以要对比本地正式快照，是因为该快照可能也来自于 Leader 之前的快照：

![图 5.4  断点续传](https://4080527122-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F2OrmypuAcAZcD1NfaAJQ%2Fuploads%2Fgit-blob-2baf325a24ed0bd238a2eb27f105724c62244cce%2Fdiff_2.png?alt=media)

总的来说，为了减少网络的传输，只要本地存在的文件，其文件名和 CRC 和 Leader 的一样就无需重复下载，详见以下[过滤下载列表](#guo-lv-xia-zai-lie-biao)。

### 快照限流

当一个 Raft 进程挂掉一段时间后重启，其可能会从 Leader 下载快照。特别地，当一个进程上跑着大量的 `Raft Group`，而每一个 `Node` 都需要从 Leader 下载快照，这时候下载的数据量将是庞大的，可能会占满 Leader 和 Follower 的网卡和磁盘带宽，影响正常的 `IO`。为此，braft 提供了相应的快照限流特性。

限流作用于以下两个维度：

* 任务个数：节点每开启一个安装快照任务，任务计数将加一（该限制仅作用于 Follower)
* 带宽：Leader 读取本地快照、Follower 通过网络从 Leader 下载文件（文件会写入临时快照），带宽计数将增加对应的字节数。总的来说，其作用的是磁盘带宽和网络带宽

快照限流默认是关闭的，用户需要实现 [SnapshotThrottle](https://github.com/baidu/braft/blob/master/src/braft/snapshot_throttle.h#L26)，并在构建 `Node` 时将其通过 `NodeOptions` 传递给 braft：

```cpp
struct NodeOptions {
    ...
    // If non-null, we will pass this snapshot_throttle to SnapshotExecutor
    // Default: NULL
    scoped_refptr<SnapshotThrottle>* snapshot_throttle;
    ...
};
```

当然了，框架也提供了默认的 `SnapshotThrottle` 实现，即 [ThroughputSnapshotThrottle](https://github.com/baidu/braft/blob/master/src/braft/snapshot_throttle.h#L53)，具体算法实现见[限流算法](https://github.com/baidu/braft/blob/master/src/braft/snapshot_throttle.cpp#L49)，用户构造时可控制带宽大小：

```cpp
class ThroughputSnapshotThrottle : public SnapshotThrottle {
public:
    ThroughputSnapshotThrottle(int64_t throttle_throughput_bytes, int64_t check_cycle);
    ...
private:
    // user defined throughput threshold for raft, bytes per second
    int64_t _throttle_throughput_bytes;
    // user defined check cycles of throughput per second
    int64_t _check_cycle;
    ...
};
```

此外，[ThroughputSnapshotThrottle](https://github.com/baidu/braft/blob/master/src/braft/snapshot_throttle.h#L53) 还提供了一些动态配置项来控制并发任务个数以及带宽大小：

```cpp
// used to increase throttle threshold dynamically when user-defined
// threshold is too small in extreme cases.
// notice that this flag does not distinguish disk types(sata or ssd, and so on)
DEFINE_bool(raft_enable_throttle_when_install_snapshot, true,
            "enable throttle when install snapshot, for both leader and follower");
                    ::brpc::PassValidate);
DEFINE_int64(raft_minimal_throttle_threshold_mb, 0,
            "minimal throttle throughput threshold per second");
                    brpc::NonNegativeInteger);
DEFINE_int32(raft_max_install_snapshot_tasks_num, 1000,
             "Max num of install_snapshot tasks per disk at the same time");
                    brpc::PositiveInteger);
```

由于传递的限流对象是一个指针，用户可以通过指定哪些 `Node` 共用同一个限流对象，来实现各个级别的限流策略，如针对每块盘或每个 `Raft Group`。

### 相关 RPC

安装快照 RPC：

```proto
message SnapshotMeta {
    required int64 last_included_index = 1;
    required int64 last_included_term = 2;
    repeated string peers = 3;
    repeated string old_peers = 4;
}

message InstallSnapshotRequest {
    required string group_id = 1;
    required string server_id = 2;
    required string peer_id = 3;
    required int64 term = 4;
    required SnapshotMeta meta = 5;
    required string uri = 6;
};

message InstallSnapshotResponse {
    required int64 term = 1;
    required bool success = 2;
};

service RaftService {
    rpc install_snapshot(InstallSnapshotRequest) returns (InstallSnapshotResponse);
};
```

下载文件 RPC：

```proto
message GetFileRequest {
    required int64 reader_id = 1;
    required string filename = 2;
    required int64 count = 3;
    required int64 offset = 4;
    optional bool read_partly = 5;
}

message GetFileResponse {
    // Data is in attachment
    required bool eof = 1;
    optional int64 read_size = 2;
}

service FileService {
    rpc get_file(GetFileRequest) returns (GetFileResponse);
}
```

## 阶段一：Leader 下发指令

### 触发安装快照

Leader 在给 Follower 同步日志时，发现 Follower 需要的日志已经被压缩掉了，就会调用 `_install_snapshot` 向 Follower 下发安装快照的指令：

```cpp
// (1) 正常同步日志会调用 `_send_entries`
void Replicator::_send_entries() {
    // (2) 调用  _fill_common_fields 判断 Follower 需要的日志是否还存在
    //     `_next_index` 为下一条需要同步给 Follower 的日志
    if (_fill_common_fields(request.get(), _next_index - 1, false) != 0) {
        ...
        // (4) 调用 `_install_snapshot` 下发安装快照指令
        return _install_snapshot();
    }
    ...
}

int Replicator::_fill_common_fields(AppendEntriesRequest* request,
                                    int64_t prev_log_index,
                                    bool is_heartbeat) {
    // (3) 获取 index 日志对应的 term
    const int64_t prev_log_term = _options.log_manager->get_term(prev_log_index);
    // (4) 如果 term 不存在则代表该日志已经被压缩，返回 -1
    if (prev_log_term == 0 && prev_log_index != 0) {
        if (!is_heartbeat) {
            ...
            return -1;
        } else {
        }
    }
    ...
    return 0;
}
```

### 发送请求

`_install_snapshot` 会向 Follower 发送 `InstallSnapshot` 请求，详见以下注释：

```cpp
void Replicator::_install_snapshot() {
    ...
    // (1) 打开本地最新快照，并获得快照元数据
    _reader = _options.snapshot_storage->open();
    ...
    std::string uri = _reader->generate_uri_for_copy();
    ...
    SnapshotMeta meta;
    // report error on failure
    if (_reader->load_meta(&meta) != 0) {
        ...
    }

    // (2) 设置 `InstallSnapshot` 中的 meta 和 URI 等字段
    brpc::Controller* cntl = new brpc::Controller;
    InstallSnapshotRequest* request = new InstallSnapshotRequest();
    InstallSnapshotResponse* response = new InstallSnapshotResponse();
    ...
    request->mutable_meta()->CopyFrom(meta);
    request->set_uri(uri);

    // (3) 发送 `InstallSnapshot` 请求
    RaftService_Stub stub(&_sending_channel);
    stub.install_snapshot(cntl, request, response, done);
    CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
}
```

`generate_uri_for_copy` 会返回下载的 `URI`，其格式为 `remote://ip:port/reader_id`：

```cpp
std::string LocalSnapshotReader::generate_uri_for_copy() {
    if (_addr == butil::EndPoint()) {
        LOG(ERROR) << "Address is not specified, path:" << _path;
        return std::string();
    }
    if (_reader_id == 0) {
        // TODO: handler referenced files
        scoped_refptr<SnapshotFileReader> reader(
                new SnapshotFileReader(_fs.get(), _path, _snapshot_throttle.get()));
        reader->set_meta_table(_meta_table);
        if (!reader->open()) {
            ...
            return std::string();
        }
        if (file_service_add(reader.get(), &_reader_id) != 0) {
            ...
            return std::string();
        }
    }
    oss << "remote://" << _addr << "/" << _reader_id;
    return oss.str();
}
```

### 处理请求

Follower 在接收到 `InstallSnapshot` 请求后，会调用 `handle_install_snapshot_request` 处理，而该处理函数最终会调用 `SnapshotExecutor::install_snapshot` 来安装快照：

```cpp
void NodeImpl::handle_install_snapshot_request(brpc::Controller* cntl,
                                    const InstallSnapshotRequest* request,
                                    InstallSnapshotResponse* response,
                                    google::protobuf::Closure* done) {
    ...
    return _snapshot_executor->install_snapshot(
            cntl, request, response, done_guard.release());
}
```

`install_snapshot` 主要执行以下几件事：

```cpp
void SnapshotExecutor::install_snapshot(brpc::Controller* cntl,
                                        const InstallSnapshotRequest* request,
                                        InstallSnapshotResponse* response,
                                        google::protobuf::Closure* done) {
    ...
    std::unique_ptr<DownloadingSnapshot> ds(new DownloadingSnapshot);
    ...
    // (1) 调用 register_downloading_snapshot 从 Leader 下载快照
    ret = register_downloading_snapshot(ds.get());
    ...
    // (2) 等待快照下载完成；下载快照为单独的 bthread
    _cur_copier->join();
    ...
    // (3) 调用 load_downloading_snapshot 加载快照
    return load_downloading_snapshot(ds.release(), meta);
}
```

## 阶段二：Follower 下载快照

### 准备下载快照

`register_downloading_snapshot` 会创建客户端，并启动客户端来下载快照：

```cpp
int SnapshotExecutor::register_downloading_snapshot(DownloadingSnapshot* ds) {
    ...
    // (1) 如果当前正在创建快照，返回 `EBUSY`
    if (_saving_snapshot) {
        ...
        ds->cntl->SetFailed(EBUSY, "Is saving snapshot");
        return -1;
    }
    ...
    DownloadingSnapshot* m = _downloading_snapshot.load(
            butil::memory_order_relaxed);
    if (!m) {
        _downloading_snapshot.store(ds, butil::memory_order_relaxed);
        // (2) 调用 `LocalSnapshotStorage::start_to_copy_from` 开始下载快照
        // _cur_copier: LocalSnapshotCopier
        _cur_copier = _snapshot_storage->start_to_copy_from(ds->request->uri());
    }
}

SnapshotCopier* LocalSnapshotStorage::start_to_copy_from(const std::string& uri) {
    // (3) 调用 LocalSnapshotCopier::init 初始化客户端
    LocalSnapshotCopier* copier = new LocalSnapshotCopier(_copy_file);
    if (copier->init(uri) != 0) {  // copier: LocalSnapshotCopier
    }
    ...
    // (4) 调用 `LocalSnapshotCopier::start` 启动客户端，开始下载
    copier->start();
    return copier;
}
```

### 初始化客户端

`LocalSnapshotCopier::init` 会初始化一个 RPC Client 用于下载快照：

```cpp
int LocalSnapshotCopier::init(const std::string& uri) {
    return _copier.init(uri, _fs, _throttle);  // _copier: RemoteFileCopier
}

int RemoteFileCopier::init(const std::string& uri, FileSystemAdaptor* fs,
        SnapshotThrottle* throttle) {
    // (1) 根据 InstallSnapshotRequest 中 uri，解析出 Leader 的 RPC 服务地址
    // Parse uri format: remote://ip:port/reader_id
    static const size_t prefix_size = strlen("remote://");
    butil::StringPiece uri_str(uri);
    ... // 这里忽略解析过程
    butil::StringPiece ip_and_port = uri_str.substr(0, slash_pos);
    ...

    // (2) 根据解析出来的 Leader 的 RPC 服务地址，初始化连接其的 `channel`
    //     该 channel 用于从 Leader 下载快照文件
    brpc::ChannelOptions channel_opt;
    ...
    if (_channel.Init(ip_and_port.as_string().c_str(), &channel_opt) != 0) {
        ...
        return -1;
    }
    ...
    return 0;
}
```

### 启动客户端

`LocalSnapshotCopier::start` 会创建单独的 `bthread` 来运行 `LocalSnapshotCopier::copy`，而该函数就正式开始下载快照：

```cpp
void LocalSnapshotCopier::start() {
    if (bthread_start_background(
                &_tid, NULL, start_copy, this) != 0) {
        ...
    }
}

void *LocalSnapshotCopier::start_copy(void* arg) {
    LocalSnapshotCopier* c = (LocalSnapshotCopier*)arg;
    c->copy();
    return NULL;
}
```

### 下载快照流程

`copy` 函数中描述的是整个下载流程，待全部下载完毕后，会调用 `LocalSnapshotStorage::close` 将临时快照转为正式快照，其中的每一个步骤，我们都将在下面一一介绍：

```cpp
void LocalSnapshotCopier::copy() {
    // (1) 下载步骤
    do {
        // (1.1) 首先下载快照元数据，保存在 _remote_snapshot
        load_meta_table();
        ...
        // (1.2) 过滤掉没必要下载的文件
        filter();
        ...
        // (1.3) 根据元数据中的文件列表，调用 `copy_file` 逐一下载保存至本地临时快照
        std::vector<std::string> files;
        _remote_snapshot.list_files(&files);
        for (size_t i = 0; i < files.size() && ok(); ++i) {
            copy_file(files[i]);
        }
    } while (0);
    ...
    // (2) 下载完成后，调用 `LocalSnapshotStorage::close` 将临时快照转为正式快照
    //     详见以下 <阶段三：转为正式快照>
    if (_writer) {
        if (_storage->close(_writer, _filter_before_copy_remote) != 0 && ok()) {
            ...
        }
    }
    ...
}
```

### 下载快照元数据

调用 `RemoteFileCopier::start_to_copy_to_iobuf` 下载快照元数据，等待其下载完成后，保存至 `_remote_snapshot`：

```cpp
void LocalSnapshotCopier::load_meta_table() {
    butil::IOBuf meta_buf;
    ...
    // (1) 开始下载快照元数据
    //     BRAFT_SNAPSHOT_META_FILE: __raft_snapshot_meta
    scoped_refptr<RemoteFileCopier::Session> session
            = _copier.start_to_copy_to_iobuf(BRAFT_SNAPSHOT_META_FILE,
                                            &meta_buf, NULL);
    ...
    // (2) 等待下载完成
    _cur_session = session.get();
    session->join();
    _cur_session = NULL;

    // (3) 将元数据保存在 `_remote_snapshot`
    if (_remote_snapshot._meta_table.load_from_iobuf_as_remote(meta_buf) != 0) {
        ..
        ..
        return;
    }
}
```

安装快照流程中的所有下载操作都会调用 `start_to_copy_to_iobuf`，该函数接受一个 `source`，向 Leader 发送 `GetFile` 请求，将下载的内容保存在 `dest_buf` 中。特别地，针对大文件，会采用分片的方式进行传输：

```cpp
scoped_refptr<RemoteFileCopier::Session>
RemoteFileCopier::start_to_copy_to_iobuf(
                      const std::string& source,
                      butil::IOBuf* dest_buf,
                      const CopyOptions* options) {
    ...
    // (1) 准备好请求
    scoped_refptr<Session> session(new Session());
    ...
    session->_buf = dest_buf;
    session->_request.set_filename(source);
    ...
    session->_channel = &_channel;
    ...

    // (2) 发送 `GetFile` 请求
    session->send_next_rpc();
    return session;
}

/*
 * message GetFileRequest {
 *   required int64 reader_id = 1;
 *   required string filename = 2;
 *   required int64 count = 3;
 *   required int64 offset = 4;
 *   optional bool read_partly = 5;
 * }
 */
void RemoteFileCopier::Session::send_next_rpc() {
    // (3) 设置 offset
    //     _request.offset() 是上一次发送的 offset
    // Not clear request as we need some fields of the previous RPC
    off_t offset = _request.offset() + _request.count();

    // (4) FLAGS_raft_max_byte_count_per_rpc 默认是 128KB
    //     如果文件大小超过 128KB，则重新计算 count
    const size_t max_count =
            (!_buf) ? FLAGS_raft_max_byte_count_per_rpc : UINT_MAX;
    _cntl.set_timeout_ms(_options.timeout_ms);
    _request.set_offset(offset);

    ...
    size_t new_max_count = max_count;
    ...
    _request.set_count(new_max_count);

    // (5) 发送 `GetFile` 请求
    _rpc_call = _cntl.call_id();
    FileService_Stub stub(_channel);
    AddRef();  // Release in on_rpc_returned
    return stub.get_file(&_cntl, &_request, &_response, &_done);
}
```

### 过滤下载列表

在正式下载文件前，我们会过滤掉本地拥有的文件，具体规则详见以上[断点续传](#断点续传)。

```cpp
void LocalSnapshotCopier::filter() {
    _writer = (LocalSnapshotWriter*)_storage->create(!_filter_before_copy_remote);
    ...

    if (_filter_before_copy_remote) {  // true
        SnapshotReader* reader = _storage->open();
        if (filter_before_copy(_writer, reader) != 0) {
            ...
        }
        ...
    }
    _writer->save_meta(_remote_snapshot._meta_table.meta());
    if (_writer->sync() != 0) {
        ...
        return;
    }
}
```

```cpp
int LocalSnapshotCopier::filter_before_copy(LocalSnapshotWriter* writer,
                                            SnapshotReader* last_snapshot) {
    // (1) 首先获取本地临时快照 temp 目录中的文件列表
    std::vector<std::string> existing_files;
    writer->list_files(&existing_files);
    std::vector<std::string> to_remove;

    for (size_t i = 0; i < existing_files.size(); ++i) {
        // (2) 如果远程（Leader）快照不存在该文件，则将该文件从 temp 目录中移除
        if (_remote_snapshot.get_file_meta(existing_files[i], NULL) != 0) {
            to_remove.push_back(existing_files[i]);
            writer->remove_file(existing_files[i]);  // 将文件名从元数据表中移除
        }
    }

    // (3) 从元数据中获取远程快照的文件列表
    std::vector<std::string> remote_files;
    _remote_snapshot.list_files(&remote_files);
    for (size_t i = 0; i < remote_files.size(); ++i) {
        const std::string& filename = remote_files[i];
        LocalFileMeta remote_meta;
        if (!remote_meta.has_checksum()) {
            // Redownload file if this file doen't have checksum
            writer->remove_file(filename);
            to_remove.push_back(filename);
            continue;
        }

        // (3.2) temp 目录已经存在
        LocalFileMeta local_meta;
        if (writer->get_file_meta(filename, &local_meta) == 0) {  // temp 目录有
            if (local_meta.has_checksum() &&
                local_meta.checksum() == remote_meta.checksum()) {
                ...
                continue;
            }
            // Remove files from writer so that the file is to be copied from
            // remote_snapshot or last_snapshot
            writer->remove_file(filename);
            to_remove.push_back(filename);
        }

        // (3.3) 再在本地的上一个快照中找
        // Try find files in last_snapshot
        if (!last_snapshot) {
            continue;
        }
        if (last_snapshot->get_file_meta(filename, &local_meta) != 0) {
            continue;
        }
        if (!local_meta.has_checksum() || local_meta.checksum() != remote_meta.checksum()) {
            continue;
        }
        ...
        if (local_meta.source() == braft::FILE_SOURCE_LOCAL) {
            std::string source_path = last_snapshot->get_path() + '/'
                                      + filename;
            std::string dest_path = writer->get_path() + '/'
                                      + filename;
            _fs->delete_file(dest_path, false);
            if (!_fs->link(source_path, dest_path)) {
                continue;
            }
            // Don't delete linked file
            if (!to_remove.empty() && to_remove.back() == filename) {
                to_remove.pop_back();
            }
        }
        // Copy file from last_snapshot
        writer->add_file(filename, &local_meta);
    }

    if (writer->sync() != 0) {
        LOG(ERROR) << "Fail to sync writer on path=" << writer->get_path();
        return -1;
    }

    // 删除临时快照中需要删除的文件
    for (size_t i = 0; i < to_remove.size(); ++i) {
        std::string file_path = writer->get_path() + "/" + to_remove[i];
        _fs->delete_file(file_path, false);
    }
    return 0;
}
```

### 创建临时目录

首先会调用 `LocalSnapshotStorage::create` 创建一个 `temp` 目录用来保存下载的临时快照，并返回 `SnapshotWriter`。注意，我们在创建本地快照时，也有一样的创建流程。唯一的区别在于用于保存下载快照的 `temp` 目录即使事先存在也不会删除，主要是为了实现我们上面提到的断点续传功能：

```cpp
SnapshotWriter* LocalSnapshotStorage::create(bool from_empty) {
    LocalSnapshotWriter* writer = NULL;

    do {
        std::string snapshot_path(_path);  // _path 为用户配置的快照存储目录
        snapshot_path.append("/");
        snapshot_path.append(_s_temp_path);  // e.g. data/temp

        // (1) 因为 `from_empty` 为 False，所以有 `temp` 目录的话
        //    将不会删除，而是直接返回 `SnapshotWriter`
        // delete temp
        // TODO: Notify watcher before deleting
        if (_fs->path_exists(snapshot_path) && from_empty) {
            if (destroy_snapshot(snapshot_path) != 0) {
                break;
            }
        }

        // (2) 如果不存在的话，则调用 `LocalSnapshotWriter::init` 创建 temp 目录
        writer = new LocalSnapshotWriter(snapshot_path, _fs.get());
        if (writer->init() != 0) {
            ...
            break;
        }
    } while (0);

    return writer;
}
```

遍历用户指定的快照目录，保存最近的一个快照，删除其余全部快照：

```cpp
int LocalSnapshotWriter::init() {
    butil::File::Error e;
    // (3) 创建 temp 目录
    if (!_fs->create_directory(_path, &e, false)) {
        ...
        return EIO;
    }
    ...
    return 0;
}
```

### 逐一下载文件

在上面的下载快照流程 `copy` 函数中，我们已经介绍过，会根据快照元数据中的文件列表，逐一从 Leader 下载快照文件：

```cpp
void LocalSnapshotCopier::copy() {
    do {
        ...
        // (1) 获取文件列表
        std::vector<std::string> files;
        _remote_snapshot.list_files(&files);
        // (2) 逐一下载
        for (size_t i = 0; i < files.size() && ok(); ++i) {
            copy_file(files[i]);
        }
    } while (0);
    ...
}
```

调用 `list_files` 获取快照元数据中的文件列表：

```cpp
void LocalSnapshot::list_files(std::vector<std::string> *files) {
    return _meta_table.list_files(files);  // _meta_table: LocalSnapshotMetaTable
}

void LocalSnapshotMetaTable::list_files(std::vector<std::string>* files) const {
    ...
    for (Map::const_iterator
            iter = _file_map.begin(); iter != _file_map.end(); ++iter) {
        files->push_back(iter->first);
    }
}
```

调用 `copy_file` 下载指定文件，参见以下详情注释：

```cpp
void LocalSnapshotCopier::copy_file(const std::string& filename) {
    ...
    // (1) 如果路径中还有目录，则在本地创建子目录
    std::string file_path = _writer->get_path() + '/' + filename;
    butil::FilePath sub_path(filename);
    if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") {
        ...
        if (FLAGS_raft_create_parent_directories) {
            butil::FilePath sub_dir =
                    butil::FilePath(_writer->get_path()).Append(sub_path.DirName());
            rc = _fs->create_directory(sub_dir.value(), &e, true);
        }
        ...
    }

    // (2) 调用 `RemoteFileCopier::start_to_copy_to_file` 从 Leader 下载文件存至临时快照
    scoped_refptr<RemoteFileCopier::Session> session
        = _copier.start_to_copy_to_file(filename, file_path, NULL);
    ...
    _cur_session = session.get();
    ..
    session->join();
    ...

    // (3) 每成功下载一个文件，则将其添加至临时快照元数据中
    if (_writer->add_file(filename, &meta) != 0) {
        ...
        return;
    }

    // (4) 每成功下载一个文件，则将临时快照元数据中持久化，
    //     也就是说每次都会覆盖快照元数据，这么做主要是为了不重复下载文件
    if (_writer->sync() != 0) {
        ...
        return;
    }
}
```

## 阶段三：转为正式快照

在上面的下载快照流程 `copy` 函数中，我们已经介绍过，当快照中的所有文件都下载完毕后，就会调用 `LocalSnapshotStorage::close` 将下载的临时快照转为正式快照：

```cpp
void LocalSnapshotCopier::copy() {
    // (1) 各种下载流程
    ...
    // (2) 调用 `LocalSnapshotStorage::close`
    if (_writer) {
        // set_error for copier only when failed to close writer and copier was
        // ok before this moment
        if (_storage->close(_writer, _filter_before_copy_remote) != 0 && ok()) {
            ...
        }
        ...
    }
    ...
}
```

### 写入元数据

### rename 成正式快照

在 `close` 函数中主要以下做四件事：

* (1) 将元数据持久化到文件
* (2) 通过 `rename()` 将临时快照变为正式快照
* (3) 删除上一个快照

```cpp
int LocalSnapshotStorage::close(SnapshotWriter* writer_base,
                                bool keep_data_on_error) {
    LocalSnapshotWriter* writer = dynamic_cast<LocalSnapshotWriter*>(writer_base);
    do {
        ...
        // (1) 将快照元数据写入文件
        ret = writer->sync();
        ...
        //
        int old_index = _last_snapshot_index;
        int64_t new_index = writer->snapshot_index();

        // (2) 将临时快照 rename 成正式快照
        // rename temp to new
        std::string temp_path(_path);
        temp_path.append("/");
        temp_path.append(_s_temp_path);
        std::string new_path(_path);
        butil::string_appendf(&new_path, "/" BRAFT_SNAPSHOT_PATTERN, new_index);
        if (!_fs->delete_file(new_path, true)) {
            ...
        }
        ...
        if (!_fs->rename(temp_path, new_path)) {
            ...
        }

        ref(new_index);
        {
            BAIDU_SCOPED_LOCK(_mutex);
            CHECK_EQ(old_index, _last_snapshot_index);
            _last_snapshot_index = new_index;
        }
        // unref old_index, ref new_index

        // (3) 删除上一个快照。
        //     需要注意的是，这里有一个引用判断
        //     因为当前节点可能是 Leader，而该快照可能正用于下载给 Follower
        unref(old_index);
    } while (0);
    ...
}
```

`sync` 会调用 `save_to_file` 将元数据填充到 `proto`（`LocalSnapshotPbMeta`）中，并将其序列化，最终持久化到文件：

```cpp
int LocalSnapshotWriter::sync() {
    // BRAFT_SNAPSHOT_META_FILE: __raft_snapshot_meta
    const int rc = _meta_table.save_to_file(_fs, _path + "/" BRAFT_SNAPSHOT_META_FILE);
    ...
    return rc;
}

int LocalSnapshotMetaTable::save_to_file(FileSystemAdaptor* fs, const std::string& path) const {
    // (1) _meta 中保存的是 lastIncludeIndex，lastIncludedTerm 以及集群配置
    LocalSnapshotPbMeta pb_meta;
    if (_meta.IsInitialized()) {
        *pb_meta.mutable_meta() = _meta;
    }

    // (2) 将所有文件列表加入到 proto
    for (Map::const_iterator
            iter = _file_map.begin(); iter != _file_map.end(); ++iter) {
        LocalSnapshotPbMeta::File *f = pb_meta.add_files();
        f->set_name(iter->first);
        *f->mutable_meta() = iter->second;
    }

    // (3) 序列化并持久化到文件
    ProtoBufFile pb_file(path, fs);
    int ret = pb_file.save(&pb_meta, raft_sync_meta());
    ...
    return ret;
}
```

### 删除上一个快照

调用 `unref` 删除上一个快照。需要注意的是，当前节点可能是 Leader，而该快照可能正用于下载给其他 Follower，所以需要判断其引用计数，若引用计数为 0，则删除其目录：

```cpp
// (1) index 为上一个快照的 lastIncludeIndex
void LocalSnapshotStorage::unref(const int64_t index) {
    std::unique_lock<raft_mutex_t> lck(_mutex);
    std::map<int64_t, int>::iterator it = _ref_map.find(index);
    // (2) 找到上一个快照
    if (it != _ref_map.end()) {
        // (3) 将其引用计数减一
        it->second--;
        // (4) 如果减到 0，则将其删除
        if (it->second == 0) {
            _ref_map.erase(it);
            std::string old_path(_path);
            butil::string_appendf(&old_path, "/" BRAFT_SNAPSHOT_PATTERN, index);
            destroy_snapshot(old_path);
        }
    }
}

// (5) 删除上一个快照的目录
int LocalSnapshotStorage::destroy_snapshot(const std::string& path) {
    if (!_fs->delete_file(path, true)) {
        ...
        return -1;
    }
    return 0;
}
```

## 阶段四：Follower 加载快照

在将下载来的临时快照转为正式快照后，节点就开始加载快照。加载快照的流程我们在[<5.3 加载快照>](https://github.com/Wine93/learn-raft/blob/main/ch05/5.3/load.md)已有详细介绍，在这里就不重复介绍了。

当快照加载完毕后，运行 `Closure` 时会发送 `InstallSnapshot` 响应给 Leader。

## 阶段五：Leader 处理响应

Leader 在收到 `InstallSnapshot` 响应后，会调用 `_on_install_snapshot_returned` 进行处理。在该函数中会根据响应的结果，做出不同的决策：

```cpp
void Replicator::_on_install_snapshot_returned(
            ReplicatorId id, brpc::Controller* cntl,
            InstallSnapshotRequest* request,
            InstallSnapshotResponse* response) {
    ...
    do {
        if (cntl->Failed()) {
            ...
            succ = false;
            break;
        }
        if (!response->success()) {
            succ = false;
            ...
            break;
        }
        // (1) 如果安装快照成功，则将 Follower 的 nextIndex 设置为快照的 lastIncludedIndex + 1
        // Success
        r->_next_index = request->meta().last_included_index() + 1;
        ...
    } while (0);

    // (2) 如果失败，则先阻塞当前 Replicator 一会儿，最终仍会再次安装快照
    // We don't retry installing the snapshot explicitly.
    // dummy_id is unlock in _send_entries
    if (!succ) {
        return r->_block(butil::gettimeofday_us(), cntl->ErrorCode());
    }

    // (1.1) 如果成功的话，则继续向 Follower 同步日志
    // dummy_id is unlock in _send_entries
    return r->_send_entries();
}
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://wine93.gitbook.io/xray-braft/ch05/install_snapshot.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
