]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef ASYNC_RESERVER_H | |
16 | #define ASYNC_RESERVER_H | |
17 | ||
7c673cae FG |
18 | #include "common/Formatter.h" |
19 | ||
3efd9988 FG |
20 | #define rdout(x) lgeneric_subdout(cct,reserver,x) |
21 | ||
7c673cae | 22 | /** |
11fdf7f2 | 23 | * Manages a configurable number of asynchronous reservations. |
7c673cae FG |
24 | * |
25 | * Memory usage is linear with the number of items queued and | |
26 | * linear with respect to the total number of priorities used | |
27 | * over all time. | |
28 | */ | |
f67539c2 | 29 | template <typename T, typename F> |
7c673cae | 30 | class AsyncReserver { |
3efd9988 | 31 | CephContext *cct; |
f67539c2 | 32 | F *f; |
7c673cae FG |
33 | unsigned max_allowed; |
34 | unsigned min_priority; | |
11fdf7f2 | 35 | ceph::mutex lock = ceph::make_mutex("AsyncReserver::lock"); |
7c673cae | 36 | |
3efd9988 FG |
37 | struct Reservation { |
38 | T item; | |
39 | unsigned prio = 0; | |
40 | Context *grant = 0; | |
41 | Context *preempt = 0; | |
42 | Reservation() {} | |
43 | Reservation(T i, unsigned pr, Context *g, Context *p = 0) | |
44 | : item(i), prio(pr), grant(g), preempt(p) {} | |
f67539c2 | 45 | void dump(ceph::Formatter *f) const { |
3efd9988 FG |
46 | f->dump_stream("item") << item; |
47 | f->dump_unsigned("prio", prio); | |
48 | f->dump_bool("can_preempt", !!preempt); | |
49 | } | |
f67539c2 | 50 | friend std::ostream& operator<<(std::ostream& out, const Reservation& r) { |
3efd9988 FG |
51 | return out << r.item << "(prio " << r.prio << " grant " << r.grant |
52 | << " preempt " << r.preempt << ")"; | |
53 | } | |
54 | }; | |
55 | ||
f67539c2 TL |
56 | std::map<unsigned, std::list<Reservation>> queues; |
57 | std::map<T, std::pair<unsigned, typename std::list<Reservation>::iterator>> queue_pointers; | |
58 | std::map<T,Reservation> in_progress; | |
59 | std::set<std::pair<unsigned,T>> preempt_by_prio; ///< in_progress that can be preempted | |
3efd9988 FG |
60 | |
61 | void preempt_one() { | |
11fdf7f2 | 62 | ceph_assert(!preempt_by_prio.empty()); |
3efd9988 | 63 | auto q = in_progress.find(preempt_by_prio.begin()->second); |
11fdf7f2 | 64 | ceph_assert(q != in_progress.end()); |
3efd9988 FG |
65 | Reservation victim = q->second; |
66 | rdout(10) << __func__ << " preempt " << victim << dendl; | |
67 | f->queue(victim.preempt); | |
68 | victim.preempt = nullptr; | |
69 | in_progress.erase(q); | |
70 | preempt_by_prio.erase(preempt_by_prio.begin()); | |
71 | } | |
7c673cae FG |
72 | |
73 | void do_queues() { | |
3efd9988 | 74 | rdout(20) << __func__ << ":\n"; |
f67539c2 | 75 | ceph::JSONFormatter jf(true); |
3efd9988 FG |
76 | jf.open_object_section("queue"); |
77 | _dump(&jf); | |
78 | jf.close_section(); | |
79 | jf.flush(*_dout); | |
80 | *_dout << dendl; | |
81 | ||
82 | // in case min_priority was adjusted up or max_allowed was adjusted down | |
83 | while (!preempt_by_prio.empty() && | |
84 | (in_progress.size() > max_allowed || | |
85 | preempt_by_prio.begin()->first < min_priority)) { | |
86 | preempt_one(); | |
87 | } | |
88 | ||
89 | while (!queues.empty()) { | |
90 | // choose highest priority queue | |
91 | auto it = queues.end(); | |
92 | --it; | |
11fdf7f2 | 93 | ceph_assert(!it->second.empty()); |
3efd9988 FG |
94 | if (it->first < min_priority) { |
95 | break; | |
96 | } | |
97 | if (in_progress.size() >= max_allowed && | |
98 | !preempt_by_prio.empty() && | |
99 | it->first > preempt_by_prio.begin()->first) { | |
100 | preempt_one(); | |
101 | } | |
102 | if (in_progress.size() >= max_allowed) { | |
103 | break; // no room | |
104 | } | |
105 | // grant | |
106 | Reservation p = it->second.front(); | |
107 | rdout(10) << __func__ << " grant " << p << dendl; | |
108 | queue_pointers.erase(p.item); | |
109 | it->second.pop_front(); | |
110 | if (it->second.empty()) { | |
111 | queues.erase(it); | |
112 | } | |
113 | f->queue(p.grant); | |
114 | p.grant = nullptr; | |
115 | in_progress[p.item] = p; | |
116 | if (p.preempt) { | |
f67539c2 | 117 | preempt_by_prio.insert(std::make_pair(p.prio, p.item)); |
7c673cae FG |
118 | } |
119 | } | |
120 | } | |
121 | public: | |
122 | AsyncReserver( | |
3efd9988 | 123 | CephContext *cct, |
f67539c2 | 124 | F *f, |
7c673cae FG |
125 | unsigned max_allowed, |
126 | unsigned min_priority = 0) | |
3efd9988 FG |
127 | : cct(cct), |
128 | f(f), | |
7c673cae | 129 | max_allowed(max_allowed), |
11fdf7f2 | 130 | min_priority(min_priority) {} |
7c673cae FG |
131 | |
132 | void set_max(unsigned max) { | |
11fdf7f2 | 133 | std::lock_guard l(lock); |
7c673cae FG |
134 | max_allowed = max; |
135 | do_queues(); | |
136 | } | |
137 | ||
138 | void set_min_priority(unsigned min) { | |
11fdf7f2 | 139 | std::lock_guard l(lock); |
7c673cae FG |
140 | min_priority = min; |
141 | do_queues(); | |
142 | } | |
143 | ||
a8e16298 TL |
144 | /** |
145 | * Update the priority of a reservation | |
146 | * | |
147 | * Note, on_reserved may be called following update_priority. Thus, | |
148 | * the callback must be safe in that case. Callback will be called | |
149 | * with no locks held. cancel_reservation must be called to release the | |
150 | * reservation slot. | |
151 | * | |
152 | * Cases | |
153 | * 1. Item is queued, re-queue with new priority | |
154 | * 2. Item is queued, re-queue and preempt if new priority higher than an in progress item | |
155 | * 3. Item is in progress, just adjust priority if no higher priority waiting | |
156 | * 4. Item is in progress, adjust priority if higher priority items waiting preempt item | |
157 | * | |
158 | */ | |
159 | void update_priority(T item, unsigned newprio) { | |
11fdf7f2 | 160 | std::lock_guard l(lock); |
a8e16298 TL |
161 | auto i = queue_pointers.find(item); |
162 | if (i != queue_pointers.end()) { | |
163 | unsigned prio = i->second.first; | |
164 | if (newprio == prio) | |
165 | return; | |
166 | Reservation r = *i->second.second; | |
167 | rdout(10) << __func__ << " update " << r << " (was queued)" << dendl; | |
168 | // Like cancel_reservation() without preempting | |
169 | queues[prio].erase(i->second.second); | |
170 | if (queues[prio].empty()) { | |
171 | queues.erase(prio); | |
172 | } | |
173 | queue_pointers.erase(i); | |
174 | ||
175 | // Like request_reservation() to re-queue it but with new priority | |
11fdf7f2 | 176 | ceph_assert(!queue_pointers.count(item) && |
a8e16298 TL |
177 | !in_progress.count(item)); |
178 | r.prio = newprio; | |
179 | queues[newprio].push_back(r); | |
f67539c2 TL |
180 | queue_pointers.insert(std::make_pair(item, |
181 | std::make_pair(newprio,--(queues[newprio]).end()))); | |
a8e16298 TL |
182 | } else { |
183 | auto p = in_progress.find(item); | |
184 | if (p != in_progress.end()) { | |
185 | if (p->second.prio == newprio) | |
186 | return; | |
187 | rdout(10) << __func__ << " update " << p->second | |
188 | << " (in progress)" << dendl; | |
189 | // We want to preempt if priority goes down | |
190 | // and smaller then highest priority waiting | |
191 | if (p->second.preempt) { | |
192 | if (newprio < p->second.prio && !queues.empty()) { | |
193 | // choose highest priority queue | |
194 | auto it = queues.end(); | |
195 | --it; | |
11fdf7f2 | 196 | ceph_assert(!it->second.empty()); |
a8e16298 TL |
197 | if (it->first > newprio) { |
198 | rdout(10) << __func__ << " update " << p->second | |
199 | << " lowered priority let do_queues() preempt it" << dendl; | |
200 | } | |
201 | } | |
f67539c2 | 202 | preempt_by_prio.erase(std::make_pair(p->second.prio, p->second.item)); |
a8e16298 | 203 | p->second.prio = newprio; |
f67539c2 | 204 | preempt_by_prio.insert(std::make_pair(p->second.prio, p->second.item)); |
a8e16298 TL |
205 | } else { |
206 | p->second.prio = newprio; | |
207 | } | |
208 | } else { | |
209 | rdout(10) << __func__ << " update " << item << " (not found)" << dendl; | |
210 | } | |
211 | } | |
212 | do_queues(); | |
213 | return; | |
214 | } | |
215 | ||
f67539c2 | 216 | void dump(ceph::Formatter *f) { |
11fdf7f2 | 217 | std::lock_guard l(lock); |
3efd9988 FG |
218 | _dump(f); |
219 | } | |
f67539c2 | 220 | void _dump(ceph::Formatter *f) { |
7c673cae FG |
221 | f->dump_unsigned("max_allowed", max_allowed); |
222 | f->dump_unsigned("min_priority", min_priority); | |
223 | f->open_array_section("queues"); | |
3efd9988 | 224 | for (auto& p : queues) { |
7c673cae | 225 | f->open_object_section("queue"); |
3efd9988 | 226 | f->dump_unsigned("priority", p.first); |
7c673cae | 227 | f->open_array_section("items"); |
3efd9988 FG |
228 | for (auto& q : p.second) { |
229 | f->dump_object("item", q); | |
7c673cae FG |
230 | } |
231 | f->close_section(); | |
232 | f->close_section(); | |
233 | } | |
234 | f->close_section(); | |
235 | f->open_array_section("in_progress"); | |
3efd9988 FG |
236 | for (auto& p : in_progress) { |
237 | f->dump_object("item", p.second); | |
7c673cae FG |
238 | } |
239 | f->close_section(); | |
240 | } | |
241 | ||
242 | /** | |
243 | * Requests a reservation | |
244 | * | |
245 | * Note, on_reserved may be called following cancel_reservation. Thus, | |
246 | * the callback must be safe in that case. Callback will be called | |
247 | * with no locks held. cancel_reservation must be called to release the | |
248 | * reservation slot. | |
249 | */ | |
250 | void request_reservation( | |
251 | T item, ///< [in] reservation key | |
252 | Context *on_reserved, ///< [in] callback to be called on reservation | |
3efd9988 FG |
253 | unsigned prio, ///< [in] priority |
254 | Context *on_preempt = 0 ///< [in] callback to be called if we are preempted (optional) | |
7c673cae | 255 | ) { |
11fdf7f2 | 256 | std::lock_guard l(lock); |
3efd9988 FG |
257 | Reservation r(item, prio, on_reserved, on_preempt); |
258 | rdout(10) << __func__ << " queue " << r << dendl; | |
11fdf7f2 | 259 | ceph_assert(!queue_pointers.count(item) && |
7c673cae | 260 | !in_progress.count(item)); |
3efd9988 | 261 | queues[prio].push_back(r); |
f67539c2 TL |
262 | queue_pointers.insert(std::make_pair(item, |
263 | std::make_pair(prio,--(queues[prio]).end()))); | |
7c673cae FG |
264 | do_queues(); |
265 | } | |
266 | ||
267 | /** | |
268 | * Cancels reservation | |
269 | * | |
270 | * Frees the reservation under key for use. | |
271 | * Note, after cancel_reservation, the reservation_callback may or | |
272 | * may not still be called. | |
273 | */ | |
274 | void cancel_reservation( | |
275 | T item ///< [in] key for reservation to cancel | |
276 | ) { | |
11fdf7f2 | 277 | std::lock_guard l(lock); |
3efd9988 FG |
278 | auto i = queue_pointers.find(item); |
279 | if (i != queue_pointers.end()) { | |
280 | unsigned prio = i->second.first; | |
281 | const Reservation& r = *i->second.second; | |
282 | rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl; | |
283 | delete r.grant; | |
284 | delete r.preempt; | |
285 | queues[prio].erase(i->second.second); | |
286 | if (queues[prio].empty()) { | |
287 | queues.erase(prio); | |
288 | } | |
289 | queue_pointers.erase(i); | |
7c673cae | 290 | } else { |
3efd9988 FG |
291 | auto p = in_progress.find(item); |
292 | if (p != in_progress.end()) { | |
293 | rdout(10) << __func__ << " cancel " << p->second | |
294 | << " (was in progress)" << dendl; | |
295 | if (p->second.preempt) { | |
f67539c2 | 296 | preempt_by_prio.erase(std::make_pair(p->second.prio, p->second.item)); |
3efd9988 FG |
297 | delete p->second.preempt; |
298 | } | |
299 | in_progress.erase(p); | |
300 | } else { | |
301 | rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl; | |
302 | } | |
7c673cae FG |
303 | } |
304 | do_queues(); | |
305 | } | |
b5b8bbf5 FG |
306 | |
307 | /** | |
308 | * Has reservations | |
309 | * | |
310 | * Return true if there are reservations in progress | |
311 | */ | |
312 | bool has_reservation() { | |
11fdf7f2 | 313 | std::lock_guard l(lock); |
b5b8bbf5 FG |
314 | return !in_progress.empty(); |
315 | } | |
7c673cae FG |
316 | static const unsigned MAX_PRIORITY = (unsigned)-1; |
317 | }; | |
318 | ||
3efd9988 | 319 | #undef rdout |
7c673cae | 320 | #endif |