1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/transactions/pessimistic_transaction.h"
15 #include "db/column_family.h"
16 #include "db/db_impl/db_impl.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/snapshot.h"
20 #include "rocksdb/status.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "test_util/sync_point.h"
23 #include "util/cast_util.h"
24 #include "util/string_util.h"
25 #include "utilities/transactions/pessimistic_transaction_db.h"
26 #include "utilities/transactions/transaction_util.h"
28 namespace ROCKSDB_NAMESPACE
{
32 std::atomic
<TransactionID
> PessimisticTransaction::txn_id_counter_(1);
34 TransactionID
PessimisticTransaction::GenTxnID() {
35 return txn_id_counter_
.fetch_add(1);
// Construct a pessimistic (lock-based) transaction on `txn_db`. Base-class
// state is routed through the root DB and the owning PessimisticTransactionDB's
// lock-tracker factory; per-transaction options are then applied via
// Initialize(txn_options).
// NOTE(review): several member initializers (original lines 46-48, 50) and the
// use of the `init` parameter (around original line 56) appear elided in this
// extract -- compare against upstream before assuming the list is complete.
38 PessimisticTransaction::PessimisticTransaction(
39 TransactionDB
* txn_db
, const WriteOptions
& write_options
,
40 const TransactionOptions
& txn_options
, const bool init
)
41 : TransactionBaseImpl(
42 txn_db
->GetRootDB(), write_options
,
43 static_cast_with_check
<PessimisticTransactionDB
>(txn_db
)
44 ->GetLockTrackerFactory()),
45 txn_db_impl_(nullptr),
49 waiting_key_(nullptr),
51 deadlock_detect_(false),
52 deadlock_detect_depth_(0),
53 skip_concurrency_control_(false) {
// Cache down-cast views of the DB handles so later calls avoid re-casting.
54 txn_db_impl_
= static_cast_with_check
<PessimisticTransactionDB
>(txn_db
);
55 db_impl_
= static_cast_with_check
<DBImpl
>(db_
);
57 Initialize(txn_options
);
// Apply TransactionOptions to this transaction's mutable state. Called from
// the constructor and again from Reinitialize() when the object is recycled.
// NOTE(review): multiple original lines are elided in this extract (62-65, 70,
// 74, 76-77, 80-83, 85-87, 90, 94-95), including the assignment target of the
// default lock-timeout expression -- confirm against upstream.
61 void PessimisticTransaction::Initialize(const TransactionOptions
& txn_options
) {
66 deadlock_detect_
= txn_options
.deadlock_detect
;
67 deadlock_detect_depth_
= txn_options
.deadlock_detect_depth
;
68 write_batch_
.SetMaxBytes(txn_options
.max_write_batch_size
);
69 skip_concurrency_control_
= txn_options
.skip_concurrency_control
;
// The *1000 scaling suggests option values are milliseconds and internal
// times are microseconds -- TODO(review): confirm the units.
71 lock_timeout_
= txn_options
.lock_timeout
* 1000;
72 if (lock_timeout_
< 0) {
73 // Lock timeout not set; fall back to the TransactionDB-wide default.
75 txn_db_impl_
->GetTxnDBOptions().transaction_lock_timeout
* 1000;
78 if (txn_options
.expiration
>= 0) {
79 expiration_time_
= start_time_
+ txn_options
.expiration
* 1000;
84 if (txn_options
.set_snapshot
) {
// Expirable transactions are registered so other threads can find them
// (e.g. to steal their locks once expired).
88 if (expiration_time_
> 0) {
89 txn_db_impl_
->InsertExpirableTransaction(txn_id_
, this);
91 use_only_the_last_commit_time_batch_for_recovery_
=
92 txn_options
.use_only_the_last_commit_time_batch_for_recovery
;
93 skip_prepare_
= txn_options
.skip_prepare
;
96 PessimisticTransaction::~PessimisticTransaction() {
97 txn_db_impl_
->UnLock(this, *tracked_locks_
);
98 if (expiration_time_
> 0) {
99 txn_db_impl_
->RemoveExpirableTransaction(txn_id_
);
101 if (!name_
.empty() && txn_state_
!= COMMITTED
) {
102 txn_db_impl_
->UnregisterTransaction(this);
106 void PessimisticTransaction::Clear() {
107 txn_db_impl_
->UnLock(this, *tracked_locks_
);
108 TransactionBaseImpl::Clear();
111 void PessimisticTransaction::Reinitialize(
112 TransactionDB
* txn_db
, const WriteOptions
& write_options
,
113 const TransactionOptions
& txn_options
) {
114 if (!name_
.empty() && txn_state_
!= COMMITTED
) {
115 txn_db_impl_
->UnregisterTransaction(this);
117 TransactionBaseImpl::Reinitialize(txn_db
->GetRootDB(), write_options
);
118 Initialize(txn_options
);
// True iff an expiration was configured (expiration_time_ > 0) and the wall
// clock (Env::NowMicros) has reached it.
// NOTE(review): the return statements (original lines 125-129) are elided in
// this extract.
121 bool PessimisticTransaction::IsExpired() const {
122 if (expiration_time_
> 0) {
123 if (db_
->GetEnv()->NowMicros() >= expiration_time_
) {
124 // Transaction is expired.
132 WriteCommittedTxn::WriteCommittedTxn(TransactionDB
* txn_db
,
133 const WriteOptions
& write_options
,
134 const TransactionOptions
& txn_options
)
135 : PessimisticTransaction(txn_db
, write_options
, txn_options
){};
// Write `batch` through this transaction's DB, locking every key the batch
// touches first and unconditionally releasing those locks before returning.
// Independent of the keys already tracked by the transaction itself.
// NOTE(review): several original lines are elided in this extract (e.g. the
// early return after a failed LockBatch and the third CAS argument on line
// 152) -- confirm against upstream.
137 Status
PessimisticTransaction::CommitBatch(WriteBatch
* batch
) {
138 std::unique_ptr
<LockTracker
> keys_to_unlock(lock_tracker_factory_
.Create());
139 Status s
= LockBatch(batch
, keys_to_unlock
.get());
145 bool can_commit
= false;
148 s
= Status::Expired();
// With an expiration set, locks may be stolen concurrently, so the state
// transition away from STARTED must be a compare-and-swap.
149 } else if (expiration_time_
> 0) {
150 TransactionState expected
= STARTED
;
151 can_commit
= std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
153 } else if (txn_state_
== STARTED
) {
154 // lock stealing is not a concern
159 txn_state_
.store(AWAITING_COMMIT
);
160 s
= CommitBatchInternal(batch
);
162 txn_state_
.store(COMMITTED
);
164 } else if (txn_state_
== LOCKS_STOLEN
) {
165 s
= Status::Expired();
167 s
= Status::InvalidArgument("Transaction is not in state for commit.");
// Always release the per-batch locks taken above.
170 txn_db_impl_
->UnLock(this, *keys_to_unlock
);
// First phase of two-phase commit: requires a named transaction, moves the
// state machine to PREPARED via PrepareInternal(), and clears the expiration
// (a prepared transaction may no longer expire).
// NOTE(review): original lines around the `s` declaration, the CAS target
// argument (line 194) and several closing braces are elided in this extract.
175 Status
PessimisticTransaction::Prepare() {
178 return Status::InvalidArgument(
179 "Cannot prepare a transaction that has not been named.");
183 return Status::Expired();
187 bool can_prepare
= false;
189 if (expiration_time_
> 0) {
190 // must concern ourselves with expiration and/or lock stealing
191 // need to compare/exchange because locks could be stolen under us here
192 TransactionState expected
= STARTED
;
193 can_prepare
= std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
195 } else if (txn_state_
== STARTED
) {
196 // expiration and lock stealing is not possible
197 txn_state_
.store(AWAITING_PREPARE
);
202 // transaction can't expire after preparation
203 expiration_time_
= 0;
204 assert(log_number_
== 0 ||
205 txn_db_impl_
->GetTxnDBOptions().write_policy
== WRITE_UNPREPARED
);
207 s
= PrepareInternal();
209 txn_state_
.store(PREPARED
);
211 } else if (txn_state_
== LOCKS_STOLEN
) {
212 s
= Status::Expired();
213 } else if (txn_state_
== PREPARED
) {
214 s
= Status::InvalidArgument("Transaction has already been prepared.");
215 } else if (txn_state_
== COMMITTED
) {
216 s
= Status::InvalidArgument("Transaction has already been committed.");
217 } else if (txn_state_
== ROLLEDBACK
) {
218 s
= Status::InvalidArgument("Transaction has already been rolledback.");
// NOTE(review): this fallback message says "commit" inside Prepare() --
// looks like a copy-paste; confirm the intended wording upstream.
220 s
= Status::InvalidArgument("Transaction is not in state for commit.");
// Write the prepared section: marks EndPrepare on the accumulated batch and
// writes it to the WAL only (memtable disabled), remembering log_number_ so
// the prep section can be tracked. The local PreReleaseCallback marks the
// WAL file as containing a prep section once the write is sequenced.
// NOTE(review): the MarkEndPrepare argument list (line 230), closing braces
// and the final WriteImpl argument(s) (lines 264-265) are elided here.
// NOTE(review): `KIgnoreSeqUsed` breaks the kCamelCase constant convention
// used by its neighbors (kNoWriteCallback, kRefNoLog, ...).
226 Status
WriteCommittedTxn::PrepareInternal() {
// Force the WAL on for the prepare record even if the caller disabled it.
227 WriteOptions write_options
= write_options_
;
228 write_options
.disableWAL
= false;
229 auto s
= WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
232 class MarkLogCallback
: public PreReleaseCallback
{
234 MarkLogCallback(DBImpl
* db
, bool two_write_queues
)
235 : db_(db
), two_write_queues_(two_write_queues
) {
236 (void)two_write_queues_
; // to silence unused private field warning
238 virtual Status
Callback(SequenceNumber
, bool is_mem_disabled
,
239 uint64_t log_number
, size_t /*index*/,
240 size_t /*total*/) override
{
242 (void)is_mem_disabled
;
244 assert(log_number
!= 0);
245 assert(!two_write_queues_
|| is_mem_disabled
); // implies the 2nd queue
246 db_
->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number
);
252 bool two_write_queues_
;
253 } mark_log_callback(db_impl_
,
254 db_impl_
->immutable_db_options().two_write_queues
);
// Named constants document each WriteImpl argument at the call site.
256 WriteCallback
* const kNoWriteCallback
= nullptr;
257 const uint64_t kRefNoLog
= 0;
258 const bool kDisableMemtable
= true;
259 SequenceNumber
* const KIgnoreSeqUsed
= nullptr;
260 const size_t kNoBatchCount
= 0;
261 s
= db_impl_
->WriteImpl(write_options
, GetWriteBatch()->GetWriteBatch(),
262 kNoWriteCallback
, &log_number_
, kRefNoLog
,
263 kDisableMemtable
, KIgnoreSeqUsed
, kNoBatchCount
,
// Commit the transaction. Three legal entry states: STARTED with no prior
// Prepare() (commit_without_prepare), PREPARED (commit_prepared), or an
// error state. On success the transaction is unregistered and COMMITTED.
// NOTE(review): several original lines are elided in this extract (e.g. the
// `s` declaration, the skip_prepare/TxnNotPrepared context around line 294,
// the MarkLogAsHavingPrepSectionFlushed arguments, and closing braces).
// NOTE(review): `dbimpl_` (original lines 307, 334) vs `db_impl_` elsewhere
// in this file -- confirm the real member name; this looks like a typo.
268 Status
PessimisticTransaction::Commit() {
269 bool commit_without_prepare
= false;
270 bool commit_prepared
= false;
273 return Status::Expired();
276 if (expiration_time_
> 0) {
277 // we must atomically compare and exchange the state here because at
278 // this state in the transaction it is possible for another thread
279 // to change our state out from under us in the event that we expire and have
280 // our locks stolen. In this case the only valid state is STARTED because
281 // a state of PREPARED would have a cleared expiration_time_.
282 TransactionState expected
= STARTED
;
283 commit_without_prepare
= std::atomic_compare_exchange_strong(
284 &txn_state_
, &expected
, AWAITING_COMMIT
);
285 TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
286 } else if (txn_state_
== PREPARED
) {
287 // expiration and lock stealing is not a concern
288 commit_prepared
= true;
289 } else if (txn_state_
== STARTED
) {
290 // expiration and lock stealing is not a concern
292 commit_without_prepare
= true;
294 return Status::TxnNotPrepared();
299 if (commit_without_prepare
) {
300 assert(!commit_prepared
);
// A commit-time batch only makes sense on the prepared path; refuse it here.
301 if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
302 s
= Status::InvalidArgument(
303 "Commit-time batch contains values that will not be committed.");
305 txn_state_
.store(AWAITING_COMMIT
);
306 if (log_number_
> 0) {
307 dbimpl_
->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
310 s
= CommitWithoutPrepareInternal();
311 if (!name_
.empty()) {
312 txn_db_impl_
->UnregisterTransaction(this);
316 txn_state_
.store(COMMITTED
);
319 } else if (commit_prepared
) {
320 txn_state_
.store(AWAITING_COMMIT
);
322 s
= CommitInternal();
325 ROCKS_LOG_WARN(db_impl_
->immutable_db_options().info_log
,
326 "Commit write failed");
330 // FindObsoleteFiles must now look to the memtables
331 // to determine what prep logs must be kept around,
332 // not the prep section heap.
333 assert(log_number_
> 0);
334 dbimpl_
->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
336 txn_db_impl_
->UnregisterTransaction(this);
339 txn_state_
.store(COMMITTED
);
340 } else if (txn_state_
== LOCKS_STOLEN
) {
341 s
= Status::Expired();
342 } else if (txn_state_
== COMMITTED
) {
343 s
= Status::InvalidArgument("Transaction has already been committed.");
344 } else if (txn_state_
== ROLLEDBACK
) {
345 s
= Status::InvalidArgument("Transaction has already been rolledback.");
347 s
= Status::InvalidArgument("Transaction is not in state for commit.");
// Non-2PC commit: a single WriteImpl of the accumulated write batch with both
// WAL and memtable enabled.
// NOTE(review): the tail of this function (original lines 360-364) is elided
// in this extract.
353 Status
WriteCommittedTxn::CommitWithoutPrepareInternal() {
354 uint64_t seq_used
= kMaxSequenceNumber
;
356 db_impl_
->WriteImpl(write_options_
, GetWriteBatch()->GetWriteBatch(),
357 /*callback*/ nullptr, /*log_used*/ nullptr,
358 /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used
);
// On success, WriteImpl must have published a real sequence number.
359 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
// Write an externally supplied `batch` (from CommitBatch) directly; the
// unnamed size_t parameter (batch count) is unused by this write policy.
// NOTE(review): the tail of this function (original lines 372-376) is elided
// in this extract.
366 Status
WriteCommittedTxn::CommitBatchInternal(WriteBatch
* batch
, size_t) {
367 uint64_t seq_used
= kMaxSequenceNumber
;
368 auto s
= db_impl_
->WriteImpl(write_options_
, batch
, /*callback*/ nullptr,
369 /*log_used*/ nullptr, /*log_ref*/ 0,
370 /*disable_memtable*/ false, &seq_used
);
// On success, WriteImpl must have published a real sequence number.
371 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
// 2PC second phase: append a Commit marker plus the prepared batch to the
// commit-time batch and write it, referencing the prepare log (log_ref =
// log_number_) so the WAL prep section stays pinned until flushed.
// NOTE(review): error-check lines between the visible statements (original
// lines 383-384, 387, 393-394) and the function tail (400-404) are elided.
378 Status
WriteCommittedTxn::CommitInternal() {
379 // We take the commit-time batch and append the Commit marker.
380 // The Memtable will ignore the Commit marker in non-recovery mode
381 WriteBatch
* working_batch
= GetCommitTimeWriteBatch();
382 auto s
= WriteBatchInternal::MarkCommit(working_batch
, name_
);
385 // any operations appended to this working_batch will be ignored from WAL
386 working_batch
->MarkWalTerminationPoint();
388 // insert prepared batch into Memtable only skipping WAL.
389 // Memtable will ignore BeginPrepare/EndPrepare markers
390 // in non recovery mode and simply insert the values
391 s
= WriteBatchInternal::Append(working_batch
,
392 GetWriteBatch()->GetWriteBatch());
395 uint64_t seq_used
= kMaxSequenceNumber
;
396 s
= db_impl_
->WriteImpl(write_options_
, working_batch
, /*callback*/ nullptr,
397 /*log_used*/ nullptr, /*log_ref*/ log_number_
,
398 /*disable_memtable*/ false, &seq_used
);
// On success, WriteImpl must have published a real sequence number.
399 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
// Abort the transaction. PREPARED transactions write a rollback marker and
// release their pinned WAL prep section; STARTED transactions only need the
// marker under WRITE_UNPREPARED (the one policy where data can reach the WAL
// before Prepare()).
// NOTE(review): the `s` declaration, Clear()/unregister calls and closing
// braces appear elided in this extract.
// NOTE(review): `dbimpl_` (original lines 416, 428) vs `db_impl_` elsewhere
// in this file -- confirm the real member name; this looks like a typo.
406 Status
PessimisticTransaction::Rollback() {
408 if (txn_state_
== PREPARED
) {
409 txn_state_
.store(AWAITING_ROLLBACK
);
411 s
= RollbackInternal();
414 // we do not need to keep our prepared section around
415 assert(log_number_
> 0);
416 dbimpl_
->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
419 txn_state_
.store(ROLLEDBACK
);
421 } else if (txn_state_
== STARTED
) {
422 if (log_number_
> 0) {
// Only WRITE_UNPREPARED can have a log while still in STARTED.
423 assert(txn_db_impl_
->GetTxnDBOptions().write_policy
== WRITE_UNPREPARED
);
425 s
= RollbackInternal();
428 dbimpl_
->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
432 // prepare couldn't have taken place
434 } else if (txn_state_
== COMMITTED
) {
435 s
= Status::InvalidArgument("This transaction has already been committed.");
437 s
= Status::InvalidArgument(
438 "Two phase transaction is not in state for rollback.");
// Write a Rollback marker for this named transaction to the WAL so recovery
// knows the prepared section must be discarded.
// NOTE(review): original line 447 (presumably an `if (s.ok())` guard around
// the WriteImpl) and the function tail (449-450) are elided in this extract.
444 Status
WriteCommittedTxn::RollbackInternal() {
445 WriteBatch rollback_marker
;
446 auto s
= WriteBatchInternal::MarkRollback(&rollback_marker
, name_
);
448 s
= db_impl_
->WriteImpl(write_options_
, &rollback_marker
);
// Undo everything since the most recent savepoint: release only the point
// locks acquired after that savepoint, then let the base class rewind the
// write batch and tracked-key state.
// NOTE(review): a few lines (original 455-456, 462, 464-466) are elided.
452 Status
PessimisticTransaction::RollbackToSavePoint() {
453 if (txn_state_
!= STARTED
) {
454 return Status::InvalidArgument("Transaction is beyond state for rollback.");
457 if (save_points_
!= nullptr && !save_points_
->empty()) {
458 // Unlock any keys locked since the most recent savepoint was set.
459 auto& save_point_tracker
= *save_points_
->top().new_locks_
;
460 std::unique_ptr
<LockTracker
> t(
461 tracked_locks_
->GetTrackedLocksSinceSavePoint(save_point_tracker
));
463 txn_db_impl_
->UnLock(this, *t
);
467 return TransactionBaseImpl::RollbackToSavePoint();
// Lock all keys in this batch.
// On success, caller should unlock keys_to_unlock; every lock taken here is
// recorded in *keys_to_unlock so the caller (CommitBatch) can release them.
// NOTE(review): several original lines are elided in this extract (handler
// return statements, the early return after Iterate, the PointLockRequest
// declaration around line 530, and the failure-cleanup control flow).
472 Status
PessimisticTransaction::LockBatch(WriteBatch
* batch
,
473 LockTracker
* keys_to_unlock
) {
// Local handler that collects the distinct keys per column family.
474 class Handler
: public WriteBatch::Handler
{
476 // Sorted map of column_family_id to sorted set of keys.
477 // Since LockBatch() always locks keys in sorted order, it cannot deadlock
478 // with itself. We're not using a comparator here since it doesn't matter
479 // what the sorting is as long as it's consistent.
480 std::map
<uint32_t, std::set
<std::string
>> keys_
;
484 void RecordKey(uint32_t column_family_id
, const Slice
& key
) {
485 std::string key_str
= key
.ToString();
487 auto& cfh_keys
= keys_
[column_family_id
];
488 auto iter
= cfh_keys
.find(key_str
);
489 if (iter
== cfh_keys
.end()) {
490 // key not yet seen, store it.
491 cfh_keys
.insert({std::move(key_str
)});
495 Status
PutCF(uint32_t column_family_id
, const Slice
& key
,
496 const Slice
& /* unused */) override
{
497 RecordKey(column_family_id
, key
);
500 Status
MergeCF(uint32_t column_family_id
, const Slice
& key
,
501 const Slice
& /* unused */) override
{
502 RecordKey(column_family_id
, key
);
505 Status
DeleteCF(uint32_t column_family_id
, const Slice
& key
) override
{
506 RecordKey(column_family_id
, key
);
511 // Iterating on this handler will add all keys in this batch into keys
513 Status s
= batch
->Iterate(&handler
);
518 // Attempt to lock all keys, in sorted (cf, key) order to avoid deadlock.
519 for (const auto& cf_iter
: handler
.keys_
) {
520 uint32_t cfh_id
= cf_iter
.first
;
521 auto& cfh_keys
= cf_iter
.second
;
523 for (const auto& key_iter
: cfh_keys
) {
524 const std::string
& key
= key_iter
;
526 s
= txn_db_impl_
->TryLock(this, cfh_id
, key
, true /* exclusive */);
531 r
.column_family_id
= cfh_id
;
533 r
.seq
= kMaxSequenceNumber
;
536 keys_to_unlock
->Track(r
);
// On failure, release whatever was locked so far.
545 txn_db_impl_
->UnLock(this, *keys_to_unlock
);
// Attempt to lock this key.
// Returns OK if the key has been successfully locked. Non-ok, otherwise.
// If `do_validate` is true and this transaction has a snapshot set,
// this key will only be locked if there have been no writes to this key since
// the snapshot time.
// NOTE(review): many original lines are elided in this extract (the `s`
// declaration, several `if (s.ok())` guards, closing braces, and the final
// `return s;`).
556 Status
PessimisticTransaction::TryLock(ColumnFamilyHandle
* column_family
,
557 const Slice
& key
, bool read_only
,
558 bool exclusive
, const bool do_validate
,
559 const bool assume_tracked
) {
// assume_tracked implies the caller already validated; both set is a misuse.
560 assert(!assume_tracked
|| !do_validate
);
562 if (UNLIKELY(skip_concurrency_control_
)) {
565 uint32_t cfh_id
= GetColumnFamilyID(column_family
);
566 std::string key_str
= key
.ToString();
567 PointLockStatus status
= tracked_locks_
->GetPointLockStatus(cfh_id
, key_str
);
568 bool previously_locked
= status
.locked
;
// Shared -> exclusive upgrade also requires going through the lock manager.
569 bool lock_upgrade
= previously_locked
&& exclusive
&& !status
.exclusive
;
571 // Lock this key if this transaction hasn't already locked it or we require
573 if (!previously_locked
|| lock_upgrade
) {
574 s
= txn_db_impl_
->TryLock(this, cfh_id
, key_str
, exclusive
);
577 SetSnapshotIfNeeded();
579 // Even though we do not care about doing conflict checking for this write,
580 // we still need to take a lock to make sure we do not cause a conflict with
581 // some other write. However, we do not need to check if there have been
582 // any writes since this transaction's snapshot.
583 // TODO(agiardullo): could optimize by supporting shared txn locks in the
585 SequenceNumber tracked_at_seq
=
586 status
.locked
? status
.seq
: kMaxSequenceNumber
;
587 if (!do_validate
|| snapshot_
== nullptr) {
588 if (assume_tracked
&& !previously_locked
) {
589 s
= Status::InvalidArgument(
590 "assume_tracked is set but it is not tracked yet");
592 // Need to remember the earliest sequence number that we know that this
593 // key has not been modified after. This is useful if this same
595 // later tries to lock this key again.
596 if (tracked_at_seq
== kMaxSequenceNumber
) {
597 // Since we haven't checked a snapshot, we only know this key has not
598 // been modified since after we locked it.
599 // Note: when last_seq_same_as_publish_seq_==false this is less than the
600 // latest allocated seq but it is ok since i) this is just a heuristic
601 // used only as a hint to avoid actual check for conflicts, ii) this would
602 // cause a false positive only if the snapshot is taken right after the
603 // lock, which would be an unusual sequence.
604 tracked_at_seq
= db_
->GetLatestSequenceNumber();
607 // If a snapshot is set, we need to make sure the key hasn't been modified
608 // since the snapshot. This must be done after we locked the key.
609 // If we already have validated an earlier snapshot it must have been
610 // reflected in tracked_at_seq and ValidateSnapshot will return OK.
612 s
= ValidateSnapshot(column_family
, key
, &tracked_at_seq
);
615 // Failed to validate key
616 // Unlock key we just locked
// On a failed upgrade, downgrade back to the shared lock we held before.
618 s
= txn_db_impl_
->TryLock(this, cfh_id
, key_str
,
619 false /* exclusive */);
621 } else if (!previously_locked
) {
622 txn_db_impl_
->UnLock(this, cfh_id
, key
.ToString());
629 // We must track all the locked keys so that we can unlock them later. If
630 // the key is already locked, this func will update some stats on the
631 // tracked key. It could also update the tracked_at_seq if it is lower
632 // than the existing tracked key seq. These stats are necessary for
633 // RollbackToSavePoint to determine whether a key can be safely removed
634 // from tracked_keys_. Removal can only be done if a key was only locked
635 // during the current savepoint.
637 // Recall that if assume_tracked is true, we assume that TrackKey has been
638 // called previously since the last savepoint, with the same exclusive
639 // setting, and at a lower sequence number, so skipping here should be
641 if (!assume_tracked
) {
642 TrackKey(cfh_id
, key_str
, tracked_at_seq
, read_only
, exclusive
);
// Debug-build cross-check that the assume_tracked contract really held.
645 PointLockStatus lock_status
=
646 tracked_locks_
->GetPointLockStatus(cfh_id
, key_str
);
647 assert(lock_status
.locked
);
648 assert(lock_status
.seq
<= tracked_at_seq
);
649 assert(lock_status
.exclusive
== exclusive
);
// Return OK() if this key has not been modified more recently than the
// transaction snapshot_.
// tracked_at_seq is the global seq at which we either locked the key or already
// have done ValidateSnapshot.
// NOTE(review): the early `return Status::OK();` for the already-validated
// case (original lines 671-672) and the assert on snapshot_ (664-665) are
// elided in this extract.
661 Status
PessimisticTransaction::ValidateSnapshot(
662 ColumnFamilyHandle
* column_family
, const Slice
& key
,
663 SequenceNumber
* tracked_at_seq
) {
666 SequenceNumber snap_seq
= snapshot_
->GetSequenceNumber();
667 if (*tracked_at_seq
<= snap_seq
) {
668 // If the key has been previously validated (or locked) at a sequence number
669 // earlier than the current snapshot's sequence number, we already know it
670 // has not been modified after snap_seq either.
673 // Otherwise we have either
674 // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
675 // 2: snap_seq < tracked_at_seq: last time we lock the key was via
676 // do_validate=false which means we had skipped ValidateSnapshot. In both
677 // cases we should do ValidateSnapshot now.
679 *tracked_at_seq
= snap_seq
;
681 ColumnFamilyHandle
* cfh
=
682 column_family
? column_family
: db_impl_
->DefaultColumnFamily();
684 return TransactionUtil::CheckKeyForConflicts(
685 db_impl_
, cfh
, key
.ToString(), snap_seq
, false /* cache_only */);
// Attempt to atomically move this (expired) transaction out of STARTED so the
// caller may take ownership of its locks; returns whether the CAS won.
// NOTE(review): the precondition assert (original line 689) and the CAS
// target state (third argument, line 692 -- presumably LOCKS_STOLEN) are
// elided in this extract.
688 bool PessimisticTransaction::TryStealingLocks() {
690 TransactionState expected
= STARTED
;
691 return std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
695 void PessimisticTransaction::UnlockGetForUpdate(
696 ColumnFamilyHandle
* column_family
, const Slice
& key
) {
697 txn_db_impl_
->UnLock(this, GetColumnFamilyID(column_family
), key
.ToString());
// Assign a (unique, 1-512 char) name to a STARTED transaction and register it
// with the TransactionDB; naming is required before Prepare().
// NOTE(review): the `s` declaration (original line 701), the success branch
// wiring around RegisterTransaction (710-711, 713-714) and the return/braces
// (716-718) are elided in this extract.
700 Status
PessimisticTransaction::SetName(const TransactionName
& name
) {
702 if (txn_state_
== STARTED
) {
703 if (name_
.length()) {
704 s
= Status::InvalidArgument("Transaction has already been named.");
// Uniqueness is checked against the DB-wide name registry.
705 } else if (txn_db_impl_
->GetTransactionByName(name
) != nullptr) {
706 s
= Status::InvalidArgument("Transaction name must be unique.");
707 } else if (name
.length() < 1 || name
.length() > 512) {
708 s
= Status::InvalidArgument(
709 "Transaction name length must be between 1 and 512 chars.");
712 txn_db_impl_
->RegisterTransaction(this);
715 s
= Status::InvalidArgument("Transaction is beyond state for naming.");
720 } // namespace ROCKSDB_NAMESPACE
722 #endif // ROCKSDB_LITE