]>
git.proxmox.com Git - ceph.git/blob - ceph/src/common/AsyncReserver.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
22 #include "common/Mutex.h"
23 #include "common/Finisher.h"
24 #include "common/Formatter.h"
27 * Manages a configurable number of asynchronous reservations.
29 * Memory usage is linear with the number of items queued and
30 * linear with respect to the total number of priorities used
// Lowest priority that is serviced: the grant loop walks the queues from
// highest priority downward and stops at the first queue whose priority
// is below this value.
37 unsigned min_priority
;
// Pending requests keyed by priority. Each list holds (item, callback)
// pairs; new requests are appended at the back and grants are taken
// from the front.
40 map
<unsigned, list
<pair
<T
, Context
*> > > queues
;
// Index from a queued item to (priority, iterator into queues[priority]).
// cancel_reservation uses the stored iterator to delete the callback and
// erase the pending entry directly, without scanning the list.
41 map
<T
, pair
<unsigned, typename list
<pair
<T
, Context
*> >::iterator
> > queue_pointers
;
45 typename map
<unsigned, list
<pair
<T
, Context
*> > >::reverse_iterator it
;
46 for (it
= queues
.rbegin();
47 it
!= queues
.rend() &&
48 in_progress
.size() < max_allowed
&&
49 it
->first
>= min_priority
;
51 while (in_progress
.size() < max_allowed
&&
52 !it
->second
.empty()) {
53 pair
<T
, Context
*> p
= it
->second
.front();
54 queue_pointers
.erase(p
.first
);
55 it
->second
.pop_front();
57 in_progress
.insert(p
.first
);
65 unsigned min_priority
= 0)
67 max_allowed(max_allowed
),
68 min_priority(min_priority
),
69 lock("AsyncReserver::lock") {}
71 void set_max(unsigned max
) {
72 Mutex::Locker
l(lock
);
77 void set_min_priority(unsigned min
) {
78 Mutex::Locker
l(lock
);
83 void dump(Formatter
*f
) {
84 Mutex::Locker
l(lock
);
85 f
->dump_unsigned("max_allowed", max_allowed
);
86 f
->dump_unsigned("min_priority", min_priority
);
87 f
->open_array_section("queues");
88 for (typename map
<unsigned, list
<pair
<T
, Context
*> > > ::const_iterator p
=
89 queues
.begin(); p
!= queues
.end(); ++p
) {
90 f
->open_object_section("queue");
91 f
->dump_unsigned("priority", p
->first
);
92 f
->open_array_section("items");
93 for (typename list
<pair
<T
, Context
*> >::const_iterator q
=
94 p
->second
.begin(); q
!= p
->second
.end(); ++q
) {
95 f
->dump_stream("item") << q
->first
;
101 f
->open_array_section("in_progress");
102 for (typename set
<T
>::const_iterator p
= in_progress
.begin();
103 p
!= in_progress
.end();
105 f
->dump_stream("item") << *p
;
111 * Requests a reservation
113 * Note, on_reserved may be called following cancel_reservation. Thus,
114 * the callback must be safe in that case. Callback will be called
115 * with no locks held. cancel_reservation must be called to release the reservation.
118 void request_reservation(
119 T item
, ///< [in] reservation key
120 Context
*on_reserved
, ///< [in] callback to be called on reservation
123 Mutex::Locker
l(lock
);
124 assert(!queue_pointers
.count(item
) &&
125 !in_progress
.count(item
));
126 queues
[prio
].push_back(make_pair(item
, on_reserved
));
127 queue_pointers
.insert(make_pair(item
, make_pair(prio
,--(queues
[prio
]).end())));
132 * Cancels reservation
134 * Frees the reservation under key for use.
135 * Note, after cancel_reservation, the on_reserved callback may or
136 * may not still be called.
138 void cancel_reservation(
139 T item
///< [in] key for reservation to cancel
141 Mutex::Locker
l(lock
);
142 if (queue_pointers
.count(item
)) {
143 unsigned prio
= queue_pointers
[item
].first
;
144 delete queue_pointers
[item
].second
->second
;
145 queues
[prio
].erase(queue_pointers
[item
].second
);
146 queue_pointers
.erase(item
);
148 in_progress
.erase(item
);
// Largest priority value: converting -1 to unsigned yields the maximum
// value representable by unsigned.
152 static const unsigned MAX_PRIORITY
= (unsigned)-1;