// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_unprepared_txn.h"
#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
  // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
  // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
  // the prepare_batch_cnt seq nums after it.
  //
  // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
  // large.
  for (const auto& it : unprep_seqs_) {
    if (it.first <= seq && seq < it.first + it.second) {
      return true;
    }
  }

  bool snap_released = false;
  auto ret =
      db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_, &snap_released);
  assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
  snap_released_ |= snap_released;
  return ret;
}

WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
                                       const WriteOptions& write_options,
                                       const TransactionOptions& txn_options)
    : WritePreparedTxn(txn_db, write_options, txn_options),
      wupt_db_(txn_db),
      last_log_number_(0),
      recovered_txn_(false),
      largest_validated_seq_(0) {
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }
}

WriteUnpreparedTxn::~WriteUnpreparedTxn() {
  if (!unprep_seqs_.empty()) {
    assert(log_number_ > 0);
    assert(GetId() > 0);
    assert(!name_.empty());

    // We should rollback regardless of GetState, but some unit tests that
    // test crash recovery run the destructor assuming that rollback does not
    // happen, so that rollback during recovery can be exercised.
    if (GetState() == STARTED || GetState() == LOCKS_STOLEN) {
      auto s = RollbackInternal();
      assert(s.ok());
      if (!s.ok()) {
        ROCKS_LOG_FATAL(
            wupt_db_->info_log_,
            "Rollback of WriteUnprepared transaction failed in destructor: %s",
            s.ToString().c_str());
      }
      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
          log_number_);
    }
  }

  // Clear the tracked locks so that ~PessimisticTransaction does not
  // try to unlock keys for recovered transactions.
  if (recovered_txn_) {
    tracked_locks_->Clear();
  }
}

void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;
  assert(active_iterators_.empty());
  active_iterators_.clear();
  untracked_keys_.clear();
}

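// HandleWrite wraps every write operation below. If no iterators are active,
// it may first flush the accumulated unprepared batch to the DB (see
// MaybeFlushWriteBatchToDB), then performs the write and advances
// largest_validated_seq_ to the snapshot's sequence number, or to the last
// published sequence when no snapshot is set.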
Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
  Status s;
  if (active_iterators_.empty()) {
    s = MaybeFlushWriteBatchToDB();
    if (!s.ok()) {
      return s;
    }
  }
  s = do_write();
  if (s.ok()) {
    if (snapshot_) {
      largest_validated_seq_ =
          std::max(largest_validated_seq_, snapshot_->GetSequenceNumber());
    } else {
      // TODO(lth): We should use the same number as tracked_at_seq in TryLock,
      // because what is actually being tracked is the sequence number at which
      // this key was locked.
      largest_validated_seq_ = db_impl_->GetLastPublishedSequence();
    }
  }
  return s;
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const Slice& key, const Slice& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const SliceParts& key, const SliceParts& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
                                 const Slice& key, const Slice& value,
                                 const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Merge(column_family, key, value,
                                      assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const Slice& key, const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const SliceParts& key,
                                  const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const Slice& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const SliceParts& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}

// WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
// WriteUnprepared, the write batches have already been written into the
// database during WAL replay, so all we have to do is "retrack" the keys
// so that rollbacks are possible.
//
// Calling TryLock instead of TrackKey is also possible, but as an optimization,
// recovered transactions do not hold locks on their keys. This follows the
// implementation in PessimisticTransactionDB::Initialize where we set
// skip_concurrency_control to true.
Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) {
  struct TrackKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                       false /* read_only */, true /* exclusive */);
      }
      return Status::OK();
    }

    // Recovered batches do not contain 2PC markers.
    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkNoop(bool) override { return Status::InvalidArgument(); }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  TrackKeyHandler handler(this,
                          wupt_db_->txn_db_options_.rollback_merge_operands);
  return wb->Iterate(&handler);
}

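// Flush the in-memory write batch to the DB as an unprepared batch once its
// data size exceeds write_batch_flush_threshold_. A non-positive threshold
// disables this periodic flushing.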
Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  const bool kPrepared = true;
  Status s;
  if (write_batch_flush_threshold_ > 0 &&
      write_batch_.GetWriteBatch()->Count() > 0 &&
      write_batch_.GetDataSize() >
          static_cast<size_t>(write_batch_flush_threshold_)) {
    assert(GetState() != PREPARED);
    s = FlushWriteBatchToDB(!kPrepared);
  }
  return s;
}

Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
  // If the current write batch contains savepoints, then some special handling
  // is required so that RollbackToSavepoint can work.
  //
  // RollbackToSavepoint is not supported after Prepare() is called, so only do
  // this for unprepared batches.
  if (!prepared && unflushed_save_points_ != nullptr &&
      !unflushed_save_points_->empty()) {
    return FlushWriteBatchWithSavePointToDB();
  }

  return FlushWriteBatchToDBInternal(prepared);
}

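// Write the current batch into the DB as a single (un)prepared batch:
// autogenerate a transaction name if necessary (debug builds only), remember
// any keys not covered by tracked locks so rollback can still find them,
// append the EndPrepare marker, and write the batch through DBImpl::WriteImpl
// with an AddPreparedCallback. The resulting sequence number is recorded in
// unprep_seqs_.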
Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
  if (name_.empty()) {
    assert(!prepared);
#ifndef NDEBUG
    static std::atomic_ullong autogen_id{0};
    // To avoid changing all tests to call SetName, just autogenerate one.
    if (wupt_db_->txn_db_options_.autogenerate_name) {
      auto s =
          SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1)));
      assert(s.ok());
    } else
#endif
    {
      return Status::InvalidArgument("Cannot write to DB without SetName.");
    }
  }

  struct UntrackedKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    UntrackedKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status AddUntrackedKey(uint32_t cf, const Slice& key) {
      auto str = key.ToString();
      PointLockStatus lock_status =
          txn_->tracked_locks_->GetPointLockStatus(cf, str);
      if (!lock_status.locked) {
        txn_->untracked_keys_[cf].push_back(str);
      }
      return Status::OK();
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      return AddUntrackedKey(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        return AddUntrackedKey(cf, key);
      }
      return Status::OK();
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  UntrackedKeyHandler handler(
      this, wupt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&handler);
  assert(s.ok());

  // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool first_prepare_batch = log_number_ == 0;
  // MarkEndPrepare will change the Noop marker to the appropriate marker.
  s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
                                         name_, !WRITE_AFTER_COMMIT, !prepared);
  assert(s.ok());
  // For each duplicate key we account for a new sub-batch.
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared is better called in the pre-release callback; otherwise there
  // is a non-zero chance of max advancing past prepare_seq and readers
  // assuming the data is committed.
  // Also having it in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  // log_number_ should refer to the oldest log containing uncommitted data
  // from the current transaction. This means that if log_number_ is set,
  // WriteImpl should not overwrite that value, so set log_used to nullptr if
  // log_number_ is already set.
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, &last_log_number_,
                          /*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used,
                          prepare_batch_cnt_, &add_prepared_callback);
  if (log_number_ == 0) {
    log_number_ = last_log_number_;
  }
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;

  // Only call SetId if it hasn't been set yet.
  if (GetId() == 0) {
    SetId(prepare_seq);
  }
  // unprep_seqs_ will also contain prepared seqnos since they are treated in
  // the same way in the prepare/commit callbacks. See the comment on the
  // definition of unprep_seqs_.
  unprep_seqs_[prepare_seq] = prepare_batch_cnt_;

  // Reset transaction state.
  if (!prepared) {
    prepare_batch_cnt_ = 0;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  return s;
}

Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
  assert(unflushed_save_points_ != nullptr &&
         unflushed_save_points_->size() > 0);
  assert(save_points_ != nullptr && save_points_->size() > 0);
  assert(save_points_->size() >= unflushed_save_points_->size());

  // Handler class for creating an unprepared batch from a savepoint.
  struct SavePointBatchHandler : public WriteBatch::Handler {
    WriteBatchWithIndex* wb_;
    const std::map<uint32_t, ColumnFamilyHandle*>& handles_;

    SavePointBatchHandler(
        WriteBatchWithIndex* wb,
        const std::map<uint32_t, ColumnFamilyHandle*>& handles)
        : wb_(wb), handles_(handles) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Put(handles_.at(cf), key, value);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->Delete(handles_.at(cf), key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->SingleDelete(handles_.at(cf), key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Merge(handles_.at(cf), key, value);
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  // The comparator of the default cf is passed in, similar to the
  // initialization of TransactionBaseImpl::write_batch_. This comparator is
  // only used if the write batch encounters an invalid cf id, and falls back
  // to this comparator.
  WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
                         true, 0);
  // Swap with write_batch_ so that wb contains the complete write batch. The
  // actual write batch that will be flushed to DB will be built in
  // write_batch_, and will be read by FlushWriteBatchToDBInternal.
  std::swap(wb, write_batch_);
  TransactionBaseImpl::InitWriteBatch();

  size_t prev_boundary = WriteBatchInternal::kHeader;
  const bool kPrepared = true;
  for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) {
    bool trailing_batch = i == unflushed_save_points_->size();
    SavePointBatchHandler sp_handler(&write_batch_,
                                     *wupt_db_->GetCFHandleMap().get());
    size_t curr_boundary = trailing_batch ? wb.GetWriteBatch()->GetDataSize()
                                          : (*unflushed_save_points_)[i];

    // Construct the partial write batch up to the savepoint.
    //
    // Theoretically, a memcpy between the write batches should be sufficient
    // since the rewriting into the batch should produce the exact same byte
    // representation. Rebuilding the WriteBatchWithIndex index is still
    // necessary, though, and would imply doing two passes over the batch.
    Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler,
                                           prev_boundary, curr_boundary);
    if (!s.ok()) {
      return s;
    }

    if (write_batch_.GetWriteBatch()->Count() > 0) {
      // Flush the write batch.
      s = FlushWriteBatchToDBInternal(!kPrepared);
      if (!s.ok()) {
        return s;
      }
    }

    if (!trailing_batch) {
      if (flushed_save_points_ == nullptr) {
        flushed_save_points_.reset(
            new autovector<WriteUnpreparedTxn::SavePoint>());
      }
      flushed_save_points_->emplace_back(
          unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
    }

    prev_boundary = curr_boundary;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  unflushed_save_points_->clear();
  return Status::OK();
}

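// Prepare() simply flushes whatever remains in the write batch as a prepared
// batch; the heavy lifting is shared with the unprepared flush path above.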
Status WriteUnpreparedTxn::PrepareInternal() {
  const bool kPrepared = true;
  return FlushWriteBatchToDB(kPrepared);
}

Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
  if (unprep_seqs_.empty()) {
    assert(log_number_ == 0);
    assert(GetId() == 0);
    return WritePreparedTxn::CommitWithoutPrepareInternal();
  }

  // TODO(lth): We should optimize commit without prepare to not perform
  // a prepare under the hood.
  auto s = PrepareInternal();
  if (!s.ok()) {
    return s;
  }
  return CommitInternal();
}

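// Commit the transaction. The commit-time write batch (if any) is written
// together with a Commit marker; depending on two_write_queues and whether
// the commit batch carries data, the commit map is updated either in the same
// write or in a second, memtable-less write that only publishes the sequence.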
Status WriteUnpreparedTxn::CommitInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared commit logic.

  // We take the commit-time batch and append the Commit marker. The Memtable
  // will ignore the Commit marker in non-recovery mode.
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
  assert(s.ok());

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable.
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  const bool includes_data = !empty && !for_recovery;
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;

  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt);
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is
  // already a connection between the memtable and its WAL, so there is no
  // need to redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                          zero_log_number, disable_memtable, &seq_used,
                          batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note RemovePrepared should be called after WriteImpl that published
      // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
      for (const auto& seq : unprep_seqs_) {
        wpt_db_->RemovePrepared(seq.first, seq.second);
      }
    }
    if (UNLIKELY(!do_one_write)) {
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write to publish seq

  // Populate unprep_seqs_ with commit_batch_seq, since we treat data in the
  // commit write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[commit_batch_seq] = commit_batch_cnt;
  WriteUnpreparedCommitEntryPreReleaseCallback
      update_commit_map_with_commit_batch(wpt_db_, db_impl_, unprep_seqs_, 0);

  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by CommitTimeWriteBatch and yet we cannot enable
  // the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.

  // Update commit map only from the 2nd queue.
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator.
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_commit_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note RemovePrepared should be called after WriteImpl that published the
  // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
  for (const auto& seq : unprep_seqs_) {
    wpt_db_->RemovePrepared(seq.first, seq.second);
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}

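// For every key locked by this transaction (plus any untracked keys recorded
// when batches were flushed), read the latest value visible to `callback` and
// append to rollback_batch a Put of that prior value, or a Delete if the key
// did not previously exist.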
Status WriteUnpreparedTxn::WriteRollbackKeys(
    const LockTracker& lock_tracker, WriteBatchWithIndex* rollback_batch,
    ReadCallback* callback, const ReadOptions& roptions) {
  // This assertion can be removed when range lock is supported.
  assert(lock_tracker.IsPointLockSupported());
  const auto& cf_map = *wupt_db_->GetCFHandleMap();
  auto WriteRollbackKey = [&](const std::string& key, uint32_t cfid) {
    const auto& cf_handle = cf_map.at(cfid);
    PinnableSlice pinnable_val;
    bool not_used;
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = cf_handle;
    get_impl_options.value = &pinnable_val;
    get_impl_options.value_found = &not_used;
    get_impl_options.callback = callback;
    auto s = db_impl_->GetImpl(roptions, key, get_impl_options);

    if (s.ok()) {
      s = rollback_batch->Put(cf_handle, key, pinnable_val);
      assert(s.ok());
    } else if (s.IsNotFound()) {
      s = rollback_batch->Delete(cf_handle, key);
      assert(s.ok());
    } else {
      return s;
    }

    return Status::OK();
  };

  std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
      lock_tracker.GetColumnFamilyIterator());
  assert(cf_it != nullptr);
  while (cf_it->HasNext()) {
    ColumnFamilyId cf = cf_it->Next();
    std::unique_ptr<LockTracker::KeyIterator> key_it(
        lock_tracker.GetKeyIterator(cf));
    assert(key_it != nullptr);
    while (key_it->HasNext()) {
      const std::string& key = key_it->Next();
      auto s = WriteRollbackKey(key, cf);
      if (!s.ok()) {
        return s;
      }
    }
  }

  for (const auto& cfkey : untracked_keys_) {
    const auto cfid = cfkey.first;
    const auto& keys = cfkey.second;
    for (const auto& key : keys) {
      auto s = WriteRollbackKey(key, cfid);
      if (!s.ok()) {
        return s;
      }
    }
  }

  return Status::OK();
}

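// Roll back the transaction by writing a batch that restores the
// pre-transaction value of every written key. As explained below, the
// previously written unprepared batches are then "committed" so that the
// existing commit-cache machinery treats them consistently.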
Status WriteUnpreparedTxn::RollbackInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  WriteBatchWithIndex rollback_batch(
      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  Status s;
  auto read_at_seq = kMaxSequenceNumber;
  ReadOptions roptions;
  // To prevent the callback's seq from being overridden inside DBImpl::Get.
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
  // need to read our own writes when reading prior versions of the key for
  // rollback.
  WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
  // TODO(lth): We write the rollback batch all in a single batch here, but
  // this should be subdivided into multiple batches as well. In phase 2, when
  // key sets are read from WAL, this will happen naturally.
  s = WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions);
  if (!s.ok()) {
    return s;
  }

  // The Rollback marker will be used as a batch separator.
  s = WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
  assert(s.ok());
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  // Rollback batch may contain duplicate keys, because tracked_keys_ is not
  // comparator aware.
  auto rollback_batch_cnt = rollback_batch.SubBatchCnt();
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, ii) adding the commit entry to
  // CommitCache will allow us to benefit from the existing mechanism in
  // CommitCache that keeps around an entry evicted due to max advance yet
  // overlapping with a live snapshot, so that the live snapshot properly skips
  // the entry even if its prepare seq is lower than max_evicted_seq_.
  //
  // TODO(lth): RollbackInternal is conceptually very similar to
  // CommitInternal, with the rollback batch simply taking on the role of
  // CommitTimeWriteBatch. We should be able to merge the two code paths.
  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, rollback_batch_cnt);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while the rollback
  // batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
                          nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
                          &seq_used, rollback_batch_cnt,
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write for commit

  uint64_t& prepare_seq = seq_used;
  // Populate unprep_seqs_ with rollback_batch_cnt, since we treat data in the
  // rollback write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[prepare_seq] = rollback_batch_cnt;
  WriteUnpreparedCommitEntryPreReleaseCallback
      update_commit_map_with_rollback_batch(wpt_db_, db_impl_, unprep_seqs_, 0);

  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  WriteBatch empty_batch;
  const size_t ONE_BATCH = 1;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator.
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_rollback_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back.
  if (s.ok()) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}

void WriteUnpreparedTxn::Clear() {
  if (!recovered_txn_) {
    txn_db_impl_->UnLock(this, *tracked_locks_);
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;
  assert(active_iterators_.empty());
  active_iterators_.clear();
  untracked_keys_.clear();
  TransactionBaseImpl::Clear();
}

void WriteUnpreparedTxn::SetSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  PessimisticTransaction::SetSavePoint();
  if (unflushed_save_points_ == nullptr) {
    unflushed_save_points_.reset(new autovector<size_t>());
  }
  unflushed_save_points_->push_back(write_batch_.GetDataSize());
}

Status WriteUnpreparedTxn::RollbackToSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::RollbackToSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    return RollbackToSavePointInternal();
  }

  return Status::NotFound();
}

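// Roll back to a savepoint whose data has already been flushed to the DB. The
// old versions of keys locked since that savepoint are read at the
// savepoint's snapshot and written out as one more unprepared batch, which
// cancels out the flushed writes.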
Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
  Status s;

  const bool kClear = true;
  TransactionBaseImpl::InitWriteBatch(kClear);

  assert(flushed_save_points_->size() > 0);
  WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();

  assert(save_points_ != nullptr && save_points_->size() > 0);
  const LockTracker& tracked_keys = *save_points_->top().new_locks_;

  ReadOptions roptions;
  roptions.snapshot = top.snapshot_->snapshot();
  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(roptions.snapshot)
          ->min_uncommitted_;
  SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          top.unprep_seqs_,
                                          kBackedByDBSnapshot);
  s = WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions);
  if (!s.ok()) {
    return s;
  }

  const bool kPrepared = true;
  s = FlushWriteBatchToDBInternal(!kPrepared);
  if (!s.ok()) {
    return s;
  }

  // PessimisticTransaction::RollbackToSavePoint will also call
  // RollbackToSavepoint on write_batch_. However, write_batch_ is empty and
  // has no savepoints because this savepoint has already been flushed. Work
  // around this by setting a fake savepoint.
  write_batch_.SetSavePoint();
  s = PessimisticTransaction::RollbackToSavePoint();
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }

  flushed_save_points_->pop_back();
  return s;
}

Status WriteUnpreparedTxn::PopSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    // PessimisticTransaction::PopSavePoint will also call PopSavePoint on
    // write_batch_. However, write_batch_ is empty and has no savepoints
    // because this savepoint has already been flushed. Work around this by
    // setting a fake savepoint.
    write_batch_.SetSavePoint();
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());
    flushed_save_points_->pop_back();
    return s;
  }

  return Status::NotFound();
}

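// Reads go through WriteUnpreparedTxnReadCallback so that both the snapshot
// and this transaction's own flushed (unprepared) data are honored. If the
// callback becomes invalid or the backing snapshot can no longer be
// validated, the caller gets Status::TryAgain.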
void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
                                  ColumnFamilyHandle* column_family,
                                  const size_t num_keys, const Slice* keys,
                                  PinnableSlice* values, Status* statuses,
                                  const bool sorted_input) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
                                      keys, values, statuses, sorted_input,
                                      &callback);
  if (UNLIKELY(!callback.valid() ||
               !wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}

Status WriteUnpreparedTxn::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                            value, &callback);
  if (LIKELY(callback.valid() &&
             wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    return res;
  } else {
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

namespace {
static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) {
  auto txn = reinterpret_cast<WriteUnpreparedTxn*>(arg1);
  auto iter = reinterpret_cast<Iterator*>(arg2);
  txn->RemoveActiveIterator(iter);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
  return GetIterator(options, wupt_db_->DefaultColumnFamily());
}

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WriteUnpreparedTxnDB, not the root db.
  Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
  assert(db_iter);

  auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter);
  active_iterators_.push_back(iter);
  iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
  return iter;
}

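// Check whether `key` has been modified after the sequence number at which it
// was last validated; used for conflict detection when a snapshot is set.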
Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                            const Slice& key,
                                            SequenceNumber* tracked_at_seq) {
  // TODO(lth): Reduce duplicate code with WritePrepared ValidateSnapshot logic.
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here, as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WriteUnpreparedTxnReadCallback snap_checker(
      wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker, min_uncommitted);
}

const std::map<SequenceNumber, size_t>&
WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
  return unprep_seqs_;
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE