]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/Objecter.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / osdc / Objecter.cc
index 46bee0125e6a2c01f33d7bd106499a21679e5f2c..e821b4ee00f0443a750726fe8d06d5dfbd0586eb 100644 (file)
  *
  */
 
+#include <algorithm>
+#include <cerrno>
+
 #include "Objecter.h"
 #include "osd/OSDMap.h"
+#include "osd/error_code.h"
 #include "Filer.h"
 
 #include "mon/MonClient.h"
+#include "mon/error_code.h"
 
 #include "msg/Messenger.h"
 #include "msg/Message.h"
 
 #include "messages/MWatchNotify.h"
 
-#include <errno.h>
 
+#include "common/Cond.h"
 #include "common/config.h"
 #include "common/perf_counters.h"
 #include "common/scrub_types.h"
 #include "include/str_list.h"
 #include "common/errno.h"
 #include "common/EventTrace.h"
+#include "common/async/waiter.h"
+#include "error_code.h"
+
+
+using std::list;
+using std::make_pair;
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+
+using std::defer_lock;
+using std::scoped_lock;
+using std::shared_lock;
+using std::unique_lock;
 
 using ceph::real_time;
 using ceph::real_clock;
@@ -59,6 +87,14 @@ using ceph::mono_time;
 
 using ceph::timespan;
 
+using ceph::shunique_lock;
+using ceph::acquire_shared;
+using ceph::acquire_unique;
+
+namespace bc = boost::container;
+namespace bs = boost::system;
+namespace ca = ceph::async;
+namespace cb = ceph::buffer;
 
 #define dout_subsys ceph_subsys_objecter
 #undef dout_prefix
@@ -73,6 +109,7 @@ enum {
   l_osdc_op_send_bytes,
   l_osdc_op_resend,
   l_osdc_op_reply,
+  l_osdc_oplen_avg,
 
   l_osdc_op,
   l_osdc_op_r,
@@ -98,9 +135,6 @@ enum {
   l_osdc_osdop_cmpxattr,
   l_osdc_osdop_rmxattr,
   l_osdc_osdop_resetxattrs,
-  l_osdc_osdop_tmap_up,
-  l_osdc_osdop_tmap_put,
-  l_osdc_osdop_tmap_get,
   l_osdc_osdop_call,
   l_osdc_osdop_watch,
   l_osdc_osdop_notify,
@@ -146,82 +180,61 @@ enum {
   l_osdc_last,
 };
 
+namespace {
+inline bs::error_code osdcode(int r) {
+  return (r < 0) ? bs::error_code(-r, osd_category()) : bs::error_code();
+}
+}
 
 // config obs ----------------------------
 
-static const char *config_keys[] = {
-  "crush_location",
-  NULL
-};
-
 class Objecter::RequestStateHook : public AdminSocketHook {
   Objecter *m_objecter;
 public:
   explicit RequestStateHook(Objecter *objecter);
-  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
-            bufferlist& out) override;
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+          const bufferlist&,
+          Formatter *f,
+          std::ostream& ss,
+          cb::list& out) override;
 };
 
-/**
- * This is a more limited form of C_Contexts, but that requires
- * a ceph_context which we don't have here.
- */
-class ObjectOperation::C_TwoContexts : public Context {
-  Context *first;
-  Context *second;
-public:
-  C_TwoContexts(Context *first, Context *second) :
-    first(first), second(second) {}
-  void finish(int r) override {
-    first->complete(r);
-    second->complete(r);
-    first = NULL;
-    second = NULL;
-  }
-
-  ~C_TwoContexts() override {
-    delete first;
-    delete second;
-  }
-};
-
-void ObjectOperation::add_handler(Context *extra) {
-  size_t last = out_handler.size() - 1;
-  Context *orig = out_handler[last];
-  if (orig) {
-    Context *wrapper = new C_TwoContexts(orig, extra);
-    out_handler[last] = wrapper;
-  } else {
-    out_handler[last] = extra;
-  }
-}
-
-Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
-  object_t& oid)
+std::unique_lock<std::mutex> Objecter::OSDSession::get_lock(object_t& oid)
 {
   if (oid.name.empty())
-    return unique_completion_lock();
+    return {};
 
   static constexpr uint32_t HASH_PRIME = 1021;
   uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
     % HASH_PRIME;
 
-  return unique_completion_lock(completion_locks[h % num_locks],
-                               std::defer_lock);
+  return {completion_locks[h % num_locks], std::defer_lock};
 }
 
 const char** Objecter::get_tracked_conf_keys() const
 {
+  static const char *config_keys[] = {
+    "crush_location",
+    "rados_mon_op_timeout",
+    "rados_osd_op_timeout",
+    NULL
+  };
   return config_keys;
 }
 
 
-void Objecter::handle_conf_change(const struct md_config_t *conf,
+void Objecter::handle_conf_change(const ConfigProxy& conf,
                                  const std::set <std::string> &changed)
 {
   if (changed.count("crush_location")) {
     update_crush_location();
   }
+  if (changed.count("rados_mon_op_timeout")) {
+    mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+  }
+  if (changed.count("rados_osd_op_timeout")) {
+    osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
+  }
 }
 
 void Objecter::update_crush_location()
@@ -237,7 +250,7 @@ void Objecter::update_crush_location()
  */
 void Objecter::init()
 {
-  assert(!initialized);
+  ceph_assert(!initialized);
 
   if (!logger) {
     PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
@@ -246,9 +259,10 @@ void Objecter::init()
                PerfCountersBuilder::PRIO_CRITICAL);
     pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
     pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
-    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
+    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
     pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
     pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
+    pcb.add_u64_avg(l_osdc_oplen_avg, "oplen_avg", "Average length of operation vector");
 
     pcb.add_u64_counter(l_osdc_op, "op", "Operations");
     pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
@@ -292,12 +306,6 @@ void Objecter::init()
                        "Remove xattr operations");
     pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
                        "Reset xattr operations");
-    pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
-                       "TMAP update operations");
-    pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
-                       "TMAP put operations");
-    pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
-                       "TMAP get operations");
     pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
                        "Call (execute) operations");
     pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
@@ -370,9 +378,8 @@ void Objecter::init()
   }
 
   m_request_state_hook = new RequestStateHook(this);
-  AdminSocket* admin_socket = cct->get_admin_socket();
+  auto admin_socket = cct->get_admin_socket();
   int ret = admin_socket->register_command("objecter_requests",
-                                          "objecter_requests",
                                           m_request_state_hook,
                                           "show in-progress osd requests");
 
@@ -385,7 +392,7 @@ void Objecter::init()
 
   update_crush_location();
 
-  cct->_conf->add_observer(this);
+  cct->_conf.add_observer(this);
 
   initialized = true;
 }
@@ -400,6 +407,7 @@ void Objecter::start(const OSDMap* o)
   start_tick();
   if (o) {
     osdmap->deepish_copy_from(*o);
+    prune_pg_mapping(osdmap->get_pools());
   } else if (osdmap->get_epoch() == 0) {
     _maybe_request_map();
   }
@@ -407,65 +415,64 @@ void Objecter::start(const OSDMap* o)
 
 void Objecter::shutdown()
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
   initialized = false;
 
-  cct->_conf->remove_observer(this);
+  wl.unlock();
+  cct->_conf.remove_observer(this);
+  wl.lock();
 
-  map<int,OSDSession*>::iterator p;
   while (!osd_sessions.empty()) {
-    p = osd_sessions.begin();
+    auto p = osd_sessions.begin();
     close_session(p->second);
   }
 
   while(!check_latest_map_lingers.empty()) {
-    map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
+    auto i = check_latest_map_lingers.begin();
     i->second->put();
     check_latest_map_lingers.erase(i->first);
   }
 
   while(!check_latest_map_ops.empty()) {
-    map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
+    auto i = check_latest_map_ops.begin();
     i->second->put();
     check_latest_map_ops.erase(i->first);
   }
 
   while(!check_latest_map_commands.empty()) {
-    map<ceph_tid_t, CommandOp*>::iterator i
-      = check_latest_map_commands.begin();
+    auto i = check_latest_map_commands.begin();
     i->second->put();
     check_latest_map_commands.erase(i->first);
   }
 
   while(!poolstat_ops.empty()) {
-    map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
+    auto i = poolstat_ops.begin();
     delete i->second;
     poolstat_ops.erase(i->first);
   }
 
   while(!statfs_ops.empty()) {
-    map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
+    auto i = statfs_ops.begin();
     delete i->second;
     statfs_ops.erase(i->first);
   }
 
   while(!pool_ops.empty()) {
-    map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
+    auto i = pool_ops.begin();
     delete i->second;
     pool_ops.erase(i->first);
   }
 
   ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
   while(!homeless_session->linger_ops.empty()) {
-    std::map<uint64_t, LingerOp*>::iterator i
-      = homeless_session->linger_ops.begin();
+    auto i = homeless_session->linger_ops.begin();
     ldout(cct, 10) << " linger_op " << i->first << dendl;
     LingerOp *lop = i->second;
     {
-      OSDSession::unique_lock swl(homeless_session->lock);
+      std::unique_lock swl(homeless_session->lock);
       _session_linger_op_remove(homeless_session, lop);
     }
     linger_ops.erase(lop->linger_id);
@@ -474,23 +481,22 @@ void Objecter::shutdown()
   }
 
   while(!homeless_session->ops.empty()) {
-    std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
+    auto i = homeless_session->ops.begin();
     ldout(cct, 10) << " op " << i->first << dendl;
-    Op *op = i->second;
+    auto op = i->second;
     {
-      OSDSession::unique_lock swl(homeless_session->lock);
+      std::unique_lock swl(homeless_session->lock);
       _session_op_remove(homeless_session, op);
     }
     op->put();
   }
 
   while(!homeless_session->command_ops.empty()) {
-    std::map<ceph_tid_t, CommandOp*>::iterator i
-      = homeless_session->command_ops.begin();
+    auto i = homeless_session->command_ops.begin();
     ldout(cct, 10) << " command_op " << i->first << dendl;
-    CommandOp *cop = i->second;
+    auto cop = i->second;
     {
-      OSDSession::unique_lock swl(homeless_session->lock);
+      std::unique_lock swl(homeless_session->lock);
       _session_command_op_remove(homeless_session, cop);
     }
     cop->put();
@@ -516,22 +522,22 @@ void Objecter::shutdown()
   // This is safe because we guarantee no concurrent calls to
   // shutdown() with the ::initialized check at start.
   if (m_request_state_hook) {
-    AdminSocket* admin_socket = cct->get_admin_socket();
-    admin_socket->unregister_command("objecter_requests");
+    auto admin_socket = cct->get_admin_socket();
+    admin_socket->unregister_commands(m_request_state_hook);
     delete m_request_state_hook;
     m_request_state_hook = NULL;
   }
 }
 
 void Objecter::_send_linger(LingerOp *info,
-                           shunique_lock& sul)
+                           ceph::shunique_lock<ceph::shared_mutex>& sul)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
 
-  vector<OSDOp> opv;
-  Context *oncommit = NULL;
-  LingerOp::shared_lock watchl(info->watch_lock);
-  bufferlist *poutbl = NULL;
+  fu2::unique_function<Op::OpSig> oncommit;
+  osdc_opvec opv;
+  std::shared_lock watchl(info->watch_lock);
+  cb::list *poutbl = nullptr;
   if (info->registered && info->is_watch) {
     ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
                   << dendl;
@@ -540,22 +546,26 @@ void Objecter::_send_linger(LingerOp *info,
     opv.back().op.watch.cookie = info->get_cookie();
     opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
     opv.back().op.watch.gen = ++info->register_gen;
-    oncommit = new C_Linger_Reconnect(this, info);
+    oncommit = CB_Linger_Reconnect(this, info);
   } else {
     ldout(cct, 15) << "send_linger " << info->linger_id << " register"
                   << dendl;
     opv = info->ops;
-    C_Linger_Commit *c = new C_Linger_Commit(this, info);
+    // TODO Augment ca::Completion with an equivalent of
+    // target so we can handle these cases better.
+    auto c = std::make_unique<CB_Linger_Commit>(this, info);
     if (!info->is_watch) {
       info->notify_id = 0;
       poutbl = &c->outbl;
     }
-    oncommit = c;
+    oncommit = [c = std::move(c)](bs::error_code ec) mutable {
+                std::move(*c)(ec);
+              };
   }
   watchl.unlock();
-  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
-                opv, info->target.flags | CEPH_OSD_FLAG_READ,
-                oncommit, info->pobjver);
+  auto o = new Op(info->target.base_oid, info->target.base_oloc,
+                 std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
+                 std::move(oncommit), info->pobjver);
   o->outbl = poutbl;
   o->snapid = info->snap;
   o->snapc = info->snapc;
@@ -566,33 +576,38 @@ void Objecter::_send_linger(LingerOp *info,
 
   // do not resend this; we will send a new op to reregister
   o->should_resend = false;
+  o->ctx_budgeted = true;
 
   if (info->register_tid) {
-    // repeat send.  cancel old registeration op, if any.
-    OSDSession::unique_lock sl(info->session->lock);
+    // repeat send.  cancel old registration op, if any.
+    std::unique_lock sl(info->session->lock);
     if (info->session->ops.count(info->register_tid)) {
-      Op *o = info->session->ops[info->register_tid];
+      auto o = info->session->ops[info->register_tid];
       _op_cancel_map_check(o);
       _cancel_linger_op(o);
     }
     sl.unlock();
-
-    _op_submit(o, sul, &info->register_tid);
-  } else {
-    // first send
-    _op_submit_with_budget(o, sul, &info->register_tid);
   }
 
+  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);
+
   logger->inc(l_osdc_linger_send);
 }
 
