]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/memstore/PageSet.h
update sources to v12.1.2
[ceph.git] / ceph / src / os / memstore / PageSet.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013- Sage Weil <sage@inktank.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_PAGESET_H
16 #define CEPH_PAGESET_H
17
18 #include <algorithm>
19 #include <atomic>
20 #include <cassert>
21 #include <mutex>
22 #include <vector>
23 #include <boost/intrusive/avl_set.hpp>
24 #include <boost/intrusive_ptr.hpp>
25
26 #include "include/encoding.h"
27 #include "include/Spinlock.h"
28
29
// A refcounted, fixed-size page of object data.
//
// Memory layout: the page's data buffer and the Page struct itself live in
// ONE heap allocation (see create()) — the buffer comes first, the Page
// struct is placement-new'd at the end.  Consequently a Page must only be
// built via create() and destroyed via put(); the custom operator delete
// below frees the whole allocation through 'data'.
struct Page {
  char *const data;                           // start of the shared allocation
  boost::intrusive::avl_set_member_hook<> hook; // links this Page into PageSet's avl_set
  uint64_t offset;                            // byte offset of this page within the object

  // avoid RefCountedObject because it has a virtual destructor
  std::atomic<uint16_t> nrefs;                // manual refcount; created with nrefs=1
  void get() { ++nrefs; }
  // atomic decrement; the thread that drops the count to zero frees the page
  void put() { if (--nrefs == 0) delete this; }

  typedef boost::intrusive_ptr<Page> Ref;
  friend void intrusive_ptr_add_ref(Page *p) { p->get(); }
  friend void intrusive_ptr_release(Page *p) { p->put(); }

  // key-value comparison functor for avl
  // (transparent: allows lookups by raw uint64_t offset without building a Page)
  struct Less {
    bool operator()(uint64_t offset, const Page &page) const {
      return offset < page.offset;
    }
    bool operator()(const Page &page, uint64_t offset) const {
      return page.offset < offset;
    }
    bool operator()(const Page &lhs, const Page &rhs) const {
      return lhs.offset < rhs.offset;
    }
  };
  // serialize page contents followed by its offset
  void encode(bufferlist &bl, size_t page_size) const {
    bl.append(buffer::copy(data, page_size));
    ::encode(offset, bl);
  }
  // inverse of encode(): copy page_size bytes into data, then read offset
  void decode(bufferlist::iterator &p, size_t page_size) {
    p.copy(page_size, data);
    ::decode(offset, p);
  }

  // Allocate a new Page with an initial refcount of 1.
  // NOTE(review): the data bytes are NOT zeroed here — callers (e.g.
  // PageSet::alloc_range) are responsible for initializing them.
  static Ref create(size_t page_size, uint64_t offset = 0) {
    // ensure proper alignment of the Page
    const auto align = alignof(Page);
    page_size = (page_size + align - 1) & ~(align - 1);
    // allocate the Page and its data in a single buffer
    auto buffer = new char[page_size + sizeof(Page)];
    // place the Page structure at the end of the buffer
    return new (buffer + page_size) Page(buffer, offset);
  }

  // copy disabled
  Page(const Page&) = delete;
  const Page& operator=(const Page&) = delete;

 private: // private constructor, use create() instead
  Page(char *data, uint64_t offset) : data(data), offset(offset), nrefs(1) {}

