ceph/src/crimson/os/seastore/object_data_handler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <utility>
5 #include <functional>
6
7 #include "crimson/common/log.h"
8
9 #include "crimson/os/seastore/object_data_handler.h"
10
11 namespace {
12 seastar::logger& logger() {
13 return crimson::get_logger(ceph_subsys_seastore_odata);
14 }
15 }
16
17 SET_SUBSYS(seastore_odata);
18
19 namespace crimson::os::seastore {
20 #define assert_aligned(x) ceph_assert(((x)%ctx.tm.get_block_size()) == 0)
21
22 using context_t = ObjectDataHandler::context_t;
23 using get_iertr = ObjectDataHandler::write_iertr;
24
25 /**
26 * extent_to_write_t
27 *
28 * Encapsulates extents to be written out using do_insertions.
29 * Indicates a zero/existing extent or a data extent based on whether
30 * to_write is populated.
31 * existing_paddr, if set, indicates that the new extent to be
32 * written is part of an existing extent on disk. existing_paddr
33 * must be absolute.
34 */
35 struct extent_to_write_t {
36 enum class type_t {
37 DATA,
38 ZERO,
39 EXISTING,
40 };
41
42 type_t type;
43 laddr_t addr;
44 extent_len_t len;
45 /// non-nullopt if and only if type == DATA
46 std::optional<bufferlist> to_write;
47 /// non-nullopt if and only if type == EXISTING
48 std::optional<paddr_t> existing_paddr;
49
50 extent_to_write_t(const extent_to_write_t &) = default;
51 extent_to_write_t(extent_to_write_t &&) = default;
52
53 bool is_data() const {
54 return type == type_t::DATA;
55 }
56
57 bool is_zero() const {
58 return type == type_t::ZERO;
59 }
60
61 bool is_existing() const {
62 return type == type_t::EXISTING;
63 }
64
65 laddr_t get_end_addr() const {
66 return addr + len;
67 }
68
69 static extent_to_write_t create_data(
70 laddr_t addr, bufferlist to_write) {
71 return extent_to_write_t(addr, to_write);
72 }
73
74 static extent_to_write_t create_zero(
75 laddr_t addr, extent_len_t len) {
76 return extent_to_write_t(addr, len);
77 }
78
79 static extent_to_write_t create_existing(
80 laddr_t addr, paddr_t existing_paddr, extent_len_t len) {
81 return extent_to_write_t(addr, existing_paddr, len);
82 }
83
84 private:
85 extent_to_write_t(laddr_t addr, bufferlist to_write)
86 : type(type_t::DATA), addr(addr), len(to_write.length()),
87 to_write(to_write) {}
88
89 extent_to_write_t(laddr_t addr, extent_len_t len)
90 : type(type_t::ZERO), addr(addr), len(len) {}
91
92 extent_to_write_t(laddr_t addr, paddr_t existing_paddr, extent_len_t len)
93 : type(type_t::EXISTING), addr(addr), len(len),
94 to_write(std::nullopt), existing_paddr(existing_paddr) {}
95 };
96 using extent_to_write_list_t = std::list<extent_to_write_t>;
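
/*
 * Illustrative sketch (not part of the build; names and values below are
 * hypothetical) of how the factory functions above describe a write:
 *
 *   bufferlist bl;
 *   bl.append_zero(block_size);  // any block-aligned payload
 *   extent_to_write_list_t to_write;
 *   to_write.push_back(extent_to_write_t::create_data(base, bl));
 *   to_write.push_back(extent_to_write_t::create_zero(base + block_size, block_size));
 *   // create_existing(addr, existing_paddr, len) remaps part of an extent
 *   // already on disk; existing_paddr must be an absolute paddr_t.
 */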
97
98 /**
99 * append_extent_to_write
100 *
101 * Appends the passed extent_to_write_t, maintaining the invariant that
102 * the list does not contain consecutive zero (or existing) elements of
103 * the same type; such adjacent elements are combined instead.
104 */
105 void append_extent_to_write(
106 extent_to_write_list_t &to_write, extent_to_write_t &&to_append)
107 {
108 assert(to_write.empty() ||
109 to_write.back().get_end_addr() == to_append.addr);
110 if (to_write.empty() ||
111 to_write.back().is_data() ||
112 to_append.is_data() ||
113 to_write.back().type != to_append.type) {
114 to_write.push_back(std::move(to_append));
115 } else {
116 to_write.back().len += to_append.len;
117 }
118 }
119
120 /**
121 * splice_extent_to_write
122 *
123 * Splices the passed extent_to_write_list_t into to_write, maintaining
124 * the same invariant: mergeable elements at the junction are checked
125 * and combined.
126 */
127 void splice_extent_to_write(
128 extent_to_write_list_t &to_write, extent_to_write_list_t &&to_splice)
129 {
130 if (!to_splice.empty()) {
131 append_extent_to_write(to_write, std::move(to_splice.front()));
132 to_splice.pop_front();
133 to_write.splice(to_write.end(), std::move(to_splice));
134 }
135 }
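
/*
 * A minimal sketch (hypothetical values) of the invariant maintained by the
 * two helpers above: appending two contiguous zero extents yields a single
 * combined element rather than consecutive zero elements.
 *
 *   extent_to_write_list_t l;
 *   append_extent_to_write(l, extent_to_write_t::create_zero(base, block_size));
 *   append_extent_to_write(l, extent_to_write_t::create_zero(base + block_size, block_size));
 *   // l.size() == 1 && l.front().len == 2 * block_size
 */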
136
137 /// Removes extents/mappings in pins
138 ObjectDataHandler::write_ret do_removals(
139 context_t ctx,
140 lba_pin_list_t &pins)
141 {
142 return trans_intr::do_for_each(
143 pins,
144 [ctx](auto &pin) {
145 LOG_PREFIX(object_data_handler.cc::do_removals);
146 DEBUGT("decreasing ref: {}",
147 ctx.t,
148 pin->get_key());
149 return ctx.tm.dec_ref(
150 ctx.t,
151 pin->get_key()
152 ).si_then(
153 [](auto){},
154 ObjectDataHandler::write_iertr::pass_further{},
155 crimson::ct_error::assert_all{
156 "object_data_handler::do_removals invalid error"
157 }
158 );
159 });
160 }
161
162 /// Creates zero/data extents and maps existing extents described by to_write
163 ObjectDataHandler::write_ret do_insertions(
164 context_t ctx,
165 extent_to_write_list_t &to_write)
166 {
167 return trans_intr::do_for_each(
168 to_write,
169 [ctx](auto &region) {
170 LOG_PREFIX(object_data_handler.cc::do_insertions);
171 if (region.is_data()) {
172 assert_aligned(region.addr);
173 assert_aligned(region.len);
174 ceph_assert(region.len == region.to_write->length());
175 DEBUGT("allocating extent: {}~{}",
176 ctx.t,
177 region.addr,
178 region.len);
179 return ctx.tm.alloc_extent<ObjectDataBlock>(
180 ctx.t,
181 region.addr,
182 region.len
183 ).si_then([&region](auto extent) {
184 if (extent->get_laddr() != region.addr) {
185 logger().debug(
186 "object_data_handler::do_insertions alloc got addr {},"
187 " should have been {}",
188 extent->get_laddr(),
189 region.addr);
190 }
191 ceph_assert(extent->get_laddr() == region.addr);
192 ceph_assert(extent->get_length() == region.len);
193 auto iter = region.to_write->cbegin();
194 iter.copy(region.len, extent->get_bptr().c_str());
195 return ObjectDataHandler::write_iertr::now();
196 });
197 } else if (region.is_zero()) {
198 DEBUGT("reserving: {}~{}",
199 ctx.t,
200 region.addr,
201 region.len);
202 return ctx.tm.reserve_region(
203 ctx.t,
204 region.addr,
205 region.len
206 ).si_then([FNAME, ctx, &region](auto pin) {
207 ceph_assert(pin->get_length() == region.len);
208 if (pin->get_key() != region.addr) {
209 ERRORT(
210 "inconsistent laddr: pin: {} region {}",
211 ctx.t,
212 pin->get_key(),
213 region.addr);
214 }
215 ceph_assert(pin->get_key() == region.addr);
216 return ObjectDataHandler::write_iertr::now();
217 });
218 } else {
219 ceph_assert(region.is_existing());
220 DEBUGT("map existing extent: laddr {} len {} {}",
221 ctx.t, region.addr, region.len, *region.existing_paddr);
222 return ctx.tm.map_existing_extent<ObjectDataBlock>(
223 ctx.t, region.addr, *region.existing_paddr, region.len
224 ).handle_error_interruptible(
225 TransactionManager::alloc_extent_iertr::pass_further{},
226 Device::read_ertr::assert_all{"ignore read error"}
227 ).si_then([FNAME, ctx, &region](auto extent) {
228 if (extent->get_laddr() != region.addr) {
229 ERRORT(
230 "inconsistent laddr: extent: {} region {}",
231 ctx.t,
232 extent->get_laddr(),
233 region.addr);
234 }
235 ceph_assert(extent->get_laddr() == region.addr);
236 return ObjectDataHandler::write_iertr::now();
237 });
238 }
239 });
240 }
241
242 enum class overwrite_operation_t {
243 UNKNOWN,
244 OVERWRITE_ZERO, // fill unaligned data with zero
245 MERGE_EXISTING, // if present, merge data with the clean/pending extent
246 SPLIT_EXISTING, // split the existing extent, and fill unaligned data
247 };
248
249 std::ostream& operator<<(
250 std::ostream &out,
251 const overwrite_operation_t &operation)
252 {
253 switch (operation) {
254 case overwrite_operation_t::UNKNOWN:
255 return out << "UNKNOWN";
256 case overwrite_operation_t::OVERWRITE_ZERO:
257 return out << "OVERWRITE_ZERO";
258 case overwrite_operation_t::MERGE_EXISTING:
259 return out << "MERGE_EXISTING";
260 case overwrite_operation_t::SPLIT_EXISTING:
261 return out << "SPLIT_EXISTING";
262 default:
263 return out << "!IMPOSSIBLE_OPERATION";
264 }
265 }
266
267 /**
268 * overwrite_plan_t
269 *
270 * |<--------------------------pins_size---------------------------------------------->|
271 * pin_begin(aligned) pin_end(aligned)
272 * |<------aligned_data_size-------------------------->| (aligned-bl)
273 * aligned_data_begin aligned_data_end
274 * |<-data_size->| (bl)
275 data_begin data_end
276 * left(l) right(r)
277 * |<l_extent_size>|<l_alignment_size>| |<r_alignment_size>|<r_extent_size>|
278 * |<-----------left_size------------>| |<-----------right_size----------->|
279 *
280 * |<-----(existing left extent/pin)----->| |<-----(existing right extent/pin)----->|
281 * left_paddr right_paddr
282 */
283 struct overwrite_plan_t {
284 // addresses
285 laddr_t pin_begin;
286 laddr_t pin_end;
287 paddr_t left_paddr;
288 paddr_t right_paddr;
289 laddr_t data_begin;
290 laddr_t data_end;
291 laddr_t aligned_data_begin;
292 laddr_t aligned_data_end;
293
294 // operations
295 overwrite_operation_t left_operation;
296 overwrite_operation_t right_operation;
297
298 // helper member
299 extent_len_t block_size;
300
301 public:
302 extent_len_t get_left_size() const {
303 return data_begin - pin_begin;
304 }
305
306 extent_len_t get_left_extent_size() const {
307 return aligned_data_begin - pin_begin;
308 }
309
310 extent_len_t get_left_alignment_size() const {
311 return data_begin - aligned_data_begin;
312 }
313
314 extent_len_t get_right_size() const {
315 return pin_end - data_end;
316 }
317
318 extent_len_t get_right_extent_size() const {
319 return pin_end - aligned_data_end;
320 }
321
322 extent_len_t get_right_alignment_size() const {
323 return aligned_data_end - data_end;
324 }
325
326 extent_len_t get_aligned_data_size() const {
327 return aligned_data_end - aligned_data_begin;
328 }
329
330 extent_len_t get_pins_size() const {
331 return pin_end - pin_begin;
332 }
333
334 friend std::ostream& operator<<(
335 std::ostream& out,
336 const overwrite_plan_t& overwrite_plan) {
337 return out << "overwrite_plan_t("
338 << "pin_begin=" << overwrite_plan.pin_begin
339 << ", pin_end=" << overwrite_plan.pin_end
340 << ", left_paddr=" << overwrite_plan.left_paddr
341 << ", right_paddr=" << overwrite_plan.right_paddr
342 << ", data_begin=" << overwrite_plan.data_begin
343 << ", data_end=" << overwrite_plan.data_end
344 << ", aligned_data_begin=" << overwrite_plan.aligned_data_begin
345 << ", aligned_data_end=" << overwrite_plan.aligned_data_end
346 << ", left_operation=" << overwrite_plan.left_operation
347 << ", right_operation=" << overwrite_plan.right_operation
348 << ", block_size=" << overwrite_plan.block_size
349 << ")";
350 }
351
352 overwrite_plan_t(laddr_t offset,
353 extent_len_t len,
354 const lba_pin_list_t& pins,
355 extent_len_t block_size) :
356 pin_begin(pins.front()->get_key()),
357 pin_end(pins.back()->get_key() + pins.back()->get_length()),
358 left_paddr(pins.front()->get_val()),
359 right_paddr(pins.back()->get_val()),
360 data_begin(offset),
361 data_end(offset + len),
362 aligned_data_begin(p2align((uint64_t)data_begin, (uint64_t)block_size)),
363 aligned_data_end(p2roundup((uint64_t)data_end, (uint64_t)block_size)),
364 left_operation(overwrite_operation_t::UNKNOWN),
365 right_operation(overwrite_operation_t::UNKNOWN),
366 block_size(block_size) {
367 validate();
368 evaluate_operations();
369 assert(left_operation != overwrite_operation_t::UNKNOWN);
370 assert(right_operation != overwrite_operation_t::UNKNOWN);
371 }
372
373 private:
374 // refer to overwrite_plan_t description
375 void validate() const {
376 ceph_assert(pin_begin % block_size == 0);
377 ceph_assert(pin_end % block_size == 0);
378 ceph_assert(aligned_data_begin % block_size == 0);
379 ceph_assert(aligned_data_end % block_size == 0);
380
381 ceph_assert(pin_begin <= aligned_data_begin);
382 ceph_assert(aligned_data_begin <= data_begin);
383 ceph_assert(data_begin <= data_end);
384 ceph_assert(data_end <= aligned_data_end);
385 ceph_assert(aligned_data_end <= pin_end);
386 }
387
388 /*
389 * When modifying a portion of an object data block, follow the
390 * read-full-extent-then-merge-new-data strategy if the resulting write
391 * amplification is not greater than
392 * seastore_obj_data_write_amplification; otherwise, split the
393 * original extent into at most three parts: origin-left,
394 * part-to-be-modified and origin-right.
395 */
396 void evaluate_operations() {
397 auto actual_write_size = get_pins_size();
398 auto aligned_data_size = get_aligned_data_size();
399 auto left_ext_size = get_left_extent_size();
400 auto right_ext_size = get_right_extent_size();
401
402 if (left_paddr.is_zero()) {
403 actual_write_size -= left_ext_size;
404 left_ext_size = 0;
405 left_operation = overwrite_operation_t::OVERWRITE_ZERO;
406 // FIXME: left_paddr can be absolute and pending
407 } else if (left_paddr.is_relative() ||
408 left_paddr.is_delayed()) {
409 aligned_data_size += left_ext_size;
410 left_ext_size = 0;
411 left_operation = overwrite_operation_t::MERGE_EXISTING;
412 }
413
414 if (right_paddr.is_zero()) {
415 actual_write_size -= right_ext_size;
416 right_ext_size = 0;
417 right_operation = overwrite_operation_t::OVERWRITE_ZERO;
418 // FIXME: right_paddr can be absolute and pending
419 } else if (right_paddr.is_relative() ||
420 right_paddr.is_delayed()) {
421 aligned_data_size += right_ext_size;
422 right_ext_size = 0;
423 right_operation = overwrite_operation_t::MERGE_EXISTING;
424 }
425
426 while (left_operation == overwrite_operation_t::UNKNOWN ||
427 right_operation == overwrite_operation_t::UNKNOWN) {
428 if (((double)actual_write_size / (double)aligned_data_size) <=
429 crimson::common::get_conf<double>("seastore_obj_data_write_amplification")) {
430 break;
431 }
432 if (left_ext_size == 0 && right_ext_size == 0) {
433 break;
434 }
435 if (left_ext_size >= right_ext_size) {
436 // split left
437 assert(left_operation == overwrite_operation_t::UNKNOWN);
438 actual_write_size -= left_ext_size;
439 left_ext_size = 0;
440 left_operation = overwrite_operation_t::SPLIT_EXISTING;
441 } else { // left_ext_size < right_ext_size
442 // split right
443 assert(right_operation == overwrite_operation_t::UNKNOWN);
444 actual_write_size -= right_ext_size;
445 right_ext_size = 0;
446 right_operation = overwrite_operation_t::SPLIT_EXISTING;
447 }
448 }
449
450 if (left_operation == overwrite_operation_t::UNKNOWN) {
451 // no split left, so merge with left
452 left_operation = overwrite_operation_t::MERGE_EXISTING;
453 }
454
455 if (right_operation == overwrite_operation_t::UNKNOWN) {
456 // no split right, so merge with right
457 right_operation = overwrite_operation_t::MERGE_EXISTING;
458 }
459 }
460 };
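
/*
 * Worked example of the plan geometry and the write-amplification check
 * above. The numbers are hypothetical: block_size = 4096, both boundary
 * paddrs assumed to be clean absolute extents, and an assumed
 * seastore_obj_data_write_amplification of 1.25.
 *
 *   pins cover         [0x0000, 0x8000)  -> pin_begin / pin_end
 *   write touches      [0x3800, 0x4800)  -> data_begin / data_end
 *   block-aligned span [0x3000, 0x5000)  -> aligned_data_begin / aligned_data_end
 *
 *   left_extent_size  = 0x3000, left_alignment_size  = 0x0800
 *   right_extent_size = 0x3000, right_alignment_size = 0x0800
 *   pins_size = 0x8000, aligned_data_size = 0x2000
 *
 * evaluate_operations() starts with 0x8000 / 0x2000 = 4.0 > 1.25, so it
 * splits the larger (left) side; the ratio drops to 0x5000 / 0x2000 = 2.5,
 * still > 1.25, so the right side is split as well and both sides become
 * SPLIT_EXISTING. With a larger threshold (e.g. 3.0) the second check would
 * pass and the right side would fall back to MERGE_EXISTING.
 */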
461
462 } // namespace crimson::os::seastore
463
464 #if FMT_VERSION >= 90000
465 template<> struct fmt::formatter<crimson::os::seastore::overwrite_plan_t> : fmt::ostream_formatter {};
466 #endif
467
468 namespace crimson::os::seastore {
469
470 /**
471 * operate_left
472 *
473 * Perform overwrite_plan.left_operation.
474 */
475 using operate_ret_bare = std::pair<
476 std::optional<extent_to_write_t>,
477 std::optional<bufferptr>>;
478 using operate_ret = get_iertr::future<operate_ret_bare>;
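// operate_ret_bare pairs an optional boundary extent_to_write_t (a zero or
// existing extent to emit at the corresponding edge of the write) with an
// optional bufferptr holding data to prepend (operate_left) or append
// (operate_right) to the caller's write buffer.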
479 operate_ret operate_left(context_t ctx, LBAMappingRef &pin, const overwrite_plan_t &overwrite_plan)
480 {
481 if (overwrite_plan.get_left_size() == 0) {
482 return get_iertr::make_ready_future<operate_ret_bare>(
483 std::nullopt,
484 std::nullopt);
485 }
486
487 if (overwrite_plan.left_operation == overwrite_operation_t::OVERWRITE_ZERO) {
488 assert(pin->get_val().is_zero());
489 auto zero_extent_len = overwrite_plan.get_left_extent_size();
490 assert_aligned(zero_extent_len);
491 auto zero_prepend_len = overwrite_plan.get_left_alignment_size();
492 return get_iertr::make_ready_future<operate_ret_bare>(
493 (zero_extent_len == 0
494 ? std::nullopt
495 : std::make_optional(extent_to_write_t::create_zero(
496 overwrite_plan.pin_begin, zero_extent_len))),
497 (zero_prepend_len == 0
498 ? std::nullopt
499 : std::make_optional(bufferptr(
500 ceph::buffer::create(zero_prepend_len, 0))))
501 );
502 } else if (overwrite_plan.left_operation == overwrite_operation_t::MERGE_EXISTING) {
503 auto prepend_len = overwrite_plan.get_left_size();
504 if (prepend_len == 0) {
505 return get_iertr::make_ready_future<operate_ret_bare>(
506 std::nullopt,
507 std::nullopt);
508 } else {
509 return ctx.tm.read_pin<ObjectDataBlock>(
510 ctx.t, pin->duplicate()
511 ).si_then([prepend_len](auto left_extent) {
512 return get_iertr::make_ready_future<operate_ret_bare>(
513 std::nullopt,
514 std::make_optional(bufferptr(
515 left_extent->get_bptr(),
516 0,
517 prepend_len)));
518 });
519 }
520 } else {
521 assert(overwrite_plan.left_operation == overwrite_operation_t::SPLIT_EXISTING);
522
523 auto extent_len = overwrite_plan.get_left_extent_size();
524 assert(extent_len);
525 std::optional<extent_to_write_t> left_to_write_extent =
526 std::make_optional(extent_to_write_t::create_existing(
527 overwrite_plan.pin_begin,
528 overwrite_plan.left_paddr,
529 extent_len));
530
531 auto prepend_len = overwrite_plan.get_left_alignment_size();
532 if (prepend_len == 0) {
533 return get_iertr::make_ready_future<operate_ret_bare>(
534 left_to_write_extent,
535 std::nullopt);
536 } else {
537 return ctx.tm.read_pin<ObjectDataBlock>(
538 ctx.t, pin->duplicate()
539 ).si_then([prepend_offset=extent_len, prepend_len,
540 left_to_write_extent=std::move(left_to_write_extent)]
541 (auto left_extent) mutable {
542 return get_iertr::make_ready_future<operate_ret_bare>(
543 left_to_write_extent,
544 std::make_optional(bufferptr(
545 left_extent->get_bptr(),
546 prepend_offset,
547 prepend_len)));
548 });
549 }
550 }
551 };
552
553 /**
554 * operate_right
555 *
556 * Perform overwrite_plan.right_operation.
557 */
558 operate_ret operate_right(context_t ctx, LBAMappingRef &pin, const overwrite_plan_t &overwrite_plan)
559 {
560 if (overwrite_plan.get_right_size() == 0) {
561 return get_iertr::make_ready_future<operate_ret_bare>(
562 std::nullopt,
563 std::nullopt);
564 }
565
566 auto right_pin_begin = pin->get_key();
567 assert(overwrite_plan.data_end >= right_pin_begin);
568 if (overwrite_plan.right_operation == overwrite_operation_t::OVERWRITE_ZERO) {
569 assert(pin->get_val().is_zero());
570 auto zero_suffix_len = overwrite_plan.get_right_alignment_size();
571 auto zero_extent_len = overwrite_plan.get_right_extent_size();
572 assert_aligned(zero_extent_len);
573 return get_iertr::make_ready_future<operate_ret_bare>(
574 (zero_extent_len == 0
575 ? std::nullopt
576 : std::make_optional(extent_to_write_t::create_zero(
577 overwrite_plan.aligned_data_end, zero_extent_len))),
578 (zero_suffix_len == 0
579 ? std::nullopt
580 : std::make_optional(bufferptr(
581 ceph::buffer::create(zero_suffix_len, 0))))
582 );
583 } else if (overwrite_plan.right_operation == overwrite_operation_t::MERGE_EXISTING) {
584 auto append_len = overwrite_plan.get_right_size();
585 if (append_len == 0) {
586 return get_iertr::make_ready_future<operate_ret_bare>(
587 std::nullopt,
588 std::nullopt);
589 } else {
590 auto append_offset = overwrite_plan.data_end - right_pin_begin;
591 return ctx.tm.read_pin<ObjectDataBlock>(
592 ctx.t, pin->duplicate()
593 ).si_then([append_offset, append_len](auto right_extent) {
594 return get_iertr::make_ready_future<operate_ret_bare>(
595 std::nullopt,
596 std::make_optional(bufferptr(
597 right_extent->get_bptr(),
598 append_offset,
599 append_len)));
600 });
601 }
602 } else {
603 assert(overwrite_plan.right_operation == overwrite_operation_t::SPLIT_EXISTING);
604
605 auto extent_len = overwrite_plan.get_right_extent_size();
606 assert(extent_len);
607 std::optional<extent_to_write_t> right_to_write_extent =
608 std::make_optional(extent_to_write_t::create_existing(
609 overwrite_plan.aligned_data_end,
610 overwrite_plan.right_paddr.add_offset(overwrite_plan.aligned_data_end - right_pin_begin),
611 extent_len));
612
613 auto append_len = overwrite_plan.get_right_alignment_size();
614 if (append_len == 0) {
615 return get_iertr::make_ready_future<operate_ret_bare>(
616 right_to_write_extent,
617 std::nullopt);
618 } else {
619 auto append_offset = overwrite_plan.data_end - right_pin_begin;
620 return ctx.tm.read_pin<ObjectDataBlock>(
621 ctx.t, pin->duplicate()
622 ).si_then([append_offset, append_len,
623 right_to_write_extent=std::move(right_to_write_extent)]
624 (auto right_extent) mutable {
625 return get_iertr::make_ready_future<operate_ret_bare>(
626 right_to_write_extent,
627 std::make_optional(bufferptr(
628 right_extent->get_bptr(),
629 append_offset,
630 append_len)));
631 });
632 }
633 }
634 };
635
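// with_object_data loads the onode's object_data layout, invokes f on it,
// and persists the (possibly updated) object_data back into the onode if f
// marked it as requiring an update.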
636 template <typename F>
637 auto with_object_data(
638 ObjectDataHandler::context_t ctx,
639 F &&f)
640 {
641 return seastar::do_with(
642 ctx.onode.get_layout().object_data.get(),
643 std::forward<F>(f),
644 [ctx](auto &object_data, auto &f) {
645 return std::invoke(f, object_data
646 ).si_then([ctx, &object_data] {
647 if (object_data.must_update()) {
648 ctx.onode.get_mutable_layout(ctx.t).object_data.update(object_data);
649 }
650 return seastar::now();
651 });
652 });
653 }
654
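// Note: the full max_object_size region is reserved up front at the onode's
// data hint; subsequent calls only assert that the reservation exists.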
655 ObjectDataHandler::write_ret ObjectDataHandler::prepare_data_reservation(
656 context_t ctx,
657 object_data_t &object_data,
658 extent_len_t size)
659 {
660 LOG_PREFIX(ObjectDataHandler::prepare_data_reservation);
661 ceph_assert(size <= max_object_size);
662 if (!object_data.is_null()) {
663 ceph_assert(object_data.get_reserved_data_len() == max_object_size);
664 DEBUGT("reservation present: {}~{}",
665 ctx.t,
666 object_data.get_reserved_data_base(),
667 object_data.get_reserved_data_len());
668 return write_iertr::now();
669 } else {
670 DEBUGT("reserving: {}~{}",
671 ctx.t,
672 ctx.onode.get_data_hint(),
673 max_object_size);
674 return ctx.tm.reserve_region(
675 ctx.t,
676 ctx.onode.get_data_hint(),
677 max_object_size
678 ).si_then([max_object_size=max_object_size, &object_data](auto pin) {
679 ceph_assert(pin->get_length() == max_object_size);
680 object_data.update_reserved(
681 pin->get_key(),
682 pin->get_length());
683 return write_iertr::now();
684 });
685 }
686 }
687
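// trim_data_reservation truncates the reserved region down to size: pins at
// or beyond the boundary are removed and replaced with a single zero
// reservation; if the boundary falls inside a data extent, that extent is
// read and its prefix rewritten (zero-padded to a block boundary) first.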
688 ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
689 context_t ctx, object_data_t &object_data, extent_len_t size)
690 {
691 ceph_assert(!object_data.is_null());
692 ceph_assert(size <= object_data.get_reserved_data_len());
693 return seastar::do_with(
694 lba_pin_list_t(),
695 extent_to_write_list_t(),
696 [ctx, size, &object_data](auto &pins, auto &to_write) {
697 LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
698 DEBUGT("object_data: {}~{}",
699 ctx.t,
700 object_data.get_reserved_data_base(),
701 object_data.get_reserved_data_len());
702 return ctx.tm.get_pins(
703 ctx.t,
704 object_data.get_reserved_data_base() + size,
705 object_data.get_reserved_data_len() - size
706 ).si_then([ctx, size, &pins, &object_data, &to_write](auto _pins) {
707 _pins.swap(pins);
708 ceph_assert(pins.size());
709 auto &pin = *pins.front();
710 ceph_assert(pin.get_key() >= object_data.get_reserved_data_base());
711 ceph_assert(
712 pin.get_key() <= object_data.get_reserved_data_base() + size);
713 auto pin_offset = pin.get_key() -
714 object_data.get_reserved_data_base();
715 if ((pin.get_key() == (object_data.get_reserved_data_base() + size)) ||
716 (pin.get_val().is_zero())) {
717 /* First pin is exactly at the boundary or is a zero pin. Either way,
718 * remove all pins and add a single zero pin to the end. */
719 to_write.push_back(extent_to_write_t::create_zero(
720 pin.get_key(),
721 object_data.get_reserved_data_len() - pin_offset));
722 return clear_iertr::now();
723 } else {
724 /* First pin overlaps the boundary and has data; read in the extent
725 * and rewrite the portion prior to size */
726 return ctx.tm.read_pin<ObjectDataBlock>(
727 ctx.t,
728 pin.duplicate()
729 ).si_then([ctx, size, pin_offset, &pin, &object_data, &to_write](
730 auto extent) {
731 bufferlist bl;
732 bl.append(
733 bufferptr(
734 extent->get_bptr(),
735 0,
736 size - pin_offset
737 ));
738 bl.append_zero(p2roundup(size, ctx.tm.get_block_size()) - size);
739 to_write.push_back(extent_to_write_t::create_data(
740 pin.get_key(),
741 bl));
742 to_write.push_back(extent_to_write_t::create_zero(
743 object_data.get_reserved_data_base() +
744 p2roundup(size, ctx.tm.get_block_size()),
745 object_data.get_reserved_data_len() -
746 p2roundup(size, ctx.tm.get_block_size())));
747 return clear_iertr::now();
748 });
749 }
750 }).si_then([ctx, &pins] {
751 return do_removals(ctx, pins);
752 }).si_then([ctx, &to_write] {
753 return do_insertions(ctx, to_write);
754 }).si_then([size, &object_data] {
755 if (size == 0) {
756 object_data.clear();
757 }
758 return ObjectDataHandler::clear_iertr::now();
759 });
760 });
761 }
762
763 /**
764 * get_to_writes_with_zero_buffer
765 *
766 * Returns extent_to_write_t's reflecting a zero region covering
767 * offset~len, with an optional headptr on the left and an optional
768 * tailptr on the right.
769 */
770 extent_to_write_list_t get_to_writes_with_zero_buffer(
771 const extent_len_t block_size,
772 laddr_t offset, extent_len_t len,
773 std::optional<bufferptr> &&headptr, std::optional<bufferptr> &&tailptr)
774 {
775 auto zero_left = p2roundup(offset, (laddr_t)block_size);
776 auto zero_right = p2align(offset + len, (laddr_t)block_size);
777 auto left = headptr ? (offset - headptr->length()) : offset;
778 auto right = tailptr ?
779 (offset + len + tailptr->length()) :
780 (offset + len);
781
782 assert(
783 (headptr && ((zero_left - left) ==
784 p2roundup(headptr->length(), block_size))) ^
785 (!headptr && (zero_left == left)));
786 assert(
787 (tailptr && ((right - zero_right) ==
788 p2roundup(tailptr->length(), block_size))) ^
789 (!tailptr && (right == zero_right)));
790
791 assert(right > left);
792 assert((left % block_size) == 0);
793 assert((right % block_size) == 0);
794
795 // zero region too small for a reserved section,
796 // headptr and tailptr in same extent
797 if (zero_right <= zero_left) {
798 bufferlist bl;
799 if (headptr) {
800 bl.append(*headptr);
801 }
802 bl.append_zero(
803 right - left - bl.length() - (tailptr ? tailptr->length() : 0));
804 if (tailptr) {
805 bl.append(*tailptr);
806 }
807 assert(bl.length() % block_size == 0);
808 assert(bl.length() == (right - left));
809 return {extent_to_write_t::create_data(left, bl)};
810 } else {
811 // reserved section between ends, headptr and tailptr in different extents
812 extent_to_write_list_t ret;
813 if (headptr) {
814 bufferlist headbl;
815 headbl.append(*headptr);
816 headbl.append_zero(zero_left - left - headbl.length());
817 assert(headbl.length() % block_size == 0);
818 assert(headbl.length() > 0);
819 ret.push_back(extent_to_write_t::create_data(left, headbl));
820 }
821 // reserved zero region
822 ret.push_back(extent_to_write_t::create_zero(zero_left, zero_right - zero_left));
823 assert(ret.back().len % block_size == 0);
824 assert(ret.back().len > 0);
825 if (tailptr) {
826 bufferlist tailbl;
827 tailbl.append(*tailptr);
828 tailbl.append_zero(right - zero_right - tailbl.length());
829 assert(tailbl.length() % block_size == 0);
830 assert(tailbl.length() > 0);
831 ret.push_back(extent_to_write_t::create_data(zero_right, tailbl));
832 }
833 return ret;
834 }
835 }
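
/*
 * Illustrative sketch of the function above (hypothetical numbers,
 * block_size = 4096): zeroing offset = 0x0800, len = 0x2000 with a
 * 0x0800-byte headptr and a 0x0800-byte tailptr yields
 *
 *   DATA [0x0000, 0x1000)  headptr followed by zero fill
 *   ZERO [0x1000, 0x2000)  reserved (unallocated) region
 *   DATA [0x2000, 0x3000)  tailptr followed by zero fill
 *
 * because zero_left (0x1000) < zero_right (0x2000). Had the zero region not
 * covered a full block (zero_right <= zero_left), a single DATA extent
 * would have been returned instead.
 */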
836
837 /**
838 * get_to_writes
839 *
840 * Returns extent_to_write_t's from bl.
841 *
842 * TODO: probably add some kind of upper limit on extent size.
843 */
844 extent_to_write_list_t get_to_writes(laddr_t offset, bufferlist &bl)
845 {
846 auto ret = extent_to_write_list_t();
847 ret.push_back(extent_to_write_t::create_data(offset, bl));
848 return ret;
849 };
850
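// overwrite: rough flow (see overwrite_plan_t above) -- build a plan from
// the pins covering offset~len, let operate_left/operate_right produce the
// boundary extents plus any head/tail data to preserve, splice in either
// the provided bl or a zero buffer for the middle, then remove the old
// mappings and insert the new extents.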
851 ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
852 context_t ctx,
853 laddr_t offset,
854 extent_len_t len,
855 std::optional<bufferlist> &&bl,
856 lba_pin_list_t &&_pins)
857 {
858 if (bl.has_value()) {
859 assert(bl->length() == len);
860 }
861 overwrite_plan_t overwrite_plan(offset, len, _pins, ctx.tm.get_block_size());
862 return seastar::do_with(
863 std::move(_pins),
864 extent_to_write_list_t(),
865 [ctx, len, offset, overwrite_plan, bl=std::move(bl)]
866 (auto &pins, auto &to_write) mutable
867 {
868 LOG_PREFIX(ObjectDataHandler::overwrite);
869 DEBUGT("overwrite: {}~{}",
870 ctx.t,
871 offset,
872 len);
873 ceph_assert(pins.size() >= 1);
874 DEBUGT("overwrite: split overwrite_plan {}", ctx.t, overwrite_plan);
875
876 return operate_left(
877 ctx,
878 pins.front(),
879 overwrite_plan
880 ).si_then([ctx, len, offset, overwrite_plan, bl=std::move(bl),
881 &to_write, &pins](auto p) mutable {
882 auto &[left_extent, headptr] = p;
883 if (left_extent) {
884 ceph_assert(left_extent->addr == overwrite_plan.pin_begin);
885 append_extent_to_write(to_write, std::move(*left_extent));
886 }
887 if (headptr) {
888 assert(headptr->length() > 0);
889 }
890 return operate_right(
891 ctx,
892 pins.back(),
893 overwrite_plan
894 ).si_then([ctx, len, offset,
895 pin_begin=overwrite_plan.pin_begin,
896 pin_end=overwrite_plan.pin_end,
897 bl=std::move(bl), headptr=std::move(headptr),
898 &to_write, &pins](auto p) mutable {
899 auto &[right_extent, tailptr] = p;
900 if (bl.has_value()) {
901 auto write_offset = offset;
902 bufferlist write_bl;
903 if (headptr) {
904 write_bl.append(*headptr);
905 write_offset -= headptr->length();
906 assert_aligned(write_offset);
907 }
908 write_bl.claim_append(*bl);
909 if (tailptr) {
910 write_bl.append(*tailptr);
911 assert_aligned(write_bl.length());
912 }
913 splice_extent_to_write(
914 to_write,
915 get_to_writes(write_offset, write_bl));
916 } else {
917 splice_extent_to_write(
918 to_write,
919 get_to_writes_with_zero_buffer(
920 ctx.tm.get_block_size(),
921 offset,
922 len,
923 std::move(headptr),
924 std::move(tailptr)));
925 }
926 if (right_extent) {
927 ceph_assert(right_extent->get_end_addr() == pin_end);
928 append_extent_to_write(to_write, std::move(*right_extent));
929 }
930 assert(to_write.size());
931 assert(pin_begin == to_write.front().addr);
932 assert(pin_end == to_write.back().get_end_addr());
933
934 return do_removals(ctx, pins);
935 }).si_then([ctx, &to_write] {
936 return do_insertions(ctx, to_write);
937 });
938 });
939 });
940 }
941
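// zero: ensure the reservation covers offset + len, then overwrite the
// range with std::nullopt as the data so it is laid out as zero/reserved
// extents rather than written blocks.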
942 ObjectDataHandler::zero_ret ObjectDataHandler::zero(
943 context_t ctx,
944 objaddr_t offset,
945 extent_len_t len)
946 {
947 return with_object_data(
948 ctx,
949 [this, ctx, offset, len](auto &object_data) {
950 LOG_PREFIX(ObjectDataHandler::zero);
951 DEBUGT("zero to {}~{}, object_data: {}~{}, is_null {}",
952 ctx.t,
953 offset,
954 len,
955 object_data.get_reserved_data_base(),
956 object_data.get_reserved_data_len(),
957 object_data.is_null());
958 return prepare_data_reservation(
959 ctx,
960 object_data,
961 p2roundup(offset + len, ctx.tm.get_block_size())
962 ).si_then([this, ctx, offset, len, &object_data] {
963 auto logical_offset = object_data.get_reserved_data_base() + offset;
964 return ctx.tm.get_pins(
965 ctx.t,
966 logical_offset,
967 len
968 ).si_then([this, ctx, logical_offset, len](auto pins) {
969 return overwrite(
970 ctx, logical_offset, len,
971 std::nullopt, std::move(pins));
972 });
973 });
974 });
975 }
976
977 ObjectDataHandler::write_ret ObjectDataHandler::write(
978 context_t ctx,
979 objaddr_t offset,
980 const bufferlist &bl)
981 {
982 return with_object_data(
983 ctx,
984 [this, ctx, offset, &bl](auto &object_data) {
985 LOG_PREFIX(ObjectDataHandler::write);
986 DEBUGT("writing to {}~{}, object_data: {}~{}, is_null {}",
987 ctx.t,
988 offset,
989 bl.length(),
990 object_data.get_reserved_data_base(),
991 object_data.get_reserved_data_len(),
992 object_data.is_null());
993 return prepare_data_reservation(
994 ctx,
995 object_data,
996 p2roundup(offset + bl.length(), ctx.tm.get_block_size())
997 ).si_then([this, ctx, offset, &object_data, &bl] {
998 auto logical_offset = object_data.get_reserved_data_base() + offset;
999 return ctx.tm.get_pins(
1000 ctx.t,
1001 logical_offset,
1002 bl.length()
1003 ).si_then([this, ctx,logical_offset, &bl](
1004 auto pins) {
1005 return overwrite(
1006 ctx, logical_offset, bl.length(),
1007 bufferlist(bl), std::move(pins));
1008 });
1009 });
1010 });
1011 }
1012
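// read: walk the pins covering the requested range, appending zeros for
// zero/reserved mappings and copying the overlapping portion of each
// ObjectDataBlock otherwise.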
1013 ObjectDataHandler::read_ret ObjectDataHandler::read(
1014 context_t ctx,
1015 objaddr_t obj_offset,
1016 extent_len_t len)
1017 {
1018 return seastar::do_with(
1019 bufferlist(),
1020 [ctx, obj_offset, len](auto &ret) {
1021 return with_object_data(
1022 ctx,
1023 [ctx, obj_offset, len, &ret](const auto &object_data) {
1024 LOG_PREFIX(ObjectDataHandler::read);
1025 DEBUGT("reading {}~{}",
1026 ctx.t,
1027 object_data.get_reserved_data_base(),
1028 object_data.get_reserved_data_len());
1029 /* Assumption: callers ensure that onode size is <= reserved
1030 * size and that len has been adjusted prior to this call */
1031 ceph_assert(!object_data.is_null());
1032 ceph_assert((obj_offset + len) <= object_data.get_reserved_data_len());
1033 ceph_assert(len > 0);
1034 laddr_t loffset =
1035 object_data.get_reserved_data_base() + obj_offset;
1036 return ctx.tm.get_pins(
1037 ctx.t,
1038 loffset,
1039 len
1040 ).si_then([ctx, loffset, len, &ret](auto _pins) {
1041 // offset~len falls within reserved region and len > 0
1042 ceph_assert(_pins.size() >= 1);
1043 ceph_assert((*_pins.begin())->get_key() <= loffset);
1044 return seastar::do_with(
1045 std::move(_pins),
1046 loffset,
1047 [ctx, loffset, len, &ret](auto &pins, auto &current) {
1048 return trans_intr::do_for_each(
1049 pins,
1050 [ctx, loffset, len, &current, &ret](auto &pin)
1051 -> read_iertr::future<> {
1052 ceph_assert(current <= (loffset + len));
1053 ceph_assert(
1054 (loffset + len) > pin->get_key());
1055 laddr_t end = std::min(
1056 pin->get_key() + pin->get_length(),
1057 loffset + len);
1058 if (pin->get_val().is_zero()) {
1059 ceph_assert(end > current); // See LBAManager::get_mappings
1060 ret.append_zero(end - current);
1061 current = end;
1062 return seastar::now();
1063 } else {
1064 return ctx.tm.read_pin<ObjectDataBlock>(
1065 ctx.t,
1066 std::move(pin)
1067 ).si_then([&ret, &current, end](auto extent) {
1068 ceph_assert(
1069 (extent->get_laddr() + extent->get_length()) >= end);
1070 ceph_assert(end > current);
1071 ret.append(
1072 bufferptr(
1073 extent->get_bptr(),
1074 current - extent->get_laddr(),
1075 end - current));
1076 current = end;
1077 return seastar::now();
1078 }).handle_error_interruptible(
1079 read_iertr::pass_further{},
1080 crimson::ct_error::assert_all{
1081 "ObjectDataHandler::read hit invalid error"
1082 }
1083 );
1084 }
1085 });
1086 });
1087 });
1088 }).si_then([&ret] {
1089 return std::move(ret);
1090 });
1091 });
1092 }
1093
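// fiemap: report the allocated (non-zero) ranges intersecting
// obj_offset~len as a map of object-relative offset -> length.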
1094 ObjectDataHandler::fiemap_ret ObjectDataHandler::fiemap(
1095 context_t ctx,
1096 objaddr_t obj_offset,
1097 extent_len_t len)
1098 {
1099 return seastar::do_with(
1100 std::map<uint64_t, uint64_t>(),
1101 [ctx, obj_offset, len](auto &ret) {
1102 return with_object_data(
1103 ctx,
1104 [ctx, obj_offset, len, &ret](const auto &object_data) {
1105 LOG_PREFIX(ObjectDataHandler::fiemap);
1106 DEBUGT(
1107 "{}~{}, reservation {}~{}",
1108 ctx.t,
1109 obj_offset,
1110 len,
1111 object_data.get_reserved_data_base(),
1112 object_data.get_reserved_data_len());
1113 /* Assumption: callers ensure that onode size is <= reserved
1114 * size and that len has been adjusted prior to this call */
1115 ceph_assert(!object_data.is_null());
1116 ceph_assert((obj_offset + len) <= object_data.get_reserved_data_len());
1117 ceph_assert(len > 0);
1118 laddr_t loffset =
1119 object_data.get_reserved_data_base() + obj_offset;
1120 return ctx.tm.get_pins(
1121 ctx.t,
1122 loffset,
1123 len
1124 ).si_then([loffset, len, &object_data, &ret](auto &&pins) {
1125 ceph_assert(pins.size() >= 1);
1126 ceph_assert((*pins.begin())->get_key() <= loffset);
1127 for (auto &&i: pins) {
1128 if (!(i->get_val().is_zero())) {
1129 auto ret_left = std::max(i->get_key(), loffset);
1130 auto ret_right = std::min(
1131 i->get_key() + i->get_length(),
1132 loffset + len);
1133 assert(ret_right > ret_left);
1134 ret.emplace(
1135 std::make_pair(
1136 ret_left - object_data.get_reserved_data_base(),
1137 ret_right - ret_left
1138 ));
1139 }
1140 }
1141 });
1142 }).si_then([&ret] {
1143 return std::move(ret);
1144 });
1145 });
1146 }
1147
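// truncate: shrink via trim_data_reservation or grow the reservation via
// prepare_data_reservation; equal size is a no-op.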
1148 ObjectDataHandler::truncate_ret ObjectDataHandler::truncate(
1149 context_t ctx,
1150 objaddr_t offset)
1151 {
1152 return with_object_data(
1153 ctx,
1154 [this, ctx, offset](auto &object_data) {
1155 LOG_PREFIX(ObjectDataHandler::truncate);
1156 DEBUGT("truncating {}~{} offset: {}",
1157 ctx.t,
1158 object_data.get_reserved_data_base(),
1159 object_data.get_reserved_data_len(),
1160 offset);
1161 if (offset < object_data.get_reserved_data_len()) {
1162 return trim_data_reservation(ctx, object_data, offset);
1163 } else if (offset > object_data.get_reserved_data_len()) {
1164 return prepare_data_reservation(
1165 ctx,
1166 object_data,
1167 p2roundup(offset, ctx.tm.get_block_size()));
1168 } else {
1169 return truncate_iertr::now();
1170 }
1171 });
1172 }
1173
1174 ObjectDataHandler::clear_ret ObjectDataHandler::clear(
1175 context_t ctx)
1176 {
1177 return with_object_data(
1178 ctx,
1179 [this, ctx](auto &object_data) {
1180 LOG_PREFIX(ObjectDataHandler::clear);
1181 DEBUGT("clearing: {}~{}",
1182 ctx.t,
1183 object_data.get_reserved_data_base(),
1184 object_data.get_reserved_data_len());
1185 if (object_data.is_null()) {
1186 return clear_iertr::now();
1187 }
1188 return trim_data_reservation(ctx, object_data, 0);
1189 });
1190 }
1191
1192 } // namespace crimson::os::seastore