]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "rocksdb/utilities/write_batch_with_index.h" | |
9 | ||
7c673cae FG |
10 | #include <memory> |
11 | ||
12 | #include "db/column_family.h" | |
f67539c2 | 13 | #include "db/db_impl/db_impl.h" |
7c673cae FG |
14 | #include "db/merge_context.h" |
15 | #include "db/merge_helper.h" | |
f67539c2 | 16 | #include "memory/arena.h" |
7c673cae FG |
17 | #include "memtable/skiplist.h" |
18 | #include "options/db_options.h" | |
19 | #include "rocksdb/comparator.h" | |
20 | #include "rocksdb/iterator.h" | |
11fdf7f2 TL |
21 | #include "util/cast_util.h" |
22 | #include "util/string_util.h" | |
7c673cae FG |
23 | #include "utilities/write_batch_with_index/write_batch_with_index_internal.h" |
24 | ||
f67539c2 | 25 | namespace ROCKSDB_NAMESPACE { |
7c673cae | 26 | struct WriteBatchWithIndex::Rep { |
11fdf7f2 | 27 | explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, |
1e59de90 TL |
28 | size_t max_bytes = 0, bool _overwrite_key = false, |
29 | size_t protection_bytes_per_key = 0) | |
30 | : write_batch(reserved_bytes, max_bytes, protection_bytes_per_key, | |
31 | index_comparator ? index_comparator->timestamp_size() : 0), | |
7c673cae FG |
32 | comparator(index_comparator, &write_batch), |
33 | skip_list(comparator, &arena), | |
34 | overwrite_key(_overwrite_key), | |
11fdf7f2 TL |
35 | last_entry_offset(0), |
36 | last_sub_batch_offset(0), | |
37 | sub_batch_cnt(1) {} | |
7c673cae FG |
38 | ReadableWriteBatch write_batch; |
39 | WriteBatchEntryComparator comparator; | |
40 | Arena arena; | |
41 | WriteBatchEntrySkipList skip_list; | |
42 | bool overwrite_key; | |
43 | size_t last_entry_offset; | |
11fdf7f2 TL |
44 | // The starting offset of the last sub-batch. A sub-batch starts right before |
45 | // inserting a key that is a duplicate of a key in the last sub-batch. Zero, | |
46 | // the default, means that no duplicate key is detected so far. | |
47 | size_t last_sub_batch_offset; | |
48 | // Total number of sub-batches in the write batch. Default is 1. | |
49 | size_t sub_batch_cnt; | |
7c673cae FG |
50 | |
51 | // Remember current offset of internal write batch, which is used as | |
52 | // the starting offset of the next record. | |
53 | void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); } | |
54 | ||
55 | // In overwrite mode, find the existing entry for the same key and update it | |
56 | // to point to the current entry. | |
57 | // Return true if the key is found and updated. | |
1e59de90 TL |
58 | bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key, |
59 | WriteType type); | |
60 | bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key, | |
61 | WriteType type); | |
7c673cae FG |
62 | |
63 | // Add the recent entry to the update. | |
64 | // In overwrite mode, if key already exists in the index, update it. | |
1e59de90 TL |
65 | void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key, |
66 | WriteType type); | |
67 | void AddOrUpdateIndex(const Slice& key, WriteType type); | |
7c673cae FG |
68 | |
69 | // Allocate an index entry pointing to the last entry in the write batch and | |
70 | // put it to skip list. | |
71 | void AddNewEntry(uint32_t column_family_id); | |
72 | ||
73 | // Clear all updates buffered in this batch. | |
74 | void Clear(); | |
75 | void ClearIndex(); | |
76 | ||
77 | // Rebuild index by reading all records from the batch. | |
78 | // Returns non-ok status on corruption. | |
79 | Status ReBuildIndex(); | |
80 | }; | |
81 | ||
82 | bool WriteBatchWithIndex::Rep::UpdateExistingEntry( | |
1e59de90 | 83 | ColumnFamilyHandle* column_family, const Slice& key, WriteType type) { |
7c673cae | 84 | uint32_t cf_id = GetColumnFamilyID(column_family); |
1e59de90 | 85 | return UpdateExistingEntryWithCfId(cf_id, key, type); |
7c673cae FG |
86 | } |
87 | ||
88 | bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( | |
1e59de90 | 89 | uint32_t column_family_id, const Slice& key, WriteType type) { |
7c673cae FG |
90 | if (!overwrite_key) { |
91 | return false; | |
92 | } | |
93 | ||
1e59de90 TL |
94 | WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch, |
95 | &comparator); | |
7c673cae FG |
96 | iter.Seek(key); |
97 | if (!iter.Valid()) { | |
98 | return false; | |
1e59de90 | 99 | } else if (!iter.MatchesKey(column_family_id, key)) { |
7c673cae | 100 | return false; |
1e59de90 TL |
101 | } else { |
102 | // Move to the end of this key (NextKey-Prev) | |
103 | iter.NextKey(); // Move to the next key | |
104 | if (iter.Valid()) { | |
105 | iter.Prev(); // Move back one entry | |
106 | } else { | |
107 | iter.SeekToLast(); | |
108 | } | |
7c673cae FG |
109 | } |
110 | WriteBatchIndexEntry* non_const_entry = | |
111 | const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry()); | |
11fdf7f2 TL |
112 | if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) { |
113 | last_sub_batch_offset = last_entry_offset; | |
114 | sub_batch_cnt++; | |
115 | } | |
1e59de90 TL |
116 | if (type == kMergeRecord) { |
117 | return false; | |
118 | } else { | |
119 | non_const_entry->offset = last_entry_offset; | |
120 | return true; | |
121 | } | |
7c673cae FG |
122 | } |
123 | ||
124 | void WriteBatchWithIndex::Rep::AddOrUpdateIndex( | |
1e59de90 TL |
125 | ColumnFamilyHandle* column_family, const Slice& key, WriteType type) { |
126 | if (!UpdateExistingEntry(column_family, key, type)) { | |
7c673cae FG |
127 | uint32_t cf_id = GetColumnFamilyID(column_family); |
128 | const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); | |
129 | if (cf_cmp != nullptr) { | |
130 | comparator.SetComparatorForCF(cf_id, cf_cmp); | |
131 | } | |
132 | AddNewEntry(cf_id); | |
133 | } | |
134 | } | |
135 | ||
1e59de90 TL |
136 | void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key, |
137 | WriteType type) { | |
138 | if (!UpdateExistingEntryWithCfId(0, key, type)) { | |
7c673cae FG |
139 | AddNewEntry(0); |
140 | } | |
141 | } | |
142 | ||
143 | void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { | |
144 | const std::string& wb_data = write_batch.Data(); | |
145 | Slice entry_ptr = Slice(wb_data.data() + last_entry_offset, | |
146 | wb_data.size() - last_entry_offset); | |
147 | // Extract key | |
148 | Slice key; | |
1e59de90 | 149 | bool success = |
7c673cae | 150 | ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0); |
1e59de90 TL |
151 | #ifdef NDEBUG |
152 | (void)success; | |
153 | #endif | |
7c673cae FG |
154 | assert(success); |
155 | ||
1e59de90 TL |
156 | const Comparator* const ucmp = comparator.GetComparator(column_family_id); |
157 | size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0; | |
158 | ||
159 | if (ts_sz > 0) { | |
160 | key.remove_suffix(ts_sz); | |
161 | } | |
162 | ||
11fdf7f2 TL |
163 | auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry)); |
164 | auto* index_entry = | |
165 | new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id, | |
1e59de90 | 166 | key.data() - wb_data.data(), key.size()); |
11fdf7f2 TL |
167 | skip_list.Insert(index_entry); |
168 | } | |
7c673cae | 169 | |
11fdf7f2 TL |
170 | void WriteBatchWithIndex::Rep::Clear() { |
171 | write_batch.Clear(); | |
172 | ClearIndex(); | |
173 | } | |
7c673cae | 174 | |
11fdf7f2 TL |
175 | void WriteBatchWithIndex::Rep::ClearIndex() { |
176 | skip_list.~WriteBatchEntrySkipList(); | |
177 | arena.~Arena(); | |
178 | new (&arena) Arena(); | |
179 | new (&skip_list) WriteBatchEntrySkipList(comparator, &arena); | |
180 | last_entry_offset = 0; | |
181 | last_sub_batch_offset = 0; | |
182 | sub_batch_cnt = 1; | |
183 | } | |
7c673cae | 184 | |
11fdf7f2 TL |
185 | Status WriteBatchWithIndex::Rep::ReBuildIndex() { |
186 | Status s; | |
7c673cae | 187 | |
11fdf7f2 | 188 | ClearIndex(); |
7c673cae | 189 | |
11fdf7f2 TL |
190 | if (write_batch.Count() == 0) { |
191 | // Nothing to re-index | |
192 | return s; | |
193 | } | |
7c673cae | 194 | |
11fdf7f2 | 195 | size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch); |
7c673cae | 196 | |
11fdf7f2 TL |
197 | Slice input(write_batch.Data()); |
198 | input.remove_prefix(offset); | |
7c673cae | 199 | |
11fdf7f2 | 200 | // Loop through all entries in Rep and add each one to the index |
f67539c2 | 201 | uint32_t found = 0; |
11fdf7f2 TL |
202 | while (s.ok() && !input.empty()) { |
203 | Slice key, value, blob, xid; | |
204 | uint32_t column_family_id = 0; // default | |
205 | char tag = 0; | |
7c673cae | 206 | |
11fdf7f2 TL |
207 | // set offset of current entry for call to AddNewEntry() |
208 | last_entry_offset = input.data() - write_batch.Data().data(); | |
7c673cae | 209 | |
1e59de90 TL |
210 | s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, &value, |
211 | &blob, &xid); | |
11fdf7f2 TL |
212 | if (!s.ok()) { |
213 | break; | |
7c673cae FG |
214 | } |
215 | ||
11fdf7f2 TL |
216 | switch (tag) { |
217 | case kTypeColumnFamilyValue: | |
218 | case kTypeValue: | |
1e59de90 TL |
219 | found++; |
220 | if (!UpdateExistingEntryWithCfId(column_family_id, key, kPutRecord)) { | |
221 | AddNewEntry(column_family_id); | |
222 | } | |
223 | break; | |
11fdf7f2 TL |
224 | case kTypeColumnFamilyDeletion: |
225 | case kTypeDeletion: | |
1e59de90 TL |
226 | found++; |
227 | if (!UpdateExistingEntryWithCfId(column_family_id, key, | |
228 | kDeleteRecord)) { | |
229 | AddNewEntry(column_family_id); | |
230 | } | |
231 | break; | |
11fdf7f2 TL |
232 | case kTypeColumnFamilySingleDeletion: |
233 | case kTypeSingleDeletion: | |
1e59de90 TL |
234 | found++; |
235 | if (!UpdateExistingEntryWithCfId(column_family_id, key, | |
236 | kSingleDeleteRecord)) { | |
237 | AddNewEntry(column_family_id); | |
238 | } | |
239 | break; | |
11fdf7f2 TL |
240 | case kTypeColumnFamilyMerge: |
241 | case kTypeMerge: | |
242 | found++; | |
1e59de90 | 243 | if (!UpdateExistingEntryWithCfId(column_family_id, key, kMergeRecord)) { |
11fdf7f2 TL |
244 | AddNewEntry(column_family_id); |
245 | } | |
246 | break; | |
247 | case kTypeLogData: | |
248 | case kTypeBeginPrepareXID: | |
249 | case kTypeBeginPersistedPrepareXID: | |
250 | case kTypeBeginUnprepareXID: | |
251 | case kTypeEndPrepareXID: | |
252 | case kTypeCommitXID: | |
1e59de90 | 253 | case kTypeCommitXIDAndTimestamp: |
11fdf7f2 TL |
254 | case kTypeRollbackXID: |
255 | case kTypeNoop: | |
256 | break; | |
257 | default: | |
1e59de90 TL |
258 | return Status::Corruption( |
259 | "unknown WriteBatch tag in ReBuildIndex", | |
260 | std::to_string(static_cast<unsigned int>(tag))); | |
7c673cae | 261 | } |
11fdf7f2 | 262 | } |
7c673cae | 263 | |
11fdf7f2 TL |
264 | if (s.ok() && found != write_batch.Count()) { |
265 | s = Status::Corruption("WriteBatch has wrong count"); | |
7c673cae FG |
266 | } |
267 | ||
11fdf7f2 TL |
268 | return s; |
269 | } | |
270 | ||
271 | WriteBatchWithIndex::WriteBatchWithIndex( | |
272 | const Comparator* default_index_comparator, size_t reserved_bytes, | |
1e59de90 | 273 | bool overwrite_key, size_t max_bytes, size_t protection_bytes_per_key) |
11fdf7f2 | 274 | : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes, |
1e59de90 | 275 | overwrite_key, protection_bytes_per_key)) {} |
7c673cae | 276 | |
11fdf7f2 | 277 | WriteBatchWithIndex::~WriteBatchWithIndex() {} |
7c673cae | 278 | |
f67539c2 TL |
279 | WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default; |
280 | ||
281 | WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) = | |
282 | default; | |
283 | ||
11fdf7f2 | 284 | WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } |
7c673cae | 285 | |
11fdf7f2 TL |
286 | size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; } |
287 | ||
288 | WBWIIterator* WriteBatchWithIndex::NewIterator() { | |
1e59de90 TL |
289 | return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch, |
290 | &(rep->comparator)); | |
7c673cae FG |
291 | } |
292 | ||
293 | WBWIIterator* WriteBatchWithIndex::NewIterator( | |
294 | ColumnFamilyHandle* column_family) { | |
295 | return new WBWIIteratorImpl(GetColumnFamilyID(column_family), | |
1e59de90 TL |
296 | &(rep->skip_list), &rep->write_batch, |
297 | &(rep->comparator)); | |
7c673cae FG |
298 | } |
299 | ||
300 | Iterator* WriteBatchWithIndex::NewIteratorWithBase( | |
f67539c2 TL |
301 | ColumnFamilyHandle* column_family, Iterator* base_iterator, |
302 | const ReadOptions* read_options) { | |
1e59de90 TL |
303 | auto wbwiii = |
304 | new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list), | |
305 | &rep->write_batch, &rep->comparator); | |
306 | return new BaseDeltaIterator(column_family, base_iterator, wbwiii, | |
f67539c2 TL |
307 | GetColumnFamilyUserComparator(column_family), |
308 | read_options); | |
7c673cae FG |
309 | } |
310 | ||
311 | Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) { | |
7c673cae | 312 | // default column family's comparator |
1e59de90 TL |
313 | auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch, |
314 | &rep->comparator); | |
315 | return new BaseDeltaIterator(nullptr, base_iterator, wbwiii, | |
7c673cae FG |
316 | rep->comparator.default_comparator()); |
317 | } | |
318 | ||
319 | Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, | |
320 | const Slice& key, const Slice& value) { | |
321 | rep->SetLastEntryOffset(); | |
322 | auto s = rep->write_batch.Put(column_family, key, value); | |
323 | if (s.ok()) { | |
1e59de90 | 324 | rep->AddOrUpdateIndex(column_family, key, kPutRecord); |
7c673cae FG |
325 | } |
326 | return s; | |
327 | } | |
328 | ||
329 | Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { | |
330 | rep->SetLastEntryOffset(); | |
331 | auto s = rep->write_batch.Put(key, value); | |
332 | if (s.ok()) { | |
1e59de90 | 333 | rep->AddOrUpdateIndex(key, kPutRecord); |
7c673cae FG |
334 | } |
335 | return s; | |
336 | } | |
337 | ||
1e59de90 TL |
338 | Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, |
339 | const Slice& /*key*/, const Slice& /*ts*/, | |
340 | const Slice& /*value*/) { | |
341 | if (!column_family) { | |
342 | return Status::InvalidArgument("column family handle cannot be nullptr"); | |
343 | } | |
344 | // TODO: support WBWI::Put() with timestamp. | |
345 | return Status::NotSupported(); | |
346 | } | |
347 | ||
7c673cae FG |
348 | Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, |
349 | const Slice& key) { | |
350 | rep->SetLastEntryOffset(); | |
351 | auto s = rep->write_batch.Delete(column_family, key); | |
352 | if (s.ok()) { | |
1e59de90 | 353 | rep->AddOrUpdateIndex(column_family, key, kDeleteRecord); |
7c673cae FG |
354 | } |
355 | return s; | |
356 | } | |
357 | ||
358 | Status WriteBatchWithIndex::Delete(const Slice& key) { | |
359 | rep->SetLastEntryOffset(); | |
360 | auto s = rep->write_batch.Delete(key); | |
361 | if (s.ok()) { | |
1e59de90 | 362 | rep->AddOrUpdateIndex(key, kDeleteRecord); |
7c673cae FG |
363 | } |
364 | return s; | |
365 | } | |
366 | ||
1e59de90 TL |
367 | Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, |
368 | const Slice& /*key*/, const Slice& /*ts*/) { | |
369 | if (!column_family) { | |
370 | return Status::InvalidArgument("column family handle cannot be nullptr"); | |
371 | } | |
372 | // TODO: support WBWI::Delete() with timestamp. | |
373 | return Status::NotSupported(); | |
374 | } | |
375 | ||
7c673cae FG |
376 | Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, |
377 | const Slice& key) { | |
378 | rep->SetLastEntryOffset(); | |
379 | auto s = rep->write_batch.SingleDelete(column_family, key); | |
380 | if (s.ok()) { | |
1e59de90 | 381 | rep->AddOrUpdateIndex(column_family, key, kSingleDeleteRecord); |
7c673cae FG |
382 | } |
383 | return s; | |
384 | } | |
385 | ||
386 | Status WriteBatchWithIndex::SingleDelete(const Slice& key) { | |
387 | rep->SetLastEntryOffset(); | |
388 | auto s = rep->write_batch.SingleDelete(key); | |
389 | if (s.ok()) { | |
1e59de90 | 390 | rep->AddOrUpdateIndex(key, kSingleDeleteRecord); |
7c673cae FG |
391 | } |
392 | return s; | |
393 | } | |
394 | ||
1e59de90 TL |
395 | Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, |
396 | const Slice& /*key*/, | |
397 | const Slice& /*ts*/) { | |
398 | if (!column_family) { | |
399 | return Status::InvalidArgument("column family handle cannot be nullptr"); | |
400 | } | |
401 | // TODO: support WBWI::SingleDelete() with timestamp. | |
402 | return Status::NotSupported(); | |
403 | } | |
404 | ||
7c673cae FG |
405 | Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, |
406 | const Slice& key, const Slice& value) { | |
407 | rep->SetLastEntryOffset(); | |
408 | auto s = rep->write_batch.Merge(column_family, key, value); | |
409 | if (s.ok()) { | |
1e59de90 | 410 | rep->AddOrUpdateIndex(column_family, key, kMergeRecord); |
7c673cae FG |
411 | } |
412 | return s; | |
413 | } | |
414 | ||
415 | Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { | |
416 | rep->SetLastEntryOffset(); | |
417 | auto s = rep->write_batch.Merge(key, value); | |
418 | if (s.ok()) { | |
1e59de90 | 419 | rep->AddOrUpdateIndex(key, kMergeRecord); |
7c673cae FG |
420 | } |
421 | return s; | |
422 | } | |
423 | ||
424 | Status WriteBatchWithIndex::PutLogData(const Slice& blob) { | |
425 | return rep->write_batch.PutLogData(blob); | |
426 | } | |
427 | ||
428 | void WriteBatchWithIndex::Clear() { rep->Clear(); } | |
429 | ||
430 | Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, | |
431 | const DBOptions& options, | |
432 | const Slice& key, std::string* value) { | |
433 | Status s; | |
1e59de90 TL |
434 | WriteBatchWithIndexInternal wbwii(&options, column_family); |
435 | auto result = wbwii.GetFromBatch(this, key, value, &s); | |
7c673cae FG |
436 | |
437 | switch (result) { | |
1e59de90 TL |
438 | case WBWIIteratorImpl::kFound: |
439 | case WBWIIteratorImpl::kError: | |
7c673cae FG |
440 | // use returned status |
441 | break; | |
1e59de90 TL |
442 | case WBWIIteratorImpl::kDeleted: |
443 | case WBWIIteratorImpl::kNotFound: | |
7c673cae FG |
444 | s = Status::NotFound(); |
445 | break; | |
1e59de90 | 446 | case WBWIIteratorImpl::kMergeInProgress: |
7c673cae FG |
447 | s = Status::MergeInProgress(); |
448 | break; | |
449 | default: | |
450 | assert(false); | |
451 | } | |
452 | ||
453 | return s; | |
454 | } | |
455 | ||
456 | Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, | |
457 | const ReadOptions& read_options, | |
458 | const Slice& key, | |
459 | std::string* value) { | |
11fdf7f2 TL |
460 | assert(value != nullptr); |
461 | PinnableSlice pinnable_val(value); | |
462 | assert(!pinnable_val.IsPinned()); | |
463 | auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key, | |
464 | &pinnable_val); | |
465 | if (s.ok() && pinnable_val.IsPinned()) { | |
466 | value->assign(pinnable_val.data(), pinnable_val.size()); | |
467 | } // else value is already assigned | |
468 | return s; | |
469 | } | |
470 | ||
471 | Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, | |
472 | const ReadOptions& read_options, | |
473 | const Slice& key, | |
474 | PinnableSlice* pinnable_val) { | |
7c673cae | 475 | return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key, |
11fdf7f2 | 476 | pinnable_val); |
7c673cae FG |
477 | } |
478 | ||
479 | Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, | |
480 | const ReadOptions& read_options, | |
481 | ColumnFamilyHandle* column_family, | |
482 | const Slice& key, | |
483 | std::string* value) { | |
11fdf7f2 TL |
484 | assert(value != nullptr); |
485 | PinnableSlice pinnable_val(value); | |
486 | assert(!pinnable_val.IsPinned()); | |
487 | auto s = | |
488 | GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val); | |
489 | if (s.ok() && pinnable_val.IsPinned()) { | |
490 | value->assign(pinnable_val.data(), pinnable_val.size()); | |
491 | } // else value is already assigned | |
492 | return s; | |
493 | } | |
494 | ||
495 | Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, | |
496 | const ReadOptions& read_options, | |
497 | ColumnFamilyHandle* column_family, | |
498 | const Slice& key, | |
499 | PinnableSlice* pinnable_val) { | |
500 | return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val, | |
501 | nullptr); | |
502 | } | |
503 | ||
504 | Status WriteBatchWithIndex::GetFromBatchAndDB( | |
505 | DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, | |
506 | const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) { | |
1e59de90 TL |
507 | const Comparator* const ucmp = rep->comparator.GetComparator(column_family); |
508 | size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0; | |
509 | if (ts_sz > 0 && !read_options.timestamp) { | |
510 | return Status::InvalidArgument("Must specify timestamp"); | |
511 | } | |
512 | ||
7c673cae | 513 | Status s; |
1e59de90 | 514 | WriteBatchWithIndexInternal wbwii(db, column_family); |
7c673cae | 515 | |
11fdf7f2 TL |
516 | // Since the lifetime of the WriteBatch is the same as that of the transaction |
517 | // we cannot pin it as otherwise the returned value will not be available | |
518 | // after the transaction finishes. | |
519 | std::string& batch_value = *pinnable_val->GetSelf(); | |
1e59de90 | 520 | auto result = wbwii.GetFromBatch(this, key, &batch_value, &s); |
7c673cae | 521 | |
1e59de90 | 522 | if (result == WBWIIteratorImpl::kFound) { |
11fdf7f2 | 523 | pinnable_val->PinSelf(); |
7c673cae | 524 | return s; |
1e59de90 | 525 | } else if (!s.ok() || result == WBWIIteratorImpl::kError) { |
7c673cae | 526 | return s; |
1e59de90 TL |
527 | } else if (result == WBWIIteratorImpl::kDeleted) { |
528 | return Status::NotFound(); | |
7c673cae | 529 | } |
1e59de90 TL |
530 | assert(result == WBWIIteratorImpl::kMergeInProgress || |
531 | result == WBWIIteratorImpl::kNotFound); | |
7c673cae FG |
532 | |
533 | // Did not find key in batch OR could not resolve Merges. Try DB. | |
11fdf7f2 TL |
534 | if (!callback) { |
535 | s = db->Get(read_options, column_family, key, pinnable_val); | |
536 | } else { | |
f67539c2 TL |
537 | DBImpl::GetImplOptions get_impl_options; |
538 | get_impl_options.column_family = column_family; | |
539 | get_impl_options.value = pinnable_val; | |
540 | get_impl_options.callback = callback; | |
20effc67 | 541 | s = static_cast_with_check<DBImpl>(db->GetRootDB()) |
f67539c2 | 542 | ->GetImpl(read_options, key, get_impl_options); |
11fdf7f2 | 543 | } |
7c673cae FG |
544 | |
545 | if (s.ok() || s.IsNotFound()) { // DB Get Succeeded | |
1e59de90 | 546 | if (result == WBWIIteratorImpl::kMergeInProgress) { |
7c673cae | 547 | // Merge result from DB with merges in Batch |
1e59de90 | 548 | std::string merge_result; |
7c673cae | 549 | if (s.ok()) { |
1e59de90 | 550 | s = wbwii.MergeKey(key, pinnable_val, &merge_result); |
7c673cae | 551 | } else { // Key not present in db (s.IsNotFound()) |
1e59de90 | 552 | s = wbwii.MergeKey(key, nullptr, &merge_result); |
7c673cae | 553 | } |
1e59de90 | 554 | if (s.ok()) { |
f67539c2 TL |
555 | pinnable_val->Reset(); |
556 | *pinnable_val->GetSelf() = std::move(merge_result); | |
11fdf7f2 | 557 | pinnable_val->PinSelf(); |
7c673cae FG |
558 | } |
559 | } | |
560 | } | |
561 | ||
562 | return s; | |
563 | } | |
564 | ||
f67539c2 TL |
565 | void WriteBatchWithIndex::MultiGetFromBatchAndDB( |
566 | DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, | |
567 | const size_t num_keys, const Slice* keys, PinnableSlice* values, | |
568 | Status* statuses, bool sorted_input) { | |
569 | MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys, | |
570 | values, statuses, sorted_input, nullptr); | |
571 | } | |
572 | ||
573 | void WriteBatchWithIndex::MultiGetFromBatchAndDB( | |
574 | DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, | |
575 | const size_t num_keys, const Slice* keys, PinnableSlice* values, | |
576 | Status* statuses, bool sorted_input, ReadCallback* callback) { | |
1e59de90 TL |
577 | const Comparator* const ucmp = rep->comparator.GetComparator(column_family); |
578 | size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0; | |
579 | if (ts_sz > 0 && !read_options.timestamp) { | |
580 | for (size_t i = 0; i < num_keys; ++i) { | |
581 | statuses[i] = Status::InvalidArgument("Must specify timestamp"); | |
582 | } | |
583 | return; | |
584 | } | |
585 | ||
586 | WriteBatchWithIndexInternal wbwii(db, column_family); | |
f67539c2 TL |
587 | |
588 | autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context; | |
589 | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys; | |
590 | // To hold merges from the write batch | |
1e59de90 | 591 | autovector<std::pair<WBWIIteratorImpl::Result, MergeContext>, |
f67539c2 TL |
592 | MultiGetContext::MAX_BATCH_SIZE> |
593 | merges; | |
594 | // Since the lifetime of the WriteBatch is the same as that of the transaction | |
595 | // we cannot pin it as otherwise the returned value will not be available | |
596 | // after the transaction finishes. | |
597 | for (size_t i = 0; i < num_keys; ++i) { | |
598 | MergeContext merge_context; | |
1e59de90 | 599 | std::string batch_value; |
f67539c2 | 600 | Status* s = &statuses[i]; |
1e59de90 TL |
601 | PinnableSlice* pinnable_val = &values[i]; |
602 | pinnable_val->Reset(); | |
603 | auto result = | |
604 | wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s); | |
f67539c2 | 605 | |
1e59de90 TL |
606 | if (result == WBWIIteratorImpl::kFound) { |
607 | *pinnable_val->GetSelf() = std::move(batch_value); | |
f67539c2 TL |
608 | pinnable_val->PinSelf(); |
609 | continue; | |
610 | } | |
1e59de90 | 611 | if (result == WBWIIteratorImpl::kDeleted) { |
f67539c2 TL |
612 | *s = Status::NotFound(); |
613 | continue; | |
614 | } | |
1e59de90 | 615 | if (result == WBWIIteratorImpl::kError) { |
f67539c2 TL |
616 | continue; |
617 | } | |
1e59de90 TL |
618 | assert(result == WBWIIteratorImpl::kMergeInProgress || |
619 | result == WBWIIteratorImpl::kNotFound); | |
20effc67 TL |
620 | key_context.emplace_back(column_family, keys[i], &values[i], |
621 | /*timestamp*/ nullptr, &statuses[i]); | |
f67539c2 TL |
622 | merges.emplace_back(result, std::move(merge_context)); |
623 | } | |
624 | ||
625 | for (KeyContext& key : key_context) { | |
626 | sorted_keys.emplace_back(&key); | |
627 | } | |
628 | ||
629 | // Did not find key in batch OR could not resolve Merges. Try DB. | |
20effc67 | 630 | static_cast_with_check<DBImpl>(db->GetRootDB()) |
f67539c2 | 631 | ->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys); |
20effc67 | 632 | static_cast_with_check<DBImpl>(db->GetRootDB()) |
f67539c2 TL |
633 | ->MultiGetWithCallback(read_options, column_family, callback, |
634 | &sorted_keys); | |
635 | ||
f67539c2 TL |
636 | for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) { |
637 | KeyContext& key = *iter; | |
638 | if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded | |
639 | size_t index = iter - key_context.begin(); | |
1e59de90 TL |
640 | std::pair<WBWIIteratorImpl::Result, MergeContext>& merge_result = |
641 | merges[index]; | |
642 | if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) { | |
643 | std::string merged_value; | |
f67539c2 | 644 | // Merge result from DB with merges in Batch |
f67539c2 | 645 | if (key.s->ok()) { |
1e59de90 TL |
646 | *key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second, |
647 | &merged_value); | |
f67539c2 | 648 | } else { // Key not present in db (s.IsNotFound()) |
1e59de90 TL |
649 | *key.s = wbwii.MergeKey(*key.key, nullptr, merge_result.second, |
650 | &merged_value); | |
f67539c2 | 651 | } |
1e59de90 TL |
652 | if (key.s->ok()) { |
653 | key.value->Reset(); | |
654 | *key.value->GetSelf() = std::move(merged_value); | |
f67539c2 | 655 | key.value->PinSelf(); |
f67539c2 TL |
656 | } |
657 | } | |
658 | } | |
659 | } | |
660 | } | |
661 | ||
7c673cae FG |
662 | void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); } |
663 | ||
664 | Status WriteBatchWithIndex::RollbackToSavePoint() { | |
665 | Status s = rep->write_batch.RollbackToSavePoint(); | |
666 | ||
667 | if (s.ok()) { | |
11fdf7f2 TL |
668 | rep->sub_batch_cnt = 1; |
669 | rep->last_sub_batch_offset = 0; | |
7c673cae FG |
670 | s = rep->ReBuildIndex(); |
671 | } | |
672 | ||
673 | return s; | |
674 | } | |
675 | ||
11fdf7f2 TL |
676 | Status WriteBatchWithIndex::PopSavePoint() { |
677 | return rep->write_batch.PopSavePoint(); | |
678 | } | |
679 | ||
7c673cae FG |
680 | void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) { |
681 | rep->write_batch.SetMaxBytes(max_bytes); | |
682 | } | |
683 | ||
11fdf7f2 TL |
684 | size_t WriteBatchWithIndex::GetDataSize() const { |
685 | return rep->write_batch.GetDataSize(); | |
686 | } | |
687 | ||
1e59de90 TL |
688 | const Comparator* WriteBatchWithIndexInternal::GetUserComparator( |
689 | const WriteBatchWithIndex& wbwi, uint32_t cf_id) { | |
690 | const WriteBatchEntryComparator& ucmps = wbwi.rep->comparator; | |
691 | return ucmps.GetComparator(cf_id); | |
692 | } | |
693 | ||
f67539c2 | 694 | } // namespace ROCKSDB_NAMESPACE |
7c673cae | 695 | #endif // !ROCKSDB_LITE |