1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
8 #include "utilities/transactions/transaction_impl.h"
15 #include "db/column_family.h"
16 #include "db/db_impl.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/snapshot.h"
20 #include "rocksdb/status.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "util/string_util.h"
23 #include "util/sync_point.h"
24 #include "utilities/transactions/transaction_db_impl.h"
25 #include "utilities/transactions/transaction_util.h"
// Static counter shared by all TransactionImpl instances; GenTxnID() draws
// transaction ids from it. It starts at 1, so the first id handed out is 1.
31 std::atomic
<TransactionID
> TransactionImpl::txn_id_counter_(1);
// Returns a fresh transaction id. fetch_add is a single atomic
// read-modify-write, so concurrent callers each receive a distinct value
// (the counter's value before the increment).
33 TransactionID
TransactionImpl::GenTxnID() {
34 return txn_id_counter_
.fetch_add(1);
// Creates a pessimistic transaction on top of txn_db.
//
// txn_db:        the owning TransactionDB; its root DB backs the base class.
// write_options: passed to the base class; used (as write_options_) for this
//                transaction's writes.
// txn_options:   per-transaction settings, applied by Initialize() so that
//                Reinitialize() can share the same logic.
37 TransactionImpl::TransactionImpl(TransactionDB
* txn_db
,
38 const WriteOptions
& write_options
,
39 const TransactionOptions
& txn_options
)
40 : TransactionBaseImpl(txn_db
->GetRootDB(), write_options
),
41 txn_db_impl_(nullptr),
44 waiting_key_(nullptr),
47 deadlock_detect_(false),
48 deadlock_detect_depth_(0) {
// Downcast to the concrete TransactionDB implementation.
// NOTE(review): a failed dynamic_cast yields nullptr, and Initialize() can
// dereference txn_db_impl_ (e.g. for the default lock timeout); confirm an
// assert/check exists on the lines not shown here.
49 txn_db_impl_
= dynamic_cast<TransactionDBImpl
*>(txn_db
);
// DBImpl-only entry points (WriteImpl) are reached through this pointer in
// Prepare(), Commit() and Rollback().
51 db_impl_
= dynamic_cast<DBImpl
*>(txn_db
->GetRootDB());
53 Initialize(txn_options
);
// Applies txn_options to this transaction. Shared by the constructor and
// Reinitialize().
//
// Time-based options are multiplied by 1000 before being stored.
// expiration_time_ is later compared against Env::NowMicros() in
// IsExpired(), so it is kept in microseconds (the option itself presumably
// being in milliseconds).
56 void TransactionImpl::Initialize(const TransactionOptions
& txn_options
) {
61 deadlock_detect_
= txn_options
.deadlock_detect
;
62 deadlock_detect_depth_
= txn_options
.deadlock_detect_depth
;
// Bound the size of this transaction's write batch.
63 write_batch_
.SetMaxBytes(txn_options
.max_write_batch_size
);
// A negative per-transaction timeout means "not set". It then falls back to
// the TransactionDB-wide transaction_lock_timeout, also scaled by 1000. The
// assignment target of that fallback is on a line not shown here
// (presumably lock_timeout_).
65 lock_timeout_
= txn_options
.lock_timeout
* 1000;
66 if (lock_timeout_
< 0) {
67 // Lock timeout not set, use default
69 txn_db_impl_
->GetTxnDBOptions().transaction_lock_timeout
* 1000;
// A non-negative expiration gives the transaction an absolute deadline
// relative to start_time_.
72 if (txn_options
.expiration
>= 0) {
73 expiration_time_
= start_time_
+ txn_options
.expiration
* 1000;
78 if (txn_options
.set_snapshot
) {
// Transactions with a deadline are registered with the TransactionDB,
// presumably so expired ones can have their locks stolen (see
// TryStealingLocks()).
82 if (expiration_time_
> 0) {
83 txn_db_impl_
->InsertExpirableTransaction(txn_id_
, this);
// Releases what this transaction still holds in the TransactionDB:
// - locks on all tracked keys;
// - its entry in the expirable-transaction set, if it has a deadline;
// - its name registration.
// NOTE(review): Prepare() sets expiration_time_ to 0. A transaction that had
// a deadline and was then prepared therefore skips
// RemoveExpirableTransaction() here, unless it is removed elsewhere. Confirm.
87 TransactionImpl::~TransactionImpl() {
88 txn_db_impl_
->UnLock(this, &GetTrackedKeys());
89 if (expiration_time_
> 0) {
90 txn_db_impl_
->RemoveExpirableTransaction(txn_id_
);
// Committing a prepared transaction already unregisters it (see Commit()),
// so only names that were not committed are unregistered here.
// NOTE(review): Commit() of a named transaction that was never prepared
// (the commit_single path) stores COMMITED without visibly calling
// UnregisterTransaction(). Such a name would stay registered after this
// object is destroyed. Confirm against the full Commit() implementation.
92 if (!name_
.empty() && txn_state_
!= COMMITED
) {
93 txn_db_impl_
->UnregisterTransaction(this);
// Releases the locks on all keys tracked by this transaction, then resets
// the base-class state.
97 void TransactionImpl::Clear() {
// Unlock first: the base-class Clear() presumably discards the tracked-key
// set that UnLock() reads.
98 txn_db_impl_
->UnLock(this, &GetTrackedKeys());
99 TransactionBaseImpl::Clear();
// Reuses this object for a new transaction with fresh options. It:
// 1. drops the old name registration (unless the old transaction committed);
// 2. resets the base class against txn_db's root DB;
// 3. re-applies txn_options via Initialize().
// NOTE(review): releasing the old transaction's locks and removing it from
// the expirable set are not visible here; presumably handled by the
// base-class Reinitialize() (e.g. via Clear()) or by the caller. Confirm.
// NOTE(review): txn_db_impl_ is not refreshed from txn_db, so this assumes
// the same TransactionDB as before.
102 void TransactionImpl::Reinitialize(TransactionDB
* txn_db
,
103 const WriteOptions
& write_options
,
104 const TransactionOptions
& txn_options
) {
105 if (!name_
.empty() && txn_state_
!= COMMITED
) {
106 txn_db_impl_
->UnregisterTransaction(this);
108 TransactionBaseImpl::Reinitialize(txn_db
->GetRootDB(), write_options
);
109 Initialize(txn_options
);
// Reports whether this transaction has passed its deadline.
// Only a transaction with a deadline (expiration_time_ > 0) is checked.
// The deadline is compared against the DB Env's clock in microseconds.
// The return statements themselves are on lines not shown here.
112 bool TransactionImpl::IsExpired() const {
113 if (expiration_time_
> 0) {
114 if (db_
->GetEnv()->NowMicros() >= expiration_time_
) {
115 // Transaction is expired.
// Writes `batch` on behalf of this transaction with db_->Write(), bypassing
// the transaction's own write batch.
//
// Every key in `batch` is first locked exclusively via LockBatch(). The
// locks taken for this call are collected in keys_to_unlock and released by
// the UnLock() call at the end. After the write the state is set to
// COMMITED; the condition guarding that is on a line not shown.
// Returns:
// - Expired() if the transaction has expired (guarding condition not shown)
//   or had its locks stolen;
// - InvalidArgument() if it is not in a committable state;
// - otherwise the status of the locking or of the write.
123 Status
TransactionImpl::CommitBatch(WriteBatch
* batch
) {
124 TransactionKeyMap keys_to_unlock
;
125 Status s
= LockBatch(batch
, &keys_to_unlock
);
131 bool can_commit
= false;
134 s
= Status::Expired();
// With a deadline, another thread may steal this transaction's locks at any
// time. Checking for STARTED and leaving that state must therefore be one
// atomic compare-and-exchange.
135 } else if (expiration_time_
> 0) {
136 TransactionState expected
= STARTED
;
137 can_commit
= std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
139 } else if (txn_state_
== STARTED
) {
140 // lock stealing is not a concern
145 txn_state_
.store(AWAITING_COMMIT
);
146 s
= db_
->Write(write_options_
, batch
);
148 txn_state_
.store(COMMITED
);
150 } else if (txn_state_
== LOCKS_STOLEN
) {
151 s
= Status::Expired();
153 s
= Status::InvalidArgument("Transaction is not in state for commit.");
156 txn_db_impl_
->UnLock(this, &keys_to_unlock
);
// First phase of two-phase commit.
//
// The transaction must have been named with SetName(); otherwise
// InvalidArgument() is returned. On the successful path:
// 1. the write batch, terminated by an EndPrepare marker carrying name_, is
//    written to the WAL only (disable_memtable; the memtable insert happens
//    in Commit());
// 2. the log that received it is marked as containing a prepare section;
// 3. the state becomes PREPARED.
// A prepared transaction can no longer expire, because expiration_time_ is
// cleared.
// Returns:
// - Expired() if the transaction expired or had its locks stolen;
// - InvalidArgument() if it was already prepared, committed or rolled back,
//   or is otherwise not preparable;
// - otherwise the status of the WAL write.
161 Status
TransactionImpl::Prepare() {
165 return Status::InvalidArgument(
166 "Cannot prepare a transaction that has not been named.");
170 return Status::Expired();
173 bool can_prepare
= false;
175 if (expiration_time_
> 0) {
176 // must concern ourselves with expiration and/or lock stealing
177 // need to compare/exchange because locks could be stolen under us here
178 TransactionState expected
= STARTED
;
179 can_prepare
= std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
181 } else if (txn_state_
== STARTED
) {
182 // expiration and lock stealing is not possible
187 txn_state_
.store(AWAITING_PREPARE
);
188 // transaction can't expire after preparation
189 expiration_time_
= 0;
// The prepare section must reach the log even if this transaction's writes
// normally skip the WAL, so the WAL is forced on for this write.
190 WriteOptions write_options
= write_options_
;
191 write_options
.disableWAL
= false;
192 WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_
);
// disable_memtable=true: only log the batch now. WriteImpl fills in
// log_number_ (presumably the number of the log written to). Commit() and
// Rollback() later pass it to MarkLogAsHavingPrepSectionFlushed().
193 s
= db_impl_
->WriteImpl(write_options
, GetWriteBatch()->GetWriteBatch(),
194 /*callback*/ nullptr, &log_number_
, /*log ref*/ 0,
195 /* disable_memtable*/ true);
197 assert(log_number_
!= 0);
// NOTE(review): this uses dbimpl_ while the WriteImpl call above uses
// db_impl_; presumably dbimpl_ is a base-class member pointing at the same
// DBImpl. Confirm.
198 dbimpl_
->MarkLogAsContainingPrepSection(log_number_
);
199 txn_state_
.store(PREPARED
);
201 } else if (txn_state_
== LOCKS_STOLEN
) {
202 s
= Status::Expired();
203 } else if (txn_state_
== PREPARED
) {
204 s
= Status::InvalidArgument("Transaction has already been prepared.");
205 } else if (txn_state_
== COMMITED
) {
206 s
= Status::InvalidArgument("Transaction has already been committed.");
207 } else if (txn_state_
== ROLLEDBACK
) {
208 s
= Status::InvalidArgument("Transaction has already been rolledback.");
// NOTE(review): the message below says "commit" although this is Prepare().
210 s
= Status::InvalidArgument("Transaction is not in state for commit.");
// Commits the transaction. The path taken depends on the current state.
//
// - STARTED, never prepared ("commit_single"): the transaction's write batch
//   is written with db_->Write(). The commit-time batch must be empty on
//   this path, otherwise InvalidArgument() is returned.
// - PREPARED ("commit_prepared"):
//   1. the commit-time batch plus a Commit marker for name_ is logged;
//   2. the prepared batch is appended after the WAL termination point, so it
//      reaches the memtable without being logged a second time;
//   3. the prepare log is reported via MarkLogAsHavingPrepSectionFlushed();
//   4. the name is unregistered.
// Returns:
// - Expired() if the transaction expired or had its locks stolen;
// - InvalidArgument() if it was already committed or rolled back, or is
//   otherwise not committable;
// - otherwise the status of the write.
216 Status
TransactionImpl::Commit() {
218 bool commit_single
= false;
219 bool commit_prepared
= false;
222 return Status::Expired();
225 if (expiration_time_
> 0) {
226 // we must atomically compare and exchange the state here because at
227 // this point in the transaction it is possible for another thread
228 // to change our state out from under us in the event that we expire and have
229 // our locks stolen. In this case the only valid state is STARTED because
230 // a state of PREPARED would have a cleared expiration_time_.
231 TransactionState expected
= STARTED
;
232 commit_single
= std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
234 TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
235 } else if (txn_state_
== PREPARED
) {
236 // expiration and lock stealing is not a concern
237 commit_prepared
= true;
238 } else if (txn_state_
== STARTED
) {
239 // expiration and lock stealing is not a concern
240 commit_single
= true;
// NOTE(review): none of the visible lines on this path call
// UnregisterTransaction(), yet the destructor skips unregistering once the
// state is COMMITED. A named but unprepared transaction may therefore stay
// registered after destruction. Confirm.
244 assert(!commit_prepared
);
245 if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
246 s
= Status::InvalidArgument(
247 "Commit-time batch contains values that will not be committed.");
249 txn_state_
.store(AWAITING_COMMIT
);
250 s
= db_
->Write(write_options_
, GetWriteBatch()->GetWriteBatch());
253 txn_state_
.store(COMMITED
);
256 } else if (commit_prepared
) {
257 txn_state_
.store(AWAITING_COMMIT
);
259 // We take the commit-time batch and append the Commit marker.
260 // The Memtable will ignore the Commit marker in non-recovery mode
261 WriteBatch
* working_batch
= GetCommitTimeWriteBatch();
262 WriteBatchInternal::MarkCommit(working_batch
, name_
);
264 // any operations appended to this working_batch will be ignored from WAL
265 working_batch
->MarkWalTerminationPoint();
267 // insert prepared batch into Memtable only skipping WAL.
268 // Memtable will ignore BeginPrepare/EndPrepare markers
269 // in non recovery mode and simply insert the values
270 WriteBatchInternal::Append(working_batch
, GetWriteBatch()->GetWriteBatch());
272 s
= db_impl_
->WriteImpl(write_options_
, working_batch
, nullptr, nullptr,
278 // FindObsoleteFiles must now look to the memtables
279 // to determine what prep logs must be kept around,
280 // not the prep section heap.
281 assert(log_number_
> 0);
282 dbimpl_
->MarkLogAsHavingPrepSectionFlushed(log_number_
);
283 txn_db_impl_
->UnregisterTransaction(this);
286 txn_state_
.store(COMMITED
);
287 } else if (txn_state_
== LOCKS_STOLEN
) {
288 s
= Status::Expired();
289 } else if (txn_state_
== COMMITED
) {
290 s
= Status::InvalidArgument("Transaction has already been committed.");
291 } else if (txn_state_
== ROLLEDBACK
) {
292 s
= Status::InvalidArgument("Transaction has already been rolledback.");
294 s
= Status::InvalidArgument("Transaction is not in state for commit.");
// Rolls back the transaction.
//
// For a PREPARED transaction, whose data Prepare() already wrote to the WAL:
// 1. a Rollback marker carrying name_ is written;
// 2. the prepare log is reported via MarkLogAsHavingPrepSectionFlushed();
// 3. the state becomes ROLLEDBACK.
// For a STARTED transaction, prepare cannot have taken place.
// Returns InvalidArgument() if the transaction was already committed or is
// in any other state that cannot be rolled back.
// NOTE(review): Prepare() forces disableWAL = false, but the marker here is
// written with write_options_ unchanged. If the WAL is disabled, the
// rollback may not be logged. Confirm this is intended.
300 Status
TransactionImpl::Rollback() {
302 if (txn_state_
== PREPARED
) {
303 WriteBatch rollback_marker
;
304 WriteBatchInternal::MarkRollback(&rollback_marker
, name_
);
305 txn_state_
.store(AWAITING_ROLLBACK
);
306 s
= db_impl_
->WriteImpl(write_options_
, &rollback_marker
);
308 // we do not need to keep our prepared section around
309 assert(log_number_
> 0);
310 dbimpl_
->MarkLogAsHavingPrepSectionFlushed(log_number_
);
312 txn_state_
.store(ROLLEDBACK
);
314 } else if (txn_state_
== STARTED
) {
315 // prepare couldn't have taken place
317 } else if (txn_state_
== COMMITED
) {
318 s
= Status::InvalidArgument("This transaction has already been committed.");
320 s
= Status::InvalidArgument(
321 "Two phase transaction is not in state for rollback.");
// Undoes the operations performed since the most recent savepoint.
// Only allowed while the transaction is STARTED; any other state returns
// InvalidArgument(). Locks on the keys returned by
// GetTrackedKeysSinceSavePoint() are released before delegating to
// TransactionBaseImpl::RollbackToSavePoint().
327 Status
TransactionImpl::RollbackToSavePoint() {
328 if (txn_state_
!= STARTED
) {
329 return Status::InvalidArgument("Transaction is beyond state for rollback.");
332 // Unlock any keys locked since the last savepoint
333 const std::unique_ptr
<TransactionKeyMap
>& keys
=
334 GetTrackedKeysSinceSavePoint();
337 txn_db_impl_
->UnLock(this, keys
.get());
340 return TransactionBaseImpl::RollbackToSavePoint();
343 // Lock all keys in this batch.
344 // On success, caller should unlock keys_to_unlock
// The Handler below records keys from `batch` (PutCF, MergeCF and DeleteCF
// are the visible overrides). Each recorded key is locked exclusively, in
// sorted order, and added to *keys_to_unlock. On failure the keys locked so
// far are presumably released by the UnLock() call at the end; its guarding
// condition is on a line not shown.
345 Status
TransactionImpl::LockBatch(WriteBatch
* batch
,
346 TransactionKeyMap
* keys_to_unlock
) {
347 class Handler
: public WriteBatch::Handler
{
349 // Sorted map of column_family_id to sorted set of keys.
350 // Since LockBatch() always locks keys in sorted order, it cannot deadlock
351 // with itself. We're not using a comparator here since it doesn't matter
352 // what the sorting is as long as it's consistent.
353 std::map
<uint32_t, std::set
<std::string
>> keys_
;
// Adds key to its column family's set. std::set::insert already ignores
// duplicates, so the find() below is only an explicit guard.
357 void RecordKey(uint32_t column_family_id
, const Slice
& key
) {
358 std::string key_str
= key
.ToString();
360 auto iter
= (keys_
)[column_family_id
].find(key_str
);
361 if (iter
== (keys_
)[column_family_id
].end()) {
362 // key not yet seen, store it.
363 (keys_
)[column_family_id
].insert({std::move(key_str
)});
367 virtual Status
PutCF(uint32_t column_family_id
, const Slice
& key
,
368 const Slice
& value
) override
{
369 RecordKey(column_family_id
, key
);
372 virtual Status
MergeCF(uint32_t column_family_id
, const Slice
& key
,
373 const Slice
& value
) override
{
374 RecordKey(column_family_id
, key
);
377 virtual Status
DeleteCF(uint32_t column_family_id
,
378 const Slice
& key
) override
{
379 RecordKey(column_family_id
, key
);
384 // Iterating on this handler will add all keys in this batch into its keys_ map
// NOTE(review): the Status returned by Iterate() is discarded.
386 batch
->Iterate(&handler
);
390 // Attempt to lock all keys
391 for (const auto& cf_iter
: handler
.keys_
) {
392 uint32_t cfh_id
= cf_iter
.first
;
393 auto& cfh_keys
= cf_iter
.second
;
395 for (const auto& key_iter
: cfh_keys
) {
396 const std::string
& key
= key_iter
;
398 s
= txn_db_impl_
->TryLock(this, cfh_id
, key
, true /* exclusive */);
// key is a const reference into handler.keys_, so std::move(key) below still
// copies the string.
402 TrackKey(keys_to_unlock
, cfh_id
, std::move(key
), kMaxSequenceNumber
,
403 false, true /* exclusive */);
412 txn_db_impl_
->UnLock(this, keys_to_unlock
);
418 // Attempt to lock this key.
419 // Returns OK if the key has been successfully locked. Non-ok, otherwise.
420 // If untracked is false and this transaction has a snapshot set,
421 // this key will only be locked if there have been no writes to this key since
422 // the snapshot time.
// A key this transaction already holds is not locked again, except when a
// shared lock must be upgraded to an exclusive one. The key is then recorded
// via TrackKey() together with new_seqno, the sequence number up to which it
// is known to be unmodified.
// read_only: forwarded to TrackKey().
// exclusive: request an exclusive lock rather than a shared one.
// untracked: skip snapshot validation; the key is still locked.
423 Status
TransactionImpl::TryLock(ColumnFamilyHandle
* column_family
,
424 const Slice
& key
, bool read_only
,
425 bool exclusive
, bool untracked
) {
426 uint32_t cfh_id
= GetColumnFamilyID(column_family
);
427 std::string key_str
= key
.ToString();
428 bool previously_locked
;
429 bool lock_upgrade
= false;
432 // lock this key if this transaction hasn't already locked it
433 SequenceNumber current_seqno
= kMaxSequenceNumber
;
434 SequenceNumber new_seqno
= kMaxSequenceNumber
;
436 const auto& tracked_keys
= GetTrackedKeys();
437 const auto tracked_keys_cf
= tracked_keys
.find(cfh_id
);
438 if (tracked_keys_cf
== tracked_keys
.end()) {
439 previously_locked
= false;
441 auto iter
= tracked_keys_cf
->second
.find(key_str
);
442 if (iter
== tracked_keys_cf
->second
.end()) {
443 previously_locked
= false;
// Held only as shared but now requested exclusively: the lock must be
// upgraded. lock_upgrade itself is set on a line not shown.
445 if (!iter
->second
.exclusive
&& exclusive
) {
448 previously_locked
= true;
449 current_seqno
= iter
->second
.seq
;
453 // Lock this key if this transaction hasn't already locked it or we require
455 if (!previously_locked
|| lock_upgrade
) {
456 s
= txn_db_impl_
->TryLock(this, cfh_id
, key_str
, exclusive
);
459 SetSnapshotIfNeeded();
461 // Even though we do not care about doing conflict checking for this write,
462 // we still need to take a lock to make sure we do not cause a conflict with
463 // some other write. However, we do not need to check if there have been
464 // any writes since this transaction's snapshot.
465 // TODO(agiardullo): could optimize by supporting shared txn locks in the
467 if (untracked
|| snapshot_
== nullptr) {
468 // Need to remember the earliest sequence number that we know that this
469 // key has not been modified after. This is useful if this same
471 // later tries to lock this key again.
472 if (current_seqno
== kMaxSequenceNumber
) {
473 // Since we haven't checked a snapshot, we only know this key has not
474 // been modified since after we locked it.
475 new_seqno
= db_
->GetLatestSequenceNumber();
477 new_seqno
= current_seqno
;
480 // If a snapshot is set, we need to make sure the key hasn't been modified
481 // since the snapshot. This must be done after we locked the key.
483 s
= ValidateSnapshot(column_family
, key
, current_seqno
, &new_seqno
);
486 // Failed to validate key
487 if (!previously_locked
) {
488 // Unlock key we just locked
// NOTE(review): assigning the result of this TryLock() to s replaces the
// validation-failure status. Confirm the failure is still reported to the
// caller.
490 s
= txn_db_impl_
->TryLock(this, cfh_id
, key_str
,
491 false /* exclusive */);
494 txn_db_impl_
->UnLock(this, cfh_id
, key
.ToString());
502 // Let base class know we've conflict checked this key.
503 TrackKey(cfh_id
, key_str
, new_seqno
, read_only
, exclusive
);
509 // Return OK() if this key has not been modified more recently than the
510 // transaction snapshot_.
// prev_seqno: sequence number up to which this key was previously validated
//             (kMaxSequenceNumber if never). If it is not newer than the
//             snapshot, no lookup is needed.
// new_seqno:  out-param for the sequence number the key is now validated up
//             to (assigned on lines not shown here).
// Requires snapshot_ to be set; it is dereferenced unconditionally.
511 Status
TransactionImpl::ValidateSnapshot(ColumnFamilyHandle
* column_family
,
513 SequenceNumber prev_seqno
,
514 SequenceNumber
* new_seqno
) {
517 SequenceNumber seq
= snapshot_
->GetSequenceNumber();
518 if (prev_seqno
<= seq
) {
519 // If the key has been previously validated at a sequence number earlier
520 // than the current snapshot's sequence number, we already know it has not
// NOTE(review): reinterpret_cast is only correct if DBImpl's DB base
// subobject sits at offset 0. static_cast is the correct downcast; the
// dynamic_cast in the assert already checks the type. The db_impl_ member
// set in the constructor presumably points to the same object.
527 assert(dynamic_cast<DBImpl
*>(db_
) != nullptr);
528 auto db_impl
= reinterpret_cast<DBImpl
*>(db_
);
530 ColumnFamilyHandle
* cfh
=
531 column_family
? column_family
: db_impl
->DefaultColumnFamily();
533 return TransactionUtil::CheckKeyForConflicts(db_impl
, cfh
, key
.ToString(),
534 snapshot_
->GetSequenceNumber(),
535 false /* cache_only */);
// Attempts to move this transaction out of STARTED, presumably into
// LOCKS_STOLEN (the target state is on a line not shown). Returns true iff
// the compare-and-exchange succeeded, i.e. the transaction was still STARTED.
// Commit(), CommitBatch() and Prepare() of a transaction with a deadline
// perform the same compare-and-exchange from STARTED. Only one side can win,
// which is what makes the two race-free.
538 bool TransactionImpl::TryStealingLocks() {
540 TransactionState expected
= STARTED
;
541 return std::atomic_compare_exchange_strong(&txn_state_
, &expected
,
// Releases this transaction's lock on `key` in column_family. Per its name
// it is presumably used to undo a GetForUpdate(). The visible lines only
// release the lock and do not touch the tracked-key bookkeeping.
545 void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle
* column_family
,
547 txn_db_impl_
->UnLock(this, GetColumnFamilyID(column_family
), key
.ToString());
// Names this transaction and registers it with the TransactionDB. A name is
// required before Prepare().
// Only allowed while STARTED, and only once. The name must be unique among
// registered transactions and 1..512 characters long. Any violation returns
// InvalidArgument().
// NOTE(review): the uniqueness check (GetTransactionByName) and
// RegisterTransaction() are separate calls. If two transactions can be
// named concurrently with the same name, confirm the registry itself rejects
// duplicates.
550 Status
TransactionImpl::SetName(const TransactionName
& name
) {
552 if (txn_state_
== STARTED
) {
553 if (name_
.length()) {
554 s
= Status::InvalidArgument("Transaction has already been named.");
555 } else if (txn_db_impl_
->GetTransactionByName(name
) != nullptr) {
556 s
= Status::InvalidArgument("Transaction name must be unique.");
557 } else if (name
.length() < 1 || name
.length() > 512) {
558 s
= Status::InvalidArgument(
559 "Transaction name length must be between 1 and 512 chars.");
562 txn_db_impl_
->RegisterTransaction(this);
565 s
= Status::InvalidArgument("Transaction is beyond state for naming.");
570 } // namespace rocksdb
572 #endif // ROCKSDB_LITE