ceph/src/crimson/os/seastore/onode_manager/staged-fltree/stages/stage.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <cassert>
7 #include <compare>
8 #include <optional>
9 #include <ostream>
10 #include <sstream>
11 #include <type_traits>
12
13 #include "common/likely.h"
14
15 #include "sub_items_stage.h"
16 #include "item_iterator_stage.h"
17
18 namespace crimson::os::seastore::onode {
19
20 struct search_result_bs_t {
21 index_t index;
22 MatchKindBS match;
23 };
24 template <typename FGetKey>
25 search_result_bs_t binary_search(
26 const key_hobj_t& key,
27 index_t begin, index_t end, FGetKey&& f_get_key) {
28 assert(begin <= end);
29 while (begin < end) {
30 auto total = begin + end;
31 auto mid = total >> 1;
32 // do not copy if return value is reference
33 decltype(f_get_key(mid)) target = f_get_key(mid);
34 auto match = key <=> target;
35 if (match == std::strong_ordering::less) {
36 end = mid;
37 } else if (match == std::strong_ordering::greater) {
38 begin = mid + 1;
39 } else {
40 return {mid, MatchKindBS::EQ};
41 }
42 }
43 return {begin, MatchKindBS::NE};
44 }
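// Illustrative sketch (not part of the build): binary_search() only needs a
// way to read the key at an index, so callers adapt an indexable container
// with a lambda, exactly as seek() does further below:
//
//   auto ret = binary_search(key, 0, container.keys(),
//       [&container] (index_t i) { return container[i]; });
//   if (ret.match == MatchKindBS::EQ) {
//     // found: ret.index points at the matching key
//   } else {
//     // not found: ret.index is the lower-bound insertion point
//   }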
45
46 template <typename PivotType, typename FGet>
47 search_result_bs_t binary_search_r(
48 index_t rend, index_t rbegin, FGet&& f_get, const PivotType& key) {
49 assert(rend <= rbegin);
50 while (rend < rbegin) {
51 auto total = rend + rbegin + 1;
52 auto mid = total >> 1;
53 // do not copy if return value is reference
54 decltype(f_get(mid)) target = f_get(mid);
55 int match = target - key;
56 if (match < 0) {
57 rend = mid;
58 } else if (match > 0) {
59 rbegin = mid - 1;
60 } else {
61 return {mid, MatchKindBS::EQ};
62 }
63 }
64 return {rbegin, MatchKindBS::NE};
65 }
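// Illustrative note (not part of the build): binary_search_r() converges from
// the right over [rend, rbegin] and, assuming f_get() is non-decreasing with
// f_get(rend) <= key, returns the right-most index whose value does not
// exceed the pivot. seek_split() below uses it to find the last split index
// whose accumulated size still fits:
//
//   auto ret = binary_search_r(
//       0, container.keys() - 1, f_get_used_size, target_size);
//   // ret.index is the right-most index with used size <= target_size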
66
67 inline bool matchable(field_type_t type, match_stat_t mstat) {
68 assert(mstat >= MSTAT_MIN && mstat <= MSTAT_MAX);
69 /*
70 * compressed prefix by field type:
71 * N0: NONE
72 * N1: pool/shard
73 * N2: pool/shard crush
74 * N3: pool/shard crush ns/oid
75 *
76 * if key matches the node's compressed prefix, return true
77 * else, return false
78 */
79 #ifndef NDEBUG
80 if (mstat == MSTAT_END) {
81 assert(type == field_type_t::N0);
82 }
83 #endif
84 return mstat + to_unsigned(type) < 4;
85 }
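// Worked example (illustrative, assuming to_unsigned() maps N0..N3 to 0..3,
// as the check above implies): an N1 node compresses the pool/shard prefix
// away, so a key already known to differ at pool/shard cannot match it:
//
//   matchable(field_type_t::N1, MSTAT_LT2); // 1 + 2 == 3 < 4 -> true
//   matchable(field_type_t::N1, MSTAT_LT3); // 1 + 3 == 4 -> false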
86
87 inline void assert_mstat(
88 const key_hobj_t& key,
89 const key_view_t& index,
90 match_stat_t mstat) {
91 assert(mstat >= MSTAT_MIN && mstat <= MSTAT_LT2);
92 // key < index ...
93 switch (mstat) {
94 case MSTAT_EQ:
95 break;
96 case MSTAT_LT0:
97 assert(key < index.snap_gen_packed());
98 break;
99 case MSTAT_LT1:
100 assert(key < index.ns_oid_view());
101 break;
102 case MSTAT_LT2:
103 if (index.has_shard_pool()) {
104 assert((key < shard_pool_crush_t{
105 index.shard_pool_packed(), index.crush_packed()}));
106 } else {
107 assert(key < index.crush_packed());
108 }
109 break;
110 default:
111 ceph_abort("impossible path");
112 }
113 // key == index ...
114 switch (mstat) {
115 case MSTAT_EQ:
116 assert(key == index.snap_gen_packed()); // fall through
117 case MSTAT_LT0:
118 if (!index.has_ns_oid())
119 break;
120 assert(index.ns_oid_view().type() == ns_oid_view_t::Type::MAX ||
121 key == index.ns_oid_view()); // fall through
122 case MSTAT_LT1:
123 if (!index.has_crush())
124 break;
125 assert(key == index.crush_packed());
126 if (!index.has_shard_pool())
127 break;
128 assert(key == index.shard_pool_packed()); // fall through
129 default:
130 break;
131 }
132 }
133
134 #define NXT_STAGE_T staged<next_param_t>
135
136 enum class TrimType { BEFORE, AFTER, AT };
137
138 /**
139 * staged
140 *
141 * Implements recursive logic that modifies or reads the node layout
142 * (N0/N1/N2/N3 * LEAF/INTERNAL) with the multi-stage design. The specific
143 * stage implementations are flexible, so the implementations for different
144 * stages can be assembled independently, as long as they follow the
145 * definitions of the container interfaces.
146 *
147 * Multi-stage is designed to index different portions of onode keys
148 * stage-by-stage. There are at most 3 stages for a node:
149 * - STAGE_LEFT: index shard-pool-crush for N0, or index crush for N1 node;
150 * - STAGE_STRING: index ns-oid for N0/N1/N2 nodes;
151 * - STAGE_RIGHT: index snap-gen for N0/N1/N2/N3 nodes;
152 *
153 * The intention is to consolidate the high-level indexing implementations at
154 * the stage level, so we don't need to write them repeatedly for every
155 * stage and for every node type.
156 */
157 template <typename Params>
158 struct staged {
159 static_assert(Params::STAGE >= STAGE_BOTTOM);
160 static_assert(Params::STAGE <= STAGE_TOP);
161 using container_t = typename Params::container_t;
162 using key_get_type = typename container_t::key_get_type;
163 using next_param_t = typename Params::next_param_t;
164 using position_t = staged_position_t<Params::STAGE>;
165 using result_t = staged_result_t<Params::NODE_TYPE, Params::STAGE>;
166 using value_input_t = value_input_type_t<Params::NODE_TYPE>;
167 using value_t = value_type_t<Params::NODE_TYPE>;
168 static constexpr auto CONTAINER_TYPE = container_t::CONTAINER_TYPE;
169 static constexpr bool IS_BOTTOM = (Params::STAGE == STAGE_BOTTOM);
170 static constexpr auto NODE_TYPE = Params::NODE_TYPE;
171 static constexpr auto STAGE = Params::STAGE;
172
173 template <bool is_exclusive>
174 static void _left_or_right(index_t& split_index, index_t insert_index,
175 std::optional<bool>& is_insert_left) {
176 assert(!is_insert_left.has_value());
177 assert(is_valid_index(split_index));
178 if constexpr (is_exclusive) {
179 if (split_index <= insert_index) {
180 // ...[s_index-1] |!| (i_index) [s_index]...
181 // offset i_position to right
182 is_insert_left = false;
183 } else {
184 // ...[s_index-1] (i_index) |?[s_index]| ...
185 // ...(i_index)...[s_index-1] |?[s_index]| ...
186 is_insert_left = true;
187 --split_index;
188 }
189 } else {
190 if (split_index < insert_index) {
191 // ...[s_index-1] |?[s_index]| ...[(i_index)[s_index_k]...
192 is_insert_left = false;
193 } else if (split_index > insert_index) {
194 // ...[(i_index)s_index-1] |?[s_index]| ...
195 // ...[(i_index)s_index_k]...[s_index-1] |?[s_index]| ...
196 is_insert_left = true;
197 } else {
198 // ...[s_index-1] |?[(i_index)s_index]| ...
199 // i_to_left = std::nullopt;
200 }
201 }
202 }
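// Worked example (illustrative): with is_exclusive=true, split_index=3 and
// insert_index=3 yield is_insert_left=false (the insert lands in the right
// node at the split point), while split_index=3 and insert_index=1 yield
// is_insert_left=true and split_index is decremented to 2, compensating for
// the inserted slot that is counted on the left side.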
203
204 template <ContainerType CTYPE, typename Enable = void> class _iterator_t;
205 template <ContainerType CTYPE>
206 class _iterator_t<CTYPE, std::enable_if_t<CTYPE == ContainerType::INDEXABLE>> {
207 /*
208 * indexable container type system:
209 * CONTAINER_TYPE = ContainerType::INDEXABLE
210 * keys() const -> index_t
211 * operator[](index_t) const -> key_get_type
212 * size_before(index_t) const -> extent_len_t
213 * size_overhead_at(index_t) const -> node_offset_t
214 * (IS_BOTTOM) get_p_value(index_t) const -> const value_t*
215 * (!IS_BOTTOM) size_to_nxt_at(index_t) const -> node_offset_t
216 * (!IS_BOTTOM) get_nxt_container(index_t) const
217 * encode(p_node_start, encoded)
218 * decode(p_node_start, node_size, delta) -> container_t
219 * static:
220 * header_size() -> node_offset_t
221 * estimate_insert(key, value) -> node_offset_t
222 * (IS_BOTTOM) insert_at(mut, src, key, value,
223 * index, size, p_left_bound) -> const value_t*
224 * (!IS_BOTTOM) insert_prefix_at(mut, src, key,
225 * index, size, p_left_bound) -> memory_range_t
226 * (!IS_BOTTOM) update_size_at(mut, src, index, size)
227 * trim_until(mut, container, index) -> trim_size
228 * (!IS_BOTTOM) trim_at(mut, container, index, trimmed) -> trim_size
229 * erase_at(mut, container, index, p_left_bound) -> erase_size
230 *
231 * Appender::append(const container_t& src, from, items)
232 */
233 public:
234 using me_t = _iterator_t<CTYPE>;
235
236 _iterator_t(const container_t& container) : container{container} {
237 assert(container.keys());
238 }
239
240 index_t index() const {
241 return _index;
242 }
243 key_get_type get_key() const {
244 assert(!is_end());
245 return container[_index];
246 }
247 node_offset_t size_to_nxt() const {
248 assert(!is_end());
249 return container.size_to_nxt_at(_index);
250 }
251 template <typename T = typename NXT_STAGE_T::container_t>
252 std::enable_if_t<!IS_BOTTOM, T> get_nxt_container() const {
253 assert(!is_end());
254 return container.get_nxt_container(_index);
255 }
256 template <typename T = value_t>
257 std::enable_if_t<IS_BOTTOM, const T*> get_p_value() const {
258 assert(!is_end());
259 return container.get_p_value(_index);
260 }
261 bool is_last() const {
262 return _index + 1 == container.keys();
263 }
264 bool is_end() const { return _index == container.keys(); }
265 node_offset_t size() const {
266 assert(!is_end());
267 assert(header_size() == container.size_before(0));
268 assert(container.size_before(_index + 1) > container.size_before(_index));
269 return container.size_before(_index + 1) -
270 container.size_before(_index);
271 }
272 node_offset_t size_overhead() const {
273 assert(!is_end());
274 return container.size_overhead_at(_index);
275 }
276
277 me_t& operator++() {
278 assert(!is_end());
279 assert(!is_last());
280 ++_index;
281 return *this;
282 }
283 void seek_at(index_t index) {
284 assert(index < container.keys());
285 seek_till_end(index);
286 }
287 void seek_till_end(index_t index) {
288 assert(!is_end());
289 assert(this->index() == 0);
290 assert(index <= container.keys());
291 _index = index;
292 }
293 void seek_last() {
294 assert(!is_end());
295 assert(index() == 0);
296 _index = container.keys() - 1;
297 }
298 void set_end() {
299 assert(!is_end());
300 assert(is_last());
301 ++_index;
302 }
303 // Note: possible to return an end iterator
304 MatchKindBS seek(const key_hobj_t& key, bool exclude_last) {
305 assert(!is_end());
306 assert(index() == 0);
307 index_t end_index = container.keys();
308 if (exclude_last) {
309 assert(end_index);
310 --end_index;
311 assert(key < container[end_index]);
312 }
313 auto ret = binary_search(key, _index, end_index,
314 [this] (index_t index) { return container[index]; });
315 _index = ret.index;
316 return ret.match;
317 }
318
319 template <IsFullKey Key, typename T = value_t>
320 std::enable_if_t<IS_BOTTOM, const T*> insert(
321 NodeExtentMutable& mut,
322 const Key& key,
323 const value_input_t& value,
324 node_offset_t insert_size,
325 const char* p_left_bound) {
326 return container_t::insert_at(
327 mut, container, key, value, _index, insert_size, p_left_bound);
328 }
329
330 template <IsFullKey Key, typename T = memory_range_t>
331 std::enable_if_t<!IS_BOTTOM, T> insert_prefix(
332 NodeExtentMutable& mut, const Key& key,
333 node_offset_t size, const char* p_left_bound) {
334 return container_t::insert_prefix_at(
335 mut, container, key, _index, size, p_left_bound);
336 }
337
338 template <typename T = void>
339 std::enable_if_t<!IS_BOTTOM, T>
340 update_size(NodeExtentMutable& mut, int insert_size) {
341 assert(!is_end());
342 container_t::update_size_at(mut, container, _index, insert_size);
343 }
344
345 // Note: possible to return an end iterator when is_exclusive is true
346 template <bool is_exclusive>
347 size_t seek_split_inserted(
348 size_t start_size, size_t extra_size, size_t target_size,
349 index_t& insert_index, size_t insert_size,
350 std::optional<bool>& is_insert_left) {
351 assert(!is_end());
352 assert(index() == 0);
353 // replace insert_index placeholder
354 if constexpr (!is_exclusive) {
355 if (insert_index == INDEX_LAST) {
356 insert_index = container.keys() - 1;
357 }
358 } else {
359 if (insert_index == INDEX_END) {
360 insert_index = container.keys();
361 }
362 }
363 assert(insert_index <= container.keys());
364
365 auto start_size_1 = start_size + extra_size;
366 auto f_get_used_size = [this, start_size, start_size_1,
367 insert_index, insert_size] (index_t index) {
368 size_t current_size;
369 if (unlikely(index == 0)) {
370 current_size = start_size;
371 } else {
372 current_size = start_size_1;
373 if (index > insert_index) {
374 current_size += insert_size;
375 if constexpr (is_exclusive) {
376 --index;
377 }
378 }
379 // already includes header size
380 current_size += container.size_before(index);
381 }
382 return current_size;
383 };
384 index_t s_end;
385 if constexpr (is_exclusive) {
386 s_end = container.keys();
387 } else {
388 s_end = container.keys() - 1;
389 }
390 _index = binary_search_r(0, s_end, f_get_used_size, target_size).index;
391 size_t current_size = f_get_used_size(_index);
392 assert(current_size <= target_size);
393
394 _left_or_right<is_exclusive>(_index, insert_index, is_insert_left);
395 return current_size;
396 }
397
398 size_t seek_split(size_t start_size, size_t extra_size, size_t target_size) {
399 assert(!is_end());
400 assert(index() == 0);
401 auto start_size_1 = start_size + extra_size;
402 auto f_get_used_size = [this, start_size, start_size_1] (index_t index) {
403 size_t current_size;
404 if (unlikely(index == 0)) {
405 current_size = start_size;
406 } else {
407 // already includes header size
408 current_size = start_size_1 + container.size_before(index);
409 }
410 return current_size;
411 };
412 _index = binary_search_r(
413 0, container.keys() - 1, f_get_used_size, target_size).index;
414 size_t current_size = f_get_used_size(_index);
415 assert(current_size <= target_size);
416 return current_size;
417 }
418
419 // Note: possible to return an end iterator if to_index == INDEX_END
420 template <KeyT KT>
421 void copy_out_until(
422 typename container_t::template Appender<KT>& appender, index_t& to_index) {
423 auto num_keys = container.keys();
424 index_t items;
425 if (to_index == INDEX_END) {
426 items = num_keys - _index;
427 appender.append(container, _index, items);
428 _index = num_keys;
429 to_index = _index;
430 } else if (to_index == INDEX_LAST) {
431 assert(!is_end());
432 items = num_keys - 1 - _index;
433 appender.append(container, _index, items);
434 _index = num_keys - 1;
435 to_index = _index;
436 } else {
437 assert(_index <= to_index);
438 assert(to_index <= num_keys);
439 items = to_index - _index;
440 appender.append(container, _index, items);
441 _index = to_index;
442 }
443 }
444
445 node_offset_t trim_until(NodeExtentMutable& mut) {
446 return container_t::trim_until(mut, container, _index);
447 }
448
449 template <typename T = node_offset_t>
450 std::enable_if_t<!IS_BOTTOM, T>
451 trim_at(NodeExtentMutable& mut, node_offset_t trimmed) {
452 return container_t::trim_at(mut, container, _index, trimmed);
453 }
454
455 node_offset_t erase(NodeExtentMutable& mut, const char* p_left_bound) {
456 assert(!is_end());
457 return container_t::erase_at(mut, container, _index, p_left_bound);
458 }
459
460 template <KeyT KT>
461 typename container_t::template Appender<KT>
462 get_appender(NodeExtentMutable* p_mut) {
463 assert(_index + 1 == container.keys());
464 return typename container_t::template Appender<KT>(p_mut, container);
465 }
466
467 template <KeyT KT>
468 typename container_t::template Appender<KT>
469 get_appender_opened(NodeExtentMutable* p_mut) {
470 if constexpr (!IS_BOTTOM) {
471 assert(_index + 1 == container.keys());
472 return typename container_t::template Appender<KT>(p_mut, container, true);
473 } else {
474 ceph_abort("impossible path");
475 }
476 }
477
478 void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
479 container.encode(p_node_start, encoded);
480 ceph::encode(_index, encoded);
481 }
482
483 static me_t decode(const char* p_node_start,
484 extent_len_t node_size,
485 ceph::bufferlist::const_iterator& delta) {
486 auto container = container_t::decode(
487 p_node_start, node_size, delta);
488 auto ret = me_t(container);
489 index_t index;
490 ceph::decode(index, delta);
491 ret.seek_till_end(index);
492 return ret;
493 }
494
495 static node_offset_t header_size() {
496 return container_t::header_size();
497 }
498
499 template <IsFullKey Key>
500 static node_offset_t estimate_insert(
501 const Key& key, const value_input_t& value) {
502 return container_t::estimate_insert(key, value);
503 }
504
505 private:
506 container_t container;
507 index_t _index = 0;
508 };
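// Illustrative sketch (not part of the build): a lookup over an INDEXABLE
// container is a seek plus an accessor call:
//
//   auto iter = iterator_t(container); // requires a non-empty container
//   auto match = iter.seek(key, false /* exclude_last */);
//   if (match == MatchKindBS::EQ) {
//     if constexpr (IS_BOTTOM) {
//       const value_t* p_value = iter.get_p_value();
//     } else {
//       auto nxt = iter.get_nxt_container(); // descend to the next stage
//     }
//   }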
509
510 template <ContainerType CTYPE>
511 class _iterator_t<CTYPE, std::enable_if_t<CTYPE == ContainerType::ITERATIVE>> {
512 /*
513 * iterative container type system (!IS_BOTTOM):
514 * CONTAINER_TYPE = ContainerType::ITERATIVE
515 * index() const -> index_t
516 * get_key() const -> key_get_type
517 * size() const -> node_offset_t
518 * size_to_nxt() const -> node_offset_t
519 * size_overhead() const -> node_offset_t
520 * get_nxt_container() const
521 * has_next() const -> bool
522 * encode(p_node_start, encoded)
523 * decode(p_node_start, node_length, delta) -> container_t
524 * operator++()
525 * static:
526 * header_size() -> node_offset_t
527 * estimate_insert(key, value) -> node_offset_t
528 * insert_prefix(mut, src, key, is_end, size, p_left_bound) -> memory_range_t
529 * update_size(mut, src, size)
530 * trim_until(mut, container) -> trim_size
531 * trim_at(mut, container, trimmed) -> trim_size
532 * erase(mut, container, p_left_bound) -> erase_size
533 */
534 // currently the iterative iterator is only implemented with STAGE_STRING
535 // for in-node space efficiency
536 static_assert(STAGE == STAGE_STRING);
537 public:
538 using me_t = _iterator_t<CTYPE>;
539
540 _iterator_t(const container_t& container) : container{container} {}
541
542 index_t index() const {
543 if (is_end()) {
544 return container.index() + 1;
545 } else {
546 return container.index();
547 }
548 }
549 key_get_type get_key() const {
550 assert(!is_end());
551 return container.get_key();
552 }
553 node_offset_t size_to_nxt() const {
554 assert(!is_end());
555 return container.size_to_nxt();
556 }
557 const typename NXT_STAGE_T::container_t get_nxt_container() const {
558 assert(!is_end());
559 return container.get_nxt_container();
560 }
561 bool is_last() const {
562 assert(!is_end());
563 return !container.has_next();
564 }
565 bool is_end() const {
566 #ifndef NDEBUG
567 if (_is_end) {
568 assert(!container.has_next());
569 }
570 #endif
571 return _is_end;
572 }
573 node_offset_t size() const {
574 assert(!is_end());
575 return container.size();
576 }
577 node_offset_t size_overhead() const {
578 assert(!is_end());
579 return container.size_overhead();
580 }
581
582 me_t& operator++() {
583 assert(!is_end());
584 assert(!is_last());
585 ++container;
586 return *this;
587 }
588 void seek_at(index_t index) {
589 assert(!is_end());
590 assert(this->index() == 0);
591 while (index > 0) {
592 assert(container.has_next());
593 ++container;
594 --index;
595 }
596 }
597 void seek_till_end(index_t index) {
598 assert(!is_end());
599 assert(this->index() == 0);
600 while (index > 0) {
601 if (!container.has_next()) {
602 assert(index == 1);
603 set_end();
604 break;
605 }
606 ++container;
607 --index;
608 }
609 }
610 void seek_last() {
611 assert(!is_end());
612 assert(index() == 0);
613 while (container.has_next()) {
614 ++container;
615 }
616 }
617 void set_end() {
618 assert(!is_end());
619 assert(is_last());
620 _is_end = true;
621 }
622 // Note: possible to return an end iterator
623 MatchKindBS seek(const key_hobj_t& key, bool exclude_last) {
624 assert(!is_end());
625 assert(index() == 0);
626 do {
627 if (exclude_last && is_last()) {
628 assert(key < get_key());
629 return MatchKindBS::NE;
630 }
631 auto match = key <=> get_key();
632 if (match == std::strong_ordering::less) {
633 return MatchKindBS::NE;
634 } else if (match == std::strong_ordering::equal) {
635 return MatchKindBS::EQ;
636 } else {
637 if (container.has_next()) {
638 ++container;
639 } else {
640 // end
641 break;
642 }
643 }
644 } while (true);
645 assert(!exclude_last);
646 set_end();
647 return MatchKindBS::NE;
648 }
649
650 template <IsFullKey Key>
651 memory_range_t insert_prefix(
652 NodeExtentMutable& mut, const Key& key,
653 node_offset_t size, const char* p_left_bound) {
654 return container_t::insert_prefix(
655 mut, container, key, is_end(), size, p_left_bound);
656 }
657
658 void update_size(NodeExtentMutable& mut, int insert_size) {
659 assert(!is_end());
660 container_t::update_size(mut, container, insert_size);
661 }
662
663 // Note: possible to return an end iterator when is_exclusive is true
664 // insert_index can still be INDEX_LAST or INDEX_END
665 template <bool is_exclusive>
666 size_t seek_split_inserted(
667 size_t start_size, size_t extra_size, size_t target_size,
668 index_t& insert_index, size_t insert_size,
669 std::optional<bool>& is_insert_left) {
670 assert(!is_end());
671 assert(index() == 0);
672 size_t current_size = start_size;
673 index_t split_index = 0;
674 extra_size += header_size();
675 do {
676 if constexpr (!is_exclusive) {
677 if (is_last()) {
678 assert(split_index == index());
679 if (insert_index == INDEX_LAST) {
680 insert_index = index();
681 }
682 assert(insert_index <= index());
683 break;
684 }
685 }
686
687 size_t nxt_size = current_size;
688 if (split_index == 0) {
689 nxt_size += extra_size;
690 }
691 if (split_index == insert_index) {
692 nxt_size += insert_size;
693 if constexpr (is_exclusive) {
694 if (nxt_size > target_size) {
695 break;
696 }
697 current_size = nxt_size;
698 ++split_index;
699 }
700 }
701 nxt_size += size();
702 if (nxt_size > target_size) {
703 break;
704 }
705 current_size = nxt_size;
706
707 if constexpr (is_exclusive) {
708 if (is_last()) {
709 assert(split_index == index());
710 set_end();
711 split_index = index();
712 if (insert_index == INDEX_END) {
713 insert_index = index();
714 }
715 assert(insert_index == index());
716 break;
717 } else {
718 ++(*this);
719 ++split_index;
720 }
721 } else {
722 ++(*this);
723 ++split_index;
724 }
725 } while (true);
726 assert(current_size <= target_size);
727
728 _left_or_right<is_exclusive>(split_index, insert_index, is_insert_left);
729 assert(split_index == index());
730 return current_size;
731 }
732
733 size_t seek_split(size_t start_size, size_t extra_size, size_t target_size) {
734 assert(!is_end());
735 assert(index() == 0);
736 size_t current_size = start_size;
737 do {
738 if (is_last()) {
739 break;
740 }
741
742 size_t nxt_size = current_size;
743 if (index() == 0) {
744 nxt_size += extra_size;
745 }
746 nxt_size += size();
747 if (nxt_size > target_size) {
748 break;
749 }
750 current_size = nxt_size;
751 ++(*this);
752 } while (true);
753 assert(current_size <= target_size);
754 return current_size;
755 }
756
757 // Note: possible to return an end iterator if to_index == INDEX_END
758 template <KeyT KT>
759 void copy_out_until(
760 typename container_t::template Appender<KT>& appender, index_t& to_index) {
761 if (is_end()) {
762 assert(!container.has_next());
763 if (to_index == INDEX_END) {
764 to_index = index();
765 }
766 assert(to_index == index());
767 return;
768 }
769 index_t items;
770 if (to_index == INDEX_END || to_index == INDEX_LAST) {
771 items = to_index;
772 } else {
773 assert(is_valid_index(to_index));
774 assert(index() <= to_index);
775 items = to_index - index();
776 }
777 if (appender.append(container, items)) {
778 set_end();
779 }
780 to_index = index();
781 }
782
783 node_offset_t trim_until(NodeExtentMutable& mut) {
784 if (is_end()) {
785 return 0;
786 }
787 return container_t::trim_until(mut, container);
788 }
789
790 node_offset_t trim_at(NodeExtentMutable& mut, node_offset_t trimmed) {
791 assert(!is_end());
792 return container_t::trim_at(mut, container, trimmed);
793 }
794
795 node_offset_t erase(NodeExtentMutable& mut, const char* p_left_bound) {
796 assert(!is_end());
797 return container_t::erase(mut, container, p_left_bound);
798 }
799
800 template <KeyT KT>
801 typename container_t::template Appender<KT>
802 get_appender(NodeExtentMutable* p_mut) {
803 return typename container_t::template Appender<KT>(p_mut, container, false);
804 }
805
806 template <KeyT KT>
807 typename container_t::template Appender<KT>
808 get_appender_opened(NodeExtentMutable* p_mut) {
809 if constexpr (!IS_BOTTOM) {
810 return typename container_t::template Appender<KT>(p_mut, container, true);
811 } else {
812 ceph_abort("impossible path");
813 }
814 }
815
816 void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
817 container.encode(p_node_start, encoded);
818 uint8_t is_end = _is_end;
819 ceph::encode(is_end, encoded);
820 }
821
822 static me_t decode(const char* p_node_start,
823 extent_len_t node_size,
824 ceph::bufferlist::const_iterator& delta) {
825 auto container = container_t::decode(
826 p_node_start, node_size, delta);
827 auto ret = me_t(container);
828 uint8_t is_end;
829 ceph::decode(is_end, delta);
830 if (is_end) {
831 ret.set_end();
832 }
833 return ret;
834 }
835
836 static node_offset_t header_size() {
837 return container_t::header_size();
838 }
839
840 template <IsFullKey Key>
841 static node_offset_t estimate_insert(const Key& key,
842 const value_input_t& value) {
843 return container_t::estimate_insert(key, value);
844 }
845
846 private:
847 container_t container;
848 bool _is_end = false;
849 };
850
851 /*
852 * iterator_t encapsulates both indexable and iterative implementations
853 * from a *non-empty* container.
854 * cstr(const container_t&)
855 * access:
856 * index() -> index_t
857 * get_key() -> key_get_type (const reference or value type)
858 * is_last() -> bool
859 * is_end() -> bool
860 * size() -> node_offset_t
861 * size_overhead() -> node_offset_t
862 * (IS_BOTTOM) get_p_value() -> const value_t*
863 * (!IS_BOTTOM) get_nxt_container() -> container_range_t
864 * (!IS_BOTTOM) size_to_nxt() -> node_offset_t
865 * seek:
866 * operator++() -> iterator_t&
867 * seek_at(index)
868 * seek_till_end(index)
869 * seek_last()
870 * set_end()
871 * seek(key, exclude_last) -> MatchKindBS
872 * insert:
873 * (IS_BOTTOM) insert(mut, key, value, size, p_left_bound) -> p_value
874 * (!IS_BOTTOM) insert_prefix(mut, key, size, p_left_bound) -> memory_range_t
875 * (!IS_BOTTOM) update_size(mut, size)
876 * split:
877 * seek_split_inserted<bool is_exclusive>(
878 * start_size, extra_size, target_size, insert_index, insert_size,
879 * std::optional<bool>& is_insert_left)
880 * -> insert to left/right/unknown (!exclusive)
881 * -> insert to left/right (exclusive, can be end)
882 * -> split_size
883 * seek_split(start_size, extra_size, target_size) -> split_size
884 * copy_out_until(appender, to_index) (can be end)
885 * trim_until(mut) -> trim_size
886 * (!IS_BOTTOM) trim_at(mut, trimmed) -> trim_size
887 * erase:
888 * erase(mut, p_left_bound) -> erase_size
889 * merge:
890 * get_appender(p_mut) -> Appender
891 * (!IS_BOTTOM) get_appender_opened(p_mut) -> Appender
892 * denc:
893 * encode(p_node_start, encoded)
894 * decode(p_node_start, node_size, delta) -> iterator_t
895 * static:
896 * header_size() -> node_offset_t
897 * estimate_insert(key, value) -> node_offset_t
898 */
899 using iterator_t = _iterator_t<CONTAINER_TYPE>;
900 /* TODO: detailed comments
901 * - trim_until(mut) -> trim_size
902 * * keep 0 to i - 1, and remove the rest, return the size trimmed.
903 * * if this is the end iterator, do nothing and return 0.
904 * * if this is the start iterator, the caller normally goes to the higher
905 * stage to trim the entire container.
906 * - trim_at(mut, trimmed) -> trim_size
907 * * trim happens inside the current iterator, causing the size reduced by
908 * <trimmed>, return the total size trimmed.
909 */
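// Illustrative sketch (not part of the build): trim_until() drops the slots
// at and after the iterator's index, e.g.:
//
//   auto iter = iterator_t(container);
//   iter.seek_till_end(index); // may seek to end; then nothing is trimmed
//   node_offset_t trimmed = iter.trim_until(mut); // keeps slots [0, index)
//
// When a slot must be kept only partially, the lower stage is trimmed first
// and the resulting size is propagated back through trim_at().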
910
911 /*
912 * Lookup internals (hide?)
913 */
914
915 static bool is_keys_one(
916 const container_t& container) { // IN
917 auto iter = iterator_t(container);
918 iter.seek_last();
919 if (iter.index() == 0) {
920 if constexpr (IS_BOTTOM) {
921 // ok, there is only 1 key
922 return true;
923 } else {
924 auto nxt_container = iter.get_nxt_container();
925 return NXT_STAGE_T::is_keys_one(nxt_container);
926 }
927 } else {
928 // more than 1 key
929 return false;
930 }
931 }
932
933 template <bool GET_KEY>
934 static result_t smallest_result(
935 const iterator_t& iter, key_view_t* p_index_key) {
936 static_assert(!IS_BOTTOM);
937 assert(!iter.is_end());
938 auto nxt_container = iter.get_nxt_container();
939 auto pos_smallest = NXT_STAGE_T::position_t::begin();
940 const value_t* p_value;
941 NXT_STAGE_T::template get_slot<GET_KEY, true>(
942 nxt_container, pos_smallest, p_index_key, &p_value);
943 if constexpr (GET_KEY) {
944 assert(p_index_key);
945 p_index_key->set(iter.get_key());
946 } else {
947 assert(!p_index_key);
948 }
949 return result_t{{iter.index(), pos_smallest}, p_value, STAGE};
950 }
951
952 template <bool GET_KEY>
953 static result_t nxt_lower_bound(
954 const key_hobj_t& key, iterator_t& iter,
955 MatchHistory& history, key_view_t* index_key) {
956 static_assert(!IS_BOTTOM);
957 assert(!iter.is_end());
958 auto nxt_container = iter.get_nxt_container();
959 auto nxt_result = NXT_STAGE_T::template lower_bound<GET_KEY>(
960 nxt_container, key, history, index_key);
961 if (nxt_result.is_end()) {
962 if (iter.is_last()) {
963 return result_t::end();
964 } else {
965 return smallest_result<GET_KEY>(++iter, index_key);
966 }
967 } else {
968 if constexpr (GET_KEY) {
969 index_key->set(iter.get_key());
970 }
971 return result_t::from_nxt(iter.index(), nxt_result);
972 }
973 }
974
975 template <bool GET_POS, bool GET_KEY, bool GET_VAL>
976 static void get_largest_slot(
977 const container_t& container, // IN
978 position_t* p_position, // OUT
979 key_view_t* p_index_key, // OUT
980 const value_t** pp_value) { // OUT
981 auto iter = iterator_t(container);
982 iter.seek_last();
983 if constexpr (GET_KEY) {
984 assert(p_index_key);
985 p_index_key->set(iter.get_key());
986 } else {
987 assert(!p_index_key);
988 }
989 if constexpr (GET_POS) {
990 assert(p_position);
991 p_position->index = iter.index();
992 } else {
993 assert(!p_position);
994 }
995 if constexpr (IS_BOTTOM) {
996 if constexpr (GET_VAL) {
997 assert(pp_value);
998 *pp_value = iter.get_p_value();
999 } else {
1000 assert(!pp_value);
1001 }
1002 } else {
1003 auto nxt_container = iter.get_nxt_container();
1004 if constexpr (GET_POS) {
1005 NXT_STAGE_T::template get_largest_slot<true, GET_KEY, GET_VAL>(
1006 nxt_container, &p_position->nxt, p_index_key, pp_value);
1007 } else {
1008 NXT_STAGE_T::template get_largest_slot<false, GET_KEY, GET_VAL>(
1009 nxt_container, nullptr, p_index_key, pp_value);
1010 }
1011 }
1012 }
1013
1014 template <bool GET_KEY, bool GET_VAL>
1015 static void get_slot(
1016 const container_t& container, // IN
1017 const position_t& pos, // IN
1018 key_view_t* p_index_key, // OUT
1019 const value_t** pp_value) { // OUT
1020 auto iter = iterator_t(container);
1021 iter.seek_at(pos.index);
1022
1023 if constexpr (GET_KEY) {
1024 assert(p_index_key);
1025 p_index_key->set(iter.get_key());
1026 } else {
1027 assert(!p_index_key);
1028 }
1029
1030 if constexpr (!IS_BOTTOM) {
1031 auto nxt_container = iter.get_nxt_container();
1032 NXT_STAGE_T::template get_slot<GET_KEY, GET_VAL>(
1033 nxt_container, pos.nxt, p_index_key, pp_value);
1034 } else {
1035 if constexpr (GET_VAL) {
1036 assert(pp_value);
1037 *pp_value = iter.get_p_value();
1038 } else {
1039 assert(!pp_value);
1040 }
1041 }
1042 }
1043
1044 template <bool GET_KEY = false>
1045 static result_t lower_bound(
1046 const container_t& container,
1047 const key_hobj_t& key,
1048 MatchHistory& history,
1049 key_view_t* index_key = nullptr) {
1050 bool exclude_last = false;
1051 if (history.get<STAGE>().has_value()) {
1052 if (*history.get<STAGE>() == MatchKindCMP::EQ) {
1053 // lookup is short-circuited
1054 if constexpr (!IS_BOTTOM) {
1055 assert(history.get<STAGE - 1>().has_value());
1056 if (history.is_GT<STAGE - 1>()) {
1057 auto iter = iterator_t(container);
1058 bool test_key_equal;
1059 if constexpr (STAGE == STAGE_STRING) {
1060 // TODO(cross-node string dedup)
1061 // test_key_equal = (iter.get_key().type() == ns_oid_view_t::Type::MIN);
1062 auto cmp = key <=> iter.get_key();
1063 assert(cmp != std::strong_ordering::greater);
1064 test_key_equal = (cmp == 0);
1065 } else {
1066 auto cmp = key <=> iter.get_key();
1067 // From history, key[stage] == parent[stage][index - 1]
1068 // which should be the smallest possible value for all
1069 // index[stage][*]
1070 assert(cmp != std::strong_ordering::greater);
1071 test_key_equal = (cmp == 0);
1072 }
1073 if (test_key_equal) {
1074 return nxt_lower_bound<GET_KEY>(key, iter, history, index_key);
1075 } else {
1076 // key[stage] < index[stage][left-most]
1077 return smallest_result<GET_KEY>(iter, index_key);
1078 }
1079 }
1080 }
1081 // IS_BOTTOM || !history.is_GT<STAGE - 1>()
1082 auto iter = iterator_t(container);
1083 iter.seek_last();
1084 if constexpr (STAGE == STAGE_STRING) {
1085 // TODO(cross-node string dedup)
1086 // assert(iter.get_key().type() == ns_oid_view_t::Type::MAX);
1087 assert(key == iter.get_key());
1088 } else {
1089 assert(key == iter.get_key());
1090 }
1091 if constexpr (GET_KEY) {
1092 index_key->set(iter.get_key());
1093 }
1094 if constexpr (IS_BOTTOM) {
1095 auto value_ptr = iter.get_p_value();
1096 return result_t{{iter.index()}, value_ptr, MSTAT_EQ};
1097 } else {
1098 auto nxt_container = iter.get_nxt_container();
1099 auto nxt_result = NXT_STAGE_T::template lower_bound<GET_KEY>(
1100 nxt_container, key, history, index_key);
1101 // !history.is_GT<STAGE - 1>() means
1102 // key[stage+1 ...] <= index[stage+1 ...][*]
1103 assert(!nxt_result.is_end());
1104 return result_t::from_nxt(iter.index(), nxt_result);
1105 }
1106 } else if (*history.get<STAGE>() == MatchKindCMP::LT) {
1107 exclude_last = true;
1108 }
1109 }
1110 auto iter = iterator_t(container);
1111 auto bs_match = iter.seek(key, exclude_last);
1112 if (iter.is_end()) {
1113 assert(!exclude_last);
1114 assert(bs_match == MatchKindBS::NE);
1115 history.set<STAGE>(MatchKindCMP::GT);
1116 return result_t::end();
1117 }
1118 history.set<STAGE>(bs_match == MatchKindBS::EQ ?
1119 MatchKindCMP::EQ : MatchKindCMP::LT);
1120 if constexpr (IS_BOTTOM) {
1121 if constexpr (GET_KEY) {
1122 index_key->set(iter.get_key());
1123 }
1124 auto value_ptr = iter.get_p_value();
1125 return result_t{{iter.index()}, value_ptr,
1126 (bs_match == MatchKindBS::EQ ? MSTAT_EQ : MSTAT_LT0)};
1127 } else {
1128 if (bs_match == MatchKindBS::EQ) {
1129 return nxt_lower_bound<GET_KEY>(key, iter, history, index_key);
1130 } else {
1131 return smallest_result<GET_KEY>(iter, index_key);
1132 }
1133 }
1134 }
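// Illustrative sketch (not part of the build): a top-level lookup threads a
// MatchHistory through the staged recursion so that lower stages can be
// short-circuited. params_t stands for whichever node-layout parameter is
// instantiated:
//
//   MatchHistory history;
//   auto result = staged<params_t>::lower_bound(container, key, history);
//   if (result.is_end()) {
//     // key is greater than every index in this container
//   } else {
//     // result.position is the first slot not less than key;
//     // result.p_value points at the bottom-stage value of that slot
//   }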
1135
1136 template <IsFullKey Key>
1137 static node_offset_t insert_size(const Key& key,
1138 const value_input_t& value) {
1139 if constexpr (IS_BOTTOM) {
1140 return iterator_t::estimate_insert(key, value);
1141 } else {
1142 return iterator_t::estimate_insert(key, value) +
1143 NXT_STAGE_T::iterator_t::header_size() +
1144 NXT_STAGE_T::insert_size(key, value);
1145 }
1146 }
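// Worked example (illustrative): unrolling the recursion above for a 3-stage
// layout, the total estimate is the per-stage insert estimates plus the
// header sizes of the stages below the top insert stage (pseudo-notation;
// each term comes from the iterator_t of that stage):
//
//   insert_size(key, value) ==
//       estimate_insert<STAGE_LEFT> +
//       header_size<STAGE_STRING> + estimate_insert<STAGE_STRING> +
//       header_size<STAGE_RIGHT> + estimate_insert<STAGE_RIGHT>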
1147
1148 template <IsFullKey Key>
1149 static node_offset_t insert_size_at(match_stage_t stage,
1150 const Key& key,
1151 const value_input_t& value) {
1152 if (stage == STAGE) {
1153 return insert_size(key, value);
1154 } else {
1155 assert(stage < STAGE);
1156 return NXT_STAGE_T::template insert_size_at(stage, key, value);
1157 }
1158 }
1159
1160 template <typename T = std::tuple<match_stage_t, node_offset_t>>
1161 static std::enable_if_t<NODE_TYPE == node_type_t::INTERNAL, T> evaluate_insert(
1162 const container_t& container, const key_view_t& key,
1163 const value_input_t& value, position_t& position, bool evaluate_last) {
1164 auto iter = iterator_t(container);
1165 auto& index = position.index;
1166 if (evaluate_last || index == INDEX_END) {
1167 iter.seek_last();
1168 index = iter.index();
1169 // evaluate the previous index
1170 } else {
1171 assert(is_valid_index(index));
1172 // evaluate the current index
1173 iter.seek_at(index);
1174 auto match = key <=> iter.get_key();
1175 if (match == 0) {
1176 if constexpr (IS_BOTTOM) {
1177 ceph_abort("insert conflict at current index!");
1178 } else {
1179 // insert into the current index
1180 auto nxt_container = iter.get_nxt_container();
1181 return NXT_STAGE_T::evaluate_insert(
1182 nxt_container, key, value, position.nxt, false);
1183 }
1184 } else {
1185 assert(match == std::strong_ordering::less);
1186 if (index == 0) {
1187 // already the first index, so insert at the current index
1188 return {STAGE, insert_size(key, value)};
1189 }
1190 --index;
1191 iter = iterator_t(container);
1192 iter.seek_at(index);
1193 // proceed to evaluate the previous index
1194 }
1195 }
1196
1197 // XXX(multi-type): when key is from a different type of node
1198 auto match = key <=> iter.get_key();
1199 if (match == std::strong_ordering::greater) {
1200 // key matches neither index, so insert at the current index
1201 ++index;
1202 return {STAGE, insert_size(key, value)};
1203 } else {
1204 assert(match == std::strong_ordering::equal);
1205 if constexpr (IS_BOTTOM) {
1206 // ceph_abort?
1207 ceph_abort("insert conflict at the previous index!");
1208 } else {
1209 // insert into the previous index
1210 auto nxt_container = iter.get_nxt_container();
1211 return NXT_STAGE_T::evaluate_insert(
1212 nxt_container, key, value, position.nxt, true);
1213 }
1214 }
1215 }
1216
1217 template <typename T = bool>
1218 static std::enable_if_t<NODE_TYPE == node_type_t::LEAF, T>
1219 compensate_insert_position_at(match_stage_t stage, position_t& position) {
1220 auto& index = position.index;
1221 if (stage == STAGE) {
1222 assert(index == 0);
1223 // insert at the end of the current stage
1224 index = INDEX_END;
1225 return true;
1226 } else {
1227 if constexpr (IS_BOTTOM) {
1228 ceph_abort("impossible path");
1229 } else {
1230 assert(stage < STAGE);
1231 bool compensate = NXT_STAGE_T::
1232 compensate_insert_position_at(stage, position.nxt);
1233 if (compensate) {
1234 assert(is_valid_index(index));
1235 if (index == 0) {
1236 // insert into the *last* index of the current stage
1237 index = INDEX_LAST;
1238 return true;
1239 } else {
1240 --index;
1241 return false;
1242 }
1243 } else {
1244 return false;
1245 }
1246 }
1247 }
1248 }
1249
1250 static void patch_insert_end(position_t& insert_pos, match_stage_t insert_stage) {
1251 assert(insert_stage <= STAGE);
1252 if (insert_stage == STAGE) {
1253 insert_pos.index = INDEX_END;
1254 } else if constexpr (!IS_BOTTOM) {
1255 insert_pos.index = INDEX_LAST;
1256 NXT_STAGE_T::patch_insert_end(insert_pos.nxt, insert_stage);
1257 }
1258 }
1259
1260 template <typename T = std::tuple<match_stage_t, node_offset_t>>
1261 static std::enable_if_t<NODE_TYPE == node_type_t::LEAF, T> evaluate_insert(
1262 const key_hobj_t& key, const value_config_t& value,
1263 const MatchHistory& history, match_stat_t mstat, position_t& position) {
1264 match_stage_t insert_stage = STAGE_TOP;
1265 while (*history.get_by_stage(insert_stage) == MatchKindCMP::EQ) {
1266 assert(insert_stage != STAGE_BOTTOM && "insert conflict!");
1267 --insert_stage;
1268 }
1269
1270 if (history.is_GT()) {
1271 if (position.is_end()) {
1272 // no need to compensate insert position
1273 assert(insert_stage <= STAGE && "impossible insert stage");
1274 } else if (position == position_t::begin()) {
1275 // I must be short-circuited by staged::smallest_result()
1276 // in staged::lower_bound(), so we need to rely on mstat instead
1277 assert(mstat >= MSTAT_LT0 && mstat <= MSTAT_LT3);
1278 if (mstat == MSTAT_LT0) {
1279 insert_stage = STAGE_RIGHT;
1280 } else if (mstat == MSTAT_LT1) {
1281 insert_stage = STAGE_STRING;
1282 } else {
1283 insert_stage = STAGE_LEFT;
1284 }
1285 // XXX(multi-type): need to upgrade node type before inserting an
1286 // incompatible index at front.
1287 assert(insert_stage <= STAGE && "incompatible insert");
1288 } else {
1289 assert(insert_stage <= STAGE && "impossible insert stage");
1290 [[maybe_unused]] bool ret = compensate_insert_position_at(insert_stage, position);
1291 assert(!ret);
1292 }
1293 }
1294
1295 if (position.is_end()) {
1296 patch_insert_end(position, insert_stage);
1297 }
1298
1299 node_offset_t insert_size = insert_size_at(insert_stage, key, value);
1300
1301 return {insert_stage, insert_size};
1302 }
1303
1304 template <KeyT KT>
1305 static const value_t* insert_new(
1306 NodeExtentMutable& mut, const memory_range_t& range,
1307 const full_key_t<KT>& key, const value_input_t& value) {
1308 char* p_insert = const_cast<char*>(range.p_end);
1309 const value_t* p_value = nullptr;
1310 StagedAppender<KT> appender;
1311 appender.init_empty(&mut, p_insert);
1312 appender.append(key, value, p_value);
1313 [[maybe_unused]] const char* p_insert_front = appender.wrap();
1314 assert(p_insert_front == range.p_start);
1315 return p_value;
1316 }
1317
1318 template <KeyT KT, bool SPLIT>
1319 static const value_t* proceed_insert_recursively(
1320 NodeExtentMutable& mut, const container_t& container,
1321 const full_key_t<KT>& key, const value_input_t& value,
1322 position_t& position, match_stage_t& stage,
1323 node_offset_t& _insert_size, const char* p_left_bound) {
1324 // proceed insert from right to left
1325 assert(stage <= STAGE);
1326 auto iter = iterator_t(container);
1327 auto& index = position.index;
1328
1329 bool do_insert = false;
1330 if (stage == STAGE) {
1331 if (index == INDEX_END) {
1332 iter.seek_last();
1333 iter.set_end();
1334 index = iter.index();
1335 } else {
1336 assert(is_valid_index(index));
1337 iter.seek_till_end(index);
1338 }
1339 do_insert = true;
1340 } else { // stage < STAGE
1341 if (index == INDEX_LAST) {
1342 iter.seek_last();
1343 index = iter.index();
1344 } else {
1345 assert(is_valid_index(index));
1346 iter.seek_till_end(index);
1347 }
1348 if constexpr (SPLIT) {
1349 if (iter.is_end()) {
1350 // insert at the higher stage due to split
1351 do_insert = true;
1352 _insert_size = insert_size(key, value);
1353 stage = STAGE;
1354 }
1355 } else {
1356 assert(!iter.is_end());
1357 }
1358 }
1359
1360 if (do_insert) {
1361 if constexpr (!IS_BOTTOM) {
1362 position.nxt = position_t::nxt_t::begin();
1363 }
1364 assert(_insert_size == insert_size(key, value));
1365 if constexpr (IS_BOTTOM) {
1366 return iter.insert(
1367 mut, key, value, _insert_size, p_left_bound);
1368 } else {
1369 auto range = iter.insert_prefix(
1370 mut, key, _insert_size, p_left_bound);
1371 return NXT_STAGE_T::template insert_new<KT>(mut, range, key, value);
1372 }
1373 } else {
1374 if constexpr (!IS_BOTTOM) {
1375 auto nxt_container = iter.get_nxt_container();
1376 auto p_value = NXT_STAGE_T::template proceed_insert_recursively<KT, SPLIT>(
1377 mut, nxt_container, key, value,
1378 position.nxt, stage, _insert_size, p_left_bound);
1379 iter.update_size(mut, _insert_size);
1380 return p_value;
1381 } else {
1382 ceph_abort("impossible path");
1383 }
1384 }
1385 }
1386
1387 template <KeyT KT, bool SPLIT>
1388 static const value_t* proceed_insert(
1389 NodeExtentMutable& mut, const container_t& container,
1390 const full_key_t<KT>& key, const value_input_t& value,
1391 position_t& position, match_stage_t& stage, node_offset_t& _insert_size) {
1392 auto p_left_bound = container.p_left_bound();
1393 if (unlikely(!container.keys())) {
1394 if (position.is_end()) {
1395 position = position_t::begin();
1396 assert(stage == STAGE);
1397 assert(_insert_size == insert_size(key, value));
1398 } else if (position == position_t::begin()) {
1399 // when inserting into a trimmed and empty left node
1400 stage = STAGE;
1401 _insert_size = insert_size(key, value);
1402 } else {
1403 ceph_abort("impossible path");
1404 }
1405 if constexpr (IS_BOTTOM) {
1406 return container_t::insert_at(
1407 mut, container, key, value, 0, _insert_size, p_left_bound);
1408 } else {
1409 auto range = container_t::template insert_prefix_at(
1410 mut, container, key, 0, _insert_size, p_left_bound);
1411 return NXT_STAGE_T::template insert_new<KT>(mut, range, key, value);
1412 }
1413 } else {
1414 return proceed_insert_recursively<KT, SPLIT>(
1415 mut, container, key, value,
1416 position, stage, _insert_size, p_left_bound);
1417 }
1418 }
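// Illustrative sketch (not part of the build): a leaf insert is first
// evaluated to learn the insert stage and size, then performed. params_t
// and value_config are placeholders for what the calling node layer
// instantiates:
//
//   auto [insert_stage, insert_size] = staged<params_t>::evaluate_insert(
//       key, value_config, history, mstat, insert_pos);
//   const value_t* p_value = staged<params_t>::proceed_insert<
//       KeyT::HOBJ, false>(
//       mut, container, key, value_config,
//       insert_pos, insert_stage, insert_size);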
1419
1420 static std::ostream& dump(const container_t& container,
1421 std::ostream& os,
1422 const std::string& prefix,
1423 size_t& size,
1424 const char* p_start) {
1425 auto iter = iterator_t(container);
1426 assert(!iter.is_end());
1427 std::string prefix_blank(prefix.size(), ' ');
1428 const std::string* p_prefix = &prefix;
1429 size += iterator_t::header_size();
1430 do {
1431 std::ostringstream sos;
1432 sos << *p_prefix << iter.get_key() << ": ";
1433 std::string i_prefix = sos.str();
1434 if constexpr (!IS_BOTTOM) {
1435 auto nxt_container = iter.get_nxt_container();
1436 size += iter.size_to_nxt();
1437 NXT_STAGE_T::dump(nxt_container, os, i_prefix, size, p_start);
1438 } else {
1439 auto value_ptr = iter.get_p_value();
1440 int offset = reinterpret_cast<const char*>(value_ptr) - p_start;
1441 size += iter.size();
1442 os << "\n" << i_prefix;
1443 if constexpr (NODE_TYPE == node_type_t::LEAF) {
1444 os << *value_ptr;
1445 } else {
1446 os << "0x" << std::hex << value_ptr->value << std::dec;
1447 }
1448 os << " " << size << "B"
1449 << " @" << offset << "B";
1450 }
1451 if (iter.is_last()) {
1452 break;
1453 } else {
1454 ++iter;
1455 p_prefix = &prefix_blank;
1456 }
1457 } while (true);
1458 return os;
1459 }
1460
1461 static void validate(const container_t& container) {
1462 auto iter = iterator_t(container);
1463 assert(!iter.is_end());
1464 auto key = iter.get_key();
1465 do {
1466 if constexpr (!IS_BOTTOM) {
1467 auto nxt_container = iter.get_nxt_container();
1468 NXT_STAGE_T::validate(nxt_container);
1469 }
1470 if (iter.is_last()) {
1471 break;
1472 } else {
1473 ++iter;
1474 assert(key < iter.get_key());
1475 key = iter.get_key();
1476 }
1477 } while (true);
1478 }
1479
1480 static void get_stats(const container_t& container, node_stats_t& stats,
1481 key_view_t& index_key) {
1482 auto iter = iterator_t(container);
1483 assert(!iter.is_end());
1484 stats.size_overhead += iterator_t::header_size();
1485 do {
1486 index_key.replace(iter.get_key());
1487 stats.size_overhead += iter.size_overhead();
1488 if constexpr (!IS_BOTTOM) {
1489 auto nxt_container = iter.get_nxt_container();
1490 NXT_STAGE_T::get_stats(nxt_container, stats, index_key);
1491 } else {
1492 ++stats.num_kvs;
1493 size_t kv_logical_size = index_key.size_logical();
1494 size_t value_size;
1495 if constexpr (NODE_TYPE == node_type_t::LEAF) {
1496 value_size = iter.get_p_value()->allocation_size();
1497 } else {
1498 value_size = sizeof(value_t);
1499 }
1500 stats.size_value += value_size;
1501 kv_logical_size += value_size;
1502 stats.size_logical += kv_logical_size;
1503 }
1504 if (iter.is_last()) {
1505 break;
1506 } else {
1507 ++iter;
1508 }
1509 } while (true);
1510 }
1511
1512 template <bool GET_KEY, bool GET_VAL>
1513 static bool get_next_slot(
1514 const container_t& container, // IN
1515 position_t& pos, // IN&OUT
1516 key_view_t* p_index_key, // OUT
1517 const value_t** pp_value) { // OUT
1518 auto iter = iterator_t(container);
1519 assert(!iter.is_end());
1520 iter.seek_at(pos.index);
1521 bool find_next;
1522 if constexpr (!IS_BOTTOM) {
1523 auto nxt_container = iter.get_nxt_container();
1524 find_next = NXT_STAGE_T::template get_next_slot<GET_KEY, GET_VAL>(
1525 nxt_container, pos.nxt, p_index_key, pp_value);
1526 } else {
1527 find_next = true;
1528 }
1529
1530 if (find_next) {
1531 if (iter.is_last()) {
1532 return true;
1533 } else {
1534 pos.index = iter.index() + 1;
1535 if constexpr (!IS_BOTTOM) {
1536 pos.nxt = NXT_STAGE_T::position_t::begin();
1537 }
1538 get_slot<GET_KEY, GET_VAL>(
1539 container, pos, p_index_key, pp_value);
1540 return false;
1541 }
1542 } else { // !find_next && !IS_BOTTOM
1543 if constexpr (GET_KEY) {
1544 assert(p_index_key);
1545 p_index_key->set(iter.get_key());
1546 } else {
1547 assert(!p_index_key);
1548 }
1549 return false;
1550 }
1551 }
1552
1553 template <bool GET_KEY, bool GET_VAL>
1554 static void get_prev_slot(
1555 const container_t& container, // IN
1556 position_t& pos, // IN&OUT
1557 key_view_t* p_index_key, // OUT
1558 const value_t** pp_value) { // OUT
1559 assert(pos != position_t::begin());
1560 assert(!pos.is_end());
1561 auto& index = pos.index;
1562 auto iter = iterator_t(container);
1563 if constexpr (!IS_BOTTOM) {
1564 auto& nxt_pos = pos.nxt;
1565 if (nxt_pos == NXT_STAGE_T::position_t::begin()) {
1566 assert(index);
1567 --index;
1568 iter.seek_at(index);
1569 auto nxt_container = iter.get_nxt_container();
1570 NXT_STAGE_T::template get_largest_slot<true, GET_KEY, GET_VAL>(
1571 nxt_container, &nxt_pos, p_index_key, pp_value);
1572 } else {
1573 iter.seek_at(index);
1574 auto nxt_container = iter.get_nxt_container();
1575 NXT_STAGE_T::template get_prev_slot<GET_KEY, GET_VAL>(
1576 nxt_container, nxt_pos, p_index_key, pp_value);
1577 }
1578 } else {
1579 assert(index);
1580 --index;
1581 iter.seek_at(index);
1582 if constexpr (GET_VAL) {
1583 assert(pp_value);
1584 *pp_value = iter.get_p_value();
1585 } else {
1586 assert(!pp_value);
1587 }
1588 }
1589 if constexpr (GET_KEY) {
1590 p_index_key->set(iter.get_key());
1591 } else {
1592 assert(!p_index_key);
1593 }
1594 }
1595
1596 struct _BaseEmpty {};
1597 class _BaseWithNxtIterator {
1598 protected:
1599 typename NXT_STAGE_T::StagedIterator _nxt;
1600 };
1601 class StagedIterator
1602 : std::conditional_t<IS_BOTTOM, _BaseEmpty, _BaseWithNxtIterator> {
1603 public:
1604 StagedIterator() = default;
1605 bool valid() const { return iter.has_value(); }
1606 index_t index() const {
1607 return iter->index();
1608 }
1609 bool is_end() const { return iter->is_end(); }
1610 bool in_progress() const {
1611 assert(valid());
1612 assert(!is_end());
1613 if constexpr (!IS_BOTTOM) {
1614 if (this->_nxt.valid()) {
1615 if (this->_nxt.index() == 0) {
1616 return this->_nxt.in_progress();
1617 } else {
1618 return true;
1619 }
1620 } else {
1621 return false;
1622 }
1623 } else {
1624 return false;
1625 }
1626 }
1627 key_get_type get_key() const { return iter->get_key(); }
1628
1629 iterator_t& get() { return *iter; }
1630 void set(const container_t& container) {
1631 assert(!valid());
1632 iter = iterator_t(container);
1633 }
1634 void set_end() { iter->set_end(); }
1635 typename NXT_STAGE_T::StagedIterator& nxt() {
1636 if constexpr (!IS_BOTTOM) {
1637 if (!this->_nxt.valid()) {
1638 auto nxt_container = iter->get_nxt_container();
1639 this->_nxt.set(nxt_container);
1640 }
1641 return this->_nxt;
1642 } else {
1643 ceph_abort("impossible path");
1644 }
1645 }
1646 typename NXT_STAGE_T::StagedIterator& get_nxt() {
1647 if constexpr (!IS_BOTTOM) {
1648 return this->_nxt;
1649 } else {
1650 ceph_abort("impossible path");
1651 }
1652 }
1653 StagedIterator& operator++() {
1654 if (iter->is_last()) {
1655 iter->set_end();
1656 } else {
1657 ++(*iter);
1658 }
1659 if constexpr (!IS_BOTTOM) {
1660 this->_nxt.reset();
1661 }
1662 return *this;
1663 }
1664 void reset() {
1665 if (valid()) {
1666 iter.reset();
1667 if constexpr (!IS_BOTTOM) {
1668 this->_nxt.reset();
1669 }
1670 }
1671 }
1672
1673 template<typename OutputIt>
1674 auto do_format_to(OutputIt out, bool is_top) const {
1675 if (valid()) {
1676 if (iter->is_end()) {
1677 return fmt::format_to(out, "END");
1678 } else {
1679 out = fmt::format_to(out, "{}", index());
1680 }
1681 } else {
1682 if (is_top) {
1683 return fmt::format_to(out, "invalid StagedIterator!");
1684 } else {
1685 out = fmt::format_to(out, "0!");
1686 }
1687 }
1688 if constexpr (!IS_BOTTOM) {
1689 out = fmt::format_to(out, ", ");
1690 return this->_nxt.do_format_to(out, false);
1691 } else {
1692 return out;
1693 }
1694 }
1695
1696 position_t get_pos() const {
1697 if (valid()) {
1698 if constexpr (IS_BOTTOM) {
1699 return position_t{index()};
1700 } else {
1701 return position_t{index(), this->_nxt.get_pos()};
1702 }
1703 } else {
1704 return position_t::begin();
1705 }
1706 }
1707 void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
1708 uint8_t present = static_cast<bool>(iter);
1709 ceph::encode(present, encoded);
1710 if (iter.has_value()) {
1711 iter->encode(p_node_start, encoded);
1712 if constexpr (!IS_BOTTOM) {
1713 this->_nxt.encode(p_node_start, encoded);
1714 }
1715 }
1716 }
1717 static StagedIterator decode(const char* p_node_start,
1718 extent_len_t node_size,
1719 ceph::bufferlist::const_iterator& delta) {
1720 StagedIterator ret;
1721 uint8_t present;
1722 ceph::decode(present, delta);
1723 if (present) {
1724 ret.iter = iterator_t::decode(
1725 p_node_start, node_size, delta);
1726 if constexpr (!IS_BOTTOM) {
1727 ret._nxt = NXT_STAGE_T::StagedIterator::decode(
1728 p_node_start, node_size, delta);
1729 }
1730 }
1731 return ret;
1732 }
1733 private:
1734 std::optional<iterator_t> iter;
1735 };
1736
1737 static bool recursively_locate_split(
1738 size_t& current_size, size_t extra_size,
1739 size_t target_size, StagedIterator& split_at) {
1740 assert(current_size <= target_size);
1741 iterator_t& split_iter = split_at.get();
1742 current_size = split_iter.seek_split(current_size, extra_size, target_size);
1743 assert(current_size <= target_size);
1744 assert(!split_iter.is_end());
1745 if (split_iter.index() == 0) {
1746 extra_size += iterator_t::header_size();
1747 } else {
1748 extra_size = 0;
1749 }
1750 bool locate_nxt;
1751 if constexpr (!IS_BOTTOM) {
1752 locate_nxt = NXT_STAGE_T::recursively_locate_split(
1753 current_size, extra_size + split_iter.size_to_nxt(),
1754 target_size, split_at.nxt());
1755 } else { // IS_BOTTOM
1756 // located upper_bound, fair split strategy
1757 size_t nxt_size = split_iter.size() + extra_size;
1758 assert(current_size + nxt_size > target_size);
1759 if (current_size + nxt_size/2 < target_size) {
1760 // include next
1761 current_size += nxt_size;
1762 locate_nxt = true;
1763 } else {
1764 // exclude next
1765 locate_nxt = false;
1766 }
1767 }
1768 if (locate_nxt) {
1769 if (split_iter.is_last()) {
1770 return true;
1771 } else {
1772 ++split_at;
1773 return false;
1774 }
1775 } else {
1776 return false;
1777 }
1778 }
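// Worked example (illustrative) of the fair-split strategy above: with
// current_size=1000, target_size=1100 and a next slot of nxt_size=180,
// 1000 + 180/2 = 1090 < 1100, so the slot is included and current_size
// becomes 1180; with nxt_size=220, 1000 + 110 = 1110 >= 1100, so the slot
// is excluded and the split lands before it.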
1779
1780 static bool recursively_locate_split_inserted(
1781 size_t& current_size, size_t extra_size, size_t target_size,
1782 position_t& insert_pos, match_stage_t insert_stage, size_t insert_size,
1783 std::optional<bool>& is_insert_left, StagedIterator& split_at) {
1784 assert(current_size <= target_size);
1785 assert(!is_insert_left.has_value());
1786 iterator_t& split_iter = split_at.get();
1787 auto& insert_index = insert_pos.index;
1788 if (insert_stage == STAGE) {
1789 current_size = split_iter.template seek_split_inserted<true>(
1790 current_size, extra_size, target_size,
1791 insert_index, insert_size, is_insert_left);
1792 assert(is_insert_left.has_value());
1793 assert(current_size <= target_size);
1794 if (split_iter.index() == 0) {
1795 if (insert_index == 0) {
1796 if (*is_insert_left == false) {
1797 extra_size += iterator_t::header_size();
1798 } else {
1799 extra_size = 0;
1800 }
1801 } else {
1802 extra_size += iterator_t::header_size();
1803 }
1804 } else {
1805 extra_size = 0;
1806 }
1807 if (*is_insert_left == false && split_iter.index() == insert_index) {
1808 // split_iter can be end
1809 // found the lower-bound of target_size
1810 // ...[s_index-1] |!| (i_index) [s_index]...
1811
1812 // located upper-bound, fair split strategy
1813 // look at the next slot (the insert item)
1814 size_t nxt_size = insert_size + extra_size;
1815 assert(current_size + nxt_size > target_size);
1816 if (current_size + nxt_size/2 < target_size) {
1817 // include next
1818 *is_insert_left = true;
1819 current_size += nxt_size;
1820 if (split_iter.is_end()) {
1821 // ...[s_index-1] (i_index) |!|
1822 return true;
1823 } else {
1824 return false;
1825 }
1826 } else {
1827 // exclude next
1828 return false;
1829 }
1830 } else {
1831 // Already considered insert effect in the current stage.
1832 // Look into the next stage to identify the target_size lower-bound w/o
1833 // insert effect.
1834 assert(!split_iter.is_end());
1835 bool locate_nxt;
1836 if constexpr (!IS_BOTTOM) {
1837 locate_nxt = NXT_STAGE_T::recursively_locate_split(
1838 current_size, extra_size + split_iter.size_to_nxt(),
1839 target_size, split_at.nxt());
1840 } else { // IS_BOTTOM
1841 // located upper-bound, fair split strategy
1842 // look at the next slot
1843 size_t nxt_size = split_iter.size() + extra_size;
1844 assert(current_size + nxt_size > target_size);
1845 if (current_size + nxt_size/2 < target_size) {
1846 // include next
1847 current_size += nxt_size;
1848 locate_nxt = true;
1849 } else {
1850 // exclude next
1851 locate_nxt = false;
1852 }
1853 }
1854 if (locate_nxt) {
1855 if (split_iter.is_last()) {
1856 auto end_index = split_iter.index() + 1;
1857 if (insert_index == INDEX_END) {
1858 insert_index = end_index;
1859 }
1860 assert(insert_index <= end_index);
1861 if (insert_index == end_index) {
1862 assert(*is_insert_left == false);
1863 split_iter.set_end();
1864 // ...[s_index-1] |!| (i_index)
1865 return false;
1866 } else {
1867 assert(*is_insert_left == true);
1868 return true;
1869 }
1870 } else {
1871 ++split_at;
1872 return false;
1873 }
1874 } else {
1875 return false;
1876 }
1877 }
1878 } else {
1879 if constexpr (!IS_BOTTOM) {
1880 assert(insert_stage < STAGE);
1881 current_size = split_iter.template seek_split_inserted<false>(
1882 current_size, extra_size, target_size,
1883 insert_index, insert_size, is_insert_left);
1884 assert(!split_iter.is_end());
1885 assert(current_size <= target_size);
1886 if (split_iter.index() == 0) {
1887 extra_size += iterator_t::header_size();
1888 } else {
1889 extra_size = 0;
1890 }
1891 bool locate_nxt;
1892 if (!is_insert_left.has_value()) {
1893 // Considered insert effect in the current stage, and insert happens
1894 // in the lower stage.
1895 // Look into the next stage to identify the target_size lower-bound w/
1896 // insert effect.
1897 assert(split_iter.index() == insert_index);
1898 locate_nxt = NXT_STAGE_T::recursively_locate_split_inserted(
1899 current_size, extra_size + split_iter.size_to_nxt(), target_size,
1900 insert_pos.nxt, insert_stage, insert_size,
1901 is_insert_left, split_at.nxt());
1902 assert(is_insert_left.has_value());
1903 #ifndef NDEBUG
1904 if (locate_nxt) {
1905 assert(*is_insert_left == true);
1906 }
1907 #endif
1908 } else {
1909 // is_insert_left.has_value() == true
1910 // Insert will *not* happen in the lower stage.
1911 // Need to look into the next stage to identify the target_size
1912 // lower-bound w/o insert effect.
1913 assert(split_iter.index() != insert_index);
1914 locate_nxt = NXT_STAGE_T::recursively_locate_split(
1915 current_size, extra_size + split_iter.size_to_nxt(),
1916 target_size, split_at.nxt());
1917 #ifndef NDEBUG
1918 if (split_iter.index() < insert_index) {
1919 assert(*is_insert_left == false);
1920 } else {
1921 assert(*is_insert_left == true);
1922 }
1923 #endif
1924 }
1925 if (locate_nxt) {
1926 if (split_iter.is_last()) {
1927 return true;
1928 } else {
1929 ++split_at;
1930 return false;
1931 }
1932 } else {
1933 return false;
1934 }
1935 } else {
1936 ceph_abort("impossible path");
1937 return false;
1938 }
1939 }
1940 }
1941
1942 /*
1943 * container appender type system
1944 * container_t::Appender(NodeExtentMutable* p_mut, char* p_append)
1945 * append(const container_t& src, index_t from, index_t items)
1946 * wrap() -> char*
1947 * IF !IS_BOTTOM:
1948 * open_nxt(const key_get_type&)
1949 * open_nxt(const full_key_t&)
1950 * -> std::tuple<NodeExtentMutable&, char*>
1951 * wrap_nxt(char* p_append)
1952 * ELSE
1953 * append(const full_key_t& key, const value_input_t& value)
1954 */
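/*
 * Illustrative sketch only (hypothetical BottomAppender, not part of this
 * file): a bottom-stage container_t conforming to the contract above would
 * expose roughly:
 *
 *   template <KeyT KT>
 *   class BottomAppender {
 *    public:
 *     BottomAppender(NodeExtentMutable* p_mut, char* p_append);
 *     // bulk-copy items [from, from + items) of src to the append position
 *     void append(const container_t& src, index_t from, index_t items);
 *     // IS_BOTTOM: append a single key/value pair
 *     void append(const full_key_t<KT>& key,
 *                 const value_input_t& value, const value_t*& p_value);
 *     // finish appending and return the final append position
 *     char* wrap();
 *   };
 *
 * The real implementations are provided by the container types themselves,
 * e.g. sub_items_t<NODE_TYPE>::Appender for the bottom stage.
 */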
1955 template <KeyT KT>
1956 struct _BaseWithNxtAppender {
1957 typename NXT_STAGE_T::template StagedAppender<KT> _nxt;
1958 };
1959 template <KeyT KT>
1960 class StagedAppender
1961 : std::conditional_t<IS_BOTTOM, _BaseEmpty, _BaseWithNxtAppender<KT>> {
1962 public:
1963 StagedAppender() = default;
1964 ~StagedAppender() {
1965 assert(!require_wrap_nxt);
1966 assert(!valid());
1967 }
1968 bool valid() const { return appender.has_value(); }
1969 index_t index() const {
1970 assert(valid());
1971 return _index;
1972 }
1973 bool in_progress() const { return require_wrap_nxt; }
1974 // TODO: pass by reference
1975 void init_empty(NodeExtentMutable* p_mut, char* p_start) {
1976 assert(!valid());
1977 appender = typename container_t::template Appender<KT>(p_mut, p_start);
1978 _index = 0;
1979 }
1980 void init_tail(NodeExtentMutable* p_mut,
1981 const container_t& container,
1982 match_stage_t stage) {
1983 assert(!valid());
1984 auto iter = iterator_t(container);
1985 iter.seek_last();
1986 if (stage == STAGE) {
1987 appender = iter.template get_appender<KT>(p_mut);
1988 _index = iter.index() + 1;
1989 if constexpr (!IS_BOTTOM) {
1990 assert(!this->_nxt.valid());
1991 }
1992 } else {
1993 assert(stage < STAGE);
1994 if constexpr (!IS_BOTTOM) {
1995 appender = iter.template get_appender_opened<KT>(p_mut);
1996 _index = iter.index();
1997 require_wrap_nxt = true;
1998 auto nxt_container = iter.get_nxt_container();
1999 this->_nxt.init_tail(p_mut, nxt_container, stage);
2000 } else {
2001 ceph_abort("impossible path");
2002 }
2003 }
2004 }
2005 // may advance src_iter to its end if to_index == INDEX_END
2006 void append_until(StagedIterator& src_iter, index_t& to_index) {
2007 assert(!require_wrap_nxt);
2008 auto s_index = src_iter.index();
2009 src_iter.get().template copy_out_until<KT>(*appender, to_index);
2010 assert(src_iter.index() == to_index);
2011 assert(to_index >= s_index);
2012 auto increment = (to_index - s_index);
2013 if (increment) {
2014 _index += increment;
2015 if constexpr (!IS_BOTTOM) {
2016 src_iter.get_nxt().reset();
2017 }
2018 }
2019 }
2020 void append(const full_key_t<KT>& key,
2021 const value_input_t& value, const value_t*& p_value) {
2022 assert(!require_wrap_nxt);
2023 if constexpr (!IS_BOTTOM) {
2024 auto& nxt = open_nxt(key);
2025 nxt.append(key, value, p_value);
2026 wrap_nxt();
2027 } else {
2028 appender->append(key, value, p_value);
2029 ++_index;
2030 }
2031 }
2032 char* wrap() {
2033 assert(valid());
2034 assert(_index > 0);
2035 if constexpr (!IS_BOTTOM) {
2036 if (require_wrap_nxt) {
2037 wrap_nxt();
2038 }
2039 }
2040 auto ret = appender->wrap();
2041 appender.reset();
2042 return ret;
2043 }
2044 typename NXT_STAGE_T::template StagedAppender<KT>&
2045 open_nxt(key_get_type partial_key) {
2046 assert(!require_wrap_nxt);
2047 if constexpr (!IS_BOTTOM) {
2048 require_wrap_nxt = true;
2049 auto [p_mut, p_append] = appender->open_nxt(partial_key);
2050 this->_nxt.init_empty(p_mut, p_append);
2051 return this->_nxt;
2052 } else {
2053 ceph_abort("impossible path");
2054 }
2055 }
2056 typename NXT_STAGE_T::template StagedAppender<KT>&
2057 open_nxt(const full_key_t<KT>& key) {
2058 assert(!require_wrap_nxt);
2059 if constexpr (!IS_BOTTOM) {
2060 require_wrap_nxt = true;
2061 auto [p_mut, p_append] = appender->open_nxt(key);
2062 this->_nxt.init_empty(p_mut, p_append);
2063 return this->_nxt;
2064 } else {
2065 ceph_abort("impossible path");
2066 }
2067 }
2068 typename NXT_STAGE_T::template StagedAppender<KT>& get_nxt() {
2069 if constexpr (!IS_BOTTOM) {
2070 assert(require_wrap_nxt);
2071 return this->_nxt;
2072 } else {
2073 ceph_abort("impossible path");
2074 }
2075 }
2076 void wrap_nxt() {
2077 if constexpr (!IS_BOTTOM) {
2078 assert(require_wrap_nxt);
2079 require_wrap_nxt = false;
2080 auto p_append = this->_nxt.wrap();
2081 appender->wrap_nxt(p_append);
2082 ++_index;
2083 } else {
2084 ceph_abort("impossible path");
2085 }
2086 }
2087 private:
2088 std::optional<typename container_t::template Appender<KT>> appender;
2089 index_t _index;
2090 bool require_wrap_nxt = false;
2091 };
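  /*
   * Illustrative usage of StagedAppender (a sketch with hypothetical
   * variables, not code from the callers):
   *
   *   typename node_to_stage_t<NodeType>::template StagedAppender<KT> appender;
   *   appender.init_empty(&mut, p_start);    // start appending at p_start
   *   appender.append(key, value, p_value);  // opens/wraps lower stages
   *   char* p_end = appender.wrap();         // closes any opened stage
   *
   * init_tail() instead resumes appending right after the last existing item
   * of a container, opening the nested appenders down to the given stage.
   */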
2092
2093 template <KeyT KT>
2094 static void _append_range(
2095 StagedIterator& src_iter, StagedAppender<KT>& appender, index_t& to_index) {
2096 if (src_iter.is_end()) {
2097 // append done
2098 assert(to_index == INDEX_END);
2099 to_index = src_iter.index();
2100 } else if constexpr (!IS_BOTTOM) {
2101 if (appender.in_progress()) {
2102 // appender has appended something at the current item,
2103 // cannot append the current item as-a-whole
2104 index_t to_index_nxt = INDEX_END;
2105 NXT_STAGE_T::template _append_range<KT>(
2106 src_iter.nxt(), appender.get_nxt(), to_index_nxt);
2107 ++src_iter;
2108 appender.wrap_nxt();
2109 } else if (src_iter.in_progress()) {
2110 // src_iter is not at the beginning of the current item,
2111 // cannot append the current item as-a-whole
2112 index_t to_index_nxt = INDEX_END;
2113 NXT_STAGE_T::template _append_range<KT>(
2114 src_iter.get_nxt(), appender.open_nxt(src_iter.get_key()), to_index_nxt);
2115 ++src_iter;
2116 appender.wrap_nxt();
2117 } else {
2118 // we can safely append the current item as-a-whole
2119 }
2120 }
2121 appender.append_until(src_iter, to_index);
2122 }
2123
2124 template <KeyT KT>
2125 static void _append_into(StagedIterator& src_iter, StagedAppender<KT>& appender,
2126 position_t& position, match_stage_t stage) {
2127 assert(position.index == src_iter.index());
2128 // reached the last item to process at this stage
2129 if (stage == STAGE) {
2130 // done, end recursion
2131 if constexpr (!IS_BOTTOM) {
2132 position.nxt = position_t::nxt_t::begin();
2133 }
2134 } else {
2135 assert(stage < STAGE);
2136 // proceed append in the next stage
2137 NXT_STAGE_T::template append_until<KT>(
2138 src_iter.nxt(), appender.open_nxt(src_iter.get_key()),
2139 position.nxt, stage);
2140 }
2141 }
2142
2143 template <KeyT KT>
2144 static void append_until(StagedIterator& src_iter, StagedAppender<KT>& appender,
2145 position_t& position, match_stage_t stage) {
2146 index_t from_index = src_iter.index();
2147 index_t& to_index = position.index;
2148 assert(from_index <= to_index);
2149 if constexpr (IS_BOTTOM) {
2150 assert(stage == STAGE);
2151 appender.append_until(src_iter, to_index);
2152 } else {
2153 assert(stage <= STAGE);
2154 if (src_iter.index() == to_index) {
2155 _append_into<KT>(src_iter, appender, position, stage);
2156 } else {
2157 if (to_index == INDEX_END) {
2158 assert(stage == STAGE);
2159 } else if (to_index == INDEX_LAST) {
2160 assert(stage < STAGE);
2161 }
2162 _append_range<KT>(src_iter, appender, to_index);
2163 _append_into<KT>(src_iter, appender, position, stage);
2164 }
2165 }
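      // convert to_index from an absolute index into the number of items
      // appended at this stage, relative to where src_iter started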
2166 to_index -= from_index;
2167 }
2168
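  // Append the pending insertion while replaying items from src_iter into
  // the appender. Returns true once src_iter is exhausted at this stage.
  // For a front insertion, `stage` is promoted to the current STAGE once
  // the lower stage completes.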
2169 template <KeyT KT>
2170 static bool append_insert(
2171 const full_key_t<KT>& key, const value_input_t& value,
2172 StagedIterator& src_iter, StagedAppender<KT>& appender,
2173 bool is_front_insert, match_stage_t& stage, const value_t*& p_value) {
2174 assert(src_iter.valid());
2175 if (stage == STAGE) {
2176 appender.append(key, value, p_value);
2177 if (src_iter.is_end()) {
2178 return true;
2179 } else {
2180 return false;
2181 }
2182 } else {
2183 assert(stage < STAGE);
2184 if constexpr (!IS_BOTTOM) {
2185 auto nxt_is_end = NXT_STAGE_T::template append_insert<KT>(
2186 key, value, src_iter.get_nxt(), appender.get_nxt(),
2187 is_front_insert, stage, p_value);
2188 if (nxt_is_end) {
2189 appender.wrap_nxt();
2190 ++src_iter;
2191 if (is_front_insert) {
2192 stage = STAGE;
2193 }
2194 if (src_iter.is_end()) {
2195 return true;
2196 }
2197 }
2198 return false;
2199 } else {
2200 ceph_abort("impossible path");
2201 }
2202 }
2203 }
2204
2205 /* TrimType:
2206  * BEFORE: remove the entire container; this normally means the
2207  * corresponding higher-stage iterator needs to be trimmed as-a-whole.
2208  * AFTER: retain the entire container; this normally means the trim
2209  * should start from the next iterator at the higher stage.
2210  * AT: trim happens inside the current container, and the corresponding
2211  * higher-stage iterator needs to be adjusted by the trimmed size.
2212 */
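/*
 * For example (illustrative, assuming two stages): trimming at position
 * {index=2, nxt={index=0}} makes the lower stage report BEFORE (its index
 * is 0, so its whole container goes away); since this stage's index is
 * 2 != 0, it trims items [2, end) itself via trim_until() and reports AT
 * together with the trimmed size to its own caller.
 */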
2213 static std::tuple<TrimType, node_offset_t>
2214 recursively_trim(NodeExtentMutable& mut, StagedIterator& trim_at) {
2215 if (!trim_at.valid()) {
2216 return {TrimType::BEFORE, 0u};
2217 }
2218 if (trim_at.is_end()) {
2219 return {TrimType::AFTER, 0u};
2220 }
2221
2222 auto& iter = trim_at.get();
2223 if constexpr (!IS_BOTTOM) {
2224 auto [type, trimmed] = NXT_STAGE_T::recursively_trim(
2225 mut, trim_at.get_nxt());
2226 node_offset_t trim_size;
2227 if (type == TrimType::AFTER) {
2228 if (iter.is_last()) {
2229 return {TrimType::AFTER, 0u};
2230 }
2231 ++trim_at;
2232 trim_size = iter.trim_until(mut);
2233 } else if (type == TrimType::BEFORE) {
2234 if (iter.index() == 0) {
2235 return {TrimType::BEFORE, 0u};
2236 }
2237 trim_size = iter.trim_until(mut);
2238 } else {
2239 trim_size = iter.trim_at(mut, trimmed);
2240 }
2241 return {TrimType::AT, trim_size};
2242 } else {
2243 if (iter.index() == 0) {
2244 return {TrimType::BEFORE, 0u};
2245 } else {
2246 auto trimmed = iter.trim_until(mut);
2247 return {TrimType::AT, trimmed};
2248 }
2249 }
2250 }
2251
2252 static void trim(NodeExtentMutable& mut, StagedIterator& trim_at) {
2253 auto [type, trimmed] = recursively_trim(mut, trim_at);
2254 if (type == TrimType::BEFORE) {
2255 assert(trim_at.valid());
2256 auto& iter = trim_at.get();
2257 iter.trim_until(mut);
2258 }
2259 }
2260
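  // Erase the key-value at pos, recursing into the lower stages first.
  // Returns std::nullopt if this container would become empty so the erase
  // has to happen at the upper stage; otherwise returns the stage at which
  // the erase took effect, the erased size, and whether the next position
  // (pos, IN&OUT) has already been finalized.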
2261 static std::optional<std::tuple<match_stage_t, node_offset_t, bool>>
2262 proceed_erase_recursively(
2263 NodeExtentMutable& mut,
2264 const container_t& container, // IN
2265 const char* p_left_bound, // IN
2266 position_t& pos) { // IN&OUT
2267 auto iter = iterator_t(container);
2268 auto& index = pos.index;
2269 assert(is_valid_index(index));
2270 iter.seek_at(index);
2271 bool is_last = iter.is_last();
2272
2273 if constexpr (!IS_BOTTOM) {
2274 auto nxt_container = iter.get_nxt_container();
2275 auto ret = NXT_STAGE_T::proceed_erase_recursively(
2276 mut, nxt_container, p_left_bound, pos.nxt);
2277 if (ret.has_value()) {
2278 // erased at lower level
2279 auto [r_stage, r_erase_size, r_done] = *ret;
2280 assert(r_erase_size != 0);
2281 iter.update_size(mut, -r_erase_size);
2282 if (r_done) {
2283 // done, the next_pos is calculated
2284 return ret;
2285 } else {
2286 if (is_last) {
2287 // need to find the next pos at upper stage
2288 return ret;
2289 } else {
2290 // done, calculate the next pos
2291 ++index;
2292 pos.nxt = NXT_STAGE_T::position_t::begin();
2293 return {{r_stage, r_erase_size, true}};
2294 }
2295 }
2296 }
2297 // not erased at lower level
2298 }
2299
2300 // not erased yet
2301 if (index == 0 && is_last) {
2302 // need to erase from the upper stage
2303 return std::nullopt;
2304 } else {
2305 auto erase_size = iter.erase(mut, p_left_bound);
2306 assert(erase_size != 0);
2307 if (is_last) {
2308 // need to find the next pos at upper stage
2309 return {{STAGE, erase_size, false}};
2310 } else {
2311 // done, calculate the next pos (should be correct already)
2312 if constexpr (!IS_BOTTOM) {
2313 assert(pos.nxt == NXT_STAGE_T::position_t::begin());
2314 }
2315 return {{STAGE, erase_size, true}};
2316 }
2317 }
2318 }
2319
2320 static match_stage_t erase(
2321 NodeExtentMutable& mut,
2322 const container_t& node_stage, // IN
2323 position_t& erase_pos) { // IN&OUT
2324 auto p_left_bound = node_stage.p_left_bound();
2325 auto ret = proceed_erase_recursively(
2326 mut, node_stage, p_left_bound, erase_pos);
2327 if (ret.has_value()) {
2328 auto [r_stage, r_erase_size, r_done] = *ret;
2329 std::ignore = r_erase_size;
2330 if (r_done) {
2331 assert(!erase_pos.is_end());
2332 return r_stage;
2333 } else {
2334 // erased the last kv
2335 erase_pos = position_t::end();
2336 return r_stage;
2337 }
2338 } else {
2339 assert(node_stage.keys() == 1);
2340 node_stage.erase_at(mut, node_stage, 0, p_left_bound);
2341 erase_pos = position_t::end();
2342 return STAGE;
2343 }
2344 }
2345
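  // Evaluate a merge of right_container after left_pivot_index: returns the
  // stage at which the right container's first key diverges from the left
  // pivot, plus the accumulated byte compensation (per-stage header sizes
  // and the sizes of the key parts shared with the pivot) to account for
  // when estimating the merged size.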
2346 static std::tuple<match_stage_t, node_offset_t> evaluate_merge(
2347 const key_view_t& left_pivot_index,
2348 const container_t& right_container) {
2349 auto r_iter = iterator_t(right_container);
2350 r_iter.seek_at(0);
2351 node_offset_t compensate = r_iter.header_size();
2352 auto cmp = left_pivot_index <=> r_iter.get_key();
2353 if (cmp == std::strong_ordering::equal) {
2354 if constexpr (!IS_BOTTOM) {
2355 // the index is equal, compensate and look at the lower stage
2356 compensate += r_iter.size_to_nxt();
2357 auto r_nxt_container = r_iter.get_nxt_container();
2358 auto [ret_stage, ret_compensate] = NXT_STAGE_T::evaluate_merge(
2359 left_pivot_index, r_nxt_container);
2360 compensate += ret_compensate;
2361 return {ret_stage, compensate};
2362 } else {
2363 ceph_abort("impossible path: left_pivot_key == right_first_key");
2364 }
2365 } else if (cmp == std::strong_ordering::less) {
2366 // ok, do merge here
2367 return {STAGE, compensate};
2368 } else {
2369 ceph_abort("impossible path: left_pivot_key > right_first_key");
2370 }
2371 }
2372 };
2373
2374 /**
2375 * Configurations for struct staged
2376 *
2377 * staged_params_* assembles different container_t implementations (defined by
2378 * staged::_iterator_t) by STAGE, and constructs the final multi-stage
2379 * implementations for different node layouts defined by
2380 * node_extent_t<FieldType, NODE_TYPE>.
2381 *
2382 * The specialized implementations for different layouts are accessible through
2383 * the helper type node_to_stage_t<node_extent_t<FieldType, NODE_TYPE>>.
2384 *
2385 * Specifically, the settings of 8 layouts are:
2386 *
2387 * The layout (N0, LEAF/INTERNAL) has 3 stages:
2388 * - STAGE_LEFT: node_extent_t<node_fields_0_t, LEAF/INTERNAL>
2389 * - STAGE_STRING: item_iterator_t<LEAF/INTERNAL>
2390 * - STAGE_RIGHT: sub_items_t<LEAF/INTERNAL>
2391 *
2392 * The layout (N1, LEAF/INTERNAL) has 3 stages:
2393 * - STAGE_LEFT: node_extent_t<node_fields_1_t, LEAF/INTERNAL>
2394 * - STAGE_STRING: item_iterator_t<LEAF/INTERNAL>
2395 * - STAGE_RIGHT: sub_items_t<LEAF/INTERNAL>
2396 *
2397 * The layout (N2, LEAF/INTERNAL) has 2 stages:
2398 * - STAGE_STRING: node_extent_t<node_fields_2_t, LEAF/INTERNAL>
2399 * - STAGE_RIGHT: sub_items_t<LEAF/INTERNAL>
2400 *
2401 * The layout (N3, LEAF) has 1 stage:
2402 * - STAGE_RIGHT: node_extent_t<leaf_fields_3_t, LEAF>
2403 *
2404 * The layout (N3, INTERNAL) has 1 stage:
2405 * - STAGE_RIGHT: node_extent_t<internal_fields_3_t, INTERNAL>
2406 */
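/*
 * A sketch of how the parameter chain composes for the N0 layout
 * (illustrative, following the definitions below):
 *
 *   staged_params_node_01<node_extent_t<node_fields_0_t, LEAF>> // STAGE_LEFT
 *     next_param_t = staged_params_item_iterator<LEAF>          // STAGE_STRING
 *       next_param_t = staged_params_subitems<LEAF>             // STAGE_RIGHT
 *         next_param_t = itself (the dummy terminator)
 *
 * node_to_stage_t<node_extent_t<node_fields_0_t, LEAF>> then resolves to
 * staged<staged_params_node_01<...>>, with the lower stages reachable as
 * NXT_STAGE_T.
 */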
2407
2408 template <node_type_t _NODE_TYPE>
2409 struct staged_params_subitems {
2410 using container_t = sub_items_t<_NODE_TYPE>;
2411 static constexpr auto NODE_TYPE = _NODE_TYPE;
2412 static constexpr auto STAGE = STAGE_RIGHT;
2413
2414 // dummy type in order to make our type system work
2415 // any better solution to get rid of this?
2416 using next_param_t = staged_params_subitems<NODE_TYPE>;
2417 };
2418
2419 template <node_type_t _NODE_TYPE>
2420 struct staged_params_item_iterator {
2421 using container_t = item_iterator_t<_NODE_TYPE>;
2422 static constexpr auto NODE_TYPE = _NODE_TYPE;
2423 static constexpr auto STAGE = STAGE_STRING;
2424
2425 using next_param_t = staged_params_subitems<NODE_TYPE>;
2426 };
2427
2428 template <typename NodeType>
2429 struct staged_params_node_01 {
2430 using container_t = NodeType;
2431 static constexpr auto NODE_TYPE = NodeType::NODE_TYPE;
2432 static constexpr auto STAGE = STAGE_LEFT;
2433
2434 using next_param_t = staged_params_item_iterator<NODE_TYPE>;
2435 };
2436
2437 template <typename NodeType>
2438 struct staged_params_node_2 {
2439 using container_t = NodeType;
2440 static constexpr auto NODE_TYPE = NodeType::NODE_TYPE;
2441 static constexpr auto STAGE = STAGE_STRING;
2442
2443 using next_param_t = staged_params_subitems<NODE_TYPE>;
2444 };
2445
2446 template <typename NodeType>
2447 struct staged_params_node_3 {
2448 using container_t = NodeType;
2449 static constexpr auto NODE_TYPE = NodeType::NODE_TYPE;
2450 static constexpr auto STAGE = STAGE_RIGHT;
2451
2452 // dummy type in order to make our type system work
2453 // any better solution to get rid of this?
2454 using next_param_t = staged_params_node_3<NodeType>;
2455 };
2456
2457 template <typename NodeType, typename Enable = void> struct _node_to_stage_t;
2458 template <typename NodeType>
2459 struct _node_to_stage_t<NodeType,
2460 std::enable_if_t<NodeType::FIELD_TYPE == field_type_t::N0 ||
2461 NodeType::FIELD_TYPE == field_type_t::N1>> {
2462 using type = staged<staged_params_node_01<NodeType>>;
2463 };
2464 template <typename NodeType>
2465 struct _node_to_stage_t<NodeType,
2466 std::enable_if_t<NodeType::FIELD_TYPE == field_type_t::N2>> {
2467 using type = staged<staged_params_node_2<NodeType>>;
2468 };
2469 template <typename NodeType>
2470 struct _node_to_stage_t<NodeType,
2471 std::enable_if_t<NodeType::FIELD_TYPE == field_type_t::N3>> {
2472 using type = staged<staged_params_node_3<NodeType>>;
2473 };
2474 template <typename NodeType>
2475 using node_to_stage_t = typename _node_to_stage_t<NodeType>::type;
2476
2477 } // namespace crimson::os::seastore::onode
2478
2479 template<typename T>
2480 concept HasDoFormatTo = requires(T x, std::back_insert_iterator<fmt::memory_buffer> out) {
2481 { x.do_format_to(out, true) } -> std::same_as<decltype(out)>;
2482 };
2483 template <HasDoFormatTo T> struct fmt::formatter<T> : fmt::formatter<std::string_view> {
2484 template <typename FormatContext>
2485 auto format(const T& staged_iterator, FormatContext& ctx) {
2486 return staged_iterator.do_format_to(ctx.out(), true);
2487 }
2488 };
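/*
 * Illustrative only: any type exposing a matching do_format_to() is picked
 * up by the formatter above, e.g. (hypothetical type):
 *
 *   struct dummy_printer_t {
 *     template <typename OutIt>
 *     OutIt do_format_to(OutIt out, bool verbose) const {
 *       return fmt::format_to(out, "dummy(verbose={})", verbose);
 *     }
 *   };
 *   // fmt::format("{}", dummy_printer_t{}) then routes through
 *   // dummy_printer_t::do_format_to().
 */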