// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/memtable.h"

#include <algorithm>
#include <array>
#include <limits>
#include <memory>

#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "memory/arena.h"
#include "memory/memory_usage.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/util.h"

namespace ROCKSDB_NAMESPACE {

ImmutableMemTableOptions::ImmutableMemTableOptions(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options)
    : arena_block_size(mutable_cf_options.arena_block_size),
      memtable_prefix_bloom_bits(
          static_cast<uint32_t>(
              static_cast<double>(mutable_cf_options.write_buffer_size) *
              mutable_cf_options.memtable_prefix_bloom_size_ratio) *
          8u),
      memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
      memtable_whole_key_filtering(
          mutable_cf_options.memtable_whole_key_filtering),
      inplace_update_support(ioptions.inplace_update_support),
      inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
      inplace_callback(ioptions.inplace_callback),
      max_successive_merges(mutable_cf_options.max_successive_merges),
      statistics(ioptions.statistics),
      merge_operator(ioptions.merge_operator),
      info_log(ioptions.info_log) {}

MemTable::MemTable(const InternalKeyComparator& cmp,
                   const ImmutableCFOptions& ioptions,
                   const MutableCFOptions& mutable_cf_options,
                   WriteBufferManager* write_buffer_manager,
                   SequenceNumber latest_seq, uint32_t column_family_id)
    : comparator_(cmp),
      moptions_(ioptions, mutable_cf_options),
      refs_(0),
      kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
      mem_tracker_(write_buffer_manager),
      arena_(moptions_.arena_block_size,
             (write_buffer_manager != nullptr &&
              (write_buffer_manager->enabled() ||
               write_buffer_manager->cost_to_cache()))
                 ? &mem_tracker_
                 : nullptr,
             mutable_cf_options.memtable_huge_page_size),
      table_(ioptions.memtable_factory->CreateMemTableRep(
          comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
          ioptions.info_log, column_family_id)),
      range_del_table_(SkipListFactory().CreateMemTableRep(
          comparator_, &arena_, nullptr /* transform */, ioptions.info_log,
          column_family_id)),
      is_range_del_table_empty_(true),
      data_size_(0),
      num_entries_(0),
      num_deletes_(0),
      write_buffer_size_(mutable_cf_options.write_buffer_size),
      flush_in_progress_(false),
      flush_completed_(false),
      file_number_(0),
      first_seqno_(0),
      earliest_seqno_(latest_seq),
      creation_seq_(latest_seq),
      mem_next_logfile_number_(0),
      min_prep_log_referenced_(0),
      locks_(moptions_.inplace_update_support
                 ? moptions_.inplace_update_num_locks
                 : 0),
      prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      flush_state_(FLUSH_NOT_REQUESTED),
      env_(ioptions.env),
      insert_with_hint_prefix_extractor_(
          ioptions.memtable_insert_with_hint_prefix_extractor),
      oldest_key_time_(std::numeric_limits<uint64_t>::max()),
      atomic_flush_seqno_(kMaxSequenceNumber),
      approximate_memory_usage_(0) {
  UpdateFlushState();
  // something went wrong if we need to flush before inserting anything
  assert(!ShouldScheduleFlush());

  // use bloom_filter_ for both whole key and prefix bloom filter
  if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
      moptions_.memtable_prefix_bloom_bits > 0) {
    bloom_filter_.reset(
        new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
                         6 /* hard coded 6 probes */,
                         moptions_.memtable_huge_page_size, ioptions.info_log));
  }
}

MemTable::~MemTable() {
  mem_tracker_.FreeMem();
  assert(refs_ == 0);
}

size_t MemTable::ApproximateMemoryUsage() {
  autovector<size_t> usages = {
      arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
      range_del_table_->ApproximateMemoryUsage(),
      ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)};
  size_t total_usage = 0;
  for (size_t usage : usages) {
    // If usage + total_usage >= kMaxSizet, return kMaxSizet.
    // The comparison is written this way to avoid numeric overflow.
    if (usage >= port::kMaxSizet - total_usage) {
      return port::kMaxSizet;
    }
    total_usage += usage;
  }
  approximate_memory_usage_.store(total_usage, std::memory_order_relaxed);
  // otherwise, return the actual usage
  return total_usage;
}

bool MemTable::ShouldFlushNow() {
  size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
  // Most of the time we cannot allocate arena blocks that exactly match the
  // buffer size. Thus we have to decide whether to over-allocate or
  // under-allocate.
  // This constant can be interpreted as: if we still have more than
  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to
  // over-allocate one more block.
  const double kAllowOverAllocationRatio = 0.6;

  // If the arena still has room for a new block allocation, we can safely say
  // it shouldn't flush.
  auto allocated_memory = table_->ApproximateMemoryUsage() +
                          range_del_table_->ApproximateMemoryUsage() +
                          arena_.MemoryAllocatedBytes();

  approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed);

  // if we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // if the user keeps adding entries that exceed write_buffer_size, we need to
  // flush earlier even though we still have much available memory left.
  if (allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }

  // In this code path, the arena has already allocated its "last block", which
  // means the total allocated memory is either:
  // (1) "moderately" over-allocated (by no more than `0.6 * arena block
  // size`), or
  // (2) less than the write buffer size, but we stop here anyway, since
  // allocating another arena block would over-allocate by too much (half an
  // arena block) more memory.
  //
  // In either case, to avoid over-allocation, the last block stops taking new
  // entries once its usage reaches a certain ratio. We carefully choose "0.75
  // full" as the stop condition because it addresses the following issue with
  // great simplicity: what if the next inserted entry's size is bigger than
  // AllocatedAndUnused()?
  //
  // The answer is: if the entry size is also bigger than 0.25 *
  // kArenaBlockSize, a dedicated block will be allocated for it; otherwise the
  // arena will skip the AllocatedAndUnused() space and allocate a new, empty,
  // regular block. In either case, we *overly* over-allocated.
  //
  // Therefore, setting the last block to be at most "0.75 full" avoids both
  // cases.
  //
  // NOTE: the average fraction of wasted space with this approach can be
  // estimated as "arena block size * 0.25 / write buffer size". Users who
  // specify a small write buffer size and/or a big arena block size may
  // suffer.
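  //
  // As a purely illustrative example (the numbers below are hypothetical, not
  // defaults): with an 8 MB arena block and a 64 MB write buffer, the expected
  // waste is roughly 8 MB * 0.25 / 64 MB, i.e. about 3% of the buffer.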
  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}

void MemTable::UpdateFlushState() {
  auto state = flush_state_.load(std::memory_order_relaxed);
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
    // ignore CAS failure, because that means somebody else requested
    // a flush
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed);
  }
}

void MemTable::UpdateOldestKeyTime() {
  uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
  if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
    int64_t current_time = 0;
    auto s = env_->GetCurrentTime(&current_time);
    if (s.ok()) {
      assert(current_time >= 0);
      // If the CAS fails, the timestamp has already been set by someone else.
      oldest_key_time_.compare_exchange_strong(
          oldest_key_time, static_cast<uint64_t>(current_time),
          std::memory_order_relaxed, std::memory_order_relaxed);
    }
  }
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
                                        const char* prefix_len_key2) const {
  // Internal keys are encoded as length-prefixed strings.
  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
  return comparator.CompareKeySeq(k1, k2);
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key,
                                        const KeyComparator::DecodedType& key)
    const {
  // Internal keys are encoded as length-prefixed strings.
  Slice a = GetLengthPrefixedSlice(prefix_len_key);
  return comparator.CompareKeySeq(a, key);
}

void MemTableRep::InsertConcurrently(KeyHandle /*handle*/) {
#ifndef ROCKSDB_LITE
  throw std::runtime_error("concurrent insert not supported");
#else
  abort();
#endif
}

Slice MemTableRep::UserKey(const char* key) const {
  Slice slice = GetLengthPrefixedSlice(key);
  return Slice(slice.data(), slice.size() - 8);
}

KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
  *buf = allocator_->Allocate(len);
  return static_cast<KeyHandle>(*buf);
}

// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
const char* EncodeKey(std::string* scratch, const Slice& target) {
  scratch->clear();
  PutVarint32(scratch, static_cast<uint32_t>(target.size()));
  scratch->append(target.data(), target.size());
  return scratch->data();
}

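// Iterates over the entries of a single memtable through the InternalIterator
// interface. When a prefix extractor is configured and total_order_seek is
// off, Seek()/SeekForPrev() consult the memtable's bloom filter first so that
// prefixes the filter rules out are skipped without touching the rep.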
class MemTableIterator : public InternalIterator {
 public:
  MemTableIterator(const MemTable& mem, const ReadOptions& read_options,
                   Arena* arena, bool use_range_del_table = false)
      : bloom_(nullptr),
        prefix_extractor_(mem.prefix_extractor_),
        comparator_(mem.comparator_),
        valid_(false),
        arena_mode_(arena != nullptr),
        value_pinned_(
            !mem.GetImmutableMemTableOptions()->inplace_update_support) {
    if (use_range_del_table) {
      iter_ = mem.range_del_table_->GetIterator(arena);
    } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek &&
               !read_options.auto_prefix_mode) {
      // Auto prefix mode is not implemented in memtable yet.
      bloom_ = mem.bloom_filter_.get();
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
    } else {
      iter_ = mem.table_->GetIterator(arena);
    }
  }
  // No copying allowed
  MemTableIterator(const MemTableIterator&) = delete;
  void operator=(const MemTableIterator&) = delete;

  ~MemTableIterator() override {
#ifndef NDEBUG
    // Assert that the MemTableIterator is never deleted while
    // Pinning is Enabled.
    assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled());
#endif
    if (arena_mode_) {
      iter_->~Iterator();
    } else {
      delete iter_;
    }
  }

#ifndef NDEBUG
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
  }
  PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
#endif

  bool Valid() const override { return valid_; }
  void Seek(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_) {
      // iterator should only use prefix bloom filter
      Slice user_k(ExtractUserKey(k));
      if (prefix_extractor_->InDomain(user_k) &&
          !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
        valid_ = false;
        return;
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
  }
  void SeekForPrev(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_) {
      Slice user_k(ExtractUserKey(k));
      if (prefix_extractor_->InDomain(user_k) &&
          !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
        valid_ = false;
        return;
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
    if (!Valid()) {
      SeekToLast();
    }
    while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
      Prev();
    }
  }
  void SeekToFirst() override {
    iter_->SeekToFirst();
    valid_ = iter_->Valid();
  }
  void SeekToLast() override {
    iter_->SeekToLast();
    valid_ = iter_->Valid();
  }
  void Next() override {
    PERF_COUNTER_ADD(next_on_memtable_count, 1);
    assert(Valid());
    iter_->Next();
    valid_ = iter_->Valid();
  }
  void Prev() override {
    PERF_COUNTER_ADD(prev_on_memtable_count, 1);
    assert(Valid());
    iter_->Prev();
    valid_ = iter_->Valid();
  }
  Slice key() const override {
    assert(Valid());
    return GetLengthPrefixedSlice(iter_->key());
  }
  Slice value() const override {
    assert(Valid());
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
  }

  Status status() const override { return Status::OK(); }

  bool IsKeyPinned() const override {
    // memtable data is always pinned
    return true;
  }

  bool IsValuePinned() const override {
    // memtable value is always pinned, except if we allow inplace update.
    return value_pinned_;
  }

 private:
  DynamicBloom* bloom_;
  const SliceTransform* const prefix_extractor_;
  const MemTable::KeyComparator comparator_;
  MemTableRep::Iterator* iter_;
  bool valid_;
  bool arena_mode_;
  bool value_pinned_;
};

InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
                                        Arena* arena) {
  assert(arena != nullptr);
  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
  return new (mem) MemTableIterator(*this, read_options, arena);
}

FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
    const ReadOptions& read_options, SequenceNumber read_seq) {
  if (read_options.ignore_range_deletions ||
      is_range_del_table_empty_.load(std::memory_order_relaxed)) {
    return nullptr;
  }
  auto* unfragmented_iter = new MemTableIterator(
      *this, read_options, nullptr /* arena */, true /* use_range_del_table */);
  if (unfragmented_iter == nullptr) {
    return nullptr;
  }
  auto fragmented_tombstone_list =
      std::make_shared<FragmentedRangeTombstoneList>(
          std::unique_ptr<InternalIterator>(unfragmented_iter),
          comparator_.comparator);

  auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
      fragmented_tombstone_list, comparator_.comparator, read_seq);
  return fragmented_iter;
}

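// Returns the stripe lock guarding in-place updates of `key`: the user key is
// hashed and mapped onto the fixed-size locks_ array, so unrelated keys may
// share a lock, but a given key always maps to the same one.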
port::RWMutex* MemTable::GetLock(const Slice& key) {
  return &locks_[fastrange64(GetSliceNPHash64(key), locks_.size())];
}

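// Rough size/count estimate for the range [start_ikey, end_ikey): the entry
// count comes from the memtable rep, and the byte size is extrapolated by
// multiplying that count by the memtable-wide average entry size.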
MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
                                                   const Slice& end_ikey) {
  uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
  entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
  if (entry_count == 0) {
    return {0, 0};
  }
  uint64_t n = num_entries_.load(std::memory_order_relaxed);
  if (n == 0) {
    return {0, 0};
  }
  if (entry_count > n) {
    // (range_del_)table_->ApproximateNumEntries() is just an estimate so it
    // can be larger than the number of entries we actually have. Cap it to
    // the entries we have to limit the inaccuracy.
    entry_count = n;
  }
  uint64_t data_size = data_size_.load(std::memory_order_relaxed);
  return {entry_count * (data_size / n), entry_count};
}

bool MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value, bool allow_concurrent,
                   MemTablePostProcessInfo* post_process_info, void** hint) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
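  //
  // Illustrative example (hypothetical values, not taken from a real write):
  // for user key "foo" (3 bytes), sequence number 5, type kTypeValue and
  // value "bar" (3 bytes), internal_key_size is 3 + 8 = 11, so the encoding is
  // varint32(11) | "foo" | 8-byte packed (seq=5, kTypeValue) | varint32(3) |
  // "bar", for a total of 1 + 11 + 1 + 3 = 16 bytes.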
  uint32_t key_size = static_cast<uint32_t>(key.size());
  uint32_t val_size = static_cast<uint32_t>(value.size());
  uint32_t internal_key_size = key_size + 8;
  const uint32_t encoded_len = VarintLength(internal_key_size) +
                               internal_key_size + VarintLength(val_size) +
                               val_size;
  char* buf = nullptr;
  std::unique_ptr<MemTableRep>& table =
      type == kTypeRangeDeletion ? range_del_table_ : table_;
  KeyHandle handle = table->Allocate(encoded_len, &buf);

  char* p = EncodeVarint32(buf, internal_key_size);
  memcpy(p, key.data(), key_size);
  Slice key_slice(p, key_size);
  p += key_size;
  uint64_t packed = PackSequenceAndType(s, type);
  EncodeFixed64(p, packed);
  p += 8;
  p = EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);
  assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
  size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();

  if (!allow_concurrent) {
    // Extract prefix for insert with hint.
    if (insert_with_hint_prefix_extractor_ != nullptr &&
        insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
      Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
      bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
      if (UNLIKELY(!res)) {
        return res;
      }
    } else {
      bool res = table->InsertKey(handle);
      if (UNLIKELY(!res)) {
        return res;
      }
    }

    // this is a bit ugly, but is the way to avoid locked instructions
    // when incrementing an atomic
    num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
                       std::memory_order_relaxed);
    data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
                     std::memory_order_relaxed);
    if (type == kTypeDeletion) {
      num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
                         std::memory_order_relaxed);
    }

    if (bloom_filter_ && prefix_extractor_ &&
        prefix_extractor_->InDomain(key)) {
      bloom_filter_->Add(prefix_extractor_->Transform(key));
    }
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
      bloom_filter_->Add(StripTimestampFromUserKey(key, ts_sz));
    }

    // The first sequence number inserted into the memtable
    assert(first_seqno_ == 0 || s >= first_seqno_);
    if (first_seqno_ == 0) {
      first_seqno_.store(s, std::memory_order_relaxed);

      if (earliest_seqno_ == kMaxSequenceNumber) {
        earliest_seqno_.store(GetFirstSequenceNumber(),
                              std::memory_order_relaxed);
      }
      assert(first_seqno_.load() >= earliest_seqno_.load());
    }
    assert(post_process_info == nullptr);
    UpdateFlushState();
  } else {
    bool res = (hint == nullptr)
                   ? table->InsertKeyConcurrently(handle)
                   : table->InsertKeyWithHintConcurrently(handle, hint);
    if (UNLIKELY(!res)) {
      return res;
    }

    assert(post_process_info != nullptr);
    post_process_info->num_entries++;
    post_process_info->data_size += encoded_len;
    if (type == kTypeDeletion) {
      post_process_info->num_deletes++;
    }

    if (bloom_filter_ && prefix_extractor_ &&
        prefix_extractor_->InDomain(key)) {
      bloom_filter_->AddConcurrently(prefix_extractor_->Transform(key));
    }
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
      bloom_filter_->AddConcurrently(StripTimestampFromUserKey(key, ts_sz));
    }

    // atomically update first_seqno_ and earliest_seqno_.
    uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
    while ((cur_seq_num == 0 || s < cur_seq_num) &&
           !first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
    }
    uint64_t cur_earliest_seqno =
        earliest_seqno_.load(std::memory_order_relaxed);
    while (
        (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
        !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
    }
  }
  if (type == kTypeRangeDeletion) {
    is_range_del_table_empty_.store(false, std::memory_order_relaxed);
  }
  UpdateOldestKeyTime();
  return true;
}

// Callback from MemTable::Get()
namespace {

struct Saver {
  Status* status;
  const LookupKey* key;
  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
  bool* merge_in_progress;
  std::string* value;
  SequenceNumber seq;
  const MergeOperator* merge_operator;
  // the merge operations encountered
  MergeContext* merge_context;
  SequenceNumber max_covering_tombstone_seq;
  MemTable* mem;
  Logger* logger;
  Statistics* statistics;
  bool inplace_update_support;
  bool do_merge;
  Env* env_;
  ReadCallback* callback_;
  bool* is_blob_index;

  bool CheckCallback(SequenceNumber _seq) {
    if (callback_) {
      return callback_->IsVisible(_seq);
    }
    return true;
  }
};
}  // namespace

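// SaveValue is invoked by MemTableRep::Get() for successive entries starting
// at the lookup key. Returning true asks the rep to keep scanning (e.g. after
// collecting a merge operand or skipping a sequence number that is not
// visible); returning false stops the scan because a final result (value,
// deletion, error, or a different user key) has been reached.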
static bool SaveValue(void* arg, const char* entry) {
  Saver* s = reinterpret_cast<Saver*>(arg);
  assert(s != nullptr);
  MergeContext* merge_context = s->merge_context;
  SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
  const MergeOperator* merge_operator = s->merge_operator;

  assert(merge_context != nullptr);

  // entry format is:
  //    klength  varint32
  //    userkey  char[klength-8]
  //    tag      uint64
  //    vlength  varint32
  //    value    char[vlength]
  // Check that it belongs to the same user key. We do not check the
  // sequence number since the Seek() call above should have skipped
  // all entries with overly large sequence numbers.
  uint32_t key_length;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  Slice user_key_slice = Slice(key_ptr, key_length - 8);
  if (s->mem->GetInternalKeyComparator()
          .user_comparator()
          ->CompareWithoutTimestamp(user_key_slice, s->key->user_key()) == 0) {
    // Correct user key
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
    ValueType type;
    SequenceNumber seq;
    UnPackSequenceAndType(tag, &seq, &type);
    // If the value is not in the snapshot, skip it
    if (!s->CheckCallback(seq)) {
      return true;  // to continue to the next seq
    }

    s->seq = seq;

    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
        max_covering_tombstone_seq > seq) {
      type = kTypeRangeDeletion;
    }
    switch (type) {
      case kTypeBlobIndex:
        if (s->is_blob_index == nullptr) {
          ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index.");
          *(s->status) = Status::NotSupported(
              "Encounter unsupported blob value. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        } else if (*(s->merge_in_progress)) {
          *(s->status) =
              Status::NotSupported("Blob DB does not support merge operator.");
        }
        if (!s->status->ok()) {
          *(s->found_final_value) = true;
          return false;
        }
        FALLTHROUGH_INTENDED;
      case kTypeValue: {
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadLock();
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->status) = Status::OK();
        if (*(s->merge_in_progress)) {
          if (s->do_merge) {
            if (s->value != nullptr) {
              *(s->status) = MergeHelper::TimedFullMerge(
                  merge_operator, s->key->user_key(), &v,
                  merge_context->GetOperands(), s->value, s->logger,
                  s->statistics, s->env_, nullptr /* result_operand */, true);
            }
          } else {
            // Preserve the value with the goal of returning it as part of
            // raw merge operands to the user
            merge_context->PushOperand(
                v, s->inplace_update_support == false /* operand_pinned */);
          }
        } else if (!s->do_merge) {
          // Preserve the value with the goal of returning it as part of
          // raw merge operands to the user
          merge_context->PushOperand(
              v, s->inplace_update_support == false /* operand_pinned */);
        } else if (s->value != nullptr) {
          s->value->assign(v.data(), v.size());
        }
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadUnlock();
        }
        *(s->found_final_value) = true;
        if (s->is_blob_index != nullptr) {
          *(s->is_blob_index) = (type == kTypeBlobIndex);
        }
        return false;
      }
      case kTypeDeletion:
      case kTypeSingleDeletion:
      case kTypeRangeDeletion: {
        if (*(s->merge_in_progress)) {
          if (s->value != nullptr) {
            *(s->status) = MergeHelper::TimedFullMerge(
                merge_operator, s->key->user_key(), nullptr,
                merge_context->GetOperands(), s->value, s->logger,
                s->statistics, s->env_, nullptr /* result_operand */, true);
          }
        } else {
          *(s->status) = Status::NotFound();
        }
        *(s->found_final_value) = true;
        return false;
      }
      case kTypeMerge: {
        if (!merge_operator) {
          *(s->status) = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          // Normally we continue the loop (return true) when we see a merge
          // operand. But in case of an error, we should stop the loop
          // immediately and pretend we have found the value to stop further
          // seek. Otherwise, the later call will override this error status.
          *(s->found_final_value) = true;
          return false;
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->merge_in_progress) = true;
        merge_context->PushOperand(
            v, s->inplace_update_support == false /* operand_pinned */);
        if (s->do_merge && merge_operator->ShouldMerge(
                               merge_context->GetOperandsDirectionBackward())) {
          *(s->status) = MergeHelper::TimedFullMerge(
              merge_operator, s->key->user_key(), nullptr,
              merge_context->GetOperands(), s->value, s->logger, s->statistics,
              s->env_, nullptr /* result_operand */, true);
          *(s->found_final_value) = true;
          return false;
        }
        return true;
      }
      default:
        assert(false);
        return true;
    }
  }

  // s->status could be Corruption, MergeInProgress, or NotFound
  return false;
}

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                   MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq,
                   SequenceNumber* seq, const ReadOptions& read_opts,
                   ReadCallback* callback, bool* is_blob_index, bool do_merge) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoid recording stats for speed.
    return false;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
      NewRangeTombstoneIterator(read_opts,
                                GetInternalKeySeqno(key.internal_key())));
  if (range_del_iter != nullptr) {
    *max_covering_tombstone_seq =
        std::max(*max_covering_tombstone_seq,
                 range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
  }

  Slice user_key = key.user_key();
  bool found_final_value = false;
  bool merge_in_progress = s->IsMergeInProgress();
  bool may_contain = true;
  size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
  if (bloom_filter_) {
    // when both memtable_whole_key_filtering and prefix_extractor_ are set,
    // only do whole key filtering for Get() to save CPU
    if (moptions_.memtable_whole_key_filtering) {
      may_contain =
          bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
    } else {
      assert(prefix_extractor_);
      may_contain =
          !prefix_extractor_->InDomain(user_key) ||
          bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
    }
  }

  if (bloom_filter_ && !may_contain) {
    // skip the lookup entirely if the bloom filter says the key does not exist
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
    *seq = kMaxSequenceNumber;
  } else {
    if (bloom_filter_) {
      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
    }
    GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
                 is_blob_index, value, s, merge_context, seq,
                 &found_final_value, &merge_in_progress);
  }

  // No change to value, since we have not yet found a Put/Delete
  if (!found_final_value && merge_in_progress) {
    *s = Status::MergeInProgress();
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
  return found_final_value;
}

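// Shared lookup helper for Get() and MultiGet(): packages the arguments into a
// Saver and lets the memtable rep drive SaveValue() over the matching entries.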
void MemTable::GetFromTable(const LookupKey& key,
                            SequenceNumber max_covering_tombstone_seq,
                            bool do_merge, ReadCallback* callback,
                            bool* is_blob_index, std::string* value, Status* s,
                            MergeContext* merge_context, SequenceNumber* seq,
                            bool* found_final_value, bool* merge_in_progress) {
  Saver saver;
  saver.status = s;
  saver.found_final_value = found_final_value;
  saver.merge_in_progress = merge_in_progress;
  saver.key = &key;
  saver.value = value;
  saver.seq = kMaxSequenceNumber;
  saver.mem = this;
  saver.merge_context = merge_context;
  saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
  saver.merge_operator = moptions_.merge_operator;
  saver.logger = moptions_.info_log;
  saver.inplace_update_support = moptions_.inplace_update_support;
  saver.statistics = moptions_.statistics;
  saver.env_ = env_;
  saver.callback_ = callback;
  saver.is_blob_index = is_blob_index;
  saver.do_merge = do_merge;
  table_->Get(key, &saver, SaveValue);
  *seq = saver.seq;
}

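// Batched point lookups. When a bloom filter is present, all keys (or their
// prefixes) in the batch are first checked with a single MayContain() pass so
// that keys the filter rules out are skipped before touching the memtable rep.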
void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                        ReadCallback* callback, bool* is_blob) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoid recording stats for speed.
    return;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  MultiGetRange temp_range(*range, range->begin(), range->end());
  if (bloom_filter_) {
    std::array<Slice*, MultiGetContext::MAX_BATCH_SIZE> keys;
    std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match = {{true}};
    autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> prefixes;
    int num_keys = 0;
    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
      if (!prefix_extractor_) {
        keys[num_keys++] = &iter->ukey;
      } else if (prefix_extractor_->InDomain(iter->ukey)) {
        prefixes.emplace_back(prefix_extractor_->Transform(iter->ukey));
        keys[num_keys++] = &prefixes.back();
      }
    }
    bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]);
    int idx = 0;
    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
      if (prefix_extractor_ && !prefix_extractor_->InDomain(iter->ukey)) {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
        continue;
      }
      if (!may_match[idx]) {
        temp_range.SkipKey(iter);
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
      idx++;
    }
  }
  for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
    SequenceNumber seq = kMaxSequenceNumber;
    bool found_final_value{false};
    bool merge_in_progress = iter->s->IsMergeInProgress();
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        NewRangeTombstoneIterator(
            read_options, GetInternalKeySeqno(iter->lkey->internal_key())));
    if (range_del_iter != nullptr) {
      iter->max_covering_tombstone_seq = std::max(
          iter->max_covering_tombstone_seq,
          range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
    }
    GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
                 callback, is_blob, iter->value->GetSelf(), iter->s,
                 &(iter->merge_context), &seq, &found_final_value,
                 &merge_in_progress);

    if (!found_final_value && merge_in_progress) {
      *(iter->s) = Status::MergeInProgress();
    }

    if (found_final_value) {
      iter->value->PinSelf();
      range->MarkKeyDone(iter);
      RecordTick(moptions_.statistics, MEMTABLE_HIT);
    }
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
}

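// In-place update path used when inplace_update_support is enabled: if the
// latest entry for `key` is a plain value and the new value is no larger, the
// value bytes are overwritten under the per-key stripe lock; otherwise the
// update falls back to a regular Add().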
void MemTable::Update(SequenceNumber seq,
                      const Slice& key,
                      const Slice& value) {
  LookupKey lkey(key, seq);
  Slice mem_key = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), mem_key.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to the same user key. We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Equal(
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      ValueType type;
      SequenceNumber existing_seq;
      UnPackSequenceAndType(tag, &existing_seq, &type);
      assert(existing_seq != seq);
      if (type == kTypeValue) {
        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
        uint32_t new_size = static_cast<uint32_t>(value.size());

        // Update value, if new value size <= previous value size
        if (new_size <= prev_size) {
          char* p =
              EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
          WriteLock wl(GetLock(lkey.user_key()));
          memcpy(p, value.data(), value.size());
          assert((unsigned)((p + value.size()) - entry) ==
                 (unsigned)(VarintLength(key_length) + key_length +
                            VarintLength(value.size()) + value.size()));
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
          return;
        }
      }
    }
  }

  // key doesn't exist
  bool add_res __attribute__((__unused__));
  add_res = Add(seq, kTypeValue, key, value);
  // We already checked existing_seq != seq above. In that case, Add should not
  // fail.
  assert(add_res);
}

bool MemTable::UpdateCallback(SequenceNumber seq,
                              const Slice& key,
                              const Slice& delta) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), memkey.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to the same user key. We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Equal(
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      ValueType type;
      uint64_t unused;
      UnPackSequenceAndType(tag, &unused, &type);
      switch (type) {
        case kTypeValue: {
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());

          char* prev_buffer = const_cast<char*>(prev_value.data());
          uint32_t new_prev_size = prev_size;

          std::string str_value;
          WriteLock wl(GetLock(lkey.user_key()));
          auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
                                                   delta, &str_value);
          if (status == UpdateStatus::UPDATED_INPLACE) {
            // Value already updated by callback.
            assert(new_prev_size <= prev_size);
            if (new_prev_size < prev_size) {
              // overwrite the new prev_size
              char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                       new_prev_size);
              if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
                // shift the value buffer as well.
                memcpy(p, prev_buffer, new_prev_size);
              }
            }
            RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
            UpdateFlushState();
            return true;
          } else if (status == UpdateStatus::UPDATED) {
            Add(seq, kTypeValue, key, Slice(str_value));
            RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
            UpdateFlushState();
            return true;
          } else if (status == UpdateStatus::UPDATE_FAILED) {
            // No action required. Return.
            UpdateFlushState();
            return true;
          }
        }
        default:
          break;
      }
    }
  }
  // If the latest value is not kTypeValue
  // or key doesn't exist
  return false;
}

size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
  Slice memkey = key.memtable_key();

  // A total ordered iterator is costly for some memtablerep (prefix aware
  // reps). By passing in the user key, we allow efficient iterator creation.
  // The iterator only needs to be ordered within the same user key.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(key.internal_key(), memkey.data());

  size_t num_successive_merges = 0;

  for (; iter->Valid(); iter->Next()) {
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (!comparator_.comparator.user_comparator()->Equal(
            Slice(iter_key_ptr, key_length - 8), key.user_key())) {
      break;
    }

    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
    ValueType type;
    uint64_t unused;
    UnPackSequenceAndType(tag, &unused, &type);
    if (type != kTypeMerge) {
      break;
    }

    ++num_successive_merges;
  }

  return num_successive_merges;
}

void MemTableRep::Get(const LookupKey& k, void* callback_args,
                      bool (*callback_func)(void* arg, const char* entry)) {
  auto iter = GetDynamicPrefixIterator();
  for (iter->Seek(k.internal_key(), k.memtable_key().data());
       iter->Valid() && callback_func(callback_args, iter->key());
       iter->Next()) {
  }
}

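// Tracks prepared (two-phase commit) sections referenced by this memtable:
// RefLogContainingPrepSection() records the smallest WAL number containing a
// referenced prepared section, and GetMinLogContainingPrepSection() reports
// it.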
void MemTable::RefLogContainingPrepSection(uint64_t log) {
  assert(log > 0);
  auto cur = min_prep_log_referenced_.load();
  while ((log < cur || cur == 0) &&
         !min_prep_log_referenced_.compare_exchange_strong(cur, log)) {
    cur = min_prep_log_referenced_.load();
  }
}

uint64_t MemTable::GetMinLogContainingPrepSection() {
  return min_prep_log_referenced_.load();
}

}  // namespace ROCKSDB_NAMESPACE