]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/AsyncReserver.h
import ceph 12.2.12
[ceph.git] / ceph / src / common / AsyncReserver.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
17
18 #include "common/Finisher.h"
19 #include "common/Formatter.h"
20
21 #define rdout(x) lgeneric_subdout(cct,reserver,x)
22
23 /**
24 * Manages a configurable number of asyncronous reservations.
25 *
26 * Memory usage is linear with the number of items queued and
27 * linear with respect to the total number of priorities used
28 * over all time.
29 */
30 template <typename T>
31 class AsyncReserver {
32 CephContext *cct;
33 Finisher *f;
34 unsigned max_allowed;
35 unsigned min_priority;
36 Mutex lock;
37
38 struct Reservation {
39 T item;
40 unsigned prio = 0;
41 Context *grant = 0;
42 Context *preempt = 0;
43 Reservation() {}
44 Reservation(T i, unsigned pr, Context *g, Context *p = 0)
45 : item(i), prio(pr), grant(g), preempt(p) {}
46 void dump(Formatter *f) const {
47 f->dump_stream("item") << item;
48 f->dump_unsigned("prio", prio);
49 f->dump_bool("can_preempt", !!preempt);
50 }
51 friend ostream& operator<<(ostream& out, const Reservation& r) {
52 return out << r.item << "(prio " << r.prio << " grant " << r.grant
53 << " preempt " << r.preempt << ")";
54 }
55 };
56
57 map<unsigned, list<Reservation>> queues;
58 map<T, pair<unsigned, typename list<Reservation>::iterator>> queue_pointers;
59 map<T,Reservation> in_progress;
60 set<pair<unsigned,T>> preempt_by_prio; ///< in_progress that can be preempted
61
62 void preempt_one() {
63 assert(!preempt_by_prio.empty());
64 auto q = in_progress.find(preempt_by_prio.begin()->second);
65 assert(q != in_progress.end());
66 Reservation victim = q->second;
67 rdout(10) << __func__ << " preempt " << victim << dendl;
68 f->queue(victim.preempt);
69 victim.preempt = nullptr;
70 in_progress.erase(q);
71 preempt_by_prio.erase(preempt_by_prio.begin());
72 }
73
74 void do_queues() {
75 rdout(20) << __func__ << ":\n";
76 JSONFormatter jf(true);
77 jf.open_object_section("queue");
78 _dump(&jf);
79 jf.close_section();
80 jf.flush(*_dout);
81 *_dout << dendl;
82
83 // in case min_priority was adjusted up or max_allowed was adjusted down
84 while (!preempt_by_prio.empty() &&
85 (in_progress.size() > max_allowed ||
86 preempt_by_prio.begin()->first < min_priority)) {
87 preempt_one();
88 }
89
90 while (!queues.empty()) {
91 // choose highest priority queue
92 auto it = queues.end();
93 --it;
94 assert(!it->second.empty());
95 if (it->first < min_priority) {
96 break;
97 }
98 if (in_progress.size() >= max_allowed &&
99 !preempt_by_prio.empty() &&
100 it->first > preempt_by_prio.begin()->first) {
101 preempt_one();
102 }
103 if (in_progress.size() >= max_allowed) {
104 break; // no room
105 }
106 // grant
107 Reservation p = it->second.front();
108 rdout(10) << __func__ << " grant " << p << dendl;
109 queue_pointers.erase(p.item);
110 it->second.pop_front();
111 if (it->second.empty()) {
112 queues.erase(it);
113 }
114 f->queue(p.grant);
115 p.grant = nullptr;
116 in_progress[p.item] = p;
117 if (p.preempt) {
118 preempt_by_prio.insert(make_pair(p.prio, p.item));
119 }
120 }
121 }
122 public:
123 AsyncReserver(
124 CephContext *cct,
125 Finisher *f,
126 unsigned max_allowed,
127 unsigned min_priority = 0)
128 : cct(cct),
129 f(f),
130 max_allowed(max_allowed),
131 min_priority(min_priority),
132 lock("AsyncReserver::lock") {}
133
134 void set_max(unsigned max) {
135 Mutex::Locker l(lock);
136 max_allowed = max;
137 do_queues();
138 }
139
140 void set_min_priority(unsigned min) {
141 Mutex::Locker l(lock);
142 min_priority = min;
143 do_queues();
144 }
145
146 /**
147 * Update the priority of a reservation
148 *
149 * Note, on_reserved may be called following update_priority. Thus,
150 * the callback must be safe in that case. Callback will be called
151 * with no locks held. cancel_reservation must be called to release the
152 * reservation slot.
153 *
154 * Cases
155 * 1. Item is queued, re-queue with new priority
156 * 2. Item is queued, re-queue and preempt if new priority higher than an in progress item
157 * 3. Item is in progress, just adjust priority if no higher priority waiting
158 * 4. Item is in progress, adjust priority if higher priority items waiting preempt item
159 *
160 */
161 void update_priority(T item, unsigned newprio) {
162 Mutex::Locker l(lock);
163 auto i = queue_pointers.find(item);
164 if (i != queue_pointers.end()) {
165 unsigned prio = i->second.first;
166 if (newprio == prio)
167 return;
168 Reservation r = *i->second.second;
169 rdout(10) << __func__ << " update " << r << " (was queued)" << dendl;
170 // Like cancel_reservation() without preempting
171 queues[prio].erase(i->second.second);
172 if (queues[prio].empty()) {
173 queues.erase(prio);
174 }
175 queue_pointers.erase(i);
176
177 // Like request_reservation() to re-queue it but with new priority
178 assert(!queue_pointers.count(item) &&
179 !in_progress.count(item));
180 r.prio = newprio;
181 queues[newprio].push_back(r);
182 queue_pointers.insert(make_pair(item,
183 make_pair(newprio,--(queues[newprio]).end())));
184 } else {
185 auto p = in_progress.find(item);
186 if (p != in_progress.end()) {
187 if (p->second.prio == newprio)
188 return;
189 rdout(10) << __func__ << " update " << p->second
190 << " (in progress)" << dendl;
191 // We want to preempt if priority goes down
192 // and smaller then highest priority waiting
193 if (p->second.preempt) {
194 if (newprio < p->second.prio && !queues.empty()) {
195 // choose highest priority queue
196 auto it = queues.end();
197 --it;
198 assert(!it->second.empty());
199 if (it->first > newprio) {
200 rdout(10) << __func__ << " update " << p->second
201 << " lowered priority let do_queues() preempt it" << dendl;
202 }
203 }
204 preempt_by_prio.erase(make_pair(p->second.prio, p->second.item));
205 p->second.prio = newprio;
206 preempt_by_prio.insert(make_pair(p->second.prio, p->second.item));
207 } else {
208 p->second.prio = newprio;
209 }
210 } else {
211 rdout(10) << __func__ << " update " << item << " (not found)" << dendl;
212 }
213 }
214 do_queues();
215 return;
216 }
217
218 void dump(Formatter *f) {
219 Mutex::Locker l(lock);
220 _dump(f);
221 }
222 void _dump(Formatter *f) {
223 f->dump_unsigned("max_allowed", max_allowed);
224 f->dump_unsigned("min_priority", min_priority);
225 f->open_array_section("queues");
226 for (auto& p : queues) {
227 f->open_object_section("queue");
228 f->dump_unsigned("priority", p.first);
229 f->open_array_section("items");
230 for (auto& q : p.second) {
231 f->dump_object("item", q);
232 }
233 f->close_section();
234 f->close_section();
235 }
236 f->close_section();
237 f->open_array_section("in_progress");
238 for (auto& p : in_progress) {
239 f->dump_object("item", p.second);
240 }
241 f->close_section();
242 }
243
244 /**
245 * Requests a reservation
246 *
247 * Note, on_reserved may be called following cancel_reservation. Thus,
248 * the callback must be safe in that case. Callback will be called
249 * with no locks held. cancel_reservation must be called to release the
250 * reservation slot.
251 */
252 void request_reservation(
253 T item, ///< [in] reservation key
254 Context *on_reserved, ///< [in] callback to be called on reservation
255 unsigned prio, ///< [in] priority
256 Context *on_preempt = 0 ///< [in] callback to be called if we are preempted (optional)
257 ) {
258 Mutex::Locker l(lock);
259 Reservation r(item, prio, on_reserved, on_preempt);
260 rdout(10) << __func__ << " queue " << r << dendl;
261 assert(!queue_pointers.count(item) &&
262 !in_progress.count(item));
263 queues[prio].push_back(r);
264 queue_pointers.insert(make_pair(item,
265 make_pair(prio,--(queues[prio]).end())));
266 do_queues();
267 }
268
269 /**
270 * Cancels reservation
271 *
272 * Frees the reservation under key for use.
273 * Note, after cancel_reservation, the reservation_callback may or
274 * may not still be called.
275 */
276 void cancel_reservation(
277 T item ///< [in] key for reservation to cancel
278 ) {
279 Mutex::Locker l(lock);
280 auto i = queue_pointers.find(item);
281 if (i != queue_pointers.end()) {
282 unsigned prio = i->second.first;
283 const Reservation& r = *i->second.second;
284 rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl;
285 delete r.grant;
286 delete r.preempt;
287 queues[prio].erase(i->second.second);
288 if (queues[prio].empty()) {
289 queues.erase(prio);
290 }
291 queue_pointers.erase(i);
292 } else {
293 auto p = in_progress.find(item);
294 if (p != in_progress.end()) {
295 rdout(10) << __func__ << " cancel " << p->second
296 << " (was in progress)" << dendl;
297 if (p->second.preempt) {
298 preempt_by_prio.erase(make_pair(p->second.prio, p->second.item));
299 delete p->second.preempt;
300 }
301 in_progress.erase(p);
302 } else {
303 rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl;
304 }
305 }
306 do_queues();
307 }
308
309 /**
310 * Has reservations
311 *
312 * Return true if there are reservations in progress
313 */
314 bool has_reservation() {
315 Mutex::Locker l(lock);
316 return !in_progress.empty();
317 }
318 static const unsigned MAX_PRIORITY = (unsigned)-1;
319 };
320
321 #undef rdout
322 #endif