]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/os/seastore/transaction_manager.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / os / seastore / transaction_manager.cc
CommitLineData
f67539c2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1e59de90 2// vim: ts=8 sw=2 smarttab expandtab
f67539c2
TL
3
4#include "include/denc.h"
5#include "include/intarith.h"
6
20effc67 7#include "crimson/os/seastore/logging.h"
f67539c2 8#include "crimson/os/seastore/transaction_manager.h"
f67539c2 9#include "crimson/os/seastore/journal.h"
1e59de90
TL
10#include "crimson/os/seastore/journal/circular_bounded_journal.h"
11#include "crimson/os/seastore/lba_manager/btree/lba_btree_node.h"
12#include "crimson/os/seastore/random_block_manager/rbm_device.h"
13
14/*
15 * TransactionManager logs
16 *
17 * levels:
18 * - INFO: major initiation, closing operations
19 * - DEBUG: major extent related operations, INFO details
20 * - TRACE: DEBUG details
21 * - seastore_t logs
22 */
20effc67 23SET_SUBSYS(seastore_tm);
f67539c2
TL
24
25namespace crimson::os::seastore {
26
/**
 * Wire together the major SeaStore components: journal, cache, LBA
 * manager, extent placement manager (EPM) and backref manager.
 *
 * Ownership of all five components transfers to this object.
 */
TransactionManager::TransactionManager(
  JournalRef _journal,
  CacheRef _cache,
  LBAManagerRef _lba_manager,
  ExtentPlacementManagerRef &&_epm,
  BackrefManagerRef&& _backref_manager)
  : cache(std::move(_cache)),
    lba_manager(std::move(_lba_manager)),
    journal(std::move(_journal)),
    epm(std::move(_epm)),
    backref_manager(std::move(_backref_manager))
{
  // Register ourselves as the EPM's extent callback so it can call back
  // into this TransactionManager.
  epm->set_extent_callback(this);
  // Journal record submission is staged through our write pipeline.
  journal->set_write_pipeline(&write_pipeline);
}
f67539c2
TL
42
/**
 * Initialize a fresh store.
 *
 * Sequence: mount the EPM, open the journal for mkfs (obtaining the
 * initial sequence used for both journal head and tails), open the EPM
 * for writes, then run a single MUTATE transaction that initializes the
 * cache, LBA tree and backref tree and commits it directly.  Finally the
 * store is closed again; mkfs leaves the store unmounted.
 */
TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
{
  LOG_PREFIX(TransactionManager::mkfs);
  INFO("enter");
  return epm->mount(
  ).safe_then([this] {
    return journal->open_for_mkfs();
  }).safe_then([this](auto start_seq) {
    // A brand-new journal: head and both tails start at the same seq.
    journal->get_trimmer().update_journal_tails(start_seq, start_seq);
    journal->get_trimmer().set_journal_head(start_seq);
    return epm->open_for_write();
  }).safe_then([this, FNAME]() {
    return with_transaction_intr(
      Transaction::src_t::MUTATE,
      "mkfs_tm",
      [this, FNAME](auto& t)
    {
      cache->init();
      return cache->mkfs(t
      ).si_then([this, &t] {
        return lba_manager->mkfs(t);
      }).si_then([this, &t] {
        return backref_manager->mkfs(t);
      }).si_then([this, FNAME, &t] {
        INFOT("submitting mkfs transaction", t);
        return submit_transaction_direct(t);
      });
    }).handle_error(
      // There can be no transaction conflict during mkfs, so eagain is
      // impossible by construction.
      crimson::ct_error::eagain::handle([] {
        ceph_assert(0 == "eagain impossible");
        return mkfs_ertr::now();
      }),
      mkfs_ertr::pass_further{}
    );
  }).safe_then([this] {
    return close();
  }).safe_then([FNAME] {
    INFO("completed");
  });
}
83
/**
 * Mount an existing store.
 *
 * Sequence: mount the EPM, replay the journal into the cache, open the
 * journal for mount, then in a weak transaction (a) initialize cached
 * extents with the LBA/backref managers and (b) scan the mapped space to
 * rebuild per-extent-type counters and the EPM's space accounting.
 * Finally opens the EPM for writes and starts background work.
 */
TransactionManager::mount_ertr::future<> TransactionManager::mount()
{
  LOG_PREFIX(TransactionManager::mount);
  INFO("enter");
  cache->init();
  return epm->mount(
  ).safe_then([this] {
    return journal->replay(
      [this](
        const auto &offsets,
        const auto &e,
        const journal_seq_t &dirty_tail,
        const journal_seq_t &alloc_tail,
        sea_time_point modify_time)
      {
        auto start_seq = offsets.write_result.start_seq;
        // Each replayed delta is applied to the cache at its record's
        // start sequence.
        return cache->replay_delta(
          start_seq,
          offsets.record_block_base,
          e,
          dirty_tail,
          alloc_tail,
          modify_time);
      });
  }).safe_then([this] {
    return journal->open_for_mount();
  }).safe_then([this](auto start_seq) {
    journal->get_trimmer().set_journal_head(start_seq);
    return with_transaction_weak(
      "mount",
      [this](auto &t)
    {
      // Hand each cached extent to the manager that owns its type.
      return cache->init_cached_extents(t, [this](auto &t, auto &e) {
        if (is_backref_node(e->get_type())) {
          return backref_manager->init_cached_extent(t, e);
        } else {
          return lba_manager->init_cached_extent(t, e);
        }
      }).si_then([this, &t] {
        epm->start_scan_space();
        return backref_manager->scan_mapped_space(
          t,
          [this](
            paddr_t paddr,
            paddr_t backref_key,
            extent_len_t len,
            extent_types_t type,
            laddr_t laddr) {
          if (is_backref_node(type)) {
            // Backref nodes carry a backref key but no logical address.
            assert(laddr == L_ADDR_NULL);
            assert(backref_key != P_ADDR_NULL);
            backref_manager->cache_new_backref_extent(paddr, backref_key, type);
            cache->update_tree_extents_num(type, 1);
            epm->mark_space_used(paddr, len);
          } else if (laddr == L_ADDR_NULL) {
            // A retired mapping: the space is free again.
            assert(backref_key == P_ADDR_NULL);
            cache->update_tree_extents_num(type, -1);
            epm->mark_space_free(paddr, len);
          } else {
            // A live mapped extent.
            assert(backref_key == P_ADDR_NULL);
            cache->update_tree_extents_num(type, 1);
            epm->mark_space_used(paddr, len);
          }
        });
      });
    });
  }).safe_then([this] {
    return epm->open_for_write();
  }).safe_then([FNAME, this] {
    epm->start_background();
    INFO("completed");
  }).handle_error(
    mount_ertr::pass_further{},
    crimson::ct_error::all_same_way([] {
      ceph_assert(0 == "unhandled error");
      return mount_ertr::now();
    })
  );
}
163
/**
 * Shut the store down in dependency order: stop EPM background work
 * first, then close the cache, the journal, and finally the EPM itself.
 * Cache contents are dumped (for diagnostics) after the cache closes.
 */
TransactionManager::close_ertr::future<> TransactionManager::close() {
  LOG_PREFIX(TransactionManager::close);
  INFO("enter");
  return epm->stop_background(
  ).then([this] {
    return cache->close();
  }).safe_then([this] {
    cache->dump_contents();
    return journal->close();
  }).safe_then([this] {
    return epm->close();
  }).safe_then([FNAME] {
    INFO("completed");
    return seastar::now();
  });
}
180
/**
 * Increment the LBA refcount of a logical extent by its laddr.
 *
 * @return the new refcount.  Any error other than those passed further
 * by ref_iertr is treated as fatal (asserts).
 */
TransactionManager::ref_ret TransactionManager::inc_ref(
  Transaction &t,
  LogicalCachedExtentRef &ref)
{
  LOG_PREFIX(TransactionManager::inc_ref);
  TRACET("{}", t, *ref);
  return lba_manager->incref_extent(t, ref->get_laddr()
  ).si_then([FNAME, ref, &t](auto result) {
    DEBUGT("extent refcount is incremented to {} -- {}",
           t, result.refcount, *ref);
    return result.refcount;
  }).handle_error_interruptible(
    ref_iertr::pass_further{},
    ct_error::all_same_way([](auto e) {
      ceph_assert(0 == "unhandled error, TODO");
    }));
}
198
/**
 * Increment the LBA refcount of the mapping at @offset.
 *
 * @return the new refcount.  Unlike the extent overload, errors are
 * propagated to the caller unchanged.
 */
TransactionManager::ref_ret TransactionManager::inc_ref(
  Transaction &t,
  laddr_t offset)
{
  LOG_PREFIX(TransactionManager::inc_ref);
  TRACET("{}", t, offset);
  return lba_manager->incref_extent(t, offset
  ).si_then([FNAME, offset, &t](auto result) {
    DEBUGT("extent refcount is incremented to {} -- {}~{}, {}",
           t, result.refcount, offset, result.length, result.addr);
    return result.refcount;
  });
}
212
/**
 * Decrement the LBA refcount of a logical extent (cascade_remove=true).
 * When the refcount reaches zero the cached extent is retired in this
 * transaction.
 *
 * @return the new refcount.
 */
TransactionManager::ref_ret TransactionManager::dec_ref(
  Transaction &t,
  LogicalCachedExtentRef &ref)
{
  LOG_PREFIX(TransactionManager::dec_ref);
  TRACET("{}", t, *ref);
  return lba_manager->decref_extent(t, ref->get_laddr(), true
  ).si_then([this, FNAME, &t, ref](auto result) {
    DEBUGT("extent refcount is decremented to {} -- {}",
           t, result.refcount, *ref);
    if (result.refcount == 0) {
      cache->retire_extent(t, ref);
    }
    return result.refcount;
  });
}
229
/**
 * Decrement the LBA refcount of the mapping at @offset.
 *
 * @param cascade_remove forwarded to LBAManager::decref_extent.
 *
 * When the refcount reaches zero and the mapping resolves to a real
 * (non-zero) paddr, the extent is retired from the cache by address.
 * Zero-paddr mappings have no backing extent to retire.
 *
 * @return the new refcount.
 */
TransactionManager::ref_ret TransactionManager::_dec_ref(
  Transaction &t,
  laddr_t offset,
  bool cascade_remove)
{
  LOG_PREFIX(TransactionManager::_dec_ref);
  TRACET("{}", t, offset);
  return lba_manager->decref_extent(t, offset, cascade_remove
  ).si_then([this, FNAME, offset, &t](auto result) -> ref_ret {
    DEBUGT("extent refcount is decremented to {} -- {}~{}, {}",
           t, result.refcount, offset, result.length, result.addr);
    auto fut = ref_iertr::now();
    if (result.refcount == 0) {
      if (result.addr.is_paddr() &&
          !result.addr.get_paddr().is_zero()) {
        fut = cache->retire_extent_addr(
          t, result.addr.get_paddr(), result.length);
      }
    }

    return fut.si_then([result=std::move(result)] {
      return result.refcount;
    });
  });
}
255
20effc67
TL
256TransactionManager::refs_ret TransactionManager::dec_ref(
257 Transaction &t,
258 std::vector<laddr_t> offsets)
259{
1e59de90
TL
260 LOG_PREFIX(TransactionManager::dec_ref);
261 DEBUG("{} offsets", offsets.size());
20effc67
TL
262 return seastar::do_with(std::move(offsets), std::vector<unsigned>(),
263 [this, &t] (auto &&offsets, auto &refcnt) {
264 return trans_intr::do_for_each(offsets.begin(), offsets.end(),
265 [this, &t, &refcnt] (auto &laddr) {
266 return this->dec_ref(t, laddr).si_then([&refcnt] (auto ref) {
267 refcnt.push_back(ref);
268 return ref_iertr::now();
269 });
270 }).si_then([&refcnt] {
271 return ref_iertr::make_ready_future<std::vector<unsigned>>(std::move(refcnt));
272 });
273 });
274}
275
/**
 * Submit a transaction through the full write pipeline.
 *
 * Enters the reserve_projected_usage pipeline stage, dispatches the
 * transaction's delayed extents via the EPM, reserves the projected
 * space usage, then delegates to do_submit_transaction.  The projected
 * usage is released when submission finishes, success or failure.
 */
TransactionManager::submit_transaction_iertr::future<>
TransactionManager::submit_transaction(
  Transaction &t)
{
  LOG_PREFIX(TransactionManager::submit_transaction);
  SUBTRACET(seastore_t, "start", t);
  return trans_intr::make_interruptible(
    t.get_handle().enter(write_pipeline.reserve_projected_usage)
  ).then_interruptible([this, FNAME, &t] {
    auto dispatch_result = epm->dispatch_delayed_extents(t);
    auto projected_usage = dispatch_result.usage;
    SUBTRACET(seastore_t, "waiting for projected_usage: {}", t, projected_usage);
    return trans_intr::make_interruptible(
      epm->reserve_projected_usage(projected_usage)
    ).then_interruptible([this, &t, dispatch_result = std::move(dispatch_result)] {
      return do_submit_transaction(t, std::move(dispatch_result));
    }).finally([this, FNAME, projected_usage, &t] {
      SUBTRACET(seastore_t, "releasing projected_usage: {}", t, projected_usage);
      epm->release_projected_usage(projected_usage);
    });
  });
}
298
/**
 * Submit a transaction without projected-usage reservation (used e.g.
 * for internal transactions such as mkfs and trimming).  Delayed
 * extents are dispatched immediately and handed to
 * do_submit_transaction.
 *
 * @param trim_alloc_to optionally trims cached backref buffers up to
 *        this sequence before the record is prepared.
 */
TransactionManager::submit_transaction_direct_ret
TransactionManager::submit_transaction_direct(
  Transaction &tref,
  std::optional<journal_seq_t> trim_alloc_to)
{
  return do_submit_transaction(
    tref,
    epm->dispatch_delayed_extents(tref),
    trim_alloc_to);
}
20effc67 309
1e59de90
TL
310TransactionManager::submit_transaction_direct_ret
311TransactionManager::do_submit_transaction(
312 Transaction &tref,
313 ExtentPlacementManager::dispatch_result_t dispatch_result,
314 std::optional<journal_seq_t> trim_alloc_to)
315{
316 LOG_PREFIX(TransactionManager::do_submit_transaction);
317 SUBTRACET(seastore_t, "start", tref);
20effc67
TL
318 return trans_intr::make_interruptible(
319 tref.get_handle().enter(write_pipeline.ool_writes)
1e59de90
TL
320 ).then_interruptible([this, FNAME, &tref,
321 dispatch_result = std::move(dispatch_result)] {
322 return seastar::do_with(std::move(dispatch_result),
323 [this, FNAME, &tref](auto &dispatch_result) {
324 return epm->write_delayed_ool_extents(tref, dispatch_result.alloc_map
325 ).si_then([this, FNAME, &tref, &dispatch_result] {
326 SUBTRACET(seastore_t, "update delayed extent mappings", tref);
327 return lba_manager->update_mappings(tref, dispatch_result.delayed_extents);
328 }).handle_error_interruptible(
329 crimson::ct_error::input_output_error::pass_further(),
330 crimson::ct_error::assert_all("invalid error")
331 );
332 });
333 }).si_then([this, FNAME, &tref] {
334 auto allocated_extents = tref.get_valid_pre_alloc_list();
335 auto num_extents = allocated_extents.size();
336 SUBTRACET(seastore_t, "process {} allocated extents", tref, num_extents);
337 return epm->write_preallocated_ool_extents(tref, allocated_extents
20effc67
TL
338 ).handle_error_interruptible(
339 crimson::ct_error::input_output_error::pass_further(),
340 crimson::ct_error::assert_all("invalid error")
341 );
342 }).si_then([this, FNAME, &tref] {
1e59de90 343 SUBTRACET(seastore_t, "about to prepare", tref);
20effc67 344 return tref.get_handle().enter(write_pipeline.prepare);
1e59de90 345 }).si_then([this, FNAME, &tref, trim_alloc_to=std::move(trim_alloc_to)]() mutable
20effc67 346 -> submit_transaction_iertr::future<> {
1e59de90
TL
347 if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) {
348 cache->trim_backref_bufs(*trim_alloc_to);
349 }
20effc67 350
1e59de90
TL
351 auto record = cache->prepare_record(
352 tref,
353 journal->get_trimmer().get_journal_head(),
354 journal->get_trimmer().get_dirty_tail());
20effc67 355
1e59de90 356 tref.get_handle().maybe_release_collection_lock();
f67539c2 357
1e59de90 358 SUBTRACET(seastore_t, "about to submit to journal", tref);
20effc67
TL
359 return journal->submit_record(std::move(record), tref.get_handle()
360 ).safe_then([this, FNAME, &tref](auto submit_result) mutable {
1e59de90 361 SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
20effc67 362 auto start_seq = submit_result.write_result.start_seq;
1e59de90 363 journal->get_trimmer().set_journal_head(start_seq);
20effc67
TL
364 cache->complete_commit(
365 tref,
366 submit_result.record_block_base,
1e59de90
TL
367 start_seq);
368
369 std::vector<CachedExtentRef> lba_to_clear;
370 std::vector<CachedExtentRef> backref_to_clear;
371 lba_to_clear.reserve(tref.get_retired_set().size());
372 backref_to_clear.reserve(tref.get_retired_set().size());
373 for (auto &e: tref.get_retired_set()) {
374 if (e->is_logical() || is_lba_node(e->get_type()))
375 lba_to_clear.push_back(e);
376 else if (is_backref_node(e->get_type()))
377 backref_to_clear.push_back(e);
f67539c2 378 }
1e59de90
TL
379
380 journal->get_trimmer().update_journal_tails(
381 cache->get_oldest_dirty_from().value_or(start_seq),
382 cache->get_oldest_backref_dirty_from().value_or(start_seq));
383 return journal->finish_commit(tref.get_src()
384 ).then([&tref] {
385 return tref.get_handle().complete();
386 });
f67539c2 387 }).handle_error(
20effc67 388 submit_transaction_iertr::pass_further{},
f67539c2
TL
389 crimson::ct_error::all_same_way([](auto e) {
390 ceph_assert(0 == "Hit error submitting to journal");
20effc67
TL
391 })
392 );
393 }).finally([&tref]() {
394 tref.get_handle().exit();
f67539c2
TL
395 });
396}
397
1e59de90
TL
/**
 * Drain all in-flight writes ordered before @handle: walk the handle
 * through every write pipeline stage in order, then flush the journal.
 */
seastar::future<> TransactionManager::flush(OrderingHandle &handle)
{
  LOG_PREFIX(TransactionManager::flush);
  SUBDEBUG(seastore_t, "H{} start", (void*)&handle);
  return handle.enter(write_pipeline.reserve_projected_usage
  ).then([this, &handle] {
    return handle.enter(write_pipeline.ool_writes);
  }).then([this, &handle] {
    return handle.enter(write_pipeline.prepare);
  }).then([this, &handle] {
    handle.maybe_release_collection_lock();
    return journal->flush(handle);
  }).then([FNAME, &handle] {
    SUBDEBUG(seastore_t, "H{} completed", (void*)&handle);
  });
}
414
/**
 * Fetch up to @max_bytes of dirty extents at or after journal @seq.
 * Thin forwarder to Cache::get_next_dirty_extents.
 */
TransactionManager::get_next_dirty_extents_ret
TransactionManager::get_next_dirty_extents(
  Transaction &t,
  journal_seq_t seq,
  size_t max_bytes)
{
  LOG_PREFIX(TransactionManager::get_next_dirty_extents);
  DEBUGT("max_bytes={}B, seq={}", t, max_bytes, seq);
  return cache->get_next_dirty_extents(t, seq, max_bytes);
}
425
/**
 * Rewrite a live logical extent to a new location: retire the old
 * extent, allocate a replacement of the same type/length at the
 * extent's target rewrite generation, copy the data, carry over laddr
 * and modify time, and update the LBA mapping old paddr -> new paddr.
 */
