1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2012 Facebook.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file.
12 #include "utilities/checkpoint/checkpoint_impl.h"
14 #ifndef __STDC_FORMAT_MACROS
15 #define __STDC_FORMAT_MACROS
23 #include "db/wal_manager.h"
24 #include "port/port.h"
25 #include "rocksdb/db.h"
26 #include "rocksdb/env.h"
27 #include "rocksdb/transaction_log.h"
28 #include "rocksdb/utilities/checkpoint.h"
29 #include "util/file_util.h"
30 #include "util/filename.h"
31 #include "util/sync_point.h"
// Writes a newly allocated CheckpointImpl, constructed from `db`, into
// `*checkpoint_ptr`. The object comes from a plain `new`; nothing in the
// lines visible here releases it.
35 Status
Checkpoint::Create(DB
* db
, Checkpoint
** checkpoint_ptr
) {
36 *checkpoint_ptr
= new CheckpointImpl(db
);
// Checkpoint:: version of CreateCheckpoint (as opposed to the
// CheckpointImpl:: one defined further down). Both parameters are left
// unnamed, so they are unused, and the call reports NotSupported with an
// empty message.
40 Status
Checkpoint::CreateCheckpoint(const std::string
& /*checkpoint_dir*/,
41 uint64_t /*log_size_for_flush*/) {
42 return Status::NotSupported("");
// Empties and removes the directory `full_private_path`.
//
// Steps visible here: query whether the path exists, list its children,
// delete every child with DeleteFile, then delete the directory itself with
// DeleteDir. The Status of each step is only written to `info_log`; `s` is
// overwritten by the next call and nothing is returned to the caller.
//
// full_private_path: directory to clean up.
// info_log:          logger that receives one line per step.
45 void CheckpointImpl::CleanStagingDirectory(
46 const std::string
& full_private_path
, Logger
* info_log
) {
47 std::vector
<std::string
> subchildren
;
48 Status s
= db_
->GetEnv()->FileExists(full_private_path
);
52 ROCKS_LOG_INFO(info_log
, "File exists %s -- %s",
53 full_private_path
.c_str(), s
.ToString().c_str());
// NOTE(review): the Status returned by GetChildren is not captured here, so
// a failed listing is not logged -- confirm this is intentional best-effort.
54 db_
->GetEnv()->GetChildren(full_private_path
, &subchildren
);
55 for (auto& subchild
: subchildren
) {
// Child names are joined to the directory with an explicit "/".
56 std::string subchild_path
= full_private_path
+ "/" + subchild
;
57 s
= db_
->GetEnv()->DeleteFile(subchild_path
);
58 ROCKS_LOG_INFO(info_log
, "Delete file %s -- %s",
59 subchild_path
.c_str(), s
.ToString().c_str());
61 // finally delete the private dir
62 s
= db_
->GetEnv()->DeleteDir(full_private_path
);
63 ROCKS_LOG_INFO(info_log
, "Delete dir %s -- %s",
64 full_private_path
.c_str(), s
.ToString().c_str());
67 // Builds an openable snapshot of RocksDB
//
// checkpoint_dir:     destination directory. The visible code returns
//                     InvalidArgument("Directory exists") on one of the
//                     FileExists outcomes and rejects a name that contains no
//                     non-'/' character.
// log_size_for_flush: passed through unchanged to CreateCustomCheckpoint.
//
// Files are first placed in a staging directory (checkpoint_dir with trailing
// slashes removed, plus ".tmp"), which is then renamed to checkpoint_dir and
// fsynced through a Directory handle. The failure path logs the Status and
// calls CleanStagingDirectory on the staging directory.
68 Status
CheckpointImpl::CreateCheckpoint(const std::string
& checkpoint_dir
,
69 uint64_t log_size_for_flush
) {
70 DBOptions db_options
= db_
->GetDBOptions();
72 Status s
= db_
->GetEnv()->FileExists(checkpoint_dir
);
74 return Status::InvalidArgument("Directory exists");
75 } else if (!s
.IsNotFound()) {
// Any status other than "exists" or NotFound is expected to be an IOError.
76 assert(s
.IsIOError());
82 "Started the snapshot process -- creating snapshot in directory %s",
83 checkpoint_dir
.c_str());
// Index of the last character that is not '/'; used to drop trailing slashes.
85 size_t final_nonslash_idx
= checkpoint_dir
.find_last_not_of('/');
86 if (final_nonslash_idx
== std::string::npos
) {
87 // npos means it's only slashes or empty. Non-empty means it's the root
88 // directory, but it shouldn't be because we verified above the directory
// does not already exist.
90 assert(checkpoint_dir
.empty());
91 return Status::InvalidArgument("invalid checkpoint directory name");
// Staging directory name: checkpoint_dir without trailing slashes + ".tmp".
94 std::string full_private_path
=
95 checkpoint_dir
.substr(0, final_nonslash_idx
+ 1) + ".tmp";
98 "Snapshot process -- using temporary directory %s",
99 full_private_path
.c_str());
// Remove whatever is already present at the staging path before reusing it.
100 CleanStagingDirectory(full_private_path
, db_options
.info_log
.get());
101 // create snapshot directory
102 s
= db_
->GetEnv()->CreateDir(full_private_path
);
103 uint64_t sequence_number
= 0;
// File deletions are disabled here and re-enabled by the
// EnableFileDeletions(false) call after CreateCustomCheckpoint.
105 db_
->DisableFileDeletions();
106 s
= CreateCustomCheckpoint(
// link_file_cb: hard-links <src_dirname><fname> to <staging dir><fname>.
// No path separator is inserted between directory and file name here.
108 [&](const std::string
& src_dirname
, const std::string
& fname
,
110 ROCKS_LOG_INFO(db_options
.info_log
, "Hard Linking %s", fname
.c_str());
111 return db_
->GetEnv()->LinkFile(src_dirname
+ fname
,
112 full_private_path
+ fname
);
113 } /* link_file_cb */,
// copy_file_cb: copies <src_dirname><fname> into the staging directory via
// CopyFile, forwarding size_limit_bytes and db_options.use_fsync.
114 [&](const std::string
& src_dirname
, const std::string
& fname
,
115 uint64_t size_limit_bytes
, FileType
) {
116 ROCKS_LOG_INFO(db_options
.info_log
, "Copying %s", fname
.c_str());
117 return CopyFile(db_
->GetEnv(), src_dirname
+ fname
,
118 full_private_path
+ fname
, size_limit_bytes
,
119 db_options
.use_fsync
);
120 } /* copy_file_cb */,
// create_file_cb: writes `contents` to <staging dir><fname> via CreateFile.
121 [&](const std::string
& fname
, const std::string
& contents
, FileType
) {
122 ROCKS_LOG_INFO(db_options
.info_log
, "Creating %s", fname
.c_str());
123 return CreateFile(db_
->GetEnv(), full_private_path
+ fname
, contents
,
124 db_options
.use_fsync
);
125 } /* create_file_cb */,
126 &sequence_number
, log_size_for_flush
);
127 // we copied all the files, enable file deletions
128 db_
->EnableFileDeletions(false);
132 // move tmp private backup to real snapshot directory
133 s
= db_
->GetEnv()->RenameFile(full_private_path
, checkpoint_dir
);
// NOTE(review): the Status returned by NewDirectory is not captured; only the
// resulting pointer is checked before Fsync is attempted.
136 unique_ptr
<Directory
> checkpoint_directory
;
137 db_
->GetEnv()->NewDirectory(checkpoint_dir
, &checkpoint_directory
);
138 if (checkpoint_directory
!= nullptr) {
139 s
= checkpoint_directory
->Fsync();
144 // here we know that we succeeded and installed the new snapshot
145 ROCKS_LOG_INFO(db_options
.info_log
, "Snapshot DONE. All is good");
146 ROCKS_LOG_INFO(db_options
.info_log
, "Snapshot sequence number: %" PRIu64
,
149 // clean all the files we might have created
150 ROCKS_LOG_INFO(db_options
.info_log
, "Snapshot failed -- %s",
151 s
.ToString().c_str());
152 CleanStagingDirectory(full_private_path
, db_options
.info_log
.get());
// Gathers the DB's live files and WAL files and passes each one to the
// matching callback (link_file_cb, copy_file_cb or create_file_cb, as used in
// the body).
//
// db_options:         supplies allow_2pc, wal_dir and info_log.
// sequence_number:    out-parameter; set to db_->GetLatestSequenceNumber()
//                     before the live files are collected.
// log_size_for_flush: controls whether GetLiveFiles is asked to flush the
//                     memtable (see the flush_memtable logic in the body).
157 Status
CheckpointImpl::CreateCustomCheckpoint(
158 const DBOptions
& db_options
,
159 std::function
<Status(const std::string
& src_dirname
,
160 const std::string
& src_fname
, FileType type
)>
162 std::function
<Status(const std::string
& src_dirname
,
163 const std::string
& src_fname
,
164 uint64_t size_limit_bytes
, FileType type
)>
166 std::function
<Status(const std::string
& fname
, const std::string
& contents
,
169 uint64_t* sequence_number
, uint64_t log_size_for_flush
) {
171 std::vector
<std::string
> live_files
;
172 uint64_t manifest_file_size
= 0;
// Starts at the maximum value; compared against LogNumber() in the WAL loop.
173 uint64_t min_log_num
= port::kMaxUint64
;
174 *sequence_number
= db_
->GetLatestSequenceNumber();
176 VectorLogPtr live_wal_files
;
// Flushing is on by default; the non-2PC branch below may turn it off.
178 bool flush_memtable
= true;
180 if (!db_options
.allow_2pc
) {
// log_size_for_flush == port::kMaxUint64 disables the flush outright.
181 if (log_size_for_flush
== port::kMaxUint64
) {
182 flush_memtable
= false;
183 } else if (log_size_for_flush
> 0) {
184 // If outstanding log files are small, we skip the flush.
185 s
= db_
->GetSortedWalFiles(live_wal_files
);
191 // Don't flush column families if total log size is smaller than
192 // log_size_for_flush. We copy the log files instead.
193 // We may be able to cover 2PC case too.
194 uint64_t total_wal_size
= 0;
195 for (auto& wal
: live_wal_files
) {
196 total_wal_size
+= wal
->SizeFileBytes();
198 if (total_wal_size
< log_size_for_flush
) {
199 flush_memtable
= false;
// The WAL list gathered for the size estimate is discarded; it is fetched
// again after GetLiveFiles.
201 live_wal_files
.clear();
205 // this will return live_files prefixed with "/"
206 s
= db_
->GetLiveFiles(live_files
, &manifest_file_size
, flush_memtable
);
208 if (s
.ok() && db_options
.allow_2pc
) {
209 // If 2PC is enabled, we need to get minimum log number after the flush.
210 // Need to refetch the live files to recapture the snapshot.
// NOTE(review): "fine" in the error message below looks like a typo for
// "find"; the string is runtime text and is left unchanged here.
211 if (!db_
->GetIntProperty(DB::Properties::kMinLogNumberToKeep
,
213 return Status::InvalidArgument(
214 "2PC enabled but cannot fine the min log number to keep.");
216 // We need to refetch live files with flush to handle this case:
217 // A previous 000001.log contains the prepare record of transaction tnx1.
218 // The current log file is 000002.log, and sequence_number points to this
220 // After calling GetLiveFiles(), 000003.log is created.
221 // Then tnx1 is committed. The commit record is written to 000003.log.
222 // Now we fetch min_log_num, which will be 3.
223 // Then only 000002.log and 000003.log will be copied, and 000001.log will
224 // be skipped. 000003.log contains commit message of tnx1, but we don't
225 // have respective prepare record for it.
226 // In order to avoid this situation, we need to force flush to make sure
227 // all transactions committed before getting min_log_num will be flushed
229 // We cannot get min_log_num before calling the GetLiveFiles() for the
230 // first time, because if we do that, all the log files will be included,
231 // far more than needed.
// Second GetLiveFiles call, made for the reason explained above.
232 s
= db_
->GetLiveFiles(live_files
, &manifest_file_size
, flush_memtable
);
235 TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
236 TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
// NOTE(review): the result of FlushWAL is not captured here -- confirm that
// a failure can safely be ignored.
237 db_
->FlushWAL(false /* sync */);
239 // if we have more than one column family, we need to also get WAL files
241 s
= db_
->GetSortedWalFiles(live_wal_files
);
// Number of WAL files the loop at the end of this function iterates over.
247 size_t wal_size
= live_wal_files
.size();
249 // copy/hard link live_files
250 std::string manifest_fname
, current_fname
;
// The loop stops as soon as `s` is no longer ok.
251 for (size_t i
= 0; s
.ok() && i
< live_files
.size(); ++i
) {
254 bool ok
= ParseFileName(live_files
[i
], &number
, &type
);
256 s
= Status::Corruption("Can't parse file name. This is very bad");
259 // we should only get sst, options, manifest and current files here
260 assert(type
== kTableFile
|| type
== kDescriptorFile
||
261 type
== kCurrentFile
|| type
== kOptionsFile
);
// Every live file name is expected to be non-empty and start with '/'.
262 assert(live_files
[i
].size() > 0 && live_files
[i
][0] == '/');
263 if (type
== kCurrentFile
) {
264 // We will craft the current file manually to ensure it's consistent with
265 // the manifest number. This is necessary because current's file contents
266 // can change during checkpoint creation.
267 current_fname
= live_files
[i
];
269 } else if (type
== kDescriptorFile
) {
// Remembered so the CURRENT file contents can be generated after the loop.
270 manifest_fname
= live_files
[i
];
272 std::string src_fname
= live_files
[i
];
275 // * if it's kTableFile, then it's shared
276 // * if it's kDescriptorFile, limit the size to manifest_file_size
277 // * always copy if cross-device link
278 if ((type
== kTableFile
) && same_fs
) {
279 s
= link_file_cb(db_
->GetName(), src_fname
, type
);
280 if (s
.IsNotSupported()) {
// Everything that is not a table file, or that cannot be linked, is copied.
285 if ((type
!= kTableFile
) || (!same_fs
)) {
286 s
= copy_file_cb(db_
->GetName(), src_fname
,
287 (type
== kDescriptorFile
) ? manifest_file_size
: 0,
// The generated CURRENT contents are the manifest name without its leading
// '/' followed by a newline.
// NOTE(review): the Status returned by create_file_cb is not assigned to `s`
// in this statement, so a failure here appears to go unnoticed -- confirm.
291 if (s
.ok() && !current_fname
.empty() && !manifest_fname
.empty()) {
292 create_file_cb(current_fname
, manifest_fname
.substr(1) + "\n",
295 ROCKS_LOG_INFO(db_options
.info_log
, "Number of log files %" ROCKSDB_PRIszt
,
296 live_wal_files
.size());
298 // Link WAL files. Copy exact size of last one because it is the only one
299 // that has changes after the last flush.
300 for (size_t i
= 0; s
.ok() && i
< wal_size
; ++i
) {
// Only kAliveLogFile entries are considered; the visible conditions also
// compare StartSequence() with *sequence_number and LogNumber() with
// min_log_num.
301 if ((live_wal_files
[i
]->Type() == kAliveLogFile
) &&
303 live_wal_files
[i
]->StartSequence() >= *sequence_number
||
304 live_wal_files
[i
]->LogNumber() >= min_log_num
)) {
// Last WAL file: copied up to its current SizeFileBytes().
305 if (i
+ 1 == wal_size
) {
306 s
= copy_file_cb(db_options
.wal_dir
, live_wal_files
[i
]->PathName(),
307 live_wal_files
[i
]->SizeFileBytes(), kLogFile
);
311 // we only care about live log files
312 s
= link_file_cb(db_options
.wal_dir
, live_wal_files
[i
]->PathName(),
// Presumably the fallback when linking is not supported: copy the file with
// a size limit argument of 0.
314 if (s
.IsNotSupported()) {
320 s
= copy_file_cb(db_options
.wal_dir
, live_wal_files
[i
]->PathName(), 0,
329 } // namespace rocksdb
331 #endif // ROCKSDB_LITE