]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/memtable_list.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / memtable_list.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6#include "db/memtable_list.h"
7
f67539c2 8#include <cinttypes>
11fdf7f2 9#include <limits>
494da23a 10#include <queue>
7c673cae 11#include <string>
f67539c2 12#include "db/db_impl/db_impl.h"
7c673cae 13#include "db/memtable.h"
494da23a 14#include "db/range_tombstone_fragmenter.h"
7c673cae 15#include "db/version_set.h"
f67539c2 16#include "logging/log_buffer.h"
7c673cae
FG
17#include "monitoring/thread_status_util.h"
18#include "rocksdb/db.h"
19#include "rocksdb/env.h"
20#include "rocksdb/iterator.h"
21#include "table/merging_iterator.h"
f67539c2 22#include "test_util/sync_point.h"
7c673cae 23#include "util/coding.h"
7c673cae 24
f67539c2 25namespace ROCKSDB_NAMESPACE {
7c673cae
FG
26
27class InternalKeyComparator;
28class Mutex;
29class VersionSet;
30
31void MemTableListVersion::AddMemTable(MemTable* m) {
32 memlist_.push_front(m);
33 *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
34}
35
36void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
37 MemTable* m) {
38 if (m->Unref()) {
39 to_delete->push_back(m);
40 assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
41 *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
7c673cae
FG
42 }
43}
44
// Construct a new version as a copy of `old`, taking an extra reference on
// every memtable in both the unflushed list and the flushed history so the
// two versions co-own them.
//
// NOTE(review): the member initializers dereference `old` unconditionally,
// yet the body guards on `old != nullptr` — the visible caller
// (MemTableList::InstallNewVersion) always passes a non-null version, so the
// null check looks like dead code; confirm before relying on it.
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
    : max_write_buffer_number_to_maintain_(
          old->max_write_buffer_number_to_maintain_),
      max_write_buffer_size_to_maintain_(
          old->max_write_buffer_size_to_maintain_),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
  if (old != nullptr) {
    memlist_ = old->memlist_;
    for (auto& m : memlist_) {
      m->Ref();  // this version now co-owns the memtable
    }

    memlist_history_ = old->memlist_history_;
    for (auto& m : memlist_history_) {
      m->Ref();
    }
  }
}
64
// Construct an empty version with the given history-retention limits.
// `parent_memtable_list_memory_usage` points at the owning MemTableList's
// shared memory counter, which AddMemTable/UnrefMemTable keep up to date.
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int max_write_buffer_number_to_maintain,
    int64_t max_write_buffer_size_to_maintain)
    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
      max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}