TransactionManager::rewrite_extent_ret
TransactionManager::rewrite_logical_extent(
  Transaction& t,
  LogicalCachedExtentRef extent)
{
  LOG_PREFIX(TransactionManager::rewrite_logical_extent);
  if (extent->has_been_invalidated()) {
    ERRORT("extent has been invalidated -- {}", t, *extent);
    ceph_abort();
  }
  TRACET("rewriting extent -- {}", t, *extent);

  // NOTE(review): extent is already a LogicalCachedExtentRef, so this
  // cast looks redundant -- candidate for cleanup.
  auto lextent = extent->cast<LogicalCachedExtent>();
  cache->retire_extent(t, extent);
  auto nlextent = cache->alloc_new_extent_by_type(
    t,
    lextent->get_type(),
    lextent->get_length(),
    lextent->get_user_hint(),
    // get target rewrite generation
    lextent->get_rewrite_generation())->cast<LogicalCachedExtent>();
  lextent->get_bptr().copy_out(
    0,
    lextent->get_length(),
    nlextent->get_bptr().c_str());
  nlextent->set_laddr(lextent->get_laddr());
  nlextent->set_modify_time(lextent->get_modify_time());

  DEBUGT("rewriting logical extent -- {} to {}", t, *lextent, *nlextent);

  /* This update_mapping is, strictly speaking, unnecessary for delayed_alloc
   * extents since we're going to do it again once we either do the ool write
   * or allocate a relative inline addr.  TODO: refactor AsyncCleaner to
   * avoid this complication. */
  return lba_manager->update_mapping(
    t,
    lextent->get_laddr(),
    lextent->get_paddr(),
    nlextent->get_paddr(),
    nlextent.get());
}
467
/**
 * Rewrite an arbitrary extent as part of cleaning/trimming.
 *
 * Refreshes the extent against the transaction (skipping if already
 * retired), picks the target rewrite generation (dirty extents are
 * reset to INIT_GENERATION, clean ones take @target_generation and the
 * supplied @modify_time), then dispatches by type: backref nodes to the
 * backref manager, the root via cache duplication, logical extents via
 * rewrite_logical_extent, and other physical extents via the LBA
 * manager.
 */
