]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/osd_operation.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / osd / osd_operation.h
index fe06023a4416b0155f16a437daa820ecf71cefec..5178749b0ddabb382e2a8c2d2733974f3a8f799e 100644 (file)
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 #include <seastar/core/shared_mutex.hh>
 #include <seastar/core/future.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/core/lowres_clock.hh>
 
 #include "include/ceph_assert.h"
+#include "crimson/osd/scheduler/scheduler.h"
 
 namespace ceph {
   class Formatter;
@@ -23,12 +26,14 @@ namespace crimson::osd {
 
 enum class OperationTypeCode {
   client_request = 0,
-  peering_event = 1,
-  compound_peering_request = 2,
-  pg_advance_map = 3,
-  pg_creation = 4,
-  replicated_request = 5,
-  last_op = 6
+  peering_event,
+  compound_peering_request,
+  pg_advance_map,
+  pg_creation,
+  replicated_request,
+  background_recovery,
+  background_recovery_sub,
+  last_op
 };
 
 static constexpr const char* const OP_NAMES[] = {
@@ -38,6 +43,8 @@ static constexpr const char* const OP_NAMES[] = {
   "pg_advance_map",
   "pg_creation",
   "replicated_request",
+  "background_recovery",
+  "background_recovery_sub",
 };
 
 // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
@@ -57,24 +64,53 @@ class Blocker;
  * Provides an abstraction for registering and unregistering a blocker
  * for the duration of a future becoming available.
  */
-template <typename... T>
-class blocking_future {
+template <typename Fut>
+class blocking_future_detail {
   friend class Operation;
   friend class Blocker;
   Blocker *blocker;
-  seastar::future<T...> fut;
-  blocking_future(Blocker *b, seastar::future<T...> &&f)
+  Fut fut;
+  blocking_future_detail(Blocker *b, Fut &&f)
     : blocker(b), fut(std::move(f)) {}
 
-  template <typename... V, typename... U>
-  friend blocking_future<V...> make_ready_blocking_future(U&&... args);
+  template <typename V, typename U>
+  friend blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args);
+  template <typename V, typename Exception>
+  friend blocking_future_detail<seastar::future<V>>
+  make_exception_blocking_future(Exception&& e);
+
+  template <typename U>
+  friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
+
+  template <typename U>
+  friend class blocking_future_detail;
+
+public:
+  template <typename F>
+  auto then(F &&f) && {
+    using result = decltype(std::declval<Fut>().then(f));
+    return blocking_future_detail<seastar::futurize_t<result>>(
+      blocker,
+      std::move(fut).then(std::forward<F>(f)));
+  }
 };
 
-template <typename... V, typename... U>
-blocking_future<V...> make_ready_blocking_future(U&&... args) {
-  return blocking_future<V...>(
+template <typename T=void>
+using blocking_future = blocking_future_detail<seastar::future<T>>;
+
+template <typename V, typename U>
+blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args) {
+  return blocking_future<V>(
     nullptr,
-    seastar::make_ready_future<V...>(std::forward<U>(args)...));
+    seastar::make_ready_future<V>(std::forward<U>(args)));
+}
+
+template <typename V, typename Exception>
+blocking_future_detail<seastar::future<V>>
+make_exception_blocking_future(Exception&& e) {
+  return blocking_future<V>(
+    nullptr,
+    seastar::make_exception_future<V>(e));
 }
 
 /**
@@ -82,32 +118,59 @@ blocking_future<V...> make_ready_blocking_future(U&&... args) {
  * why a particular op is not making progress.
  */
 class Blocker {
-protected:
-  virtual void dump_detail(ceph::Formatter *f) const = 0;
-
 public:
-  template <typename... T>
-  blocking_future<T...> make_blocking_future(seastar::future<T...> &&f) {
-    return blocking_future(this, std::move(f));
+  template <typename T>
+  blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
+    return blocking_future<T>(this, std::move(f));
   }
-
   void dump(ceph::Formatter *f) const;
+  virtual ~Blocker() = default;
 
+private:
+  virtual void dump_detail(ceph::Formatter *f) const = 0;
   virtual const char *get_type_name() const = 0;
-
-  virtual ~Blocker() = default;
 };
 
 template <typename T>
 class BlockerT : public Blocker {
 public:
+  virtual ~BlockerT() = default;
+private:
   const char *get_type_name() const final {
     return T::type_name;
   }
+};
 
-  virtual ~BlockerT() = default;
+class AggregateBlocker : public BlockerT<AggregateBlocker> {
+  vector<Blocker*> parent_blockers;
+public:
+  AggregateBlocker(vector<Blocker*> &&parent_blockers)
+    : parent_blockers(std::move(parent_blockers)) {}
+  static constexpr const char *type_name = "AggregateBlocker";
+private:
+  void dump_detail(ceph::Formatter *f) const final;
 };
 
+template <typename T>
+blocking_future<> join_blocking_futures(T &&t) {
+  vector<Blocker*> blockers;
+  blockers.reserve(t.size());
+  for (auto &&bf: t) {
+    blockers.push_back(bf.blocker);
+    bf.blocker = nullptr;
+  }
+  auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
+  return agg->make_blocking_future(
+    seastar::parallel_for_each(
+      std::forward<T>(t),
+      [](auto &&bf) {
+       return std::move(bf.fut);
+      }).then([agg=std::move(agg)] {
+       return seastar::make_ready_future<>();
+      }));
+}
+
+
 /**
  * Common base for all crimson-osd operations.  Mainly provides
  * an interface for registering ops in flight and dumping
@@ -124,8 +187,8 @@ class Operation : public boost::intrusive_ref_counter<
   virtual const char *get_type_name() const = 0;
   virtual void print(std::ostream &) const = 0;
 
-  template <typename... T>
-  seastar::future<T...> with_blocking_future(blocking_future<T...> &&f) {
+  template <typename T>
+  seastar::future<T> with_blocking_future(blocking_future<T> &&f) {
     if (f.fut.available()) {
       return std::move(f.fut);
     }
@@ -141,7 +204,7 @@ class Operation : public boost::intrusive_ref_counter<
   void dump_brief(ceph::Formatter *f);
   virtual ~Operation() = default;
 
- protected:
+ private:
   virtual void dump_detail(ceph::Formatter *f) const = 0;
 
  private:
@@ -172,10 +235,6 @@ std::ostream &operator<<(std::ostream &, const Operation &op);
 
 template <typename T>
 class OperationT : public Operation {
-
-protected:
-  virtual void dump_detail(ceph::Formatter *f) const = 0;
-
 public:
   static constexpr const char *type_name = OP_NAMES[static_cast<int>(T::type)];
   using IRef = boost::intrusive_ptr<T>;
@@ -189,6 +248,9 @@ public:
   }
 
   virtual ~OperationT() = default;
+
+private:
+  virtual void dump_detail(ceph::Formatter *f) const = 0;
 };
 
 /**
@@ -216,6 +278,8 @@ class OperationRegistry {
     static_cast<int>(OperationTypeCode::last_op)
   > op_id_counters = {};
 
+  seastar::timer<seastar::lowres_clock> shutdown_timer;
+  seastar::promise<> shutdown;
 public:
   template <typename T, typename... Args>
   typename T::IRef create_operation(Args&&... args) {
@@ -224,6 +288,88 @@ public:
     op->set_id(op_id_counters[static_cast<int>(T::type)]++);
     return op;
   }
+
+  seastar::future<> stop() {
+    shutdown_timer.set_callback([this] {
+       if (std::all_of(registries.begin(),
+                       registries.end(),
+                       [](auto& opl) {
+                         return opl.empty();
+                       })) {
+         shutdown.set_value();
+         shutdown_timer.cancel();
+       }
+      });
+    shutdown_timer.arm_periodic(std::chrono::milliseconds(100/*TODO: use option instead*/));
+    return shutdown.get_future();
+  }
+};
+
+/**
+ * Throttles set of currently running operations
+ *
+ * Very primitive currently, assumes all ops are equally
+ * expensive and simply limits the number that can be
+ * concurrently active.
+ */
+class OperationThrottler : public Blocker,
+                       private md_config_obs_t {
+public:
+  OperationThrottler(ConfigProxy &conf);
+
+  const char** get_tracked_conf_keys() const final;
+  void handle_conf_change(const ConfigProxy& conf,
+                         const std::set<std::string> &changed) final;
+  void update_from_config(const ConfigProxy &conf);
+
+  template <typename F>
+  auto with_throttle(
+    OperationRef op,
+    crimson::osd::scheduler::params_t params,
+    F &&f) {
+    if (!max_in_progress) return f();
+    auto fut = acquire_throttle(params);
+    return op->with_blocking_future(std::move(fut))
+      .then(std::forward<F>(f))
+      .then([this](auto x) {
+       release_throttle();
+       return x;
+      });
+  }
+
+  template <typename F>
+  seastar::future<> with_throttle_while(
+    OperationRef op,
+    crimson::osd::scheduler::params_t params,
+    F &&f) {
+    return with_throttle(op, params, f).then([this, params, op, f](bool cont) {
+      if (cont)
+       return with_throttle_while(op, params, f);
+      else
+       return seastar::make_ready_future<>();
+    });
+  }
+
+private:
+  void dump_detail(Formatter *f) const final;
+  const char *get_type_name() const final {
+    return "OperationThrottler";
+  }
+
+private:
+  crimson::osd::scheduler::SchedulerRef scheduler;
+
+  uint64_t max_in_progress = 0;
+  uint64_t in_progress = 0;
+
+  uint64_t pending = 0;
+
+  void wake();
+
+  blocking_future<> acquire_throttle(
+    crimson::osd::scheduler::params_t params);
+
+  void release_throttle();
 };
 
 /**
@@ -234,17 +380,13 @@ public:
  * the op ordering is preserved.
  */
 class OrderedPipelinePhase : public Blocker {
-  const char * name;
-
-protected:
-  virtual void dump_detail(ceph::Formatter *f) const final;
+private:
+  void dump_detail(ceph::Formatter *f) const final;
   const char *get_type_name() const final {
     return name;
   }
 
 public:
-  seastar::shared_mutex mutex;
-
   /**
    * Used to encapsulate pipeline residency state.
    */
@@ -276,6 +418,10 @@ public:
   };
 
   OrderedPipelinePhase(const char *name) : name(name) {}
+
+private:
+  const char * name;
+  seastar::shared_mutex mutex;
 };
 
 }