// Provenance: ceph/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
// (proxmox mirror of ceph.git, "bump version to 12.2.12-pve1", blame commit 7c673cae)
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under the BSD-style license found in the
3// LICENSE file in the root directory of this source tree. An additional grant
4// of patent rights can be found in the PATENTS file in the same directory.
5
6#ifndef ROCKSDB_LITE
7
8#include "rocksdb/utilities/write_batch_with_index.h"
9
10#include <limits>
11#include <memory>
12
13#include "db/column_family.h"
14#include "db/db_impl.h"
15#include "db/merge_context.h"
16#include "db/merge_helper.h"
17#include "memtable/skiplist.h"
18#include "options/db_options.h"
19#include "rocksdb/comparator.h"
20#include "rocksdb/iterator.h"
21#include "util/arena.h"
22#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
23
24namespace rocksdb {
25
26// when direction == forward
27// * current_at_base_ <=> base_iterator > delta_iterator
28// when direction == backwards
29// * current_at_base_ <=> base_iterator < delta_iterator
30// always:
31// * equal_keys_ <=> base_iterator == delta_iterator
32class BaseDeltaIterator : public Iterator {
33 public:
34 BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
35 const Comparator* comparator)
36 : forward_(true),
37 current_at_base_(true),
38 equal_keys_(false),
39 status_(Status::OK()),
40 base_iterator_(base_iterator),
41 delta_iterator_(delta_iterator),
42 comparator_(comparator) {}
43
44 virtual ~BaseDeltaIterator() {}
45
46 bool Valid() const override {
47 return current_at_base_ ? BaseValid() : DeltaValid();
48 }
49
50 void SeekToFirst() override {
51 forward_ = true;
52 base_iterator_->SeekToFirst();
53 delta_iterator_->SeekToFirst();
54 UpdateCurrent();
55 }
56
57 void SeekToLast() override {
58 forward_ = false;
59 base_iterator_->SeekToLast();
60 delta_iterator_->SeekToLast();
61 UpdateCurrent();
62 }
63
64 void Seek(const Slice& k) override {
65 forward_ = true;
66 base_iterator_->Seek(k);
67 delta_iterator_->Seek(k);
68 UpdateCurrent();
69 }
70
71 void SeekForPrev(const Slice& k) override {
72 forward_ = false;
73 base_iterator_->SeekForPrev(k);
74 delta_iterator_->SeekForPrev(k);
75 UpdateCurrent();
76 }
77
78 void Next() override {
79 if (!Valid()) {
80 status_ = Status::NotSupported("Next() on invalid iterator");
81 }
82
83 if (!forward_) {
84 // Need to change direction
85 // if our direction was backward and we're not equal, we have two states:
86 // * both iterators are valid: we're already in a good state (current
87 // shows to smaller)
88 // * only one iterator is valid: we need to advance that iterator
89 forward_ = true;
90 equal_keys_ = false;
91 if (!BaseValid()) {
92 assert(DeltaValid());
93 base_iterator_->SeekToFirst();
94 } else if (!DeltaValid()) {
95 delta_iterator_->SeekToFirst();
96 } else if (current_at_base_) {
97 // Change delta from larger than base to smaller
98 AdvanceDelta();
99 } else {
100 // Change base from larger than delta to smaller
101 AdvanceBase();
102 }
103 if (DeltaValid() && BaseValid()) {
104 if (comparator_->Equal(delta_iterator_->Entry().key,
105 base_iterator_->key())) {
106 equal_keys_ = true;
107 }
108 }
109 }
110 Advance();
111 }
112
113 void Prev() override {
114 if (!Valid()) {
115 status_ = Status::NotSupported("Prev() on invalid iterator");
116 }
117
118 if (forward_) {
119 // Need to change direction
120 // if our direction was backward and we're not equal, we have two states:
121 // * both iterators are valid: we're already in a good state (current
122 // shows to smaller)
123 // * only one iterator is valid: we need to advance that iterator
124 forward_ = false;
125 equal_keys_ = false;
126 if (!BaseValid()) {
127 assert(DeltaValid());
128 base_iterator_->SeekToLast();
129 } else if (!DeltaValid()) {
130 delta_iterator_->SeekToLast();
131 } else if (current_at_base_) {
132 // Change delta from less advanced than base to more advanced
133 AdvanceDelta();
134 } else {
135 // Change base from less advanced than delta to more advanced
136 AdvanceBase();
137 }
138 if (DeltaValid() && BaseValid()) {
139 if (comparator_->Equal(delta_iterator_->Entry().key,
140 base_iterator_->key())) {
141 equal_keys_ = true;
142 }
143 }
144 }
145
146 Advance();
147 }
148
149 Slice key() const override {
150 return current_at_base_ ? base_iterator_->key()
151 : delta_iterator_->Entry().key;
152 }
153
154 Slice value() const override {
155 return current_at_base_ ? base_iterator_->value()
156 : delta_iterator_->Entry().value;
157 }
158
159 Status status() const override {
160 if (!status_.ok()) {
161 return status_;
162 }
163 if (!base_iterator_->status().ok()) {
164 return base_iterator_->status();
165 }
166 return delta_iterator_->status();
167 }
168
169 private:
170 void AssertInvariants() {
171#ifndef NDEBUG
172 if (!Valid()) {
173 return;
174 }
175 if (!BaseValid()) {
176 assert(!current_at_base_ && delta_iterator_->Valid());
177 return;
178 }
179 if (!DeltaValid()) {
180 assert(current_at_base_ && base_iterator_->Valid());
181 return;
182 }
183 // we don't support those yet
184 assert(delta_iterator_->Entry().type != kMergeRecord &&
185 delta_iterator_->Entry().type != kLogDataRecord);
186 int compare = comparator_->Compare(delta_iterator_->Entry().key,
187 base_iterator_->key());
188 if (forward_) {
189 // current_at_base -> compare < 0
190 assert(!current_at_base_ || compare < 0);
191 // !current_at_base -> compare <= 0
192 assert(current_at_base_ && compare >= 0);
193 } else {
194 // current_at_base -> compare > 0
195 assert(!current_at_base_ || compare > 0);
196 // !current_at_base -> compare <= 0
197 assert(current_at_base_ && compare <= 0);
198 }
199 // equal_keys_ <=> compare == 0
200 assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
201#endif
202 }
203
204 void Advance() {
205 if (equal_keys_) {
206 assert(BaseValid() && DeltaValid());
207 AdvanceBase();
208 AdvanceDelta();
209 } else {
210 if (current_at_base_) {
211 assert(BaseValid());
212 AdvanceBase();
213 } else {
214 assert(DeltaValid());
215 AdvanceDelta();
216 }
217 }
218 UpdateCurrent();
219 }
220
221 void AdvanceDelta() {
222 if (forward_) {
223 delta_iterator_->Next();
224 } else {
225 delta_iterator_->Prev();
226 }
227 }
228 void AdvanceBase() {
229 if (forward_) {
230 base_iterator_->Next();
231 } else {
232 base_iterator_->Prev();
233 }
234 }
235 bool BaseValid() const { return base_iterator_->Valid(); }
236 bool DeltaValid() const { return delta_iterator_->Valid(); }
237 void UpdateCurrent() {
238// Suppress false positive clang analyzer warnings.
239#ifndef __clang_analyzer__
240 while (true) {
241 WriteEntry delta_entry;
242 if (DeltaValid()) {
243 delta_entry = delta_iterator_->Entry();
244 }
245 equal_keys_ = false;
246 if (!BaseValid()) {
247 // Base has finished.
248 if (!DeltaValid()) {
249 // Finished
250 return;
251 }
252 if (delta_entry.type == kDeleteRecord ||
253 delta_entry.type == kSingleDeleteRecord) {
254 AdvanceDelta();
255 } else {
256 current_at_base_ = false;
257 return;
258 }
259 } else if (!DeltaValid()) {
260 // Delta has finished.
261 current_at_base_ = true;
262 return;
263 } else {
264 int compare =
265 (forward_ ? 1 : -1) *
266 comparator_->Compare(delta_entry.key, base_iterator_->key());
267 if (compare <= 0) { // delta bigger or equal
268 if (compare == 0) {
269 equal_keys_ = true;
270 }
271 if (delta_entry.type != kDeleteRecord &&
272 delta_entry.type != kSingleDeleteRecord) {
273 current_at_base_ = false;
274 return;
275 }
276 // Delta is less advanced and is delete.
277 AdvanceDelta();
278 if (equal_keys_) {
279 AdvanceBase();
280 }
281 } else {
282 current_at_base_ = true;
283 return;
284 }
285 }
286 }
287
288 AssertInvariants();
289#endif // __clang_analyzer__
290 }
291
292 bool forward_;
293 bool current_at_base_;
294 bool equal_keys_;
295 Status status_;
296 std::unique_ptr<Iterator> base_iterator_;
297 std::unique_ptr<WBWIIterator> delta_iterator_;
298 const Comparator* comparator_; // not owned
299};
300
// Index over the write batch: a skip list of entry descriptors, ordered by
// (column family id, user key) via WriteBatchEntryComparator.
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
    WriteBatchEntrySkipList;