TransactionManager::rewrite_extent_ret TransactionManager::rewrite_extent(
  Transaction &t,
  CachedExtentRef extent,
  rewrite_gen_t target_generation,
  sea_time_point modify_time)
{
  LOG_PREFIX(TransactionManager::rewrite_extent);

  {
    auto updated = cache->update_extent_from_transaction(t, extent);
    if (!updated) {
      DEBUGT("extent is already retired, skipping -- {}", t, *extent);
      return rewrite_extent_iertr::now();
    }
    extent = updated;
    ceph_assert(!extent->is_pending_io());
  }

  assert(extent->is_valid() && !extent->is_initial_pending());
  if (extent->is_dirty()) {
    extent->set_target_rewrite_generation(INIT_GENERATION);
  } else {
    extent->set_target_rewrite_generation(target_generation);
    ceph_assert(modify_time != NULL_TIME);
    extent->set_modify_time(modify_time);
  }

  t.get_rewrite_version_stats().increment(extent->get_version());

  if (is_backref_node(extent->get_type())) {
    DEBUGT("rewriting backref extent -- {}", t, *extent);
    return backref_manager->rewrite_extent(t, extent);
  }

  if (extent->get_type() == extent_types_t::ROOT) {
    DEBUGT("rewriting root extent -- {}", t, *extent);
    cache->duplicate_for_write(t, extent);
    return rewrite_extent_iertr::now();
  }

  if (extent->is_logical()) {
    return rewrite_logical_extent(t, extent->cast<LogicalCachedExtent>());
  } else {
    DEBUGT("rewriting physical extent -- {}", t, *extent);
    return lba_manager->rewrite_extent(t, extent);
  }
}
515
1e59de90
TL
516TransactionManager::get_extents_if_live_ret
517TransactionManager::get_extents_if_live(
f67539c2
TL
518 Transaction &t,
519 extent_types_t type,
1e59de90 520 paddr_t paddr,
f67539c2 521 laddr_t laddr,
1e59de90 522 extent_len_t len)
f67539c2 523{
20effc67 524 LOG_PREFIX(TransactionManager::get_extent_if_live);
1e59de90
TL
525 TRACET("{} {}~{} {}", t, type, laddr, len, paddr);
526
527 // This only works with segments to check if alive,
528 // as parallel transactions may split the extent at the same time.
529 ceph_assert(paddr.get_addr_type() == paddr_types_t::SEGMENT);
530
531 return cache->get_extent_if_cached(t, paddr, type
532 ).si_then([=, this, &t](auto extent)
533 -> get_extents_if_live_ret {
534 if (extent && extent->get_length() == len) {
535 DEBUGT("{} {}~{} {} is live in cache -- {}",
536 t, type, laddr, len, paddr, *extent);
537 std::list<CachedExtentRef> res;
538 res.emplace_back(std::move(extent));
539 return get_extents_if_live_ret(
20effc67 540 interruptible::ready_future_marker{},
1e59de90 541 res);
20effc67
TL
542 }
543
544 if (is_logical_type(type)) {
1e59de90 545 return lba_manager->get_mappings(
20effc67 546 t,
1e59de90
TL
547 laddr,
548 len
549 ).si_then([=, this, &t](lba_pin_list_t pin_list) {
550 return seastar::do_with(
551 std::list<CachedExtentRef>(),
552 [=, this, &t, pin_list=std::move(pin_list)](
553 std::list<CachedExtentRef> &list) mutable
554 {
555 auto paddr_seg_id = paddr.as_seg_paddr().get_segment_id();
556 return trans_intr::parallel_for_each(
557 pin_list,
558 [=, this, &list, &t](
559 LBAMappingRef &pin) -> Cache::get_extent_iertr::future<>
560 {
561 auto pin_paddr = pin->get_val();
562 auto &pin_seg_paddr = pin_paddr.as_seg_paddr();
563 auto pin_paddr_seg_id = pin_seg_paddr.get_segment_id();
564 auto pin_len = pin->get_length();
565 if (pin_paddr_seg_id != paddr_seg_id) {
566 return seastar::now();
567 }
568 // Only extent split can happen during the lookup
569 ceph_assert(pin_seg_paddr >= paddr &&
570 pin_seg_paddr.add_offset(pin_len) <= paddr.add_offset(len));
571 return read_pin_by_type(t, std::move(pin), type
572 ).si_then([&list](auto ret) {
573 list.emplace_back(std::move(ret));
574 return seastar::now();
575 });
576 }).si_then([&list] {
577 return get_extents_if_live_ret(
578 interruptible::ready_future_marker{},
579 std::move(list));
580 });
581 });
582 }).handle_error_interruptible(crimson::ct_error::enoent::handle([] {
583 return get_extents_if_live_ret(
584 interruptible::ready_future_marker{},
585 std::list<CachedExtentRef>());
586 }), crimson::ct_error::pass_further_all{});
20effc67 587 } else {
20effc67
TL
588 return lba_manager->get_physical_extent_if_live(
589 t,
590 type,
1e59de90 591 paddr,
20effc67 592 laddr,
1e59de90
TL
593 len
594 ).si_then([=, &t](auto ret) {
595 std::list<CachedExtentRef> res;
596 if (ret) {
597 DEBUGT("{} {}~{} {} is live as physical extent -- {}",
598 t, type, laddr, len, paddr, *ret);
599 res.emplace_back(std::move(ret));
600 } else {
601 DEBUGT("{} {}~{} {} is not live as physical extent",
602 t, type, laddr, len, paddr);
603 }
604 return get_extents_if_live_ret(
605 interruptible::ready_future_marker{},
606 std::move(res));
607 });
20effc67
TL
608 }
609 });
f67539c2
TL
610}
611
612TransactionManager::~TransactionManager() {}
613
1e59de90
TL
/**
 * Factory assembling a TransactionManager for the given devices.
 *
 * Builds the EPM, cache, LBA and backref managers, then groups devices:
 * SEGMENTED devices join the SegmentManagerGroup (secondary devices of a
 * different device type than the primary go to a separate "cold" group),
 * RANDOM_BLOCK devices join the RBMDeviceGroup.  The journal type and
 * roll geometry follow the primary device's backend.  A journal trimmer
 * and the matching cleaner(s) are created, wired into the EPM, and the
 * fully-connected TransactionManager is returned.
 *
 * @param is_test selects test-sized cleaner/trimmer configs and is
 *        forwarded to BlockRBManager.
 */
