]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under the BSD-style license found in the | |
3 | // LICENSE file in the root directory of this source tree. An additional grant | |
4 | // of patent rights can be found in the PATENTS file in the same directory. | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "rocksdb/utilities/write_batch_with_index.h" | |
9 | ||
10 | #include <limits> | |
11 | #include <memory> | |
12 | ||
13 | #include "db/column_family.h" | |
14 | #include "db/db_impl.h" | |
15 | #include "db/merge_context.h" | |
16 | #include "db/merge_helper.h" | |
17 | #include "memtable/skiplist.h" | |
18 | #include "options/db_options.h" | |
19 | #include "rocksdb/comparator.h" | |
20 | #include "rocksdb/iterator.h" | |
21 | #include "util/arena.h" | |
22 | #include "utilities/write_batch_with_index/write_batch_with_index_internal.h" | |
23 | ||
24 | namespace rocksdb { | |
25 | ||
26 | // when direction == forward | |
27 | // * current_at_base_ <=> base_iterator > delta_iterator | |
28 | // when direction == backwards | |
29 | // * current_at_base_ <=> base_iterator < delta_iterator | |
30 | // always: | |
31 | // * equal_keys_ <=> base_iterator == delta_iterator | |
32 | class BaseDeltaIterator : public Iterator { | |
33 | public: | |
34 | BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator, | |
35 | const Comparator* comparator) | |
36 | : forward_(true), | |
37 | current_at_base_(true), | |
38 | equal_keys_(false), | |
39 | status_(Status::OK()), | |
40 | base_iterator_(base_iterator), | |
41 | delta_iterator_(delta_iterator), | |
42 | comparator_(comparator) {} | |
43 | ||
44 | virtual ~BaseDeltaIterator() {} | |
45 | ||
46 | bool Valid() const override { | |
47 | return current_at_base_ ? BaseValid() : DeltaValid(); | |
48 | } | |
49 | ||
50 | void SeekToFirst() override { | |
51 | forward_ = true; | |
52 | base_iterator_->SeekToFirst(); | |
53 | delta_iterator_->SeekToFirst(); | |
54 | UpdateCurrent(); | |
55 | } | |
56 | ||
57 | void SeekToLast() override { | |
58 | forward_ = false; | |
59 | base_iterator_->SeekToLast(); | |
60 | delta_iterator_->SeekToLast(); | |
61 | UpdateCurrent(); | |
62 | } | |
63 | ||
64 | void Seek(const Slice& k) override { | |
65 | forward_ = true; | |
66 | base_iterator_->Seek(k); | |
67 | delta_iterator_->Seek(k); | |
68 | UpdateCurrent(); | |
69 | } | |
70 | ||
71 | void SeekForPrev(const Slice& k) override { | |
72 | forward_ = false; | |
73 | base_iterator_->SeekForPrev(k); | |
74 | delta_iterator_->SeekForPrev(k); | |
75 | UpdateCurrent(); | |
76 | } | |
77 | ||
78 | void Next() override { | |
79 | if (!Valid()) { | |
80 | status_ = Status::NotSupported("Next() on invalid iterator"); | |
81 | } | |
82 | ||
83 | if (!forward_) { | |
84 | // Need to change direction | |
85 | // if our direction was backward and we're not equal, we have two states: | |
86 | // * both iterators are valid: we're already in a good state (current | |
87 | // shows to smaller) | |
88 | // * only one iterator is valid: we need to advance that iterator | |
89 | forward_ = true; | |
90 | equal_keys_ = false; | |
91 | if (!BaseValid()) { | |
92 | assert(DeltaValid()); | |
93 | base_iterator_->SeekToFirst(); | |
94 | } else if (!DeltaValid()) { | |
95 | delta_iterator_->SeekToFirst(); | |
96 | } else if (current_at_base_) { | |
97 | // Change delta from larger than base to smaller | |
98 | AdvanceDelta(); | |
99 | } else { | |
100 | // Change base from larger than delta to smaller | |
101 | AdvanceBase(); | |
102 | } | |
103 | if (DeltaValid() && BaseValid()) { | |
104 | if (comparator_->Equal(delta_iterator_->Entry().key, | |
105 | base_iterator_->key())) { | |
106 | equal_keys_ = true; | |
107 | } | |
108 | } | |
109 | } | |
110 | Advance(); | |
111 | } | |
112 | ||
113 | void Prev() override { | |
114 | if (!Valid()) { | |
115 | status_ = Status::NotSupported("Prev() on invalid iterator"); | |
116 | } | |
117 | ||
118 | if (forward_) { | |
119 | // Need to change direction | |
120 | // if our direction was backward and we're not equal, we have two states: | |
121 | // * both iterators are valid: we're already in a good state (current | |
122 | // shows to smaller) | |
123 | // * only one iterator is valid: we need to advance that iterator | |
124 | forward_ = false; | |
125 | equal_keys_ = false; | |
126 | if (!BaseValid()) { | |
127 | assert(DeltaValid()); | |
128 | base_iterator_->SeekToLast(); | |
129 | } else if (!DeltaValid()) { | |
130 | delta_iterator_->SeekToLast(); | |
131 | } else if (current_at_base_) { | |
132 | // Change delta from less advanced than base to more advanced | |
133 | AdvanceDelta(); | |
134 | } else { | |
135 | // Change base from less advanced than delta to more advanced | |
136 | AdvanceBase(); | |
137 | } | |
138 | if (DeltaValid() && BaseValid()) { | |
139 | if (comparator_->Equal(delta_iterator_->Entry().key, | |
140 | base_iterator_->key())) { | |
141 | equal_keys_ = true; | |
142 | } | |
143 | } | |
144 | } | |
145 | ||
146 | Advance(); | |
147 | } | |
148 | ||
149 | Slice key() const override { | |
150 | return current_at_base_ ? base_iterator_->key() | |
151 | : delta_iterator_->Entry().key; | |
152 | } | |
153 | ||
154 | Slice value() const override { | |
155 | return current_at_base_ ? base_iterator_->value() | |
156 | : delta_iterator_->Entry().value; | |
157 | } | |
158 | ||
159 | Status status() const override { | |
160 | if (!status_.ok()) { | |
161 | return status_; | |
162 | } | |
163 | if (!base_iterator_->status().ok()) { | |
164 | return base_iterator_->status(); | |
165 | } | |
166 | return delta_iterator_->status(); | |
167 | } | |
168 | ||
169 | private: | |
170 | void AssertInvariants() { | |
171 | #ifndef NDEBUG | |
172 | if (!Valid()) { | |
173 | return; | |
174 | } | |
175 | if (!BaseValid()) { | |
176 | assert(!current_at_base_ && delta_iterator_->Valid()); | |
177 | return; | |
178 | } | |
179 | if (!DeltaValid()) { | |
180 | assert(current_at_base_ && base_iterator_->Valid()); | |
181 | return; | |
182 | } | |
183 | // we don't support those yet | |
184 | assert(delta_iterator_->Entry().type != kMergeRecord && | |
185 | delta_iterator_->Entry().type != kLogDataRecord); | |
186 | int compare = comparator_->Compare(delta_iterator_->Entry().key, | |
187 | base_iterator_->key()); | |
188 | if (forward_) { | |
189 | // current_at_base -> compare < 0 | |
190 | assert(!current_at_base_ || compare < 0); | |
191 | // !current_at_base -> compare <= 0 | |
192 | assert(current_at_base_ && compare >= 0); | |
193 | } else { | |
194 | // current_at_base -> compare > 0 | |
195 | assert(!current_at_base_ || compare > 0); | |
196 | // !current_at_base -> compare <= 0 | |
197 | assert(current_at_base_ && compare <= 0); | |
198 | } | |
199 | // equal_keys_ <=> compare == 0 | |
200 | assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0)); | |
201 | #endif | |
202 | } | |
203 | ||
204 | void Advance() { | |
205 | if (equal_keys_) { | |
206 | assert(BaseValid() && DeltaValid()); | |
207 | AdvanceBase(); | |
208 | AdvanceDelta(); | |
209 | } else { | |
210 | if (current_at_base_) { | |
211 | assert(BaseValid()); | |
212 | AdvanceBase(); | |
213 | } else { | |
214 | assert(DeltaValid()); | |
215 | AdvanceDelta(); | |
216 | } | |
217 | } | |
218 | UpdateCurrent(); | |
219 | } | |
220 | ||
221 | void AdvanceDelta() { | |
222 | if (forward_) { | |
223 | delta_iterator_->Next(); | |
224 | } else { | |
225 | delta_iterator_->Prev(); | |
226 | } | |
227 | } | |
228 | void AdvanceBase() { | |
229 | if (forward_) { | |
230 | base_iterator_->Next(); | |
231 | } else { | |
232 | base_iterator_->Prev(); | |
233 | } | |
234 | } | |
235 | bool BaseValid() const { return base_iterator_->Valid(); } | |
236 | bool DeltaValid() const { return delta_iterator_->Valid(); } | |
237 | void UpdateCurrent() { | |
238 | // Suppress false positive clang analyzer warnings. | |
239 | #ifndef __clang_analyzer__ | |
240 | while (true) { | |
241 | WriteEntry delta_entry; | |
242 | if (DeltaValid()) { | |
243 | delta_entry = delta_iterator_->Entry(); | |
244 | } | |
245 | equal_keys_ = false; | |
246 | if (!BaseValid()) { | |
247 | // Base has finished. | |
248 | if (!DeltaValid()) { | |
249 | // Finished | |
250 | return; | |
251 | } | |
252 | if (delta_entry.type == kDeleteRecord || | |
253 | delta_entry.type == kSingleDeleteRecord) { | |
254 | AdvanceDelta(); | |
255 | } else { | |
256 | current_at_base_ = false; | |
257 | return; | |
258 | } | |
259 | } else if (!DeltaValid()) { | |
260 | // Delta has finished. | |
261 | current_at_base_ = true; | |
262 | return; | |
263 | } else { | |
264 | int compare = | |
265 | (forward_ ? 1 : -1) * | |
266 | comparator_->Compare(delta_entry.key, base_iterator_->key()); | |
267 | if (compare <= 0) { // delta bigger or equal | |
268 | if (compare == 0) { | |
269 | equal_keys_ = true; | |
270 | } | |
271 | if (delta_entry.type != kDeleteRecord && | |
272 | delta_entry.type != kSingleDeleteRecord) { | |
273 | current_at_base_ = false; | |
274 | return; | |
275 | } | |
276 | // Delta is less advanced and is delete. | |
277 | AdvanceDelta(); | |
278 | if (equal_keys_) { | |
279 | AdvanceBase(); | |
280 | } | |
281 | } else { | |
282 | current_at_base_ = true; | |
283 | return; | |
284 | } | |
285 | } | |
286 | } | |
287 | ||
288 | AssertInvariants(); | |
289 | #endif // __clang_analyzer__ | |
290 | } | |
291 | ||
292 | bool forward_; | |
293 | bool current_at_base_; | |
294 | bool equal_keys_; | |
295 | Status status_; | |
296 | std::unique_ptr<Iterator> base_iterator_; | |
297 | std::unique_ptr<WBWIIterator> delta_iterator_; | |
298 | const Comparator* comparator_; // not owned | |
299 | }; | |
300 | ||
// Skip list of pointers to index entries.  WriteBatchEntryComparator groups
// entries by column family id, then by key order within a family (see the
// column_family_id_ + 1 trick in WBWIIteratorImpl::SeekToLast).
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
    WriteBatchEntrySkipList;
303 | ||
// Iterator over the index entries of a single column family inside a
// WriteBatchWithIndex.  Wraps a skip-list iterator over WriteBatchIndexEntry
// pointers (shared by all column families) and decodes the referenced
// records out of the underlying write batch on demand.
class WBWIIteratorImpl : public WBWIIterator {
 public:
  // `skip_list` and `write_batch` are borrowed and must outlive the iterator.
  WBWIIteratorImpl(uint32_t column_family_id,
                   WriteBatchEntrySkipList* skip_list,
                   const ReadableWriteBatch* write_batch)
      : column_family_id_(column_family_id),
        skip_list_iter_(skip_list),
        write_batch_(write_batch) {}

  virtual ~WBWIIteratorImpl() {}

  // Valid only while positioned on an entry belonging to this iterator's
  // column family — the skip list holds all column families interleaved.
  virtual bool Valid() const override {
    if (!skip_list_iter_.Valid()) {
      return false;
    }
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    return (iter_entry != nullptr &&
            iter_entry->column_family == column_family_id_);
  }

  virtual void SeekToFirst() override {
    // kFlagMin is a sentinel that sorts before any real key of this column
    // family, so seeking to it lands on the family's first entry.
    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
                                      column_family_id_, 0, 0);
    skip_list_iter_.Seek(&search_entry);
  }

  virtual void SeekToLast() override {
    // Seek to the minimal sentinel of the NEXT column family, then step back
    // one position to land on this family's last entry.
    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
                                      column_family_id_ + 1, 0, 0);
    skip_list_iter_.Seek(&search_entry);
    if (!skip_list_iter_.Valid()) {
      // No entry of any later column family exists; the overall last entry
      // (if any) is the last entry of this family.
      skip_list_iter_.SeekToLast();
    } else {
      skip_list_iter_.Prev();
    }
  }

  virtual void Seek(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_);
    skip_list_iter_.Seek(&search_entry);
  }

  virtual void SeekForPrev(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_);
    skip_list_iter_.SeekForPrev(&search_entry);
  }

  virtual void Next() override { skip_list_iter_.Next(); }

  virtual void Prev() override { skip_list_iter_.Prev(); }

  // Decode the record the current index entry points at.  Must only be
  // called while Valid().
  virtual WriteEntry Entry() const override {
    WriteEntry ret;
    Slice blob, xid;
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    // this is guaranteed with Valid()
    assert(iter_entry != nullptr &&
           iter_entry->column_family == column_family_id_);
    auto s = write_batch_->GetEntryFromDataOffset(
        iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
    assert(s.ok());
    assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
           ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
           ret.type == kMergeRecord);
    return ret;
  }

  virtual Status status() const override {
    // this is in-memory data structure, so the only way status can be non-ok
    // is through memory corruption
    return Status::OK();
  }

  // Raw access to the current index entry; used by Rep to repoint an entry
  // in overwrite mode.
  const WriteBatchIndexEntry* GetRawEntry() const {
    return skip_list_iter_.key();
  }

 private:
  uint32_t column_family_id_;
  WriteBatchEntrySkipList::Iterator skip_list_iter_;
  const ReadableWriteBatch* write_batch_;  // not owned
};
386 | ||
// Pimpl state for WriteBatchWithIndex: the serialized record data
// (write_batch) plus a skip-list index over it.
struct WriteBatchWithIndex::Rep {
  Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
      size_t max_bytes = 0, bool _overwrite_key = false)
      : write_batch(reserved_bytes, max_bytes),
        comparator(index_comparator, &write_batch),
        skip_list(comparator, &arena),
        overwrite_key(_overwrite_key),
        last_entry_offset(0) {}
  // Serialized records; index entries hold offsets into this batch.
  ReadableWriteBatch write_batch;
  // Orders index entries; per-column-family comparators can be registered.
  WriteBatchEntryComparator comparator;
  // Backing memory for skip-list nodes and index entries.  Declared before
  // skip_list, which is constructed from (and allocates out of) it.
  Arena arena;
  WriteBatchEntrySkipList skip_list;
  // If true, a later write to a key repoints that key's existing index entry
  // instead of adding a second one.
  bool overwrite_key;
  // Offset into write_batch where the most recently appended record starts.
  size_t last_entry_offset;

  // Remember current offset of internal write batch, which is used as
  // the starting offset of the next record.
  void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }

  // In overwrite mode, find the existing entry for the same key and update it
  // to point to the current entry.
  // Return true if the key is found and updated.
  bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
  bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);

  // Add the recent entry to the update.
  // In overwrite mode, if key already exists in the index, update it.
  void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
  void AddOrUpdateIndex(const Slice& key);

  // Allocate an index entry pointing to the last entry in the write batch and
  // put it to skip list.
  void AddNewEntry(uint32_t column_family_id);

  // Clear all updates buffered in this batch.
  void Clear();
  void ClearIndex();

  // Rebuild index by reading all records from the batch.
  // Returns non-ok status on corruption.
  Status ReBuildIndex();
};
429 | ||
430 | bool WriteBatchWithIndex::Rep::UpdateExistingEntry( | |
431 | ColumnFamilyHandle* column_family, const Slice& key) { | |
432 | uint32_t cf_id = GetColumnFamilyID(column_family); | |
433 | return UpdateExistingEntryWithCfId(cf_id, key); | |
434 | } | |
435 | ||
436 | bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( | |
437 | uint32_t column_family_id, const Slice& key) { | |
438 | if (!overwrite_key) { | |
439 | return false; | |
440 | } | |
441 | ||
442 | WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch); | |
443 | iter.Seek(key); | |
444 | if (!iter.Valid()) { | |
445 | return false; | |
446 | } | |
447 | if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) { | |
448 | return false; | |
449 | } | |
450 | WriteBatchIndexEntry* non_const_entry = | |
451 | const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry()); | |
452 | non_const_entry->offset = last_entry_offset; | |
453 | return true; | |
454 | } | |
455 | ||
456 | void WriteBatchWithIndex::Rep::AddOrUpdateIndex( | |
457 | ColumnFamilyHandle* column_family, const Slice& key) { | |
458 | if (!UpdateExistingEntry(column_family, key)) { | |
459 | uint32_t cf_id = GetColumnFamilyID(column_family); | |
460 | const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); | |
461 | if (cf_cmp != nullptr) { | |
462 | comparator.SetComparatorForCF(cf_id, cf_cmp); | |
463 | } | |
464 | AddNewEntry(cf_id); | |
465 | } | |
466 | } | |
467 | ||
468 | void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) { | |
469 | if (!UpdateExistingEntryWithCfId(0, key)) { | |
470 | AddNewEntry(0); | |
471 | } | |
472 | } | |
473 | ||
// Allocate an index entry for the record that starts at last_entry_offset
// and insert it into the skip list.
void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
  const std::string& wb_data = write_batch.Data();
  // View of the serialized record most recently appended to the batch.
  Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
                          wb_data.size() - last_entry_offset);
  // Extract key
  Slice key;
  bool success __attribute__((__unused__)) =
      ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
  assert(success);

  // The index entry records the key as an offset/length into the batch's own
  // buffer (key.data() - wb_data.data()), so it stays meaningful even if the
  // std::string backing the batch reallocates later.  The entry itself lives
  // in the arena, which is freed wholesale by ClearIndex().
  auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
  auto* index_entry =
      new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
                                     key.data() - wb_data.data(), key.size());
  skip_list.Insert(index_entry);
}
490 | ||
// Drop both the buffered record data and the search index over it.
void WriteBatchWithIndex::Rep::Clear() {
  write_batch.Clear();
  ClearIndex();
}
495 | ||
// Destroy and re-create the skip list and its arena in place.  The skip list
// allocates its nodes from the arena, so destruction order matters: tear
// down the skip list first, then the arena, then reconstruct the arena
// before the new skip list that allocates from it.
void WriteBatchWithIndex::Rep::ClearIndex() {
  skip_list.~WriteBatchEntrySkipList();
  arena.~Arena();
  new (&arena) Arena();
  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
  last_entry_offset = 0;
}
503 | ||
// Rebuild the index from scratch by decoding every record in the batch.
// Used after RollbackToSavePoint, since the index itself has no savepoints.
// Returns non-ok status on corruption.
Status WriteBatchWithIndex::Rep::ReBuildIndex() {
  Status s;

  ClearIndex();

  if (write_batch.Count() == 0) {
    // Nothing to re-index
    return s;
  }

  // Skip the fixed write-batch header to reach the first record.
  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

  Slice input(write_batch.Data());
  input.remove_prefix(offset);

  // Loop through all entries in Rep and add each one to the index
  int found = 0;
  while (s.ok() && !input.empty()) {
    Slice key, value, blob, xid;
    uint32_t column_family_id = 0;  // default
    char tag = 0;

    // set offset of current entry for call to AddNewEntry()
    last_entry_offset = input.data() - write_batch.Data().data();

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                 &value, &blob, &xid);
    if (!s.ok()) {
      break;
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        // Keyed records are (re-)indexed; in overwrite mode a later record
        // for the same key just repoints the existing index entry.
        found++;
        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
          AddNewEntry(column_family_id);
        }
        break;
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeEndPrepareXID:
      case kTypeCommitXID:
      case kTypeRollbackXID:
      case kTypeNoop:
        // Keyless / control records carry no key and are not indexed.
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }

  // Count() tracks keyed records; a mismatch means the batch is corrupt.
  if (s.ok() && found != write_batch.Count()) {
    s = Status::Corruption("WriteBatch has wrong count");
  }

  return s;
}
567 | ||
// `default_index_comparator` orders index entries for column families that
// have not registered their own comparator (see Rep::AddOrUpdateIndex).
WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key, size_t max_bytes)
    : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
                  overwrite_key)) {}
