]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/AsyncReserver.h
update sources to v12.2.0
[ceph.git] / ceph / src / common / AsyncReserver.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
17
18 #include "common/Finisher.h"
19 #include "common/Formatter.h"
20
21 /**
22 * Manages a configurable number of asyncronous reservations.
23 *
24 * Memory usage is linear with the number of items queued and
25 * linear with respect to the total number of priorities used
26 * over all time.
27 */
28 template <typename T>
29 class AsyncReserver {
30 Finisher *f;
31 unsigned max_allowed;
32 unsigned min_priority;
33 Mutex lock;
34
35 map<unsigned, list<pair<T, Context*> > > queues;
36 map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers;
37 set<T> in_progress;
38
39 void do_queues() {
40 typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it;
41 for (it = queues.rbegin();
42 it != queues.rend() &&
43 in_progress.size() < max_allowed &&
44 it->first >= min_priority;
45 ++it) {
46 while (in_progress.size() < max_allowed &&
47 !it->second.empty()) {
48 pair<T, Context*> p = it->second.front();
49 queue_pointers.erase(p.first);
50 it->second.pop_front();
51 f->queue(p.second);
52 in_progress.insert(p.first);
53 }
54 }
55 }
56 public:
57 AsyncReserver(
58 Finisher *f,
59 unsigned max_allowed,
60 unsigned min_priority = 0)
61 : f(f),
62 max_allowed(max_allowed),
63 min_priority(min_priority),
64 lock("AsyncReserver::lock") {}
65
66 void set_max(unsigned max) {
67 Mutex::Locker l(lock);
68 max_allowed = max;
69 do_queues();
70 }
71
72 void set_min_priority(unsigned min) {
73 Mutex::Locker l(lock);
74 min_priority = min;
75 do_queues();
76 }
77
78 void dump(Formatter *f) {
79 Mutex::Locker l(lock);
80 f->dump_unsigned("max_allowed", max_allowed);
81 f->dump_unsigned("min_priority", min_priority);
82 f->open_array_section("queues");
83 for (typename map<unsigned, list<pair<T, Context*> > > ::const_iterator p =
84 queues.begin(); p != queues.end(); ++p) {
85 f->open_object_section("queue");
86 f->dump_unsigned("priority", p->first);
87 f->open_array_section("items");
88 for (typename list<pair<T, Context*> >::const_iterator q =
89 p->second.begin(); q != p->second.end(); ++q) {
90 f->dump_stream("item") << q->first;
91 }
92 f->close_section();
93 f->close_section();
94 }
95 f->close_section();
96 f->open_array_section("in_progress");
97 for (typename set<T>::const_iterator p = in_progress.begin();
98 p != in_progress.end();
99 ++p) {
100 f->dump_stream("item") << *p;
101 }
102 f->close_section();
103 }
104
105 /**
106 * Requests a reservation
107 *
108 * Note, on_reserved may be called following cancel_reservation. Thus,
109 * the callback must be safe in that case. Callback will be called
110 * with no locks held. cancel_reservation must be called to release the
111 * reservation slot.
112 */
113 void request_reservation(
114 T item, ///< [in] reservation key
115 Context *on_reserved, ///< [in] callback to be called on reservation
116 unsigned prio
117 ) {
118 Mutex::Locker l(lock);
119 assert(!queue_pointers.count(item) &&
120 !in_progress.count(item));
121 queues[prio].push_back(make_pair(item, on_reserved));
122 queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end())));
123 do_queues();
124 }
125
126 /**
127 * Cancels reservation
128 *
129 * Frees the reservation under key for use.
130 * Note, after cancel_reservation, the reservation_callback may or
131 * may not still be called.
132 */
133 void cancel_reservation(
134 T item ///< [in] key for reservation to cancel
135 ) {
136 Mutex::Locker l(lock);
137 if (queue_pointers.count(item)) {
138 unsigned prio = queue_pointers[item].first;
139 delete queue_pointers[item].second->second;
140 queues[prio].erase(queue_pointers[item].second);
141 queue_pointers.erase(item);
142 } else {
143 in_progress.erase(item);
144 }
145 do_queues();
146 }
147
148 /**
149 * Has reservations
150 *
151 * Return true if there are reservations in progress
152 */
153 bool has_reservation() {
154 Mutex::Locker l(lock);
155 return !in_progress.empty();
156 }
157 static const unsigned MAX_PRIORITY = (unsigned)-1;
158 };
159
160 #endif