]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/AsyncReserver.h
update sources to 12.2.2
[ceph.git] / ceph / src / common / AsyncReserver.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
17
18 #include "common/Finisher.h"
19 #include "common/Formatter.h"
20
21 #define rdout(x) lgeneric_subdout(cct,reserver,x)
22
23 /**
24 * Manages a configurable number of asyncronous reservations.
25 *
26 * Memory usage is linear with the number of items queued and
27 * linear with respect to the total number of priorities used
28 * over all time.
29 */
30 template <typename T>
31 class AsyncReserver {
32 CephContext *cct;
33 Finisher *f;
34 unsigned max_allowed;
35 unsigned min_priority;
36 Mutex lock;
37
38 struct Reservation {
39 T item;
40 unsigned prio = 0;
41 Context *grant = 0;
42 Context *preempt = 0;
43 Reservation() {}
44 Reservation(T i, unsigned pr, Context *g, Context *p = 0)
45 : item(i), prio(pr), grant(g), preempt(p) {}
46 void dump(Formatter *f) const {
47 f->dump_stream("item") << item;
48 f->dump_unsigned("prio", prio);
49 f->dump_bool("can_preempt", !!preempt);
50 }
51 friend ostream& operator<<(ostream& out, const Reservation& r) {
52 return out << r.item << "(prio " << r.prio << " grant " << r.grant
53 << " preempt " << r.preempt << ")";
54 }
55 };
56
57 map<unsigned, list<Reservation>> queues;
58 map<T, pair<unsigned, typename list<Reservation>::iterator>> queue_pointers;
59 map<T,Reservation> in_progress;
60 set<pair<unsigned,T>> preempt_by_prio; ///< in_progress that can be preempted
61
62 void preempt_one() {
63 assert(!preempt_by_prio.empty());
64 auto q = in_progress.find(preempt_by_prio.begin()->second);
65 assert(q != in_progress.end());
66 Reservation victim = q->second;
67 rdout(10) << __func__ << " preempt " << victim << dendl;
68 f->queue(victim.preempt);
69 victim.preempt = nullptr;
70 in_progress.erase(q);
71 preempt_by_prio.erase(preempt_by_prio.begin());
72 }
73
74 void do_queues() {
75 rdout(20) << __func__ << ":\n";
76 JSONFormatter jf(true);
77 jf.open_object_section("queue");
78 _dump(&jf);
79 jf.close_section();
80 jf.flush(*_dout);
81 *_dout << dendl;
82
83 // in case min_priority was adjusted up or max_allowed was adjusted down
84 while (!preempt_by_prio.empty() &&
85 (in_progress.size() > max_allowed ||
86 preempt_by_prio.begin()->first < min_priority)) {
87 preempt_one();
88 }
89
90 while (!queues.empty()) {
91 // choose highest priority queue
92 auto it = queues.end();
93 --it;
94 assert(!it->second.empty());
95 if (it->first < min_priority) {
96 break;
97 }
98 if (in_progress.size() >= max_allowed &&
99 !preempt_by_prio.empty() &&
100 it->first > preempt_by_prio.begin()->first) {
101 preempt_one();
102 }
103 if (in_progress.size() >= max_allowed) {
104 break; // no room
105 }
106 // grant
107 Reservation p = it->second.front();
108 rdout(10) << __func__ << " grant " << p << dendl;
109 queue_pointers.erase(p.item);
110 it->second.pop_front();
111 if (it->second.empty()) {
112 queues.erase(it);
113 }
114 f->queue(p.grant);
115 p.grant = nullptr;
116 in_progress[p.item] = p;
117 if (p.preempt) {
118 preempt_by_prio.insert(make_pair(p.prio, p.item));
119 }
120 }
121 }
122 public:
123 AsyncReserver(
124 CephContext *cct,
125 Finisher *f,
126 unsigned max_allowed,
127 unsigned min_priority = 0)
128 : cct(cct),
129 f(f),
130 max_allowed(max_allowed),
131 min_priority(min_priority),
132 lock("AsyncReserver::lock") {}
133
134 void set_max(unsigned max) {
135 Mutex::Locker l(lock);
136 max_allowed = max;
137 do_queues();
138 }
139
140 void set_min_priority(unsigned min) {
141 Mutex::Locker l(lock);
142 min_priority = min;
143 do_queues();
144 }
145
146 void dump(Formatter *f) {
147 Mutex::Locker l(lock);
148 _dump(f);
149 }
150 void _dump(Formatter *f) {
151 f->dump_unsigned("max_allowed", max_allowed);
152 f->dump_unsigned("min_priority", min_priority);
153 f->open_array_section("queues");
154 for (auto& p : queues) {
155 f->open_object_section("queue");
156 f->dump_unsigned("priority", p.first);
157 f->open_array_section("items");
158 for (auto& q : p.second) {
159 f->dump_object("item", q);
160 }
161 f->close_section();
162 f->close_section();
163 }
164 f->close_section();
165 f->open_array_section("in_progress");
166 for (auto& p : in_progress) {
167 f->dump_object("item", p.second);
168 }
169 f->close_section();
170 }
171
172 /**
173 * Requests a reservation
174 *
175 * Note, on_reserved may be called following cancel_reservation. Thus,
176 * the callback must be safe in that case. Callback will be called
177 * with no locks held. cancel_reservation must be called to release the
178 * reservation slot.
179 */
180 void request_reservation(
181 T item, ///< [in] reservation key
182 Context *on_reserved, ///< [in] callback to be called on reservation
183 unsigned prio, ///< [in] priority
184 Context *on_preempt = 0 ///< [in] callback to be called if we are preempted (optional)
185 ) {
186 Mutex::Locker l(lock);
187 Reservation r(item, prio, on_reserved, on_preempt);
188 rdout(10) << __func__ << " queue " << r << dendl;
189 assert(!queue_pointers.count(item) &&
190 !in_progress.count(item));
191 queues[prio].push_back(r);
192 queue_pointers.insert(make_pair(item,
193 make_pair(prio,--(queues[prio]).end())));
194 do_queues();
195 }
196
197 /**
198 * Cancels reservation
199 *
200 * Frees the reservation under key for use.
201 * Note, after cancel_reservation, the reservation_callback may or
202 * may not still be called.
203 */
204 void cancel_reservation(
205 T item ///< [in] key for reservation to cancel
206 ) {
207 Mutex::Locker l(lock);
208 auto i = queue_pointers.find(item);
209 if (i != queue_pointers.end()) {
210 unsigned prio = i->second.first;
211 const Reservation& r = *i->second.second;
212 rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl;
213 delete r.grant;
214 delete r.preempt;
215 queues[prio].erase(i->second.second);
216 if (queues[prio].empty()) {
217 queues.erase(prio);
218 }
219 queue_pointers.erase(i);
220 } else {
221 auto p = in_progress.find(item);
222 if (p != in_progress.end()) {
223 rdout(10) << __func__ << " cancel " << p->second
224 << " (was in progress)" << dendl;
225 if (p->second.preempt) {
226 preempt_by_prio.erase(make_pair(p->second.prio, p->second.item));
227 delete p->second.preempt;
228 }
229 in_progress.erase(p);
230 } else {
231 rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl;
232 }
233 }
234 do_queues();
235 }
236
237 /**
238 * Has reservations
239 *
240 * Return true if there are reservations in progress
241 */
242 bool has_reservation() {
243 Mutex::Locker l(lock);
244 return !in_progress.empty();
245 }
246 static const unsigned MAX_PRIORITY = (unsigned)-1;
247 };
248
249 #undef rdout
250 #endif