]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "rocksdb/utilities/write_batch_with_index.h" | |
9 | ||
7c673cae FG |
10 | #include <memory> |
11 | ||
12 | #include "db/column_family.h" | |
13 | #include "db/db_impl.h" | |
14 | #include "db/merge_context.h" | |
15 | #include "db/merge_helper.h" | |
16 | #include "memtable/skiplist.h" | |
17 | #include "options/db_options.h" | |
18 | #include "rocksdb/comparator.h" | |
19 | #include "rocksdb/iterator.h" | |
20 | #include "util/arena.h" | |
11fdf7f2 TL |
21 | #include "util/cast_util.h" |
22 | #include "util/string_util.h" | |
7c673cae FG |
23 | #include "utilities/write_batch_with_index/write_batch_with_index_internal.h" |
24 | ||
25 | namespace rocksdb { | |
26 | ||
// Merges a "base" iterator (the DB view) with a "delta" iterator (an indexed
// WriteBatchWithIndex) into one sorted stream.  For equal keys the delta
// entry wins; Delete/SingleDelete delta entries hide the matching base entry.
//
// Invariants maintained between calls (see UpdateCurrent()):
// when direction == forward
// * current_at_base_ <=> base_iterator > delta_iterator
// when direction == backwards
// * current_at_base_ <=> base_iterator < delta_iterator
// always:
// * equal_keys_ <=> base_iterator == delta_iterator
class BaseDeltaIterator : public Iterator {
 public:
  // Takes ownership of both child iterators (held in unique_ptrs below);
  // `comparator` is borrowed and must outlive this iterator.
  BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
                    const Comparator* comparator)
      : forward_(true),
        current_at_base_(true),
        equal_keys_(false),
        status_(Status::OK()),
        base_iterator_(base_iterator),
        delta_iterator_(delta_iterator),
        comparator_(comparator) {}

  ~BaseDeltaIterator() override {}

  // Valid iff the child currently selected by current_at_base_ is valid.
  bool Valid() const override {
    return current_at_base_ ? BaseValid() : DeltaValid();
  }

  void SeekToFirst() override {
    forward_ = true;
    base_iterator_->SeekToFirst();
    delta_iterator_->SeekToFirst();
    UpdateCurrent();
  }

  void SeekToLast() override {
    forward_ = false;
    base_iterator_->SeekToLast();
    delta_iterator_->SeekToLast();
    UpdateCurrent();
  }

  void Seek(const Slice& k) override {
    forward_ = true;
    base_iterator_->Seek(k);
    delta_iterator_->Seek(k);
    UpdateCurrent();
  }

  void SeekForPrev(const Slice& k) override {
    forward_ = false;
    base_iterator_->SeekForPrev(k);
    delta_iterator_->SeekForPrev(k);
    UpdateCurrent();
  }

  void Next() override {
    if (!Valid()) {
      // Misuse: record a sticky error instead of crashing.
      status_ = Status::NotSupported("Next() on invalid iterator");
      return;
    }

    if (!forward_) {
      // Need to change direction
      // if our direction was backward and we're not equal, we have two states:
      // * both iterators are valid: we're already in a good state (current
      // shows to smaller)
      // * only one iterator is valid: we need to advance that iterator
      forward_ = true;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToFirst();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToFirst();
      } else if (current_at_base_) {
        // Change delta from larger than base to smaller
        AdvanceDelta();
      } else {
        // Change base from larger than delta to smaller
        AdvanceBase();
      }
      // Re-derive equal_keys_ after repositioning both children.
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }
    Advance();
  }

  void Prev() override {
    if (!Valid()) {
      // Misuse: record a sticky error instead of crashing.
      status_ = Status::NotSupported("Prev() on invalid iterator");
      return;
    }

    if (forward_) {
      // Need to change direction
      // if our direction was forward and we're not equal, we have two states:
      // * both iterators are valid: we're already in a good state (current
      // shows to larger)
      // * only one iterator is valid: we need to rewind that iterator
      forward_ = false;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToLast();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToLast();
      } else if (current_at_base_) {
        // Change delta from less advanced than base to more advanced
        AdvanceDelta();
      } else {
        // Change base from less advanced than delta to more advanced
        AdvanceBase();
      }
      // Re-derive equal_keys_ after repositioning both children.
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }

    Advance();
  }

  // Key/value of whichever child is current.
  Slice key() const override {
    return current_at_base_ ? base_iterator_->key()
                            : delta_iterator_->Entry().key;
  }

  Slice value() const override {
    return current_at_base_ ? base_iterator_->value()
                            : delta_iterator_->Entry().value;
  }

  // First error wins: our own sticky status, then base, then delta.
  Status status() const override {
    if (!status_.ok()) {
      return status_;
    }
    if (!base_iterator_->status().ok()) {
      return base_iterator_->status();
    }
    return delta_iterator_->status();
  }

 private:
  // Debug-only consistency check of the class invariants.
  // NOTE(review): the only call site is after the `while (true)` loop in
  // UpdateCurrent(), which is unreachable (that loop exits only via return),
  // so none of these asserts currently execute -- confirm intent before
  // re-enabling them.
  void AssertInvariants() {
#ifndef NDEBUG
    bool not_ok = false;
    if (!base_iterator_->status().ok()) {
      assert(!base_iterator_->Valid());
      not_ok = true;
    }
    if (!delta_iterator_->status().ok()) {
      assert(!delta_iterator_->Valid());
      not_ok = true;
    }
    if (not_ok) {
      assert(!Valid());
      assert(!status().ok());
      return;
    }

    if (!Valid()) {
      return;
    }
    if (!BaseValid()) {
      assert(!current_at_base_ && delta_iterator_->Valid());
      return;
    }
    if (!DeltaValid()) {
      assert(current_at_base_ && base_iterator_->Valid());
      return;
    }
    // we don't support those yet
    assert(delta_iterator_->Entry().type != kMergeRecord &&
           delta_iterator_->Entry().type != kLogDataRecord);
    int compare = comparator_->Compare(delta_iterator_->Entry().key,
                                       base_iterator_->key());
    if (forward_) {
      // current_at_base -> compare < 0
      assert(!current_at_base_ || compare < 0);
      // !current_at_base -> compare <= 0
      // NOTE(review): `&&` does not express the implication in the comment
      // above (that would be `current_at_base_ || compare <= 0`); as written
      // this assert fails whenever !current_at_base_.  It is dead code today
      // (see the note on AssertInvariants) -- verify before wiring it in.
      assert(current_at_base_ && compare >= 0);
    } else {
      // current_at_base -> compare > 0
      assert(!current_at_base_ || compare > 0);
      // !current_at_base -> compare <= 0
      // NOTE(review): same `&&` vs `||` concern as the forward branch.
      assert(current_at_base_ && compare <= 0);
    }
    // equal_keys_ <=> compare == 0
    assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
#endif
  }

  // Step the current child (both children when positioned on equal keys),
  // then recompute which child is current.
  void Advance() {
    if (equal_keys_) {
      assert(BaseValid() && DeltaValid());
      AdvanceBase();
      AdvanceDelta();
    } else {
      if (current_at_base_) {
        assert(BaseValid());
        AdvanceBase();
      } else {
        assert(DeltaValid());
        AdvanceDelta();
      }
    }
    UpdateCurrent();
  }

  // Move the delta child one step in the current direction.
  void AdvanceDelta() {
    if (forward_) {
      delta_iterator_->Next();
    } else {
      delta_iterator_->Prev();
    }
  }
  // Move the base child one step in the current direction.
  void AdvanceBase() {
    if (forward_) {
      base_iterator_->Next();
    } else {
      base_iterator_->Prev();
    }
  }
  bool BaseValid() const { return base_iterator_->Valid(); }
  bool DeltaValid() const { return delta_iterator_->Valid(); }
  // Re-establish current_at_base_/equal_keys_ after any child has moved,
  // skipping delta deletions (and the base entries they hide) along the way.
  void UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
    status_ = Status::OK();
    while (true) {
      WriteEntry delta_entry;
      if (DeltaValid()) {
        assert(delta_iterator_->status().ok());
        delta_entry = delta_iterator_->Entry();
      } else if (!delta_iterator_->status().ok()) {
        // Expose the error status and stop.
        current_at_base_ = false;
        return;
      }
      equal_keys_ = false;
      if (!BaseValid()) {
        if (!base_iterator_->status().ok()) {
          // Expose the error status and stop.
          current_at_base_ = true;
          return;
        }

        // Base has finished.
        if (!DeltaValid()) {
          // Finished
          return;
        }
        if (delta_entry.type == kDeleteRecord ||
            delta_entry.type == kSingleDeleteRecord) {
          // Deletion with nothing left in base to hide: skip it.
          AdvanceDelta();
        } else {
          current_at_base_ = false;
          return;
        }
      } else if (!DeltaValid()) {
        // Delta has finished.
        current_at_base_ = true;
        return;
      } else {
        // The direction multiplier makes "<= 0" uniformly mean "delta is at
        // or before base in iteration order".
        int compare =
            (forward_ ? 1 : -1) *
            comparator_->Compare(delta_entry.key, base_iterator_->key());
        if (compare <= 0) {  // delta bigger or equal
          if (compare == 0) {
            equal_keys_ = true;
          }
          if (delta_entry.type != kDeleteRecord &&
              delta_entry.type != kSingleDeleteRecord) {
            current_at_base_ = false;
            return;
          }
          // Delta is less advanced and is delete.
          AdvanceDelta();
          if (equal_keys_) {
            // The deletion hides the base entry with the same key.
            AdvanceBase();
          }
        } else {
          current_at_base_ = true;
          return;
        }
      }
    }

    // NOTE(review): unreachable -- the loop above exits only via return.
    AssertInvariants();
