]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/AsyncReserver.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / common / AsyncReserver.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
17
18 #include <map>
19 #include <utility>
20 #include <list>
21
22 #include "common/Mutex.h"
23 #include "common/Finisher.h"
24 #include "common/Formatter.h"
25
26 /**
27 * Manages a configurable number of asynchronous reservations.
28 *
29 * Memory usage is linear with the number of items queued and
30 * linear with respect to the total number of priorities used
31 * over all time.
32 */
33 template <typename T>
34 class AsyncReserver {
35 Finisher *f;
36 unsigned max_allowed;
37 unsigned min_priority;
38 Mutex lock;
39
40 map<unsigned, list<pair<T, Context*> > > queues;
41 map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers;
42 set<T> in_progress;
43
44 void do_queues() {
45 typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it;
46 for (it = queues.rbegin();
47 it != queues.rend() &&
48 in_progress.size() < max_allowed &&
49 it->first >= min_priority;
50 ++it) {
51 while (in_progress.size() < max_allowed &&
52 !it->second.empty()) {
53 pair<T, Context*> p = it->second.front();
54 queue_pointers.erase(p.first);
55 it->second.pop_front();
56 f->queue(p.second);
57 in_progress.insert(p.first);
58 }
59 }
60 }
61 public:
62 AsyncReserver(
63 Finisher *f,
64 unsigned max_allowed,
65 unsigned min_priority = 0)
66 : f(f),
67 max_allowed(max_allowed),
68 min_priority(min_priority),
69 lock("AsyncReserver::lock") {}
70
71 void set_max(unsigned max) {
72 Mutex::Locker l(lock);
73 max_allowed = max;
74 do_queues();
75 }
76
77 void set_min_priority(unsigned min) {
78 Mutex::Locker l(lock);
79 min_priority = min;
80 do_queues();
81 }
82
83 void dump(Formatter *f) {
84 Mutex::Locker l(lock);
85 f->dump_unsigned("max_allowed", max_allowed);
86 f->dump_unsigned("min_priority", min_priority);
87 f->open_array_section("queues");
88 for (typename map<unsigned, list<pair<T, Context*> > > ::const_iterator p =
89 queues.begin(); p != queues.end(); ++p) {
90 f->open_object_section("queue");
91 f->dump_unsigned("priority", p->first);
92 f->open_array_section("items");
93 for (typename list<pair<T, Context*> >::const_iterator q =
94 p->second.begin(); q != p->second.end(); ++q) {
95 f->dump_stream("item") << q->first;
96 }
97 f->close_section();
98 f->close_section();
99 }
100 f->close_section();
101 f->open_array_section("in_progress");
102 for (typename set<T>::const_iterator p = in_progress.begin();
103 p != in_progress.end();
104 ++p) {
105 f->dump_stream("item") << *p;
106 }
107 f->close_section();
108 }
109
110 /**
111 * Requests a reservation
112 *
113 * Note, on_reserved may be called following cancel_reservation. Thus,
114 * the callback must be safe in that case. Callback will be called
115 * with no locks held. cancel_reservation must be called to release the
116 * reservation slot.
117 */
118 void request_reservation(
119 T item, ///< [in] reservation key
120 Context *on_reserved, ///< [in] callback to be called on reservation
121 unsigned prio
122 ) {
123 Mutex::Locker l(lock);
124 assert(!queue_pointers.count(item) &&
125 !in_progress.count(item));
126 queues[prio].push_back(make_pair(item, on_reserved));
127 queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end())));
128 do_queues();
129 }
130
131 /**
132 * Cancels reservation
133 *
134 * Frees the reservation under key for use.
135 * Note, after cancel_reservation, the reservation_callback may or
136 * may not still be called.
137 */
138 void cancel_reservation(
139 T item ///< [in] key for reservation to cancel
140 ) {
141 Mutex::Locker l(lock);
142 if (queue_pointers.count(item)) {
143 unsigned prio = queue_pointers[item].first;
144 delete queue_pointers[item].second->second;
145 queues[prio].erase(queue_pointers[item].second);
146 queue_pointers.erase(item);
147 } else {
148 in_progress.erase(item);
149 }
150 do_queues();
151 }
152 static const unsigned MAX_PRIORITY = (unsigned)-1;
153 };
154
155 #endif