]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / write_batch_with_index / write_batch_with_index.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
8#include "rocksdb/utilities/write_batch_with_index.h"
9
7c673cae
FG
10#include <memory>
11
12#include "db/column_family.h"
f67539c2 13#include "db/db_impl/db_impl.h"
7c673cae
FG
14#include "db/merge_context.h"
15#include "db/merge_helper.h"
f67539c2 16#include "memory/arena.h"
7c673cae
FG
17#include "memtable/skiplist.h"
18#include "options/db_options.h"
19#include "rocksdb/comparator.h"
20#include "rocksdb/iterator.h"
11fdf7f2
TL
21#include "util/cast_util.h"
22#include "util/string_util.h"
7c673cae
FG
23#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
24
f67539c2 25namespace ROCKSDB_NAMESPACE {
7c673cae 26struct WriteBatchWithIndex::Rep {
11fdf7f2 27 explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
1e59de90
TL
28 size_t max_bytes = 0, bool _overwrite_key = false,
29 size_t protection_bytes_per_key = 0)
30 : write_batch(reserved_bytes, max_bytes, protection_bytes_per_key,
31 index_comparator ? index_comparator->timestamp_size() : 0),
7c673cae
FG
32 comparator(index_comparator, &write_batch),
33 skip_list(comparator, &arena),
34 overwrite_key(_overwrite_key),
11fdf7f2
TL
35 last_entry_offset(0),
36 last_sub_batch_offset(0),
37 sub_batch_cnt(1) {}
7c673cae
FG
38 ReadableWriteBatch write_batch;
39 WriteBatchEntryComparator comparator;
40 Arena arena;
41 WriteBatchEntrySkipList skip_list;
42 bool overwrite_key;
43 size_t last_entry_offset;
11fdf7f2
TL
44 // The starting offset of the last sub-batch. A sub-batch starts right before
45 // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
46 // the default, means that no duplicate key is detected so far.
47 size_t last_sub_batch_offset;
48 // Total number of sub-batches in the write batch. Default is 1.
49 size_t sub_batch_cnt;
7c673cae
FG
50
51 // Remember current offset of internal write batch, which is used as
52 // the starting offset of the next record.
53 void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }
54
55 // In overwrite mode, find the existing entry for the same key and update it
56 // to point to the current entry.
57 // Return true if the key is found and updated.
1e59de90
TL
58 bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key,
59 WriteType type);
60 bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key,
61 WriteType type);
7c673cae
FG
62
63 // Add the recent entry to the update.
64 // In overwrite mode, if key already exists in the index, update it.
1e59de90
TL
65 void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key,
66 WriteType type);
67 void AddOrUpdateIndex(const Slice& key, WriteType type);
7c673cae
FG
68
69 // Allocate an index entry pointing to the last entry in the write batch and
70 // put it to skip list.
71 void AddNewEntry(uint32_t column_family_id);
72
73 // Clear all updates buffered in this batch.
74 void Clear();
75 void ClearIndex();
76
77 // Rebuild index by reading all records from the batch.
78 // Returns non-ok status on corruption.
79 Status ReBuildIndex();
80};
81
82bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
1e59de90 83 ColumnFamilyHandle* column_family, const Slice& key, WriteType type) {
7c673cae 84 uint32_t cf_id = GetColumnFamilyID(column_family);
1e59de90 85 return UpdateExistingEntryWithCfId(cf_id, key, type);
7c673cae
FG
86}
87
88bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
1e59de90 89 uint32_t column_family_id, const Slice& key, WriteType type) {
7c673cae
FG
90 if (!overwrite_key) {
91 return false;
92 }
93
1e59de90
TL
94 WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch,
95 &comparator);
7c673cae
FG
96 iter.Seek(key);
97 if (!iter.Valid()) {
98 return false;
1e59de90 99 } else if (!iter.MatchesKey(column_family_id, key)) {
7c673cae 100 return false;
1e59de90
TL
101 } else {
102 // Move to the end of this key (NextKey-Prev)
103 iter.NextKey(); // Move to the next key
104 if (iter.Valid()) {
105 iter.Prev(); // Move back one entry
106 } else {
107 iter.SeekToLast();
108 }
7c673cae
FG
109 }
110 WriteBatchIndexEntry* non_const_entry =
111 const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
11fdf7f2
TL
112 if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
113 last_sub_batch_offset = last_entry_offset;
114 sub_batch_cnt++;
115 }
1e59de90
TL
116 if (type == kMergeRecord) {
117 return false;
118 } else {
119 non_const_entry->offset = last_entry_offset;
120 return true;
121 }
7c673cae
FG
122}
123
124void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
1e59de90
TL
125 ColumnFamilyHandle* column_family, const Slice& key, WriteType type) {
126 if (!UpdateExistingEntry(column_family, key, type)) {
7c673cae
FG
127 uint32_t cf_id = GetColumnFamilyID(column_family);
128 const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
129 if (cf_cmp != nullptr) {
130 comparator.SetComparatorForCF(cf_id, cf_cmp);
131 }
132 AddNewEntry(cf_id);
133 }
134}
135
1e59de90
TL
136void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key,
137 WriteType type) {
138 if (!UpdateExistingEntryWithCfId(0, key, type)) {
7c673cae
FG
139 AddNewEntry(0);
140 }
141}
142
143void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
144 const std::string& wb_data = write_batch.Data();
145 Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
146 wb_data.size() - last_entry_offset);
147 // Extract key
148 Slice key;
1e59de90 149 bool success =
7c673cae 150 ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
1e59de90
TL
151#ifdef NDEBUG
152 (void)success;
153#endif
7c673cae
FG
154 assert(success);
155
1e59de90
TL
156 const Comparator* const ucmp = comparator.GetComparator(column_family_id);
157 size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
158
159 if (ts_sz > 0) {
160 key.remove_suffix(ts_sz);
161 }
162
11fdf7f2
TL
163 auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
164 auto* index_entry =
165 new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
1e59de90 166 key.data() - wb_data.data(), key.size());
11fdf7f2
TL
167 skip_list.Insert(index_entry);
168}
7c673cae 169
11fdf7f2
TL
170void WriteBatchWithIndex::Rep::Clear() {
171 write_batch.Clear();
172 ClearIndex();
173}
7c673cae 174
11fdf7f2
TL
175void WriteBatchWithIndex::Rep::ClearIndex() {
176 skip_list.~WriteBatchEntrySkipList();
177 arena.~Arena();
178 new (&arena) Arena();
179 new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
180 last_entry_offset = 0;
181 last_sub_batch_offset = 0;
182 sub_batch_cnt = 1;
183}
7c673cae 184
11fdf7f2
TL
185Status WriteBatchWithIndex::Rep::ReBuildIndex() {
186 Status s;
7c673cae 187
11fdf7f2 188 ClearIndex();
7c673cae 189
11fdf7f2
TL
190 if (write_batch.Count() == 0) {
191 // Nothing to re-index
192 return s;
193 }
7c673cae 194
11fdf7f2 195 size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
7c673cae 196
11fdf7f2
TL
197 Slice input(write_batch.Data());
198 input.remove_prefix(offset);
7c673cae 199
11fdf7f2 200 // Loop through all entries in Rep and add each one to the index
f67539c2 201 uint32_t found = 0;
11fdf7f2
TL
202 while (s.ok() && !input.empty()) {
203 Slice key, value, blob, xid;
204 uint32_t column_family_id = 0; // default
205 char tag = 0;
7c673cae 206
11fdf7f2
TL
207 // set offset of current entry for call to AddNewEntry()
208 last_entry_offset = input.data() - write_batch.Data().data();
7c673cae 209
1e59de90
TL
210 s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, &value,
211 &blob, &xid);
11fdf7f2
TL
212 if (!s.ok()) {
213 break;
7c673cae
FG
214 }
215
11fdf7f2
TL
216 switch (tag) {
217 case kTypeColumnFamilyValue:
218 case kTypeValue:
1e59de90
TL
219 found++;
220 if (!UpdateExistingEntryWithCfId(column_family_id, key, kPutRecord)) {
221 AddNewEntry(column_family_id);
222 }
223 break;
11fdf7f2
TL
224 case kTypeColumnFamilyDeletion:
225 case kTypeDeletion:
1e59de90
TL
226 found++;
227 if (!UpdateExistingEntryWithCfId(column_family_id, key,
228 kDeleteRecord)) {
229 AddNewEntry(column_family_id);
230 }
231 break;
11fdf7f2
TL
232 case kTypeColumnFamilySingleDeletion:
233 case kTypeSingleDeletion:
1e59de90
TL
234 found++;
235 if (!UpdateExistingEntryWithCfId(column_family_id, key,
236 kSingleDeleteRecord)) {
237 AddNewEntry(column_family_id);
238 }
239 break;
11fdf7f2
TL
240 case kTypeColumnFamilyMerge:
241 case kTypeMerge:
242 found++;
1e59de90 243 if (!UpdateExistingEntryWithCfId(column_family_id, key, kMergeRecord)) {
11fdf7f2
TL
244 AddNewEntry(column_family_id);
245 }
246 break;
247 case kTypeLogData:
248 case kTypeBeginPrepareXID:
249 case kTypeBeginPersistedPrepareXID:
250 case kTypeBeginUnprepareXID:
251 case kTypeEndPrepareXID:
252 case kTypeCommitXID:
1e59de90 253 case kTypeCommitXIDAndTimestamp:
11fdf7f2
TL
254 case kTypeRollbackXID:
255 case kTypeNoop:
256 break;
257 default:
1e59de90
TL
258 return Status::Corruption(
259 "unknown WriteBatch tag in ReBuildIndex",
260 std::to_string(static_cast<unsigned int>(tag)));
7c673cae 261 }
11fdf7f2 262 }
7c673cae 263
11fdf7f2
TL
264 if (s.ok() && found != write_batch.Count()) {
265 s = Status::Corruption("WriteBatch has wrong count");
7c673cae
FG
266 }
267
11fdf7f2
TL
268 return s;
269}
270
271WriteBatchWithIndex::WriteBatchWithIndex(
272 const Comparator* default_index_comparator, size_t reserved_bytes,
1e59de90 273 bool overwrite_key, size_t max_bytes, size_t protection_bytes_per_key)
11fdf7f2 274 : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
1e59de90 275 overwrite_key, protection_bytes_per_key)) {}
7c673cae 276
11fdf7f2 277WriteBatchWithIndex::~WriteBatchWithIndex() {}
7c673cae 278
f67539c2
TL
279WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default;
280
281WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) =
282 default;
283
11fdf7f2 284WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
7c673cae 285
11fdf7f2
TL
286size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
287
288WBWIIterator* WriteBatchWithIndex::NewIterator() {
1e59de90
TL
289 return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
290 &(rep->comparator));
7c673cae
FG
291}
292
293WBWIIterator* WriteBatchWithIndex::NewIterator(
294 ColumnFamilyHandle* column_family) {
295 return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
1e59de90
TL
296 &(rep->skip_list), &rep->write_batch,
297 &(rep->comparator));
7c673cae
FG
298}
299
300Iterator* WriteBatchWithIndex::NewIteratorWithBase(
f67539c2
TL
301 ColumnFamilyHandle* column_family, Iterator* base_iterator,
302 const ReadOptions* read_options) {
1e59de90
TL
303 auto wbwiii =
304 new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list),
305 &rep->write_batch, &rep->comparator);
306 return new BaseDeltaIterator(column_family, base_iterator, wbwiii,
f67539c2
TL
307 GetColumnFamilyUserComparator(column_family),
308 read_options);
7c673cae
FG
309}
310
311Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
7c673cae 312 // default column family's comparator
1e59de90
TL
313 auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
314 &rep->comparator);
315 return new BaseDeltaIterator(nullptr, base_iterator, wbwiii,
7c673cae
FG
316 rep->comparator.default_comparator());
317}
318
319Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
320 const Slice& key, const Slice& value) {
321 rep->SetLastEntryOffset();
322 auto s = rep->write_batch.Put(column_family, key, value);
323 if (s.ok()) {
1e59de90 324 rep->AddOrUpdateIndex(column_family, key, kPutRecord);
7c673cae
FG
325 }
326 return s;
327}
328
329Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
330 rep->SetLastEntryOffset();
331 auto s = rep->write_batch.Put(key, value);
332 if (s.ok()) {
1e59de90 333 rep->AddOrUpdateIndex(key, kPutRecord);
7c673cae
FG
334 }
335 return s;
336}
337
1e59de90
TL
338Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
339 const Slice& /*key*/, const Slice& /*ts*/,
340 const Slice& /*value*/) {
341 if (!column_family) {
342 return Status::InvalidArgument("column family handle cannot be nullptr");
343 }
344 // TODO: support WBWI::Put() with timestamp.
345 return Status::NotSupported();
346}
347
7c673cae
FG
348Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
349 const Slice& key) {
350 rep->SetLastEntryOffset();
351 auto s = rep->write_batch.Delete(column_family, key);
352 if (s.ok()) {
1e59de90 353 rep->AddOrUpdateIndex(column_family, key, kDeleteRecord);
7c673cae
FG
354 }
355 return s;
356}
357
358Status WriteBatchWithIndex::Delete(const Slice& key) {
359 rep->SetLastEntryOffset();
360 auto s = rep->write_batch.Delete(key);
361 if (s.ok()) {
1e59de90 362 rep->AddOrUpdateIndex(key, kDeleteRecord);
7c673cae
FG
363 }
364 return s;
365}
366
1e59de90
TL
367Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
368 const Slice& /*key*/, const Slice& /*ts*/) {
369 if (!column_family) {
370 return Status::InvalidArgument("column family handle cannot be nullptr");
371 }
372 // TODO: support WBWI::Delete() with timestamp.
373 return Status::NotSupported();
374}
375
7c673cae
FG
376Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
377 const Slice& key) {
378 rep->SetLastEntryOffset();
379 auto s = rep->write_batch.SingleDelete(column_family, key);
380 if (s.ok()) {
1e59de90 381 rep->AddOrUpdateIndex(column_family, key, kSingleDeleteRecord);
7c673cae
FG
382 }
383 return s;
384}
385
386Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
387 rep->SetLastEntryOffset();
388 auto s = rep->write_batch.SingleDelete(key);
389 if (s.ok()) {
1e59de90 390 rep->AddOrUpdateIndex(key, kSingleDeleteRecord);
7c673cae
FG
391 }
392 return s;
393}
394
1e59de90
TL
395Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
396 const Slice& /*key*/,
397 const Slice& /*ts*/) {
398 if (!column_family) {
399 return Status::InvalidArgument("column family handle cannot be nullptr");
400 }
401 // TODO: support WBWI::SingleDelete() with timestamp.
402 return Status::NotSupported();
403}
404
7c673cae
FG
405Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
406 const Slice& key, const Slice& value) {
407 rep->SetLastEntryOffset();
408 auto s = rep->write_batch.Merge(column_family, key, value);
409 if (s.ok()) {
1e59de90 410 rep->AddOrUpdateIndex(column_family, key, kMergeRecord);
7c673cae
FG
411 }
412 return s;
413}
414
415Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
416 rep->SetLastEntryOffset();
417 auto s = rep->write_batch.Merge(key, value);
418 if (s.ok()) {
1e59de90 419 rep->AddOrUpdateIndex(key, kMergeRecord);
7c673cae
FG
420 }
421 return s;
422}
423
424Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
425 return rep->write_batch.PutLogData(blob);
426}
427
428void WriteBatchWithIndex::Clear() { rep->Clear(); }
429
430Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
431 const DBOptions& options,
432 const Slice& key, std::string* value) {
433 Status s;
1e59de90
TL
434 WriteBatchWithIndexInternal wbwii(&options, column_family);
435 auto result = wbwii.GetFromBatch(this, key, value, &s);
7c673cae
FG
436
437 switch (result) {
1e59de90
TL
438 case WBWIIteratorImpl::kFound:
439 case WBWIIteratorImpl::kError:
7c673cae
FG
440 // use returned status
441 break;
1e59de90
TL
442 case WBWIIteratorImpl::kDeleted:
443 case WBWIIteratorImpl::kNotFound:
7c673cae
FG
444 s = Status::NotFound();
445 break;
1e59de90 446 case WBWIIteratorImpl::kMergeInProgress:
7c673cae
FG
447 s = Status::MergeInProgress();
448 break;
449 default:
450 assert(false);
451 }
452
453 return s;
454}
455
456Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
457 const ReadOptions& read_options,
458 const Slice& key,
459 std::string* value) {
11fdf7f2
TL
460 assert(value != nullptr);
461 PinnableSlice pinnable_val(value);
462 assert(!pinnable_val.IsPinned());
463 auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
464 &pinnable_val);
465 if (s.ok() && pinnable_val.IsPinned()) {
466 value->assign(pinnable_val.data(), pinnable_val.size());
467 } // else value is already assigned
468 return s;
469}
470
471Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
472 const ReadOptions& read_options,
473 const Slice& key,
474 PinnableSlice* pinnable_val) {
7c673cae 475 return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
11fdf7f2 476 pinnable_val);
7c673cae
FG
477}
478
479Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
480 const ReadOptions& read_options,
481 ColumnFamilyHandle* column_family,
482 const Slice& key,
483 std::string* value) {
11fdf7f2
TL
484 assert(value != nullptr);
485 PinnableSlice pinnable_val(value);
486 assert(!pinnable_val.IsPinned());
487 auto s =
488 GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
489 if (s.ok() && pinnable_val.IsPinned()) {
490 value->assign(pinnable_val.data(), pinnable_val.size());
491 } // else value is already assigned
492 return s;
493}
494
495Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
496 const ReadOptions& read_options,
497 ColumnFamilyHandle* column_family,
498 const Slice& key,
499 PinnableSlice* pinnable_val) {
500 return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
501 nullptr);
502}
503
504Status WriteBatchWithIndex::GetFromBatchAndDB(
505 DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
506 const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
1e59de90
TL
507 const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
508 size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
509 if (ts_sz > 0 && !read_options.timestamp) {
510 return Status::InvalidArgument("Must specify timestamp");
511 }
512
7c673cae 513 Status s;
1e59de90 514 WriteBatchWithIndexInternal wbwii(db, column_family);
7c673cae 515
11fdf7f2
TL
516 // Since the lifetime of the WriteBatch is the same as that of the transaction
517 // we cannot pin it as otherwise the returned value will not be available
518 // after the transaction finishes.
519 std::string& batch_value = *pinnable_val->GetSelf();
1e59de90 520 auto result = wbwii.GetFromBatch(this, key, &batch_value, &s);
7c673cae 521
1e59de90 522 if (result == WBWIIteratorImpl::kFound) {
11fdf7f2 523 pinnable_val->PinSelf();
7c673cae 524 return s;
1e59de90 525 } else if (!s.ok() || result == WBWIIteratorImpl::kError) {
7c673cae 526 return s;
1e59de90
TL
527 } else if (result == WBWIIteratorImpl::kDeleted) {
528 return Status::NotFound();
7c673cae 529 }
1e59de90
TL
530 assert(result == WBWIIteratorImpl::kMergeInProgress ||
531 result == WBWIIteratorImpl::kNotFound);
7c673cae
FG
532
533 // Did not find key in batch OR could not resolve Merges. Try DB.
11fdf7f2
TL
534 if (!callback) {
535 s = db->Get(read_options, column_family, key, pinnable_val);
536 } else {
f67539c2
TL
537 DBImpl::GetImplOptions get_impl_options;
538 get_impl_options.column_family = column_family;
539 get_impl_options.value = pinnable_val;
540 get_impl_options.callback = callback;
20effc67 541 s = static_cast_with_check<DBImpl>(db->GetRootDB())
f67539c2 542 ->GetImpl(read_options, key, get_impl_options);
11fdf7f2 543 }
7c673cae
FG
544
545 if (s.ok() || s.IsNotFound()) { // DB Get Succeeded
1e59de90 546 if (result == WBWIIteratorImpl::kMergeInProgress) {
7c673cae 547 // Merge result from DB with merges in Batch
1e59de90 548 std::string merge_result;
7c673cae 549 if (s.ok()) {
1e59de90 550 s = wbwii.MergeKey(key, pinnable_val, &merge_result);
7c673cae 551 } else { // Key not present in db (s.IsNotFound())
1e59de90 552 s = wbwii.MergeKey(key, nullptr, &merge_result);
7c673cae 553 }
1e59de90 554 if (s.ok()) {
f67539c2
TL
555 pinnable_val->Reset();
556 *pinnable_val->GetSelf() = std::move(merge_result);
11fdf7f2 557 pinnable_val->PinSelf();
7c673cae
FG
558 }
559 }
560 }
561
562 return s;
563}
564
f67539c2
TL
565void WriteBatchWithIndex::MultiGetFromBatchAndDB(
566 DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
567 const size_t num_keys, const Slice* keys, PinnableSlice* values,
568 Status* statuses, bool sorted_input) {
569 MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys,
570 values, statuses, sorted_input, nullptr);
571}
572
573void WriteBatchWithIndex::MultiGetFromBatchAndDB(
574 DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
575 const size_t num_keys, const Slice* keys, PinnableSlice* values,
576 Status* statuses, bool sorted_input, ReadCallback* callback) {
1e59de90
TL
577 const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
578 size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
579 if (ts_sz > 0 && !read_options.timestamp) {
580 for (size_t i = 0; i < num_keys; ++i) {
581 statuses[i] = Status::InvalidArgument("Must specify timestamp");
582 }
583 return;
584 }
585
586 WriteBatchWithIndexInternal wbwii(db, column_family);
f67539c2
TL
587
588 autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
589 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
590 // To hold merges from the write batch
1e59de90 591 autovector<std::pair<WBWIIteratorImpl::Result, MergeContext>,
f67539c2
TL
592 MultiGetContext::MAX_BATCH_SIZE>
593 merges;
594 // Since the lifetime of the WriteBatch is the same as that of the transaction
595 // we cannot pin it as otherwise the returned value will not be available
596 // after the transaction finishes.
597 for (size_t i = 0; i < num_keys; ++i) {
598 MergeContext merge_context;
1e59de90 599 std::string batch_value;
f67539c2 600 Status* s = &statuses[i];
1e59de90
TL
601 PinnableSlice* pinnable_val = &values[i];
602 pinnable_val->Reset();
603 auto result =
604 wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s);
f67539c2 605
1e59de90
TL
606 if (result == WBWIIteratorImpl::kFound) {
607 *pinnable_val->GetSelf() = std::move(batch_value);
f67539c2
TL
608 pinnable_val->PinSelf();
609 continue;
610 }
1e59de90 611 if (result == WBWIIteratorImpl::kDeleted) {
f67539c2
TL
612 *s = Status::NotFound();
613 continue;
614 }
1e59de90 615 if (result == WBWIIteratorImpl::kError) {
f67539c2
TL
616 continue;
617 }
1e59de90
TL
618 assert(result == WBWIIteratorImpl::kMergeInProgress ||
619 result == WBWIIteratorImpl::kNotFound);
20effc67
TL
620 key_context.emplace_back(column_family, keys[i], &values[i],
621 /*timestamp*/ nullptr, &statuses[i]);
f67539c2
TL
622 merges.emplace_back(result, std::move(merge_context));
623 }
624
625 for (KeyContext& key : key_context) {
626 sorted_keys.emplace_back(&key);
627 }
628
629 // Did not find key in batch OR could not resolve Merges. Try DB.
20effc67 630 static_cast_with_check<DBImpl>(db->GetRootDB())
f67539c2 631 ->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys);
20effc67 632 static_cast_with_check<DBImpl>(db->GetRootDB())
f67539c2
TL
633 ->MultiGetWithCallback(read_options, column_family, callback,
634 &sorted_keys);
635
f67539c2
TL
636 for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) {
637 KeyContext& key = *iter;
638 if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded
639 size_t index = iter - key_context.begin();
1e59de90
TL
640 std::pair<WBWIIteratorImpl::Result, MergeContext>& merge_result =
641 merges[index];
642 if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) {
643 std::string merged_value;
f67539c2 644 // Merge result from DB with merges in Batch
f67539c2 645 if (key.s->ok()) {
1e59de90
TL
646 *key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second,
647 &merged_value);
f67539c2 648 } else { // Key not present in db (s.IsNotFound())
1e59de90
TL
649 *key.s = wbwii.MergeKey(*key.key, nullptr, merge_result.second,
650 &merged_value);
f67539c2 651 }
1e59de90
TL
652 if (key.s->ok()) {
653 key.value->Reset();
654 *key.value->GetSelf() = std::move(merged_value);
f67539c2 655 key.value->PinSelf();
f67539c2
TL
656 }
657 }
658 }
659 }
660}
661
7c673cae
FG
662void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
663
664Status WriteBatchWithIndex::RollbackToSavePoint() {
665 Status s = rep->write_batch.RollbackToSavePoint();
666
667 if (s.ok()) {
11fdf7f2
TL
668 rep->sub_batch_cnt = 1;
669 rep->last_sub_batch_offset = 0;
7c673cae
FG
670 s = rep->ReBuildIndex();
671 }
672
673 return s;
674}
675
11fdf7f2
TL
676Status WriteBatchWithIndex::PopSavePoint() {
677 return rep->write_batch.PopSavePoint();
678}
679
7c673cae
FG
680void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
681 rep->write_batch.SetMaxBytes(max_bytes);
682}
683
11fdf7f2
TL
684size_t WriteBatchWithIndex::GetDataSize() const {
685 return rep->write_batch.GetDataSize();
686}
687
1e59de90
TL
688const Comparator* WriteBatchWithIndexInternal::GetUserComparator(
689 const WriteBatchWithIndex& wbwi, uint32_t cf_id) {
690 const WriteBatchEntryComparator& ucmps = wbwi.rep->comparator;
691 return ucmps.GetComparator(cf_id);
692}
693
f67539c2 694} // namespace ROCKSDB_NAMESPACE
7c673cae 695#endif // !ROCKSDB_LITE