]>
git.proxmox.com Git - ceph.git/blob - ceph/src/common/AsyncReserver.h
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
18 #include "common/Formatter.h"
20 #define rdout(x) lgeneric_subdout(cct,reserver,x)
/**
 * Manages a configurable number of asynchronous reservations.
 *
 * Memory usage is linear in the number of items queued and
 * linear in the total number of distinct priorities used.
 */
// AsyncReserver<T, F>: hands out a bounded number of reservations keyed by T.
// T is the reservation key (see request_reservation); it is used as a std::map
// key and inside std::pair ordering, so it must be less-than comparable.
// F is the callback executor: the code below calls f->queue(Context*) on it
// (e.g. to run a preempt callback). The declaration of `f` is not visible in
// this copy.
29 template <typename T
, typename F
>
// Priority floor. The queue-processing loops below compare both queued and
// preemptible in-progress priorities against it (see the
// "in case min_priority was adjusted up" loop).
34 unsigned min_priority
;
// Guards all reservation state. The public methods visible below
// (set_max, set_min_priority, update_priority, dump, request_reservation,
// cancel_reservation, has_reservation) take it with std::lock_guard.
35 ceph::mutex lock
= ceph::make_mutex("AsyncReserver::lock");
43 Reservation(T i
, unsigned pr
, Context
*g
, Context
*p
= 0)
44 : item(i
), prio(pr
), grant(g
), preempt(p
) {}
// Writes this reservation to the formatter: the item (streamed), its
// priority, and "can_preempt" (true iff a preempt callback is attached).
// NOTE(review): the closing brace of this function is missing from this copy.
45 void dump(ceph::Formatter
*f
) const {
46 f
->dump_stream("item") << item
;
47 f
->dump_unsigned("prio", prio
);
// !! converts the preempt pointer to a bool: non-null => preemptible.
48 f
->dump_bool("can_preempt", !!preempt
);
// Streams a one-line summary of a reservation: the item, its priority, and
// the raw grant / preempt callback pointer values.
// NOTE(review): the closing brace of this function is missing from this copy.
50 friend std::ostream
& operator<<(std::ostream
& out
, const Reservation
& r
) {
51 return out
<< r
.item
<< "(prio " << r
.prio
<< " grant " << r
.grant
52 << " preempt " << r
.preempt
<< ")";
// Waiting (not yet granted) reservations, grouped by priority. std::map keeps
// keys ascending, so the last entry is the highest priority. Within one
// priority, requests are appended with push_back and granted from front(),
// i.e. FIFO.
56 std::map
<unsigned, std::list
<Reservation
>> queues
;
// For each queued item: its priority and its iterator into queues[prio], so a
// queued item can be removed from the middle of its list without a search.
57 std::map
<T
, std::pair
<unsigned, typename
std::list
<Reservation
>::iterator
>> queue_pointers
;
// Granted reservations, keyed by item. Entries are inserted on grant and
// removed by cancel_reservation.
58 std::map
<T
,Reservation
> in_progress
;
// (priority, item) pairs, ordered by priority first, so begin() is the
// lowest-priority preemptible in-progress reservation.
59 std::set
<std::pair
<unsigned,T
>> preempt_by_prio
; ///< in_progress that can be preempted
// Body of the preemption helper; its header line is not present in this copy.
// It preempts the lowest-priority preemptible in-progress reservation: it
// asserts one exists, finds its in_progress entry, hands its preempt callback
// to f->queue(), and drops it from preempt_by_prio.
62 ceph_assert(!preempt_by_prio
.empty());
// preempt_by_prio.begin() is the lowest (priority, item) pair.
63 auto q
= in_progress
.find(preempt_by_prio
.begin()->second
);
64 ceph_assert(q
!= in_progress
.end());
// NOTE(review): `victim` is a copy of the in_progress entry, so the
// `victim.preempt = nullptr` below only clears the local copy. Presumably the
// stored entry is erased on a line missing here (original line 69); confirm.
65 Reservation victim
= q
->second
;
66 rdout(10) << __func__
<< " preempt " << victim
<< dendl
;
67 f
->queue(victim
.preempt
);
68 victim
.preempt
= nullptr;
70 preempt_by_prio
.erase(preempt_by_prio
.begin());
// Body of the queue-processing routine; its header and several lines
// (loop bodies, closing braces) are not present in this copy.
// Visible phases:
//  1. start a JSON dump of the reserver state at debug level 20;
//  2. loop while preemptible items exist and either too many items are in
//     progress or the lowest preemptible priority is below min_priority;
//  3. grant waiting reservations from the highest-priority queue while room
//     allows.
74 rdout(20) << __func__
<< ":\n";
75 ceph::JSONFormatter
jf(true);
76 jf
.open_object_section("queue");
82 // in case min_priority was adjusted up or max_allowed was adjusted down
83 while (!preempt_by_prio
.empty() &&
84 (in_progress
.size() > max_allowed
||
85 preempt_by_prio
.begin()->first
< min_priority
)) {
89 while (!queues
.empty()) {
90 // choose highest priority queue
// NOTE(review): `it` must be decremented to the last (highest-priority)
// entry before use, because dereferencing end() is undefined behaviour.
// That decrement is on a line missing from this copy.
91 auto it
= queues
.end();
93 ceph_assert(!it
->second
.empty());
// Highest waiting priority is below the floor (branch body not visible here).
94 if (it
->first
< min_priority
) {
// Full, but the waiting head outranks the lowest preemptible in-progress
// item (branch body not visible here).
97 if (in_progress
.size() >= max_allowed
&&
98 !preempt_by_prio
.empty() &&
99 it
->first
> preempt_by_prio
.begin()->first
) {
// Still no room (branch body not visible here).
102 if (in_progress
.size() >= max_allowed
) {
// Grant the oldest waiting reservation at this priority.
106 Reservation p
= it
->second
.front();
107 rdout(10) << __func__
<< " grant " << p
<< dendl
;
108 queue_pointers
.erase(p
.item
);
109 it
->second
.pop_front();
110 if (it
->second
.empty()) {
115 in_progress
[p
.item
] = p
;
// Index the granted item for preemption. Presumably this is guarded by
// p.preempt being set; the guarding line is not visible here.
117 preempt_by_prio
.insert(std::make_pair(p
.prio
, p
.item
));
// Tail of the AsyncReserver constructor; the earlier parameters and the rest
// of the initializer list are not visible in this copy.
// max_allowed: cap on concurrently in-progress reservations (compared against
//              in_progress.size() when granting).
// min_priority: priority floor; defaults to 0.
125 unsigned max_allowed
,
126 unsigned min_priority
= 0)
129 max_allowed(max_allowed
),
130 min_priority(min_priority
) {}
// Changes the concurrency cap under the lock.
// NOTE(review): the rest of the body is missing from this copy. Presumably it
// assigns max_allowed and reprocesses the queues (cf. the "max_allowed was
// adjusted down" comment in the queue-processing code); confirm.
132 void set_max(unsigned max
) {
133 std::lock_guard
l(lock
);
// Changes the priority floor under the lock.
// NOTE(review): the rest of the body is missing from this copy. Presumably it
// assigns min_priority and reprocesses the queues (cf. the "min_priority was
// adjusted up" comment in the queue-processing code); confirm.
138 void set_min_priority(unsigned min
) {
139 std::lock_guard
l(lock
);
/**
 * Update the priority of a reservation.
 *
 * Note: on_reserved may be called following update_priority, so the
 * callback must be safe in that case. The callback will be called
 * with no locks held. cancel_reservation must be called to release the
 * reservation.
 *
 * Cases handled:
 * 1. Item is queued: re-queue it with the new priority.
 * 2. Item is queued: re-queue it, and preempt an in-progress item if the new
 *    priority is higher than that item's priority.
 * 3. Item is in progress: just adjust its priority if nothing of higher
 *    priority is waiting.
 * 4. Item is in progress and higher-priority items are waiting: adjust its
 *    priority so that the item can be preempted.
 */
// Changes the priority of `item`, whether it is queued or in progress.
// Several lines of this body (early returns, braces, else branches) are
// missing from this copy.
159 void update_priority(T item
, unsigned newprio
) {
160 std::lock_guard
l(lock
);
161 auto i
= queue_pointers
.find(item
);
// Queued case: take the reservation out of its current priority list and
// append it to the newprio list. This moves it to the back of that FIFO.
162 if (i
!= queue_pointers
.end()) {
163 unsigned prio
= i
->second
.first
;
// Copy before erasing: the erase below destroys the list element.
166 Reservation r
= *i
->second
.second
;
167 rdout(10) << __func__
<< " update " << r
<< " (was queued)" << dendl
;
168 // Like cancel_reservation() without preempting
169 queues
[prio
].erase(i
->second
.second
);
170 if (queues
[prio
].empty()) {
173 queue_pointers
.erase(i
);
175 // Like request_reservation() to re-queue it but with new priority
176 ceph_assert(!queue_pointers
.count(item
) &&
177 !in_progress
.count(item
));
// NOTE(review): no visible line sets r.prio = newprio before re-queueing.
// If the missing original line 178 does not do it, the re-queued entry keeps
// its old prio, which is later used as its preempt_by_prio key; confirm.
179 queues
[newprio
].push_back(r
);
180 queue_pointers
.insert(std::make_pair(item
,
181 std::make_pair(newprio
,--(queues
[newprio
]).end())));
// In-progress case.
183 auto p
= in_progress
.find(item
);
184 if (p
!= in_progress
.end()) {
185 if (p
->second
.prio
== newprio
)
187 rdout(10) << __func__
<< " update " << p
->second
188 << " (in progress)" << dendl
;
189 // We want to preempt if priority goes down
190 // and smaller than the highest priority waiting
191 if (p
->second
.preempt
) {
192 if (newprio
< p
->second
.prio
&& !queues
.empty()) {
193 // choose highest priority queue
// NOTE(review): as in the queue-processing code, `it` must be decremented
// before use; that line is missing from this copy.
194 auto it
= queues
.end();
196 ceph_assert(!it
->second
.empty());
197 if (it
->first
> newprio
) {
198 rdout(10) << __func__
<< " update " << p
->second
199 << " lowered priority let do_queues() preempt it" << dendl
;
// Re-key the preemptible entry: preempt_by_prio is ordered by priority, so
// the old (prio, item) pair is removed and the new one inserted.
202 preempt_by_prio
.erase(std::make_pair(p
->second
.prio
, p
->second
.item
));
203 p
->second
.prio
= newprio
;
204 preempt_by_prio
.insert(std::make_pair(p
->second
.prio
, p
->second
.item
));
// Non-preemptible in-progress item: only the stored priority changes.
206 p
->second
.prio
= newprio
;
209 rdout(10) << __func__
<< " update " << item
<< " (not found)" << dendl
;
// Locked entry point for dumping the reserver state to a formatter.
// NOTE(review): the rest of the body is missing from this copy. Presumably it
// delegates to _dump(f) while holding the lock; confirm.
216 void dump(ceph::Formatter
*f
) {
217 std::lock_guard
l(lock
);
// Writes the reserver state: "max_allowed", "min_priority", a "queues" array
// with one {"priority", "items"} object per priority, and an "in_progress"
// array. This function takes no lock itself, so callers should already hold
// `lock`. The section-closing calls are on lines missing from this copy.
220 void _dump(ceph::Formatter
*f
) {
221 f
->dump_unsigned("max_allowed", max_allowed
);
222 f
->dump_unsigned("min_priority", min_priority
);
223 f
->open_array_section("queues");
224 for (auto& p
: queues
) {
225 f
->open_object_section("queue");
226 f
->dump_unsigned("priority", p
.first
);
227 f
->open_array_section("items");
228 for (auto& q
: p
.second
) {
229 f
->dump_object("item", q
);
235 f
->open_array_section("in_progress");
236 for (auto& p
: in_progress
) {
237 f
->dump_object("item", p
.second
);
/**
 * Requests a reservation.
 *
 * Note: on_reserved may be called following cancel_reservation, so the
 * callback must be safe in that case. The callback will be called
 * with no locks held. cancel_reservation must be called to release the
 * reservation.
 */
// Queues a new reservation for `item` at `prio`. The item must be neither
// queued nor in progress (asserted). The request is appended to the back of
// its priority's FIFO, and its list position is recorded in queue_pointers.
// The parameter-closing line and the tail of the body are missing from this
// copy.
250 void request_reservation(
251 T item
, ///< [in] reservation key
252 Context
*on_reserved
, ///< [in] callback to be called on reservation
253 unsigned prio
, ///< [in] priority
254 Context
*on_preempt
= 0 ///< [in] callback to be called if we are preempted (optional)
256 std::lock_guard
l(lock
);
257 Reservation
r(item
, prio
, on_reserved
, on_preempt
);
258 rdout(10) << __func__
<< " queue " << r
<< dendl
;
259 ceph_assert(!queue_pointers
.count(item
) &&
260 !in_progress
.count(item
));
261 queues
[prio
].push_back(r
);
// --end() is the element just pushed; std::list iterators stay valid until
// that element is erased.
262 queue_pointers
.insert(std::make_pair(item
,
263 std::make_pair(prio
,--(queues
[prio
]).end())));
/**
 * Cancels a reservation.
 *
 * Frees the reservation under the given key for use.
 * Note: after cancel_reservation, the on_reserved callback may or
 * may not still be called.
 */
// Removes `item` whether it is queued or in progress, and logs when it is
// not found. Several lines (parameter close, braces, else branches) are
// missing from this copy.
274 void cancel_reservation(
275 T item
///< [in] key for reservation to cancel
277 std::lock_guard
l(lock
);
278 auto i
= queue_pointers
.find(item
);
// Queued case: erase the list element and its queue_pointers entry.
279 if (i
!= queue_pointers
.end()) {
280 unsigned prio
= i
->second
.first
;
// `r` refers to the list element erased below, so it must not be used after
// that erase.
281 const Reservation
& r
= *i
->second
.second
;
282 rdout(10) << __func__
<< " cancel " << r
<< " (was queued)" << dendl
;
// NOTE(review): no visible line frees r.grant / r.preempt for a queued item.
// If the missing original lines 283-284 do not, those callbacks leak; confirm.
285 queues
[prio
].erase(i
->second
.second
);
286 if (queues
[prio
].empty()) {
289 queue_pointers
.erase(i
);
// In-progress case: drop the preemption index entry and delete the preempt
// callback (if any), then forget the reservation.
291 auto p
= in_progress
.find(item
);
292 if (p
!= in_progress
.end()) {
293 rdout(10) << __func__
<< " cancel " << p
->second
294 << " (was in progress)" << dendl
;
295 if (p
->second
.preempt
) {
296 preempt_by_prio
.erase(std::make_pair(p
->second
.prio
, p
->second
.item
));
297 delete p
->second
.preempt
;
299 in_progress
.erase(p
);
301 rdout(10) << __func__
<< " cancel " << item
<< " (not found)" << dendl
;
/**
 * Returns true if there are reservations in progress.
 */
// Returns whether any reservation is currently granted (in_progress is
// non-empty). The check is done under the lock.
// NOTE(review): the closing brace of this function is missing from this copy.
312 bool has_reservation() {
313 std::lock_guard
l(lock
);
314 return !in_progress
.empty();
/// Largest representable priority: every bit of an `unsigned` set.
static const unsigned MAX_PRIORITY = ~0u;