1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include "include/buffer.h"
11 #include "include/byteorder.h"
13 #include "crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.h"
16 seastar::logger
& logger() {
17 return crimson::get_logger(ceph_subsys_filestore
);
21 namespace crimson::os::seastore::lba_manager::btree
{
23 std::ostream
&LBAInternalNode::print_detail(std::ostream
&out
) const
25 return out
<< ", size=" << get_size()
26 << ", meta=" << get_meta();
// LBAInternalNode::lookup: descend the btree from this internal node
// toward the node at the requested depth that covers `addr`.
// NOTE(review): this extraction is missing source lines (the parameter
// list, braces, and parts of the body) — verify against the full file.
29 LBAInternalNode::lookup_ret
LBAInternalNode::lookup(
34 auto meta
= get_meta();
// If this node is already at the requested depth, a ready future is
// returned (value lines appear to be missing from this extraction).
35 if (depth
== get_meta().depth
) {
37 lookup_ertr::ready_future_marker
{},
// The target address must fall inside this node's [begin, end) range.
40 assert(meta
.begin
<= addr
);
41 assert(meta
.end
> addr
);
42 auto iter
= lower_bound(addr
);
// Load the child extent covering addr and recurse into it; the
// finally() keeps `this` alive (via LBANodeRef) until the child
// lookup completes.
43 return get_lba_btree_extent(
47 get_paddr()).safe_then([c
, addr
, depth
](auto child
) {
48 return child
->lookup(c
, addr
, depth
);
49 }).finally([ref
=LBANodeRef(this)] {});
// LBAInternalNode::lookup_range: collect the lba pins for every mapping
// overlapping [addr, addr + len) by visiting each child covering the
// range and splicing its results into one list.
// NOTE(review): extraction is missing source lines (signature, loop
// bounds, braces) — verify against the full file.
52 LBAInternalNode::lookup_range_ret
LBAInternalNode::lookup_range(
57 auto [begin
, end
] = bound(addr
, addr
+ len
);
// The accumulator lives in a unique_ptr so it survives the async
// iteration; `result` is a reference into it for convenience.
58 auto result_up
= std::make_unique
<lba_pin_list_t
>();
59 auto &result
= *result_up
;
60 return crimson::do_for_each(
63 [this, c
, &result
, addr
, len
](const auto &val
) mutable {
64 return get_lba_btree_extent(
68 get_paddr()).safe_then(
69 [c
, &result
, addr
, len
](auto extent
) mutable {
70 return extent
->lookup_range(
// Move (splice) the child's pins onto the tail of the shared result
// list — no per-pin copies.
74 [&result
](auto pin_list
) mutable {
75 result
.splice(result
.end(), pin_list
,
76 pin_list
.begin(), pin_list
.end());
// Final continuation takes ownership of the accumulator and holds a
// ref on this node until the whole walk is done.
79 }).safe_then([result
=std::move(result_up
), ref
=LBANodeRef(this)] {
80 return lookup_range_ertr::make_ready_future
<lba_pin_list_t
>(
// LBAInternalNode::insert: route an insertion of laddr -> val to the
// child covering laddr, splitting the child first if it is full.
// NOTE(review): extraction is missing source lines (parameter list,
// intermediate arguments, braces) — verify against the full file.
85 LBAInternalNode::insert_ret
LBAInternalNode::insert(
90 auto insertion_pt
= get_containing_child(laddr
);
91 return get_lba_btree_extent(
94 insertion_pt
->get_val(),
95 get_paddr()).safe_then(
96 [this, insertion_pt
, c
, laddr
, val
=std::move(val
)](
97 auto extent
) mutable {
// Full child: split it and insert into the appropriate half;
// otherwise continue with the child as-is.
98 return extent
->at_max_capacity() ?
99 split_entry(c
, laddr
, insertion_pt
, extent
) :
100 insert_ertr::make_ready_future
<LBANodeRef
>(std::move(extent
));
101 }).safe_then([c
, laddr
, val
=std::move(val
)](
102 LBANodeRef extent
) mutable {
103 return extent
->insert(c
, laddr
, val
);
// LBAInternalNode::mutate_mapping: public entry point — delegates to
// mutate_mapping_internal with first_degree=true.
// NOTE(review): parameter list lines are missing from this extraction.
107 LBAInternalNode::mutate_mapping_ret
LBAInternalNode::mutate_mapping(
112 return mutate_mapping_internal(c
, laddr
, true, std::move(f
));
// LBAInternalNode::mutate_mapping_internal: find the child covering
// laddr, merge it with a sibling first if it is at minimum capacity,
// then recurse with is_root=false.
// NOTE(review): extraction is missing source lines (signature, merge
// call arguments, braces) — verify against the full file.
115 LBAInternalNode::mutate_mapping_ret
LBAInternalNode::mutate_mapping_internal(
121 auto mutation_pt
= get_containing_child(laddr
);
// An internal node always has a containing child for an in-range
// address; failing here indicates a corrupt tree.
122 if (mutation_pt
== end()) {
123 assert(0 == "impossible");
124 return crimson::ct_error::enoent::make();
126 return get_lba_btree_extent(
128 get_meta().depth
- 1,
129 mutation_pt
->get_val(),
131 ).safe_then([=](LBANodeRef extent
) {
// Pre-emptively rebalance an under-full child (unless this node has a
// single entry) so the recursive mutate cannot underflow it.
132 if (extent
->at_min_capacity() && get_size() > 1) {
140 return merge_ertr::make_ready_future
<LBANodeRef
>(
143 }).safe_then([c
, laddr
, f
=std::move(f
)](LBANodeRef extent
) mutable {
144 return extent
->mutate_mapping_internal(c
, laddr
, false, std::move(f
));
// LBAInternalNode::mutate_internal_address: update the paddr recorded
// for the child node at `laddr`/`depth`. If this node is the direct
// parent (meta depth == depth + 1) the entry is rewritten in a mutable
// copy of this node; otherwise the call recurses into the child.
// NOTE(review): extraction is missing source lines (signature,
// journal-update call head, recursion arguments) — verify against the
// full file.
148 LBAInternalNode::mutate_internal_address_ret
LBAInternalNode::mutate_internal_address(
154 if (get_meta().depth
== (depth
+ 1)) {
// Obtain a writeable copy of this node from the cache before
// journaling the address update.
156 return c
.cache
.duplicate_for_write(c
.trans
, this)->cast
<LBAInternalNode
>(
157 )->mutate_internal_address(
163 auto iter
= get_containing_child(laddr
);
// The entry's key must match exactly — a mismatch means the requested
// child is not present at this level.
164 if (iter
->get_key() != laddr
) {
165 return crimson::ct_error::enoent::make();
168 auto old_paddr
= iter
->get_val();
172 maybe_generate_relative(paddr
),
173 maybe_get_delta_buffer());
175 return mutate_internal_address_ret(
176 mutate_internal_address_ertr::ready_future_marker
{},
// Not the direct parent: descend one level and recurse.
180 auto iter
= get_containing_child(laddr
);
181 return get_lba_btree_extent(
183 get_meta().depth
- 1,
186 ).safe_then([=](auto node
) {
187 return node
->mutate_internal_address(
// LBAInternalNode::find_hole: search [min_addr, max_addr) for a free
// range of length `len` by iterating the covering children and asking
// each for a hole; terminates when a hole is found or the candidates
// are exhausted (L_ADDR_NULL signals "no hole" from a child).
// NOTE(review): extraction is missing source lines (signature, loop
// advance, result plumbing, braces) — verify against the full file.
196 LBAInternalNode::find_hole_ret
LBAInternalNode::find_hole(
203 "LBAInternalNode::find_hole min={}, max={}, len={}, *this={}",
204 min_addr
, max_addr
, len
, *this);
205 auto [begin
, end
] = bound(min_addr
, max_addr
);
// repeat_until_value drives the per-child loop; `i` is the cursor over
// candidate children and is mutated across iterations.
206 return seastar::repeat_until_value(
207 [i
=begin
, e
=end
, c
, min_addr
, len
, this]() mutable {
// Candidates exhausted: yield L_ADDR_NULL as the "no hole" sentinel.
209 return seastar::make_ready_future
<std::optional
<laddr_t
>>(
210 std::make_optional
<laddr_t
>(L_ADDR_NULL
));
212 return get_lba_btree_extent(c
,
213 get_meta().depth
- 1,
215 get_paddr()).safe_then(
216 [c
, min_addr
, len
, i
](auto extent
) mutable {
// Clamp the child's search window to [max(min_addr, child key),
// next child key).
217 auto lb
= std::max(min_addr
, i
->get_key());
218 auto ub
= i
->get_next_key_or_max();
219 logger().debug("LBAInternalNode::find_hole extent {} lb {} ub {}",
221 return extent
->find_hole(c
, lb
, ub
, len
);
// NOTE(review): `i` is captured by reference here and presumably
// advanced when the child reports no hole — confirm in the full file.
222 }).safe_then([&i
](auto addr
) mutable -> std::optional
<laddr_t
> {
223 if (addr
== L_ADDR_NULL
) {
230 // TODO: GCC enters a dead loop if crimson::do_until() is used
231 // or erroratorized future is returned
232 crimson::ct_error::assert_all
{ "fix me - APIv6" });
// LBAInternalNode::scan_mappings: invoke `f` for every mapping in
// [begin, end) by delegating to each covering child in order.
// NOTE(review): extraction is missing source lines (remaining
// parameters, do_for_each bounds, braces) — verify against the full
// file.
236 LBAInternalNode::scan_mappings_ret
LBAInternalNode::scan_mappings(
240 scan_mappings_func_t
&f
)
242 auto [biter
, eiter
] = bound(begin
, end
);
243 return crimson::do_for_each(
246 [=, &f
](auto &viter
) {
247 return get_lba_btree_extent(
249 get_meta().depth
- 1,
251 get_paddr()).safe_then([=, &f
](auto child
) {
252 return child
->scan_mappings(c
, begin
, end
, f
);
// Hold a ref on this node until the whole scan completes.
254 }).safe_then([ref
=LBANodeRef(this)]{});
// LBAInternalNode::scan_mapped_space: report this node's own extent
// (paddr, length) to `f`, then recurse into every child so the entire
// subtree's physical usage is visited.
// NOTE(review): extraction is missing source lines (remaining
// parameters, do_for_each bounds, braces) — verify against the full
// file.
257 LBAInternalNode::scan_mapped_space_ret
LBAInternalNode::scan_mapped_space(
259 scan_mapped_space_func_t
&f
)
// The internal node itself occupies mapped space — report it first.
261 f(get_paddr(), get_length());
262 return crimson::do_for_each(
264 [=, &f
](auto &viter
) {
265 return get_lba_btree_extent(
267 get_meta().depth
- 1,
269 get_paddr()).safe_then([=, &f
](auto child
) {
270 return child
->scan_mapped_space(c
, f
);
// Hold a ref on this node until the whole scan completes.
272 }).safe_then([ref
=LBANodeRef(this)]{});
// LBAInternalNode::resolve_relative_addrs: rewrite any child pointers
// recorded as relative addresses into absolute ones against `base`.
// NOTE(review): extraction is missing source lines (the store of
// `updated` back into the entry, log arguments, braces) — verify
// against the full file.
276 void LBAInternalNode::resolve_relative_addrs(paddr_t base
)
278 for (auto i
: *this) {
279 if (i
->get_val().is_relative()) {
280 auto updated
= base
.add_relative(i
->get_val());
282 "LBAInternalNode::resolve_relative_addrs {} -> {}",
// LBAInternalNode::split_entry: split a full child `entry` into left
// and right halves, journal the two new entries in this (mutable)
// node, retire the old child, and return the half that will receive
// `addr`.
// NOTE(review): extraction is missing source lines (signature head,
// the journal_insert/journal_replace call heads, braces) — verify
// against the full file.
291 LBAInternalNode::split_ret
292 LBAInternalNode::split_entry(
295 internal_iterator_t iter
, LBANodeRef entry
)
// If this node is not yet mutable in the transaction, redo the split
// on a writeable copy obtained from the cache.
298 auto mut
= c
.cache
.duplicate_for_write(
299 c
.trans
, this)->cast
<LBAInternalNode
>();
300 auto mut_iter
= mut
->iter_idx(iter
->get_offset());
301 return mut
->split_entry(c
, addr
, mut_iter
, entry
);
304 ceph_assert(!at_max_capacity());
305 auto [left
, right
, pivot
] = entry
->make_split_children(c
);
309 maybe_generate_relative(left
->get_paddr()),
310 maybe_get_delta_buffer());
314 maybe_generate_relative(right
->get_paddr()),
315 maybe_get_delta_buffer());
// The pre-split child is no longer referenced — retire it.
317 c
.cache
.retire_extent(c
.trans
, entry
);
320 "LBAInternalNode::split_entry *this {} entry {} into left {} right {}",
// Return whichever half covers addr (keys < pivot live in left).
326 return split_ertr::make_ready_future
<LBANodeRef
>(
327 pivot
> addr
? left
: right
// LBAInternalNode::merge_entry: rebalance an under-full child `entry`
// against a sibling donor. If the donor is also at minimum capacity the
// two are fully merged into one replacement node (possibly collapsing
// the root when only one entry remains); otherwise entries are
// rebalanced into two replacement nodes split at a new pivot.
// NOTE(review): extraction is missing source lines (signature head,
// several journal_* call heads, make_balanced call, braces) — verify
// against the full file.
331 LBAInternalNode::merge_ret
332 LBAInternalNode::merge_entry(
335 internal_iterator_t iter
,
// Redo on a writeable copy if this node is not yet mutable in the
// transaction.
340 auto mut
= c
.cache
.duplicate_for_write(c
.trans
, this)->cast
<LBAInternalNode
>();
341 auto mut_iter
= mut
->iter_idx(iter
->get_offset());
342 return mut
->merge_entry(c
, addr
, mut_iter
, entry
, is_root
);
346 "LBAInternalNode: merge_entry: {}, {}",
// Pick the donor sibling: the left neighbor when `iter` is the last
// entry, else the right neighbor.
349 auto donor_is_left
= (iter
+ 1) == end();
350 auto donor_iter
= donor_is_left
? iter
- 1 : iter
+ 1;
351 return get_lba_btree_extent(
353 get_meta().depth
- 1,
354 donor_iter
->get_val(),
356 ).safe_then([=](auto donor
) mutable {
// Normalize to (l, r) ordering regardless of which side donated.
357 auto [l
, r
] = donor_is_left
?
358 std::make_pair(donor
, entry
) : std::make_pair(entry
, donor
);
359 auto [liter
, riter
] = donor_is_left
?
360 std::make_pair(donor_iter
, iter
) : std::make_pair(iter
, donor_iter
);
// Donor also at minimum: full merge of l and r into one node.
361 if (donor
->at_min_capacity()) {
362 auto replacement
= l
->make_full_merge(
368 maybe_generate_relative(replacement
->get_paddr()),
369 maybe_get_delta_buffer());
370 journal_remove(riter
, maybe_get_delta_buffer());
// Both merged children are dead — retire them.
372 c
.cache
.retire_extent(c
.trans
, l
);
373 c
.cache
.retire_extent(c
.trans
, r
);
// Root with a single remaining entry: collapse one level by pointing
// the root block directly at the replacement and retiring this node.
375 if (is_root
&& get_size() == 1) {
376 return c
.cache
.get_root(c
.trans
).safe_then([=](RootBlockRef croot
) {
378 auto mut_croot
= c
.cache
.duplicate_for_write(c
.trans
, croot
);
379 croot
= mut_croot
->cast
<RootBlock
>();
381 croot
->root
.lba_root_addr
= begin()->get_val();
383 "LBAInternalNode::merge_entry: collapsing root {} to addr {}",
386 croot
->root
.lba_depth
= get_meta().depth
- 1;
387 c
.cache
.retire_extent(c
.trans
, this);
388 return merge_ertr::make_ready_future
<LBANodeRef
>(replacement
);
391 return merge_ertr::make_ready_future
<LBANodeRef
>(replacement
);
// Donor has spare entries: rebalance l and r into two replacements
// around a fresh pivot instead of fully merging.
395 "LBAInternalEntry::merge_entry balanced l {} r {}",
398 auto [replacement_l
, replacement_r
, pivot
] =
406 maybe_generate_relative(replacement_l
->get_paddr()),
407 maybe_get_delta_buffer());
411 maybe_generate_relative(replacement_r
->get_paddr()),
412 maybe_get_delta_buffer());
414 c
.cache
.retire_extent(c
.trans
, l
);
415 c
.cache
.retire_extent(c
.trans
, r
);
// Return whichever replacement now covers addr.
416 return merge_ertr::make_ready_future
<LBANodeRef
>(
417 addr
>= pivot
? replacement_r
: replacement_l
// LBAInternalNode::get_containing_child: linear scan for the entry
// whose range contains laddr; reaching the end indicates a corrupt
// tree (every in-range address must have a covering child).
// NOTE(review): extraction is missing source lines — presumably
// `return i;` inside the loop and a trailing return after the assert;
// verify against the full file.
424 LBAInternalNode::internal_iterator_t
425 LBAInternalNode::get_containing_child(laddr_t laddr
)
427 // TODO: binary search
428 for (auto i
= begin(); i
!= end(); ++i
) {
429 if (i
.contains(laddr
))
432 ceph_assert(0 == "invalid");
436 std::ostream
&LBALeafNode::print_detail(std::ostream
&out
) const
438 return out
<< ", size=" << get_size()
439 << ", meta=" << get_meta();
// LBALeafNode::lookup_range: build a pin list for every leaf mapping
// overlapping [addr, addr + len); each pin carries the mapping's
// paddr (resolved relative to this node's paddr) and its metadata.
// NOTE(review): extraction is missing source lines (signature, the
// list emplace head, braces) — verify against the full file.
442 LBALeafNode::lookup_range_ret
LBALeafNode::lookup_range(
448 "LBALeafNode::lookup_range {}~{}",
451 auto ret
= lba_pin_list_t();
452 auto [i
, end
] = get_leaf_entries(addr
, len
);
453 for (; i
!= end
; ++i
) {
454 auto val
= i
->get_val();
455 auto begin
= i
->get_key();
457 std::make_unique
<BtreeLBAPin
>(
// Stored paddrs may be relative to this block — resolve before
// handing them out.
459 val
.paddr
.maybe_relative_to(get_paddr()),
460 lba_node_meta_t
{ begin
, begin
+ val
.len
, 0}));
462 return lookup_range_ertr::make_ready_future
<lba_pin_list_t
>(
466 LBALeafNode::insert_ret
LBALeafNode::insert(
471 ceph_assert(!at_max_capacity());
474 return c
.cache
.duplicate_for_write(c
.trans
, this
475 )->cast
<LBALeafNode
>()->insert(c
, laddr
, val
);
478 val
.paddr
= maybe_generate_relative(val
.paddr
);
480 "LBALeafNode::insert: inserting {}~{} -> {}",
485 auto insert_pt
= lower_bound(laddr
);
486 journal_insert(insert_pt
, laddr
, val
, maybe_get_delta_buffer());
489 "LBALeafNode::insert: inserted {}~{} -> {}",
491 insert_pt
.get_val().len
,
492 insert_pt
.get_val().paddr
);
493 auto begin
= insert_pt
.get_key();
495 insert_ertr::ready_future_marker
{},
496 std::make_unique
<BtreeLBAPin
>(
498 val
.paddr
.maybe_relative_to(get_paddr()),
499 lba_node_meta_t
{ begin
, begin
+ val
.len
, 0}));
// LBALeafNode::mutate_mapping: public entry point — delegates to
// mutate_mapping_internal with first_degree=true.
// NOTE(review): parameter list lines are missing from this extraction.
502 LBALeafNode::mutate_mapping_ret
LBALeafNode::mutate_mapping(
507 return mutate_mapping_internal(c
, laddr
, true, std::move(f
));
// LBALeafNode::mutate_mapping_internal: apply user callback `f` to the
// mapping at laddr. The mutated value is journaled back in place when
// its refcount stays positive, otherwise the mapping is removed.
// NOTE(review): extraction is missing source lines (signature, the
// mutability guard around duplicate_for_write, the returned values
// after the ready_future_markers, braces) — verify against the full
// file.
510 LBALeafNode::mutate_mapping_ret
LBALeafNode::mutate_mapping_internal(
516 auto mutation_pt
= find(laddr
);
// Unlike the internal-node variant, a miss here is a legitimate
// caller error, not corruption.
517 if (mutation_pt
== end()) {
518 return crimson::ct_error::enoent::make();
522 return c
.cache
.duplicate_for_write(c
.trans
, this)->cast
<LBALeafNode
>(
523 )->mutate_mapping_internal(
530 auto cur
= mutation_pt
.get_val();
531 auto mutated
= f(cur
);
533 mutated
.paddr
= maybe_generate_relative(mutated
.paddr
);
536 "{}: mutate addr {}: {} -> {}",
// Refcount still positive: update in place; zero (or below): drop the
// mapping entirely.
542 if (mutated
.refcount
> 0) {
543 journal_update(mutation_pt
, mutated
, maybe_get_delta_buffer());
544 return mutate_mapping_ret(
545 mutate_mapping_ertr::ready_future_marker
{},
548 journal_remove(mutation_pt
, maybe_get_delta_buffer());
549 return mutate_mapping_ret(
550 mutate_mapping_ertr::ready_future_marker
{},
// LBALeafNode::mutate_internal_address: leaves have no child nodes, so
// this operation can never legitimately reach a leaf — assert and
// return a ready future to satisfy the interface.
// NOTE(review): extraction is missing source lines (parameters and the
// returned value) — verify against the full file.
555 LBALeafNode::mutate_internal_address_ret
LBALeafNode::mutate_internal_address(
561 ceph_assert(0 == "Impossible");
562 return mutate_internal_address_ret(
563 mutate_internal_address_ertr::ready_future_marker
{},
// LBALeafNode::find_hole: scan mappings in [min, max) for a gap of at
// least `len`; returns the hole's start address, or (per the trailing
// branch) a "not found" sentinel when no gap fits.
// NOTE(review): extraction is missing source lines (signature, the
// values returned after each ready_future_marker, braces) — verify
// against the full file.
567 LBALeafNode::find_hole_ret
LBALeafNode::find_hole(
574 "LBALeafNode::find_hole min={} max={}, len={}, *this={}",
575 min
, max
, len
, *this);
576 auto [liter
, uiter
] = bound(min
, max
);
577 for (auto i
= liter
; i
!= uiter
; ++i
) {
// Gap between the running cursor `min` and this entry's start is big
// enough — a hole begins at `min`.
578 auto ub
= i
->get_key();
579 if (min
+ len
<= ub
) {
580 return find_hole_ret(
581 find_hole_ertr::ready_future_marker
{},
// Advance the cursor past this mapping's extent.
584 min
= i
->get_key() + i
->get_val().len
;
// Check the tail gap between the last mapping and max.
587 if (min
+ len
<= max
) {
588 return find_hole_ret(
589 find_hole_ertr::ready_future_marker
{},
592 return find_hole_ret(
593 find_hole_ertr::ready_future_marker
{},
// LBALeafNode::scan_mappings: synchronously invoke `f(key, paddr, len)`
// for every mapping in [begin, end).
// NOTE(review): extraction is missing source lines (remaining
// parameters and braces) — verify against the full file.
598 LBALeafNode::scan_mappings_ret
LBALeafNode::scan_mappings(
602 scan_mappings_func_t
&f
)
604 auto [biter
, eiter
] = bound(begin
, end
);
605 for (auto i
= biter
; i
!= eiter
; ++i
) {
606 auto val
= i
->get_val();
607 f(i
->get_key(), val
.paddr
, val
.len
);
609 return scan_mappings_ertr::now();
// LBALeafNode::scan_mapped_space: report this leaf's own extent to `f`,
// then each mapping's (paddr, len).
// NOTE(review): extraction is missing source lines (parameters and
// braces). Also note the function returns scan_mappings_ertr::now()
// rather than a scan_mapped_space-specific ertr — presumably the two
// error types are aliases; confirm against the headers.
612 LBALeafNode::scan_mapped_space_ret
LBALeafNode::scan_mapped_space(
614 scan_mapped_space_func_t
&f
)
// The leaf node itself occupies mapped space — report it first.
616 f(get_paddr(), get_length());
617 for (auto i
= begin(); i
!= end(); ++i
) {
618 auto val
= i
->get_val();
619 f(val
.paddr
, val
.len
);
621 return scan_mappings_ertr::now();
// LBALeafNode::resolve_relative_addrs: rewrite any mapping paddrs
// recorded as relative addresses into absolute ones against `base`.
// NOTE(review): extraction is missing source lines (the store of `val`
// back into the entry, log arguments, braces) — verify against the
// full file; note `val` is a local copy here, so the write-back must
// occur on a missing line.
625 void LBALeafNode::resolve_relative_addrs(paddr_t base
)
627 for (auto i
: *this) {
628 if (i
->get_val().paddr
.is_relative()) {
629 auto val
= i
->get_val();
630 val
.paddr
= base
.add_relative(val
.paddr
);
632 "LBALeafNode::resolve_relative_addrs {} -> {}",
640 std::pair
<LBALeafNode::internal_iterator_t
, LBALeafNode::internal_iterator_t
>
641 LBALeafNode::get_leaf_entries(laddr_t addr
, extent_len_t len
)
643 return bound(addr
, addr
+ len
);
// get_lba_btree_extent: read an LBA btree node from the cache, typed
// by depth — depth > 1 reads an LBAInternalNode, the leaf branch reads
// an LBALeafNode. In both branches the loaded node's metadata is
// sanity-checked against its keys and, for clean unlinked nodes, its
// pin is registered with the pin set before returning an LBANodeRef.
// NOTE(review): extraction is missing source lines (parameter list,
// the depth branch condition, get_extent arguments, log arguments,
// braces) — verify against the full file.
646 Cache::get_extent_ertr::future
<LBANodeRef
> get_lba_btree_extent(
// Offsets stored in parents may be relative — resolve against `base`
// before the cache lookup.
652 offset
= offset
.maybe_relative_to(base
);
653 ceph_assert(depth
> 0);
656 "get_lba_btree_extent: reading internal at offset {}, depth {}",
659 return c
.cache
.get_extent
<LBAInternalNode
>(
662 LBA_BLOCK_SIZE
).safe_then([c
](auto ret
) {
// Sanity: a non-empty node's key range must sit inside its metadata
// bounds.
663 auto meta
= ret
->get_meta();
664 if (ret
->get_size()) {
665 ceph_assert(meta
.begin
<= ret
->begin()->get_key());
666 ceph_assert(meta
.end
> (ret
->end() - 1)->get_key());
// Register the pin for clean nodes not already linked into the pin
// set.
668 if (!ret
->is_pending() && !ret
->pin
.is_linked()) {
669 ret
->pin
.set_range(meta
);
670 c
.pins
.add_pin(ret
->pin
);
// detach() transfers the reference; add_ref=false avoids a double
// increment.
672 return LBANodeRef(ret
.detach(), /* add_ref = */ false);
// Leaf branch — same structure as above but typed LBALeafNode.
676 "get_lba_btree_extent: reading leaf at offset {}, depth {}",
679 return c
.cache
.get_extent
<LBALeafNode
>(
682 LBA_BLOCK_SIZE
).safe_then([offset
, c
](auto ret
) {
684 "get_lba_btree_extent: read leaf at offset {} {}",
687 auto meta
= ret
->get_meta();
688 if (ret
->get_size()) {
689 ceph_assert(meta
.begin
<= ret
->begin()->get_key());
690 ceph_assert(meta
.end
> (ret
->end() - 1)->get_key());
692 if (!ret
->is_pending() && !ret
->pin
.is_linked()) {
693 ret
->pin
.set_range(meta
);
694 c
.pins
.add_pin(ret
->pin
);
696 return LBANodeRef(ret
.detach(), /* add_ref = */ false);