/**
* extent_to_write_t
*
- * Encapsulates extents to be written out using do_insertions.
+ * Encapsulates smallest write operations in overwrite.
* Indicates a zero/existing extent or a data extent based on whether
* to_write is populate.
- * The meaning of existing_paddr is that the new extent to be
- * written is the part of exising extent on the disk. existing_paddr
- * must be absolute.
+ * Should be handled by prepare_ops_list.
*/
struct extent_to_write_t {
enum class type_t {
ZERO,
EXISTING,
};
-
type_t type;
+
+ /// pin of original extent, not nullptr if type == EXISTING
+ LBAMappingRef pin;
+
laddr_t addr;
extent_len_t len;
+
/// non-nullopt if and only if type == DATA
std::optional<bufferlist> to_write;
- /// non-nullopt if and only if type == EXISTING
- std::optional<paddr_t> existing_paddr;
- extent_to_write_t(const extent_to_write_t &) = default;
+ extent_to_write_t(const extent_to_write_t &) = delete;
extent_to_write_t(extent_to_write_t &&) = default;
bool is_data() const {
}
static extent_to_write_t create_zero(
- laddr_t addr, extent_len_t len) {
+ laddr_t addr, extent_len_t len) {
return extent_to_write_t(addr, len);
}
static extent_to_write_t create_existing(
- laddr_t addr, paddr_t existing_paddr, extent_len_t len) {
- return extent_to_write_t(addr, existing_paddr, len);
+ LBAMappingRef &&pin, laddr_t addr, extent_len_t len) {
+ assert(pin);
+ return extent_to_write_t(std::move(pin), addr, len);
}
private:
extent_to_write_t(laddr_t addr, extent_len_t len)
: type(type_t::ZERO), addr(addr), len(len) {}
- extent_to_write_t(laddr_t addr, paddr_t existing_paddr, extent_len_t len)
- : type(type_t::EXISTING), addr(addr), len(len),
- to_write(std::nullopt), existing_paddr(existing_paddr) {}
+ extent_to_write_t(LBAMappingRef &&pin, laddr_t addr, extent_len_t len)
+ : type(type_t::EXISTING), pin(std::move(pin)), addr(addr), len(len) {}
};
using extent_to_write_list_t = std::list<extent_to_write_t>;
+// Encapsulates extents to be written out using do_remappings.
+struct extent_to_remap_t {
+ enum class type_t {
+ REMAP,
+ OVERWRITE
+ };
+ type_t type;
+ /// pin of original extent
+ LBAMappingRef pin;
+ /// offset of remapped extent or overwrite part of overwrite extent.
+ /// overwrite part of overwrite extent might correspond to mutiple
+ /// fresh write extent.
+ extent_len_t new_offset;
+ /// length of remapped extent or overwrite part of overwrite extent
+ extent_len_t new_len;
+
+ extent_to_remap_t(const extent_to_remap_t &) = delete;
+ extent_to_remap_t(extent_to_remap_t &&) = default;
+
+ bool is_remap() const {
+ return type == type_t::REMAP;
+ }
+
+ bool is_overwrite() const {
+ assert((new_offset != 0) && (pin->get_length() != new_offset + new_len));
+ return type == type_t::OVERWRITE;
+ }
+
+ using remap_entry = TransactionManager::remap_entry;
+ remap_entry create_remap_entry() {
+ assert(is_remap());
+ return remap_entry(
+ new_offset,
+ new_len);
+ }
+
+ remap_entry create_left_remap_entry() {
+ assert(is_overwrite());
+ return remap_entry(
+ 0,
+ new_offset);
+ }
+
+ remap_entry create_right_remap_entry() {
+ assert(is_overwrite());
+ return remap_entry(
+ new_offset + new_len,
+ pin->get_length() - new_offset - new_len);
+ }
+
+ static extent_to_remap_t create_remap(
+ LBAMappingRef &&pin, extent_len_t new_offset, extent_len_t new_len) {
+ return extent_to_remap_t(type_t::REMAP,
+ std::move(pin), new_offset, new_len);
+ }
+
+ static extent_to_remap_t create_overwrite(
+ LBAMappingRef &&pin, extent_len_t new_offset, extent_len_t new_len) {
+ return extent_to_remap_t(type_t::OVERWRITE,
+ std::move(pin), new_offset, new_len);
+ }
+
+private:
+ extent_to_remap_t(type_t type,
+ LBAMappingRef &&pin, extent_len_t new_offset, extent_len_t new_len)
+ : type(type),
+ pin(std::move(pin)), new_offset(new_offset), new_len(new_len) {}
+};
+using extent_to_remap_list_t = std::list<extent_to_remap_t>;
+
+// Encapsulates extents to be written out using do_insertions.
+struct extent_to_insert_t {
+ enum class type_t {
+ DATA,
+ ZERO
+ };
+ type_t type;
+ /// laddr of new extent
+ laddr_t addr;
+ /// length of new extent
+ extent_len_t len;
+ /// non-nullopt if type == DATA
+ std::optional<bufferlist> bl;
+
+ extent_to_insert_t(const extent_to_insert_t &) = default;
+ extent_to_insert_t(extent_to_insert_t &&) = default;
+
+ bool is_data() const {
+ return type == type_t::DATA;
+ }
+
+ bool is_zero() const {
+ return type == type_t::ZERO;
+ }
+
+ static extent_to_insert_t create_data(
+ laddr_t addr, extent_len_t len, std::optional<bufferlist> bl) {
+ return extent_to_insert_t(addr, len, bl);
+ }
+
+ static extent_to_insert_t create_zero(
+ laddr_t addr, extent_len_t len) {
+ return extent_to_insert_t(addr, len);
+ }
+
+private:
+ extent_to_insert_t(laddr_t addr, extent_len_t len,
+ std::optional<bufferlist> bl)
+ :type(type_t::DATA), addr(addr), len(len), bl(bl) {}
+
+ extent_to_insert_t(laddr_t addr, extent_len_t len)
+ :type(type_t::ZERO), addr(addr), len(len) {}
+};
+using extent_to_insert_list_t = std::list<extent_to_insert_t>;
+
+// Encapsulates extents to be retired in do_removals.
+using extent_to_remove_list_t = std::list<LBAMappingRef>;
+
+struct overwrite_ops_t {
+ extent_to_remap_list_t to_remap;
+ extent_to_insert_list_t to_insert;
+ extent_to_remove_list_t to_remove;
+};
+
+// prepare to_remap, to_retire, to_insert list
+overwrite_ops_t prepare_ops_list(
+ lba_pin_list_t &pins_to_remove,
+ extent_to_write_list_t &to_write) {
+ assert(pins_to_remove.size() != 0);
+ overwrite_ops_t ops;
+ ops.to_remove.swap(pins_to_remove);
+ if (to_write.empty()) {
+ logger().debug("empty to_write");
+ return ops;
+ }
+ long unsigned int visitted = 0;
+ auto& front = to_write.front();
+ auto& back = to_write.back();
+
+ // prepare overwrite, happens in one original extent.
+ if (ops.to_remove.size() == 1 &&
+ front.is_existing() && back.is_existing()) {
+ visitted += 2;
+ assert(to_write.size() > 2);
+ assert(front.addr == front.pin->get_key());
+ assert(back.addr > back.pin->get_key());
+ ops.to_remap.push_back(extent_to_remap_t::create_overwrite(
+ std::move(front.pin),
+ front.len,
+ back.addr - front.addr - front.len));
+ ops.to_remove.pop_front();
+ } else {
+ // prepare to_remap, happens in one or multiple extents
+ if (front.is_existing()) {
+ visitted++;
+ assert(to_write.size() > 1);
+ assert(front.addr == front.pin->get_key());
+ ops.to_remap.push_back(extent_to_remap_t::create_remap(
+ std::move(front.pin),
+ 0,
+ front.len));
+ ops.to_remove.pop_front();
+ }
+ if (back.is_existing()) {
+ visitted++;
+ assert(to_write.size() > 1);
+ assert(back.addr + back.len ==
+ back.pin->get_key() + back.pin->get_length());
+ ops.to_remap.push_back(extent_to_remap_t::create_remap(
+ std::move(back.pin),
+ back.addr - back.pin->get_key(),
+ back.len));
+ ops.to_remove.pop_back();
+ }
+ }
+
+ // prepare to_insert
+ for (auto ®ion : to_write) {
+ if (region.is_data()) {
+ visitted++;
+ assert(region.to_write.has_value());
+ ops.to_insert.push_back(extent_to_insert_t::create_data(
+ region.addr, region.len, region.to_write));
+ } else if (region.is_zero()) {
+ visitted++;
+ assert(!(region.to_write.has_value()));
+ ops.to_insert.push_back(extent_to_insert_t::create_zero(
+ region.addr, region.len));
+ }
+ }
+
+ logger().debug(
+ "to_remap list size: {}"
+ " to_insert list size: {}"
+ " to_remove list size: {}",
+ ops.to_remap.size(), ops.to_insert.size(), ops.to_remove.size());
+ assert(visitted == to_write.size());
+ return ops;
+}
+
/**
* append_extent_to_write
*
}
}
-/// Removes extents/mappings in pins
+/// Creates remap extents in to_remap
+ObjectDataHandler::write_ret do_remappings(
+ context_t ctx,
+ extent_to_remap_list_t &to_remap)
+{
+ return trans_intr::do_for_each(
+ to_remap,
+ [ctx](auto ®ion) {
+ if (region.is_remap()) {
+ return ctx.tm.remap_pin<ObjectDataBlock, 1>(
+ ctx.t,
+ std::move(region.pin),
+ std::array{
+ region.create_remap_entry()
+ }
+ ).si_then([®ion](auto pins) {
+ ceph_assert(pins.size() == 1);
+ ceph_assert(region.new_len == pins[0]->get_length());
+ return ObjectDataHandler::write_iertr::now();
+ });
+ } else if (region.is_overwrite()) {
+ return ctx.tm.remap_pin<ObjectDataBlock, 2>(
+ ctx.t,
+ std::move(region.pin),
+ std::array{
+ region.create_left_remap_entry(),
+ region.create_right_remap_entry()
+ }
+ ).si_then([®ion](auto pins) {
+ ceph_assert(pins.size() == 2);
+ ceph_assert(region.pin->get_key() == pins[0]->get_key());
+ ceph_assert(region.pin->get_key() + pins[0]->get_length() +
+ region.new_len == pins[1]->get_key());
+ return ObjectDataHandler::write_iertr::now();
+ });
+ } else {
+ ceph_abort("impossible");
+ return ObjectDataHandler::write_iertr::now();
+ }
+ });
+}
+
ObjectDataHandler::write_ret do_removals(
context_t ctx,
- lba_pin_list_t &pins)
+ lba_pin_list_t &to_remove)
{
return trans_intr::do_for_each(
- pins,
+ to_remove,
[ctx](auto &pin) {
LOG_PREFIX(object_data_handler.cc::do_removals);
DEBUGT("decreasing ref: {}",
});
}
-/// Creates zero/data extents in to_write
+/// Creates zero/data extents in to_insert
ObjectDataHandler::write_ret do_insertions(
context_t ctx,
- extent_to_write_list_t &to_write)
+ extent_to_insert_list_t &to_insert)
{
return trans_intr::do_for_each(
- to_write,
+ to_insert,
[ctx](auto ®ion) {
LOG_PREFIX(object_data_handler.cc::do_insertions);
if (region.is_data()) {
assert_aligned(region.addr);
assert_aligned(region.len);
- ceph_assert(region.len == region.to_write->length());
+ ceph_assert(region.len == region.bl->length());
DEBUGT("allocating extent: {}~{}",
ctx.t,
region.addr,
}
ceph_assert(extent->get_laddr() == region.addr);
ceph_assert(extent->get_length() == region.len);
- auto iter = region.to_write->cbegin();
+ auto iter = region.bl->cbegin();
iter.copy(region.len, extent->get_bptr().c_str());
return ObjectDataHandler::write_iertr::now();
});
return ObjectDataHandler::write_iertr::now();
});
} else {
- ceph_assert(region.is_existing());
- DEBUGT("map existing extent: laddr {} len {} {}",
- ctx.t, region.addr, region.len, *region.existing_paddr);
- return ctx.tm.map_existing_extent<ObjectDataBlock>(
- ctx.t, region.addr, *region.existing_paddr, region.len
- ).handle_error_interruptible(
- TransactionManager::alloc_extent_iertr::pass_further{},
- Device::read_ertr::assert_all{"ignore read error"}
- ).si_then([FNAME, ctx, ®ion](auto extent) {
- if (extent->get_laddr() != region.addr) {
- ERRORT(
- "inconsistent laddr: extent: {} region {}",
- ctx.t,
- extent->get_laddr(),
- region.addr);
- }
- ceph_assert(extent->get_laddr() == region.addr);
- return ObjectDataHandler::write_iertr::now();
- });
+ ceph_abort("impossible");
+ return ObjectDataHandler::write_iertr::now();
}
});
}
overwrite_plan_t(laddr_t offset,
extent_len_t len,
const lba_pin_list_t& pins,
- extent_len_t block_size) :
+ extent_len_t block_size,
+ Transaction& t) :
pin_begin(pins.front()->get_key()),
pin_end(pins.back()->get_key() + pins.back()->get_length()),
left_paddr(pins.front()->get_val()),
right_operation(overwrite_operation_t::UNKNOWN),
block_size(block_size) {
validate();
- evaluate_operations();
+ evaluate_operations(t);
assert(left_operation != overwrite_operation_t::UNKNOWN);
assert(right_operation != overwrite_operation_t::UNKNOWN);
}
* original extent into at most three parts: origin-left, part-to-be-modified
* and origin-right.
*/
- void evaluate_operations() {
+ void evaluate_operations(Transaction& t) {
auto actual_write_size = get_pins_size();
auto aligned_data_size = get_aligned_data_size();
auto left_ext_size = get_left_extent_size();
auto right_ext_size = get_right_extent_size();
+ auto can_merge = [](Transaction& t, paddr_t paddr) {
+ CachedExtentRef ext;
+ if (paddr.is_relative() || paddr.is_delayed()) {
+ return true;
+ } else if (t.get_extent(paddr, &ext) ==
+ Transaction::get_extent_ret::PRESENT) {
+ // FIXME: there is no need to lookup the cache if the pin can
+ // be associated with the extent state
+ if (ext->is_mutable()) {
+ return true;
+ }
+ }
+ return false;
+ };
if (left_paddr.is_zero()) {
actual_write_size -= left_ext_size;
left_ext_size = 0;
left_operation = overwrite_operation_t::OVERWRITE_ZERO;
- // FIXME: left_paddr can be absolute and pending
- } else if (left_paddr.is_relative() ||
- left_paddr.is_delayed()) {
+ } else if (can_merge(t, left_paddr)) {
aligned_data_size += left_ext_size;
left_ext_size = 0;
left_operation = overwrite_operation_t::MERGE_EXISTING;
actual_write_size -= right_ext_size;
right_ext_size = 0;
right_operation = overwrite_operation_t::OVERWRITE_ZERO;
- // FIXME: right_paddr can be absolute and pending
- } else if (right_paddr.is_relative() ||
- right_paddr.is_delayed()) {
+ } else if (can_merge(t, right_paddr)) {
aligned_data_size += right_ext_size;
right_ext_size = 0;
right_operation = overwrite_operation_t::MERGE_EXISTING;
std::nullopt,
std::nullopt);
} else {
+ extent_len_t off = pin->get_intermediate_offset();
return ctx.tm.read_pin<ObjectDataBlock>(
ctx.t, pin->duplicate()
- ).si_then([prepend_len](auto left_extent) {
+ ).si_then([prepend_len, off](auto left_extent) {
return get_iertr::make_ready_future<operate_ret_bare>(
std::nullopt,
std::make_optional(bufferptr(
left_extent->get_bptr(),
- 0,
+ off,
prepend_len)));
});
}
assert(extent_len);
std::optional<extent_to_write_t> left_to_write_extent =
std::make_optional(extent_to_write_t::create_existing(
- overwrite_plan.pin_begin,
- overwrite_plan.left_paddr,
+ pin->duplicate(),
+ pin->get_key(),
extent_len));
auto prepend_len = overwrite_plan.get_left_alignment_size();
if (prepend_len == 0) {
return get_iertr::make_ready_future<operate_ret_bare>(
- left_to_write_extent,
+ std::move(left_to_write_extent),
std::nullopt);
} else {
+ extent_len_t off = pin->get_intermediate_offset();
return ctx.tm.read_pin<ObjectDataBlock>(
ctx.t, pin->duplicate()
- ).si_then([prepend_offset=extent_len, prepend_len,
+ ).si_then([prepend_offset=extent_len + off, prepend_len,
left_to_write_extent=std::move(left_to_write_extent)]
(auto left_extent) mutable {
return get_iertr::make_ready_future<operate_ret_bare>(
- left_to_write_extent,
+ std::move(left_to_write_extent),
std::make_optional(bufferptr(
left_extent->get_bptr(),
prepend_offset,
std::nullopt,
std::nullopt);
} else {
- auto append_offset = overwrite_plan.data_end - right_pin_begin;
+ auto append_offset =
+ overwrite_plan.data_end
+ - right_pin_begin
+ + pin->get_intermediate_offset();
return ctx.tm.read_pin<ObjectDataBlock>(
ctx.t, pin->duplicate()
).si_then([append_offset, append_len](auto right_extent) {
assert(extent_len);
std::optional<extent_to_write_t> right_to_write_extent =
std::make_optional(extent_to_write_t::create_existing(
+ pin->duplicate(),
overwrite_plan.aligned_data_end,
- overwrite_plan.right_paddr.add_offset(overwrite_plan.aligned_data_end - right_pin_begin),
extent_len));
auto append_len = overwrite_plan.get_right_alignment_size();
if (append_len == 0) {
return get_iertr::make_ready_future<operate_ret_bare>(
- right_to_write_extent,
+ std::move(right_to_write_extent),
std::nullopt);
} else {
- auto append_offset = overwrite_plan.data_end - right_pin_begin;
+ auto append_offset =
+ overwrite_plan.data_end
+ - right_pin_begin
+ + pin->get_intermediate_offset();
return ctx.tm.read_pin<ObjectDataBlock>(
ctx.t, pin->duplicate()
).si_then([append_offset, append_len,
right_to_write_extent=std::move(right_to_write_extent)]
(auto right_extent) mutable {
return get_iertr::make_ready_future<operate_ret_bare>(
- right_to_write_extent,
+ std::move(right_to_write_extent),
std::make_optional(bufferptr(
right_extent->get_bptr(),
append_offset,
});
}
+template <typename F>
+auto with_objects_data(
+ ObjectDataHandler::context_t ctx,
+ F &&f)
+{
+ ceph_assert(ctx.d_onode);
+ return seastar::do_with(
+ ctx.onode.get_layout().object_data.get(),
+ ctx.d_onode->get_layout().object_data.get(),
+ std::forward<F>(f),
+ [ctx](auto &object_data, auto &d_object_data, auto &f) {
+ return std::invoke(f, object_data, d_object_data
+ ).si_then([ctx, &object_data, &d_object_data] {
+ if (object_data.must_update()) {
+ ctx.onode.get_mutable_layout(ctx.t).object_data.update(object_data);
+ }
+ if (d_object_data.must_update()) {
+ ctx.d_onode->get_mutable_layout(
+ ctx.t).object_data.update(d_object_data);
+ }
+ return seastar::now();
+ });
+ });
+}
+
ObjectDataHandler::write_ret ObjectDataHandler::prepare_data_reservation(
context_t ctx,
object_data_t &object_data,
).si_then([ctx, size, &pins, &object_data, &to_write](auto _pins) {
_pins.swap(pins);
ceph_assert(pins.size());
+ if (!size) {
+ // no need to reserve region if we are truncating the object's
+ // size to 0
+ return clear_iertr::now();
+ }
auto &pin = *pins.front();
ceph_assert(pin.get_key() >= object_data.get_reserved_data_base());
ceph_assert(
object_data.get_reserved_data_len() - pin_offset));
return clear_iertr::now();
} else {
- /* First pin overlaps the boundary and has data, read in extent
- * and rewrite portion prior to size */
- return ctx.tm.read_pin<ObjectDataBlock>(
- ctx.t,
- pin.duplicate()
- ).si_then([ctx, size, pin_offset, &pin, &object_data, &to_write](
- auto extent) {
- bufferlist bl;
- bl.append(
- bufferptr(
- extent->get_bptr(),
- 0,
- size - pin_offset
- ));
- bl.append_zero(p2roundup(size, ctx.tm.get_block_size()) - size);
- to_write.push_back(extent_to_write_t::create_data(
- pin.get_key(),
- bl));
+ /* First pin overlaps the boundary and has data, remap it
+ * if aligned or rewrite it if not aligned to size */
+ auto roundup_size = p2roundup(size, ctx.tm.get_block_size());
+ auto append_len = roundup_size - size;
+ if (append_len == 0) {
+ LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
+ TRACET("First pin overlaps the boundary and has aligned data"
+ "create existing at addr:{}, len:{}",
+ ctx.t, pin.get_key(), size - pin_offset);
+ to_write.push_back(extent_to_write_t::create_existing(
+ pin.duplicate(),
+ pin.get_key(),
+ size - pin_offset));
to_write.push_back(extent_to_write_t::create_zero(
- object_data.get_reserved_data_base() +
- p2roundup(size, ctx.tm.get_block_size()),
- object_data.get_reserved_data_len() -
- p2roundup(size, ctx.tm.get_block_size())));
- return clear_iertr::now();
- });
- }
- }).si_then([ctx, &pins] {
- return do_removals(ctx, pins);
- }).si_then([ctx, &to_write] {
- return do_insertions(ctx, to_write);
- }).si_then([size, &object_data] {
- if (size == 0) {
- object_data.clear();
+ object_data.get_reserved_data_base() + roundup_size,
+ object_data.get_reserved_data_len() - roundup_size));
+ return clear_iertr::now();
+ } else {
+ return ctx.tm.read_pin<ObjectDataBlock>(
+ ctx.t,
+ pin.duplicate()
+ ).si_then([ctx, size, pin_offset, append_len, roundup_size,
+ &pin, &object_data, &to_write](auto extent) {
+ bufferlist bl;
+ bl.append(
+ bufferptr(
+ extent->get_bptr(),
+ pin.get_intermediate_offset(),
+ size - pin_offset
+ ));
+ bl.append_zero(append_len);
+ LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
+ TRACET("First pin overlaps the boundary and has unaligned data"
+ "create data at addr:{}, len:{}",
+ ctx.t, pin.get_key(), bl.length());
+ to_write.push_back(extent_to_write_t::create_data(
+ pin.get_key(),
+ bl));
+ to_write.push_back(extent_to_write_t::create_zero(
+ object_data.get_reserved_data_base() + roundup_size,
+ object_data.get_reserved_data_len() - roundup_size));
+ return clear_iertr::now();
+ });
+ }
}
- return ObjectDataHandler::clear_iertr::now();
+ }).si_then([ctx, size, &to_write, &object_data, &pins] {
+ return seastar::do_with(
+ prepare_ops_list(pins, to_write),
+ [ctx, size, &object_data](auto &ops) {
+ return do_remappings(ctx, ops.to_remap
+ ).si_then([ctx, &ops] {
+ return do_removals(ctx, ops.to_remove);
+ }).si_then([ctx, &ops] {
+ return do_insertions(ctx, ops.to_insert);
+ }).si_then([size, &object_data] {
+ if (size == 0) {
+ object_data.clear();
+ }
+ return ObjectDataHandler::clear_iertr::now();
+ });
+ });
});
});
}
}
assert(bl.length() % block_size == 0);
assert(bl.length() == (right - left));
- return {extent_to_write_t::create_data(left, bl)};
+ extent_to_write_list_t ret;
+ ret.push_back(extent_to_write_t::create_data(left, bl));
+ return ret;
} else {
// reserved section between ends, headptr and tailptr in different extents
extent_to_write_list_t ret;
if (bl.has_value()) {
assert(bl->length() == len);
}
- overwrite_plan_t overwrite_plan(offset, len, _pins, ctx.tm.get_block_size());
+ overwrite_plan_t overwrite_plan(offset, len, _pins, ctx.tm.get_block_size(), ctx.t);
return seastar::do_with(
std::move(_pins),
extent_to_write_list_t(),
assert(pin_begin == to_write.front().addr);
assert(pin_end == to_write.back().get_end_addr());
- return do_removals(ctx, pins);
- }).si_then([ctx, &to_write] {
- return do_insertions(ctx, to_write);
+ return seastar::do_with(
+ prepare_ops_list(pins, to_write),
+ [ctx](auto &ops) {
+ return do_remappings(ctx, ops.to_remap
+ ).si_then([ctx, &ops] {
+ return do_removals(ctx, ops.to_remove);
+ }).si_then([ctx, &ops] {
+ return do_insertions(ctx, ops.to_insert);
+ });
+ });
});
});
});
current = end;
return seastar::now();
} else {
+ LOG_PREFIX(ObjectDataHandler::read);
+ auto key = pin->get_key();
+ bool is_indirect = pin->is_indirect();
+ extent_len_t off = pin->get_intermediate_offset();
+ DEBUGT("reading {}~{}, indirect: {}, "
+ "intermediate offset: {}, current: {}, end: {}",
+ ctx.t,
+ key,
+ pin->get_length(),
+ is_indirect,
+ off,
+ current,
+ end);
return ctx.tm.read_pin<ObjectDataBlock>(
ctx.t,
std::move(pin)
- ).si_then([&ret, ¤t, end](auto extent) {
+ ).si_then([&ret, ¤t, end, key, off,
+ is_indirect](auto extent) {
ceph_assert(
- (extent->get_laddr() + extent->get_length()) >= end);
+ is_indirect
+ ? (key - off + extent->get_length()) >= end
+ : (extent->get_laddr() + extent->get_length()) >= end);
ceph_assert(end > current);
ret.append(
bufferptr(
extent->get_bptr(),
- current - extent->get_laddr(),
+ off + current - (is_indirect ? key : extent->get_laddr()),
end - current));
current = end;
return seastar::now();
});
}
+ObjectDataHandler::clone_ret ObjectDataHandler::clone_extents(
+ context_t ctx,
+ object_data_t &object_data,
+ lba_pin_list_t &pins,
+ laddr_t data_base)
+{
+ LOG_PREFIX(ObjectDataHandler::clone_extents);
+ TRACET(" object_data: {}~{}, data_base: {}",
+ ctx.t,
+ object_data.get_reserved_data_base(),
+ object_data.get_reserved_data_len(),
+ data_base);
+ return ctx.tm.dec_ref(
+ ctx.t,
+ object_data.get_reserved_data_base()
+ ).si_then(
+ [&pins, &object_data, ctx, data_base](auto) mutable {
+ return seastar::do_with(
+ (extent_len_t)0,
+ [&object_data, ctx, data_base, &pins](auto &last_pos) {
+ return trans_intr::do_for_each(
+ pins,
+ [&last_pos, &object_data, ctx, data_base](auto &pin) {
+ auto offset = pin->get_key() - data_base;
+ ceph_assert(offset == last_pos);
+ auto fut = TransactionManager::alloc_extent_iertr
+ ::make_ready_future<LBAMappingRef>();
+ auto addr = object_data.get_reserved_data_base() + offset;
+ if (pin->get_val().is_zero()) {
+ fut = ctx.tm.reserve_region(ctx.t, addr, pin->get_length());
+ } else {
+ fut = ctx.tm.clone_pin(ctx.t, addr, *pin);
+ }
+ return fut.si_then(
+ [&pin, &last_pos, offset](auto) {
+ last_pos = offset + pin->get_length();
+ return seastar::now();
+ }).handle_error_interruptible(
+ crimson::ct_error::input_output_error::pass_further(),
+ crimson::ct_error::assert_all("not possible")
+ );
+ }).si_then([&last_pos, &object_data, ctx] {
+ if (last_pos != object_data.get_reserved_data_len()) {
+ return ctx.tm.reserve_region(
+ ctx.t,
+ object_data.get_reserved_data_base() + last_pos,
+ object_data.get_reserved_data_len() - last_pos
+ ).si_then([](auto) {
+ return seastar::now();
+ });
+ }
+ return TransactionManager::reserve_extent_iertr::now();
+ });
+ });
+ },
+ ObjectDataHandler::write_iertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "object_data_handler::clone invalid error"
+ }
+ );
+}
+
+ObjectDataHandler::clone_ret ObjectDataHandler::clone(
+ context_t ctx)
+{
+ // the whole clone procedure can be seperated into the following steps:
+ // 1. let clone onode(d_object_data) take the head onode's
+ // object data base;
+ // 2. reserve a new region in lba tree for the head onode;
+ // 3. clone all extents of the clone onode, see transaction_manager.h
+ // for the details of clone_pin;
+ // 4. reserve the space between the head onode's size and its reservation
+ // length.
+ return with_objects_data(
+ ctx,
+ [ctx, this](auto &object_data, auto &d_object_data) {
+ ceph_assert(d_object_data.is_null());
+ if (object_data.is_null()) {
+ return clone_iertr::now();
+ }
+ return prepare_data_reservation(
+ ctx,
+ d_object_data,
+ object_data.get_reserved_data_len()
+ ).si_then([&object_data, &d_object_data, ctx, this] {
+ assert(!object_data.is_null());
+ auto base = object_data.get_reserved_data_base();
+ auto len = object_data.get_reserved_data_len();
+ object_data.clear();
+ LOG_PREFIX(ObjectDataHandler::clone);
+ DEBUGT("cloned obj reserve_data_base: {}, len {}",
+ ctx.t,
+ d_object_data.get_reserved_data_base(),
+ d_object_data.get_reserved_data_len());
+ return prepare_data_reservation(
+ ctx,
+ object_data,
+ d_object_data.get_reserved_data_len()
+ ).si_then([&d_object_data, ctx, &object_data, base, len, this] {
+ LOG_PREFIX("ObjectDataHandler::clone");
+ DEBUGT("head obj reserve_data_base: {}, len {}",
+ ctx.t,
+ object_data.get_reserved_data_base(),
+ object_data.get_reserved_data_len());
+ return ctx.tm.get_pins(ctx.t, base, len
+ ).si_then([ctx, &object_data, &d_object_data, base, this](auto pins) {
+ return seastar::do_with(
+ std::move(pins),
+ [ctx, &object_data, &d_object_data, base, this](auto &pins) {
+ return clone_extents(ctx, object_data, pins, base
+ ).si_then([ctx, &d_object_data, base, &pins, this] {
+ return clone_extents(ctx, d_object_data, pins, base);
+ }).si_then([&pins, ctx] {
+ return do_removals(ctx, pins);
+ });
+ });
+ });
+ });
+ });
+ });
+}
+
} // namespace crimson::os::seastore