public:
using num_keys_t = index_t;
- internal_sub_items_t(const memory_range_t& range) {
+ internal_sub_items_t(const container_range_t& _range)
+ : node_size{_range.node_size} {
+ assert(is_valid_node_size(node_size));
+ auto& range = _range.range;
assert(range.p_start < range.p_end);
assert((range.p_end - range.p_start) % sizeof(internal_sub_item_t) == 0);
num_items = (range.p_end - range.p_start) / sizeof(internal_sub_item_t);
}
node_offset_t size_before(index_t index) const {
size_t ret = index * sizeof(internal_sub_item_t);
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < node_size);
return ret;
}
const laddr_packed_t* get_p_value(index_t index) const {
sizeof(internal_sub_item_t);
auto p_start = p_end - num_items * sizeof(internal_sub_item_t);
int start_offset = p_start - p_node_start;
- int end_offset = p_end - p_node_start;
- assert(start_offset > 0 &&
- start_offset < end_offset &&
- end_offset < NODE_BLOCK_SIZE);
+ int stage_size = p_end - p_start;
+ assert(start_offset > 0);
+ assert(stage_size > 0);
+ assert(start_offset + stage_size < (int)node_size);
ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
- ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
+ ceph::encode(static_cast<node_offset_t>(stage_size), encoded);
}
static internal_sub_items_t decode(
- const char* p_node_start, ceph::bufferlist::const_iterator& delta) {
+ const char* p_node_start,
+ extent_len_t node_size,
+ ceph::bufferlist::const_iterator& delta) {
node_offset_t start_offset;
ceph::decode(start_offset, delta);
- node_offset_t end_offset;
- ceph::decode(end_offset, delta);
- assert(start_offset < end_offset);
- assert(end_offset <= NODE_BLOCK_SIZE);
- return internal_sub_items_t({p_node_start + start_offset,
- p_node_start + end_offset});
+ node_offset_t stage_size;
+ ceph::decode(stage_size, delta);
+ assert(start_offset > 0);
+ assert(stage_size > 0);
+ assert((unsigned)start_offset + stage_size < node_size);
+ return internal_sub_items_t({{p_node_start + start_offset,
+ p_node_start + start_offset + stage_size},
+ node_size});
}
static node_offset_t header_size() { return 0u; }
template <KeyT KT>
static node_offset_t estimate_insert(
- const full_key_t<KT>&, const laddr_packed_t&) {
+ const full_key_t<KT>&, const laddr_t&) {
return sizeof(internal_sub_item_t);
}
template <KeyT KT>
static const laddr_packed_t* insert_at(
NodeExtentMutable&, const internal_sub_items_t&,
- const full_key_t<KT>&, const laddr_packed_t&,
+ const full_key_t<KT>&, const laddr_t&,
index_t index, node_offset_t size, const char* p_left_bound);
static node_offset_t trim_until(NodeExtentMutable&, internal_sub_items_t&, index_t);
+ static node_offset_t erase_at(
+ NodeExtentMutable&, const internal_sub_items_t&, index_t, const char*);
+
template <KeyT KT>
class Appender;
private:
+ extent_len_t node_size;
index_t num_items;
const internal_sub_item_t* p_first_item;
};
public:
Appender(NodeExtentMutable* p_mut, char* p_append)
: p_mut{p_mut}, p_append{p_append} {}
+ Appender(NodeExtentMutable* p_mut, const internal_sub_items_t& sub_items)
+ : p_mut{p_mut},
+ p_append{(char*)(sub_items.p_first_item + 1 - sub_items.keys())} {
+ assert(sub_items.keys());
+ }
void append(const internal_sub_items_t& src, index_t from, index_t items);
- void append(const full_key_t<KT>&, const laddr_packed_t&, const laddr_packed_t*&);
+ void append(const full_key_t<KT>&, const laddr_t&, const laddr_packed_t*&);
char* wrap() { return p_append; }
private:
NodeExtentMutable* p_mut;
* leaf_sub_items_t
*
* The STAGE_RIGHT implementation for leaf node N0/N1/N2, implements staged
- * contract as an indexable container to index snap-gen to onode_t.
+ * contract as an indexable container to index snap-gen to value_header_t.
*
- * The layout of the contaner storing n sub-items:
+ * The layout of the container storing n sub-items:
*
* # <---------- sub-items ----------------> # <--- offsets ---------# #
* #<~># sub-items [2, n) #<~>| offsets [2, n) # #
* # # <- sub-item 1 -> # <- sub-item 0 -> # | # #
- * #...# snap-gen | onode # snap-gen | onode #...| offset1 | offset0 # num_keys #
+ * #...# snap-gen | value # snap-gen | value #...| offset1 | offset0 # num_keys #
* ^ ^ ^
* | | |
* p_items_end + p_offsets + |
*/
class leaf_sub_items_t {
public:
- // TODO: decide by NODE_BLOCK_SIZE, sizeof(snap_gen_t),
- // and the minimal size of onode_t
- using num_keys_t = uint8_t;
-
- leaf_sub_items_t(const memory_range_t& range) {
+ // should be enough to index all keys under 64 KiB node
+ using num_keys_t = uint16_t;
+
+ // TODO: remove if num_keys_t is aligned
+ struct num_keys_packed_t {
+ num_keys_t value;
+ } __attribute__((packed));
+
+ leaf_sub_items_t(const container_range_t& _range)
+ : node_size{_range.node_size} {
+ assert(is_valid_node_size(node_size));
+ auto& range = _range.range;
assert(range.p_start < range.p_end);
auto _p_num_keys = range.p_end - sizeof(num_keys_t);
assert(range.p_start < _p_num_keys);
- p_num_keys = reinterpret_cast<const num_keys_t*>(_p_num_keys);
+ p_num_keys = reinterpret_cast<const num_keys_packed_t*>(_p_num_keys);
assert(keys());
auto _p_offsets = _p_num_keys - sizeof(node_offset_t);
assert(range.p_start < _p_offsets);
// container type system
using key_get_type = const snap_gen_t&;
static constexpr auto CONTAINER_TYPE = ContainerType::INDEXABLE;
- num_keys_t keys() const { return *p_num_keys; }
+ num_keys_t keys() const { return p_num_keys->value; }
key_get_type operator[](index_t index) const {
assert(index < keys());
auto pointer = get_item_end(index);
(index + 1) * sizeof(node_offset_t) +
get_offset(index).value;
}
- assert(ret < NODE_BLOCK_SIZE);
+ assert(ret < node_size);
return ret;
}
node_offset_t size_overhead_at(index_t index) const { return sizeof(node_offset_t); }
- const onode_t* get_p_value(index_t index) const {
+ const value_header_t* get_p_value(index_t index) const {
assert(index < keys());
auto pointer = get_item_start(index);
- auto value = reinterpret_cast<const onode_t*>(pointer);
- assert(pointer + value->size + sizeof(snap_gen_t) == get_item_end(index));
+ auto value = reinterpret_cast<const value_header_t*>(pointer);
+ assert(pointer + value->allocation_size() + sizeof(snap_gen_t) ==
+ get_item_end(index));
return value;
}
void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
auto p_end = reinterpret_cast<const char*>(p_num_keys) +
sizeof(num_keys_t);
int start_offset = p_start() - p_node_start;
- int end_offset = p_end - p_node_start;
- assert(start_offset > 0 &&
- start_offset < end_offset &&
- end_offset < NODE_BLOCK_SIZE);
+ int stage_size = p_end - p_start();
+ assert(start_offset > 0);
+ assert(stage_size > 0);
+ assert(start_offset + stage_size < (int)node_size);
ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
- ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
+ ceph::encode(static_cast<node_offset_t>(stage_size), encoded);
}
static leaf_sub_items_t decode(
- const char* p_node_start, ceph::bufferlist::const_iterator& delta) {
+ const char* p_node_start,
+ extent_len_t node_size,
+ ceph::bufferlist::const_iterator& delta) {
node_offset_t start_offset;
ceph::decode(start_offset, delta);
- node_offset_t end_offset;
- ceph::decode(end_offset, delta);
- assert(start_offset < end_offset);
- assert(end_offset <= NODE_BLOCK_SIZE);
- return leaf_sub_items_t({p_node_start + start_offset,
- p_node_start + end_offset});
+ node_offset_t stage_size;
+ ceph::decode(stage_size, delta);
+ assert(start_offset > 0);
+ assert(stage_size > 0);
+ assert((unsigned)start_offset + stage_size < node_size);
+ return leaf_sub_items_t({{p_node_start + start_offset,
+ p_node_start + start_offset + stage_size},
+ node_size});
}
static node_offset_t header_size() { return sizeof(num_keys_t); }
template <KeyT KT>
- static node_offset_t estimate_insert(const full_key_t<KT>&, const onode_t& value) {
- return value.size + sizeof(snap_gen_t) + sizeof(node_offset_t);
+ static node_offset_t estimate_insert(
+ const full_key_t<KT>&, const value_config_t& value) {
+ return value.allocation_size() + sizeof(snap_gen_t) + sizeof(node_offset_t);
}
template <KeyT KT>
- static const onode_t* insert_at(
+ static const value_header_t* insert_at(
NodeExtentMutable&, const leaf_sub_items_t&,
- const full_key_t<KT>&, const onode_t&,
+ const full_key_t<KT>&, const value_config_t&,
index_t index, node_offset_t size, const char* p_left_bound);
static node_offset_t trim_until(NodeExtentMutable&, leaf_sub_items_t&, index_t index);
+ static node_offset_t erase_at(
+ NodeExtentMutable&, const leaf_sub_items_t&, index_t, const char*);
+
template <KeyT KT>
class Appender;
private:
- // TODO: support unaligned access
- const num_keys_t* p_num_keys;
+ extent_len_t node_size;
+ const num_keys_packed_t* p_num_keys;
const node_offset_packed_t* p_offsets;
const char* p_items_end;
};
};
struct kv_item_t {
const full_key_t<KT>* p_key;
- const onode_t* p_value;
+ value_config_t value_config;
};
using var_t = std::variant<range_items_t, kv_item_t>;
Appender(NodeExtentMutable* p_mut, char* p_append)
: p_mut{p_mut}, p_append{p_append} {
}
-
- void append(const leaf_sub_items_t& src, index_t from, index_t items) {
- assert(cnt <= APPENDER_LIMIT);
- assert(from <= src.keys());
- if (items == 0) {
- return;
- }
- if (op_src) {
- assert(*op_src == src);
- } else {
- op_src = src;
- }
- assert(from < src.keys());
- assert(from + items <= src.keys());
- appends[cnt] = range_items_t{from, items};
- ++cnt;
+ Appender(NodeExtentMutable* p_mut, const leaf_sub_items_t& sub_items)
+ : p_mut{p_mut} , op_dst(sub_items) {
+ assert(sub_items.keys());
}
+
+ void append(const leaf_sub_items_t& src, index_t from, index_t items);
void append(const full_key_t<KT>& key,
- const onode_t& value, const onode_t*& p_value) {
+ const value_config_t& value, const value_header_t*& p_value) {
+ // append from empty
+ assert(p_append);
assert(pp_value == nullptr);
assert(cnt <= APPENDER_LIMIT);
- appends[cnt] = kv_item_t{&key, &value};
+ appends[cnt] = kv_item_t{&key, value};
++cnt;
pp_value = &p_value;
}
char* wrap();
private:
- std::optional<leaf_sub_items_t> op_src;
- const onode_t** pp_value = nullptr;
NodeExtentMutable* p_mut;
- char* p_append;
+ // append from empty
+ std::optional<leaf_sub_items_t> op_src;
+ const value_header_t** pp_value = nullptr;
+ char* p_append = nullptr;
var_t appends[APPENDER_LIMIT];
index_t cnt = 0;
+ // append from existing
+ std::optional<leaf_sub_items_t> op_dst;
+ char* p_appended = nullptr;
};
template <node_type_t> struct _sub_items_t;