]>
git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/transaction_base.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/transactions/transaction_base.h"
12 #include "db/column_family.h"
13 #include "db/db_impl/db_impl.h"
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/status.h"
17 #include "util/cast_util.h"
18 #include "util/string_util.h"
19 #include "utilities/transactions/lock/lock_tracker.h"
21 namespace ROCKSDB_NAMESPACE
{
23 TransactionBaseImpl::TransactionBaseImpl(
24 DB
* db
, const WriteOptions
& write_options
,
25 const LockTrackerFactory
& lock_tracker_factory
)
27 dbimpl_(static_cast_with_check
<DBImpl
>(db
)),
28 write_options_(write_options
),
29 cmp_(GetColumnFamilyUserComparator(db
->DefaultColumnFamily())),
30 lock_tracker_factory_(lock_tracker_factory
),
31 start_time_(db_
->GetEnv()->NowMicros()),
32 write_batch_(cmp_
, 0, true, 0),
33 tracked_locks_(lock_tracker_factory_
.Create()),
34 indexing_enabled_(true) {
35 assert(dynamic_cast<DBImpl
*>(db_
) != nullptr);
37 if (dbimpl_
->allow_2pc()) {
// Destructor: drops whatever snapshot this transaction still holds by
// installing a null snapshot through SetSnapshotInternal().
// NOTE(review): the closing brace (presumably upstream line 45) is absent
// from this extract; the embedded numbering jumps from 44 to 47.
42 TransactionBaseImpl::~TransactionBaseImpl() {
43 // Release snapshot if snapshot is set
44 SetSnapshotInternal(nullptr);
47 void TransactionBaseImpl::Clear() {
48 save_points_
.reset(nullptr);
50 commit_time_batch_
.Clear();
51 tracked_locks_
->Clear();
56 if (dbimpl_
->allow_2pc()) {
61 void TransactionBaseImpl::Reinitialize(DB
* db
,
62 const WriteOptions
& write_options
) {
69 write_options_
= write_options
;
70 start_time_
= db_
->GetEnv()->NowMicros();
71 indexing_enabled_
= true;
72 cmp_
= GetColumnFamilyUserComparator(db_
->DefaultColumnFamily());
// Obtains a snapshot from dbimpl_->GetSnapshotForWriteConflictBoundary() and
// installs it as this transaction's snapshot via SetSnapshotInternal().
75 void TransactionBaseImpl::SetSnapshot() {
76 const Snapshot
* snapshot
= dbimpl_
->GetSnapshotForWriteConflictBoundary();
77 SetSnapshotInternal(snapshot
);
// Replaces snapshot_ with `snapshot` (nullptr is accepted; the destructor
// passes it) and resets snapshot_needed_ / snapshot_notifier_.
//
// The new snapshot_ value carries a deleter bound to
// ReleaseSnapshot(<snapshot>, db_) on this object.
// NOTE(review): the deleter binds `this`, so copies of snapshot_ presumably
// must not outlive the transaction -- confirm against callers.
80 void TransactionBaseImpl::SetSnapshotInternal(const Snapshot
* snapshot
) {
81 // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to
82 // be released, not deleted when it is no longer referenced.
83 snapshot_
.reset(snapshot
, std::bind(&TransactionBaseImpl::ReleaseSnapshot
,
84 this, std::placeholders::_1
, db_
));
// Cancel any request recorded by SetSnapshotOnNextOperation().
85 snapshot_needed_
= false;
86 snapshot_notifier_
= nullptr;
// Records a deferred snapshot request: sets snapshot_needed_ and stores
// `notifier` in snapshot_notifier_. No snapshot is taken here; the flag is
// read later by SetSnapshotIfNeeded().
89 void TransactionBaseImpl::SetSnapshotOnNextOperation(
90 std::shared_ptr
<TransactionNotifier
> notifier
) {
91 snapshot_needed_
= true;
92 snapshot_notifier_
= notifier
;
95 void TransactionBaseImpl::SetSnapshotIfNeeded() {
96 if (snapshot_needed_
) {
97 std::shared_ptr
<TransactionNotifier
> notifier
= snapshot_notifier_
;
99 if (notifier
!= nullptr) {
100 notifier
->SnapshotCreated(GetSnapshot());
105 Status
TransactionBaseImpl::TryLock(ColumnFamilyHandle
* column_family
,
106 const SliceParts
& key
, bool read_only
,
107 bool exclusive
, const bool do_validate
,
108 const bool assume_tracked
) {
110 for (int i
= 0; i
< key
.num_parts
; ++i
) {
111 key_size
+= key
.parts
[i
].size();
115 str
.reserve(key_size
);
117 for (int i
= 0; i
< key
.num_parts
; ++i
) {
118 str
.append(key
.parts
[i
].data(), key
.parts
[i
].size());
121 return TryLock(column_family
, str
, read_only
, exclusive
, do_validate
,
// Pushes a new save point capturing the transaction's current snapshot state
// (snapshot_, snapshot_needed_, snapshot_notifier_) and operation counters
// (num_puts_, num_deletes_, num_merges_), then marks a matching save point
// in write_batch_.
// NOTE(review): the closing braces (presumably upstream lines 128 and 133)
// are absent from this extract.
125 void TransactionBaseImpl::SetSavePoint() {
// The save-point stack is allocated lazily on first use.
126 if (save_points_
== nullptr) {
127 save_points_
.reset(new std::stack
<TransactionBaseImpl::SavePoint
, autovector
<TransactionBaseImpl::SavePoint
>>());
// lock_tracker_factory_ is also handed to the SavePoint constructor.
129 save_points_
->emplace(snapshot_
, snapshot_needed_
, snapshot_notifier_
,
130 num_puts_
, num_deletes_
, num_merges_
,
131 lock_tracker_factory_
);
132 write_batch_
.SetSavePoint();
135 Status
TransactionBaseImpl::RollbackToSavePoint() {
136 if (save_points_
!= nullptr && save_points_
->size() > 0) {
137 // Restore saved SavePoint
138 TransactionBaseImpl::SavePoint
& save_point
= save_points_
->top();
139 snapshot_
= save_point
.snapshot_
;
140 snapshot_needed_
= save_point
.snapshot_needed_
;
141 snapshot_notifier_
= save_point
.snapshot_notifier_
;
142 num_puts_
= save_point
.num_puts_
;
143 num_deletes_
= save_point
.num_deletes_
;
144 num_merges_
= save_point
.num_merges_
;
147 Status s
= write_batch_
.RollbackToSavePoint();
150 // Rollback any keys that were tracked since the last savepoint
151 tracked_locks_
->Subtract(*save_point
.new_locks_
);
157 assert(write_batch_
.RollbackToSavePoint().IsNotFound());
158 return Status::NotFound();
162 Status
TransactionBaseImpl::PopSavePoint() {
163 if (save_points_
== nullptr ||
164 save_points_
->empty()) {
166 assert(write_batch_
.PopSavePoint().IsNotFound());
167 return Status::NotFound();
170 assert(!save_points_
->empty());
171 // If there is another savepoint A below the current savepoint B, then A needs
172 // to inherit tracked_keys in B so that if we rollback to savepoint A, we
173 // remember to unlock keys in B. If there is no other savepoint below, then we
174 // can safely discard savepoint info.
175 if (save_points_
->size() == 1) {
178 TransactionBaseImpl::SavePoint
top(lock_tracker_factory_
);
179 std::swap(top
, save_points_
->top());
182 save_points_
->top().new_locks_
->Merge(*top
.new_locks_
);
185 return write_batch_
.PopSavePoint();
188 Status
TransactionBaseImpl::Get(const ReadOptions
& read_options
,
189 ColumnFamilyHandle
* column_family
,
190 const Slice
& key
, std::string
* value
) {
191 assert(value
!= nullptr);
192 PinnableSlice
pinnable_val(value
);
193 assert(!pinnable_val
.IsPinned());
194 auto s
= Get(read_options
, column_family
, key
, &pinnable_val
);
195 if (s
.ok() && pinnable_val
.IsPinned()) {
196 value
->assign(pinnable_val
.data(), pinnable_val
.size());
197 } // else value is already assigned
201 Status
TransactionBaseImpl::Get(const ReadOptions
& read_options
,
202 ColumnFamilyHandle
* column_family
,
203 const Slice
& key
, PinnableSlice
* pinnable_val
) {
204 return write_batch_
.GetFromBatchAndDB(db_
, read_options
, column_family
, key
,
208 Status
TransactionBaseImpl::GetForUpdate(const ReadOptions
& read_options
,
209 ColumnFamilyHandle
* column_family
,
210 const Slice
& key
, std::string
* value
,
212 const bool do_validate
) {
213 if (!do_validate
&& read_options
.snapshot
!= nullptr) {
214 return Status::InvalidArgument(
215 "If do_validate is false then GetForUpdate with snapshot is not "
219 TryLock(column_family
, key
, true /* read_only */, exclusive
, do_validate
);
221 if (s
.ok() && value
!= nullptr) {
222 assert(value
!= nullptr);
223 PinnableSlice
pinnable_val(value
);
224 assert(!pinnable_val
.IsPinned());
225 s
= Get(read_options
, column_family
, key
, &pinnable_val
);
226 if (s
.ok() && pinnable_val
.IsPinned()) {
227 value
->assign(pinnable_val
.data(), pinnable_val
.size());
228 } // else value is already assigned
233 Status
TransactionBaseImpl::GetForUpdate(const ReadOptions
& read_options
,
234 ColumnFamilyHandle
* column_family
,
236 PinnableSlice
* pinnable_val
,
238 const bool do_validate
) {
239 if (!do_validate
&& read_options
.snapshot
!= nullptr) {
240 return Status::InvalidArgument(
241 "If do_validate is false then GetForUpdate with snapshot is not "
245 TryLock(column_family
, key
, true /* read_only */, exclusive
, do_validate
);
247 if (s
.ok() && pinnable_val
!= nullptr) {
248 s
= Get(read_options
, column_family
, key
, pinnable_val
);
253 std::vector
<Status
> TransactionBaseImpl::MultiGet(
254 const ReadOptions
& read_options
,
255 const std::vector
<ColumnFamilyHandle
*>& column_family
,
256 const std::vector
<Slice
>& keys
, std::vector
<std::string
>* values
) {
257 size_t num_keys
= keys
.size();
258 values
->resize(num_keys
);
260 std::vector
<Status
> stat_list(num_keys
);
261 for (size_t i
= 0; i
< num_keys
; ++i
) {
262 stat_list
[i
] = Get(read_options
, column_family
[i
], keys
[i
], &(*values
)[i
]);
268 void TransactionBaseImpl::MultiGet(const ReadOptions
& read_options
,
269 ColumnFamilyHandle
* column_family
,
270 const size_t num_keys
, const Slice
* keys
,
271 PinnableSlice
* values
, Status
* statuses
,
272 const bool sorted_input
) {
273 write_batch_
.MultiGetFromBatchAndDB(db_
, read_options
, column_family
,
274 num_keys
, keys
, values
, statuses
,
278 std::vector
<Status
> TransactionBaseImpl::MultiGetForUpdate(
279 const ReadOptions
& read_options
,
280 const std::vector
<ColumnFamilyHandle
*>& column_family
,
281 const std::vector
<Slice
>& keys
, std::vector
<std::string
>* values
) {
282 // Regardless of whether the MultiGet succeeded, track these keys.
283 size_t num_keys
= keys
.size();
284 values
->resize(num_keys
);
287 for (size_t i
= 0; i
< num_keys
; ++i
) {
288 Status s
= TryLock(column_family
[i
], keys
[i
], true /* read_only */,
289 true /* exclusive */);
291 // Fail entire multiget if we cannot lock all keys
292 return std::vector
<Status
>(num_keys
, s
);
296 // TODO(agiardullo): optimize multiget?
297 std::vector
<Status
> stat_list(num_keys
);
298 for (size_t i
= 0; i
< num_keys
; ++i
) {
299 stat_list
[i
] = Get(read_options
, column_family
[i
], keys
[i
], &(*values
)[i
]);
305 Iterator
* TransactionBaseImpl::GetIterator(const ReadOptions
& read_options
) {
306 Iterator
* db_iter
= db_
->NewIterator(read_options
);
309 return write_batch_
.NewIteratorWithBase(db_iter
);
312 Iterator
* TransactionBaseImpl::GetIterator(const ReadOptions
& read_options
,
313 ColumnFamilyHandle
* column_family
) {
314 Iterator
* db_iter
= db_
->NewIterator(read_options
, column_family
);
317 return write_batch_
.NewIteratorWithBase(column_family
, db_iter
,
321 Status
TransactionBaseImpl::Put(ColumnFamilyHandle
* column_family
,
322 const Slice
& key
, const Slice
& value
,
323 const bool assume_tracked
) {
324 const bool do_validate
= !assume_tracked
;
325 Status s
= TryLock(column_family
, key
, false /* read_only */,
326 true /* exclusive */, do_validate
, assume_tracked
);
329 s
= GetBatchForWrite()->Put(column_family
, key
, value
);
338 Status
TransactionBaseImpl::Put(ColumnFamilyHandle
* column_family
,
339 const SliceParts
& key
, const SliceParts
& value
,
340 const bool assume_tracked
) {
341 const bool do_validate
= !assume_tracked
;
342 Status s
= TryLock(column_family
, key
, false /* read_only */,
343 true /* exclusive */, do_validate
, assume_tracked
);
346 s
= GetBatchForWrite()->Put(column_family
, key
, value
);
355 Status
TransactionBaseImpl::Merge(ColumnFamilyHandle
* column_family
,
356 const Slice
& key
, const Slice
& value
,
357 const bool assume_tracked
) {
358 const bool do_validate
= !assume_tracked
;
359 Status s
= TryLock(column_family
, key
, false /* read_only */,
360 true /* exclusive */, do_validate
, assume_tracked
);
363 s
= GetBatchForWrite()->Merge(column_family
, key
, value
);
372 Status
TransactionBaseImpl::Delete(ColumnFamilyHandle
* column_family
,
374 const bool assume_tracked
) {
375 const bool do_validate
= !assume_tracked
;
376 Status s
= TryLock(column_family
, key
, false /* read_only */,
377 true /* exclusive */, do_validate
, assume_tracked
);
380 s
= GetBatchForWrite()->Delete(column_family
, key
);
389 Status
TransactionBaseImpl::Delete(ColumnFamilyHandle
* column_family
,
390 const SliceParts
& key
,
391 const bool assume_tracked
) {
392 const bool do_validate
= !assume_tracked
;
393 Status s
= TryLock(column_family
, key
, false /* read_only */,
394 true /* exclusive */, do_validate
, assume_tracked
);
397 s
= GetBatchForWrite()->Delete(column_family
, key
);
406 Status
TransactionBaseImpl::SingleDelete(ColumnFamilyHandle
* column_family
,
408 const bool assume_tracked
) {
409 const bool do_validate
= !assume_tracked
;
410 Status s
= TryLock(column_family
, key
, false /* read_only */,
411 true /* exclusive */, do_validate
, assume_tracked
);
414 s
= GetBatchForWrite()->SingleDelete(column_family
, key
);
423 Status
TransactionBaseImpl::SingleDelete(ColumnFamilyHandle
* column_family
,
424 const SliceParts
& key
,
425 const bool assume_tracked
) {
426 const bool do_validate
= !assume_tracked
;
427 Status s
= TryLock(column_family
, key
, false /* read_only */,
428 true /* exclusive */, do_validate
, assume_tracked
);
431 s
= GetBatchForWrite()->SingleDelete(column_family
, key
);
440 Status
TransactionBaseImpl::PutUntracked(ColumnFamilyHandle
* column_family
,
441 const Slice
& key
, const Slice
& value
) {
442 Status s
= TryLock(column_family
, key
, false /* read_only */,
443 true /* exclusive */, false /* do_validate */);
446 s
= GetBatchForWrite()->Put(column_family
, key
, value
);
455 Status
TransactionBaseImpl::PutUntracked(ColumnFamilyHandle
* column_family
,
456 const SliceParts
& key
,
457 const SliceParts
& value
) {
458 Status s
= TryLock(column_family
, key
, false /* read_only */,
459 true /* exclusive */, false /* do_validate */);
462 s
= GetBatchForWrite()->Put(column_family
, key
, value
);
471 Status
TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle
* column_family
,
473 const Slice
& value
) {
474 Status s
= TryLock(column_family
, key
, false /* read_only */,
475 true /* exclusive */, false /* do_validate */);
478 s
= GetBatchForWrite()->Merge(column_family
, key
, value
);
487 Status
TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle
* column_family
,
489 Status s
= TryLock(column_family
, key
, false /* read_only */,
490 true /* exclusive */, false /* do_validate */);
493 s
= GetBatchForWrite()->Delete(column_family
, key
);
502 Status
TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle
* column_family
,
503 const SliceParts
& key
) {
504 Status s
= TryLock(column_family
, key
, false /* read_only */,
505 true /* exclusive */, false /* do_validate */);
508 s
= GetBatchForWrite()->Delete(column_family
, key
);
517 Status
TransactionBaseImpl::SingleDeleteUntracked(
518 ColumnFamilyHandle
* column_family
, const Slice
& key
) {
519 Status s
= TryLock(column_family
, key
, false /* read_only */,
520 true /* exclusive */, false /* do_validate */);
523 s
= GetBatchForWrite()->SingleDelete(column_family
, key
);
532 void TransactionBaseImpl::PutLogData(const Slice
& blob
) {
533 auto s
= write_batch_
.PutLogData(blob
);
// Returns a pointer to this transaction's write_batch_ member. The pointer
// refers to the member itself, not a copy.
538 WriteBatchWithIndex
* TransactionBaseImpl::GetWriteBatch() {
539 return &write_batch_
;
// Returns the difference between the environment's current NowMicros()
// reading and start_time_, divided by 1000 (i.e. milliseconds, assuming
// NowMicros() reports microseconds).
542 uint64_t TransactionBaseImpl::GetElapsedTime() const {
543 return (db_
->GetEnv()->NowMicros() - start_time_
) / 1000;
// Returns the num_puts_ counter.
// NOTE(review): presumably the number of Put operations issued so far; the
// code that increments it is not visible in this extract.
546 uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_
; }
// Returns the num_deletes_ counter.
// NOTE(review): presumably the number of Delete operations issued so far; the
// code that increments it is not visible in this extract.
548 uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_
; }
// Returns the num_merges_ counter.
// NOTE(review): presumably the number of Merge operations issued so far; the
// code that increments it is not visible in this extract.
550 uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_
; }
// Returns the point-lock count reported by the transaction's lock tracker
// (tracked_locks_->GetNumPointLocks()).
552 uint64_t TransactionBaseImpl::GetNumKeys() const {
553 return tracked_locks_
->GetNumPointLocks();
556 void TransactionBaseImpl::TrackKey(uint32_t cfh_id
, const std::string
& key
,
557 SequenceNumber seq
, bool read_only
,
560 r
.column_family_id
= cfh_id
;
563 r
.read_only
= read_only
;
564 r
.exclusive
= exclusive
;
566 // Update map of all tracked keys for this transaction
567 tracked_locks_
->Track(r
);
569 if (save_points_
!= nullptr && !save_points_
->empty()) {
570 // Update map of tracked keys in this SavePoint
571 save_points_
->top().new_locks_
->Track(r
);
// Selects the batch object that write operations should append to, based on
// indexing_enabled_: the indexed write_batch_ itself when true, otherwise the
// batch returned by write_batch_.GetWriteBatch().
// NOTE(review): upstream line 583 (presumably "} else {") and the closing
// braces are absent from this extract.
575 // Gets the write batch that should be used for Put/Merge/Deletes.
577 // Returns either a WriteBatch or WriteBatchWithIndex depending on whether
578 // DisableIndexing() has been called.
579 WriteBatchBase
* TransactionBaseImpl::GetBatchForWrite() {
580 if (indexing_enabled_
) {
581 // Use WriteBatchWithIndex
582 return &write_batch_
;
584 // Don't use WriteBatchWithIndex. Return base WriteBatch.
585 return write_batch_
.GetWriteBatch();
// Hands `snapshot` back to `db`. This is the function bound as the deleter
// for snapshot_ in SetSnapshotInternal().
//
// For a non-null `snapshot`, logs its sequence number through
// ROCKS_LOG_DETAILS and then calls db->ReleaseSnapshot(snapshot). The visible
// code does nothing for a null `snapshot`.
// NOTE(review): the closing braces (presumably upstream lines 595-596) are
// absent from this extract.
589 void TransactionBaseImpl::ReleaseSnapshot(const Snapshot
* snapshot
, DB
* db
) {
590 if (snapshot
!= nullptr) {
591 ROCKS_LOG_DETAILS(dbimpl_
->immutable_db_options().info_log
,
592 "ReleaseSnapshot %" PRIu64
" Set",
593 snapshot
->GetSequenceNumber());
594 db
->ReleaseSnapshot(snapshot
);
598 void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle
* column_family
,
601 r
.column_family_id
= GetColumnFamilyID(column_family
);
602 r
.key
= key
.ToString();
605 bool can_untrack
= false;
606 if (save_points_
!= nullptr && !save_points_
->empty()) {
607 // If there is no GetForUpdate of the key in this save point,
608 // then cannot untrack from the global lock tracker.
609 UntrackStatus s
= save_points_
->top().new_locks_
->Untrack(r
);
610 can_untrack
= (s
!= UntrackStatus::NOT_TRACKED
);
612 // No save point, so can untrack from the global lock tracker.
617 // If erased from the global tracker, then can unlock the key.
618 UntrackStatus s
= tracked_locks_
->Untrack(r
);
619 bool can_unlock
= (s
== UntrackStatus::REMOVED
);
621 UnlockGetForUpdate(column_family
, key
);
626 Status
TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch
* src_batch
) {
627 struct IndexedWriteBatchBuilder
: public WriteBatch::Handler
{
630 IndexedWriteBatchBuilder(Transaction
* txn
, DBImpl
* db
)
631 : txn_(txn
), db_(db
) {
632 assert(dynamic_cast<TransactionBaseImpl
*>(txn_
) != nullptr);
635 Status
PutCF(uint32_t cf
, const Slice
& key
, const Slice
& val
) override
{
636 return txn_
->Put(db_
->GetColumnFamilyHandle(cf
), key
, val
);
639 Status
DeleteCF(uint32_t cf
, const Slice
& key
) override
{
640 return txn_
->Delete(db_
->GetColumnFamilyHandle(cf
), key
);
643 Status
SingleDeleteCF(uint32_t cf
, const Slice
& key
) override
{
644 return txn_
->SingleDelete(db_
->GetColumnFamilyHandle(cf
), key
);
647 Status
MergeCF(uint32_t cf
, const Slice
& key
, const Slice
& val
) override
{
648 return txn_
->Merge(db_
->GetColumnFamilyHandle(cf
), key
, val
);
651 // this is used for reconstructing prepared transactions upon
652 // recovery. there should not be any meta markers in the batches
653 // we are processing.
654 Status
MarkBeginPrepare(bool) override
{ return Status::InvalidArgument(); }
656 Status
MarkEndPrepare(const Slice
&) override
{
657 return Status::InvalidArgument();
660 Status
MarkCommit(const Slice
&) override
{
661 return Status::InvalidArgument();
664 Status
MarkRollback(const Slice
&) override
{
665 return Status::InvalidArgument();
669 IndexedWriteBatchBuilder
copycat(this, dbimpl_
);
670 return src_batch
->Iterate(&copycat
);
// Returns a pointer to the commit_time_batch_ member. The pointer refers to
// the member itself, not a copy.
673 WriteBatch
* TransactionBaseImpl::GetCommitTimeWriteBatch() {
674 return &commit_time_batch_
;
676 } // namespace ROCKSDB_NAMESPACE
678 #endif // ROCKSDB_LITE