303
// Iterator over the skip-list index, restricted to one column family.
// Entries for all column families share a single skip list, ordered first by
// column family id and then by key, so a per-CF view is one contiguous run.
class WBWIIteratorImpl : public WBWIIterator {
 public:
  // `skip_list` and `write_batch` are borrowed and must outlive this
  // iterator.
  WBWIIteratorImpl(uint32_t column_family_id,
                   WriteBatchEntrySkipList* skip_list,
                   const ReadableWriteBatch* write_batch)
      : column_family_id_(column_family_id),
        skip_list_iter_(skip_list),
        write_batch_(write_batch) {}

  virtual ~WBWIIteratorImpl() {}

  // Valid only while positioned on an entry of our column family; stepping
  // into a neighboring column family's run reads as invalid.
  virtual bool Valid() const override {
    if (!skip_list_iter_.Valid()) {
      return false;
    }
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    return (iter_entry != nullptr &&
            iter_entry->column_family == column_family_id_);
  }

  virtual void SeekToFirst() override {
    // kFlagMin sorts before every real key of this column family.
    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
                                      column_family_id_, 0, 0);
    skip_list_iter_.Seek(&search_entry);
  }

  virtual void SeekToLast() override {
    // Seek to the smallest possible entry of the NEXT column family, then
    // step back one position to land on our last entry.  If no later column
    // family exists, the seek falls off the end and SeekToLast() recovers.
    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
                                      column_family_id_ + 1, 0, 0);
    skip_list_iter_.Seek(&search_entry);
    if (!skip_list_iter_.Valid()) {
      skip_list_iter_.SeekToLast();
    } else {
      skip_list_iter_.Prev();
    }
  }

  virtual void Seek(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_);
    skip_list_iter_.Seek(&search_entry);
  }

  virtual void SeekForPrev(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_);
    skip_list_iter_.SeekForPrev(&search_entry);
  }

  virtual void Next() override { skip_list_iter_.Next(); }

  virtual void Prev() override { skip_list_iter_.Prev(); }

  // Decode the record this index entry points at, straight from the write
  // batch's serialized representation.
  virtual WriteEntry Entry() const override {
    WriteEntry ret;
    Slice blob, xid;
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    // this is guaranteed with Valid()
    assert(iter_entry != nullptr &&
           iter_entry->column_family == column_family_id_);
    auto s = write_batch_->GetEntryFromDataOffset(
        iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
    assert(s.ok());
    assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
           ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
           ret.type == kMergeRecord);
    return ret;
  }

  virtual Status status() const override {
    // this is in-memory data structure, so the only way status can be non-ok is
    // through memory corruption
    return Status::OK();
  }

  // Non-owning pointer into the skip list; used by Rep to redirect an
  // existing entry's offset in overwrite mode.
  const WriteBatchIndexEntry* GetRawEntry() const {
    return skip_list_iter_.key();
  }

 private:
  uint32_t column_family_id_;
  WriteBatchEntrySkipList::Iterator skip_list_iter_;
  const ReadableWriteBatch* write_batch_;
};
386
// Shared state behind WriteBatchWithIndex: the serialized batch plus the
// skip-list index over it.  Member declaration order matters: `comparator`
// references `write_batch`, and `skip_list` references both `comparator`
// and `arena`, so they must be declared (and thus constructed) after them.
struct WriteBatchWithIndex::Rep {
  Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
      size_t max_bytes = 0, bool _overwrite_key = false)
      : write_batch(reserved_bytes, max_bytes),
        comparator(index_comparator, &write_batch),
        skip_list(comparator, &arena),
        overwrite_key(_overwrite_key),
        last_entry_offset(0) {}
  ReadableWriteBatch write_batch;
  WriteBatchEntryComparator comparator;
  Arena arena;
  WriteBatchEntrySkipList skip_list;
  // When true, the index keeps at most one entry per key (later writes
  // redirect the existing entry instead of inserting a new one).
  bool overwrite_key;
  // Byte offset in write_batch where the most recent record starts.
  size_t last_entry_offset;

  // Remember current offset of internal write batch, which is used as
  // the starting offset of the next record.
  void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }

  // In overwrite mode, find the existing entry for the same key and update it
  // to point to the current entry.
  // Return true if the key is found and updated.
  bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
  bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);

  // Add the recent entry to the update.
  // In overwrite mode, if key already exists in the index, update it.
  void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
  void AddOrUpdateIndex(const Slice& key);

  // Allocate an index entry pointing to the last entry in the write batch and
  // put it to skip list.
  void AddNewEntry(uint32_t column_family_id);

  // Clear all updates buffered in this batch.
  void Clear();
  void ClearIndex();

  // Rebuild index by reading all records from the batch.
  // Returns non-ok status on corruption.
  Status ReBuildIndex();
};
429
430bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
431 ColumnFamilyHandle* column_family, const Slice& key) {
432 uint32_t cf_id = GetColumnFamilyID(column_family);
433 return UpdateExistingEntryWithCfId(cf_id, key);
434}
435
436bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
437 uint32_t column_family_id, const Slice& key) {
438 if (!overwrite_key) {
439 return false;
440 }
441
442 WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
443 iter.Seek(key);
444 if (!iter.Valid()) {
445 return false;
446 }
447 if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
448 return false;
449 }
450 WriteBatchIndexEntry* non_const_entry =
451 const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
452 non_const_entry->offset = last_entry_offset;
453 return true;
454}
455
456void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
457 ColumnFamilyHandle* column_family, const Slice& key) {
458 if (!UpdateExistingEntry(column_family, key)) {
459 uint32_t cf_id = GetColumnFamilyID(column_family);
460 const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
461 if (cf_cmp != nullptr) {
462 comparator.SetComparatorForCF(cf_id, cf_cmp);
463 }
464 AddNewEntry(cf_id);
465 }
466}
467
468void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
469 if (!UpdateExistingEntryWithCfId(0, key)) {
470 AddNewEntry(0);
471 }
472}
473
// Build an index entry for the record that was just appended to write_batch
// (starting at last_entry_offset) and insert it into the skip list.
void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
  const std::string& wb_data = write_batch.Data();
  // View of the serialized record, from its start to the end of the batch.
  Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
                          wb_data.size() - last_entry_offset);
  // Extract key
  Slice key;
  bool success __attribute__((__unused__)) =
      ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
  assert(success);

  // The index entry stores the key as (offset, size) relative to the batch's
  // own buffer, so it stays valid even if the underlying string reallocates.
  auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
  auto* index_entry =
      new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
                                     key.data() - wb_data.data(), key.size());
  skip_list.Insert(index_entry);
}
490
// Drop all buffered updates, then reset the index that pointed into them.
void WriteBatchWithIndex::Rep::Clear() {
  write_batch.Clear();
  ClearIndex();
}
495
// Reset the index to empty by destroying and reconstructing the skip list
// and its backing arena in place.  Order matters: skip-list nodes live in
// the arena, so the skip list is torn down first and rebuilt last.
void WriteBatchWithIndex::Rep::ClearIndex() {
  skip_list.~WriteBatchEntrySkipList();
  arena.~Arena();
  new (&arena) Arena();
  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
  last_entry_offset = 0;
}
503
// Rebuild the skip-list index from scratch by replaying every record in the
// underlying write batch.  Used after RollbackToSavePoint(), since the index
// itself has no savepoint support.  Returns non-ok on a malformed batch.
Status WriteBatchWithIndex::Rep::ReBuildIndex() {
  Status s;

  ClearIndex();

  if (write_batch.Count() == 0) {
    // Nothing to re-index
    return s;
  }

  // Skip the fixed-size batch header to reach the first record.
  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

  Slice input(write_batch.Data());
  input.remove_prefix(offset);

  // Loop through all entries in Rep and add each one to the index
  int found = 0;
  while (s.ok() && !input.empty()) {
    Slice key, value, blob, xid;
    uint32_t column_family_id = 0;  // default
    char tag = 0;

    // set offset of current entry for call to AddNewEntry()
    last_entry_offset = input.data() - write_batch.Data().data();

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                 &value, &blob, &xid);
    if (!s.ok()) {
      break;
    }

    switch (tag) {
      // Key-bearing records are (re-)indexed, honoring overwrite mode.
      case kTypeColumnFamilyValue:
      case kTypeValue:
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        found++;
        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
          AddNewEntry(column_family_id);
        }
        break;
      // Non-keyed metadata records are counted in the batch but not indexed.
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeEndPrepareXID:
      case kTypeCommitXID:
      case kTypeRollbackXID:
      case kTypeNoop:
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }

  // Sanity-check that we saw as many keyed records as the batch claims.
  if (s.ok() && found != write_batch.Count()) {
    s = Status::Corruption("WriteBatch has wrong count");
  }

  return s;
}
567
// All state lives in Rep (pimpl); see struct Rep above for member semantics.
WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key, size_t max_bytes)
    : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
                  overwrite_key)) {}
