]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #pragma once | |
11 | #include <atomic> | |
12 | #include <deque> | |
13 | #include <functional> | |
14 | #include <memory> | |
15 | #include <string> | |
16 | #include <unordered_map> | |
17 | #include <vector> | |
18 | #include "db/dbformat.h" | |
19 | #include "db/range_del_aggregator.h" | |
11fdf7f2 | 20 | #include "db/read_callback.h" |
7c673cae | 21 | #include "db/version_edit.h" |
7c673cae FG |
22 | #include "monitoring/instrumented_mutex.h" |
23 | #include "options/cf_options.h" | |
24 | #include "rocksdb/db.h" | |
25 | #include "rocksdb/env.h" | |
26 | #include "rocksdb/memtablerep.h" | |
11fdf7f2 | 27 | #include "util/allocator.h" |
7c673cae FG |
28 | #include "util/concurrent_arena.h" |
29 | #include "util/dynamic_bloom.h" | |
30 | #include "util/hash.h" | |
31 | ||
32 | namespace rocksdb { | |
33 | ||
34 | class Mutex; | |
35 | class MemTableIterator; | |
36 | class MergeContext; | |
7c673cae | 37 | |
11fdf7f2 TL |
38 | struct ImmutableMemTableOptions { |
39 | explicit ImmutableMemTableOptions(const ImmutableCFOptions& ioptions, | |
40 | const MutableCFOptions& mutable_cf_options); | |
7c673cae FG |
41 | size_t arena_block_size; |
42 | uint32_t memtable_prefix_bloom_bits; | |
43 | size_t memtable_huge_page_size; | |
44 | bool inplace_update_support; | |
45 | size_t inplace_update_num_locks; | |
46 | UpdateStatus (*inplace_callback)(char* existing_value, | |
47 | uint32_t* existing_value_size, | |
48 | Slice delta_value, | |
49 | std::string* merged_value); | |
50 | size_t max_successive_merges; | |
51 | Statistics* statistics; | |
52 | MergeOperator* merge_operator; | |
53 | Logger* info_log; | |
54 | }; | |
55 | ||
56 | // Batched counters to be updated when inserting keys in one write batch. | |
57 | // In post process of the write batch, these can be updated together. | |
58 | // Only used in concurrent memtable insert case. | |
59 | struct MemTablePostProcessInfo { | |
60 | uint64_t data_size = 0; | |
61 | uint64_t num_entries = 0; | |
62 | uint64_t num_deletes = 0; | |
63 | }; | |
64 | ||
65 | // Note: Many of the methods in this class have comments indicating that | |
11fdf7f2 | 66 | // external synchronization is required as these methods are not thread-safe. |
7c673cae FG |
67 | // It is up to higher layers of code to decide how to prevent concurrent |
68 | // invocation of these methods. This is usually done by acquiring either | |
69 | // the db mutex or the single writer thread. | |
70 | // | |
71 | // Some of these methods are documented to only require external | |
72 | // synchronization if this memtable is immutable. Calling MarkImmutable() is | |
73 | // not sufficient to guarantee immutability. It is up to higher layers of | |
74 | // code to determine if this MemTable can still be modified by other threads. | |
75 | // Eg: The Superversion stores a pointer to the current MemTable (that can | |
76 | // be modified) and a separate list of the MemTables that can no longer be | |
77 | // written to (aka the 'immutable memtables'). | |
78 | class MemTable { | |
79 | public: | |
80 | struct KeyComparator : public MemTableRep::KeyComparator { | |
81 | const InternalKeyComparator comparator; | |
82 | explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } | |
83 | virtual int operator()(const char* prefix_len_key1, | |
84 | const char* prefix_len_key2) const override; | |
85 | virtual int operator()(const char* prefix_len_key, | |
11fdf7f2 | 86 | const DecodedType& key) const override; |
7c673cae FG |
87 | }; |
88 | ||
89 | // MemTables are reference counted. The initial reference count | |
90 | // is zero and the caller must call Ref() at least once. | |
91 | // | |
92 | // earliest_seq should be the current SequenceNumber in the db such that any | |
93 | // key inserted into this memtable will have an equal or larger seq number. | |
94 | // (When a db is first created, the earliest sequence number will be 0). | |
95 | // If the earliest sequence number is not known, kMaxSequenceNumber may be | |
96 | // used, but this may prevent some transactions from succeeding until the | |
97 | // first key is inserted into the memtable. | |
98 | explicit MemTable(const InternalKeyComparator& comparator, | |
99 | const ImmutableCFOptions& ioptions, | |
100 | const MutableCFOptions& mutable_cf_options, | |
101 | WriteBufferManager* write_buffer_manager, | |
11fdf7f2 | 102 | SequenceNumber earliest_seq, uint32_t column_family_id); |
7c673cae FG |
103 | |
104 | // Do not delete this MemTable unless Unref() indicates it is not in use. | |
105 | ~MemTable(); | |
106 | ||
107 | // Increase reference count. | |
108 | // REQUIRES: external synchronization to prevent simultaneous | |
109 | // operations on the same MemTable. | |
110 | void Ref() { ++refs_; } | |
111 | ||
112 | // Drop reference count. | |
113 | // If the refcount goes to zero return this memtable, otherwise return null. | |
114 | // REQUIRES: external synchronization to prevent simultaneous | |
115 | // operations on the same MemTable. | |
116 | MemTable* Unref() { | |
117 | --refs_; | |
118 | assert(refs_ >= 0); | |
119 | if (refs_ <= 0) { | |
120 | return this; | |
121 | } | |
122 | return nullptr; | |
123 | } | |
124 | ||
125 | // Returns an estimate of the number of bytes of data in use by this | |
126 | // data structure. | |
127 | // | |
128 | // REQUIRES: external synchronization to prevent simultaneous | |
129 | // operations on the same MemTable (unless this Memtable is immutable). | |
130 | size_t ApproximateMemoryUsage(); | |
131 | ||
132 | // This method heuristically determines if the memtable should continue to | |
133 | // host more data. | |
134 | bool ShouldScheduleFlush() const { | |
135 | return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED; | |
136 | } | |
137 | ||
138 | // Returns true if a flush should be scheduled and the caller should | |
139 | // be the one to schedule it | |
140 | bool MarkFlushScheduled() { | |
141 | auto before = FLUSH_REQUESTED; | |
142 | return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED, | |
143 | std::memory_order_relaxed, | |
144 | std::memory_order_relaxed); | |
145 | } | |
146 | ||
147 | // Return an iterator that yields the contents of the memtable. | |
148 | // | |
149 | // The caller must ensure that the underlying MemTable remains live | |
150 | // while the returned iterator is live. The keys returned by this | |
151 | // iterator are internal keys encoded by AppendInternalKey in the | |
152 | // db/dbformat.{h,cc} module. | |
153 | // | |
154 | // By default, it returns an iterator for prefix seek if prefix_extractor | |
155 | // is configured in Options. | |
156 | // arena: If not null, the arena needs to be used to allocate the Iterator. | |
157 | // Calling ~Iterator of the iterator will destroy all the states but | |
158 | // those allocated in arena. | |
159 | InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena); | |
160 | ||
161 | InternalIterator* NewRangeTombstoneIterator(const ReadOptions& read_options); | |
162 | ||
163 | // Add an entry into memtable that maps key to value at the | |
164 | // specified sequence number and with the specified type. | |
165 | // Typically value will be empty if type==kTypeDeletion. | |
166 | // | |
167 | // REQUIRES: if allow_concurrent = false, external synchronization to prevent | |
168 | // simultaneous operations on the same MemTable. | |
11fdf7f2 TL |
169 | // |
170 | // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and | |
171 | // the <key, seq> already exists. | |
172 | bool Add(SequenceNumber seq, ValueType type, const Slice& key, | |
7c673cae FG |
173 | const Slice& value, bool allow_concurrent = false, |
174 | MemTablePostProcessInfo* post_process_info = nullptr); | |
175 | ||
176 | // If memtable contains a value for key, store it in *value and return true. | |
177 | // If memtable contains a deletion for key, store a NotFound() error | |
178 | // in *s and return true. | |
179 | // If memtable contains Merge operation as the most recent entry for a key, | |
180 | // and the merge process does not stop (not reaching a value or delete), | |
181 | // prepend the current merge operand to *operands, | |
182 | // store MergeInProgress in *s, and return false. | |
183 | // Else, return false. | |
184 | // If any operation was found, its most recent sequence number | |
185 | // will be stored in *seq on success (regardless of whether true/false is | |
186 | // returned). Otherwise, *seq will be set to kMaxSequenceNumber. | |
187 | // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other | |
188 | // status returned indicates a corruption or other unexpected error. | |
189 | bool Get(const LookupKey& key, std::string* value, Status* s, | |
190 | MergeContext* merge_context, RangeDelAggregator* range_del_agg, | |
11fdf7f2 TL |
191 | SequenceNumber* seq, const ReadOptions& read_opts, |
192 | ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); | |
7c673cae FG |
193 | |
194 | bool Get(const LookupKey& key, std::string* value, Status* s, | |
195 | MergeContext* merge_context, RangeDelAggregator* range_del_agg, | |
11fdf7f2 TL |
196 | const ReadOptions& read_opts, ReadCallback* callback = nullptr, |
197 | bool* is_blob_index = nullptr) { | |
7c673cae | 198 | SequenceNumber seq; |
11fdf7f2 TL |
199 | return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts, |
200 | callback, is_blob_index); | |
7c673cae FG |
201 | } |
202 | ||
203 | // Attempts to update the new_value inplace, else does normal Add | |
204 | // Pseudocode | |
205 | // if key exists in current memtable && prev_value is of type kTypeValue | |
206 | // if sizeof(new_value) <= sizeof(prev_value) | |
207 | // update inplace | |
208 | // else add(key, new_value) | |
209 | // else add(key, new_value) | |
210 | // | |
211 | // REQUIRES: external synchronization to prevent simultaneous | |
212 | // operations on the same MemTable. | |
213 | void Update(SequenceNumber seq, | |
214 | const Slice& key, | |
215 | const Slice& value); | |
216 | ||
217 | // If prev_value for key exists, attempts to update it inplace. | |
218 | // else returns false | |
219 | // Pseudocode | |
220 | // if key exists in current memtable && prev_value is of type kTypeValue | |
221 | // new_value = delta(prev_value) | |
222 | // if sizeof(new_value) <= sizeof(prev_value) | |
223 | // update inplace | |
224 | // else add(key, new_value) | |
225 | // else return false | |
226 | // | |
227 | // REQUIRES: external synchronization to prevent simultaneous | |
228 | // operations on the same MemTable. | |
229 | bool UpdateCallback(SequenceNumber seq, | |
230 | const Slice& key, | |
231 | const Slice& delta); | |
232 | ||
233 | // Returns the number of successive merge entries starting from the newest | |
234 | // entry for the key up to the last non-merge entry or last entry for the | |
235 | // key in the memtable. | |
236 | size_t CountSuccessiveMergeEntries(const LookupKey& key); | |
237 | ||
238 | // Update counters and flush status after inserting a whole write batch | |
239 | // Used in concurrent memtable inserts. | |
240 | void BatchPostProcess(const MemTablePostProcessInfo& update_counters) { | |
241 | num_entries_.fetch_add(update_counters.num_entries, | |
242 | std::memory_order_relaxed); | |
243 | data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed); | |
244 | if (update_counters.num_deletes != 0) { | |
245 | num_deletes_.fetch_add(update_counters.num_deletes, | |
246 | std::memory_order_relaxed); | |
247 | } | |
248 | UpdateFlushState(); | |
249 | } | |
250 | ||
251 | // Get total number of entries in the mem table. | |
252 | // REQUIRES: external synchronization to prevent simultaneous | |
253 | // operations on the same MemTable (unless this Memtable is immutable). | |
254 | uint64_t num_entries() const { | |
255 | return num_entries_.load(std::memory_order_relaxed); | |
256 | } | |
257 | ||
258 | // Get total number of deletes in the mem table. | |
259 | // REQUIRES: external synchronization to prevent simultaneous | |
260 | // operations on the same MemTable (unless this Memtable is immutable). | |
261 | uint64_t num_deletes() const { | |
262 | return num_deletes_.load(std::memory_order_relaxed); | |
263 | } | |
264 | ||
11fdf7f2 TL |
265 | // Dynamically change the memtable's capacity. If set below the current usage, |
266 | // the next key added will trigger a flush. Can only increase size when | |
267 | // memtable prefix bloom is disabled, since we can't easily allocate more | |
268 | // space. | |
269 | void UpdateWriteBufferSize(size_t new_write_buffer_size) { | |
270 | if (prefix_bloom_ == nullptr || | |
271 | new_write_buffer_size < write_buffer_size_) { | |
272 | write_buffer_size_.store(new_write_buffer_size, | |
273 | std::memory_order_relaxed); | |
274 | } | |
275 | } | |
276 | ||
7c673cae FG |
277 | // Returns the edits area that is needed for flushing the memtable |
278 | VersionEdit* GetEdits() { return &edit_; } | |
279 | ||
280 | // Returns true if no entry has been inserted into the memtable. | |
281 | // REQUIRES: external synchronization to prevent simultaneous | |
282 | // operations on the same MemTable (unless this Memtable is immutable). | |
283 | bool IsEmpty() const { return first_seqno_ == 0; } | |
284 | ||
285 | // Returns the sequence number of the first element that was inserted | |
286 | // into the memtable. | |
287 | // REQUIRES: external synchronization to prevent simultaneous | |
288 | // operations on the same MemTable (unless this Memtable is immutable). | |
289 | SequenceNumber GetFirstSequenceNumber() { | |
290 | return first_seqno_.load(std::memory_order_relaxed); | |
291 | } | |
292 | ||
293 | // Returns the sequence number that is guaranteed to be smaller than or equal | |
294 | // to the sequence number of any key that could be inserted into this | |
295 | // memtable. It can then be assumed that any write with a larger(or equal) | |
296 | // sequence number will be present in this memtable or a later memtable. | |
297 | // | |
298 | // If the earliest sequence number could not be determined, | |
299 | // kMaxSequenceNumber will be returned. | |
300 | SequenceNumber GetEarliestSequenceNumber() { | |
301 | return earliest_seqno_.load(std::memory_order_relaxed); | |
302 | } | |
303 | ||
304 | // DB's latest sequence ID when the memtable is created. This number | |
305 | // may be updated to a more recent one before any key is inserted. | |
306 | SequenceNumber GetCreationSeq() const { return creation_seq_; } | |
307 | ||
308 | void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; } | |
309 | ||
310 | // Returns the next active logfile number when this memtable is about to | |
311 | // be flushed to storage | |
312 | // REQUIRES: external synchronization to prevent simultaneous | |
313 | // operations on the same MemTable. | |
314 | uint64_t GetNextLogNumber() { return mem_next_logfile_number_; } | |
315 | ||
316 | // Sets the next active logfile number when this memtable is about to | |
317 | // be flushed to storage | |
318 | // REQUIRES: external synchronization to prevent simultaneous | |
319 | // operations on the same MemTable. | |
320 | void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; } | |
321 | ||
322 | // if this memtable contains data from a committed | |
323 | // two phase transaction we must take note of the | |
324 | // log which contains that data so we can know | |
325 | // when to release that log | |
326 | void RefLogContainingPrepSection(uint64_t log); | |
327 | uint64_t GetMinLogContainingPrepSection(); | |
328 | ||
329 | // Notify the underlying storage that no more items will be added. | |
330 | // REQUIRES: external synchronization to prevent simultaneous | |
331 | // operations on the same MemTable. | |
332 | // After MarkImmutable() is called, you should not attempt to | |
333 | // write anything to this MemTable (i.e. do not call Add() or Update()). | |
334 | void MarkImmutable() { | |
335 | table_->MarkReadOnly(); | |
11fdf7f2 TL |
336 | mem_tracker_.DoneAllocating(); |
337 | } | |
338 | ||
339 | // Notify the underlying storage that all data it contained has been | |
340 | // persisted. | |
341 | // REQUIRES: external synchronization to prevent simultaneous | |
342 | // operations on the same MemTable. | |
343 | void MarkFlushed() { | |
344 | table_->MarkFlushed(); | |
7c673cae FG |
345 | } |
346 | ||
347 | // return true if the current MemTableRep supports merge operator. | |
348 | bool IsMergeOperatorSupported() const { | |
349 | return table_->IsMergeOperatorSupported(); | |
350 | } | |
351 | ||
352 | // return true if the current MemTableRep supports snapshots. | |
353 | // Inplace update prevents snapshots. | |
354 | bool IsSnapshotSupported() const { | |
355 | return table_->IsSnapshotSupported() && !moptions_.inplace_update_support; | |
356 | } | |
357 | ||
358 | struct MemTableStats { | |
359 | uint64_t size; | |
360 | uint64_t count; | |
361 | }; | |
362 | ||
363 | MemTableStats ApproximateStats(const Slice& start_ikey, | |
364 | const Slice& end_ikey); | |
365 | ||
366 | // Get the lock associated for the key | |
367 | port::RWMutex* GetLock(const Slice& key); | |
368 | ||
369 | const InternalKeyComparator& GetInternalKeyComparator() const { | |
370 | return comparator_.comparator; | |
371 | } | |
372 | ||
11fdf7f2 TL |
373 | const ImmutableMemTableOptions* GetImmutableMemTableOptions() const { |
374 | return &moptions_; | |
375 | } | |
376 | ||
377 | uint64_t ApproximateOldestKeyTime() const { | |
378 | return oldest_key_time_.load(std::memory_order_relaxed); | |
379 | } | |
380 | ||
381 | // REQUIRES: db_mutex held. | |
382 | void SetID(uint64_t id) { id_ = id; } | |
383 | ||
384 | uint64_t GetID() const { return id_; } | |
7c673cae FG |
385 | |
386 | private: | |
387 | enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; | |
388 | ||
389 | friend class MemTableIterator; | |
390 | friend class MemTableBackwardIterator; | |
391 | friend class MemTableList; | |
392 | ||
393 | KeyComparator comparator_; | |
11fdf7f2 | 394 | const ImmutableMemTableOptions moptions_; |
7c673cae FG |
395 | int refs_; |
396 | const size_t kArenaBlockSize; | |
11fdf7f2 | 397 | AllocTracker mem_tracker_; |
7c673cae | 398 | ConcurrentArena arena_; |
7c673cae FG |
399 | unique_ptr<MemTableRep> table_; |
400 | unique_ptr<MemTableRep> range_del_table_; | |
401 | bool is_range_del_table_empty_; | |
402 | ||
403 | // Total data size of all data inserted | |
404 | std::atomic<uint64_t> data_size_; | |
405 | std::atomic<uint64_t> num_entries_; | |
406 | std::atomic<uint64_t> num_deletes_; | |
407 | ||
11fdf7f2 TL |
408 | // Dynamically changeable memtable option |
409 | std::atomic<size_t> write_buffer_size_; | |
410 | ||
7c673cae FG |
411 | // These are used to manage memtable flushes to storage |
412 | bool flush_in_progress_; // started the flush | |
413 | bool flush_completed_; // finished the flush | |
414 | uint64_t file_number_; // filled up after flush is complete | |
415 | ||
416 | // The updates to be applied to the transaction log when this | |
417 | // memtable is flushed to storage. | |
418 | VersionEdit edit_; | |
419 | ||
420 | // The sequence number of the kv that was inserted first | |
421 | std::atomic<SequenceNumber> first_seqno_; | |
422 | ||
423 | // The db sequence number at the time of creation or kMaxSequenceNumber | |
424 | // if not set. | |
425 | std::atomic<SequenceNumber> earliest_seqno_; | |
426 | ||
427 | SequenceNumber creation_seq_; | |
428 | ||
429 | // The log files earlier than this number can be deleted. | |
430 | uint64_t mem_next_logfile_number_; | |
431 | ||
432 | // the earliest log containing a prepared section | |
433 | // which has been inserted into this memtable. | |
434 | std::atomic<uint64_t> min_prep_log_referenced_; | |
435 | ||
436 | // rw locks for inplace updates | |
437 | std::vector<port::RWMutex> locks_; | |
438 | ||
439 | const SliceTransform* const prefix_extractor_; | |
440 | std::unique_ptr<DynamicBloom> prefix_bloom_; | |
441 | ||
442 | std::atomic<FlushStateEnum> flush_state_; | |
443 | ||
444 | Env* env_; | |
445 | ||
446 | // Extract sequential insert prefixes. | |
447 | const SliceTransform* insert_with_hint_prefix_extractor_; | |
448 | ||
449 | // Insert hints for each prefix. | |
450 | std::unordered_map<Slice, void*, SliceHasher> insert_hints_; | |
451 | ||
11fdf7f2 TL |
452 | // Timestamp of oldest key |
453 | std::atomic<uint64_t> oldest_key_time_; | |
454 | ||
455 | // Memtable id to track flush. | |
456 | uint64_t id_ = 0; | |
457 | ||
7c673cae FG |
458 | // Returns a heuristic flush decision |
459 | bool ShouldFlushNow() const; | |
460 | ||
461 | // Updates flush_state_ using ShouldFlushNow() | |
462 | void UpdateFlushState(); | |
463 | ||
11fdf7f2 TL |
464 | void UpdateOldestKeyTime(); |
465 | ||
7c673cae FG |
466 | // No copying allowed |
467 | MemTable(const MemTable&); | |
468 | MemTable& operator=(const MemTable&); | |
469 | }; | |
470 | ||
471 | extern const char* EncodeKey(std::string* scratch, const Slice& target); | |
472 | ||
473 | } // namespace rocksdb |