#include "OpRequest.h"
#include "Session.h"
+#include "osd/PGQueueable.h"
+
#include <atomic>
#include <map>
#include <memory>
#include "common/sharedptr_registry.hpp"
#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
+#include "osd/mClockOpClassQueue.h"
+#include "osd/mClockClientQueue.h"
#include "messages/MOSDOp.h"
#include "include/Spinlock.h"
#include "common/EventTrace.h"
l_osd_op_rw_process_lat,
l_osd_op_rw_prepare_lat,
+ l_osd_op_before_queue_op_lat,
+ l_osd_op_before_dequeue_op_lat,
+
l_osd_sop,
l_osd_sop_inb,
l_osd_sop_lat,
l_osd_map_cache_miss,
l_osd_map_cache_miss_low,
l_osd_map_cache_miss_low_avg,
+ l_osd_map_bl_cache_hit,
+ l_osd_map_bl_cache_miss,
l_osd_stat_bytes,
l_osd_stat_bytes_used,
class OSD;
// NOTE(review): PGScrub is deleted from this header; the PGQueueable
// family now comes from the new "osd/PGQueueable.h" include at the top
// of the file.  (The removed member operator<< took the stream as its
// right-hand operand, i.e. it could only be invoked as `obj << stream`.)
-struct PGScrub {
- epoch_t epoch_queued;
- explicit PGScrub(epoch_t e) : epoch_queued(e) {}
- ostream &operator<<(ostream &rhs) {
- return rhs << "PGScrub";
- }
-};
-
// NOTE(review): PGSnapTrim is deleted from this header; it now comes
// from the new "osd/PGQueueable.h" include at the top of the file.
-struct PGSnapTrim {
- epoch_t epoch_queued;
- explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {}
- ostream &operator<<(ostream &rhs) {
- return rhs << "PGSnapTrim";
- }
-};
-
// NOTE(review): PGRecovery is deleted from this header; it now comes
// from the new "osd/PGQueueable.h" include at the top of the file.
// It carried the queueing epoch plus the number of recovery pushes
// reserved for the operation.
-struct PGRecovery {
- epoch_t epoch_queued;
- uint64_t reserved_pushes;
- PGRecovery(epoch_t e, uint64_t reserved_pushes)
- : epoch_queued(e), reserved_pushes(reserved_pushes) {}
- ostream &operator<<(ostream &rhs) {
- return rhs << "PGRecovery(epoch=" << epoch_queued
- << ", reserved_pushes: " << reserved_pushes << ")";
- }
-};
-
// NOTE(review): PGQueueable is deleted from this header; it now lives in
// "osd/PGQueueable.h" (included above).  It wraps one unit of PG work —
// a client op, snap trim, scrub, or recovery — in a boost::variant,
// tagged with cost, priority, start time, owner, and the map epoch the
// PG is expected to exist in.
-class PGQueueable {
- typedef boost::variant<
- OpRequestRef,
- PGSnapTrim,
- PGScrub,
- PGRecovery
- > QVariant;
- QVariant qvariant;
- int cost;
- unsigned priority;
- utime_t start_time;
- entity_inst_t owner;
- epoch_t map_epoch; ///< an epoch we expect the PG to exist in
-
// Visitor dispatching the queued item to the matching OSD handler.
- struct RunVis : public boost::static_visitor<> {
- OSD *osd;
- PGRef &pg;
- ThreadPool::TPHandle &handle;
- RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
- : osd(osd), pg(pg), handle(handle) {}
- void operator()(const OpRequestRef &op);
- void operator()(const PGSnapTrim &op);
- void operator()(const PGScrub &op);
- void operator()(const PGRecovery &op);
- };
-
// Visitor used only to build the operator<< debug string.
- struct StringifyVis : public boost::static_visitor<std::string> {
- std::string operator()(const OpRequestRef &op) {
- return stringify(op);
- }
- std::string operator()(const PGSnapTrim &op) {
- return "PGSnapTrim";
- }
- std::string operator()(const PGScrub &op) {
- return "PGScrub";
- }
- std::string operator()(const PGRecovery &op) {
- return "PGRecovery";
- }
- };
- friend ostream& operator<<(ostream& out, const PGQueueable& q) {
- StringifyVis v;
- return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
- << " prio " << q.priority << " cost " << q.cost
- << " e" << q.map_epoch << ")";
- }
-
-public:
- // cppcheck-suppress noExplicitConstructor
- PGQueueable(OpRequestRef op, epoch_t e)
- : qvariant(op), cost(op->get_req()->get_cost()),
- priority(op->get_req()->get_priority()),
- start_time(op->get_req()->get_recv_stamp()),
- owner(op->get_req()->get_source_inst()),
- map_epoch(e)
- {}
- PGQueueable(
- const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
- const entity_inst_t &owner, epoch_t e)
- : qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner), map_epoch(e) {}
- PGQueueable(
- const PGScrub &op, int cost, unsigned priority, utime_t start_time,
- const entity_inst_t &owner, epoch_t e)
- : qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner), map_epoch(e) {}
- PGQueueable(
- const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
- const entity_inst_t &owner, epoch_t e)
- : qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner), map_epoch(e) {}
// Returns the wrapped OpRequestRef if this item is a client op,
// otherwise an empty optional.
- const boost::optional<OpRequestRef> maybe_get_op() const {
- const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
- return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
- }
// Reserved pushes of a PGRecovery item; 0 for every other kind.
- uint64_t get_reserved_pushes() const {
- const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
- return op ? op->reserved_pushes : 0;
- }
- void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
- RunVis v(osd, pg, handle);
- boost::apply_visitor(v, qvariant);
- }
- unsigned get_priority() const { return priority; }
- int get_cost() const { return cost; }
- utime_t get_start_time() const { return start_time; }
- entity_inst_t get_owner() const { return owner; }
- epoch_t get_map_epoch() const { return map_epoch; }
-};
-
class OSDService {
public:
OSD *osd;
Mutex recovery_request_lock;
SafeTimer recovery_request_timer;
+ // For async recovery sleep
+ bool recovery_needs_sleep = true;
+ utime_t recovery_schedule_time = utime_t();
+
+ Mutex recovery_sleep_lock;
+ SafeTimer recovery_sleep_timer;
+
// -- tids --
// for ops i issue
std::atomic_uint last_tid{0};
Mutex snap_sleep_lock;
SafeTimer snap_sleep_timer;
+ Mutex scrub_sleep_lock;
+ SafeTimer scrub_sleep_timer;
+
AsyncReserver<spg_t> snap_reserver;
void queue_for_snap_trim(PG *pg);
// Queue a PGScrub work item for this PG on the op queue.  The new
// with_high_priority flag raises the queue priority to at least
// osd_client_op_priority so the scrub is not starved behind client
// ops; otherwise the scrubber's own priority is used unchanged.
- void queue_for_scrub(PG *pg) {
+ void queue_for_scrub(PG *pg, bool with_high_priority) {
+ unsigned scrub_queue_priority = pg->scrubber.priority;
+ if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
+ scrub_queue_priority = cct->_conf->osd_client_op_priority;
+ }
enqueue_back(
pg->info.pgid,
PGQueueable(
// item cost comes from osd_scrub_cost; epoch is the current osdmap's
PGScrub(pg->get_osdmap()->get_epoch()),
cct->_conf->osd_scrub_cost,
- pg->scrubber.priority,
+ scrub_queue_priority,
ceph_clock_now(),
entity_inst_t(),
pg->get_osdmap()->get_epoch()));
}
}
// delayed pg activation
// Queue this PG for throttled recovery; the osdmap epoch is captured
// at enqueue time.  The old caller-supplied `front` flag is replaced
// by a check of the PG's own state: PGs flagged for forced recovery
// or forced backfill jump to the front of the throttle queue.
- void queue_for_recovery(PG *pg, bool front = false) {
+ void queue_for_recovery(PG *pg) {
Mutex::Locker l(recovery_lock);
- if (front) {
+
+ if (pg->get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL)) {
awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
} else {
awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
}
// may immediately dispatch queued PGs if the throttle allows
_maybe_queue_recovery();
}
// Re-queue recovery work for a PG, carrying forward the epoch at which
// it was originally queued and its reserved push count.
// NOTE(review): presumably invoked from a recovery_sleep_timer callback
// once the configured recovery sleep has elapsed — confirm at the
// call site.
+ void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
+ Mutex::Locker l(recovery_lock);
+ _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
+ }
+ void adjust_pg_priorities(const vector<PGRef>& pgs, int newflags);
// osd map cache (past osd maps)
Mutex map_cache_lock;
void init();
void final_init();
void start_shutdown();
+ void shutdown_reserver();
void shutdown();
private:
// -- stats --
Mutex stat_lock;
osd_stat_t osd_stat;
+ uint32_t seq = 0;
void update_osd_stat(vector<int>& hb_peers);
+ osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf,
+ vector<int>& hb_peers);
// Return a copy of the current osd_stat, stamped with a fresh sequence
// number: high 32 bits = the up_from epoch, low 32 bits = a counter
// bumped under stat_lock on every call.
// NOTE(review): `seq` is a uint32_t, so the low half wraps after 2^32
// calls within one up epoch — confirm consumers tolerate that.
osd_stat_t get_osd_stat() {
Mutex::Locker l(stat_lock);
+ ++seq;
+ osd_stat.up_from = up_epoch;
+ osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
return osd_stat;
}
// Read the last-published osd_stat sequence number (as stamped by
// get_osd_stat()) without bumping it.
+ uint64_t get_osd_stat_seq() {
+ Mutex::Locker l(stat_lock);
+ return osd_stat.seq;
+ }
// -- OSD Full Status --
private:
mutable int64_t injectfull = 0;
s_names injectfull_state = NONE;
float get_failsafe_full_ratio();
- void check_full_status(const osd_stat_t &stat);
+ void check_full_status(float ratio);
bool _check_full(s_names type, ostream &ss) const;
public:
bool check_failsafe_full(ostream &ss) const;
Mutex pgid_lock;
map<spg_t, int> pgid_tracker;
map<spg_t, PG*> live_pgs;
// NOTE(review): the inline pgid-tracking helpers below are removed from
// the header and replaced by out-of-line declarations (definitions
// presumably move to the .cc file — confirm).  Old behavior, for
// reference: add_pgid registered a live PG and bumped its per-pgid
// counter; remove_pgid decremented it and erased the entry when it hit
// zero; dump_live_pgids logged every tracked pgid and its live ids.
- void add_pgid(spg_t pgid, PG *pg) {
- Mutex::Locker l(pgid_lock);
- if (!pgid_tracker.count(pgid)) {
- live_pgs[pgid] = pg;
- }
- pgid_tracker[pgid]++;
- }
- void remove_pgid(spg_t pgid, PG *pg) {
- Mutex::Locker l(pgid_lock);
- assert(pgid_tracker.count(pgid));
- assert(pgid_tracker[pgid] > 0);
- pgid_tracker[pgid]--;
- if (pgid_tracker[pgid] == 0) {
- pgid_tracker.erase(pgid);
- live_pgs.erase(pgid);
- }
- }
- void dump_live_pgids() {
- Mutex::Locker l(pgid_lock);
- derr << "live pgids:" << dendl;
- for (map<spg_t, int>::const_iterator i = pgid_tracker.cbegin();
- i != pgid_tracker.cend();
- ++i) {
- derr << "\t" << *i << dendl;
- live_pgs[i->first]->dump_live_ids();
- }
- }
+ void add_pgid(spg_t pgid, PG *pg);
+ void remove_pgid(spg_t pgid, PG *pg);
+ void dump_live_pgids();
#endif
explicit OSDService(OSD *osd);
int whoami;
std::string dev_path, journal_path;
+ bool store_is_rotational = true;
+ bool journal_is_rotational = true;
+
ZTracer::Endpoint trace_endpoint;
void create_logger();
void create_recoverystate_perf();
private:
std::atomic_int state{STATE_INITIALIZING};
+ bool waiting_for_luminous_mons = false;
public:
int get_state() const {
private:
- ThreadPool osd_tp;
+ ThreadPool peering_tp;
ShardedThreadPool osd_op_tp;
ThreadPool disk_tp;
ThreadPool command_tp;
friend struct C_OpenPGs;
// -- op queue --
// Converted to a scoped enum (was a plain enum) naming the available op
// queue implementations.  mclock_opclass / mclock_client correspond to
// the newly included osd/mClockOpClassQueue.h and
// osd/mClockClientQueue.h implementations.
- enum io_queue {
+ enum class io_queue {
prioritized,
- weightedpriority
+ weightedpriority,
+ mclock_opclass,
+ mclock_client,
};
+ friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+
const io_queue op_queue;
const unsigned int op_prio_cutoff;
* and already requeued the items.
*/
friend class PGQueueable;
+
class ShardedOpWQ
: public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
{
: sdata_lock(lock_name.c_str(), false, true, false, cct),
sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
false, cct) {
- if (opqueue == weightedpriority) {
+ if (opqueue == io_queue::weightedpriority) {
pqueue = std::unique_ptr
<WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
max_tok_per_prio, min_cost));
- } else if (opqueue == prioritized) {
+ } else if (opqueue == io_queue::prioritized) {
pqueue = std::unique_ptr
<PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
max_tok_per_prio, min_cost));
+ } else if (opqueue == io_queue::mclock_opclass) {
+ pqueue = std::unique_ptr
+ <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
+ } else if (opqueue == io_queue::mclock_client) {
+ pqueue = std::unique_ptr
+ <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
}
}
- };
+ }; // struct ShardData
vector<ShardData*> shard_list;
OSD *osd;
OSDMapRef get_osdmap() {
return osdmap;
}
- epoch_t get_osdmap_epoch() {
+ epoch_t get_osdmap_epoch() const {
return osdmap ? osdmap->get_epoch() : 0;
}
epoch_t advance_to, PG *pg,
ThreadPool::TPHandle &handle,
PG::RecoveryCtx *rctx,
- set<boost::intrusive_ptr<PG> > *split_pgs
+ set<PGRef> *split_pgs
);
void consume_map();
void activate_map();
PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
PG *_lookup_lock_pg(spg_t pgid);
+
+public:
+ PG *lookup_lock_pg(spg_t pgid);
+
+protected:
PG *_open_lock_pg(OSDMapRef createmap,
spg_t pg, bool no_lockdep_check=false);
enum res_result {
void split_pgs(
PG *parent,
- const set<spg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
+ const set<spg_t> &childpgids, set<PGRef> *out_pgs,
OSDMapRef curmap,
OSDMapRef nextmap,
PG::RecoveryCtx *rctx);
}
pg_stat_queue_lock.Unlock();
}
+ void clear_outstanding_pg_stats(){
+ Mutex::Locker l(pg_stat_queue_lock);
+ outstanding_pg_stats.clear();
+ }
ceph_tid_t get_tid() {
return service.get_tid();
void handle_pg_backfill_reserve(OpRequestRef op);
void handle_pg_recovery_reserve(OpRequestRef op);
+ void handle_force_recovery(Message *m);
+
void handle_pg_remove(OpRequestRef op);
void _remove_pg(PG *pg);
}
} remove_wq;
- private:
+private:
bool ms_can_fast_dispatch_any() const override { return true; }
bool ms_can_fast_dispatch(const Message *m) const override {
switch (m->get_type()) {
case MSG_OSD_REP_SCRUBMAP:
case MSG_OSD_PG_UPDATE_LOG_MISSING:
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+ case MSG_OSD_PG_RECOVERY_DELETE:
+ case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
return true;
default:
return false;
// Translate the osd_op_queue config option into an io_queue value.
// "debug_random" picks uniformly among all four implementations;
// unrecognized values (including "wpq") fall through to the
// weighted-priority queue, the default.
io_queue get_io_queue() const {
if (cct->_conf->osd_op_queue == "debug_random") {
+ static io_queue index_lookup[] = { io_queue::prioritized,
+ io_queue::weightedpriority,
+ io_queue::mclock_opclass,
+ io_queue::mclock_client };
// NOTE(review): srand(time(NULL)) reseeds on every call, so calls
// within the same second all return the same "random" queue.
srand(time(NULL));
- return (rand() % 2 < 1) ? prioritized : weightedpriority;
- } else if (cct->_conf->osd_op_queue == "wpq") {
- return weightedpriority;
+ unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+ return index_lookup[which];
+ } else if (cct->_conf->osd_op_queue == "prioritized") {
+ return io_queue::prioritized;
+ } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
+ return io_queue::mclock_opclass;
+ } else if (cct->_conf->osd_op_queue == "mclock_client") {
+ return io_queue::mclock_client;
} else {
- return prioritized;
+ // default / catch-all is 'wpq'
+ return io_queue::weightedpriority;
}
}
if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
srand(time(NULL));
return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
- } else if (cct->_conf->osd_op_queue_cut_off == "low") {
- return CEPH_MSG_PRIO_LOW;
- } else {
+ } else if (cct->_conf->osd_op_queue_cut_off == "high") {
return CEPH_MSG_PRIO_HIGH;
+ } else {
+ // default / catch-all is 'low'
+ return CEPH_MSG_PRIO_LOW;
}
}
int init_op_flags(OpRequestRef& op);
+ int get_num_op_shards();
+ int get_num_op_threads();
+
+ float get_osd_recovery_sleep();
+
public:
static int peek_meta(ObjectStore *store, string& magic,
uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
friend class OSDService;
};
+
+std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+
+
//compatibility of the executable
extern const CompatSet::Feature ceph_osd_feature_compat[];
extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
extern const CompatSet::Feature ceph_osd_feature_incompat[];
-#endif
+#endif // CEPH_OSD_H