-void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
+void Objecter::_linger_commit(LingerOp *info, bs::error_code ec,
+                             cb::list& outbl)
 {
-  LingerOp::unique_lock wl(info->watch_lock);
+  std::unique_lock wl(info->watch_lock);
   ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
   if (info->on_reg_commit) {
-    info->on_reg_commit->complete(r);
-    info->on_reg_commit = NULL;
+    info->on_reg_commit->defer(std::move(info->on_reg_commit),
+                              ec, cb::list{});
+    info->on_reg_commit.reset();
+  }
+  if (ec && info->on_notify_finish) {
+    info->on_notify_finish->defer(std::move(info->on_notify_finish),
+                                 ec, cb::list{});
+    info->on_notify_finish.reset();
   }
 
   // only tell the user the first time we do this
@@ -601,65 +616,65 @@ void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
 
   if (!info->is_watch) {
     // make note of the notify_id
-    bufferlist::iterator p = outbl.begin();
+    auto p = outbl.cbegin();
     try {
-      ::decode(info->notify_id, p);
+      decode(info->notify_id, p);
       ldout(cct, 10) << "_linger_commit  notify_id=" << info->notify_id
                     << dendl;
     }
-    catch (buffer::error& e) {
+    catch (cb::error& e) {
     }
   }
 }
 
-struct C_DoWatchError : public Context {
+class CB_DoWatchError {
   Objecter *objecter;
-  Objecter::LingerOp *info;
-  int err;
-  C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
-    : objecter(o), info(i), err(r) {
-    info->get();
+  boost::intrusive_ptr<Objecter::LingerOp> info;
+  bs::error_code ec;
+public:
+  CB_DoWatchError(Objecter *o, Objecter::LingerOp *i,
+                 bs::error_code ec)
+    : objecter(o), info(i), ec(ec) {
     info->_queued_async();
   }
-  void finish(int r) override {
-    Objecter::unique_lock wl(objecter->rwlock);
+  void operator()() {
+    std::unique_lock wl(objecter->rwlock);
     bool canceled = info->canceled;
     wl.unlock();
 
     if (!canceled) {
-      info->watch_context->handle_error(info->get_cookie(), err);
+      info->handle(ec, 0, info->get_cookie(), 0, {});
     }
 
     info->finished_async();
-    info->put();
   }
 };
 
-int Objecter::_normalize_watch_error(int r)
+bs::error_code Objecter::_normalize_watch_error(bs::error_code ec)
 {
   // translate ENOENT -> ENOTCONN so that a delete->disconnection
-  // notification and a failure to reconnect becuase we raced with
+  // notification and a failure to reconnect because we raced with
   // the delete appear the same to the user.
-  if (r == -ENOENT)
-    r = -ENOTCONN;
-  return r;
+  if (ec == bs::errc::no_such_file_or_directory)
+    ec = bs::error_code(ENOTCONN, osd_category());
+  return ec;
 }
 
-void Objecter::_linger_reconnect(LingerOp *info, int r)
+void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec)
 {
-  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
+  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << ec 
                 << " (last_error " << info->last_error << ")" << dendl;
-  if (r < 0) {
-    LingerOp::unique_lock wl(info->watch_lock);
+  std::unique_lock wl(info->watch_lock);
+  if (ec) {
     if (!info->last_error) {
-      r = _normalize_watch_error(r);
-      info->last_error = r;
-      if (info->watch_context) {
-       finisher->queue(new C_DoWatchError(this, info, r));
+      ec = _normalize_watch_error(ec);
+      if (info->handle) {
+       boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
       }
     }
-    wl.unlock();
   }
+
+  info->last_error = ec;
 }
 
 void Objecter::_send_linger_ping(LingerOp *info)
@@ -677,48 +692,47 @@ void Objecter::_send_linger_ping(LingerOp *info)
     return;
   }
 
-  ceph::mono_time now = ceph::mono_clock::now();
+  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
   ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
                 << dendl;
 
-  vector<OSDOp> opv(1);
+  osdc_opvec opv(1);
   opv[0].op.op = CEPH_OSD_OP_WATCH;
   opv[0].op.watch.cookie = info->get_cookie();
   opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
   opv[0].op.watch.gen = info->register_gen;
-  C_Linger_Ping *onack = new C_Linger_Ping(this, info);
+
   Op *o = new Op(info->target.base_oid, info->target.base_oloc,
-                opv, info->target.flags | CEPH_OSD_FLAG_READ,
-                onack, NULL, NULL);
+                std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
+                CB_Linger_Ping(this, info, now),
+                nullptr, nullptr);
   o->target = info->target;
   o->should_resend = false;
   _send_op_account(o);
-  MOSDOp *m = _prepare_osd_op(o);
   o->tid = ++last_tid;
   _session_op_assign(info->session, o);
-  _send_op(o, m);
+  _send_op(o);
   info->ping_tid = o->tid;
 
-  onack->sent = now;
   logger->inc(l_osdc_linger_ping);
 }
 
-void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
+void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono_time sent,
                            uint32_t register_gen)
 {
-  LingerOp::unique_lock l(info->watch_lock);
+  std::unique_lock l(info->watch_lock);
   ldout(cct, 10) << __func__ << " " << info->linger_id
-                << " sent " << sent << " gen " << register_gen << " = " << r
+                << " sent " << sent << " gen " << register_gen << " = " << ec
                 << " (last_error " << info->last_error
                 << " register_gen " << info->register_gen << ")" << dendl;
   if (info->register_gen == register_gen) {
-    if (r == 0) {
+    if (!ec) {
       info->watch_valid_thru = sent;
-    } else if (r < 0 && !info->last_error) {
-      r = _normalize_watch_error(r);
-      info->last_error = r;
-      if (info->watch_context) {
-       finisher->queue(new C_DoWatchError(this, info, r));
+    } else if (ec && !info->last_error) {
+      ec = _normalize_watch_error(ec);
+      info->last_error = ec;
+      if (info->handle) {
+       boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
       }
     }
   } else {
@@ -726,23 +740,23 @@ void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
   }
 }
 
-int Objecter::linger_check(LingerOp *info)
+tl::expected<ceph::timespan,
+            bs::error_code> Objecter::linger_check(LingerOp *info)
 {
-  LingerOp::shared_lock l(info->watch_lock);
+  std::shared_lock l(info->watch_lock);
 
-  mono_time stamp = info->watch_valid_thru;
+  ceph::coarse_mono_time stamp = info->watch_valid_thru;
   if (!info->watch_pending_async.empty())
-    stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
-  auto age = mono_clock::now() - stamp;
+    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
+  auto age = ceph::coarse_mono_clock::now() - stamp;
 
   ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " err " << info->last_error
                 << " age " << age << dendl;
   if (info->last_error)
-    return info->last_error;
+    return tl::unexpected(info->last_error);
   // return a safe upper bound (we are truncating to ms)
-  return
-    1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
+  return age;
 }
 
 void Objecter::linger_cancel(LingerOp *info)
@@ -758,13 +772,13 @@ void Objecter::_linger_cancel(LingerOp *info)
   ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
   if (!info->canceled) {
     OSDSession *s = info->session;
-    OSDSession::unique_lock sl(s->lock);
+    std::unique_lock sl(s->lock);
     _session_linger_op_remove(s, info);
     sl.unlock();
 
     linger_ops.erase(info->linger_id);
     linger_ops_set.erase(info);
-    assert(linger_ops.size() == linger_ops_set.size());
+    ceph_assert(linger_ops.size() == linger_ops_set.size());
 
     info->canceled = true;
     info->put();
@@ -779,25 +793,22 @@ Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
                                              const object_locator_t& oloc,
                                              int flags)
 {
-  LingerOp *info = new LingerOp;
+  unique_lock l(rwlock);
+  // Acquire linger ID
+  auto info = new LingerOp(this, ++max_linger_id);
   info->target.base_oid = oid;
   info->target.base_oloc = oloc;
   if (info->target.base_oloc.key == oid)
     info->target.base_oloc.key.clear();
   info->target.flags = flags;
-  info->watch_valid_thru = mono_clock::now();
-
-  unique_lock l(rwlock);
-
-  // Acquire linger ID
-  info->linger_id = ++max_linger_id;
+  info->watch_valid_thru = ceph::coarse_mono_clock::now();
   ldout(cct, 10) << __func__ << " info " << info
                 << " linger_id " << info->linger_id
                 << " cookie " << info->get_cookie()
                 << dendl;
   linger_ops[info->linger_id] = info;
   linger_ops_set.insert(info);
-  assert(linger_ops.size() == linger_ops_set.size());
+  ceph_assert(linger_ops.size() == linger_ops_set.size());
 
   info->get(); // for the caller
   return info;
@@ -807,8 +818,8 @@ ceph_tid_t Objecter::linger_watch(LingerOp *info,
                                  ObjectOperation& op,
                                  const SnapContext& snapc,
                                  real_time mtime,
-                                 bufferlist& inbl,
-                                 Context *oncommit,
+                                 cb::list& inbl,
+                                 decltype(info->on_reg_commit)&& oncommit,
                                  version_t *objver)
 {
   info->is_watch = true;
@@ -817,52 +828,61 @@ ceph_tid_t Objecter::linger_watch(LingerOp *info,
   info->target.flags |= CEPH_OSD_FLAG_WRITE;
   info->ops = op.ops;
   info->inbl = inbl;
-  info->poutbl = NULL;
   info->pobjver = objver;
-  info->on_reg_commit = oncommit;
+  info->on_reg_commit = std::move(oncommit);
+
+  info->ctx_budget = take_linger_budget(info);
 
   shunique_lock sul(rwlock, ceph::acquire_unique);
   _linger_submit(info, sul);
   logger->inc(l_osdc_linger_active);
 
+  op.clear();
   return info->linger_id;
 }
 
 ceph_tid_t Objecter::linger_notify(LingerOp *info,
                                   ObjectOperation& op,
-                                  snapid_t snap, bufferlist& inbl,
-                                  bufferlist *poutbl,
-                                  Context *onfinish,
+                                  snapid_t snap, cb::list& inbl,
+                                  decltype(LingerOp::on_reg_commit)&& onfinish,
                                   version_t *objver)
 {
   info->snap = snap;
   info->target.flags |= CEPH_OSD_FLAG_READ;
   info->ops = op.ops;
   info->inbl = inbl;
-  info->poutbl = poutbl;
   info->pobjver = objver;
-  info->on_reg_commit = onfinish;
+  info->on_reg_commit = std::move(onfinish);
+  info->ctx_budget = take_linger_budget(info);
 
   shunique_lock sul(rwlock, ceph::acquire_unique);
   _linger_submit(info, sul);
   logger->inc(l_osdc_linger_active);
 
+  op.clear();
   return info->linger_id;
 }
 
-void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
+void Objecter::_linger_submit(LingerOp *info,
+                             ceph::shunique_lock<ceph::shared_mutex>& sul)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
-  assert(info->linger_id);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(info->linger_id);
+  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!
 
   // Populate Op::target
   OSDSession *s = NULL;
-  _calc_target(&info->target, nullptr);
+  int r = _calc_target(&info->target, nullptr);
+  switch (r) {
+  case RECALC_OP_TARGET_POOL_EIO:
+    _check_linger_pool_eio(info);
+    return;
+  }
 
   // Create LingerOp<->OSDSession relation
-  int r = _get_session(info->target.osd, &s, sul);
-  assert(r == 0);
-  OSDSession::unique_lock sl(s->lock);
+  r = _get_session(info->target.osd, &s, sul);
+  ceph_assert(r == 0);
+  unique_lock sl(s->lock);
   _session_linger_op_assign(s, info);
   sl.unlock();
   put_session(s);
@@ -870,18 +890,16 @@ void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
   _send_linger(info, sul);
 }
 
-struct C_DoWatchNotify : public Context {
+struct CB_DoWatchNotify {
   Objecter *objecter;
-  Objecter::LingerOp *info;
-  MWatchNotify *msg;
-  C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
+  boost::intrusive_ptr<Objecter::LingerOp> info;
+  boost::intrusive_ptr<MWatchNotify> msg;
+  CB_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
     : objecter(o), info(i), msg(m) {
-    info->get();
     info->_queued_async();
-    msg->get();
   }
-  void finish(int r) override {
-    objecter->_do_watch_notify(info, msg);
+  void operator()() {
+    objecter->_do_watch_notify(std::move(info), std::move(msg));
   }
 };
 
@@ -897,12 +915,13 @@ void Objecter::handle_watch_notify(MWatchNotify *m)
     ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
     return;
   }
-  LingerOp::unique_lock wl(info->watch_lock);
+  std::unique_lock wl(info->watch_lock);
   if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
     if (!info->last_error) {
-      info->last_error = -ENOTCONN;
-      if (info->watch_context) {
-       finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
+      info->last_error = bs::error_code(ENOTCONN, osd_category());
+      if (info->handle) {
+       boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
+                                                         info->last_error));
       }
     }
   } else if (!info->is_watch) {
@@ -914,24 +933,26 @@ void Objecter::handle_watch_notify(MWatchNotify *m)
       ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
                     << " != " << info->notify_id << ", ignoring" << dendl;
     } else if (info->on_notify_finish) {
-      info->notify_result_bl->claim(m->get_data());
-      info->on_notify_finish->complete(m->return_code);
+      info->on_notify_finish->defer(
+       std::move(info->on_notify_finish),
+       osdcode(m->return_code), std::move(m->get_data()));
 
       // if we race with reconnect we might get a second notify; only
       // notify the caller once!
-      info->on_notify_finish = NULL;
+      info->on_notify_finish = nullptr;
     }
   } else {
-    finisher->queue(new C_DoWatchNotify(this, info, m));
+    boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
   }
 }
 
-void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
+void Objecter::_do_watch_notify(boost::intrusive_ptr<LingerOp> info,
+                                boost::intrusive_ptr<MWatchNotify> m)
 {
   ldout(cct, 10) << __func__ << " " << *m << dendl;
 
   shared_lock l(rwlock);
-  assert(initialized);
+  ceph_assert(initialized);
 
   if (info->canceled) {
     l.unlock();
@@ -939,31 +960,25 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
   }
 
   // notify completion?
-  assert(info->is_watch);
-  assert(info->watch_context);
-  assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
+  ceph_assert(info->is_watch);
+  ceph_assert(info->handle);
+  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
 
   l.unlock();
 
   switch (m->opcode) {
   case CEPH_WATCH_EVENT_NOTIFY:
-    info->watch_context->handle_notify(m->notify_id, m->cookie,
-                                      m->notifier_gid, m->bl);
+    info->handle({}, m->notify_id, m->cookie, m->notifier_gid, std::move(m->bl));
     break;
   }
 
  out:
   info->finished_async();
-  info->put();
-  m->put();
 }
 
 bool Objecter::ms_dispatch(Message *m)
 {
   ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
-  if (!initialized)
-    return false;
-
   switch (m->get_type()) {
     // these we exlusively handle
   case CEPH_MSG_OSD_OPREPLY:
@@ -1009,26 +1024,27 @@ bool Objecter::ms_dispatch(Message *m)
   return false;
 }
 
-void Objecter::_scan_requests(OSDSession *s,
-                             bool force_resend,
-                             bool cluster_full,
-                             map<int64_t, bool> *pool_full_map,
-                             map<ceph_tid_t, Op*>& need_resend,
-                             list<LingerOp*>& need_resend_linger,
-                             map<ceph_tid_t, CommandOp*>& need_resend_command,
-                             shunique_lock& sul)
+void Objecter::_scan_requests(
+  OSDSession *s,
+  bool skipped_map,
+  bool cluster_full,
+  map<int64_t, bool> *pool_full_map,
+  map<ceph_tid_t, Op*>& need_resend,
+  list<LingerOp*>& need_resend_linger,
+  map<ceph_tid_t, CommandOp*>& need_resend_command,
+  ceph::shunique_lock<ceph::shared_mutex>& sul)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
 
   list<LingerOp*> unregister_lingers;
 
-  OSDSession::unique_lock sl(s->lock);
+  std::unique_lock sl(s->lock);
 
   // check for changed linger mappings (_before_ regular ops)
-  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
+  auto lp = s->linger_ops.begin();
   while (lp != s->linger_ops.end()) {
-    LingerOp *op = lp->second;
-    assert(op->session == s);
+    auto op = lp->second;
+    ceph_assert(op->session == s);
     // check_linger_pool_dne() may touch linger_ops; prevent iterator
     // invalidation
     ++lp;
@@ -1040,7 +1056,7 @@ void Objecter::_scan_requests(OSDSession *s,
        (*pool_full_map)[op->target.base_oloc.pool];
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
-      if (!force_resend && !force_resend_writes)
+      if (!skipped_map && !force_resend_writes)
        break;
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
@@ -1056,15 +1072,23 @@ void Objecter::_scan_requests(OSDSession *s,
        unregister_lingers.push_back(op);
       }
       break;
+    case RECALC_OP_TARGET_POOL_EIO:
+      _check_linger_pool_eio(op);
+      ldout(cct, 10) << " need to unregister linger op "
+                    << op->linger_id << dendl;
+      op->get();
+      unregister_lingers.push_back(op);
+      break;
     }
   }
 
   // check for changed request mappings
-  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
+  auto p = s->ops.begin();
   while (p != s->ops.end()) {
     Op *op = p->second;
     ++p;   // check_op_pool_dne() may touch ops; prevent iterator invalidation
     ldout(cct, 10) << " checking op " << op->tid << dendl;
+    _prune_snapc(osdmap->get_new_removed_snaps(), op);
     bool force_resend_writes = cluster_full;
     if (pool_full_map)
       force_resend_writes = force_resend_writes ||
@@ -1073,26 +1097,27 @@ void Objecter::_scan_requests(OSDSession *s,
                         op->session ? op->session->con.get() : nullptr);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
-      if (!force_resend && !(force_resend_writes && op->respects_full()))
+      if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
        break;
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
-      if (op->session) {
-       _session_op_remove(op->session, op);
-      }
+      _session_op_remove(op->session, op);
       need_resend[op->tid] = op;
       _op_cancel_map_check(op);
       break;
     case RECALC_OP_TARGET_POOL_DNE:
       _check_op_pool_dne(op, &sl);
       break;
+    case RECALC_OP_TARGET_POOL_EIO:
+      _check_op_pool_eio(op, &sl);
+      break;
     }
   }
 
   // commands
-  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
+  auto cp = s->command_ops.begin();
   while (cp != s->command_ops.end()) {
-    CommandOp *c = cp->second;
+    auto c = cp->second;
     ++cp;
     ldout(cct, 10) << " checking command " << c->tid << dendl;
     bool force_resend_writes = cluster_full;
@@ -1103,14 +1128,12 @@ void Objecter::_scan_requests(OSDSession *s,
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
       // resend if skipped map; otherwise do nothing.
-      if (!force_resend && !force_resend_writes)
+      if (!skipped_map && !force_resend_writes)
        break;
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
       need_resend_command[c->tid] = c;
-      if (c->session) {
-       _session_command_op_remove(c->session, c);
-      }
+      _session_command_op_remove(c->session, c);
       _command_cancel_map_check(c);
       break;
     case RECALC_OP_TARGET_POOL_DNE:
@@ -1123,7 +1146,7 @@ void Objecter::_scan_requests(OSDSession *s,
 
   sl.unlock();
 
-  for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
+  for (auto iter = unregister_lingers.begin();
        iter != unregister_lingers.end();
        ++iter) {
     _linger_cancel(*iter);
@@ -1133,11 +1156,11 @@ void Objecter::_scan_requests(OSDSession *s,
 
 void Objecter::handle_osd_map(MOSDMap *m)
 {
-  shunique_lock sul(rwlock, acquire_unique);
+  ceph::shunique_lock sul(rwlock, acquire_unique);
   if (!initialized)
     return;
 
-  assert(osdmap);
+  ceph_assert(osdmap);
 
   if (m->fsid != monc->get_fsid()) {
     ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
@@ -1150,8 +1173,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
   bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
     _osdmap_has_pool_full();
   map<int64_t, bool> pool_full_map;
-  for (map<int64_t, pg_pool_t>::const_iterator it
-        = osdmap->get_pools().begin();
+  for (auto it = osdmap->get_pools().begin();
        it != osdmap->get_pools().end(); ++it)
     pool_full_map[it->first] = _osdmap_pool_full(it->second);
 
@@ -1183,18 +1205,17 @@ void Objecter::handle_osd_map(MOSDMap *m)
          OSDMap::Incremental inc(m->incremental_maps[e]);
          osdmap->apply_incremental(inc);
 
-          emit_blacklist_events(inc);
+          emit_blocklist_events(inc);
 
          logger->inc(l_osdc_map_inc);
        }
        else if (m->maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
-          OSDMap *new_osdmap = new OSDMap();
+          auto new_osdmap = std::make_unique<OSDMap>();
           new_osdmap->decode(m->maps[e]);
 
-          emit_blacklist_events(*osdmap, *new_osdmap);
-
-          osdmap = new_osdmap;
+          emit_blocklist_events(*osdmap, *new_osdmap);
+          osdmap = std::move(new_osdmap);
 
          logger->inc(l_osdc_map_full);
        }
@@ -1214,16 +1235,20 @@ void Objecter::handle_osd_map(MOSDMap *m)
        }
        logger->set(l_osdc_map_epoch, osdmap->get_epoch());
 
+        prune_pg_mapping(osdmap->get_pools());
        cluster_full = cluster_full || _osdmap_full_flag();
        update_pool_full_map(pool_full_map);
 
        // check all outstanding requests on every epoch
+       for (auto& i : need_resend) {
+         _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
+       }
        _scan_requests(homeless_session, skipped_map, cluster_full,
                       &pool_full_map, need_resend,
                       need_resend_linger, need_resend_command, sul);
-       for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
+       for (auto p = osd_sessions.begin();
             p != osd_sessions.end(); ) {
-         OSDSession *s = p->second;
+         auto s = p->second;
          _scan_requests(s, skipped_map, cluster_full,
                         &pool_full_map, need_resend,
                         need_resend_linger, need_resend_command, sul);
@@ -1231,18 +1256,18 @@ void Objecter::handle_osd_map(MOSDMap *m)
          // osd down or addr change?
          if (!osdmap->is_up(s->osd) ||
              (s->con &&
-              s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
+              s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
            close_session(s);
          }
        }
 
-       assert(e == osdmap->get_epoch());
+       ceph_assert(e == osdmap->get_epoch());
       }
 
     } else {
       // first map.  we want the full thing.
       if (m->maps.count(m->get_last())) {
-       for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
+       for (auto p = osd_sessions.begin();
             p != osd_sessions.end(); ++p) {
          OSDSession *s = p->second;
          _scan_requests(s, false, false, NULL, need_resend,
@@ -1251,6 +1276,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
        ldout(cct, 3) << "handle_osd_map decoding full epoch "
                      << m->get_last() << dendl;
        osdmap->decode(m->maps[m->get_last()]);
+        prune_pg_mapping(osdmap->get_pools());
 
        _scan_requests(homeless_session, false, false, NULL,
                       need_resend, need_resend_linger,
@@ -1292,19 +1318,19 @@ void Objecter::handle_osd_map(MOSDMap *m)
   }
 
   // resend requests
-  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
+  for (auto p = need_resend.begin();
        p != need_resend.end(); ++p) {
-    Op *op = p->second;
-    OSDSession *s = op->session;
+    auto op = p->second;
+    auto s = op->session;
     bool mapped_session = false;
     if (!s) {
       int r = _map_session(&op->target, &s, sul);
-      assert(r == 0);
+      ceph_assert(r == 0);
       mapped_session = true;
     } else {
       get_session(s);
     }
-    OSDSession::unique_lock sl(s->lock);
+    std::unique_lock sl(s->lock);
     if (mapped_session) {
       _session_op_assign(s, op);
     }
@@ -1320,26 +1346,18 @@ void Objecter::handle_osd_map(MOSDMap *m)
     sl.unlock();
     put_session(s);
   }
-  for (list<LingerOp*>::iterator p = need_resend_linger.begin();
+  for (auto p = need_resend_linger.begin();
        p != need_resend_linger.end(); ++p) {
     LingerOp *op = *p;
-    if (!op->session) {
-      _calc_target(&op->target, nullptr);
-      OSDSession *s = NULL;
-      const int r = _get_session(op->target.osd, &s, sul);
-      assert(r == 0);
-      assert(s != NULL);
-      op->session = s;
-      put_session(s);
-    }
+    ceph_assert(op->session);
     if (!op->session->is_homeless()) {
       logger->inc(l_osdc_linger_resend);
       _send_linger(op, sul);
     }
   }
-  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
+  for (auto p = need_resend_command.begin();
        p != need_resend_command.end(); ++p) {
-    CommandOp *c = p->second;
+    auto c = p->second;
     if (c->target.osd >= 0) {
       _assign_command_session(c, sul);
       if (c->session && !c->session->is_homeless()) {
@@ -1351,14 +1369,12 @@ void Objecter::handle_osd_map(MOSDMap *m)
   _dump_active();
 
   // finish any Contexts that were waiting on a map update
-  map<epoch_t,list< pair< Context*, int > > >::iterator p =
-    waiting_for_map.begin();
+  auto p = waiting_for_map.begin();
   while (p != waiting_for_map.end() &&
         p->first <= osdmap->get_epoch()) {
     //go through the list and call the onfinish methods
-    for (list<pair<Context*, int> >::iterator i = p->second.begin();
-        i != p->second.end(); ++i) {
-      i->first->complete(i->second);
+    for (auto& [c, ec] : p->second) {
+      ca::post(std::move(c), ec);
     }
     waiting_for_map.erase(p++);
   }
@@ -1370,73 +1386,80 @@ void Objecter::handle_osd_map(MOSDMap *m)
   }
 }
 
-void Objecter::enable_blacklist_events()
+void Objecter::enable_blocklist_events()
 {
   unique_lock wl(rwlock);
 
-  blacklist_events_enabled = true;
+  blocklist_events_enabled = true;
 }
 
-void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
+void Objecter::consume_blocklist_events(std::set<entity_addr_t> *events)
 {
   unique_lock wl(rwlock);
 
   if (events->empty()) {
-    events->swap(blacklist_events);
+    events->swap(blocklist_events);
   } else {
-    for (const auto &i : blacklist_events) {
+    for (const auto &i : blocklist_events) {
       events->insert(i);
     }
-    blacklist_events.clear();
+    blocklist_events.clear();
   }
 }
 
-void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
+void Objecter::emit_blocklist_events(const OSDMap::Incremental &inc)
 {
-  if (!blacklist_events_enabled) {
+  if (!blocklist_events_enabled) {
     return;
   }
 
-  for (const auto &i : inc.new_blacklist) {
-    blacklist_events.insert(i.first);
+  for (const auto &i : inc.new_blocklist) {
+    blocklist_events.insert(i.first);
   }
 }
 
-void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
+void Objecter::emit_blocklist_events(const OSDMap &old_osd_map,
                                      const OSDMap &new_osd_map)
 {
-  if (!blacklist_events_enabled) {
+  if (!blocklist_events_enabled) {
     return;
   }
 
   std::set<entity_addr_t> old_set;
   std::set<entity_addr_t> new_set;
+  std::set<entity_addr_t> old_range_set;
+  std::set<entity_addr_t> new_range_set;
 
-  old_osd_map.get_blacklist(&old_set);
-  new_osd_map.get_blacklist(&new_set);
+  old_osd_map.get_blocklist(&old_set, &old_range_set);
+  new_osd_map.get_blocklist(&new_set, &new_range_set);
 
   std::set<entity_addr_t> delta_set;
   std::set_difference(
       new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
       std::inserter(delta_set, delta_set.begin()));
-  blacklist_events.insert(delta_set.begin(), delta_set.end());
+  std::set_difference(
+      new_range_set.begin(), new_range_set.end(),
+      old_range_set.begin(), old_range_set.end(),
+      std::inserter(delta_set, delta_set.begin()));
+  blocklist_events.insert(delta_set.begin(), delta_set.end());
 }
 
 // op pool check
 
-void Objecter::C_Op_Map_Latest::finish(int r)
+void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e,
+                                           version_t latest, version_t)
 {
-  if (r == -EAGAIN || r == -ECANCELED)
+  if (e == bs::errc::resource_unavailable_try_again ||
+      e == bs::errc::operation_canceled)
     return;
 
   lgeneric_subdout(objecter->cct, objecter, 10)
-    << "op_map_latest r=" << r << " tid=" << tid
+    << "op_map_latest r=" << e << " tid=" << tid
     << " latest " << latest << dendl;
 
-  Objecter::unique_lock wl(objecter->rwlock);
+  unique_lock wl(objecter->rwlock);
 
-  map<ceph_tid_t, Op*>::iterator iter =
-    objecter->check_latest_map_ops.find(tid);
+  auto iter = objecter->check_latest_map_ops.find(tid);
   if (iter == objecter->check_latest_map_ops.end()) {
     lgeneric_subdout(objecter->cct, objecter, 10)
       << "op_map_latest op "<< tid << " not found" << dendl;
@@ -1452,7 +1475,7 @@ void Objecter::C_Op_Map_Latest::finish(int r)
   if (op->map_dne_bound == 0)
     op->map_dne_bound = latest;
 
-  OSDSession::unique_lock sl(op->session->lock, defer_lock);
+  unique_lock sl(op->session->lock, defer_lock);
   objecter->_check_op_pool_dne(op, &sl);
 
   op->put();
@@ -1506,7 +1529,7 @@ int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
   const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
   if (!pi)
     return -ENOENT;
-  for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
+  for (auto p = pi->snaps.begin();
        p != pi->snaps.end();
        ++p) {
     snaps->push_back(p->first);
@@ -1515,7 +1538,7 @@ int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
 }
 
 // sl may be unlocked.
-void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
+void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl)
 {
   // rwlock is locked unique
 
@@ -1538,14 +1561,15 @@ void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
       ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                     << " concluding pool " << op->target.base_pgid.pool()
                     << " dne" << dendl;
-      if (op->onfinish) {
-       op->onfinish->complete(-ENOENT);
+      if (op->has_completion()) {
+       num_in_flight--;
+       op->complete(osdc_errc::pool_dne, -ENOENT);
       }
 
       OSDSession *s = op->session;
       if (s) {
-       assert(s != NULL);
-       assert(sl->mutex() == &s->lock);
+       ceph_assert(s != NULL);
+       ceph_assert(sl->mutex() == &s->lock);
        bool session_locked = sl->owns_lock();
        if (!session_locked) {
          sl->lock();
@@ -1563,6 +1587,37 @@ void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
   }
 }
 
+// sl may be unlocked.
+void Objecter::_check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *sl)
+{
+  // rwlock is locked unique
+
+  // we had a new enough map
+  ldout(cct, 10) << "check_op_pool_eio tid " << op->tid
+                << " concluding pool " << op->target.base_pgid.pool()
+                << " has eio" << dendl;
+  if (op->has_completion()) {
+    num_in_flight--;
+    op->complete(osdc_errc::pool_eio, -EIO);
+  }
+
+  OSDSession *s = op->session;
+  if (s) {
+    ceph_assert(s != NULL);
+    ceph_assert(sl->mutex() == &s->lock);
+    bool session_locked = sl->owns_lock();
+    if (!session_locked) {
+      sl->lock();
+    }
+    _finish_op(op, 0);
+    if (!session_locked) {
+      sl->unlock();
+    }
+  } else {
+    _finish_op(op, 0); // no session
+  }
+}
+
 void Objecter::_send_op_map_check(Op *op)
 {
   // rwlock is locked unique
@@ -1570,16 +1625,14 @@ void Objecter::_send_op_map_check(Op *op)
   if (check_latest_map_ops.count(op->tid) == 0) {
     op->get();
     check_latest_map_ops[op->tid] = op;
-    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
   }
 }
 
 void Objecter::_op_cancel_map_check(Op *op)
 {
   // rwlock is locked unique
-  map<ceph_tid_t, Op*>::iterator iter =
-    check_latest_map_ops.find(op->tid);
+  auto iter = check_latest_map_ops.find(op->tid);
   if (iter != check_latest_map_ops.end()) {
     Op *op = iter->second;
     op->put();
@@ -1589,22 +1642,24 @@ void Objecter::_op_cancel_map_check(Op *op)
 
 // linger pool check
 
-void Objecter::C_Linger_Map_Latest::finish(int r)
+void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e,
+                                               version_t latest,
+                                               version_t)
 {
-  if (r == -EAGAIN || r == -ECANCELED) {
+  if (e == bs::errc::resource_unavailable_try_again ||
+      e == bs::errc::operation_canceled) {
     // ignore callback; we will retry in resend_mon_ops()
     return;
   }
 
   unique_lock wl(objecter->rwlock);
 
-  map<uint64_t, LingerOp*>::iterator iter =
-    objecter->check_latest_map_lingers.find(linger_id);
+  auto iter = objecter->check_latest_map_lingers.find(linger_id);
   if (iter == objecter->check_latest_map_lingers.end()) {
     return;
   }
 
-  LingerOp *op = iter->second;
+  auto op = iter->second;
   objecter->check_latest_map_lingers.erase(iter);
 
   if (op->map_dne_bound == 0)
@@ -1639,8 +1694,16 @@ void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
   }
   if (op->map_dne_bound > 0) {
     if (osdmap->get_epoch() >= op->map_dne_bound) {
+      std::unique_lock wl{op->watch_lock};
       if (op->on_reg_commit) {
-       op->on_reg_commit->complete(-ENOENT);
+       op->on_reg_commit->defer(std::move(op->on_reg_commit),
+                                osdc_errc::pool_dne, cb::list{});
+       op->on_reg_commit = nullptr;
+      }
+      if (op->on_notify_finish) {
+       op->on_notify_finish->defer(std::move(op->on_notify_finish),
+                                   osdc_errc::pool_dne, cb::list{});
+        op->on_notify_finish = nullptr;
       }
       *need_unregister = true;
     }
@@ -1649,14 +1712,30 @@ void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
   }
 }
 
+void Objecter::_check_linger_pool_eio(LingerOp *op)
+{
+  // rwlock is locked unique
+
+  std::unique_lock wl{op->watch_lock};
+  if (op->on_reg_commit) {
+    op->on_reg_commit->defer(std::move(op->on_reg_commit),
+                            osdc_errc::pool_dne, cb::list{});
+    op->on_reg_commit = nullptr;
+  }
+  if (op->on_notify_finish) {
+    op->on_notify_finish->defer(std::move(op->on_notify_finish),
+                               osdc_errc::pool_dne, cb::list{});
+    op->on_notify_finish = nullptr;
+  }
+}
+
 void Objecter::_send_linger_map_check(LingerOp *op)
 {
   // ask the monitor
   if (check_latest_map_lingers.count(op->linger_id) == 0) {
     op->get();
     check_latest_map_lingers[op->linger_id] = op;
-    C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
   }
 }
 
@@ -1664,8 +1743,7 @@ void Objecter::_linger_cancel_map_check(LingerOp *op)
 {
   // rwlock is locked unique
 
-  map<uint64_t, LingerOp*>::iterator iter =
-    check_latest_map_lingers.find(op->linger_id);
+  auto iter = check_latest_map_lingers.find(op->linger_id);
   if (iter != check_latest_map_lingers.end()) {
     LingerOp *op = iter->second;
     op->put();
@@ -1675,28 +1753,31 @@ void Objecter::_linger_cancel_map_check(LingerOp *op)
 
 // command pool check
 
-void Objecter::C_Command_Map_Latest::finish(int r)
+void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e,
+                                                version_t latest, version_t)
 {
-  if (r == -EAGAIN || r == -ECANCELED) {
+  if (e == bs::errc::resource_unavailable_try_again ||
+      e == bs::errc::operation_canceled) {
     // ignore callback; we will retry in resend_mon_ops()
     return;
   }
 
   unique_lock wl(objecter->rwlock);
 
-  map<uint64_t, CommandOp*>::iterator iter =
-    objecter->check_latest_map_commands.find(tid);
+  auto iter = objecter->check_latest_map_commands.find(tid);
   if (iter == objecter->check_latest_map_commands.end()) {
     return;
   }
 
-  CommandOp *c = iter->second;
+  auto c = iter->second;
   objecter->check_latest_map_commands.erase(iter);
 
   if (c->map_dne_bound == 0)
     c->map_dne_bound = latest;
 
+  unique_lock sul(c->session->lock);
   objecter->_check_command_map_dne(c);
+  sul.unlock();
 
   c->put();
 }
@@ -1704,6 +1785,7 @@ void Objecter::C_Command_Map_Latest::finish(int r)
 void Objecter::_check_command_map_dne(CommandOp *c)
 {
   // rwlock is locked unique
+  // session is locked unique
 
   ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
                 << " current " << osdmap->get_epoch()
@@ -1711,7 +1793,8 @@ void Objecter::_check_command_map_dne(CommandOp *c)
                 << dendl;
   if (c->map_dne_bound > 0) {
     if (osdmap->get_epoch() >= c->map_dne_bound) {
-      _finish_command(c, c->map_check_error, c->map_check_error_str);
+      _finish_command(c, osdcode(c->map_check_error),
+                     std::move(c->map_check_error_str), {});
     }
   } else {
     _send_command_map_check(c);
@@ -1721,13 +1804,13 @@ void Objecter::_check_command_map_dne(CommandOp *c)
 void Objecter::_send_command_map_check(CommandOp *c)
 {
   // rwlock is locked unique
+  // session is locked unique
 
   // ask the monitor
   if (check_latest_map_commands.count(c->tid) == 0) {
     c->get();
     check_latest_map_commands[c->tid] = c;
-    C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
-    monc->get_version("osdmap", &f->latest, NULL, f);
+    monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
   }
 }
 
@@ -1735,10 +1818,9 @@ void Objecter::_command_cancel_map_check(CommandOp *c)
 {
   // rwlock is locked uniqe
 
-  map<uint64_t, CommandOp*>::iterator iter =
-    check_latest_map_commands.find(c->tid);
+  auto iter = check_latest_map_commands.find(c->tid);
   if (iter != check_latest_map_commands.end()) {
-    CommandOp *c = iter->second;
+    auto c = iter->second;
     c->put();
     check_latest_map_commands.erase(iter);
   }
@@ -1751,9 +1833,10 @@ void Objecter::_command_cancel_map_check(CommandOp *c)
  * @returns 0 on success, or -EAGAIN if the lock context requires
  * promotion to write.
  */
-int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
+int Objecter::_get_session(int osd, OSDSession **session,
+                          shunique_lock<ceph::shared_mutex>& sul)
 {
-  assert(sul && sul.mutex() == &rwlock);
+  ceph_assert(sul && sul.mutex() == &rwlock);
 
   if (osd < 0) {
     *session = homeless_session;
@@ -1762,9 +1845,9 @@ int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
     return 0;
   }
 
-  map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
+  auto p = osd_sessions.find(osd);
   if (p != osd_sessions.end()) {
-    OSDSession *s = p->second;
+    auto s = p->second;
     s->get();
     *session = s;
     ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
@@ -1774,10 +1857,10 @@ int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
   if (!sul.owns_lock()) {
     return -EAGAIN;
   }
-  OSDSession *s = new OSDSession(cct, osd);
+  auto s = new OSDSession(cct, osd);
   osd_sessions[osd] = s;
-  s->con = messenger->get_connection(osdmap->get_inst(osd));
-  s->con->set_priv(s->get());
+  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
+  s->con->set_priv(RefCountedPtr{s});
   logger->inc(l_osdc_osd_session_open);
   logger->set(l_osdc_osd_sessions, osd_sessions.size());
   s->get();
@@ -1798,7 +1881,7 @@ void Objecter::put_session(Objecter::OSDSession *s)
 
 void Objecter::get_session(Objecter::OSDSession *s)
 {
-  assert(s != NULL);
+  ceph_assert(s != NULL);
 
   if (!s->is_homeless()) {
     ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
@@ -1809,18 +1892,19 @@ void Objecter::get_session(Objecter::OSDSession *s)
 
 void Objecter::_reopen_session(OSDSession *s)
 {
+  // rwlock is locked unique
   // s->lock is locked
 
-  entity_inst_t inst = osdmap->get_inst(s->osd);
+  auto addrs = osdmap->get_addrs(s->osd);
   ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
-                << inst << dendl;
+                << addrs << dendl;
   if (s->con) {
     s->con->set_priv(NULL);
     s->con->mark_down();
     logger->inc(l_osdc_osd_session_close);
   }
-  s->con = messenger->get_connection(inst);
-  s->con->set_priv(s->get());
+  s->con = messenger->connect_to_osd(addrs);
+  s->con->set_priv(RefCountedPtr{s});
   s->incarnation++;
   logger->inc(l_osdc_osd_session_open);
 }
@@ -1835,28 +1919,28 @@ void Objecter::close_session(OSDSession *s)
     s->con->mark_down();
     logger->inc(l_osdc_osd_session_close);
   }
-  OSDSession::unique_lock sl(s->lock);
+  unique_lock sl(s->lock);
 
   std::list<LingerOp*> homeless_lingers;
   std::list<CommandOp*> homeless_commands;
   std::list<Op*> homeless_ops;
 
   while (!s->linger_ops.empty()) {
-    std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
+    auto i = s->linger_ops.begin();
     ldout(cct, 10) << " linger_op " << i->first << dendl;
     homeless_lingers.push_back(i->second);
     _session_linger_op_remove(s, i->second);
   }
 
   while (!s->ops.empty()) {
-    std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
+    auto i = s->ops.begin();
     ldout(cct, 10) << " op " << i->first << dendl;
     homeless_ops.push_back(i->second);
     _session_op_remove(s, i->second);
   }
 
   while (!s->command_ops.empty()) {
-    std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
+    auto i = s->command_ops.begin();
     ldout(cct, 10) << " command_op " << i->first << dendl;
     homeless_commands.push_back(i->second);
     _session_command_op_remove(s, i->second);
@@ -1868,16 +1952,16 @@ void Objecter::close_session(OSDSession *s)
 
   // Assign any leftover ops to the homeless session
   {
-    OSDSession::unique_lock hsl(homeless_session->lock);
-    for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
+    unique_lock hsl(homeless_session->lock);
+    for (auto i = homeless_lingers.begin();
         i != homeless_lingers.end(); ++i) {
       _session_linger_op_assign(homeless_session, *i);
     }
-    for (std::list<Op*>::iterator i = homeless_ops.begin();
+    for (auto i = homeless_ops.begin();
         i != homeless_ops.end(); ++i) {
       _session_op_assign(homeless_session, *i);
     }
-    for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
+    for (auto i = homeless_commands.begin();
         i != homeless_commands.end(); ++i) {
       _session_command_op_assign(homeless_session, *i);
     }
@@ -1886,71 +1970,37 @@ void Objecter::close_session(OSDSession *s)
   logger->set(l_osdc_osd_sessions, osd_sessions.size());
 }
 
-void Objecter::wait_for_osd_map()
+void Objecter::wait_for_osd_map(epoch_t e)
 {
   unique_lock l(rwlock);
-  if (osdmap->get_epoch()) {
+  if (osdmap->get_epoch() >= e) {
     l.unlock();
     return;
   }
 
-  // Leave this since it goes with C_SafeCond
-  Mutex lock("");
-  Cond cond;
-  bool done;
-  lock.Lock();
-  C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
-  waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
+  ca::waiter<bs::error_code> w;
+  waiting_for_map[e].emplace_back(OpCompletion::create(
+                                   service.get_executor(),
+                                   w.ref()),
+                                 bs::error_code{});
   l.unlock();
-  while (!done)
-    cond.Wait(lock);
-  lock.Unlock();
-}
-
-struct C_Objecter_GetVersion : public Context {
-  Objecter *objecter;
-  uint64_t oldest, newest;
-  Context *fin;
-  C_Objecter_GetVersion(Objecter *o, Context *c)
-    : objecter(o), oldest(0), newest(0), fin(c) {}
-  void finish(int r) override {
-    if (r >= 0) {
-      objecter->get_latest_version(oldest, newest, fin);
-    } else if (r == -EAGAIN) { // try again as instructed
-      objecter->wait_for_latest_osdmap(fin);
-    } else {
-      // it doesn't return any other error codes!
-      ceph_abort();
-    }
-  }
-};
-
-void Objecter::wait_for_latest_osdmap(Context *fin)
-{
-  ldout(cct, 10) << __func__ << dendl;
-  C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
-  monc->get_version("osdmap", &c->newest, &c->oldest, c);
-}
-
-void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
-{
-  unique_lock wl(rwlock);
-  _get_latest_version(oldest, newest, fin);
+  w.wait();
 }
 
 void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
-                                  Context *fin)
+                                  std::unique_ptr<OpCompletion> fin,
+                                  std::unique_lock<ceph::shared_mutex>&& l)
 {
-  // rwlock is locked unique
+  ceph_assert(fin);
   if (osdmap->get_epoch() >= newest) {
-  ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
-    if (fin)
-      fin->complete(0);
-    return;
+    ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
+    l.unlock();
+    ca::defer(std::move(fin), bs::error_code{});
+  } else {
+    ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
+    _wait_for_new_map(std::move(fin), newest, bs::error_code{});
+    l.unlock();
   }
-
-  ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
-  _wait_for_new_map(fin, newest, 0);
 }
 
 void Objecter::maybe_request_map()
@@ -1979,10 +2029,11 @@ void Objecter::_maybe_request_map()
   }
 }
 
-void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
+void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
+                                bs::error_code ec)
 {
   // rwlock is locked unique
-  waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
+  waiting_for_map[epoch].emplace_back(std::move(c), ec);
   _maybe_request_map();
 }
 
@@ -2007,30 +2058,6 @@ bool Objecter::have_map(const epoch_t epoch)
   }
 }
 
-bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
-{
-  unique_lock wl(rwlock);
-  if (osdmap->get_epoch() >= epoch) {
-    return true;
-  }
-  _wait_for_new_map(c, epoch, err);
-  return false;
-}
-
-void Objecter::kick_requests(OSDSession *session)
-{
-  ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
-
-  map<uint64_t, LingerOp *> lresend;
-  unique_lock wl(rwlock);
-
-  OSDSession::unique_lock sl(session->lock);
-  _kick_requests(session, lresend);
-  sl.unlock();
-
-  _linger_ops_resend(lresend, wl);
-}
-
 void Objecter::_kick_requests(OSDSession *session,
                              map<uint64_t, LingerOp *>& lresend)
 {
@@ -2042,11 +2069,9 @@ void Objecter::_kick_requests(OSDSession *session,
 
   // resend ops
   map<ceph_tid_t,Op*> resend;  // resend in tid order
-  for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
-       p != session->ops.end();) {
+  for (auto p = session->ops.begin(); p != session->ops.end();) {
     Op *op = p->second;
     ++p;
-    logger->inc(l_osdc_op_resend);
     if (op->should_resend) {
       if (!op->target.paused)
        resend[op->tid] = op;
@@ -2056,26 +2081,27 @@ void Objecter::_kick_requests(OSDSession *session,
     }
   }
 
+  logger->inc(l_osdc_op_resend, resend.size());
   while (!resend.empty()) {
     _send_op(resend.begin()->second);
     resend.erase(resend.begin());
   }
 
   // resend lingers
-  for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
+  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
+  for (auto j = session->linger_ops.begin();
        j != session->linger_ops.end(); ++j) {
     LingerOp *op = j->second;
     op->get();
-    logger->inc(l_osdc_linger_resend);
-    assert(lresend.count(j->first) == 0);
+    ceph_assert(lresend.count(j->first) == 0);
     lresend[j->first] = op;
   }
 
   // resend commands
+  logger->inc(l_osdc_command_resend, session->command_ops.size());
   map<uint64_t,CommandOp*> cresend;  // resend in order
-  for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
+  for (auto k = session->command_ops.begin();
        k != session->command_ops.end(); ++k) {
-    logger->inc(l_osdc_command_resend);
     cresend[k->first] = k->second;
   }
   while (!cresend.empty()) {
@@ -2085,9 +2111,9 @@ void Objecter::_kick_requests(OSDSession *session,
 }
 
 void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
-                                 unique_lock& ul)
+                                 unique_lock<ceph::shared_mutex>& ul)
 {
-  assert(ul.owns_lock());
+  ceph_assert(ul.owns_lock());
   shunique_lock sul(std::move(ul));
   while (!lresend.empty()) {
     LingerOp *op = lresend.begin()->second;
@@ -2097,12 +2123,12 @@ void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
     op->put();
     lresend.erase(lresend.begin());
   }
-  ul = unique_lock(sul.release_to_unique());
+  ul = sul.release_to_unique();
 }
 
 void Objecter::start_tick()
 {
-  assert(tick_event == 0);
+  ceph_assert(tick_event == 0);
   tick_event =
     timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
                    &Objecter::tick, this);
@@ -2127,21 +2153,19 @@ void Objecter::tick()
 
 
   // look for laggy requests
-  auto cutoff = ceph::mono_clock::now();
+  auto cutoff = ceph::coarse_mono_clock::now();
   cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout
 
   unsigned laggy_ops = 0;
 
-  for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
+  for (auto siter = osd_sessions.begin();
        siter != osd_sessions.end(); ++siter) {
-    OSDSession *s = siter->second;
-    OSDSession::lock_guard l(s->lock);
+    auto s = siter->second;
+    scoped_lock l(s->lock);
     bool found = false;
-    for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
-       p != s->ops.end();
-       ++p) {
-      Op *op = p->second;
-      assert(op->session);
+    for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
+      auto op = p->second;
+      ceph_assert(op->session);
       if (op->stamp < cutoff) {
        ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
                      << " is laggy" << dendl;
@@ -2149,23 +2173,23 @@ void Objecter::tick()
        ++laggy_ops;
       }
     }
-    for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
+    for (auto p = s->linger_ops.begin();
        p != s->linger_ops.end();
        ++p) {
-      LingerOp *op = p->second;
-      LingerOp::unique_lock wl(op->watch_lock);
-      assert(op->session);
+      auto op = p->second;
+      std::unique_lock wl(op->watch_lock);
+      ceph_assert(op->session);
       ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
       found = true;
       if (op->is_watch && op->registered && !op->last_error)
        _send_linger_ping(op);
     }
-    for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
+    for (auto p = s->command_ops.begin();
        p != s->command_ops.end();
        ++p) {
-      CommandOp *op = p->second;
-      assert(op->session);
+      auto op = p->second;
+      ceph_assert(op->session);
       ldout(cct, 10) << " pinging osd that serves command tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
       found = true;
@@ -2183,14 +2207,12 @@ void Objecter::tick()
   if (!toping.empty()) {
     // send a ping to these osds, to ensure we detect any session resets
     // (osd reply message policy is lossy)
-    for (set<OSDSession*>::const_iterator i = toping.begin();
-        i != toping.end();
-        ++i) {
+    for (auto i = toping.begin(); i != toping.end(); ++i) {
       (*i)->con->send_message(new MPing);
     }
   }
 
-  // Make sure we don't resechedule if we wake up after shutdown
+  // Make sure we don't reschedule if we wake up after shutdown
   if (initialized) {
     tick_event = timer.reschedule_me(ceph::make_timespan(
                                       cct->_conf->objecter_tick_interval));
@@ -2203,48 +2225,37 @@ void Objecter::resend_mon_ops()
 
   ldout(cct, 10) << "resend_mon_ops" << dendl;
 
-  for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
-       p != poolstat_ops.end();
-       ++p) {
+  for (auto p = poolstat_ops.begin(); p != poolstat_ops.end(); ++p) {
     _poolstat_submit(p->second);
     logger->inc(l_osdc_poolstat_resend);
   }
 
-  for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
-       p != statfs_ops.end();
-       ++p) {
+  for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
     _fs_stats_submit(p->second);
     logger->inc(l_osdc_statfs_resend);
   }
 
-  for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
-       p != pool_ops.end();
-       ++p) {
+  for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
     _pool_op_submit(p->second);
     logger->inc(l_osdc_poolop_resend);
   }
 
-  for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
+  for (auto p = check_latest_map_ops.begin();
        p != check_latest_map_ops.end();
        ++p) {
-    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
   }
 
-  for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
+  for (auto p = check_latest_map_lingers.begin();
        p != check_latest_map_lingers.end();
        ++p) {
-    C_Linger_Map_Latest *c
-      = new C_Linger_Map_Latest(this, p->second->linger_id);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
   }
 
-  for (map<uint64_t, CommandOp*>::iterator p
-        = check_latest_map_commands.begin();
+  for (auto p = check_latest_map_commands.begin();
        p != check_latest_map_commands.end();
        ++p) {
-    C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
   }
 }
 
@@ -2260,15 +2271,16 @@ void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
   _op_submit_with_budget(op, rl, ptid, ctx_budget);
 }
 
-void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
+void Objecter::_op_submit_with_budget(Op *op,
+                                     shunique_lock<ceph::shared_mutex>& sul,
                                      ceph_tid_t *ptid,
                                      int *ctx_budget)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
-  assert(op->ops.size() == op->out_bl.size());
-  assert(op->ops.size() == op->out_rval.size());
-  assert(op->ops.size() == op->out_handler.size());
+  ceph_assert(op->ops.size() == op->out_bl.size());
+  ceph_assert(op->ops.size() == op->out_rval.size());
+  ceph_assert(op->ops.size() == op->out_handler.size());
 
   // throttle.  before we look at any state, because
   // _take_op_budget() may drop our lock while it blocks.
@@ -2298,7 +2310,7 @@ void Objecter::_send_op_account(Op *op)
   inflight_ops++;
 
   // add to gather set(s)
-  if (op->onfinish) {
+  if (op->has_completion()) {
     num_in_flight++;
   } else {
     ldout(cct, 20) << " note: not requesting reply" << dendl;
@@ -2306,6 +2318,7 @@ void Objecter::_send_op_account(Op *op)
 
   logger->inc(l_osdc_op_active);
   logger->inc(l_osdc_op);
+  logger->inc(l_osdc_oplen_avg, op->ops.size());
 
   if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
       (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
@@ -2318,7 +2331,7 @@ void Objecter::_send_op_account(Op *op)
   if (op->target.flags & CEPH_OSD_FLAG_PGOP)
     logger->inc(l_osdc_op_pg);
 
-  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
+  for (auto p = op->ops.begin(); p != op->ops.end(); ++p) {
     int code = l_osdc_osdop_other;
     switch (p->op.op) {
     case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
@@ -2338,9 +2351,6 @@ void Objecter::_send_op_account(Op *op)
     case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
     case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
     case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
-    case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
-    case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
-    case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;
 
     // OMAP read operations
     case CEPH_OSD_OP_OMAPGETVALS:
@@ -2366,23 +2376,34 @@ void Objecter::_send_op_account(Op *op)
   }
 }
 
-void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
+void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
 {
   // rwlock is locked
 
   ldout(cct, 10) << __func__ << " op " << op << dendl;
 
   // pick target
-  assert(op->session == NULL);
+  ceph_assert(op->session == NULL);
   OSDSession *s = NULL;
 
-  bool check_for_latest_map = _calc_target(&op->target, nullptr)
-    == RECALC_OP_TARGET_POOL_DNE;
+  bool check_for_latest_map = false;
+  int r = _calc_target(&op->target, nullptr);
+  switch(r) {
+  case RECALC_OP_TARGET_POOL_DNE:
+    check_for_latest_map = true;
+    break;
+  case RECALC_OP_TARGET_POOL_EIO:
+    if (op->has_completion()) {
+      op->complete(osdc_errc::pool_eio, -EIO);
+    }
+    return;
+  }
 
   // Try to get a session, including a retry if we need to take write lock
-  int r = _get_session(op->target.osd, &s, sul);
+  r = _get_session(op->target.osd, &s, sul);
   if (r == -EAGAIN ||
-      (check_for_latest_map && sul.owns_lock_shared())) {
+      (check_for_latest_map && sul.owns_lock_shared()) ||
+      cct->_conf->objecter_debug_inject_relock_delay) {
     epoch_t orig_epoch = osdmap->get_epoch();
     sul.unlock();
     if (cct->_conf->objecter_debug_inject_relock_delay) {
@@ -2403,47 +2424,22 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
     }
   }
   if (r == -EAGAIN) {
-    assert(s == NULL);
+    ceph_assert(s == NULL);
     r = _get_session(op->target.osd, &s, sul);
   }
-  assert(r == 0);
-  assert(s);  // may be homeless
+  ceph_assert(r == 0);
+  ceph_assert(s);  // may be homeless
 
   _send_op_account(op);
 
   // send?
 
-  assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
-
-  if (osdmap_full_try) {
-    op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
-  }
+  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
 
   bool need_send = false;
-
-  if (osdmap->get_epoch() < epoch_barrier) {
-    ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
+  if (op->target.paused) {
+    ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
                   << dendl;
-    op->target.paused = true;
-    _maybe_request_map();
-  } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
-             osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
-    ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
-                  << dendl;
-    op->target.paused = true;
-    _maybe_request_map();
-  } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
-            osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
-    ldout(cct, 10) << " paused read " << op << " tid " << op->tid
-                  << dendl;
-    op->target.paused = true;
-    _maybe_request_map();
-  } else if (op->respects_full() &&
-            (_osdmap_full_flag() ||
-             _osdmap_pool_full(op->target.base_oloc.pool))) {
-    ldout(cct, 0) << " FULL, paused modify " << op << " tid "
-                 << op->tid << dendl;
-    op->target.paused = true;
     _maybe_request_map();
   } else if (!s->is_homeless()) {
     need_send = true;
@@ -2451,12 +2447,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
     _maybe_request_map();
   }
 
-  MOSDOp *m = NULL;
-  if (need_send) {
-    m = _prepare_osd_op(op);
-  }
-
-  OSDSession::unique_lock sl(s->lock);
+  unique_lock sl(s->lock);
   if (op->tid == 0)
     op->tid = ++last_tid;
 
@@ -2469,7 +2460,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
   _session_op_assign(s, op);
 
   if (need_send) {
-    _send_op(op, m);
+    _send_op(op);
   }
 
   // Last chance to touch Op here, after giving up session lock it can
@@ -2490,30 +2481,31 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
 
 int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
-  OSDSession::unique_lock sl(s->lock);
+  unique_lock sl(s->lock);
 
-  map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
+  auto p = s->ops.find(tid);
   if (p == s->ops.end()) {
     ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
                   << s->osd << dendl;
     return -ENOENT;
   }
 
+#if 0
   if (s->con) {
-    ldout(cct, 20) << " revoking rx buffer for " << tid
+    ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
                   << " on " << s->con << dendl;
     s->con->revoke_rx_buffer(tid);
   }
+#endif
 
   ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
                 << dendl;
   Op *op = p->second;
-  if (op->onfinish) {
+  if (op->has_completion()) {
     num_in_flight--;
-    op->onfinish->complete(r);
-    op->onfinish = NULL;
+    op->complete(osdcode(r), r);
   }
   _op_cancel_map_check(op);
   _finish_op(op, r);
@@ -2532,6 +2524,16 @@ int Objecter::op_cancel(ceph_tid_t tid, int r)
   return ret;
 }
 
+int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
+{
+  unique_lock wl(rwlock);
+  ldout(cct,10) << __func__ << " " << tids << dendl;
+  for (auto tid : tids) {
+    _op_cancel(tid, r);
+  }
+  return 0;
+}
+
 int Objecter::_op_cancel(ceph_tid_t tid, int r)
 {
   int ret = 0;
@@ -2541,10 +2543,10 @@ int Objecter::_op_cancel(ceph_tid_t tid, int r)
 
 start:
 
-  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
+  for (auto siter = osd_sessions.begin();
        siter != osd_sessions.end(); ++siter) {
     OSDSession *s = siter->second;
-    OSDSession::shared_lock sl(s->lock);
+    shared_lock sl(s->lock);
     if (s->ops.find(tid) != s->ops.end()) {
       sl.unlock();
       ret = op_cancel(s, tid, r);
@@ -2560,7 +2562,7 @@ start:
                << " not found in live sessions" << dendl;
 
   // Handle case where the op is in homeless session
-  OSDSession::shared_lock sl(homeless_session->lock);
+  shared_lock sl(homeless_session->lock);
   if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
     sl.unlock();
     ret = op_cancel(homeless_session, tid, r);
@@ -2588,11 +2590,11 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
   std::vector<ceph_tid_t> to_cancel;
   bool found = false;
 
-  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
+  for (auto siter = osd_sessions.begin();
        siter != osd_sessions.end(); ++siter) {
     OSDSession *s = siter->second;
-    OSDSession::shared_lock sl(s->lock);
-    for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
+    shared_lock sl(s->lock);
+    for (auto op_i = s->ops.begin();
         op_i != s->ops.end(); ++op_i) {
       if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
          && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
@@ -2601,13 +2603,11 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
     }
     sl.unlock();
 
-    for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
-        titer != to_cancel.end();
-        ++titer) {
+    for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
       int cancel_result = op_cancel(s, *titer, r);
       // We hold rwlock across search and cancellation, so cancels
       // should always succeed
-      assert(cancel_result == 0);
+      ceph_assert(cancel_result == 0);
     }
     if (!found && to_cancel.size())
       found = true;
@@ -2632,7 +2632,7 @@ bool Objecter::is_pg_changed(
   const vector<int>& newacting,
   bool any_change)
 {
-  if (OSDMap::primary_changed(
+  if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
        oldprimary,
        oldacting,
        newprimary,
@@ -2648,7 +2648,7 @@ bool Objecter::target_should_be_paused(op_target_t *t)
   const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
   bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
   bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
-    _osdmap_full_flag() || _osdmap_pool_full(*pi);
+    (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));
 
   return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
     (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
@@ -2689,8 +2689,7 @@ bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
 
 bool Objecter::_osdmap_has_pool_full() const
 {
-  for (map<int64_t, pg_pool_t>::const_iterator it
-        = osdmap->get_pools().begin();
+  for (auto it = osdmap->get_pools().begin();
        it != osdmap->get_pools().end(); ++it) {
     if (_osdmap_pool_full(it->second))
       return true;
@@ -2698,18 +2697,13 @@ bool Objecter::_osdmap_has_pool_full() const
   return false;
 }
 
-bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
-{
-  return p.has_flag(pg_pool_t::FLAG_FULL) && honor_osdmap_full;
-}
-
 /**
  * Wrapper around osdmap->test_flag for special handling of the FULL flag.
  */
 bool Objecter::_osdmap_full_flag() const
 {
-  // Ignore the FULL flag if the caller has honor_osdmap_full
-  return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
+  // Ignore the FULL flag if the caller does not have honor_osdmap_full
+  return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
 }
 
 void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
@@ -2746,6 +2740,34 @@ int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
   return p->raw_hash_to_pg(p->hash_key(key, ns));
 }
 
+void Objecter::_prune_snapc(
+  const mempool::osdmap::map<int64_t,
+  snap_interval_set_t>& new_removed_snaps,
+  Op *op)
+{
+  bool match = false;
+  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
+  if (i != new_removed_snaps.end()) {
+    for (auto s : op->snapc.snaps) {
+      if (i->second.contains(s)) {
+       match = true;
+       break;
+      }
+    }
+    if (match) {
+      vector<snapid_t> new_snaps;
+      for (auto s : op->snapc.snaps) {
+       if (!i->second.contains(s)) {
+         new_snaps.push_back(s);
+       }
+      }
+      op->snapc.snaps.swap(new_snaps);
+      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
+                   << " (was " << new_snaps << ")" << dendl;
+    }
+  }
+}
+
 int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
 {
   // rwlock is locked
@@ -2765,6 +2787,11 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
     t->osd = -1;
     return RECALC_OP_TARGET_POOL_DNE;
   }
+
+  if (pi->has_flag(pg_pool_t::FLAG_EIO)) {
+    return RECALC_OP_TARGET_POOL_EIO;
+  }
+
   ldout(cct,30) << __func__ << "  base pi " << pi
                << " pg_num " << pi->get_pg_num() << dendl;
 
@@ -2795,9 +2822,9 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
 
   pg_t pgid;
   if (t->precalc_pgid) {
-    assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
-    assert(t->base_oid.name.empty()); // make sure this is a pg op
-    assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
+    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
+    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
+    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
     pgid = t->base_pgid;
   } else {
     int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
@@ -2816,10 +2843,20 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
   int size = pi->size;
   int min_size = pi->min_size;
   unsigned pg_num = pi->get_pg_num();
+  unsigned pg_num_mask = pi->get_pg_num_mask();
+  unsigned pg_num_pending = pi->get_pg_num_pending();
   int up_primary, acting_primary;
   vector<int> up, acting;
-  osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
-                              &acting, &acting_primary);
+  ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
+  pg_t actual_pgid(actual_ps, pgid.pool());
+  if (!lookup_pg_mapping(actual_pgid, osdmap->get_epoch(), &up, &up_primary,
+                         &acting, &acting_primary)) {
+    osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
+                                 &acting, &acting_primary);
+    pg_mapping_t pg_mapping(osdmap->get_epoch(),
+                            up, up_primary, acting, acting_primary);
+    update_pg_mapping(actual_pgid, std::move(pg_mapping));
+  }
   bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
   bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
   unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
@@ -2839,73 +2876,102 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
        min_size,
        t->pg_num,
        pg_num,
+       t->pg_num_pending,
+       pg_num_pending,
        t->sort_bitwise,
        sort_bitwise,
        t->recovery_deletes,
        recovery_deletes,
+       t->peering_crush_bucket_count,
+       pi->peering_crush_bucket_count,
+       t->peering_crush_bucket_target,
+       pi->peering_crush_bucket_target,
+       t->peering_crush_bucket_barrier,
+       pi->peering_crush_bucket_barrier,
+       t->peering_crush_mandatory_member,
+       pi->peering_crush_mandatory_member,
        prev_pgid)) {
     force_resend = true;
   }
 
   bool unpaused = false;
-  if (t->paused && !target_should_be_paused(t)) {
-    t->paused = false;
+  bool should_be_paused = target_should_be_paused(t);
+  if (t->paused && !should_be_paused) {
     unpaused = true;
   }
+  if (t->paused != should_be_paused) {
+    ldout(cct, 10) << __func__ << " paused " << t->paused
+                  << " -> " << should_be_paused << dendl;
+    t->paused = should_be_paused;
+  }
 
   bool legacy_change =
     t->pgid != pgid ||
       is_pg_changed(
        t->acting_primary, t->acting, acting_primary, acting,
        t->used_replica || any_change);
-  bool split = false;
+  bool split_or_merge = false;
   if (t->pg_num) {
-    split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
+    split_or_merge =
+      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
+      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
+      prev_pgid.is_merge_target(t->pg_num, pg_num);
   }
 
-  if (legacy_change || split || force_resend) {
+  if (legacy_change || split_or_merge || force_resend) {
     t->pgid = pgid;
-    t->acting = acting;
+    t->acting = std::move(acting);
     t->acting_primary = acting_primary;
     t->up_primary = up_primary;
-    t->up = up;
+    t->up = std::move(up);
     t->size = size;
     t->min_size = min_size;
     t->pg_num = pg_num;
-    t->pg_num_mask = pi->get_pg_num_mask();
-    osdmap->get_primary_shard(
-      pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
-      &t->actual_pgid);
+    t->pg_num_mask = pg_num_mask;
+    t->pg_num_pending = pg_num_pending;
+    spg_t spgid(actual_pgid);
+    if (pi->is_erasure()) {
+      for (uint8_t i = 0; i < t->acting.size(); ++i) {
+        if (t->acting[i] == acting_primary) {
+          spgid.reset_shard(shard_id_t(i));
+          break;
+        }
+      }
+    }
+    t->actual_pgid = spgid;
     t->sort_bitwise = sort_bitwise;
     t->recovery_deletes = recovery_deletes;
+    t->peering_crush_bucket_count = pi->peering_crush_bucket_count;
+    t->peering_crush_bucket_target = pi->peering_crush_bucket_target;
+    t->peering_crush_bucket_barrier = pi->peering_crush_bucket_barrier;
+    t->peering_crush_mandatory_member = pi->peering_crush_mandatory_member;
     ldout(cct, 10) << __func__ << " "
                   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
-                  << " acting " << acting
+                  << " acting " << t->acting
                   << " primary " << acting_primary << dendl;
     t->used_replica = false;
-    if (acting_primary == -1) {
-      t->osd = -1;
-    } else {
+    if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
+                     CEPH_OSD_FLAG_LOCALIZE_READS)) &&
+        !is_write && pi->is_replicated() && t->acting.size() > 1) {
       int osd;
-      bool read = is_read && !is_write;
-      if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
-       int p = rand() % acting.size();
+      ceph_assert(is_read && t->acting[0] == acting_primary);
+      if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
+       int p = rand() % t->acting.size();
        if (p)
          t->used_replica = true;
-       osd = acting[p];
-       ldout(cct, 10) << " chose random osd." << osd << " of " << acting
+       osd = t->acting[p];
+       ldout(cct, 10) << " chose random osd." << osd << " of " << t->acting
                       << dendl;
-      } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
-                acting.size() > 1) {
+      } else {
        // look for a local replica.  prefer the primary if the
        // distance is the same.
        int best = -1;
        int best_locality = 0;
-       for (unsigned i = 0; i < acting.size(); ++i) {
+       for (unsigned i = 0; i < t->acting.size(); ++i) {
          int locality = osdmap->crush->get_common_ancestor_distance(
-                cct, acting[i], crush_location);
+                cct, t->acting[i], crush_location);
          ldout(cct, 20) << __func__ << " localize: rank " << i
-                        << " osd." << acting[i]
+                        << " osd." << t->acting[i]
                         << " locality " << locality << dendl;
          if (i == 0 ||
              (locality >= 0 && best_locality >= 0 &&
@@ -2917,25 +2983,28 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
              t->used_replica = true;
          }
        }
-       assert(best >= 0);
-       osd = acting[best];
-      } else {
-       osd = acting_primary;
+       ceph_assert(best >= 0);
+       osd = t->acting[best];
       }
       t->osd = osd;
+    } else {
+      t->osd = acting_primary;
     }
   }
   if (legacy_change || unpaused || force_resend) {
     return RECALC_OP_TARGET_NEED_RESEND;
   }
-  if (split && con && con->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT)) {
+  if (split_or_merge &&
+      (osdmap->require_osd_release >= ceph_release_t::luminous ||
+       HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
+                   RESEND_ON_SPLIT))) {
     return RECALC_OP_TARGET_NEED_RESEND;
   }
   return RECALC_OP_TARGET_NO_ACTION;
 }
 
 int Objecter::_map_session(op_target_t *target, OSDSession **s,
-                          shunique_lock& sul)
+                          shunique_lock<ceph::shared_mutex>& sul)
 {
   _calc_target(target, nullptr);
   return _get_session(target->osd, s, sul);
@@ -2944,8 +3013,8 @@ int Objecter::_map_session(op_target_t *target, OSDSession **s,
 void Objecter::_session_op_assign(OSDSession *to, Op *op)
 {
   // to->lock is locked
-  assert(op->session == NULL);
-  assert(op->tid);
+  ceph_assert(op->session == NULL);
+  ceph_assert(op->tid);
 
   get_session(to);
   op->session = to;
@@ -2960,7 +3029,7 @@ void Objecter::_session_op_assign(OSDSession *to, Op *op)
 
 void Objecter::_session_op_remove(OSDSession *from, Op *op)
 {
-  assert(op->session == from);
+  ceph_assert(op->session == from);
   // from->lock is locked
 
   if (from->is_homeless()) {
@@ -2977,7 +3046,7 @@ void Objecter::_session_op_remove(OSDSession *from, Op *op)
 void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
 {
   // to lock is locked unique
-  assert(op->session == NULL);
+  ceph_assert(op->session == NULL);
 
   if (to->is_homeless()) {
     num_homeless_ops++;
@@ -2993,7 +3062,7 @@ void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
 
 void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
 {
-  assert(from == op->session);
+  ceph_assert(from == op->session);
   // from->lock is locked unique
 
   if (from->is_homeless()) {
@@ -3010,7 +3079,7 @@ void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
 
 void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
 {
-  assert(from == op->session);
+  ceph_assert(from == op->session);
   // from->lock is locked
 
   if (from->is_homeless()) {
@@ -3027,8 +3096,8 @@ void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
 void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
 {
   // to->lock is locked
-  assert(op->session == NULL);
-  assert(op->tid);
+  ceph_assert(op->session == NULL);
+  ceph_assert(op->tid);
 
   if (to->is_homeless()) {
     num_homeless_ops++;
@@ -3042,7 +3111,7 @@ void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
 }
 
 int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
-                                      shunique_lock& sul)
+                                      shunique_lock<ceph::shared_mutex>& sul)
 {
   // rwlock is locked unique
 
@@ -3054,14 +3123,15 @@ int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
 
     OSDSession *s = NULL;
     r = _get_session(linger_op->target.osd, &s, sul);
-    assert(r == 0);
+    ceph_assert(r == 0);
 
     if (linger_op->session != s) {
       // NB locking two sessions (s and linger_op->session) at the
       // same time here is only safe because we are the only one that
-      // takes two, and we are holding rwlock for write.  Disable
-      // lockdep because it doesn't know that.
-      OSDSession::unique_lock sl(s->lock);
+      // takes two, and we are holding rwlock for write.  We use
+      // std::shared_mutex in OSDSession because lockdep doesn't know
+      // that.
+      unique_lock sl(s->lock);
       _session_linger_op_remove(linger_op->session, linger_op);
       _session_linger_op_assign(s, linger_op);
     }
@@ -3076,9 +3146,9 @@ void Objecter::_cancel_linger_op(Op *op)
 {
   ldout(cct, 15) << "cancel_op " << op->tid << dendl;
 
-  assert(!op->should_resend);
-  if (op->onfinish) {
-    delete op->onfinish;
+  ceph_assert(!op->should_resend);
+  if (op->has_completion()) {
+    op->onfinish = nullptr;
     num_in_flight--;
   }
 
@@ -3087,12 +3157,14 @@ void Objecter::_cancel_linger_op(Op *op)
 
 void Objecter::_finish_op(Op *op, int r)
 {
-  ldout(cct, 15) << "finish_op " << op->tid << dendl;
+  ldout(cct, 15) << __func__ << " " << op->tid << dendl;
 
   // op->session->lock is locked unique or op->session is null
 
-  if (!op->ctx_budgeted && op->budgeted)
-    put_op_budget(op);
+  if (!op->ctx_budgeted && op->budget >= 0) {
+    put_op_budget_bytes(op->budget);
+    op->budget = -1;
+  }
 
   if (op->ontimeout && r != -ETIMEDOUT)
     timer.cancel_event(op->ontimeout);
@@ -3103,51 +3175,36 @@ void Objecter::_finish_op(Op *op, int r)
 
   logger->dec(l_osdc_op_active);
 
-  assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
+  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
 
   inflight_ops--;
 
   op->put();
 }
 
-void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
-{
-  ldout(cct, 15) << "finish_op " << tid << dendl;
-  shared_lock rl(rwlock);
-
-  OSDSession::unique_lock wl(session->lock);
-
-  map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
-  if (iter == session->ops.end())
-    return;
-
-  Op *op = iter->second;
-
-  _finish_op(op, 0);
-}
-
-MOSDOp *Objecter::_prepare_osd_op(Op *op)
+Objecter::MOSDOp *Objecter::_prepare_osd_op(Op *op)
 {
   // rwlock is locked
 
   int flags = op->target.flags;
   flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
+  flags |= CEPH_OSD_FLAG_SUPPORTSPOOLEIO;
 
   // Nothing checks this any longer, but needed for compatibility with
   // pre-luminous osds
   flags |= CEPH_OSD_FLAG_ONDISK;
 
-  if (!honor_osdmap_full)
+  if (!honor_pool_full)
     flags |= CEPH_OSD_FLAG_FULL_FORCE;
 
   op->target.paused = false;
-  op->stamp = ceph::mono_clock::now();
+  op->stamp = ceph::coarse_mono_clock::now();
 
   hobject_t hobj = op->target.get_hobj();
-  MOSDOp *m = new MOSDOp(client_inc, op->tid,
-                        hobj, op->target.actual_pgid,
-                        osdmap->get_epoch(),
-                        flags, op->features);
+  auto m = new MOSDOp(client_inc, op->tid,
+                     hobj, op->target.actual_pgid,
+                     osdmap->get_epoch(),
+                     flags, op->features);
 
   m->set_snapid(op->snapid);
   m->set_snap_seq(op->snapc.seq);
@@ -3171,20 +3228,24 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
   }
 
   logger->inc(l_osdc_op_send);
-  logger->inc(l_osdc_op_send_bytes, m->get_data().length());
+  ssize_t sum = 0;
+  for (unsigned i = 0; i < m->ops.size(); i++) {
+    sum += m->ops[i].indata.length();
+  }
+  logger->inc(l_osdc_op_send_bytes, sum);
 
   return m;
 }
 
-void Objecter::_send_op(Op *op, MOSDOp *m)
+void Objecter::_send_op(Op *op)
 {
   // rwlock is locked
   // op->session->lock is locked
 
   // backoff?
-  hobject_t hoid = op->target.get_hobj();
   auto p = op->session->backoffs.find(op->target.actual_pgid);
   if (p != op->session->backoffs.end()) {
+    hobject_t hoid = op->target.get_hobj();
     auto q = p->second.lower_bound(hoid);
     if (q != p->second.begin()) {
       --q;
@@ -3205,10 +3266,8 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
     }
   }
 
-  if (!m) {
-    assert(op->tid > 0);
-    m = _prepare_osd_op(op);
-  }
+  ceph_assert(op->tid > 0);
+  MOSDOp *m = _prepare_osd_op(op);
 
   if (op->target.actual_pgid != m->get_spg()) {
     ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
@@ -3223,47 +3282,46 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
                 << dendl;
 
   ConnectionRef con = op->session->con;
-  assert(con);
+  ceph_assert(con);
 
-  // preallocated rx buffer?
+#if 0
+  // preallocated rx ceph::buffer?
   if (op->con) {
-    ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
+    ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
                   << op->con << dendl;
     op->con->revoke_rx_buffer(op->tid);
   }
   if (op->outbl &&
       op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
       op->outbl->length()) {
-    ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
+    op->outbl->invalidate_crc();  // messenger writes through c_str()
+    ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
                   << dendl;
     op->con = con;
     op->con->post_rx_buffer(op->tid, *op->outbl);
   }
+#endif
 
   op->incarnation = op->session->incarnation;
 
-  m->set_tid(op->tid);
-
   if (op->trace.valid()) {
     m->trace.init("op msg", nullptr, &op->trace);
   }
   op->session->con->send_message(m);
 }
 
-int Objecter::calc_op_budget(Op *op)
+int Objecter::calc_op_budget(const bc::small_vector_base<OSDOp>& ops)
 {
   int op_budget = 0;
-  for (vector<OSDOp>::iterator i = op->ops.begin();
-       i != op->ops.end();
-       ++i) {
+  for (auto i = ops.begin(); i != ops.end(); ++i) {
     if (i->op.op & CEPH_OSD_OP_MODE_WR) {
       op_budget += i->indata.length();
     } else if (ceph_osd_op_mode_read(i->op.op)) {
-      if (ceph_osd_op_type_data(i->op.op)) {
-       if ((int64_t)i->op.extent.length > 0)
-         op_budget += (int64_t)i->op.extent.length;
+      if (ceph_osd_op_uses_extent(i->op.op)) {
+        if ((int64_t)i->op.extent.length > 0)
+          op_budget += (int64_t)i->op.extent.length;
       } else if (ceph_osd_op_type_attr(i->op.op)) {
-       op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
+        op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
       }
     }
   }
@@ -3271,14 +3329,14 @@ int Objecter::calc_op_budget(Op *op)
 }
 
 void Objecter::_throttle_op(Op *op,
-                           shunique_lock& sul,
+                           shunique_lock<ceph::shared_mutex>& sul,
                            int op_budget)
 {
-  assert(sul && sul.mutex() == &rwlock);
+  ceph_assert(sul && sul.mutex() == &rwlock);
   bool locked_for_write = sul.owns_lock();
 
   if (!op_budget)
-    op_budget = calc_op_budget(op);
+    op_budget = calc_op_budget(op->ops);
   if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
     sul.unlock();
     op_throttle_bytes.get(op_budget);
@@ -3297,15 +3355,9 @@ void Objecter::_throttle_op(Op *op,
   }
 }
 
-void Objecter::unregister_op(Op *op)
+int Objecter::take_linger_budget(LingerOp *info)
 {
-  OSDSession::unique_lock sl(op->session->lock);
-  op->session->ops.erase(op->tid);
-  sl.unlock();
-  put_session(op->session);
-  op->session = NULL;
-
-  inflight_ops--;
+  return 1;
 }
 
 /* This function DOES put the passed message before returning */
@@ -3323,17 +3375,15 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s) {
-      s->put();
-    }
     m->put();
     return;
   }
 
-  OSDSession::unique_lock sl(s->lock);
+  unique_lock sl(s->lock);
 
   map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
   if (iter == s->ops.end()) {
@@ -3342,7 +3392,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                                                    " onnvram" : " ack"))
                  << " ... stray" << dendl;
     sl.unlock();
-    put_session(s);
     m->put();
     return;
   }
@@ -3360,12 +3409,11 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   if (retry_writes_after_first_reply && op->attempts == 1 &&
       (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
     ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
-    if (op->onfinish) {
+    if (op->has_completion()) {
       num_in_flight--;
     }
     _session_op_remove(s, op);
     sl.unlock();
-    put_session(s);
 
     _op_submit(op, sul, NULL);
     m->put();
@@ -3381,7 +3429,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                    << op->session->con->get_peer_addr() << dendl;
       m->put();
       sl.unlock();
-      put_session(s);
       return;
     }
   } else {
@@ -3390,24 +3437,25 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     // have, but that is better than doing callbacks out of order.
   }
 
-  Context *onfinish = 0;
+  decltype(op->onfinish) onfinish;
 
   int rc = m->get_result();
 
   if (m->is_redirect_reply()) {
     ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
-    if (op->onfinish)
+    if (op->has_completion())
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    put_session(s);
 
     // FIXME: two redirects could race and reorder
 
     op->tid = 0;
     m->get_redirect().combine_with_locator(op->target.target_oloc,
                                           op->target.target_oid.name);
-    op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
+    op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
+                        CEPH_OSD_FLAG_IGNORE_CACHE |
+                        CEPH_OSD_FLAG_IGNORE_OVERLAY);
     _op_submit(op, sul, NULL);
     m->put();
     return;
@@ -3415,11 +3463,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   if (rc == -EAGAIN) {
     ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
-    if (op->onfinish)
+    if (op->has_completion())
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    put_session(s);
 
     op->tid = 0;
     op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
@@ -3441,9 +3488,27 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   // got data?
   if (op->outbl) {
+#if 0
     if (op->con)
       op->con->revoke_rx_buffer(op->tid);
-    m->claim_data(*op->outbl);
+#endif
+    auto& bl = m->get_data();
+    if (op->outbl->length() == bl.length() &&
+       bl.get_num_buffers() <= 1) {
+      // this is here to keep previous users to *relied* on getting data
+      // read into existing buffers happy.  Notably,
+      // libradosstriper::RadosStriperImpl::aio_read().
+      ldout(cct,10) << __func__ << " copying resulting " << bl.length()
+                   << " into existing ceph::buffer of length " << op->outbl->length()
+                   << dendl;
+      cb::list t;
+      t = std::move(*op->outbl);
+      t.invalidate_crc();  // we're overwriting the raw buffers via c_str()
+      bl.begin().copy(bl.length(), t.c_str());
+      op->outbl->substr_of(t, 0, bl.length());
+    } else {
+      m->claim_data(*op->outbl);
+    }
     op->outbl = 0;
   }
 
@@ -3456,15 +3521,20 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                  << " != request ops " << op->ops
                  << " from " << m->get_source_inst() << dendl;
 
-  vector<bufferlist*>::iterator pb = op->out_bl.begin();
-  vector<int*>::iterator pr = op->out_rval.begin();
-  vector<Context*>::iterator ph = op->out_handler.begin();
-  assert(op->out_bl.size() == op->out_rval.size());
-  assert(op->out_bl.size() == op->out_handler.size());
-  vector<OSDOp>::iterator p = out_ops.begin();
+  ceph_assert(op->ops.size() == op->out_bl.size());
+  ceph_assert(op->ops.size() == op->out_rval.size());
+  ceph_assert(op->ops.size() == op->out_ec.size());
+  ceph_assert(op->ops.size() == op->out_handler.size());
+  auto pb = op->out_bl.begin();
+  auto pr = op->out_rval.begin();
+  auto pe = op->out_ec.begin();
+  auto ph = op->out_handler.begin();
+  ceph_assert(op->out_bl.size() == op->out_rval.size());
+  ceph_assert(op->out_bl.size() == op->out_handler.size());
+  auto p = out_ops.begin();
   for (unsigned i = 0;
        p != out_ops.end() && pb != op->out_bl.end();
-       ++i, ++p, ++pb, ++pr, ++ph) {
+       ++i, ++p, ++pb, ++pr, ++pe, ++ph) {
     ldout(cct, 10) << " op " << i << " rval " << p->rval
                   << " len " << p->outdata.length() << dendl;
     if (*pb)
@@ -3473,20 +3543,24 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     // can change it if e.g. decoding fails
     if (*pr)
       **pr = ceph_to_hostos_errno(p->rval);
+    if (*pe)
+      **pe = p->rval < 0 ? bs::error_code(-p->rval, osd_category()) :
+       bs::error_code();
     if (*ph) {
-      ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
-      (*ph)->complete(ceph_to_hostos_errno(p->rval));
-      *ph = NULL;
+      std::move((*ph))(p->rval < 0 ?
+                      bs::error_code(-p->rval, osd_category()) :
+                      bs::error_code(),
+                      p->rval, p->outdata);
     }
   }
 
   // NOTE: we assume that since we only request ONDISK ever we will
   // only ever get back one (type of) ack ever.
 
-  if (op->onfinish) {
+  if (op->has_completion()) {
     num_in_flight--;
-    onfinish = op->onfinish;
-    op->onfinish = NULL;
+    onfinish = std::move(op->onfinish);
+    op->onfinish = nullptr;
   }
   logger->inc(l_osdc_op_reply);
 
@@ -3505,15 +3579,14 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   sl.unlock();
 
   // do callbacks
-  if (onfinish) {
-    onfinish->complete(rc);
+  if (Op::has_completion(onfinish)) {
+    Op::complete(std::move(onfinish), osdcode(rc), rc);
   }
   if (completion_lock.mutex()) {
     completion_lock.unlock();
   }
 
   m->put();
-  put_session(s);
 }
 
 void Objecter::handle_osd_backoff(MOSDBackoff *m)
@@ -3526,19 +3599,17 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s)
-      s->put();
     m->put();
     return;
   }
 
   get_session(s);
-  s->put();  // from get_priv() above
 
-  OSDSession::unique_lock sl(s->lock);
+  unique_lock sl(s->lock);
 
   switch (m->op) {
   case CEPH_OSD_BACKOFF_OP_BLOCK:
@@ -3553,10 +3624,9 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
 
       // ack with original backoff's epoch so that the osd can discard this if
       // there was a pg split.
-      Message *r = new MOSDBackoff(m->pgid,
-                                  m->map_epoch,
-                                  CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
-                                  m->id, m->begin, m->end);
+      auto r = new MOSDBackoff(m->pgid, m->map_epoch,
+                              CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
+                              m->id, m->begin, m->end);
       // this priority must match the MOSDOps from _prepare_osd_op
       r->set_priority(cct->_conf->osd_client_op_priority);
       con->send_message(r);
@@ -3581,7 +3651,7 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
                       << " [" << b->begin << "," << b->end
                       << ")" << dendl;
        auto spgp = s->backoffs.find(b->pgid);
-       assert(spgp != s->backoffs.end());
+       ceph_assert(spgp != s->backoffs.end());
        spgp->second.erase(b->begin);
        if (spgp->second.empty()) {
          s->backoffs.erase(spgp);
@@ -3624,7 +3694,7 @@ uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
   shared_lock rl(rwlock);
   list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
                                pos, list_context->pool_id, string());
-  ldout(cct, 10) << __func__ << list_context
+  ldout(cct, 10) << __func__ << " " << list_context
                 << " pos " << pos << " -> " << list_context->pos << dendl;
   pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
   list_context->current_pg = actual.ps();
@@ -3719,7 +3789,7 @@ void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
   op.pg_nls(list_context->max_entries, list_context->filter,
            list_context->pos, osdmap->get_epoch());
   list_context->bl.clear();
-  C_NList *onack = new C_NList(list_context, onfinish, this);
+  auto onack = new C_NList(list_context, onfinish, this);
   object_locator_t oloc(list_context->pool_id, list_context->nspace);
 
   // note current_pg in case we don't have (or lose) SORTBITWISE
@@ -3736,12 +3806,13 @@ void Objecter::_nlist_reply(NListContext *list_context, int r,
 {
   ldout(cct, 10) << __func__ << " " << list_context << dendl;
 
-  bufferlist::iterator iter = list_context->bl.begin();
+  auto iter = list_context->bl.cbegin();
   pg_nls_response_t response;
-  bufferlist extra_info;
-  ::decode(response, iter);
+  decode(response, iter);
   if (!iter.end()) {
-    ::decode(extra_info, iter);
+    // we do this as legacy.
+    cb::list legacy_extra_info;
+    decode(legacy_extra_info, iter);
   }
 
   // if the osd returns 1 (newer code), or handle MAX, it means we
@@ -3768,9 +3839,10 @@ void Objecter::_nlist_reply(NListContext *list_context, int r,
                 << ", response.entries " << response.entries
                 << ", handle " << response.handle
                 << ", tentative new pos " << list_context->pos << dendl;
-  list_context->extra_info.append(extra_info);
   if (response_size) {
-    list_context->list.splice(list_context->list.end(), response.entries);
+    std::move(response.entries.begin(), response.entries.end(),
+             std::back_inserter(list_context->list));
+    response.entries.clear();
   }
 
   if (list_context->list.size() >= list_context->max_entries) {
@@ -3799,204 +3871,182 @@ void Objecter::put_nlist_context_budget(NListContext *list_context)
 
 // snapshots
 
-int Objecter::create_pool_snap(int64_t pool, string& snap_name,
-                              Context *onfinish)
+void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
+                               decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
                 << snap_name << dendl;
 
   const pg_pool_t *p = osdmap->get_pg_pool(pool);
-  if (!p)
-    return -EINVAL;
-  if (p->snap_exists(snap_name.c_str()))
-    return -EEXIST;
+  if (!p) {
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+    return;
+  }
+  if (p->snap_exists(snap_name)) {
+    onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists,
+                   cb::list{});
+    return;
+  }
 
-  PoolOp *op = new PoolOp;
-  if (!op)
-    return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
   op->name = snap_name;
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_CREATE_SNAP;
   pool_ops[op->tid] = op;
 
   pool_op_submit(op);
-
-  return 0;
 }
 
-struct C_SelfmanagedSnap : public Context {
-  bufferlist bl;
-  snapid_t *psnapid;
-  Context *fin;
-  C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
-  void finish(int r) override {
-    if (r == 0) {
-      bufferlist::iterator p = bl.begin();
-      ::decode(*psnapid, p);
+struct CB_SelfmanagedSnap {
+  std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin;
+  CB_SelfmanagedSnap(decltype(fin)&& fin)
+    : fin(std::move(fin)) {}
+  void operator()(bs::error_code ec, const cb::list& bl) {
+    snapid_t snapid = 0;
+    if (!ec) {
+      try {
+       auto p = bl.cbegin();
+       decode(snapid, p);
+      } catch (const cb::error& e) {
+        ec = e.code();
+      }
     }
-    fin->complete(r);
+    fin->defer(std::move(fin), ec, snapid);
   }
 };
 
-int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
-                                       Context *onfinish)
+void Objecter::allocate_selfmanaged_snap(
+  int64_t pool,
+  std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
-  PoolOp *op = new PoolOp;
-  if (!op) return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
-  C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
-  op->onfinish = fin;
-  op->blp = &fin->bl;
+  op->onfinish = PoolOp::OpComp::create(
+    service.get_executor(),
+    CB_SelfmanagedSnap(std::move(onfinish)));
   op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
   pool_ops[op->tid] = op;
 
   pool_op_submit(op);
-  return 0;
 }
 
-int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
-                              Context *onfinish)
+void Objecter::delete_pool_snap(
+  int64_t pool, std::string_view snap_name,
+  decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
                 << snap_name << dendl;
 
   const pg_pool_t *p = osdmap->get_pg_pool(pool);
-  if (!p)
-    return -EINVAL;
-  if (!p->snap_exists(snap_name.c_str()))
-    return -ENOENT;
+  if (!p) {
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+    return;
+  }
+
+  if (!p->snap_exists(snap_name)) {
+    onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{});
+    return;
+  }
 
-  PoolOp *op = new PoolOp;
-  if (!op)
-    return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
   op->name = snap_name;
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_DELETE_SNAP;
   pool_ops[op->tid] = op;
 
   pool_op_submit(op);
-
-  return 0;
 }
 
-int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
-                                     Context *onfinish)
+void Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
+                                      decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
                 << snap << dendl;
-  PoolOp *op = new PoolOp;
-  if (!op) return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
   op->snapid = snap;
   pool_ops[op->tid] = op;
 
   pool_op_submit(op);
-
-  return 0;
 }
 
-int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
-                         int crush_rule)
+void Objecter::create_pool(std::string_view name,
+                          decltype(PoolOp::onfinish)&& onfinish,
+                          int crush_rule)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "create_pool name=" << name << dendl;
 
-  if (osdmap->lookup_pg_pool_name(name) >= 0)
-    return -EEXIST;
+  if (osdmap->lookup_pg_pool_name(name) >= 0) {
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{});
+    return;
+  }
 
-  PoolOp *op = new PoolOp;
-  if (!op)
-    return -ENOMEM;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = 0;
   op->name = name;
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_CREATE;
   pool_ops[op->tid] = op;
-  op->auid = auid;
   op->crush_rule = crush_rule;
 
   pool_op_submit(op);
-
-  return 0;
 }
 
-int Objecter::delete_pool(int64_t pool, Context *onfinish)
+void Objecter::delete_pool(int64_t pool,
+                          decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "delete_pool " << pool << dendl;
 
   if (!osdmap->have_pg_pool(pool))
-    return -ENOENT;
-
-  _do_delete_pool(pool, onfinish);
-  return 0;
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+  else
+    _do_delete_pool(pool, std::move(onfinish));
 }
 
-int Objecter::delete_pool(const string &pool_name, Context *onfinish)
+void Objecter::delete_pool(std::string_view pool_name,
+                          decltype(PoolOp::onfinish)&& onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "delete_pool " << pool_name << dendl;
 
   int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
   if (pool < 0)
-    return pool;
-
-  _do_delete_pool(pool, onfinish);
-  return 0;
+    // This only returns one error: -ENOENT.
+    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+  else
+    _do_delete_pool(pool, std::move(onfinish));
 }
 
-void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
+void Objecter::_do_delete_pool(int64_t pool,
+                              decltype(PoolOp::onfinish)&& onfinish)
+
 {
-  PoolOp *op = new PoolOp;
+  auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
   op->name = "delete";
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   op->pool_op = POOL_OP_DELETE;
   pool_ops[op->tid] = op;
   pool_op_submit(op);
 }
 
-/**
- * change the auid owner of a pool by contacting the monitor.
- * This requires the current connection to have write permissions
- * on both the pool's current auid and the new (parameter) auid.
- * Uses the standard Context callback when done.
- */
-int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
-{
-  unique_lock wl(rwlock);
-  ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
-  PoolOp *op = new PoolOp;
-  if (!op) return -ENOMEM;
-  op->tid = ++last_tid;
-  op->pool = pool;
-  op->name = "change_pool_auid";
-  op->onfinish = onfinish;
-  op->pool_op = POOL_OP_AUID_CHANGE;
-  op->auid = auid;
-  pool_ops[op->tid] = op;
-
-  logger->set(l_osdc_poolop_active, pool_ops.size());
-
-  pool_op_submit(op);
-  return 0;
-}
-
 void Objecter::pool_op_submit(PoolOp *op)
 {
   // rwlock is locked
@@ -4013,27 +4063,29 @@ void Objecter::_pool_op_submit(PoolOp *op)
   // rwlock is locked unique
 
   ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
-  MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
-                          op->name, op->pool_op,
-                          op->auid, last_seen_osdmap_version);
+  auto m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
+                      op->name, op->pool_op,
+                      last_seen_osdmap_version);
   if (op->snapid) m->snapid = op->snapid;
   if (op->crush_rule) m->crush_rule = op->crush_rule;
   monc->send_mon_message(m);
-  op->last_submit = ceph::mono_clock::now();
+  op->last_submit = ceph::coarse_mono_clock::now();
 
   logger->inc(l_osdc_poolop_send);
 }
 
 /**
  * Handle a reply to a PoolOp message. Check that we sent the message
- * and give the caller responsibility for the returned bufferlist.
+ * and give the caller responsibility for the returned cb::list.
  * Then either call the finisher or stash the PoolOp, depending on if we
  * have a new enough map.
  * Lastly, clean up the message and PoolOp.
  */
 void Objecter::handle_pool_op_reply(MPoolOpReply *m)
 {
-  FUNCTRACE();
+  int rc = m->replyCode;
+  auto ec = rc < 0 ? bs::error_code(-rc, mon_category()) : bs::error_code();
+  FUNCTRACE(cct);
   shunique_lock sul(rwlock, acquire_shared);
   if (!initialized) {
     sul.unlock();
@@ -4043,13 +4095,12 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m)
 
   ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
   ceph_tid_t tid = m->get_tid();
-  map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
+  auto iter = pool_ops.find(tid);
   if (iter != pool_ops.end()) {
     PoolOp *op = iter->second;
     ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
                   << ceph_pool_op_name(op->pool_op) << dendl;
-    if (op->blp)
-      op->blp->claim(m->response_data);
+    cb::list bl{std::move(m->response_data)};
     if (m->version > last_seen_osdmap_version)
       last_seen_osdmap_version = m->version;
     if (osdmap->get_epoch() < m->epoch) {
@@ -4063,19 +4114,27 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m)
       if (osdmap->get_epoch() < m->epoch) {
        ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
                       << " before calling back" << dendl;
-       _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
+       _wait_for_new_map(OpCompletion::create(
+                           service.get_executor(),
+                           [o = std::move(op->onfinish),
+                            bl = std::move(bl)](
+                             bs::error_code ec) mutable {
+                             o->defer(std::move(o), ec, bl);
+                           }),
+                         m->epoch,
+                         ec);
       } else {
        // map epoch changed, probably because a MOSDMap message
        // sneaked in. Do caller-specified callback now or else
        // we lose it forever.
-       assert(op->onfinish);
-       op->onfinish->complete(m->replyCode);
+       ceph_assert(op->onfinish);
+       op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
       }
     } else {
-      assert(op->onfinish);
-      op->onfinish->complete(m->replyCode);
+      ceph_assert(op->onfinish);
+      op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
     }
-    op->onfinish = NULL;
+    op->onfinish = nullptr;
     if (!sul.owns_lock()) {
       sul.unlock();
       sul.lock();
@@ -4098,11 +4157,11 @@ done:
 
 int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
-  map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
+  auto it = pool_ops.find(tid);
   if (it == pool_ops.end()) {
     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
     return -ENOENT;
@@ -4112,7 +4171,7 @@ int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
 
   PoolOp *op = it->second;
   if (op->onfinish)
-    op->onfinish->complete(r);
+    op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{});
 
   _finish_pool_op(op, r);
   return 0;
@@ -4133,17 +4192,16 @@ void Objecter::_finish_pool_op(PoolOp *op, int r)
 
 // pool stats
 
-void Objecter::get_pool_stats(list<string>& pools,
-                             map<string,pool_stat_t> *result,
-                             Context *onfinish)
+void Objecter::get_pool_stats(
+  const std::vector<std::string>& pools,
+  decltype(PoolStatOp::onfinish)&& onfinish)
 {
   ldout(cct, 10) << "get_pool_stats " << pools << dendl;
 
-  PoolStatOp *op = new PoolStatOp;
+  auto op = new PoolStatOp;
   op->tid = ++last_tid;
   op->pools = pools;
-  op->pool_stats = result;
-  op->onfinish = onfinish;
+  op->onfinish = std::move(onfinish);
   if (mon_timeout > timespan(0)) {
     op->ontimeout = timer.add_event(mon_timeout,
                                    [this, op]() {
@@ -4168,7 +4226,7 @@ void Objecter::_poolstat_submit(PoolStatOp *op)
   monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
                                           op->pools,
                                           last_seen_pgmap_version));
-  op->last_submit = ceph::mono_clock::now();
+  op->last_submit = ceph::coarse_mono_clock::now();
 
   logger->inc(l_osdc_poolstat_send);
 }
@@ -4184,15 +4242,15 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
     return;
   }
 
-  map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
+  auto iter = poolstat_ops.find(tid);
   if (iter != poolstat_ops.end()) {
     PoolStatOp *op = poolstat_ops[tid];
     ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
-    *op->pool_stats = m->pool_stats;
     if (m->version > last_seen_pgmap_version) {
       last_seen_pgmap_version = m->version;
     }
-    op->onfinish->complete(0);
+    op->onfinish->defer(std::move(op->onfinish), bs::error_code{},
+                       std::move(m->pool_stats), m->per_pool);
     _finish_pool_stat_op(op, 0);
   } else {
     ldout(cct, 10) << "unknown request " << tid << dendl;
@@ -4203,11 +4261,11 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
 
 int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
-  map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
+  auto it = poolstat_ops.find(tid);
   if (it == poolstat_ops.end()) {
     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
     return -ENOENT;
@@ -4215,9 +4273,10 @@ int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
 
   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
 
-  PoolStatOp *op = it->second;
+  auto op = it->second;
   if (op->onfinish)
-    op->onfinish->complete(r);
+    op->onfinish->defer(std::move(op->onfinish), osdcode(r),
+                       bc::flat_map<std::string, pool_stat_t>{}, false);
   _finish_pool_stat_op(op, r);
   return 0;
 }
@@ -4235,18 +4294,16 @@ void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
   delete op;
 }
 
-void Objecter::get_fs_stats(ceph_statfs& result,
-                           boost::optional<int64_t> data_pool,
-                           Context *onfinish)
+void Objecter::get_fs_stats(std::optional<int64_t> poolid,
+                           decltype(StatfsOp::onfinish)&& onfinish)
 {
   ldout(cct, 10) << "get_fs_stats" << dendl;
   unique_lock l(rwlock);
 
-  StatfsOp *op = new StatfsOp;
+  auto op = new StatfsOp;
   op->tid = ++last_tid;
-  op->stats = &result;
-  op->data_pool = data_pool;
-  op->onfinish = onfinish;
+  op->data_pool = poolid;
+  op->onfinish = std::move(onfinish);
   if (mon_timeout > timespan(0)) {
     op->ontimeout = timer.add_event(mon_timeout,
                                    [this, op]() {
@@ -4270,7 +4327,7 @@ void Objecter::_fs_stats_submit(StatfsOp *op)
   monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
                                     op->data_pool,
                                     last_seen_pgmap_version));
-  op->last_submit = ceph::mono_clock::now();
+  op->last_submit = ceph::coarse_mono_clock::now();
 
   logger->inc(l_osdc_statfs_send);
 }
@@ -4289,10 +4346,9 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
   if (statfs_ops.count(tid)) {
     StatfsOp *op = statfs_ops[tid];
     ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
-    *(op->stats) = m->h.st;
     if (m->h.version > last_seen_pgmap_version)
       last_seen_pgmap_version = m->h.version;
-    op->onfinish->complete(0);
+    op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st);
     _finish_statfs_op(op, 0);
   } else {
     ldout(cct, 10) << "unknown request " << tid << dendl;
@@ -4303,11 +4359,11 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
 
 int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
-  map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
+  auto it = statfs_ops.find(tid);
   if (it == statfs_ops.end()) {
     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
     return -ENOENT;
@@ -4315,9 +4371,9 @@ int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
 
   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
 
-  StatfsOp *op = it->second;
+  auto op = it->second;
   if (op->onfinish)
-    op->onfinish->complete(r);
+    op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{});
   _finish_statfs_op(op, r);
   return 0;
 }
@@ -4338,16 +4394,16 @@ void Objecter::_finish_statfs_op(StatfsOp *op, int r)
 // scatter/gather
 
 void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
-                              vector<bufferlist>& resultbl,
-                              bufferlist *bl, Context *onfinish)
+                              vector<cb::list>& resultbl,
+                              cb::list *bl, Context *onfinish)
 {
   // all done
   ldout(cct, 15) << "_sg_read_finish" << dendl;
 
   if (extents.size() > 1) {
     Striper::StripedReadResult r;
-    vector<bufferlist>::iterator bit = resultbl.begin();
-    for (vector<ObjectExtent>::iterator eit = extents.begin();
+    auto bit = resultbl.begin();
+    for (auto eit = extents.begin();
         eit != extents.end();
         ++eit, ++bit) {
       r.add_partial_result(cct, *bit, eit->buffer_extents);
@@ -4356,7 +4412,7 @@ void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
     r.assemble_result(cct, *bl, false);
   } else {
     ldout(cct, 15) << "  only one frag" << dendl;
-    bl->claim(resultbl[0]);
+    *bl = std::move(resultbl[0]);
   }
 
   // done
@@ -4384,24 +4440,28 @@ bool Objecter::ms_handle_reset(Connection *con)
   if (!initialized)
     return false;
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
-    OSDSession *session = static_cast<OSDSession*>(con->get_priv());
+    unique_lock wl(rwlock);
+
+    auto priv = con->get_priv();
+    auto session = static_cast<OSDSession*>(priv.get());
     if (session) {
       ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
                    << " osd." << session->osd << dendl;
-      unique_lock wl(rwlock);
-      if (!initialized) {
+      // the session maybe had been closed if new osdmap just handled
+      // says the osd down
+      if (!(initialized && osdmap->is_up(session->osd))) {
+       ldout(cct, 1) << "ms_handle_reset aborted,initialized=" << initialized << dendl;
        wl.unlock();
        return false;
       }
       map<uint64_t, LingerOp *> lresend;
-      OSDSession::unique_lock sl(session->lock);
+      unique_lock sl(session->lock);
       _reopen_session(session);
       _kick_requests(session, lresend);
       sl.unlock();
       _linger_ops_resend(lresend, wl);
       wl.unlock();
       maybe_request_map();
-      session->put();
     }
     return true;
   }
@@ -4428,18 +4488,6 @@ bool Objecter::ms_handle_refused(Connection *con)
   return false;
 }
 
-bool Objecter::ms_get_authorizer(int dest_type,
-                                AuthAuthorizer **authorizer,
-                                bool force_new)
-{
-  if (!initialized)
-    return false;
-  if (dest_type == CEPH_ENTITY_TYPE_MON)
-    return true;
-  *authorizer = monc->build_authorizer(dest_type);
-  return *authorizer != NULL;
-}
-
 void Objecter::op_target_t::dump(Formatter *f) const
 {
   f->dump_stream("pg") << pgid;
@@ -4455,9 +4503,7 @@ void Objecter::op_target_t::dump(Formatter *f) const
 
 void Objecter::_dump_active(OSDSession *s)
 {
-  for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
-       p != s->ops.end();
-       ++p) {
+  for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
     Op *op = p->second;
     ldout(cct, 20) << op->tid << "\t" << op->target.pgid
                   << "\tosd." << (op->session ? op->session->osd : -1)
@@ -4470,10 +4516,10 @@ void Objecter::_dump_active()
 {
   ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
                 << dendl;
-  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
+  for (auto siter = osd_sessions.begin();
        siter != osd_sessions.end(); ++siter) {
-    OSDSession *s = siter->second;
-    OSDSession::shared_lock sl(s->lock);
+    auto s = siter->second;
+    shared_lock sl(s->lock);
     _dump_active(s);
     sl.unlock();
   }
@@ -4502,23 +4548,21 @@ void Objecter::dump_requests(Formatter *fmt)
 
 void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
 {
-  for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
-       p != s->ops.end();
-       ++p) {
+  for (auto p = s->ops.begin(); p != s->ops.end(); ++p) {
     Op *op = p->second;
+    auto age = std::chrono::duration<double>(ceph::coarse_mono_clock::now() - op->stamp);
     fmt->open_object_section("op");
     fmt->dump_unsigned("tid", op->tid);
     op->target.dump(fmt);
     fmt->dump_stream("last_sent") << op->stamp;
+    fmt->dump_float("age", age.count());
     fmt->dump_int("attempts", op->attempts);
     fmt->dump_stream("snapid") << op->snapid;
     fmt->dump_stream("snap_context") << op->snapc;
     fmt->dump_stream("mtime") << op->mtime;
 
     fmt->open_array_section("osd_ops");
-    for (vector<OSDOp>::const_iterator it = op->ops.begin();
-        it != op->ops.end();
-        ++it) {
+    for (auto it = op->ops.begin(); it != op->ops.end(); ++it) {
       fmt->dump_stream("osd_op") << *it;
     }
     fmt->close_section(); // osd_ops array
@@ -4531,10 +4575,10 @@ void Objecter::dump_ops(Formatter *fmt)
 {
   // Read-lock on Objecter held
   fmt->open_array_section("ops");
-  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
+  for (auto siter = osd_sessions.begin();
        siter != osd_sessions.end(); ++siter) {
     OSDSession *s = siter->second;
-    OSDSession::shared_lock sl(s->lock);
+    shared_lock sl(s->lock);
     _dump_ops(s, fmt);
     sl.unlock();
   }
@@ -4544,10 +4588,8 @@ void Objecter::dump_ops(Formatter *fmt)
 
 void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
 {
-  for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
-       p != s->linger_ops.end();
-       ++p) {
-    LingerOp *op = p->second;
+  for (auto p = s->linger_ops.begin(); p != s->linger_ops.end(); ++p) {
+    auto op = p->second;
     fmt->open_object_section("linger_op");
     fmt->dump_unsigned("linger_id", op->linger_id);
     op->target.dump(fmt);
@@ -4561,10 +4603,10 @@ void Objecter::dump_linger_ops(Formatter *fmt)
 {
   // We have a read-lock on the objecter
   fmt->open_array_section("linger_ops");
-  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
+  for (auto siter = osd_sessions.begin();
        siter != osd_sessions.end(); ++siter) {
-    OSDSession *s = siter->second;
-    OSDSession::shared_lock sl(s->lock);
+    auto s = siter->second;
+    shared_lock sl(s->lock);
     _dump_linger_ops(s, fmt);
     sl.unlock();
   }
@@ -4574,16 +4616,13 @@ void Objecter::dump_linger_ops(Formatter *fmt)
 
 void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
 {
-  for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
-       p != s->command_ops.end();
-       ++p) {
-    CommandOp *op = p->second;
+  for (auto p = s->command_ops.begin(); p != s->command_ops.end(); ++p) {
+    auto op = p->second;
     fmt->open_object_section("command_op");
     fmt->dump_unsigned("command_id", op->tid);
     fmt->dump_int("osd", op->session ? op->session->osd : -1);
     fmt->open_array_section("command");
-    for (vector<string>::const_iterator q = op->cmd.begin();
-        q != op->cmd.end(); ++q)
+    for (auto q = op->cmd.begin(); q != op->cmd.end(); ++q)
       fmt->dump_string("word", *q);
     fmt->close_section();
     if (op->target_osd >= 0)
@@ -4598,10 +4637,10 @@ void Objecter::dump_command_ops(Formatter *fmt)
 {
   // We have a read-lock on the Objecter here
   fmt->open_array_section("command_ops");
-  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
+  for (auto siter = osd_sessions.begin();
        siter != osd_sessions.end(); ++siter) {
-    OSDSession *s = siter->second;
-    OSDSession::shared_lock sl(s->lock);
+    auto s = siter->second;
+    shared_lock sl(s->lock);
     _dump_command_ops(s, fmt);
     sl.unlock();
   }
@@ -4612,16 +4651,13 @@ void Objecter::dump_command_ops(Formatter *fmt)
 void Objecter::dump_pool_ops(Formatter *fmt) const
 {
   fmt->open_array_section("pool_ops");
-  for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
-       p != pool_ops.end();
-       ++p) {
-    PoolOp *op = p->second;
+  for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) {
+    auto op = p->second;
     fmt->open_object_section("pool_op");
     fmt->dump_unsigned("tid", op->tid);
     fmt->dump_int("pool", op->pool);
     fmt->dump_string("name", op->name);
     fmt->dump_int("operation_type", op->pool_op);
-    fmt->dump_unsigned("auid", op->auid);
     fmt->dump_unsigned("crush_rule", op->crush_rule);
     fmt->dump_stream("snapid") << op->snapid;
     fmt->dump_stream("last_sent") << op->last_submit;
@@ -4633,7 +4669,7 @@ void Objecter::dump_pool_ops(Formatter *fmt) const
 void Objecter::dump_pool_stat_ops(Formatter *fmt) const
 {
   fmt->open_array_section("pool_stat_ops");
-  for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
+  for (auto p = poolstat_ops.begin();
        p != poolstat_ops.end();
        ++p) {
     PoolStatOp *op = p->second;
@@ -4642,10 +4678,8 @@ void Objecter::dump_pool_stat_ops(Formatter *fmt) const
     fmt->dump_stream("last_sent") << op->last_submit;
 
     fmt->open_array_section("pools");
-    for (list<string>::const_iterator it = op->pools.begin();
-        it != op->pools.end();
-        ++it) {
-      fmt->dump_string("pool", *it);
+    for (const auto& it : op->pools) {
+      fmt->dump_string("pool", it);
     }
     fmt->close_section(); // pools array
 
@@ -4657,10 +4691,8 @@ void Objecter::dump_pool_stat_ops(Formatter *fmt) const
 void Objecter::dump_statfs_ops(Formatter *fmt) const
 {
   fmt->open_array_section("statfs_ops");
-  for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
-       p != statfs_ops.end();
-       ++p) {
-    StatfsOp *op = p->second;
+  for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) {
+    auto op = p->second;
     fmt->open_object_section("statfs_op");
     fmt->dump_unsigned("tid", op->tid);
     fmt->dump_stream("last_sent") << op->last_submit;
@@ -4674,34 +4706,39 @@ Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
 {
 }
 
-bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
-                                     std::string format, bufferlist& out)
+int Objecter::RequestStateHook::call(std::string_view command,
+                                    const cmdmap_t& cmdmap,
+                                    const bufferlist&,
+                                    Formatter *f,
+                                    std::ostream& ss,
+                                    cb::list& out)
 {
-  Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
   shared_lock rl(m_objecter->rwlock);
   m_objecter->dump_requests(f);
-  f->flush(out);
-  delete f;
-  return true;
+  return 0;
 }
 
-void Objecter::blacklist_self(bool set)
+void Objecter::blocklist_self(bool set)
 {
-  ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
+  ldout(cct, 10) << "blocklist_self " << (set ? "add" : "rm") << dendl;
 
   vector<string> cmd;
-  cmd.push_back("{\"prefix\":\"osd blacklist\", ");
+  cmd.push_back("{\"prefix\":\"osd blocklist\", ");
   if (set)
-    cmd.push_back("\"blacklistop\":\"add\",");
+    cmd.push_back("\"blocklistop\":\"add\",");
   else
-    cmd.push_back("\"blacklistop\":\"rm\",");
+    cmd.push_back("\"blocklistop\":\"rm\",");
   stringstream ss;
-  ss << messenger->get_myaddr();
+  // this is somewhat imprecise in that we are blocklisting our first addr only
+  ss << messenger->get_myaddrs().front().get_legacy_str();
   cmd.push_back("\"addr\":\"" + ss.str() + "\"");
 
-  MMonCommand *m = new MMonCommand(monc->get_fsid());
+  auto m = new MMonCommand(monc->get_fsid());
   m->cmd = cmd;
 
+  // NOTE: no fallback to legacy blacklist command implemented here
+  // since this is only used for test code.
+
   monc->send_mon_message(m);
 }
 
@@ -4716,24 +4753,21 @@ void Objecter::handle_command_reply(MCommandReply *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
     m->put();
-    if (s)
-      s->put();
     return;
   }
 
-  OSDSession::shared_lock sl(s->lock);
-  map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
+  shared_lock sl(s->lock);
+  auto p = s->command_ops.find(m->get_tid());
   if (p == s->command_ops.end()) {
     ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
                   << " not found" << dendl;
     m->put();
     sl.unlock();
-    if (s)
-      s->put();
     return;
   }
 
@@ -4746,23 +4780,39 @@ void Objecter::handle_command_reply(MCommandReply *m)
                   << dendl;
     m->put();
     sl.unlock();
-    if (s)
-      s->put();
     return;
   }
-  if (c->poutbl) {
-    c->poutbl->claim(m->get_data());
+
+  if (m->r == -EAGAIN) {
+    ldout(cct,10) << __func__ << " tid " << m->get_tid()
+                 << " got EAGAIN, requesting map and resending" << dendl;
+    // NOTE: This might resend twice... once now, and once again when
+    // we get an updated osdmap and the PG is found to have moved.
+    _maybe_request_map();
+    _send_command(c);
+    m->put();
+    sl.unlock();
+    return;
   }
 
   sl.unlock();
 
+  unique_lock sul(s->lock);
+  _finish_command(c, m->r < 0 ? bs::error_code(-m->r, osd_category()) :
+                 bs::error_code(), std::move(m->rs),
+                 std::move(m->get_data()));
+  sul.unlock();
 
-  _finish_command(c, m->r, m->rs);
   m->put();
-  if (s)
-    s->put();
 }
 
+Objecter::LingerOp::LingerOp(Objecter *o, uint64_t linger_id)
+  : objecter(o),
+    linger_id(linger_id),
+    watch_lock(ceph::make_shared_mutex(
+                fmt::format("LingerOp::watch_lock #{}", linger_id)))
+{}
+
 void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
 {
   shunique_lock sul(rwlock, ceph::acquire_unique);
@@ -4772,7 +4822,7 @@ void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
   c->tid = tid;
 
   {
-    OSDSession::unique_lock hs_wl(homeless_session->lock);
+    unique_lock hs_wl(homeless_session->lock);
     _session_command_op_assign(homeless_session, c);
   }
 
@@ -4781,8 +4831,9 @@ void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
   if (osd_timeout > timespan(0)) {
     c->ontimeout = timer.add_event(osd_timeout,
                                   [this, c, tid]() {
-                                    command_op_cancel(c->session, tid,
-                                                      -ETIMEDOUT); });
+                                    command_op_cancel(
+                                      c->session, tid,
+                                      osdc_errc::timed_out); });
   }
 
   if (!c->session->is_homeless()) {
@@ -4792,14 +4843,16 @@ void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
   }
   if (c->map_check_error)
     _send_command_map_check(c);
-  *ptid = tid;
+  if (ptid)
+    *ptid = tid;
 
   logger->inc(l_osdc_command_active);
 }
 
-int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
+int Objecter::_calc_command_target(CommandOp *c,
+                                  shunique_lock<ceph::shared_mutex>& sul)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
 
   c->map_check_error = 0;
 
@@ -4837,7 +4890,7 @@ int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
 
   OSDSession *s;
   int r = _get_session(c->target.osd, &s, sul);
-  assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
 
   if (c->session != s) {
     put_session(s);
@@ -4853,22 +4906,22 @@ int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
 }
 
 void Objecter::_assign_command_session(CommandOp *c,
-                                      shunique_lock& sul)
+                                      shunique_lock<ceph::shared_mutex>& sul)
 {
-  assert(sul.owns_lock() && sul.mutex() == &rwlock);
+  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
 
   OSDSession *s;
   int r = _get_session(c->target.osd, &s, sul);
-  assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
 
   if (c->session != s) {
     if (c->session) {
       OSDSession *cs = c->session;
-      OSDSession::unique_lock csl(cs->lock);
+      unique_lock csl(cs->lock);
       _session_command_op_remove(c->session, c);
       csl.unlock();
     }
-    OSDSession::unique_lock sl(s->lock);
+    unique_lock sl(s->lock);
     _session_command_op_assign(s, c);
   }
 
@@ -4878,9 +4931,9 @@ void Objecter::_assign_command_session(CommandOp *c,
 void Objecter::_send_command(CommandOp *c)
 {
   ldout(cct, 10) << "_send_command " << c->tid << dendl;
-  assert(c->session);
-  assert(c->session->con);
-  MCommand *m = new MCommand(monc->monmap.fsid);
+  ceph_assert(c->session);
+  ceph_assert(c->session->con);
+  auto m = new MCommand(monc->monmap.fsid);
   m->cmd = c->cmd;
   m->set_data(c->inbl);
   m->set_tid(c->tid);
@@ -4888,13 +4941,14 @@ void Objecter::_send_command(CommandOp *c)
   logger->inc(l_osdc_command_send);
 }
 
-int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
+int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid,
+                               bs::error_code ec)
 {
-  assert(initialized);
+  ceph_assert(initialized);
 
   unique_lock wl(rwlock);
 
-  map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
+  auto it = s->command_ops.find(tid);
   if (it == s->command_ops.end()) {
     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
     return -ENOENT;
@@ -4904,28 +4958,28 @@ int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 
   CommandOp *op = it->second;
   _command_cancel_map_check(op);
-  _finish_command(op, r, "");
+  unique_lock sl(op->session->lock);
+  _finish_command(op, ec, {}, {});
+  sl.unlock();
   return 0;
 }
 
-void Objecter::_finish_command(CommandOp *c, int r, string rs)
+void Objecter::_finish_command(CommandOp *c, bs::error_code ec,
+                              string&& rs, cb::list&& bl)
 {
   // rwlock is locked unique
+  // session lock is locked
 
-  ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
+  ldout(cct, 10) << "_finish_command " << c->tid << " = " << ec << " "
                 << rs << dendl;
-  if (c->prs)
-    *c->prs = rs;
+
   if (c->onfinish)
-    c->onfinish->complete(r);
+    c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl));
 
-  if (c->ontimeout && r != -ETIMEDOUT)
+  if (c->ontimeout && ec != bs::errc::timed_out)
     timer.cancel_event(c->ontimeout);
 
-  OSDSession *s = c->session;
-  OSDSession::unique_lock sl(s->lock);
   _session_command_op_remove(c->session, c);
-  sl.unlock();
 
   c->put();
 
@@ -4936,31 +4990,38 @@ Objecter::OSDSession::~OSDSession()
 {
   // Caller is responsible for re-assigning or
   // destroying any ops that were assigned to us
-  assert(ops.empty());
-  assert(linger_ops.empty());
-  assert(command_ops.empty());
+  ceph_assert(ops.empty());
+  ceph_assert(linger_ops.empty());
+  ceph_assert(command_ops.empty());
 }
 
-Objecter::~Objecter()
+Objecter::Objecter(CephContext *cct,
+                  Messenger *m, MonClient *mc,
+                  boost::asio::io_context& service) :
+  Dispatcher(cct), messenger(m), monc(mc), service(service)
 {
-  delete osdmap;
+  mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+  osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
+}
 
-  assert(homeless_session->get_nref() == 1);
-  assert(num_homeless_ops == 0);
+Objecter::~Objecter()
+{
+  ceph_assert(homeless_session->get_nref() == 1);
+  ceph_assert(num_homeless_ops == 0);
   homeless_session->put();
 
-  assert(osd_sessions.empty());
-  assert(poolstat_ops.empty());
-  assert(statfs_ops.empty());
-  assert(pool_ops.empty());
-  assert(waiting_for_map.empty());
-  assert(linger_ops.empty());
-  assert(check_latest_map_lingers.empty());
-  assert(check_latest_map_ops.empty());
-  assert(check_latest_map_commands.empty());
+  ceph_assert(osd_sessions.empty());
+  ceph_assert(poolstat_ops.empty());
+  ceph_assert(statfs_ops.empty());
+  ceph_assert(pool_ops.empty());
+  ceph_assert(waiting_for_map.empty());
+  ceph_assert(linger_ops.empty());
+  ceph_assert(check_latest_map_lingers.empty());
+  ceph_assert(check_latest_map_ops.empty());
+  ceph_assert(check_latest_map_commands.empty());
 
-  assert(!m_request_state_hook);
-  assert(!logger);
+  ceph_assert(!m_request_state_hook);
+  ceph_assert(!logger);
 }
 
 /**
@@ -4968,7 +5029,7 @@ Objecter::~Objecter()
  * sending any more operations to OSDs.  Use this
  * when it is known that the client can't trust
  * anything from before this epoch (e.g. due to
- * client blacklist at this epoch).
+ * client blocklist at this epoch).
  */
 void Objecter::set_epoch_barrier(epoch_t epoch)
 {
@@ -4995,153 +5056,212 @@ hobject_t Objecter::enumerate_objects_end()
   return hobject_t::get_max();
 }
 
-struct C_EnumerateReply : public Context {
-  bufferlist bl;
-
-  Objecter *objecter;
-  hobject_t *next;
-  std::list<librados::ListObjectImpl> *result;
+template<typename T>
+struct EnumerationContext {
+  Objecter* objecter;
   const hobject_t end;
-  const int64_t pool_id;
-  Context *on_finish;
+  const cb::list filter;
+  uint32_t max;
+  const object_locator_t oloc;
+  std::vector<T> ls;
+private:
+  fu2::unique_function<void(bs::error_code,
+                           std::vector<T>,
+                           hobject_t) &&> on_finish;
+public:
+  epoch_t epoch = 0;
+  int budget = -1;
+
+  EnumerationContext(Objecter* objecter,
+                    hobject_t end, cb::list filter,
+                    uint32_t max, object_locator_t oloc,
+                    decltype(on_finish) on_finish)
+    : objecter(objecter), end(std::move(end)), filter(std::move(filter)),
+      max(max), oloc(std::move(oloc)), on_finish(std::move(on_finish)) {}
+
+  void operator()(bs::error_code ec,
+                 std::vector<T> v,
+                 hobject_t h) && {
+    if (budget >= 0) {
+      objecter->put_op_budget_bytes(budget);
+      budget = -1;
+    }
 
-  epoch_t epoch;
-  int budget;
+    std::move(on_finish)(ec, std::move(v), std::move(h));
+  }
+};
+
+template<typename T>
+struct CB_EnumerateReply {
+  cb::list bl;
+
+  Objecter* objecter;
+  std::unique_ptr<EnumerationContext<T>> ctx;
 
-  C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
-      std::list<librados::ListObjectImpl> *result_,
-      const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
-    objecter(objecter_), next(next_), result(result_),
-    end(end_), pool_id(pool_id_), on_finish(on_finish_),
-    epoch(0), budget(0)
-  {}
+  CB_EnumerateReply(Objecter* objecter,
+                   std::unique_ptr<EnumerationContext<T>>&& ctx) :
+    objecter(objecter), ctx(std::move(ctx)) {}
 
-  void finish(int r) override {
-    objecter->_enumerate_reply(
-      bl, r, end, pool_id, budget, epoch, result, next, on_finish);
+  void operator()(bs::error_code ec) {
+    objecter->_enumerate_reply(std::move(bl), ec, std::move(ctx));
   }
 };
 
+template<typename T>
 void Objecter::enumerate_objects(
-    int64_t pool_id,
-    const std::string &ns,
-    const hobject_t &start,
-    const hobject_t &end,
-    const uint32_t max,
-    const bufferlist &filter_bl,
-    std::list<librados::ListObjectImpl> *result, 
-    hobject_t *next,
-    Context *on_finish)
-{
-  assert(result);
-
+  int64_t pool_id,
+  std::string_view ns,
+  hobject_t start,
+  hobject_t end,
+  const uint32_t max,
+  const cb::list& filter_bl,
+  fu2::unique_function<void(bs::error_code,
+                           std::vector<T>,
+                           hobject_t) &&> on_finish) {
   if (!end.is_max() && start > end) {
     lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
-    on_finish->complete(-EINVAL);
+    std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
     return;
   }
 
   if (max < 1) {
     lderr(cct) << __func__ << ": result size may not be zero" << dendl;
-    on_finish->complete(-EINVAL);
+    std::move(on_finish)(osdc_errc::precondition_violated, {}, {});
     return;
   }
 
   if (start.is_max()) {
-    on_finish->complete(0);
+    std::move(on_finish)({}, {}, {});
     return;
   }
 
   shared_lock rl(rwlock);
-  assert(osdmap->get_epoch());
+  ceph_assert(osdmap->get_epoch());
   if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
     rl.unlock();
     lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
-    on_finish->complete(-EOPNOTSUPP);
+    std::move(on_finish)(osdc_errc::not_supported, {}, {});
     return;
   }
-  const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
+  const pg_pool_tp = osdmap->get_pg_pool(pool_id);
   if (!p) {
     lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
               << osdmap->get_epoch() << dendl;
     rl.unlock();
-    on_finish->complete(-ENOENT);
+    std::move(on_finish)(osdc_errc::pool_dne, {}, {});
     return;
   } else {
     rl.unlock();
   }
 
-  ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;
-
-  // Stash completion state
-  C_EnumerateReply *on_ack = new C_EnumerateReply(
-      this, next, result, end, pool_id, on_finish);
-
+  _issue_enumerate(start,
+                  std::make_unique<EnumerationContext<T>>(
+                    this, std::move(end), filter_bl,
+                    max, object_locator_t{pool_id, ns},
+                    std::move(on_finish)));
+}
+
+template
+void Objecter::enumerate_objects<librados::ListObjectImpl>(
+  int64_t pool_id,
+  std::string_view ns,
+  hobject_t start,
+  hobject_t end,
+  const uint32_t max,
+  const cb::list& filter_bl,
+  fu2::unique_function<void(bs::error_code,
+                           std::vector<librados::ListObjectImpl>,
+                           hobject_t) &&> on_finish);
+
+template
+void Objecter::enumerate_objects<neorados::Entry>(
+  int64_t pool_id,
+  std::string_view ns,
+  hobject_t start,
+  hobject_t end,
+  const uint32_t max,
+  const cb::list& filter_bl,
+  fu2::unique_function<void(bs::error_code,
+                           std::vector<neorados::Entry>,
+                           hobject_t) &&> on_finish);
+
+
+
+template<typename T>
+void Objecter::_issue_enumerate(hobject_t start,
+                               std::unique_ptr<EnumerationContext<T>> ctx) {
   ObjectOperation op;
-  op.pg_nls(max, filter_bl, start, 0);
+  auto c = ctx.get();
+  op.pg_nls(c->max, c->filter, start, osdmap->get_epoch());
+  auto on_ack = std::make_unique<CB_EnumerateReply<T>>(this, std::move(ctx));
+  // I hate having to do this. Try to find a cleaner way
+  // later.
+  auto epoch = &c->epoch;
+  auto budget = &c->budget;
+  auto pbl = &on_ack->bl;
 
   // Issue.  See you later in _enumerate_reply
-  object_locator_t oloc(pool_id, ns);
-  pg_read(start.get_hash(), oloc, op,
-         &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
-}
-
+  pg_read(start.get_hash(),
+         c->oloc, op, pbl, 0,
+         Op::OpComp::create(service.get_executor(),
+                            [c = std::move(on_ack)]
+                            (bs::error_code ec) mutable {
+                              (*c)(ec);
+                            }), epoch, budget);
+}
+
+template
+void Objecter::_issue_enumerate<librados::ListObjectImpl>(
+  hobject_t start,
+  std::unique_ptr<EnumerationContext<librados::ListObjectImpl>> ctx);
+template
+void Objecter::_issue_enumerate<neorados::Entry>(
+  hobject_t start, std::unique_ptr<EnumerationContext<neorados::Entry>> ctx);
+
+template<typename T>
 void Objecter::_enumerate_reply(
-    bufferlist &bl,
-    int r,
-    const hobject_t &end,
-    const int64_t pool_id,
-    int budget,
-    epoch_t reply_epoch,
-    std::list<librados::ListObjectImpl> *result,
-    hobject_t *next,
-    Context *on_finish)
-{
-  if (budget > 0) {
-    put_op_budget_bytes(budget);
-  }
-
-  if (r < 0) {
-    ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
-    on_finish->complete(r);
+  cb::list&& bl,
+  bs::error_code ec,
+  std::unique_ptr<EnumerationContext<T>>&& ctx)
+{
+  if (ec) {
+    std::move(*ctx)(ec, {}, {});
     return;
   }
 
-  assert(next != NULL);
-
   // Decode the results
-  bufferlist::iterator iter = bl.begin();
-  pg_nls_response_t response;
+  auto iter = bl.cbegin();
+  pg_nls_response_template<T> response;
+
+  try {
+    response.decode(iter);
+    if (!iter.end()) {
+      // extra_info isn't used anywhere. We do this solely to preserve
+      // backward compatibility
+      cb::list legacy_extra_info;
+      decode(legacy_extra_info, iter);
+    }
+  } catch (const bs::system_error& e) {
+    std::move(*ctx)(e.code(), {}, {});
+    return;
+  }
 
-  // XXX extra_info doesn't seem used anywhere?
-  bufferlist extra_info;
-  ::decode(response, iter);
-  if (!iter.end()) {
-    ::decode(extra_info, iter);
+  shared_lock rl(rwlock);
+  auto pool = osdmap->get_pg_pool(ctx->oloc.get_pool());
+  rl.unlock();
+  if (!pool) {
+    // pool is gone, drop any results which are now meaningless.
+    std::move(*ctx)(osdc_errc::pool_dne, {}, {});
+    return;
   }
 
-  ldout(cct, 10) << __func__ << ": got " << response.entries.size()
-                << " handle " << response.handle
-                << " reply_epoch " << reply_epoch << dendl;
-  ldout(cct, 20) << __func__ << ": response.entries.size "
-                << response.entries.size() << ", response.entries "
-                << response.entries << dendl;
-  if (response.handle <= end) {
-    *next = response.handle;
+  hobject_t next;
+  if ((response.handle <= ctx->end)) {
+    next = response.handle;
   } else {
-    ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
-                  << dendl;
-    *next = end;
+    next = ctx->end;
 
     // drop anything after 'end'
-    shared_lock rl(rwlock);
-    const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
-    if (!pool) {
-      // pool is gone, drop any results which are now meaningless.
-      rl.unlock();
-      on_finish->complete(-ENOENT);
-      return;
-    }
     while (!response.entries.empty()) {
       uint32_t hash = response.entries.back().locator.empty() ?
        pool->hash_key(response.entries.back().oid,
@@ -5152,37 +5272,64 @@ void Objecter::_enumerate_reply(
                     response.entries.back().locator,
                     CEPH_NOSNAP,
                     hash,
-                    pool_id,
+                    ctx->oloc.get_pool(),
                     response.entries.back().nspace);
-      if (last < end)
+      if (last < ctx->end)
        break;
-      ldout(cct, 20) << __func__ << " dropping item " << last
-                    << " >= end " << end << dendl;
       response.entries.pop_back();
     }
-    rl.unlock();
   }
-  if (!response.entries.empty()) {
-    result->merge(response.entries);
+
+  if (response.entries.size() <= ctx->max) {
+    ctx->max -= response.entries.size();
+    std::move(response.entries.begin(), response.entries.end(),
+             std::back_inserter(ctx->ls));
+  } else {
+    auto i = response.entries.begin();
+    while (ctx->max > 0) {
+      ctx->ls.push_back(std::move(*i));
+      --(ctx->max);
+      ++i;
+    }
+    uint32_t hash =
+      i->locator.empty() ?
+      pool->hash_key(i->oid, i->nspace) :
+      pool->hash_key(i->locator, i->nspace);
+
+    next = hobject_t{i->oid, i->locator,
+                    CEPH_NOSNAP,
+                    hash,
+                    ctx->oloc.get_pool(),
+                    i->nspace};
   }
 
-  // release the listing context's budget once all
-  // OPs (in the session) are finished
-#if 0
-  put_nlist_context_budget(list_context);
-#endif
-  on_finish->complete(r);
-  return;
+  if (next == ctx->end || ctx->max == 0) {
+    std::move(*ctx)(ec, std::move(ctx->ls), std::move(next));
+  } else {
+    _issue_enumerate(next, std::move(ctx));
+  }
 }
 
+template
+void Objecter::_enumerate_reply<librados::ListObjectImpl>(
+  cb::list&& bl,
+  bs::error_code ec,
+  std::unique_ptr<EnumerationContext<librados::ListObjectImpl>>&& ctx);
+
+template
+void Objecter::_enumerate_reply<neorados::Entry>(
+  cb::list&& bl,
+  bs::error_code ec,
+  std::unique_ptr<EnumerationContext<neorados::Entry>>&& ctx);
+
 namespace {
   using namespace librados;
 
   template <typename T>
-  void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
+  void do_decode(std::vector<T>& items, std::vector<cb::list>& bls)
   {
     for (auto bl : bls) {
-      auto p = bl.begin();
+      auto p = bl.cbegin();
       T t;
       decode(t, p);
       items.push_back(t);
@@ -5190,19 +5337,19 @@ namespace {
   }
 
   struct C_ObjectOperation_scrub_ls : public Context {
-    bufferlist bl;
-    uint32_t *interval;
+    cb::list bl;
+    uint32_tinterval;
     std::vector<inconsistent_obj_t> *objects = nullptr;
     std::vector<inconsistent_snapset_t> *snapsets = nullptr;
-    int *rval;
+    intrval;
 
-    C_ObjectOperation_scrub_ls(uint32_t *interval,
-                              std::vector<inconsistent_obj_t> *objects,
-                              int *rval)
+    C_ObjectOperation_scrub_ls(uint32_tinterval,
+                              std::vector<inconsistent_obj_t>objects,
+                              intrval)
       : interval(interval), objects(objects), rval(rval) {}
-    C_ObjectOperation_scrub_ls(uint32_t *interval,
-                              std::vector<inconsistent_snapset_t> *snapsets,
-                              int *rval)
+    C_ObjectOperation_scrub_ls(uint32_tinterval,
+                              std::vector<inconsistent_snapset_t>snapsets,
+                              intrval)
       : interval(interval), snapsets(snapsets), rval(rval) {}
     void finish(int r) override {
       if (r < 0 && r != -EAGAIN) {
@@ -5216,7 +5363,7 @@ namespace {
 
       try {
        decode();
-      } catch (buffer::error&) {
+      } catch (cb::error&) {
        if (rval)
          *rval = -EIO;
       }
@@ -5224,7 +5371,7 @@ namespace {
   private:
     void decode() {
       scrub_ls_result_t result;
-      auto p = bl.begin();
+      auto p = bl.cbegin();
       result.decode(p);
       *interval = result.interval;
       if (objects) {
@@ -5236,19 +5383,19 @@ namespace {
   };
 
   template <typename T>
-  void do_scrub_ls(::ObjectOperation *op,
+  void do_scrub_ls(::ObjectOperationop,
                   const scrub_ls_arg_t& arg,
                   std::vector<T> *items,
-                  uint32_t *interval,
-                  int *rval)
+                  uint32_tinterval,
+                  intrval)
   {
     OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
     op->flags |= CEPH_OSD_FLAG_PGOP;
-    assert(interval);
+    ceph_assert(interval);
     arg.encode(osd_op.indata);
     unsigned p = op->ops.size() - 1;
-    auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
-    op->out_handler[p] = h;
+    auto h = new C_ObjectOperation_scrub_ls{interval, items, rval};
+    op->set_handler(h);
     op->out_bl[p] = &h->bl;
     op->out_rval[p] = rval;
   }
@@ -5256,9 +5403,9 @@ namespace {
 
 void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
                                 uint64_t max_to_get,
-                                std::vector<librados::inconsistent_obj_t> *objects,
-                                uint32_t *interval,
-                                int *rval)
+                                std::vector<librados::inconsistent_obj_t>objects,
+                                uint32_tinterval,
+                                intrval)
 {
   scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
   do_scrub_ls(this, arg, objects, interval, rval);