]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / write_batch_with_index / write_batch_with_index.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "rocksdb/utilities/write_batch_with_index.h"
9
10 #include <memory>
11
12 #include "db/column_family.h"
13 #include "db/db_impl/db_impl.h"
14 #include "db/merge_context.h"
15 #include "db/merge_helper.h"
16 #include "memory/arena.h"
17 #include "memtable/skiplist.h"
18 #include "options/db_options.h"
19 #include "rocksdb/comparator.h"
20 #include "rocksdb/iterator.h"
21 #include "util/cast_util.h"
22 #include "util/string_util.h"
23 #include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
24
25 namespace ROCKSDB_NAMESPACE {
26
27 // when direction == forward
28 // * current_at_base_ <=> base_iterator > delta_iterator
29 // when direction == backwards
30 // * current_at_base_ <=> base_iterator < delta_iterator
31 // always:
32 // * equal_keys_ <=> base_iterator == delta_iterator
33 class BaseDeltaIterator : public Iterator {
34 public:
35 BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
36 const Comparator* comparator,
37 const ReadOptions* read_options = nullptr)
38 : forward_(true),
39 current_at_base_(true),
40 equal_keys_(false),
41 status_(Status::OK()),
42 base_iterator_(base_iterator),
43 delta_iterator_(delta_iterator),
44 comparator_(comparator),
45 iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
46 : nullptr) {}
47
48 ~BaseDeltaIterator() override {}
49
50 bool Valid() const override {
51 return current_at_base_ ? BaseValid() : DeltaValid();
52 }
53
54 void SeekToFirst() override {
55 forward_ = true;
56 base_iterator_->SeekToFirst();
57 delta_iterator_->SeekToFirst();
58 UpdateCurrent();
59 }
60
61 void SeekToLast() override {
62 forward_ = false;
63 base_iterator_->SeekToLast();
64 delta_iterator_->SeekToLast();
65 UpdateCurrent();
66 }
67
68 void Seek(const Slice& k) override {
69 forward_ = true;
70 base_iterator_->Seek(k);
71 delta_iterator_->Seek(k);
72 UpdateCurrent();
73 }
74
75 void SeekForPrev(const Slice& k) override {
76 forward_ = false;
77 base_iterator_->SeekForPrev(k);
78 delta_iterator_->SeekForPrev(k);
79 UpdateCurrent();
80 }
81
82 void Next() override {
83 if (!Valid()) {
84 status_ = Status::NotSupported("Next() on invalid iterator");
85 return;
86 }
87
88 if (!forward_) {
89 // Need to change direction
90 // if our direction was backward and we're not equal, we have two states:
91 // * both iterators are valid: we're already in a good state (current
92 // shows to smaller)
93 // * only one iterator is valid: we need to advance that iterator
94 forward_ = true;
95 equal_keys_ = false;
96 if (!BaseValid()) {
97 assert(DeltaValid());
98 base_iterator_->SeekToFirst();
99 } else if (!DeltaValid()) {
100 delta_iterator_->SeekToFirst();
101 } else if (current_at_base_) {
102 // Change delta from larger than base to smaller
103 AdvanceDelta();
104 } else {
105 // Change base from larger than delta to smaller
106 AdvanceBase();
107 }
108 if (DeltaValid() && BaseValid()) {
109 if (comparator_->Equal(delta_iterator_->Entry().key,
110 base_iterator_->key())) {
111 equal_keys_ = true;
112 }
113 }
114 }
115 Advance();
116 }
117
118 void Prev() override {
119 if (!Valid()) {
120 status_ = Status::NotSupported("Prev() on invalid iterator");
121 return;
122 }
123
124 if (forward_) {
125 // Need to change direction
126 // if our direction was backward and we're not equal, we have two states:
127 // * both iterators are valid: we're already in a good state (current
128 // shows to smaller)
129 // * only one iterator is valid: we need to advance that iterator
130 forward_ = false;
131 equal_keys_ = false;
132 if (!BaseValid()) {
133 assert(DeltaValid());
134 base_iterator_->SeekToLast();
135 } else if (!DeltaValid()) {
136 delta_iterator_->SeekToLast();
137 } else if (current_at_base_) {
138 // Change delta from less advanced than base to more advanced
139 AdvanceDelta();
140 } else {
141 // Change base from less advanced than delta to more advanced
142 AdvanceBase();
143 }
144 if (DeltaValid() && BaseValid()) {
145 if (comparator_->Equal(delta_iterator_->Entry().key,
146 base_iterator_->key())) {
147 equal_keys_ = true;
148 }
149 }
150 }
151
152 Advance();
153 }
154
155 Slice key() const override {
156 return current_at_base_ ? base_iterator_->key()
157 : delta_iterator_->Entry().key;
158 }
159
160 Slice value() const override {
161 return current_at_base_ ? base_iterator_->value()
162 : delta_iterator_->Entry().value;
163 }
164
165 Status status() const override {
166 if (!status_.ok()) {
167 return status_;
168 }
169 if (!base_iterator_->status().ok()) {
170 return base_iterator_->status();
171 }
172 return delta_iterator_->status();
173 }
174
175 private:
176 void AssertInvariants() {
177 #ifndef NDEBUG
178 bool not_ok = false;
179 if (!base_iterator_->status().ok()) {
180 assert(!base_iterator_->Valid());
181 not_ok = true;
182 }
183 if (!delta_iterator_->status().ok()) {
184 assert(!delta_iterator_->Valid());
185 not_ok = true;
186 }
187 if (not_ok) {
188 assert(!Valid());
189 assert(!status().ok());
190 return;
191 }
192
193 if (!Valid()) {
194 return;
195 }
196 if (!BaseValid()) {
197 assert(!current_at_base_ && delta_iterator_->Valid());
198 return;
199 }
200 if (!DeltaValid()) {
201 assert(current_at_base_ && base_iterator_->Valid());
202 return;
203 }
204 // we don't support those yet
205 assert(delta_iterator_->Entry().type != kMergeRecord &&
206 delta_iterator_->Entry().type != kLogDataRecord);
207 int compare = comparator_->Compare(delta_iterator_->Entry().key,
208 base_iterator_->key());
209 if (forward_) {
210 // current_at_base -> compare < 0
211 assert(!current_at_base_ || compare < 0);
212 // !current_at_base -> compare <= 0
213 assert(current_at_base_ && compare >= 0);
214 } else {
215 // current_at_base -> compare > 0
216 assert(!current_at_base_ || compare > 0);
217 // !current_at_base -> compare <= 0
218 assert(current_at_base_ && compare <= 0);
219 }
220 // equal_keys_ <=> compare == 0
221 assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
222 #endif
223 }
224
225 void Advance() {
226 if (equal_keys_) {
227 assert(BaseValid() && DeltaValid());
228 AdvanceBase();
229 AdvanceDelta();
230 } else {
231 if (current_at_base_) {
232 assert(BaseValid());
233 AdvanceBase();
234 } else {
235 assert(DeltaValid());
236 AdvanceDelta();
237 }
238 }
239 UpdateCurrent();
240 }
241
242 void AdvanceDelta() {
243 if (forward_) {
244 delta_iterator_->Next();
245 } else {
246 delta_iterator_->Prev();
247 }
248 }
249 void AdvanceBase() {
250 if (forward_) {
251 base_iterator_->Next();
252 } else {
253 base_iterator_->Prev();
254 }
255 }
256 bool BaseValid() const { return base_iterator_->Valid(); }
257 bool DeltaValid() const { return delta_iterator_->Valid(); }
258 void UpdateCurrent() {
259 // Suppress false positive clang analyzer warnings.
260 #ifndef __clang_analyzer__
261 status_ = Status::OK();
262 while (true) {
263 WriteEntry delta_entry;
264 if (DeltaValid()) {
265 assert(delta_iterator_->status().ok());
266 delta_entry = delta_iterator_->Entry();
267 } else if (!delta_iterator_->status().ok()) {
268 // Expose the error status and stop.
269 current_at_base_ = false;
270 return;
271 }
272 equal_keys_ = false;
273 if (!BaseValid()) {
274 if (!base_iterator_->status().ok()) {
275 // Expose the error status and stop.
276 current_at_base_ = true;
277 return;
278 }
279
280 // Base has finished.
281 if (!DeltaValid()) {
282 // Finished
283 return;
284 }
285 if (iterate_upper_bound_) {
286 if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >=
287 0) {
288 // out of upper bound -> finished.
289 return;
290 }
291 }
292 if (delta_entry.type == kDeleteRecord ||
293 delta_entry.type == kSingleDeleteRecord) {
294 AdvanceDelta();
295 } else {
296 current_at_base_ = false;
297 return;
298 }
299 } else if (!DeltaValid()) {
300 // Delta has finished.
301 current_at_base_ = true;
302 return;
303 } else {
304 int compare =
305 (forward_ ? 1 : -1) *
306 comparator_->Compare(delta_entry.key, base_iterator_->key());
307 if (compare <= 0) { // delta bigger or equal
308 if (compare == 0) {
309 equal_keys_ = true;
310 }
311 if (delta_entry.type != kDeleteRecord &&
312 delta_entry.type != kSingleDeleteRecord) {
313 current_at_base_ = false;
314 return;
315 }
316 // Delta is less advanced and is delete.
317 AdvanceDelta();
318 if (equal_keys_) {
319 AdvanceBase();
320 }
321 } else {
322 current_at_base_ = true;
323 return;
324 }
325 }
326 }
327
328 AssertInvariants();
329 #endif // __clang_analyzer__
330 }
331
332 bool forward_;
333 bool current_at_base_;
334 bool equal_keys_;
335 Status status_;
336 std::unique_ptr<Iterator> base_iterator_;
337 std::unique_ptr<WBWIIterator> delta_iterator_;
338 const Comparator* comparator_; // not owned
339 const Slice* iterate_upper_bound_;
340 };
341
342 typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
343 WriteBatchEntrySkipList;
344
345 class WBWIIteratorImpl : public WBWIIterator {
346 public:
347 WBWIIteratorImpl(uint32_t column_family_id,
348 WriteBatchEntrySkipList* skip_list,
349 const ReadableWriteBatch* write_batch)
350 : column_family_id_(column_family_id),
351 skip_list_iter_(skip_list),
352 write_batch_(write_batch) {}
353
354 ~WBWIIteratorImpl() override {}
355
356 bool Valid() const override {
357 if (!skip_list_iter_.Valid()) {
358 return false;
359 }
360 const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
361 return (iter_entry != nullptr &&
362 iter_entry->column_family == column_family_id_);
363 }
364
365 void SeekToFirst() override {
366 WriteBatchIndexEntry search_entry(
367 nullptr /* search_key */, column_family_id_,
368 true /* is_forward_direction */, true /* is_seek_to_first */);
369 skip_list_iter_.Seek(&search_entry);
370 }
371
372 void SeekToLast() override {
373 WriteBatchIndexEntry search_entry(
374 nullptr /* search_key */, column_family_id_ + 1,
375 true /* is_forward_direction */, true /* is_seek_to_first */);
376 skip_list_iter_.Seek(&search_entry);
377 if (!skip_list_iter_.Valid()) {
378 skip_list_iter_.SeekToLast();
379 } else {
380 skip_list_iter_.Prev();
381 }
382 }
383
384 void Seek(const Slice& key) override {
385 WriteBatchIndexEntry search_entry(&key, column_family_id_,
386 true /* is_forward_direction */,
387 false /* is_seek_to_first */);
388 skip_list_iter_.Seek(&search_entry);
389 }
390
391 void SeekForPrev(const Slice& key) override {
392 WriteBatchIndexEntry search_entry(&key, column_family_id_,
393 false /* is_forward_direction */,
394 false /* is_seek_to_first */);
395 skip_list_iter_.SeekForPrev(&search_entry);
396 }
397
398 void Next() override { skip_list_iter_.Next(); }
399
400 void Prev() override { skip_list_iter_.Prev(); }
401
402 WriteEntry Entry() const override {
403 WriteEntry ret;
404 Slice blob, xid;
405 const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
406 // this is guaranteed with Valid()
407 assert(iter_entry != nullptr &&
408 iter_entry->column_family == column_family_id_);
409 auto s = write_batch_->GetEntryFromDataOffset(
410 iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
411 assert(s.ok());
412 assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
413 ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
414 ret.type == kMergeRecord);
415 return ret;
416 }
417
418 Status status() const override {
419 // this is in-memory data structure, so the only way status can be non-ok is
420 // through memory corruption
421 return Status::OK();
422 }
423
424 const WriteBatchIndexEntry* GetRawEntry() const {
425 return skip_list_iter_.key();
426 }
427
428 private:
429 uint32_t column_family_id_;
430 WriteBatchEntrySkipList::Iterator skip_list_iter_;
431 const ReadableWriteBatch* write_batch_;
432 };
433
// Internal state of a WriteBatchWithIndex: the record buffer itself plus a
// skip-list index of offsets into it.
struct WriteBatchWithIndex::Rep {
  explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
               size_t max_bytes = 0, bool _overwrite_key = false)
      : write_batch(reserved_bytes, max_bytes),
        comparator(index_comparator, &write_batch),
        skip_list(comparator, &arena),
        overwrite_key(_overwrite_key),
        last_entry_offset(0),
        last_sub_batch_offset(0),
        sub_batch_cnt(1) {}
  // The batch holding the actual records; index entries store offsets into
  // its data buffer.
  ReadableWriteBatch write_batch;
  // Orders index entries; per-CF user comparators can be registered via
  // SetComparatorForCF (see AddOrUpdateIndex).
  WriteBatchEntryComparator comparator;
  // Backing storage for the skip-list index nodes.
  Arena arena;
  WriteBatchEntrySkipList skip_list;
  // If true, writing a key again repoints its existing index entry instead
  // of adding a second one.
  bool overwrite_key;
  // Offset in write_batch where the most recent record starts.
  size_t last_entry_offset;
  // The starting offset of the last sub-batch. A sub-batch starts right before
  // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
  // the default, means that no duplicate key is detected so far.
  size_t last_sub_batch_offset;
  // Total number of sub-batches in the write batch. Default is 1.
  size_t sub_batch_cnt;

  // Remember current offset of internal write batch, which is used as
  // the starting offset of the next record.
  void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }

  // In overwrite mode, find the existing entry for the same key and update it
  // to point to the current entry.
  // Return true if the key is found and updated.
  bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
  bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);

  // Add the recent entry to the update.
  // In overwrite mode, if key already exists in the index, update it.
  void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
  void AddOrUpdateIndex(const Slice& key);

  // Allocate an index entry pointing to the last entry in the write batch and
  // put it to skip list.
  void AddNewEntry(uint32_t column_family_id);

  // Clear all updates buffered in this batch.
  void Clear();
  void ClearIndex();

  // Rebuild index by reading all records from the batch.
  // Returns non-ok status on corruption.
  Status ReBuildIndex();
};
484
485 bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
486 ColumnFamilyHandle* column_family, const Slice& key) {
487 uint32_t cf_id = GetColumnFamilyID(column_family);
488 return UpdateExistingEntryWithCfId(cf_id, key);
489 }
490
491 bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
492 uint32_t column_family_id, const Slice& key) {
493 if (!overwrite_key) {
494 return false;
495 }
496
497 WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
498 iter.Seek(key);
499 if (!iter.Valid()) {
500 return false;
501 }
502 if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
503 return false;
504 }
505 WriteBatchIndexEntry* non_const_entry =
506 const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
507 if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
508 last_sub_batch_offset = last_entry_offset;
509 sub_batch_cnt++;
510 }
511 non_const_entry->offset = last_entry_offset;
512 return true;
513 }
514
515 void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
516 ColumnFamilyHandle* column_family, const Slice& key) {
517 if (!UpdateExistingEntry(column_family, key)) {
518 uint32_t cf_id = GetColumnFamilyID(column_family);
519 const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
520 if (cf_cmp != nullptr) {
521 comparator.SetComparatorForCF(cf_id, cf_cmp);
522 }
523 AddNewEntry(cf_id);
524 }
525 }
526
527 void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
528 if (!UpdateExistingEntryWithCfId(0, key)) {
529 AddNewEntry(0);
530 }
531 }
532
// Build an index entry for the record that starts at last_entry_offset and
// insert it into the skip list. The key is stored as an (offset, size) pair
// relative to the batch's data buffer, not as a pointer.
void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
  const std::string& wb_data = write_batch.Data();
  // View over the most recently appended record.
  Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
                          wb_data.size() - last_entry_offset);
  // Extract key
  Slice key;
  bool success __attribute__((__unused__));
  success =
      ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
  assert(success);

  // Index entries are arena-allocated and never individually freed; the
  // arena is torn down wholesale in ClearIndex().
  auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
  auto* index_entry =
      new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
                                     key.data() - wb_data.data(), key.size());
  skip_list.Insert(index_entry);
}
550
// Drop every buffered update: clears both the batch data and its index.
void WriteBatchWithIndex::Rep::Clear() {
  write_batch.Clear();
  ClearIndex();
}
555
// Reset the index by destroying and re-creating the skip list and arena in
// place. Order matters: the skip list allocates from the arena, so it is
// destroyed first and re-created last against the fresh arena.
void WriteBatchWithIndex::Rep::ClearIndex() {
  skip_list.~WriteBatchEntrySkipList();
  arena.~Arena();
  new (&arena) Arena();
  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
  // Reset offset tracking and sub-batch accounting to a pristine state.
  last_entry_offset = 0;
  last_sub_batch_offset = 0;
  sub_batch_cnt = 1;
}
565
// Rebuild the skip-list index from scratch by replaying every record in the
// underlying write batch (used by RollbackToSavePoint, after the batch has
// been truncated). Returns Corruption if a record cannot be parsed or the
// record count does not match the batch header.
Status WriteBatchWithIndex::Rep::ReBuildIndex() {
  Status s;

  ClearIndex();

  if (write_batch.Count() == 0) {
    // Nothing to re-index
    return s;
  }

  // Skip the batch header; GetFirstOffset points at the first record.
  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

  Slice input(write_batch.Data());
  input.remove_prefix(offset);

  // Loop through all entries in Rep and add each one to the index
  uint32_t found = 0;
  while (s.ok() && !input.empty()) {
    Slice key, value, blob, xid;
    uint32_t column_family_id = 0;  // default
    char tag = 0;

    // set offset of current entry for call to AddNewEntry()
    last_entry_offset = input.data() - write_batch.Data().data();

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                 &value, &blob, &xid);
    if (!s.ok()) {
      break;
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        // Key/value record: index it, honoring overwrite mode.
        found++;
        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
          AddNewEntry(column_family_id);
        }
        break;
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeBeginPersistedPrepareXID:
      case kTypeBeginUnprepareXID:
      case kTypeEndPrepareXID:
      case kTypeCommitXID:
      case kTypeRollbackXID:
      case kTypeNoop:
        // Not key/value records; nothing to index.
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
                                  ToString(static_cast<unsigned int>(tag)));
    }
  }

  // Sanity check: the number of indexed records must match the batch header.
  if (s.ok() && found != write_batch.Count()) {
    s = Status::Corruption("WriteBatch has wrong count");
  }

  return s;
}
632
WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key, size_t max_bytes)
    : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
                  overwrite_key)) {}