#endif  // __clang_analyzer__
  }

  bool forward_;          // true while iterating via Next(), false via Prev()
  bool current_at_base_;  // which child currently supplies key()/value()
  bool equal_keys_;       // both children positioned on the same user key
  Status status_;         // sticky error set on Next()/Prev() misuse
  std::unique_ptr<Iterator> base_iterator_;
  std::unique_ptr<WBWIIterator> delta_iterator_;
  const Comparator* comparator_;  // not owned
};
330 | ||
331 | typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&> | |
332 | WriteBatchEntrySkipList; | |
333 | ||
// Iterator over the skip-list index restricted to a single column family.
// The shared skip list holds entries for all column families; this class
// treats entries of other families as out-of-bounds.
class WBWIIteratorImpl : public WBWIIterator {
 public:
  // All pointers are borrowed; the skip list and batch must outlive this
  // iterator.
  WBWIIteratorImpl(uint32_t column_family_id,
                   WriteBatchEntrySkipList* skip_list,
                   const ReadableWriteBatch* write_batch)
      : column_family_id_(column_family_id),
        skip_list_iter_(skip_list),
        write_batch_(write_batch) {}

  ~WBWIIteratorImpl() override {}

  // Valid only while positioned on an entry of our column family; stepping
  // past the family's last entry lands on another family's entry and is
  // reported as invalid.
  bool Valid() const override {
    if (!skip_list_iter_.Valid()) {
      return false;
    }
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    return (iter_entry != nullptr &&
            iter_entry->column_family == column_family_id_);
  }

