]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / transactions / pessimistic_transaction_db.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
11fdf7f2
TL
8#ifndef __STDC_FORMAT_MACROS
9#define __STDC_FORMAT_MACROS
10#endif
7c673cae 11
11fdf7f2
TL
12#include "utilities/transactions/pessimistic_transaction_db.h"
13
14#include <inttypes.h>
7c673cae
FG
15#include <string>
16#include <unordered_set>
17#include <vector>
18
19#include "db/db_impl.h"
20#include "rocksdb/db.h"
21#include "rocksdb/options.h"
22#include "rocksdb/utilities/transaction_db.h"
11fdf7f2
TL
23#include "util/cast_util.h"
24#include "util/mutexlock.h"
25#include "util/sync_point.h"
26#include "utilities/transactions/pessimistic_transaction.h"
7c673cae 27#include "utilities/transactions/transaction_db_mutex_impl.h"
11fdf7f2
TL
28#include "utilities/transactions/write_prepared_txn_db.h"
29#include "utilities/transactions/write_unprepared_txn_db.h"
7c673cae
FG
30
31namespace rocksdb {
32
11fdf7f2
TL
33PessimisticTransactionDB::PessimisticTransactionDB(
34 DB* db, const TransactionDBOptions& txn_db_options)
7c673cae 35 : TransactionDB(db),
11fdf7f2 36 db_impl_(static_cast_with_check<DBImpl, DB>(db)),
7c673cae
FG
37 txn_db_options_(txn_db_options),
38 lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
11fdf7f2 39 txn_db_options_.max_num_deadlocks,
7c673cae
FG
40 txn_db_options_.custom_mutex_factory
41 ? txn_db_options_.custom_mutex_factory
42 : std::shared_ptr<TransactionDBMutexFactory>(
43 new TransactionDBMutexFactoryImpl())) {
44 assert(db_impl_ != nullptr);
11fdf7f2 45 info_log_ = db_impl_->GetDBOptions().info_log;
7c673cae
FG
46}
47
11fdf7f2 48// Support initiliazing PessimisticTransactionDB from a stackable db
7c673cae 49//
11fdf7f2 50// PessimisticTransactionDB
7c673cae
FG
51// ^ ^
52// | |
53// | +
54// | StackableDB
55// | ^
56// | |
57// + +
58// DBImpl
59// ^
60// |(inherit)
61// +
62// DB
63//
11fdf7f2
TL
64PessimisticTransactionDB::PessimisticTransactionDB(
65 StackableDB* db, const TransactionDBOptions& txn_db_options)
7c673cae 66 : TransactionDB(db),
11fdf7f2 67 db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
7c673cae
FG
68 txn_db_options_(txn_db_options),
69 lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
11fdf7f2 70 txn_db_options_.max_num_deadlocks,
7c673cae
FG
71 txn_db_options_.custom_mutex_factory
72 ? txn_db_options_.custom_mutex_factory
73 : std::shared_ptr<TransactionDBMutexFactory>(
74 new TransactionDBMutexFactoryImpl())) {
75 assert(db_impl_ != nullptr);
76}
77
11fdf7f2 78PessimisticTransactionDB::~PessimisticTransactionDB() {
7c673cae
FG
79 while (!transactions_.empty()) {
80 delete transactions_.begin()->second;
11fdf7f2
TL
81 // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
82 // clear whether delete would also remove the entry from transactions_.
7c673cae
FG
83 }
84}
85
11fdf7f2
TL
86Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {
87 return Status::OK();
88}
89
90Status PessimisticTransactionDB::Initialize(
7c673cae
FG
91 const std::vector<size_t>& compaction_enabled_cf_indices,
92 const std::vector<ColumnFamilyHandle*>& handles) {
93 for (auto cf_ptr : handles) {
94 AddColumnFamily(cf_ptr);
95 }
11fdf7f2
TL
96 // Verify cf options
97 for (auto handle : handles) {
98 ColumnFamilyDescriptor cfd;
99 Status s = handle->GetDescriptor(&cfd);
100 if (!s.ok()) {
101 return s;
102 }
103 s = VerifyCFOptions(cfd.options);
104 if (!s.ok()) {
105 return s;
106 }
107 }
108
7c673cae
FG
109 // Re-enable compaction for the column families that initially had
110 // compaction enabled.
111 std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
112 compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
113 for (auto index : compaction_enabled_cf_indices) {
114 compaction_enabled_cf_handles.push_back(handles[index]);
115 }
116
117 Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
118
119 // create 'real' transactions from recovered shell transactions
120 auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
121 assert(dbimpl != nullptr);
122 auto rtrxs = dbimpl->recovered_transactions();
123
124 for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) {
125 auto recovered_trx = it->second;
126 assert(recovered_trx);
11fdf7f2
TL
127 assert(recovered_trx->batches_.size() == 1);
128 const auto& seq = recovered_trx->batches_.begin()->first;
129 const auto& batch_info = recovered_trx->batches_.begin()->second;
130 assert(batch_info.log_number_);
7c673cae
FG
131 assert(recovered_trx->name_.length());
132
133 WriteOptions w_options;
134 w_options.sync = true;
135 TransactionOptions t_options;
11fdf7f2
TL
136 // This would help avoiding deadlock for keys that although exist in the WAL
137 // did not go through concurrency control. This includes the merge that
138 // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
139 // there is a conflict between the keys of two transactions that must be
140 // avoided, it is already avoided by the application, MyRocks, before the
141 // restart (ii) application, MyRocks, guarntees to rollback/commit the
142 // recovered transactions before new transactions start.
143 t_options.skip_concurrency_control = true;
7c673cae
FG
144
145 Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
146 assert(real_trx);
11fdf7f2
TL
147 real_trx->SetLogNumber(batch_info.log_number_);
148 assert(seq != kMaxSequenceNumber);
494da23a
TL
149 if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
150 real_trx->SetId(seq);
151 }
7c673cae
FG
152
153 s = real_trx->SetName(recovered_trx->name_);
154 if (!s.ok()) {
155 break;
156 }
157
11fdf7f2
TL
158 s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
159 // WriteCommitted set this to to disable this check that is specific to
160 // WritePrepared txns
161 assert(batch_info.batch_cnt_ == 0 ||
162 real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
7c673cae
FG
163 real_trx->SetState(Transaction::PREPARED);
164 if (!s.ok()) {
165 break;
166 }
167 }
168 if (s.ok()) {
169 dbimpl->DeleteAllRecoveredTransactions();
170 }
171 return s;
172}
173
11fdf7f2 174Transaction* WriteCommittedTxnDB::BeginTransaction(
7c673cae
FG
175 const WriteOptions& write_options, const TransactionOptions& txn_options,
176 Transaction* old_txn) {
177 if (old_txn != nullptr) {
178 ReinitializeTransaction(old_txn, write_options, txn_options);
179 return old_txn;
180 } else {
11fdf7f2 181 return new WriteCommittedTxn(this, write_options, txn_options);
7c673cae
FG
182 }
183}
184
11fdf7f2 185TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
7c673cae
FG
186 const TransactionDBOptions& txn_db_options) {
187 TransactionDBOptions validated = txn_db_options;
188
189 if (txn_db_options.num_stripes == 0) {
190 validated.num_stripes = 1;
191 }
192
193 return validated;
194}
195
196Status TransactionDB::Open(const Options& options,
197 const TransactionDBOptions& txn_db_options,
198 const std::string& dbname, TransactionDB** dbptr) {
199 DBOptions db_options(options);
200 ColumnFamilyOptions cf_options(options);
201 std::vector<ColumnFamilyDescriptor> column_families;
202 column_families.push_back(
203 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
204 std::vector<ColumnFamilyHandle*> handles;
205 Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
206 column_families, &handles, dbptr);
207 if (s.ok()) {
208 assert(handles.size() == 1);
209 // i can delete the handle since DBImpl is always holding a reference to
210 // default column family
211 delete handles[0];
212 }
213
214 return s;
215}
216
217Status TransactionDB::Open(
218 const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
219 const std::string& dbname,
220 const std::vector<ColumnFamilyDescriptor>& column_families,
221 std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
222 Status s;
11fdf7f2 223 DB* db = nullptr;
7c673cae 224
11fdf7f2
TL
225 ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32,
226 static_cast<int>(txn_db_options.write_policy));
7c673cae
FG
227 std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
228 std::vector<size_t> compaction_enabled_cf_indices;
229 DBOptions db_options_2pc = db_options;
230 PrepareWrap(&db_options_2pc, &column_families_copy,
231 &compaction_enabled_cf_indices);
11fdf7f2
TL
232 const bool use_seq_per_batch =
233 txn_db_options.write_policy == WRITE_PREPARED ||
234 txn_db_options.write_policy == WRITE_UNPREPARED;
235 const bool use_batch_per_txn =
236 txn_db_options.write_policy == WRITE_COMMITTED ||
237 txn_db_options.write_policy == WRITE_PREPARED;
238 s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
239 use_seq_per_batch, use_batch_per_txn);
7c673cae
FG
240 if (s.ok()) {
241 s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
242 dbptr);
243 }
11fdf7f2
TL
244 if (!s.ok()) {
245 // just in case it was not deleted (and not set to nullptr).
246 delete db;
247 }
7c673cae
FG
248 return s;
249}
250
251void TransactionDB::PrepareWrap(
252 DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
253 std::vector<size_t>* compaction_enabled_cf_indices) {
254 compaction_enabled_cf_indices->clear();
255
256 // Enable MemTable History if not already enabled
257 for (size_t i = 0; i < column_families->size(); i++) {
258 ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
259
260 if (cf_options->max_write_buffer_number_to_maintain == 0) {
261 // Setting to -1 will set the History size to max_write_buffer_number.
262 cf_options->max_write_buffer_number_to_maintain = -1;
263 }
264 if (!cf_options->disable_auto_compactions) {
265 // Disable compactions momentarily to prevent race with DB::Open
266 cf_options->disable_auto_compactions = true;
267 compaction_enabled_cf_indices->push_back(i);
268 }
269 }
270 db_options->allow_2pc = true;
271}
272
273Status TransactionDB::WrapDB(
274 // make sure this db is already opened with memtable history enabled,
275 // auto compaction distabled and 2 phase commit enabled
276 DB* db, const TransactionDBOptions& txn_db_options,
277 const std::vector<size_t>& compaction_enabled_cf_indices,
278 const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
11fdf7f2
TL
279 assert(db != nullptr);
280 assert(dbptr != nullptr);
281 *dbptr = nullptr;
282 std::unique_ptr<PessimisticTransactionDB> txn_db;
283 switch (txn_db_options.write_policy) {
284 case WRITE_UNPREPARED:
285 txn_db.reset(new WriteUnpreparedTxnDB(
286 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
287 break;
288 case WRITE_PREPARED:
289 txn_db.reset(new WritePreparedTxnDB(
290 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
291 break;
292 case WRITE_COMMITTED:
293 default:
294 txn_db.reset(new WriteCommittedTxnDB(
295 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
296 }
297 txn_db->UpdateCFComparatorMap(handles);
7c673cae 298 Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
11fdf7f2
TL
299 // In case of a failure at this point, db is deleted via the txn_db destructor
300 // and set to nullptr.
301 if (s.ok()) {
302 *dbptr = txn_db.release();
303 }
7c673cae
FG
304 return s;
305}
306
307Status TransactionDB::WrapStackableDB(
308 // make sure this stackable_db is already opened with memtable history
11fdf7f2 309 // enabled, auto compaction distabled and 2 phase commit enabled
7c673cae
FG
310 StackableDB* db, const TransactionDBOptions& txn_db_options,
311 const std::vector<size_t>& compaction_enabled_cf_indices,
312 const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
11fdf7f2
TL
313 assert(db != nullptr);
314 assert(dbptr != nullptr);
315 *dbptr = nullptr;
316 std::unique_ptr<PessimisticTransactionDB> txn_db;
317
318 switch (txn_db_options.write_policy) {
319 case WRITE_UNPREPARED:
320 txn_db.reset(new WriteUnpreparedTxnDB(
321 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
322 break;
323 case WRITE_PREPARED:
324 txn_db.reset(new WritePreparedTxnDB(
325 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
326 break;
327 case WRITE_COMMITTED:
328 default:
329 txn_db.reset(new WriteCommittedTxnDB(
330 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
331 }
332 txn_db->UpdateCFComparatorMap(handles);
7c673cae 333 Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
11fdf7f2
TL
334 // In case of a failure at this point, db is deleted via the txn_db destructor
335 // and set to nullptr.
336 if (s.ok()) {
337 *dbptr = txn_db.release();
338 }
7c673cae
FG
339 return s;
340}
341
342// Let TransactionLockMgr know that this column family exists so it can
343// allocate a LockMap for it.
11fdf7f2
TL
344void PessimisticTransactionDB::AddColumnFamily(
345 const ColumnFamilyHandle* handle) {
7c673cae
FG
346 lock_mgr_.AddColumnFamily(handle->GetID());
347}
348
11fdf7f2 349Status PessimisticTransactionDB::CreateColumnFamily(
7c673cae
FG
350 const ColumnFamilyOptions& options, const std::string& column_family_name,
351 ColumnFamilyHandle** handle) {
352 InstrumentedMutexLock l(&column_family_mutex_);
11fdf7f2
TL
353 Status s = VerifyCFOptions(options);
354 if (!s.ok()) {
355 return s;
356 }
7c673cae 357
11fdf7f2 358 s = db_->CreateColumnFamily(options, column_family_name, handle);
7c673cae
FG
359 if (s.ok()) {
360 lock_mgr_.AddColumnFamily((*handle)->GetID());
11fdf7f2 361 UpdateCFComparatorMap(*handle);
7c673cae
FG
362 }
363
364 return s;
365}
366
367// Let TransactionLockMgr know that it can deallocate the LockMap for this
368// column family.
11fdf7f2
TL
369Status PessimisticTransactionDB::DropColumnFamily(
370 ColumnFamilyHandle* column_family) {
7c673cae
FG
371 InstrumentedMutexLock l(&column_family_mutex_);
372
373 Status s = db_->DropColumnFamily(column_family);
374 if (s.ok()) {
375 lock_mgr_.RemoveColumnFamily(column_family->GetID());
376 }
377
378 return s;
379}
380
11fdf7f2
TL
381Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
382 uint32_t cfh_id,
383 const std::string& key,
384 bool exclusive) {
7c673cae
FG
385 return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
386}
387
11fdf7f2
TL
388void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
389 const TransactionKeyMap* keys) {
7c673cae
FG
390 lock_mgr_.UnLock(txn, keys, GetEnv());
391}
392
11fdf7f2
TL
393void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
394 uint32_t cfh_id, const std::string& key) {
7c673cae
FG
395 lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
396}
397
398// Used when wrapping DB write operations in a transaction
11fdf7f2 399Transaction* PessimisticTransactionDB::BeginInternalTransaction(
7c673cae
FG
400 const WriteOptions& options) {
401 TransactionOptions txn_options;
402 Transaction* txn = BeginTransaction(options, txn_options, nullptr);
403
404 // Use default timeout for non-transactional writes
405 txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
406 return txn;
407}
408
409// All user Put, Merge, Delete, and Write requests must be intercepted to make
410// sure that they lock all keys that they are writing to avoid causing conflicts
11fdf7f2 411// with any concurrent transactions. The easiest way to do this is to wrap all
7c673cae
FG
412// write operations in a transaction.
413//
414// Put(), Merge(), and Delete() only lock a single key per call. Write() will
415// sort its keys before locking them. This guarantees that TransactionDB write
11fdf7f2 416// methods cannot deadlock with each other (but still could deadlock with a
7c673cae 417// Transaction).
11fdf7f2
TL
418Status PessimisticTransactionDB::Put(const WriteOptions& options,
419 ColumnFamilyHandle* column_family,
420 const Slice& key, const Slice& val) {
7c673cae
FG
421 Status s;
422
423 Transaction* txn = BeginInternalTransaction(options);
424 txn->DisableIndexing();
425
426 // Since the client didn't create a transaction, they don't care about
427 // conflict checking for this write. So we just need to do PutUntracked().
428 s = txn->PutUntracked(column_family, key, val);
429
430 if (s.ok()) {
431 s = txn->Commit();
432 }
433
434 delete txn;
435
436 return s;
437}
438
11fdf7f2
TL
439Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
440 ColumnFamilyHandle* column_family,
441 const Slice& key) {
7c673cae
FG
442 Status s;
443
444 Transaction* txn = BeginInternalTransaction(wopts);
445 txn->DisableIndexing();
446
447 // Since the client didn't create a transaction, they don't care about
448 // conflict checking for this write. So we just need to do
449 // DeleteUntracked().
450 s = txn->DeleteUntracked(column_family, key);
451
452 if (s.ok()) {
453 s = txn->Commit();
454 }
455
456 delete txn;
457
458 return s;
459}
460
11fdf7f2
TL
461Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
462 ColumnFamilyHandle* column_family,
463 const Slice& key) {
464 Status s;
465
466 Transaction* txn = BeginInternalTransaction(wopts);
467 txn->DisableIndexing();
468
469 // Since the client didn't create a transaction, they don't care about
470 // conflict checking for this write. So we just need to do
471 // SingleDeleteUntracked().
472 s = txn->SingleDeleteUntracked(column_family, key);
473
474 if (s.ok()) {
475 s = txn->Commit();
476 }
477
478 delete txn;
479
480 return s;
481}
482
483Status PessimisticTransactionDB::Merge(const WriteOptions& options,
484 ColumnFamilyHandle* column_family,
485 const Slice& key, const Slice& value) {
7c673cae
FG
486 Status s;
487
488 Transaction* txn = BeginInternalTransaction(options);
489 txn->DisableIndexing();
490
491 // Since the client didn't create a transaction, they don't care about
492 // conflict checking for this write. So we just need to do
493 // MergeUntracked().
494 s = txn->MergeUntracked(column_family, key, value);
495
496 if (s.ok()) {
497 s = txn->Commit();
498 }
499
500 delete txn;
501
502 return s;
503}
504
11fdf7f2
TL
505Status PessimisticTransactionDB::Write(const WriteOptions& opts,
506 WriteBatch* updates) {
7c673cae
FG
507 // Need to lock all keys in this batch to prevent write conflicts with
508 // concurrent transactions.
509 Transaction* txn = BeginInternalTransaction(opts);
510 txn->DisableIndexing();
511
11fdf7f2
TL
512 auto txn_impl =
513 static_cast_with_check<PessimisticTransaction, Transaction>(txn);
7c673cae
FG
514
515 // Since commitBatch sorts the keys before locking, concurrent Write()
516 // operations will not cause a deadlock.
517 // In order to avoid a deadlock with a concurrent Transaction, Transactions
518 // should use a lock timeout.
519 Status s = txn_impl->CommitBatch(updates);
520
521 delete txn;
522
523 return s;
524}
525
11fdf7f2
TL
526Status WriteCommittedTxnDB::Write(
527 const WriteOptions& opts,
528 const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
529 if (optimizations.skip_concurrency_control) {
530 return db_impl_->Write(opts, updates);
531 } else {
532 return Write(opts, updates);
533 }
534}
535
536void PessimisticTransactionDB::InsertExpirableTransaction(
537 TransactionID tx_id, PessimisticTransaction* tx) {
7c673cae
FG
538 assert(tx->GetExpirationTime() > 0);
539 std::lock_guard<std::mutex> lock(map_mutex_);
540 expirable_transactions_map_.insert({tx_id, tx});
541}
542
11fdf7f2 543void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
7c673cae
FG
544 std::lock_guard<std::mutex> lock(map_mutex_);
545 expirable_transactions_map_.erase(tx_id);
546}
547
11fdf7f2 548bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
7c673cae
FG
549 TransactionID tx_id) {
550 std::lock_guard<std::mutex> lock(map_mutex_);
551
552 auto tx_it = expirable_transactions_map_.find(tx_id);
553 if (tx_it == expirable_transactions_map_.end()) {
554 return true;
555 }
11fdf7f2 556 PessimisticTransaction& tx = *(tx_it->second);
7c673cae
FG
557 return tx.TryStealingLocks();
558}
559
11fdf7f2 560void PessimisticTransactionDB::ReinitializeTransaction(
7c673cae
FG
561 Transaction* txn, const WriteOptions& write_options,
562 const TransactionOptions& txn_options) {
11fdf7f2
TL
563 auto txn_impl =
564 static_cast_with_check<PessimisticTransaction, Transaction>(txn);
7c673cae
FG
565
566 txn_impl->Reinitialize(this, write_options, txn_options);
567}
568
11fdf7f2 569Transaction* PessimisticTransactionDB::GetTransactionByName(
7c673cae
FG
570 const TransactionName& name) {
571 std::lock_guard<std::mutex> lock(name_map_mutex_);
572 auto it = transactions_.find(name);
573 if (it == transactions_.end()) {
574 return nullptr;
575 } else {
576 return it->second;
577 }
578}
579
11fdf7f2 580void PessimisticTransactionDB::GetAllPreparedTransactions(
7c673cae
FG
581 std::vector<Transaction*>* transv) {
582 assert(transv);
583 transv->clear();
584 std::lock_guard<std::mutex> lock(name_map_mutex_);
585 for (auto it = transactions_.begin(); it != transactions_.end(); it++) {
586 if (it->second->GetState() == Transaction::PREPARED) {
587 transv->push_back(it->second);
588 }
589 }
590}
591
11fdf7f2
TL
592TransactionLockMgr::LockStatusData
593PessimisticTransactionDB::GetLockStatusData() {
7c673cae
FG
594 return lock_mgr_.GetLockStatusData();
595}
596
11fdf7f2
TL
597std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
598 return lock_mgr_.GetDeadlockInfoBuffer();
599}
600
601void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
602 lock_mgr_.Resize(target_size);
603}
604
605void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
7c673cae
FG
606 assert(txn);
607 assert(txn->GetName().length() > 0);
608 assert(GetTransactionByName(txn->GetName()) == nullptr);
609 assert(txn->GetState() == Transaction::STARTED);
610 std::lock_guard<std::mutex> lock(name_map_mutex_);
611 transactions_[txn->GetName()] = txn;
612}
613
11fdf7f2 614void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
7c673cae
FG
615 assert(txn);
616 std::lock_guard<std::mutex> lock(name_map_mutex_);
617 auto it = transactions_.find(txn->GetName());
618 assert(it != transactions_.end());
619 transactions_.erase(it);
620}
621
622} // namespace rocksdb
623#endif // ROCKSDB_LITE