// ceph.git (reef 18.1.2): ceph/src/crimson/os/seastore/onode_manager/staged-fltree/node.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "node.h"
5
6 #include <cassert>
7 #include <exception>
8 #include <sstream>
9
10 #include "common/likely.h"
11 #include "crimson/common/utility.h"
12 #include "crimson/os/seastore/logging.h"
13
14 #include "node_extent_manager.h"
15 #include "node_impl.h"
16 #include "stages/node_stage_layout.h"
17
18 SET_SUBSYS(seastore_onode);
19
20 namespace fmt {
21 template <typename T>
22 const void* ptr(const ::boost::intrusive_ptr<T>& p) {
23 return p.get();
24 }
25 }
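
// The fmt::ptr() overload above exposes the raw pointer held by a
// boost::intrusive_ptr, so that log statements in this file (e.g. the
// fmt::ptr(extent) arguments in Node::load() below) can print extent
// addresses without an explicit .get().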
26
27 namespace crimson::os::seastore::onode {
28 /*
29 * tree_cursor_t
30 */
31
32 // create from insert
33 tree_cursor_t::tree_cursor_t(Ref<LeafNode> node, const search_position_t& pos)
34 : ref_leaf_node{node}, position{pos}, cache{ref_leaf_node}
35 {
36 assert(is_tracked());
37 ref_leaf_node->do_track_cursor<true>(*this);
38 // do not account updates for the inserted values
39 is_mutated = true;
40 }
41
42 // create from lookup
43 tree_cursor_t::tree_cursor_t(
44 Ref<LeafNode> node, const search_position_t& pos,
45 const key_view_t& key_view, const value_header_t* p_value_header)
46 : ref_leaf_node{node}, position{pos}, cache{ref_leaf_node}
47 {
48 assert(is_tracked());
49 update_cache_same_node(key_view, p_value_header);
50 ref_leaf_node->do_track_cursor<true>(*this);
51 }
52
53 // lookup reaches the end; keep the leaf node for a further insert
54 tree_cursor_t::tree_cursor_t(Ref<LeafNode> node)
55 : ref_leaf_node{node}, position{search_position_t::end()}, cache{ref_leaf_node}
56 {
57 assert(is_end());
58 assert(ref_leaf_node->is_level_tail());
59 }
60
62 // destruct, and untrack from the leaf node if still tracked
62 tree_cursor_t::~tree_cursor_t()
63 {
64 if (is_tracked()) {
65 ref_leaf_node->do_untrack_cursor(*this);
66 }
67 }
68
69 eagain_ifuture<Ref<tree_cursor_t>>
70 tree_cursor_t::get_next(context_t c)
71 {
72 assert(is_tracked());
73 return ref_leaf_node->get_next_cursor(c, position);
74 }
75
76 void tree_cursor_t::assert_next_to(
77 const tree_cursor_t& prv, value_magic_t magic) const
78 {
79 #ifndef NDEBUG
80 assert(!prv.is_end());
81 if (is_end()) {
82 assert(ref_leaf_node == prv.ref_leaf_node);
83 assert(ref_leaf_node->is_level_tail());
84 } else if (is_tracked()) {
85 auto key = get_key_view(magic);
86 auto prv_key = prv.get_key_view(magic);
87 assert(key > prv_key);
88 if (ref_leaf_node == prv.ref_leaf_node) {
89 position.assert_next_to(prv.position);
90 } else {
91 assert(!prv.ref_leaf_node->is_level_tail());
92 assert(position == search_position_t::begin());
93 }
94 } else {
95 assert(is_invalid());
96 ceph_abort("impossible");
97 }
98 #endif
99 }
100
101 template <bool FORCE_MERGE>
102 eagain_ifuture<Ref<tree_cursor_t>>
103 tree_cursor_t::erase(context_t c, bool get_next)
104 {
105 assert(is_tracked());
106 return ref_leaf_node->erase<FORCE_MERGE>(c, position, get_next);
107 }
108 template eagain_ifuture<Ref<tree_cursor_t>>
109 tree_cursor_t::erase<true>(context_t, bool);
110 template eagain_ifuture<Ref<tree_cursor_t>>
111 tree_cursor_t::erase<false>(context_t, bool);
112
113 std::strong_ordering tree_cursor_t::compare_to(
114 const tree_cursor_t& o, value_magic_t magic) const
115 {
116 if (!is_tracked() && !o.is_tracked()) {
117 return std::strong_ordering::equal;
118 } else if (!is_tracked()) {
119 return std::strong_ordering::greater;
120 } else if (!o.is_tracked()) {
121 return std::strong_ordering::less;
122 }
123
124 assert(is_tracked() && o.is_tracked());
125 // all tracked cursors are singletons
126 if (this == &o) {
127 return std::strong_ordering::equal;
128 }
129
130 std::strong_ordering ret = std::strong_ordering::equal;
131 if (ref_leaf_node == o.ref_leaf_node) {
132 ret = position <=> o.position;
133 } else {
134 auto key = get_key_view(magic);
135 auto o_key = o.get_key_view(magic);
136 ret = key <=> o_key;
137 }
138 assert(ret != 0);
139 return ret;
140 }
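
// Ordering sketch (illustrative): untracked cursors (end or invalidated)
// compare greater than any tracked cursor, so given tracked cursors a < b
// on the same leaf and an end cursor e, compare_to() yields a < b < e.
// Tracked cursors on the same leaf are ordered by position; across leaves
// they are ordered by their key views.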
141
142 eagain_ifuture<>
143 tree_cursor_t::extend_value(context_t c, value_size_t extend_size)
144 {
145 assert(is_tracked());
146 return ref_leaf_node->extend_value(c, position, extend_size);
147 }
148
149 eagain_ifuture<>
150 tree_cursor_t::trim_value(context_t c, value_size_t trim_size)
151 {
152 assert(is_tracked());
153 return ref_leaf_node->trim_value(c, position, trim_size);
154 }
155
156 template <bool VALIDATE>
157 void tree_cursor_t::update_track(
158 Ref<LeafNode> node, const search_position_t& pos)
159 {
160 // I must be already untracked
161 assert(is_tracked());
162 assert(!ref_leaf_node->check_is_tracking(*this));
163 // track the new node and new pos
164 assert(!pos.is_end());
165 ref_leaf_node = node;
166 position = pos;
167 // the key/value information is updated lazily, only when the user asks
168 cache.invalidate();
169 ref_leaf_node->do_track_cursor<VALIDATE>(*this);
170 }
171 template void tree_cursor_t::update_track<true>(Ref<LeafNode>, const search_position_t&);
172 template void tree_cursor_t::update_track<false>(Ref<LeafNode>, const search_position_t&);
173
174 void tree_cursor_t::update_cache_same_node(const key_view_t& key_view,
175 const value_header_t* p_value_header) const
176 {
177 assert(is_tracked());
178 cache.update_all(ref_leaf_node->get_version(), key_view, p_value_header);
179 cache.validate_is_latest(position);
180 }
181
182 void tree_cursor_t::invalidate()
183 {
184 assert(is_tracked());
185 ref_leaf_node.reset();
186 assert(is_invalid());
187 // I must be removed from LeafNode
188 }
189
190 /*
191 * tree_cursor_t::Cache
192 */
193
194 tree_cursor_t::Cache::Cache(Ref<LeafNode>& ref_leaf_node)
195 : ref_leaf_node{ref_leaf_node} {}
196
197 void tree_cursor_t::Cache::update_all(const node_version_t& current_version,
198 const key_view_t& _key_view,
199 const value_header_t* _p_value_header)
200 {
201 assert(_p_value_header);
202
203 needs_update_all = false;
204 version = current_version;
205
206 p_node_base = ref_leaf_node->read();
207 key_view = _key_view;
208 p_value_header = _p_value_header;
209 assert((const char*)p_value_header > p_node_base);
210 assert((const char*)p_value_header - p_node_base <
211 (int)ref_leaf_node->get_node_size());
212
213 value_payload_mut.reset();
214 p_value_recorder = nullptr;
215 }
216
217 void tree_cursor_t::Cache::maybe_duplicate(const node_version_t& current_version)
218 {
219 assert(!needs_update_all);
220 assert(version.layout == current_version.layout);
221 if (version.state == current_version.state) {
222 // cache is already latest.
223 } else if (version.state < current_version.state) {
224 // the extent has been copied but the layout has not been changed.
225 assert(p_node_base != nullptr);
226 assert(key_view.has_value());
227 assert(p_value_header != nullptr);
228
229 auto current_p_node_base = ref_leaf_node->read();
230 assert(current_p_node_base != p_node_base);
231 auto node_size = ref_leaf_node->get_node_size();
232
233 version.state = current_version.state;
234 reset_ptr(p_value_header, p_node_base,
235 current_p_node_base, node_size);
236 key_view->reset_to(p_node_base, current_p_node_base, node_size);
237 value_payload_mut.reset();
238 p_value_recorder = nullptr;
239
240 p_node_base = current_p_node_base;
241 } else {
242 // It is impossible to change state backwards, see node_types.h.
243 ceph_abort("impossible");
244 }
245 }
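
// Rebasing sketch: when the backing extent has been copied (state advanced)
// but the layout is unchanged, the cached pointers are rebased onto the new
// extent via reset_ptr() and key_view_t::reset_to() above, conceptually
// new_ptr = new_base + (old_ptr - old_base).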
246
247 void tree_cursor_t::Cache::make_latest(
248 value_magic_t magic, const search_position_t& pos)
249 {
250 auto current_version = ref_leaf_node->get_version();
251 if (needs_update_all || version.layout != current_version.layout) {
252 auto [_key_view, _p_value_header] = ref_leaf_node->get_kv(pos);
253 update_all(current_version, _key_view, _p_value_header);
254 } else {
255 maybe_duplicate(current_version);
256 }
257 assert(p_value_header->magic == magic);
258 validate_is_latest(pos);
259 }
260
261 void tree_cursor_t::Cache::validate_is_latest(const search_position_t& pos) const
262 {
263 #ifndef NDEBUG
264 assert(!needs_update_all);
265 assert(version == ref_leaf_node->get_version());
266
267 auto [_key_view, _p_value_header] = ref_leaf_node->get_kv(pos);
268 assert(p_node_base == ref_leaf_node->read());
269 assert(key_view == _key_view);
270 assert(p_value_header == _p_value_header);
271 #endif
272 }
273
274 std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
275 tree_cursor_t::Cache::prepare_mutate_value_payload(
276 context_t c, const search_position_t& pos)
277 {
278 make_latest(c.vb.get_header_magic(), pos);
279 if (!value_payload_mut.has_value()) {
280 assert(!p_value_recorder);
281 auto value_mutable = ref_leaf_node->prepare_mutate_value_payload(c);
282 auto current_version = ref_leaf_node->get_version();
283 maybe_duplicate(current_version);
284 value_payload_mut = p_value_header->get_payload_mutable(value_mutable.first);
285 p_value_recorder = value_mutable.second;
286 validate_is_latest(pos);
287 }
288 return {*value_payload_mut, p_value_recorder};
289 }
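
// Callers receive a mutable view of the value payload plus a possibly-null
// delta recorder; the cache is revalidated first so that the returned
// pointers refer to the latest (possibly copied) extent.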
290
291 /*
292 * Node
293 */
294
295 Node::Node(NodeImplURef&& impl) : impl{std::move(impl)} {}
296
297 Node::~Node()
298 {
299 if (!is_tracked()) {
300 // possible scenarios:
301 // a. I'm erased;
302 // b. Eagain happened after the node extent is allocated/loaded
303 // and before the node is initialized correctly;
304 } else {
305 assert(!impl->is_extent_retired());
306 if (is_root()) {
307 super->do_untrack_root(*this);
308 } else {
309 _parent_info->ptr->do_untrack_child(*this);
310 }
311 }
312 }
313
314 level_t Node::level() const
315 {
316 return impl->level();
317 }
318
319 eagain_ifuture<Node::search_result_t> Node::lower_bound(
320 context_t c, const key_hobj_t& key)
321 {
322 return seastar::do_with(
323 MatchHistory(), [this, c, &key](auto& history) {
324 return lower_bound_tracked(c, key, history);
325 }
326 );
327 }
328
329 eagain_ifuture<std::pair<Ref<tree_cursor_t>, bool>> Node::insert(
330 context_t c,
331 const key_hobj_t& key,
332 value_config_t vconf,
333 Ref<Node>&& this_ref)
334 {
335 return seastar::do_with(
336 MatchHistory(), [this, c, &key, vconf,
337 this_ref = std::move(this_ref)] (auto& history) mutable {
338 return lower_bound_tracked(c, key, history
339 ).si_then([c, &key, vconf, &history,
340 this_ref = std::move(this_ref)] (auto result) mutable {
341 // the cursor in the result should already hold the root node upwards
342 this_ref.reset();
343 if (result.match() == MatchKindBS::EQ) {
344 return eagain_iertr::make_ready_future<std::pair<Ref<tree_cursor_t>, bool>>(
345 std::make_pair(result.p_cursor, false));
346 } else {
347 auto leaf_node = result.p_cursor->get_leaf_node();
348 return leaf_node->insert_value(
349 c, key, vconf, result.p_cursor->get_position(), history, result.mstat
350 ).si_then([](auto p_cursor) {
351 return seastar::make_ready_future<std::pair<Ref<tree_cursor_t>, bool>>(
352 std::make_pair(p_cursor, true));
353 });
354 }
355 });
356 }
357 );
358 }
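
// Insert flow in brief: lower_bound_tracked() locates the target leaf and
// cursor; on an exact key match the existing cursor is returned with
// `false` (not inserted), otherwise the leaf inserts the value and the new
// cursor is returned with `true`.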
359
360 eagain_ifuture<std::size_t> Node::erase(
361 context_t c,
362 const key_hobj_t& key,
363 Ref<Node>&& this_ref)
364 {
365 return lower_bound(c, key
366 ).si_then([c, this_ref = std::move(this_ref)] (auto result) mutable {
367 // the cursor in the result should already hold the root node upwards
368 this_ref.reset();
369 if (result.match() != MatchKindBS::EQ) {
370 return eagain_iertr::make_ready_future<std::size_t>(0);
371 }
372 auto ref_cursor = result.p_cursor;
373 return ref_cursor->erase(c, false
374 ).si_then([ref_cursor] (auto next_cursor) {
375 assert(ref_cursor->is_invalid());
376 assert(!next_cursor);
377 return std::size_t(1);
378 });
379 });
380 }
381
382 eagain_ifuture<tree_stats_t> Node::get_tree_stats(context_t c)
383 {
384 return seastar::do_with(
385 tree_stats_t(), [this, c](auto& stats) {
386 return do_get_tree_stats(c, stats).si_then([&stats] {
387 return stats;
388 });
389 }
390 );
391 }
392
393 std::ostream& Node::dump(std::ostream& os) const
394 {
395 return impl->dump(os);
396 }
397
398 std::ostream& Node::dump_brief(std::ostream& os) const
399 {
400 return impl->dump_brief(os);
401 }
402
403 const std::string& Node::get_name() const
404 {
405 return impl->get_name();
406 }
407
408 void Node::test_make_destructable(
409 context_t c, NodeExtentMutable& mut, Super::URef&& _super)
410 {
411 impl->test_set_tail(mut);
412 make_root(c, std::move(_super));
413 }
414
415 eagain_ifuture<> Node::mkfs(context_t c, RootNodeTracker& root_tracker)
416 {
417 LOG_PREFIX(OTree::Node::mkfs);
418 return LeafNode::allocate_root(c, root_tracker
419 ).si_then([c, FNAME](auto ret) {
420 c.t.get_onode_tree_stats().extents_num_delta++;
421 INFOT("allocated root {}", c.t, ret->get_name());
422 });
423 }
424
425 eagain_ifuture<Ref<Node>> Node::load_root(context_t c, RootNodeTracker& root_tracker)
426 {
427 LOG_PREFIX(OTree::Node::load_root);
428 return c.nm.get_super(c.t, root_tracker
429 ).handle_error_interruptible(
430 eagain_iertr::pass_further{},
431 crimson::ct_error::input_output_error::handle([FNAME, c] {
432 ERRORT("EIO during get_super()", c.t);
433 ceph_abort("fatal error");
434 })
435 ).si_then([c, &root_tracker, FNAME](auto&& _super) {
436 assert(_super);
437 auto root_addr = _super->get_root_laddr();
438 assert(root_addr != L_ADDR_NULL);
439 TRACET("loading root_addr={:x} ...", c.t, root_addr);
440 return Node::load(c, root_addr, true
441 ).si_then([c, _super = std::move(_super),
442 &root_tracker, FNAME](auto root) mutable {
443 TRACET("loaded {}", c.t, root->get_name());
444 assert(root->impl->field_type() == field_type_t::N0);
445 root->as_root(std::move(_super));
446 std::ignore = c; // as only used in an assert
447 std::ignore = root_tracker;
448 assert(root == root_tracker.get_root(c.t));
449 return seastar::make_ready_future<Ref<Node>>(root);
450 });
451 });
452 }
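
// Bootstrap in brief: mkfs() above allocates an empty leaf as the initial
// root exactly once, while load_root() resolves the root address recorded
// in the superblock and loads it as the root, expected to be an N0,
// level-tail node.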
453
454 void Node::make_root(context_t c, Super::URef&& _super)
455 {
456 _super->write_root_laddr(c, impl->laddr());
457 as_root(std::move(_super));
458 c.t.get_onode_tree_stats().depth = static_cast<uint64_t>(level()) + 1;
459 }
460
461 void Node::as_root(Super::URef&& _super)
462 {
463 assert(!is_tracked());
464 assert(_super->get_root_laddr() == impl->laddr());
465 assert(impl->is_level_tail());
466 super = std::move(_super);
467 super->do_track_root(*this);
468 assert(is_root());
469 }
470
471 Super::URef Node::deref_super()
472 {
473 assert(is_root());
474 assert(super->get_root_laddr() == impl->laddr());
475 assert(impl->is_level_tail());
476 super->do_untrack_root(*this);
477 auto ret = std::move(super);
478 assert(!is_tracked());
479 return ret;
480 }
481
482 eagain_ifuture<> Node::upgrade_root(context_t c, laddr_t hint)
483 {
484 LOG_PREFIX(OTree::Node::upgrade_root);
485 assert(impl->field_type() == field_type_t::N0);
486 auto super_to_move = deref_super();
487 return InternalNode::allocate_root(
488 c, hint, impl->level(), impl->laddr(), std::move(super_to_move)
489 ).si_then([this, c, FNAME](auto new_root) {
490 as_child(search_position_t::end(), new_root);
491 INFOT("upgraded from {} to {}",
492 c.t, get_name(), new_root->get_name());
493 });
494 }
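
// Root upgrade sketch:
//   before:  super -> this (level N, level-tail)
//   after:   super -> new internal root (level N+1) -(tail child)-> this
// The old root keeps all of its keys and simply becomes the end() child of
// the freshly allocated internal root.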
495
496 template <bool VALIDATE>
497 void Node::as_child(const search_position_t& pos, Ref<InternalNode> parent_node)
498 {
499 assert(!is_tracked() || !is_root());
500 #ifndef NDEBUG
501 // Although I might have an outdated _parent_info during fixing,
502 // I must be already untracked.
503 if (_parent_info.has_value()) {
504 assert(!_parent_info->ptr->check_is_tracking(*this));
505 }
506 #endif
507 _parent_info = parent_info_t{pos, parent_node};
508 parent_info().ptr->do_track_child<VALIDATE>(*this);
509 assert(!is_root());
510 }
511 template void Node::as_child<true>(const search_position_t&, Ref<InternalNode>);
512 template void Node::as_child<false>(const search_position_t&, Ref<InternalNode>);
513
514 Ref<InternalNode> Node::deref_parent()
515 {
516 assert(!is_root());
517 auto parent_ref = std::move(parent_info().ptr);
518 parent_ref->do_untrack_child(*this);
519 _parent_info.reset();
520 assert(!is_tracked());
521 return parent_ref;
522 }
523
524 eagain_ifuture<> Node::apply_split_to_parent(
525 context_t c,
526 Ref<Node>&& this_ref,
527 Ref<Node>&& split_right,
528 bool update_right_index)
529 {
530 assert(!is_root());
531 assert(this == this_ref.get());
532 // TODO(cross-node string dedup)
533 return parent_info().ptr->apply_child_split(
534 c, std::move(this_ref), std::move(split_right), update_right_index);
535 }
536
537 eagain_ifuture<Ref<tree_cursor_t>>
538 Node::get_next_cursor_from_parent(context_t c)
539 {
540 assert(!impl->is_level_tail());
541 assert(!is_root());
542 return parent_info().ptr->get_next_cursor(c, parent_info().position);
543 }
544
545 template <bool FORCE_MERGE>
546 eagain_ifuture<>
547 Node::try_merge_adjacent(
548 context_t c, bool update_parent_index, Ref<Node>&& this_ref)
549 {
550 LOG_PREFIX(OTree::Node::try_merge_adjacent);
551 assert(this == this_ref.get());
552 impl->validate_non_empty();
553 assert(!is_root());
554 if constexpr (!FORCE_MERGE) {
555 if (!impl->is_size_underflow() &&
556 !impl->has_single_value()) {
557 // skip merge
558 if (update_parent_index) {
559 return fix_parent_index(c, std::move(this_ref), false);
560 } else {
561 parent_info().ptr->validate_child_tracked(*this);
562 return eagain_iertr::now();
563 }
564 }
565 }
566
567 return parent_info().ptr->get_child_peers(c, parent_info().position
568 ).si_then([c, this_ref = std::move(this_ref), this, FNAME,
569 update_parent_index] (auto lr_nodes) mutable -> eagain_ifuture<> {
570 auto& [lnode, rnode] = lr_nodes;
571 Ref<Node> left_for_merge;
572 Ref<Node> right_for_merge;
573 Ref<Node>* p_this_ref;
574 bool is_left;
575 if (!lnode && !rnode) {
576 // XXX: this is possible before node rebalance is implemented,
577 // when its parent cannot merge with its peers and has only one child
578 // (this node).
579 p_this_ref = &this_ref;
580 } else if (!lnode) {
581 left_for_merge = std::move(this_ref);
582 p_this_ref = &left_for_merge;
583 right_for_merge = std::move(rnode);
584 is_left = true;
585 } else if (!rnode) {
586 left_for_merge = std::move(lnode);
587 right_for_merge = std::move(this_ref);
588 p_this_ref = &right_for_merge;
589 is_left = false;
590 } else { // lnode && rnode
591 if (lnode->impl->free_size() > rnode->impl->free_size()) {
592 left_for_merge = std::move(lnode);
593 right_for_merge = std::move(this_ref);
594 p_this_ref = &right_for_merge;
595 is_left = false;
596 } else { // lnode free size <= rnode free size
597 left_for_merge = std::move(this_ref);
598 p_this_ref = &left_for_merge;
599 right_for_merge = std::move(rnode);
600 is_left = true;
601 }
602 }
603
604 if (left_for_merge) {
605 assert(right_for_merge);
606 auto [merge_stage, merge_size] = left_for_merge->impl->evaluate_merge(
607 *right_for_merge->impl);
608 if (merge_size <= left_for_merge->impl->total_size()) {
609 // proceed merge
610 bool update_index_after_merge;
611 if (is_left) {
612 update_index_after_merge = false;
613 } else {
614 update_index_after_merge = update_parent_index;
615 }
616 DEBUGT("merge {} and {} at merge_stage={}, merge_size={}B, "
617 "update_index={}, is_left={} ...",
618 c.t, left_for_merge->get_name(), right_for_merge->get_name(),
619 merge_stage, merge_size, update_index_after_merge, is_left);
620 // we currently cannot generate a delta that depends on another extent's
621 // content, so use rebuild_extent() as a workaround to rebuild the node from
622 // a fresh extent, which avoids generating a delta at all.
623 auto left_addr = left_for_merge->impl->laddr();
624 return left_for_merge->rebuild_extent(c
625 ).si_then([c, update_index_after_merge,
626 left_addr,
627 merge_stage = merge_stage,
628 merge_size = merge_size,
629 left_for_merge = std::move(left_for_merge),
630 right_for_merge = std::move(right_for_merge)] (auto left_mut) mutable {
631 if (left_for_merge->impl->node_type() == node_type_t::LEAF) {
632 auto& left = *static_cast<LeafNode*>(left_for_merge.get());
633 left.on_layout_change();
634 }
635 search_position_t left_last_pos = left_for_merge->impl->merge(
636 left_mut, *right_for_merge->impl, merge_stage, merge_size);
637 left_for_merge->track_merge(right_for_merge, merge_stage, left_last_pos);
638 --(c.t.get_onode_tree_stats().extents_num_delta);
639 return left_for_merge->parent_info().ptr->apply_children_merge(
640 c, std::move(left_for_merge), left_addr,
641 std::move(right_for_merge), update_index_after_merge);
642 });
643 } else {
644 // the merged size would overflow the node, so skip the merge
645 }
646 }
647
648 // cannot merge
649 if (update_parent_index) {
650 return fix_parent_index(c, std::move(*p_this_ref), false);
651 } else {
652 parent_info().ptr->validate_child_tracked(*this);
653 return eagain_iertr::now();
654 }
655 // XXX: rebalance
656 });
657 }
658 template eagain_ifuture<> Node::try_merge_adjacent<true>(context_t, bool, Ref<Node>&&);
659 template eagain_ifuture<> Node::try_merge_adjacent<false>(context_t, bool, Ref<Node>&&);
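
// Merge decision, with illustrative numbers (not taken from the code): if
// evaluate_merge() reports merge_size = 3600B while the left node's
// total_size() is 4096B, the merge proceeds; if merge_size were 4500B the
// merge would be skipped because the merged content would overflow the node.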
660
661 eagain_ifuture<> Node::erase_node(context_t c, Ref<Node>&& this_ref)
662 {
663 // To erase a node:
664 // 1. I'm supposed to have already untracked any children or cursors
665 // 2. unlink parent/super --ptr-> me
666 // 3. unlink me --ref-> parent/super
667 // 4. retire extent
668 // 5. destruct node
669 assert(this_ref.get() == this);
670 assert(!is_tracking());
671 assert(!is_root());
672 assert(this_ref->use_count() == 1);
673 return parent_info().ptr->erase_child(c, std::move(this_ref));
674 }
675
676 template <bool FORCE_MERGE>
677 eagain_ifuture<> Node::fix_parent_index(
678 context_t c, Ref<Node>&& this_ref, bool check_downgrade)
679 {
680 assert(!is_root());
681 assert(this == this_ref.get());
682 return parent_info().ptr->fix_index<FORCE_MERGE>(
683 c, std::move(this_ref), check_downgrade);
684 }
685 template eagain_ifuture<> Node::fix_parent_index<true>(context_t, Ref<Node>&&, bool);
686 template eagain_ifuture<> Node::fix_parent_index<false>(context_t, Ref<Node>&&, bool);
687
688 eagain_ifuture<Ref<Node>> Node::load(
689 context_t c, laddr_t addr, bool expect_is_level_tail)
690 {
691 LOG_PREFIX(OTree::Node::load);
692 return c.nm.read_extent(c.t, addr
693 ).handle_error_interruptible(
694 eagain_iertr::pass_further{},
695 crimson::ct_error::input_output_error::handle(
696 [FNAME, c, addr, expect_is_level_tail] {
697 ERRORT("EIO -- addr={:x}, is_level_tail={}",
698 c.t, addr, expect_is_level_tail);
699 ceph_abort("fatal error");
700 }),
701 crimson::ct_error::invarg::handle(
702 [FNAME, c, addr, expect_is_level_tail] {
703 ERRORT("EINVAL -- addr={:x}, is_level_tail={}",
704 c.t, addr, expect_is_level_tail);
705 ceph_abort("fatal error");
706 }),
707 crimson::ct_error::enoent::handle(
708 [FNAME, c, addr, expect_is_level_tail] {
709 ERRORT("ENOENT -- addr={:x}, is_level_tail={}",
710 c.t, addr, expect_is_level_tail);
711 ceph_abort("fatal error");
712 }),
713 crimson::ct_error::erange::handle(
714 [FNAME, c, addr, expect_is_level_tail] {
715 ERRORT("ERANGE -- addr={:x}, is_level_tail={}",
716 c.t, addr, expect_is_level_tail);
717 ceph_abort("fatal error");
718 })
719 ).si_then([FNAME, c, addr, expect_is_level_tail](auto extent)
720 -> eagain_ifuture<Ref<Node>> {
721 assert(extent);
722 auto header = extent->get_header();
723 auto field_type = header.get_field_type();
724 if (!field_type) {
725 ERRORT("load addr={:x}, is_level_tail={} error, "
726 "got invalid header -- {}",
727 c.t, addr, expect_is_level_tail, fmt::ptr(extent));
728 ceph_abort("fatal error");
729 }
730 if (header.get_is_level_tail() != expect_is_level_tail) {
731 ERRORT("load addr={:x}, is_level_tail={} error, "
732 "is_level_tail mismatch -- {}",
733 c.t, addr, expect_is_level_tail, fmt::ptr(extent));
734 ceph_abort("fatal error");
735 }
736
737 auto node_type = header.get_node_type();
738 if (node_type == node_type_t::LEAF) {
739 if (extent->get_length() != c.vb.get_leaf_node_size()) {
740 ERRORT("load addr={:x}, is_level_tail={} error, "
741 "leaf length mismatch -- {}",
742 c.t, addr, expect_is_level_tail, fmt::ptr(extent));
743 ceph_abort("fatal error");
744 }
745 auto impl = LeafNodeImpl::load(extent, *field_type);
746 auto *derived_ptr = impl.get();
747 return eagain_iertr::make_ready_future<Ref<Node>>(
748 new LeafNode(derived_ptr, std::move(impl)));
749 } else if (node_type == node_type_t::INTERNAL) {
750 if (extent->get_length() != c.vb.get_internal_node_size()) {
751 ERRORT("load addr={:x}, is_level_tail={} error, "
752 "internal length mismatch -- {}",
753 c.t, addr, expect_is_level_tail, fmt::ptr(extent));
754 ceph_abort("fatal error");
755 }
756 auto impl = InternalNodeImpl::load(extent, *field_type);
757 auto *derived_ptr = impl.get();
758 return eagain_iertr::make_ready_future<Ref<Node>>(
759 new InternalNode(derived_ptr, std::move(impl)));
760 } else {
761 ceph_abort("impossible path");
762 }
763 });
764 }
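
// Load-time validation in brief: the extent header must carry a valid
// field type, an is_level_tail flag matching the caller's expectation, and
// a length equal to the configured leaf/internal node size; any mismatch
// aborts, otherwise the matching LeafNode/InternalNode wrapper is built.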
765
766 eagain_ifuture<NodeExtentMutable> Node::rebuild_extent(context_t c)
767 {
768 LOG_PREFIX(OTree::Node::rebuild_extent);
769 DEBUGT("{} ...", c.t, get_name());
770 assert(!is_root());
771 // assume I'm already ref counted by caller
772
773 // note: the laddr may change after rebuild; we don't fix the parent's child
774 // mapping here because that is handled as part of the merge process.
775 return impl->rebuild_extent(c);
776 }
777
778 eagain_ifuture<> Node::retire(context_t c, Ref<Node>&& this_ref)
779 {
780 LOG_PREFIX(OTree::Node::retire);
781 DEBUGT("{} ...", c.t, get_name());
782 assert(this_ref.get() == this);
783 assert(!is_tracking());
784 assert(!is_tracked());
785 assert(this_ref->use_count() == 1);
786
787 return impl->retire_extent(c
788 ).si_then([this_ref = std::move(this_ref)]{ /* deallocate node */});
789 }
790
791 void Node::make_tail(context_t c)
792 {
793 LOG_PREFIX(OTree::Node::make_tail);
794 assert(!impl->is_level_tail());
795 assert(!impl->is_keys_empty());
796 DEBUGT("{} ...", c.t, get_name());
797 impl->prepare_mutate(c);
798 auto tail_pos = impl->make_tail();
799 if (impl->node_type() == node_type_t::INTERNAL) {
800 auto& node = *static_cast<InternalNode*>(this);
801 node.track_make_tail(tail_pos);
802 }
803 }
804
805 /*
806 * InternalNode
807 */
808
809 InternalNode::InternalNode(InternalNodeImpl* impl, NodeImplURef&& impl_ref)
810 : Node(std::move(impl_ref)), impl{impl} {}
811
812 eagain_ifuture<Ref<tree_cursor_t>>
813 InternalNode::get_next_cursor(context_t c, const search_position_t& pos)
814 {
815 impl->validate_non_empty();
816 if (pos.is_end()) {
817 assert(impl->is_level_tail());
818 return get_next_cursor_from_parent(c);
819 }
820
821 search_position_t next_pos = pos;
822 const laddr_packed_t* p_child_addr = nullptr;
823 impl->get_next_slot(next_pos, nullptr, &p_child_addr);
824 if (next_pos.is_end() && !impl->is_level_tail()) {
825 return get_next_cursor_from_parent(c);
826 } else {
827 if (next_pos.is_end()) {
828 p_child_addr = impl->get_tail_value();
829 }
830 assert(p_child_addr);
831 return get_or_track_child(c, next_pos, p_child_addr->value
832 ).si_then([c](auto child) {
833 return child->lookup_smallest(c);
834 });
835 }
836 }
837
838 eagain_ifuture<> InternalNode::apply_child_split(
839 context_t c, Ref<Node>&& left_child, Ref<Node>&& right_child,
840 bool update_right_index)
841 {
842 LOG_PREFIX(OTree::InternalNode::apply_child_split);
843 auto& left_pos = left_child->parent_info().position;
844
845 #ifndef NDEBUG
846 assert(left_child->parent_info().ptr.get() == this);
847 assert(!left_child->impl->is_level_tail());
848 if (left_pos.is_end()) {
849 assert(impl->is_level_tail());
850 assert(right_child->impl->is_level_tail());
851 assert(!update_right_index);
852 }
853
854 // right_child has not been assigned a parent yet
855 assert(!right_child->is_tracked());
856 #endif
857
858 impl->prepare_mutate(c);
859
860 DEBUGT("apply {}'s child {} to split to {}, update_index={} ...",
861 c.t, get_name(), left_child->get_name(),
862 right_child->get_name(), update_right_index);
863
864 // update layout from left_pos => left_child_addr to right_child_addr
865 auto left_child_addr = left_child->impl->laddr();
866 auto right_child_addr = right_child->impl->laddr();
867 impl->replace_child_addr(left_pos, right_child_addr, left_child_addr);
868
869 // update track from left_pos => left_child to right_child
870 replace_track(right_child, left_child, update_right_index);
871
872 auto left_key = *left_child->impl->get_pivot_index();
873 Ref<Node> this_ref = this;
874 return insert_or_split(
875 c, left_pos, left_key, left_child,
876 (update_right_index ? right_child : nullptr)
877 ).si_then([this, c,
878 this_ref = std::move(this_ref)] (auto split_right) mutable {
879 if (split_right) {
880 // even if update_right_index is true, the right_child's index in this
881 // node has not been fixed yet, so my parent index is still correct at
882 // this point.
883 return apply_split_to_parent(
884 c, std::move(this_ref), std::move(split_right), false);
885 } else {
886 return eagain_iertr::now();
887 }
888 }).si_then([c, update_right_index,
889 right_child = std::move(right_child)] () mutable {
890 if (update_right_index) {
891 // XXX: might not need to call validate_tracked_children() in fix_index()
892 return right_child->fix_parent_index(c, std::move(right_child), false);
893 } else {
894 // there is no need to call try_merge_adjacent() because
895 // the filled size of the inserted node or the split right node
896 // won't be reduced if update_right_index is false.
897 return eagain_iertr::now();
898 }
899 });
900 }
901
902 eagain_ifuture<> InternalNode::erase_child(context_t c, Ref<Node>&& child_ref)
903 {
904 LOG_PREFIX(OTree::InternalNode::erase_child);
905 // this is a special version of recursive merge
906 impl->validate_non_empty();
907 assert(child_ref->use_count() == 1);
908 validate_child_tracked(*child_ref);
909
910 // fix the child's previous node as the new tail,
911 // and trigger new_tail_child->try_merge_adjacent() at the end
912 bool fix_tail = (child_ref->parent_info().position.is_end() &&
913 !impl->is_keys_empty());
914 return eagain_iertr::now().si_then([c, this, fix_tail] {
915 if (fix_tail) {
916 search_position_t new_tail_pos;
917 const laddr_packed_t* new_tail_p_addr = nullptr;
918 impl->get_largest_slot(&new_tail_pos, nullptr, &new_tail_p_addr);
919 return get_or_track_child(c, new_tail_pos, new_tail_p_addr->value);
920 } else {
921 return eagain_iertr::make_ready_future<Ref<Node>>();
922 }
923 }).si_then([c, this, child_ref = std::move(child_ref), FNAME]
924 (auto&& new_tail_child) mutable {
925 auto child_pos = child_ref->parent_info().position;
926 if (new_tail_child) {
927 DEBUGT("erase {}'s child {} at pos({}), "
928 "and fix new child tail {} at pos({}) ...",
929 c.t, get_name(), child_ref->get_name(), child_pos,
930 new_tail_child->get_name(), new_tail_child->parent_info().position);
931 assert(!new_tail_child->impl->is_level_tail());
932 new_tail_child->make_tail(c);
933 assert(new_tail_child->impl->is_level_tail());
934 if (new_tail_child->impl->node_type() == node_type_t::LEAF) {
935 // no need to proceed with a merge because the filled size is unchanged
936 new_tail_child.reset();
937 }
938 } else {
939 DEBUGT("erase {}'s child {} at pos({}) ...",
940 c.t, get_name(), child_ref->get_name(), child_pos);
941 }
942
943 Ref<Node> this_ref = child_ref->deref_parent();
944 assert(this_ref == this);
945 return child_ref->retire(c, std::move(child_ref)
946 ).si_then([c, this, child_pos, FNAME,
947 this_ref = std::move(this_ref)] () mutable {
948 if (impl->has_single_value()) {
949 // fast path without mutating the extent
950 DEBUGT("{} has one value left, erase ...", c.t, get_name());
951 #ifndef NDEBUG
952 if (impl->is_level_tail()) {
953 assert(child_pos.is_end());
954 } else {
955 assert(child_pos == search_position_t::begin());
956 }
957 #endif
958
959 if (is_root()) {
960 // Note: if merge/split works as expected, we should never encounter the
961 // situation when the internal root has <=1 children:
962 //
963 // A newly created internal root (see Node::upgrade_root()) will have 2
964 // children after split is finished.
965 //
966 // When merge happens, children will try to merge each other, and if the
967 // root detects there is only one child left, the root will be
968 // down-graded to the only child.
969 //
970 // In order to preserve the invariant, we need to make sure the new
971 // internal root also has at least 2 children.
972 ceph_abort("trying to erase the last item from the internal root node");
973 }
974
975 // track erase
976 assert(tracked_child_nodes.empty());
977
978 // no child should be referencing this node now, this_ref is the last one.
979 assert(this_ref->use_count() == 1);
980 return Node::erase_node(c, std::move(this_ref));
981 }
982
983 impl->prepare_mutate(c);
984 auto [erase_stage, next_or_last_pos] = impl->erase(child_pos);
985 if (child_pos.is_end()) {
986 // next_or_last_pos as last_pos
987 track_make_tail(next_or_last_pos);
988 } else {
989 // next_or_last_pos as next_pos
990 track_erase(child_pos, erase_stage);
991 }
992 validate_tracked_children();
993
994 if (is_root()) {
995 return try_downgrade_root(c, std::move(this_ref));
996 } else {
997 bool update_parent_index;
998 if (impl->is_level_tail()) {
999 update_parent_index = false;
1000 } else {
1001 // next_or_last_pos as next_pos
1002 update_parent_index = next_or_last_pos.is_end();
1004 }
1005 return try_merge_adjacent(c, update_parent_index, std::move(this_ref));
1006 }
1007 }).si_then([c, new_tail_child = std::move(new_tail_child)] () mutable {
1008 // finally, check if the new tail child needs to merge
1009 if (new_tail_child && !new_tail_child->is_root()) {
1010 assert(new_tail_child->impl->is_level_tail());
1011 return new_tail_child->try_merge_adjacent(
1012 c, false, std::move(new_tail_child));
1013 } else {
1014 return eagain_iertr::now();
1015 }
1016 });
1017 });
1018 }
1019
1020 template <bool FORCE_MERGE>
1021 eagain_ifuture<> InternalNode::fix_index(
1022 context_t c, Ref<Node>&& child, bool check_downgrade)
1023 {
1024 LOG_PREFIX(OTree::InternalNode::fix_index);
1025 impl->validate_non_empty();
1026
1027 validate_child_inconsistent(*child);
1028 auto& child_pos = child->parent_info().position;
1029 Ref<Node> this_ref = child->deref_parent();
1030 assert(this_ref == this);
1031 validate_tracked_children();
1032
1033 impl->prepare_mutate(c);
1034
1035 key_view_t new_key = *child->impl->get_pivot_index();
1036 DEBUGT("fix {}'s index of child {} at pos({}), new_key={} ...",
1037 c.t, get_name(), child->get_name(), child_pos, new_key);
1038
1039 // erase the incorrect item
1040 auto [erase_stage, next_pos] = impl->erase(child_pos);
1041 track_erase(child_pos, erase_stage);
1042 validate_tracked_children();
1043
1044 // find out whether there is a need to fix parent index recursively
1045 bool update_parent_index;
1046 if (impl->is_level_tail()) {
1047 update_parent_index = false;
1048 } else {
1049 update_parent_index = next_pos.is_end();
1051 }
1052
1053 return insert_or_split(c, next_pos, new_key, child
1054 ).si_then([this, c, update_parent_index, check_downgrade,
1055 this_ref = std::move(this_ref)] (auto split_right) mutable {
1056 if (split_right) {
1057 // after split, the parent index to the split_right will be incorrect
1058 // if update_parent_index is true.
1059 return apply_split_to_parent(
1060 c, std::move(this_ref), std::move(split_right), update_parent_index);
1061 } else {
1062 // no split path
1063 if (is_root()) {
1064 if (check_downgrade) {
1065 return try_downgrade_root(c, std::move(this_ref));
1066 } else {
1067 // no need to call try_downgrade_root() because the number of keys
1068 // has not changed, and I must have at least 2 keys.
1069 assert(!impl->is_keys_empty());
1070 return eagain_iertr::now();
1071 }
1072 } else {
1073 // for a non-root node, we may need to merge with an adjacent node or fix
1074 // the parent index, because the filled node size may have been reduced.
1075 return try_merge_adjacent<FORCE_MERGE>(
1076 c, update_parent_index, std::move(this_ref));
1077 }
1078 }
1079 });
1080 }
1081
1082 template <bool FORCE_MERGE>
1083 eagain_ifuture<> InternalNode::apply_children_merge(
1084 context_t c, Ref<Node>&& left_child, laddr_t origin_left_addr,
1085 Ref<Node>&& right_child, bool update_index)
1086 {
1087 LOG_PREFIX(OTree::InternalNode::apply_children_merge);
1088 auto left_pos = left_child->parent_info().position;
1089 auto left_addr = left_child->impl->laddr();
1090 auto& right_pos = right_child->parent_info().position;
1091 auto right_addr = right_child->impl->laddr();
1092 DEBUGT("apply {}'s child {} (was {:#x}) at pos({}), "
1093 "to merge with {} at pos({}), update_index={} ...",
1094 c.t, get_name(), left_child->get_name(), origin_left_addr, left_pos,
1095 right_child->get_name(), right_pos, update_index);
1096
1097 #ifndef NDEBUG
1098 assert(left_child->parent_info().ptr == this);
1099 assert(!left_pos.is_end());
1100 const laddr_packed_t* p_value_left;
1101 impl->get_slot(left_pos, nullptr, &p_value_left);
1102 assert(p_value_left->value == origin_left_addr);
1103
1104 assert(right_child->use_count() == 1);
1105 assert(right_child->parent_info().ptr == this);
1106 const laddr_packed_t* p_value_right;
1107 if (right_pos.is_end()) {
1108 assert(right_child->impl->is_level_tail());
1109 assert(left_child->impl->is_level_tail());
1110 assert(impl->is_level_tail());
1111 assert(!update_index);
1112 p_value_right = impl->get_tail_value();
1113 } else {
1114 assert(!right_child->impl->is_level_tail());
1115 assert(!left_child->impl->is_level_tail());
1116 impl->get_slot(right_pos, nullptr, &p_value_right);
1117 }
1118 assert(p_value_right->value == right_addr);
1119 #endif
1120
1121 // XXX: we may jump to try_downgrade_root() without mutating this node.
1122
1123 // update layout from right_pos => right_addr to left_addr
1124 impl->prepare_mutate(c);
1125 impl->replace_child_addr(right_pos, left_addr, right_addr);
1126
1127 // update track from right_pos => right_child to left_child
1128 left_child->deref_parent();
1129 replace_track(left_child, right_child, update_index);
1130
1131 // erase left_pos from layout
1132 auto [erase_stage, next_pos] = impl->erase(left_pos);
1133 track_erase<false>(left_pos, erase_stage);
1134 assert(next_pos == left_child->parent_info().position);
1135
1136 // All good to retire the right_child.
1137 // I'm already ref-counted by left_child.
1138 return right_child->retire(c, std::move(right_child)
1139 ).si_then([c, this, update_index,
1140 left_child = std::move(left_child)] () mutable {
1141 if (update_index) {
1142 // I'm all good but:
1143 // - my number of keys is reduced by 1
1144 // - my size may underflow, but try_merge_adjacent() is already part of fix_index()
1145 return left_child->fix_parent_index<FORCE_MERGE>(c, std::move(left_child), true);
1146 } else {
1147 validate_tracked_children();
1148 Ref<Node> this_ref = this;
1149 left_child.reset();
1150 // I'm all good but:
1151 // - my number of keys is reduced by 1
1152 // - my size may underflow
1153 if (is_root()) {
1154 return try_downgrade_root(c, std::move(this_ref));
1155 } else {
1156 return try_merge_adjacent<FORCE_MERGE>(
1157 c, false, std::move(this_ref));
1158 }
1159 }
1160 });
1161 }
1162 template eagain_ifuture<> InternalNode::apply_children_merge<true>(
1163 context_t, Ref<Node>&&, laddr_t, Ref<Node>&&, bool);
1164 template eagain_ifuture<> InternalNode::apply_children_merge<false>(
1165 context_t, Ref<Node>&&, laddr_t, Ref<Node>&&, bool);
1166
1167 eagain_ifuture<std::pair<Ref<Node>, Ref<Node>>> InternalNode::get_child_peers(
1168 context_t c, const search_position_t& pos)
1169 {
1170 // assume I'm already ref counted by caller
1171 search_position_t prev_pos;
1172 const laddr_packed_t* prev_p_child_addr = nullptr;
1173 search_position_t next_pos;
1174 const laddr_packed_t* next_p_child_addr = nullptr;
1175
1176 if (pos.is_end()) {
1177 assert(impl->is_level_tail());
1178 if (!impl->is_keys_empty()) {
1179 // got previous child only
1180 impl->get_largest_slot(&prev_pos, nullptr, &prev_p_child_addr);
1181 assert(prev_pos < pos);
1182 assert(prev_p_child_addr != nullptr);
1183 } else {
1184 // no keys, so no peer children
1185 }
1186 } else { // !pos.is_end()
1187 if (pos != search_position_t::begin()) {
1188 // got previous child
1189 prev_pos = pos;
1190 impl->get_prev_slot(prev_pos, nullptr, &prev_p_child_addr);
1191 assert(prev_pos < pos);
1192 assert(prev_p_child_addr != nullptr);
1193 } else {
1194 // is already the first child, so no previous child
1195 }
1196
1197 next_pos = pos;
1198 impl->get_next_slot(next_pos, nullptr, &next_p_child_addr);
1199 if (next_pos.is_end()) {
1200 if (impl->is_level_tail()) {
1201 // the next child is the tail
1202 next_p_child_addr = impl->get_tail_value();
1203 assert(pos < next_pos);
1204 assert(next_p_child_addr != nullptr);
1205 } else {
1206 // next child doesn't exist
1207 assert(next_p_child_addr == nullptr);
1208 }
1209 } else {
1210 // got the next child
1211 assert(pos < next_pos);
1212 assert(next_p_child_addr != nullptr);
1213 }
1214 }
1215
1216 return eagain_iertr::now().si_then([this, c, prev_pos, prev_p_child_addr] {
1217 if (prev_p_child_addr != nullptr) {
1218 return get_or_track_child(c, prev_pos, prev_p_child_addr->value);
1219 } else {
1220 return eagain_iertr::make_ready_future<Ref<Node>>();
1221 }
1222 }).si_then([this, c, next_pos, next_p_child_addr] (Ref<Node> lnode) {
1223 if (next_p_child_addr != nullptr) {
1224 return get_or_track_child(c, next_pos, next_p_child_addr->value
1225 ).si_then([lnode] (Ref<Node> rnode) {
1226 return seastar::make_ready_future<std::pair<Ref<Node>, Ref<Node>>>(
1227 lnode, rnode);
1228 });
1229 } else {
1230 return eagain_iertr::make_ready_future<std::pair<Ref<Node>, Ref<Node>>>(
1231 lnode, nullptr);
1232 }
1233 });
1234 }
1235
1236 eagain_ifuture<Ref<InternalNode>> InternalNode::allocate_root(
1237 context_t c, laddr_t hint, level_t old_root_level,
1238 laddr_t old_root_addr, Super::URef&& super)
1239 {
1240 // support tree height up to 256
1241 ceph_assert(old_root_level < MAX_LEVEL);
1242 return InternalNode::allocate(c, hint, field_type_t::N0, true, old_root_level + 1
1243 ).si_then([c, old_root_addr,
1244 super = std::move(super)](auto fresh_node) mutable {
1245 auto root = fresh_node.node;
1246 assert(root->impl->is_keys_empty());
1247 auto p_value = root->impl->get_tail_value();
1248 fresh_node.mut.copy_in_absolute(
1249 const_cast<laddr_packed_t*>(p_value), old_root_addr);
1250 root->make_root_from(c, std::move(super), old_root_addr);
1251 ++(c.t.get_onode_tree_stats().extents_num_delta);
1252 return root;
1253 });
1254 }
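
// The fresh internal root starts with zero keys and only its tail child
// slot populated, pointing at the old root, so the tree grows one level
// taller while every existing key stays where it was.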
1255
1256 eagain_ifuture<Ref<tree_cursor_t>>
1257 InternalNode::lookup_smallest(context_t c)
1258 {
1259 impl->validate_non_empty();
1260 auto position = search_position_t::begin();
1261 const laddr_packed_t* p_child_addr;
1262 impl->get_slot(position, nullptr, &p_child_addr);
1263 return get_or_track_child(c, position, p_child_addr->value
1264 ).si_then([c](auto child) {
1265 return child->lookup_smallest(c);
1266 });
1267 }
1268
1269 eagain_ifuture<Ref<tree_cursor_t>>
1270 InternalNode::lookup_largest(context_t c)
1271 {
1272 // NOTE: unlike LeafNode::lookup_largest(), this only works when this is the
1273 // tail internal node of its level, descending through the tail child address.
1274 impl->validate_non_empty();
1275 assert(impl->is_level_tail());
1276 auto p_child_addr = impl->get_tail_value();
1277 return get_or_track_child(c, search_position_t::end(), p_child_addr->value
1278 ).si_then([c](auto child) {
1279 return child->lookup_largest(c);
1280 });
1281 }
1282
1283 eagain_ifuture<Node::search_result_t>
1284 InternalNode::lower_bound_tracked(
1285 context_t c, const key_hobj_t& key, MatchHistory& history)
1286 {
1287 auto result = impl->lower_bound(key, history);
1288 return get_or_track_child(c, result.position, result.p_value->value
1289 ).si_then([c, &key, &history](auto child) {
1290 // XXX(multi-type): pass result.mstat to child
1291 return child->lower_bound_tracked(c, key, history);
1292 });
1293 }
1294
1295 eagain_ifuture<> InternalNode::do_get_tree_stats(
1296 context_t c, tree_stats_t& stats)
1297 {
1298 impl->validate_non_empty();
1299 auto nstats = impl->get_stats();
1300 stats.size_persistent_internal += nstats.size_persistent;
1301 stats.size_filled_internal += nstats.size_filled;
1302 stats.size_logical_internal += nstats.size_logical;
1303 stats.size_overhead_internal += nstats.size_overhead;
1304 stats.size_value_internal += nstats.size_value;
1305 stats.num_kvs_internal += nstats.num_kvs;
1306 stats.num_nodes_internal += 1;
1307
1308 Ref<Node> this_ref = this;
1309 return seastar::do_with(
1310 search_position_t(), (const laddr_packed_t*)(nullptr),
1311 [this, this_ref, c, &stats](auto& pos, auto& p_child_addr) {
1312 pos = search_position_t::begin();
1313 impl->get_slot(pos, nullptr, &p_child_addr);
1314 return trans_intr::repeat(
1315 [this, this_ref, c, &stats, &pos, &p_child_addr]()
1316 -> eagain_ifuture<seastar::stop_iteration> {
1317 return get_or_track_child(c, pos, p_child_addr->value
1318 ).si_then([c, &stats](auto child) {
1319 return child->do_get_tree_stats(c, stats);
1320 }).si_then([this, this_ref, &pos, &p_child_addr] {
1321 if (pos.is_end()) {
1322 return seastar::stop_iteration::yes;
1323 } else {
1324 impl->get_next_slot(pos, nullptr, &p_child_addr);
1325 if (pos.is_end()) {
1326 if (impl->is_level_tail()) {
1327 p_child_addr = impl->get_tail_value();
1328 return seastar::stop_iteration::no;
1329 } else {
1330 return seastar::stop_iteration::yes;
1331 }
1332 } else {
1333 return seastar::stop_iteration::no;
1334 }
1335 }
1336 });
1337 });
1338 }
1339 );
1340 }
1341
1342 void InternalNode::track_merge(
1343 Ref<Node> _right_node, match_stage_t stage, search_position_t& left_last_pos)
1344 {
1345 assert(level() == _right_node->level());
1346 assert(impl->node_type() == _right_node->impl->node_type());
1347 auto& right_node = *static_cast<InternalNode*>(_right_node.get());
1348 if (right_node.tracked_child_nodes.empty()) {
1349 return;
1350 }
1351
1352 match_stage_t curr_stage = STAGE_BOTTOM;
1353
1354 // prepare the initial left_last_pos for offset
1355 while (curr_stage < stage) {
1356 left_last_pos.index_by_stage(curr_stage) = 0;
1357 ++curr_stage;
1358 }
1359 ++left_last_pos.index_by_stage(curr_stage);
1360
1361 // fix the tracked child nodes of right_node, stage by stage.
1362 auto& right_tracked_children = right_node.tracked_child_nodes;
1363 auto rit = right_tracked_children.begin();
1364 while (curr_stage <= STAGE_TOP) {
1365 auto right_pos_until = search_position_t::begin();
1366 right_pos_until.index_by_stage(curr_stage) = INDEX_UPPER_BOUND;
1367 auto rend = right_tracked_children.lower_bound(right_pos_until);
1368 while (rit != rend) {
1369 auto new_pos = rit->second->parent_info().position;
1370 assert(new_pos == rit->first);
1371 assert(rit->second->parent_info().ptr == &right_node);
1372 new_pos += left_last_pos;
1373 auto p_child = rit->second;
1374 rit = right_tracked_children.erase(rit);
1375 p_child->as_child(new_pos, this);
1376 }
1377 left_last_pos.index_by_stage(curr_stage) = 0;
1378 ++curr_stage;
1379 }
1380
1381 // fix the end-tracked child node of right_node, if it exists.
1382 if (rit != right_tracked_children.end()) {
1383 assert(rit->first == search_position_t::end());
1384 assert(rit->second->parent_info().position == search_position_t::end());
1385 assert(right_node.impl->is_level_tail());
1386 assert(impl->is_level_tail());
1387 auto p_child = rit->second;
1388 rit = right_tracked_children.erase(rit);
1389 p_child->as_child(search_position_t::end(), this);
1390 }
1391 assert(right_tracked_children.empty());
1392
1393 validate_tracked_children();
1394 }
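
// In short: children tracked by the right node are re-registered on this
// (left) node with their positions shifted by the left node's former last
// position, stage by stage, and a tracked end() child (if any) is simply
// re-tracked at end() of the merged node.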
1395
1396 eagain_ifuture<> InternalNode::test_clone_root(
1397 context_t c_other, RootNodeTracker& tracker_other) const
1398 {
1399 assert(is_root());
1400 assert(impl->is_level_tail());
1401 assert(impl->field_type() == field_type_t::N0);
1402 Ref<const Node> this_ref = this;
1403 return InternalNode::allocate(c_other, L_ADDR_MIN, field_type_t::N0, true, impl->level()
1404 ).si_then([this, c_other, &tracker_other](auto fresh_other) {
1405 impl->test_copy_to(fresh_other.mut);
1406 auto cloned_root = fresh_other.node;
1407 return c_other.nm.get_super(c_other.t, tracker_other
1408 ).handle_error_interruptible(
1409 eagain_iertr::pass_further{},
1410 crimson::ct_error::assert_all{"Invalid error during test clone"}
1411 ).si_then([c_other, cloned_root](auto&& super_other) {
1412 assert(super_other);
1413 cloned_root->make_root_new(c_other, std::move(super_other));
1414 return cloned_root;
1415 });
1416 }).si_then([this_ref, this, c_other](auto cloned_root) {
1417 // clone tracked children
1418 // In some unit tests, the children are stubbed out so that they don't
1419 // exist in the NodeExtentManager and are only tracked in memory.
1420 return trans_intr::do_for_each(
1421 tracked_child_nodes.begin(),
1422 tracked_child_nodes.end(),
1423 [this_ref, c_other, cloned_root](auto& kv) {
1424 assert(kv.first == kv.second->parent_info().position);
1425 return kv.second->test_clone_non_root(c_other, cloned_root);
1426 }
1427 );
1428 });
1429 }
1430
1431 eagain_ifuture<> InternalNode::try_downgrade_root(
1432 context_t c, Ref<Node>&& this_ref)
1433 {
1434 LOG_PREFIX(OTree::InternalNode::try_downgrade_root);
1435 assert(this_ref.get() == this);
1436 assert(is_root());
1437 assert(impl->is_level_tail());
1438 if (!impl->is_keys_empty()) {
1439 // I have more than one value, so no need to downgrade
1440 return eagain_iertr::now();
1441 }
1442
1443 // proceed to downgrade the root to its only child
1444 laddr_t child_addr = impl->get_tail_value()->value;
1445 return get_or_track_child(c, search_position_t::end(), child_addr
1446 ).si_then([c, this, FNAME,
1447 this_ref = std::move(this_ref)] (auto child) mutable {
1448 INFOT("downgrade {} to new root {}",
1449 c.t, get_name(), child->get_name());
1450 // Invariant, see InternalNode::erase_child()
1451 // the new internal root should have at least 2 children.
1452 assert(child->impl->is_level_tail());
1453 if (child->impl->node_type() == node_type_t::INTERNAL) {
1454 ceph_assert(!child->impl->is_keys_empty());
1455 }
1456
1457 assert(tracked_child_nodes.size() == 1);
1458 child->deref_parent();
1459 auto super_to_move = deref_super();
1460 child->make_root_from(c, std::move(super_to_move), impl->laddr());
1461 --(c.t.get_onode_tree_stats().extents_num_delta);
1462 return retire(c, std::move(this_ref));
1463 });
1464 }
1465
1466 eagain_ifuture<Ref<InternalNode>> InternalNode::insert_or_split(
1467 context_t c,
1468 const search_position_t& pos,
1469 const key_view_t& insert_key,
1470 Ref<Node> insert_child,
1471 Ref<Node> outdated_child)
1472 {
1473 LOG_PREFIX(OTree::InternalNode::insert_or_split);
1474 // XXX: check the insert_child is unlinked from this node
1475 #ifndef NDEBUG
1476 auto _insert_key = *insert_child->impl->get_pivot_index();
1477 assert(insert_key == _insert_key);
1478 #endif
1479 auto insert_value = insert_child->impl->laddr();
1480 auto insert_pos = pos;
1481 DEBUGT("insert {} with insert_key={}, insert_child={}, insert_pos({}), "
1482 "outdated_child={} ...",
1483 c.t, get_name(), insert_key, insert_child->get_name(),
1484 insert_pos, (outdated_child ? "True" : "False"));
1485 auto [insert_stage, insert_size] = impl->evaluate_insert(
1486 insert_key, insert_value, insert_pos);
1487 auto free_size = impl->free_size();
1488 if (free_size >= insert_size) {
1489 // proceed to insert
1490 [[maybe_unused]] auto p_value = impl->insert(
1491 insert_key, insert_value, insert_pos, insert_stage, insert_size);
1492 assert(impl->free_size() == free_size - insert_size);
1493 assert(insert_pos <= pos);
1494 assert(p_value->value == insert_value);
1495
1496 if (outdated_child) {
1497 track_insert<false>(insert_pos, insert_stage, insert_child);
1498 validate_child_inconsistent(*outdated_child);
1499 #ifndef NDEBUG
1500 do_untrack_child(*outdated_child);
1501 validate_tracked_children();
1502 do_track_child<false>(*outdated_child);
1503 #endif
1504 } else {
1505 track_insert(insert_pos, insert_stage, insert_child);
1506 validate_tracked_children();
1507 }
1508
1509 return eagain_iertr::make_ready_future<Ref<InternalNode>>(nullptr);
1510 }
1511
1512 // proceed to split with insert
1513 // assume I'm already ref-counted by caller
1514 laddr_t left_hint, right_hint;
1515 {
1516 key_view_t left_key;
1517 impl->get_slot(search_position_t::begin(), &left_key, nullptr);
1518 left_hint = left_key.get_hint();
1519 key_view_t right_key;
1520 impl->get_largest_slot(nullptr, &right_key, nullptr);
1521 right_hint = right_key.get_hint();
1522 }
1523 return (is_root() ? upgrade_root(c, left_hint) : eagain_iertr::now()
1524 ).si_then([this, c, right_hint] {
1525 return InternalNode::allocate(
1526 c, right_hint, impl->field_type(), impl->is_level_tail(), impl->level());
1527 }).si_then([this, insert_key, insert_child, insert_pos,
1528 insert_stage=insert_stage, insert_size=insert_size,
1529 outdated_child, c, FNAME](auto fresh_right) mutable {
1530 // I'm the left_node and need to split into the right_node
1531 auto right_node = fresh_right.node;
1532 DEBUGT("proceed split {} to fresh {} with insert_child={},"
1533 " outdated_child={} ...",
1534 c.t, get_name(), right_node->get_name(),
1535 insert_child->get_name(),
1536 (outdated_child ? outdated_child->get_name() : "N/A"));
1537 auto insert_value = insert_child->impl->laddr();
1538 auto [split_pos, is_insert_left, p_value] = impl->split_insert(
1539 fresh_right.mut, *right_node->impl, insert_key, insert_value,
1540 insert_pos, insert_stage, insert_size);
1541 assert(p_value->value == insert_value);
1542 track_split(split_pos, right_node);
1543
1544 if (outdated_child) {
1545 if (is_insert_left) {
1546 track_insert<false>(insert_pos, insert_stage, insert_child);
1547 } else {
1548 right_node->template track_insert<false>(insert_pos, insert_stage, insert_child);
1549 }
1550 #ifndef NDEBUG
1551 auto& _parent = outdated_child->parent_info().ptr;
1552 _parent->validate_child_inconsistent(*outdated_child);
1553 _parent->do_untrack_child(*outdated_child);
1554 validate_tracked_children();
1555 right_node->validate_tracked_children();
1556 _parent->do_track_child<false>(*outdated_child);
1557 #endif
1558 } else {
1559 if (is_insert_left) {
1560 track_insert(insert_pos, insert_stage, insert_child);
1561 } else {
1562 right_node->track_insert(insert_pos, insert_stage, insert_child);
1563 }
1564 validate_tracked_children();
1565 right_node->validate_tracked_children();
1566 }
1567 ++(c.t.get_onode_tree_stats().extents_num_delta);
1568 return right_node;
1569 });
1570 }
1571
1572 eagain_ifuture<Ref<Node>> InternalNode::get_or_track_child(
1573 context_t c, const search_position_t& position, laddr_t child_addr)
1574 {
1575 LOG_PREFIX(OTree::InternalNode::get_or_track_child);
1576 Ref<Node> this_ref = this;
1577 return [this, position, child_addr, c, FNAME] {
1578 auto found = tracked_child_nodes.find(position);
1579 if (found != tracked_child_nodes.end()) {
1580 TRACET("loaded child tracked {} at pos({}) addr={:x}",
1581 c.t, found->second->get_name(), position, child_addr);
1582 return eagain_iertr::make_ready_future<Ref<Node>>(found->second);
1583 }
1584 // the child is not loaded yet
1585 TRACET("loading child at pos({}) addr={:x} ...",
1586 c.t, position, child_addr);
1587 bool level_tail = position.is_end();
1588 return Node::load(c, child_addr, level_tail
1589 ).si_then([this, position, c, FNAME] (auto child) {
1590 TRACET("loaded child untracked {}",
1591 c.t, child->get_name());
1592 if (child->level() + 1 != level()) {
1593 ERRORT("loaded child {} error from parent {} at pos({}), level mismatch",
1594 c.t, child->get_name(), get_name(), position);
1595 ceph_abort("fatal error");
1596 }
1597 child->as_child(position, this);
1598 return child;
1599 });
1600 }().si_then([this_ref, this, position, child_addr] (auto child) {
1601 assert(child_addr == child->impl->laddr());
1602 assert(position == child->parent_info().position);
1603 std::ignore = position;
1604 std::ignore = child_addr;
1605 validate_child_tracked(*child);
1606 return child;
1607 });
1608 }
1609
1610 template <bool VALIDATE>
1611 void InternalNode::track_insert(
1612 const search_position_t& insert_pos, match_stage_t insert_stage,
1613 Ref<Node> insert_child, Ref<Node> nxt_child)
1614 {
1615 // update tracks
1616 auto pos_upper_bound = insert_pos;
1617 pos_upper_bound.index_by_stage(insert_stage) = INDEX_UPPER_BOUND;
1618 auto first = tracked_child_nodes.lower_bound(insert_pos);
1619 auto last = tracked_child_nodes.lower_bound(pos_upper_bound);
1620 std::vector<Node*> nodes;
1621 std::for_each(first, last, [&nodes](auto& kv) {
1622 nodes.push_back(kv.second);
1623 });
1624 tracked_child_nodes.erase(first, last);
1625 for (auto& node : nodes) {
1626 auto _pos = node->parent_info().position;
1627 assert(!_pos.is_end());
1628 ++_pos.index_by_stage(insert_stage);
1629 node->as_child<VALIDATE>(_pos, this);
1630 }
1631 // track insert
1632 insert_child->as_child(insert_pos, this);
1633
1634 #ifndef NDEBUG
1635 // validate that nxt_child is tracked immediately after insert_child
1636 if (nxt_child) {
1637 auto iter = tracked_child_nodes.find(insert_pos);
1638 ++iter;
1639 assert(iter->second == nxt_child);
1640 }
1641 #endif
1642 }
1643 template void InternalNode::track_insert<true>(const search_position_t&, match_stage_t, Ref<Node>, Ref<Node>);
1644 template void InternalNode::track_insert<false>(const search_position_t&, match_stage_t, Ref<Node>, Ref<Node>);
1645
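// Transfer the parent tracking at old_child's position from old_child to
// new_child. If new_child is still outdated (the pivot key recorded in this
// parent has not been updated yet), it is tracked without validation.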
1646 void InternalNode::replace_track(
1647 Ref<Node> new_child, Ref<Node> old_child, bool is_new_child_outdated)
1648 {
1649 assert(!new_child->is_tracked());
1650 auto& pos = old_child->parent_info().position;
1651 auto this_ref = old_child->deref_parent();
1652 assert(this_ref == this);
1653 if (is_new_child_outdated) {
1654 // we need to keep track of the outdated child through
1655 // insert and split.
1656 new_child->as_child<false>(pos, this);
1657 } else {
1658 new_child->as_child(pos, this);
1659 }
1660
1661 #ifndef NDEBUG
1662 if (is_new_child_outdated) {
1663 validate_child_inconsistent(*new_child);
1664 } else {
1665 validate_child_tracked(*new_child);
1666 }
1667 #endif
1668 }
1669
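// After this node is split at split_pos, move every tracked child at or after
// split_pos to right_node, re-basing its position relative to split_pos.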
1670 void InternalNode::track_split(
1671 const search_position_t& split_pos, Ref<InternalNode> right_node)
1672 {
1673 auto iter = tracked_child_nodes.lower_bound(split_pos);
1674 while (iter != tracked_child_nodes.end()) {
1675 auto new_pos = iter->first;
1676 auto p_node = iter->second;
1677 iter = tracked_child_nodes.erase(iter);
1678 new_pos -= split_pos;
1679 p_node->as_child<false>(new_pos, right_node);
1680 }
1681 }
1682
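// Fix the tracked children after an erase at erase_pos/erase_stage: the erased
// position itself must no longer be tracked, and the remaining children in the
// same range have their index at erase_stage decremented by one.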
1683 template <bool VALIDATE>
1684 void InternalNode::track_erase(
1685 const search_position_t& erase_pos, match_stage_t erase_stage)
1686 {
1687 auto first = tracked_child_nodes.lower_bound(erase_pos);
1688 assert(first == tracked_child_nodes.end() ||
1689 first->first != erase_pos);
1690 auto pos_upper_bound = erase_pos;
1691 pos_upper_bound.index_by_stage(erase_stage) = INDEX_UPPER_BOUND;
1692 auto last = tracked_child_nodes.lower_bound(pos_upper_bound);
1693 std::vector<Node*> p_nodes;
1694 std::for_each(first, last, [&p_nodes](auto& kv) {
1695 p_nodes.push_back(kv.second);
1696 });
1697 tracked_child_nodes.erase(first, last);
1698 for (auto& p_node: p_nodes) {
1699 auto new_pos = p_node->parent_info().position;
1700 assert(new_pos.index_by_stage(erase_stage) > 0);
1701 --new_pos.index_by_stage(erase_stage);
1702 p_node->as_child<VALIDATE>(new_pos, this);
1703 }
1704 }
1705 template void InternalNode::track_erase<true>(const search_position_t&, match_stage_t);
1706 template void InternalNode::track_erase<false>(const search_position_t&, match_stage_t);
1707
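// Called when this node becomes the level tail: the child tracked at last_pos
// (the rightmost position), if any, is re-tracked at the end position.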
1708 void InternalNode::track_make_tail(const search_position_t& last_pos)
1709 {
1710 // assume I'm ref counted by the caller.
1711 assert(impl->is_level_tail());
1712 assert(!last_pos.is_end());
1713 assert(tracked_child_nodes.find(search_position_t::end()) ==
1714 tracked_child_nodes.end());
1715 auto last_it = tracked_child_nodes.find(last_pos);
1716 if (last_it != tracked_child_nodes.end()) {
1717 assert(std::next(last_it) == tracked_child_nodes.end());
1718 auto p_last_child = last_it->second;
1719 tracked_child_nodes.erase(last_it);
1720 p_last_child->as_child(search_position_t::end(), this);
1721 } else {
1722 assert(tracked_child_nodes.lower_bound(last_pos) ==
1723 tracked_child_nodes.end());
1724 }
1725 }
1726
1727 void InternalNode::validate_child(const Node& child) const
1728 {
1729 #ifndef NDEBUG
1730 assert(impl->level() - 1 == child.impl->level());
1731 assert(this == child.parent_info().ptr);
1732 auto& child_pos = child.parent_info().position;
1733 if (child_pos.is_end()) {
1734 assert(impl->is_level_tail());
1735 assert(child.impl->is_level_tail());
1736 assert(impl->get_tail_value()->value == child.impl->laddr());
1737 } else {
1738 assert(!child.impl->is_level_tail());
1739 key_view_t index_key;
1740 const laddr_packed_t* p_child_addr;
1741 impl->get_slot(child_pos, &index_key, &p_child_addr);
1742 assert(index_key == *child.impl->get_pivot_index());
1743 assert(p_child_addr->value == child.impl->laddr());
1744 }
1745 // XXX(multi-type)
1746 assert(impl->field_type() <= child.impl->field_type());
1747 #endif
1748 }
1749
1750 void InternalNode::validate_child_inconsistent(const Node& child) const
1751 {
1752 #ifndef NDEBUG
1753 assert(impl->level() - 1 == child.impl->level());
1754 assert(check_is_tracking(child));
1755 auto& child_pos = child.parent_info().position;
1756 // the tail value has no key to fix
1757 assert(!child_pos.is_end());
1758 assert(!child.impl->is_level_tail());
1759
1760 key_view_t current_key;
1761 const laddr_packed_t* p_value;
1762 impl->get_slot(child_pos, &current_key, &p_value);
1763 key_view_t new_key = *child.impl->get_pivot_index();
1764 assert(current_key != new_key);
1765 assert(p_value->value == child.impl->laddr());
1766 #endif
1767 }
1768
1769 eagain_ifuture<InternalNode::fresh_node_t> InternalNode::allocate(
1770 context_t c, laddr_t hint, field_type_t field_type, bool is_level_tail, level_t level)
1771 {
1772 return InternalNodeImpl::allocate(c, hint, field_type, is_level_tail, level
1773 ).si_then([](auto&& fresh_impl) {
1774 auto *derived_ptr = fresh_impl.impl.get();
1775 auto node = Ref<InternalNode>(new InternalNode(
1776 derived_ptr, std::move(fresh_impl.impl)));
1777 return fresh_node_t{node, fresh_impl.mut};
1778 });
1779 }
1780
1781 /*
1782 * LeafNode
1783 */
1784
1785 LeafNode::LeafNode(LeafNodeImpl* impl, NodeImplURef&& impl_ref)
1786 : Node(std::move(impl_ref)), impl{impl} {}
1787
1788 bool LeafNode::is_level_tail() const
1789 {
1790 return impl->is_level_tail();
1791 }
1792
1793 node_version_t LeafNode::get_version() const
1794 {
1795 return {layout_version, impl->get_extent_state()};
1796 }
1797
1798 const char* LeafNode::read() const
1799 {
1800 return impl->read();
1801 }
1802
1803 extent_len_t LeafNode::get_node_size() const
1804 {
1805 return impl->get_node_size();
1806 }
1807
1808 std::tuple<key_view_t, const value_header_t*>
1809 LeafNode::get_kv(const search_position_t& pos) const
1810 {
1811 key_view_t key_view;
1812 const value_header_t* p_value_header;
1813 impl->get_slot(pos, &key_view, &p_value_header);
1814 return {key_view, p_value_header};
1815 }
1816
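// Return the cursor next to pos. If pos is already the last slot of this leaf,
// either return the end cursor (when this leaf is the level tail) or continue
// the lookup through the parent.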
1817 eagain_ifuture<Ref<tree_cursor_t>>
1818 LeafNode::get_next_cursor(context_t c, const search_position_t& pos)
1819 {
1820 impl->validate_non_empty();
1821 search_position_t next_pos = pos;
1822 key_view_t index_key;
1823 const value_header_t* p_value_header = nullptr;
1824 impl->get_next_slot(next_pos, &index_key, &p_value_header);
1825 if (next_pos.is_end()) {
1826 if (unlikely(is_level_tail())) {
1827 return eagain_iertr::make_ready_future<Ref<tree_cursor_t>>(
1828 tree_cursor_t::create_end(this));
1829 } else {
1830 return get_next_cursor_from_parent(c);
1831 }
1832 } else {
1833 return eagain_iertr::make_ready_future<Ref<tree_cursor_t>>(
1834 get_or_track_cursor(next_pos, index_key, p_value_header));
1835 }
1836 }
1837
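// Erase the value at pos, optionally returning the cursor next to it. Erasing
// the last remaining value of a non-root leaf retires the whole node;
// otherwise the value is erased in place, the tracked cursors are fixed, and,
// unless this is the root, a merge with an adjacent node is attempted.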
1838 template <bool FORCE_MERGE>
1839 eagain_ifuture<Ref<tree_cursor_t>>
1840 LeafNode::erase(context_t c, const search_position_t& pos, bool get_next)
1841 {
1842 LOG_PREFIX(OTree::LeafNode::erase);
1843 assert(!pos.is_end());
1844 assert(!impl->is_keys_empty());
1845 Ref<Node> this_ref = this;
1846 DEBUGT("erase {}'s pos({}), get_next={} ...",
1847 c.t, get_name(), pos, get_next);
1848 ++(c.t.get_onode_tree_stats().num_erases);
1849
1850 // get the next cursor
1851 return eagain_iertr::now().si_then([c, &pos, get_next, this] {
1852 if (get_next) {
1853 return get_next_cursor(c, pos);
1854 } else {
1855 return eagain_iertr::make_ready_future<Ref<tree_cursor_t>>();
1856 }
1857 }).si_then([c, &pos, this_ref = std::move(this_ref),
1858 this, FNAME] (Ref<tree_cursor_t> next_cursor) mutable {
1859 if (next_cursor && next_cursor->is_end()) {
1860 // drop the end cursor (returned as null) so it does not keep this node referenced
1861 next_cursor.reset();
1862 }
1863 return eagain_iertr::now().si_then(
1864 [c, &pos, this_ref = std::move(this_ref), this, FNAME] () mutable {
1865 assert_moveable(this_ref);
1866 #ifndef NDEBUG
1867 assert(!impl->is_keys_empty());
1868 if (impl->has_single_value()) {
1869 assert(pos == search_position_t::begin());
1870 }
1871 #endif
1872 if (!is_root() && impl->has_single_value()) {
1873 // fast path: the only value is being erased, so retire the whole node
1874 // instead of mutating the extent; the root, in contrast, must be kept
1875 // as an empty leaf node. Cursor tracking is handled inline below.
1876 DEBUGT("{} has one value left, erase ...", c.t, get_name());
1877 assert(tracked_cursors.size() == 1);
1878 auto iter = tracked_cursors.begin();
1879 assert(iter->first == pos);
1880 iter->second->invalidate();
1881 tracked_cursors.clear();
1882
1883 // no cursor should be referencing this node now; this_ref is the last one.
1884 assert(this_ref->use_count() == 1);
1885 return Node::erase_node(c, std::move(this_ref));
1886 }
1887
1888 on_layout_change();
1889 impl->prepare_mutate(c);
1890 auto [erase_stage, next_pos] = impl->erase(pos);
1891 track_erase(pos, erase_stage);
1892 validate_tracked_cursors();
1893
1894 if (is_root()) {
1895 return eagain_iertr::now();
1896 } else {
1897 bool update_parent_index;
1898 if (impl->is_level_tail()) {
1899 update_parent_index = false;
1900 } else {
1901 // erasing the rightmost slot changes this node's largest key
1902 update_parent_index = next_pos.is_end();
1903 }
1904 return try_merge_adjacent<FORCE_MERGE>(
1905 c, update_parent_index, std::move(this_ref));
1906 }
1907 }).si_then([next_cursor] {
1908 return next_cursor;
1909 });
1910 });
1911 }
1912 template eagain_ifuture<Ref<tree_cursor_t>>
1913 LeafNode::erase<true>(context_t, const search_position_t&, bool);
1914 template eagain_ifuture<Ref<tree_cursor_t>>
1915 LeafNode::erase<false>(context_t, const search_position_t&, bool);
1916
1917 eagain_ifuture<> LeafNode::extend_value(
1918 context_t c, const search_position_t& pos, value_size_t extend_size)
1919 {
1920 ceph_abort("not implemented");
1921 return eagain_iertr::now();
1922 }
1923
1924 eagain_ifuture<> LeafNode::trim_value(
1925 context_t c, const search_position_t& pos, value_size_t trim_size)
1926 {
1927 ceph_abort("not implemented");
1928 return eagain_iertr::now();
1929 }
1930
1931 std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
1932 LeafNode::prepare_mutate_value_payload(context_t c)
1933 {
1934 return impl->prepare_mutate_value_payload(c);
1935 }
1936
1937 eagain_ifuture<Ref<tree_cursor_t>>
1938 LeafNode::lookup_smallest(context_t)
1939 {
1940 if (unlikely(impl->is_keys_empty())) {
1941 assert(is_root());
1942 return seastar::make_ready_future<Ref<tree_cursor_t>>(
1943 tree_cursor_t::create_end(this));
1944 }
1945 auto pos = search_position_t::begin();
1946 key_view_t index_key;
1947 const value_header_t* p_value_header;
1948 impl->get_slot(pos, &index_key, &p_value_header);
1949 return seastar::make_ready_future<Ref<tree_cursor_t>>(
1950 get_or_track_cursor(pos, index_key, p_value_header));
1951 }
1952
1953 eagain_ifuture<Ref<tree_cursor_t>>
1954 LeafNode::lookup_largest(context_t)
1955 {
1956 if (unlikely(impl->is_keys_empty())) {
1957 assert(is_root());
1958 return seastar::make_ready_future<Ref<tree_cursor_t>>(
1959 tree_cursor_t::create_end(this));
1960 }
1961 search_position_t pos;
1962 key_view_t index_key;
1963 const value_header_t* p_value_header = nullptr;
1964 impl->get_largest_slot(&pos, &index_key, &p_value_header);
1965 return seastar::make_ready_future<Ref<tree_cursor_t>>(
1966 get_or_track_cursor(pos, index_key, p_value_header));
1967 }
1968
1969 eagain_ifuture<Node::search_result_t>
1970 LeafNode::lower_bound_tracked(
1971 context_t c, const key_hobj_t& key, MatchHistory& history)
1972 {
1973 key_view_t index_key;
1974 auto result = impl->lower_bound(key, history, &index_key);
1975 Ref<tree_cursor_t> cursor;
1976 if (result.position.is_end()) {
1977 assert(!result.p_value);
1978 cursor = tree_cursor_t::create_end(this);
1979 } else {
1980 cursor = get_or_track_cursor(result.position, index_key, result.p_value);
1981 }
1982 search_result_t ret{cursor, result.mstat};
1983 ret.validate_input_key(key, c.vb.get_header_magic());
1984 return seastar::make_ready_future<search_result_t>(ret);
1985 }
1986
1987 eagain_ifuture<> LeafNode::do_get_tree_stats(context_t, tree_stats_t& stats)
1988 {
1989 auto nstats = impl->get_stats();
1990 stats.size_persistent_leaf += nstats.size_persistent;
1991 stats.size_filled_leaf += nstats.size_filled;
1992 stats.size_logical_leaf += nstats.size_logical;
1993 stats.size_overhead_leaf += nstats.size_overhead;
1994 stats.size_value_leaf += nstats.size_value;
1995 stats.num_kvs_leaf += nstats.num_kvs;
1996 stats.num_nodes_leaf += 1;
1997 return eagain_iertr::now();
1998 }
1999
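// Fix up cursor tracking after right_node has been merged into this node at
// the given stage: the cursors tracked by right_node are moved into this node
// with their positions offset by left_last_pos, adjusted stage by stage.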
2000 void LeafNode::track_merge(
2001 Ref<Node> _right_node, match_stage_t stage, search_position_t& left_last_pos)
2002 {
2003 assert(level() == _right_node->level());
2004 // assert(impl->node_type() == _right_node->impl->node_type());
2005 auto& right_node = *static_cast<LeafNode*>(_right_node.get());
2006 if (right_node.tracked_cursors.empty()) {
2007 return;
2008 }
2009
2010 match_stage_t curr_stage = STAGE_BOTTOM;
2011
2012 // prepare left_last_pos as the initial offset for re-basing the right cursors
2013 while (curr_stage < stage) {
2014 left_last_pos.index_by_stage(curr_stage) = 0;
2015 ++curr_stage;
2016 }
2017 ++left_last_pos.index_by_stage(curr_stage);
2018
2019 // fix the tracked cursors of right_node, stage by stage.
2020 auto& right_tracked_cursors = right_node.tracked_cursors;
2021 auto rit = right_tracked_cursors.begin();
2022 while (curr_stage <= STAGE_TOP) {
2023 auto right_pos_until = search_position_t::begin();
2024 right_pos_until.index_by_stage(curr_stage) = INDEX_UPPER_BOUND;
2025 auto rend = right_tracked_cursors.lower_bound(right_pos_until);
2026 while (rit != rend) {
2027 auto new_pos = rit->second->get_position();
2028 assert(new_pos == rit->first);
2029 assert(rit->second->get_leaf_node().get() == &right_node);
2030 new_pos += left_last_pos;
2031 auto p_cursor = rit->second;
2032 rit = right_tracked_cursors.erase(rit);
2033 p_cursor->update_track<true>(this, new_pos);
2034 }
2035 left_last_pos.index_by_stage(curr_stage) = 0;
2036 ++curr_stage;
2037 }
2038 assert(right_tracked_cursors.empty());
2039
2040 validate_tracked_cursors();
2041 }
2042
2043 eagain_ifuture<> LeafNode::test_clone_root(
2044 context_t c_other, RootNodeTracker& tracker_other) const
2045 {
2046 assert(is_root());
2047 assert(impl->is_level_tail());
2048 assert(impl->field_type() == field_type_t::N0);
2049 Ref<const Node> this_ref = this;
2050 return LeafNode::allocate(c_other, L_ADDR_MIN, field_type_t::N0, true
2051 ).si_then([this, c_other, &tracker_other](auto fresh_other) {
2052 impl->test_copy_to(fresh_other.mut);
2053 auto cloned_root = fresh_other.node;
2054 return c_other.nm.get_super(c_other.t, tracker_other
2055 ).handle_error_interruptible(
2056 eagain_iertr::pass_further{},
2057 crimson::ct_error::assert_all{"Invalid error during test clone"}
2058 ).si_then([c_other, cloned_root](auto&& super_other) {
2059 assert(super_other);
2060 cloned_root->make_root_new(c_other, std::move(super_other));
2061 });
2062 }).si_then([this_ref]{});
2063 }
2064
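// Insert a value configured by vconf under key at the position found by
// lookup. If the leaf has enough free space the insert happens in place;
// otherwise the leaf is split into a fresh right node, the insert goes to
// whichever side ends up holding insert_pos, and the split is propagated to
// the parent (the root is upgraded first if this leaf is the root).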
2065 eagain_ifuture<Ref<tree_cursor_t>> LeafNode::insert_value(
2066 context_t c, const key_hobj_t& key, value_config_t vconf,
2067 const search_position_t& pos, const MatchHistory& history,
2068 match_stat_t mstat)
2069 {
2070 LOG_PREFIX(OTree::LeafNode::insert_value);
2071 #ifndef NDEBUG
2072 if (pos.is_end()) {
2073 assert(impl->is_level_tail());
2074 }
2075 #endif
2076 DEBUGT("insert {} with insert_key={}, insert_value={}, insert_pos({}), "
2077 "history={}, mstat({}) ...",
2078 c.t, get_name(), key, vconf, pos, history, mstat);
2079 ++(c.t.get_onode_tree_stats().num_inserts);
2080 search_position_t insert_pos = pos;
2081 auto [insert_stage, insert_size] = impl->evaluate_insert(
2082 key, vconf, history, mstat, insert_pos);
2083 auto free_size = impl->free_size();
2084 if (free_size >= insert_size) {
2085 // proceed to insert
2086 on_layout_change();
2087 impl->prepare_mutate(c);
2088 auto p_value_header = impl->insert(key, vconf, insert_pos, insert_stage, insert_size);
2089 assert(impl->free_size() == free_size - insert_size);
2090 assert(insert_pos <= pos);
2091 assert(p_value_header->payload_size == vconf.payload_size);
2092 auto ret = track_insert(insert_pos, insert_stage, p_value_header);
2093 validate_tracked_cursors();
2094 return eagain_iertr::make_ready_future<Ref<tree_cursor_t>>(ret);
2095 }
2096 // split and insert
2097 Ref<Node> this_ref = this;
2098 laddr_t left_hint, right_hint;
2099 {
2100 key_view_t left_key;
2101 impl->get_slot(search_position_t::begin(), &left_key, nullptr);
2102 left_hint = left_key.get_hint();
2103 key_view_t right_key;
2104 impl->get_largest_slot(nullptr, &right_key, nullptr);
2105 right_hint = right_key.get_hint();
2106 }
2107 return (is_root() ? upgrade_root(c, left_hint) : eagain_iertr::now()
2108 ).si_then([this, c, right_hint] {
2109 return LeafNode::allocate(c, right_hint, impl->field_type(), impl->is_level_tail());
2110 }).si_then([this_ref = std::move(this_ref), this, c, &key, vconf, FNAME,
2111 insert_pos, insert_stage=insert_stage, insert_size=insert_size](auto fresh_right) mutable {
2112 auto right_node = fresh_right.node;
2113 DEBUGT("proceed split {} to fresh {} ...",
2114 c.t, get_name(), right_node->get_name());
2115 // no need to bump version for right node, as it is fresh
2116 on_layout_change();
2117 impl->prepare_mutate(c);
2118 auto [split_pos, is_insert_left, p_value_header] = impl->split_insert(
2119 fresh_right.mut, *right_node->impl, key, vconf,
2120 insert_pos, insert_stage, insert_size);
2121 assert(p_value_header->payload_size == vconf.payload_size);
2122 track_split(split_pos, right_node);
2123 Ref<tree_cursor_t> ret;
2124 if (is_insert_left) {
2125 ret = track_insert(insert_pos, insert_stage, p_value_header);
2126 } else {
2127 ret = right_node->track_insert(insert_pos, insert_stage, p_value_header);
2128 }
2129 validate_tracked_cursors();
2130 right_node->validate_tracked_cursors();
2131
2132 ++(c.t.get_onode_tree_stats().extents_num_delta);
2133 return apply_split_to_parent(
2134 c, std::move(this_ref), std::move(right_node), false
2135 ).si_then([ret] {
2136 return ret;
2137 });
2138 // TODO (optimize)
2139 // try to acquire space from siblings before split... see btrfs
2140 });
2141 }
2142
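// Allocate an empty N0 leaf as the level tail and install it as the new root
// via the super returned by get_super().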
2143 eagain_ifuture<Ref<LeafNode>> LeafNode::allocate_root(
2144 context_t c, RootNodeTracker& root_tracker)
2145 {
2146 LOG_PREFIX(OTree::LeafNode::allocate_root);
2147 return LeafNode::allocate(c, L_ADDR_MIN, field_type_t::N0, true
2148 ).si_then([c, &root_tracker, FNAME](auto fresh_node) {
2149 auto root = fresh_node.node;
2150 return c.nm.get_super(c.t, root_tracker
2151 ).handle_error_interruptible(
2152 eagain_iertr::pass_further{},
2153 crimson::ct_error::input_output_error::handle([FNAME, c] {
2154 ERRORT("EIO during get_super()", c.t);
2155 ceph_abort("fatal error");
2156 })
2157 ).si_then([c, root](auto&& super) {
2158 assert(super);
2159 root->make_root_new(c, std::move(super));
2160 return root;
2161 });
2162 });
2163 }
2164
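// Return the cursor at the given (non-end) position: either the one already
// tracked, with its cached key/value refreshed, or a newly created one.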
2165 Ref<tree_cursor_t> LeafNode::get_or_track_cursor(
2166 const search_position_t& position,
2167 const key_view_t& key, const value_header_t* p_value_header)
2168 {
2169 assert(!position.is_end());
2170 assert(p_value_header);
2171 Ref<tree_cursor_t> p_cursor;
2172 auto found = tracked_cursors.find(position);
2173 if (found == tracked_cursors.end()) {
2174 p_cursor = tree_cursor_t::create_tracked(
2175 this, position, key, p_value_header);
2176 } else {
2177 p_cursor = found->second;
2178 assert(p_cursor->get_leaf_node() == this);
2179 assert(p_cursor->get_position() == position);
2180 p_cursor->update_cache_same_node(key, p_value_header);
2181 }
2182 return p_cursor;
2183 }
2184
2185 void LeafNode::validate_cursor(const tree_cursor_t& cursor) const
2186 {
2187 #ifndef NDEBUG
2188 assert(this == cursor.get_leaf_node().get());
2189 assert(cursor.is_tracked());
2190 assert(!impl->is_extent_retired());
2191
2192 // We need to make sure the user has freed all the cursors before submitting
2193 // the corresponding transaction. Otherwise the checks below have undefined
2194 // behavior.
2195 auto [key, p_value_header] = get_kv(cursor.get_position());
2196 auto magic = p_value_header->magic;
2197 assert(key == cursor.get_key_view(magic));
2198 assert(p_value_header == cursor.read_value_header(magic));
2199 #endif
2200 }
2201
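// Fix the tracked cursors after an insert at insert_pos/insert_stage: cursors
// at or after insert_pos have their index at insert_stage incremented by one,
// then a new cursor is created for the inserted value.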
2202 Ref<tree_cursor_t> LeafNode::track_insert(
2203 const search_position_t& insert_pos, match_stage_t insert_stage,
2204 const value_header_t* p_value_header)
2205 {
2206 // update cursor position
2207 auto pos_upper_bound = insert_pos;
2208 pos_upper_bound.index_by_stage(insert_stage) = INDEX_UPPER_BOUND;
2209 auto first = tracked_cursors.lower_bound(insert_pos);
2210 auto last = tracked_cursors.lower_bound(pos_upper_bound);
2211 std::vector<tree_cursor_t*> p_cursors;
2212 std::for_each(first, last, [&p_cursors](auto& kv) {
2213 p_cursors.push_back(kv.second);
2214 });
2215 tracked_cursors.erase(first, last);
2216 for (auto& p_cursor : p_cursors) {
2217 search_position_t new_pos = p_cursor->get_position();
2218 ++new_pos.index_by_stage(insert_stage);
2219 p_cursor->update_track<true>(this, new_pos);
2220 }
2221
2222 // track insert
2223 // TODO: getting key_view_t from stage::proceed_insert() and
2224 // stage::append_insert() is not supported yet
2225 return tree_cursor_t::create_inserted(
2226 this, insert_pos);
2227 }
2228
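// After this leaf is split at split_pos, move every tracked cursor at or after
// split_pos to right_node, re-basing its position relative to split_pos.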
2229 void LeafNode::track_split(
2230 const search_position_t& split_pos, Ref<LeafNode> right_node)
2231 {
2232 // update cursor ownership and position
2233 auto iter = tracked_cursors.lower_bound(split_pos);
2234 while (iter != tracked_cursors.end()) {
2235 auto new_pos = iter->first;
2236 auto p_cursor = iter->second;
2237 iter = tracked_cursors.erase(iter);
2238 new_pos -= split_pos;
2239 p_cursor->update_track<false>(right_node, new_pos);
2240 }
2241 }
2242
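// Fix the tracked cursors after an erase at erase_pos/erase_stage: the erased
// cursor is invalidated and untracked, and the following cursors have their
// index at erase_stage decremented by one.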
2243 void LeafNode::track_erase(
2244 const search_position_t& erase_pos, match_stage_t erase_stage)
2245 {
2246 // erase tracking and invalidate the erased cursor
2247 auto to_erase = tracked_cursors.find(erase_pos);
2248 assert(to_erase != tracked_cursors.end());
2249 to_erase->second->invalidate();
2250 auto first = tracked_cursors.erase(to_erase);
2251
2252 // update cursor position
2253 assert(first == tracked_cursors.lower_bound(erase_pos));
2254 auto pos_upper_bound = erase_pos;
2255 pos_upper_bound.index_by_stage(erase_stage) = INDEX_UPPER_BOUND;
2256 auto last = tracked_cursors.lower_bound(pos_upper_bound);
2257 std::vector<tree_cursor_t*> p_cursors;
2258 std::for_each(first, last, [&p_cursors](auto& kv) {
2259 p_cursors.push_back(kv.second);
2260 });
2261 tracked_cursors.erase(first, last);
2262 for (auto& p_cursor : p_cursors) {
2263 search_position_t new_pos = p_cursor->get_position();
2264 assert(new_pos.index_by_stage(erase_stage) > 0);
2265 --new_pos.index_by_stage(erase_stage);
2266 p_cursor->update_track<true>(this, new_pos);
2267 }
2268 }
2269
2270 eagain_ifuture<LeafNode::fresh_node_t> LeafNode::allocate(
2271 context_t c, laddr_t hint, field_type_t field_type, bool is_level_tail)
2272 {
2273 return LeafNodeImpl::allocate(c, hint, field_type, is_level_tail
2274 ).si_then([](auto&& fresh_impl) {
2275 auto *derived_ptr = fresh_impl.impl.get();
2276 auto node = Ref<LeafNode>(new LeafNode(
2277 derived_ptr, std::move(fresh_impl.impl)));
2278 return fresh_node_t{node, fresh_impl.mut};
2279 });
2280 }
2281
2282 }