]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #pragma once | |
7 | ||
8 | #ifndef ROCKSDB_LITE | |
9 | ||
10 | #include <string> | |
11 | #include <vector> | |
1e59de90 | 12 | |
f67539c2 | 13 | #include "db/db_impl/db_impl.h" |
1e59de90 | 14 | #include "logging/logging.h" |
f67539c2 TL |
15 | |
16 | namespace ROCKSDB_NAMESPACE { | |
17 | ||
18 | // A wrapper class to hold log reader, log reporter, log status. | |
19 | class LogReaderContainer { | |
20 | public: | |
21 | LogReaderContainer() | |
22 | : reader_(nullptr), reporter_(nullptr), status_(nullptr) {} | |
23 | LogReaderContainer(Env* env, std::shared_ptr<Logger> info_log, | |
24 | std::string fname, | |
25 | std::unique_ptr<SequentialFileReader>&& file_reader, | |
26 | uint64_t log_number) { | |
27 | LogReporter* reporter = new LogReporter(); | |
28 | status_ = new Status(); | |
29 | reporter->env = env; | |
30 | reporter->info_log = info_log.get(); | |
31 | reporter->fname = std::move(fname); | |
32 | reporter->status = status_; | |
33 | reporter_ = reporter; | |
34 | // We intentially make log::Reader do checksumming even if | |
35 | // paranoid_checks==false so that corruptions cause entire commits | |
36 | // to be skipped instead of propagating bad information (like overly | |
37 | // large sequence numbers). | |
38 | reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader), | |
39 | reporter, true /*checksum*/, | |
40 | log_number); | |
41 | } | |
42 | log::FragmentBufferedReader* reader_; | |
43 | log::Reader::Reporter* reporter_; | |
44 | Status* status_; | |
45 | ~LogReaderContainer() { | |
46 | delete reader_; | |
47 | delete reporter_; | |
48 | delete status_; | |
49 | } | |
1e59de90 | 50 | |
f67539c2 TL |
51 | private: |
52 | struct LogReporter : public log::Reader::Reporter { | |
53 | Env* env; | |
54 | Logger* info_log; | |
55 | std::string fname; | |
56 | Status* status; // nullptr if immutable_db_options_.paranoid_checks==false | |
57 | void Corruption(size_t bytes, const Status& s) override { | |
58 | ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s", | |
59 | (this->status == nullptr ? "(ignoring error) " : ""), | |
60 | fname.c_str(), static_cast<int>(bytes), | |
61 | s.ToString().c_str()); | |
62 | if (this->status != nullptr && this->status->ok()) { | |
63 | *this->status = s; | |
64 | } | |
65 | } | |
66 | }; | |
67 | }; | |
68 | ||
69 | // The secondary instance shares access to the storage as the primary. | |
70 | // The secondary is able to read and replay changes described in both the | |
71 | // MANIFEST and the WAL files without coordination with the primary. | |
72 | // The secondary instance can be opened using `DB::OpenAsSecondary`. After | |
73 | // that, it can call `DBImplSecondary::TryCatchUpWithPrimary` to make best | |
74 | // effort attempts to catch up with the primary. | |
1e59de90 | 75 | // TODO: Share common structure with CompactedDBImpl and DBImplReadOnly |
f67539c2 TL |
76 | class DBImplSecondary : public DBImpl { |
77 | public: | |
1e59de90 TL |
78 | DBImplSecondary(const DBOptions& options, const std::string& dbname, |
79 | std::string secondary_path); | |
f67539c2 TL |
80 | ~DBImplSecondary() override; |
81 | ||
82 | // Recover by replaying MANIFEST and WAL. Also initialize manifest_reader_ | |
83 | // and log_readers_ to facilitate future operations. | |
84 | Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families, | |
20effc67 | 85 | bool read_only, bool error_if_wal_file_exists, |
1e59de90 TL |
86 | bool error_if_data_exists_in_wals, uint64_t* = nullptr, |
87 | RecoveryContext* recovery_ctx = nullptr) override; | |
f67539c2 | 88 | |
1e59de90 | 89 | // Implementations of the DB interface. |
f67539c2 | 90 | using DB::Get; |
1e59de90 TL |
91 | // Can return IOError due to files being deleted by the primary. To avoid |
92 | // IOError in this case, application can coordinate between primary and | |
93 | // secondaries so that primary will not delete files that are currently being | |
94 | // used by the secondaries. The application can also provide a custom FS/Env | |
95 | // implementation so that files will remain present until all primary and | |
96 | // secondaries indicate that they can be deleted. As a partial hacky | |
97 | // workaround, the secondaries can be opened with `max_open_files=-1` so that | |
98 | // it eagerly keeps all talbe files open and is able to access the contents of | |
99 | // deleted files via prior open fd. | |
f67539c2 TL |
100 | Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, |
101 | const Slice& key, PinnableSlice* value) override; | |
102 | ||
1e59de90 TL |
103 | Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, |
104 | const Slice& key, PinnableSlice* value, | |
105 | std::string* timestamp) override; | |
106 | ||
f67539c2 | 107 | Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, |
1e59de90 TL |
108 | const Slice& key, PinnableSlice* value, |
109 | std::string* timestamp); | |
f67539c2 TL |
110 | |
111 | using DBImpl::NewIterator; | |
1e59de90 TL |
112 | // Operations on the created iterators can return IOError due to files being |
113 | // deleted by the primary. To avoid IOError in this case, application can | |
114 | // coordinate between primary and secondaries so that primary will not delete | |
115 | // files that are currently being used by the secondaries. The application can | |
116 | // also provide a custom FS/Env implementation so that files will remain | |
117 | // present until all primary and secondaries indicate that they can be | |
118 | // deleted. As a partial hacky workaround, the secondaries can be opened with | |
119 | // `max_open_files=-1` so that it eagerly keeps all talbe files open and is | |
120 | // able to access the contents of deleted files via prior open fd. | |
f67539c2 TL |
121 | Iterator* NewIterator(const ReadOptions&, |
122 | ColumnFamilyHandle* column_family) override; | |
123 | ||
124 | ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options, | |
125 | ColumnFamilyData* cfd, | |
126 | SequenceNumber snapshot, | |
1e59de90 TL |
127 | ReadCallback* read_callback, |
128 | bool expose_blob_index = false, | |
129 | bool allow_refresh = true); | |
f67539c2 TL |
130 | |
131 | Status NewIterators(const ReadOptions& options, | |
132 | const std::vector<ColumnFamilyHandle*>& column_families, | |
133 | std::vector<Iterator*>* iterators) override; | |
134 | ||
135 | using DBImpl::Put; | |
136 | Status Put(const WriteOptions& /*options*/, | |
137 | ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, | |
138 | const Slice& /*value*/) override { | |
139 | return Status::NotSupported("Not supported operation in secondary mode."); | |
140 | } | |
141 | ||
1e59de90 TL |
142 | using DBImpl::PutEntity; |
143 | Status PutEntity(const WriteOptions& /* options */, | |
144 | ColumnFamilyHandle* /* column_family */, | |
145 | const Slice& /* key */, | |
146 | const WideColumns& /* columns */) override { | |
147 | return Status::NotSupported("Not supported operation in secondary mode."); | |
148 | } | |
149 | ||
f67539c2 TL |
150 | using DBImpl::Merge; |
151 | Status Merge(const WriteOptions& /*options*/, | |
152 | ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, | |
153 | const Slice& /*value*/) override { | |
154 | return Status::NotSupported("Not supported operation in secondary mode."); | |
155 | } | |
156 | ||
157 | using DBImpl::Delete; | |
158 | Status Delete(const WriteOptions& /*options*/, | |
159 | ColumnFamilyHandle* /*column_family*/, | |
160 | const Slice& /*key*/) override { | |
161 | return Status::NotSupported("Not supported operation in secondary mode."); | |
162 | } | |
163 | ||
164 | using DBImpl::SingleDelete; | |
165 | Status SingleDelete(const WriteOptions& /*options*/, | |
166 | ColumnFamilyHandle* /*column_family*/, | |
167 | const Slice& /*key*/) override { | |
168 | return Status::NotSupported("Not supported operation in secondary mode."); | |
169 | } | |
170 | ||
171 | Status Write(const WriteOptions& /*options*/, | |
172 | WriteBatch* /*updates*/) override { | |
173 | return Status::NotSupported("Not supported operation in secondary mode."); | |
174 | } | |
175 | ||
176 | using DBImpl::CompactRange; | |
177 | Status CompactRange(const CompactRangeOptions& /*options*/, | |
178 | ColumnFamilyHandle* /*column_family*/, | |
179 | const Slice* /*begin*/, const Slice* /*end*/) override { | |
180 | return Status::NotSupported("Not supported operation in secondary mode."); | |
181 | } | |
182 | ||
183 | using DBImpl::CompactFiles; | |
184 | Status CompactFiles( | |
185 | const CompactionOptions& /*compact_options*/, | |
186 | ColumnFamilyHandle* /*column_family*/, | |
187 | const std::vector<std::string>& /*input_file_names*/, | |
188 | const int /*output_level*/, const int /*output_path_id*/ = -1, | |
189 | std::vector<std::string>* const /*output_file_names*/ = nullptr, | |
190 | CompactionJobInfo* /*compaction_job_info*/ = nullptr) override { | |
191 | return Status::NotSupported("Not supported operation in secondary mode."); | |
192 | } | |
193 | ||
194 | Status DisableFileDeletions() override { | |
195 | return Status::NotSupported("Not supported operation in secondary mode."); | |
196 | } | |
197 | ||
198 | Status EnableFileDeletions(bool /*force*/) override { | |
199 | return Status::NotSupported("Not supported operation in secondary mode."); | |
200 | } | |
201 | ||
202 | Status GetLiveFiles(std::vector<std::string>&, | |
203 | uint64_t* /*manifest_file_size*/, | |
204 | bool /*flush_memtable*/ = true) override { | |
205 | return Status::NotSupported("Not supported operation in secondary mode."); | |
206 | } | |
207 | ||
208 | using DBImpl::Flush; | |
209 | Status Flush(const FlushOptions& /*options*/, | |
210 | ColumnFamilyHandle* /*column_family*/) override { | |
211 | return Status::NotSupported("Not supported operation in secondary mode."); | |
212 | } | |
213 | ||
214 | using DBImpl::SetDBOptions; | |
215 | Status SetDBOptions(const std::unordered_map<std::string, std::string>& | |
216 | /*options_map*/) override { | |
217 | // Currently not supported because changing certain options may cause | |
218 | // flush/compaction. | |
219 | return Status::NotSupported("Not supported operation in secondary mode."); | |
220 | } | |
221 | ||
222 | using DBImpl::SetOptions; | |
223 | Status SetOptions( | |
224 | ColumnFamilyHandle* /*cfd*/, | |
225 | const std::unordered_map<std::string, std::string>& /*options_map*/) | |
226 | override { | |
227 | // Currently not supported because changing certain options may cause | |
228 | // flush/compaction and/or write to MANIFEST. | |
229 | return Status::NotSupported("Not supported operation in secondary mode."); | |
230 | } | |
231 | ||
232 | using DBImpl::SyncWAL; | |
233 | Status SyncWAL() override { | |
234 | return Status::NotSupported("Not supported operation in secondary mode."); | |
235 | } | |
236 | ||
237 | using DB::IngestExternalFile; | |
238 | Status IngestExternalFile( | |
239 | ColumnFamilyHandle* /*column_family*/, | |
240 | const std::vector<std::string>& /*external_files*/, | |
241 | const IngestExternalFileOptions& /*ingestion_options*/) override { | |
242 | return Status::NotSupported("Not supported operation in secondary mode."); | |
243 | } | |
244 | ||
245 | // Try to catch up with the primary by reading as much as possible from the | |
246 | // log files until there is nothing more to read or encounters an error. If | |
247 | // the amount of information in the log files to process is huge, this | |
248 | // method can take long time due to all the I/O and CPU costs. | |
249 | Status TryCatchUpWithPrimary() override; | |
250 | ||
f67539c2 TL |
251 | // Try to find log reader using log_number from log_readers_ map, initialize |
252 | // if it doesn't exist | |
253 | Status MaybeInitLogReader(uint64_t log_number, | |
254 | log::FragmentBufferedReader** log_reader); | |
255 | ||
256 | // Check if all live files exist on file system and that their file sizes | |
257 | // matche to the in-memory records. It is possible that some live files may | |
258 | // have been deleted by the primary. In this case, CheckConsistency() does | |
259 | // not flag the missing file as inconsistency. | |
260 | Status CheckConsistency() override; | |
261 | ||
1e59de90 TL |
262 | #ifndef NDEBUG |
263 | Status TEST_CompactWithoutInstallation(const OpenAndCompactOptions& options, | |
264 | ColumnFamilyHandle* cfh, | |
265 | const CompactionServiceInput& input, | |
266 | CompactionServiceResult* result) { | |
267 | return CompactWithoutInstallation(options, cfh, input, result); | |
268 | } | |
269 | #endif // NDEBUG | |
270 | ||
f67539c2 | 271 | protected: |
1e59de90 TL |
272 | #ifndef ROCKSDB_LITE |
273 | Status FlushForGetLiveFiles() override { | |
274 | // No-op for read-only DB | |
275 | return Status::OK(); | |
276 | } | |
277 | #endif // !ROCKSDB_LITE | |
278 | ||
f67539c2 TL |
279 | // ColumnFamilyCollector is a write batch handler which does nothing |
280 | // except recording unique column family IDs | |
281 | class ColumnFamilyCollector : public WriteBatch::Handler { | |
282 | std::unordered_set<uint32_t> column_family_ids_; | |
283 | ||
284 | Status AddColumnFamilyId(uint32_t column_family_id) { | |
285 | if (column_family_ids_.find(column_family_id) == | |
286 | column_family_ids_.end()) { | |
287 | column_family_ids_.insert(column_family_id); | |
288 | } | |
289 | return Status::OK(); | |
290 | } | |
291 | ||
292 | public: | |
293 | explicit ColumnFamilyCollector() {} | |
294 | ||
295 | ~ColumnFamilyCollector() override {} | |
296 | ||
297 | Status PutCF(uint32_t column_family_id, const Slice&, | |
298 | const Slice&) override { | |
299 | return AddColumnFamilyId(column_family_id); | |
300 | } | |
301 | ||
302 | Status DeleteCF(uint32_t column_family_id, const Slice&) override { | |
303 | return AddColumnFamilyId(column_family_id); | |
304 | } | |
305 | ||
306 | Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override { | |
307 | return AddColumnFamilyId(column_family_id); | |
308 | } | |
309 | ||
310 | Status DeleteRangeCF(uint32_t column_family_id, const Slice&, | |
311 | const Slice&) override { | |
312 | return AddColumnFamilyId(column_family_id); | |
313 | } | |
314 | ||
315 | Status MergeCF(uint32_t column_family_id, const Slice&, | |
316 | const Slice&) override { | |
317 | return AddColumnFamilyId(column_family_id); | |
318 | } | |
319 | ||
320 | Status PutBlobIndexCF(uint32_t column_family_id, const Slice&, | |
321 | const Slice&) override { | |
322 | return AddColumnFamilyId(column_family_id); | |
323 | } | |
324 | ||
1e59de90 TL |
325 | Status MarkBeginPrepare(bool) override { return Status::OK(); } |
326 | ||
327 | Status MarkEndPrepare(const Slice&) override { return Status::OK(); } | |
328 | ||
329 | Status MarkRollback(const Slice&) override { return Status::OK(); } | |
330 | ||
331 | Status MarkCommit(const Slice&) override { return Status::OK(); } | |
332 | ||
333 | Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { | |
334 | return Status::OK(); | |
335 | } | |
336 | ||
337 | Status MarkNoop(bool) override { return Status::OK(); } | |
338 | ||
f67539c2 TL |
339 | const std::unordered_set<uint32_t>& column_families() const { |
340 | return column_family_ids_; | |
341 | } | |
342 | }; | |
343 | ||
344 | Status CollectColumnFamilyIdsFromWriteBatch( | |
345 | const WriteBatch& batch, std::vector<uint32_t>* column_family_ids) { | |
346 | assert(column_family_ids != nullptr); | |
347 | column_family_ids->clear(); | |
348 | ColumnFamilyCollector handler; | |
349 | Status s = batch.Iterate(&handler); | |
350 | if (s.ok()) { | |
351 | for (const auto& cf : handler.column_families()) { | |
352 | column_family_ids->push_back(cf); | |
353 | } | |
354 | } | |
355 | return s; | |
356 | } | |
357 | ||
358 | bool OwnTablesAndLogs() const override { | |
359 | // Currently, the secondary instance does not own the database files. It | |
360 | // simply opens the files of the primary instance and tracks their file | |
361 | // descriptors until they become obsolete. In the future, the secondary may | |
362 | // create links to database files. OwnTablesAndLogs will return true then. | |
363 | return false; | |
364 | } | |
365 | ||
366 | private: | |
367 | friend class DB; | |
368 | ||
369 | // No copying allowed | |
370 | DBImplSecondary(const DBImplSecondary&); | |
371 | void operator=(const DBImplSecondary&); | |
372 | ||
373 | using DBImpl::Recover; | |
374 | ||
375 | Status FindAndRecoverLogFiles( | |
376 | std::unordered_set<ColumnFamilyData*>* cfds_changed, | |
377 | JobContext* job_context); | |
378 | Status FindNewLogNumbers(std::vector<uint64_t>* logs); | |
379 | // After manifest recovery, replay WALs and refresh log_readers_ if necessary | |
380 | // REQUIRES: log_numbers are sorted in ascending order | |
381 | Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers, | |
382 | SequenceNumber* next_sequence, | |
383 | std::unordered_set<ColumnFamilyData*>* cfds_changed, | |
384 | JobContext* job_context); | |
385 | ||
1e59de90 TL |
386 | // Run compaction without installation, the output files will be placed in the |
387 | // secondary DB path. The LSM tree won't be changed, the secondary DB is still | |
388 | // in read-only mode. | |
389 | Status CompactWithoutInstallation(const OpenAndCompactOptions& options, | |
390 | ColumnFamilyHandle* cfh, | |
391 | const CompactionServiceInput& input, | |
392 | CompactionServiceResult* result); | |
393 | ||
f67539c2 TL |
394 | std::unique_ptr<log::FragmentBufferedReader> manifest_reader_; |
395 | std::unique_ptr<log::Reader::Reporter> manifest_reporter_; | |
396 | std::unique_ptr<Status> manifest_reader_status_; | |
397 | ||
398 | // Cache log readers for each log number, used for continue WAL replay | |
399 | // after recovery | |
400 | std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_; | |
401 | ||
402 | // Current WAL number replayed for each column family. | |
403 | std::unordered_map<ColumnFamilyData*, uint64_t> cfd_to_current_log_; | |
1e59de90 TL |
404 | |
405 | const std::string secondary_path_; | |
f67539c2 TL |
406 | }; |
407 | ||
408 | } // namespace ROCKSDB_NAMESPACE | |
409 | ||
410 | #endif // !ROCKSDB_LITE |