// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab expandtab

#include "include/denc.h"
#include "include/intarith.h"

#include "crimson/os/seastore/logging.h"
#include "crimson/os/seastore/transaction_manager.h"
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/journal/circular_bounded_journal.h"
#include "crimson/os/seastore/lba_manager/btree/lba_btree_node.h"
#include "crimson/os/seastore/random_block_manager/rbm_device.h"

/*
 * TransactionManager logs
 *
 * levels:
 * - INFO:  major initiation and closing operations
 * - DEBUG: major extent-related operations, INFO details
 * - TRACE: DEBUG details
 * - seastore_t logs
 */
SET_SUBSYS(seastore_tm);

namespace crimson::os::seastore {

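// Wire up the cross-component callbacks: the TransactionManager serves as
// the ExtentPlacementManager's extent callback, and the journal submits
// through the shared write pipeline.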
TransactionManager::TransactionManager(
  JournalRef _journal,
  CacheRef _cache,
  LBAManagerRef _lba_manager,
  ExtentPlacementManagerRef &&_epm,
  BackrefManagerRef &&_backref_manager)
  : cache(std::move(_cache)),
    lba_manager(std::move(_lba_manager)),
    journal(std::move(_journal)),
    epm(std::move(_epm)),
    backref_manager(std::move(_backref_manager))
{
  epm->set_extent_callback(this);
  journal->set_write_pipeline(&write_pipeline);
}

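// mkfs bootstraps an empty store: mount the EPM, open the journal for
// mkfs and seed the trimmer's tails/head with its starting sequence,
// then run a single MUTATE transaction that initializes the cache, the
// LBA tree and the backref tree before closing everything down again.
// The eagain branch asserts unreachable -- presumably nothing can
// invalidate the lone mkfs transaction.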
TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
{
  LOG_PREFIX(TransactionManager::mkfs);
  INFO("enter");
  return epm->mount(
  ).safe_then([this] {
    return journal->open_for_mkfs();
  }).safe_then([this](auto start_seq) {
    journal->get_trimmer().update_journal_tails(start_seq, start_seq);
    journal->get_trimmer().set_journal_head(start_seq);
    return epm->open_for_write();
  }).safe_then([this, FNAME]() {
    return with_transaction_intr(
      Transaction::src_t::MUTATE,
      "mkfs_tm",
      [this, FNAME](auto& t)
    {
      cache->init();
      return cache->mkfs(t
      ).si_then([this, &t] {
        return lba_manager->mkfs(t);
      }).si_then([this, &t] {
        return backref_manager->mkfs(t);
      }).si_then([this, FNAME, &t] {
        INFOT("submitting mkfs transaction", t);
        return submit_transaction_direct(t);
      });
    }).handle_error(
      crimson::ct_error::eagain::handle([] {
        ceph_assert(0 == "eagain impossible");
        return mkfs_ertr::now();
      }),
      mkfs_ertr::pass_further{}
    );
  }).safe_then([this] {
    return close();
  }).safe_then([FNAME] {
    INFO("completed");
  });
}

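// mount rebuilds in-memory state: replay the journal through
// Cache::replay_delta, re-initialize the cached extents with their
// owning manager (backref vs. LBA), then rebuild the EPM's space
// accounting by scanning mapped space via the backref manager before
// opening for write and starting background cleaning.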
TransactionManager::mount_ertr::future<> TransactionManager::mount()
{
  LOG_PREFIX(TransactionManager::mount);
  INFO("enter");
  cache->init();
  return epm->mount(
  ).safe_then([this] {
    return journal->replay(
      [this](
        const auto &offsets,
        const auto &e,
        const journal_seq_t &dirty_tail,
        const journal_seq_t &alloc_tail,
        sea_time_point modify_time)
      {
        auto start_seq = offsets.write_result.start_seq;
        return cache->replay_delta(
          start_seq,
          offsets.record_block_base,
          e,
          dirty_tail,
          alloc_tail,
          modify_time);
      });
  }).safe_then([this] {
    return journal->open_for_mount();
  }).safe_then([this](auto start_seq) {
    journal->get_trimmer().set_journal_head(start_seq);
    return with_transaction_weak(
      "mount",
      [this](auto &t)
    {
      return cache->init_cached_extents(t, [this](auto &t, auto &e) {
        if (is_backref_node(e->get_type())) {
          return backref_manager->init_cached_extent(t, e);
        } else {
          return lba_manager->init_cached_extent(t, e);
        }
      }).si_then([this, &t] {
        epm->start_scan_space();
        return backref_manager->scan_mapped_space(
          t,
          [this](
            paddr_t paddr,
            paddr_t backref_key,
            extent_len_t len,
            extent_types_t type,
            laddr_t laddr) {
          if (is_backref_node(type)) {
            assert(laddr == L_ADDR_NULL);
            assert(backref_key != P_ADDR_NULL);
            backref_manager->cache_new_backref_extent(paddr, backref_key, type);
            cache->update_tree_extents_num(type, 1);
            epm->mark_space_used(paddr, len);
          } else if (laddr == L_ADDR_NULL) {
            assert(backref_key == P_ADDR_NULL);
            cache->update_tree_extents_num(type, -1);
            epm->mark_space_free(paddr, len);
          } else {
            assert(backref_key == P_ADDR_NULL);
            cache->update_tree_extents_num(type, 1);
            epm->mark_space_used(paddr, len);
          }
        });
      });
    });
  }).safe_then([this] {
    return epm->open_for_write();
  }).safe_then([FNAME, this] {
    epm->start_background();
    INFO("completed");
  }).handle_error(
    mount_ertr::pass_further{},
    crimson::ct_error::all_same_way([] {
      ceph_assert(0 == "unhandled error");
      return mount_ertr::now();
    })
  );
}

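// close stops background work before tearing down the cache, journal
// and EPM, in that order.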
TransactionManager::close_ertr::future<> TransactionManager::close() {
  LOG_PREFIX(TransactionManager::close);
  INFO("enter");
  return epm->stop_background(
  ).then([this] {
    return cache->close();
  }).safe_then([this] {
    cache->dump_contents();
    return journal->close();
  }).safe_then([this] {
    return epm->close();
  }).safe_then([FNAME] {
    INFO("completed");
    return seastar::now();
  });
}

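// inc_ref (by extent reference or by laddr) bumps the LBA refcount and
// resolves to the new count.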
TransactionManager::ref_ret TransactionManager::inc_ref(
  Transaction &t,
  LogicalCachedExtentRef &ref)
{
  LOG_PREFIX(TransactionManager::inc_ref);
  TRACET("{}", t, *ref);
  return lba_manager->incref_extent(t, ref->get_laddr()
  ).si_then([FNAME, ref, &t](auto result) {
    DEBUGT("extent refcount is incremented to {} -- {}",
           t, result.refcount, *ref);
    return result.refcount;
  }).handle_error_interruptible(
    ref_iertr::pass_further{},
    ct_error::all_same_way([](auto e) {
      ceph_assert(0 == "unhandled error, TODO");
    }));
}

TransactionManager::ref_ret TransactionManager::inc_ref(
  Transaction &t,
  laddr_t offset)
{
  LOG_PREFIX(TransactionManager::inc_ref);
  TRACET("{}", t, offset);
  return lba_manager->incref_extent(t, offset
  ).si_then([FNAME, offset, &t](auto result) {
    DEBUGT("extent refcount is incremented to {} -- {}~{}, {}",
           t, result.refcount, offset, result.length, result.addr);
    return result.refcount;
  });
}

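// dec_ref decrements the LBA refcount and retires the backing extent
// once the count reaches zero; the laddr overload skips retirement for
// zero paddrs, which carry no physical data.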
TransactionManager::ref_ret TransactionManager::dec_ref(
  Transaction &t,
  LogicalCachedExtentRef &ref)
{
  LOG_PREFIX(TransactionManager::dec_ref);
  TRACET("{}", t, *ref);
  return lba_manager->decref_extent(t, ref->get_laddr()
  ).si_then([this, FNAME, &t, ref](auto result) {
    DEBUGT("extent refcount is decremented to {} -- {}",
           t, result.refcount, *ref);
    if (result.refcount == 0) {
      cache->retire_extent(t, ref);
    }
    return result.refcount;
  });
}

TransactionManager::ref_ret TransactionManager::dec_ref(
  Transaction &t,
  laddr_t offset)
{
  LOG_PREFIX(TransactionManager::dec_ref);
  TRACET("{}", t, offset);
  return lba_manager->decref_extent(t, offset
  ).si_then([this, FNAME, offset, &t](auto result) -> ref_ret {
    DEBUGT("extent refcount is decremented to {} -- {}~{}, {}",
           t, result.refcount, offset, result.length, result.addr);
    if (result.refcount == 0 && !result.addr.is_zero()) {
      return cache->retire_extent_addr(
        t, result.addr, result.length
      ).si_then([] {
        return ref_ret(
          interruptible::ready_future_marker{},
          0);
      });
    } else {
      return ref_ret(
        interruptible::ready_future_marker{},
        result.refcount);
    }
  });
}

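// Batched form: decrements each offset in turn and resolves to the
// resulting refcounts in input order.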
TransactionManager::refs_ret TransactionManager::dec_ref(
  Transaction &t,
  std::vector<laddr_t> offsets)
{
  LOG_PREFIX(TransactionManager::dec_ref);
  DEBUGT("{} offsets", t, offsets.size());
  return seastar::do_with(std::move(offsets), std::vector<unsigned>(),
    [this, &t](auto &&offsets, auto &refcnt) {
    return trans_intr::do_for_each(offsets.begin(), offsets.end(),
      [this, &t, &refcnt](auto &laddr) {
      return this->dec_ref(t, laddr).si_then([&refcnt](auto ref) {
        refcnt.push_back(ref);
        return ref_iertr::now();
      });
    }).si_then([&refcnt] {
      return ref_iertr::make_ready_future<std::vector<unsigned>>(std::move(refcnt));
    });
  });
}

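// submit_transaction is the normal commit path: enter the
// reserve_projected_usage pipeline stage, let the EPM pick placements
// for delayed extents, reserve the projected space usage, and delegate
// to do_submit_transaction; the reservation is released via finally()
// whether or not the submission succeeds.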
TransactionManager::submit_transaction_iertr::future<>
TransactionManager::submit_transaction(
  Transaction &t)
{
  LOG_PREFIX(TransactionManager::submit_transaction);
  SUBTRACET(seastore_t, "start", t);
  return trans_intr::make_interruptible(
    t.get_handle().enter(write_pipeline.reserve_projected_usage)
  ).then_interruptible([this, FNAME, &t] {
    auto dispatch_result = epm->dispatch_delayed_extents(t);
    auto projected_usage = dispatch_result.usage;
    SUBTRACET(seastore_t, "waiting for projected_usage: {}", t, projected_usage);
    return trans_intr::make_interruptible(
      epm->reserve_projected_usage(projected_usage)
    ).then_interruptible([this, &t, dispatch_result = std::move(dispatch_result)] {
      return do_submit_transaction(t, std::move(dispatch_result));
    }).finally([this, FNAME, projected_usage, &t] {
      SUBTRACET(seastore_t, "releasing projected_usage: {}", t, projected_usage);
      epm->release_projected_usage(projected_usage);
    });
  });
}

TransactionManager::submit_transaction_direct_ret
TransactionManager::submit_transaction_direct(
  Transaction &tref,
  std::optional<journal_seq_t> trim_alloc_to)
{
  return do_submit_transaction(
    tref,
    epm->dispatch_delayed_extents(tref),
    trim_alloc_to);
}

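// do_submit_transaction drives the remaining pipeline stages: write the
// delayed and preallocated out-of-line extents (updating LBA mappings
// for the delayed ones), prepare the record, submit it to the journal,
// and on commit advance the journal head, complete the commit in the
// cache, and update the journal tails from the oldest dirty/backref
// entries.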
TransactionManager::submit_transaction_direct_ret
TransactionManager::do_submit_transaction(
  Transaction &tref,
  ExtentPlacementManager::dispatch_result_t dispatch_result,
  std::optional<journal_seq_t> trim_alloc_to)
{
  LOG_PREFIX(TransactionManager::do_submit_transaction);
  SUBTRACET(seastore_t, "start", tref);
  return trans_intr::make_interruptible(
    tref.get_handle().enter(write_pipeline.ool_writes)
  ).then_interruptible([this, FNAME, &tref,
                        dispatch_result = std::move(dispatch_result)] {
    return seastar::do_with(std::move(dispatch_result),
      [this, FNAME, &tref](auto &dispatch_result) {
      return epm->write_delayed_ool_extents(tref, dispatch_result.alloc_map
      ).si_then([this, FNAME, &tref, &dispatch_result] {
        SUBTRACET(seastore_t, "update delayed extent mappings", tref);
        return lba_manager->update_mappings(tref, dispatch_result.delayed_extents);
      }).handle_error_interruptible(
        crimson::ct_error::input_output_error::pass_further(),
        crimson::ct_error::assert_all("invalid error")
      );
    });
  }).si_then([this, FNAME, &tref] {
    auto allocated_extents = tref.get_valid_pre_alloc_list();
    auto num_extents = allocated_extents.size();
    SUBTRACET(seastore_t, "process {} allocated extents", tref, num_extents);
    return epm->write_preallocated_ool_extents(tref, allocated_extents
    ).handle_error_interruptible(
      crimson::ct_error::input_output_error::pass_further(),
      crimson::ct_error::assert_all("invalid error")
    );
  }).si_then([this, FNAME, &tref] {
    SUBTRACET(seastore_t, "about to prepare", tref);
    return tref.get_handle().enter(write_pipeline.prepare);
  }).si_then([this, FNAME, &tref, trim_alloc_to=std::move(trim_alloc_to)]() mutable
             -> submit_transaction_iertr::future<> {
    if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) {
      cache->trim_backref_bufs(*trim_alloc_to);
    }

    auto record = cache->prepare_record(
      tref,
      journal->get_trimmer().get_journal_head(),
      journal->get_trimmer().get_dirty_tail());

    tref.get_handle().maybe_release_collection_lock();

    SUBTRACET(seastore_t, "about to submit to journal", tref);
    return journal->submit_record(std::move(record), tref.get_handle()
    ).safe_then([this, FNAME, &tref](auto submit_result) mutable {
      SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
      auto start_seq = submit_result.write_result.start_seq;
      journal->get_trimmer().set_journal_head(start_seq);
      cache->complete_commit(
        tref,
        submit_result.record_block_base,
        start_seq);

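      // Partition the retired set into LBA-related extents (logical
      // extents and LBA nodes) and backref nodes.  Note that neither
      // vector is consumed later in this function.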
      std::vector<CachedExtentRef> lba_to_clear;
      std::vector<CachedExtentRef> backref_to_clear;
      lba_to_clear.reserve(tref.get_retired_set().size());
      backref_to_clear.reserve(tref.get_retired_set().size());
      for (auto &e: tref.get_retired_set()) {
        if (e->is_logical() || is_lba_node(e->get_type()))
          lba_to_clear.push_back(e);
        else if (is_backref_node(e->get_type()))
          backref_to_clear.push_back(e);
      }

      journal->get_trimmer().update_journal_tails(
        cache->get_oldest_dirty_from().value_or(start_seq),
        cache->get_oldest_backref_dirty_from().value_or(start_seq));
      return journal->finish_commit(tref.get_src()
      ).then([&tref] {
        return tref.get_handle().complete();
      });
    }).handle_error(
      submit_transaction_iertr::pass_further{},
      crimson::ct_error::all_same_way([](auto e) {
        ceph_assert(0 == "Hit error submitting to journal");
      })
    );
  }).finally([&tref]() {
    tref.get_handle().exit();
  });
}

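// flush walks an OrderingHandle through every pipeline stage and then
// flushes the journal, so work ordered ahead of the handle has been
// submitted once this resolves.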
seastar::future<> TransactionManager::flush(OrderingHandle &handle)
{
  LOG_PREFIX(TransactionManager::flush);
  SUBDEBUG(seastore_t, "H{} start", (void*)&handle);
  return handle.enter(write_pipeline.reserve_projected_usage
  ).then([this, &handle] {
    return handle.enter(write_pipeline.ool_writes);
  }).then([this, &handle] {
    return handle.enter(write_pipeline.prepare);
  }).then([this, &handle] {
    handle.maybe_release_collection_lock();
    return journal->flush(handle);
  }).then([FNAME, &handle] {
    SUBDEBUG(seastore_t, "H{} completed", (void*)&handle);
  });
}

TransactionManager::get_next_dirty_extents_ret
TransactionManager::get_next_dirty_extents(
  Transaction &t,
  journal_seq_t seq,
  size_t max_bytes)
{
  LOG_PREFIX(TransactionManager::get_next_dirty_extents);
  DEBUGT("max_bytes={}B, seq={}", t, max_bytes, seq);
  return cache->get_next_dirty_extents(t, seq, max_bytes);
}

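// rewrite_logical_extent retires the old extent, allocates a replacement
// of the same type/length at the target rewrite generation, copies the
// payload plus laddr/modify_time across, and repoints the LBA mapping
// from the old paddr to the new one.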
TransactionManager::rewrite_extent_ret
TransactionManager::rewrite_logical_extent(
  Transaction& t,
  LogicalCachedExtentRef extent)
{
  LOG_PREFIX(TransactionManager::rewrite_logical_extent);
  if (extent->has_been_invalidated()) {
    ERRORT("extent has been invalidated -- {}", t, *extent);
    ceph_abort();
  }
  TRACET("rewriting extent -- {}", t, *extent);

  auto lextent = extent->cast<LogicalCachedExtent>();
  cache->retire_extent(t, extent);
  auto nlextent = cache->alloc_new_extent_by_type(
    t,
    lextent->get_type(),
    lextent->get_length(),
    lextent->get_user_hint(),
    // get target rewrite generation
    lextent->get_rewrite_generation())->cast<LogicalCachedExtent>();
  lextent->get_bptr().copy_out(
    0,
    lextent->get_length(),
    nlextent->get_bptr().c_str());
  nlextent->set_laddr(lextent->get_laddr());
  nlextent->set_modify_time(lextent->get_modify_time());

  DEBUGT("rewriting logical extent -- {} to {}", t, *lextent, *nlextent);

  /* This update_mapping is, strictly speaking, unnecessary for delayed_alloc
   * extents since we're going to do it again once we either do the ool write
   * or allocate a relative inline addr.  TODO: refactor AsyncCleaner to
   * avoid this complication. */
  return lba_manager->update_mapping(
    t,
    lextent->get_laddr(),
    lextent->get_paddr(),
    nlextent->get_paddr(),
    nlextent.get());
}

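// rewrite_extent dispatches a rewrite to the appropriate owner: backref
// nodes to the backref manager, the root via duplicate_for_write,
// logical extents through rewrite_logical_extent, and remaining physical
// extents (LBA nodes) to the LBA manager.  Dirty extents keep
// INIT_GENERATION as their target instead of the requested generation.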
TransactionManager::rewrite_extent_ret TransactionManager::rewrite_extent(
  Transaction &t,
  CachedExtentRef extent,
  rewrite_gen_t target_generation,
  sea_time_point modify_time)
{
  LOG_PREFIX(TransactionManager::rewrite_extent);

  {
    auto updated = cache->update_extent_from_transaction(t, extent);
    if (!updated) {
      DEBUGT("extent is already retired, skipping -- {}", t, *extent);
      return rewrite_extent_iertr::now();
    }
    extent = updated;
    ceph_assert(!extent->is_pending_io());
  }

  assert(extent->is_valid() && !extent->is_initial_pending());
  if (extent->is_dirty()) {
    extent->set_target_rewrite_generation(INIT_GENERATION);
  } else {
    extent->set_target_rewrite_generation(target_generation);
    ceph_assert(modify_time != NULL_TIME);
    extent->set_modify_time(modify_time);
  }

  t.get_rewrite_version_stats().increment(extent->get_version());

  if (is_backref_node(extent->get_type())) {
    DEBUGT("rewriting backref extent -- {}", t, *extent);
    return backref_manager->rewrite_extent(t, extent);
  }

  if (extent->get_type() == extent_types_t::ROOT) {
    DEBUGT("rewriting root extent -- {}", t, *extent);
    cache->duplicate_for_write(t, extent);
    return rewrite_extent_iertr::now();
  }

  if (extent->is_logical()) {
    return rewrite_logical_extent(t, extent->cast<LogicalCachedExtent>());
  } else {
    DEBUGT("rewriting physical extent -- {}", t, *extent);
    return lba_manager->rewrite_extent(t, extent);
  }
}

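// get_extents_if_live reports whether the described extent -- or, after
// a concurrent split, the extents now covering its range -- is still
// live: first by cache lookup, then via LBA mappings for logical types,
// or via the LBA manager's physical-extent lookup otherwise.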
TransactionManager::get_extents_if_live_ret
TransactionManager::get_extents_if_live(
  Transaction &t,
  extent_types_t type,
  paddr_t paddr,
  laddr_t laddr,
  extent_len_t len)
{
  LOG_PREFIX(TransactionManager::get_extents_if_live);
  TRACET("{} {}~{} {}", t, type, laddr, len, paddr);

  // Liveness can only be checked this way for segment-backed extents,
  // since parallel transactions may split the extent concurrently.
  ceph_assert(paddr.get_addr_type() == paddr_types_t::SEGMENT);

  return cache->get_extent_if_cached(t, paddr, type
  ).si_then([=, this, &t](auto extent)
            -> get_extents_if_live_ret {
    if (extent && extent->get_length() == len) {
      DEBUGT("{} {}~{} {} is live in cache -- {}",
             t, type, laddr, len, paddr, *extent);
      std::list<CachedExtentRef> res;
      res.emplace_back(std::move(extent));
      return get_extents_if_live_ret(
        interruptible::ready_future_marker{},
        res);
    }

    if (is_logical_type(type)) {
      return lba_manager->get_mappings(
        t,
        laddr,
        len
      ).si_then([=, this, &t](lba_pin_list_t pin_list) {
        return seastar::do_with(
          std::list<CachedExtentRef>(),
          [=, this, &t, pin_list=std::move(pin_list)](
            std::list<CachedExtentRef> &list) mutable
        {
          auto paddr_seg_id = paddr.as_seg_paddr().get_segment_id();
          return trans_intr::parallel_for_each(
            pin_list,
            [=, this, &list, &t](
              LBAMappingRef &pin) -> Cache::get_extent_iertr::future<>
          {
            auto pin_paddr = pin->get_val();
            auto &pin_seg_paddr = pin_paddr.as_seg_paddr();
            auto pin_paddr_seg_id = pin_seg_paddr.get_segment_id();
            auto pin_len = pin->get_length();
            if (pin_paddr_seg_id != paddr_seg_id) {
              return seastar::now();
            }
            // Only an extent split can happen during the lookup
            ceph_assert(pin_seg_paddr >= paddr &&
                        pin_seg_paddr.add_offset(pin_len) <= paddr.add_offset(len));
            return read_pin_by_type(t, std::move(pin), type
            ).si_then([&list](auto ret) {
              list.emplace_back(std::move(ret));
              return seastar::now();
            });
          }).si_then([&list] {
            return get_extents_if_live_ret(
              interruptible::ready_future_marker{},
              std::move(list));
          });
        });
      }).handle_error_interruptible(crimson::ct_error::enoent::handle([] {
        return get_extents_if_live_ret(
          interruptible::ready_future_marker{},
          std::list<CachedExtentRef>());
      }), crimson::ct_error::pass_further_all{});
    } else {
      return lba_manager->get_physical_extent_if_live(
        t,
        type,
        paddr,
        laddr,
        len
      ).si_then([=, &t](auto ret) {
        std::list<CachedExtentRef> res;
        if (ret) {
          DEBUGT("{} {}~{} {} is live as physical extent -- {}",
                 t, type, laddr, len, paddr, *ret);
          res.emplace_back(std::move(ret));
        } else {
          DEBUGT("{} {}~{} {} is not live as physical extent",
                 t, type, laddr, len, paddr);
        }
        return get_extents_if_live_ret(
          interruptible::ready_future_marker{},
          std::move(res));
      });
    }
  });
}

TransactionManager::~TransactionManager() {}

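// make_transaction_manager assembles the full stack for the given
// devices: segmented devices join a SegmentManagerGroup (with a separate
// cold group when device types differ from the primary), RBM devices
// join an RBMDeviceGroup; the journal's roll size/start come from the
// primary device; and the trimmer, cleaner(s), journal, cache and
// managers are wired into the EPM and TransactionManager.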
TransactionManagerRef make_transaction_manager(
  Device *primary_device,
  const std::vector<Device*> &secondary_devices,
  bool is_test)
{
  auto epm = std::make_unique<ExtentPlacementManager>();
  auto cache = std::make_unique<Cache>(*epm);
  auto lba_manager = lba_manager::create_lba_manager(*cache);
  auto sms = std::make_unique<SegmentManagerGroup>();
  auto rbs = std::make_unique<RBMDeviceGroup>();
  auto backref_manager = create_backref_manager(*cache);
  SegmentManagerGroupRef cold_sms = nullptr;
  std::vector<SegmentProvider*> segment_providers_by_id{DEVICE_ID_MAX, nullptr};

  auto p_backend_type = primary_device->get_backend_type();

  if (p_backend_type == backend_type_t::SEGMENTED) {
    auto dtype = primary_device->get_device_type();
    ceph_assert(dtype != device_type_t::HDD &&
                dtype != device_type_t::EPHEMERAL_COLD);
    sms->add_segment_manager(static_cast<SegmentManager*>(primary_device));
  } else {
    auto rbm = std::make_unique<BlockRBManager>(
      static_cast<RBMDevice*>(primary_device), "", is_test);
    rbs->add_rb_manager(std::move(rbm));
  }

  for (auto &p_dev : secondary_devices) {
    if (p_dev->get_backend_type() == backend_type_t::SEGMENTED) {
      if (p_dev->get_device_type() == primary_device->get_device_type()) {
        sms->add_segment_manager(static_cast<SegmentManager*>(p_dev));
      } else {
        if (!cold_sms) {
          cold_sms = std::make_unique<SegmentManagerGroup>();
        }
        cold_sms->add_segment_manager(static_cast<SegmentManager*>(p_dev));
      }
    } else {
      auto rbm = std::make_unique<BlockRBManager>(
        static_cast<RBMDevice*>(p_dev), "", is_test);
      rbs->add_rb_manager(std::move(rbm));
    }
  }

  auto journal_type = p_backend_type;
  device_off_t roll_size;
  device_off_t roll_start;
  if (journal_type == journal_type_t::SEGMENTED) {
    roll_size = static_cast<SegmentManager*>(primary_device)->get_segment_size();
    roll_start = 0;
  } else {
    roll_size = static_cast<random_block_device::RBMDevice*>(primary_device)
                ->get_journal_size() - primary_device->get_block_size();
    // see CircularBoundedJournal::get_records_start()
    roll_start = static_cast<random_block_device::RBMDevice*>(primary_device)
                 ->get_journal_start() + primary_device->get_block_size();
    ceph_assert_always(roll_size <= DEVICE_OFF_MAX);
    ceph_assert_always((std::size_t)roll_size + roll_start <=
                       primary_device->get_available_size());
  }
  ceph_assert(roll_size % primary_device->get_block_size() == 0);
  ceph_assert(roll_start % primary_device->get_block_size() == 0);

  bool cleaner_is_detailed;
  SegmentCleaner::config_t cleaner_config;
  JournalTrimmerImpl::config_t trimmer_config;
  if (is_test) {
    cleaner_is_detailed = true;
    cleaner_config = SegmentCleaner::config_t::get_test();
    trimmer_config = JournalTrimmerImpl::config_t::get_test(
      roll_size, journal_type);
  } else {
    cleaner_is_detailed = false;
    cleaner_config = SegmentCleaner::config_t::get_default();
    trimmer_config = JournalTrimmerImpl::config_t::get_default(
      roll_size, journal_type);
  }

  auto journal_trimmer = JournalTrimmerImpl::create(
    *backref_manager, trimmer_config,
    journal_type, roll_start, roll_size);

  AsyncCleanerRef cleaner;
  JournalRef journal;

  SegmentCleanerRef cold_segment_cleaner = nullptr;

  if (cold_sms) {
    cold_segment_cleaner = SegmentCleaner::create(
      cleaner_config,
      std::move(cold_sms),
      *backref_manager,
      epm->get_ool_segment_seq_allocator(),
      cleaner_is_detailed,
      /* is_cold = */ true);
    if (journal_type == journal_type_t::SEGMENTED) {
      for (auto id : cold_segment_cleaner->get_device_ids()) {
        segment_providers_by_id[id] =
          static_cast<SegmentProvider*>(cold_segment_cleaner.get());
      }
    }
  }

  if (journal_type == journal_type_t::SEGMENTED) {
    cleaner = SegmentCleaner::create(
      cleaner_config,
      std::move(sms),
      *backref_manager,
      epm->get_ool_segment_seq_allocator(),
      cleaner_is_detailed);
    auto segment_cleaner = static_cast<SegmentCleaner*>(cleaner.get());
    for (auto id : segment_cleaner->get_device_ids()) {
      segment_providers_by_id[id] =
        static_cast<SegmentProvider*>(segment_cleaner);
    }
    segment_cleaner->set_journal_trimmer(*journal_trimmer);
    journal = journal::make_segmented(
      *segment_cleaner,
      *journal_trimmer);
  } else {
    cleaner = RBMCleaner::create(
      std::move(rbs),
      *backref_manager,
      cleaner_is_detailed);
    journal = journal::make_circularbounded(
      *journal_trimmer,
      static_cast<random_block_device::RBMDevice*>(primary_device),
      "");
  }

  cache->set_segment_providers(std::move(segment_providers_by_id));

  epm->init(std::move(journal_trimmer),
            std::move(cleaner),
            std::move(cold_segment_cleaner));
  epm->set_primary_device(primary_device);

  return std::make_unique<TransactionManager>(
    std::move(journal),
    std::move(cache),
    std::move(lba_manager),
    std::move(epm),
    std::move(backref_manager));
}

}  // namespace crimson::os::seastore