1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013- Sage Weil <sage@inktank.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_PAGESET_H
16 #define CEPH_PAGESET_H
23 #include <boost/intrusive/avl_set.hpp>
24 #include <boost/intrusive_ptr.hpp>
26 #include "include/encoding.h"
27 #include "include/Spinlock.h"
// Intrusive AVL-set hook embedded in each Page. PageSet's member_option
// typedef names this member (&Page::hook) as the hook for its avl_set.
32 boost::intrusive::avl_set_member_hook
<> hook
;
35 // avoid RefCountedObject because it has a virtual destructor
// Atomic reference count: incremented by get(), decremented by put(), and
// initialised to 1 by the private constructor.
// NOTE(review): the counter is only 16 bits wide, so it would wrap after
// 65535 outstanding references -- confirm callers stay well below that.
36 std::atomic
<uint16_t> nrefs
;
37 void get() { ++nrefs
; }
38 void put() { if (--nrefs
== 0) delete this; }
40 typedef boost::intrusive_ptr
<Page
> Ref
;
41 friend void intrusive_ptr_add_ref(Page
*p
) { p
->get(); }
42 friend void intrusive_ptr_release(Page
*p
) { p
->put(); }
44 // key-value comparison functor for avl
// Three overloads follow so that a page can be compared against another page
// or directly against a bare uint64_t offset (in either argument order).
// All of them order strictly by the `offset` field.
//
// Overload 1: bare offset on the left, page on the right.
46 bool operator()(uint64_t offset
, const Page
&page
) const {
47 return offset
< page
.offset
;
// Overload 2: page on the left, bare offset on the right.
49 bool operator()(const Page
&page
, uint64_t offset
) const {
50 return page
.offset
< offset
;
// Overload 3: page against page.
52 bool operator()(const Page
&lhs
, const Page
&rhs
) const {
53 return lhs
.offset
< rhs
.offset
;
// Serialises this page into bl.
//   bl        - destination buffer list; the page contents are appended to it.
//   page_size - number of bytes taken from `data`.
// The payload is produced by buffer::copy(data, page_size) -- presumably a
// copy of the page bytes; confirm against the buffer API.
56 void encode(bufferlist
&bl
, size_t page_size
) const {
57 bl
.append(buffer::copy(data
, page_size
));
// Deserialises this page from the iterator p.
//   p         - source position; p.copy() is asked for page_size bytes.
//   page_size - number of bytes requested into `data`.
// NOTE(review): presumably p.copy() also advances p -- confirm against the
// bufferlist iterator API.
60 void decode(bufferlist::iterator
&p
, size_t page_size
) {
61 p
.copy(page_size
, data
);
// Factory for pages (the constructor is private).
//   page_size - requested size of the data area in bytes; rounded up below to
//               a multiple of alignof(Page).
//   offset    - value stored in the new page's `offset` field (default 0).
// Returns a Ref to a Page that is placement-constructed at the tail of a
// single char array of (rounded page_size + sizeof(Page)) bytes. The page's
// `data` pointer is the start of that same array.
65 static Ref
create(size_t page_size
, uint64_t offset
= 0) {
66 // ensure proper alignment of the Page
67 const auto align
= alignof(Page
);
// round page_size up to the next multiple of align (align is a power of two)
68 page_size
= (page_size
+ align
- 1) & ~(align
- 1);
69 // allocate the Page and its data in a single buffer
70 auto buffer
= new char[page_size
+ sizeof(Page
)];
71 // place the Page structure at the end of the buffer
72 return new (buffer
+ page_size
) Page(buffer
, offset
);
// Pages are non-copyable: both the copy constructor and the copy assignment
// operator are deleted.
76 Page(const Page
&) = delete;
77 const Page
& operator=(const Page
&) = delete;
79 private: // private constructor, use create() instead
80 Page(char *data
, uint64_t offset
) : data(data
), offset(offset
), nrefs(1) {}
// Class-specific deallocation function. It releases the char array that the
// page's `data` member points to. create() constructs the Page inside that
// same array, so this single delete[] frees both the page data and the Page.
//   p - address of the Page being deleted.
82 static void operator delete(void *p
) {
83 delete[] reinterpret_cast<Page
*>(p
)->data
;
89 // alloc_range() and get_range() return page refs in a vector
90 typedef std::vector
<Page::Ref
> page_vector
;
93 // store pages in a boost intrusive avl_set
94 typedef Page::Less page_cmp
;
95 typedef boost::intrusive::member_hook
<Page
,
96 boost::intrusive::avl_set_member_hook
<>,
97 &Page::hook
> member_option
;
98 typedef boost::intrusive::avl_set
<Page
,
99 boost::intrusive::compare
<page_cmp
>, member_option
> page_set
;
101 typedef typename
page_set::iterator iterator
;
106 typedef Spinlock lock_type
;
// Removes pages in the iterator range that starts at cur and is bounded by
// end. Each page is unlinked with pages.erase(), whose return value (the
// iterator following the erased element) becomes the new cur.
// NOTE(review): only the erase step is visible in this excerpt; the loop
// condition and whatever releases the erased page are not shown -- confirm
// against the full source.
109 void free_pages(iterator cur
, iterator end
) {
112 cur
= pages
.erase(cur
);
// Counts the pages touched by the byte range that starts at offset and spans
// len bytes, using the set's page_size.
// NOTE(review): the declaration of `count`, its other updates and the return
// statement are not visible in this excerpt -- confirm against the full
// source.
117 int count_pages(uint64_t offset
, uint64_t len
) const {
118 // count the overlapping pages
// unaligned start: the range begins part-way through a page
120 if (offset
% page_size
) {
// rem = bytes from offset up to the end of that first page
122 size_t rem
= page_size
- offset
% page_size
;
// drop the first page's share from len, clamping at zero
123 len
= len
<= rem
? 0 : len
- rem
;
// whole pages covered by what is left of the range
125 count
+= len
/ page_size
;
132 explicit PageSet(size_t page_size
) : page_size(page_size
) {}
133 PageSet(PageSet
&&rhs
)
134 : pages(std::move(rhs
.pages
)), page_size(rhs
.page_size
) {}
// Hands every page in the set, [begin, end), to free_pages().
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the destructor body -- confirm against the full source.
136 free_pages(pages
.begin(), pages
.end());
// A PageSet is non-copyable: both the copy constructor and the copy
// assignment operator are deleted.
140 PageSet(const PageSet
&) = delete;
141 const PageSet
& operator=(const PageSet
&) = delete;
143 bool empty() const { return pages
.empty(); }
144 size_t size() const { return pages
.size(); }
145 size_t get_page_size() const { return page_size
; }
147 // allocate all pages that intersect the range [offset,length)
// The arithmetic below treats the range as `length` bytes starting at
// `offset`, i.e. [offset, offset + length).
//   offset - first byte of the range.
//   length - number of bytes in the range.
//   range  - output vector; resized to count_pages(offset, length) and filled
//            back-to-front through a reverse iterator.
// NOTE(review): several lines of this function (loop header, branch on the
// insert_check result, the statements that store into `out` and advance
// position/length) are not visible in this excerpt -- confirm against the
// full source.
148 void alloc_range(uint64_t offset
, uint64_t length
, page_vector
&range
) {
149 // loop in reverse so we can provide hints to avl_set::insert_check()
150 // and get O(1) insertions after the first
// position = last byte of the range.
// NOTE(review): with length == 0 this unsigned expression wraps around.
151 uint64_t position
= offset
+ length
- 1;
// size the output up front so it can be filled from the back
153 range
.resize(count_pages(offset
, length
));
154 auto out
= range
.rbegin();
// the set is modified below, so hold the lock from here on
156 std::lock_guard
<lock_type
> lock(mutex
);
// cur doubles as the insertion hint; it starts at end()
157 iterator cur
= pages
.end();
// start of the page containing `position`; the mask is a round-down only if
// page_size is a power of two -- TODO confirm that invariant
159 const uint64_t page_offset
= position
& ~(page_size
-1);
// two-phase insert: insert_check looks up page_offset using cur as the hint,
// insert_commit later links in the newly created page
161 typename
page_set::insert_commit_data commit
;
162 auto insert
= pages
.insert_check(cur
, page_offset
, page_cmp(), commit
);
164 auto page
= Page::create(page_size
, page_offset
);
165 cur
= pages
.insert_commit(*page
, commit
);
167 // assume that the caller will write to the range [offset,length),
168 // so we only need to zero memory outside of this range
170 // zero end of page past offset + length
171 if (offset
+ length
< page
->offset
+ page_size
)
172 std::fill(page
->data
+ offset
+ length
- page
->offset
,
173 page
->data
+ page_size
, 0);
174 // zero front of page between page_offset and offset
175 if (offset
> page
->offset
)
176 std::fill(page
->data
, page
->data
+ offset
- page
->offset
, 0);
180 // add a reference to output vector
// c = bytes of the range that fall in the current page: everything from the
// page start through `position`, capped at the remaining length
184 auto c
= std::min(length
, (position
& (page_size
-1)) + 1);
188 // make sure we sized the vector correctly
189 assert(out
== range
.rend());
192 // return all allocated pages that intersect the range [offset,length)
// The loop below treats the range as `length` bytes starting at `offset`.
//   offset - first byte of the range.
//   length - number of bytes in the range.
//   range  - output vector; a reference to each matching page is appended
//            (existing contents are left in place).
// Pages that are not present in the set are simply skipped.
// NOTE(review): no lock is taken in the visible lines, unlike alloc_range()
// and free_pages_after() -- confirm that callers serialise access.
193 void get_range(uint64_t offset
, uint64_t length
, page_vector
&range
) {
// start at the first stored page whose offset is >= offset rounded down with
// the page_size mask
194 auto cur
= pages
.lower_bound(offset
& ~(page_size
-1), page_cmp());
// walk forward while pages still begin before the end of the range
195 while (cur
!= pages
.end() && cur
->offset
< offset
+ length
)
196 range
.push_back(&*cur
++);
// Frees pages from a starting point derived from `offset` through the end of
// the set, holding the lock for the whole operation.
//   offset - byte position; the search starts at this value rounded down with
//            the page_size mask.
// NOTE(review): the statements controlled by the two `if` conditions below
// are not visible in this excerpt (presumably an early return when nothing
// qualifies, and a step past a page that starts before `offset`) -- confirm
// against the full source.
199 void free_pages_after(uint64_t offset
) {
200 std::lock_guard
<lock_type
> lock(mutex
);
// first stored page at or after the masked offset
201 auto cur
= pages
.lower_bound(offset
& ~(page_size
-1), page_cmp());
202 if (cur
== pages
.end())
204 if (cur
->offset
< offset
)
// release everything from cur to the end of the set
206 free_pages(cur
, pages
.end());
// Serialises the set into bl: page_size first, then each page via
// Page::encode().
//   bl - destination buffer list.
// Pages are walked with rbegin()/rend(), i.e. in reverse set order.
// decode() rebuilds by inserting each successive page before the previous
// one.
// NOTE(review): `count` is computed here, but the statement that uses it is
// not visible in this excerpt -- confirm against the full source.
209 void encode(bufferlist
&bl
) const {
210 ::encode(page_size
, bl
);
211 unsigned count
= pages
.size();
213 for (auto p
= pages
.rbegin(); p
!= pages
.rend(); ++p
)
214 p
->encode(bl
, page_size
);
// Rebuilds the set from the iterator p: reads page_size, then creates and
// decodes `count` pages.
//   p - source position in the encoded buffer.
// Each new page is inserted before the previously inserted one (cur starts at
// end()), so the pages end up in the reverse of the order they were read in.
// NOTE(review): the declaration and decoding of `count`, and any checks on
// the existing contents of the set, are not visible in this excerpt --
// confirm against the full source.
216 void decode(bufferlist::iterator
&p
) {
218 ::decode(page_size
, p
);
221 auto cur
= pages
.end();
222 for (unsigned i
= 0; i
< count
; i
++) {
// create() is called without an offset argument, so its default of 0 applies
223 auto page
= Page::create(page_size
);
224 page
->decode(p
, page_size
);
225 cur
= pages
.insert_before(cur
, *page
);
230 #endif // CEPH_PAGESET_H