forked from cockroachdb/c-rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
checkpoint.cc
219 lines (194 loc) · 7.49 KB
/
checkpoint.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2012 Facebook.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/checkpoint.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <algorithm>
#include <string>
#include "db/filename.h"
#include "db/wal_manager.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/transaction_log.h"
#include "util/file_util.h"
#include "port/port.h"
namespace rocksdb {
class CheckpointImpl : public Checkpoint {
public:
// Creates a Checkpoint object to be used for creating openable sbapshots
explicit CheckpointImpl(DB* db) : db_(db) {}
// Builds an openable snapshot of RocksDB on the same disk, which
// accepts an output directory on the same disk, and under the directory
// (1) hard-linked SST files pointing to existing live SST files
// SST files will be copied if output directory is on a different filesystem
// (2) a copied manifest files and other files
// The directory should not already exist and will be created by this API.
// The directory will be an absolute path
using Checkpoint::CreateCheckpoint;
virtual Status CreateCheckpoint(const std::string& checkpoint_dir) override;
private:
DB* db_;
};
Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
*checkpoint_ptr = new CheckpointImpl(db);
return Status::OK();
}
Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir) {
return Status::NotSupported("");
}
// Builds an openable snapshot of RocksDB
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
uint64_t sequence_number = db_->GetLatestSequenceNumber();
bool same_fs = true;
VectorLogPtr live_wal_files;
s = db_->GetEnv()->FileExists(checkpoint_dir);
if (s.ok()) {
return Status::InvalidArgument("Directory exists");
} else if (!s.IsNotFound()) {
assert(s.IsIOError());
return s;
}
s = db_->DisableFileDeletions();
if (s.ok()) {
// this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size, true);
}
// if we have more than one column family, we need to also get WAL files
if (s.ok()) {
s = db_->GetSortedWalFiles(live_wal_files);
}
if (!s.ok()) {
db_->EnableFileDeletions(false);
return s;
}
size_t wal_size = live_wal_files.size();
Log(db_->GetOptions().info_log,
"Started the snapshot process -- creating snapshot in directory %s",
checkpoint_dir.c_str());
std::string full_private_path = checkpoint_dir + ".tmp";
// create snapshot directory
s = db_->GetEnv()->CreateDir(full_private_path);
// copy/hard link live_files
for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
uint64_t number;
FileType type;
bool ok = ParseFileName(live_files[i], &number, &type);
if (!ok) {
s = Status::Corruption("Can't parse file name. This is very bad");
break;
}
// we should only get sst, manifest and current files here
assert(type == kTableFile || type == kDescriptorFile ||
type == kCurrentFile);
assert(live_files[i].size() > 0 && live_files[i][0] == '/');
std::string src_fname = live_files[i];
// rules:
// * if it's kTableFile, then it's shared
// * if it's kDescriptorFile, limit the size to manifest_file_size
// * always copy if cross-device link
if ((type == kTableFile) && same_fs) {
Log(db_->GetOptions().info_log, "Hard Linking %s", src_fname.c_str());
s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname,
full_private_path + src_fname);
if (s.IsNotSupported()) {
same_fs = false;
s = Status::OK();
}
}
if ((type != kTableFile) || (!same_fs)) {
Log(db_->GetOptions().info_log, "Copying %s", src_fname.c_str());
s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname,
full_private_path + src_fname,
(type == kDescriptorFile) ? manifest_file_size : 0);
}
}
Log(db_->GetOptions().info_log, "Number of log files %" ROCKSDB_PRIszt,
live_wal_files.size());
// Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush.
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(live_wal_files[i]->StartSequence() >= sequence_number)) {
if (i + 1 == wal_size) {
Log(db_->GetOptions().info_log, "Copying %s",
live_wal_files[i]->PathName().c_str());
s = CopyFile(db_->GetEnv(),
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes());
break;
}
if (same_fs) {
// we only care about live log files
Log(db_->GetOptions().info_log, "Hard Linking %s",
live_wal_files[i]->PathName().c_str());
s = db_->GetEnv()->LinkFile(
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName());
if (s.IsNotSupported()) {
same_fs = false;
s = Status::OK();
}
}
if (!same_fs) {
Log(db_->GetOptions().info_log, "Copying %s",
live_wal_files[i]->PathName().c_str());
s = CopyFile(db_->GetEnv(),
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(), 0);
}
}
}
// we copied all the files, enable file deletions
db_->EnableFileDeletions(false);
if (s.ok()) {
// move tmp private backup to real snapshot directory
s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
}
if (s.ok()) {
unique_ptr<Directory> checkpoint_directory;
db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
if (checkpoint_directory != nullptr) {
s = checkpoint_directory->Fsync();
}
}
if (!s.ok()) {
// clean all the files we might have created
Log(db_->GetOptions().info_log, "Snapshot failed -- %s",
s.ToString().c_str());
// we have to delete the dir and all its children
std::vector<std::string> subchildren;
db_->GetEnv()->GetChildren(full_private_path, &subchildren);
for (auto& subchild : subchildren) {
Status s1 = db_->GetEnv()->DeleteFile(full_private_path + subchild);
if (s1.ok()) {
Log(db_->GetOptions().info_log, "Deleted %s",
(full_private_path + subchild).c_str());
}
}
// finally delete the private dir
Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
Log(db_->GetOptions().info_log, "Deleted dir %s -- %s",
full_private_path.c_str(), s1.ToString().c_str());
return s;
}
// here we know that we succeeded and installed the new snapshot
Log(db_->GetOptions().info_log, "Snapshot DONE. All is good");
Log(db_->GetOptions().info_log, "Snapshot sequence number: %" PRIu64,
sequence_number);
return s;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE