]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under the BSD-style license found in the | |
3 | // LICENSE file in the root directory of this source tree. An additional grant | |
4 | // of patent rights can be found in the PATENTS file in the same directory. | |
5 | // | |
6 | #include "db/memtable_list.h" | |
7 | ||
8 | #ifndef __STDC_FORMAT_MACROS | |
9 | #define __STDC_FORMAT_MACROS | |
10 | #endif | |
11 | ||
12 | #include <inttypes.h> | |
13 | #include <string> | |
14 | #include "db/memtable.h" | |
15 | #include "db/version_set.h" | |
16 | #include "monitoring/thread_status_util.h" | |
17 | #include "rocksdb/db.h" | |
18 | #include "rocksdb/env.h" | |
19 | #include "rocksdb/iterator.h" | |
20 | #include "table/merging_iterator.h" | |
21 | #include "util/coding.h" | |
22 | #include "util/log_buffer.h" | |
23 | #include "util/sync_point.h" | |
24 | ||
25 | namespace rocksdb { | |
26 | ||
27 | class InternalKeyComparator; | |
28 | class Mutex; | |
29 | class VersionSet; | |
30 | ||
31 | void MemTableListVersion::AddMemTable(MemTable* m) { | |
32 | memlist_.push_front(m); | |
33 | *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage(); | |
34 | } | |
35 | ||
36 | void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete, | |
37 | MemTable* m) { | |
38 | if (m->Unref()) { | |
39 | to_delete->push_back(m); | |
40 | assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage()); | |
41 | *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage(); | |
42 | } else { | |
43 | } | |
44 | } | |
45 | ||
// Construct a new version as a copy of `old`, taking a reference on every
// memtable in both the live list and the flushed-history list so the copy
// co-owns them.
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
    // NOTE(review): `old` is dereferenced here, before the null check in the
    // body below — callers must pass a non-null `old` (InstallNewVersion
    // does), which makes the `if (old != nullptr)` guard effectively dead.
    // Confirm intent before relying on a null `old`.
    : max_write_buffer_number_to_maintain_(
          old->max_write_buffer_number_to_maintain_),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
  if (old != nullptr) {
    memlist_ = old->memlist_;
    for (auto& m : memlist_) {
      m->Ref();  // this version now holds its own reference on each memtable
    }

    memlist_history_ = old->memlist_history_;
    for (auto& m : memlist_history_) {
      m->Ref();
    }
  }
}
63 | ||
// Construct an empty version with the given history budget; memory usage is
// accumulated into *parent_memtable_list_memory_usage.
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int max_write_buffer_number_to_maintain)
    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}
69 | ||
// Takes one reference on this version (no synchronization here; presumably
// callers hold the DB mutex — confirm against call sites).
void MemTableListVersion::Ref() { ++refs_; }
71 | ||
72 | // called by superversion::clean() | |
73 | void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) { | |
74 | assert(refs_ >= 1); | |
75 | --refs_; | |
76 | if (refs_ == 0) { | |
77 | // if to_delete is equal to nullptr it means we're confident | |
78 | // that refs_ will not be zero | |
79 | assert(to_delete != nullptr); | |
80 | for (const auto& m : memlist_) { | |
81 | UnrefMemTable(to_delete, m); | |
82 | } | |
83 | for (const auto& m : memlist_history_) { | |
84 | UnrefMemTable(to_delete, m); | |
85 | } | |
86 | delete this; | |
87 | } | |
88 | } | |
89 | ||
90 | int MemTableList::NumNotFlushed() const { | |
91 | int size = static_cast<int>(current_->memlist_.size()); | |
92 | assert(num_flush_not_started_ <= size); | |
93 | return size; | |
94 | } | |
95 | ||
96 | int MemTableList::NumFlushed() const { | |
97 | return static_cast<int>(current_->memlist_history_.size()); | |
98 | } | |
99 | ||
100 | // Search all the memtables starting from the most recent one. | |
101 | // Return the most recent value found, if any. | |
102 | // Operands stores the list of merge operations to apply, so far. | |
103 | bool MemTableListVersion::Get(const LookupKey& key, std::string* value, | |
104 | Status* s, MergeContext* merge_context, | |
105 | RangeDelAggregator* range_del_agg, | |
106 | SequenceNumber* seq, | |
107 | const ReadOptions& read_opts) { | |
108 | return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg, | |
109 | seq, read_opts); | |
110 | } | |
111 | ||
112 | bool MemTableListVersion::GetFromHistory(const LookupKey& key, | |
113 | std::string* value, Status* s, | |
114 | MergeContext* merge_context, | |
115 | RangeDelAggregator* range_del_agg, | |
116 | SequenceNumber* seq, | |
117 | const ReadOptions& read_opts) { | |
118 | return GetFromList(&memlist_history_, key, value, s, merge_context, | |
119 | range_del_agg, seq, read_opts); | |
120 | } | |
121 | ||
122 | bool MemTableListVersion::GetFromList(std::list<MemTable*>* list, | |
123 | const LookupKey& key, std::string* value, | |
124 | Status* s, MergeContext* merge_context, | |
125 | RangeDelAggregator* range_del_agg, | |
126 | SequenceNumber* seq, | |
127 | const ReadOptions& read_opts) { | |
128 | *seq = kMaxSequenceNumber; | |
129 | ||
130 | for (auto& memtable : *list) { | |
131 | SequenceNumber current_seq = kMaxSequenceNumber; | |
132 | ||
133 | bool done = memtable->Get(key, value, s, merge_context, range_del_agg, | |
134 | ¤t_seq, read_opts); | |
135 | if (*seq == kMaxSequenceNumber) { | |
136 | // Store the most recent sequence number of any operation on this key. | |
137 | // Since we only care about the most recent change, we only need to | |
138 | // return the first operation found when searching memtables in | |
139 | // reverse-chronological order. | |
140 | *seq = current_seq; | |
141 | } | |
142 | ||
143 | if (done) { | |
144 | assert(*seq != kMaxSequenceNumber); | |
145 | return true; | |
146 | } | |
147 | if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) { | |
148 | return false; | |
149 | } | |
150 | } | |
151 | return false; | |
152 | } | |
153 | ||
154 | Status MemTableListVersion::AddRangeTombstoneIterators( | |
155 | const ReadOptions& read_opts, Arena* arena, | |
156 | RangeDelAggregator* range_del_agg) { | |
157 | assert(range_del_agg != nullptr); | |
158 | for (auto& m : memlist_) { | |
159 | std::unique_ptr<InternalIterator> range_del_iter( | |
160 | m->NewRangeTombstoneIterator(read_opts)); | |
161 | Status s = range_del_agg->AddTombstones(std::move(range_del_iter)); | |
162 | if (!s.ok()) { | |
163 | return s; | |
164 | } | |
165 | } | |
166 | return Status::OK(); | |
167 | } | |
168 | ||
169 | void MemTableListVersion::AddIterators( | |
170 | const ReadOptions& options, std::vector<InternalIterator*>* iterator_list, | |
171 | Arena* arena) { | |
172 | for (auto& m : memlist_) { | |
173 | iterator_list->push_back(m->NewIterator(options, arena)); | |
174 | } | |
175 | } | |
176 | ||
177 | void MemTableListVersion::AddIterators( | |
178 | const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) { | |
179 | for (auto& m : memlist_) { | |
180 | merge_iter_builder->AddIterator( | |
181 | m->NewIterator(options, merge_iter_builder->GetArena())); | |
182 | } | |
183 | } | |
184 | ||
185 | uint64_t MemTableListVersion::GetTotalNumEntries() const { | |
186 | uint64_t total_num = 0; | |
187 | for (auto& m : memlist_) { | |
188 | total_num += m->num_entries(); | |
189 | } | |
190 | return total_num; | |
191 | } | |
192 | ||
193 | MemTable::MemTableStats MemTableListVersion::ApproximateStats( | |
194 | const Slice& start_ikey, const Slice& end_ikey) { | |
195 | MemTable::MemTableStats total_stats = {0, 0}; | |
196 | for (auto& m : memlist_) { | |
197 | auto mStats = m->ApproximateStats(start_ikey, end_ikey); | |
198 | total_stats.size += mStats.size; | |
199 | total_stats.count += mStats.count; | |
200 | } | |
201 | return total_stats; | |
202 | } | |
203 | ||
204 | uint64_t MemTableListVersion::GetTotalNumDeletes() const { | |
205 | uint64_t total_num = 0; | |
206 | for (auto& m : memlist_) { | |
207 | total_num += m->num_deletes(); | |
208 | } | |
209 | return total_num; | |
210 | } | |
211 | ||
212 | SequenceNumber MemTableListVersion::GetEarliestSequenceNumber( | |
213 | bool include_history) const { | |
214 | if (include_history && !memlist_history_.empty()) { | |
215 | return memlist_history_.back()->GetEarliestSequenceNumber(); | |
216 | } else if (!memlist_.empty()) { | |
217 | return memlist_.back()->GetEarliestSequenceNumber(); | |
218 | } else { | |
219 | return kMaxSequenceNumber; | |
220 | } | |
221 | } | |
222 | ||
223 | // caller is responsible for referencing m | |
224 | void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) { | |
225 | assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable | |
226 | AddMemTable(m); | |
227 | ||
228 | TrimHistory(to_delete); | |
229 | } | |
230 | ||
231 | // Removes m from list of memtables not flushed. Caller should NOT Unref m. | |
232 | void MemTableListVersion::Remove(MemTable* m, | |
233 | autovector<MemTable*>* to_delete) { | |
234 | assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable | |
235 | memlist_.remove(m); | |
236 | ||
237 | if (max_write_buffer_number_to_maintain_ > 0) { | |
238 | memlist_history_.push_front(m); | |
239 | TrimHistory(to_delete); | |
240 | } else { | |
241 | UnrefMemTable(to_delete, m); | |
242 | } | |
243 | } | |
244 | ||
245 | // Make sure we don't use up too much space in history | |
246 | void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) { | |
247 | while (memlist_.size() + memlist_history_.size() > | |
248 | static_cast<size_t>(max_write_buffer_number_to_maintain_) && | |
249 | !memlist_history_.empty()) { | |
250 | MemTable* x = memlist_history_.back(); | |
251 | memlist_history_.pop_back(); | |
252 | ||
253 | UnrefMemTable(to_delete, x); | |
254 | } | |
255 | } | |
256 | ||
257 | // Returns true if there is at least one memtable on which flush has | |
258 | // not yet started. | |
259 | bool MemTableList::IsFlushPending() const { | |
260 | if ((flush_requested_ && num_flush_not_started_ >= 1) || | |
261 | (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { | |
262 | assert(imm_flush_needed.load(std::memory_order_relaxed)); | |
263 | return true; | |
264 | } | |
265 | return false; | |
266 | } | |
267 | ||
// Returns the memtables that need to be flushed.
// Appends to *ret, oldest first, every memtable whose flush has not yet
// started, and marks each one flush-in-progress. Clears the flush-needed
// flag once no unpicked memtables remain.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  // Iterate in reverse so the oldest memtables are picked first.
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* m = *it;
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        // Nothing left to pick; disarm the flush-needed flag.
        imm_flush_needed.store(false, std::memory_order_release);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      ret->push_back(m);
    }
  }
  flush_requested_ = false;  // start-flush request is complete
}
287 | ||
288 | void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems, | |
289 | uint64_t file_number) { | |
290 | AutoThreadOperationStageUpdater stage_updater( | |
291 | ThreadStatus::STAGE_MEMTABLE_ROLLBACK); | |
292 | assert(!mems.empty()); | |
293 | ||
294 | // If the flush was not successful, then just reset state. | |
295 | // Maybe a succeeding attempt to flush will be successful. | |
296 | for (MemTable* m : mems) { | |
297 | assert(m->flush_in_progress_); | |
298 | assert(m->file_number_ == 0); | |
299 | ||
300 | m->flush_in_progress_ = false; | |
301 | m->flush_completed_ = false; | |
302 | m->edit_.Clear(); | |
303 | num_flush_not_started_++; | |
304 | } | |
305 | imm_flush_needed.store(true, std::memory_order_release); | |
306 | } | |
307 | ||
// Record a successful flush in the manifest file.
// Marks every memtable in `mems` flush-completed, then — guarded by
// commit_in_progress_ so only one thread commits at a time — repeatedly
// scans the current list from oldest to newest, writes each fully-flushed
// batch's edits to the manifest via LogAndApply, and removes the committed
// memtables from the current version. On a failed commit the memtables'
// state is reset so they can be flushed again. Must be called with `mu`
// held; LogAndApply may release and reacquire it.
Status MemTableList::InstallMemtableFlushResults(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu,
    uint64_t file_number, autovector<MemTable*>* to_delete,
    Directory* db_directory, LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  // Flush was successful: stamp every memtable of this batch with the new
  // file number.
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);

    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // If some other thread is already committing, return OK; that thread's
  // retry loop will pick up this batch too.
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code.
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing manifest where mutex is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // Stop once the oldest memtable is not yet fully flushed (memtables are
    // always committed in creation order).
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // Scan all memtables from the earliest, and commit those (in that order)
    // that have finished flushing.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    // Enumerate from the last (earliest) element to count finished batches;
    // one edit is recorded per distinct output file.
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64 " started",
                         cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
      }
      batch_count++;
    }

    if (batch_count > 0) {
      // This can release and reacquire the mutex.
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);

      // We will be changing the version in the next code path,
      // so we better create a new one, since versions are immutable.
      InstallNewVersion();

      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.
      if (s.ok()) {         // commit new state
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " done",
                           cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          ++mem_id;
        }
      } else {
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
          MemTable* m = *it;
          // Commit failed. Set up state so that we can flush again.
          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " failed",
                           m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}
409 | ||
// New memtables are inserted at the front of the list.
// Moves a mutable memtable into the immutable list and arms the
// flush-needed flag when it becomes the first pending memtable.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
  InstallNewVersion();
  // Since the mutable memtable is already refcounted by DBImpl, and we do
  // not unref it when moving it to the immutable list, we do not need to
  // ref it here — we simply take over DBImpl's reference.
  current_->Add(m, to_delete);
  m->MarkImmutable();
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    // First pending memtable: signal that a flush is needed.
    imm_flush_needed.store(true, std::memory_order_release);
  }
}
426 | ||
427 | // Returns an estimate of the number of bytes of data in use. | |
428 | size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() { | |
429 | size_t total_size = 0; | |
430 | for (auto& memtable : current_->memlist_) { | |
431 | total_size += memtable->ApproximateMemoryUsage(); | |
432 | } | |
433 | return total_size; | |
434 | } | |
435 | ||
// Cached total footprint; maintained incrementally by the versions via
// parent_memtable_list_memory_usage_ (see AddMemTable / UnrefMemTable).
size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
437 | ||
438 | void MemTableList::InstallNewVersion() { | |
439 | if (current_->refs_ == 1) { | |
440 | // we're the only one using the version, just keep using it | |
441 | } else { | |
442 | // somebody else holds the current version, we need to create new one | |
443 | MemTableListVersion* version = current_; | |
444 | current_ = new MemTableListVersion(¤t_memory_usage_, current_); | |
445 | current_->Ref(); | |
446 | version->Unref(); | |
447 | } | |
448 | } | |
449 | ||
450 | uint64_t MemTableList::GetMinLogContainingPrepSection() { | |
451 | uint64_t min_log = 0; | |
452 | ||
453 | for (auto& m : current_->memlist_) { | |
454 | // this mem has been flushed it no longer | |
455 | // needs to hold on the its prep section | |
456 | if (m->flush_completed_) { | |
457 | continue; | |
458 | } | |
459 | ||
460 | auto log = m->GetMinLogContainingPrepSection(); | |
461 | ||
462 | if (log > 0 && (min_log == 0 || log < min_log)) { | |
463 | min_log = log; | |
464 | } | |
465 | } | |
466 | ||
467 | return min_log; | |
468 | } | |
469 | ||
470 | } // namespace rocksdb |