ceph/src/crimson/os/seastore/object_data_handler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <utility>
5 #include <functional>
6
7 #include "crimson/common/log.h"
8
9 #include "crimson/os/seastore/object_data_handler.h"
10
11 namespace {
12 seastar::logger& logger() {
13 return crimson::get_logger(ceph_subsys_seastore_odata);
14 }
15 }
16
17 SET_SUBSYS(seastore_odata);
18
19 namespace crimson::os::seastore {
20 #define assert_aligned(x) ceph_assert(((x)%ctx.tm.get_block_size()) == 0)
21
22 using context_t = ObjectDataHandler::context_t;
23 using get_iertr = ObjectDataHandler::write_iertr;
24
25 /**
26 * extent_to_write_t
27 *
28  * Encapsulates the smallest write operations in an overwrite.
29  * Indicates a zero/existing extent or a data extent, based on whether
30  * to_write is populated.
31  * Should be handled by prepare_ops_list.
32 */
33 struct extent_to_write_t {
34 enum class type_t {
35 DATA,
36 ZERO,
37 EXISTING,
38 };
39 type_t type;
40
41 /// pin of original extent, not nullptr if type == EXISTING
42 LBAMappingRef pin;
43
44 laddr_t addr;
45 extent_len_t len;
46
47 /// non-nullopt if and only if type == DATA
48 std::optional<bufferlist> to_write;
49
50 extent_to_write_t(const extent_to_write_t &) = delete;
51 extent_to_write_t(extent_to_write_t &&) = default;
52
53 bool is_data() const {
54 return type == type_t::DATA;
55 }
56
57 bool is_zero() const {
58 return type == type_t::ZERO;
59 }
60
61 bool is_existing() const {
62 return type == type_t::EXISTING;
63 }
64
65 laddr_t get_end_addr() const {
66 return addr + len;
67 }
68
69 static extent_to_write_t create_data(
70 laddr_t addr, bufferlist to_write) {
71 return extent_to_write_t(addr, to_write);
72 }
73
74 static extent_to_write_t create_zero(
75 laddr_t addr, extent_len_t len) {
76 return extent_to_write_t(addr, len);
77 }
78
79 static extent_to_write_t create_existing(
80 LBAMappingRef &&pin, laddr_t addr, extent_len_t len) {
81 assert(pin);
82 return extent_to_write_t(std::move(pin), addr, len);
83 }
84
85 private:
86 extent_to_write_t(laddr_t addr, bufferlist to_write)
87 : type(type_t::DATA), addr(addr), len(to_write.length()),
88 to_write(to_write) {}
89
90 extent_to_write_t(laddr_t addr, extent_len_t len)
91 : type(type_t::ZERO), addr(addr), len(len) {}
92
93 extent_to_write_t(LBAMappingRef &&pin, laddr_t addr, extent_len_t len)
94 : type(type_t::EXISTING), pin(std::move(pin)), addr(addr), len(len) {}
95 };
96 using extent_to_write_list_t = std::list<extent_to_write_t>;
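// A minimal sketch (illustrative only; left_pin, write_bl and the addresses
// below are hypothetical) of how a caller such as ObjectDataHandler::overwrite()
// assembles an extent_to_write_list_t through the factory functions:
//
//   extent_to_write_list_t to_write;
//   // keep the block-aligned left remainder of an existing extent
//   to_write.push_back(extent_to_write_t::create_existing(
//       std::move(left_pin), /* addr */ 0x10000, /* len */ 0x1000));
//   // freshly written data covering 0x11000~0x2000
//   to_write.push_back(extent_to_write_t::create_data(0x11000, write_bl));
//   // trailing hole recorded as a zero (reserved) extent
//   to_write.push_back(extent_to_write_t::create_zero(0x13000, 0x1000));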
97
98  // Encapsulates the portions of original extents to be remapped (or split
//      for overwrite) by do_remappings.
99 struct extent_to_remap_t {
100 enum class type_t {
101 REMAP,
102 OVERWRITE
103 };
104 type_t type;
105 /// pin of original extent
106 LBAMappingRef pin;
107   /// offset of the remapped extent, or of the overwritten part of an
108   /// overwrite extent. The overwritten part might correspond to multiple
109   /// fresh write extents.
110 extent_len_t new_offset;
111 /// length of remapped extent or overwrite part of overwrite extent
112 extent_len_t new_len;
113
114 extent_to_remap_t(const extent_to_remap_t &) = delete;
115 extent_to_remap_t(extent_to_remap_t &&) = default;
116
117 bool is_remap() const {
118 return type == type_t::REMAP;
119 }
120
121 bool is_overwrite() const {
122 assert((new_offset != 0) && (pin->get_length() != new_offset + new_len));
123 return type == type_t::OVERWRITE;
124 }
125
126 using remap_entry = TransactionManager::remap_entry;
127 remap_entry create_remap_entry() {
128 assert(is_remap());
129 return remap_entry(
130 new_offset,
131 new_len);
132 }
133
134 remap_entry create_left_remap_entry() {
135 assert(is_overwrite());
136 return remap_entry(
137 0,
138 new_offset);
139 }
140
141 remap_entry create_right_remap_entry() {
142 assert(is_overwrite());
143 return remap_entry(
144 new_offset + new_len,
145 pin->get_length() - new_offset - new_len);
146 }
147
148 static extent_to_remap_t create_remap(
149 LBAMappingRef &&pin, extent_len_t new_offset, extent_len_t new_len) {
150 return extent_to_remap_t(type_t::REMAP,
151 std::move(pin), new_offset, new_len);
152 }
153
154 static extent_to_remap_t create_overwrite(
155 LBAMappingRef &&pin, extent_len_t new_offset, extent_len_t new_len) {
156 return extent_to_remap_t(type_t::OVERWRITE,
157 std::move(pin), new_offset, new_len);
158 }
159
160 private:
161 extent_to_remap_t(type_t type,
162 LBAMappingRef &&pin, extent_len_t new_offset, extent_len_t new_len)
163 : type(type),
164 pin(std::move(pin)), new_offset(new_offset), new_len(new_len) {}
165 };
166 using extent_to_remap_list_t = std::list<extent_to_remap_t>;
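// Illustrative sketch (hypothetical pin and offsets): an OVERWRITE entry keeps
// the two ends of the original mapping and lets the middle be rewritten.
//
//   // pin covers a 0x4000-long extent; 0x1000~0x1000 of it is being rewritten
//   auto r = extent_to_remap_t::create_overwrite(
//       std::move(pin), /* new_offset */ 0x1000, /* new_len */ 0x1000);
//   auto left  = r.create_left_remap_entry();   // remap_entry(0x0,    0x1000)
//   auto right = r.create_right_remap_entry();  // remap_entry(0x2000, 0x2000)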
167
168 // Encapsulates extents to be written out using do_insertions.
169 struct extent_to_insert_t {
170 enum class type_t {
171 DATA,
172 ZERO
173 };
174 type_t type;
175 /// laddr of new extent
176 laddr_t addr;
177 /// length of new extent
178 extent_len_t len;
179 /// non-nullopt if type == DATA
180 std::optional<bufferlist> bl;
181
182 extent_to_insert_t(const extent_to_insert_t &) = default;
183 extent_to_insert_t(extent_to_insert_t &&) = default;
184
185 bool is_data() const {
186 return type == type_t::DATA;
187 }
188
189 bool is_zero() const {
190 return type == type_t::ZERO;
191 }
192
193 static extent_to_insert_t create_data(
194 laddr_t addr, extent_len_t len, std::optional<bufferlist> bl) {
195 return extent_to_insert_t(addr, len, bl);
196 }
197
198 static extent_to_insert_t create_zero(
199 laddr_t addr, extent_len_t len) {
200 return extent_to_insert_t(addr, len);
201 }
202
203 private:
204 extent_to_insert_t(laddr_t addr, extent_len_t len,
205 std::optional<bufferlist> bl)
206 :type(type_t::DATA), addr(addr), len(len), bl(bl) {}
207
208 extent_to_insert_t(laddr_t addr, extent_len_t len)
209 :type(type_t::ZERO), addr(addr), len(len) {}
210 };
211 using extent_to_insert_list_t = std::list<extent_to_insert_t>;
212
213 // Encapsulates extents to be retired in do_removals.
214 using extent_to_remove_list_t = std::list<LBAMappingRef>;
215
216 struct overwrite_ops_t {
217 extent_to_remap_list_t to_remap;
218 extent_to_insert_list_t to_insert;
219 extent_to_remove_list_t to_remove;
220 };
221
222 // prepare the to_remap, to_insert and to_remove lists
223 overwrite_ops_t prepare_ops_list(
224 lba_pin_list_t &pins_to_remove,
225 extent_to_write_list_t &to_write) {
226 assert(pins_to_remove.size() != 0);
227 overwrite_ops_t ops;
228 ops.to_remove.swap(pins_to_remove);
229 if (to_write.empty()) {
230 logger().debug("empty to_write");
231 return ops;
232 }
233 long unsigned int visitted = 0;
234 auto& front = to_write.front();
235 auto& back = to_write.back();
236
237   // prepare an overwrite that happens within a single original extent.
238 if (ops.to_remove.size() == 1 &&
239 front.is_existing() && back.is_existing()) {
240 visitted += 2;
241 assert(to_write.size() > 2);
242 assert(front.addr == front.pin->get_key());
243 assert(back.addr > back.pin->get_key());
244 ops.to_remap.push_back(extent_to_remap_t::create_overwrite(
245 std::move(front.pin),
246 front.len,
247 back.addr - front.addr - front.len));
248 ops.to_remove.pop_front();
249 } else {
250     // prepare to_remap; this case involves one or more original extents
251 if (front.is_existing()) {
252 visitted++;
253 assert(to_write.size() > 1);
254 assert(front.addr == front.pin->get_key());
255 ops.to_remap.push_back(extent_to_remap_t::create_remap(
256 std::move(front.pin),
257 0,
258 front.len));
259 ops.to_remove.pop_front();
260 }
261 if (back.is_existing()) {
262 visitted++;
263 assert(to_write.size() > 1);
264 assert(back.addr + back.len ==
265 back.pin->get_key() + back.pin->get_length());
266 ops.to_remap.push_back(extent_to_remap_t::create_remap(
267 std::move(back.pin),
268 back.addr - back.pin->get_key(),
269 back.len));
270 ops.to_remove.pop_back();
271 }
272 }
273
274 // prepare to_insert
275 for (auto &region : to_write) {
276 if (region.is_data()) {
277 visitted++;
278 assert(region.to_write.has_value());
279 ops.to_insert.push_back(extent_to_insert_t::create_data(
280 region.addr, region.len, region.to_write));
281 } else if (region.is_zero()) {
282 visitted++;
283 assert(!(region.to_write.has_value()));
284 ops.to_insert.push_back(extent_to_insert_t::create_zero(
285 region.addr, region.len));
286 }
287 }
288
289 logger().debug(
290 "to_remap list size: {}"
291 " to_insert list size: {}"
292 " to_remove list size: {}",
293 ops.to_remap.size(), ops.to_insert.size(), ops.to_remove.size());
294 assert(visitted == to_write.size());
295 return ops;
296 }
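// Illustrative walk-through: for an overwrite that lands strictly inside a
// single original extent, to_write arrives as
// [EXISTING(left), DATA(middle), EXISTING(right)] and prepare_ops_list()
// produces:
//   to_remap  - one OVERWRITE entry built from the front pin, describing the
//               surviving left/right pieces of the original extent;
//   to_insert - one DATA entry for the middle region;
//   to_remove - empty, the only affected pin having been consumed by to_remap.
// Otherwise, front/back EXISTING extents become REMAP entries, interior
// DATA/ZERO extents become to_insert entries, and the remaining pins stay in
// to_remove to be retired by do_removals().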
297
298 /**
299 * append_extent_to_write
300 *
301  * Appends the passed extent_to_write_t, maintaining the invariant that
302  * the list never contains consecutive zero extents by checking for and
303  * merging them.
304 */
305 void append_extent_to_write(
306 extent_to_write_list_t &to_write, extent_to_write_t &&to_append)
307 {
308 assert(to_write.empty() ||
309 to_write.back().get_end_addr() == to_append.addr);
310 if (to_write.empty() ||
311 to_write.back().is_data() ||
312 to_append.is_data() ||
313 to_write.back().type != to_append.type) {
314 to_write.push_back(std::move(to_append));
315 } else {
316 to_write.back().len += to_append.len;
317 }
318 }
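// A small sketch (hypothetical addresses; bl is a bufferlist assumed to exist):
// adjacent zero extents get merged, while data extents are always appended.
//
//   extent_to_write_list_t l;
//   append_extent_to_write(l, extent_to_write_t::create_zero(0x0000, 0x1000));
//   append_extent_to_write(l, extent_to_write_t::create_zero(0x1000, 0x1000));
//   // l.size() == 1 and l.back().len == 0x2000
//   append_extent_to_write(l, extent_to_write_t::create_data(0x2000, bl));
//   // l.size() == 2; the data extent is kept separate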
319
320 /**
321 * splice_extent_to_write
322 *
323  * Splices the passed extent_to_write_list_t into to_write, maintaining the
324  * invariant that the list never contains consecutive zero extents by
325  * checking for and merging them.
326 */
327 void splice_extent_to_write(
328 extent_to_write_list_t &to_write, extent_to_write_list_t &&to_splice)
329 {
330 if (!to_splice.empty()) {
331 append_extent_to_write(to_write, std::move(to_splice.front()));
332 to_splice.pop_front();
333 to_write.splice(to_write.end(), std::move(to_splice));
334 }
335 }
336
337 /// Creates remap extents in to_remap
338 ObjectDataHandler::write_ret do_remappings(
339 context_t ctx,
340 extent_to_remap_list_t &to_remap)
341 {
342 return trans_intr::do_for_each(
343 to_remap,
344 [ctx](auto &region) {
345 if (region.is_remap()) {
346 return ctx.tm.remap_pin<ObjectDataBlock, 1>(
347 ctx.t,
348 std::move(region.pin),
349 std::array{
350 region.create_remap_entry()
351 }
352 ).si_then([&region](auto pins) {
353 ceph_assert(pins.size() == 1);
354 ceph_assert(region.new_len == pins[0]->get_length());
355 return ObjectDataHandler::write_iertr::now();
356 });
357 } else if (region.is_overwrite()) {
358 return ctx.tm.remap_pin<ObjectDataBlock, 2>(
359 ctx.t,
360 std::move(region.pin),
361 std::array{
362 region.create_left_remap_entry(),
363 region.create_right_remap_entry()
364 }
365 ).si_then([&region](auto pins) {
366 ceph_assert(pins.size() == 2);
367 ceph_assert(region.pin->get_key() == pins[0]->get_key());
368 ceph_assert(region.pin->get_key() + pins[0]->get_length() +
369 region.new_len == pins[1]->get_key());
370 return ObjectDataHandler::write_iertr::now();
371 });
372 } else {
373 ceph_abort("impossible");
374 return ObjectDataHandler::write_iertr::now();
375 }
376 });
377 }
378
379 ObjectDataHandler::write_ret do_removals(
380 context_t ctx,
381 lba_pin_list_t &to_remove)
382 {
383 return trans_intr::do_for_each(
384 to_remove,
385 [ctx](auto &pin) {
386 LOG_PREFIX(object_data_handler.cc::do_removals);
387 DEBUGT("decreasing ref: {}",
388 ctx.t,
389 pin->get_key());
390 return ctx.tm.dec_ref(
391 ctx.t,
392 pin->get_key()
393 ).si_then(
394 [](auto){},
395 ObjectDataHandler::write_iertr::pass_further{},
396 crimson::ct_error::assert_all{
397 "object_data_handler::do_removals invalid error"
398 }
399 );
400 });
401 }
402
403 /// Creates zero/data extents in to_insert
404 ObjectDataHandler::write_ret do_insertions(
405 context_t ctx,
406 extent_to_insert_list_t &to_insert)
407 {
408 return trans_intr::do_for_each(
409 to_insert,
410 [ctx](auto &region) {
411 LOG_PREFIX(object_data_handler.cc::do_insertions);
412 if (region.is_data()) {
413 assert_aligned(region.addr);
414 assert_aligned(region.len);
415 ceph_assert(region.len == region.bl->length());
416 DEBUGT("allocating extent: {}~{}",
417 ctx.t,
418 region.addr,
419 region.len);
420 return ctx.tm.alloc_extent<ObjectDataBlock>(
421 ctx.t,
422 region.addr,
423 region.len
424 ).si_then([&region](auto extent) {
425 if (extent->get_laddr() != region.addr) {
426 logger().debug(
427 "object_data_handler::do_insertions alloc got addr {},"
428 " should have been {}",
429 extent->get_laddr(),
430 region.addr);
431 }
432 ceph_assert(extent->get_laddr() == region.addr);
433 ceph_assert(extent->get_length() == region.len);
434 auto iter = region.bl->cbegin();
435 iter.copy(region.len, extent->get_bptr().c_str());
436 return ObjectDataHandler::write_iertr::now();
437 });
438 } else if (region.is_zero()) {
439 DEBUGT("reserving: {}~{}",
440 ctx.t,
441 region.addr,
442 region.len);
443 return ctx.tm.reserve_region(
444 ctx.t,
445 region.addr,
446 region.len
447 ).si_then([FNAME, ctx, &region](auto pin) {
448 ceph_assert(pin->get_length() == region.len);
449 if (pin->get_key() != region.addr) {
450 ERRORT(
451 "inconsistent laddr: pin: {} region {}",
452 ctx.t,
453 pin->get_key(),
454 region.addr);
455 }
456 ceph_assert(pin->get_key() == region.addr);
457 return ObjectDataHandler::write_iertr::now();
458 });
459 } else {
460 ceph_abort("impossible");
461 return ObjectDataHandler::write_iertr::now();
462 }
463 });
464 }
465
466 enum class overwrite_operation_t {
467 UNKNOWN,
468 OVERWRITE_ZERO, // fill unaligned data with zero
469 MERGE_EXISTING, // if present, merge data with the clean/pending extent
470 SPLIT_EXISTING, // split the existing extent, and fill unaligned data
471 };
472
473 std::ostream& operator<<(
474 std::ostream &out,
475 const overwrite_operation_t &operation)
476 {
477 switch (operation) {
478 case overwrite_operation_t::UNKNOWN:
479 return out << "UNKNOWN";
480 case overwrite_operation_t::OVERWRITE_ZERO:
481 return out << "OVERWRITE_ZERO";
482 case overwrite_operation_t::MERGE_EXISTING:
483 return out << "MERGE_EXISTING";
484 case overwrite_operation_t::SPLIT_EXISTING:
485 return out << "SPLIT_EXISTING";
486 default:
487 return out << "!IMPOSSIBLE_OPERATION";
488 }
489 }
490
491 /**
492 * overwrite_plan_t
493 *
494 * |<--------------------------pins_size---------------------------------------------->|
495 * pin_begin(aligned) pin_end(aligned)
496 * |<------aligned_data_size-------------------------->| (aligned-bl)
497 * aligned_data_begin aligned_data_end
498 * |<-data_size->| (bl)
499 * data_begin end
500 * left(l) right(r)
501 * |<l_extent_size>|<l_alignment_size>| |<r_alignment_size>|<r_extent_size>|
502 * |<-----------left_size------------>| |<-----------right_size----------->|
503 *
504 * |<-----(existing left extent/pin)----->| |<-----(existing right extent/pin)----->|
505 * left_paddr right_paddr
506 */
507 struct overwrite_plan_t {
508 // addresses
509 laddr_t pin_begin;
510 laddr_t pin_end;
511 paddr_t left_paddr;
512 paddr_t right_paddr;
513 laddr_t data_begin;
514 laddr_t data_end;
515 laddr_t aligned_data_begin;
516 laddr_t aligned_data_end;
517
518 // operations
519 overwrite_operation_t left_operation;
520 overwrite_operation_t right_operation;
521
522 // helper member
523 extent_len_t block_size;
524
525 public:
526 extent_len_t get_left_size() const {
527 return data_begin - pin_begin;
528 }
529
530 extent_len_t get_left_extent_size() const {
531 return aligned_data_begin - pin_begin;
532 }
533
534 extent_len_t get_left_alignment_size() const {
535 return data_begin - aligned_data_begin;
536 }
537
538 extent_len_t get_right_size() const {
539 return pin_end - data_end;
540 }
541
542 extent_len_t get_right_extent_size() const {
543 return pin_end - aligned_data_end;
544 }
545
546 extent_len_t get_right_alignment_size() const {
547 return aligned_data_end - data_end;
548 }
549
550 extent_len_t get_aligned_data_size() const {
551 return aligned_data_end - aligned_data_begin;
552 }
553
554 extent_len_t get_pins_size() const {
555 return pin_end - pin_begin;
556 }
557
558 friend std::ostream& operator<<(
559 std::ostream& out,
560 const overwrite_plan_t& overwrite_plan) {
561 return out << "overwrite_plan_t("
562 << "pin_begin=" << overwrite_plan.pin_begin
563 << ", pin_end=" << overwrite_plan.pin_end
564 << ", left_paddr=" << overwrite_plan.left_paddr
565 << ", right_paddr=" << overwrite_plan.right_paddr
566 << ", data_begin=" << overwrite_plan.data_begin
567 << ", data_end=" << overwrite_plan.data_end
568 << ", aligned_data_begin=" << overwrite_plan.aligned_data_begin
569 << ", aligned_data_end=" << overwrite_plan.aligned_data_end
570 << ", left_operation=" << overwrite_plan.left_operation
571 << ", right_operation=" << overwrite_plan.right_operation
572 << ", block_size=" << overwrite_plan.block_size
573 << ")";
574 }
575
576 overwrite_plan_t(laddr_t offset,
577 extent_len_t len,
578 const lba_pin_list_t& pins,
579 extent_len_t block_size,
580 Transaction& t) :
581 pin_begin(pins.front()->get_key()),
582 pin_end(pins.back()->get_key() + pins.back()->get_length()),
583 left_paddr(pins.front()->get_val()),
584 right_paddr(pins.back()->get_val()),
585 data_begin(offset),
586 data_end(offset + len),
587 aligned_data_begin(p2align((uint64_t)data_begin, (uint64_t)block_size)),
588 aligned_data_end(p2roundup((uint64_t)data_end, (uint64_t)block_size)),
589 left_operation(overwrite_operation_t::UNKNOWN),
590 right_operation(overwrite_operation_t::UNKNOWN),
591 block_size(block_size) {
592 validate();
593 evaluate_operations(t);
594 assert(left_operation != overwrite_operation_t::UNKNOWN);
595 assert(right_operation != overwrite_operation_t::UNKNOWN);
596 }
597
598 private:
599 // refer to overwrite_plan_t description
600 void validate() const {
601 ceph_assert(pin_begin % block_size == 0);
602 ceph_assert(pin_end % block_size == 0);
603 ceph_assert(aligned_data_begin % block_size == 0);
604 ceph_assert(aligned_data_end % block_size == 0);
605
606 ceph_assert(pin_begin <= aligned_data_begin);
607 ceph_assert(aligned_data_begin <= data_begin);
608 ceph_assert(data_begin <= data_end);
609 ceph_assert(data_end <= aligned_data_end);
610 ceph_assert(aligned_data_end <= pin_end);
611 }
612
613   /*
614    * When modifying a portion of an object data block, follow the
615    * read-full-extent-then-merge-new-data strategy if the write
616    * amplification it causes is not greater than
617    * seastore_obj_data_write_amplification; otherwise, split the
618    * original extent into at most three parts: origin-left,
619    * part-to-be-modified and origin-right.
620    */
621 void evaluate_operations(Transaction& t) {
622 auto actual_write_size = get_pins_size();
623 auto aligned_data_size = get_aligned_data_size();
624 auto left_ext_size = get_left_extent_size();
625 auto right_ext_size = get_right_extent_size();
626
627 auto can_merge = [](Transaction& t, paddr_t paddr) {
628 CachedExtentRef ext;
629 if (paddr.is_relative() || paddr.is_delayed()) {
630 return true;
631 } else if (t.get_extent(paddr, &ext) ==
632 Transaction::get_extent_ret::PRESENT) {
633 // FIXME: there is no need to lookup the cache if the pin can
634 // be associated with the extent state
635 if (ext->is_mutable()) {
636 return true;
637 }
638 }
639 return false;
640 };
641 if (left_paddr.is_zero()) {
642 actual_write_size -= left_ext_size;
643 left_ext_size = 0;
644 left_operation = overwrite_operation_t::OVERWRITE_ZERO;
645 } else if (can_merge(t, left_paddr)) {
646 aligned_data_size += left_ext_size;
647 left_ext_size = 0;
648 left_operation = overwrite_operation_t::MERGE_EXISTING;
649 }
650
651 if (right_paddr.is_zero()) {
652 actual_write_size -= right_ext_size;
653 right_ext_size = 0;
654 right_operation = overwrite_operation_t::OVERWRITE_ZERO;
655 } else if (can_merge(t, right_paddr)) {
656 aligned_data_size += right_ext_size;
657 right_ext_size = 0;
658 right_operation = overwrite_operation_t::MERGE_EXISTING;
659 }
660
661 while (left_operation == overwrite_operation_t::UNKNOWN ||
662 right_operation == overwrite_operation_t::UNKNOWN) {
663 if (((double)actual_write_size / (double)aligned_data_size) <=
664 crimson::common::get_conf<double>("seastore_obj_data_write_amplification")) {
665 break;
666 }
667 if (left_ext_size == 0 && right_ext_size == 0) {
668 break;
669 }
670 if (left_ext_size >= right_ext_size) {
671 // split left
672 assert(left_operation == overwrite_operation_t::UNKNOWN);
673 actual_write_size -= left_ext_size;
674 left_ext_size = 0;
675 left_operation = overwrite_operation_t::SPLIT_EXISTING;
676 } else { // left_ext_size < right_ext_size
677 // split right
678 assert(right_operation == overwrite_operation_t::UNKNOWN);
679 actual_write_size -= right_ext_size;
680 right_ext_size = 0;
681 right_operation = overwrite_operation_t::SPLIT_EXISTING;
682 }
683 }
684
685 if (left_operation == overwrite_operation_t::UNKNOWN) {
686 // no split left, so merge with left
687 left_operation = overwrite_operation_t::MERGE_EXISTING;
688 }
689
690 if (right_operation == overwrite_operation_t::UNKNOWN) {
691 // no split right, so merge with right
692 right_operation = overwrite_operation_t::MERGE_EXISTING;
693 }
694 }
695 };
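// Worked example (hypothetical numbers, 0x1000-byte blocks): writing
// data_begin=0x1100, data_end=0x2300 over pins spanning 0x0000~0x4000 gives
//   aligned_data_begin = 0x1000, aligned_data_end = 0x3000,
//   left_extent_size  = 0x1000, left_alignment_size  = 0x100,
//   right_extent_size = 0x1000, right_alignment_size = 0xd00.
// evaluate_operations() starts from
//   actual_write_size / aligned_data_size = 0x4000 / 0x2000 = 2.0
// and, if neither edge is a zero reservation or an already-mutable extent,
// keeps choosing SPLIT_EXISTING on the side with the larger extent size until
// the ratio drops to seastore_obj_data_write_amplification or below; any side
// left undecided falls back to MERGE_EXISTING.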
696
697 } // namespace crimson::os::seastore
698
699 #if FMT_VERSION >= 90000
700 template<> struct fmt::formatter<crimson::os::seastore::overwrite_plan_t> : fmt::ostream_formatter {};
701 #endif
702
703 namespace crimson::os::seastore {
704
705 /**
706 * operate_left
707 *
708  * Carries out overwrite_plan.left_operation.
709 */
710 using operate_ret_bare = std::pair<
711 std::optional<extent_to_write_t>,
712 std::optional<bufferptr>>;
713 using operate_ret = get_iertr::future<operate_ret_bare>;
714 operate_ret operate_left(context_t ctx, LBAMappingRef &pin, const overwrite_plan_t &overwrite_plan)
715 {
716 if (overwrite_plan.get_left_size() == 0) {
717 return get_iertr::make_ready_future<operate_ret_bare>(
718 std::nullopt,
719 std::nullopt);
720 }
721
722 if (overwrite_plan.left_operation == overwrite_operation_t::OVERWRITE_ZERO) {
723 assert(pin->get_val().is_zero());
724 auto zero_extent_len = overwrite_plan.get_left_extent_size();
725 assert_aligned(zero_extent_len);
726 auto zero_prepend_len = overwrite_plan.get_left_alignment_size();
727 return get_iertr::make_ready_future<operate_ret_bare>(
728 (zero_extent_len == 0
729 ? std::nullopt
730 : std::make_optional(extent_to_write_t::create_zero(
731 overwrite_plan.pin_begin, zero_extent_len))),
732 (zero_prepend_len == 0
733 ? std::nullopt
734 : std::make_optional(bufferptr(
735 ceph::buffer::create(zero_prepend_len, 0))))
736 );
737 } else if (overwrite_plan.left_operation == overwrite_operation_t::MERGE_EXISTING) {
738 auto prepend_len = overwrite_plan.get_left_size();
739 if (prepend_len == 0) {
740 return get_iertr::make_ready_future<operate_ret_bare>(
741 std::nullopt,
742 std::nullopt);
743 } else {
744 extent_len_t off = pin->get_intermediate_offset();
745 return ctx.tm.read_pin<ObjectDataBlock>(
746 ctx.t, pin->duplicate()
747 ).si_then([prepend_len, off](auto left_extent) {
748 return get_iertr::make_ready_future<operate_ret_bare>(
749 std::nullopt,
750 std::make_optional(bufferptr(
751 left_extent->get_bptr(),
752 off,
753 prepend_len)));
754 });
755 }
756 } else {
757 assert(overwrite_plan.left_operation == overwrite_operation_t::SPLIT_EXISTING);
758
759 auto extent_len = overwrite_plan.get_left_extent_size();
760 assert(extent_len);
761 std::optional<extent_to_write_t> left_to_write_extent =
762 std::make_optional(extent_to_write_t::create_existing(
763 pin->duplicate(),
764 pin->get_key(),
765 extent_len));
766
767 auto prepend_len = overwrite_plan.get_left_alignment_size();
768 if (prepend_len == 0) {
769 return get_iertr::make_ready_future<operate_ret_bare>(
770 std::move(left_to_write_extent),
771 std::nullopt);
772 } else {
773 extent_len_t off = pin->get_intermediate_offset();
774 return ctx.tm.read_pin<ObjectDataBlock>(
775 ctx.t, pin->duplicate()
776 ).si_then([prepend_offset=extent_len + off, prepend_len,
777 left_to_write_extent=std::move(left_to_write_extent)]
778 (auto left_extent) mutable {
779 return get_iertr::make_ready_future<operate_ret_bare>(
780 std::move(left_to_write_extent),
781 std::make_optional(bufferptr(
782 left_extent->get_bptr(),
783 prepend_offset,
784 prepend_len)));
785 });
786 }
787 }
788 };
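// Summary of the return value above (illustrative): MERGE_EXISTING yields no
// extent_to_write_t, just a bufferptr holding the left neighbour's bytes up to
// data_begin so the caller can prepend them to the new write; SPLIT_EXISTING
// yields an EXISTING extent for the block-aligned left remainder plus, when
// the write is unaligned, a bufferptr with the sub-block prefix read from the
// original extent; OVERWRITE_ZERO yields an optional ZERO extent and,
// optionally, a zero-filled prefix buffer.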
789
790 /**
791 * operate_right
792 *
793  * Carries out overwrite_plan.right_operation.
794 */
795 operate_ret operate_right(context_t ctx, LBAMappingRef &pin, const overwrite_plan_t &overwrite_plan)
796 {
797 if (overwrite_plan.get_right_size() == 0) {
798 return get_iertr::make_ready_future<operate_ret_bare>(
799 std::nullopt,
800 std::nullopt);
801 }
802
803 auto right_pin_begin = pin->get_key();
804 assert(overwrite_plan.data_end >= right_pin_begin);
805 if (overwrite_plan.right_operation == overwrite_operation_t::OVERWRITE_ZERO) {
806 assert(pin->get_val().is_zero());
807 auto zero_suffix_len = overwrite_plan.get_right_alignment_size();
808 auto zero_extent_len = overwrite_plan.get_right_extent_size();
809 assert_aligned(zero_extent_len);
810 return get_iertr::make_ready_future<operate_ret_bare>(
811 (zero_extent_len == 0
812 ? std::nullopt
813 : std::make_optional(extent_to_write_t::create_zero(
814 overwrite_plan.aligned_data_end, zero_extent_len))),
815 (zero_suffix_len == 0
816 ? std::nullopt
817 : std::make_optional(bufferptr(
818 ceph::buffer::create(zero_suffix_len, 0))))
819 );
820 } else if (overwrite_plan.right_operation == overwrite_operation_t::MERGE_EXISTING) {
821 auto append_len = overwrite_plan.get_right_size();
822 if (append_len == 0) {
823 return get_iertr::make_ready_future<operate_ret_bare>(
824 std::nullopt,
825 std::nullopt);
826 } else {
827 auto append_offset =
828 overwrite_plan.data_end
829 - right_pin_begin
830 + pin->get_intermediate_offset();
831 return ctx.tm.read_pin<ObjectDataBlock>(
832 ctx.t, pin->duplicate()
833 ).si_then([append_offset, append_len](auto right_extent) {
834 return get_iertr::make_ready_future<operate_ret_bare>(
835 std::nullopt,
836 std::make_optional(bufferptr(
837 right_extent->get_bptr(),
838 append_offset,
839 append_len)));
840 });
841 }
842 } else {
843 assert(overwrite_plan.right_operation == overwrite_operation_t::SPLIT_EXISTING);
844
845 auto extent_len = overwrite_plan.get_right_extent_size();
846 assert(extent_len);
847 std::optional<extent_to_write_t> right_to_write_extent =
848 std::make_optional(extent_to_write_t::create_existing(
849 pin->duplicate(),
850 overwrite_plan.aligned_data_end,
851 extent_len));
852
853 auto append_len = overwrite_plan.get_right_alignment_size();
854 if (append_len == 0) {
855 return get_iertr::make_ready_future<operate_ret_bare>(
856 std::move(right_to_write_extent),
857 std::nullopt);
858 } else {
859 auto append_offset =
860 overwrite_plan.data_end
861 - right_pin_begin
862 + pin->get_intermediate_offset();
863 return ctx.tm.read_pin<ObjectDataBlock>(
864 ctx.t, pin->duplicate()
865 ).si_then([append_offset, append_len,
866 right_to_write_extent=std::move(right_to_write_extent)]
867 (auto right_extent) mutable {
868 return get_iertr::make_ready_future<operate_ret_bare>(
869 std::move(right_to_write_extent),
870 std::make_optional(bufferptr(
871 right_extent->get_bptr(),
872 append_offset,
873 append_len)));
874 });
875 }
876 }
877 };
878
879 template <typename F>
880 auto with_object_data(
881 ObjectDataHandler::context_t ctx,
882 F &&f)
883 {
884 return seastar::do_with(
885 ctx.onode.get_layout().object_data.get(),
886 std::forward<F>(f),
887 [ctx](auto &object_data, auto &f) {
888 return std::invoke(f, object_data
889 ).si_then([ctx, &object_data] {
890 if (object_data.must_update()) {
891 ctx.onode.get_mutable_layout(ctx.t).object_data.update(object_data);
892 }
893 return seastar::now();
894 });
895 });
896 }
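// Usage sketch (illustrative; some_size is a placeholder): with_object_data()
// loads the onode's object_data_t, invokes the callback, and persists the
// layout only if the callback marked it dirty via must_update(). A member of
// ObjectDataHandler would typically call it as the methods below do:
//
//   return with_object_data(ctx, [this, ctx](auto &object_data) {
//     return prepare_data_reservation(ctx, object_data, some_size);
//   });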
897
898 template <typename F>
899 auto with_objects_data(
900 ObjectDataHandler::context_t ctx,
901 F &&f)
902 {
903 ceph_assert(ctx.d_onode);
904 return seastar::do_with(
905 ctx.onode.get_layout().object_data.get(),
906 ctx.d_onode->get_layout().object_data.get(),
907 std::forward<F>(f),
908 [ctx](auto &object_data, auto &d_object_data, auto &f) {
909 return std::invoke(f, object_data, d_object_data
910 ).si_then([ctx, &object_data, &d_object_data] {
911 if (object_data.must_update()) {
912 ctx.onode.get_mutable_layout(ctx.t).object_data.update(object_data);
913 }
914 if (d_object_data.must_update()) {
915 ctx.d_onode->get_mutable_layout(
916 ctx.t).object_data.update(d_object_data);
917 }
918 return seastar::now();
919 });
920 });
921 }
922
923 ObjectDataHandler::write_ret ObjectDataHandler::prepare_data_reservation(
924 context_t ctx,
925 object_data_t &object_data,
926 extent_len_t size)
927 {
928 LOG_PREFIX(ObjectDataHandler::prepare_data_reservation);
929 ceph_assert(size <= max_object_size);
930 if (!object_data.is_null()) {
931 ceph_assert(object_data.get_reserved_data_len() == max_object_size);
932 DEBUGT("reservation present: {}~{}",
933 ctx.t,
934 object_data.get_reserved_data_base(),
935 object_data.get_reserved_data_len());
936 return write_iertr::now();
937 } else {
938 DEBUGT("reserving: {}~{}",
939 ctx.t,
940 ctx.onode.get_data_hint(),
941 max_object_size);
942 return ctx.tm.reserve_region(
943 ctx.t,
944 ctx.onode.get_data_hint(),
945 max_object_size
946 ).si_then([max_object_size=max_object_size, &object_data](auto pin) {
947 ceph_assert(pin->get_length() == max_object_size);
948 object_data.update_reserved(
949 pin->get_key(),
950 pin->get_length());
951 return write_iertr::now();
952 });
953 }
954 }
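// Note (descriptive): the reservation is all-or-nothing. The first operation
// that needs space reserves the full max_object_size window at the onode's
// data hint regardless of the requested size; later writes and truncates only
// carve extents out of that fixed laddr range.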
955
956 ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
957 context_t ctx, object_data_t &object_data, extent_len_t size)
958 {
959 ceph_assert(!object_data.is_null());
960 ceph_assert(size <= object_data.get_reserved_data_len());
961 return seastar::do_with(
962 lba_pin_list_t(),
963 extent_to_write_list_t(),
964 [ctx, size, &object_data](auto &pins, auto &to_write) {
965 LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
966 DEBUGT("object_data: {}~{}",
967 ctx.t,
968 object_data.get_reserved_data_base(),
969 object_data.get_reserved_data_len());
970 return ctx.tm.get_pins(
971 ctx.t,
972 object_data.get_reserved_data_base() + size,
973 object_data.get_reserved_data_len() - size
974 ).si_then([ctx, size, &pins, &object_data, &to_write](auto _pins) {
975 _pins.swap(pins);
976 ceph_assert(pins.size());
977 if (!size) {
978 // no need to reserve region if we are truncating the object's
979 // size to 0
980 return clear_iertr::now();
981 }
982 auto &pin = *pins.front();
983 ceph_assert(pin.get_key() >= object_data.get_reserved_data_base());
984 ceph_assert(
985 pin.get_key() <= object_data.get_reserved_data_base() + size);
986 auto pin_offset = pin.get_key() -
987 object_data.get_reserved_data_base();
988 if ((pin.get_key() == (object_data.get_reserved_data_base() + size)) ||
989 (pin.get_val().is_zero())) {
990 /* First pin is exactly at the boundary or is a zero pin. Either way,
991 * remove all pins and add a single zero pin to the end. */
992 to_write.push_back(extent_to_write_t::create_zero(
993 pin.get_key(),
994 object_data.get_reserved_data_len() - pin_offset));
995 return clear_iertr::now();
996 } else {
997           /* First pin overlaps the boundary and has data: remap it if the
998            * truncation size is block-aligned, otherwise rewrite it */
999 auto roundup_size = p2roundup(size, ctx.tm.get_block_size());
1000 auto append_len = roundup_size - size;
1001 if (append_len == 0) {
1002 LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
1003             TRACET("First pin overlaps the boundary and has aligned data, "
1004               "create existing at addr: {}, len: {}",
1005 ctx.t, pin.get_key(), size - pin_offset);
1006 to_write.push_back(extent_to_write_t::create_existing(
1007 pin.duplicate(),
1008 pin.get_key(),
1009 size - pin_offset));
1010 to_write.push_back(extent_to_write_t::create_zero(
1011 object_data.get_reserved_data_base() + roundup_size,
1012 object_data.get_reserved_data_len() - roundup_size));
1013 return clear_iertr::now();
1014 } else {
1015 return ctx.tm.read_pin<ObjectDataBlock>(
1016 ctx.t,
1017 pin.duplicate()
1018 ).si_then([ctx, size, pin_offset, append_len, roundup_size,
1019 &pin, &object_data, &to_write](auto extent) {
1020 bufferlist bl;
1021 bl.append(
1022 bufferptr(
1023 extent->get_bptr(),
1024 pin.get_intermediate_offset(),
1025 size - pin_offset
1026 ));
1027 bl.append_zero(append_len);
1028 LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
1029               TRACET("First pin overlaps the boundary and has unaligned data, "
1030                 "create data at addr: {}, len: {}",
1031 ctx.t, pin.get_key(), bl.length());
1032 to_write.push_back(extent_to_write_t::create_data(
1033 pin.get_key(),
1034 bl));
1035 to_write.push_back(extent_to_write_t::create_zero(
1036 object_data.get_reserved_data_base() + roundup_size,
1037 object_data.get_reserved_data_len() - roundup_size));
1038 return clear_iertr::now();
1039 });
1040 }
1041 }
1042 }).si_then([ctx, size, &to_write, &object_data, &pins] {
1043 return seastar::do_with(
1044 prepare_ops_list(pins, to_write),
1045 [ctx, size, &object_data](auto &ops) {
1046 return do_remappings(ctx, ops.to_remap
1047 ).si_then([ctx, &ops] {
1048 return do_removals(ctx, ops.to_remove);
1049 }).si_then([ctx, &ops] {
1050 return do_insertions(ctx, ops.to_insert);
1051 }).si_then([size, &object_data] {
1052 if (size == 0) {
1053 object_data.clear();
1054 }
1055 return ObjectDataHandler::clear_iertr::now();
1056 });
1057 });
1058 });
1059 });
1060 }
1061
1062 /**
1063 * get_to_writes_with_zero_buffer
1064 *
1065 * Returns extent_to_write_t's reflecting a zero region extending
1066 * from offset~len with headptr optionally on the left and tailptr
1067 * optionally on the right.
1068 */
1069 extent_to_write_list_t get_to_writes_with_zero_buffer(
1070 const extent_len_t block_size,
1071 laddr_t offset, extent_len_t len,
1072 std::optional<bufferptr> &&headptr, std::optional<bufferptr> &&tailptr)
1073 {
1074 auto zero_left = p2roundup(offset, (laddr_t)block_size);
1075 auto zero_right = p2align(offset + len, (laddr_t)block_size);
1076 auto left = headptr ? (offset - headptr->length()) : offset;
1077 auto right = tailptr ?
1078 (offset + len + tailptr->length()) :
1079 (offset + len);
1080
1081 assert(
1082 (headptr && ((zero_left - left) ==
1083 p2roundup(headptr->length(), block_size))) ^
1084 (!headptr && (zero_left == left)));
1085 assert(
1086 (tailptr && ((right - zero_right) ==
1087 p2roundup(tailptr->length(), block_size))) ^
1088 (!tailptr && (right == zero_right)));
1089
1090 assert(right > left);
1091 assert((left % block_size) == 0);
1092 assert((right % block_size) == 0);
1093
1094 // zero region too small for a reserved section,
1095 // headptr and tailptr in same extent
1096 if (zero_right <= zero_left) {
1097 bufferlist bl;
1098 if (headptr) {
1099 bl.append(*headptr);
1100 }
1101 bl.append_zero(
1102 right - left - bl.length() - (tailptr ? tailptr->length() : 0));
1103 if (tailptr) {
1104 bl.append(*tailptr);
1105 }
1106 assert(bl.length() % block_size == 0);
1107 assert(bl.length() == (right - left));
1108 extent_to_write_list_t ret;
1109 ret.push_back(extent_to_write_t::create_data(left, bl));
1110 return ret;
1111 } else {
1112 // reserved section between ends, headptr and tailptr in different extents
1113 extent_to_write_list_t ret;
1114 if (headptr) {
1115 bufferlist headbl;
1116 headbl.append(*headptr);
1117 headbl.append_zero(zero_left - left - headbl.length());
1118 assert(headbl.length() % block_size == 0);
1119 assert(headbl.length() > 0);
1120 ret.push_back(extent_to_write_t::create_data(left, headbl));
1121 }
1122 // reserved zero region
1123 ret.push_back(extent_to_write_t::create_zero(zero_left, zero_right - zero_left));
1124 assert(ret.back().len % block_size == 0);
1125 assert(ret.back().len > 0);
1126 if (tailptr) {
1127 bufferlist tailbl;
1128 tailbl.append(*tailptr);
1129 tailbl.append_zero(right - zero_right - tailbl.length());
1130 assert(tailbl.length() % block_size == 0);
1131 assert(tailbl.length() > 0);
1132 ret.push_back(extent_to_write_t::create_data(zero_right, tailbl));
1133 }
1134 return ret;
1135 }
1136 }
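// Worked example (hypothetical values, block_size = 0x1000): zeroing a region
// with offset = 0x1100, len = 0x2d00, a 0x100-byte headptr and a 0x200-byte
// tailptr yields three extents:
//   DATA at 0x1000, len 0x1000  (headptr followed by zero padding)
//   ZERO at 0x2000, len 0x1000  (reserved, unallocated zero region)
//   DATA at 0x3000, len 0x1000  (tailptr followed by zero padding)
// When the block-aligned interior is empty (zero_right <= zero_left), the
// head, zeroes and tail collapse into a single DATA extent instead.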
1137
1138 /**
1139 * get_to_writes
1140 *
1141 * Returns extent_to_write_t's from bl.
1142 *
1143 * TODO: probably add some kind of upper limit on extent size.
1144 */
1145 extent_to_write_list_t get_to_writes(laddr_t offset, bufferlist &bl)
1146 {
1147 auto ret = extent_to_write_list_t();
1148 ret.push_back(extent_to_write_t::create_data(offset, bl));
1149 return ret;
1150 };
1151
1152 ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
1153 context_t ctx,
1154 laddr_t offset,
1155 extent_len_t len,
1156 std::optional<bufferlist> &&bl,
1157 lba_pin_list_t &&_pins)
1158 {
1159 if (bl.has_value()) {
1160 assert(bl->length() == len);
1161 }
1162 overwrite_plan_t overwrite_plan(offset, len, _pins, ctx.tm.get_block_size(), ctx.t);
1163 return seastar::do_with(
1164 std::move(_pins),
1165 extent_to_write_list_t(),
1166 [ctx, len, offset, overwrite_plan, bl=std::move(bl)]
1167 (auto &pins, auto &to_write) mutable
1168 {
1169 LOG_PREFIX(ObjectDataHandler::overwrite);
1170 DEBUGT("overwrite: {}~{}",
1171 ctx.t,
1172 offset,
1173 len);
1174 ceph_assert(pins.size() >= 1);
1175 DEBUGT("overwrite: split overwrite_plan {}", ctx.t, overwrite_plan);
1176
1177 return operate_left(
1178 ctx,
1179 pins.front(),
1180 overwrite_plan
1181 ).si_then([ctx, len, offset, overwrite_plan, bl=std::move(bl),
1182 &to_write, &pins](auto p) mutable {
1183 auto &[left_extent, headptr] = p;
1184 if (left_extent) {
1185 ceph_assert(left_extent->addr == overwrite_plan.pin_begin);
1186 append_extent_to_write(to_write, std::move(*left_extent));
1187 }
1188 if (headptr) {
1189 assert(headptr->length() > 0);
1190 }
1191 return operate_right(
1192 ctx,
1193 pins.back(),
1194 overwrite_plan
1195 ).si_then([ctx, len, offset,
1196 pin_begin=overwrite_plan.pin_begin,
1197 pin_end=overwrite_plan.pin_end,
1198 bl=std::move(bl), headptr=std::move(headptr),
1199 &to_write, &pins](auto p) mutable {
1200 auto &[right_extent, tailptr] = p;
1201 if (bl.has_value()) {
1202 auto write_offset = offset;
1203 bufferlist write_bl;
1204 if (headptr) {
1205 write_bl.append(*headptr);
1206 write_offset -= headptr->length();
1207 assert_aligned(write_offset);
1208 }
1209 write_bl.claim_append(*bl);
1210 if (tailptr) {
1211 write_bl.append(*tailptr);
1212 assert_aligned(write_bl.length());
1213 }
1214 splice_extent_to_write(
1215 to_write,
1216 get_to_writes(write_offset, write_bl));
1217 } else {
1218 splice_extent_to_write(
1219 to_write,
1220 get_to_writes_with_zero_buffer(
1221 ctx.tm.get_block_size(),
1222 offset,
1223 len,
1224 std::move(headptr),
1225 std::move(tailptr)));
1226 }
1227 if (right_extent) {
1228 ceph_assert(right_extent->get_end_addr() == pin_end);
1229 append_extent_to_write(to_write, std::move(*right_extent));
1230 }
1231 assert(to_write.size());
1232 assert(pin_begin == to_write.front().addr);
1233 assert(pin_end == to_write.back().get_end_addr());
1234
1235 return seastar::do_with(
1236 prepare_ops_list(pins, to_write),
1237 [ctx](auto &ops) {
1238 return do_remappings(ctx, ops.to_remap
1239 ).si_then([ctx, &ops] {
1240 return do_removals(ctx, ops.to_remove);
1241 }).si_then([ctx, &ops] {
1242 return do_insertions(ctx, ops.to_insert);
1243 });
1244 });
1245 });
1246 });
1247 });
1248 }
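// Illustrative summary of the overwrite pipeline above: operate_left() and
// operate_right() turn the plan's edge operations into optional EXISTING/ZERO
// extents plus optional head/tail buffers; those buffers are glued onto the
// caller's data (or combined with zero padding when bl is nullopt, i.e. the
// zero() path); the resulting extent_to_write_list_t is classified by
// prepare_ops_list(); and do_remappings(), do_removals() and do_insertions()
// then apply the plan to the LBA tree within the same transaction.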
1249
1250 ObjectDataHandler::zero_ret ObjectDataHandler::zero(
1251 context_t ctx,
1252 objaddr_t offset,
1253 extent_len_t len)
1254 {
1255 return with_object_data(
1256 ctx,
1257 [this, ctx, offset, len](auto &object_data) {
1258 LOG_PREFIX(ObjectDataHandler::zero);
1259 DEBUGT("zero to {}~{}, object_data: {}~{}, is_null {}",
1260 ctx.t,
1261 offset,
1262 len,
1263 object_data.get_reserved_data_base(),
1264 object_data.get_reserved_data_len(),
1265 object_data.is_null());
1266 return prepare_data_reservation(
1267 ctx,
1268 object_data,
1269 p2roundup(offset + len, ctx.tm.get_block_size())
1270 ).si_then([this, ctx, offset, len, &object_data] {
1271 auto logical_offset = object_data.get_reserved_data_base() + offset;
1272 return ctx.tm.get_pins(
1273 ctx.t,
1274 logical_offset,
1275 len
1276 ).si_then([this, ctx, logical_offset, len](auto pins) {
1277 return overwrite(
1278 ctx, logical_offset, len,
1279 std::nullopt, std::move(pins));
1280 });
1281 });
1282 });
1283 }
1284
1285 ObjectDataHandler::write_ret ObjectDataHandler::write(
1286 context_t ctx,
1287 objaddr_t offset,
1288 const bufferlist &bl)
1289 {
1290 return with_object_data(
1291 ctx,
1292 [this, ctx, offset, &bl](auto &object_data) {
1293 LOG_PREFIX(ObjectDataHandler::write);
1294 DEBUGT("writing to {}~{}, object_data: {}~{}, is_null {}",
1295 ctx.t,
1296 offset,
1297 bl.length(),
1298 object_data.get_reserved_data_base(),
1299 object_data.get_reserved_data_len(),
1300 object_data.is_null());
1301 return prepare_data_reservation(
1302 ctx,
1303 object_data,
1304 p2roundup(offset + bl.length(), ctx.tm.get_block_size())
1305 ).si_then([this, ctx, offset, &object_data, &bl] {
1306 auto logical_offset = object_data.get_reserved_data_base() + offset;
1307 return ctx.tm.get_pins(
1308 ctx.t,
1309 logical_offset,
1310 bl.length()
1311 ).si_then([this, ctx,logical_offset, &bl](
1312 auto pins) {
1313 return overwrite(
1314 ctx, logical_offset, bl.length(),
1315 bufferlist(bl), std::move(pins));
1316 });
1317 });
1318 });
1319 }
1320
1321 ObjectDataHandler::read_ret ObjectDataHandler::read(
1322 context_t ctx,
1323 objaddr_t obj_offset,
1324 extent_len_t len)
1325 {
1326 return seastar::do_with(
1327 bufferlist(),
1328 [ctx, obj_offset, len](auto &ret) {
1329 return with_object_data(
1330 ctx,
1331 [ctx, obj_offset, len, &ret](const auto &object_data) {
1332 LOG_PREFIX(ObjectDataHandler::read);
1333 DEBUGT("reading {}~{}",
1334 ctx.t,
1335 object_data.get_reserved_data_base(),
1336 object_data.get_reserved_data_len());
1337 /* Assumption: callers ensure that onode size is <= reserved
1338 * size and that len is adjusted here prior to call */
1339 ceph_assert(!object_data.is_null());
1340 ceph_assert((obj_offset + len) <= object_data.get_reserved_data_len());
1341 ceph_assert(len > 0);
1342 laddr_t loffset =
1343 object_data.get_reserved_data_base() + obj_offset;
1344 return ctx.tm.get_pins(
1345 ctx.t,
1346 loffset,
1347 len
1348 ).si_then([ctx, loffset, len, &ret](auto _pins) {
1349 // offset~len falls within reserved region and len > 0
1350 ceph_assert(_pins.size() >= 1);
1351 ceph_assert((*_pins.begin())->get_key() <= loffset);
1352 return seastar::do_with(
1353 std::move(_pins),
1354 loffset,
1355 [ctx, loffset, len, &ret](auto &pins, auto &current) {
1356 return trans_intr::do_for_each(
1357 pins,
1358 [ctx, loffset, len, &current, &ret](auto &pin)
1359 -> read_iertr::future<> {
1360 ceph_assert(current <= (loffset + len));
1361 ceph_assert(
1362 (loffset + len) > pin->get_key());
1363 laddr_t end = std::min(
1364 pin->get_key() + pin->get_length(),
1365 loffset + len);
1366 if (pin->get_val().is_zero()) {
1367 ceph_assert(end > current); // See LBAManager::get_mappings
1368 ret.append_zero(end - current);
1369 current = end;
1370 return seastar::now();
1371 } else {
1372 LOG_PREFIX(ObjectDataHandler::read);
1373 auto key = pin->get_key();
1374 bool is_indirect = pin->is_indirect();
1375 extent_len_t off = pin->get_intermediate_offset();
1376 DEBUGT("reading {}~{}, indirect: {}, "
1377 "intermediate offset: {}, current: {}, end: {}",
1378 ctx.t,
1379 key,
1380 pin->get_length(),
1381 is_indirect,
1382 off,
1383 current,
1384 end);
1385 return ctx.tm.read_pin<ObjectDataBlock>(
1386 ctx.t,
1387 std::move(pin)
1388 ).si_then([&ret, &current, end, key, off,
1389 is_indirect](auto extent) {
1390 ceph_assert(
1391 is_indirect
1392 ? (key - off + extent->get_length()) >= end
1393 : (extent->get_laddr() + extent->get_length()) >= end);
1394 ceph_assert(end > current);
1395 ret.append(
1396 bufferptr(
1397 extent->get_bptr(),
1398 off + current - (is_indirect ? key : extent->get_laddr()),
1399 end - current));
1400 current = end;
1401 return seastar::now();
1402 }).handle_error_interruptible(
1403 read_iertr::pass_further{},
1404 crimson::ct_error::assert_all{
1405 "ObjectDataHandler::read hit invalid error"
1406 }
1407 );
1408 }
1409 });
1410 });
1411 });
1412 }).si_then([&ret] {
1413 return std::move(ret);
1414 });
1415 });
1416 }
1417
1418 ObjectDataHandler::fiemap_ret ObjectDataHandler::fiemap(
1419 context_t ctx,
1420 objaddr_t obj_offset,
1421 extent_len_t len)
1422 {
1423 return seastar::do_with(
1424 std::map<uint64_t, uint64_t>(),
1425 [ctx, obj_offset, len](auto &ret) {
1426 return with_object_data(
1427 ctx,
1428 [ctx, obj_offset, len, &ret](const auto &object_data) {
1429 LOG_PREFIX(ObjectDataHandler::fiemap);
1430 DEBUGT(
1431 "{}~{}, reservation {}~{}",
1432 ctx.t,
1433 obj_offset,
1434 len,
1435 object_data.get_reserved_data_base(),
1436 object_data.get_reserved_data_len());
1437 /* Assumption: callers ensure that onode size is <= reserved
1438 * size and that len is adjusted here prior to call */
1439 ceph_assert(!object_data.is_null());
1440 ceph_assert((obj_offset + len) <= object_data.get_reserved_data_len());
1441 ceph_assert(len > 0);
1442 laddr_t loffset =
1443 object_data.get_reserved_data_base() + obj_offset;
1444 return ctx.tm.get_pins(
1445 ctx.t,
1446 loffset,
1447 len
1448 ).si_then([loffset, len, &object_data, &ret](auto &&pins) {
1449 ceph_assert(pins.size() >= 1);
1450 ceph_assert((*pins.begin())->get_key() <= loffset);
1451 for (auto &&i: pins) {
1452 if (!(i->get_val().is_zero())) {
1453 auto ret_left = std::max(i->get_key(), loffset);
1454 auto ret_right = std::min(
1455 i->get_key() + i->get_length(),
1456 loffset + len);
1457 assert(ret_right > ret_left);
1458 ret.emplace(
1459 std::make_pair(
1460 ret_left - object_data.get_reserved_data_base(),
1461 ret_right - ret_left
1462 ));
1463 }
1464 }
1465 });
1466 }).si_then([&ret] {
1467 return std::move(ret);
1468 });
1469 });
1470 }
1471
1472 ObjectDataHandler::truncate_ret ObjectDataHandler::truncate(
1473 context_t ctx,
1474 objaddr_t offset)
1475 {
1476 return with_object_data(
1477 ctx,
1478 [this, ctx, offset](auto &object_data) {
1479 LOG_PREFIX(ObjectDataHandler::truncate);
1480 DEBUGT("truncating {}~{} offset: {}",
1481 ctx.t,
1482 object_data.get_reserved_data_base(),
1483 object_data.get_reserved_data_len(),
1484 offset);
1485 if (offset < object_data.get_reserved_data_len()) {
1486 return trim_data_reservation(ctx, object_data, offset);
1487 } else if (offset > object_data.get_reserved_data_len()) {
1488 return prepare_data_reservation(
1489 ctx,
1490 object_data,
1491 p2roundup(offset, ctx.tm.get_block_size()));
1492 } else {
1493 return truncate_iertr::now();
1494 }
1495 });
1496 }
1497
1498 ObjectDataHandler::clear_ret ObjectDataHandler::clear(
1499 context_t ctx)
1500 {
1501 return with_object_data(
1502 ctx,
1503 [this, ctx](auto &object_data) {
1504 LOG_PREFIX(ObjectDataHandler::clear);
1505 DEBUGT("clearing: {}~{}",
1506 ctx.t,
1507 object_data.get_reserved_data_base(),
1508 object_data.get_reserved_data_len());
1509 if (object_data.is_null()) {
1510 return clear_iertr::now();
1511 }
1512 return trim_data_reservation(ctx, object_data, 0);
1513 });
1514 }
1515
1516 ObjectDataHandler::clone_ret ObjectDataHandler::clone_extents(
1517 context_t ctx,
1518 object_data_t &object_data,
1519 lba_pin_list_t &pins,
1520 laddr_t data_base)
1521 {
1522 LOG_PREFIX(ObjectDataHandler::clone_extents);
1523 TRACET(" object_data: {}~{}, data_base: {}",
1524 ctx.t,
1525 object_data.get_reserved_data_base(),
1526 object_data.get_reserved_data_len(),
1527 data_base);
1528 return ctx.tm.dec_ref(
1529 ctx.t,
1530 object_data.get_reserved_data_base()
1531 ).si_then(
1532 [&pins, &object_data, ctx, data_base](auto) mutable {
1533 return seastar::do_with(
1534 (extent_len_t)0,
1535 [&object_data, ctx, data_base, &pins](auto &last_pos) {
1536 return trans_intr::do_for_each(
1537 pins,
1538 [&last_pos, &object_data, ctx, data_base](auto &pin) {
1539 auto offset = pin->get_key() - data_base;
1540 ceph_assert(offset == last_pos);
1541 auto fut = TransactionManager::alloc_extent_iertr
1542 ::make_ready_future<LBAMappingRef>();
1543 auto addr = object_data.get_reserved_data_base() + offset;
1544 if (pin->get_val().is_zero()) {
1545 fut = ctx.tm.reserve_region(ctx.t, addr, pin->get_length());
1546 } else {
1547 fut = ctx.tm.clone_pin(ctx.t, addr, *pin);
1548 }
1549 return fut.si_then(
1550 [&pin, &last_pos, offset](auto) {
1551 last_pos = offset + pin->get_length();
1552 return seastar::now();
1553 }).handle_error_interruptible(
1554 crimson::ct_error::input_output_error::pass_further(),
1555 crimson::ct_error::assert_all("not possible")
1556 );
1557 }).si_then([&last_pos, &object_data, ctx] {
1558 if (last_pos != object_data.get_reserved_data_len()) {
1559 return ctx.tm.reserve_region(
1560 ctx.t,
1561 object_data.get_reserved_data_base() + last_pos,
1562 object_data.get_reserved_data_len() - last_pos
1563 ).si_then([](auto) {
1564 return seastar::now();
1565 });
1566 }
1567 return TransactionManager::reserve_extent_iertr::now();
1568 });
1569 });
1570 },
1571 ObjectDataHandler::write_iertr::pass_further{},
1572 crimson::ct_error::assert_all{
1573 "object_data_handler::clone invalid error"
1574 }
1575 );
1576 }
1577
1578 ObjectDataHandler::clone_ret ObjectDataHandler::clone(
1579 context_t ctx)
1580 {
1581   // the whole clone procedure can be separated into the following steps:
1582 // 1. let clone onode(d_object_data) take the head onode's
1583 // object data base;
1584 // 2. reserve a new region in lba tree for the head onode;
1585 // 3. clone all extents of the clone onode, see transaction_manager.h
1586 // for the details of clone_pin;
1587 // 4. reserve the space between the head onode's size and its reservation
1588 // length.
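  // Rough illustration (descriptive): both onodes end up with fresh
  // reservations of the original length; clone_extents() releases each
  // onode's whole-range reservation and re-populates its address range with
  // zero reservations (for zero pins) or indirect clone_pin() mappings onto
  // the original physical extents, and the original mappings are finally
  // retired via do_removals().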
1589 return with_objects_data(
1590 ctx,
1591 [ctx, this](auto &object_data, auto &d_object_data) {
1592 ceph_assert(d_object_data.is_null());
1593 if (object_data.is_null()) {
1594 return clone_iertr::now();
1595 }
1596 return prepare_data_reservation(
1597 ctx,
1598 d_object_data,
1599 object_data.get_reserved_data_len()
1600 ).si_then([&object_data, &d_object_data, ctx, this] {
1601 assert(!object_data.is_null());
1602 auto base = object_data.get_reserved_data_base();
1603 auto len = object_data.get_reserved_data_len();
1604 object_data.clear();
1605 LOG_PREFIX(ObjectDataHandler::clone);
1606 DEBUGT("cloned obj reserve_data_base: {}, len {}",
1607 ctx.t,
1608 d_object_data.get_reserved_data_base(),
1609 d_object_data.get_reserved_data_len());
1610 return prepare_data_reservation(
1611 ctx,
1612 object_data,
1613 d_object_data.get_reserved_data_len()
1614 ).si_then([&d_object_data, ctx, &object_data, base, len, this] {
1615 LOG_PREFIX("ObjectDataHandler::clone");
1616 DEBUGT("head obj reserve_data_base: {}, len {}",
1617 ctx.t,
1618 object_data.get_reserved_data_base(),
1619 object_data.get_reserved_data_len());
1620 return ctx.tm.get_pins(ctx.t, base, len
1621 ).si_then([ctx, &object_data, &d_object_data, base, this](auto pins) {
1622 return seastar::do_with(
1623 std::move(pins),
1624 [ctx, &object_data, &d_object_data, base, this](auto &pins) {
1625 return clone_extents(ctx, object_data, pins, base
1626 ).si_then([ctx, &d_object_data, base, &pins, this] {
1627 return clone_extents(ctx, d_object_data, pins, base);
1628 }).si_then([&pins, ctx] {
1629 return do_removals(ctx, pins);
1630 });
1631 });
1632 });
1633 });
1634 });
1635 });
1636 }
1637
1638 } // namespace crimson::os::seastore