1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
12 #include "rocksdb/utilities/backupable_db.h"
13 #include "port/port.h"
14 #include "rocksdb/rate_limiter.h"
15 #include "rocksdb/transaction_log.h"
16 #include "util/channel.h"
17 #include "util/coding.h"
18 #include "util/crc32c.h"
19 #include "util/file_reader_writer.h"
20 #include "util/filename.h"
21 #include "util/logging.h"
22 #include "util/string_util.h"
23 #include "util/sync_point.h"
24 #include "utilities/checkpoint/checkpoint_impl.h"
26 #ifndef __STDC_FORMAT_MACROS
27 #define __STDC_FORMAT_MACROS
28 #endif // __STDC_FORMAT_MACROS
42 #include <unordered_map>
43 #include <unordered_set>
48 void BackupStatistics::IncrementNumberSuccessBackup() {
49 number_success_backup
++;
51 void BackupStatistics::IncrementNumberFailBackup() {
55 uint32_t BackupStatistics::GetNumberSuccessBackup() const {
56 return number_success_backup
;
58 uint32_t BackupStatistics::GetNumberFailBackup() const {
59 return number_fail_backup
;
62 std::string
BackupStatistics::ToString() const {
64 snprintf(result
, sizeof(result
), "# success backup: %u, # fail backup: %u",
65 GetNumberSuccessBackup(), GetNumberFailBackup());
69 void BackupableDBOptions::Dump(Logger
* logger
) const {
70 ROCKS_LOG_INFO(logger
, " Options.backup_dir: %s",
72 ROCKS_LOG_INFO(logger
, " Options.backup_env: %p", backup_env
);
73 ROCKS_LOG_INFO(logger
, " Options.share_table_files: %d",
74 static_cast<int>(share_table_files
));
75 ROCKS_LOG_INFO(logger
, " Options.info_log: %p", info_log
);
76 ROCKS_LOG_INFO(logger
, " Options.sync: %d",
77 static_cast<int>(sync
));
78 ROCKS_LOG_INFO(logger
, " Options.destroy_old_data: %d",
79 static_cast<int>(destroy_old_data
));
80 ROCKS_LOG_INFO(logger
, " Options.backup_log_files: %d",
81 static_cast<int>(backup_log_files
));
82 ROCKS_LOG_INFO(logger
, " Options.backup_rate_limit: %" PRIu64
,
84 ROCKS_LOG_INFO(logger
, " Options.restore_rate_limit: %" PRIu64
,
86 ROCKS_LOG_INFO(logger
, "Options.max_background_operations: %d",
87 max_background_operations
);
90 // -------- BackupEngineImpl class ---------
91 class BackupEngineImpl
: public BackupEngine
{
93 BackupEngineImpl(Env
* db_env
, const BackupableDBOptions
& options
,
94 bool read_only
= false);
96 Status
CreateNewBackupWithMetadata(DB
* db
, const std::string
& app_metadata
,
97 bool flush_before_backup
= false,
98 std::function
<void()> progress_callback
=
100 Status
PurgeOldBackups(uint32_t num_backups_to_keep
) override
;
101 Status
DeleteBackup(BackupID backup_id
) override
;
102 void StopBackup() override
{
103 stop_backup_
.store(true, std::memory_order_release
);
105 Status
GarbageCollect() override
;
107 // The returned BackupInfos are in chronological order, which means the
108 // latest backup comes last.
109 void GetBackupInfo(std::vector
<BackupInfo
>* backup_info
) override
;
110 void GetCorruptedBackups(std::vector
<BackupID
>* corrupt_backup_ids
) override
;
111 Status
RestoreDBFromBackup(
112 BackupID backup_id
, const std::string
& db_dir
, const std::string
& wal_dir
,
113 const RestoreOptions
& restore_options
= RestoreOptions()) override
;
114 Status
RestoreDBFromLatestBackup(
115 const std::string
& db_dir
, const std::string
& wal_dir
,
116 const RestoreOptions
& restore_options
= RestoreOptions()) override
{
117 return RestoreDBFromBackup(latest_valid_backup_id_
, db_dir
, wal_dir
,
121 virtual Status
VerifyBackup(BackupID backup_id
) override
;
126 void DeleteChildren(const std::string
& dir
, uint32_t file_type_filter
= 0);
128 // Extends the "result" map with pathname->size mappings for the contents of
129 // "dir" in "env". Pathnames are prefixed with "dir".
130 Status
InsertPathnameToSizeBytes(
131 const std::string
& dir
, Env
* env
,
132 std::unordered_map
<std::string
, uint64_t>* result
);
135 FileInfo(const std::string
& fname
, uint64_t sz
, uint32_t checksum
)
136 : refs(0), filename(fname
), size(sz
), checksum_value(checksum
) {}
138 FileInfo(const FileInfo
&) = delete;
139 FileInfo
& operator=(const FileInfo
&) = delete;
142 const std::string filename
;
144 const uint32_t checksum_value
;
150 const std::string
& meta_filename
, const std::string
& meta_tmp_filename
,
151 std::unordered_map
<std::string
, std::shared_ptr
<FileInfo
>>* file_infos
,
156 meta_filename_(meta_filename
),
157 meta_tmp_filename_(meta_tmp_filename
),
158 file_infos_(file_infos
),
161 BackupMeta(const BackupMeta
&) = delete;
162 BackupMeta
& operator=(const BackupMeta
&) = delete;
166 void RecordTimestamp() {
167 env_
->GetCurrentTime(×tamp_
);
169 int64_t GetTimestamp() const {
172 uint64_t GetSize() const {
175 uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_
.size()); }
176 void SetSequenceNumber(uint64_t sequence_number
) {
177 sequence_number_
= sequence_number
;
179 uint64_t GetSequenceNumber() {
180 return sequence_number_
;
183 const std::string
& GetAppMetadata() const { return app_metadata_
; }
185 void SetAppMetadata(const std::string
& app_metadata
) {
186 app_metadata_
= app_metadata
;
189 Status
AddFile(std::shared_ptr
<FileInfo
> file_info
);
191 Status
Delete(bool delete_meta
= true);
194 return files_
.empty();
197 std::shared_ptr
<FileInfo
> GetFile(const std::string
& filename
) const {
198 auto it
= file_infos_
->find(filename
);
199 if (it
== file_infos_
->end())
204 const std::vector
<std::shared_ptr
<FileInfo
>>& GetFiles() {
208 // @param abs_path_to_size Pre-fetched file sizes (bytes).
210 const std::string
& backup_dir
,
211 const std::unordered_map
<std::string
, uint64_t>& abs_path_to_size
);
212 Status
StoreToFile(bool sync
);
214 std::string
GetInfoString() {
215 std::ostringstream ss
;
216 ss
<< "Timestamp: " << timestamp_
<< std::endl
;
218 AppendHumanBytes(size_
, human_size
, sizeof(human_size
));
219 ss
<< "Size: " << human_size
<< std::endl
;
220 ss
<< "Files:" << std::endl
;
221 for (const auto& file
: files_
) {
222 AppendHumanBytes(file
->size
, human_size
, sizeof(human_size
));
223 ss
<< file
->filename
<< ", size " << human_size
<< ", refs "
224 << file
->refs
<< std::endl
;
231 // sequence number is only approximate, should not be used
233 uint64_t sequence_number_
;
235 std::string app_metadata_
;
236 std::string
const meta_filename_
;
237 std::string
const meta_tmp_filename_
;
238 // files with relative paths (without "/" prefix!!)
239 std::vector
<std::shared_ptr
<FileInfo
>> files_
;
240 std::unordered_map
<std::string
, std::shared_ptr
<FileInfo
>>* file_infos_
;
243 static const size_t max_backup_meta_file_size_
= 10 * 1024 * 1024; // 10MB
246 inline std::string
GetAbsolutePath(
247 const std::string
&relative_path
= "") const {
248 assert(relative_path
.size() == 0 || relative_path
[0] != '/');
249 return options_
.backup_dir
+ "/" + relative_path
;
251 inline std::string
GetPrivateDirRel() const {
254 inline std::string
GetSharedChecksumDirRel() const {
255 return "shared_checksum";
257 inline std::string
GetPrivateFileRel(BackupID backup_id
,
259 const std::string
& file
= "") const {
260 assert(file
.size() == 0 || file
[0] != '/');
261 return GetPrivateDirRel() + "/" + rocksdb::ToString(backup_id
) +
262 (tmp
? ".tmp" : "") + "/" + file
;
264 inline std::string
GetSharedFileRel(const std::string
& file
= "",
265 bool tmp
= false) const {
266 assert(file
.size() == 0 || file
[0] != '/');
267 return std::string("shared/") + (tmp
? "." : "") + file
+
270 inline std::string
GetSharedFileWithChecksumRel(const std::string
& file
= "",
271 bool tmp
= false) const {
272 assert(file
.size() == 0 || file
[0] != '/');
273 return GetSharedChecksumDirRel() + "/" + (tmp
? "." : "") + file
+
276 inline std::string
GetSharedFileWithChecksum(const std::string
& file
,
277 const uint32_t checksum_value
,
278 const uint64_t file_size
) const {
279 assert(file
.size() == 0 || file
[0] != '/');
280 std::string file_copy
= file
;
281 return file_copy
.insert(file_copy
.find_last_of('.'),
282 "_" + rocksdb::ToString(checksum_value
) + "_" +
283 rocksdb::ToString(file_size
));
285 inline std::string
GetFileFromChecksumFile(const std::string
& file
) const {
286 assert(file
.size() == 0 || file
[0] != '/');
287 std::string file_copy
= file
;
288 size_t first_underscore
= file_copy
.find_first_of('_');
289 return file_copy
.erase(first_underscore
,
290 file_copy
.find_last_of('.') - first_underscore
);
292 inline std::string
GetBackupMetaDir() const {
293 return GetAbsolutePath("meta");
295 inline std::string
GetBackupMetaFile(BackupID backup_id
, bool tmp
) const {
296 return GetBackupMetaDir() + "/" + (tmp
? "." : "") +
297 rocksdb::ToString(backup_id
) + (tmp
? ".tmp" : "");
300 // If size_limit == 0, there is no size limit, copy everything.
302 // Exactly one of src and contents must be non-empty.
304 // @param src If non-empty, the file is copied from this pathname.
305 // @param contents If non-empty, the file will be created with these contents.
306 Status
CopyOrCreateFile(const std::string
& src
, const std::string
& dst
,
307 const std::string
& contents
, Env
* src_env
,
308 Env
* dst_env
, bool sync
, RateLimiter
* rate_limiter
,
309 uint64_t* size
= nullptr,
310 uint32_t* checksum_value
= nullptr,
311 uint64_t size_limit
= 0,
312 std::function
<void()> progress_callback
= []() {});
314 Status
CalculateChecksum(const std::string
& src
,
317 uint32_t* checksum_value
);
319 struct CopyOrCreateResult
{
321 uint32_t checksum_value
;
325 // Exactly one of src_path and contents must be non-empty. If src_path is
326 // non-empty, the file is copied from this pathname. Otherwise, if contents is
327 // non-empty, the file will be created at dst_path with these contents.
328 struct CopyOrCreateWorkItem
{
329 std::string src_path
;
330 std::string dst_path
;
331 std::string contents
;
335 RateLimiter
* rate_limiter
;
337 std::promise
<CopyOrCreateResult
> result
;
338 std::function
<void()> progress_callback
;
340 CopyOrCreateWorkItem()
347 rate_limiter(nullptr),
350 CopyOrCreateWorkItem(const CopyOrCreateWorkItem
&) = delete;
351 CopyOrCreateWorkItem
& operator=(const CopyOrCreateWorkItem
&) = delete;
353 CopyOrCreateWorkItem(CopyOrCreateWorkItem
&& o
) ROCKSDB_NOEXCEPT
{
354 *this = std::move(o
);
357 CopyOrCreateWorkItem
& operator=(CopyOrCreateWorkItem
&& o
) ROCKSDB_NOEXCEPT
{
358 src_path
= std::move(o
.src_path
);
359 dst_path
= std::move(o
.dst_path
);
360 contents
= std::move(o
.contents
);
364 rate_limiter
= o
.rate_limiter
;
365 size_limit
= o
.size_limit
;
366 result
= std::move(o
.result
);
367 progress_callback
= std::move(o
.progress_callback
);
371 CopyOrCreateWorkItem(std::string _src_path
, std::string _dst_path
,
372 std::string _contents
, Env
* _src_env
, Env
* _dst_env
,
373 bool _sync
, RateLimiter
* _rate_limiter
,
374 uint64_t _size_limit
,
375 std::function
<void()> _progress_callback
= []() {})
376 : src_path(std::move(_src_path
)),
377 dst_path(std::move(_dst_path
)),
378 contents(std::move(_contents
)),
382 rate_limiter(_rate_limiter
),
383 size_limit(_size_limit
),
384 progress_callback(_progress_callback
) {}
387 struct BackupAfterCopyOrCreateWorkItem
{
388 std::future
<CopyOrCreateResult
> result
;
392 std::string dst_path_tmp
;
393 std::string dst_path
;
394 std::string dst_relative
;
395 BackupAfterCopyOrCreateWorkItem()
397 needed_to_copy(false),
403 BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem
&& o
)
405 *this = std::move(o
);
408 BackupAfterCopyOrCreateWorkItem
& operator=(
409 BackupAfterCopyOrCreateWorkItem
&& o
) ROCKSDB_NOEXCEPT
{
410 result
= std::move(o
.result
);
412 needed_to_copy
= o
.needed_to_copy
;
413 backup_env
= o
.backup_env
;
414 dst_path_tmp
= std::move(o
.dst_path_tmp
);
415 dst_path
= std::move(o
.dst_path
);
416 dst_relative
= std::move(o
.dst_relative
);
420 BackupAfterCopyOrCreateWorkItem(std::future
<CopyOrCreateResult
>&& _result
,
421 bool _shared
, bool _needed_to_copy
,
422 Env
* _backup_env
, std::string _dst_path_tmp
,
423 std::string _dst_path
,
424 std::string _dst_relative
)
425 : result(std::move(_result
)),
427 needed_to_copy(_needed_to_copy
),
428 backup_env(_backup_env
),
429 dst_path_tmp(std::move(_dst_path_tmp
)),
430 dst_path(std::move(_dst_path
)),
431 dst_relative(std::move(_dst_relative
)) {}
434 struct RestoreAfterCopyOrCreateWorkItem
{
435 std::future
<CopyOrCreateResult
> result
;
436 uint32_t checksum_value
;
437 RestoreAfterCopyOrCreateWorkItem()
438 : checksum_value(0) {}
439 RestoreAfterCopyOrCreateWorkItem(std::future
<CopyOrCreateResult
>&& _result
,
440 uint32_t _checksum_value
)
441 : result(std::move(_result
)), checksum_value(_checksum_value
) {}
442 RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem
&& o
)
444 *this = std::move(o
);
447 RestoreAfterCopyOrCreateWorkItem
& operator=(
448 RestoreAfterCopyOrCreateWorkItem
&& o
) ROCKSDB_NOEXCEPT
{
449 result
= std::move(o
.result
);
450 checksum_value
= o
.checksum_value
;
456 std::mutex byte_report_mutex_
;
457 channel
<CopyOrCreateWorkItem
> files_to_copy_or_create_
;
458 std::vector
<port::Thread
> threads_
;
460 // Adds a file to the backup work queue to be copied or created if it doesn't
463 // Exactly one of src_dir and contents must be non-empty.
465 // @param src_dir If non-empty, the file in this directory named fname will be
467 // @param fname Name of destination file and, in case of copy, source file.
468 // @param contents If non-empty, the file will be created with these contents.
469 Status
AddBackupFileWorkItem(
470 std::unordered_set
<std::string
>& live_dst_paths
,
471 std::vector
<BackupAfterCopyOrCreateWorkItem
>& backup_items_to_finish
,
472 BackupID backup_id
, bool shared
, const std::string
& src_dir
,
473 const std::string
& fname
, // starts with "/"
474 RateLimiter
* rate_limiter
, uint64_t size_bytes
, uint64_t size_limit
= 0,
475 bool shared_checksum
= false,
476 std::function
<void()> progress_callback
= []() {},
477 const std::string
& contents
= std::string());
480 BackupID latest_backup_id_
;
481 BackupID latest_valid_backup_id_
;
482 std::map
<BackupID
, unique_ptr
<BackupMeta
>> backups_
;
484 std::pair
<Status
, unique_ptr
<BackupMeta
>>> corrupt_backups_
;
485 std::unordered_map
<std::string
,
486 std::shared_ptr
<FileInfo
>> backuped_file_infos_
;
487 std::atomic
<bool> stop_backup_
;
490 BackupableDBOptions options_
;
495 unique_ptr
<Directory
> backup_directory_
;
496 unique_ptr
<Directory
> shared_directory_
;
497 unique_ptr
<Directory
> meta_directory_
;
498 unique_ptr
<Directory
> private_directory_
;
500 static const size_t kDefaultCopyFileBufferSize
= 5 * 1024 * 1024LL; // 5MB
501 size_t copy_file_buffer_size_
;
503 BackupStatistics backup_statistics_
;
504 static const size_t kMaxAppMetaSize
= 1024 * 1024; // 1MB
507 Status
BackupEngine::Open(Env
* env
, const BackupableDBOptions
& options
,
508 BackupEngine
** backup_engine_ptr
) {
509 std::unique_ptr
<BackupEngineImpl
> backup_engine(
510 new BackupEngineImpl(env
, options
));
511 auto s
= backup_engine
->Initialize();
513 *backup_engine_ptr
= nullptr;
516 *backup_engine_ptr
= backup_engine
.release();
520 BackupEngineImpl::BackupEngineImpl(Env
* db_env
,
521 const BackupableDBOptions
& options
,
523 : initialized_(false),
524 latest_backup_id_(0),
525 latest_valid_backup_id_(0),
529 backup_env_(options
.backup_env
!= nullptr ? options
.backup_env
: db_env_
),
530 copy_file_buffer_size_(kDefaultCopyFileBufferSize
),
531 read_only_(read_only
) {
532 if (options_
.backup_rate_limiter
== nullptr &&
533 options_
.backup_rate_limit
> 0) {
534 options_
.backup_rate_limiter
.reset(
535 NewGenericRateLimiter(options_
.backup_rate_limit
));
537 if (options_
.restore_rate_limiter
== nullptr &&
538 options_
.restore_rate_limit
> 0) {
539 options_
.restore_rate_limiter
.reset(
540 NewGenericRateLimiter(options_
.restore_rate_limit
));
544 BackupEngineImpl::~BackupEngineImpl() {
545 files_to_copy_or_create_
.sendEof();
546 for (auto& t
: threads_
) {
549 LogFlush(options_
.info_log
);
552 Status
BackupEngineImpl::Initialize() {
553 assert(!initialized_
);
556 ROCKS_LOG_INFO(options_
.info_log
, "Starting read_only backup engine");
558 options_
.Dump(options_
.info_log
);
561 // gather the list of directories that we need to create
562 std::vector
<std::pair
<std::string
, std::unique_ptr
<Directory
>*>>
564 directories
.emplace_back(GetAbsolutePath(), &backup_directory_
);
565 if (options_
.share_table_files
) {
566 if (options_
.share_files_with_checksum
) {
567 directories
.emplace_back(
568 GetAbsolutePath(GetSharedFileWithChecksumRel()),
571 directories
.emplace_back(GetAbsolutePath(GetSharedFileRel()),
575 directories
.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
576 &private_directory_
);
577 directories
.emplace_back(GetBackupMetaDir(), &meta_directory_
);
578 // create all the dirs we need
579 for (const auto& d
: directories
) {
580 auto s
= backup_env_
->CreateDirIfMissing(d
.first
);
582 s
= backup_env_
->NewDirectory(d
.first
, d
.second
);
590 std::vector
<std::string
> backup_meta_files
;
592 auto s
= backup_env_
->GetChildren(GetBackupMetaDir(), &backup_meta_files
);
593 if (s
.IsNotFound()) {
594 return Status::NotFound(GetBackupMetaDir() + " is missing");
595 } else if (!s
.ok()) {
599 // create backups_ structure
600 for (auto& file
: backup_meta_files
) {
601 if (file
== "." || file
== "..") {
604 ROCKS_LOG_INFO(options_
.info_log
, "Detected backup %s", file
.c_str());
605 BackupID backup_id
= 0;
606 sscanf(file
.c_str(), "%u", &backup_id
);
607 if (backup_id
== 0 || file
!= rocksdb::ToString(backup_id
)) {
609 // invalid file name, delete that
610 auto s
= backup_env_
->DeleteFile(GetBackupMetaDir() + "/" + file
);
611 ROCKS_LOG_INFO(options_
.info_log
,
612 "Unrecognized meta file %s, deleting -- %s",
613 file
.c_str(), s
.ToString().c_str());
617 assert(backups_
.find(backup_id
) == backups_
.end());
618 backups_
.insert(std::make_pair(
619 backup_id
, unique_ptr
<BackupMeta
>(new BackupMeta(
620 GetBackupMetaFile(backup_id
, false /* tmp */),
621 GetBackupMetaFile(backup_id
, true /* tmp */),
622 &backuped_file_infos_
, backup_env_
))));
625 latest_backup_id_
= 0;
626 latest_valid_backup_id_
= 0;
627 if (options_
.destroy_old_data
) { // Destroy old data
631 "Backup Engine started with destroy_old_data == true, deleting all "
633 auto s
= PurgeOldBackups(0);
635 s
= GarbageCollect();
640 } else { // Load data from storage
641 std::unordered_map
<std::string
, uint64_t> abs_path_to_size
;
642 for (const auto& rel_dir
:
643 {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
644 const auto abs_dir
= GetAbsolutePath(rel_dir
);
645 InsertPathnameToSizeBytes(abs_dir
, backup_env_
, &abs_path_to_size
);
647 // load the backups if any, until valid_backups_to_open of the latest
648 // non-corrupted backups have been successfully opened.
649 int valid_backups_to_open
= options_
.max_valid_backups_to_open
;
650 for (auto backup_iter
= backups_
.rbegin();
651 backup_iter
!= backups_
.rend();
653 assert(latest_backup_id_
== 0 || latest_backup_id_
> backup_iter
->first
);
654 if (latest_backup_id_
== 0) {
655 latest_backup_id_
= backup_iter
->first
;
657 if (valid_backups_to_open
== 0) {
661 InsertPathnameToSizeBytes(
662 GetAbsolutePath(GetPrivateFileRel(backup_iter
->first
)), backup_env_
,
664 Status s
= backup_iter
->second
->LoadFromFile(options_
.backup_dir
,
666 if (s
.IsCorruption()) {
667 ROCKS_LOG_INFO(options_
.info_log
, "Backup %u corrupted -- %s",
668 backup_iter
->first
, s
.ToString().c_str());
669 corrupt_backups_
.insert(
670 std::make_pair(backup_iter
->first
,
671 std::make_pair(s
, std::move(backup_iter
->second
))));
672 } else if (!s
.ok()) {
673 // Distinguish corruption errors from errors in the backup Env.
674 // Errors in the backup Env (i.e., this code path) will cause Open() to
675 // fail, whereas corruption errors would not cause Open() failures.
678 ROCKS_LOG_INFO(options_
.info_log
, "Loading backup %" PRIu32
" OK:\n%s",
680 backup_iter
->second
->GetInfoString().c_str());
681 assert(latest_valid_backup_id_
== 0 ||
682 latest_valid_backup_id_
> backup_iter
->first
);
683 if (latest_valid_backup_id_
== 0) {
684 latest_valid_backup_id_
= backup_iter
->first
;
686 --valid_backups_to_open
;
690 for (const auto& corrupt
: corrupt_backups_
) {
691 backups_
.erase(backups_
.find(corrupt
.first
));
693 // erase the backups before max_valid_backups_to_open
694 int num_unopened_backups
;
695 if (options_
.max_valid_backups_to_open
== 0) {
696 num_unopened_backups
= 0;
698 num_unopened_backups
=
699 std::max(0, static_cast<int>(backups_
.size()) -
700 options_
.max_valid_backups_to_open
);
702 for (int i
= 0; i
< num_unopened_backups
; ++i
) {
703 assert(backups_
.begin()->second
->Empty());
704 backups_
.erase(backups_
.begin());
708 ROCKS_LOG_INFO(options_
.info_log
, "Latest backup is %u", latest_backup_id_
);
709 ROCKS_LOG_INFO(options_
.info_log
, "Latest valid backup is %u",
710 latest_valid_backup_id_
);
712 // set up threads perform copies from files_to_copy_or_create_ in the
714 for (int t
= 0; t
< options_
.max_background_operations
; t
++) {
715 threads_
.emplace_back([this]() {
716 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
717 #if __GLIBC_PREREQ(2, 12)
718 pthread_setname_np(pthread_self(), "backup_engine");
721 CopyOrCreateWorkItem work_item
;
722 while (files_to_copy_or_create_
.read(work_item
)) {
723 CopyOrCreateResult result
;
724 result
.status
= CopyOrCreateFile(
725 work_item
.src_path
, work_item
.dst_path
, work_item
.contents
,
726 work_item
.src_env
, work_item
.dst_env
, work_item
.sync
,
727 work_item
.rate_limiter
, &result
.size
, &result
.checksum_value
,
728 work_item
.size_limit
, work_item
.progress_callback
);
729 work_item
.result
.set_value(std::move(result
));
733 ROCKS_LOG_INFO(options_
.info_log
, "Initialized BackupEngine");
738 Status
BackupEngineImpl::CreateNewBackupWithMetadata(
739 DB
* db
, const std::string
& app_metadata
, bool flush_before_backup
,
740 std::function
<void()> progress_callback
) {
741 assert(initialized_
);
743 if (app_metadata
.size() > kMaxAppMetaSize
) {
744 return Status::InvalidArgument("App metadata too large");
747 BackupID new_backup_id
= latest_backup_id_
+ 1;
749 assert(backups_
.find(new_backup_id
) == backups_
.end());
751 auto private_dir
= GetAbsolutePath(GetPrivateFileRel(new_backup_id
));
752 Status s
= backup_env_
->FileExists(private_dir
);
754 // maybe last backup failed and left partial state behind, clean it up.
755 // need to do this before updating backups_ such that a private dir
756 // named after new_backup_id will be cleaned up
757 s
= GarbageCollect();
758 } else if (s
.IsNotFound()) {
759 // normal case, the new backup's private dir doesn't exist yet
763 auto ret
= backups_
.insert(std::make_pair(
764 new_backup_id
, unique_ptr
<BackupMeta
>(new BackupMeta(
765 GetBackupMetaFile(new_backup_id
, false /* tmp */),
766 GetBackupMetaFile(new_backup_id
, true /* tmp */),
767 &backuped_file_infos_
, backup_env_
))));
768 assert(ret
.second
== true);
769 auto& new_backup
= ret
.first
->second
;
770 new_backup
->RecordTimestamp();
771 new_backup
->SetAppMetadata(app_metadata
);
773 auto start_backup
= backup_env_
->NowMicros();
775 ROCKS_LOG_INFO(options_
.info_log
,
776 "Started the backup process -- creating backup %u",
779 s
= backup_env_
->CreateDir(private_dir
);
782 RateLimiter
* rate_limiter
= options_
.backup_rate_limiter
.get();
784 copy_file_buffer_size_
= static_cast<size_t>(rate_limiter
->GetSingleBurstBytes());
787 // A set into which we will insert the dst_paths that are calculated for live
788 // files and live WAL files.
789 // This is used to check whether a live files shares a dst_path with another
791 std::unordered_set
<std::string
> live_dst_paths
;
793 std::vector
<BackupAfterCopyOrCreateWorkItem
> backup_items_to_finish
;
794 // Add a CopyOrCreateWorkItem to the channel for each live file
795 db
->DisableFileDeletions();
797 CheckpointImpl
checkpoint(db
);
798 uint64_t sequence_number
= 0;
799 s
= checkpoint
.CreateCustomCheckpoint(
801 [&](const std::string
& /*src_dirname*/, const std::string
& /*fname*/,
803 // custom checkpoint will switch to calling copy_file_cb after it sees
804 // NotSupported returned from link_file_cb.
805 return Status::NotSupported();
806 } /* link_file_cb */,
807 [&](const std::string
& src_dirname
, const std::string
& fname
,
808 uint64_t size_limit_bytes
, FileType type
) {
809 if (type
== kLogFile
&& !options_
.backup_log_files
) {
812 Log(options_
.info_log
, "add file for backup %s", fname
.c_str());
813 uint64_t size_bytes
= 0;
815 if (type
== kTableFile
) {
816 st
= db_env_
->GetFileSize(src_dirname
+ fname
, &size_bytes
);
819 st
= AddBackupFileWorkItem(
820 live_dst_paths
, backup_items_to_finish
, new_backup_id
,
821 options_
.share_table_files
&& type
== kTableFile
, src_dirname
,
822 fname
, rate_limiter
, size_bytes
, size_limit_bytes
,
823 options_
.share_files_with_checksum
&& type
== kTableFile
,
827 } /* copy_file_cb */,
828 [&](const std::string
& fname
, const std::string
& contents
, FileType
) {
829 Log(options_
.info_log
, "add file for backup %s", fname
.c_str());
830 return AddBackupFileWorkItem(
831 live_dst_paths
, backup_items_to_finish
, new_backup_id
,
832 false /* shared */, "" /* src_dir */, fname
, rate_limiter
,
833 contents
.size(), 0 /* size_limit */, false /* shared_checksum */,
834 progress_callback
, contents
);
835 } /* create_file_cb */,
836 &sequence_number
, flush_before_backup
? 0 : port::kMaxUint64
);
838 new_backup
->SetSequenceNumber(sequence_number
);
841 ROCKS_LOG_INFO(options_
.info_log
, "add files for backup done, wait finish.");
843 for (auto& item
: backup_items_to_finish
) {
845 auto result
= item
.result
.get();
846 item_status
= result
.status
;
847 if (item_status
.ok() && item
.shared
&& item
.needed_to_copy
) {
848 item_status
= item
.backup_env
->RenameFile(item
.dst_path_tmp
,
851 if (item_status
.ok()) {
852 item_status
= new_backup
.get()->AddFile(
853 std::make_shared
<FileInfo
>(item
.dst_relative
,
855 result
.checksum_value
));
857 if (!item_status
.ok()) {
862 // we copied all the files, enable file deletions
863 db
->EnableFileDeletions(false);
865 auto backup_time
= backup_env_
->NowMicros() - start_backup
;
868 // persist the backup metadata on the disk
869 s
= new_backup
->StoreToFile(options_
.sync
);
871 if (s
.ok() && options_
.sync
) {
872 unique_ptr
<Directory
> backup_private_directory
;
873 backup_env_
->NewDirectory(
874 GetAbsolutePath(GetPrivateFileRel(new_backup_id
, false)),
875 &backup_private_directory
);
876 if (backup_private_directory
!= nullptr) {
877 s
= backup_private_directory
->Fsync();
879 if (s
.ok() && private_directory_
!= nullptr) {
880 s
= private_directory_
->Fsync();
882 if (s
.ok() && meta_directory_
!= nullptr) {
883 s
= meta_directory_
->Fsync();
885 if (s
.ok() && shared_directory_
!= nullptr) {
886 s
= shared_directory_
->Fsync();
888 if (s
.ok() && backup_directory_
!= nullptr) {
889 s
= backup_directory_
->Fsync();
894 backup_statistics_
.IncrementNumberSuccessBackup();
897 backup_statistics_
.IncrementNumberFailBackup();
898 // clean all the files we might have created
899 ROCKS_LOG_INFO(options_
.info_log
, "Backup failed -- %s",
900 s
.ToString().c_str());
901 ROCKS_LOG_INFO(options_
.info_log
, "Backup Statistics %s\n",
902 backup_statistics_
.ToString().c_str());
903 // delete files that we might have already written
904 DeleteBackup(new_backup_id
);
909 // here we know that we succeeded and installed the new backup
910 // in the LATEST_BACKUP file
911 latest_backup_id_
= new_backup_id
;
912 latest_valid_backup_id_
= new_backup_id
;
913 ROCKS_LOG_INFO(options_
.info_log
, "Backup DONE. All is good");
915 // backup_speed is in byte/second
916 double backup_speed
= new_backup
->GetSize() / (1.048576 * backup_time
);
917 ROCKS_LOG_INFO(options_
.info_log
, "Backup number of files: %u",
918 new_backup
->GetNumberFiles());
920 AppendHumanBytes(new_backup
->GetSize(), human_size
, sizeof(human_size
));
921 ROCKS_LOG_INFO(options_
.info_log
, "Backup size: %s", human_size
);
922 ROCKS_LOG_INFO(options_
.info_log
, "Backup time: %" PRIu64
" microseconds",
924 ROCKS_LOG_INFO(options_
.info_log
, "Backup speed: %.3f MB/s", backup_speed
);
925 ROCKS_LOG_INFO(options_
.info_log
, "Backup Statistics %s",
926 backup_statistics_
.ToString().c_str());
930 Status
BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep
) {
931 assert(initialized_
);
933 ROCKS_LOG_INFO(options_
.info_log
, "Purging old backups, keeping %u",
934 num_backups_to_keep
);
935 std::vector
<BackupID
> to_delete
;
936 auto itr
= backups_
.begin();
937 while ((backups_
.size() - to_delete
.size()) > num_backups_to_keep
) {
938 to_delete
.push_back(itr
->first
);
941 for (auto backup_id
: to_delete
) {
942 auto s
= DeleteBackup(backup_id
);
950 Status
BackupEngineImpl::DeleteBackup(BackupID backup_id
) {
951 assert(initialized_
);
953 ROCKS_LOG_INFO(options_
.info_log
, "Deleting backup %u", backup_id
);
954 auto backup
= backups_
.find(backup_id
);
955 if (backup
!= backups_
.end()) {
956 auto s
= backup
->second
->Delete();
960 backups_
.erase(backup
);
962 auto corrupt
= corrupt_backups_
.find(backup_id
);
963 if (corrupt
== corrupt_backups_
.end()) {
964 return Status::NotFound("Backup not found");
966 auto s
= corrupt
->second
.second
->Delete();
970 corrupt_backups_
.erase(corrupt
);
973 if (options_
.max_valid_backups_to_open
== port::kMaxInt32
) {
974 std::vector
<std::string
> to_delete
;
975 for (auto& itr
: backuped_file_infos_
) {
976 if (itr
.second
->refs
== 0) {
977 Status s
= backup_env_
->DeleteFile(GetAbsolutePath(itr
.first
));
978 ROCKS_LOG_INFO(options_
.info_log
, "Deleting %s -- %s",
979 itr
.first
.c_str(), s
.ToString().c_str());
980 to_delete
.push_back(itr
.first
);
983 for (auto& td
: to_delete
) {
984 backuped_file_infos_
.erase(td
);
989 "DeleteBackup cleanup is limited since `max_valid_backups_to_open` "
990 "constrains how many backups the engine knows about");
993 // take care of private dirs -- GarbageCollect() will take care of them
994 // if they are not empty
995 std::string private_dir
= GetPrivateFileRel(backup_id
);
996 Status s
= backup_env_
->DeleteDir(GetAbsolutePath(private_dir
));
997 ROCKS_LOG_INFO(options_
.info_log
, "Deleting private dir %s -- %s",
998 private_dir
.c_str(), s
.ToString().c_str());
1002 void BackupEngineImpl::GetBackupInfo(std::vector
<BackupInfo
>* backup_info
) {
1003 assert(initialized_
);
1004 backup_info
->reserve(backups_
.size());
1005 for (auto& backup
: backups_
) {
1006 if (!backup
.second
->Empty()) {
1007 backup_info
->push_back(BackupInfo(
1008 backup
.first
, backup
.second
->GetTimestamp(), backup
.second
->GetSize(),
1009 backup
.second
->GetNumberFiles(), backup
.second
->GetAppMetadata()));
1015 BackupEngineImpl::GetCorruptedBackups(
1016 std::vector
<BackupID
>* corrupt_backup_ids
) {
1017 assert(initialized_
);
1018 corrupt_backup_ids
->reserve(corrupt_backups_
.size());
1019 for (auto& backup
: corrupt_backups_
) {
1020 corrupt_backup_ids
->push_back(backup
.first
);
1024 Status
BackupEngineImpl::RestoreDBFromBackup(
1025 BackupID backup_id
, const std::string
& db_dir
, const std::string
& wal_dir
,
1026 const RestoreOptions
& restore_options
) {
1027 assert(initialized_
);
1028 auto corrupt_itr
= corrupt_backups_
.find(backup_id
);
1029 if (corrupt_itr
!= corrupt_backups_
.end()) {
1030 return corrupt_itr
->second
.first
;
1032 auto backup_itr
= backups_
.find(backup_id
);
1033 if (backup_itr
== backups_
.end()) {
1034 return Status::NotFound("Backup not found");
1036 auto& backup
= backup_itr
->second
;
1037 if (backup
->Empty()) {
1038 return Status::NotFound("Backup not found");
1041 ROCKS_LOG_INFO(options_
.info_log
, "Restoring backup id %u\n", backup_id
);
1042 ROCKS_LOG_INFO(options_
.info_log
, "keep_log_files: %d\n",
1043 static_cast<int>(restore_options
.keep_log_files
));
1045 // just in case. Ignore errors
1046 db_env_
->CreateDirIfMissing(db_dir
);
1047 db_env_
->CreateDirIfMissing(wal_dir
);
1049 if (restore_options
.keep_log_files
) {
1050 // delete files in db_dir, but keep all the log files
1051 DeleteChildren(db_dir
, 1 << kLogFile
);
1052 // move all the files from archive dir to wal_dir
1053 std::string archive_dir
= ArchivalDirectory(wal_dir
);
1054 std::vector
<std::string
> archive_files
;
1055 db_env_
->GetChildren(archive_dir
, &archive_files
); // ignore errors
1056 for (const auto& f
: archive_files
) {
1059 bool ok
= ParseFileName(f
, &number
, &type
);
1060 if (ok
&& type
== kLogFile
) {
1061 ROCKS_LOG_INFO(options_
.info_log
,
1062 "Moving log file from archive/ to wal_dir: %s",
1065 db_env_
->RenameFile(archive_dir
+ "/" + f
, wal_dir
+ "/" + f
);
1067 // if we can't move log file from archive_dir to wal_dir,
1068 // we should fail, since it might mean data loss
1074 DeleteChildren(wal_dir
);
1075 DeleteChildren(ArchivalDirectory(wal_dir
));
1076 DeleteChildren(db_dir
);
1079 RateLimiter
* rate_limiter
= options_
.restore_rate_limiter
.get();
1081 copy_file_buffer_size_
= static_cast<size_t>(rate_limiter
->GetSingleBurstBytes());
1084 std::vector
<RestoreAfterCopyOrCreateWorkItem
> restore_items_to_finish
;
1085 for (const auto& file_info
: backup
->GetFiles()) {
1086 const std::string
&file
= file_info
->filename
;
1088 // 1. extract the filename
1089 size_t slash
= file
.find_last_of('/');
1090 // file will either be shared/<file>, shared_checksum/<file_crc32_size>
1091 // or private/<number>/<file>
1092 assert(slash
!= std::string::npos
);
1093 dst
= file
.substr(slash
+ 1);
1095 // if the file was in shared_checksum, extract the real file name
1096 // in this case the file is <number>_<checksum>_<size>.<type>
1097 if (file
.substr(0, slash
) == GetSharedChecksumDirRel()) {
1098 dst
= GetFileFromChecksumFile(dst
);
1101 // 2. find the filetype
1104 bool ok
= ParseFileName(dst
, &number
, &type
);
1106 return Status::Corruption("Backup corrupted");
1108 // 3. Construct the final path
1109 // kLogFile lives in wal_dir and all the rest live in db_dir
1110 dst
= ((type
== kLogFile
) ? wal_dir
: db_dir
) +
1113 ROCKS_LOG_INFO(options_
.info_log
, "Restoring %s to %s\n", file
.c_str(),
1115 CopyOrCreateWorkItem
copy_or_create_work_item(
1116 GetAbsolutePath(file
), dst
, "" /* contents */, backup_env_
, db_env_
,
1117 false, rate_limiter
, 0 /* size_limit */);
1118 RestoreAfterCopyOrCreateWorkItem
after_copy_or_create_work_item(
1119 copy_or_create_work_item
.result
.get_future(),
1120 file_info
->checksum_value
);
1121 files_to_copy_or_create_
.write(std::move(copy_or_create_work_item
));
1122 restore_items_to_finish
.push_back(
1123 std::move(after_copy_or_create_work_item
));
1126 for (auto& item
: restore_items_to_finish
) {
1128 auto result
= item
.result
.get();
1129 item_status
= result
.status
;
1130 // Note: It is possible that both of the following bad-status cases occur
1131 // during copying. But, we only return one status.
1132 if (!item_status
.ok()) {
1135 } else if (item
.checksum_value
!= result
.checksum_value
) {
1136 s
= Status::Corruption("Checksum check failed");
1141 ROCKS_LOG_INFO(options_
.info_log
, "Restoring done -- %s\n",
1142 s
.ToString().c_str());
1146 Status
BackupEngineImpl::VerifyBackup(BackupID backup_id
) {
1147 assert(initialized_
);
1148 auto corrupt_itr
= corrupt_backups_
.find(backup_id
);
1149 if (corrupt_itr
!= corrupt_backups_
.end()) {
1150 return corrupt_itr
->second
.first
;
1153 auto backup_itr
= backups_
.find(backup_id
);
1154 if (backup_itr
== backups_
.end()) {
1155 return Status::NotFound();
1158 auto& backup
= backup_itr
->second
;
1159 if (backup
->Empty()) {
1160 return Status::NotFound();
1163 ROCKS_LOG_INFO(options_
.info_log
, "Verifying backup id %u\n", backup_id
);
1165 std::unordered_map
<std::string
, uint64_t> curr_abs_path_to_size
;
1166 for (const auto& rel_dir
: {GetPrivateFileRel(backup_id
), GetSharedFileRel(),
1167 GetSharedFileWithChecksumRel()}) {
1168 const auto abs_dir
= GetAbsolutePath(rel_dir
);
1169 InsertPathnameToSizeBytes(abs_dir
, backup_env_
, &curr_abs_path_to_size
);
1172 for (const auto& file_info
: backup
->GetFiles()) {
1173 const auto abs_path
= GetAbsolutePath(file_info
->filename
);
1174 if (curr_abs_path_to_size
.find(abs_path
) == curr_abs_path_to_size
.end()) {
1175 return Status::NotFound("File missing: " + abs_path
);
1177 if (file_info
->size
!= curr_abs_path_to_size
[abs_path
]) {
1178 return Status::Corruption("File corrupted: " + abs_path
);
1181 return Status::OK();
1184 Status
BackupEngineImpl::CopyOrCreateFile(
1185 const std::string
& src
, const std::string
& dst
, const std::string
& contents
,
1186 Env
* src_env
, Env
* dst_env
, bool sync
, RateLimiter
* rate_limiter
,
1187 uint64_t* size
, uint32_t* checksum_value
, uint64_t size_limit
,
1188 std::function
<void()> progress_callback
) {
1189 assert(src
.empty() != contents
.empty());
1191 unique_ptr
<WritableFile
> dst_file
;
1192 unique_ptr
<SequentialFile
> src_file
;
1193 EnvOptions env_options
;
1194 env_options
.use_mmap_writes
= false;
1195 // TODO:(gzh) maybe use direct reads/writes here if possible
1196 if (size
!= nullptr) {
1199 if (checksum_value
!= nullptr) {
1200 *checksum_value
= 0;
1203 // Check if size limit is set. if not, set it to very big number
1204 if (size_limit
== 0) {
1205 size_limit
= std::numeric_limits
<uint64_t>::max();
1208 s
= dst_env
->NewWritableFile(dst
, &dst_file
, env_options
);
1209 if (s
.ok() && !src
.empty()) {
1210 s
= src_env
->NewSequentialFile(src
, &src_file
, env_options
);
1216 unique_ptr
<WritableFileWriter
> dest_writer(
1217 new WritableFileWriter(std::move(dst_file
), dst
, env_options
));
1218 unique_ptr
<SequentialFileReader
> src_reader
;
1219 unique_ptr
<char[]> buf
;
1221 src_reader
.reset(new SequentialFileReader(std::move(src_file
), src
));
1222 buf
.reset(new char[copy_file_buffer_size_
]);
1226 uint64_t processed_buffer_size
= 0;
1228 if (stop_backup_
.load(std::memory_order_acquire
)) {
1229 return Status::Incomplete("Backup stopped");
1232 size_t buffer_to_read
= (copy_file_buffer_size_
< size_limit
)
1233 ? copy_file_buffer_size_
1234 : static_cast<size_t>(size_limit
);
1235 s
= src_reader
->Read(buffer_to_read
, &data
, buf
.get());
1236 processed_buffer_size
+= buffer_to_read
;
1240 size_limit
-= data
.size();
1246 if (size
!= nullptr) {
1247 *size
+= data
.size();
1249 if (checksum_value
!= nullptr) {
1251 crc32c::Extend(*checksum_value
, data
.data(), data
.size());
1253 s
= dest_writer
->Append(data
);
1254 if (rate_limiter
!= nullptr) {
1255 rate_limiter
->Request(data
.size(), Env::IO_LOW
, nullptr /* stats */,
1256 RateLimiter::OpType::kWrite
);
1258 if (processed_buffer_size
> options_
.callback_trigger_interval_size
) {
1259 processed_buffer_size
-= options_
.callback_trigger_interval_size
;
1260 std::lock_guard
<std::mutex
> lock(byte_report_mutex_
);
1261 progress_callback();
1263 } while (s
.ok() && contents
.empty() && data
.size() > 0 && size_limit
> 0);
1265 if (s
.ok() && sync
) {
1266 s
= dest_writer
->Sync(false);
1269 s
= dest_writer
->Close();
1274 // fname will always start with "/"
1275 Status
BackupEngineImpl::AddBackupFileWorkItem(
1276 std::unordered_set
<std::string
>& live_dst_paths
,
1277 std::vector
<BackupAfterCopyOrCreateWorkItem
>& backup_items_to_finish
,
1278 BackupID backup_id
, bool shared
, const std::string
& src_dir
,
1279 const std::string
& fname
, RateLimiter
* rate_limiter
, uint64_t size_bytes
,
1280 uint64_t size_limit
, bool shared_checksum
,
1281 std::function
<void()> progress_callback
, const std::string
& contents
) {
1282 assert(!fname
.empty() && fname
[0] == '/');
1283 assert(contents
.empty() != src_dir
.empty());
1285 std::string dst_relative
= fname
.substr(1);
1286 std::string dst_relative_tmp
;
1288 uint32_t checksum_value
= 0;
1290 if (shared
&& shared_checksum
) {
1291 // add checksum and file length to the file name
1292 s
= CalculateChecksum(src_dir
+ fname
, db_env_
, size_limit
,
1297 if (size_bytes
== port::kMaxUint64
) {
1298 return Status::NotFound("File missing: " + src_dir
+ fname
);
1301 GetSharedFileWithChecksum(dst_relative
, checksum_value
, size_bytes
);
1302 dst_relative_tmp
= GetSharedFileWithChecksumRel(dst_relative
, true);
1303 dst_relative
= GetSharedFileWithChecksumRel(dst_relative
, false);
1304 } else if (shared
) {
1305 dst_relative_tmp
= GetSharedFileRel(dst_relative
, true);
1306 dst_relative
= GetSharedFileRel(dst_relative
, false);
1308 dst_relative
= GetPrivateFileRel(backup_id
, false, dst_relative
);
1311 // We copy into `temp_dest_path` and, once finished, rename it to
1312 // `final_dest_path`. This allows files to atomically appear at
1313 // `final_dest_path`. We can copy directly to the final path when atomicity
1314 // is unnecessary, like for files in private backup directories.
1315 const std::string
* copy_dest_path
;
1316 std::string temp_dest_path
;
1317 std::string final_dest_path
= GetAbsolutePath(dst_relative
);
1318 if (!dst_relative_tmp
.empty()) {
1319 temp_dest_path
= GetAbsolutePath(dst_relative_tmp
);
1320 copy_dest_path
= &temp_dest_path
;
1322 copy_dest_path
= &final_dest_path
;
1325 // if it's shared, we also need to check if it exists -- if it does, no need
1326 // to copy it again.
1327 bool need_to_copy
= true;
1328 // true if final_dest_path is the same path as another live file
1329 const bool same_path
=
1330 live_dst_paths
.find(final_dest_path
) != live_dst_paths
.end();
1332 bool file_exists
= false;
1333 if (shared
&& !same_path
) {
1334 Status exist
= backup_env_
->FileExists(final_dest_path
);
1337 } else if (exist
.IsNotFound()) {
1338 file_exists
= false;
1340 assert(s
.IsIOError());
1345 if (!contents
.empty()) {
1346 need_to_copy
= false;
1347 } else if (shared
&& (same_path
|| file_exists
)) {
1348 need_to_copy
= false;
1349 if (shared_checksum
) {
1350 ROCKS_LOG_INFO(options_
.info_log
,
1351 "%s already present, with checksum %u and size %" PRIu64
,
1352 fname
.c_str(), checksum_value
, size_bytes
);
1353 } else if (backuped_file_infos_
.find(dst_relative
) ==
1354 backuped_file_infos_
.end() && !same_path
) {
1355 // file already exists, but it's not referenced by any backup. overwrite
1359 "%s already present, but not referenced by any backup. We will "
1360 "overwrite the file.",
1362 need_to_copy
= true;
1363 backup_env_
->DeleteFile(final_dest_path
);
1365 // the file is present and referenced by a backup
1366 ROCKS_LOG_INFO(options_
.info_log
,
1367 "%s already present, calculate checksum", fname
.c_str());
1368 s
= CalculateChecksum(src_dir
+ fname
, db_env_
, size_limit
,
1372 live_dst_paths
.insert(final_dest_path
);
1374 if (!contents
.empty() || need_to_copy
) {
1375 ROCKS_LOG_INFO(options_
.info_log
, "Copying %s to %s", fname
.c_str(),
1376 copy_dest_path
->c_str());
1377 CopyOrCreateWorkItem
copy_or_create_work_item(
1378 src_dir
.empty() ? "" : src_dir
+ fname
, *copy_dest_path
, contents
,
1379 db_env_
, backup_env_
, options_
.sync
, rate_limiter
, size_limit
,
1381 BackupAfterCopyOrCreateWorkItem
after_copy_or_create_work_item(
1382 copy_or_create_work_item
.result
.get_future(), shared
, need_to_copy
,
1383 backup_env_
, temp_dest_path
, final_dest_path
, dst_relative
);
1384 files_to_copy_or_create_
.write(std::move(copy_or_create_work_item
));
1385 backup_items_to_finish
.push_back(std::move(after_copy_or_create_work_item
));
1387 std::promise
<CopyOrCreateResult
> promise_result
;
1388 BackupAfterCopyOrCreateWorkItem
after_copy_or_create_work_item(
1389 promise_result
.get_future(), shared
, need_to_copy
, backup_env_
,
1390 temp_dest_path
, final_dest_path
, dst_relative
);
1391 backup_items_to_finish
.push_back(std::move(after_copy_or_create_work_item
));
1392 CopyOrCreateResult result
;
1394 result
.size
= size_bytes
;
1395 result
.checksum_value
= checksum_value
;
1396 promise_result
.set_value(std::move(result
));
1401 Status
BackupEngineImpl::CalculateChecksum(const std::string
& src
, Env
* src_env
,
1402 uint64_t size_limit
,
1403 uint32_t* checksum_value
) {
1404 *checksum_value
= 0;
1405 if (size_limit
== 0) {
1406 size_limit
= std::numeric_limits
<uint64_t>::max();
1409 EnvOptions env_options
;
1410 env_options
.use_mmap_writes
= false;
1411 env_options
.use_direct_reads
= false;
1413 std::unique_ptr
<SequentialFile
> src_file
;
1414 Status s
= src_env
->NewSequentialFile(src
, &src_file
, env_options
);
1419 unique_ptr
<SequentialFileReader
> src_reader(
1420 new SequentialFileReader(std::move(src_file
), src
));
1421 std::unique_ptr
<char[]> buf(new char[copy_file_buffer_size_
]);
1425 if (stop_backup_
.load(std::memory_order_acquire
)) {
1426 return Status::Incomplete("Backup stopped");
1428 size_t buffer_to_read
= (copy_file_buffer_size_
< size_limit
) ?
1429 copy_file_buffer_size_
: static_cast<size_t>(size_limit
);
1430 s
= src_reader
->Read(buffer_to_read
, &data
, buf
.get());
1436 size_limit
-= data
.size();
1437 *checksum_value
= crc32c::Extend(*checksum_value
, data
.data(), data
.size());
1438 } while (data
.size() > 0 && size_limit
> 0);
1443 void BackupEngineImpl::DeleteChildren(const std::string
& dir
,
1444 uint32_t file_type_filter
) {
1445 std::vector
<std::string
> children
;
1446 db_env_
->GetChildren(dir
, &children
); // ignore errors
1448 for (const auto& f
: children
) {
1451 bool ok
= ParseFileName(f
, &number
, &type
);
1452 if (ok
&& (file_type_filter
& (1 << type
))) {
1453 // don't delete this file
1456 db_env_
->DeleteFile(dir
+ "/" + f
); // ignore errors
1460 Status
BackupEngineImpl::InsertPathnameToSizeBytes(
1461 const std::string
& dir
, Env
* env
,
1462 std::unordered_map
<std::string
, uint64_t>* result
) {
1463 assert(result
!= nullptr);
1464 std::vector
<Env::FileAttributes
> files_attrs
;
1465 Status status
= env
->FileExists(dir
);
1467 status
= env
->GetChildrenFileAttributes(dir
, &files_attrs
);
1468 } else if (status
.IsNotFound()) {
1469 // Insert no entries can be considered success
1470 status
= Status::OK();
1472 const bool slash_needed
= dir
.empty() || dir
.back() != '/';
1473 for (const auto& file_attrs
: files_attrs
) {
1474 result
->emplace(dir
+ (slash_needed
? "/" : "") + file_attrs
.name
,
1475 file_attrs
.size_bytes
);
1480 Status
BackupEngineImpl::GarbageCollect() {
1481 assert(!read_only_
);
1482 ROCKS_LOG_INFO(options_
.info_log
, "Starting garbage collection");
1483 if (options_
.max_valid_backups_to_open
== port::kMaxInt32
) {
1486 "Garbage collection is limited since `max_valid_backups_to_open` "
1487 "constrains how many backups the engine knows about");
1490 if (options_
.share_table_files
&&
1491 options_
.max_valid_backups_to_open
== port::kMaxInt32
) {
1492 // delete obsolete shared files
1493 // we cannot do this when BackupEngine has `max_valid_backups_to_open` set
1494 // as those engines don't know about all shared files.
1495 std::vector
<std::string
> shared_children
;
1497 std::string shared_path
;
1498 if (options_
.share_files_with_checksum
) {
1499 shared_path
= GetAbsolutePath(GetSharedFileWithChecksumRel());
1501 shared_path
= GetAbsolutePath(GetSharedFileRel());
1503 auto s
= backup_env_
->FileExists(shared_path
);
1505 s
= backup_env_
->GetChildren(shared_path
, &shared_children
);
1506 } else if (s
.IsNotFound()) {
1513 for (auto& child
: shared_children
) {
1514 std::string rel_fname
;
1515 if (options_
.share_files_with_checksum
) {
1516 rel_fname
= GetSharedFileWithChecksumRel(child
);
1518 rel_fname
= GetSharedFileRel(child
);
1520 auto child_itr
= backuped_file_infos_
.find(rel_fname
);
1521 // if it's not refcounted, delete it
1522 if (child_itr
== backuped_file_infos_
.end() ||
1523 child_itr
->second
->refs
== 0) {
1524 // this might be a directory, but DeleteFile will just fail in that
1525 // case, so we're good
1526 Status s
= backup_env_
->DeleteFile(GetAbsolutePath(rel_fname
));
1527 ROCKS_LOG_INFO(options_
.info_log
, "Deleting %s -- %s",
1528 rel_fname
.c_str(), s
.ToString().c_str());
1529 backuped_file_infos_
.erase(rel_fname
);
1534 // delete obsolete private files
1535 std::vector
<std::string
> private_children
;
1537 auto s
= backup_env_
->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
1543 for (auto& child
: private_children
) {
1544 // it's ok to do this when BackupEngine has `max_valid_backups_to_open` set
1545 // as the engine always knows all valid backup numbers.
1546 BackupID backup_id
= 0;
1547 bool tmp_dir
= child
.find(".tmp") != std::string::npos
;
1548 sscanf(child
.c_str(), "%u", &backup_id
);
1549 if (!tmp_dir
&& // if it's tmp_dir, delete it
1550 (backup_id
== 0 || backups_
.find(backup_id
) != backups_
.end())) {
1551 // it's either not a number or it's still alive. continue
1554 // here we have to delete the dir and all its children
1555 std::string full_private_path
=
1556 GetAbsolutePath(GetPrivateFileRel(backup_id
));
1557 std::vector
<std::string
> subchildren
;
1558 backup_env_
->GetChildren(full_private_path
, &subchildren
);
1559 for (auto& subchild
: subchildren
) {
1560 Status s
= backup_env_
->DeleteFile(full_private_path
+ subchild
);
1561 ROCKS_LOG_INFO(options_
.info_log
, "Deleting %s -- %s",
1562 (full_private_path
+ subchild
).c_str(),
1563 s
.ToString().c_str());
1565 // finally delete the private dir
1566 Status s
= backup_env_
->DeleteDir(full_private_path
);
1567 ROCKS_LOG_INFO(options_
.info_log
, "Deleting dir %s -- %s",
1568 full_private_path
.c_str(), s
.ToString().c_str());
1571 return Status::OK();
1574 // ------- BackupMeta class --------
1576 Status
BackupEngineImpl::BackupMeta::AddFile(
1577 std::shared_ptr
<FileInfo
> file_info
) {
1578 auto itr
= file_infos_
->find(file_info
->filename
);
1579 if (itr
== file_infos_
->end()) {
1580 auto ret
= file_infos_
->insert({file_info
->filename
, file_info
});
1583 itr
->second
->refs
= 1;
1585 // if this happens, something is seriously wrong
1586 return Status::Corruption("In memory metadata insertion error");
1589 if (itr
->second
->checksum_value
!= file_info
->checksum_value
) {
1590 return Status::Corruption(
1591 "Checksum mismatch for existing backup file. Delete old backups and "
1594 ++itr
->second
->refs
; // increase refcount if already present
1597 size_
+= file_info
->size
;
1598 files_
.push_back(itr
->second
);
1600 return Status::OK();
1603 Status
BackupEngineImpl::BackupMeta::Delete(bool delete_meta
) {
1605 for (const auto& file
: files_
) {
1606 --file
->refs
; // decrease refcount
1611 s
= env_
->FileExists(meta_filename_
);
1613 s
= env_
->DeleteFile(meta_filename_
);
1614 } else if (s
.IsNotFound()) {
1615 s
= Status::OK(); // nothing to delete
1622 Slice
kMetaDataPrefix("metadata ");
1624 // each backup meta file is of the format:
1627 // <metadata(literal string)> <metadata> (optional)
1628 // <number of files>
1629 // <file1> <crc32(literal string)> <crc32_value>
1630 // <file2> <crc32(literal string)> <crc32_value>
1632 Status
BackupEngineImpl::BackupMeta::LoadFromFile(
1633 const std::string
& backup_dir
,
1634 const std::unordered_map
<std::string
, uint64_t>& abs_path_to_size
) {
1637 unique_ptr
<SequentialFile
> backup_meta_file
;
1638 s
= env_
->NewSequentialFile(meta_filename_
, &backup_meta_file
, EnvOptions());
1643 unique_ptr
<SequentialFileReader
> backup_meta_reader(
1644 new SequentialFileReader(std::move(backup_meta_file
), meta_filename_
));
1645 unique_ptr
<char[]> buf(new char[max_backup_meta_file_size_
+ 1]);
1647 s
= backup_meta_reader
->Read(max_backup_meta_file_size_
, &data
, buf
.get());
1649 if (!s
.ok() || data
.size() == max_backup_meta_file_size_
) {
1650 return s
.ok() ? Status::Corruption("File size too big") : s
;
1652 buf
[data
.size()] = 0;
1654 uint32_t num_files
= 0;
1656 timestamp_
= strtoull(data
.data(), &next
, 10);
1657 data
.remove_prefix(next
- data
.data() + 1); // +1 for '\n'
1658 sequence_number_
= strtoull(data
.data(), &next
, 10);
1659 data
.remove_prefix(next
- data
.data() + 1); // +1 for '\n'
1661 if (data
.starts_with(kMetaDataPrefix
)) {
1662 // app metadata present
1663 data
.remove_prefix(kMetaDataPrefix
.size());
1664 Slice hex_encoded_metadata
= GetSliceUntil(&data
, '\n');
1665 bool decode_success
= hex_encoded_metadata
.DecodeHex(&app_metadata_
);
1666 if (!decode_success
) {
1667 return Status::Corruption(
1668 "Failed to decode stored hex encoded app metadata");
1672 num_files
= static_cast<uint32_t>(strtoul(data
.data(), &next
, 10));
1673 data
.remove_prefix(next
- data
.data() + 1); // +1 for '\n'
1675 std::vector
<std::shared_ptr
<FileInfo
>> files
;
1677 Slice
checksum_prefix("crc32 ");
1679 for (uint32_t i
= 0; s
.ok() && i
< num_files
; ++i
) {
1680 auto line
= GetSliceUntil(&data
, '\n');
1681 std::string filename
= GetSliceUntil(&line
, ' ').ToString();
1684 const std::shared_ptr
<FileInfo
> file_info
= GetFile(filename
);
1686 size
= file_info
->size
;
1688 std::string abs_path
= backup_dir
+ "/" + filename
;
1690 size
= abs_path_to_size
.at(abs_path
);
1691 } catch (std::out_of_range
&) {
1692 return Status::Corruption("Size missing for pathname: " + abs_path
);
1697 return Status::Corruption("File checksum is missing for " + filename
+
1698 " in " + meta_filename_
);
1701 uint32_t checksum_value
= 0;
1702 if (line
.starts_with(checksum_prefix
)) {
1703 line
.remove_prefix(checksum_prefix
.size());
1704 checksum_value
= static_cast<uint32_t>(
1705 strtoul(line
.data(), nullptr, 10));
1706 if (line
!= rocksdb::ToString(checksum_value
)) {
1707 return Status::Corruption("Invalid checksum value for " + filename
+
1708 " in " + meta_filename_
);
1711 return Status::Corruption("Unknown checksum type for " + filename
+
1712 " in " + meta_filename_
);
1715 files
.emplace_back(new FileInfo(filename
, size
, checksum_value
));
1718 if (s
.ok() && data
.size() > 0) {
1719 // file has to be read completely. if not, we count it as corruption
1720 s
= Status::Corruption("Tailing data in backup meta file in " +
1725 files_
.reserve(files
.size());
1726 for (const auto& file_info
: files
) {
1727 s
= AddFile(file_info
);
1737 Status
BackupEngineImpl::BackupMeta::StoreToFile(bool sync
) {
1739 unique_ptr
<WritableFile
> backup_meta_file
;
1740 EnvOptions env_options
;
1741 env_options
.use_mmap_writes
= false;
1742 env_options
.use_direct_writes
= false;
1743 s
= env_
->NewWritableFile(meta_tmp_filename_
, &backup_meta_file
, env_options
);
1748 unique_ptr
<char[]> buf(new char[max_backup_meta_file_size_
]);
1749 size_t len
= 0, buf_size
= max_backup_meta_file_size_
;
1750 len
+= snprintf(buf
.get(), buf_size
, "%" PRId64
"\n", timestamp_
);
1751 len
+= snprintf(buf
.get() + len
, buf_size
- len
, "%" PRIu64
"\n",
1753 if (!app_metadata_
.empty()) {
1754 std::string hex_encoded_metadata
=
1755 Slice(app_metadata_
).ToString(/* hex */ true);
1757 // +1 to accommodate newline character
1758 size_t hex_meta_strlen
= kMetaDataPrefix
.ToString().length() + hex_encoded_metadata
.length() + 1;
1759 if (hex_meta_strlen
>= buf_size
) {
1760 return Status::Corruption("Buffer too small to fit backup metadata");
1762 else if (len
+ hex_meta_strlen
>= buf_size
) {
1763 backup_meta_file
->Append(Slice(buf
.get(), len
));
1765 unique_ptr
<char[]> new_reset_buf(new char[max_backup_meta_file_size_
]);
1766 buf
.swap(new_reset_buf
);
1769 len
+= snprintf(buf
.get() + len
, buf_size
- len
, "%s%s\n",
1770 kMetaDataPrefix
.ToString().c_str(),
1771 hex_encoded_metadata
.c_str());
1774 char writelen_temp
[19];
1775 if (len
+ snprintf(writelen_temp
, sizeof(writelen_temp
),
1776 "%" ROCKSDB_PRIszt
"\n", files_
.size()) >= buf_size
) {
1777 backup_meta_file
->Append(Slice(buf
.get(), len
));
1779 unique_ptr
<char[]> new_reset_buf(new char[max_backup_meta_file_size_
]);
1780 buf
.swap(new_reset_buf
);
1784 const char *const_write
= writelen_temp
;
1785 len
+= snprintf(buf
.get() + len
, buf_size
- len
, "%s", const_write
);
1788 for (const auto& file
: files_
) {
1789 // use crc32 for now, switch to something else if needed
1791 size_t newlen
= len
+ file
->filename
.length() + snprintf(writelen_temp
,
1792 sizeof(writelen_temp
), " crc32 %u\n", file
->checksum_value
);
1793 const char *const_write
= writelen_temp
;
1794 if (newlen
>= buf_size
) {
1795 backup_meta_file
->Append(Slice(buf
.get(), len
));
1797 unique_ptr
<char[]> new_reset_buf(new char[max_backup_meta_file_size_
]);
1798 buf
.swap(new_reset_buf
);
1801 len
+= snprintf(buf
.get() + len
, buf_size
- len
, "%s%s",
1802 file
->filename
.c_str(), const_write
);
1805 s
= backup_meta_file
->Append(Slice(buf
.get(), len
));
1806 if (s
.ok() && sync
) {
1807 s
= backup_meta_file
->Sync();
1810 s
= backup_meta_file
->Close();
1813 s
= env_
->RenameFile(meta_tmp_filename_
, meta_filename_
);
1818 // -------- BackupEngineReadOnlyImpl ---------
1819 class BackupEngineReadOnlyImpl
: public BackupEngineReadOnly
{
1821 BackupEngineReadOnlyImpl(Env
* db_env
, const BackupableDBOptions
& options
)
1822 : backup_engine_(new BackupEngineImpl(db_env
, options
, true)) {}
1824 virtual ~BackupEngineReadOnlyImpl() {}
1826 // The returned BackupInfos are in chronological order, which means the
1827 // latest backup comes last.
1828 virtual void GetBackupInfo(std::vector
<BackupInfo
>* backup_info
) override
{
1829 backup_engine_
->GetBackupInfo(backup_info
);
1832 virtual void GetCorruptedBackups(
1833 std::vector
<BackupID
>* corrupt_backup_ids
) override
{
1834 backup_engine_
->GetCorruptedBackups(corrupt_backup_ids
);
1837 virtual Status
RestoreDBFromBackup(
1838 BackupID backup_id
, const std::string
& db_dir
, const std::string
& wal_dir
,
1839 const RestoreOptions
& restore_options
= RestoreOptions()) override
{
1840 return backup_engine_
->RestoreDBFromBackup(backup_id
, db_dir
, wal_dir
,
1844 virtual Status
RestoreDBFromLatestBackup(
1845 const std::string
& db_dir
, const std::string
& wal_dir
,
1846 const RestoreOptions
& restore_options
= RestoreOptions()) override
{
1847 return backup_engine_
->RestoreDBFromLatestBackup(db_dir
, wal_dir
,
1851 virtual Status
VerifyBackup(BackupID backup_id
) override
{
1852 return backup_engine_
->VerifyBackup(backup_id
);
1855 Status
Initialize() { return backup_engine_
->Initialize(); }
1858 std::unique_ptr
<BackupEngineImpl
> backup_engine_
;
1861 Status
BackupEngineReadOnly::Open(Env
* env
, const BackupableDBOptions
& options
,
1862 BackupEngineReadOnly
** backup_engine_ptr
) {
1863 if (options
.destroy_old_data
) {
1864 return Status::InvalidArgument(
1865 "Can't destroy old data with ReadOnly BackupEngine");
1867 std::unique_ptr
<BackupEngineReadOnlyImpl
> backup_engine(
1868 new BackupEngineReadOnlyImpl(env
, options
));
1869 auto s
= backup_engine
->Initialize();
1871 *backup_engine_ptr
= nullptr;
1874 *backup_engine_ptr
= backup_engine
.release();
1875 return Status::OK();
1878 } // namespace rocksdb
1880 #endif // ROCKSDB_LITE