1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
7 #include "crimson/common/log.h"
9 #include "crimson/os/seastore/object_data_handler.h"
12 seastar::logger
& logger() {
13 return crimson::get_logger(ceph_subsys_seastore_odata
);
17 SET_SUBSYS(seastore_odata
);
19 namespace crimson::os::seastore
{
20 #define assert_aligned(x) ceph_assert(((x)%ctx.tm.get_block_size()) == 0)
22 using context_t
= ObjectDataHandler::context_t
;
23 using get_iertr
= ObjectDataHandler::write_iertr
;
28 * Encapsulates smallest write operations in overwrite.
29 * Indicates a zero/existing extent or a data extent based on whether
30 * to_write is populate.
31 * Should be handled by prepare_ops_list.
33 struct extent_to_write_t
{
41 /// pin of original extent, not nullptr if type == EXISTING
47 /// non-nullopt if and only if type == DATA
48 std::optional
<bufferlist
> to_write
;
50 extent_to_write_t(const extent_to_write_t
&) = delete;
51 extent_to_write_t(extent_to_write_t
&&) = default;
53 bool is_data() const {
54 return type
== type_t::DATA
;
57 bool is_zero() const {
58 return type
== type_t::ZERO
;
61 bool is_existing() const {
62 return type
== type_t::EXISTING
;
65 laddr_t
get_end_addr() const {
69 static extent_to_write_t
create_data(
70 laddr_t addr
, bufferlist to_write
) {
71 return extent_to_write_t(addr
, to_write
);
74 static extent_to_write_t
create_zero(
75 laddr_t addr
, extent_len_t len
) {
76 return extent_to_write_t(addr
, len
);
79 static extent_to_write_t
create_existing(
80 LBAMappingRef
&&pin
, laddr_t addr
, extent_len_t len
) {
82 return extent_to_write_t(std::move(pin
), addr
, len
);
86 extent_to_write_t(laddr_t addr
, bufferlist to_write
)
87 : type(type_t::DATA
), addr(addr
), len(to_write
.length()),
90 extent_to_write_t(laddr_t addr
, extent_len_t len
)
91 : type(type_t::ZERO
), addr(addr
), len(len
) {}
93 extent_to_write_t(LBAMappingRef
&&pin
, laddr_t addr
, extent_len_t len
)
94 : type(type_t::EXISTING
), pin(std::move(pin
)), addr(addr
), len(len
) {}
96 using extent_to_write_list_t
= std::list
<extent_to_write_t
>;
98 // Encapsulates extents to be written out using do_remappings.
99 struct extent_to_remap_t
{
105 /// pin of original extent
107 /// offset of remapped extent or overwrite part of overwrite extent.
108 /// overwrite part of overwrite extent might correspond to mutiple
109 /// fresh write extent.
110 extent_len_t new_offset
;
111 /// length of remapped extent or overwrite part of overwrite extent
112 extent_len_t new_len
;
114 extent_to_remap_t(const extent_to_remap_t
&) = delete;
115 extent_to_remap_t(extent_to_remap_t
&&) = default;
117 bool is_remap() const {
118 return type
== type_t::REMAP
;
121 bool is_overwrite() const {
122 assert((new_offset
!= 0) && (pin
->get_length() != new_offset
+ new_len
));
123 return type
== type_t::OVERWRITE
;
126 using remap_entry
= TransactionManager::remap_entry
;
127 remap_entry
create_remap_entry() {
134 remap_entry
create_left_remap_entry() {
135 assert(is_overwrite());
141 remap_entry
create_right_remap_entry() {
142 assert(is_overwrite());
144 new_offset
+ new_len
,
145 pin
->get_length() - new_offset
- new_len
);
148 static extent_to_remap_t
create_remap(
149 LBAMappingRef
&&pin
, extent_len_t new_offset
, extent_len_t new_len
) {
150 return extent_to_remap_t(type_t::REMAP
,
151 std::move(pin
), new_offset
, new_len
);
154 static extent_to_remap_t
create_overwrite(
155 LBAMappingRef
&&pin
, extent_len_t new_offset
, extent_len_t new_len
) {
156 return extent_to_remap_t(type_t::OVERWRITE
,
157 std::move(pin
), new_offset
, new_len
);
161 extent_to_remap_t(type_t type
,
162 LBAMappingRef
&&pin
, extent_len_t new_offset
, extent_len_t new_len
)
164 pin(std::move(pin
)), new_offset(new_offset
), new_len(new_len
) {}
166 using extent_to_remap_list_t
= std::list
<extent_to_remap_t
>;
168 // Encapsulates extents to be written out using do_insertions.
169 struct extent_to_insert_t
{
175 /// laddr of new extent
177 /// length of new extent
179 /// non-nullopt if type == DATA
180 std::optional
<bufferlist
> bl
;
182 extent_to_insert_t(const extent_to_insert_t
&) = default;
183 extent_to_insert_t(extent_to_insert_t
&&) = default;
185 bool is_data() const {
186 return type
== type_t::DATA
;
189 bool is_zero() const {
190 return type
== type_t::ZERO
;
193 static extent_to_insert_t
create_data(
194 laddr_t addr
, extent_len_t len
, std::optional
<bufferlist
> bl
) {
195 return extent_to_insert_t(addr
, len
, bl
);
198 static extent_to_insert_t
create_zero(
199 laddr_t addr
, extent_len_t len
) {
200 return extent_to_insert_t(addr
, len
);
204 extent_to_insert_t(laddr_t addr
, extent_len_t len
,
205 std::optional
<bufferlist
> bl
)
206 :type(type_t::DATA
), addr(addr
), len(len
), bl(bl
) {}
208 extent_to_insert_t(laddr_t addr
, extent_len_t len
)
209 :type(type_t::ZERO
), addr(addr
), len(len
) {}
211 using extent_to_insert_list_t
= std::list
<extent_to_insert_t
>;
213 // Encapsulates extents to be retired in do_removals.
214 using extent_to_remove_list_t
= std::list
<LBAMappingRef
>;
216 struct overwrite_ops_t
{
217 extent_to_remap_list_t to_remap
;
218 extent_to_insert_list_t to_insert
;
219 extent_to_remove_list_t to_remove
;
222 // prepare the to_remap, to_remove, to_insert lists
223 overwrite_ops_t
prepare_ops_list(
224 lba_pin_list_t
&pins_to_remove
,
225 extent_to_write_list_t
&to_write
) {
226 assert(pins_to_remove
.size() != 0);
228 ops
.to_remove
.swap(pins_to_remove
);
229 if (to_write
.empty()) {
230 logger().debug("empty to_write");
233 long unsigned int visitted
= 0;
234 auto& front
= to_write
.front();
235 auto& back
= to_write
.back();
237 // prepare overwrite, happens in one original extent.
238 if (ops
.to_remove
.size() == 1 &&
239 front
.is_existing() && back
.is_existing()) {
241 assert(to_write
.size() > 2);
242 assert(front
.addr
== front
.pin
->get_key());
243 assert(back
.addr
> back
.pin
->get_key());
244 ops
.to_remap
.push_back(extent_to_remap_t::create_overwrite(
245 std::move(front
.pin
),
247 back
.addr
- front
.addr
- front
.len
));
248 ops
.to_remove
.pop_front();
250 // prepare to_remap, happens in one or multiple extents
251 if (front
.is_existing()) {
253 assert(to_write
.size() > 1);
254 assert(front
.addr
== front
.pin
->get_key());
255 ops
.to_remap
.push_back(extent_to_remap_t::create_remap(
256 std::move(front
.pin
),
259 ops
.to_remove
.pop_front();
261 if (back
.is_existing()) {
263 assert(to_write
.size() > 1);
264 assert(back
.addr
+ back
.len
==
265 back
.pin
->get_key() + back
.pin
->get_length());
266 ops
.to_remap
.push_back(extent_to_remap_t::create_remap(
268 back
.addr
- back
.pin
->get_key(),
270 ops
.to_remove
.pop_back();
275 for (auto ®ion
: to_write
) {
276 if (region
.is_data()) {
278 assert(region
.to_write
.has_value());
279 ops
.to_insert
.push_back(extent_to_insert_t::create_data(
280 region
.addr
, region
.len
, region
.to_write
));
281 } else if (region
.is_zero()) {
283 assert(!(region
.to_write
.has_value()));
284 ops
.to_insert
.push_back(extent_to_insert_t::create_zero(
285 region
.addr
, region
.len
));
290 "to_remap list size: {}"
291 " to_insert list size: {}"
292 " to_remove list size: {}",
293 ops
.to_remap
.size(), ops
.to_insert
.size(), ops
.to_remove
.size());
294 assert(visitted
== to_write
.size());
299 * append_extent_to_write
301 * Appends passed extent_to_write_t maintaining invariant that the
302 * list may not contain consecutive zero elements by checking and
305 void append_extent_to_write(
306 extent_to_write_list_t
&to_write
, extent_to_write_t
&&to_append
)
308 assert(to_write
.empty() ||
309 to_write
.back().get_end_addr() == to_append
.addr
);
310 if (to_write
.empty() ||
311 to_write
.back().is_data() ||
312 to_append
.is_data() ||
313 to_write
.back().type
!= to_append
.type
) {
314 to_write
.push_back(std::move(to_append
));
316 to_write
.back().len
+= to_append
.len
;
321 * splice_extent_to_write
323 * splices passed extent_to_write_list_t maintaining invariant that the
324 * list may not contain consecutive zero elements by checking and
327 void splice_extent_to_write(
328 extent_to_write_list_t
&to_write
, extent_to_write_list_t
&&to_splice
)
330 if (!to_splice
.empty()) {
331 append_extent_to_write(to_write
, std::move(to_splice
.front()));
332 to_splice
.pop_front();
333 to_write
.splice(to_write
.end(), std::move(to_splice
));
337 /// Creates remap extents in to_remap
338 ObjectDataHandler::write_ret
do_remappings(
340 extent_to_remap_list_t
&to_remap
)
342 return trans_intr::do_for_each(
344 [ctx
](auto ®ion
) {
345 if (region
.is_remap()) {
346 return ctx
.tm
.remap_pin
<ObjectDataBlock
, 1>(
348 std::move(region
.pin
),
350 region
.create_remap_entry()
352 ).si_then([®ion
](auto pins
) {
353 ceph_assert(pins
.size() == 1);
354 ceph_assert(region
.new_len
== pins
[0]->get_length());
355 return ObjectDataHandler::write_iertr::now();
357 } else if (region
.is_overwrite()) {
358 return ctx
.tm
.remap_pin
<ObjectDataBlock
, 2>(
360 std::move(region
.pin
),
362 region
.create_left_remap_entry(),
363 region
.create_right_remap_entry()
365 ).si_then([®ion
](auto pins
) {
366 ceph_assert(pins
.size() == 2);
367 ceph_assert(region
.pin
->get_key() == pins
[0]->get_key());
368 ceph_assert(region
.pin
->get_key() + pins
[0]->get_length() +
369 region
.new_len
== pins
[1]->get_key());
370 return ObjectDataHandler::write_iertr::now();
373 ceph_abort("impossible");
374 return ObjectDataHandler::write_iertr::now();
379 ObjectDataHandler::write_ret
do_removals(
381 lba_pin_list_t
&to_remove
)
383 return trans_intr::do_for_each(
386 LOG_PREFIX(object_data_handler
.cc::do_removals
);
387 DEBUGT("decreasing ref: {}",
390 return ctx
.tm
.dec_ref(
395 ObjectDataHandler::write_iertr::pass_further
{},
396 crimson::ct_error::assert_all
{
397 "object_data_handler::do_removals invalid error"
403 /// Creates zero/data extents in to_insert
404 ObjectDataHandler::write_ret
do_insertions(
406 extent_to_insert_list_t
&to_insert
)
408 return trans_intr::do_for_each(
410 [ctx
](auto ®ion
) {
411 LOG_PREFIX(object_data_handler
.cc::do_insertions
);
412 if (region
.is_data()) {
413 assert_aligned(region
.addr
);
414 assert_aligned(region
.len
);
415 ceph_assert(region
.len
== region
.bl
->length());
416 DEBUGT("allocating extent: {}~{}",
420 return ctx
.tm
.alloc_extent
<ObjectDataBlock
>(
424 ).si_then([®ion
](auto extent
) {
425 if (extent
->get_laddr() != region
.addr
) {
427 "object_data_handler::do_insertions alloc got addr {},"
428 " should have been {}",
432 ceph_assert(extent
->get_laddr() == region
.addr
);
433 ceph_assert(extent
->get_length() == region
.len
);
434 auto iter
= region
.bl
->cbegin();
435 iter
.copy(region
.len
, extent
->get_bptr().c_str());
436 return ObjectDataHandler::write_iertr::now();
438 } else if (region
.is_zero()) {
439 DEBUGT("reserving: {}~{}",
443 return ctx
.tm
.reserve_region(
447 ).si_then([FNAME
, ctx
, ®ion
](auto pin
) {
448 ceph_assert(pin
->get_length() == region
.len
);
449 if (pin
->get_key() != region
.addr
) {
451 "inconsistent laddr: pin: {} region {}",
456 ceph_assert(pin
->get_key() == region
.addr
);
457 return ObjectDataHandler::write_iertr::now();
460 ceph_abort("impossible");
461 return ObjectDataHandler::write_iertr::now();
/// How each unaligned edge of an overwrite is handled.
enum class overwrite_operation_t {
  UNKNOWN,
  OVERWRITE_ZERO,  // fill unaligned data with zero
  MERGE_EXISTING,  // if present, merge data with the clean/pending extent
  SPLIT_EXISTING,  // split the existing extent, and fill unaligned data
};
473 std::ostream
& operator<<(
475 const overwrite_operation_t
&operation
)
478 case overwrite_operation_t::UNKNOWN
:
479 return out
<< "UNKNOWN";
480 case overwrite_operation_t::OVERWRITE_ZERO
:
481 return out
<< "OVERWRITE_ZERO";
482 case overwrite_operation_t::MERGE_EXISTING
:
483 return out
<< "MERGE_EXISTING";
484 case overwrite_operation_t::SPLIT_EXISTING
:
485 return out
<< "SPLIT_EXISTING";
487 return out
<< "!IMPOSSIBLE_OPERATION";
494 * |<--------------------------pins_size---------------------------------------------->|
495 * pin_begin(aligned) pin_end(aligned)
496 * |<------aligned_data_size-------------------------->| (aligned-bl)
497 * aligned_data_begin aligned_data_end
498 * |<-data_size->| (bl)
501 * |<l_extent_size>|<l_alignment_size>| |<r_alignment_size>|<r_extent_size>|
502 * |<-----------left_size------------>| |<-----------right_size----------->|
504 * |<-----(existing left extent/pin)----->| |<-----(existing right extent/pin)----->|
505 * left_paddr right_paddr
507 struct overwrite_plan_t
{
515 laddr_t aligned_data_begin
;
516 laddr_t aligned_data_end
;
519 overwrite_operation_t left_operation
;
520 overwrite_operation_t right_operation
;
523 extent_len_t block_size
;
526 extent_len_t
get_left_size() const {
527 return data_begin
- pin_begin
;
530 extent_len_t
get_left_extent_size() const {
531 return aligned_data_begin
- pin_begin
;
534 extent_len_t
get_left_alignment_size() const {
535 return data_begin
- aligned_data_begin
;
538 extent_len_t
get_right_size() const {
539 return pin_end
- data_end
;
542 extent_len_t
get_right_extent_size() const {
543 return pin_end
- aligned_data_end
;
546 extent_len_t
get_right_alignment_size() const {
547 return aligned_data_end
- data_end
;
550 extent_len_t
get_aligned_data_size() const {
551 return aligned_data_end
- aligned_data_begin
;
554 extent_len_t
get_pins_size() const {
555 return pin_end
- pin_begin
;
558 friend std::ostream
& operator<<(
560 const overwrite_plan_t
& overwrite_plan
) {
561 return out
<< "overwrite_plan_t("
562 << "pin_begin=" << overwrite_plan
.pin_begin
563 << ", pin_end=" << overwrite_plan
.pin_end
564 << ", left_paddr=" << overwrite_plan
.left_paddr
565 << ", right_paddr=" << overwrite_plan
.right_paddr
566 << ", data_begin=" << overwrite_plan
.data_begin
567 << ", data_end=" << overwrite_plan
.data_end
568 << ", aligned_data_begin=" << overwrite_plan
.aligned_data_begin
569 << ", aligned_data_end=" << overwrite_plan
.aligned_data_end
570 << ", left_operation=" << overwrite_plan
.left_operation
571 << ", right_operation=" << overwrite_plan
.right_operation
572 << ", block_size=" << overwrite_plan
.block_size
576 overwrite_plan_t(laddr_t offset
,
578 const lba_pin_list_t
& pins
,
579 extent_len_t block_size
,
581 pin_begin(pins
.front()->get_key()),
582 pin_end(pins
.back()->get_key() + pins
.back()->get_length()),
583 left_paddr(pins
.front()->get_val()),
584 right_paddr(pins
.back()->get_val()),
586 data_end(offset
+ len
),
587 aligned_data_begin(p2align((uint64_t)data_begin
, (uint64_t)block_size
)),
588 aligned_data_end(p2roundup((uint64_t)data_end
, (uint64_t)block_size
)),
589 left_operation(overwrite_operation_t::UNKNOWN
),
590 right_operation(overwrite_operation_t::UNKNOWN
),
591 block_size(block_size
) {
593 evaluate_operations(t
);
594 assert(left_operation
!= overwrite_operation_t::UNKNOWN
);
595 assert(right_operation
!= overwrite_operation_t::UNKNOWN
);
599 // refer to overwrite_plan_t description
600 void validate() const {
601 ceph_assert(pin_begin
% block_size
== 0);
602 ceph_assert(pin_end
% block_size
== 0);
603 ceph_assert(aligned_data_begin
% block_size
== 0);
604 ceph_assert(aligned_data_end
% block_size
== 0);
606 ceph_assert(pin_begin
<= aligned_data_begin
);
607 ceph_assert(aligned_data_begin
<= data_begin
);
608 ceph_assert(data_begin
<= data_end
);
609 ceph_assert(data_end
<= aligned_data_end
);
610 ceph_assert(aligned_data_end
<= pin_end
);
614 * When trying to modify a portion of an object data block, follow
615 * the read-full-extent-then-merge-new-data strategy, if the write
616 * amplification caused by it is not greater than
617 * seastore_obj_data_write_amplification; otherwise, split the
618 * original extent into at most three parts: origin-left, part-to-be-modified
621 void evaluate_operations(Transaction
& t
) {
622 auto actual_write_size
= get_pins_size();
623 auto aligned_data_size
= get_aligned_data_size();
624 auto left_ext_size
= get_left_extent_size();
625 auto right_ext_size
= get_right_extent_size();
627 auto can_merge
= [](Transaction
& t
, paddr_t paddr
) {
629 if (paddr
.is_relative() || paddr
.is_delayed()) {
631 } else if (t
.get_extent(paddr
, &ext
) ==
632 Transaction::get_extent_ret::PRESENT
) {
633 // FIXME: there is no need to lookup the cache if the pin can
634 // be associated with the extent state
635 if (ext
->is_mutable()) {
641 if (left_paddr
.is_zero()) {
642 actual_write_size
-= left_ext_size
;
644 left_operation
= overwrite_operation_t::OVERWRITE_ZERO
;
645 } else if (can_merge(t
, left_paddr
)) {
646 aligned_data_size
+= left_ext_size
;
648 left_operation
= overwrite_operation_t::MERGE_EXISTING
;
651 if (right_paddr
.is_zero()) {
652 actual_write_size
-= right_ext_size
;
654 right_operation
= overwrite_operation_t::OVERWRITE_ZERO
;
655 } else if (can_merge(t
, right_paddr
)) {
656 aligned_data_size
+= right_ext_size
;
658 right_operation
= overwrite_operation_t::MERGE_EXISTING
;
661 while (left_operation
== overwrite_operation_t::UNKNOWN
||
662 right_operation
== overwrite_operation_t::UNKNOWN
) {
663 if (((double)actual_write_size
/ (double)aligned_data_size
) <=
664 crimson::common::get_conf
<double>("seastore_obj_data_write_amplification")) {
667 if (left_ext_size
== 0 && right_ext_size
== 0) {
670 if (left_ext_size
>= right_ext_size
) {
672 assert(left_operation
== overwrite_operation_t::UNKNOWN
);
673 actual_write_size
-= left_ext_size
;
675 left_operation
= overwrite_operation_t::SPLIT_EXISTING
;
676 } else { // left_ext_size < right_ext_size
678 assert(right_operation
== overwrite_operation_t::UNKNOWN
);
679 actual_write_size
-= right_ext_size
;
681 right_operation
= overwrite_operation_t::SPLIT_EXISTING
;
685 if (left_operation
== overwrite_operation_t::UNKNOWN
) {
686 // no split left, so merge with left
687 left_operation
= overwrite_operation_t::MERGE_EXISTING
;
690 if (right_operation
== overwrite_operation_t::UNKNOWN
) {
691 // no split right, so merge with right
692 right_operation
= overwrite_operation_t::MERGE_EXISTING
;
697 } // namespace crimson::os::seastore
699 #if FMT_VERSION >= 90000
700 template<> struct fmt::formatter
<crimson::os::seastore::overwrite_plan_t
> : fmt::ostream_formatter
{};
703 namespace crimson::os::seastore
{
708 * Proceed overwrite_plan.left_operation.
710 using operate_ret_bare
= std::pair
<
711 std::optional
<extent_to_write_t
>,
712 std::optional
<bufferptr
>>;
713 using operate_ret
= get_iertr::future
<operate_ret_bare
>;
714 operate_ret
operate_left(context_t ctx
, LBAMappingRef
&pin
, const overwrite_plan_t
&overwrite_plan
)
716 if (overwrite_plan
.get_left_size() == 0) {
717 return get_iertr::make_ready_future
<operate_ret_bare
>(
722 if (overwrite_plan
.left_operation
== overwrite_operation_t::OVERWRITE_ZERO
) {
723 assert(pin
->get_val().is_zero());
724 auto zero_extent_len
= overwrite_plan
.get_left_extent_size();
725 assert_aligned(zero_extent_len
);
726 auto zero_prepend_len
= overwrite_plan
.get_left_alignment_size();
727 return get_iertr::make_ready_future
<operate_ret_bare
>(
728 (zero_extent_len
== 0
730 : std::make_optional(extent_to_write_t::create_zero(
731 overwrite_plan
.pin_begin
, zero_extent_len
))),
732 (zero_prepend_len
== 0
734 : std::make_optional(bufferptr(
735 ceph::buffer::create(zero_prepend_len
, 0))))
737 } else if (overwrite_plan
.left_operation
== overwrite_operation_t::MERGE_EXISTING
) {
738 auto prepend_len
= overwrite_plan
.get_left_size();
739 if (prepend_len
== 0) {
740 return get_iertr::make_ready_future
<operate_ret_bare
>(
744 extent_len_t off
= pin
->get_intermediate_offset();
745 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
746 ctx
.t
, pin
->duplicate()
747 ).si_then([prepend_len
, off
](auto left_extent
) {
748 return get_iertr::make_ready_future
<operate_ret_bare
>(
750 std::make_optional(bufferptr(
751 left_extent
->get_bptr(),
757 assert(overwrite_plan
.left_operation
== overwrite_operation_t::SPLIT_EXISTING
);
759 auto extent_len
= overwrite_plan
.get_left_extent_size();
761 std::optional
<extent_to_write_t
> left_to_write_extent
=
762 std::make_optional(extent_to_write_t::create_existing(
767 auto prepend_len
= overwrite_plan
.get_left_alignment_size();
768 if (prepend_len
== 0) {
769 return get_iertr::make_ready_future
<operate_ret_bare
>(
770 std::move(left_to_write_extent
),
773 extent_len_t off
= pin
->get_intermediate_offset();
774 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
775 ctx
.t
, pin
->duplicate()
776 ).si_then([prepend_offset
=extent_len
+ off
, prepend_len
,
777 left_to_write_extent
=std::move(left_to_write_extent
)]
778 (auto left_extent
) mutable {
779 return get_iertr::make_ready_future
<operate_ret_bare
>(
780 std::move(left_to_write_extent
),
781 std::make_optional(bufferptr(
782 left_extent
->get_bptr(),
793 * Proceed overwrite_plan.right_operation.
795 operate_ret
operate_right(context_t ctx
, LBAMappingRef
&pin
, const overwrite_plan_t
&overwrite_plan
)
797 if (overwrite_plan
.get_right_size() == 0) {
798 return get_iertr::make_ready_future
<operate_ret_bare
>(
803 auto right_pin_begin
= pin
->get_key();
804 assert(overwrite_plan
.data_end
>= right_pin_begin
);
805 if (overwrite_plan
.right_operation
== overwrite_operation_t::OVERWRITE_ZERO
) {
806 assert(pin
->get_val().is_zero());
807 auto zero_suffix_len
= overwrite_plan
.get_right_alignment_size();
808 auto zero_extent_len
= overwrite_plan
.get_right_extent_size();
809 assert_aligned(zero_extent_len
);
810 return get_iertr::make_ready_future
<operate_ret_bare
>(
811 (zero_extent_len
== 0
813 : std::make_optional(extent_to_write_t::create_zero(
814 overwrite_plan
.aligned_data_end
, zero_extent_len
))),
815 (zero_suffix_len
== 0
817 : std::make_optional(bufferptr(
818 ceph::buffer::create(zero_suffix_len
, 0))))
820 } else if (overwrite_plan
.right_operation
== overwrite_operation_t::MERGE_EXISTING
) {
821 auto append_len
= overwrite_plan
.get_right_size();
822 if (append_len
== 0) {
823 return get_iertr::make_ready_future
<operate_ret_bare
>(
828 overwrite_plan
.data_end
830 + pin
->get_intermediate_offset();
831 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
832 ctx
.t
, pin
->duplicate()
833 ).si_then([append_offset
, append_len
](auto right_extent
) {
834 return get_iertr::make_ready_future
<operate_ret_bare
>(
836 std::make_optional(bufferptr(
837 right_extent
->get_bptr(),
843 assert(overwrite_plan
.right_operation
== overwrite_operation_t::SPLIT_EXISTING
);
845 auto extent_len
= overwrite_plan
.get_right_extent_size();
847 std::optional
<extent_to_write_t
> right_to_write_extent
=
848 std::make_optional(extent_to_write_t::create_existing(
850 overwrite_plan
.aligned_data_end
,
853 auto append_len
= overwrite_plan
.get_right_alignment_size();
854 if (append_len
== 0) {
855 return get_iertr::make_ready_future
<operate_ret_bare
>(
856 std::move(right_to_write_extent
),
860 overwrite_plan
.data_end
862 + pin
->get_intermediate_offset();
863 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
864 ctx
.t
, pin
->duplicate()
865 ).si_then([append_offset
, append_len
,
866 right_to_write_extent
=std::move(right_to_write_extent
)]
867 (auto right_extent
) mutable {
868 return get_iertr::make_ready_future
<operate_ret_bare
>(
869 std::move(right_to_write_extent
),
870 std::make_optional(bufferptr(
871 right_extent
->get_bptr(),
879 template <typename F
>
880 auto with_object_data(
881 ObjectDataHandler::context_t ctx
,
884 return seastar::do_with(
885 ctx
.onode
.get_layout().object_data
.get(),
887 [ctx
](auto &object_data
, auto &f
) {
888 return std::invoke(f
, object_data
889 ).si_then([ctx
, &object_data
] {
890 if (object_data
.must_update()) {
891 ctx
.onode
.get_mutable_layout(ctx
.t
).object_data
.update(object_data
);
893 return seastar::now();
898 template <typename F
>
899 auto with_objects_data(
900 ObjectDataHandler::context_t ctx
,
903 ceph_assert(ctx
.d_onode
);
904 return seastar::do_with(
905 ctx
.onode
.get_layout().object_data
.get(),
906 ctx
.d_onode
->get_layout().object_data
.get(),
908 [ctx
](auto &object_data
, auto &d_object_data
, auto &f
) {
909 return std::invoke(f
, object_data
, d_object_data
910 ).si_then([ctx
, &object_data
, &d_object_data
] {
911 if (object_data
.must_update()) {
912 ctx
.onode
.get_mutable_layout(ctx
.t
).object_data
.update(object_data
);
914 if (d_object_data
.must_update()) {
915 ctx
.d_onode
->get_mutable_layout(
916 ctx
.t
).object_data
.update(d_object_data
);
918 return seastar::now();
923 ObjectDataHandler::write_ret
ObjectDataHandler::prepare_data_reservation(
925 object_data_t
&object_data
,
928 LOG_PREFIX(ObjectDataHandler::prepare_data_reservation
);
929 ceph_assert(size
<= max_object_size
);
930 if (!object_data
.is_null()) {
931 ceph_assert(object_data
.get_reserved_data_len() == max_object_size
);
932 DEBUGT("reservation present: {}~{}",
934 object_data
.get_reserved_data_base(),
935 object_data
.get_reserved_data_len());
936 return write_iertr::now();
938 DEBUGT("reserving: {}~{}",
940 ctx
.onode
.get_data_hint(),
942 return ctx
.tm
.reserve_region(
944 ctx
.onode
.get_data_hint(),
946 ).si_then([max_object_size
=max_object_size
, &object_data
](auto pin
) {
947 ceph_assert(pin
->get_length() == max_object_size
);
948 object_data
.update_reserved(
951 return write_iertr::now();
956 ObjectDataHandler::clear_ret
ObjectDataHandler::trim_data_reservation(
957 context_t ctx
, object_data_t
&object_data
, extent_len_t size
)
959 ceph_assert(!object_data
.is_null());
960 ceph_assert(size
<= object_data
.get_reserved_data_len());
961 return seastar::do_with(
963 extent_to_write_list_t(),
964 [ctx
, size
, &object_data
](auto &pins
, auto &to_write
) {
965 LOG_PREFIX(ObjectDataHandler::trim_data_reservation
);
966 DEBUGT("object_data: {}~{}",
968 object_data
.get_reserved_data_base(),
969 object_data
.get_reserved_data_len());
970 return ctx
.tm
.get_pins(
972 object_data
.get_reserved_data_base() + size
,
973 object_data
.get_reserved_data_len() - size
974 ).si_then([ctx
, size
, &pins
, &object_data
, &to_write
](auto _pins
) {
976 ceph_assert(pins
.size());
978 // no need to reserve region if we are truncating the object's
980 return clear_iertr::now();
982 auto &pin
= *pins
.front();
983 ceph_assert(pin
.get_key() >= object_data
.get_reserved_data_base());
985 pin
.get_key() <= object_data
.get_reserved_data_base() + size
);
986 auto pin_offset
= pin
.get_key() -
987 object_data
.get_reserved_data_base();
988 if ((pin
.get_key() == (object_data
.get_reserved_data_base() + size
)) ||
989 (pin
.get_val().is_zero())) {
990 /* First pin is exactly at the boundary or is a zero pin. Either way,
991 * remove all pins and add a single zero pin to the end. */
992 to_write
.push_back(extent_to_write_t::create_zero(
994 object_data
.get_reserved_data_len() - pin_offset
));
995 return clear_iertr::now();
997 /* First pin overlaps the boundary and has data, remap it
998 * if aligned or rewrite it if not aligned to size */
999 auto roundup_size
= p2roundup(size
, ctx
.tm
.get_block_size());
1000 auto append_len
= roundup_size
- size
;
1001 if (append_len
== 0) {
1002 LOG_PREFIX(ObjectDataHandler::trim_data_reservation
);
1003 TRACET("First pin overlaps the boundary and has aligned data"
1004 "create existing at addr:{}, len:{}",
1005 ctx
.t
, pin
.get_key(), size
- pin_offset
);
1006 to_write
.push_back(extent_to_write_t::create_existing(
1009 size
- pin_offset
));
1010 to_write
.push_back(extent_to_write_t::create_zero(
1011 object_data
.get_reserved_data_base() + roundup_size
,
1012 object_data
.get_reserved_data_len() - roundup_size
));
1013 return clear_iertr::now();
1015 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
1018 ).si_then([ctx
, size
, pin_offset
, append_len
, roundup_size
,
1019 &pin
, &object_data
, &to_write
](auto extent
) {
1024 pin
.get_intermediate_offset(),
1027 bl
.append_zero(append_len
);
1028 LOG_PREFIX(ObjectDataHandler::trim_data_reservation
);
1029 TRACET("First pin overlaps the boundary and has unaligned data"
1030 "create data at addr:{}, len:{}",
1031 ctx
.t
, pin
.get_key(), bl
.length());
1032 to_write
.push_back(extent_to_write_t::create_data(
1035 to_write
.push_back(extent_to_write_t::create_zero(
1036 object_data
.get_reserved_data_base() + roundup_size
,
1037 object_data
.get_reserved_data_len() - roundup_size
));
1038 return clear_iertr::now();
1042 }).si_then([ctx
, size
, &to_write
, &object_data
, &pins
] {
1043 return seastar::do_with(
1044 prepare_ops_list(pins
, to_write
),
1045 [ctx
, size
, &object_data
](auto &ops
) {
1046 return do_remappings(ctx
, ops
.to_remap
1047 ).si_then([ctx
, &ops
] {
1048 return do_removals(ctx
, ops
.to_remove
);
1049 }).si_then([ctx
, &ops
] {
1050 return do_insertions(ctx
, ops
.to_insert
);
1051 }).si_then([size
, &object_data
] {
1053 object_data
.clear();
1055 return ObjectDataHandler::clear_iertr::now();
1063 * get_to_writes_with_zero_buffer
1065 * Returns extent_to_write_t's reflecting a zero region extending
1066 * from offset~len with headptr optionally on the left and tailptr
1067 * optionally on the right.
1069 extent_to_write_list_t
get_to_writes_with_zero_buffer(
1070 const extent_len_t block_size
,
1071 laddr_t offset
, extent_len_t len
,
1072 std::optional
<bufferptr
> &&headptr
, std::optional
<bufferptr
> &&tailptr
)
1074 auto zero_left
= p2roundup(offset
, (laddr_t
)block_size
);
1075 auto zero_right
= p2align(offset
+ len
, (laddr_t
)block_size
);
1076 auto left
= headptr
? (offset
- headptr
->length()) : offset
;
1077 auto right
= tailptr
?
1078 (offset
+ len
+ tailptr
->length()) :
1082 (headptr
&& ((zero_left
- left
) ==
1083 p2roundup(headptr
->length(), block_size
))) ^
1084 (!headptr
&& (zero_left
== left
)));
1086 (tailptr
&& ((right
- zero_right
) ==
1087 p2roundup(tailptr
->length(), block_size
))) ^
1088 (!tailptr
&& (right
== zero_right
)));
1090 assert(right
> left
);
1091 assert((left
% block_size
) == 0);
1092 assert((right
% block_size
) == 0);
1094 // zero region too small for a reserved section,
1095 // headptr and tailptr in same extent
1096 if (zero_right
<= zero_left
) {
1099 bl
.append(*headptr
);
1102 right
- left
- bl
.length() - (tailptr
? tailptr
->length() : 0));
1104 bl
.append(*tailptr
);
1106 assert(bl
.length() % block_size
== 0);
1107 assert(bl
.length() == (right
- left
));
1108 extent_to_write_list_t ret
;
1109 ret
.push_back(extent_to_write_t::create_data(left
, bl
));
1112 // reserved section between ends, headptr and tailptr in different extents
1113 extent_to_write_list_t ret
;
1116 headbl
.append(*headptr
);
1117 headbl
.append_zero(zero_left
- left
- headbl
.length());
1118 assert(headbl
.length() % block_size
== 0);
1119 assert(headbl
.length() > 0);
1120 ret
.push_back(extent_to_write_t::create_data(left
, headbl
));
1122 // reserved zero region
1123 ret
.push_back(extent_to_write_t::create_zero(zero_left
, zero_right
- zero_left
));
1124 assert(ret
.back().len
% block_size
== 0);
1125 assert(ret
.back().len
> 0);
1128 tailbl
.append(*tailptr
);
1129 tailbl
.append_zero(right
- zero_right
- tailbl
.length());
1130 assert(tailbl
.length() % block_size
== 0);
1131 assert(tailbl
.length() > 0);
1132 ret
.push_back(extent_to_write_t::create_data(zero_right
, tailbl
));
1141 * Returns extent_to_write_t's from bl.
1143 * TODO: probably add some kind of upper limit on extent size.
1145 extent_to_write_list_t
get_to_writes(laddr_t offset
, bufferlist
&bl
)
1147 auto ret
= extent_to_write_list_t();
1148 ret
.push_back(extent_to_write_t::create_data(offset
, bl
));
1152 ObjectDataHandler::write_ret
ObjectDataHandler::overwrite(
1156 std::optional
<bufferlist
> &&bl
,
1157 lba_pin_list_t
&&_pins
)
1159 if (bl
.has_value()) {
1160 assert(bl
->length() == len
);
1162 overwrite_plan_t
overwrite_plan(offset
, len
, _pins
, ctx
.tm
.get_block_size(), ctx
.t
);
1163 return seastar::do_with(
1165 extent_to_write_list_t(),
1166 [ctx
, len
, offset
, overwrite_plan
, bl
=std::move(bl
)]
1167 (auto &pins
, auto &to_write
) mutable
1169 LOG_PREFIX(ObjectDataHandler::overwrite
);
1170 DEBUGT("overwrite: {}~{}",
1174 ceph_assert(pins
.size() >= 1);
1175 DEBUGT("overwrite: split overwrite_plan {}", ctx
.t
, overwrite_plan
);
1177 return operate_left(
1181 ).si_then([ctx
, len
, offset
, overwrite_plan
, bl
=std::move(bl
),
1182 &to_write
, &pins
](auto p
) mutable {
1183 auto &[left_extent
, headptr
] = p
;
1185 ceph_assert(left_extent
->addr
== overwrite_plan
.pin_begin
);
1186 append_extent_to_write(to_write
, std::move(*left_extent
));
1189 assert(headptr
->length() > 0);
1191 return operate_right(
1195 ).si_then([ctx
, len
, offset
,
1196 pin_begin
=overwrite_plan
.pin_begin
,
1197 pin_end
=overwrite_plan
.pin_end
,
1198 bl
=std::move(bl
), headptr
=std::move(headptr
),
1199 &to_write
, &pins
](auto p
) mutable {
1200 auto &[right_extent
, tailptr
] = p
;
1201 if (bl
.has_value()) {
1202 auto write_offset
= offset
;
1203 bufferlist write_bl
;
1205 write_bl
.append(*headptr
);
1206 write_offset
-= headptr
->length();
1207 assert_aligned(write_offset
);
1209 write_bl
.claim_append(*bl
);
1211 write_bl
.append(*tailptr
);
1212 assert_aligned(write_bl
.length());
1214 splice_extent_to_write(
1216 get_to_writes(write_offset
, write_bl
));
1218 splice_extent_to_write(
1220 get_to_writes_with_zero_buffer(
1221 ctx
.tm
.get_block_size(),
1225 std::move(tailptr
)));
1228 ceph_assert(right_extent
->get_end_addr() == pin_end
);
1229 append_extent_to_write(to_write
, std::move(*right_extent
));
1231 assert(to_write
.size());
1232 assert(pin_begin
== to_write
.front().addr
);
1233 assert(pin_end
== to_write
.back().get_end_addr());
1235 return seastar::do_with(
1236 prepare_ops_list(pins
, to_write
),
1238 return do_remappings(ctx
, ops
.to_remap
1239 ).si_then([ctx
, &ops
] {
1240 return do_removals(ctx
, ops
.to_remove
);
1241 }).si_then([ctx
, &ops
] {
1242 return do_insertions(ctx
, ops
.to_insert
);
1250 ObjectDataHandler::zero_ret
ObjectDataHandler::zero(
1255 return with_object_data(
1257 [this, ctx
, offset
, len
](auto &object_data
) {
1258 LOG_PREFIX(ObjectDataHandler::zero
);
1259 DEBUGT("zero to {}~{}, object_data: {}~{}, is_null {}",
1263 object_data
.get_reserved_data_base(),
1264 object_data
.get_reserved_data_len(),
1265 object_data
.is_null());
1266 return prepare_data_reservation(
1269 p2roundup(offset
+ len
, ctx
.tm
.get_block_size())
1270 ).si_then([this, ctx
, offset
, len
, &object_data
] {
1271 auto logical_offset
= object_data
.get_reserved_data_base() + offset
;
1272 return ctx
.tm
.get_pins(
1276 ).si_then([this, ctx
, logical_offset
, len
](auto pins
) {
1278 ctx
, logical_offset
, len
,
1279 std::nullopt
, std::move(pins
));
1285 ObjectDataHandler::write_ret
ObjectDataHandler::write(
1288 const bufferlist
&bl
)
1290 return with_object_data(
1292 [this, ctx
, offset
, &bl
](auto &object_data
) {
1293 LOG_PREFIX(ObjectDataHandler::write
);
1294 DEBUGT("writing to {}~{}, object_data: {}~{}, is_null {}",
1298 object_data
.get_reserved_data_base(),
1299 object_data
.get_reserved_data_len(),
1300 object_data
.is_null());
1301 return prepare_data_reservation(
1304 p2roundup(offset
+ bl
.length(), ctx
.tm
.get_block_size())
1305 ).si_then([this, ctx
, offset
, &object_data
, &bl
] {
1306 auto logical_offset
= object_data
.get_reserved_data_base() + offset
;
1307 return ctx
.tm
.get_pins(
1311 ).si_then([this, ctx
,logical_offset
, &bl
](
1314 ctx
, logical_offset
, bl
.length(),
1315 bufferlist(bl
), std::move(pins
));
1321 ObjectDataHandler::read_ret
ObjectDataHandler::read(
1323 objaddr_t obj_offset
,
1326 return seastar::do_with(
1328 [ctx
, obj_offset
, len
](auto &ret
) {
1329 return with_object_data(
1331 [ctx
, obj_offset
, len
, &ret
](const auto &object_data
) {
1332 LOG_PREFIX(ObjectDataHandler::read
);
1333 DEBUGT("reading {}~{}",
1335 object_data
.get_reserved_data_base(),
1336 object_data
.get_reserved_data_len());
1337 /* Assumption: callers ensure that onode size is <= reserved
1338 * size and that len is adjusted here prior to call */
1339 ceph_assert(!object_data
.is_null());
1340 ceph_assert((obj_offset
+ len
) <= object_data
.get_reserved_data_len());
1341 ceph_assert(len
> 0);
1343 object_data
.get_reserved_data_base() + obj_offset
;
1344 return ctx
.tm
.get_pins(
1348 ).si_then([ctx
, loffset
, len
, &ret
](auto _pins
) {
1349 // offset~len falls within reserved region and len > 0
1350 ceph_assert(_pins
.size() >= 1);
1351 ceph_assert((*_pins
.begin())->get_key() <= loffset
);
1352 return seastar::do_with(
1355 [ctx
, loffset
, len
, &ret
](auto &pins
, auto ¤t
) {
1356 return trans_intr::do_for_each(
1358 [ctx
, loffset
, len
, ¤t
, &ret
](auto &pin
)
1359 -> read_iertr::future
<> {
1360 ceph_assert(current
<= (loffset
+ len
));
1362 (loffset
+ len
) > pin
->get_key());
1363 laddr_t end
= std::min(
1364 pin
->get_key() + pin
->get_length(),
1366 if (pin
->get_val().is_zero()) {
1367 ceph_assert(end
> current
); // See LBAManager::get_mappings
1368 ret
.append_zero(end
- current
);
1370 return seastar::now();
1372 LOG_PREFIX(ObjectDataHandler::read
);
1373 auto key
= pin
->get_key();
1374 bool is_indirect
= pin
->is_indirect();
1375 extent_len_t off
= pin
->get_intermediate_offset();
1376 DEBUGT("reading {}~{}, indirect: {}, "
1377 "intermediate offset: {}, current: {}, end: {}",
1385 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
1388 ).si_then([&ret
, ¤t
, end
, key
, off
,
1389 is_indirect
](auto extent
) {
1392 ? (key
- off
+ extent
->get_length()) >= end
1393 : (extent
->get_laddr() + extent
->get_length()) >= end
);
1394 ceph_assert(end
> current
);
1398 off
+ current
- (is_indirect
? key
: extent
->get_laddr()),
1401 return seastar::now();
1402 }).handle_error_interruptible(
1403 read_iertr::pass_further
{},
1404 crimson::ct_error::assert_all
{
1405 "ObjectDataHandler::read hit invalid error"
1413 return std::move(ret
);
1418 ObjectDataHandler::fiemap_ret
ObjectDataHandler::fiemap(
1420 objaddr_t obj_offset
,
1423 return seastar::do_with(
1424 std::map
<uint64_t, uint64_t>(),
1425 [ctx
, obj_offset
, len
](auto &ret
) {
1426 return with_object_data(
1428 [ctx
, obj_offset
, len
, &ret
](const auto &object_data
) {
1429 LOG_PREFIX(ObjectDataHandler::fiemap
);
1431 "{}~{}, reservation {}~{}",
1435 object_data
.get_reserved_data_base(),
1436 object_data
.get_reserved_data_len());
1437 /* Assumption: callers ensure that onode size is <= reserved
1438 * size and that len is adjusted here prior to call */
1439 ceph_assert(!object_data
.is_null());
1440 ceph_assert((obj_offset
+ len
) <= object_data
.get_reserved_data_len());
1441 ceph_assert(len
> 0);
1443 object_data
.get_reserved_data_base() + obj_offset
;
1444 return ctx
.tm
.get_pins(
1448 ).si_then([loffset
, len
, &object_data
, &ret
](auto &&pins
) {
1449 ceph_assert(pins
.size() >= 1);
1450 ceph_assert((*pins
.begin())->get_key() <= loffset
);
1451 for (auto &&i
: pins
) {
1452 if (!(i
->get_val().is_zero())) {
1453 auto ret_left
= std::max(i
->get_key(), loffset
);
1454 auto ret_right
= std::min(
1455 i
->get_key() + i
->get_length(),
1457 assert(ret_right
> ret_left
);
1460 ret_left
- object_data
.get_reserved_data_base(),
1461 ret_right
- ret_left
1467 return std::move(ret
);
1472 ObjectDataHandler::truncate_ret
ObjectDataHandler::truncate(
1476 return with_object_data(
1478 [this, ctx
, offset
](auto &object_data
) {
1479 LOG_PREFIX(ObjectDataHandler::truncate
);
1480 DEBUGT("truncating {}~{} offset: {}",
1482 object_data
.get_reserved_data_base(),
1483 object_data
.get_reserved_data_len(),
1485 if (offset
< object_data
.get_reserved_data_len()) {
1486 return trim_data_reservation(ctx
, object_data
, offset
);
1487 } else if (offset
> object_data
.get_reserved_data_len()) {
1488 return prepare_data_reservation(
1491 p2roundup(offset
, ctx
.tm
.get_block_size()));
1493 return truncate_iertr::now();
1498 ObjectDataHandler::clear_ret
ObjectDataHandler::clear(
1501 return with_object_data(
1503 [this, ctx
](auto &object_data
) {
1504 LOG_PREFIX(ObjectDataHandler::clear
);
1505 DEBUGT("clearing: {}~{}",
1507 object_data
.get_reserved_data_base(),
1508 object_data
.get_reserved_data_len());
1509 if (object_data
.is_null()) {
1510 return clear_iertr::now();
1512 return trim_data_reservation(ctx
, object_data
, 0);
1516 ObjectDataHandler::clone_ret
ObjectDataHandler::clone_extents(
1518 object_data_t
&object_data
,
1519 lba_pin_list_t
&pins
,
1522 LOG_PREFIX(ObjectDataHandler::clone_extents
);
1523 TRACET(" object_data: {}~{}, data_base: {}",
1525 object_data
.get_reserved_data_base(),
1526 object_data
.get_reserved_data_len(),
1528 return ctx
.tm
.dec_ref(
1530 object_data
.get_reserved_data_base()
1532 [&pins
, &object_data
, ctx
, data_base
](auto) mutable {
1533 return seastar::do_with(
1535 [&object_data
, ctx
, data_base
, &pins
](auto &last_pos
) {
1536 return trans_intr::do_for_each(
1538 [&last_pos
, &object_data
, ctx
, data_base
](auto &pin
) {
1539 auto offset
= pin
->get_key() - data_base
;
1540 ceph_assert(offset
== last_pos
);
1541 auto fut
= TransactionManager::alloc_extent_iertr
1542 ::make_ready_future
<LBAMappingRef
>();
1543 auto addr
= object_data
.get_reserved_data_base() + offset
;
1544 if (pin
->get_val().is_zero()) {
1545 fut
= ctx
.tm
.reserve_region(ctx
.t
, addr
, pin
->get_length());
1547 fut
= ctx
.tm
.clone_pin(ctx
.t
, addr
, *pin
);
1550 [&pin
, &last_pos
, offset
](auto) {
1551 last_pos
= offset
+ pin
->get_length();
1552 return seastar::now();
1553 }).handle_error_interruptible(
1554 crimson::ct_error::input_output_error::pass_further(),
1555 crimson::ct_error::assert_all("not possible")
1557 }).si_then([&last_pos
, &object_data
, ctx
] {
1558 if (last_pos
!= object_data
.get_reserved_data_len()) {
1559 return ctx
.tm
.reserve_region(
1561 object_data
.get_reserved_data_base() + last_pos
,
1562 object_data
.get_reserved_data_len() - last_pos
1563 ).si_then([](auto) {
1564 return seastar::now();
1567 return TransactionManager::reserve_extent_iertr::now();
1571 ObjectDataHandler::write_iertr::pass_further
{},
1572 crimson::ct_error::assert_all
{
1573 "object_data_handler::clone invalid error"
1578 ObjectDataHandler::clone_ret
ObjectDataHandler::clone(
1581 // the whole clone procedure can be seperated into the following steps:
1582 // 1. let clone onode(d_object_data) take the head onode's
1583 // object data base;
1584 // 2. reserve a new region in lba tree for the head onode;
1585 // 3. clone all extents of the clone onode, see transaction_manager.h
1586 // for the details of clone_pin;
1587 // 4. reserve the space between the head onode's size and its reservation
1589 return with_objects_data(
1591 [ctx
, this](auto &object_data
, auto &d_object_data
) {
1592 ceph_assert(d_object_data
.is_null());
1593 if (object_data
.is_null()) {
1594 return clone_iertr::now();
1596 return prepare_data_reservation(
1599 object_data
.get_reserved_data_len()
1600 ).si_then([&object_data
, &d_object_data
, ctx
, this] {
1601 assert(!object_data
.is_null());
1602 auto base
= object_data
.get_reserved_data_base();
1603 auto len
= object_data
.get_reserved_data_len();
1604 object_data
.clear();
1605 LOG_PREFIX(ObjectDataHandler::clone
);
1606 DEBUGT("cloned obj reserve_data_base: {}, len {}",
1608 d_object_data
.get_reserved_data_base(),
1609 d_object_data
.get_reserved_data_len());
1610 return prepare_data_reservation(
1613 d_object_data
.get_reserved_data_len()
1614 ).si_then([&d_object_data
, ctx
, &object_data
, base
, len
, this] {
1615 LOG_PREFIX("ObjectDataHandler::clone");
1616 DEBUGT("head obj reserve_data_base: {}, len {}",
1618 object_data
.get_reserved_data_base(),
1619 object_data
.get_reserved_data_len());
1620 return ctx
.tm
.get_pins(ctx
.t
, base
, len
1621 ).si_then([ctx
, &object_data
, &d_object_data
, base
, this](auto pins
) {
1622 return seastar::do_with(
1624 [ctx
, &object_data
, &d_object_data
, base
, this](auto &pins
) {
1625 return clone_extents(ctx
, object_data
, pins
, base
1626 ).si_then([ctx
, &d_object_data
, base
, &pins
, this] {
1627 return clone_extents(ctx
, d_object_data
, pins
, base
);
1628 }).si_then([&pins
, ctx
] {
1629 return do_removals(ctx
, pins
);
1638 } // namespace crimson::os::seastore