X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fcommon%2FAsyncReserver.h;h=d5c7a852ddf145f682d92c1ce0a442264e615563;hb=3efd99882e8c73385040d3f5c48fd014e4247be7;hp=28512ac800e374804349dddb326ea5bb881586a7;hpb=1a629971a9bcaaae99e5539a3a43f800a297f267;p=ceph.git diff --git a/ceph/src/common/AsyncReserver.h b/ceph/src/common/AsyncReserver.h index 28512ac80..d5c7a852d 100644 --- a/ceph/src/common/AsyncReserver.h +++ b/ceph/src/common/AsyncReserver.h @@ -18,6 +18,8 @@ #include "common/Finisher.h" #include "common/Formatter.h" +#define rdout(x) lgeneric_subdout(cct,reserver,x) + /** * Manages a configurable number of asyncronous reservations. * @@ -27,38 +29,104 @@ */ template class AsyncReserver { + CephContext *cct; Finisher *f; unsigned max_allowed; unsigned min_priority; Mutex lock; - map > > queues; - map >::iterator > > queue_pointers; - set in_progress; + struct Reservation { + T item; + unsigned prio = 0; + Context *grant = 0; + Context *preempt = 0; + Reservation() {} + Reservation(T i, unsigned pr, Context *g, Context *p = 0) + : item(i), prio(pr), grant(g), preempt(p) {} + void dump(Formatter *f) const { + f->dump_stream("item") << item; + f->dump_unsigned("prio", prio); + f->dump_bool("can_preempt", !!preempt); + } + friend ostream& operator<<(ostream& out, const Reservation& r) { + return out << r.item << "(prio " << r.prio << " grant " << r.grant + << " preempt " << r.preempt << ")"; + } + }; + + map> queues; + map::iterator>> queue_pointers; + map in_progress; + set> preempt_by_prio; ///< in_progress that can be preempted + + void preempt_one() { + assert(!preempt_by_prio.empty()); + auto q = in_progress.find(preempt_by_prio.begin()->second); + assert(q != in_progress.end()); + Reservation victim = q->second; + rdout(10) << __func__ << " preempt " << victim << dendl; + f->queue(victim.preempt); + victim.preempt = nullptr; + in_progress.erase(q); + preempt_by_prio.erase(preempt_by_prio.begin()); + } void do_queues() { - typename map > >::reverse_iterator it; - for (it = queues.rbegin(); - it != queues.rend() && - in_progress.size() < max_allowed && - it->first >= min_priority; - ++it) { - while (in_progress.size() < max_allowed && - !it->second.empty()) { - pair p = it->second.front(); - queue_pointers.erase(p.first); - it->second.pop_front(); - f->queue(p.second); - in_progress.insert(p.first); + rdout(20) << __func__ << ":\n"; + JSONFormatter jf(true); + jf.open_object_section("queue"); + _dump(&jf); + jf.close_section(); + jf.flush(*_dout); + *_dout << dendl; + + // in case min_priority was adjusted up or max_allowed was adjusted down + while (!preempt_by_prio.empty() && + (in_progress.size() > max_allowed || + preempt_by_prio.begin()->first < min_priority)) { + preempt_one(); + } + + while (!queues.empty()) { + // choose highest priority queue + auto it = queues.end(); + --it; + assert(!it->second.empty()); + if (it->first < min_priority) { + break; + } + if (in_progress.size() >= max_allowed && + !preempt_by_prio.empty() && + it->first > preempt_by_prio.begin()->first) { + preempt_one(); + } + if (in_progress.size() >= max_allowed) { + break; // no room + } + // grant + Reservation p = it->second.front(); + rdout(10) << __func__ << " grant " << p << dendl; + queue_pointers.erase(p.item); + it->second.pop_front(); + if (it->second.empty()) { + queues.erase(it); + } + f->queue(p.grant); + p.grant = nullptr; + in_progress[p.item] = p; + if (p.preempt) { + preempt_by_prio.insert(make_pair(p.prio, p.item)); } } } public: AsyncReserver( + CephContext *cct, Finisher *f, unsigned max_allowed, unsigned min_priority = 0) - : f(f), + : cct(cct), + f(f), max_allowed(max_allowed), min_priority(min_priority), lock("AsyncReserver::lock") {} @@ -77,27 +145,26 @@ public: void dump(Formatter *f) { Mutex::Locker l(lock); + _dump(f); + } + void _dump(Formatter *f) { f->dump_unsigned("max_allowed", max_allowed); f->dump_unsigned("min_priority", min_priority); f->open_array_section("queues"); - for (typename map > > ::const_iterator p = - queues.begin(); p != queues.end(); ++p) { + for (auto& p : queues) { f->open_object_section("queue"); - f->dump_unsigned("priority", p->first); + f->dump_unsigned("priority", p.first); f->open_array_section("items"); - for (typename list >::const_iterator q = - p->second.begin(); q != p->second.end(); ++q) { - f->dump_stream("item") << q->first; + for (auto& q : p.second) { + f->dump_object("item", q); } f->close_section(); f->close_section(); } f->close_section(); f->open_array_section("in_progress"); - for (typename set::const_iterator p = in_progress.begin(); - p != in_progress.end(); - ++p) { - f->dump_stream("item") << *p; + for (auto& p : in_progress) { + f->dump_object("item", p.second); } f->close_section(); } @@ -113,13 +180,17 @@ public: void request_reservation( T item, ///< [in] reservation key Context *on_reserved, ///< [in] callback to be called on reservation - unsigned prio + unsigned prio, ///< [in] priority + Context *on_preempt = 0 ///< [in] callback to be called if we are preempted (optional) ) { Mutex::Locker l(lock); + Reservation r(item, prio, on_reserved, on_preempt); + rdout(10) << __func__ << " queue " << r << dendl; assert(!queue_pointers.count(item) && !in_progress.count(item)); - queues[prio].push_back(make_pair(item, on_reserved)); - queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end()))); + queues[prio].push_back(r); + queue_pointers.insert(make_pair(item, + make_pair(prio,--(queues[prio]).end()))); do_queues(); } @@ -134,13 +205,31 @@ public: T item ///< [in] key for reservation to cancel ) { Mutex::Locker l(lock); - if (queue_pointers.count(item)) { - unsigned prio = queue_pointers[item].first; - delete queue_pointers[item].second->second; - queues[prio].erase(queue_pointers[item].second); - queue_pointers.erase(item); + auto i = queue_pointers.find(item); + if (i != queue_pointers.end()) { + unsigned prio = i->second.first; + const Reservation& r = *i->second.second; + rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl; + delete r.grant; + delete r.preempt; + queues[prio].erase(i->second.second); + if (queues[prio].empty()) { + queues.erase(prio); + } + queue_pointers.erase(i); } else { - in_progress.erase(item); + auto p = in_progress.find(item); + if (p != in_progress.end()) { + rdout(10) << __func__ << " cancel " << p->second + << " (was in progress)" << dendl; + if (p->second.preempt) { + preempt_by_prio.erase(make_pair(p->second.prio, p->second.item)); + delete p->second.preempt; + } + in_progress.erase(p); + } else { + rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl; + } } do_queues(); } @@ -157,4 +246,5 @@ public: static const unsigned MAX_PRIORITY = (unsigned)-1; }; +#undef rdout #endif