  // Free the single combined allocation: 'data' is the pointer returned by
  // new[] in create(), so deleting it releases both the buffer and the Page
  // (whose destructor has already run by the time operator delete is called).
  static void operator delete(void *p) {
    delete[] reinterpret_cast<Page*>(p)->data;
  }
};
86
87 class PageSet {
88 public:
89 // alloc_range() and get_range() return page refs in a vector
90 typedef std::vector<Page::Ref> page_vector;
91
92 private:
93 // store pages in a boost intrusive avl_set
94 typedef Page::Less page_cmp;
95 typedef boost::intrusive::member_hook<Page,
96 boost::intrusive::avl_set_member_hook<>,
97 &Page::hook> member_option;
98 typedef boost::intrusive::avl_set<Page,
99 boost::intrusive::compare<page_cmp>, member_option> page_set;
100
101 typedef typename page_set::iterator iterator;
102
103 page_set pages;
104 uint64_t page_size;
105
106 typedef Spinlock lock_type;
107 lock_type mutex;
108
109 void free_pages(iterator cur, iterator end) {
110 while (cur != end) {
111 Page *page = &*cur;
112 cur = pages.erase(cur);
113 page->put();
114 }
115 }
116
117 int count_pages(uint64_t offset, uint64_t len) const {
118 // count the overlapping pages
119 int count = 0;
120 if (offset % page_size) {
121 count++;
122 size_t rem = page_size - offset % page_size;
123 len = len <= rem ? 0 : len - rem;
124 }
125 count += len / page_size;
126 if (len % page_size)
127 count++;
128 return count;
129 }
130
131 public:
132 explicit PageSet(size_t page_size) : page_size(page_size) {}
133 PageSet(PageSet &&rhs)
134 : pages(std::move(rhs.pages)), page_size(rhs.page_size) {}
135 ~PageSet() {
136 free_pages(pages.begin(), pages.end());
137 }
138
139 // disable copy
140 PageSet(const PageSet&) = delete;
141 const PageSet& operator=(const PageSet&) = delete;
142
143 bool empty() const { return pages.empty(); }
144 size_t size() const { return pages.size(); }
145 size_t get_page_size() const { return page_size; }
146
147 // allocate all pages that intersect the range [offset,length)
148 void alloc_range(uint64_t offset, uint64_t length, page_vector &range) {
149 // loop in reverse so we can provide hints to avl_set::insert_check()
150 // and get O(1) insertions after the first
151 uint64_t position = offset + length - 1;
152
153 range.resize(count_pages(offset, length));
154 auto out = range.rbegin();
155
156 std::lock_guard<lock_type> lock(mutex);
157 iterator cur = pages.end();
158 while (length) {
159 const uint64_t page_offset = position & ~(page_size-1);
160
161 typename page_set::insert_commit_data commit;
162 auto insert = pages.insert_check(cur, page_offset, page_cmp(), commit);
163 if (insert.second) {
164 auto page = Page::create(page_size, page_offset);
165 cur = pages.insert_commit(*page, commit);
166
167 // assume that the caller will write to the range [offset,length),
168 // so we only need to zero memory outside of this range
169
170 // zero end of page past offset + length
171 if (offset + length < page->offset + page_size)
172 std::fill(page->data + offset + length - page->offset,
173 page->data + page_size, 0);
174 // zero front of page between page_offset and offset
175 if (offset > page->offset)
176 std::fill(page->data, page->data + offset - page->offset, 0);
177 } else { // exists
178 cur = insert.first;
179 }
180 // add a reference to output vector
181 out->reset(&*cur);
182 ++out;
183
184 auto c = std::min(length, (position & (page_size-1)) + 1);
185 position -= c;
186 length -= c;
187 }
188 // make sure we sized the vector correctly
189 assert(out == range.rend());
190 }
191
192 // return all allocated pages that intersect the range [offset,length)
193 void get_range(uint64_t offset, uint64_t length, page_vector &range) {
194 auto cur = pages.lower_bound(offset & ~(page_size-1), page_cmp());
195 while (cur != pages.end() && cur->offset < offset + length)
196 range.push_back(&*cur++);
197 }
198
199 void free_pages_after(uint64_t offset) {
200 std::lock_guard<lock_type> lock(mutex);
201 auto cur = pages.lower_bound(offset & ~(page_size-1), page_cmp());
202 if (cur == pages.end())
203 return;
204 if (cur->offset < offset)
205 cur++;
206 free_pages(cur, pages.end());
207 }
208
209 void encode(bufferlist &bl) const {
210 ::encode(page_size, bl);
211 unsigned count = pages.size();
212 ::encode(count, bl);
213 for (auto p = pages.rbegin(); p != pages.rend(); ++p)
214 p->encode(bl, page_size);
215 }
216 void decode(bufferlist::iterator &p) {
217 assert(empty());
218 ::decode(page_size, p);
219 unsigned count;
220 ::decode(count, p);
221 auto cur = pages.end();
222 for (unsigned i = 0; i < count; i++) {
223 auto page = Page::create(page_size);
224 page->decode(p, page_size);
225 cur = pages.insert_before(cur, *page);
226 }
227 }
228 };
229
230 #endif // CEPH_PAGESET_H