git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.cc
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / crimson / os / seastore / omap_manager / btree / omap_btree_node_impl.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <algorithm>
5 #include <string.h>
6 #include "include/buffer.h"
7 #include "include/byteorder.h"
8 #include "crimson/os/seastore/transaction_manager.h"
9 #include "crimson/os/seastore/omap_manager/btree/omap_btree_node.h"
10 #include "crimson/os/seastore/omap_manager/btree/omap_btree_node_impl.h"
11 #include "seastar/core/thread.hh"
12
13 SET_SUBSYS(seastore_omap);
14
15 namespace crimson::os::seastore::omap_manager {
16
17 std::ostream &operator<<(std::ostream &out, const omap_inner_key_t &rhs)
18 {
19 return out << "omap_inner_key (" << rhs.key_off<< " - " << rhs.key_len
20 << " - " << rhs.laddr << ")";
21 }
22
23 std::ostream &operator<<(std::ostream &out, const omap_leaf_key_t &rhs)
24 {
25 return out << "omap_leaf_key_t (" << rhs.key_off<< " - " << rhs.key_len
26 << " - " << rhs.val_len << ")";
27 }
28
29 std::ostream &OMapInnerNode::print_detail_l(std::ostream &out) const
30 {
31 return out << ", size=" << get_size()
32 << ", depth=" << get_meta().depth;
33 }
34
35 using dec_ref_iertr = OMapInnerNode::base_iertr;
36 using dec_ref_ret = dec_ref_iertr::future<>;
37 template <typename T>
38 dec_ref_ret dec_ref(omap_context_t oc, T&& addr) {
39 return oc.tm.dec_ref(oc.t, std::forward<T>(addr)).handle_error_interruptible(
40 dec_ref_iertr::pass_further{},
41 crimson::ct_error::assert_all{
42 "Invalid error in OMapInnerNode helper dec_ref"
43 }
44 ).si_then([](auto &&e) {});
45 }
46
47 /**
48 * make_split_insert
49 *
50 * insert an entry at iter, with the address of key.
51 * will result in a split outcome encoded in the returned mutation_result_t
52 */
53 OMapInnerNode::make_split_insert_ret
54 OMapInnerNode::make_split_insert(
55 omap_context_t oc,
56 internal_iterator_t iter,
57 std::string key,
58 laddr_t laddr)
59 {
60 LOG_PREFIX(OMapInnerNode::make_split_insert);
61 DEBUGT("this: {}, key: {}", oc.t, *this, key);
62 return make_split_children(oc).si_then([=] (auto tuple) {
63 auto [left, right, pivot] = tuple;
64 if (pivot > key) {
65 auto liter = left->iter_idx(iter.get_index());
66 left->journal_inner_insert(liter, laddr, key,
67 left->maybe_get_delta_buffer());
68 } else { //right
69 auto riter = right->iter_idx(iter.get_index() - left->get_node_size());
70 right->journal_inner_insert(riter, laddr, key,
71 right->maybe_get_delta_buffer());
72 }
73 ++(oc.t.get_omap_tree_stats().extents_num_delta);
74 return make_split_insert_ret(
75 interruptible::ready_future_marker{},
76 mutation_result_t(mutation_status_t::WAS_SPLIT, tuple, std::nullopt));
77 });
78
79 }
80
81
82 OMapInnerNode::handle_split_ret
83 OMapInnerNode::handle_split(
84 omap_context_t oc,
85 internal_iterator_t iter,
86 mutation_result_t mresult)
87 {
88 LOG_PREFIX(OMapInnerNode::handle_split);
89 DEBUGT("this: {}", oc.t, *this);
90 if (!is_mutable()) {
91 auto mut = oc.tm.get_mutable_extent(oc.t, this)->cast<OMapInnerNode>();
92 auto mut_iter = mut->iter_idx(iter.get_index());
93 return mut->handle_split(oc, mut_iter, mresult);
94 }
95 auto [left, right, pivot] = *(mresult.split_tuple);
96 //update operation will not cause node overflow, so we can do it first.
97 journal_inner_update(iter, left->get_laddr(), maybe_get_delta_buffer());
98 bool overflow = extent_will_overflow(pivot.size(), std::nullopt);
99 if (!overflow) {
100 journal_inner_insert(iter + 1, right->get_laddr(), pivot,
101 maybe_get_delta_buffer());
102 return insert_ret(
103 interruptible::ready_future_marker{},
104 mutation_result_t(mutation_status_t::SUCCESS, std::nullopt, std::nullopt));
105 } else {
106 return make_split_insert(oc, iter + 1, pivot, right->get_laddr())
107 .si_then([this, oc] (auto m_result) {
108 return dec_ref(oc, get_laddr())
109 .si_then([m_result = std::move(m_result)] {
110 return insert_ret(
111 interruptible::ready_future_marker{},
112 m_result);
113 });
114 });
115 }
116 }
117
118 OMapInnerNode::get_value_ret
119 OMapInnerNode::get_value(
120 omap_context_t oc,
121 const std::string &key)
122 {
123 LOG_PREFIX(OMapInnerNode::get_value);
124 DEBUGT("key = {}, this: {}", oc.t, key, *this);
125 auto child_pt = get_containing_child(key);
126 assert(child_pt != iter_cend());
127 auto laddr = child_pt->get_val();
128 return omap_load_extent(oc, laddr, get_meta().depth - 1).si_then(
129 [oc, &key] (auto extent) {
130 return extent->get_value(oc, key);
131 }).finally([ref = OMapNodeRef(this)] {});
132 }
133
134 OMapInnerNode::insert_ret
135 OMapInnerNode::insert(
136 omap_context_t oc,
137 const std::string &key,
138 const ceph::bufferlist &value)
139 {
140 LOG_PREFIX(OMapInnerNode::insert);
141 DEBUGT("{}->{}, this: {}", oc.t, key, value, *this);
142 auto child_pt = get_containing_child(key);
143 assert(child_pt != iter_cend());
144 auto laddr = child_pt->get_val();
145 return omap_load_extent(oc, laddr, get_meta().depth - 1).si_then(
146 [oc, &key, &value] (auto extent) {
147 return extent->insert(oc, key, value);
148 }).si_then([this, oc, child_pt] (auto mresult) {
149 if (mresult.status == mutation_status_t::SUCCESS) {
150 return insert_iertr::make_ready_future<mutation_result_t>(mresult);
151 } else if (mresult.status == mutation_status_t::WAS_SPLIT) {
152 return handle_split(oc, child_pt, mresult);
153 } else {
154 return insert_ret(
155 interruptible::ready_future_marker{},
156 mutation_result_t(mutation_status_t::SUCCESS, std::nullopt, std::nullopt));
157 }
158 });
159 }
160
161 OMapInnerNode::rm_key_ret
162 OMapInnerNode::rm_key(omap_context_t oc, const std::string &key)
163 {
164 LOG_PREFIX(OMapInnerNode::rm_key);
165 DEBUGT("key={}, this: {}", oc.t, key, *this);
166 auto child_pt = get_containing_child(key);
167 assert(child_pt != iter_cend());
168 auto laddr = child_pt->get_val();
169 return omap_load_extent(oc, laddr, get_meta().depth - 1).si_then(
170 [this, oc, &key, child_pt] (auto extent) {
171 return extent->rm_key(oc, key)
172 .si_then([this, oc, child_pt, extent = std::move(extent)] (auto mresult) {
173 switch (mresult.status) {
174 case mutation_status_t::SUCCESS:
175 case mutation_status_t::FAIL:
176 return rm_key_iertr::make_ready_future<mutation_result_t>(mresult);
177 case mutation_status_t::NEED_MERGE: {
178 if (get_node_size() >1)
179 return merge_entry(oc, child_pt, *(mresult.need_merge));
180 else
181 return rm_key_ret(
182 interruptible::ready_future_marker{},
183 mutation_result_t(mutation_status_t::SUCCESS,
184 std::nullopt, std::nullopt));
185 }
186 case mutation_status_t::WAS_SPLIT:
187 return handle_split(oc, child_pt, mresult);
188 default:
189 return rm_key_iertr::make_ready_future<mutation_result_t>(mresult);
190 }
191 });
192 });
193 }
194
195 OMapInnerNode::list_ret
196 OMapInnerNode::list(
197 omap_context_t oc,
198 const std::optional<std::string> &first,
199 const std::optional<std::string> &last,
200 omap_list_config_t config)
201 {
202 LOG_PREFIX(OMapInnerNode::list);
203 if (first && last) {
204 DEBUGT("first: {}, last: {}, this: {}", oc.t, *first, *last, *this);
205 assert(*first <= *last);
206 } else if (first) {
207 DEBUGT("first: {}, this: {}", oc.t, *first, *this);
208 } else if (last) {
209 DEBUGT("last: {}, this: {}", oc.t, *last, *this);
210 } else {
211 DEBUGT("this: {}", oc.t, *this);
212 }
213
214 auto first_iter = first ?
215 get_containing_child(*first) :
216 iter_cbegin();
217 auto last_iter = last ?
218 get_containing_child(*last) + 1:
219 iter_cend();
220 assert(first_iter != iter_cend());
221
222 return seastar::do_with(
223 first_iter,
224 last_iter,
225 iter_t(first_iter),
226 list_bare_ret(false, {}),
227 [this, &first, &last, oc, config](
228 auto &fiter,
229 auto &liter,
230 auto &iter,
231 auto &ret)
232 {
233 auto &complete = std::get<0>(ret);
234 auto &result = std::get<1>(ret);
235 return trans_intr::repeat(
236 [&, config, oc, this]() -> list_iertr::future<seastar::stop_iteration>
237 {
238 if (iter == liter || result.size() == config.max_result_size) {
239 complete = iter == liter;
240 return list_iertr::make_ready_future<seastar::stop_iteration>(
241 seastar::stop_iteration::yes);
242 }
243 auto laddr = iter->get_val();
244 return omap_load_extent(
245 oc, laddr,
246 get_meta().depth - 1
247 ).si_then([&, config, oc](auto &&extent) {
248 return seastar::do_with(
249 iter == fiter ? first : std::optional<std::string>(std::nullopt),
250 iter == liter - 1 ? last : std::optional<std::string>(std::nullopt),
251 [&result, extent = std::move(extent), config, oc](
252 auto &nfirst,
253 auto &nlast) {
254 return extent->list(
255 oc,
256 nfirst,
257 nlast,
258 config.with_reduced_max(result.size()));
259 }).si_then([&, config](auto &&child_ret) mutable {
260 boost::ignore_unused(config); // avoid clang warning;
261 auto &[child_complete, child_result] = child_ret;
262 if (result.size() && child_result.size()) {
263 assert(child_result.begin()->first > result.rbegin()->first);
264 }
265 if (child_result.size() && first && iter == fiter) {
266 if (config.first_inclusive) {
267 assert(child_result.begin()->first >= *first);
268 } else {
269 assert(child_result.begin()->first > *first);
270 }
271 }
272 if (child_result.size() && last && iter == liter - 1) {
273 auto biter = --(child_result.end());
274 if (config.last_inclusive) {
275 assert(biter->first <= *last);
276 } else {
277 assert(biter->first < *last);
278 }
279 }
280 result.merge(std::move(child_result));
281 ++iter;
282 assert(child_complete || result.size() == config.max_result_size);
283 return list_iertr::make_ready_future<seastar::stop_iteration>(
284 seastar::stop_iteration::no);
285 });
286 });
287 }).si_then([&ret, ref = OMapNodeRef(this)] {
288 return list_iertr::make_ready_future<list_bare_ret>(std::move(ret));
289 });
290 });
291 }
292
293 OMapInnerNode::clear_ret
294 OMapInnerNode::clear(omap_context_t oc)
295 {
296 LOG_PREFIX(OMapInnerNode::clear);
297 DEBUGT("this: {}", oc.t, *this);
298 return trans_intr::do_for_each(iter_begin(), iter_end(),
299 [oc, this](auto iter) {
300 auto laddr = iter->get_val();
301 auto ndepth = get_meta().depth - 1;
302 if (ndepth > 1) {
303 return omap_load_extent(oc, laddr, ndepth
304 ).si_then([oc](auto &&extent) {
305 return extent->clear(oc);
306 }).si_then([oc, laddr] {
307 return dec_ref(oc, laddr);
308 }).si_then([ref = OMapNodeRef(this)] {
309 return clear_iertr::now();
310 });
311 } else {
312 assert(ndepth == 1);
313 return dec_ref(oc, laddr
314 ).si_then([ref = OMapNodeRef(this)] {
315 return clear_iertr::now();
316 });
317 }
318 });
319 }
320
321 OMapInnerNode::split_children_ret
322 OMapInnerNode:: make_split_children(omap_context_t oc)
323 {
324 LOG_PREFIX(OMapInnerNode::make_split_children);
325 DEBUGT("this: {}", oc.t, *this);
326 return oc.tm.alloc_extents<OMapInnerNode>(oc.t, oc.hint,
327 OMAP_INNER_BLOCK_SIZE, 2)
328 .si_then([this, oc] (auto &&ext_pair) {
329 LOG_PREFIX(OMapInnerNode::make_split_children);
330 auto left = ext_pair.front();
331 auto right = ext_pair.back();
332 DEBUGT("this: {}, split into: l {} r {}", oc.t, *this, *left, *right);
333 return split_children_ret(
334 interruptible::ready_future_marker{},
335 std::make_tuple(left, right, split_into(*left, *right)));
336 });
337 }
338
339 OMapInnerNode::full_merge_ret
340 OMapInnerNode::make_full_merge(omap_context_t oc, OMapNodeRef right)
341 {
342 LOG_PREFIX(OMapInnerNode::make_full_merge);
343 DEBUGT("", oc.t);
344 return oc.tm.alloc_extent<OMapInnerNode>(oc.t, oc.hint,
345 OMAP_INNER_BLOCK_SIZE)
346 .si_then([this, right] (auto &&replacement) {
347 replacement->merge_from(*this, *right->cast<OMapInnerNode>());
348 return full_merge_ret(
349 interruptible::ready_future_marker{},
350 std::move(replacement));
351 });
352 }
353
354 OMapInnerNode::make_balanced_ret
355 OMapInnerNode::make_balanced(omap_context_t oc, OMapNodeRef _right)
356 {
357 LOG_PREFIX(OMapInnerNode::make_balanced);
358 DEBUGT("l: {}, r: {}", oc.t, *this, *_right);
359 ceph_assert(_right->get_type() == TYPE);
360 return oc.tm.alloc_extents<OMapInnerNode>(oc.t, oc.hint,
361 OMAP_INNER_BLOCK_SIZE, 2)
362 .si_then([this, _right] (auto &&replacement_pair){
363 auto replacement_left = replacement_pair.front();
364 auto replacement_right = replacement_pair.back();
365 auto &right = *_right->cast<OMapInnerNode>();
366 return make_balanced_ret(
367 interruptible::ready_future_marker{},
368 std::make_tuple(replacement_left, replacement_right,
369 balance_into_new_nodes(*this, right,
370 *replacement_left, *replacement_right)));
371 });
372 }
373
374 OMapInnerNode::merge_entry_ret
375 OMapInnerNode::merge_entry(
376 omap_context_t oc,
377 internal_iterator_t iter,
378 OMapNodeRef entry)
379 {
380 LOG_PREFIX(OMapInnerNode::merge_entry);
381 DEBUGT("{}, parent: {}", oc.t, *entry, *this);
382 if (!is_mutable()) {
383 auto mut = oc.tm.get_mutable_extent(oc.t, this)->cast<OMapInnerNode>();
384 auto mut_iter = mut->iter_idx(iter->get_index());
385 return mut->merge_entry(oc, mut_iter, entry);
386 }
387 auto is_left = (iter + 1) == iter_cend();
388 auto donor_iter = is_left ? iter - 1 : iter + 1;
389 return omap_load_extent(oc, donor_iter->get_val(), get_meta().depth - 1
390 ).si_then([=, this](auto &&donor) mutable {
391 LOG_PREFIX(OMapInnerNode::merge_entry);
392 auto [l, r] = is_left ?
393 std::make_pair(donor, entry) : std::make_pair(entry, donor);
394 auto [liter, riter] = is_left ?
395 std::make_pair(donor_iter, iter) : std::make_pair(iter, donor_iter);
396 if (l->can_merge(r)) {
397 DEBUGT("make_full_merge l {} r {}", oc.t, *l, *r);
398 assert(entry->extent_is_below_min());
399 return l->make_full_merge(oc, r
400 ).si_then([liter=liter, riter=riter, l=l, r=r, oc, this]
401 (auto &&replacement) {
402 LOG_PREFIX(OMapInnerNode::merge_entry);
403 DEBUGT("to update parent: {}", oc.t, *this);
404 journal_inner_update(
405 liter,
406 replacement->get_laddr(),
407 maybe_get_delta_buffer());
408 journal_inner_remove(riter, maybe_get_delta_buffer());
409 //retire extent
410 std::vector<laddr_t> dec_laddrs {l->get_laddr(), r->get_laddr()};
411 return dec_ref(oc, dec_laddrs
412 ).si_then([this, oc] {
413 --(oc.t.get_omap_tree_stats().extents_num_delta);
414 if (extent_is_below_min()) {
415 return merge_entry_ret(
416 interruptible::ready_future_marker{},
417 mutation_result_t(mutation_status_t::NEED_MERGE,
418 std::nullopt, this));
419 } else {
420 return merge_entry_ret(
421 interruptible::ready_future_marker{},
422 mutation_result_t(mutation_status_t::SUCCESS,
423 std::nullopt, std::nullopt));
424 }
425 });
426 });
427 } else {
428 DEBUGT("balanced l {} r {}", oc.t, *l, *r);
429 return l->make_balanced(oc, r
430 ).si_then([liter=liter, riter=riter, l=l, r=r, oc, this](auto tuple) {
431 LOG_PREFIX(OMapInnerNode::merge_entry);
432 DEBUGT("to update parent: {}", oc.t, *this);
433 auto [replacement_l, replacement_r, replacement_pivot] = tuple;
434 //update operation will not cuase node overflow, so we can do it first
435 journal_inner_update(
436 liter,
437 replacement_l->get_laddr(),
438 maybe_get_delta_buffer());
439 bool overflow = extent_will_overflow(replacement_pivot.size(),
440 std::nullopt);
441 if (!overflow) {
442 journal_inner_remove(riter, maybe_get_delta_buffer());
443 journal_inner_insert(
444 riter,
445 replacement_r->get_laddr(),
446 replacement_pivot,
447 maybe_get_delta_buffer());
448 std::vector<laddr_t> dec_laddrs{l->get_laddr(), r->get_laddr()};
449 return dec_ref(oc, dec_laddrs
450 ).si_then([] {
451 return merge_entry_ret(
452 interruptible::ready_future_marker{},
453 mutation_result_t(mutation_status_t::SUCCESS,
454 std::nullopt, std::nullopt));
455 });
456 } else {
457 DEBUGT("balanced and split {} r {}", oc.t, *l, *r);
458 //use remove and insert to instead of replace,
459 //remove operation will not cause node split, so we can do it first
460 journal_inner_remove(riter, maybe_get_delta_buffer());
461 return make_split_insert(oc, riter, replacement_pivot,
462 replacement_r->get_laddr()
463 ).si_then([this, oc, l = l, r = r](auto mresult) {
464 std::vector<laddr_t> dec_laddrs{
465 l->get_laddr(),
466 r->get_laddr(),
467 get_laddr()};
468 return dec_ref(oc, dec_laddrs
469 ).si_then([mresult = std::move(mresult)] {
470 return merge_entry_ret(
471 interruptible::ready_future_marker{}, mresult);
472 });
473 });
474 }
475 });
476 }
477 });
478
479 }
480
481 OMapInnerNode::internal_iterator_t
482 OMapInnerNode::get_containing_child(const std::string &key)
483 {
484 auto iter = std::find_if(iter_begin(), iter_end(),
485 [&key](auto it) { return it.contains(key); });
486 return iter;
487 }
488
489 std::ostream &OMapLeafNode::print_detail_l(std::ostream &out) const
490 {
491 return out << ", size=" << get_size()
492 << ", depth=" << get_meta().depth;
493 }
494
495 OMapLeafNode::get_value_ret
496 OMapLeafNode::get_value(omap_context_t oc, const std::string &key)
497 {
498 LOG_PREFIX(OMapLeafNode::get_value);
499 DEBUGT("key = {}, this: {}", oc.t, *this, key);
500 auto ite = find_string_key(key);
501 if (ite != iter_end()) {
502 auto value = ite->get_val();
503 return get_value_ret(
504 interruptible::ready_future_marker{},
505 value);
506 } else {
507 return get_value_ret(
508 interruptible::ready_future_marker{},
509 std::nullopt);
510 }
511 }
512
513 OMapLeafNode::insert_ret
514 OMapLeafNode::insert(
515 omap_context_t oc,
516 const std::string &key,
517 const ceph::bufferlist &value)
518 {
519 LOG_PREFIX(OMapLeafNode::insert);
520 DEBUGT("{} -> {}, this: {}", oc.t, key, value, *this);
521 bool overflow = extent_will_overflow(key.size(), value.length());
522 if (!overflow) {
523 if (!is_mutable()) {
524 auto mut = oc.tm.get_mutable_extent(oc.t, this)->cast<OMapLeafNode>();
525 return mut->insert(oc, key, value);
526 }
527 auto replace_pt = find_string_key(key);
528 if (replace_pt != iter_end()) {
529 ++(oc.t.get_omap_tree_stats().num_updates);
530 journal_leaf_update(replace_pt, key, value, maybe_get_delta_buffer());
531 } else {
532 ++(oc.t.get_omap_tree_stats().num_inserts);
533 auto insert_pt = string_lower_bound(key);
534 journal_leaf_insert(insert_pt, key, value, maybe_get_delta_buffer());
535
536 DEBUGT("inserted {}, this: {}", oc.t, insert_pt.get_key(), *this);
537 }
538 return insert_ret(
539 interruptible::ready_future_marker{},
540 mutation_result_t(mutation_status_t::SUCCESS, std::nullopt, std::nullopt));
541 } else {
542 return make_split_children(oc).si_then([this, oc, &key, &value] (auto tuple) {
543 auto [left, right, pivot] = tuple;
544 auto replace_pt = find_string_key(key);
545 if (replace_pt != iter_end()) {
546 ++(oc.t.get_omap_tree_stats().num_updates);
547 if (key < pivot) { //left
548 auto mut_iter = left->iter_idx(replace_pt->get_index());
549 left->journal_leaf_update(mut_iter, key, value, left->maybe_get_delta_buffer());
550 } else if (key >= pivot) { //right
551 auto mut_iter = right->iter_idx(replace_pt->get_index() - left->get_node_size());
552 right->journal_leaf_update(mut_iter, key, value, right->maybe_get_delta_buffer());
553 }
554 } else {
555 ++(oc.t.get_omap_tree_stats().num_inserts);
556 auto insert_pt = string_lower_bound(key);
557 if (key < pivot) { //left
558 auto mut_iter = left->iter_idx(insert_pt->get_index());
559 left->journal_leaf_insert(mut_iter, key, value, left->maybe_get_delta_buffer());
560 } else {
561 auto mut_iter = right->iter_idx(insert_pt->get_index() - left->get_node_size());
562 right->journal_leaf_insert(mut_iter, key, value, right->maybe_get_delta_buffer());
563 }
564 }
565 ++(oc.t.get_omap_tree_stats().extents_num_delta);
566 return dec_ref(oc, get_laddr())
567 .si_then([tuple = std::move(tuple)] {
568 return insert_ret(
569 interruptible::ready_future_marker{},
570 mutation_result_t(mutation_status_t::WAS_SPLIT, tuple, std::nullopt));
571 });
572 });
573 }
574 }
575
576 OMapLeafNode::rm_key_ret
577 OMapLeafNode::rm_key(omap_context_t oc, const std::string &key)
578 {
579 LOG_PREFIX(OMapLeafNode::rm_key);
580 DEBUGT("{}, this: {}", oc.t, key, *this);
581 auto rm_pt = find_string_key(key);
582 if (!is_mutable() && rm_pt != iter_end()) {
583 auto mut = oc.tm.get_mutable_extent(oc.t, this)->cast<OMapLeafNode>();
584 return mut->rm_key(oc, key);
585 }
586
587 if (rm_pt != iter_end()) {
588 ++(oc.t.get_omap_tree_stats().num_erases);
589 journal_leaf_remove(rm_pt, maybe_get_delta_buffer());
590 if (extent_is_below_min()) {
591 return rm_key_ret(
592 interruptible::ready_future_marker{},
593 mutation_result_t(mutation_status_t::NEED_MERGE, std::nullopt,
594 this->cast<OMapNode>()));
595 } else {
596 return rm_key_ret(
597 interruptible::ready_future_marker{},
598 mutation_result_t(mutation_status_t::SUCCESS, std::nullopt, std::nullopt));
599 }
600 } else {
601 return rm_key_ret(
602 interruptible::ready_future_marker{},
603 mutation_result_t(mutation_status_t::FAIL, std::nullopt, std::nullopt));
604 }
605
606 }
607
608 OMapLeafNode::list_ret
609 OMapLeafNode::list(
610 omap_context_t oc,
611 const std::optional<std::string> &first,
612 const std::optional<std::string> &last,
613 omap_list_config_t config)
614 {
615 LOG_PREFIX(OMapLeafNode::list);
616 DEBUGT(
617 "first {} last {} max_result_size {} first_inclusive {} \
618 last_inclusive {}, this: {}",
619 oc.t,
620 first ? first->c_str() : "",
621 last ? last->c_str() : "",
622 config.max_result_size,
623 config.first_inclusive,
624 config.last_inclusive,
625 *this
626 );
627 auto ret = list_bare_ret(false, {});
628 auto &[complete, result] = ret;
629 auto iter = first ?
630 (config.first_inclusive ?
631 string_lower_bound(*first) :
632 string_upper_bound(*first)) :
633 iter_begin();
634 auto liter = last ?
635 (config.last_inclusive ?
636 string_upper_bound(*last) :
637 string_lower_bound(*last)) :
638 iter_end();
639
640 for (; iter != liter && result.size() < config.max_result_size; iter++) {
641 result.emplace(std::make_pair(iter->get_key(), iter->get_val()));
642 }
643
644 complete = (iter == liter);
645
646 return list_iertr::make_ready_future<list_bare_ret>(
647 std::move(ret));
648 }
649
650 OMapLeafNode::clear_ret
651 OMapLeafNode::clear(omap_context_t oc)
652 {
653 return clear_iertr::now();
654 }
655
656 OMapLeafNode::split_children_ret
657 OMapLeafNode::make_split_children(omap_context_t oc)
658 {
659 LOG_PREFIX(OMapLeafNode::make_split_children);
660 DEBUGT("this: {}", oc.t, *this);
661 return oc.tm.alloc_extents<OMapLeafNode>(oc.t, oc.hint, OMAP_LEAF_BLOCK_SIZE, 2)
662 .si_then([this] (auto &&ext_pair) {
663 auto left = ext_pair.front();
664 auto right = ext_pair.back();
665 return split_children_ret(
666 interruptible::ready_future_marker{},
667 std::make_tuple(left, right, split_into(*left, *right)));
668 });
669 }
670
671 OMapLeafNode::full_merge_ret
672 OMapLeafNode::make_full_merge(omap_context_t oc, OMapNodeRef right)
673 {
674 ceph_assert(right->get_type() == TYPE);
675 LOG_PREFIX(OMapLeafNode::make_full_merge);
676 DEBUGT("this: {}", oc.t, *this);
677 return oc.tm.alloc_extent<OMapLeafNode>(oc.t, oc.hint, OMAP_LEAF_BLOCK_SIZE)
678 .si_then([this, right] (auto &&replacement) {
679 replacement->merge_from(*this, *right->cast<OMapLeafNode>());
680 return full_merge_ret(
681 interruptible::ready_future_marker{},
682 std::move(replacement));
683 });
684 }
685
686 OMapLeafNode::make_balanced_ret
687 OMapLeafNode::make_balanced(omap_context_t oc, OMapNodeRef _right)
688 {
689 ceph_assert(_right->get_type() == TYPE);
690 LOG_PREFIX(OMapLeafNode::make_balanced);
691 DEBUGT("this: {}", oc.t, *this);
692 return oc.tm.alloc_extents<OMapLeafNode>(oc.t, oc.hint, OMAP_LEAF_BLOCK_SIZE, 2)
693 .si_then([this, _right] (auto &&replacement_pair) {
694 auto replacement_left = replacement_pair.front();
695 auto replacement_right = replacement_pair.back();
696 auto &right = *_right->cast<OMapLeafNode>();
697 return make_balanced_ret(
698 interruptible::ready_future_marker{},
699 std::make_tuple(
700 replacement_left, replacement_right,
701 balance_into_new_nodes(
702 *this, right,
703 *replacement_left, *replacement_right)));
704 });
705 }
706
707
708 omap_load_extent_iertr::future<OMapNodeRef>
709 omap_load_extent(omap_context_t oc, laddr_t laddr, depth_t depth)
710 {
711 ceph_assert(depth > 0);
712 if (depth > 1) {
713 return oc.tm.read_extent<OMapInnerNode>(oc.t, laddr,
714 OMAP_INNER_BLOCK_SIZE)
715 .handle_error_interruptible(
716 omap_load_extent_iertr::pass_further{},
717 crimson::ct_error::assert_all{ "Invalid error in omap_load_extent" }
718 ).si_then(
719 [](auto&& e) {
720 return seastar::make_ready_future<OMapNodeRef>(std::move(e));
721 });
722 } else {
723 return oc.tm.read_extent<OMapLeafNode>(oc.t, laddr, OMAP_LEAF_BLOCK_SIZE
724 ).handle_error_interruptible(
725 omap_load_extent_iertr::pass_further{},
726 crimson::ct_error::assert_all{ "Invalid error in omap_load_extent" }
727 ).si_then(
728 [](auto&& e) {
729 return seastar::make_ready_future<OMapNodeRef>(std::move(e));
730 });
731 }
732 }
733 }