]>
git.proxmox.com Git - ceph.git/blob - ceph/src/common/AsyncReserver.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
18 #include "common/Finisher.h"
19 #include "common/Formatter.h"
21 #define rdout(x) lgeneric_subdout(cct,reserver,x)
24 * Manages a configurable number of asynchronous reservations.
26 * Memory usage is linear with the number of items queued and
27 * linear with respect to the total number of priorities used
35 unsigned min_priority
;
44 Reservation(T i
, unsigned pr
, Context
*g
, Context
*p
= 0)
45 : item(i
), prio(pr
), grant(g
), preempt(p
) {}
46 void dump(Formatter
*f
) const {
47 f
->dump_stream("item") << item
;
48 f
->dump_unsigned("prio", prio
);
49 f
->dump_bool("can_preempt", !!preempt
);
51 friend ostream
& operator<<(ostream
& out
, const Reservation
& r
) {
52 return out
<< r
.item
<< "(prio " << r
.prio
<< " grant " << r
.grant
53 << " preempt " << r
.preempt
<< ")";
// Waiting reservations grouped by priority; each priority has its own
// list, and new requests are appended to the back of that list.
map<unsigned, list<Reservation>> queues;
// For every queued item: the priority it is queued under and an iterator
// to its entry in queues[priority], so the entry can be erased directly.
map<T, pair<unsigned, typename list<Reservation>::iterator>> queue_pointers;
// Reservations that have been granted, keyed by item.
map<T,Reservation> in_progress;
// (priority, item) pairs in ascending order, so begin() is the
// lowest-priority preemptible reservation.
set<pair<unsigned,T>> preempt_by_prio;  ///< in_progress that can be preempted
63 assert(!preempt_by_prio
.empty());
64 auto q
= in_progress
.find(preempt_by_prio
.begin()->second
);
65 assert(q
!= in_progress
.end());
66 Reservation victim
= q
->second
;
67 rdout(10) << __func__
<< " preempt " << victim
<< dendl
;
68 f
->queue(victim
.preempt
);
69 victim
.preempt
= nullptr;
71 preempt_by_prio
.erase(preempt_by_prio
.begin());
75 rdout(20) << __func__
<< ":\n";
76 JSONFormatter
jf(true);
77 jf
.open_object_section("queue");
83 // in case min_priority was adjusted up or max_allowed was adjusted down
84 while (!preempt_by_prio
.empty() &&
85 (in_progress
.size() > max_allowed
||
86 preempt_by_prio
.begin()->first
< min_priority
)) {
90 while (!queues
.empty()) {
91 // choose highest priority queue
92 auto it
= queues
.end();
94 assert(!it
->second
.empty());
95 if (it
->first
< min_priority
) {
98 if (in_progress
.size() >= max_allowed
&&
99 !preempt_by_prio
.empty() &&
100 it
->first
> preempt_by_prio
.begin()->first
) {
103 if (in_progress
.size() >= max_allowed
) {
107 Reservation p
= it
->second
.front();
108 rdout(10) << __func__
<< " grant " << p
<< dendl
;
109 queue_pointers
.erase(p
.item
);
110 it
->second
.pop_front();
111 if (it
->second
.empty()) {
116 in_progress
[p
.item
] = p
;
118 preempt_by_prio
.insert(make_pair(p
.prio
, p
.item
));
126 unsigned max_allowed
,
127 unsigned min_priority
= 0)
130 max_allowed(max_allowed
),
131 min_priority(min_priority
),
132 lock("AsyncReserver::lock") {}
134 void set_max(unsigned max
) {
135 Mutex::Locker
l(lock
);
140 void set_min_priority(unsigned min
) {
141 Mutex::Locker
l(lock
);
147 * Update the priority of a reservation
149 * Note, on_reserved may be called following update_priority. Thus,
150 * the callback must be safe in that case. Callback will be called
151 * with no locks held. cancel_reservation must be called to release the
155 * 1. Item is queued, re-queue with new priority
156 * 2. Item is queued, re-queue and preempt if new priority higher than an in progress item
157 * 3. Item is in progress, just adjust priority if no higher priority waiting
158 * 4. Item is in progress, adjust priority; if higher priority items are waiting, preempt item
161 void update_priority(T item
, unsigned newprio
) {
162 Mutex::Locker
l(lock
);
163 auto i
= queue_pointers
.find(item
);
164 if (i
!= queue_pointers
.end()) {
165 unsigned prio
= i
->second
.first
;
168 Reservation r
= *i
->second
.second
;
169 rdout(10) << __func__
<< " update " << r
<< " (was queued)" << dendl
;
170 // Like cancel_reservation() without preempting
171 queues
[prio
].erase(i
->second
.second
);
172 if (queues
[prio
].empty()) {
175 queue_pointers
.erase(i
);
177 // Like request_reservation() to re-queue it but with new priority
178 assert(!queue_pointers
.count(item
) &&
179 !in_progress
.count(item
));
181 queues
[newprio
].push_back(r
);
182 queue_pointers
.insert(make_pair(item
,
183 make_pair(newprio
,--(queues
[newprio
]).end())));
185 auto p
= in_progress
.find(item
);
186 if (p
!= in_progress
.end()) {
187 if (p
->second
.prio
== newprio
)
189 rdout(10) << __func__
<< " update " << p
->second
190 << " (in progress)" << dendl
;
191 // We want to preempt if priority goes down
192 // and smaller than highest priority waiting
193 if (p
->second
.preempt
) {
194 if (newprio
< p
->second
.prio
&& !queues
.empty()) {
195 // choose highest priority queue
196 auto it
= queues
.end();
198 assert(!it
->second
.empty());
199 if (it
->first
> newprio
) {
200 rdout(10) << __func__
<< " update " << p
->second
201 << " lowered priority let do_queues() preempt it" << dendl
;
204 preempt_by_prio
.erase(make_pair(p
->second
.prio
, p
->second
.item
));
205 p
->second
.prio
= newprio
;
206 preempt_by_prio
.insert(make_pair(p
->second
.prio
, p
->second
.item
));
208 p
->second
.prio
= newprio
;
211 rdout(10) << __func__
<< " update " << item
<< " (not found)" << dendl
;
218 void dump(Formatter
*f
) {
219 Mutex::Locker
l(lock
);
222 void _dump(Formatter
*f
) {
223 f
->dump_unsigned("max_allowed", max_allowed
);
224 f
->dump_unsigned("min_priority", min_priority
);
225 f
->open_array_section("queues");
226 for (auto& p
: queues
) {
227 f
->open_object_section("queue");
228 f
->dump_unsigned("priority", p
.first
);
229 f
->open_array_section("items");
230 for (auto& q
: p
.second
) {
231 f
->dump_object("item", q
);
237 f
->open_array_section("in_progress");
238 for (auto& p
: in_progress
) {
239 f
->dump_object("item", p
.second
);
245 * Requests a reservation
247 * Note, on_reserved may be called following cancel_reservation. Thus,
248 * the callback must be safe in that case. Callback will be called
249 * with no locks held. cancel_reservation must be called to release the
252 void request_reservation(
253 T item
, ///< [in] reservation key
254 Context
*on_reserved
, ///< [in] callback to be called on reservation
255 unsigned prio
, ///< [in] priority
256 Context
*on_preempt
= 0 ///< [in] callback to be called if we are preempted (optional)
258 Mutex::Locker
l(lock
);
259 Reservation
r(item
, prio
, on_reserved
, on_preempt
);
260 rdout(10) << __func__
<< " queue " << r
<< dendl
;
261 assert(!queue_pointers
.count(item
) &&
262 !in_progress
.count(item
));
263 queues
[prio
].push_back(r
);
264 queue_pointers
.insert(make_pair(item
,
265 make_pair(prio
,--(queues
[prio
]).end())));
270 * Cancels reservation
272 * Frees the reservation under key for use.
273 * Note, after cancel_reservation, the reservation_callback may or
274 * may not still be called.
276 void cancel_reservation(
277 T item
///< [in] key for reservation to cancel
279 Mutex::Locker
l(lock
);
280 auto i
= queue_pointers
.find(item
);
281 if (i
!= queue_pointers
.end()) {
282 unsigned prio
= i
->second
.first
;
283 const Reservation
& r
= *i
->second
.second
;
284 rdout(10) << __func__
<< " cancel " << r
<< " (was queued)" << dendl
;
287 queues
[prio
].erase(i
->second
.second
);
288 if (queues
[prio
].empty()) {
291 queue_pointers
.erase(i
);
293 auto p
= in_progress
.find(item
);
294 if (p
!= in_progress
.end()) {
295 rdout(10) << __func__
<< " cancel " << p
->second
296 << " (was in progress)" << dendl
;
297 if (p
->second
.preempt
) {
298 preempt_by_prio
.erase(make_pair(p
->second
.prio
, p
->second
.item
));
299 delete p
->second
.preempt
;
301 in_progress
.erase(p
);
303 rdout(10) << __func__
<< " cancel " << item
<< " (not found)" << dendl
;
312 * Return true if there are reservations in progress
314 bool has_reservation() {
315 Mutex::Locker
l(lock
);
316 return !in_progress
.empty();
/// Largest representable priority value (all bits set).
static const unsigned MAX_PRIORITY = static_cast<unsigned>(-1);