git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/OSD.cc
update sources to 12.2.2
[ceph.git] / ceph / src / osd / OSD.cc
index 5b979c8c9f48307385f486962b9923f65d87b95e..926a3be7e03422c9a9bff9e4f3c5914385020110 100644 (file)
@@ -45,6 +45,7 @@
 #include "common/ceph_time.h"
 #include "common/version.h"
 #include "common/io_priority.h"
+#include "common/pick_address.h"
 
 #include "os/ObjectStore.h"
 #ifdef HAVE_LIBFUSE
@@ -88,6 +89,7 @@
 #include "messages/MOSDPGBackfill.h"
 #include "messages/MBackfillReserve.h"
 #include "messages/MRecoveryReserve.h"
+#include "messages/MOSDForceRecovery.h"
 #include "messages/MOSDECSubOpWrite.h"
 #include "messages/MOSDECSubOpWriteReply.h"
 #include "messages/MOSDECSubOpRead.h"
@@ -187,6 +189,7 @@ CompatSet OSD::get_osd_initial_compat_set() {
   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA);
   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING);
   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO);
+  ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES);
   return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
                   ceph_osd_feature_incompat);
 }
@@ -244,9 +247,9 @@ OSDService::OSDService(OSD *osd) :
   recovery_sleep_lock("OSDService::recovery_sleep_lock"),
   recovery_sleep_timer(cct, recovery_sleep_lock, false),
   reserver_finisher(cct),
-  local_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+  local_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
                 cct->_conf->osd_min_recovery_priority),
-  remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
+  remote_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
                  cct->_conf->osd_min_recovery_priority),
   pg_temp_lock("OSDService::pg_temp_lock"),
   snap_sleep_lock("OSDService::snap_sleep_lock"),
@@ -255,7 +258,7 @@ OSDService::OSDService(OSD *osd) :
   scrub_sleep_lock("OSDService::scrub_sleep_lock"),
   scrub_sleep_timer(
     osd->client_messenger->cct, scrub_sleep_lock, false /* relax locking */),
-  snap_reserver(&reserver_finisher,
+  snap_reserver(cct, &reserver_finisher,
                cct->_conf->osd_max_trimming_pgs),
   recovery_lock("OSDService::recovery_lock"),
   recovery_ops_active(0),
@@ -582,6 +585,11 @@ void OSDService::activate_map()
   agent_lock.Unlock();
 }
 
+void OSDService::request_osdmap_update(epoch_t e)
+{
+  osd->osdmap_subscribe(e, false);
+}
+
 class AgentTimeoutCB : public Context {
   PGRef pg;
 public:
@@ -818,11 +826,11 @@ void OSDService::check_full_status(float ratio)
     dout(10) << __func__ << " " << get_full_state_name(cur_state)
             << " -> " << get_full_state_name(new_state) << dendl;
     if (new_state == FAILSAFE) {
-      clog->error() << "failsafe engaged, dropping updates, now "
+      clog->error() << "full status failsafe engaged, dropping updates, now "
                    << (int)roundf(ratio * 100) << "% full";
     } else if (cur_state == FAILSAFE) {
-      clog->error() << "failsafe disengaged, no longer dropping updates, now "
-                   << (int)roundf(ratio * 100) << "% full";
+      clog->error() << "full status failsafe disengaged, no longer dropping "
+                    << "updates, now " << (int)roundf(ratio * 100) << "% full";
     }
     cur_state = new_state;
   }
@@ -921,7 +929,8 @@ void OSDService::set_injectfull(s_names type, int64_t count)
 }
 
 osd_stat_t OSDService::set_osd_stat(const struct store_statfs_t &stbuf,
-                                    vector<int>& hb_peers)
+                                    vector<int>& hb_peers,
+                                   int num_pgs)
 {
   uint64_t bytes = stbuf.total;
   uint64_t used = bytes - stbuf.available;
@@ -938,6 +947,7 @@ osd_stat_t OSDService::set_osd_stat(const struct store_statfs_t &stbuf,
     osd_stat.kb = bytes >> 10;
     osd_stat.kb_used = used >> 10;
     osd_stat.kb_avail = avail >> 10;
+    osd_stat.num_pgs = num_pgs;
     return osd_stat;
   }
 }
@@ -952,7 +962,7 @@ void OSDService::update_osd_stat(vector<int>& hb_peers)
     return;
   }
 
-  auto new_stat = set_osd_stat(stbuf, hb_peers);
+  auto new_stat = set_osd_stat(stbuf, hb_peers, osd->get_num_pgs());
   dout(20) << "update_osd_stat " << new_stat << dendl;
   assert(new_stat.kb);
   float ratio = ((float)new_stat.kb_used) / ((float)new_stat.kb);
@@ -1078,7 +1088,9 @@ void OSDService::send_pg_temp()
 void OSDService::send_pg_created(pg_t pgid)
 {
   dout(20) << __func__ << dendl;
-  monc->send_mon_message(new MOSDPGCreated(pgid));
+  if (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+    monc->send_mon_message(new MOSDPGCreated(pgid));
+  }
 }
 
 // --------------------------------------
@@ -1777,7 +1789,7 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
     waiter.wait();
   }
 
-  ret = write_meta(store, sb.cluster_fsid, sb.osd_fsid, whoami);
+  ret = write_meta(cct, store, sb.cluster_fsid, sb.osd_fsid, whoami);
   if (ret) {
     derr << "OSD::mkfs: failed to write fsid file: error "
          << cpp_strerror(ret) << dendl;
@@ -1791,7 +1803,7 @@ free_store:
   return ret;
 }
 
