]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_impl_secondary.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / db_impl_secondary.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/db_impl_secondary.h"
7 #include "db/db_iter.h"
8 #include "db/merge_context.h"
9 #include "monitoring/perf_context_imp.h"
10 #include "util/auto_roll_logger.h"
11
12 namespace rocksdb {
13
14 #ifndef ROCKSDB_LITE
15
16 DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
17 const std::string& dbname)
18 : DBImpl(db_options, dbname) {
19 ROCKS_LOG_INFO(immutable_db_options_.info_log,
20 "Opening the db in secondary mode");
21 LogFlush(immutable_db_options_.info_log);
22 }
23
24 DBImplSecondary::~DBImplSecondary() {}
25
26 Status DBImplSecondary::Recover(
27 const std::vector<ColumnFamilyDescriptor>& column_families,
28 bool /*readonly*/, bool /*error_if_log_file_exist*/,
29 bool /*error_if_data_exists_in_logs*/) {
30 mutex_.AssertHeld();
31
32 Status s;
33 s = static_cast<ReactiveVersionSet*>(versions_.get())
34 ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
35 &manifest_reader_status_);
36 if (!s.ok()) {
37 return s;
38 }
39 if (immutable_db_options_.paranoid_checks && s.ok()) {
40 s = CheckConsistency();
41 }
42 // Initial max_total_in_memory_state_ before recovery logs.
43 max_total_in_memory_state_ = 0;
44 for (auto cfd : *versions_->GetColumnFamilySet()) {
45 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
46 max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
47 mutable_cf_options->max_write_buffer_number;
48 }
49 if (s.ok()) {
50 default_cf_handle_ = new ColumnFamilyHandleImpl(
51 versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
52 default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
53 single_column_family_mode_ =
54 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
55 }
56
57 // TODO: attempt to recover from WAL files.
58 return s;
59 }
60
61 // Implementation of the DB interface
62 Status DBImplSecondary::Get(const ReadOptions& read_options,
63 ColumnFamilyHandle* column_family, const Slice& key,
64 PinnableSlice* value) {
65 return GetImpl(read_options, column_family, key, value);
66 }
67
68 Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
69 ColumnFamilyHandle* column_family,
70 const Slice& key, PinnableSlice* pinnable_val) {
71 assert(pinnable_val != nullptr);
72 PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
73 StopWatch sw(env_, stats_, DB_GET);
74 PERF_TIMER_GUARD(get_snapshot_time);
75
76 auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
77 ColumnFamilyData* cfd = cfh->cfd();
78 if (tracer_) {
79 InstrumentedMutexLock lock(&trace_mutex_);
80 if (tracer_) {
81 tracer_->Get(column_family, key);
82 }
83 }
84 // Acquire SuperVersion
85 SuperVersion* super_version = GetAndRefSuperVersion(cfd);
86 SequenceNumber snapshot = versions_->LastSequence();
87 MergeContext merge_context;
88 SequenceNumber max_covering_tombstone_seq = 0;
89 Status s;
90 LookupKey lkey(key, snapshot);
91 PERF_TIMER_STOP(get_snapshot_time);
92
93 bool done = false;
94 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
95 &max_covering_tombstone_seq, read_options)) {
96 done = true;
97 pinnable_val->PinSelf();
98 RecordTick(stats_, MEMTABLE_HIT);
99 } else if ((s.ok() || s.IsMergeInProgress()) &&
100 super_version->imm->Get(
101 lkey, pinnable_val->GetSelf(), &s, &merge_context,
102 &max_covering_tombstone_seq, read_options)) {
103 done = true;
104 pinnable_val->PinSelf();
105 RecordTick(stats_, MEMTABLE_HIT);
106 }
107 if (!done && !s.ok() && !s.IsMergeInProgress()) {
108 ReturnAndCleanupSuperVersion(cfd, super_version);
109 return s;
110 }
111 if (!done) {
112 PERF_TIMER_GUARD(get_from_output_files_time);
113 super_version->current->Get(read_options, lkey, pinnable_val, &s,
114 &merge_context, &max_covering_tombstone_seq);
115 RecordTick(stats_, MEMTABLE_MISS);
116 }
117 {
118 PERF_TIMER_GUARD(get_post_process_time);
119 ReturnAndCleanupSuperVersion(cfd, super_version);
120 RecordTick(stats_, NUMBER_KEYS_READ);
121 size_t size = pinnable_val->size();
122 RecordTick(stats_, BYTES_READ, size);
123 RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
124 PERF_COUNTER_ADD(get_read_bytes, size);
125 }
126 return s;
127 }
128
129 Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
130 ColumnFamilyHandle* column_family) {
131 if (read_options.managed) {
132 return NewErrorIterator(
133 Status::NotSupported("Managed iterator is not supported anymore."));
134 }
135 if (read_options.read_tier == kPersistedTier) {
136 return NewErrorIterator(Status::NotSupported(
137 "ReadTier::kPersistedData is not yet supported in iterators."));
138 }
139 Iterator* result = nullptr;
140 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
141 auto cfd = cfh->cfd();
142 ReadCallback* read_callback = nullptr; // No read callback provided.
143 if (read_options.tailing) {
144 return NewErrorIterator(Status::NotSupported(
145 "tailing iterator not supported in secondary mode"));
146 } else if (read_options.snapshot != nullptr) {
147 // TODO (yanqin) support snapshot.
148 return NewErrorIterator(
149 Status::NotSupported("snapshot not supported in secondary mode"));
150 } else {
151 auto snapshot = versions_->LastSequence();
152 result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
153 }
154 return result;
155 }
156
157 ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
158 const ReadOptions& read_options, ColumnFamilyData* cfd,
159 SequenceNumber snapshot, ReadCallback* read_callback) {
160 assert(nullptr != cfd);
161 SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
162 auto db_iter = NewArenaWrappedDbIterator(
163 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
164 snapshot,
165 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
166 super_version->version_number, read_callback);
167 auto internal_iter =
168 NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
169 db_iter->GetRangeDelAggregator(), snapshot);
170 db_iter->SetIterUnderDBIter(internal_iter);
171 return db_iter;
172 }
173
174 Status DBImplSecondary::NewIterators(
175 const ReadOptions& read_options,
176 const std::vector<ColumnFamilyHandle*>& column_families,
177 std::vector<Iterator*>* iterators) {
178 if (read_options.managed) {
179 return Status::NotSupported("Managed iterator is not supported anymore.");
180 }
181 if (read_options.read_tier == kPersistedTier) {
182 return Status::NotSupported(
183 "ReadTier::kPersistedData is not yet supported in iterators.");
184 }
185 ReadCallback* read_callback = nullptr; // No read callback provided.
186 if (iterators == nullptr) {
187 return Status::InvalidArgument("iterators not allowed to be nullptr");
188 }
189 iterators->clear();
190 iterators->reserve(column_families.size());
191 if (read_options.tailing) {
192 return Status::NotSupported(
193 "tailing iterator not supported in secondary mode");
194 } else if (read_options.snapshot != nullptr) {
195 // TODO (yanqin) support snapshot.
196 return Status::NotSupported("snapshot not supported in secondary mode");
197 } else {
198 SequenceNumber read_seq = versions_->LastSequence();
199 for (auto cfh : column_families) {
200 ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
201 iterators->push_back(
202 NewIteratorImpl(read_options, cfd, read_seq, read_callback));
203 }
204 }
205 return Status::OK();
206 }
207
208 Status DBImplSecondary::TryCatchUpWithPrimary() {
209 assert(versions_.get() != nullptr);
210 assert(manifest_reader_.get() != nullptr);
211 Status s;
212 std::unordered_set<ColumnFamilyData*> cfds_changed;
213 InstrumentedMutexLock lock_guard(&mutex_);
214 s = static_cast<ReactiveVersionSet*>(versions_.get())
215 ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
216 if (s.ok()) {
217 SuperVersionContext sv_context(true /* create_superversion */);
218 for (auto cfd : cfds_changed) {
219 sv_context.NewSuperVersion();
220 cfd->InstallSuperVersion(&sv_context, &mutex_);
221 }
222 sv_context.Clean();
223 }
224 return s;
225 }
226
227 Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
228 const std::string& secondary_path, DB** dbptr) {
229 *dbptr = nullptr;
230
231 DBOptions db_options(options);
232 ColumnFamilyOptions cf_options(options);
233 std::vector<ColumnFamilyDescriptor> column_families;
234 column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
235 std::vector<ColumnFamilyHandle*> handles;
236
237 Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
238 column_families, &handles, dbptr);
239 if (s.ok()) {
240 assert(handles.size() == 1);
241 delete handles[0];
242 }
243 return s;
244 }
245
246 Status DB::OpenAsSecondary(
247 const DBOptions& db_options, const std::string& dbname,
248 const std::string& secondary_path,
249 const std::vector<ColumnFamilyDescriptor>& column_families,
250 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
251 *dbptr = nullptr;
252 if (db_options.max_open_files != -1) {
253 // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
254 // on SST files so that db secondary can still have access to old SSTs
255 // while primary instance may delete original.
256 return Status::InvalidArgument("require max_open_files to be -1");
257 }
258
259 DBOptions tmp_opts(db_options);
260 if (nullptr == tmp_opts.info_log) {
261 Env* env = tmp_opts.env;
262 assert(env != nullptr);
263 std::string secondary_abs_path;
264 env->GetAbsolutePath(secondary_path, &secondary_abs_path);
265 std::string fname = InfoLogFileName(secondary_path, secondary_abs_path,
266 tmp_opts.db_log_dir);
267
268 env->CreateDirIfMissing(secondary_path);
269 if (tmp_opts.log_file_time_to_roll > 0 || tmp_opts.max_log_file_size > 0) {
270 AutoRollLogger* result = new AutoRollLogger(
271 env, secondary_path, tmp_opts.db_log_dir, tmp_opts.max_log_file_size,
272 tmp_opts.log_file_time_to_roll, tmp_opts.info_log_level);
273 Status s = result->GetStatus();
274 if (!s.ok()) {
275 delete result;
276 } else {
277 tmp_opts.info_log.reset(result);
278 }
279 }
280 if (nullptr == tmp_opts.info_log) {
281 env->RenameFile(
282 fname, OldInfoLogFileName(secondary_path, env->NowMicros(),
283 secondary_abs_path, tmp_opts.db_log_dir));
284 Status s = env->NewLogger(fname, &(tmp_opts.info_log));
285 if (tmp_opts.info_log != nullptr) {
286 tmp_opts.info_log->SetInfoLogLevel(tmp_opts.info_log_level);
287 }
288 }
289 }
290
291 assert(tmp_opts.info_log != nullptr);
292
293 handles->clear();
294 DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname);
295 impl->versions_.reset(new ReactiveVersionSet(
296 dbname, &impl->immutable_db_options_, impl->env_options_,
297 impl->table_cache_.get(), impl->write_buffer_manager_,
298 &impl->write_controller_));
299 impl->column_family_memtables_.reset(
300 new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
301 impl->mutex_.Lock();
302 Status s = impl->Recover(column_families, true, false, false);
303 if (s.ok()) {
304 for (auto cf : column_families) {
305 auto cfd =
306 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
307 if (nullptr == cfd) {
308 s = Status::InvalidArgument("Column family not found: ", cf.name);
309 break;
310 }
311 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
312 }
313 }
314 SuperVersionContext sv_context(true /* create_superversion */);
315 if (s.ok()) {
316 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
317 sv_context.NewSuperVersion();
318 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
319 }
320 }
321 impl->mutex_.Unlock();
322 sv_context.Clean();
323 if (s.ok()) {
324 *dbptr = impl;
325 for (auto h : *handles) {
326 impl->NewThreadStatusCfInfo(
327 reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
328 }
329 } else {
330 for (auto h : *handles) {
331 delete h;
332 }
333 handles->clear();
334 delete impl;
335 }
336 return s;
337 }
338 #else // !ROCKSDB_LITE
339
340 Status DB::OpenAsSecondary(const Options& /*options*/,
341 const std::string& /*name*/,
342 const std::string& /*secondary_path*/,
343 DB** /*dbptr*/) {
344 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
345 }
346
347 Status DB::OpenAsSecondary(
348 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
349 const std::string& /*secondary_path*/,
350 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
351 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) {
352 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
353 }
354 #endif // !ROCKSDB_LITE
355
356 } // namespace rocksdb