]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/transaction_impl.cc
bump version to 12.2.12-pve1
[ceph.git] / ceph / src / rocksdb / utilities / transactions / transaction_impl.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/transaction_impl.h"
9
10 #include <map>
11 #include <set>
12 #include <string>
13 #include <vector>
14
15 #include "db/column_family.h"
16 #include "db/db_impl.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/snapshot.h"
20 #include "rocksdb/status.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "util/string_util.h"
23 #include "util/sync_point.h"
24 #include "utilities/transactions/transaction_db_impl.h"
25 #include "utilities/transactions/transaction_util.h"
26
27 namespace rocksdb {
28
29 struct WriteOptions;
30
31 std::atomic<TransactionID> TransactionImpl::txn_id_counter_(1);
32
33 TransactionID TransactionImpl::GenTxnID() {
34 return txn_id_counter_.fetch_add(1);
35 }
36
37 TransactionImpl::TransactionImpl(TransactionDB* txn_db,
38 const WriteOptions& write_options,
39 const TransactionOptions& txn_options)
40 : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
41 txn_db_impl_(nullptr),
42 txn_id_(0),
43 waiting_cf_id_(0),
44 waiting_key_(nullptr),
45 expiration_time_(0),
46 lock_timeout_(0),
47 deadlock_detect_(false),
48 deadlock_detect_depth_(0) {
49 txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db);
50 assert(txn_db_impl_);
51 db_impl_ = dynamic_cast<DBImpl*>(txn_db->GetRootDB());
52 assert(db_impl_);
53 Initialize(txn_options);
54 }
55
56 void TransactionImpl::Initialize(const TransactionOptions& txn_options) {
57 txn_id_ = GenTxnID();
58
59 txn_state_ = STARTED;
60
61 deadlock_detect_ = txn_options.deadlock_detect;
62 deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
63 write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
64
65 lock_timeout_ = txn_options.lock_timeout * 1000;
66 if (lock_timeout_ < 0) {
67 // Lock timeout not set, use default
68 lock_timeout_ =
69 txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
70 }
71
72 if (txn_options.expiration >= 0) {
73 expiration_time_ = start_time_ + txn_options.expiration * 1000;
74 } else {
75 expiration_time_ = 0;
76 }
77
78 if (txn_options.set_snapshot) {
79 SetSnapshot();
80 }
81
82 if (expiration_time_ > 0) {
83 txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
84 }
85 }
86
87 TransactionImpl::~TransactionImpl() {
88 txn_db_impl_->UnLock(this, &GetTrackedKeys());
89 if (expiration_time_ > 0) {
90 txn_db_impl_->RemoveExpirableTransaction(txn_id_);
91 }
92 if (!name_.empty() && txn_state_ != COMMITED) {
93 txn_db_impl_->UnregisterTransaction(this);
94 }
95 }
96
97 void TransactionImpl::Clear() {
98 txn_db_impl_->UnLock(this, &GetTrackedKeys());
99 TransactionBaseImpl::Clear();
100 }
101
102 void TransactionImpl::Reinitialize(TransactionDB* txn_db,
103 const WriteOptions& write_options,
104 const TransactionOptions& txn_options) {
105 if (!name_.empty() && txn_state_ != COMMITED) {
106 txn_db_impl_->UnregisterTransaction(this);
107 }
108 TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
109 Initialize(txn_options);
110 }
111
112 bool TransactionImpl::IsExpired() const {
113 if (expiration_time_ > 0) {
114 if (db_->GetEnv()->NowMicros() >= expiration_time_) {
115 // Transaction is expired.
116 return true;
117 }
118 }
119
120 return false;
121 }
122
123 Status TransactionImpl::CommitBatch(WriteBatch* batch) {
124 TransactionKeyMap keys_to_unlock;
125 Status s = LockBatch(batch, &keys_to_unlock);
126
127 if (!s.ok()) {
128 return s;
129 }
130
131 bool can_commit = false;
132
133 if (IsExpired()) {
134 s = Status::Expired();
135 } else if (expiration_time_ > 0) {
136 TransactionState expected = STARTED;
137 can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,
138 AWAITING_COMMIT);
139 } else if (txn_state_ == STARTED) {
140 // lock stealing is not a concern
141 can_commit = true;
142 }
143
144 if (can_commit) {
145 txn_state_.store(AWAITING_COMMIT);
146 s = db_->Write(write_options_, batch);
147 if (s.ok()) {
148 txn_state_.store(COMMITED);
149 }
150 } else if (txn_state_ == LOCKS_STOLEN) {
151 s = Status::Expired();
152 } else {
153 s = Status::InvalidArgument("Transaction is not in state for commit.");
154 }
155
156 txn_db_impl_->UnLock(this, &keys_to_unlock);
157
158 return s;
159 }
160
161 Status TransactionImpl::Prepare() {
162 Status s;
163
164 if (name_.empty()) {
165 return Status::InvalidArgument(
166 "Cannot prepare a transaction that has not been named.");
167 }
168
169 if (IsExpired()) {
170 return Status::Expired();
171 }
172
173 bool can_prepare = false;
174
175 if (expiration_time_ > 0) {
176 // must concern ourselves with expiraton and/or lock stealing
177 // need to compare/exchange bc locks could be stolen under us here
178 TransactionState expected = STARTED;
179 can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,
180 AWAITING_PREPARE);
181 } else if (txn_state_ == STARTED) {
182 // expiration and lock stealing is not possible
183 can_prepare = true;
184 }
185
186 if (can_prepare) {
187 txn_state_.store(AWAITING_PREPARE);
188 // transaction can't expire after preparation
189 expiration_time_ = 0;
190 WriteOptions write_options = write_options_;
191 write_options.disableWAL = false;
192 WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
193 s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
194 /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
195 /* disable_memtable*/ true);
196 if (s.ok()) {
197 assert(log_number_ != 0);
198 dbimpl_->MarkLogAsContainingPrepSection(log_number_);
199 txn_state_.store(PREPARED);
200 }
201 } else if (txn_state_ == LOCKS_STOLEN) {
202 s = Status::Expired();
203 } else if (txn_state_ == PREPARED) {
204 s = Status::InvalidArgument("Transaction has already been prepared.");
205 } else if (txn_state_ == COMMITED) {
206 s = Status::InvalidArgument("Transaction has already been committed.");
207 } else if (txn_state_ == ROLLEDBACK) {
208 s = Status::InvalidArgument("Transaction has already been rolledback.");
209 } else {
210 s = Status::InvalidArgument("Transaction is not in state for commit.");
211 }
212
213 return s;
214 }
215
216 Status TransactionImpl::Commit() {
217 Status s;
218 bool commit_single = false;
219 bool commit_prepared = false;
220
221 if (IsExpired()) {
222 return Status::Expired();
223 }
224
225 if (expiration_time_ > 0) {
226 // we must atomicaly compare and exchange the state here because at
227 // this state in the transaction it is possible for another thread
228 // to change our state out from under us in the even that we expire and have
229 // our locks stolen. In this case the only valid state is STARTED because
230 // a state of PREPARED would have a cleared expiration_time_.
231 TransactionState expected = STARTED;
232 commit_single = std::atomic_compare_exchange_strong(&txn_state_, &expected,
233 AWAITING_COMMIT);
234 TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
235 } else if (txn_state_ == PREPARED) {
236 // expiration and lock stealing is not a concern
237 commit_prepared = true;
238 } else if (txn_state_ == STARTED) {
239 // expiration and lock stealing is not a concern
240 commit_single = true;
241 }
242
243 if (commit_single) {
244 assert(!commit_prepared);
245 if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
246 s = Status::InvalidArgument(
247 "Commit-time batch contains values that will not be committed.");
248 } else {
249 txn_state_.store(AWAITING_COMMIT);
250 s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch());
251 Clear();
252 if (s.ok()) {
253 txn_state_.store(COMMITED);
254 }
255 }
256 } else if (commit_prepared) {
257 txn_state_.store(AWAITING_COMMIT);
258
259 // We take the commit-time batch and append the Commit marker.
260 // The Memtable will ignore the Commit marker in non-recovery mode
261 WriteBatch* working_batch = GetCommitTimeWriteBatch();
262 WriteBatchInternal::MarkCommit(working_batch, name_);
263
264 // any operations appended to this working_batch will be ignored from WAL
265 working_batch->MarkWalTerminationPoint();
266
267 // insert prepared batch into Memtable only skipping WAL.
268 // Memtable will ignore BeginPrepare/EndPrepare markers
269 // in non recovery mode and simply insert the values
270 WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
271
272 s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
273 log_number_);
274 if (!s.ok()) {
275 return s;
276 }
277
278 // FindObsoleteFiles must now look to the memtables
279 // to determine what prep logs must be kept around,
280 // not the prep section heap.
281 assert(log_number_ > 0);
282 dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
283 txn_db_impl_->UnregisterTransaction(this);
284
285 Clear();
286 txn_state_.store(COMMITED);
287 } else if (txn_state_ == LOCKS_STOLEN) {
288 s = Status::Expired();
289 } else if (txn_state_ == COMMITED) {
290 s = Status::InvalidArgument("Transaction has already been committed.");
291 } else if (txn_state_ == ROLLEDBACK) {
292 s = Status::InvalidArgument("Transaction has already been rolledback.");
293 } else {
294 s = Status::InvalidArgument("Transaction is not in state for commit.");
295 }
296
297 return s;
298 }
299
300 Status TransactionImpl::Rollback() {
301 Status s;
302 if (txn_state_ == PREPARED) {
303 WriteBatch rollback_marker;
304 WriteBatchInternal::MarkRollback(&rollback_marker, name_);
305 txn_state_.store(AWAITING_ROLLBACK);
306 s = db_impl_->WriteImpl(write_options_, &rollback_marker);
307 if (s.ok()) {
308 // we do not need to keep our prepared section around
309 assert(log_number_ > 0);
310 dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
311 Clear();
312 txn_state_.store(ROLLEDBACK);
313 }
314 } else if (txn_state_ == STARTED) {
315 // prepare couldn't have taken place
316 Clear();
317 } else if (txn_state_ == COMMITED) {
318 s = Status::InvalidArgument("This transaction has already been committed.");
319 } else {
320 s = Status::InvalidArgument(
321 "Two phase transaction is not in state for rollback.");
322 }
323
324 return s;
325 }
326
327 Status TransactionImpl::RollbackToSavePoint() {
328 if (txn_state_ != STARTED) {
329 return Status::InvalidArgument("Transaction is beyond state for rollback.");
330 }
331
332 // Unlock any keys locked since last transaction
333 const std::unique_ptr<TransactionKeyMap>& keys =
334 GetTrackedKeysSinceSavePoint();
335
336 if (keys) {
337 txn_db_impl_->UnLock(this, keys.get());
338 }
339
340 return TransactionBaseImpl::RollbackToSavePoint();
341 }
342
343 // Lock all keys in this batch.
344 // On success, caller should unlock keys_to_unlock
345 Status TransactionImpl::LockBatch(WriteBatch* batch,
346 TransactionKeyMap* keys_to_unlock) {
347 class Handler : public WriteBatch::Handler {
348 public:
349 // Sorted map of column_family_id to sorted set of keys.
350 // Since LockBatch() always locks keys in sorted order, it cannot deadlock
351 // with itself. We're not using a comparator here since it doesn't matter
352 // what the sorting is as long as it's consistent.
353 std::map<uint32_t, std::set<std::string>> keys_;
354
355 Handler() {}
356
357 void RecordKey(uint32_t column_family_id, const Slice& key) {
358 std::string key_str = key.ToString();
359
360 auto iter = (keys_)[column_family_id].find(key_str);
361 if (iter == (keys_)[column_family_id].end()) {
362 // key not yet seen, store it.
363 (keys_)[column_family_id].insert({std::move(key_str)});
364 }
365 }
366
367 virtual Status PutCF(uint32_t column_family_id, const Slice& key,
368 const Slice& value) override {
369 RecordKey(column_family_id, key);
370 return Status::OK();
371 }
372 virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
373 const Slice& value) override {
374 RecordKey(column_family_id, key);
375 return Status::OK();
376 }
377 virtual Status DeleteCF(uint32_t column_family_id,
378 const Slice& key) override {
379 RecordKey(column_family_id, key);
380 return Status::OK();
381 }
382 };
383
384 // Iterating on this handler will add all keys in this batch into keys
385 Handler handler;
386 batch->Iterate(&handler);
387
388 Status s;
389
390 // Attempt to lock all keys
391 for (const auto& cf_iter : handler.keys_) {
392 uint32_t cfh_id = cf_iter.first;
393 auto& cfh_keys = cf_iter.second;
394
395 for (const auto& key_iter : cfh_keys) {
396 const std::string& key = key_iter;
397
398 s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);
399 if (!s.ok()) {
400 break;
401 }
402 TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
403 false, true /* exclusive */);
404 }
405
406 if (!s.ok()) {
407 break;
408 }
409 }
410
411 if (!s.ok()) {
412 txn_db_impl_->UnLock(this, keys_to_unlock);
413 }
414
415 return s;
416 }
417
418 // Attempt to lock this key.
419 // Returns OK if the key has been successfully locked. Non-ok, otherwise.
420 // If check_shapshot is true and this transaction has a snapshot set,
421 // this key will only be locked if there have been no writes to this key since
422 // the snapshot time.
423 Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
424 const Slice& key, bool read_only,
425 bool exclusive, bool untracked) {
426 uint32_t cfh_id = GetColumnFamilyID(column_family);
427 std::string key_str = key.ToString();
428 bool previously_locked;
429 bool lock_upgrade = false;
430 Status s;
431
432 // lock this key if this transactions hasn't already locked it
433 SequenceNumber current_seqno = kMaxSequenceNumber;
434 SequenceNumber new_seqno = kMaxSequenceNumber;
435
436 const auto& tracked_keys = GetTrackedKeys();
437 const auto tracked_keys_cf = tracked_keys.find(cfh_id);
438 if (tracked_keys_cf == tracked_keys.end()) {
439 previously_locked = false;
440 } else {
441 auto iter = tracked_keys_cf->second.find(key_str);
442 if (iter == tracked_keys_cf->second.end()) {
443 previously_locked = false;
444 } else {
445 if (!iter->second.exclusive && exclusive) {
446 lock_upgrade = true;
447 }
448 previously_locked = true;
449 current_seqno = iter->second.seq;
450 }
451 }
452
453 // Lock this key if this transactions hasn't already locked it or we require
454 // an upgrade.
455 if (!previously_locked || lock_upgrade) {
456 s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
457 }
458
459 SetSnapshotIfNeeded();
460
461 // Even though we do not care about doing conflict checking for this write,
462 // we still need to take a lock to make sure we do not cause a conflict with
463 // some other write. However, we do not need to check if there have been
464 // any writes since this transaction's snapshot.
465 // TODO(agiardullo): could optimize by supporting shared txn locks in the
466 // future
467 if (untracked || snapshot_ == nullptr) {
468 // Need to remember the earliest sequence number that we know that this
469 // key has not been modified after. This is useful if this same
470 // transaction
471 // later tries to lock this key again.
472 if (current_seqno == kMaxSequenceNumber) {
473 // Since we haven't checked a snapshot, we only know this key has not
474 // been modified since after we locked it.
475 new_seqno = db_->GetLatestSequenceNumber();
476 } else {
477 new_seqno = current_seqno;
478 }
479 } else {
480 // If a snapshot is set, we need to make sure the key hasn't been modified
481 // since the snapshot. This must be done after we locked the key.
482 if (s.ok()) {
483 s = ValidateSnapshot(column_family, key, current_seqno, &new_seqno);
484
485 if (!s.ok()) {
486 // Failed to validate key
487 if (!previously_locked) {
488 // Unlock key we just locked
489 if (lock_upgrade) {
490 s = txn_db_impl_->TryLock(this, cfh_id, key_str,
491 false /* exclusive */);
492 assert(s.ok());
493 } else {
494 txn_db_impl_->UnLock(this, cfh_id, key.ToString());
495 }
496 }
497 }
498 }
499 }
500
501 if (s.ok()) {
502 // Let base class know we've conflict checked this key.
503 TrackKey(cfh_id, key_str, new_seqno, read_only, exclusive);
504 }
505
506 return s;
507 }
508
509 // Return OK() if this key has not been modified more recently than the
510 // transaction snapshot_.
511 Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
512 const Slice& key,
513 SequenceNumber prev_seqno,
514 SequenceNumber* new_seqno) {
515 assert(snapshot_);
516
517 SequenceNumber seq = snapshot_->GetSequenceNumber();
518 if (prev_seqno <= seq) {
519 // If the key has been previous validated at a sequence number earlier
520 // than the curent snapshot's sequence number, we already know it has not
521 // been modified.
522 return Status::OK();
523 }
524
525 *new_seqno = seq;
526
527 assert(dynamic_cast<DBImpl*>(db_) != nullptr);
528 auto db_impl = reinterpret_cast<DBImpl*>(db_);
529
530 ColumnFamilyHandle* cfh =
531 column_family ? column_family : db_impl->DefaultColumnFamily();
532
533 return TransactionUtil::CheckKeyForConflicts(db_impl, cfh, key.ToString(),
534 snapshot_->GetSequenceNumber(),
535 false /* cache_only */);
536 }
537
538 bool TransactionImpl::TryStealingLocks() {
539 assert(IsExpired());
540 TransactionState expected = STARTED;
541 return std::atomic_compare_exchange_strong(&txn_state_, &expected,
542 LOCKS_STOLEN);
543 }
544
545 void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family,
546 const Slice& key) {
547 txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
548 }
549
550 Status TransactionImpl::SetName(const TransactionName& name) {
551 Status s;
552 if (txn_state_ == STARTED) {
553 if (name_.length()) {
554 s = Status::InvalidArgument("Transaction has already been named.");
555 } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) {
556 s = Status::InvalidArgument("Transaction name must be unique.");
557 } else if (name.length() < 1 || name.length() > 512) {
558 s = Status::InvalidArgument(
559 "Transaction name length must be between 1 and 512 chars.");
560 } else {
561 name_ = name;
562 txn_db_impl_->RegisterTransaction(this);
563 }
564 } else {
565 s = Status::InvalidArgument("Transaction is beyond state for naming.");
566 }
567 return s;
568 }
569
570 } // namespace rocksdb
571
572 #endif // ROCKSDB_LITE