  // Position on the smallest entry of this column family using a sentinel
  // "seek to first" search entry.
  void SeekToFirst() override {
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

  // Position on the largest entry of this column family: seek to where the
  // NEXT family (id + 1) would begin, then step back once.  If that seek ran
  // off the end of the list, the list's last entry is the candidate.
  void SeekToLast() override {
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_ + 1,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
    if (!skip_list_iter_.Valid()) {
      skip_list_iter_.SeekToLast();
    } else {
      skip_list_iter_.Prev();
    }
  }

  // Position on the first entry of this family at or after `key`.
  void Seek(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      true /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

  // Position on the last entry of this family at or before `key`.
  void SeekForPrev(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      false /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.SeekForPrev(&search_entry);
  }

  void Next() override { skip_list_iter_.Next(); }

  void Prev() override { skip_list_iter_.Prev(); }

  // Decode the record the current index entry points at back out of the
  // write batch.  Only meaningful while Valid().
  WriteEntry Entry() const override {
    WriteEntry ret;
    Slice blob, xid;
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    // this is guaranteed with Valid()
    assert(iter_entry != nullptr &&
           iter_entry->column_family == column_family_id_);
    auto s = write_batch_->GetEntryFromDataOffset(
        iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
    assert(s.ok());
    assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
           ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
           ret.type == kMergeRecord);
    return ret;
  }

  Status status() const override {
    // this is in-memory data structure, so the only way status can be non-ok is
    // through memory corruption
    return Status::OK();
  }

  // Raw skip-list node, used for in-place offset rewrites in overwrite mode
  // (see Rep::UpdateExistingEntryWithCfId).
  const WriteBatchIndexEntry* GetRawEntry() const {
    return skip_list_iter_.key();
  }

 private:
  uint32_t column_family_id_;
  WriteBatchEntrySkipList::Iterator skip_list_iter_;
  const ReadableWriteBatch* write_batch_;  // not owned
};
422 | ||
// Pimpl state for WriteBatchWithIndex: the serialized batch plus a skip-list
// index mapping (column family, user key) to an offset inside the batch.
struct WriteBatchWithIndex::Rep {
  explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
               size_t max_bytes = 0, bool _overwrite_key = false)
      : write_batch(reserved_bytes, max_bytes),
        comparator(index_comparator, &write_batch),
        skip_list(comparator, &arena),
        overwrite_key(_overwrite_key),
        last_entry_offset(0),
        last_sub_batch_offset(0),
        sub_batch_cnt(1) {}
  // Underlying serialized batch; index entries store offsets into its buffer.
  ReadableWriteBatch write_batch;
  // Orders index entries by (column family, user key).
  WriteBatchEntryComparator comparator;
  // Backing storage for index nodes; reset together with the skip list.
  Arena arena;
  WriteBatchEntrySkipList skip_list;
  // When true, writing an existing key re-points its index entry instead of
  // adding a second one.
  bool overwrite_key;
  size_t last_entry_offset;
  // The starting offset of the last sub-batch. A sub-batch starts right before
  // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
  // the default, means that no duplicate key is detected so far.
  size_t last_sub_batch_offset;
  // Total number of sub-batches in the write batch. Default is 1.
  size_t sub_batch_cnt;

  // Remember current offset of internal write batch, which is used as
  // the starting offset of the next record.
  void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }

  // In overwrite mode, find the existing entry for the same key and update it
  // to point to the current entry.
  // Return true if the key is found and updated.
  bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
  bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);

  // Add the recent entry to the update.
  // In overwrite mode, if key already exists in the index, update it.
  void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
  void AddOrUpdateIndex(const Slice& key);

  // Allocate an index entry pointing to the last entry in the write batch and
  // put it to skip list.
  void AddNewEntry(uint32_t column_family_id);

  // Clear all updates buffered in this batch.
  void Clear();
  void ClearIndex();

  // Rebuild index by reading all records from the batch.
  // Returns non-ok status on corruption.
  Status ReBuildIndex();
};
473 | ||
474 | bool WriteBatchWithIndex::Rep::UpdateExistingEntry( | |
475 | ColumnFamilyHandle* column_family, const Slice& key) { | |
476 | uint32_t cf_id = GetColumnFamilyID(column_family); | |
477 | return UpdateExistingEntryWithCfId(cf_id, key); | |
478 | } | |
479 | ||
// In overwrite mode, re-point an existing index entry for `key` at the record
// that was just appended.  Returns false (no-op) when not in overwrite mode
// or when no entry for this exact key exists yet.
bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
    uint32_t column_family_id, const Slice& key) {
  if (!overwrite_key) {
    return false;
  }

  WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
  iter.Seek(key);
  if (!iter.Valid()) {
    return false;
  }
  if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
    // Seek landed on a different key: nothing to overwrite.
    return false;
  }
  // Mutate the node's offset in place.  Its key fields still reference the
  // earlier copy of the same user key, so skip-list ordering is unaffected.
  WriteBatchIndexEntry* non_const_entry =
      const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
  if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
    // The overwritten entry belongs to the current sub-batch, so this
    // duplicate key opens a new sub-batch.
    last_sub_batch_offset = last_entry_offset;
    sub_batch_cnt++;
  }
  non_const_entry->offset = last_entry_offset;
  return true;
}
503 | ||
504 | void WriteBatchWithIndex::Rep::AddOrUpdateIndex( | |
505 | ColumnFamilyHandle* column_family, const Slice& key) { | |
506 | if (!UpdateExistingEntry(column_family, key)) { | |
507 | uint32_t cf_id = GetColumnFamilyID(column_family); | |
508 | const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); | |
509 | if (cf_cmp != nullptr) { | |
510 | comparator.SetComparatorForCF(cf_id, cf_cmp); | |
511 | } | |
512 | AddNewEntry(cf_id); | |
513 | } | |
514 | } | |
515 | ||
516 | void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) { | |
517 | if (!UpdateExistingEntryWithCfId(0, key)) { | |
518 | AddNewEntry(0); | |
519 | } | |
520 | } | |
521 | ||
// Build an index node for the record starting at last_entry_offset and
// insert it into the skip list.
void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
  const std::string& wb_data = write_batch.Data();
  // View over the record appended last (from last_entry_offset to the end).
  Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
                          wb_data.size() - last_entry_offset);
  // Extract key
  Slice key;
  bool success __attribute__((__unused__));
  success =
      ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
  assert(success);

  // Node storage comes from the arena and is freed wholesale in ClearIndex().
  auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
  // The key is stored as an offset/length into the batch buffer so it stays
  // valid if the underlying std::string reallocates.
  auto* index_entry =
      new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
                                     key.data() - wb_data.data(), key.size());
  skip_list.Insert(index_entry);
}
7c673cae | 539 | |
11fdf7f2 TL |
// Drop the serialized batch contents and the index built over them.
void WriteBatchWithIndex::Rep::Clear() {
  write_batch.Clear();
  ClearIndex();
}
7c673cae | 544 | |
11fdf7f2 TL |
// Reset the index to empty by destroying and re-constructing the skip list
// and its arena in place.  Order matters: skip-list nodes live in the arena,
// so the list is torn down first and re-created last.
void WriteBatchWithIndex::Rep::ClearIndex() {
  skip_list.~WriteBatchEntrySkipList();
  arena.~Arena();
  new (&arena) Arena();
  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
  last_entry_offset = 0;
  last_sub_batch_offset = 0;
  sub_batch_cnt = 1;
}
7c673cae | 554 | |
11fdf7f2 TL |
// Rebuild the skip-list index by replaying every record currently stored in
// write_batch.  Returns non-OK if the batch is corrupt (parse failure,
// unknown tag, or record-count mismatch with the batch header).
Status WriteBatchWithIndex::Rep::ReBuildIndex() {
  Status s;

  ClearIndex();

  if (write_batch.Count() == 0) {
    // Nothing to re-index
    return s;
  }

  // Skip the fixed batch header; records start at the first offset.
  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

  Slice input(write_batch.Data());
  input.remove_prefix(offset);

  // Loop through all entries in Rep and add each one to the index
  int found = 0;
  while (s.ok() && !input.empty()) {
    Slice key, value, blob, xid;
    uint32_t column_family_id = 0;  // default
    char tag = 0;

    // set offset of current entry for call to AddNewEntry()
    last_entry_offset = input.data() - write_batch.Data().data();

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                 &value, &blob, &xid);
    if (!s.ok()) {
      break;
    }

    switch (tag) {
      // Key-bearing record types: re-create (or overwrite) an index entry.
      case kTypeColumnFamilyValue:
      case kTypeValue:
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        found++;
        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
          AddNewEntry(column_family_id);
        }
        break;
      // Log data and 2PC markers carry no indexable key.
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeBeginPersistedPrepareXID:
      case kTypeBeginUnprepareXID:
      case kTypeEndPrepareXID:
      case kTypeCommitXID:
      case kTypeRollbackXID:
      case kTypeNoop:
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
                                  ToString(static_cast<unsigned int>(tag)));
    }
  }

  // A count mismatch means the batch body did not match its header.
  if (s.ok() && found != write_batch.Count()) {
    s = Status::Corruption("WriteBatch has wrong count");
  }

  return s;
}
621 | ||
// Construct a batch whose index orders keys with `default_index_comparator`;
// per-column-family comparators are registered lazily on first write to a CF
// (see Rep::AddOrUpdateIndex).
WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key, size_t max_bytes)
    : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
                  overwrite_key)) {}
