#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <seastar/core/shared_mutex.hh>
#include <seastar/core/future.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/core/lowres_clock.hh>
#include "include/ceph_assert.h"
+#include "crimson/osd/scheduler/scheduler.h"
namespace ceph {
class Formatter;
enum class OperationTypeCode {
client_request = 0,
- peering_event = 1,
- compound_peering_request = 2,
- pg_advance_map = 3,
- pg_creation = 4,
- replicated_request = 5,
- last_op = 6
+ peering_event,
+ compound_peering_request,
+ pg_advance_map,
+ pg_creation,
+ replicated_request,
+ background_recovery,
+ background_recovery_sub,
+ last_op
};
static constexpr const char* const OP_NAMES[] = {
"pg_advance_map",
"pg_creation",
"replicated_request",
+ "background_recovery",
+ "background_recovery_sub",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
* Provides an abstraction for registering and unregistering a blocker
* for the duration of a future becoming available.
*/
-template <typename... T>
-class blocking_future {
+template <typename Fut>
+class blocking_future_detail {
friend class Operation;
friend class Blocker;
Blocker *blocker;
- seastar::future<T...> fut;
- blocking_future(Blocker *b, seastar::future<T...> &&f)
+ Fut fut;
+ blocking_future_detail(Blocker *b, Fut &&f)
: blocker(b), fut(std::move(f)) {}
- template <typename... V, typename... U>
- friend blocking_future<V...> make_ready_blocking_future(U&&... args);
+ template <typename V, typename U>
+ friend blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args);
+ template <typename V, typename Exception>
+ friend blocking_future_detail<seastar::future<V>>
+ make_exception_blocking_future(Exception&& e);
+
+ template <typename U>
+ friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
+
+ template <typename U>
+ friend class blocking_future_detail;
+
+public:
+ template <typename F>
+ auto then(F &&f) && {
+ using result = decltype(std::declval<Fut>().then(f));
+ return blocking_future_detail<seastar::futurize_t<result>>(
+ blocker,
+ std::move(fut).then(std::forward<F>(f)));
+ }
};
-template <typename... V, typename... U>
-blocking_future<V...> make_ready_blocking_future(U&&... args) {
- return blocking_future<V...>(
+template <typename T=void>
+using blocking_future = blocking_future_detail<seastar::future<T>>;
+
+template <typename V, typename U>
+blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args) {
+ return blocking_future<V>(
nullptr,
- seastar::make_ready_future<V...>(std::forward<U>(args)...));
+ seastar::make_ready_future<V>(std::forward<U>(args)));
+}
+
+template <typename V, typename Exception>
+blocking_future_detail<seastar::future<V>>
+make_exception_blocking_future(Exception&& e) {
+ return blocking_future<V>(
+ nullptr,
+ seastar::make_exception_future<V>(e));
}
/**
* why a particular op is not making progress.
*/
class Blocker {
-protected:
- virtual void dump_detail(ceph::Formatter *f) const = 0;
-
public:
- template <typename... T>
- blocking_future<T...> make_blocking_future(seastar::future<T...> &&f) {
- return blocking_future(this, std::move(f));
+ template <typename T>
+ blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
+ return blocking_future<T>(this, std::move(f));
}
-
void dump(ceph::Formatter *f) const;
+ virtual ~Blocker() = default;
+private:
+ virtual void dump_detail(ceph::Formatter *f) const = 0;
virtual const char *get_type_name() const = 0;
-
- virtual ~Blocker() = default;
};
template <typename T>
class BlockerT : public Blocker {
public:
+ virtual ~BlockerT() = default;
+private:
const char *get_type_name() const final {
return T::type_name;
}
+};
- virtual ~BlockerT() = default;
+class AggregateBlocker : public BlockerT<AggregateBlocker> {
+ vector<Blocker*> parent_blockers;
+public:
+ AggregateBlocker(vector<Blocker*> &&parent_blockers)
+ : parent_blockers(std::move(parent_blockers)) {}
+ static constexpr const char *type_name = "AggregateBlocker";
+private:
+ void dump_detail(ceph::Formatter *f) const final;
};
+template <typename T>
+blocking_future<> join_blocking_futures(T &&t) {
+ vector<Blocker*> blockers;
+ blockers.reserve(t.size());
+ for (auto &&bf: t) {
+ blockers.push_back(bf.blocker);
+ bf.blocker = nullptr;
+ }
+ auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
+ return agg->make_blocking_future(
+ seastar::parallel_for_each(
+ std::forward<T>(t),
+ [](auto &&bf) {
+ return std::move(bf.fut);
+ }).then([agg=std::move(agg)] {
+ return seastar::make_ready_future<>();
+ }));
+}
+
+
/**
* Common base for all crimson-osd operations. Mainly provides
* an interface for registering ops in flight and dumping
virtual const char *get_type_name() const = 0;
virtual void print(std::ostream &) const = 0;
- template <typename... T>
- seastar::future<T...> with_blocking_future(blocking_future<T...> &&f) {
+ template <typename T>
+ seastar::future<T> with_blocking_future(blocking_future<T> &&f) {
if (f.fut.available()) {
return std::move(f.fut);
}
void dump_brief(ceph::Formatter *f);
virtual ~Operation() = default;
- protected:
+ private:
virtual void dump_detail(ceph::Formatter *f) const = 0;
private:
template <typename T>
class OperationT : public Operation {
-
-protected:
- virtual void dump_detail(ceph::Formatter *f) const = 0;
-
public:
static constexpr const char *type_name = OP_NAMES[static_cast<int>(T::type)];
using IRef = boost::intrusive_ptr<T>;
}
virtual ~OperationT() = default;
+
+private:
+ virtual void dump_detail(ceph::Formatter *f) const = 0;
};
/**
static_cast<int>(OperationTypeCode::last_op)
> op_id_counters = {};
+ seastar::timer<seastar::lowres_clock> shutdown_timer;
+ seastar::promise<> shutdown;
public:
template <typename T, typename... Args>
typename T::IRef create_operation(Args&&... args) {
op->set_id(op_id_counters[static_cast<int>(T::type)]++);
return op;
}
+
+ seastar::future<> stop() {
+ shutdown_timer.set_callback([this] {
+ if (std::all_of(registries.begin(),
+ registries.end(),
+ [](auto& opl) {
+ return opl.empty();
+ })) {
+ shutdown.set_value();
+ shutdown_timer.cancel();
+ }
+ });
+ shutdown_timer.arm_periodic(std::chrono::milliseconds(100/*TODO: use option instead*/));
+ return shutdown.get_future();
+ }
+};
+
+/**
+ * Throttles set of currently running operations
+ *
+ * Very primitive currently, assumes all ops are equally
+ * expensive and simply limits the number that can be
+ * concurrently active.
+ */
+class OperationThrottler : public Blocker,
+ private md_config_obs_t {
+public:
+ OperationThrottler(ConfigProxy &conf);
+
+ const char** get_tracked_conf_keys() const final;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed) final;
+ void update_from_config(const ConfigProxy &conf);
+
+ template <typename F>
+ auto with_throttle(
+ OperationRef op,
+ crimson::osd::scheduler::params_t params,
+ F &&f) {
+ if (!max_in_progress) return f();
+ auto fut = acquire_throttle(params);
+ return op->with_blocking_future(std::move(fut))
+ .then(std::forward<F>(f))
+ .then([this](auto x) {
+ release_throttle();
+ return x;
+ });
+ }
+
+ template <typename F>
+ seastar::future<> with_throttle_while(
+ OperationRef op,
+ crimson::osd::scheduler::params_t params,
+ F &&f) {
+ return with_throttle(op, params, f).then([this, params, op, f](bool cont) {
+ if (cont)
+ return with_throttle_while(op, params, f);
+ else
+ return seastar::make_ready_future<>();
+ });
+ }
+
+private:
+ void dump_detail(Formatter *f) const final;
+ const char *get_type_name() const final {
+ return "OperationThrottler";
+ }
+
+private:
+ crimson::osd::scheduler::SchedulerRef scheduler;
+
+ uint64_t max_in_progress = 0;
+ uint64_t in_progress = 0;
+
+ uint64_t pending = 0;
+
+ void wake();
+
+ blocking_future<> acquire_throttle(
+ crimson::osd::scheduler::params_t params);
+
+ void release_throttle();
};
/**
* the op ordering is preserved.
*/
class OrderedPipelinePhase : public Blocker {
- const char * name;
-
-protected:
- virtual void dump_detail(ceph::Formatter *f) const final;
+private:
+ void dump_detail(ceph::Formatter *f) const final;
const char *get_type_name() const final {
return name;
}
public:
- seastar::shared_mutex mutex;
-
/**
* Used to encapsulate pipeline residency state.
*/
};
OrderedPipelinePhase(const char *name) : name(name) {}
+
+private:
+ const char * name;
+ seastar::shared_mutex mutex;
};
}