-int OSD::write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
+int OSD::write_meta(CephContext *cct, ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
 {
   char val[80];
   int r;
@@ -1811,6 +1823,14 @@ int OSD::write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid,
   if (r < 0)
     return r;
 
+  string key = cct->_conf->get_val<string>("key");
+  lderr(cct) << "key " << key << dendl;
+  if (key.size()) {
+    r = store->write_meta("osd_key", key);
+    if (r < 0)
+      return r;
+  }
+
   r = store->write_meta("ready", "ready");
   if (r < 0)
     return r;
@@ -1906,6 +1926,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
   command_tp(cct, "OSD::command_tp", "tp_osd_cmd",  1),
   session_waiting_lock("OSD::session_waiting_lock"),
+  osdmap_subscribe_lock("OSD::osdmap_subscribe_lock"),
   heartbeat_lock("OSD::heartbeat_lock"),
   heartbeat_stop(false),
   heartbeat_need_update(true),
@@ -2042,30 +2063,48 @@ bool OSD::asok_command(string admin_command, cmdmap_t& cmdmap, string format,
   } else if (admin_command == "flush_journal") {
     store->flush_journal();
   } else if (admin_command == "dump_ops_in_flight" ||
-            admin_command == "ops") {
-    if (!op_tracker.dump_ops_in_flight(f)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
-    }
-  } else if (admin_command == "dump_blocked_ops") {
-    if (!op_tracker.dump_ops_in_flight(f, true)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
-    }
-  } else if (admin_command == "dump_historic_ops") {
-    if (!op_tracker.dump_historic_ops(f, false)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
-    }
-  } else if (admin_command == "dump_historic_ops_by_duration") {
-    if (!op_tracker.dump_historic_ops(f, true)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
-    }
-  } else if (admin_command == "dump_historic_slow_ops") {
-    if (!op_tracker.dump_historic_slow_ops(f)) {
-      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
-       Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+             admin_command == "ops" ||
+             admin_command == "dump_blocked_ops" ||
+             admin_command == "dump_historic_ops" ||
+             admin_command == "dump_historic_ops_by_duration" ||
+             admin_command == "dump_historic_slow_ops") {
+
+    const string error_str = "op_tracker tracking is not enabled now, so no ops are tracked currently, \
+even those get stuck. Please enable \"osd_enable_op_tracker\", and the tracker \
+will start to track new ops received afterwards.";
+
+    set<string> filters;
+    vector<string> filter_str;
+    if (cmd_getval(cct, cmdmap, "filterstr", filter_str)) {
+        copy(filter_str.begin(), filter_str.end(),
+           inserter(filters, filters.end()));
+    }
+
+    if (admin_command == "dump_ops_in_flight" ||
+        admin_command == "ops") {
+      if (!op_tracker.dump_ops_in_flight(f, false, filters)) {
+        ss << error_str;
+      }
+    }
+    if (admin_command == "dump_blocked_ops") {
+      if (!op_tracker.dump_ops_in_flight(f, true, filters)) {
+        ss << error_str;
+      }
+    }
+    if (admin_command == "dump_historic_ops") {
+      if (!op_tracker.dump_historic_ops(f, false, filters)) {
+        ss << error_str;
+      }
+    }
+    if (admin_command == "dump_historic_ops_by_duration") {
+      if (!op_tracker.dump_historic_ops(f, true, filters)) {
+        ss << error_str;
+      }
+    }
+    if (admin_command == "dump_historic_slow_ops") {
+      if (!op_tracker.dump_historic_slow_ops(f, filters)) {
+        ss << error_str;
+      }
     }
   } else if (admin_command == "dump_op_pq_state") {
     f->open_object_section("pq");
@@ -2280,10 +2319,10 @@ int OSD::enable_disable_fuse(bool stop)
     delete fuse_store;
     fuse_store = NULL;
     r = ::rmdir(mntpath.c_str());
-    if (r < 0)
-      r = -errno;
     if (r < 0) {
-      derr << __func__ << " failed to rmdir " << mntpath << dendl;
+      r = -errno;
+      derr << __func__ << " failed to rmdir " << mntpath << ": "
+           << cpp_strerror(r) << dendl;
       return r;
     }
     return 0;
@@ -2331,6 +2370,18 @@ int OSD::get_num_op_threads()
     return get_num_op_shards() * cct->_conf->osd_op_num_threads_per_shard_ssd;
 }
 
+float OSD::get_osd_recovery_sleep()
+{
+  if (cct->_conf->osd_recovery_sleep)
+    return cct->_conf->osd_recovery_sleep;
+  if (!store_is_rotational && !journal_is_rotational)
+    return cct->_conf->osd_recovery_sleep_ssd;
+  else if (store_is_rotational && !journal_is_rotational)
+    return cct->_conf->get_val<double>("osd_recovery_sleep_hybrid");
+  else
+    return cct->_conf->osd_recovery_sleep_hdd;
+}
+
 int OSD::init()
 {
   CompatSet initial, diff;
@@ -2347,6 +2398,7 @@ int OSD::init()
   dout(2) << "init " << dev_path
          << " (looks like " << (store_is_rotational ? "hdd" : "ssd") << ")"
          << dendl;
+  dout(2) << "journal " << journal_path << dendl;
   assert(store);  // call pre_init() first!
 
   store->set_cache_shards(get_num_op_shards());
@@ -2356,6 +2408,9 @@ int OSD::init()
     derr << "OSD:init: unable to mount object store" << dendl;
     return r;
   }
+  journal_is_rotational = store->is_journal_rotational();
+  dout(2) << "journal looks like " << (journal_is_rotational ? "hdd" : "ssd")
+          << dendl;
 
   enable_disable_fuse(false);
 
@@ -2483,6 +2538,9 @@ int OSD::init()
 
   clear_temp_objects();
 
+  // initialize osdmap references in sharded wq
+  op_shardedwq.prune_pg_waiters(osdmap, whoami);
+
   // load up pgs (as they previously existed)
   load_pgs();
 
@@ -2582,9 +2640,11 @@ int OSD::init()
 
   r = monc->authenticate();
   if (r < 0) {
+    derr << __func__ << " authentication failed: " << cpp_strerror(r)
+         << dendl;
     osd_lock.Lock(); // locker is going to unlock this on function exit
     if (is_stopping())
-      r =  0;
+      r = 0;
     goto monout;
   }
 
@@ -2592,9 +2652,10 @@ int OSD::init()
     derr << "unable to obtain rotating service keys; retrying" << dendl;
     ++rotating_auth_attempts;
     if (rotating_auth_attempts > g_conf->max_rotating_auth_attempts) {
+        derr << __func__ << " wait_auth_rotating timed out" << dendl;
         osd_lock.Lock(); // make locker happy
         if (!is_stopping()) {
-            r = - ETIMEDOUT;
+            r = -ETIMEDOUT;
         }
         goto monout;
     }
@@ -2602,12 +2663,16 @@ int OSD::init()
 
   r = update_crush_device_class();
   if (r < 0) {
+    derr << __func__ << " unable to update_crush_device_class: "
+        << cpp_strerror(r) << dendl;
     osd_lock.Lock();
     goto monout;
   }
 
   r = update_crush_location();
   if (r < 0) {
+    derr << __func__ << " unable to update_crush_location: "
+         << cpp_strerror(r) << dendl;
     osd_lock.Lock();
     goto monout;
   }
@@ -2643,8 +2708,7 @@ int OSD::init()
 
   return 0;
 monout:
-  mgrc.shutdown();
-  monc->shutdown();
+  exit(1);
 
 out:
   enable_disable_fuse(true);
@@ -2666,26 +2730,38 @@ void OSD::final_init()
                                      "flush the journal to permanent store");
   assert(r == 0);
   r = admin_socket->register_command("dump_ops_in_flight",
-                                    "dump_ops_in_flight", asok_hook,
+                                    "dump_ops_in_flight " \
+                                    "name=filterstr,type=CephString,n=N,req=false",
+                                    asok_hook,
                                     "show the ops currently in flight");
   assert(r == 0);
   r = admin_socket->register_command("ops",
-                                    "ops", asok_hook,
+                                    "ops " \
+                                    "name=filterstr,type=CephString,n=N,req=false",
+                                    asok_hook,
                                     "show the ops currently in flight");
   assert(r == 0);
   r = admin_socket->register_command("dump_blocked_ops",
-                                    "dump_blocked_ops", asok_hook,
+                                    "dump_blocked_ops " \
+                                    "name=filterstr,type=CephString,n=N,req=false",
+                                    asok_hook,
                                     "show the blocked ops currently in flight");
   assert(r == 0);
-  r = admin_socket->register_command("dump_historic_ops", "dump_historic_ops",
+  r = admin_socket->register_command("dump_historic_ops",
+                                     "dump_historic_ops " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
                                     asok_hook,
                                     "show recent ops");
   assert(r == 0);
-  r = admin_socket->register_command("dump_historic_slow_ops", "dump_historic_slow_ops",
+  r = admin_socket->register_command("dump_historic_slow_ops",
+                                     "dump_historic_slow_ops " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
                                     asok_hook,
                                     "show slowest recent ops");
   assert(r == 0);
-  r = admin_socket->register_command("dump_historic_ops_by_duration", "dump_historic_ops_by_duration",
+  r = admin_socket->register_command("dump_historic_ops_by_duration",
+                                     "dump_historic_ops_by_duration " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
                                     asok_hook,
                                     "show slowest recent ops, sorted by duration");
   assert(r == 0);
@@ -2888,6 +2964,9 @@ void OSD::create_logger()
   };
 
 
+  // All the basic OSD operation stats are to be considered useful
+  osd_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
+
   osd_plb.add_u64(
     l_osd_op_wip, "op_wip",
     "Replication operations currently being processed (primary)");
@@ -2975,6 +3054,10 @@ void OSD::create_logger()
     l_osd_op_rw_prepare_lat, "op_rw_prepare_latency",
     "Latency of read-modify-write operations (excluding queue time and wait for finished)");
 
+  // Now we move on to some more obscure stats, revert to assuming things
+  // are low priority unless otherwise specified.
+  osd_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
+
   osd_plb.add_time_avg(l_osd_op_before_queue_op_lat, "op_before_queue_op_lat",
     "Latency of IO before calling queue(before really queue into ShardedOpWq)"); // client io before queue op_wq latency
   osd_plb.add_time_avg(l_osd_op_before_dequeue_op_lat, "op_before_dequeue_op_lat",
@@ -3061,8 +3144,12 @@ void OSD::create_logger()
     l_osd_map_bl_cache_miss, "osd_map_bl_cache_miss",
     "OSDMap buffer cache misses");
 
-  osd_plb.add_u64(l_osd_stat_bytes, "stat_bytes", "OSD size");
-  osd_plb.add_u64(l_osd_stat_bytes_used, "stat_bytes_used", "Used space");
+  osd_plb.add_u64(
+    l_osd_stat_bytes, "stat_bytes", "OSD size", "size",
+    PerfCountersBuilder::PRIO_USEFUL);
+  osd_plb.add_u64(
+    l_osd_stat_bytes_used, "stat_bytes_used", "Used space", "used",
+    PerfCountersBuilder::PRIO_USEFUL);
   osd_plb.add_u64(l_osd_stat_bytes_avail, "stat_bytes_avail", "Available space");
 
   osd_plb.add_u64_counter(
@@ -3182,11 +3269,14 @@ int OSD::shutdown()
   set_state(STATE_STOPPING);
 
   // Debugging
-  cct->_conf->set_val("debug_osd", "100");
-  cct->_conf->set_val("debug_journal", "100");
-  cct->_conf->set_val("debug_filestore", "100");
-  cct->_conf->set_val("debug_ms", "100");
-  cct->_conf->apply_changes(NULL);
+  if (cct->_conf->get_val<bool>("osd_debug_shutdown")) {
+    cct->_conf->set_val("debug_osd", "100");
+    cct->_conf->set_val("debug_journal", "100");
+    cct->_conf->set_val("debug_filestore", "100");
+    cct->_conf->set_val("debug_bluestore", "100");
+    cct->_conf->set_val("debug_ms", "100");
+    cct->_conf->apply_changes(NULL);
+  }
 
   // stop MgrClient earlier as it's more like an internal consumer of OSD
   mgrc.shutdown();
@@ -3476,6 +3566,7 @@ int OSD::update_crush_device_class()
   }
 
   if (device_class.empty()) {
+    dout(20) << __func__ << " no device class stored locally" << dendl;
     return 0;
   }
 
@@ -3485,11 +3576,12 @@ int OSD::update_crush_device_class()
     string("\"ids\": [\"") + stringify(whoami) + string("\"]}");
 
   r = mon_cmd_maybe_osd_create(cmd);
-  if (r == -EPERM) {
-    r = 0;
-  }
-
-  return r;
+  // the above cmd can fail for various reasons, e.g.:
+  //   (1) we are connecting to a pre-luminous monitor
+  //   (2) user manually specify a class other than
+  //       'ceph-disk prepare --crush-device-class'
+  // simply skip result-checking for now
+  return 0;
 }
 
 void OSD::write_superblock(ObjectStore::Transaction& t)
@@ -3993,6 +4085,11 @@ void OSD::build_past_intervals_parallel()
         ++i) {
       PG *pg = i->second;
 
+      // Ignore PGs only partially created (DNE)
+      if (pg->info.dne()) {
+       continue;
+      }
+
       auto rpib = pg->get_required_past_interval_bounds(
        pg->info,
        superblock.oldest_map);
@@ -4179,6 +4276,11 @@ int OSD::handle_pg_peering_evt(
       ceph_abort();
     }
 
+    const bool is_mon_create =
+      evt->get_event().dynamic_type() == PG::NullEvt::static_type();
+    if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
+      return -EAGAIN;
+    }
     // do we need to resurrect a deleting pg?
     spg_t resurrected;
     PGRef old_pg_state;
@@ -4319,6 +4421,88 @@ int OSD::handle_pg_peering_evt(
   }
 }
 
