]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | #include "db/memtable_list.h" | |
7 | ||
f67539c2 | 8 | #include <cinttypes> |
11fdf7f2 | 9 | #include <limits> |
494da23a | 10 | #include <queue> |
7c673cae | 11 | #include <string> |
f67539c2 | 12 | #include "db/db_impl/db_impl.h" |
7c673cae | 13 | #include "db/memtable.h" |
494da23a | 14 | #include "db/range_tombstone_fragmenter.h" |
7c673cae | 15 | #include "db/version_set.h" |
f67539c2 | 16 | #include "logging/log_buffer.h" |
7c673cae FG |
17 | #include "monitoring/thread_status_util.h" |
18 | #include "rocksdb/db.h" | |
19 | #include "rocksdb/env.h" | |
20 | #include "rocksdb/iterator.h" | |
21 | #include "table/merging_iterator.h" | |
f67539c2 | 22 | #include "test_util/sync_point.h" |
7c673cae | 23 | #include "util/coding.h" |
7c673cae | 24 | |
f67539c2 | 25 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
26 | |
27 | class InternalKeyComparator; | |
28 | class Mutex; | |
29 | class VersionSet; | |
30 | ||
31 | void MemTableListVersion::AddMemTable(MemTable* m) { | |
32 | memlist_.push_front(m); | |
33 | *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage(); | |
34 | } | |
35 | ||
36 | void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete, | |
37 | MemTable* m) { | |
38 | if (m->Unref()) { | |
39 | to_delete->push_back(m); | |
40 | assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage()); | |
41 | *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage(); | |
7c673cae FG |
42 | } |
43 | } | |
44 | ||
45 | MemTableListVersion::MemTableListVersion( | |
46 | size_t* parent_memtable_list_memory_usage, MemTableListVersion* old) | |
47 | : max_write_buffer_number_to_maintain_( | |
48 | old->max_write_buffer_number_to_maintain_), | |
f67539c2 TL |
49 | max_write_buffer_size_to_maintain_( |
50 | old->max_write_buffer_size_to_maintain_), | |
7c673cae FG |
51 | parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) { |
52 | if (old != nullptr) { | |
53 | memlist_ = old->memlist_; | |
54 | for (auto& m : memlist_) { | |
55 | m->Ref(); | |
56 | } | |
57 | ||
58 | memlist_history_ = old->memlist_history_; | |
59 | for (auto& m : memlist_history_) { | |
60 | m->Ref(); | |
61 | } | |
62 | } | |
63 | } | |
64 | ||
// Construct an empty version with explicit history-retention limits.
// max_write_buffer_number_to_maintain: cap on the total number of memtables
//   kept (unflushed + history); values <= 0 disable count-based trimming
//   (see MemtableLimitExceeded).
// max_write_buffer_size_to_maintain: cap on total memtable bytes kept;
//   values <= 0 disable size-based trimming.
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int max_write_buffer_number_to_maintain,
    int64_t max_write_buffer_size_to_maintain)
    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
      max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}
