#include "msg/Dispatcher.h"
-#include "common/Mutex.h"
-#include "common/RWLock.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
#include "common/ceph_context.h"
#include "common/config_cacher.h"
#include "common/zipkin_trace.h"
+#include "common/ceph_timer.h"
#include "mgr/MgrClient.h"
#include "osd/ClassHandler.h"
#include "include/CompatSet.h"
+#include "include/common_fwd.h"
#include "OpRequest.h"
#include "Session.h"
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpScheduler.h"
#include <atomic>
#include <map>
#include "common/sharedptr_registry.hpp"
#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
-#include "osd/mClockOpClassQueue.h"
-#include "osd/mClockClientQueue.h"
#include "messages/MOSDOp.h"
#include "common/EventTrace.h"
+#include "osd/osd_perf_counters.h"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
*/
-enum {
- l_osd_first = 10000,
- l_osd_op_wip,
- l_osd_op,
- l_osd_op_inb,
- l_osd_op_outb,
- l_osd_op_lat,
- l_osd_op_process_lat,
- l_osd_op_prepare_lat,
- l_osd_op_r,
- l_osd_op_r_outb,
- l_osd_op_r_lat,
- l_osd_op_r_lat_outb_hist,
- l_osd_op_r_process_lat,
- l_osd_op_r_prepare_lat,
- l_osd_op_w,
- l_osd_op_w_inb,
- l_osd_op_w_lat,
- l_osd_op_w_lat_inb_hist,
- l_osd_op_w_process_lat,
- l_osd_op_w_prepare_lat,
- l_osd_op_rw,
- l_osd_op_rw_inb,
- l_osd_op_rw_outb,
- l_osd_op_rw_lat,
- l_osd_op_rw_lat_inb_hist,
- l_osd_op_rw_lat_outb_hist,
- l_osd_op_rw_process_lat,
- l_osd_op_rw_prepare_lat,
-
- l_osd_op_before_queue_op_lat,
- l_osd_op_before_dequeue_op_lat,
-
- l_osd_sop,
- l_osd_sop_inb,
- l_osd_sop_lat,
- l_osd_sop_w,
- l_osd_sop_w_inb,
- l_osd_sop_w_lat,
- l_osd_sop_pull,
- l_osd_sop_pull_lat,
- l_osd_sop_push,
- l_osd_sop_push_inb,
- l_osd_sop_push_lat,
-
- l_osd_pull,
- l_osd_push,
- l_osd_push_outb,
-
- l_osd_rop,
- l_osd_rbytes,
-
- l_osd_loadavg,
- l_osd_cached_crc,
- l_osd_cached_crc_adjusted,
- l_osd_missed_crc,
-
- l_osd_pg,
- l_osd_pg_primary,
- l_osd_pg_replica,
- l_osd_pg_stray,
- l_osd_pg_removing,
- l_osd_hb_to,
- l_osd_map,
- l_osd_mape,
- l_osd_mape_dup,
-
- l_osd_waiting_for_map,
-
- l_osd_map_cache_hit,
- l_osd_map_cache_miss,
- l_osd_map_cache_miss_low,
- l_osd_map_cache_miss_low_avg,
- l_osd_map_bl_cache_hit,
- l_osd_map_bl_cache_miss,
-
- l_osd_stat_bytes,
- l_osd_stat_bytes_used,
- l_osd_stat_bytes_avail,
-
- l_osd_copyfrom,
-
- l_osd_tier_promote,
- l_osd_tier_flush,
- l_osd_tier_flush_fail,
- l_osd_tier_try_flush,
- l_osd_tier_try_flush_fail,
- l_osd_tier_evict,
- l_osd_tier_whiteout,
- l_osd_tier_dirty,
- l_osd_tier_clean,
- l_osd_tier_delay,
- l_osd_tier_proxy_read,
- l_osd_tier_proxy_write,
-
- l_osd_agent_wake,
- l_osd_agent_skip,
- l_osd_agent_flush,
- l_osd_agent_evict,
-
- l_osd_object_ctx_cache_hit,
- l_osd_object_ctx_cache_total,
-
- l_osd_op_cache_hit,
- l_osd_tier_flush_lat,
- l_osd_tier_promote_lat,
- l_osd_tier_r_lat,
-
- l_osd_pg_info,
- l_osd_pg_fastinfo,
- l_osd_pg_biginfo,
-
- l_osd_last,
-};
-
-// RecoveryState perf counters
-enum {
- rs_first = 20000,
- rs_initial_latency,
- rs_started_latency,
- rs_reset_latency,
- rs_start_latency,
- rs_primary_latency,
- rs_peering_latency,
- rs_backfilling_latency,
- rs_waitremotebackfillreserved_latency,
- rs_waitlocalbackfillreserved_latency,
- rs_notbackfilling_latency,
- rs_repnotrecovering_latency,
- rs_repwaitrecoveryreserved_latency,
- rs_repwaitbackfillreserved_latency,
- rs_reprecovering_latency,
- rs_activating_latency,
- rs_waitlocalrecoveryreserved_latency,
- rs_waitremoterecoveryreserved_latency,
- rs_recovering_latency,
- rs_recovered_latency,
- rs_clean_latency,
- rs_active_latency,
- rs_replicaactive_latency,
- rs_stray_latency,
- rs_getinfo_latency,
- rs_getlog_latency,
- rs_waitactingchange_latency,
- rs_incomplete_latency,
- rs_down_latency,
- rs_getmissing_latency,
- rs_waitupthru_latency,
- rs_notrecovering_latency,
- rs_last,
-};
-
class Messenger;
class Message;
class MonClient;
-class PerfCounters;
class ObjectStore;
class FuseStore;
class OSDMap;
struct C_FinishSplits;
struct C_OpenPGs;
class LogChannel;
-class CephContext;
class MOSDOp;
class MOSDPGCreate2;
class MOSDPGInfo;
class MOSDPGRemove;
class MOSDForceRecovery;
+class MMonGetPurgedSnapsReply;
class OSD;
class OSDService {
+ using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
public:
OSD *osd;
CephContext *cct;
PerfCounters *&logger;
PerfCounters *&recoverystate_perf;
MonClient *&monc;
- ClassHandler *&class_handler;
md_config_cacher_t<Option::size_t> osd_max_object_size;
md_config_cacher_t<bool> osd_skip_data_digest;
- void enqueue_back(OpQueueItem&& qi);
- void enqueue_front(OpQueueItem&& qi);
+ void enqueue_back(OpSchedulerItem&& qi);
+ void enqueue_front(OpSchedulerItem&& qi);
void maybe_inject_dispatch_delay() {
if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
}
}
+ ceph::signedspan get_mnow();
+
private:
// -- superblock --
ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
private:
OSDMapRef next_osdmap;
ceph::condition_variable pre_publish_cond;
+ int pre_publish_waiter = 0;
public:
void pre_publish_map(OSDMapRef map) {
if (--(i->second) == 0) {
map_reservations.erase(i);
}
- pre_publish_cond.notify_all();
+ if (pre_publish_waiter) {
+ pre_publish_cond.notify_all();
+ }
}
/// blocks until there are no reserved maps prior to next_osdmap
void await_reserved_maps() {
std::unique_lock l{pre_publish_lock};
ceph_assert(next_osdmap);
+ pre_publish_waiter++;
pre_publish_cond.wait(l, [this] {
auto i = map_reservations.cbegin();
return (i == map_reservations.cend() ||
i->first >= next_osdmap->get_epoch());
});
+ pre_publish_waiter--;
}
OSDMapRef get_next_osdmap() {
std::lock_guard l(pre_publish_lock);
return next_osdmap;
}
-private:
- Mutex peer_map_epoch_lock;
- map<int, epoch_t> peer_map_epoch;
-public:
- epoch_t get_peer_epoch(int p);
- epoch_t note_peer_epoch(int p, epoch_t e);
- void forget_peer_epoch(int p, epoch_t e);
+ void maybe_share_map(Connection *con,
+ const OSDMapRef& osdmap,
+ epoch_t peer_epoch_lb=0);
void send_map(class MOSDMap *m, Connection *con);
- void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
+ void send_incremental_map(epoch_t since, Connection *con,
+ const OSDMapRef& osdmap);
MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
OSDSuperblock& superblock);
- bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
- const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
- void share_map(entity_name_t name, Connection *con, epoch_t epoch,
- OSDMapRef& osdmap, epoch_t *sent_epoch_p);
- void share_map_peer(int peer, Connection *con,
- OSDMapRef map = OSDMapRef());
ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
+ void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
void send_message_osd_cluster(Message *m, Connection *con) {
con->send_message(m);
}
void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
con->send_message(m);
}
- void send_message_osd_client(Message *m, Connection *con) {
- con->send_message(m);
- }
void send_message_osd_client(Message *m, const ConnectionRef& con) {
con->send_message(m);
}
private:
// -- scrub scheduling --
- Mutex sched_scrub_lock;
+ ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
int scrubs_local;
int scrubs_remote;
void dump_scrub_reservations(Formatter *f);
void reply_op_error(OpRequestRef op, int err);
- void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
+ void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
+ vector<pg_log_op_return_item_t> op_returns);
void handle_misdirected_op(PG *pg, OpRequestRef op);
private:
// -- agent shared state --
- Mutex agent_lock;
- Cond agent_cond;
+ ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
+ ceph::condition_variable agent_cond;
map<uint64_t, set<PGRef> > agent_queue;
set<PGRef>::iterator agent_queue_pos;
bool agent_valid_iterator;
}
} agent_thread;
bool agent_stop_flag;
- Mutex agent_timer_lock;
+ ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
SafeTimer agent_timer;
public:
agent_valid_iterator = false; // inserting higher-priority queue
set<PGRef>& nq = agent_queue[priority];
if (nq.empty())
- agent_cond.Signal();
+ agent_cond.notify_all();
nq.insert(pg);
}
std::lock_guard l(agent_lock);
ceph_assert(agent_ops > 0);
--agent_ops;
- agent_cond.Signal();
+ agent_cond.notify_all();
}
/// note start of an async (flush) op
--agent_ops;
ceph_assert(agent_oids.count(oid) == 1);
agent_oids.erase(oid);
- agent_cond.Signal();
+ agent_cond.notify_all();
}
/// check if we are operating on an object
promote_counter.finish(bytes);
}
void promote_throttle_recalibrate();
+ unsigned get_num_shards() const {
+ return m_objecter_finishers;
+ }
+ Finisher* get_objecter_finisher(int shard) {
+ return objecter_finishers[shard].get();
+ }
// -- Objecter, for tiering reads/writes from/to other OSDs --
- Objecter *objecter;
+ std::unique_ptr<Objecter> objecter;
int m_objecter_finishers;
- vector<Finisher*> objecter_finishers;
+ std::vector<std::unique_ptr<Finisher>> objecter_finishers;
// -- Watch --
- Mutex watch_lock;
+ ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
SafeTimer watch_timer;
uint64_t next_notif_id;
uint64_t get_next_id(epoch_t cur_epoch) {
}
// -- Recovery/Backfill Request Scheduling --
- Mutex recovery_request_lock;
+ ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
SafeTimer recovery_request_timer;
// For async recovery sleep
bool recovery_needs_sleep = true;
- utime_t recovery_schedule_time = utime_t();
+ ceph::real_clock::time_point recovery_schedule_time;
// For recovery & scrub & snap
- Mutex sleep_lock;
+ ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
SafeTimer sleep_timer;
// -- tids --
AsyncReserver<spg_t> remote_reserver;
// -- pg merge --
- Mutex merge_lock = {"OSD::merge_lock"};
+ ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
map<pg_t,eversion_t> ready_to_merge_source; // pg -> version
map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target; // pg -> (version,les,lec)
set<pg_t> not_ready_to_merge_source;
void send_ready_to_merge();
void _send_ready_to_merge();
void clear_sent_ready_to_merge();
- void prune_sent_ready_to_merge(OSDMapRef& osdmap);
+ void prune_sent_ready_to_merge(const OSDMapRef& osdmap);
// -- pg_temp --
private:
- Mutex pg_temp_lock;
+ ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
struct pg_temp_t {
vector<int> acting;
bool forced = false;
private:
// -- pg recovery and associated throttling --
- Mutex recovery_lock;
+ ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
list<pair<epoch_t, PGRef> > awaiting_throttle;
utime_t defer_recovery_until;
return awaiting.second.get() == pg;
});
}
+
+ unsigned get_target_pg_log_entries() const;
+
// delayed pg activation
void queue_for_recovery(PG *pg) {
std::lock_guard l(recovery_lock);
_queue_for_recovery(make_pair(queued, pg), reserved_pushes);
}
+ void queue_check_readable(spg_t spgid,
+ epoch_t lpr,
+ ceph::signedspan delay = ceph::signedspan::zero());
+
// osd map cache (past osd maps)
- Mutex map_cache_lock;
+ ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
SharedLRU<epoch_t, const OSDMap> map_cache;
SimpleLRU<epoch_t, bufferlist> map_bl_cache;
SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
- /// final pg_num values for recently deleted pools
- map<int64_t,int> deleted_pool_pg_nums;
-
OSDMapRef try_get_map(epoch_t e);
OSDMapRef get_map(epoch_t e) {
OSDMapRef ret(try_get_map(e));
}
OSDMapRef _add_map(OSDMap *o);
- void add_map_bl(epoch_t e, bufferlist& bl) {
- std::lock_guard l(map_cache_lock);
- return _add_map_bl(e, bl);
- }
void _add_map_bl(epoch_t e, bufferlist& bl);
bool get_map_bl(epoch_t e, bufferlist& bl) {
std::lock_guard l(map_cache_lock);
}
bool _get_map_bl(epoch_t e, bufferlist& bl);
- void add_map_inc_bl(epoch_t e, bufferlist& bl) {
- std::lock_guard l(map_cache_lock);
- return _add_map_inc_bl(e, bl);
- }
void _add_map_inc_bl(epoch_t e, bufferlist& bl);
bool get_inc_map_bl(epoch_t e, bufferlist& bl);
- /// get last pg_num before a pool was deleted (if any)
- int get_deleted_pool_pg_num(int64_t pool);
-
- void store_deleted_pool_pg_num(int64_t pool, int pg_num) {
- std::lock_guard l(map_cache_lock);
- deleted_pool_pg_nums[pool] = pg_num;
- }
-
- /// get pgnum from newmap or, if pool was deleted, last map pool existed in
- int get_possibly_deleted_pool_pg_num(OSDMapRef newmap,
- int64_t pool) {
- if (newmap->have_pg_pool(pool)) {
- return newmap->get_pg_num(pool);
- }
- return get_deleted_pool_pg_num(pool);
- }
-
/// identify split child pgids over a osdmap interval
void identify_splits_and_merges(
OSDMapRef old_map,
void shutdown();
// -- stats --
- Mutex stat_lock;
+ ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
osd_stat_t osd_stat;
uint32_t seq = 0;
// -- OSD Full Status --
private:
friend TestOpsSocketHook;
- mutable Mutex full_status_lock;
+ mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
const char *get_full_state_name(s_names s) const {
switch (s) {
bool is_nearfull() const;
bool need_fullness_update(); ///< osdmap state needs update
void set_injectfull(s_names type, int64_t count);
- bool check_osdmap_full(const set<pg_shard_t> &missing_on);
// -- epochs --
private:
- mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
+ // protects access to boot_epoch, up_epoch, bind_epoch
+ mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
epoch_t up_epoch; // _most_recent_ epoch we were marked up
epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
void request_osdmap_update(epoch_t e);
+ // -- heartbeats --
+ ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDServce::hb_stamp_lock");
+
+ /// osd -> heartbeat stamps
+ vector<HeartbeatStampsRef> hb_stamps;
+
+ /// get or create a ref for a peer's HeartbeatStamps
+ HeartbeatStampsRef get_hb_stamps(unsigned osd);
+
+
+ // Timer for readable leases
+ ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};
+
+ void queue_renew_lease(epoch_t epoch, spg_t spgid);
+
// -- stopping --
- Mutex is_stopping_lock;
- Cond is_stopping_cond;
+ ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
+ ceph::condition_variable is_stopping_cond;
enum {
NOT_STOPPING,
PREPARING_TO_STOP,
#ifdef PG_DEBUG_REFS
- Mutex pgid_lock;
+ ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
map<spg_t, int> pgid_tracker;
map<spg_t, PG*> live_pgs;
void add_pgid(spg_t pgid, PG *pg);
#endif
explicit OSDService(OSD *osd);
- ~OSDService();
+ ~OSDService() = default;
};
-
-enum class io_queue {
- prioritized,
- weightedpriority,
- mclock_opclass,
- mclock_client,
-};
-
-
/*
Each PG slot includes queues for events that are processing and/or waiting
don't affect the given PG.)
- we maintain two separate wait lists, *waiting* and *waiting_peering*. The
- OpQueueItem has an is_peering() bool to determine which we use. Waiting
+ OpSchedulerItem has an is_peering() bool to determine which we use. Waiting
peering events are queued up by epoch required.
- when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
*/
struct OSDShardPGSlot {
+ using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
PGRef pg; ///< pg reference
- deque<OpQueueItem> to_process; ///< order items for this slot
+ deque<OpSchedulerItem> to_process; ///< order items for this slot
int num_running = 0; ///< _process threads doing pg lookup/lock
- deque<OpQueueItem> waiting; ///< waiting for pg (or map + pg)
+ deque<OpSchedulerItem> waiting; ///< waiting for pg (or map + pg)
/// waiting for map (peering evt)
- map<epoch_t,deque<OpQueueItem>> waiting_peering;
+ map<epoch_t,deque<OpSchedulerItem>> waiting_peering;
/// incremented by wake_pg_waiters; indicates racing _process threads
/// should bail out (their op has been requeued)
ceph::mutex sdata_wait_lock;
ceph::condition_variable sdata_cond;
- string osdmap_lock_name;
ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock
OSDMapRef shard_osdmap;
ceph::mutex shard_lock; ///< protects remaining members below
/// map of slots for each spg_t. maintains ordering of items dequeued
- /// from pqueue while _process thread drops shard lock to acquire the
+ /// from scheduler while _process thread drops shard lock to acquire the
/// pg lock. stale slots are removed by consume_map.
unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;
ceph::condition_variable min_pg_epoch_cond;
/// priority queue
- std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
+ ceph::osd::scheduler::OpSchedulerRef scheduler;
bool stop_waiting = false;
ContextQueue context_queue;
- void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
- unsigned priority = item.get_priority();
- unsigned cost = item.get_cost();
- if (priority >= cutoff)
- pqueue->enqueue_strict_front(
- item.get_owner(),
- priority, std::move(item));
- else
- pqueue->enqueue_front(
- item.get_owner(),
- priority, cost, std::move(item));
- }
-
void _attach_pg(OSDShardPGSlot *slot, PG *pg);
void _detach_pg(OSDShardPGSlot *slot);
/// push osdmap into shard
void consume_map(
- OSDMapRef& osdmap,
+ const OSDMapRef& osdmap,
unsigned *pushes_to_free);
void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
OSDShard(
int id,
CephContext *cct,
- OSD *osd,
- uint64_t max_tok_per_prio, uint64_t min_cost,
- io_queue opqueue)
- : shard_id(id),
- cct(cct),
- osd(osd),
- shard_name(string("OSDShard.") + stringify(id)),
- sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
- sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
- osdmap_lock_name(shard_name + "::osdmap_lock"),
- osdmap_lock{make_mutex(osdmap_lock_name)},
- shard_lock_name(shard_name + "::shard_lock"),
- shard_lock{make_mutex(shard_lock_name)},
- context_queue(sdata_wait_lock, sdata_cond) {
- if (opqueue == io_queue::weightedpriority) {
- pqueue = std::make_unique<
- WeightedPriorityQueue<OpQueueItem,uint64_t>>(
- max_tok_per_prio, min_cost);
- } else if (opqueue == io_queue::prioritized) {
- pqueue = std::make_unique<
- PrioritizedQueue<OpQueueItem,uint64_t>>(
- max_tok_per_prio, min_cost);
- } else if (opqueue == io_queue::mclock_opclass) {
- pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
- } else if (opqueue == io_queue::mclock_client) {
- pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
- }
- }
+ OSD *osd);
};
class OSD : public Dispatcher,
public md_config_obs_t {
+ using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
+
/** OSD **/
- Mutex osd_lock; // global lock
+ // global lock
+ ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
SafeTimer tick_timer; // safe timer (osd_lock)
// Tick timer for those stuff that do not need osd_lock
- Mutex tick_timer_lock;
+ ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
SafeTimer tick_timer_without_osd_lock;
std::string gss_ktfile_client{};
int whoami;
std::string dev_path, journal_path;
- int last_require_osd_release = 0;
+ ceph_release_t last_require_osd_release{ceph_release_t::unknown};
int numa_node = -1;
size_t numa_cpu_set_size = 0;
// asok
friend class OSDSocketHook;
class OSDSocketHook *asok_hook;
- bool asok_command(std::string_view admin_command, const cmdmap_t& cmdmap,
- std::string_view format, std::ostream& ss);
+ void asok_command(
+ std::string_view prefix,
+ const cmdmap_t& cmdmap,
+ Formatter *f,
+ const bufferlist& inbl,
+ std::function<void(int,const std::string&,bufferlist&)> on_finish);
public:
- ClassHandler *class_handler = nullptr;
int get_nodeid() { return whoami; }
static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
object_t("snapmapper"),
0)));
}
+ static ghobject_t make_purged_snaps_oid() {
+ return ghobject_t(hobject_t(
+ sobject_t(
+ object_t("purged_snaps"),
+ 0)));
+ }
static ghobject_t make_pg_log_oid(spg_t pg) {
stringstream ss;
private:
ShardedThreadPool osd_op_tp;
- ThreadPool command_tp;
void get_latest_osdmap();
// -- sessions --
private:
- void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
- void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
+ void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);
- Mutex session_waiting_lock;
- set<SessionRef> session_waiting_for_map;
+ ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
+ set<ceph::ref_t<Session>> session_waiting_for_map;
/// Caller assumes refs for included Sessions
- void get_sessions_waiting_for_map(set<SessionRef> *out) {
+ void get_sessions_waiting_for_map(set<ceph::ref_t<Session>> *out) {
std::lock_guard l(session_waiting_lock);
out->swap(session_waiting_for_map);
}
- void register_session_waiting_on_map(SessionRef session) {
+ void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
std::lock_guard l(session_waiting_lock);
session_waiting_for_map.insert(session);
}
- void clear_session_waiting_on_map(SessionRef session) {
+ void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
std::lock_guard l(session_waiting_lock);
session_waiting_for_map.erase(session);
}
void dispatch_sessions_waiting_on_map() {
- set<SessionRef> sessions_to_check;
+ set<ceph::ref_t<Session>> sessions_to_check;
get_sessions_waiting_for_map(&sessions_to_check);
for (auto i = sessions_to_check.begin();
i != sessions_to_check.end();
sessions_to_check.erase(i++)) {
std::lock_guard l{(*i)->session_dispatch_lock};
- SessionRef session = *i;
- dispatch_session_waiting(session, osdmap);
+ dispatch_session_waiting(*i, get_osdmap());
}
}
- void session_handle_reset(SessionRef session) {
+ void session_handle_reset(const ceph::ref_t<Session>& session) {
std::lock_guard l(session->session_dispatch_lock);
clear_session_waiting_on_map(session);
void osdmap_subscribe(version_t epoch, bool force_request);
/** @} monc helpers */
- Mutex osdmap_subscribe_lock;
+ ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
epoch_t latest_subscribed_epoch{0};
// -- heartbeat --
}
return !is_unhealthy(now);
}
+
+ void clear_mark_down(Connection *except = nullptr) {
+ if (con_back && con_back != except) {
+ con_back->mark_down();
+ con_back->clear_priv();
+ con_back.reset(nullptr);
+ }
+ if (con_front && con_front != except) {
+ con_front->mark_down();
+ con_front->clear_priv();
+ con_front.reset(nullptr);
+ }
+ }
};
- /// state attached to outgoing heartbeat connections
- struct HeartbeatSession : public RefCountedObject {
- int peer;
- explicit HeartbeatSession(int p) : peer(p) {}
- };
- Mutex heartbeat_lock;
+
+ ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
map<int, int> debug_heartbeat_drops_remaining;
- Cond heartbeat_cond;
+ ceph::condition_variable heartbeat_cond;
bool heartbeat_stop;
std::atomic<bool> heartbeat_need_update;
map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
Messenger *hb_back_server_messenger;
utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
double daily_loadavg;
+ ceph::mono_time startup_time;
// Track ping repsonse times using vector as a circular buffer
// MUST BE A POWER OF 2
void heartbeat_kick() {
std::lock_guard l(heartbeat_lock);
- heartbeat_cond.Signal();
+ heartbeat_cond.notify_all();
}
struct T_Heartbeat : public Thread {
int ms_handle_authentication(Connection *con) override {
return true;
}
- bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override {
- // some pre-nautilus OSDs get confused if you include an
- // authorizer but they are not expecting it. do not try to authorize
- // heartbeat connections until all OSDs are nautilus.
- if (osd->get_osdmap()->require_osd_release >= CEPH_RELEASE_NAUTILUS) {
- return osd->ms_get_authorizer(dest_type, authorizer);
- }
- return false;
- }
- KeyStore *ms_get_auth1_authorizer_keystore() override {
- return osd->ms_get_auth1_authorizer_keystore();
- }
} heartbeat_dispatcher;
private:
list<OpRequestRef> finished;
void take_waiters(list<OpRequestRef>& ls) {
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
finished.splice(finished.end(), ls);
}
void do_waiters();
friend struct C_FinishSplits;
friend struct C_OpenPGs;
- // -- op queue --
- friend std::ostream& operator<<(std::ostream& out, const io_queue& q);
-
- const io_queue op_queue;
-public:
- const unsigned int op_prio_cutoff;
protected:
/*
* The ordered op delivery chain is:
*
- * fast dispatch -> pqueue back
- * pqueue front <-> to_process back
+ * fast dispatch -> scheduler back
+ * scheduler front <-> to_process back
* to_process front -> RunVis(item)
* <- queue_front()
*
- * The pqueue is per-shard, and to_process is per pg_slot. Items can be
- * pushed back up into to_process and/or pqueue while order is preserved.
+ * The scheduler is per-shard, and to_process is per pg_slot. Items can be
+ * pushed back up into to_process and/or scheduler while order is preserved.
*
* Multiple worker threads can operate on each shard.
*
* wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
* and already requeued the items.
*/
- friend class PGOpItem;
- friend class PGPeeringItem;
- friend class PGRecovery;
- friend class PGDelete;
+ friend class ceph::osd::scheduler::PGOpItem;
+ friend class ceph::osd::scheduler::PGPeeringItem;
+ friend class ceph::osd::scheduler::PGRecovery;
+ friend class ceph::osd::scheduler::PGDelete;
class ShardedOpWQ
- : public ShardedThreadPool::ShardedWQ<OpQueueItem>
+ : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
{
OSD *osd;
time_t ti,
time_t si,
ShardedThreadPool* tp)
- : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
+ : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
osd(o) {
}
void _add_slot_waiter(
spg_t token,
OSDShardPGSlot *slot,
- OpQueueItem&& qi);
+ OpSchedulerItem&& qi);
/// try to do some work
void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
/// enqueue a new item
- void _enqueue(OpQueueItem&& item) override;
+ void _enqueue(OpSchedulerItem&& item) override;
/// requeue an old item (at the front of the line)
- void _enqueue_front(OpQueueItem&& item) override;
+ void _enqueue_front(OpSchedulerItem&& item) override;
void return_waiting_threads() override {
for(uint32_t i = 0; i < osd->num_shards; i++) {
std::scoped_lock l{sdata->shard_lock};
f->open_object_section(queue_name);
- sdata->pqueue->dump(f);
+ sdata->scheduler->dump(*f);
f->close_section();
}
}
ceph_assert(sdata);
std::lock_guard l(sdata->shard_lock);
if (thread_index < osd->num_shards) {
- return sdata->pqueue->empty() && sdata->context_queue.empty();
+ return sdata->scheduler->empty() && sdata->context_queue.empty();
} else {
- return sdata->pqueue->empty();
+ return sdata->scheduler->empty();
}
}
void enqueue_peering_evt(
spg_t pgid,
PGPeeringEventRef ref);
- void enqueue_peering_evt_front(
- spg_t pgid,
- PGPeeringEventRef ref);
void dequeue_peering_evt(
OSDShard *sdata,
PG *pg,
protected:
// -- osd map --
- OSDMapRef osdmap;
- OSDMapRef get_osdmap() {
- return osdmap;
+ // TODO: switch to std::atomic<OSDMapRef> when C++20 will be available.
+ OSDMapRef _osdmap;
+ void set_osdmap(OSDMapRef osdmap) {
+ std::atomic_store(&_osdmap, osdmap);
+ }
+ OSDMapRef get_osdmap() const {
+ return std::atomic_load(&_osdmap);
}
epoch_t get_osdmap_epoch() const {
+ // XXX: performance?
+ auto osdmap = get_osdmap();
return osdmap ? osdmap->get_epoch() : 0;
}
pool_pg_num_history_t pg_num_history;
- utime_t had_map_since;
- RWLock map_lock;
+ ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
list<OpRequestRef> waiting_for_osdmap;
deque<utime_t> osd_markdown_log;
epoch_t advance_to,
PG *pg,
ThreadPool::TPHandle &handle,
- PG::RecoveryCtx *rctx);
+ PeeringCtx &rctx);
void consume_map();
void activate_map();
OSDMapRef add_map(OSDMap *o) {
return service.add_map(o);
}
- void add_map_bl(epoch_t e, bufferlist& bl) {
- return service.add_map_bl(e, bl);
- }
bool get_map_bl(epoch_t e, bufferlist& bl) {
return service.get_map_bl(e, bl);
}
- void add_map_inc_bl(epoch_t e, bufferlist& bl) {
- return service.add_map_inc_bl(e, bl);
- }
public:
// -- shards --
}
protected:
- Mutex merge_lock = {"OSD::merge_lock"};
+ ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
/// merge epoch -> target pgid -> source pgid -> pg
map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters;
std::atomic<size_t> num_pgs = {0};
std::mutex pending_creates_lock;
- using create_from_osd_t = std::pair<pg_t, bool /* is primary*/>;
+ using create_from_osd_t = std::pair<spg_t, bool /* is primary*/>;
std::set<create_from_osd_t> pending_creates_from_osd;
unsigned pending_creates_from_mon = 0;
const set<spg_t> &childpgids, set<PGRef> *out_pgs,
OSDMapRef curmap,
OSDMapRef nextmap,
- PG::RecoveryCtx *rctx);
+ PeeringCtx &rctx);
void _finish_splits(set<PGRef>& pgs);
// == monitor interaction ==
- Mutex mon_report_lock;
+ ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
utime_t last_mon_report;
Finisher boot_finisher;
void _preboot(epoch_t oldest, epoch_t newest);
void _send_boot();
void _collect_metadata(map<string,string> *pmeta);
+ void _get_purged_snaps();
+ void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);
void start_waiting_for_healthy();
bool _is_healthy();
void cancel_pending_failures();
ceph::coarse_mono_clock::time_point last_sent_beacon;
- Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
+ ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
epoch_t min_last_epoch_clean = 0;
// which pgs were scanned for min_lec
std::vector<pg_t> min_last_epoch_clean_pgs;
return service.get_tid();
}
+ double scrub_sleep_time(bool must_scrub);
+
// -- generic pg peering --
- PG::RecoveryCtx create_context();
- void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+ PeeringCtx create_context();
+ void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
ThreadPool::TPHandle *handle = NULL);
- void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
- ThreadPool::TPHandle *handle = NULL);
- void discard_context(PG::RecoveryCtx &ctx);
- void do_notifies(map<int,
- vector<pair<pg_notify_t, PastIntervals> > >&
- notify_list,
- OSDMapRef map);
- void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
- OSDMapRef map);
- void do_infos(map<int,
- vector<pair<pg_notify_t, PastIntervals> > >& info_map,
- OSDMapRef map);
bool require_mon_peer(const Message *m);
bool require_mon_or_mgr_peer(const Message *m);
* address as in the given map.
* @pre op was sent by an OSD using the cluster messenger
*/
- bool require_same_peer_instance(const Message *m, OSDMapRef& map,
+ bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
bool is_fast_dispatch);
bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
void handle_fast_force_recovery(MOSDForceRecovery *m);
// -- commands --
- struct Command {
- vector<string> cmd;
- ceph_tid_t tid;
- bufferlist indata;
- ConnectionRef con;
-
- Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
- : cmd(c), tid(t), indata(bl), con(co) {}
- };
- list<Command*> command_queue;
- struct CommandWQ : public ThreadPool::WorkQueue<Command> {
- OSD *osd;
- CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
- : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
-
- bool _empty() override {
- return osd->command_queue.empty();
- }
- bool _enqueue(Command *c) override {
- osd->command_queue.push_back(c);
- return true;
- }
- void _dequeue(Command *pg) override {
- ceph_abort();
- }
- Command *_dequeue() override {
- if (osd->command_queue.empty())
- return NULL;
- Command *c = osd->command_queue.front();
- osd->command_queue.pop_front();
- return c;
- }
- void _process(Command *c, ThreadPool::TPHandle &) override {
- osd->osd_lock.lock();
- if (osd->is_stopping()) {
- osd->osd_lock.unlock();
- delete c;
- return;
- }
- osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
- osd->osd_lock.unlock();
- delete c;
- }
- void _clear() override {
- while (!osd->command_queue.empty()) {
- Command *c = osd->command_queue.front();
- osd->command_queue.pop_front();
- delete c;
- }
- }
- } command_wq;
-
- void handle_command(class MMonCommand *m);
void handle_command(class MCommand *m);
- void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
- int _do_command(
- Connection *con, cmdmap_t& cmdmap, ceph_tid_t tid, bufferlist& data,
- bufferlist& odata, stringstream& ss, stringstream& ds);
// -- pg recovery --
case MSG_MON_COMMAND:
case MSG_OSD_PG_CREATE2:
case MSG_OSD_PG_QUERY:
+ case MSG_OSD_PG_QUERY2:
case MSG_OSD_PG_INFO:
+ case MSG_OSD_PG_INFO2:
case MSG_OSD_PG_NOTIFY:
+ case MSG_OSD_PG_NOTIFY2:
case MSG_OSD_PG_LOG:
case MSG_OSD_PG_TRIM:
case MSG_OSD_PG_REMOVE:
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
case MSG_OSD_PG_RECOVERY_DELETE:
case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ case MSG_OSD_PG_LEASE:
+ case MSG_OSD_PG_LEASE_ACK:
return true;
default:
return false;
}
void ms_fast_dispatch(Message *m) override;
bool ms_dispatch(Message *m) override;
- bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override;
void ms_handle_connect(Connection *con) override;
void ms_handle_fast_connect(Connection *con) override;
void ms_handle_fast_accept(Connection *con) override;
int ms_handle_authentication(Connection *con) override;
- KeyStore *ms_get_auth1_authorizer_keystore() override;
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override;
- io_queue get_io_queue() const {
- if (cct->_conf->osd_op_queue == "debug_random") {
- static io_queue index_lookup[] = { io_queue::prioritized,
- io_queue::weightedpriority,
- io_queue::mclock_opclass,
- io_queue::mclock_client };
- srand(time(NULL));
- unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
- return index_lookup[which];
- } else if (cct->_conf->osd_op_queue == "prioritized") {
- return io_queue::prioritized;
- } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
- return io_queue::mclock_opclass;
- } else if (cct->_conf->osd_op_queue == "mclock_client") {
- return io_queue::mclock_client;
- } else {
- // default / catch-all is 'wpq'
- return io_queue::weightedpriority;
- }
- }
-
- unsigned int get_io_prio_cut() const {
- if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
- srand(time(NULL));
- return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
- } else if (cct->_conf->osd_op_queue_cut_off == "high") {
- return CEPH_MSG_PRIO_HIGH;
- } else {
- // default / catch-all is 'low'
- return CEPH_MSG_PRIO_LOW;
- }
- }
-
public:
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/
void handle_fast_scrub(struct MOSDScrub2 *m);
void handle_osd_ping(class MOSDPing *m);
- int init_op_flags(OpRequestRef& op);
-
+ size_t get_num_cache_shards();
int get_num_op_shards();
int get_num_op_threads();
float get_osd_delete_sleep();
float get_osd_snap_trim_sleep();
+ int get_recovery_max_active();
+
+ void scrub_purged_snaps();
void probe_smart(const string& devid, ostream& ss);
public:
uuid_d *cluster_fsid,
uuid_d *osd_fsid,
int *whoami,
- int *min_osd_release);
+ ceph_release_t *min_osd_release);
// startup/shutdown
friend class OSDService;
private:
- void set_perf_queries(
- const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries);
- void get_perf_reports(
- std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports);
+ void set_perf_queries(const ConfigPayload &config_payload);
+ MetricPayload get_perf_reports();
- Mutex m_perf_queries_lock = {"OSD::m_perf_queries_lock"};
+ ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
std::list<OSDPerfMetricQuery> m_perf_queries;
std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
};
-std::ostream& operator<<(std::ostream& out, const io_queue& q);
-
-
//compatibility of the executable
extern const CompatSet::Feature ceph_osd_feature_compat[];
extern const CompatSet::Feature ceph_osd_feature_ro_compat[];