]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/AsyncReserver.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / common / AsyncReserver.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
17
18 #include "common/Formatter.h"
19
// Debug-log helper for this header: logs to the "reserver" debug subsystem at
// level x via lgeneric_subdout.  It references `cct` by name, so it can only
// be used where a `cct` (here: AsyncReserver::cct) is in scope.  It is
// #undef'd again at the end of this header so it does not leak to includers.
#define rdout(x) lgeneric_subdout(cct,reserver,x)
21
22 /**
23 * Manages a configurable number of asynchronous reservations.
24 *
 * Memory usage is linear in the number of reservations that are queued
 * or in progress; a per-priority queue exists only while it has waiting
 * items, because empty priority queues are erased.
28 */
29 template <typename T, typename F>
30 class AsyncReserver {
31 CephContext *cct;
32 F *f;
33 unsigned max_allowed;
34 unsigned min_priority;
35 ceph::mutex lock = ceph::make_mutex("AsyncReserver::lock");
36
37 struct Reservation {
38 T item;
39 unsigned prio = 0;
40 Context *grant = 0;
41 Context *preempt = 0;
42 Reservation() {}
43 Reservation(T i, unsigned pr, Context *g, Context *p = 0)
44 : item(i), prio(pr), grant(g), preempt(p) {}
45 void dump(ceph::Formatter *f) const {
46 f->dump_stream("item") << item;
47 f->dump_unsigned("prio", prio);
48 f->dump_bool("can_preempt", !!preempt);
49 }
50 friend std::ostream& operator<<(std::ostream& out, const Reservation& r) {
51 return out << r.item << "(prio " << r.prio << " grant " << r.grant
52 << " preempt " << r.preempt << ")";
53 }
54 };
55
56 std::map<unsigned, std::list<Reservation>> queues;
57 std::map<T, std::pair<unsigned, typename std::list<Reservation>::iterator>> queue_pointers;
58 std::map<T,Reservation> in_progress;
59 std::set<std::pair<unsigned,T>> preempt_by_prio; ///< in_progress that can be preempted
60
61 void preempt_one() {
62 ceph_assert(!preempt_by_prio.empty());
63 auto q = in_progress.find(preempt_by_prio.begin()->second);
64 ceph_assert(q != in_progress.end());
65 Reservation victim = q->second;
66 rdout(10) << __func__ << " preempt " << victim << dendl;
67 f->queue(victim.preempt);
68 victim.preempt = nullptr;
69 in_progress.erase(q);
70 preempt_by_prio.erase(preempt_by_prio.begin());
71 }
72
73 void do_queues() {
74 rdout(20) << __func__ << ":\n";
75 ceph::JSONFormatter jf(true);
76 jf.open_object_section("queue");
77 _dump(&jf);
78 jf.close_section();
79 jf.flush(*_dout);
80 *_dout << dendl;
81
82 // in case min_priority was adjusted up or max_allowed was adjusted down
83 while (!preempt_by_prio.empty() &&
84 (in_progress.size() > max_allowed ||
85 preempt_by_prio.begin()->first < min_priority)) {
86 preempt_one();
87 }
88
89 while (!queues.empty()) {
90 // choose highest priority queue
91 auto it = queues.end();
92 --it;
93 ceph_assert(!it->second.empty());
94 if (it->first < min_priority) {
95 break;
96 }
97 if (in_progress.size() >= max_allowed &&
98 !preempt_by_prio.empty() &&
99 it->first > preempt_by_prio.begin()->first) {
100 preempt_one();
101 }
102 if (in_progress.size() >= max_allowed) {
103 break; // no room
104 }
105 // grant
106 Reservation p = it->second.front();
107 rdout(10) << __func__ << " grant " << p << dendl;
108 queue_pointers.erase(p.item);
109 it->second.pop_front();
110 if (it->second.empty()) {
111 queues.erase(it);
112 }
113 f->queue(p.grant);
114 p.grant = nullptr;
115 in_progress[p.item] = p;
116 if (p.preempt) {
117 preempt_by_prio.insert(std::make_pair(p.prio, p.item));
118 }
119 }
120 }
121 public:
122 AsyncReserver(
123 CephContext *cct,
124 F *f,
125 unsigned max_allowed,
126 unsigned min_priority = 0)
127 : cct(cct),
128 f(f),
129 max_allowed(max_allowed),
130 min_priority(min_priority) {}
131
132 void set_max(unsigned max) {
133 std::lock_guard l(lock);
134 max_allowed = max;
135 do_queues();
136 }
137
138 void set_min_priority(unsigned min) {
139 std::lock_guard l(lock);
140 min_priority = min;
141 do_queues();
142 }
143
144 /**
145 * Update the priority of a reservation
146 *
147 * Note, on_reserved may be called following update_priority. Thus,
148 * the callback must be safe in that case. Callback will be called
149 * with no locks held. cancel_reservation must be called to release the
150 * reservation slot.
151 *
152 * Cases
153 * 1. Item is queued, re-queue with new priority
154 * 2. Item is queued, re-queue and preempt if new priority higher than an in progress item
155 * 3. Item is in progress, just adjust priority if no higher priority waiting
156 * 4. Item is in progress, adjust priority if higher priority items waiting preempt item
157 *
158 */
159 void update_priority(T item, unsigned newprio) {
160 std::lock_guard l(lock);
161 auto i = queue_pointers.find(item);
162 if (i != queue_pointers.end()) {
163 unsigned prio = i->second.first;
164 if (newprio == prio)
165 return;
166 Reservation r = *i->second.second;
167 rdout(10) << __func__ << " update " << r << " (was queued)" << dendl;
168 // Like cancel_reservation() without preempting
169 queues[prio].erase(i->second.second);
170 if (queues[prio].empty()) {
171 queues.erase(prio);
172 }
173 queue_pointers.erase(i);
174
175 // Like request_reservation() to re-queue it but with new priority
176 ceph_assert(!queue_pointers.count(item) &&
177 !in_progress.count(item));
178 r.prio = newprio;
179 queues[newprio].push_back(r);
180 queue_pointers.insert(std::make_pair(item,
181 std::make_pair(newprio,--(queues[newprio]).end())));
182 } else {
183 auto p = in_progress.find(item);
184 if (p != in_progress.end()) {
185 if (p->second.prio == newprio)
186 return;
187 rdout(10) << __func__ << " update " << p->second
188 << " (in progress)" << dendl;
189 // We want to preempt if priority goes down
190 // and smaller then highest priority waiting
191 if (p->second.preempt) {
192 if (newprio < p->second.prio && !queues.empty()) {
193 // choose highest priority queue
194 auto it = queues.end();
195 --it;
196 ceph_assert(!it->second.empty());
197 if (it->first > newprio) {
198 rdout(10) << __func__ << " update " << p->second
199 << " lowered priority let do_queues() preempt it" << dendl;
200 }
201 }
202 preempt_by_prio.erase(std::make_pair(p->second.prio, p->second.item));
203 p->second.prio = newprio;
204 preempt_by_prio.insert(std::make_pair(p->second.prio, p->second.item));
205 } else {
206 p->second.prio = newprio;
207 }
208 } else {
209 rdout(10) << __func__ << " update " << item << " (not found)" << dendl;
210 }
211 }
212 do_queues();
213 return;
214 }
215
216 void dump(ceph::Formatter *f) {
217 std::lock_guard l(lock);
218 _dump(f);
219 }
220 void _dump(ceph::Formatter *f) {
221 f->dump_unsigned("max_allowed", max_allowed);
222 f->dump_unsigned("min_priority", min_priority);
223 f->open_array_section("queues");
224 for (auto& p : queues) {
225 f->open_object_section("queue");
226 f->dump_unsigned("priority", p.first);
227 f->open_array_section("items");
228 for (auto& q : p.second) {
229 f->dump_object("item", q);
230 }
231 f->close_section();
232 f->close_section();
233 }
234 f->close_section();
235 f->open_array_section("in_progress");
236 for (auto& p : in_progress) {
237 f->dump_object("item", p.second);
238 }
239 f->close_section();
240 }
241
242 /**
243 * Requests a reservation
244 *
245 * Note, on_reserved may be called following cancel_reservation. Thus,
246 * the callback must be safe in that case. Callback will be called
247 * with no locks held. cancel_reservation must be called to release the
248 * reservation slot.
249 */
250 void request_reservation(
251 T item, ///< [in] reservation key
252 Context *on_reserved, ///< [in] callback to be called on reservation
253 unsigned prio, ///< [in] priority
254 Context *on_preempt = 0 ///< [in] callback to be called if we are preempted (optional)
255 ) {
256 std::lock_guard l(lock);
257 Reservation r(item, prio, on_reserved, on_preempt);
258 rdout(10) << __func__ << " queue " << r << dendl;
259 ceph_assert(!queue_pointers.count(item) &&
260 !in_progress.count(item));
261 queues[prio].push_back(r);
262 queue_pointers.insert(std::make_pair(item,
263 std::make_pair(prio,--(queues[prio]).end())));
264 do_queues();
265 }
266
267 /**
268 * Cancels reservation
269 *
270 * Frees the reservation under key for use.
271 * Note, after cancel_reservation, the reservation_callback may or
272 * may not still be called.
273 */
274 void cancel_reservation(
275 T item ///< [in] key for reservation to cancel
276 ) {
277 std::lock_guard l(lock);
278 auto i = queue_pointers.find(item);
279 if (i != queue_pointers.end()) {
280 unsigned prio = i->second.first;
281 const Reservation& r = *i->second.second;
282 rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl;
283 delete r.grant;
284 delete r.preempt;
285 queues[prio].erase(i->second.second);
286 if (queues[prio].empty()) {
287 queues.erase(prio);
288 }
289 queue_pointers.erase(i);
290 } else {
291 auto p = in_progress.find(item);
292 if (p != in_progress.end()) {
293 rdout(10) << __func__ << " cancel " << p->second
294 << " (was in progress)" << dendl;
295 if (p->second.preempt) {
296 preempt_by_prio.erase(std::make_pair(p->second.prio, p->second.item));
297 delete p->second.preempt;
298 }
299 in_progress.erase(p);
300 } else {
301 rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl;
302 }
303 }
304 do_queues();
305 }
306
307 /**
308 * Has reservations
309 *
310 * Return true if there are reservations in progress
311 */
312 bool has_reservation() {
313 std::lock_guard l(lock);
314 return !in_progress.empty();
315 }
316 static const unsigned MAX_PRIORITY = (unsigned)-1;
317 };
318
319 #undef rdout
320 #endif