+bool OSD::maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create)
+{
+  const auto max_pgs_per_osd =
+    (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+     cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
+
+  RWLock::RLocker pg_map_locker{pg_map_lock};
+  if (pg_map.size() < max_pgs_per_osd) {
+    return false;
+  }
+  lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+  if (is_mon_create) {
+    pending_creates_from_mon++;
+  } else {
+    pending_creates_from_osd.emplace(pgid.pgid);
+  }
+  dout(5) << __func__ << " withhold creation of pg " << pgid
+         << ": " << pg_map.size() << " >= "<< max_pgs_per_osd << dendl;
+  return true;
+}
+
+// to re-trigger a peering, we have to twiddle the pg mapping a little bit,
+// see PG::should_restart_peering(). OSDMap::pg_to_up_acting_osds() will turn
+// to up set if pg_temp is empty. so an empty pg_temp won't work.
+static vector<int32_t> twiddle(const vector<int>& acting) {
+  if (acting.size() > 1) {
+    return {acting[0]};
+  } else {
+    vector<int32_t> twiddled(acting.begin(), acting.end());
+    twiddled.push_back(-1);
+    return twiddled;
+  }
+}
+
+void OSD::resume_creating_pg()
+{
+  bool do_sub_pg_creates = false;
+  MOSDPGTemp *pgtemp = nullptr;
+  {
+    const auto max_pgs_per_osd =
+      (cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
+       cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
+    RWLock::RLocker l(pg_map_lock);
+    if (max_pgs_per_osd <= pg_map.size()) {
+      // this could happen if admin decreases this setting before a PG is removed
+      return;
+    }
+    unsigned spare_pgs = max_pgs_per_osd - pg_map.size();
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    if (pending_creates_from_mon > 0) {
+      do_sub_pg_creates = true;
+      if (pending_creates_from_mon >= spare_pgs) {
+       spare_pgs = pending_creates_from_mon = 0;
+      } else {
+       spare_pgs -= pending_creates_from_mon;
+       pending_creates_from_mon = 0;
+      }
+    }
+    auto pg = pending_creates_from_osd.cbegin();
+    while (spare_pgs > 0 && pg != pending_creates_from_osd.cend()) {
+      if (!pgtemp) {
+       pgtemp = new MOSDPGTemp{osdmap->get_epoch()};
+      }
+      vector<int> acting;
+      osdmap->pg_to_up_acting_osds(*pg, nullptr, nullptr, &acting, nullptr);
+      pgtemp->pg_temp[*pg] = twiddle(acting);
+      pg = pending_creates_from_osd.erase(pg);
+      spare_pgs--;
+    }
+  }
+  if (do_sub_pg_creates) {
+    if (monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0)) {
+      dout(4) << __func__ << ": resolicit pg creates from mon since "
+             << last_pg_create_epoch << dendl;
+      monc->renew_subs();
+    }
+  }
+  if (pgtemp) {
+    pgtemp->forced = true;
+    monc->send_mon_message(pgtemp);
+  }
+}
 
 void OSD::build_initial_pg_history(
   spg_t pgid,
@@ -4377,12 +4561,21 @@ void OSD::build_initial_pg_history(
       &debug);
     if (new_interval) {
       h->same_interval_since = e;
-    }
-    if (up != new_up) {
-      h->same_up_since = e;
-    }
-    if (acting_primary != new_acting_primary) {
-      h->same_primary_since = e;
+      if (up != new_up) {
+        h->same_up_since = e;
+      }
+      if (acting_primary != new_acting_primary) {
+        h->same_primary_since = e;
+      }
+      if (pgid.pgid.is_split(lastmap->get_pg_num(pgid.pgid.pool()),
+                             osdmap->get_pg_num(pgid.pgid.pool()),
+                             nullptr)) {
+        h->last_epoch_split = e;
+      }
+      up = new_up;
+      acting = new_acting;
+      up_primary = new_up_primary;
+      acting_primary = new_acting_primary;
     }
     lastmap = osdmap;
   }
@@ -4692,7 +4885,11 @@ void OSD::handle_osd_ping(MOSDPing *m)
   }
 
   OSDMapRef curmap = service.get_osdmap();
