public:
BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
bool read_only = false);
- ~BackupEngineImpl();
+ ~BackupEngineImpl() override;
Status CreateNewBackupWithMetadata(DB* db, const std::string& app_metadata,
bool flush_before_backup = false,
std::function<void()> progress_callback =
restore_options);
}
- virtual Status VerifyBackup(BackupID backup_id) override;
+ Status VerifyBackup(BackupID backup_id) override;
Status Initialize();
// @param contents If non-empty, the file will be created with these contents.
Status CopyOrCreateFile(const std::string& src, const std::string& dst,
const std::string& contents, Env* src_env,
- Env* dst_env, bool sync, RateLimiter* rate_limiter,
+ Env* dst_env, const EnvOptions& src_env_options,
+ bool sync, RateLimiter* rate_limiter,
uint64_t* size = nullptr,
uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0,
std::function<void()> progress_callback = []() {});
- Status CalculateChecksum(const std::string& src,
- Env* src_env,
- uint64_t size_limit,
- uint32_t* checksum_value);
+ Status CalculateChecksum(const std::string& src, Env* src_env,
+ const EnvOptions& src_env_options,
+ uint64_t size_limit, uint32_t* checksum_value);
struct CopyOrCreateResult {
uint64_t size;
std::string contents;
Env* src_env;
Env* dst_env;
+ EnvOptions src_env_options;
bool sync;
RateLimiter* rate_limiter;
uint64_t size_limit;
std::function<void()> progress_callback;
CopyOrCreateWorkItem()
- : src_path(""),
- dst_path(""),
- contents(""),
- src_env(nullptr),
- dst_env(nullptr),
- sync(false),
- rate_limiter(nullptr),
- size_limit(0) {}
+ : src_path(""),
+ dst_path(""),
+ contents(""),
+ src_env(nullptr),
+ dst_env(nullptr),
+ src_env_options(),
+ sync(false),
+ rate_limiter(nullptr),
+ size_limit(0) {}
CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
contents = std::move(o.contents);
src_env = o.src_env;
dst_env = o.dst_env;
+ src_env_options = std::move(o.src_env_options);
sync = o.sync;
rate_limiter = o.rate_limiter;
size_limit = o.size_limit;
CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
std::string _contents, Env* _src_env, Env* _dst_env,
- bool _sync, RateLimiter* _rate_limiter,
- uint64_t _size_limit,
+ EnvOptions _src_env_options, bool _sync,
+ RateLimiter* _rate_limiter, uint64_t _size_limit,
std::function<void()> _progress_callback = []() {})
: src_path(std::move(_src_path)),
dst_path(std::move(_dst_path)),
contents(std::move(_contents)),
src_env(_src_env),
dst_env(_dst_env),
+ src_env_options(std::move(_src_env_options)),
sync(_sync),
rate_limiter(_rate_limiter),
size_limit(_size_limit),
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& fname, // starts with "/"
- RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit = 0,
+ const EnvOptions& src_env_options, RateLimiter* rate_limiter,
+ uint64_t size_bytes, uint64_t size_limit = 0,
bool shared_checksum = false,
std::function<void()> progress_callback = []() {},
const std::string& contents = std::string());
// backup state data
BackupID latest_backup_id_;
BackupID latest_valid_backup_id_;
- std::map<BackupID, unique_ptr<BackupMeta>> backups_;
- std::map<BackupID,
- std::pair<Status, unique_ptr<BackupMeta>>> corrupt_backups_;
+ std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
+ std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
+ corrupt_backups_;
std::unordered_map<std::string,
std::shared_ptr<FileInfo>> backuped_file_infos_;
std::atomic<bool> stop_backup_;
Env* backup_env_;
// directories
- unique_ptr<Directory> backup_directory_;
- unique_ptr<Directory> shared_directory_;
- unique_ptr<Directory> meta_directory_;
- unique_ptr<Directory> private_directory_;
+ std::unique_ptr<Directory> backup_directory_;
+ std::unique_ptr<Directory> shared_directory_;
+ std::unique_ptr<Directory> meta_directory_;
+ std::unique_ptr<Directory> private_directory_;
static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
size_t copy_file_buffer_size_;
}
assert(backups_.find(backup_id) == backups_.end());
backups_.insert(std::make_pair(
- backup_id, unique_ptr<BackupMeta>(new BackupMeta(
+ backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
GetBackupMetaFile(backup_id, false /* tmp */),
GetBackupMetaFile(backup_id, true /* tmp */),
&backuped_file_infos_, backup_env_))));
CopyOrCreateResult result;
result.status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
- work_item.src_env, work_item.dst_env, work_item.sync,
- work_item.rate_limiter, &result.size, &result.checksum_value,
- work_item.size_limit, work_item.progress_callback);
+ work_item.src_env, work_item.dst_env, work_item.src_env_options,
+ work_item.sync, work_item.rate_limiter, &result.size,
+ &result.checksum_value, work_item.size_limit,
+ work_item.progress_callback);
work_item.result.set_value(std::move(result));
}
});
}
auto ret = backups_.insert(std::make_pair(
- new_backup_id, unique_ptr<BackupMeta>(new BackupMeta(
+ new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
GetBackupMetaFile(new_backup_id, false /* tmp */),
GetBackupMetaFile(new_backup_id, true /* tmp */),
&backuped_file_infos_, backup_env_))));
if (s.ok()) {
CheckpointImpl checkpoint(db);
uint64_t sequence_number = 0;
+ DBOptions db_options = db->GetDBOptions();
+ EnvOptions src_raw_env_options(db_options);
s = checkpoint.CreateCustomCheckpoint(
- db->GetDBOptions(),
+ db_options,
[&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
FileType) {
// custom checkpoint will switch to calling copy_file_cb after it sees
if (type == kTableFile) {
st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
}
+ EnvOptions src_env_options;
+ switch (type) {
+ case kLogFile:
+ src_env_options =
+ db_env_->OptimizeForLogRead(src_raw_env_options);
+ break;
+ case kTableFile:
+ src_env_options = db_env_->OptimizeForCompactionTableRead(
+ src_raw_env_options, ImmutableDBOptions(db_options));
+ break;
+ case kDescriptorFile:
+ src_env_options =
+ db_env_->OptimizeForManifestRead(src_raw_env_options);
+ break;
+ default:
+ // Other backed up files (like options file) are not read by live
+ // DB, so don't need to worry about avoiding mixing buffered and
+ // direct I/O. Just use plain defaults.
+ src_env_options = src_raw_env_options;
+ break;
+ }
if (st.ok()) {
st = AddBackupFileWorkItem(
live_dst_paths, backup_items_to_finish, new_backup_id,
options_.share_table_files && type == kTableFile, src_dirname,
- fname, rate_limiter, size_bytes, size_limit_bytes,
+ fname, src_env_options, rate_limiter, size_bytes,
+ size_limit_bytes,
options_.share_files_with_checksum && type == kTableFile,
progress_callback);
}
Log(options_.info_log, "add file for backup %s", fname.c_str());
return AddBackupFileWorkItem(
live_dst_paths, backup_items_to_finish, new_backup_id,
- false /* shared */, "" /* src_dir */, fname, rate_limiter,
- contents.size(), 0 /* size_limit */, false /* shared_checksum */,
+ false /* shared */, "" /* src_dir */, fname,
+ EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
+ 0 /* size_limit */, false /* shared_checksum */,
progress_callback, contents);
} /* create_file_cb */,
&sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
s = new_backup->StoreToFile(options_.sync);
}
if (s.ok() && options_.sync) {
- unique_ptr<Directory> backup_private_directory;
+ std::unique_ptr<Directory> backup_private_directory;
backup_env_->NewDirectory(
GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
&backup_private_directory);
dst.c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
- false, rate_limiter, 0 /* size_limit */);
+ EnvOptions() /* src_env_options */, false, rate_limiter,
+ 0 /* size_limit */);
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(),
file_info->checksum_value);
Status BackupEngineImpl::CopyOrCreateFile(
const std::string& src, const std::string& dst, const std::string& contents,
- Env* src_env, Env* dst_env, bool sync, RateLimiter* rate_limiter,
- uint64_t* size, uint32_t* checksum_value, uint64_t size_limit,
- std::function<void()> progress_callback) {
+ Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
+ RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value,
+ uint64_t size_limit, std::function<void()> progress_callback) {
assert(src.empty() != contents.empty());
Status s;
- unique_ptr<WritableFile> dst_file;
- unique_ptr<SequentialFile> src_file;
- EnvOptions env_options;
- env_options.use_mmap_writes = false;
+ std::unique_ptr<WritableFile> dst_file;
+ std::unique_ptr<SequentialFile> src_file;
+ EnvOptions dst_env_options;
+ dst_env_options.use_mmap_writes = false;
// TODO:(gzh) maybe use direct reads/writes here if possible
if (size != nullptr) {
*size = 0;
size_limit = std::numeric_limits<uint64_t>::max();
}
- s = dst_env->NewWritableFile(dst, &dst_file, env_options);
+ s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options);
if (s.ok() && !src.empty()) {
- s = src_env->NewSequentialFile(src, &src_file, env_options);
+ s = src_env->NewSequentialFile(src, &src_file, src_env_options);
}
if (!s.ok()) {
return s;
}
- unique_ptr<WritableFileWriter> dest_writer(
- new WritableFileWriter(std::move(dst_file), dst, env_options));
- unique_ptr<SequentialFileReader> src_reader;
- unique_ptr<char[]> buf;
+ std::unique_ptr<WritableFileWriter> dest_writer(
+ new WritableFileWriter(std::move(dst_file), dst, dst_env_options));
+ std::unique_ptr<SequentialFileReader> src_reader;
+ std::unique_ptr<char[]> buf;
if (!src.empty()) {
src_reader.reset(new SequentialFileReader(std::move(src_file), src));
buf.reset(new char[copy_file_buffer_size_]);
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
- const std::string& fname, RateLimiter* rate_limiter, uint64_t size_bytes,
- uint64_t size_limit, bool shared_checksum,
- std::function<void()> progress_callback, const std::string& contents) {
+ const std::string& fname, const EnvOptions& src_env_options,
+ RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
+ bool shared_checksum, std::function<void()> progress_callback,
+ const std::string& contents) {
assert(!fname.empty() && fname[0] == '/');
assert(contents.empty() != src_dir.empty());
if (shared && shared_checksum) {
// add checksum and file length to the file name
- s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
+ s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit,
&checksum_value);
if (!s.ok()) {
return s;
// the file is present and referenced by a backup
ROCKS_LOG_INFO(options_.info_log,
"%s already present, calculate checksum", fname.c_str());
- s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
- &checksum_value);
+ s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
+ size_limit, &checksum_value);
}
}
live_dst_paths.insert(final_dest_path);
copy_dest_path->c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
- db_env_, backup_env_, options_.sync, rate_limiter, size_limit,
- progress_callback);
+ db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
+ size_limit, progress_callback);
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), shared, need_to_copy,
backup_env_, temp_dest_path, final_dest_path, dst_relative);
}
Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
+ const EnvOptions& src_env_options,
uint64_t size_limit,
uint32_t* checksum_value) {
*checksum_value = 0;
size_limit = std::numeric_limits<uint64_t>::max();
}
- EnvOptions env_options;
- env_options.use_mmap_writes = false;
- env_options.use_direct_reads = false;
-
std::unique_ptr<SequentialFile> src_file;
- Status s = src_env->NewSequentialFile(src, &src_file, env_options);
+ Status s = src_env->NewSequentialFile(src, &src_file, src_env_options);
if (!s.ok()) {
return s;
}
- unique_ptr<SequentialFileReader> src_reader(
+ std::unique_ptr<SequentialFileReader> src_reader(
new SequentialFileReader(std::move(src_file), src));
std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
Slice data;
const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
assert(Empty());
Status s;
- unique_ptr<SequentialFile> backup_meta_file;
+ std::unique_ptr<SequentialFile> backup_meta_file;
s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
if (!s.ok()) {
return s;
}
- unique_ptr<SequentialFileReader> backup_meta_reader(
+ std::unique_ptr<SequentialFileReader> backup_meta_reader(
new SequentialFileReader(std::move(backup_meta_file), meta_filename_));
- unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
+ std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
Slice data;
s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
Status s;
- unique_ptr<WritableFile> backup_meta_file;
+ std::unique_ptr<WritableFile> backup_meta_file;
EnvOptions env_options;
env_options.use_mmap_writes = false;
env_options.use_direct_writes = false;
return s;
}
- unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
+ std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
size_t len = 0, buf_size = max_backup_meta_file_size_;
len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_);
len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
else if (len + hex_meta_strlen >= buf_size) {
backup_meta_file->Append(Slice(buf.get(), len));
buf.reset();
- unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
+ std::unique_ptr<char[]> new_reset_buf(
+ new char[max_backup_meta_file_size_]);
buf.swap(new_reset_buf);
len = 0;
}
"%" ROCKSDB_PRIszt "\n", files_.size()) >= buf_size) {
backup_meta_file->Append(Slice(buf.get(), len));
buf.reset();
- unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
+ std::unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
buf.swap(new_reset_buf);
len = 0;
}
if (newlen >= buf_size) {
backup_meta_file->Append(Slice(buf.get(), len));
buf.reset();
- unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
+ std::unique_ptr<char[]> new_reset_buf(
+ new char[max_backup_meta_file_size_]);
buf.swap(new_reset_buf);
len = 0;
}
BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options)
: backup_engine_(new BackupEngineImpl(db_env, options, true)) {}
- virtual ~BackupEngineReadOnlyImpl() {}
+ ~BackupEngineReadOnlyImpl() override {}
// The returned BackupInfos are in chronological order, which means the
// latest backup comes last.
- virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
+ void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
backup_engine_->GetBackupInfo(backup_info);
}
- virtual void GetCorruptedBackups(
- std::vector<BackupID>* corrupt_backup_ids) override {
+ void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override {
backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}
- virtual Status RestoreDBFromBackup(
+ Status RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) override {
return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
restore_options);
}
- virtual Status RestoreDBFromLatestBackup(
+ Status RestoreDBFromLatestBackup(
const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) override {
return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
restore_options);
}
- virtual Status VerifyBackup(BackupID backup_id) override {
+ Status VerifyBackup(BackupID backup_id) override {
return backup_engine_->VerifyBackup(backup_id);
}