1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/transactions/pessimistic_transaction.h"
15 #include "db/column_family.h"
16 #include "db/db_impl/db_impl.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/snapshot.h"
20 #include "rocksdb/status.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "test_util/sync_point.h"
23 #include "util/cast_util.h"
24 #include "util/string_util.h"
25 #include "utilities/transactions/pessimistic_transaction_db.h"
26 #include "utilities/transactions/transaction_util.h"
28 namespace ROCKSDB_NAMESPACE
{
// Process-wide counter from which GenTxnID() hands out transaction ids.
// Initialized to 1, so the first id handed out is 1.
32 std::atomic
<TransactionID
> PessimisticTransaction::txn_id_counter_(1);
// Returns a new transaction id by atomically incrementing txn_id_counter_.
// fetch_add returns the value *before* the increment, so concurrent callers
// each observe a distinct id.
34 TransactionID
PessimisticTransaction::GenTxnID() {
35 return txn_id_counter_
.fetch_add(1);
// Constructs a pessimistic (lock-based) transaction on top of txn_db's root
// DB. The initializer list gives members neutral defaults (no deadlock
// detection, concurrency control enabled); the per-transaction settings are
// then applied from txn_options by Initialize().
// NOTE(review): `init` is not referenced in the visible statements;
// presumably it gates the Initialize() call — confirm.
38 PessimisticTransaction::PessimisticTransaction(
39 TransactionDB
* txn_db
, const WriteOptions
& write_options
,
40 const TransactionOptions
& txn_options
, const bool init
)
41 : TransactionBaseImpl(txn_db
->GetRootDB(), write_options
),
42 txn_db_impl_(nullptr),
46 waiting_key_(nullptr),
48 deadlock_detect_(false),
49 deadlock_detect_depth_(0),
50 skip_concurrency_control_(false) {
52 static_cast_with_check
<PessimisticTransactionDB
, TransactionDB
>(txn_db
);
// Downcast the base class's db_ to DBImpl so internal APIs such as
// WriteImpl() and logs_with_prep_tracker() can be reached elsewhere.
53 db_impl_
= static_cast_with_check
<DBImpl
, DB
>(db_
);
55 Initialize(txn_options
);
// (Re)applies the per-transaction settings from txn_options:
//   - deadlock detection flag and depth,
//   - the write batch size cap,
//   - whether concurrency control is skipped,
//   - the lock timeout,
//   - the expiration deadline,
//   - the recovery commit-time-batch option.
// Also registers the transaction as expirable when it has a deadline.
59 void PessimisticTransaction::Initialize(const TransactionOptions
& txn_options
) {
64 deadlock_detect_
= txn_options
.deadlock_detect
;
65 deadlock_detect_depth_
= txn_options
.deadlock_detect_depth
;
66 write_batch_
.SetMaxBytes(txn_options
.max_write_batch_size
);
67 skip_concurrency_control_
= txn_options
.skip_concurrency_control
;
// The option value is scaled by 1000 (presumably milliseconds ->
// microseconds — confirm against TransactionOptions). A negative value means
// "not set" and falls back to the TransactionDB-wide default.
69 lock_timeout_
= txn_options
.lock_timeout
* 1000;
70 if (lock_timeout_
< 0) {
71 // Lock timeout not set, use default
73 txn_db_impl_
->GetTxnDBOptions().transaction_lock_timeout
* 1000;
// A non-negative expiration produces an absolute deadline relative to
// start_time_. IsExpired() compares it against Env::NowMicros(), so the
// *1000 presumably converts milliseconds to microseconds.
76 if (txn_options
.expiration
>= 0) {
77 expiration_time_
= start_time_
+ txn_options
.expiration
* 1000;
82 if (txn_options
.set_snapshot
) {
// Transactions with a deadline are registered with the TransactionDB as
// expirable; the destructor calls RemoveExpirableTransaction() for them.
86 if (expiration_time_
> 0) {
87 txn_db_impl_
->InsertExpirableTransaction(txn_id_
, this);
89 use_only_the_last_commit_time_batch_for_recovery_
=
90 txn_options
.use_only_the_last_commit_time_batch_for_recovery
;
// Destructor:
//   - releases every lock this transaction still tracks;
//   - removes it from the expirable set if it had a deadline;
//   - unregisters a named transaction that did not reach COMMITED.
// Commit() already unregisters named transactions, which is why the
// unregister below is skipped for COMMITED ones.
93 PessimisticTransaction::~PessimisticTransaction() {
94 txn_db_impl_
->UnLock(this, &GetTrackedKeys());
95 if (expiration_time_
> 0) {
96 txn_db_impl_
->RemoveExpirableTransaction(txn_id_
);
98 if (!name_
.empty() && txn_state_
!= COMMITED
) {
99 txn_db_impl_
->UnregisterTransaction(this);
// Releases all locks tracked by this transaction, then clears the base
// transaction state. The unlock runs first, presumably because the base
// Clear() discards the tracked-key set that UnLock() needs — confirm.
103 void PessimisticTransaction::Clear() {
104 txn_db_impl_
->UnLock(this, &GetTrackedKeys());
105 TransactionBaseImpl::Clear();
// Reuses this object for a new transaction:
//   1. unregisters the previous name if that transaction did not commit;
//   2. resets the base state against txn_db's root DB;
//   3. re-applies txn_options via Initialize().
108 void PessimisticTransaction::Reinitialize(
109 TransactionDB
* txn_db
, const WriteOptions
& write_options
,
110 const TransactionOptions
& txn_options
) {
111 if (!name_
.empty() && txn_state_
!= COMMITED
) {
112 txn_db_impl_
->UnregisterTransaction(this);
114 TransactionBaseImpl::Reinitialize(txn_db
->GetRootDB(), write_options
);
115 Initialize(txn_options
);
// Reports whether this transaction has a deadline (expiration_time_ > 0)
// that the Env clock (NowMicros) has reached or passed. Transactions without
// a deadline never expire.
118 bool PessimisticTransaction::IsExpired() const {
119 if (expiration_time_
> 0) {
120 if (db_
->GetEnv()->NowMicros() >= expiration_time_
) {
121 // Transaction is expired.
129 WriteCommittedTxn::WriteCommittedTxn(TransactionDB
* txn_db
,
130 const WriteOptions
& write_options
,
131 const TransactionOptions
& txn_options
)
132 : PessimisticTransaction(txn_db
, write_options
, txn_options
){};
// Commits the externally supplied `batch`:
//   1. exclusively locks every key it touches via LockBatch();
//   2. if the transaction can still move out of STARTED, writes the batch via
//      CommitBatchInternal() and marks the transaction COMMITED;
//   3. releases the locks taken for the batch.
// An expired or lock-stolen transaction yields Expired(); other states yield
// InvalidArgument.
134 Status
PessimisticTransaction::CommitBatch(WriteBatch
* batch
) {
135 TransactionKeyMap keys_to_unlock
;
136 Status s
= LockBatch(batch
, &keys_to_unlock
);
142 bool can_commit
= false;
145 s
= Status::Expired();
146 } else if (expiration_time_
> 0) {
// With a deadline, another thread may steal our locks concurrently, so
// leaving STARTED must be done with a compare-and-swap.
147 TransactionState expected
= STARTED
;
148 can_commit
= std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
150 } else if (txn_state_
== STARTED
) {
151 // lock stealing is not a concern
156 txn_state_
.store(AWAITING_COMMIT
);
157 s
= CommitBatchInternal(batch
);
159 txn_state_
.store(COMMITED
);
161 } else if (txn_state_
== LOCKS_STOLEN
) {
162 s
= Status::Expired();
164 s
= Status::InvalidArgument("Transaction is not in state for commit.");
// Release only the locks LockBatch() acquired for this batch.
167 txn_db_impl_
->UnLock(this, &keys_to_unlock
);
// First phase of two-phase commit for a named transaction.
// Unnamed transactions are rejected. On success the state moves
// STARTED -> AWAITING_PREPARE -> PREPARED, the expiration deadline is cleared
// (a prepared transaction can no longer expire), and the prepare section is
// written by PrepareInternal().
// LOCKS_STOLEN yields Expired(); PREPARED, COMMITED, ROLLEDBACK and any other
// state yield InvalidArgument.
172 Status
PessimisticTransaction::Prepare() {
176 return Status::InvalidArgument(
177 "Cannot prepare a transaction that has not been named.");
181 return Status::Expired();
184 bool can_prepare
= false;
186 if (expiration_time_
> 0) {
187 // must concern ourselves with expiration and/or lock stealing
188 // need to compare/exchange because locks could be stolen under us here
189 TransactionState expected
= STARTED
;
190 can_prepare
= std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
192 } else if (txn_state_
== STARTED
) {
193 // expiration and lock stealing is not possible
198 txn_state_
.store(AWAITING_PREPARE
);
199 // transaction can't expire after preparation
200 expiration_time_
= 0;
// Only the WRITE_UNPREPARED policy may already have a WAL log number here.
201 assert(log_number_
== 0 ||
202 txn_db_impl_
->GetTxnDBOptions().write_policy
== WRITE_UNPREPARED
);
204 s
= PrepareInternal();
206 txn_state_
.store(PREPARED
);
208 } else if (txn_state_
== LOCKS_STOLEN
) {
209 s
= Status::Expired();
210 } else if (txn_state_
== PREPARED
) {
211 s
= Status::InvalidArgument("Transaction has already been prepared.");
212 } else if (txn_state_
== COMMITED
) {
213 s
= Status::InvalidArgument("Transaction has already been committed.");
214 } else if (txn_state_
== ROLLEDBACK
) {
215 s
= Status::InvalidArgument("Transaction has already been rolledback.");
// NOTE(review): this fallback message mentions "commit" although this is
// Prepare(); "Transaction is not in state for prepare." would be clearer.
// Confirm that no caller or test matches on the current text before changing.
217 s
= Status::InvalidArgument("Transaction is not in state for commit.");
// Writes the transaction's write batch, terminated by an EndPrepare marker
// that carries name_, to the WAL only. The memtable is disabled for this
// write and the WAL is forced on. The WAL number used is reported into
// log_number_ (the log_used argument of WriteImpl).
223 Status
WriteCommittedTxn::PrepareInternal() {
224 WriteOptions write_options
= write_options_
;
// Prepared data must reach the WAL even if the caller disabled it.
225 write_options
.disableWAL
= false;
226 WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_
);
// Pre-release callback that records, in the prep tracker, that the WAL file
// numbered `log_number` contains a prepared section. It is presumably
// handed to WriteImpl as its pre-release callback — confirm.
227 class MarkLogCallback
: public PreReleaseCallback
{
229 MarkLogCallback(DBImpl
* db
, bool two_write_queues
)
230 : db_(db
), two_write_queues_(two_write_queues
) {
231 (void)two_write_queues_
; // to silence unused private field warning
233 virtual Status
Callback(SequenceNumber
, bool is_mem_disabled
,
234 uint64_t log_number
, size_t /*index*/,
235 size_t /*total*/) override
{
237 (void)is_mem_disabled
;
239 assert(log_number
!= 0);
240 assert(!two_write_queues_
|| is_mem_disabled
); // implies the 2nd queue
241 db_
->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number
);
247 bool two_write_queues_
;
248 } mark_log_callback(db_impl_
,
249 db_impl_
->immutable_db_options().two_write_queues
);
// Named WriteImpl arguments: no write callback, no log reference,
// memtable insertion disabled (WAL only), sequence number not needed, and
// no explicit batch count.
251 WriteCallback
* const kNoWriteCallback
= nullptr;
252 const uint64_t kRefNoLog
= 0;
253 const bool kDisableMemtable
= true;
254 SequenceNumber
* const KIgnoreSeqUsed
= nullptr;
255 const size_t kNoBatchCount
= 0;
256 Status s
= db_impl_
->WriteImpl(
257 write_options
, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback
,
258 &log_number_
, kRefNoLog
, kDisableMemtable
, KIgnoreSeqUsed
, kNoBatchCount
,
// Commits the transaction along one of two paths:
//  - Without prepare (state STARTED):
//      * rejected with InvalidArgument if the commit-time batch is non-empty,
//        because that batch would not be committed on this path;
//      * otherwise the write batch is written by
//        CommitWithoutPrepareInternal() and a named transaction is
//        unregistered.
//  - Prepared (state PREPARED):
//      * CommitInternal() writes the commit marker together with the prepared
//        data;
//      * log_number_'s prep section is marked as flushed;
//      * the transaction is unregistered.
// LOCKS_STOLEN yields Expired(); COMMITED, ROLLEDBACK and any other state
// yield InvalidArgument.
263 Status
PessimisticTransaction::Commit() {
265 bool commit_without_prepare
= false;
266 bool commit_prepared
= false;
269 return Status::Expired();
272 if (expiration_time_
> 0) {
273 // we must atomically compare and exchange the state here because at
274 // this stage in the transaction it is possible for another thread
275 // to change our state out from under us in the event that we expire and have
276 // our locks stolen. In this case the only valid state is STARTED because
277 // a state of PREPARED would have a cleared expiration_time_.
278 TransactionState expected
= STARTED
;
279 commit_without_prepare
= std::atomic_compare_exchange_strong(
280 &txn_state_
, &expected
, AWAITING_COMMIT
);
281 TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
282 } else if (txn_state_
== PREPARED
) {
283 // expiration and lock stealing is not a concern
284 commit_prepared
= true;
285 } else if (txn_state_
== STARTED
) {
286 // expiration and lock stealing is not a concern
287 commit_without_prepare
= true;
288 // TODO(myabandeh): what if the user mistakenly forgets prepare? We should
289 // add an option so that the user explicitly express the intention of
290 // skipping the prepare phase.
293 if (commit_without_prepare
) {
294 assert(!commit_prepared
);
295 if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
296 s
= Status::InvalidArgument(
297 "Commit-time batch contains values that will not be committed.");
299 txn_state_
.store(AWAITING_COMMIT
);
300 if (log_number_
> 0) {
301 dbimpl_
->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
304 s
= CommitWithoutPrepareInternal();
305 if (!name_
.empty()) {
306 txn_db_impl_
->UnregisterTransaction(this);
310 txn_state_
.store(COMMITED
);
313 } else if (commit_prepared
) {
314 txn_state_
.store(AWAITING_COMMIT
);
316 s
= CommitInternal();
319 ROCKS_LOG_WARN(db_impl_
->immutable_db_options().info_log
,
320 "Commit write failed");
324 // FindObsoleteFiles must now look to the memtables
325 // to determine what prep logs must be kept around,
326 // not the prep section heap.
327 assert(log_number_
> 0);
328 dbimpl_
->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
330 txn_db_impl_
->UnregisterTransaction(this);
333 txn_state_
.store(COMMITED
);
334 } else if (txn_state_
== LOCKS_STOLEN
) {
335 s
= Status::Expired();
336 } else if (txn_state_
== COMMITED
) {
337 s
= Status::InvalidArgument("Transaction has already been committed.");
338 } else if (txn_state_
== ROLLEDBACK
) {
339 s
= Status::InvalidArgument("Transaction has already been rolledback.");
341 s
= Status::InvalidArgument("Transaction is not in state for commit.");
// Writes the transaction's write batch to both the WAL and the memtable in a
// single WriteImpl call. There is no prepare marker and no log reference.
// A successful write must have been assigned a sequence number (asserted).
347 Status
WriteCommittedTxn::CommitWithoutPrepareInternal() {
// Sentinel used only to assert that WriteImpl filled in seq_used.
348 uint64_t seq_used
= kMaxSequenceNumber
;
350 db_impl_
->WriteImpl(write_options_
, GetWriteBatch()->GetWriteBatch(),
351 /*callback*/ nullptr, /*log_used*/ nullptr,
352 /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used
);
353 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
// Writes the caller-supplied `batch` to both the WAL and the memtable in one
// WriteImpl call. The second (size_t) parameter is unnamed and unused in
// this implementation. A successful write must have been assigned a sequence
// number (asserted).
360 Status
WriteCommittedTxn::CommitBatchInternal(WriteBatch
* batch
, size_t) {
// Sentinel used only to assert that WriteImpl filled in seq_used.
361 uint64_t seq_used
= kMaxSequenceNumber
;
362 auto s
= db_impl_
->WriteImpl(write_options_
, batch
, /*callback*/ nullptr,
363 /*log_used*/ nullptr, /*log_ref*/ 0,
364 /*disable_memtable*/ false, &seq_used
);
365 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
// Second phase of two-phase commit. The write is built in the commit-time
// batch:
//   1. a Commit marker for name_ is appended;
//   2. a WAL termination point is set, so only that prefix goes to the WAL;
//   3. the prepared write batch is appended for memtable insertion only.
// The whole batch is then written with log_ref = log_number_, the WAL file
// that holds the prepared section.
372 Status
WriteCommittedTxn::CommitInternal() {
373 // We take the commit-time batch and append the Commit marker.
374 // The Memtable will ignore the Commit marker in non-recovery mode
375 WriteBatch
* working_batch
= GetCommitTimeWriteBatch();
376 WriteBatchInternal::MarkCommit(working_batch
, name_
);
378 // any operations appended to this working_batch will be excluded from the WAL
379 working_batch
->MarkWalTerminationPoint();
381 // insert prepared batch into Memtable only skipping WAL.
382 // Memtable will ignore BeginPrepare/EndPrepare markers
383 // in non recovery mode and simply insert the values
384 WriteBatchInternal::Append(working_batch
, GetWriteBatch()->GetWriteBatch());
// Sentinel used only to assert that WriteImpl filled in seq_used.
386 uint64_t seq_used
= kMaxSequenceNumber
;
// log_ref ties this write to the WAL file containing the prepared section
// (presumably so that file is retained until the data is flushed — confirm).
388 db_impl_
->WriteImpl(write_options_
, working_batch
, /*callback*/ nullptr,
389 /*log_used*/ nullptr, /*log_ref*/ log_number_
,
390 /*disable_memtable*/ false, &seq_used
);
391 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
// Rolls back the transaction.
//  - PREPARED: writes a rollback marker via RollbackInternal(), marks
//    log_number_'s prep section as flushed, and ends in ROLLEDBACK.
//  - STARTED: a rollback marker is written only if a WAL log number already
//    exists, which is asserted to happen only under WRITE_UNPREPARED.
//  - COMMITED or any other state: returns InvalidArgument.
398 Status
PessimisticTransaction::Rollback() {
400 if (txn_state_
== PREPARED
) {
401 txn_state_
.store(AWAITING_ROLLBACK
);
403 s
= RollbackInternal();
406 // we do not need to keep our prepared section around
407 assert(log_number_
> 0);
408 dbimpl_
->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
411 txn_state_
.store(ROLLEDBACK
);
413 } else if (txn_state_
== STARTED
) {
414 if (log_number_
> 0) {
415 assert(txn_db_impl_
->GetTxnDBOptions().write_policy
== WRITE_UNPREPARED
);
417 s
= RollbackInternal();
420 dbimpl_
->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
424 // prepare couldn't have taken place
426 } else if (txn_state_
== COMMITED
) {
427 s
= Status::InvalidArgument("This transaction has already been committed.");
429 s
= Status::InvalidArgument(
430 "Two phase transaction is not in state for rollback.");
// Writes a batch containing only a Rollback marker for name_, using the
// transaction's write options.
436 Status
WriteCommittedTxn::RollbackInternal() {
437 WriteBatch rollback_marker
;
438 WriteBatchInternal::MarkRollback(&rollback_marker
, name_
);
439 auto s
= db_impl_
->WriteImpl(write_options_
, &rollback_marker
);
// Undoes work back to the most recent savepoint. Allowed only while the
// transaction is STARTED. Releases the locks on keys tracked since that
// savepoint, then delegates to the base-class implementation.
443 Status
PessimisticTransaction::RollbackToSavePoint() {
444 if (txn_state_
!= STARTED
) {
445 return Status::InvalidArgument("Transaction is beyond state for rollback.");
448 // Unlock any keys locked since the last savepoint
449 const std::unique_ptr
<TransactionKeyMap
>& keys
=
450 GetTrackedKeysSinceSavePoint();
453 txn_db_impl_
->UnLock(this, keys
.get());
456 return TransactionBaseImpl::RollbackToSavePoint();
// Collects every key written by `batch` (Put/Merge/Delete records, grouped
// by column family id). It then takes an exclusive lock on each key in
// sorted order, recording every lock taken in keys_to_unlock.
459 // Lock all keys in this batch.
460 // On success, caller should unlock keys_to_unlock
461 Status
PessimisticTransaction::LockBatch(WriteBatch
* batch
,
462 TransactionKeyMap
* keys_to_unlock
) {
463 class Handler
: public WriteBatch::Handler
{
465 // Sorted map of column_family_id to sorted set of keys.
466 // Since LockBatch() always locks keys in sorted order, it cannot deadlock
467 // with itself. We're not using a comparator here since it doesn't matter
468 // what the sorting is as long as it's consistent.
469 std::map
<uint32_t, std::set
<std::string
>> keys_
;
473 void RecordKey(uint32_t column_family_id
, const Slice
& key
) {
474 std::string key_str
= key
.ToString();
476 auto& cfh_keys
= keys_
[column_family_id
];
// NOTE(review): std::set::insert already ignores duplicates, so the
// find-before-insert below is redundant (it costs one extra lookup).
477 auto iter
= cfh_keys
.find(key_str
);
478 if (iter
== cfh_keys
.end()) {
479 // key not yet seen, store it.
480 cfh_keys
.insert({std::move(key_str
)});
484 Status
PutCF(uint32_t column_family_id
, const Slice
& key
,
485 const Slice
& /* unused */) override
{
486 RecordKey(column_family_id
, key
);
489 Status
MergeCF(uint32_t column_family_id
, const Slice
& key
,
490 const Slice
& /* unused */) override
{
491 RecordKey(column_family_id
, key
);
494 Status
DeleteCF(uint32_t column_family_id
, const Slice
& key
) override
{
495 RecordKey(column_family_id
, key
);
500 // Iterating on this handler will add all keys in this batch into keys_
502 batch
->Iterate(&handler
);
506 // Attempt to lock all keys
507 for (const auto& cf_iter
: handler
.keys_
) {
508 uint32_t cfh_id
= cf_iter
.first
;
509 auto& cfh_keys
= cf_iter
.second
;
511 for (const auto& key_iter
: cfh_keys
) {
512 const std::string
& key
= key_iter
;
514 s
= txn_db_impl_
->TryLock(this, cfh_id
, key
, true /* exclusive */);
// NOTE(review): `key` is a const std::string&, so std::move(key) yields a
// const rvalue and TrackKey still receives a copy; the move is a no-op.
518 TrackKey(keys_to_unlock
, cfh_id
, std::move(key
), kMaxSequenceNumber
,
519 false, true /* exclusive */);
// Presumably the failure path: release every lock acquired so far for this
// batch — confirm.
528 txn_db_impl_
->UnLock(this, keys_to_unlock
);
// Parameters, as used below:
//   read_only      - forwarded to TrackKey() when the key gets tracked.
//   exclusive      - lock mode requested from the lock manager. A request for
//                    exclusive on a key already tracked as shared appears to
//                    be handled as a lock upgrade.
//   do_validate    - when true and a snapshot is set, ValidateSnapshot() runs
//                    after the lock is acquired.
//   assume_tracked - caller promises the key is already tracked. Must not be
//                    combined with do_validate (asserted); TrackKey() is
//                    skipped.
// When skip_concurrency_control_ is set, the lock manager is presumably
// bypassed entirely — confirm.
534 // Attempt to lock this key.
535 // Returns OK if the key has been successfully locked. Non-ok, otherwise.
536 // If do_validate is true and this transaction has a snapshot set,
537 // this key will only be locked if there have been no writes to this key since
538 // the snapshot time.
539 Status
PessimisticTransaction::TryLock(ColumnFamilyHandle
* column_family
,
540 const Slice
& key
, bool read_only
,
541 bool exclusive
, const bool do_validate
,
542 const bool assume_tracked
) {
543 assert(!assume_tracked
|| !do_validate
);
545 if (UNLIKELY(skip_concurrency_control_
)) {
548 uint32_t cfh_id
= GetColumnFamilyID(column_family
);
549 std::string key_str
= key
.ToString();
550 bool previously_locked
;
551 bool lock_upgrade
= false;
553 // lock this key if this transaction hasn't already locked it
554 SequenceNumber tracked_at_seq
= kMaxSequenceNumber
;
556 const auto& tracked_keys
= GetTrackedKeys();
557 const auto tracked_keys_cf
= tracked_keys
.find(cfh_id
);
558 if (tracked_keys_cf
== tracked_keys
.end()) {
559 previously_locked
= false;
561 auto iter
= tracked_keys_cf
->second
.find(key_str
);
562 if (iter
== tracked_keys_cf
->second
.end()) {
563 previously_locked
= false;
565 if (!iter
->second
.exclusive
&& exclusive
) {
568 previously_locked
= true;
569 tracked_at_seq
= iter
->second
.seq
;
573 // Lock this key if this transaction hasn't already locked it or we require
575 if (!previously_locked
|| lock_upgrade
) {
576 s
= txn_db_impl_
->TryLock(this, cfh_id
, key_str
, exclusive
);
579 SetSnapshotIfNeeded();
581 // Even though we do not care about doing conflict checking for this write,
582 // we still need to take a lock to make sure we do not cause a conflict with
583 // some other write. However, we do not need to check if there have been
584 // any writes since this transaction's snapshot.
585 // TODO(agiardullo): could optimize by supporting shared txn locks in the
587 if (!do_validate
|| snapshot_
== nullptr) {
588 if (assume_tracked
&& !previously_locked
) {
589 s
= Status::InvalidArgument(
590 "assume_tracked is set but it is not tracked yet");
592 // Need to remember the earliest sequence number that we know that this
593 // key has not been modified after. This is useful if this same
595 // later tries to lock this key again.
596 if (tracked_at_seq
== kMaxSequenceNumber
) {
597 // Since we haven't checked a snapshot, we only know this key has not
598 // been modified since after we locked it.
599 // Note: when last_seq_same_as_publish_seq_==false this is less than the
600 // latest allocated seq but it is ok since i) this is just a heuristic
601 // used only as a hint to avoid actual check for conflicts, ii) this would
602 // cause a false positive only if the snapshot is taken right after the
603 // lock, which would be an unusual sequence.
604 tracked_at_seq
= db_
->GetLatestSequenceNumber();
607 // If a snapshot is set, we need to make sure the key hasn't been modified
608 // since the snapshot. This must be done after we locked the key.
609 // If we already have validated an earlier snapshot it must have been
610 // reflected in tracked_at_seq and ValidateSnapshot will return OK.
612 s
= ValidateSnapshot(column_family
, key
, &tracked_at_seq
);
615 // Failed to validate key
// NOTE(review): assigning the downgrade TryLock result to `s` below would
// replace the validation-failure status with OK when the downgrade succeeds.
// The caller would then no longer see the conflict. Also confirm this branch
// is reachable: lock_upgrade appears to be set only for keys that were
// already tracked (previously_locked == true).
616 if (!previously_locked
) {
617 // Unlock key we just locked
619 s
= txn_db_impl_
->TryLock(this, cfh_id
, key_str
,
620 false /* exclusive */);
623 txn_db_impl_
->UnLock(this, cfh_id
, key
.ToString());
631 // We must track all the locked keys so that we can unlock them later. If
632 // the key is already locked, this func will update some stats on the
633 // tracked key. It could also update the tracked_at_seq if it is lower
634 // than the existing tracked key seq. These stats are necessary for
635 // RollbackToSavePoint to determine whether a key can be safely removed
636 // from tracked_keys_. Removal can only be done if a key was only locked
637 // during the current savepoint.
639 // Recall that if assume_tracked is true, we assume that TrackKey has been
640 // called previously since the last savepoint, with the same exclusive
641 // setting, and at a lower sequence number, so skipping here should be
643 if (!assume_tracked
) {
644 TrackKey(cfh_id
, key_str
, tracked_at_seq
, read_only
, exclusive
);
647 assert(tracked_keys_cf
->second
.count(key_str
) > 0);
648 const auto& info
= tracked_keys_cf
->second
.find(key_str
)->second
;
649 assert(info
.seq
<= tracked_at_seq
);
650 assert(info
.exclusive
== exclusive
);
// Requires snapshot_ != nullptr: it is dereferenced unconditionally.
// When a conflict check is performed, *tracked_at_seq is lowered to the
// snapshot sequence number *before* the check. It is therefore updated even
// if CheckKeyForConflicts reports a conflict.
// A null column_family selects the default column family.
658 // Return OK() if this key has not been modified more recently than the
659 // transaction snapshot_.
660 // tracked_at_seq is the global seq at which we either locked the key or already
661 // have done ValidateSnapshot.
662 Status
PessimisticTransaction::ValidateSnapshot(
663 ColumnFamilyHandle
* column_family
, const Slice
& key
,
664 SequenceNumber
* tracked_at_seq
) {
667 SequenceNumber snap_seq
= snapshot_
->GetSequenceNumber();
668 if (*tracked_at_seq
<= snap_seq
) {
669 // If the key has been previously validated (or locked) at a sequence number
670 // earlier than the current snapshot's sequence number, we already know it
671 // has not been modified after snap_seq either.
674 // Otherwise we have either
675 // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
676 // 2: snap_seq < tracked_at_seq: last time we locked the key was via
677 // do_validate=false which means we had skipped ValidateSnapshot. In both
678 // cases we should do ValidateSnapshot now.
680 *tracked_at_seq
= snap_seq
;
682 ColumnFamilyHandle
* cfh
=
683 column_family
? column_family
: db_impl_
->DefaultColumnFamily();
685 return TransactionUtil::CheckKeyForConflicts(
686 db_impl_
, cfh
, key
.ToString(), snap_seq
, false /* cache_only */);
// Tries to move this transaction out of STARTED with a compare-and-swap on
// txn_state_. The target state is presumably LOCKS_STOLEN — confirm.
// Returns true only if the CAS succeeded, i.e. the transaction was still
// STARTED at that instant.
689 bool PessimisticTransaction::TryStealingLocks() {
691 TransactionState expected
= STARTED
;
692 return std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
// Releases this transaction's lock on `key` in `column_family`, going
// through the TransactionDB's lock manager.
696 void PessimisticTransaction::UnlockGetForUpdate(
697 ColumnFamilyHandle
* column_family
, const Slice
& key
) {
698 txn_db_impl_
->UnLock(this, GetColumnFamilyID(column_family
), key
.ToString());
// Assigns a name to a STARTED transaction and registers it with the
// TransactionDB. Returns InvalidArgument if:
//   - the transaction is already named;
//   - the name is already in use by another transaction;
//   - the name length is outside [1, 512];
//   - the transaction has left STARTED.
// NOTE(review): the uniqueness check (GetTransactionByName) and
// RegisterTransaction() are separate calls. Unless callers serialize
// SetName(), two transactions could race to claim the same name — confirm.
701 Status
PessimisticTransaction::SetName(const TransactionName
& name
) {
703 if (txn_state_
== STARTED
) {
704 if (name_
.length()) {
705 s
= Status::InvalidArgument("Transaction has already been named.");
706 } else if (txn_db_impl_
->GetTransactionByName(name
) != nullptr) {
707 s
= Status::InvalidArgument("Transaction name must be unique.");
708 } else if (name
.length() < 1 || name
.length() > 512) {
709 s
= Status::InvalidArgument(
710 "Transaction name length must be between 1 and 512 chars.");
713 txn_db_impl_
->RegisterTransaction(this);
716 s
= Status::InvalidArgument("Transaction is beyond state for naming.");
721 } // namespace ROCKSDB_NAMESPACE
723 #endif // ROCKSDB_LITE