573 | ||
// Out-of-line destructor: Rep is an incomplete type in the public header, so
// the member holding it must be destroyed here where Rep is fully defined.
// NOTE(review): this empty body only frees Rep if `rep` is a smart pointer in
// the header; if it is a raw owning Rep*, this leaks — confirm in the header.
WriteBatchWithIndex::~WriteBatchWithIndex() {}
575 | ||
// Expose the underlying batch (e.g. so it can be handed to DB::Write).
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }

// Iterator over this batch's entries for the default column family (id 0).
// Caller owns the returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator() {
  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}

// Iterator over this batch's entries for `column_family`.  Caller owns the
// returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator(
    ColumnFamilyHandle* column_family) {
  return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
                              &(rep->skip_list), &rep->write_batch);
}
587 | ||
// Merged view of `base_iterator` (typically a DB iterator) with this batch's
// updates for `column_family`.  Takes ownership of `base_iterator` (it is
// stored in a unique_ptr inside BaseDeltaIterator).  Only supported in
// overwrite mode — with duplicate entries per key the merged view would be
// ambiguous, hence the assert.
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
    ColumnFamilyHandle* column_family, Iterator* base_iterator) {
  if (rep->overwrite_key == false) {
    assert(false);
    return nullptr;
  }
  return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
                               GetColumnFamilyUserComparator(column_family));
}
597 | ||
// Same as the overload above, but for the default column family.  Takes
// ownership of `base_iterator`; only supported in overwrite mode.
Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
  if (rep->overwrite_key == false) {
    assert(false);
    return nullptr;
  }
  // default column family's comparator
  return new BaseDeltaIterator(base_iterator, NewIterator(),
                               rep->comparator.default_comparator());
}
607 | ||
608 | Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, | |
609 | const Slice& key, const Slice& value) { | |
610 | rep->SetLastEntryOffset(); | |
611 | auto s = rep->write_batch.Put(column_family, key, value); | |
612 | if (s.ok()) { | |
613 | rep->AddOrUpdateIndex(column_family, key); | |
614 | } | |
615 | return s; | |
616 | } | |
617 | ||
618 | Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { | |
619 | rep->SetLastEntryOffset(); | |
620 | auto s = rep->write_batch.Put(key, value); | |
621 | if (s.ok()) { | |
622 | rep->AddOrUpdateIndex(key); | |
623 | } | |
624 | return s; | |
625 | } | |
626 | ||
627 | Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, | |
628 | const Slice& key) { | |
629 | rep->SetLastEntryOffset(); | |
630 | auto s = rep->write_batch.Delete(column_family, key); | |
631 | if (s.ok()) { | |
632 | rep->AddOrUpdateIndex(column_family, key); | |
633 | } | |
634 | return s; | |
635 | } | |
636 | ||
637 | Status WriteBatchWithIndex::Delete(const Slice& key) { | |
638 | rep->SetLastEntryOffset(); | |
639 | auto s = rep->write_batch.Delete(key); | |
640 | if (s.ok()) { | |
641 | rep->AddOrUpdateIndex(key); | |
642 | } | |
643 | return s; | |
644 | } | |
645 | ||
646 | Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, | |
647 | const Slice& key) { | |
648 | rep->SetLastEntryOffset(); | |
649 | auto s = rep->write_batch.SingleDelete(column_family, key); | |
650 | if (s.ok()) { | |
651 | rep->AddOrUpdateIndex(column_family, key); | |
652 | } | |
653 | return s; | |
654 | } | |
655 | ||
656 | Status WriteBatchWithIndex::SingleDelete(const Slice& key) { | |
657 | rep->SetLastEntryOffset(); | |
658 | auto s = rep->write_batch.SingleDelete(key); | |
659 | if (s.ok()) { | |
660 | rep->AddOrUpdateIndex(key); | |
661 | } | |
662 | return s; | |
663 | } | |
664 | ||
665 | Status WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family, | |
666 | const Slice& begin_key, | |
667 | const Slice& end_key) { | |
668 | rep->SetLastEntryOffset(); | |
669 | auto s = rep->write_batch.DeleteRange(column_family, begin_key, end_key); | |
670 | if (s.ok()) { | |
671 | rep->AddOrUpdateIndex(column_family, begin_key); | |
672 | } | |
673 | return s; | |
674 | } | |
675 | ||
676 | Status WriteBatchWithIndex::DeleteRange(const Slice& begin_key, | |
677 | const Slice& end_key) { | |
678 | rep->SetLastEntryOffset(); | |
679 | auto s = rep->write_batch.DeleteRange(begin_key, end_key); | |
680 | if (s.ok()) { | |
681 | rep->AddOrUpdateIndex(begin_key); | |
682 | } | |
683 | return s; | |
684 | } | |
685 | ||
686 | Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, | |
687 | const Slice& key, const Slice& value) { | |
688 | rep->SetLastEntryOffset(); | |
689 | auto s = rep->write_batch.Merge(column_family, key, value); | |
690 | if (s.ok()) { | |
691 | rep->AddOrUpdateIndex(column_family, key); | |
692 | } | |
693 | return s; | |
694 | } | |
695 | ||
696 | Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { | |
697 | rep->SetLastEntryOffset(); | |
698 | auto s = rep->write_batch.Merge(key, value); | |
699 | if (s.ok()) { | |
700 | rep->AddOrUpdateIndex(key); | |
701 | } | |
702 | return s; | |
703 | } | |
704 | ||
// Log data is an opaque blob carried in the batch; it has no key and is
// therefore never indexed.
Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
  return rep->write_batch.PutLogData(blob);
}

