#include "common/errno.h"
#include "common/ceph_argparse.h"
+#include "common/ceph_time.h"
#include "common/version.h"
#include "common/io_priority.h"
+#include "common/pick_address.h"
#include "os/ObjectStore.h"
#ifdef HAVE_LIBFUSE
#include "messages/MLog.h"
#include "messages/MGenericMessage.h"
-#include "messages/MPing.h"
#include "messages/MOSDPing.h"
#include "messages/MOSDFailure.h"
#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDPGBackfill.h"
#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
+#include "messages/MOSDForceRecovery.h"
#include "messages/MOSDECSubOpWrite.h"
#include "messages/MOSDECSubOpWriteReply.h"
#include "messages/MOSDECSubOpRead.h"
#undef dout_prefix
#define dout_prefix _prefix(_dout, whoami, get_osdmap_epoch())
+
const double OSD::OSD_TICK_INTERVAL = 1.0;
static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
return *_dout << "osd." << whoami << " " << epoch << " ";
}
-void PGQueueable::RunVis::operator()(const OpRequestRef &op) {
- return osd->dequeue_op(pg, op, handle);
-}
-
-void PGQueueable::RunVis::operator()(const PGSnapTrim &op) {
- return pg->snap_trimmer(op.epoch_queued);
-}
-
-void PGQueueable::RunVis::operator()(const PGScrub &op) {
- return pg->scrub(op.epoch_queued, handle);
-}
-
-void PGQueueable::RunVis::operator()(const PGRecovery &op) {
- return osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
-}
-
//Initial features in new superblock.
//Features here are also automatically upgraded
CompatSet OSD::get_osd_initial_compat_set() {
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA);
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING);
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES);
return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
ceph_osd_feature_incompat);
}
next_notif_id(0),
recovery_request_lock("OSDService::recovery_request_lock"),
recovery_request_timer(cct, recovery_request_lock, false),
+ recovery_sleep_lock("OSDService::recovery_sleep_lock"),
+ recovery_sleep_timer(cct, recovery_sleep_lock, false),
reserver_finisher(cct),
- local_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+ local_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
cct->_conf->osd_min_recovery_priority),
- remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+ remote_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
cct->_conf->osd_min_recovery_priority),
pg_temp_lock("OSDService::pg_temp_lock"),
snap_sleep_lock("OSDService::snap_sleep_lock"),
snap_sleep_timer(
osd->client_messenger->cct, snap_sleep_lock, false /* relax locking */),
- snap_reserver(&reserver_finisher,
+ scrub_sleep_lock("OSDService::scrub_sleep_lock"),
+ scrub_sleep_timer(
+ osd->client_messenger->cct, scrub_sleep_lock, false /* relax locking */),
+ snap_reserver(cct, &reserver_finisher,
cct->_conf->osd_max_trimming_pgs),
recovery_lock("OSDService::recovery_lock"),
recovery_ops_active(0),
delete objecter;
}
+
+
+#ifdef PG_DEBUG_REFS
+void OSDService::add_pgid(spg_t pgid, PG *pg){
+ Mutex::Locker l(pgid_lock);
+ if (!pgid_tracker.count(pgid)) {
+ live_pgs[pgid] = pg;
+ }
+ pgid_tracker[pgid]++;
+}
+void OSDService::remove_pgid(spg_t pgid, PG *pg)
+{
+ Mutex::Locker l(pgid_lock);
+ assert(pgid_tracker.count(pgid));
+ assert(pgid_tracker[pgid] > 0);
+ pgid_tracker[pgid]--;
+ if (pgid_tracker[pgid] == 0) {
+ pgid_tracker.erase(pgid);
+ live_pgs.erase(pgid);
+ }
+}
+void OSDService::dump_live_pgids()
+{
+ Mutex::Locker l(pgid_lock);
+ derr << "live pgids:" << dendl;
+ for (map<spg_t, int>::const_iterator i = pgid_tracker.cbegin();
+ i != pgid_tracker.cend();
+ ++i) {
+ derr << "\t" << *i << dendl;
+ live_pgs[i->first]->dump_live_ids();
+ }
+}
+#endif
+
+
void OSDService::_start_split(spg_t parent, const set<spg_t> &children)
{
for (set<spg_t>::const_iterator i = children.begin();
Mutex::Locker l(agent_timer_lock);
agent_timer.shutdown();
}
+
+ {
+ Mutex::Locker l(recovery_sleep_lock);
+ recovery_sleep_timer.shutdown();
+ }
}
-void OSDService::shutdown()
+void OSDService::shutdown_reserver()
{
reserver_finisher.wait_for_empty();
reserver_finisher.stop();
+}
+
+void OSDService::shutdown()
+{
{
Mutex::Locker l(watch_lock);
watch_timer.shutdown();
snap_sleep_timer.shutdown();
}
+ {
+ Mutex::Locker l(scrub_sleep_lock);
+ scrub_sleep_timer.shutdown();
+ }
+
osdmap = OSDMapRef();
next_osdmap = OSDMapRef();
}
watch_timer.init();
agent_timer.init();
snap_sleep_timer.init();
+ scrub_sleep_timer.init();
agent_thread.create("osd_srv_agent");
agent_lock.Unlock();
}
+void OSDService::request_osdmap_update(epoch_t e)
+{
+ osd->osdmap_subscribe(e, false);
+}
+
class AgentTimeoutCB : public Context {
PGRef pg;
public:
return full_ratio;
}
-void OSDService::check_full_status(const osd_stat_t &osd_stat)
+void OSDService::check_full_status(float ratio)
{
Mutex::Locker l(full_status_lock);
- float ratio = ((float)osd_stat.kb_used) / ((float)osd_stat.kb);
cur_ratio = ratio;
  // The OSDMap ratios take precedence. So if the failsafe is .95 and
float full_ratio = std::max(osdmap->get_full_ratio(), backfillfull_ratio);
float failsafe_ratio = std::max(get_failsafe_full_ratio(), full_ratio);
- if (!osdmap->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+ if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
// use the failsafe for nearfull and full; the mon isn't using the
// flags anyway because we're mid-upgrade.
full_ratio = failsafe_ratio;
dout(10) << __func__ << " " << get_full_state_name(cur_state)
<< " -> " << get_full_state_name(new_state) << dendl;
if (new_state == FAILSAFE) {
- clog->error() << "failsafe engaged, dropping updates, now "
+ clog->error() << "full status failsafe engaged, dropping updates, now "
<< (int)roundf(ratio * 100) << "% full";
} else if (cur_state == FAILSAFE) {
- clog->error() << "failsafe disengaged, no longer dropping updates, now "
- << (int)roundf(ratio * 100) << "% full";
+ clog->error() << "full status failsafe disengaged, no longer dropping "
+ << "updates, now " << (int)roundf(ratio * 100) << "% full";
}
cur_state = new_state;
}
injectfull = count;
}
-void OSDService::update_osd_stat(vector<int>& hb_peers)
+osd_stat_t OSDService::set_osd_stat(const struct store_statfs_t &stbuf,
+ vector<int>& hb_peers,
+ int num_pgs)
{
- Mutex::Locker lock(stat_lock);
+ uint64_t bytes = stbuf.total;
+ uint64_t used = bytes - stbuf.available;
+ uint64_t avail = stbuf.available;
- osd_stat.hb_peers.swap(hb_peers);
+ osd->logger->set(l_osd_stat_bytes, bytes);
+ osd->logger->set(l_osd_stat_bytes_used, used);
+ osd->logger->set(l_osd_stat_bytes_avail, avail);
- osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+ {
+ Mutex::Locker l(stat_lock);
+ osd_stat.hb_peers.swap(hb_peers);
+ osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+ osd_stat.kb = bytes >> 10;
+ osd_stat.kb_used = used >> 10;
+ osd_stat.kb_avail = avail >> 10;
+ osd_stat.num_pgs = num_pgs;
+ return osd_stat;
+ }
+}
- // fill in osd stats too
+void OSDService::update_osd_stat(vector<int>& hb_peers)
+{
+ // load osd stats first
struct store_statfs_t stbuf;
int r = osd->store->statfs(&stbuf);
if (r < 0) {
return;
}
- uint64_t bytes = stbuf.total;
- uint64_t used = bytes - stbuf.available;
- uint64_t avail = stbuf.available;
-
- osd_stat.kb = bytes >> 10;
- osd_stat.kb_used = used >> 10;
- osd_stat.kb_avail = avail >> 10;
-
- osd->logger->set(l_osd_stat_bytes, bytes);
- osd->logger->set(l_osd_stat_bytes_used, used);
- osd->logger->set(l_osd_stat_bytes_avail, avail);
-
- dout(20) << "update_osd_stat " << osd_stat << dendl;
-
- check_full_status(osd_stat);
+ auto new_stat = set_osd_stat(stbuf, hb_peers, osd->get_num_pgs());
+ dout(20) << "update_osd_stat " << new_stat << dendl;
+ assert(new_stat.kb);
+ float ratio = ((float)new_stat.kb_used) / ((float)new_stat.kb);
+ check_full_status(ratio);
}
bool OSDService::check_osdmap_full(const set<pg_shard_t> &missing_on)
}
-void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+void OSDService::queue_want_pg_temp(pg_t pgid,
+ const vector<int>& want,
+ bool forced)
{
Mutex::Locker l(pg_temp_lock);
- map<pg_t,vector<int> >::iterator p = pg_temp_pending.find(pgid);
+ auto p = pg_temp_pending.find(pgid);
if (p == pg_temp_pending.end() ||
- p->second != want) {
- pg_temp_wanted[pgid] = want;
+ p->second.acting != want ||
+ forced) {
+ pg_temp_wanted[pgid] = pg_temp_t{want, forced};
}
}
void OSDService::_sent_pg_temp()
{
- for (map<pg_t,vector<int> >::iterator p = pg_temp_wanted.begin();
- p != pg_temp_wanted.end();
- ++p)
- pg_temp_pending[p->first] = p->second;
+ pg_temp_pending.insert(make_move_iterator(begin(pg_temp_wanted)),
+ make_move_iterator(end(pg_temp_wanted)));
pg_temp_wanted.clear();
}
<< pg_temp_wanted.size() << dendl;
}
+std::ostream& operator<<(std::ostream& out,
+ const OSDService::pg_temp_t& pg_temp)
+{
+ out << pg_temp.acting;
+ if (pg_temp.forced) {
+ out << " (forced)";
+ }
+ return out;
+}
+
void OSDService::send_pg_temp()
{
Mutex::Locker l(pg_temp_lock);
if (pg_temp_wanted.empty())
return;
dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
- MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
- m->pg_temp = pg_temp_wanted;
- monc->send_mon_message(m);
+ MOSDPGTemp *ms[2] = {nullptr, nullptr};
+ for (auto& pg_temp : pg_temp_wanted) {
+ auto& m = ms[pg_temp.second.forced];
+ if (!m) {
+ m = new MOSDPGTemp(osdmap->get_epoch());
+ m->forced = pg_temp.second.forced;
+ }
+ m->pg_temp.emplace(pg_temp.first,
+ pg_temp.second.acting);
+ }
+ for (auto m : ms) {
+ if (m) {
+ monc->send_mon_message(m);
+ }
+ }
_sent_pg_temp();
}
void OSDService::send_pg_created(pg_t pgid)
{
dout(20) << __func__ << dendl;
- monc->send_mon_message(new MOSDPGCreated(pgid));
+ if (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+ monc->send_mon_message(new MOSDPGCreated(pgid));
+ }
}
// --------------------------------------
if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
dout(20) << __func__ << " " << scrubs_pending << " -> " << (scrubs_pending+1)
- << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+ << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active
+ << ")" << dendl;
can_inc = true;
} else {
- dout(20) << __func__ << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
+ dout(20) << __func__ << " " << scrubs_pending << " + " << scrubs_active
+ << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
}
return can_inc;
MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
OSDSuperblock& sblock)
{
- MOSDMap *m = new MOSDMap(monc->get_fsid());
+ MOSDMap *m = new MOSDMap(monc->get_fsid(),
+ osdmap->get_encoding_features());
m->oldest_map = max_oldest_map;
m->newest_map = sblock.newest_map;
OSDSuperblock sblock(get_superblock());
if (since < sblock.oldest_map) {
// just send latest full map
- MOSDMap *m = new MOSDMap(monc->get_fsid());
+ MOSDMap *m = new MOSDMap(monc->get_fsid(),
+ osdmap->get_encoding_features());
m->oldest_map = max_oldest_map;
m->newest_map = sblock.newest_map;
get_map_bl(to, m->maps[to]);
bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
{
bool found = map_bl_cache.lookup(e, &bl);
- if (found)
+ if (found) {
+ if (logger)
+ logger->inc(l_osd_map_bl_cache_hit);
return true;
+ }
+ if (logger)
+ logger->inc(l_osd_map_bl_cache_miss);
found = store->read(coll_t::meta(),
- OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
- if (found)
+ OSD::get_osdmap_pobject_name(e), 0, 0, bl,
+ CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
+ if (found) {
_add_map_bl(e, bl);
+ }
return found;
}
{
Mutex::Locker l(map_cache_lock);
bool found = map_bl_inc_cache.lookup(e, &bl);
- if (found)
+ if (found) {
+ if (logger)
+ logger->inc(l_osd_map_bl_cache_hit);
return true;
+ }
+ if (logger)
+ logger->inc(l_osd_map_bl_cache_miss);
found = store->read(coll_t::meta(),
- OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
- if (found)
+ OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl,
+ CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) >= 0;
+ if (found) {
_add_map_inc_bl(e, bl);
+ }
return found;
}
void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
{
dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
+ // cache a contiguous buffer
+ if (bl.get_num_buffers() > 1) {
+ bl.rebuild();
+ }
+ bl.try_assign_to_mempool(mempool::mempool_osd_mapbl);
map_bl_cache.add(e, bl);
}
void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
{
dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
+ // cache a contiguous buffer
+ if (bl.get_num_buffers() > 1) {
+ bl.rebuild();
+ }
+ bl.try_assign_to_mempool(mempool::mempool_osd_mapbl);
map_bl_inc_cache.add(e, bl);
}
void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
{
Mutex::Locker l(map_cache_lock);
+ // cache a contiguous buffer
+ if (bl.get_num_buffers() > 1) {
+ bl.rebuild();
+ }
map_bl_inc_cache.pin(e, bl);
}
void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
{
Mutex::Locker l(map_cache_lock);
+ // cache a contiguous buffer
+ if (bl.get_num_buffers() > 1) {
+ bl.rebuild();
+ }
map_bl_cache.pin(e, bl);
}
void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
{
+ if (!cct->_conf->osd_debug_misdirected_ops) {
+ return;
+ }
+
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
assert(m->get_type() == CEPH_MSG_OSD_OP);
<< " to osd." << whoami
<< " not " << pg->acting
<< " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch();
- if (g_conf->osd_enxio_on_misdirected_op) {
- reply_op_error(op, -ENXIO);
- }
}
void OSDService::enqueue_back(spg_t pgid, PGQueueable qi)
ret = store->mkfs();
if (ret) {
- derr << "OSD::mkfs: ObjectStore::mkfs failed with error " << ret << dendl;
+ derr << "OSD::mkfs: ObjectStore::mkfs failed with error "
+ << cpp_strerror(ret) << dendl;
goto free_store;
}
- store->set_cache_shards(cct->_conf->osd_op_num_shards);
+ store->set_cache_shards(1); // doesn't matter for mkfs!
ret = store->mount();
if (ret) {
- derr << "OSD::mkfs: couldn't mount ObjectStore: error " << ret << dendl;
+ derr << "OSD::mkfs: couldn't mount ObjectStore: error "
+ << cpp_strerror(ret) << dendl;
goto free_store;
}
ret = store->apply_transaction(osr.get(), std::move(t));
if (ret) {
derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_GOBJECT: "
- << "apply_transaction returned " << ret << dendl;
+ << "apply_transaction returned " << cpp_strerror(ret) << dendl;
goto umount_store;
}
}
waiter.wait();
}
- ret = write_meta(store, sb.cluster_fsid, sb.osd_fsid, whoami);
+ ret = write_meta(cct, store, sb.cluster_fsid, sb.osd_fsid, whoami);
if (ret) {
- derr << "OSD::mkfs: failed to write fsid file: error " << ret << dendl;
+ derr << "OSD::mkfs: failed to write fsid file: error "
+ << cpp_strerror(ret) << dendl;
goto umount_store;
}
return ret;
}
-int OSD::write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
+int OSD::write_meta(CephContext *cct, ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
{
char val[80];
int r;
if (r < 0)
return r;
+ string key = cct->_conf->get_val<string>("key");
+ if (key.size()) {
+ r = store->write_meta("osd_key", key);
+ if (r < 0)
+ return r;
+ } else {
+ string keyfile = cct->_conf->get_val<string>("keyfile");
+ if (!keyfile.empty()) {
+ bufferlist keybl;
+ string err;
+ if (keyfile == "-") {
+ static_assert(1024 * 1024 >
+ (sizeof(CryptoKey) - sizeof(bufferptr) +
+ sizeof(__u16) + 16 /* AES_KEY_LEN */ + 3 - 1) / 3. * 4.,
+ "1MB should be enough for a base64 encoded CryptoKey");
+ r = keybl.read_fd(STDIN_FILENO, 1024 * 1024);
+ } else {
+ r = keybl.read_file(keyfile.c_str(), &err);
+ }
+ if (r < 0) {
+ derr << __func__ << " failed to read keyfile " << keyfile << ": "
+ << err << ": " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ r = store->write_meta("osd_key", keybl.to_str());
+ if (r < 0)
+ return r;
+ }
+ }
+
r = store->write_meta("ready", "ready");
if (r < 0)
return r;
clog(log_client.create_channel()),
whoami(id),
dev_path(dev), journal_path(jdev),
+ store_is_rotational(store->is_rotational()),
trace_endpoint("0.0.0.0", 0, "osd"),
asok_hook(NULL),
osd_compat(get_osd_compat_set()),
- osd_tp(cct, "OSD::osd_tp", "tp_osd", cct->_conf->osd_op_threads, "osd_op_threads"),
+ peering_tp(cct, "OSD::peering_tp", "tp_peering",
+ cct->_conf->osd_peering_wq_threads,
+ "osd_peering_tp_threads"),
osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
- cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
+ get_num_op_threads()),
disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
session_waiting_lock("OSD::session_waiting_lock"),
+ osdmap_subscribe_lock("OSD::osdmap_subscribe_lock"),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false),
heartbeat_need_update(true),
op_queue(get_io_queue()),
op_prio_cutoff(get_io_prio_cut()),
op_shardedwq(
- cct->_conf->osd_op_num_shards,
+ get_num_op_shards(),
this,
cct->_conf->osd_op_thread_timeout,
cct->_conf->osd_op_thread_suicide_timeout,
this,
cct->_conf->osd_op_thread_timeout,
cct->_conf->osd_op_thread_suicide_timeout,
- &osd_tp),
+ &peering_tp),
map_lock("OSD::map_lock"),
pg_map_lock("OSD::pg_map_lock"),
last_pg_create_epoch(0),
} else if (admin_command == "flush_journal") {
store->flush_journal();
} else if (admin_command == "dump_ops_in_flight" ||
- admin_command == "ops") {
- if (!op_tracker.dump_ops_in_flight(f)) {
- ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
- Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
- }
- } else if (admin_command == "dump_blocked_ops") {
- if (!op_tracker.dump_ops_in_flight(f, true)) {
- ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
- Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
- }
- } else if (admin_command == "dump_historic_ops") {
- if (!op_tracker.dump_historic_ops(f, false)) {
- ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
- Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
- }
- } else if (admin_command == "dump_historic_ops_by_duration") {
- if (!op_tracker.dump_historic_ops(f, true)) {
- ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
- Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
- }
- } else if (admin_command == "dump_historic_slow_ops") {
- if (!op_tracker.dump_historic_slow_ops(f)) {
- ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
- Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+ admin_command == "ops" ||
+ admin_command == "dump_blocked_ops" ||
+ admin_command == "dump_historic_ops" ||
+ admin_command == "dump_historic_ops_by_duration" ||
+ admin_command == "dump_historic_slow_ops") {
+
+ const string error_str = "op_tracker tracking is not enabled now, so no ops are tracked currently, \
+even those get stuck. Please enable \"osd_enable_op_tracker\", and the tracker \
+will start to track new ops received afterwards.";
+
+ set<string> filters;
+ vector<string> filter_str;
+ if (cmd_getval(cct, cmdmap, "filterstr", filter_str)) {
+ copy(filter_str.begin(), filter_str.end(),
+ inserter(filters, filters.end()));
+ }
+
+ if (admin_command == "dump_ops_in_flight" ||
+ admin_command == "ops") {
+ if (!op_tracker.dump_ops_in_flight(f, false, filters)) {
+ ss << error_str;
+ }
+ }
+ if (admin_command == "dump_blocked_ops") {
+ if (!op_tracker.dump_ops_in_flight(f, true, filters)) {
+ ss << error_str;
+ }
+ }
+ if (admin_command == "dump_historic_ops") {
+ if (!op_tracker.dump_historic_ops(f, false, filters)) {
+ ss << error_str;
+ }
+ }
+ if (admin_command == "dump_historic_ops_by_duration") {
+ if (!op_tracker.dump_historic_ops(f, true, filters)) {
+ ss << error_str;
+ }
+ }
+ if (admin_command == "dump_historic_slow_ops") {
+ if (!op_tracker.dump_historic_slow_ops(f, filters)) {
+ ss << error_str;
+ }
}
} else if (admin_command == "dump_op_pq_state") {
f->open_object_section("pq");
curmap->get_blacklist(&bl);
for (list<pair<entity_addr_t,utime_t> >::iterator it = bl.begin();
it != bl.end(); ++it) {
- f->open_array_section("entry");
+ f->open_object_section("entry");
f->open_object_section("entity_addr_t");
it->first.dump(f);
f->close_section(); //entity_addr_t
for (list<obj_watch_item_t>::iterator it = watchers.begin();
it != watchers.end(); ++it) {
- f->open_array_section("watch");
+ f->open_object_section("watch");
f->dump_string("namespace", it->obj.nspace);
f->dump_string("object", it->obj.oid.name);
it->wi.name.dump(f);
f->close_section(); //entity_name_t
- f->dump_int("cookie", it->wi.cookie);
- f->dump_int("timeout", it->wi.timeout_seconds);
+ f->dump_unsigned("cookie", it->wi.cookie);
+ f->dump_unsigned("timeout", it->wi.timeout_seconds);
f->open_object_section("entity_addr_t");
it->wi.addr.dump(f);
pg->unlock();
}
f->close_section();
+ } else if (admin_command == "compact") {
+ dout(1) << "triggering manual compaction" << dendl;
+ auto start = ceph::coarse_mono_clock::now();
+ store->compact();
+ auto end = ceph::coarse_mono_clock::now();
+ auto time_span = chrono::duration_cast<chrono::duration<double>>(end - start);
+ dout(1) << "finished manual compaction in "
+ << time_span.count()
+ << " seconds" << dendl;
+ f->open_object_section("compact_result");
+ f->dump_float("elapsed_time", time_span.count());
+ f->close_section();
} else {
assert(0 == "broken asok registration");
}
delete fuse_store;
fuse_store = NULL;
r = ::rmdir(mntpath.c_str());
- if (r < 0)
- r = -errno;
if (r < 0) {
- derr << __func__ << " failed to rmdir " << mntpath << dendl;
+ r = -errno;
+ derr << __func__ << " failed to rmdir " << mntpath << ": "
+ << cpp_strerror(r) << dendl;
return r;
}
return 0;
return 0;
}
+int OSD::get_num_op_shards()
+{
+ if (cct->_conf->osd_op_num_shards)
+ return cct->_conf->osd_op_num_shards;
+ if (store_is_rotational)
+ return cct->_conf->osd_op_num_shards_hdd;
+ else
+ return cct->_conf->osd_op_num_shards_ssd;
+}
+
+int OSD::get_num_op_threads()
+{
+ if (cct->_conf->osd_op_num_threads_per_shard)
+ return get_num_op_shards() * cct->_conf->osd_op_num_threads_per_shard;
+ if (store_is_rotational)
+ return get_num_op_shards() * cct->_conf->osd_op_num_threads_per_shard_hdd;
+ else
+ return get_num_op_shards() * cct->_conf->osd_op_num_threads_per_shard_ssd;
+}
+
+float OSD::get_osd_recovery_sleep()
+{
+ if (cct->_conf->osd_recovery_sleep)
+ return cct->_conf->osd_recovery_sleep;
+ if (!store_is_rotational && !journal_is_rotational)
+ return cct->_conf->osd_recovery_sleep_ssd;
+ else if (store_is_rotational && !journal_is_rotational)
+ return cct->_conf->get_val<double>("osd_recovery_sleep_hybrid");
+ else
+ return cct->_conf->osd_recovery_sleep_hdd;
+}
+
int OSD::init()
{
CompatSet initial, diff;
tick_timer.init();
tick_timer_without_osd_lock.init();
service.recovery_request_timer.init();
+ service.recovery_sleep_timer.init();
// mount.
- dout(2) << "mounting " << dev_path << " "
- << (journal_path.empty() ? "(no journal)" : journal_path) << dendl;
+ dout(2) << "init " << dev_path
+ << " (looks like " << (store_is_rotational ? "hdd" : "ssd") << ")"
+ << dendl;
+ dout(2) << "journal " << journal_path << dendl;
assert(store); // call pre_init() first!
- store->set_cache_shards(cct->_conf->osd_op_num_shards);
+ store->set_cache_shards(get_num_op_shards());
int r = store->mount();
if (r < 0) {
derr << "OSD:init: unable to mount object store" << dendl;
return r;
}
+ journal_is_rotational = store->is_journal_rotational();
+ dout(2) << "journal looks like " << (journal_is_rotational ? "hdd" : "ssd")
+ << dendl;
enable_disable_fuse(false);
clear_temp_objects();
+ // initialize osdmap references in sharded wq
+ op_shardedwq.prune_pg_waiters(osdmap, whoami);
+
// load up pgs (as they previously existed)
load_pgs();
monc->set_log_client(&log_client);
update_log_config();
- osd_tp.start();
+ peering_tp.start();
+
+ service.init();
+ service.publish_map(osdmap);
+ service.publish_superblock(superblock);
+ service.max_oldest_map = superblock.oldest_map;
+
osd_op_tp.start();
disk_tp.start();
command_tp.start();
tick_timer_without_osd_lock.add_event_after(cct->_conf->osd_heartbeat_interval, new C_Tick_WithoutOSDLock(this));
}
- service.init();
- service.publish_map(osdmap);
- service.publish_superblock(superblock);
- service.max_oldest_map = superblock.oldest_map;
-
osd_lock.Unlock();
r = monc->authenticate();
if (r < 0) {
+ derr << __func__ << " authentication failed: " << cpp_strerror(r)
+ << dendl;
osd_lock.Lock(); // locker is going to unlock this on function exit
if (is_stopping())
- r = 0;
+ r = 0;
goto monout;
}
derr << "unable to obtain rotating service keys; retrying" << dendl;
++rotating_auth_attempts;
if (rotating_auth_attempts > g_conf->max_rotating_auth_attempts) {
+ derr << __func__ << " wait_auth_rotating timed out" << dendl;
osd_lock.Lock(); // make locker happy
if (!is_stopping()) {
- r = - ETIMEDOUT;
+ r = -ETIMEDOUT;
}
goto monout;
}
r = update_crush_device_class();
if (r < 0) {
+ derr << __func__ << " unable to update_crush_device_class: "
+ << cpp_strerror(r) << dendl;
osd_lock.Lock();
goto monout;
}
r = update_crush_location();
if (r < 0) {
+ derr << __func__ << " unable to update_crush_location: "
+ << cpp_strerror(r) << dendl;
osd_lock.Lock();
goto monout;
}
return 0;
monout:
- mgrc.shutdown();
- monc->shutdown();
+ exit(1);
out:
enable_disable_fuse(true);
"flush the journal to permanent store");
assert(r == 0);
r = admin_socket->register_command("dump_ops_in_flight",
- "dump_ops_in_flight", asok_hook,
+ "dump_ops_in_flight " \
+ "name=filterstr,type=CephString,n=N,req=false",
+ asok_hook,
"show the ops currently in flight");
assert(r == 0);
r = admin_socket->register_command("ops",
- "ops", asok_hook,
+ "ops " \
+ "name=filterstr,type=CephString,n=N,req=false",
+ asok_hook,
"show the ops currently in flight");
assert(r == 0);
r = admin_socket->register_command("dump_blocked_ops",
- "dump_blocked_ops", asok_hook,
+ "dump_blocked_ops " \
+ "name=filterstr,type=CephString,n=N,req=false",
+ asok_hook,
"show the blocked ops currently in flight");
assert(r == 0);
- r = admin_socket->register_command("dump_historic_ops", "dump_historic_ops",
+ r = admin_socket->register_command("dump_historic_ops",
+ "dump_historic_ops " \
+ "name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show recent ops");
assert(r == 0);
- r = admin_socket->register_command("dump_historic_slow_ops", "dump_historic_slow_ops",
+ r = admin_socket->register_command("dump_historic_slow_ops",
+ "dump_historic_slow_ops " \
+ "name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show slowest recent ops");
assert(r == 0);
- r = admin_socket->register_command("dump_historic_ops_by_duration", "dump_historic_ops_by_duration",
+ r = admin_socket->register_command("dump_historic_ops_by_duration",
+ "dump_historic_ops_by_duration " \
+ "name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show slowest recent ops, sorted by duration");
assert(r == 0);
"show recent state history");
assert(r == 0);
+ r = admin_socket->register_command("compact", "compact",
+ asok_hook,
+                                     "Compact object store's omap."
+ " WARNING: Compaction probably slows your requests");
+ assert(r == 0);
+
test_ops_hook = new TestOpsSocketHook(&(this->service), this->store);
// Note: pools are CephString instead of CephPoolname because
// these commands traditionally support both pool names and numbers
};
+ // All the basic OSD operation stats are to be considered useful
+ osd_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
+
osd_plb.add_u64(
l_osd_op_wip, "op_wip",
"Replication operations currently being processed (primary)");
osd_plb.add_time_avg(
l_osd_op_r_lat, "op_r_latency",
"Latency of read operation (including queue time)");
- osd_plb.add_histogram(
+ osd_plb.add_u64_counter_histogram(
l_osd_op_r_lat_outb_hist, "op_r_latency_out_bytes_histogram",
op_hist_x_axis_config, op_hist_y_axis_config,
"Histogram of operation latency (including queue time) + data read");
osd_plb.add_time_avg(
l_osd_op_w_lat, "op_w_latency",
"Latency of write operation (including queue time)");
- osd_plb.add_histogram(
+ osd_plb.add_u64_counter_histogram(
l_osd_op_w_lat_inb_hist, "op_w_latency_in_bytes_histogram",
op_hist_x_axis_config, op_hist_y_axis_config,
"Histogram of operation latency (including queue time) + data written");
osd_plb.add_time_avg(
l_osd_op_rw_lat, "op_rw_latency",
"Latency of read-modify-write operation (including queue time)");
- osd_plb.add_histogram(
+ osd_plb.add_u64_counter_histogram(
l_osd_op_rw_lat_inb_hist, "op_rw_latency_in_bytes_histogram",
op_hist_x_axis_config, op_hist_y_axis_config,
"Histogram of rw operation latency (including queue time) + data written");
- osd_plb.add_histogram(
+ osd_plb.add_u64_counter_histogram(
l_osd_op_rw_lat_outb_hist, "op_rw_latency_out_bytes_histogram",
op_hist_x_axis_config, op_hist_y_axis_config,
"Histogram of rw operation latency (including queue time) + data read");
l_osd_op_rw_prepare_lat, "op_rw_prepare_latency",
"Latency of read-modify-write operations (excluding queue time and wait for finished)");
+ // Now we move on to some more obscure stats, revert to assuming things
+ // are low priority unless otherwise specified.
+ osd_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
+
+ osd_plb.add_time_avg(l_osd_op_before_queue_op_lat, "op_before_queue_op_lat",
+ "Latency of IO before calling queue(before really queue into ShardedOpWq)"); // client io before queue op_wq latency
+ osd_plb.add_time_avg(l_osd_op_before_dequeue_op_lat, "op_before_dequeue_op_lat",
+ "Latency of IO before calling dequeue_op(already dequeued and get PG lock)"); // client io before dequeue_op latency
+
osd_plb.add_u64_counter(
l_osd_sop, "subop", "Suboperations");
osd_plb.add_u64_counter(
osd_plb.add_u64(
l_osd_pg_stray, "numpg_stray",
"Placement groups ready to be deleted from this osd");
+ osd_plb.add_u64(
+ l_osd_pg_removing, "numpg_removing",
+ "Placement groups queued for local deletion", "pgsr",
+ PerfCountersBuilder::PRIO_USEFUL);
osd_plb.add_u64(
l_osd_hb_to, "heartbeat_to_peers", "Heartbeat (ping) peers we send to");
osd_plb.add_u64_counter(l_osd_map, "map_messages", "OSD map messages");
osd_plb.add_u64_counter(
l_osd_waiting_for_map, "messages_delayed_for_map",
"Operations waiting for OSD map");
+
osd_plb.add_u64_counter(
l_osd_map_cache_hit, "osd_map_cache_hit", "osdmap cache hit");
osd_plb.add_u64_counter(
osd_plb.add_u64_avg(
l_osd_map_cache_miss_low_avg, "osd_map_cache_miss_low_avg",
"osdmap cache miss, avg distance below cache lower bound");
+ osd_plb.add_u64_counter(
+ l_osd_map_bl_cache_hit, "osd_map_bl_cache_hit",
+ "OSDMap buffer cache hits");
+ osd_plb.add_u64_counter(
+ l_osd_map_bl_cache_miss, "osd_map_bl_cache_miss",
+ "OSDMap buffer cache misses");
- osd_plb.add_u64(l_osd_stat_bytes, "stat_bytes", "OSD size");
- osd_plb.add_u64(l_osd_stat_bytes_used, "stat_bytes_used", "Used space");
+ osd_plb.add_u64(
+ l_osd_stat_bytes, "stat_bytes", "OSD size", "size",
+ PerfCountersBuilder::PRIO_USEFUL);
+ osd_plb.add_u64(
+ l_osd_stat_bytes_used, "stat_bytes_used", "Used space", "used",
+ PerfCountersBuilder::PRIO_USEFUL);
osd_plb.add_u64(l_osd_stat_bytes_avail, "stat_bytes_avail", "Available space");
osd_plb.add_u64_counter(
set_state(STATE_STOPPING);
// Debugging
- cct->_conf->set_val("debug_osd", "100");
- cct->_conf->set_val("debug_journal", "100");
- cct->_conf->set_val("debug_filestore", "100");
- cct->_conf->set_val("debug_ms", "100");
- cct->_conf->apply_changes(NULL);
+ if (cct->_conf->get_val<bool>("osd_debug_shutdown")) {
+ cct->_conf->set_val("debug_osd", "100");
+ cct->_conf->set_val("debug_journal", "100");
+ cct->_conf->set_val("debug_filestore", "100");
+ cct->_conf->set_val("debug_bluestore", "100");
+ cct->_conf->set_val("debug_ms", "100");
+ cct->_conf->apply_changes(NULL);
+ }
// stop MgrClient earlier as it's more like an internal consumer of OSD
mgrc.shutdown();
cct->get_admin_socket()->unregister_command("dump_watchers");
cct->get_admin_socket()->unregister_command("dump_reservations");
cct->get_admin_socket()->unregister_command("get_latest_osdmap");
+ cct->get_admin_socket()->unregister_command("heap");
cct->get_admin_socket()->unregister_command("set_heap_property");
cct->get_admin_socket()->unregister_command("get_heap_property");
cct->get_admin_socket()->unregister_command("dump_objectstore_kv_stats");
+ cct->get_admin_socket()->unregister_command("dump_scrubs");
cct->get_admin_socket()->unregister_command("calc_objectstore_db_histogram");
cct->get_admin_socket()->unregister_command("flush_store_cache");
cct->get_admin_socket()->unregister_command("dump_pgstate_history");
+ cct->get_admin_socket()->unregister_command("compact");
delete asok_hook;
asok_hook = NULL;
cct->get_admin_socket()->unregister_command("injectdataerr");
cct->get_admin_socket()->unregister_command("injectmdataerr");
cct->get_admin_socket()->unregister_command("set_recovery_delay");
+ cct->get_admin_socket()->unregister_command("trigger_scrub");
+ cct->get_admin_socket()->unregister_command("injectfull");
delete test_ops_hook;
test_ops_hook = NULL;
heartbeat_lock.Unlock();
heartbeat_thread.join();
- osd_tp.drain();
+ peering_tp.drain();
peering_wq.clear();
- osd_tp.stop();
+ peering_tp.stop();
dout(10) << "osd tp stopped" << dendl;
osd_op_tp.drain();
assert(pg_stat_queue.empty());
}
+ service.shutdown_reserver();
+
// Remove PGs
#ifdef PG_DEBUG_REFS
service.dump_live_pgids();
#ifdef PG_DEBUG_REFS
p->second->dump_live_ids();
#endif
- ceph_abort();
+ if (cct->_conf->osd_shutdown_pgref_assert) {
+ ceph_abort();
+ }
}
p->second->unlock();
p->second->put("PGMap");
int OSD::update_crush_device_class()
{
+ if (!cct->_conf->osd_class_update_on_start) {
+ dout(10) << __func__ << " osd_class_update_on_start = false" << dendl;
+ return 0;
+ }
+
string device_class;
int r = store->read_meta("crush_device_class", &device_class);
- if (r < 0)
+ if (r < 0 || device_class.empty()) {
+ device_class = store->get_default_device_class();
+ }
+
+ if (device_class.empty()) {
+ dout(20) << __func__ << " no device class stored locally" << dendl;
return 0;
+ }
string cmd =
string("{\"prefix\": \"osd crush set-device-class\", ") +
- string("\"id\": ") + stringify(whoami) + string(", ") +
- string("\"class\": \"") + device_class + string("\"}");
-
- return mon_cmd_maybe_osd_create(cmd);
+ string("\"class\": \"") + device_class + string("\", ") +
+ string("\"ids\": [\"") + stringify(whoami) + string("\"]}");
+
+ r = mon_cmd_maybe_osd_create(cmd);
+ // the above cmd can fail for various reasons, e.g.:
+ // (1) we are connecting to a pre-luminous monitor
+ // (2) user manually specify a class other than
+ // 'ceph-disk prepare --crush-device-class'
+ // simply skip result-checking for now
+ return 0;
}
void OSD::write_superblock(ObjectStore::Transaction& t)
return pg;
}
+// Public entry point for looking up a PG by spg_t and returning it
+// locked.  Thin wrapper that delegates to _lookup_lock_pg(); the locking
+// discipline and miss behavior (presumably nullptr when the pg is not in
+// pg_map -- confirm in the callee) are defined there.
+PG *OSD::lookup_lock_pg(spg_t pgid)
+{
+  return _lookup_lock_pg(pgid);
+}
+
PG *OSD::_lookup_lock_pg_with_map_lock_held(spg_t pgid)
{
assert(pg_map.count(pgid));
pg->upgrade(store);
}
+ if (pg->dne()) {
+ dout(10) << "load_pgs " << *it << " deleting dne" << dendl;
+ pg->ch = nullptr;
+ service.pg_remove_epoch(pg->pg_id);
+ pg->unlock();
+ {
+ // Delete pg
+ RWLock::WLocker l(pg_map_lock);
+ auto p = pg_map.find(pg->get_pgid());
+ assert(p != pg_map.end() && p->second == pg);
+ dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
+ pg_map.erase(p);
+ pg->put("PGMap");
+ }
+ recursive_remove_collection(cct, store, pgid, *it);
+ continue;
+ }
+
service.init_splits_between(pg->info.pgid, pg->get_osdmap(), osdmap);
// generate state for PG's current mapping
++i) {
PG *pg = i->second;
+ // Ignore PGs only partially created (DNE)
+ if (pg->info.dne()) {
+ continue;
+ }
+
auto rpib = pg->get_required_past_interval_bounds(
pg->info,
superblock.oldest_map);
ceph_abort();
}
+ const bool is_mon_create =
+ evt->get_event().dynamic_type() == PG::NullEvt::static_type();
+ if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
+ return -EAGAIN;
+ }
// do we need to resurrect a deleting pg?
spg_t resurrected;
PGRef old_pg_state;
}
}
+// Decide whether creation of `pgid` must be withheld because this OSD
+// already holds at least mon_max_pg_per_osd * osd_max_pg_per_osd_hard_ratio
+// placement groups.
+//
+// Returns false (caller may proceed) while pg_map is below that hard
+// limit.  Otherwise the request is remembered so resume_creating_pg()
+// can replay it once PGs have been removed:
+//  - is_mon_create: just count it; the mon will be re-asked later via an
+//    "osd_pg_creates" subscription renewal
+//  - otherwise: record the pgid together with whether we would be the
+//    acting primary (rank 0), so peering can be re-triggered via pg_temp
+// Lock order: pg_map_lock (read) is taken before pending_creates_lock,
+// and both are held until return.
+bool OSD::maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create)
+{
+  const auto max_pgs_per_osd =
+    (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+     cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
+
+  RWLock::RLocker pg_map_locker{pg_map_lock};
+  if (pg_map.size() < max_pgs_per_osd) {
+    return false;
+  }
+  lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+  if (is_mon_create) {
+    pending_creates_from_mon++;
+  } else {
+    bool is_primary = osdmap->get_pg_acting_rank(pgid.pgid, whoami) == 0;
+    pending_creates_from_osd.emplace(pgid.pgid, is_primary);
+  }
+  dout(5) << __func__ << " withhold creation of pg " << pgid
+	  << ": " << pg_map.size() << " >= "<< max_pgs_per_osd << dendl;
+  return true;
+}
+
+// to re-trigger a peering, we have to twiddle the pg mapping a little bit,
+// see PG::should_restart_peering(). OSDMap::pg_to_up_acting_osds() will turn
+// to up set if pg_temp is empty. so an empty pg_temp won't work.
+//
+// Given the current acting set, produce a non-empty pg_temp that differs
+// from it:
+//  - more than one acting osd: shrink the mapping to just the primary
+//  - zero or one acting osd: copy it and append -1 as a filler entry so
+//    the vector still changes (semantics of the -1 placeholder are
+//    defined by pg_temp handling elsewhere -- TODO confirm)
+static vector<int32_t> twiddle(const vector<int>& acting) {
+  if (acting.size() > 1) {
+    return {acting[0]};
+  } else {
+    vector<int32_t> twiddled(acting.begin(), acting.end());
+    twiddled.push_back(-1);
+    return twiddled;
+  }
+}
+
+// Counterpart of maybe_wait_for_max_pg(): called periodically (see the
+// tick_without_osd_lock hunk, which calls it) to resume creation of PGs
+// that were withheld because the hard pg-per-osd limit had been reached.
+// While spare capacity remains under the limit it
+//  - re-solicits mon-initiated creates by renewing the "osd_pg_creates"
+//    subscription, and
+//  - re-triggers peering for osd-initiated creates by queueing a
+//    twiddled pg_temp (see twiddle() above);
+// then it manages the osdmap subscription: continuous while creates are
+// still pending (so PG removals are noticed), one-time otherwise.
+void OSD::resume_creating_pg()
+{
+  bool do_sub_pg_creates = false;
+  bool have_pending_creates = false;
+  {
+    const auto max_pgs_per_osd =
+      (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+       cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
+    RWLock::RLocker l(pg_map_lock);
+    if (max_pgs_per_osd <= pg_map.size()) {
+      // this could happen if admin decreases this setting before a PG is removed
+      return;
+    }
+    unsigned spare_pgs = max_pgs_per_osd - pg_map.size();
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    // mon-initiated creates are fungible: they only consume spare slots;
+    // the actual pgids will come back from the mon on re-subscribe
+    if (pending_creates_from_mon > 0) {
+      do_sub_pg_creates = true;
+      if (pending_creates_from_mon >= spare_pgs) {
+	spare_pgs = pending_creates_from_mon = 0;
+      } else {
+	spare_pgs -= pending_creates_from_mon;
+	pending_creates_from_mon = 0;
+      }
+    }
+    // osd-initiated creates: re-trigger peering one pg per spare slot
+    auto pg = pending_creates_from_osd.cbegin();
+    while (spare_pgs > 0 && pg != pending_creates_from_osd.cend()) {
+      dout(20) << __func__ << " pg " << pg->first << dendl;
+      vector<int> acting;
+      osdmap->pg_to_up_acting_osds(pg->first, nullptr, nullptr, &acting, nullptr);
+      service.queue_want_pg_temp(pg->first, twiddle(acting), true);
+      pg = pending_creates_from_osd.erase(pg);
+      do_sub_pg_creates = true;
+      spare_pgs--;
+    }
+    have_pending_creates = (pending_creates_from_mon > 0 ||
+			    !pending_creates_from_osd.empty());
+  }
+
+  bool do_renew_subs = false;
+  if (do_sub_pg_creates) {
+    if (monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0)) {
+      dout(4) << __func__ << ": resolicit pg creates from mon since "
+	      << last_pg_create_epoch << dendl;
+      do_renew_subs = true;
+    }
+  }
+  version_t start = osdmap->get_epoch() + 1;
+  if (have_pending_creates) {
+    // don't miss any new osdmap deleting PGs
+    if (monc->sub_want("osdmap", start, 0)) {
+      dout(4) << __func__ << ": resolicit osdmap from mon since "
+	      << start << dendl;
+      do_renew_subs = true;
+    }
+  } else if (do_sub_pg_creates) {
+    // no need to subscribe the osdmap continuously anymore
+    // once the pgtemp and/or mon_subscribe(pg_creates) is sent
+    if (monc->sub_want_increment("osdmap", start, CEPH_SUBSCRIBE_ONETIME)) {
+      dout(4) << __func__ << ": re-subscribe osdmap(onetime) since"
+	      << start << dendl;
+      do_renew_subs = true;
+    }
+  }
+
+  if (do_renew_subs) {
+    monc->renew_subs();
+  }
+
+  service.send_pg_temp();
+}
void OSD::build_initial_pg_history(
spg_t pgid,
{
dout(10) << __func__ << " " << pgid << " created " << created << dendl;
h->epoch_created = created;
+ h->epoch_pool_created = created;
h->same_interval_since = created;
h->same_up_since = created;
h->same_primary_since = created;
&debug);
if (new_interval) {
h->same_interval_since = e;
- }
- if (up != new_up) {
- h->same_up_since = e;
- }
- if (acting_primary != new_acting_primary) {
- h->same_primary_since = e;
+ if (up != new_up) {
+ h->same_up_since = e;
+ }
+ if (acting_primary != new_acting_primary) {
+ h->same_primary_since = e;
+ }
+ if (pgid.pgid.is_split(lastmap->get_pg_num(pgid.pgid.pool()),
+ osdmap->get_pg_num(pgid.pgid.pool()),
+ nullptr)) {
+ h->last_epoch_split = e;
+ }
+ up = new_up;
+ acting = new_acting;
+ up_primary = new_up_primary;
+ acting_primary = new_acting_primary;
}
lastmap = osdmap;
}
break;
}
- // base case: these floors should be the creation epoch if we didn't
+ // base case: these floors should be the pg creation epoch if we didn't
// find any changes.
if (e == h.epoch_created) {
if (!h.same_interval_since)
}
OSDMapRef curmap = service.get_osdmap();
- assert(curmap);
+ if (!curmap) {
+ heartbeat_lock.Unlock();
+ m->put();
+ return;
+ }
switch (m->op) {
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
- MOSDPing::PING_REPLY,
- m->stamp);
+ MOSDPing::PING_REPLY, m->stamp,
+ cct->_conf->osd_heartbeat_min_size);
m->get_connection()->send_message(r);
if (curmap->is_up(from)) {
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
MOSDPing::YOU_DIED,
- m->stamp);
+ m->stamp,
+ cct->_conf->osd_heartbeat_min_size);
m->get_connection()->send_message(r);
}
}
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
service.get_osdmap()->get_epoch(),
- MOSDPing::PING,
- now));
+ MOSDPing::PING, now,
+ cct->_conf->osd_heartbeat_min_size));
if (i->second.con_front)
i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
service.get_osdmap()->get_epoch(),
- MOSDPing::PING,
- now));
+ MOSDPing::PING, now,
+ cct->_conf->osd_heartbeat_min_size));
}
logger->set(l_osd_hb_to, heartbeat_peers.size());
if (is_waiting_for_healthy()) {
start_boot();
+ } else if (is_preboot() &&
+ waiting_for_luminous_mons &&
+ monc->monmap.get_required_features().contains_all(
+ ceph::features::mon::FEATURE_LUMINOUS)) {
+ // mon upgrade finished!
+ start_boot();
}
do_waiters();
tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
-
- if (is_active()) {
- const auto now = ceph::coarse_mono_clock::now();
- const auto elapsed = now - last_sent_beacon;
- if (chrono::duration_cast<chrono::seconds>(elapsed).count() >
- cct->_conf->osd_beacon_report_interval) {
- send_beacon(now);
- }
- }
}
void OSD::tick_without_osd_lock()
logger->set(l_osd_cached_crc, buffer::get_cached_crc());
logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
logger->set(l_osd_missed_crc, buffer::get_missed_crc());
+ logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
// osd_lock is not being held, which means the OSD state
// might change when doing the monitor report
// do any pending reports
send_full_update();
send_failures();
- send_pg_stats(now);
+ if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ send_pg_stats(now);
+ }
}
map_lock.put_read();
}
sched_scrub();
}
service.promote_throttle_recalibrate();
+ resume_creating_pg();
+ bool need_send_beacon = false;
+ const auto now = ceph::coarse_mono_clock::now();
+ {
+    // borrow the lec (min_last_epoch_clean) lock to protect last_sent_beacon from changing
+ Mutex::Locker l{min_last_epoch_clean_lock};
+ const auto elapsed = now - last_sent_beacon;
+ if (chrono::duration_cast<chrono::seconds>(elapsed).count() >
+ cct->_conf->osd_beacon_report_interval) {
+ need_send_beacon = true;
+ }
+ }
+ if (need_send_beacon) {
+ send_beacon(now);
+ }
}
- check_ops_in_flight();
+ mgrc.update_osd_health(get_health_metrics());
service.kick_recovery_queue();
tick_timer_without_osd_lock.add_event_after(OSD_TICK_INTERVAL, new C_Tick_WithoutOSDLock(this));
}
if (pool < 0 && isdigit(poolstr[0]))
pool = atoll(poolstr.c_str());
if (pool < 0) {
- ss << "Invalid pool" << poolstr;
+ ss << "Invalid pool '" << poolstr << "''";
return;
}
service.send_pg_temp();
requeue_failures();
send_failures();
- send_pg_stats(now);
+ if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ send_pg_stats(now);
+ }
map_lock.put_read();
if (is_active()) {
}
dout(1) << __func__ << dendl;
set_state(STATE_PREBOOT);
+ waiting_for_luminous_mons = false;
dout(10) << "start_boot - have maps " << superblock.oldest_map
<< ".." << superblock.newest_map << dendl;
C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
heartbeat();
// if our map within recent history, try to add ourselves to the osdmap.
- if (osdmap->test_flag(CEPH_OSDMAP_NOUP)) {
+ if (osdmap->get_epoch() == 0) {
+ derr << "waiting for initial osdmap" << dendl;
+ } else if (osdmap->is_destroyed(whoami)) {
+ derr << "osdmap says I am destroyed" << dendl;
+ // provide a small margin so we don't livelock seeing if we
+ // un-destroyed ourselves.
+ if (osdmap->get_epoch() > newest - 1) {
+ exit(0);
+ }
+ } else if (osdmap->test_flag(CEPH_OSDMAP_NOUP) || osdmap->is_noup(whoami)) {
derr << "osdmap NOUP flag is set, waiting for it to clear" << dendl;
} else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
derr << "osdmap SORTBITWISE OSDMap flag is NOT set; please set it"
<< dendl;
- } else if (!osdmap->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ } else if (osdmap->require_osd_release < CEPH_RELEASE_JEWEL) {
derr << "osdmap REQUIRE_JEWEL OSDMap flag is NOT set; please set it"
<< dendl;
} else if (!monc->monmap.get_required_features().contains_all(
ceph::features::mon::FEATURE_LUMINOUS)) {
derr << "monmap REQUIRE_LUMINOUS is NOT set; must upgrade all monitors to "
<< "Luminous or later before Luminous OSDs will boot" << dendl;
+ waiting_for_luminous_mons = true;
} else if (service.need_fullness_update()) {
derr << "osdmap fullness state needs update" << dendl;
send_full_update();
dout(1) << "start_waiting_for_healthy" << dendl;
set_state(STATE_WAITING_FOR_HEALTHY);
last_heartbeat_resample = utime_t();
+
+ // subscribe to osdmap updates, in case our peers really are known to be dead
+ osdmap_subscribe(osdmap->get_epoch() + 1, false);
}
bool OSD::_is_healthy()
{
// config info
(*pm)["osd_data"] = dev_path;
- (*pm)["osd_journal"] = journal_path;
+ if (store->get_type() == "filestore") {
+ // not applicable for bluestore
+ (*pm)["osd_journal"] = journal_path;
+ }
(*pm)["front_addr"] = stringify(client_messenger->get_myaddr());
(*pm)["back_addr"] = stringify(cluster_messenger->get_myaddr());
(*pm)["hb_front_addr"] = stringify(hb_front_server_messenger->get_myaddr());
// backend
(*pm)["osd_objectstore"] = store->get_type();
+ (*pm)["rotational"] = store_is_rotational ? "1" : "0";
+ (*pm)["journal_rotational"] = journal_is_rotational ? "1" : "0";
+ (*pm)["default_device_class"] = store->get_default_device_class();
store->collect_metadata(pm);
collect_sys_info(pm, cct);
+ std::string front_iface, back_iface;
+ /*
+ pick_iface(cct,
+ CEPH_PICK_ADDRESS_PUBLIC | CEPH_PICK_ADDRESS_CLUSTER,
+ &front_iface, &back_iface);
+ */
+ (*pm)["front_iface"] = pick_iface(cct,
+ client_messenger->get_myaddr().get_sockaddr_storage());
+ (*pm)["back_iface"] = pick_iface(cct,
+ cluster_messenger->get_myaddr().get_sockaddr_storage());
+
dout(10) << __func__ << " " << *pm << dendl;
}
utime_t now = ceph_clock_now();
while (!failure_queue.empty()) {
int osd = failure_queue.begin()->first;
- entity_inst_t i = osdmap->get_inst(osd);
if (!failure_pending.count(osd)) {
+ entity_inst_t i = osdmap->get_inst(osd);
int failed_for = (int)(double)(now - failure_queue.begin()->second);
monc->send_mon_message(new MOSDFailure(monc->get_fsid(), i, failed_for,
osdmap->get_epoch()));
void OSD::send_pg_stats(const utime_t &now)
{
assert(map_lock.is_locked());
+ assert(osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS);
dout(20) << "send_pg_stats" << dendl;
osd_stat_t cur_stat = service.get_osd_stat();
monmap.get_required_features().contains_all(
ceph::features::mon::FEATURE_LUMINOUS)) {
dout(20) << __func__ << " sending" << dendl;
- last_sent_beacon = now;
MOSDBeacon* beacon = nullptr;
{
Mutex::Locker l{min_last_epoch_clean_lock};
beacon = new MOSDBeacon(osdmap->get_epoch(), min_last_epoch_clean);
std::swap(beacon->pgs, min_last_epoch_clean_pgs);
+ last_sent_beacon = now;
}
monc->send_mon_message(beacon);
} else {
"name=offset,type=CephString,req=false",
"list missing objects on this pg, perhaps starting at an offset given in JSON",
"osd", "r", "cli,rest")
+COMMAND("perf histogram dump "
+ "name=logger,type=CephString,req=false "
+ "name=counter,type=CephString,req=false",
+ "Get histogram data",
+ "osd", "r", "cli,rest")
// tell <osd.n> commands. Validation of osd.n must be special-cased in client
COMMAND("version", "report version of OSD", "osd", "r", "cli,rest")
"name=injected_args,type=CephString,n=N",
"inject configuration arguments into running OSD",
"osd", "rw", "cli,rest")
+COMMAND("config set " \
+ "name=key,type=CephString name=value,type=CephString",
+ "Set a configuration option at runtime (not persistent)",
+ "osd", "rw", "cli,rest")
COMMAND("cluster_log " \
"name=level,type=CephChoices,strings=error,warning,info,debug " \
"name=message,type=CephString,n=N",
"osd", "r", "cli,rest")
COMMAND("reset_pg_recovery_stats", "reset pg recovery statistics",
"osd", "rw", "cli,rest")
+COMMAND("compact",
+ "compact object store's omap. "
+ "WARNING: Compaction probably slows your requests",
+ "osd", "rw", "cli,rest")
};
void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data)
r = cct->_conf->injectargs(args, &ss);
osd_lock.Lock();
}
+ else if (prefix == "config set") {
+ std::string key;
+ std::string val;
+ cmd_getval(cct, cmdmap, "key", key);
+ cmd_getval(cct, cmdmap, "value", val);
+ osd_lock.Unlock();
+ r = cct->_conf->set_val(key, val, true, &ss);
+ if (r == 0) {
+ cct->_conf->apply_changes(nullptr);
+ }
+ osd_lock.Lock();
+ }
else if (prefix == "cluster_log") {
vector<string> msg;
cmd_getval(cct, cmdmap, "message", msg);
}
else if (prefix == "flush_pg_stats") {
- flush_pg_stats();
+ if (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+ mgrc.send_pgstats();
+ ds << service.get_osd_stat_seq() << "\n";
+ } else {
+ flush_pg_stats();
+ }
}
else if (prefix == "heap") {
pg_recovery_stats.reset();
}
+ else if (prefix == "perf histogram dump") {
+ std::string logger;
+ std::string counter;
+ cmd_getval(cct, cmdmap, "logger", logger);
+ cmd_getval(cct, cmdmap, "counter", counter);
+ if (f) {
+ cct->get_perfcounters_collection()->dump_formatted_histograms(
+ f.get(), false, logger, counter);
+ f->flush(ds);
+ }
+ }
+
+ else if (prefix == "compact") {
+ dout(1) << "triggering manual compaction" << dendl;
+ auto start = ceph::coarse_mono_clock::now();
+ store->compact();
+ auto end = ceph::coarse_mono_clock::now();
+ auto time_span = chrono::duration_cast<chrono::duration<double>>(end - start);
+ dout(1) << "finished manual compaction in "
+ << time_span.count()
+ << " seconds" << dendl;
+ ss << "compacted omap in " << time_span.count() << " seconds";
+ }
+
else {
ss << "unrecognized command! " << cmd;
r = -EINVAL;
{
dout(10) << "OSD::ms_get_authorizer type=" << ceph_entity_type_name(dest_type) << dendl;
+ if (is_stopping()) {
+ dout(10) << __func__ << " bailing, we are shutting down" << dendl;
+ return false;
+ }
+
if (dest_type == CEPH_ENTITY_TYPE_MON)
return true;
}
-bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
- int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
- bool& isvalid, CryptoKey& session_key)
+bool OSD::ms_verify_authorizer(
+ Connection *con, int peer_type,
+ int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
+ bool& isvalid, CryptoKey& session_key,
+ std::unique_ptr<AuthAuthorizerChallenge> *challenge)
{
AuthAuthorizeHandler *authorize_handler = 0;
switch (peer_type) {
uint64_t global_id;
uint64_t auid = CEPH_AUTH_UID_DEFAULT;
- isvalid = authorize_handler->verify_authorizer(
- cct, monc->rotating_secrets.get(),
- authorizer_data, authorizer_reply, name, global_id, caps_info, session_key,
- &auid);
+ RotatingKeyRing *keys = monc->rotating_secrets.get();
+ if (keys) {
+ isvalid = authorize_handler->verify_authorizer(
+ cct, keys,
+ authorizer_data, authorizer_reply, name, global_id, caps_info, session_key,
+ &auid, challenge);
+ } else {
+ dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
+ isvalid = false;
+ }
if (isvalid) {
Session *s = static_cast<Session *>(con->get_priv());
handle_scrub(static_cast<MOSDScrub*>(m));
break;
+ case MSG_OSD_FORCE_RECOVERY:
+ handle_force_recovery(m);
+ break;
+
// -- need OSDMap --
case MSG_OSD_PG_CREATE:
struct tm bdt;
time_t tt = now.sec();
localtime_r(&tt, &bdt);
+
+ bool day_permit = false;
+ if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) {
+ if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
+ day_permit = true;
+ }
+ } else {
+ if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
+ day_permit = true;
+ }
+ }
+
+ if (!day_permit) {
+ dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day
+ << " - " << cct->_conf->osd_scrub_end_week_day
+ << " now " << bdt.tm_wday << " = no" << dendl;
+ return false;
+ }
+
bool time_permit = false;
if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
if (!service.can_inc_scrubs_pending()) {
return;
}
+ if (!cct->_conf->osd_scrub_during_recovery && service.is_recovery_active()) {
+ dout(20) << __func__ << " not scheduling scrubs due to active recovery" << dendl;
+ return;
+ }
+
utime_t now = ceph_clock_now();
bool time_permit = scrub_time_permit(now);
break;
}
- if (!cct->_conf->osd_scrub_during_recovery && service.is_recovery_active()) {
- dout(10) << __func__ << "not scheduling scrub of " << scrub.pgid << " due to active recovery ops" << dendl;
- break;
- }
-
if ((scrub.deadline >= now) && !(time_permit && load_is_low)) {
dout(10) << __func__ << " not scheduling scrub for " << scrub.pgid << " due to "
<< (!time_permit ? "time not permit" : "high load") << dendl;
+// Build the list of health metrics this OSD reports upstream (consumed
+// by mgrc.update_osd_health() in the tick path).  Currently a single
+// metric: the number of pending PG creations for which this OSD is, or
+// would be, the primary.  Mon-initiated pending creates are all counted;
+// osd-initiated ones only when recorded as primary (the bool stored in
+// pending_creates_from_osd by maybe_wait_for_max_pg()).
+vector<OSDHealthMetric> OSD::get_health_metrics()
+{
+  vector<OSDHealthMetric> metrics;
+  lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+  auto n_primaries = pending_creates_from_mon;
+  for (const auto& create : pending_creates_from_osd) {
+    if (create.second) {
+      n_primaries++;
+    }
+  }
+  metrics.emplace_back(osd_metric::PENDING_CREATING_PGS, n_primaries);
+  return metrics;
+}
+
// =====================================================
// MAP
void OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
- OSDMapRef osdmap = service.get_osdmap();
- if (osdmap->get_epoch() >= epoch)
+ Mutex::Locker l(osdmap_subscribe_lock);
+ if (latest_subscribed_epoch >= epoch && !force_request)
return;
+ latest_subscribed_epoch = MAX(epoch, latest_subscribed_epoch);
+
if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
force_request) {
monc->renew_subs();
if (num > 0) {
service.publish_superblock(superblock);
write_superblock(t);
- store->queue_transaction(service.meta_osr.get(), std::move(t), nullptr);
+ int tr = store->queue_transaction(service.meta_osr.get(), std::move(t), nullptr);
+ assert(tr == 0);
}
// we should not remove the cached maps
assert(min <= service.map_cache.cached_key_lower_bound());
rerequest_full_maps();
}
- if (last <= superblock.newest_map) {
- dout(10) << " no new maps here, dropping" << dendl;
- m->put();
- return;
- }
-
if (superblock.oldest_map) {
// make sure we at least keep pace with incoming maps
trim_maps(m->oldest_map, last - first + 1, skip_maps);
return;
}
Mutex::Locker l(osd_lock);
+ if (is_stopping()) {
+ dout(10) << __func__ << " bailing, we are shutting down" << dendl;
+ return;
+ }
map_lock.get_write();
bool do_shutdown = false;
}
}
- if (osdmap->test_flag(CEPH_OSDMAP_NOUP) !=
- newmap->test_flag(CEPH_OSDMAP_NOUP)) {
+ if ((osdmap->test_flag(CEPH_OSDMAP_NOUP) !=
+ newmap->test_flag(CEPH_OSDMAP_NOUP)) ||
+ (osdmap->is_noup(whoami) != newmap->is_noup(whoami))) {
dout(10) << __func__ << " NOUP flag changed in " << newmap->get_epoch()
<< dendl;
if (is_booting()) {
do_restart = true;
}
}
+ if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS &&
+ newmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+ dout(10) << __func__ << " require_osd_release reached luminous in "
+ << newmap->get_epoch() << dendl;
+ clear_pg_stat_queue();
+ clear_outstanding_pg_stats();
+ }
osdmap = newmap;
epoch_t up_epoch;
if (service.is_preparing_to_stop() || service.is_stopping()) {
service.got_stop_ack();
} else {
- clog->warn() << "map e" << osdmap->get_epoch()
- << " wrongly marked me down at e"
- << osdmap->get_down_at(whoami);
+ clog->warn() << "Monitor daemon marked osd." << whoami << " down, "
+ "but it is still running";
+ clog->debug() << "map e" << osdmap->get_epoch()
+ << " wrongly marked me down at e"
+ << osdmap->get_down_at(whoami);
}
} else if (!osdmap->get_addr(whoami).probably_equals(
client_messenger->get_myaddr())) {
} else if (!osdmap->get_hb_back_addr(whoami).probably_equals(
hb_back_server_messenger->get_myaddr())) {
clog->error() << "map e" << osdmap->get_epoch()
- << " had wrong hb back addr ("
+ << " had wrong heartbeat back addr ("
<< osdmap->get_hb_back_addr(whoami)
<< " != my " << hb_back_server_messenger->get_myaddr()
<< ")";
!osdmap->get_hb_front_addr(whoami).probably_equals(
hb_front_server_messenger->get_myaddr())) {
clog->error() << "map e" << osdmap->get_epoch()
- << " had wrong hb front addr ("
+ << " had wrong heartbeat front addr ("
<< osdmap->get_hb_front_addr(whoami)
<< " != my " << hb_front_server_messenger->get_myaddr()
<< ")";
activate_map();
}
- if (m->newest_map && m->newest_map > last) {
- dout(10) << " msg say newest map is " << m->newest_map
- << ", requesting more" << dendl;
- osdmap_subscribe(osdmap->get_epoch()+1, false);
- }
- else if (do_shutdown) {
+ if (do_shutdown) {
if (network_error) {
Mutex::Locker l(heartbeat_lock);
map<int,pair<utime_t,entity_inst_t>>::iterator it =
dout(0) << __func__ << " shutdown OSD via async signal" << dendl;
queue_async_signal(SIGINT);
}
+ else if (m->newest_map && m->newest_map > last) {
+ dout(10) << " msg say newest map is " << m->newest_map
+ << ", requesting more" << dendl;
+ osdmap_subscribe(osdmap->get_epoch()+1, false);
+ }
else if (is_preboot()) {
if (m->get_source().is_mon())
_preboot(m->oldest_map, m->newest_map);
epoch_t osd_epoch, PG *pg,
ThreadPool::TPHandle &handle,
PG::RecoveryCtx *rctx,
- set<boost::intrusive_ptr<PG> > *new_pgs)
+ set<PGRef> *new_pgs)
{
assert(pg->is_locked());
epoch_t next_epoch = pg->get_osdmap()->get_epoch() + 1;
assert(osd_lock.is_locked());
dout(7) << "consume_map version " << osdmap->get_epoch() << dendl;
+ /** make sure the cluster is speaking in SORTBITWISE, because we don't
+ * speak the older sorting version any more. Be careful not to force
+ * a shutdown if we are merely processing old maps, though.
+ */
+ if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE) && is_active()) {
+ derr << __func__ << " SORTBITWISE flag is not set" << dendl;
+ ceph_abort();
+ }
+
int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
list<PGRef> to_remove;
pg->unlock();
}
+
+ lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+ for (auto pg = pending_creates_from_osd.cbegin();
+ pg != pending_creates_from_osd.cend();) {
+ if (osdmap->get_pg_acting_rank(pg->first, whoami) < 0) {
+ pg = pending_creates_from_osd.erase(pg);
+ } else {
+ ++pg;
+ }
+ }
}
for (list<PGRef>::iterator i = to_remove.begin();
logger->set(l_osd_pg_primary, num_pg_primary);
logger->set(l_osd_pg_replica, num_pg_replica);
logger->set(l_osd_pg_stray, num_pg_stray);
+ logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
}
void OSD::activate_map()
dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
- if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
- derr << __func__ << " SORTBITWISE flag is not set" << dendl;
- ceph_abort();
- }
-
if (osdmap->test_flag(CEPH_OSDMAP_FULL)) {
dout(10) << " osdmap flagged full, doing onetime osdmap subscribe" << dendl;
osdmap_subscribe(osdmap->get_epoch() + 1, false);
void OSD::split_pgs(
PG *parent,
- const set<spg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
+ const set<spg_t> &childpgids, set<PGRef> *out_pgs,
OSDMapRef curmap,
OSDMapRef nextmap,
PG::RecoveryCtx *rctx)
<< dendl;
continue;
}
-
if (handle_pg_peering_evt(
pgid,
history,
service.send_pg_created(pgid.pgid);
}
}
- last_pg_create_epoch = m->epoch;
+ {
+ lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+ if (pending_creates_from_mon == 0) {
+ last_pg_create_epoch = m->epoch;
+ }
+ }
maybe_update_heartbeat_peers();
}
continue;
}
service.share_map_peer(it->first, con.get(), curmap);
- dout(7) << __func__ << " osd " << it->first
+ dout(7) << __func__ << " osd." << it->first
<< " on " << it->second.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
it->second);
m->query_epoch,
PG::RemoteBackfillReserved()));
} else if (m->type == MBackfillReserve::REJECT) {
+ // NOTE: this is replica -> primary "i reject your request"
+ // and also primary -> replica "cancel my previously-granted request"
evt = PG::CephPeeringEvtRef(
new PG::CephPeeringEvt(
m->query_epoch,
pg->unlock();
}
+// Handle MSG_OSD_FORCE_RECOVERY (force-recovery / force-backfill and
+// their cancel variants).  Resolve each pg listed in the message to a
+// locally hosted primary shard, if any, under pg_map_lock; then hand the
+// collected PG refs to OSDService::adjust_pg_priorities() to apply or
+// clear the forced state per msg->options.  Consumes (puts) the message.
+void OSD::handle_force_recovery(Message *m)
+{
+  MOSDForceRecovery *msg = static_cast<MOSDForceRecovery*>(m);
+  assert(msg->get_type() == MSG_OSD_FORCE_RECOVERY);
+
+  vector<PGRef> local_pgs;
+  local_pgs.reserve(msg->forced_pgs.size());
+
+  {
+    RWLock::RLocker l(pg_map_lock);
+    for (auto& i : msg->forced_pgs) {
+      spg_t locpg;
+      // map the raw pgid to its primary shard; skip pgs we don't host
+      if (osdmap->get_primary_shard(i, &locpg)) {
+        auto pg_map_entry = pg_map.find(locpg);
+        if (pg_map_entry != pg_map.end()) {
+          local_pgs.push_back(pg_map_entry->second);
+        }
+      }
+    }
+  }
+
+  if (local_pgs.size()) {
+    service.adjust_pg_priorities(local_pgs, msg->options);
+  }
+
+  msg->put();
+}
/** PGQuery
* from primary to replica | stray
pg->put("PGMap"); // since we've taken it out of map
}
-
// =========================================================
// RECOVERY
return true;
}
+
+// Set or clear forced recovery/backfill state on the given PGs, as
+// requested by an MOSDForceRecovery message (`newflags` is a mask of
+// OFR_* bits).  No-op unless at least one of OFR_BACKFILL/OFR_RECOVERY
+// is present; OFR_BACKFILL wins if both are set.  With OFR_CANCEL the
+// corresponding FORCED_* state is cleared unconditionally; otherwise it
+// is only set on PGs whose current state makes recovery/backfill
+// plausible, so a PG is never left carrying a FORCE_* flag that nothing
+// would ever clear (see the inline comment below).  Each PG is locked
+// around the state change.
+void OSDService::adjust_pg_priorities(const vector<PGRef>& pgs, int newflags)
+{
+  if (!pgs.size() || !(newflags & (OFR_BACKFILL | OFR_RECOVERY)))
+    return;
+  int newstate = 0;
+
+  if (newflags & OFR_BACKFILL) {
+    newstate = PG_STATE_FORCED_BACKFILL;
+  } else if (newflags & OFR_RECOVERY) {
+    newstate = PG_STATE_FORCED_RECOVERY;
+  }
+
+  // debug output here may get large, don't generate it if debug level is below
+  // 10 and use abbreviated pg ids otherwise
+  if ((cct)->_conf->subsys.should_gather(ceph_subsys_osd, 10)) {
+    stringstream ss;
+
+    for (auto& i : pgs) {
+      ss << i->get_pgid() << " ";
+    }
+
+    dout(10) << __func__ << " working on " << ss.str() << dendl;
+  }
+
+  if (newflags & OFR_CANCEL) {
+    // second argument true == clear the forced state
+    for (auto& i : pgs) {
+      i->lock();
+      i->_change_recovery_force_mode(newstate, true);
+      i->unlock();
+    }
+  } else {
+    for (auto& i : pgs) {
+      // make sure the PG is in correct state before forcing backfill or recovery, or
+      // else we'll make PG keeping FORCE_* flag forever, requiring osds restart
+      // or forcing somehow recovery/backfill.
+      i->lock();
+      int pgstate = i->get_state();
+      if ( ((newstate == PG_STATE_FORCED_RECOVERY) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING))) ||
+	   ((newstate == PG_STATE_FORCED_BACKFILL) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING))) )
+	i->_change_recovery_force_mode(newstate, false);
+      i->unlock();
+    }
+  }
+}
+
+// Run queued recovery work for one PG. When osd_recovery_sleep is enabled,
+// the op is deferred via a timer event instead of blocking the worker thread
+// (the old code slept in-line while holding up the thread pool).
void OSD::do_recovery(
  PG *pg, epoch_t queued, uint64_t reserved_pushes,
  ThreadPool::TPHandle &handle)
{
  uint64_t started = 0;
-  if (cct->_conf->osd_recovery_sleep > 0) {
-    handle.suspend_tp_timeout();
-    pg->unlock();
-    utime_t t;
-    t.set_from_double(cct->_conf->osd_recovery_sleep);
-    t.sleep();
-    dout(20) << __func__ << " slept for " << t << dendl;
-    pg->lock();
-    handle.reset_tp_timeout();
+
+  /*
+   * When the value of osd_recovery_sleep is set greater than zero, recovery
+   * ops are scheduled after osd_recovery_sleep amount of time from the previous
+   * recovery event's schedule time. This is done by adding a
+   * recovery_requeue_callback event, which re-queues the recovery op using
+   * queue_recovery_after_sleep.
+   */
+  float recovery_sleep = get_osd_recovery_sleep();
+  {
+    Mutex::Locker l(service.recovery_sleep_lock);
+    if (recovery_sleep > 0 && service.recovery_needs_sleep) {
+      PGRef pgref(pg);
+      // Timer callback: mark the sleep as consumed and re-queue this exact
+      // recovery op (same epoch and reserved push count).
+      auto recovery_requeue_callback = new FunctionContext([this, pgref, queued, reserved_pushes](int r) {
+        dout(20) << "do_recovery wake up at "
+                 << ceph_clock_now()
+                 << ", re-queuing recovery" << dendl;
+        Mutex::Locker l(service.recovery_sleep_lock);
+        service.recovery_needs_sleep = false;
+        service.queue_recovery_after_sleep(pgref.get(), queued, reserved_pushes);
+      });
+
+      // This is true for the first recovery op and when the previous recovery op
+      // has been scheduled in the past. The next recovery op is scheduled after
+      // completing the sleep from now.
+      if (service.recovery_schedule_time < ceph_clock_now()) {
+        service.recovery_schedule_time = ceph_clock_now();
+      }
+      service.recovery_schedule_time += recovery_sleep;
+      service.recovery_sleep_timer.add_event_at(service.recovery_schedule_time,
+                                                recovery_requeue_callback);
+      dout(20) << "Recovery event scheduled at "
+               << service.recovery_schedule_time << dendl;
+      return;
+    }
  }
  {
+    {
+      Mutex::Locker l(service.recovery_sleep_lock);
+      // We are actually doing work now; re-arm the sleep for the next op.
+      service.recovery_needs_sleep = true;
+    }
+
    if (pg->pg_has_reset_since(queued)) {
      goto out;
    }
    if (!more && pg->have_unfound()) {
      pg->discover_all_missing(*rctx.query_map);
      if (rctx.query_map->empty()) {
-	dout(10) << "do_recovery no luck, giving up on this pg for now" << dendl;
+	string action;
+	if (pg->state_test(PG_STATE_BACKFILLING)) {
+	  // Unfound objects and nobody to ask: back off backfill and retry
+	  // after osd_recovery_retry_interval instead of spinning.
+	  auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
+	    queued,
+	    queued,
+	    PG::DeferBackfill(cct->_conf->osd_recovery_retry_interval)));
+	  pg->queue_peering_event(evt);
+	  action = "in backfill";
+	} else if (pg->state_test(PG_STATE_RECOVERING)) {
+	  // Same back-off for recovery proper.
+	  auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
+	    queued,
+	    queued,
+	    PG::DeferRecovery(cct->_conf->osd_recovery_retry_interval)));
+	  pg->queue_peering_event(evt);
+	  action = "in recovery";
+	} else {
+	  action = "already out of recovery/backfill";
+	}
+	dout(10) << __func__ << ": no luck, giving up on this pg for now (" << action << ")" << dendl;
      } else {
-	dout(10) << "do_recovery no luck, giving up on this pg for now" << dendl;
+	dout(10) << __func__ << ": no luck, giving up on this pg for now (queue_recovery)" << dendl;
	pg->queue_recovery();
      }
    }
