]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/memtable_list.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / db / memtable_list.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 #include "db/memtable_list.h"
7
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
10 #endif
11
12 #include <inttypes.h>
13 #include <string>
14 #include "db/memtable.h"
15 #include "db/version_set.h"
16 #include "monitoring/thread_status_util.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/env.h"
19 #include "rocksdb/iterator.h"
20 #include "table/merging_iterator.h"
21 #include "util/coding.h"
22 #include "util/log_buffer.h"
23 #include "util/sync_point.h"
24
25 namespace rocksdb {
26
27 class InternalKeyComparator;
28 class Mutex;
29 class VersionSet;
30
31 void MemTableListVersion::AddMemTable(MemTable* m) {
32 memlist_.push_front(m);
33 *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
34 }
35
36 void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
37 MemTable* m) {
38 if (m->Unref()) {
39 to_delete->push_back(m);
40 assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
41 *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
42 } else {
43 }
44 }
45
// Copy-constructs a new version from `old`, taking an extra reference on
// every memtable in both the live list and the history list so the new
// version keeps them alive independently.
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
    : max_write_buffer_number_to_maintain_(
          old->max_write_buffer_number_to_maintain_),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
  // NOTE(review): `old` is already dereferenced in the initializer list
  // above, so this null check is dead code — if `old` could ever be nullptr
  // that dereference is UB. The only caller in this file
  // (MemTableList::InstallNewVersion) passes a non-null pointer; confirm
  // before relying on nullptr being legal here.
  if (old != nullptr) {
    memlist_ = old->memlist_;
    for (auto& m : memlist_) {
      m->Ref();
    }

    memlist_history_ = old->memlist_history_;
    for (auto& m : memlist_history_) {
      m->Ref();
    }
  }
}
63
// Constructs an empty first version. `max_write_buffer_number_to_maintain`
// bounds how many memtables (live + flushed history) are kept in memory;
// see TrimHistory().
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int max_write_buffer_number_to_maintain)
    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}
