// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "rocksdb/utilities/write_batch_with_index.h"

#include <memory>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "memory/arena.h"
#include "memtable/skiplist.h"
#include "options/db_options.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "util/cast_util.h"
#include "util/string_util.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

namespace ROCKSDB_NAMESPACE {

// when direction == forward
// * current_at_base_ <=> base_iterator > delta_iterator
// when direction == backwards
// * current_at_base_ <=> base_iterator < delta_iterator
// always:
// * equal_keys_ <=> base_iterator == delta_iterator
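//
// Illustrative example of the merged view: with a base DB containing
// {a=1, c=3} and a write batch containing Put(b, 2) and Delete(c), forward
// iteration yields a=1 (from the base) and b=2 (from the delta); c is hidden
// because its delta entry is a delete record.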
class BaseDeltaIterator : public Iterator {
 public:
  BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
                    const Comparator* comparator,
                    const ReadOptions* read_options = nullptr)
      : forward_(true),
        current_at_base_(true),
        equal_keys_(false),
        status_(Status::OK()),
        base_iterator_(base_iterator),
        delta_iterator_(delta_iterator),
        comparator_(comparator),
        iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
                                          : nullptr) {}

  ~BaseDeltaIterator() override {}

  bool Valid() const override {
    return current_at_base_ ? BaseValid() : DeltaValid();
  }

  void SeekToFirst() override {
    forward_ = true;
    base_iterator_->SeekToFirst();
    delta_iterator_->SeekToFirst();
    UpdateCurrent();
  }

  void SeekToLast() override {
    forward_ = false;
    base_iterator_->SeekToLast();
    delta_iterator_->SeekToLast();
    UpdateCurrent();
  }

  void Seek(const Slice& k) override {
    forward_ = true;
    base_iterator_->Seek(k);
    delta_iterator_->Seek(k);
    UpdateCurrent();
  }

  void SeekForPrev(const Slice& k) override {
    forward_ = false;
    base_iterator_->SeekForPrev(k);
    delta_iterator_->SeekForPrev(k);
    UpdateCurrent();
  }

  void Next() override {
    if (!Valid()) {
      status_ = Status::NotSupported("Next() on invalid iterator");
      return;
    }

    if (!forward_) {
      // Need to change direction
      // if our direction was backward and we're not equal, we have two states:
      // * both iterators are valid: we're already in a good state (current
      // shows to smaller)
      // * only one iterator is valid: we need to advance that iterator
      forward_ = true;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToFirst();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToFirst();
      } else if (current_at_base_) {
        // Change delta from larger than base to smaller
        AdvanceDelta();
      } else {
        // Change base from larger than delta to smaller
        AdvanceBase();
      }
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }
    Advance();
  }

  void Prev() override {
    if (!Valid()) {
      status_ = Status::NotSupported("Prev() on invalid iterator");
      return;
    }

    if (forward_) {
      // Need to change direction
      // if our direction was forward and we're not equal, we have two states:
      // * both iterators are valid: we're already in a good state (current
      // shows to smaller)
      // * only one iterator is valid: we need to advance that iterator
      forward_ = false;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToLast();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToLast();
      } else if (current_at_base_) {
        // Change delta from less advanced than base to more advanced
        AdvanceDelta();
      } else {
        // Change base from less advanced than delta to more advanced
        AdvanceBase();
      }
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }

    Advance();
  }

  Slice key() const override {
    return current_at_base_ ? base_iterator_->key()
                            : delta_iterator_->Entry().key;
  }

  Slice value() const override {
    return current_at_base_ ? base_iterator_->value()
                            : delta_iterator_->Entry().value;
  }

  Status status() const override {
    if (!status_.ok()) {
      return status_;
    }
    if (!base_iterator_->status().ok()) {
      return base_iterator_->status();
    }
    return delta_iterator_->status();
  }

 private:
  void AssertInvariants() {
#ifndef NDEBUG
    bool not_ok = false;
    if (!base_iterator_->status().ok()) {
      assert(!base_iterator_->Valid());
      not_ok = true;
    }
    if (!delta_iterator_->status().ok()) {
      assert(!delta_iterator_->Valid());
      not_ok = true;
    }
    if (not_ok) {
      assert(!Valid());
      assert(!status().ok());
      return;
    }

    if (!Valid()) {
      return;
    }
    if (!BaseValid()) {
      assert(!current_at_base_ && delta_iterator_->Valid());
      return;
    }
    if (!DeltaValid()) {
      assert(current_at_base_ && base_iterator_->Valid());
      return;
    }
    // we don't support those yet
    assert(delta_iterator_->Entry().type != kMergeRecord &&
           delta_iterator_->Entry().type != kLogDataRecord);
    int compare = comparator_->Compare(delta_iterator_->Entry().key,
                                       base_iterator_->key());
    if (forward_) {
      // current_at_base -> compare < 0
      assert(!current_at_base_ || compare < 0);
      // !current_at_base -> compare <= 0
      assert(current_at_base_ && compare >= 0);
    } else {
      // current_at_base -> compare > 0
      assert(!current_at_base_ || compare > 0);
      // !current_at_base -> compare <= 0
      assert(current_at_base_ && compare <= 0);
    }
    // equal_keys_ <=> compare == 0
    assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
#endif
  }

  void Advance() {
    if (equal_keys_) {
      assert(BaseValid() && DeltaValid());
      AdvanceBase();
      AdvanceDelta();
    } else {
      if (current_at_base_) {
        assert(BaseValid());
        AdvanceBase();
      } else {
        assert(DeltaValid());
        AdvanceDelta();
      }
    }
    UpdateCurrent();
  }

  void AdvanceDelta() {
    if (forward_) {
      delta_iterator_->Next();
    } else {
      delta_iterator_->Prev();
    }
  }
  void AdvanceBase() {
    if (forward_) {
      base_iterator_->Next();
    } else {
      base_iterator_->Prev();
    }
  }
  bool BaseValid() const { return base_iterator_->Valid(); }
  bool DeltaValid() const { return delta_iterator_->Valid(); }
  void UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
    status_ = Status::OK();
    while (true) {
      WriteEntry delta_entry;
      if (DeltaValid()) {
        assert(delta_iterator_->status().ok());
        delta_entry = delta_iterator_->Entry();
      } else if (!delta_iterator_->status().ok()) {
        // Expose the error status and stop.
        current_at_base_ = false;
        return;
      }
      equal_keys_ = false;
      if (!BaseValid()) {
        if (!base_iterator_->status().ok()) {
          // Expose the error status and stop.
          current_at_base_ = true;
          return;
        }

        // Base has finished.
        if (!DeltaValid()) {
          // Finished
          return;
        }
        if (iterate_upper_bound_) {
          if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >=
              0) {
            // out of upper bound -> finished.
            return;
          }
        }
        if (delta_entry.type == kDeleteRecord ||
            delta_entry.type == kSingleDeleteRecord) {
          AdvanceDelta();
        } else {
          current_at_base_ = false;
          return;
        }
      } else if (!DeltaValid()) {
        // Delta has finished.
        current_at_base_ = true;
        return;
      } else {
        int compare =
            (forward_ ? 1 : -1) *
            comparator_->Compare(delta_entry.key, base_iterator_->key());
        if (compare <= 0) {  // delta bigger or equal
          if (compare == 0) {
            equal_keys_ = true;
          }
          if (delta_entry.type != kDeleteRecord &&
              delta_entry.type != kSingleDeleteRecord) {
            current_at_base_ = false;
            return;
          }
          // Delta is less advanced and is delete.
          AdvanceDelta();
          if (equal_keys_) {
            AdvanceBase();
          }
        } else {
          current_at_base_ = true;
          return;
        }
      }
    }

    AssertInvariants();
#endif  // __clang_analyzer__
  }

  bool forward_;
  bool current_at_base_;
  bool equal_keys_;
  Status status_;
  std::unique_ptr<Iterator> base_iterator_;
  std::unique_ptr<WBWIIterator> delta_iterator_;
  const Comparator* comparator_;  // not owned
  const Slice* iterate_upper_bound_;
};

typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
    WriteBatchEntrySkipList;

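// WBWIIteratorImpl walks the write batch's skip-list index but only reports
// entries whose column family id matches column_family_id_. SeekToLast()
// works by seeking to the first entry of the next column family id and
// stepping back one position; Entry() re-reads the record from the underlying
// ReadableWriteBatch at the stored data offset.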
class WBWIIteratorImpl : public WBWIIterator {
 public:
  WBWIIteratorImpl(uint32_t column_family_id,
                   WriteBatchEntrySkipList* skip_list,
                   const ReadableWriteBatch* write_batch)
      : column_family_id_(column_family_id),
        skip_list_iter_(skip_list),
        write_batch_(write_batch) {}

  ~WBWIIteratorImpl() override {}

  bool Valid() const override {
    if (!skip_list_iter_.Valid()) {
      return false;
    }
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    return (iter_entry != nullptr &&
            iter_entry->column_family == column_family_id_);
  }

  void SeekToFirst() override {
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

  void SeekToLast() override {
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_ + 1,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
    if (!skip_list_iter_.Valid()) {
      skip_list_iter_.SeekToLast();
    } else {
      skip_list_iter_.Prev();
    }
  }

  void Seek(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      true /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

  void SeekForPrev(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      false /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.SeekForPrev(&search_entry);
  }

  void Next() override { skip_list_iter_.Next(); }

  void Prev() override { skip_list_iter_.Prev(); }

  WriteEntry Entry() const override {
    WriteEntry ret;
    Slice blob, xid;
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    // this is guaranteed with Valid()
    assert(iter_entry != nullptr &&
           iter_entry->column_family == column_family_id_);
    auto s = write_batch_->GetEntryFromDataOffset(
        iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
    assert(s.ok());
    assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
           ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
           ret.type == kMergeRecord);
    return ret;
  }

  Status status() const override {
    // this is an in-memory data structure, so the only way status can be
    // non-ok is through memory corruption
    return Status::OK();
  }

  const WriteBatchIndexEntry* GetRawEntry() const {
    return skip_list_iter_.key();
  }

 private:
  uint32_t column_family_id_;
  WriteBatchEntrySkipList::Iterator skip_list_iter_;
  const ReadableWriteBatch* write_batch_;
};

struct WriteBatchWithIndex::Rep {
  explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
               size_t max_bytes = 0, bool _overwrite_key = false)
      : write_batch(reserved_bytes, max_bytes),
        comparator(index_comparator, &write_batch),
        skip_list(comparator, &arena),
        overwrite_key(_overwrite_key),
        last_entry_offset(0),
        last_sub_batch_offset(0),
        sub_batch_cnt(1) {}
  ReadableWriteBatch write_batch;
  WriteBatchEntryComparator comparator;
  Arena arena;
  WriteBatchEntrySkipList skip_list;
  bool overwrite_key;
  size_t last_entry_offset;
  // The starting offset of the last sub-batch. A sub-batch starts right before
  // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
  // the default, means that no duplicate key is detected so far.
  size_t last_sub_batch_offset;
  // Total number of sub-batches in the write batch. Default is 1.
  size_t sub_batch_cnt;

  // Remember current offset of internal write batch, which is used as
  // the starting offset of the next record.
  void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }

  // In overwrite mode, find the existing entry for the same key and update it
  // to point to the current entry.
  // Return true if the key is found and updated.
  bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
  bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);

  // Add the recent entry to the update.
  // In overwrite mode, if key already exists in the index, update it.
  void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
  void AddOrUpdateIndex(const Slice& key);

  // Allocate an index entry pointing to the last entry in the write batch and
  // put it to skip list.
  void AddNewEntry(uint32_t column_family_id);

  // Clear all updates buffered in this batch.
  void Clear();
  void ClearIndex();

  // Rebuild index by reading all records from the batch.
  // Returns non-ok status on corruption.
  Status ReBuildIndex();
};

bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
    ColumnFamilyHandle* column_family, const Slice& key) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  return UpdateExistingEntryWithCfId(cf_id, key);
}

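// Sub-batch bookkeeping happens here: if the key being added duplicates a key
// that is already indexed in the current sub-batch, a new sub-batch is
// started. Illustrative example (assuming overwrite_key == true):
// Put(k1, v1); Put(k2, v2); Put(k1, v3) leaves sub_batch_cnt == 2.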
bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
    uint32_t column_family_id, const Slice& key) {
  if (!overwrite_key) {
    return false;
  }

  WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
  iter.Seek(key);
  if (!iter.Valid()) {
    return false;
  }
  if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
    return false;
  }
  WriteBatchIndexEntry* non_const_entry =
      const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
  if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
    last_sub_batch_offset = last_entry_offset;
    sub_batch_cnt++;
  }
  non_const_entry->offset = last_entry_offset;
  return true;
}

void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
    ColumnFamilyHandle* column_family, const Slice& key) {
  if (!UpdateExistingEntry(column_family, key)) {
    uint32_t cf_id = GetColumnFamilyID(column_family);
    const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
    if (cf_cmp != nullptr) {
      comparator.SetComparatorForCF(cf_id, cf_cmp);
    }
    AddNewEntry(cf_id);
  }
}

void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
  if (!UpdateExistingEntryWithCfId(0, key)) {
    AddNewEntry(0);
  }
}

void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
  const std::string& wb_data = write_batch.Data();
  Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
                          wb_data.size() - last_entry_offset);
  // Extract key
  Slice key;
  bool success __attribute__((__unused__));
  success =
      ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
  assert(success);

  auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
  auto* index_entry =
      new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
                                     key.data() - wb_data.data(), key.size());
  skip_list.Insert(index_entry);
}

void WriteBatchWithIndex::Rep::Clear() {
  write_batch.Clear();
  ClearIndex();
}

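// Destroy and re-construct the skip list and its backing arena in place, so
// all index memory is released while the Rep object itself stays put.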
void WriteBatchWithIndex::Rep::ClearIndex() {
  skip_list.~WriteBatchEntrySkipList();
  arena.~Arena();
  new (&arena) Arena();
  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
  last_entry_offset = 0;
  last_sub_batch_offset = 0;
  sub_batch_cnt = 1;
}

Status WriteBatchWithIndex::Rep::ReBuildIndex() {
  Status s;

  ClearIndex();

  if (write_batch.Count() == 0) {
    // Nothing to re-index
    return s;
  }

  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

  Slice input(write_batch.Data());
  input.remove_prefix(offset);

  // Loop through all entries in Rep and add each one to the index
  uint32_t found = 0;
  while (s.ok() && !input.empty()) {
    Slice key, value, blob, xid;
    uint32_t column_family_id = 0;  // default
    char tag = 0;

    // set offset of current entry for call to AddNewEntry()
    last_entry_offset = input.data() - write_batch.Data().data();

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                 &value, &blob, &xid);
    if (!s.ok()) {
      break;
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        found++;
        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
          AddNewEntry(column_family_id);
        }
        break;
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeBeginPersistedPrepareXID:
      case kTypeBeginUnprepareXID:
      case kTypeEndPrepareXID:
      case kTypeCommitXID:
      case kTypeRollbackXID:
      case kTypeNoop:
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
                                  ToString(static_cast<unsigned int>(tag)));
    }
  }

  if (s.ok() && found != write_batch.Count()) {
    s = Status::Corruption("WriteBatch has wrong count");
  }

  return s;
}

WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key, size_t max_bytes)
    : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
                  overwrite_key)) {}

WriteBatchWithIndex::~WriteBatchWithIndex() {}

WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default;

WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) =
    default;

WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }

size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }

WBWIIterator* WriteBatchWithIndex::NewIterator() {
  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}

WBWIIterator* WriteBatchWithIndex::NewIterator(
    ColumnFamilyHandle* column_family) {
  return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
                              &(rep->skip_list), &rep->write_batch);
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(
    ColumnFamilyHandle* column_family, Iterator* base_iterator,
    const ReadOptions* read_options) {
  if (rep->overwrite_key == false) {
    assert(false);
    return nullptr;
  }
  return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
                               GetColumnFamilyUserComparator(column_family),
                               read_options);
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
  if (rep->overwrite_key == false) {
    assert(false);
    return nullptr;
  }
  // default column family's comparator
  return new BaseDeltaIterator(base_iterator, NewIterator(),
                               rep->comparator.default_comparator());
}
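
// Illustrative usage of NewIteratorWithBase (a sketch; `db` is assumed to be
// an open DB*). The returned iterator owns both the WBWI iterator it creates
// and the base iterator that is passed in:
//
//   WriteBatchWithIndex batch(BytewiseComparator(), 0,
//                             true /* overwrite_key */);
//   batch.Put("b", "2");
//   std::unique_ptr<Iterator> it(
//       batch.NewIteratorWithBase(db->NewIterator(ReadOptions())));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // sees the keys already in db merged with the batch entry "b"
//   }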

Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Put(column_family, key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Put(key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
                                   const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Delete(column_family, key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::Delete(const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Delete(key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
                                         const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.SingleDelete(column_family, key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.SingleDelete(key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
                                  const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Merge(column_family, key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Merge(key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
  return rep->write_batch.PutLogData(blob);
}

void WriteBatchWithIndex::Clear() { rep->Clear(); }

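// GetFromBatch consults only this batch, never the DB: a key Put() into the
// batch comes back with its value, a key Delete()d here returns NotFound()
// even if it exists in the database, and a key the batch has never touched
// also returns NotFound().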
Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
                                         const DBOptions& options,
                                         const Slice& key, std::string* value) {
  Status s;
  MergeContext merge_context;
  const ImmutableDBOptions immuable_db_options(options);

  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immuable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, value, rep->overwrite_key, &s);

  switch (result) {
    case WriteBatchWithIndexInternal::Result::kFound:
    case WriteBatchWithIndexInternal::Result::kError:
      // use returned status
      break;
    case WriteBatchWithIndexInternal::Result::kDeleted:
    case WriteBatchWithIndexInternal::Result::kNotFound:
      s = Status::NotFound();
      break;
    case WriteBatchWithIndexInternal::Result::kMergeInProgress:
      s = Status::MergeInProgress();
      break;
    default:
      assert(false);
  }

  return s;
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              std::string* value) {
  assert(value != nullptr);
  PinnableSlice pinnable_val(value);
  assert(!pinnable_val.IsPinned());
  auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                             &pinnable_val);
  if (s.ok() && pinnable_val.IsPinned()) {
    value->assign(pinnable_val.data(), pinnable_val.size());
  }  // else value is already assigned
  return s;
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              PinnableSlice* pinnable_val) {
  return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                           pinnable_val);
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              std::string* value) {
  assert(value != nullptr);
  PinnableSlice pinnable_val(value);
  assert(!pinnable_val.IsPinned());
  auto s =
      GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
  if (s.ok() && pinnable_val.IsPinned()) {
    value->assign(pinnable_val.data(), pinnable_val.size());
  }  // else value is already assigned
  return s;
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              PinnableSlice* pinnable_val) {
  return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
                           nullptr);
}

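// The full lookup path: try to answer from the batch alone first, fall back
// to the DB only when the batch does not fully determine the result, and
// apply any pending merge operands from the batch on top of the DB value.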
Status WriteBatchWithIndex::GetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
  Status s;
  MergeContext merge_context;
  const ImmutableDBOptions& immuable_db_options =
      static_cast_with_check<DBImpl>(db->GetRootDB())->immutable_db_options();

  // Since the lifetime of the WriteBatch is the same as that of the transaction
  // we cannot pin it as otherwise the returned value will not be available
  // after the transaction finishes.
  std::string& batch_value = *pinnable_val->GetSelf();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immuable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, &batch_value, rep->overwrite_key, &s);

  if (result == WriteBatchWithIndexInternal::Result::kFound) {
    pinnable_val->PinSelf();
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
    return Status::NotFound();
  }
  if (result == WriteBatchWithIndexInternal::Result::kError) {
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
      rep->overwrite_key == true) {
    // Since we've overwritten keys, we do not know what other operations are
    // in this batch for this key, so we cannot do a Merge to compute the
    // result. Instead, we will simply return MergeInProgress.
    return Status::MergeInProgress();
  }

  assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
         result == WriteBatchWithIndexInternal::Result::kNotFound);

  // Did not find key in batch OR could not resolve Merges. Try DB.
  if (!callback) {
    s = db->Get(read_options, column_family, key, pinnable_val);
  } else {
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = column_family;
    get_impl_options.value = pinnable_val;
    get_impl_options.callback = callback;
    s = static_cast_with_check<DBImpl>(db->GetRootDB())
            ->GetImpl(read_options, key, get_impl_options);
  }

  if (s.ok() || s.IsNotFound()) {  // DB Get Succeeded
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // Merge result from DB with merges in Batch
      auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
      const MergeOperator* merge_operator =
          cfh->cfd()->ioptions()->merge_operator;
      Statistics* statistics = immuable_db_options.statistics.get();
      Env* env = immuable_db_options.env;
      Logger* logger = immuable_db_options.info_log.get();

      Slice* merge_data;
      if (s.ok()) {
        merge_data = pinnable_val;
      } else {  // Key not present in db (s.IsNotFound())
        merge_data = nullptr;
      }

      if (merge_operator) {
        std::string merge_result;
        s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
                                        merge_context.GetOperands(),
                                        &merge_result, logger, statistics, env);
        pinnable_val->Reset();
        *pinnable_val->GetSelf() = std::move(merge_result);
        pinnable_val->PinSelf();
      } else {
        s = Status::InvalidArgument("Options::merge_operator must be set");
      }
    }
  }

  return s;
}

void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input) {
  MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys,
                         values, statuses, sorted_input, nullptr);
}

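// Keys that the batch fully resolves (found, deleted, error, or a merge that
// cannot be completed) are answered directly; only the remaining keys are
// forwarded to the DB via MultiGetWithCallback, and any pending batch merge
// operands are then applied to the DB results.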
void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input, ReadCallback* callback) {
  const ImmutableDBOptions& immuable_db_options =
      static_cast_with_check<DBImpl>(db->GetRootDB())->immutable_db_options();

  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  // To hold merges from the write batch
  autovector<std::pair<WriteBatchWithIndexInternal::Result, MergeContext>,
             MultiGetContext::MAX_BATCH_SIZE>
      merges;
  // Since the lifetime of the WriteBatch is the same as that of the transaction
  // we cannot pin it as otherwise the returned value will not be available
  // after the transaction finishes.
  for (size_t i = 0; i < num_keys; ++i) {
    MergeContext merge_context;
    PinnableSlice* pinnable_val = &values[i];
    std::string& batch_value = *pinnable_val->GetSelf();
    Status* s = &statuses[i];
    WriteBatchWithIndexInternal::Result result =
        WriteBatchWithIndexInternal::GetFromBatch(
            immuable_db_options, this, column_family, keys[i], &merge_context,
            &rep->comparator, &batch_value, rep->overwrite_key, s);

    if (result == WriteBatchWithIndexInternal::Result::kFound) {
      pinnable_val->PinSelf();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
      *s = Status::NotFound();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kError) {
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        rep->overwrite_key == true) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result. Instead, we will simply return MergeInProgress.
      *s = Status::MergeInProgress();
      continue;
    }

    assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
           result == WriteBatchWithIndexInternal::Result::kNotFound);
    key_context.emplace_back(column_family, keys[i], &values[i],
                             /*timestamp*/ nullptr, &statuses[i]);
    merges.emplace_back(result, std::move(merge_context));
  }

  for (KeyContext& key : key_context) {
    sorted_keys.emplace_back(&key);
  }

  // Did not find key in batch OR could not resolve Merges. Try DB.
  static_cast_with_check<DBImpl>(db->GetRootDB())
      ->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys);
  static_cast_with_check<DBImpl>(db->GetRootDB())
      ->MultiGetWithCallback(read_options, column_family, callback,
                             &sorted_keys);

  ColumnFamilyHandleImpl* cfh =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  const MergeOperator* merge_operator = cfh->cfd()->ioptions()->merge_operator;
  for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) {
    KeyContext& key = *iter;
    if (key.s->ok() || key.s->IsNotFound()) {  // DB Get Succeeded
      size_t index = iter - key_context.begin();
      std::pair<WriteBatchWithIndexInternal::Result, MergeContext>&
          merge_result = merges[index];
      if (merge_result.first ==
          WriteBatchWithIndexInternal::Result::kMergeInProgress) {
        // Merge result from DB with merges in Batch
        Statistics* statistics = immuable_db_options.statistics.get();
        Env* env = immuable_db_options.env;
        Logger* logger = immuable_db_options.info_log.get();

        Slice* merge_data;
        if (key.s->ok()) {
          merge_data = iter->value;
        } else {  // Key not present in db (s.IsNotFound())
          merge_data = nullptr;
        }

        if (merge_operator) {
          *key.s = MergeHelper::TimedFullMerge(
              merge_operator, *key.key, merge_data,
              merge_result.second.GetOperands(), key.value->GetSelf(), logger,
              statistics, env);
          key.value->PinSelf();
        } else {
          *key.s =
              Status::InvalidArgument("Options::merge_operator must be set");
        }
      }
    }
  }
}

void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }

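// Rolling back truncates the underlying WriteBatch to the save point and then
// rebuilds the entire index from the remaining records, resetting the
// sub-batch bookkeeping before the rebuild recomputes it.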
Status WriteBatchWithIndex::RollbackToSavePoint() {
  Status s = rep->write_batch.RollbackToSavePoint();

  if (s.ok()) {
    rep->sub_batch_cnt = 1;
    rep->last_sub_batch_offset = 0;
    s = rep->ReBuildIndex();
  }

  return s;
}

Status WriteBatchWithIndex::PopSavePoint() {
  return rep->write_batch.PopSavePoint();
}

void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
  rep->write_batch.SetMaxBytes(max_bytes);
}

size_t WriteBatchWithIndex::GetDataSize() const {
  return rep->write_batch.GetDataSize();
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // !ROCKSDB_LITE