]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/backupable/backupable_db.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / backupable / backupable_db.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifndef ROCKSDB_LITE
11
12 #include "rocksdb/utilities/backupable_db.h"
13
14 #include <stdlib.h>
15
16 #include <algorithm>
17 #include <atomic>
18 #include <cinttypes>
19 #include <functional>
20 #include <future>
21 #include <limits>
22 #include <map>
23 #include <mutex>
24 #include <sstream>
25 #include <string>
26 #include <thread>
27 #include <unordered_map>
28 #include <unordered_set>
29 #include <vector>
30
31 #include "env/composite_env_wrapper.h"
32 #include "file/filename.h"
33 #include "file/sequence_file_reader.h"
34 #include "file/writable_file_writer.h"
35 #include "logging/logging.h"
36 #include "port/port.h"
37 #include "rocksdb/rate_limiter.h"
38 #include "rocksdb/transaction_log.h"
39 #include "table/sst_file_dumper.h"
40 #include "test_util/sync_point.h"
41 #include "util/channel.h"
42 #include "util/coding.h"
43 #include "util/crc32c.h"
44 #include "util/string_util.h"
45 #include "utilities/checkpoint/checkpoint_impl.h"
46
47 namespace ROCKSDB_NAMESPACE {
48
49 namespace {
50 using ShareFilesNaming = BackupableDBOptions::ShareFilesNaming;
51
52 inline uint32_t ChecksumHexToInt32(const std::string& checksum_hex) {
53 std::string checksum_str;
54 Slice(checksum_hex).DecodeHex(&checksum_str);
55 return EndianSwapValue(DecodeFixed32(checksum_str.c_str()));
56 }
57 inline std::string ChecksumStrToHex(const std::string& checksum_str) {
58 return Slice(checksum_str).ToString(true);
59 }
60 inline std::string ChecksumInt32ToHex(const uint32_t& checksum_value) {
61 std::string checksum_str;
62 PutFixed32(&checksum_str, EndianSwapValue(checksum_value));
63 return ChecksumStrToHex(checksum_str);
64 }
65 } // namespace
66
67 void BackupStatistics::IncrementNumberSuccessBackup() {
68 number_success_backup++;
69 }
70 void BackupStatistics::IncrementNumberFailBackup() {
71 number_fail_backup++;
72 }
73
74 uint32_t BackupStatistics::GetNumberSuccessBackup() const {
75 return number_success_backup;
76 }
77 uint32_t BackupStatistics::GetNumberFailBackup() const {
78 return number_fail_backup;
79 }
80
81 std::string BackupStatistics::ToString() const {
82 char result[50];
83 snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
84 GetNumberSuccessBackup(), GetNumberFailBackup());
85 return result;
86 }
87
88 void BackupableDBOptions::Dump(Logger* logger) const {
89 ROCKS_LOG_INFO(logger, " Options.backup_dir: %s",
90 backup_dir.c_str());
91 ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env);
92 ROCKS_LOG_INFO(logger, " Options.share_table_files: %d",
93 static_cast<int>(share_table_files));
94 ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log);
95 ROCKS_LOG_INFO(logger, " Options.sync: %d",
96 static_cast<int>(sync));
97 ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d",
98 static_cast<int>(destroy_old_data));
99 ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d",
100 static_cast<int>(backup_log_files));
101 ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64,
102 backup_rate_limit);
103 ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64,
104 restore_rate_limit);
105 ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
106 max_background_operations);
107 }
108
109 // -------- BackupEngineImpl class ---------
110 class BackupEngineImpl : public BackupEngine {
111 public:
112 BackupEngineImpl(const BackupableDBOptions& options, Env* db_env,
113 bool read_only = false);
114 ~BackupEngineImpl() override;
115
116 using BackupEngine::CreateNewBackupWithMetadata;
117 Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
118 const std::string& app_metadata) override;
119
120 Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
121
122 Status DeleteBackup(BackupID backup_id) override;
123
124 void StopBackup() override {
125 stop_backup_.store(true, std::memory_order_release);
126 }
127
128 Status GarbageCollect() override;
129
130 // The returned BackupInfos are in chronological order, which means the
131 // latest backup comes last.
132 void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
133
134 void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
135
136 using BackupEngine::RestoreDBFromBackup;
137 Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
138 const std::string& db_dir,
139 const std::string& wal_dir) override;
140
141 using BackupEngine::RestoreDBFromLatestBackup;
142 Status RestoreDBFromLatestBackup(const RestoreOptions& options,
143 const std::string& db_dir,
144 const std::string& wal_dir) override {
145 return RestoreDBFromBackup(options, latest_valid_backup_id_, db_dir,
146 wal_dir);
147 }
148
149 Status VerifyBackup(BackupID backup_id,
150 bool verify_with_checksum = false) override;
151
152 Status Initialize();
153
154 ShareFilesNaming GetNamingNoFlags() const {
155 return options_.share_files_with_checksum_naming &
156 BackupableDBOptions::kMaskNoNamingFlags;
157 }
158 ShareFilesNaming GetNamingFlags() const {
159 return options_.share_files_with_checksum_naming &
160 BackupableDBOptions::kMaskNamingFlags;
161 }
162
163 private:
164 void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
165 Status DeleteBackupInternal(BackupID backup_id);
166
167 // Extends the "result" map with pathname->size mappings for the contents of
168 // "dir" in "env". Pathnames are prefixed with "dir".
169 Status InsertPathnameToSizeBytes(
170 const std::string& dir, Env* env,
171 std::unordered_map<std::string, uint64_t>* result);
172
173 struct FileInfo {
174 FileInfo(const std::string& fname, uint64_t sz, const std::string& checksum,
175 const std::string& id = "", const std::string& sid = "")
176 : refs(0),
177 filename(fname),
178 size(sz),
179 checksum_hex(checksum),
180 db_id(id),
181 db_session_id(sid) {}
182
183 FileInfo(const FileInfo&) = delete;
184 FileInfo& operator=(const FileInfo&) = delete;
185
186 int refs;
187 const std::string filename;
188 const uint64_t size;
189 const std::string checksum_hex;
190 // DB identities
191 // db_id is obtained for potential usage in the future but not used
192 // currently
193 const std::string db_id;
194 // db_session_id appears in the backup SST filename if the table naming
195 // option is kUseDbSessionId
196 const std::string db_session_id;
197 };
198
199 class BackupMeta {
200 public:
201 BackupMeta(
202 const std::string& meta_filename, const std::string& meta_tmp_filename,
203 std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
204 Env* env)
205 : timestamp_(0),
206 sequence_number_(0),
207 size_(0),
208 meta_filename_(meta_filename),
209 meta_tmp_filename_(meta_tmp_filename),
210 file_infos_(file_infos),
211 env_(env) {}
212
213 BackupMeta(const BackupMeta&) = delete;
214 BackupMeta& operator=(const BackupMeta&) = delete;
215
216 ~BackupMeta() {}
217
218 void RecordTimestamp() {
219 env_->GetCurrentTime(&timestamp_);
220 }
221 int64_t GetTimestamp() const {
222 return timestamp_;
223 }
224 uint64_t GetSize() const {
225 return size_;
226 }
227 uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
228 void SetSequenceNumber(uint64_t sequence_number) {
229 sequence_number_ = sequence_number;
230 }
231 uint64_t GetSequenceNumber() {
232 return sequence_number_;
233 }
234
235 const std::string& GetAppMetadata() const { return app_metadata_; }
236
237 void SetAppMetadata(const std::string& app_metadata) {
238 app_metadata_ = app_metadata;
239 }
240
241 Status AddFile(std::shared_ptr<FileInfo> file_info);
242
243 Status Delete(bool delete_meta = true);
244
245 bool Empty() {
246 return files_.empty();
247 }
248
249 std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
250 auto it = file_infos_->find(filename);
251 if (it == file_infos_->end())
252 return nullptr;
253 return it->second;
254 }
255
256 const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
257 return files_;
258 }
259
260 // @param abs_path_to_size Pre-fetched file sizes (bytes).
261 Status LoadFromFile(
262 const std::string& backup_dir,
263 const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
264 Status StoreToFile(bool sync);
265
266 std::string GetInfoString() {
267 std::ostringstream ss;
268 ss << "Timestamp: " << timestamp_ << std::endl;
269 char human_size[16];
270 AppendHumanBytes(size_, human_size, sizeof(human_size));
271 ss << "Size: " << human_size << std::endl;
272 ss << "Files:" << std::endl;
273 for (const auto& file : files_) {
274 AppendHumanBytes(file->size, human_size, sizeof(human_size));
275 ss << file->filename << ", size " << human_size << ", refs "
276 << file->refs << std::endl;
277 }
278 return ss.str();
279 }
280
281 private:
282 int64_t timestamp_;
283 // sequence number is only approximate, should not be used
284 // by clients
285 uint64_t sequence_number_;
286 uint64_t size_;
287 std::string app_metadata_;
288 std::string const meta_filename_;
289 std::string const meta_tmp_filename_;
290 // files with relative paths (without "/" prefix!!)
291 std::vector<std::shared_ptr<FileInfo>> files_;
292 std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
293 Env* env_;
294
295 static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
296 }; // BackupMeta
297
298 inline std::string GetAbsolutePath(
299 const std::string &relative_path = "") const {
300 assert(relative_path.size() == 0 || relative_path[0] != '/');
301 return options_.backup_dir + "/" + relative_path;
302 }
303 inline std::string GetPrivateDirRel() const {
304 return "private";
305 }
306 inline std::string GetSharedDirRel() const { return "shared"; }
307 inline std::string GetSharedChecksumDirRel() const {
308 return "shared_checksum";
309 }
310 inline std::string GetPrivateFileRel(BackupID backup_id,
311 bool tmp = false,
312 const std::string& file = "") const {
313 assert(file.size() == 0 || file[0] != '/');
314 return GetPrivateDirRel() + "/" + ROCKSDB_NAMESPACE::ToString(backup_id) +
315 (tmp ? ".tmp" : "") + "/" + file;
316 }
317 inline std::string GetSharedFileRel(const std::string& file = "",
318 bool tmp = false) const {
319 assert(file.size() == 0 || file[0] != '/');
320 return GetSharedDirRel() + "/" + (tmp ? "." : "") + file +
321 (tmp ? ".tmp" : "");
322 }
323 inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
324 bool tmp = false) const {
325 assert(file.size() == 0 || file[0] != '/');
326 return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file +
327 (tmp ? ".tmp" : "");
328 }
329 inline bool UseLegacyNaming(const std::string& sid) const {
330 return GetNamingNoFlags() ==
331 BackupableDBOptions::kLegacyCrc32cAndFileSize ||
332 sid.empty();
333 }
334 inline bool UseInterimNaming(const std::string& sid) const {
335 // The indicator of SST file from early internal 6.12 release
336 // is a '-' in the DB session id. DB session id was made more
337 // concise without '-' after that.
338 return (GetNamingFlags() & BackupableDBOptions::kFlagMatchInterimNaming) &&
339 sid.find('-') != std::string::npos;
340 }
341 inline std::string GetSharedFileWithChecksum(
342 const std::string& file, bool has_checksum,
343 const std::string& checksum_hex, const uint64_t file_size,
344 const std::string& db_session_id) const {
345 assert(file.size() == 0 || file[0] != '/');
346 std::string file_copy = file;
347 if (UseLegacyNaming(db_session_id)) {
348 assert(has_checksum);
349 (void)has_checksum;
350 file_copy.insert(file_copy.find_last_of('.'),
351 "_" + ToString(ChecksumHexToInt32(checksum_hex)) + "_" +
352 ToString(file_size));
353 } else if (UseInterimNaming(db_session_id)) {
354 file_copy.insert(file_copy.find_last_of('.'), "_" + db_session_id);
355 } else {
356 file_copy.insert(file_copy.find_last_of('.'), "_s" + db_session_id);
357 if (GetNamingFlags() & BackupableDBOptions::kFlagIncludeFileSize) {
358 file_copy.insert(file_copy.find_last_of('.'),
359 "_" + ToString(file_size));
360 }
361 }
362 return file_copy;
363 }
364 inline std::string GetFileFromChecksumFile(const std::string& file) const {
365 assert(file.size() == 0 || file[0] != '/');
366 std::string file_copy = file;
367 size_t first_underscore = file_copy.find_first_of('_');
368 return file_copy.erase(first_underscore,
369 file_copy.find_last_of('.') - first_underscore);
370 }
371 inline std::string GetBackupMetaDir() const {
372 return GetAbsolutePath("meta");
373 }
374 inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
375 return GetBackupMetaDir() + "/" + (tmp ? "." : "") +
376 ROCKSDB_NAMESPACE::ToString(backup_id) + (tmp ? ".tmp" : "");
377 }
378
379 // If size_limit == 0, there is no size limit, copy everything.
380 //
381 // Exactly one of src and contents must be non-empty.
382 //
383 // @param src If non-empty, the file is copied from this pathname.
384 // @param contents If non-empty, the file will be created with these contents.
385 Status CopyOrCreateFile(const std::string& src, const std::string& dst,
386 const std::string& contents, Env* src_env,
387 Env* dst_env, const EnvOptions& src_env_options,
388 bool sync, RateLimiter* rate_limiter,
389 uint64_t* size = nullptr,
390 std::string* checksum_hex = nullptr,
391 uint64_t size_limit = 0,
392 std::function<void()> progress_callback = []() {});
393
394 Status ReadFileAndComputeChecksum(const std::string& src, Env* src_env,
395 const EnvOptions& src_env_options,
396 uint64_t size_limit,
397 std::string* checksum_hex);
398
399 // Obtain db_id and db_session_id from the table properties of file_path
400 Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
401 const std::string& file_path, std::string* db_id,
402 std::string* db_session_id);
403
404 struct CopyOrCreateResult {
405 uint64_t size;
406 std::string checksum_hex;
407 std::string db_id;
408 std::string db_session_id;
409 Status status;
410 };
411
412 // Exactly one of src_path and contents must be non-empty. If src_path is
413 // non-empty, the file is copied from this pathname. Otherwise, if contents is
414 // non-empty, the file will be created at dst_path with these contents.
415 struct CopyOrCreateWorkItem {
416 std::string src_path;
417 std::string dst_path;
418 std::string contents;
419 Env* src_env;
420 Env* dst_env;
421 EnvOptions src_env_options;
422 bool sync;
423 RateLimiter* rate_limiter;
424 uint64_t size_limit;
425 std::promise<CopyOrCreateResult> result;
426 std::function<void()> progress_callback;
427 bool verify_checksum_after_work;
428 std::string src_checksum_func_name;
429 std::string src_checksum_hex;
430 std::string db_id;
431 std::string db_session_id;
432
433 CopyOrCreateWorkItem()
434 : src_path(""),
435 dst_path(""),
436 contents(""),
437 src_env(nullptr),
438 dst_env(nullptr),
439 src_env_options(),
440 sync(false),
441 rate_limiter(nullptr),
442 size_limit(0),
443 verify_checksum_after_work(false),
444 src_checksum_func_name(kUnknownFileChecksumFuncName),
445 src_checksum_hex(""),
446 db_id(""),
447 db_session_id("") {}
448
449 CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
450 CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
451
452 CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
453 *this = std::move(o);
454 }
455
456 CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
457 src_path = std::move(o.src_path);
458 dst_path = std::move(o.dst_path);
459 contents = std::move(o.contents);
460 src_env = o.src_env;
461 dst_env = o.dst_env;
462 src_env_options = std::move(o.src_env_options);
463 sync = o.sync;
464 rate_limiter = o.rate_limiter;
465 size_limit = o.size_limit;
466 result = std::move(o.result);
467 progress_callback = std::move(o.progress_callback);
468 verify_checksum_after_work = o.verify_checksum_after_work;
469 src_checksum_func_name = std::move(o.src_checksum_func_name);
470 src_checksum_hex = std::move(o.src_checksum_hex);
471 db_id = std::move(o.db_id);
472 db_session_id = std::move(o.db_session_id);
473 return *this;
474 }
475
476 CopyOrCreateWorkItem(
477 std::string _src_path, std::string _dst_path, std::string _contents,
478 Env* _src_env, Env* _dst_env, EnvOptions _src_env_options, bool _sync,
479 RateLimiter* _rate_limiter, uint64_t _size_limit,
480 std::function<void()> _progress_callback = []() {},
481 bool _verify_checksum_after_work = false,
482 const std::string& _src_checksum_func_name =
483 kUnknownFileChecksumFuncName,
484 const std::string& _src_checksum_hex = "",
485 const std::string& _db_id = "", const std::string& _db_session_id = "")
486 : src_path(std::move(_src_path)),
487 dst_path(std::move(_dst_path)),
488 contents(std::move(_contents)),
489 src_env(_src_env),
490 dst_env(_dst_env),
491 src_env_options(std::move(_src_env_options)),
492 sync(_sync),
493 rate_limiter(_rate_limiter),
494 size_limit(_size_limit),
495 progress_callback(_progress_callback),
496 verify_checksum_after_work(_verify_checksum_after_work),
497 src_checksum_func_name(_src_checksum_func_name),
498 src_checksum_hex(_src_checksum_hex),
499 db_id(_db_id),
500 db_session_id(_db_session_id) {}
501 };
502
503 struct BackupAfterCopyOrCreateWorkItem {
504 std::future<CopyOrCreateResult> result;
505 bool shared;
506 bool needed_to_copy;
507 Env* backup_env;
508 std::string dst_path_tmp;
509 std::string dst_path;
510 std::string dst_relative;
511 BackupAfterCopyOrCreateWorkItem()
512 : shared(false),
513 needed_to_copy(false),
514 backup_env(nullptr),
515 dst_path_tmp(""),
516 dst_path(""),
517 dst_relative("") {}
518
519 BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
520 ROCKSDB_NOEXCEPT {
521 *this = std::move(o);
522 }
523
524 BackupAfterCopyOrCreateWorkItem& operator=(
525 BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
526 result = std::move(o.result);
527 shared = o.shared;
528 needed_to_copy = o.needed_to_copy;
529 backup_env = o.backup_env;
530 dst_path_tmp = std::move(o.dst_path_tmp);
531 dst_path = std::move(o.dst_path);
532 dst_relative = std::move(o.dst_relative);
533 return *this;
534 }
535
536 BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
537 bool _shared, bool _needed_to_copy,
538 Env* _backup_env, std::string _dst_path_tmp,
539 std::string _dst_path,
540 std::string _dst_relative)
541 : result(std::move(_result)),
542 shared(_shared),
543 needed_to_copy(_needed_to_copy),
544 backup_env(_backup_env),
545 dst_path_tmp(std::move(_dst_path_tmp)),
546 dst_path(std::move(_dst_path)),
547 dst_relative(std::move(_dst_relative)) {}
548 };
549
550 struct RestoreAfterCopyOrCreateWorkItem {
551 std::future<CopyOrCreateResult> result;
552 std::string checksum_hex;
553 RestoreAfterCopyOrCreateWorkItem() : checksum_hex("") {}
554 RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
555 const std::string& _checksum_hex)
556 : result(std::move(_result)), checksum_hex(_checksum_hex) {}
557 RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
558 ROCKSDB_NOEXCEPT {
559 *this = std::move(o);
560 }
561
562 RestoreAfterCopyOrCreateWorkItem& operator=(
563 RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
564 result = std::move(o.result);
565 checksum_hex = std::move(o.checksum_hex);
566 return *this;
567 }
568 };
569
570 bool initialized_;
571 std::mutex byte_report_mutex_;
572 channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
573 std::vector<port::Thread> threads_;
574 std::atomic<CpuPriority> threads_cpu_priority_;
575 // Certain operations like PurgeOldBackups and DeleteBackup will trigger
576 // automatic GarbageCollect (true) unless we've already done one in this
577 // session and have not failed to delete backup files since then (false).
578 bool might_need_garbage_collect_ = true;
579
580 // Adds a file to the backup work queue to be copied or created if it doesn't
581 // already exist.
582 //
583 // Exactly one of src_dir and contents must be non-empty.
584 //
585 // @param src_dir If non-empty, the file in this directory named fname will be
586 // copied.
587 // @param fname Name of destination file and, in case of copy, source file.
588 // @param contents If non-empty, the file will be created with these contents.
589 Status AddBackupFileWorkItem(
590 std::unordered_set<std::string>& live_dst_paths,
591 std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
592 BackupID backup_id, bool shared, const std::string& src_dir,
593 const std::string& fname, // starts with "/"
594 const EnvOptions& src_env_options, RateLimiter* rate_limiter,
595 uint64_t size_bytes, uint64_t size_limit = 0,
596 bool shared_checksum = false,
597 std::function<void()> progress_callback = []() {},
598 const std::string& contents = std::string(),
599 const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName,
600 const std::string& src_checksum_str = kUnknownFileChecksum);
601
602 // backup state data
603 BackupID latest_backup_id_;
604 BackupID latest_valid_backup_id_;
605 std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
606 std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
607 corrupt_backups_;
608 std::unordered_map<std::string,
609 std::shared_ptr<FileInfo>> backuped_file_infos_;
610 std::atomic<bool> stop_backup_;
611
612 // options data
613 BackupableDBOptions options_;
614 Env* db_env_;
615 Env* backup_env_;
616
617 // directories
618 std::unique_ptr<Directory> backup_directory_;
619 std::unique_ptr<Directory> shared_directory_;
620 std::unique_ptr<Directory> meta_directory_;
621 std::unique_ptr<Directory> private_directory_;
622
623 static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
624 size_t copy_file_buffer_size_;
625 bool read_only_;
626 BackupStatistics backup_statistics_;
627 static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB
628 };
629
630 Status BackupEngine::Open(const BackupableDBOptions& options, Env* env,
631 BackupEngine** backup_engine_ptr) {
632 std::unique_ptr<BackupEngineImpl> backup_engine(
633 new BackupEngineImpl(options, env));
634 auto s = backup_engine->Initialize();
635 if (!s.ok()) {
636 *backup_engine_ptr = nullptr;
637 return s;
638 }
639 *backup_engine_ptr = backup_engine.release();
640 return Status::OK();
641 }
642
643 BackupEngineImpl::BackupEngineImpl(const BackupableDBOptions& options,
644 Env* db_env, bool read_only)
645 : initialized_(false),
646 threads_cpu_priority_(),
647 latest_backup_id_(0),
648 latest_valid_backup_id_(0),
649 stop_backup_(false),
650 options_(options),
651 db_env_(db_env),
652 backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
653 copy_file_buffer_size_(kDefaultCopyFileBufferSize),
654 read_only_(read_only) {
655 if (options_.backup_rate_limiter == nullptr &&
656 options_.backup_rate_limit > 0) {
657 options_.backup_rate_limiter.reset(
658 NewGenericRateLimiter(options_.backup_rate_limit));
659 }
660 if (options_.restore_rate_limiter == nullptr &&
661 options_.restore_rate_limit > 0) {
662 options_.restore_rate_limiter.reset(
663 NewGenericRateLimiter(options_.restore_rate_limit));
664 }
665 }
666
667 BackupEngineImpl::~BackupEngineImpl() {
668 files_to_copy_or_create_.sendEof();
669 for (auto& t : threads_) {
670 t.join();
671 }
672 LogFlush(options_.info_log);
673 }
674
// One-time setup of the engine. Must be called exactly once (asserted).
//
// Steps, in order:
//   1. Unless read-only: create and open the backup, shared, private and
//      meta directories.
//   2. Enumerate the meta directory and register a BackupMeta placeholder
//      for every file whose name is a valid backup id.
//   3. Either purge everything (destroy_old_data) or load the backups from
//      newest to oldest, moving corrupt ones into corrupt_backups_.
//   4. Start max_background_operations worker threads that service
//      files_to_copy_or_create_.
//
// Returns the first non-OK Status from the backup Env. A backup whose load
// fails with a Corruption status does not fail initialization.
Status BackupEngineImpl::Initialize() {
  assert(!initialized_);
  // Set up front, so it stays true even if a later step returns an error.
  initialized_ = true;
  if (read_only_) {
    ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
  }
  options_.Dump(options_.info_log);

  if (!read_only_) {
    // we might need to clean up from previous crash or I/O errors
    might_need_garbage_collect_ = true;

    // A writable engine always opens every backup, so any non-default limit
    // is overridden (with a warning).
    if (options_.max_valid_backups_to_open != port::kMaxInt32) {
      options_.max_valid_backups_to_open = port::kMaxInt32;
      ROCKS_LOG_WARN(
          options_.info_log,
          "`max_valid_backups_to_open` is not set to the default value. Ignoring "
          "its value since BackupEngine is not read-only.");
    }

    // gather the list of directories that we need to create
    // Each entry pairs a path with the member that receives the opened
    // Directory handle.
    std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
        directories;
    directories.emplace_back(GetAbsolutePath(), &backup_directory_);
    if (options_.share_table_files) {
      // Only one shared directory is created, chosen by
      // share_files_with_checksum.
      if (options_.share_files_with_checksum) {
        directories.emplace_back(
            GetAbsolutePath(GetSharedFileWithChecksumRel()),
            &shared_directory_);
      } else {
        directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
                                 &shared_directory_);
      }
    }
    directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
                             &private_directory_);
    directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
    // create all the dirs we need
    for (const auto& d : directories) {
      auto s = backup_env_->CreateDirIfMissing(d.first);
      if (s.ok()) {
        s = backup_env_->NewDirectory(d.first, d.second);
      }
      if (!s.ok()) {
        return s;
      }
    }
  }

  // List the meta directory; a missing directory is reported with a more
  // descriptive NotFound message.
  std::vector<std::string> backup_meta_files;
  {
    auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
    if (s.IsNotFound()) {
      return Status::NotFound(GetBackupMetaDir() + " is missing");
    } else if (!s.ok()) {
      return s;
    }
  }
  // create backups_ structure
  for (auto& file : backup_meta_files) {
    if (file == "." || file == "..") {
      continue;
    }
    ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
    BackupID backup_id = 0;
    sscanf(file.c_str(), "%u", &backup_id);
    // The name must be exactly the decimal form of a non-zero id; anything
    // else (including names with trailing characters) is rejected.
    if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
      if (!read_only_) {
        // invalid file name, delete that
        auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
        ROCKS_LOG_INFO(options_.info_log,
                       "Unrecognized meta file %s, deleting -- %s",
                       file.c_str(), s.ToString().c_str());
      }
      continue;
    }
    assert(backups_.find(backup_id) == backups_.end());
    // Insert all the (backup_id, BackupMeta) that will be loaded later
    // The loading performed later will check whether there are corrupt backups
    // and move the corrupt backups to corrupt_backups_
    backups_.insert(std::make_pair(
        backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
                       GetBackupMetaFile(backup_id, false /* tmp */),
                       GetBackupMetaFile(backup_id, true /* tmp */),
                       &backuped_file_infos_, backup_env_))));
  }

  latest_backup_id_ = 0;
  latest_valid_backup_id_ = 0;
  if (options_.destroy_old_data) {  // Destroy old data
    assert(!read_only_);
    ROCKS_LOG_INFO(
        options_.info_log,
        "Backup Engine started with destroy_old_data == true, deleting all "
        "backups");
    auto s = PurgeOldBackups(0);
    if (s.ok()) {
      s = GarbageCollect();
    }
    if (!s.ok()) {
      return s;
    }
  } else {  // Load data from storage
    // abs_path_to_size: maps absolute paths of files in backup directory to
    // their corresponding sizes
    std::unordered_map<std::string, uint64_t> abs_path_to_size;
    // Insert files and their sizes in backup sub-directories (shared and
    // shared_checksum) to abs_path_to_size
    // NOTE(review): the returned Status is discarded here; presumably
    // best-effort because either directory may be absent -- confirm.
    for (const auto& rel_dir :
         {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
      const auto abs_dir = GetAbsolutePath(rel_dir);
      InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
    }
    // load the backups if any, until valid_backups_to_open of the latest
    // non-corrupted backups have been successfully opened.
    int valid_backups_to_open = options_.max_valid_backups_to_open;
    // Reverse iteration over the ordered map visits the highest id first.
    for (auto backup_iter = backups_.rbegin();
         backup_iter != backups_.rend();
         ++backup_iter) {
      assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
      // The first (highest) id seen is recorded even if that backup later
      // turns out to be corrupt.
      if (latest_backup_id_ == 0) {
        latest_backup_id_ = backup_iter->first;
      }
      if (valid_backups_to_open == 0) {
        break;
      }

      // Insert files and their sizes in backup sub-directories
      // (private/backup_id) to abs_path_to_size
      // (the returned Status is discarded here as well)
      InsertPathnameToSizeBytes(
          GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
          &abs_path_to_size);
      Status s = backup_iter->second->LoadFromFile(options_.backup_dir,
                                                   abs_path_to_size);
      if (s.IsCorruption()) {
        ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
                       backup_iter->first, s.ToString().c_str());
        // Ownership of the BackupMeta moves to corrupt_backups_; the now
        // empty entry is erased from backups_ after the loop.
        corrupt_backups_.insert(
            std::make_pair(backup_iter->first,
                           std::make_pair(s, std::move(backup_iter->second))));
      } else if (!s.ok()) {
        // Distinguish corruption errors from errors in the backup Env.
        // Errors in the backup Env (i.e., this code path) will cause Open() to
        // fail, whereas corruption errors would not cause Open() failures.
        return s;
      } else {
        ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
                       backup_iter->first,
                       backup_iter->second->GetInfoString().c_str());
        assert(latest_valid_backup_id_ == 0 ||
               latest_valid_backup_id_ > backup_iter->first);
        if (latest_valid_backup_id_ == 0) {
          latest_valid_backup_id_ = backup_iter->first;
        }
        --valid_backups_to_open;
      }
    }

    // Drop the entries whose BackupMeta was moved into corrupt_backups_.
    for (const auto& corrupt : corrupt_backups_) {
      backups_.erase(backups_.find(corrupt.first));
    }
    // erase the backups before max_valid_backups_to_open
    // These are the oldest entries; the assert below expects them to have
    // no files loaded.
    int num_unopened_backups;
    if (options_.max_valid_backups_to_open == 0) {
      num_unopened_backups = 0;
    } else {
      num_unopened_backups =
          std::max(0, static_cast<int>(backups_.size()) -
                          options_.max_valid_backups_to_open);
    }
    for (int i = 0; i < num_unopened_backups; ++i) {
      assert(backups_.begin()->second->Empty());
      backups_.erase(backups_.begin());
    }
  }

  ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
  ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
                 latest_valid_backup_id_);

  // Set up the threads that perform copies from files_to_copy_or_create_ in
  // the background.
  threads_cpu_priority_ = CpuPriority::kNormal;
  threads_.reserve(options_.max_background_operations);
  for (int t = 0; t < options_.max_background_operations; t++) {
    // Each worker captures `this`; the destructor joins all workers.
    threads_.emplace_back([this]() {
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
      pthread_setname_np(pthread_self(), "backup_engine");
#endif
#endif
      CpuPriority current_priority = CpuPriority::kNormal;
      CopyOrCreateWorkItem work_item;
      // The loop ends when read() returns false.
      while (files_to_copy_or_create_.read(work_item)) {
        // Apply a changed priority request before processing the item.
        CpuPriority priority = threads_cpu_priority_;
        if (current_priority != priority) {
          TEST_SYNC_POINT_CALLBACK(
              "BackupEngineImpl::Initialize:SetCpuPriority", &priority);
          port::SetCpuPriority(0, priority);
          current_priority = priority;
        }
        CopyOrCreateResult result;
        result.status = CopyOrCreateFile(
            work_item.src_path, work_item.dst_path, work_item.contents,
            work_item.src_env, work_item.dst_env, work_item.src_env_options,
            work_item.sync, work_item.rate_limiter, &result.size,
            &result.checksum_hex, work_item.size_limit,
            work_item.progress_callback);
        // The DB identities are passed through unchanged from the request.
        result.db_id = work_item.db_id;
        result.db_session_id = work_item.db_session_id;
        if (result.status.ok() && work_item.verify_checksum_after_work) {
          // unknown checksum function name implies no db table file checksum in
          // db manifest; work_item.verify_checksum_after_work being true means
          // backup engine has calculated its crc32c checksum for the table
          // file; therefore, we are able to compare the checksums.
          if (work_item.src_checksum_func_name ==
                  kUnknownFileChecksumFuncName ||
              work_item.src_checksum_func_name == kDbFileChecksumFuncName) {
            if (work_item.src_checksum_hex != result.checksum_hex) {
              std::string checksum_info(
                  "Expected checksum is " + work_item.src_checksum_hex +
                  " while computed checksum is " + result.checksum_hex);
              result.status =
                  Status::Corruption("Checksum mismatch after copying to " +
                                     work_item.dst_path + ": " + checksum_info);
            }
          } else {
            // A different checksum function was used for the source, so the
            // values cannot be compared; this is only logged and the copy
            // still counts as successful.
            std::string checksum_function_info(
                "Existing checksum function is " +
                work_item.src_checksum_func_name +
                " while provided checksum function is " +
                kBackupFileChecksumFuncName);
            ROCKS_LOG_INFO(
                options_.info_log,
                "Unable to verify checksum after copying to %s: %s\n",
                work_item.dst_path.c_str(), checksum_function_info.c_str());
          }
        }
        // Deliver the outcome to whoever holds the matching future.
        work_item.result.set_value(std::move(result));
      }
    });
  }
  ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");

  return Status::OK();
}
921
922 Status BackupEngineImpl::CreateNewBackupWithMetadata(
923 const CreateBackupOptions& options, DB* db,
924 const std::string& app_metadata) {
925 assert(initialized_);
926 assert(!read_only_);
927 if (app_metadata.size() > kMaxAppMetaSize) {
928 return Status::InvalidArgument("App metadata too large");
929 }
930
931 if (options.decrease_background_thread_cpu_priority) {
932 if (options.background_thread_cpu_priority < threads_cpu_priority_) {
933 threads_cpu_priority_.store(options.background_thread_cpu_priority);
934 }
935 }
936
937 BackupID new_backup_id = latest_backup_id_ + 1;
938
939 assert(backups_.find(new_backup_id) == backups_.end());
940
941 auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
942 Status s = backup_env_->FileExists(private_dir);
943 if (s.ok()) {
944 // maybe last backup failed and left partial state behind, clean it up.
945 // need to do this before updating backups_ such that a private dir
946 // named after new_backup_id will be cleaned up.
947 // (If an incomplete new backup is followed by an incomplete delete
948 // of the latest full backup, then there could be more than one next
949 // id with a private dir, the last thing to be deleted in delete
950 // backup, but all will be cleaned up with a GarbageCollect.)
951 s = GarbageCollect();
952 } else if (s.IsNotFound()) {
953 // normal case, the new backup's private dir doesn't exist yet
954 s = Status::OK();
955 }
956
957 auto ret = backups_.insert(std::make_pair(
958 new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
959 GetBackupMetaFile(new_backup_id, false /* tmp */),
960 GetBackupMetaFile(new_backup_id, true /* tmp */),
961 &backuped_file_infos_, backup_env_))));
962 assert(ret.second == true);
963 auto& new_backup = ret.first->second;
964 new_backup->RecordTimestamp();
965 new_backup->SetAppMetadata(app_metadata);
966
967 auto start_backup = backup_env_->NowMicros();
968
969 ROCKS_LOG_INFO(options_.info_log,
970 "Started the backup process -- creating backup %u",
971 new_backup_id);
972 if (s.ok()) {
973 s = backup_env_->CreateDir(private_dir);
974 }
975
976 RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
977 if (rate_limiter) {
978 copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
979 }
980
981 // A set into which we will insert the dst_paths that are calculated for live
982 // files and live WAL files.
983 // This is used to check whether a live files shares a dst_path with another
984 // live file.
985 std::unordered_set<std::string> live_dst_paths;
986
987 std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
988 // Add a CopyOrCreateWorkItem to the channel for each live file
989 db->DisableFileDeletions();
990 if (s.ok()) {
991 CheckpointImpl checkpoint(db);
992 uint64_t sequence_number = 0;
993 DBOptions db_options = db->GetDBOptions();
994 FileChecksumGenFactory* db_checksum_factory =
995 db_options.file_checksum_gen_factory.get();
996 const std::string kFileChecksumGenFactoryName =
997 "FileChecksumGenCrc32cFactory";
998 bool compare_checksum =
999 db_checksum_factory != nullptr &&
1000 db_checksum_factory->Name() == kFileChecksumGenFactoryName
1001 ? true
1002 : false;
1003 EnvOptions src_raw_env_options(db_options);
1004 s = checkpoint.CreateCustomCheckpoint(
1005 db_options,
1006 [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
1007 FileType) {
1008 // custom checkpoint will switch to calling copy_file_cb after it sees
1009 // NotSupported returned from link_file_cb.
1010 return Status::NotSupported();
1011 } /* link_file_cb */,
1012 [&](const std::string& src_dirname, const std::string& fname,
1013 uint64_t size_limit_bytes, FileType type,
1014 const std::string& checksum_func_name,
1015 const std::string& checksum_val) {
1016 if (type == kWalFile && !options_.backup_log_files) {
1017 return Status::OK();
1018 }
1019 Log(options_.info_log, "add file for backup %s", fname.c_str());
1020 uint64_t size_bytes = 0;
1021 Status st;
1022 if (type == kTableFile) {
1023 st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
1024 }
1025 EnvOptions src_env_options;
1026 switch (type) {
1027 case kWalFile:
1028 src_env_options =
1029 db_env_->OptimizeForLogRead(src_raw_env_options);
1030 break;
1031 case kTableFile:
1032 src_env_options = db_env_->OptimizeForCompactionTableRead(
1033 src_raw_env_options, ImmutableDBOptions(db_options));
1034 break;
1035 case kDescriptorFile:
1036 src_env_options =
1037 db_env_->OptimizeForManifestRead(src_raw_env_options);
1038 break;
1039 default:
1040 // Other backed up files (like options file) are not read by live
1041 // DB, so don't need to worry about avoiding mixing buffered and
1042 // direct I/O. Just use plain defaults.
1043 src_env_options = src_raw_env_options;
1044 break;
1045 }
1046 if (st.ok()) {
1047 st = AddBackupFileWorkItem(
1048 live_dst_paths, backup_items_to_finish, new_backup_id,
1049 options_.share_table_files && type == kTableFile, src_dirname,
1050 fname, src_env_options, rate_limiter, size_bytes,
1051 size_limit_bytes,
1052 options_.share_files_with_checksum && type == kTableFile,
1053 options.progress_callback, "" /* contents */,
1054 checksum_func_name, checksum_val);
1055 }
1056 return st;
1057 } /* copy_file_cb */,
1058 [&](const std::string& fname, const std::string& contents, FileType) {
1059 Log(options_.info_log, "add file for backup %s", fname.c_str());
1060 return AddBackupFileWorkItem(
1061 live_dst_paths, backup_items_to_finish, new_backup_id,
1062 false /* shared */, "" /* src_dir */, fname,
1063 EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
1064 0 /* size_limit */, false /* shared_checksum */,
1065 options.progress_callback, contents);
1066 } /* create_file_cb */,
1067 &sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64,
1068 compare_checksum);
1069 if (s.ok()) {
1070 new_backup->SetSequenceNumber(sequence_number);
1071 }
1072 }
1073 ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
1074 Status item_status;
1075 for (auto& item : backup_items_to_finish) {
1076 item.result.wait();
1077 auto result = item.result.get();
1078 item_status = result.status;
1079 if (item_status.ok() && item.shared && item.needed_to_copy) {
1080 item_status =
1081 item.backup_env->RenameFile(item.dst_path_tmp, item.dst_path);
1082 }
1083 if (item_status.ok()) {
1084 item_status = new_backup.get()->AddFile(std::make_shared<FileInfo>(
1085 item.dst_relative, result.size, result.checksum_hex, result.db_id,
1086 result.db_session_id));
1087 }
1088 if (!item_status.ok()) {
1089 s = item_status;
1090 }
1091 }
1092
1093 // we copied all the files, enable file deletions
1094 db->EnableFileDeletions(false);
1095
1096 auto backup_time = backup_env_->NowMicros() - start_backup;
1097
1098 if (s.ok()) {
1099 // persist the backup metadata on the disk
1100 s = new_backup->StoreToFile(options_.sync);
1101 }
1102 if (s.ok() && options_.sync) {
1103 std::unique_ptr<Directory> backup_private_directory;
1104 backup_env_->NewDirectory(
1105 GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
1106 &backup_private_directory);
1107 if (backup_private_directory != nullptr) {
1108 s = backup_private_directory->Fsync();
1109 }
1110 if (s.ok() && private_directory_ != nullptr) {
1111 s = private_directory_->Fsync();
1112 }
1113 if (s.ok() && meta_directory_ != nullptr) {
1114 s = meta_directory_->Fsync();
1115 }
1116 if (s.ok() && shared_directory_ != nullptr) {
1117 s = shared_directory_->Fsync();
1118 }
1119 if (s.ok() && backup_directory_ != nullptr) {
1120 s = backup_directory_->Fsync();
1121 }
1122 }
1123
1124 if (s.ok()) {
1125 backup_statistics_.IncrementNumberSuccessBackup();
1126 }
1127 if (!s.ok()) {
1128 backup_statistics_.IncrementNumberFailBackup();
1129 // clean all the files we might have created
1130 ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
1131 s.ToString().c_str());
1132 ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
1133 backup_statistics_.ToString().c_str());
1134 // delete files that we might have already written
1135 might_need_garbage_collect_ = true;
1136 DeleteBackup(new_backup_id);
1137 return s;
1138 }
1139
1140 // here we know that we succeeded and installed the new backup
1141 // in the LATEST_BACKUP file
1142 latest_backup_id_ = new_backup_id;
1143 latest_valid_backup_id_ = new_backup_id;
1144 ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
1145
1146 // backup_speed is in byte/second
1147 double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
1148 ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
1149 new_backup->GetNumberFiles());
1150 char human_size[16];
1151 AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
1152 ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
1153 ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
1154 backup_time);
1155 ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
1156 ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
1157 backup_statistics_.ToString().c_str());
1158 return s;
1159 }
1160
1161 Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
1162 assert(initialized_);
1163 assert(!read_only_);
1164
1165 // Best effort deletion even with errors
1166 Status overall_status = Status::OK();
1167
1168 ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
1169 num_backups_to_keep);
1170 std::vector<BackupID> to_delete;
1171 auto itr = backups_.begin();
1172 while ((backups_.size() - to_delete.size()) > num_backups_to_keep) {
1173 to_delete.push_back(itr->first);
1174 itr++;
1175 }
1176 for (auto backup_id : to_delete) {
1177 auto s = DeleteBackupInternal(backup_id);
1178 if (!s.ok()) {
1179 overall_status = s;
1180 }
1181 }
1182 // Clean up after any incomplete backup deletion, potentially from
1183 // earlier session.
1184 if (might_need_garbage_collect_) {
1185 auto s = GarbageCollect();
1186 if (!s.ok() && overall_status.ok()) {
1187 overall_status = s;
1188 }
1189 }
1190 return overall_status;
1191 }
1192
1193 Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
1194 auto s1 = DeleteBackupInternal(backup_id);
1195 auto s2 = Status::OK();
1196
1197 // Clean up after any incomplete backup deletion, potentially from
1198 // earlier session.
1199 if (might_need_garbage_collect_) {
1200 s2 = GarbageCollect();
1201 }
1202
1203 if (!s1.ok()) {
1204 return s1;
1205 } else {
1206 return s2;
1207 }
1208 }
1209
1210 // Does not auto-GarbageCollect
1211 Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
1212 assert(initialized_);
1213 assert(!read_only_);
1214
1215 ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
1216 auto backup = backups_.find(backup_id);
1217 if (backup != backups_.end()) {
1218 auto s = backup->second->Delete();
1219 if (!s.ok()) {
1220 return s;
1221 }
1222 backups_.erase(backup);
1223 } else {
1224 auto corrupt = corrupt_backups_.find(backup_id);
1225 if (corrupt == corrupt_backups_.end()) {
1226 return Status::NotFound("Backup not found");
1227 }
1228 auto s = corrupt->second.second->Delete();
1229 if (!s.ok()) {
1230 return s;
1231 }
1232 corrupt_backups_.erase(corrupt);
1233 }
1234
1235 // After removing meta file, best effort deletion even with errors.
1236 // (Don't delete other files if we can't delete the meta file right
1237 // now.)
1238 std::vector<std::string> to_delete;
1239 for (auto& itr : backuped_file_infos_) {
1240 if (itr.second->refs == 0) {
1241 Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
1242 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
1243 s.ToString().c_str());
1244 to_delete.push_back(itr.first);
1245 if (!s.ok()) {
1246 // Trying again later might work
1247 might_need_garbage_collect_ = true;
1248 }
1249 }
1250 }
1251 for (auto& td : to_delete) {
1252 backuped_file_infos_.erase(td);
1253 }
1254
1255 // take care of private dirs -- GarbageCollect() will take care of them
1256 // if they are not empty
1257 std::string private_dir = GetPrivateFileRel(backup_id);
1258 Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
1259 ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
1260 private_dir.c_str(), s.ToString().c_str());
1261 if (!s.ok()) {
1262 // Full gc or trying again later might work
1263 might_need_garbage_collect_ = true;
1264 }
1265 return Status::OK();
1266 }
1267
1268 void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1269 assert(initialized_);
1270 backup_info->reserve(backups_.size());
1271 for (auto& backup : backups_) {
1272 if (!backup.second->Empty()) {
1273 backup_info->push_back(BackupInfo(
1274 backup.first, backup.second->GetTimestamp(), backup.second->GetSize(),
1275 backup.second->GetNumberFiles(), backup.second->GetAppMetadata()));
1276 }
1277 }
1278 }
1279
1280 void
1281 BackupEngineImpl::GetCorruptedBackups(
1282 std::vector<BackupID>* corrupt_backup_ids) {
1283 assert(initialized_);
1284 corrupt_backup_ids->reserve(corrupt_backups_.size());
1285 for (auto& backup : corrupt_backups_) {
1286 corrupt_backup_ids->push_back(backup.first);
1287 }
1288 }
1289
1290 Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
1291 BackupID backup_id,
1292 const std::string& db_dir,
1293 const std::string& wal_dir) {
1294 assert(initialized_);
1295 auto corrupt_itr = corrupt_backups_.find(backup_id);
1296 if (corrupt_itr != corrupt_backups_.end()) {
1297 return corrupt_itr->second.first;
1298 }
1299 auto backup_itr = backups_.find(backup_id);
1300 if (backup_itr == backups_.end()) {
1301 return Status::NotFound("Backup not found");
1302 }
1303 auto& backup = backup_itr->second;
1304 if (backup->Empty()) {
1305 return Status::NotFound("Backup not found");
1306 }
1307
1308 ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
1309 ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
1310 static_cast<int>(options.keep_log_files));
1311
1312 // just in case. Ignore errors
1313 db_env_->CreateDirIfMissing(db_dir);
1314 db_env_->CreateDirIfMissing(wal_dir);
1315
1316 if (options.keep_log_files) {
1317 // delete files in db_dir, but keep all the log files
1318 DeleteChildren(db_dir, 1 << kWalFile);
1319 // move all the files from archive dir to wal_dir
1320 std::string archive_dir = ArchivalDirectory(wal_dir);
1321 std::vector<std::string> archive_files;
1322 db_env_->GetChildren(archive_dir, &archive_files); // ignore errors
1323 for (const auto& f : archive_files) {
1324 uint64_t number;
1325 FileType type;
1326 bool ok = ParseFileName(f, &number, &type);
1327 if (ok && type == kWalFile) {
1328 ROCKS_LOG_INFO(options_.info_log,
1329 "Moving log file from archive/ to wal_dir: %s",
1330 f.c_str());
1331 Status s =
1332 db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
1333 if (!s.ok()) {
1334 // if we can't move log file from archive_dir to wal_dir,
1335 // we should fail, since it might mean data loss
1336 return s;
1337 }
1338 }
1339 }
1340 } else {
1341 DeleteChildren(wal_dir);
1342 DeleteChildren(ArchivalDirectory(wal_dir));
1343 DeleteChildren(db_dir);
1344 }
1345
1346 RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
1347 if (rate_limiter) {
1348 copy_file_buffer_size_ =
1349 static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
1350 }
1351 Status s;
1352 std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
1353 for (const auto& file_info : backup->GetFiles()) {
1354 const std::string& file = file_info->filename;
1355 std::string dst;
1356 // 1. extract the filename
1357 size_t slash = file.find_last_of('/');
1358 // file will either be shared/<file>, shared_checksum/<file_crc32c_size>,
1359 // shared_checksum/<file_session>, shared_checksum/<file_crc32c_session>,
1360 // or private/<number>/<file>
1361 assert(slash != std::string::npos);
1362 dst = file.substr(slash + 1);
1363
1364 // if the file was in shared_checksum, extract the real file name
1365 // in this case the file is <number>_<checksum>_<size>.<type>,
1366 // <number>_<session>.<type>, or <number>_<checksum>_<session>.<type>
1367 if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
1368 dst = GetFileFromChecksumFile(dst);
1369 }
1370
1371 // 2. find the filetype
1372 uint64_t number;
1373 FileType type;
1374 bool ok = ParseFileName(dst, &number, &type);
1375 if (!ok) {
1376 return Status::Corruption("Backup corrupted: Fail to parse filename " +
1377 dst);
1378 }
1379 // 3. Construct the final path
1380 // kWalFile lives in wal_dir and all the rest live in db_dir
1381 dst = ((type == kWalFile) ? wal_dir : db_dir) + "/" + dst;
1382
1383 ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
1384 dst.c_str());
1385 CopyOrCreateWorkItem copy_or_create_work_item(
1386 GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
1387 EnvOptions() /* src_env_options */, false, rate_limiter,
1388 0 /* size_limit */);
1389 RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1390 copy_or_create_work_item.result.get_future(), file_info->checksum_hex);
1391 files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1392 restore_items_to_finish.push_back(
1393 std::move(after_copy_or_create_work_item));
1394 }
1395 Status item_status;
1396 for (auto& item : restore_items_to_finish) {
1397 item.result.wait();
1398 auto result = item.result.get();
1399 item_status = result.status;
1400 // Note: It is possible that both of the following bad-status cases occur
1401 // during copying. But, we only return one status.
1402 if (!item_status.ok()) {
1403 s = item_status;
1404 break;
1405 } else if (item.checksum_hex != result.checksum_hex) {
1406 s = Status::Corruption("Checksum check failed");
1407 break;
1408 }
1409 }
1410
1411 ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
1412 s.ToString().c_str());
1413 return s;
1414 }
1415
1416 Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
1417 bool verify_with_checksum) {
1418 // Check if backup_id is corrupted, or valid and registered
1419 assert(initialized_);
1420 auto corrupt_itr = corrupt_backups_.find(backup_id);
1421 if (corrupt_itr != corrupt_backups_.end()) {
1422 return corrupt_itr->second.first;
1423 }
1424
1425 auto backup_itr = backups_.find(backup_id);
1426 if (backup_itr == backups_.end()) {
1427 return Status::NotFound();
1428 }
1429
1430 auto& backup = backup_itr->second;
1431 if (backup->Empty()) {
1432 return Status::NotFound();
1433 }
1434
1435 ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
1436
1437 // Find all existing backup files belong to backup_id
1438 std::unordered_map<std::string, uint64_t> curr_abs_path_to_size;
1439 for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
1440 GetSharedFileWithChecksumRel()}) {
1441 const auto abs_dir = GetAbsolutePath(rel_dir);
1442 InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size);
1443 }
1444
1445 // For all files registered in backup
1446 for (const auto& file_info : backup->GetFiles()) {
1447 const auto abs_path = GetAbsolutePath(file_info->filename);
1448 // check existence of the file
1449 if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
1450 return Status::NotFound("File missing: " + abs_path);
1451 }
1452 // verify file size
1453 if (file_info->size != curr_abs_path_to_size[abs_path]) {
1454 std::string size_info("Expected file size is " +
1455 ToString(file_info->size) +
1456 " while found file size is " +
1457 ToString(curr_abs_path_to_size[abs_path]));
1458 return Status::Corruption("File corrupted: File size mismatch for " +
1459 abs_path + ": " + size_info);
1460 }
1461 if (verify_with_checksum) {
1462 // verify file checksum
1463 std::string checksum_hex;
1464 ROCKS_LOG_INFO(options_.info_log, "Verifying %s checksum...\n",
1465 abs_path.c_str());
1466 ReadFileAndComputeChecksum(abs_path, backup_env_, EnvOptions(),
1467 0 /* size_limit */, &checksum_hex);
1468 if (file_info->checksum_hex != checksum_hex) {
1469 std::string checksum_info(
1470 "Expected checksum is " + file_info->checksum_hex +
1471 " while computed checksum is " + checksum_hex);
1472 return Status::Corruption("File corrupted: Checksum mismatch for " +
1473 abs_path + ": " + checksum_info);
1474 }
1475 }
1476 }
1477 return Status::OK();
1478 }
1479
1480 Status BackupEngineImpl::CopyOrCreateFile(
1481 const std::string& src, const std::string& dst, const std::string& contents,
1482 Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
1483 RateLimiter* rate_limiter, uint64_t* size, std::string* checksum_hex,
1484 uint64_t size_limit, std::function<void()> progress_callback) {
1485 assert(src.empty() != contents.empty());
1486 Status s;
1487 std::unique_ptr<WritableFile> dst_file;
1488 std::unique_ptr<SequentialFile> src_file;
1489 EnvOptions dst_env_options;
1490 dst_env_options.use_mmap_writes = false;
1491 // TODO:(gzh) maybe use direct reads/writes here if possible
1492 if (size != nullptr) {
1493 *size = 0;
1494 }
1495 uint32_t checksum_value = 0;
1496
1497 // Check if size limit is set. if not, set it to very big number
1498 if (size_limit == 0) {
1499 size_limit = std::numeric_limits<uint64_t>::max();
1500 }
1501
1502 s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options);
1503 if (s.ok() && !src.empty()) {
1504 s = src_env->NewSequentialFile(src, &src_file, src_env_options);
1505 }
1506 if (!s.ok()) {
1507 return s;
1508 }
1509
1510 std::unique_ptr<WritableFileWriter> dest_writer(new WritableFileWriter(
1511 NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options));
1512 std::unique_ptr<SequentialFileReader> src_reader;
1513 std::unique_ptr<char[]> buf;
1514 if (!src.empty()) {
1515 src_reader.reset(new SequentialFileReader(
1516 NewLegacySequentialFileWrapper(src_file), src));
1517 buf.reset(new char[copy_file_buffer_size_]);
1518 }
1519
1520 Slice data;
1521 uint64_t processed_buffer_size = 0;
1522 do {
1523 if (stop_backup_.load(std::memory_order_acquire)) {
1524 return Status::Incomplete("Backup stopped");
1525 }
1526 if (!src.empty()) {
1527 size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
1528 ? copy_file_buffer_size_
1529 : static_cast<size_t>(size_limit);
1530 s = src_reader->Read(buffer_to_read, &data, buf.get());
1531 processed_buffer_size += buffer_to_read;
1532 } else {
1533 data = contents;
1534 }
1535 size_limit -= data.size();
1536 TEST_SYNC_POINT_CALLBACK(
1537 "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup",
1538 (src.length() > 4 && src.rfind(".sst") == src.length() - 4) ? &data
1539 : nullptr);
1540
1541 if (!s.ok()) {
1542 return s;
1543 }
1544
1545 if (size != nullptr) {
1546 *size += data.size();
1547 }
1548 if (checksum_hex != nullptr) {
1549 checksum_value = crc32c::Extend(checksum_value, data.data(), data.size());
1550 }
1551 s = dest_writer->Append(data);
1552 if (rate_limiter != nullptr) {
1553 rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
1554 RateLimiter::OpType::kWrite);
1555 }
1556 if (processed_buffer_size > options_.callback_trigger_interval_size) {
1557 processed_buffer_size -= options_.callback_trigger_interval_size;
1558 std::lock_guard<std::mutex> lock(byte_report_mutex_);
1559 progress_callback();
1560 }
1561 } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
1562
1563 // Convert uint32_t checksum to hex checksum
1564 if (checksum_hex != nullptr) {
1565 checksum_hex->assign(ChecksumInt32ToHex(checksum_value));
1566 }
1567
1568 if (s.ok() && sync) {
1569 s = dest_writer->Sync(false);
1570 }
1571 if (s.ok()) {
1572 s = dest_writer->Close();
1573 }
1574 return s;
1575 }
1576
1577 // fname will always start with "/"
1578 Status BackupEngineImpl::AddBackupFileWorkItem(
1579 std::unordered_set<std::string>& live_dst_paths,
1580 std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
1581 BackupID backup_id, bool shared, const std::string& src_dir,
1582 const std::string& fname, const EnvOptions& src_env_options,
1583 RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
1584 bool shared_checksum, std::function<void()> progress_callback,
1585 const std::string& contents, const std::string& src_checksum_func_name,
1586 const std::string& src_checksum_str) {
1587 assert(!fname.empty() && fname[0] == '/');
1588 assert(contents.empty() != src_dir.empty());
1589
1590 std::string dst_relative = fname.substr(1);
1591 std::string dst_relative_tmp;
1592 Status s;
1593 std::string checksum_hex;
1594 std::string db_id;
1595 std::string db_session_id;
1596 // whether the checksum for a table file is available
1597 bool has_checksum = false;
1598
1599 // Whenever a default checksum function name is passed in, we will compare
1600 // the corresponding checksum values after copying. Note that only table files
1601 // may have a known checksum function name passed in.
1602 //
1603 // If no default checksum function name is passed in and db session id is not
1604 // available, we will calculate the checksum *before* copying in two cases
1605 // (we always calculate checksums when copying or creating for any file types):
1606 // a) share_files_with_checksum is true and file type is table;
1607 // b) share_table_files is true and the file exists already.
1608 //
1609 // Step 0: Check if default checksum function name is passed in
1610 if (kDbFileChecksumFuncName == src_checksum_func_name) {
1611 if (src_checksum_str == kUnknownFileChecksum) {
1612 return Status::Aborted("Unknown checksum value for " + fname);
1613 }
1614 checksum_hex = ChecksumStrToHex(src_checksum_str);
1615 has_checksum = true;
1616 }
1617
1618 // Step 1: Prepare the relative path to destination
1619 if (shared && shared_checksum) {
1620 if (GetNamingNoFlags() != BackupableDBOptions::kLegacyCrc32cAndFileSize) {
1621 // Prepare db_session_id to add to the file name
1622 // Ignore the returned status
1623 // In the failed cases, db_id and db_session_id will be empty
1624 GetFileDbIdentities(db_env_, src_env_options, src_dir + fname, &db_id,
1625 &db_session_id);
1626 }
1627 // Calculate checksum if checksum and db session id are not available.
1628 // If db session id is available, we will not calculate the checksum
1629 // since the session id should suffice to avoid file name collision in
1630 // the shared_checksum directory.
1631 if (!has_checksum && db_session_id.empty()) {
1632 s = ReadFileAndComputeChecksum(src_dir + fname, db_env_, src_env_options,
1633 size_limit, &checksum_hex);
1634 if (!s.ok()) {
1635 return s;
1636 }
1637 has_checksum = true;
1638 }
1639 if (size_bytes == port::kMaxUint64) {
1640 return Status::NotFound("File missing: " + src_dir + fname);
1641 }
1642 // dst_relative depends on the following conditions:
1643 // 1) the naming scheme is kUseDbSessionId,
1644 // 2) db_session_id is not empty,
1645 // 3) checksum is available in the DB manifest.
1646 // If 1,2,3) are satisfied, then dst_relative will be of the form:
1647 // shared_checksum/<file_number>_<checksum>_<db_session_id>.sst
1648 // If 1,2) are satisfied, then dst_relative will be of the form:
1649 // shared_checksum/<file_number>_<db_session_id>.sst
1650 // Otherwise, dst_relative is of the form
1651 // shared_checksum/<file_number>_<checksum>_<size>.sst
1652 dst_relative = GetSharedFileWithChecksum(
1653 dst_relative, has_checksum, checksum_hex, size_bytes, db_session_id);
1654 dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
1655 dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
1656 } else if (shared) {
1657 dst_relative_tmp = GetSharedFileRel(dst_relative, true);
1658 dst_relative = GetSharedFileRel(dst_relative, false);
1659 } else {
1660 dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
1661 }
1662
1663 // We copy into `temp_dest_path` and, once finished, rename it to
1664 // `final_dest_path`. This allows files to atomically appear at
1665 // `final_dest_path`. We can copy directly to the final path when atomicity
1666 // is unnecessary, like for files in private backup directories.
1667 const std::string* copy_dest_path;
1668 std::string temp_dest_path;
1669 std::string final_dest_path = GetAbsolutePath(dst_relative);
1670 if (!dst_relative_tmp.empty()) {
1671 temp_dest_path = GetAbsolutePath(dst_relative_tmp);
1672 copy_dest_path = &temp_dest_path;
1673 } else {
1674 copy_dest_path = &final_dest_path;
1675 }
1676
1677 // Step 2: Determine whether to copy or not
1678 // if it's shared, we also need to check if it exists -- if it does, no need
1679 // to copy it again.
1680 bool need_to_copy = true;
1681 // true if final_dest_path is the same path as another live file
1682 const bool same_path =
1683 live_dst_paths.find(final_dest_path) != live_dst_paths.end();
1684
1685 bool file_exists = false;
1686 if (shared && !same_path) {
1687 // Should be in shared directory but not a live path, check existence in
1688 // shared directory
1689 Status exist = backup_env_->FileExists(final_dest_path);
1690 if (exist.ok()) {
1691 file_exists = true;
1692 } else if (exist.IsNotFound()) {
1693 file_exists = false;
1694 } else {
1695 assert(s.IsIOError());
1696 return exist;
1697 }
1698 }
1699
1700 if (!contents.empty()) {
1701 need_to_copy = false;
1702 } else if (shared && (same_path || file_exists)) {
1703 need_to_copy = false;
1704 auto find_result = backuped_file_infos_.find(dst_relative);
1705 if (find_result == backuped_file_infos_.end() && !same_path) {
1706 // file exists but not referenced
1707 ROCKS_LOG_INFO(
1708 options_.info_log,
1709 "%s already present, but not referenced by any backup. We will "
1710 "overwrite the file.",
1711 fname.c_str());
1712 need_to_copy = true;
1713 backup_env_->DeleteFile(final_dest_path);
1714 } else {
1715 // file exists and referenced
1716 if (!has_checksum) {
1717 if (!same_path) {
1718 assert(find_result != backuped_file_infos_.end());
1719 // Note: to save I/O on incremental backups, we copy prior known
1720 // checksum of the file instead of reading entire file contents
1721 // to recompute it.
1722 checksum_hex = find_result->second->checksum_hex;
1723 has_checksum = true;
1724 // Regarding corruption detection, consider:
1725 // (a) the DB file is corrupt (since previous backup) and the backup
1726 // file is OK: we failed to detect, but the backup is safe. DB can
1727 // be repaired/restored once its corruption is detected.
1728 // (b) the backup file is corrupt (since previous backup) and the
1729 // db file is OK: we failed to detect, but the backup is corrupt.
1730 // CreateNewBackup should support fast incremental backups and
1731 // there's no way to support that without reading all the files.
1732 // We might add an option for extra checks on incremental backup,
1733 // but until then, use VerifyBackups to check existing backup data.
1734 // (c) file name collision with legitimately different content.
1735 // This is almost inconceivable with a well-generated DB session
1736 // ID, but even in that case, we double check the file sizes in
1737 // BackupMeta::AddFile.
1738 } else {
1739 // same_path should not happen for a standard DB, so OK to
1740 // read file contents to check for checksum mismatch between
1741 // two files from same DB getting same name.
1742 s = ReadFileAndComputeChecksum(src_dir + fname, db_env_,
1743 src_env_options, size_limit,
1744 &checksum_hex);
1745 if (!s.ok()) {
1746 return s;
1747 }
1748 }
1749 }
1750 if (!db_session_id.empty()) {
1751 ROCKS_LOG_INFO(options_.info_log,
1752 "%s already present, with checksum %s, size %" PRIu64
1753 " and DB session identity %s",
1754 fname.c_str(), checksum_hex.c_str(), size_bytes,
1755 db_session_id.c_str());
1756 } else {
1757 ROCKS_LOG_INFO(options_.info_log,
1758 "%s already present, with checksum %s and size %" PRIu64,
1759 fname.c_str(), checksum_hex.c_str(), size_bytes);
1760 }
1761 }
1762 }
1763 live_dst_paths.insert(final_dest_path);
1764
1765 // Step 3: Add work item
1766 if (!contents.empty() || need_to_copy) {
1767 ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
1768 copy_dest_path->c_str());
1769 CopyOrCreateWorkItem copy_or_create_work_item(
1770 src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
1771 db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
1772 size_limit, progress_callback, has_checksum, src_checksum_func_name,
1773 checksum_hex, db_id, db_session_id);
1774 BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1775 copy_or_create_work_item.result.get_future(), shared, need_to_copy,
1776 backup_env_, temp_dest_path, final_dest_path, dst_relative);
1777 files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1778 backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1779 } else {
1780 std::promise<CopyOrCreateResult> promise_result;
1781 BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1782 promise_result.get_future(), shared, need_to_copy, backup_env_,
1783 temp_dest_path, final_dest_path, dst_relative);
1784 backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1785 CopyOrCreateResult result;
1786 result.status = s;
1787 result.size = size_bytes;
1788 result.checksum_hex = std::move(checksum_hex);
1789 result.db_id = std::move(db_id);
1790 result.db_session_id = std::move(db_session_id);
1791 promise_result.set_value(std::move(result));
1792 }
1793 return s;
1794 }
1795
1796 Status BackupEngineImpl::ReadFileAndComputeChecksum(
1797 const std::string& src, Env* src_env, const EnvOptions& src_env_options,
1798 uint64_t size_limit, std::string* checksum_hex) {
1799 if (checksum_hex == nullptr) {
1800 return Status::Aborted("Checksum pointer is null");
1801 }
1802 uint32_t checksum_value = 0;
1803 if (size_limit == 0) {
1804 size_limit = std::numeric_limits<uint64_t>::max();
1805 }
1806
1807 std::unique_ptr<SequentialFile> src_file;
1808 Status s = src_env->NewSequentialFile(src, &src_file, src_env_options);
1809 if (!s.ok()) {
1810 return s;
1811 }
1812
1813 std::unique_ptr<SequentialFileReader> src_reader(
1814 new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src));
1815 std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
1816 Slice data;
1817
1818 do {
1819 if (stop_backup_.load(std::memory_order_acquire)) {
1820 return Status::Incomplete("Backup stopped");
1821 }
1822 size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
1823 copy_file_buffer_size_ : static_cast<size_t>(size_limit);
1824 s = src_reader->Read(buffer_to_read, &data, buf.get());
1825
1826 if (!s.ok()) {
1827 return s;
1828 }
1829
1830 size_limit -= data.size();
1831 checksum_value = crc32c::Extend(checksum_value, data.data(), data.size());
1832 } while (data.size() > 0 && size_limit > 0);
1833
1834 checksum_hex->assign(ChecksumInt32ToHex(checksum_value));
1835
1836 return s;
1837 }
1838
1839 Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
1840 const EnvOptions& src_env_options,
1841 const std::string& file_path,
1842 std::string* db_id,
1843 std::string* db_session_id) {
1844 assert(db_id != nullptr || db_session_id != nullptr);
1845
1846 Options options;
1847 options.env = src_env;
1848 SstFileDumper sst_reader(options, file_path,
1849 2 * 1024 * 1024
1850 /* readahead_size */,
1851 false /* verify_checksum */, false /* output_hex */,
1852 false /* decode_blob_index */, src_env_options,
1853 true /* silent */);
1854
1855 const TableProperties* table_properties = nullptr;
1856 std::shared_ptr<const TableProperties> tp;
1857 Status s = sst_reader.getStatus();
1858
1859 if (s.ok()) {
1860 // Try to get table properties from the table reader of sst_reader
1861 if (!sst_reader.ReadTableProperties(&tp).ok()) {
1862 // Try to use table properites from the initialization of sst_reader
1863 table_properties = sst_reader.GetInitTableProperties();
1864 } else {
1865 table_properties = tp.get();
1866 }
1867 } else {
1868 ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s",
1869 file_path.c_str(), s.ToString().c_str());
1870 return s;
1871 }
1872
1873 if (table_properties != nullptr) {
1874 if (db_id != nullptr) {
1875 db_id->assign(table_properties->db_id);
1876 }
1877 if (db_session_id != nullptr) {
1878 db_session_id->assign(table_properties->db_session_id);
1879 if (db_session_id->empty()) {
1880 s = Status::NotFound("DB session identity not found in " + file_path);
1881 ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str());
1882 return s;
1883 }
1884 }
1885 return Status::OK();
1886 } else {
1887 s = Status::Corruption("Table properties missing in " + file_path);
1888 ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str());
1889 return s;
1890 }
1891 }
1892
1893 void BackupEngineImpl::DeleteChildren(const std::string& dir,
1894 uint32_t file_type_filter) {
1895 std::vector<std::string> children;
1896 db_env_->GetChildren(dir, &children); // ignore errors
1897
1898 for (const auto& f : children) {
1899 uint64_t number;
1900 FileType type;
1901 bool ok = ParseFileName(f, &number, &type);
1902 if (ok && (file_type_filter & (1 << type))) {
1903 // don't delete this file
1904 continue;
1905 }
1906 db_env_->DeleteFile(dir + "/" + f); // ignore errors
1907 }
1908 }
1909
1910 Status BackupEngineImpl::InsertPathnameToSizeBytes(
1911 const std::string& dir, Env* env,
1912 std::unordered_map<std::string, uint64_t>* result) {
1913 assert(result != nullptr);
1914 std::vector<Env::FileAttributes> files_attrs;
1915 Status status = env->FileExists(dir);
1916 if (status.ok()) {
1917 status = env->GetChildrenFileAttributes(dir, &files_attrs);
1918 } else if (status.IsNotFound()) {
1919 // Insert no entries can be considered success
1920 status = Status::OK();
1921 }
1922 const bool slash_needed = dir.empty() || dir.back() != '/';
1923 for (const auto& file_attrs : files_attrs) {
1924 result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
1925 file_attrs.size_bytes);
1926 }
1927 return status;
1928 }
1929
1930 Status BackupEngineImpl::GarbageCollect() {
1931 assert(!read_only_);
1932
1933 // We will make a best effort to remove all garbage even in the presence
1934 // of inconsistencies or I/O failures that inhibit finding garbage.
1935 Status overall_status = Status::OK();
1936 // If all goes well, we don't need another auto-GC this session
1937 might_need_garbage_collect_ = false;
1938
1939 ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
1940
1941 // delete obsolete shared files
1942 for (bool with_checksum : {false, true}) {
1943 std::vector<std::string> shared_children;
1944 {
1945 std::string shared_path;
1946 if (with_checksum) {
1947 shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
1948 } else {
1949 shared_path = GetAbsolutePath(GetSharedFileRel());
1950 }
1951 auto s = backup_env_->FileExists(shared_path);
1952 if (s.ok()) {
1953 s = backup_env_->GetChildren(shared_path, &shared_children);
1954 } else if (s.IsNotFound()) {
1955 s = Status::OK();
1956 }
1957 if (!s.ok()) {
1958 overall_status = s;
1959 // Trying again later might work
1960 might_need_garbage_collect_ = true;
1961 }
1962 }
1963 for (auto& child : shared_children) {
1964 if (child == "." || child == "..") {
1965 continue;
1966 }
1967 std::string rel_fname;
1968 if (with_checksum) {
1969 rel_fname = GetSharedFileWithChecksumRel(child);
1970 } else {
1971 rel_fname = GetSharedFileRel(child);
1972 }
1973 auto child_itr = backuped_file_infos_.find(rel_fname);
1974 // if it's not refcounted, delete it
1975 if (child_itr == backuped_file_infos_.end() ||
1976 child_itr->second->refs == 0) {
1977 // this might be a directory, but DeleteFile will just fail in that
1978 // case, so we're good
1979 Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
1980 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
1981 rel_fname.c_str(), s.ToString().c_str());
1982 backuped_file_infos_.erase(rel_fname);
1983 if (!s.ok()) {
1984 // Trying again later might work
1985 might_need_garbage_collect_ = true;
1986 }
1987 }
1988 }
1989 }
1990
1991 // delete obsolete private files
1992 std::vector<std::string> private_children;
1993 {
1994 auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
1995 &private_children);
1996 if (!s.ok()) {
1997 overall_status = s;
1998 // Trying again later might work
1999 might_need_garbage_collect_ = true;
2000 }
2001 }
2002 for (auto& child : private_children) {
2003 if (child == "." || child == "..") {
2004 continue;
2005 }
2006
2007 BackupID backup_id = 0;
2008 bool tmp_dir = child.find(".tmp") != std::string::npos;
2009 sscanf(child.c_str(), "%u", &backup_id);
2010 if (!tmp_dir && // if it's tmp_dir, delete it
2011 (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
2012 // it's either not a number or it's still alive. continue
2013 continue;
2014 }
2015 // here we have to delete the dir and all its children
2016 std::string full_private_path =
2017 GetAbsolutePath(GetPrivateFileRel(backup_id));
2018 std::vector<std::string> subchildren;
2019 backup_env_->GetChildren(full_private_path, &subchildren);
2020 for (auto& subchild : subchildren) {
2021 if (subchild == "." || subchild == "..") {
2022 continue;
2023 }
2024 Status s = backup_env_->DeleteFile(full_private_path + subchild);
2025 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
2026 (full_private_path + subchild).c_str(),
2027 s.ToString().c_str());
2028 if (!s.ok()) {
2029 // Trying again later might work
2030 might_need_garbage_collect_ = true;
2031 }
2032 }
2033 // finally delete the private dir
2034 Status s = backup_env_->DeleteDir(full_private_path);
2035 ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
2036 full_private_path.c_str(), s.ToString().c_str());
2037 if (!s.ok()) {
2038 // Trying again later might work
2039 might_need_garbage_collect_ = true;
2040 }
2041 }
2042
2043 assert(overall_status.ok() || might_need_garbage_collect_);
2044 return overall_status;
2045 }
2046
2047 // ------- BackupMeta class --------
2048
2049 Status BackupEngineImpl::BackupMeta::AddFile(
2050 std::shared_ptr<FileInfo> file_info) {
2051 auto itr = file_infos_->find(file_info->filename);
2052 if (itr == file_infos_->end()) {
2053 auto ret = file_infos_->insert({file_info->filename, file_info});
2054 if (ret.second) {
2055 itr = ret.first;
2056 itr->second->refs = 1;
2057 } else {
2058 // if this happens, something is seriously wrong
2059 return Status::Corruption("In memory metadata insertion error");
2060 }
2061 } else {
2062 // Compare sizes, because we scanned that off the filesystem on both
2063 // ends. This is like a check in VerifyBackup.
2064 if (itr->second->size != file_info->size) {
2065 std::string msg = "Size mismatch for existing backup file: ";
2066 msg.append(file_info->filename);
2067 msg.append(" Size in backup is " + ToString(itr->second->size) +
2068 " while size in DB is " + ToString(file_info->size));
2069 msg.append(
2070 " If this DB file checks as not corrupt, try deleting old"
2071 " backups or backing up to a different backup directory.");
2072 return Status::Corruption(msg);
2073 }
2074 // Note: to save I/O, this check will pass trivially on already backed
2075 // up files that don't have the checksum in their name. And it should
2076 // never fail for files that do have checksum in their name.
2077 if (itr->second->checksum_hex != file_info->checksum_hex) {
2078 // Should never reach here, but produce an appropriate corruption
2079 // message in case we do in a release build.
2080 assert(false);
2081 std::string msg = "Checksum mismatch for existing backup file: ";
2082 msg.append(file_info->filename);
2083 msg.append(" Expected checksum is " + itr->second->checksum_hex +
2084 " while computed checksum is " + file_info->checksum_hex);
2085 msg.append(
2086 " If this DB file checks as not corrupt, try deleting old"
2087 " backups or backing up to a different backup directory.");
2088 return Status::Corruption(msg);
2089 }
2090 ++itr->second->refs; // increase refcount if already present
2091 }
2092
2093 size_ += file_info->size;
2094 files_.push_back(itr->second);
2095
2096 return Status::OK();
2097 }
2098
2099 Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
2100 Status s;
2101 for (const auto& file : files_) {
2102 --file->refs; // decrease refcount
2103 }
2104 files_.clear();
2105 // delete meta file
2106 if (delete_meta) {
2107 s = env_->FileExists(meta_filename_);
2108 if (s.ok()) {
2109 s = env_->DeleteFile(meta_filename_);
2110 } else if (s.IsNotFound()) {
2111 s = Status::OK(); // nothing to delete
2112 }
2113 }
2114 timestamp_ = 0;
2115 return s;
2116 }
2117
// Literal prefix that introduces the optional application-metadata line of a
// backup meta file; the remainder of that line is the hex-encoded metadata.
Slice kMetaDataPrefix("metadata ");
2119
2120 // each backup meta file is of the format:
2121 // <timestamp>
2122 // <seq number>
2123 // <metadata(literal string)> <metadata> (optional)
2124 // <number of files>
2125 // <file1> <crc32(literal string)> <crc32c_value>
2126 // <file2> <crc32(literal string)> <crc32c_value>
2127 // ...
2128 Status BackupEngineImpl::BackupMeta::LoadFromFile(
2129 const std::string& backup_dir,
2130 const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
2131 assert(Empty());
2132 Status s;
2133 std::unique_ptr<SequentialFile> backup_meta_file;
2134 s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
2135 if (!s.ok()) {
2136 return s;
2137 }
2138
2139 std::unique_ptr<SequentialFileReader> backup_meta_reader(
2140 new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file),
2141 meta_filename_));
2142 std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
2143 Slice data;
2144 s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
2145
2146 if (!s.ok() || data.size() == max_backup_meta_file_size_) {
2147 return s.ok() ? Status::Corruption("File size too big") : s;
2148 }
2149 buf[data.size()] = 0;
2150
2151 uint32_t num_files = 0;
2152 char *next;
2153 timestamp_ = strtoull(data.data(), &next, 10);
2154 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
2155 sequence_number_ = strtoull(data.data(), &next, 10);
2156 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
2157
2158 if (data.starts_with(kMetaDataPrefix)) {
2159 // app metadata present
2160 data.remove_prefix(kMetaDataPrefix.size());
2161 Slice hex_encoded_metadata = GetSliceUntil(&data, '\n');
2162 bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_);
2163 if (!decode_success) {
2164 return Status::Corruption(
2165 "Failed to decode stored hex encoded app metadata");
2166 }
2167 }
2168
2169 num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
2170 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
2171
2172 std::vector<std::shared_ptr<FileInfo>> files;
2173
2174 // WART: The checksums are crc32c, not original crc32
2175 Slice checksum_prefix("crc32 ");
2176
2177 for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
2178 auto line = GetSliceUntil(&data, '\n');
2179 // filename is relative, i.e., shared/number.sst,
2180 // shared_checksum/number.sst, or private/backup_id/number.sst
2181 std::string filename = GetSliceUntil(&line, ' ').ToString();
2182
2183 uint64_t size;
2184 const std::shared_ptr<FileInfo> file_info = GetFile(filename);
2185 if (file_info) {
2186 size = file_info->size;
2187 } else {
2188 std::string abs_path = backup_dir + "/" + filename;
2189 try {
2190 size = abs_path_to_size.at(abs_path);
2191 } catch (std::out_of_range&) {
2192 return Status::Corruption("Size missing for pathname: " + abs_path);
2193 }
2194 }
2195
2196 if (line.empty()) {
2197 return Status::Corruption("File checksum is missing for " + filename +
2198 " in " + meta_filename_);
2199 }
2200
2201 uint32_t checksum_value = 0;
2202 if (line.starts_with(checksum_prefix)) {
2203 line.remove_prefix(checksum_prefix.size());
2204 checksum_value = static_cast<uint32_t>(strtoul(line.data(), nullptr, 10));
2205 if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
2206 return Status::Corruption("Invalid checksum value for " + filename +
2207 " in " + meta_filename_);
2208 }
2209 } else {
2210 return Status::Corruption("Unknown checksum type for " + filename +
2211 " in " + meta_filename_);
2212 }
2213
2214 files.emplace_back(
2215 new FileInfo(filename, size, ChecksumInt32ToHex(checksum_value)));
2216 }
2217
2218 if (s.ok() && data.size() > 0) {
2219 // file has to be read completely. if not, we count it as corruption
2220 s = Status::Corruption("Tailing data in backup meta file in " +
2221 meta_filename_);
2222 }
2223
2224 if (s.ok()) {
2225 files_.reserve(files.size());
2226 for (const auto& file_info : files) {
2227 s = AddFile(file_info);
2228 if (!s.ok()) {
2229 break;
2230 }
2231 }
2232 }
2233
2234 return s;
2235 }
2236
2237 Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
2238 Status s;
2239 std::unique_ptr<WritableFile> backup_meta_file;
2240 EnvOptions env_options;
2241 env_options.use_mmap_writes = false;
2242 env_options.use_direct_writes = false;
2243 s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options);
2244 if (!s.ok()) {
2245 return s;
2246 }
2247
2248 std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
2249 size_t len = 0, buf_size = max_backup_meta_file_size_;
2250 len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_);
2251 len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
2252 sequence_number_);
2253 if (!app_metadata_.empty()) {
2254 std::string hex_encoded_metadata =
2255 Slice(app_metadata_).ToString(/* hex */ true);
2256
2257 // +1 to accommodate newline character
2258 size_t hex_meta_strlen =
2259 kMetaDataPrefix.ToString().length() + hex_encoded_metadata.length() + 1;
2260 if (hex_meta_strlen >= buf_size) {
2261 return Status::Corruption("Buffer too small to fit backup metadata");
2262 }
2263 else if (len + hex_meta_strlen >= buf_size) {
2264 backup_meta_file->Append(Slice(buf.get(), len));
2265 buf.reset();
2266 std::unique_ptr<char[]> new_reset_buf(
2267 new char[max_backup_meta_file_size_]);
2268 buf.swap(new_reset_buf);
2269 len = 0;
2270 }
2271 len += snprintf(buf.get() + len, buf_size - len, "%s%s\n",
2272 kMetaDataPrefix.ToString().c_str(),
2273 hex_encoded_metadata.c_str());
2274 }
2275
2276 char writelen_temp[19];
2277 if (len + snprintf(writelen_temp, sizeof(writelen_temp),
2278 "%" ROCKSDB_PRIszt "\n", files_.size()) >= buf_size) {
2279 backup_meta_file->Append(Slice(buf.get(), len));
2280 buf.reset();
2281 std::unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
2282 buf.swap(new_reset_buf);
2283 len = 0;
2284 }
2285 {
2286 const char *const_write = writelen_temp;
2287 len += snprintf(buf.get() + len, buf_size - len, "%s", const_write);
2288 }
2289
2290 for (const auto& file : files_) {
2291 // use crc32c for now, switch to something else if needed
2292 // WART: The checksums are crc32c, not original crc32
2293
2294 size_t newlen =
2295 len + file->filename.length() +
2296 snprintf(writelen_temp, sizeof(writelen_temp), " crc32 %u\n",
2297 ChecksumHexToInt32(file->checksum_hex));
2298 const char* const_write = writelen_temp;
2299 if (newlen >= buf_size) {
2300 backup_meta_file->Append(Slice(buf.get(), len));
2301 buf.reset();
2302 std::unique_ptr<char[]> new_reset_buf(
2303 new char[max_backup_meta_file_size_]);
2304 buf.swap(new_reset_buf);
2305 len = 0;
2306 }
2307 len += snprintf(buf.get() + len, buf_size - len, "%s%s",
2308 file->filename.c_str(), const_write);
2309 }
2310
2311 s = backup_meta_file->Append(Slice(buf.get(), len));
2312 if (s.ok() && sync) {
2313 s = backup_meta_file->Sync();
2314 }
2315 if (s.ok()) {
2316 s = backup_meta_file->Close();
2317 }
2318 if (s.ok()) {
2319 s = env_->RenameFile(meta_tmp_filename_, meta_filename_);
2320 }
2321 return s;
2322 }
2323
2324 // -------- BackupEngineReadOnlyImpl ---------
2325 class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
2326 public:
2327 BackupEngineReadOnlyImpl(const BackupableDBOptions& options, Env* db_env)
2328 : backup_engine_(new BackupEngineImpl(options, db_env, true)) {}
2329
2330 ~BackupEngineReadOnlyImpl() override {}
2331
2332 // The returned BackupInfos are in chronological order, which means the
2333 // latest backup comes last.
2334 void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
2335 backup_engine_->GetBackupInfo(backup_info);
2336 }
2337
2338 void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override {
2339 backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
2340 }
2341
2342 using BackupEngineReadOnly::RestoreDBFromBackup;
2343 Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
2344 const std::string& db_dir,
2345 const std::string& wal_dir) override {
2346 return backup_engine_->RestoreDBFromBackup(options, backup_id, db_dir,
2347 wal_dir);
2348 }
2349
2350 using BackupEngineReadOnly::RestoreDBFromLatestBackup;
2351 Status RestoreDBFromLatestBackup(const RestoreOptions& options,
2352 const std::string& db_dir,
2353 const std::string& wal_dir) override {
2354 return backup_engine_->RestoreDBFromLatestBackup(options, db_dir, wal_dir);
2355 }
2356
2357 Status VerifyBackup(BackupID backup_id,
2358 bool verify_with_checksum = false) override {
2359 return backup_engine_->VerifyBackup(backup_id, verify_with_checksum);
2360 }
2361
2362 Status Initialize() { return backup_engine_->Initialize(); }
2363
2364 private:
2365 std::unique_ptr<BackupEngineImpl> backup_engine_;
2366 };
2367
2368 Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env,
2369 BackupEngineReadOnly** backup_engine_ptr) {
2370 if (options.destroy_old_data) {
2371 return Status::InvalidArgument(
2372 "Can't destroy old data with ReadOnly BackupEngine");
2373 }
2374 std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
2375 new BackupEngineReadOnlyImpl(options, env));
2376 auto s = backup_engine->Initialize();
2377 if (!s.ok()) {
2378 *backup_engine_ptr = nullptr;
2379 return s;
2380 }
2381 *backup_engine_ptr = backup_engine.release();
2382 return Status::OK();
2383 }
2384
2385 } // namespace ROCKSDB_NAMESPACE
2386
2387 #endif // ROCKSDB_LITE