]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "utilities/transactions/write_prepared_txn.h" | |
9 | ||
f67539c2 | 10 | #include <cinttypes> |
11fdf7f2 TL |
11 | #include <map> |
12 | #include <set> | |
13 | ||
14 | #include "db/column_family.h" | |
f67539c2 | 15 | #include "db/db_impl/db_impl.h" |
11fdf7f2 TL |
16 | #include "rocksdb/db.h" |
17 | #include "rocksdb/status.h" | |
18 | #include "rocksdb/utilities/transaction_db.h" | |
19 | #include "util/cast_util.h" | |
20 | #include "utilities/transactions/pessimistic_transaction.h" | |
21 | #include "utilities/transactions/write_prepared_txn_db.h" | |
22 | ||
f67539c2 | 23 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
24 | |
25 | struct WriteOptions; | |
26 | ||
27 | WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db, | |
28 | const WriteOptions& write_options, | |
29 | const TransactionOptions& txn_options) | |
494da23a TL |
30 | : PessimisticTransaction(txn_db, write_options, txn_options, false), |
31 | wpt_db_(txn_db) { | |
32 | // Call Initialize outside PessimisticTransaction constructor otherwise it | |
33 | // would skip overridden functions in WritePreparedTxn since they are not | |
34 | // defined yet in the constructor of PessimisticTransaction | |
35 | Initialize(txn_options); | |
36 | } | |
11fdf7f2 TL |
37 | |
38 | void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) { | |
39 | PessimisticTransaction::Initialize(txn_options); | |
40 | prepare_batch_cnt_ = 0; | |
41 | } | |
42 | ||
f67539c2 TL |
43 | void WritePreparedTxn::MultiGet(const ReadOptions& options, |
44 | ColumnFamilyHandle* column_family, | |
45 | const size_t num_keys, const Slice* keys, | |
46 | PinnableSlice* values, Status* statuses, | |
47 | const bool sorted_input) { | |
48 | SequenceNumber min_uncommitted, snap_seq; | |
49 | const SnapshotBackup backed_by_snapshot = | |
50 | wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); | |
51 | WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted, | |
52 | backed_by_snapshot); | |
53 | write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys, | |
54 | keys, values, statuses, sorted_input, | |
55 | &callback); | |
56 | if (UNLIKELY(!callback.valid() || | |
57 | !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { | |
58 | wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); | |
59 | for (size_t i = 0; i < num_keys; i++) { | |
60 | statuses[i] = Status::TryAgain(); | |
61 | } | |
62 | } | |
63 | } | |
64 | ||
65 | Status WritePreparedTxn::Get(const ReadOptions& options, | |
11fdf7f2 TL |
66 | ColumnFamilyHandle* column_family, |
67 | const Slice& key, PinnableSlice* pinnable_val) { | |
f67539c2 TL |
68 | SequenceNumber min_uncommitted, snap_seq; |
69 | const SnapshotBackup backed_by_snapshot = | |
70 | wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); | |
71 | WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted, | |
72 | backed_by_snapshot); | |
73 | auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key, | |
74 | pinnable_val, &callback); | |
75 | if (LIKELY(callback.valid() && | |
76 | wpt_db_->ValidateSnapshot(callback.max_visible_seq(), | |
77 | backed_by_snapshot))) { | |
78 | return res; | |
79 | } else { | |
80 | wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); | |
81 | return Status::TryAgain(); | |
11fdf7f2 | 82 | } |
11fdf7f2 TL |
83 | } |
84 | ||
85 | Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) { | |
86 | // Make sure to get iterator from WritePrepareTxnDB, not the root db. | |
87 | Iterator* db_iter = wpt_db_->NewIterator(options); | |
88 | assert(db_iter); | |
89 | ||
90 | return write_batch_.NewIteratorWithBase(db_iter); | |
91 | } | |
92 | ||
93 | Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, | |
94 | ColumnFamilyHandle* column_family) { | |
95 | // Make sure to get iterator from WritePrepareTxnDB, not the root db. | |
96 | Iterator* db_iter = wpt_db_->NewIterator(options, column_family); | |
97 | assert(db_iter); | |
98 | ||
99 | return write_batch_.NewIteratorWithBase(column_family, db_iter); | |
100 | } | |
101 | ||
// Writes the transaction's batch as a prepared batch.
//
// Steps:
//  1. Append an EndPrepare marker named `name_` to the write batch.
//  2. Record the batch's sub-batch count in prepare_batch_cnt_.
//  3. Write the batch through DBImpl::WriteImpl. The WAL is forced on
//     (disableWAL = false) and the memtable is not disabled. An
//     AddPreparedCallback is the pre-release callback.
//  4. The sequence number reported by the write becomes this transaction's
//     id via SetId().
//
// Returns the status of the WriteImpl call.
Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
  // Prepared data must always reach the WAL, regardless of the user's option.
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool kFirstPrepareBatch = true;
  auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
                                              name_, !WRITE_AFTER_COMMIT);
  assert(s.ok());
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  // &log_number_ receives the WAL log number this prepared batch is written to.
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
                          !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
                          &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // The prepare sequence number doubles as the transaction id.
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}
129 | ||
130 | Status WritePreparedTxn::CommitWithoutPrepareInternal() { | |
131 | // For each duplicate key we account for a new sub-batch | |
132 | const size_t batch_cnt = GetWriteBatch()->SubBatchCnt(); | |
133 | return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt); | |
134 | } | |
135 | ||
136 | Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch, | |
137 | size_t batch_cnt) { | |
138 | return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this); | |
139 | } | |
140 | ||
// Commits a previously prepared transaction.
//
// The commit-time write batch gets a Commit marker and is written through
// DBImpl::WriteImpl.
//
// The commit batch is inserted into the memtable ("includes data") only when
// it is non-empty and use_only_the_last_commit_time_batch_for_recovery_ is off.
// Otherwise a non-empty batch is only cached as the latest persistent state.
//
// The commit is published in one write when either:
//   - two_write_queues is off, or
//   - the commit batch carries no memtable data.
// In that case the write uses WritePreparedCommitEntryPreReleaseCallback.
//
// Otherwise the first write only adds the commit batch as prepared
// (AddPreparedCallback). A second, empty write then publishes the commit
// through a WritePreparedCommitEntryPreReleaseCallback that covers both the
// prepared batch and the commit batch.
//
// Returns the status of the last WriteImpl call performed.
Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
  // We take the commit-time batch and append the Commit marker.
  // The Memtable will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
  assert(s.ok());

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    // Count the sub-batches in the commit-time batch (a duplicate key starts
    // a new sub-batch).
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
  // This is to call AddPrepared on CommitTimeWriteBatch
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                          zero_log_number, disable_memtable, &seq_used,
                          batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
                 s.ok())) {
      // Note: RemovePrepared should be called after WriteImpl that published
      // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }  // else RemovePrepared is called from within PreReleaseCallback
    if (UNLIKELY(!do_one_write)) {
      assert(!s.ok());
      // Cleanup the prepared entry we added with add_prepared_callback
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied with CommitTimeWriteBatch and yet we cannot
  // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  const size_t kZeroData = 0;
  // Update commit map only from the 2nd queue
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
      commit_batch_seq, commit_batch_cnt);
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_aux_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // NOTE(review): this point is reached only when do_one_write is false, which
  // requires two_write_queues to be true. The branch below therefore looks
  // unreachable here; it appears to be kept as a defensive fallback.
  if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
    if (s.ok()) {
      // Note: RemovePrepared should be called after WriteImpl that published
      // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }
    wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
  }  // else RemovePrepared is called from within PreReleaseCallback
  return s;
}
244 | ||
245 | Status WritePreparedTxn::RollbackInternal() { | |
246 | ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, | |
247 | "RollbackInternal prepare_seq: %" PRIu64, GetId()); | |
248 | WriteBatch rollback_batch; | |
249 | assert(GetId() != kMaxSequenceNumber); | |
250 | assert(GetId() > 0); | |
251 | auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap(); | |
252 | auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap(); | |
494da23a | 253 | auto read_at_seq = kMaxSequenceNumber; |
f67539c2 TL |
254 | ReadOptions roptions; |
255 | // to prevent callback's seq to be overrriden inside DBImpk::Get | |
256 | roptions.snapshot = wpt_db_->GetMaxSnapshot(); | |
11fdf7f2 TL |
257 | struct RollbackWriteBatchBuilder : public WriteBatch::Handler { |
258 | DBImpl* db_; | |
11fdf7f2 TL |
259 | WritePreparedTxnReadCallback callback; |
260 | WriteBatch* rollback_batch_; | |
261 | std::map<uint32_t, const Comparator*>& comparators_; | |
262 | std::map<uint32_t, ColumnFamilyHandle*>& handles_; | |
263 | using CFKeys = std::set<Slice, SetComparator>; | |
264 | std::map<uint32_t, CFKeys> keys_; | |
265 | bool rollback_merge_operands_; | |
f67539c2 | 266 | ReadOptions roptions_; |
11fdf7f2 TL |
267 | RollbackWriteBatchBuilder( |
268 | DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq, | |
269 | WriteBatch* dst_batch, | |
270 | std::map<uint32_t, const Comparator*>& comparators, | |
271 | std::map<uint32_t, ColumnFamilyHandle*>& handles, | |
f67539c2 | 272 | bool rollback_merge_operands, ReadOptions _roptions) |
11fdf7f2 | 273 | : db_(db), |
494da23a | 274 | callback(wpt_db, snap_seq), // disable min_uncommitted optimization |
11fdf7f2 TL |
275 | rollback_batch_(dst_batch), |
276 | comparators_(comparators), | |
277 | handles_(handles), | |
f67539c2 TL |
278 | rollback_merge_operands_(rollback_merge_operands), |
279 | roptions_(_roptions) {} | |
11fdf7f2 TL |
280 | |
281 | Status Rollback(uint32_t cf, const Slice& key) { | |
282 | Status s; | |
283 | CFKeys& cf_keys = keys_[cf]; | |
284 | if (cf_keys.size() == 0) { // just inserted | |
285 | auto cmp = comparators_[cf]; | |
286 | keys_[cf] = CFKeys(SetComparator(cmp)); | |
287 | } | |
288 | auto it = cf_keys.insert(key); | |
289 | if (it.second == | |
290 | false) { // second is false if a element already existed. | |
291 | return s; | |
292 | } | |
293 | ||
294 | PinnableSlice pinnable_val; | |
295 | bool not_used; | |
296 | auto cf_handle = handles_[cf]; | |
f67539c2 TL |
297 | DBImpl::GetImplOptions get_impl_options; |
298 | get_impl_options.column_family = cf_handle; | |
299 | get_impl_options.value = &pinnable_val; | |
300 | get_impl_options.value_found = ¬_used; | |
301 | get_impl_options.callback = &callback; | |
302 | s = db_->GetImpl(roptions_, key, get_impl_options); | |
11fdf7f2 TL |
303 | assert(s.ok() || s.IsNotFound()); |
304 | if (s.ok()) { | |
305 | s = rollback_batch_->Put(cf_handle, key, pinnable_val); | |
306 | assert(s.ok()); | |
307 | } else if (s.IsNotFound()) { | |
308 | // There has been no readable value before txn. By adding a delete we | |
309 | // make sure that there will be none afterwards either. | |
310 | s = rollback_batch_->Delete(cf_handle, key); | |
311 | assert(s.ok()); | |
312 | } else { | |
313 | // Unexpected status. Return it to the user. | |
314 | } | |
315 | return s; | |
316 | } | |
317 | ||
318 | Status PutCF(uint32_t cf, const Slice& key, const Slice& /*val*/) override { | |
319 | return Rollback(cf, key); | |
320 | } | |
321 | ||
322 | Status DeleteCF(uint32_t cf, const Slice& key) override { | |
323 | return Rollback(cf, key); | |
324 | } | |
325 | ||
326 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { | |
327 | return Rollback(cf, key); | |
328 | } | |
329 | ||
330 | Status MergeCF(uint32_t cf, const Slice& key, | |
331 | const Slice& /*val*/) override { | |
332 | if (rollback_merge_operands_) { | |
333 | return Rollback(cf, key); | |
334 | } else { | |
335 | return Status::OK(); | |
336 | } | |
337 | } | |
338 | ||
339 | Status MarkNoop(bool) override { return Status::OK(); } | |
340 | Status MarkBeginPrepare(bool) override { return Status::OK(); } | |
341 | Status MarkEndPrepare(const Slice&) override { return Status::OK(); } | |
342 | Status MarkCommit(const Slice&) override { return Status::OK(); } | |
343 | Status MarkRollback(const Slice&) override { | |
344 | return Status::InvalidArgument(); | |
345 | } | |
346 | ||
347 | protected: | |
494da23a TL |
348 | bool WriteAfterCommit() const override { return false; } |
349 | } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch, | |
11fdf7f2 | 350 | *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(), |
f67539c2 TL |
351 | wpt_db_->txn_db_options_.rollback_merge_operands, |
352 | roptions); | |
11fdf7f2 | 353 | auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler); |
11fdf7f2 TL |
354 | if (!s.ok()) { |
355 | return s; | |
356 | } | |
357 | // The Rollback marker will be used as a batch separator | |
20effc67 TL |
358 | s = WriteBatchInternal::MarkRollback(&rollback_batch, name_); |
359 | assert(s.ok()); | |
11fdf7f2 TL |
360 | bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; |
361 | const bool DISABLE_MEMTABLE = true; | |
362 | const uint64_t NO_REF_LOG = 0; | |
363 | uint64_t seq_used = kMaxSequenceNumber; | |
364 | const size_t ONE_BATCH = 1; | |
494da23a TL |
365 | const bool kFirstPrepareBatch = true; |
366 | // We commit the rolled back prepared batches. Although this is | |
11fdf7f2 TL |
367 | // counter-intuitive, i) it is safe to do so, since the prepared batches are |
368 | // already canceled out by the rollback batch, ii) adding the commit entry to | |
369 | // CommitCache will allow us to benefit from the existing mechanism in | |
370 | // CommitCache that keeps an entry evicted due to max advance and yet overlaps | |
371 | // with a live snapshot around so that the live snapshot properly skips the | |
372 | // entry even if its prepare seq is lower than max_evicted_seq_. | |
494da23a TL |
373 | AddPreparedCallback add_prepared_callback( |
374 | wpt_db_, db_impl_, ONE_BATCH, | |
375 | db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch); | |
11fdf7f2 TL |
376 | WritePreparedCommitEntryPreReleaseCallback update_commit_map( |
377 | wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH); | |
494da23a TL |
378 | PreReleaseCallback* pre_release_callback; |
379 | if (do_one_write) { | |
380 | pre_release_callback = &update_commit_map; | |
381 | } else { | |
382 | pre_release_callback = &add_prepared_callback; | |
383 | } | |
11fdf7f2 TL |
384 | // Note: the rollback batch does not need AddPrepared since it is written to |
385 | // DB in one shot. min_uncommitted still works since it requires capturing | |
386 | // data that is written to DB but not yet committed, while | |
494da23a | 387 | // the rollback batch commits with PreReleaseCallback. |
11fdf7f2 TL |
388 | s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, |
389 | NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH, | |
494da23a | 390 | pre_release_callback); |
11fdf7f2 TL |
391 | assert(!s.ok() || seq_used != kMaxSequenceNumber); |
392 | if (!s.ok()) { | |
393 | return s; | |
394 | } | |
395 | if (do_one_write) { | |
f67539c2 | 396 | assert(!db_impl_->immutable_db_options().two_write_queues); |
11fdf7f2 TL |
397 | wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_); |
398 | return s; | |
399 | } // else do the 2nd write for commit | |
494da23a | 400 | uint64_t rollback_seq = seq_used; |
11fdf7f2 | 401 | ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, |
494da23a TL |
402 | "RollbackInternal 2nd write rollback_seq: %" PRIu64, |
403 | rollback_seq); | |
11fdf7f2 TL |
404 | // Commit the batch by writing an empty batch to the queue that will release |
405 | // the commit sequence number to readers. | |
494da23a TL |
406 | WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare( |
407 | wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_); | |
11fdf7f2 | 408 | WriteBatch empty_batch; |
20effc67 TL |
409 | s = empty_batch.PutLogData(Slice()); |
410 | assert(s.ok()); | |
11fdf7f2 | 411 | // In the absence of Prepare markers, use Noop as a batch separator |
20effc67 TL |
412 | s = WriteBatchInternal::InsertNoop(&empty_batch); |
413 | assert(s.ok()); | |
11fdf7f2 TL |
414 | s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, |
415 | NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, | |
416 | &update_commit_map_with_prepare); | |
417 | assert(!s.ok() || seq_used != kMaxSequenceNumber); | |
494da23a TL |
418 | ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, |
419 | "RollbackInternal (status=%s) commit: %" PRIu64, | |
420 | s.ToString().c_str(), GetId()); | |
f67539c2 TL |
421 | // TODO(lth): For WriteUnPrepared that rollback is called frequently, |
422 | // RemovePrepared could be moved to the callback to reduce lock contention. | |
11fdf7f2 | 423 | if (s.ok()) { |
11fdf7f2 TL |
424 | wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_); |
425 | } | |
f67539c2 TL |
426 | // Note: RemovePrepared for prepared batch is called from within |
427 | // PreReleaseCallback | |
494da23a | 428 | wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH); |
11fdf7f2 TL |
429 | |
430 | return s; | |
431 | } | |
432 | ||
433 | Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, | |
434 | const Slice& key, | |
435 | SequenceNumber* tracked_at_seq) { | |
436 | assert(snapshot_); | |
437 | ||
438 | SequenceNumber min_uncommitted = | |
20effc67 | 439 | static_cast_with_check<const SnapshotImpl>(snapshot_.get()) |
11fdf7f2 TL |
440 | ->min_uncommitted_; |
441 | SequenceNumber snap_seq = snapshot_->GetSequenceNumber(); | |
442 | // tracked_at_seq is either max or the last snapshot with which this key was | |
443 | // trackeed so there is no need to apply the IsInSnapshot to this comparison | |
444 | // here as tracked_at_seq is not a prepare seq. | |
445 | if (*tracked_at_seq <= snap_seq) { | |
446 | // If the key has been previous validated at a sequence number earlier | |
447 | // than the curent snapshot's sequence number, we already know it has not | |
448 | // been modified. | |
449 | return Status::OK(); | |
450 | } | |
451 | ||
452 | *tracked_at_seq = snap_seq; | |
453 | ||
454 | ColumnFamilyHandle* cfh = | |
455 | column_family ? column_family : db_impl_->DefaultColumnFamily(); | |
456 | ||
f67539c2 TL |
457 | WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted, |
458 | kBackedByDBSnapshot); | |
11fdf7f2 TL |
459 | return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), |
460 | snap_seq, false /* cache_only */, | |
494da23a | 461 | &snap_checker, min_uncommitted); |
11fdf7f2 TL |
462 | } |
463 | ||
464 | void WritePreparedTxn::SetSnapshot() { | |
494da23a TL |
465 | const bool kForWWConflictCheck = true; |
466 | SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck); | |
11fdf7f2 TL |
467 | SetSnapshotInternal(snapshot); |
468 | } | |
469 | ||
470 | Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) { | |
471 | auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch); | |
472 | prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); | |
473 | return ret; | |
474 | } | |
475 | ||
f67539c2 | 476 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 TL |
477 | |
478 | #endif // ROCKSDB_LITE |