]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/common/AsyncReserver.h
update sources to 12.2.2
[ceph.git] / ceph / src / common / AsyncReserver.h
index 28512ac800e374804349dddb326ea5bb881586a7..d5c7a852ddf145f682d92c1ce0a442264e615563 100644 (file)
@@ -18,6 +18,8 @@
 #include "common/Finisher.h"
 #include "common/Formatter.h"
 
+#define rdout(x) lgeneric_subdout(cct,reserver,x)
+
 /**
  * Manages a configurable number of asyncronous reservations.
  *
  */
 template <typename T>
 class AsyncReserver {
+  CephContext *cct;
   Finisher *f;
   unsigned max_allowed;
   unsigned min_priority;
   Mutex lock;
 
-  map<unsigned, list<pair<T, Context*> > > queues;
-  map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers;
-  set<T> in_progress;
+  struct Reservation {
+    T item;
+    unsigned prio = 0;
+    Context *grant = 0;
+    Context *preempt = 0;
+    Reservation() {}
+    Reservation(T i, unsigned pr, Context *g, Context *p = 0)
+      : item(i), prio(pr), grant(g), preempt(p) {}
+    void dump(Formatter *f) const {
+      f->dump_stream("item") << item;
+      f->dump_unsigned("prio", prio);
+      f->dump_bool("can_preempt", !!preempt);
+    }
+    friend ostream& operator<<(ostream& out, const Reservation& r) {
+      return out << r.item << "(prio " << r.prio << " grant " << r.grant
+                << " preempt " << r.preempt << ")";
+    }
+  };
+
+  map<unsigned, list<Reservation>> queues;
+  map<T, pair<unsigned, typename list<Reservation>::iterator>> queue_pointers;
+  map<T,Reservation> in_progress;
+  set<pair<unsigned,T>> preempt_by_prio;  ///< in_progress that can be preempted
+
+  void preempt_one() {
+    assert(!preempt_by_prio.empty());
+    auto q = in_progress.find(preempt_by_prio.begin()->second);
+    assert(q != in_progress.end());
+    Reservation victim = q->second;
+    rdout(10) << __func__ << " preempt " << victim << dendl;
+    f->queue(victim.preempt);
+    victim.preempt = nullptr;
+    in_progress.erase(q);
+    preempt_by_prio.erase(preempt_by_prio.begin());
+  }
 
   void do_queues() {
-    typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it;
-    for (it = queues.rbegin();
-         it != queues.rend() &&
-          in_progress.size() < max_allowed &&
-          it->first >= min_priority;
-         ++it) {
-      while (in_progress.size() < max_allowed &&
-             !it->second.empty()) {
-        pair<T, Context*> p = it->second.front();
-        queue_pointers.erase(p.first);
-        it->second.pop_front();
-        f->queue(p.second);
-        in_progress.insert(p.first);
+    rdout(20) << __func__ << ":\n";
+    JSONFormatter jf(true);
+    jf.open_object_section("queue");
+    _dump(&jf);
+    jf.close_section();
+    jf.flush(*_dout);
+    *_dout << dendl;
+
+    // in case min_priority was adjusted up or max_allowed was adjusted down
+    while (!preempt_by_prio.empty() &&
+          (in_progress.size() > max_allowed ||
+           preempt_by_prio.begin()->first < min_priority)) {
+      preempt_one();
+    }
+
+    while (!queues.empty()) {
+      // choose highest priority queue
+      auto it = queues.end();
+      --it;
+      assert(!it->second.empty());
+      if (it->first < min_priority) {
+       break;
+      }
+      if (in_progress.size() >= max_allowed &&
+         !preempt_by_prio.empty() &&
+         it->first > preempt_by_prio.begin()->first) {
+       preempt_one();
+      }
+      if (in_progress.size() >= max_allowed) {
+       break; // no room
+      }
+      // grant
+      Reservation p = it->second.front();
+      rdout(10) << __func__ << " grant " << p << dendl;
+      queue_pointers.erase(p.item);
+      it->second.pop_front();
+      if (it->second.empty()) {
+       queues.erase(it);
+      }
+      f->queue(p.grant);
+      p.grant = nullptr;
+      in_progress[p.item] = p;
+      if (p.preempt) {
+       preempt_by_prio.insert(make_pair(p.prio, p.item));
       }
     }
   }
 public:
   AsyncReserver(
+    CephContext *cct,
     Finisher *f,
     unsigned max_allowed,
     unsigned min_priority = 0)
-    : f(f),
+    : cct(cct),
+      f(f),
       max_allowed(max_allowed),
       min_priority(min_priority),
       lock("AsyncReserver::lock") {}
@@ -77,27 +145,26 @@ public:
 
   void dump(Formatter *f) {
     Mutex::Locker l(lock);
+    _dump(f);
+  }
+  void _dump(Formatter *f) {
     f->dump_unsigned("max_allowed", max_allowed);
     f->dump_unsigned("min_priority", min_priority);
     f->open_array_section("queues");
-    for (typename map<unsigned, list<pair<T, Context*> > > ::const_iterator p =
-          queues.begin(); p != queues.end(); ++p) {
+    for (auto& p : queues) {
       f->open_object_section("queue");
-      f->dump_unsigned("priority", p->first);
+      f->dump_unsigned("priority", p.first);
       f->open_array_section("items");
-      for (typename list<pair<T, Context*> >::const_iterator q =
-            p->second.begin(); q != p->second.end(); ++q) {
-       f->dump_stream("item") << q->first;
+      for (auto& q : p.second) {
+       f->dump_object("item", q);
       }
       f->close_section();
       f->close_section();
     }
     f->close_section();
     f->open_array_section("in_progress");
-    for (typename set<T>::const_iterator p = in_progress.begin();
-        p != in_progress.end();
-        ++p) {
-      f->dump_stream("item") << *p;
+    for (auto& p : in_progress) {
+      f->dump_object("item", p.second);
     }
     f->close_section();
   }
@@ -113,13 +180,17 @@ public:
   void request_reservation(
     T item,                   ///< [in] reservation key
     Context *on_reserved,     ///< [in] callback to be called on reservation
-    unsigned prio
+    unsigned prio,            ///< [in] priority
+    Context *on_preempt = 0   ///< [in] callback to be called if we are preempted (optional)
     ) {
     Mutex::Locker l(lock);
+    Reservation r(item, prio, on_reserved, on_preempt);
+    rdout(10) << __func__ << " queue " << r << dendl;
     assert(!queue_pointers.count(item) &&
           !in_progress.count(item));
-    queues[prio].push_back(make_pair(item, on_reserved));
-    queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end())));
+    queues[prio].push_back(r);
+    queue_pointers.insert(make_pair(item,
+                                   make_pair(prio,--(queues[prio]).end())));
     do_queues();
   }
 
@@ -134,13 +205,31 @@ public:
     T item                   ///< [in] key for reservation to cancel
     ) {
     Mutex::Locker l(lock);
-    if (queue_pointers.count(item)) {
-      unsigned prio = queue_pointers[item].first;
-      delete queue_pointers[item].second->second;
-      queues[prio].erase(queue_pointers[item].second);
-      queue_pointers.erase(item);
+    auto i = queue_pointers.find(item);
+    if (i != queue_pointers.end()) {
+      unsigned prio = i->second.first;
+      const Reservation& r = *i->second.second;
+      rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl;
+      delete r.grant;
+      delete r.preempt;
+      queues[prio].erase(i->second.second);
+      if (queues[prio].empty()) {
+       queues.erase(prio);
+      }
+      queue_pointers.erase(i);
     } else {
-      in_progress.erase(item);
+      auto p = in_progress.find(item);
+      if (p != in_progress.end()) {
+       rdout(10) << __func__ << " cancel " << p->second
+                 << " (was in progress)" << dendl;
+       if (p->second.preempt) {
+         preempt_by_prio.erase(make_pair(p->second.prio, p->second.item));
+         delete p->second.preempt;
+       }
+       in_progress.erase(p);
+      } else {
+       rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl;
+      }
     }
     do_queues();
   }
@@ -157,4 +246,5 @@ public:
   static const unsigned MAX_PRIORITY = (unsigned)-1;
 };
 
+#undef rdout
 #endif