72 | ||
73 | void MemTableListVersion::Ref() { ++refs_; } | |
74 | ||
75 | // called by superversion::clean() | |
76 | void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) { | |
77 | assert(refs_ >= 1); | |
78 | --refs_; | |
79 | if (refs_ == 0) { | |
80 | // if to_delete is equal to nullptr it means we're confident | |
81 | // that refs_ will not be zero | |
82 | assert(to_delete != nullptr); | |
83 | for (const auto& m : memlist_) { | |
84 | UnrefMemTable(to_delete, m); | |
85 | } | |
86 | for (const auto& m : memlist_history_) { | |
87 | UnrefMemTable(to_delete, m); | |
88 | } | |
89 | delete this; | |
90 | } | |
91 | } | |
92 | ||
93 | int MemTableList::NumNotFlushed() const { | |
94 | int size = static_cast<int>(current_->memlist_.size()); | |
95 | assert(num_flush_not_started_ <= size); | |
96 | return size; | |
97 | } | |
98 | ||
99 | int MemTableList::NumFlushed() const { | |
100 | return static_cast<int>(current_->memlist_history_.size()); | |
101 | } | |
102 | ||
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                              Status* s, MergeContext* merge_context,
                              SequenceNumber* max_covering_tombstone_seq,
                              SequenceNumber* seq, const ReadOptions& read_opts,
                              ReadCallback* callback, bool* is_blob_index) {
  // Thin forwarder: delegates to GetFromList over the unflushed memtables.
  return GetFromList(&memlist_, key, value, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts, callback,
                     is_blob_index);
}
115 | ||
f67539c2 TL |
116 | void MemTableListVersion::MultiGet(const ReadOptions& read_options, |
117 | MultiGetRange* range, ReadCallback* callback, | |
118 | bool* is_blob) { | |
119 | for (auto memtable : memlist_) { | |
120 | memtable->MultiGet(read_options, range, callback, is_blob); | |
121 | if (range->empty()) { | |
122 | return; | |
123 | } | |
124 | } | |
125 | } | |
126 | ||
127 | bool MemTableListVersion::GetMergeOperands( | |
128 | const LookupKey& key, Status* s, MergeContext* merge_context, | |
129 | SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) { | |
130 | for (MemTable* memtable : memlist_) { | |
131 | bool done = memtable->Get(key, nullptr, s, merge_context, | |
132 | max_covering_tombstone_seq, read_opts, nullptr, | |
133 | nullptr, false); | |
134 | if (done) { | |
135 | return true; | |
136 | } | |
137 | } | |
138 | return false; | |
139 | } | |
140 | ||
// Same as Get(), but searches the flushed-but-retained history memtables
// instead of the unflushed list. No read callback is applied.
bool MemTableListVersion::GetFromHistory(
    const LookupKey& key, std::string* value, Status* s,
    MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
    SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
  return GetFromList(&memlist_history_, key, value, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts,
                     nullptr /*read_callback*/, is_blob_index);
}
149 | ||
// Core lookup shared by Get()/GetFromHistory(): walk `list` from newest to
// oldest, querying each memtable for `key`.
// On return, *seq holds the sequence number of the most recent operation
// seen on this key (kMaxSequenceNumber if none). Returns true when some
// memtable gives a definitive answer; returns false when the key was not
// resolved, or when a memtable reported an error status other than
// MergeInProgress/NotFound.
bool MemTableListVersion::GetFromList(
    std::list<MemTable*>* list, const LookupKey& key, std::string* value,
    Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
  *seq = kMaxSequenceNumber;

  for (auto& memtable : *list) {
    SequenceNumber current_seq = kMaxSequenceNumber;

    bool done =
        memtable->Get(key, value, s, merge_context, max_covering_tombstone_seq,
                      &current_seq, read_opts, callback, is_blob_index);
    if (*seq == kMaxSequenceNumber) {
      // Store the most recent sequence number of any operation on this key.
      // Since we only care about the most recent change, we only need to
      // return the first operation found when searching memtables in
      // reverse-chronological order.
      // current_seq would be equal to kMaxSequenceNumber if the value was to be
      // skipped. This allows seq to be assigned again when the next value is
      // read.
      *seq = current_seq;
    }

    if (done) {
      assert(*seq != kMaxSequenceNumber || s->IsNotFound());
      return true;
    }
    if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
      return false;
    }
  }
  return false;
}
184 | ||
185 | Status MemTableListVersion::AddRangeTombstoneIterators( | |
11fdf7f2 | 186 | const ReadOptions& read_opts, Arena* /*arena*/, |
7c673cae FG |
187 | RangeDelAggregator* range_del_agg) { |
188 | assert(range_del_agg != nullptr); | |
f67539c2 TL |
189 | // Except for snapshot read, using kMaxSequenceNumber is OK because these |
190 | // are immutable memtables. | |
191 | SequenceNumber read_seq = read_opts.snapshot != nullptr | |
192 | ? read_opts.snapshot->GetSequenceNumber() | |
193 | : kMaxSequenceNumber; | |
7c673cae | 194 | for (auto& m : memlist_) { |
494da23a | 195 | std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter( |
f67539c2 | 196 | m->NewRangeTombstoneIterator(read_opts, read_seq)); |
494da23a | 197 | range_del_agg->AddTombstones(std::move(range_del_iter)); |
11fdf7f2 TL |
198 | } |
199 | return Status::OK(); | |
200 | } | |
201 | ||
7c673cae FG |
202 | void MemTableListVersion::AddIterators( |
203 | const ReadOptions& options, std::vector<InternalIterator*>* iterator_list, | |
204 | Arena* arena) { | |
205 | for (auto& m : memlist_) { | |
206 | iterator_list->push_back(m->NewIterator(options, arena)); | |
207 | } | |
208 | } | |
209 | ||
210 | void MemTableListVersion::AddIterators( | |
211 | const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) { | |
212 | for (auto& m : memlist_) { | |
213 | merge_iter_builder->AddIterator( | |
214 | m->NewIterator(options, merge_iter_builder->GetArena())); | |
215 | } | |
216 | } | |
217 | ||
218 | uint64_t MemTableListVersion::GetTotalNumEntries() const { | |
219 | uint64_t total_num = 0; | |
220 | for (auto& m : memlist_) { | |
221 | total_num += m->num_entries(); | |
222 | } | |
223 | return total_num; | |
224 | } | |
225 | ||
226 | MemTable::MemTableStats MemTableListVersion::ApproximateStats( | |
227 | const Slice& start_ikey, const Slice& end_ikey) { | |
228 | MemTable::MemTableStats total_stats = {0, 0}; | |
229 | for (auto& m : memlist_) { | |
230 | auto mStats = m->ApproximateStats(start_ikey, end_ikey); | |
231 | total_stats.size += mStats.size; | |
232 | total_stats.count += mStats.count; | |
233 | } | |
234 | return total_stats; | |
235 | } | |
236 | ||
237 | uint64_t MemTableListVersion::GetTotalNumDeletes() const { | |
238 | uint64_t total_num = 0; | |
239 | for (auto& m : memlist_) { | |
240 | total_num += m->num_deletes(); | |
241 | } | |
242 | return total_num; | |
243 | } | |
244 | ||
245 | SequenceNumber MemTableListVersion::GetEarliestSequenceNumber( | |
246 | bool include_history) const { | |
247 | if (include_history && !memlist_history_.empty()) { | |
248 | return memlist_history_.back()->GetEarliestSequenceNumber(); | |
249 | } else if (!memlist_.empty()) { | |
250 | return memlist_.back()->GetEarliestSequenceNumber(); | |
251 | } else { | |
252 | return kMaxSequenceNumber; | |
253 | } | |
254 | } | |
255 | ||
256 | // caller is responsible for referencing m | |
257 | void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) { | |
258 | assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable | |
259 | AddMemTable(m); | |
260 | ||
f67539c2 | 261 | TrimHistory(to_delete, m->ApproximateMemoryUsage()); |
7c673cae FG |
262 | } |
263 | ||
264 | // Removes m from list of memtables not flushed. Caller should NOT Unref m. | |
265 | void MemTableListVersion::Remove(MemTable* m, | |
266 | autovector<MemTable*>* to_delete) { | |
267 | assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable | |
268 | memlist_.remove(m); | |
269 | ||
11fdf7f2 | 270 | m->MarkFlushed(); |
f67539c2 TL |
271 | if (max_write_buffer_size_to_maintain_ > 0 || |
272 | max_write_buffer_number_to_maintain_ > 0) { | |
7c673cae | 273 | memlist_history_.push_front(m); |
f67539c2 TL |
274 | // Unable to get size of mutable memtable at this point, pass 0 to |
275 | // TrimHistory as a best effort. | |
276 | TrimHistory(to_delete, 0); | |
7c673cae FG |
277 | } else { |
278 | UnrefMemTable(to_delete, m); | |
279 | } | |
280 | } | |
281 | ||
f67539c2 TL |
282 | // return the total memory usage assuming the oldest flushed memtable is dropped |
283 | size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() const { | |
284 | size_t total_memtable_size = 0; | |
285 | for (auto& memtable : memlist_) { | |
286 | total_memtable_size += memtable->ApproximateMemoryUsage(); | |
287 | } | |
288 | for (auto& memtable : memlist_history_) { | |
289 | total_memtable_size += memtable->ApproximateMemoryUsage(); | |
290 | } | |
291 | if (!memlist_history_.empty()) { | |
292 | total_memtable_size -= memlist_history_.back()->ApproximateMemoryUsage(); | |
293 | } | |
294 | return total_memtable_size; | |
295 | } | |
296 | ||
297 | bool MemTableListVersion::MemtableLimitExceeded(size_t usage) { | |
298 | if (max_write_buffer_size_to_maintain_ > 0) { | |
299 | // calculate the total memory usage after dropping the oldest flushed | |
300 | // memtable, compare with max_write_buffer_size_to_maintain_ to decide | |
301 | // whether to trim history | |
302 | return ApproximateMemoryUsageExcludingLast() + usage >= | |
303 | static_cast<size_t>(max_write_buffer_size_to_maintain_); | |
304 | } else if (max_write_buffer_number_to_maintain_ > 0) { | |
305 | return memlist_.size() + memlist_history_.size() > | |
306 | static_cast<size_t>(max_write_buffer_number_to_maintain_); | |
307 | } else { | |
308 | return false; | |
309 | } | |
310 | } | |
311 | ||
7c673cae | 312 | // Make sure we don't use up too much space in history |
f67539c2 TL |
313 | void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete, |
314 | size_t usage) { | |
315 | while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) { | |
7c673cae FG |
316 | MemTable* x = memlist_history_.back(); |
317 | memlist_history_.pop_back(); | |
318 | ||
319 | UnrefMemTable(to_delete, x); | |
320 | } | |
321 | } | |
322 | ||
323 | // Returns true if there is at least one memtable on which flush has | |
324 | // not yet started. | |
325 | bool MemTableList::IsFlushPending() const { | |
494da23a | 326 | if ((flush_requested_ && num_flush_not_started_ > 0) || |
7c673cae FG |
327 | (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { |
328 | assert(imm_flush_needed.load(std::memory_order_relaxed)); | |
329 | return true; | |
330 | } | |
331 | return false; | |
332 | } | |
333 | ||
// Returns the memtables that need to be flushed.
// Walks the unflushed list oldest-first, marking each not-yet-started
// memtable as flush_in_progress_ and appending it to *ret, stopping before
// any memtable whose ID exceeds *max_memtable_id (when provided).
void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
                                        autovector<MemTable*>* ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  // Tracks whether any scanned memtable is part of an atomic flush; if so,
  // the pending flush request is kept open until nothing remains to pick.
  bool atomic_flush = false;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* m = *it;
    if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
      atomic_flush = true;
    }
    if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
      break;
    }
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        // Nothing left to pick: clear the "flush needed" signal.
        imm_flush_needed.store(false, std::memory_order_release);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      ret->push_back(m);
    }
  }
  if (!atomic_flush || num_flush_not_started_ == 0) {
    flush_requested_ = false;  // start-flush request is complete
  }
}
363 | ||
364 | void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems, | |
11fdf7f2 | 365 | uint64_t /*file_number*/) { |
7c673cae FG |
366 | AutoThreadOperationStageUpdater stage_updater( |
367 | ThreadStatus::STAGE_MEMTABLE_ROLLBACK); | |
368 | assert(!mems.empty()); | |
369 | ||
370 | // If the flush was not successful, then just reset state. | |
371 | // Maybe a succeeding attempt to flush will be successful. | |
372 | for (MemTable* m : mems) { | |
373 | assert(m->flush_in_progress_); | |
374 | assert(m->file_number_ == 0); | |
375 | ||
376 | m->flush_in_progress_ = false; | |
377 | m->flush_completed_ = false; | |
378 | m->edit_.Clear(); | |
379 | num_flush_not_started_++; | |
380 | } | |
381 | imm_flush_needed.store(true, std::memory_order_release); | |
382 | } | |
383 | ||
// Try to record a successful flush in the manifest file. It might just
// return Status::OK letting a concurrent flush do the actual recording.
// Marks `mems` as flush-completed under the held mutex, then — if no other
// thread is committing — repeatedly commits finished batches (oldest first)
// to the manifest via LogAndApply, removing committed memtables from the
// list or resetting them for retry on failure.
Status MemTableList::TryInstallMemtableFlushResults(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
    VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer,
    std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  // Flush was successful
  // Record the status on the memtable object. Either this call or a call by a
  // concurrent flush thread will read the status and write it to manifest.
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);

    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // if some other thread is already committing, then return
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing manifest where mutex is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // The back is the oldest; if flush_completed_ is not set to it, it means
    // that we were assigned a more recent memtable. The memtables' flushes must
    // be recorded in manifest in order. A concurrent flush thread, who is
    // assigned to flush the oldest memtable, will later wake up and does all
    // the pending writes to manifest, in order.
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memtables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    autovector<MemTable*> memtables_to_flush;
    // enumerate from the last (earliest) element to see how many batch finished
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64 " started",
                         cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
        memtables_to_flush.push_back(m);
#ifndef ROCKSDB_LITE
        std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
        if (info != nullptr) {
          committed_flush_jobs_info->push_back(std::move(info));
        }
#else
        (void)committed_flush_jobs_info;
#endif  // !ROCKSDB_LITE
      }
      batch_count++;
    }

    // TODO(myabandeh): Not sure how batch_count could be 0 here.
    if (batch_count > 0) {
      if (vset->db_options()->allow_2pc) {
        assert(edit_list.size() > 0);
        // We piggyback the information of earliest log file to keep in the
        // manifest entry for the last file flushed.
        edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
            vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
      }

      // this can release and reacquire the mutex.
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);

      // we will be changing the version in the next code path,
      // so we better create a new one, since versions are immutable
      InstallNewVersion();

      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.

      // commit new state only if the column family is NOT dropped.
      // The reason is as follows (refer to
      // ColumnFamilyTest.FlushAndDropRaceCondition).
      // If the column family is dropped, then according to LogAndApply, its
      // corresponding flush operation is NOT written to the MANIFEST. This
      // means the DB is not aware of the L0 files generated from the flush.
      // By committing the new state, we remove the memtable from the memtable
      // list. Creating an iterator on this column family will not be able to
      // read full data since the memtable is removed, and the DB is not aware
      // of the L0 files, causing MergingIterator unable to build child
      // iterators. RocksDB contract requires that the iterator can be created
      // on a dropped column family, and we must be able to
      // read full data as long as column family handle is not deleted, even if
      // the column family is dropped.
      if (s.ok() && !cfd->IsDropped()) {  // commit new state
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " done",
                           cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          UpdateCachedValuesFromMemTableListVersion();
          ResetTrimHistoryNeeded();
          ++mem_id;
        }
      } else {
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
          MemTable* m = *it;
          // commit failed. setup state so that we can flush again.
          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " failed",
                           m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}
