]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/os/seastore/transaction.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / os / seastore / transaction.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <iostream>
7
8 #include <boost/intrusive/list.hpp>
9
10 #include "crimson/common/log.h"
11 #include "crimson/os/seastore/logging.h"
12 #include "crimson/os/seastore/ordering_handle.h"
13 #include "crimson/os/seastore/seastore_types.h"
14 #include "crimson/os/seastore/cached_extent.h"
15 #include "crimson/os/seastore/root_block.h"
16
17 namespace crimson::os::seastore {
18
19 class SeaStore;
20 class Transaction;
21
22 /**
23 * Transaction
24 *
25 * Representation of in-progress mutation. Used exclusively through Cache methods.
26 */
class Transaction {
public:
  using Ref = std::unique_ptr<Transaction>;
  /// invoked from ~Transaction() just before the write_set is invalidated
  using on_destruct_func_t = std::function<void(Transaction&)>;

  /// result of get_extent(): where (if anywhere) the paddr was found
  enum class get_extent_ret {
    PRESENT,   ///< found in write_set or read_set, *out populated
    ABSENT,    ///< not seen by this transaction
    RETIRED    ///< retired by this transaction
  };
  /**
   * get_extent
   *
   * Looks up addr in this transaction's local view:
   * - RETIRED if addr is in retired_set;
   * - PRESENT (filling *out if out != nullptr) if addr is in
   *   write_set or read_set;
   * - ABSENT otherwise.
   */
  get_extent_ret get_extent(paddr_t addr, CachedExtentRef *out) {
    LOG_PREFIX(Transaction::get_extent);
    if (retired_set.count(addr)) {
      return get_extent_ret::RETIRED;
    } else if (auto iter = write_set.find_offset(addr);
	       iter != write_set.end()) {
      if (out)
	*out = CachedExtentRef(&*iter);
      SUBTRACET(seastore_tm, "Found offset {} in write_set: {}", *this, addr, *iter);
      return get_extent_ret::PRESENT;
    } else if (
      auto iter = read_set.find(addr);
      iter != read_set.end()) {
      // placeholder in read-set should be in the retired-set
      // at the same time.
      assert(iter->ref->get_type() != extent_types_t::RETIRED_PLACEHOLDER);
      if (out)
	*out = iter->ref;
      SUBTRACET(seastore_tm, "Found offset {} in read_set: {}", *this, addr, *(iter->ref));
      return get_extent_ret::PRESENT;
    } else {
      return get_extent_ret::ABSENT;
    }
  }

  /**
   * add_to_retired_set
   *
   * Marks ref as retired by this transaction.  A still-pending extent is
   * invalidated and dropped from write_set; a mutation-pending extent
   * additionally retires its prior (committed) instance, which must
   * already be in the read_set.  Not legal on weak transactions.
   */
  void add_to_retired_set(CachedExtentRef ref) {
    ceph_assert(!is_weak());
    if (ref->is_initial_pending()) {
      ref->state = CachedExtent::extent_state_t::INVALID;
      write_set.erase(*ref);
    } else if (ref->is_mutation_pending()) {
      ref->state = CachedExtent::extent_state_t::INVALID;
      write_set.erase(*ref);
      assert(ref->prior_instance);
      retired_set.insert(ref->prior_instance);
      assert(read_set.count(ref->prior_instance->get_paddr()));
      ref->prior_instance.reset();
    } else {
      // && retired_set.count(ref->get_paddr()) == 0
      // If it's already in the set, insert here will be a noop,
      // which is what we want.
      retired_set.insert(ref);
    }
  }

  /// Records ref as read by this transaction.  Weak transactions skip
  /// read-set maintenance entirely.  Asserts the extent was not already
  /// in the read_set.
  void add_to_read_set(CachedExtentRef ref) {
    if (is_weak()) return;

    auto [iter, inserted] = read_set.emplace(this, ref);
    ceph_assert(inserted);
  }

  /**
   * add_fresh_extent
   *
   * Registers a newly allocated extent.  With delayed=true (logical
   * extents only) the extent gets a temporary delayed paddr and goes to
   * delayed_alloc_list; otherwise it gets a record-relative paddr and
   * goes to inline_block_list.  Either way it is counted in
   * fresh_block_stats and inserted into write_set.
   */
  void add_fresh_extent(
    CachedExtentRef ref,
    bool delayed = false) {
    LOG_PREFIX(Transaction::add_fresh_extent);
    ceph_assert(!is_weak());
    if (delayed) {
      assert(ref->is_logical());
      ref->set_paddr(delayed_temp_paddr(delayed_temp_offset));
      delayed_temp_offset += ref->get_length();
      delayed_alloc_list.emplace_back(ref->cast<LogicalCachedExtent>());
    } else {
      ref->set_paddr(make_record_relative_paddr(offset));
      offset += ref->get_length();
      inline_block_list.push_back(ref);
    }
    ++fresh_block_stats.num;
    fresh_block_stats.bytes += ref->get_length();
    SUBTRACET(seastore_tm, "adding {} to write_set", *this, *ref);
    write_set.insert(*ref);
  }

  /// Switches a delayed-allocation extent to the inline path, assigning
  /// a record-relative paddr.  The extent is erased from and re-inserted
  /// into write_set around the paddr change (write_set is presumably
  /// indexed by paddr -- see find_offset above).
  void mark_delayed_extent_inline(LogicalCachedExtentRef& ref) {
    LOG_PREFIX(Transaction::mark_delayed_extent_inline);
    SUBTRACET(seastore_tm, "removing {} from write_set", *this, *ref);
    write_set.erase(*ref);
    ref->set_paddr(make_record_relative_paddr(offset));
    offset += ref->get_length();
    inline_block_list.push_back(ref);
    SUBTRACET(seastore_tm, "adding {} to write_set", *this, *ref);
    write_set.insert(*ref);
  }

  /// Switches a delayed-allocation extent to the out-of-line path at
  /// final_addr; same erase/re-insert dance around set_paddr as
  /// mark_delayed_extent_inline.
  void mark_delayed_extent_ool(LogicalCachedExtentRef& ref, paddr_t final_addr) {
    LOG_PREFIX(Transaction::mark_delayed_extent_ool);
    SUBTRACET(seastore_tm, "removing {} from write_set", *this, *ref);
    write_set.erase(*ref);
    ref->set_paddr(final_addr);
    assert(!ref->get_paddr().is_null());
    assert(!ref->is_inline());
    ool_block_list.push_back(ref);
    SUBTRACET(seastore_tm, "adding {} to write_set", *this, *ref);
    write_set.insert(*ref);
  }

  /// Registers a mutated copy of an extent; its prior instance must
  /// already be in the read_set.  Not legal on weak transactions.
  void add_mutated_extent(CachedExtentRef ref) {
    LOG_PREFIX(Transaction::add_mutated_extent);
    ceph_assert(!is_weak());
    assert(read_set.count(ref->prior_instance->get_paddr()));
    mutated_block_list.push_back(ref);
    SUBTRACET(seastore_tm, "adding {} to write_set", *this, *ref);
    write_set.insert(*ref);
  }

  /**
   * replace_placeholder
   *
   * Substitutes extent for a RETIRED_PLACEHOLDER at the same paddr in
   * both read_set and retired_set (a placeholder is required to be
   * present in both -- see the assert in get_extent).
   */
  void replace_placeholder(CachedExtent& placeholder, CachedExtent& extent) {
    ceph_assert(!is_weak());

    assert(placeholder.get_type() == extent_types_t::RETIRED_PLACEHOLDER);
    assert(extent.get_type() != extent_types_t::RETIRED_PLACEHOLDER);
    assert(extent.get_type() != extent_types_t::ROOT);
    assert(extent.get_paddr() == placeholder.get_paddr());
    {
      auto where = read_set.find(placeholder.get_paddr());
      assert(where != read_set.end());
      assert(where->ref.get() == &placeholder);
      where = read_set.erase(where);
      read_set.emplace_hint(where, this, &extent);
    }
    {
      auto where = retired_set.find(&placeholder);
      assert(where != retired_set.end());
      assert(where->get() == &placeholder);
      where = retired_set.erase(where);
      retired_set.emplace_hint(where, &extent);
    }
  }

  /// Marks segment for release after this transaction completes; at most
  /// one segment may be marked per transaction cycle.
  void mark_segment_to_release(segment_id_t segment) {
    assert(to_release == NULL_SEG_ID);
    to_release = segment;
  }

  /// Returns the segment marked for release, or NULL_SEG_ID if none.
  segment_id_t get_segment_to_release() const {
    return to_release;
  }

  auto& get_delayed_alloc_list() {
    return delayed_alloc_list;
  }

  const auto &get_mutated_block_list() {
    return mutated_block_list;
  }

  const auto &get_retired_set() {
    return retired_set;
  }

  /// Applies f to every fresh block: ool blocks first, then inline.
  template <typename F>
  auto for_each_fresh_block(F &&f) const {
    std::for_each(ool_block_list.begin(), ool_block_list.end(), f);
    std::for_each(inline_block_list.begin(), inline_block_list.end(), f);
  }

  /// simple count/bytes accumulator for I/O statistics
  struct io_stat_t {
    uint64_t num = 0;
    uint64_t bytes = 0;

    bool is_clear() const {
      return (num == 0 && bytes == 0);
    }
  };
  const io_stat_t& get_fresh_block_stats() const {
    return fresh_block_stats;
  }

  /// Total bytes of all fresh (inline + ool) blocks.
  size_t get_allocation_size() const {
    size_t ret = 0;
    for_each_fresh_block([&ret](auto &e) { ret += e->get_length(); });
    return ret;
  }

  /// classification of what initiated this transaction
  enum class src_t : uint8_t {
    MUTATE = 0,
    READ, // including weak and non-weak read transactions
    CLEANER_TRIM,
    CLEANER_RECLAIM,
    MAX
  };
  static constexpr auto SRC_MAX = static_cast<std::size_t>(src_t::MAX);
  src_t get_src() const {
    return src;
  }

  bool is_weak() const {
    return weak;
  }

  /// test hook: force this transaction into the conflicted state
  void test_set_conflict() {
    conflicted = true;
  }

  bool is_conflicted() const {
    return conflicted;
  }

  auto &get_handle() {
    return handle;
  }

  Transaction(
    OrderingHandle &&handle,
    bool weak,
    src_t src,
    journal_seq_t initiated_after,  // NOTE(review): accepted but not
                                    // stored or used in this version --
                                    // confirm intended
    on_destruct_func_t&& f
  ) : weak(weak),
      handle(std::move(handle)),
      on_destruct(std::move(f)),
      src(src)
  {}

  /// Invalidates every extent in write_set and empties it (write_set
  /// holds no refcounts, so this drops no references).
  void invalidate_clear_write_set() {
    for (auto &&i: write_set) {
      i.state = CachedExtent::extent_state_t::INVALID;
    }
    write_set.clear();
  }

  ~Transaction() {
    on_destruct(*this);
    invalidate_clear_write_set();
  }

  friend class crimson::os::seastore::SeaStore;
  friend class TransactionConflictCondition;

  /**
   * reset_preserve_handle
   *
   * Returns *this to a freshly-constructed state (e.g. for retry after
   * a conflict) while keeping the OrderingHandle, and records that a
   * reset happened (see did_reset).  The initiated_after parameter is
   * currently unused, mirroring the constructor.
   *
   * NOTE(review): rbm_alloc_info_blocks is not cleared here -- confirm
   * whether that is intentional.
   */
  void reset_preserve_handle(journal_seq_t initiated_after) {
    root.reset();
    offset = 0;
    delayed_temp_offset = 0;
    read_set.clear();
    invalidate_clear_write_set();
    mutated_block_list.clear();
    fresh_block_stats = {};
    num_delayed_invalid_extents = 0;
    delayed_alloc_list.clear();
    inline_block_list.clear();
    ool_block_list.clear();
    retired_set.clear();
    onode_tree_stats = {};
    lba_tree_stats = {};
    ool_write_stats = {};
    to_release = NULL_SEG_ID;
    conflicted = false;
    if (!has_reset) {
      has_reset = true;
    }
  }

  /// true iff reset_preserve_handle has ever been called on *this
  bool did_reset() const {
    return has_reset;
  }

  /// per-tree statistics collected over the transaction's lifetime
  struct tree_stats_t {
    uint64_t depth = 0;
    uint64_t num_inserts = 0;
    uint64_t num_erases = 0;

    bool is_clear() const {
      return (depth == 0 &&
              num_inserts == 0 &&
              num_erases == 0);
    }
  };
  tree_stats_t& get_onode_tree_stats() {
    return onode_tree_stats;
  }
  tree_stats_t& get_lba_tree_stats() {
    return lba_tree_stats;
  }
  /// Appends a random-block-manager allocation delta for this transaction.
  void add_rbm_alloc_info_blocks(rbm_alloc_delta_t &d) {
    rbm_alloc_info_blocks.push_back(d);
  }
  void clear_rbm_alloc_info_blocks() {
    if (!rbm_alloc_info_blocks.empty()) {
      rbm_alloc_info_blocks.clear();
    }
  }
  const auto &get_rbm_alloc_info_blocks() {
    return rbm_alloc_info_blocks;
  }

  /// statistics for the out-of-line write path
  struct ool_write_stats_t {
    io_stat_t extents;
    uint64_t header_raw_bytes = 0;
    uint64_t header_bytes = 0;
    uint64_t data_bytes = 0;
    uint64_t num_records = 0;

    bool is_clear() const {
      return (extents.is_clear() &&
              header_raw_bytes == 0 &&
              header_bytes == 0 &&
              data_bytes == 0 &&
              num_records == 0);
    }
  };
  ool_write_stats_t& get_ool_write_stats() {
    return ool_write_stats;
  }

  void increment_delayed_invalid_extents() {
    ++num_delayed_invalid_extents;
  }

private:
  friend class Cache;
  friend Ref make_test_transaction();

  /**
   * If set, *this may not be used to perform writes and will not provide
   * consistency, allowing operations using it to avoid maintaining a
   * read_set.
   */
  const bool weak;

  RootBlockRef root;        ///< ref to root if read or written by transaction

  segment_off_t offset = 0; ///< relative offset of next block
  segment_off_t delayed_temp_offset = 0;  ///< next temp offset for delayed allocs

  /**
   * read_set
   *
   * Holds a reference (with a refcount) to every extent read via *this.
   * Submitting a transaction mutating any contained extent/addr will
   * invalidate *this.
   */
  read_set_t<Transaction> read_set; ///< set of extents read by paddr

  /**
   * write_set
   *
   * Contains a reference (without a refcount) to every extent mutated
   * as part of *this.  No contained extent may be referenced outside
   * of *this.  Every contained extent will be in one of inline_block_list,
   * ool_block_list, mutated_block_list, or delayed_alloc_list.
   */
  ExtentIndex write_set;

  /**
   * lists of fresh blocks, holds refcounts, subset of write_set
   */
  io_stat_t fresh_block_stats;
  uint64_t num_delayed_invalid_extents = 0;
  /// blocks that will be committed with journal record inline
  std::list<CachedExtentRef> inline_block_list;
  /// blocks that will be committed with out-of-line record
  std::list<CachedExtentRef> ool_block_list;
  /// blocks with delayed allocation, may become inline or ool above
  std::list<LogicalCachedExtentRef> delayed_alloc_list;

  /// list of mutated blocks, holds refcounts, subset of write_set
  std::list<CachedExtentRef> mutated_block_list;

  /**
   * retire_set
   *
   * Set of extents retired by *this.
   */
  pextent_set_t retired_set;

  /// stats to collect when commit or invalidate
  tree_stats_t onode_tree_stats;
  tree_stats_t lba_tree_stats;
  ool_write_stats_t ool_write_stats;

  /// if != NULL_SEG_ID, release this segment after completion
  segment_id_t to_release = NULL_SEG_ID;

  bool conflicted = false;  ///< set when a concurrent commit invalidates *this

  bool has_reset = false;   ///< see did_reset()

  OrderingHandle handle;

  on_destruct_func_t on_destruct;

  const src_t src;

  std::vector<rbm_alloc_delta_t> rbm_alloc_info_blocks;
};
using TransactionRef = Transaction::Ref; ///< owning handle to a Transaction
420
421 inline std::ostream& operator<<(std::ostream& os,
422 const Transaction::src_t& src) {
423 switch (src) {
424 case Transaction::src_t::MUTATE:
425 return os << "MUTATE";
426 case Transaction::src_t::READ:
427 return os << "READ";
428 case Transaction::src_t::CLEANER_TRIM:
429 return os << "CLEANER_TRIM";
430 case Transaction::src_t::CLEANER_RECLAIM:
431 return os << "CLEANER_RECLAIM";
432 default:
433 ceph_abort("impossible");
434 }
435 }
436
/// Should only be used with dummy staged-fltree node extent manager
inline TransactionRef make_test_transaction() {
  return std::make_unique<Transaction>(
    get_dummy_ordering_handle(),
    false,                       // non-weak: writes allowed
    Transaction::src_t::MUTATE,
    journal_seq_t{},             // initiated_after: unused by the ctor
    [](Transaction&) {}          // no-op on_destruct
  );
}
447
/**
 * TransactionConflictCondition
 *
 * Interruption condition for crimson's interruptible futures: once the
 * wrapped transaction is conflicted, continuations are interrupted with
 * a transaction_conflict exception.
 */
struct TransactionConflictCondition {
  class transaction_conflict final : public std::exception {
  public:
    const char* what() const noexcept final {
      return "transaction conflict detected";
    }
  };

public:
  TransactionConflictCondition(Transaction &t) : t(t) {}

  /// If t has conflicted, returns {true, exceptional Fut holding
  /// transaction_conflict}; otherwise {false, nullopt} so execution
  /// continues normally.
  template <typename Fut>
  std::pair<bool, std::optional<Fut>> may_interrupt() {
    if (t.conflicted) {
      return {
        true,
        seastar::futurize<Fut>::make_exception_future(
          transaction_conflict())};
    } else {
      return {false, std::optional<Fut>()};
    }
  }

  /// trait: is T the exception type this condition interrupts with?
  template <typename T>
  static constexpr bool is_interruption_v =
    std::is_same_v<T, transaction_conflict>;


  /// NOTE: __cxa_exception_type is a non-portable (Itanium ABI)
  /// extension used to inspect the exception type without rethrowing.
  static bool is_interruption(std::exception_ptr& eptr) {
    return *eptr.__cxa_exception_type() == typeid(transaction_conflict);
  }

private:
  Transaction &t;
};
483
/// interruptor specialized on transaction-conflict interruption
using trans_intr = crimson::interruptible::interruptor<
  TransactionConflictCondition
>;

/// errorator E made interruptible on transaction conflict
template <typename E>
using trans_iertr =
  crimson::interruptible::interruptible_errorator<
    TransactionConflictCondition,
    E
  >;
494
495 template <typename F, typename... Args>
496 auto with_trans_intr(Transaction &t, F &&f, Args&&... args) {
497 return trans_intr::with_interruption_to_error<crimson::ct_error::eagain>(
498 std::move(f),
499 TransactionConflictCondition(t),
500 t,
501 std::forward<Args>(args)...);
502 }
503
/// T's base errorator extended with eagain -- the error with_trans_intr
/// converts conflict interruptions into
template <typename T>
using with_trans_ertr = typename T::base_ertr::template extend<crimson::ct_error::eagain>;
506
507 }