7c673cae | 627 | |
11fdf7f2 | 628 | WriteBatchWithIndex::~WriteBatchWithIndex() {} |
7c673cae | 629 | |
// Expose the underlying WriteBatch (e.g. to pass to DB::Write).  The pointer
// is owned by this object and stays valid for this object's lifetime.
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
7c673cae | 631 | |
11fdf7f2 TL |
// Number of sub-batches in this batch (>= 1; incremented whenever a
// duplicate key is indexed in overwrite mode -- see Rep).
size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
633 | ||
634 | WBWIIterator* WriteBatchWithIndex::NewIterator() { | |
635 | return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch); | |
7c673cae FG |
636 | } |
637 | ||
638 | WBWIIterator* WriteBatchWithIndex::NewIterator( | |
639 | ColumnFamilyHandle* column_family) { | |
640 | return new WBWIIteratorImpl(GetColumnFamilyID(column_family), | |
641 | &(rep->skip_list), &rep->write_batch); | |
642 | } | |
643 | ||
644 | Iterator* WriteBatchWithIndex::NewIteratorWithBase( | |
645 | ColumnFamilyHandle* column_family, Iterator* base_iterator) { | |
646 | if (rep->overwrite_key == false) { | |
647 | assert(false); | |
648 | return nullptr; | |
649 | } | |
650 | return new BaseDeltaIterator(base_iterator, NewIterator(column_family), | |
651 | GetColumnFamilyUserComparator(column_family)); | |
652 | } | |
653 | ||
654 | Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) { | |
655 | if (rep->overwrite_key == false) { | |
656 | assert(false); | |
657 | return nullptr; | |
658 | } | |
659 | // default column family's comparator | |
660 | return new BaseDeltaIterator(base_iterator, NewIterator(), | |
661 | rep->comparator.default_comparator()); | |
662 | } | |
663 | ||
664 | Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, | |
665 | const Slice& key, const Slice& value) { | |
666 | rep->SetLastEntryOffset(); | |
667 | auto s = rep->write_batch.Put(column_family, key, value); | |
668 | if (s.ok()) { | |
669 | rep->AddOrUpdateIndex(column_family, key); | |
670 | } | |
671 | return s; | |
672 | } | |
673 | ||
674 | Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { | |
675 | rep->SetLastEntryOffset(); | |
676 | auto s = rep->write_batch.Put(key, value); | |
677 | if (s.ok()) { | |
678 | rep->AddOrUpdateIndex(key); | |
679 | } | |
680 | return s; | |
681 | } | |
682 | ||
683 | Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, | |
684 | const Slice& key) { | |
685 | rep->SetLastEntryOffset(); | |
686 | auto s = rep->write_batch.Delete(column_family, key); | |
687 | if (s.ok()) { | |
688 | rep->AddOrUpdateIndex(column_family, key); | |
689 | } | |
690 | return s; | |
691 | } | |
692 | ||
693 | Status WriteBatchWithIndex::Delete(const Slice& key) { | |
694 | rep->SetLastEntryOffset(); | |
695 | auto s = rep->write_batch.Delete(key); | |
696 | if (s.ok()) { | |
697 | rep->AddOrUpdateIndex(key); | |
698 | } | |
699 | return s; | |
700 | } | |
701 | ||
702 | Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, | |
703 | const Slice& key) { | |
704 | rep->SetLastEntryOffset(); | |
705 | auto s = rep->write_batch.SingleDelete(column_family, key); | |
706 | if (s.ok()) { | |
707 | rep->AddOrUpdateIndex(column_family, key); | |
708 | } | |
709 | return s; | |
710 | } | |
711 | ||
712 | Status WriteBatchWithIndex::SingleDelete(const Slice& key) { | |
713 | rep->SetLastEntryOffset(); | |
714 | auto s = rep->write_batch.SingleDelete(key); | |
715 | if (s.ok()) { | |
716 | rep->AddOrUpdateIndex(key); | |
717 | } | |
718 | return s; | |
719 | } | |
720 | ||
721 | Status WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family, | |
722 | const Slice& begin_key, | |
723 | const Slice& end_key) { | |
724 | rep->SetLastEntryOffset(); | |
725 | auto s = rep->write_batch.DeleteRange(column_family, begin_key, end_key); | |
726 | if (s.ok()) { | |
727 | rep->AddOrUpdateIndex(column_family, begin_key); | |
728 | } | |
729 | return s; | |
730 | } | |
731 | ||
732 | Status WriteBatchWithIndex::DeleteRange(const Slice& begin_key, | |
733 | const Slice& end_key) { | |
734 | rep->SetLastEntryOffset(); | |
735 | auto s = rep->write_batch.DeleteRange(begin_key, end_key); | |
736 | if (s.ok()) { | |
737 | rep->AddOrUpdateIndex(begin_key); | |
738 | } | |
739 | return s; | |
740 | } | |
741 | ||
742 | Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, | |
743 | const Slice& key, const Slice& value) { | |
744 | rep->SetLastEntryOffset(); | |
745 | auto s = rep->write_batch.Merge(column_family, key, value); | |
746 | if (s.ok()) { | |
747 | rep->AddOrUpdateIndex(column_family, key); | |
748 | } | |
749 | return s; | |
750 | } | |
751 | ||
752 | Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { | |
753 | rep->SetLastEntryOffset(); | |
754 | auto s = rep->write_batch.Merge(key, value); | |
755 | if (s.ok()) { | |
756 | rep->AddOrUpdateIndex(key); | |
757 | } | |
758 | return s; | |
759 | } | |
760 | ||
// Log data carries no key and is not indexed; forward straight to the batch.
Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
  return rep->write_batch.PutLogData(blob);
}
764 | ||
// Discard every buffered update together with the index built over them.
void WriteBatchWithIndex::Clear() { rep->Clear(); }
766 | ||
767 | Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, | |
768 | const DBOptions& options, | |
769 | const Slice& key, std::string* value) { | |
770 | Status s; | |
771 | MergeContext merge_context; | |
772 | const ImmutableDBOptions immuable_db_options(options); | |
773 | ||
774 | WriteBatchWithIndexInternal::Result result = | |
775 | WriteBatchWithIndexInternal::GetFromBatch( | |
776 | immuable_db_options, this, column_family, key, &merge_context, | |
777 | &rep->comparator, value, rep->overwrite_key, &s); | |
778 | ||
779 | switch (result) { | |
780 | case WriteBatchWithIndexInternal::Result::kFound: | |
781 | case WriteBatchWithIndexInternal::Result::kError: | |
782 | // use returned status | |
783 | break; | |
784 | case WriteBatchWithIndexInternal::Result::kDeleted: | |
785 | case WriteBatchWithIndexInternal::Result::kNotFound: | |
786 | s = Status::NotFound(); | |
787 | break; | |
788 | case WriteBatchWithIndexInternal::Result::kMergeInProgress: | |
789 | s = Status::MergeInProgress(); | |
790 | break; | |
791 | default: | |
792 | assert(false); | |
793 | } | |
794 | ||
795 | return s; | |
796 | } | |
797 | ||
798 | Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, | |
799 | const ReadOptions& read_options, | |
800 | const Slice& key, | |
801 | std::string* value) { | |
11fdf7f2 TL |
802 | assert(value != nullptr); |
803 | PinnableSlice pinnable_val(value); | |
804 | assert(!pinnable_val.IsPinned()); | |
805 | auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key, | |
806 | &pinnable_val); | |
807 | if (s.ok() && pinnable_val.IsPinned()) { | |
808 | value->assign(pinnable_val.data(), pinnable_val.size()); | |
809 | } // else value is already assigned | |
810 | return s; | |
811 | } | |
812 | ||
813 | Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, | |
814 | const ReadOptions& read_options, | |
815 | const Slice& key, | |
816 | PinnableSlice* pinnable_val) { | |
7c673cae | 817 | return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key, |
11fdf7f2 | 818 | pinnable_val); |
7c673cae FG |
819 | } |
820 | ||
821 | Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, | |
822 | const ReadOptions& read_options, | |
823 | ColumnFamilyHandle* column_family, | |
824 | const Slice& key, | |
825 | std::string* value) { | |
11fdf7f2 TL |
826 | assert(value != nullptr); |
827 | PinnableSlice pinnable_val(value); | |
828 | assert(!pinnable_val.IsPinned()); | |
829 | auto s = | |
830 | GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val); | |
831 | if (s.ok() && pinnable_val.IsPinned()) { | |
832 | value->assign(pinnable_val.data(), pinnable_val.size()); | |
833 | } // else value is already assigned | |
834 | return s; | |
835 | } | |
836 | ||
837 | Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, | |
838 | const ReadOptions& read_options, | |
839 | ColumnFamilyHandle* column_family, | |
840 | const Slice& key, | |
841 | PinnableSlice* pinnable_val) { | |
842 | return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val, | |
843 | nullptr); | |
844 | } | |
845 | ||
// Point lookup that merges this batch with the DB: the batch is consulted
// first; only when the key is absent there (or only merge operands were
// found) is the DB read.  `callback` (may be null) is forwarded to
// DBImpl::GetImpl for visibility filtering.
Status WriteBatchWithIndex::GetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
  Status s;
  MergeContext merge_context;
  // (sic: "immuable" is a long-standing typo for "immutable")
  const ImmutableDBOptions& immuable_db_options =
      static_cast_with_check<DBImpl, DB>(db->GetRootDB())
          ->immutable_db_options();

  // Since the lifetime of the WriteBatch is the same as that of the transaction
  // we cannot pin it as otherwise the returned value will not be available
  // after the transaction finishes.
  std::string& batch_value = *pinnable_val->GetSelf();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immuable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, &batch_value, rep->overwrite_key, &s);

  if (result == WriteBatchWithIndexInternal::Result::kFound) {
    // Batch fully answered the lookup; the value is already in GetSelf().
    pinnable_val->PinSelf();
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
    return Status::NotFound();
  }
  if (result == WriteBatchWithIndexInternal::Result::kError) {
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
      rep->overwrite_key == true) {
    // Since we've overwritten keys, we do not know what other operations are
    // in this batch for this key, so we cannot do a Merge to compute the
    // result.  Instead, we will simply return MergeInProgress.
    return Status::MergeInProgress();
  }

  assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
         result == WriteBatchWithIndexInternal::Result::kNotFound);

  // Did not find key in batch OR could not resolve Merges.  Try DB.
  if (!callback) {
    s = db->Get(read_options, column_family, key, pinnable_val);
  } else {
    // Callback path goes through GetImpl so visibility can be filtered.
    s = static_cast_with_check<DBImpl, DB>(db->GetRootDB())
            ->GetImpl(read_options, column_family, key, pinnable_val, nullptr,
                      callback);
  }

  if (s.ok() || s.IsNotFound()) {  // DB Get Succeeded
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // Merge result from DB with merges in Batch
      auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
      const MergeOperator* merge_operator =
          cfh->cfd()->ioptions()->merge_operator;
      Statistics* statistics = immuable_db_options.statistics.get();
      Env* env = immuable_db_options.env;
      Logger* logger = immuable_db_options.info_log.get();

      // Base value for the merge: the DB's value when present, else none.
      Slice* merge_data;
      if (s.ok()) {
        merge_data = pinnable_val;
      } else {  // Key not present in db (s.IsNotFound())
        merge_data = nullptr;
      }

      if (merge_operator) {
        s = MergeHelper::TimedFullMerge(
            merge_operator, key, merge_data, merge_context.GetOperands(),
            pinnable_val->GetSelf(), logger, statistics, env);
        pinnable_val->PinSelf();
      } else {
        s = Status::InvalidArgument("Options::merge_operator must be set");
      }
    }
  }

  return s;
}
924 | ||
// Record a savepoint in the underlying batch.  No index state is saved;
// RollbackToSavePoint() rebuilds the index from the batch contents instead.
void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
926 | ||
927 | Status WriteBatchWithIndex::RollbackToSavePoint() { | |
928 | Status s = rep->write_batch.RollbackToSavePoint(); | |
929 | ||
930 | if (s.ok()) { | |
11fdf7f2 TL |
931 | rep->sub_batch_cnt = 1; |
932 | rep->last_sub_batch_offset = 0; | |
7c673cae FG |
933 | s = rep->ReBuildIndex(); |
934 | } | |
935 | ||
936 | return s; | |
937 | } | |
938 | ||
11fdf7f2 TL |
// Discard the most recent savepoint without rolling back; the index is
// unaffected.
Status WriteBatchWithIndex::PopSavePoint() {
  return rep->write_batch.PopSavePoint();
}
942 | ||
7c673cae FG |
// Forward the batch-size limit to the underlying WriteBatch.
void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
  rep->write_batch.SetMaxBytes(max_bytes);
}
946 | ||
11fdf7f2 TL |
// Size in bytes of the serialized batch (the index is not counted).
size_t WriteBatchWithIndex::GetDataSize() const {
  return rep->write_batch.GetDataSize();
}
950 | ||
7c673cae FG |
951 | } // namespace rocksdb |
952 | #endif // !ROCKSDB_LITE |