// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn.h"

#include <cinttypes>
#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

struct WriteOptions;

WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options, false),
      wpt_db_(txn_db) {
  // Call Initialize outside the PessimisticTransaction constructor; otherwise
  // it would skip the overridden functions in WritePreparedTxn, since virtual
  // overrides are not yet in effect while the PessimisticTransaction
  // constructor runs.
  Initialize(txn_options);
}

void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  prepare_batch_cnt_ = 0;
}

void WritePreparedTxn::MultiGet(const ReadOptions& options,
                                ColumnFamilyHandle* column_family,
                                const size_t num_keys, const Slice* keys,
                                PinnableSlice* values, Status* statuses,
                                const bool sorted_input) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
                                      keys, values, statuses, sorted_input,
                                      &callback);
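  // If the callback could not rely on a stable snapshot (e.g. snap_seq was
  // taken from the last published seq rather than a real db snapshot and the
  // commit cache has since evicted entries past it), the reads above may have
  // used stale commit info, so answer every key with TryAgain.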
  if (UNLIKELY(!callback.valid() ||
               !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}

Status WritePreparedTxn::Get(const ReadOptions& options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                            pinnable_val, &callback);
  if (LIKELY(callback.valid() &&
             wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
                                       backed_by_snapshot))) {
    return res;
  } else {
    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options);
  assert(db_iter);

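  // NewIteratorWithBase overlays this txn's own pending writes (from the
  // indexed write batch) on top of the db iterator, so the transaction also
  // sees its not-yet-committed data.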
  return write_batch_.NewIteratorWithBase(db_iter);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
                                        ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}

Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool kFirstPrepareBatch = true;
  auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
                                              name_, !WRITE_AFTER_COMMIT);
  assert(s.ok());
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
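  // Duplicate keys cannot share one sequence number in the memtable, so each
  // sub-batch consumes its own seq; the commit map must later cover the whole
  // [prepare_seq, prepare_seq + prepare_batch_cnt_) range.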
  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
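  // Note that the memtable is NOT disabled below: unlike WriteCommitted,
  // WritePrepared inserts the prepared data into the memtable at prepare
  // time; readers filter it out via WritePreparedTxnReadCallback until the
  // commit seq is published.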
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
                          !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
                          &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}

Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  // For each duplicate key we account for a new sub-batch
  const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}

Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
                                             size_t batch_cnt) {
  return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
}

Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
  // We take the commit-time batch and append the Commit marker.
  // The memtable will ignore the Commit marker in non-recovery mode.
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
  assert(s.ok());

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable.
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
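  // One write suffices when there is no second write queue, or when the
  // commit-time batch carries no memtable data; in both cases the commit
  // entry can be published by this single write.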
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
  // This is to call AddPrepared on CommitTimeWriteBatch
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                          zero_log_number, disable_memtable, &seq_used,
                          batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
                 s.ok())) {
      // Note: RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }  // else RemovePrepared is called from within PreReleaseCallback
    if (UNLIKELY(!do_one_write)) {
      assert(!s.ok());
      // Cleanup the prepared entry we added with add_prepared_callback
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by a CommitTimeWriteBatch and yet cannot enable
  // the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  const size_t kZeroData = 0;
  // Update commit map only from the 2nd queue
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
      commit_batch_seq, commit_batch_cnt);
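  // The 2nd write publishes the commit entry for the prepared batch and, via
  // the aux-batch arguments above, also records the commit-time batch written
  // in the 1st round as committed.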
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_aux_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
    if (s.ok()) {
      // Note: RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }
    wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
  }  // else RemovePrepared is called from within PreReleaseCallback
  return s;
}

Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());
  WriteBatch rollback_batch;
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  auto read_at_seq = kMaxSequenceNumber;
  ReadOptions roptions;
  // To prevent the callback's seq from being overridden inside DBImpl::Get
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* db_;
    WritePreparedTxnReadCallback callback;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
    using CFKeys = std::set<Slice, SetComparator>;
    std::map<uint32_t, CFKeys> keys_;
    bool rollback_merge_operands_;
    ReadOptions roptions_;
    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators,
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
        bool rollback_merge_operands, ReadOptions _roptions)
        : db_(db),
          callback(wpt_db, snap_seq),  // disable min_uncommitted optimization
          rollback_batch_(dst_batch),
          comparators_(comparators),
          handles_(handles),
          rollback_merge_operands_(rollback_merge_operands),
          roptions_(_roptions) {}

    Status Rollback(uint32_t cf, const Slice& key) {
      Status s;
      CFKeys& cf_keys = keys_[cf];
      if (cf_keys.size() == 0) {  // just default-constructed by operator[]
        auto cmp = comparators_[cf];
        keys_[cf] = CFKeys(SetComparator(cmp));
      }
      auto it = cf_keys.insert(key);
      // it.second is false if the element already existed.
      if (!it.second) {
        return s;
      }

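      // Read the value as it was before this txn: with the callback pinned at
      // read_at_seq, this txn's own prepared-but-uncommitted entries are not
      // visible, so we recover the last committed value (if any).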
      PinnableSlice pinnable_val;
      bool not_used;
      auto cf_handle = handles_[cf];
      DBImpl::GetImplOptions get_impl_options;
      get_impl_options.column_family = cf_handle;
      get_impl_options.value = &pinnable_val;
      get_impl_options.value_found = &not_used;
      get_impl_options.callback = &callback;
      s = db_->GetImpl(roptions_, key, get_impl_options);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        // There has been no readable value before txn. By adding a delete we
        // make sure that there will be none afterwards either.
        s = rollback_batch_->Delete(cf_handle, key);
        assert(s.ok());
      } else {
        // Unexpected status. Return it to the user.
      }
      return s;
    }

    Status PutCF(uint32_t cf, const Slice& key,
                 const Slice& /*val*/) override {
      return Rollback(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
      if (rollback_merge_operands_) {
        return Rollback(cf, key);
      } else {
        return Status::OK();
      }
    }

    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    bool WriteAfterCommit() const override { return false; }
  } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                     wpt_db_->txn_db_options_.rollback_merge_operands,
                     roptions);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  if (!s.ok()) {
    return s;
  }
  // The Rollback marker will be used as a batch separator
  s = WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  assert(s.ok());
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
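  // With a single write queue the rollback batch write also publishes its
  // seq, so one write suffices; with two queues, a 2nd write on the commit
  // queue publishes the commit of the rollback batch.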
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ONE_BATCH = 1;
  const bool kFirstPrepareBatch = true;
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, ii) adding the commit entry to
  // CommitCache will allow us to benefit from the existing mechanism in
  // CommitCache that keeps an entry evicted due to max advance and yet
  // overlaps with a live snapshot around so that the live snapshot properly
  // skips the entry even if its prepare seq is lower than max_evicted_seq_.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, ONE_BATCH,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while the rollback
  // batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    assert(!db_impl_->immutable_db_options().two_write_queues);
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
    return s;
  }  // else do the 2nd write for commit
  uint64_t rollback_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write rollback_seq: %" PRIu64,
                    rollback_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal (status=%s) commit: %" PRIu64,
                    s.ToString().c_str(), GetId());
  // TODO(lth): For WriteUnPrepared, where rollback is called frequently,
  // RemovePrepared could be moved to the callback to reduce lock contention.
  if (s.ok()) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  }
  // Note: RemovePrepared for the prepared batch is called from within
  // PreReleaseCallback
  wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);

  return s;
}

Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison here
  // as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified since.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
                                            kBackedByDBSnapshot);
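  // Write-write conflict check: returns non-OK if the key has been written
  // after snap_seq by another transaction; snap_checker decides the
  // visibility of prepared (not yet committed) entries relative to the
  // snapshot.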
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker, min_uncommitted);
}

void WritePreparedTxn::SetSnapshot() {
  const bool kForWWConflictCheck = true;
  SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
  SetSnapshotInternal(snapshot);
}

Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  return ret;
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE