1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/denc.h"
5 #include "include/intarith.h"
7 #include "crimson/os/seastore/logging.h"
8 #include "crimson/os/seastore/transaction_manager.h"
9 #include "crimson/os/seastore/segment_manager.h"
10 #include "crimson/os/seastore/journal.h"
12 SET_SUBSYS(seastore_tm
);
14 namespace crimson::os::seastore
{
// Builds a TransactionManager around its collaborators: keeps a reference to
// the SegmentManager, moves the segment cleaner, cache, LBA manager and
// journal handles into the corresponding members, then registers this object
// as the segment cleaner's extent callback and gives the journal a pointer
// to write_pipeline.
16 TransactionManager::TransactionManager(
17 SegmentManager
&_segment_manager
,
18 SegmentCleanerRef _segment_cleaner
,
21 LBAManagerRef _lba_manager
,
22 ExtentPlacementManagerRef
&& epm
,
23 ExtentReader
& scanner
)
24 : segment_manager(_segment_manager
),
25 segment_cleaner(std::move(_segment_cleaner
)),
26 cache(std::move(_cache
)),
27 lba_manager(std::move(_lba_manager
)),
28 journal(std::move(_journal
)),
// Post-construction wiring: the cleaner calls back into this object, and the
// journal is told which write pipeline to use.
32 segment_cleaner
->set_extent_callback(this);
33 journal
->set_write_pipeline(&write_pipeline
);
// Initialises a new store.
//
// Sequence visible here: mount the segment cleaner with this device's id and
// the scanner's segment managers, open the journal for writing, pass the
// returned address to segment_cleaner->init_mkfs(), then inside a MUTATE
// transaction run lba_manager->mkfs() followed by
// submit_transaction_direct().
//
// Errors: eagain is asserted to be impossible; errors in mkfs_ertr are
// passed further to the caller.
37 TransactionManager::mkfs_ertr::future
<> TransactionManager::mkfs()
39 LOG_PREFIX(TransactionManager::mkfs
);
// The cleaner must be mounted before the journal is opened below.
40 segment_cleaner
->mount(
41 segment_manager
.get_device_id(),
42 scanner
.get_segment_managers());
43 return journal
->open_for_write().safe_then([this, FNAME
](auto addr
) {
44 DEBUG("about to do_with");
45 segment_cleaner
->init_mkfs(addr
);
46 return with_transaction_intr(
47 Transaction::src_t::MUTATE
,
49 [this, FNAME
](auto& t
)
51 DEBUGT("about to cache->mkfs", t
);
54 ).si_then([this, &t
] {
55 return lba_manager
->mkfs(t
);
56 }).si_then([this, FNAME
, &t
] {
57 DEBUGT("about to submit_transaction", t
);
58 return submit_transaction_direct(t
);
// A conflict (eagain) is treated as a programming error in this path.
61 crimson::ct_error::eagain::handle([] {
62 ceph_assert(0 == "eagain impossible");
63 return mkfs_ertr::now();
65 mkfs_ertr::pass_further
{}
// Brings an existing store online.
//
// Steps visible in this function:
//  1. Mount the segment cleaner and call init_segments().
//  2. Replay the journal; for each replayed record, update the cleaner's
//     journal tail target (oldest dirty sequence in the cache, falling back
//     to the record's start_seq) and hand the delta to cache->replay_delta().
//  3. Open the journal for writing and record the returned address as the
//     journal head in the cleaner.
//  4. In a weak READ transaction named "mount", initialise cached extents
//     through lba_manager->init_cached_extent(), then scan mapped space and
//     mark every real address range as used in the cleaner.
//  5. Call segment_cleaner->complete_init().
//
// Errors: mount_ertr errors are passed further; any other error asserts with
// "unhandled error".
72 TransactionManager::mount_ertr::future
<> TransactionManager::mount()
74 LOG_PREFIX(TransactionManager::mount
);
76 segment_cleaner
->mount(
77 segment_manager
.get_device_id(),
78 scanner
.get_segment_managers());
79 return segment_cleaner
->init_segments().safe_then(
80 [this](auto&& segments
) {
// Replay callback: invoked with the record's offsets and a delta `e`.
81 return journal
->replay(
83 [this](const auto &offsets
, const auto &e
) {
84 auto start_seq
= offsets
.write_result
.start_seq
;
85 segment_cleaner
->update_journal_tail_target(
86 cache
->get_oldest_dirty_from().value_or(start_seq
));
87 return cache
->replay_delta(
89 offsets
.record_block_base
,
93 return journal
->open_for_write();
94 }).safe_then([this, FNAME
](auto addr
) {
95 segment_cleaner
->set_journal_head(addr
);
96 return seastar::do_with(
97 create_weak_transaction(
98 Transaction::src_t::READ
, "mount"),
99 [this, FNAME
](auto &tref
) {
100 return with_trans_intr(
102 [this, FNAME
](auto &t
) {
103 return cache
->init_cached_extents(t
, [this](auto &t
, auto &e
) {
104 return lba_manager
->init_cached_extent(t
, e
);
105 }).si_then([this, FNAME
, &t
] {
// Debug-build sanity check of the cleaner's space accounting before the scan.
106 assert(segment_cleaner
->debug_check_space(
107 *segment_cleaner
->get_empty_space_tracker()));
108 return lba_manager
->scan_mapped_space(
110 [this, FNAME
, &t
](paddr_t addr
, extent_len_t len
) {
112 "marking {}~{} used",
// Only addresses for which is_real() holds are accounted in the cleaner.
116 if (addr
.is_real()) {
117 segment_cleaner
->mark_space_used(
120 /* init_scan = */ true);
126 }).safe_then([this] {
127 segment_cleaner
->complete_init();
129 mount_ertr::pass_further
{},
130 crimson::ct_error::all_same_way([] {
131 ceph_assert(0 == "unhandled error");
132 return mount_ertr::now();
// Shuts the TransactionManager down. The visible chain stops the segment
// cleaner, closes the cache, dumps the cache contents, closes the journal,
// and finally resolves with seastar::now().
136 TransactionManager::close_ertr::future
<> TransactionManager::close() {
137 LOG_PREFIX(TransactionManager::close
);
139 return segment_cleaner
->stop(
141 return cache
->close();
142 }).safe_then([this] {
143 cache
->dump_contents();
144 return journal
->close();
145 }).safe_then([FNAME
] {
147 return seastar::now();
// Increments the reference count of the LBA mapping for `ref`'s logical
// address by calling lba_manager->incref_extent() within transaction `t`.
//
// Errors: ref_iertr errors are passed further; any other error currently
// trips an assertion (the message marks this as a TODO).
// NOTE(review): the value produced by the success continuation is not
// visible here -- presumably the new refcount, as in the laddr overload;
// confirm against the full source.
151 TransactionManager::ref_ret
TransactionManager::inc_ref(
153 LogicalCachedExtentRef
&ref
)
155 return lba_manager
->incref_extent(t
, ref
->get_laddr()).si_then([](auto r
) {
157 }).handle_error_interruptible(
158 ref_iertr::pass_further
{},
159 ct_error::all_same_way([](auto e
) {
160 ceph_assert(0 == "unhandled error, TODO");
// Increments the reference count of the LBA mapping at `offset` within
// transaction `t` and yields the refcount reported by
// lba_manager->incref_extent().
164 TransactionManager::ref_ret
TransactionManager::inc_ref(
168 return lba_manager
->incref_extent(t
, offset
).si_then([](auto result
) {
169 return result
.refcount
;
// Decrements the reference count of the LBA mapping for `ref`'s logical
// address. When the resulting refcount is zero, the extent is retired in the
// cache and the extents_retired_total / extents_retired_bytes counters are
// bumped (bytes by the extent's length).
173 TransactionManager::ref_ret
TransactionManager::dec_ref(
175 LogicalCachedExtentRef
&ref
)
177 LOG_PREFIX(TransactionManager::dec_ref
);
178 return lba_manager
->decref_extent(t
, ref
->get_laddr()
// `ref` is captured by value in the continuation below.
179 ).si_then([this, FNAME
, &t
, ref
](auto ret
) {
180 if (ret
.refcount
== 0) {
182 "extent {} refcount 0",
185 cache
->retire_extent(t
, ref
);
186 stats
.extents_retired_total
++;
187 stats
.extents_retired_bytes
+= ref
->get_length();
// Decrements the reference count of the LBA mapping at `offset`.
//
// If the refcount reaches zero and the physical address returned by the LBA
// manager is not zero, the address range (addr, length) is retired in the
// cache via retire_extent_addr() and the retired-extent counters are
// updated once that completes.
193 TransactionManager::ref_ret
TransactionManager::dec_ref(
197 LOG_PREFIX(TransactionManager::dec_ref
);
198 return lba_manager
->decref_extent(t
, offset
199 ).si_then([this, FNAME
, offset
, &t
](auto result
) -> ref_ret
{
// A zero addr is skipped: nothing is retired in the cache for it.
200 if (result
.refcount
== 0 && !result
.addr
.is_zero()) {
201 DEBUGT("offset {} refcount 0", t
, offset
);
202 return cache
->retire_extent_addr(
203 t
, result
.addr
, result
.length
204 ).si_then([result
, this] {
205 stats
.extents_retired_total
++;
206 stats
.extents_retired_bytes
+= result
.length
;
208 interruptible::ready_future_marker
{},
213 interruptible::ready_future_marker
{},
// Batch variant of dec_ref: calls dec_ref(t, laddr) for every address in
// `offsets` (iterated with trans_intr::do_for_each), appends each returned
// refcount to a vector, and resolves with that vector.
//
// `offsets` and the result vector are held by seastar::do_with so the
// references captured by the continuations stay valid while the chain runs.
219 TransactionManager::refs_ret
TransactionManager::dec_ref(
221 std::vector
<laddr_t
> offsets
)
223 return seastar::do_with(std::move(offsets
), std::vector
<unsigned>(),
224 [this, &t
] (auto &&offsets
, auto &refcnt
) {
225 return trans_intr::do_for_each(offsets
.begin(), offsets
.end(),
226 [this, &t
, &refcnt
] (auto &laddr
) {
227 return this->dec_ref(t
, laddr
).si_then([&refcnt
] (auto ref
) {
228 refcnt
.push_back(ref
);
229 return ref_iertr::now();
231 }).si_then([&refcnt
] {
232 return ref_iertr::make_ready_future
<std::vector
<unsigned>>(std::move(refcnt
));
// Submits transaction `t` after reserving space for it.
//
// Enters the write pipeline's reserve_projected_usage stage, asks the
// segment cleaner to reserve t.get_allocation_size() bytes, then calls
// submit_transaction_direct(). The reservation is released in a finally
// continuation attached after the submit step.
237 TransactionManager::submit_transaction_iertr::future
<>
238 TransactionManager::submit_transaction(
241 LOG_PREFIX(TransactionManager::submit_transaction
);
242 return trans_intr::make_interruptible(
243 t
.get_handle().enter(write_pipeline
.reserve_projected_usage
)
244 ).then_interruptible([this, FNAME
, &t
] {
// Captured by value below so the same amount is released as was reserved.
245 size_t projected_usage
= t
.get_allocation_size();
246 DEBUGT("waiting for projected_usage: {}", t
, projected_usage
);
247 return trans_intr::make_interruptible(
248 segment_cleaner
->reserve_projected_usage(projected_usage
)
249 ).then_interruptible([this, &t
] {
250 return submit_transaction_direct(t
);
251 }).finally([this, FNAME
, projected_usage
, &t
] {
252 DEBUGT("releasing projected_usage: {}", t
, projected_usage
);
253 segment_cleaner
->release_projected_usage(projected_usage
);
// Commits transaction `tref` without reserving projected usage first.
//
// Pipeline, as visible in this function:
//  1. Enter the ool_writes stage and let the extent placement manager run
//     delayed_alloc_or_ool_write(); input_output_error is passed further,
//     every other error asserts ("invalid error").
//  2. Enter the prepare stage, build the journal record with
//     cache->prepare_record(), and call maybe_release_collection_lock() on
//     the handle.
//  3. Submit the record to the journal.
//  4. On success: advance the cleaner's journal head to the record's end
//     sequence, complete the commit in the cache, notify the LBA manager,
//     and update the journal tail target.
//  5. If the transaction names a segment to release (!= NULL_SEG_ID),
//     release it through the segment manager and mark it released in the
//     cleaner.
//  6. Complete the pipeline handle; the handle is exited in a finally
//     continuation.
//
// Errors from the journal path: submit_transaction_iertr errors are passed
// further; anything else asserts ("Hit error submitting to journal").
258 TransactionManager::submit_transaction_direct_ret
259 TransactionManager::submit_transaction_direct(
262 LOG_PREFIX(TransactionManager::submit_transaction_direct
);
263 DEBUGT("about to alloc delayed extents", tref
);
265 return trans_intr::make_interruptible(
266 tref
.get_handle().enter(write_pipeline
.ool_writes
)
267 ).then_interruptible([this, &tref
] {
268 return epm
->delayed_alloc_or_ool_write(tref
269 ).handle_error_interruptible(
270 crimson::ct_error::input_output_error::pass_further(),
271 crimson::ct_error::assert_all("invalid error")
273 }).si_then([this, FNAME
, &tref
] {
274 DEBUGT("about to prepare", tref
);
275 return tref
.get_handle().enter(write_pipeline
.prepare
);
276 }).si_then([this, FNAME
, &tref
]() mutable
277 -> submit_transaction_iertr::future
<> {
278 auto record
= cache
->prepare_record(tref
);
280 tref
.get_handle().maybe_release_collection_lock();
282 DEBUGT("about to submit to journal", tref
);
284 return journal
->submit_record(std::move(record
), tref
.get_handle()
285 ).safe_then([this, FNAME
, &tref
](auto submit_result
) mutable {
// start_seq feeds the tail-target fallback; end_seq becomes the new head.
286 auto start_seq
= submit_result
.write_result
.start_seq
;
287 auto end_seq
= submit_result
.write_result
.get_end_seq();
288 DEBUGT("journal commit to record_block_base={}, start_seq={}, end_seq={}",
290 submit_result
.record_block_base
,
293 segment_cleaner
->set_journal_head(end_seq
);
294 cache
->complete_commit(
296 submit_result
.record_block_base
,
298 segment_cleaner
.get());
299 lba_manager
->complete_transaction(tref
);
300 segment_cleaner
->update_journal_tail_target(
301 cache
->get_oldest_dirty_from().value_or(start_seq
));
// Optional segment release requested by the transaction.
302 auto to_release
= tref
.get_segment_to_release();
303 if (to_release
!= NULL_SEG_ID
) {
304 return segment_manager
.release(to_release
305 ).safe_then([this, to_release
] {
306 segment_cleaner
->mark_segment_released(to_release
);
309 return SegmentManager::release_ertr::now();
311 }).safe_then([&tref
] {
312 return tref
.get_handle().complete();
314 submit_transaction_iertr::pass_further
{},
315 crimson::ct_error::all_same_way([](auto e
) {
316 ceph_assert(0 == "Hit error submitting to journal");
319 }).finally([&tref
]() {
320 tref
.get_handle().exit();
// Forwards to cache->get_next_dirty_extents(t, seq, max_bytes) and returns
// its result unchanged.
324 TransactionManager::get_next_dirty_extents_ret
325 TransactionManager::get_next_dirty_extents(
330 return cache
->get_next_dirty_extents(t
, seq
, max_bytes
);
// Relocates a logical extent within transaction `t`.
//
// The old extent is retired in the cache, a new extent of the same length is
// allocated through the extent placement manager with the REWRITE placement
// hint, the buffer contents are copied across, and the new extent inherits
// the old laddr plus a duplicate of the old pin. The LBA mapping for that
// laddr is then updated from the old paddr to the new one.
//
// An already-invalidated extent is logged as an error and then trips the
// assert (debug builds).
333 TransactionManager::rewrite_extent_ret
334 TransactionManager::rewrite_logical_extent(
336 LogicalCachedExtentRef extent
)
338 LOG_PREFIX(TransactionManager::rewrite_logical_extent
);
339 if (extent
->has_been_invalidated()) {
340 ERRORT("{} has been invalidated", t
, *extent
);
342 assert(!extent
->has_been_invalidated());
343 DEBUGT("rewriting {}", t
, *extent
);
345 auto lextent
= extent
->cast
<LogicalCachedExtent
>();
346 cache
->retire_extent(t
, extent
);
347 auto nlextent
= epm
->alloc_new_extent_by_type(
350 lextent
->get_length(),
351 placement_hint_t::REWRITE
)->cast
<LogicalCachedExtent
>();
// Copy the payload from the old extent's buffer into the new one.
352 lextent
->get_bptr().copy_out(
354 lextent
->get_length(),
355 nlextent
->get_bptr().c_str());
356 nlextent
->set_laddr(lextent
->get_laddr());
357 nlextent
->set_pin(lextent
->get_pin().duplicate());
360 "rewriting {} into {}",
365 /* This update_mapping is, strictly speaking, unnecessary for delayed_alloc
366 * extents since we're going to do it again once we either do the ool write
367 * or allocate a relative inline addr. TODO: refactor SegmentCleaner to
368 * avoid this complication. */
369 return lba_manager
->update_mapping(
371 lextent
->get_laddr(),
372 lextent
->get_paddr(),
373 nlextent
->get_paddr());
// Rewrites `extent` within transaction `t`, dispatching on its kind:
//  - an extent found to be already retired is skipped (logged, returns now);
//    NOTE(review): the exact condition is not visible here -- presumably it
//    tests `updated`; confirm against the full source.
//  - the ROOT extent is only duplicated for write in the cache;
//  - logical extents go through rewrite_logical_extent();
//  - everything else is delegated to lba_manager->rewrite_extent().
376 TransactionManager::rewrite_extent_ret
TransactionManager::rewrite_extent(
378 CachedExtentRef extent
)
380 LOG_PREFIX(TransactionManager::rewrite_extent
);
382 auto updated
= cache
->update_extent_from_transaction(t
, extent
);
384 DEBUGT("{} is already retired, skipping", t
, *extent
);
385 return rewrite_extent_iertr::now();
390 if (extent
->get_type() == extent_types_t::ROOT
) {
391 DEBUGT("marking root {} for rewrite", t
, *extent
);
392 cache
->duplicate_for_write(t
, extent
);
393 return rewrite_extent_iertr::now();
396 if (extent
->is_logical()) {
397 return rewrite_logical_extent(t
, extent
->cast
<LogicalCachedExtent
>());
399 return lba_manager
->rewrite_extent(t
, extent
);
// Resolves the extent described by (type, addr, laddr, len) if it is still
// live in transaction `t`.
//
// The cache is consulted first via get_extent_if_cached(). For logical
// extent types the LBA mapping for `laddr` is then fetched: when the pin's
// paddr equals `addr`, the pin length is asserted to equal `len` and the
// extent is loaded with cache->get_extent_by_type(), attaching the pin to
// the extent and registering it with the LBA manager. An enoent from the
// mapping lookup is converted to a null CachedExtentRef; other errors are
// passed further. Non-logical types are delegated to
// lba_manager->get_physical_extent_if_live().
403 TransactionManager::get_extent_if_live_ret
TransactionManager::get_extent_if_live(
410 LOG_PREFIX(TransactionManager::get_extent_if_live
);
411 DEBUGT("type {}, addr {}, laddr {}, len {}", t
, type
, addr
, laddr
, len
);
413 return cache
->get_extent_if_cached(t
, addr
, type
414 ).si_then([this, FNAME
, &t
, type
, addr
, laddr
, len
](auto extent
)
415 -> get_extent_if_live_ret
{
417 return get_extent_if_live_ret (
418 interruptible::ready_future_marker
{},
422 if (is_logical_type(type
)) {
423 using inner_ret
= LBAManager::get_mapping_iertr::future
<CachedExtentRef
>;
424 return lba_manager
->get_mapping(
426 laddr
).si_then([=, &t
] (LBAPinRef pin
) -> inner_ret
{
427 ceph_assert(pin
->get_laddr() == laddr
);
// This branch runs only when the mapping points at the address asked about.
428 if (pin
->get_paddr() == addr
) {
429 if (pin
->get_length() != (extent_len_t
)len
) {
431 "Invalid pin laddr {} paddr {} len {} found for "
432 "extent laddr {} len{}",
440 ceph_assert(pin
->get_length() == (extent_len_t
)len
);
441 return cache
->get_extent_by_type(
// Callback handed to get_extent_by_type: moves the pin onto the extent.
447 [this, pin
=std::move(pin
)](CachedExtent
&extent
) mutable {
448 auto lref
= extent
.cast
<LogicalCachedExtent
>();
449 assert(!lref
->has_pin());
450 assert(!lref
->has_been_invalidated());
451 assert(!pin
->has_been_invalidated());
452 lref
->set_pin(std::move(pin
));
453 lba_manager
->add_pin(lref
->get_pin());
457 interruptible::ready_future_marker
{},
// A missing mapping is not an error here: it yields a null extent ref.
460 }).handle_error_interruptible(crimson::ct_error::enoent::handle([] {
461 return CachedExtentRef();
462 }), crimson::ct_error::pass_further_all
{});
464 DEBUGT("non-logical extent {}", t
, addr
);
465 return lba_manager
->get_physical_extent_if_live(
// Out-of-line destructor with an empty body.
475 TransactionManager::~TransactionManager() {}
477 void TransactionManager::register_metrics()
479 namespace sm
= seastar::metrics
;
480 metrics
.add_group("tm", {
481 sm::make_counter("extents_retired_total", stats
.extents_retired_total
,
482 sm::description("total number of retired extents in TransactionManager")),
483 sm::make_counter("extents_retired_bytes", stats
.extents_retired_bytes
,
484 sm::description("total size of retired extents in TransactionManager")),
485 sm::make_counter("extents_mutated_total", stats
.extents_mutated_total
,
486 sm::description("total number of mutated extents in TransactionManager")),
487 sm::make_counter("extents_mutated_bytes", stats
.extents_mutated_bytes
,
488 sm::description("total size of mutated extents in TransactionManager")),
489 sm::make_counter("extents_allocated_total", stats
.extents_allocated_total
,
490 sm::description("total number of allocated extents in TransactionManager")),
491 sm::make_counter("extents_allocated_bytes", stats
.extents_allocated_bytes
,
492 sm::description("total size of allocated extents in TransactionManager")),