#include "common/ceph_time.h"
#include "common/version.h"
#include "common/io_priority.h"
+#include "common/pick_address.h"
#include "os/ObjectStore.h"
#ifdef HAVE_LIBFUSE
recovery_sleep_lock("OSDService::recovery_sleep_lock"),
recovery_sleep_timer(cct, recovery_sleep_lock, false),
reserver_finisher(cct),
- local_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+ local_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
cct->_conf->osd_min_recovery_priority),
- remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+ remote_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
cct->_conf->osd_min_recovery_priority),
pg_temp_lock("OSDService::pg_temp_lock"),
snap_sleep_lock("OSDService::snap_sleep_lock"),
scrub_sleep_lock("OSDService::scrub_sleep_lock"),
scrub_sleep_timer(
osd->client_messenger->cct, scrub_sleep_lock, false /* relax locking */),
- snap_reserver(&reserver_finisher,
+ snap_reserver(cct, &reserver_finisher,
cct->_conf->osd_max_trimming_pgs),
recovery_lock("OSDService::recovery_lock"),
recovery_ops_active(0),
agent_lock.Unlock();
}
+void OSDService::request_osdmap_update(epoch_t e)  // Service-level entry point: ask the owning OSD to subscribe to osdmap epoch e.
+{
+ osd->osdmap_subscribe(e, false);  // second argument is force_request; false lets osdmap_subscribe() return early for an epoch it already subscribed to
+}
+
class AgentTimeoutCB : public Context {
PGRef pg;
public:
}
-void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+void OSDService::queue_want_pg_temp(pg_t pgid,  // Queue a desired pg_temp mapping for pgid in pg_temp_wanted (later consumed by send_pg_temp()).
+ const vector<int>& want,  // requested acting set
+ bool forced)  // when true, queue the request even if an identical acting set is already pending
{
Mutex::Locker l(pg_temp_lock);  // held while pg_temp_pending and pg_temp_wanted are accessed
- map<pg_t,vector<int> >::iterator p = pg_temp_pending.find(pgid);
+ auto p = pg_temp_pending.find(pgid);
if (p == pg_temp_pending.end() ||
- p->second != want) {
- pg_temp_wanted[pgid] = want;
+ p->second.acting != want ||
+ forced) {  // skipped only when the same acting set is already pending and the request is not forced
+ pg_temp_wanted[pgid] = pg_temp_t{want, forced};
}
}
void OSDService::_sent_pg_temp()
{
- for (map<pg_t,vector<int> >::iterator p = pg_temp_wanted.begin();
- p != pg_temp_wanted.end();
- ++p)
- pg_temp_pending[p->first] = p->second;
+ pg_temp_pending.insert(make_move_iterator(begin(pg_temp_wanted)),
+ make_move_iterator(end(pg_temp_wanted)));
pg_temp_wanted.clear();
}
<< pg_temp_wanted.size() << dendl;
}
+std::ostream& operator<<(std::ostream& out,  // Print a pg_temp_t as its acting set, followed by " (forced)" when the forced flag is set.
+ const OSDService::pg_temp_t& pg_temp)
+{
+ out << pg_temp.acting;
+ if (pg_temp.forced) {
+ out << " (forced)";
+ }
+ return out;  // hand the stream back so insertions can be chained
+}
+
void OSDService::send_pg_temp()  // Send every queued pg_temp request to the monitor, then record the requests as sent.
{
Mutex::Locker l(pg_temp_lock);
if (pg_temp_wanted.empty())
return;  // nothing queued
dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
- MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
- m->pg_temp = pg_temp_wanted;
- monc->send_mon_message(m);
+ MOSDPGTemp *ms[2] = {nullptr, nullptr};  // one message per value of the forced flag: ms[0] unforced, ms[1] forced
+ for (auto& pg_temp : pg_temp_wanted) {
+ auto& m = ms[pg_temp.second.forced];  // the bool indexes the two-slot array
+ if (!m) {  // allocate a message lazily, only when some entry needs it
+ m = new MOSDPGTemp(osdmap->get_epoch());
+ m->forced = pg_temp.second.forced;
+ }
+ m->pg_temp.emplace(pg_temp.first,
+ pg_temp.second.acting);
+ }
+ for (auto m : ms) {
+ if (m) {
+ monc->send_mon_message(m);
+ }
+ }
_sent_pg_temp();  // records the wanted entries in pg_temp_pending and clears pg_temp_wanted
}
if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
dout(20) << __func__ << " " << scrubs_pending << " -> " << (scrubs_pending+1)
- << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+ << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active
+ << ")" << dendl;
can_inc = true;
} else {
- dout(20) << __func__ << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
+ dout(20) << __func__ << " " << scrubs_pending << " + " << scrubs_active
+ << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
}
return can_inc;
MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
OSDSuperblock& sblock)
{
- MOSDMap *m = new MOSDMap(monc->get_fsid());
+ MOSDMap *m = new MOSDMap(monc->get_fsid(),
+ osdmap->get_encoding_features());
m->oldest_map = max_oldest_map;
m->newest_map = sblock.newest_map;
OSDSuperblock sblock(get_superblock());
if (since < sblock.oldest_map) {
// just send latest full map
- MOSDMap *m = new MOSDMap(monc->get_fsid());
+ MOSDMap *m = new MOSDMap(monc->get_fsid(),
+ osdmap->get_encoding_features());
m->oldest_map = max_oldest_map;
m->newest_map = sblock.newest_map;
get_map_bl(to, m->maps[to]);
waiter.wait();
}
- ret = write_meta(store, sb.cluster_fsid, sb.osd_fsid, whoami);
+ ret = write_meta(cct, store, sb.cluster_fsid, sb.osd_fsid, whoami);
if (ret) {
derr << "OSD::mkfs: failed to write fsid file: error "
<< cpp_strerror(ret) << dendl;
return ret;
}
-int OSD::write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
+int OSD::write_meta(CephContext *cct, ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
{
char val[80];
int r;
if (r < 0)
return r;
+ string key = cct->_conf->get_val<string>("key");
+ if (key.size()) {
+ r = store->write_meta("osd_key", key);
+ if (r < 0)
+ return r;
+ } else {
+ string keyfile = cct->_conf->get_val<string>("keyfile");
+ if (!keyfile.empty()) {
+ bufferlist keybl;
+ string err;
+ if (keyfile == "-") {
+ static_assert(1024 * 1024 >
+ (sizeof(CryptoKey) - sizeof(bufferptr) +
+ sizeof(__u16) + 16 /* AES_KEY_LEN */ + 3 - 1) / 3. * 4.,
+ "1MB should be enough for a base64 encoded CryptoKey");
+ r = keybl.read_fd(STDIN_FILENO, 1024 * 1024);
+ } else {
+ r = keybl.read_file(keyfile.c_str(), &err);
+ }
+ if (r < 0) {
+ derr << __func__ << " failed to read keyfile " << keyfile << ": "
+ << err << ": " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ r = store->write_meta("osd_key", keybl.to_str());
+ if (r < 0)
+ return r;
+ }
+ }
+
r = store->write_meta("ready", "ready");
if (r < 0)
return r;
disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
session_waiting_lock("OSD::session_waiting_lock"),
+ osdmap_subscribe_lock("OSD::osdmap_subscribe_lock"),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false),
heartbeat_need_update(true),
update_log_config();
peering_tp.start();
+
+ service.init();
+ service.publish_map(osdmap);
+ service.publish_superblock(superblock);
+ service.max_oldest_map = superblock.oldest_map;
+
osd_op_tp.start();
disk_tp.start();
command_tp.start();
tick_timer_without_osd_lock.add_event_after(cct->_conf->osd_heartbeat_interval, new C_Tick_WithoutOSDLock(this));
}
- service.init();
- service.publish_map(osdmap);
- service.publish_superblock(superblock);
- service.max_oldest_map = superblock.oldest_map;
-
osd_lock.Unlock();
r = monc->authenticate();
};
+ // All the basic OSD operation stats are to be considered useful
+ osd_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
+
osd_plb.add_u64(
l_osd_op_wip, "op_wip",
"Replication operations currently being processed (primary)");
l_osd_op_rw_prepare_lat, "op_rw_prepare_latency",
"Latency of read-modify-write operations (excluding queue time and wait for finished)");
+ // Now we move on to some more obscure stats, revert to assuming things
+ // are low priority unless otherwise specified.
+ osd_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
+
osd_plb.add_time_avg(l_osd_op_before_queue_op_lat, "op_before_queue_op_lat",
"Latency of IO before calling queue(before really queue into ShardedOpWq)"); // client io before queue op_wq latency
osd_plb.add_time_avg(l_osd_op_before_dequeue_op_lat, "op_before_dequeue_op_lat",
osd_plb.add_u64(
l_osd_pg_stray, "numpg_stray",
"Placement groups ready to be deleted from this osd");
+ osd_plb.add_u64(
+ l_osd_pg_removing, "numpg_removing",
+ "Placement groups queued for local deletion", "pgsr",
+ PerfCountersBuilder::PRIO_USEFUL);
osd_plb.add_u64(
l_osd_hb_to, "heartbeat_to_peers", "Heartbeat (ping) peers we send to");
osd_plb.add_u64_counter(l_osd_map, "map_messages", "OSD map messages");
l_osd_map_bl_cache_miss, "osd_map_bl_cache_miss",
"OSDMap buffer cache misses");
- osd_plb.add_u64(l_osd_stat_bytes, "stat_bytes", "OSD size");
- osd_plb.add_u64(l_osd_stat_bytes_used, "stat_bytes_used", "Used space");
+ osd_plb.add_u64(
+ l_osd_stat_bytes, "stat_bytes", "OSD size", "size",
+ PerfCountersBuilder::PRIO_USEFUL);
+ osd_plb.add_u64(
+ l_osd_stat_bytes_used, "stat_bytes_used", "Used space", "used",
+ PerfCountersBuilder::PRIO_USEFUL);
osd_plb.add_u64(l_osd_stat_bytes_avail, "stat_bytes_avail", "Available space");
osd_plb.add_u64_counter(
set_state(STATE_STOPPING);
// Debugging
- cct->_conf->set_val("debug_osd", "100");
- cct->_conf->set_val("debug_journal", "100");
- cct->_conf->set_val("debug_filestore", "100");
- cct->_conf->set_val("debug_ms", "100");
- cct->_conf->apply_changes(NULL);
+ if (cct->_conf->get_val<bool>("osd_debug_shutdown")) {
+ cct->_conf->set_val("debug_osd", "100");
+ cct->_conf->set_val("debug_journal", "100");
+ cct->_conf->set_val("debug_filestore", "100");
+ cct->_conf->set_val("debug_bluestore", "100");
+ cct->_conf->set_val("debug_ms", "100");
+ cct->_conf->apply_changes(NULL);
+ }
// stop MgrClient earlier as it's more like an internal consumer of OSD
mgrc.shutdown();
pg->upgrade(store);
}
+ if (pg->dne()) {
+ dout(10) << "load_pgs " << *it << " deleting dne" << dendl;
+ pg->ch = nullptr;
+ service.pg_remove_epoch(pg->pg_id);
+ pg->unlock();
+ {
+ // Delete pg
+ RWLock::WLocker l(pg_map_lock);
+ auto p = pg_map.find(pg->get_pgid());
+ assert(p != pg_map.end() && p->second == pg);
+ dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
+ pg_map.erase(p);
+ pg->put("PGMap");
+ }
+ recursive_remove_collection(cct, store, pgid, *it);
+ continue;
+ }
+
service.init_splits_between(pg->info.pgid, pg->get_osdmap(), osdmap);
// generate state for PG's current mapping
++i) {
PG *pg = i->second;
+ // Ignore PGs only partially created (DNE)
+ if (pg->info.dne()) {
+ continue;
+ }
+
auto rpib = pg->get_required_past_interval_bounds(
pg->info,
superblock.oldest_map);
ceph_abort();
}
+ const bool is_mon_create =
+ evt->get_event().dynamic_type() == PG::NullEvt::static_type();
+ if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
+ return -EAGAIN;
+ }
// do we need to resurrect a deleting pg?
spg_t resurrected;
PGRef old_pg_state;
}
}
+bool OSD::maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create)  // Decide whether creation of pgid must be withheld because pg_map has reached the hard PG limit.
+{  // Returns false when the PG may be created now; returns true after recording the create as pending.
+ const auto max_pgs_per_osd =
+ (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+ cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));  // limit = mon_max_pg_per_osd * hard ratio (the ratio is a double)
+
+ RWLock::RLocker pg_map_locker{pg_map_lock};
+ if (pg_map.size() < max_pgs_per_osd) {
+ return false;  // still below the limit
+ }
+ lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+ if (is_mon_create) {
+ pending_creates_from_mon++;  // mon-requested creates are only counted
+ } else {
+ bool is_primary = osdmap->get_pg_acting_rank(pgid.pgid, whoami) == 0;  // acting rank 0 is treated as primary
+ pending_creates_from_osd.emplace(pgid.pgid, is_primary);  // other creates are remembered per PG together with the primary flag
+ }
+ dout(5) << __func__ << " withhold creation of pg " << pgid
+ << ": " << pg_map.size() << " >= "<< max_pgs_per_osd << dendl;
+ return true;
+}
+
+// To re-trigger peering we have to perturb the pg mapping slightly; see
+// PG::should_restart_peering(). An empty pg_temp will not work, because
+// OSDMap::pg_to_up_acting_osds() falls back to the up set when pg_temp is empty.
+static vector<int32_t> twiddle(const vector<int>& acting) {
+ if (acting.size() > 1) {
+ return {acting[0]};  // several members: shrink the set to its first entry
+ } else {
+ vector<int32_t> twiddled(acting.begin(), acting.end());
+ twiddled.push_back(-1);  // zero or one member: append a -1 entry so the result is non-empty and differs from the input
+ return twiddled;
+ }
+}
+
+// Retry PG creations that maybe_wait_for_max_pg() withheld, as far as the
+// hard per-OSD PG limit now leaves room:
+//  - withheld mon creates are re-solicited by renewing the "osd_pg_creates"
+//    subscription;
+//  - withheld non-mon creates are re-triggered by queueing a forced,
+//    twiddled pg_temp for the PG.
+// While anything is still withheld the osdmap subscription is kept open;
+// otherwise a one-time subscription for the next epoch is requested.
+void OSD::resume_creating_pg()
+{
+  bool do_sub_pg_creates = false;
+  bool have_pending_creates = false;
+  {
+    const auto max_pgs_per_osd =
+      (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+       cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
+    RWLock::RLocker l(pg_map_lock);
+    if (max_pgs_per_osd <= pg_map.size()) {
+      // this could happen if admin decreases this setting before a PG is removed
+      return;
+    }
+    unsigned spare_pgs = max_pgs_per_osd - pg_map.size();
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    if (pending_creates_from_mon > 0) {
+      do_sub_pg_creates = true;
+      // the mon counter is reset in both branches; spare_pgs keeps only
+      // what is left for the non-mon creates below
+      if (pending_creates_from_mon >= spare_pgs) {
+        spare_pgs = pending_creates_from_mon = 0;
+      } else {
+        spare_pgs -= pending_creates_from_mon;
+        pending_creates_from_mon = 0;
+      }
+    }
+    auto pg = pending_creates_from_osd.cbegin();
+    while (spare_pgs > 0 && pg != pending_creates_from_osd.cend()) {
+      dout(20) << __func__ << " pg " << pg->first << dendl;
+      vector<int> acting;
+      osdmap->pg_to_up_acting_osds(pg->first, nullptr, nullptr, &acting, nullptr);
+      // a forced, perturbed pg_temp restarts peering for this PG
+      service.queue_want_pg_temp(pg->first, twiddle(acting), true);
+      pg = pending_creates_from_osd.erase(pg);
+      do_sub_pg_creates = true;
+      spare_pgs--;
+    }
+    have_pending_creates = (pending_creates_from_mon > 0 ||
+                            !pending_creates_from_osd.empty());
+  }
+
+  bool do_renew_subs = false;
+  if (do_sub_pg_creates) {
+    if (monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0)) {
+      dout(4) << __func__ << ": resolicit pg creates from mon since "
+              << last_pg_create_epoch << dendl;
+      do_renew_subs = true;
+    }
+  }
+  version_t start = osdmap->get_epoch() + 1;
+  if (have_pending_creates) {
+    // don't miss any new osdmap deleting PGs
+    if (monc->sub_want("osdmap", start, 0)) {
+      dout(4) << __func__ << ": resolicit osdmap from mon since "
+              << start << dendl;
+      do_renew_subs = true;
+    }
+  } else if (do_sub_pg_creates) {
+    // no need to subscribe the osdmap continuously anymore
+    // once the pgtemp and/or mon_subscribe(pg_creates) is sent
+    if (monc->sub_want_increment("osdmap", start, CEPH_SUBSCRIBE_ONETIME)) {
+      // trailing space added so the epoch is not glued to "since"
+      dout(4) << __func__ << ": re-subscribe osdmap(onetime) since "
+              << start << dendl;
+      do_renew_subs = true;
+    }
+  }
+
+  if (do_renew_subs) {
+    monc->renew_subs();
+  }
+
+  service.send_pg_temp();
+}
void OSD::build_initial_pg_history(
spg_t pgid,
&debug);
if (new_interval) {
h->same_interval_since = e;
- }
- if (up != new_up) {
- h->same_up_since = e;
- }
- if (acting_primary != new_acting_primary) {
- h->same_primary_since = e;
- }
- if (pgid.pgid.is_split(lastmap->get_pg_num(pgid.pgid.pool()),
- osdmap->get_pg_num(pgid.pgid.pool()),
- nullptr)) {
- h->last_epoch_split = e;
+ if (up != new_up) {
+ h->same_up_since = e;
+ }
+ if (acting_primary != new_acting_primary) {
+ h->same_primary_since = e;
+ }
+ if (pgid.pgid.is_split(lastmap->get_pg_num(pgid.pgid.pool()),
+ osdmap->get_pg_num(pgid.pgid.pool()),
+ nullptr)) {
+ h->last_epoch_split = e;
+ }
+ up = new_up;
+ acting = new_acting;
+ up_primary = new_up_primary;
+ acting_primary = new_acting_primary;
}
lastmap = osdmap;
}
logger->set(l_osd_cached_crc, buffer::get_cached_crc());
logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
logger->set(l_osd_missed_crc, buffer::get_missed_crc());
+ logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
// osd_lock is not being held, which means the OSD state
// might change when doing the monitor report
sched_scrub();
}
service.promote_throttle_recalibrate();
+ resume_creating_pg();
bool need_send_beacon = false;
const auto now = ceph::coarse_mono_clock::now();
{
}
}
- check_ops_in_flight();
+ mgrc.update_osd_health(get_health_metrics());
service.kick_recovery_queue();
tick_timer_without_osd_lock.add_event_after(OSD_TICK_INTERVAL, new C_Tick_WithoutOSDLock(this));
}
if (pool < 0 && isdigit(poolstr[0]))
pool = atoll(poolstr.c_str());
if (pool < 0) {
- ss << "Invalid pool" << poolstr;
+ // quote the offending name; a single closing quote ends the message
+ ss << "Invalid pool '" << poolstr << "'";
return;
}
if (osdmap->get_epoch() == 0) {
derr << "waiting for initial osdmap" << dendl;
} else if (osdmap->is_destroyed(whoami)) {
- derr << "osdmap says I am destroyed, exiting" << dendl;
- exit(0);
+ derr << "osdmap says I am destroyed" << dendl;
+ // provide a small margin so we don't livelock seeing if we
+ // un-destroyed ourselves.
+ if (osdmap->get_epoch() > newest - 1) {
+ exit(0);
+ }
} else if (osdmap->test_flag(CEPH_OSDMAP_NOUP) || osdmap->is_noup(whoami)) {
derr << "osdmap NOUP flag is set, waiting for it to clear" << dendl;
} else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
dout(1) << "start_waiting_for_healthy" << dendl;
set_state(STATE_WAITING_FOR_HEALTHY);
last_heartbeat_resample = utime_t();
+
+ // subscribe to osdmap updates, in case our peers really are known to be dead
+ osdmap_subscribe(osdmap->get_epoch() + 1, false);
}
bool OSD::_is_healthy()
collect_sys_info(pm, cct);
+ std::string front_iface, back_iface;
+ /*
+ pick_iface(cct,
+ CEPH_PICK_ADDRESS_PUBLIC | CEPH_PICK_ADDRESS_CLUSTER,
+ &front_iface, &back_iface);
+ */
+ (*pm)["front_iface"] = pick_iface(cct,
+ client_messenger->get_myaddr().get_sockaddr_storage());
+ (*pm)["back_iface"] = pick_iface(cct,
+ cluster_messenger->get_myaddr().get_sockaddr_storage());
+
dout(10) << __func__ << " " << *pm << dendl;
}
}
-bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
- int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
- bool& isvalid, CryptoKey& session_key)
+bool OSD::ms_verify_authorizer(
+ Connection *con, int peer_type,
+ int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
+ bool& isvalid, CryptoKey& session_key,
+ std::unique_ptr<AuthAuthorizerChallenge> *challenge)
{
AuthAuthorizeHandler *authorize_handler = 0;
switch (peer_type) {
isvalid = authorize_handler->verify_authorizer(
cct, keys,
authorizer_data, authorizer_reply, name, global_id, caps_info, session_key,
- &auid);
+ &auid, challenge);
} else {
dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
isvalid = false;
struct tm bdt;
time_t tt = now.sec();
localtime_r(&tt, &bdt);
+
+ bool day_permit = false;
+ if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) {
+ if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
+ day_permit = true;
+ }
+ } else {
+ if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
+ day_permit = true;
+ }
+ }
+
+ if (!day_permit) {
+ dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day
+ << " - " << cct->_conf->osd_scrub_end_week_day
+ << " now " << bdt.tm_wday << " = no" << dendl;
+ return false;
+ }
+
bool time_permit = false;
if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
if (!service.can_inc_scrubs_pending()) {
return;
}
+ if (!cct->_conf->osd_scrub_during_recovery && service.is_recovery_active()) {
+ dout(20) << __func__ << " not scheduling scrubs due to active recovery" << dendl;
+ return;
+ }
+
utime_t now = ceph_clock_now();
bool time_permit = scrub_time_permit(now);
break;
}
- if (!cct->_conf->osd_scrub_during_recovery && service.is_recovery_active()) {
- dout(10) << __func__ << "not scheduling scrub of " << scrub.pgid << " due to active recovery ops" << dendl;
- break;
- }
-
if ((scrub.deadline >= now) && !(time_permit && load_is_low)) {
dout(10) << __func__ << " not scheduling scrub for " << scrub.pgid << " due to "
<< (!time_permit ? "time not permit" : "high load") << dendl;
+vector<OSDHealthMetric> OSD::get_health_metrics()  // Build this OSD's health metrics; currently a single PENDING_CREATING_PGS entry.
+{
+ vector<OSDHealthMetric> metrics;
+ lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+ auto n_primaries = pending_creates_from_mon;  // start from the count of withheld mon-requested creates
+ for (const auto& create : pending_creates_from_osd) {
+ if (create.second) {  // second is the is_primary flag stored by maybe_wait_for_max_pg()
+ n_primaries++;
+ }
+ }
+ metrics.emplace_back(osd_metric::PENDING_CREATING_PGS, n_primaries);
+ return metrics;
+}
+
// =====================================================
// MAP
void OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
- OSDMapRef osdmap = service.get_osdmap();
- if (osdmap->get_epoch() >= epoch)
+ Mutex::Locker l(osdmap_subscribe_lock);
+ if (latest_subscribed_epoch >= epoch && !force_request)
return;
+ latest_subscribed_epoch = MAX(epoch, latest_subscribed_epoch);
+
if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
force_request) {
monc->renew_subs();
assert(osd_lock.is_locked());
dout(7) << "consume_map version " << osdmap->get_epoch() << dendl;
+ /** make sure the cluster is speaking in SORTBITWISE, because we don't
+ * speak the older sorting version any more. Be careful not to force
+ * a shutdown if we are merely processing old maps, though.
+ */
+ if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE) && is_active()) {
+ derr << __func__ << " SORTBITWISE flag is not set" << dendl;
+ ceph_abort();
+ }
+
int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
list<PGRef> to_remove;
pg->unlock();
}
+
+ lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+ for (auto pg = pending_creates_from_osd.cbegin();
+ pg != pending_creates_from_osd.cend();) {
+ if (osdmap->get_pg_acting_rank(pg->first, whoami) < 0) {
+ pg = pending_creates_from_osd.erase(pg);
+ } else {
+ ++pg;
+ }
+ }
}
for (list<PGRef>::iterator i = to_remove.begin();
logger->set(l_osd_pg_primary, num_pg_primary);
logger->set(l_osd_pg_replica, num_pg_replica);
logger->set(l_osd_pg_stray, num_pg_stray);
+ logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
}
void OSD::activate_map()
dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
- if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
- derr << __func__ << " SORTBITWISE flag is not set" << dendl;
- ceph_abort();
- }
-
if (osdmap->test_flag(CEPH_OSDMAP_FULL)) {
dout(10) << " osdmap flagged full, doing onetime osdmap subscribe" << dendl;
osdmap_subscribe(osdmap->get_epoch() + 1, false);
<< dendl;
continue;
}
-
if (handle_pg_peering_evt(
pgid,
history,
service.send_pg_created(pgid.pgid);
}
}
- last_pg_create_epoch = m->epoch;
+ {
+ lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+ if (pending_creates_from_mon == 0) {
+ last_pg_create_epoch = m->epoch;
+ }
+ }
maybe_update_heartbeat_peers();
}
continue;
}
service.share_map_peer(it->first, con.get(), curmap);
- dout(7) << __func__ << " osd " << it->first
+ dout(7) << __func__ << " osd." << it->first
<< " on " << it->second.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
it->second);
m->query_epoch,
PG::RemoteBackfillReserved()));
} else if (m->type == MBackfillReserve::REJECT) {
+ // NOTE: this is replica -> primary "i reject your request"
+ // and also primary -> replica "cancel my previously-granted request"
evt = PG::CephPeeringEvtRef(
new PG::CephPeeringEvt(
m->query_epoch,
pg->put("PGMap"); // since we've taken it out of map
}
-
// =========================================================
// RECOVERY
i->lock();
int pgstate = i->get_state();
if ( ((newstate == PG_STATE_FORCED_RECOVERY) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING))) ||
- ((newstate == PG_STATE_FORCED_BACKFILL) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILL))) )
+ ((newstate == PG_STATE_FORCED_BACKFILL) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING))) )
i->_change_recovery_force_mode(newstate, false);
i->unlock();
}
* queue_recovery_after_sleep.
*/
float recovery_sleep = get_osd_recovery_sleep();
- if (recovery_sleep > 0 && service.recovery_needs_sleep) {
- PGRef pgref(pg);
- auto recovery_requeue_callback = new FunctionContext([this, pgref, queued, reserved_pushes](int r) {
- dout(20) << "do_recovery wake up at "
- << ceph_clock_now()
- << ", re-queuing recovery" << dendl;
- service.recovery_needs_sleep = false;
- service.queue_recovery_after_sleep(pgref.get(), queued, reserved_pushes);
- });
+ {
Mutex::Locker l(service.recovery_sleep_lock);
-
- // This is true for the first recovery op and when the previous recovery op
- // has been scheduled in the past. The next recovery op is scheduled after
- // completing the sleep from now.
- if (service.recovery_schedule_time < ceph_clock_now()) {
- service.recovery_schedule_time = ceph_clock_now();
- }
- service.recovery_schedule_time += recovery_sleep;
- service.recovery_sleep_timer.add_event_at(service.recovery_schedule_time,
- recovery_requeue_callback);
- dout(20) << "Recovery event scheduled at "
- << service.recovery_schedule_time << dendl;
- return;
+ if (recovery_sleep > 0 && service.recovery_needs_sleep) {
+ PGRef pgref(pg);
+ auto recovery_requeue_callback = new FunctionContext([this, pgref, queued, reserved_pushes](int r) {
+ dout(20) << "do_recovery wake up at "
+ << ceph_clock_now()
+ << ", re-queuing recovery" << dendl;
+ Mutex::Locker l(service.recovery_sleep_lock);
+ service.recovery_needs_sleep = false;
+ service.queue_recovery_after_sleep(pgref.get(), queued, reserved_pushes);
+ });
+
+ // This is true for the first recovery op and when the previous recovery op
+ // has been scheduled in the past. The next recovery op is scheduled after
+ // completing the sleep from now.
+ if (service.recovery_schedule_time < ceph_clock_now()) {
+ service.recovery_schedule_time = ceph_clock_now();
+ }
+ service.recovery_schedule_time += recovery_sleep;
+ service.recovery_sleep_timer.add_event_at(service.recovery_schedule_time,
+ recovery_requeue_callback);
+ dout(20) << "Recovery event scheduled at "
+ << service.recovery_schedule_time << dendl;
+ return;
+ }
}
{
- service.recovery_needs_sleep = true;
+ {
+ Mutex::Locker l(service.recovery_sleep_lock);
+ service.recovery_needs_sleep = true;
+ }
+
if (pg->pg_has_reset_since(queued)) {
goto out;
}
pg->discover_all_missing(*rctx.query_map);
if (rctx.query_map->empty()) {
string action;
- if (pg->state_test(PG_STATE_BACKFILL)) {
+ if (pg->state_test(PG_STATE_BACKFILLING)) {
auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
queued,
queued,
- PG::CancelBackfill()));
+ PG::DeferBackfill(cct->_conf->osd_recovery_retry_interval)));
pg->queue_peering_event(evt);
action = "in backfill";
} else if (pg->state_test(PG_STATE_RECOVERING)) {
auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
queued,
queued,
- PG::CancelRecovery()));
+ PG::DeferRecovery(cct->_conf->osd_recovery_retry_interval)));
pg->queue_peering_event(evt);
action = "in recovery";
} else {
bool OSDService::is_recovery_active()  // True when the local or the remote reserver reports a reservation.
{
- Mutex::Locker l(recovery_lock);
- return recovery_ops_active > 0;
+ return local_reserver.has_reservation() || remote_reserver.has_reservation();  // replaces the recovery_ops_active > 0 check that was made under recovery_lock
}
// =========================================================
uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
auto sdata = shard_list[shard_index];
bool queued = false;
- unsigned pushes_to_free = 0;
{
Mutex::Locker l(sdata->sdata_op_ordering_lock);
auto p = sdata->pg_slots.find(pgid);
++i) {
sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff);
}
- for (auto& q : p->second.to_process) {
- pushes_to_free += q.get_reserved_pushes();
- }
p->second.to_process.clear();
p->second.waiting_for_pg = false;
++p->second.requeue_seq;
queued = true;
}
}
- if (pushes_to_free > 0) {
- osd->service.release_reserved_pushes(pushes_to_free);
- }
if (queued) {
sdata->sdata_lock.Lock();
sdata->sdata_cond.SignalOne();