Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/memtable.h" | |
11 | ||
7c673cae | 12 | #include <algorithm> |
f67539c2 | 13 | #include <array> |
7c673cae | 14 | #include <limits> |
11fdf7f2 | 15 | #include <memory> |
7c673cae FG |
16 | #include "db/dbformat.h" |
17 | #include "db/merge_context.h" | |
18 | #include "db/merge_helper.h" | |
19 | #include "db/pinned_iterators_manager.h" | |
494da23a | 20 | #include "db/range_tombstone_fragmenter.h" |
11fdf7f2 | 21 | #include "db/read_callback.h" |
f67539c2 TL |
22 | #include "memory/arena.h" |
23 | #include "memory/memory_usage.h" | |
7c673cae FG |
24 | #include "monitoring/perf_context_imp.h" |
25 | #include "monitoring/statistics.h" | |
26 | #include "port/port.h" | |
27 | #include "rocksdb/comparator.h" | |
28 | #include "rocksdb/env.h" | |
29 | #include "rocksdb/iterator.h" | |
30 | #include "rocksdb/merge_operator.h" | |
31 | #include "rocksdb/slice_transform.h" | |
32 | #include "rocksdb/write_buffer_manager.h" | |
33 | #include "table/internal_iterator.h" | |
34 | #include "table/iterator_wrapper.h" | |
35 | #include "table/merging_iterator.h" | |
7c673cae FG |
36 | #include "util/autovector.h" |
37 | #include "util/coding.h" | |
7c673cae | 38 | #include "util/mutexlock.h" |
11fdf7f2 | 39 | #include "util/util.h" |
7c673cae | 40 | |
f67539c2 | 41 | namespace ROCKSDB_NAMESPACE { |
7c673cae | 42 | |
11fdf7f2 TL |
43 | ImmutableMemTableOptions::ImmutableMemTableOptions( |
44 | const ImmutableCFOptions& ioptions, | |
45 | const MutableCFOptions& mutable_cf_options) | |
46 | : arena_block_size(mutable_cf_options.arena_block_size), | |
7c673cae FG |
47 | memtable_prefix_bloom_bits( |
48 | static_cast<uint32_t>( | |
49 | static_cast<double>(mutable_cf_options.write_buffer_size) * | |
50 | mutable_cf_options.memtable_prefix_bloom_size_ratio) * | |
51 | 8u), | |
52 | memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size), | |
494da23a TL |
53 | memtable_whole_key_filtering( |
54 | mutable_cf_options.memtable_whole_key_filtering), | |
7c673cae FG |
55 | inplace_update_support(ioptions.inplace_update_support), |
56 | inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks), | |
57 | inplace_callback(ioptions.inplace_callback), | |
58 | max_successive_merges(mutable_cf_options.max_successive_merges), | |
59 | statistics(ioptions.statistics), | |
60 | merge_operator(ioptions.merge_operator), | |
61 | info_log(ioptions.info_log) {} | |
62 | ||
63 | MemTable::MemTable(const InternalKeyComparator& cmp, | |
64 | const ImmutableCFOptions& ioptions, | |
65 | const MutableCFOptions& mutable_cf_options, | |
66 | WriteBufferManager* write_buffer_manager, | |
11fdf7f2 | 67 | SequenceNumber latest_seq, uint32_t column_family_id) |
7c673cae FG |
68 | : comparator_(cmp), |
69 | moptions_(ioptions, mutable_cf_options), | |
70 | refs_(0), | |
71 | kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), | |
11fdf7f2 | 72 | mem_tracker_(write_buffer_manager), |
494da23a TL |
73 | arena_(moptions_.arena_block_size, |
74 | (write_buffer_manager != nullptr && | |
75 | (write_buffer_manager->enabled() || | |
76 | write_buffer_manager->cost_to_cache())) | |
77 | ? &mem_tracker_ | |
78 | : nullptr, | |
79 | mutable_cf_options.memtable_huge_page_size), | |
7c673cae | 80 | table_(ioptions.memtable_factory->CreateMemTableRep( |
11fdf7f2 TL |
81 | comparator_, &arena_, mutable_cf_options.prefix_extractor.get(), |
82 | ioptions.info_log, column_family_id)), | |
7c673cae | 83 | range_del_table_(SkipListFactory().CreateMemTableRep( |
11fdf7f2 TL |
84 | comparator_, &arena_, nullptr /* transform */, ioptions.info_log, |
85 | column_family_id)), | |
7c673cae FG |
86 | is_range_del_table_empty_(true), |
87 | data_size_(0), | |
88 | num_entries_(0), | |
89 | num_deletes_(0), | |
11fdf7f2 | 90 | write_buffer_size_(mutable_cf_options.write_buffer_size), |
7c673cae FG |
91 | flush_in_progress_(false), |
92 | flush_completed_(false), | |
93 | file_number_(0), | |
94 | first_seqno_(0), | |
95 | earliest_seqno_(latest_seq), | |
96 | creation_seq_(latest_seq), | |
97 | mem_next_logfile_number_(0), | |
98 | min_prep_log_referenced_(0), | |
99 | locks_(moptions_.inplace_update_support | |
100 | ? moptions_.inplace_update_num_locks | |
101 | : 0), | |
11fdf7f2 | 102 | prefix_extractor_(mutable_cf_options.prefix_extractor.get()), |
7c673cae FG |
103 | flush_state_(FLUSH_NOT_REQUESTED), |
104 | env_(ioptions.env), | |
105 | insert_with_hint_prefix_extractor_( | |
11fdf7f2 | 106 | ioptions.memtable_insert_with_hint_prefix_extractor), |
494da23a | 107 | oldest_key_time_(std::numeric_limits<uint64_t>::max()), |
f67539c2 TL |
108 | atomic_flush_seqno_(kMaxSequenceNumber), |
109 | approximate_memory_usage_(0) { | |
7c673cae FG |
110 | UpdateFlushState(); |
111 | // something went wrong if we need to flush before inserting anything | |
112 | assert(!ShouldScheduleFlush()); | |
113 | ||
494da23a TL |
114 | // use bloom_filter_ for both whole key and prefix bloom filter |
115 | if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) && | |
116 | moptions_.memtable_prefix_bloom_bits > 0) { | |
117 | bloom_filter_.reset( | |
118 | new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits, | |
f67539c2 | 119 | 6 /* hard coded 6 probes */, |
494da23a | 120 | moptions_.memtable_huge_page_size, ioptions.info_log)); |
7c673cae FG |
121 | } |
122 | } | |
123 | ||
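
As a point of reference, the `memtable_prefix_bloom_bits` arithmetic in the initializer list above converts a fraction of the write buffer into a bit count for `DynamicBloom`. A minimal standalone sketch of the same computation (the 64 MiB buffer and 0.1 ratio in `main` are illustrative inputs, not defaults taken from this file):

```cpp
#include <cstdint>
#include <iostream>

// Mirrors the initializer above: bloom bytes = write_buffer_size * ratio,
// then multiplied by 8 to get a bit count for DynamicBloom.
uint32_t MemtablePrefixBloomBits(uint64_t write_buffer_size, double ratio) {
  return static_cast<uint32_t>(static_cast<double>(write_buffer_size) * ratio) *
         8u;
}

int main() {
  // Example: a 64 MiB write buffer with a 0.1 ratio reserves ~6.4 MiB
  // for the bloom filter, i.e. ~53.7 million bits.
  std::cout << MemtablePrefixBloomBits(64ull << 20, 0.1) << " bits\n";
}
```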
11fdf7f2 TL |
124 | MemTable::~MemTable() { |
125 | mem_tracker_.FreeMem(); | |
126 | assert(refs_ == 0); | |
127 | } | |
7c673cae FG |
128 | |
129 | size_t MemTable::ApproximateMemoryUsage() { | |
f67539c2 TL |
130 | autovector<size_t> usages = { |
131 | arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(), | |
132 | range_del_table_->ApproximateMemoryUsage(), | |
133 | ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)}; | |
7c673cae FG |
134 | size_t total_usage = 0; |
135 | for (size_t usage : usages) { | |
136 | // If usage + total_usage >= kMaxSizet, return kMaxSizet. |
137 | // The check below is written this way to avoid numeric overflow. |
138 | if (usage >= port::kMaxSizet - total_usage) { | |
139 | return port::kMaxSizet; | |
140 | } | |
141 | total_usage += usage; | |
142 | } | |
f67539c2 | 143 | approximate_memory_usage_.store(total_usage, std::memory_order_relaxed); |
7c673cae FG |
144 | // otherwise, return the actual usage |
145 | return total_usage; | |
146 | } | |
147 | ||
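
The loop above computes a saturating sum: rather than testing `usage + total_usage` directly (which could itself wrap around), it compares `usage` against the remaining headroom below `kMaxSizet`. A self-contained sketch of the same pattern:

```cpp
#include <cstddef>
#include <iostream>
#include <limits>
#include <vector>

// Saturating sum: returns SIZE_MAX instead of wrapping on overflow,
// mirroring the headroom check in ApproximateMemoryUsage().
size_t SaturatingSum(const std::vector<size_t>& usages) {
  const size_t kMax = std::numeric_limits<size_t>::max();
  size_t total = 0;
  for (size_t usage : usages) {
    if (usage >= kMax - total) return kMax;  // would overflow: saturate
    total += usage;
  }
  return total;
}

int main() {
  std::cout << SaturatingSum({100, 200, 300}) << "\n";  // 600
  std::cout << SaturatingSum({std::numeric_limits<size_t>::max(), 1})
            << "\n";  // saturates at SIZE_MAX
}
```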
f67539c2 | 148 | bool MemTable::ShouldFlushNow() { |
11fdf7f2 | 149 | size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed); |
7c673cae FG |
150 | // Often we cannot allocate arena blocks that exactly match the buffer |
151 | // size. Thus we have to decide whether to over-allocate or |
152 | // under-allocate. |
153 | // This constant variable can be interpreted as: if we still have more than | |
154 | // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over | |
155 | // allocate one more block. | |
156 | const double kAllowOverAllocationRatio = 0.6; | |
157 | ||
158 | // If the arena still has room for a new block allocation, we can safely say |
159 | // it shouldn't flush. |
160 | auto allocated_memory = table_->ApproximateMemoryUsage() + | |
161 | range_del_table_->ApproximateMemoryUsage() + | |
162 | arena_.MemoryAllocatedBytes(); | |
163 | ||
f67539c2 TL |
164 | approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed); |
165 | ||
7c673cae FG |
166 | // if we can still allocate one more block without exceeding the |
167 | // over-allocation ratio, then we should not flush. | |
168 | if (allocated_memory + kArenaBlockSize < | |
11fdf7f2 | 169 | write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) { |
7c673cae FG |
170 | return false; |
171 | } | |
172 | ||
11fdf7f2 TL |
173 | // if the user keeps adding entries whose total exceeds write_buffer_size, we |
174 | // need to flush earlier even though we still have much available memory left. |
175 | if (allocated_memory > | |
176 | write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) { | |
7c673cae FG |
177 | return true; |
178 | } | |
179 | ||
180 | // In this code path, Arena has already allocated its "last block", which |
181 | // means the total allocated memory size is either: |
182 | // (1) "moderately" over-allocated (by no more than `0.6 * arena |
183 | // block size`). Or, |
184 | // (2) less than the write buffer size, but we'll stop here, since |
185 | // allocating a new arena block would over-allocate by too much |
186 | // more (half of the arena block size) memory. |
187 | // | |
188 | // In either case, to avoid over-allocating, the last block will stop |
189 | // allocation when its usage reaches a certain ratio. We carefully choose |
190 | // "0.75 full" as the stop condition because it addresses the following |
191 | // issue with great simplicity: what if the next inserted entry's size is |
192 | // bigger than AllocatedAndUnused()? | |
193 | // | |
194 | // The answer is: if the entry size is also bigger than 0.25 * | |
195 | // kArenaBlockSize, a dedicated block will be allocated for it; otherwise | |
196 | // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty | |
197 | // and regular block. In either case, we would over-allocate excessively. |
198 | // | |
199 | // Therefore, setting the last block to be at most "0.75 full" avoids both | |
200 | // cases. | |
201 | // | |
202 | // NOTE: the average fraction of wasted space under this approach can be |
203 | // estimated as "arena block size * 0.25 / write buffer size". Users who |
204 | // specify a small write buffer size and/or a big arena block size may suffer. |
205 | return arena_.AllocatedAndUnused() < kArenaBlockSize / 4; | |
206 | } | |
207 | ||
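
The decision above boils down to two thresholds around `write_buffer_size` plus an over-allocation budget of `0.6 * kArenaBlockSize`, with the 3/4-full check as the tie-breaker. A hedged standalone sketch of the same logic; the sample sizes in `main` are made up:

```cpp
#include <cstddef>
#include <iostream>

// Follows the same three-step decision as MemTable::ShouldFlushNow():
// (1) one more block still fits, (2) already over budget, (3) borderline,
// so flush once the last block is more than 3/4 full.
bool ShouldFlushNowSketch(size_t allocated_memory, size_t allocated_and_unused,
                          size_t write_buffer_size, size_t arena_block_size) {
  const double kAllowOverAllocationRatio = 0.6;
  const double budget =
      write_buffer_size + arena_block_size * kAllowOverAllocationRatio;
  if (allocated_memory + arena_block_size < budget) {
    return false;  // one more block fits under the over-allocation budget
  }
  if (allocated_memory > budget) {
    return true;  // already past the budget: flush
  }
  return allocated_and_unused < arena_block_size / 4;  // last block 3/4 full
}

int main() {
  const size_t MiB = 1u << 20;
  // 60 MiB allocated, 64 MiB buffer, 4 MiB blocks: one more block still fits.
  std::cout << ShouldFlushNowSketch(60 * MiB, 2 * MiB, 64 * MiB, 4 * MiB)
            << "\n";  // 0
  // 65 MiB allocated (budget is 66.4 MiB) and the last block is 7/8 full.
  std::cout << ShouldFlushNowSketch(65 * MiB, MiB / 2, 64 * MiB, 4 * MiB)
            << "\n";  // 1
}
```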
208 | void MemTable::UpdateFlushState() { | |
209 | auto state = flush_state_.load(std::memory_order_relaxed); | |
210 | if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) { | |
211 | // ignore CAS failure, because that means somebody else requested | |
212 | // a flush | |
213 | flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED, | |
214 | std::memory_order_relaxed, | |
215 | std::memory_order_relaxed); | |
216 | } | |
217 | } | |
218 | ||
11fdf7f2 TL |
219 | void MemTable::UpdateOldestKeyTime() { |
220 | uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed); | |
221 | if (oldest_key_time == std::numeric_limits<uint64_t>::max()) { | |
222 | int64_t current_time = 0; | |
223 | auto s = env_->GetCurrentTime(¤t_time); | |
224 | if (s.ok()) { | |
225 | assert(current_time >= 0); | |
226 | // If fail, the timestamp is already set. | |
227 | oldest_key_time_.compare_exchange_strong( | |
228 | oldest_key_time, static_cast<uint64_t>(current_time), | |
229 | std::memory_order_relaxed, std::memory_order_relaxed); | |
230 | } | |
231 | } | |
232 | } | |
233 | ||
7c673cae FG |
234 | int MemTable::KeyComparator::operator()(const char* prefix_len_key1, |
235 | const char* prefix_len_key2) const { | |
236 | // Internal keys are encoded as length-prefixed strings. | |
237 | Slice k1 = GetLengthPrefixedSlice(prefix_len_key1); | |
238 | Slice k2 = GetLengthPrefixedSlice(prefix_len_key2); | |
11fdf7f2 | 239 | return comparator.CompareKeySeq(k1, k2); |
7c673cae FG |
240 | } |
241 | ||
242 | int MemTable::KeyComparator::operator()(const char* prefix_len_key, | |
11fdf7f2 | 243 | const KeyComparator::DecodedType& key) |
7c673cae FG |
244 | const { |
245 | // Internal keys are encoded as length-prefixed strings. | |
246 | Slice a = GetLengthPrefixedSlice(prefix_len_key); | |
11fdf7f2 TL |
247 | return comparator.CompareKeySeq(a, key); |
248 | } | |
249 | ||
250 | void MemTableRep::InsertConcurrently(KeyHandle /*handle*/) { | |
251 | #ifndef ROCKSDB_LITE | |
252 | throw std::runtime_error("concurrent insert not supported"); | |
253 | #else | |
254 | abort(); | |
255 | #endif | |
7c673cae FG |
256 | } |
257 | ||
258 | Slice MemTableRep::UserKey(const char* key) const { | |
259 | Slice slice = GetLengthPrefixedSlice(key); | |
260 | return Slice(slice.data(), slice.size() - 8); | |
261 | } | |
262 | ||
263 | KeyHandle MemTableRep::Allocate(const size_t len, char** buf) { | |
264 | *buf = allocator_->Allocate(len); | |
265 | return static_cast<KeyHandle>(*buf); | |
266 | } | |
267 | ||
268 | // Encode a suitable internal key target for "target" and return it. | |
269 | // Uses *scratch as scratch space, and the returned pointer will point | |
270 | // into this scratch space. | |
271 | const char* EncodeKey(std::string* scratch, const Slice& target) { | |
272 | scratch->clear(); | |
273 | PutVarint32(scratch, static_cast<uint32_t>(target.size())); | |
274 | scratch->append(target.data(), target.size()); | |
275 | return scratch->data(); | |
276 | } | |
277 | ||
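
`EncodeKey()` produces the same varint32 length-prefixed layout the memtable stores. A toy round trip, with `PutVarint32Toy` standing in for the real `PutVarint32` from `util/coding.h`:

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// Toy varint32 writer, standing in for PutVarint32(): 7 payload bits per
// byte, high bit set on all bytes except the last.
void PutVarint32Toy(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>(v | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

int main() {
  // EncodeKey() above prepends the target's size as a varint32, then the
  // raw bytes.
  std::string scratch;
  const std::string target = "internal-key-bytes";
  PutVarint32Toy(&scratch, static_cast<uint32_t>(target.size()));
  scratch.append(target);
  // For sizes < 128 the varint prefix is a single byte.
  std::cout << "encoded " << scratch.size() << " bytes (1-byte prefix + "
            << target.size() << " payload)\n";
}
```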
278 | class MemTableIterator : public InternalIterator { | |
279 | public: | |
280 | MemTableIterator(const MemTable& mem, const ReadOptions& read_options, | |
281 | Arena* arena, bool use_range_del_table = false) | |
282 | : bloom_(nullptr), | |
283 | prefix_extractor_(mem.prefix_extractor_), | |
284 | comparator_(mem.comparator_), | |
285 | valid_(false), | |
286 | arena_mode_(arena != nullptr), | |
11fdf7f2 TL |
287 | value_pinned_( |
288 | !mem.GetImmutableMemTableOptions()->inplace_update_support) { | |
7c673cae FG |
289 | if (use_range_del_table) { |
290 | iter_ = mem.range_del_table_->GetIterator(arena); | |
f67539c2 TL |
291 | } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek && |
292 | !read_options.auto_prefix_mode) { | |
293 | // Auto prefix mode is not implemented in memtable yet. | |
494da23a | 294 | bloom_ = mem.bloom_filter_.get(); |
7c673cae FG |
295 | iter_ = mem.table_->GetDynamicPrefixIterator(arena); |
296 | } else { | |
297 | iter_ = mem.table_->GetIterator(arena); | |
298 | } | |
299 | } | |
f67539c2 TL |
300 | // No copying allowed |
301 | MemTableIterator(const MemTableIterator&) = delete; | |
302 | void operator=(const MemTableIterator&) = delete; | |
7c673cae | 303 | |
494da23a | 304 | ~MemTableIterator() override { |
7c673cae FG |
305 | #ifndef NDEBUG |
306 | // Assert that the MemTableIterator is never deleted while | |
307 | // Pinning is Enabled. | |
11fdf7f2 | 308 | assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled()); |
7c673cae FG |
309 | #endif |
310 | if (arena_mode_) { | |
311 | iter_->~Iterator(); | |
312 | } else { | |
313 | delete iter_; | |
314 | } | |
315 | } | |
316 | ||
317 | #ifndef NDEBUG | |
494da23a | 318 | void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { |
7c673cae FG |
319 | pinned_iters_mgr_ = pinned_iters_mgr; |
320 | } | |
321 | PinnedIteratorsManager* pinned_iters_mgr_ = nullptr; | |
322 | #endif | |
323 | ||
494da23a TL |
324 | bool Valid() const override { return valid_; } |
325 | void Seek(const Slice& k) override { | |
7c673cae FG |
326 | PERF_TIMER_GUARD(seek_on_memtable_time); |
327 | PERF_COUNTER_ADD(seek_on_memtable_count, 1); | |
494da23a TL |
328 | if (bloom_) { |
329 | // iterator should only use prefix bloom filter | |
f67539c2 TL |
330 | Slice user_k(ExtractUserKey(k)); |
331 | if (prefix_extractor_->InDomain(user_k) && | |
332 | !bloom_->MayContain(prefix_extractor_->Transform(user_k))) { | |
7c673cae FG |
333 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
334 | valid_ = false; | |
335 | return; | |
336 | } else { | |
337 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); | |
338 | } | |
339 | } | |
340 | iter_->Seek(k, nullptr); | |
341 | valid_ = iter_->Valid(); | |
342 | } | |
494da23a | 343 | void SeekForPrev(const Slice& k) override { |
7c673cae FG |
344 | PERF_TIMER_GUARD(seek_on_memtable_time); |
345 | PERF_COUNTER_ADD(seek_on_memtable_count, 1); | |
494da23a | 346 | if (bloom_) { |
f67539c2 TL |
347 | Slice user_k(ExtractUserKey(k)); |
348 | if (prefix_extractor_->InDomain(user_k) && | |
349 | !bloom_->MayContain(prefix_extractor_->Transform(user_k))) { | |
7c673cae FG |
350 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
351 | valid_ = false; | |
352 | return; | |
353 | } else { | |
354 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); | |
355 | } | |
356 | } | |
357 | iter_->Seek(k, nullptr); | |
358 | valid_ = iter_->Valid(); | |
359 | if (!Valid()) { | |
360 | SeekToLast(); | |
361 | } | |
362 | while (Valid() && comparator_.comparator.Compare(k, key()) < 0) { | |
363 | Prev(); | |
364 | } | |
365 | } | |
494da23a | 366 | void SeekToFirst() override { |
7c673cae FG |
367 | iter_->SeekToFirst(); |
368 | valid_ = iter_->Valid(); | |
369 | } | |
494da23a | 370 | void SeekToLast() override { |
7c673cae FG |
371 | iter_->SeekToLast(); |
372 | valid_ = iter_->Valid(); | |
373 | } | |
494da23a | 374 | void Next() override { |
7c673cae FG |
375 | PERF_COUNTER_ADD(next_on_memtable_count, 1); |
376 | assert(Valid()); | |
377 | iter_->Next(); | |
378 | valid_ = iter_->Valid(); | |
379 | } | |
494da23a | 380 | void Prev() override { |
7c673cae FG |
381 | PERF_COUNTER_ADD(prev_on_memtable_count, 1); |
382 | assert(Valid()); | |
383 | iter_->Prev(); | |
384 | valid_ = iter_->Valid(); | |
385 | } | |
494da23a | 386 | Slice key() const override { |
7c673cae FG |
387 | assert(Valid()); |
388 | return GetLengthPrefixedSlice(iter_->key()); | |
389 | } | |
494da23a | 390 | Slice value() const override { |
7c673cae FG |
391 | assert(Valid()); |
392 | Slice key_slice = GetLengthPrefixedSlice(iter_->key()); | |
393 | return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); | |
394 | } | |
395 | ||
494da23a | 396 | Status status() const override { return Status::OK(); } |
7c673cae | 397 | |
494da23a | 398 | bool IsKeyPinned() const override { |
7c673cae FG |
399 | // memtable data is always pinned |
400 | return true; | |
401 | } | |
402 | ||
494da23a | 403 | bool IsValuePinned() const override { |
7c673cae FG |
404 | // memtable value is always pinned, except if we allow inplace update. |
405 | return value_pinned_; | |
406 | } | |
407 | ||
408 | private: | |
409 | DynamicBloom* bloom_; | |
410 | const SliceTransform* const prefix_extractor_; | |
411 | const MemTable::KeyComparator comparator_; | |
412 | MemTableRep::Iterator* iter_; | |
413 | bool valid_; | |
414 | bool arena_mode_; | |
415 | bool value_pinned_; | |
7c673cae FG |
416 | }; |
417 | ||
418 | InternalIterator* MemTable::NewIterator(const ReadOptions& read_options, | |
419 | Arena* arena) { | |
420 | assert(arena != nullptr); | |
421 | auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); | |
422 | return new (mem) MemTableIterator(*this, read_options, arena); | |
423 | } | |
424 | ||
494da23a TL |
425 | FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator( |
426 | const ReadOptions& read_options, SequenceNumber read_seq) { | |
427 | if (read_options.ignore_range_deletions || | |
428 | is_range_del_table_empty_.load(std::memory_order_relaxed)) { | |
7c673cae FG |
429 | return nullptr; |
430 | } | |
494da23a TL |
431 | auto* unfragmented_iter = new MemTableIterator( |
432 | *this, read_options, nullptr /* arena */, true /* use_range_del_table */); | |
433 | if (unfragmented_iter == nullptr) { | |
434 | return nullptr; | |
435 | } | |
436 | auto fragmented_tombstone_list = | |
437 | std::make_shared<FragmentedRangeTombstoneList>( | |
438 | std::unique_ptr<InternalIterator>(unfragmented_iter), | |
439 | comparator_.comparator); | |
440 | ||
441 | auto* fragmented_iter = new FragmentedRangeTombstoneIterator( | |
442 | fragmented_tombstone_list, comparator_.comparator, read_seq); | |
443 | return fragmented_iter; | |
7c673cae FG |
444 | } |
445 | ||
446 | port::RWMutex* MemTable::GetLock(const Slice& key) { | |
f67539c2 | 447 | return &locks_[fastrange64(GetSliceNPHash64(key), locks_.size())]; |
7c673cae FG |
448 | } |
449 | ||
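
`GetLock()` stripes the in-place-update locks by key hash; `fastrange64` maps a 64-bit hash onto `[0, n)` via a 128-bit multiply instead of a modulo. A sketch of that mapping, assuming a compiler with `__int128` (GCC/Clang) and using `std::hash` as a stand-in for `GetSliceNPHash64`:

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// fastrange64-style mapping of a hash onto [0, n): take the high 64 bits
// of the 128-bit product hash * n.
uint64_t FastRange64(uint64_t hash, uint64_t n) {
  return static_cast<uint64_t>((static_cast<unsigned __int128>(hash) * n) >>
                               64);
}

int main() {
  const uint64_t kNumLocks = 16;  // e.g. inplace_update_num_locks
  for (const std::string& key : {"alpha", "beta", "gamma"}) {
    uint64_t h = std::hash<std::string>{}(key);  // stand-in hash function
    std::cout << key << " -> lock stripe " << FastRange64(h, kNumLocks)
              << "\n";
  }
}
```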
450 | MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey, | |
451 | const Slice& end_ikey) { | |
452 | uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey); | |
453 | entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey); | |
454 | if (entry_count == 0) { | |
455 | return {0, 0}; | |
456 | } | |
457 | uint64_t n = num_entries_.load(std::memory_order_relaxed); | |
458 | if (n == 0) { | |
459 | return {0, 0}; | |
460 | } | |
461 | if (entry_count > n) { | |
462 | // (range_del_)table_->ApproximateNumEntries() is just an estimate so it can | |
463 | // be larger than actual entries we have. Cap it to entries we have to limit | |
464 | // the inaccuracy. | |
465 | entry_count = n; | |
466 | } | |
467 | uint64_t data_size = data_size_.load(std::memory_order_relaxed); | |
468 | return {entry_count * (data_size / n), entry_count}; | |
469 | } | |
470 | ||
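
The estimate returned above is `entry_count` scaled by the average entry size `data_size / n`. A worked example with made-up numbers:

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Suppose the whole memtable holds n = 1000 entries over 64000 bytes of
  // data, and the rep estimates 250 entries in [start_ikey, end_ikey).
  uint64_t n = 1000, data_size = 64000, entry_count = 250;
  if (entry_count > n) entry_count = n;  // cap the estimate, as above
  // size = entry_count * average entry size = 250 * 64 = 16000 bytes.
  std::cout << entry_count * (data_size / n) << " bytes, " << entry_count
            << " entries\n";
}
```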
11fdf7f2 | 471 | bool MemTable::Add(SequenceNumber s, ValueType type, |
7c673cae FG |
472 | const Slice& key, /* user key */ |
473 | const Slice& value, bool allow_concurrent, | |
f67539c2 | 474 | MemTablePostProcessInfo* post_process_info, void** hint) { |
7c673cae FG |
475 | // Format of an entry is concatenation of: |
476 | // key_size : varint32 of internal_key.size() | |
477 | // key bytes : char[internal_key.size()] | |
478 | // value_size : varint32 of value.size() | |
479 | // value bytes : char[value.size()] | |
480 | uint32_t key_size = static_cast<uint32_t>(key.size()); | |
481 | uint32_t val_size = static_cast<uint32_t>(value.size()); | |
482 | uint32_t internal_key_size = key_size + 8; | |
483 | const uint32_t encoded_len = VarintLength(internal_key_size) + | |
484 | internal_key_size + VarintLength(val_size) + | |
485 | val_size; | |
486 | char* buf = nullptr; | |
487 | std::unique_ptr<MemTableRep>& table = | |
488 | type == kTypeRangeDeletion ? range_del_table_ : table_; | |
489 | KeyHandle handle = table->Allocate(encoded_len, &buf); | |
490 | ||
491 | char* p = EncodeVarint32(buf, internal_key_size); | |
492 | memcpy(p, key.data(), key_size); | |
493 | Slice key_slice(p, key_size); | |
494 | p += key_size; | |
495 | uint64_t packed = PackSequenceAndType(s, type); | |
496 | EncodeFixed64(p, packed); | |
497 | p += 8; | |
498 | p = EncodeVarint32(p, val_size); | |
499 | memcpy(p, value.data(), val_size); | |
500 | assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len); | |
f67539c2 TL |
501 | size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size(); |
502 | ||
7c673cae FG |
503 | if (!allow_concurrent) { |
504 | // Extract prefix for insert with hint. | |
505 | if (insert_with_hint_prefix_extractor_ != nullptr && | |
506 | insert_with_hint_prefix_extractor_->InDomain(key_slice)) { | |
507 | Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice); | |
11fdf7f2 TL |
508 | bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]); |
509 | if (UNLIKELY(!res)) { | |
510 | return res; | |
511 | } | |
7c673cae | 512 | } else { |
11fdf7f2 TL |
513 | bool res = table->InsertKey(handle); |
514 | if (UNLIKELY(!res)) { | |
515 | return res; | |
516 | } | |
7c673cae FG |
517 | } |
518 | ||
519 | // this is a bit ugly, but is the way to avoid locked instructions | |
520 | // when incrementing an atomic | |
521 | num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1, | |
522 | std::memory_order_relaxed); | |
523 | data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len, | |
524 | std::memory_order_relaxed); | |
525 | if (type == kTypeDeletion) { | |
526 | num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1, | |
527 | std::memory_order_relaxed); | |
528 | } | |
529 | ||
f67539c2 TL |
530 | if (bloom_filter_ && prefix_extractor_ && |
531 | prefix_extractor_->InDomain(key)) { | |
494da23a TL |
532 | bloom_filter_->Add(prefix_extractor_->Transform(key)); |
533 | } | |
534 | if (bloom_filter_ && moptions_.memtable_whole_key_filtering) { | |
f67539c2 | 535 | bloom_filter_->Add(StripTimestampFromUserKey(key, ts_sz)); |
7c673cae FG |
536 | } |
537 | ||
538 | // The first sequence number inserted into the memtable | |
11fdf7f2 | 539 | assert(first_seqno_ == 0 || s >= first_seqno_); |
7c673cae FG |
540 | if (first_seqno_ == 0) { |
541 | first_seqno_.store(s, std::memory_order_relaxed); | |
542 | ||
543 | if (earliest_seqno_ == kMaxSequenceNumber) { | |
544 | earliest_seqno_.store(GetFirstSequenceNumber(), | |
545 | std::memory_order_relaxed); | |
546 | } | |
547 | assert(first_seqno_.load() >= earliest_seqno_.load()); | |
548 | } | |
549 | assert(post_process_info == nullptr); | |
550 | UpdateFlushState(); | |
551 | } else { | |
f67539c2 TL |
552 | bool res = (hint == nullptr) |
553 | ? table->InsertKeyConcurrently(handle) | |
554 | : table->InsertKeyWithHintConcurrently(handle, hint); | |
11fdf7f2 TL |
555 | if (UNLIKELY(!res)) { |
556 | return res; | |
557 | } | |
7c673cae FG |
558 | |
559 | assert(post_process_info != nullptr); | |
560 | post_process_info->num_entries++; | |
561 | post_process_info->data_size += encoded_len; | |
562 | if (type == kTypeDeletion) { | |
563 | post_process_info->num_deletes++; | |
564 | } | |
565 | ||
f67539c2 TL |
566 | if (bloom_filter_ && prefix_extractor_ && |
567 | prefix_extractor_->InDomain(key)) { | |
494da23a TL |
568 | bloom_filter_->AddConcurrently(prefix_extractor_->Transform(key)); |
569 | } | |
570 | if (bloom_filter_ && moptions_.memtable_whole_key_filtering) { | |
f67539c2 | 571 | bloom_filter_->AddConcurrently(StripTimestampFromUserKey(key, ts_sz)); |
7c673cae FG |
572 | } |
573 | ||
574 | // atomically update first_seqno_ and earliest_seqno_. | |
575 | uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed); | |
576 | while ((cur_seq_num == 0 || s < cur_seq_num) && | |
577 | !first_seqno_.compare_exchange_weak(cur_seq_num, s)) { | |
578 | } | |
579 | uint64_t cur_earliest_seqno = | |
580 | earliest_seqno_.load(std::memory_order_relaxed); | |
581 | while ( | |
582 | (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) && | |
583 | !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
584 | } | |
585 | } | |
494da23a TL |
586 | if (type == kTypeRangeDeletion) { |
587 | is_range_del_table_empty_.store(false, std::memory_order_relaxed); | |
7c673cae | 588 | } |
11fdf7f2 TL |
589 | UpdateOldestKeyTime(); |
590 | return true; | |
7c673cae FG |
591 | } |
592 | ||
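
For orientation, a self-contained toy that builds one entry in the layout documented at the top of `Add()`. The `(seq << 8) | type` packing mirrors `PackSequenceAndType()` from `dbformat.h`; single-byte varints (sizes < 128) and a little-endian platform (as `EncodeFixed64` assumes in its fast path) are assumed:

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>

// Packs a sequence number and a value-type byte the way dbformat.h does:
// sequence in the high 56 bits, type in the low 8.
uint64_t PackSequenceAndTypeToy(uint64_t seq, uint8_t type) {
  return (seq << 8) | type;
}

int main() {
  const std::string key = "k1", value = "v1";
  const uint8_t kTypeValueToy = 0x1;  // illustrative type byte
  // Layout: varint32(key_size + 8) | key | fixed64 tag |
  //         varint32(val_size) | value.
  std::string buf;
  buf.push_back(static_cast<char>(key.size() + 8));  // 1-byte varint
  buf.append(key);
  uint64_t tag = PackSequenceAndTypeToy(/*seq=*/42, kTypeValueToy);
  char fixed[8];
  std::memcpy(fixed, &tag, sizeof(tag));  // little-endian fixed64
  buf.append(fixed, sizeof(fixed));
  buf.push_back(static_cast<char>(value.size()));  // 1-byte varint
  buf.append(value);
  std::cout << "encoded_len = " << buf.size() << "\n";  // 1 + 2 + 8 + 1 + 2 = 14
}
```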
593 | // Callback from MemTable::Get() | |
594 | namespace { | |
595 | ||
596 | struct Saver { | |
597 | Status* status; | |
598 | const LookupKey* key; | |
599 | bool* found_final_value; // Is value set correctly? Used by KeyMayExist | |
600 | bool* merge_in_progress; | |
601 | std::string* value; | |
602 | SequenceNumber seq; | |
603 | const MergeOperator* merge_operator; | |
604 | // the merge operations encountered; | |
605 | MergeContext* merge_context; | |
494da23a | 606 | SequenceNumber max_covering_tombstone_seq; |
7c673cae FG |
607 | MemTable* mem; |
608 | Logger* logger; | |
609 | Statistics* statistics; | |
610 | bool inplace_update_support; | |
f67539c2 | 611 | bool do_merge; |
7c673cae | 612 | Env* env_; |
11fdf7f2 TL |
613 | ReadCallback* callback_; |
614 | bool* is_blob_index; | |
615 | ||
616 | bool CheckCallback(SequenceNumber _seq) { | |
617 | if (callback_) { | |
618 | return callback_->IsVisible(_seq); | |
619 | } | |
620 | return true; | |
621 | } | |
7c673cae FG |
622 | }; |
623 | } // namespace | |
624 | ||
625 | static bool SaveValue(void* arg, const char* entry) { | |
626 | Saver* s = reinterpret_cast<Saver*>(arg); | |
11fdf7f2 | 627 | assert(s != nullptr); |
7c673cae | 628 | MergeContext* merge_context = s->merge_context; |
494da23a | 629 | SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq; |
7c673cae FG |
630 | const MergeOperator* merge_operator = s->merge_operator; |
631 | ||
494da23a | 632 | assert(merge_context != nullptr); |
7c673cae FG |
633 | |
634 | // entry format is: | |
635 | // klength varint32 | |
636 | // userkey char[klength-8] | |
637 | // tag uint64 | |
f67539c2 | 638 | // vlength varint32 |
7c673cae FG |
639 | // value char[vlength] |
640 | // Check that it belongs to same user key. We do not check the | |
641 | // sequence number since the Seek() call above should have skipped | |
642 | // all entries with overly large sequence numbers. | |
643 | uint32_t key_length; | |
644 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); | |
f67539c2 TL |
645 | Slice user_key_slice = Slice(key_ptr, key_length - 8); |
646 | if (s->mem->GetInternalKeyComparator() | |
647 | .user_comparator() | |
648 | ->CompareWithoutTimestamp(user_key_slice, s->key->user_key()) == 0) { | |
7c673cae FG |
649 | // Correct user key |
650 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); | |
651 | ValueType type; | |
11fdf7f2 TL |
652 | SequenceNumber seq; |
653 | UnPackSequenceAndType(tag, &seq, &type); | |
654 | // If the value is not in the snapshot, skip it | |
655 | if (!s->CheckCallback(seq)) { | |
656 | return true; // to continue to the next seq | |
657 | } | |
7c673cae | 658 | |
11fdf7f2 TL |
659 | s->seq = seq; |
660 | ||
661 | if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && | |
494da23a | 662 | max_covering_tombstone_seq > seq) { |
7c673cae FG |
663 | type = kTypeRangeDeletion; |
664 | } | |
665 | switch (type) { | |
11fdf7f2 TL |
666 | case kTypeBlobIndex: |
667 | if (s->is_blob_index == nullptr) { | |
668 | ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index."); | |
669 | *(s->status) = Status::NotSupported( | |
670 | "Encounter unsupported blob value. Please open DB with " | |
f67539c2 | 671 | "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); |
11fdf7f2 TL |
672 | } else if (*(s->merge_in_progress)) { |
673 | *(s->status) = | |
674 | Status::NotSupported("Blob DB does not support merge operator."); | |
675 | } | |
676 | if (!s->status->ok()) { | |
677 | *(s->found_final_value) = true; | |
678 | return false; | |
679 | } | |
494da23a | 680 | FALLTHROUGH_INTENDED; |
7c673cae FG |
681 | case kTypeValue: { |
682 | if (s->inplace_update_support) { | |
683 | s->mem->GetLock(s->key->user_key())->ReadLock(); | |
684 | } | |
685 | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); | |
686 | *(s->status) = Status::OK(); | |
687 | if (*(s->merge_in_progress)) { | |
f67539c2 TL |
688 | if (s->do_merge) { |
689 | if (s->value != nullptr) { | |
690 | *(s->status) = MergeHelper::TimedFullMerge( | |
691 | merge_operator, s->key->user_key(), &v, | |
692 | merge_context->GetOperands(), s->value, s->logger, | |
693 | s->statistics, s->env_, nullptr /* result_operand */, true); | |
694 | } | |
695 | } else { | |
696 | // Preserve the value with the goal of returning it as part of | |
697 | // raw merge operands to the user | |
698 | merge_context->PushOperand( | |
699 | v, s->inplace_update_support == false /* operand_pinned */); | |
11fdf7f2 | 700 | } |
f67539c2 TL |
701 | } else if (!s->do_merge) { |
702 | // Preserve the value with the goal of returning it as part of | |
703 | // raw merge operands to the user | |
704 | merge_context->PushOperand( | |
705 | v, s->inplace_update_support == false /* operand_pinned */); | |
7c673cae FG |
706 | } else if (s->value != nullptr) { |
707 | s->value->assign(v.data(), v.size()); | |
708 | } | |
709 | if (s->inplace_update_support) { | |
710 | s->mem->GetLock(s->key->user_key())->ReadUnlock(); | |
711 | } | |
712 | *(s->found_final_value) = true; | |
11fdf7f2 TL |
713 | if (s->is_blob_index != nullptr) { |
714 | *(s->is_blob_index) = (type == kTypeBlobIndex); | |
715 | } | |
7c673cae FG |
716 | return false; |
717 | } | |
718 | case kTypeDeletion: | |
719 | case kTypeSingleDeletion: | |
720 | case kTypeRangeDeletion: { | |
721 | if (*(s->merge_in_progress)) { | |
11fdf7f2 TL |
722 | if (s->value != nullptr) { |
723 | *(s->status) = MergeHelper::TimedFullMerge( | |
724 | merge_operator, s->key->user_key(), nullptr, | |
725 | merge_context->GetOperands(), s->value, s->logger, | |
726 | s->statistics, s->env_, nullptr /* result_operand */, true); | |
727 | } | |
7c673cae FG |
728 | } else { |
729 | *(s->status) = Status::NotFound(); | |
730 | } | |
731 | *(s->found_final_value) = true; | |
732 | return false; | |
733 | } | |
734 | case kTypeMerge: { | |
735 | if (!merge_operator) { | |
736 | *(s->status) = Status::InvalidArgument( | |
737 | "merge_operator is not properly initialized."); | |
738 | // Normally we continue the loop (return true) when we see a merge | |
739 | // operand. But in case of an error, we should stop the loop | |
740 | // immediately and pretend we have found the value to stop further | |
741 | // seek. Otherwise, the later call will override this error status. | |
742 | *(s->found_final_value) = true; | |
743 | return false; | |
744 | } | |
745 | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); | |
746 | *(s->merge_in_progress) = true; | |
747 | merge_context->PushOperand( | |
748 | v, s->inplace_update_support == false /* operand_pinned */); | |
f67539c2 TL |
749 | if (s->do_merge && merge_operator->ShouldMerge( |
750 | merge_context->GetOperandsDirectionBackward())) { | |
11fdf7f2 TL |
751 | *(s->status) = MergeHelper::TimedFullMerge( |
752 | merge_operator, s->key->user_key(), nullptr, | |
753 | merge_context->GetOperands(), s->value, s->logger, s->statistics, | |
754 | s->env_, nullptr /* result_operand */, true); | |
755 | *(s->found_final_value) = true; | |
756 | return false; | |
757 | } | |
7c673cae FG |
758 | return true; |
759 | } | |
760 | default: | |
761 | assert(false); | |
762 | return true; | |
763 | } | |
764 | } | |
765 | ||
766 | // s->state could be Corrupt, merge or notfound | |
767 | return false; | |
768 | } | |
769 | ||
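
`SaveValue()` recovers the sequence number and type by splitting the 8-byte tag. A toy inverse of the `(seq << 8) | type` packing sketched earlier (the type constant is illustrative):

```cpp
#include <cstdint>
#include <iostream>

// Inverse of the (seq << 8) | type packing: the low byte is the value
// type, the remaining 56 bits are the sequence number.
void UnPackSequenceAndTypeToy(uint64_t tag, uint64_t* seq, uint8_t* type) {
  *seq = tag >> 8;
  *type = static_cast<uint8_t>(tag & 0xff);
}

int main() {
  uint64_t tag = (42ull << 8) | 0x1;  // seq 42, kTypeValue-style byte
  uint64_t seq;
  uint8_t type;
  UnPackSequenceAndTypeToy(tag, &seq, &type);
  std::cout << "seq=" << seq << " type=" << unsigned(type) << "\n";
}
```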
770 | bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, | |
771 | MergeContext* merge_context, | |
494da23a TL |
772 | SequenceNumber* max_covering_tombstone_seq, |
773 | SequenceNumber* seq, const ReadOptions& read_opts, | |
f67539c2 | 774 | ReadCallback* callback, bool* is_blob_index, bool do_merge) { |
7c673cae FG |
775 | // The sequence number is updated synchronously in version_set.h |
776 | if (IsEmpty()) { | |
777 | // Avoiding recording stats for speed. | |
778 | return false; | |
779 | } | |
780 | PERF_TIMER_GUARD(get_from_memtable_time); | |
781 | ||
494da23a TL |
782 | std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter( |
783 | NewRangeTombstoneIterator(read_opts, | |
784 | GetInternalKeySeqno(key.internal_key()))); | |
785 | if (range_del_iter != nullptr) { | |
786 | *max_covering_tombstone_seq = | |
787 | std::max(*max_covering_tombstone_seq, | |
788 | range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key())); | |
11fdf7f2 TL |
789 | } |
790 | ||
7c673cae FG |
791 | Slice user_key = key.user_key(); |
792 | bool found_final_value = false; | |
793 | bool merge_in_progress = s->IsMergeInProgress(); | |
494da23a | 794 | bool may_contain = true; |
f67539c2 | 795 | size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size(); |
494da23a TL |
796 | if (bloom_filter_) { |
797 | // when both memtable_whole_key_filtering and prefix_extractor_ are set, | |
798 | // only do whole key filtering for Get() to save CPU | |
799 | if (moptions_.memtable_whole_key_filtering) { | |
f67539c2 TL |
800 | may_contain = |
801 | bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz)); | |
494da23a TL |
802 | } else { |
803 | assert(prefix_extractor_); | |
804 | may_contain = | |
f67539c2 | 805 | !prefix_extractor_->InDomain(user_key) || |
494da23a TL |
806 | bloom_filter_->MayContain(prefix_extractor_->Transform(user_key)); |
807 | } | |
808 | } | |
f67539c2 | 809 | |
494da23a | 810 | if (bloom_filter_ && !may_contain) { |
7c673cae FG |
811 | // iter is null if prefix bloom says the key does not exist |
812 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); | |
813 | *seq = kMaxSequenceNumber; | |
814 | } else { | |
494da23a | 815 | if (bloom_filter_) { |
7c673cae FG |
816 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
817 | } | |
f67539c2 TL |
818 | GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback, |
819 | is_blob_index, value, s, merge_context, seq, | |
820 | &found_final_value, &merge_in_progress); | |
7c673cae FG |
821 | } |
822 | ||
823 | // No change to value, since we have not yet found a Put/Delete | |
824 | if (!found_final_value && merge_in_progress) { | |
825 | *s = Status::MergeInProgress(); | |
826 | } | |
827 | PERF_COUNTER_ADD(get_from_memtable_count, 1); | |
828 | return found_final_value; | |
829 | } | |
830 | ||
f67539c2 TL |
831 | void MemTable::GetFromTable(const LookupKey& key, |
832 | SequenceNumber max_covering_tombstone_seq, | |
833 | bool do_merge, ReadCallback* callback, | |
834 | bool* is_blob_index, std::string* value, Status* s, | |
835 | MergeContext* merge_context, SequenceNumber* seq, | |
836 | bool* found_final_value, bool* merge_in_progress) { | |
837 | Saver saver; | |
838 | saver.status = s; | |
839 | saver.found_final_value = found_final_value; | |
840 | saver.merge_in_progress = merge_in_progress; | |
841 | saver.key = &key; | |
842 | saver.value = value; | |
843 | saver.seq = kMaxSequenceNumber; | |
844 | saver.mem = this; | |
845 | saver.merge_context = merge_context; | |
846 | saver.max_covering_tombstone_seq = max_covering_tombstone_seq; | |
847 | saver.merge_operator = moptions_.merge_operator; | |
848 | saver.logger = moptions_.info_log; | |
849 | saver.inplace_update_support = moptions_.inplace_update_support; | |
850 | saver.statistics = moptions_.statistics; | |
851 | saver.env_ = env_; | |
852 | saver.callback_ = callback; | |
853 | saver.is_blob_index = is_blob_index; | |
854 | saver.do_merge = do_merge; | |
855 | table_->Get(key, &saver, SaveValue); | |
856 | *seq = saver.seq; | |
857 | } | |
858 | ||
859 | void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, | |
860 | ReadCallback* callback, bool* is_blob) { | |
861 | // The sequence number is updated synchronously in version_set.h | |
862 | if (IsEmpty()) { | |
863 | // Avoiding recording stats for speed. | |
864 | return; | |
865 | } | |
866 | PERF_TIMER_GUARD(get_from_memtable_time); | |
867 | ||
868 | MultiGetRange temp_range(*range, range->begin(), range->end()); | |
869 | if (bloom_filter_) { | |
870 | std::array<Slice*, MultiGetContext::MAX_BATCH_SIZE> keys; | |
871 | std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match = {{true}}; | |
872 | autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> prefixes; | |
873 | int num_keys = 0; | |
874 | for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { | |
875 | if (!prefix_extractor_) { | |
876 | keys[num_keys++] = &iter->ukey; | |
877 | } else if (prefix_extractor_->InDomain(iter->ukey)) { | |
878 | prefixes.emplace_back(prefix_extractor_->Transform(iter->ukey)); | |
879 | keys[num_keys++] = &prefixes.back(); | |
880 | } | |
881 | } | |
882 | bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]); | |
883 | int idx = 0; | |
884 | for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { | |
885 | if (prefix_extractor_ && !prefix_extractor_->InDomain(iter->ukey)) { | |
886 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); | |
887 | continue; | |
888 | } | |
889 | if (!may_match[idx]) { | |
890 | temp_range.SkipKey(iter); | |
891 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); | |
892 | } else { | |
893 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); | |
894 | } | |
895 | idx++; | |
896 | } | |
897 | } | |
898 | for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { | |
899 | SequenceNumber seq = kMaxSequenceNumber; | |
900 | bool found_final_value{false}; | |
901 | bool merge_in_progress = iter->s->IsMergeInProgress(); | |
902 | std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter( | |
903 | NewRangeTombstoneIterator( | |
904 | read_options, GetInternalKeySeqno(iter->lkey->internal_key()))); | |
905 | if (range_del_iter != nullptr) { | |
906 | iter->max_covering_tombstone_seq = std::max( | |
907 | iter->max_covering_tombstone_seq, | |
908 | range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key())); | |
909 | } | |
910 | GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, | |
911 | callback, is_blob, iter->value->GetSelf(), iter->s, | |
912 | &(iter->merge_context), &seq, &found_final_value, | |
913 | &merge_in_progress); | |
914 | ||
915 | if (!found_final_value && merge_in_progress) { | |
916 | *(iter->s) = Status::MergeInProgress(); | |
917 | } | |
918 | ||
919 | if (found_final_value) { | |
920 | iter->value->PinSelf(); | |
921 | range->MarkKeyDone(iter); | |
922 | RecordTick(moptions_.statistics, MEMTABLE_HIT); | |
923 | } | |
924 | } | |
925 | PERF_COUNTER_ADD(get_from_memtable_count, 1); | |
926 | } | |
927 | ||
7c673cae FG |
928 | void MemTable::Update(SequenceNumber seq, |
929 | const Slice& key, | |
930 | const Slice& value) { | |
931 | LookupKey lkey(key, seq); | |
932 | Slice mem_key = lkey.memtable_key(); | |
933 | ||
934 | std::unique_ptr<MemTableRep::Iterator> iter( | |
935 | table_->GetDynamicPrefixIterator()); | |
936 | iter->Seek(lkey.internal_key(), mem_key.data()); | |
937 | ||
938 | if (iter->Valid()) { | |
939 | // entry format is: | |
940 | // key_length varint32 | |
941 | // userkey char[klength-8] | |
942 | // tag uint64 | |
943 | // vlength varint32 | |
944 | // value char[vlength] | |
945 | // Check that it belongs to same user key. We do not check the | |
946 | // sequence number since the Seek() call above should have skipped | |
947 | // all entries with overly large sequence numbers. | |
948 | const char* entry = iter->key(); | |
949 | uint32_t key_length = 0; | |
950 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); | |
951 | if (comparator_.comparator.user_comparator()->Equal( | |
952 | Slice(key_ptr, key_length - 8), lkey.user_key())) { | |
953 | // Correct user key | |
954 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); | |
955 | ValueType type; | |
11fdf7f2 TL |
956 | SequenceNumber existing_seq; |
957 | UnPackSequenceAndType(tag, &existing_seq, &type); | |
958 | assert(existing_seq != seq); | |
7c673cae FG |
959 | if (type == kTypeValue) { |
960 | Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); | |
961 | uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); | |
962 | uint32_t new_size = static_cast<uint32_t>(value.size()); | |
963 | ||
964 | // Update value, if new value size <= previous value size | |
965 | if (new_size <= prev_size) { | |
966 | char* p = | |
967 | EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size); | |
968 | WriteLock wl(GetLock(lkey.user_key())); | |
969 | memcpy(p, value.data(), value.size()); | |
970 | assert((unsigned)((p + value.size()) - entry) == | |
971 | (unsigned)(VarintLength(key_length) + key_length + | |
972 | VarintLength(value.size()) + value.size())); | |
11fdf7f2 | 973 | RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); |
7c673cae FG |
974 | return; |
975 | } | |
976 | } | |
977 | } | |
978 | } | |
979 | ||
980 | // key doesn't exist | |
11fdf7f2 TL |
981 | bool add_res __attribute__((__unused__)); |
982 | add_res = Add(seq, kTypeValue, key, value); | |
983 | // We already checked existing_seq != seq above. In that case, Add should not fail. |
984 | assert(add_res); | |
7c673cae FG |
985 | } |
986 | ||
987 | bool MemTable::UpdateCallback(SequenceNumber seq, | |
988 | const Slice& key, | |
989 | const Slice& delta) { | |
990 | LookupKey lkey(key, seq); | |
991 | Slice memkey = lkey.memtable_key(); | |
992 | ||
993 | std::unique_ptr<MemTableRep::Iterator> iter( | |
994 | table_->GetDynamicPrefixIterator()); | |
995 | iter->Seek(lkey.internal_key(), memkey.data()); | |
996 | ||
997 | if (iter->Valid()) { | |
998 | // entry format is: | |
999 | // key_length varint32 | |
1000 | // userkey char[klength-8] | |
1001 | // tag uint64 | |
1002 | // vlength varint32 | |
1003 | // value char[vlength] | |
1004 | // Check that it belongs to same user key. We do not check the | |
1005 | // sequence number since the Seek() call above should have skipped | |
1006 | // all entries with overly large sequence numbers. | |
1007 | const char* entry = iter->key(); | |
1008 | uint32_t key_length = 0; | |
1009 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); | |
1010 | if (comparator_.comparator.user_comparator()->Equal( | |
1011 | Slice(key_ptr, key_length - 8), lkey.user_key())) { | |
1012 | // Correct user key | |
1013 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); | |
1014 | ValueType type; | |
1015 | uint64_t unused; | |
1016 | UnPackSequenceAndType(tag, &unused, &type); | |
1017 | switch (type) { | |
1018 | case kTypeValue: { | |
1019 | Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); | |
1020 | uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); | |
1021 | ||
1022 | char* prev_buffer = const_cast<char*>(prev_value.data()); | |
1023 | uint32_t new_prev_size = prev_size; | |
1024 | ||
1025 | std::string str_value; | |
1026 | WriteLock wl(GetLock(lkey.user_key())); | |
1027 | auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size, | |
1028 | delta, &str_value); | |
1029 | if (status == UpdateStatus::UPDATED_INPLACE) { | |
1030 | // Value already updated by callback. | |
1031 | assert(new_prev_size <= prev_size); | |
1032 | if (new_prev_size < prev_size) { | |
1033 | // overwrite the new prev_size | |
1034 | char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, | |
1035 | new_prev_size); | |
1036 | if (VarintLength(new_prev_size) < VarintLength(prev_size)) { | |
1037 | // shift the value buffer as well. | |
1038 | memcpy(p, prev_buffer, new_prev_size); | |
1039 | } | |
1040 | } | |
1041 | RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); | |
1042 | UpdateFlushState(); | |
1043 | return true; | |
1044 | } else if (status == UpdateStatus::UPDATED) { | |
1045 | Add(seq, kTypeValue, key, Slice(str_value)); | |
1046 | RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); | |
1047 | UpdateFlushState(); | |
1048 | return true; | |
1049 | } else if (status == UpdateStatus::UPDATE_FAILED) { | |
1050 | // No action required. Return. | |
1051 | UpdateFlushState(); | |
1052 | return true; | |
1053 | } | |
1054 | } | |
1055 | default: | |
1056 | break; | |
1057 | } | |
1058 | } | |
1059 | } | |
1060 | // If the latest value is not kTypeValue | |
1061 | // or key doesn't exist | |
1062 | return false; | |
1063 | } | |
1064 | ||
1065 | size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { | |
1066 | Slice memkey = key.memtable_key(); | |
1067 | ||
1068 | // A totally ordered iterator is costly for some memtablereps (prefix-aware |
1069 | // reps). By passing in the user key, we allow efficient iterator creation. |
1070 | // The iterator only needs to be ordered within the same user key. | |
1071 | std::unique_ptr<MemTableRep::Iterator> iter( | |
1072 | table_->GetDynamicPrefixIterator()); | |
1073 | iter->Seek(key.internal_key(), memkey.data()); | |
1074 | ||
1075 | size_t num_successive_merges = 0; | |
1076 | ||
1077 | for (; iter->Valid(); iter->Next()) { | |
1078 | const char* entry = iter->key(); | |
1079 | uint32_t key_length = 0; | |
1080 | const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); | |
1081 | if (!comparator_.comparator.user_comparator()->Equal( | |
1082 | Slice(iter_key_ptr, key_length - 8), key.user_key())) { | |
1083 | break; | |
1084 | } | |
1085 | ||
1086 | const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8); | |
1087 | ValueType type; | |
1088 | uint64_t unused; | |
1089 | UnPackSequenceAndType(tag, &unused, &type); | |
1090 | if (type != kTypeMerge) { | |
1091 | break; | |
1092 | } | |
1093 | ||
1094 | ++num_successive_merges; | |
1095 | } | |
1096 | ||
1097 | return num_successive_merges; | |
1098 | } | |
1099 | ||
1100 | void MemTableRep::Get(const LookupKey& k, void* callback_args, | |
1101 | bool (*callback_func)(void* arg, const char* entry)) { | |
1102 | auto iter = GetDynamicPrefixIterator(); | |
1103 | for (iter->Seek(k.internal_key(), k.memtable_key().data()); | |
1104 | iter->Valid() && callback_func(callback_args, iter->key()); | |
1105 | iter->Next()) { | |
1106 | } | |
1107 | } | |
1108 | ||
1109 | void MemTable::RefLogContainingPrepSection(uint64_t log) { | |
1110 | assert(log > 0); | |
1111 | auto cur = min_prep_log_referenced_.load(); | |
1112 | while ((log < cur || cur == 0) && | |
1113 | !min_prep_log_referenced_.compare_exchange_strong(cur, log)) { | |
1114 | cur = min_prep_log_referenced_.load(); | |
1115 | } | |
1116 | } | |
1117 | ||
1118 | uint64_t MemTable::GetMinLogContainingPrepSection() { | |
1119 | return min_prep_log_referenced_.load(); | |
1120 | } | |
1121 | ||
f67539c2 | 1122 | } // namespace ROCKSDB_NAMESPACE |