]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/pessimistic_transaction.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / pessimistic_transaction.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
11fdf7f2 8#include "utilities/transactions/pessimistic_transaction.h"
7c673cae
FG
9
10#include <map>
11#include <set>
12#include <string>
13#include <vector>
14
15#include "db/column_family.h"
f67539c2 16#include "db/db_impl/db_impl.h"
7c673cae
FG
17#include "rocksdb/comparator.h"
18#include "rocksdb/db.h"
19#include "rocksdb/snapshot.h"
20#include "rocksdb/status.h"
21#include "rocksdb/utilities/transaction_db.h"
f67539c2 22#include "test_util/sync_point.h"
11fdf7f2 23#include "util/cast_util.h"
7c673cae 24#include "util/string_util.h"
11fdf7f2 25#include "utilities/transactions/pessimistic_transaction_db.h"
7c673cae
FG
26#include "utilities/transactions/transaction_util.h"
27
f67539c2 28namespace ROCKSDB_NAMESPACE {
7c673cae
FG
29
30struct WriteOptions;
31
11fdf7f2 32std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1);
7c673cae 33
11fdf7f2 34TransactionID PessimisticTransaction::GenTxnID() {
7c673cae
FG
35 return txn_id_counter_.fetch_add(1);
36}
37
11fdf7f2
TL
38PessimisticTransaction::PessimisticTransaction(
39 TransactionDB* txn_db, const WriteOptions& write_options,
494da23a 40 const TransactionOptions& txn_options, const bool init)
20effc67
TL
41 : TransactionBaseImpl(
42 txn_db->GetRootDB(), write_options,
43 static_cast_with_check<PessimisticTransactionDB>(txn_db)
44 ->GetLockTrackerFactory()),
7c673cae 45 txn_db_impl_(nullptr),
11fdf7f2 46 expiration_time_(0),
7c673cae
FG
47 txn_id_(0),
48 waiting_cf_id_(0),
49 waiting_key_(nullptr),
7c673cae
FG
50 lock_timeout_(0),
51 deadlock_detect_(false),
11fdf7f2
TL
52 deadlock_detect_depth_(0),
53 skip_concurrency_control_(false) {
20effc67
TL
54 txn_db_impl_ = static_cast_with_check<PessimisticTransactionDB>(txn_db);
55 db_impl_ = static_cast_with_check<DBImpl>(db_);
494da23a
TL
56 if (init) {
57 Initialize(txn_options);
58 }
7c673cae
FG
59}
60
11fdf7f2 61void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
7c673cae
FG
62 txn_id_ = GenTxnID();
63
64 txn_state_ = STARTED;
65
66 deadlock_detect_ = txn_options.deadlock_detect;
67 deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
68 write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
11fdf7f2 69 skip_concurrency_control_ = txn_options.skip_concurrency_control;
7c673cae
FG
70
71 lock_timeout_ = txn_options.lock_timeout * 1000;
72 if (lock_timeout_ < 0) {
73 // Lock timeout not set, use default
74 lock_timeout_ =
75 txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
76 }
77
78 if (txn_options.expiration >= 0) {
79 expiration_time_ = start_time_ + txn_options.expiration * 1000;
80 } else {
81 expiration_time_ = 0;
82 }
83
84 if (txn_options.set_snapshot) {
85 SetSnapshot();
86 }
87
88 if (expiration_time_ > 0) {
89 txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
90 }
11fdf7f2
TL
91 use_only_the_last_commit_time_batch_for_recovery_ =
92 txn_options.use_only_the_last_commit_time_batch_for_recovery;
20effc67 93 skip_prepare_ = txn_options.skip_prepare;
7c673cae
FG
94}
95
11fdf7f2 96PessimisticTransaction::~PessimisticTransaction() {
20effc67 97 txn_db_impl_->UnLock(this, *tracked_locks_);
7c673cae
FG
98 if (expiration_time_ > 0) {
99 txn_db_impl_->RemoveExpirableTransaction(txn_id_);
100 }
20effc67 101 if (!name_.empty() && txn_state_ != COMMITTED) {
7c673cae
FG
102 txn_db_impl_->UnregisterTransaction(this);
103 }
104}
105
11fdf7f2 106void PessimisticTransaction::Clear() {
20effc67 107 txn_db_impl_->UnLock(this, *tracked_locks_);
7c673cae
FG
108 TransactionBaseImpl::Clear();
109}
110
11fdf7f2
TL
111void PessimisticTransaction::Reinitialize(
112 TransactionDB* txn_db, const WriteOptions& write_options,
113 const TransactionOptions& txn_options) {
20effc67 114 if (!name_.empty() && txn_state_ != COMMITTED) {
7c673cae
FG
115 txn_db_impl_->UnregisterTransaction(this);
116 }
117 TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
118 Initialize(txn_options);
119}
120
11fdf7f2 121bool PessimisticTransaction::IsExpired() const {
7c673cae
FG
122 if (expiration_time_ > 0) {
123 if (db_->GetEnv()->NowMicros() >= expiration_time_) {
124 // Transaction is expired.
125 return true;
126 }
127 }
128
129 return false;
130}
131
11fdf7f2
TL
132WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
133 const WriteOptions& write_options,
134 const TransactionOptions& txn_options)
135 : PessimisticTransaction(txn_db, write_options, txn_options){};
136
137Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
20effc67
TL
138 std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create());
139 Status s = LockBatch(batch, keys_to_unlock.get());
7c673cae
FG
140
141 if (!s.ok()) {
142 return s;
143 }
144
145 bool can_commit = false;
146
147 if (IsExpired()) {
148 s = Status::Expired();
149 } else if (expiration_time_ > 0) {
150 TransactionState expected = STARTED;
151 can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,
152 AWAITING_COMMIT);
153 } else if (txn_state_ == STARTED) {
154 // lock stealing is not a concern
155 can_commit = true;
156 }
157
158 if (can_commit) {
159 txn_state_.store(AWAITING_COMMIT);
11fdf7f2 160 s = CommitBatchInternal(batch);
7c673cae 161 if (s.ok()) {
20effc67 162 txn_state_.store(COMMITTED);
7c673cae
FG
163 }
164 } else if (txn_state_ == LOCKS_STOLEN) {
165 s = Status::Expired();
166 } else {
167 s = Status::InvalidArgument("Transaction is not in state for commit.");
168 }
169
20effc67 170 txn_db_impl_->UnLock(this, *keys_to_unlock);
7c673cae
FG
171
172 return s;
173}
174
11fdf7f2 175Status PessimisticTransaction::Prepare() {
7c673cae
FG
176
177 if (name_.empty()) {
178 return Status::InvalidArgument(
179 "Cannot prepare a transaction that has not been named.");
180 }
181
182 if (IsExpired()) {
183 return Status::Expired();
184 }
185
20effc67 186 Status s;
7c673cae
FG
187 bool can_prepare = false;
188
189 if (expiration_time_ > 0) {
190 // must concern ourselves with expiraton and/or lock stealing
191 // need to compare/exchange bc locks could be stolen under us here
192 TransactionState expected = STARTED;
193 can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,
194 AWAITING_PREPARE);
195 } else if (txn_state_ == STARTED) {
196 // expiration and lock stealing is not possible
20effc67 197 txn_state_.store(AWAITING_PREPARE);
7c673cae
FG
198 can_prepare = true;
199 }
200
201 if (can_prepare) {
7c673cae
FG
202 // transaction can't expire after preparation
203 expiration_time_ = 0;
494da23a
TL
204 assert(log_number_ == 0 ||
205 txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
11fdf7f2
TL
206
207 s = PrepareInternal();
7c673cae 208 if (s.ok()) {
7c673cae
FG
209 txn_state_.store(PREPARED);
210 }
211 } else if (txn_state_ == LOCKS_STOLEN) {
212 s = Status::Expired();
213 } else if (txn_state_ == PREPARED) {
214 s = Status::InvalidArgument("Transaction has already been prepared.");
20effc67 215 } else if (txn_state_ == COMMITTED) {
7c673cae
FG
216 s = Status::InvalidArgument("Transaction has already been committed.");
217 } else if (txn_state_ == ROLLEDBACK) {
218 s = Status::InvalidArgument("Transaction has already been rolledback.");
219 } else {
220 s = Status::InvalidArgument("Transaction is not in state for commit.");
221 }
222
223 return s;
224}
225
11fdf7f2
TL
226Status WriteCommittedTxn::PrepareInternal() {
227 WriteOptions write_options = write_options_;
228 write_options.disableWAL = false;
20effc67
TL
229 auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
230 name_);
231 assert(s.ok());
494da23a
TL
232 class MarkLogCallback : public PreReleaseCallback {
233 public:
234 MarkLogCallback(DBImpl* db, bool two_write_queues)
235 : db_(db), two_write_queues_(two_write_queues) {
236 (void)two_write_queues_; // to silence unused private field warning
237 }
238 virtual Status Callback(SequenceNumber, bool is_mem_disabled,
f67539c2
TL
239 uint64_t log_number, size_t /*index*/,
240 size_t /*total*/) override {
494da23a
TL
241#ifdef NDEBUG
242 (void)is_mem_disabled;
243#endif
244 assert(log_number != 0);
245 assert(!two_write_queues_ || is_mem_disabled); // implies the 2nd queue
246 db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number);
247 return Status::OK();
248 }
249
250 private:
251 DBImpl* db_;
252 bool two_write_queues_;
253 } mark_log_callback(db_impl_,
254 db_impl_->immutable_db_options().two_write_queues);
255
256 WriteCallback* const kNoWriteCallback = nullptr;
257 const uint64_t kRefNoLog = 0;
258 const bool kDisableMemtable = true;
259 SequenceNumber* const KIgnoreSeqUsed = nullptr;
260 const size_t kNoBatchCount = 0;
20effc67
TL
261 s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
262 kNoWriteCallback, &log_number_, kRefNoLog,
263 kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
264 &mark_log_callback);
11fdf7f2
TL
265 return s;
266}
267
268Status PessimisticTransaction::Commit() {
11fdf7f2 269 bool commit_without_prepare = false;
7c673cae
FG
270 bool commit_prepared = false;
271
272 if (IsExpired()) {
273 return Status::Expired();
274 }
275
276 if (expiration_time_ > 0) {
277 // we must atomicaly compare and exchange the state here because at
278 // this state in the transaction it is possible for another thread
279 // to change our state out from under us in the even that we expire and have
280 // our locks stolen. In this case the only valid state is STARTED because
281 // a state of PREPARED would have a cleared expiration_time_.
282 TransactionState expected = STARTED;
11fdf7f2
TL
283 commit_without_prepare = std::atomic_compare_exchange_strong(
284 &txn_state_, &expected, AWAITING_COMMIT);
7c673cae
FG
285 TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
286 } else if (txn_state_ == PREPARED) {
287 // expiration and lock stealing is not a concern
288 commit_prepared = true;
289 } else if (txn_state_ == STARTED) {
290 // expiration and lock stealing is not a concern
20effc67
TL
291 if (skip_prepare_) {
292 commit_without_prepare = true;
293 } else {
294 return Status::TxnNotPrepared();
295 }
7c673cae
FG
296 }
297
20effc67 298 Status s;
11fdf7f2 299 if (commit_without_prepare) {
7c673cae
FG
300 assert(!commit_prepared);
301 if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
302 s = Status::InvalidArgument(
303 "Commit-time batch contains values that will not be committed.");
304 } else {
305 txn_state_.store(AWAITING_COMMIT);
11fdf7f2
TL
306 if (log_number_ > 0) {
307 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
308 log_number_);
309 }
310 s = CommitWithoutPrepareInternal();
311 if (!name_.empty()) {
312 txn_db_impl_->UnregisterTransaction(this);
313 }
7c673cae
FG
314 Clear();
315 if (s.ok()) {
20effc67 316 txn_state_.store(COMMITTED);
7c673cae
FG
317 }
318 }
319 } else if (commit_prepared) {
320 txn_state_.store(AWAITING_COMMIT);
321
11fdf7f2 322 s = CommitInternal();
7c673cae 323
7c673cae 324 if (!s.ok()) {
11fdf7f2
TL
325 ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
326 "Commit write failed");
7c673cae
FG
327 return s;
328 }
329
330 // FindObsoleteFiles must now look to the memtables
331 // to determine what prep logs must be kept around,
332 // not the prep section heap.
333 assert(log_number_ > 0);
11fdf7f2
TL
334 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
335 log_number_);
7c673cae
FG
336 txn_db_impl_->UnregisterTransaction(this);
337
338 Clear();
20effc67 339 txn_state_.store(COMMITTED);
7c673cae
FG
340 } else if (txn_state_ == LOCKS_STOLEN) {
341 s = Status::Expired();
20effc67 342 } else if (txn_state_ == COMMITTED) {
7c673cae
FG
343 s = Status::InvalidArgument("Transaction has already been committed.");
344 } else if (txn_state_ == ROLLEDBACK) {
345 s = Status::InvalidArgument("Transaction has already been rolledback.");
346 } else {
347 s = Status::InvalidArgument("Transaction is not in state for commit.");
348 }
349
350 return s;
351}
352
11fdf7f2 353Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
494da23a
TL
354 uint64_t seq_used = kMaxSequenceNumber;
355 auto s =
356 db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(),
357 /*callback*/ nullptr, /*log_used*/ nullptr,
358 /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);
359 assert(!s.ok() || seq_used != kMaxSequenceNumber);
360 if (s.ok()) {
361 SetId(seq_used);
362 }
11fdf7f2
TL
363 return s;
364}
365
366Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
494da23a
TL
367 uint64_t seq_used = kMaxSequenceNumber;
368 auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr,
369 /*log_used*/ nullptr, /*log_ref*/ 0,
370 /*disable_memtable*/ false, &seq_used);
371 assert(!s.ok() || seq_used != kMaxSequenceNumber);
372 if (s.ok()) {
373 SetId(seq_used);
374 }
11fdf7f2
TL
375 return s;
376}
377
378Status WriteCommittedTxn::CommitInternal() {
379 // We take the commit-time batch and append the Commit marker.
380 // The Memtable will ignore the Commit marker in non-recovery mode
381 WriteBatch* working_batch = GetCommitTimeWriteBatch();
20effc67
TL
382 auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
383 assert(s.ok());
11fdf7f2
TL
384
385 // any operations appended to this working_batch will be ignored from WAL
386 working_batch->MarkWalTerminationPoint();
387
388 // insert prepared batch into Memtable only skipping WAL.
389 // Memtable will ignore BeginPrepare/EndPrepare markers
390 // in non recovery mode and simply insert the values
20effc67
TL
391 s = WriteBatchInternal::Append(working_batch,
392 GetWriteBatch()->GetWriteBatch());
393 assert(s.ok());
11fdf7f2 394
494da23a 395 uint64_t seq_used = kMaxSequenceNumber;
20effc67 396 s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
494da23a 397 /*log_used*/ nullptr, /*log_ref*/ log_number_,
20effc67 398 /*disable_memtable*/ false, &seq_used);
494da23a
TL
399 assert(!s.ok() || seq_used != kMaxSequenceNumber);
400 if (s.ok()) {
401 SetId(seq_used);
402 }
11fdf7f2
TL
403 return s;
404}
405
406Status PessimisticTransaction::Rollback() {
7c673cae
FG
407 Status s;
408 if (txn_state_ == PREPARED) {
7c673cae 409 txn_state_.store(AWAITING_ROLLBACK);
11fdf7f2
TL
410
411 s = RollbackInternal();
412
7c673cae
FG
413 if (s.ok()) {
414 // we do not need to keep our prepared section around
415 assert(log_number_ > 0);
11fdf7f2
TL
416 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
417 log_number_);
7c673cae
FG
418 Clear();
419 txn_state_.store(ROLLEDBACK);
420 }
421 } else if (txn_state_ == STARTED) {
11fdf7f2
TL
422 if (log_number_ > 0) {
423 assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
424 assert(GetId() > 0);
425 s = RollbackInternal();
426
427 if (s.ok()) {
428 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
429 log_number_);
430 }
431 }
7c673cae
FG
432 // prepare couldn't have taken place
433 Clear();
20effc67 434 } else if (txn_state_ == COMMITTED) {
7c673cae
FG
435 s = Status::InvalidArgument("This transaction has already been committed.");
436 } else {
437 s = Status::InvalidArgument(
438 "Two phase transaction is not in state for rollback.");
439 }
440
441 return s;
442}
443
11fdf7f2
TL
444Status WriteCommittedTxn::RollbackInternal() {
445 WriteBatch rollback_marker;
20effc67
TL
446 auto s = WriteBatchInternal::MarkRollback(&rollback_marker, name_);
447 assert(s.ok());
448 s = db_impl_->WriteImpl(write_options_, &rollback_marker);
11fdf7f2
TL
449 return s;
450}
451
452Status PessimisticTransaction::RollbackToSavePoint() {
7c673cae
FG
453 if (txn_state_ != STARTED) {
454 return Status::InvalidArgument("Transaction is beyond state for rollback.");
455 }
456
20effc67
TL
457 if (save_points_ != nullptr && !save_points_->empty()) {
458 // Unlock any keys locked since last transaction
459 auto& save_point_tracker = *save_points_->top().new_locks_;
460 std::unique_ptr<LockTracker> t(
461 tracked_locks_->GetTrackedLocksSinceSavePoint(save_point_tracker));
462 if (t) {
463 txn_db_impl_->UnLock(this, *t);
464 }
7c673cae
FG
465 }
466
467 return TransactionBaseImpl::RollbackToSavePoint();
468}
469
470// Lock all keys in this batch.
471// On success, caller should unlock keys_to_unlock
11fdf7f2 472Status PessimisticTransaction::LockBatch(WriteBatch* batch,
20effc67 473 LockTracker* keys_to_unlock) {
7c673cae
FG
474 class Handler : public WriteBatch::Handler {
475 public:
476 // Sorted map of column_family_id to sorted set of keys.
477 // Since LockBatch() always locks keys in sorted order, it cannot deadlock
478 // with itself. We're not using a comparator here since it doesn't matter
479 // what the sorting is as long as it's consistent.
480 std::map<uint32_t, std::set<std::string>> keys_;
481
482 Handler() {}
483
484 void RecordKey(uint32_t column_family_id, const Slice& key) {
485 std::string key_str = key.ToString();
486
f67539c2
TL
487 auto& cfh_keys = keys_[column_family_id];
488 auto iter = cfh_keys.find(key_str);
489 if (iter == cfh_keys.end()) {
7c673cae 490 // key not yet seen, store it.
f67539c2 491 cfh_keys.insert({std::move(key_str)});
7c673cae
FG
492 }
493 }
494
494da23a
TL
495 Status PutCF(uint32_t column_family_id, const Slice& key,
496 const Slice& /* unused */) override {
7c673cae
FG
497 RecordKey(column_family_id, key);
498 return Status::OK();
499 }
494da23a
TL
500 Status MergeCF(uint32_t column_family_id, const Slice& key,
501 const Slice& /* unused */) override {
7c673cae
FG
502 RecordKey(column_family_id, key);
503 return Status::OK();
504 }
494da23a 505 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
7c673cae
FG
506 RecordKey(column_family_id, key);
507 return Status::OK();
508 }
509 };
510
511 // Iterating on this handler will add all keys in this batch into keys
512 Handler handler;
20effc67
TL
513 Status s = batch->Iterate(&handler);
514 if (!s.ok()) {
515 return s;
516 }
7c673cae
FG
517
518 // Attempt to lock all keys
519 for (const auto& cf_iter : handler.keys_) {
520 uint32_t cfh_id = cf_iter.first;
521 auto& cfh_keys = cf_iter.second;
522
523 for (const auto& key_iter : cfh_keys) {
524 const std::string& key = key_iter;
525
526 s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);
527 if (!s.ok()) {
528 break;
529 }
20effc67
TL
530 PointLockRequest r;
531 r.column_family_id = cfh_id;
532 r.key = key;
533 r.seq = kMaxSequenceNumber;
534 r.read_only = false;
535 r.exclusive = true;
536 keys_to_unlock->Track(r);
7c673cae
FG
537 }
538
539 if (!s.ok()) {
540 break;
541 }
542 }
543
544 if (!s.ok()) {
20effc67 545 txn_db_impl_->UnLock(this, *keys_to_unlock);
7c673cae
FG
546 }
547
548 return s;
549}
550
551// Attempt to lock this key.
552// Returns OK if the key has been successfully locked. Non-ok, otherwise.
553// If check_shapshot is true and this transaction has a snapshot set,
554// this key will only be locked if there have been no writes to this key since
555// the snapshot time.
11fdf7f2
TL
556Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
557 const Slice& key, bool read_only,
494da23a
TL
558 bool exclusive, const bool do_validate,
559 const bool assume_tracked) {
560 assert(!assume_tracked || !do_validate);
11fdf7f2
TL
561 Status s;
562 if (UNLIKELY(skip_concurrency_control_)) {
563 return s;
564 }
7c673cae
FG
565 uint32_t cfh_id = GetColumnFamilyID(column_family);
566 std::string key_str = key.ToString();
20effc67
TL
567 PointLockStatus status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
568 bool previously_locked = status.locked;
569 bool lock_upgrade = previously_locked && exclusive && !status.exclusive;
7c673cae
FG
570
571 // Lock this key if this transactions hasn't already locked it or we require
572 // an upgrade.
573 if (!previously_locked || lock_upgrade) {
574 s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
575 }
576
577 SetSnapshotIfNeeded();
578
579 // Even though we do not care about doing conflict checking for this write,
580 // we still need to take a lock to make sure we do not cause a conflict with
581 // some other write. However, we do not need to check if there have been
582 // any writes since this transaction's snapshot.
583 // TODO(agiardullo): could optimize by supporting shared txn locks in the
584 // future
20effc67
TL
585 SequenceNumber tracked_at_seq =
586 status.locked ? status.seq : kMaxSequenceNumber;
494da23a
TL
587 if (!do_validate || snapshot_ == nullptr) {
588 if (assume_tracked && !previously_locked) {
589 s = Status::InvalidArgument(
590 "assume_tracked is set but it is not tracked yet");
591 }
7c673cae
FG
592 // Need to remember the earliest sequence number that we know that this
593 // key has not been modified after. This is useful if this same
594 // transaction
595 // later tries to lock this key again.
11fdf7f2 596 if (tracked_at_seq == kMaxSequenceNumber) {
7c673cae
FG
597 // Since we haven't checked a snapshot, we only know this key has not
598 // been modified since after we locked it.
11fdf7f2
TL
599 // Note: when last_seq_same_as_publish_seq_==false this is less than the
600 // latest allocated seq but it is ok since i) this is just a heuristic
601 // used only as a hint to avoid actual check for conflicts, ii) this would
602 // cause a false positive only if the snapthot is taken right after the
603 // lock, which would be an unusual sequence.
604 tracked_at_seq = db_->GetLatestSequenceNumber();
7c673cae
FG
605 }
606 } else {
607 // If a snapshot is set, we need to make sure the key hasn't been modified
608 // since the snapshot. This must be done after we locked the key.
11fdf7f2
TL
609 // If we already have validated an earilier snapshot it must has been
610 // reflected in tracked_at_seq and ValidateSnapshot will return OK.
7c673cae 611 if (s.ok()) {
11fdf7f2 612 s = ValidateSnapshot(column_family, key, &tracked_at_seq);
7c673cae
FG
613
614 if (!s.ok()) {
615 // Failed to validate key
20effc67
TL
616 // Unlock key we just locked
617 if (lock_upgrade) {
618 s = txn_db_impl_->TryLock(this, cfh_id, key_str,
619 false /* exclusive */);
620 assert(s.ok());
621 } else if (!previously_locked) {
622 txn_db_impl_->UnLock(this, cfh_id, key.ToString());
7c673cae
FG
623 }
624 }
625 }
626 }
627
628 if (s.ok()) {
11fdf7f2
TL
629 // We must track all the locked keys so that we can unlock them later. If
630 // the key is already locked, this func will update some stats on the
f67539c2
TL
631 // tracked key. It could also update the tracked_at_seq if it is lower
632 // than the existing tracked key seq. These stats are necessary for
633 // RollbackToSavePoint to determine whether a key can be safely removed
634 // from tracked_keys_. Removal can only be done if a key was only locked
635 // during the current savepoint.
636 //
637 // Recall that if assume_tracked is true, we assume that TrackKey has been
638 // called previously since the last savepoint, with the same exclusive
639 // setting, and at a lower sequence number, so skipping here should be
640 // safe.
641 if (!assume_tracked) {
642 TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
643 } else {
644#ifndef NDEBUG
20effc67
TL
645 PointLockStatus lock_status =
646 tracked_locks_->GetPointLockStatus(cfh_id, key_str);
647 assert(lock_status.locked);
648 assert(lock_status.seq <= tracked_at_seq);
649 assert(lock_status.exclusive == exclusive);
f67539c2
TL
650#endif
651 }
7c673cae
FG
652 }
653
654 return s;
655}
656
657// Return OK() if this key has not been modified more recently than the
658// transaction snapshot_.
11fdf7f2
TL
659// tracked_at_seq is the global seq at which we either locked the key or already
660// have done ValidateSnapshot.
661Status PessimisticTransaction::ValidateSnapshot(
662 ColumnFamilyHandle* column_family, const Slice& key,
663 SequenceNumber* tracked_at_seq) {
7c673cae
FG
664 assert(snapshot_);
665
11fdf7f2
TL
666 SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
667 if (*tracked_at_seq <= snap_seq) {
668 // If the key has been previous validated (or locked) at a sequence number
669 // earlier than the current snapshot's sequence number, we already know it
670 // has not been modified aftter snap_seq either.
7c673cae
FG
671 return Status::OK();
672 }
11fdf7f2
TL
673 // Otherwise we have either
674 // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
675 // 2: snap_seq < tracked_at_seq: last time we lock the key was via
494da23a 676 // do_validate=false which means we had skipped ValidateSnapshot. In both
11fdf7f2 677 // cases we should do ValidateSnapshot now.
7c673cae 678
11fdf7f2 679 *tracked_at_seq = snap_seq;
7c673cae
FG
680
681 ColumnFamilyHandle* cfh =
11fdf7f2 682 column_family ? column_family : db_impl_->DefaultColumnFamily();
7c673cae 683
11fdf7f2
TL
684 return TransactionUtil::CheckKeyForConflicts(
685 db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
7c673cae
FG
686}
687
11fdf7f2 688bool PessimisticTransaction::TryStealingLocks() {
7c673cae
FG
689 assert(IsExpired());
690 TransactionState expected = STARTED;
691 return std::atomic_compare_exchange_strong(&txn_state_, &expected,
692 LOCKS_STOLEN);
693}
694
11fdf7f2
TL
695void PessimisticTransaction::UnlockGetForUpdate(
696 ColumnFamilyHandle* column_family, const Slice& key) {
7c673cae
FG
697 txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
698}
699
11fdf7f2 700Status PessimisticTransaction::SetName(const TransactionName& name) {
7c673cae
FG
701 Status s;
702 if (txn_state_ == STARTED) {
703 if (name_.length()) {
704 s = Status::InvalidArgument("Transaction has already been named.");
705 } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) {
706 s = Status::InvalidArgument("Transaction name must be unique.");
707 } else if (name.length() < 1 || name.length() > 512) {
708 s = Status::InvalidArgument(
709 "Transaction name length must be between 1 and 512 chars.");
710 } else {
711 name_ = name;
712 txn_db_impl_->RegisterTransaction(this);
713 }
714 } else {
715 s = Status::InvalidArgument("Transaction is beyond state for naming.");
716 }
717 return s;
718}
719
f67539c2 720} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
721
722#endif // ROCKSDB_LITE