69
70 void MemTableListVersion::Ref() { ++refs_; }
71
72 // called by superversion::clean()
73 void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
74 assert(refs_ >= 1);
75 --refs_;
76 if (refs_ == 0) {
77 // if to_delete is equal to nullptr it means we're confident
78 // that refs_ will not be zero
79 assert(to_delete != nullptr);
80 for (const auto& m : memlist_) {
81 UnrefMemTable(to_delete, m);
82 }
83 for (const auto& m : memlist_history_) {
84 UnrefMemTable(to_delete, m);
85 }
86 delete this;
87 }
88 }
89
90 int MemTableList::NumNotFlushed() const {
91 int size = static_cast<int>(current_->memlist_.size());
92 assert(num_flush_not_started_ <= size);
93 return size;
94 }
95
96 int MemTableList::NumFlushed() const {
97 return static_cast<int>(current_->memlist_history_.size());
98 }
99
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
//
// Thin forwarder: delegates to GetFromList() over the live list memlist_.
// *seq receives the sequence number of the most recent operation on the key
// (kMaxSequenceNumber if none was found).
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                              Status* s, MergeContext* merge_context,
                              RangeDelAggregator* range_del_agg,
                              SequenceNumber* seq,
                              const ReadOptions& read_opts) {
  return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg,
                     seq, read_opts);
}
111
// Same lookup as Get(), but over memlist_history_ — the memtables that have
// already been flushed yet are still retained in memory. Thin forwarder to
// GetFromList().
bool MemTableListVersion::GetFromHistory(const LookupKey& key,
                                         std::string* value, Status* s,
                                         MergeContext* merge_context,
                                         RangeDelAggregator* range_del_agg,
                                         SequenceNumber* seq,
                                         const ReadOptions& read_opts) {
  return GetFromList(&memlist_history_, key, value, s, merge_context,
                     range_del_agg, seq, read_opts);
}
121
122 bool MemTableListVersion::GetFromList(std::list<MemTable*>* list,
123 const LookupKey& key, std::string* value,
124 Status* s, MergeContext* merge_context,
125 RangeDelAggregator* range_del_agg,
126 SequenceNumber* seq,
127 const ReadOptions& read_opts) {
128 *seq = kMaxSequenceNumber;
129
130 for (auto& memtable : *list) {
131 SequenceNumber current_seq = kMaxSequenceNumber;
132
133 bool done = memtable->Get(key, value, s, merge_context, range_del_agg,
134 &current_seq, read_opts);
135 if (*seq == kMaxSequenceNumber) {
136 // Store the most recent sequence number of any operation on this key.
137 // Since we only care about the most recent change, we only need to
138 // return the first operation found when searching memtables in
139 // reverse-chronological order.
140 *seq = current_seq;
141 }
142
143 if (done) {
144 assert(*seq != kMaxSequenceNumber);
145 return true;
146 }
147 if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
148 return false;
149 }
150 }
151 return false;
152 }
153
154 Status MemTableListVersion::AddRangeTombstoneIterators(
155 const ReadOptions& read_opts, Arena* arena,
156 RangeDelAggregator* range_del_agg) {
157 assert(range_del_agg != nullptr);
158 for (auto& m : memlist_) {
159 std::unique_ptr<InternalIterator> range_del_iter(
160 m->NewRangeTombstoneIterator(read_opts));
161 Status s = range_del_agg->AddTombstones(std::move(range_del_iter));
162 if (!s.ok()) {
163 return s;
164 }
165 }
166 return Status::OK();
167 }
168
169 void MemTableListVersion::AddIterators(
170 const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
171 Arena* arena) {
172 for (auto& m : memlist_) {
173 iterator_list->push_back(m->NewIterator(options, arena));
174 }
175 }
176
177 void MemTableListVersion::AddIterators(
178 const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
179 for (auto& m : memlist_) {
180 merge_iter_builder->AddIterator(
181 m->NewIterator(options, merge_iter_builder->GetArena()));
182 }
183 }
184
185 uint64_t MemTableListVersion::GetTotalNumEntries() const {
186 uint64_t total_num = 0;
187 for (auto& m : memlist_) {
188 total_num += m->num_entries();
189 }
190 return total_num;
191 }
192
193 MemTable::MemTableStats MemTableListVersion::ApproximateStats(
194 const Slice& start_ikey, const Slice& end_ikey) {
195 MemTable::MemTableStats total_stats = {0, 0};
196 for (auto& m : memlist_) {
197 auto mStats = m->ApproximateStats(start_ikey, end_ikey);
198 total_stats.size += mStats.size;
199 total_stats.count += mStats.count;
200 }
201 return total_stats;
202 }
203
204 uint64_t MemTableListVersion::GetTotalNumDeletes() const {
205 uint64_t total_num = 0;
206 for (auto& m : memlist_) {
207 total_num += m->num_deletes();
208 }
209 return total_num;
210 }
211
212 SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
213 bool include_history) const {
214 if (include_history && !memlist_history_.empty()) {
215 return memlist_history_.back()->GetEarliestSequenceNumber();
216 } else if (!memlist_.empty()) {
217 return memlist_.back()->GetEarliestSequenceNumber();
218 } else {
219 return kMaxSequenceNumber;
220 }
221 }
222
223 // caller is responsible for referencing m
224 void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
225 assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
226 AddMemTable(m);
227
228 TrimHistory(to_delete);
229 }
230
231 // Removes m from list of memtables not flushed. Caller should NOT Unref m.
232 void MemTableListVersion::Remove(MemTable* m,
233 autovector<MemTable*>* to_delete) {
234 assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
235 memlist_.remove(m);
236
237 if (max_write_buffer_number_to_maintain_ > 0) {
238 memlist_history_.push_front(m);
239 TrimHistory(to_delete);
240 } else {
241 UnrefMemTable(to_delete, m);
242 }
243 }
244
245 // Make sure we don't use up too much space in history
246 void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
247 while (memlist_.size() + memlist_history_.size() >
248 static_cast<size_t>(max_write_buffer_number_to_maintain_) &&
249 !memlist_history_.empty()) {
250 MemTable* x = memlist_history_.back();
251 memlist_history_.pop_back();
252
253 UnrefMemTable(to_delete, x);
254 }
255 }
256
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
  // A flush is pending either because one was explicitly requested
  // (flush_requested_) and work remains, or because enough unflushed
  // memtables have accumulated to reach the merge threshold.
  if ((flush_requested_ && num_flush_not_started_ >= 1) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    // Invariant: whenever unflushed work is pending, the imm_flush_needed
    // flag must already be set (it is raised in MemTableList::Add and only
    // cleared in PickMemtablesToFlush once nothing is left to pick).
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}
267
268 // Returns the memtables that need to be flushed.
269 void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
270 AutoThreadOperationStageUpdater stage_updater(
271 ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
272 const auto& memlist = current_->memlist_;
273 for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
274 MemTable* m = *it;
275 if (!m->flush_in_progress_) {
276 assert(!m->flush_completed_);
277 num_flush_not_started_--;
278 if (num_flush_not_started_ == 0) {
279 imm_flush_needed.store(false, std::memory_order_release);
280 }
281 m->flush_in_progress_ = true; // flushing will start very soon
282 ret->push_back(m);
283 }
284 }
285 flush_requested_ = false; // start-flush request is complete
286 }
287
288 void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
289 uint64_t file_number) {
290 AutoThreadOperationStageUpdater stage_updater(
291 ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
292 assert(!mems.empty());
293
294 // If the flush was not successful, then just reset state.
295 // Maybe a succeeding attempt to flush will be successful.
296 for (MemTable* m : mems) {
297 assert(m->flush_in_progress_);
298 assert(m->file_number_ == 0);
299
300 m->flush_in_progress_ = false;
301 m->flush_completed_ = false;
302 m->edit_.Clear();
303 num_flush_not_started_++;
304 }
305 imm_flush_needed.store(true, std::memory_order_release);
306 }
307
// Record a successful flush in the manifest file
//
// Marks every memtable in `mems` flush-completed with `file_number`, then —
// unless another thread already holds the commit — repeatedly commits every
// fully-flushed batch at the old end of the list to the manifest via
// VersionSet::LogAndApply. On commit success the memtables are moved to
// history (or freed); on failure their state is reset so the flush can be
// retried. Requires `mu` held on entry; LogAndApply may release and
// reacquire it, which is why the outer loop re-examines the list.
Status MemTableList::InstallMemtableFlushResults(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu,
    uint64_t file_number, autovector<MemTable*>* to_delete,
    Directory* db_directory, LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  // flush was successful
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);

    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // if some other thread is already committing, then return
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing manifest where mutex is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // Stop when the oldest memtable has not finished flushing: commits
    // must happen strictly in creation order.
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memetables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    // enumerate from the last (earliest) element to see how many batch finished
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      // Memtables flushed into the same output file share a file_number_
      // and contribute a single VersionEdit (held by the first of them).
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64 " started",
                         cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
      }
      batch_count++;
    }

    if (batch_count > 0) {
      // this can release and reacquire the mutex.
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);

      // we will be changing the version in the next code path,
      // so we better create a new one, since versions are immutable
      InstallNewVersion();

      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.
      if (s.ok()) {         // commit new state
        // Pop the committed memtables off the old end of the list; Remove()
        // moves each into history (or frees it) under the budget rules.
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " done",
                           cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          ++mem_id;
        }
      } else {
        // Commit failed: restore every memtable in the batch to the
        // "flush pending" state so a later attempt can flush them again.
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
          MemTable* m = *it;
          // commit failed. setup state so that we can flush again.
          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " failed",
                           m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}
409
// New memtables are inserted at the front of the list.
//
// Moves the (formerly mutable) memtable m into the immutable list of the
// current version, marking it immutable and updating flush bookkeeping.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
  // Copy-on-write: make sure current_ is privately owned before mutating.
  InstallNewVersion();
  // this method is used to move mutable memtable into an immutable list.
  // since mutable memtable is already refcounted by the DBImpl,
  // and when moving to the imutable list we don't unref it,
  // we don't have to ref the memtable here. we just take over the
  // reference from the DBImpl.
  current_->Add(m, to_delete);
  m->MarkImmutable();
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    // First unflushed memtable: raise the flag IsFlushPending() asserts on.
    imm_flush_needed.store(true, std::memory_order_release);
  }
}
426
427 // Returns an estimate of the number of bytes of data in use.
428 size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
429 size_t total_size = 0;
430 for (auto& memtable : current_->memlist_) {
431 total_size += memtable->ApproximateMemoryUsage();
432 }
433 return total_size;
434 }
435
436 size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
437
438 void MemTableList::InstallNewVersion() {
439 if (current_->refs_ == 1) {
440 // we're the only one using the version, just keep using it
441 } else {
442 // somebody else holds the current version, we need to create new one
443 MemTableListVersion* version = current_;
444 current_ = new MemTableListVersion(&current_memory_usage_, current_);
445 current_->Ref();
446 version->Unref();
447 }
448 }
449
450 uint64_t MemTableList::GetMinLogContainingPrepSection() {
451 uint64_t min_log = 0;
452
453 for (auto& m : current_->memlist_) {
454 // this mem has been flushed it no longer
455 // needs to hold on the its prep section
456 if (m->flush_completed_) {
457 continue;
458 }
459
460 auto log = m->GetMinLogContainingPrepSection();
461
462 if (log > 0 && (min_log == 0 || log < min_log)) {
463 min_log = log;
464 }
465 }
466
467 return min_log;
468 }
469
470 } // namespace rocksdb