]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef ASYNC_RESERVER_H | |
16 | #define ASYNC_RESERVER_H | |
17 | ||
7c673cae FG |
18 | #include "common/Finisher.h" |
19 | #include "common/Formatter.h" | |
20 | ||
21 | /** | |
22 | * Manages a configurable number of asynchronous reservations. | |
23 | * | |
24 | * Memory usage is linear with the number of items queued and | |
25 | * linear with respect to the total number of priorities used | |
26 | * over all time. | |
27 | */ | |
28 | template <typename T> | |
29 | class AsyncReserver { | |
30 | Finisher *f; | |
31 | unsigned max_allowed; | |
32 | unsigned min_priority; | |
33 | Mutex lock; | |
34 | ||
35 | map<unsigned, list<pair<T, Context*> > > queues; | |
36 | map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers; | |
37 | set<T> in_progress; | |
38 | ||
39 | void do_queues() { | |
40 | typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it; | |
41 | for (it = queues.rbegin(); | |
42 | it != queues.rend() && | |
43 | in_progress.size() < max_allowed && | |
44 | it->first >= min_priority; | |
45 | ++it) { | |
46 | while (in_progress.size() < max_allowed && | |
47 | !it->second.empty()) { | |
48 | pair<T, Context*> p = it->second.front(); | |
49 | queue_pointers.erase(p.first); | |
50 | it->second.pop_front(); | |
51 | f->queue(p.second); | |
52 | in_progress.insert(p.first); | |
53 | } | |
54 | } | |
55 | } | |
56 | public: | |
57 | AsyncReserver( | |
58 | Finisher *f, | |
59 | unsigned max_allowed, | |
60 | unsigned min_priority = 0) | |
61 | : f(f), | |
62 | max_allowed(max_allowed), | |
63 | min_priority(min_priority), | |
64 | lock("AsyncReserver::lock") {} | |
65 | ||
66 | void set_max(unsigned max) { | |
67 | Mutex::Locker l(lock); | |
68 | max_allowed = max; | |
69 | do_queues(); | |
70 | } | |
71 | ||
72 | void set_min_priority(unsigned min) { | |
73 | Mutex::Locker l(lock); | |
74 | min_priority = min; | |
75 | do_queues(); | |
76 | } | |
77 | ||
78 | void dump(Formatter *f) { | |
79 | Mutex::Locker l(lock); | |
80 | f->dump_unsigned("max_allowed", max_allowed); | |
81 | f->dump_unsigned("min_priority", min_priority); | |
82 | f->open_array_section("queues"); | |
83 | for (typename map<unsigned, list<pair<T, Context*> > > ::const_iterator p = | |
84 | queues.begin(); p != queues.end(); ++p) { | |
85 | f->open_object_section("queue"); | |
86 | f->dump_unsigned("priority", p->first); | |
87 | f->open_array_section("items"); | |
88 | for (typename list<pair<T, Context*> >::const_iterator q = | |
89 | p->second.begin(); q != p->second.end(); ++q) { | |
90 | f->dump_stream("item") << q->first; | |
91 | } | |
92 | f->close_section(); | |
93 | f->close_section(); | |
94 | } | |
95 | f->close_section(); | |
96 | f->open_array_section("in_progress"); | |
97 | for (typename set<T>::const_iterator p = in_progress.begin(); | |
98 | p != in_progress.end(); | |
99 | ++p) { | |
100 | f->dump_stream("item") << *p; | |
101 | } | |
102 | f->close_section(); | |
103 | } | |
104 | ||
105 | /** | |
106 | * Requests a reservation | |
107 | * | |
108 | * Note, on_reserved may be called following cancel_reservation. Thus, | |
109 | * the callback must be safe in that case. Callback will be called | |
110 | * with no locks held. cancel_reservation must be called to release the | |
111 | * reservation slot. | |
112 | */ | |
113 | void request_reservation( | |
114 | T item, ///< [in] reservation key | |
115 | Context *on_reserved, ///< [in] callback to be called on reservation | |
116 | unsigned prio | |
117 | ) { | |
118 | Mutex::Locker l(lock); | |
119 | assert(!queue_pointers.count(item) && | |
120 | !in_progress.count(item)); | |
121 | queues[prio].push_back(make_pair(item, on_reserved)); | |
122 | queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end()))); | |
123 | do_queues(); | |
124 | } | |
125 | ||
126 | /** | |
127 | * Cancels reservation | |
128 | * | |
129 | * Frees the reservation under key for use. | |
130 | * Note, after cancel_reservation, the reservation_callback may or | |
131 | * may not still be called. | |
132 | */ | |
133 | void cancel_reservation( | |
134 | T item ///< [in] key for reservation to cancel | |
135 | ) { | |
136 | Mutex::Locker l(lock); | |
137 | if (queue_pointers.count(item)) { | |
138 | unsigned prio = queue_pointers[item].first; | |
139 | delete queue_pointers[item].second->second; | |
140 | queues[prio].erase(queue_pointers[item].second); | |
141 | queue_pointers.erase(item); | |
142 | } else { | |
143 | in_progress.erase(item); | |
144 | } | |
145 | do_queues(); | |
146 | } | |
147 | static const unsigned MAX_PRIORITY = (unsigned)-1; | |
148 | }; | |
149 | ||
150 | #endif |