// Drop all buffered updates and reset the index.
void WriteBatchWithIndex::Clear() { rep->Clear(); }
710 | ||
711 | Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, | |
712 | const DBOptions& options, | |
713 | const Slice& key, std::string* value) { | |
714 | Status s; | |
715 | MergeContext merge_context; | |
716 | const ImmutableDBOptions immuable_db_options(options); | |
717 | ||
718 | WriteBatchWithIndexInternal::Result result = | |
719 | WriteBatchWithIndexInternal::GetFromBatch( | |
720 | immuable_db_options, this, column_family, key, &merge_context, | |
721 | &rep->comparator, value, rep->overwrite_key, &s); | |
722 | ||
723 | switch (result) { | |
724 | case WriteBatchWithIndexInternal::Result::kFound: | |
725 | case WriteBatchWithIndexInternal::Result::kError: | |
726 | // use returned status | |
727 | break; | |
728 | case WriteBatchWithIndexInternal::Result::kDeleted: | |
729 | case WriteBatchWithIndexInternal::Result::kNotFound: | |
730 | s = Status::NotFound(); | |
731 | break; | |
732 | case WriteBatchWithIndexInternal::Result::kMergeInProgress: | |
733 | s = Status::MergeInProgress(); | |
734 | break; | |
735 | default: | |
736 | assert(false); | |
737 | } | |
738 | ||
739 | return s; | |
740 | } | |
741 | ||
// Convenience overload: look `key` up in the DB's default column family.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              std::string* value) {
  return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                           value);
}
749 | ||
// Look `key` up first in this batch, falling back to the DB, and combine
// pending Merge operands from the batch with the DB's value when needed.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              std::string* value) {
  Status s;
  MergeContext merge_context;
  // NOTE(review): assumes `db` is actually a DBImpl — confirm no caller can
  // pass a different DB implementation through this API.
  const ImmutableDBOptions& immuable_db_options =
      reinterpret_cast<DBImpl*>(db)->immutable_db_options();

  std::string batch_value;
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immuable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, &batch_value, rep->overwrite_key, &s);

  if (result == WriteBatchWithIndexInternal::Result::kFound) {
    // Fully resolved inside the batch; no DB access required.
    value->assign(batch_value.data(), batch_value.size());
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
    return Status::NotFound();
  }
  if (result == WriteBatchWithIndexInternal::Result::kError) {
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
      rep->overwrite_key == true) {
    // Since we've overwritten keys, we do not know what other operations are
    // in this batch for this key, so we cannot do a Merge to compute the
    // result.  Instead, we will simply return MergeInProgress.
    return Status::MergeInProgress();
  }

  assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
         result == WriteBatchWithIndexInternal::Result::kNotFound);

  // Did not find key in batch OR could not resolve Merges.  Try DB.
  s = db->Get(read_options, column_family, key, value);

  if (s.ok() || s.IsNotFound()) {  // DB Get Succeeded
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // Merge result from DB with merges in Batch
      auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
      const MergeOperator* merge_operator =
          cfh->cfd()->ioptions()->merge_operator;
      Statistics* statistics = immuable_db_options.statistics.get();
      Env* env = immuable_db_options.env;
      Logger* logger = immuable_db_options.info_log.get();

      // A NotFound from the DB is fine here: merge with no base value.
      Slice db_slice(*value);
      Slice* merge_data;
      if (s.ok()) {
        merge_data = &db_slice;
      } else {  // Key not present in db (s.IsNotFound())
        merge_data = nullptr;
      }

      if (merge_operator) {
        s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
                                        merge_context.GetOperands(), value,
                                        logger, statistics, env);
      } else {
        s = Status::InvalidArgument("Options::merge_operator must be set");
      }
    }
  }

  return s;
}
820 | ||
// Save points are delegated to the underlying write batch; the index itself
// has no save-point support.
void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }

// Roll the underlying batch back to the last save point, then rebuild the
// entire index from the surviving records — O(number of remaining records).
Status WriteBatchWithIndex::RollbackToSavePoint() {
  Status s = rep->write_batch.RollbackToSavePoint();

  if (s.ok()) {
    s = rep->ReBuildIndex();
  }

  return s;
}

// Cap the underlying batch's size; later appends exceeding it will fail.
void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
  rep->write_batch.SetMaxBytes(max_bytes);
}
836 | ||
837 | } // namespace rocksdb | |
838 | #endif // !ROCKSDB_LITE |