]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / rocksdb / utilities / write_batch_with_index / write_batch_with_index.cc
CommitLineData
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
8#include "rocksdb/utilities/write_batch_with_index.h"
9
7c673cae
FG
10#include <memory>
11
12#include "db/column_family.h"
13#include "db/db_impl.h"
14#include "db/merge_context.h"
15#include "db/merge_helper.h"
16#include "memtable/skiplist.h"
17#include "options/db_options.h"
18#include "rocksdb/comparator.h"
19#include "rocksdb/iterator.h"
20#include "util/arena.h"
11fdf7f2
TL
21#include "util/cast_util.h"
22#include "util/string_util.h"
7c673cae
FG
23#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
24
25namespace rocksdb {
26
27// when direction == forward
28// * current_at_base_ <=> base_iterator > delta_iterator
29// when direction == backwards
30// * current_at_base_ <=> base_iterator < delta_iterator
31// always:
32// * equal_keys_ <=> base_iterator == delta_iterator
33class BaseDeltaIterator : public Iterator {
34 public:
35 BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
36 const Comparator* comparator)
37 : forward_(true),
38 current_at_base_(true),
39 equal_keys_(false),
40 status_(Status::OK()),
41 base_iterator_(base_iterator),
42 delta_iterator_(delta_iterator),
43 comparator_(comparator) {}
44
494da23a 45 ~BaseDeltaIterator() override {}
7c673cae
FG
46
47 bool Valid() const override {
48 return current_at_base_ ? BaseValid() : DeltaValid();
49 }
50
51 void SeekToFirst() override {
52 forward_ = true;
53 base_iterator_->SeekToFirst();
54 delta_iterator_->SeekToFirst();
55 UpdateCurrent();
56 }
57
58 void SeekToLast() override {
59 forward_ = false;
60 base_iterator_->SeekToLast();
61 delta_iterator_->SeekToLast();
62 UpdateCurrent();
63 }
64
65 void Seek(const Slice& k) override {
66 forward_ = true;
67 base_iterator_->Seek(k);
68 delta_iterator_->Seek(k);
69 UpdateCurrent();
70 }
71
72 void SeekForPrev(const Slice& k) override {
73 forward_ = false;
74 base_iterator_->SeekForPrev(k);
75 delta_iterator_->SeekForPrev(k);
76 UpdateCurrent();
77 }
78
79 void Next() override {
80 if (!Valid()) {
81 status_ = Status::NotSupported("Next() on invalid iterator");
11fdf7f2 82 return;
7c673cae
FG
83 }
84
85 if (!forward_) {
86 // Need to change direction
87 // if our direction was backward and we're not equal, we have two states:
88 // * both iterators are valid: we're already in a good state (current
89 // shows to smaller)
90 // * only one iterator is valid: we need to advance that iterator
91 forward_ = true;
92 equal_keys_ = false;
93 if (!BaseValid()) {
94 assert(DeltaValid());
95 base_iterator_->SeekToFirst();
96 } else if (!DeltaValid()) {
97 delta_iterator_->SeekToFirst();
98 } else if (current_at_base_) {
99 // Change delta from larger than base to smaller
100 AdvanceDelta();
101 } else {
102 // Change base from larger than delta to smaller
103 AdvanceBase();
104 }
105 if (DeltaValid() && BaseValid()) {
106 if (comparator_->Equal(delta_iterator_->Entry().key,
107 base_iterator_->key())) {
108 equal_keys_ = true;
109 }
110 }
111 }
112 Advance();
113 }
114
115 void Prev() override {
116 if (!Valid()) {
117 status_ = Status::NotSupported("Prev() on invalid iterator");
11fdf7f2 118 return;
7c673cae
FG
119 }
120
121 if (forward_) {
122 // Need to change direction
123 // if our direction was backward and we're not equal, we have two states:
124 // * both iterators are valid: we're already in a good state (current
125 // shows to smaller)
126 // * only one iterator is valid: we need to advance that iterator
127 forward_ = false;
128 equal_keys_ = false;
129 if (!BaseValid()) {
130 assert(DeltaValid());
131 base_iterator_->SeekToLast();
132 } else if (!DeltaValid()) {
133 delta_iterator_->SeekToLast();
134 } else if (current_at_base_) {
135 // Change delta from less advanced than base to more advanced
136 AdvanceDelta();
137 } else {
138 // Change base from less advanced than delta to more advanced
139 AdvanceBase();
140 }
141 if (DeltaValid() && BaseValid()) {
142 if (comparator_->Equal(delta_iterator_->Entry().key,
143 base_iterator_->key())) {
144 equal_keys_ = true;
145 }
146 }
147 }
148
149 Advance();
150 }
151
152 Slice key() const override {
153 return current_at_base_ ? base_iterator_->key()
154 : delta_iterator_->Entry().key;
155 }
156
157 Slice value() const override {
158 return current_at_base_ ? base_iterator_->value()
159 : delta_iterator_->Entry().value;
160 }
161
162 Status status() const override {
163 if (!status_.ok()) {
164 return status_;
165 }
166 if (!base_iterator_->status().ok()) {
167 return base_iterator_->status();
168 }
169 return delta_iterator_->status();
170 }
171
172 private:
173 void AssertInvariants() {
174#ifndef NDEBUG
11fdf7f2
TL
175 bool not_ok = false;
176 if (!base_iterator_->status().ok()) {
177 assert(!base_iterator_->Valid());
178 not_ok = true;
179 }
180 if (!delta_iterator_->status().ok()) {
181 assert(!delta_iterator_->Valid());
182 not_ok = true;
183 }
184 if (not_ok) {
185 assert(!Valid());
186 assert(!status().ok());
187 return;
188 }
189
7c673cae
FG
190 if (!Valid()) {
191 return;
192 }
193 if (!BaseValid()) {
194 assert(!current_at_base_ && delta_iterator_->Valid());
195 return;
196 }
197 if (!DeltaValid()) {
198 assert(current_at_base_ && base_iterator_->Valid());
199 return;
200 }
201 // we don't support those yet
202 assert(delta_iterator_->Entry().type != kMergeRecord &&
203 delta_iterator_->Entry().type != kLogDataRecord);
204 int compare = comparator_->Compare(delta_iterator_->Entry().key,
205 base_iterator_->key());
206 if (forward_) {
207 // current_at_base -> compare < 0
208 assert(!current_at_base_ || compare < 0);
209 // !current_at_base -> compare <= 0
210 assert(current_at_base_ && compare >= 0);
211 } else {
212 // current_at_base -> compare > 0
213 assert(!current_at_base_ || compare > 0);
214 // !current_at_base -> compare <= 0
215 assert(current_at_base_ && compare <= 0);
216 }
217 // equal_keys_ <=> compare == 0
218 assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
219#endif
220 }
221
222 void Advance() {
223 if (equal_keys_) {
224 assert(BaseValid() && DeltaValid());
225 AdvanceBase();
226 AdvanceDelta();
227 } else {
228 if (current_at_base_) {
229 assert(BaseValid());
230 AdvanceBase();
231 } else {
232 assert(DeltaValid());
233 AdvanceDelta();
234 }
235 }
236 UpdateCurrent();
237 }
238
239 void AdvanceDelta() {
240 if (forward_) {
241 delta_iterator_->Next();
242 } else {
243 delta_iterator_->Prev();
244 }
245 }
246 void AdvanceBase() {
247 if (forward_) {
248 base_iterator_->Next();
249 } else {
250 base_iterator_->Prev();
251 }
252 }
253 bool BaseValid() const { return base_iterator_->Valid(); }
254 bool DeltaValid() const { return delta_iterator_->Valid(); }
255 void UpdateCurrent() {
256// Suppress false positive clang analyzer warnings.
257#ifndef __clang_analyzer__
11fdf7f2 258 status_ = Status::OK();
7c673cae
FG
259 while (true) {
260 WriteEntry delta_entry;
261 if (DeltaValid()) {
11fdf7f2 262 assert(delta_iterator_->status().ok());
7c673cae 263 delta_entry = delta_iterator_->Entry();
11fdf7f2
TL
264 } else if (!delta_iterator_->status().ok()) {
265 // Expose the error status and stop.
266 current_at_base_ = false;
267 return;
7c673cae
FG
268 }
269 equal_keys_ = false;
270 if (!BaseValid()) {
11fdf7f2
TL
271 if (!base_iterator_->status().ok()) {
272 // Expose the error status and stop.
273 current_at_base_ = true;
274 return;
275 }
276
7c673cae
FG
277 // Base has finished.
278 if (!DeltaValid()) {
279 // Finished
280 return;
281 }
282 if (delta_entry.type == kDeleteRecord ||
283 delta_entry.type == kSingleDeleteRecord) {
284 AdvanceDelta();
285 } else {
286 current_at_base_ = false;
287 return;
288 }
289 } else if (!DeltaValid()) {
290 // Delta has finished.
291 current_at_base_ = true;
292 return;
293 } else {
294 int compare =
295 (forward_ ? 1 : -1) *
296 comparator_->Compare(delta_entry.key, base_iterator_->key());
297 if (compare <= 0) { // delta bigger or equal
298 if (compare == 0) {
299 equal_keys_ = true;
300 }
301 if (delta_entry.type != kDeleteRecord &&
302 delta_entry.type != kSingleDeleteRecord) {
303 current_at_base_ = false;
304 return;
305 }
306 // Delta is less advanced and is delete.
307 AdvanceDelta();
308 if (equal_keys_) {
309 AdvanceBase();
310 }
311 } else {
312 current_at_base_ = true;
313 return;
314 }
315 }
316 }
317
318 AssertInvariants();
319#endif // __clang_analyzer__
320 }
321
322 bool forward_;
323 bool current_at_base_;
324 bool equal_keys_;
325 Status status_;
326 std::unique_ptr<Iterator> base_iterator_;
327 std::unique_ptr<WBWIIterator> delta_iterator_;
328 const Comparator* comparator_; // not owned
329};
330
331typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
332 WriteBatchEntrySkipList;
333
334class WBWIIteratorImpl : public WBWIIterator {
335 public:
336 WBWIIteratorImpl(uint32_t column_family_id,
337 WriteBatchEntrySkipList* skip_list,
338 const ReadableWriteBatch* write_batch)
339 : column_family_id_(column_family_id),
340 skip_list_iter_(skip_list),
341 write_batch_(write_batch) {}
342
494da23a 343 ~WBWIIteratorImpl() override {}
7c673cae 344
494da23a 345 bool Valid() const override {
7c673cae
FG
346 if (!skip_list_iter_.Valid()) {
347 return false;
348 }
349 const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
350 return (iter_entry != nullptr &&
351 iter_entry->column_family == column_family_id_);
352 }
353
494da23a 354 void SeekToFirst() override {
11fdf7f2
TL
355 WriteBatchIndexEntry search_entry(
356 nullptr /* search_key */, column_family_id_,
357 true /* is_forward_direction */, true /* is_seek_to_first */);
7c673cae
FG
358 skip_list_iter_.Seek(&search_entry);
359 }
360
494da23a 361 void SeekToLast() override {
11fdf7f2
TL
362 WriteBatchIndexEntry search_entry(
363 nullptr /* search_key */, column_family_id_ + 1,
364 true /* is_forward_direction */, true /* is_seek_to_first */);
7c673cae
FG
365 skip_list_iter_.Seek(&search_entry);
366 if (!skip_list_iter_.Valid()) {
367 skip_list_iter_.SeekToLast();
368 } else {
369 skip_list_iter_.Prev();
370 }
371 }
372
494da23a 373 void Seek(const Slice& key) override {
11fdf7f2
TL
374 WriteBatchIndexEntry search_entry(&key, column_family_id_,
375 true /* is_forward_direction */,
376 false /* is_seek_to_first */);
7c673cae
FG
377 skip_list_iter_.Seek(&search_entry);
378 }
379
494da23a 380 void SeekForPrev(const Slice& key) override {
11fdf7f2
TL
381 WriteBatchIndexEntry search_entry(&key, column_family_id_,
382 false /* is_forward_direction */,
383 false /* is_seek_to_first */);
7c673cae
FG
384 skip_list_iter_.SeekForPrev(&search_entry);
385 }
386
494da23a 387 void Next() override { skip_list_iter_.Next(); }
7c673cae 388
494da23a 389 void Prev() override { skip_list_iter_.Prev(); }
7c673cae 390
494da23a 391 WriteEntry Entry() const override {
7c673cae
FG
392 WriteEntry ret;
393 Slice blob, xid;
394 const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
395 // this is guaranteed with Valid()
396 assert(iter_entry != nullptr &&
397 iter_entry->column_family == column_family_id_);
398 auto s = write_batch_->GetEntryFromDataOffset(
399 iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
400 assert(s.ok());
401 assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
402 ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
403 ret.type == kMergeRecord);
404 return ret;
405 }
406
494da23a 407 Status status() const override {
7c673cae
FG
408 // this is in-memory data structure, so the only way status can be non-ok is
409 // through memory corruption
410 return Status::OK();
411 }
412
413 const WriteBatchIndexEntry* GetRawEntry() const {
414 return skip_list_iter_.key();
415 }
416
417 private:
418 uint32_t column_family_id_;
419 WriteBatchEntrySkipList::Iterator skip_list_iter_;
420 const ReadableWriteBatch* write_batch_;
421};
422
423struct WriteBatchWithIndex::Rep {
11fdf7f2
TL
424 explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
425 size_t max_bytes = 0, bool _overwrite_key = false)
7c673cae
FG
426 : write_batch(reserved_bytes, max_bytes),
427 comparator(index_comparator, &write_batch),
428 skip_list(comparator, &arena),
429 overwrite_key(_overwrite_key),
11fdf7f2
TL
430 last_entry_offset(0),
431 last_sub_batch_offset(0),
432 sub_batch_cnt(1) {}
7c673cae
FG
433 ReadableWriteBatch write_batch;
434 WriteBatchEntryComparator comparator;
435 Arena arena;
436 WriteBatchEntrySkipList skip_list;
437 bool overwrite_key;
438 size_t last_entry_offset;
11fdf7f2
TL
439 // The starting offset of the last sub-batch. A sub-batch starts right before
440 // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
441 // the default, means that no duplicate key is detected so far.
442 size_t last_sub_batch_offset;
443 // Total number of sub-batches in the write batch. Default is 1.
444 size_t sub_batch_cnt;
7c673cae
FG
445
446 // Remember current offset of internal write batch, which is used as
447 // the starting offset of the next record.
448 void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }
449
450 // In overwrite mode, find the existing entry for the same key and update it
451 // to point to the current entry.
452 // Return true if the key is found and updated.
453 bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
454 bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);
455
456 // Add the recent entry to the update.
457 // In overwrite mode, if key already exists in the index, update it.
458 void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
459 void AddOrUpdateIndex(const Slice& key);
460
461 // Allocate an index entry pointing to the last entry in the write batch and
462 // put it to skip list.
463 void AddNewEntry(uint32_t column_family_id);
464
465 // Clear all updates buffered in this batch.
466 void Clear();
467 void ClearIndex();
468
469 // Rebuild index by reading all records from the batch.
470 // Returns non-ok status on corruption.
471 Status ReBuildIndex();
472};
473
474bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
475 ColumnFamilyHandle* column_family, const Slice& key) {
476 uint32_t cf_id = GetColumnFamilyID(column_family);
477 return UpdateExistingEntryWithCfId(cf_id, key);
478}
479
480bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
481 uint32_t column_family_id, const Slice& key) {
482 if (!overwrite_key) {
483 return false;
484 }
485
486 WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
487 iter.Seek(key);
488 if (!iter.Valid()) {
489 return false;
490 }
491 if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
492 return false;
493 }
494 WriteBatchIndexEntry* non_const_entry =
495 const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
11fdf7f2
TL
496 if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
497 last_sub_batch_offset = last_entry_offset;
498 sub_batch_cnt++;
499 }
7c673cae
FG
500 non_const_entry->offset = last_entry_offset;
501 return true;
502}
503
504void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
505 ColumnFamilyHandle* column_family, const Slice& key) {
506 if (!UpdateExistingEntry(column_family, key)) {
507 uint32_t cf_id = GetColumnFamilyID(column_family);
508 const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
509 if (cf_cmp != nullptr) {
510 comparator.SetComparatorForCF(cf_id, cf_cmp);
511 }
512 AddNewEntry(cf_id);
513 }
514}
515
516void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
517 if (!UpdateExistingEntryWithCfId(0, key)) {
518 AddNewEntry(0);
519 }
520}
521
522void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
523 const std::string& wb_data = write_batch.Data();
524 Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
525 wb_data.size() - last_entry_offset);
526 // Extract key
527 Slice key;
11fdf7f2
TL
528 bool success __attribute__((__unused__));
529 success =
7c673cae
FG
530 ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
531 assert(success);
532
11fdf7f2
TL
533 auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
534 auto* index_entry =
535 new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
536 key.data() - wb_data.data(), key.size());
537 skip_list.Insert(index_entry);
538}
7c673cae 539
11fdf7f2
TL
540void WriteBatchWithIndex::Rep::Clear() {
541 write_batch.Clear();
542 ClearIndex();
543}
7c673cae 544
11fdf7f2
TL
545void WriteBatchWithIndex::Rep::ClearIndex() {
546 skip_list.~WriteBatchEntrySkipList();
547 arena.~Arena();
548 new (&arena) Arena();
549 new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
550 last_entry_offset = 0;
551 last_sub_batch_offset = 0;
552 sub_batch_cnt = 1;
553}
7c673cae 554
11fdf7f2
TL
555Status WriteBatchWithIndex::Rep::ReBuildIndex() {
556 Status s;
7c673cae 557
11fdf7f2 558 ClearIndex();
7c673cae 559
11fdf7f2
TL
560 if (write_batch.Count() == 0) {
561 // Nothing to re-index
562 return s;
563 }
7c673cae 564
11fdf7f2 565 size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
7c673cae 566
11fdf7f2
TL
567 Slice input(write_batch.Data());
568 input.remove_prefix(offset);
7c673cae 569
11fdf7f2
TL
570 // Loop through all entries in Rep and add each one to the index
571 int found = 0;
572 while (s.ok() && !input.empty()) {
573 Slice key, value, blob, xid;
574 uint32_t column_family_id = 0; // default
575 char tag = 0;
7c673cae 576
11fdf7f2
TL
577 // set offset of current entry for call to AddNewEntry()
578 last_entry_offset = input.data() - write_batch.Data().data();
7c673cae 579
11fdf7f2
TL
580 s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
581 &value, &blob, &xid);
582 if (!s.ok()) {
583 break;
7c673cae
FG
584 }
585
11fdf7f2
TL
586 switch (tag) {
587 case kTypeColumnFamilyValue:
588 case kTypeValue:
589 case kTypeColumnFamilyDeletion:
590 case kTypeDeletion:
591 case kTypeColumnFamilySingleDeletion:
592 case kTypeSingleDeletion:
593 case kTypeColumnFamilyMerge:
594 case kTypeMerge:
595 found++;
596 if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
597 AddNewEntry(column_family_id);
598 }
599 break;
600 case kTypeLogData:
601 case kTypeBeginPrepareXID:
602 case kTypeBeginPersistedPrepareXID:
603 case kTypeBeginUnprepareXID:
604 case kTypeEndPrepareXID:
605 case kTypeCommitXID:
606 case kTypeRollbackXID:
607 case kTypeNoop:
608 break;
609 default:
610 return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
611 ToString(static_cast<unsigned int>(tag)));
7c673cae 612 }
11fdf7f2 613 }
7c673cae 614
11fdf7f2
TL
615 if (s.ok() && found != write_batch.Count()) {
616 s = Status::Corruption("WriteBatch has wrong count");
7c673cae
FG
617 }
618
11fdf7f2
TL
619 return s;
620}
621
622WriteBatchWithIndex::WriteBatchWithIndex(
623 const Comparator* default_index_comparator, size_t reserved_bytes,
624 bool overwrite_key, size_t max_bytes)
625 : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
626 overwrite_key)) {}
7c673cae 627
11fdf7f2 628WriteBatchWithIndex::~WriteBatchWithIndex() {}
7c673cae 629
11fdf7f2 630WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
7c673cae 631
11fdf7f2
TL
632size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
633
634WBWIIterator* WriteBatchWithIndex::NewIterator() {
635 return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
7c673cae
FG
636}
637
638WBWIIterator* WriteBatchWithIndex::NewIterator(
639 ColumnFamilyHandle* column_family) {
640 return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
641 &(rep->skip_list), &rep->write_batch);
642}
643
644Iterator* WriteBatchWithIndex::NewIteratorWithBase(
645 ColumnFamilyHandle* column_family, Iterator* base_iterator) {
646 if (rep->overwrite_key == false) {
647 assert(false);
648 return nullptr;
649 }
650 return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
651 GetColumnFamilyUserComparator(column_family));
652}
653
654Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
655 if (rep->overwrite_key == false) {
656 assert(false);
657 return nullptr;
658 }
659 // default column family's comparator
660 return new BaseDeltaIterator(base_iterator, NewIterator(),
661 rep->comparator.default_comparator());
662}
663
664Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
665 const Slice& key, const Slice& value) {
666 rep->SetLastEntryOffset();
667 auto s = rep->write_batch.Put(column_family, key, value);
668 if (s.ok()) {
669 rep->AddOrUpdateIndex(column_family, key);
670 }
671 return s;
672}
673
674Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
675 rep->SetLastEntryOffset();
676 auto s = rep->write_batch.Put(key, value);
677 if (s.ok()) {
678 rep->AddOrUpdateIndex(key);
679 }
680 return s;
681}
682
683Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
684 const Slice& key) {
685 rep->SetLastEntryOffset();
686 auto s = rep->write_batch.Delete(column_family, key);
687 if (s.ok()) {
688 rep->AddOrUpdateIndex(column_family, key);
689 }
690 return s;
691}
692
693Status WriteBatchWithIndex::Delete(const Slice& key) {
694 rep->SetLastEntryOffset();
695 auto s = rep->write_batch.Delete(key);
696 if (s.ok()) {
697 rep->AddOrUpdateIndex(key);
698 }
699 return s;
700}
701
702Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
703 const Slice& key) {
704 rep->SetLastEntryOffset();
705 auto s = rep->write_batch.SingleDelete(column_family, key);
706 if (s.ok()) {
707 rep->AddOrUpdateIndex(column_family, key);
708 }
709 return s;
710}
711
712Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
713 rep->SetLastEntryOffset();
714 auto s = rep->write_batch.SingleDelete(key);
715 if (s.ok()) {
716 rep->AddOrUpdateIndex(key);
717 }
718 return s;
719}
720
721Status WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family,
722 const Slice& begin_key,
723 const Slice& end_key) {
724 rep->SetLastEntryOffset();
725 auto s = rep->write_batch.DeleteRange(column_family, begin_key, end_key);
726 if (s.ok()) {
727 rep->AddOrUpdateIndex(column_family, begin_key);
728 }
729 return s;
730}
731
732Status WriteBatchWithIndex::DeleteRange(const Slice& begin_key,
733 const Slice& end_key) {
734 rep->SetLastEntryOffset();
735 auto s = rep->write_batch.DeleteRange(begin_key, end_key);
736 if (s.ok()) {
737 rep->AddOrUpdateIndex(begin_key);
738 }
739 return s;
740}
741
742Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
743 const Slice& key, const Slice& value) {
744 rep->SetLastEntryOffset();
745 auto s = rep->write_batch.Merge(column_family, key, value);
746 if (s.ok()) {
747 rep->AddOrUpdateIndex(column_family, key);
748 }
749 return s;
750}
751
752Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
753 rep->SetLastEntryOffset();
754 auto s = rep->write_batch.Merge(key, value);
755 if (s.ok()) {
756 rep->AddOrUpdateIndex(key);
757 }
758 return s;
759}
760
761Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
762 return rep->write_batch.PutLogData(blob);
763}
764
765void WriteBatchWithIndex::Clear() { rep->Clear(); }
766
767Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
768 const DBOptions& options,
769 const Slice& key, std::string* value) {
770 Status s;
771 MergeContext merge_context;
772 const ImmutableDBOptions immuable_db_options(options);
773
774 WriteBatchWithIndexInternal::Result result =
775 WriteBatchWithIndexInternal::GetFromBatch(
776 immuable_db_options, this, column_family, key, &merge_context,
777 &rep->comparator, value, rep->overwrite_key, &s);
778
779 switch (result) {
780 case WriteBatchWithIndexInternal::Result::kFound:
781 case WriteBatchWithIndexInternal::Result::kError:
782 // use returned status
783 break;
784 case WriteBatchWithIndexInternal::Result::kDeleted:
785 case WriteBatchWithIndexInternal::Result::kNotFound:
786 s = Status::NotFound();
787 break;
788 case WriteBatchWithIndexInternal::Result::kMergeInProgress:
789 s = Status::MergeInProgress();
790 break;
791 default:
792 assert(false);
793 }
794
795 return s;
796}
797
798Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
799 const ReadOptions& read_options,
800 const Slice& key,
801 std::string* value) {
11fdf7f2
TL
802 assert(value != nullptr);
803 PinnableSlice pinnable_val(value);
804 assert(!pinnable_val.IsPinned());
805 auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
806 &pinnable_val);
807 if (s.ok() && pinnable_val.IsPinned()) {
808 value->assign(pinnable_val.data(), pinnable_val.size());
809 } // else value is already assigned
810 return s;
811}
812
813Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
814 const ReadOptions& read_options,
815 const Slice& key,
816 PinnableSlice* pinnable_val) {
7c673cae 817 return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
11fdf7f2 818 pinnable_val);
7c673cae
FG
819}
820
821Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
822 const ReadOptions& read_options,
823 ColumnFamilyHandle* column_family,
824 const Slice& key,
825 std::string* value) {
11fdf7f2
TL
826 assert(value != nullptr);
827 PinnableSlice pinnable_val(value);
828 assert(!pinnable_val.IsPinned());
829 auto s =
830 GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
831 if (s.ok() && pinnable_val.IsPinned()) {
832 value->assign(pinnable_val.data(), pinnable_val.size());
833 } // else value is already assigned
834 return s;
835}
836
837Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
838 const ReadOptions& read_options,
839 ColumnFamilyHandle* column_family,
840 const Slice& key,
841 PinnableSlice* pinnable_val) {
842 return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
843 nullptr);
844}
845
846Status WriteBatchWithIndex::GetFromBatchAndDB(
847 DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
848 const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
7c673cae
FG
849 Status s;
850 MergeContext merge_context;
851 const ImmutableDBOptions& immuable_db_options =
11fdf7f2
TL
852 static_cast_with_check<DBImpl, DB>(db->GetRootDB())
853 ->immutable_db_options();
7c673cae 854
11fdf7f2
TL
855 // Since the lifetime of the WriteBatch is the same as that of the transaction
856 // we cannot pin it as otherwise the returned value will not be available
857 // after the transaction finishes.
858 std::string& batch_value = *pinnable_val->GetSelf();
7c673cae
FG
859 WriteBatchWithIndexInternal::Result result =
860 WriteBatchWithIndexInternal::GetFromBatch(
861 immuable_db_options, this, column_family, key, &merge_context,
862 &rep->comparator, &batch_value, rep->overwrite_key, &s);
863
864 if (result == WriteBatchWithIndexInternal::Result::kFound) {
11fdf7f2 865 pinnable_val->PinSelf();
7c673cae
FG
866 return s;
867 }
868 if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
869 return Status::NotFound();
870 }
871 if (result == WriteBatchWithIndexInternal::Result::kError) {
872 return s;
873 }
874 if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
875 rep->overwrite_key == true) {
876 // Since we've overwritten keys, we do not know what other operations are
877 // in this batch for this key, so we cannot do a Merge to compute the
878 // result. Instead, we will simply return MergeInProgress.
879 return Status::MergeInProgress();
880 }
881
882 assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
883 result == WriteBatchWithIndexInternal::Result::kNotFound);
884
885 // Did not find key in batch OR could not resolve Merges. Try DB.
11fdf7f2
TL
886 if (!callback) {
887 s = db->Get(read_options, column_family, key, pinnable_val);
888 } else {
889 s = static_cast_with_check<DBImpl, DB>(db->GetRootDB())
890 ->GetImpl(read_options, column_family, key, pinnable_val, nullptr,
891 callback);
892 }
7c673cae
FG
893
894 if (s.ok() || s.IsNotFound()) { // DB Get Succeeded
895 if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
896 // Merge result from DB with merges in Batch
897 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
898 const MergeOperator* merge_operator =
899 cfh->cfd()->ioptions()->merge_operator;
900 Statistics* statistics = immuable_db_options.statistics.get();
901 Env* env = immuable_db_options.env;
902 Logger* logger = immuable_db_options.info_log.get();
903
7c673cae
FG
904 Slice* merge_data;
905 if (s.ok()) {
11fdf7f2 906 merge_data = pinnable_val;
7c673cae
FG
907 } else { // Key not present in db (s.IsNotFound())
908 merge_data = nullptr;
909 }
910
911 if (merge_operator) {
11fdf7f2
TL
912 s = MergeHelper::TimedFullMerge(
913 merge_operator, key, merge_data, merge_context.GetOperands(),
914 pinnable_val->GetSelf(), logger, statistics, env);
915 pinnable_val->PinSelf();
7c673cae
FG
916 } else {
917 s = Status::InvalidArgument("Options::merge_operator must be set");
918 }
919 }
920 }
921
922 return s;
923}
924
925void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
926
927Status WriteBatchWithIndex::RollbackToSavePoint() {
928 Status s = rep->write_batch.RollbackToSavePoint();
929
930 if (s.ok()) {
11fdf7f2
TL
931 rep->sub_batch_cnt = 1;
932 rep->last_sub_batch_offset = 0;
7c673cae
FG
933 s = rep->ReBuildIndex();
934 }
935
936 return s;
937}
938
11fdf7f2
TL
939Status WriteBatchWithIndex::PopSavePoint() {
940 return rep->write_batch.PopSavePoint();
941}
942
7c673cae
FG
943void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
944 rep->write_batch.SetMaxBytes(max_bytes);
945}
946
11fdf7f2
TL
947size_t WriteBatchWithIndex::GetDataSize() const {
948 return rep->write_batch.GetDataSize();
949}
950
7c673cae
FG
951} // namespace rocksdb
952#endif // !ROCKSDB_LITE