+// Recovery is considered active while any recovery reservation is held,
+// either as primary (local_reserver) or on behalf of another primary
+// (remote_reserver). This replaces the old recovery_ops_active counter test.
bool OSDService::is_recovery_active()
{
-  if (recovery_ops_active > 0)
-    return true;
-
-  return false;
+  return local_reserver.has_reservation() || remote_reserver.has_reservation();
}
// =========================================================
op->osd_trace.keyval("priority", op->get_req()->get_priority());
op->osd_trace.keyval("cost", op->get_req()->get_cost());
op->mark_queued_for_pg();
+ logger->tinc(l_osd_op_before_queue_op_lat, latency);
op_shardedwq.queue(make_pair(pg, PGQueueable(op, epoch)));
}
<< " " << *(op->get_req())
<< " pg " << *pg << dendl;
+ logger->tinc(l_osd_op_before_dequeue_op_lat, latency);
+
Session *session = static_cast<Session *>(
op->get_req()->get_connection()->get_priv());
if (session) {
struct C_CompleteSplits : public Context {
OSD *osd;
- set<boost::intrusive_ptr<PG> > pgs;
- C_CompleteSplits(OSD *osd, const set<boost::intrusive_ptr<PG> > &in)
+ set<PGRef> pgs;
+ C_CompleteSplits(OSD *osd, const set<PGRef> &in)
: osd(osd), pgs(in) {}
void finish(int r) override {
Mutex::Locker l(osd->osd_lock);
if (osd->is_stopping())
return;
PG::RecoveryCtx rctx = osd->create_context();
- for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin();
+ for (set<PGRef>::iterator i = pgs.begin();
i != pgs.end();
++i) {
osd->pg_map_lock.get_write();
(*i)->lock();
- osd->add_newly_split_pg(&**i, &rctx);
+ PG *pg = i->get();
+ osd->add_newly_split_pg(pg, &rctx);
if (!((*i)->deleting)) {
set<spg_t> to_complete;
to_complete.insert((*i)->info.pgid);
osd->service.complete_split(to_complete);
}
osd->pg_map_lock.put_write();
- osd->dispatch_context_transaction(rctx, &**i);
+ osd->dispatch_context_transaction(rctx, pg);
osd->wake_pg_waiters(*i);
(*i)->unlock();
}
for (list<PG*>::const_iterator i = pgs.begin();
i != pgs.end();
++i) {
- set<boost::intrusive_ptr<PG> > split_pgs;
+ set<PGRef> split_pgs;
PG *pg = *i;
pg->lock_suspend_timeout(handle);
curmap = service.get_osdmap();
static const char* KEYS[] = {
"osd_max_backfills",
"osd_min_recovery_priority",
- "osd_op_complaint_time", "osd_op_log_threshold",
- "osd_op_history_size", "osd_op_history_duration",
+ "osd_max_trimming_pgs",
+ "osd_op_complaint_time",
+ "osd_op_log_threshold",
+ "osd_op_history_size",
+ "osd_op_history_duration",
+ "osd_op_history_slow_op_size",
+ "osd_op_history_slow_op_threshold",
"osd_enable_op_tracker",
"osd_map_cache_size",
"osd_map_max_advance",
"osd_recovery_delay_start",
"osd_client_message_size_cap",
"osd_client_message_cap",
+ "osd_heartbeat_min_size",
+ "osd_heartbeat_interval",
NULL
};
return KEYS;
if (base_pool && base_pool->require_rollback()) {
if ((iter->op.op != CEPH_OSD_OP_READ) &&
(iter->op.op != CEPH_OSD_OP_CHECKSUM) &&
+ (iter->op.op != CEPH_OSD_OP_CMPEXT) &&
(iter->op.op != CEPH_OSD_OP_STAT) &&
(iter->op.op != CEPH_OSD_OP_ISDIRTY) &&
(iter->op.op != CEPH_OSD_OP_UNDIRTY) &&
in_use.insert(out->begin(), out->end());
}
+
// =============================================================
#undef dout_context
uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
auto sdata = shard_list[shard_index];
bool queued = false;
- unsigned pushes_to_free = 0;
{
Mutex::Locker l(sdata->sdata_op_ordering_lock);
auto p = sdata->pg_slots.find(pgid);
++i) {
sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff);
}
- for (auto& q : p->second.to_process) {
- pushes_to_free += q.get_reserved_pushes();
- }
p->second.to_process.clear();
p->second.waiting_for_pg = false;
++p->second.requeue_seq;
queued = true;
}
}
- if (pushes_to_free > 0) {
- osd->service.release_reserved_pushes(pushes_to_free);
- }
if (queued) {
sdata->sdata_lock.Lock();
sdata->sdata_cond.SignalOne();
if (sdata->pqueue->empty()) {
dout(20) << __func__ << " empty q, waiting" << dendl;
// optimistically sleep a moment; maybe another work item will come along.
- sdata->sdata_op_ordering_lock.Unlock();
osd->cct->get_heartbeat_map()->reset_timeout(hb,
osd->cct->_conf->threadpool_default_timeout, 0);
sdata->sdata_lock.Lock();
+ sdata->sdata_op_ordering_lock.Unlock();
sdata->sdata_cond.WaitInterval(sdata->sdata_lock,
utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0));
sdata->sdata_lock.Unlock();
}} // namespace ceph::osd_cmds
+
+// Stream operator for OSD::io_queue: prints the human-readable name of the
+// configured op-queue implementation (used in log/status output). Covers all
+// four enumerators; no default case, so new enumerators trigger a compiler
+// warning here rather than being silently unprintable.
+std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q) {
+  switch(q) {
+  case OSD::io_queue::prioritized:
+    out << "prioritized";
+    break;
+  case OSD::io_queue::weightedpriority:
+    out << "weightedpriority";
+    break;
+  case OSD::io_queue::mclock_opclass:
+    out << "mclock_opclass";
+    break;
+  case OSD::io_queue::mclock_client:
+    out << "mclock_client";
+    break;
+  }
+  return out;
+}