1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab expandtab
4 #include "include/denc.h"
5 #include "include/intarith.h"
7 #include "crimson/os/seastore/logging.h"
8 #include "crimson/os/seastore/transaction_manager.h"
9 #include "crimson/os/seastore/journal.h"
10 #include "crimson/os/seastore/journal/circular_bounded_journal.h"
11 #include "crimson/os/seastore/lba_manager/btree/lba_btree_node.h"
12 #include "crimson/os/seastore/random_block_manager/rbm_device.h"
15 * TransactionManager logs
18 * - INFO: major initiation, closing operations
19 * - DEBUG: major extent related operations, INFO details
20 * - TRACE: DEBUG details
// Logging subsystem for this translation unit: seastore_tm.
SET_SUBSYS(seastore_tm);
25 namespace crimson::os::seastore
{
27 TransactionManager::TransactionManager(
30 LBAManagerRef _lba_manager
,
31 ExtentPlacementManagerRef
&&_epm
,
32 BackrefManagerRef
&& _backref_manager
)
33 : cache(std::move(_cache
)),
34 lba_manager(std::move(_lba_manager
)),
35 journal(std::move(_journal
)),
37 backref_manager(std::move(_backref_manager
))
39 epm
->set_extent_callback(this);
40 journal
->set_write_pipeline(&write_pipeline
);
43 TransactionManager::mkfs_ertr::future
<> TransactionManager::mkfs()
45 LOG_PREFIX(TransactionManager::mkfs
);
49 return journal
->open_for_mkfs();
50 }).safe_then([this](auto start_seq
) {
51 journal
->get_trimmer().update_journal_tails(start_seq
, start_seq
);
52 journal
->get_trimmer().set_journal_head(start_seq
);
53 return epm
->open_for_write();
54 }).safe_then([this, FNAME
]() {
55 return with_transaction_intr(
56 Transaction::src_t::MUTATE
,
58 [this, FNAME
](auto& t
)
62 ).si_then([this, &t
] {
63 return lba_manager
->mkfs(t
);
64 }).si_then([this, &t
] {
65 return backref_manager
->mkfs(t
);
66 }).si_then([this, FNAME
, &t
] {
67 INFOT("submitting mkfs transaction", t
);
68 return submit_transaction_direct(t
);
71 crimson::ct_error::eagain::handle([] {
72 ceph_assert(0 == "eagain impossible");
73 return mkfs_ertr::now();
75 mkfs_ertr::pass_further
{}
79 }).safe_then([FNAME
] {
84 TransactionManager::mount_ertr::future
<> TransactionManager::mount()
86 LOG_PREFIX(TransactionManager::mount
);
91 return journal
->replay(
95 const journal_seq_t
&dirty_tail
,
96 const journal_seq_t
&alloc_tail
,
97 sea_time_point modify_time
)
99 auto start_seq
= offsets
.write_result
.start_seq
;
100 return cache
->replay_delta(
102 offsets
.record_block_base
,
108 }).safe_then([this] {
109 return journal
->open_for_mount();
110 }).safe_then([this](auto start_seq
) {
111 journal
->get_trimmer().set_journal_head(start_seq
);
112 return with_transaction_weak(
116 return cache
->init_cached_extents(t
, [this](auto &t
, auto &e
) {
117 if (is_backref_node(e
->get_type())) {
118 return backref_manager
->init_cached_extent(t
, e
);
120 return lba_manager
->init_cached_extent(t
, e
);
122 }).si_then([this, &t
] {
123 epm
->start_scan_space();
124 return backref_manager
->scan_mapped_space(
132 if (is_backref_node(type
)) {
133 assert(laddr
== L_ADDR_NULL
);
134 assert(backref_key
!= P_ADDR_NULL
);
135 backref_manager
->cache_new_backref_extent(paddr
, backref_key
, type
);
136 cache
->update_tree_extents_num(type
, 1);
137 epm
->mark_space_used(paddr
, len
);
138 } else if (laddr
== L_ADDR_NULL
) {
139 assert(backref_key
== P_ADDR_NULL
);
140 cache
->update_tree_extents_num(type
, -1);
141 epm
->mark_space_free(paddr
, len
);
143 assert(backref_key
== P_ADDR_NULL
);
144 cache
->update_tree_extents_num(type
, 1);
145 epm
->mark_space_used(paddr
, len
);
150 }).safe_then([this] {
151 return epm
->open_for_write();
152 }).safe_then([FNAME
, this] {
153 epm
->start_background();
156 mount_ertr::pass_further
{},
157 crimson::ct_error::all_same_way([] {
158 ceph_assert(0 == "unhandled error");
159 return mount_ertr::now();
164 TransactionManager::close_ertr::future
<> TransactionManager::close() {
165 LOG_PREFIX(TransactionManager::close
);
167 return epm
->stop_background(
169 return cache
->close();
170 }).safe_then([this] {
171 cache
->dump_contents();
172 return journal
->close();
173 }).safe_then([this] {
175 }).safe_then([FNAME
] {
177 return seastar::now();
181 TransactionManager::ref_ret
TransactionManager::inc_ref(
183 LogicalCachedExtentRef
&ref
)
185 LOG_PREFIX(TransactionManager::inc_ref
);
186 TRACET("{}", t
, *ref
);
187 return lba_manager
->incref_extent(t
, ref
->get_laddr()
188 ).si_then([FNAME
, ref
, &t
](auto result
) {
189 DEBUGT("extent refcount is incremented to {} -- {}",
190 t
, result
.refcount
, *ref
);
191 return result
.refcount
;
192 }).handle_error_interruptible(
193 ref_iertr::pass_further
{},
194 ct_error::all_same_way([](auto e
) {
195 ceph_assert(0 == "unhandled error, TODO");
199 TransactionManager::ref_ret
TransactionManager::inc_ref(
203 LOG_PREFIX(TransactionManager::inc_ref
);
204 TRACET("{}", t
, offset
);
205 return lba_manager
->incref_extent(t
, offset
206 ).si_then([FNAME
, offset
, &t
](auto result
) {
207 DEBUGT("extent refcount is incremented to {} -- {}~{}, {}",
208 t
, result
.refcount
, offset
, result
.length
, result
.addr
);
209 return result
.refcount
;
213 TransactionManager::ref_ret
TransactionManager::dec_ref(
215 LogicalCachedExtentRef
&ref
)
217 LOG_PREFIX(TransactionManager::dec_ref
);
218 TRACET("{}", t
, *ref
);
219 return lba_manager
->decref_extent(t
, ref
->get_laddr()
220 ).si_then([this, FNAME
, &t
, ref
](auto result
) {
221 DEBUGT("extent refcount is decremented to {} -- {}",
222 t
, result
.refcount
, *ref
);
223 if (result
.refcount
== 0) {
224 cache
->retire_extent(t
, ref
);
226 return result
.refcount
;
230 TransactionManager::ref_ret
TransactionManager::dec_ref(
234 LOG_PREFIX(TransactionManager::dec_ref
);
235 TRACET("{}", t
, offset
);
236 return lba_manager
->decref_extent(t
, offset
237 ).si_then([this, FNAME
, offset
, &t
](auto result
) -> ref_ret
{
238 DEBUGT("extent refcount is decremented to {} -- {}~{}, {}",
239 t
, result
.refcount
, offset
, result
.length
, result
.addr
);
240 if (result
.refcount
== 0 && !result
.addr
.is_zero()) {
241 return cache
->retire_extent_addr(
242 t
, result
.addr
, result
.length
245 interruptible::ready_future_marker
{},
250 interruptible::ready_future_marker
{},
256 TransactionManager::refs_ret
TransactionManager::dec_ref(
258 std::vector
<laddr_t
> offsets
)
260 LOG_PREFIX(TransactionManager::dec_ref
);
261 DEBUG("{} offsets", offsets
.size());
262 return seastar::do_with(std::move(offsets
), std::vector
<unsigned>(),
263 [this, &t
] (auto &&offsets
, auto &refcnt
) {
264 return trans_intr::do_for_each(offsets
.begin(), offsets
.end(),
265 [this, &t
, &refcnt
] (auto &laddr
) {
266 return this->dec_ref(t
, laddr
).si_then([&refcnt
] (auto ref
) {
267 refcnt
.push_back(ref
);
268 return ref_iertr::now();
270 }).si_then([&refcnt
] {
271 return ref_iertr::make_ready_future
<std::vector
<unsigned>>(std::move(refcnt
));
276 TransactionManager::submit_transaction_iertr::future
<>
277 TransactionManager::submit_transaction(
280 LOG_PREFIX(TransactionManager::submit_transaction
);
281 SUBTRACET(seastore_t
, "start", t
);
282 return trans_intr::make_interruptible(
283 t
.get_handle().enter(write_pipeline
.reserve_projected_usage
)
284 ).then_interruptible([this, FNAME
, &t
] {
285 auto dispatch_result
= epm
->dispatch_delayed_extents(t
);
286 auto projected_usage
= dispatch_result
.usage
;
287 SUBTRACET(seastore_t
, "waiting for projected_usage: {}", t
, projected_usage
);
288 return trans_intr::make_interruptible(
289 epm
->reserve_projected_usage(projected_usage
)
290 ).then_interruptible([this, &t
, dispatch_result
= std::move(dispatch_result
)] {
291 return do_submit_transaction(t
, std::move(dispatch_result
));
292 }).finally([this, FNAME
, projected_usage
, &t
] {
293 SUBTRACET(seastore_t
, "releasing projected_usage: {}", t
, projected_usage
);
294 epm
->release_projected_usage(projected_usage
);
299 TransactionManager::submit_transaction_direct_ret
300 TransactionManager::submit_transaction_direct(
302 std::optional
<journal_seq_t
> trim_alloc_to
)
304 return do_submit_transaction(
306 epm
->dispatch_delayed_extents(tref
),
310 TransactionManager::submit_transaction_direct_ret
311 TransactionManager::do_submit_transaction(
313 ExtentPlacementManager::dispatch_result_t dispatch_result
,
314 std::optional
<journal_seq_t
> trim_alloc_to
)
316 LOG_PREFIX(TransactionManager::do_submit_transaction
);
317 SUBTRACET(seastore_t
, "start", tref
);
318 return trans_intr::make_interruptible(
319 tref
.get_handle().enter(write_pipeline
.ool_writes
)
320 ).then_interruptible([this, FNAME
, &tref
,
321 dispatch_result
= std::move(dispatch_result
)] {
322 return seastar::do_with(std::move(dispatch_result
),
323 [this, FNAME
, &tref
](auto &dispatch_result
) {
324 return epm
->write_delayed_ool_extents(tref
, dispatch_result
.alloc_map
325 ).si_then([this, FNAME
, &tref
, &dispatch_result
] {
326 SUBTRACET(seastore_t
, "update delayed extent mappings", tref
);
327 return lba_manager
->update_mappings(tref
, dispatch_result
.delayed_extents
);
328 }).handle_error_interruptible(
329 crimson::ct_error::input_output_error::pass_further(),
330 crimson::ct_error::assert_all("invalid error")
333 }).si_then([this, FNAME
, &tref
] {
334 auto allocated_extents
= tref
.get_valid_pre_alloc_list();
335 auto num_extents
= allocated_extents
.size();
336 SUBTRACET(seastore_t
, "process {} allocated extents", tref
, num_extents
);
337 return epm
->write_preallocated_ool_extents(tref
, allocated_extents
338 ).handle_error_interruptible(
339 crimson::ct_error::input_output_error::pass_further(),
340 crimson::ct_error::assert_all("invalid error")
342 }).si_then([this, FNAME
, &tref
] {
343 SUBTRACET(seastore_t
, "about to prepare", tref
);
344 return tref
.get_handle().enter(write_pipeline
.prepare
);
345 }).si_then([this, FNAME
, &tref
, trim_alloc_to
=std::move(trim_alloc_to
)]() mutable
346 -> submit_transaction_iertr::future
<> {
347 if (trim_alloc_to
&& *trim_alloc_to
!= JOURNAL_SEQ_NULL
) {
348 cache
->trim_backref_bufs(*trim_alloc_to
);
351 auto record
= cache
->prepare_record(
353 journal
->get_trimmer().get_journal_head(),
354 journal
->get_trimmer().get_dirty_tail());
356 tref
.get_handle().maybe_release_collection_lock();
358 SUBTRACET(seastore_t
, "about to submit to journal", tref
);
359 return journal
->submit_record(std::move(record
), tref
.get_handle()
360 ).safe_then([this, FNAME
, &tref
](auto submit_result
) mutable {
361 SUBDEBUGT(seastore_t
, "committed with {}", tref
, submit_result
);
362 auto start_seq
= submit_result
.write_result
.start_seq
;
363 journal
->get_trimmer().set_journal_head(start_seq
);
364 cache
->complete_commit(
366 submit_result
.record_block_base
,
369 std::vector
<CachedExtentRef
> lba_to_clear
;
370 std::vector
<CachedExtentRef
> backref_to_clear
;
371 lba_to_clear
.reserve(tref
.get_retired_set().size());
372 backref_to_clear
.reserve(tref
.get_retired_set().size());
373 for (auto &e
: tref
.get_retired_set()) {
374 if (e
->is_logical() || is_lba_node(e
->get_type()))
375 lba_to_clear
.push_back(e
);
376 else if (is_backref_node(e
->get_type()))
377 backref_to_clear
.push_back(e
);
380 journal
->get_trimmer().update_journal_tails(
381 cache
->get_oldest_dirty_from().value_or(start_seq
),
382 cache
->get_oldest_backref_dirty_from().value_or(start_seq
));
383 return journal
->finish_commit(tref
.get_src()
385 return tref
.get_handle().complete();
388 submit_transaction_iertr::pass_further
{},
389 crimson::ct_error::all_same_way([](auto e
) {
390 ceph_assert(0 == "Hit error submitting to journal");
393 }).finally([&tref
]() {
394 tref
.get_handle().exit();
398 seastar::future
<> TransactionManager::flush(OrderingHandle
&handle
)
400 LOG_PREFIX(TransactionManager::flush
);
401 SUBDEBUG(seastore_t
, "H{} start", (void*)&handle
);
402 return handle
.enter(write_pipeline
.reserve_projected_usage
403 ).then([this, &handle
] {
404 return handle
.enter(write_pipeline
.ool_writes
);
405 }).then([this, &handle
] {
406 return handle
.enter(write_pipeline
.prepare
);
407 }).then([this, &handle
] {
408 handle
.maybe_release_collection_lock();
409 return journal
->flush(handle
);
410 }).then([FNAME
, &handle
] {
411 SUBDEBUG(seastore_t
, "H{} completed", (void*)&handle
);
415 TransactionManager::get_next_dirty_extents_ret
416 TransactionManager::get_next_dirty_extents(
421 LOG_PREFIX(TransactionManager::get_next_dirty_extents
);
422 DEBUGT("max_bytes={}B, seq={}", t
, max_bytes
, seq
);
423 return cache
->get_next_dirty_extents(t
, seq
, max_bytes
);
426 TransactionManager::rewrite_extent_ret
427 TransactionManager::rewrite_logical_extent(
429 LogicalCachedExtentRef extent
)
431 LOG_PREFIX(TransactionManager::rewrite_logical_extent
);
432 if (extent
->has_been_invalidated()) {
433 ERRORT("extent has been invalidated -- {}", t
, *extent
);
436 TRACET("rewriting extent -- {}", t
, *extent
);
438 auto lextent
= extent
->cast
<LogicalCachedExtent
>();
439 cache
->retire_extent(t
, extent
);
440 auto nlextent
= cache
->alloc_new_extent_by_type(
443 lextent
->get_length(),
444 lextent
->get_user_hint(),
445 // get target rewrite generation
446 lextent
->get_rewrite_generation())->cast
<LogicalCachedExtent
>();
447 lextent
->get_bptr().copy_out(
449 lextent
->get_length(),
450 nlextent
->get_bptr().c_str());
451 nlextent
->set_laddr(lextent
->get_laddr());
452 nlextent
->set_modify_time(lextent
->get_modify_time());
454 DEBUGT("rewriting logical extent -- {} to {}", t
, *lextent
, *nlextent
);
456 /* This update_mapping is, strictly speaking, unnecessary for delayed_alloc
457 * extents since we're going to do it again once we either do the ool write
458 * or allocate a relative inline addr. TODO: refactor AsyncCleaner to
459 * avoid this complication. */
460 return lba_manager
->update_mapping(
462 lextent
->get_laddr(),
463 lextent
->get_paddr(),
464 nlextent
->get_paddr(),
468 TransactionManager::rewrite_extent_ret
TransactionManager::rewrite_extent(
470 CachedExtentRef extent
,
471 rewrite_gen_t target_generation
,
472 sea_time_point modify_time
)
474 LOG_PREFIX(TransactionManager::rewrite_extent
);
477 auto updated
= cache
->update_extent_from_transaction(t
, extent
);
479 DEBUGT("extent is already retired, skipping -- {}", t
, *extent
);
480 return rewrite_extent_iertr::now();
483 ceph_assert(!extent
->is_pending_io());
486 assert(extent
->is_valid() && !extent
->is_initial_pending());
487 if (extent
->is_dirty()) {
488 extent
->set_target_rewrite_generation(INIT_GENERATION
);
490 extent
->set_target_rewrite_generation(target_generation
);
491 ceph_assert(modify_time
!= NULL_TIME
);
492 extent
->set_modify_time(modify_time
);
495 t
.get_rewrite_version_stats().increment(extent
->get_version());
497 if (is_backref_node(extent
->get_type())) {
498 DEBUGT("rewriting backref extent -- {}", t
, *extent
);
499 return backref_manager
->rewrite_extent(t
, extent
);
502 if (extent
->get_type() == extent_types_t::ROOT
) {
503 DEBUGT("rewriting root extent -- {}", t
, *extent
);
504 cache
->duplicate_for_write(t
, extent
);
505 return rewrite_extent_iertr::now();
508 if (extent
->is_logical()) {
509 return rewrite_logical_extent(t
, extent
->cast
<LogicalCachedExtent
>());
511 DEBUGT("rewriting physical extent -- {}", t
, *extent
);
512 return lba_manager
->rewrite_extent(t
, extent
);
516 TransactionManager::get_extents_if_live_ret
517 TransactionManager::get_extents_if_live(
524 LOG_PREFIX(TransactionManager::get_extent_if_live
);
525 TRACET("{} {}~{} {}", t
, type
, laddr
, len
, paddr
);
527 // This only works with segments to check if alive,
528 // as parallel transactions may split the extent at the same time.
529 ceph_assert(paddr
.get_addr_type() == paddr_types_t::SEGMENT
);
531 return cache
->get_extent_if_cached(t
, paddr
, type
532 ).si_then([=, this, &t
](auto extent
)
533 -> get_extents_if_live_ret
{
534 if (extent
&& extent
->get_length() == len
) {
535 DEBUGT("{} {}~{} {} is live in cache -- {}",
536 t
, type
, laddr
, len
, paddr
, *extent
);
537 std::list
<CachedExtentRef
> res
;
538 res
.emplace_back(std::move(extent
));
539 return get_extents_if_live_ret(
540 interruptible::ready_future_marker
{},
544 if (is_logical_type(type
)) {
545 return lba_manager
->get_mappings(
549 ).si_then([=, this, &t
](lba_pin_list_t pin_list
) {
550 return seastar::do_with(
551 std::list
<CachedExtentRef
>(),
552 [=, this, &t
, pin_list
=std::move(pin_list
)](
553 std::list
<CachedExtentRef
> &list
) mutable
555 auto paddr_seg_id
= paddr
.as_seg_paddr().get_segment_id();
556 return trans_intr::parallel_for_each(
558 [=, this, &list
, &t
](
559 LBAMappingRef
&pin
) -> Cache::get_extent_iertr::future
<>
561 auto pin_paddr
= pin
->get_val();
562 auto &pin_seg_paddr
= pin_paddr
.as_seg_paddr();
563 auto pin_paddr_seg_id
= pin_seg_paddr
.get_segment_id();
564 auto pin_len
= pin
->get_length();
565 if (pin_paddr_seg_id
!= paddr_seg_id
) {
566 return seastar::now();
568 // Only extent split can happen during the lookup
569 ceph_assert(pin_seg_paddr
>= paddr
&&
570 pin_seg_paddr
.add_offset(pin_len
) <= paddr
.add_offset(len
));
571 return read_pin_by_type(t
, std::move(pin
), type
572 ).si_then([&list
](auto ret
) {
573 list
.emplace_back(std::move(ret
));
574 return seastar::now();
577 return get_extents_if_live_ret(
578 interruptible::ready_future_marker
{},
582 }).handle_error_interruptible(crimson::ct_error::enoent::handle([] {
583 return get_extents_if_live_ret(
584 interruptible::ready_future_marker
{},
585 std::list
<CachedExtentRef
>());
586 }), crimson::ct_error::pass_further_all
{});
588 return lba_manager
->get_physical_extent_if_live(
594 ).si_then([=, &t
](auto ret
) {
595 std::list
<CachedExtentRef
> res
;
597 DEBUGT("{} {}~{} {} is live as physical extent -- {}",
598 t
, type
, laddr
, len
, paddr
, *ret
);
599 res
.emplace_back(std::move(ret
));
601 DEBUGT("{} {}~{} {} is not live as physical extent",
602 t
, type
, laddr
, len
, paddr
);
604 return get_extents_if_live_ret(
605 interruptible::ready_future_marker
{},
612 TransactionManager::~TransactionManager() {}
614 TransactionManagerRef
make_transaction_manager(
615 Device
*primary_device
,
616 const std::vector
<Device
*> &secondary_devices
,
619 auto epm
= std::make_unique
<ExtentPlacementManager
>();
620 auto cache
= std::make_unique
<Cache
>(*epm
);
621 auto lba_manager
= lba_manager::create_lba_manager(*cache
);
622 auto sms
= std::make_unique
<SegmentManagerGroup
>();
623 auto rbs
= std::make_unique
<RBMDeviceGroup
>();
624 auto backref_manager
= create_backref_manager(*cache
);
625 SegmentManagerGroupRef cold_sms
= nullptr;
626 std::vector
<SegmentProvider
*> segment_providers_by_id
{DEVICE_ID_MAX
, nullptr};
628 auto p_backend_type
= primary_device
->get_backend_type();
630 if (p_backend_type
== backend_type_t::SEGMENTED
) {
631 auto dtype
= primary_device
->get_device_type();
632 ceph_assert(dtype
!= device_type_t::HDD
&&
633 dtype
!= device_type_t::EPHEMERAL_COLD
);
634 sms
->add_segment_manager(static_cast<SegmentManager
*>(primary_device
));
636 auto rbm
= std::make_unique
<BlockRBManager
>(
637 static_cast<RBMDevice
*>(primary_device
), "", is_test
);
638 rbs
->add_rb_manager(std::move(rbm
));
641 for (auto &p_dev
: secondary_devices
) {
642 if (p_dev
->get_backend_type() == backend_type_t::SEGMENTED
) {
643 if (p_dev
->get_device_type() == primary_device
->get_device_type()) {
644 sms
->add_segment_manager(static_cast<SegmentManager
*>(p_dev
));
647 cold_sms
= std::make_unique
<SegmentManagerGroup
>();
649 cold_sms
->add_segment_manager(static_cast<SegmentManager
*>(p_dev
));
652 auto rbm
= std::make_unique
<BlockRBManager
>(
653 static_cast<RBMDevice
*>(p_dev
), "", is_test
);
654 rbs
->add_rb_manager(std::move(rbm
));
658 auto journal_type
= p_backend_type
;
659 device_off_t roll_size
;
660 device_off_t roll_start
;
661 if (journal_type
== journal_type_t::SEGMENTED
) {
662 roll_size
= static_cast<SegmentManager
*>(primary_device
)->get_segment_size();
665 roll_size
= static_cast<random_block_device::RBMDevice
*>(primary_device
)
666 ->get_journal_size() - primary_device
->get_block_size();
667 // see CircularBoundedJournal::get_records_start()
668 roll_start
= static_cast<random_block_device::RBMDevice
*>(primary_device
)
669 ->get_journal_start() + primary_device
->get_block_size();
670 ceph_assert_always(roll_size
<= DEVICE_OFF_MAX
);
671 ceph_assert_always((std::size_t)roll_size
+ roll_start
<=
672 primary_device
->get_available_size());
674 ceph_assert(roll_size
% primary_device
->get_block_size() == 0);
675 ceph_assert(roll_start
% primary_device
->get_block_size() == 0);
677 bool cleaner_is_detailed
;
678 SegmentCleaner::config_t cleaner_config
;
679 JournalTrimmerImpl::config_t trimmer_config
;
681 cleaner_is_detailed
= true;
682 cleaner_config
= SegmentCleaner::config_t::get_test();
683 trimmer_config
= JournalTrimmerImpl::config_t::get_test(
684 roll_size
, journal_type
);
686 cleaner_is_detailed
= false;
687 cleaner_config
= SegmentCleaner::config_t::get_default();
688 trimmer_config
= JournalTrimmerImpl::config_t::get_default(
689 roll_size
, journal_type
);
692 auto journal_trimmer
= JournalTrimmerImpl::create(
693 *backref_manager
, trimmer_config
,
694 journal_type
, roll_start
, roll_size
);
696 AsyncCleanerRef cleaner
;
699 SegmentCleanerRef cold_segment_cleaner
= nullptr;
702 cold_segment_cleaner
= SegmentCleaner::create(
706 epm
->get_ool_segment_seq_allocator(),
708 /* is_cold = */ true);
709 if (journal_type
== journal_type_t::SEGMENTED
) {
710 for (auto id
: cold_segment_cleaner
->get_device_ids()) {
711 segment_providers_by_id
[id
] =
712 static_cast<SegmentProvider
*>(cold_segment_cleaner
.get());
717 if (journal_type
== journal_type_t::SEGMENTED
) {
718 cleaner
= SegmentCleaner::create(
722 epm
->get_ool_segment_seq_allocator(),
723 cleaner_is_detailed
);
724 auto segment_cleaner
= static_cast<SegmentCleaner
*>(cleaner
.get());
725 for (auto id
: segment_cleaner
->get_device_ids()) {
726 segment_providers_by_id
[id
] =
727 static_cast<SegmentProvider
*>(segment_cleaner
);
729 segment_cleaner
->set_journal_trimmer(*journal_trimmer
);
730 journal
= journal::make_segmented(
734 cleaner
= RBMCleaner::create(
737 cleaner_is_detailed
);
738 journal
= journal::make_circularbounded(
740 static_cast<random_block_device::RBMDevice
*>(primary_device
),
744 cache
->set_segment_providers(std::move(segment_providers_by_id
));
746 epm
->init(std::move(journal_trimmer
),
748 std::move(cold_segment_cleaner
));
749 epm
->set_primary_device(primary_device
);
751 return std::make_unique
<TransactionManager
>(
754 std::move(lba_manager
),
756 std::move(backref_manager
));