1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "rocksdb/utilities/write_batch_with_index.h"
12 #include "db/column_family.h"
13 #include "db/db_impl/db_impl.h"
14 #include "db/merge_context.h"
15 #include "db/merge_helper.h"
16 #include "memory/arena.h"
17 #include "memtable/skiplist.h"
18 #include "options/db_options.h"
19 #include "rocksdb/comparator.h"
20 #include "rocksdb/iterator.h"
21 #include "util/cast_util.h"
22 #include "util/string_util.h"
23 #include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
25 namespace ROCKSDB_NAMESPACE
{
27 // when direction == forward
28 // * current_at_base_ <=> base_iterator > delta_iterator
29 // when direction == backwards
30 // * current_at_base_ <=> base_iterator < delta_iterator
32 // * equal_keys_ <=> base_iterator == delta_iterator
// BaseDeltaIterator: merges a DB ("base") iterator with a write-batch
// ("delta") iterator so callers see the batch's updates overlaid on the DB.
// current_at_base_ selects which underlying iterator supplies key()/value().
// NOTE(review): this region was mangled by extraction — statements are split
// across lines and several original lines are missing (the embedded original
// line numbers skip, e.g. 37->39, 45->48). Do not edit by hand; consult the
// upstream write_batch_with_index.cc.
33 class BaseDeltaIterator
: public Iterator
{
// Takes ownership of base_iterator and delta_iterator (stored in unique_ptr
// members below); comparator and read_options are borrowed.
35 BaseDeltaIterator(Iterator
* base_iterator
, WBWIIterator
* delta_iterator
,
36 const Comparator
* comparator
,
37 const ReadOptions
* read_options
= nullptr)
39 current_at_base_(true),
41 status_(Status::OK()),
42 base_iterator_(base_iterator
),
43 delta_iterator_(delta_iterator
),
44 comparator_(comparator
),
// iterate_upper_bound_ is taken from read_options when one is supplied.
45 iterate_upper_bound_(read_options
? read_options
->iterate_upper_bound
48 ~BaseDeltaIterator() override
{}
// Valid() delegates to whichever side is current.
50 bool Valid() const override
{
51 return current_at_base_
? BaseValid() : DeltaValid();
// Positioning operations forward to BOTH child iterators.
54 void SeekToFirst() override
{
56 base_iterator_
->SeekToFirst();
57 delta_iterator_
->SeekToFirst();
61 void SeekToLast() override
{
63 base_iterator_
->SeekToLast();
64 delta_iterator_
->SeekToLast();
68 void Seek(const Slice
& k
) override
{
70 base_iterator_
->Seek(k
);
71 delta_iterator_
->Seek(k
);
75 void SeekForPrev(const Slice
& k
) override
{
77 base_iterator_
->SeekForPrev(k
);
78 delta_iterator_
->SeekForPrev(k
);
// Next(): sets NotSupported when called on an invalid iterator, otherwise
// handles a possible direction change before advancing.
// NOTE(review): the direction-switch branches are only partially present.
82 void Next() override
{
84 status_
= Status::NotSupported("Next() on invalid iterator");
89 // Need to change direction
90 // if our direction was backward and we're not equal, we have two states:
91 // * both iterators are valid: we're already in a good state (current
93 // * only one iterator is valid: we need to advance that iterator
98 base_iterator_
->SeekToFirst();
99 } else if (!DeltaValid()) {
100 delta_iterator_
->SeekToFirst();
101 } else if (current_at_base_
) {
102 // Change delta from larger than base to smaller
105 // Change base from larger than delta to smaller
108 if (DeltaValid() && BaseValid()) {
109 if (comparator_
->Equal(delta_iterator_
->Entry().key
,
110 base_iterator_
->key())) {
// Prev(): mirror image of Next() for backward iteration.
118 void Prev() override
{
120 status_
= Status::NotSupported("Prev() on invalid iterator");
125 // Need to change direction
126 // if our direction was backward and we're not equal, we have two states:
127 // * both iterators are valid: we're already in a good state (current
129 // * only one iterator is valid: we need to advance that iterator
133 assert(DeltaValid());
134 base_iterator_
->SeekToLast();
135 } else if (!DeltaValid()) {
136 delta_iterator_
->SeekToLast();
137 } else if (current_at_base_
) {
138 // Change delta from less advanced than base to more advanced
141 // Change base from less advanced than delta to more advanced
144 if (DeltaValid() && BaseValid()) {
145 if (comparator_
->Equal(delta_iterator_
->Entry().key
,
146 base_iterator_
->key())) {
// key()/value(): served from the currently-selected child iterator.
155 Slice
key() const override
{
156 return current_at_base_
? base_iterator_
->key()
157 : delta_iterator_
->Entry().key
;
160 Slice
value() const override
{
161 return current_at_base_
? base_iterator_
->value()
162 : delta_iterator_
->Entry().value
;
// status(): first non-ok of base then delta status.
165 Status
status() const override
{
169 if (!base_iterator_
->status().ok()) {
170 return base_iterator_
->status();
172 return delta_iterator_
->status();
// Debug-only consistency checks relating current_at_base_/equal_keys_ to the
// comparator ordering of the two child iterators.
176 void AssertInvariants() {
179 if (!base_iterator_
->status().ok()) {
180 assert(!base_iterator_
->Valid());
183 if (!delta_iterator_
->status().ok()) {
184 assert(!delta_iterator_
->Valid());
189 assert(!status().ok());
197 assert(!current_at_base_
&& delta_iterator_
->Valid());
201 assert(current_at_base_
&& base_iterator_
->Valid());
204 // we don't support those yet
205 assert(delta_iterator_
->Entry().type
!= kMergeRecord
&&
206 delta_iterator_
->Entry().type
!= kLogDataRecord
);
207 int compare
= comparator_
->Compare(delta_iterator_
->Entry().key
,
208 base_iterator_
->key());
210 // current_at_base -> compare < 0
211 assert(!current_at_base_
|| compare
< 0);
212 // !current_at_base -> compare <= 0
213 assert(current_at_base_
&& compare
>= 0);
215 // current_at_base -> compare > 0
216 assert(!current_at_base_
|| compare
> 0);
217 // !current_at_base -> compare <= 0
218 assert(current_at_base_
&& compare
<= 0);
220 // equal_keys_ <=> compare == 0
221 assert((equal_keys_
|| compare
!= 0) && (!equal_keys_
|| compare
== 0));
227 assert(BaseValid() && DeltaValid());
231 if (current_at_base_
) {
235 assert(DeltaValid());
// Advance helpers: step the appropriate child in the current direction.
242 void AdvanceDelta() {
244 delta_iterator_
->Next();
246 delta_iterator_
->Prev();
251 base_iterator_
->Next();
253 base_iterator_
->Prev();
256 bool BaseValid() const { return base_iterator_
->Valid(); }
257 bool DeltaValid() const { return delta_iterator_
->Valid(); }
// UpdateCurrent(): after any move, decide whether base or delta supplies the
// current entry, skipping delete/single-delete delta records and honoring
// iterate_upper_bound_.
258 void UpdateCurrent() {
259 // Suppress false positive clang analyzer warnings.
260 #ifndef __clang_analyzer__
261 status_
= Status::OK();
263 WriteEntry delta_entry
;
265 assert(delta_iterator_
->status().ok());
266 delta_entry
= delta_iterator_
->Entry();
267 } else if (!delta_iterator_
->status().ok()) {
268 // Expose the error status and stop.
269 current_at_base_
= false;
274 if (!base_iterator_
->status().ok()) {
275 // Expose the error status and stop.
276 current_at_base_
= true;
280 // Base has finished.
285 if (iterate_upper_bound_
) {
286 if (comparator_
->Compare(delta_entry
.key
, *iterate_upper_bound_
) >=
288 // out of upper bound -> finished.
292 if (delta_entry
.type
== kDeleteRecord
||
293 delta_entry
.type
== kSingleDeleteRecord
) {
296 current_at_base_
= false;
299 } else if (!DeltaValid()) {
300 // Delta has finished.
301 current_at_base_
= true;
// Both sides valid: compare keys with a sign flip for backward iteration.
// NOTE(review): `forward_`/`compare` declarations are among the missing lines.
305 (forward_
? 1 : -1) *
306 comparator_
->Compare(delta_entry
.key
, base_iterator_
->key());
307 if (compare
<= 0) { // delta bigger or equal
311 if (delta_entry
.type
!= kDeleteRecord
&&
312 delta_entry
.type
!= kSingleDeleteRecord
) {
313 current_at_base_
= false;
316 // Delta is less advanced and is delete.
322 current_at_base_
= true;
329 #endif // __clang_analyzer__
// Data members: the two owned child iterators plus borrowed comparator and
// optional upper bound.
333 bool current_at_base_
;
336 std::unique_ptr
<Iterator
> base_iterator_
;
337 std::unique_ptr
<WBWIIterator
> delta_iterator_
;
338 const Comparator
* comparator_
; // not owned
339 const Slice
* iterate_upper_bound_
;
342 typedef SkipList
<WriteBatchIndexEntry
*, const WriteBatchEntryComparator
&>
343 WriteBatchEntrySkipList
;
// WBWIIteratorImpl: iterates the skip-list index restricted to one column
// family. Entries for other column families terminate iteration (Valid()
// checks the entry's column_family id).
// NOTE(review): mangled extraction — some original lines are missing here
// (e.g. the declarations of `ret`, `blob`, `xid` used in Entry()); consult
// the upstream file before editing.
345 class WBWIIteratorImpl
: public WBWIIterator
{
// Borrows the skip list and the write batch; owns only its skip-list cursor.
347 WBWIIteratorImpl(uint32_t column_family_id
,
348 WriteBatchEntrySkipList
* skip_list
,
349 const ReadableWriteBatch
* write_batch
)
350 : column_family_id_(column_family_id
),
351 skip_list_iter_(skip_list
),
352 write_batch_(write_batch
) {}
354 ~WBWIIteratorImpl() override
{}
// Valid(): cursor valid AND the entry belongs to this column family.
356 bool Valid() const override
{
357 if (!skip_list_iter_
.Valid()) {
360 const WriteBatchIndexEntry
* iter_entry
= skip_list_iter_
.key();
361 return (iter_entry
!= nullptr &&
362 iter_entry
->column_family
== column_family_id_
);
// SeekToFirst(): seek to the sentinel "first entry of this cf" key.
365 void SeekToFirst() override
{
366 WriteBatchIndexEntry
search_entry(
367 nullptr /* search_key */, column_family_id_
,
368 true /* is_forward_direction */, true /* is_seek_to_first */);
369 skip_list_iter_
.Seek(&search_entry
);
// SeekToLast(): seek to the start of the NEXT column family (id + 1), then
// step back one entry.
372 void SeekToLast() override
{
373 WriteBatchIndexEntry
search_entry(
374 nullptr /* search_key */, column_family_id_
+ 1,
375 true /* is_forward_direction */, true /* is_seek_to_first */);
376 skip_list_iter_
.Seek(&search_entry
);
377 if (!skip_list_iter_
.Valid()) {
378 skip_list_iter_
.SeekToLast();
380 skip_list_iter_
.Prev();
384 void Seek(const Slice
& key
) override
{
385 WriteBatchIndexEntry
search_entry(&key
, column_family_id_
,
386 true /* is_forward_direction */,
387 false /* is_seek_to_first */);
388 skip_list_iter_
.Seek(&search_entry
);
391 void SeekForPrev(const Slice
& key
) override
{
392 WriteBatchIndexEntry
search_entry(&key
, column_family_id_
,
393 false /* is_forward_direction */,
394 false /* is_seek_to_first */);
395 skip_list_iter_
.SeekForPrev(&search_entry
);
398 void Next() override
{ skip_list_iter_
.Next(); }
400 void Prev() override
{ skip_list_iter_
.Prev(); }
// Entry(): decode the record at the current index entry's offset out of the
// write batch. Only call when Valid().
402 WriteEntry
Entry() const override
{
405 const WriteBatchIndexEntry
* iter_entry
= skip_list_iter_
.key();
406 // this is guaranteed with Valid()
407 assert(iter_entry
!= nullptr &&
408 iter_entry
->column_family
== column_family_id_
);
409 auto s
= write_batch_
->GetEntryFromDataOffset(
410 iter_entry
->offset
, &ret
.type
, &ret
.key
, &ret
.value
, &blob
, &xid
);
412 assert(ret
.type
== kPutRecord
|| ret
.type
== kDeleteRecord
||
413 ret
.type
== kSingleDeleteRecord
|| ret
.type
== kDeleteRangeRecord
||
414 ret
.type
== kMergeRecord
);
418 Status
status() const override
{
419 // this is in-memory data structure, so the only way status can be non-ok is
420 // through memory corruption
// GetRawEntry(): raw index entry (may belong to another cf) — used by Rep
// for in-place offset updates in overwrite mode.
424 const WriteBatchIndexEntry
* GetRawEntry() const {
425 return skip_list_iter_
.key();
429 uint32_t column_family_id_
;
430 WriteBatchEntrySkipList::Iterator skip_list_iter_
;
431 const ReadableWriteBatch
* write_batch_
;
// Rep: the pimpl behind WriteBatchWithIndex. Holds the batch itself, the
// skip-list index over it, and sub-batch bookkeeping.
// NOTE(review): mangled extraction — several member/initializer lines are
// missing (e.g. the arena member and sub_batch_cnt initialization; numbering
// skips 442->444, 445->447).
434 struct WriteBatchWithIndex::Rep
{
435 explicit Rep(const Comparator
* index_comparator
, size_t reserved_bytes
= 0,
436 size_t max_bytes
= 0, bool _overwrite_key
= false)
437 : write_batch(reserved_bytes
, max_bytes
),
438 comparator(index_comparator
, &write_batch
),
439 skip_list(comparator
, &arena
),
440 overwrite_key(_overwrite_key
),
441 last_entry_offset(0),
442 last_sub_batch_offset(0),
444 ReadableWriteBatch write_batch
;
445 WriteBatchEntryComparator comparator
;
447 WriteBatchEntrySkipList skip_list
;
// Offset of the most recently appended record; index entries point here.
449 size_t last_entry_offset
;
450 // The starting offset of the last sub-batch. A sub-batch starts right before
451 // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
452 // the default, means that no duplicate key is detected so far.
453 size_t last_sub_batch_offset
;
454 // Total number of sub-batches in the write batch. Default is 1.
455 size_t sub_batch_cnt
;
457 // Remember current offset of internal write batch, which is used as
458 // the starting offset of the next record.
459 void SetLastEntryOffset() { last_entry_offset
= write_batch
.GetDataSize(); }
461 // In overwrite mode, find the existing entry for the same key and update it
462 // to point to the current entry.
463 // Return true if the key is found and updated.
464 bool UpdateExistingEntry(ColumnFamilyHandle
* column_family
, const Slice
& key
);
465 bool UpdateExistingEntryWithCfId(uint32_t column_family_id
, const Slice
& key
);
467 // Add the recent entry to the update.
468 // In overwrite mode, if key already exists in the index, update it.
469 void AddOrUpdateIndex(ColumnFamilyHandle
* column_family
, const Slice
& key
);
470 void AddOrUpdateIndex(const Slice
& key
);
472 // Allocate an index entry pointing to the last entry in the write batch and
473 // put it to skip list.
474 void AddNewEntry(uint32_t column_family_id
);
476 // Clear all updates buffered in this batch.
480 // Rebuild index by reading all records from the batch.
481 // Returns non-ok status on corruption.
482 Status
ReBuildIndex();
485 bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
486 ColumnFamilyHandle
* column_family
, const Slice
& key
) {
487 uint32_t cf_id
= GetColumnFamilyID(column_family
);
488 return UpdateExistingEntryWithCfId(cf_id
, key
);
// Id-based overwrite helper: look up `key` in the index and, if present,
// repoint its entry at last_entry_offset. Also starts a new sub-batch when a
// duplicate key is detected.
// NOTE(review): mangled extraction — the early-return for !overwrite_key, the
// iterator Seek/Valid checks, and the sub_batch_cnt increment are among the
// missing lines (numbering skips 493->497, 508->511).
491 bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
492 uint32_t column_family_id
, const Slice
& key
) {
// Only meaningful in overwrite mode.
493 if (!overwrite_key
) {
// Probe the index for an existing entry with this key.
497 WBWIIteratorImpl
iter(column_family_id
, &skip_list
, &write_batch
);
502 if (comparator
.CompareKey(column_family_id
, key
, iter
.Entry().key
) != 0) {
// Found a duplicate: mutate the (skip-list-owned) entry in place.
505 WriteBatchIndexEntry
* non_const_entry
=
506 const_cast<WriteBatchIndexEntry
*>(iter
.GetRawEntry());
// A duplicate within the current sub-batch opens a new sub-batch.
507 if (LIKELY(last_sub_batch_offset
<= non_const_entry
->offset
)) {
508 last_sub_batch_offset
= last_entry_offset
;
511 non_const_entry
->offset
= last_entry_offset
;
// AddOrUpdateIndex: after a record is appended to write_batch, either update
// an existing index entry (overwrite mode) or add a new one. The handle
// overload also registers the column family's user comparator.
// NOTE(review): mangled extraction — the AddNewEntry(...) calls in both
// overloads are among the missing lines (numbering skips 521->527, 528->533).
515 void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
516 ColumnFamilyHandle
* column_family
, const Slice
& key
) {
517 if (!UpdateExistingEntry(column_family
, key
)) {
518 uint32_t cf_id
= GetColumnFamilyID(column_family
);
519 const auto* cf_cmp
= GetColumnFamilyUserComparator(column_family
);
// Remember the cf's comparator so index ordering matches the cf.
520 if (cf_cmp
!= nullptr) {
521 comparator
.SetComparatorForCF(cf_id
, cf_cmp
);
// Overload for the default column family (id 0).
527 void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice
& key
) {
528 if (!UpdateExistingEntryWithCfId(0, key
)) {
// AddNewEntry: parse the key out of the record most recently appended to
// write_batch, arena-allocate a WriteBatchIndexEntry referencing it, and
// insert that entry into the skip list.
// NOTE(review): mangled extraction — the `Slice key;` declaration, the
// success assert, and the `index_entry` pointer produced by the placement
// new are among the missing lines (numbering skips 536->539, 544->546).
533 void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id
) {
534 const std::string
& wb_data
= write_batch
.Data();
// View of the batch from the start of the last appended record onward.
535 Slice entry_ptr
= Slice(wb_data
.data() + last_entry_offset
,
536 wb_data
.size() - last_entry_offset
);
539 bool success
__attribute__((__unused__
));
// Non-default column families carry a cf-id prefix in the record.
541 ReadKeyFromWriteBatchEntry(&entry_ptr
, &key
, column_family_id
!= 0);
// Placement-new the index entry in the arena; the skip list does not own it.
544 auto* mem
= arena
.Allocate(sizeof(WriteBatchIndexEntry
));
546 new (mem
) WriteBatchIndexEntry(last_entry_offset
, column_family_id
,
547 key
.data() - wb_data
.data(), key
.size());
548 skip_list
.Insert(index_entry
);
// Clear(): drop all buffered updates and the index.
// NOTE(review): mangled extraction — Clear()'s body is missing entirely
// (numbering jumps 551->556).
551 void WriteBatchWithIndex::Rep::Clear() {
// ClearIndex(): destroy and placement-new the skip list (and arena) in place,
// then reset the bookkeeping offsets. The arena destructor call appears to be
// among the missing lines (numbering skips 557->559).
556 void WriteBatchWithIndex::Rep::ClearIndex() {
557 skip_list
.~WriteBatchEntrySkipList();
559 new (&arena
) Arena();
560 new (&skip_list
) WriteBatchEntrySkipList(comparator
, &arena
);
561 last_entry_offset
= 0;
562 last_sub_batch_offset
= 0;
// ReBuildIndex: discard the current index and reconstruct it by re-parsing
// every record in write_batch (used after RollbackToSavePoint). Returns
// Corruption on an unknown tag or a record-count mismatch.
// NOTE(review): mangled extraction — the declarations of `s`, `tag`, and
// `found`, the switch statement itself, and several case bodies are among the
// missing lines (numbering skips 592->598 and within the case list).
566 Status
WriteBatchWithIndex::Rep::ReBuildIndex() {
571 if (write_batch
.Count() == 0) {
572 // Nothing to re-index
// Skip the fixed-size batch header.
576 size_t offset
= WriteBatchInternal::GetFirstOffset(&write_batch
);
578 Slice
input(write_batch
.Data());
579 input
.remove_prefix(offset
);
581 // Loop through all entries in Rep and add each one to the index
583 while (s
.ok() && !input
.empty()) {
584 Slice key
, value
, blob
, xid
;
585 uint32_t column_family_id
= 0; // default
588 // set offset of current entry for call to AddNewEntry()
589 last_entry_offset
= input
.data() - write_batch
.Data().data();
591 s
= ReadRecordFromWriteBatch(&input
, &tag
, &column_family_id
, &key
,
592 &value
, &blob
, &xid
);
// Indexable record types fall through to the update-or-add path below.
598 case kTypeColumnFamilyValue
:
600 case kTypeColumnFamilyDeletion
:
602 case kTypeColumnFamilySingleDeletion
:
603 case kTypeSingleDeletion
:
604 case kTypeColumnFamilyMerge
:
607 if (!UpdateExistingEntryWithCfId(column_family_id
, key
)) {
608 AddNewEntry(column_family_id
);
// Transaction markers are not indexed.
612 case kTypeBeginPrepareXID
:
613 case kTypeBeginPersistedPrepareXID
:
614 case kTypeBeginUnprepareXID
:
615 case kTypeEndPrepareXID
:
617 case kTypeRollbackXID
:
621 return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
622 ToString(static_cast<unsigned int>(tag
)));
// Sanity check: number of records parsed must match the batch's count.
626 if (s
.ok() && found
!= write_batch
.Count()) {
627 s
= Status::Corruption("WriteBatch has wrong count");
// Constructor: forwards all tuning knobs to Rep.
// NOTE(review): mangled extraction — the tail of the Rep initializer (the
// overwrite_key argument and `{}` body) and the trailing `default;` of the
// move-assignment are among the missing lines (numbering skips 636->639).
633 WriteBatchWithIndex::WriteBatchWithIndex(
634 const Comparator
* default_index_comparator
, size_t reserved_bytes
,
635 bool overwrite_key
, size_t max_bytes
)
636 : rep(new Rep(default_index_comparator
, reserved_bytes
, max_bytes
,
// Out-of-line destructor so Rep (an incomplete type in the header) can be
// destroyed here.
639 WriteBatchWithIndex::~WriteBatchWithIndex() {}
641 WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex
&&) = default;
643 WriteBatchWithIndex
& WriteBatchWithIndex::operator=(WriteBatchWithIndex
&&) =
646 WriteBatch
* WriteBatchWithIndex::GetWriteBatch() { return &rep
->write_batch
; }
648 size_t WriteBatchWithIndex::SubBatchCnt() { return rep
->sub_batch_cnt
; }
650 WBWIIterator
* WriteBatchWithIndex::NewIterator() {
651 return new WBWIIteratorImpl(0, &(rep
->skip_list
), &rep
->write_batch
);
654 WBWIIterator
* WriteBatchWithIndex::NewIterator(
655 ColumnFamilyHandle
* column_family
) {
656 return new WBWIIteratorImpl(GetColumnFamilyID(column_family
),
657 &(rep
->skip_list
), &rep
->write_batch
);
// NewIteratorWithBase: wrap a DB iterator and a fresh batch iterator in a
// BaseDeltaIterator. Requires overwrite mode (merged iteration cannot handle
// duplicate keys in the batch).
// NOTE(review): mangled extraction — the !overwrite_key error branches and
// the read_options argument tail are among the missing lines (numbering
// skips 663->667, 673->677).
660 Iterator
* WriteBatchWithIndex::NewIteratorWithBase(
661 ColumnFamilyHandle
* column_family
, Iterator
* base_iterator
,
662 const ReadOptions
* read_options
) {
663 if (rep
->overwrite_key
== false) {
// Takes ownership of base_iterator; the BaseDeltaIterator owns both sides.
667 return new BaseDeltaIterator(base_iterator
, NewIterator(column_family
),
668 GetColumnFamilyUserComparator(column_family
),
// Default-column-family overload.
672 Iterator
* WriteBatchWithIndex::NewIteratorWithBase(Iterator
* base_iterator
) {
673 if (rep
->overwrite_key
== false) {
677 // default column family's comparator
678 return new BaseDeltaIterator(base_iterator
, NewIterator(),
679 rep
->comparator
.default_comparator());
// Write operations: each records the mutation in the underlying batch and,
// on success, indexes it. The shared shape is:
//   SetLastEntryOffset(); s = write_batch.<Op>(...); AddOrUpdateIndex(...).
// NOTE(review): mangled extraction — the `if (s.ok())` guards around the
// AddOrUpdateIndex calls and the `return s;` lines are among the missing
// lines in every function below (numbering skips e.g. 685->687, 687->692).
682 Status
WriteBatchWithIndex::Put(ColumnFamilyHandle
* column_family
,
683 const Slice
& key
, const Slice
& value
) {
684 rep
->SetLastEntryOffset();
685 auto s
= rep
->write_batch
.Put(column_family
, key
, value
);
687 rep
->AddOrUpdateIndex(column_family
, key
);
// Put into the default column family.
692 Status
WriteBatchWithIndex::Put(const Slice
& key
, const Slice
& value
) {
693 rep
->SetLastEntryOffset();
694 auto s
= rep
->write_batch
.Put(key
, value
);
696 rep
->AddOrUpdateIndex(key
);
// Delete from a given column family.
701 Status
WriteBatchWithIndex::Delete(ColumnFamilyHandle
* column_family
,
703 rep
->SetLastEntryOffset();
704 auto s
= rep
->write_batch
.Delete(column_family
, key
);
706 rep
->AddOrUpdateIndex(column_family
, key
);
// Delete from the default column family.
711 Status
WriteBatchWithIndex::Delete(const Slice
& key
) {
712 rep
->SetLastEntryOffset();
713 auto s
= rep
->write_batch
.Delete(key
);
715 rep
->AddOrUpdateIndex(key
);
// SingleDelete from a given column family.
720 Status
WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle
* column_family
,
722 rep
->SetLastEntryOffset();
723 auto s
= rep
->write_batch
.SingleDelete(column_family
, key
);
725 rep
->AddOrUpdateIndex(column_family
, key
);
// SingleDelete from the default column family.
730 Status
WriteBatchWithIndex::SingleDelete(const Slice
& key
) {
731 rep
->SetLastEntryOffset();
732 auto s
= rep
->write_batch
.SingleDelete(key
);
734 rep
->AddOrUpdateIndex(key
);
// Merge into a given column family.
739 Status
WriteBatchWithIndex::Merge(ColumnFamilyHandle
* column_family
,
740 const Slice
& key
, const Slice
& value
) {
741 rep
->SetLastEntryOffset();
742 auto s
= rep
->write_batch
.Merge(column_family
, key
, value
);
744 rep
->AddOrUpdateIndex(column_family
, key
);
// Merge into the default column family.
749 Status
WriteBatchWithIndex::Merge(const Slice
& key
, const Slice
& value
) {
750 rep
->SetLastEntryOffset();
751 auto s
= rep
->write_batch
.Merge(key
, value
);
753 rep
->AddOrUpdateIndex(key
);
758 Status
WriteBatchWithIndex::PutLogData(const Slice
& blob
) {
759 return rep
->write_batch
.PutLogData(blob
);
762 void WriteBatchWithIndex::Clear() { rep
->Clear(); }
// GetFromBatch: point lookup served from the batch only (no DB access).
// Maps the internal Result enum onto a Status: kFound/kError use the status
// produced by the lookup, kDeleted/kNotFound -> NotFound, and unresolved
// merges -> MergeInProgress.
// NOTE(review): mangled extraction — the `Status s;` declaration, the
// `switch (result)` line, the `break;`s, and the final `return s;` are among
// the missing lines (numbering skips 766->768, 774->777).
764 Status
WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle
* column_family
,
765 const DBOptions
& options
,
766 const Slice
& key
, std::string
* value
) {
768 MergeContext merge_context
;
769 const ImmutableDBOptions
immuable_db_options(options
);
771 WriteBatchWithIndexInternal::Result result
=
772 WriteBatchWithIndexInternal::GetFromBatch(
773 immuable_db_options
, this, column_family
, key
, &merge_context
,
774 &rep
->comparator
, value
, rep
->overwrite_key
, &s
);
777 case WriteBatchWithIndexInternal::Result::kFound
:
778 case WriteBatchWithIndexInternal::Result::kError
:
779 // use returned status
781 case WriteBatchWithIndexInternal::Result::kDeleted
:
782 case WriteBatchWithIndexInternal::Result::kNotFound
:
783 s
= Status::NotFound();
785 case WriteBatchWithIndexInternal::Result::kMergeInProgress
:
786 s
= Status::MergeInProgress();
// Convenience overloads of GetFromBatchAndDB. The std::string* variants wrap
// the output in a PinnableSlice and copy back only when the result ended up
// pinned; the others default the column family and/or the ReadCallback.
// NOTE(review): mangled extraction — argument tails (e.g. `&pinnable_val`,
// the trailing `nullptr` callback) and `return s;` lines are among the
// missing lines in each wrapper (numbering skips 796->798, 802->804).
795 Status
WriteBatchWithIndex::GetFromBatchAndDB(DB
* db
,
796 const ReadOptions
& read_options
,
798 std::string
* value
) {
799 assert(value
!= nullptr);
// Adapter: reuse the PinnableSlice overload for a std::string output.
800 PinnableSlice
pinnable_val(value
);
801 assert(!pinnable_val
.IsPinned());
802 auto s
= GetFromBatchAndDB(db
, read_options
, db
->DefaultColumnFamily(), key
,
804 if (s
.ok() && pinnable_val
.IsPinned()) {
805 value
->assign(pinnable_val
.data(), pinnable_val
.size());
806 } // else value is already assigned
// Default-column-family, PinnableSlice output.
810 Status
WriteBatchWithIndex::GetFromBatchAndDB(DB
* db
,
811 const ReadOptions
& read_options
,
813 PinnableSlice
* pinnable_val
) {
814 return GetFromBatchAndDB(db
, read_options
, db
->DefaultColumnFamily(), key
,
// Explicit column family, std::string output.
818 Status
WriteBatchWithIndex::GetFromBatchAndDB(DB
* db
,
819 const ReadOptions
& read_options
,
820 ColumnFamilyHandle
* column_family
,
822 std::string
* value
) {
823 assert(value
!= nullptr);
824 PinnableSlice
pinnable_val(value
);
825 assert(!pinnable_val
.IsPinned());
827 GetFromBatchAndDB(db
, read_options
, column_family
, key
, &pinnable_val
);
828 if (s
.ok() && pinnable_val
.IsPinned()) {
829 value
->assign(pinnable_val
.data(), pinnable_val
.size());
830 } // else value is already assigned
// Explicit column family, PinnableSlice output, no callback.
834 Status
WriteBatchWithIndex::GetFromBatchAndDB(DB
* db
,
835 const ReadOptions
& read_options
,
836 ColumnFamilyHandle
* column_family
,
838 PinnableSlice
* pinnable_val
) {
839 return GetFromBatchAndDB(db
, read_options
, column_family
, key
, pinnable_val
,
// GetFromBatchAndDB (full overload): resolve `key` first against the batch,
// then against the DB, merging batch merge operands with the DB value via
// MergeHelper::TimedFullMerge when needed. `callback` (may be null) is
// forwarded to DBImpl::GetImpl for snapshot-visibility filtering.
// NOTE(review): mangled extraction — the `Status s;` declaration, several
// braces/returns, the `merge_data` declaration, and the callback==nullptr
// branch structure around line 883 are among the missing lines.
843 Status
WriteBatchWithIndex::GetFromBatchAndDB(
844 DB
* db
, const ReadOptions
& read_options
, ColumnFamilyHandle
* column_family
,
845 const Slice
& key
, PinnableSlice
* pinnable_val
, ReadCallback
* callback
) {
847 MergeContext merge_context
;
848 const ImmutableDBOptions
& immuable_db_options
=
849 static_cast_with_check
<DBImpl
>(db
->GetRootDB())->immutable_db_options();
851 // Since the lifetime of the WriteBatch is the same as that of the transaction
852 // we cannot pin it as otherwise the returned value will not be available
853 // after the transaction finishes.
854 std::string
& batch_value
= *pinnable_val
->GetSelf();
// First consult the batch itself.
855 WriteBatchWithIndexInternal::Result result
=
856 WriteBatchWithIndexInternal::GetFromBatch(
857 immuable_db_options
, this, column_family
, key
, &merge_context
,
858 &rep
->comparator
, &batch_value
, rep
->overwrite_key
, &s
);
// Batch fully answered the lookup: pin the copied value and return.
860 if (result
== WriteBatchWithIndexInternal::Result::kFound
) {
861 pinnable_val
->PinSelf();
864 if (result
== WriteBatchWithIndexInternal::Result::kDeleted
) {
865 return Status::NotFound();
867 if (result
== WriteBatchWithIndexInternal::Result::kError
) {
870 if (result
== WriteBatchWithIndexInternal::Result::kMergeInProgress
&&
871 rep
->overwrite_key
== true) {
872 // Since we've overwritten keys, we do not know what other operations are
873 // in this batch for this key, so we cannot do a Merge to compute the
874 // result. Instead, we will simply return MergeInProgress.
875 return Status::MergeInProgress();
878 assert(result
== WriteBatchWithIndexInternal::Result::kMergeInProgress
||
879 result
== WriteBatchWithIndexInternal::Result::kNotFound
);
881 // Did not find key in batch OR could not resolve Merges. Try DB.
// Plain Get when there is no callback; GetImpl with the callback otherwise.
883 s
= db
->Get(read_options
, column_family
, key
, pinnable_val
);
885 DBImpl::GetImplOptions get_impl_options
;
886 get_impl_options
.column_family
= column_family
;
887 get_impl_options
.value
= pinnable_val
;
888 get_impl_options
.callback
= callback
;
889 s
= static_cast_with_check
<DBImpl
>(db
->GetRootDB())
890 ->GetImpl(read_options
, key
, get_impl_options
);
893 if (s
.ok() || s
.IsNotFound()) { // DB Get Succeeded
894 if (result
== WriteBatchWithIndexInternal::Result::kMergeInProgress
) {
895 // Merge result from DB with merges in Batch
896 auto cfh
= static_cast_with_check
<ColumnFamilyHandleImpl
>(column_family
);
897 const MergeOperator
* merge_operator
=
898 cfh
->cfd()->ioptions()->merge_operator
;
899 Statistics
* statistics
= immuable_db_options
.statistics
.get();
900 Env
* env
= immuable_db_options
.env
;
901 Logger
* logger
= immuable_db_options
.info_log
.get();
// merge_data: DB value if the Get succeeded, nullptr if key absent.
905 merge_data
= pinnable_val
;
906 } else { // Key not present in db (s.IsNotFound())
907 merge_data
= nullptr;
910 if (merge_operator
) {
911 std::string merge_result
;
912 s
= MergeHelper::TimedFullMerge(merge_operator
, key
, merge_data
,
913 merge_context
.GetOperands(),
914 &merge_result
, logger
, statistics
, env
);
915 pinnable_val
->Reset();
916 *pinnable_val
->GetSelf() = std::move(merge_result
);
917 pinnable_val
->PinSelf();
// Merge operands exist but no operator configured: surface the error.
919 s
= Status::InvalidArgument("Options::merge_operator must be set");
927 void WriteBatchWithIndex::MultiGetFromBatchAndDB(
928 DB
* db
, const ReadOptions
& read_options
, ColumnFamilyHandle
* column_family
,
929 const size_t num_keys
, const Slice
* keys
, PinnableSlice
* values
,
930 Status
* statuses
, bool sorted_input
) {
931 MultiGetFromBatchAndDB(db
, read_options
, column_family
, num_keys
, keys
,
932 values
, statuses
, sorted_input
, nullptr);
// MultiGetFromBatchAndDB (full overload): batched analogue of
// GetFromBatchAndDB. Phase 1 answers what it can from the batch per key;
// unresolved keys are collected into key_context (with their pending merge
// state in `merges`) and sent to DBImpl::MultiGetWithCallback; phase 2
// combines each DB result with the key's batch merge operands.
// NOTE(review): mangled extraction — the `merges` autovector's name, several
// `continue;`s, the sorted_keys argument to MultiGetWithCallback, and the
// `merge_data` declarations are among the missing lines.
935 void WriteBatchWithIndex::MultiGetFromBatchAndDB(
936 DB
* db
, const ReadOptions
& read_options
, ColumnFamilyHandle
* column_family
,
937 const size_t num_keys
, const Slice
* keys
, PinnableSlice
* values
,
938 Status
* statuses
, bool sorted_input
, ReadCallback
* callback
) {
939 const ImmutableDBOptions
& immuable_db_options
=
940 static_cast_with_check
<DBImpl
>(db
->GetRootDB())->immutable_db_options();
942 autovector
<KeyContext
, MultiGetContext::MAX_BATCH_SIZE
> key_context
;
943 autovector
<KeyContext
*, MultiGetContext::MAX_BATCH_SIZE
> sorted_keys
;
944 // To hold merges from the write batch
945 autovector
<std::pair
<WriteBatchWithIndexInternal::Result
, MergeContext
>,
946 MultiGetContext::MAX_BATCH_SIZE
>
948 // Since the lifetime of the WriteBatch is the same as that of the transaction
949 // we cannot pin it as otherwise the returned value will not be available
950 // after the transaction finishes.
// Phase 1: per-key lookup in the batch.
951 for (size_t i
= 0; i
< num_keys
; ++i
) {
952 MergeContext merge_context
;
953 PinnableSlice
* pinnable_val
= &values
[i
];
954 std::string
& batch_value
= *pinnable_val
->GetSelf();
955 Status
* s
= &statuses
[i
];
956 WriteBatchWithIndexInternal::Result result
=
957 WriteBatchWithIndexInternal::GetFromBatch(
958 immuable_db_options
, this, column_family
, keys
[i
], &merge_context
,
959 &rep
->comparator
, &batch_value
, rep
->overwrite_key
, s
);
// Keys the batch fully answers never reach the DB.
961 if (result
== WriteBatchWithIndexInternal::Result::kFound
) {
962 pinnable_val
->PinSelf();
965 if (result
== WriteBatchWithIndexInternal::Result::kDeleted
) {
966 *s
= Status::NotFound();
969 if (result
== WriteBatchWithIndexInternal::Result::kError
) {
972 if (result
== WriteBatchWithIndexInternal::Result::kMergeInProgress
&&
973 rep
->overwrite_key
== true) {
974 // Since we've overwritten keys, we do not know what other operations are
975 // in this batch for this key, so we cannot do a Merge to compute the
976 // result. Instead, we will simply return MergeInProgress.
977 *s
= Status::MergeInProgress();
981 assert(result
== WriteBatchWithIndexInternal::Result::kMergeInProgress
||
982 result
== WriteBatchWithIndexInternal::Result::kNotFound
);
// Defer this key to the DB, remembering its pending merge state.
983 key_context
.emplace_back(column_family
, keys
[i
], &values
[i
],
984 /*timestamp*/ nullptr, &statuses
[i
]);
985 merges
.emplace_back(result
, std::move(merge_context
));
988 for (KeyContext
& key
: key_context
) {
989 sorted_keys
.emplace_back(&key
);
992 // Did not find key in batch OR could not resolve Merges. Try DB.
993 static_cast_with_check
<DBImpl
>(db
->GetRootDB())
994 ->PrepareMultiGetKeys(key_context
.size(), sorted_input
, &sorted_keys
);
995 static_cast_with_check
<DBImpl
>(db
->GetRootDB())
996 ->MultiGetWithCallback(read_options
, column_family
, callback
,
// Phase 2: fold batch merge operands into each successful DB result.
999 ColumnFamilyHandleImpl
* cfh
=
1000 static_cast_with_check
<ColumnFamilyHandleImpl
>(column_family
);
1001 const MergeOperator
* merge_operator
= cfh
->cfd()->ioptions()->merge_operator
;
1002 for (auto iter
= key_context
.begin(); iter
!= key_context
.end(); ++iter
) {
1003 KeyContext
& key
= *iter
;
1004 if (key
.s
->ok() || key
.s
->IsNotFound()) { // DB Get Succeeded
// `merges` is parallel to key_context, so index by position.
1005 size_t index
= iter
- key_context
.begin();
1006 std::pair
<WriteBatchWithIndexInternal::Result
, MergeContext
>&
1007 merge_result
= merges
[index
];
1008 if (merge_result
.first
==
1009 WriteBatchWithIndexInternal::Result::kMergeInProgress
) {
1010 // Merge result from DB with merges in Batch
1011 Statistics
* statistics
= immuable_db_options
.statistics
.get();
1012 Env
* env
= immuable_db_options
.env
;
1013 Logger
* logger
= immuable_db_options
.info_log
.get();
// merge_data: DB value if present, nullptr if the key was absent.
1017 merge_data
= iter
->value
;
1018 } else { // Key not present in db (s.IsNotFound())
1019 merge_data
= nullptr;
1022 if (merge_operator
) {
1023 *key
.s
= MergeHelper::TimedFullMerge(
1024 merge_operator
, *key
.key
, merge_data
,
1025 merge_result
.second
.GetOperands(), key
.value
->GetSelf(), logger
,
1027 key
.value
->PinSelf();
// Merge operands exist but no operator configured: surface the error.
1030 Status::InvalidArgument("Options::merge_operator must be set");
1037 void WriteBatchWithIndex::SetSavePoint() { rep
->write_batch
.SetSavePoint(); }
// RollbackToSavePoint: roll the batch back, then reset sub-batch counters and
// rebuild the entire index from the (now truncated) batch contents.
// NOTE(review): mangled extraction — the `if (s.ok())` guard around the
// rebuild and the final `return s;` are among the missing lines (numbering
// skips 1040->1043, 1045->1051).
1039 Status
WriteBatchWithIndex::RollbackToSavePoint() {
1040 Status s
= rep
->write_batch
.RollbackToSavePoint();
// Reset sub-batch tracking before re-indexing.
1043 rep
->sub_batch_cnt
= 1;
1044 rep
->last_sub_batch_offset
= 0;
1045 s
= rep
->ReBuildIndex();
1051 Status
WriteBatchWithIndex::PopSavePoint() {
1052 return rep
->write_batch
.PopSavePoint();
1055 void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes
) {
1056 rep
->write_batch
.SetMaxBytes(max_bytes
);
1059 size_t WriteBatchWithIndex::GetDataSize() const {
1060 return rep
->write_batch
.GetDataSize();
1063 } // namespace ROCKSDB_NAMESPACE
1064 #endif // !ROCKSDB_LITE