-  assert(curmap);
+  if (!curmap) {
+    heartbeat_lock.Unlock();
+    m->put();
+    return;
+  }
 
   switch (m->op) {
 
@@ -5111,6 +5308,7 @@ void OSD::tick_without_osd_lock()
       sched_scrub();
     }
     service.promote_throttle_recalibrate();
+    resume_creating_pg();
     bool need_send_beacon = false;
     const auto now = ceph::coarse_mono_clock::now();
     {
@@ -5178,7 +5376,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
     if (pool < 0 && isdigit(poolstr[0]))
       pool = atoll(poolstr.c_str());
     if (pool < 0) {
-      ss << "Invalid pool" << poolstr;
+      ss << "Invalid pool '" << poolstr << "''";
       return;
     }
 
@@ -5657,6 +5855,9 @@ void OSD::_preboot(epoch_t oldest, epoch_t newest)
   // if our map within recent history, try to add ourselves to the osdmap.
   if (osdmap->get_epoch() == 0) {
     derr << "waiting for initial osdmap" << dendl;
+  } else if (osdmap->is_destroyed(whoami)) {
+    derr << "osdmap says I am destroyed, exiting" << dendl;
+    exit(0);
   } else if (osdmap->test_flag(CEPH_OSDMAP_NOUP) || osdmap->is_noup(whoami)) {
     derr << "osdmap NOUP flag is set, waiting for it to clear" << dendl;
   } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
@@ -5709,6 +5910,9 @@ void OSD::start_waiting_for_healthy()
   dout(1) << "start_waiting_for_healthy" << dendl;
   set_state(STATE_WAITING_FOR_HEALTHY);
   last_heartbeat_resample = utime_t();
+
+  // subscribe to osdmap updates, in case our peers really are known to be dead
+  osdmap_subscribe(osdmap->get_epoch() + 1, false);
 }
 
 bool OSD::_is_healthy()
