]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / checkpoint / checkpoint_impl.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2012 Facebook.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file.
9
10#ifndef ROCKSDB_LITE
11
12#include "utilities/checkpoint/checkpoint_impl.h"
13
14#ifndef __STDC_FORMAT_MACROS
15#define __STDC_FORMAT_MACROS
16#endif
17
18#include <inttypes.h>
19#include <algorithm>
20#include <string>
21#include <vector>
22
23#include "db/wal_manager.h"
24#include "port/port.h"
25#include "rocksdb/db.h"
26#include "rocksdb/env.h"
27#include "rocksdb/transaction_log.h"
28#include "rocksdb/utilities/checkpoint.h"
29#include "util/file_util.h"
30#include "util/filename.h"
31#include "util/sync_point.h"
32
33namespace rocksdb {
34
35Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
36 *checkpoint_ptr = new CheckpointImpl(db);
37 return Status::OK();
38}
39
11fdf7f2
TL
40Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
41 uint64_t /*log_size_for_flush*/) {
7c673cae
FG
42 return Status::NotSupported("");
43}
44
11fdf7f2
TL
45void CheckpointImpl::CleanStagingDirectory(
46 const std::string& full_private_path, Logger* info_log) {
47 std::vector<std::string> subchildren;
48 Status s = db_->GetEnv()->FileExists(full_private_path);
49 if (s.IsNotFound()) {
50 return;
51 }
52 ROCKS_LOG_INFO(info_log, "File exists %s -- %s",
53 full_private_path.c_str(), s.ToString().c_str());
54 db_->GetEnv()->GetChildren(full_private_path, &subchildren);
55 for (auto& subchild : subchildren) {
56 std::string subchild_path = full_private_path + "/" + subchild;
57 s = db_->GetEnv()->DeleteFile(subchild_path);
58 ROCKS_LOG_INFO(info_log, "Delete file %s -- %s",
59 subchild_path.c_str(), s.ToString().c_str());
60 }
61 // finally delete the private dir
62 s = db_->GetEnv()->DeleteDir(full_private_path);
63 ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s",
64 full_private_path.c_str(), s.ToString().c_str());
65}
66
7c673cae
FG
67// Builds an openable snapshot of RocksDB
68Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
69 uint64_t log_size_for_flush) {
70 DBOptions db_options = db_->GetDBOptions();
71
72 Status s = db_->GetEnv()->FileExists(checkpoint_dir);
73 if (s.ok()) {
74 return Status::InvalidArgument("Directory exists");
75 } else if (!s.IsNotFound()) {
76 assert(s.IsIOError());
77 return s;
78 }
79
80 ROCKS_LOG_INFO(
81 db_options.info_log,
82 "Started the snapshot process -- creating snapshot in directory %s",
83 checkpoint_dir.c_str());
11fdf7f2
TL
84
85 size_t final_nonslash_idx = checkpoint_dir.find_last_not_of('/');
86 if (final_nonslash_idx == std::string::npos) {
87 // npos means it's only slashes or empty. Non-empty means it's the root
88 // directory, but it shouldn't be because we verified above the directory
89 // doesn't exist.
90 assert(checkpoint_dir.empty());
91 return Status::InvalidArgument("invalid checkpoint directory name");
92 }
93
94 std::string full_private_path =
95 checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
96 ROCKS_LOG_INFO(
97 db_options.info_log,
98 "Snapshot process -- using temporary directory %s",
99 full_private_path.c_str());
100 CleanStagingDirectory(full_private_path, db_options.info_log.get());
7c673cae
FG
101 // create snapshot directory
102 s = db_->GetEnv()->CreateDir(full_private_path);
103 uint64_t sequence_number = 0;
104 if (s.ok()) {
105 db_->DisableFileDeletions();
106 s = CreateCustomCheckpoint(
107 db_options,
108 [&](const std::string& src_dirname, const std::string& fname,
109 FileType) {
110 ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str());
111 return db_->GetEnv()->LinkFile(src_dirname + fname,
112 full_private_path + fname);
113 } /* link_file_cb */,
114 [&](const std::string& src_dirname, const std::string& fname,
115 uint64_t size_limit_bytes, FileType) {
116 ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
117 return CopyFile(db_->GetEnv(), src_dirname + fname,
118 full_private_path + fname, size_limit_bytes,
119 db_options.use_fsync);
120 } /* copy_file_cb */,
121 [&](const std::string& fname, const std::string& contents, FileType) {
122 ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
11fdf7f2
TL
123 return CreateFile(db_->GetEnv(), full_private_path + fname, contents,
124 db_options.use_fsync);
7c673cae
FG
125 } /* create_file_cb */,
126 &sequence_number, log_size_for_flush);
127 // we copied all the files, enable file deletions
128 db_->EnableFileDeletions(false);
129 }
130
131 if (s.ok()) {
132 // move tmp private backup to real snapshot directory
133 s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
134 }
135 if (s.ok()) {
494da23a 136 std::unique_ptr<Directory> checkpoint_directory;
7c673cae
FG
137 db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
138 if (checkpoint_directory != nullptr) {
139 s = checkpoint_directory->Fsync();
140 }
141 }
142
143 if (s.ok()) {
144 // here we know that we succeeded and installed the new snapshot
145 ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
146 ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
147 sequence_number);
148 } else {
149 // clean all the files we might have created
150 ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
151 s.ToString().c_str());
11fdf7f2 152 CleanStagingDirectory(full_private_path, db_options.info_log.get());
7c673cae
FG
153 }
154 return s;
155}
156
157Status CheckpointImpl::CreateCustomCheckpoint(
158 const DBOptions& db_options,
159 std::function<Status(const std::string& src_dirname,
160 const std::string& src_fname, FileType type)>
161 link_file_cb,
162 std::function<Status(const std::string& src_dirname,
163 const std::string& src_fname,
164 uint64_t size_limit_bytes, FileType type)>
165 copy_file_cb,
166 std::function<Status(const std::string& fname, const std::string& contents,
167 FileType type)>
168 create_file_cb,
169 uint64_t* sequence_number, uint64_t log_size_for_flush) {
170 Status s;
171 std::vector<std::string> live_files;
172 uint64_t manifest_file_size = 0;
173 uint64_t min_log_num = port::kMaxUint64;
174 *sequence_number = db_->GetLatestSequenceNumber();
175 bool same_fs = true;
176 VectorLogPtr live_wal_files;
177
178 bool flush_memtable = true;
179 if (s.ok()) {
180 if (!db_options.allow_2pc) {
181 if (log_size_for_flush == port::kMaxUint64) {
182 flush_memtable = false;
183 } else if (log_size_for_flush > 0) {
184 // If out standing log files are small, we skip the flush.
185 s = db_->GetSortedWalFiles(live_wal_files);
186
187 if (!s.ok()) {
188 return s;
189 }
190
191 // Don't flush column families if total log size is smaller than
192 // log_size_for_flush. We copy the log files instead.
193 // We may be able to cover 2PC case too.
194 uint64_t total_wal_size = 0;
195 for (auto& wal : live_wal_files) {
196 total_wal_size += wal->SizeFileBytes();
197 }
198 if (total_wal_size < log_size_for_flush) {
199 flush_memtable = false;
200 }
201 live_wal_files.clear();
202 }
203 }
204
205 // this will return live_files prefixed with "/"
206 s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
207
208 if (s.ok() && db_options.allow_2pc) {
209 // If 2PC is enabled, we need to get minimum log number after the flush.
210 // Need to refetch the live files to recapture the snapshot.
211 if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
212 &min_log_num)) {
213 return Status::InvalidArgument(
214 "2PC enabled but cannot fine the min log number to keep.");
215 }
216 // We need to refetch live files with flush to handle this case:
217 // A previous 000001.log contains the prepare record of transaction tnx1.
218 // The current log file is 000002.log, and sequence_number points to this
219 // file.
220 // After calling GetLiveFiles(), 000003.log is created.
221 // Then tnx1 is committed. The commit record is written to 000003.log.
222 // Now we fetch min_log_num, which will be 3.
223 // Then only 000002.log and 000003.log will be copied, and 000001.log will
224 // be skipped. 000003.log contains commit message of tnx1, but we don't
225 // have respective prepare record for it.
226 // In order to avoid this situation, we need to force flush to make sure
227 // all transactions committed before getting min_log_num will be flushed
228 // to SST files.
229 // We cannot get min_log_num before calling the GetLiveFiles() for the
230 // first time, because if we do that, all the logs files will be included,
231 // far more than needed.
232 s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
233 }
234
235 TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
236 TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
11fdf7f2 237 db_->FlushWAL(false /* sync */);
7c673cae
FG
238 }
239 // if we have more than one column family, we need to also get WAL files
240 if (s.ok()) {
241 s = db_->GetSortedWalFiles(live_wal_files);
242 }
243 if (!s.ok()) {
244 return s;
245 }
246
247 size_t wal_size = live_wal_files.size();
248
249 // copy/hard link live_files
250 std::string manifest_fname, current_fname;
251 for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
252 uint64_t number;
253 FileType type;
254 bool ok = ParseFileName(live_files[i], &number, &type);
255 if (!ok) {
256 s = Status::Corruption("Can't parse file name. This is very bad");
257 break;
258 }
259 // we should only get sst, options, manifest and current files here
260 assert(type == kTableFile || type == kDescriptorFile ||
261 type == kCurrentFile || type == kOptionsFile);
262 assert(live_files[i].size() > 0 && live_files[i][0] == '/');
263 if (type == kCurrentFile) {
264 // We will craft the current file manually to ensure it's consistent with
265 // the manifest number. This is necessary because current's file contents
266 // can change during checkpoint creation.
267 current_fname = live_files[i];
268 continue;
269 } else if (type == kDescriptorFile) {
270 manifest_fname = live_files[i];
271 }
272 std::string src_fname = live_files[i];
273
274 // rules:
275 // * if it's kTableFile, then it's shared
276 // * if it's kDescriptorFile, limit the size to manifest_file_size
277 // * always copy if cross-device link
278 if ((type == kTableFile) && same_fs) {
279 s = link_file_cb(db_->GetName(), src_fname, type);
280 if (s.IsNotSupported()) {
281 same_fs = false;
282 s = Status::OK();
283 }
284 }
285 if ((type != kTableFile) || (!same_fs)) {
286 s = copy_file_cb(db_->GetName(), src_fname,
287 (type == kDescriptorFile) ? manifest_file_size : 0,
288 type);
289 }
290 }
291 if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
292 create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
293 kCurrentFile);
294 }
295 ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
296 live_wal_files.size());
297
298 // Link WAL files. Copy exact size of last one because it is the only one
299 // that has changes after the last flush.
300 for (size_t i = 0; s.ok() && i < wal_size; ++i) {
301 if ((live_wal_files[i]->Type() == kAliveLogFile) &&
302 (!flush_memtable ||
303 live_wal_files[i]->StartSequence() >= *sequence_number ||
304 live_wal_files[i]->LogNumber() >= min_log_num)) {
305 if (i + 1 == wal_size) {
306 s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
307 live_wal_files[i]->SizeFileBytes(), kLogFile);
308 break;
309 }
310 if (same_fs) {
311 // we only care about live log files
312 s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
313 kLogFile);
314 if (s.IsNotSupported()) {
315 same_fs = false;
316 s = Status::OK();
317 }
318 }
319 if (!same_fs) {
320 s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0,
321 kLogFile);
322 }
323 }
324 }
325
326 return s;
327}
328
329} // namespace rocksdb
330
331#endif // ROCKSDB_LITE