531 | ||
532 | // New memtables are inserted at the front of the list. | |
533 | void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) { | |
534 | assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_); | |
535 | InstallNewVersion(); | |
536 | // this method is used to move mutable memtable into an immutable list. | |
537 | // since mutable memtable is already refcounted by the DBImpl, | |
538 | // and when moving to the imutable list we don't unref it, | |
539 | // we don't have to ref the memtable here. we just take over the | |
540 | // reference from the DBImpl. | |
541 | current_->Add(m, to_delete); | |
542 | m->MarkImmutable(); | |
543 | num_flush_not_started_++; | |
544 | if (num_flush_not_started_ == 1) { | |
545 | imm_flush_needed.store(true, std::memory_order_release); | |
546 | } | |
f67539c2 TL |
547 | UpdateCachedValuesFromMemTableListVersion(); |
548 | ResetTrimHistoryNeeded(); | |
549 | } | |
550 | ||
551 | void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) { | |
552 | InstallNewVersion(); | |
553 | current_->TrimHistory(to_delete, usage); | |
554 | UpdateCachedValuesFromMemTableListVersion(); | |
555 | ResetTrimHistoryNeeded(); | |
7c673cae FG |
556 | } |
557 | ||
558 | // Returns an estimate of the number of bytes of data in use. | |
559 | size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() { | |
560 | size_t total_size = 0; | |
561 | for (auto& memtable : current_->memlist_) { | |
562 | total_size += memtable->ApproximateMemoryUsage(); | |
563 | } | |
564 | return total_size; | |
565 | } | |
566 | ||
567 | size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; } | |
568 | ||
f67539c2 TL |
569 | size_t MemTableList::ApproximateMemoryUsageExcludingLast() const { |
570 | const size_t usage = | |
571 | current_memory_usage_excluding_last_.load(std::memory_order_relaxed); | |
572 | return usage; | |
573 | } | |
574 | ||
575 | bool MemTableList::HasHistory() const { | |
576 | const bool has_history = current_has_history_.load(std::memory_order_relaxed); | |
577 | return has_history; | |
578 | } | |
579 | ||
580 | void MemTableList::UpdateCachedValuesFromMemTableListVersion() { | |
581 | const size_t total_memtable_size = | |
582 | current_->ApproximateMemoryUsageExcludingLast(); | |
583 | current_memory_usage_excluding_last_.store(total_memtable_size, | |
584 | std::memory_order_relaxed); | |
585 | ||
586 | const bool has_history = current_->HasHistory(); | |
587 | current_has_history_.store(has_history, std::memory_order_relaxed); | |
588 | } | |
589 | ||
11fdf7f2 TL |
590 | uint64_t MemTableList::ApproximateOldestKeyTime() const { |
591 | if (!current_->memlist_.empty()) { | |
592 | return current_->memlist_.back()->ApproximateOldestKeyTime(); | |
593 | } | |
594 | return std::numeric_limits<uint64_t>::max(); | |
595 | } | |
596 | ||
// Copy-on-write: ensure current_ may be mutated. If any other holder exists,
// replace current_ with a fresh copy (which Refs every memtable it carries)
// and drop this list's reference on the old version.
void MemTableList::InstallNewVersion() {
  if (current_->refs_ == 1) {
    // we're the only one using the version, just keep using it
  } else {
    // somebody else holds the current version, we need to create new one
    MemTableListVersion* version = current_;
    current_ = new MemTableListVersion(&current_memory_usage_, current_);
    current_->Ref();
    version->Unref();
  }
}
608 | ||
11fdf7f2 TL |
609 | uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( |
610 | const autovector<MemTable*>& memtables_to_flush) { | |
7c673cae FG |
611 | uint64_t min_log = 0; |
612 | ||
613 | for (auto& m : current_->memlist_) { | |
11fdf7f2 TL |
614 | // Assume the list is very short, we can live with O(m*n). We can optimize |
615 | // if the performance has some problem. | |
616 | bool should_skip = false; | |
617 | for (MemTable* m_to_flush : memtables_to_flush) { | |
618 | if (m == m_to_flush) { | |
619 | should_skip = true; | |
620 | break; | |
621 | } | |
622 | } | |
623 | if (should_skip) { | |
7c673cae FG |
624 | continue; |
625 | } | |
626 | ||
627 | auto log = m->GetMinLogContainingPrepSection(); | |
628 | ||
629 | if (log > 0 && (min_log == 0 || log < min_log)) { | |
630 | min_log = log; | |
631 | } | |
632 | } | |
633 | ||
634 | return min_log; | |
635 | } | |
636 | ||
// Commit a successful atomic flush in the manifest file.
// Each column family contributes one version edit (carried by the first
// memtable of its batch); when more than one CF is involved the edits are
// marked as an atomic group before a single LogAndApply call. On success
// (or if the CF was dropped), committed memtables are removed from their
// lists; otherwise every memtable's flush state is reset for retry.
Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  size_t num = mems_list.size();
  assert(cfds.size() == num);
  if (imm_lists != nullptr) {
    assert(imm_lists->size() == num);
  }
  // Mark every memtable in every batch as flush-completed and record the
  // SST file number it was flushed to.
  for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
    const auto* imm =
        (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    if (!mems_list[k]->empty()) {
      assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
    }
#endif
    assert(nullptr != file_metas[k]);
    for (size_t i = 0; i != mems_list[k]->size(); ++i) {
      // All edits are associated with the first memtable of each batch.
      assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
      (*mems_list[k])[i]->SetFlushCompleted(true);
      (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
    }
  }

  Status s;

  autovector<autovector<VersionEdit*>> edit_lists;
  uint32_t num_entries = 0;
  for (const auto mems : mems_list) {
    assert(mems != nullptr);
    autovector<VersionEdit*> edits;
    assert(!mems->empty());
    edits.emplace_back((*mems)[0]->GetEdits());
    ++num_entries;
    edit_lists.emplace_back(edits);
  }
  // Mark the version edits as an atomic group if the number of version edits
  // exceeds 1.
  if (cfds.size() > 1) {
    for (auto& edits : edit_lists) {
      assert(edits.size() == 1);
      edits[0]->MarkAtomicGroup(--num_entries);
    }
    assert(0 == num_entries);
  }

  // this can release and reacquire the mutex.
  s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
                        db_directory);

  for (size_t k = 0; k != cfds.size(); ++k) {
    auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    imm->InstallNewVersion();
  }

  if (s.ok() || s.IsColumnFamilyDropped()) {
    for (size_t i = 0; i != cfds.size(); ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        assert(m->GetFileNumber() > 0);
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " done",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        imm->current_->Remove(m, to_delete);
        imm->UpdateCachedValuesFromMemTableListVersion();
        imm->ResetTrimHistoryNeeded();
      }
    }
  } else {
    // Commit failed: reset flush state so the memtables can be flushed again.
    for (size_t i = 0; i != cfds.size(); ++i) {
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " failed",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        m->SetFlushCompleted(false);
        m->SetFlushInProgress(false);
        m->GetEdits()->Clear();
        m->SetFileNumber(0);
        imm->num_flush_not_started_++;
      }
      imm->imm_flush_needed.store(true, std::memory_order_release);
    }
  }

  return s;
}
743 | ||
f67539c2 TL |
744 | void MemTableList::RemoveOldMemTables(uint64_t log_number, |
745 | autovector<MemTable*>* to_delete) { | |
746 | assert(to_delete != nullptr); | |
747 | InstallNewVersion(); | |
748 | auto& memlist = current_->memlist_; | |
749 | autovector<MemTable*> old_memtables; | |
750 | for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { | |
751 | MemTable* mem = *it; | |
752 | if (mem->GetNextLogNumber() > log_number) { | |
753 | break; | |
754 | } | |
755 | old_memtables.push_back(mem); | |
756 | } | |
757 | ||
758 | for (auto it = old_memtables.begin(); it != old_memtables.end(); ++it) { | |
759 | MemTable* mem = *it; | |
760 | current_->Remove(mem, to_delete); | |
761 | --num_flush_not_started_; | |
762 | if (0 == num_flush_not_started_) { | |
763 | imm_flush_needed.store(false, std::memory_order_release); | |
764 | } | |
765 | } | |
766 | ||
767 | UpdateCachedValuesFromMemTableListVersion(); | |
768 | ResetTrimHistoryNeeded(); | |
769 | } | |
770 | ||
771 | } // namespace ROCKSDB_NAMESPACE |