@@ -5808,7 +6012,10 @@ void OSD::_collect_metadata(map<string,string> *pm)
 {
   // config info
   (*pm)["osd_data"] = dev_path;
-  (*pm)["osd_journal"] = journal_path;
+  if (store->get_type() == "filestore") {
+    // not applicable for bluestore
+    (*pm)["osd_journal"] = journal_path;
+  }
   (*pm)["front_addr"] = stringify(client_messenger->get_myaddr());
   (*pm)["back_addr"] = stringify(cluster_messenger->get_myaddr());
   (*pm)["hb_front_addr"] = stringify(hb_front_server_messenger->get_myaddr());
@@ -5817,10 +6024,23 @@ void OSD::_collect_metadata(map<string,string> *pm)
   // backend
   (*pm)["osd_objectstore"] = store->get_type();
   (*pm)["rotational"] = store_is_rotational ? "1" : "0";
+  (*pm)["journal_rotational"] = journal_is_rotational ? "1" : "0";
+  (*pm)["default_device_class"] = store->get_default_device_class();
   store->collect_metadata(pm);
 
   collect_sys_info(pm, cct);
 
+  std::string front_iface, back_iface;
+  /*
+  pick_iface(cct,
+      CEPH_PICK_ADDRESS_PUBLIC | CEPH_PICK_ADDRESS_CLUSTER,
+      &front_iface, &back_iface);
+      */
+  (*pm)["front_iface"] = pick_iface(cct,
+      client_messenger->get_myaddr().get_sockaddr_storage());
+  (*pm)["back_iface"] = pick_iface(cct,
+      cluster_messenger->get_myaddr().get_sockaddr_storage());
+
   dout(10) << __func__ << " " << *pm << dendl;
 }
 
@@ -6213,6 +6433,10 @@ COMMAND("injectargs " \
        "name=injected_args,type=CephString,n=N",
        "inject configuration arguments into running OSD",
        "osd", "rw", "cli,rest")
