]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "utilities/transactions/write_prepared_txn.h" | |
9 | ||
f67539c2 | 10 | #include <cinttypes> |
11fdf7f2 TL |
11 | #include <map> |
12 | #include <set> | |
13 | ||
14 | #include "db/column_family.h" | |
f67539c2 | 15 | #include "db/db_impl/db_impl.h" |
11fdf7f2 TL |
16 | #include "rocksdb/db.h" |
17 | #include "rocksdb/status.h" | |
18 | #include "rocksdb/utilities/transaction_db.h" | |
19 | #include "util/cast_util.h" | |
20 | #include "utilities/transactions/pessimistic_transaction.h" | |
21 | #include "utilities/transactions/write_prepared_txn_db.h" | |
22 | ||
f67539c2 | 23 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
24 | |
25 | struct WriteOptions; | |
26 | ||
27 | WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db, | |
28 | const WriteOptions& write_options, | |
29 | const TransactionOptions& txn_options) | |
494da23a TL |
30 | : PessimisticTransaction(txn_db, write_options, txn_options, false), |
31 | wpt_db_(txn_db) { | |
32 | // Call Initialize outside PessimisticTransaction constructor otherwise it | |
33 | // would skip overridden functions in WritePreparedTxn since they are not | |
34 | // defined yet in the constructor of PessimisticTransaction | |
35 | Initialize(txn_options); | |
36 | } | |
11fdf7f2 TL |
37 | |
38 | void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) { | |
39 | PessimisticTransaction::Initialize(txn_options); | |
40 | prepare_batch_cnt_ = 0; | |
41 | } | |
42 | ||
f67539c2 TL |
43 | void WritePreparedTxn::MultiGet(const ReadOptions& options, |
44 | ColumnFamilyHandle* column_family, | |
45 | const size_t num_keys, const Slice* keys, | |
46 | PinnableSlice* values, Status* statuses, | |
47 | const bool sorted_input) { | |
48 | SequenceNumber min_uncommitted, snap_seq; | |
49 | const SnapshotBackup backed_by_snapshot = | |
50 | wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); | |
51 | WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted, | |
52 | backed_by_snapshot); | |
53 | write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys, | |
54 | keys, values, statuses, sorted_input, | |
55 | &callback); | |
56 | if (UNLIKELY(!callback.valid() || | |
57 | !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { | |
58 | wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); | |
59 | for (size_t i = 0; i < num_keys; i++) { | |
60 | statuses[i] = Status::TryAgain(); | |
61 | } | |
62 | } | |
63 | } | |
64 | ||
65 | Status WritePreparedTxn::Get(const ReadOptions& options, | |
11fdf7f2 TL |
66 | ColumnFamilyHandle* column_family, |
67 | const Slice& key, PinnableSlice* pinnable_val) { | |
f67539c2 TL |
68 | SequenceNumber min_uncommitted, snap_seq; |
69 | const SnapshotBackup backed_by_snapshot = | |
70 | wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); | |
71 | WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted, | |
72 | backed_by_snapshot); | |
73 | auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key, | |
74 | pinnable_val, &callback); | |
75 | if (LIKELY(callback.valid() && | |
76 | wpt_db_->ValidateSnapshot(callback.max_visible_seq(), | |
77 | backed_by_snapshot))) { | |
78 | return res; | |
79 | } else { | |
80 | wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); | |
81 | return Status::TryAgain(); | |
11fdf7f2 | 82 | } |
11fdf7f2 TL |
83 | } |
84 | ||
85 | Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) { | |
86 | // Make sure to get iterator from WritePrepareTxnDB, not the root db. | |
87 | Iterator* db_iter = wpt_db_->NewIterator(options); | |
88 | assert(db_iter); | |
89 | ||
90 | return write_batch_.NewIteratorWithBase(db_iter); | |
91 | } | |
92 | ||
93 | Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, | |
94 | ColumnFamilyHandle* column_family) { | |
95 | // Make sure to get iterator from WritePrepareTxnDB, not the root db. | |
96 | Iterator* db_iter = wpt_db_->NewIterator(options, column_family); | |
97 | assert(db_iter); | |
98 | ||
99 | return write_batch_.NewIteratorWithBase(column_family, db_iter); | |
100 | } | |
101 | ||
102 | Status WritePreparedTxn::PrepareInternal() { | |
103 | WriteOptions write_options = write_options_; | |
104 | write_options.disableWAL = false; | |
105 | const bool WRITE_AFTER_COMMIT = true; | |
494da23a | 106 | const bool kFirstPrepareBatch = true; |
11fdf7f2 TL |
107 | WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, |
108 | !WRITE_AFTER_COMMIT); | |
109 | // For each duplicate key we account for a new sub-batch | |
110 | prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); | |
494da23a TL |
111 | // Having AddPrepared in the PreReleaseCallback allows in-order addition of |
112 | // prepared entries to PreparedHeap and hence enables an optimization. Refer to | |
11fdf7f2 TL |
113 | // SmallestUnCommittedSeq for more details. |
114 | AddPreparedCallback add_prepared_callback( | |
494da23a TL |
115 | wpt_db_, db_impl_, prepare_batch_cnt_, |
116 | db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch); | |
11fdf7f2 TL |
117 | const bool DISABLE_MEMTABLE = true; |
118 | uint64_t seq_used = kMaxSequenceNumber; | |
119 | Status s = db_impl_->WriteImpl( | |
120 | write_options, GetWriteBatch()->GetWriteBatch(), | |
121 | /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE, | |
122 | &seq_used, prepare_batch_cnt_, &add_prepared_callback); | |
123 | assert(!s.ok() || seq_used != kMaxSequenceNumber); | |
124 | auto prepare_seq = seq_used; | |
125 | SetId(prepare_seq); | |
126 | return s; | |
127 | } | |
128 | ||
129 | Status WritePreparedTxn::CommitWithoutPrepareInternal() { | |
130 | // For each duplicate key we account for a new sub-batch | |
131 | const size_t batch_cnt = GetWriteBatch()->SubBatchCnt(); | |
132 | return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt); | |
133 | } | |
134 | ||
135 | Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch, | |
136 | size_t batch_cnt) { | |
137 | return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this); | |
138 | } | |
139 | ||
140 | Status WritePreparedTxn::CommitInternal() { | |
141 | ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, | |
142 | "CommitInternal prepare_seq: %" PRIu64, GetID()); | |
143 | // We take the commit-time batch and append the Commit marker. | |
144 | // The Memtable will ignore the Commit marker in non-recovery mode | |
145 | WriteBatch* working_batch = GetCommitTimeWriteBatch(); | |
146 | const bool empty = working_batch->Count() == 0; | |
147 | WriteBatchInternal::MarkCommit(working_batch, name_); | |
148 | ||
149 | const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_; | |
150 | if (!empty && for_recovery) { | |
151 | // When not writing to memtable, we can still cache the latest write batch. | |
152 | // The cached batch will be written to memtable in WriteRecoverableState | |
153 | // during FlushMemTable | |
154 | WriteBatchInternal::SetAsLastestPersistentState(working_batch); | |
155 | } | |
156 | ||
157 | auto prepare_seq = GetId(); | |
158 | const bool includes_data = !empty && !for_recovery; | |
159 | assert(prepare_batch_cnt_); | |
160 | size_t commit_batch_cnt = 0; | |
161 | if (UNLIKELY(includes_data)) { | |
162 | ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, | |
163 | "Duplicate key overhead"); | |
164 | SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); | |
165 | auto s = working_batch->Iterate(&counter); | |
166 | assert(s.ok()); | |
167 | commit_batch_cnt = counter.BatchCount(); | |
168 | } | |
169 | const bool disable_memtable = !includes_data; | |
170 | const bool do_one_write = | |
171 | !db_impl_->immutable_db_options().two_write_queues || disable_memtable; | |
11fdf7f2 | 172 | WritePreparedCommitEntryPreReleaseCallback update_commit_map( |
494da23a TL |
173 | wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt); |
174 | // This is to call AddPrepared on CommitTimeWriteBatch | |
175 | const bool kFirstPrepareBatch = true; | |
176 | AddPreparedCallback add_prepared_callback( | |
177 | wpt_db_, db_impl_, commit_batch_cnt, | |
178 | db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch); | |
179 | PreReleaseCallback* pre_release_callback; | |
180 | if (do_one_write) { | |
181 | pre_release_callback = &update_commit_map; | |
182 | } else { | |
183 | pre_release_callback = &add_prepared_callback; | |
184 | } | |
11fdf7f2 TL |
185 | uint64_t seq_used = kMaxSequenceNumber; |
186 | // Since the prepared batch is directly written to memtable, there is already | |
187 | // a connection between the memtable and its WAL, so there is no need to | |
188 | // redundantly reference the log that contains the prepared data. | |
189 | const uint64_t zero_log_number = 0ull; | |
190 | size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1; | |
191 | auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, | |
192 | zero_log_number, disable_memtable, &seq_used, | |
494da23a | 193 | batch_cnt, pre_release_callback); |
11fdf7f2 | 194 | assert(!s.ok() || seq_used != kMaxSequenceNumber); |
494da23a | 195 | const SequenceNumber commit_batch_seq = seq_used; |
11fdf7f2 | 196 | if (LIKELY(do_one_write || !s.ok())) { |
f67539c2 TL |
197 | if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues && |
198 | s.ok())) { | |
199 | // Note: RemovePrepared should be called after WriteImpl that publishsed | |
11fdf7f2 TL |
200 | // the seq. Otherwise SmallestUnCommittedSeq optimization breaks. |
201 | wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_); | |
f67539c2 | 202 | } // else RemovePrepared is called from within PreReleaseCallback |
494da23a | 203 | if (UNLIKELY(!do_one_write)) { |
f67539c2 TL |
204 | assert(!s.ok()); |
205 | // Cleanup the prepared entry we added with add_prepared_callback | |
494da23a TL |
206 | wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt); |
207 | } | |
11fdf7f2 TL |
208 | return s; |
209 | } // else do the 2nd write to publish seq | |
210 | // Note: the 2nd write comes with a performance penality. So if we have too | |
211 | // many of commits accompanied with ComitTimeWriteBatch and yet we cannot | |
212 | // enable use_only_the_last_commit_time_batch_for_recovery_ optimization, | |
213 | // two_write_queues should be disabled to avoid many additional writes here. | |
494da23a TL |
214 | const size_t kZeroData = 0; |
215 | // Update commit map only from the 2nd queue | |
216 | WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch( | |
217 | wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData, | |
218 | commit_batch_seq, commit_batch_cnt); | |
11fdf7f2 TL |
219 | WriteBatch empty_batch; |
220 | empty_batch.PutLogData(Slice()); | |
221 | // In the absence of Prepare markers, use Noop as a batch separator | |
222 | WriteBatchInternal::InsertNoop(&empty_batch); | |
223 | const bool DISABLE_MEMTABLE = true; | |
224 | const size_t ONE_BATCH = 1; | |
225 | const uint64_t NO_REF_LOG = 0; | |
226 | s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, | |
227 | NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, | |
494da23a | 228 | &update_commit_map_with_aux_batch); |
11fdf7f2 | 229 | assert(!s.ok() || seq_used != kMaxSequenceNumber); |
f67539c2 TL |
230 | if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) { |
231 | if (s.ok()) { | |
232 | // Note: RemovePrepared should be called after WriteImpl that publishsed | |
233 | // the seq. Otherwise SmallestUnCommittedSeq optimization breaks. | |
234 | wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_); | |
235 | } | |
236 | wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt); | |
237 | } // else RemovePrepared is called from within PreReleaseCallback | |
11fdf7f2 TL |
238 | return s; |
239 | } | |
240 | ||
241 | Status WritePreparedTxn::RollbackInternal() { | |
242 | ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, | |
243 | "RollbackInternal prepare_seq: %" PRIu64, GetId()); | |
244 | WriteBatch rollback_batch; | |
245 | assert(GetId() != kMaxSequenceNumber); | |
246 | assert(GetId() > 0); | |
247 | auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap(); | |
248 | auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap(); | |
494da23a | 249 | auto read_at_seq = kMaxSequenceNumber; |
f67539c2 TL |
250 | ReadOptions roptions; |
251 | // to prevent callback's seq to be overrriden inside DBImpk::Get | |
252 | roptions.snapshot = wpt_db_->GetMaxSnapshot(); | |
11fdf7f2 TL |
253 | struct RollbackWriteBatchBuilder : public WriteBatch::Handler { |
254 | DBImpl* db_; | |
11fdf7f2 TL |
255 | WritePreparedTxnReadCallback callback; |
256 | WriteBatch* rollback_batch_; | |
257 | std::map<uint32_t, const Comparator*>& comparators_; | |
258 | std::map<uint32_t, ColumnFamilyHandle*>& handles_; | |
259 | using CFKeys = std::set<Slice, SetComparator>; | |
260 | std::map<uint32_t, CFKeys> keys_; | |
261 | bool rollback_merge_operands_; | |
f67539c2 | 262 | ReadOptions roptions_; |
11fdf7f2 TL |
263 | RollbackWriteBatchBuilder( |
264 | DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq, | |
265 | WriteBatch* dst_batch, | |
266 | std::map<uint32_t, const Comparator*>& comparators, | |
267 | std::map<uint32_t, ColumnFamilyHandle*>& handles, | |
f67539c2 | 268 | bool rollback_merge_operands, ReadOptions _roptions) |
11fdf7f2 | 269 | : db_(db), |
494da23a | 270 | callback(wpt_db, snap_seq), // disable min_uncommitted optimization |
11fdf7f2 TL |
271 | rollback_batch_(dst_batch), |
272 | comparators_(comparators), | |
273 | handles_(handles), | |
f67539c2 TL |
274 | rollback_merge_operands_(rollback_merge_operands), |
275 | roptions_(_roptions) {} | |
11fdf7f2 TL |
276 | |
277 | Status Rollback(uint32_t cf, const Slice& key) { | |
278 | Status s; | |
279 | CFKeys& cf_keys = keys_[cf]; | |
280 | if (cf_keys.size() == 0) { // just inserted | |
281 | auto cmp = comparators_[cf]; | |
282 | keys_[cf] = CFKeys(SetComparator(cmp)); | |
283 | } | |
284 | auto it = cf_keys.insert(key); | |
285 | if (it.second == | |
286 | false) { // second is false if a element already existed. | |
287 | return s; | |
288 | } | |
289 | ||
290 | PinnableSlice pinnable_val; | |
291 | bool not_used; | |
292 | auto cf_handle = handles_[cf]; | |
f67539c2 TL |
293 | DBImpl::GetImplOptions get_impl_options; |
294 | get_impl_options.column_family = cf_handle; | |
295 | get_impl_options.value = &pinnable_val; | |
296 | get_impl_options.value_found = ¬_used; | |
297 | get_impl_options.callback = &callback; | |
298 | s = db_->GetImpl(roptions_, key, get_impl_options); | |
11fdf7f2 TL |
299 | assert(s.ok() || s.IsNotFound()); |
300 | if (s.ok()) { | |
301 | s = rollback_batch_->Put(cf_handle, key, pinnable_val); | |
302 | assert(s.ok()); | |
303 | } else if (s.IsNotFound()) { | |
304 | // There has been no readable value before txn. By adding a delete we | |
305 | // make sure that there will be none afterwards either. | |
306 | s = rollback_batch_->Delete(cf_handle, key); | |
307 | assert(s.ok()); | |
308 | } else { | |
309 | // Unexpected status. Return it to the user. | |
310 | } | |
311 | return s; | |
312 | } | |
313 | ||
314 | Status PutCF(uint32_t cf, const Slice& key, const Slice& /*val*/) override { | |
315 | return Rollback(cf, key); | |
316 | } | |
317 | ||
318 | Status DeleteCF(uint32_t cf, const Slice& key) override { | |
319 | return Rollback(cf, key); | |
320 | } | |
321 | ||
322 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { | |
323 | return Rollback(cf, key); | |
324 | } | |
325 | ||
326 | Status MergeCF(uint32_t cf, const Slice& key, | |
327 | const Slice& /*val*/) override { | |
328 | if (rollback_merge_operands_) { | |
329 | return Rollback(cf, key); | |
330 | } else { | |
331 | return Status::OK(); | |
332 | } | |
333 | } | |
334 | ||
335 | Status MarkNoop(bool) override { return Status::OK(); } | |
336 | Status MarkBeginPrepare(bool) override { return Status::OK(); } | |
337 | Status MarkEndPrepare(const Slice&) override { return Status::OK(); } | |
338 | Status MarkCommit(const Slice&) override { return Status::OK(); } | |
339 | Status MarkRollback(const Slice&) override { | |
340 | return Status::InvalidArgument(); | |
341 | } | |
342 | ||
343 | protected: | |
494da23a TL |
344 | bool WriteAfterCommit() const override { return false; } |
345 | } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch, | |
11fdf7f2 | 346 | *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(), |
f67539c2 TL |
347 | wpt_db_->txn_db_options_.rollback_merge_operands, |
348 | roptions); | |
11fdf7f2 TL |
349 | auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler); |
350 | assert(s.ok()); | |
351 | if (!s.ok()) { | |
352 | return s; | |
353 | } | |
354 | // The Rollback marker will be used as a batch separator | |
355 | WriteBatchInternal::MarkRollback(&rollback_batch, name_); | |
356 | bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; | |
357 | const bool DISABLE_MEMTABLE = true; | |
358 | const uint64_t NO_REF_LOG = 0; | |
359 | uint64_t seq_used = kMaxSequenceNumber; | |
360 | const size_t ONE_BATCH = 1; | |
494da23a TL |
361 | const bool kFirstPrepareBatch = true; |
362 | // We commit the rolled back prepared batches. Although this is | |
11fdf7f2 TL |
363 | // counter-intuitive, i) it is safe to do so, since the prepared batches are |
364 | // already canceled out by the rollback batch, ii) adding the commit entry to | |
365 | // CommitCache will allow us to benefit from the existing mechanism in | |
366 | // CommitCache that keeps an entry evicted due to max advance and yet overlaps | |
367 | // with a live snapshot around so that the live snapshot properly skips the | |
368 | // entry even if its prepare seq is lower than max_evicted_seq_. | |
494da23a TL |
369 | AddPreparedCallback add_prepared_callback( |
370 | wpt_db_, db_impl_, ONE_BATCH, | |
371 | db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch); | |
11fdf7f2 TL |
372 | WritePreparedCommitEntryPreReleaseCallback update_commit_map( |
373 | wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH); | |
494da23a TL |
374 | PreReleaseCallback* pre_release_callback; |
375 | if (do_one_write) { | |
376 | pre_release_callback = &update_commit_map; | |
377 | } else { | |
378 | pre_release_callback = &add_prepared_callback; | |
379 | } | |
11fdf7f2 TL |
380 | // Note: the rollback batch does not need AddPrepared since it is written to |
381 | // DB in one shot. min_uncommitted still works since it requires capturing | |
382 | // data that is written to DB but not yet committed, while | |
494da23a | 383 | // the rollback batch commits with PreReleaseCallback. |
11fdf7f2 TL |
384 | s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, |
385 | NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH, | |
494da23a | 386 | pre_release_callback); |
11fdf7f2 TL |
387 | assert(!s.ok() || seq_used != kMaxSequenceNumber); |
388 | if (!s.ok()) { | |
389 | return s; | |
390 | } | |
391 | if (do_one_write) { | |
f67539c2 | 392 | assert(!db_impl_->immutable_db_options().two_write_queues); |
11fdf7f2 TL |
393 | wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_); |
394 | return s; | |
395 | } // else do the 2nd write for commit | |
494da23a | 396 | uint64_t rollback_seq = seq_used; |
11fdf7f2 | 397 | ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, |
494da23a TL |
398 | "RollbackInternal 2nd write rollback_seq: %" PRIu64, |
399 | rollback_seq); | |
11fdf7f2 TL |
400 | // Commit the batch by writing an empty batch to the queue that will release |
401 | // the commit sequence number to readers. | |
494da23a TL |
402 | WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare( |
403 | wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_); | |
11fdf7f2 TL |
404 | WriteBatch empty_batch; |
405 | empty_batch.PutLogData(Slice()); | |
406 | // In the absence of Prepare markers, use Noop as a batch separator | |
407 | WriteBatchInternal::InsertNoop(&empty_batch); | |
408 | s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, | |
409 | NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, | |
410 | &update_commit_map_with_prepare); | |
411 | assert(!s.ok() || seq_used != kMaxSequenceNumber); | |
494da23a TL |
412 | ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, |
413 | "RollbackInternal (status=%s) commit: %" PRIu64, | |
414 | s.ToString().c_str(), GetId()); | |
f67539c2 TL |
415 | // TODO(lth): For WriteUnPrepared that rollback is called frequently, |
416 | // RemovePrepared could be moved to the callback to reduce lock contention. | |
11fdf7f2 | 417 | if (s.ok()) { |
11fdf7f2 TL |
418 | wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_); |
419 | } | |
f67539c2 TL |
420 | // Note: RemovePrepared for prepared batch is called from within |
421 | // PreReleaseCallback | |
494da23a | 422 | wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH); |
11fdf7f2 TL |
423 | |
424 | return s; | |
425 | } | |
426 | ||
427 | Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, | |
428 | const Slice& key, | |
429 | SequenceNumber* tracked_at_seq) { | |
430 | assert(snapshot_); | |
431 | ||
432 | SequenceNumber min_uncommitted = | |
433 | static_cast_with_check<const SnapshotImpl, const Snapshot>( | |
434 | snapshot_.get()) | |
435 | ->min_uncommitted_; | |
436 | SequenceNumber snap_seq = snapshot_->GetSequenceNumber(); | |
437 | // tracked_at_seq is either max or the last snapshot with which this key was | |
438 | // trackeed so there is no need to apply the IsInSnapshot to this comparison | |
439 | // here as tracked_at_seq is not a prepare seq. | |
440 | if (*tracked_at_seq <= snap_seq) { | |
441 | // If the key has been previous validated at a sequence number earlier | |
442 | // than the curent snapshot's sequence number, we already know it has not | |
443 | // been modified. | |
444 | return Status::OK(); | |
445 | } | |
446 | ||
447 | *tracked_at_seq = snap_seq; | |
448 | ||
449 | ColumnFamilyHandle* cfh = | |
450 | column_family ? column_family : db_impl_->DefaultColumnFamily(); | |
451 | ||
f67539c2 TL |
452 | WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted, |
453 | kBackedByDBSnapshot); | |
11fdf7f2 TL |
454 | return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), |
455 | snap_seq, false /* cache_only */, | |
494da23a | 456 | &snap_checker, min_uncommitted); |
11fdf7f2 TL |
457 | } |
458 | ||
459 | void WritePreparedTxn::SetSnapshot() { | |
494da23a TL |
460 | const bool kForWWConflictCheck = true; |
461 | SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck); | |
11fdf7f2 TL |
462 | SetSnapshotInternal(snapshot); |
463 | } | |
464 | ||
465 | Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) { | |
466 | auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch); | |
467 | prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); | |
468 | return ret; | |
469 | } | |
470 | ||
f67539c2 | 471 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 TL |
472 | |
473 | #endif // ROCKSDB_LITE |