// ceph/src/rocksdb/db/memtable_list.cc (imported with the 14.2.4 nautilus point release)
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "db/memtable_list.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <limits>
#include <queue>
#include <string>
#include "db/db_impl.h"
#include "db/memtable.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "monitoring/thread_status_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merging_iterator.h"
#include "util/coding.h"
#include "util/log_buffer.h"
#include "util/sync_point.h"

namespace rocksdb {

class InternalKeyComparator;
class Mutex;
class VersionSet;

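// Add m to the front of memlist_; the list is ordered newest-first, so the
// front is the most recent immutable memtable. The memtable's memory is
// charged to the owning MemTableList's usage counter.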
void MemTableListVersion::AddMemTable(MemTable* m) {
  memlist_.push_front(m);
  *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
}

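// Drop one reference to m; if it was the last one, hand the memtable to the
// caller via to_delete for actual deletion and release its accounted memory.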
void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
                                        MemTable* m) {
  if (m->Unref()) {
    to_delete->push_back(m);
    assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
    *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
  }
}

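// Copy-construct a new version from `old`. Note that `old` is dereferenced in
// the initializer list below, so callers must pass a non-null pointer despite
// the nullptr check guarding the list copies.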
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
    : max_write_buffer_number_to_maintain_(
          old->max_write_buffer_number_to_maintain_),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
  if (old != nullptr) {
    memlist_ = old->memlist_;
    for (auto& m : memlist_) {
      m->Ref();
    }

    memlist_history_ = old->memlist_history_;
    for (auto& m : memlist_history_) {
      m->Ref();
    }
  }
}

MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int max_write_buffer_number_to_maintain)
    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}

void MemTableListVersion::Ref() { ++refs_; }

// Called by SuperVersion::Cleanup().
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    // A nullptr to_delete means the caller is confident that refs_ cannot
    // reach zero here.
    assert(to_delete != nullptr);
    for (const auto& m : memlist_) {
      UnrefMemTable(to_delete, m);
    }
    for (const auto& m : memlist_history_) {
      UnrefMemTable(to_delete, m);
    }
    delete this;
  }
}

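// memlist_ holds immutable memtables that still need to be flushed;
// memlist_history_ holds already-flushed memtables kept around (bounded by
// max_write_buffer_number_to_maintain_) so reads, e.g. transaction
// write-conflict checking, can still see recent history.
//
// Typical lifecycle (a sketch, not actual call sites):
//   MemTableList::Add(m, ...);            // mutable -> immutable, unflushed
//   PickMemtablesToFlush(...);            // flush job picks from memlist_
//   TryInstallMemtableFlushResults(...);  // commit; Remove() moves m into
//                                         // memlist_history_ (if maintained)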
int MemTableList::NumNotFlushed() const {
  int size = static_cast<int>(current_->memlist_.size());
  assert(num_flush_not_started_ <= size);
  return size;
}

int MemTableList::NumFlushed() const {
  return static_cast<int>(current_->memlist_history_.size());
}

// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// merge_context stores the list of merge operands encountered so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                              Status* s, MergeContext* merge_context,
                              SequenceNumber* max_covering_tombstone_seq,
                              SequenceNumber* seq, const ReadOptions& read_opts,
                              ReadCallback* callback, bool* is_blob_index) {
  return GetFromList(&memlist_, key, value, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts, callback,
                     is_blob_index);
}

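// Like Get(), but searches the flushed-but-retained memtables in
// memlist_history_. Presumably used when a lookup needs to see recent
// history beyond the live memtables, e.g. for transaction write-conflict
// checking.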
bool MemTableListVersion::GetFromHistory(
    const LookupKey& key, std::string* value, Status* s,
    MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
    SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
  return GetFromList(&memlist_history_, key, value, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts,
                     nullptr /*read_callback*/, is_blob_index);
}

bool MemTableListVersion::GetFromList(
    std::list<MemTable*>* list, const LookupKey& key, std::string* value,
    Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
  *seq = kMaxSequenceNumber;

  for (auto& memtable : *list) {
    SequenceNumber current_seq = kMaxSequenceNumber;

    bool done =
        memtable->Get(key, value, s, merge_context, max_covering_tombstone_seq,
                      &current_seq, read_opts, callback, is_blob_index);
    if (*seq == kMaxSequenceNumber) {
      // Store the most recent sequence number of any operation on this key.
      // Since we only care about the most recent change, we only need to
      // return the first operation found when searching memtables in
      // reverse-chronological order.
      // current_seq would be equal to kMaxSequenceNumber if the value was to
      // be skipped. This allows seq to be assigned again when the next value
      // is read.
      *seq = current_seq;
    }

    if (done) {
      assert(*seq != kMaxSequenceNumber || s->IsNotFound());
      return true;
    }
    if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
      return false;
    }
  }
  return false;
}

Status MemTableListVersion::AddRangeTombstoneIterators(
    const ReadOptions& read_opts, Arena* /*arena*/,
    RangeDelAggregator* range_del_agg) {
  assert(range_del_agg != nullptr);
  for (auto& m : memlist_) {
    // Using kMaxSequenceNumber is OK because these are immutable memtables.
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        m->NewRangeTombstoneIterator(read_opts,
                                     kMaxSequenceNumber /* read_seq */));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  return Status::OK();
}

void MemTableListVersion::AddIterators(
    const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
    Arena* arena) {
  for (auto& m : memlist_) {
    iterator_list->push_back(m->NewIterator(options, arena));
  }
}

void MemTableListVersion::AddIterators(
    const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
  for (auto& m : memlist_) {
    merge_iter_builder->AddIterator(
        m->NewIterator(options, merge_iter_builder->GetArena()));
  }
}

uint64_t MemTableListVersion::GetTotalNumEntries() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_entries();
  }
  return total_num;
}

MemTable::MemTableStats MemTableListVersion::ApproximateStats(
    const Slice& start_ikey, const Slice& end_ikey) {
  MemTable::MemTableStats total_stats = {0, 0};
  for (auto& m : memlist_) {
    auto mStats = m->ApproximateStats(start_ikey, end_ikey);
    total_stats.size += mStats.size;
    total_stats.count += mStats.count;
  }
  return total_stats;
}

uint64_t MemTableListVersion::GetTotalNumDeletes() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_deletes();
  }
  return total_num;
}

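// Returns the earliest sequence number of any entry in the unflushed
// memtables (the back of memlist_ is the oldest), optionally also considering
// the flushed history; kMaxSequenceNumber if both lists are empty.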
SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
    bool include_history) const {
  if (include_history && !memlist_history_.empty()) {
    return memlist_history_.back()->GetEarliestSequenceNumber();
  } else if (!memlist_.empty()) {
    return memlist_.back()->GetEarliestSequenceNumber();
  } else {
    return kMaxSequenceNumber;
  }
}

// Caller is responsible for referencing m.
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  AddMemTable(m);

  TrimHistory(to_delete);
}

// Removes m from the list of memtables not flushed. Caller should NOT Unref m.
void MemTableListVersion::Remove(MemTable* m,
                                 autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  memlist_.remove(m);

  m->MarkFlushed();
  if (max_write_buffer_number_to_maintain_ > 0) {
    memlist_history_.push_front(m);
    TrimHistory(to_delete);
  } else {
    UnrefMemTable(to_delete, m);
  }
}

// Make sure we don't use up too much space in history: keep the total number
// of memtables (unflushed plus history) at or below
// max_write_buffer_number_to_maintain_, dropping the oldest history entries
// first.
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
  while (memlist_.size() + memlist_history_.size() >
             static_cast<size_t>(max_write_buffer_number_to_maintain_) &&
         !memlist_history_.empty()) {
    MemTable* x = memlist_history_.back();
    memlist_history_.pop_back();

    UnrefMemTable(to_delete, x);
  }
}

// Returns true if there is at least one memtable on which flush has
// not yet started.
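// A flush is pending either when one has been explicitly requested, or when
// enough memtables have piled up on their own. For example (assuming
// min_write_buffer_number_to_merge_ == 2): a single sealed memtable triggers
// a flush only if flush_requested_ is set, while a second sealed memtable
// makes the flush pending unconditionally.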
bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ > 0) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}

// Returns the memtables that need to be flushed. Picks memtables from the
// oldest towards the newest, stopping at the first one whose ID exceeds
// max_memtable_id (if provided) and skipping memtables whose flush is
// already in progress.
void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
                                        autovector<MemTable*>* ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* m = *it;
    if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
      break;
    }
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        imm_flush_needed.store(false, std::memory_order_release);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      ret->push_back(m);
    }
  }
  flush_requested_ = false;  // start-flush request is complete
}

void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
                                         uint64_t /*file_number*/) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
  assert(!mems.empty());

  // If the flush was not successful, then just reset state.
  // Maybe a subsequent attempt to flush will succeed.
  for (MemTable* m : mems) {
    assert(m->flush_in_progress_);
    assert(m->file_number_ == 0);

    m->flush_in_progress_ = false;
    m->flush_completed_ = false;
    m->edit_.Clear();
    num_flush_not_started_++;
  }
  imm_flush_needed.store(true, std::memory_order_release);
}

// Try to record a successful flush in the MANIFEST file. It might just
// return Status::OK, letting a concurrent flush do the actual recording.
Status MemTableList::TryInstallMemtableFlushResults(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
    VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  // Flush was successful.
  // Record the status on the memtable object. Either this call or a call by a
  // concurrent flush thread will read the status and write it to the manifest.
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);

    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // If some other thread is already committing, then return.
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code.
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing the manifest, where the mutex is
  // released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // The back is the oldest; if flush_completed_ is not set on it, it means
    // that we were assigned a more recent memtable. The memtables' flushes
    // must be recorded in the manifest in order. A concurrent flush thread,
    // which is assigned to flush the oldest memtable, will later wake up and
    // do all the pending writes to the manifest, in order.
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // Scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memtables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    autovector<MemTable*> memtables_to_flush;
    // Enumerate from the last (earliest) element to see how many batches have
    // finished.
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64 " started",
                         cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
        memtables_to_flush.push_back(m);
      }
      batch_count++;
    }

    // TODO(myabandeh): Not sure how batch_count could be 0 here.
    if (batch_count > 0) {
      if (vset->db_options()->allow_2pc) {
        assert(edit_list.size() > 0);
        // We piggyback the information about the earliest log file to keep in
        // the manifest entry for the last file flushed.
        edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
            vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
      }

      // This can release and reacquire the mutex.
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);

      // We will be changing the version in the next code path,
      // so we better create a new one, since versions are immutable.
      InstallNewVersion();

      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.

      // Commit the new state only if the column family is NOT dropped.
      // The reason is as follows (refer to
      // ColumnFamilyTest.FlushAndDropRaceCondition).
      // If the column family is dropped, then according to LogAndApply, its
      // corresponding flush operation is NOT written to the MANIFEST. This
      // means the DB is not aware of the L0 files generated from the flush.
      // By committing the new state, we remove the memtable from the memtable
      // list. Creating an iterator on this column family would then not be
      // able to read full data, since the memtable is removed and the DB is
      // not aware of the L0 files, leaving MergingIterator unable to build
      // child iterators. The RocksDB contract requires that an iterator can
      // be created on a dropped column family, and that we must be able to
      // read full data as long as the column family handle is not deleted,
      // even if the column family is dropped.
      if (s.ok() && !cfd->IsDropped()) {  // commit new state
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                           ": memtable #%" PRIu64 " done",
                           cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          ++mem_id;
        }
      } else {
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
          MemTable* m = *it;
          // Commit failed. Set up the state so that we can flush again.
          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
                           ": memtable #%" PRIu64 " failed",
                           m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}

// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
  InstallNewVersion();
  // This method is used to move a mutable memtable into the immutable list.
  // Since the mutable memtable is already refcounted by DBImpl, and we do not
  // unref it when moving it to the immutable list, we do not have to ref the
  // memtable here: we just take over the reference from DBImpl.
  current_->Add(m, to_delete);
  m->MarkImmutable();
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    imm_flush_needed.store(true, std::memory_order_release);
  }
}

// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
  size_t total_size = 0;
  for (auto& memtable : current_->memlist_) {
    total_size += memtable->ApproximateMemoryUsage();
  }
  return total_size;
}

size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }

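// Returns the oldest unflushed memtable's ApproximateOldestKeyTime() (the
// back of memlist_ is the oldest memtable), or the maximum uint64_t value if
// there are no unflushed memtables.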
uint64_t MemTableList::ApproximateOldestKeyTime() const {
  if (!current_->memlist_.empty()) {
    return current_->memlist_.back()->ApproximateOldestKeyTime();
  }
  return std::numeric_limits<uint64_t>::max();
}

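// Copy-on-write: if other threads still hold references to the current
// version, clone it into a fresh MemTableListVersion before mutating;
// otherwise keep mutating the existing one in place.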
void MemTableList::InstallNewVersion() {
  if (current_->refs_ == 1) {
    // We're the only one using the version; just keep using it.
  } else {
    // Somebody else holds the current version; we need to create a new one.
    MemTableListVersion* version = current_;
    current_ = new MemTableListVersion(&current_memory_usage_, current_);
    current_->Ref();
    version->Unref();
  }
}

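// Returns the smallest WAL number that still contains a prepared (2PC)
// section referenced by an unflushed memtable, skipping the memtables that
// are about to be flushed; returns 0 if there is none.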
uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
    const autovector<MemTable*>& memtables_to_flush) {
  uint64_t min_log = 0;

  for (auto& m : current_->memlist_) {
    // Assuming the list is very short, we can live with O(m*n). We can
    // optimize if performance ever becomes a problem.
    bool should_skip = false;
    for (MemTable* m_to_flush : memtables_to_flush) {
      if (m == m_to_flush) {
        should_skip = true;
        break;
      }
    }
    if (should_skip) {
      continue;
    }

    auto log = m->GetMinLogContainingPrepSection();

    if (log > 0 && (min_log == 0 || log < min_log)) {
      min_log = log;
    }
  }

  return min_log;
}

// Commit a successful atomic flush in the manifest file.
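// If imm_lists is non-null, it overrides each column family's own immutable
// memtable list (this appears to be a hook for tests that supply their own
// lists); otherwise cfds[k]->imm() is used throughout.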
Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  size_t num = mems_list.size();
  assert(cfds.size() == num);
  if (imm_lists != nullptr) {
    assert(imm_lists->size() == num);
  }
  for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
    const auto* imm =
        (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    if (!mems_list[k]->empty()) {
      assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
    }
#endif
    assert(nullptr != file_metas[k]);
    for (size_t i = 0; i != mems_list[k]->size(); ++i) {
      assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
      (*mems_list[k])[i]->SetFlushCompleted(true);
      (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
    }
  }

  Status s;

  autovector<autovector<VersionEdit*>> edit_lists;
  uint32_t num_entries = 0;
  for (const auto mems : mems_list) {
    assert(mems != nullptr);
    autovector<VersionEdit*> edits;
    assert(!mems->empty());
    edits.emplace_back((*mems)[0]->GetEdits());
    ++num_entries;
    edit_lists.emplace_back(edits);
  }
  // Mark the version edits as an atomic group if the number of version edits
  // exceeds 1.
  if (cfds.size() > 1) {
    for (auto& edits : edit_lists) {
      assert(edits.size() == 1);
      edits[0]->MarkAtomicGroup(--num_entries);
    }
    assert(0 == num_entries);
  }

  // This can release and reacquire the mutex.
  s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
                        db_directory);

  for (size_t k = 0; k != cfds.size(); ++k) {
    auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    imm->InstallNewVersion();
  }

  if (s.ok() || s.IsShutdownInProgress()) {
    for (size_t i = 0; i != cfds.size(); ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        assert(m->GetFileNumber() > 0);
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " done",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        imm->current_->Remove(m, to_delete);
      }
    }
  } else {
    for (size_t i = 0; i != cfds.size(); ++i) {
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " failed",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        m->SetFlushCompleted(false);
        m->SetFlushInProgress(false);
        m->GetEdits()->Clear();
        m->SetFileNumber(0);
        imm->num_flush_not_started_++;
      }
      imm->imm_flush_needed.store(true, std::memory_order_release);
    }
  }

  return s;
}

}  // namespace rocksdb