573
// Out-of-line so `rep` can be destroyed where Rep is a complete type
// (presumably rep is a smart pointer to the forward-declared Rep in the
// header -- confirm against write_batch_with_index.h).
WriteBatchWithIndex::~WriteBatchWithIndex() {}
575
// Expose the underlying batch; the pointer remains owned by this object.
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
577
// Iterator over this batch's entries for the default column family (id 0).
// Caller owns the returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator() {
  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}
581
// Iterator over this batch's entries for the given column family.
// Caller owns the returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator(
    ColumnFamilyHandle* column_family) {
  return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
                              &(rep->skip_list), &rep->write_batch);
}
587
588Iterator* WriteBatchWithIndex::NewIteratorWithBase(
589 ColumnFamilyHandle* column_family, Iterator* base_iterator) {
590 if (rep->overwrite_key == false) {
591 assert(false);
592 return nullptr;
593 }
594 return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
595 GetColumnFamilyUserComparator(column_family));
596}
597
598Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
599 if (rep->overwrite_key == false) {
600 assert(false);
601 return nullptr;
602 }
603 // default column family's comparator
604 return new BaseDeltaIterator(base_iterator, NewIterator(),
605 rep->comparator.default_comparator());
606}
607
608Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
609 const Slice& key, const Slice& value) {
610 rep->SetLastEntryOffset();
611 auto s = rep->write_batch.Put(column_family, key, value);
612 if (s.ok()) {
613 rep->AddOrUpdateIndex(column_family, key);
614 }
615 return s;
616}
617
618Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
619 rep->SetLastEntryOffset();
620 auto s = rep->write_batch.Put(key, value);
621 if (s.ok()) {
622 rep->AddOrUpdateIndex(key);
623 }
624 return s;
625}
626
627Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
628 const Slice& key) {
629 rep->SetLastEntryOffset();
630 auto s = rep->write_batch.Delete(column_family, key);
631 if (s.ok()) {
632 rep->AddOrUpdateIndex(column_family, key);
633 }
634 return s;
635}
636
637Status WriteBatchWithIndex::Delete(const Slice& key) {
638 rep->SetLastEntryOffset();
639 auto s = rep->write_batch.Delete(key);
640 if (s.ok()) {
641 rep->AddOrUpdateIndex(key);
642 }
643 return s;
644}
645
646Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
647 const Slice& key) {
648 rep->SetLastEntryOffset();
649 auto s = rep->write_batch.SingleDelete(column_family, key);
650 if (s.ok()) {
651 rep->AddOrUpdateIndex(column_family, key);
652 }
653 return s;
654}
655
656Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
657 rep->SetLastEntryOffset();
658 auto s = rep->write_batch.SingleDelete(key);
659 if (s.ok()) {
660 rep->AddOrUpdateIndex(key);
661 }
662 return s;
663}
664
665Status WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family,
666 const Slice& begin_key,
667 const Slice& end_key) {
668 rep->SetLastEntryOffset();
669 auto s = rep->write_batch.DeleteRange(column_family, begin_key, end_key);
670 if (s.ok()) {
671 rep->AddOrUpdateIndex(column_family, begin_key);
672 }
673 return s;
674}
675
676Status WriteBatchWithIndex::DeleteRange(const Slice& begin_key,
677 const Slice& end_key) {
678 rep->SetLastEntryOffset();
679 auto s = rep->write_batch.DeleteRange(begin_key, end_key);
680 if (s.ok()) {
681 rep->AddOrUpdateIndex(begin_key);
682 }
683 return s;
684}
685
686Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
687 const Slice& key, const Slice& value) {
688 rep->SetLastEntryOffset();
689 auto s = rep->write_batch.Merge(column_family, key, value);
690 if (s.ok()) {
691 rep->AddOrUpdateIndex(column_family, key);
692 }
693 return s;
694}
695
696Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
697 rep->SetLastEntryOffset();
698 auto s = rep->write_batch.Merge(key, value);
699 if (s.ok()) {
700 rep->AddOrUpdateIndex(key);
701 }
702 return s;
703}
704
// Log-data blobs carry no key, so they go straight into the batch and are
// never indexed.
Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
  return rep->write_batch.PutLogData(blob);
}
708
// Drop all buffered updates and the index over them.
void WriteBatchWithIndex::Clear() { rep->Clear(); }
710
711Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
712 const DBOptions& options,
713 const Slice& key, std::string* value) {
714 Status s;
715 MergeContext merge_context;
716 const ImmutableDBOptions immuable_db_options(options);
717
718 WriteBatchWithIndexInternal::Result result =
719 WriteBatchWithIndexInternal::GetFromBatch(
720 immuable_db_options, this, column_family, key, &merge_context,
721 &rep->comparator, value, rep->overwrite_key, &s);
722
723 switch (result) {
724 case WriteBatchWithIndexInternal::Result::kFound:
725 case WriteBatchWithIndexInternal::Result::kError:
726 // use returned status
727 break;
728 case WriteBatchWithIndexInternal::Result::kDeleted:
729 case WriteBatchWithIndexInternal::Result::kNotFound:
730 s = Status::NotFound();
731 break;
732 case WriteBatchWithIndexInternal::Result::kMergeInProgress:
733 s = Status::MergeInProgress();
734 break;
735 default:
736 assert(false);
737 }
738
739 return s;
740}
741
// Convenience overload: batch+DB lookup against the default column family.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              std::string* value) {
  return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                           value);
}
749
// Read `key` as if the batch had already been written to `db`: consult the
// batch first, then fall back to the DB, merging pending Merge operands from
// the batch on top of the DB value when necessary.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              std::string* value) {
  Status s;
  MergeContext merge_context;
  // NOTE(review): downcast assumes `db` is a DBImpl (not a wrapper such as
  // a stackable DB) -- confirm callers' guarantees.
  const ImmutableDBOptions& immuable_db_options =
      reinterpret_cast<DBImpl*>(db)->immutable_db_options();

  std::string batch_value;
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immuable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, &batch_value, rep->overwrite_key, &s);

  // The batch alone fully answers the lookup in these three cases.
  if (result == WriteBatchWithIndexInternal::Result::kFound) {
    value->assign(batch_value.data(), batch_value.size());
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
    return Status::NotFound();
  }
  if (result == WriteBatchWithIndexInternal::Result::kError) {
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
      rep->overwrite_key == true) {
    // Since we've overwritten keys, we do not know what other operations are
    // in this batch for this key, so we cannot do a Merge to compute the
    // result. Instead, we will simply return MergeInProgress.
    return Status::MergeInProgress();
  }

  assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
         result == WriteBatchWithIndexInternal::Result::kNotFound);

  // Did not find key in batch OR could not resolve Merges. Try DB.
  s = db->Get(read_options, column_family, key, value);

  if (s.ok() || s.IsNotFound()) {  // DB Get Succeeded
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // Merge result from DB with merges in Batch
      auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
      const MergeOperator* merge_operator =
          cfh->cfd()->ioptions()->merge_operator;
      Statistics* statistics = immuable_db_options.statistics.get();
      Env* env = immuable_db_options.env;
      Logger* logger = immuable_db_options.info_log.get();

      // Base value for the merge: the DB value if present, otherwise none.
      Slice db_slice(*value);
      Slice* merge_data;
      if (s.ok()) {
        merge_data = &db_slice;
      } else {  // Key not present in db (s.IsNotFound())
        merge_data = nullptr;
      }

      if (merge_operator) {
        s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
                                        merge_context.GetOperands(), value,
                                        logger, statistics, env);
      } else {
        s = Status::InvalidArgument("Options::merge_operator must be set");
      }
    }
  }

  return s;
}
820
// Savepoints are recorded only on the underlying batch; the index is
// rebuilt from the batch on rollback (see RollbackToSavePoint).
void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
822
823Status WriteBatchWithIndex::RollbackToSavePoint() {
824 Status s = rep->write_batch.RollbackToSavePoint();
825
826 if (s.ok()) {
827 s = rep->ReBuildIndex();
828 }
829
830 return s;
831}
832
// Forward the size cap to the underlying batch; the index has no cap.
void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
  rep->write_batch.SetMaxBytes(max_bytes);
}
836
837} // namespace rocksdb
838#endif // !ROCKSDB_LITE