]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef ASYNC_RESERVER_H | |
16 | #define ASYNC_RESERVER_H | |
17 | ||
#include <list>
#include <map>
#include <set>
#include <utility>

#include "common/Mutex.h"
#include "common/Finisher.h"
#include "common/Formatter.h"
25 | ||
26 | /** | |
27 | * Manages a configurable number of asyncronous reservations. | |
28 | * | |
29 | * Memory usage is linear with the number of items queued and | |
30 | * linear with respect to the total number of priorities used | |
31 | * over all time. | |
32 | */ | |
33 | template <typename T> | |
34 | class AsyncReserver { | |
35 | Finisher *f; | |
36 | unsigned max_allowed; | |
37 | unsigned min_priority; | |
38 | Mutex lock; | |
39 | ||
40 | map<unsigned, list<pair<T, Context*> > > queues; | |
41 | map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers; | |
42 | set<T> in_progress; | |
43 | ||
44 | void do_queues() { | |
45 | typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it; | |
46 | for (it = queues.rbegin(); | |
47 | it != queues.rend() && | |
48 | in_progress.size() < max_allowed && | |
49 | it->first >= min_priority; | |
50 | ++it) { | |
51 | while (in_progress.size() < max_allowed && | |
52 | !it->second.empty()) { | |
53 | pair<T, Context*> p = it->second.front(); | |
54 | queue_pointers.erase(p.first); | |
55 | it->second.pop_front(); | |
56 | f->queue(p.second); | |
57 | in_progress.insert(p.first); | |
58 | } | |
59 | } | |
60 | } | |
61 | public: | |
62 | AsyncReserver( | |
63 | Finisher *f, | |
64 | unsigned max_allowed, | |
65 | unsigned min_priority = 0) | |
66 | : f(f), | |
67 | max_allowed(max_allowed), | |
68 | min_priority(min_priority), | |
69 | lock("AsyncReserver::lock") {} | |
70 | ||
71 | void set_max(unsigned max) { | |
72 | Mutex::Locker l(lock); | |
73 | max_allowed = max; | |
74 | do_queues(); | |
75 | } | |
76 | ||
77 | void set_min_priority(unsigned min) { | |
78 | Mutex::Locker l(lock); | |
79 | min_priority = min; | |
80 | do_queues(); | |
81 | } | |
82 | ||
83 | void dump(Formatter *f) { | |
84 | Mutex::Locker l(lock); | |
85 | f->dump_unsigned("max_allowed", max_allowed); | |
86 | f->dump_unsigned("min_priority", min_priority); | |
87 | f->open_array_section("queues"); | |
88 | for (typename map<unsigned, list<pair<T, Context*> > > ::const_iterator p = | |
89 | queues.begin(); p != queues.end(); ++p) { | |
90 | f->open_object_section("queue"); | |
91 | f->dump_unsigned("priority", p->first); | |
92 | f->open_array_section("items"); | |
93 | for (typename list<pair<T, Context*> >::const_iterator q = | |
94 | p->second.begin(); q != p->second.end(); ++q) { | |
95 | f->dump_stream("item") << q->first; | |
96 | } | |
97 | f->close_section(); | |
98 | f->close_section(); | |
99 | } | |
100 | f->close_section(); | |
101 | f->open_array_section("in_progress"); | |
102 | for (typename set<T>::const_iterator p = in_progress.begin(); | |
103 | p != in_progress.end(); | |
104 | ++p) { | |
105 | f->dump_stream("item") << *p; | |
106 | } | |
107 | f->close_section(); | |
108 | } | |
109 | ||
110 | /** | |
111 | * Requests a reservation | |
112 | * | |
113 | * Note, on_reserved may be called following cancel_reservation. Thus, | |
114 | * the callback must be safe in that case. Callback will be called | |
115 | * with no locks held. cancel_reservation must be called to release the | |
116 | * reservation slot. | |
117 | */ | |
118 | void request_reservation( | |
119 | T item, ///< [in] reservation key | |
120 | Context *on_reserved, ///< [in] callback to be called on reservation | |
121 | unsigned prio | |
122 | ) { | |
123 | Mutex::Locker l(lock); | |
124 | assert(!queue_pointers.count(item) && | |
125 | !in_progress.count(item)); | |
126 | queues[prio].push_back(make_pair(item, on_reserved)); | |
127 | queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end()))); | |
128 | do_queues(); | |
129 | } | |
130 | ||
131 | /** | |
132 | * Cancels reservation | |
133 | * | |
134 | * Frees the reservation under key for use. | |
135 | * Note, after cancel_reservation, the reservation_callback may or | |
136 | * may not still be called. | |
137 | */ | |
138 | void cancel_reservation( | |
139 | T item ///< [in] key for reservation to cancel | |
140 | ) { | |
141 | Mutex::Locker l(lock); | |
142 | if (queue_pointers.count(item)) { | |
143 | unsigned prio = queue_pointers[item].first; | |
144 | delete queue_pointers[item].second->second; | |
145 | queues[prio].erase(queue_pointers[item].second); | |
146 | queue_pointers.erase(item); | |
147 | } else { | |
148 | in_progress.erase(item); | |
149 | } | |
150 | do_queues(); | |
151 | } | |
152 | static const unsigned MAX_PRIORITY = (unsigned)-1; | |
153 | }; | |
154 | ||
155 | #endif |