-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
+ * License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
- *
+ *
*/
#ifndef CEPH_OSD_H
#define CEPH_OSD_H
#include "msg/Dispatcher.h"
-#include "common/Mutex.h"
-#include "common/RWLock.h"
+#include "common/async/context_pool.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
#include "common/ceph_context.h"
+#include "common/config_cacher.h"
#include "common/zipkin_trace.h"
+#include "common/ceph_timer.h"
#include "mgr/MgrClient.h"
#include "os/ObjectStore.h"
-#include "OSDCap.h"
-
-#include "auth/KeyRing.h"
-#include "osd/ClassHandler.h"
#include "include/CompatSet.h"
+#include "include/common_fwd.h"
#include "OpRequest.h"
#include "Session.h"
-#include "osd/PGQueueable.h"
+#include "osd/scheduler/OpScheduler.h"
#include <atomic>
#include <map>
#include <memory>
-#include "include/memory.h"
-using namespace std;
+#include <string>
#include "include/unordered_map.h"
#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
-#include "common/sharedptr_registry.hpp"
-#include "common/WeightedPriorityQueue.h"
-#include "common/PrioritizedQueue.h"
-#include "osd/mClockOpClassQueue.h"
-#include "osd/mClockClientQueue.h"
#include "messages/MOSDOp.h"
-#include "include/Spinlock.h"
#include "common/EventTrace.h"
+#include "osd/osd_perf_counters.h"
+#include "common/Finisher.h"
+#include "scrubber/osd_scrub_sched.h"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
+/*
-enum {
- l_osd_first = 10000,
- l_osd_op_wip,
- l_osd_op,
- l_osd_op_inb,
- l_osd_op_outb,
- l_osd_op_lat,
- l_osd_op_process_lat,
- l_osd_op_prepare_lat,
- l_osd_op_r,
- l_osd_op_r_outb,
- l_osd_op_r_lat,
- l_osd_op_r_lat_outb_hist,
- l_osd_op_r_process_lat,
- l_osd_op_r_prepare_lat,
- l_osd_op_w,
- l_osd_op_w_inb,
- l_osd_op_w_lat,
- l_osd_op_w_lat_inb_hist,
- l_osd_op_w_process_lat,
- l_osd_op_w_prepare_lat,
- l_osd_op_rw,
- l_osd_op_rw_inb,
- l_osd_op_rw_outb,
- l_osd_op_rw_lat,
- l_osd_op_rw_lat_inb_hist,
- l_osd_op_rw_lat_outb_hist,
- l_osd_op_rw_process_lat,
- l_osd_op_rw_prepare_lat,
-
- l_osd_op_before_queue_op_lat,
- l_osd_op_before_dequeue_op_lat,
-
- l_osd_sop,
- l_osd_sop_inb,
- l_osd_sop_lat,
- l_osd_sop_w,
- l_osd_sop_w_inb,
- l_osd_sop_w_lat,
- l_osd_sop_pull,
- l_osd_sop_pull_lat,
- l_osd_sop_push,
- l_osd_sop_push_inb,
- l_osd_sop_push_lat,
-
- l_osd_pull,
- l_osd_push,
- l_osd_push_outb,
-
- l_osd_rop,
-
- l_osd_loadavg,
- l_osd_buf,
- l_osd_history_alloc_bytes,
- l_osd_history_alloc_num,
- l_osd_cached_crc,
- l_osd_cached_crc_adjusted,
- l_osd_missed_crc,
-
- l_osd_pg,
- l_osd_pg_primary,
- l_osd_pg_replica,
- l_osd_pg_stray,
- l_osd_pg_removing,
- l_osd_hb_to,
- l_osd_map,
- l_osd_mape,
- l_osd_mape_dup,
-
- l_osd_waiting_for_map,
-
- l_osd_map_cache_hit,
- l_osd_map_cache_miss,
- l_osd_map_cache_miss_low,
- l_osd_map_cache_miss_low_avg,
- l_osd_map_bl_cache_hit,
- l_osd_map_bl_cache_miss,
-
- l_osd_stat_bytes,
- l_osd_stat_bytes_used,
- l_osd_stat_bytes_avail,
-
- l_osd_copyfrom,
-
- l_osd_tier_promote,
- l_osd_tier_flush,
- l_osd_tier_flush_fail,
- l_osd_tier_try_flush,
- l_osd_tier_try_flush_fail,
- l_osd_tier_evict,
- l_osd_tier_whiteout,
- l_osd_tier_dirty,
- l_osd_tier_clean,
- l_osd_tier_delay,
- l_osd_tier_proxy_read,
- l_osd_tier_proxy_write,
-
- l_osd_agent_wake,
- l_osd_agent_skip,
- l_osd_agent_flush,
- l_osd_agent_evict,
-
- l_osd_object_ctx_cache_hit,
- l_osd_object_ctx_cache_total,
-
- l_osd_op_cache_hit,
- l_osd_tier_flush_lat,
- l_osd_tier_promote_lat,
- l_osd_tier_r_lat,
-
- l_osd_pg_info,
- l_osd_pg_fastinfo,
- l_osd_pg_biginfo,
-
- l_osd_last,
-};
+ lock ordering for pg map
-// RecoveryState perf counters
-enum {
- rs_first = 20000,
- rs_initial_latency,
- rs_started_latency,
- rs_reset_latency,
- rs_start_latency,
- rs_primary_latency,
- rs_peering_latency,
- rs_backfilling_latency,
- rs_waitremotebackfillreserved_latency,
- rs_waitlocalbackfillreserved_latency,
- rs_notbackfilling_latency,
- rs_repnotrecovering_latency,
- rs_repwaitrecoveryreserved_latency,
- rs_repwaitbackfillreserved_latency,
- rs_reprecovering_latency,
- rs_activating_latency,
- rs_waitlocalrecoveryreserved_latency,
- rs_waitremoterecoveryreserved_latency,
- rs_recovering_latency,
- rs_recovered_latency,
- rs_clean_latency,
- rs_active_latency,
- rs_replicaactive_latency,
- rs_stray_latency,
- rs_getinfo_latency,
- rs_getlog_latency,
- rs_waitactingchange_latency,
- rs_incomplete_latency,
- rs_down_latency,
- rs_getmissing_latency,
- rs_waitupthru_latency,
- rs_notrecovering_latency,
- rs_last,
-};
+ PG::lock
+ ShardData::lock
+ OSD::pg_map_lock
+
+ */
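+/*
+  Illustrative only (using the names from the comment above; the real lock
+  members may differ): a caller that needs more than one of these locks
+  must take them in the order listed, e.g.
+
+    pg->lock();              // PG::lock first
+    shard->lock.lock();      // then ShardData::lock
+    ...
+    shard->lock.unlock();
+    pg->unlock();
+ */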
class Messenger;
class Message;
class MonClient;
-class PerfCounters;
class ObjectStore;
class FuseStore;
class OSDMap;
class MLog;
class Objecter;
+class KeyStore;
class Watch;
class PrimaryLogPG;
-class AuthAuthorizeHandlerRegistry;
-
class TestOpsSocketHook;
-struct C_CompleteSplits;
+struct C_FinishSplits;
struct C_OpenPGs;
class LogChannel;
-class CephContext;
-typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
-class MOSDOp;
-class DeletingState {
- Mutex lock;
- Cond cond;
- enum {
- QUEUED,
- CLEARING_DIR,
- CLEARING_WAITING,
- DELETING_DIR,
- DELETED_DIR,
- CANCELED,
- } status;
- bool stop_deleting;
-public:
- const spg_t pgid;
- const PGRef old_pg_state;
- explicit DeletingState(const pair<spg_t, PGRef> &in) :
- lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
- pgid(in.first), old_pg_state(in.second) {
- }
-
- /// transition status to CLEARING_WAITING
- bool pause_clearing() {
- Mutex::Locker l(lock);
- assert(status == CLEARING_DIR);
- if (stop_deleting) {
- status = CANCELED;
- cond.Signal();
- return false;
- }
- status = CLEARING_WAITING;
- return true;
- } ///< @return false if we should cancel deletion
-
- /// start or resume the clearing - transition the status to CLEARING_DIR
- bool start_or_resume_clearing() {
- Mutex::Locker l(lock);
- assert(
- status == QUEUED ||
- status == DELETED_DIR ||
- status == CLEARING_WAITING);
- if (stop_deleting) {
- status = CANCELED;
- cond.Signal();
- return false;
- }
- status = CLEARING_DIR;
- return true;
- } ///< @return false if we should cancel the deletion
-
- /// transition status to CLEARING_DIR
- bool resume_clearing() {
- Mutex::Locker l(lock);
- assert(status == CLEARING_WAITING);
- if (stop_deleting) {
- status = CANCELED;
- cond.Signal();
- return false;
- }
- status = CLEARING_DIR;
- return true;
- } ///< @return false if we should cancel deletion
-
- /// transition status to deleting
- bool start_deleting() {
- Mutex::Locker l(lock);
- assert(status == CLEARING_DIR);
- if (stop_deleting) {
- status = CANCELED;
- cond.Signal();
- return false;
- }
- status = DELETING_DIR;
- return true;
- } ///< @return false if we should cancel deletion
-
- /// signal collection removal queued
- void finish_deleting() {
- Mutex::Locker l(lock);
- assert(status == DELETING_DIR);
- status = DELETED_DIR;
- cond.Signal();
- }
-
- /// try to halt the deletion
- bool try_stop_deletion() {
- Mutex::Locker l(lock);
- stop_deleting = true;
- /**
- * If we are in DELETING_DIR or CLEARING_DIR, there are in progress
- * operations we have to wait for before continuing on. States
- * CLEARING_WAITING and QUEUED indicate that the remover will check
- * stop_deleting before queueing any further operations. CANCELED
- * indicates that the remover has already halted. DELETED_DIR
- * indicates that the deletion has been fully queued.
- */
- while (status == DELETING_DIR || status == CLEARING_DIR)
- cond.Wait(lock);
- return status != DELETED_DIR;
- } ///< @return true if we don't need to recreate the collection
-};
-typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
+class MOSDPGCreate2;
+class MOSDPGNotify;
+class MOSDPGInfo;
+class MOSDPGRemove;
+class MOSDForceRecovery;
+class MMonGetPurgedSnapsReply;
class OSD;
class OSDService {
+ using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
public:
OSD *osd;
CephContext *cct;
- SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
- ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
- SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
+ ObjectStore::CollectionHandle meta_ch;
const int whoami;
- ObjectStore *&store;
+ ObjectStore * const store;
LogClient &log_client;
LogChannelRef clog;
PGRecoveryStats &pg_recovery_stats;
PerfCounters *&logger;
PerfCounters *&recoverystate_perf;
MonClient *&monc;
- ThreadPool::BatchWorkQueue<PG> &peering_wq;
- GenContextWQ recovery_gen_wq;
- ClassHandler *&class_handler;
- void enqueue_back(spg_t pgid, PGQueueable qi);
- void enqueue_front(spg_t pgid, PGQueueable qi);
+ md_config_cacher_t<Option::size_t> osd_max_object_size;
+ md_config_cacher_t<bool> osd_skip_data_digest;
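+  // md_config_cacher_t keeps a locally cached copy of the option, refreshed
+  // when the config changes, so hot-path reads are plain loads rather than
+  // config-map lookups. A minimal read sketch (assuming the cacher's
+  // implicit conversion to the option's value type):
+  //
+  //   if (osd_skip_data_digest) {
+  //     // ... skip digest calculation for this write ...
+  //   }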
+
+ void enqueue_back(OpSchedulerItem&& qi);
+ void enqueue_front(OpSchedulerItem&& qi);
void maybe_inject_dispatch_delay() {
- if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
+ if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
if (rand() % 10000 <
- g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
+ g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
utime_t t;
- t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
+ t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
t.sleep();
}
}
}
-private:
- // -- map epoch lower bound --
- Mutex pg_epoch_lock;
- multiset<epoch_t> pg_epochs;
- map<spg_t,epoch_t> pg_epoch;
-
-public:
- void pg_add_epoch(spg_t pgid, epoch_t epoch) {
- Mutex::Locker l(pg_epoch_lock);
- map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
- assert(t == pg_epoch.end());
- pg_epoch[pgid] = epoch;
- pg_epochs.insert(epoch);
- }
- void pg_update_epoch(spg_t pgid, epoch_t epoch) {
- Mutex::Locker l(pg_epoch_lock);
- map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
- assert(t != pg_epoch.end());
- pg_epochs.erase(pg_epochs.find(t->second));
- t->second = epoch;
- pg_epochs.insert(epoch);
- }
- void pg_remove_epoch(spg_t pgid) {
- Mutex::Locker l(pg_epoch_lock);
- map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
- if (t != pg_epoch.end()) {
- pg_epochs.erase(pg_epochs.find(t->second));
- pg_epoch.erase(t);
- }
- }
- epoch_t get_min_pg_epoch() {
- Mutex::Locker l(pg_epoch_lock);
- if (pg_epochs.empty())
- return 0;
- else
- return *pg_epochs.begin();
- }
+ ceph::signedspan get_mnow();
private:
// -- superblock --
- Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
+ ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
OSDSuperblock superblock;
public:
OSDSuperblock get_superblock() {
- Mutex::Locker l(publish_lock);
+ std::lock_guard l(publish_lock);
return superblock;
}
void publish_superblock(const OSDSuperblock &block) {
- Mutex::Locker l(publish_lock);
+ std::lock_guard l(publish_lock);
superblock = block;
}
public:
OSDMapRef get_osdmap() {
- Mutex::Locker l(publish_lock);
+ std::lock_guard l(publish_lock);
return osdmap;
}
epoch_t get_osdmap_epoch() {
- Mutex::Locker l(publish_lock);
+ std::lock_guard l(publish_lock);
return osdmap ? osdmap->get_epoch() : 0;
}
void publish_map(OSDMapRef map) {
- Mutex::Locker l(publish_lock);
+ std::lock_guard l(publish_lock);
osdmap = map;
}
/*
- * osdmap - current published map
- * next_osdmap - pre_published map that is about to be published.
+ * osdmap - current published map
+ * next_osdmap - pre-published map that is about to be published.
*
* We use the next_osdmap to send messages and initiate connections,
- * but only if the target is the same instance as the one in the map
+ * but only if the target is the same instance as the one in the map
* epoch the current user is working from (i.e., the result is
* equivalent to what is in next_osdmap).
*
*/
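+/*
+ * Usage sketch (illustrative; 'send_peer_messages' is a hypothetical
+ * caller): reserve the pre-published epoch before using the map, and
+ * release it afterwards so await_reserved_maps() can make progress:
+ *
+ *   OSDMapRef nextmap = get_nextmap_reserved(); // registers the epoch
+ *   send_peer_messages(nextmap);
+ *   release_map(nextmap);                       // wakes any waiters
+ */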
private:
OSDMapRef next_osdmap;
- Cond pre_publish_cond;
+ ceph::condition_variable pre_publish_cond;
+ int pre_publish_waiter = 0;
public:
void pre_publish_map(OSDMapRef map) {
- Mutex::Locker l(pre_publish_lock);
+ std::lock_guard l(pre_publish_lock);
next_osdmap = std::move(map);
}
void activate_map();
/// map epochs reserved below
- map<epoch_t, unsigned> map_reservations;
+ std::map<epoch_t, unsigned> map_reservations;
/// gets ref to next_osdmap and registers the epoch as reserved
OSDMapRef get_nextmap_reserved() {
- Mutex::Locker l(pre_publish_lock);
- if (!next_osdmap)
- return OSDMapRef();
+ std::lock_guard l(pre_publish_lock);
+    ceph_assert(next_osdmap);
epoch_t e = next_osdmap->get_epoch();
- map<epoch_t, unsigned>::iterator i =
- map_reservations.insert(make_pair(e, 0)).first;
+ std::map<epoch_t, unsigned>::iterator i =
+ map_reservations.insert(std::make_pair(e, 0)).first;
i->second++;
return next_osdmap;
}
/// releases reservation on map
void release_map(OSDMapRef osdmap) {
- Mutex::Locker l(pre_publish_lock);
- map<epoch_t, unsigned>::iterator i =
+ std::lock_guard l(pre_publish_lock);
+ std::map<epoch_t, unsigned>::iterator i =
map_reservations.find(osdmap->get_epoch());
- assert(i != map_reservations.end());
- assert(i->second > 0);
+ ceph_assert(i != map_reservations.end());
+ ceph_assert(i->second > 0);
if (--(i->second) == 0) {
map_reservations.erase(i);
}
- pre_publish_cond.Signal();
+ if (pre_publish_waiter) {
+ pre_publish_cond.notify_all();
+ }
}
/// blocks until there are no reserved maps prior to next_osdmap
void await_reserved_maps() {
- Mutex::Locker l(pre_publish_lock);
- assert(next_osdmap);
- while (true) {
- map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
- if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
- break;
- } else {
- pre_publish_cond.Wait(pre_publish_lock);
- }
- }
+ std::unique_lock l{pre_publish_lock};
+ ceph_assert(next_osdmap);
+ pre_publish_waiter++;
+ pre_publish_cond.wait(l, [this] {
+ auto i = map_reservations.cbegin();
+ return (i == map_reservations.cend() ||
+ i->first >= next_osdmap->get_epoch());
+ });
+ pre_publish_waiter--;
+ }
+ OSDMapRef get_next_osdmap() {
+ std::lock_guard l(pre_publish_lock);
+ return next_osdmap;
}
-private:
- Mutex peer_map_epoch_lock;
- map<int, epoch_t> peer_map_epoch;
-public:
- epoch_t get_peer_epoch(int p);
- epoch_t note_peer_epoch(int p, epoch_t e);
- void forget_peer_epoch(int p, epoch_t e);
+ void maybe_share_map(Connection *con,
+ const OSDMapRef& osdmap,
+ epoch_t peer_epoch_lb=0);
void send_map(class MOSDMap *m, Connection *con);
- void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
+ void send_incremental_map(epoch_t since, Connection *con,
+ const OSDMapRef& osdmap);
MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
OSDSuperblock& superblock);
- bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
- const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
- void share_map(entity_name_t name, Connection *con, epoch_t epoch,
- OSDMapRef& osdmap, epoch_t *sent_epoch_p);
- void share_map_peer(int peer, Connection *con,
- OSDMapRef map = OSDMapRef());
ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
- pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
+ std::pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
- void send_message_osd_cluster(Message *m, Connection *con) {
- con->send_message(m);
+ void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
+ void send_message_osd_cluster(MessageRef m, Connection *con) {
+ con->send_message2(std::move(m));
}
void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
con->send_message(m);
}
- void send_message_osd_client(Message *m, Connection *con) {
- con->send_message(m);
- }
void send_message_osd_client(Message *m, const ConnectionRef& con) {
con->send_message(m);
}
- entity_name_t get_cluster_msgr_name() {
- return cluster_messenger->get_myname();
- }
+ entity_name_t get_cluster_msgr_name() const;
-private:
- // -- scrub scheduling --
- Mutex sched_scrub_lock;
- int scrubs_pending;
- int scrubs_active;
public:
- struct ScrubJob {
- CephContext* cct;
- /// pg to be scrubbed
- spg_t pgid;
- /// a time scheduled for scrub. but the scrub could be delayed if system
- /// load is too high or it fails to fall in the scrub hours
- utime_t sched_time;
- /// the hard upper bound of scrub time
- utime_t deadline;
- ScrubJob() : cct(nullptr) {}
- explicit ScrubJob(CephContext* cct, const spg_t& pg,
- const utime_t& timestamp,
- double pool_scrub_min_interval = 0,
- double pool_scrub_max_interval = 0, bool must = true);
- /// order the jobs by sched_time
- bool operator<(const ScrubJob& rhs) const;
- };
- set<ScrubJob> sched_scrub_pg;
-
- /// @returns the scrub_reg_stamp used for unregister the scrub job
- utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
- double pool_scrub_max_interval, bool must) {
- ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
- must);
- Mutex::Locker l(sched_scrub_lock);
- sched_scrub_pg.insert(scrub);
- return scrub.sched_time;
- }
- void unreg_pg_scrub(spg_t pgid, utime_t t) {
- Mutex::Locker l(sched_scrub_lock);
- size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
- assert(removed);
- }
- bool first_scrub_stamp(ScrubJob *out) {
- Mutex::Locker l(sched_scrub_lock);
- if (sched_scrub_pg.empty())
- return false;
- set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
- *out = *iter;
- return true;
- }
- bool next_scrub_stamp(const ScrubJob& next,
- ScrubJob *out) {
- Mutex::Locker l(sched_scrub_lock);
- if (sched_scrub_pg.empty())
- return false;
- set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
- if (iter == sched_scrub_pg.cend())
- return false;
- ++iter;
- if (iter == sched_scrub_pg.cend())
- return false;
- *out = *iter;
- return true;
- }
-
- void dumps_scrub(Formatter *f) {
- assert(f != nullptr);
- Mutex::Locker l(sched_scrub_lock);
-
- f->open_array_section("scrubs");
- for (const auto &i: sched_scrub_pg) {
- f->open_object_section("scrub");
- f->dump_stream("pgid") << i.pgid;
- f->dump_stream("sched_time") << i.sched_time;
- f->dump_stream("deadline") << i.deadline;
- f->dump_bool("forced", i.sched_time == i.deadline);
- f->close_section();
- }
- f->close_section();
- }
-
- bool can_inc_scrubs_pending();
- bool inc_scrubs_pending();
- void inc_scrubs_active(bool reserved);
- void dec_scrubs_pending();
- void dec_scrubs_active();
void reply_op_error(OpRequestRef op, int err);
- void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
+ void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
+ std::vector<pg_log_op_return_item_t> op_returns);
void handle_misdirected_op(PG *pg, OpRequestRef op);
+ private:
+ /**
+   * The entity that maintains the set of PGs we may scrub (i.e. those for
+   * which we are the primary), and schedules their scrubbing.
+ */
+ ScrubQueue m_scrub_queue;
-private:
+ public:
+ ScrubQueue& get_scrub_services() { return m_scrub_queue; }
+
+ /**
+ * A callback used by the ScrubQueue object to initiate a scrub on a specific PG.
+ *
+   * The request might fail for multiple reasons, as ScrubQueue cannot on its
+   * own check some of the PG-specific preconditions; those are checked here.
+   * See the schedule_result_t definition.
+   *
+   * @param pgid the PG to scrub
+   * @param allow_requested_repair_only
+   * @return a Scrub::schedule_result_t detailing either success or the failure reason.
+ */
+ Scrub::schedule_result_t initiate_a_scrub(spg_t pgid, bool allow_requested_repair_only);
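+
+  /*
+   * Call-site sketch (illustrative; the driving loop lives in ScrubQueue and
+   * the 'ready_jobs' container is hypothetical): walk the jobs whose
+   * schedule time has arrived and stop at the first PG that accepts:
+   *
+   *   for (const auto& job : ready_jobs) {
+   *     auto res = initiate_a_scrub(job.pgid, allow_repair_only);
+   *     if (res == Scrub::schedule_result_t::scrub_initiated) // name assumed
+   *       break;
+   *   }
+   */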
+
+
+ private:
// -- agent shared state --
- Mutex agent_lock;
- Cond agent_cond;
- map<uint64_t, set<PGRef> > agent_queue;
- set<PGRef>::iterator agent_queue_pos;
+ ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
+ ceph::condition_variable agent_cond;
+ std::map<uint64_t, std::set<PGRef> > agent_queue;
+ std::set<PGRef>::iterator agent_queue_pos;
bool agent_valid_iterator;
int agent_ops;
  int flush_mode_high_count; // once we have one pg with FLUSH_MODE_HIGH, flush objects at high speed
- set<hobject_t> agent_oids;
+ std::set<hobject_t> agent_oids;
bool agent_active;
  struct AgentThread : public Thread {
    OSDService *osd;
    explicit AgentThread(OSDService *o) : osd(o) {}
    void *entry() override { osd->agent_entry(); return NULL; }
  } agent_thread;
bool agent_stop_flag;
- Mutex agent_timer_lock;
+ ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
SafeTimer agent_timer;
public:
  void _enqueue(PG *pg, uint64_t priority) {
    if (!agent_queue.empty() &&
agent_queue.rbegin()->first < priority)
agent_valid_iterator = false; // inserting higher-priority queue
- set<PGRef>& nq = agent_queue[priority];
+ std::set<PGRef>& nq = agent_queue[priority];
if (nq.empty())
- agent_cond.Signal();
+ agent_cond.notify_all();
nq.insert(pg);
}
void _dequeue(PG *pg, uint64_t old_priority) {
- set<PGRef>& oq = agent_queue[old_priority];
- set<PGRef>::iterator p = oq.find(pg);
- assert(p != oq.end());
+ std::set<PGRef>& oq = agent_queue[old_priority];
+ std::set<PGRef>::iterator p = oq.find(pg);
+ ceph_assert(p != oq.end());
if (p == agent_queue_pos)
++agent_queue_pos;
    oq.erase(p);
    if (oq.empty()) {
      if (agent_queue.rbegin()->first == old_priority)
        agent_valid_iterator = false;
      agent_queue.erase(old_priority);
    }
  }
/// enable agent for a pg
void agent_enable_pg(PG *pg, uint64_t priority) {
- Mutex::Locker l(agent_lock);
+ std::lock_guard l(agent_lock);
_enqueue(pg, priority);
}
  /// adjust priority for an enabled pg
void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
- Mutex::Locker l(agent_lock);
- assert(new_priority != old_priority);
+ std::lock_guard l(agent_lock);
+ ceph_assert(new_priority != old_priority);
_enqueue(pg, new_priority);
_dequeue(pg, old_priority);
}
/// disable agent for a pg
void agent_disable_pg(PG *pg, uint64_t old_priority) {
- Mutex::Locker l(agent_lock);
+ std::lock_guard l(agent_lock);
_dequeue(pg, old_priority);
}
/// note start of an async (evict) op
void agent_start_evict_op() {
- Mutex::Locker l(agent_lock);
+ std::lock_guard l(agent_lock);
++agent_ops;
}
/// note finish or cancellation of an async (evict) op
void agent_finish_evict_op() {
- Mutex::Locker l(agent_lock);
- assert(agent_ops > 0);
+ std::lock_guard l(agent_lock);
+ ceph_assert(agent_ops > 0);
--agent_ops;
- agent_cond.Signal();
+ agent_cond.notify_all();
}
/// note start of an async (flush) op
void agent_start_op(const hobject_t& oid) {
- Mutex::Locker l(agent_lock);
+ std::lock_guard l(agent_lock);
++agent_ops;
- assert(agent_oids.count(oid) == 0);
+ ceph_assert(agent_oids.count(oid) == 0);
agent_oids.insert(oid);
}
/// note finish or cancellation of an async (flush) op
void agent_finish_op(const hobject_t& oid) {
- Mutex::Locker l(agent_lock);
- assert(agent_ops > 0);
+ std::lock_guard l(agent_lock);
+ ceph_assert(agent_ops > 0);
--agent_ops;
- assert(agent_oids.count(oid) == 1);
+ ceph_assert(agent_oids.count(oid) == 1);
agent_oids.erase(oid);
- agent_cond.Signal();
+ agent_cond.notify_all();
}
/// check if we are operating on an object
bool agent_is_active_oid(const hobject_t& oid) {
- Mutex::Locker l(agent_lock);
+ std::lock_guard l(agent_lock);
return agent_oids.count(oid);
}
/// get count of active agent ops
int agent_get_num_ops() {
- Mutex::Locker l(agent_lock);
+ std::lock_guard l(agent_lock);
return agent_ops;
}
void agent_inc_high_count() {
- Mutex::Locker l(agent_lock);
+ std::lock_guard l(agent_lock);
flush_mode_high_count ++;
}
void agent_dec_high_count() {
- Mutex::Locker l(agent_lock);
+ std::lock_guard l(agent_lock);
flush_mode_high_count --;
}
private:
/// throttle promotion attempts
- std::atomic_uint promote_probability_millis{1000}; ///< probability thousands. one word.
+ std::atomic<unsigned int> promote_probability_millis{1000}; ///< probability thousands. one word.
PromoteCounter promote_counter;
utime_t last_recalibrate;
  unsigned long promote_max_objects, promote_max_bytes;

public:
  void promote_finish(uint64_t bytes) {
    promote_counter.finish(bytes);
  }
void promote_throttle_recalibrate();
+ unsigned get_num_shards() const {
+ return m_objecter_finishers;
+ }
+ Finisher* get_objecter_finisher(int shard) {
+ return objecter_finishers[shard].get();
+ }
// -- Objecter, for tiering reads/writes from/to other OSDs --
- Objecter *objecter;
- Finisher objecter_finisher;
+ ceph::async::io_context_pool& poolctx;
+ std::unique_ptr<Objecter> objecter;
+ int m_objecter_finishers;
+ std::vector<std::unique_ptr<Finisher>> objecter_finishers;
// -- Watch --
- Mutex watch_lock;
+ ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
SafeTimer watch_timer;
uint64_t next_notif_id;
uint64_t get_next_id(epoch_t cur_epoch) {
- Mutex::Locker l(watch_lock);
+ std::lock_guard l(watch_lock);
return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
}
// -- Recovery/Backfill Request Scheduling --
- Mutex recovery_request_lock;
+ ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
SafeTimer recovery_request_timer;
// For async recovery sleep
bool recovery_needs_sleep = true;
- utime_t recovery_schedule_time = utime_t();
+ ceph::real_clock::time_point recovery_schedule_time;
- Mutex recovery_sleep_lock;
- SafeTimer recovery_sleep_timer;
+ // For recovery & scrub & snap
+ ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
+ SafeTimer sleep_timer;
// -- tids --
// for ops i issue
- std::atomic_uint last_tid{0};
+ std::atomic<unsigned int> last_tid{0};
ceph_tid_t get_tid() {
return (ceph_tid_t)last_tid++;
}
// -- backfill_reservation --
Finisher reserver_finisher;
- AsyncReserver<spg_t> local_reserver;
- AsyncReserver<spg_t> remote_reserver;
+ AsyncReserver<spg_t, Finisher> local_reserver;
+ AsyncReserver<spg_t, Finisher> remote_reserver;
+
+ // -- pg merge --
+ ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
+ std::map<pg_t,eversion_t> ready_to_merge_source; // pg -> version
+ std::map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target; // pg -> (version,les,lec)
+ std::set<pg_t> not_ready_to_merge_source;
+ std::map<pg_t,pg_t> not_ready_to_merge_target;
+ std::set<pg_t> sent_ready_to_merge_source;
+
+ void set_ready_to_merge_source(PG *pg,
+ eversion_t version);
+ void set_ready_to_merge_target(PG *pg,
+ eversion_t version,
+ epoch_t last_epoch_started,
+ epoch_t last_epoch_clean);
+ void set_not_ready_to_merge_source(pg_t source);
+ void set_not_ready_to_merge_target(pg_t target, pg_t source);
+ void clear_ready_to_merge(PG *pg);
+ void send_ready_to_merge();
+ void _send_ready_to_merge();
+ void clear_sent_ready_to_merge();
+ void prune_sent_ready_to_merge(const OSDMapRef& osdmap);
// -- pg_temp --
private:
- Mutex pg_temp_lock;
+ ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
struct pg_temp_t {
- pg_temp_t()
- {}
- pg_temp_t(vector<int> v, bool f)
- : acting{v}, forced{f}
- {}
- vector<int> acting;
+ std::vector<int> acting;
bool forced = false;
};
- map<pg_t, pg_temp_t> pg_temp_wanted;
- map<pg_t, pg_temp_t> pg_temp_pending;
+ std::map<pg_t, pg_temp_t> pg_temp_wanted;
+ std::map<pg_t, pg_temp_t> pg_temp_pending;
void _sent_pg_temp();
friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
public:
- void queue_want_pg_temp(pg_t pgid, const vector<int>& want,
+ void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
bool forced = false);
void remove_want_pg_temp(pg_t pgid);
void requeue_pg_temp();
void send_pg_temp();
+ ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
+ std::set<pg_t> pg_created;
void send_pg_created(pg_t pgid);
+ void prune_pg_created();
+ void send_pg_created();
- void queue_for_peering(PG *pg);
+ AsyncReserver<spg_t, Finisher> snap_reserver;
+ void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
+ void queue_for_snap_trim(PG *pg);
+ void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);
- Mutex snap_sleep_lock;
- SafeTimer snap_sleep_timer;
+ void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority);
- Mutex scrub_sleep_lock;
- SafeTimer scrub_sleep_timer;
+ /// queue the message (-> event) that all replicas have reserved scrub resources for us
+ void queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority);
- AsyncReserver<spg_t> snap_reserver;
- void queue_for_snap_trim(PG *pg);
+ /// queue the message (-> event) that some replicas denied our scrub resources request
+ void queue_for_scrub_denied(PG* pg, Scrub::scrub_prio_t with_priority);
- void queue_for_scrub(PG *pg, bool with_high_priority) {
- unsigned scrub_queue_priority = pg->scrubber.priority;
- if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
- scrub_queue_priority = cct->_conf->osd_client_op_priority;
- }
- enqueue_back(
- pg->info.pgid,
- PGQueueable(
- PGScrub(pg->get_osdmap()->get_epoch()),
- cct->_conf->osd_scrub_cost,
- scrub_queue_priority,
- ceph_clock_now(),
- entity_inst_t(),
- pg->get_osdmap()->get_epoch()));
- }
+ /// Signals either (a) the end of a sleep period, or (b) a recheck of the availability
+ /// of the primary map being created by the backend.
+ void queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// Signals a change in the number of in-flight recovery writes
+ void queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// Signals that all pending updates were applied
+ void queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// Signals that the selected chunk (objects range) is available for scrubbing
+ void queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// The chunk selected is blocked by user operations, and cannot be scrubbed now
+ void queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// The block-range that was locked and prevented the scrubbing - is freed
+ void queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// Signals that all write OPs are done
+ void queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority);
+
+  /// Signals that the local (Primary's) scrub map is ready
+ void queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// Signals that we (the Primary) got all waited-for scrub-maps from our replicas
+ void queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// Signals that all chunks were handled
+ /// Note: always with high priority, as must be acted upon before the
+ /// next scrub request arrives from the Primary (and the primary is free
+ /// to send the request once the replica's map is received).
+ void queue_scrub_is_finished(PG* pg);
+
+ /// Signals that there are more chunks to handle
+ void queue_scrub_next_chunk(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ /// Signals that we have finished comparing the maps for this chunk
+ /// Note: required, as in Crimson this operation is 'futurized'.
+ void queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority);
+
+ void queue_for_rep_scrub(PG* pg,
+ Scrub::scrub_prio_t with_high_priority,
+ unsigned int qu_priority,
+ Scrub::act_token_t act_token);
+
+ /// Signals a change in the number of in-flight recovery writes
+ void queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority);
+
+ /// (not in Crimson) Queue a SchedReplica event to be sent to the replica, to
+ /// trigger a re-check of the availability of the scrub map prepared by the
+ /// backend.
+ void queue_for_rep_scrub_resched(PG* pg,
+ Scrub::scrub_prio_t with_high_priority,
+ unsigned int qu_priority,
+ Scrub::act_token_t act_token);
+
+ void queue_for_pg_delete(spg_t pgid, epoch_t e);
+ bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
private:
// -- pg recovery and associated throttling --
- Mutex recovery_lock;
- list<pair<epoch_t, PGRef> > awaiting_throttle;
+ ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
+ std::list<std::pair<epoch_t, PGRef> > awaiting_throttle;
+
+ /// queue a scrub-related message for a PG
+ template <class MSG_TYPE>
+ void queue_scrub_event_msg(PG* pg,
+ Scrub::scrub_prio_t with_priority,
+ unsigned int qu_priority,
+ Scrub::act_token_t act_token);
+
+ /// An alternative version of queue_scrub_event_msg(), in which the queuing priority is
+ /// provided by the executing scrub (i.e. taken from PgScrubber::m_flags)
+ template <class MSG_TYPE>
+ void queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority);
utime_t defer_recovery_until;
uint64_t recovery_ops_active;
uint64_t recovery_ops_reserved;
bool recovery_paused;
#ifdef DEBUG_RECOVERY_OIDS
- map<spg_t, set<hobject_t> > recovery_oids;
+ std::map<spg_t, std::set<hobject_t> > recovery_oids;
#endif
bool _recover_now(uint64_t *available_pushes);
void _maybe_queue_recovery();
void _queue_for_recovery(
- pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
- assert(recovery_lock.is_locked_by_me());
- enqueue_back(
- p.second->info.pgid,
- PGQueueable(
- PGRecovery(p.first, reserved_pushes),
- cct->_conf->osd_recovery_cost,
- cct->_conf->osd_recovery_priority,
- ceph_clock_now(),
- entity_inst_t(),
- p.first));
- }
+ std::pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
public:
void start_recovery_op(PG *pg, const hobject_t& soid);
void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
bool is_recovery_active();
- void release_reserved_pushes(uint64_t pushes) {
- Mutex::Locker l(recovery_lock);
- assert(recovery_ops_reserved >= pushes);
- recovery_ops_reserved -= pushes;
- _maybe_queue_recovery();
- }
+ void release_reserved_pushes(uint64_t pushes);
void defer_recovery(float defer_for) {
defer_recovery_until = ceph_clock_now();
defer_recovery_until += defer_for;
}
void pause_recovery() {
- Mutex::Locker l(recovery_lock);
+ std::lock_guard l(recovery_lock);
recovery_paused = true;
}
bool recovery_is_paused() {
- Mutex::Locker l(recovery_lock);
+ std::lock_guard l(recovery_lock);
return recovery_paused;
}
void unpause_recovery() {
- Mutex::Locker l(recovery_lock);
+ std::lock_guard l(recovery_lock);
recovery_paused = false;
_maybe_queue_recovery();
}
void kick_recovery_queue() {
- Mutex::Locker l(recovery_lock);
+ std::lock_guard l(recovery_lock);
_maybe_queue_recovery();
}
void clear_queued_recovery(PG *pg) {
- Mutex::Locker l(recovery_lock);
- for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
- i != awaiting_throttle.end();
- ) {
- if (i->second.get() == pg) {
- awaiting_throttle.erase(i);
- return;
- } else {
- ++i;
- }
- }
+ std::lock_guard l(recovery_lock);
+ awaiting_throttle.remove_if(
+      [pg](decltype(awaiting_throttle)::const_reference awaiting) {
+ return awaiting.second.get() == pg;
+ });
}
+
+ unsigned get_target_pg_log_entries() const;
+
// delayed pg activation
void queue_for_recovery(PG *pg) {
- Mutex::Locker l(recovery_lock);
+ std::lock_guard l(recovery_lock);
- if (pg->get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL)) {
- awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
+ if (pg->is_forced_recovery_or_backfill()) {
+ awaiting_throttle.push_front(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
} else {
- awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
+ awaiting_throttle.push_back(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
}
_maybe_queue_recovery();
}
void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
- Mutex::Locker l(recovery_lock);
- _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
+ std::lock_guard l(recovery_lock);
+ _queue_for_recovery(std::make_pair(queued, pg), reserved_pushes);
}
- void adjust_pg_priorities(const vector<PGRef>& pgs, int newflags);
+ void queue_check_readable(spg_t spgid,
+ epoch_t lpr,
+ ceph::signedspan delay = ceph::signedspan::zero());
// osd map cache (past osd maps)
- Mutex map_cache_lock;
+ ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
SharedLRU<epoch_t, const OSDMap> map_cache;
- SimpleLRU<epoch_t, bufferlist> map_bl_cache;
- SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
+ SimpleLRU<epoch_t, ceph::buffer::list> map_bl_cache;
+ SimpleLRU<epoch_t, ceph::buffer::list> map_bl_inc_cache;
OSDMapRef try_get_map(epoch_t e);
OSDMapRef get_map(epoch_t e) {
OSDMapRef ret(try_get_map(e));
- assert(ret);
+ ceph_assert(ret);
return ret;
}
OSDMapRef add_map(OSDMap *o) {
- Mutex::Locker l(map_cache_lock);
+ std::lock_guard l(map_cache_lock);
return _add_map(o);
}
OSDMapRef _add_map(OSDMap *o);
- void add_map_bl(epoch_t e, bufferlist& bl) {
- Mutex::Locker l(map_cache_lock);
- return _add_map_bl(e, bl);
- }
- void pin_map_bl(epoch_t e, bufferlist &bl);
- void _add_map_bl(epoch_t e, bufferlist& bl);
- bool get_map_bl(epoch_t e, bufferlist& bl) {
- Mutex::Locker l(map_cache_lock);
+ void _add_map_bl(epoch_t e, ceph::buffer::list& bl);
+ bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
+ std::lock_guard l(map_cache_lock);
return _get_map_bl(e, bl);
}
- bool _get_map_bl(epoch_t e, bufferlist& bl);
+ bool _get_map_bl(epoch_t e, ceph::buffer::list& bl);
- void add_map_inc_bl(epoch_t e, bufferlist& bl) {
- Mutex::Locker l(map_cache_lock);
- return _add_map_inc_bl(e, bl);
- }
- void pin_map_inc_bl(epoch_t e, bufferlist &bl);
- void _add_map_inc_bl(epoch_t e, bufferlist& bl);
- bool get_inc_map_bl(epoch_t e, bufferlist& bl);
+ void _add_map_inc_bl(epoch_t e, ceph::buffer::list& bl);
+ bool get_inc_map_bl(epoch_t e, ceph::buffer::list& bl);
- void clear_map_bl_cache_pins(epoch_t e);
+  /// identify split child pgids over an osdmap interval
+ void identify_splits_and_merges(
+ OSDMapRef old_map,
+ OSDMapRef new_map,
+ spg_t pgid,
+ std::set<std::pair<spg_t,epoch_t>> *new_children,
+ std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
void need_heartbeat_peer_update();
- void pg_stat_queue_enqueue(PG *pg);
- void pg_stat_queue_dequeue(PG *pg);
-
void init();
- void final_init();
+ void final_init();
void start_shutdown();
void shutdown_reserver();
void shutdown();
-private:
- // split
- Mutex in_progress_split_lock;
- map<spg_t, spg_t> pending_splits; // child -> parent
- map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children]
- set<spg_t> in_progress_splits; // child
-
-public:
- void _start_split(spg_t parent, const set<spg_t> &children);
- void start_split(spg_t parent, const set<spg_t> &children) {
- Mutex::Locker l(in_progress_split_lock);
- return _start_split(parent, children);
- }
- void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
- void complete_split(const set<spg_t> &pgs);
- void cancel_pending_splits_for_parent(spg_t parent);
- void _cancel_pending_splits_for_parent(spg_t parent);
- bool splitting(spg_t pgid);
- void expand_pg_num(OSDMapRef old_map,
- OSDMapRef new_map);
- void _maybe_split_pgid(OSDMapRef old_map,
- OSDMapRef new_map,
- spg_t pgid);
- void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);
-
// -- stats --
- Mutex stat_lock;
+ ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
osd_stat_t osd_stat;
uint32_t seq = 0;
- void update_osd_stat(vector<int>& hb_peers);
- osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf,
- vector<int>& hb_peers,
- int num_pgs);
+ void set_statfs(const struct store_statfs_t &stbuf,
+ osd_alert_list_t& alerts);
+ osd_stat_t set_osd_stat(std::vector<int>& hb_peers, int num_pgs);
+ void inc_osd_stat_repaired(void);
+ float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
osd_stat_t get_osd_stat() {
- Mutex::Locker l(stat_lock);
+ std::lock_guard l(stat_lock);
++seq;
osd_stat.up_from = up_epoch;
osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
return osd_stat;
}
uint64_t get_osd_stat_seq() {
- Mutex::Locker l(stat_lock);
+ std::lock_guard l(stat_lock);
return osd_stat.seq;
}
+ void get_hb_pingtime(std::map<int, osd_stat_t::Interfaces> *pp)
+ {
+ std::lock_guard l(stat_lock);
+ *pp = osd_stat.hb_pingtime;
+ return;
+ }
// -- OSD Full Status --
private:
friend TestOpsSocketHook;
- mutable Mutex full_status_lock;
+ mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
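+  // How the ascending order is used (sketch only; the actual thresholds come
+  // from the osdmap full/backfillfull/nearfull ratios and the
+  // osd_failsafe_full_ratio option, see recalc_full_state()):
+  //
+  //   ratio <  nearfull      -> NONE
+  //   ratio >= nearfull      -> NEARFULL
+  //   ratio >= backfillfull  -> BACKFILLFULL
+  //   ratio >= full          -> FULL
+  //   ratio >= failsafe      -> FAILSAFE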
const char *get_full_state_name(s_names s) const {
switch (s) {
default: return "???";
}
}
- s_names get_full_state(string type) const {
+ s_names get_full_state(std::string type) const {
if (type == "none")
return NONE;
else if (type == "failsafe")
else
return INVALID;
}
- double cur_ratio; ///< current utilization
+ double cur_ratio, physical_ratio; ///< current utilization
mutable int64_t injectfull = 0;
s_names injectfull_state = NONE;
float get_failsafe_full_ratio();
- void check_full_status(float ratio);
- bool _check_full(s_names type, ostream &ss) const;
+ bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
+ bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
public:
- bool check_failsafe_full(ostream &ss) const;
- bool check_full(ostream &ss) const;
- bool check_backfill_full(ostream &ss) const;
- bool check_nearfull(ostream &ss) const;
+ void check_full_status(float ratio, float pratio);
+ s_names recalc_full_state(float ratio, float pratio, std::string &inject);
+ bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
+ bool check_failsafe_full(DoutPrefixProvider *dpp) const;
+ bool check_full(DoutPrefixProvider *dpp) const;
+ bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
+ bool check_backfill_full(DoutPrefixProvider *dpp) const;
+ bool check_nearfull(DoutPrefixProvider *dpp) const;
bool is_failsafe_full() const;
bool is_full() const;
bool is_backfillfull() const;
bool is_nearfull() const;
bool need_fullness_update(); ///< osdmap state needs update
void set_injectfull(s_names type, int64_t count);
- bool check_osdmap_full(const set<pg_shard_t> &missing_on);
// -- epochs --
private:
- mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
+ // protects access to boot_epoch, up_epoch, bind_epoch
+ mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
epoch_t up_epoch; // _most_recent_ epoch we were marked up
epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
public:
/**
- * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
+ * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
* can be NULL if you don't care about them.
*/
void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
epoch_t *_bind_epoch) const;
/**
- * Set the boot, up, and bind epochs. Any NULL params will not be set.
+ * Set the boot, up, and bind epochs. Any NULL params will not be set.
*/
void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
const epoch_t *_bind_epoch);
void request_osdmap_update(epoch_t e);
+ // -- heartbeats --
+  ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDService::hb_stamp_lock");
+
+ /// osd -> heartbeat stamps
+ std::vector<HeartbeatStampsRef> hb_stamps;
+
+ /// get or create a ref for a peer's HeartbeatStamps
+ HeartbeatStampsRef get_hb_stamps(unsigned osd);
+
+
+ // Timer for readable leases
+ ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};
+
+ void queue_renew_lease(epoch_t epoch, spg_t spgid);
+
// -- stopping --
- Mutex is_stopping_lock;
- Cond is_stopping_cond;
+ ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
+ ceph::condition_variable is_stopping_cond;
enum {
NOT_STOPPING,
PREPARING_TO_STOP,
STOPPING };
- std::atomic_int state{NOT_STOPPING};
- int get_state() {
+ std::atomic<int> state{NOT_STOPPING};
+ int get_state() const {
return state;
}
void set_state(int s) {
#ifdef PG_DEBUG_REFS
- Mutex pgid_lock;
- map<spg_t, int> pgid_tracker;
- map<spg_t, PG*> live_pgs;
+ ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
+ std::map<spg_t, int> pgid_tracker;
+ std::map<spg_t, PG*> live_pgs;
void add_pgid(spg_t pgid, PG *pg);
void remove_pgid(spg_t pgid, PG *pg);
void dump_live_pgids();
#endif
- explicit OSDService(OSD *osd);
- ~OSDService();
+ explicit OSDService(OSD *osd, ceph::async::io_context_pool& poolctx);
+ ~OSDService() = default;
+};
+
+/*
+
+ Each PG slot includes queues for events that are processing and/or waiting
+ for a PG to be materialized in the slot.
+
+ These are the constraints:
+
+  - client ops must remain ordered by client, regardless of map epoch
+ - peering messages/events from peers must remain ordered by peer
+ - peering messages and client ops need not be ordered relative to each other
+
+ - some peering events can create a pg (e.g., notify)
+ - the query peering event can proceed when a PG doesn't exist
+
+ Implementation notes:
+
+ - everybody waits for split. If the OSD has the parent PG it will instantiate
+ the PGSlot early and mark it waiting_for_split. Everything will wait until
+    the parent is able to commit the split operation and the child PGs are
+ materialized in the child slots.
+
+ - every event has an epoch property and will wait for the OSDShard to catch
+ up to that epoch. For example, if we get a peering event from a future
+ epoch, the event will wait in the slot until the local OSD has caught up.
+ (We should be judicious in specifying the required epoch [by, e.g., setting
+ it to the same_interval_since epoch] so that we don't wait for epochs that
+ don't affect the given PG.)
+
+ - we maintain two separate wait lists, *waiting* and *waiting_peering*. The
+ OpSchedulerItem has an is_peering() bool to determine which we use. Waiting
+ peering events are queued up by epoch required.
+
+ - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
+ materialized the PG), we wake *all* waiting items. (This could be optimized,
+ probably, but we don't bother.) We always requeue peering items ahead of
+ client ops.
+
+ - some peering events are marked !peering_requires_pg (PGQuery). if we do
+ not have a PG these are processed immediately (under the shard lock).
+
+  - if we do not have a PG present, we check whether the slot maps to the current host.
+ if so, we either queue the item and wait for the PG to materialize, or
+ (if the event is a pg creating event like PGNotify), we materialize the PG.
+
+ - when we advance the osdmap on the OSDShard, we scan pg slots and
+ discard any slots with no pg (and not waiting_for_split) that no
+    longer map to the current host.
+
+ */
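+/*
+  Dequeue-side sketch of the above (illustrative only): a peering event whose
+  required epoch is ahead of the shard's osdmap waits in waiting_peering,
+  keyed by that epoch, and is requeued once consume_map() advances the shard:
+
+    if (item.is_peering() &&
+        item.get_map_epoch() > shard_osdmap->get_epoch()) {
+      slot->waiting_peering[item.get_map_epoch()].push_back(std::move(item));
+      return;   // woken later by consume_map()
+    }
+
+  (get_map_epoch() stands in for however the item reports its required epoch.)
+ */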
+
+struct OSDShardPGSlot {
+ using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
+ PGRef pg; ///< pg reference
+ std::deque<OpSchedulerItem> to_process; ///< order items for this slot
+ int num_running = 0; ///< _process threads doing pg lookup/lock
+
+ std::deque<OpSchedulerItem> waiting; ///< waiting for pg (or map + pg)
+
+ /// waiting for map (peering evt)
+ std::map<epoch_t,std::deque<OpSchedulerItem>> waiting_peering;
+
+ /// incremented by wake_pg_waiters; indicates racing _process threads
+ /// should bail out (their op has been requeued)
+ uint64_t requeue_seq = 0;
+
+ /// waiting for split child to materialize in these epoch(s)
+ std::set<epoch_t> waiting_for_split;
+
+ epoch_t epoch = 0;
+ boost::intrusive::set_member_hook<> pg_epoch_item;
+
+ /// waiting for a merge (source or target) by this epoch
+ epoch_t waiting_for_merge_epoch = 0;
+};
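+// How requeue_seq is consumed (a sketch, not the verbatim _process code): a
+// worker snapshots the counter before dropping the shard lock to take the PG
+// lock, then rechecks it; a mismatch means wake_pg_waiters() requeued the
+// item and this worker must drop it:
+//
+//   uint64_t rq = slot->requeue_seq;
+//   shard_lock.unlock();
+//   pg->lock();
+//   shard_lock.lock();
+//   if (slot->requeue_seq != rq) {
+//     pg->unlock();
+//     return;   // our item was requeued; bail out
+//   }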
+
+struct OSDShard {
+ const unsigned shard_id;
+ CephContext *cct;
+ OSD *osd;
+
+ std::string shard_name;
+
+ std::string sdata_wait_lock_name;
+ ceph::mutex sdata_wait_lock;
+ ceph::condition_variable sdata_cond;
+ int waiting_threads = 0;
+
+ ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock
+ OSDMapRef shard_osdmap;
+
+ OSDMapRef get_osdmap() {
+ std::lock_guard l(osdmap_lock);
+ return shard_osdmap;
+ }
+
+ std::string shard_lock_name;
+ ceph::mutex shard_lock; ///< protects remaining members below
+
+ /// map of slots for each spg_t. maintains ordering of items dequeued
+ /// from scheduler while _process thread drops shard lock to acquire the
+ /// pg lock. stale slots are removed by consume_map.
+ std::unordered_map<spg_t,std::unique_ptr<OSDShardPGSlot>> pg_slots;
+
+ struct pg_slot_compare_by_epoch {
+ bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
+ return l.epoch < r.epoch;
+ }
+ };
+
+ /// maintain an ordering of pg slots by pg epoch
+ boost::intrusive::multiset<
+ OSDShardPGSlot,
+ boost::intrusive::member_hook<
+ OSDShardPGSlot,
+ boost::intrusive::set_member_hook<>,
+ &OSDShardPGSlot::pg_epoch_item>,
+ boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
+ int waiting_for_min_pg_epoch = 0;
+ ceph::condition_variable min_pg_epoch_cond;
+
+ /// priority queue
+ ceph::osd::scheduler::OpSchedulerRef scheduler;
+
+ bool stop_waiting = false;
+
+ ContextQueue context_queue;
+
+ void _attach_pg(OSDShardPGSlot *slot, PG *pg);
+ void _detach_pg(OSDShardPGSlot *slot);
+
+ void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
+ epoch_t get_min_pg_epoch();
+ void wait_min_pg_epoch(epoch_t need);
+
+ /// return newest epoch we are waiting for
+ epoch_t get_max_waiting_epoch();
+
+ /// push osdmap into shard
+ void consume_map(
+ const OSDMapRef& osdmap,
+ unsigned *pushes_to_free);
+
+ int _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
+
+ void identify_splits_and_merges(
+ const OSDMapRef& as_of_osdmap,
+ std::set<std::pair<spg_t,epoch_t>> *split_children,
+ std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
+ void _prime_splits(std::set<std::pair<spg_t,epoch_t>> *pgids);
+ void prime_splits(const OSDMapRef& as_of_osdmap,
+ std::set<std::pair<spg_t,epoch_t>> *pgids);
+ void prime_merges(const OSDMapRef& as_of_osdmap,
+ std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
+ void register_and_wake_split_child(PG *pg);
+ void unprime_split_children(spg_t parent, unsigned old_pg_num);
+ void update_scheduler_config();
+ std::string get_scheduler_type();
+
+ OSDShard(
+ int id,
+ CephContext *cct,
+ OSD *osd);
};
class OSD : public Dispatcher,
public md_config_obs_t {
+ using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
+
/** OSD **/
- Mutex osd_lock; // global lock
+ // global lock
+ ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
SafeTimer tick_timer; // safe timer (osd_lock)
// Tick timer for those stuff that do not need osd_lock
- Mutex tick_timer_lock;
+ ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
SafeTimer tick_timer_without_osd_lock;
+ std::string gss_ktfile_client{};
+
public:
// config observer bits
const char** get_tracked_conf_keys() const override;
- void handle_conf_change(const struct md_config_t *conf,
+ void handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed) override;
void update_log_config();
void check_config();
const double OSD_TICK_INTERVAL = { 1.0 };
double get_tick_interval() const;
- AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
- AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
-
Messenger *cluster_messenger;
Messenger *client_messenger;
Messenger *objecter_messenger;
MgrClient mgrc;
PerfCounters *logger;
PerfCounters *recoverystate_perf;
- ObjectStore *store;
+ std::unique_ptr<ObjectStore> store;
#ifdef HAVE_LIBFUSE
FuseStore *fuse_store = nullptr;
#endif
int whoami;
std::string dev_path, journal_path;
+ ceph_release_t last_require_osd_release{ceph_release_t::unknown};
+
+ int numa_node = -1;
+ size_t numa_cpu_set_size = 0;
+ cpu_set_t numa_cpu_set;
+
bool store_is_rotational = true;
bool journal_is_rotational = true;
ZTracer::Endpoint trace_endpoint;
- void create_logger();
- void create_recoverystate_perf();
+ PerfCounters* create_logger();
+ PerfCounters* create_recoverystate_perf();
void tick();
void tick_without_osd_lock();
void _dispatch(Message *m);
void dispatch_op(OpRequestRef op);
- void check_osdmap_features(ObjectStore *store);
+ void check_osdmap_features();
// asok
friend class OSDSocketHook;
class OSDSocketHook *asok_hook;
- bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);
+ using PGRefOrError = std::tuple<std::optional<PGRef>, int>;
+ PGRefOrError locate_asok_target(const cmdmap_t& cmdmap,
+ std::stringstream& ss, bool only_primary);
+ int asok_route_to_pg(bool only_primary,
+ std::string_view prefix,
+ cmdmap_t cmdmap,
+ Formatter *f,
+ std::stringstream& ss,
+ const bufferlist& inbl,
+ bufferlist& outbl,
+ std::function<void(int, const std::string&, bufferlist&)> on_finish);
+ void asok_command(
+ std::string_view prefix,
+ const cmdmap_t& cmdmap,
+ ceph::Formatter *f,
+ const ceph::buffer::list& inbl,
+ std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish);
public:
- ClassHandler *class_handler = nullptr;
int get_nodeid() { return whoami; }
-
+
static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
char foo[20];
snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
object_t("snapmapper"),
0)));
}
+ static ghobject_t make_purged_snaps_oid() {
+ return ghobject_t(hobject_t(
+ sobject_t(
+ object_t("purged_snaps"),
+ 0)));
+ }
static ghobject_t make_pg_log_oid(spg_t pg) {
- stringstream ss;
+ std::stringstream ss;
ss << "pglog_" << pg;
- string s;
+ std::string s;
getline(ss, s);
return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
}
-
+
static ghobject_t make_pg_biginfo_oid(spg_t pg) {
- stringstream ss;
+ std::stringstream ss;
ss << "pginfo_" << pg;
- string s;
+ std::string s;
getline(ss, s);
return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
}
hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
return ghobject_t(oid);
}
+
+ static ghobject_t make_final_pool_info_oid(int64_t pool) {
+ return ghobject_t(
+ hobject_t(
+ sobject_t(
+ object_t(std::string("final_pool_") + stringify(pool)),
+ CEPH_NOSNAP)));
+ }
+
+ static ghobject_t make_pg_num_history_oid() {
+ return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
+ }
+
static void recursive_remove_collection(CephContext* cct,
ObjectStore *store,
spg_t pgid,
/**
* get_osd_initial_compat_set()
*
- * Get the initial feature set for this OSD. Features
+   * Get the initial feature set for this OSD. Features
* here are automatically upgraded.
*
* Return value: Initial osd CompatSet
* Return value: CompatSet of all supported features
*/
static CompatSet get_osd_compat_set();
-
+
private:
class C_Tick;
class C_Tick_WithoutOSDLock;
+ // -- config settings --
+ float m_osd_pg_epoch_max_lag_factor;
+
// -- superblock --
OSDSuperblock superblock;
}
private:
- std::atomic_int state{STATE_INITIALIZING};
- bool waiting_for_luminous_mons = false;
+ std::atomic<int> state{STATE_INITIALIZING};
public:
  int get_state() const {
    return state;
  }
private:
- ThreadPool peering_tp;
ShardedThreadPool osd_op_tp;
- ThreadPool disk_tp;
- ThreadPool command_tp;
- void set_disk_tp_priority();
void get_latest_osdmap();
// -- sessions --
private:
- void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
- void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
+ void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);
- Mutex session_waiting_lock;
- set<Session*> session_waiting_for_map;
+ ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
+ std::set<ceph::ref_t<Session>> session_waiting_for_map;
/// Caller assumes refs for included Sessions
- void get_sessions_waiting_for_map(set<Session*> *out) {
- Mutex::Locker l(session_waiting_lock);
+ void get_sessions_waiting_for_map(std::set<ceph::ref_t<Session>> *out) {
+ std::lock_guard l(session_waiting_lock);
out->swap(session_waiting_for_map);
}
- void register_session_waiting_on_map(Session *session) {
- Mutex::Locker l(session_waiting_lock);
- if (session_waiting_for_map.insert(session).second) {
- session->get();
- }
+ void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
+ std::lock_guard l(session_waiting_lock);
+ session_waiting_for_map.insert(session);
}
- void clear_session_waiting_on_map(Session *session) {
- Mutex::Locker l(session_waiting_lock);
- set<Session*>::iterator i = session_waiting_for_map.find(session);
- if (i != session_waiting_for_map.end()) {
- (*i)->put();
- session_waiting_for_map.erase(i);
- }
+ void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
+ std::lock_guard l(session_waiting_lock);
+ session_waiting_for_map.erase(session);
}
void dispatch_sessions_waiting_on_map() {
- set<Session*> sessions_to_check;
+ std::set<ceph::ref_t<Session>> sessions_to_check;
get_sessions_waiting_for_map(&sessions_to_check);
- for (set<Session*>::iterator i = sessions_to_check.begin();
+ for (auto i = sessions_to_check.begin();
i != sessions_to_check.end();
sessions_to_check.erase(i++)) {
- (*i)->session_dispatch_lock.Lock();
- dispatch_session_waiting(*i, osdmap);
- (*i)->session_dispatch_lock.Unlock();
- (*i)->put();
+ std::lock_guard l{(*i)->session_dispatch_lock};
+ dispatch_session_waiting(*i, get_osdmap());
}
}
- void session_handle_reset(Session *session) {
- Mutex::Locker l(session->session_dispatch_lock);
+ void session_handle_reset(const ceph::ref_t<Session>& session) {
+ std::lock_guard l(session->session_dispatch_lock);
clear_session_waiting_on_map(session);
session->clear_backoffs();
void osdmap_subscribe(version_t epoch, bool force_request);
/** @} monc helpers */
- Mutex osdmap_subscribe_lock;
+ ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
epoch_t latest_subscribed_epoch{0};
// -- heartbeat --
utime_t last_rx_front; ///< last time we got a ping reply on the front side
utime_t last_rx_back; ///< last time we got a ping reply on the back side
epoch_t epoch; ///< most recent epoch we wanted this peer
+ /// number of connections over which we send and receive heartbeat pings/replies
+ static constexpr int HEARTBEAT_MAX_CONN = 2;
+ /// history of in-flight pings, arranged by the timestamp at which we sent them:
+ /// send time -> deadline -> remaining replies
+ std::map<utime_t, std::pair<utime_t, int>> ping_history;
+
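+ // An illustrative sketch of the intended bookkeeping; the local names
+ // (sent_stamp, deadline) are assumptions, not part of this header:
+ //
+ //   // on send: one reply expected per heartbeat connection
+ //   ping_history[sent_stamp] =
+ //     std::make_pair(deadline, HEARTBEAT_MAX_CONN);
+ //
+ //   // on a reply echoing sent_stamp:
+ //   if (auto i = ping_history.find(sent_stamp);
+ //       i != ping_history.end() && --i->second.second == 0) {
+ //     // every connection answered; older entries are implicitly acked
+ //     ping_history.erase(ping_history.begin(), std::next(i));
+ //   }
+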
+ utime_t hb_interval_start;
+ uint32_t hb_average_count = 0;
+ uint32_t hb_index = 0;
+
+ uint32_t hb_total_back = 0;
+ uint32_t hb_min_back = UINT_MAX;
+ uint32_t hb_max_back = 0;
+ std::vector<uint32_t> hb_back_pingtime;
+ std::vector<uint32_t> hb_back_min;
+ std::vector<uint32_t> hb_back_max;
+
+ uint32_t hb_total_front = 0;
+ uint32_t hb_min_front = UINT_MAX;
+ uint32_t hb_max_front = 0;
+ std::vector<uint32_t> hb_front_pingtime;
+ std::vector<uint32_t> hb_front_min;
+ std::vector<uint32_t> hb_front_max;
+
+ bool is_stale(utime_t stale) const {
+ if (ping_history.empty()) {
+ return false;
+ }
+ utime_t oldest_deadline = ping_history.begin()->second.first;
+ return oldest_deadline <= stale;
+ }
+
+ bool is_unhealthy(utime_t now) const {
+ if (ping_history.empty()) {
+ /// we haven't sent a ping yet, or we have received all replies;
+ /// either way we are safe and healthy for now
+ return false;
+ }
- bool is_unhealthy(utime_t cutoff) const {
- return
- ! ((last_rx_front > cutoff ||
- (last_rx_front == utime_t() && (last_tx == utime_t() ||
- first_tx > cutoff))) &&
- (last_rx_back > cutoff ||
- (last_rx_back == utime_t() && (last_tx == utime_t() ||
- first_tx > cutoff))));
+ utime_t oldest_deadline = ping_history.begin()->second.first;
+ return now > oldest_deadline;
}
- bool is_healthy(utime_t cutoff) const {
- return last_rx_front > cutoff && last_rx_back > cutoff;
+
+ bool is_healthy(utime_t now) const {
+ if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
+ // don't declare ourselves healthy until we have received the first
+ // reply on both the front and back connections
+ return false;
+ }
+ return !is_unhealthy(now);
}
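+
+ // Illustrative sketch of how a failure check might consult these
+ // predicates (the loop, `info` and `peer` are assumed):
+ //
+ //   utime_t now = ceph_clock_now();
+ //   if (info.is_unhealthy(now)) {
+ //     // the oldest in-flight ping missed its deadline: report the peer
+ //     failure_queue[peer] = info.first_tx;
+ //   }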
+ void clear_mark_down(Connection *except = nullptr) {
+ if (con_back && con_back != except) {
+ con_back->mark_down();
+ con_back->clear_priv();
+ con_back.reset(nullptr);
+ }
+ if (con_front && con_front != except) {
+ con_front->mark_down();
+ con_front->clear_priv();
+ con_front.reset(nullptr);
+ }
+ }
};
- /// state attached to outgoing heartbeat connections
- struct HeartbeatSession : public RefCountedObject {
- int peer;
- explicit HeartbeatSession(int p) : peer(p) {}
- };
- Mutex heartbeat_lock;
- map<int, int> debug_heartbeat_drops_remaining;
- Cond heartbeat_cond;
+
+ ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
+ std::map<int, int> debug_heartbeat_drops_remaining;
+ ceph::condition_variable heartbeat_cond;
bool heartbeat_stop;
- std::atomic_bool heartbeat_need_update;
- map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
+ std::atomic<bool> heartbeat_need_update;
+ std::map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
utime_t last_mon_heartbeat;
Messenger *hb_front_client_messenger;
Messenger *hb_back_client_messenger;
Messenger *hb_back_server_messenger;
utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
double daily_loadavg;
-
+ ceph::mono_time startup_time;
+
+ // Track ping response times using a vector as a circular buffer
+ // MUST BE A POWER OF 2
+ const uint32_t hb_vector_size = 16;
+
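+ // e.g. a power-of-two size lets the index wrap with a cheap mask
+ // instead of a modulo (illustrative):
+ //
+ //   hb_index = (hb_index + 1) & (hb_vector_size - 1);
+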
void _add_heartbeat_peer(int p);
void _remove_heartbeat_peer(int p);
bool heartbeat_reset(Connection *con);
void maybe_update_heartbeat_peers();
- void reset_heartbeat_peers();
+ void reset_heartbeat_peers(bool all);
bool heartbeat_peers_need_update() {
return heartbeat_need_update.load();
}
void need_heartbeat_peer_update();
void heartbeat_kick() {
- Mutex::Locker l(heartbeat_lock);
- heartbeat_cond.Signal();
+ std::lock_guard l(heartbeat_lock);
+ heartbeat_cond.notify_all();
}
struct T_Heartbeat : public Thread {
bool ms_can_fast_dispatch_any() const override { return true; }
bool ms_can_fast_dispatch(const Message *m) const override {
switch (m->get_type()) {
- case CEPH_MSG_PING:
- case MSG_OSD_PING:
- return true;
- default:
- return false;
- }
+ case CEPH_MSG_PING:
+ case MSG_OSD_PING:
+ return true;
+ default:
+ return false;
+ }
}
void ms_fast_dispatch(Message *m) override {
osd->heartbeat_dispatch(m);
bool ms_handle_refused(Connection *con) override {
return osd->ms_handle_refused(con);
}
- bool ms_verify_authorizer(Connection *con, int peer_type,
- int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
- bool& isvalid, CryptoKey& session_key,
- std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
- isvalid = true;
+ int ms_handle_authentication(Connection *con) override {
return true;
}
} heartbeat_dispatcher;
private:
// -- waiters --
- list<OpRequestRef> finished;
-
- void take_waiters(list<OpRequestRef>& ls) {
- assert(osd_lock.is_locked());
+ std::list<OpRequestRef> finished;
+
+ void take_waiters(std::list<OpRequestRef>& ls) {
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
finished.splice(finished.end(), ls);
}
void do_waiters();
-
+
// -- op tracking --
OpTracker op_tracker;
- void check_ops_in_flight();
- void test_ops(std::string command, std::string args, ostream& ss);
+ void test_ops(std::string command, std::string args, std::ostream& ss);
friend class TestOpsSocketHook;
TestOpsSocketHook *test_ops_hook;
- friend struct C_CompleteSplits;
+ friend struct C_FinishSplits;
friend struct C_OpenPGs;
- // -- op queue --
- enum class io_queue {
- prioritized,
- weightedpriority,
- mclock_opclass,
- mclock_client,
- };
- friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
-
- const io_queue op_queue;
- const unsigned int op_prio_cutoff;
+protected:
/*
* The ordered op delivery chain is:
*
- * fast dispatch -> pqueue back
- * pqueue front <-> to_process back
+ * fast dispatch -> scheduler back
+ * scheduler front <-> to_process back
* to_process front -> RunVis(item)
* <- queue_front()
*
- * The pqueue is per-shard, and to_process is per pg_slot. Items can be
- * pushed back up into to_process and/or pqueue while order is preserved.
+ * The scheduler is per-shard, and to_process is per pg_slot. Items can be
+ * pushed back up into to_process and/or scheduler while order is preserved.
*
* Multiple worker threads can operate on each shard.
*
- * Under normal circumstances, num_running == to_proces.size(). There are
+ * Under normal circumstances, num_running == to_process.size(). There are
* two times when that is not true: (1) when waiting_for_pg == true and
* to_process is accumulating requests that are waiting for the pg to be
* instantiated; in that case they will all get requeued together by
* wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
* and already requeued the items.
*/
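+ /*
+ * A simplified, illustrative walk-through of one worker iteration
+ * (the real logic lives in ShardedOpWQ::_process; names abridged):
+ *
+ *   OpSchedulerItem item = shard->scheduler->dequeue(); // scheduler front
+ *   auto& slot = shard->pg_slots[item.get_ordering_token()];
+ *   slot.to_process.push_back(std::move(item));         // to_process back
+ *   // drop shard_lock, take the pg lock, then run to_process.front()
+ *
+ * Anything that must wait is pushed back via _enqueue_front(), so
+ * per-pg ordering survives requeues.
+ */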
- friend class PGQueueable;
+ friend class ceph::osd::scheduler::PGOpItem;
+ friend class ceph::osd::scheduler::PGPeeringItem;
+ friend class ceph::osd::scheduler::PGRecovery;
+ friend class ceph::osd::scheduler::PGRecoveryMsg;
+ friend class ceph::osd::scheduler::PGDelete;
class ShardedOpWQ
- : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
+ : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
{
- struct ShardData {
- Mutex sdata_lock;
- Cond sdata_cond;
-
- Mutex sdata_op_ordering_lock; ///< protects all members below
-
- OSDMapRef waiting_for_pg_osdmap;
- struct pg_slot {
- PGRef pg; ///< cached pg reference [optional]
- list<PGQueueable> to_process; ///< order items for this slot
- int num_running = 0; ///< _process threads doing pg lookup/lock
-
- /// true if pg does/did not exist. if so all new items go directly to
- /// to_process. cleared by prune_pg_waiters.
- bool waiting_for_pg = false;
-
- /// incremented by wake_pg_waiters; indicates racing _process threads
- /// should bail out (their op has been requeued)
- uint64_t requeue_seq = 0;
- };
-
- /// map of slots for each spg_t. maintains ordering of items dequeued
- /// from pqueue while _process thread drops shard lock to acquire the
- /// pg lock. slots are removed only by prune_pg_waiters.
- unordered_map<spg_t,pg_slot> pg_slots;
-
- /// priority queue
- std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
-
- void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
- unsigned priority = item.second.get_priority();
- unsigned cost = item.second.get_cost();
- if (priority >= cutoff)
- pqueue->enqueue_strict_front(
- item.second.get_owner(),
- priority, item);
- else
- pqueue->enqueue_front(
- item.second.get_owner(),
- priority, cost, item);
- }
-
- ShardData(
- string lock_name, string ordering_lock,
- uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
- io_queue opqueue)
- : sdata_lock(lock_name.c_str(), false, true, false, cct),
- sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
- false, cct) {
- if (opqueue == io_queue::weightedpriority) {
- pqueue = std::unique_ptr
- <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
- new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
- max_tok_per_prio, min_cost));
- } else if (opqueue == io_queue::prioritized) {
- pqueue = std::unique_ptr
- <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
- new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
- max_tok_per_prio, min_cost));
- } else if (opqueue == io_queue::mclock_opclass) {
- pqueue = std::unique_ptr
- <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
- } else if (opqueue == io_queue::mclock_client) {
- pqueue = std::unique_ptr
- <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
- }
- }
- }; // struct ShardData
-
- vector<ShardData*> shard_list;
OSD *osd;
- uint32_t num_shards;
public:
- ShardedOpWQ(uint32_t pnum_shards,
- OSD *o,
- time_t ti,
- time_t si,
+ ShardedOpWQ(OSD *o,
+ ceph::timespan ti,
+ ceph::timespan si,
ShardedThreadPool* tp)
- : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
- osd(o),
- num_shards(pnum_shards) {
- for (uint32_t i = 0; i < num_shards; i++) {
- char lock_name[32] = {0};
- snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
- char order_lock[32] = {0};
- snprintf(order_lock, sizeof(order_lock), "%s.%d",
- "OSD:ShardedOpWQ:order:", i);
- ShardData* one_shard = new ShardData(
- lock_name, order_lock,
- osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
- osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
- shard_list.push_back(one_shard);
- }
+ : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
+ osd(o) {
}
- ~ShardedOpWQ() override {
- while (!shard_list.empty()) {
- delete shard_list.back();
- shard_list.pop_back();
- }
- }
-
- /// wake any pg waiters after a PG is created/instantiated
- void wake_pg_waiters(spg_t pgid);
- /// prune ops (and possiblye pg_slots) for pgs that shouldn't be here
- void prune_pg_waiters(OSDMapRef osdmap, int whoami);
-
- /// clear cached PGRef on pg deletion
- void clear_pg_pointer(spg_t pgid);
-
- /// clear pg_slots on shutdown
- void clear_pg_slots();
+ void _add_slot_waiter(
+ spg_t token,
+ OSDShardPGSlot *slot,
+ OpSchedulerItem&& qi);
/// try to do some work
- void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
+ void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) override;
/// enqueue a new item
- void _enqueue(pair <spg_t, PGQueueable> item) override;
+ void _enqueue(OpSchedulerItem&& item) override;
/// requeue an old item (at the front of the line)
- void _enqueue_front(pair <spg_t, PGQueueable> item) override;
-
+ void _enqueue_front(OpSchedulerItem&& item) override;
+
void return_waiting_threads() override {
- for(uint32_t i = 0; i < num_shards; i++) {
- ShardData* sdata = shard_list[i];
- assert (NULL != sdata);
- sdata->sdata_lock.Lock();
- sdata->sdata_cond.Signal();
- sdata->sdata_lock.Unlock();
+ for(uint32_t i = 0; i < osd->num_shards; i++) {
+ OSDShard* sdata = osd->shards[i];
- assert (NULL != sdata);
+ ceph_assert(sdata);
+ std::scoped_lock l{sdata->sdata_wait_lock};
+ sdata->stop_waiting = true;
+ sdata->sdata_cond.notify_all();
}
}
- void dump(Formatter *f) {
- for(uint32_t i = 0; i < num_shards; i++) {
- ShardData* sdata = shard_list[i];
- char lock_name[32] = {0};
- snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
+ void stop_return_waiting_threads() override {
+ for(uint32_t i = 0; i < osd->num_shards; i++) {
+ OSDShard* sdata = osd->shards[i];
- assert (NULL != sdata);
+ ceph_assert(sdata);
- sdata->sdata_op_ordering_lock.Lock();
- f->open_object_section(lock_name);
- sdata->pqueue->dump(f);
- f->close_section();
- sdata->sdata_op_ordering_lock.Unlock();
+ std::scoped_lock l{sdata->sdata_wait_lock};
+ sdata->stop_waiting = false;
}
}
- /// Must be called on ops queued back to front
- struct Pred {
- spg_t pgid;
- list<OpRequestRef> *out_ops;
- uint64_t reserved_pushes_to_free;
- Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
- : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
- void accumulate(const PGQueueable &op) {
- reserved_pushes_to_free += op.get_reserved_pushes();
- if (out_ops) {
- boost::optional<OpRequestRef> mop = op.maybe_get_op();
- if (mop)
- out_ops->push_front(*mop);
- }
- }
- bool operator()(const pair<spg_t, PGQueueable> &op) {
- if (op.first == pgid) {
- accumulate(op.second);
- return true;
- } else {
- return false;
- }
- }
- uint64_t get_reserved_pushes_to_free() const {
- return reserved_pushes_to_free;
+ void dump(ceph::Formatter *f) {
+ for(uint32_t i = 0; i < osd->num_shards; i++) {
+ auto &&sdata = osd->shards[i];
+
+ char queue_name[32] = {0};
+ snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
+ ceph_assert(NULL != sdata);
+
+ std::scoped_lock l{sdata->shard_lock};
+ f->open_object_section(queue_name);
+ sdata->scheduler->dump(*f);
+ f->close_section();
}
- };
+ }
bool is_shard_empty(uint32_t thread_index) override {
- uint32_t shard_index = thread_index % num_shards;
- ShardData* sdata = shard_list[shard_index];
- assert(NULL != sdata);
- Mutex::Locker l(sdata->sdata_op_ordering_lock);
- return sdata->pqueue->empty();
+ uint32_t shard_index = thread_index % osd->num_shards;
+ auto &&sdata = osd->shards[shard_index];
+ ceph_assert(sdata);
+ std::lock_guard l(sdata->shard_lock);
+ if (thread_index < osd->num_shards) {
+ return sdata->scheduler->empty() && sdata->context_queue.empty();
+ } else {
+ return sdata->scheduler->empty();
+ }
+ }
+
+ void handle_oncommits(std::list<Context*>& oncommits) {
+ for (auto p : oncommits) {
+ p->complete(0);
+ }
}
} op_shardedwq;
- void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
+ void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
void dequeue_op(
PGRef pg, OpRequestRef op,
ThreadPool::TPHandle &handle);
- // -- peering queue --
- struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
- list<PG*> peering_queue;
- OSD *osd;
- set<PG*> in_use;
- PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
- : ThreadPool::BatchWorkQueue<PG>(
- "OSD::PeeringWQ", ti, si, tp), osd(o) {}
-
- void _dequeue(PG *pg) override {
- for (list<PG*>::iterator i = peering_queue.begin();
- i != peering_queue.end();
- ) {
- if (*i == pg) {
- peering_queue.erase(i++);
- pg->put("PeeringWQ");
- } else {
- ++i;
- }
- }
- }
- bool _enqueue(PG *pg) override {
- pg->get("PeeringWQ");
- peering_queue.push_back(pg);
- return true;
- }
- bool _empty() override {
- return peering_queue.empty();
- }
- void _dequeue(list<PG*> *out) override;
- void _process(
- const list<PG *> &pgs,
- ThreadPool::TPHandle &handle) override {
- assert(!pgs.empty());
- osd->process_peering_events(pgs, handle);
- for (list<PG *>::const_iterator i = pgs.begin();
- i != pgs.end();
- ++i) {
- (*i)->put("PeeringWQ");
- }
- }
- void _process_finish(const list<PG *> &pgs) override {
- for (list<PG*>::const_iterator i = pgs.begin();
- i != pgs.end();
- ++i) {
- in_use.erase(*i);
- }
- }
- void _clear() override {
- assert(peering_queue.empty());
- }
- } peering_wq;
-
- void process_peering_events(
- const list<PG*> &pg,
- ThreadPool::TPHandle &handle);
+ void enqueue_peering_evt(
+ spg_t pgid,
+ PGPeeringEventRef ref);
+ void dequeue_peering_evt(
+ OSDShard *sdata,
+ PG *pg,
+ PGPeeringEventRef ref,
+ ThreadPool::TPHandle& handle);
+
+ void dequeue_delete(
+ OSDShard *sdata,
+ PG *pg,
+ epoch_t epoch,
+ ThreadPool::TPHandle& handle);
friend class PG;
+ friend struct OSDShard;
friend class PrimaryLogPG;
+ friend class PgScrubber;
protected:
// -- osd map --
- OSDMapRef osdmap;
- OSDMapRef get_osdmap() {
- return osdmap;
+ // TODO: switch to std::atomic<OSDMapRef> once C++20 is available.
+ OSDMapRef _osdmap;
+ void set_osdmap(OSDMapRef osdmap) {
+ std::atomic_store(&_osdmap, osdmap);
+ }
+ OSDMapRef get_osdmap() const {
+ return std::atomic_load(&_osdmap);
}
epoch_t get_osdmap_epoch() const {
+ // XXX: performance?
+ auto osdmap = get_osdmap();
return osdmap ? osdmap->get_epoch() : 0;
}
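+
+ // Illustrative sketch of the TODO above, assuming OSDMapRef remains a
+ // std::shared_ptr: C++20 specializes std::atomic for std::shared_ptr,
+ // so the free-function atomics could become
+ //
+ //   std::atomic<OSDMapRef> _osdmap;
+ //   void set_osdmap(OSDMapRef o) { _osdmap.store(std::move(o)); }
+ //   OSDMapRef get_osdmap() const { return _osdmap.load(); }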
- utime_t had_map_since;
- RWLock map_lock;
- list<OpRequestRef> waiting_for_osdmap;
- deque<utime_t> osd_markdown_log;
+ pool_pg_num_history_t pg_num_history;
+
+ ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
+ std::list<OpRequestRef> waiting_for_osdmap;
+ std::deque<utime_t> osd_markdown_log;
friend struct send_map_on_destruct;
void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
void note_down_osd(int osd);
void note_up_osd(int osd);
- friend class C_OnMapCommit;
+ friend struct C_OnMapCommit;
bool advance_pg(
- epoch_t advance_to, PG *pg,
+ epoch_t advance_to,
+ PG *pg,
ThreadPool::TPHandle &handle,
- PG::RecoveryCtx *rctx,
- set<PGRef> *split_pgs
- );
+ PeeringCtx &rctx);
void consume_map();
void activate_map();
OSDMapRef add_map(OSDMap *o) {
return service.add_map(o);
}
- void add_map_bl(epoch_t e, bufferlist& bl) {
- return service.add_map_bl(e, bl);
- }
- void pin_map_bl(epoch_t e, bufferlist &bl) {
- return service.pin_map_bl(e, bl);
- }
- bool get_map_bl(epoch_t e, bufferlist& bl) {
+ bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
return service.get_map_bl(e, bl);
}
- void add_map_inc_bl(epoch_t e, bufferlist& bl) {
- return service.add_map_inc_bl(e, bl);
+
+public:
+ // -- shards --
+ std::vector<OSDShard*> shards;
+ uint32_t num_shards = 0;
+
+ void inc_num_pgs() {
+ ++num_pgs;
}
- void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
- return service.pin_map_inc_bl(e, bl);
+ void dec_num_pgs() {
+ --num_pgs;
+ }
+ int get_num_pgs() const {
+ return num_pgs;
}
protected:
+ ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
+ /// merge epoch -> target pgid -> source pgid -> pg
+ std::map<epoch_t,std::map<spg_t,std::map<spg_t,PGRef>>> merge_waiters;
+
+ bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
+ unsigned need);
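+ // An illustrative sketch of what add_merge_waiter() is expected to do
+ // (local names are assumptions):
+ //
+ //   std::lock_guard l(merge_lock);
+ //   auto& sources = merge_waiters[nextmap->get_epoch()][target];
+ //   sources[source->pg_id] = source;
+ //   return sources.size() >= need; // true once every source arrived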
+
// -- placement groups --
- RWLock pg_map_lock; // this lock orders *above* individual PG _locks
- ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
+ std::atomic<size_t> num_pgs = {0};
std::mutex pending_creates_lock;
- using create_from_osd_t = std::pair<pg_t, bool /* is primary*/>;
+ using create_from_osd_t = std::pair<spg_t, bool /* is primary*/>;
std::set<create_from_osd_t> pending_creates_from_osd;
unsigned pending_creates_from_mon = 0;
- map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
PGRecoveryStats pg_recovery_stats;
- PGPool _get_pool(int id, OSDMapRef createmap);
+ PGRef _lookup_pg(spg_t pgid);
+ PGRef _lookup_lock_pg(spg_t pgid);
+ void register_pg(PGRef pg);
+ bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
- PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
- PG *_lookup_lock_pg(spg_t pgid);
+ void _get_pgs(std::vector<PGRef> *v, bool clear_too=false);
+ void _get_pgids(std::vector<spg_t> *v);
public:
- PG *lookup_lock_pg(spg_t pgid);
+ PGRef lookup_lock_pg(spg_t pgid);
- int get_num_pgs() {
- RWLock::RLocker l(pg_map_lock);
- return pg_map.size();
- }
+ std::set<int64_t> get_mapped_pools();
protected:
- PG *_open_lock_pg(OSDMapRef createmap,
- spg_t pg, bool no_lockdep_check=false);
- enum res_result {
- RES_PARENT, // resurrected a parent
- RES_SELF, // resurrected self
- RES_NONE // nothing relevant deleting
- };
- res_result _try_resurrect_pg(
- OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
-
- PG *_create_lock_pg(
- OSDMapRef createmap,
- spg_t pgid,
- bool hold_map_lock,
- bool backfill,
- int role,
- vector<int>& up, int up_primary,
- vector<int>& acting, int acting_primary,
- pg_history_t history,
- const PastIntervals& pi,
- ObjectStore::Transaction& t);
-
PG* _make_pg(OSDMapRef createmap, spg_t pgid);
- void add_newly_split_pg(PG *pg,
- PG::RecoveryCtx *rctx);
- int handle_pg_peering_evt(
- spg_t pgid,
- const pg_history_t& orig_history,
- const PastIntervals& pi,
- epoch_t epoch,
- PG::CephPeeringEvtRef evt);
- bool maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create);
+ bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
+ spg_t pgid, bool is_mon_create);
void resume_creating_pg();
void load_pgs();
- void build_past_intervals_parallel();
/// build initial pg history and intervals on create
void build_initial_pg_history(
pg_history_t *h,
PastIntervals *pi);
- /// project pg history from from to now
- bool project_pg_history(
- spg_t pgid, pg_history_t& h, epoch_t from,
- const vector<int>& lastup,
- int lastupprimary,
- const vector<int>& lastacting,
- int lastactingprimary
- ); ///< @return false if there was a map gap between from and now
-
- // this must be called with pg->lock held on any pg addition to pg_map
- void wake_pg_waiters(PGRef pg) {
- assert(pg->is_locked());
- op_shardedwq.wake_pg_waiters(pg->info.pgid);
- }
epoch_t last_pg_create_epoch;
void handle_pg_create(OpRequestRef op);
void split_pgs(
PG *parent,
- const set<spg_t> &childpgids, set<PGRef> *out_pgs,
+ const std::set<spg_t> &childpgids, std::set<PGRef> *out_pgs,
OSDMapRef curmap,
OSDMapRef nextmap,
- PG::RecoveryCtx *rctx);
+ PeeringCtx &rctx);
+ void _finish_splits(std::set<PGRef>& pgs);
// == monitor interaction ==
- Mutex mon_report_lock;
+ ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
utime_t last_mon_report;
- utime_t last_pg_stats_sent;
-
- /* if our monitor dies, we want to notice it and reconnect.
- * So we keep track of when it last acked our stat updates,
- * and if too much time passes (and we've been sending
- * more updates) then we can call it dead and reconnect
- * elsewhere.
- */
- utime_t last_pg_stats_ack;
- float stats_ack_timeout;
- set<uint64_t> outstanding_pg_stats; // how many stat updates haven't been acked yet
+ Finisher boot_finisher;
// -- boot --
void start_boot();
void _got_mon_epochs(epoch_t oldest, epoch_t newest);
void _preboot(epoch_t oldest, epoch_t newest);
void _send_boot();
- void _collect_metadata(map<string,string> *pmeta);
+ void _collect_metadata(std::map<std::string,std::string> *pmeta);
+ void _get_purged_snaps();
+ void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);
void start_waiting_for_healthy();
bool _is_healthy();
void send_full_update();
-
- friend struct C_OSD_GetVersion;
+
+ friend struct CB_OSD_GetVersion;
// -- alive --
epoch_t up_thru_wanted;
void got_full_map(epoch_t e);
// -- failures --
- map<int,utime_t> failure_queue;
- map<int,pair<utime_t,entity_inst_t> > failure_pending;
+ std::map<int,utime_t> failure_queue;
+ std::map<int,std::pair<utime_t,entity_addrvec_t> > failure_pending;
void requeue_failures();
void send_failures();
- void send_still_alive(epoch_t epoch, const entity_inst_t &i);
-
- // -- pg stats --
- Mutex pg_stat_queue_lock;
- Cond pg_stat_queue_cond;
- xlist<PG*> pg_stat_queue;
- bool osd_stat_updated;
- uint64_t pg_stat_tid, pg_stat_tid_flushed;
-
- void send_pg_stats(const utime_t &now);
- void handle_pg_stats_ack(class MPGStatsAck *ack);
- void flush_pg_stats();
+ void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
+ void cancel_pending_failures();
ceph::coarse_mono_clock::time_point last_sent_beacon;
- Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
+ ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
epoch_t min_last_epoch_clean = 0;
// which pgs were scanned for min_lec
std::vector<pg_t> min_last_epoch_clean_pgs;
void send_beacon(const ceph::coarse_mono_clock::time_point& now);
- void pg_stat_queue_enqueue(PG *pg) {
- pg_stat_queue_lock.Lock();
- if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
- pg->get("pg_stat_queue");
- pg_stat_queue.push_back(&pg->stat_queue_item);
- }
- osd_stat_updated = true;
- pg_stat_queue_lock.Unlock();
- }
- void pg_stat_queue_dequeue(PG *pg) {
- pg_stat_queue_lock.Lock();
- if (pg->stat_queue_item.remove_myself())
- pg->put("pg_stat_queue");
- pg_stat_queue_lock.Unlock();
- }
- void clear_pg_stat_queue() {
- pg_stat_queue_lock.Lock();
- while (!pg_stat_queue.empty()) {
- PG *pg = pg_stat_queue.front();
- pg_stat_queue.pop_front();
- pg->put("pg_stat_queue");
- }
- pg_stat_queue_lock.Unlock();
- }
- void clear_outstanding_pg_stats(){
- Mutex::Locker l(pg_stat_queue_lock);
- outstanding_pg_stats.clear();
- }
-
ceph_tid_t get_tid() {
return service.get_tid();
}
// -- generic pg peering --
- PG::RecoveryCtx create_context();
- void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+ void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
ThreadPool::TPHandle *handle = NULL);
- void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
- ThreadPool::TPHandle *handle = NULL);
- void do_notifies(map<int,
- vector<pair<pg_notify_t, PastIntervals> > >&
- notify_list,
- OSDMapRef map);
- void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
- OSDMapRef map);
- void do_infos(map<int,
- vector<pair<pg_notify_t, PastIntervals> > >& info_map,
- OSDMapRef map);
bool require_mon_peer(const Message *m);
bool require_mon_or_mgr_peer(const Message *m);
bool require_self_aliveness(const Message *m, epoch_t alive_since);
/**
* Verifies that the OSD who sent the given op has the same
* address as in the given map.
* @pre op was sent by an OSD using the cluster messenger
*/
- bool require_same_peer_instance(const Message *m, OSDMapRef& map,
+ bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
bool is_fast_dispatch);
bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
bool is_fast_dispatch);
- void handle_pg_query(OpRequestRef op);
- void handle_pg_notify(OpRequestRef op);
- void handle_pg_log(OpRequestRef op);
- void handle_pg_info(OpRequestRef op);
- void handle_pg_trim(OpRequestRef op);
+ void handle_fast_pg_create(MOSDPGCreate2 *m);
+ void handle_pg_query_nopg(const MQuery& q);
+ void handle_fast_pg_notify(MOSDPGNotify *m);
+ void handle_pg_notify_nopg(const MNotifyRec& q);
+ void handle_fast_pg_info(MOSDPGInfo *m);
+ void handle_fast_pg_remove(MOSDPGRemove *m);
- void handle_pg_backfill_reserve(OpRequestRef op);
- void handle_pg_recovery_reserve(OpRequestRef op);
-
- void handle_force_recovery(Message *m);
+public:
+ // used by OSDShard
+ PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
+protected:
- void handle_pg_remove(OpRequestRef op);
- void _remove_pg(PG *pg);
+ void handle_fast_force_recovery(MOSDForceRecovery *m);
// -- commands --
- struct Command {
- vector<string> cmd;
- ceph_tid_t tid;
- bufferlist indata;
- ConnectionRef con;
-
- Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
- : cmd(c), tid(t), indata(bl), con(co) {}
- };
- list<Command*> command_queue;
- struct CommandWQ : public ThreadPool::WorkQueue<Command> {
- OSD *osd;
- CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
- : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
-
- bool _empty() override {
- return osd->command_queue.empty();
- }
- bool _enqueue(Command *c) override {
- osd->command_queue.push_back(c);
- return true;
- }
- void _dequeue(Command *pg) override {
- ceph_abort();
- }
- Command *_dequeue() override {
- if (osd->command_queue.empty())
- return NULL;
- Command *c = osd->command_queue.front();
- osd->command_queue.pop_front();
- return c;
- }
- void _process(Command *c, ThreadPool::TPHandle &) override {
- osd->osd_lock.Lock();
- if (osd->is_stopping()) {
- osd->osd_lock.Unlock();
- delete c;
- return;
- }
- osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
- osd->osd_lock.Unlock();
- delete c;
- }
- void _clear() override {
- while (!osd->command_queue.empty()) {
- Command *c = osd->command_queue.front();
- osd->command_queue.pop_front();
- delete c;
- }
- }
- } command_wq;
-
- void handle_command(class MMonCommand *m);
void handle_command(class MCommand *m);
- void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
+
// -- pg recovery --
void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
// -- scrubbing --
void sched_scrub();
+ void resched_all_scrubs();
bool scrub_random_backoff();
- bool scrub_load_below_threshold();
- bool scrub_time_permit(utime_t now);
-
- // -- removing --
- struct RemoveWQ :
- public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
- CephContext* cct;
- ObjectStore *&store;
- list<pair<PGRef, DeletingStateRef> > remove_queue;
- RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
- ThreadPool *tp)
- : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
- "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
-
- bool _empty() override {
- return remove_queue.empty();
- }
- void _enqueue(pair<PGRef, DeletingStateRef> item) override {
- remove_queue.push_back(item);
- }
- void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
- remove_queue.push_front(item);
- }
- bool _dequeue(pair<PGRef, DeletingStateRef> item) {
- ceph_abort();
- }
- pair<PGRef, DeletingStateRef> _dequeue() override {
- assert(!remove_queue.empty());
- pair<PGRef, DeletingStateRef> item = remove_queue.front();
- remove_queue.pop_front();
- return item;
- }
- void _process(pair<PGRef, DeletingStateRef>,
- ThreadPool::TPHandle &) override;
- void _clear() override {
- remove_queue.clear();
- }
- int get_remove_queue_len() {
- lock();
- int r = remove_queue.size();
- unlock();
- return r;
- }
- } remove_wq;
// -- status reporting --
MPGStats *collect_pg_stats();
- std::vector<OSDHealthMetric> get_health_metrics();
+ std::vector<DaemonHealthMetric> get_health_metrics();
+
private:
bool ms_can_fast_dispatch_any() const override { return true; }
bool ms_can_fast_dispatch(const Message *m) const override {
switch (m->get_type()) {
+ case CEPH_MSG_PING:
case CEPH_MSG_OSD_OP:
case CEPH_MSG_OSD_BACKOFF:
- case MSG_OSD_SUBOP:
+ case MSG_OSD_SCRUB2:
+ case MSG_OSD_FORCE_RECOVERY:
+ case MSG_MON_COMMAND:
+ case MSG_OSD_PG_CREATE2:
+ case MSG_OSD_PG_QUERY:
+ case MSG_OSD_PG_QUERY2:
+ case MSG_OSD_PG_INFO:
+ case MSG_OSD_PG_INFO2:
+ case MSG_OSD_PG_NOTIFY:
+ case MSG_OSD_PG_NOTIFY2:
+ case MSG_OSD_PG_LOG:
+ case MSG_OSD_PG_TRIM:
+ case MSG_OSD_PG_REMOVE:
+ case MSG_OSD_BACKFILL_RESERVE:
+ case MSG_OSD_RECOVERY_RESERVE:
case MSG_OSD_REPOP:
- case MSG_OSD_SUBOPREPLY:
case MSG_OSD_REPOPREPLY:
case MSG_OSD_PG_PUSH:
case MSG_OSD_PG_PULL:
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
case MSG_OSD_PG_RECOVERY_DELETE:
case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ case MSG_OSD_PG_LEASE:
+ case MSG_OSD_PG_LEASE_ACK:
return true;
default:
return false;
}
}
void ms_fast_dispatch(Message *m) override;
- void ms_fast_preprocess(Message *m) override;
bool ms_dispatch(Message *m) override;
- bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
- bool ms_verify_authorizer(Connection *con, int peer_type,
- int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
- bool& isvalid, CryptoKey& session_key,
- std::unique_ptr<AuthAuthorizerChallenge> *challenge) override;
void ms_handle_connect(Connection *con) override;
void ms_handle_fast_connect(Connection *con) override;
void ms_handle_fast_accept(Connection *con) override;
+ int ms_handle_authentication(Connection *con) override;
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override;
- io_queue get_io_queue() const {
- if (cct->_conf->osd_op_queue == "debug_random") {
- static io_queue index_lookup[] = { io_queue::prioritized,
- io_queue::weightedpriority,
- io_queue::mclock_opclass,
- io_queue::mclock_client };
- srand(time(NULL));
- unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
- return index_lookup[which];
- } else if (cct->_conf->osd_op_queue == "prioritized") {
- return io_queue::prioritized;
- } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
- return io_queue::mclock_opclass;
- } else if (cct->_conf->osd_op_queue == "mclock_client") {
- return io_queue::mclock_client;
- } else {
- // default / catch-all is 'wpq'
- return io_queue::weightedpriority;
- }
- }
-
- unsigned int get_io_prio_cut() const {
- if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
- srand(time(NULL));
- return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
- } else if (cct->_conf->osd_op_queue_cut_off == "high") {
- return CEPH_MSG_PRIO_HIGH;
- } else {
- // default / catch-all is 'low'
- return CEPH_MSG_PRIO_LOW;
- }
- }
-
public:
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/
OSD(CephContext *cct_,
- ObjectStore *store_,
+ std::unique_ptr<ObjectStore> store_,
int id,
Messenger *internal,
Messenger *external,
Messenger *hb_front_server,
Messenger *hb_back_server,
Messenger *osdc_messenger,
- MonClient *mc, const std::string &dev, const std::string &jdev);
+ MonClient *mc, const std::string &dev, const std::string &jdev,
+ ceph::async::io_context_pool& poolctx);
~OSD() override;
// static bits
- static int mkfs(CephContext *cct, ObjectStore *store,
- const string& dev,
- uuid_d fsid, int whoami);
- /* remove any non-user xattrs from a map of them */
- void filter_xattrs(map<string, bufferptr>& attrs) {
- for (map<string, bufferptr>::iterator iter = attrs.begin();
+ static int mkfs(CephContext *cct,
+ std::unique_ptr<ObjectStore> store,
+ uuid_d fsid,
+ int whoami,
+ std::string osdspec_affinity);
+
+ /* remove any non-user xattrs from a std::map of them */
+ void filter_xattrs(std::map<std::string, ceph::buffer::ptr>& attrs) {
+ for (std::map<std::string, ceph::buffer::ptr>::iterator iter = attrs.begin();
iter != attrs.end();
) {
if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
attrs.erase(iter++);
else
++iter;
}
}
private:
- int mon_cmd_maybe_osd_create(string &cmd);
+ int mon_cmd_maybe_osd_create(std::string &cmd);
int update_crush_device_class();
int update_crush_location();
static int write_meta(CephContext *cct,
ObjectStore *store,
- uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
+ uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami, std::string& osdspec_affinity);
- void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
- void handle_scrub(struct MOSDScrub *m);
+ void handle_scrub(class MOSDScrub *m);
+ void handle_fast_scrub(class MOSDScrub2 *m);
void handle_osd_ping(class MOSDPing *m);
- int init_op_flags(OpRequestRef& op);
-
+ size_t get_num_cache_shards();
int get_num_op_shards();
int get_num_op_threads();
float get_osd_recovery_sleep();
+ float get_osd_delete_sleep();
+ float get_osd_snap_trim_sleep();
+
+ int get_recovery_max_active();
+ void maybe_override_max_osd_capacity_for_qos();
+ bool maybe_override_options_for_qos();
+ int run_osd_bench_test(int64_t count,
+ int64_t bsize,
+ int64_t osize,
+ int64_t onum,
+ double *elapsed,
+ std::ostream& ss);
+ int mon_cmd_set_config(const std::string &key, const std::string &val);
+ bool unsupported_objstore_for_qos();
+
+ void scrub_purged_snaps();
+ void probe_smart(const std::string& devid, std::ostream& ss);
public:
- static int peek_meta(ObjectStore *store, string& magic,
- uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
-
+ static int peek_meta(ObjectStore *store,
+ std::string *magic,
+ uuid_d *cluster_fsid,
+ uuid_d *osd_fsid,
+ int *whoami,
+ ceph_release_t *min_osd_release);
+
// startup/shutdown
int pre_init();
void final_init();
int enable_disable_fuse(bool stop);
+ int set_numa_affinity();
void suicide(int exitcode);
int shutdown();
public:
OSDService service;
friend class OSDService;
-};
+private:
+ void set_perf_queries(const ConfigPayload &config_payload);
+ MetricPayload get_perf_reports();
-std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+ ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
+ std::list<OSDPerfMetricQuery> m_perf_queries;
+ std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
+};
//compatibility of the executable