]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
11fdf7f2 TL |
8 | #ifndef __STDC_FORMAT_MACROS |
9 | #define __STDC_FORMAT_MACROS | |
10 | #endif | |
7c673cae | 11 | |
11fdf7f2 TL |
12 | #include "utilities/transactions/pessimistic_transaction_db.h" |
13 | ||
14 | #include <inttypes.h> | |
7c673cae FG |
15 | #include <string> |
16 | #include <unordered_set> | |
17 | #include <vector> | |
18 | ||
19 | #include "db/db_impl.h" | |
20 | #include "rocksdb/db.h" | |
21 | #include "rocksdb/options.h" | |
22 | #include "rocksdb/utilities/transaction_db.h" | |
11fdf7f2 TL |
23 | #include "util/cast_util.h" |
24 | #include "util/mutexlock.h" | |
25 | #include "util/sync_point.h" | |
26 | #include "utilities/transactions/pessimistic_transaction.h" | |
7c673cae | 27 | #include "utilities/transactions/transaction_db_mutex_impl.h" |
11fdf7f2 TL |
28 | #include "utilities/transactions/write_prepared_txn_db.h" |
29 | #include "utilities/transactions/write_unprepared_txn_db.h" | |
7c673cae FG |
30 | |
31 | namespace rocksdb { | |
32 | ||
11fdf7f2 TL |
33 | PessimisticTransactionDB::PessimisticTransactionDB( |
34 | DB* db, const TransactionDBOptions& txn_db_options) | |
7c673cae | 35 | : TransactionDB(db), |
11fdf7f2 | 36 | db_impl_(static_cast_with_check<DBImpl, DB>(db)), |
7c673cae FG |
37 | txn_db_options_(txn_db_options), |
38 | lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, | |
11fdf7f2 | 39 | txn_db_options_.max_num_deadlocks, |
7c673cae FG |
40 | txn_db_options_.custom_mutex_factory |
41 | ? txn_db_options_.custom_mutex_factory | |
42 | : std::shared_ptr<TransactionDBMutexFactory>( | |
43 | new TransactionDBMutexFactoryImpl())) { | |
44 | assert(db_impl_ != nullptr); | |
11fdf7f2 | 45 | info_log_ = db_impl_->GetDBOptions().info_log; |
7c673cae FG |
46 | } |
47 | ||
11fdf7f2 | 48 | // Support initiliazing PessimisticTransactionDB from a stackable db |
7c673cae | 49 | // |
11fdf7f2 | 50 | // PessimisticTransactionDB |
7c673cae FG |
51 | // ^ ^ |
52 | // | | | |
53 | // | + | |
54 | // | StackableDB | |
55 | // | ^ | |
56 | // | | | |
57 | // + + | |
58 | // DBImpl | |
59 | // ^ | |
60 | // |(inherit) | |
61 | // + | |
62 | // DB | |
63 | // | |
11fdf7f2 TL |
64 | PessimisticTransactionDB::PessimisticTransactionDB( |
65 | StackableDB* db, const TransactionDBOptions& txn_db_options) | |
7c673cae | 66 | : TransactionDB(db), |
11fdf7f2 | 67 | db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())), |
7c673cae FG |
68 | txn_db_options_(txn_db_options), |
69 | lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, | |
11fdf7f2 | 70 | txn_db_options_.max_num_deadlocks, |
7c673cae FG |
71 | txn_db_options_.custom_mutex_factory |
72 | ? txn_db_options_.custom_mutex_factory | |
73 | : std::shared_ptr<TransactionDBMutexFactory>( | |
74 | new TransactionDBMutexFactoryImpl())) { | |
75 | assert(db_impl_ != nullptr); | |
76 | } | |
77 | ||
11fdf7f2 | 78 | PessimisticTransactionDB::~PessimisticTransactionDB() { |
7c673cae FG |
79 | while (!transactions_.empty()) { |
80 | delete transactions_.begin()->second; | |
11fdf7f2 TL |
81 | // TODO(myabandeh): this seems to be an unsafe approach as it is not quite |
82 | // clear whether delete would also remove the entry from transactions_. | |
7c673cae FG |
83 | } |
84 | } | |
85 | ||
11fdf7f2 TL |
86 | Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) { |
87 | return Status::OK(); | |
88 | } | |
89 | ||
90 | Status PessimisticTransactionDB::Initialize( | |
7c673cae FG |
91 | const std::vector<size_t>& compaction_enabled_cf_indices, |
92 | const std::vector<ColumnFamilyHandle*>& handles) { | |
93 | for (auto cf_ptr : handles) { | |
94 | AddColumnFamily(cf_ptr); | |
95 | } | |
11fdf7f2 TL |
96 | // Verify cf options |
97 | for (auto handle : handles) { | |
98 | ColumnFamilyDescriptor cfd; | |
99 | Status s = handle->GetDescriptor(&cfd); | |
100 | if (!s.ok()) { | |
101 | return s; | |
102 | } | |
103 | s = VerifyCFOptions(cfd.options); | |
104 | if (!s.ok()) { | |
105 | return s; | |
106 | } | |
107 | } | |
108 | ||
7c673cae FG |
109 | // Re-enable compaction for the column families that initially had |
110 | // compaction enabled. | |
111 | std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles; | |
112 | compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size()); | |
113 | for (auto index : compaction_enabled_cf_indices) { | |
114 | compaction_enabled_cf_handles.push_back(handles[index]); | |
115 | } | |
116 | ||
117 | Status s = EnableAutoCompaction(compaction_enabled_cf_handles); | |
118 | ||
119 | // create 'real' transactions from recovered shell transactions | |
120 | auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB()); | |
121 | assert(dbimpl != nullptr); | |
122 | auto rtrxs = dbimpl->recovered_transactions(); | |
123 | ||
124 | for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) { | |
125 | auto recovered_trx = it->second; | |
126 | assert(recovered_trx); | |
11fdf7f2 TL |
127 | assert(recovered_trx->batches_.size() == 1); |
128 | const auto& seq = recovered_trx->batches_.begin()->first; | |
129 | const auto& batch_info = recovered_trx->batches_.begin()->second; | |
130 | assert(batch_info.log_number_); | |
7c673cae FG |
131 | assert(recovered_trx->name_.length()); |
132 | ||
133 | WriteOptions w_options; | |
134 | w_options.sync = true; | |
135 | TransactionOptions t_options; | |
11fdf7f2 TL |
136 | // This would help avoiding deadlock for keys that although exist in the WAL |
137 | // did not go through concurrency control. This includes the merge that | |
138 | // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if | |
139 | // there is a conflict between the keys of two transactions that must be | |
140 | // avoided, it is already avoided by the application, MyRocks, before the | |
141 | // restart (ii) application, MyRocks, guarntees to rollback/commit the | |
142 | // recovered transactions before new transactions start. | |
143 | t_options.skip_concurrency_control = true; | |
7c673cae FG |
144 | |
145 | Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); | |
146 | assert(real_trx); | |
11fdf7f2 TL |
147 | real_trx->SetLogNumber(batch_info.log_number_); |
148 | assert(seq != kMaxSequenceNumber); | |
494da23a TL |
149 | if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) { |
150 | real_trx->SetId(seq); | |
151 | } | |
7c673cae FG |
152 | |
153 | s = real_trx->SetName(recovered_trx->name_); | |
154 | if (!s.ok()) { | |
155 | break; | |
156 | } | |
157 | ||
11fdf7f2 TL |
158 | s = real_trx->RebuildFromWriteBatch(batch_info.batch_); |
159 | // WriteCommitted set this to to disable this check that is specific to | |
160 | // WritePrepared txns | |
161 | assert(batch_info.batch_cnt_ == 0 || | |
162 | real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_); | |
7c673cae FG |
163 | real_trx->SetState(Transaction::PREPARED); |
164 | if (!s.ok()) { | |
165 | break; | |
166 | } | |
167 | } | |
168 | if (s.ok()) { | |
169 | dbimpl->DeleteAllRecoveredTransactions(); | |
170 | } | |
171 | return s; | |
172 | } | |
173 | ||
11fdf7f2 | 174 | Transaction* WriteCommittedTxnDB::BeginTransaction( |
7c673cae FG |
175 | const WriteOptions& write_options, const TransactionOptions& txn_options, |
176 | Transaction* old_txn) { | |
177 | if (old_txn != nullptr) { | |
178 | ReinitializeTransaction(old_txn, write_options, txn_options); | |
179 | return old_txn; | |
180 | } else { | |
11fdf7f2 | 181 | return new WriteCommittedTxn(this, write_options, txn_options); |
7c673cae FG |
182 | } |
183 | } | |
184 | ||
11fdf7f2 | 185 | TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions( |
7c673cae FG |
186 | const TransactionDBOptions& txn_db_options) { |
187 | TransactionDBOptions validated = txn_db_options; | |
188 | ||
189 | if (txn_db_options.num_stripes == 0) { | |
190 | validated.num_stripes = 1; | |
191 | } | |
192 | ||
193 | return validated; | |
194 | } | |
195 | ||
196 | Status TransactionDB::Open(const Options& options, | |
197 | const TransactionDBOptions& txn_db_options, | |
198 | const std::string& dbname, TransactionDB** dbptr) { | |
199 | DBOptions db_options(options); | |
200 | ColumnFamilyOptions cf_options(options); | |
201 | std::vector<ColumnFamilyDescriptor> column_families; | |
202 | column_families.push_back( | |
203 | ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); | |
204 | std::vector<ColumnFamilyHandle*> handles; | |
205 | Status s = TransactionDB::Open(db_options, txn_db_options, dbname, | |
206 | column_families, &handles, dbptr); | |
207 | if (s.ok()) { | |
208 | assert(handles.size() == 1); | |
209 | // i can delete the handle since DBImpl is always holding a reference to | |
210 | // default column family | |
211 | delete handles[0]; | |
212 | } | |
213 | ||
214 | return s; | |
215 | } | |
216 | ||
217 | Status TransactionDB::Open( | |
218 | const DBOptions& db_options, const TransactionDBOptions& txn_db_options, | |
219 | const std::string& dbname, | |
220 | const std::vector<ColumnFamilyDescriptor>& column_families, | |
221 | std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) { | |
222 | Status s; | |
11fdf7f2 | 223 | DB* db = nullptr; |
7c673cae | 224 | |
11fdf7f2 TL |
225 | ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32, |
226 | static_cast<int>(txn_db_options.write_policy)); | |
7c673cae FG |
227 | std::vector<ColumnFamilyDescriptor> column_families_copy = column_families; |
228 | std::vector<size_t> compaction_enabled_cf_indices; | |
229 | DBOptions db_options_2pc = db_options; | |
230 | PrepareWrap(&db_options_2pc, &column_families_copy, | |
231 | &compaction_enabled_cf_indices); | |
11fdf7f2 TL |
232 | const bool use_seq_per_batch = |
233 | txn_db_options.write_policy == WRITE_PREPARED || | |
234 | txn_db_options.write_policy == WRITE_UNPREPARED; | |
235 | const bool use_batch_per_txn = | |
236 | txn_db_options.write_policy == WRITE_COMMITTED || | |
237 | txn_db_options.write_policy == WRITE_PREPARED; | |
238 | s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db, | |
239 | use_seq_per_batch, use_batch_per_txn); | |
7c673cae FG |
240 | if (s.ok()) { |
241 | s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, | |
242 | dbptr); | |
243 | } | |
11fdf7f2 TL |
244 | if (!s.ok()) { |
245 | // just in case it was not deleted (and not set to nullptr). | |
246 | delete db; | |
247 | } | |
7c673cae FG |
248 | return s; |
249 | } | |
250 | ||
251 | void TransactionDB::PrepareWrap( | |
252 | DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families, | |
253 | std::vector<size_t>* compaction_enabled_cf_indices) { | |
254 | compaction_enabled_cf_indices->clear(); | |
255 | ||
256 | // Enable MemTable History if not already enabled | |
257 | for (size_t i = 0; i < column_families->size(); i++) { | |
258 | ColumnFamilyOptions* cf_options = &(*column_families)[i].options; | |
259 | ||
260 | if (cf_options->max_write_buffer_number_to_maintain == 0) { | |
261 | // Setting to -1 will set the History size to max_write_buffer_number. | |
262 | cf_options->max_write_buffer_number_to_maintain = -1; | |
263 | } | |
264 | if (!cf_options->disable_auto_compactions) { | |
265 | // Disable compactions momentarily to prevent race with DB::Open | |
266 | cf_options->disable_auto_compactions = true; | |
267 | compaction_enabled_cf_indices->push_back(i); | |
268 | } | |
269 | } | |
270 | db_options->allow_2pc = true; | |
271 | } | |
272 | ||
273 | Status TransactionDB::WrapDB( | |
274 | // make sure this db is already opened with memtable history enabled, | |
275 | // auto compaction distabled and 2 phase commit enabled | |
276 | DB* db, const TransactionDBOptions& txn_db_options, | |
277 | const std::vector<size_t>& compaction_enabled_cf_indices, | |
278 | const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) { | |
11fdf7f2 TL |
279 | assert(db != nullptr); |
280 | assert(dbptr != nullptr); | |
281 | *dbptr = nullptr; | |
282 | std::unique_ptr<PessimisticTransactionDB> txn_db; | |
283 | switch (txn_db_options.write_policy) { | |
284 | case WRITE_UNPREPARED: | |
285 | txn_db.reset(new WriteUnpreparedTxnDB( | |
286 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
287 | break; | |
288 | case WRITE_PREPARED: | |
289 | txn_db.reset(new WritePreparedTxnDB( | |
290 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
291 | break; | |
292 | case WRITE_COMMITTED: | |
293 | default: | |
294 | txn_db.reset(new WriteCommittedTxnDB( | |
295 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
296 | } | |
297 | txn_db->UpdateCFComparatorMap(handles); | |
7c673cae | 298 | Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); |
11fdf7f2 TL |
299 | // In case of a failure at this point, db is deleted via the txn_db destructor |
300 | // and set to nullptr. | |
301 | if (s.ok()) { | |
302 | *dbptr = txn_db.release(); | |
303 | } | |
7c673cae FG |
304 | return s; |
305 | } | |
306 | ||
307 | Status TransactionDB::WrapStackableDB( | |
308 | // make sure this stackable_db is already opened with memtable history | |
11fdf7f2 | 309 | // enabled, auto compaction distabled and 2 phase commit enabled |
7c673cae FG |
310 | StackableDB* db, const TransactionDBOptions& txn_db_options, |
311 | const std::vector<size_t>& compaction_enabled_cf_indices, | |
312 | const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) { | |
11fdf7f2 TL |
313 | assert(db != nullptr); |
314 | assert(dbptr != nullptr); | |
315 | *dbptr = nullptr; | |
316 | std::unique_ptr<PessimisticTransactionDB> txn_db; | |
317 | ||
318 | switch (txn_db_options.write_policy) { | |
319 | case WRITE_UNPREPARED: | |
320 | txn_db.reset(new WriteUnpreparedTxnDB( | |
321 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
322 | break; | |
323 | case WRITE_PREPARED: | |
324 | txn_db.reset(new WritePreparedTxnDB( | |
325 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
326 | break; | |
327 | case WRITE_COMMITTED: | |
328 | default: | |
329 | txn_db.reset(new WriteCommittedTxnDB( | |
330 | db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); | |
331 | } | |
332 | txn_db->UpdateCFComparatorMap(handles); | |
7c673cae | 333 | Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); |
11fdf7f2 TL |
334 | // In case of a failure at this point, db is deleted via the txn_db destructor |
335 | // and set to nullptr. | |
336 | if (s.ok()) { | |
337 | *dbptr = txn_db.release(); | |
338 | } | |
7c673cae FG |
339 | return s; |
340 | } | |
341 | ||
342 | // Let TransactionLockMgr know that this column family exists so it can | |
343 | // allocate a LockMap for it. | |
11fdf7f2 TL |
344 | void PessimisticTransactionDB::AddColumnFamily( |
345 | const ColumnFamilyHandle* handle) { | |
7c673cae FG |
346 | lock_mgr_.AddColumnFamily(handle->GetID()); |
347 | } | |
348 | ||
11fdf7f2 | 349 | Status PessimisticTransactionDB::CreateColumnFamily( |
7c673cae FG |
350 | const ColumnFamilyOptions& options, const std::string& column_family_name, |
351 | ColumnFamilyHandle** handle) { | |
352 | InstrumentedMutexLock l(&column_family_mutex_); | |
11fdf7f2 TL |
353 | Status s = VerifyCFOptions(options); |
354 | if (!s.ok()) { | |
355 | return s; | |
356 | } | |
7c673cae | 357 | |
11fdf7f2 | 358 | s = db_->CreateColumnFamily(options, column_family_name, handle); |
7c673cae FG |
359 | if (s.ok()) { |
360 | lock_mgr_.AddColumnFamily((*handle)->GetID()); | |
11fdf7f2 | 361 | UpdateCFComparatorMap(*handle); |
7c673cae FG |
362 | } |
363 | ||
364 | return s; | |
365 | } | |
366 | ||
367 | // Let TransactionLockMgr know that it can deallocate the LockMap for this | |
368 | // column family. | |
11fdf7f2 TL |
369 | Status PessimisticTransactionDB::DropColumnFamily( |
370 | ColumnFamilyHandle* column_family) { | |
7c673cae FG |
371 | InstrumentedMutexLock l(&column_family_mutex_); |
372 | ||
373 | Status s = db_->DropColumnFamily(column_family); | |
374 | if (s.ok()) { | |
375 | lock_mgr_.RemoveColumnFamily(column_family->GetID()); | |
376 | } | |
377 | ||
378 | return s; | |
379 | } | |
380 | ||
11fdf7f2 TL |
381 | Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, |
382 | uint32_t cfh_id, | |
383 | const std::string& key, | |
384 | bool exclusive) { | |
7c673cae FG |
385 | return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); |
386 | } | |
387 | ||
11fdf7f2 TL |
388 | void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, |
389 | const TransactionKeyMap* keys) { | |
7c673cae FG |
390 | lock_mgr_.UnLock(txn, keys, GetEnv()); |
391 | } | |
392 | ||
11fdf7f2 TL |
393 | void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, |
394 | uint32_t cfh_id, const std::string& key) { | |
7c673cae FG |
395 | lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); |
396 | } | |
397 | ||
398 | // Used when wrapping DB write operations in a transaction | |
11fdf7f2 | 399 | Transaction* PessimisticTransactionDB::BeginInternalTransaction( |
7c673cae FG |
400 | const WriteOptions& options) { |
401 | TransactionOptions txn_options; | |
402 | Transaction* txn = BeginTransaction(options, txn_options, nullptr); | |
403 | ||
404 | // Use default timeout for non-transactional writes | |
405 | txn->SetLockTimeout(txn_db_options_.default_lock_timeout); | |
406 | return txn; | |
407 | } | |
408 | ||
409 | // All user Put, Merge, Delete, and Write requests must be intercepted to make | |
410 | // sure that they lock all keys that they are writing to avoid causing conflicts | |
11fdf7f2 | 411 | // with any concurrent transactions. The easiest way to do this is to wrap all |
7c673cae FG |
412 | // write operations in a transaction. |
413 | // | |
414 | // Put(), Merge(), and Delete() only lock a single key per call. Write() will | |
415 | // sort its keys before locking them. This guarantees that TransactionDB write | |
11fdf7f2 | 416 | // methods cannot deadlock with each other (but still could deadlock with a |
7c673cae | 417 | // Transaction). |
11fdf7f2 TL |
418 | Status PessimisticTransactionDB::Put(const WriteOptions& options, |
419 | ColumnFamilyHandle* column_family, | |
420 | const Slice& key, const Slice& val) { | |
7c673cae FG |
421 | Status s; |
422 | ||
423 | Transaction* txn = BeginInternalTransaction(options); | |
424 | txn->DisableIndexing(); | |
425 | ||
426 | // Since the client didn't create a transaction, they don't care about | |
427 | // conflict checking for this write. So we just need to do PutUntracked(). | |
428 | s = txn->PutUntracked(column_family, key, val); | |
429 | ||
430 | if (s.ok()) { | |
431 | s = txn->Commit(); | |
432 | } | |
433 | ||
434 | delete txn; | |
435 | ||
436 | return s; | |
437 | } | |
438 | ||
11fdf7f2 TL |
439 | Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, |
440 | ColumnFamilyHandle* column_family, | |
441 | const Slice& key) { | |
7c673cae FG |
442 | Status s; |
443 | ||
444 | Transaction* txn = BeginInternalTransaction(wopts); | |
445 | txn->DisableIndexing(); | |
446 | ||
447 | // Since the client didn't create a transaction, they don't care about | |
448 | // conflict checking for this write. So we just need to do | |
449 | // DeleteUntracked(). | |
450 | s = txn->DeleteUntracked(column_family, key); | |
451 | ||
452 | if (s.ok()) { | |
453 | s = txn->Commit(); | |
454 | } | |
455 | ||
456 | delete txn; | |
457 | ||
458 | return s; | |
459 | } | |
460 | ||
11fdf7f2 TL |
461 | Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts, |
462 | ColumnFamilyHandle* column_family, | |
463 | const Slice& key) { | |
464 | Status s; | |
465 | ||
466 | Transaction* txn = BeginInternalTransaction(wopts); | |
467 | txn->DisableIndexing(); | |
468 | ||
469 | // Since the client didn't create a transaction, they don't care about | |
470 | // conflict checking for this write. So we just need to do | |
471 | // SingleDeleteUntracked(). | |
472 | s = txn->SingleDeleteUntracked(column_family, key); | |
473 | ||
474 | if (s.ok()) { | |
475 | s = txn->Commit(); | |
476 | } | |
477 | ||
478 | delete txn; | |
479 | ||
480 | return s; | |
481 | } | |
482 | ||
483 | Status PessimisticTransactionDB::Merge(const WriteOptions& options, | |
484 | ColumnFamilyHandle* column_family, | |
485 | const Slice& key, const Slice& value) { | |
7c673cae FG |
486 | Status s; |
487 | ||
488 | Transaction* txn = BeginInternalTransaction(options); | |
489 | txn->DisableIndexing(); | |
490 | ||
491 | // Since the client didn't create a transaction, they don't care about | |
492 | // conflict checking for this write. So we just need to do | |
493 | // MergeUntracked(). | |
494 | s = txn->MergeUntracked(column_family, key, value); | |
495 | ||
496 | if (s.ok()) { | |
497 | s = txn->Commit(); | |
498 | } | |
499 | ||
500 | delete txn; | |
501 | ||
502 | return s; | |
503 | } | |
504 | ||
11fdf7f2 TL |
505 | Status PessimisticTransactionDB::Write(const WriteOptions& opts, |
506 | WriteBatch* updates) { | |
7c673cae FG |
507 | // Need to lock all keys in this batch to prevent write conflicts with |
508 | // concurrent transactions. | |
509 | Transaction* txn = BeginInternalTransaction(opts); | |
510 | txn->DisableIndexing(); | |
511 | ||
11fdf7f2 TL |
512 | auto txn_impl = |
513 | static_cast_with_check<PessimisticTransaction, Transaction>(txn); | |
7c673cae FG |
514 | |
515 | // Since commitBatch sorts the keys before locking, concurrent Write() | |
516 | // operations will not cause a deadlock. | |
517 | // In order to avoid a deadlock with a concurrent Transaction, Transactions | |
518 | // should use a lock timeout. | |
519 | Status s = txn_impl->CommitBatch(updates); | |
520 | ||
521 | delete txn; | |
522 | ||
523 | return s; | |
524 | } | |
525 | ||
11fdf7f2 TL |
526 | Status WriteCommittedTxnDB::Write( |
527 | const WriteOptions& opts, | |
528 | const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) { | |
529 | if (optimizations.skip_concurrency_control) { | |
530 | return db_impl_->Write(opts, updates); | |
531 | } else { | |
532 | return Write(opts, updates); | |
533 | } | |
534 | } | |
535 | ||
536 | void PessimisticTransactionDB::InsertExpirableTransaction( | |
537 | TransactionID tx_id, PessimisticTransaction* tx) { | |
7c673cae FG |
538 | assert(tx->GetExpirationTime() > 0); |
539 | std::lock_guard<std::mutex> lock(map_mutex_); | |
540 | expirable_transactions_map_.insert({tx_id, tx}); | |
541 | } | |
542 | ||
11fdf7f2 | 543 | void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) { |
7c673cae FG |
544 | std::lock_guard<std::mutex> lock(map_mutex_); |
545 | expirable_transactions_map_.erase(tx_id); | |
546 | } | |
547 | ||
11fdf7f2 | 548 | bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks( |
7c673cae FG |
549 | TransactionID tx_id) { |
550 | std::lock_guard<std::mutex> lock(map_mutex_); | |
551 | ||
552 | auto tx_it = expirable_transactions_map_.find(tx_id); | |
553 | if (tx_it == expirable_transactions_map_.end()) { | |
554 | return true; | |
555 | } | |
11fdf7f2 | 556 | PessimisticTransaction& tx = *(tx_it->second); |
7c673cae FG |
557 | return tx.TryStealingLocks(); |
558 | } | |
559 | ||
11fdf7f2 | 560 | void PessimisticTransactionDB::ReinitializeTransaction( |
7c673cae FG |
561 | Transaction* txn, const WriteOptions& write_options, |
562 | const TransactionOptions& txn_options) { | |
11fdf7f2 TL |
563 | auto txn_impl = |
564 | static_cast_with_check<PessimisticTransaction, Transaction>(txn); | |
7c673cae FG |
565 | |
566 | txn_impl->Reinitialize(this, write_options, txn_options); | |
567 | } | |
568 | ||
11fdf7f2 | 569 | Transaction* PessimisticTransactionDB::GetTransactionByName( |
7c673cae FG |
570 | const TransactionName& name) { |
571 | std::lock_guard<std::mutex> lock(name_map_mutex_); | |
572 | auto it = transactions_.find(name); | |
573 | if (it == transactions_.end()) { | |
574 | return nullptr; | |
575 | } else { | |
576 | return it->second; | |
577 | } | |
578 | } | |
579 | ||
11fdf7f2 | 580 | void PessimisticTransactionDB::GetAllPreparedTransactions( |
7c673cae FG |
581 | std::vector<Transaction*>* transv) { |
582 | assert(transv); | |
583 | transv->clear(); | |
584 | std::lock_guard<std::mutex> lock(name_map_mutex_); | |
585 | for (auto it = transactions_.begin(); it != transactions_.end(); it++) { | |
586 | if (it->second->GetState() == Transaction::PREPARED) { | |
587 | transv->push_back(it->second); | |
588 | } | |
589 | } | |
590 | } | |
591 | ||
11fdf7f2 TL |
592 | TransactionLockMgr::LockStatusData |
593 | PessimisticTransactionDB::GetLockStatusData() { | |
7c673cae FG |
594 | return lock_mgr_.GetLockStatusData(); |
595 | } | |
596 | ||
11fdf7f2 TL |
597 | std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() { |
598 | return lock_mgr_.GetDeadlockInfoBuffer(); | |
599 | } | |
600 | ||
601 | void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) { | |
602 | lock_mgr_.Resize(target_size); | |
603 | } | |
604 | ||
605 | void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) { | |
7c673cae FG |
606 | assert(txn); |
607 | assert(txn->GetName().length() > 0); | |
608 | assert(GetTransactionByName(txn->GetName()) == nullptr); | |
609 | assert(txn->GetState() == Transaction::STARTED); | |
610 | std::lock_guard<std::mutex> lock(name_map_mutex_); | |
611 | transactions_[txn->GetName()] = txn; | |
612 | } | |
613 | ||
11fdf7f2 | 614 | void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { |
7c673cae FG |
615 | assert(txn); |
616 | std::lock_guard<std::mutex> lock(name_map_mutex_); | |
617 | auto it = transactions_.find(txn->GetName()); | |
618 | assert(it != transactions_.end()); | |
619 | transactions_.erase(it); | |
620 | } | |
621 | ||
622 | } // namespace rocksdb | |
623 | #endif // ROCKSDB_LITE |