1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
#include "db/memtable_list.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <string>
#include "db/memtable.h"
#include "db/version_set.h"
#include "monitoring/thread_status_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merging_iterator.h"
#include "util/coding.h"
#include "util/log_buffer.h"
#include "util/sync_point.h"
27 class InternalKeyComparator
;
31 void MemTableListVersion::AddMemTable(MemTable
* m
) {
32 memlist_
.push_front(m
);
33 *parent_memtable_list_memory_usage_
+= m
->ApproximateMemoryUsage();
36 void MemTableListVersion::UnrefMemTable(autovector
<MemTable
*>* to_delete
,
39 to_delete
->push_back(m
);
40 assert(*parent_memtable_list_memory_usage_
>= m
->ApproximateMemoryUsage());
41 *parent_memtable_list_memory_usage_
-= m
->ApproximateMemoryUsage();
46 MemTableListVersion::MemTableListVersion(
47 size_t* parent_memtable_list_memory_usage
, MemTableListVersion
* old
)
48 : max_write_buffer_number_to_maintain_(
49 old
->max_write_buffer_number_to_maintain_
),
50 parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage
) {
52 memlist_
= old
->memlist_
;
53 for (auto& m
: memlist_
) {
57 memlist_history_
= old
->memlist_history_
;
58 for (auto& m
: memlist_history_
) {
64 MemTableListVersion::MemTableListVersion(
65 size_t* parent_memtable_list_memory_usage
,
66 int max_write_buffer_number_to_maintain
)
67 : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain
),
68 parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage
) {}
70 void MemTableListVersion::Ref() { ++refs_
; }
72 // called by superversion::clean()
73 void MemTableListVersion::Unref(autovector
<MemTable
*>* to_delete
) {
77 // if to_delete is equal to nullptr it means we're confident
78 // that refs_ will not be zero
79 assert(to_delete
!= nullptr);
80 for (const auto& m
: memlist_
) {
81 UnrefMemTable(to_delete
, m
);
83 for (const auto& m
: memlist_history_
) {
84 UnrefMemTable(to_delete
, m
);
90 int MemTableList::NumNotFlushed() const {
91 int size
= static_cast<int>(current_
->memlist_
.size());
92 assert(num_flush_not_started_
<= size
);
96 int MemTableList::NumFlushed() const {
97 return static_cast<int>(current_
->memlist_history_
.size());
100 // Search all the memtables starting from the most recent one.
101 // Return the most recent value found, if any.
102 // Operands stores the list of merge operations to apply, so far.
103 bool MemTableListVersion::Get(const LookupKey
& key
, std::string
* value
,
104 Status
* s
, MergeContext
* merge_context
,
105 RangeDelAggregator
* range_del_agg
,
107 const ReadOptions
& read_opts
) {
108 return GetFromList(&memlist_
, key
, value
, s
, merge_context
, range_del_agg
,
112 bool MemTableListVersion::GetFromHistory(const LookupKey
& key
,
113 std::string
* value
, Status
* s
,
114 MergeContext
* merge_context
,
115 RangeDelAggregator
* range_del_agg
,
117 const ReadOptions
& read_opts
) {
118 return GetFromList(&memlist_history_
, key
, value
, s
, merge_context
,
119 range_del_agg
, seq
, read_opts
);
122 bool MemTableListVersion::GetFromList(std::list
<MemTable
*>* list
,
123 const LookupKey
& key
, std::string
* value
,
124 Status
* s
, MergeContext
* merge_context
,
125 RangeDelAggregator
* range_del_agg
,
127 const ReadOptions
& read_opts
) {
128 *seq
= kMaxSequenceNumber
;
130 for (auto& memtable
: *list
) {
131 SequenceNumber current_seq
= kMaxSequenceNumber
;
133 bool done
= memtable
->Get(key
, value
, s
, merge_context
, range_del_agg
,
134 ¤t_seq
, read_opts
);
135 if (*seq
== kMaxSequenceNumber
) {
136 // Store the most recent sequence number of any operation on this key.
137 // Since we only care about the most recent change, we only need to
138 // return the first operation found when searching memtables in
139 // reverse-chronological order.
144 assert(*seq
!= kMaxSequenceNumber
);
147 if (!done
&& !s
->ok() && !s
->IsMergeInProgress() && !s
->IsNotFound()) {
154 Status
MemTableListVersion::AddRangeTombstoneIterators(
155 const ReadOptions
& read_opts
, Arena
* arena
,
156 RangeDelAggregator
* range_del_agg
) {
157 assert(range_del_agg
!= nullptr);
158 for (auto& m
: memlist_
) {
159 std::unique_ptr
<InternalIterator
> range_del_iter(
160 m
->NewRangeTombstoneIterator(read_opts
));
161 Status s
= range_del_agg
->AddTombstones(std::move(range_del_iter
));
169 void MemTableListVersion::AddIterators(
170 const ReadOptions
& options
, std::vector
<InternalIterator
*>* iterator_list
,
172 for (auto& m
: memlist_
) {
173 iterator_list
->push_back(m
->NewIterator(options
, arena
));
177 void MemTableListVersion::AddIterators(
178 const ReadOptions
& options
, MergeIteratorBuilder
* merge_iter_builder
) {
179 for (auto& m
: memlist_
) {
180 merge_iter_builder
->AddIterator(
181 m
->NewIterator(options
, merge_iter_builder
->GetArena()));
185 uint64_t MemTableListVersion::GetTotalNumEntries() const {
186 uint64_t total_num
= 0;
187 for (auto& m
: memlist_
) {
188 total_num
+= m
->num_entries();
193 MemTable::MemTableStats
MemTableListVersion::ApproximateStats(
194 const Slice
& start_ikey
, const Slice
& end_ikey
) {
195 MemTable::MemTableStats total_stats
= {0, 0};
196 for (auto& m
: memlist_
) {
197 auto mStats
= m
->ApproximateStats(start_ikey
, end_ikey
);
198 total_stats
.size
+= mStats
.size
;
199 total_stats
.count
+= mStats
.count
;
204 uint64_t MemTableListVersion::GetTotalNumDeletes() const {
205 uint64_t total_num
= 0;
206 for (auto& m
: memlist_
) {
207 total_num
+= m
->num_deletes();
212 SequenceNumber
MemTableListVersion::GetEarliestSequenceNumber(
213 bool include_history
) const {
214 if (include_history
&& !memlist_history_
.empty()) {
215 return memlist_history_
.back()->GetEarliestSequenceNumber();
216 } else if (!memlist_
.empty()) {
217 return memlist_
.back()->GetEarliestSequenceNumber();
219 return kMaxSequenceNumber
;
223 // caller is responsible for referencing m
224 void MemTableListVersion::Add(MemTable
* m
, autovector
<MemTable
*>* to_delete
) {
225 assert(refs_
== 1); // only when refs_ == 1 is MemTableListVersion mutable
228 TrimHistory(to_delete
);
231 // Removes m from list of memtables not flushed. Caller should NOT Unref m.
232 void MemTableListVersion::Remove(MemTable
* m
,
233 autovector
<MemTable
*>* to_delete
) {
234 assert(refs_
== 1); // only when refs_ == 1 is MemTableListVersion mutable
237 if (max_write_buffer_number_to_maintain_
> 0) {
238 memlist_history_
.push_front(m
);
239 TrimHistory(to_delete
);
241 UnrefMemTable(to_delete
, m
);
245 // Make sure we don't use up too much space in history
246 void MemTableListVersion::TrimHistory(autovector
<MemTable
*>* to_delete
) {
247 while (memlist_
.size() + memlist_history_
.size() >
248 static_cast<size_t>(max_write_buffer_number_to_maintain_
) &&
249 !memlist_history_
.empty()) {
250 MemTable
* x
= memlist_history_
.back();
251 memlist_history_
.pop_back();
253 UnrefMemTable(to_delete
, x
);
257 // Returns true if there is at least one memtable on which flush has
259 bool MemTableList::IsFlushPending() const {
260 if ((flush_requested_
&& num_flush_not_started_
>= 1) ||
261 (num_flush_not_started_
>= min_write_buffer_number_to_merge_
)) {
262 assert(imm_flush_needed
.load(std::memory_order_relaxed
));
268 // Returns the memtables that need to be flushed.
269 void MemTableList::PickMemtablesToFlush(autovector
<MemTable
*>* ret
) {
270 AutoThreadOperationStageUpdater
stage_updater(
271 ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH
);
272 const auto& memlist
= current_
->memlist_
;
273 for (auto it
= memlist
.rbegin(); it
!= memlist
.rend(); ++it
) {
275 if (!m
->flush_in_progress_
) {
276 assert(!m
->flush_completed_
);
277 num_flush_not_started_
--;
278 if (num_flush_not_started_
== 0) {
279 imm_flush_needed
.store(false, std::memory_order_release
);
281 m
->flush_in_progress_
= true; // flushing will start very soon
285 flush_requested_
= false; // start-flush request is complete
288 void MemTableList::RollbackMemtableFlush(const autovector
<MemTable
*>& mems
,
289 uint64_t file_number
) {
290 AutoThreadOperationStageUpdater
stage_updater(
291 ThreadStatus::STAGE_MEMTABLE_ROLLBACK
);
292 assert(!mems
.empty());
294 // If the flush was not successful, then just reset state.
295 // Maybe a succeeding attempt to flush will be successful.
296 for (MemTable
* m
: mems
) {
297 assert(m
->flush_in_progress_
);
298 assert(m
->file_number_
== 0);
300 m
->flush_in_progress_
= false;
301 m
->flush_completed_
= false;
303 num_flush_not_started_
++;
305 imm_flush_needed
.store(true, std::memory_order_release
);
308 // Record a successful flush in the manifest file
309 Status
MemTableList::InstallMemtableFlushResults(
310 ColumnFamilyData
* cfd
, const MutableCFOptions
& mutable_cf_options
,
311 const autovector
<MemTable
*>& mems
, VersionSet
* vset
, InstrumentedMutex
* mu
,
312 uint64_t file_number
, autovector
<MemTable
*>* to_delete
,
313 Directory
* db_directory
, LogBuffer
* log_buffer
) {
314 AutoThreadOperationStageUpdater
stage_updater(
315 ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS
);
318 // flush was successful
319 for (size_t i
= 0; i
< mems
.size(); ++i
) {
320 // All the edits are associated with the first memtable of this batch.
321 assert(i
== 0 || mems
[i
]->GetEdits()->NumEntries() == 0);
323 mems
[i
]->flush_completed_
= true;
324 mems
[i
]->file_number_
= file_number
;
327 // if some other thread is already committing, then return
329 if (commit_in_progress_
) {
330 TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
334 // Only a single thread can be executing this piece of code
335 commit_in_progress_
= true;
337 // Retry until all completed flushes are committed. New flushes can finish
338 // while the current thread is writing manifest where mutex is released.
340 auto& memlist
= current_
->memlist_
;
341 if (memlist
.empty() || !memlist
.back()->flush_completed_
) {
344 // scan all memtables from the earliest, and commit those
345 // (in that order) that have finished flushing. Memetables
346 // are always committed in the order that they were created.
347 uint64_t batch_file_number
= 0;
348 size_t batch_count
= 0;
349 autovector
<VersionEdit
*> edit_list
;
350 // enumerate from the last (earliest) element to see how many batch finished
351 for (auto it
= memlist
.rbegin(); it
!= memlist
.rend(); ++it
) {
353 if (!m
->flush_completed_
) {
356 if (it
== memlist
.rbegin() || batch_file_number
!= m
->file_number_
) {
357 batch_file_number
= m
->file_number_
;
358 ROCKS_LOG_BUFFER(log_buffer
,
359 "[%s] Level-0 commit table #%" PRIu64
" started",
360 cfd
->GetName().c_str(), m
->file_number_
);
361 edit_list
.push_back(&m
->edit_
);
366 if (batch_count
> 0) {
367 // this can release and reacquire the mutex.
368 s
= vset
->LogAndApply(cfd
, mutable_cf_options
, edit_list
, mu
,
371 // we will be changing the version in the next code path,
372 // so we better create a new one, since versions are immutable
375 // All the later memtables that have the same filenum
376 // are part of the same batch. They can be committed now.
377 uint64_t mem_id
= 1; // how many memtables have been flushed.
378 if (s
.ok()) { // commit new state
379 while (batch_count
-- > 0) {
380 MemTable
* m
= current_
->memlist_
.back();
381 ROCKS_LOG_BUFFER(log_buffer
, "[%s] Level-0 commit table #%" PRIu64
382 ": memtable #%" PRIu64
" done",
383 cfd
->GetName().c_str(), m
->file_number_
, mem_id
);
384 assert(m
->file_number_
> 0);
385 current_
->Remove(m
, to_delete
);
389 for (auto it
= current_
->memlist_
.rbegin(); batch_count
-- > 0; it
++) {
391 // commit failed. setup state so that we can flush again.
392 ROCKS_LOG_BUFFER(log_buffer
, "Level-0 commit table #%" PRIu64
393 ": memtable #%" PRIu64
" failed",
394 m
->file_number_
, mem_id
);
395 m
->flush_completed_
= false;
396 m
->flush_in_progress_
= false;
398 num_flush_not_started_
++;
400 imm_flush_needed
.store(true, std::memory_order_release
);
406 commit_in_progress_
= false;
410 // New memtables are inserted at the front of the list.
411 void MemTableList::Add(MemTable
* m
, autovector
<MemTable
*>* to_delete
) {
412 assert(static_cast<int>(current_
->memlist_
.size()) >= num_flush_not_started_
);
414 // this method is used to move mutable memtable into an immutable list.
415 // since mutable memtable is already refcounted by the DBImpl,
416 // and when moving to the imutable list we don't unref it,
417 // we don't have to ref the memtable here. we just take over the
418 // reference from the DBImpl.
419 current_
->Add(m
, to_delete
);
421 num_flush_not_started_
++;
422 if (num_flush_not_started_
== 1) {
423 imm_flush_needed
.store(true, std::memory_order_release
);
427 // Returns an estimate of the number of bytes of data in use.
428 size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
429 size_t total_size
= 0;
430 for (auto& memtable
: current_
->memlist_
) {
431 total_size
+= memtable
->ApproximateMemoryUsage();
436 size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_
; }
438 void MemTableList::InstallNewVersion() {
439 if (current_
->refs_
== 1) {
440 // we're the only one using the version, just keep using it
442 // somebody else holds the current version, we need to create new one
443 MemTableListVersion
* version
= current_
;
444 current_
= new MemTableListVersion(¤t_memory_usage_
, current_
);
450 uint64_t MemTableList::GetMinLogContainingPrepSection() {
451 uint64_t min_log
= 0;
453 for (auto& m
: current_
->memlist_
) {
454 // this mem has been flushed it no longer
455 // needs to hold on the its prep section
456 if (m
->flush_completed_
) {
460 auto log
= m
->GetMinLogContainingPrepSection();
462 if (log
> 0 && (min_log
== 0 || log
< min_log
)) {
470 } // namespace rocksdb