]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / pessimistic_transaction_db.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
11fdf7f2
TL
8#include "utilities/transactions/pessimistic_transaction_db.h"
9
f67539c2 10#include <cinttypes>
7c673cae
FG
11#include <string>
12#include <unordered_set>
13#include <vector>
14
f67539c2 15#include "db/db_impl/db_impl.h"
7c673cae
FG
16#include "rocksdb/db.h"
17#include "rocksdb/options.h"
18#include "rocksdb/utilities/transaction_db.h"
f67539c2 19#include "test_util/sync_point.h"
11fdf7f2
TL
20#include "util/cast_util.h"
21#include "util/mutexlock.h"
11fdf7f2 22#include "utilities/transactions/pessimistic_transaction.h"
7c673cae 23#include "utilities/transactions/transaction_db_mutex_impl.h"
11fdf7f2
TL
24#include "utilities/transactions/write_prepared_txn_db.h"
25#include "utilities/transactions/write_unprepared_txn_db.h"
7c673cae 26
f67539c2 27namespace ROCKSDB_NAMESPACE {
7c673cae 28
11fdf7f2
TL
29PessimisticTransactionDB::PessimisticTransactionDB(
30 DB* db, const TransactionDBOptions& txn_db_options)
7c673cae 31 : TransactionDB(db),
20effc67 32 db_impl_(static_cast_with_check<DBImpl>(db)),
7c673cae 33 txn_db_options_(txn_db_options),
20effc67 34 lock_manager_(NewLockManager(this, txn_db_options)) {
7c673cae 35 assert(db_impl_ != nullptr);
11fdf7f2 36 info_log_ = db_impl_->GetDBOptions().info_log;
7c673cae
FG
37}
38
11fdf7f2 39// Support initiliazing PessimisticTransactionDB from a stackable db
7c673cae 40//
11fdf7f2 41// PessimisticTransactionDB
7c673cae
FG
42// ^ ^
43// | |
44// | +
45// | StackableDB
46// | ^
47// | |
48// + +
49// DBImpl
50// ^
51// |(inherit)
52// +
53// DB
54//
11fdf7f2
TL
55PessimisticTransactionDB::PessimisticTransactionDB(
56 StackableDB* db, const TransactionDBOptions& txn_db_options)
7c673cae 57 : TransactionDB(db),
20effc67 58 db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
7c673cae 59 txn_db_options_(txn_db_options),
20effc67 60 lock_manager_(NewLockManager(this, txn_db_options)) {
7c673cae
FG
61 assert(db_impl_ != nullptr);
62}
63
11fdf7f2 64PessimisticTransactionDB::~PessimisticTransactionDB() {
7c673cae
FG
65 while (!transactions_.empty()) {
66 delete transactions_.begin()->second;
11fdf7f2
TL
67 // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
68 // clear whether delete would also remove the entry from transactions_.
7c673cae
FG
69 }
70}
71
11fdf7f2
TL
72Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {
73 return Status::OK();
74}
75
76Status PessimisticTransactionDB::Initialize(
7c673cae
FG
77 const std::vector<size_t>& compaction_enabled_cf_indices,
78 const std::vector<ColumnFamilyHandle*>& handles) {
79 for (auto cf_ptr : handles) {
80 AddColumnFamily(cf_ptr);
81 }
11fdf7f2
TL
82 // Verify cf options
83 for (auto handle : handles) {
84 ColumnFamilyDescriptor cfd;
85 Status s = handle->GetDescriptor(&cfd);
86 if (!s.ok()) {
87 return s;
88 }
89 s = VerifyCFOptions(cfd.options);
90 if (!s.ok()) {
91 return s;
92 }
93 }
94
7c673cae
FG
95 // Re-enable compaction for the column families that initially had
96 // compaction enabled.
97 std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
98 compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
99 for (auto index : compaction_enabled_cf_indices) {
100 compaction_enabled_cf_handles.push_back(handles[index]);
101 }
102
103 Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
104
105 // create 'real' transactions from recovered shell transactions
20effc67 106 auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
7c673cae
FG
107 assert(dbimpl != nullptr);
108 auto rtrxs = dbimpl->recovered_transactions();
109
f67539c2 110 for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
7c673cae
FG
111 auto recovered_trx = it->second;
112 assert(recovered_trx);
11fdf7f2
TL
113 assert(recovered_trx->batches_.size() == 1);
114 const auto& seq = recovered_trx->batches_.begin()->first;
115 const auto& batch_info = recovered_trx->batches_.begin()->second;
116 assert(batch_info.log_number_);
7c673cae
FG
117 assert(recovered_trx->name_.length());
118
119 WriteOptions w_options;
120 w_options.sync = true;
121 TransactionOptions t_options;
11fdf7f2
TL
122 // This would help avoiding deadlock for keys that although exist in the WAL
123 // did not go through concurrency control. This includes the merge that
124 // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
125 // there is a conflict between the keys of two transactions that must be
126 // avoided, it is already avoided by the application, MyRocks, before the
127 // restart (ii) application, MyRocks, guarntees to rollback/commit the
128 // recovered transactions before new transactions start.
129 t_options.skip_concurrency_control = true;
7c673cae
FG
130
131 Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
132 assert(real_trx);
11fdf7f2
TL
133 real_trx->SetLogNumber(batch_info.log_number_);
134 assert(seq != kMaxSequenceNumber);
494da23a
TL
135 if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
136 real_trx->SetId(seq);
137 }
7c673cae
FG
138
139 s = real_trx->SetName(recovered_trx->name_);
140 if (!s.ok()) {
141 break;
142 }
143
11fdf7f2
TL
144 s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
145 // WriteCommitted set this to to disable this check that is specific to
146 // WritePrepared txns
147 assert(batch_info.batch_cnt_ == 0 ||
148 real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
7c673cae
FG
149 real_trx->SetState(Transaction::PREPARED);
150 if (!s.ok()) {
151 break;
152 }
153 }
154 if (s.ok()) {
155 dbimpl->DeleteAllRecoveredTransactions();
156 }
157 return s;
158}
159
11fdf7f2 160Transaction* WriteCommittedTxnDB::BeginTransaction(
7c673cae
FG
161 const WriteOptions& write_options, const TransactionOptions& txn_options,
162 Transaction* old_txn) {
163 if (old_txn != nullptr) {
164 ReinitializeTransaction(old_txn, write_options, txn_options);
165 return old_txn;
166 } else {
11fdf7f2 167 return new WriteCommittedTxn(this, write_options, txn_options);
7c673cae
FG
168 }
169}
170
11fdf7f2 171TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
7c673cae
FG
172 const TransactionDBOptions& txn_db_options) {
173 TransactionDBOptions validated = txn_db_options;
174
175 if (txn_db_options.num_stripes == 0) {
176 validated.num_stripes = 1;
177 }
178
179 return validated;
180}
181
182Status TransactionDB::Open(const Options& options,
183 const TransactionDBOptions& txn_db_options,
184 const std::string& dbname, TransactionDB** dbptr) {
185 DBOptions db_options(options);
186 ColumnFamilyOptions cf_options(options);
187 std::vector<ColumnFamilyDescriptor> column_families;
188 column_families.push_back(
189 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
190 std::vector<ColumnFamilyHandle*> handles;
191 Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
192 column_families, &handles, dbptr);
193 if (s.ok()) {
194 assert(handles.size() == 1);
195 // i can delete the handle since DBImpl is always holding a reference to
196 // default column family
197 delete handles[0];
198 }
199
200 return s;
201}
202
203Status TransactionDB::Open(
204 const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
205 const std::string& dbname,
206 const std::vector<ColumnFamilyDescriptor>& column_families,
207 std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
208 Status s;
11fdf7f2 209 DB* db = nullptr;
f67539c2
TL
210 if (txn_db_options.write_policy == WRITE_COMMITTED &&
211 db_options.unordered_write) {
212 return Status::NotSupported(
213 "WRITE_COMMITTED is incompatible with unordered_writes");
214 }
215 if (txn_db_options.write_policy == WRITE_UNPREPARED &&
216 db_options.unordered_write) {
217 // TODO(lth): support it
218 return Status::NotSupported(
219 "WRITE_UNPREPARED is currently incompatible with unordered_writes");
220 }
221 if (txn_db_options.write_policy == WRITE_PREPARED &&
222 db_options.unordered_write && !db_options.two_write_queues) {
223 return Status::NotSupported(
224 "WRITE_PREPARED is incompatible with unordered_writes if "
225 "two_write_queues is not enabled.");
226 }
7c673cae
FG
227
228 std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
229 std::vector<size_t> compaction_enabled_cf_indices;
230 DBOptions db_options_2pc = db_options;
231 PrepareWrap(&db_options_2pc, &column_families_copy,
232 &compaction_enabled_cf_indices);
11fdf7f2
TL
233 const bool use_seq_per_batch =
234 txn_db_options.write_policy == WRITE_PREPARED ||
235 txn_db_options.write_policy == WRITE_UNPREPARED;
236 const bool use_batch_per_txn =
237 txn_db_options.write_policy == WRITE_COMMITTED ||
238 txn_db_options.write_policy == WRITE_PREPARED;
239 s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
240 use_seq_per_batch, use_batch_per_txn);
7c673cae 241 if (s.ok()) {
f67539c2
TL
242 ROCKS_LOG_WARN(db->GetDBOptions().info_log,
243 "Transaction write_policy is %" PRId32,
244 static_cast<int>(txn_db_options.write_policy));
7c673cae
FG
245 s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
246 dbptr);
247 }
11fdf7f2
TL
248 if (!s.ok()) {
249 // just in case it was not deleted (and not set to nullptr).
250 delete db;
251 }
7c673cae
FG
252 return s;
253}
254
255void TransactionDB::PrepareWrap(
256 DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
257 std::vector<size_t>* compaction_enabled_cf_indices) {
258 compaction_enabled_cf_indices->clear();
259
260 // Enable MemTable History if not already enabled
261 for (size_t i = 0; i < column_families->size(); i++) {
262 ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
263
f67539c2
TL
264 if (cf_options->max_write_buffer_size_to_maintain == 0 &&
265 cf_options->max_write_buffer_number_to_maintain == 0) {
266 // Setting to -1 will set the History size to
267 // max_write_buffer_number * write_buffer_size.
268 cf_options->max_write_buffer_size_to_maintain = -1;
7c673cae
FG
269 }
270 if (!cf_options->disable_auto_compactions) {
271 // Disable compactions momentarily to prevent race with DB::Open
272 cf_options->disable_auto_compactions = true;
273 compaction_enabled_cf_indices->push_back(i);
274 }
275 }
276 db_options->allow_2pc = true;
277}
278
279Status TransactionDB::WrapDB(
280 // make sure this db is already opened with memtable history enabled,
281 // auto compaction distabled and 2 phase commit enabled
282 DB* db, const TransactionDBOptions& txn_db_options,
283 const std::vector<size_t>& compaction_enabled_cf_indices,
284 const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
11fdf7f2
TL
285 assert(db != nullptr);
286 assert(dbptr != nullptr);
287 *dbptr = nullptr;
288 std::unique_ptr<PessimisticTransactionDB> txn_db;
289 switch (txn_db_options.write_policy) {
290 case WRITE_UNPREPARED:
291 txn_db.reset(new WriteUnpreparedTxnDB(
292 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
293 break;
294 case WRITE_PREPARED:
295 txn_db.reset(new WritePreparedTxnDB(
296 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
297 break;
298 case WRITE_COMMITTED:
299 default:
300 txn_db.reset(new WriteCommittedTxnDB(
301 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
302 }
303 txn_db->UpdateCFComparatorMap(handles);
7c673cae 304 Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
11fdf7f2
TL
305 // In case of a failure at this point, db is deleted via the txn_db destructor
306 // and set to nullptr.
307 if (s.ok()) {
308 *dbptr = txn_db.release();
309 }
7c673cae
FG
310 return s;
311}
312
313Status TransactionDB::WrapStackableDB(
314 // make sure this stackable_db is already opened with memtable history
11fdf7f2 315 // enabled, auto compaction distabled and 2 phase commit enabled
7c673cae
FG
316 StackableDB* db, const TransactionDBOptions& txn_db_options,
317 const std::vector<size_t>& compaction_enabled_cf_indices,
318 const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
11fdf7f2
TL
319 assert(db != nullptr);
320 assert(dbptr != nullptr);
321 *dbptr = nullptr;
322 std::unique_ptr<PessimisticTransactionDB> txn_db;
323
324 switch (txn_db_options.write_policy) {
325 case WRITE_UNPREPARED:
326 txn_db.reset(new WriteUnpreparedTxnDB(
327 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
328 break;
329 case WRITE_PREPARED:
330 txn_db.reset(new WritePreparedTxnDB(
331 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
332 break;
333 case WRITE_COMMITTED:
334 default:
335 txn_db.reset(new WriteCommittedTxnDB(
336 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
337 }
338 txn_db->UpdateCFComparatorMap(handles);
7c673cae 339 Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
11fdf7f2
TL
340 // In case of a failure at this point, db is deleted via the txn_db destructor
341 // and set to nullptr.
342 if (s.ok()) {
343 *dbptr = txn_db.release();
344 }
7c673cae
FG
345 return s;
346}
347
20effc67 348// Let LockManager know that this column family exists so it can
7c673cae 349// allocate a LockMap for it.
11fdf7f2
TL
350void PessimisticTransactionDB::AddColumnFamily(
351 const ColumnFamilyHandle* handle) {
20effc67 352 lock_manager_->AddColumnFamily(handle);
7c673cae
FG
353}
354
11fdf7f2 355Status PessimisticTransactionDB::CreateColumnFamily(
7c673cae
FG
356 const ColumnFamilyOptions& options, const std::string& column_family_name,
357 ColumnFamilyHandle** handle) {
358 InstrumentedMutexLock l(&column_family_mutex_);
11fdf7f2
TL
359 Status s = VerifyCFOptions(options);
360 if (!s.ok()) {
361 return s;
362 }
7c673cae 363
11fdf7f2 364 s = db_->CreateColumnFamily(options, column_family_name, handle);
7c673cae 365 if (s.ok()) {
20effc67 366 lock_manager_->AddColumnFamily(*handle);
11fdf7f2 367 UpdateCFComparatorMap(*handle);
7c673cae
FG
368 }
369
370 return s;
371}
372
20effc67 373// Let LockManager know that it can deallocate the LockMap for this
7c673cae 374// column family.
11fdf7f2
TL
375Status PessimisticTransactionDB::DropColumnFamily(
376 ColumnFamilyHandle* column_family) {
7c673cae
FG
377 InstrumentedMutexLock l(&column_family_mutex_);
378
379 Status s = db_->DropColumnFamily(column_family);
380 if (s.ok()) {
20effc67 381 lock_manager_->RemoveColumnFamily(column_family);
7c673cae
FG
382 }
383
384 return s;
385}
386
11fdf7f2
TL
387Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
388 uint32_t cfh_id,
389 const std::string& key,
390 bool exclusive) {
20effc67 391 return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
7c673cae
FG
392}
393
11fdf7f2 394void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
20effc67
TL
395 const LockTracker& keys) {
396 lock_manager_->UnLock(txn, keys, GetEnv());
7c673cae
FG
397}
398
11fdf7f2
TL
399void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
400 uint32_t cfh_id, const std::string& key) {
20effc67 401 lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
7c673cae
FG
402}
403
404// Used when wrapping DB write operations in a transaction
11fdf7f2 405Transaction* PessimisticTransactionDB::BeginInternalTransaction(
7c673cae
FG
406 const WriteOptions& options) {
407 TransactionOptions txn_options;
408 Transaction* txn = BeginTransaction(options, txn_options, nullptr);
409
410 // Use default timeout for non-transactional writes
411 txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
412 return txn;
413}
414
415// All user Put, Merge, Delete, and Write requests must be intercepted to make
416// sure that they lock all keys that they are writing to avoid causing conflicts
11fdf7f2 417// with any concurrent transactions. The easiest way to do this is to wrap all
7c673cae
FG
418// write operations in a transaction.
419//
420// Put(), Merge(), and Delete() only lock a single key per call. Write() will
421// sort its keys before locking them. This guarantees that TransactionDB write
11fdf7f2 422// methods cannot deadlock with each other (but still could deadlock with a
7c673cae 423// Transaction).
11fdf7f2
TL
424Status PessimisticTransactionDB::Put(const WriteOptions& options,
425 ColumnFamilyHandle* column_family,
426 const Slice& key, const Slice& val) {
7c673cae
FG
427 Status s;
428
429 Transaction* txn = BeginInternalTransaction(options);
430 txn->DisableIndexing();
431
432 // Since the client didn't create a transaction, they don't care about
433 // conflict checking for this write. So we just need to do PutUntracked().
434 s = txn->PutUntracked(column_family, key, val);
435
436 if (s.ok()) {
437 s = txn->Commit();
438 }
439
440 delete txn;
441
442 return s;
443}
444
11fdf7f2
TL
445Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
446 ColumnFamilyHandle* column_family,
447 const Slice& key) {
7c673cae
FG
448 Status s;
449
450 Transaction* txn = BeginInternalTransaction(wopts);
451 txn->DisableIndexing();
452
453 // Since the client didn't create a transaction, they don't care about
454 // conflict checking for this write. So we just need to do
455 // DeleteUntracked().
456 s = txn->DeleteUntracked(column_family, key);
457
458 if (s.ok()) {
459 s = txn->Commit();
460 }
461
462 delete txn;
463
464 return s;
465}
466
11fdf7f2
TL
467Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
468 ColumnFamilyHandle* column_family,
469 const Slice& key) {
470 Status s;
471
472 Transaction* txn = BeginInternalTransaction(wopts);
473 txn->DisableIndexing();
474
475 // Since the client didn't create a transaction, they don't care about
476 // conflict checking for this write. So we just need to do
477 // SingleDeleteUntracked().
478 s = txn->SingleDeleteUntracked(column_family, key);
479
480 if (s.ok()) {
481 s = txn->Commit();
482 }
483
484 delete txn;
485
486 return s;
487}
488
489Status PessimisticTransactionDB::Merge(const WriteOptions& options,
490 ColumnFamilyHandle* column_family,
491 const Slice& key, const Slice& value) {
7c673cae
FG
492 Status s;
493
494 Transaction* txn = BeginInternalTransaction(options);
495 txn->DisableIndexing();
496
497 // Since the client didn't create a transaction, they don't care about
498 // conflict checking for this write. So we just need to do
499 // MergeUntracked().
500 s = txn->MergeUntracked(column_family, key, value);
501
502 if (s.ok()) {
503 s = txn->Commit();
504 }
505
506 delete txn;
507
508 return s;
509}
510
11fdf7f2
TL
511Status PessimisticTransactionDB::Write(const WriteOptions& opts,
512 WriteBatch* updates) {
f67539c2
TL
513 return WriteWithConcurrencyControl(opts, updates);
514}
7c673cae 515
f67539c2
TL
516Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
517 WriteBatch* updates) {
518 if (txn_db_options_.skip_concurrency_control) {
519 return db_impl_->Write(opts, updates);
520 } else {
521 return WriteWithConcurrencyControl(opts, updates);
522 }
7c673cae
FG
523}
524
11fdf7f2
TL
525Status WriteCommittedTxnDB::Write(
526 const WriteOptions& opts,
527 const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
528 if (optimizations.skip_concurrency_control) {
529 return db_impl_->Write(opts, updates);
530 } else {
f67539c2 531 return WriteWithConcurrencyControl(opts, updates);
11fdf7f2
TL
532 }
533}
534
535void PessimisticTransactionDB::InsertExpirableTransaction(
536 TransactionID tx_id, PessimisticTransaction* tx) {
7c673cae
FG
537 assert(tx->GetExpirationTime() > 0);
538 std::lock_guard<std::mutex> lock(map_mutex_);
539 expirable_transactions_map_.insert({tx_id, tx});
540}
541
11fdf7f2 542void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
7c673cae
FG
543 std::lock_guard<std::mutex> lock(map_mutex_);
544 expirable_transactions_map_.erase(tx_id);
545}
546
11fdf7f2 547bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
7c673cae
FG
548 TransactionID tx_id) {
549 std::lock_guard<std::mutex> lock(map_mutex_);
550
551 auto tx_it = expirable_transactions_map_.find(tx_id);
552 if (tx_it == expirable_transactions_map_.end()) {
553 return true;
554 }
11fdf7f2 555 PessimisticTransaction& tx = *(tx_it->second);
7c673cae
FG
556 return tx.TryStealingLocks();
557}
558
11fdf7f2 559void PessimisticTransactionDB::ReinitializeTransaction(
7c673cae
FG
560 Transaction* txn, const WriteOptions& write_options,
561 const TransactionOptions& txn_options) {
20effc67 562 auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
7c673cae
FG
563
564 txn_impl->Reinitialize(this, write_options, txn_options);
565}
566
11fdf7f2 567Transaction* PessimisticTransactionDB::GetTransactionByName(
7c673cae
FG
568 const TransactionName& name) {
569 std::lock_guard<std::mutex> lock(name_map_mutex_);
570 auto it = transactions_.find(name);
571 if (it == transactions_.end()) {
572 return nullptr;
573 } else {
574 return it->second;
575 }
576}
577
11fdf7f2 578void PessimisticTransactionDB::GetAllPreparedTransactions(
7c673cae
FG
579 std::vector<Transaction*>* transv) {
580 assert(transv);
581 transv->clear();
582 std::lock_guard<std::mutex> lock(name_map_mutex_);
f67539c2 583 for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
7c673cae
FG
584 if (it->second->GetState() == Transaction::PREPARED) {
585 transv->push_back(it->second);
586 }
587 }
588}
589
20effc67
TL
590LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
591 return lock_manager_->GetPointLockStatus();
7c673cae
FG
592}
593
11fdf7f2 594std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
20effc67 595 return lock_manager_->GetDeadlockInfoBuffer();
11fdf7f2
TL
596}
597
598void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
20effc67 599 lock_manager_->Resize(target_size);
11fdf7f2
TL
600}
601
602void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
7c673cae
FG
603 assert(txn);
604 assert(txn->GetName().length() > 0);
605 assert(GetTransactionByName(txn->GetName()) == nullptr);
606 assert(txn->GetState() == Transaction::STARTED);
607 std::lock_guard<std::mutex> lock(name_map_mutex_);
608 transactions_[txn->GetName()] = txn;
609}
610
11fdf7f2 611void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
7c673cae
FG
612 assert(txn);
613 std::lock_guard<std::mutex> lock(name_map_mutex_);
614 auto it = transactions_.find(txn->GetName());
615 assert(it != transactions_.end());
616 transactions_.erase(it);
617}
618
f67539c2 619} // namespace ROCKSDB_NAMESPACE
7c673cae 620#endif // ROCKSDB_LITE