]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/backupable/backupable_db.cc
4cafc6ab1483925173af33bc507334f4f1a4140c
[ceph.git] / ceph / src / rocksdb / utilities / backupable / backupable_db.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifndef ROCKSDB_LITE
11
12 #include "rocksdb/utilities/backupable_db.h"
13 #include "port/port.h"
14 #include "rocksdb/rate_limiter.h"
15 #include "rocksdb/transaction_log.h"
16 #include "util/channel.h"
17 #include "util/coding.h"
18 #include "util/crc32c.h"
19 #include "util/file_reader_writer.h"
20 #include "util/filename.h"
21 #include "util/logging.h"
22 #include "util/string_util.h"
23 #include "util/sync_point.h"
24 #include "utilities/checkpoint/checkpoint_impl.h"
25
26 #ifndef __STDC_FORMAT_MACROS
27 #define __STDC_FORMAT_MACROS
28 #endif // __STDC_FORMAT_MACROS
29
30 #include <inttypes.h>
31 #include <stdlib.h>
32 #include <algorithm>
33 #include <atomic>
34 #include <functional>
35 #include <future>
36 #include <limits>
37 #include <map>
38 #include <mutex>
39 #include <sstream>
40 #include <string>
41 #include <thread>
42 #include <unordered_map>
43 #include <unordered_set>
44 #include <vector>
45
46 namespace rocksdb {
47
48 void BackupStatistics::IncrementNumberSuccessBackup() {
49 number_success_backup++;
50 }
51 void BackupStatistics::IncrementNumberFailBackup() {
52 number_fail_backup++;
53 }
54
55 uint32_t BackupStatistics::GetNumberSuccessBackup() const {
56 return number_success_backup;
57 }
58 uint32_t BackupStatistics::GetNumberFailBackup() const {
59 return number_fail_backup;
60 }
61
62 std::string BackupStatistics::ToString() const {
63 char result[50];
64 snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
65 GetNumberSuccessBackup(), GetNumberFailBackup());
66 return result;
67 }
68
69 void BackupableDBOptions::Dump(Logger* logger) const {
70 ROCKS_LOG_INFO(logger, " Options.backup_dir: %s",
71 backup_dir.c_str());
72 ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env);
73 ROCKS_LOG_INFO(logger, " Options.share_table_files: %d",
74 static_cast<int>(share_table_files));
75 ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log);
76 ROCKS_LOG_INFO(logger, " Options.sync: %d",
77 static_cast<int>(sync));
78 ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d",
79 static_cast<int>(destroy_old_data));
80 ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d",
81 static_cast<int>(backup_log_files));
82 ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64,
83 backup_rate_limit);
84 ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64,
85 restore_rate_limit);
86 ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
87 max_background_operations);
88 }
89
90 // -------- BackupEngineImpl class ---------
91 class BackupEngineImpl : public BackupEngine {
92 public:
93 BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
94 bool read_only = false);
95 ~BackupEngineImpl();
96 Status CreateNewBackupWithMetadata(DB* db, const std::string& app_metadata,
97 bool flush_before_backup = false,
98 std::function<void()> progress_callback =
99 []() {}) override;
100 Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
101 Status DeleteBackup(BackupID backup_id) override;
102 void StopBackup() override {
103 stop_backup_.store(true, std::memory_order_release);
104 }
105 Status GarbageCollect() override;
106
107 // The returned BackupInfos are in chronological order, which means the
108 // latest backup comes last.
109 void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
110 void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
111 Status RestoreDBFromBackup(
112 BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
113 const RestoreOptions& restore_options = RestoreOptions()) override;
114 Status RestoreDBFromLatestBackup(
115 const std::string& db_dir, const std::string& wal_dir,
116 const RestoreOptions& restore_options = RestoreOptions()) override {
117 return RestoreDBFromBackup(latest_valid_backup_id_, db_dir, wal_dir,
118 restore_options);
119 }
120
121 virtual Status VerifyBackup(BackupID backup_id) override;
122
123 Status Initialize();
124
125 private:
126 void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
127
128 // Extends the "result" map with pathname->size mappings for the contents of
129 // "dir" in "env". Pathnames are prefixed with "dir".
130 Status InsertPathnameToSizeBytes(
131 const std::string& dir, Env* env,
132 std::unordered_map<std::string, uint64_t>* result);
133
134 struct FileInfo {
135 FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
136 : refs(0), filename(fname), size(sz), checksum_value(checksum) {}
137
138 FileInfo(const FileInfo&) = delete;
139 FileInfo& operator=(const FileInfo&) = delete;
140
141 int refs;
142 const std::string filename;
143 const uint64_t size;
144 const uint32_t checksum_value;
145 };
146
147 class BackupMeta {
148 public:
149 BackupMeta(
150 const std::string& meta_filename, const std::string& meta_tmp_filename,
151 std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
152 Env* env)
153 : timestamp_(0),
154 sequence_number_(0),
155 size_(0),
156 meta_filename_(meta_filename),
157 meta_tmp_filename_(meta_tmp_filename),
158 file_infos_(file_infos),
159 env_(env) {}
160
161 BackupMeta(const BackupMeta&) = delete;
162 BackupMeta& operator=(const BackupMeta&) = delete;
163
164 ~BackupMeta() {}
165
166 void RecordTimestamp() {
167 env_->GetCurrentTime(&timestamp_);
168 }
169 int64_t GetTimestamp() const {
170 return timestamp_;
171 }
172 uint64_t GetSize() const {
173 return size_;
174 }
175 uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
176 void SetSequenceNumber(uint64_t sequence_number) {
177 sequence_number_ = sequence_number;
178 }
179 uint64_t GetSequenceNumber() {
180 return sequence_number_;
181 }
182
183 const std::string& GetAppMetadata() const { return app_metadata_; }
184
185 void SetAppMetadata(const std::string& app_metadata) {
186 app_metadata_ = app_metadata;
187 }
188
189 Status AddFile(std::shared_ptr<FileInfo> file_info);
190
191 Status Delete(bool delete_meta = true);
192
193 bool Empty() {
194 return files_.empty();
195 }
196
197 std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
198 auto it = file_infos_->find(filename);
199 if (it == file_infos_->end())
200 return nullptr;
201 return it->second;
202 }
203
204 const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
205 return files_;
206 }
207
208 // @param abs_path_to_size Pre-fetched file sizes (bytes).
209 Status LoadFromFile(
210 const std::string& backup_dir,
211 const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
212 Status StoreToFile(bool sync);
213
214 std::string GetInfoString() {
215 std::ostringstream ss;
216 ss << "Timestamp: " << timestamp_ << std::endl;
217 char human_size[16];
218 AppendHumanBytes(size_, human_size, sizeof(human_size));
219 ss << "Size: " << human_size << std::endl;
220 ss << "Files:" << std::endl;
221 for (const auto& file : files_) {
222 AppendHumanBytes(file->size, human_size, sizeof(human_size));
223 ss << file->filename << ", size " << human_size << ", refs "
224 << file->refs << std::endl;
225 }
226 return ss.str();
227 }
228
229 private:
230 int64_t timestamp_;
231 // sequence number is only approximate, should not be used
232 // by clients
233 uint64_t sequence_number_;
234 uint64_t size_;
235 std::string app_metadata_;
236 std::string const meta_filename_;
237 std::string const meta_tmp_filename_;
238 // files with relative paths (without "/" prefix!!)
239 std::vector<std::shared_ptr<FileInfo>> files_;
240 std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
241 Env* env_;
242
243 static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
244 }; // BackupMeta
245
246 inline std::string GetAbsolutePath(
247 const std::string &relative_path = "") const {
248 assert(relative_path.size() == 0 || relative_path[0] != '/');
249 return options_.backup_dir + "/" + relative_path;
250 }
251 inline std::string GetPrivateDirRel() const {
252 return "private";
253 }
254 inline std::string GetSharedChecksumDirRel() const {
255 return "shared_checksum";
256 }
257 inline std::string GetPrivateFileRel(BackupID backup_id,
258 bool tmp = false,
259 const std::string& file = "") const {
260 assert(file.size() == 0 || file[0] != '/');
261 return GetPrivateDirRel() + "/" + rocksdb::ToString(backup_id) +
262 (tmp ? ".tmp" : "") + "/" + file;
263 }
264 inline std::string GetSharedFileRel(const std::string& file = "",
265 bool tmp = false) const {
266 assert(file.size() == 0 || file[0] != '/');
267 return std::string("shared/") + (tmp ? "." : "") + file +
268 (tmp ? ".tmp" : "");
269 }
270 inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
271 bool tmp = false) const {
272 assert(file.size() == 0 || file[0] != '/');
273 return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file +
274 (tmp ? ".tmp" : "");
275 }
276 inline std::string GetSharedFileWithChecksum(const std::string& file,
277 const uint32_t checksum_value,
278 const uint64_t file_size) const {
279 assert(file.size() == 0 || file[0] != '/');
280 std::string file_copy = file;
281 return file_copy.insert(file_copy.find_last_of('.'),
282 "_" + rocksdb::ToString(checksum_value) + "_" +
283 rocksdb::ToString(file_size));
284 }
285 inline std::string GetFileFromChecksumFile(const std::string& file) const {
286 assert(file.size() == 0 || file[0] != '/');
287 std::string file_copy = file;
288 size_t first_underscore = file_copy.find_first_of('_');
289 return file_copy.erase(first_underscore,
290 file_copy.find_last_of('.') - first_underscore);
291 }
292 inline std::string GetBackupMetaDir() const {
293 return GetAbsolutePath("meta");
294 }
295 inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
296 return GetBackupMetaDir() + "/" + (tmp ? "." : "") +
297 rocksdb::ToString(backup_id) + (tmp ? ".tmp" : "");
298 }
299
300 // If size_limit == 0, there is no size limit, copy everything.
301 //
302 // Exactly one of src and contents must be non-empty.
303 //
304 // @param src If non-empty, the file is copied from this pathname.
305 // @param contents If non-empty, the file will be created with these contents.
306 Status CopyOrCreateFile(const std::string& src, const std::string& dst,
307 const std::string& contents, Env* src_env,
308 Env* dst_env, bool sync, RateLimiter* rate_limiter,
309 uint64_t* size = nullptr,
310 uint32_t* checksum_value = nullptr,
311 uint64_t size_limit = 0,
312 std::function<void()> progress_callback = []() {});
313
314 Status CalculateChecksum(const std::string& src,
315 Env* src_env,
316 uint64_t size_limit,
317 uint32_t* checksum_value);
318
319 struct CopyOrCreateResult {
320 uint64_t size;
321 uint32_t checksum_value;
322 Status status;
323 };
324
325 // Exactly one of src_path and contents must be non-empty. If src_path is
326 // non-empty, the file is copied from this pathname. Otherwise, if contents is
327 // non-empty, the file will be created at dst_path with these contents.
328 struct CopyOrCreateWorkItem {
329 std::string src_path;
330 std::string dst_path;
331 std::string contents;
332 Env* src_env;
333 Env* dst_env;
334 bool sync;
335 RateLimiter* rate_limiter;
336 uint64_t size_limit;
337 std::promise<CopyOrCreateResult> result;
338 std::function<void()> progress_callback;
339
340 CopyOrCreateWorkItem()
341 : src_path(""),
342 dst_path(""),
343 contents(""),
344 src_env(nullptr),
345 dst_env(nullptr),
346 sync(false),
347 rate_limiter(nullptr),
348 size_limit(0) {}
349
350 CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
351 CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
352
353 CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
354 *this = std::move(o);
355 }
356
357 CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
358 src_path = std::move(o.src_path);
359 dst_path = std::move(o.dst_path);
360 contents = std::move(o.contents);
361 src_env = o.src_env;
362 dst_env = o.dst_env;
363 sync = o.sync;
364 rate_limiter = o.rate_limiter;
365 size_limit = o.size_limit;
366 result = std::move(o.result);
367 progress_callback = std::move(o.progress_callback);
368 return *this;
369 }
370
371 CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
372 std::string _contents, Env* _src_env, Env* _dst_env,
373 bool _sync, RateLimiter* _rate_limiter,
374 uint64_t _size_limit,
375 std::function<void()> _progress_callback = []() {})
376 : src_path(std::move(_src_path)),
377 dst_path(std::move(_dst_path)),
378 contents(std::move(_contents)),
379 src_env(_src_env),
380 dst_env(_dst_env),
381 sync(_sync),
382 rate_limiter(_rate_limiter),
383 size_limit(_size_limit),
384 progress_callback(_progress_callback) {}
385 };
386
387 struct BackupAfterCopyOrCreateWorkItem {
388 std::future<CopyOrCreateResult> result;
389 bool shared;
390 bool needed_to_copy;
391 Env* backup_env;
392 std::string dst_path_tmp;
393 std::string dst_path;
394 std::string dst_relative;
395 BackupAfterCopyOrCreateWorkItem()
396 : shared(false),
397 needed_to_copy(false),
398 backup_env(nullptr),
399 dst_path_tmp(""),
400 dst_path(""),
401 dst_relative("") {}
402
403 BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
404 ROCKSDB_NOEXCEPT {
405 *this = std::move(o);
406 }
407
408 BackupAfterCopyOrCreateWorkItem& operator=(
409 BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
410 result = std::move(o.result);
411 shared = o.shared;
412 needed_to_copy = o.needed_to_copy;
413 backup_env = o.backup_env;
414 dst_path_tmp = std::move(o.dst_path_tmp);
415 dst_path = std::move(o.dst_path);
416 dst_relative = std::move(o.dst_relative);
417 return *this;
418 }
419
420 BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
421 bool _shared, bool _needed_to_copy,
422 Env* _backup_env, std::string _dst_path_tmp,
423 std::string _dst_path,
424 std::string _dst_relative)
425 : result(std::move(_result)),
426 shared(_shared),
427 needed_to_copy(_needed_to_copy),
428 backup_env(_backup_env),
429 dst_path_tmp(std::move(_dst_path_tmp)),
430 dst_path(std::move(_dst_path)),
431 dst_relative(std::move(_dst_relative)) {}
432 };
433
434 struct RestoreAfterCopyOrCreateWorkItem {
435 std::future<CopyOrCreateResult> result;
436 uint32_t checksum_value;
437 RestoreAfterCopyOrCreateWorkItem()
438 : checksum_value(0) {}
439 RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
440 uint32_t _checksum_value)
441 : result(std::move(_result)), checksum_value(_checksum_value) {}
442 RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
443 ROCKSDB_NOEXCEPT {
444 *this = std::move(o);
445 }
446
447 RestoreAfterCopyOrCreateWorkItem& operator=(
448 RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
449 result = std::move(o.result);
450 checksum_value = o.checksum_value;
451 return *this;
452 }
453 };
454
455 bool initialized_;
456 std::mutex byte_report_mutex_;
457 channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
458 std::vector<port::Thread> threads_;
459
460 // Adds a file to the backup work queue to be copied or created if it doesn't
461 // already exist.
462 //
463 // Exactly one of src_dir and contents must be non-empty.
464 //
465 // @param src_dir If non-empty, the file in this directory named fname will be
466 // copied.
467 // @param fname Name of destination file and, in case of copy, source file.
468 // @param contents If non-empty, the file will be created with these contents.
469 Status AddBackupFileWorkItem(
470 std::unordered_set<std::string>& live_dst_paths,
471 std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
472 BackupID backup_id, bool shared, const std::string& src_dir,
473 const std::string& fname, // starts with "/"
474 RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit = 0,
475 bool shared_checksum = false,
476 std::function<void()> progress_callback = []() {},
477 const std::string& contents = std::string());
478
479 // backup state data
480 BackupID latest_backup_id_;
481 BackupID latest_valid_backup_id_;
482 std::map<BackupID, unique_ptr<BackupMeta>> backups_;
483 std::map<BackupID,
484 std::pair<Status, unique_ptr<BackupMeta>>> corrupt_backups_;
485 std::unordered_map<std::string,
486 std::shared_ptr<FileInfo>> backuped_file_infos_;
487 std::atomic<bool> stop_backup_;
488
489 // options data
490 BackupableDBOptions options_;
491 Env* db_env_;
492 Env* backup_env_;
493
494 // directories
495 unique_ptr<Directory> backup_directory_;
496 unique_ptr<Directory> shared_directory_;
497 unique_ptr<Directory> meta_directory_;
498 unique_ptr<Directory> private_directory_;
499
500 static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
501 size_t copy_file_buffer_size_;
502 bool read_only_;
503 BackupStatistics backup_statistics_;
504 static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB
505 };
506
507 Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
508 BackupEngine** backup_engine_ptr) {
509 std::unique_ptr<BackupEngineImpl> backup_engine(
510 new BackupEngineImpl(env, options));
511 auto s = backup_engine->Initialize();
512 if (!s.ok()) {
513 *backup_engine_ptr = nullptr;
514 return s;
515 }
516 *backup_engine_ptr = backup_engine.release();
517 return Status::OK();
518 }
519
520 BackupEngineImpl::BackupEngineImpl(Env* db_env,
521 const BackupableDBOptions& options,
522 bool read_only)
523 : initialized_(false),
524 latest_backup_id_(0),
525 latest_valid_backup_id_(0),
526 stop_backup_(false),
527 options_(options),
528 db_env_(db_env),
529 backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
530 copy_file_buffer_size_(kDefaultCopyFileBufferSize),
531 read_only_(read_only) {
532 if (options_.backup_rate_limiter == nullptr &&
533 options_.backup_rate_limit > 0) {
534 options_.backup_rate_limiter.reset(
535 NewGenericRateLimiter(options_.backup_rate_limit));
536 }
537 if (options_.restore_rate_limiter == nullptr &&
538 options_.restore_rate_limit > 0) {
539 options_.restore_rate_limiter.reset(
540 NewGenericRateLimiter(options_.restore_rate_limit));
541 }
542 }
543
544 BackupEngineImpl::~BackupEngineImpl() {
545 files_to_copy_or_create_.sendEof();
546 for (auto& t : threads_) {
547 t.join();
548 }
549 LogFlush(options_.info_log);
550 }
551
552 Status BackupEngineImpl::Initialize() {
553 assert(!initialized_);
554 initialized_ = true;
555 if (read_only_) {
556 ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
557 }
558 options_.Dump(options_.info_log);
559
560 if (!read_only_) {
561 // gather the list of directories that we need to create
562 std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
563 directories;
564 directories.emplace_back(GetAbsolutePath(), &backup_directory_);
565 if (options_.share_table_files) {
566 if (options_.share_files_with_checksum) {
567 directories.emplace_back(
568 GetAbsolutePath(GetSharedFileWithChecksumRel()),
569 &shared_directory_);
570 } else {
571 directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
572 &shared_directory_);
573 }
574 }
575 directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
576 &private_directory_);
577 directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
578 // create all the dirs we need
579 for (const auto& d : directories) {
580 auto s = backup_env_->CreateDirIfMissing(d.first);
581 if (s.ok()) {
582 s = backup_env_->NewDirectory(d.first, d.second);
583 }
584 if (!s.ok()) {
585 return s;
586 }
587 }
588 }
589
590 std::vector<std::string> backup_meta_files;
591 {
592 auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
593 if (s.IsNotFound()) {
594 return Status::NotFound(GetBackupMetaDir() + " is missing");
595 } else if (!s.ok()) {
596 return s;
597 }
598 }
599 // create backups_ structure
600 for (auto& file : backup_meta_files) {
601 if (file == "." || file == "..") {
602 continue;
603 }
604 ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
605 BackupID backup_id = 0;
606 sscanf(file.c_str(), "%u", &backup_id);
607 if (backup_id == 0 || file != rocksdb::ToString(backup_id)) {
608 if (!read_only_) {
609 // invalid file name, delete that
610 auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
611 ROCKS_LOG_INFO(options_.info_log,
612 "Unrecognized meta file %s, deleting -- %s",
613 file.c_str(), s.ToString().c_str());
614 }
615 continue;
616 }
617 assert(backups_.find(backup_id) == backups_.end());
618 backups_.insert(std::make_pair(
619 backup_id, unique_ptr<BackupMeta>(new BackupMeta(
620 GetBackupMetaFile(backup_id, false /* tmp */),
621 GetBackupMetaFile(backup_id, true /* tmp */),
622 &backuped_file_infos_, backup_env_))));
623 }
624
625 latest_backup_id_ = 0;
626 latest_valid_backup_id_ = 0;
627 if (options_.destroy_old_data) { // Destroy old data
628 assert(!read_only_);
629 ROCKS_LOG_INFO(
630 options_.info_log,
631 "Backup Engine started with destroy_old_data == true, deleting all "
632 "backups");
633 auto s = PurgeOldBackups(0);
634 if (s.ok()) {
635 s = GarbageCollect();
636 }
637 if (!s.ok()) {
638 return s;
639 }
640 } else { // Load data from storage
641 std::unordered_map<std::string, uint64_t> abs_path_to_size;
642 for (const auto& rel_dir :
643 {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
644 const auto abs_dir = GetAbsolutePath(rel_dir);
645 InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
646 }
647 // load the backups if any, until valid_backups_to_open of the latest
648 // non-corrupted backups have been successfully opened.
649 int valid_backups_to_open = options_.max_valid_backups_to_open;
650 for (auto backup_iter = backups_.rbegin();
651 backup_iter != backups_.rend();
652 ++backup_iter) {
653 assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
654 if (latest_backup_id_ == 0) {
655 latest_backup_id_ = backup_iter->first;
656 }
657 if (valid_backups_to_open == 0) {
658 break;
659 }
660
661 InsertPathnameToSizeBytes(
662 GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
663 &abs_path_to_size);
664 Status s = backup_iter->second->LoadFromFile(options_.backup_dir,
665 abs_path_to_size);
666 if (s.IsCorruption()) {
667 ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
668 backup_iter->first, s.ToString().c_str());
669 corrupt_backups_.insert(
670 std::make_pair(backup_iter->first,
671 std::make_pair(s, std::move(backup_iter->second))));
672 } else if (!s.ok()) {
673 // Distinguish corruption errors from errors in the backup Env.
674 // Errors in the backup Env (i.e., this code path) will cause Open() to
675 // fail, whereas corruption errors would not cause Open() failures.
676 return s;
677 } else {
678 ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
679 backup_iter->first,
680 backup_iter->second->GetInfoString().c_str());
681 assert(latest_valid_backup_id_ == 0 ||
682 latest_valid_backup_id_ > backup_iter->first);
683 if (latest_valid_backup_id_ == 0) {
684 latest_valid_backup_id_ = backup_iter->first;
685 }
686 --valid_backups_to_open;
687 }
688 }
689
690 for (const auto& corrupt : corrupt_backups_) {
691 backups_.erase(backups_.find(corrupt.first));
692 }
693 // erase the backups before max_valid_backups_to_open
694 int num_unopened_backups;
695 if (options_.max_valid_backups_to_open == 0) {
696 num_unopened_backups = 0;
697 } else {
698 num_unopened_backups =
699 std::max(0, static_cast<int>(backups_.size()) -
700 options_.max_valid_backups_to_open);
701 }
702 for (int i = 0; i < num_unopened_backups; ++i) {
703 assert(backups_.begin()->second->Empty());
704 backups_.erase(backups_.begin());
705 }
706 }
707
708 ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
709 ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
710 latest_valid_backup_id_);
711
712 // set up threads perform copies from files_to_copy_or_create_ in the
713 // background
714 for (int t = 0; t < options_.max_background_operations; t++) {
715 threads_.emplace_back([this]() {
716 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
717 #if __GLIBC_PREREQ(2, 12)
718 pthread_setname_np(pthread_self(), "backup_engine");
719 #endif
720 #endif
721 CopyOrCreateWorkItem work_item;
722 while (files_to_copy_or_create_.read(work_item)) {
723 CopyOrCreateResult result;
724 result.status = CopyOrCreateFile(
725 work_item.src_path, work_item.dst_path, work_item.contents,
726 work_item.src_env, work_item.dst_env, work_item.sync,
727 work_item.rate_limiter, &result.size, &result.checksum_value,
728 work_item.size_limit, work_item.progress_callback);
729 work_item.result.set_value(std::move(result));
730 }
731 });
732 }
733 ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
734
735 return Status::OK();
736 }
737
738 Status BackupEngineImpl::CreateNewBackupWithMetadata(
739 DB* db, const std::string& app_metadata, bool flush_before_backup,
740 std::function<void()> progress_callback) {
741 assert(initialized_);
742 assert(!read_only_);
743 if (app_metadata.size() > kMaxAppMetaSize) {
744 return Status::InvalidArgument("App metadata too large");
745 }
746
747 BackupID new_backup_id = latest_backup_id_ + 1;
748
749 assert(backups_.find(new_backup_id) == backups_.end());
750
751 auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
752 Status s = backup_env_->FileExists(private_dir);
753 if (s.ok()) {
754 // maybe last backup failed and left partial state behind, clean it up.
755 // need to do this before updating backups_ such that a private dir
756 // named after new_backup_id will be cleaned up
757 s = GarbageCollect();
758 } else if (s.IsNotFound()) {
759 // normal case, the new backup's private dir doesn't exist yet
760 s = Status::OK();
761 }
762
763 auto ret = backups_.insert(std::make_pair(
764 new_backup_id, unique_ptr<BackupMeta>(new BackupMeta(
765 GetBackupMetaFile(new_backup_id, false /* tmp */),
766 GetBackupMetaFile(new_backup_id, true /* tmp */),
767 &backuped_file_infos_, backup_env_))));
768 assert(ret.second == true);
769 auto& new_backup = ret.first->second;
770 new_backup->RecordTimestamp();
771 new_backup->SetAppMetadata(app_metadata);
772
773 auto start_backup = backup_env_->NowMicros();
774
775 ROCKS_LOG_INFO(options_.info_log,
776 "Started the backup process -- creating backup %u",
777 new_backup_id);
778 if (s.ok()) {
779 s = backup_env_->CreateDir(private_dir);
780 }
781
782 RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
783 if (rate_limiter) {
784 copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
785 }
786
787 // A set into which we will insert the dst_paths that are calculated for live
788 // files and live WAL files.
789 // This is used to check whether a live files shares a dst_path with another
790 // live file.
791 std::unordered_set<std::string> live_dst_paths;
792
793 std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
794 // Add a CopyOrCreateWorkItem to the channel for each live file
795 db->DisableFileDeletions();
796 if (s.ok()) {
797 CheckpointImpl checkpoint(db);
798 uint64_t sequence_number = 0;
799 s = checkpoint.CreateCustomCheckpoint(
800 db->GetDBOptions(),
801 [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
802 FileType) {
803 // custom checkpoint will switch to calling copy_file_cb after it sees
804 // NotSupported returned from link_file_cb.
805 return Status::NotSupported();
806 } /* link_file_cb */,
807 [&](const std::string& src_dirname, const std::string& fname,
808 uint64_t size_limit_bytes, FileType type) {
809 if (type == kLogFile && !options_.backup_log_files) {
810 return Status::OK();
811 }
812 Log(options_.info_log, "add file for backup %s", fname.c_str());
813 uint64_t size_bytes = 0;
814 Status st;
815 if (type == kTableFile) {
816 st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
817 }
818 if (st.ok()) {
819 st = AddBackupFileWorkItem(
820 live_dst_paths, backup_items_to_finish, new_backup_id,
821 options_.share_table_files && type == kTableFile, src_dirname,
822 fname, rate_limiter, size_bytes, size_limit_bytes,
823 options_.share_files_with_checksum && type == kTableFile,
824 progress_callback);
825 }
826 return st;
827 } /* copy_file_cb */,
828 [&](const std::string& fname, const std::string& contents, FileType) {
829 Log(options_.info_log, "add file for backup %s", fname.c_str());
830 return AddBackupFileWorkItem(
831 live_dst_paths, backup_items_to_finish, new_backup_id,
832 false /* shared */, "" /* src_dir */, fname, rate_limiter,
833 contents.size(), 0 /* size_limit */, false /* shared_checksum */,
834 progress_callback, contents);
835 } /* create_file_cb */,
836 &sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
837 if (s.ok()) {
838 new_backup->SetSequenceNumber(sequence_number);
839 }
840 }
841 ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
842 Status item_status;
843 for (auto& item : backup_items_to_finish) {
844 item.result.wait();
845 auto result = item.result.get();
846 item_status = result.status;
847 if (item_status.ok() && item.shared && item.needed_to_copy) {
848 item_status = item.backup_env->RenameFile(item.dst_path_tmp,
849 item.dst_path);
850 }
851 if (item_status.ok()) {
852 item_status = new_backup.get()->AddFile(
853 std::make_shared<FileInfo>(item.dst_relative,
854 result.size,
855 result.checksum_value));
856 }
857 if (!item_status.ok()) {
858 s = item_status;
859 }
860 }
861
862 // we copied all the files, enable file deletions
863 db->EnableFileDeletions(false);
864
865 auto backup_time = backup_env_->NowMicros() - start_backup;
866
867 if (s.ok()) {
868 // persist the backup metadata on the disk
869 s = new_backup->StoreToFile(options_.sync);
870 }
871 if (s.ok() && options_.sync) {
872 unique_ptr<Directory> backup_private_directory;
873 backup_env_->NewDirectory(
874 GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
875 &backup_private_directory);
876 if (backup_private_directory != nullptr) {
877 s = backup_private_directory->Fsync();
878 }
879 if (s.ok() && private_directory_ != nullptr) {
880 s = private_directory_->Fsync();
881 }
882 if (s.ok() && meta_directory_ != nullptr) {
883 s = meta_directory_->Fsync();
884 }
885 if (s.ok() && shared_directory_ != nullptr) {
886 s = shared_directory_->Fsync();
887 }
888 if (s.ok() && backup_directory_ != nullptr) {
889 s = backup_directory_->Fsync();
890 }
891 }
892
893 if (s.ok()) {
894 backup_statistics_.IncrementNumberSuccessBackup();
895 }
896 if (!s.ok()) {
897 backup_statistics_.IncrementNumberFailBackup();
898 // clean all the files we might have created
899 ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
900 s.ToString().c_str());
901 ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
902 backup_statistics_.ToString().c_str());
903 // delete files that we might have already written
904 DeleteBackup(new_backup_id);
905 GarbageCollect();
906 return s;
907 }
908
909 // here we know that we succeeded and installed the new backup
910 // in the LATEST_BACKUP file
911 latest_backup_id_ = new_backup_id;
912 latest_valid_backup_id_ = new_backup_id;
913 ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
914
915 // backup_speed is in byte/second
916 double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
917 ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
918 new_backup->GetNumberFiles());
919 char human_size[16];
920 AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
921 ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
922 ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
923 backup_time);
924 ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
925 ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
926 backup_statistics_.ToString().c_str());
927 return s;
928 }
929
930 Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
931 assert(initialized_);
932 assert(!read_only_);
933 ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
934 num_backups_to_keep);
935 std::vector<BackupID> to_delete;
936 auto itr = backups_.begin();
937 while ((backups_.size() - to_delete.size()) > num_backups_to_keep) {
938 to_delete.push_back(itr->first);
939 itr++;
940 }
941 for (auto backup_id : to_delete) {
942 auto s = DeleteBackup(backup_id);
943 if (!s.ok()) {
944 return s;
945 }
946 }
947 return Status::OK();
948 }
949
950 Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
951 assert(initialized_);
952 assert(!read_only_);
953 ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
954 auto backup = backups_.find(backup_id);
955 if (backup != backups_.end()) {
956 auto s = backup->second->Delete();
957 if (!s.ok()) {
958 return s;
959 }
960 backups_.erase(backup);
961 } else {
962 auto corrupt = corrupt_backups_.find(backup_id);
963 if (corrupt == corrupt_backups_.end()) {
964 return Status::NotFound("Backup not found");
965 }
966 auto s = corrupt->second.second->Delete();
967 if (!s.ok()) {
968 return s;
969 }
970 corrupt_backups_.erase(corrupt);
971 }
972
973 if (options_.max_valid_backups_to_open == port::kMaxInt32) {
974 std::vector<std::string> to_delete;
975 for (auto& itr : backuped_file_infos_) {
976 if (itr.second->refs == 0) {
977 Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
978 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
979 itr.first.c_str(), s.ToString().c_str());
980 to_delete.push_back(itr.first);
981 }
982 }
983 for (auto& td : to_delete) {
984 backuped_file_infos_.erase(td);
985 }
986 } else {
987 ROCKS_LOG_WARN(
988 options_.info_log,
989 "DeleteBackup cleanup is limited since `max_valid_backups_to_open` "
990 "constrains how many backups the engine knows about");
991 }
992
993 // take care of private dirs -- GarbageCollect() will take care of them
994 // if they are not empty
995 std::string private_dir = GetPrivateFileRel(backup_id);
996 Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
997 ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
998 private_dir.c_str(), s.ToString().c_str());
999 return Status::OK();
1000 }
1001
1002 void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1003 assert(initialized_);
1004 backup_info->reserve(backups_.size());
1005 for (auto& backup : backups_) {
1006 if (!backup.second->Empty()) {
1007 backup_info->push_back(BackupInfo(
1008 backup.first, backup.second->GetTimestamp(), backup.second->GetSize(),
1009 backup.second->GetNumberFiles(), backup.second->GetAppMetadata()));
1010 }
1011 }
1012 }
1013
1014 void
1015 BackupEngineImpl::GetCorruptedBackups(
1016 std::vector<BackupID>* corrupt_backup_ids) {
1017 assert(initialized_);
1018 corrupt_backup_ids->reserve(corrupt_backups_.size());
1019 for (auto& backup : corrupt_backups_) {
1020 corrupt_backup_ids->push_back(backup.first);
1021 }
1022 }
1023
1024 Status BackupEngineImpl::RestoreDBFromBackup(
1025 BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
1026 const RestoreOptions& restore_options) {
1027 assert(initialized_);
1028 auto corrupt_itr = corrupt_backups_.find(backup_id);
1029 if (corrupt_itr != corrupt_backups_.end()) {
1030 return corrupt_itr->second.first;
1031 }
1032 auto backup_itr = backups_.find(backup_id);
1033 if (backup_itr == backups_.end()) {
1034 return Status::NotFound("Backup not found");
1035 }
1036 auto& backup = backup_itr->second;
1037 if (backup->Empty()) {
1038 return Status::NotFound("Backup not found");
1039 }
1040
1041 ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
1042 ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
1043 static_cast<int>(restore_options.keep_log_files));
1044
1045 // just in case. Ignore errors
1046 db_env_->CreateDirIfMissing(db_dir);
1047 db_env_->CreateDirIfMissing(wal_dir);
1048
1049 if (restore_options.keep_log_files) {
1050 // delete files in db_dir, but keep all the log files
1051 DeleteChildren(db_dir, 1 << kLogFile);
1052 // move all the files from archive dir to wal_dir
1053 std::string archive_dir = ArchivalDirectory(wal_dir);
1054 std::vector<std::string> archive_files;
1055 db_env_->GetChildren(archive_dir, &archive_files); // ignore errors
1056 for (const auto& f : archive_files) {
1057 uint64_t number;
1058 FileType type;
1059 bool ok = ParseFileName(f, &number, &type);
1060 if (ok && type == kLogFile) {
1061 ROCKS_LOG_INFO(options_.info_log,
1062 "Moving log file from archive/ to wal_dir: %s",
1063 f.c_str());
1064 Status s =
1065 db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
1066 if (!s.ok()) {
1067 // if we can't move log file from archive_dir to wal_dir,
1068 // we should fail, since it might mean data loss
1069 return s;
1070 }
1071 }
1072 }
1073 } else {
1074 DeleteChildren(wal_dir);
1075 DeleteChildren(ArchivalDirectory(wal_dir));
1076 DeleteChildren(db_dir);
1077 }
1078
1079 RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
1080 if (rate_limiter) {
1081 copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
1082 }
1083 Status s;
1084 std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
1085 for (const auto& file_info : backup->GetFiles()) {
1086 const std::string &file = file_info->filename;
1087 std::string dst;
1088 // 1. extract the filename
1089 size_t slash = file.find_last_of('/');
1090 // file will either be shared/<file>, shared_checksum/<file_crc32_size>
1091 // or private/<number>/<file>
1092 assert(slash != std::string::npos);
1093 dst = file.substr(slash + 1);
1094
1095 // if the file was in shared_checksum, extract the real file name
1096 // in this case the file is <number>_<checksum>_<size>.<type>
1097 if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
1098 dst = GetFileFromChecksumFile(dst);
1099 }
1100
1101 // 2. find the filetype
1102 uint64_t number;
1103 FileType type;
1104 bool ok = ParseFileName(dst, &number, &type);
1105 if (!ok) {
1106 return Status::Corruption("Backup corrupted");
1107 }
1108 // 3. Construct the final path
1109 // kLogFile lives in wal_dir and all the rest live in db_dir
1110 dst = ((type == kLogFile) ? wal_dir : db_dir) +
1111 "/" + dst;
1112
1113 ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
1114 dst.c_str());
1115 CopyOrCreateWorkItem copy_or_create_work_item(
1116 GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
1117 false, rate_limiter, 0 /* size_limit */);
1118 RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1119 copy_or_create_work_item.result.get_future(),
1120 file_info->checksum_value);
1121 files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1122 restore_items_to_finish.push_back(
1123 std::move(after_copy_or_create_work_item));
1124 }
1125 Status item_status;
1126 for (auto& item : restore_items_to_finish) {
1127 item.result.wait();
1128 auto result = item.result.get();
1129 item_status = result.status;
1130 // Note: It is possible that both of the following bad-status cases occur
1131 // during copying. But, we only return one status.
1132 if (!item_status.ok()) {
1133 s = item_status;
1134 break;
1135 } else if (item.checksum_value != result.checksum_value) {
1136 s = Status::Corruption("Checksum check failed");
1137 break;
1138 }
1139 }
1140
1141 ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
1142 s.ToString().c_str());
1143 return s;
1144 }
1145
1146 Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
1147 assert(initialized_);
1148 auto corrupt_itr = corrupt_backups_.find(backup_id);
1149 if (corrupt_itr != corrupt_backups_.end()) {
1150 return corrupt_itr->second.first;
1151 }
1152
1153 auto backup_itr = backups_.find(backup_id);
1154 if (backup_itr == backups_.end()) {
1155 return Status::NotFound();
1156 }
1157
1158 auto& backup = backup_itr->second;
1159 if (backup->Empty()) {
1160 return Status::NotFound();
1161 }
1162
1163 ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
1164
1165 std::unordered_map<std::string, uint64_t> curr_abs_path_to_size;
1166 for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
1167 GetSharedFileWithChecksumRel()}) {
1168 const auto abs_dir = GetAbsolutePath(rel_dir);
1169 InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size);
1170 }
1171
1172 for (const auto& file_info : backup->GetFiles()) {
1173 const auto abs_path = GetAbsolutePath(file_info->filename);
1174 if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
1175 return Status::NotFound("File missing: " + abs_path);
1176 }
1177 if (file_info->size != curr_abs_path_to_size[abs_path]) {
1178 return Status::Corruption("File corrupted: " + abs_path);
1179 }
1180 }
1181 return Status::OK();
1182 }
1183
1184 Status BackupEngineImpl::CopyOrCreateFile(
1185 const std::string& src, const std::string& dst, const std::string& contents,
1186 Env* src_env, Env* dst_env, bool sync, RateLimiter* rate_limiter,
1187 uint64_t* size, uint32_t* checksum_value, uint64_t size_limit,
1188 std::function<void()> progress_callback) {
1189 assert(src.empty() != contents.empty());
1190 Status s;
1191 unique_ptr<WritableFile> dst_file;
1192 unique_ptr<SequentialFile> src_file;
1193 EnvOptions env_options;
1194 env_options.use_mmap_writes = false;
1195 // TODO:(gzh) maybe use direct reads/writes here if possible
1196 if (size != nullptr) {
1197 *size = 0;
1198 }
1199 if (checksum_value != nullptr) {
1200 *checksum_value = 0;
1201 }
1202
1203 // Check if size limit is set. if not, set it to very big number
1204 if (size_limit == 0) {
1205 size_limit = std::numeric_limits<uint64_t>::max();
1206 }
1207
1208 s = dst_env->NewWritableFile(dst, &dst_file, env_options);
1209 if (s.ok() && !src.empty()) {
1210 s = src_env->NewSequentialFile(src, &src_file, env_options);
1211 }
1212 if (!s.ok()) {
1213 return s;
1214 }
1215
1216 unique_ptr<WritableFileWriter> dest_writer(
1217 new WritableFileWriter(std::move(dst_file), dst, env_options));
1218 unique_ptr<SequentialFileReader> src_reader;
1219 unique_ptr<char[]> buf;
1220 if (!src.empty()) {
1221 src_reader.reset(new SequentialFileReader(std::move(src_file), src));
1222 buf.reset(new char[copy_file_buffer_size_]);
1223 }
1224
1225 Slice data;
1226 uint64_t processed_buffer_size = 0;
1227 do {
1228 if (stop_backup_.load(std::memory_order_acquire)) {
1229 return Status::Incomplete("Backup stopped");
1230 }
1231 if (!src.empty()) {
1232 size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
1233 ? copy_file_buffer_size_
1234 : static_cast<size_t>(size_limit);
1235 s = src_reader->Read(buffer_to_read, &data, buf.get());
1236 processed_buffer_size += buffer_to_read;
1237 } else {
1238 data = contents;
1239 }
1240 size_limit -= data.size();
1241
1242 if (!s.ok()) {
1243 return s;
1244 }
1245
1246 if (size != nullptr) {
1247 *size += data.size();
1248 }
1249 if (checksum_value != nullptr) {
1250 *checksum_value =
1251 crc32c::Extend(*checksum_value, data.data(), data.size());
1252 }
1253 s = dest_writer->Append(data);
1254 if (rate_limiter != nullptr) {
1255 rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
1256 RateLimiter::OpType::kWrite);
1257 }
1258 if (processed_buffer_size > options_.callback_trigger_interval_size) {
1259 processed_buffer_size -= options_.callback_trigger_interval_size;
1260 std::lock_guard<std::mutex> lock(byte_report_mutex_);
1261 progress_callback();
1262 }
1263 } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
1264
1265 if (s.ok() && sync) {
1266 s = dest_writer->Sync(false);
1267 }
1268 if (s.ok()) {
1269 s = dest_writer->Close();
1270 }
1271 return s;
1272 }
1273
1274 // fname will always start with "/"
1275 Status BackupEngineImpl::AddBackupFileWorkItem(
1276 std::unordered_set<std::string>& live_dst_paths,
1277 std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
1278 BackupID backup_id, bool shared, const std::string& src_dir,
1279 const std::string& fname, RateLimiter* rate_limiter, uint64_t size_bytes,
1280 uint64_t size_limit, bool shared_checksum,
1281 std::function<void()> progress_callback, const std::string& contents) {
1282 assert(!fname.empty() && fname[0] == '/');
1283 assert(contents.empty() != src_dir.empty());
1284
1285 std::string dst_relative = fname.substr(1);
1286 std::string dst_relative_tmp;
1287 Status s;
1288 uint32_t checksum_value = 0;
1289
1290 if (shared && shared_checksum) {
1291 // add checksum and file length to the file name
1292 s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
1293 &checksum_value);
1294 if (!s.ok()) {
1295 return s;
1296 }
1297 if (size_bytes == port::kMaxUint64) {
1298 return Status::NotFound("File missing: " + src_dir + fname);
1299 }
1300 dst_relative =
1301 GetSharedFileWithChecksum(dst_relative, checksum_value, size_bytes);
1302 dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
1303 dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
1304 } else if (shared) {
1305 dst_relative_tmp = GetSharedFileRel(dst_relative, true);
1306 dst_relative = GetSharedFileRel(dst_relative, false);
1307 } else {
1308 dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
1309 }
1310
1311 // We copy into `temp_dest_path` and, once finished, rename it to
1312 // `final_dest_path`. This allows files to atomically appear at
1313 // `final_dest_path`. We can copy directly to the final path when atomicity
1314 // is unnecessary, like for files in private backup directories.
1315 const std::string* copy_dest_path;
1316 std::string temp_dest_path;
1317 std::string final_dest_path = GetAbsolutePath(dst_relative);
1318 if (!dst_relative_tmp.empty()) {
1319 temp_dest_path = GetAbsolutePath(dst_relative_tmp);
1320 copy_dest_path = &temp_dest_path;
1321 } else {
1322 copy_dest_path = &final_dest_path;
1323 }
1324
1325 // if it's shared, we also need to check if it exists -- if it does, no need
1326 // to copy it again.
1327 bool need_to_copy = true;
1328 // true if final_dest_path is the same path as another live file
1329 const bool same_path =
1330 live_dst_paths.find(final_dest_path) != live_dst_paths.end();
1331
1332 bool file_exists = false;
1333 if (shared && !same_path) {
1334 Status exist = backup_env_->FileExists(final_dest_path);
1335 if (exist.ok()) {
1336 file_exists = true;
1337 } else if (exist.IsNotFound()) {
1338 file_exists = false;
1339 } else {
1340 assert(s.IsIOError());
1341 return exist;
1342 }
1343 }
1344
1345 if (!contents.empty()) {
1346 need_to_copy = false;
1347 } else if (shared && (same_path || file_exists)) {
1348 need_to_copy = false;
1349 if (shared_checksum) {
1350 ROCKS_LOG_INFO(options_.info_log,
1351 "%s already present, with checksum %u and size %" PRIu64,
1352 fname.c_str(), checksum_value, size_bytes);
1353 } else if (backuped_file_infos_.find(dst_relative) ==
1354 backuped_file_infos_.end() && !same_path) {
1355 // file already exists, but it's not referenced by any backup. overwrite
1356 // the file
1357 ROCKS_LOG_INFO(
1358 options_.info_log,
1359 "%s already present, but not referenced by any backup. We will "
1360 "overwrite the file.",
1361 fname.c_str());
1362 need_to_copy = true;
1363 backup_env_->DeleteFile(final_dest_path);
1364 } else {
1365 // the file is present and referenced by a backup
1366 ROCKS_LOG_INFO(options_.info_log,
1367 "%s already present, calculate checksum", fname.c_str());
1368 s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
1369 &checksum_value);
1370 }
1371 }
1372 live_dst_paths.insert(final_dest_path);
1373
1374 if (!contents.empty() || need_to_copy) {
1375 ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
1376 copy_dest_path->c_str());
1377 CopyOrCreateWorkItem copy_or_create_work_item(
1378 src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
1379 db_env_, backup_env_, options_.sync, rate_limiter, size_limit,
1380 progress_callback);
1381 BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1382 copy_or_create_work_item.result.get_future(), shared, need_to_copy,
1383 backup_env_, temp_dest_path, final_dest_path, dst_relative);
1384 files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1385 backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1386 } else {
1387 std::promise<CopyOrCreateResult> promise_result;
1388 BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1389 promise_result.get_future(), shared, need_to_copy, backup_env_,
1390 temp_dest_path, final_dest_path, dst_relative);
1391 backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1392 CopyOrCreateResult result;
1393 result.status = s;
1394 result.size = size_bytes;
1395 result.checksum_value = checksum_value;
1396 promise_result.set_value(std::move(result));
1397 }
1398 return s;
1399 }
1400
1401 Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
1402 uint64_t size_limit,
1403 uint32_t* checksum_value) {
1404 *checksum_value = 0;
1405 if (size_limit == 0) {
1406 size_limit = std::numeric_limits<uint64_t>::max();
1407 }
1408
1409 EnvOptions env_options;
1410 env_options.use_mmap_writes = false;
1411 env_options.use_direct_reads = false;
1412
1413 std::unique_ptr<SequentialFile> src_file;
1414 Status s = src_env->NewSequentialFile(src, &src_file, env_options);
1415 if (!s.ok()) {
1416 return s;
1417 }
1418
1419 unique_ptr<SequentialFileReader> src_reader(
1420 new SequentialFileReader(std::move(src_file), src));
1421 std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
1422 Slice data;
1423
1424 do {
1425 if (stop_backup_.load(std::memory_order_acquire)) {
1426 return Status::Incomplete("Backup stopped");
1427 }
1428 size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
1429 copy_file_buffer_size_ : static_cast<size_t>(size_limit);
1430 s = src_reader->Read(buffer_to_read, &data, buf.get());
1431
1432 if (!s.ok()) {
1433 return s;
1434 }
1435
1436 size_limit -= data.size();
1437 *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
1438 } while (data.size() > 0 && size_limit > 0);
1439
1440 return s;
1441 }
1442
1443 void BackupEngineImpl::DeleteChildren(const std::string& dir,
1444 uint32_t file_type_filter) {
1445 std::vector<std::string> children;
1446 db_env_->GetChildren(dir, &children); // ignore errors
1447
1448 for (const auto& f : children) {
1449 uint64_t number;
1450 FileType type;
1451 bool ok = ParseFileName(f, &number, &type);
1452 if (ok && (file_type_filter & (1 << type))) {
1453 // don't delete this file
1454 continue;
1455 }
1456 db_env_->DeleteFile(dir + "/" + f); // ignore errors
1457 }
1458 }
1459
1460 Status BackupEngineImpl::InsertPathnameToSizeBytes(
1461 const std::string& dir, Env* env,
1462 std::unordered_map<std::string, uint64_t>* result) {
1463 assert(result != nullptr);
1464 std::vector<Env::FileAttributes> files_attrs;
1465 Status status = env->FileExists(dir);
1466 if (status.ok()) {
1467 status = env->GetChildrenFileAttributes(dir, &files_attrs);
1468 } else if (status.IsNotFound()) {
1469 // Insert no entries can be considered success
1470 status = Status::OK();
1471 }
1472 const bool slash_needed = dir.empty() || dir.back() != '/';
1473 for (const auto& file_attrs : files_attrs) {
1474 result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
1475 file_attrs.size_bytes);
1476 }
1477 return status;
1478 }
1479
1480 Status BackupEngineImpl::GarbageCollect() {
1481 assert(!read_only_);
1482 ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
1483 if (options_.max_valid_backups_to_open == port::kMaxInt32) {
1484 ROCKS_LOG_WARN(
1485 options_.info_log,
1486 "Garbage collection is limited since `max_valid_backups_to_open` "
1487 "constrains how many backups the engine knows about");
1488 }
1489
1490 if (options_.share_table_files &&
1491 options_.max_valid_backups_to_open == port::kMaxInt32) {
1492 // delete obsolete shared files
1493 // we cannot do this when BackupEngine has `max_valid_backups_to_open` set
1494 // as those engines don't know about all shared files.
1495 std::vector<std::string> shared_children;
1496 {
1497 std::string shared_path;
1498 if (options_.share_files_with_checksum) {
1499 shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
1500 } else {
1501 shared_path = GetAbsolutePath(GetSharedFileRel());
1502 }
1503 auto s = backup_env_->FileExists(shared_path);
1504 if (s.ok()) {
1505 s = backup_env_->GetChildren(shared_path, &shared_children);
1506 } else if (s.IsNotFound()) {
1507 s = Status::OK();
1508 }
1509 if (!s.ok()) {
1510 return s;
1511 }
1512 }
1513 for (auto& child : shared_children) {
1514 std::string rel_fname;
1515 if (options_.share_files_with_checksum) {
1516 rel_fname = GetSharedFileWithChecksumRel(child);
1517 } else {
1518 rel_fname = GetSharedFileRel(child);
1519 }
1520 auto child_itr = backuped_file_infos_.find(rel_fname);
1521 // if it's not refcounted, delete it
1522 if (child_itr == backuped_file_infos_.end() ||
1523 child_itr->second->refs == 0) {
1524 // this might be a directory, but DeleteFile will just fail in that
1525 // case, so we're good
1526 Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
1527 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
1528 rel_fname.c_str(), s.ToString().c_str());
1529 backuped_file_infos_.erase(rel_fname);
1530 }
1531 }
1532 }
1533
1534 // delete obsolete private files
1535 std::vector<std::string> private_children;
1536 {
1537 auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
1538 &private_children);
1539 if (!s.ok()) {
1540 return s;
1541 }
1542 }
1543 for (auto& child : private_children) {
1544 // it's ok to do this when BackupEngine has `max_valid_backups_to_open` set
1545 // as the engine always knows all valid backup numbers.
1546 BackupID backup_id = 0;
1547 bool tmp_dir = child.find(".tmp") != std::string::npos;
1548 sscanf(child.c_str(), "%u", &backup_id);
1549 if (!tmp_dir && // if it's tmp_dir, delete it
1550 (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
1551 // it's either not a number or it's still alive. continue
1552 continue;
1553 }
1554 // here we have to delete the dir and all its children
1555 std::string full_private_path =
1556 GetAbsolutePath(GetPrivateFileRel(backup_id));
1557 std::vector<std::string> subchildren;
1558 backup_env_->GetChildren(full_private_path, &subchildren);
1559 for (auto& subchild : subchildren) {
1560 Status s = backup_env_->DeleteFile(full_private_path + subchild);
1561 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
1562 (full_private_path + subchild).c_str(),
1563 s.ToString().c_str());
1564 }
1565 // finally delete the private dir
1566 Status s = backup_env_->DeleteDir(full_private_path);
1567 ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
1568 full_private_path.c_str(), s.ToString().c_str());
1569 }
1570
1571 return Status::OK();
1572 }
1573
1574 // ------- BackupMeta class --------
1575
1576 Status BackupEngineImpl::BackupMeta::AddFile(
1577 std::shared_ptr<FileInfo> file_info) {
1578 auto itr = file_infos_->find(file_info->filename);
1579 if (itr == file_infos_->end()) {
1580 auto ret = file_infos_->insert({file_info->filename, file_info});
1581 if (ret.second) {
1582 itr = ret.first;
1583 itr->second->refs = 1;
1584 } else {
1585 // if this happens, something is seriously wrong
1586 return Status::Corruption("In memory metadata insertion error");
1587 }
1588 } else {
1589 if (itr->second->checksum_value != file_info->checksum_value) {
1590 return Status::Corruption(
1591 "Checksum mismatch for existing backup file. Delete old backups and "
1592 "try again.");
1593 }
1594 ++itr->second->refs; // increase refcount if already present
1595 }
1596
1597 size_ += file_info->size;
1598 files_.push_back(itr->second);
1599
1600 return Status::OK();
1601 }
1602
1603 Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
1604 Status s;
1605 for (const auto& file : files_) {
1606 --file->refs; // decrease refcount
1607 }
1608 files_.clear();
1609 // delete meta file
1610 if (delete_meta) {
1611 s = env_->FileExists(meta_filename_);
1612 if (s.ok()) {
1613 s = env_->DeleteFile(meta_filename_);
1614 } else if (s.IsNotFound()) {
1615 s = Status::OK(); // nothing to delete
1616 }
1617 }
1618 timestamp_ = 0;
1619 return s;
1620 }
1621
1622 Slice kMetaDataPrefix("metadata ");
1623
1624 // each backup meta file is of the format:
1625 // <timestamp>
1626 // <seq number>
1627 // <metadata(literal string)> <metadata> (optional)
1628 // <number of files>
1629 // <file1> <crc32(literal string)> <crc32_value>
1630 // <file2> <crc32(literal string)> <crc32_value>
1631 // ...
1632 Status BackupEngineImpl::BackupMeta::LoadFromFile(
1633 const std::string& backup_dir,
1634 const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
1635 assert(Empty());
1636 Status s;
1637 unique_ptr<SequentialFile> backup_meta_file;
1638 s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
1639 if (!s.ok()) {
1640 return s;
1641 }
1642
1643 unique_ptr<SequentialFileReader> backup_meta_reader(
1644 new SequentialFileReader(std::move(backup_meta_file), meta_filename_));
1645 unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
1646 Slice data;
1647 s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
1648
1649 if (!s.ok() || data.size() == max_backup_meta_file_size_) {
1650 return s.ok() ? Status::Corruption("File size too big") : s;
1651 }
1652 buf[data.size()] = 0;
1653
1654 uint32_t num_files = 0;
1655 char *next;
1656 timestamp_ = strtoull(data.data(), &next, 10);
1657 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1658 sequence_number_ = strtoull(data.data(), &next, 10);
1659 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1660
1661 if (data.starts_with(kMetaDataPrefix)) {
1662 // app metadata present
1663 data.remove_prefix(kMetaDataPrefix.size());
1664 Slice hex_encoded_metadata = GetSliceUntil(&data, '\n');
1665 bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_);
1666 if (!decode_success) {
1667 return Status::Corruption(
1668 "Failed to decode stored hex encoded app metadata");
1669 }
1670 }
1671
1672 num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
1673 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1674
1675 std::vector<std::shared_ptr<FileInfo>> files;
1676
1677 Slice checksum_prefix("crc32 ");
1678
1679 for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
1680 auto line = GetSliceUntil(&data, '\n');
1681 std::string filename = GetSliceUntil(&line, ' ').ToString();
1682
1683 uint64_t size;
1684 const std::shared_ptr<FileInfo> file_info = GetFile(filename);
1685 if (file_info) {
1686 size = file_info->size;
1687 } else {
1688 std::string abs_path = backup_dir + "/" + filename;
1689 try {
1690 size = abs_path_to_size.at(abs_path);
1691 } catch (std::out_of_range&) {
1692 return Status::Corruption("Size missing for pathname: " + abs_path);
1693 }
1694 }
1695
1696 if (line.empty()) {
1697 return Status::Corruption("File checksum is missing for " + filename +
1698 " in " + meta_filename_);
1699 }
1700
1701 uint32_t checksum_value = 0;
1702 if (line.starts_with(checksum_prefix)) {
1703 line.remove_prefix(checksum_prefix.size());
1704 checksum_value = static_cast<uint32_t>(
1705 strtoul(line.data(), nullptr, 10));
1706 if (line != rocksdb::ToString(checksum_value)) {
1707 return Status::Corruption("Invalid checksum value for " + filename +
1708 " in " + meta_filename_);
1709 }
1710 } else {
1711 return Status::Corruption("Unknown checksum type for " + filename +
1712 " in " + meta_filename_);
1713 }
1714
1715 files.emplace_back(new FileInfo(filename, size, checksum_value));
1716 }
1717
1718 if (s.ok() && data.size() > 0) {
1719 // file has to be read completely. if not, we count it as corruption
1720 s = Status::Corruption("Tailing data in backup meta file in " +
1721 meta_filename_);
1722 }
1723
1724 if (s.ok()) {
1725 files_.reserve(files.size());
1726 for (const auto& file_info : files) {
1727 s = AddFile(file_info);
1728 if (!s.ok()) {
1729 break;
1730 }
1731 }
1732 }
1733
1734 return s;
1735 }
1736
1737 Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
1738 Status s;
1739 unique_ptr<WritableFile> backup_meta_file;
1740 EnvOptions env_options;
1741 env_options.use_mmap_writes = false;
1742 env_options.use_direct_writes = false;
1743 s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options);
1744 if (!s.ok()) {
1745 return s;
1746 }
1747
1748 unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
1749 size_t len = 0, buf_size = max_backup_meta_file_size_;
1750 len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_);
1751 len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
1752 sequence_number_);
1753 if (!app_metadata_.empty()) {
1754 std::string hex_encoded_metadata =
1755 Slice(app_metadata_).ToString(/* hex */ true);
1756
1757 // +1 to accommodate newline character
1758 size_t hex_meta_strlen = kMetaDataPrefix.ToString().length() + hex_encoded_metadata.length() + 1;
1759 if (hex_meta_strlen >= buf_size) {
1760 return Status::Corruption("Buffer too small to fit backup metadata");
1761 }
1762 else if (len + hex_meta_strlen >= buf_size) {
1763 backup_meta_file->Append(Slice(buf.get(), len));
1764 buf.reset();
1765 unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
1766 buf.swap(new_reset_buf);
1767 len = 0;
1768 }
1769 len += snprintf(buf.get() + len, buf_size - len, "%s%s\n",
1770 kMetaDataPrefix.ToString().c_str(),
1771 hex_encoded_metadata.c_str());
1772 }
1773
1774 char writelen_temp[19];
1775 if (len + snprintf(writelen_temp, sizeof(writelen_temp),
1776 "%" ROCKSDB_PRIszt "\n", files_.size()) >= buf_size) {
1777 backup_meta_file->Append(Slice(buf.get(), len));
1778 buf.reset();
1779 unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
1780 buf.swap(new_reset_buf);
1781 len = 0;
1782 }
1783 {
1784 const char *const_write = writelen_temp;
1785 len += snprintf(buf.get() + len, buf_size - len, "%s", const_write);
1786 }
1787
1788 for (const auto& file : files_) {
1789 // use crc32 for now, switch to something else if needed
1790
1791 size_t newlen = len + file->filename.length() + snprintf(writelen_temp,
1792 sizeof(writelen_temp), " crc32 %u\n", file->checksum_value);
1793 const char *const_write = writelen_temp;
1794 if (newlen >= buf_size) {
1795 backup_meta_file->Append(Slice(buf.get(), len));
1796 buf.reset();
1797 unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
1798 buf.swap(new_reset_buf);
1799 len = 0;
1800 }
1801 len += snprintf(buf.get() + len, buf_size - len, "%s%s",
1802 file->filename.c_str(), const_write);
1803 }
1804
1805 s = backup_meta_file->Append(Slice(buf.get(), len));
1806 if (s.ok() && sync) {
1807 s = backup_meta_file->Sync();
1808 }
1809 if (s.ok()) {
1810 s = backup_meta_file->Close();
1811 }
1812 if (s.ok()) {
1813 s = env_->RenameFile(meta_tmp_filename_, meta_filename_);
1814 }
1815 return s;
1816 }
1817
1818 // -------- BackupEngineReadOnlyImpl ---------
1819 class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
1820 public:
1821 BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options)
1822 : backup_engine_(new BackupEngineImpl(db_env, options, true)) {}
1823
1824 virtual ~BackupEngineReadOnlyImpl() {}
1825
1826 // The returned BackupInfos are in chronological order, which means the
1827 // latest backup comes last.
1828 virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
1829 backup_engine_->GetBackupInfo(backup_info);
1830 }
1831
1832 virtual void GetCorruptedBackups(
1833 std::vector<BackupID>* corrupt_backup_ids) override {
1834 backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
1835 }
1836
1837 virtual Status RestoreDBFromBackup(
1838 BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
1839 const RestoreOptions& restore_options = RestoreOptions()) override {
1840 return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
1841 restore_options);
1842 }
1843
1844 virtual Status RestoreDBFromLatestBackup(
1845 const std::string& db_dir, const std::string& wal_dir,
1846 const RestoreOptions& restore_options = RestoreOptions()) override {
1847 return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
1848 restore_options);
1849 }
1850
1851 virtual Status VerifyBackup(BackupID backup_id) override {
1852 return backup_engine_->VerifyBackup(backup_id);
1853 }
1854
1855 Status Initialize() { return backup_engine_->Initialize(); }
1856
1857 private:
1858 std::unique_ptr<BackupEngineImpl> backup_engine_;
1859 };
1860
1861 Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options,
1862 BackupEngineReadOnly** backup_engine_ptr) {
1863 if (options.destroy_old_data) {
1864 return Status::InvalidArgument(
1865 "Can't destroy old data with ReadOnly BackupEngine");
1866 }
1867 std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
1868 new BackupEngineReadOnlyImpl(env, options));
1869 auto s = backup_engine->Initialize();
1870 if (!s.ok()) {
1871 *backup_engine_ptr = nullptr;
1872 return s;
1873 }
1874 *backup_engine_ptr = backup_engine.release();
1875 return Status::OK();
1876 }
1877
1878 } // namespace rocksdb
1879
1880 #endif // ROCKSDB_LITE