]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #pragma once | |
7 | ||
8 | #ifndef ROCKSDB_LITE | |
9 | ||
10 | #include <string> | |
11 | #include <vector> | |
12 | #include "db/db_impl/db_impl.h" | |
13 | ||
14 | namespace ROCKSDB_NAMESPACE { | |
15 | ||
16 | // A wrapper class to hold log reader, log reporter, log status. | |
17 | class LogReaderContainer { | |
18 | public: | |
19 | LogReaderContainer() | |
20 | : reader_(nullptr), reporter_(nullptr), status_(nullptr) {} | |
21 | LogReaderContainer(Env* env, std::shared_ptr<Logger> info_log, | |
22 | std::string fname, | |
23 | std::unique_ptr<SequentialFileReader>&& file_reader, | |
24 | uint64_t log_number) { | |
25 | LogReporter* reporter = new LogReporter(); | |
26 | status_ = new Status(); | |
27 | reporter->env = env; | |
28 | reporter->info_log = info_log.get(); | |
29 | reporter->fname = std::move(fname); | |
30 | reporter->status = status_; | |
31 | reporter_ = reporter; | |
32 | // We intentially make log::Reader do checksumming even if | |
33 | // paranoid_checks==false so that corruptions cause entire commits | |
34 | // to be skipped instead of propagating bad information (like overly | |
35 | // large sequence numbers). | |
36 | reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader), | |
37 | reporter, true /*checksum*/, | |
38 | log_number); | |
39 | } | |
40 | log::FragmentBufferedReader* reader_; | |
41 | log::Reader::Reporter* reporter_; | |
42 | Status* status_; | |
43 | ~LogReaderContainer() { | |
44 | delete reader_; | |
45 | delete reporter_; | |
46 | delete status_; | |
47 | } | |
48 | private: | |
49 | struct LogReporter : public log::Reader::Reporter { | |
50 | Env* env; | |
51 | Logger* info_log; | |
52 | std::string fname; | |
53 | Status* status; // nullptr if immutable_db_options_.paranoid_checks==false | |
54 | void Corruption(size_t bytes, const Status& s) override { | |
55 | ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s", | |
56 | (this->status == nullptr ? "(ignoring error) " : ""), | |
57 | fname.c_str(), static_cast<int>(bytes), | |
58 | s.ToString().c_str()); | |
59 | if (this->status != nullptr && this->status->ok()) { | |
60 | *this->status = s; | |
61 | } | |
62 | } | |
63 | }; | |
64 | }; | |
65 | ||
66 | // The secondary instance shares access to the storage as the primary. | |
67 | // The secondary is able to read and replay changes described in both the | |
68 | // MANIFEST and the WAL files without coordination with the primary. | |
69 | // The secondary instance can be opened using `DB::OpenAsSecondary`. After | |
70 | // that, it can call `DBImplSecondary::TryCatchUpWithPrimary` to make best | |
71 | // effort attempts to catch up with the primary. | |
72 | class DBImplSecondary : public DBImpl { | |
73 | public: | |
74 | DBImplSecondary(const DBOptions& options, const std::string& dbname); | |
75 | ~DBImplSecondary() override; | |
76 | ||
77 | // Recover by replaying MANIFEST and WAL. Also initialize manifest_reader_ | |
78 | // and log_readers_ to facilitate future operations. | |
79 | Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families, | |
20effc67 TL |
80 | bool read_only, bool error_if_wal_file_exists, |
81 | bool error_if_data_exists_in_wals, | |
f67539c2 TL |
82 | uint64_t* = nullptr) override; |
83 | ||
84 | // Implementations of the DB interface | |
85 | using DB::Get; | |
86 | Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, | |
87 | const Slice& key, PinnableSlice* value) override; | |
88 | ||
89 | Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, | |
90 | const Slice& key, PinnableSlice* value); | |
91 | ||
92 | using DBImpl::NewIterator; | |
93 | Iterator* NewIterator(const ReadOptions&, | |
94 | ColumnFamilyHandle* column_family) override; | |
95 | ||
96 | ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options, | |
97 | ColumnFamilyData* cfd, | |
98 | SequenceNumber snapshot, | |
99 | ReadCallback* read_callback); | |
100 | ||
101 | Status NewIterators(const ReadOptions& options, | |
102 | const std::vector<ColumnFamilyHandle*>& column_families, | |
103 | std::vector<Iterator*>* iterators) override; | |
104 | ||
105 | using DBImpl::Put; | |
106 | Status Put(const WriteOptions& /*options*/, | |
107 | ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, | |
108 | const Slice& /*value*/) override { | |
109 | return Status::NotSupported("Not supported operation in secondary mode."); | |
110 | } | |
111 | ||
112 | using DBImpl::Merge; | |
113 | Status Merge(const WriteOptions& /*options*/, | |
114 | ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, | |
115 | const Slice& /*value*/) override { | |
116 | return Status::NotSupported("Not supported operation in secondary mode."); | |
117 | } | |
118 | ||
119 | using DBImpl::Delete; | |
120 | Status Delete(const WriteOptions& /*options*/, | |
121 | ColumnFamilyHandle* /*column_family*/, | |
122 | const Slice& /*key*/) override { | |
123 | return Status::NotSupported("Not supported operation in secondary mode."); | |
124 | } | |
125 | ||
126 | using DBImpl::SingleDelete; | |
127 | Status SingleDelete(const WriteOptions& /*options*/, | |
128 | ColumnFamilyHandle* /*column_family*/, | |
129 | const Slice& /*key*/) override { | |
130 | return Status::NotSupported("Not supported operation in secondary mode."); | |
131 | } | |
132 | ||
133 | Status Write(const WriteOptions& /*options*/, | |
134 | WriteBatch* /*updates*/) override { | |
135 | return Status::NotSupported("Not supported operation in secondary mode."); | |
136 | } | |
137 | ||
138 | using DBImpl::CompactRange; | |
139 | Status CompactRange(const CompactRangeOptions& /*options*/, | |
140 | ColumnFamilyHandle* /*column_family*/, | |
141 | const Slice* /*begin*/, const Slice* /*end*/) override { | |
142 | return Status::NotSupported("Not supported operation in secondary mode."); | |
143 | } | |
144 | ||
145 | using DBImpl::CompactFiles; | |
146 | Status CompactFiles( | |
147 | const CompactionOptions& /*compact_options*/, | |
148 | ColumnFamilyHandle* /*column_family*/, | |
149 | const std::vector<std::string>& /*input_file_names*/, | |
150 | const int /*output_level*/, const int /*output_path_id*/ = -1, | |
151 | std::vector<std::string>* const /*output_file_names*/ = nullptr, | |
152 | CompactionJobInfo* /*compaction_job_info*/ = nullptr) override { | |
153 | return Status::NotSupported("Not supported operation in secondary mode."); | |
154 | } | |
155 | ||
156 | Status DisableFileDeletions() override { | |
157 | return Status::NotSupported("Not supported operation in secondary mode."); | |
158 | } | |
159 | ||
160 | Status EnableFileDeletions(bool /*force*/) override { | |
161 | return Status::NotSupported("Not supported operation in secondary mode."); | |
162 | } | |
163 | ||
164 | Status GetLiveFiles(std::vector<std::string>&, | |
165 | uint64_t* /*manifest_file_size*/, | |
166 | bool /*flush_memtable*/ = true) override { | |
167 | return Status::NotSupported("Not supported operation in secondary mode."); | |
168 | } | |
169 | ||
170 | using DBImpl::Flush; | |
171 | Status Flush(const FlushOptions& /*options*/, | |
172 | ColumnFamilyHandle* /*column_family*/) override { | |
173 | return Status::NotSupported("Not supported operation in secondary mode."); | |
174 | } | |
175 | ||
176 | using DBImpl::SetDBOptions; | |
177 | Status SetDBOptions(const std::unordered_map<std::string, std::string>& | |
178 | /*options_map*/) override { | |
179 | // Currently not supported because changing certain options may cause | |
180 | // flush/compaction. | |
181 | return Status::NotSupported("Not supported operation in secondary mode."); | |
182 | } | |
183 | ||
184 | using DBImpl::SetOptions; | |
185 | Status SetOptions( | |
186 | ColumnFamilyHandle* /*cfd*/, | |
187 | const std::unordered_map<std::string, std::string>& /*options_map*/) | |
188 | override { | |
189 | // Currently not supported because changing certain options may cause | |
190 | // flush/compaction and/or write to MANIFEST. | |
191 | return Status::NotSupported("Not supported operation in secondary mode."); | |
192 | } | |
193 | ||
194 | using DBImpl::SyncWAL; | |
195 | Status SyncWAL() override { | |
196 | return Status::NotSupported("Not supported operation in secondary mode."); | |
197 | } | |
198 | ||
199 | using DB::IngestExternalFile; | |
200 | Status IngestExternalFile( | |
201 | ColumnFamilyHandle* /*column_family*/, | |
202 | const std::vector<std::string>& /*external_files*/, | |
203 | const IngestExternalFileOptions& /*ingestion_options*/) override { | |
204 | return Status::NotSupported("Not supported operation in secondary mode."); | |
205 | } | |
206 | ||
207 | // Try to catch up with the primary by reading as much as possible from the | |
208 | // log files until there is nothing more to read or encounters an error. If | |
209 | // the amount of information in the log files to process is huge, this | |
210 | // method can take long time due to all the I/O and CPU costs. | |
211 | Status TryCatchUpWithPrimary() override; | |
212 | ||
213 | ||
214 | // Try to find log reader using log_number from log_readers_ map, initialize | |
215 | // if it doesn't exist | |
216 | Status MaybeInitLogReader(uint64_t log_number, | |
217 | log::FragmentBufferedReader** log_reader); | |
218 | ||
219 | // Check if all live files exist on file system and that their file sizes | |
220 | // matche to the in-memory records. It is possible that some live files may | |
221 | // have been deleted by the primary. In this case, CheckConsistency() does | |
222 | // not flag the missing file as inconsistency. | |
223 | Status CheckConsistency() override; | |
224 | ||
225 | protected: | |
226 | // ColumnFamilyCollector is a write batch handler which does nothing | |
227 | // except recording unique column family IDs | |
228 | class ColumnFamilyCollector : public WriteBatch::Handler { | |
229 | std::unordered_set<uint32_t> column_family_ids_; | |
230 | ||
231 | Status AddColumnFamilyId(uint32_t column_family_id) { | |
232 | if (column_family_ids_.find(column_family_id) == | |
233 | column_family_ids_.end()) { | |
234 | column_family_ids_.insert(column_family_id); | |
235 | } | |
236 | return Status::OK(); | |
237 | } | |
238 | ||
239 | public: | |
240 | explicit ColumnFamilyCollector() {} | |
241 | ||
242 | ~ColumnFamilyCollector() override {} | |
243 | ||
244 | Status PutCF(uint32_t column_family_id, const Slice&, | |
245 | const Slice&) override { | |
246 | return AddColumnFamilyId(column_family_id); | |
247 | } | |
248 | ||
249 | Status DeleteCF(uint32_t column_family_id, const Slice&) override { | |
250 | return AddColumnFamilyId(column_family_id); | |
251 | } | |
252 | ||
253 | Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override { | |
254 | return AddColumnFamilyId(column_family_id); | |
255 | } | |
256 | ||
257 | Status DeleteRangeCF(uint32_t column_family_id, const Slice&, | |
258 | const Slice&) override { | |
259 | return AddColumnFamilyId(column_family_id); | |
260 | } | |
261 | ||
262 | Status MergeCF(uint32_t column_family_id, const Slice&, | |
263 | const Slice&) override { | |
264 | return AddColumnFamilyId(column_family_id); | |
265 | } | |
266 | ||
267 | Status PutBlobIndexCF(uint32_t column_family_id, const Slice&, | |
268 | const Slice&) override { | |
269 | return AddColumnFamilyId(column_family_id); | |
270 | } | |
271 | ||
272 | const std::unordered_set<uint32_t>& column_families() const { | |
273 | return column_family_ids_; | |
274 | } | |
275 | }; | |
276 | ||
277 | Status CollectColumnFamilyIdsFromWriteBatch( | |
278 | const WriteBatch& batch, std::vector<uint32_t>* column_family_ids) { | |
279 | assert(column_family_ids != nullptr); | |
280 | column_family_ids->clear(); | |
281 | ColumnFamilyCollector handler; | |
282 | Status s = batch.Iterate(&handler); | |
283 | if (s.ok()) { | |
284 | for (const auto& cf : handler.column_families()) { | |
285 | column_family_ids->push_back(cf); | |
286 | } | |
287 | } | |
288 | return s; | |
289 | } | |
290 | ||
291 | bool OwnTablesAndLogs() const override { | |
292 | // Currently, the secondary instance does not own the database files. It | |
293 | // simply opens the files of the primary instance and tracks their file | |
294 | // descriptors until they become obsolete. In the future, the secondary may | |
295 | // create links to database files. OwnTablesAndLogs will return true then. | |
296 | return false; | |
297 | } | |
298 | ||
299 | private: | |
300 | friend class DB; | |
301 | ||
302 | // No copying allowed | |
303 | DBImplSecondary(const DBImplSecondary&); | |
304 | void operator=(const DBImplSecondary&); | |
305 | ||
306 | using DBImpl::Recover; | |
307 | ||
308 | Status FindAndRecoverLogFiles( | |
309 | std::unordered_set<ColumnFamilyData*>* cfds_changed, | |
310 | JobContext* job_context); | |
311 | Status FindNewLogNumbers(std::vector<uint64_t>* logs); | |
312 | // After manifest recovery, replay WALs and refresh log_readers_ if necessary | |
313 | // REQUIRES: log_numbers are sorted in ascending order | |
314 | Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers, | |
315 | SequenceNumber* next_sequence, | |
316 | std::unordered_set<ColumnFamilyData*>* cfds_changed, | |
317 | JobContext* job_context); | |
318 | ||
319 | std::unique_ptr<log::FragmentBufferedReader> manifest_reader_; | |
320 | std::unique_ptr<log::Reader::Reporter> manifest_reporter_; | |
321 | std::unique_ptr<Status> manifest_reader_status_; | |
322 | ||
323 | // Cache log readers for each log number, used for continue WAL replay | |
324 | // after recovery | |
325 | std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_; | |
326 | ||
327 | // Current WAL number replayed for each column family. | |
328 | std::unordered_map<ColumnFamilyData*, uint64_t> cfd_to_current_log_; | |
329 | }; | |
330 | ||
331 | } // namespace ROCKSDB_NAMESPACE | |
332 | ||
333 | #endif // !ROCKSDB_LITE |