]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/transaction_base.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / transaction_base.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/transaction_base.h"
9
10 #include <cinttypes>
11
12 #include "db/column_family.h"
13 #include "db/db_impl/db_impl.h"
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/status.h"
17 #include "util/cast_util.h"
18 #include "util/string_util.h"
19 #include "utilities/transactions/lock/lock_tracker.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
23 TransactionBaseImpl::TransactionBaseImpl(
24 DB* db, const WriteOptions& write_options,
25 const LockTrackerFactory& lock_tracker_factory)
26 : db_(db),
27 dbimpl_(static_cast_with_check<DBImpl>(db)),
28 write_options_(write_options),
29 cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
30 lock_tracker_factory_(lock_tracker_factory),
31 start_time_(db_->GetEnv()->NowMicros()),
32 write_batch_(cmp_, 0, true, 0),
33 tracked_locks_(lock_tracker_factory_.Create()),
34 indexing_enabled_(true) {
35 assert(dynamic_cast<DBImpl*>(db_) != nullptr);
36 log_number_ = 0;
37 if (dbimpl_->allow_2pc()) {
38 InitWriteBatch();
39 }
40 }
41
42 TransactionBaseImpl::~TransactionBaseImpl() {
43 // Release snapshot if snapshot is set
44 SetSnapshotInternal(nullptr);
45 }
46
47 void TransactionBaseImpl::Clear() {
48 save_points_.reset(nullptr);
49 write_batch_.Clear();
50 commit_time_batch_.Clear();
51 tracked_locks_->Clear();
52 num_puts_ = 0;
53 num_deletes_ = 0;
54 num_merges_ = 0;
55
56 if (dbimpl_->allow_2pc()) {
57 InitWriteBatch();
58 }
59 }
60
61 void TransactionBaseImpl::Reinitialize(DB* db,
62 const WriteOptions& write_options) {
63 Clear();
64 ClearSnapshot();
65 id_ = 0;
66 db_ = db;
67 name_.clear();
68 log_number_ = 0;
69 write_options_ = write_options;
70 start_time_ = db_->GetEnv()->NowMicros();
71 indexing_enabled_ = true;
72 cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
73 }
74
75 void TransactionBaseImpl::SetSnapshot() {
76 const Snapshot* snapshot = dbimpl_->GetSnapshotForWriteConflictBoundary();
77 SetSnapshotInternal(snapshot);
78 }
79
80 void TransactionBaseImpl::SetSnapshotInternal(const Snapshot* snapshot) {
81 // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to
82 // be released, not deleted when it is no longer referenced.
83 snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot,
84 this, std::placeholders::_1, db_));
85 snapshot_needed_ = false;
86 snapshot_notifier_ = nullptr;
87 }
88
89 void TransactionBaseImpl::SetSnapshotOnNextOperation(
90 std::shared_ptr<TransactionNotifier> notifier) {
91 snapshot_needed_ = true;
92 snapshot_notifier_ = notifier;
93 }
94
95 void TransactionBaseImpl::SetSnapshotIfNeeded() {
96 if (snapshot_needed_) {
97 std::shared_ptr<TransactionNotifier> notifier = snapshot_notifier_;
98 SetSnapshot();
99 if (notifier != nullptr) {
100 notifier->SnapshotCreated(GetSnapshot());
101 }
102 }
103 }
104
105 Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
106 const SliceParts& key, bool read_only,
107 bool exclusive, const bool do_validate,
108 const bool assume_tracked) {
109 size_t key_size = 0;
110 for (int i = 0; i < key.num_parts; ++i) {
111 key_size += key.parts[i].size();
112 }
113
114 std::string str;
115 str.reserve(key_size);
116
117 for (int i = 0; i < key.num_parts; ++i) {
118 str.append(key.parts[i].data(), key.parts[i].size());
119 }
120
121 return TryLock(column_family, str, read_only, exclusive, do_validate,
122 assume_tracked);
123 }
124
125 void TransactionBaseImpl::SetSavePoint() {
126 if (save_points_ == nullptr) {
127 save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>());
128 }
129 save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
130 num_puts_, num_deletes_, num_merges_,
131 lock_tracker_factory_);
132 write_batch_.SetSavePoint();
133 }
134
135 Status TransactionBaseImpl::RollbackToSavePoint() {
136 if (save_points_ != nullptr && save_points_->size() > 0) {
137 // Restore saved SavePoint
138 TransactionBaseImpl::SavePoint& save_point = save_points_->top();
139 snapshot_ = save_point.snapshot_;
140 snapshot_needed_ = save_point.snapshot_needed_;
141 snapshot_notifier_ = save_point.snapshot_notifier_;
142 num_puts_ = save_point.num_puts_;
143 num_deletes_ = save_point.num_deletes_;
144 num_merges_ = save_point.num_merges_;
145
146 // Rollback batch
147 Status s = write_batch_.RollbackToSavePoint();
148 assert(s.ok());
149
150 // Rollback any keys that were tracked since the last savepoint
151 tracked_locks_->Subtract(*save_point.new_locks_);
152
153 save_points_->pop();
154
155 return s;
156 } else {
157 assert(write_batch_.RollbackToSavePoint().IsNotFound());
158 return Status::NotFound();
159 }
160 }
161
162 Status TransactionBaseImpl::PopSavePoint() {
163 if (save_points_ == nullptr ||
164 save_points_->empty()) {
165 // No SavePoint yet.
166 assert(write_batch_.PopSavePoint().IsNotFound());
167 return Status::NotFound();
168 }
169
170 assert(!save_points_->empty());
171 // If there is another savepoint A below the current savepoint B, then A needs
172 // to inherit tracked_keys in B so that if we rollback to savepoint A, we
173 // remember to unlock keys in B. If there is no other savepoint below, then we
174 // can safely discard savepoint info.
175 if (save_points_->size() == 1) {
176 save_points_->pop();
177 } else {
178 TransactionBaseImpl::SavePoint top(lock_tracker_factory_);
179 std::swap(top, save_points_->top());
180 save_points_->pop();
181
182 save_points_->top().new_locks_->Merge(*top.new_locks_);
183 }
184
185 return write_batch_.PopSavePoint();
186 }
187
188 Status TransactionBaseImpl::Get(const ReadOptions& read_options,
189 ColumnFamilyHandle* column_family,
190 const Slice& key, std::string* value) {
191 assert(value != nullptr);
192 PinnableSlice pinnable_val(value);
193 assert(!pinnable_val.IsPinned());
194 auto s = Get(read_options, column_family, key, &pinnable_val);
195 if (s.ok() && pinnable_val.IsPinned()) {
196 value->assign(pinnable_val.data(), pinnable_val.size());
197 } // else value is already assigned
198 return s;
199 }
200
201 Status TransactionBaseImpl::Get(const ReadOptions& read_options,
202 ColumnFamilyHandle* column_family,
203 const Slice& key, PinnableSlice* pinnable_val) {
204 return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
205 pinnable_val);
206 }
207
208 Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
209 ColumnFamilyHandle* column_family,
210 const Slice& key, std::string* value,
211 bool exclusive,
212 const bool do_validate) {
213 if (!do_validate && read_options.snapshot != nullptr) {
214 return Status::InvalidArgument(
215 "If do_validate is false then GetForUpdate with snapshot is not "
216 "defined.");
217 }
218 Status s =
219 TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
220
221 if (s.ok() && value != nullptr) {
222 assert(value != nullptr);
223 PinnableSlice pinnable_val(value);
224 assert(!pinnable_val.IsPinned());
225 s = Get(read_options, column_family, key, &pinnable_val);
226 if (s.ok() && pinnable_val.IsPinned()) {
227 value->assign(pinnable_val.data(), pinnable_val.size());
228 } // else value is already assigned
229 }
230 return s;
231 }
232
233 Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
234 ColumnFamilyHandle* column_family,
235 const Slice& key,
236 PinnableSlice* pinnable_val,
237 bool exclusive,
238 const bool do_validate) {
239 if (!do_validate && read_options.snapshot != nullptr) {
240 return Status::InvalidArgument(
241 "If do_validate is false then GetForUpdate with snapshot is not "
242 "defined.");
243 }
244 Status s =
245 TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
246
247 if (s.ok() && pinnable_val != nullptr) {
248 s = Get(read_options, column_family, key, pinnable_val);
249 }
250 return s;
251 }
252
253 std::vector<Status> TransactionBaseImpl::MultiGet(
254 const ReadOptions& read_options,
255 const std::vector<ColumnFamilyHandle*>& column_family,
256 const std::vector<Slice>& keys, std::vector<std::string>* values) {
257 size_t num_keys = keys.size();
258 values->resize(num_keys);
259
260 std::vector<Status> stat_list(num_keys);
261 for (size_t i = 0; i < num_keys; ++i) {
262 stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
263 }
264
265 return stat_list;
266 }
267
268 void TransactionBaseImpl::MultiGet(const ReadOptions& read_options,
269 ColumnFamilyHandle* column_family,
270 const size_t num_keys, const Slice* keys,
271 PinnableSlice* values, Status* statuses,
272 const bool sorted_input) {
273 write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
274 num_keys, keys, values, statuses,
275 sorted_input);
276 }
277
278 std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
279 const ReadOptions& read_options,
280 const std::vector<ColumnFamilyHandle*>& column_family,
281 const std::vector<Slice>& keys, std::vector<std::string>* values) {
282 // Regardless of whether the MultiGet succeeded, track these keys.
283 size_t num_keys = keys.size();
284 values->resize(num_keys);
285
286 // Lock all keys
287 for (size_t i = 0; i < num_keys; ++i) {
288 Status s = TryLock(column_family[i], keys[i], true /* read_only */,
289 true /* exclusive */);
290 if (!s.ok()) {
291 // Fail entire multiget if we cannot lock all keys
292 return std::vector<Status>(num_keys, s);
293 }
294 }
295
296 // TODO(agiardullo): optimize multiget?
297 std::vector<Status> stat_list(num_keys);
298 for (size_t i = 0; i < num_keys; ++i) {
299 stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
300 }
301
302 return stat_list;
303 }
304
305 Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
306 Iterator* db_iter = db_->NewIterator(read_options);
307 assert(db_iter);
308
309 return write_batch_.NewIteratorWithBase(db_iter);
310 }
311
312 Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
313 ColumnFamilyHandle* column_family) {
314 Iterator* db_iter = db_->NewIterator(read_options, column_family);
315 assert(db_iter);
316
317 return write_batch_.NewIteratorWithBase(column_family, db_iter,
318 &read_options);
319 }
320
321 Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
322 const Slice& key, const Slice& value,
323 const bool assume_tracked) {
324 const bool do_validate = !assume_tracked;
325 Status s = TryLock(column_family, key, false /* read_only */,
326 true /* exclusive */, do_validate, assume_tracked);
327
328 if (s.ok()) {
329 s = GetBatchForWrite()->Put(column_family, key, value);
330 if (s.ok()) {
331 num_puts_++;
332 }
333 }
334
335 return s;
336 }
337
338 Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
339 const SliceParts& key, const SliceParts& value,
340 const bool assume_tracked) {
341 const bool do_validate = !assume_tracked;
342 Status s = TryLock(column_family, key, false /* read_only */,
343 true /* exclusive */, do_validate, assume_tracked);
344
345 if (s.ok()) {
346 s = GetBatchForWrite()->Put(column_family, key, value);
347 if (s.ok()) {
348 num_puts_++;
349 }
350 }
351
352 return s;
353 }
354
355 Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
356 const Slice& key, const Slice& value,
357 const bool assume_tracked) {
358 const bool do_validate = !assume_tracked;
359 Status s = TryLock(column_family, key, false /* read_only */,
360 true /* exclusive */, do_validate, assume_tracked);
361
362 if (s.ok()) {
363 s = GetBatchForWrite()->Merge(column_family, key, value);
364 if (s.ok()) {
365 num_merges_++;
366 }
367 }
368
369 return s;
370 }
371
372 Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
373 const Slice& key,
374 const bool assume_tracked) {
375 const bool do_validate = !assume_tracked;
376 Status s = TryLock(column_family, key, false /* read_only */,
377 true /* exclusive */, do_validate, assume_tracked);
378
379 if (s.ok()) {
380 s = GetBatchForWrite()->Delete(column_family, key);
381 if (s.ok()) {
382 num_deletes_++;
383 }
384 }
385
386 return s;
387 }
388
389 Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
390 const SliceParts& key,
391 const bool assume_tracked) {
392 const bool do_validate = !assume_tracked;
393 Status s = TryLock(column_family, key, false /* read_only */,
394 true /* exclusive */, do_validate, assume_tracked);
395
396 if (s.ok()) {
397 s = GetBatchForWrite()->Delete(column_family, key);
398 if (s.ok()) {
399 num_deletes_++;
400 }
401 }
402
403 return s;
404 }
405
406 Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
407 const Slice& key,
408 const bool assume_tracked) {
409 const bool do_validate = !assume_tracked;
410 Status s = TryLock(column_family, key, false /* read_only */,
411 true /* exclusive */, do_validate, assume_tracked);
412
413 if (s.ok()) {
414 s = GetBatchForWrite()->SingleDelete(column_family, key);
415 if (s.ok()) {
416 num_deletes_++;
417 }
418 }
419
420 return s;
421 }
422
423 Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
424 const SliceParts& key,
425 const bool assume_tracked) {
426 const bool do_validate = !assume_tracked;
427 Status s = TryLock(column_family, key, false /* read_only */,
428 true /* exclusive */, do_validate, assume_tracked);
429
430 if (s.ok()) {
431 s = GetBatchForWrite()->SingleDelete(column_family, key);
432 if (s.ok()) {
433 num_deletes_++;
434 }
435 }
436
437 return s;
438 }
439
440 Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
441 const Slice& key, const Slice& value) {
442 Status s = TryLock(column_family, key, false /* read_only */,
443 true /* exclusive */, false /* do_validate */);
444
445 if (s.ok()) {
446 s = GetBatchForWrite()->Put(column_family, key, value);
447 if (s.ok()) {
448 num_puts_++;
449 }
450 }
451
452 return s;
453 }
454
455 Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
456 const SliceParts& key,
457 const SliceParts& value) {
458 Status s = TryLock(column_family, key, false /* read_only */,
459 true /* exclusive */, false /* do_validate */);
460
461 if (s.ok()) {
462 s = GetBatchForWrite()->Put(column_family, key, value);
463 if (s.ok()) {
464 num_puts_++;
465 }
466 }
467
468 return s;
469 }
470
471 Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
472 const Slice& key,
473 const Slice& value) {
474 Status s = TryLock(column_family, key, false /* read_only */,
475 true /* exclusive */, false /* do_validate */);
476
477 if (s.ok()) {
478 s = GetBatchForWrite()->Merge(column_family, key, value);
479 if (s.ok()) {
480 num_merges_++;
481 }
482 }
483
484 return s;
485 }
486
487 Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
488 const Slice& key) {
489 Status s = TryLock(column_family, key, false /* read_only */,
490 true /* exclusive */, false /* do_validate */);
491
492 if (s.ok()) {
493 s = GetBatchForWrite()->Delete(column_family, key);
494 if (s.ok()) {
495 num_deletes_++;
496 }
497 }
498
499 return s;
500 }
501
502 Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
503 const SliceParts& key) {
504 Status s = TryLock(column_family, key, false /* read_only */,
505 true /* exclusive */, false /* do_validate */);
506
507 if (s.ok()) {
508 s = GetBatchForWrite()->Delete(column_family, key);
509 if (s.ok()) {
510 num_deletes_++;
511 }
512 }
513
514 return s;
515 }
516
517 Status TransactionBaseImpl::SingleDeleteUntracked(
518 ColumnFamilyHandle* column_family, const Slice& key) {
519 Status s = TryLock(column_family, key, false /* read_only */,
520 true /* exclusive */, false /* do_validate */);
521
522 if (s.ok()) {
523 s = GetBatchForWrite()->SingleDelete(column_family, key);
524 if (s.ok()) {
525 num_deletes_++;
526 }
527 }
528
529 return s;
530 }
531
532 void TransactionBaseImpl::PutLogData(const Slice& blob) {
533 auto s = write_batch_.PutLogData(blob);
534 (void)s;
535 assert(s.ok());
536 }
537
538 WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
539 return &write_batch_;
540 }
541
542 uint64_t TransactionBaseImpl::GetElapsedTime() const {
543 return (db_->GetEnv()->NowMicros() - start_time_) / 1000;
544 }
545
546 uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
547
548 uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
549
550 uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
551
552 uint64_t TransactionBaseImpl::GetNumKeys() const {
553 return tracked_locks_->GetNumPointLocks();
554 }
555
556 void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
557 SequenceNumber seq, bool read_only,
558 bool exclusive) {
559 PointLockRequest r;
560 r.column_family_id = cfh_id;
561 r.key = key;
562 r.seq = seq;
563 r.read_only = read_only;
564 r.exclusive = exclusive;
565
566 // Update map of all tracked keys for this transaction
567 tracked_locks_->Track(r);
568
569 if (save_points_ != nullptr && !save_points_->empty()) {
570 // Update map of tracked keys in this SavePoint
571 save_points_->top().new_locks_->Track(r);
572 }
573 }
574
575 // Gets the write batch that should be used for Put/Merge/Deletes.
576 //
577 // Returns either a WriteBatch or WriteBatchWithIndex depending on whether
578 // DisableIndexing() has been called.
579 WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
580 if (indexing_enabled_) {
581 // Use WriteBatchWithIndex
582 return &write_batch_;
583 } else {
584 // Don't use WriteBatchWithIndex. Return base WriteBatch.
585 return write_batch_.GetWriteBatch();
586 }
587 }
588
589 void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
590 if (snapshot != nullptr) {
591 ROCKS_LOG_DETAILS(dbimpl_->immutable_db_options().info_log,
592 "ReleaseSnapshot %" PRIu64 " Set",
593 snapshot->GetSequenceNumber());
594 db->ReleaseSnapshot(snapshot);
595 }
596 }
597
598 void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
599 const Slice& key) {
600 PointLockRequest r;
601 r.column_family_id = GetColumnFamilyID(column_family);
602 r.key = key.ToString();
603 r.read_only = true;
604
605 bool can_untrack = false;
606 if (save_points_ != nullptr && !save_points_->empty()) {
607 // If there is no GetForUpdate of the key in this save point,
608 // then cannot untrack from the global lock tracker.
609 UntrackStatus s = save_points_->top().new_locks_->Untrack(r);
610 can_untrack = (s != UntrackStatus::NOT_TRACKED);
611 } else {
612 // No save point, so can untrack from the global lock tracker.
613 can_untrack = true;
614 }
615
616 if (can_untrack) {
617 // If erased from the global tracker, then can unlock the key.
618 UntrackStatus s = tracked_locks_->Untrack(r);
619 bool can_unlock = (s == UntrackStatus::REMOVED);
620 if (can_unlock) {
621 UnlockGetForUpdate(column_family, key);
622 }
623 }
624 }
625
626 Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
627 struct IndexedWriteBatchBuilder : public WriteBatch::Handler {
628 Transaction* txn_;
629 DBImpl* db_;
630 IndexedWriteBatchBuilder(Transaction* txn, DBImpl* db)
631 : txn_(txn), db_(db) {
632 assert(dynamic_cast<TransactionBaseImpl*>(txn_) != nullptr);
633 }
634
635 Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
636 return txn_->Put(db_->GetColumnFamilyHandle(cf), key, val);
637 }
638
639 Status DeleteCF(uint32_t cf, const Slice& key) override {
640 return txn_->Delete(db_->GetColumnFamilyHandle(cf), key);
641 }
642
643 Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
644 return txn_->SingleDelete(db_->GetColumnFamilyHandle(cf), key);
645 }
646
647 Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
648 return txn_->Merge(db_->GetColumnFamilyHandle(cf), key, val);
649 }
650
651 // this is used for reconstructing prepared transactions upon
652 // recovery. there should not be any meta markers in the batches
653 // we are processing.
654 Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
655
656 Status MarkEndPrepare(const Slice&) override {
657 return Status::InvalidArgument();
658 }
659
660 Status MarkCommit(const Slice&) override {
661 return Status::InvalidArgument();
662 }
663
664 Status MarkRollback(const Slice&) override {
665 return Status::InvalidArgument();
666 }
667 };
668
669 IndexedWriteBatchBuilder copycat(this, dbimpl_);
670 return src_batch->Iterate(&copycat);
671 }
672
673 WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() {
674 return &commit_time_batch_;
675 }
676 } // namespace ROCKSDB_NAMESPACE
677
678 #endif // ROCKSDB_LITE