]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/Readahead.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / common / Readahead.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/Readahead.h"
5 #include "common/Cond.h"
6
7 using std::vector;
8
9 Readahead::Readahead()
10 : m_trigger_requests(10),
11 m_readahead_min_bytes(0),
12 m_readahead_max_bytes(NO_LIMIT),
13 m_alignments(),
14 m_nr_consec_read(0),
15 m_consec_read_bytes(0),
16 m_last_pos(0),
17 m_readahead_pos(0),
18 m_readahead_trigger_pos(0),
19 m_readahead_size(0),
20 m_pending(0) {
21 }
22
23 Readahead::~Readahead() {
24 }
25
26 Readahead::extent_t Readahead::update(const vector<extent_t>& extents, uint64_t limit) {
27 m_lock.lock();
28 for (vector<extent_t>::const_iterator p = extents.begin(); p != extents.end(); ++p) {
29 _observe_read(p->first, p->second);
30 }
31 if (m_readahead_pos >= limit|| m_last_pos >= limit) {
32 m_lock.unlock();
33 return extent_t(0, 0);
34 }
35 std::pair<uint64_t, uint64_t> extent = _compute_readahead(limit);
36 m_lock.unlock();
37 return extent;
38 }
39
40 Readahead::extent_t Readahead::update(uint64_t offset, uint64_t length, uint64_t limit) {
41 m_lock.lock();
42 _observe_read(offset, length);
43 if (m_readahead_pos >= limit || m_last_pos >= limit) {
44 m_lock.unlock();
45 return extent_t(0, 0);
46 }
47 extent_t extent = _compute_readahead(limit);
48 m_lock.unlock();
49 return extent;
50 }
51
52 void Readahead::_observe_read(uint64_t offset, uint64_t length) {
53 if (offset == m_last_pos) {
54 m_nr_consec_read++;
55 m_consec_read_bytes += length;
56 } else {
57 m_nr_consec_read = 0;
58 m_consec_read_bytes = 0;
59 m_readahead_trigger_pos = 0;
60 m_readahead_size = 0;
61 m_readahead_pos = 0;
62 }
63 m_last_pos = offset + length;
64 }
65
66 Readahead::extent_t Readahead::_compute_readahead(uint64_t limit) {
67 uint64_t readahead_offset = 0;
68 uint64_t readahead_length = 0;
69 if (m_nr_consec_read >= m_trigger_requests) {
70 // currently reading sequentially
71 if (m_last_pos >= m_readahead_trigger_pos) {
72 // need to read ahead
73 if (m_readahead_size == 0) {
74 // initial readahead trigger
75 m_readahead_size = m_consec_read_bytes;
76 m_readahead_pos = m_last_pos;
77 } else {
78 // continuing readahead trigger
79 m_readahead_size *= 2;
80 if (m_last_pos > m_readahead_pos) {
81 m_readahead_pos = m_last_pos;
82 }
83 }
84 m_readahead_size = std::max(m_readahead_size, m_readahead_min_bytes);
85 m_readahead_size = std::min(m_readahead_size, m_readahead_max_bytes);
86 readahead_offset = m_readahead_pos;
87 readahead_length = m_readahead_size;
88
89 // Snap to the first alignment possible
90 uint64_t readahead_end = readahead_offset + readahead_length;
91 for (vector<uint64_t>::iterator p = m_alignments.begin(); p != m_alignments.end(); ++p) {
92 // Align the readahead, if possible.
93 uint64_t alignment = *p;
94 uint64_t align_prev = readahead_end / alignment * alignment;
95 uint64_t align_next = align_prev + alignment;
96 uint64_t dist_prev = readahead_end - align_prev;
97 uint64_t dist_next = align_next - readahead_end;
98 if (dist_prev < readahead_length / 2 && dist_prev < dist_next) {
99 // we can snap to the previous alignment point by a less than 50% reduction in size
100 ceph_assert(align_prev > readahead_offset);
101 readahead_length = align_prev - readahead_offset;
102 break;
103 } else if(dist_next < readahead_length / 2) {
104 // we can snap to the next alignment point by a less than 50% increase in size
105 ceph_assert(align_next > readahead_offset);
106 readahead_length = align_next - readahead_offset;
107 break;
108 }
109 // Note that m_readahead_size should remain unadjusted.
110 }
111
112 if (m_readahead_pos + readahead_length > limit) {
113 readahead_length = limit - m_readahead_pos;
114 }
115
116 m_readahead_trigger_pos = m_readahead_pos + readahead_length / 2;
117 m_readahead_pos += readahead_length;
118 }
119 }
120 return extent_t(readahead_offset, readahead_length);
121 }
122
123 void Readahead::inc_pending(int count) {
124 ceph_assert(count > 0);
125 m_pending_lock.lock();
126 m_pending += count;
127 m_pending_lock.unlock();
128 }
129
130 void Readahead::dec_pending(int count) {
131 ceph_assert(count > 0);
132 m_pending_lock.lock();
133 ceph_assert(m_pending >= count);
134 m_pending -= count;
135 if (m_pending == 0) {
136 std::list<Context *> pending_waiting(std::move(m_pending_waiting));
137 m_pending_lock.unlock();
138
139 for (auto ctx : pending_waiting) {
140 ctx->complete(0);
141 }
142 } else {
143 m_pending_lock.unlock();
144 }
145 }
146
147 void Readahead::wait_for_pending() {
148 C_SaferCond ctx;
149 wait_for_pending(&ctx);
150 ctx.wait();
151 }
152
153 void Readahead::wait_for_pending(Context *ctx) {
154 m_pending_lock.lock();
155 if (m_pending > 0) {
156 m_pending_lock.unlock();
157 m_pending_waiting.push_back(ctx);
158 return;
159 }
160 m_pending_lock.unlock();
161
162 ctx->complete(0);
163 }
164 void Readahead::set_trigger_requests(int trigger_requests) {
165 m_lock.lock();
166 m_trigger_requests = trigger_requests;
167 m_lock.unlock();
168 }
169
170 uint64_t Readahead::get_min_readahead_size(void) {
171 std::lock_guard lock(m_lock);
172 return m_readahead_min_bytes;
173 }
174
175 uint64_t Readahead::get_max_readahead_size(void) {
176 std::lock_guard lock(m_lock);
177 return m_readahead_max_bytes;
178 }
179
180 void Readahead::set_min_readahead_size(uint64_t min_readahead_size) {
181 m_lock.lock();
182 m_readahead_min_bytes = min_readahead_size;
183 m_lock.unlock();
184 }
185
186 void Readahead::set_max_readahead_size(uint64_t max_readahead_size) {
187 m_lock.lock();
188 m_readahead_max_bytes = max_readahead_size;
189 m_lock.unlock();
190 }
191
192 void Readahead::set_alignments(const vector<uint64_t> &alignments) {
193 m_lock.lock();
194 m_alignments = alignments;
195 m_lock.unlock();
196 }