72
73void MemTableListVersion::Ref() { ++refs_; }
74
75// called by superversion::clean()
76void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
77 assert(refs_ >= 1);
78 --refs_;
79 if (refs_ == 0) {
80 // if to_delete is equal to nullptr it means we're confident
81 // that refs_ will not be zero
82 assert(to_delete != nullptr);
83 for (const auto& m : memlist_) {
84 UnrefMemTable(to_delete, m);
85 }
86 for (const auto& m : memlist_history_) {
87 UnrefMemTable(to_delete, m);
88 }
89 delete this;
90 }
91}
92
93int MemTableList::NumNotFlushed() const {
94 int size = static_cast<int>(current_->memlist_.size());
95 assert(num_flush_not_started_ <= size);
96 return size;
97}
98
99int MemTableList::NumFlushed() const {
100 return static_cast<int>(current_->memlist_history_.size());
101}
102
103// Search all the memtables starting from the most recent one.
104// Return the most recent value found, if any.
105// Operands stores the list of merge operations to apply, so far.
106bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
107 Status* s, MergeContext* merge_context,
494da23a 108 SequenceNumber* max_covering_tombstone_seq,
11fdf7f2
TL
109 SequenceNumber* seq, const ReadOptions& read_opts,
110 ReadCallback* callback, bool* is_blob_index) {
494da23a
TL
111 return GetFromList(&memlist_, key, value, s, merge_context,
112 max_covering_tombstone_seq, seq, read_opts, callback,
113 is_blob_index);
7c673cae
FG
114}
115
f67539c2
TL
116void MemTableListVersion::MultiGet(const ReadOptions& read_options,
117 MultiGetRange* range, ReadCallback* callback,
118 bool* is_blob) {
119 for (auto memtable : memlist_) {
120 memtable->MultiGet(read_options, range, callback, is_blob);
121 if (range->empty()) {
122 return;
123 }
124 }
125}
126
127bool MemTableListVersion::GetMergeOperands(
128 const LookupKey& key, Status* s, MergeContext* merge_context,
129 SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
130 for (MemTable* memtable : memlist_) {
131 bool done = memtable->Get(key, nullptr, s, merge_context,
132 max_covering_tombstone_seq, read_opts, nullptr,
133 nullptr, false);
134 if (done) {
135 return true;
136 }
137 }
138 return false;
139}
140
11fdf7f2
TL
141bool MemTableListVersion::GetFromHistory(
142 const LookupKey& key, std::string* value, Status* s,
494da23a 143 MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
11fdf7f2 144 SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
7c673cae 145 return GetFromList(&memlist_history_, key, value, s, merge_context,
494da23a
TL
146 max_covering_tombstone_seq, seq, read_opts,
147 nullptr /*read_callback*/, is_blob_index);
7c673cae
FG
148}
149
11fdf7f2
TL
// Walk `list` from the newest memtable to the oldest, looking up `key`.
// On a conclusive hit (returns true) *value / *s reflect the newest visible
// entry and *seq holds its sequence number. Returns false when the key was
// not found anywhere, or on a hard error — any status other than
// ok / MergeInProgress / NotFound aborts the scan.
bool MemTableListVersion::GetFromList(
    std::list<MemTable*>* list, const LookupKey& key, std::string* value,
    Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
  // kMaxSequenceNumber doubles as "no sequence recorded yet".
  *seq = kMaxSequenceNumber;

  for (auto& memtable : *list) {
    SequenceNumber current_seq = kMaxSequenceNumber;

    bool done =
        memtable->Get(key, value, s, merge_context, max_covering_tombstone_seq,
                      &current_seq, read_opts, callback, is_blob_index);
    if (*seq == kMaxSequenceNumber) {
      // Store the most recent sequence number of any operation on this key.
      // Since we only care about the most recent change, we only need to
      // return the first operation found when searching memtables in
      // reverse-chronological order.
      // current_seq would be equal to kMaxSequenceNumber if the value was to be
      // skipped. This allows seq to be assigned again when the next value is
      // read.
      *seq = current_seq;
    }

    if (done) {
      // A conclusive result must carry a sequence number unless it is a
      // NotFound produced by a deletion.
      assert(*seq != kMaxSequenceNumber || s->IsNotFound());
      return true;
    }
    if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
      return false;  // hard error: stop searching older memtables
    }
  }
  return false;
}
184
185Status MemTableListVersion::AddRangeTombstoneIterators(
11fdf7f2 186 const ReadOptions& read_opts, Arena* /*arena*/,
7c673cae
FG
187 RangeDelAggregator* range_del_agg) {
188 assert(range_del_agg != nullptr);
f67539c2
TL
189 // Except for snapshot read, using kMaxSequenceNumber is OK because these
190 // are immutable memtables.
191 SequenceNumber read_seq = read_opts.snapshot != nullptr
192 ? read_opts.snapshot->GetSequenceNumber()
193 : kMaxSequenceNumber;
7c673cae 194 for (auto& m : memlist_) {
494da23a 195 std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
f67539c2 196 m->NewRangeTombstoneIterator(read_opts, read_seq));
494da23a 197 range_del_agg->AddTombstones(std::move(range_del_iter));
11fdf7f2
TL
198 }
199 return Status::OK();
200}
201
7c673cae
FG
202void MemTableListVersion::AddIterators(
203 const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
204 Arena* arena) {
205 for (auto& m : memlist_) {
206 iterator_list->push_back(m->NewIterator(options, arena));
207 }
208}
209
210void MemTableListVersion::AddIterators(
211 const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
212 for (auto& m : memlist_) {
213 merge_iter_builder->AddIterator(
214 m->NewIterator(options, merge_iter_builder->GetArena()));
215 }
216}
217
218uint64_t MemTableListVersion::GetTotalNumEntries() const {
219 uint64_t total_num = 0;
220 for (auto& m : memlist_) {
221 total_num += m->num_entries();
222 }
223 return total_num;
224}
225
226MemTable::MemTableStats MemTableListVersion::ApproximateStats(
227 const Slice& start_ikey, const Slice& end_ikey) {
228 MemTable::MemTableStats total_stats = {0, 0};
229 for (auto& m : memlist_) {
230 auto mStats = m->ApproximateStats(start_ikey, end_ikey);
231 total_stats.size += mStats.size;
232 total_stats.count += mStats.count;
233 }
234 return total_stats;
235}
236
237uint64_t MemTableListVersion::GetTotalNumDeletes() const {
238 uint64_t total_num = 0;
239 for (auto& m : memlist_) {
240 total_num += m->num_deletes();
241 }
242 return total_num;
243}
244
245SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
246 bool include_history) const {
247 if (include_history && !memlist_history_.empty()) {
248 return memlist_history_.back()->GetEarliestSequenceNumber();
249 } else if (!memlist_.empty()) {
250 return memlist_.back()->GetEarliestSequenceNumber();
251 } else {
252 return kMaxSequenceNumber;
253 }
254}
255
256// caller is responsible for referencing m
257void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
258 assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
259 AddMemTable(m);
260
f67539c2 261 TrimHistory(to_delete, m->ApproximateMemoryUsage());
7c673cae
FG
262}
263
264// Removes m from list of memtables not flushed. Caller should NOT Unref m.
265void MemTableListVersion::Remove(MemTable* m,
266 autovector<MemTable*>* to_delete) {
267 assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
268 memlist_.remove(m);
269
11fdf7f2 270 m->MarkFlushed();
f67539c2
TL
271 if (max_write_buffer_size_to_maintain_ > 0 ||
272 max_write_buffer_number_to_maintain_ > 0) {
7c673cae 273 memlist_history_.push_front(m);
f67539c2
TL
274 // Unable to get size of mutable memtable at this point, pass 0 to
275 // TrimHistory as a best effort.
276 TrimHistory(to_delete, 0);
7c673cae
FG
277 } else {
278 UnrefMemTable(to_delete, m);
279 }
280}
281
f67539c2
TL
282// return the total memory usage assuming the oldest flushed memtable is dropped
283size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() const {
284 size_t total_memtable_size = 0;
285 for (auto& memtable : memlist_) {
286 total_memtable_size += memtable->ApproximateMemoryUsage();
287 }
288 for (auto& memtable : memlist_history_) {
289 total_memtable_size += memtable->ApproximateMemoryUsage();
290 }
291 if (!memlist_history_.empty()) {
292 total_memtable_size -= memlist_history_.back()->ApproximateMemoryUsage();
293 }
294 return total_memtable_size;
295}
296
297bool MemTableListVersion::MemtableLimitExceeded(size_t usage) {
298 if (max_write_buffer_size_to_maintain_ > 0) {
299 // calculate the total memory usage after dropping the oldest flushed
300 // memtable, compare with max_write_buffer_size_to_maintain_ to decide
301 // whether to trim history
302 return ApproximateMemoryUsageExcludingLast() + usage >=
303 static_cast<size_t>(max_write_buffer_size_to_maintain_);
304 } else if (max_write_buffer_number_to_maintain_ > 0) {
305 return memlist_.size() + memlist_history_.size() >
306 static_cast<size_t>(max_write_buffer_number_to_maintain_);
307 } else {
308 return false;
309 }
310}
311
7c673cae 312// Make sure we don't use up too much space in history
f67539c2
TL
313void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete,
314 size_t usage) {
315 while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) {
7c673cae
FG
316 MemTable* x = memlist_history_.back();
317 memlist_history_.pop_back();
318
319 UnrefMemTable(to_delete, x);
320 }
321}
322
323// Returns true if there is at least one memtable on which flush has
324// not yet started.
325bool MemTableList::IsFlushPending() const {
494da23a 326 if ((flush_requested_ && num_flush_not_started_ > 0) ||
7c673cae
FG
327 (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
328 assert(imm_flush_needed.load(std::memory_order_relaxed));
329 return true;
330 }
331 return false;
332}
333
// Returns the memtables that need to be flushed.
// Scans from the oldest immutable memtable forward, appending to `ret` every
// memtable whose flush has not yet started (marking it in-progress), and
// stops at the first memtable newer than *max_memtable_id when provided.
// Updates num_flush_not_started_ / imm_flush_needed as a side effect.
void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
                                        autovector<MemTable*>* ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  // Whether any scanned memtable belongs to an atomic flush group.
  bool atomic_flush = false;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* m = *it;
    if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
      atomic_flush = true;
    }
    if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
      break;  // caller only wants memtables up to this id
    }
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        // No unstarted flushes remain; clear the scheduler flag.
        imm_flush_needed.store(false, std::memory_order_release);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      ret->push_back(m);
    }
  }
  // For atomic flush, keep the request pending while unstarted memtables
  // remain so the rest of the group can still be picked.
  if (!atomic_flush || num_flush_not_started_ == 0) {
    flush_requested_ = false;  // start-flush request is complete
  }
}
363
364void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
11fdf7f2 365 uint64_t /*file_number*/) {
7c673cae
FG
366 AutoThreadOperationStageUpdater stage_updater(
367 ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
368 assert(!mems.empty());
369
370 // If the flush was not successful, then just reset state.
371 // Maybe a succeeding attempt to flush will be successful.
372 for (MemTable* m : mems) {
373 assert(m->flush_in_progress_);
374 assert(m->file_number_ == 0);
375
376 m->flush_in_progress_ = false;
377 m->flush_completed_ = false;
378 m->edit_.Clear();
379 num_flush_not_started_++;
380 }
381 imm_flush_needed.store(true, std::memory_order_release);
382}
383
494da23a
TL
// Try record a successful flush in the manifest file. It might just return
// Status::OK letting a concurrent flush to do actual the recording..
//
// Marks `mems` flush-completed, then — if no other thread is committing —
// repeatedly commits every contiguous run of completed flushes at the old
// end of the list to the MANIFEST, in creation order. LogAndApply may
// release and reacquire `mu`, which is why the outer loop re-checks state.
Status MemTableList::TryInstallMemtableFlushResults(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
    VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer,
    std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  // Flush was successful
  // Record the status on the memtable object. Either this call or a call by a
  // concurrent flush thread will read the status and write it to manifest.
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);

    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // if some other thread is already committing, then return
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing manifest where mutex is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // The back is the oldest; if flush_completed_ is not set to it, it means
    // that we were assigned a more recent memtable. The memtables' flushes must
    // be recorded in manifest in order. A concurrent flush thread, who is
    // assigned to flush the oldest memtable, will later wake up and does all
    // the pending writes to manifest, in order.
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memtables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    autovector<MemTable*> memtables_to_flush;
    // enumerate from the last (earliest) element to see how many batch finished
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      // One edit (and one FlushJobInfo) per output file; memtables sharing
      // file_number_ belong to the same batch.
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64 " started",
                         cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
        memtables_to_flush.push_back(m);
#ifndef ROCKSDB_LITE
        std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
        if (info != nullptr) {
          committed_flush_jobs_info->push_back(std::move(info));
        }
#else
        (void)committed_flush_jobs_info;
#endif  // !ROCKSDB_LITE
      }
      batch_count++;
    }

    // TODO(myabandeh): Not sure how batch_count could be 0 here.
    if (batch_count > 0) {
      if (vset->db_options()->allow_2pc) {
        assert(edit_list.size() > 0);
        // We piggyback the information of earliest log file to keep in the
        // manifest entry for the last file flushed.
        edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
            vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
      }

      // this can release and reacquire the mutex.
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);

      // we will be changing the version in the next code path,
      // so we better create a new one, since versions are immutable
      InstallNewVersion();

      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.

      // commit new state only if the column family is NOT dropped.
      // The reason is as follows (refer to
      // ColumnFamilyTest.FlushAndDropRaceCondition).
      // If the column family is dropped, then according to LogAndApply, its
      // corresponding flush operation is NOT written to the MANIFEST. This
      // means the DB is not aware of the L0 files generated from the flush.
      // By committing the new state, we remove the memtable from the memtable
      // list. Creating an iterator on this column family will not be able to
      // read full data since the memtable is removed, and the DB is not aware
      // of the L0 files, causing MergingIterator unable to build child
      // iterators. RocksDB contract requires that the iterator can be created
      // on a dropped column family, and we must be able to
      // read full data as long as column family handle is not deleted, even if
      // the column family is dropped.
      if (s.ok() && !cfd->IsDropped()) {  // commit new state
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " done",
                           cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          UpdateCachedValuesFromMemTableListVersion();
          ResetTrimHistoryNeeded();
          ++mem_id;
        }
      } else {
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
          MemTable* m = *it;
          // commit failed. setup state so that we can flush again.
          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " failed",
                           m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}
