ceph/src/crimson/os/seastore/onode_manager/staged-fltree/node.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "node.h"
5
6 #include <cassert>
7 #include <exception>
8 #include <sstream>
9
10 #include "common/likely.h"
11 #include "crimson/common/utility.h"
12 #include "crimson/os/seastore/logging.h"
13
14 #include "node_extent_manager.h"
15 #include "node_impl.h"
16 #include "stages/node_stage_layout.h"
17
18 SET_SUBSYS(seastore_onode);
19
20 namespace crimson::os::seastore::onode {
21 /*
22 * tree_cursor_t
23 */
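// A tree_cursor_t pins a position inside a LeafNode.  While tracked, the
// leaf node keeps a back-reference to the cursor (do_track_cursor) so the
// cursor can be repositioned whenever the node splits, merges or otherwise
// mutates; an end-cursor only marks the position past the last key of the
// level-tail leaf, and an invalidated cursor has dropped its leaf
// reference entirely.
//
// Illustrative sketch (not compiled here) of how a caller typically
// obtains and consumes a cursor, assuming a context_t `c` and a lookup
// key `key`:
//
//   node->lower_bound(c, key).si_then([c](auto result) {
//     if (result.match() == MatchKindBS::EQ) {
//       // result.p_cursor points to the matching kv; e.g. step forward:
//       return result.p_cursor->get_next(c);
//     }
//     ...
//   });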
24
25 tree_cursor_t::tree_cursor_t(Ref<LeafNode> node, const search_position_t& pos)
26 : ref_leaf_node{node}, position{pos}, cache{ref_leaf_node}
27 {
28 assert(is_tracked());
29 ref_leaf_node->do_track_cursor<true>(*this);
30 }
31
32 tree_cursor_t::tree_cursor_t(
33 Ref<LeafNode> node, const search_position_t& pos,
34 const key_view_t& key_view, const value_header_t* p_value_header)
35 : ref_leaf_node{node}, position{pos}, cache{ref_leaf_node}
36 {
37 assert(is_tracked());
38 update_cache_same_node(key_view, p_value_header);
39 ref_leaf_node->do_track_cursor<true>(*this);
40 }
41
42 tree_cursor_t::tree_cursor_t(Ref<LeafNode> node)
43 : ref_leaf_node{node}, position{search_position_t::end()}, cache{ref_leaf_node}
44 {
45 assert(is_end());
46 assert(ref_leaf_node->is_level_tail());
47 }
48
49 tree_cursor_t::~tree_cursor_t()
50 {
51 if (is_tracked()) {
52 ref_leaf_node->do_untrack_cursor(*this);
53 }
54 }
55
56 eagain_ifuture<Ref<tree_cursor_t>>
57 tree_cursor_t::get_next(context_t c)
58 {
59 assert(is_tracked());
60 return ref_leaf_node->get_next_cursor(c, position);
61 }
62
63 void tree_cursor_t::assert_next_to(
64 const tree_cursor_t& prv, value_magic_t magic) const
65 {
66 #ifndef NDEBUG
67 assert(!prv.is_end());
68 if (is_end()) {
69 assert(ref_leaf_node == prv.ref_leaf_node);
70 assert(ref_leaf_node->is_level_tail());
71 } else if (is_tracked()) {
72 auto key = get_key_view(magic);
73 auto prv_key = prv.get_key_view(magic);
74 assert(key.compare_to(prv_key) == MatchKindCMP::GT);
75 if (ref_leaf_node == prv.ref_leaf_node) {
76 position.assert_next_to(prv.position);
77 } else {
78 assert(!prv.ref_leaf_node->is_level_tail());
79 assert(position == search_position_t::begin());
80 }
81 } else {
82 assert(is_invalid());
83 ceph_abort("impossible");
84 }
85 #endif
86 }
87
88 template <bool FORCE_MERGE>
89 eagain_ifuture<Ref<tree_cursor_t>>
90 tree_cursor_t::erase(context_t c, bool get_next)
91 {
92 assert(is_tracked());
93 return ref_leaf_node->erase<FORCE_MERGE>(c, position, get_next);
94 }
95 template eagain_ifuture<Ref<tree_cursor_t>>
96 tree_cursor_t::erase<true>(context_t, bool);
97 template eagain_ifuture<Ref<tree_cursor_t>>
98 tree_cursor_t::erase<false>(context_t, bool);
99
100 MatchKindCMP tree_cursor_t::compare_to(
101 const tree_cursor_t& o, value_magic_t magic) const
102 {
103 if (!is_tracked() && !o.is_tracked()) {
104 return MatchKindCMP::EQ;
105 } else if (!is_tracked()) {
106 return MatchKindCMP::GT;
107 } else if (!o.is_tracked()) {
108 return MatchKindCMP::LT;
109 }
110
111 assert(is_tracked() && o.is_tracked());
112 // all tracked cursors are singletons
113 if (this == &o) {
114 return MatchKindCMP::EQ;
115 }
116
117 MatchKindCMP ret;
118 if (ref_leaf_node == o.ref_leaf_node) {
119 ret = position.compare_to(o.position);
120 } else {
121 auto key = get_key_view(magic);
122 auto o_key = o.get_key_view(magic);
123 ret = key.compare_to(o_key);
124 }
125 assert(ret != MatchKindCMP::EQ);
126 return ret;
127 }
128
129 eagain_ifuture<>
130 tree_cursor_t::extend_value(context_t c, value_size_t extend_size)
131 {
132 assert(is_tracked());
133 return ref_leaf_node->extend_value(c, position, extend_size);
134 }
135
136 eagain_ifuture<>
137 tree_cursor_t::trim_value(context_t c, value_size_t trim_size)
138 {
139 assert(is_tracked());
140 return ref_leaf_node->trim_value(c, position, trim_size);
141 }
142
143 template <bool VALIDATE>
144 void tree_cursor_t::update_track(
145 Ref<LeafNode> node, const search_position_t& pos)
146 {
147 // I must be already untracked
148 assert(is_tracked());
149 assert(!ref_leaf_node->check_is_tracking(*this));
150 // track the new node and new pos
151 assert(!pos.is_end());
152 ref_leaf_node = node;
153 position = pos;
154   // lazily update the key/value information until the user asks for it
155 cache.invalidate();
156 ref_leaf_node->do_track_cursor<VALIDATE>(*this);
157 }
158 template void tree_cursor_t::update_track<true>(Ref<LeafNode>, const search_position_t&);
159 template void tree_cursor_t::update_track<false>(Ref<LeafNode>, const search_position_t&);
160
161 void tree_cursor_t::update_cache_same_node(const key_view_t& key_view,
162 const value_header_t* p_value_header) const
163 {
164 assert(is_tracked());
165 cache.update_all(ref_leaf_node->get_version(), key_view, p_value_header);
166 cache.validate_is_latest(position);
167 }
168
169 void tree_cursor_t::invalidate()
170 {
171 assert(is_tracked());
172 ref_leaf_node.reset();
173 assert(is_invalid());
174 // I must be removed from LeafNode
175 }
176
177 /*
178 * tree_cursor_t::Cache
179 */
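// The Cache lazily mirrors the key view and value header of the tracked
// position, and is kept consistent with the leaf node through its version
// pair: a layout change (or a pending needs_update_all) forces a full
// update_all() from get_kv(), while a state-only bump means the extent was
// duplicated for copy-on-write without a layout change, so
// maybe_duplicate() merely rebases the cached pointers onto the new extent
// base address.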
180
181 tree_cursor_t::Cache::Cache(Ref<LeafNode>& ref_leaf_node)
182 : ref_leaf_node{ref_leaf_node} {}
183
184 void tree_cursor_t::Cache::update_all(const node_version_t& current_version,
185 const key_view_t& _key_view,
186 const value_header_t* _p_value_header)
187 {
188 assert(_p_value_header);
189
190 needs_update_all = false;
191 version = current_version;
192
193 p_node_base = ref_leaf_node->read();
194 key_view = _key_view;
195 p_value_header = _p_value_header;
196 assert((const char*)p_value_header > p_node_base);
197 assert((const char*)p_value_header - p_node_base <
198 (int)ref_leaf_node->get_node_size());
199
200 value_payload_mut.reset();
201 p_value_recorder = nullptr;
202 }
203
204 void tree_cursor_t::Cache::maybe_duplicate(const node_version_t& current_version)
205 {
206 assert(!needs_update_all);
207 assert(version.layout == current_version.layout);
208 if (version.state == current_version.state) {
209 // cache is already latest.
210 } else if (version.state < current_version.state) {
211 // the extent has been copied but the layout has not been changed.
212 assert(p_node_base != nullptr);
213 assert(key_view.has_value());
214 assert(p_value_header != nullptr);
215
216 auto current_p_node_base = ref_leaf_node->read();
217 assert(current_p_node_base != p_node_base);
218 auto node_size = ref_leaf_node->get_node_size();
219
220 version.state = current_version.state;
221 reset_ptr(p_value_header, p_node_base,
222 current_p_node_base, node_size);
223 key_view->reset_to(p_node_base, current_p_node_base, node_size);
224 value_payload_mut.reset();
225 p_value_recorder = nullptr;
226
227 p_node_base = current_p_node_base;
228 } else {
229 // It is impossible to change state backwards, see node_types.h.
230 ceph_abort("impossible");
231 }
232 }
233
234 void tree_cursor_t::Cache::make_latest(
235 value_magic_t magic, const search_position_t& pos)
236 {
237 auto current_version = ref_leaf_node->get_version();
238 if (needs_update_all || version.layout != current_version.layout) {
239 auto [_key_view, _p_value_header] = ref_leaf_node->get_kv(pos);
240 update_all(current_version, _key_view, _p_value_header);
241 } else {
242 maybe_duplicate(current_version);
243 }
244 assert(p_value_header->magic == magic);
245 validate_is_latest(pos);
246 }
247
248 void tree_cursor_t::Cache::validate_is_latest(const search_position_t& pos) const
249 {
250 #ifndef NDEBUG
251 assert(!needs_update_all);
252 assert(version == ref_leaf_node->get_version());
253
254 auto [_key_view, _p_value_header] = ref_leaf_node->get_kv(pos);
255 assert(p_node_base == ref_leaf_node->read());
256 assert(key_view->compare_to(_key_view) == MatchKindCMP::EQ);
257 assert(p_value_header == _p_value_header);
258 #endif
259 }
260
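// Bring the cache up to date, then obtain (and memoize) a mutable view of
// the value payload together with the value delta recorder of the mutated
// leaf extent; the cached pointers are refreshed once more because
// preparing the mutation may itself duplicate the extent.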
261 std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
262 tree_cursor_t::Cache::prepare_mutate_value_payload(
263 context_t c, const search_position_t& pos)
264 {
265 make_latest(c.vb.get_header_magic(), pos);
266 if (!value_payload_mut.has_value()) {
267 assert(!p_value_recorder);
268 auto value_mutable = ref_leaf_node->prepare_mutate_value_payload(c);
269 auto current_version = ref_leaf_node->get_version();
270 maybe_duplicate(current_version);
271 value_payload_mut = p_value_header->get_payload_mutable(value_mutable.first);
272 p_value_recorder = value_mutable.second;
273 validate_is_latest(pos);
274 }
275 return {*value_payload_mut, p_value_recorder};
276 }
277
278 /*
279 * Node
280 */
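// Node is the common base of LeafNode and InternalNode.  A live node is
// always tracked by exactly one owner: the Super (root tracker) when it is
// the root, or its parent InternalNode via _parent_info otherwise; the
// destructor untracks it from whichever owner it has.  The mutating entry
// points take a Ref<Node>&& this_ref so the node stays referenced across
// interruption points until the resulting cursor holds the root node
// upwards, e.g. (illustrative sketch only):
//
//   root->insert(c, key, vconf, std::move(root_ref)
//   ).si_then([](auto ret) {
//     auto& [cursor, inserted] = ret;
//     ...
//   });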
281
282 Node::Node(NodeImplURef&& impl) : impl{std::move(impl)} {}
283
284 Node::~Node()
285 {
286 if (!is_tracked()) {
287 // possible scenarios:
288 // a. I'm erased;
289 // b. Eagain happened after the node extent is allocated/loaded
290 // and before the node is initialized correctly;
291 } else {
292 assert(!impl->is_extent_retired());
293 if (is_root()) {
294 super->do_untrack_root(*this);
295 } else {
296 _parent_info->ptr->do_untrack_child(*this);
297 }
298 }
299 }
300
301 level_t Node::level() const
302 {
303 return impl->level();
304 }
305
306 eagain_ifuture<Node::search_result_t> Node::lower_bound(
307 context_t c, const key_hobj_t& key)
308 {
309 return seastar::do_with(
310 MatchHistory(), [this, c, &key](auto& history) {
311 return lower_bound_tracked(c, key, history);
312 }
313 );
314 }
315
316 eagain_ifuture<std::pair<Ref<tree_cursor_t>, bool>> Node::insert(
317 context_t c,
318 const key_hobj_t& key,
319 value_config_t vconf,
320 Ref<Node>&& this_ref)
321 {
322 return seastar::do_with(
323 MatchHistory(), [this, c, &key, vconf,
324 this_ref = std::move(this_ref)] (auto& history) mutable {
325 return lower_bound_tracked(c, key, history
326 ).si_then([c, &key, vconf, &history,
327 this_ref = std::move(this_ref)] (auto result) mutable {
328 // the cursor in the result should already hold the root node upwards
329 this_ref.reset();
330 if (result.match() == MatchKindBS::EQ) {
331 return eagain_iertr::make_ready_future<std::pair<Ref<tree_cursor_t>, bool>>(
332 std::make_pair(result.p_cursor, false));
333 } else {
334 auto leaf_node = result.p_cursor->get_leaf_node();
335 return leaf_node->insert_value(
336 c, key, vconf, result.p_cursor->get_position(), history, result.mstat
337 ).si_then([](auto p_cursor) {
338 return seastar::make_ready_future<std::pair<Ref<tree_cursor_t>, bool>>(
339 std::make_pair(p_cursor, true));
340 });
341 }
342 });
343 }
344 );
345 }
346
347 eagain_ifuture<std::size_t> Node::erase(
348 context_t c,
349 const key_hobj_t& key,
350 Ref<Node>&& this_ref)
351 {
352 return lower_bound(c, key
353 ).si_then([c, this_ref = std::move(this_ref)] (auto result) mutable {
354 // the cursor in the result should already hold the root node upwards
355 this_ref.reset();
356 if (result.match() != MatchKindBS::EQ) {
357 return eagain_iertr::make_ready_future<std::size_t>(0);
358 }
359 auto ref_cursor = result.p_cursor;
360 return ref_cursor->erase(c, false
361 ).si_then([ref_cursor] (auto next_cursor) {
362 assert(ref_cursor->is_invalid());
363 assert(!next_cursor);
364 return std::size_t(1);
365 });
366 });
367 }
368
369 eagain_ifuture<tree_stats_t> Node::get_tree_stats(context_t c)
370 {
371 return seastar::do_with(
372 tree_stats_t(), [this, c](auto& stats) {
373 return do_get_tree_stats(c, stats).si_then([&stats] {
374 return stats;
375 });
376 }
377 );
378 }
379
380 std::ostream& Node::dump(std::ostream& os) const
381 {
382 return impl->dump(os);
383 }
384
385 std::ostream& Node::dump_brief(std::ostream& os) const
386 {
387 return impl->dump_brief(os);
388 }
389
390 const std::string& Node::get_name() const
391 {
392 return impl->get_name();
393 }
394
395 void Node::test_make_destructable(
396 context_t c, NodeExtentMutable& mut, Super::URef&& _super)
397 {
398 impl->test_set_tail(mut);
399 make_root(c, std::move(_super));
400 }
401
402 eagain_ifuture<> Node::mkfs(context_t c, RootNodeTracker& root_tracker)
403 {
404 LOG_PREFIX(OTree::Node::mkfs);
405 return LeafNode::allocate_root(c, root_tracker
406 ).si_then([c, FNAME](auto ret) {
407 INFOT("allocated root {}", c.t, ret->get_name());
408 });
409 }
410
411 eagain_ifuture<Ref<Node>> Node::load_root(context_t c, RootNodeTracker& root_tracker)
412 {
413 LOG_PREFIX(OTree::Node::load_root);
414 return c.nm.get_super(c.t, root_tracker
415 ).handle_error_interruptible(
416 eagain_iertr::pass_further{},
417 crimson::ct_error::input_output_error::handle([FNAME, c] {
418 ERRORT("EIO during get_super()", c.t);
419 ceph_abort("fatal error");
420 })
421 ).si_then([c, &root_tracker, FNAME](auto&& _super) {
422 assert(_super);
423 auto root_addr = _super->get_root_laddr();
424 assert(root_addr != L_ADDR_NULL);
425 TRACET("loading root_addr={:x} ...", c.t, root_addr);
426 return Node::load(c, root_addr, true
427 ).si_then([c, _super = std::move(_super),
428 &root_tracker, FNAME](auto root) mutable {
429 TRACET("loaded {}", c.t, root->get_name());
430 assert(root->impl->field_type() == field_type_t::N0);
431 root->as_root(std::move(_super));
432 std::ignore = c; // as only used in an assert
433 std::ignore = root_tracker;
434 assert(root == root_tracker.get_root(c.t));
435 return seastar::make_ready_future<Ref<Node>>(root);
436 });
437 });
438 }
439
440 void Node::make_root(context_t c, Super::URef&& _super)
441 {
442 _super->write_root_laddr(c, impl->laddr());
443 as_root(std::move(_super));
444 c.t.get_onode_tree_stats().depth = static_cast<uint64_t>(level()) + 1;
445 }
446
447 void Node::as_root(Super::URef&& _super)
448 {
449 assert(!is_tracked());
450 assert(_super->get_root_laddr() == impl->laddr());
451 assert(impl->is_level_tail());
452 super = std::move(_super);
453 super->do_track_root(*this);
454 assert(is_root());
455 }
456
457 Super::URef Node::deref_super()
458 {
459 assert(is_root());
460 assert(super->get_root_laddr() == impl->laddr());
461 assert(impl->is_level_tail());
462 super->do_untrack_root(*this);
463 auto ret = std::move(super);
464 assert(!is_tracked());
465 return ret;
466 }
467
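// upgrade_root(): called when the current root is about to split.  It
// hands the Super reference over to a freshly allocated internal root one
// level higher whose tail child points back at this node, and re-tracks
// this node as the end child of the new root.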
468 eagain_ifuture<> Node::upgrade_root(context_t c, laddr_t hint)
469 {
470 LOG_PREFIX(OTree::Node::upgrade_root);
471 assert(impl->field_type() == field_type_t::N0);
472 auto super_to_move = deref_super();
473 return InternalNode::allocate_root(
474 c, hint, impl->level(), impl->laddr(), std::move(super_to_move)
475 ).si_then([this, c, FNAME](auto new_root) {
476 as_child(search_position_t::end(), new_root);
477 INFOT("upgraded from {} to {}",
478 c.t, get_name(), new_root->get_name());
479 });
480 }
481
482 template <bool VALIDATE>
483 void Node::as_child(const search_position_t& pos, Ref<InternalNode> parent_node)
484 {
485 assert(!is_tracked() || !is_root());
486 #ifndef NDEBUG
487 // Although I might have an outdated _parent_info during fixing,
488 // I must be already untracked.
489 if (_parent_info.has_value()) {
490 assert(!_parent_info->ptr->check_is_tracking(*this));
491 }
492 #endif
493 _parent_info = parent_info_t{pos, parent_node};
494 parent_info().ptr->do_track_child<VALIDATE>(*this);
495 assert(!is_root());
496 }
497 template void Node::as_child<true>(const search_position_t&, Ref<InternalNode>);
498 template void Node::as_child<false>(const search_position_t&, Ref<InternalNode>);
499
500 Ref<InternalNode> Node::deref_parent()
501 {
502 assert(!is_root());
503 auto parent_ref = std::move(parent_info().ptr);
504 parent_ref->do_untrack_child(*this);
505 _parent_info.reset();
506 assert(!is_tracked());
507 return parent_ref;
508 }
509
510 eagain_ifuture<> Node::apply_split_to_parent(
511 context_t c,
512 Ref<Node>&& this_ref,
513 Ref<Node>&& split_right,
514 bool update_right_index)
515 {
516 assert(!is_root());
517 assert(this == this_ref.get());
518 // TODO(cross-node string dedup)
519 return parent_info().ptr->apply_child_split(
520 c, std::move(this_ref), std::move(split_right), update_right_index);
521 }
522
523 eagain_ifuture<Ref<tree_cursor_t>>
524 Node::get_next_cursor_from_parent(context_t c)
525 {
526 assert(!impl->is_level_tail());
527 assert(!is_root());
528 return parent_info().ptr->get_next_cursor(c, parent_info().position);
529 }
530
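// try_merge_adjacent() tries to merge this node with one of its siblings
// under the same parent.  Unless FORCE_MERGE is set, the merge is skipped
// when the node is neither size-underflowed nor reduced to a single value.
// When both siblings exist, the peer with more free space is preferred as
// the merge partner, and the merge only proceeds if the evaluated
// merge_size fits into the left node's total_size(); rebalance is not
// implemented yet (see the XXX below).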
531 template <bool FORCE_MERGE>
532 eagain_ifuture<>
533 Node::try_merge_adjacent(
534 context_t c, bool update_parent_index, Ref<Node>&& this_ref)
535 {
536 LOG_PREFIX(OTree::Node::try_merge_adjacent);
537 assert(this == this_ref.get());
538 impl->validate_non_empty();
539 assert(!is_root());
540 if constexpr (!FORCE_MERGE) {
541 if (!impl->is_size_underflow() &&
542 !impl->has_single_value()) {
543 // skip merge
544 if (update_parent_index) {
545 return fix_parent_index(c, std::move(this_ref), false);
546 } else {
547 parent_info().ptr->validate_child_tracked(*this);
548 return eagain_iertr::now();
549 }
550 }
551 }
552
553 return parent_info().ptr->get_child_peers(c, parent_info().position
554 ).si_then([c, this_ref = std::move(this_ref), this, FNAME,
555 update_parent_index] (auto lr_nodes) mutable -> eagain_ifuture<> {
556 auto& [lnode, rnode] = lr_nodes;
557 Ref<Node> left_for_merge;
558 Ref<Node> right_for_merge;
559 Ref<Node>* p_this_ref;
560 bool is_left;
561 if (!lnode && !rnode) {
562 // XXX: this is possible before node rebalance is implemented,
563 // when its parent cannot merge with its peers and has only one child
564 // (this node).
565 p_this_ref = &this_ref;
566 } else if (!lnode) {
567 left_for_merge = std::move(this_ref);
568 p_this_ref = &left_for_merge;
569 right_for_merge = std::move(rnode);
570 is_left = true;
571 } else if (!rnode) {
572 left_for_merge = std::move(lnode);
573 right_for_merge = std::move(this_ref);
574 p_this_ref = &right_for_merge;
575 is_left = false;
576 } else { // lnode && rnode
577 if (lnode->impl->free_size() > rnode->impl->free_size()) {
578 left_for_merge = std::move(lnode);
579 right_for_merge = std::move(this_ref);
580 p_this_ref = &right_for_merge;
581 is_left = false;
582 } else { // lnode free size <= rnode free size
583 left_for_merge = std::move(this_ref);
584 p_this_ref = &left_for_merge;
585 right_for_merge = std::move(rnode);
586 is_left = true;
587 }
588 }
589
590 if (left_for_merge) {
591 assert(right_for_merge);
592 auto [merge_stage, merge_size] = left_for_merge->impl->evaluate_merge(
593 *right_for_merge->impl);
594 if (merge_size <= left_for_merge->impl->total_size()) {
595 // proceed merge
596 bool update_index_after_merge;
597 if (is_left) {
598 update_index_after_merge = false;
599 } else {
600 update_index_after_merge = update_parent_index;
601 }
602 INFOT("merge {} and {} at merge_stage={}, merge_size={}B, "
603 "update_index={}, is_left={} ...",
604 c.t, left_for_merge->get_name(), right_for_merge->get_name(),
605 merge_stage, merge_size, update_index_after_merge, is_left);
606         // we currently cannot generate a delta that depends on another extent's
607         // content, so use rebuild_extent() as a workaround to rebuild the node
608         // from a fresh extent, thus there is no need to generate a delta.
609 auto left_addr = left_for_merge->impl->laddr();
610 return left_for_merge->rebuild_extent(c
611 ).si_then([c, update_index_after_merge,
612 left_addr,
613 merge_stage = merge_stage,
614 merge_size = merge_size,
615 left_for_merge = std::move(left_for_merge),
616 right_for_merge = std::move(right_for_merge)] (auto left_mut) mutable {
617 if (left_for_merge->impl->node_type() == node_type_t::LEAF) {
618 auto& left = *static_cast<LeafNode*>(left_for_merge.get());
619 left.on_layout_change();
620 }
621 search_position_t left_last_pos = left_for_merge->impl->merge(
622 left_mut, *right_for_merge->impl, merge_stage, merge_size);
623 left_for_merge->track_merge(right_for_merge, merge_stage, left_last_pos);
624 return left_for_merge->parent_info().ptr->apply_children_merge(
625 c, std::move(left_for_merge), left_addr,
626 std::move(right_for_merge), update_index_after_merge);
627 });
628 } else {
629         // size will overflow if merged
630 }
631 }
632
633 // cannot merge
634 if (update_parent_index) {
635 return fix_parent_index(c, std::move(*p_this_ref), false);
636 } else {
637 parent_info().ptr->validate_child_tracked(*this);
638 return eagain_iertr::now();
639 }
640 // XXX: rebalance
641 });
642 }
643 template eagain_ifuture<> Node::try_merge_adjacent<true>(context_t, bool, Ref<Node>&&);
644 template eagain_ifuture<> Node::try_merge_adjacent<false>(context_t, bool, Ref<Node>&&);
645
646 eagain_ifuture<> Node::erase_node(context_t c, Ref<Node>&& this_ref)
647 {
648 // To erase a node:
649 // 1. I'm supposed to have already untracked any children or cursors
650 // 2. unlink parent/super --ptr-> me
651 // 3. unlink me --ref-> parent/super
652 // 4. retire extent
653 // 5. destruct node
654 assert(this_ref.get() == this);
655 assert(!is_tracking());
656 assert(!is_root());
657 assert(this_ref->use_count() == 1);
658 return parent_info().ptr->erase_child(c, std::move(this_ref));
659 }
660
661 template <bool FORCE_MERGE>
662 eagain_ifuture<> Node::fix_parent_index(
663 context_t c, Ref<Node>&& this_ref, bool check_downgrade)
664 {
665 assert(!is_root());
666 assert(this == this_ref.get());
667 return parent_info().ptr->fix_index<FORCE_MERGE>(
668 c, std::move(this_ref), check_downgrade);
669 }
670 template eagain_ifuture<> Node::fix_parent_index<true>(context_t, Ref<Node>&&, bool);
671 template eagain_ifuture<> Node::fix_parent_index<false>(context_t, Ref<Node>&&, bool);
672
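// Node::load() reads the node extent at addr, validates its header (field
// type, node type, the is_level_tail flag and the expected leaf/internal
// extent length), and wraps it into a LeafNode or InternalNode
// accordingly; any I/O or consistency error here is treated as fatal.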
673 eagain_ifuture<Ref<Node>> Node::load(
674 context_t c, laddr_t addr, bool expect_is_level_tail)
675 {
676 LOG_PREFIX(OTree::Node::load);
677 return c.nm.read_extent(c.t, addr
678 ).handle_error_interruptible(
679 eagain_iertr::pass_further{},
680 crimson::ct_error::input_output_error::handle(
681 [FNAME, c, addr, expect_is_level_tail] {
682 ERRORT("EIO -- addr={:x}, is_level_tail={}",
683 c.t, addr, expect_is_level_tail);
684 ceph_abort("fatal error");
685 }),
686 crimson::ct_error::invarg::handle(
687 [FNAME, c, addr, expect_is_level_tail] {
688 ERRORT("EINVAL -- addr={:x}, is_level_tail={}",
689 c.t, addr, expect_is_level_tail);
690 ceph_abort("fatal error");
691 }),
692 crimson::ct_error::enoent::handle(
693 [FNAME, c, addr, expect_is_level_tail] {
694 ERRORT("ENOENT -- addr={:x}, is_level_tail={}",
695 c.t, addr, expect_is_level_tail);
696 ceph_abort("fatal error");
697 }),
698 crimson::ct_error::erange::handle(
699 [FNAME, c, addr, expect_is_level_tail] {
700 ERRORT("ERANGE -- addr={:x}, is_level_tail={}",
701 c.t, addr, expect_is_level_tail);
702 ceph_abort("fatal error");
703 })
704 ).si_then([FNAME, c, addr, expect_is_level_tail](auto extent)
705 -> eagain_ifuture<Ref<Node>> {
706 assert(extent);
707 auto header = extent->get_header();
708 auto field_type = header.get_field_type();
709 if (!field_type) {
710 ERRORT("load addr={:x}, is_level_tail={} error, "
711 "got invalid header -- {}",
712 c.t, addr, expect_is_level_tail, extent);
713 ceph_abort("fatal error");
714 }
715 if (header.get_is_level_tail() != expect_is_level_tail) {
716 ERRORT("load addr={:x}, is_level_tail={} error, "
717 "is_level_tail mismatch -- {}",
718 c.t, addr, expect_is_level_tail, extent);
719 ceph_abort("fatal error");
720 }
721
722 auto node_type = header.get_node_type();
723 if (node_type == node_type_t::LEAF) {
724 if (extent->get_length() != c.vb.get_leaf_node_size()) {
725 ERRORT("load addr={:x}, is_level_tail={} error, "
726 "leaf length mismatch -- {}",
727 c.t, addr, expect_is_level_tail, extent);
728 ceph_abort("fatal error");
729 }
730 auto impl = LeafNodeImpl::load(extent, *field_type);
731 return eagain_iertr::make_ready_future<Ref<Node>>(
732 new LeafNode(impl.get(), std::move(impl)));
733 } else if (node_type == node_type_t::INTERNAL) {
734 if (extent->get_length() != c.vb.get_internal_node_size()) {
735 ERRORT("load addr={:x}, is_level_tail={} error, "
736 "internal length mismatch -- {}",
737 c.t, addr, expect_is_level_tail, extent);
738 ceph_abort("fatal error");
739 }
740 auto impl = InternalNodeImpl::load(extent, *field_type);
741 return eagain_iertr::make_ready_future<Ref<Node>>(
742 new InternalNode(impl.get(), std::move(impl)));
743 } else {
744 ceph_abort("impossible path");
745 }
746 });
747 }
748
749 eagain_ifuture<NodeExtentMutable> Node::rebuild_extent(context_t c)
750 {
751 LOG_PREFIX(OTree::Node::rebuild_extent);
752 DEBUGT("{} ...", c.t, get_name());
753 assert(!is_root());
754 // assume I'm already ref counted by caller
755
756 // note: laddr can be changed after rebuild, but we don't fix the parent
757 // mapping as it is part of the merge process.
758 return impl->rebuild_extent(c);
759 }
760
761 eagain_ifuture<> Node::retire(context_t c, Ref<Node>&& this_ref)
762 {
763 LOG_PREFIX(OTree::Node::retire);
764 DEBUGT("{} ...", c.t, get_name());
765 assert(this_ref.get() == this);
766 assert(!is_tracking());
767 assert(!is_tracked());
768 assert(this_ref->use_count() == 1);
769
770 return impl->retire_extent(c
771 ).si_then([this_ref = std::move(this_ref)]{ /* deallocate node */});
772 }
773
774 void Node::make_tail(context_t c)
775 {
776 LOG_PREFIX(OTree::Node::make_tail);
777 assert(!impl->is_level_tail());
778 assert(!impl->is_keys_empty());
779 DEBUGT("{} ...", c.t, get_name());
780 impl->prepare_mutate(c);
781 auto tail_pos = impl->make_tail();
782 if (impl->node_type() == node_type_t::INTERNAL) {
783 auto& node = *static_cast<InternalNode*>(this);
784 node.track_make_tail(tail_pos);
785 }
786 }
787
788 /*
789 * InternalNode
790 */
791
792 InternalNode::InternalNode(InternalNodeImpl* impl, NodeImplURef&& impl_ref)
793 : Node(std::move(impl_ref)), impl{impl} {}
794
795 eagain_ifuture<Ref<tree_cursor_t>>
796 InternalNode::get_next_cursor(context_t c, const search_position_t& pos)
797 {
798 impl->validate_non_empty();
799 if (pos.is_end()) {
800 assert(impl->is_level_tail());
801 return get_next_cursor_from_parent(c);
802 }
803
804 search_position_t next_pos = pos;
805 const laddr_packed_t* p_child_addr = nullptr;
806 impl->get_next_slot(next_pos, nullptr, &p_child_addr);
807 if (next_pos.is_end() && !impl->is_level_tail()) {
808 return get_next_cursor_from_parent(c);
809 } else {
810 if (next_pos.is_end()) {
811 p_child_addr = impl->get_tail_value();
812 }
813 assert(p_child_addr);
814 return get_or_track_child(c, next_pos, p_child_addr->value
815 ).si_then([c](auto child) {
816 return child->lookup_smallest(c);
817 });
818 }
819 }
820
821 eagain_ifuture<> InternalNode::apply_child_split(
822 context_t c, Ref<Node>&& left_child, Ref<Node>&& right_child,
823 bool update_right_index)
824 {
825 LOG_PREFIX(OTree::InternalNode::apply_child_split);
826 auto& left_pos = left_child->parent_info().position;
827
828 #ifndef NDEBUG
829 assert(left_child->parent_info().ptr.get() == this);
830 assert(!left_child->impl->is_level_tail());
831 if (left_pos.is_end()) {
832 assert(impl->is_level_tail());
833 assert(right_child->impl->is_level_tail());
834 assert(!update_right_index);
835 }
836
837   // right_child has not been assigned a parent yet
838 assert(!right_child->is_tracked());
839 #endif
840
841 impl->prepare_mutate(c);
842
843 DEBUGT("apply {}'s child {} to split to {}, update_index={} ...",
844 c.t, get_name(), left_child->get_name(),
845 right_child->get_name(), update_right_index);
846
847 // update layout from left_pos => left_child_addr to right_child_addr
848 auto left_child_addr = left_child->impl->laddr();
849 auto right_child_addr = right_child->impl->laddr();
850 impl->replace_child_addr(left_pos, right_child_addr, left_child_addr);
851
852 // update track from left_pos => left_child to right_child
853 replace_track(right_child, left_child, update_right_index);
854
855 auto left_key = *left_child->impl->get_pivot_index();
856 Ref<Node> this_ref = this;
857 return insert_or_split(
858 c, left_pos, left_key, left_child,
859 (update_right_index ? right_child : nullptr)
860 ).si_then([this, c,
861 this_ref = std::move(this_ref)] (auto split_right) mutable {
862 if (split_right) {
863       // even though update_right_index may be true,
864 // we haven't fixed the right_child index of this node yet,
865 // so my parent index should be correct now.
866 return apply_split_to_parent(
867 c, std::move(this_ref), std::move(split_right), false);
868 } else {
869 return eagain_iertr::now();
870 }
871 }).si_then([c, update_right_index,
872 right_child = std::move(right_child)] () mutable {
873 if (update_right_index) {
874 // XXX: might not need to call validate_tracked_children() in fix_index()
875 return right_child->fix_parent_index(c, std::move(right_child), false);
876 } else {
877 // there is no need to call try_merge_adjacent() because
878 // the filled size of the inserted node or the split right node
879 // won't be reduced if update_right_index is false.
880 return eagain_iertr::now();
881 }
882 });
883 }
884
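// erase_child() removes the mapping to child_ref from this internal node.
// If the erased child was the level tail and keys remain, the previous
// child is first made the new tail.  After the child is retired, a node
// left with a single value is erased as a whole (the internal root aborts
// instead, since it must never drop below 2 children); otherwise the slot
// is erased from the layout and the usual merge/fix-index handling is
// applied, including a final try_merge_adjacent() on an internal new tail
// child.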
885 eagain_ifuture<> InternalNode::erase_child(context_t c, Ref<Node>&& child_ref)
886 {
887 LOG_PREFIX(OTree::InternalNode::erase_child);
888 // this is a special version of recursive merge
889 impl->validate_non_empty();
890 assert(child_ref->use_count() == 1);
891 validate_child_tracked(*child_ref);
892
893   // make the erased child's previous sibling the new level tail,
894   // and trigger new_tail_child->try_merge_adjacent() at the end
895 bool fix_tail = (child_ref->parent_info().position.is_end() &&
896 !impl->is_keys_empty());
897 return eagain_iertr::now().si_then([c, this, fix_tail] {
898 if (fix_tail) {
899 search_position_t new_tail_pos;
900 const laddr_packed_t* new_tail_p_addr = nullptr;
901 impl->get_largest_slot(&new_tail_pos, nullptr, &new_tail_p_addr);
902 return get_or_track_child(c, new_tail_pos, new_tail_p_addr->value);
903 } else {
904 return eagain_iertr::make_ready_future<Ref<Node>>();
905 }
906 }).si_then([c, this, child_ref = std::move(child_ref), FNAME]
907 (auto&& new_tail_child) mutable {
908 auto child_pos = child_ref->parent_info().position;
909 if (new_tail_child) {
910 INFOT("erase {}'s child {} at pos({}), "
911 "and fix new child tail {} at pos({}) ...",
912 c.t, get_name(), child_ref->get_name(), child_pos,
913 new_tail_child->get_name(), new_tail_child->parent_info().position);
914 assert(!new_tail_child->impl->is_level_tail());
915 new_tail_child->make_tail(c);
916 assert(new_tail_child->impl->is_level_tail());
917 if (new_tail_child->impl->node_type() == node_type_t::LEAF) {
918         // no need to proceed with merge because the filled size is unchanged
919 new_tail_child.reset();
920 }
921 } else {
922 INFOT("erase {}'s child {} at pos({}) ...",
923 c.t, get_name(), child_ref->get_name(), child_pos);
924 }
925
926 Ref<Node> this_ref = child_ref->deref_parent();
927 assert(this_ref == this);
928 return child_ref->retire(c, std::move(child_ref)
929 ).si_then([c, this, child_pos, FNAME,
930 this_ref = std::move(this_ref)] () mutable {
931 if (impl->has_single_value()) {
932 // fast path without mutating the extent
933 DEBUGT("{} has one value left, erase ...", c.t, get_name());
934 #ifndef NDEBUG
935 if (impl->is_level_tail()) {
936 assert(child_pos.is_end());
937 } else {
938 assert(child_pos == search_position_t::begin());
939 }
940 #endif
941
942 if (is_root()) {
943 // Note: if merge/split works as expected, we should never encounter the
944       // situation where the internal root has fewer than 2 children:
945 //
946 // A newly created internal root (see Node::upgrade_root()) will have 2
947 // children after split is finished.
948 //
949 // When merge happens, children will try to merge each other, and if the
950 // root detects there is only one child left, the root will be
951 // down-graded to the only child.
952 //
953 // In order to preserve the invariant, we need to make sure the new
954 // internal root also has at least 2 children.
955 ceph_abort("trying to erase the last item from the internal root node");
956 }
957
958 // track erase
959 assert(tracked_child_nodes.empty());
960
961 // no child should be referencing this node now, this_ref is the last one.
962 assert(this_ref->use_count() == 1);
963 return Node::erase_node(c, std::move(this_ref));
964 }
965
966 impl->prepare_mutate(c);
967 auto [erase_stage, next_or_last_pos] = impl->erase(child_pos);
968 if (child_pos.is_end()) {
969 // next_or_last_pos as last_pos
970 track_make_tail(next_or_last_pos);
971 } else {
972 // next_or_last_pos as next_pos
973 track_erase(child_pos, erase_stage);
974 }
975 validate_tracked_children();
976
977 if (is_root()) {
978 return try_downgrade_root(c, std::move(this_ref));
979 } else {
980 bool update_parent_index;
981 if (impl->is_level_tail()) {
982 update_parent_index = false;
983 } else {
984 // next_or_last_pos as next_pos
985 next_or_last_pos.is_end() ? update_parent_index = true
986 : update_parent_index = false;
987 }
988 return try_merge_adjacent(c, update_parent_index, std::move(this_ref));
989 }
990 }).si_then([c, new_tail_child = std::move(new_tail_child)] () mutable {
991 // finally, check if the new tail child needs to merge
992 if (new_tail_child && !new_tail_child->is_root()) {
993 assert(new_tail_child->impl->is_level_tail());
994 return new_tail_child->try_merge_adjacent(
995 c, false, std::move(new_tail_child));
996 } else {
997 return eagain_iertr::now();
998 }
999 });
1000 });
1001 }
1002
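// fix_index() is called when a child's pivot key no longer matches the
// entry stored in this node (e.g. after the child merged with a sibling):
// the stale entry at the child's position is erased and the child is
// re-inserted under its current pivot key, which may split this node or
// require the parent's own index to be fixed recursively; FORCE_MERGE is
// forwarded to the follow-up try_merge_adjacent().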
1003 template <bool FORCE_MERGE>
1004 eagain_ifuture<> InternalNode::fix_index(
1005 context_t c, Ref<Node>&& child, bool check_downgrade)
1006 {
1007 LOG_PREFIX(OTree::InternalNode::fix_index);
1008 impl->validate_non_empty();
1009
1010 validate_child_inconsistent(*child);
1011 auto& child_pos = child->parent_info().position;
1012 Ref<Node> this_ref = child->deref_parent();
1013 assert(this_ref == this);
1014 validate_tracked_children();
1015
1016 impl->prepare_mutate(c);
1017
1018 key_view_t new_key = *child->impl->get_pivot_index();
1019 DEBUGT("fix {}'s index of child {} at pos({}), new_key={} ...",
1020 c.t, get_name(), child->get_name(), child_pos, new_key);
1021
1022 // erase the incorrect item
1023 auto [erase_stage, next_pos] = impl->erase(child_pos);
1024 track_erase(child_pos, erase_stage);
1025 validate_tracked_children();
1026
1027 // find out whether there is a need to fix parent index recursively
1028 bool update_parent_index;
1029 if (impl->is_level_tail()) {
1030 update_parent_index = false;
1031 } else {
1032 next_pos.is_end() ? update_parent_index = true
1033 : update_parent_index = false;
1034 }
1035
1036 return insert_or_split(c, next_pos, new_key, child
1037 ).si_then([this, c, update_parent_index, check_downgrade,
1038 this_ref = std::move(this_ref)] (auto split_right) mutable {
1039 if (split_right) {
1040 // after split, the parent index to the split_right will be incorrect
1041 // if update_parent_index is true.
1042 return apply_split_to_parent(
1043 c, std::move(this_ref), std::move(split_right), update_parent_index);
1044 } else {
1045 // no split path
1046 if (is_root()) {
1047 if (check_downgrade) {
1048 return try_downgrade_root(c, std::move(this_ref));
1049 } else {
1050 // no need to call try_downgrade_root() because the number of keys
1051 // has not changed, and I must have at least 2 keys.
1052 assert(!impl->is_keys_empty());
1053 return eagain_iertr::now();
1054 }
1055 } else {
1056 // for non-root, maybe need merge adjacent or fix parent,
1057 // because the filled node size may be reduced.
1058 return try_merge_adjacent<FORCE_MERGE>(
1059 c, update_parent_index, std::move(this_ref));
1060 }
1061 }
1062 });
1063 }
1064
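// apply_children_merge() updates this node after right_child has been
// merged into left_child (which was rebuilt to a new laddr, hence
// origin_left_addr): the slot that pointed at right_child is redirected to
// the left child's new address, the now-stale left slot is erased, the
// right child is retired, and the index/merge bookkeeping is fixed up
// afterwards (possibly downgrading the root).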
1065 template <bool FORCE_MERGE>
1066 eagain_ifuture<> InternalNode::apply_children_merge(
1067 context_t c, Ref<Node>&& left_child, laddr_t origin_left_addr,
1068 Ref<Node>&& right_child, bool update_index)
1069 {
1070 LOG_PREFIX(OTree::InternalNode::apply_children_merge);
1071 auto left_pos = left_child->parent_info().position;
1072 auto left_addr = left_child->impl->laddr();
1073 auto& right_pos = right_child->parent_info().position;
1074 auto right_addr = right_child->impl->laddr();
1075 DEBUGT("apply {}'s child {} (was {:#x}) at pos({}), "
1076 "to merge with {} at pos({}), update_index={} ...",
1077 c.t, get_name(), left_child->get_name(), origin_left_addr, left_pos,
1078 right_child->get_name(), right_pos, update_index);
1079
1080 #ifndef NDEBUG
1081 assert(left_child->parent_info().ptr == this);
1082 assert(!left_pos.is_end());
1083 const laddr_packed_t* p_value_left;
1084 impl->get_slot(left_pos, nullptr, &p_value_left);
1085 assert(p_value_left->value == origin_left_addr);
1086
1087 assert(right_child->use_count() == 1);
1088 assert(right_child->parent_info().ptr == this);
1089 const laddr_packed_t* p_value_right;
1090 if (right_pos.is_end()) {
1091 assert(right_child->impl->is_level_tail());
1092 assert(left_child->impl->is_level_tail());
1093 assert(impl->is_level_tail());
1094 assert(!update_index);
1095 p_value_right = impl->get_tail_value();
1096 } else {
1097 assert(!right_child->impl->is_level_tail());
1098 assert(!left_child->impl->is_level_tail());
1099 impl->get_slot(right_pos, nullptr, &p_value_right);
1100 }
1101 assert(p_value_right->value == right_addr);
1102 #endif
1103
1104 // XXX: we may jump to try_downgrade_root() without mutating this node.
1105
1106 // update layout from right_pos => right_addr to left_addr
1107 impl->prepare_mutate(c);
1108 impl->replace_child_addr(right_pos, left_addr, right_addr);
1109
1110 // update track from right_pos => right_child to left_child
1111 left_child->deref_parent();
1112 replace_track(left_child, right_child, update_index);
1113
1114 // erase left_pos from layout
1115 auto [erase_stage, next_pos] = impl->erase(left_pos);
1116 track_erase<false>(left_pos, erase_stage);
1117 assert(next_pos == left_child->parent_info().position);
1118
1119 // All good to retire the right_child.
1120 // I'm already ref-counted by left_child.
1121 return right_child->retire(c, std::move(right_child)
1122 ).si_then([c, this, update_index,
1123 left_child = std::move(left_child)] () mutable {
1124 if (update_index) {
1125 // I'm all good but:
1126 // - my number of keys is reduced by 1
1127 // - my size may underflow, but try_merge_adjacent() is already part of fix_index()
1128 return left_child->fix_parent_index<FORCE_MERGE>(c, std::move(left_child), true);
1129 } else {
1130 validate_tracked_children();
1131 Ref<Node> this_ref = this;
1132 left_child.reset();
1133 // I'm all good but:
1134 // - my number of keys is reduced by 1
1135 // - my size may underflow
1136 if (is_root()) {
1137 return try_downgrade_root(c, std::move(this_ref));
1138 } else {
1139 return try_merge_adjacent<FORCE_MERGE>(
1140 c, false, std::move(this_ref));
1141 }
1142 }
1143 });
1144 }
1145 template eagain_ifuture<> InternalNode::apply_children_merge<true>(
1146 context_t, Ref<Node>&&, laddr_t, Ref<Node>&&, bool);
1147 template eagain_ifuture<> InternalNode::apply_children_merge<false>(
1148 context_t, Ref<Node>&&, laddr_t, Ref<Node>&&, bool);
1149
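// get_child_peers() returns the (previous, next) children adjacent to pos,
// loading and tracking them on demand; either side may be null when pos is
// already the first or the last child, or when the node has no other
// children at all.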
1150 eagain_ifuture<std::pair<Ref<Node>, Ref<Node>>> InternalNode::get_child_peers(
1151 context_t c, const search_position_t& pos)
1152 {
1153 // assume I'm already ref counted by caller
1154 search_position_t prev_pos;
1155 const laddr_packed_t* prev_p_child_addr = nullptr;
1156 search_position_t next_pos;
1157 const laddr_packed_t* next_p_child_addr = nullptr;
1158
1159 if (pos.is_end()) {
1160 assert(impl->is_level_tail());
1161 if (!impl->is_keys_empty()) {
1162 // got previous child only
1163 impl->get_largest_slot(&prev_pos, nullptr, &prev_p_child_addr);
1164 assert(prev_pos < pos);
1165 assert(prev_p_child_addr != nullptr);
1166 } else {
1167 // no keys, so no peer children
1168 }
1169 } else { // !pos.is_end()
1170 if (pos != search_position_t::begin()) {
1171 // got previous child
1172 prev_pos = pos;
1173 impl->get_prev_slot(prev_pos, nullptr, &prev_p_child_addr);
1174 assert(prev_pos < pos);
1175 assert(prev_p_child_addr != nullptr);
1176 } else {
1177 // is already the first child, so no previous child
1178 }
1179
1180 next_pos = pos;
1181 impl->get_next_slot(next_pos, nullptr, &next_p_child_addr);
1182 if (next_pos.is_end()) {
1183 if (impl->is_level_tail()) {
1184 // the next child is the tail
1185 next_p_child_addr = impl->get_tail_value();
1186 assert(pos < next_pos);
1187 assert(next_p_child_addr != nullptr);
1188 } else {
1189 // next child doesn't exist
1190 assert(next_p_child_addr == nullptr);
1191 }
1192 } else {
1193 // got the next child
1194 assert(pos < next_pos);
1195 assert(next_p_child_addr != nullptr);
1196 }
1197 }
1198
1199 return eagain_iertr::now().si_then([this, c, prev_pos, prev_p_child_addr] {
1200 if (prev_p_child_addr != nullptr) {
1201 return get_or_track_child(c, prev_pos, prev_p_child_addr->value);
1202 } else {
1203 return eagain_iertr::make_ready_future<Ref<Node>>();
1204 }
1205 }).si_then([this, c, next_pos, next_p_child_addr] (Ref<Node> lnode) {
1206 if (next_p_child_addr != nullptr) {
1207 return get_or_track_child(c, next_pos, next_p_child_addr->value
1208 ).si_then([lnode] (Ref<Node> rnode) {
1209 return seastar::make_ready_future<std::pair<Ref<Node>, Ref<Node>>>(
1210 lnode, rnode);
1211 });
1212 } else {
1213 return eagain_iertr::make_ready_future<std::pair<Ref<Node>, Ref<Node>>>(
1214 lnode, nullptr);
1215 }
1216 });
1217 }
1218
1219 eagain_ifuture<Ref<InternalNode>> InternalNode::allocate_root(
1220 context_t c, laddr_t hint, level_t old_root_level,
1221 laddr_t old_root_addr, Super::URef&& super)
1222 {
1223 // support tree height up to 256
1224 ceph_assert(old_root_level < MAX_LEVEL);
1225 return InternalNode::allocate(c, hint, field_type_t::N0, true, old_root_level + 1
1226 ).si_then([c, old_root_addr,
1227 super = std::move(super)](auto fresh_node) mutable {
1228 auto root = fresh_node.node;
1229 assert(root->impl->is_keys_empty());
1230 auto p_value = root->impl->get_tail_value();
1231 fresh_node.mut.copy_in_absolute(
1232 const_cast<laddr_packed_t*>(p_value), old_root_addr);
1233 root->make_root_from(c, std::move(super), old_root_addr);
1234 return root;
1235 });
1236 }
1237
1238 eagain_ifuture<Ref<tree_cursor_t>>
1239 InternalNode::lookup_smallest(context_t c)
1240 {
1241 impl->validate_non_empty();
1242 auto position = search_position_t::begin();
1243 const laddr_packed_t* p_child_addr;
1244 impl->get_slot(position, nullptr, &p_child_addr);
1245 return get_or_track_child(c, position, p_child_addr->value
1246 ).si_then([c](auto child) {
1247 return child->lookup_smallest(c);
1248 });
1249 }
1250
1251 eagain_ifuture<Ref<tree_cursor_t>>
1252 InternalNode::lookup_largest(context_t c)
1253 {
1254 // NOTE: unlike LeafNode::lookup_largest(), this only works for the tail
1255 // internal node to return the tail child address.
1256 impl->validate_non_empty();
1257 assert(impl->is_level_tail());
1258 auto p_child_addr = impl->get_tail_value();
1259 return get_or_track_child(c, search_position_t::end(), p_child_addr->value
1260 ).si_then([c](auto child) {
1261 return child->lookup_largest(c);
1262 });
1263 }
1264
1265 eagain_ifuture<Node::search_result_t>
1266 InternalNode::lower_bound_tracked(
1267 context_t c, const key_hobj_t& key, MatchHistory& history)
1268 {
1269 auto result = impl->lower_bound(key, history);
1270 return get_or_track_child(c, result.position, result.p_value->value
1271 ).si_then([c, &key, &history](auto child) {
1272 // XXX(multi-type): pass result.mstat to child
1273 return child->lower_bound_tracked(c, key, history);
1274 });
1275 }
1276
1277 eagain_ifuture<> InternalNode::do_get_tree_stats(
1278 context_t c, tree_stats_t& stats)
1279 {
1280 impl->validate_non_empty();
1281 auto nstats = impl->get_stats();
1282 stats.size_persistent_internal += nstats.size_persistent;
1283 stats.size_filled_internal += nstats.size_filled;
1284 stats.size_logical_internal += nstats.size_logical;
1285 stats.size_overhead_internal += nstats.size_overhead;
1286 stats.size_value_internal += nstats.size_value;
1287 stats.num_kvs_internal += nstats.num_kvs;
1288 stats.num_nodes_internal += 1;
1289
1290 Ref<Node> this_ref = this;
1291 return seastar::do_with(
1292 search_position_t(), (const laddr_packed_t*)(nullptr),
1293 [this, this_ref, c, &stats](auto& pos, auto& p_child_addr) {
1294 pos = search_position_t::begin();
1295 impl->get_slot(pos, nullptr, &p_child_addr);
1296 return trans_intr::repeat(
1297 [this, this_ref, c, &stats, &pos, &p_child_addr]()
1298 -> eagain_ifuture<seastar::stop_iteration> {
1299 return get_or_track_child(c, pos, p_child_addr->value
1300 ).si_then([c, &stats](auto child) {
1301 return child->do_get_tree_stats(c, stats);
1302 }).si_then([this, this_ref, &pos, &p_child_addr] {
1303 if (pos.is_end()) {
1304 return seastar::stop_iteration::yes;
1305 } else {
1306 impl->get_next_slot(pos, nullptr, &p_child_addr);
1307 if (pos.is_end()) {
1308 if (impl->is_level_tail()) {
1309 p_child_addr = impl->get_tail_value();
1310 return seastar::stop_iteration::no;
1311 } else {
1312 return seastar::stop_iteration::yes;
1313 }
1314 } else {
1315 return seastar::stop_iteration::no;
1316 }
1317 }
1318 });
1319 });
1320 }
1321 );
1322 }
1323
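// track_merge(): after the layout of _right_node has been merged into this
// node at the given stage, re-track all of the right node's tracked
// children under this node, rebasing their positions onto the merged
// layout (offset by left_last_pos, adjusted stage by stage), and finally
// move over the end-tracked child if the right node was the level tail.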
1324 void InternalNode::track_merge(
1325 Ref<Node> _right_node, match_stage_t stage, search_position_t& left_last_pos)
1326 {
1327 assert(level() == _right_node->level());
1328 assert(impl->node_type() == _right_node->impl->node_type());
1329 auto& right_node = *static_cast<InternalNode*>(_right_node.get());
1330 if (right_node.tracked_child_nodes.empty()) {
1331 return;
1332 }
1333
1334 match_stage_t curr_stage = STAGE_BOTTOM;
1335
1336 // prepare the initial left_last_pos for offset
1337 while (curr_stage < stage) {
1338 left_last_pos.index_by_stage(curr_stage) = 0;
1339 ++curr_stage;
1340 }
1341 ++left_last_pos.index_by_stage(curr_stage);
1342
1343 // fix the tracked child nodes of right_node, stage by stage.
1344 auto& right_tracked_children = right_node.tracked_child_nodes;
1345 auto rit = right_tracked_children.begin();
1346 while (curr_stage <= STAGE_TOP) {
1347 auto right_pos_until = search_position_t::begin();
1348 right_pos_until.index_by_stage(curr_stage) = INDEX_UPPER_BOUND;
1349 auto rend = right_tracked_children.lower_bound(right_pos_until);
1350 while (rit != rend) {
1351 auto new_pos = rit->second->parent_info().position;
1352 assert(new_pos == rit->first);
1353 assert(rit->second->parent_info().ptr == &right_node);
1354 new_pos += left_last_pos;
1355 auto p_child = rit->second;
1356 rit = right_tracked_children.erase(rit);
1357 p_child->as_child(new_pos, this);
1358 }
1359 left_last_pos.index_by_stage(curr_stage) = 0;
1360 ++curr_stage;
1361 }
1362
1363   // fix the end tracked child node of right_node, if it exists.
1364 if (rit != right_tracked_children.end()) {
1365 assert(rit->first == search_position_t::end());
1366 assert(rit->second->parent_info().position == search_position_t::end());
1367 assert(right_node.impl->is_level_tail());
1368 assert(impl->is_level_tail());
1369 auto p_child = rit->second;
1370 rit = right_tracked_children.erase(rit);
1371 p_child->as_child(search_position_t::end(), this);
1372 }
1373 assert(right_tracked_children.empty());
1374
1375 validate_tracked_children();
1376 }
1377
1378 eagain_ifuture<> InternalNode::test_clone_root(
1379 context_t c_other, RootNodeTracker& tracker_other) const
1380 {
1381 assert(is_root());
1382 assert(impl->is_level_tail());
1383 assert(impl->field_type() == field_type_t::N0);
1384 Ref<const Node> this_ref = this;
1385 return InternalNode::allocate(c_other, L_ADDR_MIN, field_type_t::N0, true, impl->level()
1386 ).si_then([this, c_other, &tracker_other](auto fresh_other) {
1387 impl->test_copy_to(fresh_other.mut);
1388 auto cloned_root = fresh_other.node;
1389 return c_other.nm.get_super(c_other.t, tracker_other
1390 ).handle_error_interruptible(
1391 eagain_iertr::pass_further{},
1392 crimson::ct_error::assert_all{"Invalid error during test clone"}
1393 ).si_then([c_other, cloned_root](auto&& super_other) {
1394 assert(super_other);
1395 cloned_root->make_root_new(c_other, std::move(super_other));
1396 return cloned_root;
1397 });
1398 }).si_then([this_ref, this, c_other](auto cloned_root) {
1399 // clone tracked children
1400     // In some unit tests the children are stubbed out, so they don't
1401     // exist in the NodeExtentManager and are only tracked in memory.
1402 return trans_intr::do_for_each(
1403 tracked_child_nodes.begin(),
1404 tracked_child_nodes.end(),
1405 [this_ref, c_other, cloned_root](auto& kv) {
1406 assert(kv.first == kv.second->parent_info().position);
1407 return kv.second->test_clone_non_root(c_other, cloned_root);
1408 }
1409 );
1410 });
1411 }
1412
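// try_downgrade_root(): if the internal root is left with only its tail
// child, hand the Super reference over to that child, make it the new
// root and retire this node; otherwise leave the root untouched.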
1413 eagain_ifuture<> InternalNode::try_downgrade_root(
1414 context_t c, Ref<Node>&& this_ref)
1415 {
1416 LOG_PREFIX(OTree::InternalNode::try_downgrade_root);
1417 assert(this_ref.get() == this);
1418 assert(is_root());
1419 assert(impl->is_level_tail());
1420 if (!impl->is_keys_empty()) {
1421     // I have more than 1 value, no need to downgrade
1422 return eagain_iertr::now();
1423 }
1424
1425   // proceed to downgrade the root to its only child
1426 laddr_t child_addr = impl->get_tail_value()->value;
1427 return get_or_track_child(c, search_position_t::end(), child_addr
1428 ).si_then([c, this, FNAME,
1429 this_ref = std::move(this_ref)] (auto child) mutable {
1430 INFOT("downgrade {} to new root {}",
1431 c.t, get_name(), child->get_name());
1432 // Invariant, see InternalNode::erase_child()
1433 // the new internal root should have at least 2 children.
1434 assert(child->impl->is_level_tail());
1435 if (child->impl->node_type() == node_type_t::INTERNAL) {
1436 ceph_assert(!child->impl->is_keys_empty());
1437 }
1438
1439 assert(tracked_child_nodes.size() == 1);
1440 child->deref_parent();
1441 auto super_to_move = deref_super();
1442 child->make_root_from(c, std::move(super_to_move), impl->laddr());
1443 return retire(c, std::move(this_ref));
1444 });
1445 }
1446
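// insert_or_split(): insert insert_key -> insert_child at pos.  If the new
// entry fits into the free space it is inserted in place and nullptr is
// returned; otherwise this node is split (upgrading the root first when
// necessary) and the fresh right sibling is returned so the caller can
// apply the split to the parent.  outdated_child, when provided, is a
// child whose index is known to be stale and is therefore tracked without
// validation across the insert/split.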
1447 eagain_ifuture<Ref<InternalNode>> InternalNode::insert_or_split(
1448 context_t c,
1449 const search_position_t& pos,
1450 const key_view_t& insert_key,
1451 Ref<Node> insert_child,
1452 Ref<Node> outdated_child)
1453 {
1454 LOG_PREFIX(OTree::InternalNode::insert_or_split);
1455 // XXX: check the insert_child is unlinked from this node
1456 #ifndef NDEBUG
1457 auto _insert_key = *insert_child->impl->get_pivot_index();
1458 assert(insert_key.compare_to(_insert_key) == MatchKindCMP::EQ);
1459 #endif
1460 auto insert_value = insert_child->impl->laddr();
1461 auto insert_pos = pos;
1462 DEBUGT("insert {} with insert_key={}, insert_child={}, insert_pos({}), "
1463 "outdated_child={} ...",
1464 c.t, get_name(), insert_key, insert_child->get_name(),
1465 insert_pos, (outdated_child ? "True" : "False"));
1466 auto [insert_stage, insert_size] = impl->evaluate_insert(
1467 insert_key, insert_value, insert_pos);
1468 auto free_size = impl->free_size();
1469 if (free_size >= insert_size) {
1470 // proceed to insert
1471 [[maybe_unused]] auto p_value = impl->insert(
1472 insert_key, insert_value, insert_pos, insert_stage, insert_size);
1473 assert(impl->free_size() == free_size - insert_size);
1474 assert(insert_pos <= pos);
1475 assert(p_value->value == insert_value);
1476
1477 if (outdated_child) {
1478 track_insert<false>(insert_pos, insert_stage, insert_child);
1479 validate_child_inconsistent(*outdated_child);
1480 #ifndef NDEBUG
1481 do_untrack_child(*outdated_child);
1482 validate_tracked_children();
1483 do_track_child<false>(*outdated_child);
1484 #endif
1485 } else {
1486 track_insert(insert_pos, insert_stage, insert_child);
1487 validate_tracked_children();
1488 }
1489
1490 return eagain_iertr::make_ready_future<Ref<InternalNode>>(nullptr);
1491 }
1492
1493 // proceed to split with insert
1494 // assume I'm already ref-counted by caller
1495 laddr_t left_hint, right_hint;
1496 {
1497 key_view_t left_key;
1498 impl->get_slot(search_position_t::begin(), &left_key, nullptr);
1499 left_hint = left_key.get_hint();
1500 key_view_t right_key;
1501 impl->get_largest_slot(nullptr, &right_key, nullptr);
1502 right_hint = right_key.get_hint();
1503 }
1504 return (is_root() ? upgrade_root(c, left_hint) : eagain_iertr::now()
1505 ).si_then([this, c, right_hint] {
1506 return InternalNode::allocate(
1507 c, right_hint, impl->field_type(), impl->is_level_tail(), impl->level());
1508 }).si_then([this, insert_key, insert_child, insert_pos,
1509 insert_stage=insert_stage, insert_size=insert_size,
1510 outdated_child, c, FNAME](auto fresh_right) mutable {
1511 // I'm the left_node and need to split into the right_node
1512 auto right_node = fresh_right.node;
1513 INFOT("proceed split {} to fresh {} with insert_child={},"
1514 " outdated_child={} ...",
1515 c.t, get_name(), right_node->get_name(),
1516 insert_child->get_name(),
1517 (outdated_child ? outdated_child->get_name() : "N/A"));
1518 auto insert_value = insert_child->impl->laddr();
1519 auto [split_pos, is_insert_left, p_value] = impl->split_insert(
1520 fresh_right.mut, *right_node->impl, insert_key, insert_value,
1521 insert_pos, insert_stage, insert_size);
1522 assert(p_value->value == insert_value);
1523 track_split(split_pos, right_node);
1524
1525 if (outdated_child) {
1526 if (is_insert_left) {
1527 track_insert<false>(insert_pos, insert_stage, insert_child);
1528 } else {
1529 right_node->template track_insert<false>(insert_pos, insert_stage, insert_child);
1530 }
1531 #ifndef NDEBUG
1532 auto& _parent = outdated_child->parent_info().ptr;
1533 _parent->validate_child_inconsistent(*outdated_child);
1534 _parent->do_untrack_child(*outdated_child);
1535 validate_tracked_children();
1536 right_node->validate_tracked_children();
1537 _parent->do_track_child<false>(*outdated_child);
1538 #endif
1539 } else {
1540 if (is_insert_left) {
1541 track_insert(insert_pos, insert_stage, insert_child);
1542 } else {
1543 right_node->track_insert(insert_pos, insert_stage, insert_child);
1544 }
1545 validate_tracked_children();
1546 right_node->validate_tracked_children();
1547 }
1548 return right_node;
1549 });
1550 }
1551
1552 eagain_ifuture<Ref<Node>> InternalNode::get_or_track_child(
1553 context_t c, const search_position_t& position, laddr_t child_addr)
1554 {
1555 LOG_PREFIX(OTree::InternalNode::get_or_track_child);
1556 Ref<Node> this_ref = this;
1557 return [this, position, child_addr, c, FNAME] {
1558 auto found = tracked_child_nodes.find(position);
1559 if (found != tracked_child_nodes.end()) {
1560 TRACET("loaded child tracked {} at pos({}) addr={:x}",
1561 c.t, found->second->get_name(), position, child_addr);
1562 return eagain_iertr::make_ready_future<Ref<Node>>(found->second);
1563 }
1564 // the child is not loaded yet
1565 TRACET("loading child at pos({}) addr={:x} ...",
1566 c.t, position, child_addr);
1567 bool level_tail = position.is_end();
1568 return Node::load(c, child_addr, level_tail
1569 ).si_then([this, position, c, FNAME] (auto child) {
1570 TRACET("loaded child untracked {}",
1571 c.t, child->get_name());
1572 if (child->level() + 1 != level()) {
1573 ERRORT("loaded child {} error from parent {} at pos({}), level mismatch",
1574 c.t, child->get_name(), get_name(), position);
1575 ceph_abort("fatal error");
1576 }
1577 child->as_child(position, this);
1578 return child;
1579 });
1580 }().si_then([this_ref, this, position, child_addr] (auto child) {
1581 assert(child_addr == child->impl->laddr());
1582 assert(position == child->parent_info().position);
1583 std::ignore = position;
1584 std::ignore = child_addr;
1585 validate_child_tracked(*child);
1586 return child;
1587 });
1588 }
1589
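// track_insert(): inserting at insert_pos shifts the existing slots of the
// same insert_stage one step to the right, so the tracked children within
// that range are re-tracked with their stage index incremented before the
// newly inserted child itself is tracked at insert_pos.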
1590 template <bool VALIDATE>
1591 void InternalNode::track_insert(
1592 const search_position_t& insert_pos, match_stage_t insert_stage,
1593 Ref<Node> insert_child, Ref<Node> nxt_child)
1594 {
1595 // update tracks
1596 auto pos_upper_bound = insert_pos;
1597 pos_upper_bound.index_by_stage(insert_stage) = INDEX_UPPER_BOUND;
1598 auto first = tracked_child_nodes.lower_bound(insert_pos);
1599 auto last = tracked_child_nodes.lower_bound(pos_upper_bound);
1600 std::vector<Node*> nodes;
1601 std::for_each(first, last, [&nodes](auto& kv) {
1602 nodes.push_back(kv.second);
1603 });
1604 tracked_child_nodes.erase(first, last);
1605 for (auto& node : nodes) {
1606 auto _pos = node->parent_info().position;
1607 assert(!_pos.is_end());
1608 ++_pos.index_by_stage(insert_stage);
1609 node->as_child<VALIDATE>(_pos, this);
1610 }
1611 // track insert
1612 insert_child->as_child(insert_pos, this);
1613
1614 #ifndef NDEBUG
1615   // validate that insert_child is tracked directly before nxt_child
1616 if (nxt_child) {
1617 auto iter = tracked_child_nodes.find(insert_pos);
1618 ++iter;
1619 assert(iter->second == nxt_child);
1620 }
1621 #endif
1622 }
1623 template void InternalNode::track_insert<true>(const search_position_t&, match_stage_t, Ref<Node>, Ref<Node>);
1624 template void InternalNode::track_insert<false>(const search_position_t&, match_stage_t, Ref<Node>, Ref<Node>);
1625
1626 void InternalNode::replace_track(
1627 Ref<Node> new_child, Ref<Node> old_child, bool is_new_child_outdated)
1628 {
1629 assert(!new_child->is_tracked());
1630 auto& pos = old_child->parent_info().position;
1631 auto this_ref = old_child->deref_parent();
1632 assert(this_ref == this);
1633 if (is_new_child_outdated) {
1634 // we need to keep track of the outdated child through
1635 // insert and split.
1636 new_child->as_child<false>(pos, this);
1637 } else {
1638 new_child->as_child(pos, this);
1639 }
1640
1641 #ifndef NDEBUG
1642 if (is_new_child_outdated) {
1643 validate_child_inconsistent(*new_child);
1644 } else {
1645 validate_child_tracked(*new_child);
1646 }
1647 #endif
1648 }
1649
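// Hand the children tracked at or after split_pos over to right_node,
// rebasing their positions by subtracting split_pos; validation is skipped
// here (as_child<false>).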
1650 void InternalNode::track_split(
1651 const search_position_t& split_pos, Ref<InternalNode> right_node)
1652 {
1653 auto iter = tracked_child_nodes.lower_bound(split_pos);
1654 while (iter != tracked_child_nodes.end()) {
1655 auto new_pos = iter->first;
1656 auto p_node = iter->second;
1657 iter = tracked_child_nodes.erase(iter);
1658 new_pos -= split_pos;
1659 p_node->as_child<false>(new_pos, right_node);
1660 }
1661 }
1662
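// The child at erase_pos must already be untracked; shift the children
// tracked after it one index to the left at erase_stage.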
1663 template <bool VALIDATE>
1664 void InternalNode::track_erase(
1665 const search_position_t& erase_pos, match_stage_t erase_stage)
1666 {
1667 auto first = tracked_child_nodes.lower_bound(erase_pos);
1668 assert(first == tracked_child_nodes.end() ||
1669 first->first != erase_pos);
1670 auto pos_upper_bound = erase_pos;
1671 pos_upper_bound.index_by_stage(erase_stage) = INDEX_UPPER_BOUND;
1672 auto last = tracked_child_nodes.lower_bound(pos_upper_bound);
1673 std::vector<Node*> p_nodes;
1674 std::for_each(first, last, [&p_nodes](auto& kv) {
1675 p_nodes.push_back(kv.second);
1676 });
1677 tracked_child_nodes.erase(first, last);
1678 for (auto& p_node: p_nodes) {
1679 auto new_pos = p_node->parent_info().position;
1680 assert(new_pos.index_by_stage(erase_stage) > 0);
1681 --new_pos.index_by_stage(erase_stage);
1682 p_node->as_child<VALIDATE>(new_pos, this);
1683 }
1684 }
1685 template void InternalNode::track_erase<true>(const search_position_t&, match_stage_t);
1686 template void InternalNode::track_erase<false>(const search_position_t&, match_stage_t);
1687
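// After this node becomes the level tail, the child tracked at last_pos
// (the rightmost position), if any, is re-tracked under the end() position.
// Nothing may be tracked beyond last_pos.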
1688 void InternalNode::track_make_tail(const search_position_t& last_pos)
1689 {
1690 // assume I'm ref counted by the caller.
1691 assert(impl->is_level_tail());
1692 assert(!last_pos.is_end());
1693 assert(tracked_child_nodes.find(search_position_t::end()) ==
1694 tracked_child_nodes.end());
1695 auto last_it = tracked_child_nodes.find(last_pos);
1696 if (last_it != tracked_child_nodes.end()) {
1697 assert(std::next(last_it) == tracked_child_nodes.end());
1698 auto p_last_child = last_it->second;
1699 tracked_child_nodes.erase(last_it);
1700 p_last_child->as_child(search_position_t::end(), this);
1701 } else {
1702 assert(tracked_child_nodes.lower_bound(last_pos) ==
1703 tracked_child_nodes.end());
1704 }
1705 }
1706
1707 void InternalNode::validate_child(const Node& child) const
1708 {
1709 #ifndef NDEBUG
1710 assert(impl->level() - 1 == child.impl->level());
1711 assert(this == child.parent_info().ptr);
1712 auto& child_pos = child.parent_info().position;
1713 if (child_pos.is_end()) {
1714 assert(impl->is_level_tail());
1715 assert(child.impl->is_level_tail());
1716 assert(impl->get_tail_value()->value == child.impl->laddr());
1717 } else {
1718 assert(!child.impl->is_level_tail());
1719 key_view_t index_key;
1720 const laddr_packed_t* p_child_addr;
1721 impl->get_slot(child_pos, &index_key, &p_child_addr);
1722 assert(index_key.compare_to(*child.impl->get_pivot_index()) == MatchKindCMP::EQ);
1723 assert(p_child_addr->value == child.impl->laddr());
1724 }
1725 // XXX(multi-type)
1726 assert(impl->field_type() <= child.impl->field_type());
1727 #endif
1728 }
1729
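// Debug helper: assert that the tracked (outdated) child's pivot key does
// NOT match the parent slot yet, while its address still does.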
1730 void InternalNode::validate_child_inconsistent(const Node& child) const
1731 {
1732 #ifndef NDEBUG
1733 assert(impl->level() - 1 == child.impl->level());
1734 assert(check_is_tracking(child));
1735 auto& child_pos = child.parent_info().position;
1736 // the tail value has no key to fix
1737 assert(!child_pos.is_end());
1738 assert(!child.impl->is_level_tail());
1739
1740 key_view_t current_key;
1741 const laddr_packed_t* p_value;
1742 impl->get_slot(child_pos, &current_key, &p_value);
1743 key_view_t new_key = *child.impl->get_pivot_index();
1744 assert(current_key.compare_to(new_key) != MatchKindCMP::EQ);
1745 assert(p_value->value == child.impl->laddr());
1746 #endif
1747 }
1748
1749 eagain_ifuture<InternalNode::fresh_node_t> InternalNode::allocate(
1750 context_t c, laddr_t hint, field_type_t field_type, bool is_level_tail, level_t level)
1751 {
1752 return InternalNodeImpl::allocate(c, hint, field_type, is_level_tail, level
1753 ).si_then([](auto&& fresh_impl) {
1754 auto node = Ref<InternalNode>(new InternalNode(
1755 fresh_impl.impl.get(), std::move(fresh_impl.impl)));
1756 return fresh_node_t{node, fresh_impl.mut};
1757 });
1758 }
1759
1760 /*
1761 * LeafNode
1762 */
1763
1764 LeafNode::LeafNode(LeafNodeImpl* impl, NodeImplURef&& impl_ref)
1765 : Node(std::move(impl_ref)), impl{impl} {}
1766
1767 bool LeafNode::is_level_tail() const
1768 {
1769 return impl->is_level_tail();
1770 }
1771
1772 node_version_t LeafNode::get_version() const
1773 {
1774 return {layout_version, impl->get_extent_state()};
1775 }
1776
1777 const char* LeafNode::read() const
1778 {
1779 return impl->read();
1780 }
1781
1782 extent_len_t LeafNode::get_node_size() const
1783 {
1784 return impl->get_node_size();
1785 }
1786
1787 std::tuple<key_view_t, const value_header_t*>
1788 LeafNode::get_kv(const search_position_t& pos) const
1789 {
1790 key_view_t key_view;
1791 const value_header_t* p_value_header;
1792 impl->get_slot(pos, &key_view, &p_value_header);
1793 return {key_view, p_value_header};
1794 }
1795
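// Return a cursor to the slot right after pos. When pos is already the
// last slot: return the end cursor if this leaf is the level tail,
// otherwise continue the search through the parent.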
1796 eagain_ifuture<Ref<tree_cursor_t>>
1797 LeafNode::get_next_cursor(context_t c, const search_position_t& pos)
1798 {
1799 impl->validate_non_empty();
1800 search_position_t next_pos = pos;
1801 key_view_t index_key;
1802 const value_header_t* p_value_header = nullptr;
1803 impl->get_next_slot(next_pos, &index_key, &p_value_header);
1804 if (next_pos.is_end()) {
1805 if (unlikely(is_level_tail())) {
1806 return eagain_iertr::make_ready_future<Ref<tree_cursor_t>>(
1807 tree_cursor_t::create_end(this));
1808 } else {
1809 return get_next_cursor_from_parent(c);
1810 }
1811 } else {
1812 return eagain_iertr::make_ready_future<Ref<tree_cursor_t>>(
1813 get_or_track_cursor(next_pos, index_key, p_value_header));
1814 }
1815 }
1816
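// Erase the value at pos, optionally returning a cursor to the next value.
// A non-root leaf holding a single value is retired as a whole; otherwise
// the slot is erased in place and, unless this is the root, a merge with
// an adjacent node is attempted afterwards.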
1817 template <bool FORCE_MERGE>
1818 eagain_ifuture<Ref<tree_cursor_t>>
1819 LeafNode::erase(context_t c, const search_position_t& pos, bool get_next)
1820 {
1821 LOG_PREFIX(OTree::LeafNode::erase);
1822 assert(!pos.is_end());
1823 assert(!impl->is_keys_empty());
1824 Ref<Node> this_ref = this;
1825 DEBUGT("erase {}'s pos({}), get_next={} ...",
1826 c.t, get_name(), pos, get_next);
1827 ++(c.t.get_onode_tree_stats().num_erases);
1828
1829 // get the next cursor
1830 return eagain_iertr::now().si_then([c, &pos, get_next, this] {
1831 if (get_next) {
1832 return get_next_cursor(c, pos);
1833 } else {
1834 return eagain_iertr::make_ready_future<Ref<tree_cursor_t>>();
1835 }
1836 }).si_then([c, &pos, this_ref = std::move(this_ref),
1837 this, FNAME] (Ref<tree_cursor_t> next_cursor) mutable {
1838 if (next_cursor && next_cursor->is_end()) {
1839 // reset the node reference from the end cursor
1840 next_cursor.reset();
1841 }
1842 return eagain_iertr::now().si_then(
1843 [c, &pos, this_ref = std::move(this_ref), this, FNAME] () mutable {
1844 assert_moveable(this_ref);
1845 #ifndef NDEBUG
1846 assert(!impl->is_keys_empty());
1847 if (impl->has_single_value()) {
1848 assert(pos == search_position_t::begin());
1849 }
1850 #endif
1851 if (!is_root() && impl->has_single_value()) {
1852 // the root must be kept even as an empty leaf, so only a non-root node
1853 // takes this fast path: untrack the last cursor and retire the node
1854 // without mutating the extent
1855 DEBUGT("{} has one value left, erase ...", c.t, get_name());
1856 assert(tracked_cursors.size() == 1);
1857 auto iter = tracked_cursors.begin();
1858 assert(iter->first == pos);
1859 iter->second->invalidate();
1860 tracked_cursors.clear();
1861
1862 // no cursor should be referencing this node now; this_ref is the last reference.
1863 assert(this_ref->use_count() == 1);
1864 return Node::erase_node(c, std::move(this_ref));
1865 }
1866
1867 on_layout_change();
1868 impl->prepare_mutate(c);
1869 auto [erase_stage, next_pos] = impl->erase(pos);
1870 track_erase(pos, erase_stage);
1871 validate_tracked_cursors();
1872
1873 if (is_root()) {
1874 return eagain_iertr::now();
1875 } else {
1876 bool update_parent_index;
1877 if (impl->is_level_tail()) {
1878 update_parent_index = false;
1879 } else {
1880 // if the last slot was erased, the parent's index key for this node must be updated
1881 update_parent_index = next_pos.is_end();
1882 }
1883 return try_merge_adjacent<FORCE_MERGE>(
1884 c, update_parent_index, std::move(this_ref));
1885 }
1886 }).si_then([next_cursor] {
1887 return next_cursor;
1888 });
1889 });
1890 }
1891 template eagain_ifuture<Ref<tree_cursor_t>>
1892 LeafNode::erase<true>(context_t, const search_position_t&, bool);
1893 template eagain_ifuture<Ref<tree_cursor_t>>
1894 LeafNode::erase<false>(context_t, const search_position_t&, bool);
1895
1896 eagain_ifuture<> LeafNode::extend_value(
1897 context_t c, const search_position_t& pos, value_size_t extend_size)
1898 {
1899 ceph_abort("not implemented");
1900 return eagain_iertr::now();
1901 }
1902
1903 eagain_ifuture<> LeafNode::trim_value(
1904 context_t c, const search_position_t& pos, value_size_t trim_size)
1905 {
1906 ceph_abort("not implemented");
1907 return eagain_iertr::now();
1908 }
1909
1910 std::pair<NodeExtentMutable&, ValueDeltaRecorder*>
1911 LeafNode::prepare_mutate_value_payload(context_t c)
1912 {
1913 return impl->prepare_mutate_value_payload(c);
1914 }
1915
1916 eagain_ifuture<Ref<tree_cursor_t>>
1917 LeafNode::lookup_smallest(context_t)
1918 {
1919 if (unlikely(impl->is_keys_empty())) {
1920 assert(is_root());
1921 return seastar::make_ready_future<Ref<tree_cursor_t>>(
1922 tree_cursor_t::create_end(this));
1923 }
1924 auto pos = search_position_t::begin();
1925 key_view_t index_key;
1926 const value_header_t* p_value_header;
1927 impl->get_slot(pos, &index_key, &p_value_header);
1928 return seastar::make_ready_future<Ref<tree_cursor_t>>(
1929 get_or_track_cursor(pos, index_key, p_value_header));
1930 }
1931
1932 eagain_ifuture<Ref<tree_cursor_t>>
1933 LeafNode::lookup_largest(context_t)
1934 {
1935 if (unlikely(impl->is_keys_empty())) {
1936 assert(is_root());
1937 return seastar::make_ready_future<Ref<tree_cursor_t>>(
1938 tree_cursor_t::create_end(this));
1939 }
1940 search_position_t pos;
1941 key_view_t index_key;
1942 const value_header_t* p_value_header = nullptr;
1943 impl->get_largest_slot(&pos, &index_key, &p_value_header);
1944 return seastar::make_ready_future<Ref<tree_cursor_t>>(
1945 get_or_track_cursor(pos, index_key, p_value_header));
1946 }
1947
1948 eagain_ifuture<Node::search_result_t>
1949 LeafNode::lower_bound_tracked(
1950 context_t c, const key_hobj_t& key, MatchHistory& history)
1951 {
1952 key_view_t index_key;
1953 auto result = impl->lower_bound(key, history, &index_key);
1954 Ref<tree_cursor_t> cursor;
1955 if (result.position.is_end()) {
1956 assert(!result.p_value);
1957 cursor = tree_cursor_t::create_end(this);
1958 } else {
1959 cursor = get_or_track_cursor(result.position, index_key, result.p_value);
1960 }
1961 search_result_t ret{cursor, result.mstat};
1962 ret.validate_input_key(key, c.vb.get_header_magic());
1963 return seastar::make_ready_future<search_result_t>(ret);
1964 }
1965
1966 eagain_ifuture<> LeafNode::do_get_tree_stats(context_t, tree_stats_t& stats)
1967 {
1968 auto nstats = impl->get_stats();
1969 stats.size_persistent_leaf += nstats.size_persistent;
1970 stats.size_filled_leaf += nstats.size_filled;
1971 stats.size_logical_leaf += nstats.size_logical;
1972 stats.size_overhead_leaf += nstats.size_overhead;
1973 stats.size_value_leaf += nstats.size_value;
1974 stats.num_kvs_leaf += nstats.num_kvs;
1975 stats.num_nodes_leaf += 1;
1976 return eagain_iertr::now();
1977 }
1978
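// Move the cursors tracked by the merged-in right node onto this node.
// Their positions are rebased by adding left_last_pos; the offset is
// applied stage by stage because the lower-stage offsets only affect the
// cursors located in the right node's first entries of the upper stages.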
1979 void LeafNode::track_merge(
1980 Ref<Node> _right_node, match_stage_t stage, search_position_t& left_last_pos)
1981 {
1982 assert(level() == _right_node->level());
1983 // assert(impl->node_type() == _right_node->impl->node_type());
1984 auto& right_node = *static_cast<LeafNode*>(_right_node.get());
1985 if (right_node.tracked_cursors.empty()) {
1986 return;
1987 }
1988
1989 match_stage_t curr_stage = STAGE_BOTTOM;
1990
1991 // prepare the initial left_last_pos for offset
1992 while (curr_stage < stage) {
1993 left_last_pos.index_by_stage(curr_stage) = 0;
1994 ++curr_stage;
1995 }
1996 ++left_last_pos.index_by_stage(curr_stage);
1997
1998 // fix the tracked cursors of right_node, stage by stage.
1999 auto& right_tracked_cursors = right_node.tracked_cursors;
2000 auto rit = right_tracked_cursors.begin();
2001 while (curr_stage <= STAGE_TOP) {
2002 auto right_pos_until = search_position_t::begin();
2003 right_pos_until.index_by_stage(curr_stage) = INDEX_UPPER_BOUND;
2004 auto rend = right_tracked_cursors.lower_bound(right_pos_until);
2005 while (rit != rend) {
2006 auto new_pos = rit->second->get_position();
2007 assert(new_pos == rit->first);
2008 assert(rit->second->get_leaf_node().get() == &right_node);
2009 new_pos += left_last_pos;
2010 auto p_cursor = rit->second;
2011 rit = right_tracked_cursors.erase(rit);
2012 p_cursor->update_track<true>(this, new_pos);
2013 }
2014 left_last_pos.index_by_stage(curr_stage) = 0;
2015 ++curr_stage;
2016 }
2017 assert(right_tracked_cursors.empty());
2018
2019 validate_tracked_cursors();
2020 }
2021
2022 eagain_ifuture<> LeafNode::test_clone_root(
2023 context_t c_other, RootNodeTracker& tracker_other) const
2024 {
2025 assert(is_root());
2026 assert(impl->is_level_tail());
2027 assert(impl->field_type() == field_type_t::N0);
2028 Ref<const Node> this_ref = this;
2029 return LeafNode::allocate(c_other, L_ADDR_MIN, field_type_t::N0, true
2030 ).si_then([this, c_other, &tracker_other](auto fresh_other) {
2031 impl->test_copy_to(fresh_other.mut);
2032 auto cloned_root = fresh_other.node;
2033 return c_other.nm.get_super(c_other.t, tracker_other
2034 ).handle_error_interruptible(
2035 eagain_iertr::pass_further{},
2036 crimson::ct_error::assert_all{"Invalid error during test clone"}
2037 ).si_then([c_other, cloned_root](auto&& super_other) {
2038 assert(super_other);
2039 cloned_root->make_root_new(c_other, std::move(super_other));
2040 });
2041 }).si_then([this_ref]{});
2042 }
2043
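// Insert key/vconf at insert_pos. If the encoded insert fits into the free
// space, mutate this node in place; otherwise split into a fresh right
// node (upgrading the root first when this is the root) and insert into
// whichever side receives the insert position.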
2044 eagain_ifuture<Ref<tree_cursor_t>> LeafNode::insert_value(
2045 context_t c, const key_hobj_t& key, value_config_t vconf,
2046 const search_position_t& pos, const MatchHistory& history,
2047 match_stat_t mstat)
2048 {
2049 LOG_PREFIX(OTree::LeafNode::insert_value);
2050 #ifndef NDEBUG
2051 if (pos.is_end()) {
2052 assert(impl->is_level_tail());
2053 }
2054 #endif
2055 DEBUGT("insert {} with insert_key={}, insert_value={}, insert_pos({}), "
2056 "history={}, mstat({}) ...",
2057 c.t, get_name(), key, vconf, pos, history, mstat);
2058 ++(c.t.get_onode_tree_stats().num_inserts);
2059 search_position_t insert_pos = pos;
2060 auto [insert_stage, insert_size] = impl->evaluate_insert(
2061 key, vconf, history, mstat, insert_pos);
2062 auto free_size = impl->free_size();
2063 if (free_size >= insert_size) {
2064 // proceed to insert
2065 on_layout_change();
2066 impl->prepare_mutate(c);
2067 auto p_value_header = impl->insert(key, vconf, insert_pos, insert_stage, insert_size);
2068 assert(impl->free_size() == free_size - insert_size);
2069 assert(insert_pos <= pos);
2070 assert(p_value_header->payload_size == vconf.payload_size);
2071 auto ret = track_insert(insert_pos, insert_stage, p_value_header);
2072 validate_tracked_cursors();
2073 return eagain_iertr::make_ready_future<Ref<tree_cursor_t>>(ret);
2074 }
2075 // split and insert
2076 Ref<Node> this_ref = this;
2077 laddr_t left_hint, right_hint;
2078 {
2079 key_view_t left_key;
2080 impl->get_slot(search_position_t::begin(), &left_key, nullptr);
2081 left_hint = left_key.get_hint();
2082 key_view_t right_key;
2083 impl->get_largest_slot(nullptr, &right_key, nullptr);
2084 right_hint = right_key.get_hint();
2085 }
2086 return (is_root() ? upgrade_root(c, left_hint) : eagain_iertr::now()
2087 ).si_then([this, c, right_hint] {
2088 return LeafNode::allocate(c, right_hint, impl->field_type(), impl->is_level_tail());
2089 }).si_then([this_ref = std::move(this_ref), this, c, &key, vconf, FNAME,
2090 insert_pos, insert_stage=insert_stage, insert_size=insert_size](auto fresh_right) mutable {
2091 auto right_node = fresh_right.node;
2092 INFOT("proceed split {} to fresh {} ...",
2093 c.t, get_name(), right_node->get_name());
2094 // no need to bump version for right node, as it is fresh
2095 on_layout_change();
2096 impl->prepare_mutate(c);
2097 auto [split_pos, is_insert_left, p_value_header] = impl->split_insert(
2098 fresh_right.mut, *right_node->impl, key, vconf,
2099 insert_pos, insert_stage, insert_size);
2100 assert(p_value_header->payload_size == vconf.payload_size);
2101 track_split(split_pos, right_node);
2102 Ref<tree_cursor_t> ret;
2103 if (is_insert_left) {
2104 ret = track_insert(insert_pos, insert_stage, p_value_header);
2105 } else {
2106 ret = right_node->track_insert(insert_pos, insert_stage, p_value_header);
2107 }
2108 validate_tracked_cursors();
2109 right_node->validate_tracked_cursors();
2110
2111 return apply_split_to_parent(
2112 c, std::move(this_ref), std::move(right_node), false
2113 ).si_then([ret] {
2114 return ret;
2115 });
2116 // TODO (optimize)
2117 // try to acquire space from siblings before split... see btrfs
2118 });
2119 }
2120
2121 eagain_ifuture<Ref<LeafNode>> LeafNode::allocate_root(
2122 context_t c, RootNodeTracker& root_tracker)
2123 {
2124 LOG_PREFIX(OTree::LeafNode::allocate_root);
2125 return LeafNode::allocate(c, L_ADDR_MIN, field_type_t::N0, true
2126 ).si_then([c, &root_tracker, FNAME](auto fresh_node) {
2127 auto root = fresh_node.node;
2128 return c.nm.get_super(c.t, root_tracker
2129 ).handle_error_interruptible(
2130 eagain_iertr::pass_further{},
2131 crimson::ct_error::input_output_error::handle([FNAME, c] {
2132 ERRORT("EIO during get_super()", c.t);
2133 ceph_abort("fatal error");
2134 })
2135 ).si_then([c, root](auto&& super) {
2136 assert(super);
2137 root->make_root_new(c, std::move(super));
2138 return root;
2139 });
2140 });
2141 }
2142
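// Return the cursor tracked at position, creating and tracking one if it
// does not exist yet; an already-tracked cursor has its cached key/value
// refreshed.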
2143 Ref<tree_cursor_t> LeafNode::get_or_track_cursor(
2144 const search_position_t& position,
2145 const key_view_t& key, const value_header_t* p_value_header)
2146 {
2147 assert(!position.is_end());
2148 assert(p_value_header);
2149 Ref<tree_cursor_t> p_cursor;
2150 auto found = tracked_cursors.find(position);
2151 if (found == tracked_cursors.end()) {
2152 p_cursor = tree_cursor_t::create(this, position, key, p_value_header);
2153 } else {
2154 p_cursor = found->second;
2155 assert(p_cursor->get_leaf_node() == this);
2156 assert(p_cursor->get_position() == position);
2157 p_cursor->update_cache_same_node(key, p_value_header);
2158 }
2159 return p_cursor;
2160 }
2161
2162 void LeafNode::validate_cursor(const tree_cursor_t& cursor) const
2163 {
2164 #ifndef NDEBUG
2165 assert(this == cursor.get_leaf_node().get());
2166 assert(cursor.is_tracked());
2167 assert(!impl->is_extent_retired());
2168
2169 // We need to make sure the user has freed all the cursors before submitting
2170 // the corresponding transaction. Otherwise the checks below have undefined
2171 // behavior.
2172 auto [key, p_value_header] = get_kv(cursor.get_position());
2173 auto magic = p_value_header->magic;
2174 assert(key.compare_to(cursor.get_key_view(magic)) == MatchKindCMP::EQ);
2175 assert(p_value_header == cursor.read_value_header(magic));
2176 #endif
2177 }
2178
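// A new value is inserted at insert_pos: shift the cursors tracked at or
// after insert_pos one index to the right at insert_stage, then create a
// fresh cursor for the inserted value.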
2179 Ref<tree_cursor_t> LeafNode::track_insert(
2180 const search_position_t& insert_pos, match_stage_t insert_stage,
2181 const value_header_t* p_value_header)
2182 {
2183 // update the positions of the tracked cursors
2184 auto pos_upper_bound = insert_pos;
2185 pos_upper_bound.index_by_stage(insert_stage) = INDEX_UPPER_BOUND;
2186 auto first = tracked_cursors.lower_bound(insert_pos);
2187 auto last = tracked_cursors.lower_bound(pos_upper_bound);
2188 std::vector<tree_cursor_t*> p_cursors;
2189 std::for_each(first, last, [&p_cursors](auto& kv) {
2190 p_cursors.push_back(kv.second);
2191 });
2192 tracked_cursors.erase(first, last);
2193 for (auto& p_cursor : p_cursors) {
2194 search_position_t new_pos = p_cursor->get_position();
2195 ++new_pos.index_by_stage(insert_stage);
2196 p_cursor->update_track<true>(this, new_pos);
2197 }
2198
2199 // track insert
2200 // TODO: getting key_view_t from stage::proceed_insert() and
2201 // stage::append_insert() is not supported yet
2202 return tree_cursor_t::create(this, insert_pos);
2203 }
2204
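// Hand the cursors tracked at or after split_pos over to right_node,
// rebasing their positions by subtracting split_pos.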
2205 void LeafNode::track_split(
2206 const search_position_t& split_pos, Ref<LeafNode> right_node)
2207 {
2208 // update cursor ownership and position
2209 auto iter = tracked_cursors.lower_bound(split_pos);
2210 while (iter != tracked_cursors.end()) {
2211 auto new_pos = iter->first;
2212 auto p_cursor = iter->second;
2213 iter = tracked_cursors.erase(iter);
2214 new_pos -= split_pos;
2215 p_cursor->update_track<false>(right_node, new_pos);
2216 }
2217 }
2218
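// Invalidate and untrack the cursor at erase_pos, then shift the cursors
// tracked after it one index to the left at erase_stage.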
2219 void LeafNode::track_erase(
2220 const search_position_t& erase_pos, match_stage_t erase_stage)
2221 {
2222 // erase tracking and invalidate the erased cursor
2223 auto to_erase = tracked_cursors.find(erase_pos);
2224 assert(to_erase != tracked_cursors.end());
2225 to_erase->second->invalidate();
2226 auto first = tracked_cursors.erase(to_erase);
2227
2228 // update the positions of the remaining cursors
2229 assert(first == tracked_cursors.lower_bound(erase_pos));
2230 auto pos_upper_bound = erase_pos;
2231 pos_upper_bound.index_by_stage(erase_stage) = INDEX_UPPER_BOUND;
2232 auto last = tracked_cursors.lower_bound(pos_upper_bound);
2233 std::vector<tree_cursor_t*> p_cursors;
2234 std::for_each(first, last, [&p_cursors](auto& kv) {
2235 p_cursors.push_back(kv.second);
2236 });
2237 tracked_cursors.erase(first, last);
2238 for (auto& p_cursor : p_cursors) {
2239 search_position_t new_pos = p_cursor->get_position();
2240 assert(new_pos.index_by_stage(erase_stage) > 0);
2241 --new_pos.index_by_stage(erase_stage);
2242 p_cursor->update_track<true>(this, new_pos);
2243 }
2244 }
2245
2246 eagain_ifuture<LeafNode::fresh_node_t> LeafNode::allocate(
2247 context_t c, laddr_t hint, field_type_t field_type, bool is_level_tail)
2248 {
2249 return LeafNodeImpl::allocate(c, hint, field_type, is_level_tail
2250 ).si_then([](auto&& fresh_impl) {
2251 auto node = Ref<LeafNode>(new LeafNode(
2252 fresh_impl.impl.get(), std::move(fresh_impl.impl)));
2253 return fresh_node_t{node, fresh_impl.mut};
2254 });
2255 }
2256
2257 }