]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/Objecter.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / osdc / Objecter.cc
index 447f44f6256b6fbbc59b26bb7965ad63e5c050c5..27e00c2309f24092a811487f86ad115708dbdd72 100644 (file)
@@ -98,9 +98,6 @@ enum {
   l_osdc_osdop_cmpxattr,
   l_osdc_osdop_rmxattr,
   l_osdc_osdop_resetxattrs,
-  l_osdc_osdop_tmap_up,
-  l_osdc_osdop_tmap_put,
-  l_osdc_osdop_tmap_get,
   l_osdc_osdop_call,
   l_osdc_osdop_watch,
   l_osdc_osdop_notify,
@@ -158,8 +155,8 @@ class Objecter::RequestStateHook : public AdminSocketHook {
   Objecter *m_objecter;
 public:
   explicit RequestStateHook(Objecter *objecter);
-  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
-            bufferlist& out) override;
+  bool call(std::string_view command, const cmdmap_t& cmdmap,
+           std::string_view format, bufferlist& out) override;
 };
 
 /**
@@ -216,7 +213,7 @@ const char** Objecter::get_tracked_conf_keys() const
 }
 
 
-void Objecter::handle_conf_change(const struct md_config_t *conf,
+void Objecter::handle_conf_change(const ConfigProxy& conf,
                                  const std::set <std::string> &changed)
 {
   if (changed.count("crush_location")) {
@@ -237,7 +234,7 @@ void Objecter::update_crush_location()
  */
 void Objecter::init()
 {
-  assert(!initialized);
+  ceph_assert(!initialized);
 
   if (!logger) {
     PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
@@ -246,7 +243,7 @@ void Objecter::init()
                PerfCountersBuilder::PRIO_CRITICAL);
     pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
     pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
-    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(BYTES));
+    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
     pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
     pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
 
@@ -292,12 +289,6 @@ void Objecter::init()
                        "Remove xattr operations");
     pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
                        "Reset xattr operations");
-    pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
-                       "TMAP update operations");
-    pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
-                       "TMAP put operations");
-    pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
-                       "TMAP get operations");
     pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
                        "Call (execute) operations");
     pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
@@ -385,7 +376,7 @@ void Objecter::init()
 
   update_crush_location();
 
-  cct->_conf->add_observer(this);
+  cct->_conf.add_observer(this);
 
   initialized = true;
 }
@@ -407,14 +398,14 @@ void Objecter::start(const OSDMap* o)
 
 void Objecter::shutdown()
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
   initialized = false;
 
   wl.unlock();
-  cct->_conf->remove_observer(this);
+  cct->_conf.remove_observer(this);
   wl.lock();
 
   map<int,OSDSession*>::iterator p;
@@ -528,7 +519,7 @@ void Objecter::shutdown()
 void Objecter::_send_linger(LingerOp *info,
                            shunique_lock& sul)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
 
   vector<OSDOp> opv;
   Context *oncommit = NULL;
@@ -568,9 +559,10 @@ void Objecter::_send_linger(LingerOp *info,
 
   // do not resend this; we will send a new op to reregister
   o->should_resend = false;
+  o->ctx_budgeted = true;
 
   if (info->register_tid) {
-    // repeat send.  cancel old registeration op, if any.
+    // repeat send.  cancel old registration op, if any.
     OSDSession::unique_lock sl(info->session->lock);
     if (info->session->ops.count(info->register_tid)) {
       Op *o = info->session->ops[info->register_tid];
@@ -578,13 +570,10 @@ void Objecter::_send_linger(LingerOp *info,
       _cancel_linger_op(o);
     }
     sl.unlock();
-
-    _op_submit(o, sul, &info->register_tid);
-  } else {
-    // first send
-    _op_submit_with_budget(o, sul, &info->register_tid);
   }
 
+  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);
+
   logger->inc(l_osdc_linger_send);
 }
 
