]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
11fdf7f2 | 8 | #include "utilities/transactions/pessimistic_transaction.h" |
7c673cae FG |
9 | |
10 | #include <map> | |
11 | #include <set> | |
12 | #include <string> | |
13 | #include <vector> | |
14 | ||
15 | #include "db/column_family.h" | |
f67539c2 | 16 | #include "db/db_impl/db_impl.h" |
7c673cae FG |
17 | #include "rocksdb/comparator.h" |
18 | #include "rocksdb/db.h" | |
19 | #include "rocksdb/snapshot.h" | |
20 | #include "rocksdb/status.h" | |
21 | #include "rocksdb/utilities/transaction_db.h" | |
f67539c2 | 22 | #include "test_util/sync_point.h" |
11fdf7f2 | 23 | #include "util/cast_util.h" |
7c673cae | 24 | #include "util/string_util.h" |
11fdf7f2 | 25 | #include "utilities/transactions/pessimistic_transaction_db.h" |
7c673cae FG |
26 | #include "utilities/transactions/transaction_util.h" |
27 | ||
f67539c2 | 28 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
29 | |
30 | struct WriteOptions; | |
31 | ||
11fdf7f2 | 32 | std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1); |
7c673cae | 33 | |
11fdf7f2 | 34 | TransactionID PessimisticTransaction::GenTxnID() { |
7c673cae FG |
35 | return txn_id_counter_.fetch_add(1); |
36 | } | |
37 | ||
11fdf7f2 TL |
38 | PessimisticTransaction::PessimisticTransaction( |
39 | TransactionDB* txn_db, const WriteOptions& write_options, | |
494da23a | 40 | const TransactionOptions& txn_options, const bool init) |
20effc67 TL |
41 | : TransactionBaseImpl( |
42 | txn_db->GetRootDB(), write_options, | |
43 | static_cast_with_check<PessimisticTransactionDB>(txn_db) | |
44 | ->GetLockTrackerFactory()), | |
7c673cae | 45 | txn_db_impl_(nullptr), |
11fdf7f2 | 46 | expiration_time_(0), |
7c673cae FG |
47 | txn_id_(0), |
48 | waiting_cf_id_(0), | |
49 | waiting_key_(nullptr), | |
7c673cae FG |
50 | lock_timeout_(0), |
51 | deadlock_detect_(false), | |
11fdf7f2 TL |
52 | deadlock_detect_depth_(0), |
53 | skip_concurrency_control_(false) { | |
20effc67 TL |
54 | txn_db_impl_ = static_cast_with_check<PessimisticTransactionDB>(txn_db); |
55 | db_impl_ = static_cast_with_check<DBImpl>(db_); | |
494da23a TL |
56 | if (init) { |
57 | Initialize(txn_options); | |
58 | } | |
7c673cae FG |
59 | } |
60 | ||
11fdf7f2 | 61 | void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { |
7c673cae FG |
62 | txn_id_ = GenTxnID(); |
63 | ||
64 | txn_state_ = STARTED; | |
65 | ||
66 | deadlock_detect_ = txn_options.deadlock_detect; | |
67 | deadlock_detect_depth_ = txn_options.deadlock_detect_depth; | |
68 | write_batch_.SetMaxBytes(txn_options.max_write_batch_size); | |
11fdf7f2 | 69 | skip_concurrency_control_ = txn_options.skip_concurrency_control; |
7c673cae FG |
70 | |
71 | lock_timeout_ = txn_options.lock_timeout * 1000; | |
72 | if (lock_timeout_ < 0) { | |
73 | // Lock timeout not set, use default | |
74 | lock_timeout_ = | |
75 | txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000; | |
76 | } | |
77 | ||
78 | if (txn_options.expiration >= 0) { | |
79 | expiration_time_ = start_time_ + txn_options.expiration * 1000; | |
80 | } else { | |
81 | expiration_time_ = 0; | |
82 | } | |
83 | ||
84 | if (txn_options.set_snapshot) { | |
85 | SetSnapshot(); | |
86 | } | |
87 | ||
88 | if (expiration_time_ > 0) { | |
89 | txn_db_impl_->InsertExpirableTransaction(txn_id_, this); | |
90 | } | |
11fdf7f2 TL |
91 | use_only_the_last_commit_time_batch_for_recovery_ = |
92 | txn_options.use_only_the_last_commit_time_batch_for_recovery; | |
20effc67 | 93 | skip_prepare_ = txn_options.skip_prepare; |
7c673cae FG |
94 | } |
95 | ||
11fdf7f2 | 96 | PessimisticTransaction::~PessimisticTransaction() { |
20effc67 | 97 | txn_db_impl_->UnLock(this, *tracked_locks_); |
7c673cae FG |
98 | if (expiration_time_ > 0) { |
99 | txn_db_impl_->RemoveExpirableTransaction(txn_id_); | |
100 | } | |
20effc67 | 101 | if (!name_.empty() && txn_state_ != COMMITTED) { |
7c673cae FG |
102 | txn_db_impl_->UnregisterTransaction(this); |
103 | } | |
104 | } | |
105 | ||
11fdf7f2 | 106 | void PessimisticTransaction::Clear() { |
20effc67 | 107 | txn_db_impl_->UnLock(this, *tracked_locks_); |
7c673cae FG |
108 | TransactionBaseImpl::Clear(); |
109 | } | |
110 | ||
11fdf7f2 TL |
111 | void PessimisticTransaction::Reinitialize( |
112 | TransactionDB* txn_db, const WriteOptions& write_options, | |
113 | const TransactionOptions& txn_options) { | |
20effc67 | 114 | if (!name_.empty() && txn_state_ != COMMITTED) { |
7c673cae FG |
115 | txn_db_impl_->UnregisterTransaction(this); |
116 | } | |
117 | TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options); | |
118 | Initialize(txn_options); | |
119 | } | |
120 | ||
11fdf7f2 | 121 | bool PessimisticTransaction::IsExpired() const { |
7c673cae FG |
122 | if (expiration_time_ > 0) { |
123 | if (db_->GetEnv()->NowMicros() >= expiration_time_) { | |
124 | // Transaction is expired. | |
125 | return true; | |
126 | } | |
127 | } | |
128 | ||
129 | return false; | |
130 | } | |
131 | ||
11fdf7f2 TL |
132 | WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, |
133 | const WriteOptions& write_options, | |
134 | const TransactionOptions& txn_options) | |
135 | : PessimisticTransaction(txn_db, write_options, txn_options){}; | |
136 | ||
137 | Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { | |
20effc67 TL |
138 | std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create()); |
139 | Status s = LockBatch(batch, keys_to_unlock.get()); | |
7c673cae FG |
140 | |
141 | if (!s.ok()) { | |
142 | return s; | |
143 | } | |
144 | ||
145 | bool can_commit = false; | |
146 | ||
147 | if (IsExpired()) { | |
148 | s = Status::Expired(); | |
149 | } else if (expiration_time_ > 0) { | |
150 | TransactionState expected = STARTED; | |
151 | can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected, | |
152 | AWAITING_COMMIT); | |
153 | } else if (txn_state_ == STARTED) { | |
154 | // lock stealing is not a concern | |
155 | can_commit = true; | |
156 | } | |
157 | ||
158 | if (can_commit) { | |
159 | txn_state_.store(AWAITING_COMMIT); | |
11fdf7f2 | 160 | s = CommitBatchInternal(batch); |
7c673cae | 161 | if (s.ok()) { |
20effc67 | 162 | txn_state_.store(COMMITTED); |
7c673cae FG |
163 | } |
164 | } else if (txn_state_ == LOCKS_STOLEN) { | |
165 | s = Status::Expired(); | |
166 | } else { | |
167 | s = Status::InvalidArgument("Transaction is not in state for commit."); | |
168 | } | |
169 | ||
20effc67 | 170 | txn_db_impl_->UnLock(this, *keys_to_unlock); |
7c673cae FG |
171 | |
172 | return s; | |
173 | } | |
174 | ||
11fdf7f2 | 175 | Status PessimisticTransaction::Prepare() { |
7c673cae FG |
176 | |
177 | if (name_.empty()) { | |
178 | return Status::InvalidArgument( | |
179 | "Cannot prepare a transaction that has not been named."); | |
180 | } | |
181 | ||
182 | if (IsExpired()) { | |
183 | return Status::Expired(); | |
184 | } | |
185 | ||
20effc67 | 186 | Status s; |
7c673cae FG |
187 | bool can_prepare = false; |
188 | ||
189 | if (expiration_time_ > 0) { | |
190 | // must concern ourselves with expiraton and/or lock stealing | |
191 | // need to compare/exchange bc locks could be stolen under us here | |
192 | TransactionState expected = STARTED; | |
193 | can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected, | |
194 | AWAITING_PREPARE); | |
195 | } else if (txn_state_ == STARTED) { | |
196 | // expiration and lock stealing is not possible | |
20effc67 | 197 | txn_state_.store(AWAITING_PREPARE); |
7c673cae FG |
198 | can_prepare = true; |
199 | } | |
200 | ||
201 | if (can_prepare) { | |
7c673cae FG |
202 | // transaction can't expire after preparation |
203 | expiration_time_ = 0; | |
494da23a TL |
204 | assert(log_number_ == 0 || |
205 | txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); | |
11fdf7f2 TL |
206 | |
207 | s = PrepareInternal(); | |
7c673cae | 208 | if (s.ok()) { |
7c673cae FG |
209 | txn_state_.store(PREPARED); |
210 | } | |
211 | } else if (txn_state_ == LOCKS_STOLEN) { | |
212 | s = Status::Expired(); | |
213 | } else if (txn_state_ == PREPARED) { | |
214 | s = Status::InvalidArgument("Transaction has already been prepared."); | |
20effc67 | 215 | } else if (txn_state_ == COMMITTED) { |
7c673cae FG |
216 | s = Status::InvalidArgument("Transaction has already been committed."); |
217 | } else if (txn_state_ == ROLLEDBACK) { | |
218 | s = Status::InvalidArgument("Transaction has already been rolledback."); | |
219 | } else { | |
220 | s = Status::InvalidArgument("Transaction is not in state for commit."); | |
221 | } | |
222 | ||
223 | return s; | |
224 | } | |
225 | ||
11fdf7f2 TL |
226 | Status WriteCommittedTxn::PrepareInternal() { |
227 | WriteOptions write_options = write_options_; | |
228 | write_options.disableWAL = false; | |
20effc67 TL |
229 | auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), |
230 | name_); | |
231 | assert(s.ok()); | |
494da23a TL |
232 | class MarkLogCallback : public PreReleaseCallback { |
233 | public: | |
234 | MarkLogCallback(DBImpl* db, bool two_write_queues) | |
235 | : db_(db), two_write_queues_(two_write_queues) { | |
236 | (void)two_write_queues_; // to silence unused private field warning | |
237 | } | |
238 | virtual Status Callback(SequenceNumber, bool is_mem_disabled, | |
f67539c2 TL |
239 | uint64_t log_number, size_t /*index*/, |
240 | size_t /*total*/) override { | |
494da23a TL |
241 | #ifdef NDEBUG |
242 | (void)is_mem_disabled; | |
243 | #endif | |
244 | assert(log_number != 0); | |
245 | assert(!two_write_queues_ || is_mem_disabled); // implies the 2nd queue | |
246 | db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number); | |
247 | return Status::OK(); | |
248 | } | |
249 | ||
250 | private: | |
251 | DBImpl* db_; | |
252 | bool two_write_queues_; | |
253 | } mark_log_callback(db_impl_, | |
254 | db_impl_->immutable_db_options().two_write_queues); | |
255 | ||
256 | WriteCallback* const kNoWriteCallback = nullptr; | |
257 | const uint64_t kRefNoLog = 0; | |
258 | const bool kDisableMemtable = true; | |
259 | SequenceNumber* const KIgnoreSeqUsed = nullptr; | |
260 | const size_t kNoBatchCount = 0; | |
20effc67 TL |
261 | s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), |
262 | kNoWriteCallback, &log_number_, kRefNoLog, | |
263 | kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount, | |
264 | &mark_log_callback); | |
11fdf7f2 TL |
265 | return s; |
266 | } | |
267 | ||
268 | Status PessimisticTransaction::Commit() { | |
11fdf7f2 | 269 | bool commit_without_prepare = false; |
7c673cae FG |
270 | bool commit_prepared = false; |
271 | ||
272 | if (IsExpired()) { | |
273 | return Status::Expired(); | |
274 | } | |
275 | ||
276 | if (expiration_time_ > 0) { | |
277 | // we must atomicaly compare and exchange the state here because at | |
278 | // this state in the transaction it is possible for another thread | |
279 | // to change our state out from under us in the even that we expire and have | |
280 | // our locks stolen. In this case the only valid state is STARTED because | |
281 | // a state of PREPARED would have a cleared expiration_time_. | |
282 | TransactionState expected = STARTED; | |
11fdf7f2 TL |
283 | commit_without_prepare = std::atomic_compare_exchange_strong( |
284 | &txn_state_, &expected, AWAITING_COMMIT); | |
7c673cae FG |
285 | TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); |
286 | } else if (txn_state_ == PREPARED) { | |
287 | // expiration and lock stealing is not a concern | |
288 | commit_prepared = true; | |
289 | } else if (txn_state_ == STARTED) { | |
290 | // expiration and lock stealing is not a concern | |
20effc67 TL |
291 | if (skip_prepare_) { |
292 | commit_without_prepare = true; | |
293 | } else { | |
294 | return Status::TxnNotPrepared(); | |
295 | } | |
7c673cae FG |
296 | } |
297 | ||
20effc67 | 298 | Status s; |
11fdf7f2 | 299 | if (commit_without_prepare) { |
7c673cae FG |
300 | assert(!commit_prepared); |
301 | if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) { | |
302 | s = Status::InvalidArgument( | |
303 | "Commit-time batch contains values that will not be committed."); | |
304 | } else { | |
305 | txn_state_.store(AWAITING_COMMIT); | |
11fdf7f2 TL |
306 | if (log_number_ > 0) { |
307 | dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( | |
308 | log_number_); | |
309 | } | |
310 | s = CommitWithoutPrepareInternal(); | |
311 | if (!name_.empty()) { | |
312 | txn_db_impl_->UnregisterTransaction(this); | |
313 | } | |
7c673cae FG |
314 | Clear(); |
315 | if (s.ok()) { | |
20effc67 | 316 | txn_state_.store(COMMITTED); |
7c673cae FG |
317 | } |
318 | } | |
319 | } else if (commit_prepared) { | |
320 | txn_state_.store(AWAITING_COMMIT); | |
321 | ||
11fdf7f2 | 322 | s = CommitInternal(); |
7c673cae | 323 | |
7c673cae | 324 | if (!s.ok()) { |
11fdf7f2 TL |
325 | ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, |
326 | "Commit write failed"); | |
7c673cae FG |
327 | return s; |
328 | } | |
329 | ||
330 | // FindObsoleteFiles must now look to the memtables | |
331 | // to determine what prep logs must be kept around, | |
332 | // not the prep section heap. | |
333 | assert(log_number_ > 0); | |
11fdf7f2 TL |
334 | dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( |
335 | log_number_); | |
7c673cae FG |
336 | txn_db_impl_->UnregisterTransaction(this); |
337 | ||
338 | Clear(); | |
20effc67 | 339 | txn_state_.store(COMMITTED); |
7c673cae FG |
340 | } else if (txn_state_ == LOCKS_STOLEN) { |
341 | s = Status::Expired(); | |
20effc67 | 342 | } else if (txn_state_ == COMMITTED) { |
7c673cae FG |
343 | s = Status::InvalidArgument("Transaction has already been committed."); |
344 | } else if (txn_state_ == ROLLEDBACK) { | |
345 | s = Status::InvalidArgument("Transaction has already been rolledback."); | |
346 | } else { | |
347 | s = Status::InvalidArgument("Transaction is not in state for commit."); | |
348 | } | |
349 | ||
350 | return s; | |
351 | } | |
352 | ||
11fdf7f2 | 353 | Status WriteCommittedTxn::CommitWithoutPrepareInternal() { |
494da23a TL |
354 | uint64_t seq_used = kMaxSequenceNumber; |
355 | auto s = | |
356 | db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(), | |
357 | /*callback*/ nullptr, /*log_used*/ nullptr, | |
358 | /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used); | |
359 | assert(!s.ok() || seq_used != kMaxSequenceNumber); | |
360 | if (s.ok()) { | |
361 | SetId(seq_used); | |
362 | } | |
11fdf7f2 TL |
363 | return s; |
364 | } | |
365 | ||
366 | Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) { | |
494da23a TL |
367 | uint64_t seq_used = kMaxSequenceNumber; |
368 | auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr, | |
369 | /*log_used*/ nullptr, /*log_ref*/ 0, | |
370 | /*disable_memtable*/ false, &seq_used); | |
371 | assert(!s.ok() || seq_used != kMaxSequenceNumber); | |
372 | if (s.ok()) { | |
373 | SetId(seq_used); | |
374 | } | |
11fdf7f2 TL |
375 | return s; |
376 | } | |
377 | ||
378 | Status WriteCommittedTxn::CommitInternal() { | |
379 | // We take the commit-time batch and append the Commit marker. | |
380 | // The Memtable will ignore the Commit marker in non-recovery mode | |
381 | WriteBatch* working_batch = GetCommitTimeWriteBatch(); | |
20effc67 TL |
382 | auto s = WriteBatchInternal::MarkCommit(working_batch, name_); |
383 | assert(s.ok()); | |
11fdf7f2 TL |
384 | |
385 | // any operations appended to this working_batch will be ignored from WAL | |
386 | working_batch->MarkWalTerminationPoint(); | |
387 | ||
388 | // insert prepared batch into Memtable only skipping WAL. | |
389 | // Memtable will ignore BeginPrepare/EndPrepare markers | |
390 | // in non recovery mode and simply insert the values | |
20effc67 TL |
391 | s = WriteBatchInternal::Append(working_batch, |
392 | GetWriteBatch()->GetWriteBatch()); | |
393 | assert(s.ok()); | |
11fdf7f2 | 394 | |
494da23a | 395 | uint64_t seq_used = kMaxSequenceNumber; |
20effc67 | 396 | s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr, |
494da23a | 397 | /*log_used*/ nullptr, /*log_ref*/ log_number_, |
20effc67 | 398 | /*disable_memtable*/ false, &seq_used); |
494da23a TL |
399 | assert(!s.ok() || seq_used != kMaxSequenceNumber); |
400 | if (s.ok()) { | |
401 | SetId(seq_used); | |
402 | } | |
11fdf7f2 TL |
403 | return s; |
404 | } | |
405 | ||
406 | Status PessimisticTransaction::Rollback() { | |
7c673cae FG |
407 | Status s; |
408 | if (txn_state_ == PREPARED) { | |
7c673cae | 409 | txn_state_.store(AWAITING_ROLLBACK); |
11fdf7f2 TL |
410 | |
411 | s = RollbackInternal(); | |
412 | ||
7c673cae FG |
413 | if (s.ok()) { |
414 | // we do not need to keep our prepared section around | |
415 | assert(log_number_ > 0); | |
11fdf7f2 TL |
416 | dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( |
417 | log_number_); | |
7c673cae FG |
418 | Clear(); |
419 | txn_state_.store(ROLLEDBACK); | |
420 | } | |
421 | } else if (txn_state_ == STARTED) { | |
11fdf7f2 TL |
422 | if (log_number_ > 0) { |
423 | assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); | |
424 | assert(GetId() > 0); | |
425 | s = RollbackInternal(); | |
426 | ||
427 | if (s.ok()) { | |
428 | dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( | |
429 | log_number_); | |
430 | } | |
431 | } | |
7c673cae FG |
432 | // prepare couldn't have taken place |
433 | Clear(); | |
20effc67 | 434 | } else if (txn_state_ == COMMITTED) { |
7c673cae FG |
435 | s = Status::InvalidArgument("This transaction has already been committed."); |
436 | } else { | |
437 | s = Status::InvalidArgument( | |
438 | "Two phase transaction is not in state for rollback."); | |
439 | } | |
440 | ||
441 | return s; | |
442 | } | |
443 | ||
11fdf7f2 TL |
444 | Status WriteCommittedTxn::RollbackInternal() { |
445 | WriteBatch rollback_marker; | |
20effc67 TL |
446 | auto s = WriteBatchInternal::MarkRollback(&rollback_marker, name_); |
447 | assert(s.ok()); | |
448 | s = db_impl_->WriteImpl(write_options_, &rollback_marker); | |
11fdf7f2 TL |
449 | return s; |
450 | } | |
451 | ||
452 | Status PessimisticTransaction::RollbackToSavePoint() { | |
7c673cae FG |
453 | if (txn_state_ != STARTED) { |
454 | return Status::InvalidArgument("Transaction is beyond state for rollback."); | |
455 | } | |
456 | ||
20effc67 TL |
457 | if (save_points_ != nullptr && !save_points_->empty()) { |
458 | // Unlock any keys locked since last transaction | |
459 | auto& save_point_tracker = *save_points_->top().new_locks_; | |
460 | std::unique_ptr<LockTracker> t( | |
461 | tracked_locks_->GetTrackedLocksSinceSavePoint(save_point_tracker)); | |
462 | if (t) { | |
463 | txn_db_impl_->UnLock(this, *t); | |
464 | } | |
7c673cae FG |
465 | } |
466 | ||
467 | return TransactionBaseImpl::RollbackToSavePoint(); | |
468 | } | |
469 | ||
470 | // Lock all keys in this batch. | |
471 | // On success, caller should unlock keys_to_unlock | |
11fdf7f2 | 472 | Status PessimisticTransaction::LockBatch(WriteBatch* batch, |
20effc67 | 473 | LockTracker* keys_to_unlock) { |
7c673cae FG |
474 | class Handler : public WriteBatch::Handler { |
475 | public: | |
476 | // Sorted map of column_family_id to sorted set of keys. | |
477 | // Since LockBatch() always locks keys in sorted order, it cannot deadlock | |
478 | // with itself. We're not using a comparator here since it doesn't matter | |
479 | // what the sorting is as long as it's consistent. | |
480 | std::map<uint32_t, std::set<std::string>> keys_; | |
481 | ||
482 | Handler() {} | |
483 | ||
484 | void RecordKey(uint32_t column_family_id, const Slice& key) { | |
485 | std::string key_str = key.ToString(); | |
486 | ||
f67539c2 TL |
487 | auto& cfh_keys = keys_[column_family_id]; |
488 | auto iter = cfh_keys.find(key_str); | |
489 | if (iter == cfh_keys.end()) { | |
7c673cae | 490 | // key not yet seen, store it. |
f67539c2 | 491 | cfh_keys.insert({std::move(key_str)}); |
7c673cae FG |
492 | } |
493 | } | |
494 | ||
494da23a TL |
495 | Status PutCF(uint32_t column_family_id, const Slice& key, |
496 | const Slice& /* unused */) override { | |
7c673cae FG |
497 | RecordKey(column_family_id, key); |
498 | return Status::OK(); | |
499 | } | |
494da23a TL |
500 | Status MergeCF(uint32_t column_family_id, const Slice& key, |
501 | const Slice& /* unused */) override { | |
7c673cae FG |
502 | RecordKey(column_family_id, key); |
503 | return Status::OK(); | |
504 | } | |
494da23a | 505 | Status DeleteCF(uint32_t column_family_id, const Slice& key) override { |
7c673cae FG |
506 | RecordKey(column_family_id, key); |
507 | return Status::OK(); | |
508 | } | |
509 | }; | |
510 | ||
511 | // Iterating on this handler will add all keys in this batch into keys | |
512 | Handler handler; | |
20effc67 TL |
513 | Status s = batch->Iterate(&handler); |
514 | if (!s.ok()) { | |
515 | return s; | |
516 | } | |
7c673cae FG |
517 | |
518 | // Attempt to lock all keys | |
519 | for (const auto& cf_iter : handler.keys_) { | |
520 | uint32_t cfh_id = cf_iter.first; | |
521 | auto& cfh_keys = cf_iter.second; | |
522 | ||
523 | for (const auto& key_iter : cfh_keys) { | |
524 | const std::string& key = key_iter; | |
525 | ||
526 | s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */); | |
527 | if (!s.ok()) { | |
528 | break; | |
529 | } | |
20effc67 TL |
530 | PointLockRequest r; |
531 | r.column_family_id = cfh_id; | |
532 | r.key = key; | |
533 | r.seq = kMaxSequenceNumber; | |
534 | r.read_only = false; | |
535 | r.exclusive = true; | |
536 | keys_to_unlock->Track(r); | |
7c673cae FG |
537 | } |
538 | ||
539 | if (!s.ok()) { | |
540 | break; | |
541 | } | |
542 | } | |
543 | ||
544 | if (!s.ok()) { | |
20effc67 | 545 | txn_db_impl_->UnLock(this, *keys_to_unlock); |
7c673cae FG |
546 | } |
547 | ||
548 | return s; | |
549 | } | |
550 | ||
551 | // Attempt to lock this key. | |
552 | // Returns OK if the key has been successfully locked. Non-ok, otherwise. | |
553 | // If check_shapshot is true and this transaction has a snapshot set, | |
554 | // this key will only be locked if there have been no writes to this key since | |
555 | // the snapshot time. | |
11fdf7f2 TL |
556 | Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, |
557 | const Slice& key, bool read_only, | |
494da23a TL |
558 | bool exclusive, const bool do_validate, |
559 | const bool assume_tracked) { | |
560 | assert(!assume_tracked || !do_validate); | |
11fdf7f2 TL |
561 | Status s; |
562 | if (UNLIKELY(skip_concurrency_control_)) { | |
563 | return s; | |
564 | } | |
7c673cae FG |
565 | uint32_t cfh_id = GetColumnFamilyID(column_family); |
566 | std::string key_str = key.ToString(); | |
20effc67 TL |
567 | PointLockStatus status = tracked_locks_->GetPointLockStatus(cfh_id, key_str); |
568 | bool previously_locked = status.locked; | |
569 | bool lock_upgrade = previously_locked && exclusive && !status.exclusive; | |
7c673cae FG |
570 | |
571 | // Lock this key if this transactions hasn't already locked it or we require | |
572 | // an upgrade. | |
573 | if (!previously_locked || lock_upgrade) { | |
574 | s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive); | |
575 | } | |
576 | ||
577 | SetSnapshotIfNeeded(); | |
578 | ||
579 | // Even though we do not care about doing conflict checking for this write, | |
580 | // we still need to take a lock to make sure we do not cause a conflict with | |
581 | // some other write. However, we do not need to check if there have been | |
582 | // any writes since this transaction's snapshot. | |
583 | // TODO(agiardullo): could optimize by supporting shared txn locks in the | |
584 | // future | |
20effc67 TL |
585 | SequenceNumber tracked_at_seq = |
586 | status.locked ? status.seq : kMaxSequenceNumber; | |
494da23a TL |
587 | if (!do_validate || snapshot_ == nullptr) { |
588 | if (assume_tracked && !previously_locked) { | |
589 | s = Status::InvalidArgument( | |
590 | "assume_tracked is set but it is not tracked yet"); | |
591 | } | |
7c673cae FG |
592 | // Need to remember the earliest sequence number that we know that this |
593 | // key has not been modified after. This is useful if this same | |
594 | // transaction | |
595 | // later tries to lock this key again. | |
11fdf7f2 | 596 | if (tracked_at_seq == kMaxSequenceNumber) { |
7c673cae FG |
597 | // Since we haven't checked a snapshot, we only know this key has not |
598 | // been modified since after we locked it. | |
11fdf7f2 TL |
599 | // Note: when last_seq_same_as_publish_seq_==false this is less than the |
600 | // latest allocated seq but it is ok since i) this is just a heuristic | |
601 | // used only as a hint to avoid actual check for conflicts, ii) this would | |
602 | // cause a false positive only if the snapthot is taken right after the | |
603 | // lock, which would be an unusual sequence. | |
604 | tracked_at_seq = db_->GetLatestSequenceNumber(); | |
7c673cae FG |
605 | } |
606 | } else { | |
607 | // If a snapshot is set, we need to make sure the key hasn't been modified | |
608 | // since the snapshot. This must be done after we locked the key. | |
11fdf7f2 TL |
609 | // If we already have validated an earilier snapshot it must has been |
610 | // reflected in tracked_at_seq and ValidateSnapshot will return OK. | |
7c673cae | 611 | if (s.ok()) { |
11fdf7f2 | 612 | s = ValidateSnapshot(column_family, key, &tracked_at_seq); |
7c673cae FG |
613 | |
614 | if (!s.ok()) { | |
615 | // Failed to validate key | |
20effc67 TL |
616 | // Unlock key we just locked |
617 | if (lock_upgrade) { | |
618 | s = txn_db_impl_->TryLock(this, cfh_id, key_str, | |
619 | false /* exclusive */); | |
620 | assert(s.ok()); | |
621 | } else if (!previously_locked) { | |
622 | txn_db_impl_->UnLock(this, cfh_id, key.ToString()); | |
7c673cae FG |
623 | } |
624 | } | |
625 | } | |
626 | } | |
627 | ||
628 | if (s.ok()) { | |
11fdf7f2 TL |
629 | // We must track all the locked keys so that we can unlock them later. If |
630 | // the key is already locked, this func will update some stats on the | |
f67539c2 TL |
631 | // tracked key. It could also update the tracked_at_seq if it is lower |
632 | // than the existing tracked key seq. These stats are necessary for | |
633 | // RollbackToSavePoint to determine whether a key can be safely removed | |
634 | // from tracked_keys_. Removal can only be done if a key was only locked | |
635 | // during the current savepoint. | |
636 | // | |
637 | // Recall that if assume_tracked is true, we assume that TrackKey has been | |
638 | // called previously since the last savepoint, with the same exclusive | |
639 | // setting, and at a lower sequence number, so skipping here should be | |
640 | // safe. | |
641 | if (!assume_tracked) { | |
642 | TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive); | |
643 | } else { | |
644 | #ifndef NDEBUG | |
20effc67 TL |
645 | PointLockStatus lock_status = |
646 | tracked_locks_->GetPointLockStatus(cfh_id, key_str); | |
647 | assert(lock_status.locked); | |
648 | assert(lock_status.seq <= tracked_at_seq); | |
649 | assert(lock_status.exclusive == exclusive); | |
f67539c2 TL |
650 | #endif |
651 | } | |
7c673cae FG |
652 | } |
653 | ||
654 | return s; | |
655 | } | |
656 | ||
657 | // Return OK() if this key has not been modified more recently than the | |
658 | // transaction snapshot_. | |
11fdf7f2 TL |
659 | // tracked_at_seq is the global seq at which we either locked the key or already |
660 | // have done ValidateSnapshot. | |
661 | Status PessimisticTransaction::ValidateSnapshot( | |
662 | ColumnFamilyHandle* column_family, const Slice& key, | |
663 | SequenceNumber* tracked_at_seq) { | |
7c673cae FG |
664 | assert(snapshot_); |
665 | ||
11fdf7f2 TL |
666 | SequenceNumber snap_seq = snapshot_->GetSequenceNumber(); |
667 | if (*tracked_at_seq <= snap_seq) { | |
668 | // If the key has been previous validated (or locked) at a sequence number | |
669 | // earlier than the current snapshot's sequence number, we already know it | |
670 | // has not been modified aftter snap_seq either. | |
7c673cae FG |
671 | return Status::OK(); |
672 | } | |
11fdf7f2 TL |
673 | // Otherwise we have either |
674 | // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key | |
675 | // 2: snap_seq < tracked_at_seq: last time we lock the key was via | |
494da23a | 676 | // do_validate=false which means we had skipped ValidateSnapshot. In both |
11fdf7f2 | 677 | // cases we should do ValidateSnapshot now. |
7c673cae | 678 | |
11fdf7f2 | 679 | *tracked_at_seq = snap_seq; |
7c673cae FG |
680 | |
681 | ColumnFamilyHandle* cfh = | |
11fdf7f2 | 682 | column_family ? column_family : db_impl_->DefaultColumnFamily(); |
7c673cae | 683 | |
11fdf7f2 TL |
684 | return TransactionUtil::CheckKeyForConflicts( |
685 | db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */); | |
7c673cae FG |
686 | } |
687 | ||
11fdf7f2 | 688 | bool PessimisticTransaction::TryStealingLocks() { |
7c673cae FG |
689 | assert(IsExpired()); |
690 | TransactionState expected = STARTED; | |
691 | return std::atomic_compare_exchange_strong(&txn_state_, &expected, | |
692 | LOCKS_STOLEN); | |
693 | } | |
694 | ||
11fdf7f2 TL |
695 | void PessimisticTransaction::UnlockGetForUpdate( |
696 | ColumnFamilyHandle* column_family, const Slice& key) { | |
7c673cae FG |
697 | txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); |
698 | } | |
699 | ||
11fdf7f2 | 700 | Status PessimisticTransaction::SetName(const TransactionName& name) { |
7c673cae FG |
701 | Status s; |
702 | if (txn_state_ == STARTED) { | |
703 | if (name_.length()) { | |
704 | s = Status::InvalidArgument("Transaction has already been named."); | |
705 | } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) { | |
706 | s = Status::InvalidArgument("Transaction name must be unique."); | |
707 | } else if (name.length() < 1 || name.length() > 512) { | |
708 | s = Status::InvalidArgument( | |
709 | "Transaction name length must be between 1 and 512 chars."); | |
710 | } else { | |
711 | name_ = name; | |
712 | txn_db_impl_->RegisterTransaction(this); | |
713 | } | |
714 | } else { | |
715 | s = Status::InvalidArgument("Transaction is beyond state for naming."); | |
716 | } | |
717 | return s; | |
718 | } | |
719 | ||
f67539c2 | 720 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
721 | |
722 | #endif // ROCKSDB_LITE |