531
532// New memtables are inserted at the front of the list.
533void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
534 assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
535 InstallNewVersion();
536 // this method is used to move mutable memtable into an immutable list.
537 // since mutable memtable is already refcounted by the DBImpl,
538 // and when moving to the imutable list we don't unref it,
539 // we don't have to ref the memtable here. we just take over the
540 // reference from the DBImpl.
541 current_->Add(m, to_delete);
542 m->MarkImmutable();
543 num_flush_not_started_++;
544 if (num_flush_not_started_ == 1) {
545 imm_flush_needed.store(true, std::memory_order_release);
546 }
f67539c2
TL
547 UpdateCachedValuesFromMemTableListVersion();
548 ResetTrimHistoryNeeded();
549}
550
// Trim flushed-memtable history on the current version, installing a fresh
// (copy-on-write) version first so readers of the old one are unaffected,
// then refresh the cached summary values.
void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
  InstallNewVersion();
  current_->TrimHistory(to_delete, usage);
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}
557
558// Returns an estimate of the number of bytes of data in use.
559size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
560 size_t total_size = 0;
561 for (auto& memtable : current_->memlist_) {
562 total_size += memtable->ApproximateMemoryUsage();
563 }
564 return total_size;
565}
566
// Total memory charged to this list (maintained incrementally as memtables
// are added/unreffed).
size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
568
f67539c2
TL
569size_t MemTableList::ApproximateMemoryUsageExcludingLast() const {
570 const size_t usage =
571 current_memory_usage_excluding_last_.load(std::memory_order_relaxed);
572 return usage;
573}
574
575bool MemTableList::HasHistory() const {
576 const bool has_history = current_has_history_.load(std::memory_order_relaxed);
577 return has_history;
578}
579
580void MemTableList::UpdateCachedValuesFromMemTableListVersion() {
581 const size_t total_memtable_size =
582 current_->ApproximateMemoryUsageExcludingLast();
583 current_memory_usage_excluding_last_.store(total_memtable_size,
584 std::memory_order_relaxed);
585
586 const bool has_history = current_->HasHistory();
587 current_has_history_.store(has_history, std::memory_order_relaxed);
588}
589
11fdf7f2
TL
590uint64_t MemTableList::ApproximateOldestKeyTime() const {
591 if (!current_->memlist_.empty()) {
592 return current_->memlist_.back()->ApproximateOldestKeyTime();
593 }
594 return std::numeric_limits<uint64_t>::max();
595}
596
7c673cae
FG
597void MemTableList::InstallNewVersion() {
598 if (current_->refs_ == 1) {
599 // we're the only one using the version, just keep using it
600 } else {
601 // somebody else holds the current version, we need to create new one
602 MemTableListVersion* version = current_;
603 current_ = new MemTableListVersion(&current_memory_usage_, current_);
604 current_->Ref();
605 version->Unref();
606 }
607}
608
11fdf7f2
TL
609uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
610 const autovector<MemTable*>& memtables_to_flush) {
7c673cae
FG
611 uint64_t min_log = 0;
612
613 for (auto& m : current_->memlist_) {
11fdf7f2
TL
614 // Assume the list is very short, we can live with O(m*n). We can optimize
615 // if the performance has some problem.
616 bool should_skip = false;
617 for (MemTable* m_to_flush : memtables_to_flush) {
618 if (m == m_to_flush) {
619 should_skip = true;
620 break;
621 }
622 }
623 if (should_skip) {
7c673cae
FG
624 continue;
625 }
626
627 auto log = m->GetMinLogContainingPrepSection();
628
629 if (log > 0 && (min_log == 0 || log < min_log)) {
630 min_log = log;
631 }
632 }
633
634 return min_log;
635}
636
494da23a
TL
// Commit a successful atomic flush in the manifest file.
// All column families' flush results are written to the MANIFEST as one
// atomic group (when more than one CF is involved); on success the flushed
// memtables are removed from their lists, on failure they are reset so the
// flush can be retried. `imm_lists` overrides cfds[k]->imm() when non-null
// (used by tests).
Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  size_t num = mems_list.size();
  assert(cfds.size() == num);
  if (imm_lists != nullptr) {
    assert(imm_lists->size() == num);
  }
  // Mark every memtable of every CF flush-completed and record its output
  // file number.
  for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
    const auto* imm =
        (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    if (!mems_list[k]->empty()) {
      assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
    }
#endif
    assert(nullptr != file_metas[k]);
    for (size_t i = 0; i != mems_list[k]->size(); ++i) {
      // All the edits are associated with the first memtable of each batch.
      assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
      (*mems_list[k])[i]->SetFlushCompleted(true);
      (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
    }
  }

  Status s;

  // One single-edit list per CF, taken from its batch's first memtable.
  autovector<autovector<VersionEdit*>> edit_lists;
  uint32_t num_entries = 0;
  for (const auto mems : mems_list) {
    assert(mems != nullptr);
    autovector<VersionEdit*> edits;
    assert(!mems->empty());
    edits.emplace_back((*mems)[0]->GetEdits());
    ++num_entries;
    edit_lists.emplace_back(edits);
  }
  // Mark the version edits as an atomic group if the number of version edits
  // exceeds 1.
  if (cfds.size() > 1) {
    for (auto& edits : edit_lists) {
      assert(edits.size() == 1);
      // Each edit records how many edits remain after it in the group.
      edits[0]->MarkAtomicGroup(--num_entries);
    }
    assert(0 == num_entries);
  }

  // this can release and reacquire the mutex.
  s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
                        db_directory);

  for (size_t k = 0; k != cfds.size(); ++k) {
    auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    imm->InstallNewVersion();
  }

  if (s.ok() || s.IsColumnFamilyDropped()) {
    // Success path: remove the flushed memtables from each (non-dropped)
    // CF's immutable list.
    for (size_t i = 0; i != cfds.size(); ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        assert(m->GetFileNumber() > 0);
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " done",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        imm->current_->Remove(m, to_delete);
        imm->UpdateCachedValuesFromMemTableListVersion();
        imm->ResetTrimHistoryNeeded();
      }
    }
  } else {
    // Failure path: reset flush state on every memtable so the flush can be
    // attempted again, and re-arm the flush-needed flag.
    for (size_t i = 0; i != cfds.size(); ++i) {
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " failed",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        m->SetFlushCompleted(false);
        m->SetFlushInProgress(false);
        m->GetEdits()->Clear();
        m->SetFileNumber(0);
        imm->num_flush_not_started_++;
      }
      imm->imm_flush_needed.store(true, std::memory_order_release);
    }
  }

  return s;
}
743
f67539c2
TL
744void MemTableList::RemoveOldMemTables(uint64_t log_number,
745 autovector<MemTable*>* to_delete) {
746 assert(to_delete != nullptr);
747 InstallNewVersion();
748 auto& memlist = current_->memlist_;
749 autovector<MemTable*> old_memtables;
750 for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
751 MemTable* mem = *it;
752 if (mem->GetNextLogNumber() > log_number) {
753 break;
754 }
755 old_memtables.push_back(mem);
756 }
757
758 for (auto it = old_memtables.begin(); it != old_memtables.end(); ++it) {
759 MemTable* mem = *it;
760 current_->Remove(mem, to_delete);
761 --num_flush_not_started_;
762 if (0 == num_flush_not_started_) {
763 imm_flush_needed.store(false, std::memory_order_release);
764 }
765 }
766
767 UpdateCachedValuesFromMemTableListVersion();
768 ResetTrimHistoryNeeded();
769}
770
771} // namespace ROCKSDB_NAMESPACE