@@ -607,9 +596,9 @@ void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
 
   if (!info->is_watch) {
     // make note of the notify_id
-    bufferlist::iterator p = outbl.begin();
+    auto p = outbl.cbegin();
     try {
-      ::decode(info->notify_id, p);
+      decode(info->notify_id, p);
       ldout(cct, 10) << "_linger_commit  notify_id=" << info->notify_id
                     << dendl;
     }
@@ -644,7 +633,7 @@ struct C_DoWatchError : public Context {
 int Objecter::_normalize_watch_error(int r)
 {
   // translate ENOENT -> ENOTCONN so that a delete->disconnection
-  // notification and a failure to reconnect becuase we raced with
+  // notification and a failure to reconnect because we raced with
   // the delete appear the same to the user.
   if (r == -ENOENT)
     r = -ENOTCONN;
@@ -683,7 +672,7 @@ void Objecter::_send_linger_ping(LingerOp *info)
     return;
   }
 
-  ceph::mono_time now = ceph::mono_clock::now();
+  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
   ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
                 << dendl;
 
@@ -699,17 +688,16 @@ void Objecter::_send_linger_ping(LingerOp *info)
   o->target = info->target;
   o->should_resend = false;
   _send_op_account(o);
-  MOSDOp *m = _prepare_osd_op(o);
   o->tid = ++last_tid;
   _session_op_assign(info->session, o);
-  _send_op(o, m);
+  _send_op(o);
   info->ping_tid = o->tid;
 
   onack->sent = now;
   logger->inc(l_osdc_linger_ping);
 }
 
-void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
+void Objecter::_linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
                            uint32_t register_gen)
 {
   LingerOp::unique_lock l(info->watch_lock);
@@ -736,10 +724,10 @@ int Objecter::linger_check(LingerOp *info)
 {
   LingerOp::shared_lock l(info->watch_lock);
 
-  mono_time stamp = info->watch_valid_thru;
+  ceph::coarse_mono_time stamp = info->watch_valid_thru;
   if (!info->watch_pending_async.empty())
-    stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
-  auto age = mono_clock::now() - stamp;
+    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
+  auto age = ceph::coarse_mono_clock::now() - stamp;
 
   ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " err " << info->last_error
@@ -770,7 +758,7 @@ void Objecter::_linger_cancel(LingerOp *info)
 
     linger_ops.erase(info->linger_id);
     linger_ops_set.erase(info);
-    assert(linger_ops.size() == linger_ops_set.size());
+    ceph_assert(linger_ops.size() == linger_ops_set.size());
 
     info->canceled = true;
     info->put();
@@ -785,13 +773,13 @@ Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
                                              const object_locator_t& oloc,
                                              int flags)
 {
-  LingerOp *info = new LingerOp;
+  LingerOp *info = new LingerOp(this);
   info->target.base_oid = oid;
   info->target.base_oloc = oloc;
   if (info->target.base_oloc.key == oid)
     info->target.base_oloc.key.clear();
   info->target.flags = flags;
-  info->watch_valid_thru = mono_clock::now();
+  info->watch_valid_thru = ceph::coarse_mono_clock::now();
 
   unique_lock l(rwlock);
 
@@ -803,7 +791,7 @@ Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
                 << dendl;
   linger_ops[info->linger_id] = info;
   linger_ops_set.insert(info);
-  assert(linger_ops.size() == linger_ops_set.size());
+  ceph_assert(linger_ops.size() == linger_ops_set.size());
 
   info->get(); // for the caller
   return info;
@@ -827,6 +815,8 @@ ceph_tid_t Objecter::linger_watch(LingerOp *info,
   info->pobjver = objver;
   info->on_reg_commit = oncommit;
 
+  info->ctx_budget = take_linger_budget(info);
+
   shunique_lock sul(rwlock, ceph::acquire_unique);
   _linger_submit(info, sul);
   logger->inc(l_osdc_linger_active);
@@ -849,6 +839,8 @@ ceph_tid_t Objecter::linger_notify(LingerOp *info,
   info->pobjver = objver;
   info->on_reg_commit = onfinish;
 
+  info->ctx_budget = take_linger_budget(info);
+  
   shunique_lock sul(rwlock, ceph::acquire_unique);
   _linger_submit(info, sul);
   logger->inc(l_osdc_linger_active);
@@ -858,8 +850,9 @@ ceph_tid_t Objecter::linger_notify(LingerOp *info,
 
 void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
-  assert(info->linger_id);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(info->linger_id);
+  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!
 
   // Populate Op::target
   OSDSession *s = NULL;
@@ -867,7 +860,7 @@ void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
 
   // Create LingerOp<->OSDSession relation
   int r = _get_session(info->target.osd, &s, sul);
-  assert(r == 0);
+  ceph_assert(r == 0);
   OSDSession::unique_lock sl(s->lock);
   _session_linger_op_assign(s, info);
   sl.unlock();
@@ -937,7 +930,7 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
   ldout(cct, 10) << __func__ << " " << *m << dendl;
 
   shared_lock l(rwlock);
-  assert(initialized);
+  ceph_assert(initialized);
 
   if (info->canceled) {
     l.unlock();
@@ -945,9 +938,9 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
   }
 
   // notify completion?
-  assert(info->is_watch);
-  assert(info->watch_context);
-  assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
+  ceph_assert(info->is_watch);
+  ceph_assert(info->watch_context);
+  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
 
   l.unlock();
 
@@ -967,9 +960,6 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
 bool Objecter::ms_dispatch(Message *m)
 {
   ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
-  if (!initialized)
-    return false;
-
   switch (m->get_type()) {
     // these we exlusively handle
   case CEPH_MSG_OSD_OPREPLY:
@@ -1015,16 +1005,18 @@ bool Objecter::ms_dispatch(Message *m)
   return false;
 }
 
-void Objecter::_scan_requests(OSDSession *s,
-                             bool force_resend,
-                             bool cluster_full,
-                             map<int64_t, bool> *pool_full_map,
-                             map<ceph_tid_t, Op*>& need_resend,
-                             list<LingerOp*>& need_resend_linger,
-                             map<ceph_tid_t, CommandOp*>& need_resend_command,
-                             shunique_lock& sul)
+void Objecter::_scan_requests(
+  OSDSession *s,
+  bool skipped_map,
+  bool cluster_full,
+  map<int64_t, bool> *pool_full_map,
+  map<ceph_tid_t, Op*>& need_resend,
+  list<LingerOp*>& need_resend_linger,
+  map<ceph_tid_t, CommandOp*>& need_resend_command,
+  shunique_lock& sul,
+  const mempool::osdmap::map<int64_t,OSDMap::snap_interval_set_t> *gap_removed_snaps)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
 
   list<LingerOp*> unregister_lingers;
 
@@ -1034,7 +1026,7 @@ void Objecter::_scan_requests(OSDSession *s,
   map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
   while (lp != s->linger_ops.end()) {
     LingerOp *op = lp->second;
-    assert(op->session == s);
+    ceph_assert(op->session == s);
     // check_linger_pool_dne() may touch linger_ops; prevent iterator
     // invalidation
     ++lp;
@@ -1046,7 +1038,7 @@ void Objecter::_scan_requests(OSDSession *s,
        (*pool_full_map)[op->target.base_oloc.pool];
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
-      if (!force_resend && !force_resend_writes)
+      if (!skipped_map && !force_resend_writes)
        break;
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
@@ -1071,6 +1063,10 @@ void Objecter::_scan_requests(OSDSession *s,
     Op *op = p->second;
     ++p;   // check_op_pool_dne() may touch ops; prevent iterator invalidation
     ldout(cct, 10) << " checking op " << op->tid << dendl;
+    _prune_snapc(osdmap->get_new_removed_snaps(), op);
+    if (skipped_map) {
+      _prune_snapc(*gap_removed_snaps, op);
+    }
     bool force_resend_writes = cluster_full;
     if (pool_full_map)
       force_resend_writes = force_resend_writes ||
@@ -1079,13 +1075,11 @@ void Objecter::_scan_requests(OSDSession *s,
                         op->session ? op->session->con.get() : nullptr);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
-      if (!force_resend && !(force_resend_writes && op->respects_full()))
+      if (!skipped_map && !(force_resend_writes && op->respects_full()))
        break;
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
-      if (op->session) {
-       _session_op_remove(op->session, op);
-      }
+      _session_op_remove(op->session, op);
       need_resend[op->tid] = op;
       _op_cancel_map_check(op);
       break;
@@ -1109,14 +1103,12 @@ void Objecter::_scan_requests(OSDSession *s,
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
       // resend if skipped map; otherwise do nothing.
-      if (!force_resend && !force_resend_writes)
+      if (!skipped_map && !force_resend_writes)
        break;
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
       need_resend_command[c->tid] = c;
-      if (c->session) {
-       _session_command_op_remove(c->session, c);
-      }
+      _session_command_op_remove(c->session, c);
       _command_cancel_map_check(c);
       break;
     case RECALC_OP_TARGET_POOL_DNE:
@@ -1143,7 +1135,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
   if (!initialized)
     return;
 
-  assert(osdmap);
+  ceph_assert(osdmap);
 
   if (m->fsid != monc->get_fsid()) {
     ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
@@ -1224,25 +1216,33 @@ void Objecter::handle_osd_map(MOSDMap *m)
        update_pool_full_map(pool_full_map);
 
        // check all outstanding requests on every epoch
+       for (auto& i : need_resend) {
+         _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
+         if (skipped_map) {
+           _prune_snapc(m->gap_removed_snaps, i.second);
+         }
+       }
        _scan_requests(homeless_session, skipped_map, cluster_full,
                       &pool_full_map, need_resend,
-                      need_resend_linger, need_resend_command, sul);
+                      need_resend_linger, need_resend_command, sul,
+                      &m->gap_removed_snaps);
        for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
             p != osd_sessions.end(); ) {
          OSDSession *s = p->second;
          _scan_requests(s, skipped_map, cluster_full,
                         &pool_full_map, need_resend,
-                        need_resend_linger, need_resend_command, sul);
+                        need_resend_linger, need_resend_command, sul,
+                        &m->gap_removed_snaps);
          ++p;
          // osd down or addr change?
          if (!osdmap->is_up(s->osd) ||
              (s->con &&
-              s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
+              s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
            close_session(s);
          }
        }
 
-       assert(e == osdmap->get_epoch());
+       ceph_assert(e == osdmap->get_epoch());
       }
 
     } else {
@@ -1252,7 +1252,8 @@ void Objecter::handle_osd_map(MOSDMap *m)
             p != osd_sessions.end(); ++p) {
          OSDSession *s = p->second;
          _scan_requests(s, false, false, NULL, need_resend,
-                        need_resend_linger, need_resend_command, sul);
+                        need_resend_linger, need_resend_command, sul,
+                        nullptr);
        }
        ldout(cct, 3) << "handle_osd_map decoding full epoch "
                      << m->get_last() << dendl;
@@ -1260,7 +1261,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
 
        _scan_requests(homeless_session, false, false, NULL,
                       need_resend, need_resend_linger,
-                      need_resend_command, sul);
+                      need_resend_command, sul, nullptr);
       } else {
        ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
                      << dendl;
@@ -1305,7 +1306,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
     bool mapped_session = false;
     if (!s) {
       int r = _map_session(&op->target, &s, sul);
-      assert(r == 0);
+      ceph_assert(r == 0);
       mapped_session = true;
     } else {
       get_session(s);
@@ -1329,15 +1330,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
   for (list<LingerOp*>::iterator p = need_resend_linger.begin();
        p != need_resend_linger.end(); ++p) {
     LingerOp *op = *p;
-    if (!op->session) {
-      _calc_target(&op->target, nullptr);
-      OSDSession *s = NULL;
-      const int r = _get_session(op->target.osd, &s, sul);
-      assert(r == 0);
-      assert(s != NULL);
-      op->session = s;
-      put_session(s);
-    }
+    ceph_assert(op->session);
     if (!op->session->is_homeless()) {
       logger->inc(l_osdc_linger_resend);
       _send_linger(op, sul);
@@ -1545,13 +1538,14 @@ void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
                     << " concluding pool " << op->target.base_pgid.pool()
                     << " dne" << dendl;
       if (op->onfinish) {
+       num_in_flight--;
        op->onfinish->complete(-ENOENT);
       }
 
       OSDSession *s = op->session;
       if (s) {
-       assert(s != NULL);
-       assert(sl->mutex() == &s->lock);
+       ceph_assert(s != NULL);
+       ceph_assert(sl->mutex() == &s->lock);
        bool session_locked = sl->owns_lock();
        if (!session_locked) {
          sl->lock();
@@ -1769,7 +1763,7 @@ void Objecter::_command_cancel_map_check(CommandOp *c)
  */
 int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
 {
-  assert(sul && sul.mutex() == &rwlock);
+  ceph_assert(sul && sul.mutex() == &rwlock);
 
   if (osd < 0) {
     *session = homeless_session;
@@ -1792,8 +1786,8 @@ int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
   }
   OSDSession *s = new OSDSession(cct, osd);
   osd_sessions[osd] = s;
-  s->con = messenger->get_connection(osdmap->get_inst(osd));
-  s->con->set_priv(s->get());
+  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
+  s->con->set_priv(RefCountedPtr{s});
   logger->inc(l_osdc_osd_session_open);
   logger->set(l_osdc_osd_sessions, osd_sessions.size());
   s->get();
@@ -1814,7 +1808,7 @@ void Objecter::put_session(Objecter::OSDSession *s)
 
 void Objecter::get_session(Objecter::OSDSession *s)
 {
-  assert(s != NULL);
+  ceph_assert(s != NULL);
 
   if (!s->is_homeless()) {
     ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
@@ -1828,16 +1822,16 @@ void Objecter::_reopen_session(OSDSession *s)
   // rwlock is locked unique
   // s->lock is locked
 
-  entity_inst_t inst = osdmap->get_inst(s->osd);
+  auto addrs = osdmap->get_addrs(s->osd);
   ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
-                << inst << dendl;
+                << addrs << dendl;
   if (s->con) {
     s->con->set_priv(NULL);
     s->con->mark_down();
     logger->inc(l_osdc_osd_session_close);
   }
-  s->con = messenger->get_connection(inst);
-  s->con->set_priv(s->get());
+  s->con = messenger->connect_to_osd(addrs);
+  s->con->set_priv(RefCountedPtr{s});
   s->incarnation++;
   logger->inc(l_osdc_osd_session_open);
 }
@@ -1952,15 +1946,9 @@ void Objecter::wait_for_latest_osdmap(Context *fin)
 void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
 {
   unique_lock wl(rwlock);
-  _get_latest_version(oldest, newest, fin);
-}
-
-void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
-                                  Context *fin)
-{
-  // rwlock is locked unique
   if (osdmap->get_epoch() >= newest) {
-  ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
+    ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
+    wl.unlock();
     if (fin)
       fin->complete(0);
     return;
@@ -2034,20 +2022,6 @@ bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
   return false;
 }
 
-void Objecter::kick_requests(OSDSession *session)
-{
-  ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
-
-  map<uint64_t, LingerOp *> lresend;
-  unique_lock wl(rwlock);
-
-  OSDSession::unique_lock sl(session->lock);
-  _kick_requests(session, lresend);
-  sl.unlock();
-
-  _linger_ops_resend(lresend, wl);
-}
-
 void Objecter::_kick_requests(OSDSession *session,
                              map<uint64_t, LingerOp *>& lresend)
 {
@@ -2063,7 +2037,6 @@ void Objecter::_kick_requests(OSDSession *session,
        p != session->ops.end();) {
     Op *op = p->second;
     ++p;
-    logger->inc(l_osdc_op_resend);
     if (op->should_resend) {
       if (!op->target.paused)
        resend[op->tid] = op;
@@ -2073,26 +2046,27 @@ void Objecter::_kick_requests(OSDSession *session,
     }
   }
 
+  logger->inc(l_osdc_op_resend, resend.size());
   while (!resend.empty()) {
     _send_op(resend.begin()->second);
     resend.erase(resend.begin());
   }
 
   // resend lingers
+  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
   for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
        j != session->linger_ops.end(); ++j) {
     LingerOp *op = j->second;
     op->get();
-    logger->inc(l_osdc_linger_resend);
-    assert(lresend.count(j->first) == 0);
+    ceph_assert(lresend.count(j->first) == 0);
     lresend[j->first] = op;
   }
 
   // resend commands
+  logger->inc(l_osdc_command_resend, session->command_ops.size());
   map<uint64_t,CommandOp*> cresend;  // resend in order
   for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
        k != session->command_ops.end(); ++k) {
-    logger->inc(l_osdc_command_resend);
     cresend[k->first] = k->second;
   }
   while (!cresend.empty()) {
@@ -2104,7 +2078,7 @@ void Objecter::_kick_requests(OSDSession *session,
 void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
                                  unique_lock& ul)
 {
-  assert(ul.owns_lock());
+  ceph_assert(ul.owns_lock());
   shunique_lock sul(std::move(ul));
   while (!lresend.empty()) {
     LingerOp *op = lresend.begin()->second;
@@ -2114,12 +2088,12 @@ void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
     op->put();
     lresend.erase(lresend.begin());
   }
-  ul = unique_lock(sul.release_to_unique());
+  ul = sul.release_to_unique();
 }
 
 void Objecter::start_tick()
 {
-  assert(tick_event == 0);
+  ceph_assert(tick_event == 0);
   tick_event =
     timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
                    &Objecter::tick, this);
@@ -2144,7 +2118,7 @@ void Objecter::tick()
 
 
   // look for laggy requests
-  auto cutoff = ceph::mono_clock::now();
+  auto cutoff = ceph::coarse_mono_clock::now();
   cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout
 
   unsigned laggy_ops = 0;
@@ -2158,7 +2132,7 @@ void Objecter::tick()
        p != s->ops.end();
        ++p) {
       Op *op = p->second;
-      assert(op->session);
+      ceph_assert(op->session);
       if (op->stamp < cutoff) {
        ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
                      << " is laggy" << dendl;
@@ -2171,7 +2145,7 @@ void Objecter::tick()
        ++p) {
       LingerOp *op = p->second;
       LingerOp::unique_lock wl(op->watch_lock);
-      assert(op->session);
+      ceph_assert(op->session);
       ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
       found = true;
@@ -2182,7 +2156,7 @@ void Objecter::tick()
        p != s->command_ops.end();
        ++p) {
       CommandOp *op = p->second;
-      assert(op->session);
+      ceph_assert(op->session);
       ldout(cct, 10) << " pinging osd that serves command tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
       found = true;
@@ -2207,7 +2181,7 @@ void Objecter::tick()
     }
   }
 
-  // Make sure we don't resechedule if we wake up after shutdown
+  // Make sure we don't reschedule if we wake up after shutdown
   if (initialized) {
     tick_event = timer.reschedule_me(ceph::make_timespan(
                                       cct->_conf->objecter_tick_interval));
@@ -2281,11 +2255,11 @@ void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
                                      ceph_tid_t *ptid,
                                      int *ctx_budget)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
-  assert(op->ops.size() == op->out_bl.size());
-  assert(op->ops.size() == op->out_rval.size());
-  assert(op->ops.size() == op->out_handler.size());
+  ceph_assert(op->ops.size() == op->out_bl.size());
+  ceph_assert(op->ops.size() == op->out_rval.size());
+  ceph_assert(op->ops.size() == op->out_handler.size());
 
   // throttle.  before we look at any state, because
   // _take_op_budget() may drop our lock while it blocks.
@@ -2355,9 +2329,6 @@ void Objecter::_send_op_account(Op *op)
     case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
     case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
     case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
-    case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
-    case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
-    case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;
 
     // OMAP read operations
     case CEPH_OSD_OP_OMAPGETVALS:
@@ -2390,7 +2361,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
   ldout(cct, 10) << __func__ << " op " << op << dendl;
 
   // pick target
-  assert(op->session == NULL);
+  ceph_assert(op->session == NULL);
   OSDSession *s = NULL;
 
   bool check_for_latest_map = _calc_target(&op->target, nullptr)
@@ -2399,7 +2370,8 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
   // Try to get a session, including a retry if we need to take write lock
   int r = _get_session(op->target.osd, &s, sul);
   if (r == -EAGAIN ||
-      (check_for_latest_map && sul.owns_lock_shared())) {
+      (check_for_latest_map && sul.owns_lock_shared()) ||
+      cct->_conf->objecter_debug_inject_relock_delay) {
     epoch_t orig_epoch = osdmap->get_epoch();
     sul.unlock();
     if (cct->_conf->objecter_debug_inject_relock_delay) {
@@ -2420,17 +2392,17 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
     }
   }
   if (r == -EAGAIN) {
-    assert(s == NULL);
+    ceph_assert(s == NULL);
     r = _get_session(op->target.osd, &s, sul);
   }
-  assert(r == 0);
-  assert(s);  // may be homeless
+  ceph_assert(r == 0);
+  ceph_assert(s);  // may be homeless
 
   _send_op_account(op);
 
   // send?
 
-  assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
+  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
 
   if (osdmap_full_try) {
     op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
@@ -2468,11 +2440,6 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
     _maybe_request_map();
   }
 
-  MOSDOp *m = NULL;
-  if (need_send) {
-    m = _prepare_osd_op(op);
-  }
-
   OSDSession::unique_lock sl(s->lock);
   if (op->tid == 0)
     op->tid = ++last_tid;
@@ -2486,7 +2453,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
   _session_op_assign(s, op);
 
   if (need_send) {
-    _send_op(op, m);
+    _send_op(op);
   }
 
   // Last chance to touch Op here, after giving up session lock it can
@@ -2507,7 +2474,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
 
 int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   OSDSession::unique_lock sl(s->lock);
 
@@ -2518,11 +2485,13 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
     return -ENOENT;
   }
 
+#if 0
   if (s->con) {
     ldout(cct, 20) << " revoking rx buffer for " << tid
                   << " on " << s->con << dendl;
     s->con->revoke_rx_buffer(tid);
   }
+#endif
 
   ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
                 << dendl;
@@ -2634,7 +2603,7 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
       int cancel_result = op_cancel(s, *titer, r);
       // We hold rwlock across search and cancellation, so cancels
       // should always succeed
-      assert(cancel_result == 0);
+      ceph_assert(cancel_result == 0);
     }
     if (!found && to_cancel.size())
       found = true;
@@ -2735,7 +2704,7 @@ bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
  */
 bool Objecter::_osdmap_full_flag() const
 {
-  // Ignore the FULL flag if the caller has honor_osdmap_full
+  // Ignore the FULL flag if the caller does not have honor_osdmap_full
   return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
 }
 
@@ -2773,6 +2742,34 @@ int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
   return p->raw_hash_to_pg(p->hash_key(key, ns));
 }
 
+void Objecter::_prune_snapc(
+  const mempool::osdmap::map<int64_t,
+  OSDMap::snap_interval_set_t>& new_removed_snaps,
+  Op *op)
+{
+  bool match = false;
+  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
+  if (i != new_removed_snaps.end()) {
+    for (auto s : op->snapc.snaps) {
+      if (i->second.contains(s)) {
+       match = true;
+       break;
+      }
+    }
+    if (match) {
+      vector<snapid_t> new_snaps;
+      for (auto s : op->snapc.snaps) {
+       if (!i->second.contains(s)) {
+         new_snaps.push_back(s);
+       }
+      }
+      op->snapc.snaps.swap(new_snaps);
+      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
+                   << " (was " << new_snaps << ")" << dendl;
+    }
+  }
+}
+
 int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
 {
   // rwlock is locked
@@ -2822,9 +2819,9 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
 
   pg_t pgid;
   if (t->precalc_pgid) {
-    assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
-    assert(t->base_oid.name.empty()); // make sure this is a pg op
-    assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
+    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
+    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
+    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
     pgid = t->base_pgid;
   } else {
     int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
@@ -2843,6 +2840,7 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
   int size = pi->size;
   int min_size = pi->min_size;
   unsigned pg_num = pi->get_pg_num();
+  unsigned pg_num_pending = pi->get_pg_num_pending();
   int up_primary, acting_primary;
   vector<int> up, acting;
   osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
@@ -2866,6 +2864,8 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
        min_size,
        t->pg_num,
        pg_num,
+       t->pg_num_pending,
+       pg_num_pending,
        t->sort_bitwise,
        sort_bitwise,
        t->recovery_deletes,
@@ -2886,12 +2886,15 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
       is_pg_changed(
        t->acting_primary, t->acting, acting_primary, acting,
        t->used_replica || any_change);
-  bool split = false;
+  bool split_or_merge = false;
   if (t->pg_num) {
-    split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
+    split_or_merge =
+      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
+      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
+      prev_pgid.is_merge_target(t->pg_num, pg_num);
   }
 
-  if (legacy_change || split || force_resend) {
+  if (legacy_change || split_or_merge || force_resend) {
     t->pgid = pgid;
     t->acting = acting;
     t->acting_primary = acting_primary;
@@ -2901,6 +2904,7 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
     t->min_size = min_size;
     t->pg_num = pg_num;
     t->pg_num_mask = pi->get_pg_num_mask();
+    t->pg_num_pending = pg_num_pending;
     osdmap->get_primary_shard(
       pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
       &t->actual_pgid);
@@ -2945,7 +2949,7 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
              t->used_replica = true;
          }
        }
-       assert(best >= 0);
+       ceph_assert(best >= 0);
        osd = acting[best];
       } else {
        osd = acting_primary;
@@ -2956,7 +2960,7 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
   if (legacy_change || unpaused || force_resend) {
     return RECALC_OP_TARGET_NEED_RESEND;
   }
-  if (split &&
+  if (split_or_merge &&
       (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS ||
        HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
                    RESEND_ON_SPLIT))) {
@@ -2975,8 +2979,8 @@ int Objecter::_map_session(op_target_t *target, OSDSession **s,
 void Objecter::_session_op_assign(OSDSession *to, Op *op)
 {
   // to->lock is locked
-  assert(op->session == NULL);
-  assert(op->tid);
+  ceph_assert(op->session == NULL);
+  ceph_assert(op->tid);
 
   get_session(to);
   op->session = to;
@@ -2991,7 +2995,7 @@ void Objecter::_session_op_assign(OSDSession *to, Op *op)
 
 void Objecter::_session_op_remove(OSDSession *from, Op *op)
 {
-  assert(op->session == from);
+  ceph_assert(op->session == from);
   // from->lock is locked
 
   if (from->is_homeless()) {
@@ -3008,7 +3012,7 @@ void Objecter::_session_op_remove(OSDSession *from, Op *op)
 void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
 {
   // to lock is locked unique
-  assert(op->session == NULL);
+  ceph_assert(op->session == NULL);
 
   if (to->is_homeless()) {
     num_homeless_ops++;
@@ -3024,7 +3028,7 @@ void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
 
 void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
 {
-  assert(from == op->session);
+  ceph_assert(from == op->session);
   // from->lock is locked unique
 
   if (from->is_homeless()) {
@@ -3041,7 +3045,7 @@ void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
 
 void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
 {
-  assert(from == op->session);
+  ceph_assert(from == op->session);
   // from->lock is locked
 
   if (from->is_homeless()) {
@@ -3058,8 +3062,8 @@ void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
 void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
 {
   // to->lock is locked
-  assert(op->session == NULL);
-  assert(op->tid);
+  ceph_assert(op->session == NULL);
+  ceph_assert(op->tid);
 
   if (to->is_homeless()) {
     num_homeless_ops++;
@@ -3085,7 +3089,7 @@ int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
 
     OSDSession *s = NULL;
     r = _get_session(linger_op->target.osd, &s, sul);
-    assert(r == 0);
+    ceph_assert(r == 0);
 
     if (linger_op->session != s) {
       // NB locking two sessions (s and linger_op->session) at the
@@ -3107,7 +3111,7 @@ void Objecter::_cancel_linger_op(Op *op)
 {
   ldout(cct, 15) << "cancel_op " << op->tid << dendl;
 
-  assert(!op->should_resend);
+  ceph_assert(!op->should_resend);
   if (op->onfinish) {
     delete op->onfinish;
     num_in_flight--;
@@ -3118,12 +3122,14 @@ void Objecter::_cancel_linger_op(Op *op)
 
 void Objecter::_finish_op(Op *op, int r)
 {
-  ldout(cct, 15) << "finish_op " << op->tid << dendl;
+  ldout(cct, 15) << __func__ << " " << op->tid << dendl;
 
   // op->session->lock is locked unique or op->session is null
 
-  if (!op->ctx_budgeted && op->budgeted)
-    put_op_budget(op);
+  if (!op->ctx_budgeted && op->budget >= 0) {
+    put_op_budget_bytes(op->budget);
+    op->budget = -1;
+  }
 
   if (op->ontimeout && r != -ETIMEDOUT)
     timer.cancel_event(op->ontimeout);
@@ -3134,29 +3140,13 @@ void Objecter::_finish_op(Op *op, int r)
 
   logger->dec(l_osdc_op_active);
 
-  assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
+  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
 
   inflight_ops--;
 
   op->put();
 }
 
-void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
-{
-  ldout(cct, 15) << "finish_op " << tid << dendl;
-  shared_lock rl(rwlock);
-
-  OSDSession::unique_lock wl(session->lock);
-
-  map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
-  if (iter == session->ops.end())
-    return;
-
-  Op *op = iter->second;
-
-  _finish_op(op, 0);
-}
-
 MOSDOp *Objecter::_prepare_osd_op(Op *op)
 {
   // rwlock is locked
@@ -3172,7 +3162,7 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
     flags |= CEPH_OSD_FLAG_FULL_FORCE;
 
   op->target.paused = false;
-  op->stamp = ceph::mono_clock::now();
+  op->stamp = ceph::coarse_mono_clock::now();
 
   hobject_t hobj = op->target.get_hobj();
   MOSDOp *m = new MOSDOp(client_inc, op->tid,
@@ -3211,7 +3201,7 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
   return m;
 }
 
-void Objecter::_send_op(Op *op, MOSDOp *m)
+void Objecter::_send_op(Op *op)
 {
   // rwlock is locked
   // op->session->lock is locked
@@ -3240,10 +3230,8 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
     }
   }
 
-  if (!m) {
-    assert(op->tid > 0);
-    m = _prepare_osd_op(op);
-  }
+  ceph_assert(op->tid > 0);
+  MOSDOp *m = _prepare_osd_op(op);
 
   if (op->target.actual_pgid != m->get_spg()) {
     ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
@@ -3258,8 +3246,9 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
                 << dendl;
 
   ConnectionRef con = op->session->con;
-  assert(con);
+  ceph_assert(con);
 
+#if 0
   // preallocated rx buffer?
   if (op->con) {
     ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
@@ -3269,27 +3258,27 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
   if (op->outbl &&
       op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
       op->outbl->length()) {
+    op->outbl->invalidate_crc();  // messenger writes through c_str()
     ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
                   << dendl;
     op->con = con;
     op->con->post_rx_buffer(op->tid, *op->outbl);
   }
+#endif
 
   op->incarnation = op->session->incarnation;
 
-  m->set_tid(op->tid);
-
   if (op->trace.valid()) {
     m->trace.init("op msg", nullptr, &op->trace);
   }
   op->session->con->send_message(m);
 }
 
-int Objecter::calc_op_budget(Op *op)
+int Objecter::calc_op_budget(const vector<OSDOp>& ops)
 {
   int op_budget = 0;
-  for (vector<OSDOp>::iterator i = op->ops.begin();
-       i != op->ops.end();
+  for (vector<OSDOp>::const_iterator i = ops.begin();
+       i != ops.end();
        ++i) {
     if (i->op.op & CEPH_OSD_OP_MODE_WR) {
       op_budget += i->indata.length();
@@ -3309,11 +3298,11 @@ void Objecter::_throttle_op(Op *op,
                            shunique_lock& sul,
                            int op_budget)
 {
-  assert(sul && sul.mutex() == &rwlock);
+  ceph_assert(sul && sul.mutex() == &rwlock);
   bool locked_for_write = sul.owns_lock();
 
   if (!op_budget)
-    op_budget = calc_op_budget(op);
+    op_budget = calc_op_budget(op->ops);
   if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
     sul.unlock();
     op_throttle_bytes.get(op_budget);
@@ -3332,15 +3321,9 @@ void Objecter::_throttle_op(Op *op,
   }
 }
 
-void Objecter::unregister_op(Op *op)
+int Objecter::take_linger_budget(LingerOp *info)
 {
-  OSDSession::unique_lock sl(op->session->lock);
-  op->session->ops.erase(op->tid);
-  sl.unlock();
-  put_session(op->session);
-  op->session = NULL;
-
-  inflight_ops--;
+  return 1;
 }
 
 /* This function DOES put the passed message before returning */
@@ -3358,12 +3341,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s) {
-      s->put();
-    }
     m->put();
     return;
   }
@@ -3377,7 +3358,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                                                    " onnvram" : " ack"))
                  << " ... stray" << dendl;
     sl.unlock();
-    put_session(s);
     m->put();
     return;
   }
@@ -3400,7 +3380,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     }
     _session_op_remove(s, op);
     sl.unlock();
-    put_session(s);
 
     _op_submit(op, sul, NULL);
     m->put();
@@ -3416,7 +3395,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                    << op->session->con->get_peer_addr() << dendl;
       m->put();
       sl.unlock();
-      put_session(s);
       return;
     }
   } else {
@@ -3435,7 +3413,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    put_session(s);
 
     // FIXME: two redirects could race and reorder
 
@@ -3456,7 +3433,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    put_session(s);
 
     op->tid = 0;
     op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
@@ -3478,9 +3454,27 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   // got data?
   if (op->outbl) {
+#if 0
     if (op->con)
       op->con->revoke_rx_buffer(op->tid);
-    m->claim_data(*op->outbl);
+#endif
+    auto& bl = m->get_data();
+    if (op->outbl->length() == bl.length() &&
+       bl.get_num_buffers() <= 1) {
+      // this is here to keep previous users to *relied* on getting data
+      // read into existing buffers happy.  Notably,
+      // libradosstriper::RadosStriperImpl::aio_read().
+      ldout(cct,10) << __func__ << " copying resulting " << bl.length()
+                   << " into existing buffer of length " << op->outbl->length()
+                   << dendl;
+      bufferlist t;
+      t.claim(*op->outbl);
+      t.invalidate_crc();  // we're overwriting the raw buffers via c_str()
+      bl.copy(0, bl.length(), t.c_str());
+      op->outbl->substr_of(t, 0, bl.length());
+    } else {
+      m->claim_data(*op->outbl);
+    }
     op->outbl = 0;
   }
 
@@ -3496,8 +3490,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   vector<bufferlist*>::iterator pb = op->out_bl.begin();
   vector<int*>::iterator pr = op->out_rval.begin();
   vector<Context*>::iterator ph = op->out_handler.begin();
-  assert(op->out_bl.size() == op->out_rval.size());
-  assert(op->out_bl.size() == op->out_handler.size());
+  ceph_assert(op->out_bl.size() == op->out_rval.size());
+  ceph_assert(op->out_bl.size() == op->out_handler.size());
   vector<OSDOp>::iterator p = out_ops.begin();
   for (unsigned i = 0;
        p != out_ops.end() && pb != op->out_bl.end();
@@ -3550,7 +3544,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   }
 
   m->put();
-  put_session(s);
 }
 
 void Objecter::handle_osd_backoff(MOSDBackoff *m)
@@ -3563,17 +3556,15 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s)
-      s->put();
     m->put();
     return;
   }
 
   get_session(s);
-  s->put();  // from get_priv() above
 
   OSDSession::unique_lock sl(s->lock);
 
@@ -3618,7 +3609,7 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
                       << " [" << b->begin << "," << b->end
                       << ")" << dendl;
        auto spgp = s->backoffs.find(b->pgid);
-       assert(spgp != s->backoffs.end());
+       ceph_assert(spgp != s->backoffs.end());
        spgp->second.erase(b->begin);
        if (spgp->second.empty()) {
          s->backoffs.erase(spgp);
@@ -3661,7 +3652,7 @@ uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
   shared_lock rl(rwlock);
   list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
                                pos, list_context->pool_id, string());
-  ldout(cct, 10) << __func__ << list_context
+  ldout(cct, 10) << __func__ << " " << list_context
                 << " pos " << pos << " -> " << list_context->pos << dendl;
   pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
   list_context->current_pg = actual.ps();
@@ -3773,12 +3764,12 @@ void Objecter::_nlist_reply(NListContext *list_context, int r,
 {
   ldout(cct, 10) << __func__ << " " << list_context << dendl;
 
-  bufferlist::iterator iter = list_context->bl.begin();
+  auto iter = list_context->bl.cbegin();
   pg_nls_response_t response;
   bufferlist extra_info;
-  ::decode(response, iter);
+  decode(response, iter);
   if (!iter.end()) {
-    ::decode(extra_info, iter);
+    decode(extra_info, iter);
   }
 
   // if the osd returns 1 (newer code), or handle MAX, it means we
@@ -3871,8 +3862,12 @@ struct C_SelfmanagedSnap : public Context {
   C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
   void finish(int r) override {
     if (r == 0) {
-      bufferlist::iterator p = bl.begin();
-      ::decode(*psnapid, p);
+      try {
+        auto p = bl.cbegin();
+        decode(*psnapid, p);
+      } catch (buffer::error&) {
+        r = -EIO;
+      }
     }
     fin->complete(r);
   }
@@ -3945,7 +3940,7 @@ int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
   return 0;
 }
 
-int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
+int Objecter::create_pool(string& name, Context *onfinish,
                          int crush_rule)
 {
   unique_lock wl(rwlock);
@@ -3963,7 +3958,6 @@ int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
   op->onfinish = onfinish;
   op->pool_op = POOL_OP_CREATE;
   pool_ops[op->tid] = op;
-  op->auid = auid;
   op->crush_rule = crush_rule;
 
   pool_op_submit(op);
@@ -4008,32 +4002,6 @@ void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
   pool_op_submit(op);
 }
 
-/**
- * change the auid owner of a pool by contacting the monitor.
- * This requires the current connection to have write permissions
- * on both the pool's current auid and the new (parameter) auid.
- * Uses the standard Context callback when done.
- */
-int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
-{
-  unique_lock wl(rwlock);
-  ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
-  PoolOp *op = new PoolOp;
-  if (!op) return -ENOMEM;
-  op->tid = ++last_tid;
-  op->pool = pool;
-  op->name = "change_pool_auid";
-  op->onfinish = onfinish;
-  op->pool_op = POOL_OP_AUID_CHANGE;
-  op->auid = auid;
-  pool_ops[op->tid] = op;
-
-  logger->set(l_osdc_poolop_active, pool_ops.size());
-
-  pool_op_submit(op);
-  return 0;
-}
-
 void Objecter::pool_op_submit(PoolOp *op)
 {
   // rwlock is locked
@@ -4052,11 +4020,11 @@ void Objecter::_pool_op_submit(PoolOp *op)
   ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
   MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
                           op->name, op->pool_op,
-                          op->auid, last_seen_osdmap_version);
+                          last_seen_osdmap_version);
   if (op->snapid) m->snapid = op->snapid;
   if (op->crush_rule) m->crush_rule = op->crush_rule;
   monc->send_mon_message(m);
-  op->last_submit = ceph::mono_clock::now();
+  op->last_submit = ceph::coarse_mono_clock::now();
 
   logger->inc(l_osdc_poolop_send);
 }
@@ -4070,7 +4038,7 @@ void Objecter::_pool_op_submit(PoolOp *op)
  */
 void Objecter::handle_pool_op_reply(MPoolOpReply *m)
 {
-  FUNCTRACE();
+  FUNCTRACE(cct);
   shunique_lock sul(rwlock, acquire_shared);
   if (!initialized) {
     sul.unlock();
@@ -4105,11 +4073,11 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m)
        // map epoch changed, probably because a MOSDMap message
        // sneaked in. Do caller-specified callback now or else
        // we lose it forever.
-       assert(op->onfinish);
+       ceph_assert(op->onfinish);
        op->onfinish->complete(m->replyCode);
       }
     } else {
-      assert(op->onfinish);
+      ceph_assert(op->onfinish);
       op->onfinish->complete(m->replyCode);
     }
     op->onfinish = NULL;
@@ -4135,7 +4103,7 @@ done:
 
 int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
@@ -4205,7 +4173,7 @@ void Objecter::_poolstat_submit(PoolStatOp *op)
   monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
                                           op->pools,
                                           last_seen_pgmap_version));
-  op->last_submit = ceph::mono_clock::now();
+  op->last_submit = ceph::coarse_mono_clock::now();
 
   logger->inc(l_osdc_poolstat_send);
 }
@@ -4240,7 +4208,7 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
 
 int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
@@ -4307,7 +4275,7 @@ void Objecter::_fs_stats_submit(StatfsOp *op)
   monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
                                     op->data_pool,
                                     last_seen_pgmap_version));
-  op->last_submit = ceph::mono_clock::now();
+  op->last_submit = ceph::coarse_mono_clock::now();
 
   logger->inc(l_osdc_statfs_send);
 }
@@ -4340,7 +4308,7 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
 
 int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
@@ -4422,8 +4390,9 @@ bool Objecter::ms_handle_reset(Connection *con)
     return false;
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
     unique_lock wl(rwlock);
-    
-    OSDSession *session = static_cast<OSDSession*>(con->get_priv());
+
+    auto priv = con->get_priv();
+    auto session = static_cast<OSDSession*>(priv.get());
     if (session) {
       ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
                    << " osd." << session->osd << dendl;
@@ -4442,7 +4411,6 @@ bool Objecter::ms_handle_reset(Connection *con)
       _linger_ops_resend(lresend, wl);
       wl.unlock();
       maybe_request_map();
-      session->put();
     }
     return true;
   }
@@ -4470,8 +4438,7 @@ bool Objecter::ms_handle_refused(Connection *con)
 }
 
 bool Objecter::ms_get_authorizer(int dest_type,
-                                AuthAuthorizer **authorizer,
-                                bool force_new)
+                                AuthAuthorizer **authorizer)
 {
   if (!initialized)
     return false;
@@ -4662,7 +4629,6 @@ void Objecter::dump_pool_ops(Formatter *fmt) const
     fmt->dump_int("pool", op->pool);
     fmt->dump_string("name", op->name);
     fmt->dump_int("operation_type", op->pool_op);
-    fmt->dump_unsigned("auid", op->auid);
     fmt->dump_unsigned("crush_rule", op->crush_rule);
     fmt->dump_stream("snapid") << op->snapid;
     fmt->dump_stream("last_sent") << op->last_submit;
@@ -4715,8 +4681,10 @@ Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
 {
 }
 
-bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
-                                     std::string format, bufferlist& out)
+bool Objecter::RequestStateHook::call(std::string_view command,
+                                     const cmdmap_t& cmdmap,
+                                     std::string_view format,
+                                     bufferlist& out)
 {
   Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
   shared_lock rl(m_objecter->rwlock);
@@ -4737,7 +4705,8 @@ void Objecter::blacklist_self(bool set)
   else
     cmd.push_back("\"blacklistop\":\"rm\",");
   stringstream ss;
-  ss << messenger->get_myaddr();
+  // this is somewhat imprecise in that we are blacklisting our first addr only
+  ss << messenger->get_myaddrs().front().get_legacy_str();
   cmd.push_back("\"addr\":\"" + ss.str() + "\"");
 
   MMonCommand *m = new MMonCommand(monc->get_fsid());
@@ -4757,12 +4726,11 @@ void Objecter::handle_command_reply(MCommandReply *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
     m->put();
-    if (s)
-      s->put();
     return;
   }
 
@@ -4773,8 +4741,6 @@ void Objecter::handle_command_reply(MCommandReply *m)
                   << " not found" << dendl;
     m->put();
     sl.unlock();
-    if (s)
-      s->put();
     return;
   }
 
@@ -4787,8 +4753,6 @@ void Objecter::handle_command_reply(MCommandReply *m)
                   << dendl;
     m->put();
     sl.unlock();
-    if (s)
-      s->put();
     return;
   }
   if (c->poutbl) {
@@ -4802,8 +4766,6 @@ void Objecter::handle_command_reply(MCommandReply *m)
   sul.unlock();
 
   m->put();
-  if (s)
-    s->put();
 }
 
 void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
@@ -4842,7 +4804,7 @@ void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
 
 int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
 
   c->map_check_error = 0;
 
@@ -4880,7 +4842,7 @@ int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
 
   OSDSession *s;
   int r = _get_session(c->target.osd, &s, sul);
-  assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
 
   if (c->session != s) {
     put_session(s);
@@ -4898,11 +4860,11 @@ int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
 void Objecter::_assign_command_session(CommandOp *c,
                                       shunique_lock& sul)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
 
   OSDSession *s;
   int r = _get_session(c->target.osd, &s, sul);
-  assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
 
   if (c->session != s) {
     if (c->session) {
@@ -4921,8 +4883,8 @@ void Objecter::_assign_command_session(CommandOp *c,
 void Objecter::_send_command(CommandOp *c)
 {
   ldout(cct, 10) << "_send_command " << c->tid << dendl;
-  assert(c->session);
-  assert(c->session->con);
+  ceph_assert(c->session);
+  ceph_assert(c->session->con);
   MCommand *m = new MCommand(monc->monmap.fsid);
   m->cmd = c->cmd;
   m->set_data(c->inbl);
@@ -4933,7 +4895,7 @@ void Objecter::_send_command(CommandOp *c)
 
 int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
@@ -4968,7 +4930,6 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs)
   if (c->ontimeout && r != -ETIMEDOUT)
     timer.cancel_event(c->ontimeout);
 
-  OSDSession *s = c->session;
   _session_command_op_remove(c->session, c);
 
   c->put();
@@ -4980,31 +4941,31 @@ Objecter::OSDSession::~OSDSession()
 {
   // Caller is responsible for re-assigning or
   // destroying any ops that were assigned to us
-  assert(ops.empty());
-  assert(linger_ops.empty());
-  assert(command_ops.empty());
+  ceph_assert(ops.empty());
+  ceph_assert(linger_ops.empty());
+  ceph_assert(command_ops.empty());
 }
 
 Objecter::~Objecter()
 {
   delete osdmap;
 
-  assert(homeless_session->get_nref() == 1);
-  assert(num_homeless_ops == 0);
+  ceph_assert(homeless_session->get_nref() == 1);
+  ceph_assert(num_homeless_ops == 0);
   homeless_session->put();
 
-  assert(osd_sessions.empty());
-  assert(poolstat_ops.empty());
-  assert(statfs_ops.empty());
-  assert(pool_ops.empty());
-  assert(waiting_for_map.empty());
-  assert(linger_ops.empty());
-  assert(check_latest_map_lingers.empty());
-  assert(check_latest_map_ops.empty());
-  assert(check_latest_map_commands.empty());
+  ceph_assert(osd_sessions.empty());
+  ceph_assert(poolstat_ops.empty());
+  ceph_assert(statfs_ops.empty());
+  ceph_assert(pool_ops.empty());
+  ceph_assert(waiting_for_map.empty());
+  ceph_assert(linger_ops.empty());
+  ceph_assert(check_latest_map_lingers.empty());
+  ceph_assert(check_latest_map_ops.empty());
+  ceph_assert(check_latest_map_commands.empty());
 
-  assert(!m_request_state_hook);
-  assert(!logger);
+  ceph_assert(!m_request_state_hook);
+  ceph_assert(!logger);
 }
 
 /**
@@ -5057,7 +5018,7 @@ struct C_EnumerateReply : public Context {
       const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
     objecter(objecter_), next(next_), result(result_),
     end(end_), pool_id(pool_id_), on_finish(on_finish_),
-    epoch(0), budget(0)
+    epoch(0), budget(-1)
   {}
 
   void finish(int r) override {
@@ -5077,7 +5038,7 @@ void Objecter::enumerate_objects(
     hobject_t *next,
     Context *on_finish)
 {
-  assert(result);
+  ceph_assert(result);
 
   if (!end.is_max() && start > end) {
     lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
@@ -5097,7 +5058,7 @@ void Objecter::enumerate_objects(
   }
 
   shared_lock rl(rwlock);
-  assert(osdmap->get_epoch());
+  ceph_assert(osdmap->get_epoch());
   if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
     rl.unlock();
     lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
@@ -5141,7 +5102,7 @@ void Objecter::_enumerate_reply(
     hobject_t *next,
     Context *on_finish)
 {
-  if (budget > 0) {
+  if (budget >= 0) {
     put_op_budget_bytes(budget);
   }
 
@@ -5151,17 +5112,17 @@ void Objecter::_enumerate_reply(
     return;
   }
 
-  assert(next != NULL);
+  ceph_assert(next != NULL);
 
   // Decode the results
-  bufferlist::iterator iter = bl.begin();
+  auto iter = bl.cbegin();
   pg_nls_response_t response;
 
   // XXX extra_info doesn't seem used anywhere?
   bufferlist extra_info;
-  ::decode(response, iter);
+  decode(response, iter);
   if (!iter.end()) {
-    ::decode(extra_info, iter);
+    decode(extra_info, iter);
   }
 
   ldout(cct, 10) << __func__ << ": got " << response.entries.size()
@@ -5226,7 +5187,7 @@ namespace {
   void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
   {
     for (auto bl : bls) {
-      auto p = bl.begin();
+      auto p = bl.cbegin();
       T t;
       decode(t, p);
       items.push_back(t);
@@ -5268,7 +5229,7 @@ namespace {
   private:
     void decode() {
       scrub_ls_result_t result;
-      auto p = bl.begin();
+      auto p = bl.cbegin();
       result.decode(p);
       *interval = result.interval;
       if (objects) {
@@ -5288,7 +5249,7 @@ namespace {
   {
     OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
     op->flags |= CEPH_OSD_FLAG_PGOP;
-    assert(interval);
+    ceph_assert(interval);
     arg.encode(osd_op.indata);
     unsigned p = op->ops.size() - 1;
     auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};