+COMMAND("config set " \
+       "name=key,type=CephString name=value,type=CephString",
+       "Set a configuration option at runtime (not persistent)",
+       "osd", "rw", "cli,rest")
 COMMAND("cluster_log " \
        "name=level,type=CephChoices,strings=error,warning,info,debug " \
        "name=message,type=CephString,n=N",
@@ -6327,6 +6551,18 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
     r = cct->_conf->injectargs(args, &ss);
     osd_lock.Lock();
   }
+  else if (prefix == "config set") {
+    std::string key;
+    std::string val;
+    cmd_getval(cct, cmdmap, "key", key);
+    cmd_getval(cct, cmdmap, "value", val);
+    osd_lock.Unlock();
+    r = cct->_conf->set_val(key, val, true, &ss);
+    if (r == 0) {
+      cct->_conf->apply_changes(nullptr);
+    }
+    osd_lock.Lock();
+  }
   else if (prefix == "cluster_log") {
     vector<string> msg;
     cmd_getval(cct, cmdmap, "message", msg);
@@ -6931,10 +7167,16 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
   uint64_t global_id;
   uint64_t auid = CEPH_AUTH_UID_DEFAULT;
 
-  isvalid = authorize_handler->verify_authorizer(
-    cct, monc->rotating_secrets.get(),
-    authorizer_data, authorizer_reply, name, global_id, caps_info, session_key,
-    &auid);
+  RotatingKeyRing *keys = monc->rotating_secrets.get();
+  if (keys) {
+    isvalid = authorize_handler->verify_authorizer(
+      cct, keys,
+      authorizer_data, authorizer_reply, name, global_id, caps_info, session_key,
+      &auid);
+  } else {
+    dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
+    isvalid = false;
+  }
 
   if (isvalid) {
     Session *s = static_cast<Session *>(con->get_priv());
@@ -7053,6 +7295,10 @@ void OSD::_dispatch(Message *m)
     handle_scrub(static_cast<MOSDScrub*>(m));
     break;
 
+  case MSG_OSD_FORCE_RECOVERY:
+    handle_force_recovery(m);
+    break;
+
     // -- need OSDMap --
 
   case MSG_OSD_PG_CREATE:
@@ -7242,6 +7488,11 @@ void OSD::sched_scrub()
   if (!service.can_inc_scrubs_pending()) {
     return;
   }
+  if (!cct->_conf->osd_scrub_during_recovery && service.is_recovery_active()) {
+    dout(20) << __func__ << " not scheduling scrubs due to active recovery" << dendl;
+    return;
+  }
+
 
   utime_t now = ceph_clock_now();
   bool time_permit = scrub_time_permit(now);
@@ -7260,11 +7511,6 @@ void OSD::sched_scrub()
        break;
       }
 
-      if (!cct->_conf->osd_scrub_during_recovery && service.is_recovery_active()) {
-        dout(10) << __func__ << "not scheduling scrub of " << scrub.pgid << " due to active recovery ops" << dendl;
-        break;
-      }
-
       if ((scrub.deadline >= now) && !(time_permit && load_is_low)) {
         dout(10) << __func__ << " not scheduling scrub for " << scrub.pgid << " due to "
                  << (!time_permit ? "time not permit" : "high load") << dendl;
@@ -7364,10 +7610,12 @@ struct C_OnMapApply : public Context {
 
 void OSD::osdmap_subscribe(version_t epoch, bool force_request)
 {
-  OSDMapRef osdmap = service.get_osdmap();
-  if (osdmap->get_epoch() >= epoch)
+  Mutex::Locker l(osdmap_subscribe_lock);
+  if (latest_subscribed_epoch >= epoch && !force_request)
     return;
 
+  latest_subscribed_epoch = MAX(epoch, latest_subscribed_epoch);
+
   if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
       force_request) {
     monc->renew_subs();
@@ -7751,9 +7999,11 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
        if (service.is_preparing_to_stop() || service.is_stopping()) {
          service.got_stop_ack();
        } else {
-         clog->warn() << "map e" << osdmap->get_epoch()
-                      << " wrongly marked me down at e"
-                      << osdmap->get_down_at(whoami);
+          clog->warn() << "Monitor daemon marked osd." << whoami << " down, "
+                          "but it is still running";
+          clog->debug() << "map e" << osdmap->get_epoch()
+                        << " wrongly marked me down at e"
+                        << osdmap->get_down_at(whoami);
        }
       } else if (!osdmap->get_addr(whoami).probably_equals(
                   client_messenger->get_myaddr())) {
@@ -7769,7 +8019,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
       } else if (!osdmap->get_hb_back_addr(whoami).probably_equals(
                   hb_back_server_messenger->get_myaddr())) {
        clog->error() << "map e" << osdmap->get_epoch()
-                     << " had wrong hb back addr ("
+                     << " had wrong heartbeat back addr ("
                      << osdmap->get_hb_back_addr(whoami)
                      << " != my " << hb_back_server_messenger->get_myaddr()
                      << ")";
@@ -7777,7 +8027,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
                 !osdmap->get_hb_front_addr(whoami).probably_equals(
                   hb_front_server_messenger->get_myaddr())) {
        clog->error() << "map e" << osdmap->get_epoch()
-                     << " had wrong hb front addr ("
+                     << " had wrong heartbeat front addr ("
                      << osdmap->get_hb_front_addr(whoami)
                      << " != my " << hb_front_server_messenger->get_myaddr()
                      << ")";
@@ -8036,6 +8286,15 @@ void OSD::consume_map()
   assert(osd_lock.is_locked());
   dout(7) << "consume_map version " << osdmap->get_epoch() << dendl;
 
+  /** make sure the cluster is speaking in SORTBITWISE, because we don't
+   *  speak the older sorting version any more. Be careful not to force
+   *  a shutdown if we are merely processing old maps, though.
+   */
+  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE) && is_active()) {
+    derr << __func__ << " SORTBITWISE flag is not set" << dendl;
+    ceph_abort();
+  }
+
   int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
   list<PGRef> to_remove;
 
@@ -8063,6 +8322,16 @@ void OSD::consume_map()
 
       pg->unlock();
     }
+
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    for (auto pg = pending_creates_from_osd.cbegin();
+        pg != pending_creates_from_osd.cend();) {
+      if (osdmap->get_pg_acting_rank(*pg, whoami) < 0) {
+       pg = pending_creates_from_osd.erase(pg);
+      } else {
+       ++pg;
+      }
+    }
   }
 
   for (list<PGRef>::iterator i = to_remove.begin();
@@ -8117,11 +8386,6 @@ void OSD::activate_map()
 
   dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
 
-  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
-    derr << __func__ << " SORTBITWISE flag is not set" << dendl;
-    ceph_abort();
-  }
-
   if (osdmap->test_flag(CEPH_OSDMAP_FULL)) {
     dout(10) << " osdmap flagged full, doing onetime osdmap subscribe" << dendl;
     osdmap_subscribe(osdmap->get_epoch() + 1, false);
@@ -8396,7 +8660,6 @@ void OSD::handle_pg_create(OpRequestRef op)
               << dendl;
       continue;
     }
