]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/os/seastore/onode_manager/staged-fltree/stages/sub_items_stage.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / os / seastore / onode_manager / staged-fltree / stages / sub_items_stage.h
index 8ef5f747263b04a6589a9248b929ac62776f5634..a17660ff89a0044de5149919cbe1aff2a66f6b8d 100644 (file)
@@ -42,7 +42,10 @@ class internal_sub_items_t {
  public:
   using num_keys_t = index_t;
 
-  internal_sub_items_t(const memory_range_t& range) {
+  internal_sub_items_t(const container_range_t& _range)
+      : node_size{_range.node_size} {
+    assert(is_valid_node_size(node_size));
+    auto& range = _range.range;
     assert(range.p_start < range.p_end);
     assert((range.p_end - range.p_start) % sizeof(internal_sub_item_t) == 0);
     num_items = (range.p_end - range.p_start) / sizeof(internal_sub_item_t);
@@ -61,7 +64,7 @@ class internal_sub_items_t {
   }
   node_offset_t size_before(index_t index) const {
     size_t ret = index * sizeof(internal_sub_item_t);
-    assert(ret < NODE_BLOCK_SIZE);
+    assert(ret < node_size);
     return ret;
   }
   const laddr_packed_t* get_p_value(index_t index) const {
@@ -74,46 +77,54 @@ class internal_sub_items_t {
                  sizeof(internal_sub_item_t);
     auto p_start = p_end - num_items * sizeof(internal_sub_item_t);
     int start_offset = p_start - p_node_start;
-    int end_offset = p_end - p_node_start;
-    assert(start_offset > 0 &&
-           start_offset < end_offset &&
-           end_offset < NODE_BLOCK_SIZE);
+    int stage_size = p_end - p_start;
+    assert(start_offset > 0);
+    assert(stage_size > 0);
+    assert(start_offset + stage_size < (int)node_size);
     ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
-    ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
+    ceph::encode(static_cast<node_offset_t>(stage_size), encoded);
   }
 
   static internal_sub_items_t decode(
-      const char* p_node_start, ceph::bufferlist::const_iterator& delta) {
+      const char* p_node_start,
+      extent_len_t node_size,
+      ceph::bufferlist::const_iterator& delta) {
     node_offset_t start_offset;
     ceph::decode(start_offset, delta);
-    node_offset_t end_offset;
-    ceph::decode(end_offset, delta);
-    assert(start_offset < end_offset);
-    assert(end_offset <= NODE_BLOCK_SIZE);
-    return internal_sub_items_t({p_node_start + start_offset,
-                                 p_node_start + end_offset});
+    node_offset_t stage_size;
+    ceph::decode(stage_size, delta);
+    assert(start_offset > 0);
+    assert(stage_size > 0);
+    assert((unsigned)start_offset + stage_size < node_size);
+    return internal_sub_items_t({{p_node_start + start_offset,
+                                  p_node_start + start_offset + stage_size},
+                                 node_size});
   }
 
   static node_offset_t header_size() { return 0u; }
 
   template <KeyT KT>
   static node_offset_t estimate_insert(
-      const full_key_t<KT>&, const laddr_packed_t&) {
+      const full_key_t<KT>&, const laddr_t&) {
     return sizeof(internal_sub_item_t);
   }
 
   template <KeyT KT>
   static const laddr_packed_t* insert_at(
       NodeExtentMutable&, const internal_sub_items_t&,
-      const full_key_t<KT>&, const laddr_packed_t&,
+      const full_key_t<KT>&, const laddr_t&,
       index_t index, node_offset_t size, const char* p_left_bound);
 
   static node_offset_t trim_until(NodeExtentMutable&, internal_sub_items_t&, index_t);
 
+  static node_offset_t erase_at(
+      NodeExtentMutable&, const internal_sub_items_t&, index_t, const char*);
+
   template <KeyT KT>
   class Appender;
 
  private:
+  extent_len_t node_size;
   index_t num_items;
   const internal_sub_item_t* p_first_item;
 };
@@ -123,8 +134,13 @@ class internal_sub_items_t::Appender {
  public:
   Appender(NodeExtentMutable* p_mut, char* p_append)
     : p_mut{p_mut}, p_append{p_append} {}
+  Appender(NodeExtentMutable* p_mut, const internal_sub_items_t& sub_items)
+    : p_mut{p_mut},
+      p_append{(char*)(sub_items.p_first_item + 1 - sub_items.keys())} {
+    assert(sub_items.keys());
+  }
   void append(const internal_sub_items_t& src, index_t from, index_t items);
-  void append(const full_key_t<KT>&, const laddr_packed_t&, const laddr_packed_t*&);
+  void append(const full_key_t<KT>&, const laddr_t&, const laddr_packed_t*&);
   char* wrap() { return p_append; }
  private:
   NodeExtentMutable* p_mut;
@@ -135,7 +151,7 @@ class internal_sub_items_t::Appender {
  * leaf_sub_items_t
  *
  * The STAGE_RIGHT implementation for leaf node N0/N1/N2, implements staged
- * contract as an indexable container to index snap-gen to onode_t.
+ * contract as an indexable container to index snap-gen to value_header_t.
  *
  * The layout of the contaner storing n sub-items:
  *
@@ -143,7 +159,7 @@ class internal_sub_items_t::Appender {
  * # <---------- sub-items ----------------> # <--- offsets ---------#          #
  * #<~># sub-items [2, n)                    #<~>| offsets [2, n)    #          #
  * #   # <- sub-item 1 -> # <- sub-item 0 -> #   |                   #          #
- * #...# snap-gen | onode # snap-gen | onode #...| offset1 | offset0 # num_keys #
+ * #...# snap-gen | value # snap-gen | value #...| offset1 | offset0 # num_keys #
  *                                           ^             ^         ^
  *                                           |             |         |
  *                               p_items_end +   p_offsets +         |
@@ -151,15 +167,22 @@ class internal_sub_items_t::Appender {
  */
 class leaf_sub_items_t {
  public:
-  // TODO: decide by NODE_BLOCK_SIZE, sizeof(snap_gen_t),
-  //       and the minimal size of onode_t
-  using num_keys_t = uint8_t;
-
-  leaf_sub_items_t(const memory_range_t& range) {
+  // should be enough to index all keys under 64 KiB node
+  using num_keys_t = uint16_t;
+
+  // TODO: remove if num_keys_t is aligned
+  struct num_keys_packed_t {
+    num_keys_t value;
+  } __attribute__((packed));
+
+  leaf_sub_items_t(const container_range_t& _range)
+      : node_size{_range.node_size} {
+    assert(is_valid_node_size(node_size));
+    auto& range = _range.range;
     assert(range.p_start < range.p_end);
     auto _p_num_keys = range.p_end - sizeof(num_keys_t);
     assert(range.p_start < _p_num_keys);
-    p_num_keys = reinterpret_cast<const num_keys_t*>(_p_num_keys);
+    p_num_keys = reinterpret_cast<const num_keys_packed_t*>(_p_num_keys);
     assert(keys());
     auto _p_offsets = _p_num_keys - sizeof(node_offset_t);
     assert(range.p_start < _p_offsets);
@@ -198,7 +221,7 @@ class leaf_sub_items_t {
   // container type system
   using key_get_type = const snap_gen_t&;
   static constexpr auto CONTAINER_TYPE = ContainerType::INDEXABLE;
-  num_keys_t keys() const { return *p_num_keys; }
+  num_keys_t keys() const { return p_num_keys->value; }
   key_get_type operator[](index_t index) const {
     assert(index < keys());
     auto pointer = get_item_end(index);
@@ -218,62 +241,71 @@ class leaf_sub_items_t {
             (index + 1) * sizeof(node_offset_t) +
             get_offset(index).value;
     }
-    assert(ret < NODE_BLOCK_SIZE);
+    assert(ret < node_size);
     return ret;
   }
   node_offset_t size_overhead_at(index_t index) const { return sizeof(node_offset_t); }
-  const onode_t* get_p_value(index_t index) const {
+  const value_header_t* get_p_value(index_t index) const {
     assert(index < keys());
     auto pointer = get_item_start(index);
-    auto value = reinterpret_cast<const onode_t*>(pointer);
-    assert(pointer + value->size + sizeof(snap_gen_t) == get_item_end(index));
+    auto value = reinterpret_cast<const value_header_t*>(pointer);
+    assert(pointer + value->allocation_size() + sizeof(snap_gen_t) ==
+           get_item_end(index));
     return value;
   }
   void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
     auto p_end = reinterpret_cast<const char*>(p_num_keys) +
                   sizeof(num_keys_t);
     int start_offset = p_start() - p_node_start;
-    int end_offset = p_end - p_node_start;
-    assert(start_offset > 0 &&
-           start_offset < end_offset &&
-           end_offset < NODE_BLOCK_SIZE);
+    int stage_size = p_end - p_start();
+    assert(start_offset > 0);
+    assert(stage_size > 0);
+    assert(start_offset + stage_size < (int)node_size);
     ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
-    ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
+    ceph::encode(static_cast<node_offset_t>(stage_size), encoded);
   }
 
   static leaf_sub_items_t decode(
-      const char* p_node_start, ceph::bufferlist::const_iterator& delta) {
+      const char* p_node_start,
+      extent_len_t node_size,
+      ceph::bufferlist::const_iterator& delta) {
     node_offset_t start_offset;
     ceph::decode(start_offset, delta);
-    node_offset_t end_offset;
-    ceph::decode(end_offset, delta);
-    assert(start_offset < end_offset);
-    assert(end_offset <= NODE_BLOCK_SIZE);
-    return leaf_sub_items_t({p_node_start + start_offset,
-                             p_node_start + end_offset});
+    node_offset_t stage_size;
+    ceph::decode(stage_size, delta);
+    assert(start_offset > 0);
+    assert(stage_size > 0);
+    assert((unsigned)start_offset + stage_size < node_size);
+    return leaf_sub_items_t({{p_node_start + start_offset,
+                              p_node_start + start_offset + stage_size},
+                             node_size});
   }
 
   static node_offset_t header_size() { return sizeof(num_keys_t); }
 
   template <KeyT KT>
-  static node_offset_t estimate_insert(const full_key_t<KT>&, const onode_t& value) {
-    return value.size + sizeof(snap_gen_t) + sizeof(node_offset_t);
+  static node_offset_t estimate_insert(
+      const full_key_t<KT>&, const value_config_t& value) {
+    return value.allocation_size() + sizeof(snap_gen_t) + sizeof(node_offset_t);
   }
 
   template <KeyT KT>
-  static const onode_t* insert_at(
+  static const value_header_t* insert_at(
       NodeExtentMutable&, const leaf_sub_items_t&,
-      const full_key_t<KT>&, const onode_t&,
+      const full_key_t<KT>&, const value_config_t&,
       index_t index, node_offset_t size, const char* p_left_bound);
 
   static node_offset_t trim_until(NodeExtentMutable&, leaf_sub_items_t&, index_t index);
 
+  static node_offset_t erase_at(
+      NodeExtentMutable&, const leaf_sub_items_t&, index_t, const char*);
+
   template <KeyT KT>
   class Appender;
 
  private:
-  // TODO: support unaligned access
-  const num_keys_t* p_num_keys;
+  extent_len_t node_size;
+  const num_keys_packed_t* p_num_keys;
   const node_offset_packed_t* p_offsets;
   const char* p_items_end;
 };
@@ -288,7 +320,7 @@ class leaf_sub_items_t::Appender {
   };
   struct kv_item_t {
     const full_key_t<KT>* p_key;
-    const onode_t* p_value;
+    value_config_t value_config;
   };
   using var_t = std::variant<range_items_t, kv_item_t>;
 
@@ -296,40 +328,35 @@ class leaf_sub_items_t::Appender {
   Appender(NodeExtentMutable* p_mut, char* p_append)
     : p_mut{p_mut}, p_append{p_append} {
   }
-
-  void append(const leaf_sub_items_t& src, index_t from, index_t items) {
-    assert(cnt <= APPENDER_LIMIT);
-    assert(from <= src.keys());
-    if (items == 0) {
-      return;
-    }
-    if (op_src) {
-      assert(*op_src == src);
-    } else {
-      op_src = src;
-    }
-    assert(from < src.keys());
-    assert(from + items <= src.keys());
-    appends[cnt] = range_items_t{from, items};
-    ++cnt;
+  Appender(NodeExtentMutable* p_mut, const leaf_sub_items_t& sub_items)
+    : p_mut{p_mut} , op_dst(sub_items) {
+    assert(sub_items.keys());
   }
+
+  void append(const leaf_sub_items_t& src, index_t from, index_t items);
   void append(const full_key_t<KT>& key,
-              const onode_t& value, const onode_t*& p_value) {
+              const value_config_t& value, const value_header_t*& p_value) {
+    // append from empty
+    assert(p_append);
     assert(pp_value == nullptr);
     assert(cnt <= APPENDER_LIMIT);
-    appends[cnt] = kv_item_t{&key, &value};
+    appends[cnt] = kv_item_t{&key, value};
     ++cnt;
     pp_value = &p_value;
   }
   char* wrap();
 
  private:
-  std::optional<leaf_sub_items_t> op_src;
-  const onode_t** pp_value = nullptr;
   NodeExtentMutable* p_mut;
-  char* p_append;
+  // append from empty
+  std::optional<leaf_sub_items_t> op_src;
+  const value_header_t** pp_value = nullptr;
+  char* p_append = nullptr;
   var_t appends[APPENDER_LIMIT];
   index_t cnt = 0;
+  // append from existing
+  std::optional<leaf_sub_items_t> op_dst;
+  char* p_appended = nullptr;
 };
 
 template <node_type_t> struct _sub_items_t;