WriteBatchWithIndex::~WriteBatchWithIndex() {}

// Moves transfer ownership of the whole Rep.
WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default;

WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) =
    default;

// Exposes the underlying batch; it remains owned by this object.
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }

// See Rep::sub_batch_cnt for what constitutes a sub-batch.
size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
649
// Iterator over the index of the default column family (id 0). Caller owns
// the returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator() {
  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}

// Iterator over the index of the given column family. Caller owns the
// returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator(
    ColumnFamilyHandle* column_family) {
  return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
                              &(rep->skip_list), &rep->write_batch);
}
659
// Returns an iterator that overlays this batch's updates (the delta) on top
// of base_iterator. Takes ownership of base_iterator.
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
    ColumnFamilyHandle* column_family, Iterator* base_iterator,
    const ReadOptions* read_options) {
  if (rep->overwrite_key == false) {
    // Merged iteration is only supported in overwrite mode, where the index
    // keeps a single entry per key.
    assert(false);
    return nullptr;
  }
  return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
                               GetColumnFamilyUserComparator(column_family),
                               read_options);
}

// Same as above for the default column family, without read options.
Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
  if (rep->overwrite_key == false) {
    // Merged iteration is only supported in overwrite mode.
    assert(false);
    return nullptr;
  }
  // default column family's comparator
  return new BaseDeltaIterator(base_iterator, NewIterator(),
                               rep->comparator.default_comparator());
}
681
682 Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
683 const Slice& key, const Slice& value) {
684 rep->SetLastEntryOffset();
685 auto s = rep->write_batch.Put(column_family, key, value);
686 if (s.ok()) {
687 rep->AddOrUpdateIndex(column_family, key);
688 }
689 return s;
690 }
691
692 Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
693 rep->SetLastEntryOffset();
694 auto s = rep->write_batch.Put(key, value);
695 if (s.ok()) {
696 rep->AddOrUpdateIndex(key);
697 }
698 return s;
699 }
700
701 Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
702 const Slice& key) {
703 rep->SetLastEntryOffset();
704 auto s = rep->write_batch.Delete(column_family, key);
705 if (s.ok()) {
706 rep->AddOrUpdateIndex(column_family, key);
707 }
708 return s;
709 }
710
711 Status WriteBatchWithIndex::Delete(const Slice& key) {
712 rep->SetLastEntryOffset();
713 auto s = rep->write_batch.Delete(key);
714 if (s.ok()) {
715 rep->AddOrUpdateIndex(key);
716 }
717 return s;
718 }
719
720 Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
721 const Slice& key) {
722 rep->SetLastEntryOffset();
723 auto s = rep->write_batch.SingleDelete(column_family, key);
724 if (s.ok()) {
725 rep->AddOrUpdateIndex(column_family, key);
726 }
727 return s;
728 }
729
730 Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
731 rep->SetLastEntryOffset();
732 auto s = rep->write_batch.SingleDelete(key);
733 if (s.ok()) {
734 rep->AddOrUpdateIndex(key);
735 }
736 return s;
737 }
738
739 Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
740 const Slice& key, const Slice& value) {
741 rep->SetLastEntryOffset();
742 auto s = rep->write_batch.Merge(column_family, key, value);
743 if (s.ok()) {
744 rep->AddOrUpdateIndex(column_family, key);
745 }
746 return s;
747 }
748
749 Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
750 rep->SetLastEntryOffset();
751 auto s = rep->write_batch.Merge(key, value);
752 if (s.ok()) {
753 rep->AddOrUpdateIndex(key);
754 }
755 return s;
756 }
757
// Log data is a blob carried in the batch but not a key/value update, so it
// bypasses the index entirely (ReBuildIndex also skips kTypeLogData).
Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
  return rep->write_batch.PutLogData(blob);
}