-
     if (handle_pg_peering_evt(
           pgid,
           history,
@@ -8411,8 +8674,13 @@ void OSD::handle_pg_create(OpRequestRef op)
       service.send_pg_created(pgid.pgid);
     }
   }
-  last_pg_create_epoch = m->epoch;
 
+  {
+    lock_guard<mutex> pending_creates_locker{pending_creates_lock};
+    if (pending_creates_from_mon == 0) {
+      last_pg_create_epoch = m->epoch;
+    }
+  }
   maybe_update_heartbeat_peers();
 }
 
@@ -8532,7 +8800,7 @@ void OSD::do_notifies(
       continue;
     }
     service.share_map_peer(it->first, con.get(), curmap);
-    dout(7) << __func__ << " osd " << it->first
+    dout(7) << __func__ << " osd." << it->first
            << " on " << it->second.size() << " PGs" << dendl;
     MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
                                       it->second);
@@ -8788,6 +9056,8 @@ void OSD::handle_pg_backfill_reserve(OpRequestRef op)
        m->query_epoch,
        PG::RemoteBackfillReserved()));
   } else if (m->type == MBackfillReserve::REJECT) {
+    // NOTE: this is replica -> primary "i reject your request"
+    //      and also primary -> replica "cancel my previously-granted request"
     evt = PG::CephPeeringEvtRef(
       new PG::CephPeeringEvt(
        m->query_epoch,
@@ -8860,6 +9130,33 @@ void OSD::handle_pg_recovery_reserve(OpRequestRef op)
   pg->unlock();
 }
 
+/**
+ * Handle an MSG_OSD_FORCE_RECOVERY message.
+ *
+ * Every PG id listed in the message's forced_pgs is resolved through
+ * osdmap->get_primary_shard(); ids that resolve and are present in pg_map
+ * are collected.  The collected PGs, if any, are passed to
+ * service.adjust_pg_priorities() together with the message's options.
+ *
+ * @param m  the incoming message; asserted to be of type
+ *           MSG_OSD_FORCE_RECOVERY.  Its reference is released with put()
+ *           before returning.
+ */
+void OSD::handle_force_recovery(Message *m)
+{
+  auto *req = static_cast<MOSDForceRecovery*>(m);
+  assert(req->get_type() == MSG_OSD_FORCE_RECOVERY);
+
+  // At most one entry per requested PG, so size the vector up front.
+  vector<PGRef> targets;
+  targets.reserve(req->forced_pgs.size());
+
+  {
+    // pg_map is only consulted while the RLocker on pg_map_lock is held;
+    // the references gathered here are used after it is released.
+    RWLock::RLocker l(pg_map_lock);
+    for (auto& pgid : req->forced_pgs) {
+      spg_t primary;
+      if (!osdmap->get_primary_shard(pgid, &primary)) {
+        continue;
+      }
+      auto found = pg_map.find(primary);
+      if (found == pg_map.end()) {
+        continue;
+      }
+      targets.push_back(found->second);
+    }
+  }
+
+  if (!targets.empty()) {
+    service.adjust_pg_priorities(targets, req->options);
+  }
+
+  req->put();
+}
 
 /** PGQuery
  * from primary to replica | stray
@@ -9067,7 +9364,6 @@ void OSD::_remove_pg(PG *pg)
   pg->put("PGMap"); // since we've taken it out of map
 }
 
-
 // =========================================================
 // RECOVERY
 
@@ -9114,6 +9410,52 @@ bool OSDService::_recover_now(uint64_t *available_pushes)
   return true;
 }
 
+
+/**
+ * Apply, or cancel, the forced recovery/backfill mode on a set of PGs.
+ *
+ * @param pgs       PGs to operate on; nothing happens when this is empty.
+ * @param newflags  bit mask of OFR_* flags.  At least one of OFR_BACKFILL or
+ *                  OFR_RECOVERY must be set, otherwise the call is a no-op;
+ *                  OFR_BACKFILL wins when both are set.  When OFR_CANCEL is
+ *                  also set, _change_recovery_force_mode() is called with
+ *                  true as its second argument on every PG; otherwise it is
+ *                  called with false, and only on PGs whose current state
+ *                  matches the requested mode.
+ *
+ * Each PG is locked for the duration of its own update.
+ */
+void OSDService::adjust_pg_priorities(const vector<PGRef>& pgs, int newflags)
+{
+  if (pgs.empty() || !(newflags & (OFR_BACKFILL | OFR_RECOVERY)))
+    return;
+
+  // One of the two mode bits is guaranteed to be set past this point, so
+  // "not backfill" can only mean "recovery".
+  int newstate = PG_STATE_FORCED_RECOVERY;
+  if (newflags & OFR_BACKFILL) {
+    newstate = PG_STATE_FORCED_BACKFILL;
+  }
+
+  // The PG list can be long; only format it when osd debug level 10 is
+  // actually being gathered.
+  if (cct->_conf->subsys.should_gather(ceph_subsys_osd, 10)) {
+    stringstream pg_list;
+    for (auto& pg : pgs) {
+      pg_list << pg->get_pgid() << " ";
+    }
+    dout(10) << __func__ << " working on " << pg_list.str() << dendl;
+  }
+
+  const bool cancelling = (newflags & OFR_CANCEL) != 0;
+  const bool forcing_recovery = (newstate == PG_STATE_FORCED_RECOVERY);
+
+  for (auto& pg : pgs) {
+    pg->lock();
+    if (cancelling) {
+      pg->_change_recovery_force_mode(newstate, true);
+    } else {
+      // Only force a PG that is degraded or already waiting for / performing
+      // the matching operation.  Otherwise the FORCED_* flag could stay set
+      // on the PG indefinitely, until the OSD is restarted or a
+      // recovery/backfill is triggered some other way.
+      const int pgstate = pg->get_state();
+      bool eligible;
+      if (forcing_recovery) {
+        eligible = (pgstate & (PG_STATE_DEGRADED |
+                               PG_STATE_RECOVERY_WAIT |
+                               PG_STATE_RECOVERING)) != 0;
+      } else {
+        eligible = (pgstate & (PG_STATE_DEGRADED |
+                               PG_STATE_BACKFILL_WAIT |
+                               PG_STATE_BACKFILLING)) != 0;
+      }
+      if (eligible) {
+        pg->_change_recovery_force_mode(newstate, false);
+      }
+    }
+    pg->unlock();
+  }
+}
+
 void OSD::do_recovery(
   PG *pg, epoch_t queued, uint64_t reserved_pushes,
   ThreadPool::TPHandle &handle)
@@ -9127,7 +9469,8 @@ void OSD::do_recovery(
    * recovery_requeue_callback event, which re-queues the recovery op using
    * queue_recovery_after_sleep.
    */
-  if (cct->_conf->osd_recovery_sleep > 0 && service.recovery_needs_sleep) {
+  float recovery_sleep = get_osd_recovery_sleep();
+  if (recovery_sleep > 0 && service.recovery_needs_sleep) {
     PGRef pgref(pg);
     auto recovery_requeue_callback = new FunctionContext([this, pgref, queued, reserved_pushes](int r) {
       dout(20) << "do_recovery wake up at "
@@ -9144,7 +9487,7 @@ void OSD::do_recovery(
     if (service.recovery_schedule_time < ceph_clock_now()) {
       service.recovery_schedule_time = ceph_clock_now();
     }
-    service.recovery_schedule_time += cct->_conf->osd_recovery_sleep;
+    service.recovery_schedule_time += recovery_sleep;
     service.recovery_sleep_timer.add_event_at(service.recovery_schedule_time,
                                              recovery_requeue_callback);
     dout(20) << "Recovery event scheduled at "
@@ -9191,18 +9534,18 @@ void OSD::do_recovery(
       pg->discover_all_missing(*rctx.query_map);
       if (rctx.query_map->empty()) {
        string action;
-        if (pg->state_test(PG_STATE_BACKFILL)) {
+        if (pg->state_test(PG_STATE_BACKFILLING)) {
          auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
            queued,
            queued,
-           PG::CancelBackfill()));
+           PG::DeferBackfill(cct->_conf->osd_recovery_retry_interval)));
          pg->queue_peering_event(evt);
          action = "in backfill";
         } else if (pg->state_test(PG_STATE_RECOVERING)) {
          auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
            queued,
            queued,
-           PG::CancelRecovery()));
+           PG::DeferRecovery(cct->_conf->osd_recovery_retry_interval)));
          pg->queue_peering_event(evt);
          action = "in recovery";
        } else {
@@ -9264,8 +9607,7 @@ void OSDService::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
 
 bool OSDService::is_recovery_active()
 {
-  Mutex::Locker l(recovery_lock);
-  return recovery_ops_active > 0;
+  return local_reserver.has_reservation() || remote_reserver.has_reservation();
 }
 
 // =========================================================
@@ -9669,6 +10011,7 @@ int OSD::init_op_flags(OpRequestRef& op)
       if (base_pool && base_pool->require_rollback()) {
         if ((iter->op.op != CEPH_OSD_OP_READ) &&
             (iter->op.op != CEPH_OSD_OP_CHECKSUM) &&
+            (iter->op.op != CEPH_OSD_OP_CMPEXT) &&
             (iter->op.op != CEPH_OSD_OP_STAT) &&
             (iter->op.op != CEPH_OSD_OP_ISDIRTY) &&
             (iter->op.op != CEPH_OSD_OP_UNDIRTY) &&