#endif
#include "osd/PG.h"
+#include "osd/scrub_machine.h"
+#include "osd/pg_scrubber.h"
#include "include/types.h"
#include "include/compat.h"
#include "common/ceph_releases.h"
#include "common/ceph_time.h"
#include "common/version.h"
+#include "common/async/blocked_completion.h"
#include "common/pick_address.h"
#include "common/blkdev.h"
#include "common/numa.h"
#include "messages/MOSDPGInfo2.h"
#include "messages/MOSDPGCreate.h"
#include "messages/MOSDPGCreate2.h"
-#include "messages/MOSDPGScan.h"
#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
#include "messages/MOSDForceRecovery.h"
#include "messages/MCommandReply.h"
#include "messages/MPGStats.h"
-#include "messages/MPGStatsAck.h"
#include "messages/MWatchNotify.h"
#include "messages/MOSDPGPush.h"
#include "perfglue/cpu_profiler.h"
#include "perfglue/heap_profiler.h"
+#include "osd/ClassHandler.h"
#include "osd/OpRequest.h"
#include "auth/AuthAuthorizeHandler.h"
#else
#define tracepoint(...)
#endif
+#ifdef HAVE_JAEGER
+#include "common/tracer.h"
+#endif
#define dout_context cct
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix _prefix(_dout, whoami, get_osdmap_epoch())
+using std::deque;
+using std::list;
+using std::lock_guard;
+using std::make_pair;
+using std::make_tuple;
+using std::make_unique;
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::to_string;
+using std::unique_ptr;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::decode;
+using ceph::encode;
+using ceph::fixed_u_to_string;
+using ceph::Formatter;
+using ceph::heartbeat_handle_d;
+using ceph::make_mutex;
+
using namespace ceph::osd::scheduler;
using TOPNSPC::common::cmd_getval;
return compat;
}
-OSDService::OSDService(OSD *osd) :
+OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
osd(osd),
cct(osd->cct),
whoami(osd->whoami), store(osd->store),
last_recalibrate(ceph_clock_now()),
promote_max_objects(0),
promote_max_bytes(0),
+ poolctx(poolctx),
objecter(make_unique<Objecter>(osd->client_messenger->cct,
osd->objecter_messenger,
- osd->monc, nullptr)),
+ osd->monc, poolctx)),
m_objecter_finishers(cct->_conf->osd_objecter_finishers),
watch_timer(osd->client_messenger->cct, watch_lock),
next_notif_id(0),
}
#ifdef PG_DEBUG_REFS
-void OSDService::add_pgid(spg_t pgid, PG *pg){
+void OSDService::add_pgid(spg_t pgid, PG *pg) {
std::lock_guard l(pgid_lock);
if (!pgid_tracker.count(pgid)) {
live_pgs[pgid] = pg;
<< " no agent_work, delay for " << cct->_conf->osd_agent_delay_time
<< " seconds" << dendl;
- osd->logger->inc(l_osd_tier_delay);
+ logger->inc(l_osd_tier_delay);
// Queue a timer to call agent_choose_mode for this pg in 5 seconds
std::lock_guard timer_locker{agent_timer_lock};
Context *cb = new AgentTimeoutCB(pg);
used = bytes - avail;
}
- osd->logger->set(l_osd_stat_bytes, bytes);
- osd->logger->set(l_osd_stat_bytes_used, used);
- osd->logger->set(l_osd_stat_bytes_avail, avail);
+ logger->set(l_osd_stat_bytes, bytes);
+ logger->set(l_osd_stat_bytes_used, used);
+ logger->set(l_osd_stat_bytes_avail, avail);
std::lock_guard l(stat_lock);
osd_stat.statfs = stbuf;
}
max--;
max_bytes -= bl.length();
- m->maps[since].claim(bl);
+ m->maps[since] = std::move(bl);
}
for (epoch_t e = since + 1; e <= to; ++e) {
bufferlist bl;
if (get_inc_map_bl(e, bl)) {
- m->incremental_maps[e].claim(bl);
+ m->incremental_maps[e] = std::move(bl);
} else {
dout(10) << __func__ << " missing incremental map " << e << dendl;
if (!get_map_bl(e, bl)) {
derr << __func__ << " also missing full map " << e << dendl;
goto panic;
}
- m->maps[e].claim(bl);
+ m->maps[e] = std::move(bl);
}
max--;
max_bytes -= bl.length();
// send something
bufferlist bl;
if (get_inc_map_bl(m->newest_map, bl)) {
- m->incremental_maps[m->newest_map].claim(bl);
+ m->incremental_maps[m->newest_map] = std::move(bl);
} else {
derr << __func__ << " unable to load latest map " << m->newest_map << dendl;
if (!get_map_bl(m->newest_map, bl)) {
<< dendl;
ceph_abort();
}
- m->maps[m->newest_map].claim(bl);
+ m->maps[m->newest_map] = std::move(bl);
}
return m;
}
{
bool found = map_bl_cache.lookup(e, &bl);
if (found) {
- if (logger)
- logger->inc(l_osd_map_bl_cache_hit);
+ logger->inc(l_osd_map_bl_cache_hit);
return true;
}
- if (logger)
- logger->inc(l_osd_map_bl_cache_miss);
+ logger->inc(l_osd_map_bl_cache_miss);
found = store->read(meta_ch,
OSD::get_osdmap_pobject_name(e), 0, 0, bl,
CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
std::lock_guard l(map_cache_lock);
bool found = map_bl_inc_cache.lookup(e, &bl);
if (found) {
- if (logger)
- logger->inc(l_osd_map_bl_cache_hit);
+ logger->inc(l_osd_map_bl_cache_hit);
return true;
}
- if (logger)
- logger->inc(l_osd_map_bl_cache_miss);
+ logger->inc(l_osd_map_bl_cache_miss);
found = store->read(meta_ch,
OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl,
CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
OSDMapRef retval = map_cache.lookup(epoch);
if (retval) {
dout(30) << "get_map " << epoch << " -cached" << dendl;
- if (logger) {
- logger->inc(l_osd_map_cache_hit);
- }
+ logger->inc(l_osd_map_cache_hit);
return retval;
}
- if (logger) {
+ {
logger->inc(l_osd_map_cache_miss);
epoch_t lb = map_cache.cached_key_lower_bound();
if (epoch < lb) {
pg->get_osdmap_epoch()));
}
-void OSDService::queue_for_scrub(PG *pg, bool with_high_priority)
+template <class MSG_TYPE>
+void OSDService::queue_scrub_event_msg(PG* pg,
+ Scrub::scrub_prio_t with_priority,
+ unsigned int qu_priority)
{
- unsigned scrub_queue_priority = pg->scrubber.priority;
- if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
- scrub_queue_priority = cct->_conf->osd_client_op_priority;
- }
const auto epoch = pg->get_osdmap_epoch();
- enqueue_back(
- OpSchedulerItem(
- unique_ptr<OpSchedulerItem::OpQueueable>(new PGScrub(pg->get_pgid(), epoch)),
- cct->_conf->osd_scrub_cost,
- scrub_queue_priority,
- ceph_clock_now(),
- 0,
- epoch));
+ auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
+ dout(15) << "queue a scrub event (" << *msg << ") for " << *pg << ". Epoch: " << epoch << dendl;
+
+ enqueue_back(OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
+ pg->scrub_requeue_priority(with_priority, qu_priority), ceph_clock_now(), 0, epoch));
+}
+
+template <class MSG_TYPE>
+void OSDService::queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ const auto epoch = pg->get_osdmap_epoch();
+ auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
+ dout(15) << "queue a scrub event (" << *msg << ") for " << *pg << ". Epoch: " << epoch << dendl;
+
+ enqueue_back(OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
+ pg->scrub_requeue_priority(with_priority), ceph_clock_now(), 0, epoch));
+}
+
+void OSDService::queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ queue_scrub_event_msg<PGScrub>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ queue_scrub_event_msg<PGScrubAfterRepair>(pg, with_priority);
+}
+
+void OSDService::queue_for_rep_scrub(PG* pg,
+ Scrub::scrub_prio_t with_priority,
+ unsigned int qu_priority)
+{
+ queue_scrub_event_msg<PGRepScrub>(pg, with_priority, qu_priority);
+}
+
+void OSDService::queue_for_rep_scrub_resched(PG* pg,
+ Scrub::scrub_prio_t with_priority,
+ unsigned int qu_priority)
+{
+ // Resulting scrub event: 'SchedReplica'
+ queue_scrub_event_msg<PGRepScrubResched>(pg, with_priority, qu_priority);
+}
+
+void OSDService::queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'RemotesReserved'
+ queue_scrub_event_msg<PGScrubResourcesOK>(pg, with_priority);
+}
+
+void OSDService::queue_for_scrub_denied(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'ReservationFailure'
+ queue_scrub_event_msg<PGScrubDenied>(pg, with_priority);
+}
+
+void OSDService::queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'InternalSchedScrub'
+ queue_scrub_event_msg<PGScrubResched>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'ActivePushesUpd'
+ queue_scrub_event_msg<PGScrubPushesUpdate>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ queue_scrub_event_msg<PGScrubAppliedUpdate>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'Unblocked'
+ queue_scrub_event_msg<PGScrubUnblocked>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'DigestUpdate'
+ queue_scrub_event_msg<PGScrubDigestUpdate>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'GotReplicas'
+ queue_scrub_event_msg<PGScrubGotReplMaps>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'ReplicaPushesUpd'
+ queue_scrub_event_msg<PGScrubReplicaPushes>(pg, with_priority);
}
void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e)
#define dout_prefix *_dout
// Commands shared between OSD's console and admin console:
-namespace ceph {
-namespace osd_cmds {
+namespace ceph::osd_cmds {
int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f, std::ostream& os);
-
-}} // namespace ceph::osd_cmds
+
+} // namespace ceph::osd_cmds
int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, string osdspec_affinity)
{
Messenger *hb_back_serverm,
Messenger *osdc_messenger,
MonClient *mc,
- const std::string &dev, const std::string &jdev) :
+ const std::string &dev, const std::string &jdev,
+ ceph::async::io_context_pool& poolctx) :
Dispatcher(cct_),
tick_timer(cct, osd_lock),
tick_timer_without_osd_lock(cct, tick_timer_lock),
objecter_messenger(osdc_messenger),
monc(mc),
mgrc(cct_, client_messenger, &mc->monmap),
- logger(NULL),
- recoverystate_perf(NULL),
+ logger(create_logger()),
+ recoverystate_perf(create_recoverystate_perf()),
store(store_),
log_client(cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
clog(log_client.create_channel()),
test_ops_hook(NULL),
op_shardedwq(
this,
- cct->_conf->osd_op_thread_timeout,
- cct->_conf->osd_op_thread_suicide_timeout,
+ ceph::make_timespan(cct->_conf->osd_op_thread_timeout),
+ ceph::make_timespan(cct->_conf->osd_op_thread_suicide_timeout),
&osd_op_tp),
last_pg_create_epoch(0),
boot_finisher(cct),
up_thru_wanted(0),
requested_full_first(0),
requested_full_last(0),
- service(this)
+ service(this, poolctx)
{
if (!gss_ktfile_client.empty()) {
- // Assert we can export environment variable
- /*
+ // Assert we can export environment variable
+ /*
The default client keytab is used, if it is present and readable,
to automatically obtain initial credentials for GSSAPI client
applications. The principal name of the first entry in the client
2. The default_client_keytab_name profile variable in [libdefaults].
3. The hardcoded default, DEFCKTNAME.
*/
- const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
+ const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
gss_ktfile_client.c_str(), 1));
ceph_assert(set_result == 0);
}
f->open_object_section("pq");
op_shardedwq.dump(f);
f->close_section();
- } else if (prefix == "dump_blacklist") {
+ } else if (prefix == "dump_blocklist") {
list<pair<entity_addr_t,utime_t> > bl;
OSDMapRef curmap = service.get_osdmap();
- f->open_array_section("blacklist");
- curmap->get_blacklist(&bl);
+ f->open_array_section("blocklist");
+ curmap->get_blocklist(&bl);
for (list<pair<entity_addr_t,utime_t> >::iterator it = bl.begin();
it != bl.end(); ++it) {
f->open_object_section("entry");
it->second.localtime(f->dump_stream("expire_time"));
f->close_section(); //entry
}
- f->close_section(); //blacklist
+ f->close_section(); //blocklist
} else if (prefix == "dump_watchers") {
list<obj_watch_item_t> watchers;
// scan pg's
store->compact();
auto end = ceph::coarse_mono_clock::now();
double duration = std::chrono::duration<double>(end-start).count();
- dout(1) << "finished manual compaction in "
+ dout(1) << "finished manual compaction in "
<< duration
<< " seconds" << dendl;
f->open_object_section("compact_result");
check_osdmap_features();
- create_recoverystate_perf();
-
{
epoch_t bind_epoch = osdmap->get_epoch();
service.set_epochs(NULL, NULL, &bind_epoch);
dout(2) << "superblock: I am osd." << superblock.whoami << dendl;
- create_logger();
+ if (cct->_conf.get_val<bool>("osd_compact_on_start")) {
+ dout(2) << "compacting object store's omap" << dendl;
+ store->compact();
+ }
// prime osd stats
{
service.set_statfs(stbuf, alerts);
}
- // client_messenger auth_client is already set up by monc.
+ // client_messenger's auth_client will be set up by monc->init() later.
for (auto m : { cluster_messenger,
objecter_messenger,
hb_front_client_messenger,
if (r < 0)
goto out;
- mgrc.set_pgstats_cb([this](){ return collect_pg_stats(); });
+ mgrc.set_pgstats_cb([this]() { return collect_pg_stats(); });
mgrc.set_perf_metric_query_cb(
[this](const ConfigPayload &config_payload) {
set_perf_queries(config_payload);
asok_hook,
"dump op priority queue state");
ceph_assert(r == 0);
- r = admin_socket->register_command("dump_blacklist",
+ r = admin_socket->register_command("dump_blocklist",
asok_hook,
- "dump blacklisted clients and times");
+ "dump blocklisted clients and times");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_watchers",
asok_hook,
ceph_assert(r == 0);
}
-void OSD::create_logger()
+PerfCounters* OSD::create_logger()
{
- dout(10) << "create_logger" << dendl;
-
- logger = build_osd_logger(cct);
+ PerfCounters* logger = build_osd_logger(cct);
cct->get_perfcounters_collection()->add(logger);
+ return logger;
}
-void OSD::create_recoverystate_perf()
+PerfCounters* OSD::create_recoverystate_perf()
{
- dout(10) << "create_recoverystate_perf" << dendl;
-
- recoverystate_perf = build_recoverystate_perf(cct);
+ PerfCounters* recoverystate_perf = build_recoverystate_perf(cct);
cct->get_perfcounters_collection()->add(recoverystate_perf);
+ return recoverystate_perf;
}
int OSD::shutdown()
{
if (cct->_conf->osd_fast_shutdown) {
derr << "*** Immediate shutdown (osd_fast_shutdown=true) ***" << dendl;
+ if (cct->_conf->osd_fast_shutdown_notify_mon)
+ service.prepare_to_stop();
cct->_log->flush();
_exit(0);
}
}
decode(ec_profile, p);
}
- PGPool pool(cct, createmap, pgid.pool(), pi, name);
+ PGPool pool(createmap, pgid.pool(), pi, name);
PG *pg;
if (pi.type == pg_pool_t::TYPE_REPLICATED ||
pi.type == pg_pool_t::TYPE_ERASURE)
i != heartbeat_peers.end();
++i) {
int peer = i->first;
+ Session *s = static_cast<Session*>(i->second.con_back->get_priv().get());
+ if (!s) {
+ dout(30) << "heartbeat osd." << peer << " has no open con" << dendl;
+ continue;
+ }
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
i->second.last_tx = now;
if (i->second.hb_interval_start == utime_t())
i->second.hb_interval_start = now;
- Session *s = static_cast<Session*>(i->second.con_back->get_priv().get());
std::optional<ceph::signedspan> delta_ub;
s->stamps->sent_ping(&delta_ub);
ceph_assert(ceph_mutex_is_locked(tick_timer_lock));
dout(10) << "tick_without_osd_lock" << dendl;
- logger->set(l_osd_cached_crc, buffer::get_cached_crc());
- logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
- logger->set(l_osd_missed_crc, buffer::get_missed_crc());
+ logger->set(l_osd_cached_crc, ceph::buffer::get_cached_crc());
+ logger->set(l_osd_cached_crc_adjusted, ceph::buffer::get_cached_crc_adjusted());
+ logger->set(l_osd_missed_crc, ceph::buffer::get_missed_crc());
// refresh osd stats
struct store_statfs_t stbuf;
// borrow lec lock to pretect last_sent_beacon from changing
std::lock_guard l{min_last_epoch_clean_lock};
const auto elapsed = now - last_sent_beacon;
- if (chrono::duration_cast<chrono::seconds>(elapsed).count() >
+ if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() >
cct->_conf->osd_beacon_report_interval) {
need_send_beacon = true;
}
return true;
}
-struct C_OSD_GetVersion : public Context {
+struct CB_OSD_GetVersion {
OSD *osd;
- uint64_t oldest, newest;
- explicit C_OSD_GetVersion(OSD *o) : osd(o), oldest(0), newest(0) {}
- void finish(int r) override {
- if (r >= 0)
+ explicit CB_OSD_GetVersion(OSD *o) : osd(o) {}
+ void operator ()(boost::system::error_code ec, version_t newest,
+ version_t oldest) {
+ if (!ec)
osd->_got_mon_epochs(oldest, newest);
}
};
set_state(STATE_PREBOOT);
dout(10) << "start_boot - have maps " << superblock.oldest_map
<< ".." << superblock.newest_map << dendl;
- C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
- monc->get_version("osdmap", &c->newest, &c->oldest, c);
+ monc->get_version("osdmap", CB_OSD_GetVersion(this));
}
void OSD::_got_mon_epochs(epoch_t oldest, epoch_t newest)
} else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
derr << "osdmap SORTBITWISE OSDMap flag is NOT set; please set it"
<< dendl;
- } else if (osdmap->require_osd_release < ceph_release_t::luminous) {
- derr << "osdmap require_osd_release < luminous; please upgrade to luminous"
- << dendl;
} else if (service.need_fullness_update()) {
derr << "osdmap fullness state needs update" << dendl;
send_full_update();
requested_full_first = requested_full_last = 0;
return;
}
-
+
requested_full_first = e + 1;
dout(10) << __func__ << " " << e << ", requested " << requested_full_first
std::lock_guard l{min_last_epoch_clean_lock};
beacon = new MOSDBeacon(get_osdmap_epoch(),
min_last_epoch_clean,
- superblock.last_purged_snaps_scrub);
+ superblock.last_purged_snaps_scrub,
+ cct->_conf->osd_beacon_report_interval);
beacon->pgs = min_last_epoch_clean_pgs;
last_sent_beacon = now;
}
void OSD::ms_fast_dispatch(Message *m)
{
+
+#ifdef HAVE_JAEGER
+ jaeger_tracing::init_tracer("osd-services-reinit");
+ dout(10) << "jaeger tracer after " << opentracing::Tracer::Global() << dendl;
+ auto dispatch_span = jaeger_tracing::new_span(__func__);
+#endif
FUNCTRACE(cct);
if (service.is_stopping()) {
m->put();
tracepoint(osd, ms_fast_dispatch, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
-
+#ifdef HAVE_JAEGER
+ op->set_osd_parent_span(dispatch_span);
+ if (op->osd_parent_span) {
+ auto op_req_span = jaeger_tracing::child_span("op-request-created", op->osd_parent_span);
+ op->set_osd_parent_span(op_req_span);
+ }
+#endif
if (m->trace)
op->osd_trace.init("osd op", &trace_endpoint, &m->trace);
service.release_map(nextmap);
}
}
- OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
+ OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
}
int OSD::ms_handle_authentication(Connection *con)
try {
decode(str, p);
}
- catch (buffer::error& e) {
+ catch (ceph::buffer::error& e) {
dout(10) << __func__ << " session " << s << " " << s->entity_name
<< " failed to decode caps string" << dendl;
ret = -EACCES;
return pgid < rhs.pgid;
}
+void OSDService::dumps_scrub(ceph::Formatter *f)
+{
+ ceph_assert(f != nullptr);
+ std::lock_guard l(sched_scrub_lock);
+
+ f->open_array_section("scrubs");
+ for (const auto &i: sched_scrub_pg) {
+ f->open_object_section("scrub");
+ f->dump_stream("pgid") << i.pgid;
+ f->dump_stream("sched_time") << i.sched_time;
+ f->dump_stream("deadline") << i.deadline;
+ f->dump_bool("forced", i.sched_time == PgScrubber::scrub_must_stamp());
+ f->close_section();
+ }
+ f->close_section();
+}
+
double OSD::scrub_sleep_time(bool must_scrub)
{
if (must_scrub) {
time_permit = true;
}
}
- if (!time_permit) {
+ if (time_permit) {
dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
<< " - " << cct->_conf->osd_scrub_end_hour
- << " now " << bdt.tm_hour << " = no" << dendl;
+ << " now " << bdt.tm_hour << " = yes" << dendl;
} else {
dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
<< " - " << cct->_conf->osd_scrub_end_hour
- << " now " << bdt.tm_hour << " = yes" << dendl;
+ << " now " << bdt.tm_hour << " = no" << dendl;
}
return time_permit;
}
void OSD::sched_scrub()
{
+ dout(20) << __func__ << " sched_scrub starts" << dendl;
+
// if not permitted, fail fast
if (!service.can_inc_scrubs()) {
+ dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
return;
}
bool allow_requested_repair_only = false;
if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
if (!cct->_conf->osd_repair_during_recovery) {
- dout(20) << __func__ << " not scheduling scrubs due to active recovery" << dendl;
+ dout(15) << __func__ << ": not scheduling scrubs due to active recovery" << dendl;
return;
}
dout(10) << __func__
bool load_is_low = scrub_load_below_threshold();
dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
- OSDService::ScrubJob scrub;
- if (service.first_scrub_stamp(&scrub)) {
+ OSDService::ScrubJob scrub_job;
+ if (service.first_scrub_stamp(&scrub_job)) {
do {
- dout(30) << "sched_scrub examine " << scrub.pgid << " at " << scrub.sched_time << dendl;
+ dout(30) << "sched_scrub examine " << scrub_job.pgid << " at " << scrub_job.sched_time << dendl;
- if (scrub.sched_time > now) {
+ if (scrub_job.sched_time > now) {
// save ourselves some effort
- dout(10) << "sched_scrub " << scrub.pgid << " scheduled at " << scrub.sched_time
+ dout(20) << "sched_scrub " << scrub_job.pgid << " scheduled at " << scrub_job.sched_time
<< " > " << now << dendl;
break;
}
- if ((scrub.deadline.is_zero() || scrub.deadline >= now) && !(time_permit && load_is_low)) {
- dout(10) << __func__ << " not scheduling scrub for " << scrub.pgid << " due to "
+ if ((scrub_job.deadline.is_zero() || scrub_job.deadline >= now) && !(time_permit && load_is_low)) {
+ dout(15) << __func__ << " not scheduling scrub for " << scrub_job.pgid << " due to "
<< (!time_permit ? "time not permit" : "high load") << dendl;
continue;
}
- PGRef pg = _lookup_lock_pg(scrub.pgid);
- if (!pg)
+ PGRef pg = _lookup_lock_pg(scrub_job.pgid);
+ if (!pg) {
+ dout(20) << __func__ << " pg " << scrub_job.pgid << " not found" << dendl;
continue;
+ }
+
// This has already started, so go on to the next scrub job
- if (pg->scrubber.active) {
+ if (pg->is_scrub_active()) {
pg->unlock();
- dout(30) << __func__ << ": already in progress pgid " << scrub.pgid << dendl;
+ dout(20) << __func__ << ": already in progress pgid " << scrub_job.pgid << dendl;
continue;
}
- // Skip other kinds of scrubing if only explicitly requested repairing is allowed
- if (allow_requested_repair_only && !pg->scrubber.must_repair) {
+ // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
+ if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
pg->unlock();
- dout(10) << __func__ << " skip " << scrub.pgid
+ dout(10) << __func__ << " skip " << scrub_job.pgid
<< " because repairing is not explicitly requested on it"
<< dendl;
continue;
}
+
// If it is reserving, let it resolve before going to the next scrub job
- if (pg->scrubber.local_reserved && !pg->scrubber.active) {
+ if (pg->m_scrubber->is_reserving()) {
pg->unlock();
- dout(30) << __func__ << ": reserve in progress pgid " << scrub.pgid << dendl;
+ dout(10) << __func__ << ": reserve in progress pgid " << scrub_job.pgid << dendl;
break;
}
- dout(10) << "sched_scrub scrubbing " << scrub.pgid << " at " << scrub.sched_time
+ dout(15) << "sched_scrub scrubbing " << scrub_job.pgid << " at " << scrub_job.sched_time
<< (pg->get_must_scrub() ? ", explicitly requested" :
(load_is_low ? ", load_is_low" : " deadline < now"))
<< dendl;
if (pg->sched_scrub()) {
pg->unlock();
+ dout(10) << __func__ << " scheduled a scrub!" << " (~" << scrub_job.pgid << "~)" << dendl;
break;
}
pg->unlock();
- } while (service.next_scrub_stamp(scrub, &scrub));
+ } while (service.next_scrub_stamp(scrub_job, &scrub_job));
}
dout(20) << "sched_scrub done" << dendl;
}
void OSD::resched_all_scrubs()
{
dout(10) << __func__ << ": start" << dendl;
- OSDService::ScrubJob scrub;
- if (service.first_scrub_stamp(&scrub)) {
+ OSDService::ScrubJob scrub_job;
+ if (service.first_scrub_stamp(&scrub_job)) {
do {
- dout(20) << __func__ << ": examine " << scrub.pgid << dendl;
+ dout(20) << __func__ << ": examine " << scrub_job.pgid << dendl;
- PGRef pg = _lookup_lock_pg(scrub.pgid);
+ PGRef pg = _lookup_lock_pg(scrub_job.pgid);
if (!pg)
continue;
- if (!pg->scrubber.must_scrub && !pg->scrubber.need_auto) {
- dout(20) << __func__ << ": reschedule " << scrub.pgid << dendl;
+ if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
+ dout(15) << __func__ << ": reschedule " << scrub_job.pgid << dendl;
pg->on_info_history_change();
}
pg->unlock();
- } while (service.next_scrub_stamp(scrub, &scrub));
+ } while (service.next_scrub_stamp(scrub_job, &scrub_job));
}
dout(10) << __func__ << ": done" << dendl;
}
}
pg->get_pg_stats([&](const pg_stat_t& s, epoch_t lec) {
m->pg_stat[pg->pg_id.pgid] = s;
- min_last_epoch_clean = min(min_last_epoch_clean, lec);
+ min_last_epoch_clean = std::min(min_last_epoch_clean, lec);
min_last_epoch_clean_pgs.push_back(pg->pg_id.pgid);
});
}
OSDMapRef newmap = get_map(cur);
ceph_assert(newmap); // we just cached it above!
- // start blacklisting messages sent to peers that go down.
+ // start blocklisting messages sent to peers that go down.
service.pre_publish_map(newmap);
// kill connections to newly down osds
set<int> avoid_ports;
#if defined(__FreeBSD__)
// prevent FreeBSD from grabbing the client_messenger port during
- // rebinding. In which case a cluster_meesneger will connect also
+ // rebinding. In which case a cluster_meesneger will connect also
// to the same port
client_messenger->get_myaddrs().get_ports(&avoid_ports);
#endif
}
ceph_assert(pg->is_locked());
OSDMapRef lastmap = pg->get_osdmap();
- ceph_assert(lastmap->get_epoch() < osd_epoch);
set<PGRef> new_pgs; // any split children
bool ret = true;
// This is true for the first recovery op and when the previous recovery op
// has been scheduled in the past. The next recovery op is scheduled after
// completing the sleep from now.
-
+
if (auto now = ceph::real_clock::now();
service.recovery_schedule_time < now) {
service.recovery_schedule_time = now;
#endif
bool do_unfound = pg->start_recovery_ops(reserved_pushes, handle, &started);
- dout(10) << "do_recovery started " << started << "/" << reserved_pushes
+ dout(10) << "do_recovery started " << started << "/" << reserved_pushes
<< " on " << *pg << dendl;
if (do_unfound) {
const unsigned priority = op->get_req()->get_priority();
const int cost = op->get_req()->get_cost();
const uint64_t owner = op->get_req()->get_source().num();
+ const int type = op->get_req()->get_type();
dout(15) << "enqueue_op " << op << " prio " << priority
+ << " type " << type
<< " cost " << cost
<< " latency " << latency
<< " epoch " << epoch
op->osd_trace.event("enqueue op");
op->osd_trace.keyval("priority", priority);
op->osd_trace.keyval("cost", cost);
+#ifdef HAVE_JAEGER
+ if (op->osd_parent_span) {
+ auto enqueue_span = jaeger_tracing::child_span(__func__, op->osd_parent_span);
+ enqueue_span->Log({
+ {"priority", priority},
+ {"cost", cost},
+ {"epoch", epoch},
+ {"owner", owner},
+ {"type", type}
+ });
+ }
+#endif
op->mark_queued_for_pg();
logger->tinc(l_osd_op_before_queue_op_lat, latency);
- op_shardedwq.queue(
- OpSchedulerItem(
- unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
- cost, priority, stamp, owner, epoch));
+ if (type == MSG_OSD_PG_PUSH ||
+ type == MSG_OSD_PG_PUSH_REPLY) {
+ op_shardedwq.queue(
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGRecoveryMsg(pg, std::move(op))),
+ cost, priority, stamp, owner, epoch));
+ } else {
+ op_shardedwq.queue(
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
+ cost, priority, stamp, owner, epoch));
+ }
}
void OSD::enqueue_peering_evt(spg_t pgid, PGPeeringEventRef evt)
"osd_map_cache_size",
"osd_pg_epoch_max_lag_factor",
"osd_pg_epoch_persisted_max_stale",
+ "osd_recovery_sleep",
+ "osd_recovery_sleep_hdd",
+ "osd_recovery_sleep_ssd",
+ "osd_recovery_sleep_hybrid",
+ "osd_recovery_max_active",
+ "osd_recovery_max_active_hdd",
+ "osd_recovery_max_active_ssd",
// clog & admin clog
"clog_to_monitors",
"clog_to_syslog",
const std::set <std::string> &changed)
{
std::lock_guard l{osd_lock};
- if (changed.count("osd_max_backfills")) {
- service.local_reserver.set_max(cct->_conf->osd_max_backfills);
- service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
+
+ if (changed.count("osd_max_backfills") ||
+ changed.count("osd_delete_sleep") ||
+ changed.count("osd_delete_sleep_hdd") ||
+ changed.count("osd_delete_sleep_ssd") ||
+ changed.count("osd_delete_sleep_hybrid") ||
+ changed.count("osd_snap_trim_sleep") ||
+ changed.count("osd_snap_trim_sleep_hdd") ||
+ changed.count("osd_snap_trim_sleep_ssd") ||
+ changed.count("osd_snap_trim_sleep_hybrid") ||
+ changed.count("osd_scrub_sleep") ||
+ changed.count("osd_recovery_sleep") ||
+ changed.count("osd_recovery_sleep_hdd") ||
+ changed.count("osd_recovery_sleep_ssd") ||
+ changed.count("osd_recovery_sleep_hybrid") ||
+ changed.count("osd_recovery_max_active") ||
+ changed.count("osd_recovery_max_active_hdd") ||
+ changed.count("osd_recovery_max_active_ssd")) {
+ if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+ cct->_conf.get_val<std::string>("osd_mclock_profile") != "custom") {
+ // Set ceph config option to meet QoS goals
+ // Set high value for recovery max active
+ uint32_t recovery_max_active = 1000;
+ if (cct->_conf->osd_recovery_max_active) {
+ cct->_conf.set_val(
+ "osd_recovery_max_active", std::to_string(recovery_max_active));
+ }
+ if (store_is_rotational) {
+ cct->_conf.set_val(
+ "osd_recovery_max_active_hdd", std::to_string(recovery_max_active));
+ } else {
+ cct->_conf.set_val(
+ "osd_recovery_max_active_ssd", std::to_string(recovery_max_active));
+ }
+ // Set high value for osd_max_backfill
+ cct->_conf.set_val("osd_max_backfills", std::to_string(1000));
+
+ // Disable recovery sleep
+ cct->_conf.set_val("osd_recovery_sleep", std::to_string(0));
+ cct->_conf.set_val("osd_recovery_sleep_hdd", std::to_string(0));
+ cct->_conf.set_val("osd_recovery_sleep_ssd", std::to_string(0));
+ cct->_conf.set_val("osd_recovery_sleep_hybrid", std::to_string(0));
+
+ // Disable delete sleep
+ cct->_conf.set_val("osd_delete_sleep", std::to_string(0));
+ cct->_conf.set_val("osd_delete_sleep_hdd", std::to_string(0));
+ cct->_conf.set_val("osd_delete_sleep_ssd", std::to_string(0));
+ cct->_conf.set_val("osd_delete_sleep_hybrid", std::to_string(0));
+
+ // Disable snap trim sleep
+ cct->_conf.set_val("osd_snap_trim_sleep", std::to_string(0));
+ cct->_conf.set_val("osd_snap_trim_sleep_hdd", std::to_string(0));
+ cct->_conf.set_val("osd_snap_trim_sleep_ssd", std::to_string(0));
+ cct->_conf.set_val("osd_snap_trim_sleep_hybrid", std::to_string(0));
+
+ // Disable scrub sleep
+ cct->_conf.set_val("osd_scrub_sleep", std::to_string(0));
+ } else {
+ service.local_reserver.set_max(cct->_conf->osd_max_backfills);
+ service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
+ }
}
if (changed.count("osd_min_recovery_priority")) {
service.local_reserver.set_min_priority(cct->_conf->osd_min_recovery_priority);
dout(0) << __func__ << ": scrub interval change" << dendl;
}
check_config();
+ if (changed.count("osd_asio_thread_count")) {
+ service.poolctx.stop();
+ service.poolctx.start(conf.get_val<std::uint64_t>("osd_asio_thread_count"));
+ }
}
void OSD::update_log_config()
<< cct->_conf->osd_pg_epoch_persisted_max_stale << ")";
}
if (cct->_conf->osd_object_clean_region_max_num_intervals < 0) {
- clog->warn() << "osd_object_clean_region_max_num_intervals ("
+ clog->warn() << "osd_object_clean_region_max_num_intervals ("
<< cct->_conf->osd_object_clean_region_max_num_intervals
<< ") is < 0";
}
{
dout(10) << __func__ << " -- start" << dendl;
- C_SaferCond cond;
- service.objecter->wait_for_latest_osdmap(&cond);
- cond.wait();
+ boost::system::error_code ec;
+ service.objecter->wait_for_latest_osdmap(ceph::async::use_blocked[ec]);
dout(10) << __func__ << " -- finish" << dendl;
}
osdmap_lock{make_mutex(shard_name + "::osdmap_lock")},
shard_lock_name(shard_name + "::shard_lock"),
shard_lock{make_mutex(shard_lock_name)},
- scheduler(ceph::osd::scheduler::make_scheduler(cct)),
+ scheduler(ceph::osd::scheduler::make_scheduler(
+ cct, osd->num_shards, osd->store->is_rotational())),
context_queue(sdata_wait_lock, sdata_cond)
{
dout(0) << "using op scheduler " << *scheduler << dendl;
sdata->context_queue.move_to(oncommits);
}
- if (sdata->scheduler->empty()) {
+ WorkItem work_item;
+ while (!std::get_if<OpSchedulerItem>(&work_item)) {
+ if (sdata->scheduler->empty()) {
+ if (osd->is_stopping()) {
+ sdata->shard_lock.unlock();
+ for (auto c : oncommits) {
+ dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+ delete c;
+ }
+ return; // OSD shutdown, discard.
+ }
+ sdata->shard_lock.unlock();
+ handle_oncommits(oncommits);
+ return;
+ }
+
+ work_item = sdata->scheduler->dequeue();
if (osd->is_stopping()) {
sdata->shard_lock.unlock();
for (auto c : oncommits) {
- dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
- delete c;
+ dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+ delete c;
}
return; // OSD shutdown, discard.
}
- sdata->shard_lock.unlock();
- handle_oncommits(oncommits);
- return;
- }
- OpSchedulerItem item = sdata->scheduler->dequeue();
+ // If the work item is scheduled in the future, wait until
+ // the time returned in the dequeue response before retrying.
+ if (auto when_ready = std::get_if<double>(&work_item)) {
+ if (is_smallest_thread_index) {
+ sdata->shard_lock.unlock();
+ handle_oncommits(oncommits);
+ return;
+ }
+ std::unique_lock wait_lock{sdata->sdata_wait_lock};
+ auto future_time = ceph::real_clock::from_double(*when_ready);
+ dout(10) << __func__ << " dequeue future request at " << future_time << dendl;
+ sdata->shard_lock.unlock();
+ ++sdata->waiting_threads;
+ sdata->sdata_cond.wait_until(wait_lock, future_time);
+ --sdata->waiting_threads;
+ wait_lock.unlock();
+ sdata->shard_lock.lock();
+ }
+ } // while
+
+ // Access the stored item
+ auto item = std::move(std::get<OpSchedulerItem>(work_item));
if (osd->is_stopping()) {
sdata->shard_lock.unlock();
for (auto c : oncommits) {
sdata->scheduler->enqueue(std::move(item));
}
- if (empty) {
+ {
std::lock_guard l{sdata->sdata_wait_lock};
- sdata->sdata_cond.notify_all();
+ if (empty) {
+ sdata->sdata_cond.notify_all();
+ } else if (sdata->waiting_threads) {
+ sdata->sdata_cond.notify_one();
+ }
}
}
sdata->sdata_cond.notify_one();
}
-namespace ceph {
-namespace osd_cmds {
+namespace ceph::osd_cmds {
int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,
std::ostream& os)
os << "could not issue heap profiler command -- not using tcmalloc!";
return -EOPNOTSUPP;
}
-
+
string cmd;
if (!cmd_getval(cmdmap, "heapcmd", cmd)) {
os << "unable to get value for command \"" << cmd << "\"";
return -EINVAL;
}
-
+
std::vector<std::string> cmd_vec;
get_str_vec(cmd, cmd_vec);
if (cmd_getval(cmdmap, "value", val)) {
cmd_vec.push_back(val);
}
-
+
ceph_heap_profiler_handle_command(cmd_vec, os);
-
+
return 0;
}
-
-}} // namespace ceph::osd_cmds
+
+} // namespace ceph::osd_cmds