// Removes all buffered updates and the index built over them.
void WriteBatchWithIndex::Clear() { rep->Clear(); }
763
// Look up `key` using only the updates buffered in this batch; the DB is
// never consulted. Returns NotFound when the batch has no visible value
// (or the key was deleted in the batch), and MergeInProgress when the batch
// holds merge operands that cannot be resolved on their own.
Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
                                         const DBOptions& options,
                                         const Slice& key, std::string* value) {
  Status s;
  MergeContext merge_context;
  const ImmutableDBOptions immuable_db_options(options);

  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immuable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, value, rep->overwrite_key, &s);

  // Map the internal result enum onto the public Status contract.
  switch (result) {
    case WriteBatchWithIndexInternal::Result::kFound:
    case WriteBatchWithIndexInternal::Result::kError:
      // use returned status
      break;
    case WriteBatchWithIndexInternal::Result::kDeleted:
    case WriteBatchWithIndexInternal::Result::kNotFound:
      s = Status::NotFound();
      break;
    case WriteBatchWithIndexInternal::Result::kMergeInProgress:
      s = Status::MergeInProgress();
      break;
    default:
      assert(false);
  }

  return s;
}
794
// std::string-output convenience overload for the default column family:
// wraps `value` in a PinnableSlice and copies the result out only if the
// lookup pinned data living elsewhere.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              std::string* value) {
  assert(value != nullptr);
  PinnableSlice pinnable_val(value);
  assert(!pinnable_val.IsPinned());
  auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                             &pinnable_val);
  if (s.ok() && pinnable_val.IsPinned()) {
    value->assign(pinnable_val.data(), pinnable_val.size());
  } // else value is already assigned
  return s;
}