TransactionManagerRef make_transaction_manager(
    Device *primary_device,
    const std::vector<Device*> &secondary_devices,
    bool is_test)
{
  auto epm = std::make_unique<ExtentPlacementManager>();
  auto cache = std::make_unique<Cache>(*epm);
  auto lba_manager = lba_manager::create_lba_manager(*cache);
  auto sms = std::make_unique<SegmentManagerGroup>();
  auto rbs = std::make_unique<RBMDeviceGroup>();
  auto backref_manager = create_backref_manager(*cache);
  SegmentManagerGroupRef cold_sms = nullptr;
  std::vector<SegmentProvider*> segment_providers_by_id{DEVICE_ID_MAX, nullptr};

  auto p_backend_type = primary_device->get_backend_type();

  if (p_backend_type == backend_type_t::SEGMENTED) {
    // A segmented primary must not be a cold-tier device.
    auto dtype = primary_device->get_device_type();
    ceph_assert(dtype != device_type_t::HDD &&
		dtype != device_type_t::EPHEMERAL_COLD);
    sms->add_segment_manager(static_cast<SegmentManager*>(primary_device));
  } else {
    auto rbm = std::make_unique<BlockRBManager>(
      static_cast<RBMDevice*>(primary_device), "", is_test);
    rbs->add_rb_manager(std::move(rbm));
  }

  for (auto &p_dev : secondary_devices) {
    if (p_dev->get_backend_type() == backend_type_t::SEGMENTED) {
      if (p_dev->get_device_type() == primary_device->get_device_type()) {
	sms->add_segment_manager(static_cast<SegmentManager*>(p_dev));
      } else {
	// Different device type than the primary: cold tier, lazily
	// create the cold segment manager group.
	if (!cold_sms) {
	  cold_sms = std::make_unique<SegmentManagerGroup>();
	}
	cold_sms->add_segment_manager(static_cast<SegmentManager*>(p_dev));
      }
    } else {
      auto rbm = std::make_unique<BlockRBManager>(
	static_cast<RBMDevice*>(p_dev), "", is_test);
      rbs->add_rb_manager(std::move(rbm));
    }
  }

  auto journal_type = p_backend_type;
  device_off_t roll_size;
  device_off_t roll_start;
  if (journal_type == journal_type_t::SEGMENTED) {
    roll_size = static_cast<SegmentManager*>(primary_device)->get_segment_size();
    roll_start = 0;
  } else {
    roll_size = static_cast<random_block_device::RBMDevice*>(primary_device)
		->get_journal_size() - primary_device->get_block_size();
    // see CircularBoundedJournal::get_records_start()
    roll_start = static_cast<random_block_device::RBMDevice*>(primary_device)
		 ->get_shard_journal_start() + primary_device->get_block_size();
    ceph_assert_always(roll_size <= DEVICE_OFF_MAX);
    ceph_assert_always((std::size_t)roll_size + roll_start <=
		       primary_device->get_available_size());
  }
  // Roll geometry must be block-aligned.
  ceph_assert(roll_size % primary_device->get_block_size() == 0);
  ceph_assert(roll_start % primary_device->get_block_size() == 0);

  bool cleaner_is_detailed;
  SegmentCleaner::config_t cleaner_config;
  JournalTrimmerImpl::config_t trimmer_config;
  if (is_test) {
    cleaner_is_detailed = true;
    cleaner_config = SegmentCleaner::config_t::get_test();
    trimmer_config = JournalTrimmerImpl::config_t::get_test(
      roll_size, journal_type);
  } else {
    cleaner_is_detailed = false;
    cleaner_config = SegmentCleaner::config_t::get_default();
    trimmer_config = JournalTrimmerImpl::config_t::get_default(
      roll_size, journal_type);
  }

  auto journal_trimmer = JournalTrimmerImpl::create(
    *backref_manager, trimmer_config,
    journal_type, roll_start, roll_size);

  AsyncCleanerRef cleaner;
  JournalRef journal;

  SegmentCleanerRef cold_segment_cleaner = nullptr;

  if (cold_sms) {
    cold_segment_cleaner = SegmentCleaner::create(
      cleaner_config,
      std::move(cold_sms),
      *backref_manager,
      epm->get_ool_segment_seq_allocator(),
      cleaner_is_detailed,
      /* is_cold = */ true);
    if (journal_type == journal_type_t::SEGMENTED) {
      for (auto id : cold_segment_cleaner->get_device_ids()) {
	segment_providers_by_id[id] =
	  static_cast<SegmentProvider*>(cold_segment_cleaner.get());
      }
    }
  }

  if (journal_type == journal_type_t::SEGMENTED) {
    cleaner = SegmentCleaner::create(
      cleaner_config,
      std::move(sms),
      *backref_manager,
      epm->get_ool_segment_seq_allocator(),
      cleaner_is_detailed);
    auto segment_cleaner = static_cast<SegmentCleaner*>(cleaner.get());
    for (auto id : segment_cleaner->get_device_ids()) {
      segment_providers_by_id[id] =
	static_cast<SegmentProvider*>(segment_cleaner);
    }
    segment_cleaner->set_journal_trimmer(*journal_trimmer);
    journal = journal::make_segmented(
      *segment_cleaner,
      *journal_trimmer);
  } else {
    cleaner = RBMCleaner::create(
      std::move(rbs),
      *backref_manager,
      cleaner_is_detailed);
    journal = journal::make_circularbounded(
      *journal_trimmer,
      static_cast<random_block_device::RBMDevice*>(primary_device),
      "");
  }

  cache->set_segment_providers(std::move(segment_providers_by_id));

  epm->init(std::move(journal_trimmer),
	    std::move(cleaner),
	    std::move(cold_segment_cleaner));
  epm->set_primary_device(primary_device);

  return std::make_unique<TransactionManager>(
    std::move(journal),
    std::move(cache),
    std::move(lba_manager),
    std::move(epm),
    std::move(backref_manager));
}
758
f67539c2 759}