]>
git.proxmox.com Git - ceph.git/blob - ceph/src/common/AsyncReserver.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
18 #include "common/Finisher.h"
19 #include "common/Formatter.h"
21 #define rdout(x) lgeneric_subdout(cct,reserver,x)
24 * Manages a configurable number of asyncronous reservations.
26 * Memory usage is linear with the number of items queued and
27 * linear with respect to the total number of priorities used
35 unsigned min_priority
;
44 Reservation(T i
, unsigned pr
, Context
*g
, Context
*p
= 0)
45 : item(i
), prio(pr
), grant(g
), preempt(p
) {}
46 void dump(Formatter
*f
) const {
47 f
->dump_stream("item") << item
;
48 f
->dump_unsigned("prio", prio
);
49 f
->dump_bool("can_preempt", !!preempt
);
51 friend ostream
& operator<<(ostream
& out
, const Reservation
& r
) {
52 return out
<< r
.item
<< "(prio " << r
.prio
<< " grant " << r
.grant
53 << " preempt " << r
.preempt
<< ")";
57 map
<unsigned, list
<Reservation
>> queues
;
58 map
<T
, pair
<unsigned, typename list
<Reservation
>::iterator
>> queue_pointers
;
59 map
<T
,Reservation
> in_progress
;
60 set
<pair
<unsigned,T
>> preempt_by_prio
; ///< in_progress that can be preempted
63 assert(!preempt_by_prio
.empty());
64 auto q
= in_progress
.find(preempt_by_prio
.begin()->second
);
65 assert(q
!= in_progress
.end());
66 Reservation victim
= q
->second
;
67 rdout(10) << __func__
<< " preempt " << victim
<< dendl
;
68 f
->queue(victim
.preempt
);
69 victim
.preempt
= nullptr;
71 preempt_by_prio
.erase(preempt_by_prio
.begin());
75 rdout(20) << __func__
<< ":\n";
76 JSONFormatter
jf(true);
77 jf
.open_object_section("queue");
83 // in case min_priority was adjusted up or max_allowed was adjusted down
84 while (!preempt_by_prio
.empty() &&
85 (in_progress
.size() > max_allowed
||
86 preempt_by_prio
.begin()->first
< min_priority
)) {
90 while (!queues
.empty()) {
91 // choose highest priority queue
92 auto it
= queues
.end();
94 assert(!it
->second
.empty());
95 if (it
->first
< min_priority
) {
98 if (in_progress
.size() >= max_allowed
&&
99 !preempt_by_prio
.empty() &&
100 it
->first
> preempt_by_prio
.begin()->first
) {
103 if (in_progress
.size() >= max_allowed
) {
107 Reservation p
= it
->second
.front();
108 rdout(10) << __func__
<< " grant " << p
<< dendl
;
109 queue_pointers
.erase(p
.item
);
110 it
->second
.pop_front();
111 if (it
->second
.empty()) {
116 in_progress
[p
.item
] = p
;
118 preempt_by_prio
.insert(make_pair(p
.prio
, p
.item
));
126 unsigned max_allowed
,
127 unsigned min_priority
= 0)
130 max_allowed(max_allowed
),
131 min_priority(min_priority
),
132 lock("AsyncReserver::lock") {}
134 void set_max(unsigned max
) {
135 Mutex::Locker
l(lock
);
140 void set_min_priority(unsigned min
) {
141 Mutex::Locker
l(lock
);
146 void dump(Formatter
*f
) {
147 Mutex::Locker
l(lock
);
150 void _dump(Formatter
*f
) {
151 f
->dump_unsigned("max_allowed", max_allowed
);
152 f
->dump_unsigned("min_priority", min_priority
);
153 f
->open_array_section("queues");
154 for (auto& p
: queues
) {
155 f
->open_object_section("queue");
156 f
->dump_unsigned("priority", p
.first
);
157 f
->open_array_section("items");
158 for (auto& q
: p
.second
) {
159 f
->dump_object("item", q
);
165 f
->open_array_section("in_progress");
166 for (auto& p
: in_progress
) {
167 f
->dump_object("item", p
.second
);
173 * Requests a reservation
175 * Note, on_reserved may be called following cancel_reservation. Thus,
176 * the callback must be safe in that case. Callback will be called
177 * with no locks held. cancel_reservation must be called to release the
180 void request_reservation(
181 T item
, ///< [in] reservation key
182 Context
*on_reserved
, ///< [in] callback to be called on reservation
183 unsigned prio
, ///< [in] priority
184 Context
*on_preempt
= 0 ///< [in] callback to be called if we are preempted (optional)
186 Mutex::Locker
l(lock
);
187 Reservation
r(item
, prio
, on_reserved
, on_preempt
);
188 rdout(10) << __func__
<< " queue " << r
<< dendl
;
189 assert(!queue_pointers
.count(item
) &&
190 !in_progress
.count(item
));
191 queues
[prio
].push_back(r
);
192 queue_pointers
.insert(make_pair(item
,
193 make_pair(prio
,--(queues
[prio
]).end())));
198 * Cancels reservation
200 * Frees the reservation under key for use.
201 * Note, after cancel_reservation, the reservation_callback may or
202 * may not still be called.
204 void cancel_reservation(
205 T item
///< [in] key for reservation to cancel
207 Mutex::Locker
l(lock
);
208 auto i
= queue_pointers
.find(item
);
209 if (i
!= queue_pointers
.end()) {
210 unsigned prio
= i
->second
.first
;
211 const Reservation
& r
= *i
->second
.second
;
212 rdout(10) << __func__
<< " cancel " << r
<< " (was queued)" << dendl
;
215 queues
[prio
].erase(i
->second
.second
);
216 if (queues
[prio
].empty()) {
219 queue_pointers
.erase(i
);
221 auto p
= in_progress
.find(item
);
222 if (p
!= in_progress
.end()) {
223 rdout(10) << __func__
<< " cancel " << p
->second
224 << " (was in progress)" << dendl
;
225 if (p
->second
.preempt
) {
226 preempt_by_prio
.erase(make_pair(p
->second
.prio
, p
->second
.item
));
227 delete p
->second
.preempt
;
229 in_progress
.erase(p
);
231 rdout(10) << __func__
<< " cancel " << item
<< " (not found)" << dendl
;
240 * Return true if there are reservations in progress
242 bool has_reservation() {
243 Mutex::Locker
l(lock
);
244 return !in_progress
.empty();
246 static const unsigned MAX_PRIORITY
= (unsigned)-1;