]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / write_batch_with_index / write_batch_with_index.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "rocksdb/utilities/write_batch_with_index.h"
9
10 #include <memory>
11
12 #include "db/column_family.h"
13 #include "db/db_impl/db_impl.h"
14 #include "db/merge_context.h"
15 #include "db/merge_helper.h"
16 #include "memory/arena.h"
17 #include "memtable/skiplist.h"
18 #include "options/db_options.h"
19 #include "rocksdb/comparator.h"
20 #include "rocksdb/iterator.h"
21 #include "util/cast_util.h"
22 #include "util/string_util.h"
23 #include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
24
25 namespace ROCKSDB_NAMESPACE {
26 struct WriteBatchWithIndex::Rep {
27 explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
28 size_t max_bytes = 0, bool _overwrite_key = false,
29 size_t protection_bytes_per_key = 0)
30 : write_batch(reserved_bytes, max_bytes, protection_bytes_per_key,
31 index_comparator ? index_comparator->timestamp_size() : 0),
32 comparator(index_comparator, &write_batch),
33 skip_list(comparator, &arena),
34 overwrite_key(_overwrite_key),
35 last_entry_offset(0),
36 last_sub_batch_offset(0),
37 sub_batch_cnt(1) {}
38 ReadableWriteBatch write_batch;
39 WriteBatchEntryComparator comparator;
40 Arena arena;
41 WriteBatchEntrySkipList skip_list;
42 bool overwrite_key;
43 size_t last_entry_offset;
44 // The starting offset of the last sub-batch. A sub-batch starts right before
45 // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
46 // the default, means that no duplicate key is detected so far.
47 size_t last_sub_batch_offset;
48 // Total number of sub-batches in the write batch. Default is 1.
49 size_t sub_batch_cnt;
50
51 // Remember current offset of internal write batch, which is used as
52 // the starting offset of the next record.
53 void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }
54
55 // In overwrite mode, find the existing entry for the same key and update it
56 // to point to the current entry.
57 // Return true if the key is found and updated.
58 bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key,
59 WriteType type);
60 bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key,
61 WriteType type);
62
63 // Add the recent entry to the update.
64 // In overwrite mode, if key already exists in the index, update it.
65 void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key,
66 WriteType type);
67 void AddOrUpdateIndex(const Slice& key, WriteType type);
68
69 // Allocate an index entry pointing to the last entry in the write batch and
70 // put it to skip list.
71 void AddNewEntry(uint32_t column_family_id);
72
73 // Clear all updates buffered in this batch.
74 void Clear();
75 void ClearIndex();
76
77 // Rebuild index by reading all records from the batch.
78 // Returns non-ok status on corruption.
79 Status ReBuildIndex();
80 };
81
82 bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
83 ColumnFamilyHandle* column_family, const Slice& key, WriteType type) {
84 uint32_t cf_id = GetColumnFamilyID(column_family);
85 return UpdateExistingEntryWithCfId(cf_id, key, type);
86 }
87
88 bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
89 uint32_t column_family_id, const Slice& key, WriteType type) {
90 if (!overwrite_key) {
91 return false;
92 }
93
94 WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch,
95 &comparator);
96 iter.Seek(key);
97 if (!iter.Valid()) {
98 return false;
99 } else if (!iter.MatchesKey(column_family_id, key)) {
100 return false;
101 } else {
102 // Move to the end of this key (NextKey-Prev)
103 iter.NextKey(); // Move to the next key
104 if (iter.Valid()) {
105 iter.Prev(); // Move back one entry
106 } else {
107 iter.SeekToLast();
108 }
109 }
110 WriteBatchIndexEntry* non_const_entry =
111 const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
112 if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
113 last_sub_batch_offset = last_entry_offset;
114 sub_batch_cnt++;
115 }
116 if (type == kMergeRecord) {
117 return false;
118 } else {
119 non_const_entry->offset = last_entry_offset;
120 return true;
121 }
122 }
123
124 void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
125 ColumnFamilyHandle* column_family, const Slice& key, WriteType type) {
126 if (!UpdateExistingEntry(column_family, key, type)) {
127 uint32_t cf_id = GetColumnFamilyID(column_family);
128 const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
129 if (cf_cmp != nullptr) {
130 comparator.SetComparatorForCF(cf_id, cf_cmp);
131 }
132 AddNewEntry(cf_id);
133 }
134 }
135
136 void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key,
137 WriteType type) {
138 if (!UpdateExistingEntryWithCfId(0, key, type)) {
139 AddNewEntry(0);
140 }
141 }
142
143 void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
144 const std::string& wb_data = write_batch.Data();
145 Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
146 wb_data.size() - last_entry_offset);
147 // Extract key
148 Slice key;
149 bool success =
150 ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
151 #ifdef NDEBUG
152 (void)success;
153 #endif
154 assert(success);
155
156 const Comparator* const ucmp = comparator.GetComparator(column_family_id);
157 size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
158
159 if (ts_sz > 0) {
160 key.remove_suffix(ts_sz);
161 }
162
163 auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
164 auto* index_entry =
165 new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
166 key.data() - wb_data.data(), key.size());
167 skip_list.Insert(index_entry);
168 }
169
170 void WriteBatchWithIndex::Rep::Clear() {
171 write_batch.Clear();
172 ClearIndex();
173 }
174
175 void WriteBatchWithIndex::Rep::ClearIndex() {
176 skip_list.~WriteBatchEntrySkipList();
177 arena.~Arena();
178 new (&arena) Arena();
179 new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
180 last_entry_offset = 0;
181 last_sub_batch_offset = 0;
182 sub_batch_cnt = 1;
183 }
184
185 Status WriteBatchWithIndex::Rep::ReBuildIndex() {
186 Status s;
187
188 ClearIndex();
189
190 if (write_batch.Count() == 0) {
191 // Nothing to re-index
192 return s;
193 }
194
195 size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
196
197 Slice input(write_batch.Data());
198 input.remove_prefix(offset);
199
200 // Loop through all entries in Rep and add each one to the index
201 uint32_t found = 0;
202 while (s.ok() && !input.empty()) {
203 Slice key, value, blob, xid;
204 uint32_t column_family_id = 0; // default
205 char tag = 0;
206
207 // set offset of current entry for call to AddNewEntry()
208 last_entry_offset = input.data() - write_batch.Data().data();
209
210 s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, &value,
211 &blob, &xid);
212 if (!s.ok()) {
213 break;
214 }
215
216 switch (tag) {
217 case kTypeColumnFamilyValue:
218 case kTypeValue:
219 found++;
220 if (!UpdateExistingEntryWithCfId(column_family_id, key, kPutRecord)) {
221 AddNewEntry(column_family_id);
222 }
223 break;
224 case kTypeColumnFamilyDeletion:
225 case kTypeDeletion:
226 found++;
227 if (!UpdateExistingEntryWithCfId(column_family_id, key,
228 kDeleteRecord)) {
229 AddNewEntry(column_family_id);
230 }
231 break;
232 case kTypeColumnFamilySingleDeletion:
233 case kTypeSingleDeletion:
234 found++;
235 if (!UpdateExistingEntryWithCfId(column_family_id, key,
236 kSingleDeleteRecord)) {
237 AddNewEntry(column_family_id);
238 }
239 break;
240 case kTypeColumnFamilyMerge:
241 case kTypeMerge:
242 found++;
243 if (!UpdateExistingEntryWithCfId(column_family_id, key, kMergeRecord)) {
244 AddNewEntry(column_family_id);
245 }
246 break;
247 case kTypeLogData:
248 case kTypeBeginPrepareXID:
249 case kTypeBeginPersistedPrepareXID:
250 case kTypeBeginUnprepareXID:
251 case kTypeEndPrepareXID:
252 case kTypeCommitXID:
253 case kTypeCommitXIDAndTimestamp:
254 case kTypeRollbackXID:
255 case kTypeNoop:
256 break;
257 default:
258 return Status::Corruption(
259 "unknown WriteBatch tag in ReBuildIndex",
260 std::to_string(static_cast<unsigned int>(tag)));
261 }
262 }
263
264 if (s.ok() && found != write_batch.Count()) {
265 s = Status::Corruption("WriteBatch has wrong count");
266 }
267
268 return s;
269 }
270
271 WriteBatchWithIndex::WriteBatchWithIndex(
272 const Comparator* default_index_comparator, size_t reserved_bytes,
273 bool overwrite_key, size_t max_bytes, size_t protection_bytes_per_key)
274 : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
275 overwrite_key, protection_bytes_per_key)) {}
276
277 WriteBatchWithIndex::~WriteBatchWithIndex() {}
278
279 WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default;
280
281 WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) =
282 default;
283
284 WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
285
286 size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
287
288 WBWIIterator* WriteBatchWithIndex::NewIterator() {
289 return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
290 &(rep->comparator));
291 }
292
293 WBWIIterator* WriteBatchWithIndex::NewIterator(
294 ColumnFamilyHandle* column_family) {
295 return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
296 &(rep->skip_list), &rep->write_batch,
297 &(rep->comparator));
298 }
299
300 Iterator* WriteBatchWithIndex::NewIteratorWithBase(
301 ColumnFamilyHandle* column_family, Iterator* base_iterator,
302 const ReadOptions* read_options) {
303 auto wbwiii =
304 new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list),
305 &rep->write_batch, &rep->comparator);
306 return new BaseDeltaIterator(column_family, base_iterator, wbwiii,
307 GetColumnFamilyUserComparator(column_family),
308 read_options);
309 }
310
311 Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
312 // default column family's comparator
313 auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
314 &rep->comparator);
315 return new BaseDeltaIterator(nullptr, base_iterator, wbwiii,
316 rep->comparator.default_comparator());
317 }
318
319 Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
320 const Slice& key, const Slice& value) {
321 rep->SetLastEntryOffset();
322 auto s = rep->write_batch.Put(column_family, key, value);
323 if (s.ok()) {
324 rep->AddOrUpdateIndex(column_family, key, kPutRecord);
325 }
326 return s;
327 }
328
329 Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
330 rep->SetLastEntryOffset();
331 auto s = rep->write_batch.Put(key, value);
332 if (s.ok()) {
333 rep->AddOrUpdateIndex(key, kPutRecord);
334 }
335 return s;
336 }
337
338 Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
339 const Slice& /*key*/, const Slice& /*ts*/,
340 const Slice& /*value*/) {
341 if (!column_family) {
342 return Status::InvalidArgument("column family handle cannot be nullptr");
343 }
344 // TODO: support WBWI::Put() with timestamp.
345 return Status::NotSupported();
346 }
347
348 Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
349 const Slice& key) {
350 rep->SetLastEntryOffset();
351 auto s = rep->write_batch.Delete(column_family, key);
352 if (s.ok()) {
353 rep->AddOrUpdateIndex(column_family, key, kDeleteRecord);
354 }
355 return s;
356 }
357
358 Status WriteBatchWithIndex::Delete(const Slice& key) {
359 rep->SetLastEntryOffset();
360 auto s = rep->write_batch.Delete(key);
361 if (s.ok()) {
362 rep->AddOrUpdateIndex(key, kDeleteRecord);
363 }
364 return s;
365 }
366
367 Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
368 const Slice& /*key*/, const Slice& /*ts*/) {
369 if (!column_family) {
370 return Status::InvalidArgument("column family handle cannot be nullptr");
371 }
372 // TODO: support WBWI::Delete() with timestamp.
373 return Status::NotSupported();
374 }
375
376 Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
377 const Slice& key) {
378 rep->SetLastEntryOffset();
379 auto s = rep->write_batch.SingleDelete(column_family, key);
380 if (s.ok()) {
381 rep->AddOrUpdateIndex(column_family, key, kSingleDeleteRecord);
382 }
383 return s;
384 }
385
386 Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
387 rep->SetLastEntryOffset();
388 auto s = rep->write_batch.SingleDelete(key);
389 if (s.ok()) {
390 rep->AddOrUpdateIndex(key, kSingleDeleteRecord);
391 }
392 return s;
393 }
394
395 Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
396 const Slice& /*key*/,
397 const Slice& /*ts*/) {
398 if (!column_family) {
399 return Status::InvalidArgument("column family handle cannot be nullptr");
400 }
401 // TODO: support WBWI::SingleDelete() with timestamp.
402 return Status::NotSupported();
403 }
404
405 Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
406 const Slice& key, const Slice& value) {
407 rep->SetLastEntryOffset();
408 auto s = rep->write_batch.Merge(column_family, key, value);
409 if (s.ok()) {
410 rep->AddOrUpdateIndex(column_family, key, kMergeRecord);
411 }
412 return s;
413 }
414
415 Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
416 rep->SetLastEntryOffset();
417 auto s = rep->write_batch.Merge(key, value);
418 if (s.ok()) {
419 rep->AddOrUpdateIndex(key, kMergeRecord);
420 }
421 return s;
422 }
423
424 Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
425 return rep->write_batch.PutLogData(blob);
426 }
427
428 void WriteBatchWithIndex::Clear() { rep->Clear(); }
429
430 Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
431 const DBOptions& options,
432 const Slice& key, std::string* value) {
433 Status s;
434 WriteBatchWithIndexInternal wbwii(&options, column_family);
435 auto result = wbwii.GetFromBatch(this, key, value, &s);
436
437 switch (result) {
438 case WBWIIteratorImpl::kFound:
439 case WBWIIteratorImpl::kError:
440 // use returned status
441 break;
442 case WBWIIteratorImpl::kDeleted:
443 case WBWIIteratorImpl::kNotFound:
444 s = Status::NotFound();
445 break;
446 case WBWIIteratorImpl::kMergeInProgress:
447 s = Status::MergeInProgress();
448 break;
449 default:
450 assert(false);
451 }
452
453 return s;
454 }
455
456 Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
457 const ReadOptions& read_options,
458 const Slice& key,
459 std::string* value) {
460 assert(value != nullptr);
461 PinnableSlice pinnable_val(value);
462 assert(!pinnable_val.IsPinned());
463 auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
464 &pinnable_val);
465 if (s.ok() && pinnable_val.IsPinned()) {
466 value->assign(pinnable_val.data(), pinnable_val.size());
467 } // else value is already assigned
468 return s;
469 }
470
471 Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
472 const ReadOptions& read_options,
473 const Slice& key,
474 PinnableSlice* pinnable_val) {
475 return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
476 pinnable_val);
477 }
478
479 Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
480 const ReadOptions& read_options,
481 ColumnFamilyHandle* column_family,
482 const Slice& key,
483 std::string* value) {
484 assert(value != nullptr);
485 PinnableSlice pinnable_val(value);
486 assert(!pinnable_val.IsPinned());
487 auto s =
488 GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
489 if (s.ok() && pinnable_val.IsPinned()) {
490 value->assign(pinnable_val.data(), pinnable_val.size());
491 } // else value is already assigned
492 return s;
493 }
494
495 Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
496 const ReadOptions& read_options,
497 ColumnFamilyHandle* column_family,
498 const Slice& key,
499 PinnableSlice* pinnable_val) {
500 return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
501 nullptr);
502 }
503
504 Status WriteBatchWithIndex::GetFromBatchAndDB(
505 DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
506 const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
507 const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
508 size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
509 if (ts_sz > 0 && !read_options.timestamp) {
510 return Status::InvalidArgument("Must specify timestamp");
511 }
512
513 Status s;
514 WriteBatchWithIndexInternal wbwii(db, column_family);
515
516 // Since the lifetime of the WriteBatch is the same as that of the transaction
517 // we cannot pin it as otherwise the returned value will not be available
518 // after the transaction finishes.
519 std::string& batch_value = *pinnable_val->GetSelf();
520 auto result = wbwii.GetFromBatch(this, key, &batch_value, &s);
521
522 if (result == WBWIIteratorImpl::kFound) {
523 pinnable_val->PinSelf();
524 return s;
525 } else if (!s.ok() || result == WBWIIteratorImpl::kError) {
526 return s;
527 } else if (result == WBWIIteratorImpl::kDeleted) {
528 return Status::NotFound();
529 }
530 assert(result == WBWIIteratorImpl::kMergeInProgress ||
531 result == WBWIIteratorImpl::kNotFound);
532
533 // Did not find key in batch OR could not resolve Merges. Try DB.
534 if (!callback) {
535 s = db->Get(read_options, column_family, key, pinnable_val);
536 } else {
537 DBImpl::GetImplOptions get_impl_options;
538 get_impl_options.column_family = column_family;
539 get_impl_options.value = pinnable_val;
540 get_impl_options.callback = callback;
541 s = static_cast_with_check<DBImpl>(db->GetRootDB())
542 ->GetImpl(read_options, key, get_impl_options);
543 }
544
545 if (s.ok() || s.IsNotFound()) { // DB Get Succeeded
546 if (result == WBWIIteratorImpl::kMergeInProgress) {
547 // Merge result from DB with merges in Batch
548 std::string merge_result;
549 if (s.ok()) {
550 s = wbwii.MergeKey(key, pinnable_val, &merge_result);
551 } else { // Key not present in db (s.IsNotFound())
552 s = wbwii.MergeKey(key, nullptr, &merge_result);
553 }
554 if (s.ok()) {
555 pinnable_val->Reset();
556 *pinnable_val->GetSelf() = std::move(merge_result);
557 pinnable_val->PinSelf();
558 }
559 }
560 }
561
562 return s;
563 }
564
565 void WriteBatchWithIndex::MultiGetFromBatchAndDB(
566 DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
567 const size_t num_keys, const Slice* keys, PinnableSlice* values,
568 Status* statuses, bool sorted_input) {
569 MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys,
570 values, statuses, sorted_input, nullptr);
571 }
572
573 void WriteBatchWithIndex::MultiGetFromBatchAndDB(
574 DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
575 const size_t num_keys, const Slice* keys, PinnableSlice* values,
576 Status* statuses, bool sorted_input, ReadCallback* callback) {
577 const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
578 size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
579 if (ts_sz > 0 && !read_options.timestamp) {
580 for (size_t i = 0; i < num_keys; ++i) {
581 statuses[i] = Status::InvalidArgument("Must specify timestamp");
582 }
583 return;
584 }
585
586 WriteBatchWithIndexInternal wbwii(db, column_family);
587
588 autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
589 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
590 // To hold merges from the write batch
591 autovector<std::pair<WBWIIteratorImpl::Result, MergeContext>,
592 MultiGetContext::MAX_BATCH_SIZE>
593 merges;
594 // Since the lifetime of the WriteBatch is the same as that of the transaction
595 // we cannot pin it as otherwise the returned value will not be available
596 // after the transaction finishes.
597 for (size_t i = 0; i < num_keys; ++i) {
598 MergeContext merge_context;
599 std::string batch_value;
600 Status* s = &statuses[i];
601 PinnableSlice* pinnable_val = &values[i];
602 pinnable_val->Reset();
603 auto result =
604 wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s);
605
606 if (result == WBWIIteratorImpl::kFound) {
607 *pinnable_val->GetSelf() = std::move(batch_value);
608 pinnable_val->PinSelf();
609 continue;
610 }
611 if (result == WBWIIteratorImpl::kDeleted) {
612 *s = Status::NotFound();
613 continue;
614 }
615 if (result == WBWIIteratorImpl::kError) {
616 continue;
617 }
618 assert(result == WBWIIteratorImpl::kMergeInProgress ||
619 result == WBWIIteratorImpl::kNotFound);
620 key_context.emplace_back(column_family, keys[i], &values[i],
621 /*timestamp*/ nullptr, &statuses[i]);
622 merges.emplace_back(result, std::move(merge_context));
623 }
624
625 for (KeyContext& key : key_context) {
626 sorted_keys.emplace_back(&key);
627 }
628
629 // Did not find key in batch OR could not resolve Merges. Try DB.
630 static_cast_with_check<DBImpl>(db->GetRootDB())
631 ->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys);
632 static_cast_with_check<DBImpl>(db->GetRootDB())
633 ->MultiGetWithCallback(read_options, column_family, callback,
634 &sorted_keys);
635
636 for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) {
637 KeyContext& key = *iter;
638 if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded
639 size_t index = iter - key_context.begin();
640 std::pair<WBWIIteratorImpl::Result, MergeContext>& merge_result =
641 merges[index];
642 if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) {
643 std::string merged_value;
644 // Merge result from DB with merges in Batch
645 if (key.s->ok()) {
646 *key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second,
647 &merged_value);
648 } else { // Key not present in db (s.IsNotFound())
649 *key.s = wbwii.MergeKey(*key.key, nullptr, merge_result.second,
650 &merged_value);
651 }
652 if (key.s->ok()) {
653 key.value->Reset();
654 *key.value->GetSelf() = std::move(merged_value);
655 key.value->PinSelf();
656 }
657 }
658 }
659 }
660 }
661
662 void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
663
664 Status WriteBatchWithIndex::RollbackToSavePoint() {
665 Status s = rep->write_batch.RollbackToSavePoint();
666
667 if (s.ok()) {
668 rep->sub_batch_cnt = 1;
669 rep->last_sub_batch_offset = 0;
670 s = rep->ReBuildIndex();
671 }
672
673 return s;
674 }
675
676 Status WriteBatchWithIndex::PopSavePoint() {
677 return rep->write_batch.PopSavePoint();
678 }
679
680 void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
681 rep->write_batch.SetMaxBytes(max_bytes);
682 }
683
684 size_t WriteBatchWithIndex::GetDataSize() const {
685 return rep->write_batch.GetDataSize();
686 }
687
688 const Comparator* WriteBatchWithIndexInternal::GetUserComparator(
689 const WriteBatchWithIndex& wbwi, uint32_t cf_id) {
690 const WriteBatchEntryComparator& ucmps = wbwi.rep->comparator;
691 return ucmps.GetComparator(cf_id);
692 }
693
694 } // namespace ROCKSDB_NAMESPACE
695 #endif // !ROCKSDB_LITE