1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
7 #include "crimson/common/log.h"
9 #include "crimson/os/seastore/object_data_handler.h"
12 seastar::logger
& logger() {
13 return crimson::get_logger(ceph_subsys_seastore_odata
);
17 SET_SUBSYS(seastore_odata
);
19 namespace crimson::os::seastore
{
20 #define assert_aligned(x) ceph_assert(((x)%ctx.tm.get_block_size()) == 0)
22 using context_t
= ObjectDataHandler::context_t
;
23 using get_iertr
= ObjectDataHandler::write_iertr
;
28 * Encapsulates extents to be written out using do_insertions.
29 * Indicates a zero/existing extent or a data extent based on whether
 * to_write is populated.
31 * The meaning of existing_paddr is that the new extent to be
 * written is part of an existing extent on the disk. existing_paddr
35 struct extent_to_write_t
{
45 /// non-nullopt if and only if type == DATA
46 std::optional
<bufferlist
> to_write
;
47 /// non-nullopt if and only if type == EXISTING
48 std::optional
<paddr_t
> existing_paddr
;
50 extent_to_write_t(const extent_to_write_t
&) = default;
51 extent_to_write_t(extent_to_write_t
&&) = default;
53 bool is_data() const {
54 return type
== type_t::DATA
;
57 bool is_zero() const {
58 return type
== type_t::ZERO
;
61 bool is_existing() const {
62 return type
== type_t::EXISTING
;
65 laddr_t
get_end_addr() const {
69 static extent_to_write_t
create_data(
70 laddr_t addr
, bufferlist to_write
) {
71 return extent_to_write_t(addr
, to_write
);
74 static extent_to_write_t
create_zero(
75 laddr_t addr
, extent_len_t len
) {
76 return extent_to_write_t(addr
, len
);
79 static extent_to_write_t
create_existing(
80 laddr_t addr
, paddr_t existing_paddr
, extent_len_t len
) {
81 return extent_to_write_t(addr
, existing_paddr
, len
);
85 extent_to_write_t(laddr_t addr
, bufferlist to_write
)
86 : type(type_t::DATA
), addr(addr
), len(to_write
.length()),
89 extent_to_write_t(laddr_t addr
, extent_len_t len
)
90 : type(type_t::ZERO
), addr(addr
), len(len
) {}
92 extent_to_write_t(laddr_t addr
, paddr_t existing_paddr
, extent_len_t len
)
93 : type(type_t::EXISTING
), addr(addr
), len(len
),
94 to_write(std::nullopt
), existing_paddr(existing_paddr
) {}
96 using extent_to_write_list_t
= std::list
<extent_to_write_t
>;
99 * append_extent_to_write
101 * Appends passed extent_to_write_t maintaining invariant that the
102 * list may not contain consecutive zero elements by checking and
105 void append_extent_to_write(
106 extent_to_write_list_t
&to_write
, extent_to_write_t
&&to_append
)
108 assert(to_write
.empty() ||
109 to_write
.back().get_end_addr() == to_append
.addr
);
110 if (to_write
.empty() ||
111 to_write
.back().is_data() ||
112 to_append
.is_data() ||
113 to_write
.back().type
!= to_append
.type
) {
114 to_write
.push_back(std::move(to_append
));
116 to_write
.back().len
+= to_append
.len
;
121 * splice_extent_to_write
123 * splices passed extent_to_write_list_t maintaining invariant that the
124 * list may not contain consecutive zero elements by checking and
127 void splice_extent_to_write(
128 extent_to_write_list_t
&to_write
, extent_to_write_list_t
&&to_splice
)
130 if (!to_splice
.empty()) {
131 append_extent_to_write(to_write
, std::move(to_splice
.front()));
132 to_splice
.pop_front();
133 to_write
.splice(to_write
.end(), std::move(to_splice
));
137 /// Removes extents/mappings in pins
138 ObjectDataHandler::write_ret
do_removals(
140 lba_pin_list_t
&pins
)
142 return trans_intr::do_for_each(
145 LOG_PREFIX(object_data_handler
.cc::do_removals
);
146 DEBUGT("decreasing ref: {}",
149 return ctx
.tm
.dec_ref(
154 ObjectDataHandler::write_iertr::pass_further
{},
155 crimson::ct_error::assert_all
{
156 "object_data_handler::do_removals invalid error"
162 /// Creates zero/data extents in to_write
163 ObjectDataHandler::write_ret
do_insertions(
165 extent_to_write_list_t
&to_write
)
167 return trans_intr::do_for_each(
169 [ctx
](auto ®ion
) {
170 LOG_PREFIX(object_data_handler
.cc::do_insertions
);
171 if (region
.is_data()) {
172 assert_aligned(region
.addr
);
173 assert_aligned(region
.len
);
174 ceph_assert(region
.len
== region
.to_write
->length());
175 DEBUGT("allocating extent: {}~{}",
179 return ctx
.tm
.alloc_extent
<ObjectDataBlock
>(
183 ).si_then([®ion
](auto extent
) {
184 if (extent
->get_laddr() != region
.addr
) {
186 "object_data_handler::do_insertions alloc got addr {},"
187 " should have been {}",
191 ceph_assert(extent
->get_laddr() == region
.addr
);
192 ceph_assert(extent
->get_length() == region
.len
);
193 auto iter
= region
.to_write
->cbegin();
194 iter
.copy(region
.len
, extent
->get_bptr().c_str());
195 return ObjectDataHandler::write_iertr::now();
197 } else if (region
.is_zero()) {
198 DEBUGT("reserving: {}~{}",
202 return ctx
.tm
.reserve_region(
206 ).si_then([FNAME
, ctx
, ®ion
](auto pin
) {
207 ceph_assert(pin
->get_length() == region
.len
);
208 if (pin
->get_key() != region
.addr
) {
210 "inconsistent laddr: pin: {} region {}",
215 ceph_assert(pin
->get_key() == region
.addr
);
216 return ObjectDataHandler::write_iertr::now();
219 ceph_assert(region
.is_existing());
220 DEBUGT("map existing extent: laddr {} len {} {}",
221 ctx
.t
, region
.addr
, region
.len
, *region
.existing_paddr
);
222 return ctx
.tm
.map_existing_extent
<ObjectDataBlock
>(
223 ctx
.t
, region
.addr
, *region
.existing_paddr
, region
.len
224 ).handle_error_interruptible(
225 TransactionManager::alloc_extent_iertr::pass_further
{},
226 Device::read_ertr::assert_all
{"ignore read error"}
227 ).si_then([FNAME
, ctx
, ®ion
](auto extent
) {
228 if (extent
->get_laddr() != region
.addr
) {
230 "inconsistent laddr: extent: {} region {}",
235 ceph_assert(extent
->get_laddr() == region
.addr
);
236 return ObjectDataHandler::write_iertr::now();
// How to handle each unaligned boundary of an overwrite.
enum class overwrite_operation_t {
  UNKNOWN,
  OVERWRITE_ZERO,  // fill unaligned data with zero
  MERGE_EXISTING,  // if present, merge data with the clean/pending extent
  SPLIT_EXISTING,  // split the existing extent, and fill unaligned data
};
249 std::ostream
& operator<<(
251 const overwrite_operation_t
&operation
)
254 case overwrite_operation_t::UNKNOWN
:
255 return out
<< "UNKNOWN";
256 case overwrite_operation_t::OVERWRITE_ZERO
:
257 return out
<< "OVERWRITE_ZERO";
258 case overwrite_operation_t::MERGE_EXISTING
:
259 return out
<< "MERGE_EXISTING";
260 case overwrite_operation_t::SPLIT_EXISTING
:
261 return out
<< "SPLIT_EXISTING";
263 return out
<< "!IMPOSSIBLE_OPERATION";
270 * |<--------------------------pins_size---------------------------------------------->|
271 * pin_begin(aligned) pin_end(aligned)
272 * |<------aligned_data_size-------------------------->| (aligned-bl)
273 * aligned_data_begin aligned_data_end
274 * |<-data_size->| (bl)
277 * |<l_extent_size>|<l_alignment_size>| |<r_alignment_size>|<r_extent_size>|
278 * |<-----------left_size------------>| |<-----------right_size----------->|
280 * |<-----(existing left extent/pin)----->| |<-----(existing right extent/pin)----->|
281 * left_paddr right_paddr
283 struct overwrite_plan_t
{
291 laddr_t aligned_data_begin
;
292 laddr_t aligned_data_end
;
295 overwrite_operation_t left_operation
;
296 overwrite_operation_t right_operation
;
299 extent_len_t block_size
;
302 extent_len_t
get_left_size() const {
303 return data_begin
- pin_begin
;
306 extent_len_t
get_left_extent_size() const {
307 return aligned_data_begin
- pin_begin
;
310 extent_len_t
get_left_alignment_size() const {
311 return data_begin
- aligned_data_begin
;
314 extent_len_t
get_right_size() const {
315 return pin_end
- data_end
;
318 extent_len_t
get_right_extent_size() const {
319 return pin_end
- aligned_data_end
;
322 extent_len_t
get_right_alignment_size() const {
323 return aligned_data_end
- data_end
;
326 extent_len_t
get_aligned_data_size() const {
327 return aligned_data_end
- aligned_data_begin
;
330 extent_len_t
get_pins_size() const {
331 return pin_end
- pin_begin
;
334 friend std::ostream
& operator<<(
336 const overwrite_plan_t
& overwrite_plan
) {
337 return out
<< "overwrite_plan_t("
338 << "pin_begin=" << overwrite_plan
.pin_begin
339 << ", pin_end=" << overwrite_plan
.pin_end
340 << ", left_paddr=" << overwrite_plan
.left_paddr
341 << ", right_paddr=" << overwrite_plan
.right_paddr
342 << ", data_begin=" << overwrite_plan
.data_begin
343 << ", data_end=" << overwrite_plan
.data_end
344 << ", aligned_data_begin=" << overwrite_plan
.aligned_data_begin
345 << ", aligned_data_end=" << overwrite_plan
.aligned_data_end
346 << ", left_operation=" << overwrite_plan
.left_operation
347 << ", right_operation=" << overwrite_plan
.right_operation
348 << ", block_size=" << overwrite_plan
.block_size
352 overwrite_plan_t(laddr_t offset
,
354 const lba_pin_list_t
& pins
,
355 extent_len_t block_size
) :
356 pin_begin(pins
.front()->get_key()),
357 pin_end(pins
.back()->get_key() + pins
.back()->get_length()),
358 left_paddr(pins
.front()->get_val()),
359 right_paddr(pins
.back()->get_val()),
361 data_end(offset
+ len
),
362 aligned_data_begin(p2align((uint64_t)data_begin
, (uint64_t)block_size
)),
363 aligned_data_end(p2roundup((uint64_t)data_end
, (uint64_t)block_size
)),
364 left_operation(overwrite_operation_t::UNKNOWN
),
365 right_operation(overwrite_operation_t::UNKNOWN
),
366 block_size(block_size
) {
368 evaluate_operations();
369 assert(left_operation
!= overwrite_operation_t::UNKNOWN
);
370 assert(right_operation
!= overwrite_operation_t::UNKNOWN
);
374 // refer to overwrite_plan_t description
375 void validate() const {
376 ceph_assert(pin_begin
% block_size
== 0);
377 ceph_assert(pin_end
% block_size
== 0);
378 ceph_assert(aligned_data_begin
% block_size
== 0);
379 ceph_assert(aligned_data_end
% block_size
== 0);
381 ceph_assert(pin_begin
<= aligned_data_begin
);
382 ceph_assert(aligned_data_begin
<= data_begin
);
383 ceph_assert(data_begin
<= data_end
);
384 ceph_assert(data_end
<= aligned_data_end
);
385 ceph_assert(aligned_data_end
<= pin_end
);
389 * When trying to modify a portion of an object data block, follow
390 * the read-full-extent-then-merge-new-data strategy, if the write
391 * amplification caused by it is not greater than
392 * seastore_obj_data_write_amplification; otherwise, split the
393 * original extent into at most three parts: origin-left, part-to-be-modified
396 void evaluate_operations() {
397 auto actual_write_size
= get_pins_size();
398 auto aligned_data_size
= get_aligned_data_size();
399 auto left_ext_size
= get_left_extent_size();
400 auto right_ext_size
= get_right_extent_size();
402 if (left_paddr
.is_zero()) {
403 actual_write_size
-= left_ext_size
;
405 left_operation
= overwrite_operation_t::OVERWRITE_ZERO
;
406 // FIXME: left_paddr can be absolute and pending
407 } else if (left_paddr
.is_relative() ||
408 left_paddr
.is_delayed()) {
409 aligned_data_size
+= left_ext_size
;
411 left_operation
= overwrite_operation_t::MERGE_EXISTING
;
414 if (right_paddr
.is_zero()) {
415 actual_write_size
-= right_ext_size
;
417 right_operation
= overwrite_operation_t::OVERWRITE_ZERO
;
418 // FIXME: right_paddr can be absolute and pending
419 } else if (right_paddr
.is_relative() ||
420 right_paddr
.is_delayed()) {
421 aligned_data_size
+= right_ext_size
;
423 right_operation
= overwrite_operation_t::MERGE_EXISTING
;
426 while (left_operation
== overwrite_operation_t::UNKNOWN
||
427 right_operation
== overwrite_operation_t::UNKNOWN
) {
428 if (((double)actual_write_size
/ (double)aligned_data_size
) <=
429 crimson::common::get_conf
<double>("seastore_obj_data_write_amplification")) {
432 if (left_ext_size
== 0 && right_ext_size
== 0) {
435 if (left_ext_size
>= right_ext_size
) {
437 assert(left_operation
== overwrite_operation_t::UNKNOWN
);
438 actual_write_size
-= left_ext_size
;
440 left_operation
= overwrite_operation_t::SPLIT_EXISTING
;
441 } else { // left_ext_size < right_ext_size
443 assert(right_operation
== overwrite_operation_t::UNKNOWN
);
444 actual_write_size
-= right_ext_size
;
446 right_operation
= overwrite_operation_t::SPLIT_EXISTING
;
450 if (left_operation
== overwrite_operation_t::UNKNOWN
) {
451 // no split left, so merge with left
452 left_operation
= overwrite_operation_t::MERGE_EXISTING
;
455 if (right_operation
== overwrite_operation_t::UNKNOWN
) {
456 // no split right, so merge with right
457 right_operation
= overwrite_operation_t::MERGE_EXISTING
;
462 } // namespace crimson::os::seastore
#if FMT_VERSION >= 90000
// fmt 9+ no longer formats via operator<< implicitly; opt in explicitly.
template <> struct fmt::formatter<crimson::os::seastore::overwrite_plan_t>
  : fmt::ostream_formatter {};
#endif
468 namespace crimson::os::seastore
{
473 * Proceed overwrite_plan.left_operation.
475 using operate_ret_bare
= std::pair
<
476 std::optional
<extent_to_write_t
>,
477 std::optional
<bufferptr
>>;
478 using operate_ret
= get_iertr::future
<operate_ret_bare
>;
479 operate_ret
operate_left(context_t ctx
, LBAMappingRef
&pin
, const overwrite_plan_t
&overwrite_plan
)
481 if (overwrite_plan
.get_left_size() == 0) {
482 return get_iertr::make_ready_future
<operate_ret_bare
>(
487 if (overwrite_plan
.left_operation
== overwrite_operation_t::OVERWRITE_ZERO
) {
488 assert(pin
->get_val().is_zero());
489 auto zero_extent_len
= overwrite_plan
.get_left_extent_size();
490 assert_aligned(zero_extent_len
);
491 auto zero_prepend_len
= overwrite_plan
.get_left_alignment_size();
492 return get_iertr::make_ready_future
<operate_ret_bare
>(
493 (zero_extent_len
== 0
495 : std::make_optional(extent_to_write_t::create_zero(
496 overwrite_plan
.pin_begin
, zero_extent_len
))),
497 (zero_prepend_len
== 0
499 : std::make_optional(bufferptr(
500 ceph::buffer::create(zero_prepend_len
, 0))))
502 } else if (overwrite_plan
.left_operation
== overwrite_operation_t::MERGE_EXISTING
) {
503 auto prepend_len
= overwrite_plan
.get_left_size();
504 if (prepend_len
== 0) {
505 return get_iertr::make_ready_future
<operate_ret_bare
>(
509 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
510 ctx
.t
, pin
->duplicate()
511 ).si_then([prepend_len
](auto left_extent
) {
512 return get_iertr::make_ready_future
<operate_ret_bare
>(
514 std::make_optional(bufferptr(
515 left_extent
->get_bptr(),
521 assert(overwrite_plan
.left_operation
== overwrite_operation_t::SPLIT_EXISTING
);
523 auto extent_len
= overwrite_plan
.get_left_extent_size();
525 std::optional
<extent_to_write_t
> left_to_write_extent
=
526 std::make_optional(extent_to_write_t::create_existing(
527 overwrite_plan
.pin_begin
,
528 overwrite_plan
.left_paddr
,
531 auto prepend_len
= overwrite_plan
.get_left_alignment_size();
532 if (prepend_len
== 0) {
533 return get_iertr::make_ready_future
<operate_ret_bare
>(
534 left_to_write_extent
,
537 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
538 ctx
.t
, pin
->duplicate()
539 ).si_then([prepend_offset
=extent_len
, prepend_len
,
540 left_to_write_extent
=std::move(left_to_write_extent
)]
541 (auto left_extent
) mutable {
542 return get_iertr::make_ready_future
<operate_ret_bare
>(
543 left_to_write_extent
,
544 std::make_optional(bufferptr(
545 left_extent
->get_bptr(),
556 * Proceed overwrite_plan.right_operation.
558 operate_ret
operate_right(context_t ctx
, LBAMappingRef
&pin
, const overwrite_plan_t
&overwrite_plan
)
560 if (overwrite_plan
.get_right_size() == 0) {
561 return get_iertr::make_ready_future
<operate_ret_bare
>(
566 auto right_pin_begin
= pin
->get_key();
567 assert(overwrite_plan
.data_end
>= right_pin_begin
);
568 if (overwrite_plan
.right_operation
== overwrite_operation_t::OVERWRITE_ZERO
) {
569 assert(pin
->get_val().is_zero());
570 auto zero_suffix_len
= overwrite_plan
.get_right_alignment_size();
571 auto zero_extent_len
= overwrite_plan
.get_right_extent_size();
572 assert_aligned(zero_extent_len
);
573 return get_iertr::make_ready_future
<operate_ret_bare
>(
574 (zero_extent_len
== 0
576 : std::make_optional(extent_to_write_t::create_zero(
577 overwrite_plan
.aligned_data_end
, zero_extent_len
))),
578 (zero_suffix_len
== 0
580 : std::make_optional(bufferptr(
581 ceph::buffer::create(zero_suffix_len
, 0))))
583 } else if (overwrite_plan
.right_operation
== overwrite_operation_t::MERGE_EXISTING
) {
584 auto append_len
= overwrite_plan
.get_right_size();
585 if (append_len
== 0) {
586 return get_iertr::make_ready_future
<operate_ret_bare
>(
590 auto append_offset
= overwrite_plan
.data_end
- right_pin_begin
;
591 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
592 ctx
.t
, pin
->duplicate()
593 ).si_then([append_offset
, append_len
](auto right_extent
) {
594 return get_iertr::make_ready_future
<operate_ret_bare
>(
596 std::make_optional(bufferptr(
597 right_extent
->get_bptr(),
603 assert(overwrite_plan
.right_operation
== overwrite_operation_t::SPLIT_EXISTING
);
605 auto extent_len
= overwrite_plan
.get_right_extent_size();
607 std::optional
<extent_to_write_t
> right_to_write_extent
=
608 std::make_optional(extent_to_write_t::create_existing(
609 overwrite_plan
.aligned_data_end
,
610 overwrite_plan
.right_paddr
.add_offset(overwrite_plan
.aligned_data_end
- right_pin_begin
),
613 auto append_len
= overwrite_plan
.get_right_alignment_size();
614 if (append_len
== 0) {
615 return get_iertr::make_ready_future
<operate_ret_bare
>(
616 right_to_write_extent
,
619 auto append_offset
= overwrite_plan
.data_end
- right_pin_begin
;
620 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
621 ctx
.t
, pin
->duplicate()
622 ).si_then([append_offset
, append_len
,
623 right_to_write_extent
=std::move(right_to_write_extent
)]
624 (auto right_extent
) mutable {
625 return get_iertr::make_ready_future
<operate_ret_bare
>(
626 right_to_write_extent
,
627 std::make_optional(bufferptr(
628 right_extent
->get_bptr(),
636 template <typename F
>
637 auto with_object_data(
638 ObjectDataHandler::context_t ctx
,
641 return seastar::do_with(
642 ctx
.onode
.get_layout().object_data
.get(),
644 [ctx
](auto &object_data
, auto &f
) {
645 return std::invoke(f
, object_data
646 ).si_then([ctx
, &object_data
] {
647 if (object_data
.must_update()) {
648 ctx
.onode
.get_mutable_layout(ctx
.t
).object_data
.update(object_data
);
650 return seastar::now();
655 ObjectDataHandler::write_ret
ObjectDataHandler::prepare_data_reservation(
657 object_data_t
&object_data
,
660 LOG_PREFIX(ObjectDataHandler::prepare_data_reservation
);
661 ceph_assert(size
<= max_object_size
);
662 if (!object_data
.is_null()) {
663 ceph_assert(object_data
.get_reserved_data_len() == max_object_size
);
664 DEBUGT("reservation present: {}~{}",
666 object_data
.get_reserved_data_base(),
667 object_data
.get_reserved_data_len());
668 return write_iertr::now();
670 DEBUGT("reserving: {}~{}",
672 ctx
.onode
.get_data_hint(),
674 return ctx
.tm
.reserve_region(
676 ctx
.onode
.get_data_hint(),
678 ).si_then([max_object_size
=max_object_size
, &object_data
](auto pin
) {
679 ceph_assert(pin
->get_length() == max_object_size
);
680 object_data
.update_reserved(
683 return write_iertr::now();
688 ObjectDataHandler::clear_ret
ObjectDataHandler::trim_data_reservation(
689 context_t ctx
, object_data_t
&object_data
, extent_len_t size
)
691 ceph_assert(!object_data
.is_null());
692 ceph_assert(size
<= object_data
.get_reserved_data_len());
693 return seastar::do_with(
695 extent_to_write_list_t(),
696 [ctx
, size
, &object_data
](auto &pins
, auto &to_write
) {
697 LOG_PREFIX(ObjectDataHandler::trim_data_reservation
);
698 DEBUGT("object_data: {}~{}",
700 object_data
.get_reserved_data_base(),
701 object_data
.get_reserved_data_len());
702 return ctx
.tm
.get_pins(
704 object_data
.get_reserved_data_base() + size
,
705 object_data
.get_reserved_data_len() - size
706 ).si_then([ctx
, size
, &pins
, &object_data
, &to_write
](auto _pins
) {
708 ceph_assert(pins
.size());
709 auto &pin
= *pins
.front();
710 ceph_assert(pin
.get_key() >= object_data
.get_reserved_data_base());
712 pin
.get_key() <= object_data
.get_reserved_data_base() + size
);
713 auto pin_offset
= pin
.get_key() -
714 object_data
.get_reserved_data_base();
715 if ((pin
.get_key() == (object_data
.get_reserved_data_base() + size
)) ||
716 (pin
.get_val().is_zero())) {
717 /* First pin is exactly at the boundary or is a zero pin. Either way,
718 * remove all pins and add a single zero pin to the end. */
719 to_write
.push_back(extent_to_write_t::create_zero(
721 object_data
.get_reserved_data_len() - pin_offset
));
722 return clear_iertr::now();
724 /* First pin overlaps the boundary and has data, read in extent
725 * and rewrite portion prior to size */
726 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
729 ).si_then([ctx
, size
, pin_offset
, &pin
, &object_data
, &to_write
](
738 bl
.append_zero(p2roundup(size
, ctx
.tm
.get_block_size()) - size
);
739 to_write
.push_back(extent_to_write_t::create_data(
742 to_write
.push_back(extent_to_write_t::create_zero(
743 object_data
.get_reserved_data_base() +
744 p2roundup(size
, ctx
.tm
.get_block_size()),
745 object_data
.get_reserved_data_len() -
746 p2roundup(size
, ctx
.tm
.get_block_size())));
747 return clear_iertr::now();
750 }).si_then([ctx
, &pins
] {
751 return do_removals(ctx
, pins
);
752 }).si_then([ctx
, &to_write
] {
753 return do_insertions(ctx
, to_write
);
754 }).si_then([size
, &object_data
] {
758 return ObjectDataHandler::clear_iertr::now();
764 * get_to_writes_with_zero_buffer
766 * Returns extent_to_write_t's reflecting a zero region extending
767 * from offset~len with headptr optionally on the left and tailptr
768 * optionally on the right.
770 extent_to_write_list_t
get_to_writes_with_zero_buffer(
771 const extent_len_t block_size
,
772 laddr_t offset
, extent_len_t len
,
773 std::optional
<bufferptr
> &&headptr
, std::optional
<bufferptr
> &&tailptr
)
775 auto zero_left
= p2roundup(offset
, (laddr_t
)block_size
);
776 auto zero_right
= p2align(offset
+ len
, (laddr_t
)block_size
);
777 auto left
= headptr
? (offset
- headptr
->length()) : offset
;
778 auto right
= tailptr
?
779 (offset
+ len
+ tailptr
->length()) :
783 (headptr
&& ((zero_left
- left
) ==
784 p2roundup(headptr
->length(), block_size
))) ^
785 (!headptr
&& (zero_left
== left
)));
787 (tailptr
&& ((right
- zero_right
) ==
788 p2roundup(tailptr
->length(), block_size
))) ^
789 (!tailptr
&& (right
== zero_right
)));
791 assert(right
> left
);
792 assert((left
% block_size
) == 0);
793 assert((right
% block_size
) == 0);
795 // zero region too small for a reserved section,
796 // headptr and tailptr in same extent
797 if (zero_right
<= zero_left
) {
803 right
- left
- bl
.length() - (tailptr
? tailptr
->length() : 0));
807 assert(bl
.length() % block_size
== 0);
808 assert(bl
.length() == (right
- left
));
809 return {extent_to_write_t::create_data(left
, bl
)};
811 // reserved section between ends, headptr and tailptr in different extents
812 extent_to_write_list_t ret
;
815 headbl
.append(*headptr
);
816 headbl
.append_zero(zero_left
- left
- headbl
.length());
817 assert(headbl
.length() % block_size
== 0);
818 assert(headbl
.length() > 0);
819 ret
.push_back(extent_to_write_t::create_data(left
, headbl
));
821 // reserved zero region
822 ret
.push_back(extent_to_write_t::create_zero(zero_left
, zero_right
- zero_left
));
823 assert(ret
.back().len
% block_size
== 0);
824 assert(ret
.back().len
> 0);
827 tailbl
.append(*tailptr
);
828 tailbl
.append_zero(right
- zero_right
- tailbl
.length());
829 assert(tailbl
.length() % block_size
== 0);
830 assert(tailbl
.length() > 0);
831 ret
.push_back(extent_to_write_t::create_data(zero_right
, tailbl
));
840 * Returns extent_to_write_t's from bl.
842 * TODO: probably add some kind of upper limit on extent size.
844 extent_to_write_list_t
get_to_writes(laddr_t offset
, bufferlist
&bl
)
846 auto ret
= extent_to_write_list_t();
847 ret
.push_back(extent_to_write_t::create_data(offset
, bl
));
851 ObjectDataHandler::write_ret
ObjectDataHandler::overwrite(
855 std::optional
<bufferlist
> &&bl
,
856 lba_pin_list_t
&&_pins
)
858 if (bl
.has_value()) {
859 assert(bl
->length() == len
);
861 overwrite_plan_t
overwrite_plan(offset
, len
, _pins
, ctx
.tm
.get_block_size());
862 return seastar::do_with(
864 extent_to_write_list_t(),
865 [ctx
, len
, offset
, overwrite_plan
, bl
=std::move(bl
)]
866 (auto &pins
, auto &to_write
) mutable
868 LOG_PREFIX(ObjectDataHandler::overwrite
);
869 DEBUGT("overwrite: {}~{}",
873 ceph_assert(pins
.size() >= 1);
874 DEBUGT("overwrite: split overwrite_plan {}", ctx
.t
, overwrite_plan
);
880 ).si_then([ctx
, len
, offset
, overwrite_plan
, bl
=std::move(bl
),
881 &to_write
, &pins
](auto p
) mutable {
882 auto &[left_extent
, headptr
] = p
;
884 ceph_assert(left_extent
->addr
== overwrite_plan
.pin_begin
);
885 append_extent_to_write(to_write
, std::move(*left_extent
));
888 assert(headptr
->length() > 0);
890 return operate_right(
894 ).si_then([ctx
, len
, offset
,
895 pin_begin
=overwrite_plan
.pin_begin
,
896 pin_end
=overwrite_plan
.pin_end
,
897 bl
=std::move(bl
), headptr
=std::move(headptr
),
898 &to_write
, &pins
](auto p
) mutable {
899 auto &[right_extent
, tailptr
] = p
;
900 if (bl
.has_value()) {
901 auto write_offset
= offset
;
904 write_bl
.append(*headptr
);
905 write_offset
-= headptr
->length();
906 assert_aligned(write_offset
);
908 write_bl
.claim_append(*bl
);
910 write_bl
.append(*tailptr
);
911 assert_aligned(write_bl
.length());
913 splice_extent_to_write(
915 get_to_writes(write_offset
, write_bl
));
917 splice_extent_to_write(
919 get_to_writes_with_zero_buffer(
920 ctx
.tm
.get_block_size(),
924 std::move(tailptr
)));
927 ceph_assert(right_extent
->get_end_addr() == pin_end
);
928 append_extent_to_write(to_write
, std::move(*right_extent
));
930 assert(to_write
.size());
931 assert(pin_begin
== to_write
.front().addr
);
932 assert(pin_end
== to_write
.back().get_end_addr());
934 return do_removals(ctx
, pins
);
935 }).si_then([ctx
, &to_write
] {
936 return do_insertions(ctx
, to_write
);
942 ObjectDataHandler::zero_ret
ObjectDataHandler::zero(
947 return with_object_data(
949 [this, ctx
, offset
, len
](auto &object_data
) {
950 LOG_PREFIX(ObjectDataHandler::zero
);
951 DEBUGT("zero to {}~{}, object_data: {}~{}, is_null {}",
955 object_data
.get_reserved_data_base(),
956 object_data
.get_reserved_data_len(),
957 object_data
.is_null());
958 return prepare_data_reservation(
961 p2roundup(offset
+ len
, ctx
.tm
.get_block_size())
962 ).si_then([this, ctx
, offset
, len
, &object_data
] {
963 auto logical_offset
= object_data
.get_reserved_data_base() + offset
;
964 return ctx
.tm
.get_pins(
968 ).si_then([this, ctx
, logical_offset
, len
](auto pins
) {
970 ctx
, logical_offset
, len
,
971 std::nullopt
, std::move(pins
));
977 ObjectDataHandler::write_ret
ObjectDataHandler::write(
980 const bufferlist
&bl
)
982 return with_object_data(
984 [this, ctx
, offset
, &bl
](auto &object_data
) {
985 LOG_PREFIX(ObjectDataHandler::write
);
986 DEBUGT("writing to {}~{}, object_data: {}~{}, is_null {}",
990 object_data
.get_reserved_data_base(),
991 object_data
.get_reserved_data_len(),
992 object_data
.is_null());
993 return prepare_data_reservation(
996 p2roundup(offset
+ bl
.length(), ctx
.tm
.get_block_size())
997 ).si_then([this, ctx
, offset
, &object_data
, &bl
] {
998 auto logical_offset
= object_data
.get_reserved_data_base() + offset
;
999 return ctx
.tm
.get_pins(
1003 ).si_then([this, ctx
,logical_offset
, &bl
](
1006 ctx
, logical_offset
, bl
.length(),
1007 bufferlist(bl
), std::move(pins
));
1013 ObjectDataHandler::read_ret
ObjectDataHandler::read(
1015 objaddr_t obj_offset
,
1018 return seastar::do_with(
1020 [ctx
, obj_offset
, len
](auto &ret
) {
1021 return with_object_data(
1023 [ctx
, obj_offset
, len
, &ret
](const auto &object_data
) {
1024 LOG_PREFIX(ObjectDataHandler::read
);
1025 DEBUGT("reading {}~{}",
1027 object_data
.get_reserved_data_base(),
1028 object_data
.get_reserved_data_len());
1029 /* Assumption: callers ensure that onode size is <= reserved
1030 * size and that len is adjusted here prior to call */
1031 ceph_assert(!object_data
.is_null());
1032 ceph_assert((obj_offset
+ len
) <= object_data
.get_reserved_data_len());
1033 ceph_assert(len
> 0);
1035 object_data
.get_reserved_data_base() + obj_offset
;
1036 return ctx
.tm
.get_pins(
1040 ).si_then([ctx
, loffset
, len
, &ret
](auto _pins
) {
1041 // offset~len falls within reserved region and len > 0
1042 ceph_assert(_pins
.size() >= 1);
1043 ceph_assert((*_pins
.begin())->get_key() <= loffset
);
1044 return seastar::do_with(
1047 [ctx
, loffset
, len
, &ret
](auto &pins
, auto ¤t
) {
1048 return trans_intr::do_for_each(
1050 [ctx
, loffset
, len
, ¤t
, &ret
](auto &pin
)
1051 -> read_iertr::future
<> {
1052 ceph_assert(current
<= (loffset
+ len
));
1054 (loffset
+ len
) > pin
->get_key());
1055 laddr_t end
= std::min(
1056 pin
->get_key() + pin
->get_length(),
1058 if (pin
->get_val().is_zero()) {
1059 ceph_assert(end
> current
); // See LBAManager::get_mappings
1060 ret
.append_zero(end
- current
);
1062 return seastar::now();
1064 return ctx
.tm
.read_pin
<ObjectDataBlock
>(
1067 ).si_then([&ret
, ¤t
, end
](auto extent
) {
1069 (extent
->get_laddr() + extent
->get_length()) >= end
);
1070 ceph_assert(end
> current
);
1074 current
- extent
->get_laddr(),
1077 return seastar::now();
1078 }).handle_error_interruptible(
1079 read_iertr::pass_further
{},
1080 crimson::ct_error::assert_all
{
1081 "ObjectDataHandler::read hit invalid error"
1089 return std::move(ret
);
1094 ObjectDataHandler::fiemap_ret
ObjectDataHandler::fiemap(
1096 objaddr_t obj_offset
,
1099 return seastar::do_with(
1100 std::map
<uint64_t, uint64_t>(),
1101 [ctx
, obj_offset
, len
](auto &ret
) {
1102 return with_object_data(
1104 [ctx
, obj_offset
, len
, &ret
](const auto &object_data
) {
1105 LOG_PREFIX(ObjectDataHandler::fiemap
);
1107 "{}~{}, reservation {}~{}",
1111 object_data
.get_reserved_data_base(),
1112 object_data
.get_reserved_data_len());
1113 /* Assumption: callers ensure that onode size is <= reserved
1114 * size and that len is adjusted here prior to call */
1115 ceph_assert(!object_data
.is_null());
1116 ceph_assert((obj_offset
+ len
) <= object_data
.get_reserved_data_len());
1117 ceph_assert(len
> 0);
1119 object_data
.get_reserved_data_base() + obj_offset
;
1120 return ctx
.tm
.get_pins(
1124 ).si_then([loffset
, len
, &object_data
, &ret
](auto &&pins
) {
1125 ceph_assert(pins
.size() >= 1);
1126 ceph_assert((*pins
.begin())->get_key() <= loffset
);
1127 for (auto &&i
: pins
) {
1128 if (!(i
->get_val().is_zero())) {
1129 auto ret_left
= std::max(i
->get_key(), loffset
);
1130 auto ret_right
= std::min(
1131 i
->get_key() + i
->get_length(),
1133 assert(ret_right
> ret_left
);
1136 ret_left
- object_data
.get_reserved_data_base(),
1137 ret_right
- ret_left
1143 return std::move(ret
);
1148 ObjectDataHandler::truncate_ret
ObjectDataHandler::truncate(
1152 return with_object_data(
1154 [this, ctx
, offset
](auto &object_data
) {
1155 LOG_PREFIX(ObjectDataHandler::truncate
);
1156 DEBUGT("truncating {}~{} offset: {}",
1158 object_data
.get_reserved_data_base(),
1159 object_data
.get_reserved_data_len(),
1161 if (offset
< object_data
.get_reserved_data_len()) {
1162 return trim_data_reservation(ctx
, object_data
, offset
);
1163 } else if (offset
> object_data
.get_reserved_data_len()) {
1164 return prepare_data_reservation(
1167 p2roundup(offset
, ctx
.tm
.get_block_size()));
1169 return truncate_iertr::now();
1174 ObjectDataHandler::clear_ret
ObjectDataHandler::clear(
1177 return with_object_data(
1179 [this, ctx
](auto &object_data
) {
1180 LOG_PREFIX(ObjectDataHandler::clear
);
1181 DEBUGT("clearing: {}~{}",
1183 object_data
.get_reserved_data_base(),
1184 object_data
.get_reserved_data_len());
1185 if (object_data
.is_null()) {
1186 return clear_iertr::now();
1188 return trim_data_reservation(ctx
, object_data
, 0);
1192 } // namespace crimson::os::seastore