// Default-column-family overload.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              PinnableSlice* pinnable_val) {
  return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                           pinnable_val);
}

// std::string-output overload for an explicit column family.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              std::string* value) {
  assert(value != nullptr);
  PinnableSlice pinnable_val(value);
  assert(!pinnable_val.IsPinned());
  auto s =
      GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
  if (s.ok() && pinnable_val.IsPinned()) {
    value->assign(pinnable_val.data(), pinnable_val.size());
  } // else value is already assigned
  return s;
}

// No-callback overload; forwards to the ReadCallback-taking implementation.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              PinnableSlice* pinnable_val) {
  return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
                           nullptr);
}
842
// Full lookup: first consult the batch; if the batch alone cannot answer,
// fall back to the DB (optionally through `callback`) and, when the batch
// held merge operands, merge them with the DB value.
Status WriteBatchWithIndex::GetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
  Status s;
  MergeContext merge_context;
  const ImmutableDBOptions& immuable_db_options =
      static_cast_with_check<DBImpl>(db->GetRootDB())->immutable_db_options();

  // Since the lifetime of the WriteBatch is the same as that of the transaction
  // we cannot pin it as otherwise the returned value will not be available
  // after the transaction finishes.
  std::string& batch_value = *pinnable_val->GetSelf();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immuable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, &batch_value, rep->overwrite_key, &s);

  if (result == WriteBatchWithIndexInternal::Result::kFound) {
    // The batch fully answered the lookup; no DB access needed.
    pinnable_val->PinSelf();
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
    return Status::NotFound();
  }
  if (result == WriteBatchWithIndexInternal::Result::kError) {
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
      rep->overwrite_key == true) {
    // Since we've overwritten keys, we do not know what other operations are
    // in this batch for this key, so we cannot do a Merge to compute the
    // result. Instead, we will simply return MergeInProgress.
    return Status::MergeInProgress();
  }

  assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
         result == WriteBatchWithIndexInternal::Result::kNotFound);

  // Did not find key in batch OR could not resolve Merges. Try DB.
  if (!callback) {
    s = db->Get(read_options, column_family, key, pinnable_val);
  } else {
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = column_family;
    get_impl_options.value = pinnable_val;
    get_impl_options.callback = callback;
    s = static_cast_with_check<DBImpl>(db->GetRootDB())
            ->GetImpl(read_options, key, get_impl_options);
  }

  if (s.ok() || s.IsNotFound()) { // DB Get Succeeded
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // Merge result from DB with merges in Batch
      auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
      const MergeOperator* merge_operator =
          cfh->cfd()->ioptions()->merge_operator;
      Statistics* statistics = immuable_db_options.statistics.get();
      Env* env = immuable_db_options.env;
      Logger* logger = immuable_db_options.info_log.get();

      // A NotFound DB result means the operands merge against no base value.
      Slice* merge_data;
      if (s.ok()) {
        merge_data = pinnable_val;
      } else { // Key not present in db (s.IsNotFound())
        merge_data = nullptr;
      }

      if (merge_operator) {
        std::string merge_result;
        s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
                                        merge_context.GetOperands(),
                                        &merge_result, logger, statistics, env);
        pinnable_val->Reset();
        *pinnable_val->GetSelf() = std::move(merge_result);
        pinnable_val->PinSelf();
      } else {
        s = Status::InvalidArgument("Options::merge_operator must be set");
      }
    }
  }

  return s;
}
926
// Convenience overload without a ReadCallback.
void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input) {
  MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys,
                         values, statuses, sorted_input, nullptr);
}
934
// Batched variant of GetFromBatchAndDB: answer what it can from the batch,
// send the remaining keys to the DB in one MultiGet, then resolve any
// pending merges against the DB results. statuses[i]/values[i] correspond
// to keys[i].
void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input, ReadCallback* callback) {
  const ImmutableDBOptions& immuable_db_options =
      static_cast_with_check<DBImpl>(db->GetRootDB())->immutable_db_options();

  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  // To hold merges from the write batch
  autovector<std::pair<WriteBatchWithIndexInternal::Result, MergeContext>,
             MultiGetContext::MAX_BATCH_SIZE>
      merges;
  // Since the lifetime of the WriteBatch is the same as that of the transaction
  // we cannot pin it as otherwise the returned value will not be available
  // after the transaction finishes.
  // First pass: resolve each key from the batch alone where possible; keys
  // that still need the DB are collected into key_context (with their batch
  // lookup result and merge operands remembered in `merges`, index-aligned).
  for (size_t i = 0; i < num_keys; ++i) {
    MergeContext merge_context;
    PinnableSlice* pinnable_val = &values[i];
    std::string& batch_value = *pinnable_val->GetSelf();
    Status* s = &statuses[i];
    WriteBatchWithIndexInternal::Result result =
        WriteBatchWithIndexInternal::GetFromBatch(
            immuable_db_options, this, column_family, keys[i], &merge_context,
            &rep->comparator, &batch_value, rep->overwrite_key, s);

    if (result == WriteBatchWithIndexInternal::Result::kFound) {
      pinnable_val->PinSelf();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
      *s = Status::NotFound();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kError) {
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        rep->overwrite_key == true) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result. Instead, we will simply return MergeInProgress.
      *s = Status::MergeInProgress();
      continue;
    }

    assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
           result == WriteBatchWithIndexInternal::Result::kNotFound);
    key_context.emplace_back(column_family, keys[i], &values[i],
                             /*timestamp*/ nullptr, &statuses[i]);
    merges.emplace_back(result, std::move(merge_context));
  }

  for (KeyContext& key : key_context) {
    sorted_keys.emplace_back(&key);
  }

  // Did not find key in batch OR could not resolve Merges. Try DB.
  static_cast_with_check<DBImpl>(db->GetRootDB())
      ->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys);
  static_cast_with_check<DBImpl>(db->GetRootDB())
      ->MultiGetWithCallback(read_options, column_family, callback,
                             &sorted_keys);

  // Second pass: for keys whose batch lookup left a merge in progress,
  // combine the batch's operands with the DB result (or with no base value
  // when the DB reported NotFound).
  ColumnFamilyHandleImpl* cfh =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  const MergeOperator* merge_operator = cfh->cfd()->ioptions()->merge_operator;
  for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) {
    KeyContext& key = *iter;
    if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded
      size_t index = iter - key_context.begin();
      std::pair<WriteBatchWithIndexInternal::Result, MergeContext>&
          merge_result = merges[index];
      if (merge_result.first ==
          WriteBatchWithIndexInternal::Result::kMergeInProgress) {
        // Merge result from DB with merges in Batch
        Statistics* statistics = immuable_db_options.statistics.get();
        Env* env = immuable_db_options.env;
        Logger* logger = immuable_db_options.info_log.get();

        Slice* merge_data;
        if (key.s->ok()) {
          merge_data = iter->value;
        } else { // Key not present in db (s.IsNotFound())
          merge_data = nullptr;
        }

        if (merge_operator) {
          *key.s = MergeHelper::TimedFullMerge(
              merge_operator, *key.key, merge_data,
              merge_result.second.GetOperands(), key.value->GetSelf(), logger,
              statistics, env);
          key.value->PinSelf();
        } else {
          *key.s =
              Status::InvalidArgument("Options::merge_operator must be set");
        }
      }
    }
  }
}
1036
// Save points are tracked by the underlying write batch.
void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }

// Restore the batch to the last save point, then rebuild the index to match
// the truncated contents. Sub-batch accounting is reset first and re-derived
// by the rebuild.
Status WriteBatchWithIndex::RollbackToSavePoint() {
  Status s = rep->write_batch.RollbackToSavePoint();

  if (s.ok()) {
    rep->sub_batch_cnt = 1;
    rep->last_sub_batch_offset = 0;
    s = rep->ReBuildIndex();
  }

  return s;
}

// Drops the most recent save point without changing batch contents, so the
// index needs no rebuild.
Status WriteBatchWithIndex::PopSavePoint() {
  return rep->write_batch.PopSavePoint();
}

void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
  rep->write_batch.SetMaxBytes(max_bytes);
}

size_t WriteBatchWithIndex::GetDataSize() const {
  return rep->write_batch.GetDataSize();
}
1062
1063 } // namespace ROCKSDB_NAMESPACE
1064 #endif // !ROCKSDB_LITE